wkafka

package module
v0.3.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 22, 2024 License: MIT Imports: 25 Imported by: 5

README

wkafka

License Coverage GitHub Workflow Status Go Report Card Go PKG

wkafka is a wrapper around a Kafka client library, for initializing and using Kafka in microservices.

go get github.com/worldline-go/wkafka

This library is built on franz-go.

Usage

First, set the connection config to create a new Kafka client.
The main config struct contains brokers, security settings, and consumer validation:

brokers: # list of brokers, default is empty
  - localhost:9092
security:
  tls:
    enabled: false
    cert_file: ""
    key_file: ""
    ca_file: ""
  sasl: # SASL/SCRAM authentication could be multiple and will be used in order
    - plain:
        enabled: false
        user: ""
        pass: ""
      scram:
        enabled: false
        algorithm: "" # "SCRAM-SHA-256" or "SCRAM-SHA-512"
        user: ""
        pass: ""
consumer: # consumer validation and default values
  prefix_group_id: "" # always add this prefix to the group id
  format_dlq_topic: "" # format dead letter topic name, ex: "finops_{{.AppName}}_dlq"
  validation:
    group_id: # validate group id
      enabled: false
      rgx_group_id: "" # regex to validate group id ex: "^finops_.*$"
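
The same config can also be built directly in Go; a minimal sketch, assuming you construct it in code rather than loading YAML (the values are placeholders):

kafkaConfig := wkafka.Config{
  // at least one broker is required
  Brokers: []string{"localhost:9092"},
  Consumer: wkafka.ConsumerPreConfig{
    PrefixGroupID: "finops_",
  },
}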
Consumer

To create a consumer, pass an additional consumer config when initializing the client.

topics: [] # list of topics to subscribe
group_id: "" # group id to subscribe; make it as unique as possible per service
# start offset to consume; 0 is the earliest offset, -1 is the latest offset, and more than 0 is a specific offset number
# if the group_id already has a committed offset, this is ignored
start_offset: 0 # -1 to start at the end of the offsets
skip: # skipping is done programmatically; kafka will still consume the message
  # example skip topic and offset
  mytopic: # topic name to skip
    0: # partition number
      offsets: # list of offsets to skip
        - 31
        - 90
      before: 20 # skip all offsets before or equal to this offset
# max records to consume per poll; 0 uses the kafka default, usually 500
# usually no need to touch, but the batch consumer's count is capped by max_poll_records
max_poll_records: 0
# max records per batch passed to the callback function, default is 100
# if this value is more than max_poll_records then max_poll_records is used
batch_count: 100
dlq:
  disabled: false # disable dead letter queue
  topic: "" # dead letter topic name; can also be generated from the kafka config's format_dlq_topic
  retry_interval: "10s" # retry interval for messages that cannot be processed, default is 10s
  start_offset: 0 # -1 to start at the end of the offsets
  skip: # same as skip above, but only for the dead letter topic; no topic name needed
    # example skip offset
    0:
      offsets:
        - 31
      before: 20

Always provide the client information so it can be seen in produced message headers and in the Kafka UI.

client, err := wkafka.New(
  ctx, kafkaConfig,
  wkafka.WithConsumer(consumeConfig),
  wkafka.WithClientInfo("testapp", "v0.1.0"),
)
if err != nil {
  return err
}

defer client.Close()

Now run the consumer with a callback function.
There are two options to run the consumer: batch or single (WithCallbackBatch or WithCallback).
The default decoder is JSON, but you can change it with the WithDecode option.
If you use []byte as the data type, the raw data is passed to the callback function; for the batch consumer, use [][]byte.

// example single consumer
if err := client.Consume(ctx, wkafka.WithCallback(ProcessSingle)); err != nil {
  return fmt.Errorf("consume: %w", err)
}
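
For the batch flavor, the callback receives a slice of decoded messages. A minimal sketch, assuming a hypothetical ProcessBatch with signature func(ctx context.Context, msgs []Data) error:

// example batch consumer
if err := client.Consume(ctx, wkafka.WithCallbackBatch(ProcessBatch)); err != nil {
  return fmt.Errorf("consume: %w", err)
}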

To send a record to the dead letter queue, wrap the error with the WrapErrDLQ function and the record will be sent to the dead letter queue.
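
A minimal sketch of such a callback; Data and the handle function are placeholders:

func ProcessSingle(ctx context.Context, msg Data) error {
  if err := handle(ctx, msg); err != nil {
    // wrapping the error tells wkafka to send this record to the DLQ topic
    return wkafka.WrapErrDLQ(err)
  }
  return nil
}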

Check the additional options for custom decode and pre-check, as in the sketch below.
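
A hedged sketch combining both options; Data is a placeholder type, kgo is github.com/twmb/franz-go/pkg/kgo, and encoding/json is assumed imported:

if err := client.Consume(ctx,
  wkafka.WithCallback(ProcessSingle),
  // custom decode; this one mirrors the default json behavior
  wkafka.WithDecode(func(raw []byte, r *kgo.Record) (Data, error) {
    var d Data
    err := json.Unmarshal(raw, &d)
    return d, err
  }),
  // pre-check runs per record; returning ErrSkip drops the message
  wkafka.WithPreCheck(func(ctx context.Context, r *kgo.Record) error {
    if len(r.Value) == 0 {
      return wkafka.ErrSkip
    }
    return nil
  }),
); err != nil {
  return fmt.Errorf("consume: %w", err)
}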

Skip Handler

Edit the skip map at runtime by using our handler to initialize a server mux.

// import github.com/worldline-go/wkafka/handler

mux := http.NewServeMux()
mux.Handle(handler.New(client))

reflector := grpcreflect.NewStaticReflector(wkafkaconnect.WkafkaServiceName)

mux.Handle(grpcreflect.NewHandlerV1(reflector))
mux.Handle(grpcreflect.NewHandlerV1Alpha(reflector))
Handler Example

make env

# run the example
EXAMPLE=consumer_single_handler make example

Add the messages to skip at http://localhost:7071

Go to http://localhost:8082/#/grpc/wkafka for the gRPC UI.

Producer

Use the consumer client, or create a client without consumer settings; New also tries to connect to the brokers.

client, err := wkafka.New(ctx, kafkaConfig)
if err != nil {
    return err
}
defer client.Close()

Create a producer based on the client and a specific data type.

The WithHook, WithEncoder, and WithHeaders options are optional.
Use WithHook to get the record's metadata and modify the record before it is produced (see the sketch after the example below).

producer, err := wkafka.NewProducer[*Data](client, "test", wkafka.WithHook(ProduceHook))
if err != nil {
  return err
}

return producer.Produce(ctx, data)
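
A minimal sketch of what ProduceHook could look like; the Data field and header key are assumptions:

func ProduceHook(data *Data, r *wkafka.Record) error {
  // runs before the encoder; set the key and headers on the outgoing record
  r.Key = []byte(data.ID)
  r.Headers = append(r.Headers, wkafka.Header{Key: "source", Value: []byte("testapp")})
  return nil
}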

Development

Initialize Kafka and the Redpanda console with docker compose.

# uses the "docker compose" command; if you use podman, add the compose extension and link the docker binary to podman
make env
Service          Description
localhost:9092   Kafka broker
localhost:7071   Redpanda console

Use examples with EXAMPLE env variable:

EXAMPLE=... make example

Documentation

Constants

const (
	// KeyRecord is the context key for *Record.
	KeyRecord ctxKey = "kafka_record"
	// KeyIsDLQProcess is the context key for is DLQ.
	KeyIsDLQProcess ctxKey = "kafka_is_dlq"
	// KeyIsDLQEnabled is the context key for is DLQ enabled.
	KeyIsDLQEnabled ctxKey = "kafka_is_dlq_enabled"
)
const HeaderServiceKey = "service"

Variables

var (
	// ErrSkip is used to skip a message in the PreCheck hook or the Decode function.
	ErrSkip = errors.New("skip message")
	// ErrDLQ is used with the callback function to send a message to the DLQ topic.
	// Prefer WrapErrDLQ to wrap an error.
	ErrDLQ = errors.New("error DLQ")
)
var (
	ScramSha256 = "SCRAM-SHA-256"
	ScramSha512 = "SCRAM-SHA-512"
)
var DefaultBatchCount = 100

DefaultBatchCount is the default batch count for the batch consumer, used when BatchCount is not set.

var (
	DefaultClientID string
)
var DefaultRetryInterval = 10 * time.Second
var ErrConnection = errors.New("connect to kafka brokers failed")

Functions

func CtxIsDLQEnabled added in v0.2.0

func CtxIsDLQEnabled(ctx context.Context) bool

CtxIsDLQEnabled is usable in the callback function to check whether the DLQ is enabled.

  • If the context is nil, or the KeyIsDLQEnabled is not set, false is returned.

func CtxIsDLQProcess added in v0.2.0

func CtxIsDLQProcess(ctx context.Context) bool

CtxIsDLQProcess is usable in the callback function to check whether a DLQ message is being processed.

  • If the context is nil, or the KeyIsDLQProcess is not set, false is returned.
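
Both helpers can be combined in one callback; a minimal sketch with a placeholder Data type:

func ProcessSingle(ctx context.Context, msg Data) error {
  if wkafka.CtxIsDLQProcess(ctx) {
    // this message is being retried from the DLQ topic
  }
  if !wkafka.CtxIsDLQEnabled(ctx) {
    // no DLQ is configured; a returned DLQ error cannot be routed anywhere
  }
  return nil
}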

func SkipAppend added in v0.3.0

func SkipAppend(skip SkipMap) func(SkipMap) SkipMap

func SkipReplace added in v0.3.0

func SkipReplace(skip SkipMap) func(SkipMap) SkipMap
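
Both return a modifier for Client.Skip; a minimal sketch that appends skip entries at runtime:

client.Skip(wkafka.SkipAppend(wkafka.SkipMap{
  "mytopic": {
    0: {Offsets: []int64{31, 90}, Before: 20},
  },
}))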

Types

type CallBackFunc added in v0.1.0

type CallBackFunc func(*optionConsumer) error

func WithCallback added in v0.0.7

func WithCallback[T any](fn func(ctx context.Context, msg T) error) CallBackFunc

WithCallback to set wkafka consumer's callback function.

  • Default is json.Unmarshal; use the WithDecode option to add a custom decode function.
  • If the type is []byte, the default decode function is skipped.

func WithCallbackBatch added in v0.0.7

func WithCallbackBatch[T any](fn func(ctx context.Context, msg []T) error) CallBackFunc

WithCallbackBatch to set wkafka consumer's callback function.

  • Default is json.Unmarshal; use the WithDecode option to add a custom decode function.
  • If the type is [][]byte, the default decode function is skipped.

type Client

type Client struct {
	Kafka    *kgo.Client
	KafkaDLQ *kgo.Client

	Brokers []string

	Meter Meter
	// contains filtered or unexported fields
}

func New added in v0.0.7

func New(ctx context.Context, cfg Config, opts ...Option) (*Client, error)

func (*Client) Admin

func (c *Client) Admin() *kadm.Client

Admin returns an admin client to manage Kafka.

func (*Client) Close

func (c *Client) Close()

func (*Client) Consume

func (c *Client) Consume(ctx context.Context, callback CallBackFunc, opts ...OptionConsumer) error

Consume starts consuming messages from Kafka and blocks until the context is done or an error occurs.

  • Only works if the client was created with a consumer config.
  • Call it only once.

func (*Client) DLQTopics added in v0.3.2

func (c *Client) DLQTopics() []string

func (*Client) GetClientID added in v0.3.3

func (c *Client) GetClientID() []byte

func (*Client) ProduceRaw

func (c *Client) ProduceRaw(ctx context.Context, records []*kgo.Record) error

ProduceRaw sends raw records to Kafka. For a typed producer, see wkafka.NewProducer.

func (*Client) Skip added in v0.3.0

func (c *Client) Skip(modify func(SkipMap) SkipMap)

type Config

type Config struct {
	// Brokers is a list of kafka brokers to connect to.
	// Not all brokers need to be specified, the list is so that
	// if one broker is unavailable, another can be used.
	// Required at least one broker. Example value is 'localhost:9092'.
	Brokers  []string       `cfg:"brokers"`
	Security SecurityConfig `cfg:"security"`
	// Compressions is chosen in the order preferred based on broker support.
	// The default is to use no compression.
	//  Available:
	//  - gzip
	//  - snappy
	//  - lz4
	//  - zstd
	Compressions []string `cfg:"compressions"`

	// Consumer is a pre configuration for consumer and validation.
	Consumer ConsumerPreConfig `cfg:"consumer"`
}

type ConsumerConfig added in v0.0.7

type ConsumerConfig struct {
	// Topics is a list of kafka topics to consume.
	// At least one topic is required. A missing topic is created automatically, or the consumer waits for its creation.
	Topics []string `cfg:"topics"`
	// GroupID is the kafka consumer group ID, unique per application.
	// Consumers created with the same group ID will not see messages that another
	// consumer in the group is consuming. If a message is committed and that consumer fails for some reason,
	// another consumer with the same group ID will pick up where the other left off.
	// Required.
	GroupID string `cfg:"group_id"`
	// StartOffset is used when there is no committed offset for GroupID.
	//
	// Available options:
	//      0 : Start consuming from the earliest offset.
	//     -1 : Start consuming from the latest offset.
	//  0 < n : Start consuming from the offset n.
	StartOffset int64 `cfg:"start_offset"`
	// Skip are optional message offsets to be skipped and not delivered to the consumer callback.
	//
	// This is a programmatic feature: the message is still fetched, and the library checks whether to skip it.
	//
	// The format is a map of topic to partition to offsets.
	//  mytopic: // topic name
	//    0: // partition number
	//      offsets: // list of offsets to skip
	//        - 31
	//        - 90
	//      before: 20 // skip all offsets before or equal to this offset
	Skip map[string]map[int32]OffsetConfig `cfg:"skip"`
	// MaxPollRecords is the maximum number of records returned in a single call to poll.
	//  - Default is max.poll.records in the broker configuration, usually 500.
	//  - This controls fetching from the broker; it is not related to batch processing!
	MaxPollRecords int `cfg:"max_poll_records"`
	// BatchCount is a number of messages processed in a single batch.
	//  - Processing count could be less than BatchCount if the batch is not full.
	//  - Usable with WithCallbackBatch.
	//  - Default is 100.
	BatchCount int `cfg:"batch_count"`
	// DLQ is a dead letter queue configuration.
	DLQ DLQConfig `cfg:"dlq"`
}

type ConsumerPreConfig added in v0.0.7

type ConsumerPreConfig struct {
	// PrefixGroupID add prefix to group_id.
	PrefixGroupID string `cfg:"prefix_group_id"`
	// FormatDLQTopic is a format string to generate DLQ topic name.
	//  - Example is "finops_{{.AppName}}_dlq"
	//  - It must exist if DLQ is enabled and Topic is not set.
	//
	//  - Available variables:
	//    - AppName
	FormatDLQTopic string `cfg:"format_dlq_topic"`
	// Validation is a configuration for validation when consumer initialized.
	Validation Validation `cfg:"validation"`
}

type DLQConfig added in v0.1.1

type DLQConfig struct {
	// Disabled is a flag to disable DLQ.
	//  - Default is false.
	//  - If topic is not set, it will be generated from format_dlq_topic.
	//  - If topic and format_dlq_topic is not set, dlq will be disabled!
	Disabled bool `cfg:"disabled"`
	// RetryInterval is the interval at which DLQ messages are retried.
	// - Default is 10 seconds.
	RetryInterval time.Duration `cfg:"retry_interval"`
	// StartOffset is used when there is no committed offset for GroupID.
	//
	// Available options:
	//      0 : Start consuming from the earliest offset.
	//     -1 : Start consuming from the latest offset.
	//  0 < n : Start consuming from the offset n.
	StartOffset int64 `cfg:"start_offset"`
	// Skip are optional message offsets to be skipped.
	//
	// The format is a map of partition directly to offsets.
	//  0: // partition number
	//    offsets: // list of offsets to skip
	//      - 31
	//      - 90
	//    before: 20 // skip all offsets before or equal to this offset
	Skip map[int32]OffsetConfig `cfg:"skip"`
	// Topic is the topic name where messages that failed to process are sent; it is also used as the DLQ.
	Topic string `cfg:"topic"`
	// TopicsExtra is an extra list of kafka topics to consume only from the DLQ.
	TopicsExtra []string `cfg:"topics_extra"`
}

type DLQError added in v0.1.1

type DLQError struct {
	// Err is the default error to add in the header.
	// If not set, the header will just show "DLQ indexed error".
	Err error
	// Indexes is used to send specific batch indexes to the DLQ.
	// If an index's error is nil, the default error is used.
	Indexes map[int]error
}

DLQError is used with the callback function to send messages to the DLQ topic.

func WrapErrDLQ added in v0.1.1

func WrapErrDLQ(err error) *DLQError
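
For batch callbacks, returning a DLQError with Indexes sends only the failing records to the DLQ; a minimal sketch with a placeholder Data type:

func ProcessBatch(ctx context.Context, msgs []Data) error {
  // index 2 uses the default Err, index 5 carries its own error
  return &wkafka.DLQError{
    Err:     errors.New("validation failed"),
    Indexes: map[int]error{2: nil, 5: errors.New("bad payload")},
  }
}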

func (*DLQError) Error added in v0.1.1

func (e *DLQError) Error() string

type GroupIDValidation added in v0.0.7

type GroupIDValidation struct {
	Enabled bool `cfg:"enabled"`
	// RgxGroupID is a regex pattern to validate the group ID.
	RgxGroupID string `cfg:"rgx_group_id"`
}

GroupIDValidation is a configuration for group_id validation.

func (GroupIDValidation) Validate added in v0.0.7

func (v GroupIDValidation) Validate(groupID string) error

type Header

type Header = kgo.RecordHeader

type LogNoop added in v0.3.2

type LogNoop struct{}

func (LogNoop) Debug added in v0.3.2

func (LogNoop) Debug(_ string, _ ...interface{})

func (LogNoop) Error added in v0.3.2

func (LogNoop) Error(_ string, _ ...interface{})

func (LogNoop) Info added in v0.3.2

func (LogNoop) Info(_ string, _ ...interface{})

func (LogNoop) Warn added in v0.3.2

func (LogNoop) Warn(_ string, _ ...interface{})

type Logger added in v0.3.2

type Logger interface {
	Error(msg string, keysAndValues ...interface{})
	Info(msg string, keysAndValues ...interface{})
	Debug(msg string, keysAndValues ...interface{})
	Warn(msg string, keysAndValues ...interface{})
}

type Meter added in v0.2.1

type Meter interface {
	Meter(start time.Time, batchSize int64, topic string, err error, isDLQ bool)
}

func NewMeter added in v0.2.1

func NewMeter(meterFunc func(time.Time, int64, string, error, bool)) Meter
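
A minimal sketch of wiring a meter into the client, reusing the config values from the README example above; the log call stands in for a real metrics backend:

client, err := wkafka.New(ctx, kafkaConfig,
  wkafka.WithConsumer(consumeConfig),
  wkafka.WithMeter(wkafka.NewMeter(func(start time.Time, batchSize int64, topic string, err error, isDLQ bool) {
    log.Printf("topic=%s batch=%d dlq=%t took=%s err=%v", topic, batchSize, isDLQ, time.Since(start), err)
  })),
)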

type OffsetConfig added in v0.1.1

type OffsetConfig struct {
	// Offsets is a list of offsets numbers in that partition to skip.
	Offsets []int64 `cfg:"offsets"`
	// Before skips all offsets before or equal to this offset.
	// In other words, if Before is 10, the listener skips all offsets that
	// are <= 10, so offsets 1, 2, 3, 4, 5, 6, 7, 8, 9 and 10 will be skipped
	// but not 11, 12, 13, ...
	Before int64 `cfg:"before"`
}

type Option

type Option func(*options)

func WithAppName added in v0.1.0

func WithAppName(appName string) Option

WithAppName to set the app name in the kafka server. Default is the idProgname variable.

Use WithClientInfo instead if you want to set version and appname.

func WithAutoTopicCreation

func WithAutoTopicCreation(v bool) Option

WithAutoTopicCreation to enable auto topic creation for producer and consumer.

Default is enabled.

  • The producer fails if the topic does not exist and auto topic creation is disabled.
  • The consumer waits instead of failing when it is disabled.

func WithClientID

func WithClientID(clientID string) Option

WithClientID to set client_id in the kafka server. Default is the DefaultClientID variable.

Usually no need to set this!

func WithClientInfo added in v0.0.5

func WithClientInfo(appName, version string) Option

WithClientInfo to set client_id in the kafka server. Not usable with the WithClientID option.

  • appname:version@hostname

func WithConsumer

func WithConsumer(cfg ConsumerConfig) Option

WithConsumer configures the client to use the provided consumer config.

  • The config is shallow copied; to modify the skip map safely, use the Skip method.

func WithKGOOptions

func WithKGOOptions(opts ...kgo.Opt) Option

WithKGOOptions to set kgo options.

func WithKGOOptionsDLQ added in v0.1.0

func WithKGOOptionsDLQ(opts ...kgo.Opt) Option

WithKGOOptionsDLQ to set kgo options for DLQ client.

If empty, it uses the same options as the main client.

func WithLogger added in v0.1.0

func WithLogger(logger Logger) Option

WithLogger configures the client to use the provided logger.

  • For zerolog, logz.AdapterKV{Log: logger} can be used.
  • Default is using zerolog's global logger.

func WithMeter added in v0.2.1

func WithMeter(m Meter) Option

func WithNoLogger added in v0.1.0

func WithNoLogger(v bool) Option

WithNoLogger to disable logger.

func WithPing added in v0.3.2

func WithPing(v bool) Option

WithPing to ping kafka brokers on client creation.

  • Default is enabled.

func WithPingBackoff added in v0.3.2

func WithPingBackoff(b backoff.BackOff) Option

func WithPingRetry added in v0.3.2

func WithPingRetry(v bool) Option

WithPingRetry to retry pinging kafka brokers on client creation.

type OptionConsumer added in v0.0.7

type OptionConsumer func(*optionConsumer) error

func WithCallbackDLQ added in v0.1.0

func WithCallbackDLQ[T any](fn func(ctx context.Context, msg T) error) OptionConsumer

WithCallbackDLQ to set wkafka consumer's callback function for DLQ.

  • Use this option if you want to process DLQ messages in a different function.

func WithDecode added in v0.0.7

func WithDecode[T any](fn func(raw []byte, r *kgo.Record) (T, error)) OptionConsumer

WithDecode to set wkafka consumer's decode function.

  • Default is json.Unmarshal.
  • Use this option after the WithCallback option.

func WithPreCheck added in v0.0.7

func WithPreCheck(fn func(ctx context.Context, r *kgo.Record) error) OptionConsumer

WithPreCheck to set wkafka consumer's pre check function.

  • Returning ErrSkip will skip the message.

type OptionProducer added in v0.1.0

type OptionProducer func(producerConfigInf) error

func WithEncoder added in v0.1.0

func WithEncoder[T any](fn func(T) ([]byte, error)) OptionProducer

WithEncoder to set encoder function.

func WithHeaders added in v0.1.0

func WithHeaders[T any](headers ...Header) OptionProducer

WithHeaders to append headers.

func WithHook added in v0.1.0

func WithHook[T any](fn func(T, *Record) error) OptionProducer

WithHook to set hook function.

  • Hook will be called before the Encoder.
  • If the Hook returns ErrSkip, the record will be skipped.
  • If the Hook does not set any value on the record, the Encoder will be called.

type Processor added in v0.0.3

type Processor[T any] interface {
	Process(ctx context.Context, msg T) error
}

type ProcessorDecode added in v0.0.3

type ProcessorDecode[T any] interface {
	Decode(raw []byte, r *kgo.Record) (T, error)
}

type ProcessorPreCheck added in v0.0.3

type ProcessorPreCheck interface {
	PreCheck(ctx context.Context, r *kgo.Record) error
}
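
A minimal sketch of a type satisfying Processor and ProcessorPreCheck; Data is a placeholder, and this only illustrates the method sets, not how the library wires them in:

type myProcessor struct{}

// Process satisfies Processor[Data].
func (myProcessor) Process(ctx context.Context, msg Data) error { return nil }

// PreCheck satisfies ProcessorPreCheck.
func (myProcessor) PreCheck(ctx context.Context, r *kgo.Record) error { return nil }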

type Producer

type Producer[T any] struct {
	// contains filtered or unexported fields
}

func NewProducer

func NewProducer[T any](client *Client, topic string, opts ...OptionProducer) (*Producer[T], error)

NewProducer to create a new producer with a type.

  • If data is []byte, Encoder will be ignored.
  • WithHook, WithEncoder, WithHeaders options can be used.

func (*Producer[T]) GetTopic added in v0.3.3

func (p *Producer[T]) GetTopic() string

GetTopic to get producer default topic.

func (*Producer[T]) Produce

func (p *Producer[T]) Produce(ctx context.Context, data ...T) error

type Record

type Record = kgo.Record

func CtxRecord added in v0.1.0

func CtxRecord(ctx context.Context) *Record

CtxRecord returns the *Record from the context in the callback function.

  • If the context is nil, or the Record is not set, nil is returned.
  • This is only used in callback function.
  • Don't edit the returned *Record!
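
A minimal sketch of reading record metadata inside a callback, with a placeholder Data type:

func ProcessSingle(ctx context.Context, msg Data) error {
  if r := wkafka.CtxRecord(ctx); r != nil {
    // read-only: topic, partition, and offset of the current record
    log.Printf("topic=%s partition=%d offset=%d", r.Topic, r.Partition, r.Offset)
  }
  return nil
}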

func CtxRecordBatch added in v0.1.0

func CtxRecordBatch(ctx context.Context) []*Record

CtxRecordBatch returns the []*Record from the context in the batch callback function.

  • If the context is nil, or the Record is not set, nil is returned.
  • This is only used in batch callback function.
  • Don't edit the returned []*Record!

type SalsConfig

type SalsConfig struct {
	Plain SaslPlain `cfg:"plain"`
	SCRAM SaslSCRAM `cfg:"scram"`
}

func (SalsConfig) Generate

func (c SalsConfig) Generate() (sasl.Mechanism, error)

type SaslConfigs

type SaslConfigs []SalsConfig

func (SaslConfigs) Generate

func (c SaslConfigs) Generate() ([]sasl.Mechanism, error)

type SaslPlain

type SaslPlain struct {
	// Enabled this config.
	Enabled bool `cfg:"enabled"`
	// Zid is an optional authorization ID to use in authenticating.
	Zid string `cfg:"zid"`
	// User is the SASL username.
	User string `cfg:"user"`
	// Pass is the SASL password.
	Pass string `cfg:"pass" log:"false"`
}

SaslPlain contains options for SASL plain authentication.

func (SaslPlain) Generate

func (s SaslPlain) Generate() (sasl.Mechanism, error)

type SaslSCRAM

type SaslSCRAM struct {
	// Enabled this config.
	Enabled bool `cfg:"enabled"`
	// Algorithm valid values are "SCRAM-SHA-256" and "SCRAM-SHA-512".
	// Empty is plain SASL.
	Algorithm string `cfg:"algorithm"`
	// Zid is an optional authorization ID to use in authenticating.
	Zid string `cfg:"zid"`
	// Username is the SASL username.
	User string `cfg:"user"`
	// Pass is the SASL password.
	Pass string `cfg:"pass" log:"false"`
	// IsToken, if true, suffixes the "tokenauth=true" extra attribute to
	// the initial authentication message.
	//
	// Set this to true if the user and pass are from a delegation token.
	IsToken bool `cfg:"is_token"`
}

func (SaslSCRAM) Generate

func (s SaslSCRAM) Generate() (sasl.Mechanism, error)

type SecurityConfig

type SecurityConfig struct {
	TLS  TLSConfig   `cfg:"tls"`
	SASL SaslConfigs `cfg:"sasl"`
}

SecurityConfig contains options for TLS and SASL authentication. Zero value is used if the kafka instance has a plaintext listener.

type SkipMap added in v0.3.0

type SkipMap = map[string]map[int32]OffsetConfig

type TLSConfig

type TLSConfig struct {
	// Enabled is whether TLS is enabled.
	Enabled bool `cfg:"enabled"`
	// CertFile is the path to the client's TLS certificate.
	// Should be used with KeyFile.
	CertFile string `cfg:"cert_file"`
	// KeyFile is the path to the client's TLS key.
	// Should be used with CertFile.
	KeyFile string `cfg:"key_file"`
	// CAFile is the path to the CA certificate.
	// If empty, the server's root CA set will be used.
	CAFile string `cfg:"ca_file"`
}

TLSConfig contains options for TLS authentication.

func (TLSConfig) Generate

func (t TLSConfig) Generate() (*tls.Config, error)

Generate returns a tls.Config based on the TLSConfig.

If the TLSConfig is empty, nil is returned.

type Validation added in v0.0.6

type Validation struct {
	GroupID GroupIDValidation `cfg:"group_id"`
}

Validation is a configuration for validation when consumer initialized.

func (Validation) Validate added in v0.0.7

func (v Validation) Validate(consumerConfig *ConsumerConfig) error

Directories

Path      Synopsis
example   module
