Documentation ¶
Overview ¶
Package kafka implements tools for working with Kafka producers and consumers.
Index ¶
- func ConsumeLogChannel(emitter LogEmitter, logger *slog.Logger)
- func LogProcessor(logEmitter LogEmitter, logger *slog.Logger) (execute func() error, interrupt func(error))
- func ProvisionTopic(ctx context.Context, adminClient *kafka.AdminClient, logger *slog.Logger, ...) error
- type BrokerAddressFamily
- type DebugContext
- type DebugContexts
- type LogEmitter
- type TimeDurationMilliSeconds
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ConsumeLogChannel ¶
func ConsumeLogChannel(emitter LogEmitter, logger *slog.Logger)
ConsumeLogChannel is meant to run in its own goroutine. It consumes the log channel returned by a LogEmitter.
func LogProcessor ¶
func LogProcessor(logEmitter LogEmitter, logger *slog.Logger) (execute func() error, interrupt func(error))
LogProcessor consumes logs from a LogEmitter and passes them to an slog.Logger.
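The (execute, interrupt) pair returned by LogProcessor has the same shape as the actor functions accepted by run groups such as github.com/oklog/run. Whether the package is meant to be used that way is an assumption. The sketch below illustrates the pattern only: lineEmitter, fakeClient, and newLogProcessor are hypothetical stand-ins, since the definition of LogEmitter is not shown on this page, and plain strings are used in place of real Kafka log events.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// lineEmitter is a hypothetical stand-in for LogEmitter.
type lineEmitter interface {
	Logs() <-chan string
}

// fakeClient plays the role of a Kafka client that exposes a log channel.
type fakeClient struct{ logs chan string }

func (f *fakeClient) Logs() <-chan string { return f.logs }

// newLogProcessor mimics the (execute, interrupt) shape of LogProcessor.
// execute blocks until the log channel is closed or interrupt is called.
func newLogProcessor(e lineEmitter, sink func(string)) (execute func() error, interrupt func(error)) {
	done := make(chan struct{})
	var once sync.Once

	execute = func() error {
		for {
			select {
			case line, ok := <-e.Logs():
				if !ok {
					return nil // the emitter closed its channel
				}
				sink(line)
			case <-done:
				return nil // interrupt was called
			}
		}
	}
	interrupt = func(error) {
		// Safe to call more than once.
		once.Do(func() { close(done) })
	}
	return execute, interrupt
}

func main() {
	client := &fakeClient{logs: make(chan string)}
	var got []string
	execute, interrupt := newLogProcessor(client, func(s string) { got = append(got, s) })

	errc := make(chan error, 1)
	go func() { errc <- execute() }()

	client.logs <- "broker up"
	client.logs <- "metadata refreshed"
	interrupt(errors.New("shutting down"))

	// Receiving from errc guarantees execute has returned before got is read.
	fmt.Println(<-errc, got)
}
```

In a real program the sink would forward each entry to the *slog.Logger, and execute would run in its own goroutine, as ConsumeLogChannel is documented to do.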
Types ¶
type BrokerAddressFamily ¶
type BrokerAddressFamily string
const (
	BrokerAddressFamilyAny  BrokerAddressFamily = "any"
	BrokerAddressFamilyIPv4 BrokerAddressFamily = "v4"
	BrokerAddressFamilyIPv6 BrokerAddressFamily = "v6"
)
func (BrokerAddressFamily) String ¶
func (s BrokerAddressFamily) String() string
func (*BrokerAddressFamily) UnmarshalJSON ¶
func (s *BrokerAddressFamily) UnmarshalJSON(data []byte) error
func (*BrokerAddressFamily) UnmarshalText ¶
func (s *BrokerAddressFamily) UnmarshalText(text []byte) error
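Because BrokerAddressFamily implements UnmarshalText and UnmarshalJSON, it can be decoded directly from configuration files. The page does not say whether unknown values are rejected, so the following is only a sketch of how such a string type might validate its input. The names addressFamily, config, and parseConfig are hypothetical and are not part of this package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// addressFamily is a hypothetical local mirror of BrokerAddressFamily.
type addressFamily string

const (
	familyAny  addressFamily = "any"
	familyIPv4 addressFamily = "v4"
	familyIPv6 addressFamily = "v6"
)

// UnmarshalText accepts only the three known values (assumed behavior).
func (f *addressFamily) UnmarshalText(text []byte) error {
	switch v := addressFamily(text); v {
	case familyAny, familyIPv4, familyIPv6:
		*f = v
		return nil
	default:
		return fmt.Errorf("invalid broker address family %q", text)
	}
}

// config is a hypothetical settings struct.
type config struct {
	Family addressFamily `json:"family"`
}

// parseConfig decodes JSON; encoding/json calls UnmarshalText for string values.
func parseConfig(data []byte) (config, error) {
	var c config
	err := json.Unmarshal(data, &c)
	return c, err
}

func main() {
	c, err := parseConfig([]byte(`{"family":"v4"}`))
	fmt.Println(c.Family, err)

	_, err = parseConfig([]byte(`{"family":"v5"}`))
	fmt.Println("rejected:", err != nil)
}
```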
type DebugContext ¶
type DebugContext string
const (
	// DebugContextGeneric enables generic client instance level debugging.
	// Includes initialization and termination debugging.
	// Client Type: producer, consumer
	DebugContextGeneric DebugContext = "generic"
	// DebugContextBroker enables broker and connection state debugging.
	// Client Type: producer, consumer
	DebugContextBroker DebugContext = "broker"
	// DebugContextTopic enables topic and partition state debugging. Includes leader changes.
	// Client Type: producer, consumer
	DebugContextTopic DebugContext = "topic"
	// DebugContextMetadata enables cluster and topic metadata retrieval debugging.
	// Client Type: producer, consumer
	DebugContextMetadata DebugContext = "metadata"
	// DebugContextFeature enables Kafka protocol feature support as negotiated with the broker.
	// Client Type: producer, consumer
	DebugContextFeature DebugContext = "feature"
	// DebugContextQueue enables message queue debugging.
	// Client Type: producer
	DebugContextQueue DebugContext = "queue"
	// DebugContextMessage enables message debugging. Includes information about batching, compression, sizes, etc.
	// Client Type: producer, consumer
	DebugContextMessage DebugContext = "msg"
	// DebugContextProtocol enables Kafka protocol request/response debugging. Includes latency (rtt) printouts.
	// Client Type: producer, consumer
	DebugContextProtocol DebugContext = "protocol"
	// DebugContextConsumerGroup enables low-level consumer group state debugging.
	// Client Type: consumer
	DebugContextConsumerGroup DebugContext = "cgrp"
	// DebugContextSecurity enables security and authentication debugging.
	// Client Type: producer, consumer
	DebugContextSecurity DebugContext = "security"
	// DebugContextFetch enables consumer message fetch debugging. Includes decision when and why messages are fetched.
	// Client Type: consumer
	DebugContextFetch DebugContext = "fetch"
	// DebugContextInterceptor enables interceptor interface debugging.
	// Client Type: producer, consumer
	DebugContextInterceptor DebugContext = "interceptor"
	// DebugContextPlugin enables plugin loading debugging.
	// Client Type: producer, consumer
	DebugContextPlugin DebugContext = "plugin"
	// DebugContextConsumer enables high-level consumer debugging.
	// Client Type: consumer
	DebugContextConsumer DebugContext = "consumer"
	// DebugContextAdmin enables admin API debugging.
	// Client Type: admin
	DebugContextAdmin DebugContext = "admin"
	// DebugContextIdempotentProducer enables idempotent Producer debugging.
	// Client Type: producer
	DebugContextIdempotentProducer DebugContext = "eos"
	// DebugContextMock enables mock cluster functionality debugging.
	// Client Type: producer, consumer
	DebugContextMock DebugContext = "mock"
	// DebugContextAssignor enables detailed consumer group partition assignor debugging.
	// Client Type: consumer
	DebugContextAssignor DebugContext = "assignor"
	// DebugContextConfig enables displaying set configuration properties on startup.
	// Client Type: producer, consumer
	DebugContextConfig DebugContext = "conf"
	// DebugContextAll enables all of the above.
	// Client Type: producer, consumer
	DebugContextAll DebugContext = "all"
)
func (DebugContext) String ¶
func (c DebugContext) String() string
func (*DebugContext) UnmarshalJSON ¶
func (c *DebugContext) UnmarshalJSON(data []byte) error
func (*DebugContext) UnmarshalText ¶
func (c *DebugContext) UnmarshalText(text []byte) error
type DebugContexts ¶
type DebugContexts []DebugContext
func (DebugContexts) String ¶
func (d DebugContexts) String() string
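The behavior of DebugContexts.String is not documented here. The underlying librdkafka `debug` property takes a comma-separated list of contexts, so one plausible implementation joins the values with commas. The sketch below shows that assumed behavior with hypothetical local types (debugContext, debugContexts) and should not be read as the package's actual code.

```go
package main

import (
	"fmt"
	"strings"
)

// debugContext and debugContexts are hypothetical mirrors of the package types.
type debugContext string

type debugContexts []debugContext

// String joins the contexts with commas (assumed behavior).
func (d debugContexts) String() string {
	parts := make([]string, len(d))
	for i, c := range d {
		parts[i] = string(c)
	}
	return strings.Join(parts, ",")
}

func main() {
	d := debugContexts{"broker", "topic", "msg"}
	fmt.Println(d.String()) // prints "broker,topic,msg"
}
```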
type LogEmitter ¶
LogEmitter emits logs from a kafka.Consumer or kafka.Producer.
It requires the `go.logs.channel.enable` option to be set to true.
This feature was implemented in this PR.
type TimeDurationMilliSeconds ¶
func (TimeDurationMilliSeconds) Duration ¶
func (d TimeDurationMilliSeconds) Duration() time.Duration
func (TimeDurationMilliSeconds) String ¶
func (d TimeDurationMilliSeconds) String() string
func (*TimeDurationMilliSeconds) UnmarshalJSON ¶
func (d *TimeDurationMilliSeconds) UnmarshalJSON(data []byte) error
func (*TimeDurationMilliSeconds) UnmarshalText ¶
func (d *TimeDurationMilliSeconds) UnmarshalText(text []byte) error