Documentation ¶
Overview ¶
Package stream defines a message and nodes that can be composed into a data pipeline. Nodes are connected with channels which are used to pass messages between them.
We distinguish 3 node types: PubNode, PubSubNode and SubNode. A PubNode is at the start of the pipeline and publishes messages. A PubSubNode sits between two nodes: it receives messages from one node, processes them and sends them to the next one. A SubNode is the last node in the pipeline and only receives messages without sending them to any other node.
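The three roles can be illustrated with plain Go channels. This is a minimal sketch using only the standard library, not the package's own node types: `publish`, `double` and `collect` are hypothetical names that play the roles of a PubNode, a PubSubNode and a SubNode respectively.

```go
package main

import "fmt"

// publish plays the role of a PubNode: it emits n messages and then closes
// its outgoing channel.
func publish(n int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 1; i <= n; i++ {
			out <- i
		}
	}()
	return out
}

// double plays the role of a PubSubNode: it receives messages from one node,
// processes them and sends the result to the next one.
func double(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			out <- v * 2
		}
	}()
	return out
}

// collect plays the role of a SubNode: it only receives messages and stops
// when the incoming channel is closed.
func collect(in <-chan int) []int {
	var got []int
	for v := range in {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(collect(double(publish(3)))) // prints [2 4 6]
}
```

Closing the outgoing channel is what lets each downstream stage finish its `range` loop, so shutdown propagates from the start of the pipeline to the end.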
A message can have one of these statuses:
- Open: The message starts out in an open state and will stay open while it's passed around between the nodes.
- Acked: Once a node successfully processes the message (e.g. it is sent to the destination or is filtered out by a processor) it is acked.
- Nacked: If some node fails to process the message it nacks the message. In that case a handler can pick it up to send it to a dead letter queue.
In other words, once a node receives a message it has 3 options for how to handle it: pass it to the next node (the message stays open), ack the message and keep running if the ack is successful, or nack the message and keep running if the nack is successful. This means that no message will be left in an open status when the pipeline stops.
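The three options can be sketched as a single decision function. The types below are simplified stand-ins written for this illustration (they are not stream.Message or MessageStatus), and the rules for what counts as a failure or a filtered message are assumptions made up for the example.

```go
package main

import (
	"fmt"
	"strings"
)

// status is a hypothetical stand-in for the package's message status.
type status string

const (
	open   status = "open"
	acked  status = "acked"
	nacked status = "nacked"
)

// message is a simplified stand-in for a pipeline message.
type message struct {
	payload string
	status  status
}

// handle applies one of the three options to m and reports whether the
// message should be forwarded to the next node.
func handle(m *message) (forward bool) {
	switch {
	case m.payload == "":
		// Processing failed (assumed rule: empty payload): nack the message.
		m.status = nacked
		return false
	case strings.HasPrefix(m.payload, "#"):
		// Filtered out (assumed rule: comment lines): the message is dropped,
		// so it is acked.
		m.status = acked
		return false
	default:
		// Passed on: the message stays open for the next node.
		return true
	}
}

func main() {
	for _, p := range []string{"record", "#comment", ""} {
		m := &message{payload: p, status: open}
		forward := handle(m)
		fmt.Printf("%q forward=%v status=%s\n", p, forward, m.status)
	}
}
```

Every branch either forwards the message or moves it out of the open status, which is the property the paragraph above relies on.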
Nodes can register functions on the message which will be called when the status of a message changes. For more information see StatusChangeHandler.
Nodes can implement LoggingNode to receive a logger struct that can be used to output logs. The node should use the message context to create logs; this way the logger will automatically attach the message ID as well as the node ID to the log message, making debugging easier.
Example (ComplexStream) ¶
package main

import (
	"context"
	"fmt"
	"strconv"
	"sync"
	"time"

	"github.com/conduitio/conduit-commons/csync"
	"github.com/conduitio/conduit-commons/opencdc"
	sdk "github.com/conduitio/conduit-processor-sdk"
	"github.com/conduitio/conduit/pkg/connector"
	"github.com/conduitio/conduit/pkg/foundation/ctxutil"
	"github.com/conduitio/conduit/pkg/foundation/log"
	"github.com/conduitio/conduit/pkg/foundation/metrics/noop"
	"github.com/conduitio/conduit/pkg/pipeline/stream"
	streammock "github.com/conduitio/conduit/pkg/pipeline/stream/mock"
	connectorPlugin "github.com/conduitio/conduit/pkg/plugin/connector"
	"github.com/rs/zerolog"
	"go.uber.org/mock/gomock"
)

func main() {
	ctx, killAll := context.WithCancel(context.Background())
	defer killAll()

	logger := newLogger()
	ctrl := gomockCtrl(logger)

	var count int

	dlqNode := &stream.DLQHandlerNode{
		Name:                "dlq",
		Handler:             noopDLQHandler(ctrl),
		WindowSize:          1,
		WindowNackThreshold: 0,
	}
	dlqNode.Add(2) // 2 sources
	node1 := &stream.SourceNode{
		Name:          "generator1",
		Source:        generatorSource(ctrl, logger, "generator1", 10, time.Millisecond*10),
		PipelineTimer: noop.Timer{},
	}
	node2 := &stream.SourceAckerNode{
		Name:           "generator1-acker",
		Source:         node1.Source,
		DLQHandlerNode: dlqNode,
	}
	node3 := &stream.SourceNode{
		Name:          "generator2",
		Source:        generatorSource(ctrl, logger, "generator2", 10, time.Millisecond*10),
		PipelineTimer: noop.Timer{},
	}
	node4 := &stream.SourceAckerNode{
		Name:           "generator2-acker",
		Source:         node3.Source,
		DLQHandlerNode: dlqNode,
	}
	node5 := &stream.FaninNode{Name: "fanin"}
	node6 := &stream.ProcessorNode{
		Name:           "counter",
		Processor:      counterProcessor(ctrl, &count),
		ProcessorTimer: noop.Timer{},
	}
	node7 := &stream.FanoutNode{Name: "fanout"}
	node8 := &stream.DestinationNode{
		Name:           "printer1",
		Destination:    printerDestination(ctrl, logger, "printer1"),
		ConnectorTimer: noop.Timer{},
	}
	node9 := &stream.DestinationAckerNode{
		Name:        "printer1-acker",
		Destination: node8.Destination,
	}
	node10 := &stream.DestinationNode{
		Name:           "printer2",
		Destination:    printerDestination(ctrl, logger, "printer2"),
		ConnectorTimer: noop.Timer{},
	}
	node11 := &stream.DestinationAckerNode{
		Name:        "printer2-acker",
		Destination: node10.Destination,
	}

	// put everything together
	// this is the pipeline we are building
	// [1] -> [2] -\                       /-> [8] -> [9]
	//              |- [5] -> [6] -> [7] -|
	// [3] -> [4] -/                       \-> [10] -> [11]
	node2.Sub(node1.Pub())
	node4.Sub(node3.Pub())

	node5.Sub(node2.Pub())
	node5.Sub(node4.Pub())

	node6.Sub(node5.Pub())
	node7.Sub(node6.Pub())

	node8.Sub(node7.Pub())
	node10.Sub(node7.Pub())

	node9.Sub(node8.Pub())
	node11.Sub(node10.Pub())

	// run nodes
	nodes := []stream.Node{dlqNode, node1, node2, node3, node4, node5, node6, node7, node8, node9, node10, node11}

	var wg sync.WaitGroup
	wg.Add(len(nodes))
	for _, n := range nodes {
		stream.SetLogger(n, logger)
		go runNode(ctx, &wg, n)
	}

	// stop nodes after 250ms, which should be enough to process the 20 messages
	time.AfterFunc(
		250*time.Millisecond,
		func() {
			_ = node1.Stop(ctx, nil)
			_ = node3.Stop(ctx, nil)
		},
	)
	// give the nodes some time to process the records, plus a bit of time to stop
	if (*csync.WaitGroup)(&wg).WaitTimeout(ctx, time.Second) != nil {
		killAll()
	} else {
		logger.Info(ctx).Msgf("counter node counted %d messages", count)
		logger.Info(ctx).Msg("finished successfully")
	}
}

func newLogger() log.CtxLogger {
	w := zerolog.NewConsoleWriter()
	w.NoColor = true
	w.PartsExclude = []string{zerolog.TimestampFieldName}

	zlogger := zerolog.New(w)
	zlogger = zlogger.Level(zerolog.DebugLevel)
	logger := log.New(zlogger)
	logger.Logger = logger.Hook(ctxutil.MessageIDLogCtxHook{})

	return logger
}

func generatorSource(ctrl *gomock.Controller, logger log.CtxLogger, nodeID string, recordCount int, delay time.Duration) stream.Source {
	position := 0

	teardown := make(chan struct{})
	source := streammock.NewSource(ctrl)
	source.EXPECT().ID().Return(nodeID).AnyTimes()
	source.EXPECT().Open(gomock.Any()).Return(nil)
	source.EXPECT().Teardown(gomock.Any()).DoAndReturn(func(context.Context) error {
		close(teardown)
		return nil
	})
	source.EXPECT().Ack(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, p []opencdc.Position) error {
		logger.Debug(ctx).Str("node_id", nodeID).Msg("received ack")
		return nil
	}).Times(recordCount)
	source.EXPECT().Read(gomock.Any()).DoAndReturn(func(ctx context.Context) ([]opencdc.Record, error) {
		time.Sleep(delay)

		if position == recordCount {
			<-teardown
			return nil, connectorPlugin.ErrStreamNotOpen
		}

		position++
		return []opencdc.Record{{
			Position: opencdc.Position(strconv.Itoa(position)),
		}}, nil
	}).MinTimes(recordCount + 1)
	source.EXPECT().Stop(gomock.Any()).DoAndReturn(func(context.Context) (opencdc.Position, error) {
		return opencdc.Position(strconv.Itoa(position)), nil
	})
	source.EXPECT().Errors().Return(make(chan error))

	return source
}

func printerDestination(ctrl *gomock.Controller, logger log.CtxLogger, nodeID string) stream.Destination {
	var lastPosition opencdc.Position
	rchan := make(chan opencdc.Record, 1)
	destination := streammock.NewDestination(ctrl)
	destination.EXPECT().Open(gomock.Any()).Return(nil)
	destination.EXPECT().Write(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, recs []opencdc.Record) error {
		for _, r := range recs {
			logger.Debug(ctx).
				Str("node_id", nodeID).
				Msg("got record")
			lastPosition = r.Position
			rchan <- r
		}
		return nil
	}).AnyTimes()
	destination.EXPECT().Ack(gomock.Any()).DoAndReturn(func(ctx context.Context) ([]connector.DestinationAck, error) {
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case r, ok := <-rchan:
			if !ok {
				return nil, nil
			}
			return []connector.DestinationAck{{Position: r.Position}}, nil
		}
	}).AnyTimes()
	destination.EXPECT().Stop(gomock.Any(), EqLazy(func() interface{} { return lastPosition })).Return(nil)
	destination.EXPECT().Teardown(gomock.Any()).DoAndReturn(func(ctx context.Context) error {
		close(rchan)
		return nil
	})
	destination.EXPECT().Errors().Return(make(chan error))

	return destination
}

func counterProcessor(ctrl *gomock.Controller, count *int) stream.Processor {
	proc := streammock.NewProcessor(ctrl)
	proc.EXPECT().Open(gomock.Any())
	proc.EXPECT().
		Process(gomock.Any(), gomock.Any()).
		DoAndReturn(func(ctx context.Context, records []opencdc.Record) []sdk.ProcessedRecord {
			*count++

			out := make([]sdk.ProcessedRecord, len(records))
			for i, r := range records {
				out[i] = sdk.SingleRecord(r)
			}

			return out
		}).AnyTimes()
	proc.EXPECT().Teardown(gomock.Any())
	return proc
}

func noopDLQHandler(ctrl *gomock.Controller) stream.DLQHandler {
	handler := streammock.NewDLQHandler(ctrl)
	handler.EXPECT().Open(gomock.Any()).Return(nil)
	handler.EXPECT().Close(gomock.Any())
	return handler
}

func gomockCtrl(logger log.CtxLogger) *gomock.Controller {
	return gomock.NewController(gomockLogger(logger))
}

type gomockLogger log.CtxLogger

func (g gomockLogger) Errorf(format string, args ...interface{}) {
	g.Error().Msgf(format, args...)
}

func (g gomockLogger) Fatalf(format string, args ...interface{}) {
	g.Fatal().Msgf(format, args...)
}

func runNode(ctx context.Context, wg *sync.WaitGroup, n stream.Node) {
	defer wg.Done()
	err := n.Run(ctx)
	if err != nil {
		fmt.Printf("%s error: %v\n", n.ID(), err)
	}
}

func EqLazy(x func() interface{}) gomock.Matcher { return eqMatcherLazy{x} }

type eqMatcherLazy struct {
	x func() interface{}
}

func (e eqMatcherLazy) Matches(x interface{}) bool {
	return gomock.Eq(e.x()).Matches(x)
}

func (e eqMatcherLazy) String() string {
	return gomock.Eq(e.x()).String()
}
Output:
DBG opening processor component=ProcessorNode node_id=counter
DBG got record message_id=generator2/1 node_id=printer2
DBG got record message_id=generator2/1 node_id=printer1
DBG received ack message_id=generator2/1 node_id=generator2
DBG got record message_id=generator1/1 node_id=printer1
DBG got record message_id=generator1/1 node_id=printer2
DBG received ack message_id=generator1/1 node_id=generator1
DBG got record message_id=generator2/2 node_id=printer2
DBG got record message_id=generator2/2 node_id=printer1
DBG received ack message_id=generator2/2 node_id=generator2
DBG got record message_id=generator1/2 node_id=printer1
DBG got record message_id=generator1/2 node_id=printer2
DBG received ack message_id=generator1/2 node_id=generator1
DBG got record message_id=generator2/3 node_id=printer2
DBG got record message_id=generator2/3 node_id=printer1
DBG received ack message_id=generator2/3 node_id=generator2
DBG got record message_id=generator1/3 node_id=printer1
DBG got record message_id=generator1/3 node_id=printer2
DBG received ack message_id=generator1/3 node_id=generator1
DBG got record message_id=generator2/4 node_id=printer2
DBG got record message_id=generator2/4 node_id=printer1
DBG received ack message_id=generator2/4 node_id=generator2
DBG got record message_id=generator1/4 node_id=printer2
DBG got record message_id=generator1/4 node_id=printer1
DBG received ack message_id=generator1/4 node_id=generator1
DBG got record message_id=generator2/5 node_id=printer2
DBG got record message_id=generator2/5 node_id=printer1
DBG received ack message_id=generator2/5 node_id=generator2
DBG got record message_id=generator1/5 node_id=printer1
DBG got record message_id=generator1/5 node_id=printer2
DBG received ack message_id=generator1/5 node_id=generator1
DBG got record message_id=generator2/6 node_id=printer2
DBG got record message_id=generator2/6 node_id=printer1
DBG received ack message_id=generator2/6 node_id=generator2
DBG got record message_id=generator1/6 node_id=printer1
DBG got record message_id=generator1/6 node_id=printer2
DBG received ack message_id=generator1/6 node_id=generator1
DBG got record message_id=generator2/7 node_id=printer2
DBG got record message_id=generator2/7 node_id=printer1
DBG received ack message_id=generator2/7 node_id=generator2
DBG got record message_id=generator1/7 node_id=printer1
DBG got record message_id=generator1/7 node_id=printer2
DBG received ack message_id=generator1/7 node_id=generator1
DBG got record message_id=generator2/8 node_id=printer2
DBG got record message_id=generator2/8 node_id=printer1
DBG received ack message_id=generator2/8 node_id=generator2
DBG got record message_id=generator1/8 node_id=printer1
DBG got record message_id=generator1/8 node_id=printer2
DBG received ack message_id=generator1/8 node_id=generator1
DBG got record message_id=generator2/9 node_id=printer1
DBG got record message_id=generator2/9 node_id=printer2
DBG received ack message_id=generator2/9 node_id=generator2
DBG got record message_id=generator1/9 node_id=printer2
DBG got record message_id=generator1/9 node_id=printer1
DBG received ack message_id=generator1/9 node_id=generator1
DBG got record message_id=generator2/10 node_id=printer1
DBG got record message_id=generator2/10 node_id=printer2
DBG received ack message_id=generator2/10 node_id=generator2
DBG got record message_id=generator1/10 node_id=printer2
DBG got record message_id=generator1/10 node_id=printer1
DBG received ack message_id=generator1/10 node_id=generator1
INF stopping source connector component=SourceNode node_id=generator1
INF stopping source connector component=SourceNode node_id=generator2
INF stopping source node component=SourceNode node_id=generator1 record_position=10
INF stopping source node component=SourceNode node_id=generator2 record_position=10
DBG incoming messages channel closed component=SourceAckerNode node_id=generator1-acker
DBG incoming messages channel closed component=SourceAckerNode node_id=generator2-acker
DBG incoming messages channel closed component=ProcessorNode node_id=counter
DBG tearing down processor component=ProcessorNode node_id=counter
DBG incoming messages channel closed component=DestinationNode node_id=printer1
DBG incoming messages channel closed component=DestinationNode node_id=printer2
DBG incoming messages channel closed component=DestinationAckerNode node_id=printer1-acker
DBG incoming messages channel closed component=DestinationAckerNode node_id=printer2-acker
INF counter node counted 20 messages
INF finished successfully
Example (SimpleStream) ¶
package main

import (
	"context"
	"fmt"
	"strconv"
	"sync"
	"time"

	"github.com/conduitio/conduit-commons/csync"
	"github.com/conduitio/conduit-commons/opencdc"
	"github.com/conduitio/conduit/pkg/connector"
	"github.com/conduitio/conduit/pkg/foundation/ctxutil"
	"github.com/conduitio/conduit/pkg/foundation/log"
	"github.com/conduitio/conduit/pkg/foundation/metrics/noop"
	"github.com/conduitio/conduit/pkg/pipeline/stream"
	streammock "github.com/conduitio/conduit/pkg/pipeline/stream/mock"
	connectorPlugin "github.com/conduitio/conduit/pkg/plugin/connector"
	"github.com/rs/zerolog"
	"go.uber.org/mock/gomock"
)

func main() {
	ctx, killAll := context.WithCancel(context.Background())
	defer killAll()

	logger := newLogger()
	ctrl := gomockCtrl(logger)

	dlqNode := &stream.DLQHandlerNode{
		Name:                "dlq",
		Handler:             noopDLQHandler(ctrl),
		WindowSize:          1,
		WindowNackThreshold: 0,
	}
	dlqNode.Add(1) // 1 source
	node1 := &stream.SourceNode{
		Name:          "generator",
		Source:        generatorSource(ctrl, logger, "generator", 10, time.Millisecond*10),
		PipelineTimer: noop.Timer{},
	}
	node2 := &stream.SourceAckerNode{
		Name:           "generator-acker",
		Source:         node1.Source,
		DLQHandlerNode: dlqNode,
	}
	node3 := &stream.DestinationNode{
		Name:           "printer",
		Destination:    printerDestination(ctrl, logger, "printer"),
		ConnectorTimer: noop.Timer{},
	}
	node4 := &stream.DestinationAckerNode{
		Name:        "printer-acker",
		Destination: node3.Destination,
	}

	stream.SetLogger(node1, logger)
	stream.SetLogger(node2, logger)
	stream.SetLogger(node3, logger)
	stream.SetLogger(node4, logger)

	// put everything together
	node2.Sub(node1.Pub())
	node3.Sub(node2.Pub())
	node4.Sub(node3.Pub())

	var wg sync.WaitGroup
	wg.Add(5)
	go runNode(ctx, &wg, dlqNode)
	go runNode(ctx, &wg, node4)
	go runNode(ctx, &wg, node3)
	go runNode(ctx, &wg, node2)
	go runNode(ctx, &wg, node1)

	// stop node after 150ms, which should be enough to process the 10 messages
	time.AfterFunc(150*time.Millisecond, func() { _ = node1.Stop(ctx, nil) })
	// give the node some time to process the records, plus a bit of time to stop
	if (*csync.WaitGroup)(&wg).WaitTimeout(ctx, time.Second) != nil {
		killAll()
	} else {
		logger.Info(ctx).Msg("finished successfully")
	}
}

func newLogger() log.CtxLogger {
	w := zerolog.NewConsoleWriter()
	w.NoColor = true
	w.PartsExclude = []string{zerolog.TimestampFieldName}

	zlogger := zerolog.New(w)
	zlogger = zlogger.Level(zerolog.DebugLevel)
	logger := log.New(zlogger)
	logger.Logger = logger.Hook(ctxutil.MessageIDLogCtxHook{})

	return logger
}

func generatorSource(ctrl *gomock.Controller, logger log.CtxLogger, nodeID string, recordCount int, delay time.Duration) stream.Source {
	position := 0

	teardown := make(chan struct{})
	source := streammock.NewSource(ctrl)
	source.EXPECT().ID().Return(nodeID).AnyTimes()
	source.EXPECT().Open(gomock.Any()).Return(nil)
	source.EXPECT().Teardown(gomock.Any()).DoAndReturn(func(context.Context) error {
		close(teardown)
		return nil
	})
	source.EXPECT().Ack(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, p []opencdc.Position) error {
		logger.Debug(ctx).Str("node_id", nodeID).Msg("received ack")
		return nil
	}).Times(recordCount)
	source.EXPECT().Read(gomock.Any()).DoAndReturn(func(ctx context.Context) ([]opencdc.Record, error) {
		time.Sleep(delay)

		if position == recordCount {
			<-teardown
			return nil, connectorPlugin.ErrStreamNotOpen
		}

		position++
		return []opencdc.Record{{
			Position: opencdc.Position(strconv.Itoa(position)),
		}}, nil
	}).MinTimes(recordCount + 1)
	source.EXPECT().Stop(gomock.Any()).DoAndReturn(func(context.Context) (opencdc.Position, error) {
		return opencdc.Position(strconv.Itoa(position)), nil
	})
	source.EXPECT().Errors().Return(make(chan error))

	return source
}

func printerDestination(ctrl *gomock.Controller, logger log.CtxLogger, nodeID string) stream.Destination {
	var lastPosition opencdc.Position
	rchan := make(chan opencdc.Record, 1)
	destination := streammock.NewDestination(ctrl)
	destination.EXPECT().Open(gomock.Any()).Return(nil)
	destination.EXPECT().Write(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, recs []opencdc.Record) error {
		for _, r := range recs {
			logger.Debug(ctx).
				Str("node_id", nodeID).
				Msg("got record")
			lastPosition = r.Position
			rchan <- r
		}
		return nil
	}).AnyTimes()
	destination.EXPECT().Ack(gomock.Any()).DoAndReturn(func(ctx context.Context) ([]connector.DestinationAck, error) {
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case r, ok := <-rchan:
			if !ok {
				return nil, nil
			}
			return []connector.DestinationAck{{Position: r.Position}}, nil
		}
	}).AnyTimes()
	destination.EXPECT().Stop(gomock.Any(), EqLazy(func() interface{} { return lastPosition })).Return(nil)
	destination.EXPECT().Teardown(gomock.Any()).DoAndReturn(func(ctx context.Context) error {
		close(rchan)
		return nil
	})
	destination.EXPECT().Errors().Return(make(chan error))

	return destination
}

func noopDLQHandler(ctrl *gomock.Controller) stream.DLQHandler {
	handler := streammock.NewDLQHandler(ctrl)
	handler.EXPECT().Open(gomock.Any()).Return(nil)
	handler.EXPECT().Close(gomock.Any())
	return handler
}

func gomockCtrl(logger log.CtxLogger) *gomock.Controller {
	return gomock.NewController(gomockLogger(logger))
}

type gomockLogger log.CtxLogger

func (g gomockLogger) Errorf(format string, args ...interface{}) {
	g.Error().Msgf(format, args...)
}

func (g gomockLogger) Fatalf(format string, args ...interface{}) {
	g.Fatal().Msgf(format, args...)
}

func runNode(ctx context.Context, wg *sync.WaitGroup, n stream.Node) {
	defer wg.Done()
	err := n.Run(ctx)
	if err != nil {
		fmt.Printf("%s error: %v\n", n.ID(), err)
	}
}

func EqLazy(x func() interface{}) gomock.Matcher { return eqMatcherLazy{x} }

type eqMatcherLazy struct {
	x func() interface{}
}

func (e eqMatcherLazy) Matches(x interface{}) bool {
	return gomock.Eq(e.x()).Matches(x)
}

func (e eqMatcherLazy) String() string {
	return gomock.Eq(e.x()).String()
}
Output:
DBG got record message_id=generator/1 node_id=printer
DBG received ack message_id=generator/1 node_id=generator
DBG got record message_id=generator/2 node_id=printer
DBG received ack message_id=generator/2 node_id=generator
DBG got record message_id=generator/3 node_id=printer
DBG received ack message_id=generator/3 node_id=generator
DBG got record message_id=generator/4 node_id=printer
DBG received ack message_id=generator/4 node_id=generator
DBG got record message_id=generator/5 node_id=printer
DBG received ack message_id=generator/5 node_id=generator
DBG got record message_id=generator/6 node_id=printer
DBG received ack message_id=generator/6 node_id=generator
DBG got record message_id=generator/7 node_id=printer
DBG received ack message_id=generator/7 node_id=generator
DBG got record message_id=generator/8 node_id=printer
DBG received ack message_id=generator/8 node_id=generator
DBG got record message_id=generator/9 node_id=printer
DBG received ack message_id=generator/9 node_id=generator
DBG got record message_id=generator/10 node_id=printer
DBG received ack message_id=generator/10 node_id=generator
INF stopping source connector component=SourceNode node_id=generator
INF stopping source node component=SourceNode node_id=generator record_position=10
DBG incoming messages channel closed component=SourceAckerNode node_id=generator-acker
DBG incoming messages channel closed component=DestinationNode node_id=printer
DBG incoming messages channel closed component=DestinationAckerNode node_id=printer-acker
INF finished successfully
Index ¶
- Variables
- func LoggerWithComponent(logger log.CtxLogger, v Node) log.CtxLogger
- func LoggerWithNodeID(logger log.CtxLogger, n Node) log.CtxLogger
- func SetLogger(n Node, logger log.CtxLogger, ...)
- type AckHandler
- type ControlMessageType
- type DLQHandler
- type DLQHandlerNode
- func (n *DLQHandlerNode) Ack(msg *Message)
- func (n *DLQHandlerNode) Add(delta int)
- func (n *DLQHandlerNode) Done()
- func (n *DLQHandlerNode) ForceStop(ctx context.Context)
- func (n *DLQHandlerNode) ID() string
- func (n *DLQHandlerNode) Nack(msg *Message, nackMetadata NackMetadata) (err error)
- func (n *DLQHandlerNode) Run(ctx context.Context) (err error)
- func (n *DLQHandlerNode) SetLogger(logger log.CtxLogger)
- type Destination
- type DestinationAckerNode
- type DestinationNode
- func (n *DestinationNode) ForceStop(ctx context.Context)
- func (n *DestinationNode) ID() string
- func (n *DestinationNode) Pub() <-chan *Message
- func (n *DestinationNode) Run(ctx context.Context) (err error)
- func (n *DestinationNode) SetLogger(logger log.CtxLogger)
- func (n *DestinationNode) Sub(in <-chan *Message)
- type FaninNode
- type FanoutNode
- type ForceStoppableNode
- type LoggingNode
- type Message
- func (m *Message) Ack() error
- func (m *Message) Acked() <-chan struct{}
- func (m *Message) Clone() *Message
- func (m *Message) ControlMessageType() ControlMessageType
- func (m *Message) ID() string
- func (m *Message) Nack(reason error, nodeID string) error
- func (m *Message) Nacked() <-chan struct{}
- func (m *Message) RegisterAckHandler(h AckHandler)
- func (m *Message) RegisterNackHandler(h NackHandler)
- func (m *Message) RegisterStatusHandler(h StatusChangeHandler)
- func (m *Message) Status() MessageStatus
- func (m *Message) StatusError() error
- type MessageStatus
- type MetricsNode
- type NackHandler
- type NackMetadata
- type Node
- type OpenMessagesTracker
- type ParallelNode
- type Processor
- type ProcessorNode
- type PubNode
- type PubSubNode
- type Source
- type SourceAckerNode
- type SourceNode
- func (n *SourceNode) ForceStop(ctx context.Context)
- func (n *SourceNode) ID() string
- func (n *SourceNode) Pub() <-chan *Message
- func (n *SourceNode) Run(ctx context.Context) (err error)
- func (n *SourceNode) SetLogger(logger log.CtxLogger)
- func (n *SourceNode) Stop(ctx context.Context, reason error) error
- type StatusChange
- type StatusChangeHandler
- type StoppableNode
- type SubNode
Constants ¶
This section is empty.
Variables ¶
var ErrUnexpectedMessageStatus = cerrors.New("unexpected message status")
Functions ¶
func LoggerWithComponent ¶ added in v0.6.0
LoggerWithComponent creates a logger with the component set to the node name.
func LoggerWithNodeID ¶ added in v0.6.0
LoggerWithNodeID creates a logger with the node ID field.
Types ¶
type AckHandler ¶
AckHandler is a variation of the StatusChangeHandler that is only called when a message is acked. For more info see StatusChangeHandler.
type ControlMessageType ¶ added in v0.3.0
type ControlMessageType string
ControlMessageType represents the type of a control message.
const (
ControlMessageStopSourceNode ControlMessageType = "stop-source-node"
)
type DLQHandler ¶ added in v0.4.0
type DLQHandlerNode ¶ added in v0.4.0
type DLQHandlerNode struct {
	Name                string
	Handler             DLQHandler
	WindowSize          int
	WindowNackThreshold int
	Timer               metrics.Timer
	Histogram           metrics.RecordBytesHistogram
	// contains filtered or unexported fields
}
DLQHandlerNode is called by each SourceAckerNode in a pipeline to store nacked messages in a dead-letter-queue.
func (*DLQHandlerNode) Ack ¶ added in v0.4.0
func (n *DLQHandlerNode) Ack(msg *Message)
func (*DLQHandlerNode) Add ¶ added in v0.4.0
func (n *DLQHandlerNode) Add(delta int)
Add should be called before Run to increase the counter of components depending on DLQHandlerNode. The node will keep running until the counter reaches 0 (see Done).
Note that a call to Add with a positive delta should happen before Run.
func (*DLQHandlerNode) Done ¶ added in v0.4.0
func (n *DLQHandlerNode) Done()
Done should be called before a component depending on DLQHandlerNode stops running and guarantees there will be no more calls to DLQHandlerNode.
func (*DLQHandlerNode) ForceStop ¶ added in v0.5.3
func (n *DLQHandlerNode) ForceStop(ctx context.Context)
func (*DLQHandlerNode) ID ¶ added in v0.4.0
func (n *DLQHandlerNode) ID() string
func (*DLQHandlerNode) Nack ¶ added in v0.4.0
func (n *DLQHandlerNode) Nack(msg *Message, nackMetadata NackMetadata) (err error)
func (*DLQHandlerNode) Run ¶ added in v0.4.0
func (n *DLQHandlerNode) Run(ctx context.Context) (err error)
Run runs the DLQ handler node until all components depending on this node call Done. Dependents can be added or removed while the node is running.
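The Add/Done/Run contract resembles a wait group. The following is a minimal sketch under that assumption: `sharedHandler` and `runWithDependents` are hypothetical names, the counter is modeled with `sync.WaitGroup`, and nothing here is taken from the DLQHandlerNode implementation.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// sharedHandler is a hypothetical stand-in for a node that several
// components depend on. It is not the DLQHandlerNode implementation.
type sharedHandler struct {
	wg sync.WaitGroup
}

// Add registers delta dependents; it is called before Run.
func (h *sharedHandler) Add(delta int) { h.wg.Add(delta) }

// Done signals that one dependent will make no more calls.
func (h *sharedHandler) Done() { h.wg.Done() }

// Run blocks until the dependent counter reaches 0.
func (h *sharedHandler) Run() { h.wg.Wait() }

// runWithDependents starts n dependents and returns how many of them had
// finished their work by the time Run returned.
func runWithDependents(n int) int32 {
	h := &sharedHandler{}
	h.Add(n) // register all dependents before Run

	var finished int32
	for i := 0; i < n; i++ {
		go func() {
			defer h.Done()                // called after the work below
			atomic.AddInt32(&finished, 1) // the dependent's "work"
		}()
	}

	h.Run() // returns only after every dependent called Done
	return atomic.LoadInt32(&finished)
}

func main() {
	fmt.Println(runWithDependents(2)) // prints 2
}
```

This mirrors the examples above, where `dlqNode.Add(2)` is called once per source before the nodes are started.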
func (*DLQHandlerNode) SetLogger ¶ added in v0.4.0
func (n *DLQHandlerNode) SetLogger(logger log.CtxLogger)
type Destination ¶ added in v0.5.0
type DestinationAckerNode ¶ added in v0.3.0
type DestinationAckerNode struct {
	Name        string
	Destination Destination
	// contains filtered or unexported fields
}
DestinationAckerNode is responsible for handling acknowledgments received from the destination and forwarding them to the correct message.
func (*DestinationAckerNode) ForceStop ¶ added in v0.5.3
func (n *DestinationAckerNode) ForceStop(ctx context.Context)
func (*DestinationAckerNode) ID ¶ added in v0.3.0
func (n *DestinationAckerNode) ID() string
func (*DestinationAckerNode) Run ¶ added in v0.3.0
func (n *DestinationAckerNode) Run(ctx context.Context) (err error)
func (*DestinationAckerNode) SetLogger ¶ added in v0.3.0
func (n *DestinationAckerNode) SetLogger(logger log.CtxLogger)
SetLogger sets the logger.
func (*DestinationAckerNode) Sub ¶ added in v0.3.0
func (n *DestinationAckerNode) Sub(in <-chan *Message)
Sub will subscribe this node to an incoming channel.
type DestinationNode ¶
type DestinationNode struct {
	Name           string
	Destination    Destination
	ConnectorTimer metrics.Timer
	// contains filtered or unexported fields
}
DestinationNode wraps a Destination connector and implements the Sub node interface.
func (*DestinationNode) ForceStop ¶ added in v0.5.3
func (n *DestinationNode) ForceStop(ctx context.Context)
func (*DestinationNode) ID ¶
func (n *DestinationNode) ID() string
func (*DestinationNode) Pub ¶ added in v0.3.0
func (n *DestinationNode) Pub() <-chan *Message
Pub will return the outgoing channel.
func (*DestinationNode) SetLogger ¶
func (n *DestinationNode) SetLogger(logger log.CtxLogger)
SetLogger sets the logger.
func (*DestinationNode) Sub ¶
func (n *DestinationNode) Sub(in <-chan *Message)
Sub will subscribe this node to an incoming channel.
type FanoutNode ¶
type FanoutNode struct {
	Name string
	// contains filtered or unexported fields
}
func (*FanoutNode) ID ¶
func (n *FanoutNode) ID() string
func (*FanoutNode) Pub ¶
func (n *FanoutNode) Pub() <-chan *Message
func (*FanoutNode) Sub ¶
func (n *FanoutNode) Sub(in <-chan *Message)
type ForceStoppableNode ¶ added in v0.5.3
type ForceStoppableNode interface {
	Node
	// ForceStop signals a running ForceStoppableNode that it should stop
	// running as soon as it can, even if that means that it doesn't properly
	// clean up after itself. This method is a last resort in case something
	// goes wrong and the pipeline gets stuck (e.g. a connector plugin is not
	// responding).
	ForceStop(ctx context.Context)
}
type LoggingNode ¶
type LoggingNode interface {
	Node
	// SetLogger sets the logger used by the node for logging.
	SetLogger(log.CtxLogger)
}
LoggingNode is a node which expects a logger.
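An optional interface like this is typically detected with a type assertion. The sketch below shows that pattern with standard-library types only; `node`, `loggingNode`, `quietNode`, `chattyNode` and `setLogger` are hypothetical names, and the behavior of the package's own SetLogger function is not documented in this section, so this is an assumption about the general technique rather than a description of it.

```go
package main

import (
	"log"
	"os"
)

// node and loggingNode are simplified stand-ins for Node and LoggingNode.
type node interface {
	ID() string
}

type loggingNode interface {
	node
	SetLogger(l *log.Logger)
}

// quietNode does not expect a logger.
type quietNode struct{}

func (quietNode) ID() string { return "quiet" }

// chattyNode expects a logger and therefore satisfies loggingNode.
type chattyNode struct {
	logger *log.Logger
}

func (n *chattyNode) ID() string              { return "chatty" }
func (n *chattyNode) SetLogger(l *log.Logger) { n.logger = l }

// setLogger hands the logger to n only if n implements loggingNode and
// reports whether it did so.
func setLogger(n node, l *log.Logger) bool {
	ln, ok := n.(loggingNode)
	if ok {
		ln.SetLogger(l)
	}
	return ok
}

func main() {
	l := log.New(os.Stdout, "", 0)
	for _, n := range []node{quietNode{}, &chattyNode{}} {
		l.Printf("%s received logger: %v", n.ID(), setLogger(n, l))
	}
}
```

With this pattern a caller can treat all nodes uniformly and nodes opt in to logging simply by adding a SetLogger method.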
type Message ¶
type Message struct {
	// Ctx is the context in which the record was fetched. It should be used for
	// any function calls when processing the message. If the context is done
	// the message should be nacked as soon as possible and not processed
	// further.
	Ctx context.Context
	// Record represents a single record attached to the message.
	Record opencdc.Record
	// SourceID contains the source connector ID.
	SourceID string
	// contains filtered or unexported fields
}
Message represents a single message flowing through a pipeline. Only a single node is allowed to hold a message and access its fields at a specific point in time, otherwise we could introduce race conditions.
func (*Message) Ack ¶
Ack marks the message as acked, calls the corresponding status change handlers and closes the channel returned by Acked. Errors from ack handlers get collected and returned as a single error. If Ack returns an error, the caller node should stop processing new messages and return the error. Calling Ack after the message has already been nacked will panic, while subsequent calls to Ack on an acked message are a noop and return the same value.
func (*Message) Acked ¶
func (m *Message) Acked() <-chan struct{}
Acked returns a channel that's closed when the message has been acked. Successive calls to Acked return the same value. This function can be used to wait for a message to be acked without notifying the acker.
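The combination of an idempotent Ack and a channel that is closed on ack can be sketched with `sync.Once`. This is a simplified model written for illustration: the `message` type, `newMessage` and `isAcked` are hypothetical, handler errors are left out, and the panic on acking a nacked message is not modeled.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a simplified stand-in for a pipeline message; only acking is
// modeled here.
type message struct {
	once     sync.Once
	acked    chan struct{}
	handlers []func()
}

func newMessage() *message {
	return &message{acked: make(chan struct{})}
}

// Ack runs the handlers and closes the acked channel exactly once;
// subsequent calls do nothing.
func (m *message) Ack() {
	m.once.Do(func() {
		for _, h := range m.handlers {
			h()
		}
		close(m.acked)
	})
}

// Acked returns the same channel on every call.
func (m *message) Acked() <-chan struct{} { return m.acked }

// isAcked checks the channel without blocking and without acking.
func isAcked(m *message) bool {
	select {
	case <-m.Acked():
		return true
	default:
		return false
	}
}

func main() {
	calls := 0
	m := newMessage()
	m.handlers = append(m.handlers, func() { calls++ })

	fmt.Println(isAcked(m)) // prints false
	m.Ack()
	m.Ack()                        // second call is a no-op
	fmt.Println(isAcked(m), calls) // prints true 1
}
```

Because a closed channel can be received from any number of times, several observers can wait on the same Acked channel.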
func (*Message) Clone ¶
Clone returns a cloned message with the same content but separate ack and nack handling.
func (*Message) ControlMessageType ¶ added in v0.3.0
func (m *Message) ControlMessageType() ControlMessageType
func (*Message) ID ¶
ID returns a string representing a unique ID of this message. This is meant only for logging purposes.
func (*Message) Nack ¶
Nack marks the message as nacked, calls the registered status change handlers and closes the channel returned by Nacked. If no nack handlers were registered Nack will return an error. Errors from nack handlers get collected and returned as a single error. If Nack returns an error, the caller node should stop processing new messages and return the error. Calling Nack after the message has already been acked will panic, while subsequent calls to Nack on a nacked message are a noop and return the same value.
func (*Message) Nacked ¶
func (m *Message) Nacked() <-chan struct{}
Nacked returns a channel that's closed when the message has been nacked. Successive calls to Nacked return the same value. This function can be used to wait for a message to be nacked without notifying the nacker.
func (*Message) RegisterAckHandler ¶
func (m *Message) RegisterAckHandler(h AckHandler)
RegisterAckHandler is used to register a function that will be called when the message is acked. This function can only be called if the message status is open, otherwise it panics.
func (*Message) RegisterNackHandler ¶
func (m *Message) RegisterNackHandler(h NackHandler)
RegisterNackHandler is used to register a function that will be called when the message is nacked. This function can only be called if the message status is open, otherwise it panics.
func (*Message) RegisterStatusHandler ¶
func (m *Message) RegisterStatusHandler(h StatusChangeHandler)
RegisterStatusHandler is used to register a function that will be called on any status change of the message. This function can only be called if the message status is open, otherwise it panics. Handlers are called in the reverse order of how they were registered.
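The reverse calling order can be illustrated with a short sketch. The `message` type below only stores plain functions, which is an assumption made for brevity; the real StatusChangeHandler signature is not shown in this section and is likely different.

```go
package main

import "fmt"

// message is a simplified stand-in that only stores handlers.
type message struct {
	handlers []func()
}

// register appends a handler in registration order.
func (m *message) register(h func()) {
	m.handlers = append(m.handlers, h)
}

// changeStatus calls the handlers in reverse order of registration.
func (m *message) changeStatus() {
	for i := len(m.handlers) - 1; i >= 0; i-- {
		m.handlers[i]()
	}
}

// callOrder registers one handler per name and returns the order in which
// the handlers ran.
func callOrder(names ...string) []string {
	var order []string
	m := &message{}
	for _, name := range names {
		name := name // capture per iteration (needed before Go 1.22)
		m.register(func() { order = append(order, name) })
	}
	m.changeStatus()
	return order
}

func main() {
	fmt.Println(callOrder("first", "second", "third")) // prints [third second first]
}
```

In a pipeline this ordering means the handler registered by the most recent node runs first and the handler registered at the source runs last.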
func (*Message) Status ¶
func (m *Message) Status() MessageStatus
Status returns the current message status.
func (*Message) StatusError ¶ added in v0.6.0
StatusError returns the error that was returned when the message was acked or nacked. If the message was successfully acked/nacked or it is still open the method returns nil.
type MessageStatus ¶
type MessageStatus int
MessageStatus represents the state of the message (acked, nacked or open).
const (
	MessageStatusAcked MessageStatus = iota
	MessageStatusNacked
	MessageStatusOpen
)
func (MessageStatus) String ¶
func (i MessageStatus) String() string
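A consequence of the iota ordering above is that the zero value of MessageStatus is MessageStatusAcked, not MessageStatusOpen. The sketch below redeclares the constants locally to show this; the label helper and the strings it returns are hypothetical and are not the output of the package's String method.

```go
package main

// MessageStatus is redeclared here only to keep the sketch self-contained.
type MessageStatus int

const (
	MessageStatusAcked  MessageStatus = iota // 0, also the zero value
	MessageStatusNacked                      // 1
	MessageStatusOpen                        // 2
)

// label is a hypothetical helper with illustrative strings.
func label(s MessageStatus) string {
	switch s {
	case MessageStatusAcked:
		return "acked"
	case MessageStatusNacked:
		return "nacked"
	case MessageStatusOpen:
		return "open"
	default:
		return "unknown"
	}
}
```

Code that stores a MessageStatus in a struct should therefore set it explicitly instead of relying on the zero value to mean open.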
type MetricsNode ¶
type MetricsNode struct {
	Name      string
	Histogram metrics.RecordBytesHistogram
	// contains filtered or unexported fields
}
func (*MetricsNode) ID ¶
func (n *MetricsNode) ID() string
func (*MetricsNode) Pub ¶
func (n *MetricsNode) Pub() <-chan *Message
func (*MetricsNode) SetLogger ¶
func (n *MetricsNode) SetLogger(logger log.CtxLogger)
func (*MetricsNode) Sub ¶
func (n *MetricsNode) Sub(in <-chan *Message)
type NackHandler ¶
type NackHandler func(*Message, NackMetadata) error
NackHandler is a variation of the StatusChangeHandler that is only called when a message is nacked. For more info see StatusChangeHandler.
type NackMetadata ¶ added in v0.4.0
type Node ¶
type Node interface {
	// ID returns the identifier of this Node. Each Node in a pipeline must be
	// uniquely identified by the ID.
	ID() string

	// Run first verifies if the Node is set up correctly and either returns a
	// descriptive error or starts processing messages. Processing should stop
	// as soon as the supplied context is done. If an error occurs while
	// processing messages, the processing should stop and the error should be
	// returned. If processing stopped because the context was canceled, the
	// function should return ctx.Err(). All nodes that are part of the same
	// pipeline will receive the same context in Run and as soon as one node
	// returns an error the context will be canceled.
	// Run has different responsibilities, depending on the node type:
	//  * PubNode has to start producing new messages into the outgoing channel.
	//    The context supplied to Run has to be attached to all messages. Each
	//    message will be either acked or nacked by downstream nodes, it's the
	//    responsibility of PubNode to handle these acks/nacks if applicable.
	//    The outgoing channel has to be closed when Run returns, regardless of
	//    the return value.
	//  * SubNode has to start listening to messages sent to the incoming
	//    channel. It has to use the context supplied in the message for calls
	//    to other functions (imagine the message context as a request context).
	//    It is the responsibility of SubNode to ack or nack a message if it's
	//    processed correctly or with an error. If the incoming channel is
	//    closed, then Run should stop and return nil.
	//  * PubSubNode has to start listening to incoming messages, process them
	//    and forward them to the outgoing channel. The node should not ack/nack
	//    forwarded messages. If a message is dropped and not forwarded to the
	//    outgoing channel (i.e. filters), the message should be acked. If an
	//    error is encountered while processing the message, the message has to
	//    be nacked and Run should return with an error. If the incoming channel
	//    is closed, then Run should stop and return nil. The outgoing channel
	//    has to be closed when Run returns, regardless of the return value.
	//    The incoming message pointers need to be forwarded, as upstream nodes
	//    could be waiting for acks/nacks on that exact pointer. If the node
	//    forwards a new message (not the exact pointer it received), then it
	//    needs to forward any acks/nacks to the original message pointer.
	Run(ctx context.Context) error
}
Node represents a single node in a pipeline that knows how to process messages flowing through a pipeline.
type OpenMessagesTracker ¶ added in v0.3.0
OpenMessagesTracker allows you to track messages until they reach the end of the pipeline.
func (*OpenMessagesTracker) Add ¶ added in v0.3.0
func (t *OpenMessagesTracker) Add(msg *Message)
Add will increase the counter in the wait group and register a status handler that will decrease the counter when the message is acked or nacked.
func (*OpenMessagesTracker) Wait ¶ added in v0.3.0
func (t *OpenMessagesTracker) Wait()
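The Add and Wait pair described above maps naturally onto sync.WaitGroup. The sketch below shows the pattern with a hypothetical toyMessage and tracker; it is an illustration of the idea, not the package's code.

```go
package main

import "sync"

// toyMessage is a hypothetical message that notifies handlers once when its
// status changes.
type toyMessage struct {
	mu       sync.Mutex
	handlers []func()
	done     bool
}

// onStatusChange registers a callback for the first ack.
func (m *toyMessage) onStatusChange(h func()) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.handlers = append(m.handlers, h)
}

// Ack changes the status and runs the handlers; later calls are ignored.
func (m *toyMessage) Ack() {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.done {
		return
	}
	m.done = true
	for _, h := range m.handlers {
		h()
	}
}

// tracker counts messages that have not been acked yet.
type tracker struct {
	wg sync.WaitGroup
}

// Add increases the counter and registers a handler that decreases it again.
func (t *tracker) Add(m *toyMessage) {
	t.wg.Add(1)
	m.onStatusChange(t.wg.Done)
}

// Wait blocks until every tracked message has been acked.
func (t *tracker) Wait() { t.wg.Wait() }
```

A node can call Wait before returning from Run to make sure no tracked message is left open.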
type ParallelNode ¶ added in v0.6.0
type ParallelNode struct {
	Name string
	// NewNode is the constructor of the wrapped PubSubNode, it should create
	// the i-th node (useful for distinguishing nodes in logs).
	NewNode func(i int) PubSubNode
	Workers int
	// contains filtered or unexported fields
}
ParallelNode wraps a PubSubNode and parallelizes it by running multiple instances of the node in separate worker goroutines. It also spawns a coordinator goroutine that is responsible for collecting the results and forwarding them to the next node down the line while maintaining the order of messages.
func (*ParallelNode) ID ¶ added in v0.6.0
func (n *ParallelNode) ID() string
func (*ParallelNode) Pub ¶ added in v0.6.0
func (n *ParallelNode) Pub() <-chan *Message
func (*ParallelNode) Run ¶ added in v0.6.0
func (n *ParallelNode) Run(ctx context.Context) error
Run continuously fetches messages from the incoming channel (i.e. from the previous node) and submits jobs to the job channel shared by all worker goroutines. Once a worker picks up a job, the same job is also sent to the coordinator, which starts waiting for the job to be done. The worker is responsible for forwarding the message to the wrapped PubSubNode, waiting for it to process the message and then marking the job as done. Once the jobs are done, the results are picked up by the coordinator, which decides whether the message should be sent to the next node, whether it was filtered out, or whether an error happened and the node needs to stop running. The coordinator collects job results in the same order as the jobs were dispatched, so the order of messages is maintained.
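The order-preserving part of this design can be sketched with plain channels. The version below is a simplification under stated assumptions: the job type and processInOrder function are hypothetical, the dispatcher (not the worker) hands each job to the coordinator queue, and filtering and error handling are left out.

```go
package main

import "sync"

// job carries one input and signals completion by closing done.
type job struct {
	input  int
	result int
	done   chan struct{}
}

// processInOrder runs process concurrently on all inputs and returns the
// results in input order.
func processInOrder(inputs []int, workers int, process func(int) int) []int {
	if workers < 1 {
		workers = 1
	}
	jobs := make(chan *job)                 // shared by all workers
	ordered := make(chan *job, len(inputs)) // coordinator queue, dispatch order

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs {
				j.result = process(j.input)
				close(j.done) // mark the job as done
			}
		}()
	}

	// Dispatcher: record the dispatch order, then hand the job to a worker.
	go func() {
		for _, in := range inputs {
			j := &job{input: in, done: make(chan struct{})}
			ordered <- j
			jobs <- j
		}
		close(jobs)
		close(ordered)
	}()

	// Coordinator: wait for jobs in dispatch order, not completion order.
	results := make([]int, 0, len(inputs))
	for j := range ordered {
		<-j.done
		results = append(results, j.result)
	}
	wg.Wait()
	return results
}
```

Because the coordinator blocks on the oldest outstanding job, a slow job delays later results but never lets them overtake it.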
func (*ParallelNode) SetLogger ¶ added in v0.6.0
func (n *ParallelNode) SetLogger(logger log.CtxLogger)
func (*ParallelNode) Sub ¶ added in v0.6.0
func (n *ParallelNode) Sub(in <-chan *Message)
type Processor ¶ added in v0.9.0
type Processor interface {
	// Open configures and opens a processor plugin
	Open(ctx context.Context) error
	Process(context.Context, []opencdc.Record) []sdk.ProcessedRecord
	// Teardown tears down a processor plugin.
	// In case of standalone plugins, that means stopping the WASM module.
	Teardown(context.Context) error
}
type ProcessorNode ¶
type ProcessorNode struct {
	Name           string
	Processor      Processor
	ProcessorTimer metrics.Timer
	// contains filtered or unexported fields
}
func (*ProcessorNode) ID ¶
func (n *ProcessorNode) ID() string
func (*ProcessorNode) Pub ¶
func (n *ProcessorNode) Pub() <-chan *Message
func (*ProcessorNode) SetLogger ¶
func (n *ProcessorNode) SetLogger(logger log.CtxLogger)
func (*ProcessorNode) Sub ¶
func (n *ProcessorNode) Sub(in <-chan *Message)
type PubNode ¶
type PubNode interface {
	Node

	// Pub returns the outgoing channel, that can be used to connect downstream
	// nodes to PubNode. It is the responsibility of PubNode to close this
	// channel when it stops running (see Node.Run). Pub needs to be called
	// before running a PubNode, otherwise Node.Run should return an error.
	Pub() <-chan *Message
}
PubNode represents a node at the start of a pipeline, which pushes new messages to downstream nodes.
type PubSubNode ¶
PubSubNode represents a node in the middle of a pipeline, located between two nodes. It listens to incoming messages from the incoming channel, processes them and forwards them to the outgoing channel.
type SourceAckerNode ¶ added in v0.3.0
type SourceAckerNode struct {
	Name           string
	Source         Source
	DLQHandlerNode *DLQHandlerNode
	// contains filtered or unexported fields
}
SourceAckerNode is responsible for handling acknowledgments for messages of a specific source and forwarding them to the source in the correct order.
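Forwarding acknowledgments in the correct order means that an ack for a later message has to wait until all earlier messages are acknowledged. The sketch below shows one simple way to do that with a queue; orderedAcker and its methods are hypothetical, the sketch is not safe for concurrent use, and it is not how SourceAckerNode is implemented.

```go
package main

// orderedAcker forwards acks to a source in the order the messages were
// produced, even if acks arrive out of order.
type orderedAcker struct {
	queue   []int        // positions in production order
	acked   map[int]bool // acks received but not yet forwarded
	forward func(pos int)
}

func newOrderedAcker(forward func(pos int)) *orderedAcker {
	return &orderedAcker{acked: make(map[int]bool), forward: forward}
}

// Track registers a message position when the message is produced.
func (a *orderedAcker) Track(pos int) {
	a.queue = append(a.queue, pos)
}

// Ack records the ack and forwards every ack at the head of the queue that
// is ready, so the source always sees acks in production order.
func (a *orderedAcker) Ack(pos int) {
	a.acked[pos] = true
	for len(a.queue) > 0 && a.acked[a.queue[0]] {
		head := a.queue[0]
		a.forward(head)
		delete(a.acked, head)
		a.queue = a.queue[1:]
	}
}
```

With positions 1, 2 and 3 tracked, acking 3 first forwards nothing; acking 1 forwards 1; acking 2 then forwards 2 and 3.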
func (*SourceAckerNode) ID ¶ added in v0.3.0
func (n *SourceAckerNode) ID() string
func (*SourceAckerNode) Pub ¶ added in v0.3.0
func (n *SourceAckerNode) Pub() <-chan *Message
func (*SourceAckerNode) Run ¶ added in v0.3.0
func (n *SourceAckerNode) Run(ctx context.Context) error
func (*SourceAckerNode) SetLogger ¶ added in v0.3.0
func (n *SourceAckerNode) SetLogger(logger log.CtxLogger)
func (*SourceAckerNode) Sub ¶ added in v0.3.0
func (n *SourceAckerNode) Sub(in <-chan *Message)
type SourceNode ¶
type SourceNode struct {
	Name          string
	Source        Source
	PipelineTimer metrics.Timer
	// contains filtered or unexported fields
}
SourceNode wraps a Source connector and implements the PubNode interface.
func (*SourceNode) ForceStop ¶ added in v0.5.3
func (n *SourceNode) ForceStop(ctx context.Context)
func (*SourceNode) ID ¶
func (n *SourceNode) ID() string
ID returns a properly formatted SourceNode ID prefixed with `source/`.
func (*SourceNode) Pub ¶
func (n *SourceNode) Pub() <-chan *Message
func (*SourceNode) SetLogger ¶
func (n *SourceNode) SetLogger(logger log.CtxLogger)
type StatusChange ¶
type StatusChange struct {
	Old MessageStatus
	New MessageStatus
	// Reason contains the error that triggered the status change in case of a
	// nack.
	NackMetadata NackMetadata
}
StatusChange is passed to StatusChangeHandler when the status of a message changes.
type StatusChangeHandler ¶
type StatusChangeHandler func(*Message, StatusChange) error
StatusChangeHandler is executed when a message status changes. The handlers are triggered by a call to either of these functions: Message.Nack, Message.Ack. These functions will block until the handlers finish handling the message and will return the error returned by the handlers. The function receives the message and the status change describing the old and new message status as well as the reason for the status change in case of a nack.
type StoppableNode ¶
type StoppableNode interface {
	Node

	// Stop signals a running StoppableNode that it should gracefully shut down.
	// It should stop producing new messages, wait to receive acks/nacks for any
	// in-flight messages, close the outgoing channel and return from Node.Run.
	// Stop should return right away, not waiting for the node to actually stop
	// running. If the node is not running the function does not do anything.
	// The reason supplied to Stop will be returned by Node.Run.
	Stop(ctx context.Context, reason error) error
}
type SubNode ¶
type SubNode interface {
	Node

	// Sub sets the incoming channel, that is used to listen to new messages.
	// Node.Run should listen to messages coming from this channel until the
	// channel is closed. Sub needs to be called before running a SubNode,
	// otherwise Node.Run should return an error.
	Sub(in <-chan *Message)
}
SubNode represents a node at the end of a pipeline, which listens to incoming messages from upstream nodes.