Documentation ¶
Index ¶
- Constants
- type Assignment
- type Client
- type ClientProvider
- type Config
- type ConsumerTopicConfig
- type DeserializationConfig
- type FakeClient
- type FakeMessage
- type Formatter
- type KReader
- type KWriter
- func (w *KWriter) Close()
- func (w *KWriter) Write(ctx context.Context, value any, opts ...WriteOption) (Response, error)
- func (w *KWriter) WriteKey(ctx context.Context, key string, value any, opts ...WriteOption) (Response, error)
- func (w *KWriter) WriteRaw(ctx context.Context, key *string, value []byte, opts ...WriteOption) (Response, error)
- type KafkaConsumer
- type KafkaProducer
- type LifecycleHooks
- type LifecycleHooksOption
- type LifecyclePostAckMeta
- type LifecyclePostProcessingMeta
- type LifecyclePostReadImmediateMeta
- type LifecyclePostReadMeta
- type LifecyclePreProcessingMeta
- type LifecyclePreWriteMeta
- type LifecyclePreWriteResp
- type Logger
- type Message
- type NoopLogger
- type Option
- func KafkaGroupPrefixOption(prefix string) Option
- func LoggerOption(logger Logger) Option
- func WithClientLifecycleHooks(h LifecycleHooks) Option
- func WithClientTextMapPropagator(p propagation.TextMapPropagator) Option
- func WithClientTracerProviderOption(tp trace.TracerProvider) Option
- func WithConsumerProvider(provider func(config map[string]any) (KafkaConsumer, error)) Option
- func WithProducerProvider(provider func(config map[string]any) (KafkaProducer, error)) Option
- type ProcessError
- type ProducerTopicConfig
- type Reader
- type ReaderOption
- type ReaderSettings
- type Response
- type SchemaRegistryConfig
- type SerializationConfig
- type TopicPartition
- type Work
- type WorkFactory
- type WorkFactoryOption
- type WorkOption
- func CircuitBreakAfter(times uint32) WorkOption
- func CircuitBreakFor(duration time.Duration) WorkOption
- func DisableBusyLoopBreaker() WorkOption (deprecated)
- func DisableCircuitBreaker() WorkOption (deprecated)
- func Speedup(times uint16) WorkOption
- func WithDeadLetterTopic(deadLetterTopicConfig ProducerTopicConfig) WorkOption
- func WithDisableBusyLoopBreaker(isDisabled bool) WorkOption
- func WithDisableCircuitBreaker(isDisabled bool) WorkOption
- func WithLifecycleHooks(h LifecycleHooks) WorkOption
- func WithOnDone(f func(ctx context.Context, message *Message, err error)) WorkOption
- func WithWorkTextMapPropagator(p propagation.TextMapPropagator) WorkOption
- type WriteOption
- type Writer
- type WriterOption
- type WriterSettings
Constants ¶
const (
    // CustomFmt indicates that the user would pass in their own Formatter later
    CustomFmt zfmt.FormatterType = "custom"
    // AvroSchemaRegistry uses confluent's schema registry. It encodes a schemaID as the first 5 bytes and then avro serializes (binary)
    // the remaining part of the payload. It is the successor to `avro_schema`, which ships with zfmt.
    AvroSchemaRegistry zfmt.FormatterType = "avro_schema_registry"
    // ProtoSchemaRegistry uses confluent's schema registry. It encodes a schemaID as well as the message type as
    // a payload prefix and then proto serializes (binary) the remaining part of the payload.
    // zfmt.ProtoSchemaDeprecatedFmt had a bug in its implementation and didn't work properly with confluent.
    ProtoSchemaRegistry zfmt.FormatterType = "proto_schema_registry"
    // JSONSchemaRegistry uses confluent's schema registry. It encodes a schemaID as the first 5 bytes and then json serializes (human readable)
    // the remaining part of the payload. It is the successor to `json_schema`, which ships with zfmt.
    JSONSchemaRegistry zfmt.FormatterType = "json_schema_registry"
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Assignment ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client helps instantiate usable readers and writers
func (*Client) Reader ¶
func (c *Client) Reader(_ context.Context, topicConfig ConsumerTopicConfig, opts ...ReaderOption) (Reader, error)
Reader gets a kafka consumer from the provided config, either from cache or from a new instance
func (*Client) Writer ¶
func (c *Client) Writer(_ context.Context, topicConfig ProducerTopicConfig, opts ...WriterOption) (Writer, error)
Writer gets a kafka producer from the provided config, either from cache or from a new instance
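For orientation, here is a minimal sketch of obtaining a writer from a Client and producing a message. It assumes a NewClient constructor and the module path github.com/zillow/zkafka/v2, neither of which appears in this index, and uses placeholder broker and topic names.

    package main

    import (
        "context"
        "log"

        "github.com/zillow/zkafka/v2"
    )

    func main() {
        ctx := context.Background()

        // NewClient is assumed here; the Config fields are documented above.
        client := zkafka.NewClient(zkafka.Config{
            BootstrapServers: []string{"localhost:9092"},
        })
        defer client.Close() // Close is part of the ClientProvider interface

        writer, err := client.Writer(ctx, zkafka.ProducerTopicConfig{
            ClientID: "example-producer", // required, unique; used as a cache key
            Topic:    "example-topic",
        })
        if err != nil {
            log.Fatal(err)
        }

        // Write marshals the value with the configured formatter (json by default).
        resp, err := writer.Write(ctx, map[string]string{"hello": "world"})
        if err != nil {
            log.Fatal(err)
        }
        log.Printf("partition=%d offset=%d", resp.Partition, resp.Offset)
    }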
type ClientProvider ¶
type ClientProvider interface {
    Reader(ctx context.Context, topicConfig ConsumerTopicConfig, opts ...ReaderOption) (Reader, error)
    Writer(ctx context.Context, topicConfig ProducerTopicConfig, opts ...WriterOption) (Writer, error)
    Close() error
}
ClientProvider is the convenient interface for kafka Client
type Config ¶
type Config struct {
    // BootstrapServers is a list of broker addresses
    BootstrapServers []string

    // SaslUsername and SaslPassword for accessing Kafka Cluster
    SaslUsername *string
    SaslPassword *string

    // CAFile, KeyFile, CertFile are used to enable TLS with valid configuration.
    // If not defined, TLS with InsecureSkipVerify=false is used.
    CAFile   string
    KeyFile  string
    CertFile string
}
Config holds configuration to create underlying kafka client
type ConsumerTopicConfig ¶
type ConsumerTopicConfig struct {
    // ClientID is required and should be unique. This is used as a cache key for the client
    ClientID string

    // GroupID is required for observability per ZG Kafka Best Practices
    // http://analytics.pages.zgtools.net/data-engineering/data-infra/streamz/docs/#/guides/kafka-guidelines?id=observability
    // The convention is [team_name]/[service]/[group], e.g. concierge/search/index-reader
    GroupID string

    // Topic is the name of the topic to be consumed. At least one of Topic and Topics should be specified
    Topic string

    // Topics are the names of the topics to be consumed. At least one of Topic and Topics should be specified
    Topics []string

    // BootstrapServers are the addresses of the possible brokers to be connected to.
    // If not defined, Reader and Writer will attempt to use the brokers defined by the client
    BootstrapServers []string

    // AutoCommitIntervalMs is a setting which indicates how often offsets will be committed to the kafka broker.
    AutoCommitIntervalMs *int

    // AdditionalProps is defined as an escape hatch to specify properties not specified as strongly typed fields.
    // The values here will be overwritten by the values of TopicConfig fields if specified there as well.
    AdditionalProps map[string]interface{}

    // Formatter is json if not defined
    Formatter zfmt.FormatterType

    SchemaRegistry SchemaRegistryConfig

    // SchemaID defines the schema registered with Confluent schema Registry.
    // Default value is 0, and it implies that both Writer and Reader do not care about schema validation
    // and should encode/decode the message based on the data type provided.
    // Currently, this only works with SchematizedAvroFormatter
    SchemaID int

    // Transaction enables kafka transactions; defaults to false
    Transaction bool

    // ReadTimeoutMillis specifies how much time, in milliseconds, before a kafka read times out (and an error is returned)
    ReadTimeoutMillis *int

    // ProcessTimeoutMillis specifies how much time, in milliseconds,
    // is given to process a particular message before cancellation is called.
    // Defaults to 1 minute
    ProcessTimeoutMillis *int

    // SessionTimeoutMillis specifies how much time, in milliseconds,
    // is given by the broker, where in the absence of a heartbeat being successfully received from the consumer
    // group member, the member is considered failed (and a rebalance is initiated).
    // Defaults to 1 minute 1 second (just over the default `ProcessTimeoutMillis`)
    SessionTimeoutMillis *int

    // MaxPollIntervalMillis specifies how much time, in milliseconds,
    // is given by the broker, where in the absence of `Read`/`Poll` being called by a consumer, the member is considered failed (and a rebalance is initiated).
    // Defaults to 1 minute 1 second (just over the default `ProcessTimeoutMillis`)
    MaxPollIntervalMillis *int

    // ProcessDelayMillis specifies how much time, in milliseconds,
    // a virtual partition processor should pause prior to calling the processor.
    // The specified duration represents the maximum pause a processor will execute. The virtual partition processor
    // uses the message's timestamp and its local estimate of `now` to determine
    // the observed delay. If the observed delay is less than the amount configured here,
    // an additional pause is executed.
    ProcessDelayMillis *int

    // SaslUsername and SaslPassword for accessing Kafka Cluster
    SaslUsername *string
    SaslPassword *string

    // DeadLetterTopicConfig allows you to specify a topic to which messages that fail during processing are written
    DeadLetterTopicConfig *ProducerTopicConfig
}
ConsumerTopicConfig holds configuration to create reader for a kafka topic
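An illustrative ConsumerTopicConfig literal using the fields above; topic, group, and timeout values are placeholders, and intPtr is a hypothetical helper that returns the address of an int.

    cfg := zkafka.ConsumerTopicConfig{
        ClientID: "search-indexer",                // required, unique; used as a cache key
        GroupID:  "concierge/search/index-reader", // convention: [team_name]/[service]/[group]
        Topic:    "search-index-updates",
        // Formatter defaults to json when left unset.
        ProcessTimeoutMillis: intPtr(60_000), // cancel processing of a single message after 1 minute
        DeadLetterTopicConfig: &zkafka.ProducerTopicConfig{
            ClientID: "search-indexer-dlt",
            Topic:    "search-index-updates-dlt",
        },
    }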
func (ConsumerTopicConfig) GetFormatter ¶
func (p ConsumerTopicConfig) GetFormatter() zfmt.FormatterType
func (ConsumerTopicConfig) GetSchemaID ¶
func (p ConsumerTopicConfig) GetSchemaID() int
type DeserializationConfig ¶
type DeserializationConfig struct {
    // Schema is used exclusively by the avro schema registry formatter today. It's necessary to provide proper schema evolution properties
    // expected by typical use cases.
    Schema string
}
type FakeClient ¶
FakeClient is a convenience struct for testing purposes. It allows the specification of your own Reader/Writer while implementing the `ClientProvider` interface, which makes it compatible with a work factory.
func (FakeClient) Close ¶
func (f FakeClient) Close() error
func (FakeClient) Reader ¶
func (f FakeClient) Reader(_ context.Context, _ ConsumerTopicConfig, _ ...ReaderOption) (Reader, error)
func (FakeClient) Writer ¶
func (f FakeClient) Writer(_ context.Context, _ ProducerTopicConfig, _ ...WriterOption) (Writer, error)
type FakeMessage ¶
type FakeMessage struct {
    Key   *string
    Value []byte
    // ValueData allows the specification of a serializable instance and uses the provided formatter
    // to create ValueData. Any error during serialization is ignored.
    ValueData any
    DoneFunc  func(ctx context.Context)
    Headers   map[string][]byte
    Offset    int64
    Partition int32
    Topic     string
    GroupID   string
    TimeStamp time.Time
    Fmt       zfmt.Formatter
}
FakeMessage can be used during testing to construct Message objects. The Message object has private fields which tests might need to set.
type Formatter ¶
Formatter allows the user to extend formatting capability to unsupported data types
type KReader ¶
type KReader struct {
// contains filtered or unexported fields
}
KReader is a Reader implementation which allows for the subscription to multiple topics. It provides methods for consuming messages from its subscribed topics and assigned partitions.
func (*KReader) Assignments ¶
func (r *KReader) Assignments(_ context.Context) ([]Assignment, error)
Assignments returns the current partition assignments for the kafka consumer
type KWriter ¶
type KWriter struct {
// contains filtered or unexported fields
}
KWriter is a kafka producer. KWriter should be initialized from the Client to be usable
func (*KWriter) Close ¶
func (w *KWriter) Close()
Close terminates the writer gracefully and marks it as closed
func (*KWriter) Write ¶
Write sends messages to kafka with message key set as nil. The value arg passed to this method is marshalled by the configured formatter and used as the kafka message's value
func (*KWriter) WriteKey ¶
func (w *KWriter) WriteKey(ctx context.Context, key string, value any, opts ...WriteOption) (Response, error)
WriteKey sends a message to kafka with the specified key. The value arg passed to this method is marshalled by the configured formatter and used as the kafka message's value
func (*KWriter) WriteRaw ¶
func (w *KWriter) WriteRaw(ctx context.Context, key *string, value []byte, opts ...WriteOption) (Response, error)
WriteRaw allows you to write messages using a lower level API than Write and WriteKey. WriteRaw doesn't use a formatter to marshal the value data; instead it takes the bytes as-is and places them as the value of the kafka message. It's convenient for forwarding messages in dead letter operations.
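A sketch contrasting the three write methods (w is a Writer obtained from the Client; the event type is illustrative, and package/import clauses are elided):

    type event struct {
        ID   string `json:"id"`
        Name string `json:"name"`
    }

    func produce(ctx context.Context, w zkafka.Writer) error {
        // Write: nil key; the value is marshalled by the configured formatter.
        if _, err := w.Write(ctx, event{ID: "1", Name: "created"}); err != nil {
            return err
        }
        // WriteKey: same as Write, but with an explicit message key (used for partitioning).
        if _, err := w.WriteKey(ctx, "user-42", event{ID: "2", Name: "updated"}); err != nil {
            return err
        }
        // WriteRaw: bypasses the formatter; the caller supplies the value bytes as-is.
        key := "user-42"
        _, err := w.WriteRaw(ctx, &key, []byte(`{"id":"3","name":"deleted"}`))
        return err
    }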
type KafkaConsumer ¶
type KafkaConsumer interface {
    SubscribeTopics(topics []string, rebalanceCb kafka.RebalanceCb) error
    ReadMessage(timeout time.Duration) (*kafka.Message, error)
    Commit() ([]kafka.TopicPartition, error)
    StoreOffsets(offsets []kafka.TopicPartition) (storedOffsets []kafka.TopicPartition, err error)
    Close() error
    Assignment() (partitions []kafka.TopicPartition, err error)
    AssignmentLost() bool
}
type KafkaProducer ¶
type LifecycleHooks ¶
type LifecycleHooks struct {
    // Called by work after reading a message (guaranteed non nil); offers the ability to customize the context object
    // (the resulting context object is passed to the work processor)
    PostRead func(ctx context.Context, meta LifecyclePostReadMeta) (context.Context, error)

    // Called by work immediately after an attempt to read a message. Msg might be nil, if there was an error
    // or no available messages.
    PostReadImmediate func(ctx context.Context, meta LifecyclePostReadImmediateMeta)

    // Called after receiving a message and before processing it.
    PreProcessing func(ctx context.Context, meta LifecyclePreProcessingMeta) (context.Context, error)

    // Called after processing a message
    PostProcessing func(ctx context.Context, meta LifecyclePostProcessingMeta) error

    // Called after sending a message to the queue
    PostAck func(ctx context.Context, meta LifecyclePostAckMeta) error

    // Called prior to executing a write operation
    PreWrite func(ctx context.Context, meta LifecyclePreWriteMeta) (LifecyclePreWriteResp, error)

    // Called after the reader attempts a fanOut call.
    PostFanout func(ctx context.Context)
}
func ChainLifecycleHooks ¶
func ChainLifecycleHooks(hooks ...LifecycleHooks) LifecycleHooks
ChainLifecycleHooks chains multiple lifecycle hooks into one. The hooks are called in the order they are passed. All hooks are called, even when errors occur. Errors are accumulated in a wrapper error and returned to the caller.
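An illustrative chaining of two hook sets (the metric and log calls are stand-ins; package/import clauses are elided):

    metricHooks := zkafka.LifecycleHooks{
        PostProcessing: func(ctx context.Context, meta zkafka.LifecyclePostProcessingMeta) error {
            // record processing latency / outcome here
            return nil
        },
    }
    logHooks := zkafka.LifecycleHooks{
        PostFanout: func(ctx context.Context) {
            // log that a fanout pass completed
        },
    }
    hooks := zkafka.ChainLifecycleHooks(metricHooks, logHooks)
    // hooks can now be passed to WithClientLifecycleHooks, WithWorkLifecycleHooks, or WithLifecycleHooks.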
type LifecycleHooksOption ¶
type LifecycleHooksOption struct {
// contains filtered or unexported fields
}
type LifecyclePostAckMeta ¶
type LifecyclePostReadMeta ¶
type LifecyclePreWriteMeta ¶
type LifecyclePreWriteMeta struct{}
type LifecyclePreWriteResp ¶
type Logger ¶
type Logger interface {
    Debugw(ctx context.Context, msg string, keysAndValues ...any)
    Infow(ctx context.Context, msg string, keysAndValues ...any)
    Warnw(ctx context.Context, msg string, keysAndValues ...any)
    Errorw(ctx context.Context, msg string, keysAndValues ...any)
}
Logger is the interface that wraps basic logging functions
type Message ¶
type Message struct {
    Key       string
    Headers   map[string][]byte
    Offset    int64
    Partition int32
    Topic     string
    GroupID   string
    TimeStamp time.Time
    // contains filtered or unexported fields
}
Message is a container for a kafka message
func GetMsgFromFake ¶
func GetMsgFromFake(input *FakeMessage) *Message
GetMsgFromFake allows the construction of a Message object (allowing the specification of some private fields).
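A sketch of a unit test that builds a Message for a processor under test (processSearchEvent is hypothetical and not part of this package; package/import clauses are elided):

    func TestProcessSearchEvent(t *testing.T) {
        key := "user-42"
        msg := zkafka.GetMsgFromFake(&zkafka.FakeMessage{
            Key:       &key,
            Value:     []byte(`{"id":"1"}`), // raw value bytes; no formatter involved
            Topic:     "search-index-updates",
            Partition: 3,
            Offset:    100,
        })
        if err := processSearchEvent(context.Background(), msg); err != nil {
            t.Fatalf("unexpected error: %v", err)
        }
    }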
func (*Message) Done ¶
func (m *Message) Done()
Done is used to alert that message processing has completed. This marks the message offset to be committed
func (*Message) DoneWithContext ¶
DoneWithContext is used to alert that message processing has completed. This marks the message offset to be committed
type NoopLogger ¶
type NoopLogger struct{}
type Option ¶
type Option func(*Client)
Option is a function that modifies the client configurations
func KafkaGroupPrefixOption ¶
KafkaGroupPrefixOption creates a groupPrefix which will be added to the groupID strings of all consumers and producers created after this option is added
func LoggerOption ¶
LoggerOption applies logger to the client and to all writers/readers which are created after this call.
func WithClientLifecycleHooks ¶
func WithClientLifecycleHooks(h LifecycleHooks) Option
func WithClientTextMapPropagator ¶
func WithClientTextMapPropagator(p propagation.TextMapPropagator) Option
WithClientTextMapPropagator applies an otel TextMapPropagator to the client and to all writers/readers which are created
func WithClientTracerProviderOption ¶
func WithClientTracerProviderOption(tp trace.TracerProvider) Option
WithClientTracerProviderOption applies an otel tracer provider to the client and to all writers/readers which are created
func WithConsumerProvider ¶
func WithConsumerProvider(provider func(config map[string]any) (KafkaConsumer, error)) Option
WithConsumerProvider allows for the specification of a factory which is responsible for returning a KafkaConsumer given a config map.
func WithProducerProvider ¶
func WithProducerProvider(provider func(config map[string]any) (KafkaProducer, error)) Option
WithProducerProvider allows for the specification of a factory which is responsible for returning a KafkaProducer given a config map.
type ProcessError ¶
type ProcessError struct {
    // Err is the actual error that the processor encountered.
    Err error
    // DisableCircuitBreak indicates that this error should be ignored for
    // purposes of managing the circuit breaker. Any returned errors where
    // this is set to true will not cause the processing of messages to slow.
    DisableCircuitBreak bool
    // DisableDLTWrite indicates that this message should not be written to
    // a dead letter topic (if one is configured) as it cannot be retried
    // successfully.
    DisableDLTWrite bool
}
ProcessError wraps an error that a processor encounters, while also exposing controls that allow for specifying how the error should be handled.
func (ProcessError) Error ¶
func (p ProcessError) Error() string
func (ProcessError) Unwrap ¶
func (p ProcessError) Unwrap() error
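An illustrative processor that uses ProcessError to suppress circuit breaking for a known-bad payload while still allowing a dead letter write (handle and errBadPayload are hypothetical; package/import clauses are elided):

    func process(ctx context.Context, msg *zkafka.Message) error {
        err := handle(ctx, msg) // hypothetical business logic
        if err == nil {
            return nil
        }
        if errors.Is(err, errBadPayload) {
            return zkafka.ProcessError{
                Err:                 err,
                DisableCircuitBreak: true,  // a malformed payload shouldn't slow the whole worker
                DisableDLTWrite:     false, // still forward it to the dead letter topic
            }
        }
        return err
    }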
type ProducerTopicConfig ¶
type ProducerTopicConfig struct {
    // ClientID is required and should be unique. This is used as a cache key for the client
    ClientID string

    // Topic is required
    Topic string

    // BootstrapServers are the addresses of the possible brokers to be connected to.
    // If not defined, Reader and Writer will attempt to use the brokers defined by the client
    BootstrapServers []string

    // DeliveryTimeoutMs is a librdkafka setting. Local message timeout.
    // This value is only enforced locally and limits the time a produced message waits for successful delivery.
    // A time of 0 is infinite. This is the maximum time librdkafka may use to deliver a message (including retries).
    // A delivery error occurs when either the retry count or the message timeout is exceeded.
    // See defaults in the librdkafka configuration
    DeliveryTimeoutMs *int

    // AdditionalProps is defined as an escape hatch to specify properties not specified as strongly typed fields.
    // The values here will be overwritten by the values of TopicConfig fields if specified there as well.
    AdditionalProps map[string]interface{}

    // Formatter is json if not defined
    Formatter zfmt.FormatterType

    // SchemaRegistry provides details about connecting to a schema registry, including its URL,
    // as well as others.
    SchemaRegistry SchemaRegistryConfig

    // SchemaID defines the schema registered with Confluent schema Registry.
    // Default value is 0, and it implies that both Writer and Reader do not care about schema validation
    // and should encode/decode the message based on the data type provided.
    // Currently, this only works with SchematizedAvroFormatter
    SchemaID int

    // Transaction enables kafka transactions; defaults to false
    Transaction bool

    // RequestRequiredAcks indicates the number of acknowledgments the leader broker must receive from In Sync Replica (ISR) brokers before responding
    // to the request (0=broker does not send any response to client, -1 or all=broker blocks until all ISRs commit)
    RequestRequiredAcks *string

    // EnableIdempotence, when set to true, ensures that messages are successfully produced exactly once and in the original produce order.
    // Defaults to true
    EnableIdempotence *bool

    // LingerMillis specifies the delay, in milliseconds, to wait for messages in the producer to accumulate before constructing message batches.
    LingerMillis int

    NagleDisable *bool

    // SaslUsername and SaslPassword for accessing Kafka Cluster
    SaslUsername *string
    SaslPassword *string
}
ProducerTopicConfig holds configuration to create writer to kafka topic
func (ProducerTopicConfig) GetFormatter ¶
func (p ProducerTopicConfig) GetFormatter() zfmt.FormatterType
func (ProducerTopicConfig) GetSchemaID ¶
func (p ProducerTopicConfig) GetSchemaID() int
type ReaderOption ¶
type ReaderOption func(*ReaderSettings)
ReaderOption is a function that modifies the KReader configurations
func RFormatterOption ¶
func RFormatterOption(formatter Formatter) ReaderOption
RFormatterOption sets the formatter for this reader
type ReaderSettings ¶
type ReaderSettings struct {
// contains filtered or unexported fields
}
type Response ¶
Response is a kafka response with the Partition where message was sent to along with its assigned Offset
type SchemaRegistryConfig ¶
type SchemaRegistryConfig struct {
    // URL is the schema registry URL. During serialization and deserialization
    // the schema registry is checked against to confirm schema compatibility.
    URL string
    // Serialization provides additional information used by schema registry formatters during serialization (data write)
    Serialization SerializationConfig
    // Deserialization provides additional information used by schema registry formatters during deserialization (data read)
    Deserialization DeserializationConfig
    // SubjectName allows the specification of the SubjectName. If not specified, defaults to the [topic name strategy](https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#subject-name-strategy)
    SubjectName string
}
type SerializationConfig ¶
type SerializationConfig struct {
    // AutoRegisterSchemas indicates whether new schemas (those that evolve existing schemas or are brand new) should be registered
    // with schema registry dynamically. This feature is typically not used for production workloads
    AutoRegisterSchemas bool
    // Schema is used exclusively by the avro schema registry formatter today. It's necessary to provide proper schema evolution properties
    // expected by typical use cases.
    Schema string
}
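An illustrative producer config wiring the avro schema registry formatter (the URL and avroSchemaJSON are placeholders; package/import clauses are elided):

    pcfg := zkafka.ProducerTopicConfig{
        ClientID:  "example-producer",
        Topic:     "example-topic",
        Formatter: zkafka.AvroSchemaRegistry,
        SchemaRegistry: zkafka.SchemaRegistryConfig{
            URL: "http://localhost:8081",
            Serialization: zkafka.SerializationConfig{
                AutoRegisterSchemas: true,           // typically not used for production workloads
                Schema:              avroSchemaJSON, // placeholder: the writer schema as a string
            },
        },
    }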
type TopicPartition ¶
type Work ¶
type Work struct {
// contains filtered or unexported fields
}
Work has a single public method `Run()` which continuously reads and processes messages from the topic (or topics) it is registered to listen to. `Run()` executes the following steps:
- Read a kafka.Message using the provided reader.
- Select the virtual partition pool allocated for a specific topic
- Select and write the `kafka.Message` to the pool's virtual partition based on a hash of the `kafka.Message.Key` (virtual partition selection)
- A goroutine is assigned for each virtual partition. Its responsibility is to continuously read from its virtual partition, call the Process callback function, and then store the offset of the message.
Additional responsibilities include:
- Logging
- Executing callbacks of special lifecycle events (useful for observability like tracing/metrics)
- Tripping circuit breaker for error conditions
- Writing to dead letter topic, when configured.
func (*Work) Run ¶
Run executes a pipeline with a single reader (possibly subscribed to multiple topics) fanning out read messages to virtual partitions (preserving message order) and subsequently being processed by the registered processor (user code which executes per kafka message).
It returns either after context.Context cancellation or after receiving a shutdown signal from settings (both of which will cause the awaited execReader and execProcessor methods to return).
type WorkFactory ¶
type WorkFactory struct {
// contains filtered or unexported fields
}
WorkFactory creates a work object which reads messages from kafka topic and processes messages concurrently.
func NewWorkFactory ¶
func NewWorkFactory(kafkaProvider ClientProvider, options ...WorkFactoryOption) WorkFactory
NewWorkFactory initializes a new WorkFactory
func (WorkFactory) Create ¶
func (f WorkFactory) Create(topicConfig ConsumerTopicConfig, processor processor, options ...WorkOption) *Work
Create creates a new Work instance.
func (WorkFactory) CreateWithFunc ¶
func (f WorkFactory) CreateWithFunc(topicConfig ConsumerTopicConfig, p func(_ context.Context, msg *Message) error, options ...WorkOption) *Work
CreateWithFunc creates a new Work instance, but allows for the processor to be specified as a callback function instead of an interface
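A sketch of wiring a Work instance via the factory (client is a ClientProvider as created in the earlier sketch; Run's exact signature is not reproduced in this index, so it is only referenced in a comment; package/import clauses are elided):

    factory := zkafka.NewWorkFactory(client)
    work := factory.CreateWithFunc(
        zkafka.ConsumerTopicConfig{
            ClientID: "example-consumer",
            GroupID:  "team/service/example-consumer",
            Topic:    "example-topic",
        },
        func(ctx context.Context, msg *zkafka.Message) error {
            // business logic per message; offsets are stored by the virtual partition processor
            return nil
        },
        zkafka.CircuitBreakAfter(10),
        zkafka.CircuitBreakFor(30*time.Second),
    )
    _ = work // start the read/process loop with work.Run (see above)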
type WorkFactoryOption ¶
type WorkFactoryOption interface {
// contains filtered or unexported methods
}
WorkFactoryOption interface to identify functional options
func WithLogger ¶
func WithLogger(l Logger) WorkFactoryOption
WithLogger provides option to override the logger to use. Default is noop.
func WithTextMapPropagator ¶
func WithTextMapPropagator(p propagation.TextMapPropagator) WorkFactoryOption
WithTextMapPropagator provides option to specify the otel text map propagator used by the created work object. Defaults to nil (which means no propagation across transports)
func WithTracerProvider ¶
func WithTracerProvider(tp trace.TracerProvider) WorkFactoryOption
WithTracerProvider provides option to specify the otel tracer provider used by the created work object. Defaults to nil (which means no tracing)
func WithWorkLifecycleHooks ¶
func WithWorkLifecycleHooks(h LifecycleHooks) WorkFactoryOption
WithWorkLifecycleHooks provides option to override the lifecycle hooks. Default is noop.
type WorkOption ¶
type WorkOption interface {
// contains filtered or unexported methods
}
WorkOption interface to identify functional options
func CircuitBreakAfter ¶
func CircuitBreakAfter(times uint32) WorkOption
CircuitBreakAfter breaks the circuit after this many consecutive failures
func CircuitBreakFor ¶
func CircuitBreakFor(duration time.Duration) WorkOption
CircuitBreakFor sets the duration for which to keep the circuit open once broken
func DisableBusyLoopBreaker (deprecated) ¶
func DisableBusyLoopBreaker() WorkOption
Deprecated: DisableBusyLoopBreaker disables the busy loop breaker, which would otherwise block subsequent read calls until the circuit re-closes. Without the busy loop breaker, increased CPU usage is observed while the circuit is open.
func DisableCircuitBreaker (deprecated) ¶
func DisableCircuitBreaker() WorkOption
Deprecated: DisableCircuitBreaker disables the circuit breaker so that it never breaks
func Speedup ¶
func Speedup(times uint16) WorkOption
Speedup increases the concurrencyFactor for a worker. concurrencyFactor is how many goroutines can be running in parallel. NOTE: it's strongly recommended to add more worker instances rather than using this option to speed up each worker.
func WithDeadLetterTopic ¶
func WithDeadLetterTopic(deadLetterTopicConfig ProducerTopicConfig) WorkOption
WithDeadLetterTopic allows you to specify a dead letter topic to forward messages to when work processing fails
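An illustrative option list routing failed messages to a dead letter topic and observing completions (factory, consumerCfg, and processMsg are assumed to be defined as in the earlier sketches):

    work := factory.CreateWithFunc(
        consumerCfg,
        processMsg,
        zkafka.WithDeadLetterTopic(zkafka.ProducerTopicConfig{
            ClientID: "example-consumer-dlt",
            Topic:    "example-topic-dlt",
        }),
        zkafka.WithOnDone(func(ctx context.Context, msg *zkafka.Message, err error) {
            // emit a metric or log entry per processed message
        }),
    )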
func WithDisableBusyLoopBreaker ¶
func WithDisableBusyLoopBreaker(isDisabled bool) WorkOption
WithDisableBusyLoopBreaker disables the busy loop breaker, which would otherwise block subsequent read calls until the circuit re-closes. Without the busy loop breaker, increased CPU usage is observed while the circuit is open.
func WithDisableCircuitBreaker ¶
func WithDisableCircuitBreaker(isDisabled bool) WorkOption
WithDisableCircuitBreaker allows the user to control whether circuit breaker is disabled or not
func WithLifecycleHooks ¶
func WithLifecycleHooks(h LifecycleHooks) WorkOption
func WithOnDone ¶
func WithOnDone(f func(ctx context.Context, message *Message, err error)) WorkOption
WithOnDone allows you to specify a callback function executed after processing of a kafka message
func WithWorkTextMapPropagator ¶ added in v2.2.0
func WithWorkTextMapPropagator(p propagation.TextMapPropagator) WorkOption
WithWorkTextMapPropagator enables the specification of a propagator at the moment a work instance is instantiated
type WriteOption ¶
type WriteOption interface {
// contains filtered or unexported methods
}
WriteOption is a function that modifies the kafka.Message to be transmitted
func WithHeaders ¶
func WithHeaders(headers map[string]string) WriteOption
WithHeaders allows for the specification of headers. Specified headers will override collisions.
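A sketch attaching headers to a single write (writer, ctx, and payload come from the earlier sketches; the header values are placeholders):

    _, err := writer.WriteKey(ctx, "user-42", payload,
        zkafka.WithHeaders(map[string]string{
            "trace-id":     "abc-123",
            "content-type": "application/json",
        }),
    )
    if err != nil {
        log.Fatal(err)
    }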
type Writer ¶
type Writer interface {
    // Write sends messages to kafka with the message key set as nil.
    // The value arg passed to this method is marshalled by
    // the configured formatter and used as the kafka message's value
    Write(ctx context.Context, value any, opts ...WriteOption) (Response, error)
    // WriteKey sends a message to kafka with the specified key.
    // The value arg passed to this method is marshalled by
    // the configured formatter and used as the kafka message's value
    WriteKey(ctx context.Context, key string, value any, opts ...WriteOption) (Response, error)
    // WriteRaw sends messages to kafka. The caller is responsible for marshalling the data themselves.
    WriteRaw(ctx context.Context, key *string, value []byte, opts ...WriteOption) (Response, error)
    Close()
}
Writer is the convenient interface for kafka KWriter
type WriterOption ¶
type WriterOption func(*WriterSettings)
WriterOption is a function that modifies the writer configurations
func WFormatterOption ¶
func WFormatterOption(f Formatter) WriterOption
WFormatterOption sets the formatter for this writer
type WriterSettings ¶
type WriterSettings struct {
    DisableTracePropagation bool
    // contains filtered or unexported fields
}
Source Files ¶
Directories ¶
Path | Synopsis
---|---
example |
| Package mock_zkafka is a generated GoMock package.
confluent | Package mock_confluent is a generated GoMock package.
test |
evolution/avro1 | Code generated by avro/gen.
evolution/avro2 | Code generated by avro/gen.