stream

package
v0.12.0-nightly.20241008 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 7, 2024 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Overview

Package stream defines a message and nodes that can be composed into a data pipeline. Nodes are connected with channels which are used to pass messages between them.

We distinguish 3 node types: PubNode, PubSubNode and SubNode. A PubNode is at the start of the pipeline and publishes messages. A PubSubNode sits between two nodes; it receives messages from one node, processes them and sends them to the next one. A SubNode is the last node in the pipeline and only receives messages without sending them to any other node.

A message can have one of these statuses:

Open      The message starts out in an open state and will stay open while
          it's passed around between the nodes.
Acked     Once a node successfully processes the message (e.g. it is sent to
          the destination or is filtered out by a processor) it is acked.
Nacked    If some node fails to process the message it nacks the message. In
          that case a handler can pick it up to send it to a dead letter
          queue.

In other words, once a node receives a message it has 3 options for how to handle it: it can pass it to the next node (message stays open), ack the message and keep running if the ack is successful, or nack the message and keep running if the nack is successful. This means that no message will be left in an open status when the pipeline stops.

Nodes can register functions on the message which will be called when the status of a message changes. For more information see StatusChangeHandler.

Nodes can implement LoggingNode to receive a logger struct that can be used to output logs. The node should use the message context to create logs; this way the logger will automatically attach the message ID as well as the node ID to the log message, making debugging easier.

Example (ComplexStream)
package main

import (
	"context"
	"fmt"
	"strconv"
	"sync"
	"time"

	"github.com/conduitio/conduit-commons/csync"
	"github.com/conduitio/conduit-commons/opencdc"

	sdk "github.com/conduitio/conduit-processor-sdk"
	"github.com/conduitio/conduit/pkg/connector"
	"github.com/conduitio/conduit/pkg/foundation/ctxutil"
	"github.com/conduitio/conduit/pkg/foundation/log"
	"github.com/conduitio/conduit/pkg/foundation/metrics/noop"
	"github.com/conduitio/conduit/pkg/lifecycle/stream"

	streammock "github.com/conduitio/conduit/pkg/lifecycle/stream/mock"

	connectorPlugin "github.com/conduitio/conduit/pkg/plugin/connector"
	"github.com/rs/zerolog"
	"go.uber.org/mock/gomock"
)

// main builds a pipeline with two generator sources that are merged by a
// fan-in node, counted by a processor and duplicated by a fan-out node into
// two printer destinations. All nodes are run concurrently and the two source
// nodes are stopped after 250ms.
func main() {
	ctx, killAll := context.WithCancel(context.Background())
	defer killAll()

	logger := newLogger()
	ctrl := gomockCtrl(logger)

	// count is incremented by the counter processor on every Process call
	// (see counterProcessor) and reported once all nodes have stopped.
	var count int

	// a single DLQ handler node is shared by both source acker nodes
	dlqNode := &stream.DLQHandlerNode{
		Name:                "dlq",
		Handler:             noopDLQHandler(ctrl),
		WindowSize:          1,
		WindowNackThreshold: 0,
	}
	dlqNode.Add(2) // 2 sources
	node1 := &stream.SourceNode{
		Name:          "generator1",
		Source:        generatorSource(ctrl, logger, "generator1", 10, time.Millisecond*10),
		PipelineTimer: noop.Timer{},
	}
	// each acker node is given the same Source as the source node it follows
	node2 := &stream.SourceAckerNode{
		Name:           "generator1-acker",
		Source:         node1.Source,
		DLQHandlerNode: dlqNode,
	}
	node3 := &stream.SourceNode{
		Name:          "generator2",
		Source:        generatorSource(ctrl, logger, "generator2", 10, time.Millisecond*10),
		PipelineTimer: noop.Timer{},
	}
	node4 := &stream.SourceAckerNode{
		Name:           "generator2-acker",
		Source:         node3.Source,
		DLQHandlerNode: dlqNode,
	}
	node5 := &stream.FaninNode{Name: "fanin"}
	node6 := &stream.ProcessorNode{
		Name:           "counter",
		Processor:      counterProcessor(ctrl, &count),
		ProcessorTimer: noop.Timer{},
	}
	node7 := &stream.FanoutNode{Name: "fanout"}
	node8 := &stream.DestinationNode{
		Name:           "printer1",
		Destination:    printerDestination(ctrl, logger, "printer1"),
		ConnectorTimer: noop.Timer{},
	}
	// each destination acker node is given the same Destination as the
	// destination node it follows
	node9 := &stream.DestinationAckerNode{
		Name:        "printer1-acker",
		Destination: node8.Destination,
	}
	node10 := &stream.DestinationNode{
		Name:           "printer2",
		Destination:    printerDestination(ctrl, logger, "printer2"),
		ConnectorTimer: noop.Timer{},
	}
	node11 := &stream.DestinationAckerNode{
		Name:        "printer2-acker",
		Destination: node10.Destination,
	}

	// put everything together
	// this is the pipeline we are building
	// [1] -> [2] -\                       /-> [8] -> [9]
	//              |- [5] -> [6] -> [7] -|
	// [3] -> [4] -/                       \-> [10] -> [11]
	node2.Sub(node1.Pub())
	node4.Sub(node3.Pub())

	// the fan-in node subscribes to both source acker nodes
	node5.Sub(node2.Pub())
	node5.Sub(node4.Pub())

	node6.Sub(node5.Pub())
	node7.Sub(node6.Pub())

	// the fan-out node is asked for an outgoing channel once per destination
	node8.Sub(node7.Pub())
	node10.Sub(node7.Pub())

	node9.Sub(node8.Pub())
	node11.Sub(node10.Pub())

	// run nodes
	nodes := []stream.Node{dlqNode, node1, node2, node3, node4, node5, node6, node7, node8, node9, node10, node11}

	// every node gets the logger and runs in its own goroutine; runNode marks
	// the wait group as done when the node's Run returns
	var wg sync.WaitGroup
	wg.Add(len(nodes))
	for _, n := range nodes {
		stream.SetLogger(n, logger)
		go runNode(ctx, &wg, n)
	}

	// stop nodes after 250ms, which should be enough to process the 20 messages
	// (errors returned by Stop are deliberately discarded)
	time.AfterFunc(
		250*time.Millisecond,
		func() {
			_ = node1.Stop(ctx, nil)
			_ = node3.Stop(ctx, nil)
		},
	)
	// give the nodes some time to process the records, plus a bit of time to stop
	// NOTE(review): presumably WaitTimeout returns a non-nil error if the nodes
	// are still running after one second - confirm against csync.WaitGroup.
	if (*csync.WaitGroup)(&wg).WaitTimeout(ctx, time.Second) != nil {
		killAll()
	} else {
		logger.Info(ctx).Msgf("counter node counted %d messages", count)
		logger.Info(ctx).Msg("finished successfully")
	}

}

