kafka

package
v0.9.6
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 15, 2022 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func KafkaValidateConf

func KafkaValidateConf(kconf conf.KafkaConf) (err error)

KafkaValidateConf validates the supplied Kafka configuration and returns an error if it is invalid.

Types

type KafkaClient

type KafkaClient interface {
	NewProducer(KafkaCommon) (KafkaProducer, error)
	NewConsumer(KafkaCommon) (KafkaConsumer, error)
	Brokers() []*sarama.Broker
}

KafkaClient is the Kafka client interface: it creates producers and consumers and exposes the list of brokers.

type KafkaCommon

type KafkaCommon interface {
	ValidateConf() error
	Start() error
	Conf() conf.KafkaConf
	Producer() KafkaProducer
}

KafkaCommon is the base interface for bridges that interact with Kafka.

func NewKafkaCommon

func NewKafkaCommon(kf KafkaFactory, conf conf.KafkaConf, kafkaGoRoutines KafkaGoRoutines) (k KafkaCommon)

NewKafkaCommon constructs a new KafkaCommon instance.

type KafkaConsumer

type KafkaConsumer interface {
	Close() error
	Messages() <-chan *sarama.ConsumerMessage
	Errors() <-chan error
	MarkOffset(*sarama.ConsumerMessage, string)
}

KafkaConsumer provides the interface passed from KafkaCommon to consume messages

type KafkaFactory

type KafkaFactory interface {
	NewClient(KafkaCommon, *sarama.Config) (KafkaClient, error)
}

KafkaFactory builds new Kafka clients.

type KafkaGoRoutines

type KafkaGoRoutines interface {
	ConsumerMessagesLoop(consumer KafkaConsumer, producer KafkaProducer, wg *sync.WaitGroup)
	ProducerErrorLoop(consumer KafkaConsumer, producer KafkaProducer, wg *sync.WaitGroup)
	ProducerSuccessLoop(consumer KafkaConsumer, producer KafkaProducer, wg *sync.WaitGroup)
}

KafkaGoRoutines defines goroutines for processing Kafka messages from KafkaCommon

type KafkaProducer

type KafkaProducer interface {
	AsyncClose()
	Input() chan<- *sarama.ProducerMessage
	Successes() <-chan *sarama.ProducerMessage
	Errors() <-chan *sarama.ProducerError
}

KafkaProducer provides the interface passed from KafkaCommon to produce messages (subset of sarama)

type SaramaKafkaFactory

type SaramaKafkaFactory struct{}

SaramaKafkaFactory is a KafkaFactory implementation that uses sarama.

func (*SaramaKafkaFactory) NewClient

func (f *SaramaKafkaFactory) NewClient(k KafkaCommon, clientConf *sarama.Config) (c KafkaClient, err error)

NewClient returns a new KafkaClient.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL