kafka

package
v2.0.2
Warning: This package is not in the latest version of its module.
Published: Dec 24, 2023 License: MIT Imports: 8 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (
	FirstOffset = kafgo.FirstOffset
	LastOffset  = kafgo.LastOffset
)
var (
	ErrCreatingConsumer = errors.New("error creating consumer")
	ErrCreatingProducer = errors.New("error creating producer")
)

Kafka errors returned when a consumer or producer cannot be created

Functions

This section is empty.

Types

type AfterFunc

type AfterFunc func(context.Context, kafgo.Message, interface{}) context.Context

AfterFunc is invoked after the endpoint is executed

type BeforeFunc

type BeforeFunc func(context.Context, kafgo.Message) context.Context

BeforeFunc is executed prior to invoking the endpoint. A BeforeFunc may take information from the message received by the Consumer and put it in the context. For instance, if the context needs information about the topic or the group ID, it is populated here.

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer is a Kafka consumer

func NewConsumer

func NewConsumer(
	brokers []string,
	logger log.Logger,
	options ...ConsumerOption,
) (*Consumer, error)

NewConsumer returns a Kafka consumer for the given brokers

func (*Consumer) Open

func (c *Consumer) Open() error

Open handles the messages received by the consumer

type ConsumerOption

type ConsumerOption func(*Consumer)

ConsumerOption provides a set of options to modify a consumer

func WithAfterFuncsConsumerOption

func WithAfterFuncsConsumerOption(fns ...AfterFunc) ConsumerOption

WithAfterFuncsConsumerOption provides a way to set the AfterFunc(s) on the consumer

func WithAutoCommitConsumerOption

func WithAutoCommitConsumerOption(flag bool) ConsumerOption

WithAutoCommitConsumerOption sets the auto-commit property of the consumer

func WithAutoCommitTimeConsumerOption

func WithAutoCommitTimeConsumerOption(dur time.Duration) ConsumerOption

WithAutoCommitTimeConsumerOption sets the auto-commit time for the consumer

func WithBeforeFuncsConsumerOption

func WithBeforeFuncsConsumerOption(fns ...BeforeFunc) ConsumerOption

WithBeforeFuncsConsumerOption provides a way to set the BeforeFunc(s) on the consumer

func WithDecoderConsumerOption

func WithDecoderConsumerOption(fn Decoder) ConsumerOption

WithDecoderConsumerOption sets the decoder for the Consumer Message

func WithEndpointConsumerOption

func WithEndpointConsumerOption(end endpoint.Endpoint) ConsumerOption

WithEndpointConsumerOption provides a way to set the endpoint on the consumer

func WithErrorFuncConsumerOption

func WithErrorFuncConsumerOption(fn ErrorFunc) ConsumerOption

WithErrorFuncConsumerOption provides a callback to handle errors

func WithGroupIDConsumerOption

func WithGroupIDConsumerOption(groupID string) ConsumerOption

WithGroupIDConsumerOption provides an option to modify the GroupID for a consumer Group

func WithMaxMinByteConsumerOption

func WithMaxMinByteConsumerOption(min, max int) ConsumerOption

WithMaxMinByteConsumerOption provides an option to set the minimum and maximum byte sizes used by the consumer. Despite the name, the arguments are passed as min first, then max.

func WithOffsetConsumerOption

func WithOffsetConsumerOption(offset int64) ConsumerOption

WithOffsetConsumerOption lets you set the Kafka offset to read from, for example FirstOffset or LastOffset

func WithReaderConsumerOption

func WithReaderConsumerOption(reader *kafgo.Reader) ConsumerOption

WithReaderConsumerOption lets you set the reader for kafka

func WithTopicConsumerOption

func WithTopicConsumerOption(topic string) ConsumerOption

WithTopicConsumerOption provides an option to set the topic to which the Consumer will listen

type Decoder

type Decoder func(context.Context, kafgo.Message) (interface{}, error)

Decoder decodes the message received from Kafka and converts it into a business-logic value

type Encoder

type Encoder func(context.Context, interface{}) (kafgo.Message, error)

Encoder encodes the value passed to it and converts it to a Kafka message

type ErrorFunc

type ErrorFunc func(context.Context, kafgo.Message, error)

ErrorFunc handles the error condition

type ErrorHandler

type ErrorHandler interface{ transport.ErrorHandler }

ErrorHandler is a wrapper around kit.transport.ErrorHandler

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer produces messages on Kafka

func NewProducer

func NewProducer(
	brokers []string,
	logger log.Logger,
	options ...ProducerOption,
) (*Producer, error)

NewProducer returns a new Kafka producer for the given brokers

func (*Producer) Endpoint

func (p *Producer) Endpoint() endpoint.Endpoint

Endpoint returns a usable endpoint

type ProducerOption

type ProducerOption func(*Producer)

ProducerOption lets you modify properties for producer

func WithAfterProducerOption

func WithAfterProducerOption(fns ...AfterFunc) ProducerOption

WithAfterProducerOption sets the after functions, which are executed after the message is published to Kafka

func WithBalancerProducerOption

func WithBalancerProducerOption(bal kafgo.Balancer) ProducerOption

WithBalancerProducerOption sets the balancer for Kafka Producer

func WithBeforesProducerOption

func WithBeforesProducerOption(fns ...BeforeFunc) ProducerOption

WithBeforesProducerOption sets the before functions for the producer. Befores are triggered before the message is emitted to Kafka.

func WithEncoderProducerOption

func WithEncoderProducerOption(fn Encoder) ProducerOption

WithEncoderProducerOption sets the encoder, which encodes the request passed to the endpoint into the desired format

func WithMaxAttemptsProducerOption

func WithMaxAttemptsProducerOption(attempts int) ProducerOption

WithMaxAttemptsProducerOption sets the number of attempts the Kafka producer will make before giving up

func WithMaxBatchBytesOption

func WithMaxBatchBytesOption(batchBytes int64) ProducerOption

WithMaxBatchBytesOption sets the maximum size, in bytes, that the Kafka producer will try to produce

func WithQueueCapacityProducerOption

func WithQueueCapacityProducerOption(qc int) ProducerOption

WithQueueCapacityProducerOption sets the capacity of the internal buffer used to cache incoming messages before they are published to Kafka

func WithTopicProducerOption

func WithTopicProducerOption(topic string) ProducerOption

WithTopicProducerOption sets the topic for the producer
