Documentation ¶
Index ¶
Constants ¶
const LagTechnicalSummary string = "Messages awaiting handling exceed the configured lag tolerance. Check if Kafka consumer is stuck."
LagTechnicalSummary is used as technical summary in consumer monitoring healthchecks.
Variables ¶
var ( ErrConsumerNotConnected = fmt.Errorf("consumer is not connected to Kafka") ErrMonitorNotConnected = fmt.Errorf("consumer monitor is not connected to Kafka") )
var ErrProducerNotConnected = fmt.Errorf("producer is not connected to Kafka")
Functions ¶
func DefaultConsumerOptions ¶
DefaultConsumerOptions returns a new sarama configuration with predefined default settings.
func DefaultProducerOptions ¶
DefaultProducerOptions creates a new Sarama producer configuration with default values.
Types ¶
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer which will keep trying to reconnect to Kafka on a specified interval. The underlying consumer group is created lazily when message listening is started.
func NewConsumer ¶
func NewConsumer(config ConsumerConfig, topics []*Topic, log *logger.UPPLogger) *Consumer
func (*Consumer) Close ¶
Close closes the consumer connection to Kafka if the consumer is connected.
func (*Consumer) ConnectivityCheck ¶
ConnectivityCheck checks whether a connection to Kafka can be established.
func (*Consumer) MonitorCheck ¶
MonitorCheck checks whether the consumer group is lagging behind when reading messages.
func (*Consumer) Start ¶
Start will attempt to establish consumer group and consumer monitor connections until successful. Once those are established, message consumption and consumer monitoring processes are started.
Each message will be handled using the provided handler.
The consumer monitoring process is using a separate Kafka connection and will:
- Request the offsets for a topic and the respective claimed partitions on a given time interval from the Kafka broker;
- Deduce the message lag by subtracting the last committed consumer group offset from the next topic offset;
- Store the partition lag if such is present;
- Report a status error on MonitorCheck() calls.
Close() calls will terminate both the message consumption and the consumer monitoring processes.
type ConsumerConfig ¶
type ConsumerConfig struct { BrokersConnectionString string ConsumerGroup string // Time interval between each connection attempt. // Only used for subsequent attempts if the initial one fails. // Default value (1 minute) would be used if not set or exceeds 5 minutes. ConnectionRetryInterval time.Duration // Time interval between each offset fetching request. // Default value (3 minutes) would be used if not set or exceeds 10 minutes. OffsetFetchInterval time.Duration // Total count of offset fetching request failures until consumer status is marked as unknown. // Default value (5) would be used if not set or exceeds 10. // Note: A single failure will result in follow-up requests to be sent on // shorter interval than the value of OffsetFetchInterval until successful. OffsetFetchMaxFailureCount int // Whether to disable the automatic reset of the sarama.ClusterAdmin // monitoring connection upon exceeding the OffsetFetchMaxFailureCount threshold. // Default value is false. // Note: Resetting the connection is necessary for the current version of Sarama (1.33.0) // due to numerous issues originating from the library. This flag is currently only used in tests. DisableMonitoringConnectionReset bool Options *sarama.Config }
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer which will keep trying to reconnect to Kafka on a specified interval. The underlying producer is created in a separate go-routine when the Producer is initialized.
func NewProducer ¶
func NewProducer(config ProducerConfig, logger *logger.UPPLogger) *Producer
func (*Producer) ConnectivityCheck ¶
ConnectivityCheck checks whether a connection to Kafka can be established.
func (*Producer) SendMessage ¶
SendMessage checks if the producer is connected, then sends a message to Kafka.
type ProducerConfig ¶
type ProducerConfig struct { BrokersConnectionString string Topic string // Time interval between each connection attempt. // Only used for subsequent attempts if the initial one fails. // Default value (1 minute) would be used if not set or exceeds 5 minutes. ConnectionRetryInterval time.Duration Options *sarama.Config }
type Topic ¶
type Topic struct { Name string // contains filtered or unexported fields }
func NewTopic ¶
func NewTopic(name string, opts ...TopicOption) *Topic
type TopicOption ¶
type TopicOption func(topic *Topic)
func WithLagTolerance ¶
func WithLagTolerance(tolerance int64) TopicOption
WithLagTolerance sets custom lag tolerance threshold used for monitoring. Consumer lagging behind with more messages than the configured tolerance will be reported as unhealthy. Default is 500 messages.
func WithReplica ¶
func WithReplica() TopicOption
WithReplica indicates that the topic is being replicated from another Kafka cluster in a new topic called "Topic.Name + Replica". Default is false.