Documentation ¶
Overview ¶
Package consumer is a generated protocol buffer package.
It is generated from these files:
dlqMetadata.proto
It has these top-level messages:
DLQMetadata
Index ¶
- func NewSaramaClient(brokers []string, config *sarama.Config) (sarama.Client, error)
- func NewSaramaProducer(client sarama.Client) (sarama.AsyncProducer, error)
- type ClusterConsumer
- type ClusterGroup
- type Constructors
- type DLQ
- type DLQMetadata
- func (*DLQMetadata) Descriptor() ([]byte, []int)
- func (m *DLQMetadata) GetData() []byte
- func (m *DLQMetadata) GetOffset() int64
- func (m *DLQMetadata) GetPartition() int32
- func (m *DLQMetadata) GetRetryCount() int64
- func (m *DLQMetadata) GetTimestampNs() int64
- func (m *DLQMetadata) GetTopic() string
- func (*DLQMetadata) ProtoMessage()
- func (m *DLQMetadata) Reset()
- func (m *DLQMetadata) String() string
- type DLQMetadataDecoder
- type ErrorQType
- type Message
- func (m *Message) Ack() error
- func (m *Message) Key() (key []byte)
- func (m *Message) MarshalLogObject(e zapcore.ObjectEncoder) error
- func (m *Message) Nack() error
- func (m *Message) NackToDLQ() error
- func (m *Message) Offset() int64
- func (m *Message) Partition() int32
- func (m *Message) RetryCount() int64
- func (m *Message) Timestamp() time.Time
- func (m *Message) Topic() string
- func (m *Message) Value() []byte
- type MultiClusterConsumer
- func (c *MultiClusterConsumer) Closed() <-chan struct{}
- func (c *MultiClusterConsumer) MergeDLQ(cluster, group, topic string, partition int32, offsetRange kafka.OffsetRange) error
- func (c *MultiClusterConsumer) Messages() <-chan kafka.Message
- func (c *MultiClusterConsumer) Name() string
- func (c *MultiClusterConsumer) ResetOffset(cluster, group, topic string, partition int32, offsetRange kafka.OffsetRange) error
- func (c *MultiClusterConsumer) Start() error
- func (c *MultiClusterConsumer) Stop()
- func (c *MultiClusterConsumer) Topics() kafka.ConsumerTopicList
- type Options
- type PartitionConsumer
- func NewPartitionConsumer(topic Topic, sarama SaramaConsumer, pConsumer cluster.PartitionConsumer, ...) PartitionConsumer
- func NewPartitionConsumerWithoutCommit(topic Topic, sarama SaramaConsumer, pConsumer cluster.PartitionConsumer, ...) PartitionConsumer
- func NewRangePartitionConsumer(topic Topic, sarama SaramaConsumer, pConsumer cluster.PartitionConsumer, ...) PartitionConsumer
- type PartitionConsumerFactory
- type SaramaConsumer
- type Topic
- type TopicConsumer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewSaramaClient ¶
NewSaramaClient returns an internal sarama Client, which can be safely closed multiple times.
func NewSaramaProducer ¶
func NewSaramaProducer(client sarama.Client) (sarama.AsyncProducer, error)
NewSaramaProducer returns a new AsyncProducer that has Close method that can be called multiple times.
Types ¶
type ClusterConsumer ¶
type ClusterConsumer struct {
// contains filtered or unexported fields
}
ClusterConsumer is a consumer for a single kafka cluster.
func NewClusterConsumer ¶
func NewClusterConsumer( cluster string, saramaConsumer SaramaConsumer, consumerMap map[string]*TopicConsumer, scope tally.Scope, logger *zap.Logger, ) *ClusterConsumer
NewClusterConsumer returns a new single cluster consumer.
func (*ClusterConsumer) Closed ¶
func (c *ClusterConsumer) Closed() <-chan struct{}
Closed returns a channel which will closed after this consumer is shutdown
func (*ClusterConsumer) ResetOffset ¶
func (c *ClusterConsumer) ResetOffset(topic string, partition int32, offsetRange kafka.OffsetRange) error
ResetOffset will reset the consumer offset for the specified topic, partition.
type ClusterGroup ¶ added in v0.2.2
type Constructors ¶
type Constructors struct { NewSaramaProducer func(sarama.Client) (sarama.AsyncProducer, error) NewSaramaConsumer func([]string, string, []string, *cluster.Config) (SaramaConsumer, error) NewSaramaClient func([]string, *sarama.Config) (sarama.Client, error) }
Constructors wraps multiple Sarama Constructors, which can be used for tests.
type DLQ ¶
type DLQ interface { // Start the DLQ producer Start() error // Stop the DLQ producer and close resources it holds. Stop() // Add adds the given message to DLQ. // This is a synchronous call and will block until sending is successful. Add(m kafka.Message, qTypes ...ErrorQType) error }
DLQ is the interface for implementations that can take a message and put them into some sort of error queue for later processing
func NewBufferedDLQ ¶
func NewBufferedDLQ(topic kafka.Topic, producer sarama.AsyncProducer, scope tally.Scope, logger *zap.Logger) DLQ
NewBufferedDLQ returns a DLQ that is backed by a buffered async sarama producer.
func NewRetryDLQMultiplexer ¶
NewRetryDLQMultiplexer returns a DLQ that will produce messages to retryTopic or dlqTopic depending on the threshold.
Messages that are added to this DLQ will be sent to retryTopic if the retry count of the message is < the threshold. Else, it will go to the dlqTopic.
type DLQMetadata ¶
type DLQMetadata struct { // retry_count is an incrementing value denoting the number // of times a message has been redelivered. // It will be 0 on first delivery. RetryCount int64 `protobuf:"varint,1,opt,name=retry_count,json=retryCount" json:"retry_count,omitempty"` // topic is the original kafka topic the mesasge was received on. // This is analogous to the logical topic name. Topic string `protobuf:"bytes,2,opt,name=topic" json:"topic,omitempty"` // partition is the original kafka partition the message was received on. Partition int32 `protobuf:"varint,3,opt,name=partition" json:"partition,omitempty"` // offset is the record offset of the original message in the original topic-partition. Offset int64 `protobuf:"varint,4,opt,name=offset" json:"offset,omitempty"` // timestamp_ns is the original record timestamp of the original mesage. TimestampNs int64 `protobuf:"varint,5,opt,name=timestamp_ns,json=timestampNs" json:"timestamp_ns,omitempty"` // data is a byte buffer for storing arbitrary information. // This is useful if the kafka Broker version used is < 0.11 // and hence kafka native record headers (KAFKA-4208) are unavaiable // so the DLQ metadata must be stored in the record Key or Value. Data []byte `protobuf:"bytes,6,opt,name=data,proto3" json:"data,omitempty"` }
DLQMetadata contains metadata from the original kafka message. The metadata will be encoded and decoded when sending or receiving messages from the DLQ cluster in order to present the library user a seamless logical topic.
func NoopDLQMetadataDecoder ¶ added in v0.1.4
func NoopDLQMetadataDecoder(b []byte) (*DLQMetadata, error)
NoopDLQMetadataDecoder does no decoding and returns a default DLQMetadata object.
func ProtobufDLQMetadataDecoder ¶ added in v0.1.4
func ProtobufDLQMetadataDecoder(b []byte) (*DLQMetadata, error)
ProtobufDLQMetadataDecoder uses proto.Unmarshal to decode protobuf encoded binary into the DLQMetadata object.
func (*DLQMetadata) Descriptor ¶
func (*DLQMetadata) Descriptor() ([]byte, []int)
func (*DLQMetadata) GetData ¶
func (m *DLQMetadata) GetData() []byte
func (*DLQMetadata) GetOffset ¶
func (m *DLQMetadata) GetOffset() int64
func (*DLQMetadata) GetPartition ¶
func (m *DLQMetadata) GetPartition() int32
func (*DLQMetadata) GetRetryCount ¶
func (m *DLQMetadata) GetRetryCount() int64
func (*DLQMetadata) GetTimestampNs ¶
func (m *DLQMetadata) GetTimestampNs() int64
func (*DLQMetadata) GetTopic ¶
func (m *DLQMetadata) GetTopic() string
func (*DLQMetadata) ProtoMessage ¶
func (*DLQMetadata) ProtoMessage()
func (*DLQMetadata) Reset ¶
func (m *DLQMetadata) Reset()
func (*DLQMetadata) String ¶
func (m *DLQMetadata) String() string
type DLQMetadataDecoder ¶ added in v0.1.4
type DLQMetadataDecoder func([]byte) (*DLQMetadata, error)
DLQMetadataDecoder decodes a byte array into DLQMetadata.
type ErrorQType ¶ added in v0.2.1
type ErrorQType string
ErrorQType is the queue type to send messages to when using the DLQ interface.
var ( // RetryQErrorQType is the error queue for the retryQ. RetryQErrorQType ErrorQType = "retryQ" // DLQErrorQType is the error queue for DLQ. DLQErrorQType ErrorQType = "DLQ" // DLQConsumerGroupNameSuffix is the consumer group name used by the DLQ merge process. DLQConsumerGroupNameSuffix = "-dlq-merger" )
type Message ¶
type Message struct {
// contains filtered or unexported fields
}
Message is a wrapper around kafka consumer message
func (*Message) MarshalLogObject ¶
func (m *Message) MarshalLogObject(e zapcore.ObjectEncoder) error
MarshalLogObject implements zapcore.ObjectMarshaler for structured logging.
func (*Message) Nack ¶
Nack negatively acknowledges the message also moves the message to a DLQ if the consumer has a dlq configured. This method will *block* until enqueue to the dlq succeeds
func (*Message) NackToDLQ ¶ added in v0.2.1
NackToDLQ negatively acknowledges the message by sending it directly to the DLQ. This method will *block* until enqueue to the dlq succeeds
func (*Message) RetryCount ¶
RetryCount returns the number of times this message has be retried.
type MultiClusterConsumer ¶
type MultiClusterConsumer struct {
// contains filtered or unexported fields
}
MultiClusterConsumer is a map that contains multiple kafka consumers
func NewMultiClusterConsumer ¶
func NewMultiClusterConsumer( groupName string, topics kafka.ConsumerTopicList, clusterConsumerMap map[ClusterGroup]*ClusterConsumer, saramaClients map[ClusterGroup]sarama.Client, msgC chan kafka.Message, scope tally.Scope, logger *zap.Logger, ) *MultiClusterConsumer
NewMultiClusterConsumer returns a new consumer that consumes messages from multiple kafka clusters.
func (*MultiClusterConsumer) Closed ¶
func (c *MultiClusterConsumer) Closed() <-chan struct{}
Closed returns a channel that will be closed when the consumer is closed.
func (*MultiClusterConsumer) MergeDLQ ¶
func (c *MultiClusterConsumer) MergeDLQ(cluster, group, topic string, partition int32, offsetRange kafka.OffsetRange) error
MergeDLQ will merge the offset range for each partition of the DLQ topic for the specified ConsumerTopic. Topic should be the DLQ topic (with __dlq).
func (*MultiClusterConsumer) Messages ¶
func (c *MultiClusterConsumer) Messages() <-chan kafka.Message
Messages returns a channel to receive messages on.
func (*MultiClusterConsumer) Name ¶
func (c *MultiClusterConsumer) Name() string
Name returns the consumer group name used by this consumer.
func (*MultiClusterConsumer) ResetOffset ¶
func (c *MultiClusterConsumer) ResetOffset(cluster, group, topic string, partition int32, offsetRange kafka.OffsetRange) error
ResetOffset will reset the consumer offset for the specified cluster, topic, partition.
func (*MultiClusterConsumer) Start ¶
func (c *MultiClusterConsumer) Start() error
Start will fail to start if there is any clusterConsumer that fails.
func (*MultiClusterConsumer) Stop ¶
func (c *MultiClusterConsumer) Stop()
Stop will stop the consumer.
func (*MultiClusterConsumer) Topics ¶
func (c *MultiClusterConsumer) Topics() kafka.ConsumerTopicList
Topics returns a list of topics this consumer is consuming from.
type Options ¶
type Options struct { ClientID string // client ID RcvBufferSize int // aggregate message buffer size PartitionRcvBufferSize int // message buffer size for each partition Concurrency int // number of goroutines that will concurrently process messages OffsetPolicy int64 OffsetCommitInterval time.Duration RebalanceDwellTime time.Duration MaxProcessingTime time.Duration // amount of time a partitioned consumer will wait during a drain ConsumerMode cluster.ConsumerMode ProducerMaxMessageByes int FetchDefaultBytes int32 OtherConsumerTopics []Topic TLSConfig *tls.Config // TLSConfig is the configuration to use for secure connections, not nil -> enable, nil -> disabled }
Options are the tunable and injectable options for the consumer
type PartitionConsumer ¶
type PartitionConsumer interface { Start() error Stop() Drain(time.Duration) ResetOffset(kafka.OffsetRange) error }
PartitionConsumer is the consumer for a specific kafka partition
func NewPartitionConsumer ¶
func NewPartitionConsumer( topic Topic, sarama SaramaConsumer, pConsumer cluster.PartitionConsumer, options *Options, msgCh chan kafka.Message, dlq DLQ, scope tally.Scope, logger *zap.Logger) PartitionConsumer
NewPartitionConsumer returns a kafka consumer that can read messages from a given [ topic, partition ] tuple
func NewPartitionConsumerWithoutCommit ¶ added in v0.2.0
func NewPartitionConsumerWithoutCommit( topic Topic, sarama SaramaConsumer, pConsumer cluster.PartitionConsumer, options *Options, msgCh chan kafka.Message, dlq DLQ, scope tally.Scope, logger *zap.Logger) PartitionConsumer
NewPartitionConsumerWithoutCommit returns a kafka consumer that can read messages from a given [ topic, partition ] tuple where commits are disabled.
func NewRangePartitionConsumer ¶
func NewRangePartitionConsumer( topic Topic, sarama SaramaConsumer, pConsumer cluster.PartitionConsumer, options *Options, msgCh chan kafka.Message, dlq DLQ, scope tally.Scope, logger *zap.Logger) PartitionConsumer
NewRangePartitionConsumer returns a kafka consumer that can read messages from a given [ topic, partition ] tuple. Commits are always enabled.
type PartitionConsumerFactory ¶
type PartitionConsumerFactory func( topic Topic, sarama SaramaConsumer, pConsumer cluster.PartitionConsumer, options *Options, msgCh chan kafka.Message, dlq DLQ, scope tally.Scope, logger *zap.Logger) PartitionConsumer
PartitionConsumerFactory is a factory method for returning PartitionConsumer. NewPartitionConsumer returns an unbounded partition consumer. NewRangePartitionConsumer returns a range partition consumer.
type SaramaConsumer ¶
type SaramaConsumer interface { Close() error Errors() <-chan error Notifications() <-chan *cluster.Notification Partitions() <-chan cluster.PartitionConsumer CommitOffsets() error Messages() <-chan *sarama.ConsumerMessage HighWaterMarks() map[string]map[int32]int64 MarkOffset(msg *sarama.ConsumerMessage, metadata string) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) ResetPartitionOffset(topic string, partition int32, offset int64, metadata string) }
SaramaConsumer is an interface for external consumer library (sarama)
func NewSaramaConsumer ¶
func NewSaramaConsumer(brokers []string, groupID string, topics []string, config *cluster.Config) (SaramaConsumer, error)
NewSaramaConsumer returns a new SaramaConsumer that has a Close method that can be called multiple times.
type Topic ¶
type Topic struct { kafka.ConsumerTopic DLQMetadataDecoder PartitionConsumerFactory ConsumerGroupSuffix string }
Topic is an internal wrapper around kafka.ConsumerTopic
func (Topic) MarshalLogObject ¶
func (t Topic) MarshalLogObject(e zapcore.ObjectEncoder) error
MarshalLogObject implements zapcore.ObjectMarshaler for structured logging.
type TopicConsumer ¶
type TopicConsumer struct {
// contains filtered or unexported fields
}
TopicConsumer is a consumer for a specific topic. TopicConsumer is an abstraction that runs on the same goroutine as the cluster consumer.
func NewTopicConsumer ¶
func NewTopicConsumer( topic Topic, msgC chan kafka.Message, consumer SaramaConsumer, dlq DLQ, options *Options, scope tally.Scope, logger *zap.Logger, ) *TopicConsumer
NewTopicConsumer returns a new TopicConsumer for consuming from a single topic.
func (*TopicConsumer) ResetOffset ¶
func (c *TopicConsumer) ResetOffset(partition int32, offsetRange kafka.OffsetRange) error
ResetOffset will reset the consumer offset for the specified topic, partition.
func (*TopicConsumer) Start ¶
func (c *TopicConsumer) Start() error
Start the DLQ consumer goroutine.
func (*TopicConsumer) Stop ¶
func (c *TopicConsumer) Stop()
Stop shutdown and frees the resource held by this TopicConsumer and stops the batch DLQ producer.