Documentation ¶
Overview ¶
Package drain implements a composable message distribution package for Go.
This package is a forked and altered version of the original package from the Docker project, which can be found at: https://github.com/docker/go-events
Index ¶
- Variables
- type BroadcasterSink
- type ChannelSink
- type ExponentialBackoffConfig
- type FilterFn
- type KinesisAPI
- type MapperFn
- type Marshaller
- type RetrySinkStrategy
- type Sink
- func NewFilter[M any](dst Sink[M], matcher FilterFn[M]) Sink[M]
- func NewIOWriter[M any](iow io.Writer, marshaller Marshaller[M]) Sink[M]
- func NewKinesisSink[M any](streamName string, api KinesisAPI, marshaller Marshaller[M], ...) (Sink[M], error)
- func NewMapper[M any](dst Sink[M], mapper MapperFn[M]) Sink[M]
- func NewNop[M any]() Sink[M]
- func NewQueue[M any](dst Sink[M], throughput int, dropHandling WriteErrorFn[M]) Sink[M]
- func NewRetrying[M any](sink Sink[M], strategy RetrySinkStrategy[M], dropHandling WriteErrorFn[M]) Sink[M]
- type WriteErrorFn
- type Writer
Constants ¶
This section is empty.
Variables ¶
var (
	// JSONMarshaller is a simple JSON marshaller function that uses the
	// standard library `encoding/json` package to marshal events.Event messages.
	JSONMarshaller = func(m events.Event) ([]byte, error) {
		return json.Marshal(m)
	}

	// ProtoMarshaller assumes the input message is a `proto.Message` and
	// marshals it using `proto.Marshal()`.
	ProtoMarshaller = func(m events.Event) ([]byte, error) {
		if pb, ok := m.(proto.Message); ok {
			return proto.Marshal(pb)
		}
		return nil, fmt.Errorf("could not marshal (proto) message of type `%T`, not a proto message", m)
	}

	// AnyPBMarshaller assumes the input message is a `proto.Message` and
	// marshals it using the `anypb` package. That is, it wraps the original
	// proto message in an `Any` message, and then marshals the `Any` message.
	AnyPBMarshaller = func(m events.Event) ([]byte, error) {
		if pb, ok := m.(proto.Message); ok {
			anyMsg, err := anypb.New(pb)
			if err != nil {
				return nil, fmt.Errorf("%w: could not marshal (proto+anypb) message", err)
			}
			return proto.Marshal(anyMsg)
		}
		return nil, fmt.Errorf("could not marshal (proto+anypb) message of type `%T`, not a proto message", m)
	}

	// ProtoJSONMarshaller is similar to "AnyPBMarshaller". It assumes that the
	// message is a `proto.Message` instance and marshals it using the
	// `protojson` package.
	ProtoJSONMarshaller = func(m events.Event) ([]byte, error) {
		if pb, ok := m.(proto.Message); ok {
			return protojson.Marshal(pb)
		}
		return nil, fmt.Errorf("could not marshal (proto+json) message of type `%T`, not a proto message", m)
	}
)
List of commonly used marshallers.
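The marshallers above all share one shape: a message goes in, bytes or an error come out. The sketch below shows a custom marshaller in that shape that emits newline-delimited JSON. The local `Marshaller` type is a stand-in whose definition is assumed from the variables above, and `orderPlaced` and `newLineJSON` are hypothetical names used only for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Marshaller is a local stand-in for the package's Marshaller type. Its shape
// (message in, bytes out) is an assumption based on the variables above.
type Marshaller[M any] func(M) ([]byte, error)

// orderPlaced is a hypothetical message type used only in this example.
type orderPlaced struct {
	ID    string `json:"id"`
	Total int    `json:"total"`
}

// newLineJSON returns a marshaller that encodes a message as JSON and appends
// a newline, which suits line-oriented destinations such as log files.
func newLineJSON[M any]() Marshaller[M] {
	return func(m M) ([]byte, error) {
		b, err := json.Marshal(m)
		if err != nil {
			return nil, fmt.Errorf("could not marshal message of type `%T`: %w", m, err)
		}
		return append(b, '\n'), nil
	}
}

func main() {
	marshal := newLineJSON[orderPlaced]()
	b, err := marshal(orderPlaced{ID: "o-1", Total: 42})
	if err != nil {
		panic(err)
	}
	fmt.Print(string(b)) // {"id":"o-1","total":42}
}
```

A function like this could presumably be passed wherever a `Marshaller[M]` is expected, for example to `NewIOWriter`.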
var DefaultExponentialBackoffConfig = ExponentialBackoffConfig{
	Base:   time.Second,
	Factor: time.Second,
	Max:    20 * time.Second,
}
DefaultExponentialBackoffConfig provides a default configuration for exponential backoff.
var (
	// ErrSinkClosed is returned if Writer.Write call is issued to a sink that
	// has been closed. If encountered, the error should be considered terminal
	// and retries will not be successful.
	ErrSinkClosed = fmt.Errorf("sink closed")
)
Functions ¶
This section is empty.
Types ¶
type BroadcasterSink ¶
type BroadcasterSink[M any] interface {
	Sink[M]

	// Add adds the sink to the broadcaster.
	// The provided sink must be comparable with equality. Typically, this just
	// works with a regular pointer type.
	Add(sink Sink[M]) error

	// Remove the provided sink.
	Remove(sink Sink[M]) error
}
BroadcasterSink sends messages to multiple, reliable Sinks. The goal of this component is to dispatch messages to configured endpoints. Reliability can be provided by wrapping incoming sinks.
func NewBroadcaster ¶
func NewBroadcaster[M any](wErrHandler WriteErrorFn[M], to ...Sink[M]) BroadcasterSink[M]
NewBroadcaster creates a broadcaster that dispatches messages to the provided sinks. The broadcaster's behavior is affected by the properties of each sink. Generally, a sink should accept all messages and handle reliability on its own; wrapping sinks with NewQueue and NewRetrying is recommended for this.
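To illustrate the fan-out behavior and why added sinks must be comparable, here is a simplified sketch. It is not the package's implementation: `fanOut`, `sink`, and `memSink` are hypothetical local types, the `Write`/`Close` method set of a sink is assumed, and the `func(M, error)` shape of the error handler is a guess at what `WriteErrorFn` looks like.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// sink is a local stand-in for Sink[M]; the Write/Close method set is assumed.
type sink[M any] interface {
	Write(M) error
	Close() error
}

// fanOut is a hypothetical, simplified broadcaster.
type fanOut[M any] struct {
	mu    sync.Mutex
	sinks []sink[M]
	onErr func(M, error) // assumed shape of WriteErrorFn
}

// Add registers s unless it is already present. The == comparison is why
// sinks must be comparable; pointer-typed sinks satisfy this.
func (f *fanOut[M]) Add(s sink[M]) {
	f.mu.Lock()
	defer f.mu.Unlock()
	for _, have := range f.sinks {
		if have == s {
			return
		}
	}
	f.sinks = append(f.sinks, s)
}

// Remove unregisters s if it is present.
func (f *fanOut[M]) Remove(s sink[M]) {
	f.mu.Lock()
	defer f.mu.Unlock()
	for i, have := range f.sinks {
		if have == s {
			f.sinks = append(f.sinks[:i], f.sinks[i+1:]...)
			return
		}
	}
}

// Write delivers m to every sink. A failure in one sink is reported to onErr
// and does not stop delivery to the remaining sinks.
func (f *fanOut[M]) Write(m M) error {
	f.mu.Lock()
	defer f.mu.Unlock()
	for _, s := range f.sinks {
		if err := s.Write(m); err != nil && f.onErr != nil {
			f.onErr(m, err)
		}
	}
	return nil
}

// memSink records messages in memory; a failing memSink rejects every write.
type memSink struct {
	got  []string
	fail bool
}

func (s *memSink) Write(m string) error {
	if s.fail {
		return errors.New("write failed")
	}
	s.got = append(s.got, m)
	return nil
}

func (s *memSink) Close() error { return nil }

func main() {
	ok, bad := &memSink{}, &memSink{fail: true}
	failures := 0
	b := &fanOut[string]{onErr: func(string, error) { failures++ }}
	b.Add(ok)
	b.Add(ok) // duplicate, ignored
	b.Add(bad)
	_ = b.Write("hello")
	fmt.Println(len(ok.got), failures) // 1 1
}
```

Because one unreliable sink here only triggers the error handler, reliability has to come from the sinks themselves, which matches the advice to wrap them before adding them.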
type ChannelSink ¶
type ChannelSink[M any] interface {
	Sink[M]

	// Done returns a channel that will always proceed once the sink is closed.
	Done() <-chan struct{}

	// Wait returns a channel that unblocks when a new message arrives.
	// Must be called in a separate goroutine from the writer.
	Wait() <-chan M
}
ChannelSink defines a sink that can be listened on. The writer and channel listener must operate in separate goroutines.
Consumers should receive from the channel returned by Wait until the channel returned by Done is closed.
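The consumer side of this contract can be sketched as a `select` loop over `Wait()` and `Done()`. The `chanSink` type below is a hypothetical stand-in with the same method set as ChannelSink, written only so the example runs on its own; the package's real implementation may behave differently, for instance in how buffered messages are handled on close.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errSinkClosed stands in for the package's ErrSinkClosed.
var errSinkClosed = errors.New("sink closed")

// chanSink is a hypothetical stand-in with ChannelSink's method set.
type chanSink[M any] struct {
	c      chan M
	closed chan struct{}
	once   sync.Once
}

func newChanSink[M any](buffer int) *chanSink[M] {
	return &chanSink[M]{c: make(chan M, buffer), closed: make(chan struct{})}
}

// Write hands m to a listener, or fails if the sink has been closed.
func (s *chanSink[M]) Write(m M) error {
	select {
	case <-s.closed:
		return errSinkClosed
	default:
	}
	select {
	case s.c <- m:
		return nil
	case <-s.closed:
		return errSinkClosed
	}
}

// Close marks the sink as closed; it is safe to call more than once.
func (s *chanSink[M]) Close() error {
	s.once.Do(func() { close(s.closed) })
	return nil
}

func (s *chanSink[M]) Done() <-chan struct{} { return s.closed }
func (s *chanSink[M]) Wait() <-chan M        { return s.c }

// consume handles messages until the sink is closed.
func consume[M any](s *chanSink[M], handle func(M)) {
	for {
		select {
		case m := <-s.Wait():
			handle(m)
		case <-s.Done():
			return
		}
	}
}

// run writes three messages from one goroutine while another consumes them.
func run() []string {
	sink := newChanSink[string](0) // unbuffered: each Write waits for the consumer
	finished := make(chan struct{})
	var got []string
	go func() {
		defer close(finished)
		consume(sink, func(m string) { got = append(got, m) })
	}()
	for _, m := range []string{"a", "b", "c"} {
		if err := sink.Write(m); err != nil {
			panic(err)
		}
	}
	sink.Close()
	<-finished
	return got
}

func main() {
	fmt.Println(run()) // [a b c]
}
```

The writer and the listener run in separate goroutines here because an unbuffered write blocks until a listener receives the message.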
func NewChannel ¶
func NewChannel[M any](buffer int) ChannelSink[M]
NewChannel returns a channel sink. If buffer is zero, the underlying channel is unbuffered.
type ExponentialBackoffConfig ¶
type ExponentialBackoffConfig struct {
	// Base is the minimum bound for backing off after failure.
	Base time.Duration

	// Factor sets the amount of time by which the backoff grows with each
	// failure.
	Factor time.Duration

	// Max is the absolute maximum bound for a single backoff.
	Max time.Duration
}
ExponentialBackoffConfig configures backoff parameters.
Note that these parameters operate on the upper bound for choosing a random value. For example, at Base=1s, a random value in [0,1s) will be chosen for the backoff value.
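The relationship between the upper bound and the randomized delay can be sketched as follows. The growth formula (Base plus Factor doubled on each consecutive failure, capped at Max) is borrowed from the upstream docker/go-events project and is only an assumption about this fork, so the exact bounds in this package may differ. `backoffConfig`, `upperBound`, and `jittered` are hypothetical names.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// backoffConfig mirrors the fields of ExponentialBackoffConfig.
type backoffConfig struct {
	Base, Factor, Max time.Duration
}

// defaults mirrors DefaultExponentialBackoffConfig.
var defaults = backoffConfig{Base: time.Second, Factor: time.Second, Max: 20 * time.Second}

// upperBound returns the exclusive upper bound of the backoff after the given
// number of consecutive failures. Assumed formula: Base + Factor*2^(failures-1),
// capped at Max.
func upperBound(cfg backoffConfig, failures uint) time.Duration {
	if failures == 0 {
		return 0 // no failures, no backoff
	}
	if failures > 32 {
		return cfg.Max // guard against shift overflow for long failure streaks
	}
	bound := cfg.Base + cfg.Factor*time.Duration(1<<(failures-1))
	if bound > cfg.Max || bound < 0 {
		bound = cfg.Max
	}
	return bound
}

// jittered picks a uniformly distributed delay in [0, upperBound).
func jittered(cfg backoffConfig, failures uint) time.Duration {
	bound := upperBound(cfg, failures)
	if bound <= 0 {
		return 0
	}
	return time.Duration(rand.Int63n(int64(bound)))
}

func main() {
	for failures := uint(1); failures <= 6; failures++ {
		fmt.Printf("failures=%d bound=%v delay=%v\n",
			failures, upperBound(defaults, failures), jittered(defaults, failures))
	}
}
```

Drawing the delay from the whole range below the bound, rather than sleeping for the bound itself, spreads retries from many writers over time instead of letting them fire together.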
type FilterFn ¶
FilterFn defines a function that filters messages. If the function returns true, the message will be passed to the underlying sink. Otherwise, the message will be silently dropped.
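The pass-or-drop rule can be sketched with a small wrapper. The `func(M) bool` shape of the filter function is assumed, since the type definition is not shown here, and `filter`, `writer`, and `collector` are hypothetical local types rather than the package's own.

```go
package main

import (
	"fmt"
	"strings"
)

// filterFn is a local stand-in for FilterFn; the func(M) bool shape is assumed.
type filterFn[M any] func(M) bool

// writer is a local stand-in for the Write half of a sink.
type writer[M any] interface {
	Write(M) error
}

// filter forwards a message to dst only when matcher returns true.
type filter[M any] struct {
	dst     writer[M]
	matcher filterFn[M]
}

// Write silently drops messages that do not match; a drop is not an error.
func (f filter[M]) Write(m M) error {
	if !f.matcher(m) {
		return nil
	}
	return f.dst.Write(m)
}

// collector records every message it receives.
type collector struct{ got []string }

func (c *collector) Write(m string) error {
	c.got = append(c.got, m)
	return nil
}

func main() {
	dst := &collector{}
	errorsOnly := filter[string]{
		dst:     dst,
		matcher: func(m string) bool { return strings.HasPrefix(m, "ERROR") },
	}
	for _, m := range []string{"INFO started", "ERROR disk full", "DEBUG tick"} {
		_ = errorsOnly.Write(m)
	}
	fmt.Println(dst.got) // [ERROR disk full]
}
```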
type KinesisAPI ¶
type KinesisAPI interface {
PutRecord(ctx context.Context, params *kinesis.PutRecordInput, optFns ...func(*kinesis.Options)) (*kinesis.PutRecordOutput, error)
}
KinesisAPI represents a Kinesis client for sending messages.
type Marshaller ¶
Marshaller converts an input message into a byte stream.
type RetrySinkStrategy ¶
type RetrySinkStrategy[M any] interface {
	// Proceed is called before every message send. If Proceed returns a
	// positive, non-zero duration, the retryer will back off by the provided
	// duration.
	//
	// A message is provided, but may be ignored.
	Proceed(M) time.Duration

	// Failure reports a failure to the strategy. If this method returns true,
	// the message should be dropped.
	Failure(M, error) bool

	// Success should be called when a message is sent successfully.
	Success(M)
}
RetrySinkStrategy defines a strategy for retrying message sink writes.
All methods should be goroutine safe.
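As an illustration of the three methods working together, the sketch below implements a strategy that waits a fixed delay after a failure and gives up after a fixed number of attempts. `maxAttempts` and `send` are hypothetical: `send` is only a guess at how a retrying sink might drive a strategy, and the local `retryStrategy` interface copies the method set documented above.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// errTemporary is a hypothetical transient error used by the example.
var errTemporary = errors.New("temporary failure")

// retryStrategy copies the method set of RetrySinkStrategy.
type retryStrategy[M any] interface {
	Proceed(M) time.Duration
	Failure(M, error) bool
	Success(M)
}

// maxAttempts waits a fixed delay after a failure and asks for the message to
// be dropped once limit consecutive failures have been reported.
type maxAttempts[M any] struct {
	mu       sync.Mutex // guards failures; all methods are goroutine safe
	limit    int
	delay    time.Duration
	failures int
}

func (s *maxAttempts[M]) Proceed(M) time.Duration {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.failures == 0 {
		return 0
	}
	return s.delay
}

func (s *maxAttempts[M]) Failure(M, error) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.failures++
	if s.failures >= s.limit {
		s.failures = 0 // start fresh for the next message
		return true    // drop this message
	}
	return false
}

func (s *maxAttempts[M]) Success(M) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.failures = 0
}

// send sketches how a retrying writer might consult a strategy (assumed flow).
func send[M any](write func(M) error, s retryStrategy[M], m M) error {
	for {
		if d := s.Proceed(m); d > 0 {
			time.Sleep(d)
		}
		err := write(m)
		if err == nil {
			s.Success(m)
			return nil
		}
		if s.Failure(m, err) {
			return err // the strategy asked to drop the message
		}
	}
}

func main() {
	calls := 0
	flaky := func(string) error {
		calls++
		if calls < 3 {
			return errTemporary
		}
		return nil
	}
	s := &maxAttempts[string]{limit: 5, delay: time.Millisecond}
	err := send[string](flaky, s, "hello")
	fmt.Println(calls, err) // 3 <nil>
}
```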
func NewBreakerStrategy ¶
func NewBreakerStrategy[M any](threshold int, backoff time.Duration) RetrySinkStrategy[M]
NewBreakerStrategy returns a breaker that will back off once the failure threshold has been reached. A breaker is safe for concurrent use and may be shared by many goroutines.
func NewExponentialBackoff ¶
func NewExponentialBackoff[M any](config ExponentialBackoffConfig) RetrySinkStrategy[M]
NewExponentialBackoff returns an exponential backoff strategy with the desired config. If config is nil, the default is returned.
type Sink ¶
Sink accepts and sends messages. Once closed, a sink will not accept any more messages.
func NewFilter ¶
NewFilter returns a new filter that forwards to dst only the messages for which the FilterFn returns true.
func NewIOWriter ¶
func NewIOWriter[M any](iow io.Writer, marshaller Marshaller[M]) Sink[M]
NewIOWriter builds a sink that writes messages into the provided io.Writer.
func NewKinesisSink ¶
func NewKinesisSink[M any](
	streamName string,
	api KinesisAPI,
	marshaller Marshaller[M],
	timeout time.Duration,
	onError WriteErrorFn[M],
) (Sink[M], error)
NewKinesisSink builds a new sink that sends messages to a Kinesis Stream.
func NewQueue ¶
func NewQueue[M any](dst Sink[M], throughput int, dropHandling WriteErrorFn[M]) Sink[M]
NewQueue returns a queue Sink with the given throughput that forwards to the provided Sink dst. A nil dropHandling installs a no-op handler.
func NewRetrying ¶
func NewRetrying[M any](sink Sink[M], strategy RetrySinkStrategy[M], dropHandling WriteErrorFn[M]) Sink[M]
NewRetrying returns a sink that retries writes to the wrapped sink, backing off on failure. The strategy parameter controls the backoff and decides when a message is dropped.
type WriteErrorFn ¶
WriteErrorFn defines a function that is invoked each time a message fails to be written to the underlying sink.
type Writer ¶
type Writer[M any] interface {
	// Write writes a message. If no error is returned, the caller can assume
	// that the message has been committed. If an error is received, the caller
	// may retry sending the message.
	Write(M) error
}
Writer defines a component where messages can be written to.
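A minimal sketch of this contract, combined with the closed-sink rule described for ErrSinkClosed, might look like the following. `memorySink` and `shouldRetry` are hypothetical, `errSinkClosed` is a local stand-in for the package variable, and the `Close() error` method is an assumption about the Sink interface.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errSinkClosed stands in for the package's ErrSinkClosed.
var errSinkClosed = errors.New("sink closed")

// memorySink is a hypothetical in-memory sink with Write and Close methods.
type memorySink[M any] struct {
	mu     sync.Mutex
	closed bool
	msgs   []M
}

// Write commits m, or returns errSinkClosed once the sink has been closed.
func (s *memorySink[M]) Write(m M) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		return errSinkClosed
	}
	s.msgs = append(s.msgs, m)
	return nil
}

// Close stops the sink from accepting further messages.
func (s *memorySink[M]) Close() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.closed = true
	return nil
}

// shouldRetry reports whether a failed write is worth retrying. A closed sink
// is terminal, so retrying it cannot succeed.
func shouldRetry(err error) bool {
	return err != nil && !errors.Is(err, errSinkClosed)
}

func main() {
	s := &memorySink[string]{}
	fmt.Println(s.Write("first")) // <nil>
	_ = s.Close()
	err := s.Write("second")
	fmt.Println(err, shouldRetry(err)) // sink closed false
}
```

Using `errors.Is` rather than `==` keeps the check working if a wrapping sink adds context to the error.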