kafka

package module
v2.0.7+incompatible Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 21, 2019 License: MIT Imports: 15 Imported by: 0

README

CircleCI GoDoc

Kafka

Kafka is Go client library for Apache Kafka server, released under MIT license.

Kafka provides minimal abstraction over wire protocol, support for transparent failover and easy to use blocking API.

Example

Write all messages from stdin to kafka and print all messages from kafka topic to stdout.

package main

import (
    "bufio"
    "log"
    "os"
    "strings"

    "github.com/optiopay/kafka"
    "github.com/optiopay/kafka/proto"
)

const (
    topic     = "my-messages"
    partition = 0
)

var kafkaAddrs = []string{"localhost:9092", "localhost:9093"}

// printConsumed read messages from kafka and print them out
func printConsumed(broker kafka.Client) {
    conf := kafka.NewConsumerConf(topic, partition)
    conf.StartOffset = kafka.StartOffsetNewest
    consumer, err := broker.Consumer(conf)
    if err != nil {
        log.Fatalf("cannot create kafka consumer for %s:%d: %s", topic, partition, err)
    }

    for {
        msg, err := consumer.Consume()
        if err != nil {
            if err != kafka.ErrNoData {
                log.Printf("cannot consume %q topic message: %s", topic, err)
            }
            break
        }
        log.Printf("message %d: %s", msg.Offset, msg.Value)
    }
    log.Print("consumer quit")
}

// produceStdin read stdin and send every non empty line as message
func produceStdin(broker kafka.Client) {
    producer := broker.Producer(kafka.NewProducerConf())
    input := bufio.NewReader(os.Stdin)
    for {
        line, err := input.ReadString('\n')
        if err != nil {
            log.Fatalf("input error: %s", err)
        }
        line = strings.TrimSpace(line)
        if line == "" {
            continue
        }

        msg := &proto.Message{Value: []byte(line)}
        if _, err := producer.Produce(topic, partition, msg); err != nil {
            log.Fatalf("cannot produce message to %s:%d: %s", topic, partition, err)
        }
    }
}

func main() {
    conf := kafka.NewBrokerConf("test-client")
    conf.AllowTopicCreation = true

    // connect to kafka cluster
    broker, err := kafka.Dial(kafkaAddrs, conf)
    if err != nil {
        log.Fatalf("cannot connect to kafka cluster: %s", err)
    }
    defer broker.Close()

    go printConsumed(broker)
    produceStdin(broker)
}

Documentation

Overview

Package kafka a provides high level client API for Apache Kafka.

Use 'Broker' for node connection management, 'Producer' for sending messages, and 'Consumer' for fetching. All those structures implement Client, Consumer and Producer interface, that is also implemented in kafkatest package.

Index

Examples

Constants

View Source
const (
	// StartOffsetNewest configures the consumer to fetch messages produced
	// after creating the consumer.
	StartOffsetNewest = -1

	// StartOffsetOldest configures the consumer to fetch starting from the
	// oldest message available.
	StartOffsetOldest = -2
)

Variables

View Source
var ErrClosed = errors.New("closed")

ErrClosed is returned as result of any request made using closed connection.

View Source
var ErrMxClosed = errors.New("closed")

ErrMxClosed is returned as a result of closed multiplexer consumption.

View Source
var (
	// ErrNoData is returned by consumers on Fetch when the retry limit is set
	// and exceeded.
	ErrNoData = errors.New("no data")
)

Functions

This section is empty.

Types

type BatchConsumer

type BatchConsumer interface {
	ConsumeBatch() ([]*proto.Message, error)
}

BatchConsumer is the interface that wraps the ConsumeBatch method.

ConsumeBatch reads a batch of messages from a consumer, returning an error when encountered.

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

Broker is an abstract connection to kafka cluster, managing connections to all kafka nodes.

func Dial

func Dial(nodeAddresses []string, conf BrokerConf) (*Broker, error)

