kafka

package
v1.83.4
Warning

This package is not in the latest version of its module.

Published: Mar 11, 2024 License: MIT Imports: 5 Imported by: 0

Documentation

Overview

Package kafka provides a simple, high-level API for producing and consuming Apache Kafka messages. It hides the complexities of the Kafka protocol behind a simplified interface for working with Kafka topics.


Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer represents a wrapper around kafka.Consumer.

func NewConsumer

func NewConsumer(brokers []string, topic, groupID string, opts ...Option) (*Consumer, error)

NewConsumer creates a new Consumer. Call the HealthCheck method afterwards to verify that the connection is working.

func (*Consumer) Close

func (c *Consumer) Close() error

Close cleans up Consumer's internal resources.

func (*Consumer) HealthCheck

func (c *Consumer) HealthCheck(ctx context.Context) error

HealthCheck checks if the consumer is working.

func (*Consumer) Receive

func (c *Consumer) Receive(ctx context.Context) ([]byte, error)

Receive reads one message from Kafka. It blocks while there are no messages in the queue.

type Option

type Option func(*config)

Option is a function type that configures the Kafka client.

func WithFirstOffset

func WithFirstOffset() Option

WithFirstOffset tells the Kafka consumer to start reading from the beginning of the uncommitted offsets. By default, the consumer reads from the end of the queue.

func WithSessionTimeout

func WithSessionTimeout(t time.Duration) Option

WithSessionTimeout sets the timeout used to detect client failures when using Kafka's group management facility. The client sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this client from the group and initiate a rebalance.

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer represents a wrapper around kafka.Producer.

func NewProducer

func NewProducer(urls []string, topic string, opts ...Option) (*Producer, error)

NewProducer creates a new instance of Producer.

func (*Producer) Close

func (p *Producer) Close() error

Close cleans up Producer's internal resources.

func (*Producer) Send

func (p *Producer) Send(ctx context.Context, msg []byte) error

Send sends a message to the Kafka topic.
