Documentation ¶
Overview ¶
Utils to simplify consuming from Kafka topics using Sarama.
Index ¶
- type Consumer
- type ConsumerChannel
- func (cc *ConsumerChannel) ConsumerAcknowledge()
- func (cc *ConsumerChannel) ConsumerCancel()
- func (cc *ConsumerChannel) ConsumerWaitRead() ([]byte, error)
- func (cc *ConsumerChannel) ProducerWaitAcknowledge() (bool, error)
- func (cc *ConsumerChannel) ProducerWrite(message []byte) error
- func (cc *ConsumerChannel) WaitUntilFinished()
- type ConsumerGroup
- type Listener
- type ListenerConfig
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Consumer ¶
type Consumer interface {
Consume(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
}
Consumer can freely consume messages, return Error if something goes wrong, nil otherwise.
type ConsumerChannel ¶
type ConsumerChannel struct {
// contains filtered or unexported fields
}
ConsumerChannel defines a bidirectional channel communication for consuming and acknowledging messages.
func NewConsumerChannel ¶
func NewConsumerChannel() *ConsumerChannel
func (*ConsumerChannel) ConsumerAcknowledge ¶
func (cc *ConsumerChannel) ConsumerAcknowledge()
func (*ConsumerChannel) ConsumerCancel ¶
func (cc *ConsumerChannel) ConsumerCancel()
func (*ConsumerChannel) ConsumerWaitRead ¶
func (cc *ConsumerChannel) ConsumerWaitRead() ([]byte, error)
func (*ConsumerChannel) ProducerWaitAcknowledge ¶
func (cc *ConsumerChannel) ProducerWaitAcknowledge() (bool, error)
func (*ConsumerChannel) ProducerWrite ¶
func (cc *ConsumerChannel) ProducerWrite(message []byte) error
func (*ConsumerChannel) WaitUntilFinished ¶
func (cc *ConsumerChannel) WaitUntilFinished()
type ConsumerGroup ¶
type ConsumerGroup struct {
// contains filtered or unexported fields
}
ConsumerGroup represents a Sarama consumer group consumer
func (*ConsumerGroup) Cleanup ¶
func (c *ConsumerGroup) Cleanup(sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (*ConsumerGroup) ConsumeClaim ¶
func (c *ConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (*ConsumerGroup) Setup ¶
func (c *ConsumerGroup) Setup(sarama.ConsumerGroupSession) error
Setup is run at the beginning of a new session, before ConsumeClaim
func (*ConsumerGroup) WaitUntilReady ¶
func (c *ConsumerGroup) WaitUntilReady()
type Listener ¶
type Listener struct {
// contains filtered or unexported fields
}
Listener allows a Consumer handling kafka messages without much hassle
func NewListener ¶
func NewListener(config ListenerConfig, consumer Consumer) *Listener
Click to show internal directories.
Click to hide internal directories.