Documentation ¶
Index ¶
- Variables
- type Claimer
- type Client
- func (client *Client) Claim(consumed *sarama.ConsumerMessage) (err error)
- func (client *Client) Close() error
- func (client *Client) Connect(brokers []string, config *sarama.Config, initialOffset int64, ...) error
- func (client *Client) Healthy() bool
- func (client *Client) Subscribe(topics ...types.Topic) (<-chan *types.Message, error)
- func (client *Client) Unsubscribe(sub <-chan *types.Message) error
- type GroupHandle
- func (handle *GroupHandle) Cleanup(session sarama.ConsumerGroupSession) error
- func (handle *GroupHandle) Close() error
- func (handle *GroupHandle) Connect(conn sarama.Client, topics []string, group string) error
- func (handle *GroupHandle) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (handle *GroupHandle) Setup(sarama.ConsumerGroupSession) error
- type Handle
- type HandleType
- type PartitionConsumer
- type PartitionHandle
- func (handle *PartitionHandle) Close() error
- func (handle *PartitionHandle) Connect(conn sarama.Client, topics []string, initialOffset int64) error
- func (handle *PartitionHandle) Heartbeat()
- func (handle *PartitionHandle) PartitionConsumer(topic string, partition int32) error
- func (handle *PartitionHandle) PullPartitions(topic string) ([]int32, error)
- func (handle *PartitionHandle) Rebalance() error
- type Subscription
- type Topic
- type TopicPartitionConsumers
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrRetry error retry type representation
	ErrRetry = errors.New("retry message")
)
Functions ¶
This section is empty.
Types ¶
type Claimer ¶
type Claimer interface {
Claim(*sarama.ConsumerMessage)
}
Claimer represents a consumer message claimer interface
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client consumes kafka messages
func (*Client) Claim ¶
func (client *Client) Claim(consumed *sarama.ConsumerMessage) (err error)
Claim consumes and emits the given Kafka message to the subscribed subscriptions. All subscriptions are awaited until done. An error is returned if one of the subscriptions failed to process the message.
func (*Client) Connect ¶ added in v0.2.1
func (client *Client) Connect(brokers []string, config *sarama.Config, initialOffset int64, ts ...types.Topic) error
Connect opens a new Kafka consumer
type GroupHandle ¶
type GroupHandle struct {
// contains filtered or unexported fields
}
GroupHandle represents a Sarama consumer group consumer handle
func NewGroupHandle ¶
func NewGroupHandle(client *Client) *GroupHandle
NewGroupHandle initializes a new GroupHandle
func (*GroupHandle) Cleanup ¶
func (handle *GroupHandle) Cleanup(session sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (*GroupHandle) Close ¶
func (handle *GroupHandle) Close() error
Close closes the group consume handle and waits until all claimed messages are processed. The consumer group gets marked for closing.
func (*GroupHandle) Connect ¶
Connect initializes a new Sarama consumer group and awaits till the consumer group is set up and ready to consume messages.
func (*GroupHandle) ConsumeClaim ¶
func (handle *GroupHandle) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). When a Kafka message is claimed, it is passed to the client Claim method. If an error occurred during processing of the claimed message, the message is marked to be retried.
func (*GroupHandle) Setup ¶
func (handle *GroupHandle) Setup(sarama.ConsumerGroupSession) error
Setup is run at the beginning of a new session, before ConsumeClaim
type HandleType ¶
type HandleType int8
HandleType represents the type of consumer that is advised for use with the given connection string
const ( PartitionConsumerHandle HandleType = 0 GroupConsumerHandle HandleType = 1 )
Plausible consumer types
type PartitionConsumer ¶
type PartitionConsumer struct {
// contains filtered or unexported fields
}
PartitionConsumer represents a single partition consumer
type PartitionHandle ¶
type PartitionHandle struct {
// contains filtered or unexported fields
}
PartitionHandle represents a Sarama partition consumer
func NewPartitionHandle ¶
func NewPartitionHandle(client *Client) *PartitionHandle
NewPartitionHandle initializes a new PartitionHandle
func (*PartitionHandle) Close ¶
func (handle *PartitionHandle) Close() error
Close closes the given consumer and all topic partition consumers. All partition consumers are closed first, before the client consumer is closed.
func (*PartitionHandle) Connect ¶
func (handle *PartitionHandle) Connect(conn sarama.Client, topics []string, initialOffset int64) error
Connect initializes a new Sarama partition consumer and awaits till the consumer group is set up and ready to consume messages.
func (*PartitionHandle) Heartbeat ¶
func (handle *PartitionHandle) Heartbeat()
Heartbeat sets up a new time ticker that checks on every tick whether the partition count has changed for the consumed topics. By default the heartbeat ticks every 1500ms.
func (*PartitionHandle) PartitionConsumer ¶
func (handle *PartitionHandle) PartitionConsumer(topic string, partition int32) error
PartitionConsumer sets up a new partition consumer for the given topic and partition
func (*PartitionHandle) PullPartitions ¶
func (handle *PartitionHandle) PullPartitions(topic string) ([]int32, error)
PullPartitions pulls the available partitions for the set topics. If the partition count has changed, the new partitions are returned.
func (*PartitionHandle) Rebalance ¶
func (handle *PartitionHandle) Rebalance() error
Rebalance pulls the latest available topics and starts new partition consumers when necessary.
type Subscription ¶
type Subscription struct {
// contains filtered or unexported fields
}
Subscription represents a consumer topic(s) subscription
type Topic ¶
type Topic struct {
// contains filtered or unexported fields
}
Topic represents a thread safe list of subscriptions
type TopicPartitionConsumers ¶
type TopicPartitionConsumers struct {
// contains filtered or unexported fields
}
TopicPartitionConsumers represents a topic and its partition consumers
func (*TopicPartitionConsumers) ClaimMessages ¶
func (tc *TopicPartitionConsumers) ClaimMessages(topic string, partition int32, consumer sarama.PartitionConsumer)
ClaimMessages handles the claiming of consumed messages
func (*TopicPartitionConsumers) Consume ¶
func (tc *TopicPartitionConsumers) Consume(partition int32) error
Consume opens a new consumer for the given partition
func (*TopicPartitionConsumers) Delist ¶
func (tc *TopicPartitionConsumers) Delist(consumer *PartitionConsumer) error
Delist unlists the consumer as available