Documentation
Overview
Package kafka provides a high-level client API for Apache Kafka.
Use 'Broker' for node connection management, 'Producer' for sending messages, and 'Consumer' for fetching. All those structures implement the Client, Consumer and Producer interfaces, which are also implemented in the kafkatest package.
Index
- Constants
- Variables
- type Broker
- func (b *Broker) Close()
- func (b *Broker) Consumer(conf ConsumerConf) (Consumer, error)
- func (b *Broker) Metadata() (*proto.MetadataResp, error)
- func (b *Broker) OffsetCoordinator(conf OffsetCoordinatorConf) (OffsetCoordinator, error)
- func (b *Broker) OffsetEarliest(topic string, partition int32) (offset int64, err error)
- func (b *Broker) OffsetLatest(topic string, partition int32) (offset int64, err error)
- func (b *Broker) PartitionCount(topic string) (int32, error)
- func (b *Broker) Producer(conf ProducerConf) Producer
- type BrokerConf
- type Client
- type Consumer
- type ConsumerConf
- type DistributingProducer
- type Logger
- type Mx
- type OffsetCoordinator
- type OffsetCoordinatorConf
- type Producer
- type ProducerConf
Constants
const (
	// StartOffsetNewest configures the consumer to fetch messages produced
	// after creating the consumer.
	StartOffsetNewest = -1

	// StartOffsetOldest configures the consumer to fetch starting from the
	// oldest message available.
	StartOffsetOldest = -2
)
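Both constants are negative sentinels rather than real offsets. The sketch below shows one plausible way a consumer could turn them into a concrete starting position. It assumes a hypothetical helper, `resolveStartOffset`, whose `earliest` and `latest` arguments stand in for the values returned by `Broker.OffsetEarliest` and `Broker.OffsetLatest`. The package's internals may resolve them differently.

```go
package main

import "fmt"

// Sentinel values copied from the package constants.
const (
	StartOffsetNewest = -1
	StartOffsetOldest = -2
)

// resolveStartOffset is a hypothetical helper that maps the StartOffset
// sentinels onto concrete offsets. earliest and latest stand in for the
// results of Broker.OffsetEarliest and Broker.OffsetLatest.
func resolveStartOffset(start, earliest, latest int64) int64 {
	switch start {
	case StartOffsetNewest:
		// Only messages produced after the consumer is created.
		return latest
	case StartOffsetOldest:
		// Everything still retained on the partition.
		return earliest
	default:
		// Any other value is used as an explicit offset.
		return start
	}
}

func main() {
	fmt.Println(resolveStartOffset(StartOffsetNewest, 100, 250)) // 250
	fmt.Println(resolveStartOffset(StartOffsetOldest, 100, 250)) // 100
	fmt.Println(resolveStartOffset(180, 100, 250))               // 180
}
```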
Variables
var ErrClosed = errors.New("closed")
ErrClosed is returned as the result of any request made using a closed connection.
var ErrMxClosed = errors.New("closed")
ErrMxClosed is returned when consuming from a closed multiplexer.
var (
	// Returned by consumers on Fetch when the retry limit is set and exceeded.
	ErrNoData = errors.New("no data")
)
Functions
This section is empty.
Types
type Broker
type Broker struct {
// contains filtered or unexported fields
}
Broker is an abstract connection to a kafka cluster, managing connections to all kafka nodes.
func Dial
func Dial(nodeAddresses []string, conf BrokerConf) (*Broker, error)
Dial connects to any node from the given list of kafka addresses and, after a successful metadata fetch, returns the broker.
The returned broker is not initially connected to any kafka node.
func (*Broker) Close
func (b *Broker) Close()
Close closes the broker and all active kafka node connections.
func (*Broker) Consumer
func (b *Broker) Consumer(conf ConsumerConf) (Consumer, error)
Consumer creates a new consumer instance, bound to the broker.
func (*Broker) OffsetCoordinator
func (b *Broker) OffsetCoordinator(conf OffsetCoordinatorConf) (OffsetCoordinator, error)
OffsetCoordinator returns an offset management coordinator for a single consumer group, bound to the broker.
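func (*Broker) OffsetEarliest
func (b *Broker) OffsetEarliest(topic string, partition int32) (offset int64, err error)
OffsetEarliest returns the oldest offset available on the given partition.
func (*Broker) OffsetLatest
func (b *Broker) OffsetLatest(topic string, partition int32) (offset int64, err error)
OffsetLatest returns the offset of the next message produced in the given partition.
func (*Broker) PartitionCount
func (b *Broker) PartitionCount(topic string) (int32, error)
PartitionCount returns how many partitions a given topic has. If the topic is not known, 0 and an error are returned.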
func (*Broker) Producer
func (b *Broker) Producer(conf ProducerConf) Producer
Producer returns a new producer instance, bound to the broker.
type BrokerConf
type BrokerConf struct {
	// Kafka client ID.
	ClientID string

	// LeaderRetryLimit limits the number of connection attempts to a single
	// node before failing. Use LeaderRetryWait to control the wait time
	// between retries.
	//
	// Defaults to 10.
	LeaderRetryLimit int

	// LeaderRetryWait sets a limit to the waiting time when trying to connect
	// to a single node after failure.
	//
	// Defaults to 500ms.
	//
	// Timeout on a connection is controlled by the DialTimeout setting.
	LeaderRetryWait time.Duration

	// AllowTopicCreation enables a last-ditch "send produce request" which
	// happens if we do not know about a topic. This enables topic creation
	// if your Kafka cluster is configured to allow it.
	//
	// Defaults to false.
	AllowTopicCreation bool

	// DialTimeout is the dial timeout for any new connection.
	//
	// Default is 10 seconds.
	DialTimeout time.Duration

	// DialRetryLimit limits the number of connection attempts to every node in
	// the cluster before failing. Use DialRetryWait to control the wait time
	// between retries.
	//
	// Defaults to 10.
	DialRetryLimit int

	// DialRetryWait sets a limit to the waiting time when trying to establish
	// a broker connection to a single node to fetch cluster metadata.
	//
	// Defaults to 500ms.
	DialRetryWait time.Duration

	// DEPRECATED 2015-07-10 - use Logger instead
	//
	// TODO(husio) remove
	//
	// Logger used by the broker.
	Log interface {
		Print(...interface{})
		Printf(string, ...interface{})
	}

	// Logger is a general logging interface that can be provided by popular
	// logging frameworks. Used to notify and as a replacement for the stdlib
	// `log` package.
	Logger Logger
}
func NewBrokerConf
func NewBrokerConf(clientID string) BrokerConf
type Client
type Client interface {
	Producer(conf ProducerConf) Producer
	Consumer(conf ConsumerConf) (Consumer, error)
	OffsetCoordinator(conf OffsetCoordinatorConf) (OffsetCoordinator, error)
	OffsetEarliest(topic string, partition int32) (offset int64, err error)
	OffsetLatest(topic string, partition int32) (offset int64, err error)
	Close()
}
Client is the interface implemented by Broker.
type Consumer
Consumer is the interface that wraps the Consume method.
Consume reads a single message from the consumer, returning any error encountered.
Example
// connect to kafka cluster
addresses := []string{"localhost:9092", "localhost:9093"}
broker, err := Dial(addresses, NewBrokerConf("test"))
if err != nil {
	panic(err)
}
defer broker.Close()

// create new consumer
conf := NewConsumerConf("my-messages", 0)
conf.StartOffset = StartOffsetNewest
// give up after 20 empty fetches so that Consume can return ErrNoData;
// with the default RetryLimit of -1 the loop below would never end
conf.RetryLimit = 20
consumer, err := broker.Consumer(conf)
if err != nil {
	panic(err)
}

// read all messages
for {
	msg, err := consumer.Consume()
	if err != nil {
		if err == ErrNoData {
			break
		}
		panic(err)
	}
	fmt.Printf("message: %#v\n", msg)
}
type ConsumerConf
type ConsumerConf struct {
	// Topic name that should be consumed.
	Topic string

	// Partition ID that should be consumed.
	Partition int32

	// RequestTimeout controls the fetch request timeout. This operation
	// blocks the whole connection, so it should always be set to a small
	// value. By default it's set to 50ms.
	// To control the fetch function timeout use RetryLimit and RetryWait.
	RequestTimeout time.Duration

	// RetryLimit limits how many times messages are fetched before
	// returning the ErrNoData error.
	//
	// Default is -1, which turns this limit off.
	RetryLimit int

	// RetryWait controls the duration of wait between fetch request calls
	// when no data was returned.
	//
	// Default is 50ms.
	RetryWait time.Duration

	// RetryErrLimit limits the number of retry attempts when an error is
	// encountered.
	//
	// Default is 10.
	RetryErrLimit int

	// RetryErrWait controls the wait duration between retries after a failed
	// fetch request.
	//
	// Default is 500ms.
	RetryErrWait time.Duration

	// MinFetchSize is the minimum size of messages to fetch in bytes.
	//
	// Default is 1 to fetch any message available.
	MinFetchSize int32

	// MaxFetchSize is the maximum size of data which can be sent by a kafka
	// node to the consumer.
	//
	// Default is 2000000 bytes.
	MaxFetchSize int32

	// Consumer cursor starting point. Set to StartOffsetNewest to receive only
	// newly created messages or StartOffsetOldest to read everything. Assign
	// any offset value to manually set the cursor: consuming starts with the
	// message whose offset is equal to the given value (including that message).
	//
	// Default is StartOffsetOldest.
	StartOffset int64

	// Logger used by consumer. By default, reuse logger assigned to broker.
	Logger Logger
}
func NewConsumerConf
func NewConsumerConf(topic string, partition int32) ConsumerConf
NewConsumerConf returns the default consumer configuration.
type DistributingProducer
type DistributingProducer interface {
Distribute(topic string, messages ...*proto.Message) (offset int64, err error)
}
DistributingProducer is an interface similar to Producer, but it never requires the partition to be specified explicitly.
Distribute writes messages to the given topic, automatically choosing the partition, and returns the post-commit offset and any error encountered. The offset of each message is also updated accordingly.
func NewHashProducer
func NewHashProducer(p Producer, numPartitions int32) DistributingProducer
NewHashProducer wraps the given producer and returns a DistributingProducer that publishes messages to kafka, computing the partition number from an fnv hash of the message key, mapped into the [0, numPartitions) range.
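Key-hash partitioning sends every message with the same key to the same partition. The docs only say "fnv hash". This sketch assumes the 32-bit FNV-1a variant from Go's `hash/fnv` and a simple modulo into `[0, numPartitions)`. `hashPartition` is a hypothetical helper, and the package's exact variant and handling of missing keys may differ.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// hashPartition maps a message key onto [0, numPartitions) using 32-bit
// FNV-1a (an assumed variant). The same key always yields the same
// partition. numPartitions must be positive.
func hashPartition(key []byte, numPartitions int32) int32 {
	h := fnv.New32a()
	h.Write(key) // writes to a hash.Hash never return an error
	return int32(h.Sum32() % uint32(numPartitions))
}

func main() {
	for _, key := range []string{"user-1", "user-2", "user-1"} {
		fmt.Println(key, hashPartition([]byte(key), 8))
	}
}
```

Doing the modulo on the unsigned sum avoids negative partition numbers that a signed conversion could produce.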
func NewRandomProducer
func NewRandomProducer(p Producer, numPartitions int32) DistributingProducer
NewRandomProducer wraps the given producer and returns a DistributingProducer that publishes messages to kafka, picking the partition number at random from the [0, numPartitions) range.
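A random strategy reduces to one call per produce request. This is a minimal sketch with a hypothetical `randomPartition` helper built on `math/rand`. Whether the package uses the global source or its own seeded generator is not stated in the docs.

```go
package main

import (
	"fmt"
	"math/rand"
)

// randomPartition picks a partition uniformly from [0, numPartitions).
// rand.Int31n panics if numPartitions <= 0.
func randomPartition(numPartitions int32) int32 {
	return rand.Int31n(numPartitions)
}

func main() {
	fmt.Println(randomPartition(4)) // some value in 0..3
}
```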
func NewRoundRobinProducer
func NewRoundRobinProducer(p Producer, numPartitions int32) DistributingProducer
NewRoundRobinProducer wraps the given producer and returns a DistributingProducer that publishes messages to kafka, choosing the destination partition by cycling through the [0, numPartitions) range.
type Logger
type Logger interface {
	Debug(msg string, args ...interface{})
	Info(msg string, args ...interface{})
	Warn(msg string, args ...interface{})
	Error(msg string, args ...interface{})
}
Logger is a general logging interface that can be provided by popular logging frameworks, for example:
* https://github.com/go-kit/kit/tree/master/log
* https://github.com/husio/log
type Mx
type Mx struct {
// contains filtered or unexported fields
}
Mx is a multiplexer that combines a number of consumers into a single stream.
It is the responsibility of the multiplexer user and of the consumer implementation to handle errors. ErrNoData returned by a consumer is not passed through by the multiplexer; instead, the consumer that returned ErrNoData is removed from the merged set. When all consumers have been removed (the set is empty), Mx is automatically closed and any further Consume call returns ErrMxClosed.
Because every consumer is read by a separate worker, most of the time each worker holds one already-consumed message in memory while it waits for a chance to return it from a Consume call on the multiplexer. Closing the multiplexer may therefore discard messages that were already read but are still waiting inside the workers to be delivered.
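The worker-per-consumer design described above is a classic fan-in. What follows is a simplified sketch of it, not the package's implementation. It uses hypothetical `merge`/`mx` names, string messages, and a `sliceConsumer` test double. Each worker blocks holding one result until `Consume` takes it. A worker that sees `ErrNoData` exits, and when all workers have exited the stream closes and `Consume` reports `ErrMxClosed`.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var (
	ErrNoData   = errors.New("no data")
	ErrMxClosed = errors.New("closed")
)

// Consumer mirrors the package interface, simplified to string messages.
type Consumer interface {
	Consume() (string, error)
}

type result struct {
	msg string
	err error
}

// mx fans many consumers into one unbuffered channel, one worker each.
type mx struct {
	results chan result
	stop    chan struct{}
	once    sync.Once
}

func merge(consumers ...Consumer) *mx {
	m := &mx{results: make(chan result), stop: make(chan struct{})}
	var wg sync.WaitGroup
	for _, c := range consumers {
		wg.Add(1)
		go func(c Consumer) {
			defer wg.Done()
			for {
				msg, err := c.Consume() // an in-progress call is never interrupted
				if err == ErrNoData {
					return // drop this consumer from the merged set
				}
				// Hold this one result until Consume takes it or Close is called.
				select {
				case m.results <- result{msg, err}:
				case <-m.stop:
					return
				}
			}
		}(c)
	}
	// Once every worker has exited, close the merged stream.
	go func() {
		wg.Wait()
		close(m.results)
	}()
	return m
}

// Consume returns the next result from any consumer, or ErrMxClosed.
func (m *mx) Consume() (string, error) {
	select {
	case <-m.stop:
		return "", ErrMxClosed // closed explicitly: ignore pending results
	default:
	}
	select {
	case r, ok := <-m.results:
		if !ok {
			return "", ErrMxClosed // every consumer returned ErrNoData
		}
		return r.msg, r.err
	case <-m.stop:
		return "", ErrMxClosed
	}
}

// Close signals the workers and returns without waiting for them.
// Calling it more than once has no effect.
func (m *mx) Close() { m.once.Do(func() { close(m.stop) }) }

// sliceConsumer is a hypothetical Consumer yielding msgs, then ErrNoData.
type sliceConsumer struct{ msgs []string }

func (s *sliceConsumer) Consume() (string, error) {
	if len(s.msgs) == 0 {
		return "", ErrNoData
	}
	msg := s.msgs[0]
	s.msgs = s.msgs[1:]
	return msg, nil
}

func main() {
	m := merge(&sliceConsumer{msgs: []string{"apple", "pear"}}, &sliceConsumer{msgs: []string{"carrot"}})
	defer m.Close()
	for msg, err := m.Consume(); err != ErrMxClosed; msg, err = m.Consume() {
		fmt.Println(msg) // arrival order across consumers is not fixed
	}
}
```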
func Merge
Merge merges the results of any number of consumers into a single stream and exposes them through the returned multiplexer.
Example
// connect to kafka cluster
addresses := []string{"localhost:9092", "localhost:9093"}
broker, err := Dial(addresses, NewBrokerConf("test"))
if err != nil {
	panic(err)
}
defer broker.Close()

topics := []string{"fruits", "vegetables"}
fetchers := make([]Consumer, len(topics))

// create consumers for different topics
for i, topic := range topics {
	conf := NewConsumerConf(topic, 0)
	conf.RetryLimit = 20
	conf.StartOffset = StartOffsetNewest
	consumer, err := broker.Consumer(conf)
	if err != nil {
		panic(err)
	}
	fetchers[i] = consumer
}

// merge all created consumers (they don't even have to belong to the same broker!)
mx := Merge(fetchers...)
defer mx.Close()

// consume messages from all sources
for {
	msg, err := mx.Consume()
	if err != nil {
		// once every consumer has returned ErrNoData, the multiplexer
		// closes itself and Consume returns ErrMxClosed
		if err == ErrMxClosed {
			break
		}
		panic(err)
	}
	fmt.Printf("message: %#v\n", msg)
}
func (*Mx) Close
func (p *Mx) Close()
Close closes the multiplexer and stops all underlying workers.
Closing the multiplexer stops all workers as soon as possible, but any consume operation already in progress in a worker has to finish first. Any consumption result received after the multiplexer is closed is ignored.
Close returns without waiting for all the workers to finish.
Closing an already closed multiplexer has no effect.
type OffsetCoordinator
type OffsetCoordinator interface {
	Commit(topic string, partition int32, offset int64) error
	Offset(topic string, partition int32) (offset int64, metadata string, err error)
}
OffsetCoordinator is the interface which wraps the Commit and Offset methods.
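The contract is small: Commit stores an offset per topic/partition, and Offset reads it back. For illustration, here is a hypothetical in-memory implementation, `memCoordinator`, of the kind useful in tests that should not need a cluster. The kafkatest package provides ready-made mocks; this sketch only shows what satisfying the interface involves. It does not track the `metadata` string, and returning an error for a never-committed partition is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// OffsetCoordinator mirrors the package interface.
type OffsetCoordinator interface {
	Commit(topic string, partition int32, offset int64) error
	Offset(topic string, partition int32) (offset int64, metadata string, err error)
}

type topicPartition struct {
	topic     string
	partition int32
}

// memCoordinator keeps the last committed offset per topic/partition.
type memCoordinator struct {
	mu      sync.Mutex
	offsets map[topicPartition]int64
}

func newMemCoordinator() *memCoordinator {
	return &memCoordinator{offsets: make(map[topicPartition]int64)}
}

func (c *memCoordinator) Commit(topic string, partition int32, offset int64) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.offsets[topicPartition{topic, partition}] = offset
	return nil
}

func (c *memCoordinator) Offset(topic string, partition int32) (int64, string, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	off, ok := c.offsets[topicPartition{topic, partition}]
	if !ok {
		return 0, "", fmt.Errorf("no offset committed for %s/%d", topic, partition)
	}
	return off, "", nil // metadata is not tracked in this sketch
}

// Compile-time check that memCoordinator satisfies OffsetCoordinator.
var _ OffsetCoordinator = (*memCoordinator)(nil)

func main() {
	var coord OffsetCoordinator = newMemCoordinator()
	if err := coord.Commit("my-topic", 0, 12); err != nil {
		panic(err)
	}
	off, _, err := coord.Offset("my-topic", 0)
	fmt.Println(off, err) // 12 <nil>
}
```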
Example
// connect to kafka cluster
addresses := []string{"localhost:9092", "localhost:9093"}
broker, err := Dial(addresses, NewBrokerConf("test"))
if err != nil {
	panic(err)
}
defer broker.Close()

// create offset coordinator and customize configuration
conf := NewOffsetCoordinatorConf("my-consumer-group")
conf.RetryErrLimit = 20
coordinator, err := broker.OffsetCoordinator(conf)
if err != nil {
	panic(err)
}

// write consumed message offset for topic/partition
if err := coordinator.Commit("my-topic", 0, 12); err != nil {
	panic(err)
}

// get latest consumed offset for given topic/partition
off, _, err := coordinator.Offset("my-topic", 0)
if err != nil {
	panic(err)
}

if off != 12 {
	panic(fmt.Sprintf("offset is %d, not 12", off))
}
type OffsetCoordinatorConf
type OffsetCoordinatorConf struct {
	ConsumerGroup string

	// RetryErrLimit limits messages fetch retry upon failure. By default 10.
	RetryErrLimit int

	// RetryErrWait controls wait duration between retries after failed fetch
	// request. By default 500ms.
	RetryErrWait time.Duration

	// Logger used by consumer. By default, reuse logger assigned to broker.
	Logger Logger
}
func NewOffsetCoordinatorConf
func NewOffsetCoordinatorConf(consumerGroup string) OffsetCoordinatorConf
NewOffsetCoordinatorConf returns default OffsetCoordinator configuration.
type Producer
type Producer interface {
Produce(topic string, partition int32, messages ...*proto.Message) (offset int64, err error)
}
Producer is the interface that wraps the Produce method.
Produce writes the messages to the given topic and partition. It returns the offset of the first message and any error encountered. The offset of each message is also updated accordingly.
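The offset contract amounts to this: a batch receives consecutive offsets, each message's offset field is filled in, and the first offset is returned. The hypothetical in-memory producer below illustrates that bookkeeping. `Message` is a local stand-in reduced to two fields, not `proto.Message` itself, and having offsets start at 0 for every new partition is an assumption of the sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// Message is a hypothetical stand-in for proto.Message.
type Message struct {
	Value  []byte
	Offset int64
}

type key struct {
	topic     string
	partition int32
}

// memProducer assigns consecutive offsets per topic/partition, updates
// each message's Offset and returns the offset of the first message.
type memProducer struct {
	mu   sync.Mutex
	next map[key]int64 // next free offset per topic/partition
}

func (p *memProducer) Produce(topic string, partition int32, messages ...*Message) (int64, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.next == nil {
		p.next = make(map[key]int64)
	}
	k := key{topic, partition}
	first := p.next[k]
	for i, m := range messages {
		m.Offset = first + int64(i)
	}
	p.next[k] = first + int64(len(messages))
	return first, nil
}

func main() {
	p := &memProducer{}
	msgs := []*Message{{Value: []byte("first")}, {Value: []byte("second")}}
	off, _ := p.Produce("my-messages", 0, msgs...)
	fmt.Println(off, msgs[0].Offset, msgs[1].Offset) // 0 0 1
	off, _ = p.Produce("my-messages", 0, &Message{Value: []byte("third")})
	fmt.Println(off) // 2
}
```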
Example
// connect to kafka cluster
addresses := []string{"localhost:9092", "localhost:9093"}
broker, err := Dial(addresses, NewBrokerConf("test"))
if err != nil {
	panic(err)
}
defer broker.Close()

// create new producer
conf := NewProducerConf()
conf.RequiredAcks = proto.RequiredAcksLocal

// write two messages to kafka using single call to make it atomic
producer := broker.Producer(conf)
messages := []*proto.Message{
	{Value: []byte("first")},
	{Value: []byte("second")},
}
if _, err := producer.Produce("my-messages", 0, messages...); err != nil {
	panic(err)
}
type ProducerConf
type ProducerConf struct {
	// Compression method to use, defaulting to proto.CompressionNone.
	Compression proto.Compression

	// Timeout of a single produce request. By default, 5 seconds.
	RequestTimeout time.Duration

	// Message ACK configuration. Use proto.RequiredAcksAll to require all
	// servers to write, proto.RequiredAcksLocal to wait only for the leader
	// node's answer or proto.RequiredAcksNone to not wait for any response.
	// Setting this to any other value greater than zero makes the producer
	// wait for the given number of servers to confirm the write before
	// returning.
	RequiredAcks int16

	// RetryLimit specifies how many times message producing should be retried
	// in case of failure, before returning the error to the caller. By
	// default set to 10.
	RetryLimit int

	// RetryWait specifies the wait duration before a produce retry after
	// failure. By default set to 200ms.
	RetryWait time.Duration

	// Logger used by producer. By default, reuse logger assigned to broker.
	Logger Logger
}
func NewProducerConf
func NewProducerConf() ProducerConf
NewProducerConf returns a default producer configuration.
Directories
Path | Synopsis |
---|---|
kafkatest | Package kafkatest provides mock objects for high level kafka interface. |
proto | Package proto provides kafka binary protocol implementation. |