Documentation ¶
Index ¶
- Constants
- Variables
- func CtxIsDLQEnabled(ctx context.Context) bool
- func CtxIsDLQProcess(ctx context.Context) bool
- func SkipAppend(skip SkipMap) func(SkipMap) SkipMap
- func SkipReplace(skip SkipMap) func(SkipMap) SkipMap
- func ToPtr[T any](v T) *T
- type CallBackFunc
- type Client
- func (c *Client) AddTrigger(t func(context.Context))
- func (c *Client) Admin() *kadm.Client
- func (c *Client) AppName() string
- func (c *Client) Brokers() []string
- func (c *Client) ClientID() []byte
- func (c *Client) Close()
- func (c *Client) Consume(ctx context.Context, callback CallBackFunc, opts ...OptionConsumer) error
- func (c *Client) DLQRecord() DLQRecord
- func (c *Client) DLQRetry(opts ...OptionDLQTriggerFn)
- func (c *Client) DLQTopics() []string
- func (c *Client) GroupID() string
- func (c *Client) Logger() Logger
- func (c *Client) ProduceRaw(ctx context.Context, records []*kgo.Record) error
- func (c *Client) Skip(ctx context.Context, modify func(SkipMap) SkipMap)
- func (c *Client) SkipCheck() SkipMap
- func (c *Client) Topics() []string
- type Config
- type ConsumerConfig
- type ConsumerPreConfig
- type DLQConfig
- type DLQError
- type DLQRecord
- type DLQTriggerSpec
- type GroupIDValidation
- type Header
- type LogNoop
- type Logger
- type Meter
- type OffsetConfig
- type Option
- func WithAppName(appName string) Option
- func WithAutoTopicCreation(v bool) Option
- func WithClientID(clientID string) Option
- func WithClientInfo(appName, version string) Option
- func WithConsumer(cfg ConsumerConfig) Option
- func WithKGOOptions(opts ...kgo.Opt) Option
- func WithKGOOptionsDLQ(opts ...kgo.Opt) Option
- func WithLogger(logger Logger) Option
- func WithMeter(m Meter) Option
- func WithPing(v bool) Option
- func WithPingBackoff(b backoff.BackOff) Option
- func WithPingRetry(v bool) Option
- func WithPlugin[T any](name string, fn PluginFunc[T]) Option
- type OptionConsumer
- type OptionDLQTrigger
- type OptionDLQTriggerFn
- type OptionProducer
- type PluginFunc
- type Producer
- type Record
- type SalsConfig
- type SaslConfigs
- type SaslPlain
- type SaslSCRAM
- type SecurityConfig
- type SkipMap
- type TLSConfig
- type Validation
Constants ¶
const ( // KeyRecord is the context key for *Record. KeyRecord ctxKey = "kafka_record" // KeyIsDLQProcess is the context key for is DLQ. KeyIsDLQProcess ctxKey = "kafka_is_dlq" // KeyIsDLQEnabled is the context key for is DLQ enabled. KeyIsDLQEnabled ctxKey = "kafka_is_dlq_enabled" )
const HeaderServiceKey = "service"
Variables ¶
var ( DefaultRetryInterval = 10 * time.Second DefaultRetryMaxInterval = 15 * time.Minute )
var ( // ErrSkip is use to skip message in the PreCheck hook or Decode function. ErrSkip = errors.New("skip message") // ErrDLQ use with callback function to send message to DLQ topic. // Prefer to use WrapErrDLQ to wrap error. ErrDLQ = errors.New("error DLQ") )
var ( ScramSha256 = "SCRAM-SHA-256" ScramSha512 = "SCRAM-SHA-512" )
var DefaultBatchCount = 100
DefaultBatchCount is default batch count for batch consumer, if not set.
var (
DefaultClientID string
)
var ErrConnection = errors.New("connect to kafka brokers failed")
Functions ¶
func CtxIsDLQEnabled ¶ added in v0.2.0
CtxIsDLQEnabled is usable in the callback function to check whether DLQ is enabled.
- If the context is nil, or the KeyIsDLQEnabled is not set, false is returned.
func CtxIsDLQProcess ¶ added in v0.2.0
CtxIsDLQProcess is usable in the callback function to check whether a DLQ message is being processed.
- If the context is nil, or the KeyIsDLQProcess is not set, false is returned.
func SkipAppend ¶ added in v0.3.0
func SkipReplace ¶ added in v0.3.0
Types ¶
type CallBackFunc ¶ added in v0.1.0
type CallBackFunc func(*optionConsumer) error
func WithCallback ¶ added in v0.0.7
func WithCallback[T any](fn func(ctx context.Context, msg T) error) CallBackFunc
WithCallback to set wkafka consumer's callback function.
- Default is json.Unmarshal, use WithDecode option to add custom decode function.
- If []byte then default decode function will be skipped.
func WithCallbackBatch ¶ added in v0.0.7
func WithCallbackBatch[T any](fn func(ctx context.Context, msg []T) error) CallBackFunc
WithCallbackBatch to set wkafka consumer's callback function.
- Default is json.Unmarshal, use WithDecode option to add custom decode function.
- If [][]byte then default decode function will be skipped.
type Client ¶
type Client struct { Kafka *kgo.Client KafkaDLQ *kgo.Client // contains filtered or unexported fields }
func (*Client) AddTrigger ¶ added in v0.4.0
func (*Client) Consume ¶
func (c *Client) Consume(ctx context.Context, callback CallBackFunc, opts ...OptionConsumer) error
Consume starts consuming messages from kafka and blocks until context is done or an error occurs.
- Only works if client is created with consumer config.
- Run it only once.
func (*Client) DLQRecord ¶ added in v0.3.5
DLQRecord returns the stuck DLQRecord, if one exists.
- Warning: it returns a pointer to the record; do not modify it.
func (*Client) DLQRetry ¶ added in v0.4.1
func (c *Client) DLQRetry(opts ...OptionDLQTriggerFn)
DLQRetry triggers a DLQ retry without waiting for the sleep interval.
func (*Client) ProduceRaw ¶
ProduceRaw sends raw records to kafka. For a typed producer, see wkafka.NewProducer.
func (*Client) Skip ¶ added in v0.3.0
Skip for modifying skip configuration in runtime.
- Useful for DLQ topic.
- Don't wait inside the modify function.
type Config ¶
type Config struct { // Brokers is a list of kafka brokers to connect to. // Not all brokers need to be specified, the list is so that // if one broker is unavailable, another can be used. // Required at least one broker. Example value is 'localhost:9092'. Brokers []string `cfg:"brokers" json:"brokers"` Security SecurityConfig `cfg:"security" json:"security"` // Compressions is chosen in the order preferred based on broker support. // The default is to use no compression. // Available: // - gzip // - snappy // - lz4 // - zstd Compressions []string `cfg:"compressions" json:"compressions"` // Consumer is a pre configuration for consumer and validation. Consumer ConsumerPreConfig `cfg:"consumer" json:"consumer"` // Plugins add custom plugins to the client like handler. Plugins map[string]interface{} `cfg:"plugins" json:"plugins"` }
type ConsumerConfig ¶ added in v0.0.7
type ConsumerConfig struct { // Topics is a list of kafka topics to consume. // Required at least one topic, topic name if not exist will be created or consumer waits for topic creation. Topics []string `cfg:"topics" json:"topics"` // GroupID is the kafka consumer group ID, unique per application. // All created with the same ID will not be able to see messages that another consumer // is consuming. If a message is committed and that consumer fails for some reason, // another consumer with the same group ID will pick up where the other left off. // Required. GroupID string `cfg:"group_id" json:"group_id"` // StartOffset is used when there is no committed offset for GroupID. // // Available options: // 0 : Start consuming from the earliest offset. // -1 : Start consuming from the latest offset. // 0 < n : Start consuming from the offset n. StartOffset int64 `cfg:"start_offset" json:"start_offset"` // Skip are optional message offsets to be skipped and not streamed by the consumer callback. // // This is a programmatic feature and message will come and check by the library. // // The format is a map of topic to partition to offsets. // mytopic: // topic name // 0: // partition number // offsets: // list of offsets to skip // - 31 // - 90 // before: 20 // skip all offsets before or equal to this offset Skip map[string]map[int32]OffsetConfig `cfg:"skip" json:"skip"` // MaxPollRecords is the maximum number of records returned in a single call to poll. // - Default is max.poll.records in the broker configuration, usually 500. // - Fetching messages from broker, this is not related with batch processing! MaxPollRecords int `cfg:"max_poll_records" json:"max_poll_records"` // BatchCount is a number of messages processed in a single batch. // - Processing count could be less than BatchCount if the batch is not full. // - Usable with WithConsumerBatch // - Default is 100. BatchCount int `cfg:"batch_count" json:"batch_count"` // DLQ is a dead letter queue configuration. 
DLQ DLQConfig `cfg:"dlq" json:"dlq"` }
type ConsumerPreConfig ¶ added in v0.0.7
type ConsumerPreConfig struct { // PrefixGroupID add prefix to group_id. PrefixGroupID string `cfg:"prefix_group_id" json:"prefix_group_id"` // FormatDLQTopic is a format string to generate DLQ topic name. // - Example is "finops_{{.AppName}}_dlq" // - It should be exist if DLQ is enabled and topic is not set. // // - Available variables: // - AppName FormatDLQTopic string `cfg:"format_dlq_topic" json:"format_dlq_topic"` // Validation is a configuration for validation when consumer initialized. Validation Validation `cfg:"validation" json:"validation"` }
type DLQConfig ¶ added in v0.1.1
type DLQConfig struct { // Disabled is a flag to disable DLQ. // - Default is false. // - If topic is not set, it will be generated from format_dlq_topic. // - If topic and format_dlq_topic is not set, dlq will be disabled! Disabled bool `cfg:"disabled" json:"disabled"` // RetryInterval is a time interval to retry again of DLQ messages. // - Default is 10 seconds. RetryInterval time.Duration `cfg:"retry_interval" json:"retry_interval"` // RetryMaxInterval is a maximum time interval to retry again of DLQ messages. // - Default is 15 minutes. RetryMaxInterval time.Duration `cfg:"retry_max_interval" json:"retry_max_interval"` // StartOffset is used when there is no committed offset for GroupID. // // Available options: // 0 : Start consuming from the earliest offset. // -1 : Start consuming from the latest offset. // 0 < n : Start consuming from the offset n. StartOffset int64 `cfg:"start_offset" json:"start_offset"` // Skip are optional message offsets to be skipped. // // The format is a map of partition directyl and offsets. // 0: // partition number // offsets: // list of offsets to skip // - 31 // - 90 // before: 20 // skip all offsets before or equal to this offset Skip map[int32]OffsetConfig `cfg:"skip" json:"skip"` // Topic is a topic name to send messages that failed to process also could be used for DLQ. Topic string `cfg:"topic" json:"topic"` // TopicExtra is extra a list of kafka topics to just consume from DLQ. TopicsExtra []string `cfg:"topics_extra" json:"topics_extra"` }
type DLQError ¶ added in v0.1.1
type DLQError struct { // Err is default error to add in header. // If not setted, header will just show "DLQ indexed error" Err error // Indexes to use send specific batch index to DLQ. // If index's error is nil, default error is used. Indexes map[int]error }
DLQError is used with the callback function to send a message to the DLQ topic.
func IsDQLError ¶ added in v0.3.5
IsDQLError checks if the error is a DLQ error and returns it.
func WrapErrDLQ ¶ added in v0.1.1
type DLQTriggerSpec ¶ added in v0.4.2
type GroupIDValidation ¶ added in v0.0.7
type GroupIDValidation struct { Enabled bool `cfg:"enabled" json:"enabled"` // RgxGroupID is a regex pattern to validate RgxGroupID. RgxGroupID string `cfg:"rgx_group_id" json:"rgx_group_id"` }
GroupIDValidation is a configuration for group_id validation.
func (GroupIDValidation) Validate ¶ added in v0.0.7
func (v GroupIDValidation) Validate(groupID string) error
type Header ¶
type Header = kgo.RecordHeader
type Meter ¶ added in v0.2.1
type OffsetConfig ¶ added in v0.1.1
type OffsetConfig struct { // Offsets is a list of offsets numbers in that partition to skip. Offsets []int64 `cfg:"offsets" json:"offsets"` // Before skips all offsets before or equal to this offset. // In other words, before is 10, the listener skips all offsets that // are <= 10, so offsets 1, 2, 3, 4, 5, 6, 7, 8, 9 and 10 will be skipped // but not 11, 12, 13, ... Before *int64 `cfg:"before" json:"before"` }
type Option ¶
type Option func(*options)
func WithAppName ¶ added in v0.1.0
WithAppName to set app name in kafka server. Default is using idProgname variable.
Use WithClientInfo instead if you want to set version and appname.
func WithAutoTopicCreation ¶
WithAutoTopicCreation to enable auto topic creation for producer and consumer.
Default is enabled.
- Producer will fail if topic not exists and auto topic creation is disabled.
- Consumer will wait and not fail when it is disabled.
func WithClientID ¶
WithClientID to set client_id in kafka server. Default is using DefaultClientID variable.
No need to set it most of the time!
func WithClientInfo ¶ added in v0.0.5
WithClientInfo to set client_id in kafka server. Not usable with WithClientID option.
- appname:version@hostname
func WithConsumer ¶
func WithConsumer(cfg ConsumerConfig) Option
WithConsumer configures the client to use the provided consumer config.
- The config is shallow copied; to modify the skip map safely, use the Skip function.
func WithKGOOptions ¶
WithKGOOptions to set kgo options.
func WithKGOOptionsDLQ ¶ added in v0.1.0
WithKGOOptionsDLQ to set kgo options for DLQ client.
If empty, then it will use the same options as the main client.
func WithLogger ¶ added in v0.1.0
WithLogger configures the client to use the provided logger.
- For zerolog, logz.AdapterKV{Log: logger} can be used.
- Default is using zerolog's global logger.
- Set nil to disable logging.
func WithPing ¶ added in v0.3.2
WithPing to ping kafka brokers on client creation.
- Default is enabled.
func WithPingBackoff ¶ added in v0.3.2
func WithPingBackoff(b backoff.BackOff) Option
func WithPingRetry ¶ added in v0.3.2
WithPingRetry to retry ping kafka brokers on client creation.
func WithPlugin ¶ added in v0.3.6
func WithPlugin[T any](name string, fn PluginFunc[T]) Option
type OptionConsumer ¶ added in v0.0.7
type OptionConsumer func(*optionConsumer) error
func WithCallbackDLQ ¶ added in v0.1.0
func WithCallbackDLQ[T any](fn func(ctx context.Context, msg T) error) OptionConsumer
WithCallbackDLQ to set wkafka consumer's callback function for DLQ.
- Use this option if you want to process DLQ messages in different function.
func WithDecode ¶ added in v0.0.7
WithDecode to set wkafka consumer's decode function.
- Default is json.Unmarshal.
- Use this option after the WithCallback option.
func WithPreCheck ¶ added in v0.0.7
WithPreCheck to set wkafka consumer's pre check function.
- Return ErrSkip will skip the message.
type OptionDLQTrigger ¶ added in v0.4.1
type OptionDLQTrigger struct { Force bool `cfg:"force" json:"force"` Spec *DLQTriggerSpec `cfg:"spec" json:"spec"` SpecPartitions map[string][]int32 `cfg:"spec_partitions" json:"spec_partitions"` }
func (*OptionDLQTrigger) ToOption ¶ added in v0.4.1
func (o *OptionDLQTrigger) ToOption() OptionDLQTriggerFn
type OptionDLQTriggerFn ¶ added in v0.4.1
type OptionDLQTriggerFn func(*OptionDLQTrigger)
func WithDLQTriggerForce ¶ added in v0.4.2
func WithDLQTriggerForce() OptionDLQTriggerFn
func WithDLQTriggerSpec ¶ added in v0.4.2
func WithDLQTriggerSpec(specs *DLQTriggerSpec) OptionDLQTriggerFn
func WithDLQTriggerSpecPartitions ¶ added in v0.4.2
func WithDLQTriggerSpecPartitions(partitions map[string][]int32) OptionDLQTriggerFn
type OptionProducer ¶ added in v0.1.0
type OptionProducer func(producerConfigInf) error
func WithEncoder ¶ added in v0.1.0
func WithEncoder[T any](fn func(T) ([]byte, error)) OptionProducer
WithEncoder to set encoder function.
func WithHeaders ¶ added in v0.1.0
func WithHeaders[T any](headers ...Header) OptionProducer
WithHeaders to append headers.
type PluginFunc ¶ added in v0.3.6
type Producer ¶
type Producer[T any] struct { // contains filtered or unexported fields }
func NewProducer ¶
NewProducer creates a new typed producer.
- If data is []byte, Encoder will be ignored.
- WithHook, WithEncoder, WithHeaders options can be used.
type Record ¶
func CtxRecord ¶ added in v0.1.0
CtxRecord returns the *Record from the context in callback function.
- If the context is nil, or the Record is not set, nil is returned.
- This is only used in callback function.
- Don't edit the returned *Record!
func CtxRecordBatch ¶ added in v0.1.0
CtxRecordBatch returns the []*Record from the context in callback function.
- If the context is nil, or the Record is not set, nil is returned.
- This is only used in batch callback function.
- Don't edit the returned []*Record!
type SalsConfig ¶
type SaslConfigs ¶
type SaslConfigs []SalsConfig
type SaslPlain ¶
type SaslPlain struct { // Enabled this config. Enabled bool `cfg:"enabled" json:"enabled"` // Zid is an optional authorization ID to use in authenticating. Zid string `cfg:"zid" json:"zid"` // User is the SASL username. User string `cfg:"user" json:"user"` // Pass is the SASL password. Pass string `cfg:"pass" json:"pass" log:"false"` }
SaslPlain contains options for SASL/PLAIN authentication.
type SaslSCRAM ¶
type SaslSCRAM struct { // Enabled this config. Enabled bool `cfg:"enabled" json:"enabled"` // Algorithm valid values are "SCRAM-SHA-256" and "SCRAM-SHA-512". // Empty is plain SASL. Algorithm string `cfg:"algorithm" json:"algorithm"` // Zid is an optional authorization ID to use in authenticating. Zid string `cfg:"zid" json:"zid"` // Username is the SASL username. User string `cfg:"user" json:"user"` // Pass is the SASL password. Pass string `cfg:"pass" json:"pass" log:"false"` // IsToken, if true, suffixes the "tokenauth=true" extra attribute to // the initial authentication message. // // Set this to true if the user and pass are from a delegation token. IsToken bool `cfg:"is_token" json:"is_token"` }
type SecurityConfig ¶
type SecurityConfig struct { TLS TLSConfig `cfg:"tls" json:"tls"` SASL SaslConfigs `cfg:"sasl" json:"sasl"` }
SecurityConfig contains options for TLS and SASL authentication. Zero value is used if the kafka instance has a plaintext listener.
type SkipMap ¶ added in v0.3.0
type SkipMap = map[string]map[int32]OffsetConfig
type TLSConfig ¶
type TLSConfig struct { // Enabled is whether TLS is enabled. Enabled bool `cfg:"enabled" json:"enabled"` // CertFile is the path to the client's TLS certificate. // Should be use with KeyFile. CertFile string `cfg:"cert_file" json:"cert_file"` // KeyFile is the path to the client's TLS key. // Should be use with CertFile. KeyFile string `cfg:"key_file" json:"key_file"` // CAFile is the path to the CA certificate. // If empty, the server's root CA set will be used. CAFile string `cfg:"ca_file" json:"ca_file"` }
TLSConfig contains options for TLS authentication.
type Validation ¶ added in v0.0.6
type Validation struct {
GroupID GroupIDValidation `cfg:"group_id" json:"group_id"`
}
Validation is a configuration for validation when consumer initialized.
func (Validation) Validate ¶ added in v0.0.7
func (v Validation) Validate(consumerConfig *ConsumerConfig) error