runtime_sarama

package
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 27, 2023 License: GPL-3.0 Imports: 12 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DefaultConfiguration

func DefaultConfiguration() *sarama.Config

func FromConsumerMessage

func FromConsumerMessage(source *sarama.ConsumerMessage) (message.Message[message.Bytes, message.Bytes], error)

FromConsumerMessage maps a sarama consumer message to the internal message representation.

func FromConsumerMessages

func FromConsumerMessages(sources []*sarama.ConsumerMessage) ([]message.Message[message.Bytes, message.Bytes], error)

func NewProducer

func NewProducer(configurations ...runtime.Configuration[*Producer]) runtime.Producer

func ToProducerMessage

func ToProducerMessage(source message.Message[message.Bytes, message.Bytes]) (*sarama.ProducerMessage, error)

ToProducerMessage maps the internal message representation to a sarama producer message.

func ToProducerMessages

func ToProducerMessages(sources []message.Message[message.Bytes, message.Bytes]) ([]*sarama.ProducerMessage, error)

func WithConsumerBroker

func WithConsumerBroker(brokers ...string) runtime.Configuration[*Consumer]

func WithConsumerGroupName

func WithConsumerGroupName(groupName string) runtime.Configuration[*Consumer]

func WithConsumerLoop

func WithConsumerLoop(loop ConsumerLoop) runtime.Configuration[*Consumer]

func WithConsumerRuntimeController

func WithConsumerRuntimeController(controller runtime.Controller) runtime.Configuration[*Consumer]

func WithConsumerSaramaConfig

func WithConsumerSaramaConfig(saramaConfig *sarama.Config) runtime.Configuration[*Consumer]

func WithConsumerTopic

func WithConsumerTopic(topics ...string) runtime.Configuration[*Consumer]

func WithLoopBatchFunction

func WithLoopBatchFunction(loopFunction stateless.BatchFunction) runtime.Configuration[*ConsumerBatchedLoop]

func WithLoopBatchMaxBufferred

func WithLoopBatchMaxBufferred(maxBuffered int64) runtime.Configuration[*ConsumerBatchedLoop]

Configuration option for ConsumerBatchedLoop; it takes the maxBuffered value (int64).

func WithLoopBatchMaxDelay

func WithLoopBatchMaxDelay(maxDelay time.Duration) runtime.Configuration[*ConsumerBatchedLoop]

func WithLoopBatchPrometheus

func WithLoopBatchPrometheus() runtime.Configuration[*ConsumerBatchedLoop]

func WithLoopSingleFunction

func WithLoopSingleFunction(loopFunction stateless.SingleFunction) runtime.Configuration[*ConsumerSingleLoop]

Configuration option for ConsumerSingleLoop; it takes the stateless.SingleFunction passed as loopFunction.

func WithLoopSinglePrometheus

func WithLoopSinglePrometheus() runtime.Configuration[*ConsumerSingleLoop]

func WithProducerBroker

func WithProducerBroker(brokers ...string) runtime.Configuration[*Producer]

func WithProducerRuntimeController

func WithProducerRuntimeController(controller runtime.Controller) runtime.Configuration[*Producer]

func WithProducerSaramaConfig

func WithProducerSaramaConfig(saramaConfig *sarama.Config) runtime.Configuration[*Producer]

Types

type Consumer

// Consumer groups the configuration and runtime state of a sarama-based
// consumer. The section comments below separate the fields into required
// configuration, optional configuration, and state that is set in start.
// NOTE(review): the configuration fields are presumably filled through the
// WithConsumer* options passed to NewConsumer — confirm against those options.
type Consumer struct {
	// required
	Topics              []string
	Brokers             []string
	SaramaConfiguration *sarama.Config
	Loop                ConsumerLoop
	Controller          runtime.Controller

	// not required
	GroupName string

	// set in start (runtime state; NOTE(review): presumably not meant to be
	// supplied by the caller — confirm against Start)
	Group   sarama.ConsumerGroup
	Context context.Context
	Cancel  context.CancelFunc
}

implementation

func NewConsumer

func NewConsumer(configurations ...runtime.Configuration[*Consumer]) *Consumer

NewConsumer is the constructor for Consumer; it accepts zero or more runtime.Configuration[*Consumer] options.

func (*Consumer) Cleanup

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited

func (*Consumer) ConsumeClaim

func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim must start a consumer loop over the ConsumerGroupClaim's Messages().

func (*Consumer) Run

func (c *Consumer) Run()

func (*Consumer) Setup

Setup is run at the beginning of a new session, before ConsumeClaim; it marks the consumer group as ready.

func (*Consumer) Start

func (c *Consumer) Start() error

func (*Consumer) Stop

func (c *Consumer) Stop()

type ConsumerBatchedLoop

// ConsumerBatchedLoop holds the exported batching settings of a batched
// consumer loop: a buffer limit, a delay limit, and the batch function F.
// NOTE(review): how MaxBufferred and MaxDelay interact when a batch is
// handed to F is not visible from this declaration — confirm against Loop.
type ConsumerBatchedLoop struct {
	// batching configuration
	MaxBufferred int64
	MaxDelay     time.Duration
	F            stateless.BatchFunction
	// contains filtered or unexported fields
}

implementation

func (*ConsumerBatchedLoop) Loop

func (*ConsumerBatchedLoop) Start

func (consumerSarama *ConsumerBatchedLoop) Start() error

func (*ConsumerBatchedLoop) Stop

func (consumerSarama *ConsumerBatchedLoop) Stop()

type ConsumerLoop

// ConsumerLoop is the interface a consumer loop has to satisfy. It is the
// type of Consumer.Loop and the return type of NewBatchLoop and NewSingleLoop.
type ConsumerLoop interface {
	// Loop receives a sarama consumer group session and claim and reports
	// an error.
	Loop(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
	// Start reports an error if the loop could not be started.
	Start() error
	// Stop takes no arguments and returns nothing.
	Stop()
}

func NewBatchLoop

func NewBatchLoop(configurations ...runtime.Configuration[*ConsumerBatchedLoop]) ConsumerLoop

NewBatchLoop is a constructor: it takes ConsumerBatchedLoop configuration options and returns a ConsumerLoop.

func NewSingleLoop

func NewSingleLoop(configurations ...runtime.Configuration[*ConsumerSingleLoop]) ConsumerLoop

NewSingleLoop is a constructor: it takes ConsumerSingleLoop configuration options and returns a ConsumerLoop.

type ConsumerSingleLoop

// ConsumerSingleLoop holds the exported setting of a single-message consumer
// loop: the function F of type stateless.SingleFunction.
// NOTE(review): presumably F is invoked once per consumed message — confirm
// against the Loop method.
type ConsumerSingleLoop struct {
	F stateless.SingleFunction
	// contains filtered or unexported fields
}

implementation

func (*ConsumerSingleLoop) Loop

func (consumerSarama *ConsumerSingleLoop) Loop(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

func (*ConsumerSingleLoop) Start

func (consumerSarama *ConsumerSingleLoop) Start() error

func (*ConsumerSingleLoop) Stop

func (consumerSarama *ConsumerSingleLoop) Stop()

type Producer

// Producer groups the configuration and runtime state of a sarama-based
// producer. The section comments below separate required configuration
// from state that is set in start.
// NOTE(review): the configuration fields are presumably filled through the
// WithProducer* options passed to NewProducer — confirm against those options.
type Producer struct {
	// required
	Brokers             []string
	SaramaConfiguration *sarama.Config
	Controller          runtime.Controller

	// set in start (the underlying sarama synchronous producer)
	Producer sarama.SyncProducer
}

func (*Producer) Produce

func (p *Producer) Produce(c context.Context, sources []message.Message[message.Bytes, message.Bytes]) error

func (*Producer) Start

func (p *Producer) Start() error

func (*Producer) Stop

func (p *Producer) Stop()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL