Documentation ¶
Functions ¶
func DefaultConsumerOptions ¶
DefaultConsumerOptions returns a new Sarama configuration with predefined default settings.
func DefaultProducerOptions ¶
DefaultProducerOptions creates a new Sarama producer configuration with default values.
Types ¶
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer is a Kafka consumer that keeps trying to reconnect to Kafka at a specified interval. The underlying consumer group is created lazily, when message listening is started.
func NewConsumer ¶
func NewConsumer(config ConsumerConfig, topics []*Topic, log *logger.UPPLogger, connectionRetryInterval time.Duration) *Consumer
func (*Consumer) Close ¶
Close closes the consumer connection to Kafka if the consumer is connected.
func (*Consumer) ConnectivityCheck ¶
ConnectivityCheck checks whether a connection to Kafka can be established.
func (*Consumer) MonitorCheck ¶
MonitorCheck checks whether the consumer group is lagging behind when reading messages.
func (*Consumer) StartListening ¶
StartListening is a blocking call that tries to establish a connection to Kafka and then starts listening for messages.
type ConsumerConfig ¶
type ConsumerConfig struct {
	BrokersConnectionString string
	ConsumerGroup           string
	// Time interval between each offset fetching request. Default is 3 minutes.
	OffsetFetchInterval time.Duration
	// Total count of offset fetching request failures until consumer status is marked as unknown.
	// Default is 5. Note: A single failure will result in follow-up requests to be sent on
	// shorter interval than the value of OffsetFetchInterval until successful.
	OffsetFetchMaxFailureCount int
	Options                    *sarama.Config
}
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer is a Kafka producer that keeps trying to reconnect to Kafka at a specified interval. The underlying producer is created in a separate goroutine when the Producer is initialized.
func NewProducer ¶
func NewProducer(config ProducerConfig, logger *logger.UPPLogger, initialDelay, retryInterval time.Duration) *Producer
func (*Producer) ConnectivityCheck ¶
ConnectivityCheck checks whether a connection to Kafka can be established.
func (*Producer) SendMessage ¶
SendMessage checks if the producer is connected, then sends a message to Kafka.
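The check-then-send behaviour described for SendMessage can be sketched with a small guard around a connection flag. Everything here is hypothetical (`sketchProducer`, `setConnected`, `send`, and `errNotConnected` are not part of the package); the sketch only shows why a producer that connects in the background needs to reject messages until the connection exists.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errNotConnected is a hypothetical sentinel error for this sketch.
var errNotConnected = errors.New("producer is not connected to Kafka")

// sketchProducer is a local stand-in, not the package's Producer type.
type sketchProducer struct {
	mu        sync.Mutex
	connected bool
	sent      []string // stands in for messages handed to the broker
}

// setConnected would be called by the background connection goroutine.
func (p *sketchProducer) setConnected(connected bool) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.connected = connected
}

// send refuses to publish until a connection exists.
func (p *sketchProducer) send(msg string) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if !p.connected {
		return errNotConnected
	}
	p.sent = append(p.sent, msg)
	return nil
}

func main() {
	p := &sketchProducer{}
	fmt.Println("before connect:", p.send("first"))

	p.setConnected(true)
	err := p.send("second")
	fmt.Println("after connect:", err)
}
```

The mutex matters because the flag is written from the connection goroutine and read from whichever goroutine sends messages.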
type ProducerConfig ¶
type Topic ¶
type Topic struct {
	Name string
	// contains filtered or unexported fields
}
func NewTopic ¶
func NewTopic(name string, opts ...TopicOption) *Topic
type TopicOption ¶
type TopicOption func(topic *Topic)
func WithLagTolerance ¶
func WithLagTolerance(tolerance int64) TopicOption
WithLagTolerance sets a custom lag tolerance threshold used for monitoring. A consumer that lags behind by more messages than the configured tolerance is reported as unhealthy. The default is 500 messages.
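NewTopic, TopicOption, and WithLagTolerance follow Go's functional-options pattern. The sketch below re-declares local stand-ins for these types to show how such a pattern could work; the unexported `lagTolerance` field and the `isLagging` helper are hypothetical, since the package's real fields are unexported and not shown in this documentation. Only the default of 500 messages comes from the text above.

```go
package main

import "fmt"

// defaultLagTolerance mirrors the documented default of 500 messages.
const defaultLagTolerance int64 = 500

// Topic is a local stand-in for the package's Topic type.
// The lagTolerance field name is an assumption of this sketch.
type Topic struct {
	Name         string
	lagTolerance int64
}

// TopicOption mutates a Topic during construction.
type TopicOption func(topic *Topic)

// WithLagTolerance overrides the default lag tolerance.
func WithLagTolerance(tolerance int64) TopicOption {
	return func(topic *Topic) {
		topic.lagTolerance = tolerance
	}
}

// NewTopic sets the default first, then applies each option in order.
func NewTopic(name string, opts ...TopicOption) *Topic {
	topic := &Topic{Name: name, lagTolerance: defaultLagTolerance}
	for _, opt := range opts {
		opt(topic)
	}
	return topic
}

// isLagging is a hypothetical helper: a lag strictly greater than the
// tolerance counts as unhealthy.
func isLagging(topic *Topic, lag int64) bool {
	return lag > topic.lagTolerance
}

func main() {
	relaxed := NewTopic("articles")                    // default tolerance
	strict := NewTopic("alerts", WithLagTolerance(50)) // custom tolerance

	fmt.Println("articles lagging:", isLagging(relaxed, 120))
	fmt.Println("alerts lagging:", isLagging(strict, 120))
}
```

With a lag of 120 messages, the topic using the default tolerance is still healthy, while the topic with a tolerance of 50 is reported as lagging.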