kafka

package
v1.0.0-beta.191 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 13, 2025 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Overview

Package kafka implements tools to work with kafka Producers and Consumers.

Index

Constants

This section is empty.

Variables

Functions

func ConsumeLogChannel

func ConsumeLogChannel(emitter LogEmitter, logger *slog.Logger)

ConsumeLogChannel is supposed to be called in a goroutine. It consumes a log channel returned by a LogEmitter.

func LogProcessor

func LogProcessor(logEmitter LogEmitter, logger *slog.Logger) (execute func() error, interrupt func(error))

LogProcessor consumes logs from a LogEmitter and passes them to an slog.Logger.

Types

type AdminClient

type AdminClient interface {
	CreateTopics(ctx context.Context, topics []kafka.TopicSpecification, options ...kafka.CreateTopicsAdminOption) ([]kafka.TopicResult, error)
	DeleteTopics(ctx context.Context, topics []string, options ...kafka.DeleteTopicsAdminOption) ([]kafka.TopicResult, error)
}

type AdminConfig

type AdminConfig struct {
	CommonConfigParams
}

func (AdminConfig) AsConfigMap

func (c AdminConfig) AsConfigMap() (kafka.ConfigMap, error)

func (AdminConfig) Validate

func (c AdminConfig) Validate() error

type AutoOffsetReset

type AutoOffsetReset string
const (
	// AutoOffsetResetSmallest automatically reset the offset to the smallest offset.
	AutoOffsetResetSmallest AutoOffsetReset = "smallest"
	// AutoOffsetResetEarliest automatically reset the offset to the smallest offset.
	AutoOffsetResetEarliest AutoOffsetReset = "earliest"
	// AutoOffsetResetBeginning automatically reset the offset to the smallest offset.
	AutoOffsetResetBeginning AutoOffsetReset = "beginning"
	// AutoOffsetResetLargest automatically reset the offset to the largest offset.
	AutoOffsetResetLargest AutoOffsetReset = "largest"
	// AutoOffsetResetLatest automatically reset the offset to the largest offset.
	AutoOffsetResetLatest AutoOffsetReset = "latest"
	// AutoOffsetResetEnd automatically reset the offset to the largest offset.
	AutoOffsetResetEnd AutoOffsetReset = "end"
	// AutoOffsetResetError trigger an error (ERR__AUTO_OFFSET_RESET) which is retrieved by
	// consuming messages and checking 'message->err'
	AutoOffsetResetError AutoOffsetReset = "error"
)

func (AutoOffsetReset) String

func (s AutoOffsetReset) String() string

func (*AutoOffsetReset) UnmarshalJSON

func (s *AutoOffsetReset) UnmarshalJSON(data []byte) error

func (*AutoOffsetReset) UnmarshalText

func (s *AutoOffsetReset) UnmarshalText(text []byte) error

type BrokerAddressFamily

type BrokerAddressFamily string
const (
	BrokerAddressFamilyAny  BrokerAddressFamily = "any"
	BrokerAddressFamilyIPv4 BrokerAddressFamily = "v4"
	BrokerAddressFamilyIPv6 BrokerAddressFamily = "v6"
)

func (BrokerAddressFamily) String

func (s BrokerAddressFamily) String() string

func (*BrokerAddressFamily) UnmarshalJSON

func (s *BrokerAddressFamily) UnmarshalJSON(data []byte) error

func (*BrokerAddressFamily) UnmarshalText

func (s *BrokerAddressFamily) UnmarshalText(text []byte) error

type CommonConfigParams

type CommonConfigParams struct {
	Brokers          string
	SecurityProtocol string
	SaslMechanisms   string
	SaslUsername     string
	SaslPassword     string

	StatsInterval TimeDurationMilliSeconds

	// BrokerAddressFamily defines the IP address family to be used for network communication with Kafka cluster
	BrokerAddressFamily BrokerAddressFamily
	// SocketKeepAliveEnable defines if TCP socket keep-alive is enabled to prevent closing idle connections
	// by Kafka brokers.
	SocketKeepAliveEnabled bool
	// TopicMetadataRefreshInterval defines how frequently the Kafka client needs to fetch metadata information
	// (brokers, topic, partitions, etc) from the Kafka cluster.
	// The 5 minutes default value is appropriate for mostly static Kafka clusters, but needs to be lowered
	// in case of large clusters where changes are more frequent.
	// This value must not be set to value lower than 10s.
	TopicMetadataRefreshInterval TimeDurationMilliSeconds

	// Enable contexts for extensive debugging of librdkafka.
	// See: https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#debug-contexts
	DebugContexts DebugContexts

	// ClientID sets the Consumer/Producer identifier
	ClientID string
}

func (CommonConfigParams) AsConfigMap

func (c CommonConfigParams) AsConfigMap() (kafka.ConfigMap, error)

func (CommonConfigParams) Validate

func (c CommonConfigParams) Validate() error

type ConfigMapper

type ConfigMapper interface {
	AsConfigMap() (kafka.ConfigMap, error)
}

type ConfigValidator

type ConfigValidator interface {
	Validate() error
}

type ConsumerConfig

type ConsumerConfig struct {
	CommonConfigParams
	ConsumerConfigParams
}

func (ConsumerConfig) AsConfigMap

func (c ConsumerConfig) AsConfigMap() (kafka.ConfigMap, error)

func (ConsumerConfig) Validate

func (c ConsumerConfig) Validate() error

type ConsumerConfigParams

type ConsumerConfigParams struct {
	// ConsumerGroupID defines the group id. All clients sharing the same ConsumerGroupID belong to the same group.
	ConsumerGroupID string
	// ConsumerGroupInstanceID defines the instance id in consumer group. Setting this parameter enables static group membership.
	// Static group members are able to leave and rejoin a group within the configured SessionTimeout without prompting a group rebalance.
	// This should be used in combination with a larger session.timeout.ms to avoid group rebalances caused by transient unavailability (e.g. process restarts).
	ConsumerGroupInstanceID string

	// SessionTimeout defines the consumer group session and failure detection timeout.
	// The consumer sends periodic heartbeats (HeartbeatInterval) to indicate its liveness to the broker.
	// If no hearts are received by the broker for a group member within the session timeout,
	// the broker will remove the consumer from the group and trigger a rebalance.
	SessionTimeout TimeDurationMilliSeconds
	// Defines the consumer group session keepalive heartbeat interval.
	HeartbeatInterval TimeDurationMilliSeconds

	// EnableAutoCommit enables automatically and periodically commit offsets in the background.
	EnableAutoCommit bool
	// EnableAutoOffsetStore enables automatically store offset of last message provided to application.
	// The offset store is an in-memory store of the next offset to (auto-)commit for each partition.
	EnableAutoOffsetStore bool
	// AutoOffsetReset defines the action to take when there is no initial offset in offset store or the desired offset is out of range:
	// * "smallest","earliest","beginning": automatically reset the offset to the smallest offset
	// * "largest","latest","end": automatically reset the offset to the largest offset
	// * "error":  trigger an error (ERR__AUTO_OFFSET_RESET) which is retrieved by consuming messages and checking 'message->err'.
	AutoOffsetReset AutoOffsetReset

	// PartitionAssignmentStrategy defines one or more partition assignment strategies.
	// The elected group leader will use a strategy supported by all members of the group to assign partitions to group members.
	// If there is more than one eligible strategy, preference is determined by the order of this list (strategies earlier in the list have higher priority).
	// Cooperative and non-cooperative (eager) strategies must not be mixed.
	// Available strategies: PartitionAssignmentStrategyRange, PartitionAssignmentStrategyRoundRobin, PartitionAssignmentStrategyCooperativeSticky.
	PartitionAssignmentStrategy PartitionAssignmentStrategies

	// The maximum delay between invocations of poll() when using consumer group management.
	// This places an upper bound on the amount of time that the consumer can be idle before fetching more records.
	// If poll() is not called before expiration of this timeout, then the consumer is considered failed and
	// the group will rebalance in order to reassign the partitions to another member.
	// See https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#max-poll-interval-ms
	MaxPollInterval TimeDurationMilliSeconds
}

func (ConsumerConfigParams) AsConfigMap

func (c ConsumerConfigParams) AsConfigMap() (kafka.ConfigMap, error)

func (ConsumerConfigParams) Validate

func (c ConsumerConfigParams) Validate() error

type DebugContext

