Documentation ¶
Index ¶
- Variables
- func DefaultConfiguration() *sarama.Config
- func FromConsumerMessage(source *sarama.ConsumerMessage) (flow.Message[structure.Bytes, structure.Bytes], error)
- func FromConsumerMessages(sources []*sarama.ConsumerMessage) ([]flow.Message[structure.Bytes, structure.Bytes], error)
- func NewConsumer(configurations ...runtime.Configuration[*Consumer]) runtime.Runtime
- func NewProducer(configurations ...runtime.Configuration[*Producer]) flow.Producer
- func ToProducerMessage(source flow.Message[structure.Bytes, structure.Bytes]) (*sarama.ProducerMessage, error)
- func ToProducerMessages(sources []flow.Message[structure.Bytes, structure.Bytes]) ([]*sarama.ProducerMessage, error)
- func WithConsumerBroker(brokers ...string) runtime.Configuration[*Consumer]
- func WithConsumerGroupName(groupName string) runtime.Configuration[*Consumer]
- func WithConsumerLoop(loop ConsumerHandler) runtime.Configuration[*Consumer]
- func WithConsumerSaramaConfigModifier(modifier SaramaConfigModifier) runtime.Configuration[*Consumer]
- func WithConsumerTopic(topics ...string) runtime.Configuration[*Consumer]
- func WithKeyedHandlerFunction(loopFunction stateless.BatchFunction) runtime.Configuration[*KeyedHandler]
- func WithKeyedHandlerKeyFunction(keyFunction stateful.PersistenceIdFunction[structure.Bytes, structure.Bytes]) runtime.Configuration[*KeyedHandler]
- func WithKeyedHandlerMaxBufferred(maxBuffered int64) runtime.Configuration[*KeyedHandler]
- func WithKeyedHandlerMaxDelay(maxDelay time.Duration) runtime.Configuration[*KeyedHandler]
- func WithKeyedHandlerMaxPerKey(maxPerKey int64) runtime.Configuration[*KeyedHandler]
- func WithKeyedHandlerPrometheus() runtime.Configuration[*KeyedHandler]
- func WithProducerBroker(brokers ...string) runtime.Configuration[*Producer]
- func WithProducerSaramaConfigModifier(modifier SaramaConfigModifier) runtime.Configuration[*Producer]
- type Consumer
- type ConsumerHandler
- type KeyedHandler
- func (kh *KeyedHandler) Cleanup(sarama.ConsumerGroupSession) error
- func (h *KeyedHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (h *KeyedHandler) ConsumeClaimIteration(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim, ...) (bool, error)
- func (kh *KeyedHandler) Setup(sarama.ConsumerGroupSession) error
- func (kh *KeyedHandler) Start() error
- func (kh *KeyedHandler) Stop()
- type Producer
- type SaramaConfigModifier
Constants ¶
This section is empty.
Variables ¶
View Source
var ( ErrConsumerIsNil = errors.New("consumer is nil") ErrConsumerLoopIsNil = errors.New("consumer loop is nil") ErrConsumerSaramaConfigurationIsNil = errors.New("consumer sarama configuration is nil") ErrConsumerTopicsEmpty = errors.New("consumer topics are empty") ErrConsumerBrokersEmpty = errors.New("consumer brokers are empty") )
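Sentinel errors for an invalid consumer: a nil consumer, loop, or sarama configuration, or empty topics or brokers.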
Functions ¶
func DefaultConfiguration ¶
func FromConsumerMessage ¶
func FromConsumerMessage(source *sarama.ConsumerMessage) (flow.Message[structure.Bytes, structure.Bytes], error)
FromConsumerMessage maps a sarama consumer message to the internal flow.Message representation.
func FromConsumerMessages ¶
func NewConsumer ¶
func NewConsumer(configurations ...runtime.Configuration[*Consumer]) runtime.Runtime
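NewConsumer is the constructor for the consumer; it accepts optional configurations and returns the consumer as a runtime.Runtime.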
func NewProducer ¶
func NewProducer(configurations ...runtime.Configuration[*Producer]) flow.Producer
func ToProducerMessage ¶
func ToProducerMessage(source flow.Message[structure.Bytes, structure.Bytes]) (*sarama.ProducerMessage, error)
ToProducerMessage maps the internal flow.Message representation to a sarama producer message.
func ToProducerMessages ¶
func WithConsumerBroker ¶
func WithConsumerBroker(brokers ...string) runtime.Configuration[*Consumer]
func WithConsumerGroupName ¶
func WithConsumerGroupName(groupName string) runtime.Configuration[*Consumer]
func WithConsumerLoop ¶
func WithConsumerLoop(loop ConsumerHandler) runtime.Configuration[*Consumer]
func WithConsumerSaramaConfigModifier ¶ added in v0.0.15
func WithConsumerSaramaConfigModifier(modifier SaramaConfigModifier) runtime.Configuration[*Consumer]
func WithConsumerTopic ¶
func WithConsumerTopic(topics ...string) runtime.Configuration[*Consumer]
func WithKeyedHandlerFunction ¶ added in v0.1.0
func WithKeyedHandlerFunction(loopFunction stateless.BatchFunction) runtime.Configuration[*KeyedHandler]
func WithKeyedHandlerKeyFunction ¶ added in v0.1.0
func WithKeyedHandlerKeyFunction(keyFunction stateful.PersistenceIdFunction[structure.Bytes, structure.Bytes]) runtime.Configuration[*KeyedHandler]
func WithKeyedHandlerMaxBufferred ¶ added in v0.1.0
func WithKeyedHandlerMaxBufferred(maxBuffered int64) runtime.Configuration[*KeyedHandler]
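WithKeyedHandlerMaxBufferred is a configuration option for KeyedHandler (presumably it sets the MaxBufferred field; confirm against the source).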
func WithKeyedHandlerMaxDelay ¶ added in v0.1.0
func WithKeyedHandlerMaxDelay(maxDelay time.Duration) runtime.Configuration[*KeyedHandler]
func WithKeyedHandlerMaxPerKey ¶ added in v0.1.3
func WithKeyedHandlerMaxPerKey(maxPerKey int64) runtime.Configuration[*KeyedHandler]
func WithKeyedHandlerPrometheus ¶ added in v0.1.0
func WithKeyedHandlerPrometheus() runtime.Configuration[*KeyedHandler]
func WithProducerBroker ¶
func WithProducerBroker(brokers ...string) runtime.Configuration[*Producer]
func WithProducerSaramaConfigModifier ¶ added in v0.0.15
func WithProducerSaramaConfigModifier(modifier SaramaConfigModifier) runtime.Configuration[*Producer]
Types ¶
type Consumer ¶
type Consumer struct { // required Topics []string Brokers []string SaramaConfiguration *sarama.Config Handler ConsumerHandler // not required GroupName string // set in start Group sarama.ConsumerGroup }
type ConsumerHandler ¶ added in v0.1.1
type ConsumerHandler interface { ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error Setup(sarama.ConsumerGroupSession) error Cleanup(sarama.ConsumerGroupSession) error Start() error Stop() }
func NewKeyedHandler ¶ added in v0.1.0
func NewKeyedHandler(configurations ...runtime.Configuration[*KeyedHandler]) ConsumerHandler
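NewKeyedHandler is the constructor for the keyed handler; it accepts optional configurations and returns the handler as a ConsumerHandler.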
type KeyedHandler ¶ added in v0.1.0
type KeyedHandler struct { // batching configuration MaxBufferred int64 MaxDelay time.Duration MaxPerKey int64 F stateless.BatchFunction K stateful.PersistenceIdFunction[structure.Bytes, structure.Bytes] // contains filtered or unexported fields }
KeyedHandler is an implementation of ConsumerHandler; its exported fields hold the batching configuration.
func (*KeyedHandler) Cleanup ¶ added in v0.1.0
func (kh *KeyedHandler) Cleanup(sarama.ConsumerGroupSession) error
func (*KeyedHandler) ConsumeClaim ¶ added in v0.1.0
func (h *KeyedHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
func (*KeyedHandler) ConsumeClaimIteration ¶ added in v0.1.0
func (h *KeyedHandler) ConsumeClaimIteration(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim, khs *keyedHandlerState) (bool, error)
func (*KeyedHandler) Setup ¶ added in v0.1.0
func (kh *KeyedHandler) Setup(sarama.ConsumerGroupSession) error
func (*KeyedHandler) Start ¶ added in v0.1.0
func (kh *KeyedHandler) Start() error
func (*KeyedHandler) Stop ¶ added in v0.1.0
func (kh *KeyedHandler) Stop()
type Producer ¶
type Producer struct { // required Brokers []string SaramaConfiguration *sarama.Config // set in start Producer sarama.SyncProducer }
type SaramaConfigModifier ¶ added in v0.0.15
Click to show internal directories.
Click to hide internal directories.