Documentation ¶
Index ¶
- func GetTransferChan(consumerBuffer int) chan *sarama.ProducerMessage
- func LogRPS(tag, topic string, influxAccessor influxlogger.InfluxD, rps []*uint64)
- type Config
- type Kafka
- func (k *Kafka) Close()
- func (k *Kafka) GetConsumerErrors()
- func (k *Kafka) InitConsumer(transferChan chan *sarama.ProducerMessage, reset bool) error
- func (k *Kafka) InitProducerFromConsumer(transferChan chan *sarama.ProducerMessage) error
- func (k *Kafka) Monitor()
- func (k *Kafka) Pull()
- func (k *Kafka) Push()
- func (k *Kafka) RPSTicker()
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func GetTransferChan ¶
func GetTransferChan(consumerBuffer int) chan *sarama.ProducerMessage
GetTransferChan returns a transfer channel that can be used to pass messages around
Types ¶
type Config ¶
type Config struct { Topic string Zookeepers []string ConsumerGroupName string ConsumerBuffer int MaxErrors int MaxRetry int BatchSize int FlushInterval int ConsumerTransactions *uint64 ProducerTransactions *uint64 }
Config is a convenient wrapper for all the config values needed for InitKafka
type Kafka ¶
type Kafka struct { // Config Conf Config Brokers []string // Kafka stuff Producer sarama.AsyncProducer Consumer *consumergroup.ConsumerGroup ProducerChan chan *sarama.ProducerMessage TransferChan chan *sarama.ProducerMessage // other stuff WaitGroup *sync.WaitGroup Shutdown chan struct{} // contains filtered or unexported fields }
Kafka object that provides nice helper functions
func InitKafka ¶
func InitKafka(conf Config, influxAccessor influxlogger.InfluxD, signalChan chan struct{}, wg *sync.WaitGroup) (*Kafka, error)
InitKafka initializes the Kafka object creating some helper clients
func (*Kafka) GetConsumerErrors ¶
func (k *Kafka) GetConsumerErrors()
GetConsumerErrors logs any consumer errors
func (*Kafka) InitConsumer ¶
func (k *Kafka) InitConsumer(transferChan chan *sarama.ProducerMessage, reset bool) error
InitConsumer sets up the consumer for kafka
func (*Kafka) InitProducerFromConsumer ¶
func (k *Kafka) InitProducerFromConsumer(transferChan chan *sarama.ProducerMessage) error
InitProducerFromConsumer sets up the producer for kafka using a consumer channel as input
Click to show internal directories.
Click to hide internal directories.