Documentation ¶
Index ¶
- type Connector
- func (connector *Connector) CancelConsumerControlMessages()
- func (connector *Connector) Close()
- func (connector *Connector) CloseConsumer()
- func (connector *Connector) CloseProducer()
- func (connector *Connector) ConsumerChannel() <-chan *flow.FlowMessage
- func (connector *Connector) ConsumerErrors() <-chan error
- func (connector *Connector) ConsumerNotifications() <-chan *cluster.Notification
- func (connector *Connector) EnableManualErrorHandling()
- func (connector *Connector) GetConsumerControlMessages() <-chan ConsumerControlMessage
- func (connector *Connector) ProducerChannel() chan *flow.FlowMessage
- func (connector *Connector) ProducerErrors() <-chan *sarama.ProducerError
- func (connector *Connector) SetAuth(user string, pass string)
- func (connector *Connector) SetAuthAnon()
- func (connector *Connector) SetAuthFromEnv() error
- func (connector *Connector) SetChannelLength(l uint)
- func (connector *Connector) StartConsumer(broker string, topics []string, consumergroup string, offset int64) error
- func (connector *Connector) StartProducer(broker string, topic string) error
- type ConsumerControlMessage
Types ¶
type Connector ¶
type Connector struct {
// contains filtered or unexported fields
}
Connector handles a connection to Kafka for reading bwNetFlow flows.
func (*Connector) CancelConsumerControlMessages ¶
func (connector *Connector) CancelConsumerControlMessages()
CancelConsumerControlMessages disables the channel for `ConsumerControlMessage` control messages.
func (*Connector) Close ¶
func (connector *Connector) Close()
Close closes the connection to Kafka, i.e. both the Consumer and the Producer.
func (*Connector) CloseConsumer ¶
func (connector *Connector) CloseConsumer()
Close the Kafka Consumer specifically.
func (*Connector) CloseProducer ¶
func (connector *Connector) CloseProducer()
Close the Kafka Producer specifically.
func (*Connector) ConsumerChannel ¶
func (connector *Connector) ConsumerChannel() <-chan *flow.FlowMessage
Return the channel used for receiving Flows from the Kafka Consumer. If this channel closes, the upstream Kafka Consumer has closed its own channel, which sits ahead of the final decoding step. You can restart the Consumer by calling .StartConsumer() on the same Connector object.
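The close-and-restart behaviour described above can be sketched as follows. This is a minimal, self-contained illustration rather than code from the package: `FlowMessage` is a hypothetical stand-in for `flow.FlowMessage`, and `consumeUntilClosed` is an illustrative helper name. With a real Connector, the argument would be the value returned by ConsumerChannel().

```go
package main

import "fmt"

// FlowMessage is a hypothetical stand-in for flow.FlowMessage so that this
// sketch compiles without the bwNetFlow packages.
type FlowMessage struct {
	Bytes uint64
}

// consumeUntilClosed reads flows until the channel is closed and returns the
// number of flows it received.
func consumeUntilClosed(flows <-chan *FlowMessage) int {
	count := 0
	for range flows {
		count++
	}
	// Reaching this point means the channel was closed. A real application
	// could call StartConsumer again on the same Connector here.
	return count
}

func main() {
	// Simulate an upstream consumer that delivers three flows and then closes.
	flows := make(chan *FlowMessage, 3)
	for i := 0; i < 3; i++ {
		flows <- &FlowMessage{Bytes: uint64(i)}
	}
	close(flows)

	fmt.Println("flows consumed before close:", consumeUntilClosed(flows))
}
```

A `for range` loop is used because it terminates exactly when the channel closes, which is the signal the documentation tells callers to watch for.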
func (*Connector) ConsumerErrors ¶
func (connector *Connector) ConsumerErrors() <-chan error
Consumer Errors are relayed directly from the Kafka Cluster.
This becomes an exclusive reference only after EnableManualErrorHandling has been called. IMPORTANT: read the EnableManualErrorHandling docs carefully.
func (*Connector) ConsumerNotifications ¶
func (connector *Connector) ConsumerNotifications() <-chan *cluster.Notification
Consumer Notifications are relayed directly from the Kafka Cluster. These include which topics and partitions are read by this instance and are sent on every Rebalancing Event.
This becomes an exclusive reference only after EnableManualErrorHandling has been called. IMPORTANT: read the EnableManualErrorHandling docs carefully.
func (*Connector) EnableManualErrorHandling ¶
func (connector *Connector) EnableManualErrorHandling()
Enable manual error handling by setting the internal flags. Any application calling this must read all messages provided by the channels returned from the ConsumerErrors, ConsumerNotifications and ProducerErrors methods. Otherwise the application will deadlock.
If this is called before any `.Start*` method, no goroutines are spawned for logging messages. This is the recommended case. If this is called after any `.Start*` method, the goroutines that were already spawned are terminated.
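The requirement to read all three channels can be met with a single `select` loop. The sketch below is illustrative only: it uses plain `error` and `string` channels as hypothetical stand-ins for the `<-chan error`, `<-chan *cluster.Notification` and `<-chan *sarama.ProducerError` values, and it assumes the channels are eventually closed, which the documentation does not state.

```go
package main

import (
	"errors"
	"fmt"
)

// drainAll receives from all three channels until each one is closed and
// returns the total number of values received. A closed channel is set to
// nil so that its select case is never chosen again.
func drainAll(consumerErrs <-chan error, notifications <-chan string, producerErrs <-chan error) int {
	total := 0
	for consumerErrs != nil || notifications != nil || producerErrs != nil {
		select {
		case _, ok := <-consumerErrs:
			if !ok {
				consumerErrs = nil
				continue
			}
			total++
		case _, ok := <-notifications:
			if !ok {
				notifications = nil
				continue
			}
			total++
		case _, ok := <-producerErrs:
			if !ok {
				producerErrs = nil
				continue
			}
			total++
		}
	}
	return total
}

func main() {
	// Stand-ins for the channels returned by ConsumerErrors,
	// ConsumerNotifications and ProducerErrors.
	consumerErrs := make(chan error, 1)
	notifications := make(chan string, 2)
	producerErrs := make(chan error, 1)

	consumerErrs <- errors.New("consumer error")
	notifications <- "rebalance start"
	notifications <- "rebalance ok"
	producerErrs <- errors.New("producer error")
	close(consumerErrs)
	close(notifications)
	close(producerErrs)

	fmt.Println("messages drained:", drainAll(consumerErrs, notifications, producerErrs))
}
```

In a real application this loop would typically run in its own goroutine, started before any `.Start*` call, so that none of the three channels is ever left unread.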
func (*Connector) GetConsumerControlMessages ¶
func (connector *Connector) GetConsumerControlMessages() <-chan ConsumerControlMessage
GetConsumerControlMessages enables and returns a channel for `ConsumerControlMessage` control messages.
func (*Connector) ProducerChannel ¶
func (connector *Connector) ProducerChannel() chan *flow.FlowMessage
Return the channel used for handing over Flows to the Kafka Producer. If writing to this channel blocks, check the log.
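Because a write to the producer channel can block, an application may prefer to bound the wait rather than stall indefinitely. The following is a minimal sketch of that idea using a timeout. `FlowMessage` is a hypothetical stand-in for `flow.FlowMessage` and `trySend` is an illustrative helper, not part of the package.

```go
package main

import (
	"fmt"
	"time"
)

// FlowMessage is a hypothetical stand-in for flow.FlowMessage.
type FlowMessage struct {
	Bytes uint64
}

// trySend hands msg to the producer channel, giving up after timeout.
// It reports whether the message was accepted.
func trySend(out chan<- *FlowMessage, msg *FlowMessage, timeout time.Duration) bool {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case out <- msg:
		return true
	case <-timer.C:
		// The channel stayed full for the whole timeout. This is the
		// situation in which the documentation suggests checking the log.
		return false
	}
}

func main() {
	// A buffered channel with one slot and no reader, standing in for the
	// value returned by ProducerChannel().
	out := make(chan *FlowMessage, 1)

	fmt.Println("first send accepted:", trySend(out, &FlowMessage{}, 50*time.Millisecond))
	fmt.Println("second send accepted:", trySend(out, &FlowMessage{}, 50*time.Millisecond))
}
```

Whether dropping or retrying a flow after a timeout is acceptable depends on the application; the sketch only shows how to detect the blocked state.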
func (*Connector) ProducerErrors ¶
func (connector *Connector) ProducerErrors() <-chan *sarama.ProducerError
Producer Errors are relayed directly from the Kafka Cluster.
This becomes an exclusive reference only after EnableManualErrorHandling has been called. IMPORTANT: read the EnableManualErrorHandling docs carefully.
func (*Connector) SetAuthAnon ¶
func (connector *Connector) SetAuthAnon()
Set anonymous credentials as login method.
func (*Connector) SetAuthFromEnv ¶
func (connector *Connector) SetAuthFromEnv() error
Check the environment to infer which login to use for SASL/PLAIN auth via TLS. Requires KAFKA_SASL_USER and KAFKA_SASL_PASS to be set for this process.
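The environment check can be pictured roughly as below. This is a sketch under assumptions, not the package's implementation: only the two variable names come from the documentation, while `saslCredentials`, its lookup parameter and the error text are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// saslCredentials reads the two variables named in the documentation through
// the supplied lookup function and returns an error if either is missing.
// Passing the lookup as a parameter keeps the function easy to test.
func saslCredentials(lookup func(string) (string, bool)) (string, string, error) {
	user, okUser := lookup("KAFKA_SASL_USER")
	pass, okPass := lookup("KAFKA_SASL_PASS")
	if !okUser || !okPass {
		return "", "", errors.New("KAFKA_SASL_USER and KAFKA_SASL_PASS must both be set")
	}
	return user, pass, nil
}

func main() {
	user, _, err := saslCredentials(os.LookupEnv)
	if err != nil {
		// A real application might fall back to SetAuthAnon here.
		fmt.Println("no credentials in environment:", err)
		return
	}
	// A real application would pass the values to SetAuth(user, pass).
	fmt.Println("found credentials for user:", user)
}
```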
func (*Connector) SetChannelLength ¶
func (connector *Connector) SetChannelLength(l uint)
Set the channel length to a value greater than 0. Consider reading the source before using it.
type ConsumerControlMessage ¶
type ConsumerControlMessage struct {
	Partition      int32
	Offset         int64
	Timestamp      time.Time // only set if kafka is version 0.10+, inner message timestamp
	BlockTimestamp time.Time // only set if kafka is version 0.10+, outer (compressed) block timestamp
}
ConsumerControlMessage takes the control params of *sarama.ConsumerMessage.
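One plausible use of these control messages is tracking consumption progress per partition. The sketch below redeclares the struct locally so that it compiles on its own; `latestOffsets` is a hypothetical helper, and the assumption that the control channel is eventually closed is not confirmed by the documentation.

```go
package main

import (
	"fmt"
	"time"
)

// ConsumerControlMessage mirrors the documented struct so that this sketch
// does not depend on the package itself.
type ConsumerControlMessage struct {
	Partition      int32
	Offset         int64
	Timestamp      time.Time
	BlockTimestamp time.Time
}

// latestOffsets reads control messages until the channel is closed and
// returns the highest offset observed for each partition.
func latestOffsets(ctrl <-chan ConsumerControlMessage) map[int32]int64 {
	offsets := make(map[int32]int64)
	for msg := range ctrl {
		if cur, seen := offsets[msg.Partition]; !seen || msg.Offset > cur {
			offsets[msg.Partition] = msg.Offset
		}
	}
	return offsets
}

func main() {
	// Stand-in for the channel returned by GetConsumerControlMessages().
	ctrl := make(chan ConsumerControlMessage, 3)
	ctrl <- ConsumerControlMessage{Partition: 0, Offset: 41, Timestamp: time.Now()}
	ctrl <- ConsumerControlMessage{Partition: 1, Offset: 7, Timestamp: time.Now()}
	ctrl <- ConsumerControlMessage{Partition: 0, Offset: 42, Timestamp: time.Now()}
	close(ctrl)

	offsets := latestOffsets(ctrl)
	fmt.Println("partition 0 offset:", offsets[0])
	fmt.Println("partition 1 offset:", offsets[1])
}
```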