Documentation ¶
Index ¶
Constants ¶
const ( // OffsetOldest uses sequence number of oldest known message as the current offset OffsetOldest = sarama.OffsetOldest // OffsetNewest option uses sequence number of newest message as the current offset OffsetNewest = sarama.OffsetNewest )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Consumer ¶
type Consumer interface { // Name returns the name of this consumer group. Name() string // Topics returns the names of the topics being consumed. Topics() ConsumerTopicList // Start starts the consumer Start() error // Stop stops the consumer Stop() // Closed returns a channel which will be closed after this consumer is completely shutdown Closed() <-chan struct{} // Messages return the message channel for this consumer Messages() <-chan Message // MergeDLQ consumes the offset ranges for the partitions from the DLQ topic for the specified ConsumerTopic // Topic should be the __dlq topic name. MergeDLQ(cluster, group, topic string, partition int32, offsetRange OffsetRange) error }
Consumer is the interface for a kafka consumer
type ConsumerConfig ¶
type ConsumerConfig struct { // GroupName identifies your consumer group. Unless your application creates // multiple consumer groups (in which case it's suggested to have application name as // prefix of the group name), this should match your application name. GroupName string // TopicList is a list of consumer topics TopicList ConsumerTopicList // OffsetConfig is the offset-handling policy for this consumer group. Offsets struct { // Initial specifies the fallback offset configuration on consumer start. // The consumer will use the offsets persisted from its last run unless \ // the offsets are too old or too new. Initial struct { // Offset is the initial offset to use if there is no previous offset // committed. Use OffsetNewest for high watermark and OffsetOldest for // low watermark. Defaults to OffsetOldest. Offset int64 } // Commits a policy for committing consumer offsets to Kafka. Commits struct { // Enabled if you want the library to commit offsets on your behalf. // Defaults to true. // // The retry and dlq topic commit will always be committed for you since those topics are abstracted away from you. Enabled bool } } // Concurrency determines the number of concurrent messages to process. // When using the handler based API, this corresponds to the number of concurrent go // routines handler functions the library will run. Default is 1. Concurrency int // TLSConfig is the configuration to use for secure connections if // enabled (not nil) (defaults to disabled, nil). TLSConfig *tls.Config }
ConsumerConfig describes the config for a consumer group
func NewConsumerConfig ¶
func NewConsumerConfig(groupName string, topicList ConsumerTopicList) *ConsumerConfig
NewConsumerConfig returns ConsumerConfig with sane defaults.
func (ConsumerConfig) MarshalLogObject ¶
func (c ConsumerConfig) MarshalLogObject(e zapcore.ObjectEncoder) error
MarshalLogObject implements zapcore.ObjectMarshaler for structured logging.
type ConsumerTopic ¶
type ConsumerTopic struct { Topic RetryQ Topic DLQ Topic MaxRetries int64 // MaxRetries = -1 for infinite retries. }
ConsumerTopic contains information for a consumer topic.
func (ConsumerTopic) DLQEnabled ¶
func (c ConsumerTopic) DLQEnabled() bool
DLQEnabled returns true if DLQ.Name and DLQ.Cluster are not empty.
func (ConsumerTopic) MarshalLogObject ¶
func (c ConsumerTopic) MarshalLogObject(e zapcore.ObjectEncoder) error
MarshalLogObject implements zapcore.ObjectMarshaler for structured logging.
type ConsumerTopicList ¶
type ConsumerTopicList []ConsumerTopic
ConsumerTopicList is a list of consumer topics
func (ConsumerTopicList) GetConsumerTopicByClusterTopic ¶
func (c ConsumerTopicList) GetConsumerTopicByClusterTopic(clusterName, topicName string) (ConsumerTopic, error)
GetConsumerTopicByClusterTopic returns the ConsumerTopic for the cluster, topic pair.
func (ConsumerTopicList) MarshalLogArray ¶
func (c ConsumerTopicList) MarshalLogArray(e zapcore.ArrayEncoder) error
MarshalLogArray implements zapcore.ArrayMarshaler for structured logging.
func (ConsumerTopicList) TopicNames ¶
func (c ConsumerTopicList) TopicNames() []string
TopicNames returns the list of topics to consume as a string array.
type Message ¶
type Message interface { zapcore.ObjectMarshaler // Key is a mutable reference to the message's key. Key() []byte // Value is a mutable reference to the message's value. Value() []byte // Topic is the topic from which the message was read. Topic() string // Partition is the ID of the partition from which the message was read. Partition() int32 // Offset is the message's offset. Offset() int64 // Timestamp returns the timestamp for this message Timestamp() time.Time // RetryCount is an incrementing integer denoting the number of times this message has been redelivered. // The first delivery of the message will be 0, incrementing on each subsequent redelivery. RetryCount() int64 // Ack marks the message as successfully processed. Ack() error // Nack marks the message processing as failed and the message will be retried or sent to DLQ. Nack() error // NackToDLQ marks the message processing as failed and sends it immediately to DLQ. NackToDLQ() error }
Message is the interface for a Kafka message
type NameResolver ¶
type NameResolver interface { // ResolveCluster returns a list of IP addresses for the brokers ResolveIPForCluster(cluster string) ([]string, error) // ResolveClusterForTopic returns the logical cluster names corresponding to a topic name // // It is possible for a topic to exist on multiple clusters in order to // transparently handle topic migration between clusters. // TODO (gteo): Remove to simplify API because not needed anymore ResolveClusterForTopic(topic string) ([]string, error) }
NameResolver is an interface that will be used by the consumer library to resolve (1) topic to cluster name and (2) cluster name to broker IP addresses. Implementations of KafkaNameResolver should be threadsafe.
func NewStaticNameResolver ¶
func NewStaticNameResolver( topicsToCluster map[string][]string, clusterToBrokers map[string][]string, ) NameResolver
NewStaticNameResolver returns a instance of NameResolver that relies on a static map of topic to list of brokers and map of topics to cluster
type OffsetRange ¶
type OffsetRange struct { // LowOffset is the low watermark for this offset range. // -1 indicates the value is not set. LowOffset int64 // HighOffset is the high watermark for this offset range. // -1 indicates the value is not set. HighOffset int64 }
OffsetRange is a range of offsets
func NewOffsetRange ¶
func NewOffsetRange(low int64, high ...int64) OffsetRange
NewOffsetRange returns a new OffsetRange with the LowOffset of the range as specified. First variadic argument is used to set the HighOffset and all other variadic arguments are ignored. If no variadic arguments are provided, HighOffset is set to -1 to indicate that it is not set.
func (OffsetRange) MarshalLogObject ¶
func (o OffsetRange) MarshalLogObject(e zapcore.ObjectEncoder) error
MarshalLogObject implements zapcore.ObjectMarshaler for structured logging.
type Topic ¶
type Topic struct { // Name for the topic Name string // Cluster is the logical name of the cluster to find this topic on. Cluster string // Delay is msg consumption delay applied on the topic. Delay time.Duration }
Topic contains information for a topic. Our topics are uniquely defined by a Topic Name and Cluster pair.
func (Topic) MarshalLogObject ¶
func (t Topic) MarshalLogObject(e zapcore.ObjectEncoder) error
MarshalLogObject implements zapcore.ObjectMarshaler for structured logging.