Documentation ¶
Overview ¶
Package consumer provides kafka 0.9 consumer groups on top of the low-level Sarama kafka package.
Consumer groups distribute topics' partitions dynamically across group members, and restart at the last committed offset of each partition.
This requires Kafka v0.9+ and follows the client-side assignment protocol described in: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal
CONFIGURATION ¶
Three customization APIs may be set in the Config:
Config.OffsetOutOfRange func(topic, partition, sarama.Client) (restart_offset, error) allows users to decide how to react to falling off the tail of the kafka log. The default is to restart at the newest offset. However, depending on the use case, restarting at an offset T time in the past, or even at the oldest offset, may make more sense.
Config.StartingOffset func(topic, partition, committed_offset, sarama.Client) (starting_offset, error) allows users to decide where to restart when consuming a partition. The default is to restart at the committed offset, or at sarama.Config.Consumer.Offsets.Initial if the starting offset is -1 (indicating no committed offset could be found).
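For example, a minimal sketch of an OffsetOutOfRange handler which restarts at the oldest retained offset rather than the default newest (error handling is left to your taste):

config := consumer.NewConfig()
// restart at the oldest offset still retained by the broker when we fall
// off the tail of the log; sarama.Client.GetOffset queries the broker
config.OffsetOutOfRange = func(topic string, partition int32, client sarama.Client) (int64, error) {
    return client.GetOffset(topic, partition, sarama.OffsetOldest)
}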
Config.Partitioner interface allows users to control how the consumer group distributes partitions across the group members. The default is to distribute the partitions of each topic in a round-robin fashion across the available members. This is good for basic load balancing. However, round-robin is unsuitable when it is desirable that partitions stay with the same consumer across repartitioning.
A stable partitioner is provided by the stable package. It keeps the partition->consumer mapping stable as best it can. When a consumer restarts quickly enough (within the kafka consumer heartbeat timeout) the partition mapping of the rest of the consumers is not altered. When consumers are added to the group only a minimum number of partitions are reassigned from existing consumers to the new consumers.
Using the stable partitioner means setting
Config.Partitioner = stable.New(false)
Passing true to stable.New() returns a stable & consistent partitioner. See the stable package documentation.
More complex partitioners, for example one which performs some sort of weighted balancing, are yours to implement.
PHILOSOPHY ¶
The consumer API has three rules the calling code must abide by: each message must be passed to Consumer.Done() once it no longer needs to be replayed, Client.Errors() must be consumed, and Client.Close() or Consumer.AsyncClose() must be called to clean up resources when your code wishes to stop consuming messages.
Kafka's rule that [if consumers keep up] all messages will be seen at least once, and possibly many times, always applies.
The API of this package deliberately does not wrap or otherwise hide the underlying sarama API. I believe doing so is a waste of CPU time, generates more work for the gc, and makes building on top of a package harder than it should be. It also makes no assumptions about how the caller's work should be done. There are no requirements to process messages in order, nor does it dictate a goroutine organization on the caller. I've applied RFC1925 #5 and #12 as best I can.
I've used other kafka APIs which did wrap and impose structure and found them difficult to really use, and as a reaction I try not to impose such APIs on others (nor on myself) even if it means the calling code is a little more complex.
(For example, you have to create a suitably configured sarama.Client yourself before calling NewClient. That's 3 more lines of code, but it also lets you tune the sarama.Client's config just as you need it to be, or even mock the client for tests.)
The simple use case of this package is shown in the NewClient example code.
Index ¶
- Variables
- func DefaultOffsetOutOfRange(topic string, partition int32, client sarama.Client) (int64, error)
- func DefaultStartingOffset(topic string, partition int32, offset int64, client sarama.Client) (int64, error)
- type AssignmentNotification
- type Client
- type Config
- type Consumer
- type Error
- type OffsetOutOfRange
- type PartitionStartNotification
- type Partitioner
- type SidechannelMsg
- type SidechannelOffset
- type StartingOffset
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var Logf func(fmt string, args ...interface{}) = log.Printf
Logf is the low-level logging function. Replace it with your own, if desired, before making any calls to the rest of the API.
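For example, to route this package's logs through your own logging setup (a sketch using the standard log package; the prefix is arbitrary):

consumer.Logf = func(format string, args ...interface{}) {
    log.Printf("sarama-consumer: "+format, args...)
}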
var MinVersion = sarama.V0_9_0_0
MinVersion is the minimum kafka API version required. Use it as the sarama.Config.Version when constructing the sarama.Client.
Functions ¶
func DefaultOffsetOutOfRange ¶
DefaultOffsetOutOfRange is the default implementation of Config.OffsetOutOfRange. It jumps to the current head (newest offset) of the partition.
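func DefaultStartingOffset ¶
DefaultStartingOffset is the default implementation of Config.StartingOffset. It returns the committed offset unchanged, or sarama.Config.Consumer.Offsets.Initial when the committed offset is -1 (meaning no committed offset could be found).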
Types ¶
type AssignmentNotification ¶
type Client ¶
type Client interface {
    // Consume returns a consumer of the given topic.
    Consume(topic string) (Consumer, error)

    // ConsumeMany starts consuming many topics at once. It is much more efficient
    // than calling Consume repeatedly, because kafka brokers serialize joining topics.
    ConsumeMany(topics []string) ([]Consumer, error)

    // Close closes the client. It must be called to shut down
    // the client. It cleans up any unclosed topic Consumers created by this Client.
    // It does NOT close the inner sarama.Client.
    // Calling it twice is NOT supported.
    Close()

    // Errors returns a channel which can (should) be monitored
    // for errors. Callers should probably log or otherwise report
    // the returned errors. The channel closes when the client
    // is closed.
    Errors() <-chan error
}
Client is a kafka client belonging to a consumer group. It is created by NewClient.
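For example, a sketch of consuming several topics at once with ConsumeMany (the client is constructed as in the NewClient example below):

consumers, err := client.ConsumeMany([]string{"topic1", "topic2"})
if err != nil {
    log.Fatal(err)
}
for _, c := range consumers {
    go func(c consumer.Consumer) {
        for msg := range c.Messages() {
            fmt.Println("processing message", msg)
            c.Done(msg) // required
        }
    }(c)
}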
func NewClient ¶
NewClient creates a new consumer group client on top of an existing sarama.Client.
After this call the contents of config should be treated as read-only. config can be nil if the defaults are acceptable.
The consumer group name is used to match this client with other instances running elsewhere, but connected to the same cluster of kafka brokers and using the same consumer group name.
The supplied sarama.Client should have been constructed with a sarama.Config where sarama.Config.Version is >= consumer.MinVersion, and if full handling of ErrOffsetOutOfRange is desired, sarama.Config.Consumer.Return.Errors = true.
In addition, this package uses the settings in sarama.Config.Consumer.Offsets and sarama.Config.Metadata.RefreshFrequency.
Example ¶
package main

import (
    "fmt"
    "time"

    "github.com/Shopify/sarama"
    consumer "github.com/mistsys/sarama-consumer"
    "github.com/mistsys/sarama-consumer/offsets"
    "github.com/mistsys/sarama-consumer/stable"
)

func main() {
    // create a suitable sarama.Client
    sconfig := sarama.NewConfig()
    sconfig.Version = consumer.MinVersion  // consumer requires at least 0.9
    sconfig.Consumer.Return.Errors = true  // needed if asynchronous ErrOffsetOutOfRange handling is desired (it's a good idea)
    sclient, _ := sarama.NewClient([]string{"kafka-broker:9092"}, sconfig)

    // from that, create a consumer.Config with some fancy options
    config := consumer.NewConfig()
    config.Partitioner = stable.New(false) // use a stable (but inconsistent) partitioner
    // always start and restart no more than 30 seconds in the past (NOTE: requires kafka 0.10 brokers to work properly)
    config.StartingOffset, config.OffsetOutOfRange = offsets.NoOlderThan(time.Second * 30)

    // and finally a consumer Client
    client, _ := consumer.NewClient("group_name", config, sclient)
    defer client.Close() // not strictly necessary, since we don't exit, but this is example code and someone might C&V it and exit

    // consume and print errors
    go func() {
        for err := range client.Errors() {
            fmt.Println(err)
        }
    }()

    // consume a topic
    topic_consumer, _ := client.Consume("topic1")
    defer topic_consumer.AsyncClose() // same comment as for client.Close() above

    // process messages
    for msg := range topic_consumer.Messages() {
        fmt.Println("processing message", msg)
        topic_consumer.Done(msg) // required
    }
}
Output:
type Config ¶
type Config struct {
    Session struct {
        // The allowed session timeout for registered consumers (defaults to 30s).
        // Must be within the allowed server range.
        Timeout time.Duration
    }

    Rebalance struct {
        // The allowed rebalance timeout for registered consumers (defaults to 30s).
        // Must be within the allowed server range. Only functions if sarama.Config.Version >= 0.10.1.
        // Otherwise Session.Timeout is used for rebalancing too.
        Timeout time.Duration
    }

    Heartbeat struct {
        // Interval between each heartbeat (defaults to 3s). It should be no more
        // than 1/3rd of the Session.Timeout setting.
        Interval time.Duration
    }

    // Partitioner is the partitioner used to map partitions to consumer group members
    // (defaults to a round-robin partitioner).
    Partitioner Partitioner

    // OffsetOutOfRange is the handler for sarama.ErrOffsetOutOfRange errors (defaults to returning
    // sarama.OffsetNewest, nil), and for messages older than MaxMessageAge if MaxMessageAge is set.
    // Implementations must return the new starting offset in the partition, or an error. The sarama.Client is included
    // for convenience, since handling this might involve querying the partition's current offsets.
    OffsetOutOfRange OffsetOutOfRange

    // StartingOffset is a hook to allow modifying the starting offset when a Consumer begins to consume
    // a partition (defaults to returning the last committed offset). Some consumers might want to jump
    // ahead to fresh messages. The sarama.Client is included for convenience, since handling this might involve
    // looking up a partition's offset by time. When no committed offset could be found, -1 (sarama.OffsetNewest)
    // is passed in. An implementation might want to return client.Config().Consumer.Offsets.Initial in that case.
    StartingOffset StartingOffset

    // SidechannelTopic is a kafka topic used to exchange partition offsets between dying and rebalancing consumers.
    // It defaults to "sarama-consumer-sidechannel-offsets". If SidechannelTopic is "" then this feature is disabled,
    // and consumers can rewind as much as Config.Rebalance.Timeout + sarama.Config.Consumer.Offsets.CommitInterval
    // when a partition is reassigned. That's always possible (kafka only promises at-least-once), but in high frequency
    // topics rewinding the default 30 seconds creates a measurable burst.
    // We can't commit offsets normally during a rebalance because at that point in time we still belong to the old generation,
    // but the broker belongs to the new generation. Hence this side channel.
    SidechannelTopic string

    // AssignmentNotification is an optional callback to inform the client code whenever the client gets a new
    // partition assignment.
    AssignmentNotification AssignmentNotification

    // InOrderDone disables extra processing which permits Done() to be called out of order.
    // If InOrderDone is true then Done() does not have to be called for every message.
    // The caller can wait and call Done() once to indicate that processing is complete for
    // all messages up to and including the argument message in the argument's partition.
    // This is not the historical behavior, so it is disabled by default.
    InOrderDone bool

    // NoMessages disables fetching and receiving sarama.ConsumerMessage. The consumer group participation is
    // performed, but fetching the kafka messages is left to the caller's code (presumably using sarama.Broker.Fetch
    // or a similar low level API). NoMessages requires InOrderDone, since without seeing the message ingress
    // the group consumer cannot keep track of which messages haven't been completed.
    NoMessages bool

    // PartitionStartNotification is an optional callback to inform client code of the (partition,offset) at which we've
    // started consuming (or, if NoMessages, at which we think the caller should start consuming).
    PartitionStartNotification PartitionStartNotification

    // MaxMessageAge is an optional function mapping a topic to the maximum lag the consumer should try to maintain
    // in that topic. If the consumer lags more than MaxMessageAge (as compared with the sarama.ConsumerMessage.Timestamp)
    // it declares an OffsetOutOfRange condition, and restarts where OffsetOutOfRange() indicates. (By default that's
    // OffsetNewest, which might not be what you want, so override OffsetOutOfRange too.)
    // Note that this is not absolute, since there's always some lag, and pipelining, and in a low message frequency
    // partition MaxMessageAge might not make sense. Returning 0 disables this functionality.
    MaxMessageAge func(topic string) time.Duration
}
Config is the configuration of a Client. Typically you'd create a default configuration with NewConfig, modify any fields of interest, and pass it to NewClient. Once passed to NewClient the Config must not be modified. (Doing so leads to data races, and may cause bugs as well.)
In addition to this config, this package also looks at the sarama.Config of the sarama.Client supplied to NewClient, especially at the Consumer.Offsets settings, Version, Metadata.Retry.Backoff, Metadata.RefreshFrequency and ChannelBufferSize.
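For example, a sketch of bounding consumer lag with MaxMessageAge (the "metrics" topic name and the 5 minute limit are illustrative):

config := consumer.NewConfig()
// try to stay within 5 minutes of the head of the hypothetical "metrics" topic;
// when we lag further, OffsetOutOfRange (by default, jump to the newest offset)
// decides where to resume
config.MaxMessageAge = func(topic string) time.Duration {
    if topic == "metrics" {
        return 5 * time.Minute
    }
    return 0 // 0 disables the age check for all other topics
}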
type Consumer ¶
type Consumer interface {
    // Messages returns the channel of messages arriving from kafka. It always
    // returns the same result, so it is safe to call once and store the result.
    // Every message read from the channel should be passed to Done when processing
    // of the message is complete.
    // It is not necessary to call Done in the same order as messages are received
    // from this channel.
    Messages() <-chan *sarama.ConsumerMessage

    // Done indicates the processing of the message is complete, and its offset can
    // be committed to kafka. Calling Done twice with the same message, or with a
    // garbage message, can cause trouble.
    // Calling Done on messages out of order is supported, and the consumer keeps
    // track of the correct offset to commit to kafka.
    Done(*sarama.ConsumerMessage)

    // AsyncClose terminates the consumer cleanly. Callers can continue to read from
    // the Messages channel until it is closed, or not, as they wish.
    // Calling Client.Close() performs an AsyncClose() on any remaining consumers.
    // Calling AsyncClose multiple times is permitted. Only the first call has any effect.
    // Never calling AsyncClose is also permitted. Client.Close() implies Consumer.AsyncClose.
    AsyncClose()

    // Close terminates the consumer and waits for it to finish committing the current
    // offsets to kafka. Calling it twice happens to work at the moment, but let's not encourage it.
    Close()
}
Consumer is a consumer of a topic.
Messages from any partition assigned to this client arrive on the channel returned by Messages.
Every message read from the Messages channel should eventually be passed to Done, or have its topic/partition/offset passed to MarkUpTo. Calling Done is the signal that that one message has been consumed (possibly out of receive order).
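For example, a sketch of a worker pool which relies on out-of-order Done (the pool size and the per-message work are hypothetical; topic_consumer is as in the NewClient example above):

msgs := topic_consumer.Messages() // safe to call once and share across goroutines
var wg sync.WaitGroup
for i := 0; i < 4; i++ {
    wg.Add(1)
    go func() {
        defer wg.Done()
        for msg := range msgs {
            fmt.Println("processing message", msg) // stand-in for real work
            topic_consumer.Done(msg)               // completion can happen in any order
        }
    }()
}
wg.Wait() // returns once AsyncClose/Close causes the Messages channel to close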
type Error ¶
type Error struct {
    Err       error    // underlying error
    Context   string   // description of the context surrounding the error
    Consumer  Consumer // nil, or the Consumer which produced the error
    Topic     string   // "", or the topic which had the error
    Partition int32    // -1, or the partition which had the error
    // contains filtered or unexported fields
}
Error holds the errors generated by this package.
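For example, a sketch of inspecting these errors in the Client.Errors() loop (whether the channel carries *Error values is an assumption of this sketch):

for err := range client.Errors() {
    if cerr, ok := err.(*consumer.Error); ok { // assumed concrete type
        log.Printf("consumer error on %s[%d] (%s): %v", cerr.Topic, cerr.Partition, cerr.Context, cerr.Err)
    } else {
        log.Println(err)
    }
}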
type OffsetOutOfRange ¶
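type OffsetOutOfRange func(topic string, partition int32, client sarama.Client) (int64, error)
OffsetOutOfRange is the handler type used by Config.OffsetOutOfRange to react to sarama.ErrOffsetOutOfRange. (The signature shown here is inferred from DefaultOffsetOutOfRange above.)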
type PartitionStartNotification ¶ added in v1.3.0
type Partitioner ¶
type Partitioner interface {
    // Name names this partitioner (used for log messages).
    Name() string

    // PrepareJoin prepares a JoinGroupRequest given the topics supplied.
    // The simplest implementation would be something like
    //   join_req.AddGroupProtocolMetadata("<partitioner name>",
    //       &sarama.ConsumerGroupMemberMetadata{
    //           Version: 1,
    //           Topics:  topics,
    //       })
    PrepareJoin(join_req *sarama.JoinGroupRequest, topics []string, current_assignments map[string][]int32)

    // Partition performs the partitioning. Given the requested
    // memberships from the JoinGroupResponse, it adds the results
    // to the SyncGroupRequest. Returning an error cancels everything.
    // The sarama.Client supplied to NewClient is included for convenience,
    // since performing the partitioning probably requires looking at each
    // topic's metadata, especially its list of partitions.
    Partition(*sarama.SyncGroupRequest, *sarama.JoinGroupResponse, sarama.Client) error

    // ParseSync parses the SyncGroupResponse and returns the map of topics
    // to partitions assigned to this client, or an error if the information
    // is not parsable.
    ParseSync(*sarama.SyncGroupResponse) (map[string][]int32, error)
}
Partitioner maps partitions to consumer group members.
When the user wants control over the partitioning they should set Config.Partitioner to their implementation of Partitioner.
type SidechannelMsg ¶
type SidechannelMsg struct {
    Ver           int                            // should be 1
    ConsumerGroup string                         // name of the consumer group sending the offsets (also used as the kafka message key)
    Offsets       map[string][]SidechannelOffset // map from topic to list of <partition,offset> pairs
}
SidechannelMsg is what is published to and read from the Config.SidechannelTopic.
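For illustration, a sketch of building one of these messages and its JSON encoding (all values are made up):

msg := consumer.SidechannelMsg{
    Ver:           1,
    ConsumerGroup: "group_name",
    Offsets: map[string][]consumer.SidechannelOffset{
        "topic1": {{Partition: 0, Offset: 12345}},
    },
}
b, _ := json.Marshal(msg)
fmt.Println(string(b))
// {"Ver":1,"ConsumerGroup":"group_name","Offsets":{"topic1":[{"p":0,"o":12345}]}}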
type SidechannelOffset ¶
type SidechannelOffset struct {
    Partition int32 `json:"p"` // use short field names in JSON to keep the size of the messages low,
    Offset    int64 `json:"o"` // since there can be a lot of SidechannelOffsets in a SidechannelMsg
}
SidechannelOffset contains the offset of a single partition.
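type StartingOffset ¶
type StartingOffset func(topic string, partition int32, committed_offset int64, client sarama.Client) (int64, error)
StartingOffset is the hook type used by Config.StartingOffset to pick the offset at which a Consumer begins consuming a partition. (The signature shown here is inferred from DefaultStartingOffset above.)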