runtime_sarama

package
v0.3.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 7, 2024 License: GPL-3.0 Imports: 15 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors for an invalid or incompletely configured Consumer.
// Compare against them with errors.Is.
// NOTE(review): where each error is returned is not visible on this page;
// presumably during Consumer validation/start — confirm against the implementation.
var (
	// ErrConsumerIsNil reports that the consumer is nil.
	ErrConsumerIsNil                    = errors.New("consumer is nil")
	// ErrConsumerLoopIsNil reports that the consumer loop is nil
	// (presumably the handler supplied via WithConsumerLoop — confirm).
	ErrConsumerLoopIsNil                = errors.New("consumer loop is nil")
	// ErrConsumerSaramaConfigurationIsNil reports that the consumer's sarama configuration is nil.
	ErrConsumerSaramaConfigurationIsNil = errors.New("consumer sarama configuration is nil")
	// ErrConsumerTopicsEmpty reports that no consumer topics are set.
	ErrConsumerTopicsEmpty              = errors.New("consumer topics are empty")
	// ErrConsumerBrokersEmpty reports that no consumer brokers are set.
	ErrConsumerBrokersEmpty             = errors.New("consumer brokers are empty")
)

Errors

Functions

func DefaultConfiguration

func DefaultConfiguration() *sarama.Config

func FromConsumerMessage

func FromConsumerMessage(source *sarama.ConsumerMessage) (flow.Message[structure.Bytes, structure.Bytes], error)

FromConsumerMessage maps a sarama consumer message to the internal flow.Message representation.

func FromConsumerMessages

func FromConsumerMessages(sources []*sarama.ConsumerMessage) ([]flow.Message[structure.Bytes, structure.Bytes], error)

func NewConsumer

func NewConsumer(configurations ...runtime.Configuration[*Consumer]) runtime.Runtime

NewConsumer constructs a Consumer-backed runtime.Runtime from the given configuration options.

func NewProducer

func NewProducer(configurations ...runtime.Configuration[*Producer]) flow.Producer

func ToProducerMessage

func ToProducerMessage(source flow.Message[structure.Bytes, structure.Bytes]) (*sarama.ProducerMessage, error)

ToProducerMessage maps the internal flow.Message representation to a sarama producer message.

func ToProducerMessages

func ToProducerMessages(sources []flow.Message[structure.Bytes, structure.Bytes]) ([]*sarama.ProducerMessage, error)

func WithConsumerBroker

func WithConsumerBroker(brokers ...string) runtime.Configuration[*Consumer]

func WithConsumerGroupName

func WithConsumerGroupName(groupName string) runtime.Configuration[*Consumer]

func WithConsumerLoop

func WithConsumerLoop(loop ConsumerHandler) runtime.Configuration[*Consumer]

func WithConsumerSaramaConfigModifier added in v0.0.15

func WithConsumerSaramaConfigModifier(modifier SaramaConfigModifier) runtime.Configuration[*Consumer]

func WithConsumerTopic

func WithConsumerTopic(topics ...string) runtime.Configuration[*Consumer]

func WithKeyedHandlerFunction added in v0.1.0

func WithKeyedHandlerFunction(loopFunction stateless.BatchFunction) runtime.Configuration[*KeyedHandler]

func WithKeyedHandlerKeyFunction added in v0.1.0

func WithKeyedHandlerKeyFunction(keyFunction stateful.PersistenceIdFunction[structure.Bytes, structure.Bytes]) runtime.Configuration[*KeyedHandler]

func WithKeyedHandlerMaxBufferred added in v0.1.0

func WithKeyedHandlerMaxBufferred(maxBuffered int64) runtime.Configuration[*KeyedHandler]

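WithKeyedHandlerMaxBufferred returns a KeyedHandler configuration option taking the maximum buffered count (maxBuffered).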

func WithKeyedHandlerMaxDelay added in v0.1.0

func WithKeyedHandlerMaxDelay(maxDelay time.Duration) runtime.Configuration[*KeyedHandler]

func WithKeyedHandlerMaxPerKey added in v0.1.3

func WithKeyedHandlerMaxPerKey(maxPerKey int64) runtime.Configuration[*KeyedHandler]

func WithKeyedHandlerPrometheus added in v0.1.0

func WithKeyedHandlerPrometheus() runtime.Configuration[*KeyedHandler]

func WithProducerBroker

func WithProducerBroker(brokers ...string) runtime.Configuration[*Producer]

func WithProducerSaramaConfigModifier added in v0.0.15

func WithProducerSaramaConfigModifier(modifier SaramaConfigModifier) runtime.Configuration[*Producer]

Types

type Consumer

// Consumer holds the configuration and runtime state for consuming from
// Kafka through a sarama consumer group.
type Consumer struct {
	// required
	// NOTE(review): presumably validated against the ErrConsumer* sentinel
	// errors — confirm against the implementation.

	// Topics lists the topic names (see WithConsumerTopic).
	Topics              []string
	// Brokers lists the broker addresses (see WithConsumerBroker).
	Brokers             []string
	// SaramaConfiguration is the sarama client configuration
	// (see DefaultConfiguration and WithConsumerSaramaConfigModifier).
	SaramaConfiguration *sarama.Config
	// Handler is the ConsumerHandler for this consumer (see WithConsumerLoop).
	Handler             ConsumerHandler

	// not required

	// GroupName is the consumer group name (see WithConsumerGroupName).
	GroupName string

	// set in start

	// Group is the sarama consumer group; per the note above it is populated
	// by Start rather than by the caller.
	Group sarama.ConsumerGroup
}

func (*Consumer) Loop

func (r *Consumer) Loop(ctx context.Context, cancel context.CancelFunc) error

func (*Consumer) Start

func (r *Consumer) Start() error

func (*Consumer) Stop

func (r *Consumer) Stop()

type ConsumerHandler added in v0.1.1

// ConsumerHandler is the handler interface a Consumer is configured with
// (see WithConsumerLoop). It combines consumer-group session callbacks with
// Start/Stop lifecycle methods.
// NOTE(review): ConsumeClaim, Setup and Cleanup appear to mirror
// sarama.ConsumerGroupHandler — confirm against the sarama documentation.
type ConsumerHandler interface {
	// ConsumeClaim handles the given claim within the given session.
	ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
	// Setup receives the consumer group session; presumably invoked at the
	// start of a session — confirm.
	Setup(sarama.ConsumerGroupSession) error
	// Cleanup receives the consumer group session; presumably invoked at the
	// end of a session — confirm.
	Cleanup(sarama.ConsumerGroupSession) error
	// Start starts the handler and reports any startup error.
	Start() error
	// Stop stops the handler.
	Stop()
}

func NewKeyedHandler added in v0.1.0

func NewKeyedHandler(configurations ...runtime.Configuration[*KeyedHandler]) ConsumerHandler

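NewKeyedHandler constructs a KeyedHandler from the given configuration options and returns it as a ConsumerHandler.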

type KeyedHandler added in v0.1.0

// KeyedHandler is a handler type configured through the WithKeyedHandler*
// options and constructed by NewKeyedHandler.
type KeyedHandler struct {
	// batching configuration

	// MaxBufferred is set via WithKeyedHandlerMaxBufferred; presumably the
	// maximum number of buffered messages — TODO confirm.
	MaxBufferred int64
	// MaxDelay is set via WithKeyedHandlerMaxDelay; presumably the maximum
	// time a batch may wait before being processed — TODO confirm.
	MaxDelay     time.Duration
	// MaxPerKey is set via WithKeyedHandlerMaxPerKey; presumably a per-key
	// limit on buffered messages — TODO confirm.
	MaxPerKey    int64
	// F is the batch function (see WithKeyedHandlerFunction).
	F            stateless.BatchFunction
	// K is the key function (see WithKeyedHandlerKeyFunction).
	K            stateful.PersistenceIdFunction[structure.Bytes, structure.Bytes]
	// contains filtered or unexported fields
}

KeyedHandler is a ConsumerHandler implementation.

func (*KeyedHandler) Cleanup added in v0.1.0

func (*KeyedHandler) ConsumeClaim added in v0.1.0

func (h *KeyedHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

func (*KeyedHandler) ConsumeClaimIteration added in v0.1.0

func (h *KeyedHandler) ConsumeClaimIteration(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim, khs *keyedHandlerState) (bool, error)

func (*KeyedHandler) Setup added in v0.1.0

func (*KeyedHandler) Start added in v0.1.0

func (kh *KeyedHandler) Start() error

func (*KeyedHandler) Stop added in v0.1.0

func (kh *KeyedHandler) Stop()

type Producer

// Producer holds the configuration and runtime state for producing
// messages to Kafka through a sarama synchronous producer.
type Producer struct {
	// required

	// Brokers lists the broker addresses (see WithProducerBroker).
	Brokers             []string
	// SaramaConfiguration is the sarama client configuration
	// (see WithProducerSaramaConfigModifier).
	SaramaConfiguration *sarama.Config

	// set in start

	// Producer is the sarama synchronous producer; per the note above it is
	// populated by Start rather than by the caller.
	Producer sarama.SyncProducer
}

func (*Producer) Produce

func (p *Producer) Produce(c context.Context, sources []flow.Message[structure.Bytes, structure.Bytes]) error

func (*Producer) Start

func (p *Producer) Start() error

func (*Producer) Stop

func (p *Producer) Stop()

type SaramaConfigModifier added in v0.0.15

// SaramaConfigModifier is a function that receives a *sarama.Config.
// It is accepted by WithConsumerSaramaConfigModifier and
// WithProducerSaramaConfigModifier; presumably it adjusts the configuration
// in place before the client is created — confirm against the implementation.
type SaramaConfigModifier func(*sarama.Config)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL