funnel

package
v0.13.0-nightly.20250115 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 13, 2025 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Overview

Example (SimpleStream)
ctx, killAll := context.WithCancel(context.Background())
defer killAll()

logger := newLogger()
ctrl := gomockCtrl(logger)

batchCount := 10
batchSize := 1

dlq := NewDLQ(
	"dlq",
	noopDLQDestination(ctrl),
	logger,
	noop.Timer{},
	noop.Histogram{},
	1,
	0,
)
srcTask := NewSourceTask(
	"generator",
	generatorSource(ctrl, logger, "generator", batchSize, batchCount),
	logger,
	noop.Timer{},
	noop.Histogram{},
)
destTask := NewDestinationTask(
	"printer",
	printerDestination(ctrl, logger, "printer", batchSize),
	logger,
	noop.Timer{},
	noop.Histogram{},
)

w, err := NewWorker(
	[]Task{srcTask, destTask},
	[][]int{{1}, {}},
	dlq,
	logger,
	noop.Timer{},
)
if err != nil {
	panic(err)
}

err = w.Open(ctx)
if err != nil {
	panic(err)
}

var wg csync.WaitGroup
wg.Add(1)
go func() {
	defer wg.Done()
	err := w.Do(ctx)
	if err != nil {
		panic(err)
	}
}()

// stop the worker after 150ms, which should be enough to process the 10 messages
time.AfterFunc(150*time.Millisecond, func() { _ = w.Stop(ctx) })

if err := wg.WaitTimeout(ctx, time.Second); err != nil {
	killAll()
} else {
	logger.Info(ctx).Msg("finished successfully")
}

err = w.Close(ctx)
if err != nil {
	panic(err)
}
Output:

DBG opening source component=task:source connector_id=generator
DBG source open component=task:source connector_id=generator
DBG opening destination component=task:destination connector_id=printer
DBG destination open component=task:destination connector_id=printer
DBG opening destination component=task:destination connector_id=dlq
DBG destination open component=task:destination connector_id=dlq
DBG got record node_id=printer position=generator/0
DBG received ack node_id=generator
DBG got record node_id=printer position=generator/1
DBG received ack node_id=generator
DBG got record node_id=printer position=generator/2
DBG received ack node_id=generator
DBG got record node_id=printer position=generator/3
DBG received ack node_id=generator
DBG got record node_id=printer position=generator/4
DBG received ack node_id=generator
DBG got record node_id=printer position=generator/5
DBG received ack node_id=generator
DBG got record node_id=printer position=generator/6
DBG received ack node_id=generator
DBG got record node_id=printer position=generator/7
DBG received ack node_id=generator
DBG got record node_id=printer position=generator/8
DBG received ack node_id=generator
DBG got record node_id=printer position=generator/9
DBG received ack node_id=generator
INF finished successfully

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Batch

type Batch struct {
	// contains filtered or unexported fields
}

Batch represents a batch of records that are processed together. It keeps track of the status of each record in the batch, and provides methods to update the status of records.

func NewBatch

func NewBatch(records []opencdc.Record) *Batch

func (*Batch) ActiveRecords

func (b *Batch) ActiveRecords() []opencdc.Record

ActiveRecords returns the records that are not filtered.

func (*Batch) Filter

func (b *Batch) Filter(i int, j ...int)

Filter marks the record at index i as filtered out. If a second index j is provided, all records between i and j are marked as filtered. If more than one additional index is provided, the method panics.

func (*Batch) Nack

func (b *Batch) Nack(i int, errs ...error)

Nack marks the record at index i as nacked. If multiple errors are provided, they are assigned to the records starting at index i.

func (*Batch) Retry

func (b *Batch) Retry(i int, j ...int)

Retry marks the record at index i to be retried. If a second index j is provided, all records between i and j are marked to be retried. If more than one additional index is provided, the method panics.

func (*Batch) SetRecords

func (b *Batch) SetRecords(i int, recs []opencdc.Record)

SetRecords replaces the records in the batch starting at index i with the provided records.

type DLQ

type DLQ struct {
	// contains filtered or unexported fields
}

func NewDLQ

func NewDLQ(
	id string,
	destination Destination,
	logger log.CtxLogger,
	timer metrics.Timer,
	histogram metrics.Histogram,

	windowSize int,
	windowNackThreshold int,
) *DLQ

func (*DLQ) Ack

func (d *DLQ) Ack(_ context.Context, batch *Batch)

func (*DLQ) Close

func (d *DLQ) Close(ctx context.Context) error

func (*DLQ) ID

func (d *DLQ) ID() string

func (*DLQ) Nack

func (d *DLQ) Nack(ctx context.Context, batch *Batch, taskID string) (int, error)

func (*DLQ) Open

func (d *DLQ) Open(ctx context.Context) error

type Destination

type Destination interface {
	ID() string
	Open(context.Context) error
	Write(context.Context, []opencdc.Record) error
	Ack(context.Context) ([]connector.DestinationAck, error)
	Teardown(context.Context) error
	// TODO figure out if we want to handle these errors. This returns errors
	//  coming from the persister, which persists the connector asynchronously.
	//  Are we even interested in these errors in the pipeline? Sounds like
	//  something we could surface and handle globally in the runtime instead.
	Errors() <-chan error
}

type DestinationTask

type DestinationTask struct {
	// contains filtered or unexported fields
}

func NewDestinationTask

func NewDestinationTask(
	id string,
	destination Destination,
	logger log.CtxLogger,
	timer metrics.Timer,
	histogram metrics.Histogram,
) *DestinationTask

func (*DestinationTask) Close

func (t *DestinationTask) Close(ctx context.Context) error

func (*DestinationTask) Do

func (t *DestinationTask) Do(ctx context.Context, batch *Batch) error

func (*DestinationTask) ID

func (t *DestinationTask) ID() string

func (*DestinationTask) Open

func (t *DestinationTask) Open(ctx context.Context) error

type Order

type Order [][]int

Order represents the order of tasks in a pipeline. Each index in the slice represents a task, and the value at that index is a slice of indices of the next tasks to be executed. If the slice is empty, the task is the last one in the pipeline.

func (Order) AppendOrder

func (o Order) AppendOrder(next Order) Order

AppendOrder appends the next order to the current order. The next order indices are adjusted to match the new order length.

func (Order) AppendSingle

func (o Order) AppendSingle(next []int) Order

AppendSingle appends a single element to the current order.

func (Order) Increase

func (o Order) Increase(incr int) Order

Increase increases all indices in the order by the given increment.

type Processor

type Processor interface {
	// Open configures and opens a processor plugin
	Open(ctx context.Context) error
	Process(context.Context, []opencdc.Record) []sdk.ProcessedRecord
	// Teardown tears down a processor plugin.
	// In case of standalone plugins, that means stopping the WASM module.
	Teardown(context.Context) error
}

type ProcessorTask

type ProcessorTask struct {
	// contains filtered or unexported fields
}

func NewProcessorTask

func NewProcessorTask(
	id string,
	processor Processor,
	logger log.CtxLogger,
	timer metrics.Timer,
) *ProcessorTask

func (*ProcessorTask) Close

func (t *ProcessorTask) Close(ctx context.Context) error

func (*ProcessorTask) Do

func (t *ProcessorTask) Do(ctx context.Context, b *Batch) error

func (*ProcessorTask) ID

func (t *ProcessorTask) ID() string

func (*ProcessorTask) Open

func (t *ProcessorTask) Open(ctx context.Context) error

type RecordFlag

type RecordFlag int
const (
	RecordFlagAck    RecordFlag = iota // ack
	RecordFlagNack                     // nack
	RecordFlagRetry                    // retry
	RecordFlagFilter                   // filter
)

func (RecordFlag) String

func (i RecordFlag) String() string

type RecordStatus

type RecordStatus struct {
	Flag  RecordFlag
	Error error
}

RecordStatus holds the status of a record in a batch. The flag indicates the status of the record, and the error is set if the record was nacked.

type Source

type Source interface {
	ID() string
	Open(context.Context) error
	Read(context.Context) ([]opencdc.Record, error)
	Ack(context.Context, []opencdc.Position) error
	Teardown(context.Context) error
	// TODO figure out if we want to handle these errors. This returns errors
	//  coming from the persister, which persists the connector asynchronously.
	//  Are we even interested in these errors in the pipeline? Sounds like
	//  something we could surface and handle globally in the runtime instead.
	Errors() <-chan error
}

