Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func KafkaValidateConf ¶
KafkaValidateConf validates the supplied configuration.
Types ¶
type KafkaClient ¶
type KafkaClient interface { NewProducer(KafkaCommon) (KafkaProducer, error) NewConsumer(KafkaCommon) (KafkaConsumer, error) Brokers() []*sarama.Broker }
KafkaClient is the Kafka client interface, used to create producers and consumers and to list the brokers.
type KafkaCommon ¶
type KafkaCommon interface { ValidateConf() error Start() error Conf() conf.KafkaConf Producer() KafkaProducer }
KafkaCommon is the base interface for bridges that interact with Kafka
func NewKafkaCommon ¶
func NewKafkaCommon(kf KafkaFactory, conf conf.KafkaConf, kafkaGoRoutines KafkaGoRoutines) (k KafkaCommon)
NewKafkaCommon constructs a new KafkaCommon instance
type KafkaConsumer ¶
type KafkaConsumer interface { Close() error Messages() <-chan *sarama.ConsumerMessage Errors() <-chan error MarkOffset(*sarama.ConsumerMessage, string) }
KafkaConsumer provides the interface passed from KafkaCommon to consume messages
type KafkaFactory ¶
type KafkaFactory interface {
NewClient(KafkaCommon, *sarama.Config) (KafkaClient, error)
}
KafkaFactory builds new clients
type KafkaGoRoutines ¶
type KafkaGoRoutines interface { ConsumerMessagesLoop(consumer KafkaConsumer, producer KafkaProducer, wg *sync.WaitGroup) ProducerErrorLoop(consumer KafkaConsumer, producer KafkaProducer, wg *sync.WaitGroup) ProducerSuccessLoop(consumer KafkaConsumer, producer KafkaProducer, wg *sync.WaitGroup) }
KafkaGoRoutines defines goroutines for processing Kafka messages from KafkaCommon
type KafkaProducer ¶
type KafkaProducer interface { AsyncClose() Input() chan<- *sarama.ProducerMessage Successes() <-chan *sarama.ProducerMessage Errors() <-chan *sarama.ProducerError }
KafkaProducer is the interface provided by KafkaCommon for producing messages (a subset of sarama's asynchronous producer interface).
type SaramaKafkaFactory ¶
type SaramaKafkaFactory struct{}
SaramaKafkaFactory is a KafkaFactory implementation that uses sarama.
func (*SaramaKafkaFactory) NewClient ¶
func (f *SaramaKafkaFactory) NewClient(k KafkaCommon, clientConf *sarama.Config) (c KafkaClient, err error)
NewClient returns a new KafkaClient, or an error.
Click to show internal directories.
Click to hide internal directories.