Documentation ¶
Overview ¶
Consumes flows from a Kafka instance and passes them to the following segments. This segment is based on the kafkaconnector library: https://github.com/bwNetFlow/kafkaconnector
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Handler ¶
type Handler struct {
// contains filtered or unexported fields
}
Handler represents a Sarama consumer group consumer
func (*Handler) Cleanup ¶
func (h *Handler) Cleanup(sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (*Handler) ConsumeClaim ¶
func (h *Handler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
type KafkaConsumer ¶
type KafkaConsumer struct { segments.BaseSegment Server string // required Topic string // required Group string // required User string // required if auth is true Pass string // required if auth is true Tls bool // optional, default is true Auth bool // optional, default is true StartAt string // optional, one of "oldest" or "newest", default is "newest" Timeout time.Duration // optional, default is 5m, any parsable duration // contains filtered or unexported fields }
FIXME: clean up those todos
func (KafkaConsumer) New ¶
func (segment KafkaConsumer) New(config map[string]string) segments.Segment
func (*KafkaConsumer) Run ¶
func (segment *KafkaConsumer) Run(wg *sync.WaitGroup)
Click to show internal directories.
Click to hide internal directories.