Versions in this module Expand all Collapse all v2 v2.2.0 Mar 10, 2025 Changes in this version type WorkOption + func WithWorkTextMapPropagator(p propagation.TextMapPropagator) WorkOption v2.1.1 Feb 13, 2025 v2.1.0 Jan 24, 2025 Changes in this version type WriterSettings + DisableTracePropagation bool v2.0.5 Jan 14, 2025 v2.0.4 Dec 19, 2024 v2.0.3 Nov 22, 2024 v2.0.2 Nov 19, 2024 v2.0.1 Oct 29, 2024 v2.0.0 Oct 19, 2024 Changes in this version + const AvroSchemaRegistry + const CustomFmt + const JSONSchemaRegistry + const ProtoSchemaRegistry + type Assignment struct + Partition int32 + Topic string + type Client struct + func NewClient(conf Config, opts ...Option) *Client + func (c *Client) Close() error + func (c *Client) Reader(_ context.Context, topicConfig ConsumerTopicConfig, opts ...ReaderOption) (Reader, error) + func (c *Client) Writer(_ context.Context, topicConfig ProducerTopicConfig, opts ...WriterOption) (Writer, error) + type ClientProvider interface + Close func() error + Reader func(ctx context.Context, topicConfig ConsumerTopicConfig, opts ...ReaderOption) (Reader, error) + Writer func(ctx context.Context, topicConfig ProducerTopicConfig, opts ...WriterOption) (Writer, error) + type Config struct + BootstrapServers []string + CAFile string + CertFile string + KeyFile string + SaslPassword *string + SaslUsername *string + type ConsumerTopicConfig struct + AdditionalProps map[string]interface{} + AutoCommitIntervalMs *int + BootstrapServers []string + ClientID string + DeadLetterTopicConfig *ProducerTopicConfig + Formatter zfmt.FormatterType + GroupID string + MaxPollIntervalMillis *int + ProcessDelayMillis *int + ProcessTimeoutMillis *int + ReadTimeoutMillis *int + SaslPassword *string + SaslUsername *string + SchemaID int + SchemaRegistry SchemaRegistryConfig + SessionTimeoutMillis *int + Topic string + Topics []string + Transaction bool + func (p ConsumerTopicConfig) GetFormatter() zfmt.FormatterType + func (p ConsumerTopicConfig) GetSchemaID() int + type DeserializationConfig struct + Schema string + type FakeClient struct + R Reader + W Writer + func (f FakeClient) Close() error + func (f FakeClient) Reader(_ context.Context, _ ConsumerTopicConfig, _ ...ReaderOption) (Reader, error) + func (f FakeClient) Writer(_ context.Context, _ ProducerTopicConfig, _ ...WriterOption) (Writer, error) + type FakeMessage struct + DoneFunc func(ctx context.Context) + Fmt zfmt.Formatter + GroupID string + Headers map[string][]byte + Key *string + Offset int64 + Partition int32 + TimeStamp time.Time + Topic string + Value []byte + ValueData any + type Formatter interface + Marshall func(v any) ([]byte, error) + Unmarshal func(b []byte, v any) error + type KReader struct + func (r *KReader) Assignments(_ context.Context) ([]Assignment, error) + func (r *KReader) Close() error + func (r *KReader) Read(ctx context.Context) (*Message, error) + type KWriter struct + func (w *KWriter) Close() + func (w *KWriter) Write(ctx context.Context, value any, opts ...WriteOption) (Response, error) + func (w *KWriter) WriteKey(ctx context.Context, key string, value any, opts ...WriteOption) (Response, error) + func (w *KWriter) WriteRaw(ctx context.Context, key *string, value []byte, opts ...WriteOption) (Response, error) + type KafkaConsumer interface + Assignment func() (partitions []kafka.TopicPartition, err error) + AssignmentLost func() bool + Close func() error + Commit func() ([]kafka.TopicPartition, error) + ReadMessage func(timeout time.Duration) (*kafka.Message, error) + StoreOffsets func(offsets []kafka.TopicPartition) (storedOffsets []kafka.TopicPartition, err error) + SubscribeTopics func(topics []string, rebalanceCb kafka.RebalanceCb) error + type KafkaProducer interface + Close func() + Produce func(msg *kafka.Message, deliveryChan chan kafka.Event) error + type LifecycleHooks struct + PostAck func(ctx context.Context, meta LifecyclePostAckMeta) error + PostFanout func(ctx context.Context) + PostProcessing func(ctx context.Context, meta LifecyclePostProcessingMeta) error + PostRead func(ctx context.Context, meta LifecyclePostReadMeta) (context.Context, error) + PostReadImmediate func(ctx context.Context, meta LifecyclePostReadImmediateMeta) + PreProcessing func(ctx context.Context, meta LifecyclePreProcessingMeta) (context.Context, error) + PreWrite func(ctx context.Context, meta LifecyclePreWriteMeta) (LifecyclePreWriteResp, error) + func ChainLifecycleHooks(hooks ...LifecycleHooks) LifecycleHooks + type LifecycleHooksOption struct + type LifecyclePostAckMeta struct + ProduceTime time.Time + Topic string + type LifecyclePostProcessingMeta struct + GroupID string + Msg *Message + ProcessingTime time.Duration + ResponseErr error + Topic string + VirtualPartitionIndex int + type LifecyclePostReadImmediateMeta struct + Err error + Message *Message + type LifecyclePostReadMeta struct + GroupID string + Message *Message + Topic string + type LifecyclePreProcessingMeta struct + GroupID string + Message *Message + Topic string + TopicLag time.Duration + VirtualPartitionIndex int + type LifecyclePreWriteMeta struct + type LifecyclePreWriteResp struct + Headers map[string][]byte + type Logger interface + Debugw func(ctx context.Context, msg string, keysAndValues ...any) + Errorw func(ctx context.Context, msg string, keysAndValues ...any) + Infow func(ctx context.Context, msg string, keysAndValues ...any) + Warnw func(ctx context.Context, msg string, keysAndValues ...any) + type Message struct + GroupID string + Headers map[string][]byte + Key string + Offset int64 + Partition int32 + TimeStamp time.Time + Topic string + func GetFakeMessage(key string, value any, fmt zfmt.Formatter, doneFunc func()) *Message + func GetMsgFromFake(input *FakeMessage) *Message + func (m *Message) Decode(v any) error + func (m *Message) Done() + func (m *Message) DoneWithContext(ctx context.Context) + func (m *Message) Value() []byte + type NoopLogger struct + func (l NoopLogger) Debugw(_ context.Context, _ string, _ ...any) + func (l NoopLogger) Errorw(_ context.Context, _ string, _ ...any) + func (l NoopLogger) Infow(_ context.Context, _ string, _ ...any) + func (l NoopLogger) Warnw(_ context.Context, _ string, _ ...any) + type Option func(*Client) + func KafkaGroupPrefixOption(prefix string) Option + func LoggerOption(logger Logger) Option + func WithClientLifecycleHooks(h LifecycleHooks) Option + func WithClientTextMapPropagator(p propagation.TextMapPropagator) Option + func WithClientTracerProviderOption(tp trace.TracerProvider) Option + func WithConsumerProvider(provider func(config map[string]any) (KafkaConsumer, error)) Option + func WithProducerProvider(provider func(config map[string]any) (KafkaProducer, error)) Option + type ProcessError struct + DisableCircuitBreak bool + DisableDLTWrite bool + Err error + func (p ProcessError) Error() string + func (p ProcessError) Unwrap() error + type ProducerTopicConfig struct + AdditionalProps map[string]interface{} + BootstrapServers []string + ClientID string + DeliveryTimeoutMs *int + EnableIdempotence *bool + Formatter zfmt.FormatterType + LingerMillis int + NagleDisable *bool + RequestRequiredAcks *string + SaslPassword *string + SaslUsername *string + SchemaID int + SchemaRegistry SchemaRegistryConfig + Topic string + Transaction bool + func (p ProducerTopicConfig) GetFormatter() zfmt.FormatterType + func (p ProducerTopicConfig) GetSchemaID() int + type Reader interface + Close func() error + Read func(ctx context.Context) (*Message, error) + type ReaderOption func(*ReaderSettings) + func RFormatterOption(formatter Formatter) ReaderOption + type ReaderSettings struct + type Response struct + Offset int64 + Partition int32 + type SchemaRegistryConfig struct + Deserialization DeserializationConfig + Serialization SerializationConfig + SubjectName string + URL string + type SerializationConfig struct + AutoRegisterSchemas bool + Schema string + type TopicPartition struct + Offset int64 + Partition int32 + type Work struct + func (w *Work) Run(ctx context.Context, shutdown <-chan struct{}) error + type WorkFactory struct + func NewWorkFactory(kafkaProvider ClientProvider, options ...WorkFactoryOption) WorkFactory + func (f WorkFactory) Create(topicConfig ConsumerTopicConfig, processor processor, options ...WorkOption) *Work + func (f WorkFactory) CreateWithFunc(topicConfig ConsumerTopicConfig, p func(_ context.Context, msg *Message) error, ...) *Work + type WorkFactoryOption interface + func WithLogger(l Logger) WorkFactoryOption + func WithTextMapPropagator(p propagation.TextMapPropagator) WorkFactoryOption + func WithTracerProvider(tp trace.TracerProvider) WorkFactoryOption + func WithWorkLifecycleHooks(h LifecycleHooks) WorkFactoryOption + type WorkOption interface + func CircuitBreakAfter(times uint32) WorkOption + func CircuitBreakFor(duration time.Duration) WorkOption + func DisableBusyLoopBreaker() WorkOption + func DisableCircuitBreaker() WorkOption + func Speedup(times uint16) WorkOption + func WithDeadLetterTopic(deadLetterTopicConfig ProducerTopicConfig) WorkOption + func WithDisableBusyLoopBreaker(isDisabled bool) WorkOption + func WithDisableCircuitBreaker(isDisabled bool) WorkOption + func WithLifecycleHooks(h LifecycleHooks) WorkOption + func WithOnDone(f func(ctx context.Context, message *Message, err error)) WorkOption + type WriteOption interface + func WithHeaders(headers map[string]string) WriteOption + type Writer interface + Close func() + Write func(ctx context.Context, value any, opts ...WriteOption) (Response, error) + WriteKey func(ctx context.Context, key string, value any, opts ...WriteOption) (Response, error) + WriteRaw func(ctx context.Context, key *string, value []byte, opts ...WriteOption) (Response, error) + type WriterOption func(*WriterSettings) + func WFormatterOption(f Formatter) WriterOption + type WriterSettings struct Other modules containing this package github.com/zillow/zkafka