type DebugContext string
const (
	// DebugContextGeneric enables generic client instance level debugging.
	// Includes initialization and termination debugging.
	// Client Type: producer, consumer
	DebugContextGeneric DebugContext = "generic"
	// DebugContextBroker enables broker and connection state debugging.
	// Client Type: producer, consumer
	DebugContextBroker DebugContext = "broker"
	// DebugContextTopic enables topic and partition state debugging. Includes leader changes.
	// Client Type: producer, consumer
	DebugContextTopic DebugContext = "topic"
	// DebugContextMetadata enables cluster and topic metadata retrieval debugging.
	// Client Type: producer, consumer
	DebugContextMetadata DebugContext = "metadata"
	// DebugContextFeature enables Kafka protocol feature support as negotiated with the broker.
	// Client Type: producer, consumer
	DebugContextFeature DebugContext = "feature"
	// DebugContextQueue enables message queue debugging.
	// Client Type: producer
	DebugContextQueue DebugContext = "queue"
	// DebugContextMessage enables message debugging. Includes information about batching, compression, sizes, etc.
	// Client Type: producer, consumer
	DebugContextMessage DebugContext = "msg"
	// DebugContextProtocol enables Kafka protocol request/response debugging. Includes latency (rtt) printouts.
	// Client Type: producer, consumer
	DebugContextProtocol DebugContext = "protocol"
	// DebugContextConsumerGroup enables low-level consumer group state debugging.
	// Client Type: consumer
	DebugContextConsumerGroup DebugContext = "cgrp"
	// DebugContextSecurity enables security and authentication debugging.
	// Client Type: producer, consumer
	DebugContextSecurity DebugContext = "security"
	// DebugContextFetch enables consumer message fetch debugging. Includes decision when and why messages are fetched.
	// Client Type: consumer
	DebugContextFetch DebugContext = "fetch"
	// DebugContextInterceptor enables interceptor interface debugging.
	// Client Type: producer, consumer
	DebugContextInterceptor DebugContext = "interceptor"
	// DebugContextPlugin enables plugin loading debugging.
	// Client Type: producer, consumer
	DebugContextPlugin DebugContext = "plugin"
	// DebugContextConsumer enables high-level consumer debugging.
	// Client Type: consumer
	DebugContextConsumer DebugContext = "consumer"
	// DebugContextAdmin enables admin API debugging.
	// Client Type: admin
	DebugContextAdmin DebugContext = "admin"
	// DebugContextIdempotentProducer enables idempotent Producer debugging.
	// Client Type: producer
	DebugContextIdempotentProducer DebugContext = "eos"
	// DebugContextMock enables mock cluster functionality debugging.
	// Client Type: producer, consumer
	DebugContextMock DebugContext = "mock"
	// DebugContextAssignor enables detailed consumer group partition assignor debugging.
	// Client Type: consumer
	DebugContextAssignor DebugContext = "assignor"
	// DebugContextConfig enables displaying set configuration properties on startup.
	// Client Type: producer, consumer
	DebugContextConfig DebugContext = "conf"
	// DebugContextAll enables all of the above.
	// Client Type: producer, consumer
	DebugContextAll DebugContext = "all"
)

func (DebugContext) String

func (c DebugContext) String() string

func (*DebugContext) UnmarshalJSON

func (c *DebugContext) UnmarshalJSON(data []byte) error

func (*DebugContext) UnmarshalText

func (c *DebugContext) UnmarshalText(text []byte) error

type DebugContexts

type DebugContexts []DebugContext

func (DebugContexts) String

func (d DebugContexts) String() string

type LogEmitter

type LogEmitter interface {
	Logs() chan kafka.LogEvent
}

LogEmitter emits logs from a kafka.Consumer or kafka.Producer.

Requires `go.logs.channel.enable` option set to true.

This feature was implemented in this PR.

type PartitionAssignmentStrategies

type PartitionAssignmentStrategies []PartitionAssignmentStrategy

PartitionAssignmentStrategies one or more partition assignment strategies. If there is more than one eligible strategy, preference is determined by the configured order of strategies. IMPORTANT: cooperative and non-cooperative (eager) strategies must NOT be mixed.

func (PartitionAssignmentStrategies) String

type PartitionAssignmentStrategy

type PartitionAssignmentStrategy string
const (
	// PartitionAssignmentStrategyRange assigns partitions on a per-topic basis.
	PartitionAssignmentStrategyRange PartitionAssignmentStrategy = "range"
	// PartitionAssignmentStrategyRoundRobin assigns partitions to consumers in a round-robin fashion.
	PartitionAssignmentStrategyRoundRobin PartitionAssignmentStrategy = "roundrobin"
	// PartitionAssignmentStrategyCooperativeSticky guarantees an assignment that is maximally balanced while preserving
	// as many existing partition assignments as possible while allowing cooperative rebalancing.
	PartitionAssignmentStrategyCooperativeSticky PartitionAssignmentStrategy = "cooperative-sticky"
)

func (PartitionAssignmentStrategy) String

func (*PartitionAssignmentStrategy) UnmarshalJSON

func (c *PartitionAssignmentStrategy) UnmarshalJSON(data []byte) error

func (*PartitionAssignmentStrategy) UnmarshalText

func (c *PartitionAssignmentStrategy) UnmarshalText(text []byte) error

type Partitioner

type Partitioner string
const (
	// PartitionerRandom uses random distribution.
	PartitionerRandom Partitioner = "random"
	// PartitionerConsistent uses the CRC32 hash of key while Empty and NULL keys are mapped to single partition.
	PartitionerConsistent Partitioner = "consistent"
	// PartitionerConsistentRandom uses CRC32 hash of key while Empty and NULL keys are randomly partitioned.
	PartitionerConsistentRandom Partitioner = "consistent_random"
	// PartitionerMurmur2 uses Java Producer compatible Murmur2 hash of key while NULL keys are mapped to single partition.
	PartitionerMurmur2 Partitioner = "murmur2"
	// PartitionerMurmur2Random uses Java Producer compatible Murmur2 hash of key whileNULL keys are randomly partitioned.
	// This is functionally equivalent to the default partitioner in the Java Producer.
	PartitionerMurmur2Random Partitioner = "murmur2_random"
	// PartitionerFnv1a uses FNV-1a hash of key whileNULL keys are mapped to single partition.
	PartitionerFnv1a Partitioner = "fnv1a"
	// PartitionerFnv1aRandom uses FNV-1a hash of key whileNULL keys are randomly partitioned.
	PartitionerFnv1aRandom Partitioner = "fnv1a_random"
)

func (Partitioner) String

func (s Partitioner) String() string

func (*Partitioner) UnmarshalJSON

func (s *Partitioner) UnmarshalJSON(data []byte) error

func (*Partitioner) UnmarshalText

func (s *Partitioner) UnmarshalText(text []byte) error

type ProducerConfig

type ProducerConfig struct {
	CommonConfigParams
	ProducerConfigParams
}

func (ProducerConfig) AsConfigMap

func (c ProducerConfig) AsConfigMap() (kafka.ConfigMap, error)

func (ProducerConfig) Validate

func (c ProducerConfig) Validate() error

type ProducerConfigParams

type ProducerConfigParams struct {
	// Partitioner defines the algorithm used for assigning topic partition for produced message based its partition key.
	Partitioner Partitioner
}

func (ProducerConfigParams) AsConfigMap

func (p ProducerConfigParams) AsConfigMap() (kafka.ConfigMap, error)

func (ProducerConfigParams) Validate

func (p ProducerConfigParams) Validate() error

type TimeDurationMilliSeconds

type TimeDurationMilliSeconds time.Duration

func (TimeDurationMilliSeconds) Duration

func (d TimeDurationMilliSeconds) Duration() time.Duration

func (TimeDurationMilliSeconds) String

func (d TimeDurationMilliSeconds) String() string

func (*TimeDurationMilliSeconds) UnmarshalJSON

func (d *TimeDurationMilliSeconds) UnmarshalJSON(data []byte) error

func (*TimeDurationMilliSeconds) UnmarshalText

func (d *TimeDurationMilliSeconds) UnmarshalText(text []byte) error

type TopicConfig

type TopicConfig struct {
	Name          string
	Partitions    int
	Replicas      int
	RetentionTime TimeDurationMilliSeconds
}

func (TopicConfig) Validate

func (c TopicConfig) Validate() error

type TopicProvisioner

type TopicProvisioner interface {
	Provision(ctx context.Context, topics ...TopicConfig) error
	DeProvision(ctx context.Context, topics ...string) error
}

func NewTopicProvisioner

func NewTopicProvisioner(config TopicProvisionerConfig) (TopicProvisioner, error)

NewTopicProvisioner returns a new TopicProvisioner.

type TopicProvisionerConfig

type TopicProvisionerConfig struct {
	AdminClient AdminClient
	Logger      *slog.Logger
	Meter       metric.Meter

	// CacheSize stores he maximum number of entries stored in topic cache at a time which after the least recently used is evicted.
	// Setting it to 0 makes the cache size unlimited.
	CacheSize int

	// CacheTTL stores maximum time an entries is kept in cache before being evicted.
	// Setting it to 0 disables cache entry expiration.
	CacheTTL time.Duration

	// ProtectedTopics defines a list of topics which are protected from deletion.
	ProtectedTopics []string
}

type ValidValues

type ValidValues[T comparable] []T

func (ValidValues[T]) AsKeyMap

func (v ValidValues[T]) AsKeyMap() map[T]struct{}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL