kafka

package module
v0.0.0-...-491b239 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 23, 2018 License: MIT Imports: 15 Imported by: 1

README

Build Status GoDoc

Kafka

Kafka is Go client library for Apache Kafka server, released under MIT license. Originally based on the great client from: https://github.com/optiopay/kafka

Kafka provides minimal abstraction over wire protocol, support for transparent failover and easy to use blocking API.

Example

Write all messages from stdin to kafka and print all messages from kafka topic to stdout.

package main

import (
    "bufio"
    "log"
    "os"
    "strings"

    "github.com/zorkian/kafka"
    "github.com/zorkian/kafka/proto"
)

const (
    topic     = "my-messages"
    partition = 0
)

var kafkaAddrs = []string{"localhost:9092", "localhost:9093"}

// printConsumed read messages from kafka and print them out
func printConsumed(broker kafka.Client) {
    conf := kafka.NewConsumerConf(topic, partition)
    conf.StartOffset = kafka.StartOffsetNewest
    consumer, err := broker.Consumer(conf)
    if err != nil {
        log.Fatalf("cannot create kafka consumer for %s:%d: %s", topic, partition, err)
    }

    for {
        msg, err := consumer.Consume()
        if err != nil {
            if err != kafka.ErrNoData {
                log.Printf("cannot consume %q topic message: %s", topic, err)
            }
            break
        }
        log.Printf("message %d: %s", msg.Offset, msg.Value)
    }
    log.Print("consumer quit")
}

// produceStdin read stdin and send every non empty line as message
func produceStdin(broker kafka.Client) {
    producer := broker.Producer(kafka.NewProducerConf())
    input := bufio.NewReader(os.Stdin)
    for {
        line, err := input.ReadString('\n')
        if err != nil {
            log.Fatalf("input error: %s", err)
        }
        line = strings.TrimSpace(line)
        if line == "" {
            continue
        }

        msg := &proto.Message{Value: []byte(line)}
        if _, err := producer.Produce(topic, partition, msg); err != nil {
            log.Fatalf("cannot produce message to %s:%d: %s", topic, partition, err)
        }
    }
}

func main() {
    // connect to kafka cluster
    broker, err := kafka.Dial(kafkaAddrs, kafka.NewBrokerConf("test-client"))
    if err != nil {
        log.Fatalf("cannot connect to kafka cluster: %s", err)
    }
    defer broker.Close()

    go printConsumed(broker)
    produceStdin(broker)
}

Documentation

Overview

Package kafka a provides high level client API for Apache Kafka.

Use 'Broker' for node connection management, 'Producer' for sending messages, and 'Consumer' for fetching. All those structures implement Client, Consumer and Producer interface, that is also implemented in kafkatest package.

Index

Examples

Constants

View Source
const (
	// StartOffsetOldest configures the consumer to fetch starting from the
	// oldest message available.
	StartOffsetOldest = -1

	// StartOffsetNewest configures the consumer to fetch messages produced
	// after creating the consumer.
	StartOffsetNewest = -2
)

Variables

View Source
var ErrClosed = errors.New("closed")

ErrClosed is returned as result of any request made using closed connection.

View Source
var (

	// ErrNoData is returned by consumers on Fetch when the retry limit is set and exceeded.
	ErrNoData = errors.New("no data")
)

Functions

func InitializeMetadataCache

func InitializeMetadataCache()

InitializeMetadataCache will make Kafka connections will be cached globally.

func NewErrorAverseRRProducerConf

func NewErrorAverseRRProducerConf() *errorAverseRRProducerConf

func SetLogger

func SetLogger(l *logging.Logger)

SetLogger allows overriding the logger being used by Kafka clients.

Types

type BatchConsumer

type BatchConsumer interface {
	ConsumeBatch() ([]*proto.Message, error)
}

BatchConsumer is the interface that wraps the ConsumeBatch method.

ConsumeBatch reads a batch of messages from a consumer, returning an error when encountered.

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

Broker is an abstract connection to kafka cluster for the given configuration, and can be used to create clients to the cluster.

