kafka

Published: May 21, 2019 License: GPL-3.0 Imports: 11 Imported by: 0

README


bwNetFlow Go Kafka Connector

Example Usage:

package main

import (
	"fmt"

	"github.com/Shopify/sarama"
	kafka "omi-gitlab.e-technik.uni-ulm.de/bwnetflow/kafka/kafkaconnector"
)

var kafkaConn = kafka.Connector{}

func main() {

	fmt.Printf("welcome ... let's go!\n")

	// connect to the BelWue Kafka cluster
	broker := "127.0.0.1:9092,[::1]:9092" // TODO: set valid uris
	topic := []string{"flow-messages-anon"}
	consumerGroup := "anon-golang-example"
	kafkaConn.SetAuthAnon() // optionally: change to SetAuthFromEnv() or SetAuth(user string, pass string)
	err := kafkaConn.StartConsumer(broker, topic, consumerGroup, sarama.OffsetNewest)
	if err != nil {
		fmt.Printf("could not start consumer: %v\n", err)
		return
	}
	defer kafkaConn.Close()

	// receive flows: e.g. count flows & bytes
	var flowCounter, byteCounter uint64
	for {
		flow := <-kafkaConn.ConsumerChannel()
		// process the flow here ...
		flowCounter++
		byteCounter += flow.GetBytes()
		fmt.Printf("\rflows: %d, bytes: %d GB", flowCounter, byteCounter/1024/1024/1024)
	}

}

Step by step guide

  • Make sure you have golang installed
  • Make a new directory "go-example" and place a file client.go containing the code snippet above in it
  • Inside "go-example", run go get ./... to download the dependencies (they are downloaded to $GOPATH)
  • Inside "go-example", run go run client.go to execute the example from source
  • To build a binary, run go build client.go, which produces the "client" binary (run it with ./client)

flowfilter

The kafkaconnector also contains an optional flowfilter, which allows filtering by customer ID, IP address (or subnet), and peer. Example usage:

var (
	// filtering
	filterCustomerIDs = flag.String("filter.customerid", "", "If defined, only flows for this customer are considered. Leave empty to disable filter. Provide comma separated list to filter for multiple customers.")
	filterIPsv4       = flag.String("filter.IPsv4", "", "If defined, only flows to/from this IP V4 subnet are considered. Leave empty to disable filter. Provide comma separated list to filter for multiple IP subnets.")
	filterIPsv6       = flag.String("filter.IPsv6", "", "If defined, only flows to/from this IP V6 subnet are considered. Leave empty to disable filter. Provide comma separated list to filter for multiple IP subnets.")
	filterPeers       = flag.String("filter.peers", "", "If defined, only flows to/from this peer are considered. Leave empty to disable filter. Provide comma separated list to filter for multiple peers.")
)

func main() {
	// ... establish connection, etc.

	// initialize filters: prepare filter arrays
	flowFilter := flowFilter.NewFlowFilter(*filterCustomerIDs, *filterIPsv4, *filterIPsv6, *filterPeers)

	// handle kafka flow messages in foreground
	for {
		flow := <-kafkaConn.ConsumerChannel()
		if flowFilter.FilterApplies(flow) {
			handleFlow(flow)
		}
	}
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Connector

type Connector struct {
	// contains filtered or unexported fields
}

Connector handles a connection to read bwNetFlow flows from kafka.

func (*Connector) CancelConsumerControlMessages

func (connector *Connector) CancelConsumerControlMessages()

CancelConsumerControlMessages disables the control message channel (`ConsumerControlMessage`).

func (*Connector) Close

func (connector *Connector) Close()

Close closes the connection to Kafka, i.e. both Consumer and Producer.

func (*Connector) CloseConsumer

func (connector *Connector) CloseConsumer()

Close the Kafka Consumer specifically.

func (*Connector) CloseProducer

func (connector *Connector) CloseProducer()

Close the Kafka Producer specifically.

func (*Connector) ConsumerChannel

func (connector *Connector) ConsumerChannel() <-chan *flow.FlowMessage

ConsumerChannel returns the channel used for receiving Flows from the Kafka Consumer. If this channel closes, the upstream Kafka Consumer has closed its own channel ahead of the last decoding step. You can restart the Consumer by calling .StartConsumer() on the same Connector object.

func (*Connector) ConsumerErrors

func (connector *Connector) ConsumerErrors() <-chan error

Consumer Errors relayed directly from the Kafka Cluster.

This will become an exclusive reference only after EnableManualErrorHandling has been called. IMPORTANT: read EnableManualErrorHandling docs carefully

func (*Connector) ConsumerNotifications

func (connector *Connector) ConsumerNotifications() <-chan *cluster.Notification

Consumer Notifications are relayed directly from the Kafka Cluster. These include which topics and partitions are read by this instance and are sent on every Rebalancing Event.

This will become an exclusive reference only after EnableManualErrorHandling has been called. IMPORTANT: read EnableManualErrorHandling docs carefully

func (*Connector) EnableManualErrorHandling

func (connector *Connector) EnableManualErrorHandling()

EnableManualErrorHandling enables manual error handling by setting the internal flags. Any application calling this must read all messages provided by the channels returned from the ConsumerErrors, ConsumerNotifications and ProducerErrors methods; otherwise there will be deadlocks.

If this is called before any `.Start*` method was called, no go routines will be spawned for logging any messages. This is the recommended case. If this is called after any `.Start*` method was called, spawned go routines will be terminated.

func (*Connector) GetConsumerControlMessages

func (connector *Connector) GetConsumerControlMessages() <-chan ConsumerControlMessage

GetConsumerControlMessages enables and returns a channel for control messages `ConsumerControlMessage`

func (*Connector) ProducerChannel

func (connector *Connector) ProducerChannel() chan *flow.FlowMessage

Return the channel used for handing over Flows to the Kafka Producer. If writing to this channel blocks, check the log.

func (*Connector) ProducerErrors

func (connector *Connector) ProducerErrors() <-chan *sarama.ProducerError

Producer Errors are relayed directly from the Kafka Cluster.

This will become an exclusive reference only after EnableManualErrorHandling has been called. IMPORTANT: read EnableManualErrorHandling docs carefully

func (*Connector) SetAuth

func (connector *Connector) SetAuth(user string, pass string)

SetAuth explicitly sets which login to use for SASL/PLAIN auth via TLS.

func (*Connector) SetAuthAnon

func (connector *Connector) SetAuthAnon()

SetAuthAnon sets anonymous credentials as the login method.

func (*Connector) SetAuthFromEnv

func (connector *Connector) SetAuthFromEnv() error

SetAuthFromEnv checks the environment to infer which login to use for SASL/PLAIN auth via TLS. It requires KAFKA_SASL_USER and KAFKA_SASL_PASS to be set for this process.

func (*Connector) SetChannelLength

func (connector *Connector) SetChannelLength(l uint)

SetChannelLength sets the channel length to something >0. Maybe read the source before using it.

func (*Connector) StartConsumer

func (connector *Connector) StartConsumer(broker string, topics []string, consumergroup string, offset int64) error

Start a Kafka Consumer with the specified parameters. Its output will be available in the channel returned by ConsumerChannel.

func (*Connector) StartProducer

func (connector *Connector) StartProducer(broker string, topic string) error

Start a Kafka Producer with the specified parameters. The channel returned by ProducerChannel will be accepting your input.

type ConsumerControlMessage

type ConsumerControlMessage struct {
	Partition      int32
	Offset         int64
	Timestamp      time.Time // only set if kafka is version 0.10+, inner message timestamp
	BlockTimestamp time.Time // only set if kafka is version 0.10+, outer (compressed) block timestamp
}

ConsumerControlMessage carries the control parameters of a *sarama.ConsumerMessage.

