Documentation ¶
Overview ¶
Suite of components that can be composed to make sinks. By extracting complex, non-sink specific behaviour, we minimise the effort involved in implementing a new sink while ensuring each sink has consistent semantics.
Index ¶
Constants ¶
const (
	SchemaHandlerFailed SchemaHandlerOutcome = "failed" // configuration failed
	SchemaHandlerNoop                        = "noop"   // nothing was changed, no action required
	SchemaHandlerUpdate                      = "update" // sink was updated, the returned inserter takes precedence
)
Variables ¶
var EmptyInsertResult = NewInsertResult().Resolve(0, nil, nil)
EmptyInsertResult represents an insertion that did no work. It can be used as the base element when recursively folding insertions together.
var SinkBuilder = sinkBuilderFunc(func(opts ...func(*sink)) Sink {
	s := &sink{
		builders: []func(AsyncInserter) AsyncInserter{},
		router:   NewRouter(),
	}
	for _, opt := range opts {
		opt(s)
	}
	return s
})
SinkBuilder allows sink implementations to easily compose the sink-specific implementations into a generic sink implementation that fulfils the Sink contract.
Functions ¶
func NewInsertResult ¶
func NewInsertResult() *insertResult
Types ¶
type AckCallback ¶
AckCallback will acknowledge successful publication of messages up to this message. It is not guaranteed to be called for any intermediate messages.
type AsyncInserter ¶
type AsyncInserter interface {
	// Insert has the same signature as the synchronous Inserter, but returns an
	// InsertResult that will be fulfilled at some later time.
	Insert(context.Context, []*changelog.Modification) InsertResult

	// Flush is called to force a write of buffered changelog modifications to the underlying
	// inserter. The returned result resolves with the sum of rows inserted, and the highest
	// LSN successfully written, if any that were flushed had an associated LSN.
	Flush(context.Context) InsertResult
}
func NewAsyncInserter ¶
func NewAsyncInserter(i Inserter) AsyncInserter
NewAsyncInserter converts a synchronous inserter to the async contract.
func NewBufferedInserter ¶
func NewBufferedInserter(i AsyncInserter, bufferSize int) AsyncInserter
NewBufferedInserter wraps any async inserter with a buffer. When an insertion overflows the buffer, an insert is triggered and the buffered modifications are passed to the underlying inserter. Calling Flush will also push buffered inserts into the underlying system.
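The buffering behaviour described above can be sketched as follows. This is a deliberately synchronous simplification rather than the package's implementation: `Modification`, `bufferedInserter`, the `insert` function field, and `demoBuffered` are all hypothetical names, and the choice to flush as soon as the buffer reaches `bufferSize` is an assumption.

```go
package main

import "fmt"

// Modification is a hypothetical stand-in for changelog.Modification.
type Modification struct{ ID int }

// bufferedInserter is an illustrative, synchronous simplification: it
// accumulates modifications and hands them to the underlying insert function
// once the buffer is full, or when Flush is called.
type bufferedInserter struct {
	insert     func([]Modification) int // underlying inserter (assumed signature)
	buffer     []Modification
	bufferSize int
}

// Insert appends to the buffer, flushing whenever it reaches bufferSize. It
// returns the number of rows written to the underlying inserter.
func (b *bufferedInserter) Insert(mods []Modification) int {
	written := 0
	for _, m := range mods {
		b.buffer = append(b.buffer, m)
		if len(b.buffer) >= b.bufferSize {
			written += b.Flush()
		}
	}
	return written
}

// Flush pushes whatever is buffered to the underlying inserter.
func (b *bufferedInserter) Flush() int {
	if len(b.buffer) == 0 {
		return 0
	}
	batch := b.buffer
	b.buffer = nil
	return b.insert(batch)
}

// demoBuffered inserts five modifications through a buffer of size two and
// reports the batch sizes seen by the underlying inserter.
func demoBuffered() []int {
	var sizes []int
	b := &bufferedInserter{
		bufferSize: 2,
		insert: func(batch []Modification) int {
			sizes = append(sizes, len(batch))
			return len(batch)
		},
	}
	b.Insert([]Modification{{1}, {2}, {3}})
	b.Insert([]Modification{{4}, {5}})
	b.Flush()
	return sizes
}

func main() {
	fmt.Println(demoBuffered()) // [2 2 1]
}
```

The final explicit Flush is what delivers the trailing batch of one, which is why callers of a buffered inserter need to flush before shutting down.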
type InsertResult ¶
type InsertResult interface {
	Get(context.Context) (count int, lsn *uint64, err error)
	Fold(InsertResult) InsertResult
}
InsertResult is a promise interface for async insert operations. Get() will block until the result is fulfilled, while Fold() will merge in the result of another InsertResult, taking the higher of the two LSNs (if provided).
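To make the promise semantics concrete, here is a minimal sketch of one way such a result could behave. It is not the package's implementation: `result`, `newResult`, and `demoFold` are hypothetical names, and summing counts and keeping the first error when folding are assumptions.

```go
package main

import (
	"context"
	"fmt"
)

// InsertResult mirrors the interface documented above.
type InsertResult interface {
	Get(context.Context) (count int, lsn *uint64, err error)
	Fold(InsertResult) InsertResult
}

// result is a hypothetical promise, fulfilled by closing the done channel.
type result struct {
	done  chan struct{}
	count int
	lsn   *uint64
	err   error
}

func newResult() *result { return &result{done: make(chan struct{})} }

// Resolve fulfils the promise. It must be called at most once.
func (r *result) Resolve(count int, lsn *uint64, err error) *result {
	r.count, r.lsn, r.err = count, lsn, err
	close(r.done)
	return r
}

// Get blocks until the result is fulfilled or the context is cancelled.
func (r *result) Get(ctx context.Context) (int, *uint64, error) {
	select {
	case <-r.done:
		return r.count, r.lsn, r.err
	case <-ctx.Done():
		return 0, nil, ctx.Err()
	}
}

// Fold returns a result that resolves once both inputs have resolved. Counts
// are summed, the higher LSN wins, and the first error is kept (assumptions).
// The goroutine waits indefinitely if either input is never resolved.
func (r *result) Fold(other InsertResult) InsertResult {
	folded := newResult()
	go func() {
		ctx := context.Background()
		c1, l1, e1 := r.Get(ctx)
		c2, l2, e2 := other.Get(ctx)
		lsn := l1
		if l2 != nil && (lsn == nil || *l2 > *lsn) {
			lsn = l2
		}
		err := e1
		if err == nil {
			err = e2
		}
		folded.Resolve(c1+c2, lsn, err)
	}()
	return folded
}

// demoFold folds three results onto an empty base element.
func demoFold() (int, uint64, error) {
	lsnA, lsnB := uint64(5), uint64(9)
	results := []InsertResult{
		newResult().Resolve(2, &lsnA, nil),
		newResult().Resolve(3, &lsnB, nil),
		newResult().Resolve(1, nil, nil), // no LSN associated
	}
	var folded InsertResult = newResult().Resolve(0, nil, nil) // empty base
	for _, r := range results {
		folded = folded.Fold(r)
	}
	count, lsn, err := folded.Get(context.Background())
	if lsn == nil {
		return count, 0, err
	}
	return count, *lsn, err
}

func main() {
	fmt.Println(demoFold()) // 6 9 <nil>
}
```

Starting the fold from an already-resolved empty result plays the same role here that EmptyInsertResult is documented to play in the package.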
type Inserter ¶
type Inserter interface {
	// Insert receives changelog modifications to insert into a table. It returns the count
	// of rows inserted, and the highest non-nil LSN from the batch of modifications
	// confirmed to be written to the sink.
	Insert(context.Context, []*changelog.Modification) (count int, lsn *uint64, err error)
}
Inserter provides a synchronous interface around inserting data into a sink.
func NewInstrumentedInserter ¶
NewInstrumentedInserter wraps an existing synchronous inserter so that every insert is logged, has its batch size and duration captured in metrics, and creates a new span.
type InserterFunc ¶
type InserterFunc func(context.Context, []*changelog.Modification) (count int, lsn *uint64, err error)
InserterFunc can create an Inserter from an anonymous function.
func (InserterFunc) Insert ¶
func (i InserterFunc) Insert(ctx context.Context, modifications []*changelog.Modification) (count int, lsn *uint64, err error)
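The function-adapter pattern behind InserterFunc can be illustrated with local stand-ins. In this sketch `Modification` is a hypothetical substitute for changelog.Modification with only an optional LSN field, `countingInserter` and `demoInserterFunc` are made-up helpers, and the body of `Insert` (simply calling the function) is an assumption about how the adapter works.

```go
package main

import (
	"context"
	"fmt"
)

// Modification is a hypothetical stand-in for changelog.Modification.
type Modification struct{ LSN *uint64 }

// Inserter and InserterFunc mirror the documented declarations.
type Inserter interface {
	Insert(context.Context, []*Modification) (count int, lsn *uint64, err error)
}

type InserterFunc func(context.Context, []*Modification) (count int, lsn *uint64, err error)

// Insert satisfies Inserter by calling the function itself (assumed body).
func (i InserterFunc) Insert(ctx context.Context, mods []*Modification) (int, *uint64, error) {
	return i(ctx, mods)
}

// countingInserter builds an Inserter from an anonymous function. It pretends
// every modification was written and reports the highest non-nil LSN.
func countingInserter() Inserter {
	return InserterFunc(func(ctx context.Context, mods []*Modification) (int, *uint64, error) {
		var highest *uint64
		for _, m := range mods {
			if m.LSN != nil && (highest == nil || *m.LSN > *highest) {
				highest = m.LSN
			}
		}
		return len(mods), highest, nil
	})
}

// demoInserterFunc inserts three modifications, one of which has no LSN.
func demoInserterFunc() (int, uint64) {
	a, b := uint64(10), uint64(42)
	count, lsn, _ := countingInserter().Insert(context.Background(),
		[]*Modification{{LSN: &a}, {LSN: nil}, {LSN: &b}})
	return count, *lsn
}

func main() {
	fmt.Println(demoInserterFunc()) // 3 42
}
```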
type MemoryInserter ¶
MemoryInserter is a reference implementation of an inserter, storing modifications in an in-memory buffer. It satisfies all requirements of an inserter, including race-safety.
Beyond offering a useful reference implementation, this can be used for testing generic inserter logic without being coupled to an actual backend.
func NewMemoryInserter ¶
func NewMemoryInserter() *MemoryInserter
func (*MemoryInserter) Batches ¶
func (i *MemoryInserter) Batches() [][]*changelog.Modification
func (*MemoryInserter) Insert ¶
func (i *MemoryInserter) Insert(ctx context.Context, modifications []*changelog.Modification) (count int, lsn *uint64, err error)
func (*MemoryInserter) Store ¶
func (i *MemoryInserter) Store() []*changelog.Modification
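As an illustration of what a race-safe in-memory inserter might look like, the sketch below guards its state with a mutex. It is an independent re-implementation for explanation only: `memoryInserter`, `Modification`, and `demoMemory` are hypothetical, and the assumption that Store is the flattened view of Batches is inferred from the method signatures rather than confirmed.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Modification is a hypothetical stand-in for changelog.Modification.
type Modification struct{ LSN *uint64 }

// memoryInserter keeps every inserted batch in memory behind a mutex.
type memoryInserter struct {
	mu      sync.Mutex
	batches [][]*Modification
}

// Insert records the batch and reports its size and highest non-nil LSN.
func (i *memoryInserter) Insert(ctx context.Context, mods []*Modification) (int, *uint64, error) {
	i.mu.Lock()
	defer i.mu.Unlock()
	i.batches = append(i.batches, mods)
	var highest *uint64
	for _, m := range mods {
		if m.LSN != nil && (highest == nil || *m.LSN > *highest) {
			highest = m.LSN
		}
	}
	return len(mods), highest, nil
}

// Batches returns a copy of the recorded batches, in insertion order.
func (i *memoryInserter) Batches() [][]*Modification {
	i.mu.Lock()
	defer i.mu.Unlock()
	return append([][]*Modification(nil), i.batches...)
}

// Store returns every recorded modification as one flat slice (assumed meaning).
func (i *memoryInserter) Store() []*Modification {
	var all []*Modification
	for _, batch := range i.Batches() {
		all = append(all, batch...)
	}
	return all
}

// demoMemory inserts four batches of two modifications concurrently.
func demoMemory() (batches, stored int) {
	ins := &memoryInserter{}
	var wg sync.WaitGroup
	for n := 0; n < 4; n++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			ins.Insert(context.Background(), []*Modification{{}, {}})
		}()
	}
	wg.Wait()
	return len(ins.Batches()), len(ins.Store())
}

func main() {
	fmt.Println(demoMemory()) // 4 8
}
```

In a test, an inserter like this can be wrapped by generic constructs such as buffering or routing, with assertions then made on the recorded batches.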
type Route ¶
type Route string
Route is a string representing a routing key for incoming modifications. Inserters are associated with these routes.
type Router ¶
type Router interface {
	// Register notifies the router that all subsequent insertions for the given namespace
	// should be routed to a new inserter. Register returns an InsertResult from flushing
	// any inserters that were previously routed via this namespace. Any previous inserter's
	// flush is added to the router's in-flight result, preserving the semantics of flush to
	// ensure we appropriately handle failed flushes.
	Register(context.Context, Route, AsyncInserter) InsertResult

	// Otherwise, a router looks exactly like a normal AsyncInserter. It should be
	// transparent that each insert is routed to other inserters, and it should be possible
	// to compose this with other inserter constructs.
	AsyncInserter
}
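The register-then-route behaviour can be sketched with a simplified, synchronous model. Nothing here is the package's implementation: the `inserter` interface replaces AsyncInserter with plain integer counts, `router`, `recorder`, and `demoRouter` are hypothetical, and deriving the route from a `Namespace` field on the modification is an assumption, as is silently skipping unregistered routes.

```go
package main

import "fmt"

type Route string

// Modification is a hypothetical stand-in; routing on Namespace is an assumption.
type Modification struct {
	Namespace string
	ID        int
}

// inserter is a simplified, synchronous stand-in for AsyncInserter.
type inserter interface {
	Insert([]Modification) int // rows written immediately
	Flush() int                // rows written by flushing
}

type router struct{ routes map[Route]inserter }

func newRouter() *router { return &router{routes: map[Route]inserter{}} }

// Register points the route at a new inserter, flushing the one it replaces
// and returning how many rows that flush wrote.
func (r *router) Register(route Route, i inserter) int {
	flushed := 0
	if prev, ok := r.routes[route]; ok {
		flushed = prev.Flush()
	}
	r.routes[route] = i
	return flushed
}

// Insert forwards each modification to the inserter registered for its
// namespace. Modifications without a registered route are skipped here.
func (r *router) Insert(mods []Modification) int {
	count := 0
	for _, m := range mods {
		if i, ok := r.routes[Route(m.Namespace)]; ok {
			count += i.Insert([]Modification{m})
		}
	}
	return count
}

// Flush flushes every registered inserter and sums the rows written.
func (r *router) Flush() int {
	count := 0
	for _, i := range r.routes {
		count += i.Flush()
	}
	return count
}

// recorder buffers everything until flushed.
type recorder struct{ pending int }

func (rec *recorder) Insert(mods []Modification) int { rec.pending += len(mods); return 0 }
func (rec *recorder) Flush() int                     { n := rec.pending; rec.pending = 0; return n }

// demoRouter re-registers a route mid-stream and reports the rows flushed from
// the old inserter, then the rows flushed from the new one.
func demoRouter() (int, int) {
	r := newRouter()
	r.Register("public.users", &recorder{})
	r.Insert([]Modification{{"public.users", 1}, {"public.users", 2}})
	fromOld := r.Register("public.users", &recorder{})
	r.Insert([]Modification{{"public.users", 3}})
	return fromOld, r.Flush()
}

func main() {
	fmt.Println(demoRouter()) // 2 1
}
```

Flushing the previous inserter during Register is what keeps rows buffered under the old route from being lost when the route is swapped.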
type SchemaHandler ¶
type SchemaHandler interface {
Handle(context.Context, *changelog.Schema) (Inserter, SchemaHandlerOutcome, error)
}
SchemaHandler responds to new schemas by idempotently updating and configuring the Sink to receive corresponding modifications. It returns an Inserter that can be used to handle modifications associated with the given schema.
func SchemaHandlerCacheOnFingerprint ¶
func SchemaHandlerCacheOnFingerprint(handler SchemaHandler) SchemaHandler
SchemaHandlerCacheOnFingerprint caches schema handler responses on the fingerprint of the received schema. This means any subsequent identical schemas are provided the old, cached version of the previous handler call.
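A fingerprint cache of this kind might be structured as below. This is a sketch under stated assumptions, not the package's code: `Schema` is a hypothetical stand-in whose fingerprint is a plain string field, `Inserter` is reduced to an empty interface, `cacheOnFingerprint` and `demoCache` are made-up names, and returning the cached response verbatim while leaving failures uncached are both assumptions.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Schema is a hypothetical stand-in for changelog.Schema. How the real type
// derives its fingerprint is not covered here, so a plain field is assumed.
type Schema struct{ Fingerprint string }

// Inserter is reduced to a placeholder for this sketch.
type Inserter interface{}

type SchemaHandlerOutcome string

const SchemaHandlerUpdate SchemaHandlerOutcome = "update"

type SchemaHandler interface {
	Handle(context.Context, *Schema) (Inserter, SchemaHandlerOutcome, error)
}

type SchemaHandlerFunc func(context.Context, *Schema) (Inserter, SchemaHandlerOutcome, error)

func (s SchemaHandlerFunc) Handle(ctx context.Context, schema *Schema) (Inserter, SchemaHandlerOutcome, error) {
	return s(ctx, schema)
}

// cached is one remembered handler response.
type cached struct {
	inserter Inserter
	outcome  SchemaHandlerOutcome
}

// cacheOnFingerprint wraps a handler so that identical schemas, judged by
// fingerprint, reuse the first successful response.
func cacheOnFingerprint(handler SchemaHandler) SchemaHandler {
	var mu sync.Mutex
	cache := map[string]cached{}
	return SchemaHandlerFunc(func(ctx context.Context, schema *Schema) (Inserter, SchemaHandlerOutcome, error) {
		mu.Lock()
		defer mu.Unlock()
		if hit, ok := cache[schema.Fingerprint]; ok {
			return hit.inserter, hit.outcome, nil
		}
		ins, outcome, err := handler.Handle(ctx, schema)
		if err != nil {
			return nil, outcome, err // failures are not cached (assumption)
		}
		cache[schema.Fingerprint] = cached{inserter: ins, outcome: outcome}
		return ins, outcome, nil
	})
}

// demoCache counts how often the wrapped handler runs for four schemas, of
// which only two have distinct fingerprints.
func demoCache() int {
	calls := 0
	h := cacheOnFingerprint(SchemaHandlerFunc(
		func(ctx context.Context, schema *Schema) (Inserter, SchemaHandlerOutcome, error) {
			calls++
			return nil, SchemaHandlerUpdate, nil
		}))
	for _, fp := range []string{"a", "a", "b", "a"} {
		h.Handle(context.Background(), &Schema{Fingerprint: fp})
	}
	return calls
}

func main() {
	fmt.Println(demoCache()) // 2
}
```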
func SchemaHandlerGlobalInserter ¶
func SchemaHandlerGlobalInserter(inserter Inserter, schemaHandler func(context.Context, *changelog.Schema) error) SchemaHandler
SchemaHandlerGlobalInserter is used to register a single global inserter for all modifications to this sink, along with a handler function that is used to respond to new schemas but is not expected to return a modified inserter.
type SchemaHandlerFunc ¶
type SchemaHandlerFunc func(context.Context, *changelog.Schema) (Inserter, SchemaHandlerOutcome, error)
SchemaHandlerFunc is shorthand for creating a handler from a function.
func (SchemaHandlerFunc) Handle ¶
func (s SchemaHandlerFunc) Handle(ctx context.Context, schema *changelog.Schema) (Inserter, SchemaHandlerOutcome, error)
type SchemaHandlerOutcome ¶
type SchemaHandlerOutcome string
type Sink ¶
Sink is a generic sink destination for a changelog. It will consume entries until either an error, or the entries run out.
If the process producing the changelog is long-running, then the AckCallback is used to acknowledge successful writes into the sink. If you want to wait for all writes to be completely processed by the sink, then wait for Consume to return.