kafka

package
v0.1.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 22, 2020 License: BSD-3-Clause Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// OffsetNewest defines the newest offset to read from using the consumer
	OffsetNewest = -1
	// OffsetOldest defines the oldest offset to read from using the consumer
	OffsetOldest = -2
)

Variables

This section is empty.

Functions

func NewConfig

func NewConfig() *cluster.Config

NewConfig creates a (bsm) sarama configuration with default values.

Types

type Assignment

type Assignment map[int32]int64

Assignment represents a partition:offset assignment for the current connection

type BOF

type BOF struct {
	Topic     string
	Partition int32
	Offset    int64
	Hwm       int64
}

BOF marks the beginning of a topic/partition.

type Consumer

type Consumer interface {
	Events() <-chan Event

	// group consume assumes co-partioned topics
	// define input topics to consume
	Subscribe(topics map[string]int64) error
	// marks the consumer ready to start consuming the messages
	AddGroupPartition(partition int32)
	Commit(topic string, partition int32, offset int64) error

	// consume individual topic/partitions
	AddPartition(topic string, partition int32, initialOffset int64) error
	RemovePartition(topic string, partition int32) error

	// Close stops closes the events channel
	Close() error
}

Consumer abstracts a kafka consumer

func DefaultConsumerBuilder

func DefaultConsumerBuilder(brokers []string, group, clientID string) (Consumer, error)

DefaultConsumerBuilder creates a Kafka consumer using the Sarama library.

func NewSaramaConsumer

func NewSaramaConsumer(brokers []string, group string, config *cluster.Config) (Consumer, error)

NewSaramaConsumer creates a new Consumer using sarama

type ConsumerBuilder

type ConsumerBuilder func(brokers []string, group, clientID string) (Consumer, error)

ConsumerBuilder creates a Kafka consumer.

func ConsumerBuilderWithConfig

func ConsumerBuilderWithConfig(config *cluster.Config) ConsumerBuilder

ConsumerBuilderWithConfig creates a Kafka consumer using the Sarama library.

type EOF

type EOF struct {
	Topic     string
	Partition int32
	Hwm       int64
}

EOF marks the end of the log of a topic/partition.

type Error

type Error struct {
	Err error
}

Error from kafka wrapped to be conform with the Event-Interface

type Event

type Event interface {
	// contains filtered or unexported methods
}

Event abstracts different types of events from the kafka consumer like BOF/EOF/Error or an actual message

type Message

type Message struct {
	Topic     string
	Partition int32
	Offset    int64
	Timestamp time.Time
	Header    map[string][]byte

	Key   string
	Value []byte
}

Message represents a message from kafka containing extra information like topic, partition and offset for convenience

type NOP

type NOP struct {
	Topic     string
	Partition int32
}

NOP does not carry any information. Useful for debugging.

type Producer

type Producer interface {
	// Emit sends a message to topic.
	Emit(topic string, key string, value []byte) *Promise
	Close() error
}

Producer abstracts the kafka producer

func DefaultProducerBuilder

func DefaultProducerBuilder(brokers []string, clientID string, hasher func() hash.Hash32) (Producer, error)

DefaultProducerBuilder creates a Kafka producer using the Sarama library.

func NewProducer

func NewProducer(brokers []string, config *sarama.Config) (Producer, error)

NewProducer creates new kafka producer for passed brokers.

type ProducerBuilder

type ProducerBuilder func(brokers []string, clientID string, hasher func() hash.Hash32) (Producer, error)

ProducerBuilder create a Kafka producer.

func ProducerBuilderWithConfig

func ProducerBuilderWithConfig(config *cluster.Config) ProducerBuilder

ProducerBuilderWithConfig creates a Kafka consumer using the Sarama library.

type Promise

type Promise struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Promise as in https://en.wikipedia.org/wiki/Futures_and_promises

func NewPromise

func NewPromise() *Promise

NewPromise creates a new Promise

func (*Promise) Finish

func (p *Promise) Finish(err error) *Promise

Finish finishes the promise by executing all callbacks and saving the message/error for late subscribers

func (*Promise) Then

func (p *Promise) Then(s func(err error)) *Promise

Then chains a callback to the Promise

type TopicManager

type TopicManager interface {
	// EnsureTableExists checks that a table (log-compacted topic) exists, or create one if possible
	EnsureTableExists(topic string, npar int) error
	// EnsureStreamExists checks that a stream topic exists, or create one if possible
	EnsureStreamExists(topic string, npar int) error
	// EnsureTopicExists checks that a topic exists, or create one if possible,
	// enforcing the given configuration
	EnsureTopicExists(topic string, npar, rfactor int, config map[string]string) error

	// Partitions returns the number of partitions of a topic, that are assigned to the running
	// instance, i.e. it doesn't represent all partitions of a topic.
	Partitions(topic string) ([]int32, error)

	// Close closes the topic manager
	Close() error
}

TopicManager provides an interface to create/check topics and their partitions

func DefaultTopicManagerBuilder

func DefaultTopicManagerBuilder(brokers []string) (TopicManager, error)

DefaultTopicManagerBuilder creates TopicManager using the Sarama library. This topic manager cannot create topics.

func NewSaramaTopicManager

func NewSaramaTopicManager(brokers []string, config *sarama.Config) (TopicManager, error)

NewSaramaTopicManager creates a new topic manager using the sarama library

func NewTopicManager

func NewTopicManager(servers []string, config *TopicManagerConfig) (TopicManager, error)

NewTopicManager creates a new topic manager for managing topics with zookeeper

type TopicManagerBuilder

type TopicManagerBuilder func(brokers []string) (TopicManager, error)

TopicManagerBuilder creates a TopicManager to check partition counts and create tables.

func TopicManagerBuilderWithConfig

func TopicManagerBuilderWithConfig(config *cluster.Config) TopicManagerBuilder

TopicManagerBuilderWithConfig creates TopicManager using the Sarama library. This topic manager cannot create topics.

func ZKTopicManagerBuilder

func ZKTopicManagerBuilder(servers []string) TopicManagerBuilder

ZKTopicManagerBuilder creates a TopicManager that connects with ZooKeeper to check partition counts and create tables.

func ZKTopicManagerBuilderWithConfig

func ZKTopicManagerBuilderWithConfig(servers []string, config *TopicManagerConfig) TopicManagerBuilder

ZKTopicManagerBuilderWithConfig creates a TopicManager that connects with ZooKeeper to check partition counts and create tables given a topic configuration.

type TopicManagerConfig

type TopicManagerConfig struct {
	Table struct {
		Replication int
	}
	Stream struct {
		Replication int
		Retention   time.Duration
	}
}

TopicManagerConfig contains the configuration to access the Zookeeper servers as well as the desired options of to create tables and stream topics.

func NewTopicManagerConfig

func NewTopicManagerConfig() *TopicManagerConfig

NewTopicManagerConfig provides a default configuration for auto-creation with replication factor of 1 and rentention time of 1 hour.

Directories

Path Synopsis
Package mock is a generated GoMock package.
Package mock is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL