pipeline

package
v0.15.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 13, 2025 License: BSD-3-Clause Imports: 23 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// ShutdownPriorityEngine opinionated value for the engine shutdown
	// priority. Change this if your app needs another value
	ShutdownPriorityEngine = app.ShutdownPriority(100)
	// ShutdownPriorityStream opinionated value for the input stream shutdown
	// priority. Change this if your app needs another value
	ShutdownPriorityStream = app.ShutdownPriority(70)
)

TODO: These could be the default value, but pipeline should allow for configuring these values.

View Source
const (
	EngineTypeRunOnce = "run_once_engine"
)

Variables

View Source
var (
	// ErrShutdown is an error returned when the shutdown doesn't finishes before
	// the given context is closed.
	ErrShutdown = errors.New("shutdown didn't finished as expected")
	// ErrEndpointNil is an error returned when the endpoint is nil.
	ErrEndpointNil = errors.New("endpoint is nil")
	// ErrEmptyInputStreamOrMessagePool is an error returned when the streams and the message pool a nil or empty slices.
	ErrEmptyInputStreamOrMessagePool = errors.New("input streams and message pool are nil or empty")
	//ErrBothInputSet is an error returned if both input stream and message pool are set. Only one is allowed.
	ErrBothInputSet = errors.New("input streams and message pool cannot be set simultaneously")
	// ErrNilDecoders is an error returned when both decoders are nil.
	ErrNilDecoders = errors.New("decoder and batch decoder are both nil")
	// ErrBothDecodersSet is an error returned when both decoders are set. Only one is allowed.
	ErrBothDecodersSet = errors.New("decoder and batch decoder cannot be set simultaneously")
	// ErrBatchSizeInvalid is an error returned when the batch size is less than 1.
	ErrBatchSizeInvalid = errors.New("invalid batch size")
	// ErrSinkNil is an error returned when the sink is not set.
	ErrSinkNil = errors.New("sink is nil")
	// ErrSinkEncoderNil is an error returned when the sink encoder is not set.
	ErrSinkEncoderNil = errors.New("sink encoder is nil")
	// ErrSystemNameEmpty is an error returned when the system name is empty.
	ErrSystemNameEmpty = errors.New("system name is empty")
	// ErrInfiniteBehavior is an error returned when the DLQ is active but max retries is set as infinite.
	// This may cause an infinite loop if the returned service error with wright severity was not SeverityInput type.
	ErrInfiniteBehavior = errors.New("DLQ is active but max retries is set as infinite")
)

Functions

This section is empty.

Types

type Config

type Config struct {
	SystemName  string
	EngineType  string
	InputStream struct {
		Provider string
		Kafka    struct {
			Topic    string
			GroupID  string
			NWorkers int
		}
		BatchSize           int `default:"1"`
		MaxTimeoutMilli     int `default:"0"`
		CommitIntervalMilli int `default:"0"`

		ProcessingTimeoutMilli int `default:"60000"`
		MaxProcessingRetries   int `default:"10"`
		DLQKafkaTopic          string
	}
	MessagePool MessagePoolConfig
	StaleAfter  time.Duration
	Kafka       struct {
		Brokers  string
		Username string
		Password string `secret:"true"`
	}
	Backoffmiddleware struct {
		InitialDelay time.Duration `default:"200ms"`
		MaxDelay     time.Duration `default:"10s"`
		Spread       float64       `default:"0.2"`
		Factor       float64       `default:"1.5"`
		MaxRetries   int           `default:"-1"`
	}
}

Config contains the parameters for a general purpose pipeline. This should be set by the user that is running the pipeline. This goes nicely with app.SetupConfig().

type MessagePoolConfig added in v0.13.0

type MessagePoolConfig struct {
	Provider string
	NWorkers int `default:"1"`
	Pubsub   pubsubqueue.PubsubConfigs
}

MessagePoolConfig contains parameters for configuring a Message Poll Engine. This is already embeded in the Config struct and is used by New function when filled.

type Option

type Option func(*pipelineBuilderOptions)

Option applies an option to the Config struct

func WithBatchDecoder

func WithBatchDecoder(d goduck.EndpointBatchDecoder) Option

WithBatchDecoder adds a batch decoder to the Config struct

func WithBatchProcessor added in v0.15.3

func WithBatchProcessor(p goduck.BatchProcessor) Option

WithBatchProcessor adds a batchProcessor to the config struct

func WithConfig

func WithConfig(userConfig Config) Option

WithConfig takes the Config struct and configures batch size and max timeout of the pipeline and also configures middlewares and a DLQ. This is doing too much and must be replaced by other options.

func WithDecoder

func WithDecoder(d goduck.EndpointDecoder) Option

WithDecoder adds a decoder to the Config struct

func WithEndpoint

func WithEndpoint(e endpoint.Endpoint) Option

WithEndpoint adds an endpoint to the Config struct

func WithInputStreams

func WithInputStreams(s ...goduck.Stream) Option

WithInputStreams adds an input stream to the Config struct

func WithMiddlewares

func WithMiddlewares(
	middlewares ...endpoint.Middleware,
) Option

WithMiddlewares chains the specified middlewares on the transformation and sink layers

func WithProcessor added in v0.15.3

func WithProcessor(p goduck.Processor) Option

WithProcessor adds a processor to the config struct

func WithSink

func WithSink(s Sink, e SinkEncoder) Option

WithSink adds a sink to the Config struct

type Pipeline

type Pipeline interface {
	Run(ctx context.Context) error
	Shutdown(ctx context.Context) error
}

Pipeline is a goduck worker that read messages, proccess them and send them to a sink.

func MustNew

func MustNew(opts ...Option) Pipeline

MustNew returns a new pipeline but panics in case of error.

func New

func New(opts ...Option) (Pipeline, error)

New returns a new pipeline.

type Sink

type Sink interface {
	Store(ctx context.Context, input ...SinkMessage) error
}

Sink stores a batch of messages

func SinkWithRetry

func SinkWithRetry(next Sink) Sink

SinkWithRetry decorates a sink with a retrier. It uses an exponential backoff and tries for 5 times. Only runtime errors are retried.

type SinkEncoder

type SinkEncoder func(ctx context.Context, response interface{}) ([]SinkMessage, error)

SinkEncoder encodes the transformation output into the Sink input type.

type SinkMessage

type SinkMessage interface{}

SinkMessage is the input type to the pipeline sink. This type should be an array

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL