Documentation ¶
Index ¶
- Variables
- type AfterFunc
- type BeforeFunc
- type Consumer
- type ConsumerOption
- func WithAfterFuncsConsumerOption(fns ...AfterFunc) ConsumerOption
- func WithAutoCommitConsumerOption(flag bool) ConsumerOption
- func WithAutoCommitTimeConsumerOption(dur time.Duration) ConsumerOption
- func WithBeforeFuncsConsumerOption(fns ...BeforeFunc) ConsumerOption
- func WithDecoderConsumerOption(fn Decoder) ConsumerOption
- func WithEndpointConsumerOption(end endpoint.Endpoint) ConsumerOption
- func WithErrorFuncConsumerOption(fn ErrorFunc) ConsumerOption
- func WithGroupIDConsumerOption(groupID string) ConsumerOption
- func WithMaxMinByteConsumerOption(min, max int) ConsumerOption
- func WithOffsetConsumerOption(offset int64) ConsumerOption
- func WithReaderConsumerOption(reader *kafgo.Reader) ConsumerOption
- func WithTopicConsumerOption(topic string) ConsumerOption
- type Decoder
- type Encoder
- type ErrorFunc
- type ErrorHandler
- type Producer
- type ProducerOption
- func WithAfterProducerOption(fns ...AfterFunc) ProducerOption
- func WithBalancerProducerOption(bal kafgo.Balancer) ProducerOption
- func WithBeforesProducerOption(fns ...BeforeFunc) ProducerOption
- func WithEncoderProducerOption(fn Encoder) ProducerOption
- func WithMaxAttemptsProducerOption(attempts int) ProducerOption
- func WithMaxBatchBytesOption(batchBytes int64) ProducerOption
- func WithQueueCapacityProducerOption(qc int) ProducerOption
- func WithTopicProducerOption(topic string) ProducerOption
Constants ¶
This section is empty.
Variables ¶
var ( FirstOffset = kafgo.FirstOffset LastOffset = kafgo.LastOffset )
var ( ErrCreatingConsumer = errors.New("error creating consumer") ErrCreatingProducer = errors.New("error creating producer") )
Kafka Errors
Functions ¶
This section is empty.
Types ¶
type BeforeFunc ¶
BeforeFunc is executed prior to invoking the endpoint. RequestFunc may take information from request recieved in the Consumer and put it in the context. For instance, if the context needs the information about the topic or the group-id, that is populated here
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer is kafka Consumer
func NewConsumer ¶
func NewConsumer( brokers []string, logger log.Logger, options ...ConsumerOption, ) (*Consumer, error)
NewConsumer returns kafka consumer for the given brokers
type ConsumerOption ¶
type ConsumerOption func(*Consumer)
ConsumerOption provies set of options to modify a subscriber
func WithAfterFuncsConsumerOption ¶
func WithAfterFuncsConsumerOption(fns ...AfterFunc) ConsumerOption
WithAfterFuncsConsumerOption provides a way to set AfterFunc(s) to the consumer
func WithAutoCommitConsumerOption ¶
func WithAutoCommitConsumerOption(flag bool) ConsumerOption
WithAutoCommitConsumerOption sets the autocommit property of consumer
func WithAutoCommitTimeConsumerOption ¶
func WithAutoCommitTimeConsumerOption(dur time.Duration) ConsumerOption
WithAutoCommitTimeConsumerOption sets the auto commit time for Consumer
func WithBeforeFuncsConsumerOption ¶
func WithBeforeFuncsConsumerOption(fns ...BeforeFunc) ConsumerOption
WithBeforeFuncsConsumerOption provides a way to set BeforeFunc(s) to the consumer
func WithDecoderConsumerOption ¶
func WithDecoderConsumerOption(fn Decoder) ConsumerOption
WithDecoderConsumerOption sets the decoder for the Consumer Message
func WithEndpointConsumerOption ¶
func WithEndpointConsumerOption(end endpoint.Endpoint) ConsumerOption
WithEndpointConsumerOption provides a way to set endpoint to the consumer
func WithErrorFuncConsumerOption ¶
func WithErrorFuncConsumerOption(fn ErrorFunc) ConsumerOption
WithErrorFuncConsumerOption provides a callback to handle error
func WithGroupIDConsumerOption ¶
func WithGroupIDConsumerOption(groupID string) ConsumerOption
WithGroupIDConsumerOption provides an option to modify the GroupID for a consumer Group
func WithMaxMinByteConsumerOption ¶
func WithMaxMinByteConsumerOption(min, max int) ConsumerOption
WithMaxMinByteConsumerOption provi-des an option to modify the min/max byte that can written to kafka
func WithOffsetConsumerOption ¶
func WithOffsetConsumerOption(offset int64) ConsumerOption
WithOffsetConsumerOption lets you set the kafka offset to read from
func WithReaderConsumerOption ¶
func WithReaderConsumerOption(reader *kafgo.Reader) ConsumerOption
WithReaderConsumerOption lets you set the reader for kafka
func WithTopicConsumerOption ¶
func WithTopicConsumerOption(topic string) ConsumerOption
WithTopicConsumerOption provides an option to modify the topic on which the Consumer will listen to
type ErrorHandler ¶
type ErrorHandler interface{ transport.ErrorHandler }
ErrorHandler is wrapper on top of kit.transport.ErrorHandler
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer produces message on Kafka
func NewProducer ¶
func NewProducer( brokers []string, logger log.Logger, options ...ProducerOption, ) (*Producer, error)
NewProducer returns a new kafka producer
type ProducerOption ¶
type ProducerOption func(*Producer)
ProducerOption lets you modify properties for producer
func WithAfterProducerOption ¶
func WithAfterProducerOption(fns ...AfterFunc) ProducerOption
WithAfterProducerOption sets the after functions which are executed after the message is published on the kafka
func WithBalancerProducerOption ¶
func WithBalancerProducerOption(bal kafgo.Balancer) ProducerOption
WithBalancerProducerOption sets the balancer for Kafka Producer
func WithBeforesProducerOption ¶
func WithBeforesProducerOption(fns ...BeforeFunc) ProducerOption
WithBeforesProducerOption sets before functions for the producer, befores are triggered before the message is emitted on kafka
func WithEncoderProducerOption ¶
func WithEncoderProducerOption(fn Encoder) ProducerOption
WithEncoderProducerOption encodes the message passed onto endpoint in desired format
func WithMaxAttemptsProducerOption ¶
func WithMaxAttemptsProducerOption(attempts int) ProducerOption
WithMaxAttemptsProducerOption sets the number of tries/attempts the kafka producer will try before giving up
func WithMaxBatchBytesOption ¶
func WithMaxBatchBytesOption(batchBytes int64) ProducerOption
WithMaxBatchBytesOption sets the maximum bytes of record size kafka producer will try to produce
func WithQueueCapacityProducerOption ¶
func WithQueueCapacityProducerOption(qc int) ProducerOption
WithQueueCapacityProducerOption sets the internal buffer capacity used to cache incoming messages before publishing on kafka
func WithTopicProducerOption ¶
func WithTopicProducerOption(topic string) ProducerOption
WithTopicProducerOption sets the topic for the producer