Documentation
Overview ¶
Package events provides event streaming and storage.
Index ¶
- Variables
- func Consume(topic string, opts ...ConsumeOption) (<-chan Event, error)
- func Publish(topic string, msg interface{}, opts ...PublishOption) error
- type AckFunc
- type Backup
- type ConsumeOption
- type ConsumeOptions
- type Event
- type NackFunc
- type Option
- type Options
- type PublishOption
- type PublishOptions
- type ReadOption
- type ReadOptions
- type Store
- type StoreOption
- type StoreOptions
- type Stream
- type WriteOption
- type WriteOptions
Constants ¶
This section is empty.
Variables ¶
var (
	// DefaultStream is the default events stream implementation.
	DefaultStream Stream
	// DefaultStore is the default events store implementation.
	DefaultStore Store
)
var (
	// ErrMissingTopic is returned if a blank topic was provided to publish.
	ErrMissingTopic = errors.New("Missing topic")
	// ErrEncodingMessage is returned from publish if there was an error encoding the message option.
	ErrEncodingMessage = errors.New("Error encoding message")
)
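The two sentinel errors above can be matched with errors.Is. The following is a minimal sketch rather than the package's implementation: the errors are redeclared locally so that the example compiles on its own, publish is a hypothetical stand-in for Publish, and JSON is assumed as the message encoding.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Local copies of the documented sentinel errors, so the sketch is self-contained.
var (
	ErrMissingTopic    = errors.New("Missing topic")
	ErrEncodingMessage = errors.New("Error encoding message")
)

// publish is a hypothetical stand-in for Publish. It validates the topic and
// encodes the message as JSON (the encoding is an assumption).
func publish(topic string, msg interface{}) ([]byte, error) {
	if topic == "" {
		return nil, ErrMissingTopic
	}
	payload, err := json.Marshal(msg)
	if err != nil {
		return nil, ErrEncodingMessage
	}
	return payload, nil
}

// classify maps a publish error to a short description using errors.Is.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrMissingTopic):
		return "missing topic"
	case errors.Is(err, ErrEncodingMessage):
		return "encoding failed"
	default:
		return "other"
	}
}

func main() {
	_, err := publish("", "hello")
	fmt.Println(classify(err))

	// A channel cannot be marshalled to JSON, so encoding fails.
	_, err = publish("users", make(chan int))
	fmt.Println(classify(err))

	_, err = publish("users", map[string]string{"id": "1"})
	fmt.Println(classify(err))
}
```

Matching with errors.Is instead of == keeps the check working if a caller later wraps the error with additional context.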
Functions ¶
func Consume ¶
func Consume(topic string, opts ...ConsumeOption) (<-chan Event, error)
Consume subscribes to events on a topic and returns a channel on which they are delivered.
func Publish ¶
func Publish(topic string, msg interface{}, opts ...PublishOption) error
Publish an event to a topic.
Types ¶
type ConsumeOption ¶
type ConsumeOption func(o *ConsumeOptions)
ConsumeOption sets attributes on ConsumeOptions.
func WithAutoAck ¶
func WithAutoAck(ack bool, ackWait time.Duration) ConsumeOption
WithAutoAck sets the AutoAck field on ConsumeOptions, along with an ackWait duration. When auto ack is turned off, a message that is not acknowledged within ackWait is requeued.
func WithGroup ¶
func WithGroup(q string) ConsumeOption
WithGroup sets the consumer group to be part of when consuming events.
func WithOffset ¶
func WithOffset(t time.Time) ConsumeOption
WithOffset sets the offset time at which to start consuming events.
func WithRetryLimit ¶
func WithRetryLimit(retries int) ConsumeOption
WithRetryLimit sets the RetryLimit field on ConsumeOptions. Set to -1 for infinite retries (default).
type ConsumeOptions ¶
type ConsumeOptions struct {
	// Group is the name of the consumer group. If two consumers have the same group, the events
	// are distributed between them.
	Group string
	// Offset is the time from which messages should be consumed. If not provided,
	// messages are consumed starting from the moment the subscription starts.
	Offset time.Time
	// AutoAck, if true (the default), automatically acknowledges every message so it will not be redelivered.
	// If false, each message needs to be manually acknowledged by the subscriber.
	// If processing is successful, the message should be ack'ed to remove it from the stream.
	// If processing is unsuccessful, the message should be nack'ed (negative acknowledgement), which means it
	// remains on the stream to be processed again.
	AutoAck bool
	AckWait time.Duration
	// RetryLimit indicates the number of times a message is retried.
	RetryLimit int
	// CustomRetries indicates whether to use RetryLimit.
	CustomRetries bool
}
ConsumeOptions contains all the options which can be provided when subscribing to a topic.
func (ConsumeOptions) GetRetryLimit ¶
func (s ConsumeOptions) GetRetryLimit() int
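The consume options follow the functional options pattern. The sketch below mirrors the documented types locally to show how the options might compose. The option bodies, the newConsumeOptions helper, and the reading of CustomRetries as "a retry limit was set explicitly" are assumptions inferred from the comments above, not the package's confirmed implementation.

```go
package main

import (
	"fmt"
	"time"
)

// ConsumeOptions mirrors the documented struct.
type ConsumeOptions struct {
	Group         string
	Offset        time.Time
	AutoAck       bool
	AckWait       time.Duration
	RetryLimit    int
	CustomRetries bool
}

// ConsumeOption sets attributes on ConsumeOptions.
type ConsumeOption func(o *ConsumeOptions)

// The option bodies below are assumptions based on the documented behaviour.
func WithGroup(q string) ConsumeOption {
	return func(o *ConsumeOptions) { o.Group = q }
}

func WithAutoAck(ack bool, ackWait time.Duration) ConsumeOption {
	return func(o *ConsumeOptions) {
		o.AutoAck = ack
		o.AckWait = ackWait
	}
}

func WithRetryLimit(retries int) ConsumeOption {
	return func(o *ConsumeOptions) {
		o.RetryLimit = retries
		o.CustomRetries = true // assumed: marks the limit as explicitly set
	}
}

// GetRetryLimit returns -1 (infinite retries) unless a custom limit was set.
func (s ConsumeOptions) GetRetryLimit() int {
	if !s.CustomRetries {
		return -1
	}
	return s.RetryLimit
}

// newConsumeOptions is a hypothetical helper: it starts from the documented
// default (AutoAck true) and applies each option in order.
func newConsumeOptions(opts ...ConsumeOption) ConsumeOptions {
	o := ConsumeOptions{AutoAck: true}
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	defaults := newConsumeOptions()
	fmt.Println(defaults.AutoAck, defaults.GetRetryLimit())

	custom := newConsumeOptions(
		WithGroup("billing"),
		WithAutoAck(false, 30*time.Second),
		WithRetryLimit(3),
	)
	fmt.Println(custom.Group, custom.AutoAck, custom.AckWait, custom.GetRetryLimit())
}
```

Under these assumptions, a consumer that passes no options gets automatic acknowledgement and unlimited retries, and each option overrides only the fields it names.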
type Event ¶
type Event struct {
	// ID to uniquely identify the event
	ID string
	// Topic of event, e.g. "registry.service.created"
	Topic string
	// Timestamp of the event
	Timestamp time.Time
	// Metadata contains the values the event was indexed by
	Metadata map[string]string
	// Payload contains the encoded message
	Payload []byte
	// contains filtered or unexported fields
}
Event is the object returned by the broker when you subscribe to a topic.
func (*Event) Nack ¶
Nack negatively acknowledges processing of the event (i.e. failure) in ManualAck mode.
func (*Event) SetAckFunc ¶
func (*Event) SetNackFunc ¶
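The listing above omits the signatures of Nack, SetAckFunc, and SetNackFunc. As a rough illustration of manual acknowledgement, the sketch below assumes that AckFunc and NackFunc are plain func() error values, that the event stores them in unexported fields, and that an Ack method exists as the counterpart of Nack. All of these are assumptions, and process and handle are hypothetical helpers.

```go
package main

import (
	"errors"
	"fmt"
)

// Assumed signatures: the listing names these types but does not define them.
type AckFunc func() error
type NackFunc func() error

// Event is a trimmed local stand-in for the documented Event type.
type Event struct {
	ID      string
	Topic   string
	Payload []byte

	ackFunc  AckFunc
	nackFunc NackFunc
}

func (e *Event) SetAckFunc(f AckFunc)   { e.ackFunc = f }
func (e *Event) SetNackFunc(f NackFunc) { e.nackFunc = f }

// Ack is assumed to exist as the counterpart of Nack.
func (e *Event) Ack() error {
	if e.ackFunc == nil {
		return nil
	}
	return e.ackFunc()
}

// Nack reports a processing failure so the event stays on the stream.
func (e *Event) Nack() error {
	if e.nackFunc == nil {
		return nil
	}
	return e.nackFunc()
}

// process is a hypothetical handler that rejects empty payloads.
func process(e *Event) error {
	if len(e.Payload) == 0 {
		return errors.New("empty payload")
	}
	return nil
}

// handle acks the event on success and nacks it on failure.
func handle(e *Event) error {
	if err := process(e); err != nil {
		return e.Nack()
	}
	return e.Ack()
}

func main() {
	var acked, nacked int
	newEvent := func(payload string) *Event {
		e := &Event{ID: "1", Topic: "orders", Payload: []byte(payload)}
		e.SetAckFunc(func() error { acked++; return nil })
		e.SetNackFunc(func() error { nacked++; return nil })
		return e
	}

	_ = handle(newEvent("ok"))
	_ = handle(newEvent(""))
	fmt.Println(acked, nacked)
}
```

In this sketch the stream implementation would install the two callbacks before delivering the event, and the consumer only decides which one to call.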
type Options ¶
func NewOptions ¶
type PublishOption ¶
type PublishOption func(o *PublishOptions)
PublishOption sets attributes on PublishOptions.
func WithMetadata ¶
func WithMetadata(md map[string]string) PublishOption
WithMetadata sets the Metadata field on PublishOptions.
func WithTimestamp ¶
func WithTimestamp(t time.Time) PublishOption
WithTimestamp sets the timestamp field on PublishOptions.
type PublishOptions ¶
type PublishOptions struct {
	// Metadata contains any keys which can be used to query the data, for example a customer id
	Metadata map[string]string
	// Timestamp to set for the event, if the timestamp is a zero value, the current time will be used
	Timestamp time.Time
}
PublishOptions contains all the options which can be provided when publishing an event.
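The timestamp rule described in PublishOptions (a zero value means "use the current time") can be sketched as follows. The types are mirrored locally, the option bodies are assumptions, and resolvePublishOptions and fixedTime are hypothetical helpers introduced for illustration. The current time is passed in explicitly so that the behaviour is deterministic.

```go
package main

import (
	"fmt"
	"time"
)

// PublishOptions mirrors the documented struct.
type PublishOptions struct {
	Metadata  map[string]string
	Timestamp time.Time
}

// PublishOption sets attributes on PublishOptions.
type PublishOption func(o *PublishOptions)

// Assumed option bodies, based on the documented behaviour.
func WithMetadata(md map[string]string) PublishOption {
	return func(o *PublishOptions) { o.Metadata = md }
}

func WithTimestamp(t time.Time) PublishOption {
	return func(o *PublishOptions) { o.Timestamp = t }
}

// resolvePublishOptions is a hypothetical helper: it applies the options and
// falls back to now when no timestamp was provided.
func resolvePublishOptions(now time.Time, opts ...PublishOption) PublishOptions {
	var o PublishOptions
	for _, opt := range opts {
		opt(&o)
	}
	if o.Timestamp.IsZero() {
		o.Timestamp = now
	}
	return o
}

// fixedTime returns a constant instant so the example is reproducible.
func fixedTime() time.Time {
	return time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
}

func main() {
	now := fixedTime()

	// No timestamp option: the event is stamped with the current time.
	implicit := resolvePublishOptions(now, WithMetadata(map[string]string{"customer_id": "42"}))
	fmt.Println(implicit.Timestamp.Equal(now), implicit.Metadata["customer_id"])

	// Explicit timestamp: useful for backdating imported events.
	backdated := now.Add(-24 * time.Hour)
	explicit := resolvePublishOptions(now, WithTimestamp(backdated))
	fmt.Println(explicit.Timestamp.Equal(backdated))
}
```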
type ReadOption ¶
type ReadOption func(o *ReadOptions)
ReadOption sets attributes on ReadOptions.
func ReadLimit ¶
func ReadLimit(l uint) ReadOption
ReadLimit sets the limit attribute on ReadOptions.
func ReadOffset ¶
func ReadOffset(l uint) ReadOption
ReadOffset sets the offset attribute on ReadOptions.
type ReadOptions ¶
type ReadOptions struct {
	// Limit the number of results to return
	Limit uint
	// Offset the results by this number, useful for paginated queries
	Offset uint
}
ReadOptions contains all the options which can be provided when reading events from a store.
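As an illustration of how Limit and Offset might drive a paginated read, the sketch below applies them to a slice of event IDs. The paginate helper is hypothetical, and treating a zero Limit as "no limit" is an assumption that the documentation above does not confirm.

```go
package main

import "fmt"

// ReadOptions mirrors the documented struct.
type ReadOptions struct {
	Limit  uint
	Offset uint
}

// ReadOption sets attributes on ReadOptions.
type ReadOption func(o *ReadOptions)

// Assumed option bodies, based on the documented behaviour.
func ReadLimit(l uint) ReadOption {
	return func(o *ReadOptions) { o.Limit = l }
}

func ReadOffset(l uint) ReadOption {
	return func(o *ReadOptions) { o.Offset = l }
}

// paginate is a hypothetical helper that skips Offset items and then returns
// at most Limit items. A zero Limit is treated as "no limit" (assumption).
func paginate(ids []string, opts ...ReadOption) []string {
	var o ReadOptions
	for _, opt := range opts {
		opt(&o)
	}
	if o.Offset >= uint(len(ids)) {
		return nil
	}
	ids = ids[o.Offset:]
	if o.Limit > 0 && o.Limit < uint(len(ids)) {
		ids = ids[:o.Limit]
	}
	return ids
}

func main() {
	ids := []string{"a", "b", "c", "d", "e"}

	// Second page with a page size of two.
	fmt.Println(paginate(ids, ReadLimit(2), ReadOffset(2)))

	// An offset past the end yields no results.
	fmt.Println(len(paginate(ids, ReadOffset(10))))
}
```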
type Store ¶
type Store interface {
	Read(topic string, opts ...ReadOption) ([]*Event, error)
	Write(event *Event, opts ...WriteOption) error
}
Store is an event store interface.
func NewStore ¶
func NewStore(opts ...StoreOption) Store
NewStore returns an initialized events store.
type StoreOption ¶
type StoreOption func(o *StoreOptions)
func WithLogger ¶
func WithLogger(l logger.Logger) StoreOption
WithLogger sets the underlying logger.
type StoreOptions ¶
type Stream ¶
type Stream interface {
	Publish(topic string, msg interface{}, opts ...PublishOption) error
	Consume(topic string, opts ...ConsumeOption) (<-chan Event, error)
}
Stream is an event streaming interface.
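To make the Stream contract concrete, here is a minimal in-memory sketch that satisfies a locally mirrored copy of the interface. memStream and newMemStream are hypothetical names, options are accepted but ignored, JSON is assumed as the payload encoding, and events are dropped when a subscriber's buffer is full. A real implementation would likely handle groups, offsets, and acknowledgement as well.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"sync"
	"time"
)

var (
	ErrMissingTopic    = errors.New("Missing topic")
	ErrEncodingMessage = errors.New("Error encoding message")
)

// Trimmed local mirrors of the documented types.
type Event struct {
	ID        string
	Topic     string
	Timestamp time.Time
	Payload   []byte
}

type PublishOptions struct{}
type PublishOption func(o *PublishOptions)
type ConsumeOptions struct{}
type ConsumeOption func(o *ConsumeOptions)

type Stream interface {
	Publish(topic string, msg interface{}, opts ...PublishOption) error
	Consume(topic string, opts ...ConsumeOption) (<-chan Event, error)
}

// memStream is a hypothetical in-memory stream that fans events out to
// every subscriber of a topic.
type memStream struct {
	mu   sync.Mutex
	subs map[string][]chan Event
	seq  int
}

func newMemStream() *memStream {
	return &memStream{subs: map[string][]chan Event{}}
}

func (s *memStream) Publish(topic string, msg interface{}, opts ...PublishOption) error {
	if topic == "" {
		return ErrMissingTopic
	}
	payload, err := json.Marshal(msg) // JSON encoding is an assumption
	if err != nil {
		return ErrEncodingMessage
	}

	s.mu.Lock()
	defer s.mu.Unlock()
	s.seq++
	ev := Event{ID: fmt.Sprint(s.seq), Topic: topic, Timestamp: time.Now(), Payload: payload}
	for _, ch := range s.subs[topic] {
		select {
		case ch <- ev:
		default: // subscriber buffer is full: drop the event in this sketch
		}
	}
	return nil
}

func (s *memStream) Consume(topic string, opts ...ConsumeOption) (<-chan Event, error) {
	if topic == "" {
		return nil, ErrMissingTopic
	}
	ch := make(chan Event, 16)
	s.mu.Lock()
	s.subs[topic] = append(s.subs[topic], ch)
	s.mu.Unlock()
	return ch, nil
}

// Compile-time check that memStream satisfies the interface.
var _ Stream = (*memStream)(nil)

func main() {
	var stream Stream = newMemStream()

	events, err := stream.Consume("users")
	if err != nil {
		panic(err)
	}
	if err := stream.Publish("users", map[string]string{"name": "alice"}); err != nil {
		panic(err)
	}

	ev := <-events
	var body map[string]string
	if err := json.Unmarshal(ev.Payload, &body); err != nil {
		panic(err)
	}
	fmt.Println(ev.Topic, body["name"])
}
```

Subscribing before publishing matters here because this sketch keeps no history. Replaying earlier events from an offset, as WithOffset describes, would need a backing store.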
type WriteOption ¶
type WriteOption func(o *WriteOptions)
WriteOption sets attributes on WriteOptions.
func WithTTL ¶
func WithTTL(d time.Duration) WriteOption
WithTTL sets the TTL attribute on WriteOptions.
type WriteOptions ¶
type WriteOptions struct {
	// TTL is the duration the event should be recorded for, a zero value TTL indicates the event should
	// be stored indefinitely
	TTL time.Duration
}
WriteOptions contains all the options which can be provided when writing an event to a store.
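The TTL semantics can be illustrated with a small in-memory store. This is a sketch under stated assumptions, not the package's store: memStore, newMemStore, record, and fakeClock are hypothetical names, read options are omitted for brevity, and expiry is checked lazily at read time against an injectable clock so that the example does not need to sleep.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Trimmed local mirrors of the documented types.
type Event struct {
	ID    string
	Topic string
}

type WriteOptions struct {
	TTL time.Duration
}

type WriteOption func(o *WriteOptions)

// Assumed option body, based on the documented behaviour.
func WithTTL(d time.Duration) WriteOption {
	return func(o *WriteOptions) { o.TTL = d }
}

// fakeClock is a hypothetical controllable clock used to demonstrate expiry.
type fakeClock struct{ t time.Time }

func (c *fakeClock) Now() time.Time          { return c.t }
func (c *fakeClock) Advance(d time.Duration) { c.t = c.t.Add(d) }

// record pairs an event with its expiry; a zero expiry means "never expires".
type record struct {
	event   *Event
	expires time.Time
}

// memStore is a hypothetical in-memory store keyed by topic.
type memStore struct {
	mu      sync.Mutex
	records map[string][]record
	now     func() time.Time
}

func newMemStore(now func() time.Time) *memStore {
	return &memStore{records: map[string][]record{}, now: now}
}

// Write records the event; a zero TTL stores it indefinitely.
func (s *memStore) Write(event *Event, opts ...WriteOption) error {
	var o WriteOptions
	for _, opt := range opts {
		opt(&o)
	}
	rec := record{event: event}
	if o.TTL > 0 {
		rec.expires = s.now().Add(o.TTL)
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	s.records[event.Topic] = append(s.records[event.Topic], rec)
	return nil
}

// Read returns the events for a topic that have not yet expired.
func (s *memStore) Read(topic string) ([]*Event, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	now := s.now()
	var out []*Event
	for _, rec := range s.records[topic] {
		if rec.expires.IsZero() || now.Before(rec.expires) {
			out = append(out, rec.event)
		}
	}
	return out, nil
}

func main() {
	clock := &fakeClock{t: time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)}
	store := newMemStore(clock.Now)

	_ = store.Write(&Event{ID: "1", Topic: "logins"})                       // kept indefinitely
	_ = store.Write(&Event{ID: "2", Topic: "logins"}, WithTTL(time.Minute)) // expires after a minute

	before, _ := store.Read("logins")
	clock.Advance(2 * time.Minute)
	after, _ := store.Read("logins")

	fmt.Println(len(before), len(after))
}
```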