type SourceTask

type SourceTask struct {
	// contains filtered or unexported fields
}

func NewSourceTask

func NewSourceTask(
	id string,
	source Source,
	logger log.CtxLogger,
	timer metrics.Timer,
	histogram metrics.Histogram,
) *SourceTask

func (*SourceTask) Close

func (t *SourceTask) Close(context.Context) error

func (*SourceTask) Do

func (t *SourceTask) Do(ctx context.Context, b *Batch) error

func (*SourceTask) GetSource

func (t *SourceTask) GetSource() Source

func (*SourceTask) ID

func (t *SourceTask) ID() string

func (*SourceTask) Open

func (t *SourceTask) Open(ctx context.Context) error

type Task

type Task interface {
	// ID returns the identifier of this Task. Each Task in a pipeline must be
	// uniquely identified by the ID.
	ID() string

	// Open opens the Task for processing. It is called once before the worker
	// starts processing records.
	Open(context.Context) error
	// Close closes the Task. It is called once after the worker has stopped
	// processing records.
	Close(context.Context) error
	// Do processes the given batch of records. It is called for each batch of
	// records that the worker processes.
	Do(context.Context, *Batch) error
}

Task is a unit of work that can be executed by a Worker. Each Task in a pipeline is executed sequentially, except for tasks related to different destinations, which can be executed in parallel.

type Worker

type Worker struct {
	Source Source
	Tasks  []Task
	// Order defines the next task to be executed. Multiple indices are used to
	// show parallel execution of tasks.
	//
	// Example:
	// [[1], [2], [3,5], [4], [], []]
	//
	//            /-> 3 -> 4
	// 0 -> 1 -> 2
	//            \-> 5
	Order Order
	DLQ   *DLQ
	// contains filtered or unexported fields
}

Worker collects the tasks that need to be executed in a pipeline for a specific source. It processes records from the source through the tasks until it is stopped. The worker is responsible for coordinating tasks and acking/nacking records.

Batches are processed in the following way:

  • The first task is always a source task which reads a batch of records from the source. The batch is then passed to the next task.
  • Any task between the source and the destination can process the batch by updating the records or their status (see RecordStatus). If a record in the batch is marked as filtered, the next task will skip processing it and consider it as already processed. If a record is marked as nacked, the record will be sent to the DLQ. If a record is marked for retry, the record will be reprocessed by the same task (relevant if a task processed only part of the batch, experienced an error, and skipped the rest).
  • The last task is always a destination task which writes the batch of records to the destination. The batch is then acked.

Note that if a task marks a record in the middle of a batch as nacked, the batch is split into sub-batches. The records that were successfully processed continue to the next task (and ideally to the end of the pipeline), because Conduit provides ordering guarantees. The nacked record is sent to the DLQ only once the records before it have been processed end-to-end. The rest of the records are processed as a sub-batch, and the same rules apply to them.

func NewWorker

func NewWorker(
	tasks []Task,
	order Order,
	dlq *DLQ,
	logger log.CtxLogger,
	timer metrics.Timer,
) (*Worker, error)

func (*Worker) Ack

func (w *Worker) Ack(ctx context.Context, batch *Batch) error

func (*Worker) Close

func (w *Worker) Close(ctx context.Context) error

func (*Worker) Do

func (w *Worker) Do(ctx context.Context) error

Do processes records from the source until the worker is stopped. It returns no error if the worker is stopped gracefully.

func (*Worker) Nack

func (w *Worker) Nack(ctx context.Context, batch *Batch, taskID string) error

func (*Worker) Open

func (w *Worker) Open(ctx context.Context) (err error)

Open opens the worker for processing. It opens all tasks and the DLQ. If any task fails to open, the worker is not opened and the error is returned. Once a worker is opened, it can start processing records. The worker should be closed using Close after it is no longer needed.

func (*Worker) Stop

func (w *Worker) Stop(ctx context.Context) error

Stop stops the worker from processing more records. It does not stop the current batch from being processed. If a batch is currently being processed, the method will block and trigger the stop after the batch is processed.

Directories

Path Synopsis
Package mock is a generated GoMock package.
Package mock is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL