consumer

package
v0.9.0
Published: Dec 1, 2015 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ErrBufferOverflow

type ErrBufferOverflow error

type ErrRequestTimeout

type ErrRequestTimeout error

type ErrSetup

type ErrSetup error
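Because each of these types is declared with `error` as its underlying type, each is itself an interface type with the same method set as `error`. The sketch below uses local copies of two of the declarations (it does not import this package) to illustrate one consequence that is worth keeping in mind: a type assertion or type switch against these types succeeds for any non-nil error, so it cannot by itself tell the three apart. The helper names `matchesBoth` and `sampleError` are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the declarations shown above, for illustration only.
type ErrBufferOverflow error
type ErrRequestTimeout error

// sampleError returns a plain error that was never converted to either type.
func sampleError() error {
	return errors.New("anything")
}

// matchesBoth reports whether err satisfies both interface types at once.
// Any non-nil error does, because both types have the method set of error.
func matchesBoth(err error) bool {
	_, isOverflow := err.(ErrBufferOverflow)
	_, isTimeout := err.(ErrRequestTimeout)
	return isOverflow && isTimeout
}

func main() {
	fmt.Println(matchesBoth(sampleError())) // prints true
	fmt.Println(matchesBoth(nil))           // prints false
}
```

Callers that need to distinguish the cases may therefore have to rely on something other than the static type, such as the error message; whether this package offers such a mechanism is not stated here.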

type Int32Slice

type Int32Slice []int32

func (Int32Slice) Len

func (p Int32Slice) Len() int

func (Int32Slice) Less

func (p Int32Slice) Less(i, j int) bool

func (Int32Slice) Swap

func (p Int32Slice) Swap(i, j int)
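The `Len`, `Less`, and `Swap` methods have exactly the signatures required by `sort.Interface`, so an `Int32Slice` can presumably be passed to `sort.Sort`. The following is a minimal sketch with a local copy of the type; the method bodies (and the ascending order in particular) are assumptions, since only the signatures are documented, and `sortedCopy` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"sort"
)

// Int32Slice is a local stand-in for the documented type.
type Int32Slice []int32

// Assumed method bodies: plain ascending order.
func (p Int32Slice) Len() int           { return len(p) }
func (p Int32Slice) Less(i, j int) bool { return p[i] < p[j] }
func (p Int32Slice) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }

// sortedCopy returns a sorted copy of in and leaves the input untouched.
func sortedCopy(in []int32) []int32 {
	out := make([]int32, len(in))
	copy(out, in)
	sort.Sort(Int32Slice(out))
	return out
}

func main() {
	fmt.Println(sortedCopy([]int32{3, 0, 2, 1})) // prints [0 1 2 3]
}
```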

type T

type T struct {
	// contains filtered or unexported fields
}

SmartConsumer is a Kafka consumer implementation that automatically maintains consumer group registrations and topic subscriptions. Whenever a message from a particular topic is consumed on behalf of a particular consumer group, SmartConsumer checks whether it has registered with that group and registers if it has not. It then checks whether it has subscribed to the topic and subscribes if it has not. If a topic has not been consumed for the `Config.Consumer.RegistrationTimeout` period, the consumer unsubscribes from it; likewise, if a consumer group has not seen any requests for that period, the consumer deregisters from the group.

func Spawn

func Spawn(cfg *config.T) (*T, error)

Spawn creates a consumer instance with the specified configuration and starts all its goroutines.

func (*T) Consume

func (sc *T) Consume(group, topic string) (*sarama.ConsumerMessage, error)

Consume consumes a message from the specified topic on behalf of the specified consumer group. If there are no new messages in the topic at the time of the request, it blocks for up to `Config.Consumer.LongPollingTimeout`. If no new message is produced during that time, `ErrRequestTimeout` is returned.

Note that during state transitions (topic subscribe<->unsubscribe and consumer group register<->deregister) the method may return either `ErrBufferOverflow` or `ErrRequestTimeout` even when there are messages available for consumption. In that case the caller should back off briefly and then repeat the request.
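The back-off-and-retry advice above could be followed with a loop along these lines. This is only a sketch under stated assumptions: `consumeFunc` stands in for a call such as `Consume` with the message simplified to a string, `errTransient` is a hypothetical sentinel that represents "timeout or buffer overflow" (how a caller recognizes those errors is left open), `flakyConsumer` is a test double, and the doubling delay is one possible policy rather than a recommendation from the package.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// consumeFunc stands in for a consume call; the message is simplified to a string.
type consumeFunc func(group, topic string) (string, error)

// errTransient is a hypothetical marker for errors that are worth retrying.
var errTransient = errors.New("transient consumer error")

// consumeWithBackoff calls consume up to attempts times, sleeping between
// attempts and doubling the delay each time. Errors other than errTransient
// are returned immediately.
func consumeWithBackoff(consume consumeFunc, group, topic string, attempts int, delay time.Duration) (string, error) {
	if attempts < 1 {
		attempts = 1
	}
	var err error
	for i := 0; i < attempts; i++ {
		var msg string
		msg, err = consume(group, topic)
		if err == nil {
			return msg, nil
		}
		if !errors.Is(err, errTransient) {
			return "", err
		}
		if i < attempts-1 {
			time.Sleep(delay)
			delay *= 2
		}
	}
	return "", err
}

// flakyConsumer returns a consumeFunc that fails with errTransient the given
// number of times and then succeeds.
func flakyConsumer(failures int) consumeFunc {
	return func(group, topic string) (string, error) {
		if failures > 0 {
			failures--
			return "", errTransient
		}
		return "payload", nil
	}
}

func main() {
	msg, err := consumeWithBackoff(flakyConsumer(2), "group", "topic", 5, 10*time.Millisecond)
	fmt.Println(msg, err)
}
```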

func (*T) Stop

func (sc *T) Stop()

Stop sends the shutdown signal to all internal goroutines and blocks until all of them have stopped. The last consumed offsets of every consumer group and topic are guaranteed to be committed to Kafka before SmartConsumer stops.
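A common way to get this "signal, then wait" behavior in Go is a closed channel combined with a `sync.WaitGroup`. The sketch below illustrates that general pattern only; the `workers` type, `spawnWorkers`, and the recorded "commit" step are hypothetical and are not taken from this package's source.

```go
package main

import (
	"fmt"
	"sync"
)

// workers is a hypothetical owner of several background goroutines.
type workers struct {
	stopCh    chan struct{}
	wg        sync.WaitGroup
	mu        sync.Mutex
	committed []string // names of workers that flushed their state
}

// spawnWorkers starts one goroutine per name. Each goroutine waits for the
// shutdown signal and then records a final "commit" before exiting.
func spawnWorkers(names ...string) *workers {
	w := &workers{stopCh: make(chan struct{})}
	for _, name := range names {
		w.wg.Add(1)
		go func(name string) {
			defer w.wg.Done()
			<-w.stopCh // block until Stop is called
			w.mu.Lock()
			w.committed = append(w.committed, name)
			w.mu.Unlock()
		}(name)
	}
	return w
}

// Stop broadcasts the shutdown signal and blocks until every goroutine has
// finished, so all final commits are done when it returns.
func (w *workers) Stop() {
	close(w.stopCh)
	w.wg.Wait()
}

// committedCount reports how many workers recorded their final commit.
func (w *workers) committedCount() int {
	w.mu.Lock()
	defer w.mu.Unlock()
	return len(w.committed)
}

func main() {
	w := spawnWorkers("a", "b", "c")
	w.Stop()
	fmt.Println(w.committedCount()) // prints 3
}
```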

func (*T) String

func (sc *T) String() string
