Versions in this module
Expand all Collapse all
v4
v4.7.1 Feb 23, 2023
Changes in this version
+ var DefaultStore Store
+ var DefaultStream Stream
+ var ErrEncodingMessage = errors.New("Error encoding message")
+ var ErrMissingTopic = errors.New("Missing topic")
+ func Consume(topic string, opts ...ConsumeOption) (<-chan Event, error)
+ func Publish(topic string, msg interface{}, opts ...PublishOption) error
+ type AckFunc func() error
+ type Backup interface
+ Snapshot func(st store.Store) error
+ type ConsumeOption func(o *ConsumeOptions)
+ func WithAutoAck(ack bool, ackWait time.Duration) ConsumeOption
+ func WithGroup(q string) ConsumeOption
+ func WithOffset(t time.Time) ConsumeOption
+ func WithRetryLimit(retries int) ConsumeOption
+ type ConsumeOptions struct
+ AckWait time.Duration
+ AutoAck bool
+ CustomRetries bool
+ Group string
+ Offset time.Time
+ RetryLimit int
+ func (s ConsumeOptions) GetRetryLimit() int
+ type Event struct
+ ID string
+ Metadata map[string]string
+ Payload []byte
+ Timestamp time.Time
+ Topic string
+ func Read(topic string, opts ...ReadOption) ([]*Event, error)
+ func (e *Event) Ack() error
+ func (e *Event) Nack() error
+ func (e *Event) SetAckFunc(f AckFunc)
+ func (e *Event) SetNackFunc(f NackFunc)
+ func (e *Event) Unmarshal(v interface{}) error
+ type NackFunc func() error
+ type Option func(o *Options)
+ type Options struct
+ type PublishOption func(o *PublishOptions)
+ func WithMetadata(md map[string]string) PublishOption
+ func WithTimestamp(t time.Time) PublishOption
+ type PublishOptions struct
+ Metadata map[string]string
+ Timestamp time.Time
+ type ReadOption func(o *ReadOptions)
+ func ReadLimit(l uint) ReadOption
+ func ReadOffset(l uint) ReadOption
+ type ReadOptions struct
+ Limit uint
+ Offset uint
+ type Store interface
+ Read func(topic string, opts ...ReadOption) ([]*Event, error)
+ Write func(event *Event, opts ...WriteOption) error
+ func NewStore(opts ...StoreOption) Store
+ type StoreOption func(o *StoreOptions)
+ type StoreOptions struct
+ Backup Backup
+ TTL time.Duration
+ type Stream interface
+ Consume func(topic string, opts ...ConsumeOption) (<-chan Event, error)
+ Publish func(topic string, msg interface{}, opts ...PublishOption) error
+ func NewStream(opts ...Option) (Stream, error)
+ type WriteOption func(o *WriteOptions)
+ func WithTTL(d time.Duration) WriteOption
+ type WriteOptions struct
+ TTL time.Duration