events

package
v5.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 3, 2022 License: Apache-2.0 Imports: 9 Imported by: 3

Documentation

Overview

Package events is for event streaming and storage

Index

Constants

This section is empty.

Variables

View Source
var (
	// DefaultStream is the default events stream implementation.
	DefaultStream Stream
	// DefaultStore is the default events store implementation.
	DefaultStore Store
)
View Source
var (
	// ErrMissingTopic is returned if a blank topic was provided to publish.
	ErrMissingTopic = errors.New("Missing topic")
	// ErrEncodingMessage is returned from publish if there was an error encoding the message option.
	ErrEncodingMessage = errors.New("Error encoding message")
)

Functions

func Consume

func Consume(topic string, opts ...ConsumeOption) (<-chan Event, error)

Consume to events.

func Publish

func Publish(topic string, msg interface{}, opts ...PublishOption) error

Publish an event to a topic.

Types

type AckFunc

type AckFunc func() error

type Backup

type Backup interface {
	Snapshot(st store.Store) error
}

Backup is an interface for snapshotting the events store to long term storage.

type ConsumeOption

type ConsumeOption func(o *ConsumeOptions)

ConsumeOption sets attributes on ConsumeOptions.

func WithAutoAck

func WithAutoAck(ack bool, ackWait time.Duration) ConsumeOption

WithAutoAck sets the AutoAck field on ConsumeOptions and an ackWait duration after which if no ack is received the message is requeued in case auto ack is turned off.

func WithGroup

func WithGroup(q string) ConsumeOption

WithGroup sets the consumer group to be part of when consuming events.

func WithOffset

func WithOffset(t time.Time) ConsumeOption

WithOffset sets the offset time at which to start consuming events.

func WithRetryLimit

func WithRetryLimit(retries int) ConsumeOption

WithRetryLimit sets the RetryLimit field on ConsumeOptions. Set to -1 for infinite retries (default).

type ConsumeOptions

type ConsumeOptions struct {
	// Group is the name of the consumer group, if two consumers have the same group the events
	// are distributed between them
	Group string
	// Offset is the time from which the messages should be consumed from. If not provided then
	// the messages will be consumed starting from the moment the Subscription starts.
	Offset time.Time
	// AutoAck if true (default true), automatically acknowledges every message so it will not be redelivered.
	// If false specifies that each message need ts to be manually acknowledged by the subscriber.
	// If processing is successful the message should be ack'ed to remove the message from the stream.
	// If processing is unsuccessful the message should be nack'ed (negative acknowledgement) which will mean it will
	// remain on the stream to be processed again.
	AutoAck bool
	AckWait time.Duration
	// RetryLimit indicates number of times a message is retried
	RetryLimit int
	// CustomRetries indicates whether to use RetryLimit
	CustomRetries bool
}

ConsumeOptions contains all the options which can be provided when subscribing to a topic.

func (ConsumeOptions) GetRetryLimit

func (s ConsumeOptions) GetRetryLimit() int

type Event

type Event struct {
	// ID to uniquely identify the event
	ID string
	// Topic of event, e.g. "registry.service.created"
	Topic string
	// Timestamp of the event
	Timestamp time.Time
	// Metadata contains the values the event was indexed by
	Metadata map[string]string
	// Payload contains the encoded message
	Payload []byte
	// contains filtered or unexported fields
}

Event is the object returned by the broker when you subscribe to a topic.

func Read

func Read(topic string, opts ...ReadOption) ([]*Event, error)

Read events for a topic.

func (*Event) Ack

func (e *Event) Ack() error

Ack acknowledges successful processing of the event in ManualAck mode.

func (*Event) Nack

func (e *Event) Nack() error

Nack negatively acknowledges processing of the event (i.e. failure) in ManualAck mode.

func (*Event) SetAckFunc

func (e *Event) SetAckFunc(f AckFunc)

func (*Event) SetNackFunc

func (e *Event) SetNackFunc(f NackFunc)

func (*Event) Unmarshal

func (e *Event) Unmarshal(v interface{}) error

Unmarshal the events message into an object.

type NackFunc

type NackFunc func() error

type Option

type Option func(o *Options)

type Options

type Options struct {
	Logger logger.Logger
}

func NewOptions

func NewOptions(opts ...Option) *Options

type PublishOption

type PublishOption func(o *PublishOptions)

PublishOption sets attributes on PublishOptions.

func WithMetadata

func WithMetadata(md map[string]string) PublishOption

WithMetadata sets the Metadata field on PublishOptions.

func WithTimestamp

func WithTimestamp(t time.Time) PublishOption

WithTimestamp sets the timestamp field on PublishOptions.

type PublishOptions

type PublishOptions struct {
	// Metadata contains any keys which can be used to query the data, for example a customer id
	Metadata map[string]string
	// Timestamp to set for the event, if the timestamp is a zero value, the current time will be used
	Timestamp time.Time
}

PublishOptions contains all the options which can be provided when publishing an event.

type ReadOption

type ReadOption func(o *ReadOptions)

ReadOption sets attributes on ReadOptions.

func ReadLimit

func ReadLimit(l uint) ReadOption

ReadLimit sets the limit attribute on ReadOptions.

func ReadOffset

func ReadOffset(l uint) ReadOption

ReadOffset sets the offset attribute on ReadOptions.

type ReadOptions

type ReadOptions struct {
	// Limit the number of results to return
	Limit uint
	// Offset the results by this number, useful for paginated queries
	Offset uint
}

ReadOptions contains all the options which can be provided when reading events from a store.

type Store

type Store interface {
	Read(topic string, opts ...ReadOption) ([]*Event, error)
	Write(event *Event, opts ...WriteOption) error
}

Store is an event store interface.

func NewStore

func NewStore(opts ...StoreOption) Store

NewStore returns an initialized events store.

type StoreOption

type StoreOption func(o *StoreOptions)

func WithLogger

func WithLogger(l logger.Logger) StoreOption

WithLogger sets the underline logger.

type StoreOptions

type StoreOptions struct {
	TTL    time.Duration
	Backup Backup
	Logger logger.Logger
}

type Stream

type Stream interface {
	Publish(topic string, msg interface{}, opts ...PublishOption) error
	Consume(topic string, opts ...ConsumeOption) (<-chan Event, error)
}

Stream is an event streaming interface.

func NewStream

func NewStream(opts ...Option) (Stream, error)

NewStream returns an initialized memory stream.

type WriteOption

type WriteOption func(o *WriteOptions)

WriteOption sets attributes on WriteOptions.

func WithTTL

func WithTTL(d time.Duration) WriteOption

WithTTL sets the TTL attribute on WriteOptions.

type WriteOptions

type WriteOptions struct {
	// TTL is the duration the event should be recorded for, a zero value TTL indicates the event should
	// be stored indefinitely
	TTL time.Duration
}

WriteOptions contains all the options which can be provided when writing an event to a store.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL