Documentation ¶
Index ¶
- Constants
- Variables
- func AlwaysCommitStrategy(ctx context.Context, message ConsumeMessage, handler EventHandler) error
- func CommitOnSuccessStrategy(ctx context.Context, message ConsumeMessage, handler EventHandler) error
- func GetConsumerGroupFromContext(ctx context.Context) string
- func RegisterConsumeStrategy(name string, fn ConsumeStrategyFactory)
- func RegisterListener(name string, fn ListenerFactory)
- func RegisterSender(name string, fn SenderFactory)
- func RegisterWriter(name string, fn WriterFactory)
- type Closer
- type ConsumeMessage
- type ConsumeStrategy
- type ConsumeStrategyFactory
- type Consumer
- func (c *Consumer) Start() error
- func (c *Consumer) Stop() error
- func (c *Consumer) StopContext(ctx context.Context) error
- func (c *Consumer) Subscribe(ctx context.Context, topic, group string, handler EventHandler) error
- func (c *Consumer) Use(middlewares ...EventMiddleware)
- func (c *Consumer) WithConsumeStrategy(strategy ConsumeStrategy)
- type ConsumerConfig
- type DriverConfig
- type Emitter
- type EmitterConfig
- type EventConfig
- type EventConsumeMessage
- type EventHandler
- type EventLogger
- func (e *EventLogger) Delete(ctx context.Context, message *EventMessage) error
- func (e *EventLogger) Listen(ctx context.Context, topic, group string) (Iterator, error)
- func (e *EventLogger) Next(ctx context.Context) (ConsumeMessage, error)
- func (e *EventLogger) Send(ctx context.Context, message *EventMessage) error
- type EventMessage
- type EventMiddleware
- type Iterator
- type IteratorFunc
- type Job
- type Listener
- type ListenerFactory
- type ListenerWorkerPool
- type OutboxRecord
- type Sender
- type SenderFactory
- type WorkerPoolConfig
- type Writer
- type WriterFactory
Constants ¶
const ( MetaHash = "hash" MetaTime = "timestamp" MetaEvent = "event" MetaVersion = "version" MetaDefault = "default" )
const (
DefaultConsumerWorkers = 1
)
Variables ¶
var (
ErrConsumerStarted = errors.New("Consumer already started")
)
Functions ¶
func AlwaysCommitStrategy ¶
func AlwaysCommitStrategy(ctx context.Context, message ConsumeMessage, handler EventHandler) error
AlwaysCommitStrategy will always commit the message no matter what is the handler result
func CommitOnSuccessStrategy ¶
func CommitOnSuccessStrategy(ctx context.Context, message ConsumeMessage, handler EventHandler) error
CommitOnSuccessStrategy will only commit the message if handler doesn't return error
func GetConsumerGroupFromContext ¶
GetConsumerGroupFromContext return consumer group from contet if any
func RegisterConsumeStrategy ¶
func RegisterConsumeStrategy(name string, fn ConsumeStrategyFactory)
RegisterConsumeStrategy register consumestrategy
func RegisterListener ¶
func RegisterListener(name string, fn ListenerFactory)
func RegisterSender ¶
func RegisterSender(name string, fn SenderFactory)
func RegisterWriter ¶
func RegisterWriter(name string, fn WriterFactory)
Types ¶
type ConsumeMessage ¶
type ConsumeStrategy ¶
type ConsumeStrategy func(ctx context.Context, message ConsumeMessage, handler EventHandler) error
type ConsumeStrategyFactory ¶
type ConsumeStrategyFactory func(ctx context.Context, config interface{}) (ConsumeStrategy, error)
func NoConfigConsumeStrategyFactory ¶
func NoConfigConsumeStrategyFactory(strategy ConsumeStrategy) ConsumeStrategyFactory
NoConfigConsumeStrategyFactory util function to create factory for consume strategy which doesn't need any config
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
func NewConsumer ¶
func NewConsumer(ctx context.Context, config *ConsumerConfig) (*Consumer, error)
NewConsumer create new instance of consumer
func (*Consumer) Stop ¶
Stop gracefully stop the consumer waiting for all workers to complete before exiting
func (*Consumer) StopContext ¶
StopContext gracefully stop the consumer or until the context timeout
func (*Consumer) Subscribe ¶
Subscribe to a topic with specific group this should be call before Start the consumer
func (*Consumer) Use ¶
func (c *Consumer) Use(middlewares ...EventMiddleware)
Use add middlewares to actual event handler before accessing the actual handler Please add your middlewares before calling subscribe or it may not work properly
func (*Consumer) WithConsumeStrategy ¶
func (c *Consumer) WithConsumeStrategy(strategy ConsumeStrategy)
WithConsumeStrategy, set consume strategy for this consumer
type ConsumerConfig ¶
type ConsumerConfig struct { Listener *DriverConfig `json:"listener" mapstructure:"listener"` EventConfig *EventConfig `json:"event_config" mapstructure:"event_config"` WorkerPoolConfig *WorkerPoolConfig `json:"worker_pool_config" mapstructure:"worker_pool_config"` ConsumeStrategy *DriverConfig `json:"consume_strategy" mapstructure:"consume_strategy"` }
type DriverConfig ¶
type DriverConfig struct { Type string `json:"type" mapstructure:"type"` Config interface{} `json:"config" mapstructure:"config"` }
type EmitterConfig ¶
type EmitterConfig struct { Sender *DriverConfig `json:"sender" mapstructure:"sender"` Writer *DriverConfig `json:"writer" mapstructure:"writer"` EventConfig *EventConfig `json:"event_config" mapstructure:"event_config"` }
type EventConfig ¶
type EventConfig struct { Metadata map[string]map[string]interface{} `json:"metadata,omitempty" mapstructure:"metadata"` EventMap map[string]string `json:"event_map,omitempty" mapstructure:"event_map"` GroupMap map[string]string `json:"group_map,omitempty" mapstructure:"group_map"` }
func NewEventConfig ¶
func NewEventConfig() *EventConfig
type EventConsumeMessage ¶
type EventConsumeMessage struct { Topic string Key string Metadata map[string]interface{} Data []byte }
func NewEventConsumeMessage ¶
func NewEventConsumeMessage(v []byte) (*EventConsumeMessage, error)
NewEventConsumeMessage return event consume message from byte data
type EventHandler ¶
type EventHandler func(ctx context.Context, message *EventConsumeMessage) error
type EventLogger ¶
type EventLogger struct {
// contains filtered or unexported fields
}
func NewEventLogger ¶
func NewEventLogger(ctx context.Context, config interface{}) (*EventLogger, error)
func (*EventLogger) Delete ¶
func (e *EventLogger) Delete(ctx context.Context, message *EventMessage) error
func (*EventLogger) Next ¶
func (e *EventLogger) Next(ctx context.Context) (ConsumeMessage, error)
func (*EventLogger) Send ¶
func (e *EventLogger) Send(ctx context.Context, message *EventMessage) error
type EventMessage ¶
type EventMessage struct { Topic string `json:"-"` Key string `json:"-"` Data interface{} `json:"data,omitempty" mapstructure:"data"` Metadata map[string]interface{} `json:"metadata,omitempty" mapstructure:"metadata"` RawData []byte `json:"-"` //To provide raw data to consumer }
func (*EventMessage) Hash ¶
func (m *EventMessage) Hash() (string, error)
func (*EventMessage) ToBytes ¶
func (m *EventMessage) ToBytes() ([]byte, error)
type EventMiddleware ¶
type EventMiddleware func(next EventHandler) EventHandler
type IteratorFunc ¶
type IteratorFunc func(ctx context.Context) (ConsumeMessage, error)
func (IteratorFunc) Next ¶
func (fn IteratorFunc) Next(ctx context.Context) (ConsumeMessage, error)
type ListenerFactory ¶
type ListenerWorkerPool ¶
type ListenerWorkerPool struct {
// contains filtered or unexported fields
}
type OutboxRecord ¶
type OutboxRecord struct { ID string `json:"id,omitempty" mapstructure:"id"` Topic string `json:"topic,omitempty" mapstructure:"topic"` Key string `json:"key,omitempty" mapstructure:"key"` Value string `json:"value,omitempty" mapstructure:"value"` CreatedAt time.Time `json:"created_at,omitempty" mapstructure:"created_at"` }
OutboxRecord outbox model
func OutboxFromMessage ¶
func OutboxFromMessage(msg *EventMessage) (*OutboxRecord, error)
func (*OutboxRecord) GenerateID ¶
func (o *OutboxRecord) GenerateID() *OutboxRecord
GenerateID generate record ID
type SenderFactory ¶
type WorkerPoolConfig ¶
type WorkerPoolConfig map[string]interface{}