
v0.3.0-nightly.20220610 Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: Jun 9, 2022 License: Apache-2.0 Imports: 15 Imported by: 0



Package stream defines a message and nodes that can be composed into a data pipeline. Nodes are connected with channels which are used to pass messages between them.

We distinguish 3 node types: PubNode, PubSubNode and SubNode. A PubNode is at the start of the pipeline and publishes messages. A PubSubNode sits between two nodes, it receives messages from one node, processes them and sends them to the next one. A SubNode is the last node in the pipeline and only receives messages without sending them to any other node.

A message can have of these statuses:

Open      The message starts out in an open state and will stay open while
          it's passed around between the nodes.
Acked     Once a node successfully processes the message (e.g. it is sent to
          the destination or is filtered out by a processor) it is acked.
Nacked    If some node fails to process the message it can nack the message
          and once it's successfully nacked (e.g. sent to a dead letter queue)
          it becomes nacked.
Dropped   If a node experiences a non-recoverable error or has to stop running
          without sending the message to the next node (e.g. force stop) it
          can drop the message, then the message status changes to dropped.

In other words, once a node receives a message it has 4 options for how to handle it: it can either pass it to the next node (message stays open), ack the message and keep running, nack the message and keep running or drop the message and stop running. This means that no message will be left in an open status when the pipeline stops.

Nodes can register functions on the message which will be called when the status of a message changes. For more information see StatusChangeHandler.

Nodes can implement LoggingNode to receive a logger struct that can be used to output logs. The node should use the message context to create logs, this way the logger will automatically attach the message ID as well as the node ID to the message, making debugging easier.

Example (ComplexStream)
package main

import (

	connmock ""

	procmock ""

func main() {
	ctx, killAll := context.WithCancel(context.Background())
	defer killAll()

	logger := newLogger()
	ctrl := gomockCtrl(logger)

	var count int

	node1 := &stream.SourceNode{
		Name:          "generator1",
		Source:        generatorSource(ctrl, logger, "generator1", 10, time.Millisecond*10),
		PipelineTimer: noop.Timer{},
	node2 := &stream.SourceNode{
		Name:          "generator2",
		Source:        generatorSource(ctrl, logger, "generator2", 10, time.Millisecond*10),
		PipelineTimer: noop.Timer{},
	node3 := &stream.FaninNode{Name: "fanin"}
	node4 := &stream.ProcessorNode{
		Name:           "counter",
		Processor:      counterProcessor(ctrl, &count),
		ProcessorTimer: noop.Timer{},
	node5 := &stream.FanoutNode{Name: "fanout"}
	node6 := &stream.DestinationNode{
		Name:           "printer1",
		Destination:    printerDestination(ctrl, logger, "printer1"),
		ConnectorTimer: noop.Timer{},
	node7 := &stream.DestinationNode{
		Name:           "printer2",
		Destination:    printerDestination(ctrl, logger, "printer2"),
		ConnectorTimer: noop.Timer{},
	node8 := &stream.AckerNode{
		Name:        "printer1-acker",
		Destination: node6.Destination,
	node6.AckerNode = node8
	node9 := &stream.AckerNode{
		Name:        "printer2-acker",
		Destination: node7.Destination,
	node7.AckerNode = node9

	// put everything together
	out := node1.Pub()
	out = node2.Pub()

	out = node3.Pub()
	out = node4.Pub()

	out = node5.Pub()
	out = node5.Pub()

	// run nodes
	nodes := []stream.Node{node1, node2, node3, node4, node5, node6, node7, node8, node9}

	var wg sync.WaitGroup
	for _, n := range nodes {
		stream.SetLogger(n, logger)
		go runNode(ctx, &wg, n)

	// stop nodes after 250ms, which should be enough to process the 20 messages
		func() {
	// give the nodes some time to process the records, plus a bit of time to stop
	if waitTimeout(&wg, 1000*time.Millisecond) {
	} else {
		logger.Info(ctx).Msgf("counter node counted %d messages", count)
		logger.Info(ctx).Msg("finished successfully")


func newLogger() log.CtxLogger {
	w := zerolog.NewConsoleWriter()
	w.NoColor = true
	w.PartsExclude = []string{zerolog.TimestampFieldName}

	zlogger := zerolog.New(w)
	zlogger = zlogger.Level(zerolog.DebugLevel)
	logger := log.New(zlogger)
	logger = logger.CtxHook(ctxutil.MessageIDLogCtxHook{})

	return logger

func generatorSource(ctrl *gomock.Controller, logger log.CtxLogger, nodeID string, recordCount int, delay time.Duration) connector.Source {
	position := 0

	stop := make(chan struct{})
	source := connmock.NewSource(ctrl)
	source.EXPECT().Ack(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, p record.Position) error {
		logger.Debug(ctx).Str("node_id", nodeID).Msg("received ack")
		return nil
	source.EXPECT().Read(gomock.Any()).DoAndReturn(func(ctx context.Context) (record.Record, error) {

		if position > recordCount {

			return record.Record{}, plugin.ErrStreamNotOpen

		return record.Record{

			SourceID: "p",
			Position: record.Position(nodeID + "-" + strconv.Itoa(position)),
		}, nil
	}).MinTimes(recordCount + 1)
	source.EXPECT().Stop(gomock.Any()).DoAndReturn(func(context.Context) error {
		return nil
	source.EXPECT().Errors().Return(make(chan error))

	return source

func printerDestination(ctrl *gomock.Controller, logger log.CtxLogger, nodeID string) connector.Destination {
	rchan := make(chan record.Record)
	destination := connmock.NewDestination(ctrl)
	destination.EXPECT().Write(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, r record.Record) error {
			Str("node_id", nodeID).
			Msg("got record")
		rchan <- r
		return nil
	destination.EXPECT().Ack(gomock.Any()).DoAndReturn(func(ctx context.Context) (record.Position, error) {
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case r, ok := <-rchan:
			if !ok {
				return nil, nil
			return r.Position, nil
	destination.EXPECT().Teardown(gomock.Any()).DoAndReturn(func(ctx context.Context) error {
		return nil
	destination.EXPECT().Errors().Return(make(chan error))

	return destination

func counterProcessor(ctrl *gomock.Controller, count *int) processor.Processor {
	proc := procmock.NewProcessor(ctrl)
	proc.EXPECT().Execute(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, r record.Record) (record.Record, error) {
		return r, nil
	return proc

func gomockCtrl(logger log.CtxLogger) *gomock.Controller {
	return gomock.NewController(gomockLogger(logger))

type gomockLogger log.CtxLogger

func (g gomockLogger) Errorf(format string, args ...interface{}) {
	g.Error().Msgf(format, args...)
func (g gomockLogger) Fatalf(format string, args ...interface{}) {
	g.Fatal().Msgf(format, args...)

func runNode(ctx context.Context, wg *sync.WaitGroup, n stream.Node) {
	defer wg.Done()
	err := n.Run(ctx)
	if err != nil {
		fmt.Printf("%s error: %v\n", n.ID(), err)

func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
	c := make(chan struct{})
	go func() {
		defer close(c)
	select {
	case <-c:
		return false
	case <-time.After(timeout):
		return true

DBG got record message_id=p/generator2-1 node_id=printer2
DBG got record message_id=p/generator2-1 node_id=printer1
DBG received ack message_id=p/generator2-1 node_id=generator2
DBG got record message_id=p/generator1-1 node_id=printer1
DBG got record message_id=p/generator1-1 node_id=printer2
DBG received ack message_id=p/generator1-1 node_id=generator1
DBG got record message_id=p/generator2-2 node_id=printer2
DBG got record message_id=p/generator2-2 node_id=printer1
DBG received ack message_id=p/generator2-2 node_id=generator2
DBG got record message_id=p/generator1-2 node_id=printer1
DBG got record message_id=p/generator1-2 node_id=printer2
DBG received ack message_id=p/generator1-2 node_id=generator1
DBG got record message_id=p/generator2-3 node_id=printer2
DBG got record message_id=p/generator2-3 node_id=printer1
DBG received ack message_id=p/generator2-3 node_id=generator2
DBG got record message_id=p/generator1-3 node_id=printer1
DBG got record message_id=p/generator1-3 node_id=printer2
DBG received ack message_id=p/generator1-3 node_id=generator1
DBG got record message_id=p/generator2-4 node_id=printer2
DBG got record message_id=p/generator2-4 node_id=printer1
DBG received ack message_id=p/generator2-4 node_id=generator2
DBG got record message_id=p/generator1-4 node_id=printer2
DBG got record message_id=p/generator1-4 node_id=printer1
DBG received ack message_id=p/generator1-4 node_id=generator1
DBG got record message_id=p/generator2-5 node_id=printer2
DBG got record message_id=p/generator2-5 node_id=printer1
DBG received ack message_id=p/generator2-5 node_id=generator2
DBG got record message_id=p/generator1-5 node_id=printer1
DBG got record message_id=p/generator1-5 node_id=printer2
DBG received ack message_id=p/generator1-5 node_id=generator1
DBG got record message_id=p/generator2-6 node_id=printer2
DBG got record message_id=p/generator2-6 node_id=printer1
DBG received ack message_id=p/generator2-6 node_id=generator2
DBG got record message_id=p/generator1-6 node_id=printer1
DBG got record message_id=p/generator1-6 node_id=printer2
DBG received ack message_id=p/generator1-6 node_id=generator1
DBG got record message_id=p/generator2-7 node_id=printer2
DBG got record message_id=p/generator2-7 node_id=printer1
DBG received ack message_id=p/generator2-7 node_id=generator2
DBG got record message_id=p/generator1-7 node_id=printer1
DBG got record message_id=p/generator1-7 node_id=printer2
DBG received ack message_id=p/generator1-7 node_id=generator1
DBG got record message_id=p/generator2-8 node_id=printer2
DBG got record message_id=p/generator2-8 node_id=printer1
DBG received ack message_id=p/generator2-8 node_id=generator2
DBG got record message_id=p/generator1-8 node_id=printer1
DBG got record message_id=p/generator1-8 node_id=printer2
DBG received ack message_id=p/generator1-8 node_id=generator1
DBG got record message_id=p/generator2-9 node_id=printer1
DBG got record message_id=p/generator2-9 node_id=printer2
DBG received ack message_id=p/generator2-9 node_id=generator2
DBG got record message_id=p/generator1-9 node_id=printer2
DBG got record message_id=p/generator1-9 node_id=printer1
DBG received ack message_id=p/generator1-9 node_id=generator1
DBG got record message_id=p/generator2-10 node_id=printer1
DBG got record message_id=p/generator2-10 node_id=printer2
DBG received ack message_id=p/generator2-10 node_id=generator2
DBG got record message_id=p/generator1-10 node_id=printer2
DBG got record message_id=p/generator1-10 node_id=printer1
DBG received ack message_id=p/generator1-10 node_id=generator1
INF stopping source connector component=SourceNode node_id=generator1
INF stopping source connector component=SourceNode node_id=generator2
DBG received error on error channel error="error reading from source: stream not open" component=SourceNode node_id=generator1
DBG received error on error channel error="error reading from source: stream not open" component=SourceNode node_id=generator2
DBG incoming messages channel closed component=ProcessorNode node_id=counter
DBG incoming messages channel closed component=DestinationNode node_id=printer2
DBG incoming messages channel closed component=DestinationNode node_id=printer1
INF counter node counted 20 messages
INF finished successfully
Example (SimpleStream)
package main

import (

	connmock ""

func main() {
	ctx, killAll := context.WithCancel(context.Background())
	defer killAll()

	logger := newLogger()
	ctrl := gomockCtrl(logger)

	node1 := &stream.SourceNode{
		Name:          "generator",
		Source:        generatorSource(ctrl, logger, "generator", 10, time.Millisecond*10),
		PipelineTimer: noop.Timer{},
	node2 := &stream.DestinationNode{
		Name:           "printer",
		Destination:    printerDestination(ctrl, logger, "printer"),
		ConnectorTimer: noop.Timer{},
	node3 := &stream.AckerNode{
		Name:        "printer-acker",
		Destination: node2.Destination,
	node2.AckerNode = node3

	stream.SetLogger(node1, logger)
	stream.SetLogger(node2, logger)
	stream.SetLogger(node3, logger)

	// put everything together
	out := node1.Pub()

	var wg sync.WaitGroup
	go runNode(ctx, &wg, node3)
	go runNode(ctx, &wg, node2)
	go runNode(ctx, &wg, node1)

	// stop node after 150ms, which should be enough to process the 10 messages
	time.AfterFunc(150*time.Millisecond, func() { node1.Stop(nil) })
	// give the node some time to process the records, plus a bit of time to stop
	if waitTimeout(&wg, 1000*time.Millisecond) {
	} else {
		logger.Info(ctx).Msg("finished successfully")


func newLogger() log.CtxLogger {
	w := zerolog.NewConsoleWriter()
	w.NoColor = true
	w.PartsExclude = []string{zerolog.TimestampFieldName}

	zlogger := zerolog.New(w)
	zlogger = zlogger.Level(zerolog.DebugLevel)
	logger := log.New(zlogger)
	logger = logger.CtxHook(ctxutil.MessageIDLogCtxHook{})

	return logger

func generatorSource(ctrl *gomock.Controller, logger log.CtxLogger, nodeID string, recordCount int, delay time.Duration) connector.Source {
	position := 0

	stop := make(chan struct{})
	source := connmock.NewSource(ctrl)
	source.EXPECT().Ack(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, p record.Position) error {
		logger.Debug(ctx).Str("node_id", nodeID).Msg("received ack")
		return nil
	source.EXPECT().Read(gomock.Any()).DoAndReturn(func(ctx context.Context) (record.Record, error) {

		if position > recordCount {

			return record.Record{}, plugin.ErrStreamNotOpen

		return record.Record{

			SourceID: "p",
			Position: record.Position(nodeID + "-" + strconv.Itoa(position)),
		}, nil
	}).MinTimes(recordCount + 1)
	source.EXPECT().Stop(gomock.Any()).DoAndReturn(func(context.Context) error {
		return nil
	source.EXPECT().Errors().Return(make(chan error))

	return source

func printerDestination(ctrl *gomock.Controller, logger log.CtxLogger, nodeID string) connector.Destination {
	rchan := make(chan record.Record)
	destination := connmock.NewDestination(ctrl)
	destination.EXPECT().Write(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, r record.Record) error {
			Str("node_id", nodeID).
			Msg("got record")
		rchan <- r
		return nil
	destination.EXPECT().Ack(gomock.Any()).DoAndReturn(func(ctx context.Context) (record.Position, error) {
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case r, ok := <-rchan:
			if !ok {
				return nil, nil
			return r.Position, nil
	destination.EXPECT().Teardown(gomock.Any()).DoAndReturn(func(ctx context.Context) error {
		return nil
	destination.EXPECT().Errors().Return(make(chan error))

	return destination

func gomockCtrl(logger log.CtxLogger) *gomock.Controller {
	return gomock.NewController(gomockLogger(logger))

type gomockLogger log.CtxLogger

func (g gomockLogger) Errorf(format string, args ...interface{}) {
	g.Error().Msgf(format, args...)
func (g gomockLogger) Fatalf(format string, args ...interface{}) {
	g.Fatal().Msgf(format, args...)

func runNode(ctx context.Context, wg *sync.WaitGroup, n stream.Node) {
	defer wg.Done()
	err := n.Run(ctx)
	if err != nil {
		fmt.Printf("%s error: %v\n", n.ID(), err)

func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
	c := make(chan struct{})
	go func() {
		defer close(c)
	select {
	case <-c:
		return false
	case <-time.After(timeout):
		return true

DBG got record message_id=p/generator-1 node_id=printer
DBG received ack message_id=p/generator-1 node_id=generator
DBG got record message_id=p/generator-2 node_id=printer
DBG received ack message_id=p/generator-2 node_id=generator
DBG got record message_id=p/generator-3 node_id=printer
DBG received ack message_id=p/generator-3 node_id=generator
DBG got record message_id=p/generator-4 node_id=printer
DBG received ack message_id=p/generator-4 node_id=generator
DBG got record message_id=p/generator-5 node_id=printer
DBG received ack message_id=p/generator-5 node_id=generator
DBG got record message_id=p/generator-6 node_id=printer
DBG received ack message_id=p/generator-6 node_id=generator
DBG got record message_id=p/generator-7 node_id=printer
DBG received ack message_id=p/generator-7 node_id=generator
DBG got record message_id=p/generator-8 node_id=printer
DBG received ack message_id=p/generator-8 node_id=generator
DBG got record message_id=p/generator-9 node_id=printer
DBG received ack message_id=p/generator-9 node_id=generator
DBG got record message_id=p/generator-10 node_id=printer
DBG received ack message_id=p/generator-10 node_id=generator
INF stopping source connector component=SourceNode node_id=generator
DBG received error on error channel error="error reading from source: stream not open" component=SourceNode node_id=generator
DBG incoming messages channel closed component=DestinationNode node_id=printer
INF finished successfully




This section is empty.


View Source
var (
	ErrMessageDropped          = cerrors.New("message is dropped")
	ErrUnexpectedMessageStatus = cerrors.New("unexpected message status")


func SetLogger

func SetLogger(n Node, logger log.CtxLogger)

SetLogger figures out if the node needs a logger, sets static metadata in the logger and supplies it to the node.


type AckHandler

type AckHandler func(*Message) error

AckHandler is a variation of the StatusChangeHandler that is only called when a message is acked. For more info see StatusChangeHandler.

type AckMiddleware

type AckMiddleware func(*Message, AckHandler) error

AckMiddleware is a variation of the StatusChangeMiddleware that is only called when a message is acked. For more info see StatusChangeMiddleware.

type AckerNode added in v0.2.0

type AckerNode struct {
	Name        string
	Destination connector.Destination
	// contains filtered or unexported fields

AckerNode is responsible for handling acknowledgments received from the destination and forwarding them to the correct message.

func (*AckerNode) ExpectAck added in v0.2.0

func (n *AckerNode) ExpectAck(msg *Message) error

ExpectAck makes the handler aware of the message and signals to it that an ack for this message might be received at some point.

func (*AckerNode) ForgetAndDrop added in v0.2.0

func (n *AckerNode) ForgetAndDrop(msg *Message)

ForgetAndDrop signals the handler that an ack for this message won't be received, and it should remove it from its cache. In case an ack for this message wasn't yet received it drops the message, otherwise it does nothing.

func (*AckerNode) ID added in v0.2.0

func (n *AckerNode) ID() string

func (*AckerNode) Run added in v0.2.0

func (n *AckerNode) Run(ctx context.Context) (err error)

Run continuously fetches acks from the destination and forwards them to the correct message by calling Ack or Nack on that message.

func (*AckerNode) SetLogger added in v0.2.0

func (n *AckerNode) SetLogger(logger log.CtxLogger)

SetLogger sets the logger.

func (*AckerNode) Wait added in v0.2.0

func (n *AckerNode) Wait(ctx context.Context)

Wait can be used to wait for the count of outstanding acks to drop to 0 or the context gets canceled. Wait is expected to be the last function called on AckerNode, after Wait returns AckerNode will soon stop running.

type DestinationNode

type DestinationNode struct {
	Name           string
	Destination    connector.Destination
	ConnectorTimer metrics.Timer
	// AckerNode is responsible for handling acks
	AckerNode *AckerNode
	// contains filtered or unexported fields

DestinationNode wraps a Destination connector and implements the Sub node interface

func (*DestinationNode) ID

func (n *DestinationNode) ID() string

func (*DestinationNode) Run

func (n *DestinationNode) Run(ctx context.Context) (err error)

func (*DestinationNode) SetLogger

func (n *DestinationNode) SetLogger(logger log.CtxLogger)

SetLogger sets the logger.

func (*DestinationNode) Sub

func (n *DestinationNode) Sub(in <-chan *Message)

Sub will subscribe this node to an incoming channel.

type DropHandler

type DropHandler func(*Message, error)

DropHandler is a variation of the StatusChangeHandler that is only called when a message is dropped. For more info see StatusChangeHandler.

type DropMiddleware

type DropMiddleware func(*Message, error, DropHandler)

DropMiddleware is a variation of the StatusChangeMiddleware that is only called when a message is dropped. For more info see StatusChangeMiddleware.

type FaninNode

type FaninNode struct {
	Name string
	// contains filtered or unexported fields

func (*FaninNode) ID

func (n *FaninNode) ID() string

func (*FaninNode) Pub

func (n *FaninNode) Pub() <-chan *Message

func (*FaninNode) Run

func (n *FaninNode) Run(ctx context.Context) error

func (*FaninNode) Sub

func (n *FaninNode) Sub(in <-chan *Message)

type FanoutNode

type FanoutNode struct {
	Name string
	// contains filtered or unexported fields

func (*FanoutNode) ID

func (n *FanoutNode) ID() string

func (*FanoutNode) Pub

func (n *FanoutNode) Pub() <-chan *Message

func (*FanoutNode) Run

func (n *FanoutNode) Run(ctx context.Context) error

func (*FanoutNode) Sub

func (n *FanoutNode) Sub(in <-chan *Message)

type LoggingNode

type LoggingNode interface {

	// SetLogger sets the logger used by the node for logging.

LoggingNode is a node which expects a logger.

type Message

type Message struct {
	// Ctx is the context in which the record was fetched. It should be used for
	// any function calls when processing the message. If the context is done
	// the message should be dropped as soon as possible and not processed
	// further.
	Ctx context.Context
	// Record represents a single record attached to the message.
	Record record.Record
	// contains filtered or unexported fields

Message represents a single message flowing through a pipeline.

func (*Message) Ack

func (m *Message) Ack() error

Ack marks the message as acked, calls the corresponding status change handlers and closes the channel returned by Acked. If an ack handler returns an error, the message is dropped instead, which means that registered status change handlers are again notified about the drop and the channel returned by Dropped is closed instead. Calling Ack after the message has already been nacked will panic, while subsequent calls to Ack on an acked or dropped message are a noop and return the same value.

func (*Message) Acked

func (m *Message) Acked() <-chan struct{}

Acked returns a channel that's closed when the message has been acked. Successive calls to Acked return the same value. This function can be used to wait for a message to be acked without notifying the acker.

func (*Message) Clone

func (m *Message) Clone() *Message

Clone returns a cloned message with the same content but separate ack and nack handling.

func (*Message) Drop

func (m *Message) Drop()

Drop marks the message as dropped, calls the registered status change handlers and closes the channel returned by Dropped. Calling Drop after the message has already been acked or nacked will panic, while subsequent calls to Drop on a dropped message are a noop.

func (*Message) Dropped

func (m *Message) Dropped() <-chan struct{}

Dropped returns a channel that's closed when the message has been dropped. Successive calls to Dropped return the same value. This function can be used to wait for a message to be dropped without notifying the dropper.

func (*Message) ID

func (m *Message) ID() string

ID returns a string representing a unique ID of this message. This is meant only for logging purposes.

func (*Message) Nack

func (m *Message) Nack(reason error) error

Nack marks the message as nacked, calls the registered status change handlers and closes the channel returned by Nacked. If no nack handlers were registered or a nack handler returns an error, the message is dropped instead, which means that registered status change handlers are again notified about the drop and the channel returned by Dropped is closed instead. Calling Nack after the message has already been acked will panic, while subsequent calls to Nack on a nacked or dropped message are a noop and return the same value.

func (*Message) Nacked

func (m *Message) Nacked() <-chan struct{}

Nacked returns a channel that's closed when the message has been nacked. Successive calls to Nacked return the same value. This function can be used to wait for a message to be nacked without notifying the nacker.

func (*Message) RegisterAckHandler

func (m *Message) RegisterAckHandler(mw AckMiddleware)

RegisterAckHandler is used to register a function that will be called when the message is acked. This function can only be called if the message status is open, otherwise it panics.

func (*Message) RegisterDropHandler

func (m *Message) RegisterDropHandler(mw DropMiddleware)

RegisterDropHandler is used to register a function that will be called when the message is dropped. This function can only be called if the message status is open, otherwise it panics.

func (*Message) RegisterNackHandler

func (m *Message) RegisterNackHandler(mw NackMiddleware)

RegisterNackHandler is used to register a function that will be called when the message is nacked. This function can only be called if the message status is open, otherwise it panics.

func (*Message) RegisterStatusHandler

func (m *Message) RegisterStatusHandler(mw StatusChangeMiddleware)

RegisterStatusHandler is used to register a function that will be called on any status change of the message. This function can only be called if the message status is open, otherwise it panics. Middlewares are called in the reverse order of how they were registered.

func (*Message) Status

func (m *Message) Status() MessageStatus

Status returns the current message status.

type MessageStatus

type MessageStatus int

MessageStatus represents the state of the message (acked, nacked, dropped or open).

const (
	MessageStatusAcked MessageStatus = iota

func (MessageStatus) String

func (i MessageStatus) String() string

type MetricsNode

type MetricsNode struct {
	Name           string
	BytesHistogram metrics.Histogram
	// contains filtered or unexported fields

func (*MetricsNode) ID

func (n *MetricsNode) ID() string

func (*MetricsNode) Pub

func (n *MetricsNode) Pub() <-chan *Message

func (*MetricsNode) Run

func (n *MetricsNode) Run(ctx context.Context) error

func (*MetricsNode) SetLogger

func (n *MetricsNode) SetLogger(logger log.CtxLogger)

func (*MetricsNode) Sub

func (n *MetricsNode) Sub(in <-chan *Message)

type NackHandler

type NackHandler func(*Message, error) error

NackHandler is a variation of the StatusChangeHandler that is only called when a message is nacked. For more info see StatusChangeHandler.

type NackMiddleware

type NackMiddleware func(*Message, error, NackHandler) error

NackMiddleware is a variation of the StatusChangeMiddleware that is only called when a message is nacked. For more info see StatusChangeMiddleware.

type Node

type Node interface {
	// ID returns the identifier of this Node. Each Node in a pipeline must be
	// uniquely identified by the ID.
	ID() string

	// Run first verifies if the Node is set up correctly and either returns a
	// descriptive error or starts processing messages. Processing should stop
	// as soon as the supplied context is done. If an error occurs while
	// processing messages, the processing should stop and the error should be
	// returned. If processing stopped because the context was canceled, the
	// function should return ctx.Err().
	// Run has different responsibilities, depending on the node type:
	//  * PubNode has to start producing new messages into the outgoing channel.
	//    The context supplied to Run has to be attached to all messages. Each
	//    message will be either acked or nacked by downstream nodes, it's the
	//    responsibility of PubNode to handle these acks/nacks if applicable.
	//    The outgoing channel has to be closed when Run returns, regardless of
	//    the return value.
	//  * SubNode has to start listening to messages sent to the incoming
	//    channel. It has to use the context supplied in the message for calls
	//    to other functions (imagine the message context as a request context).
	//    It is the responsibility of SubNode to ack or nack a message if it's
	//    processed correctly or with an error. If the incoming channel is
	//    closed, then Run should stop and return nil.
	//  * PubSubNode has to start listening to incoming messages, process them
	//    and forward them to the outgoing channel. The node should not ack/nack
	//    forwarded messages. If a message is dropped and not forwarded to the
	//    outgoing channel (i.e. filters), the message should be acked. If an
	//    error is encountered while processing the message, the message has to
	//    be nacked and Run should return with an error. If the incoming channel
	//	  is closed, then Run should stop and return nil. The outgoing channel
	//    has to be closed when Run returns, regardless of the return value.
	//    The incoming message pointers need to be forwarded, as upstream nodes
	//    could be waiting for acks/nacks on that exact pointer. If the node
	//    forwards a new message (not the exact pointer it received), then it
	//    needs to forward any acks/nacks to the original message pointer.
	Run(ctx context.Context) error

Node represents a single node in a pipeline that knows how to process messages flowing through a pipeline.

type ProcessorNode

type ProcessorNode struct {
	Name           string
	Processor      processor.Processor
	ProcessorTimer metrics.Timer
	// contains filtered or unexported fields

func (*ProcessorNode) ID

func (n *ProcessorNode) ID() string

func (*ProcessorNode) Pub

func (n *ProcessorNode) Pub() <-chan *Message

func (*ProcessorNode) Run

func (n *ProcessorNode) Run(ctx context.Context) error

func (*ProcessorNode) SetLogger

func (n *ProcessorNode) SetLogger(logger log.CtxLogger)

func (*ProcessorNode) Sub

func (n *ProcessorNode) Sub(in <-chan *Message)

type PubNode

type PubNode interface {

	// Pub returns the outgoing channel, that can be used to connect downstream
	// nodes to PubNode. It is the responsibility of PubNode to close this
	// channel when it stops running (see Node.Run). Pub needs to be called
	// before running a PubNode, otherwise Node.Run should return an error.
	Pub() <-chan *Message

PubNode represents a node at the start of a pipeline, which pushes new messages to downstream nodes.

type PubSubNode

type PubSubNode interface {

PubSubNode represents a node in the middle of a pipeline, located between two nodes. It listens to incoming messages from the incoming channel, processes them and forwards them to the outgoing channel.

type SourceNode

type SourceNode struct {
	Name          string
	Source        connector.Source
	PipelineTimer metrics.Timer
	// contains filtered or unexported fields

SourceNode wraps a Source connector and implements the Pub node interface

func (*SourceNode) ID

func (n *SourceNode) ID() string

ID returns a properly formatted SourceNode ID prefixed with `source/`

func (*SourceNode) Pub

func (n *SourceNode) Pub() <-chan *Message

func (*SourceNode) Run

func (n *SourceNode) Run(ctx context.Context) (err error)

func (*SourceNode) SetLogger

func (n *SourceNode) SetLogger(logger log.CtxLogger)

func (*SourceNode) Stop

func (n *SourceNode) Stop(reason error)

type StatusChange

type StatusChange struct {
	Old MessageStatus
	New MessageStatus
	// Reason contains the error that triggered the status change in case of a
	// nack or drop.
	Reason error

StatusChange is passed to StatusChangeMiddleware and StatusChangeHandler when the status of a message changes.

type StatusChangeHandler

type StatusChangeHandler func(*Message, StatusChange) error

StatusChangeHandler is executed when a message status changes. The handlers are triggered by a call to either of these functions: Message.Nack, Message.Ack, Message.Drop. These functions will block until the handlers finish handling the message and will return the error returned by the handlers. The function receives the message and the status change describing the old and new message status as well as the reason for the status change in case of a nack or drop.

type StatusChangeMiddleware

type StatusChangeMiddleware func(*Message, StatusChange, StatusChangeHandler) error

StatusChangeMiddleware can be registered on a message and will be executed in case of a status change (see StatusChangeHandler). Middlewares are called in the reverse order of how they were registered. The middleware has two options when processing a message status change:

  • If it successfully processed the status change it should call the next handler and return its error. The handler may inspect the error and act accordingly, but it must return that error (or another error that contains it). It must not return an error if the next handler was called and it returned nil.
  • If it failed to process the status change successfully it must not call the next handler but instead return an error right away.

Applying these rules means each middleware can be sure that all middlewares before it processed the status change successfully.

type StoppableNode

type StoppableNode interface {

	// Stop signals a running StopNode that it should gracefully shutdown. It
	// should stop producing new messages, wait to receive acks/nacks for any
	// in-flight messages, close the outgoing channel and return nil from
	// Node.Run. Stop should return right away, not waiting for the node to
	// actually stop running. If the node is not running the function does not
	// do anything. The reason supplied to Stop will be returned by Node.Run.
	Stop(reason error)

type SubNode

type SubNode interface {

	// Sub sets the incoming channel, that is used to listen to new messages.
	// Node.Run should listen to messages coming from this channel until the
	// channel is closed. Sub needs to be called before running a SubNode,
	// otherwise Node.Run should return an error.
	Sub(in <-chan *Message)

SubNode represents a node at the end of a pipeline, which listens to incoming messages from upstream nodes.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL