Documentation
¶
Index ¶
Constants ¶
const ( // OffsetNewest defines the newest offset to read from using the consumer OffsetNewest = -1 // OffsetOldest defines the oldest offset to read from using the consumer OffsetOldest = -2 )
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Assignment ¶
Assignment represents a partition:offset assignment for the current connection
type Consumer ¶
type Consumer interface { Events() <-chan Event // group consume assumes co-partioned topics Subscribe(topics map[string]int64) error AddGroupPartition(partition int32) Commit(topic string, partition int32, offset int64) error // consume individual topic/partitions AddPartition(topic string, partition int32, initialOffset int64) RemovePartition(topic string, partition int32) // Close stops closes the events channel Close() error }
Consumer abstracts a kafka consumer
type Error ¶
type Error struct {
Err error
}
Error from kafka wrapped to be conform with the Event-Interface
type Event ¶
type Event interface {
// contains filtered or unexported methods
}
Event abstracts different types of events from the kafka consumer like BOF/EOF/Error or an actual message
type Message ¶
Message represents a message from kafka containing extra information like topic, partition and offset for convenience
type Producer ¶
type Producer interface { // Emit sends a message to topic. // TODO (franz): this method should return a promise, instead of getting one. // Otherwise a callback is sufficient Emit(topic string, key string, value []byte) *Promise Close() error }
Producer abstracts the kafka producer
type Promise ¶
Promise as in https://en.wikipedia.org/wiki/Futures_and_promises
type TopicManager ¶
type TopicManager interface { // EnsureTableExists checks that a table (log-compacted topic) exists, or create one if possible EnsureTableExists(topic string, npar int) error // EnsureStreamExists checks that a stream topic exists, or create one if possible EnsureStreamExists(topic string, npar int) error // Partitions returns the number of partitions of a topic, that are assigned to the running // instance, i.e. it doesn't represent all partitions of a topic. Partitions(topic string) ([]int32, error) // Close closes the topic manager Close() error }
TopicManager provides an interface to create/check topics and their partitions
func NewSaramaTopicManager ¶
func NewSaramaTopicManager(brokers []string) (TopicManager, error)
NewSaramaTopicManager creates a new topic manager using the sarama library
func NewTopicManager ¶
func NewTopicManager(servers []string, config *TopicManagerConfig) (TopicManager, error)
NewTopicManager creates a new topic manager for managing topics with zookeeper
type TopicManagerConfig ¶
type TopicManagerConfig struct { Table struct { Replication int } Stream struct { Replication int Retention time.Duration } }
TopicManagerConfig contains the configuration to access the Zookeeper servers as well as the desired options of to create tables and stream topics.
func NewTopicManagerConfig ¶
func NewTopicManagerConfig() *TopicManagerConfig
NewTopicManagerConfig provides a default configuration for auto-creation with replication factor of 1 and rentention time of 1 hour.