Documentation ¶
Index ¶
- func DefaultConfiguration() *sarama.Config
- func FromConsumerMessage(source *sarama.ConsumerMessage) (message.Message[message.Bytes, message.Bytes], error)
- func FromConsumerMessages(sources []*sarama.ConsumerMessage) ([]message.Message[message.Bytes, message.Bytes], error)
- func NewProducer(configurations ...runtime.Configuration[*Producer]) runtime.Producer
- func ToProducerMessage(source message.Message[message.Bytes, message.Bytes]) (*sarama.ProducerMessage, error)
- func ToProducerMessages(sources []message.Message[message.Bytes, message.Bytes]) ([]*sarama.ProducerMessage, error)
- func WithConsumerBroker(brokers ...string) runtime.Configuration[*Consumer]
- func WithConsumerGroupName(groupName string) runtime.Configuration[*Consumer]
- func WithConsumerLoop(loop ConsumerLoop) runtime.Configuration[*Consumer]
- func WithConsumerRuntimeController(controller runtime.Controller) runtime.Configuration[*Consumer]
- func WithConsumerSaramaConfig(saramaConfig *sarama.Config) runtime.Configuration[*Consumer]
- func WithConsumerTopic(topics ...string) runtime.Configuration[*Consumer]
- func WithLoopBatchFunction(loopFunction stateless.BatchFunction) runtime.Configuration[*ConsumerBatchedLoop]
- func WithLoopBatchMaxBufferred(maxBuffered int64) runtime.Configuration[*ConsumerBatchedLoop]
- func WithLoopBatchMaxDelay(maxDelay time.Duration) runtime.Configuration[*ConsumerBatchedLoop]
- func WithLoopBatchPrometheus() runtime.Configuration[*ConsumerBatchedLoop]
- func WithLoopSingleFunction(loopFunction stateless.SingleFunction) runtime.Configuration[*ConsumerSingleLoop]
- func WithLoopSinglePrometheus() runtime.Configuration[*ConsumerSingleLoop]
- func WithProducerBroker(brokers ...string) runtime.Configuration[*Producer]
- func WithProducerRuntimeController(controller runtime.Controller) runtime.Configuration[*Producer]
- func WithProducerSaramaConfig(saramaConfig *sarama.Config) runtime.Configuration[*Producer]
- type Consumer
- func (c *Consumer) Cleanup(sarama.ConsumerGroupSession) error
- func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (c *Consumer) Run()
- func (c *Consumer) Setup(sarama.ConsumerGroupSession) error
- func (c *Consumer) Start() error
- func (c *Consumer) Stop()
- type ConsumerBatchedLoop
- type ConsumerLoop
- type ConsumerSingleLoop
- type Producer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DefaultConfiguration ¶
func FromConsumerMessage ¶
func FromConsumerMessage(source *sarama.ConsumerMessage) (message.Message[message.Bytes, message.Bytes], error)
FromConsumerMessage maps a sarama consumer message to the internal message representation.
func FromConsumerMessages ¶
func NewProducer ¶
func NewProducer(configurations ...runtime.Configuration[*Producer]) runtime.Producer
func ToProducerMessage ¶
func ToProducerMessage(source message.Message[message.Bytes, message.Bytes]) (*sarama.ProducerMessage, error)
ToProducerMessage maps the internal message representation to a sarama producer message.
func ToProducerMessages ¶
func WithConsumerBroker ¶
func WithConsumerBroker(brokers ...string) runtime.Configuration[*Consumer]
func WithConsumerGroupName ¶
func WithConsumerGroupName(groupName string) runtime.Configuration[*Consumer]
func WithConsumerLoop ¶
func WithConsumerLoop(loop ConsumerLoop) runtime.Configuration[*Consumer]
func WithConsumerRuntimeController ¶
func WithConsumerRuntimeController(controller runtime.Controller) runtime.Configuration[*Consumer]
func WithConsumerSaramaConfig ¶
func WithConsumerSaramaConfig(saramaConfig *sarama.Config) runtime.Configuration[*Consumer]
func WithConsumerTopic ¶
func WithConsumerTopic(topics ...string) runtime.Configuration[*Consumer]
func WithLoopBatchFunction ¶
func WithLoopBatchFunction(loopFunction stateless.BatchFunction) runtime.Configuration[*ConsumerBatchedLoop]
func WithLoopBatchMaxBufferred ¶
func WithLoopBatchMaxBufferred(maxBuffered int64) runtime.Configuration[*ConsumerBatchedLoop]
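WithLoopBatchMaxBufferred is a configuration option for ConsumerBatchedLoop (presumably sets its MaxBufferred field; confirm against the source).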
func WithLoopBatchMaxDelay ¶
func WithLoopBatchMaxDelay(maxDelay time.Duration) runtime.Configuration[*ConsumerBatchedLoop]
func WithLoopBatchPrometheus ¶
func WithLoopBatchPrometheus() runtime.Configuration[*ConsumerBatchedLoop]
func WithLoopSingleFunction ¶
func WithLoopSingleFunction(loopFunction stateless.SingleFunction) runtime.Configuration[*ConsumerSingleLoop]
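WithLoopSingleFunction is a configuration option for ConsumerSingleLoop (presumably sets its F field; confirm against the source).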
func WithLoopSinglePrometheus ¶
func WithLoopSinglePrometheus() runtime.Configuration[*ConsumerSingleLoop]
func WithProducerBroker ¶
func WithProducerBroker(brokers ...string) runtime.Configuration[*Producer]
func WithProducerRuntimeController ¶
func WithProducerRuntimeController(controller runtime.Controller) runtime.Configuration[*Producer]
func WithProducerSaramaConfig ¶
func WithProducerSaramaConfig(saramaConfig *sarama.Config) runtime.Configuration[*Producer]
Types ¶
type Consumer ¶
type Consumer struct { // required Topics []string Brokers []string SaramaConfiguration *sarama.Config Loop ConsumerLoop Controller runtime.Controller // not required GroupName string // set in start Group sarama.ConsumerGroup Context context.Context Cancel context.CancelFunc }
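Consumer is the Kafka consumer implementation; its Setup, Cleanup and ConsumeClaim methods have the signatures of sarama's ConsumerGroupHandler interface.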
func NewConsumer ¶
func NewConsumer(configurations ...runtime.Configuration[*Consumer]) *Consumer
NewConsumer constructs a Consumer and applies the given configurations.
func (*Consumer) Cleanup ¶
func (c *Consumer) Cleanup(sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited.
func (*Consumer) ConsumeClaim ¶
func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim must start a consumer loop over the ConsumerGroupClaim's Messages() channel.
type ConsumerBatchedLoop ¶
type ConsumerBatchedLoop struct { // batching configuration MaxBufferred int64 MaxDelay time.Duration F stateless.BatchFunction // contains filtered or unexported fields }
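ConsumerBatchedLoop is the batched ConsumerLoop implementation; it carries the batching configuration (MaxBufferred, MaxDelay) and the stateless.BatchFunction F.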
func (*ConsumerBatchedLoop) Loop ¶
func (batchConsume *ConsumerBatchedLoop) Loop(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
func (*ConsumerBatchedLoop) Start ¶
func (consumerSarama *ConsumerBatchedLoop) Start() error
func (*ConsumerBatchedLoop) Stop ¶
func (consumerSarama *ConsumerBatchedLoop) Stop()
type ConsumerLoop ¶
type ConsumerLoop interface { Loop(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error Start() error Stop() }
func NewBatchLoop ¶
func NewBatchLoop(configurations ...runtime.Configuration[*ConsumerBatchedLoop]) ConsumerLoop
NewBatchLoop constructs a batched ConsumerLoop and applies the given ConsumerBatchedLoop configurations.
func NewSingleLoop ¶
func NewSingleLoop(configurations ...runtime.Configuration[*ConsumerSingleLoop]) ConsumerLoop
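NewSingleLoop constructs a single-message ConsumerLoop and applies the given ConsumerSingleLoop configurations.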
type ConsumerSingleLoop ¶
type ConsumerSingleLoop struct { F stateless.SingleFunction // contains filtered or unexported fields }
ConsumerSingleLoop is the single-message ConsumerLoop implementation; it carries the stateless.SingleFunction F.
func (*ConsumerSingleLoop) Loop ¶
func (consumerSarama *ConsumerSingleLoop) Loop(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
func (*ConsumerSingleLoop) Start ¶
func (consumerSarama *ConsumerSingleLoop) Start() error
func (*ConsumerSingleLoop) Stop ¶
func (consumerSarama *ConsumerSingleLoop) Stop()
Click to show internal directories.
Click to hide internal directories.