func NewBroker

func NewBroker(clusterName string, nodeAddresses []string, conf BrokerConf) (*Broker, error)

NewBroker returns a broker to a given list of kafka addresses.

The returned broker is not necessarily initially connected to any kafka node.

func (*Broker) BatchConsumer

func (b *Broker) BatchConsumer(conf ConsumerConf) (BatchConsumer, error)

BatchConsumer creates a new BatchConsumer instance, bound to the broker.

func (*Broker) Consumer

func (b *Broker) Consumer(conf ConsumerConf) (Consumer, error)

Consumer creates a new consumer instance, bound to the broker.

func (*Broker) Metadata

func (b *Broker) Metadata() (*proto.MetadataResp, error)

Metadata returns a copy of the metadata. This does not require a lock as it's fetching a new copy from Kafka, we never use our internal state.

func (*Broker) OffsetCoordinator

func (b *Broker) OffsetCoordinator(conf OffsetCoordinatorConf) (OffsetCoordinator, error)

OffsetCoordinator returns offset management coordinator for single consumer group, bound to broker.

func (*Broker) OffsetEarliest

func (b *Broker) OffsetEarliest(topic string, partition int32) (int64, error)

OffsetEarliest returns the oldest offset available on the given partition.

func (*Broker) OffsetLatest

func (b *Broker) OffsetLatest(topic string, partition int32) (int64, error)

OffsetLatest return the offset of the next message produced in given partition

func (*Broker) PartitionCount

func (b *Broker) PartitionCount(topic string) (int32, error)

PartitionCount returns the count of partitions in a topic, or 0 and an error if the topic does not exist.

func (*Broker) Producer

func (b *Broker) Producer(conf ProducerConf) Producer

Producer returns new producer instance, bound to the broker.

type BrokerConf

type BrokerConf struct {
	// Kafka client ID.
	ClientID string

	// LeaderRetryLimit limits the number of connection attempts to a single
	// node before failing. Use LeaderRetryWait to control the wait time
	// between retries.
	//
	// Defaults to 10.
	LeaderRetryLimit int

	// LeaderRetryWait sets a limit to the waiting time when trying to connect
	// to a single node after failure. This is the initial time, we will do an
	// exponential backoff with increasingly long durations.
	//
	// Defaults to 500ms.
	//
	// Timeout on a connection is controlled by the DialTimeout setting.
	LeaderRetryWait time.Duration

	// AllowTopicCreation enables a last-ditch "send produce request" which
	// happens if we do not know about a topic. This enables topic creation
	// if your Kafka cluster is configured to allow it.
	//
	// Defaults to False.
	AllowTopicCreation bool

	// Configuration specific to the connections to the cluster.
	ClusterConnectionConf ClusterConnectionConf
}

BrokerConf is the broker configuration container.

func NewBrokerConf

func NewBrokerConf(clientID string) BrokerConf

NewBrokerConf constructs default configuration.

type Client

type Client interface {
	Producer(conf ProducerConf) Producer
	Consumer(conf ConsumerConf) (Consumer, error)
	OffsetCoordinator(conf OffsetCoordinatorConf) (OffsetCoordinator, error)
	OffsetEarliest(topic string, partition int32) (offset int64, err error)
	OffsetLatest(topic string, partition int32) (offset int64, err error)
}

Client is the interface implemented by Broker.

type Cluster

type Cluster struct {
	// contains filtered or unexported fields
}

Cluster maintains the metadata and connectionPools for a Kafka cluster.

func NewCluster

func NewCluster(nodeAddresses []string, conf ClusterConnectionConf) (*Cluster, error)

NewCluster connects to a cluster from a given list of kafka addresses and after successful metadata fetch, returns Cluster.

func (*Cluster) Fetch

func (cm *Cluster) Fetch(clientID string, topics ...string) (*proto.MetadataResp, error)

Fetch is requesting metadata information from any node and return protocol response if successful. This will attempt to talk to every node at least once until one returns a successful response. We walk the nodes in a random order.

If "topics" are specified, only fetch metadata for those topics (can be used to create a topic)

func (*Cluster) ForgetEndpoint

func (cm *Cluster) ForgetEndpoint(topic string, partition int32)

ForgetEndpoint is used to remove an endpoint that doesn't see to lead to a valid location.

func (*Cluster) GetEndpoint

func (cm *Cluster) GetEndpoint(topic string, partition int32) (int32, error)

GetEndpoint returns a nodeID for a topic/partition. Returns an error if the topic/partition is unknown.

func (*Cluster) GetNodeAddress

func (cm *Cluster) GetNodeAddress(nodeID int32) string

GetNodeAddress returns the address to a node if we know it.

func (*Cluster) GetNodes

func (cm *Cluster) GetNodes() NodeMap

GetNodes returns a map of nodes that exist in the cluster.

func (*Cluster) PartitionCount

func (cm *Cluster) PartitionCount(topic string) (int32, error)

PartitionCount returns how many partitions a given topic has. If a topic is not known, 0 and an error are returned.

func (*Cluster) RefreshMetadata

func (cm *Cluster) RefreshMetadata() error

RefreshMetadata is requesting metadata information from any node and refresh internal cached representation. This method can block for a long time depending on how long it takes to update metadata.

type ClusterConnectionConf

type ClusterConnectionConf struct {
	// ConnectionLimit sets a limit on how many outstanding connections may exist to a
	// single broker. This limit is for all connections except Metadata fetches which are exempted
	// but separately limited to one per cluster. That is, the maximum number of connections per
	// broker is ConnectionLimit + 1 but the maximum number of connections per cluster is
	// NumBrokers * ConnectionLimit + 1 not NumBrokers * (ConnectionLimit + 1).
	// Setting this too low can limit your throughput, but setting it too high can cause problems
	// for your cluster.
	//
	// Defaults to 10.
	ConnectionLimit int

	// IdleConnectionWait sets a timeout on how long we should wait for a connection to
	// become idle before we establish a new one. This value sets a cap on how much latency
	// you're willing to add to a request before establishing a new connection.
	//
	// Default is 200ms.
	IdleConnectionWait time.Duration

	// Any new connection dial timeout. This must be at least double the
	// IdleConnectionWait.
	//
	// Default is 10 seconds.
	DialTimeout time.Duration

	// DialRetryLimit limits the number of connection attempts to every node in
	// cluster before failing. Use DialRetryWait to control the wait time
	// between retries.
	//
	// Defaults to 10.
	DialRetryLimit int

	// DialRetryWait sets a limit to the waiting time when trying to establish
	// broker connection to single node to fetch cluster metadata. This is subject to
	// exponential backoff, so the second and further retries will be more than this
	// value.
	//
	// Defaults to 500ms.
	DialRetryWait time.Duration

	// MetadataRefreshTimeout is the maximum time to wait for a metadata refresh. This
	// is compounding with many of the retries -- various failures trigger a metadata
	// refresh. This should be set fairly high, as large metadata objects or loaded
	// clusters can take a little while to return data.
	//
	// Defaults to 30s.
	MetadataRefreshTimeout time.Duration

	// MetadataRefreshFrequency is how often we should refresh metadata regardless of whether we
	// have encountered errors.
	//
	// Defaults to 0 which means disabled.
	MetadataRefreshFrequency time.Duration
}

ClusterConnectionConf is configuration for the cluster connection pool.

func NewClusterConnectionConf

func NewClusterConnectionConf() ClusterConnectionConf

NewClusterConnectionConf constructs a default configuration.

type Consumer

type Consumer interface {
	// Consume reads a message from a consumer, returning an error when encountered.
	Consume() (*proto.Message, error)
	// SeekToLatest advances the Consumer's offset to the newest messages available, affecting
	// future calls to Consume. Calling this method violates the ALO guarantees normally associated
	// with Kafka consumption.
	SeekToLatest() error
}

Consumer is the interface that wraps the Consume method.

Example
// connect to kafka cluster
addresses := []string{"localhost:9092", "localhost:9093"}
broker, err := NewBroker("example-test-cluster1", addresses, NewBrokerConf("test"))
if err != nil {
	panic(err)
}

// create new consumer
conf := NewConsumerConf("my-messages", 0)
conf.StartOffset = StartOffsetNewest
consumer, err := broker.Consumer(conf)
if err != nil {
	panic(err)
}

// read all messages
for {
	msg, err := consumer.Consume()
	if err != nil {
		if err == ErrNoData {
			break
		}
		panic(err)
	}

	fmt.Printf("message: %#v", msg)
}
Output:

type ConsumerConf

type ConsumerConf struct {
	// Topic name that should be consumed
	Topic string

	// Partition ID that should be consumed.
	Partition int32

	// RequestTimeout controls fetch request timeout. This operation is
	// blocking the whole connection, so it should always be set to a small
	// value. By default it's set to 50ms.
	// To control fetch function timeout use RetryLimit and RetryWait.
	RequestTimeout time.Duration

	// RetryLimit limits fetching messages a given amount of times before
	// returning ErrNoData error.
	//
	// Default is -1, which turns this limit off.
	RetryLimit int

	// RetryWait controls the duration of wait between fetch request calls,
	// when no data was returned. This follows an exponential backoff model
	// so that we don't overload servers that have very little data.
	//
	// Default is 50ms.
	RetryWait time.Duration

	// RetryErrLimit limits the number of retry attempts when an error is
	// encountered.
	//
	// Default is 10.
	RetryErrLimit int

	// RetryErrWait controls the wait duration between retries after failed
	// fetch request. This follows the exponential backoff curve.
	//
	// Default is 500ms.
	RetryErrWait time.Duration

	// MinFetchSize is the minimum size of messages to fetch in bytes.
	//
	// Default is 1 to fetch any message available.
	MinFetchSize int32

	// MaxFetchSize is the maximum size of data which can be sent by kafka node
	// to consumer.
	//
	// Default is 2000000 bytes.
	MaxFetchSize int32

	// Consumer cursor starting point. Set to StartOffsetNewest to receive only
	// newly created messages or StartOffsetOldest to read everything. Assign
	// any offset value to manually set cursor -- consuming starts with the
	// message whose offset is equal to given value (including first message).
	//
	// Default is StartOffsetOldest.
	StartOffset int64
}

ConsumerConf represents consumer configuration.

func NewConsumerConf

func NewConsumerConf(topic string, partition int32) ConsumerConf

NewConsumerConf returns the default consumer configuration.

type DistributingProducer

type DistributingProducer interface {
	Distribute(topic string, messages ...*proto.Message) (partition int32, offset int64, err error)
}

DistributingProducer is the interface similar to Producer, but never require to explicitly specify partition.

Distribute writes messages to the given topic, automatically choosing partition, returning the post-commit offset and any error encountered. The offset of each message is also updated accordingly.

func NewErrorAverseRRProducer

func NewErrorAverseRRProducer(conf *errorAverseRRProducerConf) DistributingProducer

type MetadataCache

type MetadataCache struct {
	// contains filtered or unexported fields
}

MetadataCache is a threadsafe cache of ClusterMetadata by clusterName. Entries are never removed or changed so the implementation is straightforward: use the existing entry if it exists, otherwise take the lock, check again if there is an existing entry, and add a new entry if not.

type NoConnectionsAvailable

type NoConnectionsAvailable struct{}

NoConnectionsAvailable indicates that the connection pool is full.

func (NoConnectionsAvailable) Error

type NoPartitionsAvailable

type NoPartitionsAvailable struct{}

func (NoPartitionsAvailable) Error

func (NoPartitionsAvailable) Error() string

type NodeMap

type NodeMap map[int32]string

NodeMap maps a broker node ID to a connection handle.

type OffsetCoordinator

type OffsetCoordinator interface {
	Commit(topic string, partition int32, offset int64) error
	Offset(topic string, partition int32) (offset int64, metadata string, err error)
}

OffsetCoordinator is the interface which wraps the Commit and Offset methods.

Example
// connect to kafka cluster
addresses := []string{"localhost:9092", "localhost:9093"}
broker, err := NewBroker("test-cluster2", addresses, NewBrokerConf("test"))
if err != nil {
	panic(err)
}

// create offset coordinator and customize configuration
conf := NewOffsetCoordinatorConf("my-consumer-group")
conf.RetryErrLimit = 20
coordinator, err := broker.OffsetCoordinator(conf)
if err != nil {
	panic(err)
}

// write consumed message offset for topic/partition
if err := coordinator.Commit("my-topic", 0, 12); err != nil {
	panic(err)
}

// get latest consumed offset for given topic/partition
off, _, err := coordinator.Offset("my-topic", 0)
if err != nil {
	panic(err)
}

if off != 12 {
	panic(fmt.Sprintf("offset is %d, not 12", off))
}
Output:

type OffsetCoordinatorConf

type OffsetCoordinatorConf struct {
	ConsumerGroup string

	// RetryErrLimit limits messages fetch retry upon failure. By default 10.
	RetryErrLimit int

	// RetryErrWait controls wait duration between retries after failed fetch
	// request. By default 500ms.
	RetryErrWait time.Duration
}

OffsetCoordinatorConf is configuration for the offset coordinatior.

func NewOffsetCoordinatorConf

func NewOffsetCoordinatorConf(consumerGroup string) OffsetCoordinatorConf

NewOffsetCoordinatorConf returns default OffsetCoordinator configuration.

type PartitionCountSource

type PartitionCountSource interface {
	PartitionCount(topic string) (count int32, err error)
}

PartitionCountSource lets a DistributingProducer determine how many partitions exist for a particular topic. Broker fulfills this interface but a cache could be used instead.

type Producer

type Producer interface {
	Produce(topic string, partition int32, messages ...*proto.Message) (offset int64, err error)
}

Producer is the interface that wraps the Produce method.

Produce writes the messages to the given topic and partition. It returns the offset of the first message and any error encountered. The offset of each message is also updated accordingly.

Example
// connect to kafka cluster
addresses := []string{"localhost:9092", "localhost:9093"}
broker, err := NewBroker("test-cluster3", addresses, NewBrokerConf("test"))
if err != nil {
	panic(err)
}

// create new producer
conf := NewProducerConf()
conf.RequiredAcks = proto.RequiredAcksLocal

// write two messages to kafka using single call to make it atomic
producer := broker.Producer(conf)
messages := []*proto.Message{
	{Value: []byte("first")},
	{Value: []byte("second")},
}
if _, err := producer.Produce("my-messages", 0, messages...); err != nil {
	panic(err)
}
Output:

type ProducerConf

type ProducerConf struct {
	// Compression method to use, defaulting to proto.CompressionNone.
	Compression proto.Compression

	// Timeout of single produce request. By default, 5 seconds.
	RequestTimeout time.Duration

	// Message ACK configuration. Use proto.RequiredAcksAll to require all
	// servers to write, proto.RequiredAcksLocal to wait only for leader node
	// answer or proto.RequiredAcksNone to not wait for any response.
	// Setting this to any other, greater than zero value will make producer to
	// wait for given number of servers to confirm write before returning.
	RequiredAcks int16

	// RetryLimit specify how many times message producing should be retried in
	// case of failure, before returning the error to the caller. By default
	// set to 10.
	RetryLimit int

	// RetryWait specify wait duration before produce retry after failure. This
	// is subject to exponential backoff.
	//
	// Defaults to 200ms.
	RetryWait time.Duration
}

ProducerConf is the configuration for a producer.

func NewProducerConf

func NewProducerConf() ProducerConf

NewProducerConf returns a default producer configuration.

Directories

Path Synopsis
Package kafkatest provides mock objects for high level kafka interface.
Package kafkatest provides mock objects for high level kafka interface.
Package proto provides kafka binary protocol implementation.
Package proto provides kafka binary protocol implementation.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL