Documentation ¶
Overview ¶
Package kafka abstracts the production and consumption of records to and from Kafka.
Index ¶
- Variables
- type BatchWriteListener
- type CommonConfig
- type CompressionCodec
- type Consumer
- type ConsumerConfig
- type Manager
- func (m *Manager) Close() error
- func (m *Manager) DeleteTopics(ctx context.Context, topics ...apmqueue.Topic) error
- func (m *Manager) Healthy(ctx context.Context) error
- func (m *Manager) MonitorConsumerLag(topicConsumers []apmqueue.TopicConsumer) (metric.Registration, error)
- func (m *Manager) NewTopicCreator(cfg TopicCreatorConfig) (*TopicCreator, error)
- type ManagerConfig
- type Producer
- type ProducerConfig
- type SASLMechanism
- type TopicAttributeFunc
- type TopicCreator
- type TopicCreatorConfig
- type TopicLogFieldFunc
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrCommitFailed may be returned by `consumer.Run` when Delivery is
	// apmqueue.AtMostOnceDeliveryType.
	ErrCommitFailed = errors.New("kafka: failed to commit offsets")
)
Functions ¶
This section is empty.
Types ¶
type BatchWriteListener ¶ added in v2.1.3
BatchWriteListener specifies a callback function that is invoked after a batch is successfully produced to a Kafka broker. It is invoked with the corresponding topic and the number of bytes written to that topic (taking compression into account, when applicable).
func (BatchWriteListener) OnProduceBatchWritten ¶ added in v2.1.3
func (l BatchWriteListener) OnProduceBatchWritten(_ kgo.BrokerMetadata, topic string, _ int32, m kgo.ProduceBatchMetrics)
OnProduceBatchWritten implements the kgo.HookProduceBatchWritten interface.
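The following is a minimal sketch of how a function type can satisfy a hook-style interface by forwarding to itself. It does not use the real types: `batchMetrics` is a hypothetical stand-in for kgo.ProduceBatchMetrics, and the callback signature `func(topic string, bytesWritten int64)` as well as the choice to forward the compressed size are assumptions based on the description above.

```go
package main

import "fmt"

// batchMetrics is a hypothetical stand-in for kgo.ProduceBatchMetrics,
// reduced to the two byte counters this sketch needs.
type batchMetrics struct {
	UncompressedBytes int
	CompressedBytes   int
}

// batchWriteListener mirrors the idea of BatchWriteListener: a function
// type called with a topic and the bytes written to it. The exact
// signature is an assumption.
type batchWriteListener func(topic string, bytesWritten int64)

// onProduceBatchWritten adapts the function type to a hook-style method
// by forwarding the topic and the compressed batch size (assumed).
func (l batchWriteListener) onProduceBatchWritten(topic string, m batchMetrics) {
	l(topic, int64(m.CompressedBytes))
}

func main() {
	written := map[string]int64{}
	var l batchWriteListener = func(topic string, n int64) { written[topic] += n }

	l.onProduceBatchWritten("logs", batchMetrics{UncompressedBytes: 4096, CompressedBytes: 1024})
	l.onProduceBatchWritten("logs", batchMetrics{UncompressedBytes: 2048, CompressedBytes: 512})

	fmt.Println("bytes written to logs:", written["logs"])
}
```

A listener of this shape is handy for per-topic byte accounting, since the callback only sees the topic name and a size.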
type CommonConfig ¶
type CommonConfig struct {
	// ConfigFile holds the path to an optional YAML configuration file,
	// which configures Brokers and SASL.
	//
	// If ConfigFile is unspecified, but $KAFKA_CONFIG_FILE is specified,
	// it will be used to populate ConfigFile. Either way if a file is
	// specified, it must exist when a client is initially created.
	//
	// The following properties from
	// https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md
	// are honoured:
	//
	// - bootstrap.servers ($KAFKA_BROKERS)
	// - sasl.mechanism ($KAFKA_SASL_MECHANISM)
	// - sasl.username ($KAFKA_USERNAME)
	// - sasl.password ($KAFKA_PASSWORD)
	//
	// If bootstrap.servers is defined, then it takes precedence over
	// CommonConfig.Brokers. When a connection to a broker fails, the
	// config file will be reloaded, and the seed brokers will be
	// updated if bootstrap.servers has changed.
	//
	// If sasl.mechanism is set to PLAIN, or if sasl.username is defined,
	// then SASL/PLAIN will be configured. Whenever a new connection is
	// created, the config will be reloaded in case the username or
	// password has been updated. If sasl.mechanism is set to AWS_MSK_IAM,
	// then SASL/AWS_MSK_IAM is configured using the AWS SDK. Dynamic
	// changes to the sasl.mechanism value are not supported.
	ConfigFile string

	// Namespace holds a namespace for Kafka topics.
	//
	// This is added as a prefix for topic names, and acts as a filter
	// on topics monitored or described by the manager.
	//
	// Namespace is always removed from topic names before they are
	// returned to callers. The only way Namespace will surface is in
	// telemetry (e.g. metrics), as an independent dimension. This
	// enables users to filter metrics by namespace, while maintaining
	// stable topic names.
	Namespace string

	// Brokers is the list of Kafka brokers used to seed the Kafka client.
	//
	// If Brokers is unspecified, but $KAFKA_BROKERS is specified, it will
	// be parsed as a comma-separated list of broker addresses and used.
	Brokers []string

	// ClientID to use when connecting to Kafka. This is used for logging
	// and client identification purposes.
	ClientID string

	// Version is the software version to use in the Kafka client. This is
	// useful since it shows up in Kafka metrics and logs.
	Version string

	// SASL configures the kgo.Client to use SASL authorization.
	//
	// If SASL is unspecified, then it may be derived from environment
	// variables as follows:
	//
	// - if $KAFKA_SASL_MECHANISM is set to PLAIN, or if $KAFKA_USERNAME
	//   and $KAFKA_PASSWORD are both specified, then SASL/PLAIN will be
	//   configured
	// - if $KAFKA_SASL_MECHANISM is set to AWS_MSK_IAM, then
	//   SASL/AWS_MSK_IAM will be configured using the AWS SDK
	SASL SASLMechanism

	// TLS configures the kgo.Client to use TLS for authentication.
	// This option conflicts with Dialer. Only one can be used.
	//
	// If neither TLS nor Dialer are specified, then TLS will be configured
	// by default unless the environment variable $KAFKA_PLAINTEXT is set to
	// "true". In case TLS is auto-configured, $KAFKA_TLS_INSECURE may be
	// set to "true" to disable server certificate and hostname verification.
	TLS *tls.Config

	// Dialer uses fn to dial addresses, overriding the default dialer that uses a
	// 10s dial timeout and no TLS (unless TLS option is set).
	//
	// The context passed to the dial function is the context used in the request
	// that caused the dial. If the request is a client-internal request, the
	// context is the context on the client itself (which is canceled when the
	// client is closed).
	// This option conflicts with TLS. Only one can be used.
	Dialer func(ctx context.Context, network, address string) (net.Conn, error)

	// Logger to use for any errors.
	Logger *zap.Logger

	// DisableTelemetry disables the OpenTelemetry hook.
	DisableTelemetry bool

	// TracerProvider allows specifying a custom otel tracer provider.
	// Defaults to the global one.
	TracerProvider trace.TracerProvider

	// MeterProvider allows specifying a custom otel meter provider.
	// Defaults to the global one.
	MeterProvider metric.MeterProvider

	// TopicAttributeFunc can be used to create custom dimensions from a Kafka
	// topic for these metrics:
	// - producer.messages.count
	// - consumer.messages.fetched
	TopicAttributeFunc TopicAttributeFunc

	// TopicLogFieldFunc can be used to create custom dimensions from a Kafka
	// topic for log messages.
	TopicLogFieldFunc TopicLogFieldFunc

	// MetadataMaxAge is the maximum age of metadata before it is refreshed.
	// The lower the value the more frequently new topics will be discovered.
	// If zero, the default value of 5 minutes is used.
	MetadataMaxAge time.Duration
	// contains filtered or unexported fields
}
CommonConfig defines common configuration for Kafka consumers, producers, and managers.
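The environment fallbacks described in the field comments can be sketched with the standard library alone. This is an illustration of the documented rules, not the package's implementation: the helper names `seedBrokers`, `saslFromEnv`, and `autoTLS` are hypothetical, trimming whitespace around broker addresses is an assumption, and so is the choice to check PLAIN before AWS_MSK_IAM when both could apply.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// seedBrokers returns the explicitly configured brokers, or falls back to
// parsing a comma-separated list such as the value of $KAFKA_BROKERS.
// Trimming whitespace and skipping empty entries is an assumption.
func seedBrokers(configured []string, env string) []string {
	if len(configured) > 0 {
		return configured
	}
	var out []string
	for _, b := range strings.Split(env, ",") {
		if b = strings.TrimSpace(b); b != "" {
			out = append(out, b)
		}
	}
	return out
}

// saslFromEnv picks a SASL mechanism name from environment-style inputs:
// PLAIN when the mechanism says so or when both credentials are present,
// AWS_MSK_IAM when requested, and "" when nothing applies.
func saslFromEnv(mechanism, username, password string) string {
	switch {
	case mechanism == "PLAIN", username != "" && password != "":
		return "PLAIN"
	case mechanism == "AWS_MSK_IAM":
		return "AWS_MSK_IAM"
	}
	return ""
}

// autoTLS reports whether TLS would be configured by default: only when
// neither TLS nor Dialer is set and $KAFKA_PLAINTEXT is not "true".
func autoTLS(tlsSet, dialerSet bool, plaintextEnv string) bool {
	if tlsSet || dialerSet {
		return false
	}
	return plaintextEnv != "true"
}

func main() {
	brokers := seedBrokers(nil, os.Getenv("KAFKA_BROKERS"))
	mechanism := saslFromEnv(
		os.Getenv("KAFKA_SASL_MECHANISM"),
		os.Getenv("KAFKA_USERNAME"),
		os.Getenv("KAFKA_PASSWORD"),
	)
	fmt.Println("brokers:", brokers)
	fmt.Println("sasl:", mechanism)
	fmt.Println("auto TLS:", autoTLS(false, false, os.Getenv("KAFKA_PLAINTEXT")))
}
```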
type CompressionCodec ¶
type CompressionCodec = kgo.CompressionCodec
CompressionCodec configures how records are compressed before being sent. Type alias to kgo.CompressionCodec.
func GzipCompression ¶
func GzipCompression() CompressionCodec
GzipCompression enables gzip compression with the default compression level.
func Lz4Compression ¶
func Lz4Compression() CompressionCodec
Lz4Compression enables lz4 compression with the fastest compression level.
func NoCompression ¶
func NoCompression() CompressionCodec
NoCompression is a compression option that avoids compression. This can always be used as a fallback compression.
func SnappyCompression ¶
func SnappyCompression() CompressionCodec
SnappyCompression enables snappy compression.
func ZstdCompression ¶
func ZstdCompression() CompressionCodec
ZstdCompression enables zstd compression with the default compression level.
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer wraps a Kafka consumer and the consumption implementation details. Consumes each partition in a dedicated goroutine.
func NewConsumer ¶
func NewConsumer(cfg ConsumerConfig) (*Consumer, error)
NewConsumer creates a new instance of a Consumer. The consumer will read from each partition concurrently by using a dedicated goroutine per partition.
func (*Consumer) Healthy ¶
Healthy returns an error if the Kafka client fails to reach a discovered broker.
func (*Consumer) Run ¶
Run runs the consumer until a non-recoverable error is found:
- ErrCommitFailed.
To shut down the consumer, call consumer.Close() or cancel the context. Calling `consumer.Close` is advisable to ensure a graceful shutdown and to prevent records from being lost (at-most-once delivery) or processed twice (at-least-once delivery). To ensure that all polled records are processed, Close() must be called even when the context is canceled.
If called more than once, returns `apmqueue.ErrConsumerAlreadyRunning`.
type ConsumerConfig ¶
type ConsumerConfig struct {
	CommonConfig

	// Topics that the consumer will consume messages from.
	Topics []apmqueue.Topic

	// ConsumeRegex sets the client to parse all topics passed to ConsumeTopics
	// as regular expressions.
	ConsumeRegex bool

	// GroupID to join as part of the consumer group.
	GroupID string

	// MaxPollRecords defines an upper bound to the number of records that can
	// be polled on a single fetch. If MaxPollRecords <= 0, defaults to 500.
	// Note that this setting doesn't change how `franz-go` fetches and buffers
	// events from Kafka brokers, it merely affects the number of records that
	// are returned on `client.PollRecords`.
	// The higher this setting, the higher the general processing throughput
	// will be. However, when Delivery is set to AtMostOnce, the higher this
	// number, the more events are lost if the process crashes or terminates
	// abruptly.
	//
	// It is best to keep the number of polled records small or the consumer
	// risks being forced out of the group if it exceeds rebalance.timeout.ms.
	// Default: 500
	// Kafka consumer setting: max.poll.records
	// Docs: https://kafka.apache.org/28/documentation.html#consumerconfigs_max.poll.records
	MaxPollRecords int

	// MaxPollWait defines the maximum amount of time a broker will wait for a
	// fetch response to hit the minimum number of required bytes before
	// returning.
	// Default: 5s
	// Kafka consumer setting: fetch.max.wait.ms
	// Docs: https://kafka.apache.org/28/documentation.html#consumerconfigs_fetch.max.wait.ms
	MaxPollWait time.Duration

	// MaxConcurrentFetches sets the maximum number of fetch requests to allow in
	// flight or buffered at once, overriding the unbounded (i.e. number of
	// brokers) default.
	// This setting, paired with FetchMaxBytes, can upper bound the maximum amount
	// of memory that the client can use for consuming.
	// Default: Unbounded, total number of brokers.
	// Docs: https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#MaxConcurrentFetches
	MaxConcurrentFetches int

	// MaxPollBytes sets the maximum amount of bytes a broker will try to send
	// during a fetch.
	// Default: 52428800 bytes (~52MB, 50MiB)
	// Kafka consumer setting: fetch.max.bytes
	// Docs: https://kafka.apache.org/28/documentation.html#brokerconfigs_fetch.max.bytes
	MaxPollBytes int32

	// MaxPollPartitionBytes sets the maximum amount of bytes that will be consumed for
	// a single partition in a fetch request.
	// Default: 1048576 bytes (~1MB, 1MiB)
	// Kafka consumer setting: max.partition.fetch.bytes
	// Docs: https://kafka.apache.org/28/documentation.html#consumerconfigs_max.partition.fetch.bytes
	MaxPollPartitionBytes int32

	// ShutdownGracePeriod defines the maximum amount of time to wait for the
	// partition consumers to process events before the underlying kgo.Client
	// is closed, overriding the default 5s.
	ShutdownGracePeriod time.Duration

	// Delivery mechanism to use to acknowledge the messages.
	// AtMostOnceDeliveryType and AtLeastOnceDeliveryType are supported.
	// If not set, it defaults to apmqueue.AtMostOnceDeliveryType.
	Delivery apmqueue.DeliveryType

	// Processor that will be used to process each event individually.
	// It is recommended to keep the synchronous processing fast and below the
	// rebalance.timeout.ms setting in Kafka.
	//
	// The processing time of each processing cycle can be calculated as:
	// record.process.time * MaxPollRecords.
	Processor apmqueue.Processor

	// FetchMinBytes sets the minimum amount of bytes a broker will try to send
	// during a fetch, overriding the default 1 byte.
	// Default: 1
	// Kafka consumer setting: fetch.min.bytes
	// Docs: https://kafka.apache.org/28/documentation.html#consumerconfigs_fetch.min.bytes
	FetchMinBytes int32

	// BrokerMaxReadBytes sets the maximum response size that can be read from
	// Kafka, overriding the default 100MiB.
	BrokerMaxReadBytes int32

	// PreferLagFn alters the order in which partitions are consumed.
	// Use with caution, as this can lead to uneven consumption of partitions,
	// and in the worst case scenario, in partitions starved out from being consumed.
	PreferLagFn kgo.PreferLagFn
}
ConsumerConfig defines the configuration for the Kafka consumer.
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager manages Kafka topics.
func NewManager ¶
func NewManager(cfg ManagerConfig) (*Manager, error)
NewManager returns a new Manager with the given config.
func (*Manager) Close ¶
func (m *Manager) Close() error
Close closes the manager's resources, including its connections to the Kafka brokers and any associated goroutines.
func (*Manager) DeleteTopics ¶
func (m *Manager) DeleteTopics(ctx context.Context, topics ...apmqueue.Topic) error
DeleteTopics deletes one or more topics.
No error is returned for topics that do not exist.
func (*Manager) Healthy ¶
func (m *Manager) Healthy(ctx context.Context) error
Healthy returns an error if the Kafka client fails to reach a discovered broker.
func (*Manager) MonitorConsumerLag ¶
func (m *Manager) MonitorConsumerLag(topicConsumers []apmqueue.TopicConsumer) (metric.Registration, error)
MonitorConsumerLag registers a callback with OpenTelemetry to measure consumer group lag for the given topics.
func (*Manager) NewTopicCreator ¶
func (m *Manager) NewTopicCreator(cfg TopicCreatorConfig) (*TopicCreator, error)
NewTopicCreator returns a new TopicCreator with the given config.
type ManagerConfig ¶
type ManagerConfig struct {
CommonConfig
}
ManagerConfig holds configuration for managing Kafka topics.
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer publishes events to Kafka. Implements the Producer interface.
func NewProducer ¶
func NewProducer(cfg ProducerConfig) (*Producer, error)
NewProducer returns a new Producer with the given config.
func (*Producer) Close ¶
Close stops the producer.
This call is blocking and will cause all the underlying clients to stop producing. If producing is asynchronous, it'll block until all messages have been produced. After Close() is called, Producer cannot be reused.
func (*Producer) Healthy ¶
Healthy returns an error if the Kafka client fails to reach a discovered broker.
func (*Producer) Produce ¶
Produce produces N records. If the Producer is synchronous, it waits until all records have been produced; otherwise, it returns as soon as the records are stored in the producer buffer. If the context has been enriched with metadata, each entry will be added as a record header. Produce takes ownership of Record, and any modifications after Produce is called may cause an unhandled exception.
type ProducerConfig ¶
type ProducerConfig struct {
	CommonConfig

	// MaxBufferedRecords sets the max amount of records the client will buffer.
	MaxBufferedRecords int

	// ProducerBatchMaxBytes upper bounds the size of a record batch.
	ProducerBatchMaxBytes int32

	// ManualFlushing disables auto-flushing when producing.
	ManualFlushing bool

	// Sync can be used to indicate whether production should be synchronous.
	Sync bool

	// CompressionCodec specifies a list of compression codecs.
	// See kgo.ProducerBatchCompression for more details.
	//
	// If CompressionCodec is empty, then the default will be set
	// based on $KAFKA_PRODUCER_COMPRESSION_CODEC, which should be
	// a comma-separated list of codec preferences from the list:
	//
	// [none, gzip, snappy, lz4, zstd]
	//
	// If $KAFKA_PRODUCER_COMPRESSION_CODEC is not specified, then
	// the default behaviour of franz-go is to use [snappy, none].
	CompressionCodec []CompressionCodec

	// ProduceCallback is a hook called after the record has been produced.
	ProduceCallback func(*kgo.Record, error)

	// BatchListener is called per topic/partition after a batch is
	// successfully produced to a Kafka broker.
	BatchListener BatchWriteListener

	// RecordPartitioner is a function that returns the partition to which
	// a record should be sent. If nil, the default partitioner is used.
	RecordPartitioner kgo.Partitioner

	// AllowAutoTopicCreation enables topics to be auto created if they do
	// not exist when fetching their metadata.
	AllowAutoTopicCreation bool
}
ProducerConfig holds configuration for publishing events to Kafka.
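The $KAFKA_PRODUCER_COMPRESSION_CODEC fallback takes a comma-separated preference list. A minimal sketch of parsing such a value, using only the standard library, might look like this. The function name `parseCodecPreferences` is hypothetical, and both the whitespace trimming and the decision to reject unknown names with an error are assumptions; the package may behave differently.

```go
package main

import (
	"fmt"
	"strings"
)

// knownCodecs lists the codec names documented for
// $KAFKA_PRODUCER_COMPRESSION_CODEC.
var knownCodecs = map[string]bool{
	"none":   true,
	"gzip":   true,
	"snappy": true,
	"lz4":    true,
	"zstd":   true,
}

// parseCodecPreferences splits a comma-separated preference list and
// validates each entry. An empty value yields nil, meaning the caller
// keeps the library default.
func parseCodecPreferences(env string) ([]string, error) {
	if strings.TrimSpace(env) == "" {
		return nil, nil
	}
	var prefs []string
	for _, name := range strings.Split(env, ",") {
		name = strings.TrimSpace(name)
		if !knownCodecs[name] {
			return nil, fmt.Errorf("unknown compression codec %q", name)
		}
		prefs = append(prefs, name)
	}
	return prefs, nil
}

func main() {
	prefs, err := parseCodecPreferences("zstd, snappy,none")
	fmt.Println(prefs, err)

	_, err = parseCodecPreferences("brotli")
	fmt.Println(err)
}
```

Each validated name would then map to the matching constructor, such as ZstdCompression() or NoCompression().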
type TopicAttributeFunc ¶ added in v2.1.2
TopicAttributeFunc runs on `kgo.HookProduceBatchWritten` and `kgo.HookFetchBatchRead` for each topic/partition. It can be used to include additional dimensions for the `consumer.messages.fetched` and `producer.messages.count` metrics.
type TopicCreator ¶
type TopicCreator struct {
// contains filtered or unexported fields
}
TopicCreator creates Kafka topics.
func (*TopicCreator) CreateTopics ¶
CreateTopics creates one or more topics.
Topics that already exist will be updated.
type TopicCreatorConfig ¶
type TopicCreatorConfig struct {
	// PartitionCount is the number of partitions to assign to
	// newly created topics.
	//
	// Must be non-zero. If PartitionCount is -1, the broker's
	// default value is used (requires Kafka 2.4+).
	PartitionCount int

	// TopicConfigs holds any topic configs to assign to newly
	// created topics, such as `retention.ms`.
	//
	// See https://kafka.apache.org/documentation/#topicconfigs
	TopicConfigs map[string]string

	// MeterProvider used to create meters and record metrics (Optional).
	MeterProvider metric.MeterProvider
}
TopicCreatorConfig holds configuration for creating Kafka topics.
func (TopicCreatorConfig) Validate ¶
func (cfg TopicCreatorConfig) Validate() error
Validate ensures the configuration is valid, returning an error otherwise.
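The one rule stated for this configuration is that PartitionCount must be non-zero, with -1 selecting the broker default. A sketch of that check on a hypothetical, reduced copy of the struct could look like this; the error text and the absence of other checks are assumptions, and the real Validate may enforce more.

```go
package main

import (
	"errors"
	"fmt"
)

// topicCreatorConfig is a hypothetical, reduced copy of
// TopicCreatorConfig for illustration.
type topicCreatorConfig struct {
	PartitionCount int
	TopicConfigs   map[string]string
}

// validate enforces the documented rule that PartitionCount must be
// non-zero. -1 is accepted because it selects the broker default.
func (cfg topicCreatorConfig) validate() error {
	if cfg.PartitionCount == 0 {
		return errors.New("partition count must be non-zero")
	}
	return nil
}

func main() {
	unset := topicCreatorConfig{}
	brokerDefault := topicCreatorConfig{PartitionCount: -1}
	explicit := topicCreatorConfig{
		PartitionCount: 3,
		TopicConfigs:   map[string]string{"retention.ms": "86400000"},
	}

	fmt.Println("unset:", unset.validate())
	fmt.Println("broker default:", brokerDefault.validate())
	fmt.Println("explicit:", explicit.validate())
}
```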
type TopicLogFieldFunc ¶ added in v2.10.0
TopicLogFieldFunc is a function that returns a zap.Field for a given topic.