Documentation ¶
Overview ¶
Package stream defines a message and nodes that can be composed into a data pipeline. Nodes are connected with channels which are used to pass messages between them.
We distinguish 3 node types: PubNode, PubSubNode and SubNode. A PubNode is at the start of the pipeline and publishes messages. A PubSubNode sits between two nodes: it receives messages from one node, processes them and sends them to the next one. A SubNode is the last node in the pipeline and only receives messages without sending them to any other node.
A message can have one of these statuses:
- Open: The message starts out in an open state and will stay open while it's passed around between the nodes.
- Acked: Once a node successfully processes the message (e.g. it is sent to the destination or is filtered out by a processor) it is acked.
- Nacked: If some node fails to process the message it can nack the message and once it's successfully nacked (e.g. sent to a dead letter queue) it becomes nacked.
- Dropped: If a node experiences a non-recoverable error or has to stop running without sending the message to the next node (e.g. force stop) it can drop the message, then the message status changes to dropped.
In other words, once a node receives a message it has 4 options for how to handle it: it can either pass it to the next node (message stays open), ack the message and keep running, nack the message and keep running or drop the message and stop running. This means that no message will be left in an open status when the pipeline stops.
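The four options can be written down as a small decision table. The sketch below uses hypothetical names (`status`, `action`, `apply`) and assumes that an ack or nack succeeds; it does not use the package's Message type.

```go
package main

import "fmt"

// status mirrors the four message statuses described above.
type status string

const (
	open    status = "open"
	acked   status = "acked"
	nacked  status = "nacked"
	dropped status = "dropped"
)

// action is a hypothetical name for a node's decision about one message.
type action int

const (
	forward action = iota // pass the message to the next node
	ack
	nack
	drop
)

// apply returns the status the message ends up in and whether the node
// keeps running afterwards. It assumes that an ack or nack succeeds.
func apply(a action) (status, bool) {
	switch a {
	case forward:
		return open, true
	case ack:
		return acked, true
	case nack:
		return nacked, true
	default: // drop
		return dropped, false
	}
}

func main() {
	for _, a := range []action{forward, ack, nack, drop} {
		s, running := apply(a)
		fmt.Printf("status=%s keepRunning=%t\n", s, running)
	}
}
```

Only forwarding leaves the message open, and a forwarded message is then handled by the next node in the same way.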
Nodes can register functions on the message which will be called when the status of a message changes. For more information see StatusChangeHandler.
Nodes can implement LoggingNode to receive a logger struct that can be used to output logs. The node should use the message context to create logs; this way the logger automatically attaches the message ID as well as the node ID to the log message, making debugging easier.
Example (ComplexStream) ¶
package main

import (
	"context"
	"fmt"
	"strconv"
	"sync"
	"time"

	"github.com/conduitio/conduit/pkg/connector"
	connmock "github.com/conduitio/conduit/pkg/connector/mock"
	"github.com/conduitio/conduit/pkg/foundation/ctxutil"
	"github.com/conduitio/conduit/pkg/foundation/log"
	"github.com/conduitio/conduit/pkg/foundation/metrics/noop"
	"github.com/conduitio/conduit/pkg/pipeline/stream"
	"github.com/conduitio/conduit/pkg/plugins"
	"github.com/conduitio/conduit/pkg/processor"
	procmock "github.com/conduitio/conduit/pkg/processor/mock"
	"github.com/conduitio/conduit/pkg/record"
	"github.com/golang/mock/gomock"
	"github.com/rs/zerolog"
)

func main() {
	ctx, killAll := context.WithCancel(context.Background())
	defer killAll()

	logger := newLogger()
	ctrl := gomockCtrl(logger)

	var count int

	node1 := &stream.SourceNode{
		Name:           "generator1",
		Source:         generatorSource(ctrl, logger, "generator1", 10, time.Millisecond*10),
		ConnectorTimer: noop.Timer{},
		PipelineTimer:  noop.Timer{},
	}
	node2 := &stream.SourceNode{
		Name:           "generator2",
		Source:         generatorSource(ctrl, logger, "generator2", 10, time.Millisecond*10),
		ConnectorTimer: noop.Timer{},
		PipelineTimer:  noop.Timer{},
	}
	node3 := &stream.FaninNode{Name: "fanin"}
	node4 := &stream.ProcessorNode{
		Name:           "counter",
		Processor:      counterProcessor(ctrl, &count),
		ProcessorTimer: noop.Timer{},
	}
	node5 := &stream.FanoutNode{Name: "fanout"}
	node6 := &stream.DestinationNode{
		Name:           "printer1",
		Destination:    printerDestination(ctrl, logger, "printer1"),
		ConnectorTimer: noop.Timer{},
	}
	node7 := &stream.DestinationNode{
		Name:           "printer2",
		Destination:    printerDestination(ctrl, logger, "printer2"),
		ConnectorTimer: noop.Timer{},
	}

	// put everything together
	out := node1.Pub()
	node3.Sub(out)
	out = node2.Pub()
	node3.Sub(out)

	out = node3.Pub()
	node4.Sub(out)
	out = node4.Pub()
	node5.Sub(out)

	out = node5.Pub()
	node6.Sub(out)
	out = node5.Pub()
	node7.Sub(out)

	// run nodes
	nodes := []stream.Node{node1, node2, node3, node4, node5, node6, node7}

	var wg sync.WaitGroup
	wg.Add(len(nodes))
	for _, n := range nodes {
		stream.SetLogger(n, logger)
		go runNode(ctx, &wg, n)
	}

	// stop nodes after 250ms, which should be enough to process the 20 messages
	time.AfterFunc(
		250*time.Millisecond,
		func() {
			node1.Stop(nil)
			node2.Stop(nil)
		},
	)
	// give the nodes some time to process the records, plus a bit of time to stop
	if waitTimeout(&wg, 300*time.Millisecond) {
		killAll()
	} else {
		logger.Info(ctx).Msgf("counter node counted %d messages", count)
		logger.Info(ctx).Msg("finished successfully")
	}
}

func newLogger() log.CtxLogger {
	w := zerolog.NewConsoleWriter()
	w.NoColor = true
	w.PartsExclude = []string{zerolog.TimestampFieldName}

	zlogger := zerolog.New(w)
	zlogger = zlogger.Level(zerolog.DebugLevel)
	logger := log.New(zlogger)
	logger = logger.CtxHook(ctxutil.MessageIDLogCtxHook{})

	return logger
}

func generatorSource(ctrl *gomock.Controller, logger log.CtxLogger, nodeID string, recordCount int, delay time.Duration) connector.Source {
	position := 0

	source := connmock.NewSource(ctrl)
	source.EXPECT().Open(gomock.Any()).Return(nil).Times(1)
	source.EXPECT().Teardown(gomock.Any()).Return(nil).Times(1)
	source.EXPECT().Ack(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, p record.Position) error {
		logger.Debug(ctx).Str("node_id", nodeID).Msg("received ack")
		return nil
	}).Times(recordCount)
	source.EXPECT().Read(gomock.Any()).DoAndReturn(func(ctx context.Context) (record.Record, error) {
		time.Sleep(delay)

		position++
		if position > recordCount {
			return record.Record{}, plugins.ErrEndData
		}

		return record.Record{
			Position: record.Position(strconv.Itoa(position)),
		}, nil
	}).MinTimes(recordCount + 1)
	source.EXPECT().Errors().Return(make(chan error))

	return source
}

func printerDestination(ctrl *gomock.Controller, logger log.CtxLogger, nodeID string) connector.Destination {
	destination := connmock.NewDestination(ctrl)
	destination.EXPECT().Open(gomock.Any()).Return(nil).Times(1)
	destination.EXPECT().Teardown(gomock.Any()).Return(nil).Times(1)
	destination.EXPECT().Write(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, r record.Record) error {
		logger.Debug(ctx).
			Str("node_id", nodeID).
			Msg("got record")
		return nil
	}).AnyTimes()
	destination.EXPECT().Errors().Return(make(chan error))

	return destination
}

func counterProcessor(ctrl *gomock.Controller, count *int) processor.Processor {
	proc := procmock.NewProcessor(ctrl)
	proc.EXPECT().Execute(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, r record.Record) (record.Record, error) {
		*count++
		return r, nil
	}).AnyTimes()
	return proc
}

func gomockCtrl(logger log.CtxLogger) *gomock.Controller {
	return gomock.NewController(gomockLogger(logger))
}

type gomockLogger log.CtxLogger

func (g gomockLogger) Errorf(format string, args ...interface{}) {
	g.Error().Msgf(format, args...)
}

func (g gomockLogger) Fatalf(format string, args ...interface{}) {
	g.Fatal().Msgf(format, args...)
}

func runNode(ctx context.Context, wg *sync.WaitGroup, n stream.Node) {
	defer wg.Done()
	err := n.Run(ctx)
	if err != nil {
		fmt.Printf("%s error: %v\n", n.ID(), err)
	}
}

func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
	c := make(chan struct{})
	go func() {
		defer close(c)
		wg.Wait()
	}()
	select {
	case <-c:
		return false
	case <-time.After(timeout):
		return true
	}
}
Output:

DBG got record message_id=generator2/1 node_id=printer2
DBG got record message_id=generator2/1 node_id=printer1
DBG received ack message_id=generator2/1 node_id=generator2
DBG got record message_id=generator1/1 node_id=printer1
DBG got record message_id=generator1/1 node_id=printer2
DBG received ack message_id=generator1/1 node_id=generator1
DBG got record message_id=generator2/2 node_id=printer2
DBG got record message_id=generator2/2 node_id=printer1
DBG received ack message_id=generator2/2 node_id=generator2
DBG got record message_id=generator1/2 node_id=printer1
DBG got record message_id=generator1/2 node_id=printer2
DBG received ack message_id=generator1/2 node_id=generator1
DBG got record message_id=generator2/3 node_id=printer2
DBG got record message_id=generator2/3 node_id=printer1
DBG received ack message_id=generator2/3 node_id=generator2
DBG got record message_id=generator1/3 node_id=printer1
DBG got record message_id=generator1/3 node_id=printer2
DBG received ack message_id=generator1/3 node_id=generator1
DBG got record message_id=generator2/4 node_id=printer2
DBG got record message_id=generator2/4 node_id=printer1
DBG received ack message_id=generator2/4 node_id=generator2
DBG got record message_id=generator1/4 node_id=printer2
DBG got record message_id=generator1/4 node_id=printer1
DBG received ack message_id=generator1/4 node_id=generator1
DBG got record message_id=generator2/5 node_id=printer2
DBG got record message_id=generator2/5 node_id=printer1
DBG received ack message_id=generator2/5 node_id=generator2
DBG got record message_id=generator1/5 node_id=printer1
DBG got record message_id=generator1/5 node_id=printer2
DBG received ack message_id=generator1/5 node_id=generator1
DBG got record message_id=generator2/6 node_id=printer2
DBG got record message_id=generator2/6 node_id=printer1
DBG received ack message_id=generator2/6 node_id=generator2
DBG got record message_id=generator1/6 node_id=printer1
DBG got record message_id=generator1/6 node_id=printer2
DBG received ack message_id=generator1/6 node_id=generator1
DBG got record message_id=generator2/7 node_id=printer2
DBG got record message_id=generator2/7 node_id=printer1
DBG received ack message_id=generator2/7 node_id=generator2
DBG got record message_id=generator1/7 node_id=printer1
DBG got record message_id=generator1/7 node_id=printer2
DBG received ack message_id=generator1/7 node_id=generator1
DBG got record message_id=generator2/8 node_id=printer2
DBG got record message_id=generator2/8 node_id=printer1
DBG received ack message_id=generator2/8 node_id=generator2
DBG got record message_id=generator1/8 node_id=printer1
DBG got record message_id=generator1/8 node_id=printer2
DBG received ack message_id=generator1/8 node_id=generator1
DBG got record message_id=generator2/9 node_id=printer1
DBG got record message_id=generator2/9 node_id=printer2
DBG received ack message_id=generator2/9 node_id=generator2
DBG got record message_id=generator1/9 node_id=printer2
DBG got record message_id=generator1/9 node_id=printer1
DBG received ack message_id=generator1/9 node_id=generator1
DBG got record message_id=generator2/10 node_id=printer1
DBG got record message_id=generator2/10 node_id=printer2
DBG received ack message_id=generator2/10 node_id=generator2
DBG got record message_id=generator1/10 node_id=printer2
DBG got record message_id=generator1/10 node_id=printer1
DBG received ack message_id=generator1/10 node_id=generator1
DBG stop channel closed component=SourceNode node_id=generator1
DBG stop channel closed component=SourceNode node_id=generator2
DBG incoming messages channel closed component=ProcessorNode node_id=counter
DBG incoming messages channel closed component=DestinationNode node_id=printer2
DBG incoming messages channel closed component=DestinationNode node_id=printer1
INF counter node counted 20 messages
INF finished successfully
Example (SimpleStream) ¶
package main

import (
	"context"
	"fmt"
	"strconv"
	"sync"
	"time"

	"github.com/conduitio/conduit/pkg/connector"
	connmock "github.com/conduitio/conduit/pkg/connector/mock"
	"github.com/conduitio/conduit/pkg/foundation/ctxutil"
	"github.com/conduitio/conduit/pkg/foundation/log"
	"github.com/conduitio/conduit/pkg/foundation/metrics/noop"
	"github.com/conduitio/conduit/pkg/pipeline/stream"
	"github.com/conduitio/conduit/pkg/plugins"
	"github.com/conduitio/conduit/pkg/record"
	"github.com/golang/mock/gomock"
	"github.com/rs/zerolog"
)

func main() {
	ctx, killAll := context.WithCancel(context.Background())
	defer killAll()

	logger := newLogger()
	ctrl := gomockCtrl(logger)

	node1 := &stream.SourceNode{
		Name:           "generator",
		Source:         generatorSource(ctrl, logger, "generator", 10, time.Millisecond*10),
		ConnectorTimer: noop.Timer{},
		PipelineTimer:  noop.Timer{},
	}
	node2 := &stream.DestinationNode{
		Name:           "printer",
		Destination:    printerDestination(ctrl, logger, "printer"),
		ConnectorTimer: noop.Timer{},
	}

	stream.SetLogger(node1, logger)
	stream.SetLogger(node2, logger)

	// put everything together
	out := node1.Pub()
	node2.Sub(out)

	var wg sync.WaitGroup
	wg.Add(2)
	go runNode(ctx, &wg, node2)
	go runNode(ctx, &wg, node1)

	// stop node after 150ms, which should be enough to process the 10 messages
	time.AfterFunc(150*time.Millisecond, func() { node1.Stop(nil) })
	// give the node some time to process the records, plus a bit of time to stop
	if waitTimeout(&wg, 200*time.Millisecond) {
		killAll()
	} else {
		logger.Info(ctx).Msg("finished successfully")
	}
}

func newLogger() log.CtxLogger {
	w := zerolog.NewConsoleWriter()
	w.NoColor = true
	w.PartsExclude = []string{zerolog.TimestampFieldName}

	zlogger := zerolog.New(w)
	zlogger = zlogger.Level(zerolog.DebugLevel)
	logger := log.New(zlogger)
	logger = logger.CtxHook(ctxutil.MessageIDLogCtxHook{})

	return logger
}

func generatorSource(ctrl *gomock.Controller, logger log.CtxLogger, nodeID string, recordCount int, delay time.Duration) connector.Source {
	position := 0

	source := connmock.NewSource(ctrl)
	source.EXPECT().Open(gomock.Any()).Return(nil).Times(1)
	source.EXPECT().Teardown(gomock.Any()).Return(nil).Times(1)
	source.EXPECT().Ack(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, p record.Position) error {
		logger.Debug(ctx).Str("node_id", nodeID).Msg("received ack")
		return nil
	}).Times(recordCount)
	source.EXPECT().Read(gomock.Any()).DoAndReturn(func(ctx context.Context) (record.Record, error) {
		time.Sleep(delay)

		position++
		if position > recordCount {
			return record.Record{}, plugins.ErrEndData
		}

		return record.Record{
			Position: record.Position(strconv.Itoa(position)),
		}, nil
	}).MinTimes(recordCount + 1)
	source.EXPECT().Errors().Return(make(chan error))

	return source
}

func printerDestination(ctrl *gomock.Controller, logger log.CtxLogger, nodeID string) connector.Destination {
	destination := connmock.NewDestination(ctrl)
	destination.EXPECT().Open(gomock.Any()).Return(nil).Times(1)
	destination.EXPECT().Teardown(gomock.Any()).Return(nil).Times(1)
	destination.EXPECT().Write(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, r record.Record) error {
		logger.Debug(ctx).
			Str("node_id", nodeID).
			Msg("got record")
		return nil
	}).AnyTimes()
	destination.EXPECT().Errors().Return(make(chan error))

	return destination
}

func gomockCtrl(logger log.CtxLogger) *gomock.Controller {
	return gomock.NewController(gomockLogger(logger))
}

type gomockLogger log.CtxLogger

func (g gomockLogger) Errorf(format string, args ...interface{}) {
	g.Error().Msgf(format, args...)
}

func (g gomockLogger) Fatalf(format string, args ...interface{}) {
	g.Fatal().Msgf(format, args...)
}

func runNode(ctx context.Context, wg *sync.WaitGroup, n stream.Node) {
	defer wg.Done()
	err := n.Run(ctx)
	if err != nil {
		fmt.Printf("%s error: %v\n", n.ID(), err)
	}
}

func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
	c := make(chan struct{})
	go func() {
		defer close(c)
		wg.Wait()
	}()
	select {
	case <-c:
		return false
	case <-time.After(timeout):
		return true
	}
}
Output:

DBG got record message_id=generator/1 node_id=printer
DBG received ack message_id=generator/1 node_id=generator
DBG got record message_id=generator/2 node_id=printer
DBG received ack message_id=generator/2 node_id=generator
DBG got record message_id=generator/3 node_id=printer
DBG received ack message_id=generator/3 node_id=generator
DBG got record message_id=generator/4 node_id=printer
DBG received ack message_id=generator/4 node_id=generator
DBG got record message_id=generator/5 node_id=printer
DBG received ack message_id=generator/5 node_id=generator
DBG got record message_id=generator/6 node_id=printer
DBG received ack message_id=generator/6 node_id=generator
DBG got record message_id=generator/7 node_id=printer
DBG received ack message_id=generator/7 node_id=generator
DBG got record message_id=generator/8 node_id=printer
DBG received ack message_id=generator/8 node_id=generator
DBG got record message_id=generator/9 node_id=printer
DBG received ack message_id=generator/9 node_id=generator
DBG got record message_id=generator/10 node_id=printer
DBG received ack message_id=generator/10 node_id=generator
DBG stop channel closed component=SourceNode node_id=generator
DBG incoming messages channel closed component=DestinationNode node_id=printer
INF finished successfully
Index ¶
- Variables
- func SetLogger(n Node, logger log.CtxLogger)
- type AckHandler
- type AckMiddleware
- type DestinationNode
- type DropHandler
- type DropMiddleware
- type FaninNode
- type FanoutNode
- type LoggingNode
- type Message
- func (m *Message) Ack() error
- func (m *Message) Acked() <-chan struct{}
- func (m *Message) Clone() *Message
- func (m *Message) Drop()
- func (m *Message) Dropped() <-chan struct{}
- func (m *Message) ID() string
- func (m *Message) Nack(reason error) error
- func (m *Message) Nacked() <-chan struct{}
- func (m *Message) RegisterAckHandler(mw AckMiddleware)
- func (m *Message) RegisterDropHandler(mw DropMiddleware)
- func (m *Message) RegisterNackHandler(mw NackMiddleware)
- func (m *Message) RegisterStatusHandler(mw StatusChangeMiddleware)
- func (m *Message) Status() MessageStatus
- type MessageStatus
- type MetricsNode
- type NackHandler
- type NackMiddleware
- type Node
- type ProcessorNode
- type PubNode
- type PubSubNode
- type SourceNode
- type StatusChange
- type StatusChangeHandler
- type StatusChangeMiddleware
- type StoppableNode
- type SubNode
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var (
	ErrMessageDropped          = cerrors.New("message is dropped")
	ErrUnexpectedMessageStatus = cerrors.New("unexpected message status")
)
Functions ¶
Types ¶
type AckHandler ¶
AckHandler is a variation of the StatusChangeHandler that is only called when a message is acked. For more info see StatusChangeHandler.
type AckMiddleware ¶
type AckMiddleware func(*Message, AckHandler) error
AckMiddleware is a variation of the StatusChangeMiddleware that is only called when a message is acked. For more info see StatusChangeMiddleware.
type DestinationNode ¶
type DestinationNode struct {
	Name           string
	Destination    connector.Destination
	ConnectorTimer metrics.Timer
	// contains filtered or unexported fields
}
DestinationNode wraps a Destination connector and implements the SubNode interface.
func (*DestinationNode) ID ¶
func (n *DestinationNode) ID() string
func (*DestinationNode) SetLogger ¶
func (n *DestinationNode) SetLogger(logger log.CtxLogger)
SetLogger sets the logger.
func (*DestinationNode) Sub ¶
func (n *DestinationNode) Sub(in <-chan *Message)
Sub will subscribe this node to an incoming channel.
type DropHandler ¶
DropHandler is a variation of the StatusChangeHandler that is only called when a message is dropped. For more info see StatusChangeHandler.
type DropMiddleware ¶
type DropMiddleware func(*Message, error, DropHandler)
DropMiddleware is a variation of the StatusChangeMiddleware that is only called when a message is dropped. For more info see StatusChangeMiddleware.
type FanoutNode ¶
type FanoutNode struct {
	Name string
	// contains filtered or unexported fields
}
func (*FanoutNode) ID ¶
func (n *FanoutNode) ID() string
func (*FanoutNode) Pub ¶
func (n *FanoutNode) Pub() <-chan *Message
func (*FanoutNode) Sub ¶
func (n *FanoutNode) Sub(in <-chan *Message)
type LoggingNode ¶
type LoggingNode interface {
	Node

	// SetLogger sets the logger used by the node for logging.
	SetLogger(log.CtxLogger)
}
LoggingNode is a node which expects a logger.
type Message ¶
type Message struct {
	// Ctx is the context in which the record was fetched. It should be used for
	// any function calls when processing the message. If the context is done
	// the message should be dropped as soon as possible and not processed
	// further.
	Ctx context.Context
	// Record represents a single record attached to the message.
	Record record.Record
	// contains filtered or unexported fields
}
Message represents a single message flowing through a pipeline.
func (*Message) Ack ¶
func (m *Message) Ack() error
Ack marks the message as acked, calls the corresponding status change handlers and closes the channel returned by Acked. If an ack handler returns an error, the message is dropped instead, which means that registered status change handlers are again notified about the drop and the channel returned by Dropped is closed instead. Calling Ack after the message has already been nacked will panic, while subsequent calls to Ack on an acked or dropped message are a noop and return the same value.
func (*Message) Acked ¶
func (m *Message) Acked() <-chan struct{}
Acked returns a channel that's closed when the message has been acked. Successive calls to Acked return the same value. This function can be used to wait for a message to be acked without notifying the acker.
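A channel that is closed on a status change can be observed by any number of waiters. The sketch below shows the general pattern with a hypothetical `msg` type that guards `close` with `sync.Once`; it is a simplified stand-in and not how Message is implemented.

```go
package main

import (
	"fmt"
	"sync"
)

// msg is a hypothetical stand-in for a message, not the package's Message.
type msg struct {
	once  sync.Once
	acked chan struct{}
}

func newMsg() *msg { return &msg{acked: make(chan struct{})} }

// Ack closes the channel exactly once, so repeated calls are safe.
func (m *msg) Ack() { m.once.Do(func() { close(m.acked) }) }

// Acked always returns the same channel.
func (m *msg) Acked() <-chan struct{} { return m.acked }

// isAcked checks the channel without blocking.
func isAcked(m *msg) bool {
	select {
	case <-m.Acked():
		return true
	default:
		return false
	}
}

func main() {
	m := newMsg()
	fmt.Println(isAcked(m)) // prints false

	done := make(chan struct{})
	go func() {
		<-m.Acked() // blocks until the message is acked
		close(done)
	}()

	m.Ack()
	m.Ack() // second call is a noop
	<-done
	fmt.Println(isAcked(m)) // prints true
}
```

A waiter only receives from the channel, so the goroutine that acks the message does not need to know who is waiting.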
func (*Message) Clone ¶
func (m *Message) Clone() *Message
Clone returns a cloned message with the same content but separate ack and nack handling.
func (*Message) Drop ¶
func (m *Message) Drop()
Drop marks the message as dropped, calls the registered status change handlers and closes the channel returned by Dropped. Calling Drop after the message has already been acked or nacked will panic, while subsequent calls to Drop on a dropped message are a noop.
func (*Message) Dropped ¶
func (m *Message) Dropped() <-chan struct{}
Dropped returns a channel that's closed when the message has been dropped. Successive calls to Dropped return the same value. This function can be used to wait for a message to be dropped without notifying the dropper.
func (*Message) ID ¶
func (m *Message) ID() string
ID returns a string representing a unique ID of this message. This is meant only for logging purposes.
func (*Message) Nack ¶
func (m *Message) Nack(reason error) error
Nack marks the message as nacked, calls the registered status change handlers and closes the channel returned by Nacked. If no nack handlers were registered or a nack handler returns an error, the message is dropped instead, which means that registered status change handlers are again notified about the drop and the channel returned by Dropped is closed instead. Calling Nack after the message has already been acked will panic, while subsequent calls to Nack on a nacked or dropped message are a noop and return the same value.
func (*Message) Nacked ¶
func (m *Message) Nacked() <-chan struct{}
Nacked returns a channel that's closed when the message has been nacked. Successive calls to Nacked return the same value. This function can be used to wait for a message to be nacked without notifying the nacker.
func (*Message) RegisterAckHandler ¶
func (m *Message) RegisterAckHandler(mw AckMiddleware)
RegisterAckHandler is used to register a function that will be called when the message is acked. This function can only be called if the message status is open, otherwise it panics.
func (*Message) RegisterDropHandler ¶
func (m *Message) RegisterDropHandler(mw DropMiddleware)
RegisterDropHandler is used to register a function that will be called when the message is dropped. This function can only be called if the message status is open, otherwise it panics.
func (*Message) RegisterNackHandler ¶
func (m *Message) RegisterNackHandler(mw NackMiddleware)
RegisterNackHandler is used to register a function that will be called when the message is nacked. This function can only be called if the message status is open, otherwise it panics.
func (*Message) RegisterStatusHandler ¶
func (m *Message) RegisterStatusHandler(mw StatusChangeMiddleware)
RegisterStatusHandler is used to register a function that will be called on any status change of the message. This function can only be called if the message status is open, otherwise it panics. Middlewares are called in the reverse order of how they were registered.
func (*Message) Status ¶
func (m *Message) Status() MessageStatus
Status returns the current message status.
type MessageStatus ¶
type MessageStatus int
MessageStatus represents the state of the message (acked, nacked, dropped or open).
const (
	MessageStatusAcked MessageStatus = iota
	MessageStatusNacked
	MessageStatusOpen
	MessageStatusDropped
)
func (MessageStatus) String ¶
func (i MessageStatus) String() string
type MetricsNode ¶
type MetricsNode struct {
	Name           string
	BytesHistogram metrics.Histogram
	// contains filtered or unexported fields
}
func (*MetricsNode) ID ¶
func (n *MetricsNode) ID() string
func (*MetricsNode) Pub ¶
func (n *MetricsNode) Pub() <-chan *Message
func (*MetricsNode) SetLogger ¶
func (n *MetricsNode) SetLogger(logger log.CtxLogger)
func (*MetricsNode) Sub ¶
func (n *MetricsNode) Sub(in <-chan *Message)
type NackHandler ¶
NackHandler is a variation of the StatusChangeHandler that is only called when a message is nacked. For more info see StatusChangeHandler.
type NackMiddleware ¶
type NackMiddleware func(*Message, error, NackHandler) error
NackMiddleware is a variation of the StatusChangeMiddleware that is only called when a message is nacked. For more info see StatusChangeMiddleware.
type Node ¶
type Node interface {
	// ID returns the identifier of this Node. Each Node in a pipeline must be
	// uniquely identified by the ID.
	ID() string

	// Run first verifies if the Node is set up correctly and either returns a
	// descriptive error or starts processing messages. Processing should stop
	// as soon as the supplied context is done. If an error occurs while
	// processing messages, the processing should stop and the error should be
	// returned. If processing stopped because the context was canceled, the
	// function should return ctx.Err().
	// Run has different responsibilities, depending on the node type:
	//  * PubNode has to start producing new messages into the outgoing channel.
	//    The context supplied to Run has to be attached to all messages. Each
	//    message will be either acked or nacked by downstream nodes, it's the
	//    responsibility of PubNode to handle these acks/nacks if applicable.
	//    The outgoing channel has to be closed when Run returns, regardless of
	//    the return value.
	//  * SubNode has to start listening to messages sent to the incoming
	//    channel. It has to use the context supplied in the message for calls
	//    to other functions (imagine the message context as a request context).
	//    It is the responsibility of SubNode to ack or nack a message if it's
	//    processed correctly or with an error. If the incoming channel is
	//    closed, then Run should stop and return nil.
	//  * PubSubNode has to start listening to incoming messages, process them
	//    and forward them to the outgoing channel. The node should not ack/nack
	//    forwarded messages. If a message is dropped and not forwarded to the
	//    outgoing channel (i.e. filters), the message should be acked. If an
	//    error is encountered while processing the message, the message has to
	//    be nacked and Run should return with an error. If the incoming channel
	//    is closed, then Run should stop and return nil. The outgoing channel
	//    has to be closed when Run returns, regardless of the return value.
	//    The incoming message pointers need to be forwarded, as upstream nodes
	//    could be waiting for acks/nacks on that exact pointer. If the node
	//    forwards a new message (not the exact pointer it received), then it
	//    needs to forward any acks/nacks to the original message pointer.
	Run(ctx context.Context) error
}
Node represents a single node in a pipeline that knows how to process messages flowing through a pipeline.
type ProcessorNode ¶
type ProcessorNode struct {
	Name           string
	Processor      processor.Processor
	ProcessorTimer metrics.Timer
	// contains filtered or unexported fields
}
func (*ProcessorNode) ID ¶
func (n *ProcessorNode) ID() string
func (*ProcessorNode) Pub ¶
func (n *ProcessorNode) Pub() <-chan *Message
func (*ProcessorNode) SetLogger ¶
func (n *ProcessorNode) SetLogger(logger log.CtxLogger)
func (*ProcessorNode) Sub ¶
func (n *ProcessorNode) Sub(in <-chan *Message)
type PubNode ¶
type PubNode interface {
	Node

	// Pub returns the outgoing channel, that can be used to connect downstream
	// nodes to PubNode. It is the responsibility of PubNode to close this
	// channel when it stops running (see Node.Run). Pub needs to be called
	// before running a PubNode, otherwise Node.Run should return an error.
	Pub() <-chan *Message
}
PubNode represents a node at the start of a pipeline, which pushes new messages to downstream nodes.
type PubSubNode ¶
PubSubNode represents a node in the middle of a pipeline, located between two nodes. It listens to incoming messages from the incoming channel, processes them and forwards them to the outgoing channel.
type SourceNode ¶
type SourceNode struct {
	Name           string
	Source         connector.Source
	ConnectorTimer metrics.Timer
	PipelineTimer  metrics.Timer
	// contains filtered or unexported fields
}
SourceNode wraps a Source connector and implements the PubNode interface.
func (*SourceNode) ID ¶
func (n *SourceNode) ID() string
ID returns a properly formatted SourceNode ID prefixed with `source/`
func (*SourceNode) Pub ¶
func (n *SourceNode) Pub() <-chan *Message
func (*SourceNode) SetLogger ¶
func (n *SourceNode) SetLogger(logger log.CtxLogger)
func (*SourceNode) Stop ¶
func (n *SourceNode) Stop(reason error)
type StatusChange ¶
type StatusChange struct {
	Old MessageStatus
	New MessageStatus
	// Reason contains the error that triggered the status change in case of a
	// nack or drop.
	Reason error
}
StatusChange is passed to StatusChangeMiddleware and StatusChangeHandler when the status of a message changes.
type StatusChangeHandler ¶
type StatusChangeHandler func(*Message, StatusChange) error
StatusChangeHandler is executed when a message status changes. The handlers are triggered by a call to either of these functions: Message.Nack, Message.Ack, Message.Drop. These functions will block until the handlers finish handling the message and will return the error returned by the handlers. The function receives the message and the status change describing the old and new message status as well as the reason for the status change in case of a nack or drop.
type StatusChangeMiddleware ¶
type StatusChangeMiddleware func(*Message, StatusChange, StatusChangeHandler) error
StatusChangeMiddleware can be registered on a message and will be executed in case of a status change (see StatusChangeHandler). Middlewares are called in the reverse order of how they were registered. The middleware has two options when processing a message status change:
- If it successfully processed the status change it should call the next handler and return its error. The handler may inspect the error and act accordingly, but it must return that error (or another error that contains it). It must not return an error if the next handler was called and it returned nil.
- If it failed to process the status change successfully it must not call the next handler but instead return an error right away.
Applying these rules means each middleware can be sure that all middlewares before it processed the status change successfully.
type StoppableNode ¶
type StoppableNode interface {
	Node

	// Stop signals a running StoppableNode that it should gracefully shutdown. It
	// should stop producing new messages, wait to receive acks/nacks for any
	// in-flight messages, close the outgoing channel and return nil from
	// Node.Run. Stop should return right away, not waiting for the node to
	// actually stop running. If the node is not running the function does not
	// do anything. The reason supplied to Stop will be returned by Node.Run.
	Stop(reason error)
}
type SubNode ¶
type SubNode interface {
	Node

	// Sub sets the incoming channel, that is used to listen to new messages.
	// Node.Run should listen to messages coming from this channel until the
	// channel is closed. Sub needs to be called before running a SubNode,
	// otherwise Node.Run should return an error.
	Sub(in <-chan *Message)
}
SubNode represents a node at the end of a pipeline, which listens to incoming messages from upstream nodes.