Documentation ¶
Overview ¶
Example (SimpleStream) ¶
ctx, killAll := context.WithCancel(context.Background())
defer killAll()

logger := newLogger()
ctrl := gomockCtrl(logger)

batchCount := 10
batchSize := 1

dlq := NewDLQ(
	"dlq",
	noopDLQDestination(ctrl),
	logger,
	noop.Timer{},
	noop.Histogram{},
	1,
	0,
)
srcTask := NewSourceTask(
	"generator",
	generatorSource(ctrl, logger, "generator", batchSize, batchCount),
	logger,
	noop.Timer{},
	noop.Histogram{},
)
destTask := NewDestinationTask(
	"printer",
	printerDestination(ctrl, logger, "printer", batchSize),
	logger,
	noop.Timer{},
	noop.Histogram{},
)

w, err := NewWorker(
	[]Task{srcTask, destTask},
	[][]int{{1}, {}},
	dlq,
	logger,
	noop.Timer{},
)
if err != nil {
	panic(err)
}

err = w.Open(ctx)
if err != nil {
	panic(err)
}

var wg csync.WaitGroup
wg.Add(1)
go func() {
	defer wg.Done()
	err := w.Do(ctx)
	if err != nil {
		panic(err)
	}
}()

// stop node after 150ms, which should be enough to process the 10 messages
time.AfterFunc(150*time.Millisecond, func() { _ = w.Stop(ctx) })

if err := wg.WaitTimeout(ctx, time.Second); err != nil {
	killAll()
} else {
	logger.Info(ctx).Msg("finished successfully")
}

err = w.Close(ctx)
if err != nil {
	panic(err)
}
Output:

DBG opening source component=task:source connector_id=generator
DBG source open component=task:source connector_id=generator
DBG opening destination component=task:destination connector_id=printer
DBG destination open component=task:destination connector_id=printer
DBG opening destination component=task:destination connector_id=dlq
DBG destination open component=task:destination connector_id=dlq
DBG got record node_id=printer position=generator/0
DBG received ack node_id=generator
DBG got record node_id=printer position=generator/1
DBG received ack node_id=generator
DBG got record node_id=printer position=generator/2
DBG received ack node_id=generator
DBG got record node_id=printer position=generator/3
DBG received ack node_id=generator
DBG got record node_id=printer position=generator/4
DBG received ack node_id=generator
DBG got record node_id=printer position=generator/5
DBG received ack node_id=generator
DBG got record node_id=printer position=generator/6
DBG received ack node_id=generator
DBG got record node_id=printer position=generator/7
DBG received ack node_id=generator
DBG got record node_id=printer position=generator/8
DBG received ack node_id=generator
DBG got record node_id=printer position=generator/9
DBG received ack node_id=generator
INF finished successfully
Index ¶
- type Batch
- type DLQ
- type Destination
- type DestinationTask
- type Order
- type Processor
- type ProcessorTask
- type RecordFlag
- type RecordStatus
- type Source
- type SourceTask
- type Task
- type Worker
- func (w *Worker) Ack(ctx context.Context, batch *Batch) error
- func (w *Worker) Close(ctx context.Context) error
- func (w *Worker) Do(ctx context.Context) error
- func (w *Worker) Nack(ctx context.Context, batch *Batch, taskID string) error
- func (w *Worker) Open(ctx context.Context) (err error)
- func (w *Worker) Stop(ctx context.Context) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Batch ¶
type Batch struct {
// contains filtered or unexported fields
}
Batch represents a batch of records that are processed together. It keeps track of the status of each record in the batch, and provides methods to update the status of records.
func (*Batch) ActiveRecords ¶
ActiveRecords returns the records that are not filtered.
func (*Batch) Filter ¶
Filter marks the record at index i as filtered out. If a second index j is provided, all records between i and j are marked as filtered. If more than two indices are provided, the method panics.
func (*Batch) Nack ¶
Nack marks the record at index i as nacked. If multiple errors are provided, they are assigned to the records starting at index i.
type DLQ ¶
type DLQ struct {
// contains filtered or unexported fields
}
type Destination ¶
type Destination interface {
	ID() string
	Open(context.Context) error
	Write(context.Context, []opencdc.Record) error
	Ack(context.Context) ([]connector.DestinationAck, error)
	Teardown(context.Context) error
	// TODO figure out if we want to handle these errors. This returns errors
	// coming from the persister, which persists the connector asynchronously.
	// Are we even interested in these errors in the pipeline? Sounds like
	// something we could surface and handle globally in the runtime instead.
	Errors() <-chan error
}
type DestinationTask ¶
type DestinationTask struct {
// contains filtered or unexported fields
}
func NewDestinationTask ¶
func NewDestinationTask(
	id string,
	destination Destination,
	logger log.CtxLogger,
	timer metrics.Timer,
	histogram metrics.Histogram,
) *DestinationTask
func (*DestinationTask) ID ¶
func (t *DestinationTask) ID() string
type Order ¶
type Order [][]int
Order represents the order of tasks in a pipeline. Each index in the slice represents a task, and the value at that index is a slice of indices of the next tasks to be executed. If the slice is empty, the task is the last one in the pipeline.
func (Order) AppendOrder ¶
AppendOrder appends the next order to the current order. The next order indices are adjusted to match the new order length.
func (Order) AppendSingle ¶
AppendSingle appends a single element to the current order.
type Processor ¶
type Processor interface {
	// Open configures and opens a processor plugin
	Open(ctx context.Context) error
	Process(context.Context, []opencdc.Record) []sdk.ProcessedRecord
	// Teardown tears down a processor plugin.
	// In case of standalone plugins, that means stopping the WASM module.
	Teardown(context.Context) error
}
type ProcessorTask ¶
type ProcessorTask struct {
// contains filtered or unexported fields
}
func NewProcessorTask ¶
func (*ProcessorTask) ID ¶
func (t *ProcessorTask) ID() string
type RecordFlag ¶
type RecordFlag int
const (
	RecordFlagAck    RecordFlag = iota // ack
	RecordFlagNack                     // nack
	RecordFlagRetry                    // retry
	RecordFlagFilter                   // filter
)
func (RecordFlag) String ¶
func (i RecordFlag) String() string
type RecordStatus ¶
type RecordStatus struct {
	Flag  RecordFlag
	Error error
}
RecordStatus holds the status of a record in a batch. The flag indicates the status of the record, and the error is set if the record was nacked.
type Source ¶
type Source interface {
	ID() string
	Open(context.Context) error
	Read(context.Context) ([]opencdc.Record, error)
	Ack(context.Context, []opencdc.Position) error
	Teardown(context.Context) error
	// TODO figure out if we want to handle these errors. This returns errors
	// coming from the persister, which persists the connector asynchronously.
	// Are we even interested in these errors in the pipeline? Sounds like
	// something we could surface and handle globally in the runtime instead.
	Errors() <-chan error
}
type SourceTask ¶
type SourceTask struct {
// contains filtered or unexported fields
}
func NewSourceTask ¶
func (*SourceTask) GetSource ¶
func (t *SourceTask) GetSource() Source
func (*SourceTask) ID ¶
func (t *SourceTask) ID() string
type Task ¶
type Task interface {
	// ID returns the identifier of this Task. Each Task in a pipeline must be
	// uniquely identified by the ID.
	ID() string
	// Open opens the Task for processing. It is called once before the worker
	// starts processing records.
	Open(context.Context) error
	// Close closes the Task. It is called once after the worker has stopped
	// processing records.
	Close(context.Context) error
	// Do processes the given batch of records. It is called for each batch of
	// records that the worker processes.
	Do(context.Context, *Batch) error
}
Task is a unit of work that can be executed by a Worker. Each Task in a pipeline is executed sequentially, except for tasks related to different destinations, which can be executed in parallel.
type Worker ¶
type Worker struct {
	Source Source
	Tasks  []Task
	// Order defines the next task to be executed. Multiple indices are used to
	// show parallel execution of tasks.
	//
	// Example:
	// [[1], [2], [3,5], [4], [], []]
	//
	//            /-> 3 -> 4
	// 0 -> 1 -> 2
	//            \-> 5
	Order Order
	DLQ   *DLQ
	// contains filtered or unexported fields
}
Worker collects the tasks that need to be executed in a pipeline for a specific source. It processes records from the source through the tasks until it is stopped. The worker is responsible for coordinating tasks and acking/nacking records.
Batches are processed in the following way:
- The first task is always a source task which reads a batch of records from the source. The batch is then passed to the next task.
- Any task between the source and the destination can process the batch by updating the records or their status (see RecordStatus). If a record in the batch is marked as filtered, the next task will skip processing it and consider it as already processed. If a record is marked as nacked, the record will be sent to the DLQ. If a record is marked as retry, the record will be reprocessed by the same task (relevant if a task processed only part of the batch, experienced an error and skipped the rest).
- The last task is always a destination task which writes the batch of records to the destination. The batch is then acked.
Note that if a task marks a record in the middle of a batch as nacked, the batch is split into sub-batches. Because Conduit provides ordering guarantees, the records before the nacked record continue to the next task (and ideally to the end of the pipeline) first. Only once those records have been processed end-to-end is the nacked record sent to the DLQ. The records after the nacked record are processed as a separate sub-batch, and the same rules apply to them.
func (*Worker) Do ¶
Do processes records from the source until the worker is stopped. It returns no error if the worker is stopped gracefully.