Documentation
¶
Overview ¶
Package kafka implements tools to work with kafka Producers and Consumers.
Index ¶
- Variables
- func ConsumeLogChannel(emitter LogEmitter, logger *slog.Logger)
- func LogProcessor(logEmitter LogEmitter, logger *slog.Logger) (execute func() error, interrupt func(error))
- type AdminClient
- type AdminConfig
- type AutoOffsetReset
- type BrokerAddressFamily
- type CommonConfigParams
- type ConfigMapper
- type ConfigValidator
- type ConsumerConfig
- type ConsumerConfigParams
- type DebugContext
- type DebugContexts
- type LogEmitter
- type PartitionAssignmentStrategies
- type PartitionAssignmentStrategy
- type Partitioner
- type ProducerConfig
- type ProducerConfigParams
- type TimeDurationMilliSeconds
- type TopicConfig
- type TopicProvisioner
- type TopicProvisionerConfig
- type ValidValues
Constants ¶
This section is empty.
Variables ¶
var AutoOffsetResetValues = []AutoOffsetReset{ AutoOffsetResetSmallest, AutoOffsetResetEarliest, AutoOffsetResetBeginning, AutoOffsetResetLargest, AutoOffsetResetLatest, AutoOffsetResetEnd, AutoOffsetResetError, }
var BrokerAddressFamilyValues = ValidValues[BrokerAddressFamily]{ BrokerAddressFamilyAny, BrokerAddressFamilyIPv4, BrokerAddressFamilyIPv6, }
var CooperativePartitionAssignmentStrategyValues = ValidValues[PartitionAssignmentStrategy]{ PartitionAssignmentStrategyCooperativeSticky, }
var DebugContextValues = ValidValues[DebugContext]{ DebugContextGeneric, DebugContextBroker, DebugContextTopic, DebugContextMetadata, DebugContextFeature, DebugContextQueue, DebugContextMessage, DebugContextProtocol, DebugContextConsumerGroup, DebugContextSecurity, DebugContextFetch, DebugContextInterceptor, DebugContextPlugin, DebugContextConsumer, DebugContextAdmin, DebugContextIdempotentProducer, DebugContextMock, DebugContextAssignor, DebugContextConfig, DebugContextAll, }
var EagerPartitionAssignmentStrategyValues = ValidValues[PartitionAssignmentStrategy]{ PartitionAssignmentStrategyRange, PartitionAssignmentStrategyRoundRobin, }
var PartitionAssignmentStrategyValues = ValidValues[PartitionAssignmentStrategy]{ PartitionAssignmentStrategyRange, PartitionAssignmentStrategyRoundRobin, PartitionAssignmentStrategyCooperativeSticky, }
Functions ¶
func ConsumeLogChannel ¶
func ConsumeLogChannel(emitter LogEmitter, logger *slog.Logger)
ConsumeLogChannel is supposed to be called in a goroutine. It consumes a log channel returned by a LogEmitter.
func LogProcessor ¶
func LogProcessor(logEmitter LogEmitter, logger *slog.Logger) (execute func() error, interrupt func(error))
LogProcessor consumes logs from a LogEmitter and passes them to an slog.Logger.
Types ¶
type AdminClient ¶
type AdminClient interface { CreateTopics(ctx context.Context, topics []kafka.TopicSpecification, options ...kafka.CreateTopicsAdminOption) ([]kafka.TopicResult, error) DeleteTopics(ctx context.Context, topics []string, options ...kafka.DeleteTopicsAdminOption) ([]kafka.TopicResult, error) }
type AdminConfig ¶
type AdminConfig struct {
CommonConfigParams
}
func (AdminConfig) AsConfigMap ¶
func (c AdminConfig) AsConfigMap() (kafka.ConfigMap, error)
func (AdminConfig) Validate ¶
func (c AdminConfig) Validate() error
type AutoOffsetReset ¶
type AutoOffsetReset string
const ( // AutoOffsetResetSmallest automatically reset the offset to the smallest offset. AutoOffsetResetSmallest AutoOffsetReset = "smallest" // AutoOffsetResetEarliest automatically reset the offset to the smallest offset. AutoOffsetResetEarliest AutoOffsetReset = "earliest" // AutoOffsetResetBeginning automatically reset the offset to the smallest offset. AutoOffsetResetBeginning AutoOffsetReset = "beginning" // AutoOffsetResetLargest automatically reset the offset to the largest offset. AutoOffsetResetLargest AutoOffsetReset = "largest" // AutoOffsetResetLatest automatically reset the offset to the largest offset. AutoOffsetResetLatest AutoOffsetReset = "latest" // AutoOffsetResetEnd automatically reset the offset to the largest offset. AutoOffsetResetEnd AutoOffsetReset = "end" // AutoOffsetResetError trigger an error (ERR__AUTO_OFFSET_RESET) which is retrieved by // consuming messages and checking 'message->err' AutoOffsetResetError AutoOffsetReset = "error" )
func (AutoOffsetReset) String ¶
func (s AutoOffsetReset) String() string
func (*AutoOffsetReset) UnmarshalJSON ¶
func (s *AutoOffsetReset) UnmarshalJSON(data []byte) error
func (*AutoOffsetReset) UnmarshalText ¶
func (s *AutoOffsetReset) UnmarshalText(text []byte) error
type BrokerAddressFamily ¶
type BrokerAddressFamily string
const ( BrokerAddressFamilyAny BrokerAddressFamily = "any" BrokerAddressFamilyIPv4 BrokerAddressFamily = "v4" BrokerAddressFamilyIPv6 BrokerAddressFamily = "v6" )
func (BrokerAddressFamily) String ¶
func (s BrokerAddressFamily) String() string
func (*BrokerAddressFamily) UnmarshalJSON ¶
func (s *BrokerAddressFamily) UnmarshalJSON(data []byte) error
func (*BrokerAddressFamily) UnmarshalText ¶
func (s *BrokerAddressFamily) UnmarshalText(text []byte) error
type CommonConfigParams ¶
type CommonConfigParams struct { Brokers string SecurityProtocol string SaslMechanisms string SaslUsername string SaslPassword string StatsInterval TimeDurationMilliSeconds // BrokerAddressFamily defines the IP address family to be used for network communication with Kafka cluster BrokerAddressFamily BrokerAddressFamily // SocketKeepAliveEnable defines if TCP socket keep-alive is enabled to prevent closing idle connections // by Kafka brokers. SocketKeepAliveEnabled bool // TopicMetadataRefreshInterval defines how frequently the Kafka client needs to fetch metadata information // (brokers, topic, partitions, etc) from the Kafka cluster. // The 5 minutes default value is appropriate for mostly static Kafka clusters, but needs to be lowered // in case of large clusters where changes are more frequent. // This value must not be set to value lower than 10s. TopicMetadataRefreshInterval TimeDurationMilliSeconds // Enable contexts for extensive debugging of librdkafka. // See: https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#debug-contexts DebugContexts DebugContexts // ClientID sets the Consumer/Producer identifier ClientID string }
func (CommonConfigParams) AsConfigMap ¶
func (c CommonConfigParams) AsConfigMap() (kafka.ConfigMap, error)
func (CommonConfigParams) Validate ¶
func (c CommonConfigParams) Validate() error
type ConfigMapper ¶
type ConfigValidator ¶
type ConfigValidator interface {
Validate() error
}
type ConsumerConfig ¶
type ConsumerConfig struct { CommonConfigParams ConsumerConfigParams }
func (ConsumerConfig) AsConfigMap ¶
func (c ConsumerConfig) AsConfigMap() (kafka.ConfigMap, error)
func (ConsumerConfig) Validate ¶
func (c ConsumerConfig) Validate() error
type ConsumerConfigParams ¶
type ConsumerConfigParams struct { // ConsumerGroupID defines the group id. All clients sharing the same ConsumerGroupID belong to the same group. ConsumerGroupID string // ConsumerGroupInstanceID defines the instance id in consumer group. Setting this parameter enables static group membership. // Static group members are able to leave and rejoin a group within the configured SessionTimeout without prompting a group rebalance. // This should be used in combination with a larger session.timeout.ms to avoid group rebalances caused by transient unavailability (e.g. process restarts). ConsumerGroupInstanceID string // SessionTimeout defines the consumer group session and failure detection timeout. // The consumer sends periodic heartbeats (HeartbeatInterval) to indicate its liveness to the broker. // If no hearts are received by the broker for a group member within the session timeout, // the broker will remove the consumer from the group and trigger a rebalance. SessionTimeout TimeDurationMilliSeconds // Defines the consumer group session keepalive heartbeat interval. HeartbeatInterval TimeDurationMilliSeconds // EnableAutoCommit enables automatically and periodically commit offsets in the background. EnableAutoCommit bool // EnableAutoOffsetStore enables automatically store offset of last message provided to application. // The offset store is an in-memory store of the next offset to (auto-)commit for each partition. EnableAutoOffsetStore bool // AutoOffsetReset defines the action to take when there is no initial offset in offset store or the desired offset is out of range: // * "smallest","earliest","beginning": automatically reset the offset to the smallest offset // * "largest","latest","end": automatically reset the offset to the largest offset // * "error": trigger an error (ERR__AUTO_OFFSET_RESET) which is retrieved by consuming messages and checking 'message->err'. AutoOffsetReset AutoOffsetReset // PartitionAssignmentStrategy defines one or more partition assignment strategies. // The elected group leader will use a strategy supported by all members of the group to assign partitions to group members. // If there is more than one eligible strategy, preference is determined by the order of this list (strategies earlier in the list have higher priority). // Cooperative and non-cooperative (eager) strategies must not be mixed. // Available strategies: PartitionAssignmentStrategyRange, PartitionAssignmentStrategyRoundRobin, PartitionAssignmentStrategyCooperativeSticky. PartitionAssignmentStrategy PartitionAssignmentStrategies // The maximum delay between invocations of poll() when using consumer group management. // This places an upper bound on the amount of time that the consumer can be idle before fetching more records. // If poll() is not called before expiration of this timeout, then the consumer is considered failed and // the group will rebalance in order to reassign the partitions to another member. // See https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#max-poll-interval-ms MaxPollInterval TimeDurationMilliSeconds }
func (ConsumerConfigParams) AsConfigMap ¶
func (c ConsumerConfigParams) AsConfigMap() (kafka.ConfigMap, error)
func (ConsumerConfigParams) Validate ¶
func (c ConsumerConfigParams) Validate() error
type DebugContext ¶
type DebugContext string
const ( // DebugContextGeneric enables generic client instance level debugging. // Includes initialization and termination debugging. // Client Type: producer, consumer DebugContextGeneric DebugContext = "generic" // DebugContextBroker enables broker and connection state debugging. // Client Type: producer, consumer DebugContextBroker DebugContext = "broker" // DebugContextTopic enables topic and partition state debugging. Includes leader changes. // Client Type: producer, consumer DebugContextTopic DebugContext = "topic" // DebugContextMetadata enables cluster and topic metadata retrieval debugging. // Client Type: producer, consumer DebugContextMetadata DebugContext = "metadata" // DebugContextFeature enables Kafka protocol feature support as negotiated with the broker. // Client Type: producer, consumer DebugContextFeature DebugContext = "feature" // DebugContextQueue enables message queue debugging. // Client Type: producer DebugContextQueue DebugContext = "queue" // DebugContextMessage enables message debugging. Includes information about batching, compression, sizes, etc. // Client Type: producer, consumer DebugContextMessage DebugContext = "msg" // DebugContextProtocol enables Kafka protocol request/response debugging. Includes latency (rtt) printouts. // Client Type: producer, consumer DebugContextProtocol DebugContext = "protocol" // DebugContextConsumerGroup enables low-level consumer group state debugging. // Client Type: consumer DebugContextConsumerGroup DebugContext = "cgrp" // DebugContextSecurity enables security and authentication debugging. // Client Type: producer, consumer DebugContextSecurity DebugContext = "security" // DebugContextFetch enables consumer message fetch debugging. Includes decision when and why messages are fetched. // Client Type: consumer DebugContextFetch DebugContext = "fetch" // DebugContextInterceptor enables interceptor interface debugging. // Client Type: producer, consumer DebugContextInterceptor DebugContext = "interceptor" // DebugContextPlugin enables plugin loading debugging. // Client Type: producer, consumer DebugContextPlugin DebugContext = "plugin" // DebugContextConsumer enables high-level consumer debugging. // Client Type: consumer DebugContextConsumer DebugContext = "consumer" // DebugContextAdmin enables admin API debugging. // Client Type: admin DebugContextAdmin DebugContext = "admin" // DebugContextIdempotentProducer enables idempotent Producer debugging. // Client Type: producer DebugContextIdempotentProducer DebugContext = "eos" // DebugContextMock enables mock cluster functionality debugging. // Client Type: producer, consumer DebugContextMock DebugContext = "mock" // DebugContextAssignor enables detailed consumer group partition assignor debugging. // Client Type: consumer DebugContextAssignor DebugContext = "assignor" // DebugContextConfig enables displaying set configuration properties on startup. // Client Type: producer, consumer DebugContextConfig DebugContext = "conf" // DebugContextAll enables all of the above. // Client Type: producer, consumer DebugContextAll DebugContext = "all" )
func (DebugContext) String ¶
func (c DebugContext) String() string
func (*DebugContext) UnmarshalJSON ¶
func (c *DebugContext) UnmarshalJSON(data []byte) error
func (*DebugContext) UnmarshalText ¶
func (c *DebugContext) UnmarshalText(text []byte) error
type DebugContexts ¶
type DebugContexts []DebugContext
func (DebugContexts) String ¶
func (d DebugContexts) String() string
type LogEmitter ¶
LogEmitter emits logs from a kafka.Consumer or kafka.Producer.
Requires `go.logs.channel.enable` option set to true.
This feature was implemented in this PR.
type PartitionAssignmentStrategies ¶
type PartitionAssignmentStrategies []PartitionAssignmentStrategy
PartitionAssignmentStrategies one or more partition assignment strategies. If there is more than one eligible strategy, preference is determined by the configured order of strategies. IMPORTANT: cooperative and non-cooperative (eager) strategies must NOT be mixed.
func (PartitionAssignmentStrategies) String ¶
func (p PartitionAssignmentStrategies) String() string
type PartitionAssignmentStrategy ¶
type PartitionAssignmentStrategy string
const ( // PartitionAssignmentStrategyRange assigns partitions on a per-topic basis. PartitionAssignmentStrategyRange PartitionAssignmentStrategy = "range" // PartitionAssignmentStrategyRoundRobin assigns partitions to consumers in a round-robin fashion. PartitionAssignmentStrategyRoundRobin PartitionAssignmentStrategy = "roundrobin" // PartitionAssignmentStrategyCooperativeSticky guarantees an assignment that is maximally balanced while preserving // as many existing partition assignments as possible while allowing cooperative rebalancing. PartitionAssignmentStrategyCooperativeSticky PartitionAssignmentStrategy = "cooperative-sticky" )
func (PartitionAssignmentStrategy) String ¶
func (c PartitionAssignmentStrategy) String() string
func (*PartitionAssignmentStrategy) UnmarshalJSON ¶
func (c *PartitionAssignmentStrategy) UnmarshalJSON(data []byte) error
func (*PartitionAssignmentStrategy) UnmarshalText ¶
func (c *PartitionAssignmentStrategy) UnmarshalText(text []byte) error
type Partitioner ¶
type Partitioner string
const ( // PartitionerRandom uses random distribution. PartitionerRandom Partitioner = "random" // PartitionerConsistent uses the CRC32 hash of key while Empty and NULL keys are mapped to single partition. PartitionerConsistent Partitioner = "consistent" // PartitionerConsistentRandom uses CRC32 hash of key while Empty and NULL keys are randomly partitioned. PartitionerConsistentRandom Partitioner = "consistent_random" // PartitionerMurmur2 uses Java Producer compatible Murmur2 hash of key while NULL keys are mapped to single partition. PartitionerMurmur2 Partitioner = "murmur2" // PartitionerMurmur2Random uses Java Producer compatible Murmur2 hash of key whileNULL keys are randomly partitioned. // This is functionally equivalent to the default partitioner in the Java Producer. PartitionerMurmur2Random Partitioner = "murmur2_random" // PartitionerFnv1a uses FNV-1a hash of key whileNULL keys are mapped to single partition. PartitionerFnv1a Partitioner = "fnv1a" // PartitionerFnv1aRandom uses FNV-1a hash of key whileNULL keys are randomly partitioned. PartitionerFnv1aRandom Partitioner = "fnv1a_random" )
func (Partitioner) String ¶
func (s Partitioner) String() string
func (*Partitioner) UnmarshalJSON ¶
func (s *Partitioner) UnmarshalJSON(data []byte) error
func (*Partitioner) UnmarshalText ¶
func (s *Partitioner) UnmarshalText(text []byte) error
type ProducerConfig ¶
type ProducerConfig struct { CommonConfigParams ProducerConfigParams }
func (ProducerConfig) AsConfigMap ¶
func (c ProducerConfig) AsConfigMap() (kafka.ConfigMap, error)
func (ProducerConfig) Validate ¶
func (c ProducerConfig) Validate() error
type ProducerConfigParams ¶
type ProducerConfigParams struct { // Partitioner defines the algorithm used for assigning topic partition for produced message based its partition key. Partitioner Partitioner }
func (ProducerConfigParams) AsConfigMap ¶
func (p ProducerConfigParams) AsConfigMap() (kafka.ConfigMap, error)
func (ProducerConfigParams) Validate ¶
func (p ProducerConfigParams) Validate() error
type TimeDurationMilliSeconds ¶
func (TimeDurationMilliSeconds) Duration ¶
func (d TimeDurationMilliSeconds) Duration() time.Duration
func (TimeDurationMilliSeconds) String ¶
func (d TimeDurationMilliSeconds) String() string
func (*TimeDurationMilliSeconds) UnmarshalJSON ¶
func (d *TimeDurationMilliSeconds) UnmarshalJSON(data []byte) error
func (*TimeDurationMilliSeconds) UnmarshalText ¶
func (d *TimeDurationMilliSeconds) UnmarshalText(text []byte) error
type TopicConfig ¶
type TopicConfig struct { Name string Partitions int Replicas int RetentionTime TimeDurationMilliSeconds }
func (TopicConfig) Validate ¶
func (c TopicConfig) Validate() error
type TopicProvisioner ¶
type TopicProvisioner interface { Provision(ctx context.Context, topics ...TopicConfig) error DeProvision(ctx context.Context, topics ...string) error }
func NewTopicProvisioner ¶
func NewTopicProvisioner(config TopicProvisionerConfig) (TopicProvisioner, error)
NewTopicProvisioner returns a new TopicProvisioner.
type TopicProvisionerConfig ¶
type TopicProvisionerConfig struct { AdminClient AdminClient Logger *slog.Logger Meter metric.Meter // CacheSize stores he maximum number of entries stored in topic cache at a time which after the least recently used is evicted. // Setting it to 0 makes the cache size unlimited. CacheSize int // CacheTTL stores maximum time an entries is kept in cache before being evicted. // Setting it to 0 disables cache entry expiration. CacheTTL time.Duration // ProtectedTopics defines a list of topics which are protected from deletion. ProtectedTopics []string }
type ValidValues ¶
type ValidValues[T comparable] []T
func (ValidValues[T]) AsKeyMap ¶
func (v ValidValues[T]) AsKeyMap() map[T]struct{}