// newLogger returns the logger used by the example. It writes debug-level (and
// above) logs through a zerolog console writer that has colors disabled and the
// timestamp part excluded, and it installs ctxutil.MessageIDLogCtxHook on the
// logger.
func newLogger() log.CtxLogger {
	console := zerolog.NewConsoleWriter()
	console.NoColor = true
	console.PartsExclude = []string{zerolog.TimestampFieldName}

	ctxLogger := log.New(zerolog.New(console).Level(zerolog.DebugLevel))
	ctxLogger.Logger = ctxLogger.Hook(ctxutil.MessageIDLogCtxHook{})
	return ctxLogger
}

// generatorSource returns a mocked source that produces recordCount records,
// one per Read call, sleeping for delay at the start of every Read. Record
// positions are the decimal strings "1", "2", ... Once all records have been
// produced, Read blocks until Teardown is called and then returns
// connectorPlugin.ErrStreamNotOpen. Stop reports the position of the last
// produced record. The mock expects exactly recordCount calls to Ack and logs
// each of them with the supplied logger.
func generatorSource(ctrl *gomock.Controller, logger log.CtxLogger, nodeID string, recordCount int, delay time.Duration) stream.Source {
	var produced int
	tornDown := make(chan struct{})

	ack := func(ctx context.Context, _ []opencdc.Position) error {
		logger.Debug(ctx).Str("node_id", nodeID).Msg("received ack")
		return nil
	}
	read := func(context.Context) ([]opencdc.Record, error) {
		time.Sleep(delay)

		if produced != recordCount {
			produced++
			next := opencdc.Record{Position: opencdc.Position(strconv.Itoa(produced))}
			return []opencdc.Record{next}, nil
		}

		// everything was produced already, block until the source is torn down
		<-tornDown
		return nil, connectorPlugin.ErrStreamNotOpen
	}
	stop := func(context.Context) (opencdc.Position, error) {
		return opencdc.Position(strconv.Itoa(produced)), nil
	}

	src := streammock.NewSource(ctrl)
	src.EXPECT().ID().Return(nodeID).AnyTimes()
	src.EXPECT().Open(gomock.Any()).Return(nil)
	src.EXPECT().Teardown(gomock.Any()).DoAndReturn(func(context.Context) error {
		close(tornDown)
		return nil
	})
	src.EXPECT().Ack(gomock.Any(), gomock.Any()).DoAndReturn(ack).Times(recordCount)
	src.EXPECT().Read(gomock.Any()).DoAndReturn(read).MinTimes(recordCount + 1)
	src.EXPECT().Stop(gomock.Any()).DoAndReturn(stop)
	src.EXPECT().Errors().Return(make(chan error))

	return src
}

// printerDestination returns a mocked destination that logs every written
// record with the supplied logger. Written records are handed over to Ack
// through a channel with a buffer of one: every Ack call returns the
// acknowledgment for a single record, returns the context error if the context
// is done first, or returns nothing once Teardown has closed the channel. Stop
// is expected to be called with the position of the last written record.
func printerDestination(ctrl *gomock.Controller, logger log.CtxLogger, nodeID string) stream.Destination {
	var latest opencdc.Position
	pending := make(chan opencdc.Record, 1)

	write := func(ctx context.Context, recs []opencdc.Record) error {
		for i := range recs {
			logger.Debug(ctx).Str("node_id", nodeID).Msg("got record")
			latest = recs[i].Position
			pending <- recs[i]
		}
		return nil
	}
	ack := func(ctx context.Context) ([]connector.DestinationAck, error) {
		select {
		case rec, open := <-pending:
			if open {
				return []connector.DestinationAck{{Position: rec.Position}}, nil
			}
			// channel was closed by Teardown, nothing left to acknowledge
			return nil, nil
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}

	dest := streammock.NewDestination(ctrl)
	dest.EXPECT().Open(gomock.Any()).Return(nil)
	dest.EXPECT().Write(gomock.Any(), gomock.Any()).DoAndReturn(write).AnyTimes()
	dest.EXPECT().Ack(gomock.Any()).DoAndReturn(ack).AnyTimes()
	// the expected position is only known once records were written, so it is
	// resolved lazily when the matcher is evaluated
	dest.EXPECT().Stop(gomock.Any(), EqLazy(func() interface{} { return latest })).Return(nil)
	dest.EXPECT().Teardown(gomock.Any()).DoAndReturn(func(context.Context) error {
		close(pending)
		return nil
	})
	dest.EXPECT().Errors().Return(make(chan error))

	return dest
}

// counterProcessor returns a mocked processor that increments *count on every
// Process call and passes all records through unchanged, each wrapped as an
// sdk.SingleRecord. Open and Teardown are expected without custom behavior.
func counterProcessor(ctrl *gomock.Controller, count *int) stream.Processor {
	process := func(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord {
		*count++

		processed := make([]sdk.ProcessedRecord, 0, len(records))
		for _, rec := range records {
			processed = append(processed, sdk.SingleRecord(rec))
		}
		return processed
	}

	p := streammock.NewProcessor(ctrl)
	p.EXPECT().Open(gomock.Any())
	p.EXPECT().Process(gomock.Any(), gomock.Any()).DoAndReturn(process).AnyTimes()
	p.EXPECT().Teardown(gomock.Any())
	return p
}

// noopDLQHandler returns a mocked DLQ handler that only expects to be opened
// (successfully) and closed. No expectation is registered for Write.
func noopDLQHandler(ctrl *gomock.Controller) stream.DLQHandler {
	dlq := streammock.NewDLQHandler(ctrl)
	dlq.EXPECT().Open(gomock.Any()).Return(nil)
	dlq.EXPECT().Close(gomock.Any())
	return dlq
}

// gomockCtrl creates a gomock controller that reports through the supplied
// logger (wrapped as a gomockLogger).
func gomockCtrl(logger log.CtxLogger) *gomock.Controller {
	reporter := gomockLogger(logger)
	return gomock.NewController(reporter)
}

// gomockLogger wraps a log.CtxLogger so it can be handed to
// gomock.NewController as the reporter: it provides Errorf and Fatalf, which
// forward the formatted message to the logger.
type gomockLogger log.CtxLogger

// Errorf logs the formatted message with the error level.
func (l gomockLogger) Errorf(format string, args ...interface{}) {
	evt := l.Error()
	evt.Msgf(format, args...)
}

// Fatalf logs the formatted message with the fatal level.
// NOTE(review): with zerolog a fatal-level event presumably terminates the
// process after it is written - confirm.
func (l gomockLogger) Fatalf(format string, args ...interface{}) {
	evt := l.Fatal()
	evt.Msgf(format, args...)
}

// runNode runs the node until Run returns and then marks the wait group as
// done. A non-nil error returned by Run is printed to standard output together
// with the node ID.
func runNode(ctx context.Context, wg *sync.WaitGroup, n stream.Node) {
	defer wg.Done()
	if err := n.Run(ctx); err != nil {
		fmt.Printf("%s error: %v\n", n.ID(), err)
	}
}

// EqLazy returns a matcher that behaves like gomock.Eq, except that the
// expected value is not fixed up front: x is called every time the matcher is
// evaluated or printed.
func EqLazy(x func() interface{}) gomock.Matcher {
	return eqMatcherLazy{expected: x}
}

// eqMatcherLazy is the matcher returned by EqLazy.
type eqMatcherLazy struct {
	// expected supplies the value to compare against.
	expected func() interface{}
}

// Matches reports whether actual equals the value currently returned by the
// expected function, using gomock.Eq for the comparison.
func (m eqMatcherLazy) Matches(actual interface{}) bool {
	want := m.expected()
	return gomock.Eq(want).Matches(actual)
}

// String describes the matcher using the value currently returned by the
// expected function.
func (m eqMatcherLazy) String() string {
	want := m.expected()
	return gomock.Eq(want).String()
}
Output:

DBG opening processor component=ProcessorNode node_id=counter
DBG got record message_id=generator2/1 node_id=printer2
DBG got record message_id=generator2/1 node_id=printer1
DBG received ack message_id=generator2/1 node_id=generator2
DBG got record message_id=generator1/1 node_id=printer1
DBG got record message_id=generator1/1 node_id=printer2
DBG received ack message_id=generator1/1 node_id=generator1
DBG got record message_id=generator2/2 node_id=printer2
DBG got record message_id=generator2/2 node_id=printer1
DBG received ack message_id=generator2/2 node_id=generator2
DBG got record message_id=generator1/2 node_id=printer1
DBG got record message_id=generator1/2 node_id=printer2
DBG received ack message_id=generator1/2 node_id=generator1
DBG got record message_id=generator2/3 node_id=printer2
DBG got record message_id=generator2/3 node_id=printer1
DBG received ack message_id=generator2/3 node_id=generator2
DBG got record message_id=generator1/3 node_id=printer1
DBG got record message_id=generator1/3 node_id=printer2
DBG received ack message_id=generator1/3 node_id=generator1
DBG got record message_id=generator2/4 node_id=printer2
DBG got record message_id=generator2/4 node_id=printer1
DBG received ack message_id=generator2/4 node_id=generator2
DBG got record message_id=generator1/4 node_id=printer2
DBG got record message_id=generator1/4 node_id=printer1
DBG received ack message_id=generator1/4 node_id=generator1
DBG got record message_id=generator2/5 node_id=printer2
DBG got record message_id=generator2/5 node_id=printer1
DBG received ack message_id=generator2/5 node_id=generator2
DBG got record message_id=generator1/5 node_id=printer1
DBG got record message_id=generator1/5 node_id=printer2
DBG received ack message_id=generator1/5 node_id=generator1
DBG got record message_id=generator2/6 node_id=printer2
DBG got record message_id=generator2/6 node_id=printer1
DBG received ack message_id=generator2/6 node_id=generator2
DBG got record message_id=generator1/6 node_id=printer1
DBG got record message_id=generator1/6 node_id=printer2
DBG received ack message_id=generator1/6 node_id=generator1
DBG got record message_id=generator2/7 node_id=printer2
DBG got record message_id=generator2/7 node_id=printer1
DBG received ack message_id=generator2/7 node_id=generator2
DBG got record message_id=generator1/7 node_id=printer1
DBG got record message_id=generator1/7 node_id=printer2
DBG received ack message_id=generator1/7 node_id=generator1
DBG got record message_id=generator2/8 node_id=printer2
DBG got record message_id=generator2/8 node_id=printer1
DBG received ack message_id=generator2/8 node_id=generator2
DBG got record message_id=generator1/8 node_id=printer1
DBG got record message_id=generator1/8 node_id=printer2
DBG received ack message_id=generator1/8 node_id=generator1
DBG got record message_id=generator2/9 node_id=printer1
DBG got record message_id=generator2/9 node_id=printer2
DBG received ack message_id=generator2/9 node_id=generator2
DBG got record message_id=generator1/9 node_id=printer2
DBG got record message_id=generator1/9 node_id=printer1
DBG received ack message_id=generator1/9 node_id=generator1
DBG got record message_id=generator2/10 node_id=printer1
DBG got record message_id=generator2/10 node_id=printer2
DBG received ack message_id=generator2/10 node_id=generator2
DBG got record message_id=generator1/10 node_id=printer2
DBG got record message_id=generator1/10 node_id=printer1
DBG received ack message_id=generator1/10 node_id=generator1
INF stopping source connector component=SourceNode node_id=generator1
INF stopping source connector component=SourceNode node_id=generator2
INF stopping source node component=SourceNode node_id=generator1 record_position=10
INF stopping source node component=SourceNode node_id=generator2 record_position=10
DBG incoming messages channel closed component=SourceAckerNode node_id=generator1-acker
DBG incoming messages channel closed component=SourceAckerNode node_id=generator2-acker
DBG incoming messages channel closed component=ProcessorNode node_id=counter
DBG tearing down processor component=ProcessorNode node_id=counter
DBG incoming messages channel closed component=DestinationNode node_id=printer1
DBG incoming messages channel closed component=DestinationNode node_id=printer2
DBG incoming messages channel closed component=DestinationAckerNode node_id=printer1-acker
DBG incoming messages channel closed component=DestinationAckerNode node_id=printer2-acker
INF counter node counted 20 messages
INF finished successfully
Example (SimpleStream)
package main

import (
	"context"
	"fmt"
	"strconv"
	"sync"
	"time"

	"github.com/conduitio/conduit-commons/csync"
	"github.com/conduitio/conduit-commons/opencdc"
	"github.com/conduitio/conduit/pkg/connector"
	"github.com/conduitio/conduit/pkg/foundation/ctxutil"
	"github.com/conduitio/conduit/pkg/foundation/log"
	"github.com/conduitio/conduit/pkg/foundation/metrics/noop"
	"github.com/conduitio/conduit/pkg/lifecycle/stream"

	streammock "github.com/conduitio/conduit/pkg/lifecycle/stream/mock"

	connectorPlugin "github.com/conduitio/conduit/pkg/plugin/connector"
	"github.com/rs/zerolog"
	"go.uber.org/mock/gomock"
)

// main builds and runs a linear pipeline: a generator source, its acker node,
// a printer destination and its acker node, plus the DLQ handler node used by
// the source acker. All nodes are run concurrently and the source node is
// stopped after 150ms.
func main() {
	ctx, killAll := context.WithCancel(context.Background())
	defer killAll()

	logger := newLogger()
	ctrl := gomockCtrl(logger)

	dlqNode := &stream.DLQHandlerNode{
		Name:                "dlq",
		Handler:             noopDLQHandler(ctrl),
		WindowSize:          1,
		WindowNackThreshold: 0,
	}
	dlqNode.Add(1) // 1 source
	node1 := &stream.SourceNode{
		Name:          "generator",
		Source:        generatorSource(ctrl, logger, "generator", 10, time.Millisecond*10),
		PipelineTimer: noop.Timer{},
	}
	// the acker node is given the same Source as the source node it follows
	node2 := &stream.SourceAckerNode{
		Name:           "generator-acker",
		Source:         node1.Source,
		DLQHandlerNode: dlqNode,
	}
	node3 := &stream.DestinationNode{
		Name:           "printer",
		Destination:    printerDestination(ctrl, logger, "printer"),
		ConnectorTimer: noop.Timer{},
	}
	// the acker node is given the same Destination as the destination node it
	// follows
	node4 := &stream.DestinationAckerNode{
		Name:        "printer-acker",
		Destination: node3.Destination,
	}

	// nodes in the order in which they are started: the DLQ handler first, then
	// from the end of the pipeline back to the source
	nodes := []stream.Node{dlqNode, node4, node3, node2, node1}

	// supply the logger to every node, including the DLQ handler node
	for _, n := range nodes {
		stream.SetLogger(n, logger)
	}

	// put everything together
	// this is the pipeline we are building
	// [1] -> [2] -> [3] -> [4]
	node2.Sub(node1.Pub())
	node3.Sub(node2.Pub())
	node4.Sub(node3.Pub())

	// run nodes, each in its own goroutine; runNode marks the wait group as
	// done when the node's Run returns
	var wg sync.WaitGroup
	wg.Add(len(nodes))
	for _, n := range nodes {
		go runNode(ctx, &wg, n)
	}

	// stop node after 150ms, which should be enough to process the 10 messages
	// (the error returned by Stop is deliberately discarded)
	time.AfterFunc(150*time.Millisecond, func() { _ = node1.Stop(ctx, nil) })
	// give the node some time to process the records, plus a bit of time to stop
	if (*csync.WaitGroup)(&wg).WaitTimeout(ctx, time.Second) != nil {
		killAll()
	} else {
		logger.Info(ctx).Msg("finished successfully")
	}
}

// newLogger returns the logger used by the example. It writes debug-level (and
// above) logs through a zerolog console writer that has colors disabled and the
// timestamp part excluded, and it installs ctxutil.MessageIDLogCtxHook on the
// logger.
func newLogger() log.CtxLogger {
	console := zerolog.NewConsoleWriter()
	console.NoColor = true
	console.PartsExclude = []string{zerolog.TimestampFieldName}

	ctxLogger := log.New(zerolog.New(console).Level(zerolog.DebugLevel))
	ctxLogger.Logger = ctxLogger.Hook(ctxutil.MessageIDLogCtxHook{})
	return ctxLogger
}

// generatorSource returns a mocked source that produces recordCount records,
// one per Read call, sleeping for delay at the start of every Read. Record
// positions are the decimal strings "1", "2", ... Once all records have been
// produced, Read blocks until Teardown is called and then returns
// connectorPlugin.ErrStreamNotOpen. Stop reports the position of the last
// produced record. The mock expects exactly recordCount calls to Ack and logs
// each of them with the supplied logger.
func generatorSource(ctrl *gomock.Controller, logger log.CtxLogger, nodeID string, recordCount int, delay time.Duration) stream.Source {
	var produced int
	tornDown := make(chan struct{})

	ack := func(ctx context.Context, _ []opencdc.Position) error {
		logger.Debug(ctx).Str("node_id", nodeID).Msg("received ack")
		return nil
	}
	read := func(context.Context) ([]opencdc.Record, error) {
		time.Sleep(delay)

		if produced != recordCount {
			produced++
			next := opencdc.Record{Position: opencdc.Position(strconv.Itoa(produced))}
			return []opencdc.Record{next}, nil
		}

		// everything was produced already, block until the source is torn down
		<-tornDown
		return nil, connectorPlugin.ErrStreamNotOpen
	}
	stop := func(context.Context) (opencdc.Position, error) {
		return opencdc.Position(strconv.Itoa(produced)), nil
	}

	src := streammock.NewSource(ctrl)
	src.EXPECT().ID().Return(nodeID).AnyTimes()
	src.EXPECT().Open(gomock.Any()).Return(nil)
	src.EXPECT().Teardown(gomock.Any()).DoAndReturn(func(context.Context) error {
		close(tornDown)
		return nil
	})
	src.EXPECT().Ack(gomock.Any(), gomock.Any()).DoAndReturn(ack).Times(recordCount)
	src.EXPECT().Read(gomock.Any()).DoAndReturn(read).MinTimes(recordCount + 1)
	src.EXPECT().Stop(gomock.Any()).DoAndReturn(stop)
	src.EXPECT().Errors().Return(make(chan error))

	return src
}

// printerDestination returns a mocked destination that logs every written
// record with the supplied logger. Written records are handed over to Ack
// through a channel with a buffer of one: every Ack call returns the
// acknowledgment for a single record, returns the context error if the context
// is done first, or returns nothing once Teardown has closed the channel. Stop
// is expected to be called with the position of the last written record.
func printerDestination(ctrl *gomock.Controller, logger log.CtxLogger, nodeID string) stream.Destination {
	var latest opencdc.Position
	pending := make(chan opencdc.Record, 1)

	write := func(ctx context.Context, recs []opencdc.Record) error {
		for i := range recs {
			logger.Debug(ctx).Str("node_id", nodeID).Msg("got record")
			latest = recs[i].Position
			pending <- recs[i]
		}
		return nil
	}
	ack := func(ctx context.Context) ([]connector.DestinationAck, error) {
		select {
		case rec, open := <-pending:
			if open {
				return []connector.DestinationAck{{Position: rec.Position}}, nil
			}
			// channel was closed by Teardown, nothing left to acknowledge
			return nil, nil
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}

	dest := streammock.NewDestination(ctrl)
	dest.EXPECT().Open(gomock.Any()).Return(nil)
	dest.EXPECT().Write(gomock.Any(), gomock.Any()).DoAndReturn(write).AnyTimes()
	dest.EXPECT().Ack(gomock.Any()).DoAndReturn(ack).AnyTimes()
	// the expected position is only known once records were written, so it is
	// resolved lazily when the matcher is evaluated
	dest.EXPECT().Stop(gomock.Any(), EqLazy(func() interface{} { return latest })).Return(nil)
	dest.EXPECT().Teardown(gomock.Any()).DoAndReturn(func(context.Context) error {
		close(pending)
		return nil
	})
	dest.EXPECT().Errors().Return(make(chan error))

	return dest
}

// noopDLQHandler returns a mocked DLQ handler that only expects to be opened
// (successfully) and closed. No expectation is registered for Write.
func noopDLQHandler(ctrl *gomock.Controller) stream.DLQHandler {
	dlq := streammock.NewDLQHandler(ctrl)
	dlq.EXPECT().Open(gomock.Any()).Return(nil)
	dlq.EXPECT().Close(gomock.Any())
	return dlq
}

// gomockCtrl creates a gomock controller that reports through the supplied
// logger (wrapped as a gomockLogger).
func gomockCtrl(logger log.CtxLogger) *gomock.Controller {
	reporter := gomockLogger(logger)
	return gomock.NewController(reporter)
}

// gomockLogger wraps a log.CtxLogger so it can be handed to
// gomock.NewController as the reporter: it provides Errorf and Fatalf, which
// forward the formatted message to the logger.
type gomockLogger log.CtxLogger

// Errorf logs the formatted message with the error level.
func (l gomockLogger) Errorf(format string, args ...interface{}) {
	evt := l.Error()
	evt.Msgf(format, args...)
}

// Fatalf logs the formatted message with the fatal level.
// NOTE(review): with zerolog a fatal-level event presumably terminates the
// process after it is written - confirm.
func (l gomockLogger) Fatalf(format string, args ...interface{}) {
	evt := l.Fatal()
	evt.Msgf(format, args...)
}

// runNode runs the node until Run returns and then marks the wait group as
// done. A non-nil error returned by Run is printed to standard output together
// with the node ID.
func runNode(ctx context.Context, wg *sync.WaitGroup, n stream.Node) {
	defer wg.Done()
	if err := n.Run(ctx); err != nil {
		fmt.Printf("%s error: %v\n", n.ID(), err)
	}
}

// EqLazy returns a matcher that behaves like gomock.Eq, except that the
// expected value is not fixed up front: x is called every time the matcher is
// evaluated or printed.
func EqLazy(x func() interface{}) gomock.Matcher {
	return eqMatcherLazy{expected: x}
}

// eqMatcherLazy is the matcher returned by EqLazy.
type eqMatcherLazy struct {
	// expected supplies the value to compare against.
	expected func() interface{}
}

// Matches reports whether actual equals the value currently returned by the
// expected function, using gomock.Eq for the comparison.
func (m eqMatcherLazy) Matches(actual interface{}) bool {
	want := m.expected()
	return gomock.Eq(want).Matches(actual)
}

// String describes the matcher using the value currently returned by the
// expected function.
func (m eqMatcherLazy) String() string {
	want := m.expected()
	return gomock.Eq(want).String()
}
Output:

DBG got record message_id=generator/1 node_id=printer
DBG received ack message_id=generator/1 node_id=generator
DBG got record message_id=generator/2 node_id=printer
DBG received ack message_id=generator/2 node_id=generator
DBG got record message_id=generator/3 node_id=printer
DBG received ack message_id=generator/3 node_id=generator
DBG got record message_id=generator/4 node_id=printer
DBG received ack message_id=generator/4 node_id=generator
DBG got record message_id=generator/5 node_id=printer
DBG received ack message_id=generator/5 node_id=generator
DBG got record message_id=generator/6 node_id=printer
DBG received ack message_id=generator/6 node_id=generator
DBG got record message_id=generator/7 node_id=printer
DBG received ack message_id=generator/7 node_id=generator
DBG got record message_id=generator/8 node_id=printer
DBG received ack message_id=generator/8 node_id=generator
DBG got record message_id=generator/9 node_id=printer
DBG received ack message_id=generator/9 node_id=generator
DBG got record message_id=generator/10 node_id=printer
DBG received ack message_id=generator/10 node_id=generator
INF stopping source connector component=SourceNode node_id=generator
INF stopping source node component=SourceNode node_id=generator record_position=10
DBG incoming messages channel closed component=SourceAckerNode node_id=generator-acker
DBG incoming messages channel closed component=DestinationNode node_id=printer
DBG incoming messages channel closed component=DestinationAckerNode node_id=printer-acker
INF finished successfully

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrUnexpectedMessageStatus = cerrors.New("unexpected message status")

Functions

func LoggerWithComponent

func LoggerWithComponent(logger log.CtxLogger, v Node) log.CtxLogger

LoggerWithComponent creates a logger with the component set to the node name.

func LoggerWithNodeID

func LoggerWithNodeID(logger log.CtxLogger, n Node) log.CtxLogger

LoggerWithNodeID creates a logger with the node ID field.

func SetLogger

func SetLogger(n Node, logger log.CtxLogger, options ...func(log.CtxLogger, Node) log.CtxLogger)

SetLogger figures out if the node needs a logger, sets static metadata in the logger and supplies it to the node. Behavior can be customized by supplying custom options.

Types

type AckHandler

type AckHandler func(*Message) error

AckHandler is a variation of the StatusChangeHandler that is only called when a message is acked. For more info see StatusChangeHandler.

type ControlMessageType

type ControlMessageType string

ControlMessageType represents the type of a control message.

const (
	ControlMessageStopSourceNode ControlMessageType = "stop-source-node"
)

type DLQHandler

type DLQHandler interface {
	Open(context.Context) error
	Write(context.Context, opencdc.Record) error
	Close(context.Context) error
}

type DLQHandlerNode

type DLQHandlerNode struct {
	Name    string
	Handler DLQHandler

	WindowSize          int
	WindowNackThreshold int

	Timer     metrics.Timer
	Histogram metrics.RecordBytesHistogram
	// contains filtered or unexported fields
}

DLQHandlerNode is called by each SourceAckerNode in a pipeline to store nacked messages in a dead-letter-queue.

func (*DLQHandlerNode) Ack

func (n *DLQHandlerNode) Ack(msg *Message)

func (*DLQHandlerNode) Add

func (n *DLQHandlerNode) Add(delta int)

Add should be called before Run to increase the counter of components depending on DLQHandlerNode. The node will keep running until the counter reaches 0 (see Done).

Note that a call to Add with a positive delta should happen before Run.

func (*DLQHandlerNode) Done

func (n *DLQHandlerNode) Done()

Done should be called before a component depending on DLQHandlerNode stops running and guarantees there will be no more calls to DLQHandlerNode.

func (*DLQHandlerNode) ForceStop

func (n *DLQHandlerNode) ForceStop(ctx context.Context)

func (*DLQHandlerNode) ID

func (n *DLQHandlerNode) ID() string

func (*DLQHandlerNode) Nack

func (n *DLQHandlerNode) Nack(msg *Message, nackMetadata NackMetadata) error

func (*DLQHandlerNode) Run

func (n *DLQHandlerNode) Run(ctx context.Context) error

Run runs the DLQ handler node until all components depending on this node call Done. Dependents can be added or removed while the node is running.

func (*DLQHandlerNode) SetLogger

func (n *DLQHandlerNode) SetLogger(logger log.CtxLogger)

type Destination

type Destination interface {
	ID() string
	Open(context.Context) error
	Write(context.Context, []opencdc.Record) error
	Ack(context.Context) ([]connector.DestinationAck, error)
	Stop(context.Context, opencdc.Position) error
	Teardown(context.Context) error
	Errors() <-chan error
}

type DestinationAckerNode

type DestinationAckerNode struct {
	Name        string
	Destination Destination
	// contains filtered or unexported fields
}

DestinationAckerNode is responsible for handling acknowledgments received from the destination and forwarding them to the correct message.

func (*DestinationAckerNode) ForceStop

func (n *DestinationAckerNode) ForceStop(ctx context.Context)

func (*DestinationAckerNode) ID

func (n *DestinationAckerNode) ID() string

func (*DestinationAckerNode) Run

func (n *DestinationAckerNode) Run(ctx context.Context) (err error)

func (*DestinationAckerNode) SetLogger

func (n *DestinationAckerNode) SetLogger(logger log.CtxLogger)

SetLogger sets the logger.

func (*DestinationAckerNode) Sub

func (n *DestinationAckerNode) Sub(in <-chan *Message)

Sub will subscribe this node to an incoming channel.

type DestinationNode

type DestinationNode struct {
	Name           string
	Destination    Destination
	ConnectorTimer metrics.Timer
	// contains filtered or unexported fields
}

DestinationNode wraps a Destination connector and implements the Sub node interface.

func (*DestinationNode) ForceStop

func (n *DestinationNode) ForceStop(ctx context.Context)

func (*DestinationNode) ID

func (n *DestinationNode) ID() string

func (*DestinationNode) Pub

func (n *DestinationNode) Pub() <-chan *Message

Pub will return the outgoing channel.

func (*DestinationNode) Run

func (n *DestinationNode) Run(ctx context.Context) (err error)

func (*DestinationNode) SetLogger

func (n *DestinationNode) SetLogger(logger log.CtxLogger)

SetLogger sets the logger.

func (*DestinationNode) Sub

func (n *DestinationNode) Sub(in <-chan *Message)

Sub will subscribe this node to an incoming channel.

type FaninNode

type FaninNode struct {
	Name string
	// contains filtered or unexported fields
}

func (*FaninNode) ID

func (n *FaninNode) ID() string

func (*FaninNode) Pub

func (n *FaninNode) Pub() <-chan *Message

func (*FaninNode) Run

func (n *FaninNode) Run(ctx context.Context) error

func (*FaninNode) Sub

func (n *FaninNode) Sub(in <-chan *Message)

type FanoutNode

type FanoutNode struct {
	Name string
	// contains filtered or unexported fields
}

func (*FanoutNode) ID

func (n *FanoutNode) ID() string

func (*FanoutNode) Pub

func (n *FanoutNode) Pub() <-chan *Message

func (*FanoutNode) Run

func (n *FanoutNode) Run(ctx context.Context) error

func (*FanoutNode) Sub

func (n *FanoutNode) Sub(in <-chan *Message)

type ForceStoppableNode

type ForceStoppableNode interface {
	Node

	// ForceStop signals a running ForceStoppableNode that it should stop
	// running as soon as it can, even if that means that it doesn't properly
	// clean up after itself. This method is a last resort in case something
	// goes wrong and the pipeline gets stuck (e.g. a connector plugin is not
	// responding).
	ForceStop(ctx context.Context)
}

type LoggingNode

type LoggingNode interface {
	Node

	// SetLogger sets the logger used by the node for logging.
	SetLogger(log.CtxLogger)
}

LoggingNode is a node which expects a logger.

type Message

type Message struct {
	// Ctx is the context in which the record was fetched. It should be used for
	// any function calls when processing the message. If the context is done
	// the message should be nacked as soon as possible and not processed
	// further.
	Ctx context.Context
	// Record represents a single record attached to the message.
	Record opencdc.Record

	// SourceID contains the source connector ID.
	SourceID string
	// contains filtered or unexported fields
}

Message represents a single message flowing through a pipeline. Only a single node is allowed to hold a message and access its fields at a specific point in time, otherwise we could introduce race conditions.

func (*Message) Ack

func (m *Message) Ack() error

Ack marks the message as acked, calls the corresponding status change handlers and closes the channel returned by Acked. Errors from ack handlers get collected and returned as a single error. If Ack returns an error, the caller node should stop processing new messages and return the error. Calling Ack after the message has already been nacked will panic, while subsequent calls to Ack on an acked message are a noop and return the same value.

func (*Message) Acked

func (m *Message) Acked() <-chan struct{}

Acked returns a channel that's closed when the message has been acked. Successive calls to Acked return the same value. This function can be used to wait for a message to be acked without notifying the acker.

func (*Message) Clone

func (m *Message) Clone() *Message

Clone returns a cloned message with the same content but separate ack and nack handling.

func (*Message) ControlMessageType

func (m *Message) ControlMessageType() ControlMessageType

func (*Message) ID

func (m *Message) ID() string

ID returns a string representing a unique ID of this message. This is meant only for logging purposes.

func (*Message) Nack

func (m *Message) Nack(reason error, nodeID string) error

Nack marks the message as nacked, calls the registered status change handlers and closes the channel returned by Nacked. If no nack handlers were registered Nack will return an error. Errors from nack handlers get collected and returned as a single error. If Nack returns an error, the caller node should stop processing new messages and return the error. Calling Nack after the message has already been acked will panic, while subsequent calls to Nack on a nacked message are a noop and return the same value.

func (*Message) Nacked

func (m *Message) Nacked() <-chan struct{}

Nacked returns a channel that's closed when the message has been nacked. Successive calls to Nacked return the same value. This function can be used to wait for a message to be nacked without notifying the nacker.

func (*Message) RegisterAckHandler

func (m *Message) RegisterAckHandler(h AckHandler)

RegisterAckHandler is used to register a function that will be called when the message is acked. This function can only be called if the message status is open, otherwise it panics.

func (*Message) RegisterNackHandler

func (m *Message) RegisterNackHandler(h NackHandler)

RegisterNackHandler is used to register a function that will be called when the message is nacked. This function can only be called if the message status is open, otherwise it panics.

func (*Message) RegisterStatusHandler

func (m *Message) RegisterStatusHandler(h StatusChangeHandler)

RegisterStatusHandler is used to register a function that will be called on any status change of the message. This function can only be called if the message status is open, otherwise it panics. Handlers are called in the reverse order of how they were registered.

func (*Message) Status

func (m *Message) Status() MessageStatus

Status returns the current message status.

func (*Message) StatusError

func (m *Message) StatusError() error

StatusError returns the error that was returned when the message was acked or nacked. If the message was successfully acked/nacked or it is still open the method returns nil.

type MessageStatus

type MessageStatus int

MessageStatus represents the state of the message (acked, nacked or open).

// Possible values of MessageStatus. Note that the values are assigned with
// iota starting at MessageStatusAcked, so the zero value of MessageStatus is
// MessageStatusAcked, not MessageStatusOpen.
const (
	// MessageStatusAcked means a node successfully processed the message
	// (e.g. it was sent to the destination or filtered out by a processor).
	MessageStatusAcked MessageStatus = iota
	// MessageStatusNacked means some node failed to process the message.
	MessageStatusNacked
	// MessageStatusOpen is the status a message starts out with and keeps
	// while it is passed around between nodes.
	MessageStatusOpen
)

func (MessageStatus) String

func (i MessageStatus) String() string

type MetricsNode

type MetricsNode struct {
	Name      string
	Histogram metrics.RecordBytesHistogram
	// contains filtered or unexported fields
}

func (*MetricsNode) ID

func (n *MetricsNode) ID() string

func (*MetricsNode) Pub

func (n *MetricsNode) Pub() <-chan *Message

func (*MetricsNode) Run

func (n *MetricsNode) Run(ctx context.Context) error

func (*MetricsNode) SetLogger

func (n *MetricsNode) SetLogger(logger log.CtxLogger)

func (*MetricsNode) Sub

func (n *MetricsNode) Sub(in <-chan *Message)

type NackHandler

type NackHandler func(*Message, NackMetadata) error

NackHandler is a variation of the StatusChangeHandler that is only called when a message is nacked. For more info see StatusChangeHandler.

type NackMetadata

// NackMetadata carries the details of a nack. It is passed to NackHandler
// and embedded in StatusChange.
type NackMetadata struct {
	// Reason contains the error that triggered the nack.
	Reason error
	// NodeID is presumably the ID of the node that nacked the message (cf. the
	// nodeID parameter of Message.Nack) — NOTE(review): confirm.
	NodeID string
}

type Node

type Node interface {
	// ID returns the identifier of this Node. Each Node in a pipeline must be
	// uniquely identified by the ID.
	ID() string

	// Run first verifies if the Node is set up correctly and either returns a
	// descriptive error or starts processing messages. Processing should stop
	// as soon as the supplied context is done. If an error occurs while
	// processing messages, the processing should stop and the error should be
	// returned. If processing stopped because the context was canceled, the
	// function should return ctx.Err(). All nodes that are part of the same
	// pipeline will receive the same context in Run and as soon as one node
	// returns an error the context will be canceled.
	// Run has different responsibilities, depending on the node type:
	//  * PubNode has to start producing new messages into the outgoing channel.
	//    The context supplied to Run has to be attached to all messages. Each
	//    message will be either acked or nacked by downstream nodes, it's the
	//    responsibility of PubNode to handle these acks/nacks if applicable.
	//    The outgoing channel has to be closed when Run returns, regardless of
	//    the return value.
	//  * SubNode has to start listening to messages sent to the incoming
	//    channel. It has to use the context supplied in the message for calls
	//    to other functions (imagine the message context as a request context).
	//    It is the responsibility of SubNode to ack or nack a message if it's
	//    processed correctly or with an error. If the incoming channel is
	//    closed, then Run should stop and return nil.
	//  * PubSubNode has to start listening to incoming messages, process them
	//    and forward them to the outgoing channel. The node should not ack/nack
	//    forwarded messages. If a message is dropped and not forwarded to the
	//    outgoing channel (i.e. filters), the message should be acked. If an
	//    error is encountered while processing the message, the message has to
	//    be nacked and Run should return with an error. If the incoming channel
	//    is closed, then Run should stop and return nil. The outgoing channel
	//    has to be closed when Run returns, regardless of the return value.
	//    The incoming message pointers need to be forwarded, as upstream nodes
	//    could be waiting for acks/nacks on that exact pointer. If the node
	//    forwards a new message (not the exact pointer it received), then it
	//    needs to forward any acks/nacks to the original message pointer.
	Run(ctx context.Context) error
}

Node represents a single node in a pipeline that knows how to process messages flowing through a pipeline.

type OpenMessagesTracker

type OpenMessagesTracker sync.WaitGroup

OpenMessagesTracker allows you to track messages until they reach the end of the pipeline.

func (*OpenMessagesTracker) Add

func (t *OpenMessagesTracker) Add(msg *Message)

Add will increase the counter in the wait group and register a status handler that will decrease the counter when the message is acked or nacked.

func (*OpenMessagesTracker) Wait

func (t *OpenMessagesTracker) Wait()

type ParallelNode

type ParallelNode struct {
	Name string
	// NewNode is the constructor of the wrapped PubSubNode, it should create
	// the i-th node (useful for distinguishing nodes in logs).
	NewNode func(i int) PubSubNode
	Workers int
	// contains filtered or unexported fields
}

ParallelNode wraps a PubSubNode and parallelizes it by running multiple instances of the node in separate worker goroutines. It also spawns a coordinator goroutine that is responsible for collecting the results and forwarding them to the next node down the line while maintaining the order of messages.

func (*ParallelNode) ID

func (n *ParallelNode) ID() string

func (*ParallelNode) Pub

func (n *ParallelNode) Pub() <-chan *Message

func (*ParallelNode) Run

func (n *ParallelNode) Run(ctx context.Context) error

Run continuously fetches messages from the incoming channel (i.e. from the previous node) and submits jobs to the job channel shared by all worker goroutines. Once a worker picks up the job, the same job is also sent to the coordinator which starts waiting for the job to be done. The worker is responsible for forwarding the message to the wrapped PubSubNode, waiting for it to process the message and then marking the job as done. Once the jobs are done the results are picked up by the coordinator which decides if the message should be sent to the next node, if it was filtered out or if an error happened and the node needs to stop running. The coordinator collects job results in the same order as the order of the dispatched jobs, so the order of messages is maintained.

func (*ParallelNode) SetLogger

func (n *ParallelNode) SetLogger(logger log.CtxLogger)

func (*ParallelNode) Sub

func (n *ParallelNode) Sub(in <-chan *Message)

type Processor

// Processor describes a processor plugin that can be opened, used to process
// batches of records and torn down. It is the type of the Processor field in
// ProcessorNode.
type Processor interface {
	// Open configures and opens a processor plugin.
	Open(ctx context.Context) error
	// Process takes a batch of records and returns the processed records.
	// NOTE(review): presumably one sdk.ProcessedRecord is returned per input
	// record, in the same order — confirm against the implementations.
	Process(context.Context, []opencdc.Record) []sdk.ProcessedRecord
	// Teardown tears down a processor plugin.
	// In case of standalone plugins, that means stopping the WASM module.
	Teardown(context.Context) error
}

type ProcessorNode

type ProcessorNode struct {
	Name           string
	Processor      Processor
	ProcessorTimer metrics.Timer
	// contains filtered or unexported fields
}

func (*ProcessorNode) ID

func (n *ProcessorNode) ID() string

func (*ProcessorNode) Pub

func (n *ProcessorNode) Pub() <-chan *Message

func (*ProcessorNode) Run

func (n *ProcessorNode) Run(ctx context.Context) error

func (*ProcessorNode) SetLogger

func (n *ProcessorNode) SetLogger(logger log.CtxLogger)

func (*ProcessorNode) Sub

func (n *ProcessorNode) Sub(in <-chan *Message)

type PubNode

type PubNode interface {
	Node

	// Pub returns the outgoing channel, that can be used to connect downstream
	// nodes to PubNode. It is the responsibility of PubNode to close this
	// channel when it stops running (see Node.Run). Pub needs to be called
	// before running a PubNode, otherwise Node.Run should return an error.
	Pub() <-chan *Message
}

PubNode represents a node at the start of a pipeline, which pushes new messages to downstream nodes.

type PubSubNode

type PubSubNode interface {
	PubNode
	SubNode
}

PubSubNode represents a node in the middle of a pipeline, located between two nodes. It listens to incoming messages from the incoming channel, processes them and forwards them to the outgoing channel.

type Source

// Source is the interface of the source connector used by SourceNode and
// SourceAckerNode (both hold it in their Source field).
type Source interface {
	// ID returns a string identifier of the source.
	ID() string
	// Open opens the source. NOTE(review): presumably has to be called before
	// Read — confirm against the implementations.
	Open(context.Context) error
	// Read returns a batch of records or an error.
	Read(context.Context) ([]opencdc.Record, error)
	// Ack forwards acknowledgments for the supplied positions to the source.
	Ack(context.Context, []opencdc.Position) error
	// Stop returns a position and an error. NOTE(review): presumably this is
	// the last position the source will produce before it stops — confirm.
	Stop(context.Context) (opencdc.Position, error)
	// Teardown tears down the source. NOTE(review): presumably called once the
	// source is no longer needed — confirm.
	Teardown(context.Context) error
	// Errors returns a receive-only channel of errors. NOTE(review):
	// presumably used for errors that occur outside of the calls above —
	// confirm.
	Errors() <-chan error
}

type SourceAckerNode

type SourceAckerNode struct {
	Name           string
	Source         Source
	DLQHandlerNode *DLQHandlerNode
	// contains filtered or unexported fields
}

SourceAckerNode is responsible for handling acknowledgments for messages of a specific source and forwarding them to the source in the correct order.

func (*SourceAckerNode) ID

func (n *SourceAckerNode) ID() string

func (*SourceAckerNode) Pub

func (n *SourceAckerNode) Pub() <-chan *Message

func (*SourceAckerNode) Run

func (n *SourceAckerNode) Run(ctx context.Context) error

func (*SourceAckerNode) SetLogger

func (n *SourceAckerNode) SetLogger(logger log.CtxLogger)

func (*SourceAckerNode) Sub

func (n *SourceAckerNode) Sub(in <-chan *Message)

type SourceNode

type SourceNode struct {
	Name          string
	Source        Source
	PipelineTimer metrics.Timer
	// contains filtered or unexported fields
}

SourceNode wraps a Source connector and implements the PubNode interface.

func (*SourceNode) ForceStop

func (n *SourceNode) ForceStop(ctx context.Context)

func (*SourceNode) ID

func (n *SourceNode) ID() string

ID returns a properly formatted SourceNode ID prefixed with `source/`.

func (*SourceNode) Pub

func (n *SourceNode) Pub() <-chan *Message

func (*SourceNode) Run

func (n *SourceNode) Run(ctx context.Context) (err error)

func (*SourceNode) SetLogger

func (n *SourceNode) SetLogger(logger log.CtxLogger)

func (*SourceNode) Stop

func (n *SourceNode) Stop(ctx context.Context, reason error) error

type StatusChange

type StatusChange struct {
	// Old is the message status before the change.
	Old MessageStatus
	// New is the message status after the change.
	New MessageStatus
	// NackMetadata contains the metadata of the nack (including the error
	// that triggered the status change) in case of a nack.
	NackMetadata NackMetadata
}

StatusChange is passed to StatusChangeHandler when the status of a message changes.

type StatusChangeHandler

type StatusChangeHandler func(*Message, StatusChange) error

StatusChangeHandler is executed when a message status changes. The handlers are triggered by a call to either of these functions: Message.Nack, Message.Ack. These functions will block until the handlers finish handling the message and will return the error returned by the handlers. The function receives the message and the status change describing the old and new message status as well as the reason for the status change in case of a nack.

type StoppableNode

// StoppableNode is a Node that can be asked to shut down gracefully through
// Stop.
type StoppableNode interface {
	Node

	// Stop signals a running StoppableNode that it should gracefully shutdown.
	// It should stop producing new messages, wait to receive acks/nacks for any
	// in-flight messages, close the outgoing channel and return from Node.Run.
	// Stop should return right away, not waiting for the node to actually stop
	// running. If the node is not running the function does not do anything.
	// The reason supplied to Stop will be returned by Node.Run.
	Stop(ctx context.Context, reason error) error
}

type SubNode

type SubNode interface {
	Node

	// Sub sets the incoming channel, that is used to listen to new messages.
	// Node.Run should listen to messages coming from this channel until the
	// channel is closed. Sub needs to be called before running a SubNode,
	// otherwise Node.Run should return an error.
	Sub(in <-chan *Message)
}

SubNode represents a node at the end of a pipeline, which listens to incoming messages from upstream nodes.

Directories

Path Synopsis
Package mock is a generated GoMock package.
Package mock is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL