dedupebuffer

package
v0.0.0-...-8f3027a
Warning

This package is not in the latest version of its module.

Published: Nov 27, 2024 License: Apache-2.0, Apache-2.0 Imports: 7 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var ErrEmptyQueue = fmt.Errorf("queue is empty")

Functions

This section is empty.

Types

type DedupeBuffer

type DedupeBuffer struct {
	// contains filtered or unexported fields
}

DedupeBuffer implements the syncer callbacks API on its input and calls another syncer callback on its output. In between, it maintains an in-order queue of KV updates and a tracking map that records what is in the queue.

If an update comes in for a key that already has a value in the queue then the value in the queue is replaced with the updated KV.

This can cause reordering between different resources (which the syncer API allows), but it ensures that the number of KVs in flight is bounded by the total size of the datastore, even under substantial overload. The effect is that the client periodically "skips ahead" to a more recent state of the datastore without seeing the intermediate states.
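
The queue-plus-tracking-map idea can be illustrated with a minimal, self-contained sketch. This is not the package's implementation: the `update` and `dedupeQueue` types and their methods are hypothetical names chosen for illustration, and the real buffer works on `api.Update` values rather than string pairs.

```go
package main

import (
	"container/list"
	"fmt"
)

// update is a hypothetical stand-in for a syncer KV update.
type update struct {
	key   string
	value string
}

// dedupeQueue pairs an in-order queue with a tracking map from each
// pending key to the queue element that currently holds it.
type dedupeQueue struct {
	queue   *list.List
	pending map[string]*list.Element
}

func newDedupeQueue() *dedupeQueue {
	return &dedupeQueue{queue: list.New(), pending: make(map[string]*list.Element)}
}

// push enqueues u. If the key is already queued, the queued value is
// overwritten in place, so the queue never holds more than one entry per key.
func (q *dedupeQueue) push(u update) {
	if elem, ok := q.pending[u.key]; ok {
		elem.Value = u
		return
	}
	q.pending[u.key] = q.queue.PushBack(u)
}

// pop removes and returns the oldest queued update, if any.
func (q *dedupeQueue) pop() (update, bool) {
	front := q.queue.Front()
	if front == nil {
		return update{}, false
	}
	u := q.queue.Remove(front).(update)
	delete(q.pending, u.key)
	return u, true
}

// len reports the number of queued updates.
func (q *dedupeQueue) len() int { return q.queue.Len() }

func main() {
	q := newDedupeQueue()
	q.push(update{"a", "1"})
	q.push(update{"b", "1"})
	q.push(update{"a", "2"}) // replaces the queued value for "a"

	for u, ok := q.pop(); ok; u, ok = q.pop() {
		fmt.Printf("%s=%s\n", u.key, u.value)
	}
	// prints:
	// a=2
	// b=1
}
```

In this sketch the newer value for "a" is delivered ahead of "b" even though it arrived later, which is the kind of cross-resource reordering described above, and the queue length can never exceed the number of distinct keys.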

func New

func New() *DedupeBuffer

func (*DedupeBuffer) OnStatusUpdated

func (d *DedupeBuffer) OnStatusUpdated(status api.SyncStatus)

OnStatusUpdated queues a status update to be sent to the sink.

func (*DedupeBuffer) OnUpdates

func (d *DedupeBuffer) OnUpdates(updates []api.Update)

OnUpdates adds a slice of updates to the buffer and does housekeeping to deduplicate in-flight updates to the same keys. It should only block for short periods even if the downstream sink blocks for a long time.
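
One common way to keep a producer from blocking on a slow consumer is to hold a lock only while touching the queue and to release it before calling the sink. The sketch below shows that general pattern under stated assumptions; the `buffer` type, its `add`, `stop` and `sendForever` methods, and the choice to drain remaining items after a stop are all hypothetical and are not taken from the package source.

```go
package main

import (
	"fmt"
	"sync"
)

// buffer is a hypothetical, simplified producer/consumer queue.
type buffer struct {
	mu      sync.Mutex
	cond    *sync.Cond
	pending []string
	stopped bool
}

func newBuffer() *buffer {
	b := &buffer{}
	b.cond = sync.NewCond(&b.mu)
	return b
}

// add appends items while holding the lock and wakes the sender.
// It never calls the sink, so it only blocks for as long as the lock is held.
func (b *buffer) add(items ...string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.pending = append(b.pending, items...)
	b.cond.Signal()
}

// stop asks sendForever to return once the queue is empty
// (draining on stop is an assumption of this sketch).
func (b *buffer) stop() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.stopped = true
	b.cond.Broadcast()
}

// sendForever hands batches to sink until stop is called.
// The lock is released before sink runs, so a slow sink cannot stall add.
func (b *buffer) sendForever(sink func([]string)) {
	for {
		b.mu.Lock()
		for len(b.pending) == 0 && !b.stopped {
			b.cond.Wait()
		}
		if len(b.pending) == 0 {
			b.mu.Unlock()
			return
		}
		batch := b.pending
		b.pending = nil
		b.mu.Unlock()
		sink(batch)
	}
}

func main() {
	b := newBuffer()
	release := make(chan struct{})
	done := make(chan struct{})
	var got []string

	go func() {
		b.sendForever(func(batch []string) {
			<-release // simulate a sink that blocks for a while
			got = append(got, batch...)
		})
		close(done)
	}()

	b.add("a")
	b.add("b", "c") // returns promptly even while the sink is blocked
	close(release)
	b.stop()
	<-done
	fmt.Println(got) // prints "[a b c]"
}
```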

func (*DedupeBuffer) OnUpdatesKeysKnown

func (d *DedupeBuffer) OnUpdatesKeysKnown(updates []api.Update, keys []string)

OnUpdatesKeysKnown is like OnUpdates, but it allows the pre-serialised keys of the KV pairs to be passed in. If an entry in keys is "" or keys is shorter than updates, the missing key is computed.

The updates and keys slices are not retained.
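
The fallback rule for keys can be sketched as follows. This is an illustration only: `resolveKeys` and its `compute` parameter are hypothetical, and plain strings stand in for `api.Update` values. The function writes into a freshly allocated slice, mirroring the note that the caller's slices are not retained.

```go
package main

import "fmt"

// resolveKeys returns one key per update. A supplied key is used when it is
// present and non-empty; otherwise the key is derived with compute.
// The result is a new slice, so neither input slice is retained.
func resolveKeys(updates []string, keys []string, compute func(string) string) []string {
	out := make([]string, len(updates))
	for i, u := range updates {
		if i < len(keys) && keys[i] != "" {
			out[i] = keys[i]
			continue
		}
		out[i] = compute(u)
	}
	return out
}

func main() {
	updates := []string{"pod-a", "pod-b", "pod-c"}
	keys := []string{"/pre/pod-a", ""} // shorter than updates, one entry empty

	compute := func(u string) string { return "/computed/" + u }
	fmt.Println(resolveKeys(updates, keys, compute))
	// prints "[/pre/pod-a /computed/pod-b /computed/pod-c]"
}
```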

func (*DedupeBuffer) SendToSinkForever

func (d *DedupeBuffer) SendToSinkForever(sink api.SyncerCallbacks)

func (*DedupeBuffer) Stop

func (d *DedupeBuffer) Stop()