Dial connects to any node from a given list of kafka addresses and after successful metadata fetch, returns broker.

The returned broker is not initially connected to any kafka node.

func (*Broker) BatchConsumer

func (b *Broker) BatchConsumer(conf ConsumerConf) (BatchConsumer, error)

BatchConsumer creates a new BatchConsumer instance, bound to the broker.

func (*Broker) Close

func (b *Broker) Close()

Close closes the broker and all active kafka nodes connections.

func (*Broker) Consumer

func (b *Broker) Consumer(conf ConsumerConf) (Consumer, error)

Consumer creates a new consumer instance, bound to the broker.

func (*Broker) CreateTopic

func (b *Broker) CreateTopic(topics []proto.TopicInfo, timeout time.Duration, validateOnly bool) (*proto.CreateTopicsResp, error)

CreateTopic request topic creation

func (*Broker) Metadata

func (b *Broker) Metadata() (*proto.MetadataResp, error)

Metadata requests metadata information from any node.

func (*Broker) OffsetCoordinator

func (b *Broker) OffsetCoordinator(conf OffsetCoordinatorConf) (OffsetCoordinator, error)

OffsetCoordinator returns offset management coordinator for single consumer group, bound to broker.

func (*Broker) OffsetEarliest

func (b *Broker) OffsetEarliest(topic string, partition int32) (offset int64, err error)

OffsetEarliest returns the oldest offset available on the given partition.

func (*Broker) OffsetLatest

func (b *Broker) OffsetLatest(topic string, partition int32) (offset int64, err error)

OffsetLatest return the offset of the next message produced in given partition

func (*Broker) PartitionCount

func (b *Broker) PartitionCount(topic string) (int32, error)

PartitionCount returns how many partitions a given topic has. If a topic is not known, 0 and an error are returned.

func (*Broker) Producer

func (b *Broker) Producer(conf ProducerConf) Producer

Producer returns new producer instance, bound to the broker.

type BrokerConf

type BrokerConf struct {
	// Kafka client ID.
	ClientID string

	// LeaderRetryLimit limits the number of connection attempts to a single
	// node before failing. Use LeaderRetryWait to control the wait time
	// between retries.
	//
	// Defaults to 10.
	LeaderRetryLimit int

	// LeaderRetryWait sets a limit to the waiting time when trying to connect
	// to a single node after failure.
	//
	// Defaults to 500ms.
	//
	// Timeout on a connection is controlled by the DialTimeout setting.
	LeaderRetryWait time.Duration

	// AllowTopicCreation enables a last-ditch "send produce request" which
	// happens if we do not know about a topic. This enables topic creation
	// if your Kafka cluster is configured to allow it.
	//
	// Defaults to False.
	AllowTopicCreation bool

	// Any new connection dial timeout.
	//
	// Default is 10 seconds.
	DialTimeout time.Duration

	// DialRetryLimit limits the number of connection attempts to every node in
	// cluster before failing. Use DialRetryWait to control the wait time
	// between retries.
	//
	// Defaults to 10.
	DialRetryLimit int

	// DialRetryWait sets a limit to the waiting time when trying to establish
	// broker connection to single node to fetch cluster metadata.
	//
	// Defaults to 500ms.
	DialRetryWait time.Duration

	// ReadTimeout is TCP read timeout
	//
	// Default is 30 seconds
	ReadTimeout time.Duration

	// RetryErrLimit limits the number of retry attempts when an error is
	// encountered.
	//
	// Default is 10.
	RetryErrLimit int

	// RetryErrWait controls the wait duration between retries after failed
	// fetch request.
	//
	// Default is 500ms.
	RetryErrWait time.Duration

	// DEPRECATED 2015-07-10 - use Logger instead
	//
	// TODO(husio) remove
	//
	// Logger used by the broker.
	Log interface {
		Print(...interface{})
		Printf(string, ...interface{})
	}

	// Logger is general logging interface that can be provided by popular
	// logging frameworks. Used to notify and as replacement for stdlib `log`
	// package.
	Logger Logger

	//TLS CA pem
	TLSCa []byte
	//TLS certificate
	TLSCert []byte
	//TLS key
	TLSKey []byte
}

BrokerConf represents the configuration of a broker.

func NewBrokerConf

func NewBrokerConf(clientID string) BrokerConf

NewBrokerConf returns the default broker configuration.

type Client

type Client interface {
	Producer(conf ProducerConf) Producer
	Consumer(conf ConsumerConf) (Consumer, error)
	OffsetCoordinator(conf OffsetCoordinatorConf) (OffsetCoordinator, error)
	OffsetEarliest(topic string, partition int32) (offset int64, err error)
	OffsetLatest(topic string, partition int32) (offset int64, err error)
	Close()
}

Client is the interface implemented by Broker.

type Consumer

type Consumer interface {
	Consume() (*proto.Message, error)
}

Consumer is the interface that wraps the Consume method.

Consume reads a message from a consumer, returning an error when encountered.

Example
// connect to kafka cluster
addresses := []string{"localhost:9092", "localhost:9093"}
broker, err := Dial(addresses, NewBrokerConf("test"))
if err != nil {
	panic(err)
}
defer broker.Close()

// create new consumer
conf := NewConsumerConf("my-messages", 0)
conf.StartOffset = StartOffsetNewest
consumer, err := broker.Consumer(conf)
if err != nil {
	panic(err)
}

// read all messages
for {
	msg, err := consumer.Consume()
	if err != nil {
		if err == ErrNoData {
			break
		}
		panic(err)
	}

	fmt.Printf("message: %#v", msg)
}
Output:

type ConsumerConf

type ConsumerConf struct {
	// Topic name that should be consumed
	Topic string

	// Partition ID that should be consumed.
	Partition int32

	// RequestTimeout controls fetch request timeout. This operation is
	// blocking the whole connection, so it should always be set to a small
	// value. By default it's set to 50ms.
	// To control fetch function timeout use RetryLimit and RetryWait.
	RequestTimeout time.Duration

	// RetryLimit limits fetching messages a given amount of times before
	// returning ErrNoData error.
	//
	// Default is -1, which turns this limit off.
	RetryLimit int

	// RetryWait controls the duration of wait between fetch request calls,
	// when no data was returned.
	//
	// Default is 50ms.
	RetryWait time.Duration

	// RetryErrLimit limits the number of retry attempts when an error is
	// encountered.
	//
	// Default is 10.
	RetryErrLimit int

	// RetryErrWait controls the wait duration between retries after failed
	// fetch request.
	//
	// Default is 500ms.
	RetryErrWait time.Duration

	// MinFetchSize is the minimum size of messages to fetch in bytes.
	//
	// Default is 1 to fetch any message available.
	MinFetchSize int32

	// MaxFetchSize is the maximum size of data which can be sent by kafka node
	// to consumer.
	//
	// Default is 4MB.
	MaxFetchSize int32

	// Consumer cursor starting point. Set to StartOffsetNewest to receive only
	// newly created messages or StartOffsetOldest to read everything. Assign
	// any offset value to manually set cursor -- consuming starts with the
	// message whose offset is equal to given value (including first message).
	//
	// Default is StartOffsetOldest.
	StartOffset int64

	// Logger used by consumer. By default, reuse logger assigned to broker.
	Logger Logger
}

ConsumerConf represents the configuration of a consumer.

func NewConsumerConf

func NewConsumerConf(topic string, partition int32) ConsumerConf

NewConsumerConf returns the default consumer configuration.

type DistributingProducer

type DistributingProducer interface {
	Distribute(topic string, messages ...*proto.Message) (offset int64, err error)
}

DistributingProducer is the interface similar to Producer, but never require to explicitly specify partition.

Distribute writes messages to the given topic, automatically choosing partition, returning the post-commit offset and any error encountered. The offset of each message is also updated accordingly.

func NewHashProducer

func NewHashProducer(p Producer, numPartitions int32) DistributingProducer

NewHashProducer wraps given producer and return DistributingProducer that publish messages to kafka, computing partition number from message key hash, using fnv hash and [0, numPartitions) range.

func NewRandomProducer

func NewRandomProducer(p Producer, numPartitions int32) DistributingProducer

NewRandomProducer wraps given producer and return DistributingProducer that publish messages to kafka, randomly picking partition number from range [0, numPartitions)

func NewRoundRobinProducer

func NewRoundRobinProducer(p Producer, numPartitions int32) DistributingProducer

NewRoundRobinProducer wraps given producer and return DistributingProducer that publish messages to kafka, choosing destination partition from cycle build from [0, numPartitions) range.

type Logger

type Logger interface {
	Debug(msg string, args ...interface{})
	Info(msg string, args ...interface{})
	Warn(msg string, args ...interface{})
	Error(msg string, args ...interface{})
}

Logger is general logging interface that can be provided by popular logging frameworks.

* https://github.com/go-kit/kit/tree/master/log * https://github.com/husio/log

type Mx

type Mx struct {
	// contains filtered or unexported fields
}

Mx is multiplexer combining into single stream number of consumers.

It is responsibility of the user of the multiplexer and the consumer implementation to handle errors. ErrNoData returned by consumer is not passed through by the multiplexer, instead consumer that returned ErrNoData is removed from merged set. When all consumers are removed (set is empty), Mx is automatically closed and any further Consume call will result in ErrMxClosed error.

It is important to remember that because fetch from every consumer is done by separate worker, most of the time there is one message consumed by each worker that is held in memory while waiting for opportunity to return it once Consume on multiplexer is called. Closing multiplexer may result in ignoring some of already read, waiting for delivery messages kept internally by every worker.

func Merge

func Merge(consumers ...Consumer) *Mx

Merge is merging consume result of any number of consumers into single stream and expose them through returned multiplexer.

Example
// connect to kafka cluster
addresses := []string{"localhost:9092", "localhost:9093"}
broker, err := Dial(addresses, NewBrokerConf("test"))
if err != nil {
	panic(err)
}
defer broker.Close()

topics := []string{"fruits", "vegetables"}
fetchers := make([]Consumer, len(topics))

// create consumers for different topics
for i, topic := range topics {
	conf := NewConsumerConf(topic, 0)
	conf.RetryLimit = 20
	conf.StartOffset = StartOffsetNewest
	consumer, err := broker.Consumer(conf)
	if err != nil {
		panic(err)
	}
	fetchers[i] = consumer
}

// merge all created consumers (they don't even have to belong to the same broker!)
mx := Merge(fetchers...)
defer mx.Close()

// consume messages from all sources
for {
	msg, err := mx.Consume()
	if err != nil {
		panic(err)
	}
	fmt.Printf("message: %#v", msg)
}
Output:

func (*Mx) Close

func (p *Mx) Close()

Close is closing multiplexer and stopping all underlying workers.

Closing multiplexer will stop all workers as soon as possible, but any consume-in-progress action performed by worker has to be finished first. Any consumption result received after closing multiplexer is ignored.

Close is returning without waiting for all the workers to finish.

Closing closed multiplexer has no effect.

func (*Mx) Consume

func (p *Mx) Consume() (*proto.Message, error)

Consume returns Consume result from any of the merged consumer.

func (*Mx) Workers

func (p *Mx) Workers() int

Workers return number of active consumer workers that are pushing messages to multiplexer conumer queue.

type OffsetCoordinator

type OffsetCoordinator interface {
	Commit(topic string, partition int32, offset int64) error
	Offset(topic string, partition int32) (offset int64, metadata string, err error)
}

OffsetCoordinator is the interface which wraps the Commit and Offset methods.

Example
// connect to kafka cluster
addresses := []string{"localhost:9092", "localhost:9093"}
broker, err := Dial(addresses, NewBrokerConf("test"))
if err != nil {
	panic(err)
}
defer broker.Close()

// create offset coordinator and customize configuration
conf := NewOffsetCoordinatorConf("my-consumer-group")
conf.RetryErrLimit = 20
coordinator, err := broker.OffsetCoordinator(conf)
if err != nil {
	panic(err)
}

// write consumed message offset for topic/partition
if err := coordinator.Commit("my-topic", 0, 12); err != nil {
	panic(err)
}

// get latest consumed offset for given topic/partition
off, _, err := coordinator.Offset("my-topic", 0)
if err != nil {
	panic(err)
}

if off != 12 {
	panic(fmt.Sprintf("offset is %d, not 12", off))
}
Output:

type OffsetCoordinatorConf

type OffsetCoordinatorConf struct {
	ConsumerGroup string

	// RetryErrLimit limits messages fetch retry upon failure. By default 10.
	RetryErrLimit int

	// RetryErrWait controls wait duration between retries after failed fetch
	// request. By default 500ms.
	RetryErrWait time.Duration

	// Logger used by consumer. By default, reuse logger assigned to broker.
	Logger Logger
}

OffsetCoordinatorConf represents the configuration of an offset coordinator.

func NewOffsetCoordinatorConf

func NewOffsetCoordinatorConf(consumerGroup string) OffsetCoordinatorConf

NewOffsetCoordinatorConf returns default OffsetCoordinator configuration.

type Producer

type Producer interface {
	Produce(topic string, partition int32, messages ...*proto.Message) (offset int64, err error)
}

Producer is the interface that wraps the Produce method.

Produce writes the messages to the given topic and partition. It returns the offset of the first message and any error encountered. The offset of each message is also updated accordingly.

Example
// connect to kafka cluster
addresses := []string{"localhost:9092", "localhost:9093"}
broker, err := Dial(addresses, NewBrokerConf("test"))
if err != nil {
	panic(err)
}
defer broker.Close()

// create new producer
conf := NewProducerConf()
conf.RequiredAcks = proto.RequiredAcksLocal

// write two messages to kafka using single call to make it atomic
producer := broker.Producer(conf)
messages := []*proto.Message{
	{Value: []byte("first")},
	{Value: []byte("second")},
}
if _, err := producer.Produce("my-messages", 0, messages...); err != nil {
	panic(err)
}
Output:

type ProducerConf

type ProducerConf struct {
	// Compression method to use, defaulting to proto.CompressionNone.
	Compression proto.Compression

	// Message ACK configuration. Use proto.RequiredAcksAll to require all
	// servers to write, proto.RequiredAcksLocal to wait only for leader node
	// answer or proto.RequiredAcksNone to not wait for any response.
	// Setting this to any other, greater than zero value will make producer to
	// wait for given number of servers to confirm write before returning.
	RequiredAcks int16

	// Timeout of single produce request. By default, 5 seconds.
	RequestTimeout time.Duration

	// RetryLimit specify how many times message producing should be retried in
	// case of failure, before returning the error to the caller. By default
	// set to 10.
	RetryLimit int

	// RetryWait specify wait duration before produce retry after failure. By
	// default set to 200ms.
	RetryWait time.Duration

	// Logger used by producer. By default, reuse logger assigned to broker.
	Logger Logger
}

ProducerConf represents the configuration of a producer.

func NewProducerConf

func NewProducerConf() ProducerConf

NewProducerConf returns a default producer configuration.

Directories

Path Synopsis
Package kafkatest provides mock objects for high level kafka interface.
Package kafkatest provides mock objects for high level kafka interface.
Package proto provides kafka binary protocol implementation.
Package proto provides kafka binary protocol implementation.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL