kafka

package
v1.0.0-beta.181 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 9, 2024 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Overview

Package kafka implements tools to work with kafka Producers and Consumers.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ConsumeLogChannel

func ConsumeLogChannel(emitter LogEmitter, logger *slog.Logger)

ConsumeLogChannel is intended to be run in its own goroutine. It consumes the log channel returned by a LogEmitter.

func LogProcessor

func LogProcessor(logEmitter LogEmitter, logger *slog.Logger) (execute func() error, interrupt func(error))

LogProcessor consumes logs from a LogEmitter and passes them to an slog.Logger.

func ProvisionTopic

func ProvisionTopic(ctx context.Context, adminClient *kafka.AdminClient, logger *slog.Logger, topic string, partitions int) error

Types

type BrokerAddressFamily

type BrokerAddressFamily string
const (
	BrokerAddressFamilyAny  BrokerAddressFamily = "any"
	BrokerAddressFamilyIPv4 BrokerAddressFamily = "v4"
	BrokerAddressFamilyIPv6 BrokerAddressFamily = "v6"
)

func (BrokerAddressFamily) String

func (s BrokerAddressFamily) String() string

func (*BrokerAddressFamily) UnmarshalJSON

func (s *BrokerAddressFamily) UnmarshalJSON(data []byte) error

func (*BrokerAddressFamily) UnmarshalText

func (s *BrokerAddressFamily) UnmarshalText(text []byte) error

type CommonConfigParams

type CommonConfigParams struct {
	Brokers          string
	SecurityProtocol string
	SaslMechanisms   string
	SaslUsername     string
	SaslPassword     string

	StatsInterval TimeDurationMilliSeconds

	// BrokerAddressFamily defines the IP address family to be used for network communication with Kafka cluster
	BrokerAddressFamily BrokerAddressFamily
	// SocketKeepAliveEnabled defines whether TCP socket keep-alive is enabled to prevent idle connections
	// from being closed by Kafka brokers.
	SocketKeepAliveEnabled bool
	// TopicMetadataRefreshInterval defines how frequently the Kafka client needs to fetch metadata information
	// (brokers, topics, partitions, etc.) from the Kafka cluster.
	// The 5 minutes default value is appropriate for mostly static Kafka clusters, but needs to be lowered
	// in case of large clusters where changes are more frequent.
	// This value must not be set to a value lower than 10s.
	TopicMetadataRefreshInterval TimeDurationMilliSeconds

	// DebugContexts enables contexts for extensive debugging of librdkafka.
	// See: https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#debug-contexts
	DebugContexts DebugContexts

	// ClientID sets the Consumer/Producer identifier
	ClientID string
}

func (CommonConfigParams) AsConfigMap

func (c CommonConfigParams) AsConfigMap() (kafka.ConfigMap, error)

func (CommonConfigParams) Validate

func (c CommonConfigParams) Validate() error

type ConfigMapper

type ConfigMapper interface {
	AsConfigMap() (kafka.ConfigMap, error)
}

type ConfigValidator

type ConfigValidator interface {
	Validate() error
}

type ConsumerConfig

type ConsumerConfig struct {
	CommonConfigParams
	ConsumerConfigParams
}

func (ConsumerConfig) AsConfigMap

func (c ConsumerConfig) AsConfigMap() (kafka.ConfigMap, error)

func (ConsumerConfig) Validate

func (c ConsumerConfig) Validate() error

type ConsumerConfigParams

type ConsumerConfigParams struct {
	// ConsumerGroupID defines the group id. All clients sharing the same ConsumerGroupID belong to the same group.
	ConsumerGroupID string
	// ConsumerGroupInstanceID defines the instance id in the consumer group. Setting this parameter enables static group membership.
	// Static group members are able to leave and rejoin a group within the configured SessionTimeout without prompting a group rebalance.
	// This should be used in combination with a larger session.timeout.ms to avoid group rebalances caused by transient unavailability (e.g. process restarts).
	ConsumerGroupInstanceID string

	// SessionTimeout defines the consumer group session and failure detection timeout.
	// The consumer sends periodic heartbeats (HeartbeatInterval) to indicate its liveness to the broker.
	// If no heartbeats are received by the broker for a group member within the session timeout,
	// the broker will remove the consumer from the group and trigger a rebalance.
	SessionTimeout TimeDurationMilliSeconds
	// HeartbeatInterval defines the consumer group session keepalive heartbeat interval.
	HeartbeatInterval TimeDurationMilliSeconds

	// EnableAutoCommit enables automatic, periodic committing of offsets in the background.
	EnableAutoCommit bool
	// EnableAutoOffsetStore enables automatically storing the offset of the last message provided to the application.
	// The offset store is an in-memory store of the next offset to (auto-)commit for each partition.
	EnableAutoOffsetStore bool
	// AutoOffsetReset defines the action to take when there is no initial offset in the offset store or the desired offset is out of range:
	// * "smallest","earliest","beginning": automatically reset the offset to the smallest offset
	// * "largest","latest","end": automatically reset the offset to the largest offset
	// * "error":  trigger an error (ERR__AUTO_OFFSET_RESET) which is retrieved by consuming messages and checking 'message->err'.
	AutoOffsetReset string
}

func (ConsumerConfigParams) AsConfigMap

func (c ConsumerConfigParams) AsConfigMap() (kafka.ConfigMap, error)

func (ConsumerConfigParams) Validate

func (c ConsumerConfigParams) Validate() error

type DebugContext

type DebugContext string
const (
	// DebugContextGeneric enables generic client instance level debugging.
	// Includes initialization and termination debugging.
	// Client Type: producer, consumer
	DebugContextGeneric DebugContext = "generic"
	// DebugContextBroker enables broker and connection state debugging.
	// Client Type: producer, consumer
	DebugContextBroker DebugContext = "broker"
	// DebugContextTopic enables topic and partition state debugging. Includes leader changes.
	// Client Type: producer, consumer
	DebugContextTopic DebugContext = "topic"
	// DebugContextMetadata enables cluster and topic metadata retrieval debugging.
	// Client Type: producer, consumer
	DebugContextMetadata DebugContext = "metadata"
	// DebugContextFeature enables Kafka protocol feature support as negotiated with the broker.
	// Client Type: producer, consumer
	DebugContextFeature DebugContext = "feature"
	// DebugContextQueue enables message queue debugging.
	// Client Type: producer
	DebugContextQueue DebugContext = "queue"
	// DebugContextMessage enables message debugging. Includes information about batching, compression, sizes, etc.
	// Client Type: producer, consumer
	DebugContextMessage DebugContext = "msg"
	// DebugContextProtocol enables Kafka protocol request/response debugging. Includes latency (rtt) printouts.
	// Client Type: producer, consumer
	DebugContextProtocol DebugContext = "protocol"
	// DebugContextConsumerGroup enables low-level consumer group state debugging.
	// Client Type: consumer
	DebugContextConsumerGroup DebugContext = "cgrp"
	// DebugContextSecurity enables security and authentication debugging.
	// Client Type: producer, consumer
	DebugContextSecurity DebugContext = "security"
	// DebugContextFetch enables consumer message fetch debugging. Includes decision when and why messages are fetched.
	// Client Type: consumer
	DebugContextFetch DebugContext = "fetch"
	// DebugContextInterceptor enables interceptor interface debugging.
	// Client Type: producer, consumer
	DebugContextInterceptor DebugContext = "interceptor"
	// DebugContextPlugin enables plugin loading debugging.
	// Client Type: producer, consumer
	DebugContextPlugin DebugContext = "plugin"
	// DebugContextConsumer enables high-level consumer debugging.
	// Client Type: consumer
	DebugContextConsumer DebugContext = "consumer"
	// DebugContextAdmin enables admin API debugging.
	// Client Type: admin
	DebugContextAdmin DebugContext = "admin"
	// DebugContextIdempotentProducer enables idempotent Producer debugging.
	// Client Type: producer
	DebugContextIdempotentProducer DebugContext = "eos"
	// DebugContextMock enables mock cluster functionality debugging.
	// Client Type: producer, consumer
	DebugContextMock DebugContext = "mock"
	// DebugContextAssignor enables detailed consumer group partition assignor debugging.
	// Client Type: consumer
	DebugContextAssignor DebugContext = "assignor"
	// DebugContextConfig enables displaying set configuration properties on startup.
	// Client Type: producer, consumer
	DebugContextConfig DebugContext = "conf"
	// DebugContextAll enables all of the above.
	// Client Type: producer, consumer
	DebugContextAll DebugContext = "all"
)

func (DebugContext) String

func (c DebugContext) String() string

func (*DebugContext) UnmarshalJSON

func (c *DebugContext) UnmarshalJSON(data []byte) error

func (*DebugContext) UnmarshalText

func (c *DebugContext) UnmarshalText(text []byte) error

type DebugContexts

type DebugContexts []DebugContext

func (DebugContexts) String

func (d DebugContexts) String() string

type LogEmitter

type LogEmitter interface {
	Logs() chan kafka.LogEvent
}

LogEmitter emits logs from a kafka.Consumer or kafka.Producer.

Requires the `go.logs.channel.enable` option to be set to true.

This feature was implemented in this PR.

type ProducerConfig

type ProducerConfig struct {
	CommonConfigParams
	ProducerConfigParams
}

func (ProducerConfig) AsConfigMap

func (c ProducerConfig) AsConfigMap() (kafka.ConfigMap, error)

func (ProducerConfig) Validate

func (c ProducerConfig) Validate() error

type ProducerConfigParams

type ProducerConfigParams struct{}

func (ProducerConfigParams) AsConfigMap

func (p ProducerConfigParams) AsConfigMap() (kafka.ConfigMap, error)

func (ProducerConfigParams) Validate

func (p ProducerConfigParams) Validate() error

type TimeDurationMilliSeconds

type TimeDurationMilliSeconds time.Duration

func (TimeDurationMilliSeconds) Duration

func (d TimeDurationMilliSeconds) Duration() time.Duration

func (TimeDurationMilliSeconds) String

func (d TimeDurationMilliSeconds) String() string

func (*TimeDurationMilliSeconds) UnmarshalJSON

func (d *TimeDurationMilliSeconds) UnmarshalJSON(data []byte) error

func (*TimeDurationMilliSeconds) UnmarshalText

func (d *TimeDurationMilliSeconds) UnmarshalText(text []byte) error

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL