Documentation ¶
Index ¶
- Variables
- type Config
- type ConsumerGroup
- func (cg *ConsumerGroup) Close() error
- func (cg *ConsumerGroup) Closed() bool
- func (cg *ConsumerGroup) CommitUpto(message *sarama.ConsumerMessage) error
- func (cg *ConsumerGroup) Errors() <-chan *sarama.ConsumerError
- func (cg *ConsumerGroup) InstanceRegistered() (bool, error)
- func (cg *ConsumerGroup) Logf(format string, args ...interface{})
- func (cg *ConsumerGroup) Messages() <-chan *sarama.ConsumerMessage
- type OffsetManager
- type OffsetManagerConfig
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var (
AlreadyClosing = errors.New("The consumer group is already shutting down.")
)
var (
UncleanClose = errors.New("Not all offsets were committed before shutdown was completed")
)
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct { *sarama.Config Zookeeper *kazoo.Config Offsets struct { Initial int64 // The initial offset method to use if the consumer has no previously stored offset. Must be either sarama.OffsetOldest (default) or sarama.OffsetNewest. ProcessingTimeout time.Duration // Time to wait for all the offsets for a partition to be processed after stopping to consume from it. Defaults to 1 minute. CommitInterval time.Duration // The interval between which the processed offsets are commited. ResetOffsets bool // Resets the offsets for the consumergroup so that it won't resume from where it left off previously. } }
type ConsumerGroup ¶
type ConsumerGroup struct {
// contains filtered or unexported fields
}
The ConsumerGroup type holds all the information for a consumer that is part of a consumer group. Call JoinConsumerGroup to start a consumer.
Example ¶
consumer, consumerErr := JoinConsumerGroup( "ExampleConsumerGroup", []string{TopicWithSinglePartition, TopicWithMultiplePartitions}, zookeeperPeers, nil) if consumerErr != nil { log.Fatalln(consumerErr) } c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt) go func() { <-c consumer.Close() }() eventCount := 0 for event := range consumer.Messages() { // Process event log.Println(string(event.Value)) eventCount += 1 // Ack event consumer.CommitUpto(event) } log.Printf("Processed %d events.", eventCount)
Output:
func JoinConsumerGroup ¶
func JoinConsumerGroup(name string, topics []string, zookeeper []string, config *Config) (cg *ConsumerGroup, err error)
Connects to a consumer group, using Zookeeper for auto-discovery
func (*ConsumerGroup) Close ¶
func (cg *ConsumerGroup) Close() error
func (*ConsumerGroup) Closed ¶
func (cg *ConsumerGroup) Closed() bool
func (*ConsumerGroup) CommitUpto ¶
func (cg *ConsumerGroup) CommitUpto(message *sarama.ConsumerMessage) error
func (*ConsumerGroup) Errors ¶
func (cg *ConsumerGroup) Errors() <-chan *sarama.ConsumerError
Returns a channel that you can read to obtain events from Kafka to process.
func (*ConsumerGroup) InstanceRegistered ¶
func (cg *ConsumerGroup) InstanceRegistered() (bool, error)
func (*ConsumerGroup) Logf ¶
func (cg *ConsumerGroup) Logf(format string, args ...interface{})
func (*ConsumerGroup) Messages ¶
func (cg *ConsumerGroup) Messages() <-chan *sarama.ConsumerMessage
Returns a channel that you can read to obtain events from Kafka to process.
type OffsetManager ¶
type OffsetManager interface { // InitializePartition is called when the consumergroup is starting to consume a // partition. It should return the last processed offset for this partition. Note: // the same partition can be initialized multiple times during a single run of a // consumer group due to other consumer instances coming online and offline. InitializePartition(topic string, partition int32) (int64, error) // MarkAsProcessed tells the offset manager than a certain message has been successfully // processed by the consumer, and should be committed. The implementation does not have // to store this offset right away, but should return true if it intends to do this at // some point. // // Offsets should generally be increasing if the consumer // processes events serially, but this cannot be guaranteed if the consumer does any // asynchronous processing. This can be handled in various ways, e.g. by only accepting // offsets that are higehr than the offsets seen before for the same partition. MarkAsProcessed(topic string, partition int32, offset int64) bool // FinalizePartition is called when the consumergroup is done consuming a // partition. In this method, the offset manager can flush any remaining offsets to its // backend store. It should return an error if it was not able to commit the offset. // Note: it's possible that the consumergroup instance will start to consume the same // partition again after this function is called. FinalizePartition(topic string, partition int32, lastOffset int64, timeout time.Duration) error // Close is called when the consumergroup is shutting down. In normal circumstances, all // offsets are committed because FinalizePartition is called for all the running partition // consumers. You may want to check for this to be true, and try to commit any outstanding // offsets. If this doesn't succeed, it should return an error. Close() error }
OffsetManager is the main interface consumergroup requires to manage offsets of the consumergroup.
func NewZookeeperOffsetManager ¶
func NewZookeeperOffsetManager(cg *ConsumerGroup, config *OffsetManagerConfig) OffsetManager
NewZookeeperOffsetManager returns an offset manager that uses Zookeeper to store offsets.
type OffsetManagerConfig ¶
type OffsetManagerConfig struct { CommitInterval time.Duration // Interval between offset flushes to the backend store. VerboseLogging bool // Whether to enable verbose logging. }
OffsetManagerConfig holds configuration setting son how the offset manager should behave.
func NewOffsetManagerConfig ¶
func NewOffsetManagerConfig() *OffsetManagerConfig
NewOffsetManagerConfig returns a new OffsetManagerConfig with sane defaults.