Versions in this module Expand all Collapse all v1 v1.0.0 Jun 8, 2024 Changes in this version + const DefaultConsumerWorkers + const MetaDefault + const MetaEvent + const MetaHash + const MetaTime + const MetaVersion + var ErrConsumerStarted = errors.New("Consumer already started") + func AlwaysCommitStrategy(ctx context.Context, message ConsumeMessage, handler EventHandler) error + func CommitOnSuccessStrategy(ctx context.Context, message ConsumeMessage, handler EventHandler) error + func GetConsumerGroupFromContext(ctx context.Context) string + func RegisterConsumeStrategy(name string, fn ConsumeStrategyFactory) + func RegisterListener(name string, fn ListenerFactory) + func RegisterSender(name string, fn SenderFactory) + func RegisterWriter(name string, fn WriterFactory) + type Closer interface + Close func() error + type ConsumeMessage interface + Commit func(ctx context.Context) error + GetEventConsumeMessage func(ctx context.Context) (*EventConsumeMessage, error) + type ConsumeStrategy func(ctx context.Context, message ConsumeMessage, handler EventHandler) error + type ConsumeStrategyFactory func(ctx context.Context, config interface{}) (ConsumeStrategy, error) + func NoConfigConsumeStrategyFactory(strategy ConsumeStrategy) ConsumeStrategyFactory + type Consumer struct + func NewConsumer(ctx context.Context, config *ConsumerConfig) (*Consumer, error) + func (c *Consumer) Start() error + func (c *Consumer) Stop() error + func (c *Consumer) StopContext(ctx context.Context) error + func (c *Consumer) Subscribe(ctx context.Context, topic, group string, handler EventHandler) error + func (c *Consumer) Use(middlewares ...EventMiddleware) + func (c *Consumer) WithConsumeStrategy(strategy ConsumeStrategy) + type ConsumerConfig struct + ConsumeStrategy *DriverConfig + EventConfig *EventConfig + Listener *DriverConfig + WorkerPoolConfig *WorkerPoolConfig + type DriverConfig struct + Config interface{} + Type string + type Emitter struct + func New(ctx context.Context, config *EmitterConfig) (*Emitter, error) + func (e *Emitter) Publish(ctx context.Context, event, key string, message interface{}, ...) error + type EmitterConfig struct + EventConfig *EventConfig + Sender *DriverConfig + Writer *DriverConfig + type EventConfig struct + EventMap map[string]string + GroupMap map[string]string + Metadata map[string]map[string]interface{} + func NewEventConfig() *EventConfig + type EventConsumeMessage struct + Data []byte + Key string + Metadata map[string]interface{} + Topic string + func NewEventConsumeMessage(v []byte) (*EventConsumeMessage, error) + type EventHandler func(ctx context.Context, message *EventConsumeMessage) error + type EventLogger struct + func NewEventLogger(ctx context.Context, config interface{}) (*EventLogger, error) + func (e *EventLogger) Delete(ctx context.Context, message *EventMessage) error + func (e *EventLogger) Listen(ctx context.Context, topic, group string) (Iterator, error) + func (e *EventLogger) Next(ctx context.Context) (ConsumeMessage, error) + func (e *EventLogger) Send(ctx context.Context, message *EventMessage) error + type EventMessage struct + Data interface{} + Key string + Metadata map[string]interface{} + RawData []byte + Topic string + func (m *EventMessage) Hash() (string, error) + func (m *EventMessage) ToBytes() ([]byte, error) + type EventMiddleware func(next EventHandler) EventHandler + type Iterator interface + Next func(ctx context.Context) (ConsumeMessage, error) + type IteratorFunc func(ctx context.Context) (ConsumeMessage, error) + func (fn IteratorFunc) Next(ctx context.Context) (ConsumeMessage, error) + type Job func(ctx context.Context) error + type Listener interface + Listen func(ctx context.Context, topic, group string) (Iterator, error) + func EventLoggerListener(ctx context.Context, config interface{}) (Listener, error) + type ListenerFactory func(ctx context.Context, config interface{}) (Listener, error) + type ListenerWorkerPool struct + type OutboxRecord struct + CreatedAt time.Time + ID string + Key string + Topic string + Value string + func OutboxFromMessage(msg *EventMessage) (*OutboxRecord, error) + func (o *OutboxRecord) GenerateID() *OutboxRecord + func (o *OutboxRecord) Hash() []byte + type Sender interface + Send func(ctx context.Context, message *EventMessage) error + func EventLoggerSender(ctx context.Context, config interface{}) (Sender, error) + type SenderFactory func(ctx context.Context, config interface{}) (Sender, error) + type WorkerPoolConfig map[string]interface + type Writer interface + Delete func(ctx context.Context, message *EventMessage) error + func EventLoggerWriter(ctx context.Context, config interface{}) (Writer, error) + type WriterFactory func(ctx context.Context, config interface{}) (Writer, error)