Documentation ¶
Index ¶
- Constants
- type Client
- func (c Client) Close()
- func (c Client) NewConsumer(config ConsumerConfig, logger *zap.Logger) (ConsumerIface, error)
- func (c Client) NewProducer(config ProducerConfig, logger *zap.Logger, returnMessages bool) (ProducerIface, error)
- type ClientConfig
- func (c ClientConfig) NewClient(ctx context.Context) (Client, error)
- func (c *ClientConfig) RegisterFlags(flags *pflag.FlagSet)
- type ClientIface
- type Consumer
- func (c Consumer) Close()
- func (c Consumer) ConsumeTopic(ctx context.Context, handler MessageHandler, topic string, ...) error
- func (c Consumer) ConsumeTopicFromBeginning(ctx context.Context, handler MessageHandler, topic string, ...) error
- func (c Consumer) ConsumeTopicFromLatest(ctx context.Context, handler MessageHandler, topic string, ...) error
- type ConsumerConfig
- func NewDefaultConsumerConfig() ConsumerConfig
- func (c *ConsumerConfig) RegisterFlags(flags *pflag.FlagSet)
- type ConsumerIface
- type ConsumerMetrics
- func RegisterConsumerMetrics(registerer prometheus.Registerer) ConsumerMetrics
- type MessageHandler
- type MessageUnmarshaler
- type MockClient
- func (m *MockClient) NewConsumer(config ConsumerConfig, logger *zap.Logger) (ConsumerIface, error)
- func (m *MockClient) NewProducer(config ProducerConfig, logger *zap.Logger, returnMessages bool) (ProducerIface, error)
- type MockKafkaConsumer
- func (m *MockKafkaConsumer) Close()
- func (m *MockKafkaConsumer) ConsumeTopic(ctx context.Context, handler MessageHandler, topic string, ...) error
- func (m *MockKafkaConsumer) ConsumeTopicFromBeginning(ctx context.Context, handler MessageHandler, topic string, ...) error
- func (m *MockKafkaConsumer) ConsumeTopicFromLatest(ctx context.Context, handler MessageHandler, topic string, ...) error
- func (m *MockKafkaConsumer) EmitReadResult(offsets PartitionOffsets)
- type MockProducer
- func NewMockProducer() *MockProducer
- func (m *MockProducer) Errors() chan *sarama.ProducerError
- func (m *MockProducer) RunProducer(messages chan *sarama.ProducerMessage, done chan bool)
- func (m *MockProducer) Successes() chan *sarama.ProducerMessage
- type PartitionOffsets
- type Producer
- func (p Producer) Errors() chan *sarama.ProducerError
- func (p Producer) RunProducer(messages chan *sarama.ProducerMessage, done chan bool)
- func (p Producer) Successes() chan *sarama.ProducerMessage
- type ProducerConfig
- func (c *ProducerConfig) RegisterFlags(flags *pflag.FlagSet)
- type ProducerIface
- type ProducerMetrics
- func RegisterProducerMetrics(registerer prometheus.Registerer) ProducerMetrics
- type SchemaRegistryConfig
- func (src *SchemaRegistryConfig) RegisterFlags(flags *pflag.FlagSet)
- func (src *SchemaRegistryConfig) UnmarshalMessage(ctx context.Context, msg *sarama.ConsumerMessage, ...) error
Constants ¶
const (
	// RequiredAcksAll waits for all in-sync replicas to commit before responding.
	RequiredAcksAll int = -1
	// RequiredAcksNone waits for no acknowledgements.
	RequiredAcksNone = 0
	// RequiredAcksLocal waits for only the local commit to succeed before responding.
	RequiredAcksLocal = 1
)
Constant values that represent the required acks setting for produced messages. These map to the sarama.RequiredAcks constants.
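For example, a producer might be configured to wait for all in-sync replicas before acknowledging. A minimal sketch, assuming this package is imported under the name kafka (ProducerConfig is documented below):

config := kafka.ProducerConfig{
	ProducerRequiredAcks: kafka.RequiredAcksAll, // wait for all in-sync replicas to commit
}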
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct {
	ClientConfig
	SaramaClient sarama.Client
	// contains filtered or unexported fields
}
Client wraps a sarama client and Kafka configuration and can be used to create producers and consumers
func (Client) Close ¶
func (c Client) Close()
Close closes the underlying Kafka client and stops the Kafka broker metrics-gathering task. If an error occurs while closing the client, the error is logged.
func (Client) NewConsumer ¶
func (c Client) NewConsumer(config ConsumerConfig, logger *zap.Logger) (ConsumerIface, error)
NewConsumer sets up a high-level Kafka consumer that can be used for reading all partitions of a given topic from a given offset. NewConsumer also sets up Prometheus metrics with the default registerer and configures Schema Registry if it is set in the configuration.
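A minimal sketch of consumer setup, assuming this package is imported as kafka, an existing Client named client, and a *zap.Logger named logger:

consumer, err := client.NewConsumer(kafka.NewDefaultConsumerConfig(), logger)
if err != nil {
	logger.Fatal("failed to create consumer", zap.Error(err))
}
defer consumer.Close()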
func (Client) NewProducer ¶
func (c Client) NewProducer(config ProducerConfig, logger *zap.Logger, returnMessages bool) (ProducerIface, error)
NewProducer creates a sarama producer from a client. If the returnMessages flag is true, messages from the producer will be produced on the Success or Errors channel depending on the outcome of the produced message. This method also registers producer metrics with the default Prometheus registerer. Note that this method has the side effect of setting the compression level and codec on the provided client's underlying configuration.
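A minimal sketch of producer creation with message results enabled, again assuming an existing Client named client and a *zap.Logger named logger:

producer, err := client.NewProducer(kafka.ProducerConfig{
	ProducerRequiredAcks: kafka.RequiredAcksLocal,
}, logger, true) // returnMessages=true: results arrive on the Successes and Errors channels
if err != nil {
	logger.Fatal("failed to create producer", zap.Error(err))
}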
type ClientConfig ¶ added in v0.10.0
type ClientConfig struct {
	Broker       string
	ClientID     string
	TLSCaCrtPath string
	TLSCrtPath   string
	TLSKeyPath   string
	Verbose      bool
	KafkaVersion string
}
ClientConfig contains connection settings and configuration for communicating with a Kafka cluster
func (ClientConfig) NewClient ¶ added in v0.10.0
func (c ClientConfig) NewClient(ctx context.Context) (Client, error)
NewClient creates a Sarama client from configuration and starts a periodic task for capturing Kafka broker metrics. The client is instantiated with configuration from the ClientConfig and the following options are turned on:
* The Consumer returns errors
* The Producer returns successes
* The Producer returns errors
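A minimal sketch of client construction; the broker address, client ID, and Kafka version are illustrative:

cfg := kafka.ClientConfig{
	Broker:       "localhost:9092",
	ClientID:     "example-service",
	KafkaVersion: "2.8.0",
}
client, err := cfg.NewClient(context.Background())
if err != nil {
	logger.Fatal("failed to create Kafka client", zap.Error(err))
}
defer client.Close()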
func (*ClientConfig) RegisterFlags ¶ added in v0.10.0
func (c *ClientConfig) RegisterFlags(flags *pflag.FlagSet)
RegisterFlags registers Kafka client flags with pflags
type ClientIface ¶ added in v0.7.0
type ClientIface interface {
	NewConsumer(config ConsumerConfig, logger *zap.Logger) (ConsumerIface, error)
	NewProducer(config ProducerConfig, logger *zap.Logger, returnMessages bool) (ProducerIface, error)
}
ClientIface is an interface for creating consumers and producers
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer is a high-level Kafka consumer that can consume all partitions of a given topic, tracks metrics, provides optional logging, and unmarshals messages before passing them to a handler.
func (Consumer) Close ¶
func (c Consumer) Close()
func (Consumer) ConsumeTopic ¶
func (c Consumer) ConsumeTopic(
	ctx context.Context,
	handler MessageHandler,
	topic string,
	offsets PartitionOffsets,
	readResult chan PartitionOffsets,
	catchupWg *sync.WaitGroup,
	exitAfterCaughtUp bool,
) error
ConsumeTopic consumes a particular Kafka topic from the given starting offsets, either until it catches up to the latest offsets or forever.
This function creates consumers for all partitions in a topic and reads each partition from the given offset up to the latest offset at the time the consumer started, then notifies the caller via catchupWg. If exitAfterCaughtUp is true, the consumer exits after it reads the message at that latest offset. When all partition consumers are closed, the last offset read on each partition is sent through the readResult channel.
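A sketch of a catch-up read, assuming a Consumer named consumer and a MessageHandler named handler. The exact WaitGroup protocol is not specified here, so this sketch assumes the consumer manages Add/Done internally and the caller only waits:

var startOffsets kafka.PartitionOffsets // starting offsets per partition; zero-valued here for illustration
readResult := make(chan kafka.PartitionOffsets, 1)
var catchupWg sync.WaitGroup

go func() {
	// exitAfterCaughtUp=true: stop once the latest startup offsets have been read
	if err := consumer.ConsumeTopic(ctx, handler, "example-topic", startOffsets, readResult, &catchupWg, true); err != nil {
		logger.Error("consume failed", zap.Error(err))
	}
}()

catchupWg.Wait()         // all partitions have caught up
lastRead := <-readResult // last offset read on each partition after the consumers close
_ = lastRead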
func (Consumer) ConsumeTopicFromBeginning ¶
func (c Consumer) ConsumeTopicFromBeginning(
	ctx context.Context,
	handler MessageHandler,
	topic string,
	readResult chan PartitionOffsets,
	catchupWg *sync.WaitGroup,
	exitAfterCaughtUp bool,
) error
ConsumeTopicFromBeginning starts Kafka consumers on all partitions in a given topic from the message with the oldest offset.
func (Consumer) ConsumeTopicFromLatest ¶
func (c Consumer) ConsumeTopicFromLatest(
	ctx context.Context,
	handler MessageHandler,
	topic string,
	readResult chan PartitionOffsets,
) error
ConsumeTopicFromLatest starts Kafka consumers on all partitions in a given topic from the message with the latest offset.
type ConsumerConfig ¶ added in v0.10.0
type ConsumerConfig struct {
	JSONEnabled    bool
	SchemaRegistry *SchemaRegistryConfig
}
ConsumerConfig contains consumer-specific configuration, including whether to use JSON deserialization and optional Schema Registry settings.
func NewDefaultConsumerConfig ¶ added in v0.12.1
func NewDefaultConsumerConfig() ConsumerConfig
NewDefaultConsumerConfig constructs and returns a usable ConsumerConfig with default settings
func (*ConsumerConfig) RegisterFlags ¶ added in v0.10.0
func (c *ConsumerConfig) RegisterFlags(flags *pflag.FlagSet)
RegisterFlags registers high-level consumer flags with pflags
type ConsumerIface ¶
type ConsumerIface interface {
	ConsumeTopic(ctx context.Context, handler MessageHandler, topic string, offsets PartitionOffsets, readResult chan PartitionOffsets, catchupWg *sync.WaitGroup, exitAfterCaughtUp bool) error
	ConsumeTopicFromBeginning(ctx context.Context, handler MessageHandler, topic string, readResult chan PartitionOffsets, catchupWg *sync.WaitGroup, exitAfterCaughtUp bool) error
	ConsumeTopicFromLatest(ctx context.Context, handler MessageHandler, topic string, readResult chan PartitionOffsets) error
	Close()
}
ConsumerIface is an interface for consuming messages from a Kafka topic
type ConsumerMetrics ¶ added in v0.10.0
type ConsumerMetrics struct {
	MessagesProcessed     *prometheus.GaugeVec
	MessageErrors         *prometheus.GaugeVec
	MessageProcessingTime *prometheus.SummaryVec
	ErrorsProcessed       *prometheus.GaugeVec
}
ConsumerMetrics is a collection of Prometheus metrics for tracking a Kafka consumer's performance
func RegisterConsumerMetrics ¶ added in v0.10.0
func RegisterConsumerMetrics(registerer prometheus.Registerer) ConsumerMetrics
RegisterConsumerMetrics registers Kafka consumer metrics with the provided registerer and returns a struct containing consumer Prometheus metrics.
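For example, with a dedicated registry (the registry variable is illustrative; NewConsumer performs this registration against the default registerer for you):

registry := prometheus.NewRegistry()
metrics := kafka.RegisterConsumerMetrics(registry)
_ = metrics.MessagesProcessed // individual collectors are available for inspection in tests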
type MessageHandler ¶
type MessageHandler interface {
HandleMessage(ctx context.Context, msg *sarama.ConsumerMessage, unmarshaler MessageUnmarshaler) error
}
MessageHandler defines an interface for handling new messages received by the Kafka consumer
type MessageUnmarshaler ¶
type MessageUnmarshaler interface {
UnmarshalMessage(ctx context.Context, msg *sarama.ConsumerMessage, target interface{}) error
}
MessageUnmarshaler defines an interface for unmarshaling messages received from Kafka to Go types
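A hypothetical MessageHandler implementation that decodes each message with the provided unmarshaler; the struct, its fields, and the `kafka` tags (which apply when the Schema Registry Avro unmarshaler is in use, see UnmarshalMessage below) are illustrative:

type userEvent struct {
	ID   string `kafka:"id"`
	Name string `kafka:"name"`
}

type userEventHandler struct{}

func (h userEventHandler) HandleMessage(ctx context.Context, msg *sarama.ConsumerMessage, unmarshaler kafka.MessageUnmarshaler) error {
	var event userEvent
	if err := unmarshaler.UnmarshalMessage(ctx, msg, &event); err != nil {
		return err
	}
	// act on the decoded event here
	return nil
}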
type MockClient ¶ added in v0.7.0
MockClient implements ClientIface for testing purposes
func (*MockClient) NewConsumer ¶ added in v0.7.0
func (m *MockClient) NewConsumer(config ConsumerConfig, logger *zap.Logger) (ConsumerIface, error)
NewConsumer creates a new mock consumer
func (*MockClient) NewProducer ¶ added in v0.7.0
func (m *MockClient) NewProducer(config ProducerConfig, logger *zap.Logger, returnMessages bool) (ProducerIface, error)
NewProducer creates a new mock producer
type MockKafkaConsumer ¶
MockKafkaConsumer implements ConsumerIface for testing purposes
func (*MockKafkaConsumer) Close ¶
func (m *MockKafkaConsumer) Close()
Close mocks the Kafka consumer Close method
func (*MockKafkaConsumer) ConsumeTopic ¶
func (m *MockKafkaConsumer) ConsumeTopic(ctx context.Context, handler MessageHandler, topic string, offsets PartitionOffsets, readResult chan PartitionOffsets, catchupWg *sync.WaitGroup, exitAfterCaughtUp bool) error
ConsumeTopic mocks the Kafka consumer ConsumeTopic method
func (*MockKafkaConsumer) ConsumeTopicFromBeginning ¶
func (m *MockKafkaConsumer) ConsumeTopicFromBeginning(ctx context.Context, handler MessageHandler, topic string, readResult chan PartitionOffsets, catchupWg *sync.WaitGroup, exitAfterCaughtUp bool) error
ConsumeTopicFromBeginning mocks the Kafka consumer ConsumeTopicFromBeginning method
func (*MockKafkaConsumer) ConsumeTopicFromLatest ¶
func (m *MockKafkaConsumer) ConsumeTopicFromLatest(ctx context.Context, handler MessageHandler, topic string, readResult chan PartitionOffsets) error
ConsumeTopicFromLatest mocks the Kafka consumer ConsumeTopicFromLatest method
func (*MockKafkaConsumer) EmitReadResult ¶
func (m *MockKafkaConsumer) EmitReadResult(offsets PartitionOffsets)
EmitReadResult allows tests to send values through the readResult channel passed into the mock consumer.
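A sketch of a test that exercises the readResult channel. This assumes MockKafkaConsumer, like MockProducer, embeds testify's mock.Mock; the expectation, context, and handler are illustrative:

mockConsumer := &kafka.MockKafkaConsumer{}
mockConsumer.On("ConsumeTopicFromLatest", mock.Anything, mock.Anything, "example-topic", mock.Anything).Return(nil)

readResult := make(chan kafka.PartitionOffsets, 1)
_ = mockConsumer.ConsumeTopicFromLatest(ctx, handler, "example-topic", readResult)

var lastRead kafka.PartitionOffsets // the offsets the test wants the consumer to report
mockConsumer.EmitReadResult(lastRead)
got := <-readResult // receives the value sent via EmitReadResult
_ = got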
type MockProducer ¶ added in v0.7.0
type MockProducer struct {
	mock.Mock
	Messages chan *sarama.ProducerMessage
	Done     chan bool
	// contains filtered or unexported fields
}
MockProducer implements the producer interface for testing and includes channels for mocking out messages
func NewMockProducer ¶ added in v0.7.0
func NewMockProducer() *MockProducer
NewMockProducer creates a new mock producer
func (*MockProducer) Errors ¶ added in v0.7.0
func (m *MockProducer) Errors() chan *sarama.ProducerError
Errors returns the channel on which messages that could not be published will be returned
func (*MockProducer) RunProducer ¶ added in v0.7.0
func (m *MockProducer) RunProducer(messages chan *sarama.ProducerMessage, done chan bool)
RunProducer mocks the RunProducer call. This method has the side effect of setting the Messages and Done channels on the mock producer to the channels that were passed into the method.
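A sketch of wiring the mock into a test; the expectation line is defensive, since MockProducer embeds mock.Mock, and the message contents are illustrative:

mockProducer := kafka.NewMockProducer()
mockProducer.On("RunProducer", mock.Anything, mock.Anything)

messages := make(chan *sarama.ProducerMessage, 1)
done := make(chan bool)
mockProducer.RunProducer(messages, done)

// RunProducer set mockProducer.Messages to the messages channel above, so the
// test can observe exactly what the code under test produced.
messages <- &sarama.ProducerMessage{Topic: "example-topic", Value: sarama.StringEncoder("hello")}
produced := <-mockProducer.Messages
_ = produced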
func (*MockProducer) Successes ¶ added in v0.7.0
func (m *MockProducer) Successes() chan *sarama.ProducerMessage
Successes returns the channel on which successfully published messages will be returned
type PartitionOffsets ¶
PartitionOffsets is a mapping of partition ID to the offset to which a consumer has read on that partition
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer is a wrapped Sarama producer that tracks producer metrics and provides optional logging.
func (Producer) Errors ¶ added in v0.5.0
func (p Producer) Errors() chan *sarama.ProducerError
Errors returns the channel on which messages that could not be published will be returned
func (Producer) RunProducer ¶
func (p Producer) RunProducer(messages chan *sarama.ProducerMessage, done chan bool)
RunProducer wraps the sarama AsyncProducer, adding metrics and optional logging to the producer. To stop the producer, close the messages channel; the done channel is closed once the producer has shut down. If the messages channel is unbuffered, each message sent to the producer is guaranteed to at least be attempted against Kafka before the producer shuts down.
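A minimal sketch of running and stopping the producer; the topic and payload are illustrative:

messages := make(chan *sarama.ProducerMessage) // unbuffered: each message is at least attempted
done := make(chan bool)
go producer.RunProducer(messages, done)

messages <- &sarama.ProducerMessage{
	Topic: "example-topic",
	Value: sarama.StringEncoder("hello"),
}

close(messages) // signal shutdown
<-done          // wait for the producer to finish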
func (Producer) Successes ¶ added in v0.5.0
func (p Producer) Successes() chan *sarama.ProducerMessage
Successes returns the channel on which successfully published messages will be returned
type ProducerConfig ¶ added in v0.10.0
type ProducerConfig struct {
	ProducerCompressionCodec string
	ProducerCompressionLevel int
	ProducerRequiredAcks     int
}
ProducerConfig contains producer-specific configuration information
func (*ProducerConfig) RegisterFlags ¶ added in v0.10.0
func (c *ProducerConfig) RegisterFlags(flags *pflag.FlagSet)
RegisterFlags registers producer flags with pflags
type ProducerIface ¶ added in v0.7.0
type ProducerIface interface {
	RunProducer(messages chan *sarama.ProducerMessage, done chan bool)
	Successes() chan *sarama.ProducerMessage
	Errors() chan *sarama.ProducerError
}
ProducerIface is an interface for producing Kafka messages
type ProducerMetrics ¶ added in v0.10.0
type ProducerMetrics struct {
	MessagesProduced *prometheus.GaugeVec
	ErrorsProduced   *prometheus.GaugeVec
}
ProducerMetrics is a collection of Prometheus metrics for tracking a Kafka producer's performance
func RegisterProducerMetrics ¶ added in v0.10.0
func RegisterProducerMetrics(registerer prometheus.Registerer) ProducerMetrics
RegisterProducerMetrics registers Kafka producer metrics with the provided registerer and returns a struct containing gauges for the number of messages and errors produced.
type SchemaRegistryConfig ¶
type SchemaRegistryConfig struct {
	SchemaRegistryURL string
	// contains filtered or unexported fields
}
SchemaRegistryConfig defines the necessary configuration for interacting with Schema Registry
func (*SchemaRegistryConfig) RegisterFlags ¶
func (src *SchemaRegistryConfig) RegisterFlags(flags *pflag.FlagSet)
RegisterFlags registers Schema Registry flags with pflags
func (*SchemaRegistryConfig) UnmarshalMessage ¶
func (src *SchemaRegistryConfig) UnmarshalMessage(
	ctx context.Context,
	msg *sarama.ConsumerMessage,
	target interface{},
) error
UnmarshalMessage implements the MessageUnmarshaler interface. It decodes an Avro message from Kafka into a Go struct, fetching Avro schemas from the Kafka Schema Registry. To use this function, tag each field of the target struct with a `kafka` tag whose value names the key on the Avro message to set as that field's value.
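A hypothetical target struct; the type, field names, and tag values are illustrative, and each tag value must match a field name in the Avro schema:

type rideReceipt struct {
	ID       string `kafka:"id"`
	Fare     string `kafka:"fare"`
	Currency string `kafka:"currency"`
}

var receipt rideReceipt
err := schemaRegistryConfig.UnmarshalMessage(ctx, msg, &receipt) // schemaRegistryConfig is a *SchemaRegistryConfig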