Documentation ¶
Index ¶
- type AsyncProducer
- type Config
- func (c *Config) NewClient(ctx context.Context) (sarama.Client, error)
- func (c *Config) RegisterAdminFlags(flags *pflag.FlagSet)
- func (c *Config) RegisterBaseFlags(flags *pflag.FlagSet)
- func (c *Config) RegisterConsumerFlags(flags *pflag.FlagSet)
- func (c *Config) RegisterConsumerGroupFlags(flags *pflag.FlagSet)
- func (c *Config) RegisterMetadataFlags(flags *pflag.FlagSet)
- func (c *Config) RegisterNetFlags(flags *pflag.FlagSet)
- func (c *Config) RegisterProducerFlags(flags *pflag.FlagSet)
- type ConnectAvroUnmarshaller
- type ConnectJSONUnmarshaller
- type Consumer
- type ConsumerMetrics
- type MessageUnmarshaller
- type PartitionConsumer
- type ProducerMetrics
- type SchemaRegistryClient
- func (c *SchemaRegistryClient) CheckSchema(ctx context.Context, subject string, schema string, isKey bool) (*schemaResponse, error)
- func (c *SchemaRegistryClient) CreateSchema(ctx context.Context, subject string, schema string, isKey bool) (*schemaResponse, error)
- func (c *SchemaRegistryClient) DecodeKafkaAvroMessage(ctx context.Context, message *sarama.ConsumerMessage) (interface{}, error)
- func (c *SchemaRegistryClient) EncodeKafkaAvroMessage(ctx context.Context, schemaID uint, message interface{}) ([]byte, error)
- func (c *SchemaRegistryClient) GetCodec(ctx context.Context, id uint) (*goavro.Codec, error)
- func (c *SchemaRegistryClient) GetSchema(ctx context.Context, id uint) (string, error)
- type SchemaRegistryConfig
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AsyncProducer ¶ added in v0.29.0
type AsyncProducer struct {
	sarama.AsyncProducer
	// contains filtered or unexported fields
}
AsyncProducer is a drop-in replacement for the sarama AsyncProducer that adds Prometheus metrics on its performance.
func NewAsyncProducerFromClient ¶ added in v0.29.0
func NewAsyncProducerFromClient(client sarama.Client, metrics ProducerMetrics) (AsyncProducer, error)
NewAsyncProducerFromClient creates a new AsyncProducer from a sarama Client that pushes metrics to the provided ProducerMetrics. Note that metrics are only collected if the producer is configured to return successes and errors.
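The note above matters in practice: without successes and errors returned, no metrics are recorded. A minimal sketch of wiring the producer together, assuming this package is imported under the alias `kafka` and that a suitably configured `client` already exists (both assumptions, along with the topic name):

```go
// Assumes `client` is a sarama.Client whose config has
// Producer.Return.Successes and Producer.Return.Errors set to true.
metrics, err := kafka.NewProducerMetrics(prometheus.DefaultRegisterer)
if err != nil {
	log.Fatal(err)
}
producer, err := kafka.NewAsyncProducerFromClient(client, metrics)
if err != nil {
	log.Fatal(err)
}
defer producer.Close()

producer.Input() <- &sarama.ProducerMessage{
	Topic: "events", // hypothetical topic
	Value: sarama.StringEncoder("hello"),
}

// Both channels must be drained when successes and errors are returned.
select {
case <-producer.Successes():
case perr := <-producer.Errors():
	log.Println("produce failed:", perr)
}
```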
func (AsyncProducer) AsyncClose ¶ added in v0.29.0
func (ap AsyncProducer) AsyncClose()
AsyncClose triggers a shutdown of the producer. The producer will be shutdown when the input, errors, and successes channels are closed.
func (AsyncProducer) Close ¶ added in v0.29.0
func (ap AsyncProducer) Close() error
Close synchronously shuts down the producer and waits for any buffered messages to be flushed before returning.
func (AsyncProducer) Errors ¶ added in v0.29.0
func (ap AsyncProducer) Errors() <-chan *sarama.ProducerError
Errors returns the output channel where errored messages will be returned if ProducerReturnErrors was true when configuring the client.
func (AsyncProducer) Successes ¶ added in v0.29.0
func (ap AsyncProducer) Successes() <-chan *sarama.ProducerMessage
Successes returns the output channel where successfully written messages will be returned if ProducerReturnSuccesses was true when configuring the client.
type Config ¶
type Config struct {
	sarama.Config

	// Prometheus registerer for metrics
	Registerer prometheus.Registerer
	// Frequency with which to collect metrics
	MetricsFrequency time.Duration

	BrokerAddrs  []string
	Verbose      bool
	TLSCaCrtPath string
	TLSCrtPath   string
	TLSKeyPath   string

	// version to be parsed and loaded into sarama.Config.KafkaVersion
	KafkaVersion string
	// value to be cast to sarama.RequiredAcks and loaded into sarama.Config.Producer.RequiredAcks
	ProducerRequiredAcks int16
	// value to be parsed to sarama.CompressionCodec and loaded into sarama.Config.Producer.CompressionCodec
	ProducerCompressionCodec string
}
Config contains all configuration for Kafka message consumption and production
func NewDefaultConfig ¶ added in v0.29.0
func NewDefaultConfig() Config
NewDefaultConfig creates a new default Kafka configuration
func (*Config) NewClient ¶
NewClient creates a new Sarama Client from the tools configuration. Using this version of NewClient enables setting of Sarama configuration from the CLI and environment variables. In addition, this method has the side effect of starting a periodic task that collects Prometheus metrics from the Sarama internal metrics registry. Calling the cancel function associated with the provided context stops the periodic metrics collection. Note that this method overrides Producer.Return.Successes, Producer.Return.Errors, and Consumer.Return.Errors, setting them all to true, since those options are required for metrics collection and tracing. Code that uses the client generated by this method must handle those cases appropriately.
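A hedged sketch of typical usage, assuming this package is imported under the alias `kafka` and flags are bound to `pflag.CommandLine` (both assumptions, not prescribed by this API):

```go
package main

import (
	"context"
	"log"

	"github.com/spf13/pflag"
)

func main() {
	cfg := kafka.NewDefaultConfig() // `kafka` is an assumed import alias for this package
	cfg.RegisterBaseFlags(pflag.CommandLine)
	cfg.RegisterNetFlags(pflag.CommandLine)
	cfg.RegisterProducerFlags(pflag.CommandLine)
	pflag.Parse()

	// Cancelling this context stops the periodic metrics
	// collection started by NewClient.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	client, err := cfg.NewClient(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()
}
```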
func (*Config) RegisterAdminFlags ¶ added in v0.29.0
RegisterAdminFlags registers options for the Kafka admin API.
func (*Config) RegisterBaseFlags ¶ added in v0.29.0
RegisterBaseFlags registers basic Kafka configuration. If using Kafka, these flags should always be registered.
func (*Config) RegisterConsumerFlags ¶ added in v0.29.0
RegisterConsumerFlags registers configuration options for Kafka consumers.
func (*Config) RegisterConsumerGroupFlags ¶ added in v0.29.0
RegisterConsumerGroupFlags registers options for Kafka consumer group configuration.
func (*Config) RegisterMetadataFlags ¶ added in v0.29.0
RegisterMetadataFlags registers configuration for fetching metadata from the Kafka broker.
func (*Config) RegisterNetFlags ¶ added in v0.29.0
RegisterNetFlags registers configuration for connection to the Kafka broker including TLS configuration.
func (*Config) RegisterProducerFlags ¶ added in v0.29.0
RegisterProducerFlags registers configuration options for Kafka producers.
type ConnectAvroUnmarshaller ¶ added in v0.29.0
type ConnectAvroUnmarshaller struct {
SchemaRegistryClient
}
ConnectAvroUnmarshaller is a helper for unmarshalling Kafka Avro messages, taking into account the quirks of the Kafka Connect producer's format
func (ConnectAvroUnmarshaller) Unmarshal ¶ added in v0.29.0
func (u ConnectAvroUnmarshaller) Unmarshal(ctx context.Context, msg *sarama.ConsumerMessage, target interface{}) []error
Unmarshal takes the contents of the ConsumerMessage and unmarshals it into the target using avro decoding, returning any and all errors that occur during unmarshalling
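A sketch of decoding a Connect-produced Avro message into a struct; the `kafka` alias, `registryClient`, `ctx`, `msg`, and the target type are all hypothetical:

```go
type orderEvent struct { // hypothetical target type
	ID     string
	Amount float64
}

// registryClient is assumed to be a *SchemaRegistryClient;
// the unmarshaller embeds the client by value.
u := kafka.ConnectAvroUnmarshaller{SchemaRegistryClient: *registryClient}

var evt orderEvent
if errs := u.Unmarshal(ctx, msg, &evt); len(errs) > 0 {
	for _, err := range errs {
		log.Println("unmarshal error:", err)
	}
}
```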
type ConnectJSONUnmarshaller ¶ added in v0.29.0
type ConnectJSONUnmarshaller struct{}
ConnectJSONUnmarshaller is a helper for unmarshalling Kafka JSON messages, taking into account the quirks of the Kafka Connect producer's format
func (ConnectJSONUnmarshaller) Unmarshal ¶ added in v0.29.0
func (u ConnectJSONUnmarshaller) Unmarshal(ctx context.Context, msg *sarama.ConsumerMessage, target interface{}) []error
Unmarshal takes the contents of the ConsumerMessage and unmarshals it into the target using JSON decoding, returning any and all errors that occur during unmarshalling
type Consumer ¶
Consumer is a drop-in replacement for the sarama consumer that adds Prometheus metrics on the number of messages read and errors received. This consumer implementation creates the drop-in PartitionConsumer from this package.
func NewConsumerFromClient ¶ added in v0.29.0
func NewConsumerFromClient(client sarama.Client, metrics ConsumerMetrics) (Consumer, error)
NewConsumerFromClient creates a new Consumer from a sarama Client with the given set of metrics. Note that error metrics can only be collected if the consumer is configured to return errors.
func (Consumer) ConsumePartition ¶ added in v0.29.0
func (c Consumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error)
ConsumePartition creates a wrapped PartitionConsumer on the given topic and partition starting at the given offset.
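A sketch of consuming a single partition through the wrapped consumer; the `kafka` alias, topic name, partition, and pre-built `client` are assumptions:

```go
metrics, err := kafka.NewConsumerMetrics(prometheus.DefaultRegisterer)
if err != nil {
	log.Fatal(err)
}
consumer, err := kafka.NewConsumerFromClient(client, metrics)
if err != nil {
	log.Fatal(err)
}
pc, err := consumer.ConsumePartition("events", 0, sarama.OffsetNewest)
if err != nil {
	log.Fatal(err)
}
defer pc.Close()

for {
	select {
	case msg := <-pc.Messages():
		log.Printf("offset %d: %s", msg.Offset, msg.Value)
	case cerr := <-pc.Errors():
		// Only delivered if Consumer.Return.Errors was
		// enabled when configuring the client.
		log.Println("consume error:", cerr)
	}
}
```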
type ConsumerMetrics ¶ added in v0.10.0
type ConsumerMetrics struct {
// contains filtered or unexported fields
}
ConsumerMetrics is a collection of Prometheus metrics for tracking a Kafka consumer's performance
func NewConsumerMetrics ¶ added in v0.29.0
func NewConsumerMetrics(registerer prometheus.Registerer) (ConsumerMetrics, error)
NewConsumerMetrics creates and registers metrics for the Kafka Consumer with the provided prometheus registerer
type MessageUnmarshaller ¶ added in v0.29.0
type MessageUnmarshaller interface {
	// Unmarshal takes the contents of the ConsumerMessage and unmarshals it into the target, returning
	// any and all errors that occur during unmarshalling
	Unmarshal(ctx context.Context, msg *sarama.ConsumerMessage, target interface{}) []error
}
MessageUnmarshaller is a helper interface for unmarshalling Kafka messages into Go types
type PartitionConsumer ¶ added in v0.29.0
type PartitionConsumer struct {
	sarama.PartitionConsumer
	// contains filtered or unexported fields
}
PartitionConsumer is a drop-in replacement for the sarama partition consumer.
func (PartitionConsumer) AsyncClose ¶ added in v0.29.0
func (pc PartitionConsumer) AsyncClose()
AsyncClose initiates a shutdown of the PartitionConsumer. This method returns immediately. Once the consumer is shutdown, the message and error channels are closed.
func (PartitionConsumer) Close ¶ added in v0.29.0
func (pc PartitionConsumer) Close() error
Close synchronously shuts down the partition consumer and returns any outstanding errors.
func (PartitionConsumer) Errors ¶ added in v0.29.0
func (pc PartitionConsumer) Errors() <-chan *sarama.ConsumerError
Errors returns the read channel of errors that occurred during consumption if ConsumerReturnErrors was true when configuring the client.
func (PartitionConsumer) Messages ¶ added in v0.29.0
func (pc PartitionConsumer) Messages() <-chan *sarama.ConsumerMessage
Messages returns the read channel for the messages returned by the broker
type ProducerMetrics ¶ added in v0.10.0
type ProducerMetrics struct {
// contains filtered or unexported fields
}
ProducerMetrics is a collection of Prometheus metrics for tracking a Kafka producer's performance
func NewProducerMetrics ¶ added in v0.29.0
func NewProducerMetrics(registerer prometheus.Registerer) (ProducerMetrics, error)
NewProducerMetrics creates and registers metrics for the Kafka Producer with the provided prometheus registerer.
type SchemaRegistryClient ¶ added in v0.29.0
type SchemaRegistryClient struct {
	SchemaRegistryConfig
	// contains filtered or unexported fields
}
SchemaRegistryClient provides functionality for interacting with Kafka schema registry. This type has methods for getting schemas from the registry and decoding sarama ConsumerMessages from Avro into Go types. In addition, since the schema registry is immutable, the client contains a cache of schemas so that a network request to the registry does not have to be made for every Kafka message that needs to be decoded.
func (*SchemaRegistryClient) CheckSchema ¶ added in v0.46.0
func (c *SchemaRegistryClient) CheckSchema(ctx context.Context, subject string, schema string, isKey bool) (*schemaResponse, error)
CheckSchema will check if the schema exists for the given subject
func (*SchemaRegistryClient) CreateSchema ¶ added in v0.46.0
func (c *SchemaRegistryClient) CreateSchema(ctx context.Context, subject string, schema string, isKey bool) (*schemaResponse, error)
CreateSchema creates a new schema in Schema Registry. The Schema Registry compares this against existing known schemas. If this schema matches an existing schema, a new schema will not be created and instead the existing ID will be returned. This applies even if the schema is assigned only to another subject.
func (*SchemaRegistryClient) DecodeKafkaAvroMessage ¶ added in v0.29.0
func (c *SchemaRegistryClient) DecodeKafkaAvroMessage(ctx context.Context, message *sarama.ConsumerMessage) (interface{}, error)
DecodeKafkaAvroMessage decodes the given Kafka message encoded with Avro into a Go type.
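Putting the registry client and decoder together; the `kafka` alias, registry URL, `httpMetrics` value, and `msg` are assumptions:

```go
src := kafka.SchemaRegistryConfig{URL: "http://localhost:8081"} // assumed registry URL
registryClient := src.NewSchemaRegistryClient(httpMetrics)      // httpMetrics: an shHTTP.Metrics bundle

// msg is a *sarama.ConsumerMessage whose value was encoded
// against a schema held in the registry. The schema is fetched
// once and cached for subsequent messages.
decoded, err := registryClient.DecodeKafkaAvroMessage(ctx, msg)
if err != nil {
	log.Fatal(err)
}
log.Printf("decoded: %#v", decoded)
```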
func (*SchemaRegistryClient) EncodeKafkaAvroMessage ¶ added in v0.48.0
func (c *SchemaRegistryClient) EncodeKafkaAvroMessage(ctx context.Context, schemaID uint, message interface{}) ([]byte, error)
EncodeKafkaAvroMessage encodes the given message into Avro using the schema with the given ID.
type SchemaRegistryConfig ¶
type SchemaRegistryConfig struct {
URL string
}
SchemaRegistryConfig defines the necessary configuration for interacting with Kafka Schema Registry
func (SchemaRegistryConfig) NewSchemaRegistryClient ¶ added in v0.29.0
func (c SchemaRegistryConfig) NewSchemaRegistryClient(httpMetrics shHTTP.Metrics) *SchemaRegistryClient
NewSchemaRegistryClient creates a schema registry client with the given HTTP metrics bundle.
func (*SchemaRegistryConfig) RegisterFlags ¶
func (c *SchemaRegistryConfig) RegisterFlags(flags *pflag.FlagSet)
RegisterFlags registers schema registry flags with pflags