kafkacgo

package
v1.101.20 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 6, 2025 License: MIT Imports: 7 Imported by: 0

Documentation

Overview

Package kafkacgo provides a simple high-level API for producing and consuming Apache Kafka messages.

Based on github.com/confluentinc/confluent-kafka-go/kafka, it abstracts away the complexities of the Kafka protocol and provides a simplified interface for working with Kafka topics.

It allows to specify custom message encoding and decoding functions, including serialization and encryption.

NOTE: This package depends on a C implementation, CGO must be enabled to use this package. For a non-CGO implementation see the github.com/Vonage/gosrvlib/pkg/kafka package.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DefaultMessageDecodeFunc added in v1.85.0

func DefaultMessageDecodeFunc(_ context.Context, msg []byte, data any) error

DefaultMessageDecodeFunc is the default function to decode a message for ReceiveData(). The value underlying data must be a pointer to the correct type for the next data item received.

func DefaultMessageEncodeFunc added in v1.85.0

func DefaultMessageEncodeFunc(_ context.Context, _ string, data any) ([]byte, error)

DefaultMessageEncodeFunc is the default function to encode the input data for SendData().

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer represents a wrapper around kafka.Consumer.

func NewConsumer

func NewConsumer(urls, topics []string, groupID string, opts ...Option) (*Consumer, error)

NewConsumer creates a new instance of Consumer.

func (*Consumer) Close

func (c *Consumer) Close() error

Close cleans up Consumer's internal resources.

func (*Consumer) Receive

func (c *Consumer) Receive() ([]byte, error)

Receive reads one message from the Kafka; is blocked if no messages in the queue.

func (*Consumer) ReceiveData added in v1.85.0

func (c *Consumer) ReceiveData(ctx context.Context, data any) error

ReceiveData retrieves a message from the queue and extract its content in the data.

type Offset

type Offset string

Offset points to where Kafka should start to read messages from.

const (
	// OffsetLatest automatically reset the offset to the latest offset.
	OffsetLatest Offset = "latest"

	// OffsetEarliest automatically reset the offset to the earliest offset.
	OffsetEarliest Offset = "earliest"

	// OffsetNone throw an error to the consumerClient if no previous offset is found for the consumerClient's group.
	OffsetNone Offset = "none"
)

type Option

type Option func(*config)

Option is a type alias for a function that configures Kafka client.

func WithAutoOffsetResetPolicy

func WithAutoOffsetResetPolicy(p Offset) Option

WithAutoOffsetResetPolicy sets what to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted).

func WithConfigParameter

func WithConfigParameter(key string, val kafka.ConfigValue) Option

WithConfigParameter extends kafka.ConfigMap with additional parameters. Parameters are listed at: * consumer: https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html * producer: https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html

func WithMessageDecodeFunc added in v1.85.0

func WithMessageDecodeFunc(f TDecodeFunc) Option

WithMessageDecodeFunc allow to replace DefaultMessageDecodeFunc(). This function used by ReceiveData() to decode a message encoded with messageEncodeFunc to the provided data object. The value underlying data must be a pointer to the correct type for the next data item received.

func WithMessageEncodeFunc added in v1.85.0

func WithMessageEncodeFunc(f TEncodeFunc) Option

WithMessageEncodeFunc allow to replace DefaultMessageEncodeFunc. This function used by SendData() to encode the input data.

func WithProduceChannelSize

func WithProduceChannelSize(size int) Option

WithProduceChannelSize sets the buffer size (in number of messages).

func WithSessionTimeout

func WithSessionTimeout(t time.Duration) Option

WithSessionTimeout sets the timeout used to detect client failures when using Kafka's group management facility. The client sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this client from the group and initiate a rebalance. Note that the value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms.

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer represents a wrapper around kafka.Producer.

func NewProducer

func NewProducer(urls []string, opts ...Option) (*Producer, error)

NewProducer creates a new instance of Producer.

func (*Producer) Close

func (p *Producer) Close()

Close cleans up Producer's internal resources.

func (*Producer) Send

func (p *Producer) Send(topic string, msg []byte) error

Send sends a message to Kafka topic.

func (*Producer) SendData added in v1.85.0

func (p *Producer) SendData(ctx context.Context, topic string, data any) error

SendData delivers the specified data as encoded message to the queue.

type TDecodeFunc added in v1.85.0

type TDecodeFunc func(ctx context.Context, msg []byte, data any) error

TDecodeFunc is the type of function used to replace the default message decoding function used by ReceiveData().

type TEncodeFunc added in v1.85.0

type TEncodeFunc func(ctx context.Context, topic string, data any) ([]byte, error)

TEncodeFunc is the type of function used to replace the default message encoding function used by SendData().

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL