Documentation ¶
Index ¶
- Variables
- func NewKSUID() (string, error)
- func NewUUID() (string, error)
- type Bus
- type DeduplicationStorage
- type EmbeddedDeduplicationStorage
- type EmbeddedDeduplicationStorageConfig
- type ErrUnrecoverableWrap
- type Event
- type EventRegistry
- type IdentifierFactory
- type Message
- type NoopWriter
- type Publisher
- type PublisherConfig
- type PublisherOption
- type ReadTask
- type Reader
- type ReaderHandleFunc
- type ReaderMiddlewareFunc
- type SubscriberScheduler
- func (r *SubscriberScheduler) Shutdown() error
- func (r *SubscriberScheduler) Start() error
- func (r *SubscriberScheduler) Subscribe(topic string, event Event, handler ReaderHandleFunc) *ReadTask
- func (r *SubscriberScheduler) SubscribeEvent(event Event, handler ReaderHandleFunc) *ReadTask
- func (r *SubscriberScheduler) SubscribeEventSafe(event Event, handler ReaderHandleFunc) (*ReadTask, error)
- func (r *SubscriberScheduler) SubscribeTopic(topic string, handler ReaderHandleFunc) *ReadTask
- type Writer
Constants ¶
This section is empty.
Variables ¶
var (
	ErrBusIsShutdown          = errors.New("streams: bus has been terminated")
	ErrEmptyMessage           = errors.New("streams: message is empty")
	ErrUnrecoverable          = errors.New("streams: unrecoverable error")
	ErrEventNotFound          = errors.New("streams: event not found")
	ErrNoSubscriberRegistered = errors.New("streams: subscriber scheduler has no subscriber tasks")
)
Functions ¶
Types ¶
type Bus ¶
type Bus struct {
	Publisher
	SubscriberScheduler
	EventRegistry
}
A Bus is the top-level component used by systems to interact with data-in-motion platforms. If finer-grained control is needed, use the lower-level components such as Writer or Reader.
type DeduplicationStorage ¶
type DeduplicationStorage interface {
	// Commit registers and acknowledges a message has been processed correctly.
	Commit(ctx context.Context, workerID, messageID string)
	// IsDuplicated indicates if a message has been processed before.
	IsDuplicated(ctx context.Context, workerID, messageID string) (bool, error)
}
A DeduplicationStorage is a special kind of storage used by a data-in-motion system to keep track of duplicate messages.
It is recommended to use in-memory (or at least external) storage to significantly increase performance and reduce backpressure on the main database.
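The contract above can be illustrated with a minimal map-backed implementation. This is a sketch only: `mapStorage`, `newMapStorage`, `key`, and `checkTwice` are hypothetical names, and the `#` key delimiter is an assumption rather than the package's behavior.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// DeduplicationStorage mirrors the interface documented above.
type DeduplicationStorage interface {
	Commit(ctx context.Context, workerID, messageID string)
	IsDuplicated(ctx context.Context, workerID, messageID string) (bool, error)
}

// mapStorage is a hypothetical, illustration-only implementation backed by a map.
type mapStorage struct {
	mu   sync.RWMutex
	seen map[string]struct{}
}

func newMapStorage() *mapStorage {
	return &mapStorage{seen: make(map[string]struct{})}
}

// key scopes a message ID to a worker so isolated workers do not collide.
// The "#" delimiter is an assumption made for this sketch.
func key(workerID, messageID string) string {
	return workerID + "#" + messageID
}

// Commit records the message as processed for the given worker.
func (s *mapStorage) Commit(_ context.Context, workerID, messageID string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.seen[key(workerID, messageID)] = struct{}{}
}

// IsDuplicated reports whether the message was already committed by the worker.
func (s *mapStorage) IsDuplicated(_ context.Context, workerID, messageID string) (bool, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	_, ok := s.seen[key(workerID, messageID)]
	return ok, nil
}

// checkTwice reports the IsDuplicated result before and after a Commit.
func checkTwice(s DeduplicationStorage, workerID, messageID string) (before, after bool) {
	ctx := context.Background()
	before, _ = s.IsDuplicated(ctx, workerID, messageID)
	s.Commit(ctx, workerID, messageID)
	after, _ = s.IsDuplicated(ctx, workerID, messageID)
	return before, after
}

func main() {
	before, after := checkTwice(newMapStorage(), "billing", "msg-1")
	fmt.Println(before, after)
}
```

A plain map never evicts entries, so a real storage would likely need a TTL or size bound.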
type EmbeddedDeduplicationStorage ¶
type EmbeddedDeduplicationStorage struct {
// contains filtered or unexported fields
}
An EmbeddedDeduplicationStorage is the in-memory concrete implementation of DeduplicationStorage, using package allegro.BigCache as its high-performance underlying storage.
Note that using this storage makes your computing instance stateful: if the node goes down, the deduplication database is lost with it.
func NewEmbeddedDeduplicationStorage ¶
func NewEmbeddedDeduplicationStorage(cfg EmbeddedDeduplicationStorageConfig, cache *bigcache.BigCache) EmbeddedDeduplicationStorage
NewEmbeddedDeduplicationStorage allocates an in-memory DeduplicationStorage instance.
func (EmbeddedDeduplicationStorage) Commit ¶
func (e EmbeddedDeduplicationStorage) Commit(_ context.Context, workerID, messageID string)
func (EmbeddedDeduplicationStorage) IsDuplicated ¶
type EmbeddedDeduplicationStorageConfig ¶
type EmbeddedDeduplicationStorageConfig struct {
	Logger       *log.Logger
	ErrorLogger  *log.Logger
	KeyDelimiter string
}
EmbeddedDeduplicationStorageConfig is the EmbeddedDeduplicationStorage schema configuration.
type ErrUnrecoverableWrap ¶
type ErrUnrecoverableWrap struct {
ParentErr error
}
An ErrUnrecoverableWrap is a special wrapper for certain types of errors that have no recoverable action.
func (ErrUnrecoverableWrap) Error ¶
func (u ErrUnrecoverableWrap) Error() string
func (ErrUnrecoverableWrap) String ¶
func (u ErrUnrecoverableWrap) String() string
func (ErrUnrecoverableWrap) Unwrap ¶
func (u ErrUnrecoverableWrap) Unwrap() error
Unwrap returns the ErrUnrecoverable variable. Thus, calls to errors.Is(err, ErrUnrecoverable) will detect this wrapper as an unrecoverable error.
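The Unwrap behavior described above can be reproduced with local stand-in types. This is a sketch, not the package source: the body of `Error` and the helper `isUnrecoverable` are assumptions, while the `Unwrap` behavior follows the description.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrUnrecoverable mirrors the sentinel listed under Variables.
var ErrUnrecoverable = errors.New("streams: unrecoverable error")

// ErrUnrecoverableWrap is a local stand-in with the documented field.
type ErrUnrecoverableWrap struct {
	ParentErr error
}

// Error formats the wrapper; this exact message layout is an assumption.
func (u ErrUnrecoverableWrap) Error() string {
	if u.ParentErr == nil {
		return ErrUnrecoverable.Error()
	}
	return ErrUnrecoverable.Error() + ": " + u.ParentErr.Error()
}

// Unwrap returns the sentinel, as documented, so errors.Is can match it.
func (u ErrUnrecoverableWrap) Unwrap() error {
	return ErrUnrecoverable
}

// isUnrecoverable is a hypothetical helper a handler or middleware might use.
func isUnrecoverable(err error) bool {
	return errors.Is(err, ErrUnrecoverable)
}

func main() {
	wrapped := fmt.Errorf("handler failed: %w",
		ErrUnrecoverableWrap{ParentErr: errors.New("malformed payload")})
	fmt.Println(isUnrecoverable(wrapped))
	fmt.Println(isUnrecoverable(errors.New("timeout")))
}
```

Because Unwrap returns the sentinel rather than ParentErr, errors.Is will not reach the parent error through this wrapper.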
type Event ¶
type Event interface {
	// GetHeaders allocates a set of key-value items to be passed through data-in-motion platforms as message headers.
	GetHeaders() map[string]string
	// GetKey retrieves a key used by certain data-in-motion platforms (e.g. Apache Kafka) to route
	// messages with the same key to certain partitions/queues. Leave empty if routing is NOT desired.
	GetKey() string
}
An Event is an abstraction unit of Message. It represents factual information about something that happened inside a system (hence it is immutable).
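As an illustration, a domain type can satisfy Event with two small methods. `OrderPlaced`, its fields, and the `customer_id` header name are hypothetical and chosen only for this sketch.

```go
package main

import "fmt"

// Event mirrors the interface documented above.
type Event interface {
	GetHeaders() map[string]string
	GetKey() string
}

// OrderPlaced is a hypothetical domain event used only for illustration.
type OrderPlaced struct {
	OrderID    string
	CustomerID string
}

// GetHeaders exposes metadata that travels alongside the message payload.
func (e OrderPlaced) GetHeaders() map[string]string {
	return map[string]string{"customer_id": e.CustomerID}
}

// GetKey routes all events of the same order to the same partition/queue.
func (e OrderPlaced) GetKey() string {
	return e.OrderID
}

// Compile-time check that OrderPlaced satisfies Event.
var _ Event = OrderPlaced{}

func main() {
	var evt Event = OrderPlaced{OrderID: "ord-42", CustomerID: "cus-7"}
	fmt.Println(evt.GetKey(), evt.GetHeaders())
}
```

Value receivers keep the event effectively read-only from the caller's point of view, which fits the immutability note above.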
type EventRegistry ¶
An EventRegistry is a low-level storage used to create relationships between Event types and topics (streams).
func (EventRegistry) GetEventTopic ¶
func (r EventRegistry) GetEventTopic(event Event) (string, error)
GetEventTopic retrieves the attached topic of the Event. Returns ErrEventNotFound if Event entry is not available.
func (EventRegistry) RegisterEvent ¶
func (r EventRegistry) RegisterEvent(event Event, topic string)
RegisterEvent creates a relationship between an Event type and a topic (stream).
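The two registry routines can be pictured as a lookup table keyed by the event's Go type. The sketch below is an assumption about one possible design, not the package's internals: `registry`, `UserCreated`, and `UserDeleted` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
)

// ErrEventNotFound mirrors the sentinel listed under Variables.
var ErrEventNotFound = errors.New("streams: event not found")

// Event mirrors the documented interface.
type Event interface {
	GetHeaders() map[string]string
	GetKey() string
}

// registry is a hypothetical stand-in that maps an event's Go type to a topic.
type registry map[reflect.Type]string

// RegisterEvent binds the concrete type of event to topic.
func (r registry) RegisterEvent(event Event, topic string) {
	r[reflect.TypeOf(event)] = topic
}

// GetEventTopic looks up the topic bound to the concrete type of event.
func (r registry) GetEventTopic(event Event) (string, error) {
	topic, ok := r[reflect.TypeOf(event)]
	if !ok {
		return "", ErrEventNotFound
	}
	return topic, nil
}

// UserCreated and UserDeleted are hypothetical events.
type UserCreated struct{ UserID string }

func (e UserCreated) GetHeaders() map[string]string { return nil }
func (e UserCreated) GetKey() string                { return e.UserID }

type UserDeleted struct{ UserID string }

func (e UserDeleted) GetHeaders() map[string]string { return nil }
func (e UserDeleted) GetKey() string                { return e.UserID }

func main() {
	reg := registry{}
	reg.RegisterEvent(UserCreated{}, "users.created")

	topic, err := reg.GetEventTopic(UserCreated{UserID: "u-1"})
	fmt.Println(topic, err)

	_, err = reg.GetEventTopic(UserDeleted{})
	fmt.Println(errors.Is(err, ErrEventNotFound))
}
```

Keying by type means any value of a registered type resolves to the same topic, regardless of its field contents.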
type IdentifierFactory ¶
An IdentifierFactory is a small component function that generates unique identifiers.
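The type definition is not shown here, so the sketch below assumes an IdentifierFactory has the shape `func() (string, error)`, matching the NewUUID and NewKSUID signatures in the index. `newRandomHexID` is a hypothetical factory built on the standard library only.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// IdentifierFactory is assumed to have this shape, matching the
// NewUUID/NewKSUID signatures listed in the index.
type IdentifierFactory func() (string, error)

// newRandomHexID is a hypothetical factory returning 128 random bits as hex.
func newRandomHexID() (string, error) {
	buf := make([]byte, 16)
	if _, err := rand.Read(buf); err != nil {
		return "", err
	}
	return hex.EncodeToString(buf), nil
}

// Compile-time check that the function fits the assumed factory type.
var _ IdentifierFactory = newRandomHexID

func main() {
	id, err := newRandomHexID()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(len(id))
}
```

Under that assumption, such a function could be handed to WithIdentifierFactory when building a Publisher.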
type Message ¶
type Message struct {
	ID         string            `json:"message_id"`      // Message unique identifier.
	StreamName string            `json:"stream_name"`     // Name of the stream this Message will be/is transported.
	StreamKey  string            `json:"stream_key"`      // A key used by underlying systems to route a Message to specific partitions/queues.
	Headers    map[string]string `json:"message_headers"` // Map which passes additional context and metadata about the message.
	// Type of Data content.
	//
	// The usage of the RFC2046 MIME specification is preferred (e.g. application/json, application/xml).
	//
	// [Reference](https://www.rfc-editor.org/rfc/rfc2046).
	ContentType string    `json:"content_type"`
	Data        []byte    `json:"data"`         // Encoded information generated by a system.
	Time        time.Time `json:"message_time"` // Timestamp of a Message publishing operation.
	DecodedData any       `json:"-"`            // Only available on readers. Decoded Data using an underlying codec.Codec implementation.
}
A Message is the unit of transport containing information generated by a system. In addition, it contains metadata used by `streams` internal mechanisms. A Message will be/is transported through one or more streams.
type NoopWriter ¶
type NoopWriter struct {
WantWriterErr error
}
NoopWriter is a no-operation Writer instance.
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
A Publisher is a high-level component which writes Event(s) into topics (streams). Depending on the underlying Writer, publish routines will write Event(s) in batches or one-by-one.
func NewPublisher ¶
func NewPublisher(w Writer, eventReg EventRegistry, options ...PublisherOption) Publisher
NewPublisher allocates a new Publisher instance ready to be used. Specify options to customize default configurations.
type PublisherConfig ¶
type PublisherConfig struct {
	IdentifierFactory IdentifierFactory
	Codec             codec.Codec
}
PublisherConfig is the Publisher configuration.
type PublisherOption ¶
type PublisherOption interface {
// contains filtered or unexported methods
}
func WithIdentifierFactory ¶
func WithIdentifierFactory(f IdentifierFactory) PublisherOption
func WithPublisherCodec ¶
func WithPublisherCodec(c codec.Codec) PublisherOption
type ReadTask ¶
type ReadTask struct {
	Stream       string
	Handler      ReaderHandleFunc
	ExternalArgs map[string]any
}
A ReadTask is the unit of information a SubscriberScheduler passes to Reader workers in order to start stream-reading jobs. Use ExternalArgs to specify driver-specific configuration.
func (*ReadTask) SetArg ¶
SetArg sets an entry into ExternalArgs and returns the ReadTask instance ready to be chained to another builder routine (Fluent API-like). Arguments will be later passed to Reader concrete implementations; useful for situations where readers accept extra arguments such as consumer groups identifiers (Apache Kafka).
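The chaining style described above might look like the following. The SetArg signature is not shown in this listing, so `SetArg(key string, value any) *ReadTask` is an assumption, and the argument names `group_id` and `start_from_beginning` are hypothetical driver options.

```go
package main

import "fmt"

// ReadTask is a trimmed local stand-in for the documented struct.
type ReadTask struct {
	Stream       string
	ExternalArgs map[string]any
}

// SetArg stores a driver-specific argument and returns the task for chaining.
// The parameter list is an assumption; the listing above omits the signature.
func (t *ReadTask) SetArg(key string, value any) *ReadTask {
	if t.ExternalArgs == nil {
		t.ExternalArgs = make(map[string]any)
	}
	t.ExternalArgs[key] = value
	return t
}

func main() {
	// The argument names below are hypothetical driver options.
	task := (&ReadTask{Stream: "orders"}).
		SetArg("group_id", "billing-workers").
		SetArg("start_from_beginning", true)
	fmt.Println(len(task.ExternalArgs), task.ExternalArgs["group_id"])
}
```

Returning the receiver is what lets a subscription call be followed directly by configuration calls in a single expression.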
func (*ReadTask) WithMiddleware ¶
func (t *ReadTask) WithMiddleware(middlewareFunc ReaderMiddlewareFunc) *ReadTask
WithMiddleware appends a ReaderMiddlewareFunc to ReadTask.Handler; this is also known as the chain of responsibility pattern.
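A minimal sketch of how such a chain composes, using local stand-in types. The ReaderHandleFunc shape `func(ctx context.Context, msg Message) error` is an assumption (its definition is not shown here), and `trace` and `runChain` are hypothetical helpers.

```go
package main

import (
	"context"
	"fmt"
)

// Message is a trimmed stand-in for the documented struct.
type Message struct{ ID string }

// ReaderHandleFunc's shape is an assumption made for this sketch.
type ReaderHandleFunc func(ctx context.Context, msg Message) error

// ReaderMiddlewareFunc mirrors the documented type.
type ReaderMiddlewareFunc func(next ReaderHandleFunc) ReaderHandleFunc

// ReadTask is a trimmed stand-in for the documented struct.
type ReadTask struct {
	Stream  string
	Handler ReaderHandleFunc
}

// WithMiddleware wraps the current handler and returns the task for chaining.
func (t *ReadTask) WithMiddleware(m ReaderMiddlewareFunc) *ReadTask {
	t.Handler = m(t.Handler)
	return t
}

// trace is a hypothetical middleware that records its name when invoked.
func trace(name string, log *[]string) ReaderMiddlewareFunc {
	return func(next ReaderHandleFunc) ReaderHandleFunc {
		return func(ctx context.Context, msg Message) error {
			*log = append(*log, name)
			return next(ctx, msg)
		}
	}
}

// runChain builds a two-middleware chain and returns the execution order.
func runChain() []string {
	var log []string
	task := &ReadTask{Stream: "orders", Handler: func(context.Context, Message) error {
		log = append(log, "handler")
		return nil
	}}
	task.WithMiddleware(trace("inner", &log)).WithMiddleware(trace("outer", &log))
	_ = task.Handler(context.Background(), Message{ID: "1"})
	return log
}

func main() {
	fmt.Println(runChain())
}
```

In this sketch the middleware added last runs first, because each call wraps the handler built so far. Whether the package orders its chain the same way is not stated here.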
type Reader ¶
type Reader interface {
	// Read reads from the specified stream in ReadTask, blocking the I/O. Every time a new message arrives,
	// Reader will execute ReadTask.Handler routine in a separate goroutine.
	//
	// Use ctx context.Context to signal shutdowns.
	Read(ctx context.Context, task ReadTask) error
}
A Reader is a low-level component which allows systems to read from a stream.
type ReaderHandleFunc ¶
ReaderHandleFunc is the routine executed for each message received by Reader instances.
type ReaderMiddlewareFunc ¶
type ReaderMiddlewareFunc func(next ReaderHandleFunc) ReaderHandleFunc
func WithDeadLetterQueue ¶
func WithDeadLetterQueue(writer Writer) ReaderMiddlewareFunc
WithDeadLetterQueue appends to ReaderHandleFunc(s) a mechanism to send poisoned or failed (after retries) messages to a dead-letter queue (DLQ). The dead-letter queue MIGHT retain these messages for a longer time than a normal queue.
Dead-letter queue messages are emitted to the Message.StreamName but with the suffix ".dlq".
After failures, the dead-letter queue lets engineering teams manually or automatically enqueue failed messages back into the original queue (i.e. re-drive/replay policies), so messages can be processed again without further overhead.
Moreover, the dead-letter queue does not have to be a message bus like Apache Kafka or a service like Amazon SQS; even a blob storage service like Amazon S3 could implement Writer and retain failed messages.
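The dead-letter flow described above can be sketched with local stand-ins. This is not the package implementation: `withDLQ`, `memoryWriter`, and `parkFailed` are hypothetical, the ReaderHandleFunc shape is assumed, and returning nil after a successful dead-letter write is a design choice of this sketch.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Message is a trimmed stand-in for the documented struct.
type Message struct {
	ID         string
	StreamName string
}

// Writer mirrors the documented interface.
type Writer interface {
	Write(ctx context.Context, msgBatch []Message) error
}

// The handler shape is an assumption made for this sketch.
type ReaderHandleFunc func(ctx context.Context, msg Message) error
type ReaderMiddlewareFunc func(next ReaderHandleFunc) ReaderHandleFunc

// memoryWriter is a hypothetical Writer that keeps messages in a slice.
type memoryWriter struct{ written []Message }

func (w *memoryWriter) Write(_ context.Context, msgBatch []Message) error {
	w.written = append(w.written, msgBatch...)
	return nil
}

// withDLQ forwards a failed message to "<StreamName>.dlq" through w.
func withDLQ(w Writer) ReaderMiddlewareFunc {
	return func(next ReaderHandleFunc) ReaderHandleFunc {
		return func(ctx context.Context, msg Message) error {
			err := next(ctx, msg)
			if err == nil {
				return nil
			}
			msg.StreamName += ".dlq" // msg is a copy; the caller's value is untouched
			if dlqErr := w.Write(ctx, []Message{msg}); dlqErr != nil {
				return fmt.Errorf("dead-letter write failed: %v (handler error: %w)", dlqErr, err)
			}
			// Assumption: once parked in the DLQ, the message counts as handled.
			return nil
		}
	}
}

// parkFailed runs an always-failing handler once and returns the stream
// names the writer received.
func parkFailed(stream string) []string {
	w := &memoryWriter{}
	failing := func(context.Context, Message) error { return errors.New("boom") }
	_ = withDLQ(w)(failing)(context.Background(), Message{ID: "1", StreamName: stream})
	names := make([]string, 0, len(w.written))
	for _, m := range w.written {
		names = append(names, m.StreamName)
	}
	return names
}

func main() {
	fmt.Println(parkFailed("orders"))
}
```

Placing a retry middleware inside this one would match the "failed (after retries)" wording, since only the final error would reach the dead-letter step.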
func WithDeduplication ¶
func WithDeduplication(group string, storage DeduplicationStorage) ReaderMiddlewareFunc
WithDeduplication appends to ReaderHandleFunc(s) a mechanism to deduplicate processed messages, ensuring idempotency. It uses DeduplicationStorage to keep track of processed messages, and a group identifier to enable multiple isolated workers to commit their processed messages to the same storage.
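The idea can be sketched as a middleware that consults the storage before invoking the handler and commits only after success. Everything below is a hypothetical stand-in (`withDedup`, `mapStorage`, `countHandled`, the assumed handler shape); how the real middleware treats storage errors is not documented here.

```go
package main

import (
	"context"
	"fmt"
)

// Message is a trimmed stand-in for the documented struct.
type Message struct{ ID string }

// The handler shape is an assumption made for this sketch.
type ReaderHandleFunc func(ctx context.Context, msg Message) error
type ReaderMiddlewareFunc func(next ReaderHandleFunc) ReaderHandleFunc

// DeduplicationStorage mirrors the documented interface.
type DeduplicationStorage interface {
	Commit(ctx context.Context, workerID, messageID string)
	IsDuplicated(ctx context.Context, workerID, messageID string) (bool, error)
}

// mapStorage is a hypothetical, non-thread-safe storage used for illustration.
type mapStorage map[string]bool

func (s mapStorage) Commit(_ context.Context, workerID, messageID string) {
	s[workerID+"#"+messageID] = true
}

func (s mapStorage) IsDuplicated(_ context.Context, workerID, messageID string) (bool, error) {
	return s[workerID+"#"+messageID], nil
}

// withDedup skips messages already committed for group and commits new ones
// only after the handler succeeds.
func withDedup(group string, storage DeduplicationStorage) ReaderMiddlewareFunc {
	return func(next ReaderHandleFunc) ReaderHandleFunc {
		return func(ctx context.Context, msg Message) error {
			dup, err := storage.IsDuplicated(ctx, group, msg.ID)
			if err != nil {
				return err // assumption: surface storage failures to the caller
			}
			if dup {
				return nil // already processed; drop silently
			}
			if err := next(ctx, msg); err != nil {
				return err // not committed, so a redelivery is processed again
			}
			storage.Commit(ctx, group, msg.ID)
			return nil
		}
	}
}

// countHandled delivers the given message IDs in order and returns how many
// times the inner handler actually ran.
func countHandled(ids ...string) int {
	handled := 0
	h := withDedup("billing", mapStorage{})(func(context.Context, Message) error {
		handled++
		return nil
	})
	for _, id := range ids {
		_ = h(context.Background(), Message{ID: id})
	}
	return handled
}

func main() {
	fmt.Println(countHandled("a", "a", "b"))
}
```

Committing after the handler returns favors at-least-once processing: a crash between the handler and Commit leads to a reprocess rather than a lost message.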
func WithReaderErrorLogger ¶
func WithReaderErrorLogger(logger *log.Logger) ReaderMiddlewareFunc
WithReaderErrorLogger appends to ReaderHandleFunc(s) a mechanism to log errors using a logger instance.
func WithReaderRetry ¶
func WithReaderRetry(retry *retrier.Retrier) ReaderMiddlewareFunc
WithReaderRetry appends to ReaderHandleFunc(s) a mechanism to retry up to N times. It uses retrier.Retrier to enable advanced backoff mechanisms such as exponential backoff with jitter.
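To show the retry behavior without depending on the retrier package, the sketch below swaps in a hand-written loop with plain exponential backoff (no jitter). `withRetry`, `callsNeeded`, and `errTransient` are hypothetical, the handler shape is assumed, and stopping early on ErrUnrecoverable is an assumption suggested by the sentinel's name.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// ErrUnrecoverable mirrors the sentinel listed under Variables.
var ErrUnrecoverable = errors.New("streams: unrecoverable error")

// errTransient is a hypothetical retryable error.
var errTransient = errors.New("transient failure")

// Message is a trimmed stand-in for the documented struct.
type Message struct{ ID string }

// The handler shape is an assumption made for this sketch.
type ReaderHandleFunc func(ctx context.Context, msg Message) error
type ReaderMiddlewareFunc func(next ReaderHandleFunc) ReaderHandleFunc

// withRetry calls next up to maxAttempts times, doubling the delay between
// attempts. It stops early on success, on ErrUnrecoverable, or on ctx cancel.
func withRetry(maxAttempts int, baseDelay time.Duration) ReaderMiddlewareFunc {
	return func(next ReaderHandleFunc) ReaderHandleFunc {
		return func(ctx context.Context, msg Message) error {
			var err error
			delay := baseDelay
			for attempt := 1; attempt <= maxAttempts; attempt++ {
				err = next(ctx, msg)
				if err == nil || errors.Is(err, ErrUnrecoverable) {
					return err
				}
				if attempt == maxAttempts {
					break // no point sleeping after the last attempt
				}
				select {
				case <-ctx.Done():
					return ctx.Err()
				case <-time.After(delay):
				}
				delay *= 2
			}
			return err
		}
	}
}

// callsNeeded runs a handler that fails `failures` times with failWith and
// then succeeds; it returns how many times the handler was invoked.
func callsNeeded(failures int, failWith error) int {
	calls := 0
	h := withRetry(5, time.Millisecond)(func(context.Context, Message) error {
		calls++
		if calls <= failures {
			return failWith
		}
		return nil
	})
	_ = h(context.Background(), Message{ID: "1"})
	return calls
}

func main() {
	fmt.Println(callsNeeded(2, errTransient))
	fmt.Println(callsNeeded(10, errTransient))
	fmt.Println(callsNeeded(10, ErrUnrecoverable))
}
```

A production setup would normally add jitter to the delay so that many workers failing together do not retry in lockstep.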
type SubscriberScheduler ¶
type SubscriberScheduler struct {
// contains filtered or unexported fields
}
A SubscriberScheduler is a high-level component used to manage and schedule Reader tasks.
Zero value is NOT ready to use.
func NewSubscriberScheduler ¶
func NewSubscriberScheduler(r Reader, eventReg EventRegistry) SubscriberScheduler
NewSubscriberScheduler allocates a new SubscriberScheduler instance ready to be used.
func (*SubscriberScheduler) Shutdown ¶
func (r *SubscriberScheduler) Shutdown() error
Shutdown triggers a graceful shutdown of the running ReadTask workers. This routine blocks until all workers have been properly shut down.
func (*SubscriberScheduler) Start ¶
func (r *SubscriberScheduler) Start() error
Start schedules and spins up a worker for each registered ReadTask.
func (*SubscriberScheduler) Subscribe ¶
func (r *SubscriberScheduler) Subscribe(topic string, event Event, handler ReaderHandleFunc) *ReadTask
Subscribe registers a stream-reading job using the topic registered for the Event in EventRegistry. If the Event is not found on the first lookup, this routine appends a new entry to EventRegistry, automating event-topic registration.
Returns a ReadTask instance ready to be chained to another builder routine (Fluent API-like).
func (*SubscriberScheduler) SubscribeEvent ¶
func (r *SubscriberScheduler) SubscribeEvent(event Event, handler ReaderHandleFunc) *ReadTask
SubscribeEvent registers a stream reading job using Event registered topic from EventRegistry. This routine will panic if Event was not previously registered.
Returns a ReadTask instance ready to be chained to another builder routine (Fluent API-like).
func (*SubscriberScheduler) SubscribeEventSafe ¶
func (r *SubscriberScheduler) SubscribeEventSafe(event Event, handler ReaderHandleFunc) (*ReadTask, error)
SubscribeEventSafe registers a stream reading job using Event registered topic from EventRegistry. Returns ErrEventNotFound if Event was not previously registered.
func (*SubscriberScheduler) SubscribeTopic ¶
func (r *SubscriberScheduler) SubscribeTopic(topic string, handler ReaderHandleFunc) *ReadTask
SubscribeTopic registers a stream reading job to a specific topic. Returns a ReadTask instance ready to be chained to another builder routine (Fluent API-like).
type Writer ¶
type Writer interface {
	// Write writes a message batch into a stream specified on each Message through the Message.StreamName field.
	//
	// Depending on the underlying Writer implementation, this routine will write messages in actual batches,
	// batch chunks or one-by-one.
	Write(ctx context.Context, msgBatch []Message) error
}
A Writer is a low-level component that writes messages into streams. Each message MUST have its own stream name specified.
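A minimal in-memory Writer illustrates the per-message stream name requirement. This is a sketch under stated assumptions: `memoryWriter`, `writeBatch`, and `errMissingStream` are hypothetical, and rejecting the whole batch when one message lacks a stream name is a choice of this example, not documented behavior.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Message is a trimmed stand-in for the documented struct.
type Message struct {
	ID         string
	StreamName string
	Data       []byte
}

// Writer mirrors the documented interface.
type Writer interface {
	Write(ctx context.Context, msgBatch []Message) error
}

// errMissingStream is a hypothetical error; the package may report this differently.
var errMissingStream = errors.New("message has no stream name")

// memoryWriter is a hypothetical, non-thread-safe Writer that groups
// messages by stream name.
type memoryWriter struct {
	streams map[string][]Message
}

func newMemoryWriter() *memoryWriter {
	return &memoryWriter{streams: make(map[string][]Message)}
}

// Write validates the whole batch first, then appends each message to the
// stream named in its StreamName field.
func (w *memoryWriter) Write(ctx context.Context, msgBatch []Message) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	for _, msg := range msgBatch {
		if msg.StreamName == "" {
			return fmt.Errorf("message %q: %w", msg.ID, errMissingStream)
		}
	}
	for _, msg := range msgBatch {
		w.streams[msg.StreamName] = append(w.streams[msg.StreamName], msg)
	}
	return nil
}

// Compile-time check that memoryWriter satisfies Writer.
var _ Writer = (*memoryWriter)(nil)

// writeBatch is a small helper that writes with a background context.
func writeBatch(w *memoryWriter, msgs ...Message) error {
	return w.Write(context.Background(), msgs)
}

func main() {
	w := newMemoryWriter()
	err := writeBatch(w,
		Message{ID: "1", StreamName: "orders"},
		Message{ID: "2", StreamName: "payments"},
		Message{ID: "3", StreamName: "orders"},
	)
	fmt.Println(err, len(w.streams["orders"]), len(w.streams["payments"]))
}
```

A writer of this kind can also stand in for a real driver in tests, or serve as the target of a dead-letter middleware.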