kafka

package
v1.1.0
Warning

This package is not in the latest version of its module.

Published: Dec 29, 2023 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Overview

Package kafka provides methods to interact with Apache Kafka, with support for both producing messages to and consuming messages from Kafka topics.

Constants

const (
	SASLTypeSCRAMSHA512 = "SCRAM-SHA-512"
	PLAIN               = "PLAIN"
)
const (
	// OffsetNewest stands for the log head offset, i.e. the offset that will be
	// assigned to the next message that will be produced to the partition.
	OffsetNewest int64 = sarama.OffsetNewest
	// OffsetOldest stands for the oldest offset available on the broker for a
	// partition. You can send this to a client's GetOffset method to get this
	// offset, or when calling ConsumePartition to start consuming from the
	// oldest offset that is still available on the broker.
	OffsetOldest int64 = sarama.OffsetOldest
)
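
Both constants are negative sentinels rather than real positions in a partition. The sketch below shows one way such a sentinel could be resolved to a concrete offset. The local constants repeat the values sarama defines (-1 and -2), and resolveOffset with its oldest/next parameters is a hypothetical helper, not part of this package.

```go
package main

import "fmt"

// Local copies of the sentinel values: sarama defines OffsetNewest as -1
// and OffsetOldest as -2. They are redeclared here only to keep the sketch
// free of third-party imports.
const (
	offsetNewest int64 = -1
	offsetOldest int64 = -2
)

// resolveOffset is a hypothetical helper. It maps a requested offset to a
// concrete position, given the oldest offset still retained and the offset
// that the next produced message will receive.
func resolveOffset(requested, oldest, next int64) (int64, error) {
	switch {
	case requested == offsetNewest:
		return next, nil
	case requested == offsetOldest:
		return oldest, nil
	case requested >= oldest && requested <= next:
		return requested, nil
	default:
		return 0, fmt.Errorf("offset %d is outside [%d, %d]", requested, oldest, next)
	}
}

func main() {
	start, err := resolveOffset(offsetOldest, 120, 450)
	fmt.Println(start, err)
}
```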

Functions

func NewKafkaProducer

func NewKafkaProducer(config *Config) (sarama.SyncProducer, error)

NewKafkaProducer returns a Kafka producer created from the provided config. It returns an error if the config is invalid.

func NewKafkaWithAvro

func NewKafkaWithAvro(config *AvroWithKafkaConfig, logger log.Logger) (pubsub.PublisherSubscriber, error)

NewKafkaWithAvro initializes Kafka with Avro when both KafkaConfig and AvroConfig are valid.

Types

type AvroWithKafkaConfig

type AvroWithKafkaConfig struct {
	KafkaConfig Config
	AvroConfig  avro.Config
}

AvroWithKafkaConfig represents a configuration for using Avro with Kafka

type Config

type Config struct {
	// Brokers comma separated kafka brokers
	Brokers string

	// SASL provide configs for authentication
	SASL SASLConfig

	// MaxRetry number of times to retry sending a failing message
	MaxRetry int

	// RetryFrequency backoff time in milliseconds before retrying
	RetryFrequency int

	// Topics multiple topics to subscribe messages from
	// first topic will be used for publishing the message
	Topics []string

	// GroupID consumer group id
	GroupID string

	Config *sarama.Config

	// ConnRetryDuration for specifying connection retry duration
	ConnRetryDuration int

	// Offsets is a slice of TopicPartition in which the "Topic", "Partition" and
	// "Offset" fields must be set to start consuming from a specific offset
	Offsets []pubsub.TopicPartition

	InitialOffsets int64

	// DisableAutoCommit allows the application to disable Kafka consumer auto-commit
	DisableAutoCommit bool
}

Config provides values for the Kafka producer and consumer.

type Consumer

type Consumer struct {
	ConsumerGroup        sarama.ConsumerGroup
	ConsumerGroupHandler *ConsumerHandler
	// contains filtered or unexported fields
}

Consumer is a wrapper on sarama ConsumerGroup.

func NewKafkaConsumer

func NewKafkaConsumer(config *Config) (*Consumer, error)

NewKafkaConsumer returns a Kafka consumer created from the provided config. It returns an error if the config is invalid.

type ConsumerHandler

type ConsumerHandler struct {
	// contains filtered or unexported fields
}

ConsumerHandler represents a Sarama consumer group message handler. It is responsible for handling the messages in a specific topic/partition. The handler implements methods called during the lifecycle of a consumer group session.

func (*ConsumerHandler) Cleanup

func (consumer *ConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited

func (*ConsumerHandler) ConsumeClaim

func (consumer *ConsumerHandler) ConsumeClaim(
	session sarama.ConsumerGroupSession,
	claim sarama.ConsumerGroupClaim) error

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().

func (*ConsumerHandler) Setup

func (consumer *ConsumerHandler) Setup(s sarama.ConsumerGroupSession) error

Setup is run at the beginning of a new session, before ConsumeClaim
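
The ordering of the three lifecycle methods (Setup first, ConsumeClaim per claim, Cleanup once every ConsumeClaim goroutine has exited) can be sketched with the standard library alone. The handler interface below is a simplified, hypothetical stand-in: the real methods take sarama session and claim arguments, and runSession is not how sarama is implemented, only an illustration of the documented order.

```go
package main

import (
	"fmt"
	"sync"
)

// handler is a simplified stand-in for the three lifecycle methods.
type handler interface {
	Setup() error
	ConsumeClaim(partition int) error
	Cleanup() error
}

// recorder notes the order in which its methods are called.
type recorder struct {
	mu     sync.Mutex
	events []string
}

func (r *recorder) log(e string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.events = append(r.events, e)
}

func (r *recorder) Setup() error             { r.log("setup"); return nil }
func (r *recorder) ConsumeClaim(p int) error { r.log("claim"); return nil }
func (r *recorder) Cleanup() error           { r.log("cleanup"); return nil }

// runSession drives one session: Setup, then one goroutine per claimed
// partition, then Cleanup only after all of those goroutines have returned.
func runSession(h handler, partitions []int) error {
	if err := h.Setup(); err != nil {
		return err
	}
	var wg sync.WaitGroup
	for _, p := range partitions {
		wg.Add(1)
		go func(p int) {
			defer wg.Done()
			_ = h.ConsumeClaim(p)
		}(p)
	}
	wg.Wait()
	return h.Cleanup()
}

func main() {
	r := &recorder{}
	_ = runSession(r, []int{0, 1, 2})
	fmt.Println(r.events) // [setup claim claim claim cleanup]
}
```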

type Kafka

type Kafka struct {
	Producer sarama.SyncProducer
	Consumer *Consumer
	// contains filtered or unexported fields
}

Kafka is a client for interacting with Apache Kafka.

func New

func New(config *Config, logger log.Logger) (*Kafka, error)

New establishes a connection to Kafka using the provided Config.

func NewKafkaFromEnv

func NewKafkaFromEnv() (*Kafka, error)

NewKafkaFromEnv fetches the config from environment variables and tries to connect to Kafka.

Deprecated: Use pubsub.New instead.

func (*Kafka) Bind

func (k *Kafka) Bind(message []byte, target interface{}) error

Bind parses the encoded data and stores the result in the value pointed to by target
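
The shape of Bind matches the familiar decode-into-pointer pattern. The sketch below assumes a JSON payload and uses encoding/json directly. The bind function and the order type are hypothetical stand-ins, and the source does not state which encoding the real method expects.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// order is a hypothetical message payload.
type order struct {
	ID     string `json:"id"`
	Amount int    `json:"amount"`
}

// bind has the same shape as Bind and assumes the message is JSON.
// target must be a non-nil pointer, otherwise json.Unmarshal returns an error.
func bind(message []byte, target interface{}) error {
	return json.Unmarshal(message, target)
}

func main() {
	var o order
	if err := bind([]byte(`{"id":"A-17","amount":3}`), &o); err != nil {
		fmt.Println("bind failed:", err)
		return
	}
	fmt.Println(o.ID, o.Amount)
}
```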

func (*Kafka) CommitOffset

func (k *Kafka) CommitOffset(offsets pubsub.TopicPartition)

CommitOffset marks a particular offset on a specific partition as read. The commits are performed asynchronously at the interval configured in sarama's Consumer.Offsets.AutoCommit.

func (*Kafka) HealthCheck

func (k *Kafka) HealthCheck() types.Health

HealthCheck checks if consumer and producer are initialized and the connection is stable

func (*Kafka) IsSet

func (k *Kafka) IsSet() bool

IsSet checks whether kafka is initialized or not

func (*Kafka) Pause

func (k *Kafka) Pause() error

Pause suspends fetching from all partitions.

func (*Kafka) Ping

func (k *Kafka) Ping() error

Ping checks the health of Kafka and returns an error if it is down.

func (*Kafka) PublishEvent

func (k *Kafka) PublishEvent(key string, value interface{}, headers map[string]string) (err error)

PublishEvent publishes the event to kafka

func (*Kafka) PublishEventWithOptions

func (k *Kafka) PublishEventWithOptions(key string, value interface{}, headers map[string]string,
	options *pubsub.PublishOptions) (err error)

PublishEventWithOptions publishes a message to Kafka. Additional options can be provided through the PublishOptions struct.

func (*Kafka) Resume

func (k *Kafka) Resume() error

Resume resumes all partitions which have been paused with Pause()

func (*Kafka) Subscribe

func (k *Kafka) Subscribe() (*pubsub.Message, error)

Subscribe consumes a single message from a sarama consumer group. The first call to Subscribe initiates a consumer group session rebalance, which handles the assignment of partitions to the consumers in the group.

func (*Kafka) SubscribeWithCommit

func (k *Kafka) SubscribeWithCommit(f pubsub.CommitFunc) (*pubsub.Message, error)

SubscribeWithCommit calls the CommitFunc after receiving a message from Kafka. Based on the function's return values, it decides whether to commit the message and whether to consume another message.
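
The commit-and-continue decision can be illustrated without a broker. In the sketch below, message and commitFunc are hypothetical stand-ins for pubsub.Message and pubsub.CommitFunc; the two-boolean return (commit, then continue) is an assumption drawn from the description above, and consume is an illustrative loop rather than the package's implementation.

```go
package main

import "fmt"

// message is a stand-in for pubsub.Message with only the fields used here.
type message struct {
	Offset int64
	Value  string
}

// commitFunc reports whether to commit the message and whether to go on
// to the next one (assumed shape).
type commitFunc func(m *message) (commit, next bool)

// consume hands messages to f one at a time, records the offsets f asks to
// commit, and stops as soon as f declines to continue.
func consume(msgs []message, f commitFunc) []int64 {
	var committed []int64
	for i := range msgs {
		commit, next := f(&msgs[i])
		if commit {
			committed = append(committed, msgs[i].Offset)
		}
		if !next {
			break
		}
	}
	return committed
}

func main() {
	msgs := []message{{0, "ok"}, {1, "ok"}, {2, "poison"}, {3, "ok"}}
	committed := consume(msgs, func(m *message) (bool, bool) {
		if m.Value == "poison" {
			return false, false // leave uncommitted and stop consuming
		}
		return true, true
	})
	fmt.Println(committed) // [0 1]
}
```

Returning false for the first value leaves the offset uncommitted, so under this assumed contract the same message would be delivered again after a restart.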

type SASLConfig

type SASLConfig struct {
	// User username to connect to protected kafka instance
	User string

	// Password password to connect to protected kafka instance
	Password string

	// Mechanism SASL mechanism used for authentication
	Mechanism string

	// SecurityProtocol SSL or PLAINTEXT
	SecurityProtocol string

	// SSLVerify set it to true if certificate verification is required
	SSLVerify bool
}

SASLConfig holds SASL authentication configurations for Kafka.

type XDGSCRAMClient

type XDGSCRAMClient struct {
	*scram.ClientConversation
	scram.HashGeneratorFcn
}

func (*XDGSCRAMClient) Begin

func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)

Begin prepares the client for the SCRAM exchange with the server with a user name and a password.

func (*XDGSCRAMClient) Done

func (x *XDGSCRAMClient) Done() bool

Done should return true when the SCRAM conversation is over.

func (*XDGSCRAMClient) Step

func (x *XDGSCRAMClient) Step(challenge string) (response string, err error)

Step steps client through the SCRAM exchange. It is called repeatedly until it errors or `Done` returns true.
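
The Begin, Step, and Done methods imply a simple driver loop: begin once, then step until the conversation reports it is done or a step fails. The sketch below shows that loop against a toy client. scramClient mirrors the three documented method signatures, while fakeClient, authenticate, and the exchange callback are hypothetical and perform no real SCRAM cryptography or network I/O.

```go
package main

import (
	"errors"
	"fmt"
)

// scramClient lists the three methods documented on XDGSCRAMClient.
type scramClient interface {
	Begin(userName, password, authzID string) error
	Step(challenge string) (string, error)
	Done() bool
}

// fakeClient is a toy implementation that finishes after two steps.
type fakeClient struct{ steps int }

func (f *fakeClient) Begin(userName, password, authzID string) error {
	if userName == "" {
		return errors.New("empty user name")
	}
	f.steps = 0
	return nil
}

func (f *fakeClient) Step(challenge string) (string, error) {
	f.steps++
	return fmt.Sprintf("response-%d", f.steps), nil
}

func (f *fakeClient) Done() bool { return f.steps >= 2 }

// authenticate drives the conversation and returns the number of steps
// taken. exchange stands in for the round trip to the server.
func authenticate(c scramClient, user, pass string, exchange func(string) string) (int, error) {
	if err := c.Begin(user, pass, ""); err != nil {
		return 0, err
	}
	challenge, rounds := "", 0
	for !c.Done() {
		resp, err := c.Step(challenge)
		if err != nil {
			return rounds, err
		}
		rounds++
		challenge = exchange(resp)
	}
	return rounds, nil
}

func main() {
	echo := func(resp string) string { return "challenge-for-" + resp }
	rounds, err := authenticate(&fakeClient{}, "svc", "secret", echo)
	fmt.Println(rounds, err)
}
```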
