Documentation ¶
Constants ¶
This section is empty.
Variables ¶
var ErrEmptyQueue = fmt.Errorf("queue is empty")
Functions ¶
This section is empty.
Types ¶
type DedupeBuffer ¶
type DedupeBuffer struct {
// contains filtered or unexported fields
}
DedupeBuffer implements the syncer callbacks API on its input and calls another syncer callback on its output. In between, it maintains an in-order queue of KV updates and a tracking map that records what is in the queue.
If an update comes in for a key that already has a value in the queue then the value in the queue is replaced with the updated KV.
This can cause reordering between different resources (which the syncer API allows), but it ensures that the number of KVs in flight is bounded by the total size of the datastore, even under substantial overload. The effect is that the client periodically "skips ahead" to a more recent state of the datastore without seeing the intermediate states.
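The queue-plus-tracking-map idea described above can be sketched as follows. This is a minimal illustration, not the package's implementation: the kv and dedupeQueue types and the push and pop methods are hypothetical names chosen for the example.

```go
package main

import (
	"container/list"
	"fmt"
)

// kv is a hypothetical stand-in for a datastore key/value update.
type kv struct {
	Key   string
	Value string
}

// dedupeQueue is an illustrative in-order queue plus a tracking map.
type dedupeQueue struct {
	queue *list.List               // pending updates in arrival order
	index map[string]*list.Element // key -> that key's entry in the queue
}

func newDedupeQueue() *dedupeQueue {
	return &dedupeQueue{queue: list.New(), index: map[string]*list.Element{}}
}

// push enqueues u, or overwrites the queued value if the key is already pending.
func (q *dedupeQueue) push(u kv) {
	if e, ok := q.index[u.Key]; ok {
		e.Value = u // keep the queue position, replace the payload
		return
	}
	q.index[u.Key] = q.queue.PushBack(u)
}

// pop removes and returns the oldest pending update, if any.
func (q *dedupeQueue) pop() (kv, bool) {
	front := q.queue.Front()
	if front == nil {
		return kv{}, false
	}
	u := q.queue.Remove(front).(kv)
	delete(q.index, u.Key)
	return u, true
}

func main() {
	q := newDedupeQueue()
	q.push(kv{"a", "1"})
	q.push(kv{"b", "1"})
	q.push(kv{"a", "2"}) // replaces the queued "a" in place
	for {
		u, ok := q.pop()
		if !ok {
			break
		}
		fmt.Println(u.Key, u.Value)
	}
}
```

In this sketch the second update to "a" arrives after "b" but is delivered before it, which is the kind of cross-resource reordering mentioned above. The queue never holds more than one entry per key, so its length cannot exceed the number of distinct keys.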
func New ¶
func New() *DedupeBuffer
func (*DedupeBuffer) OnStatusUpdated ¶
func (d *DedupeBuffer) OnStatusUpdated(status api.SyncStatus)
OnStatusUpdated queues a status update to be sent to the sink.
func (*DedupeBuffer) OnUpdates ¶
func (d *DedupeBuffer) OnUpdates(updates []api.Update)
OnUpdates adds a slice of updates to the buffer and does housekeeping to deduplicate in-flight updates to the same keys. It should only block for short periods even if the downstream sink blocks for a long time.
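One common way to keep a producer from blocking on a slow consumer is to guard the queue with a mutex and call the sink only after the lock has been released. The sketch below shows that generic pattern; it is an assumption about how such a guarantee can be achieved, not a description of DedupeBuffer's internals, and the buffer type with its add, stop and sendForever methods is hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// buffer is a hypothetical mutex-guarded queue; it is not the DedupeBuffer implementation.
type buffer struct {
	mu      sync.Mutex
	cond    *sync.Cond
	pending []string
	stopped bool
}

func newBuffer() *buffer {
	b := &buffer{}
	b.cond = sync.NewCond(&b.mu)
	return b
}

// add holds the lock only long enough to append, so it never waits on the sink.
func (b *buffer) add(items ...string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.pending = append(b.pending, items...)
	b.cond.Signal()
}

// stop wakes the sender so that it can exit.
func (b *buffer) stop() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.stopped = true
	b.cond.Broadcast()
}

// sendForever hands batches to sink until stop is called.
func (b *buffer) sendForever(sink func([]string)) {
	for {
		b.mu.Lock()
		for len(b.pending) == 0 && !b.stopped {
			b.cond.Wait()
		}
		if b.stopped {
			b.mu.Unlock()
			return
		}
		batch := b.pending
		b.pending = nil
		b.mu.Unlock()
		sink(batch) // may block for a long time; the lock is not held here
	}
}

func main() {
	b := newBuffer()
	got := make(chan []string)
	go b.sendForever(func(batch []string) { got <- batch })
	b.add("x", "y")
	fmt.Println(<-got)
	b.stop()
}
```

Because the sink runs outside the critical section, add contends only with the brief moment in which the sender swaps out the pending slice.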
func (*DedupeBuffer) OnUpdatesKeysKnown ¶
func (d *DedupeBuffer) OnUpdatesKeysKnown(updates []api.Update, keys []string)
OnUpdatesKeysKnown is like OnUpdates, but it allows the pre-serialised keys of the KV pairs to be passed in. If an entry in keys is "", or if keys is shorter than updates, the missing key is computed from the corresponding update.
The updates and keys slices are not retained.
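The fallback rule for keys can be illustrated with a small sketch. The update type, computeKey and resolveKeys are hypothetical names for this example; in particular, computeKey is only a placeholder for whatever serialisation the package actually uses.

```go
package main

import "fmt"

// update is a hypothetical stand-in for api.Update.
type update struct {
	Path  string
	Value string
}

// computeKey is a hypothetical placeholder for serialising an update's key.
func computeKey(u update) string {
	return "/computed" + u.Path
}

// resolveKeys returns one key per update. It uses keys[i] when that entry
// exists and is non-empty, and computes the key otherwise.
func resolveKeys(updates []update, keys []string) []string {
	out := make([]string, len(updates)) // fresh slice; inputs are not retained
	for i, u := range updates {
		if i < len(keys) && keys[i] != "" {
			out[i] = keys[i]
			continue
		}
		out[i] = computeKey(u)
	}
	return out
}

func main() {
	updates := []update{{Path: "/a"}, {Path: "/b"}, {Path: "/c"}}
	keys := []string{"/pre/a", ""} // shorter than updates, with one blank entry
	fmt.Println(resolveKeys(updates, keys))
}
```

Here the first key is taken as supplied, the second is computed because its entry is blank, and the third is computed because keys has no entry at that index.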
func (*DedupeBuffer) SendToSinkForever ¶
func (d *DedupeBuffer) SendToSinkForever(sink api.SyncerCallbacks)
func (*DedupeBuffer) Stop ¶
func (d *DedupeBuffer) Stop()