Documentation ¶
Overview ¶
Package kafka implements tools to work with kafka Producers and Consumers.
Index ¶
- func ConsumeLogChannel(emitter LogEmitter, logger *slog.Logger)
- func LogProcessor(logEmitter LogEmitter, logger *slog.Logger) (execute func() error, interrupt func(error))
- func ProvisionTopic(ctx context.Context, adminClient *kafka.AdminClient, logger *slog.Logger, ...) error
- type BrokerAddressFamily
- type CommonConfigParams
- type ConfigMapper
- type ConfigValidator
- type ConsumerConfig
- type ConsumerConfigParams
- type DebugContext
- type DebugContexts
- type LogEmitter
- type ProducerConfig
- type ProducerConfigParams
- type TimeDurationMilliSeconds
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ConsumeLogChannel ¶
func ConsumeLogChannel(emitter LogEmitter, logger *slog.Logger)
ConsumeLogChannel is supposed to be called in a goroutine. It consumes a log channel returned by a LogEmitter.
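For example, a minimal sketch (written as if inside this package) of starting the log drain; startLogDraining is a hypothetical helper, and it assumes the emitter was created with the go.logs.channel.enable option described under LogEmitter below.

// startLogDraining is a hypothetical helper that drains client logs in the background.
// It assumes the emitter was created with "go.logs.channel.enable" set to true.
func startLogDraining(emitter LogEmitter, logger *slog.Logger) {
	go ConsumeLogChannel(emitter, logger)
}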
func LogProcessor ¶
func LogProcessor(logEmitter LogEmitter, logger *slog.Logger) (execute func() error, interrupt func(error))
LogProcessor consumes logs from a LogEmitter and passes them to an slog.Logger.
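The (execute, interrupt) pair matches the actor contract of run groups such as github.com/oklog/run, so it can be wired in as in this sketch (assuming the surrounding application already uses a run.Group):

// A sketch of wiring LogProcessor into an oklog/run group (illustrative only).
var group run.Group

execute, interrupt := LogProcessor(emitter, logger)
group.Add(execute, interrupt)

// ... add the application's other actors, then:
if err := group.Run(); err != nil {
	logger.Error("run group exited", "error", err)
}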
Types ¶
type BrokerAddressFamily ¶
type BrokerAddressFamily string
const (
	BrokerAddressFamilyAny  BrokerAddressFamily = "any"
	BrokerAddressFamilyIPv4 BrokerAddressFamily = "v4"
	BrokerAddressFamilyIPv6 BrokerAddressFamily = "v6"
)
func (BrokerAddressFamily) String ¶
func (s BrokerAddressFamily) String() string
func (*BrokerAddressFamily) UnmarshalJSON ¶
func (s *BrokerAddressFamily) UnmarshalJSON(data []byte) error
func (*BrokerAddressFamily) UnmarshalText ¶
func (s *BrokerAddressFamily) UnmarshalText(text []byte) error
type CommonConfigParams ¶
type CommonConfigParams struct {
	Brokers          string
	SecurityProtocol string
	SaslMechanisms   string
	SaslUsername     string
	SaslPassword     string

	StatsInterval TimeDurationMilliSeconds

	// BrokerAddressFamily defines the IP address family to be used for network communication with the Kafka cluster.
	BrokerAddressFamily BrokerAddressFamily

	// SocketKeepAliveEnabled defines whether TCP socket keep-alive is enabled to prevent idle connections
	// from being closed by Kafka brokers.
	SocketKeepAliveEnabled bool

	// TopicMetadataRefreshInterval defines how frequently the Kafka client needs to fetch metadata information
	// (brokers, topics, partitions, etc.) from the Kafka cluster.
	// The 5 minute default value is appropriate for mostly static Kafka clusters, but needs to be lowered
	// for large clusters where changes are more frequent.
	// This value must not be set lower than 10s.
	TopicMetadataRefreshInterval TimeDurationMilliSeconds

	// DebugContexts enables contexts for extensive debugging of librdkafka.
	// See: https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#debug-contexts
	DebugContexts DebugContexts

	// ClientID sets the Consumer/Producer identifier.
	ClientID string
}
func (CommonConfigParams) AsConfigMap ¶
func (c CommonConfigParams) AsConfigMap() (kafka.ConfigMap, error)
func (CommonConfigParams) Validate ¶
func (c CommonConfigParams) Validate() error
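A sketch of building the common configuration and converting it to a kafka.ConfigMap; the field values are illustrative only, and constructing the refresh interval directly from a time.Duration assumes TimeDurationMilliSeconds is a duration-based type (consistent with its Duration method below):

// Illustrative values only; adjust to the target cluster.
common := CommonConfigParams{
	Brokers:                      "localhost:9092",
	ClientID:                     "example-client",
	BrokerAddressFamily:          BrokerAddressFamilyIPv4,
	SocketKeepAliveEnabled:       true,
	TopicMetadataRefreshInterval: TimeDurationMilliSeconds(time.Minute), // assumes a time.Duration-based type
}

if err := common.Validate(); err != nil {
	return err
}

configMap, err := common.AsConfigMap()
if err != nil {
	return err
}
_ = configMap // pass to kafka.NewConsumer or kafka.NewProducer as needed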
type ConfigMapper ¶
type ConfigValidator ¶
type ConfigValidator interface {
Validate() error
}
type ConsumerConfig ¶
type ConsumerConfig struct {
	CommonConfigParams
	ConsumerConfigParams
}
func (ConsumerConfig) AsConfigMap ¶
func (c ConsumerConfig) AsConfigMap() (kafka.ConfigMap, error)
func (ConsumerConfig) Validate ¶
func (c ConsumerConfig) Validate() error
type ConsumerConfigParams ¶
type ConsumerConfigParams struct {
	// ConsumerGroupID defines the group id. All clients sharing the same ConsumerGroupID belong to the same group.
	ConsumerGroupID string

	// ConsumerGroupInstanceID defines the instance id in the consumer group. Setting this parameter enables static group membership.
	// Static group members are able to leave and rejoin a group within the configured SessionTimeout without triggering a group rebalance.
	// This should be used in combination with a larger session.timeout.ms to avoid group rebalances caused by transient unavailability (e.g. process restarts).
	ConsumerGroupInstanceID string

	// SessionTimeout defines the consumer group session and failure detection timeout.
	// The consumer sends periodic heartbeats (HeartbeatInterval) to indicate its liveness to the broker.
	// If no heartbeats are received by the broker for a group member within the session timeout,
	// the broker will remove the consumer from the group and trigger a rebalance.
	SessionTimeout TimeDurationMilliSeconds

	// HeartbeatInterval defines the consumer group session keepalive heartbeat interval.
	HeartbeatInterval TimeDurationMilliSeconds

	// EnableAutoCommit enables automatically and periodically committing offsets in the background.
	EnableAutoCommit bool

	// EnableAutoOffsetStore enables automatically storing the offset of the last message provided to the application.
	// The offset store is an in-memory store of the next offset to (auto-)commit for each partition.
	EnableAutoOffsetStore bool

	// AutoOffsetReset defines the action to take when there is no initial offset in the offset store or the desired offset is out of range:
	//   * "smallest", "earliest", "beginning": automatically reset the offset to the smallest offset
	//   * "largest", "latest", "end": automatically reset the offset to the largest offset
	//   * "error": trigger an error (ERR__AUTO_OFFSET_RESET) which is retrieved by consuming messages and checking 'message->err'.
	AutoOffsetReset string
}
func (ConsumerConfigParams) AsConfigMap ¶
func (c ConsumerConfigParams) AsConfigMap() (kafka.ConfigMap, error)
func (ConsumerConfigParams) Validate ¶
func (c ConsumerConfigParams) Validate() error
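A sketch of assembling a full consumer configuration and creating a confluent-kafka-go consumer from it; the field values are illustrative only, and kafka.NewConsumer refers to the confluent-kafka-go client:

func newExampleConsumer() (*kafka.Consumer, error) {
	// Illustrative values only.
	cfg := ConsumerConfig{
		CommonConfigParams: CommonConfigParams{
			Brokers:  "localhost:9092",
			ClientID: "example-consumer",
		},
		ConsumerConfigParams: ConsumerConfigParams{
			ConsumerGroupID:       "example-group",
			EnableAutoCommit:      true,
			EnableAutoOffsetStore: false,
			AutoOffsetReset:       "earliest",
		},
	}

	if err := cfg.Validate(); err != nil {
		return nil, err
	}

	configMap, err := cfg.AsConfigMap()
	if err != nil {
		return nil, err
	}

	return kafka.NewConsumer(&configMap)
}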
type DebugContext ¶
type DebugContext string
const (
	// DebugContextGeneric enables generic client instance level debugging.
	// Includes initialization and termination debugging.
	// Client Type: producer, consumer
	DebugContextGeneric DebugContext = "generic"

	// DebugContextBroker enables broker and connection state debugging.
	// Client Type: producer, consumer
	DebugContextBroker DebugContext = "broker"

	// DebugContextTopic enables topic and partition state debugging. Includes leader changes.
	// Client Type: producer, consumer
	DebugContextTopic DebugContext = "topic"

	// DebugContextMetadata enables cluster and topic metadata retrieval debugging.
	// Client Type: producer, consumer
	DebugContextMetadata DebugContext = "metadata"

	// DebugContextFeature enables debugging of Kafka protocol feature support as negotiated with the broker.
	// Client Type: producer, consumer
	DebugContextFeature DebugContext = "feature"

	// DebugContextQueue enables message queue debugging.
	// Client Type: producer
	DebugContextQueue DebugContext = "queue"

	// DebugContextMessage enables message debugging. Includes information about batching, compression, sizes, etc.
	// Client Type: producer, consumer
	DebugContextMessage DebugContext = "msg"

	// DebugContextProtocol enables Kafka protocol request/response debugging. Includes latency (rtt) printouts.
	// Client Type: producer, consumer
	DebugContextProtocol DebugContext = "protocol"

	// DebugContextConsumerGroup enables low-level consumer group state debugging.
	// Client Type: consumer
	DebugContextConsumerGroup DebugContext = "cgrp"

	// DebugContextSecurity enables security and authentication debugging.
	// Client Type: producer, consumer
	DebugContextSecurity DebugContext = "security"

	// DebugContextFetch enables consumer message fetch debugging. Includes decisions on when and why messages are fetched.
	// Client Type: consumer
	DebugContextFetch DebugContext = "fetch"

	// DebugContextInterceptor enables interceptor interface debugging.
	// Client Type: producer, consumer
	DebugContextInterceptor DebugContext = "interceptor"

	// DebugContextPlugin enables plugin loading debugging.
	// Client Type: producer, consumer
	DebugContextPlugin DebugContext = "plugin"

	// DebugContextConsumer enables high-level consumer debugging.
	// Client Type: consumer
	DebugContextConsumer DebugContext = "consumer"

	// DebugContextAdmin enables admin API debugging.
	// Client Type: admin
	DebugContextAdmin DebugContext = "admin"

	// DebugContextIdempotentProducer enables idempotent Producer debugging.
	// Client Type: producer
	DebugContextIdempotentProducer DebugContext = "eos"

	// DebugContextMock enables mock cluster functionality debugging.
	// Client Type: producer, consumer
	DebugContextMock DebugContext = "mock"

	// DebugContextAssignor enables detailed consumer group partition assignor debugging.
	// Client Type: consumer
	DebugContextAssignor DebugContext = "assignor"

	// DebugContextConfig enables displaying set configuration properties on startup.
	// Client Type: producer, consumer
	DebugContextConfig DebugContext = "conf"

	// DebugContextAll enables all of the above.
	// Client Type: producer, consumer
	DebugContextAll DebugContext = "all"
)
func (DebugContext) String ¶
func (c DebugContext) String() string
func (*DebugContext) UnmarshalJSON ¶
func (c *DebugContext) UnmarshalJSON(data []byte) error
func (*DebugContext) UnmarshalText ¶
func (c *DebugContext) UnmarshalText(text []byte) error
type DebugContexts ¶
type DebugContexts []DebugContext
func (DebugContexts) String ¶
func (d DebugContexts) String() string
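DebugContexts is typically set on CommonConfigParams; its String method presumably renders the comma-separated list librdkafka expects for the debug property. A small sketch:

// Enable broker connection and protocol request/response debugging (illustrative).
cfg := CommonConfigParams{
	DebugContexts: DebugContexts{DebugContextBroker, DebugContextProtocol},
}
_ = cfg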
type LogEmitter ¶
LogEmitter emits logs from a kafka.Consumer or kafka.Producer.
Requires the `go.logs.channel.enable` option to be set to true.
This feature was implemented in this PR.
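A sketch of creating a client whose logs can be drained through this package; it assumes *kafka.Consumer satisfies LogEmitter via its Logs method once go.logs.channel.enable is set (values are illustrative):

func newLoggingConsumer(logger *slog.Logger) (*kafka.Consumer, error) {
	consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":      "localhost:9092",
		"group.id":               "example-group",
		"go.logs.channel.enable": true, // route librdkafka logs to the Logs() channel
	})
	if err != nil {
		return nil, err
	}

	// Assumption: *kafka.Consumer satisfies LogEmitter through its Logs() method.
	go ConsumeLogChannel(consumer, logger)

	return consumer, nil
}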
type ProducerConfig ¶
type ProducerConfig struct {
	CommonConfigParams
	ProducerConfigParams
}
func (ProducerConfig) AsConfigMap ¶
func (c ProducerConfig) AsConfigMap() (kafka.ConfigMap, error)
func (ProducerConfig) Validate ¶
func (c ProducerConfig) Validate() error
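A sketch mirroring the consumer example above, this time for producers; field values are illustrative only, and kafka.NewProducer refers to the confluent-kafka-go client:

func newExampleProducer() (*kafka.Producer, error) {
	// Illustrative values only.
	cfg := ProducerConfig{
		CommonConfigParams: CommonConfigParams{
			Brokers:  "localhost:9092",
			ClientID: "example-producer",
		},
	}

	if err := cfg.Validate(); err != nil {
		return nil, err
	}

	configMap, err := cfg.AsConfigMap()
	if err != nil {
		return nil, err
	}

	return kafka.NewProducer(&configMap)
}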
type ProducerConfigParams ¶
type ProducerConfigParams struct{}
func (ProducerConfigParams) AsConfigMap ¶
func (p ProducerConfigParams) AsConfigMap() (kafka.ConfigMap, error)
func (ProducerConfigParams) Validate ¶
func (p ProducerConfigParams) Validate() error
type TimeDurationMilliSeconds ¶
func (TimeDurationMilliSeconds) Duration ¶
func (d TimeDurationMilliSeconds) Duration() time.Duration
func (TimeDurationMilliSeconds) String ¶
func (d TimeDurationMilliSeconds) String() string
func (*TimeDurationMilliSeconds) UnmarshalJSON ¶
func (d *TimeDurationMilliSeconds) UnmarshalJSON(data []byte) error
func (*TimeDurationMilliSeconds) UnmarshalText ¶
func (d *TimeDurationMilliSeconds) UnmarshalText(text []byte) error
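A sketch of working with TimeDurationMilliSeconds; constructing it directly from a time.Duration assumes the type is duration-based, which is consistent with its Duration method but not spelled out above:

// Assumption: TimeDurationMilliSeconds converts directly from time.Duration.
interval := TimeDurationMilliSeconds(30 * time.Second)

ticker := time.NewTicker(interval.Duration())
defer ticker.Stop()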