Documentation ¶
Index ¶
- Variables
- type Config
- type ConsumerGroup
- func (cg *ConsumerGroup) Close() error
- func (cg *ConsumerGroup) CommitUpto(message *sarama.ConsumerMessage) error
- func (cg *ConsumerGroup) Errors() <-chan *sarama.ConsumerError
- func (cg *ConsumerGroup) ID() string
- func (cg *ConsumerGroup) Messages() <-chan *sarama.ConsumerMessage
- func (cg *ConsumerGroup) Name() string
- type OffsetManager
- type OffsetManagerConfig
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var (
	EmptyZkAddrs           = errors.New("You need to provide at least one zookeeper node address")
	AlreadyClosing         = errors.New("The consumer group is already shutting down.")
	ConfigErrorOffset      = errors.New("Offsets.Initial should be sarama.OffsetOldest or sarama.OffsetNewest")
	UncleanClose           = errors.New("Not all offsets were committed before shutdown was completed")
	TopicPartitionNotFound = errors.New("Never consumed this topic/partition")
	OffsetBackwardsError   = errors.New("Offset to be committed is smaller than highest processed offset")
	NoOffsetToCommit       = errors.New("No offsets to commit")
	OffsetTooLarge         = errors.New("Offset to be committed is larger than highest consumed offset")
	ErrTooManyConsumers    = errors.New("Consumers more than active partitions")
	ErrInvalidTopic        = errors.New("Invalid topic")
	ErrConsumerConflict    = errors.New("One group can only consume one topic")
	ErrConnBroken          = errors.New("Kafka connection broken")
	ErrKafkaDead           = errors.New("Kakfa brokers all dead")
)
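Because these errors are package-level sentinel values, a caller can compare against them to decide how to react. The following is a minimal sketch, not the package's own code: the lowercase variables are local stand-ins for three of the sentinels so that the example compiles on its own, and the choice of which errors count as fatal is an assumption made for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for three of the package's sentinel errors (assumption:
// real code would reference the exported variables listed above instead).
var (
	errTooManyConsumers = errors.New("Consumers more than active partitions")
	errConnBroken       = errors.New("Kafka connection broken")
	errKafkaDead        = errors.New("Kakfa brokers all dead")
)

// isFatal reports whether err should make the caller close the consumer
// group. Treating these two errors as fatal is this sketch's policy, not
// documented behaviour of the package.
func isFatal(err error) bool {
	return errors.Is(err, errTooManyConsumers) || errors.Is(err, errKafkaDead)
}

func main() {
	// errors.Is also matches when the sentinel has been wrapped with %w.
	wrapped := fmt.Errorf("partition 3: %w", errTooManyConsumers)
	fmt.Println(isFatal(wrapped))       // true
	fmt.Println(isFatal(errConnBroken)) // false
}
```

Using errors.Is rather than == keeps the check working whether or not an intermediate layer wraps the error.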
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
	*sarama.Config

	Zookeeper *kazoo.Config

	Offsets struct {
		// The initial offset to use if the consumer has no previously stored offset.
		// Must be either sarama.OffsetOldest (default) or sarama.OffsetNewest.
		Initial int64

		// Resets the offsets for the consumer group so that it won't resume
		// from where it left off previously.
		ResetOffsets bool

		// Time to wait for all the offsets for a partition to be processed
		// after stopping to consume from it.
		ProcessingTimeout time.Duration

		// The interval at which the processed offsets are committed.
		CommitInterval time.Duration
	}

	// If NoDup is true, the consumer group automatically discards duplicated messages.
	NoDup bool

	// If PermitStandby is false, the consumer group emits ErrTooManyConsumers on the
	// error channel so that the client can close the consumer group.
	PermitStandby bool

	// If OneToOne is true, a single consumer group can only consume a single topic.
	OneToOne bool
}
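The comment on Offsets.Initial, together with the ConfigErrorOffset variable, implies a validation step. How the package performs it is not shown on this page, so the following is only a sketch of the check under stated assumptions: offsetsConfig, validateOffsets and errConfigOffset are hypothetical local names, the two constants carry the same values as sarama.OffsetNewest and sarama.OffsetOldest, and the durations are illustrative.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

const (
	offsetNewest int64 = -1 // same value as sarama.OffsetNewest
	offsetOldest int64 = -2 // same value as sarama.OffsetOldest
)

// errConfigOffset is a local stand-in for the package's ConfigErrorOffset.
var errConfigOffset = errors.New("Offsets.Initial should be sarama.OffsetOldest or sarama.OffsetNewest")

// offsetsConfig mirrors the Offsets block of Config (hypothetical local copy).
type offsetsConfig struct {
	Initial           int64
	ResetOffsets      bool
	ProcessingTimeout time.Duration
	CommitInterval    time.Duration
}

// validateOffsets accepts only the two initial offsets the comment allows.
func validateOffsets(o offsetsConfig) error {
	switch o.Initial {
	case offsetOldest, offsetNewest:
		return nil
	default:
		return errConfigOffset
	}
}

func main() {
	cfg := offsetsConfig{
		Initial:           offsetOldest,
		ProcessingTimeout: 60 * time.Second, // illustrative value
		CommitInterval:    10 * time.Second, // illustrative value
	}
	fmt.Println(validateOffsets(cfg))

	cfg.Initial = 0 // a concrete offset is not an accepted initial value
	fmt.Println(validateOffsets(cfg))
}
```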
type ConsumerGroup ¶
type ConsumerGroup struct {
// contains filtered or unexported fields
}
The ConsumerGroup type holds all the information for a consumer that is part of a consumer group. Call JoinConsumerGroup to start a consumer.
You must call Close() on a consumer group to avoid leaks; it may not be garbage-collected automatically when it passes out of scope.
Example ¶
consumer, consumerErr := JoinConsumerGroup(
	"ExampleConsumerGroup",
	[]string{TopicWithSinglePartition, TopicWithMultiplePartitions},
	zookeeperPeers,
	nil)

if consumerErr != nil {
	log.Fatalln(consumerErr)
}

c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
go func() {
	<-c
	consumer.Close()
}()

eventCount := 0

for event := range consumer.Messages() {
	// Process event
	log.Println(string(event.Value))
	eventCount += 1

	// Ack event
	consumer.CommitUpto(event)
}

log.Printf("Processed %d events.", eventCount)
func JoinConsumerGroup ¶
func JoinConsumerGroup(name string, topics []string, zookeeper []string, config *Config) (cg *ConsumerGroup, err error)
JoinConsumerGroup connects to a consumer group, using Zookeeper for auto-discovery.
func JoinConsumerGroupRealIp ¶
func (*ConsumerGroup) Close ¶
func (cg *ConsumerGroup) Close() error
func (*ConsumerGroup) CommitUpto ¶
func (cg *ConsumerGroup) CommitUpto(message *sarama.ConsumerMessage) error
func (*ConsumerGroup) Errors ¶
func (cg *ConsumerGroup) Errors() <-chan *sarama.ConsumerError
Errors returns a channel that you can read to obtain errors from Kafka to process.
func (*ConsumerGroup) ID ¶
func (cg *ConsumerGroup) ID() string
func (*ConsumerGroup) Messages ¶
func (cg *ConsumerGroup) Messages() <-chan *sarama.ConsumerMessage
Messages returns a channel that you can read to obtain events from Kafka to process.
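Since Messages and Errors are separate channels, a consumer that only ranges over Messages never sees the errors. One way to read both is a select loop. The following is a minimal sketch with stand-in types: message and consumerError are hypothetical local substitutes for sarama.ConsumerMessage and sarama.ConsumerError, and consume is not part of this package. It assumes the message channel is closed on shutdown, as the range loop in the example above implies, and makes no assumption about whether the error channel is closed.

```go
package main

import (
	"errors"
	"fmt"
)

// message and consumerError are local stand-ins for sarama.ConsumerMessage
// and sarama.ConsumerError, so that the sketch compiles on its own.
type message struct {
	Offset int64
	Value  []byte
}

type consumerError struct {
	Err error
}

// consume handles messages and collects errors until the message channel is
// closed. Errors still buffered at that point are drained without blocking.
func consume(messages <-chan *message, errs <-chan *consumerError, handle func(*message)) (int, []error) {
	processed := 0
	var seen []error
	for {
		select {
		case msg, ok := <-messages:
			if !ok {
				// Messages are done: pick up any remaining errors, then stop.
				for {
					select {
					case cerr, ok := <-errs:
						if !ok {
							return processed, seen
						}
						seen = append(seen, cerr.Err)
					default:
						return processed, seen
					}
				}
			}
			handle(msg)
			processed++
		case cerr, ok := <-errs:
			if !ok {
				errs = nil // a nil channel is never selected again
				continue
			}
			seen = append(seen, cerr.Err)
		}
	}
}

func main() {
	messages := make(chan *message, 2)
	errs := make(chan *consumerError, 1)
	messages <- &message{Offset: 0, Value: []byte("a")}
	messages <- &message{Offset: 1, Value: []byte("b")}
	errs <- &consumerError{Err: errors.New("broker unavailable")}
	close(messages)

	processed, seen := consume(messages, errs, func(m *message) {
		fmt.Printf("offset %d: %s\n", m.Offset, m.Value)
	})
	fmt.Println(processed, len(seen))
}
```

With a real consumer group, the handle callback is where CommitUpto would be called once the message has been processed.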
func (*ConsumerGroup) Name ¶
func (cg *ConsumerGroup) Name() string
type OffsetManager ¶
type OffsetManager interface {

	// InitializePartition is called when the consumergroup is starting to consume a
	// partition. It should return the last processed offset for this partition. Note:
	// the same partition can be initialized multiple times during a single run of a
	// consumer group due to other consumer instances coming online and offline.
	InitializePartition(topic string, partition int32) (int64, error)

	// MarkAsProcessed tells the offset manager that a certain message has been successfully
	// processed by the consumer, and should be committed. The implementation does not have
	// to store this offset right away, but should return true if it intends to do this at
	// some point.
	//
	// Offsets should generally be increasing if the consumer
	// processes events serially, but this cannot be guaranteed if the consumer does any
	// asynchronous processing. This can be handled in various ways, e.g. by only accepting
	// offsets that are higher than the offsets seen before for the same partition.
	MarkAsProcessed(topic string, partition int32, offset int64) error

	MarkAsConsumed(topic string, partition int32, offset int64) error

	// FinalizePartition is called when the consumergroup is done consuming a
	// partition. In this method, the offset manager can flush any remaining offsets to its
	// backend store. It should return an error if it was not able to commit the offset.
	// Note: it's possible that the consumergroup instance will start to consume the same
	// partition again after this function is called.
	FinalizePartition(topic string, partition int32, lastOffset int64, timeout time.Duration) error

	// Close is called when the consumergroup is shutting down. In normal circumstances, all
	// offsets are committed because FinalizePartition is called for all the running partition
	// consumers. You may want to check for this to be true, and try to commit any outstanding
	// offsets. If this doesn't succeed, it should return an error.
	Close() error
}
OffsetManager is the main interface consumergroup requires to manage offsets of the consumergroup.
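To illustrate the contract, here is a minimal sketch of an in-memory implementation that follows the suggestion in the MarkAsProcessed comment and only accepts offsets higher than those seen before for the same partition. It is not one of the package's offset managers. The interface is copied locally so that the example compiles on its own, and memoryOffsetManager, errOffsetBackwards, the -1 "nothing stored" return value and the no-op MarkAsConsumed are all assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// OffsetManager is a local copy of the interface documented above.
type OffsetManager interface {
	InitializePartition(topic string, partition int32) (int64, error)
	MarkAsProcessed(topic string, partition int32, offset int64) error
	MarkAsConsumed(topic string, partition int32, offset int64) error
	FinalizePartition(topic string, partition int32, lastOffset int64, timeout time.Duration) error
	Close() error
}

// errOffsetBackwards is a local stand-in for the package's OffsetBackwardsError.
var errOffsetBackwards = errors.New("offset is not higher than highest processed offset")

type topicPartition struct {
	topic     string
	partition int32
}

// memoryOffsetManager is a hypothetical manager that keeps offsets in a map.
type memoryOffsetManager struct {
	mu        sync.Mutex
	processed map[topicPartition]int64
}

// Compile-time check that the sketch satisfies the interface.
var _ OffsetManager = (*memoryOffsetManager)(nil)

func newMemoryOffsetManager() *memoryOffsetManager {
	return &memoryOffsetManager{processed: make(map[topicPartition]int64)}
}

// InitializePartition returns the last processed offset, or -1 if nothing is
// stored yet (the -1 convention is an assumption of this sketch).
func (m *memoryOffsetManager) InitializePartition(topic string, partition int32) (int64, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if off, ok := m.processed[topicPartition{topic, partition}]; ok {
		return off, nil
	}
	return -1, nil
}

// MarkAsProcessed stores offset only if it is higher than the stored one.
func (m *memoryOffsetManager) MarkAsProcessed(topic string, partition int32, offset int64) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	key := topicPartition{topic, partition}
	if last, ok := m.processed[key]; ok && offset <= last {
		return errOffsetBackwards
	}
	m.processed[key] = offset
	return nil
}

// MarkAsConsumed is undocumented in the source; this sketch treats it as a no-op.
func (m *memoryOffsetManager) MarkAsConsumed(topic string, partition int32, offset int64) error {
	return nil
}

// FinalizePartition has nothing to flush because there is no backend store.
func (m *memoryOffsetManager) FinalizePartition(topic string, partition int32, lastOffset int64, timeout time.Duration) error {
	return nil
}

// Close has no outstanding offsets to commit for the same reason.
func (m *memoryOffsetManager) Close() error { return nil }

func main() {
	var om OffsetManager = newMemoryOffsetManager()
	fmt.Println(om.MarkAsProcessed("events", 0, 10)) // accepted
	fmt.Println(om.MarkAsProcessed("events", 0, 7))  // rejected: lower than 10
	last, _ := om.InitializePartition("events", 0)
	fmt.Println(last)
}
```

Rejecting lower offsets is only one of the strategies the comment alludes to; an implementation that processes messages asynchronously might instead track gaps and commit the highest contiguous offset.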
func NewKafkaOffsetManager ¶
func NewKafkaOffsetManager() OffsetManager
func NewZookeeperOffsetManager ¶
func NewZookeeperOffsetManager(cg *ConsumerGroup, config *OffsetManagerConfig) OffsetManager
NewZookeeperOffsetManager returns an offset manager that uses Zookeeper to store offsets.
type OffsetManagerConfig ¶
type OffsetManagerConfig struct {
	CommitInterval time.Duration // Interval between offset flushes to the backend store.
	VerboseLogging bool          // Whether to enable verbose logging.
}
OffsetManagerConfig holds configuration settings on how the offset manager should behave.
func NewOffsetManagerConfig ¶
func NewOffsetManagerConfig() *OffsetManagerConfig
NewOffsetManagerConfig returns a new OffsetManagerConfig with sane defaults.