Documentation ¶
Overview ¶
Kasper is a lightweight library for processing Kafka topics. It is heavily inspired by Apache Samza (see http://samza.apache.org). Kasper processes Kafka messages in small batches and is designed to work with centralized key-value stores such as Redis, Cassandra, or Elasticsearch for maintaining state during processing. Kasper is a good fit for high-throughput applications (> 10k messages per second) that can tolerate a moderate amount of processing latency (~1000ms). Please note that Kasper provides at-least-once semantics and is designed for idempotent processing. If you require exactly-once semantics or need to perform non-idempotent operations, Kasper is likely not a good choice.
Step 1 - Create a sarama Client ¶
Kasper uses Shopify's excellent sarama library (see https://github.com/Shopify/sarama) for consuming and producing messages to Kafka. Every Kasper application must begin by instantiating a sarama Client. Choose the parameters in sarama.Config carefully; the performance, reliability, and correctness of your application are all highly sensitive to these settings. We recommend setting sarama.Config.Producer.RequiredAcks to WaitForAll.
saramaConfig := sarama.NewConfig()
saramaConfig.Producer.RequiredAcks = sarama.WaitForAll
client, err := sarama.NewClient([]string{"kafka-broker.local:9092"}, saramaConfig)
Step 2 - Create a Config ¶
TopicProcessorName is used for logging, labeling metrics, and is used as a suffix to the Kafka consumer group. InputTopics and InputPartitions are the lists of topics and partitions to consume. Please note that Kasper currently does not support consuming topics with differing numbers of partitions. This limitation can be alleviated by manually adding an extra fan-out step in your processing pipeline to a new topic with the desired number of partitions.
config := &kasper.Config{
	TopicProcessorName:    "twitter-reach",
	Client:                client,
	InputTopics:           []string{"tweets", "twitter-followers"},
	InputPartitions:       []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11},
	BatchSize:             10000,
	BatchWaitDuration:     5 * time.Second,
	Logger:                kasper.NewJSONLogger("twitter-reach-0", false),
	MetricsProvider:       kasper.NewPrometheus("twitter-reach-0"),
	MetricsUpdateInterval: 60 * time.Second,
}
Kasper is instrumented with a number of useful metrics, so we recommend setting MetricsProvider for production applications. Kasper includes a Prometheus implementation, and adapting the interface to other tools should be easy.
Step 3 - Create a MessageProcessor per input partition ¶
You need to create a map[int]MessageProcessor. The MessageProcessor instances can safely be shared across partitions. Each MessageProcessor must implement a single function:
func (*TweetProcessor) Process(messages []*sarama.ConsumerMessage, sender Sender) error {
	// process messages here
}
All messages for the input topics on the specified partition will be passed to the appropriate MessageProcessor instance. This is useful for implementing partition-wise joins of different topics. The Sender instance must be used to produce messages to output topics. Messages passed to Sender are not sent directly but are collected in an array instead. When Process returns, the messages are sent to Kafka and Kasper waits for the configured number of acks. When all messages have been successfully produced, Kasper updates the consumer offsets of the input partitions and resumes processing. If Process returns a non-nil error value, Kasper stops all processing.
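For illustration, here is a minimal sketch of a MessageProcessor and the per-partition map that is later passed to NewTopicProcessor. The output topic name and the pass-through logic are assumptions for the example, not part of Kasper:

type TweetProcessor struct{}

func (*TweetProcessor) Process(messages []*sarama.ConsumerMessage, sender kasper.Sender) error {
	for _, message := range messages {
		// Example only: forward each message to a hypothetical output topic.
		sender.Send(&sarama.ProducerMessage{
			Topic: "tweets-processed",
			Key:   sarama.ByteEncoder(message.Key),
			Value: sarama.ByteEncoder(message.Value),
		})
	}
	return nil
}

// One map entry per input partition; the same instance may safely be shared.
processor := &TweetProcessor{}
messageProcessors := make(map[int]kasper.MessageProcessor)
for partition := 0; partition < 12; partition++ {
	messageProcessors[partition] = processor
}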
Step 4 - Create a TopicProcessor ¶
Create a TopicProcessor with NewTopicProcessor, then call TopicProcessor.RunLoop() to start processing messages. Kasper does not spawn any goroutines and runs a single-threaded event loop instead. RunLoop() will block the current goroutine and will run forever until an error occurs or until Close() is called. For parallel processing, run multiple TopicProcessor instances in different goroutines or processes (the input partitions cannot overlap). You should set Config.TopicProcessorName to the same value on all instances in order to easily scale the processing up or down.
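Putting the pieces together, a minimal sketch, assuming the config and messageProcessors values from the previous steps:

topicProcessor := kasper.NewTopicProcessor(config, messageProcessors)
if err := topicProcessor.RunLoop(); err != nil {
	// RunLoop propagates errors returned by MessageProcessor.Process.
	config.Logger.Errorf("RunLoop returned error: %s", err)
}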
Index ¶
- type Config
- type Counter
- type Elasticsearch
- func (s *Elasticsearch) Delete(key string) error
- func (s *Elasticsearch) Flush() error
- func (s *Elasticsearch) Get(key string) ([]byte, error)
- func (s *Elasticsearch) GetAll(keys []string) (map[string][]byte, error)
- func (s *Elasticsearch) GetClient() *elastic.Client
- func (s *Elasticsearch) Put(key string, value []byte) error
- func (s *Elasticsearch) PutAll(kvs map[string][]byte) error
- type ElasticsearchTenancy
- type Gauge
- type Logger
- type Map
- func (s *Map) Delete(key string) error
- func (s *Map) Flush() error
- func (s *Map) Get(key string) ([]byte, error)
- func (s *Map) GetAll(keys []string) (map[string][]byte, error)
- func (s *Map) GetMap() map[string][]byte
- func (s *Map) Put(key string, value []byte) error
- func (s *Map) PutAll(kvs map[string][]byte) error
- type MessageProcessor
- type MetricsProvider
- type MultiElasticsearch
- type MultiMap
- type MultiRedis
- type MultiStore
- type NoopMetricsProvider
- type Prometheus
- type Redis
- type Sender
- type Store
- type Summary
- type TenantKey
- type TopicProcessor
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
	// Used for logging, metrics, and Kafka consumer group
	TopicProcessorName string
	// Used for consuming and producing messages
	Client sarama.Client
	// Input topics (all topics need to have the same number of partitions)
	InputTopics []string
	// Input partitions (cannot overlap between TopicProcessor instances)
	InputPartitions []int
	// Maximum number of messages processed in one go
	BatchSize int
	// Maximum amount of time spent waiting for a batch to be filled
	BatchWaitDuration time.Duration
	// Use NewBasicLogger() or any other Logger
	Logger Logger
	// Use NewPrometheus() or any other MetricsProvider
	MetricsProvider MetricsProvider
	// 15 seconds is a sensible value
	MetricsUpdateInterval time.Duration
}
Config contains the configuration settings for a TopicProcessor.
type Elasticsearch ¶
type Elasticsearch struct {
// contains filtered or unexported fields
}
Elasticsearch is an implementation of Store that uses Elasticsearch. Each instance provides key-value access to a given index and a given document type. This implementation supports Elasticsearch 5.x and uses Oliver Eilhard's Go Elasticsearch client. See https://github.com/olivere/elastic
func NewElasticsearch ¶
func NewElasticsearch(config *Config, client *elastic.Client, indexName, typeName string) *Elasticsearch
NewElasticsearch creates Elasticsearch instances. All documents read and written will correspond to the URL:
https://{cluster}:9200/{indexName}/{typeName}/{key}
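A minimal usage sketch; the cluster URL, index name, type name, and document are assumptions (requires Oliver Eilhard's elastic client):

esClient, err := elastic.NewClient(elastic.SetURL("http://elasticsearch.local:9200"))
if err != nil {
	config.Logger.Panic(err)
}
store := kasper.NewElasticsearch(config, esClient, "twitter-reach", "tweet")
// Writes the document to /twitter-reach/tweet/tweet-42
err = store.Put("tweet-42", []byte(`{"text": "hello world"}`))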
func (*Elasticsearch) Delete ¶
func (s *Elasticsearch) Delete(key string) error
Delete removes a document from the store. It does not return an error if the document was not present. It is implemented using the Elasticsearch Delete API. See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete.html
func (*Elasticsearch) Flush ¶
func (s *Elasticsearch) Flush() error
Flush flushes the Elasticsearch translog to disk. It is implemented using the Elasticsearch Flush API. See https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-flush.html
func (*Elasticsearch) Get ¶
func (s *Elasticsearch) Get(key string) ([]byte, error)
Get gets a document by key (i.e. the Elasticsearch _id). It is implemented by using the Elasticsearch Get API. The returned byte slice contains the UTF8-encoded JSON document (i.e., _source). This function returns (nil, nil) if the document does not exist. See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-get.html
func (*Elasticsearch) GetAll ¶
func (s *Elasticsearch) GetAll(keys []string) (map[string][]byte, error)
GetAll gets multiple documents from the store. It is implemented using the Elasticsearch MultiGet API. See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-multi-get.html
func (*Elasticsearch) GetClient ¶
func (s *Elasticsearch) GetClient() *elastic.Client
GetClient returns the underlying elastic.Client
func (*Elasticsearch) Put ¶
func (s *Elasticsearch) Put(key string, value []byte) error
Put inserts or updates a document in the store (key is used as the document _id). It is implemented using the Elasticsearch Index API. The value byte slice must contain the UTF8-encoded JSON document (i.e., _source). See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html
func (*Elasticsearch) PutAll ¶
func (s *Elasticsearch) PutAll(kvs map[string][]byte) error
PutAll inserts or updates a number of documents in the store. It is implemented using the Elasticsearch Bulk and Index APIs. It returns an error if any operation fails. See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
type ElasticsearchTenancy ¶
type ElasticsearchTenancy interface {
TenantIndexAndType(tenant string) (indexName, typeName string)
}
ElasticsearchTenancy defines how tenanted keys are mapped to an index and type. Here is a simple example:
type CustomerTenancy struct{}

func (CustomerTenancy) TenantIndexAndType(tenant string) (indexName, typeName string) {
	indexName = fmt.Sprintf("sales-service~%s", tenant)
	typeName = "customer"
	return
}
type Logger ¶
type Logger interface {
	Debug(...interface{})
	Debugf(string, ...interface{})
	Info(...interface{})
	Infof(string, ...interface{})
	Error(...interface{})
	Errorf(string, ...interface{})
	Panic(...interface{})
	Panicf(string, ...interface{})
}
Logger is a logging interface for Kasper.
func NewBasicLogger ¶
NewBasicLogger uses the Go standard library logger. See https://golang.org/pkg/log/
func NewJSONLogger ¶
NewJSONLogger uses the logrus JSON formatter. See https://github.com/sirupsen/logrus
func NewTextLogger ¶
NewTextLogger uses the logrus text formatter. See https://github.com/sirupsen/logrus
type Map ¶
type Map struct {
// contains filtered or unexported fields
}
Map wraps a map[string][]byte value and implements the Store interface.
func (*Map) Delete ¶
Delete removes a single value by key. Does not return an error if the key is not present.
func (*Map) GetAll ¶
GetAll returns multiple values by key. The returned map does not contain entries for missing documents.
type MessageProcessor ¶
type MessageProcessor interface {
	// Process receives a slice of incoming Kafka messages and a Sender to send messages to output topics.
	// References to the byte slice or Sender interface cannot be held between calls.
	// If Process returns a non-nil error value, Kasper stops all processing.
	// This error value is then returned by TopicProcessor.RunLoop().
	Process([]*sarama.ConsumerMessage, Sender) error
}
MessageProcessor is the interface that encapsulates application business logic. It receives all messages of a single partition of the TopicProcessor's input topics.
type MetricsProvider ¶
type MetricsProvider interface {
	NewCounter(name string, help string, labelNames ...string) Counter
	NewGauge(name string, help string, labelNames ...string) Gauge
	NewSummary(name string, help string, labelNames ...string) Summary
}
MetricsProvider is a facility to create metrics instances.
type MultiElasticsearch ¶
type MultiElasticsearch struct {
// contains filtered or unexported fields
}
MultiElasticsearch is an implementation of MultiStore that uses Elasticsearch. Each instance provides key-value access to a subset of the Elasticsearch documents defined by a tenancy instance (see ElasticsearchTenancy). This implementation supports Elasticsearch 5.x
func NewMultiElasticsearch ¶
func NewMultiElasticsearch(config *Config, client *elastic.Client, tenancy ElasticsearchTenancy) *MultiElasticsearch
NewMultiElasticsearch creates MultiElasticsearch instances. All documents read and written will correspond to the URL:
https://{cluster}:9200/{indexName}/{typeName}/{key}
where indexName and typeName depend on the tenant and the tenancy instance.
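A short sketch reusing the CustomerTenancy type from the ElasticsearchTenancy example above; the tenant name and key are assumptions, and esClient is an elastic.Client created as in the NewElasticsearch example:

multiStore := kasper.NewMultiElasticsearch(config, esClient, CustomerTenancy{})
customers := multiStore.Tenant("acme-corp")
// Reads /sales-service~acme-corp/customer/customer-123
value, err := customers.Get("customer-123")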
func (*MultiElasticsearch) AllTenants ¶
func (s *MultiElasticsearch) AllTenants() []string
AllTenants returns the list of tenants known to this instance.
func (*MultiElasticsearch) Fetch ¶
func (s *MultiElasticsearch) Fetch(keys []TenantKey) (*MultiMap, error)
Fetch performs a single MultiGet operation on the Elasticsearch cluster across multiple tenants (i.e. indexes).
func (*MultiElasticsearch) Push ¶
func (s *MultiElasticsearch) Push(m *MultiMap) error
Push performs a single Bulk index request with all documents provided. It returns an error if any operation fails.
func (*MultiElasticsearch) Tenant ¶
func (s *MultiElasticsearch) Tenant(tenant string) Store
Tenant returns an Elasticsearch Store for the given tenant. Created instances are cached on future invocations.
type MultiMap ¶
type MultiMap struct {
// contains filtered or unexported fields
}
MultiMap is a multitenanted version of Map that implements the MultiStore interface.
func NewMultiMap ¶
NewMultiMap creates a new MultiMap. Each underlying Map instance is initialized to the given size.
func (*MultiMap) AllTenants ¶
AllTenants returns the list of tenants known to this instance.
type MultiRedis ¶
type MultiRedis struct {
// contains filtered or unexported fields
}
MultiRedis is an implementation of MultiStore that uses Redis. Each instance provides multitenant key-value access to keys of the form {tenant}/{keyPrefix}/{key}. This implementation uses Gary Burd's Go Redis client. See https://github.com/garyburd/redigo
func NewMultiRedis ¶
func NewMultiRedis(config *Config, conn redis.Conn, keyPrefix string) *MultiRedis
NewMultiRedis creates MultiRedis instances. All keys read and written will be of the form:
{tenant}/{keyPrefix}/{key}
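A minimal sketch, assuming a redigo connection to a hypothetical Redis host; the tenant, key prefix, and key are illustrative only:

conn, err := redis.Dial("tcp", "redis.local:6379")
if err != nil {
	config.Logger.Panic(err)
}
multiStore := kasper.NewMultiRedis(config, conn, "follower-count")
followerCounts := multiStore.Tenant("acme-corp")
// Writes the Redis key "acme-corp/follower-count/user-42"
err = followerCounts.Put("user-42", []byte("1234"))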
func (*MultiRedis) AllTenants ¶
func (s *MultiRedis) AllTenants() []string
AllTenants returns the list of tenants known to this instance.
func (*MultiRedis) Fetch ¶
func (s *MultiRedis) Fetch(keys []TenantKey) (*MultiMap, error)
Fetch performs a single MULTI GET Redis command across multiple tenants.
func (*MultiRedis) Push ¶
func (s *MultiRedis) Push(entries *MultiMap) error
Push performs a single MULTI SET Redis command across multiple tenants.
func (*MultiRedis) Tenant ¶
func (s *MultiRedis) Tenant(tenant string) Store
Tenant returns a Redis Store for the given tenant. Created instances are cached on future invocations.
type MultiStore ¶
type MultiStore interface {
	// Tenant returns the underlying store for a tenant.
	Tenant(tenant string) Store
	// AllTenants returns the list of tenants known to this instance.
	AllTenants() []string
	// Fetch is a multitenant version of GetAll.
	Fetch(keys []TenantKey) (*MultiMap, error)
	// Push is a multitenant version of PutAll.
	Push(store *MultiMap) error
}
MultiStore is a multitenant version of Store. Tenants are represented as strings. Each tenant has an underlying Store.
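Fetch and Push are intended for batch access inside MessageProcessor.Process: fetch all keys for the batch in one call, work on the returned MultiMap in memory, then push it back in one call. A hedged sketch, assuming a TenantKey carrying Tenant and Key fields and a hypothetical ReachProcessor holding a MultiStore; the tenant/key derivation is a placeholder:

func (p *ReachProcessor) Process(messages []*sarama.ConsumerMessage, sender kasper.Sender) error {
	keys := make([]kasper.TenantKey, len(messages))
	for i, message := range messages {
		// Placeholder: derive the tenant and key from the message in your own way.
		keys[i] = kasper.TenantKey{Tenant: "acme-corp", Key: string(message.Key)}
	}
	// One round trip to the store for the whole batch.
	multiMap, err := p.multiStore.Fetch(keys)
	if err != nil {
		return err
	}
	// ... read and update values in multiMap per tenant ...
	// One round trip to write the whole batch back.
	return p.multiStore.Push(multiMap)
}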
type NoopMetricsProvider ¶
type NoopMetricsProvider struct{}
NoopMetricsProvider is a dummy implementation of MetricsProvider that does nothing. Useful for testing, not recommended in production.
func (*NoopMetricsProvider) NewCounter ¶
func (m *NoopMetricsProvider) NewCounter(name string, help string, labelNames ...string) Counter
NewCounter creates a new no-op Counter
func (*NoopMetricsProvider) NewGauge ¶
func (m *NoopMetricsProvider) NewGauge(name string, help string, labelNames ...string) Gauge
NewGauge creates a new no-op Gauge
func (*NoopMetricsProvider) NewSummary ¶
func (m *NoopMetricsProvider) NewSummary(name string, help string, labelNames ...string) Summary
NewSummary creates a new no-op Summary
type Prometheus ¶
type Prometheus struct {
	Registry *prometheus.Registry
	// contains filtered or unexported fields
}
Prometheus is an implementation of MetricsProvider that uses Prometheus. See https://github.com/prometheus/client_golang
func NewPrometheus ¶
func NewPrometheus(label string) *Prometheus
NewPrometheus creates a new Prometheus instance.
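The exported Registry field can be served over HTTP with the standard promhttp handler from prometheus/client_golang; a sketch, where the listen address is an assumption:

metrics := kasper.NewPrometheus("twitter-reach-0")
// Expose the Kasper metrics registry on /metrics.
http.Handle("/metrics", promhttp.HandlerFor(metrics.Registry, promhttp.HandlerOpts{}))
go http.ListenAndServe(":9100", nil)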
func (*Prometheus) NewCounter ¶
func (provider *Prometheus) NewCounter(name string, help string, labelNames ...string) Counter
NewCounter creates a new prometheus CounterVec
func (*Prometheus) NewGauge ¶
func (provider *Prometheus) NewGauge(name string, help string, labelNames ...string) Gauge
NewGauge creates a new prometheus GaugeVec
func (*Prometheus) NewSummary ¶
func (provider *Prometheus) NewSummary(name string, help string, labelNames ...string) Summary
NewSummary creates a new prometheus SummaryVec
type Redis ¶
type Redis struct {
// contains filtered or unexported fields
}
Redis is an implementation of Store that uses Redis. Each instance provides key-value access to keys with a specific prefix. This implementation uses Gary Burd's Go Redis client. See https://github.com/garyburd/redigo
func NewRedis ¶
NewRedis creates Redis instances. All keys read and written in Redis are of the form:
{keyPrefix}/{key}
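A minimal sketch, assuming NewRedis mirrors NewMultiRedis and takes a Config, a redigo connection, and a key prefix; the host, prefix, and key are illustrative only:

conn, err := redis.Dial("tcp", "redis.local:6379")
if err != nil {
	config.Logger.Panic(err)
}
store := kasper.NewRedis(config, conn, "follower-count")
// Writes the Redis key "follower-count/user-42"
err = store.Put("user-42", []byte("1234"))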
func (*Redis) Delete ¶
Delete deletes a value by key. It is implemented using the Redis DEL command. See https://redis.io/commands/del
func (*Redis) Flush ¶
Flush executes the SAVE command. See https://redis.io/commands/save
func (*Redis) Get ¶
Get gets a value by key. Returns nil, nil if the key is missing. It is implemented using the Redis GET command. See https://redis.io/commands/get
func (*Redis) GetAll ¶
GetAll gets multiple values by key. It is implemented by using the MULTI and GET commands. See https://redis.io/commands/multi
func (*Redis) Put ¶
Put inserts or updates a value by key. It is implemented using the Redis SET command. See https://redis.io/commands/set
type Sender ¶
type Sender interface {
	// Send appends a message to a slice held by the sender instance.
	// These messages are sent in bulk when Process() returns or when Flush() is called.
	Send(msg *sarama.ProducerMessage)
	// Flush immediately sends all messages held in the sender slice in bulk, and empties the slice. See Send() above.
	Flush() error
}
Sender instances are given to MessageProcessor.Process to send messages to Kafka topics. Messages passed to Sender are not sent directly but are collected in an array instead. When Process returns, the messages are sent to Kafka and Kasper waits for the configured number of acks. When all messages have been successfully produced, Kasper updates the consumer offsets of the input partitions and resumes processing.
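Flush can be used to bound the number of buffered messages when processing large batches. A sketch of a fragment from inside a Process implementation; the output topic and the threshold of 1000 are arbitrary assumptions:

for i, message := range messages {
	sender.Send(&sarama.ProducerMessage{
		Topic: "tweets-processed",
		Value: sarama.ByteEncoder(message.Value),
	})
	// Optionally flush every 1000 messages instead of waiting for Process to return.
	if i%1000 == 999 {
		if err := sender.Flush(); err != nil {
			return err
		}
	}
}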
type Store ¶
type Store interface {
	// Get gets a value by key.
	Get(key string) ([]byte, error)
	// GetAll gets multiple values by key.
	GetAll(keys []string) (map[string][]byte, error)
	// Put inserts or updates a value by key.
	Put(key string, value []byte) error
	// PutAll inserts or updates multiple key-value pairs.
	PutAll(map[string][]byte) error
	// Delete deletes a key from the store.
	Delete(key string) error
	// Flush indicates that the underlying storage must be made persistent.
	Flush() error
}
Store is a universal interface for a key-value store. Keys are strings, and values are byte slices.
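Because Redis, Elasticsearch, and Map all implement Store, application logic can be written against the interface and backed by an in-memory Map in tests. A sketch with a hypothetical helper (requires strconv):

// incrementCounter reads, increments, and writes back an integer counter.
// It works with any Store implementation.
func incrementCounter(store kasper.Store, key string) error {
	value, err := store.Get(key)
	if err != nil {
		return err
	}
	count := 0
	if value != nil {
		count, err = strconv.Atoi(string(value))
		if err != nil {
			return err
		}
	}
	return store.Put(key, []byte(strconv.Itoa(count+1)))
}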
type TopicProcessor ¶
type TopicProcessor struct {
// contains filtered or unexported fields
}
TopicProcessor is the main entity in Kasper. It implements a single-threaded processing loop for a set of topics and partitions.
func NewTopicProcessor ¶
func NewTopicProcessor(config *Config, messageProcessors map[int]MessageProcessor) *TopicProcessor
NewTopicProcessor creates a new instance of TopicProcessor. For parallel processing, run multiple TopicProcessor instances in different goroutines or processes (the input partitions cannot overlap). You should set Config.TopicProcessorName to the same value on all instances in order to easily scale the processing up or down.
func (*TopicProcessor) Close ¶
func (tp *TopicProcessor) Close()
Close safely shuts down the TopicProcessor, which makes RunLoop() return.
func (*TopicProcessor) HasConsumedAllMessages ¶
func (tp *TopicProcessor) HasConsumedAllMessages() bool
HasConsumedAllMessages returns true when all input topics have been entirely consumed. Kasper checks all high water marks and offsets for all topics before returning.
func (*TopicProcessor) RunLoop ¶
func (tp *TopicProcessor) RunLoop() error
RunLoop is the main processing loop of Kasper. It does not spawn any goroutines and runs a single-threaded event loop instead. RunLoop will block the current goroutine and will run forever until an error occurs or until Close() is called. RunLoop propagates the error returned by MessageProcessor.Process if not nil.
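A common pattern is to call Close from a signal handler so that RunLoop returns cleanly on shutdown. A sketch, assuming the topicProcessor from the NewTopicProcessor example (requires os, os/signal, and syscall):

signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
go func() {
	<-signals
	// Close makes RunLoop return after the current batch is finished.
	topicProcessor.Close()
}()
err := topicProcessor.RunLoop()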