Documentation ¶
Index ¶
- type Connector
- func (connector *Connector) Close()
- func (connector *Connector) ConsumerChannel() <-chan *flow.FlowMessage
- func (connector *Connector) DisableAuth()
- func (connector *Connector) DisableTLS()
- func (connector *Connector) EnablePrometheus(listen string)
- func (connector *Connector) NewBaseConfig() *sarama.Config
- func (connector *Connector) ProducerChannel(topic string) chan *flow.FlowMessage
- func (connector *Connector) SetAuth(user string, pass string)
- func (connector *Connector) SetAuthAnon()
- func (connector *Connector) SetAuthFromEnv() error
- func (connector *Connector) StartConsumer(brokers string, topics []string, group string, offset int64) error
- func (connector *Connector) StartProducer(broker string) error
- type Consumer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Connector ¶
type Connector struct {
// contains filtered or unexported fields
}
Connector handles a connection to read bwNetFlow flows from kafka.
func (*Connector) ConsumerChannel ¶
func (connector *Connector) ConsumerChannel() <-chan *flow.FlowMessage
Return the channel used for receiving Flows from the Kafka Consumer. If this channel closes, the upstream Kafka Consumer has closed its own channel, which sits ahead of the last decoding step. You can restart the Consumer by calling StartConsumer() on the same Connector object.
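The close-then-restart behaviour described above can be sketched as a small loop. This is only an illustration under stated assumptions: FlowMessage is a local stand-in for flow.FlowMessage, and the start callback is a hypothetical wrapper that, in a real program, would call StartConsumer and then return ConsumerChannel().

```go
package main

import "fmt"

// FlowMessage is a stand-in for flow.FlowMessage so the sketch compiles on its own.
type FlowMessage struct {
	Bytes uint64
}

// consumeWithRestart drains the channel returned by start and calls start
// again each time that channel closes, up to maxStarts times in total.
// start is hypothetical: it represents StartConsumer followed by ConsumerChannel.
func consumeWithRestart(start func() (<-chan *FlowMessage, error), maxStarts int, handle func(*FlowMessage)) error {
	for i := 0; i < maxStarts; i++ {
		ch, err := start()
		if err != nil {
			return err
		}
		// The range loop ends only when the channel is closed.
		for msg := range ch {
			handle(msg)
		}
	}
	return nil
}

func main() {
	// Simulated consumer: delivers two flows, then closes its channel.
	start := func() (<-chan *FlowMessage, error) {
		ch := make(chan *FlowMessage, 2)
		ch <- &FlowMessage{Bytes: 100}
		ch <- &FlowMessage{Bytes: 200}
		close(ch)
		return ch, nil
	}

	seen := 0
	if err := consumeWithRestart(start, 2, func(*FlowMessage) { seen++ }); err != nil {
		fmt.Println("consumer failed:", err)
		return
	}
	fmt.Println(seen) // prints 4
}
```

Bounding the number of restarts keeps a permanently failing consumer from spinning forever; a real program would likely add a delay between attempts as well.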
func (*Connector) DisableAuth ¶
func (connector *Connector) DisableAuth()
DisableAuth disables authentication.
func (*Connector) DisableTLS ¶
func (connector *Connector) DisableTLS()
DisableTLS disables the SSL/TLS connection.
func (*Connector) EnablePrometheus ¶
func (connector *Connector) EnablePrometheus(listen string)
EnablePrometheus enables the metric exporter for both Consumer and Producer.
func (*Connector) NewBaseConfig ¶
func (connector *Connector) NewBaseConfig() *sarama.Config
NewBaseConfig returns a base *sarama.Config.
func (*Connector) ProducerChannel ¶
func (connector *Connector) ProducerChannel(topic string) chan *flow.FlowMessage
Return the channel used for handing over Flows to the Kafka Producer. If writing to this channel blocks, check the log.
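One way to notice a blocked write instead of hanging on it is to send with a timeout. The sketch below is an assumption-laden illustration, not part of this package: FlowMessage stands in for flow.FlowMessage, trySend and shortWait are hypothetical names, and a plain buffered channel stands in for the one returned by ProducerChannel.

```go
package main

import (
	"fmt"
	"time"
)

// FlowMessage is a stand-in for flow.FlowMessage so the sketch compiles on its own.
type FlowMessage struct {
	Bytes uint64
}

// shortWait is an arbitrary timeout chosen for this illustration.
const shortWait = 50 * time.Millisecond

// trySend hands msg to ch, but gives up and reports false if the send
// has not gone through within timeout.
func trySend(ch chan<- *FlowMessage, msg *FlowMessage, timeout time.Duration) bool {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case ch <- msg:
		return true
	case <-timer.C:
		return false
	}
}

func main() {
	// A channel with room for one message and no reader attached.
	ch := make(chan *FlowMessage, 1)

	fmt.Println(trySend(ch, &FlowMessage{Bytes: 1}, shortWait)) // prints true: buffer has room
	fmt.Println(trySend(ch, &FlowMessage{Bytes: 2}, shortWait)) // prints false: buffer is full
}
```

A false result is the cue to check the log, as the description above suggests.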
func (*Connector) SetAuthAnon ¶
func (connector *Connector) SetAuthAnon()
Set anonymous credentials as login method.
func (*Connector) SetAuthFromEnv ¶
func (connector *Connector) SetAuthFromEnv() error
Check the environment to infer which login to use for SASL/PLAIN auth via TLS. Requires KAFKA_SASL_USER and KAFKA_SASL_PASS to be set for this process.
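The environment check can be pictured roughly as follows. This is a sketch of the general idea only, not this package's implementation: credentialsFrom is a hypothetical helper, and treating an empty variable as missing is an assumption. Only the two variable names come from the description above.

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// credentialsFrom reads KAFKA_SASL_USER and KAFKA_SASL_PASS through getenv
// and returns an error if either is empty. Passing the lookup function in
// makes the check easy to exercise without touching the real environment.
func credentialsFrom(getenv func(string) string) (user, pass string, err error) {
	user = getenv("KAFKA_SASL_USER")
	pass = getenv("KAFKA_SASL_PASS")
	if user == "" || pass == "" {
		return "", "", errors.New("KAFKA_SASL_USER and KAFKA_SASL_PASS must both be set")
	}
	return user, pass, nil
}

func main() {
	user, _, err := credentialsFrom(os.Getenv)
	if err != nil {
		fmt.Println("no SASL credentials:", err)
		return
	}
	fmt.Println("using SASL user", user)
}
```

With the real Connector, the returned error from SetAuthFromEnv would be the place to decide whether to fall back to another login method or abort.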
func (*Connector) StartConsumer ¶
func (connector *Connector) StartConsumer(brokers string, topics []string, group string, offset int64) error
Start a Kafka Consumer with the specified parameters. Its output will be available in the channel returned by ConsumerChannel.
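The call order implied here (start the consumer, read from its channel, close) can be sketched against an interface that mirrors the documented signatures. Everything below the interface is hypothetical: fakeConnector is an in-memory stand-in rather than the real Connector, FlowMessage substitutes for flow.FlowMessage, the broker address, topic, and group names are placeholders, and passing -1 (the value of sarama.OffsetNewest) as the offset is an assumption about what that parameter expects.

```go
package main

import (
	"errors"
	"fmt"
)

// FlowMessage is a stand-in for flow.FlowMessage so the sketch compiles on its own.
type FlowMessage struct {
	Bytes uint64
}

// consumerAPI mirrors the Connector method signatures listed in the index,
// with FlowMessage substituted for flow.FlowMessage.
type consumerAPI interface {
	StartConsumer(brokers string, topics []string, group string, offset int64) error
	ConsumerChannel() <-chan *FlowMessage
	Close()
}

// fakeConnector is a hypothetical in-memory stand-in that emits three flows.
type fakeConnector struct {
	ch chan *FlowMessage
}

func (f *fakeConnector) StartConsumer(brokers string, topics []string, group string, offset int64) error {
	if brokers == "" || len(topics) == 0 {
		return errors.New("brokers and topics are required")
	}
	f.ch = make(chan *FlowMessage, 3)
	for i := 1; i <= 3; i++ {
		f.ch <- &FlowMessage{Bytes: uint64(i * 100)}
	}
	close(f.ch)
	return nil
}

func (f *fakeConnector) ConsumerChannel() <-chan *FlowMessage { return f.ch }

func (f *fakeConnector) Close() {}

// sumBytes starts the consumer, reads flows until the channel closes,
// and returns the total of their Bytes fields.
func sumBytes(c consumerAPI, brokers string, topics []string, group string) (uint64, error) {
	const offsetNewest int64 = -1 // assumed: same value as sarama.OffsetNewest
	if err := c.StartConsumer(brokers, topics, group, offsetNewest); err != nil {
		return 0, err
	}
	defer c.Close()

	var total uint64
	for msg := range c.ConsumerChannel() {
		total += msg.Bytes
	}
	return total, nil
}

func main() {
	total, err := sumBytes(&fakeConnector{}, "localhost:9092", []string{"flows"}, "example-group")
	fmt.Println(total, err) // prints 600 <nil>
}
```

Coding against a small interface like this also makes it straightforward to test flow-processing logic without a running Kafka cluster.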
func (*Connector) StartProducer ¶
func (connector *Connector) StartProducer(broker string) error
Start a Kafka Producer with the specified parameters. The channel returned by ProducerChannel will then accept your input.
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer represents a Sarama consumer group consumer
func (*Consumer) Cleanup ¶
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (*Consumer) ConsumeClaim ¶
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().