Documentation ¶
Index ¶
- Constants
- Variables
- func CtxIsDLQEnabled(ctx context.Context) bool
- func CtxIsDLQProcess(ctx context.Context) bool
- func SkipAppend(skip SkipMap) func(SkipMap) SkipMap
- func SkipReplace(skip SkipMap) func(SkipMap) SkipMap
- type CallBackFunc
- type Client
- func (c *Client) Admin() *kadm.Client
- func (c *Client) Close()
- func (c *Client) Consume(ctx context.Context, callback CallBackFunc, opts ...OptionConsumer) error
- func (c *Client) DLQTopics() []string
- func (c *Client) GetClientID() []byte
- func (c *Client) ProduceRaw(ctx context.Context, records []*kgo.Record) error
- func (c *Client) Skip(modify func(SkipMap) SkipMap)
- type Config
- type ConsumerConfig
- type ConsumerPreConfig
- type DLQConfig
- type DLQError
- type GroupIDValidation
- type Header
- type LogNoop
- type Logger
- type Meter
- type OffsetConfig
- type Option
- func WithAppName(appName string) Option
- func WithAutoTopicCreation(v bool) Option
- func WithClientID(clientID string) Option
- func WithClientInfo(appName, version string) Option
- func WithConsumer(cfg ConsumerConfig) Option
- func WithKGOOptions(opts ...kgo.Opt) Option
- func WithKGOOptionsDLQ(opts ...kgo.Opt) Option
- func WithLogger(logger Logger) Option
- func WithMeter(m Meter) Option
- func WithNoLogger(v bool) Option
- func WithPing(v bool) Option
- func WithPingBackoff(b backoff.BackOff) Option
- func WithPingRetry(v bool) Option
- type OptionConsumer
- type OptionProducer
- type Processor
- type ProcessorDecode
- type ProcessorPreCheck
- type Producer
- type Record
- type SalsConfig
- type SaslConfigs
- type SaslPlain
- type SaslSCRAM
- type SecurityConfig
- type SkipMap
- type TLSConfig
- type Validation
Constants ¶
const ( // KeyRecord is the context key for *Record. KeyRecord ctxKey = "kafka_record" // KeyIsDLQProcess is the context key for is DLQ. KeyIsDLQProcess ctxKey = "kafka_is_dlq" // KeyIsDLQEnabled is the context key for is DLQ enabled. KeyIsDLQEnabled ctxKey = "kafka_is_dlq_enabled" )
const HeaderServiceKey = "service"
Variables ¶
var ( // ErrSkip is used to skip a message in the PreCheck hook or Decode function. ErrSkip = errors.New("skip message") // ErrDLQ is used with the callback function to send a message to the DLQ topic. // Prefer to use WrapErrDLQ to wrap the error. ErrDLQ = errors.New("error DLQ") )
var ( ScramSha256 = "SCRAM-SHA-256" ScramSha512 = "SCRAM-SHA-512" )
var DefaultBatchCount = 100
DefaultBatchCount is default batch count for batch consumer, if not set.
var (
DefaultClientID string
)
var DefaultRetryInterval = 10 * time.Second
var ErrConnection = errors.New("connect to kafka brokers failed")
Functions ¶
func CtxIsDLQEnabled ¶ added in v0.2.0
CtxIsDLQEnabled can be used in the callback function to check whether DLQ is enabled.
- If the context is nil, or the KeyIsDLQEnabled is not set, false is returned.
func CtxIsDLQProcess ¶ added in v0.2.0
CtxIsDLQProcess can be used in the callback function to check whether a DLQ message is being processed.
- If the context is nil, or the KeyIsDLQProcess is not set, false is returned.
func SkipAppend ¶ added in v0.3.0
func SkipReplace ¶ added in v0.3.0
Types ¶
type CallBackFunc ¶ added in v0.1.0
type CallBackFunc func(*optionConsumer) error
func WithCallback ¶ added in v0.0.7
func WithCallback[T any](fn func(ctx context.Context, msg T) error) CallBackFunc
WithCallback to set wkafka consumer's callback function.
- Default is json.Unmarshal, use WithDecode option to add custom decode function.
- If []byte then default decode function will be skipped.
func WithCallbackBatch ¶ added in v0.0.7
func WithCallbackBatch[T any](fn func(ctx context.Context, msg []T) error) CallBackFunc
WithCallbackBatch to set wkafka consumer's callback function.
- Default is json.Unmarshal, use WithDecode option to add custom decode function.
- If [][]byte then default decode function will be skipped.
type Client ¶
type Client struct { Kafka *kgo.Client KafkaDLQ *kgo.Client Brokers []string Meter Meter // contains filtered or unexported fields }
func (*Client) Consume ¶
func (c *Client) Consume(ctx context.Context, callback CallBackFunc, opts ...OptionConsumer) error
Consume starts consuming messages from kafka and blocks until context is done or an error occurs.
- Only works if client is created with consumer config.
- Just run one time.
func (*Client) GetClientID ¶ added in v0.3.3
func (*Client) ProduceRaw ¶
ProduceRaw sends records to kafka. For a typed producer, see wkafka.NewProducer.
type Config ¶
type Config struct { // Brokers is a list of kafka brokers to connect to. // Not all brokers need to be specified, the list is so that // if one broker is unavailable, another can be used. // Required at least one broker. Example value is 'localhost:9092'. Brokers []string `cfg:"brokers"` Security SecurityConfig `cfg:"security"` // Compressions is chosen in the order preferred based on broker support. // The default is to use no compression. // Available: // - gzip // - snappy // - lz4 // - zstd Compressions []string `cfg:"compressions"` // Consumer is a pre configuration for consumer and validation. Consumer ConsumerPreConfig `cfg:"consumer"` }
type ConsumerConfig ¶ added in v0.0.7
type ConsumerConfig struct { // Topics is a list of kafka topics to consume. // Required at least one topic, topic name if not exist will be created or consumer waits for topic creation. Topics []string `cfg:"topics"` // GroupID is the kafka consumer group ID, unique per application. // All created with the same ID will not be able to see messages that another consumer // is consuming. If a message is committed and that consumer fails for some reason, // another consumer with the same group ID will pick up where the other left off. // Required. GroupID string `cfg:"group_id"` // StartOffset is used when there is no committed offset for GroupID. // // Available options: // 0 : Start consuming from the earliest offset. // -1 : Start consuming from the latest offset. // 0 < n : Start consuming from the offset n. StartOffset int64 `cfg:"start_offset"` // Skip are optional message offsets to be skipped and not streamed by the consumer callback. // // This is a programmatic feature and message will come and check by the library. // // The format is a map of topic to partition to offsets. // mytopic: // topic name // 0: // partition number // offsets: // list of offsets to skip // - 31 // - 90 // before: 20 // skip all offsets before or equal to this offset Skip map[string]map[int32]OffsetConfig `cfg:"skip"` // MaxPollRecords is the maximum number of records returned in a single call to poll. // - Default is max.poll.records in the broker configuration, usually 500. // - Fetching messages from broker, this is not related with batch processing! MaxPollRecords int `cfg:"max_poll_records"` // BatchCount is a number of messages processed in a single batch. // - Processing count could be less than BatchCount if the batch is not full. // - Usable with WithConsumerBatch // - Default is 100. BatchCount int `cfg:"batch_count"` // DLQ is a dead letter queue configuration. DLQ DLQConfig `cfg:"dlq"` }
type ConsumerPreConfig ¶ added in v0.0.7
type ConsumerPreConfig struct { // PrefixGroupID add prefix to group_id. PrefixGroupID string `cfg:"prefix_group_id"` // FormatDLQTopic is a format string to generate DLQ topic name. // - Example is "finops_{{.AppName}}_dlq" // - It should be exist if DLQ is enabled and topic is not set. // // - Available variables: // - AppName FormatDLQTopic string `cfg:"format_dlq_topic"` // Validation is a configuration for validation when consumer initialized. Validation Validation `cfg:"validation"` }
type DLQConfig ¶ added in v0.1.1
type DLQConfig struct { // Disabled is a flag to disable DLQ. // - Default is false. // - If topic is not set, it will be generated from format_dlq_topic. // - If topic and format_dlq_topic are not set, dlq will be disabled! Disabled bool `cfg:"disabled"` // RetryInterval is the time interval to wait before retrying DLQ messages. // - Default is 10 seconds. RetryInterval time.Duration `cfg:"retry_interval"` // StartOffset is used when there is no committed offset for GroupID. // // Available options: // 0 : Start consuming from the earliest offset. // -1 : Start consuming from the latest offset. // 0 < n : Start consuming from the offset n. StartOffset int64 `cfg:"start_offset"` // Skip are optional message offsets to be skipped. // // The format is a map of partition directly to offsets. // 0: // partition number // offsets: // list of offsets to skip // - 31 // - 90 // before: 20 // skip all offsets before or equal to this offset Skip map[int32]OffsetConfig `cfg:"skip"` // Topic is a topic name to send messages that failed to process also could be used for DLQ. Topic string `cfg:"topic"` // TopicsExtra is an extra list of kafka topics to just consume from DLQ. TopicsExtra []string `cfg:"topics_extra"` }
type DLQError ¶ added in v0.1.1
type DLQError struct { // Err is the default error to add in the header. // If not set, the header will just show "DLQ indexed error" Err error // Indexes is used to send specific batch indexes to DLQ. // If an index's error is nil, the default error is used. Indexes map[int]error }
DLQError is used with the callback function to send messages to the DLQ topic.
func WrapErrDLQ ¶ added in v0.1.1
type GroupIDValidation ¶ added in v0.0.7
type GroupIDValidation struct { Enabled bool `cfg:"enabled"` // RgxGroupID is a regex pattern to validate the group ID. RgxGroupID string `cfg:"rgx_group_id"` }
GroupIDValidation is a configuration for group_id validation.
func (GroupIDValidation) Validate ¶ added in v0.0.7
func (v GroupIDValidation) Validate(groupID string) error
type Header ¶
type Header = kgo.RecordHeader
type Meter ¶ added in v0.2.1
type OffsetConfig ¶ added in v0.1.1
type OffsetConfig struct { // Offsets is a list of offsets numbers in that partition to skip. Offsets []int64 `cfg:"offsets"` // Before skips all offsets before or equal to this offset. // In other words, before is 10, the listener skips all offsets that // are <= 10, so offsets 1, 2, 3, 4, 5, 6, 7, 8, 9 and 10 will be skipped // but not 11, 12, 13, ... Before int64 `cfg:"before"` }
type Option ¶
type Option func(*options)
func WithAppName ¶ added in v0.1.0
WithAppName to set app name in kafka server. Default is using idProgname variable.
Use WithClientInfo instead if you want to set version and appname.
func WithAutoTopicCreation ¶
WithAutoTopicCreation to enable auto topic creation for producer and consumer.
Default is enabled.
- Producer will fail if topic not exists and auto topic creation is disabled.
- Consumer will wait and not fail when it is disabled.
func WithClientID ¶
WithClientID to set client_id in kafka server. Default is using DefaultClientID variable.
No need to set most of time!
func WithClientInfo ¶ added in v0.0.5
WithClientInfo to set client_id in kafka server. Not usable with WithClientID option.
- appname:version@hostname
func WithConsumer ¶
func WithConsumer(cfg ConsumerConfig) Option
WithConsumer configures the client to use the provided consumer config.
- The config is shallow copied; to modify the skip map safely, use the Skip function.
func WithKGOOptions ¶
WithKGOOptions to set kgo options.
func WithKGOOptionsDLQ ¶ added in v0.1.0
WithKGOOptionsDLQ to set kgo options for DLQ client.
If empty, then it will use the same options as the main client.
func WithLogger ¶ added in v0.1.0
WithLogger configures the client to use the provided logger.
- For zerolog, logz.AdapterKV{Log: logger} can be used.
- Default is using zerolog's global logger.
func WithNoLogger ¶ added in v0.1.0
WithNoLogger to disable logger.
func WithPing ¶ added in v0.3.2
WithPing to ping kafka brokers on client creation.
- Default is enabled.
func WithPingBackoff ¶ added in v0.3.2
func WithPingBackoff(b backoff.BackOff) Option
func WithPingRetry ¶ added in v0.3.2
WithPingRetry to retry ping kafka brokers on client creation.
type OptionConsumer ¶ added in v0.0.7
type OptionConsumer func(*optionConsumer) error
func WithCallbackDLQ ¶ added in v0.1.0
func WithCallbackDLQ[T any](fn func(ctx context.Context, msg T) error) OptionConsumer
WithCallbackDLQ to set wkafka consumer's callback function for DLQ.
- Use this option if you want to process DLQ messages in different function.
func WithDecode ¶ added in v0.0.7
WithDecode to set wkafka consumer's decode function.
- Default is json.Unmarshal.
- Use this option after the WithCallback option.
func WithPreCheck ¶ added in v0.0.7
WithPreCheck to set wkafka consumer's pre check function.
- Return ErrSkip will skip the message.
type OptionProducer ¶ added in v0.1.0
type OptionProducer func(producerConfigInf) error
func WithEncoder ¶ added in v0.1.0
func WithEncoder[T any](fn func(T) ([]byte, error)) OptionProducer
WithEncoder to set encoder function.
func WithHeaders ¶ added in v0.1.0
func WithHeaders[T any](headers ...Header) OptionProducer
WithHeaders to append headers.
type ProcessorDecode ¶ added in v0.0.3
type ProcessorPreCheck ¶ added in v0.0.3
type Producer ¶
type Producer[T any] struct { // contains filtered or unexported fields }
func NewProducer ¶
NewProducer to create a new producer with type.
- If data is []byte, Encoder will be ignored.
- WithHook, WithEncoder, WithHeaders options can be used.
type Record ¶
func CtxRecord ¶ added in v0.1.0
CtxRecord returns the *Record from the context in callback function.
- If the context is nil, or the Record is not set, nil is returned.
- This is only used in callback function.
- Don't edit the returned *Record!
func CtxRecordBatch ¶ added in v0.1.0
CtxRecordBatch returns the []*Record from the context in callback function.
- If the context is nil, or the Record is not set, nil is returned.
- This is only used in batch callback function.
- Don't edit the returned []*Record!
type SalsConfig ¶
type SaslConfigs ¶
type SaslConfigs []SalsConfig
type SaslPlain ¶
type SaslPlain struct { // Enabled this config. Enabled bool `cfg:"enabled"` // Zid is an optional authorization ID to use in authenticating. Zid string `cfg:"zid"` // User is the SASL username. User string `cfg:"user"` // Pass is the SASL password. Pass string `cfg:"pass" log:"false"` }
SaslPlain contains options for SASL/PLAIN authentication.
type SaslSCRAM ¶
type SaslSCRAM struct { // Enabled this config. Enabled bool `cfg:"enabled"` // Algorithm valid values are "SCRAM-SHA-256" and "SCRAM-SHA-512". // Empty is plain SASL. Algorithm string `cfg:"algorithm"` // Zid is an optional authorization ID to use in authenticating. Zid string `cfg:"zid"` // User is the SASL username. User string `cfg:"user"` // Pass is the SASL password. Pass string `cfg:"pass" log:"false"` // IsToken, if true, suffixes the "tokenauth=true" extra attribute to // the initial authentication message. // // Set this to true if the user and pass are from a delegation token. IsToken bool `cfg:"is_token"` }
type SecurityConfig ¶
type SecurityConfig struct { TLS TLSConfig `cfg:"tls"` SASL SaslConfigs `cfg:"sasl"` }
SecurityConfig contains options for TLS and SASL authentication. Zero value is used if the kafka instance has a plaintext listener.
type SkipMap ¶ added in v0.3.0
type SkipMap = map[string]map[int32]OffsetConfig
type TLSConfig ¶
type TLSConfig struct { // Enabled is whether TLS is enabled. Enabled bool `cfg:"enabled"` // CertFile is the path to the client's TLS certificate. // Should be used with KeyFile. CertFile string `cfg:"cert_file"` // KeyFile is the path to the client's TLS key. // Should be used with CertFile. KeyFile string `cfg:"key_file"` // CAFile is the path to the CA certificate. // If empty, the server's root CA set will be used. CAFile string `cfg:"ca_file"` }
TLSConfig contains options for TLS authentication.
type Validation ¶ added in v0.0.6
type Validation struct {
GroupID GroupIDValidation `cfg:"group_id"`
}
Validation is a configuration for validation when consumer initialized.
func (Validation) Validate ¶ added in v0.0.7
func (v Validation) Validate(consumerConfig *ConsumerConfig) error