Documentation ¶
Index ¶
- Constants
- Variables
- type Config
- type MessagePoolConfig
- type Option
- func WithBatchDecoder(d goduck.EndpointBatchDecoder) Option
- func WithBatchProcessor(p goduck.BatchProcessor) Option
- func WithConfig(userConfig Config) Option
- func WithDecoder(d goduck.EndpointDecoder) Option
- func WithEndpoint(e endpoint.Endpoint) Option
- func WithInputStreams(s ...goduck.Stream) Option
- func WithMiddlewares(middlewares ...endpoint.Middleware) Option
- func WithProcessor(p goduck.Processor) Option
- func WithSink(s Sink, e SinkEncoder) Option
- type Pipeline
- type Sink
- type SinkEncoder
- type SinkMessage
Constants ¶
const ( // ShutdownPriorityEngine opinionated value for the engine shutdown // priority. Change this if your app needs another value ShutdownPriorityEngine = app.ShutdownPriority(100) // ShutdownPriorityStream opinionated value for the input stream shutdown // priority. Change this if your app needs another value ShutdownPriorityStream = app.ShutdownPriority(70) )
TODO: These could be the default value, but pipeline should allow for configuring these values.
const (
EngineTypeRunOnce = "run_once_engine"
)
Variables ¶
var ( // ErrShutdown is an error returned when the shutdown doesn't finishes before // the given context is closed. ErrShutdown = errors.New("shutdown didn't finished as expected") // ErrEndpointNil is an error returned when the endpoint is nil. ErrEndpointNil = errors.New("endpoint is nil") // ErrEmptyInputStreamOrMessagePool is an error returned when the streams and the message pool a nil or empty slices. ErrEmptyInputStreamOrMessagePool = errors.New("input streams and message pool are nil or empty") //ErrBothInputSet is an error returned if both input stream and message pool are set. Only one is allowed. ErrBothInputSet = errors.New("input streams and message pool cannot be set simultaneously") // ErrNilDecoders is an error returned when both decoders are nil. ErrNilDecoders = errors.New("decoder and batch decoder are both nil") // ErrBothDecodersSet is an error returned when both decoders are set. Only one is allowed. ErrBothDecodersSet = errors.New("decoder and batch decoder cannot be set simultaneously") // ErrBatchSizeInvalid is an error returned when the batch size is less than 1. ErrBatchSizeInvalid = errors.New("invalid batch size") // ErrSinkNil is an error returned when the sink is not set. ErrSinkNil = errors.New("sink is nil") // ErrSinkEncoderNil is an error returned when the sink encoder is not set. ErrSinkEncoderNil = errors.New("sink encoder is nil") // ErrSystemNameEmpty is an error returned when the system name is empty. ErrSystemNameEmpty = errors.New("system name is empty") // ErrInfiniteBehavior is an error returned when the DLQ is active but max retries is set as infinite. // This may cause an infinite loop if the returned service error with wright severity was not SeverityInput type. ErrInfiniteBehavior = errors.New("DLQ is active but max retries is set as infinite") )
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct { SystemName string EngineType string InputStream struct { Provider string Kafka struct { Topic string GroupID string NWorkers int } BatchSize int `default:"1"` MaxTimeoutMilli int `default:"0"` CommitIntervalMilli int `default:"0"` ProcessingTimeoutMilli int `default:"60000"` MaxProcessingRetries int `default:"10"` DLQKafkaTopic string } MessagePool MessagePoolConfig StaleAfter time.Duration Kafka struct { Brokers string Username string Password string `secret:"true"` } Backoffmiddleware struct { InitialDelay time.Duration `default:"200ms"` MaxDelay time.Duration `default:"10s"` Spread float64 `default:"0.2"` Factor float64 `default:"1.5"` MaxRetries int `default:"-1"` } }
Config contains the parameters for a general purpose pipeline. This should be set by the user that is running the pipeline. This goes nicely with app.SetupConfig().
type MessagePoolConfig ¶ added in v0.13.0
type MessagePoolConfig struct { Provider string NWorkers int `default:"1"` Pubsub pubsubqueue.PubsubConfigs }
MessagePoolConfig contains parameters for configuring a Message Poll Engine. This is already embeded in the Config struct and is used by New function when filled.
type Option ¶
type Option func(*pipelineBuilderOptions)
Option applies an option to the Config struct
func WithBatchDecoder ¶
func WithBatchDecoder(d goduck.EndpointBatchDecoder) Option
WithBatchDecoder adds a batch decoder to the Config struct
func WithBatchProcessor ¶ added in v0.15.3
func WithBatchProcessor(p goduck.BatchProcessor) Option
WithBatchProcessor adds a batchProcessor to the config struct
func WithConfig ¶
WithConfig takes the Config struct and configures batch size and max timeout of the pipeline and also configures middlewares and a DLQ. This is doing too much and must be replaced by other options.
func WithDecoder ¶
func WithDecoder(d goduck.EndpointDecoder) Option
WithDecoder adds a decoder to the Config struct
func WithEndpoint ¶
WithEndpoint adds an endpoint to the Config struct
func WithInputStreams ¶
WithInputStreams adds an input stream to the Config struct
func WithMiddlewares ¶
func WithMiddlewares( middlewares ...endpoint.Middleware, ) Option
WithMiddlewares chains the specified middlewares on the transformation and sink layers
func WithProcessor ¶ added in v0.15.3
WithProcessor adds a processor to the config struct
func WithSink ¶
func WithSink(s Sink, e SinkEncoder) Option
WithSink adds a sink to the Config struct
type Pipeline ¶
Pipeline is a goduck worker that read messages, proccess them and send them to a sink.
type Sink ¶
type Sink interface {
Store(ctx context.Context, input ...SinkMessage) error
}
Sink stores a batch of messages
func SinkWithRetry ¶
SinkWithRetry decorates a sink with a retrier. It uses an exponential backoff and tries for 5 times. Only runtime errors are retried.
type SinkEncoder ¶
type SinkEncoder func(ctx context.Context, response interface{}) ([]SinkMessage, error)
SinkEncoder encodes the transformation output into the Sink input type.
type SinkMessage ¶
type SinkMessage interface{}
SinkMessage is the input type to the pipeline sink. This type should be an array