Documentation ¶
Index ¶
Constants ¶
const (
    // OffsetNewest defines the newest offset to read from using the consumer
    OffsetNewest = -1
    // OffsetOldest defines the oldest offset to read from using the consumer
    OffsetOldest = -2
)
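As a brief sketch of how these constants are typically used (assuming this is goka's kafka package; the import path below is an assumption), they serve as the initial offset passed to methods such as Consumer.AddPartition:

package main

import "github.com/lovoo/goka/kafka" // import path assumed

// initialOffset is a hypothetical helper that chooses the starting offset for a
// partition: either replay the full history or only consume newly arriving messages.
func initialOffset(replay bool) int64 {
    if replay {
        return kafka.OffsetOldest // start at the beginning of the partition
    }
    return kafka.OffsetNewest // start at the current end of the partition
}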
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Assignment ¶
Assignment represents a partition:offset assignment for the current connection
type Consumer ¶
type Consumer interface {
    Events() <-chan Event

    // group consume assumes co-partitioned topics
    // define input topics to consume
    Subscribe(topics map[string]int64) error
    // marks the consumer ready to start consuming the messages
    AddGroupPartition(partition int32)
    Commit(topic string, partition int32, offset int64) error

    // consume individual topic/partitions
    AddPartition(topic string, partition int32, initialOffset int64) error
    RemovePartition(topic string, partition int32) error

    // Close closes the events channel
    Close() error
}
Consumer abstracts a kafka consumer
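A minimal sketch of driving a Consumer directly, assuming c was obtained from a ConsumerBuilder, that a single topic/partition is consumed via AddPartition, and that events arrive on the Events channel as *kafka.Message and *kafka.Error values (topic name and import path are placeholders):

package main

import (
    "log"

    "github.com/lovoo/goka/kafka" // import path assumed
)

func consume(c kafka.Consumer) error {
    defer c.Close() // closes the events channel

    // Consume a single topic/partition from the oldest available offset.
    if err := c.AddPartition("clicks", 0, kafka.OffsetOldest); err != nil {
        return err
    }

    for ev := range c.Events() {
        switch e := ev.(type) {
        case *kafka.Message:
            log.Printf("%s/%d@%d key=%s value=%s", e.Topic, e.Partition, e.Offset, e.Key, string(e.Value))
        case *kafka.Error:
            return e.Err
        }
    }
    return nil
}

For group consumption, Subscribe, AddGroupPartition and Commit are used instead of AddPartition, as described by the interface comments above.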
func DefaultConsumerBuilder ¶
DefaultConsumerBuilder creates a Kafka consumer using the Sarama library.
type ConsumerBuilder ¶
ConsumerBuilder creates a Kafka consumer.
func ConsumerBuilderWithConfig ¶
func ConsumerBuilderWithConfig(config *cluster.Config) ConsumerBuilder
ConsumerBuilderWithConfig creates a Kafka consumer using the Sarama library.
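A sketch of supplying a custom configuration, assuming the cluster package is github.com/bsm/sarama-cluster and Sarama is github.com/Shopify/sarama (all import paths and the chosen version are assumptions):

package main

import (
    "github.com/Shopify/sarama"             // import path assumed
    cluster "github.com/bsm/sarama-cluster" // import path assumed

    "github.com/lovoo/goka/kafka" // import path assumed
)

func newConsumerBuilder() kafka.ConsumerBuilder {
    cfg := cluster.NewConfig()
    cfg.Version = sarama.V2_0_0_0 // example override: pin the Kafka protocol version
    return kafka.ConsumerBuilderWithConfig(cfg)
}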
type Error ¶
type Error struct {
Err error
}
Error from kafka, wrapped to conform to the Event interface
type Event ¶
type Event interface {
// contains filtered or unexported methods
}
Event abstracts different types of events from the kafka consumer like BOF/EOF/Error or an actual message
type Message ¶
type Message struct {
    Topic     string
    Partition int32
    Offset    int64
    Timestamp time.Time
    Header    map[string][]byte
    Key       string
    Value     []byte
}
Message represents a message from kafka containing extra information like topic, partition and offset for convenience
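As an illustration, a hypothetical handler that logs a received message and commits its offset (committing assumes group consumption; names are placeholders):

package main

import (
    "log"

    "github.com/lovoo/goka/kafka" // import path assumed
)

// handleMessage logs the message metadata and commits its offset so the group
// does not re-deliver it after a rebalance or restart.
func handleMessage(c kafka.Consumer, m *kafka.Message) error {
    log.Printf("received %s/%d@%d (key=%q, %d header(s), %d bytes)",
        m.Topic, m.Partition, m.Offset, m.Key, len(m.Header), len(m.Value))
    return c.Commit(m.Topic, m.Partition, m.Offset)
}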
type Producer ¶
type Producer interface {
    // Emit sends a message to topic.
    Emit(topic string, key string, value []byte) *Promise
    Close() error
}
Producer abstracts the kafka producer
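A sketch of emitting a message, assuming p was created by a ProducerBuilder (topic, key and payload are placeholders):

package main

import "github.com/lovoo/goka/kafka" // import path assumed

func produce(p kafka.Producer) error {
    // Emit is asynchronous and returns a *Promise that reports the eventual
    // delivery result; it is ignored here for brevity.
    _ = p.Emit("clicks", "user-1", []byte(`{"page":"/home"}`))
    return p.Close()
}

The returned Promise (see below) can be used to react to the delivery result instead of discarding it.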
type ProducerBuilder ¶
type ProducerBuilder func(brokers []string, clientID string, hasher func() hash.Hash32) (Producer, error)
ProducerBuilder creates a Kafka producer.
func ProducerBuilderWithConfig ¶
func ProducerBuilderWithConfig(config *cluster.Config) ProducerBuilder
ProducerBuilderWithConfig creates a Kafka producer using the Sarama library.
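A sketch of building a producer with a custom configuration; FNV-1a is used as an example hasher for key-to-partition mapping, and the cluster import path, broker address and client ID are assumptions:

package main

import (
    "hash/fnv"
    "log"

    cluster "github.com/bsm/sarama-cluster" // import path assumed

    "github.com/lovoo/goka/kafka" // import path assumed
)

func main() {
    build := kafka.ProducerBuilderWithConfig(cluster.NewConfig())
    p, err := build([]string{"localhost:9092"}, "my-client", fnv.New32a)
    if err != nil {
        log.Fatalf("cannot create producer: %v", err)
    }
    defer p.Close()

    p.Emit("clicks", "user-1", []byte("hello"))
}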
type Promise ¶
Promise as in https://en.wikipedia.org/wiki/Futures_and_promises
type TopicManager ¶
type TopicManager interface {
    // EnsureTableExists checks that a table (log-compacted topic) exists, or creates one if possible
    EnsureTableExists(topic string, npar int) error
    // EnsureStreamExists checks that a stream topic exists, or creates one if possible
    EnsureStreamExists(topic string, npar int) error
    // EnsureTopicExists checks that a topic exists, or creates one if possible,
    // enforcing the given configuration
    EnsureTopicExists(topic string, npar, rfactor int, config map[string]string) error

    // Partitions returns the partitions of a topic that are assigned to the running
    // instance, i.e. it doesn't represent all partitions of a topic.
    Partitions(topic string) ([]int32, error)

    // Close closes the topic manager
    Close() error
}
TopicManager provides an interface to create/check topics and their partitions
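A sketch of verifying topics with a TopicManager before starting to process; topic names and partition counts are placeholders:

package main

import (
    "log"

    "github.com/lovoo/goka/kafka" // import path assumed
)

func ensureTopics(tm kafka.TopicManager) error {
    // Log-compacted table topic backing the group state.
    if err := tm.EnsureTableExists("my-group-table", 10); err != nil {
        return err
    }
    // Stream topic carrying the input messages.
    if err := tm.EnsureStreamExists("clicks", 10); err != nil {
        return err
    }
    parts, err := tm.Partitions("clicks")
    if err != nil {
        return err
    }
    log.Printf("partitions assigned to this instance: %v", parts)
    return tm.Close()
}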
func DefaultTopicManagerBuilder ¶
func DefaultTopicManagerBuilder(brokers []string) (TopicManager, error)
DefaultTopicManagerBuilder creates a TopicManager using the Sarama library. This topic manager cannot create topics.
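For example, a minimal sketch (broker address and import path are assumptions):

package main

import (
    "log"

    "github.com/lovoo/goka/kafka" // import path assumed
)

func main() {
    tm, err := kafka.DefaultTopicManagerBuilder([]string{"localhost:9092"})
    if err != nil {
        log.Fatalf("cannot create topic manager: %v", err)
    }
    defer tm.Close()
}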
func NewSaramaTopicManager ¶
func NewSaramaTopicManager(brokers []string, config *sarama.Config) (TopicManager, error)
NewSaramaTopicManager creates a new topic manager using the Sarama library
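A sketch of passing a custom Sarama configuration; the Sarama import path, broker address and version are assumptions:

package main

import (
    "log"

    "github.com/Shopify/sarama" // import path assumed

    "github.com/lovoo/goka/kafka" // import path assumed
)

func main() {
    cfg := sarama.NewConfig()
    cfg.Version = sarama.V2_0_0_0 // example: pin the Kafka protocol version

    tm, err := kafka.NewSaramaTopicManager([]string{"localhost:9092"}, cfg)
    if err != nil {
        log.Fatalf("cannot create topic manager: %v", err)
    }
    defer tm.Close()
}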
func NewTopicManager ¶
func NewTopicManager(servers []string, config *TopicManagerConfig) (TopicManager, error)
NewTopicManager creates a new topic manager for managing topics with ZooKeeper
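A sketch of creating a ZooKeeper-backed topic manager with the default configuration; the server address and topic name are placeholders:

package main

import (
    "log"

    "github.com/lovoo/goka/kafka" // import path assumed
)

func main() {
    tm, err := kafka.NewTopicManager([]string{"localhost:2181"}, kafka.NewTopicManagerConfig())
    if err != nil {
        log.Fatalf("cannot create topic manager: %v", err)
    }
    defer tm.Close()

    // Ensure a 10-partition stream topic exists (creating it if possible).
    if err := tm.EnsureStreamExists("clicks", 10); err != nil {
        log.Fatalf("cannot ensure stream: %v", err)
    }
}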
type TopicManagerBuilder ¶
type TopicManagerBuilder func(brokers []string) (TopicManager, error)
TopicManagerBuilder creates a TopicManager to check partition counts and create tables.
func TopicManagerBuilderWithConfig ¶
func TopicManagerBuilderWithConfig(config *cluster.Config) TopicManagerBuilder
TopicManagerBuilderWithConfig creates a TopicManager using the Sarama library. This topic manager cannot create topics.
func ZKTopicManagerBuilder ¶
func ZKTopicManagerBuilder(servers []string) TopicManagerBuilder
ZKTopicManagerBuilder creates a TopicManager that connects with ZooKeeper to check partition counts and create tables.
func ZKTopicManagerBuilderWithConfig ¶
func ZKTopicManagerBuilderWithConfig(servers []string, config *TopicManagerConfig) TopicManagerBuilder
ZKTopicManagerBuilderWithConfig creates a TopicManager that connects with ZooKeeper to check partition counts and create tables given a topic configuration.
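For example, a sketch that combines a custom configuration with the ZooKeeper-backed builder; the server addresses and the replication value are placeholders:

package main

import "github.com/lovoo/goka/kafka" // import path assumed

func newZKTopicManagerBuilder() kafka.TopicManagerBuilder {
    cfg := kafka.NewTopicManagerConfig()
    cfg.Table.Replication = 3 // example override
    return kafka.ZKTopicManagerBuilderWithConfig([]string{"zk-1:2181", "zk-2:2181"}, cfg)
}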
type TopicManagerConfig ¶
type TopicManagerConfig struct {
    Table struct {
        Replication int
    }
    Stream struct {
        Replication int
        Retention   time.Duration
    }
}
TopicManagerConfig contains the configuration to access the ZooKeeper servers as well as the desired options for creating tables and stream topics.
func NewTopicManagerConfig ¶
func NewTopicManagerConfig() *TopicManagerConfig
NewTopicManagerConfig provides a default configuration for auto-creation with a replication factor of 1 and a retention time of 1 hour.
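A sketch of adjusting these defaults before creating a topic manager; the chosen values are examples only:

package main

import (
    "time"

    "github.com/lovoo/goka/kafka" // import path assumed
)

func newConfig() *kafka.TopicManagerConfig {
    cfg := kafka.NewTopicManagerConfig() // defaults: replication factor 1, 1h retention
    cfg.Table.Replication = 2
    cfg.Stream.Replication = 2
    cfg.Stream.Retention = 24 * time.Hour
    return cfg
}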