go_kafka_client

package module
v0.0.0-...-28fba2b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 25, 2015 License: Apache-2.0 Imports: 39 Imported by: 0

README

Go Kafka Client

The Apache Kafka Client Library for Go is sponsored by [CrowdStrike] (http://www.crowdstrike.com/) and developed by [Big Data Open Source Security LLC] (http://stealth.ly)

Build Status

Ideas and goals behind the Go Kafka Client:

1) Partition Ownership

We decided on implementing multiple strategies for this including static assignment. The concept of re-balancing is preserved but now there are a few different strategies to re-balancing and they can run at different times depending on what is going on (like a blue/green deploy is happening). For more on blue/green deployments check out this video.

2) Fetch Management

This is what “fills up the reservoir” as I like to call it so the processing (either sequential or in batch) will always have data if there is data for it to have without making a network hop. The fetcher has to stay ahead here keeping the processing tap full (or if empty that is controlled) pulling the data for the Kafka partition(s) it is owning.

3) Work Management

For the Go consumer we currently only support “fan out” using go routines and channels. If you have ever used go this will be familiar to you if not you should drop everything and learn Go.

4) Offset Management

Our offset management is based on a per batch basis with each highest offset from the batch committed on a per partition basis.

Prerequisites:

  1. Install Golang http://golang.org/doc/install
  2. Make sure env variables GOPATH and GOROOT exist and point to correct places
  3. Install GPM https://github.com/pote/gpm
  4. mkdir -p $GOPATH/src/github.com/stealthly && cd $GOPATH/src/github.com/stealthly
  5. git clone https://github.com/stealthly/go_kafka_client.git && cd go_kafka_client
  6. gpm install

Optional (for all tests to work):

  1. Install Docker https://docs.docker.com/installation/#installation
  2. cd $GOPATH/src/github.com/stealthly/go_kafka_client
  3. Build docker image: docker build -t stealthly/go_kafka_client .
  4. docker run -v $(pwd):/go_kafka_client stealthly/go_kafka_client

After this is done you're ready to write some code!

For email support https://groups.google.com/forum/#!forum/kafka-clients

Related docs:

  1. Offset Storage configuration.
  2. Log and metrics emitters.

Documentation

Overview

Package go_kafka_client provides a high-level Kafka consumer implementation and introduces different approach than Java/Scala high-level consumer.

Primary differences include workers concept enforcing at least once processing before committing offsets, improved rebalancing algorithm - closing obsolete connections and opening new connections without stopping the whole consumer, graceful shutdown support notifying client when it is over, batch processing, static partitions configuration support allowing to start a consumer with a predefined set of partitions never caring about rebalancing.

Index

Constants

View Source
const (
	// Offset with invalid value
	InvalidOffset int64 = -1

	// Reset the offset to the smallest offset if it is out of range
	SmallestOffset = "smallest"
	// Reset the offset to the largest offset if it is out of range
	LargestOffset = "largest"
)
View Source
const (
	/* Logtypeid field value for LogLine indicating Trace log level */
	TraceLogTypeId int64 = 1

	/* Logtypeid field value for LogLine indicating Debug log level */
	DebugLogTypeId int64 = 2

	/* Logtypeid field value for LogLine indicating Info log level */
	InfoLogTypeId int64 = 3

	/* Logtypeid field value for LogLine indicating Warn log level */
	WarnLogTypeId int64 = 4

	/* Logtypeid field value for LogLine indicating Error log level */
	ErrorLogTypeId int64 = 5

	/* Logtypeid field value for LogLine indicating Critical log level */
	CriticalLogTypeId int64 = 6

	/* Logtypeid field value for LogLine indicating Metrics data */
	MetricsLogTypeId int64 = 7
)
View Source
const (
	/* Range partitioning works on a per-topic basis. For each topic, we lay out the available partitions in numeric order
	and the consumer threads in lexicographic order. We then divide the number of partitions by the total number of
	consumer streams (threads) to determine the number of partitions to assign to each consumer. If it does not evenly
	divide, then the first few consumers will have one extra partition. For example, suppose there are two consumers C1
	and C2 with two streams each, and there are five available partitions (p0, p1, p2, p3, p4). So each consumer thread
	will get at least one partition and the first consumer thread will get one extra partition. So the assignment will be:
	p0 -> C1-0, p1 -> C1-0, p2 -> C1-1, p3 -> C2-0, p4 -> C2-1 */
	RangeStrategy = "range"

	/* The round-robin partition assignor lays out all the available partitions and all the available consumer threads. It
	then proceeds to do a round-robin assignment from partition to consumer thread. If the subscriptions of all consumer
	instances are identical, then the partitions will be uniformly distributed. (i.e., the partition ownership counts
	will be within a delta of exactly one across all consumer threads.)

	(For simplicity of implementation) the assignor is allowed to assign a given topic-partition to any consumer instance
	and thread-id within that instance. Therefore, round-robin assignment is allowed only if:
	a) Every topic has the same number of streams within a consumer instance
	b) The set of subscribed topics is identical for every consumer instance within the group. */
	RoundRobinStrategy = "roundrobin"
)
View Source
const (
	GET_SCHEMA_BY_ID             = "/schemas/ids/%d"
	GET_SUBJECTS                 = "/subjects"
	GET_SUBJECT_VERSIONS         = "/subjects/%s/versions"
	GET_SPECIFIC_SUBJECT_VERSION = "/subjects/%s/versions/%s"
	REGISTER_NEW_SCHEMA          = "/subjects/%s/versions"
	CHECK_IS_REGISTERED          = "/subjects/%s"
	TEST_COMPATIBILITY           = "/compatibility/subjects/%s/versions/%s"
	CONFIG                       = "/config"
)
View Source
const (
	SCHEMA_REGISTRY_V1_JSON               = "application/vnd.schemaregistry.v1+json"
	SCHEMA_REGISTRY_V1_JSON_WEIGHTED      = "application/vnd.schemaregistry.v1+json"
	SCHEMA_REGISTRY_MOST_SPECIFIC_DEFAULT = "application/vnd.schemaregistry.v1+json"
	SCHEMA_REGISTRY_DEFAULT_JSON          = "application/vnd.schemaregistry+json"
	SCHEMA_REGISTRY_DEFAULT_JSON_WEIGHTED = "application/vnd.schemaregistry+json qs=0.9"
	JSON                                  = "application/json"
	JSON_WEIGHTED                         = "application/json qs=0.5"
	GENERIC_REQUEST                       = "application/octet-stream"
)

Variables

Functions

func BootstrapBrokers

func BootstrapBrokers(coordinator ConsumerCoordinator) ([]string, error)

BootstrapBrokers queries the ConsumerCoordinator for all known brokers in the cluster to be used later as a bootstrap list for the LowLevelClient.

func CreateMultiplePartitionsTopic

func CreateMultiplePartitionsTopic(zk string, topicName string, numPartitions int)

Convenience utility to create a topic topicName with numPartitions partitions in Zookeeper located at zk (format should be host:port). Please note that this requires Apache Kafka 0.8.1 binary distribution available through KAFKA_PATH environment variable

func Critical

func Critical(tag interface{}, message interface{})

Writes a given message with a given tag to log with level Critical.

func Criticalf

func Criticalf(tag interface{}, message interface{}, params ...interface{})

Formats a given message according to given params with a given tag to log with level Critical.

func Debug

func Debug(tag interface{}, message interface{})

Writes a given message with a given tag to log with level Debug.

func Debugf

func Debugf(tag interface{}, message interface{}, params ...interface{})

Formats a given message according to given params with a given tag to log with level Debug.

func EnsureHasLeader

func EnsureHasLeader(zkConnect string, topic string)

blocks until the leader for every partition of a given topic appears this is used by tests only to avoid "In the middle of a leadership election, there is currently no leader for this partition and hence it is unavailable for writes"

func Error

func Error(tag interface{}, message interface{})

Writes a given message with a given tag to log with level Error.

func Errorf

func Errorf(tag interface{}, message interface{}, params ...interface{})

Formats a given message according to given params with a given tag to log with level Error.

func Info

func Info(tag interface{}, message interface{})

Writes a given message with a given tag to log with level Info.

func Infof

func Infof(tag interface{}, message interface{}, params ...interface{})

Formats a given message according to given params with a given tag to log with level Info.

func LoadConfiguration

func LoadConfiguration(Path string) (map[string]string, error)

Loads a property file located at Path. Returns a map[string]string or error.

func Trace

func Trace(tag interface{}, message interface{})

Writes a given message with a given tag to log with level Trace.

func Tracef

func Tracef(tag interface{}, message interface{}, params ...interface{})

Formats a given message according to given params with a given tag to log with level Trace.

func Warn

func Warn(tag interface{}, message interface{})

Writes a given message with a given tag to log with level Warn.

func Warnf

func Warnf(tag interface{}, message interface{}, params ...interface{})

Formats a given message according to given params with a given tag to log with level Warn.

Types

type BlackList

type BlackList struct {
	// contains filtered or unexported fields
}

BlackList is a topic filter that will match every topic that does not match a given regex

func NewBlackList

func NewBlackList(regex string) *BlackList

Creates a new BlackList topic filter for a given regex

func (*BlackList) Regex

func (bl *BlackList) Regex() string

func (*BlackList) TopicAllowed

func (bl *BlackList) TopicAllowed(topic string, excludeInternalTopics bool) bool

type BlueGreenDeployment

type BlueGreenDeployment struct {
	// Comma separated list of topics to consume from
	Topics string
	// Either black_list, white_list or static
	Pattern string
	//Consumer group to switch to
	Group string
}

DeployedTopics contain information needed to do a successful blue-green deployment. It contains a comma-separated list of new topics to consume if the pattern is static and regex for wildcard consuming.

type BrokerInfo

type BrokerInfo struct {
	Version int16
	Id      int32
	Host    string
	Port    uint32
}

General information about Kafka broker. Used to keep it in consumer coordinator.

func (*BrokerInfo) String

func (b *BrokerInfo) String() string

type ByteDecoder

type ByteDecoder struct{}

func (*ByteDecoder) Decode

func (this *ByteDecoder) Decode(bytes []byte) (interface{}, error)

type ByteEncoder

type ByteEncoder struct{}

func (*ByteEncoder) Encode

func (this *ByteEncoder) Encode(what interface{}) ([]byte, error)

type CachedSchemaRegistryClient

type CachedSchemaRegistryClient struct {
	// contains filtered or unexported fields
}

func NewCachedSchemaRegistryClient

func NewCachedSchemaRegistryClient(registryURL string) *CachedSchemaRegistryClient

func (*CachedSchemaRegistryClient) GetByID

func (this *CachedSchemaRegistryClient) GetByID(id int32) (avro.Schema, error)

func (*CachedSchemaRegistryClient) GetLatestSchemaMetadata

func (this *CachedSchemaRegistryClient) GetLatestSchemaMetadata(subject string) (*SchemaMetadata, error)

func (*CachedSchemaRegistryClient) GetVersion

func (this *CachedSchemaRegistryClient) GetVersion(subject string, schema avro.Schema) (int32, error)

func (*CachedSchemaRegistryClient) Register

func (this *CachedSchemaRegistryClient) Register(subject string, schema avro.Schema) (int32, error)

type CodahaleKafkaReporter

type CodahaleKafkaReporter struct {
	// contains filtered or unexported fields
}

func NewCodahaleKafkaReporter

func NewCodahaleKafkaReporter(topic string, schemaRegistryUrl string, producerConfig *ProducerConfig) *CodahaleKafkaReporter

func (*CodahaleKafkaReporter) Write

func (c *CodahaleKafkaReporter) Write(bytes []byte) (n int, err error)

type CompatibilityLevel

type CompatibilityLevel string
const (
	BackwardCompatibilityLevel CompatibilityLevel = "BACKWARD"
	ForwardCompatibilityLevel  CompatibilityLevel = "FORWARD"
	FullCompatibilityLevel     CompatibilityLevel = "FULL"
	NoneCompatibilityLevel     CompatibilityLevel = "NONE"
)

type ConstantPartitioner

type ConstantPartitioner struct {
	Constant int32
}

ConstantPartitioner implements the Partitioner interface by just returning a constant value.

func (*ConstantPartitioner) Partition

func (p *ConstantPartitioner) Partition(key Encoder, numPartitions int32) (int32, error)

func (*ConstantPartitioner) RequiresConsistency

func (p *ConstantPartitioner) RequiresConsistency() bool

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer is a high-level Kafka consumer designed to work within a consumer group. It subscribes to coordinator events and is able to balance load within a consumer group.

func NewConsumer

func NewConsumer(config *ConsumerConfig) *Consumer

NewConsumer creates a new Consumer with a given configuration. Creating a Consumer does not start fetching immediately.

func (*Consumer) Close

func (c *Consumer) Close() <-chan bool

Tells the Consumer to close all existing connections and stop. This method is NOT blocking but returns a channel which will get a single value once the closing is finished.

func (*Consumer) Metrics

func (c *Consumer) Metrics() *ConsumerMetrics

func (*Consumer) StartStatic

func (c *Consumer) StartStatic(topicCountMap map[string]int)

Starts consuming specified topics using a configured amount of goroutines for each topic.

func (*Consumer) StartStaticPartitions

func (c *Consumer) StartStaticPartitions(topicPartitionMap map[string][]int32)

Starts consuming given topic-partitions using ConsumerConfig.NumConsumerFetchers goroutines for each topic.

func (*Consumer) StartWildcard

func (c *Consumer) StartWildcard(topicFilter TopicFilter, numStreams int)

Starts consuming all topics which correspond to a given topicFilter using numStreams goroutines for each topic.

func (*Consumer) StateSnapshot

func (c *Consumer) StateSnapshot() *StateSnapshot

Returns a state snapshot for this consumer. State snapshot contains a set of metrics splitted by topics and partitions.

func (*Consumer) String

func (c *Consumer) String() string

type ConsumerConfig

type ConsumerConfig struct {
	/* A string that uniquely identifies a set of consumers within the same consumer group */
	Groupid string

	/* A string that uniquely identifies a consumer within a group. Generated automatically if not set.
	Set this explicitly for only testing purpose. */
	Consumerid string

	/* The socket timeout for network requests. Its value should be at least FetchWaitMaxMs. */
	SocketTimeout time.Duration

	/* The maximum number of bytes to attempt to fetch */
	FetchMessageMaxBytes int32

	/* The number of goroutines used to fetch data */
	NumConsumerFetchers int

	/* Max number of message batches buffered for consumption, each batch can be up to FetchBatchSize */
	QueuedMaxMessages int32

	/* Max number of retries during rebalance */
	RebalanceMaxRetries int32

	/* The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will block */
	FetchMinBytes int32

	/* The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy FetchMinBytes */
	FetchWaitMaxMs int32

	/* Backoff time between retries during rebalance */
	RebalanceBackoff time.Duration

	/* Backoff time to refresh the leader of a partition after it loses the current leader */
	RefreshLeaderBackoff time.Duration

	/* Retry the offset commit up to this many times on failure. */
	OffsetsCommitMaxRetries int

	/* Try to commit offset every OffsetCommitInterval. If previous offset commit for a partition is still in progress updates the next offset to commit and continues.
	This way it does not commit all the offset history if the coordinator is slow, but only the highest offsets. */
	OffsetCommitInterval time.Duration

	/* What to do if an offset is out of range.
	SmallestOffset : automatically reset the offset to the smallest offset.
	LargestOffset : automatically reset the offset to the largest offset.
	Defaults to LargestOffset. */
	AutoOffsetReset string

	/* Client id is specified by the kafka consumer client, used to distinguish different clients. */
	Clientid string

	/* Whether messages from internal topics (such as offsets) should be exposed to the consumer. */
	ExcludeInternalTopics bool

	/* Select a strategy for assigning partitions to consumer streams. Possible values: RangeStrategy, RoundRobinStrategy */
	PartitionAssignmentStrategy string

	/* Amount of workers per partition to process consumed messages. */
	NumWorkers int

	/* Times to retry processing a failed message by a worker. */
	MaxWorkerRetries int

	/* Worker retry threshold. Increments each time a worker fails to process a message within MaxWorkerRetries.
	When this threshold is hit within a WorkerThresholdTimeWindow, WorkerFailureCallback is called letting the user to decide whether the consumer should stop. */
	WorkerRetryThreshold int32

	/* Resets WorkerRetryThreshold if it isn't hit within this period. */
	WorkerThresholdTimeWindow time.Duration

	/* Callback executed when WorkerRetryThreshold exceeded within WorkerThresholdTimeWindow */
	WorkerFailureCallback FailedCallback

	/* Callback executed when Worker failed to process the message after MaxWorkerRetries and WorkerRetryThreshold is not hit */
	WorkerFailedAttemptCallback FailedAttemptCallback

	/* Worker timeout to process a single message. */
	WorkerTaskTimeout time.Duration

	/* Backoff between worker attempts to process a single message. */
	WorkerBackoff time.Duration

	/* Maximum wait time to gracefully stop a worker manager */
	WorkerManagersStopTimeout time.Duration

	/* A function which defines a user-specified action on a single message. This function is responsible for actual message processing.
	Consumer panics if Strategy is not set. */
	Strategy WorkerStrategy

	/* Number of messages to accumulate before flushing them to workers */
	FetchBatchSize int

	/* Timeout to accumulate messages. Flushes accumulated batch to workers even if it is not yet full.
	Resets after each flush meaning this won't be triggered if FetchBatchSize is reached before timeout. */
	FetchBatchTimeout time.Duration

	/* Backoff between fetch requests if no messages were fetched from a previous fetch. */
	RequeueAskNextBackoff time.Duration

	/* Buffer size for ask next channel. This value shouldn't be less than number of partitions per fetch routine. */
	AskNextChannelSize int

	/* Maximum fetch retries if no messages were fetched from a previous fetch */
	FetchMaxRetries int

	/* Maximum retries to fetch topic metadata from one broker. */
	FetchTopicMetadataRetries int

	/* Backoff for fetch topic metadata request if the previous request failed. */
	FetchTopicMetadataBackoff time.Duration

	/* Backoff between two fetch requests for one fetch routine. Needed to prevent fetcher from querying the broker too frequently. */
	FetchRequestBackoff time.Duration

	/* Coordinator used to coordinate consumer's actions, e.g. trigger rebalance events, store offsets and consumer metadata etc. */
	Coordinator ConsumerCoordinator

	/* OffsetStorage is used to store and retrieve consumer offsets. */
	OffsetStorage OffsetStorage

	/* Indicates whether the client supports blue-green deployment.
	This config entry is needed because blue-green deployment won't work with RoundRobin partition assignment strategy.
	Defaults to true. */
	BlueGreenDeploymentEnabled bool

	/* Time to wait after consumer has registered itself in group */
	DeploymentTimeout time.Duration

	/* Service coordinator barrier timeout */
	BarrierTimeout time.Duration

	/* Low Level Kafka Client implementation. */
	LowLevelClient LowLevelClient

	/* Message keys decoder */
	KeyDecoder Decoder

	/* Message values decoder */
	ValueDecoder Decoder

	/* Flag for debug mode */
	Debug bool

	/* Metrics Prefix if the client wants to organize the way metric names are emitted. (optional) */
	MetricsPrefix string
}

ConsumerConfig defines configuration options for Consumer

func ConsumerConfigFromFile

func ConsumerConfigFromFile(filename string) (*ConsumerConfig, error)

ConsumerConfigFromFile is a helper function that loads a consumer's configuration from file. The file accepts the following fields:

group.id
consumer.id
fetch.message.max.bytes
num.consumer.fetchers
rebalance.max.retries
queued.max.message.chunks
fetch.min.bytes
fetch.wait.max.ms
rebalance.backoff
refresh.leader.backoff
offset.commit.max.retries
offset.commit.interval
offsets.storage
auto.offset.reset
exclude.internal.topics
partition.assignment.strategy
num.workers
max.worker.retries
worker.retry.threshold
worker.threshold.time.window
worker.task.timeout
worker.backoff
worker.managers.stop.timeout
fetch.batch.size
fetch.batch.timeout
requeue.ask.next.backoff
fetch.max.retries
fetch.topic.metadata.retries
fetch.topic.metadata.backoff
fetch.request.backoff
blue.green.deployment.enabled

The configuration file entries should be constructed in key=value syntax. A # symbol at the beginning of a line indicates a comment. Blank lines are ignored. The file should end with a newline character.

func DefaultConsumerConfig

func DefaultConsumerConfig() *ConsumerConfig

DefaultConsumerConfig creates a ConsumerConfig with sane defaults. Note that several required config entries (like Strategy and callbacks) are still not set.

func (*ConsumerConfig) String

func (c *ConsumerConfig) String() string

func (*ConsumerConfig) Validate

func (c *ConsumerConfig) Validate() error

Validate this ConsumerConfig. Returns a corresponding error if the ConsumerConfig is invalid and nil otherwise.

type ConsumerCoordinator

type ConsumerCoordinator interface {
	/* Establish connection to this ConsumerCoordinator. Returns an error if fails to connect, nil otherwise. */
	Connect() error

	/* Close connection to this ConsumerCoordinator. */
	Disconnect()

	/* Registers a new consumer with Consumerid id and TopicCount subscription that is a part of consumer group Group in this ConsumerCoordinator. Returns an error if registration failed, nil otherwise. */
	RegisterConsumer(Consumerid string, Group string, TopicCount TopicsToNumStreams) error

	/* Deregisters consumer with Consumerid id that is a part of consumer group Group form this ConsumerCoordinator. Returns an error if deregistration failed, nil otherwise. */
	DeregisterConsumer(Consumerid string, Group string) error

	/* Gets the information about consumer with Consumerid id that is a part of consumer group Group from this ConsumerCoordinator.
	Returns ConsumerInfo on success and error otherwise (For example if consumer with given Consumerid does not exist). */
	GetConsumerInfo(Consumerid string, Group string) (*ConsumerInfo, error)

	/* Gets the information about consumers per topic in consumer group Group excluding internal topics (such as offsets) if ExcludeInternalTopics = true.
	Returns a map where keys are topic names and values are slices of consumer ids and fetcher ids associated with this topic and error on failure. */
	GetConsumersPerTopic(Group string, ExcludeInternalTopics bool) (map[string][]ConsumerThreadId, error)

	/* Gets the list of all consumer ids within a consumer group Group. Returns a slice containing all consumer ids in group and error on failure. */
	GetConsumersInGroup(Group string) ([]string, error)

	/* Gets the list of all topics registered in this ConsumerCoordinator. Returns a slice conaining topic names and error on failure. */
	GetAllTopics() ([]string, error)

	/* Gets the information about existing partitions for a given Topics.
	Returns a map where keys are topic names and values are slices of partition ids associated with this topic and error on failure. */
	GetPartitionsForTopics(Topics []string) (map[string][]int32, error)

	/* Gets the information about all Kafka brokers registered in this ConsumerCoordinator.
	Returns a slice of BrokerInfo and error on failure. */
	GetAllBrokers() ([]*BrokerInfo, error)

	/* Subscribes for any change that should trigger consumer rebalance on consumer group Group in this ConsumerCoordinator or trigger topic switch.
	Returns a read-only channel of CoordinatorEvent that will get values on any significant coordinator event (e.g. new consumer appeared, new broker appeared etc.) and error if failed to subscribe. */
	SubscribeForChanges(Group string) (<-chan CoordinatorEvent, error)

	/* Requests that a blue/green deployment be done.*/
	RequestBlueGreenDeployment(blue BlueGreenDeployment, green BlueGreenDeployment) error

	/* Gets all deployed topics for consume group Group from consumer coordinator.
	Returns a map where keys are notification ids and values are DeployedTopics. May also return an error (e.g. if failed to reach coordinator). */
	GetBlueGreenRequest(Group string) (map[string]*BlueGreenDeployment, error)

	/* Implements classic barrier synchronization primitive via service coordinator facilities */
	AwaitOnStateBarrier(consumerId string, group string, stateHash string, barrierSize int, api string, timeout time.Duration) bool

	/* Removes state barrier */
	RemoveStateBarrier(group string, stateHash string, api string) error

	/* Tells the ConsumerCoordinator to unsubscribe from events for the consumer it is associated with. */
	Unsubscribe()

	/* Tells the ConsumerCoordinator to claim partition topic Topic and partition Partition for ConsumerThreadId fetcher that works within a consumer group Group.
	Returns true if claim is successful, false and error explaining failure otherwise. */
	ClaimPartitionOwnership(Group string, Topic string, Partition int32, ConsumerThreadId ConsumerThreadId) (bool, error)

	/* Tells the ConsumerCoordinator to release partition ownership on topic Topic and partition Partition for consumer group Group.
	Returns error if failed to released partition ownership. */
	ReleasePartitionOwnership(Group string, Topic string, Partition int32) error

	/* Removes old api objects */
	RemoveOldApiRequests(group string) error
}

ConsumerCoordinator is used to coordinate actions of multiple consumers within the same consumer group. It is responsible for keeping track of alive consumers, manages their offsets and assigns partitions to consume. The current default ConsumerCoordinator is ZookeeperCoordinator. More of them can be added in future.

type ConsumerGroupApi

type ConsumerGroupApi string
const (
	BlueGreenDeploymentAPI ConsumerGroupApi = "blue_green"
	Rebalance              ConsumerGroupApi = "rebalance"
)

type ConsumerInfo

type ConsumerInfo struct {
	Version      int16          `json:"version"`
	Subscription map[string]int `json:"subscription"`
	Pattern      string         `json:"pattern"`
	Timestamp    int64          `json:"timestamp,string"`
}

General information about Kafka consumer. Used to keep it in consumer coordinator.

func (*ConsumerInfo) String

func (c *ConsumerInfo) String() string

type ConsumerMetrics

type ConsumerMetrics struct {
	// contains filtered or unexported fields
}

func (*ConsumerMetrics) Stats

func (this *ConsumerMetrics) Stats() map[string]map[string]float64

func (*ConsumerMetrics) WriteJSON

func (this *ConsumerMetrics) WriteJSON(reportingInterval time.Duration, writer io.Writer)

type ConsumerThreadId

type ConsumerThreadId struct {
	Consumer string
	ThreadId int
}

Consumer routine id. Used to keep track of what consumer routine consumes a particular topic-partition in consumer coordinator.

func (*ConsumerThreadId) String

func (c *ConsumerThreadId) String() string

type CoordinatorEvent

type CoordinatorEvent string

CoordinatorEvent is sent by consumer coordinator representing some state change.

const (
	// A regular coordinator event that should normally trigger consumer rebalance.
	Regular CoordinatorEvent = "Regular"

	// Coordinator event that should trigger consumer re-registrer
	Reinitialize CoordinatorEvent = "Reinitialize"

	// A coordinator event that informs a consumer group of new deployed topics.
	BlueGreenRequest CoordinatorEvent = "BlueGreenRequest"
)

type Decoder

type Decoder interface {
	Decode([]byte) (interface{}, error)
}

type DefaultLogger

type DefaultLogger struct {
	// contains filtered or unexported fields
}

Default implementation of KafkaLogger interface used in this client.

func NewDefaultLogger

func NewDefaultLogger(Level LogLevel) *DefaultLogger

Creates a new DefaultLogger that is configured to write messages to console with minimum log level Level.

func (*DefaultLogger) Critical

func (dl *DefaultLogger) Critical(message string, params ...interface{})

Formats a given message according to given params to log with level Critical.

func (*DefaultLogger) Debug

func (dl *DefaultLogger) Debug(message string, params ...interface{})

Formats a given message according to given params to log with level Debug.

func (*DefaultLogger) Error

func (dl *DefaultLogger) Error(message string, params ...interface{})

Formats a given message according to given params to log with level Error.

func (*DefaultLogger) GetLogLevel

func (dl *DefaultLogger) GetLogLevel() LogLevel

func (*DefaultLogger) Info

func (dl *DefaultLogger) Info(message string, params ...interface{})

Formats a given message according to given params to log with level Info.

func (*DefaultLogger) IsAllowed

func (dl *DefaultLogger) IsAllowed(loglevel LogLevel) bool

Formats a given message according to given params to log with level Critical.

func (*DefaultLogger) Trace

func (dl *DefaultLogger) Trace(message string, params ...interface{})

Formats a given message according to given params to log with level Trace.

func (*DefaultLogger) Warn

func (dl *DefaultLogger) Warn(message string, params ...interface{})

Formats a given message according to given params to log with level Warn.

type EmptyEmitter

type EmptyEmitter string

EmptyEmitter implements emitter and ignores all incoming messages. Used not to break anyone.

func NewEmptyEmitter

func NewEmptyEmitter() *EmptyEmitter

NewEmptyEmitter creates a new EmptyEmitter.

func (*EmptyEmitter) Close

func (e *EmptyEmitter) Close()

Does nothing.

func (*EmptyEmitter) Emit

func (e *EmptyEmitter) Emit(*avro.LogLine)

Does nothing. Ignores given message.

type Encoder

type Encoder interface {
	Encode(interface{}) ([]byte, error)
}

type ErrorMessage

type ErrorMessage struct {
	Error_code int32
	Message    string
}

func (*ErrorMessage) Error

func (this *ErrorMessage) Error() string

type FailedAttemptCallback

type FailedAttemptCallback func(*Task, WorkerResult) FailedDecision

A callback that is triggered when a worker fails to process a single message.

type FailedCallback

type FailedCallback func(*WorkerManager) FailedDecision

A callback that is triggered when a worker fails to process ConsumerConfig.WorkerRetryThreshold messages within ConsumerConfig.WorkerThresholdTimeWindow

type FailedDecision

type FailedDecision int32

Defines what to do when worker fails to process a message.

const (
	// Tells the worker manager to ignore the failure and continue normally.
	CommitOffsetAndContinue FailedDecision = iota

	// Tells the worker manager to continue processing new messages but not to commit offset that failed.
	DoNotCommitOffsetAndContinue

	// Tells the worker manager to commit offset and stop processing the current batch.
	CommitOffsetAndStop

	// Tells the worker manager not to commit offset and stop processing the current batch.
	DoNotCommitOffsetAndStop
)

type FailedMessage

type FailedMessage struct {
	// contains filtered or unexported fields
}

type FailureCounter

type FailureCounter struct {
	// contains filtered or unexported fields
}

A counter used to track whether we reached the configurable threshold of failed messages within a given time window.

func NewFailureCounter

func NewFailureCounter(FailedThreshold int32, WorkerThresholdTimeWindow time.Duration) *FailureCounter

Creates a new FailureCounter with threshold FailedThreshold and time window WorkerThresholdTimeWindow.

func (*FailureCounter) Close

func (f *FailureCounter) Close()

Stops this failure counter

func (*FailureCounter) Failed

func (f *FailureCounter) Failed() bool

Tells this FailureCounter to increment a number of failures by one. Returns true if threshold is reached, false otherwise.

type FixedPartitioner

type FixedPartitioner struct{}

Partitioner sends messages to partitions that correspond message keys

func (*FixedPartitioner) Partition

func (this *FixedPartitioner) Partition(key []byte, numPartitions int32) (int32, error)

func (*FixedPartitioner) RequiresConsistency

func (this *FixedPartitioner) RequiresConsistency() bool

type GetSchemaResponse

type GetSchemaResponse struct {
	Schema string
}

type GetSubjectVersionResponse

type GetSubjectVersionResponse struct {
	Subject string
	Version int32
	Id      int32
	Schema  string
}

type GroupWatch

type GroupWatch struct {
	// contains filtered or unexported fields
}

type HashPartitioner

type HashPartitioner struct {
	// contains filtered or unexported fields
}

HashPartitioner implements the Partitioner interface. If the key is nil, or fails to encode, then a random partition is chosen. Otherwise the FNV-1a hash of the encoded bytes is used modulus the number of partitions. This ensures that messages with the same key always end up on the same partition.

func (*HashPartitioner) Partition

func (this *HashPartitioner) Partition(key []byte, numPartitions int32) (int32, error)

func (*HashPartitioner) RequiresConsistency

func (this *HashPartitioner) RequiresConsistency() bool

type Int32Decoder

type Int32Decoder struct{}

func (*Int32Decoder) Decode

func (this *Int32Decoder) Decode(bytes []byte) (interface{}, error)

type Int32Encoder

type Int32Encoder struct{}

func (*Int32Encoder) Encode

func (this *Int32Encoder) Encode(what interface{}) ([]byte, error)

type KafkaAvroDecoder

type KafkaAvroDecoder struct {
	// contains filtered or unexported fields
}

func NewKafkaAvroDecoder

func NewKafkaAvroDecoder(url string) *KafkaAvroDecoder

func (*KafkaAvroDecoder) Decode

func (this *KafkaAvroDecoder) Decode(bytes []byte) (interface{}, error)

func (*KafkaAvroDecoder) DecodeSpecific

func (this *KafkaAvroDecoder) DecodeSpecific(bytes []byte, value interface{}) error

type KafkaAvroEncoder

type KafkaAvroEncoder struct {
	// contains filtered or unexported fields
}

func NewKafkaAvroEncoder

func NewKafkaAvroEncoder(url string) *KafkaAvroEncoder

func (*KafkaAvroEncoder) Encode

func (this *KafkaAvroEncoder) Encode(obj interface{}) ([]byte, error)

type KafkaLogEmitter

type KafkaLogEmitter struct {
	// contains filtered or unexported fields
}

KafkaLogEmitter implements LogEmitter and KafkaLogger and sends all structured log data to a Kafka topic encoded as Avro.

func NewKafkaLogEmitter

func NewKafkaLogEmitter(config *KafkaLogEmitterConfig) *KafkaLogEmitter

NewKafkaLogEmitter creates a new KafkaLogEmitter with a provided configuration.

func (*KafkaLogEmitter) Close

func (k *KafkaLogEmitter) Close()

Close closes the underlying producer. The KafkaLogEmitter won't be usable anymore after call to this.

func (*KafkaLogEmitter) Critical

func (k *KafkaLogEmitter) Critical(message string, params ...interface{})

Critical formats a given message according to given params to log with level Critical and produces to a given Kafka topic.

func (*KafkaLogEmitter) Debug

func (k *KafkaLogEmitter) Debug(message string, params ...interface{})

Debug formats a given message according to given params to log with level Debug and produces to a given Kafka topic.

func (*KafkaLogEmitter) Emit

func (k *KafkaLogEmitter) Emit(logLine *avro.LogLine)

Emit emits a single entry to a given destination topic.

func (*KafkaLogEmitter) Error

func (k *KafkaLogEmitter) Error(message string, params ...interface{})

Error formats a given message according to given params to log with level Error and produces to a given Kafka topic.

func (*KafkaLogEmitter) Info

func (k *KafkaLogEmitter) Info(message string, params ...interface{})

Info formats a given message according to given params to log with level Info and produces to a given Kafka topic.

func (*KafkaLogEmitter) Trace

func (k *KafkaLogEmitter) Trace(message string, params ...interface{})

Trace formats a given message according to given params to log with level Trace and produces to a given Kafka topic.

func (*KafkaLogEmitter) Warn

func (k *KafkaLogEmitter) Warn(message string, params ...interface{})

Warn formats a given message according to given params to log with level Warn and produces to a given Kafka topic.

type KafkaLogEmitterConfig

type KafkaLogEmitterConfig struct {
	// LogLevel to use for KafkaLogEmitter
	LogLevel LogLevel

	// LogLine event source. Used only when called as a Logger interface.
	Source string

	// LogLine tags. Used only when called as a logger interface.
	Tags map[string]string

	// Topic to emit logs to.
	Topic string

	// Confluent Avro schema registry URL.
	SchemaRegistryUrl string

	// Producer config that will be used by this emitter. Note that ValueEncoder WILL BE replaced by KafkaAvroEncoder.
	ProducerConfig *ProducerConfig
}

KafkaLogEmitterConfig provides multiple configuration entries for KafkaLogEmitter.

func NewKafkaLogEmitterConfig

func NewKafkaLogEmitterConfig() *KafkaLogEmitterConfig

NewKafkaLogEmitterConfig creates a new KafkaLogEmitterConfig with log level set to Info.

type KafkaLogger

type KafkaLogger interface {
	//Formats a given message according to given params to log with level Trace.
	Trace(message string, params ...interface{})

	//Formats a given message according to given params to log with level Debug.
	Debug(message string, params ...interface{})

	//Formats a given message according to given params to log with level Info.
	Info(message string, params ...interface{})

	//Formats a given message according to given params to log with level Warn.
	Warn(message string, params ...interface{})

	//Formats a given message according to given params to log with level Error.
	Error(message string, params ...interface{})

	//Formats a given message according to given params to log with level Critical.
	Critical(message string, params ...interface{})

	GetLogLevel() LogLevel

	IsAllowed(logLevel LogLevel) bool
}

Logger interface. Lets you plug-in your custom logging library instead of using built-in one.

Logger used by this client. Defaults to build-in logger with Info log level.

type LogEmitter

type LogEmitter interface {
	// Emit sends a single LogLine to some destination.
	Emit(*avro.LogLine)

	// Close closes the given emitter.
	Close()
}

LogEmitter is an interface that handles structured log data.

var EmitterLogs LogEmitter = NewEmptyEmitter()

LogEmitter used by this client. Defaults to empty emitter that ignores all incoming data.

type LogLevel

type LogLevel string

Represents a logging level

const (
	//Use TraceLevel for debugging to find problems in functions, variables etc.
	TraceLevel LogLevel = "trace"

	//Use DebugLevel for detailed system reports and diagnostic messages.
	DebugLevel LogLevel = "debug"

	//Use InfoLevel for general information about a running application.
	InfoLevel LogLevel = "info"

	//Use WarnLevel to indicate small errors and failures that should not happen normally but are recovered automatically.
	WarnLevel LogLevel = "warn"

	//Use ErrorLevel to indicate severe errors that affect application workflow and are not handled automatically.
	ErrorLevel LogLevel = "error"

	//Use CriticalLevel to indicate fatal errors that may cause data corruption or loss.
	CriticalLevel LogLevel = "critical"
)

type LowLevelClient

type LowLevelClient interface {
	// This will be called right after connecting to ConsumerCoordinator so this client can initialize itself
	// with bootstrap broker list for example. May return an error to signal this client is unable to work with given configuration.
	Initialize() error

	// This will be called each time the fetch request to Kafka should be issued. Topic, partition and offset are self-explanatory.
	// Should return a slice of Messages and an error if a fetch error occurred.
	// Note that for performance reasons it makes sense to keep open broker connections and reuse them on every fetch call.
	Fetch(topic string, partition int32, offset int64) ([]*Message, error)

	// This will be called when call to Fetch returns an error. As every client has different error mapping we ask here explicitly
	// whether the returned error is an OffsetOutOfRange error. Should return true if it is, false otherwise.
	IsOffsetOutOfRange(error) bool

	// This will be called to handle OffsetOutOfRange error. OffsetTime will be either "smallest" or "largest".
	// Should return a corresponding offset value and an error if it occurred.
	GetAvailableOffset(topic string, partition int32, offsetTime string) (int64, error)

	// This will be called to gracefully shutdown this client.
	Close()
}

LowLevelClient is a low-level Kafka client that manages broker connections, responsible to fetch metadata and is able to handle Fetch and Offset requests. TODO not sure that's a good name for this interface

type MarathonEventProducer

type MarathonEventProducer struct {
	// contains filtered or unexported fields
}

func NewMarathonEventProducer

func NewMarathonEventProducer(config *MarathonEventProducerConfig) *MarathonEventProducer

func (*MarathonEventProducer) Start

func (this *MarathonEventProducer) Start()

func (*MarathonEventProducer) Stop

func (this *MarathonEventProducer) Stop()

func (*MarathonEventProducer) String

func (this *MarathonEventProducer) String() string

type MarathonEventProducerConfig

type MarathonEventProducerConfig struct {
	// Marathon event producer config.
	ProducerConfig *ProducerConfig

	// Destination topic for all incoming messages.
	Topic string

	// Kafka Broker List host:port,host:port
	BrokerList string

	//HTTP endpoint binding port
	Port int

	// HTTP endpoint url pattern to listen, e.g. "/marathon".
	Pattern string

	// URL to Confluent Schema Registry. This triggers all messages to be sent in Avro format.
	SchemaRegistryUrl string

	// Avro schema to use when producing messages in Avro format.
	AvroSchema avro.Schema

	// Function that generates producer instances
	ProducerConstructor ProducerConstructor
}

func NewMarathonEventProducerConfig

func NewMarathonEventProducerConfig() *MarathonEventProducerConfig

Creates an empty MarathonEventProducerConfig.

type Message

type Message struct {
	// Partition key.
	Key []byte
	// Message value.
	Value []byte
	// Decoded message key
	DecodedKey interface{}
	// Decoded message value
	DecodedValue interface{}
	// Topic this message came from.
	Topic string

	// Partition this message came from.
	Partition int32

	// Message offset.
	Offset int64

	// HighwaterMarkOffset is an offset of the last message in this topic-partition.
	HighwaterMarkOffset int64
}

Single Kafka message that is sent to user-defined Strategy

func (*Message) String

func (m *Message) String() string

type MirrorMaker

type MirrorMaker struct {
	// contains filtered or unexported fields
}

MirrorMaker is a tool to mirror source Kafka cluster into a target (mirror) Kafka cluster. It uses a Kafka consumer to consume messages from the source cluster, and re-publishes those messages to the target cluster.

func NewMirrorMaker

func NewMirrorMaker(config *MirrorMakerConfig) *MirrorMaker

Creates a new MirrorMaker using given MirrorMakerConfig.

func (*MirrorMaker) AddTiming

func (this *MirrorMaker) AddTiming(record *avro.GenericRecord, tag string, now int64) *avro.GenericRecord

func (*MirrorMaker) Errors

func (this *MirrorMaker) Errors() <-chan *FailedMessage

func (*MirrorMaker) Start

func (this *MirrorMaker) Start()

Starts the MirrorMaker. This method is blocking and should probably be run in a separate goroutine.

func (*MirrorMaker) Stop

func (this *MirrorMaker) Stop()

Gracefully stops the MirrorMaker.

type MirrorMakerConfig

type MirrorMakerConfig struct {
	// Whitelist of topics to mirror. Exactly one whitelist or blacklist is allowed.
	Whitelist string

	// Blacklist of topics to mirror. Exactly one whitelist or blacklist is allowed.
	Blacklist string

	// Consumer configurations to consume from a source cluster.
	ConsumerConfigs []string

	// Embedded producer config.
	ProducerConfig string

	// Number of producer instances.
	NumProducers int

	// Number of consumption streams.
	NumStreams int

	// Flag to preserve partition number. E.g. if message was read from partition 5 it'll be written to partition 5. Note that this can affect performance.
	PreservePartitions bool

	// Flag to preserve message order. E.g. message sequence 1, 2, 3, 4, 5 will remain 1, 2, 3, 4, 5 in destination topic. Note that this can affect performance.
	PreserveOrder bool

	// Destination topic prefix. E.g. if message was read from topic "test" and prefix is "dc1_" it'll be written to topic "dc1_test".
	TopicPrefix string

	// Number of messages that are buffered between the consumer and producer.
	ChannelSize int

	// Message keys encoder for producer
	KeyEncoder Encoder

	// Message values encoder for producer
	ValueEncoder Encoder

	// Message keys decoder for consumer
	KeyDecoder Decoder

	// Message values decoder for consumer
	ValueDecoder Decoder

	// Function that generates producer instances
	ProducerConstructor ProducerConstructor

	// Path to producer configuration, that is responsible for logging timings
	// Defines whether add timings to message or not.
	// Note: used only for avro encoded messages
	TimingsProducerConfig string
}

MirrorMakerConfig defines configuration options for MirrorMaker

func NewMirrorMakerConfig

func NewMirrorMakerConfig() *MirrorMakerConfig

Creates an empty MirrorMakerConfig.

type OffsetStorage

type OffsetStorage interface {
	// Gets the offset for a given group, topic and partition.
	// May return an error if fails to retrieve the offset.
	GetOffset(group string, topic string, partition int32) (int64, error)

	// Commits the given offset for a given group, topic and partition.
	// May return an error if fails to commit the offset.
	CommitOffset(group string, topic string, partition int32, offset int64) error
}

OffsetStorage is used to store and retrieve consumer offsets.

type Partitioner

type Partitioner interface {
	Partition(key []byte, numPartitions int32) (int32, error)
	RequiresConsistency() bool
}

func NewFixedPartitioner

func NewFixedPartitioner() Partitioner

func NewHashPartitioner

func NewHashPartitioner() Partitioner

func NewRandomPartitioner

func NewRandomPartitioner() Partitioner

func NewRoundRobinPartitioner

func NewRoundRobinPartitioner() Partitioner

type PartitionerConstructor

type PartitionerConstructor func() Partitioner

type ProcessingFailedResult

type ProcessingFailedResult struct {
	// contains filtered or unexported fields
}

An implementation of WorkerResult interface representing a failure to process incoming message.

func NewProcessingFailedResult

func NewProcessingFailedResult(id TaskId) *ProcessingFailedResult

Creates a new ProcessingFailedResult for given TaskId.

func (*ProcessingFailedResult) Id

func (wr *ProcessingFailedResult) Id() TaskId

Returns an id of task that was processed.

func (*ProcessingFailedResult) String

func (sr *ProcessingFailedResult) String() string

func (*ProcessingFailedResult) Success

func (wr *ProcessingFailedResult) Success() bool

Always returns false for ProcessingFailedResult.

type Producer

type Producer interface {
	Errors() <-chan *FailedMessage
	Successes() <-chan *ProducerMessage
	Input() chan<- *ProducerMessage
	Close() error
	AsyncClose()
}

func NewSaramaProducer

func NewSaramaProducer(conf *ProducerConfig) Producer

type ProducerConfig

type ProducerConfig struct {
	Clientid              string
	BrokerList            []string
	SendBufferSize        int
	CompressionCodec      string
	FlushByteCount        int
	FlushTimeout          time.Duration
	BatchSize             int
	MaxMessageBytes       int
	MaxMessagesPerRequest int
	Acks                  int
	RetryBackoff          time.Duration
	Timeout               time.Duration
	Partitioner           PartitionerConstructor
	KeyEncoder            Encoder
	ValueEncoder          Encoder
	AckSuccesses          bool
}

func DefaultProducerConfig

func DefaultProducerConfig() *ProducerConfig

func ProducerConfigFromFile

func ProducerConfigFromFile(filename string) (*ProducerConfig, error)

ProducerConfigFromFile is a helper function that loads a producer's configuration information from file. The file accepts the following fields:

client.id
metadata.broker.list
send.buffer.size
compression.codec
flush.byte.count
flush.timeout
batch.size
max.message.bytes
max.messages.per.request
acks
retry.backoff
timeout

The configuration file entries should be constructed in key=value syntax. A # symbol at the beginning of a line indicates a comment. Blank lines are ignored. The file should end with a newline character.

func (*ProducerConfig) Validate

func (this *ProducerConfig) Validate() error

type ProducerConstructor

type ProducerConstructor func(config *ProducerConfig) Producer

type ProducerMessage

type ProducerMessage struct {
	Topic        string
	Key          interface{}
	Value        interface{}
	KeyEncoder   Encoder
	ValueEncoder Encoder
	// contains filtered or unexported fields
}

type RandomPartitioner

type RandomPartitioner struct {
	// contains filtered or unexported fields
}

RandomPartitioner implements the Partitioner interface by choosing a random partition each time.

func (*RandomPartitioner) Partition

func (this *RandomPartitioner) Partition(key []byte, numPartitions int32) (int32, error)

func (*RandomPartitioner) RequiresConsistency

func (this *RandomPartitioner) RequiresConsistency() bool

type RegisterSchemaResponse

type RegisterSchemaResponse struct {
	Id int32
}

type RoundRobinPartitioner

type RoundRobinPartitioner struct {
	// contains filtered or unexported fields
}

RoundRobinPartitioner implements the Partitioner interface by walking through the available partitions one at a time.

func (*RoundRobinPartitioner) Partition

func (this *RoundRobinPartitioner) Partition(key []byte, numPartitions int32) (int32, error)

func (*RoundRobinPartitioner) RequiresConsistency

func (this *RoundRobinPartitioner) RequiresConsistency() bool

type SaramaClient

type SaramaClient struct {
	// contains filtered or unexported fields
}

SaramaClient implements LowLevelClient and uses github.com/Shopify/sarama as underlying implementation.

func NewSaramaClient

func NewSaramaClient(config *ConsumerConfig) *SaramaClient

Creates a new SaramaClient using a given ConsumerConfig.

func (*SaramaClient) Close

func (this *SaramaClient) Close()

Gracefully shuts down this client.

func (*SaramaClient) Fetch

func (this *SaramaClient) Fetch(topic string, partition int32, offset int64) ([]*Message, error)

This will be called each time the fetch request to Kafka should be issued. Topic, partition and offset are self-explanatory. Returns slice of Messages and an error if a fetch error occurred.

func (*SaramaClient) GetAvailableOffset

func (this *SaramaClient) GetAvailableOffset(topic string, partition int32, offsetTime string) (int64, error)

This will be called to handle OffsetOutOfRange error. OffsetTime will be either "smallest" or "largest".

func (*SaramaClient) Initialize

func (this *SaramaClient) Initialize() error

This will be called right after connecting to ConsumerCoordinator so this client can initialize itself with bootstrap broker list for example. May return an error to signal this client is unable to work with given configuration.

func (*SaramaClient) IsOffsetOutOfRange

func (this *SaramaClient) IsOffsetOutOfRange(err error) bool

Checks whether the given error indicates an OffsetOutOfRange error.

func (*SaramaClient) String

func (this *SaramaClient) String() string

Returns a string representation of this SaramaClient.

type SaramaPartitioner

type SaramaPartitioner struct {
	// contains filtered or unexported fields
}

func (*SaramaPartitioner) Partition

func (this *SaramaPartitioner) Partition(message *sarama.ProducerMessage, numPartitions int32) (int32, error)

func (*SaramaPartitioner) RequiresConsistency

func (this *SaramaPartitioner) RequiresConsistency() bool

type SaramaPartitionerFactory

type SaramaPartitionerFactory struct {
	// contains filtered or unexported fields
}

func (*SaramaPartitionerFactory) PartitionerConstructor

func (this *SaramaPartitionerFactory) PartitionerConstructor(topic string) sarama.Partitioner

type SaramaProducer

type SaramaProducer struct {
	// contains filtered or unexported fields
}

func (*SaramaProducer) AsyncClose

func (this *SaramaProducer) AsyncClose()

func (*SaramaProducer) Close

func (this *SaramaProducer) Close() error

func (*SaramaProducer) Errors

func (this *SaramaProducer) Errors() <-chan *FailedMessage

func (*SaramaProducer) Input

func (this *SaramaProducer) Input() chan<- *ProducerMessage

func (*SaramaProducer) Successes

func (this *SaramaProducer) Successes() <-chan *ProducerMessage

type SchemaMetadata

type SchemaMetadata struct {
	Id      int32
	Version int32
	Schema  string
}

type SchemaRegistryClient

type SchemaRegistryClient interface {
	Register(subject string, schema avro.Schema) (int32, error)
	GetByID(id int32) (avro.Schema, error)
	GetLatestSchemaMetadata(subject string) (*SchemaMetadata, error)
	GetVersion(subject string, schema avro.Schema) (int32, error)
}

type SiestaClient

type SiestaClient struct {
	// contains filtered or unexported fields
}

SiestaClient implements LowLevelClient and OffsetStorage and uses github.com/stealthly/siesta as underlying implementation.

func NewSiestaClient

func NewSiestaClient(config *ConsumerConfig) *SiestaClient

Creates a new SiestaClient using a given ConsumerConfig.

func (*SiestaClient) Close

func (this *SiestaClient) Close()

Gracefully shuts down this client.

func (*SiestaClient) CommitOffset

func (this *SiestaClient) CommitOffset(group string, topic string, partition int32, offset int64) error

Commits the given offset for a given group, topic and partition. May return an error if fails to commit the offset.

func (*SiestaClient) Fetch

func (this *SiestaClient) Fetch(topic string, partition int32, offset int64) ([]*Message, error)

This will be called each time the fetch request to Kafka should be issued. Topic, partition and offset are self-explanatory. Returns slice of Messages and an error if a fetch error occurred.

func (*SiestaClient) GetAvailableOffset

func (this *SiestaClient) GetAvailableOffset(topic string, partition int32, offsetTime string) (int64, error)

This will be called to handle OffsetOutOfRange error. OffsetTime will be either "smallest" or "largest".

func (*SiestaClient) GetOffset

func (this *SiestaClient) GetOffset(group string, topic string, partition int32) (int64, error)

Gets the offset for a given group, topic and partition. May return an error if fails to retrieve the offset.

func (*SiestaClient) Initialize

func (this *SiestaClient) Initialize() error

This will be called right after connecting to ConsumerCoordinator so this client can initialize itself with bootstrap broker list for example. May return an error to signal this client is unable to work with given configuration.

func (*SiestaClient) IsOffsetOutOfRange

func (this *SiestaClient) IsOffsetOutOfRange(err error) bool

Checks whether the given error indicates an OffsetOutOfRange error.

func (*SiestaClient) String

func (this *SiestaClient) String() string

Returns a string representation of this SaramaClient.

type StateSnapshot

type StateSnapshot struct {
	// Metrics are a map where keys are event names and values are maps holding event values grouped by meters (count, min, max, etc.).
	Metrics map[string]map[string]float64
	// Offsets are a map where keys are topics and values are maps where keys are partitions and values are offsets for these topic-partitions.
	Offsets map[string]map[int32]int64
}

Represents a consumer state snapshot.

type StaticTopicsToNumStreams

type StaticTopicsToNumStreams struct {
	// Consumer id string.
	ConsumerId string
	// Map where keys are topic names and values are number of fetcher routines responsible for processing these topics.
	TopicsToNumStreamsMap map[string]int
}

TopicsToNumStreams implementation representing a static consumer subscription.

func (*StaticTopicsToNumStreams) GetConsumerThreadIdsPerTopic

func (tc *StaticTopicsToNumStreams) GetConsumerThreadIdsPerTopic() map[string][]ConsumerThreadId

Creates a map describing consumer subscription where keys are topic names and values are slices of ConsumerThreadIds used to fetch these topics.

func (*StaticTopicsToNumStreams) GetTopicsToNumStreamsMap

func (tc *StaticTopicsToNumStreams) GetTopicsToNumStreamsMap() map[string]int

Creates a map descibing consumer subscription where keys are topic names and values are number of fetchers used to fetch these topics.

func (*StaticTopicsToNumStreams) Pattern

func (tc *StaticTopicsToNumStreams) Pattern() string

Returns a pattern describing this TopicsToNumStreams.

type StringDecoder

type StringDecoder struct{}

func (*StringDecoder) Decode

func (this *StringDecoder) Decode(bytes []byte) (interface{}, error)

type StringEncoder

type StringEncoder struct{}

func (*StringEncoder) Encode

func (this *StringEncoder) Encode(what interface{}) ([]byte, error)

type SuccessfulResult

type SuccessfulResult struct {
	// contains filtered or unexported fields
}

An implementation of WorkerResult interface representing a successfully processed incoming message.

func NewSuccessfulResult

func NewSuccessfulResult(id TaskId) *SuccessfulResult

Creates a new SuccessfulResult for given TaskId.

func (*SuccessfulResult) Id

func (wr *SuccessfulResult) Id() TaskId

Returns an id of task that was processed.

func (*SuccessfulResult) String

func (sr *SuccessfulResult) String() string

func (*SuccessfulResult) Success

func (wr *SuccessfulResult) Success() bool

Always returns true for SuccessfulResult.

type SyslogMessage

type SyslogMessage struct {
	Message   string
	Timestamp int64
}

type SyslogProducer

type SyslogProducer struct {
	// contains filtered or unexported fields
}

func NewSyslogProducer

func NewSyslogProducer(config *SyslogProducerConfig) *SyslogProducer

func (*SyslogProducer) Start

func (this *SyslogProducer) Start()

func (*SyslogProducer) Stop

func (this *SyslogProducer) Stop()

func (*SyslogProducer) String

func (this *SyslogProducer) String() string

type SyslogProducerConfig

type SyslogProducerConfig struct {
	// Syslog producer config.
	ProducerConfig *ProducerConfig

	// Number of producer instances.
	NumProducers int

	// Number of messages that are buffered to produce.
	ChannelSize int

	Topic string

	// Receive messages from this TCP address and post them to topic.
	TCPAddr string

	// Receive messages from this UDP address and post them to topic.
	UDPAddr string

	// Kafka Broker List host:port,host:port
	BrokerList string

	// Transformer func(message syslogparser.LogParts, topic string) *sarama.ProducerMessage
	Transformer func(message *SyslogMessage, topic string) *sarama.ProducerMessage
}

SyslogProducerConfig defines configuration options for SyslogProducer

func NewSyslogProducerConfig

func NewSyslogProducerConfig() *SyslogProducerConfig

Creates an empty SyslogProducerConfig.

type Task

type Task struct {
	// A message that should be processed.
	Msg *Message

	// Number of retries used for this task.
	Retries int

	// A worker that is responsible for processing this task.
	Callee *Worker
}

Represents a single task for a worker.

func (*Task) Id

func (t *Task) Id() TaskId

Returns an id for this Task.

type TaskAndStrategy

type TaskAndStrategy struct {
	WorkerTask *Task
	Strategy   WorkerStrategy
}

type TaskId

type TaskId struct {
	// Message's topic and partition
	TopicPartition TopicAndPartition

	// Message's offset
	Offset int64
}

Type representing a task id. Consists from topic, partition and offset of a message being processed.

func (TaskId) String

func (tid TaskId) String() string

type TimedOutResult

type TimedOutResult struct {
	// contains filtered or unexported fields
}

An implementation of WorkerResult interface representing a timeout to process incoming message.

func (*TimedOutResult) Id

func (wr *TimedOutResult) Id() TaskId

Returns an id of task that was processed.

func (*TimedOutResult) String

func (sr *TimedOutResult) String() string

func (*TimedOutResult) Success

func (wr *TimedOutResult) Success() bool

Always returns false for TimedOutResult.

type TopicAndPartition

type TopicAndPartition struct {
	Topic     string
	Partition int32
}

Type representing a single Kafka topic and partition

func (*TopicAndPartition) String

func (tp *TopicAndPartition) String() string

type TopicFilter

type TopicFilter interface {
	Regex() string
	TopicAllowed(topic string, excludeInternalTopics bool) bool
}

Either a WhiteList or BlackList consumer topic filter.

type TopicInfo

type TopicInfo struct {
	Version    int16
	Partitions map[string][]int32
}

General information about Kafka topic. Used to keep it in consumer coordinator.

func (*TopicInfo) String

func (t *TopicInfo) String() string

type TopicsToNumStreams

type TopicsToNumStreams interface {
	//Creates a map descibing consumer subscription where keys are topic names and values are number of fetchers used to fetch these topics.
	GetTopicsToNumStreamsMap() map[string]int

	//Creates a map describing consumer subscription where keys are topic names and values are slices of ConsumerThreadIds used to fetch these topics.
	GetConsumerThreadIdsPerTopic() map[string][]ConsumerThreadId

	//Returns a pattern describing this TopicsToNumStreams.
	Pattern() string
}

Information on Consumer subscription. Used to keep it in consumer coordinator.

func NewStaticTopicsToNumStreams

func NewStaticTopicsToNumStreams(consumerId string,
	topics string,
	pattern string,
	numStreams int,
	excludeInternalTopics bool,
	coordinator ConsumerCoordinator) TopicsToNumStreams

Constructs a new TopicsToNumStreams for consumer with Consumerid id that works within consumer group Groupid. Uses Coordinator to get consumer information. Returns error if fails to retrieve consumer information from Coordinator.

func NewTopicsToNumStreams

func NewTopicsToNumStreams(Groupid string, Consumerid string, Coordinator ConsumerCoordinator, ExcludeInternalTopics bool) (TopicsToNumStreams, error)

Constructs a new TopicsToNumStreams for consumer with Consumerid id that works within consumer group Groupid. Uses Coordinator to get consumer information. Returns error if fails to retrieve consumer information from Coordinator.

type WhiteList

type WhiteList struct {
	// contains filtered or unexported fields
}

WhiteList is a topic filter that will match every topic for a given regex

func NewWhiteList

func NewWhiteList(regex string) *WhiteList

Creates a new WhiteList topic filter for a given regex

func (*WhiteList) Regex

func (wl *WhiteList) Regex() string

func (*WhiteList) TopicAllowed

func (wl *WhiteList) TopicAllowed(topic string, excludeInternalTopics bool) bool

type WildcardTopicsToNumStreams

type WildcardTopicsToNumStreams struct {
	Coordinator           ConsumerCoordinator
	ConsumerId            string
	TopicFilter           TopicFilter
	NumStreams            int
	ExcludeInternalTopics bool
}

TopicsToNumStreams implementation representing either whitelist or blacklist consumer subscription.

func (*WildcardTopicsToNumStreams) GetConsumerThreadIdsPerTopic

func (tc *WildcardTopicsToNumStreams) GetConsumerThreadIdsPerTopic() map[string][]ConsumerThreadId

Creates a map describing consumer subscription where keys are topic names and values are slices of ConsumerThreadIds used to fetch these topics.

func (*WildcardTopicsToNumStreams) GetTopicsToNumStreamsMap

func (tc *WildcardTopicsToNumStreams) GetTopicsToNumStreamsMap() map[string]int

Creates a map descibing consumer subscription where keys are topic names and values are number of fetchers used to fetch these topics.

func (*WildcardTopicsToNumStreams) Pattern

func (tc *WildcardTopicsToNumStreams) Pattern() string

Returns a pattern describing this TopicsToNumStreams.

type Worker

type Worker struct {
	// Channel to write tasks to.
	InputChannel chan *TaskAndStrategy

	// Channel to write processing results to.
	OutputChannel chan WorkerResult

	// Intermediate channel for pushing result to strategy handler
	HandlerInputChannel chan *TaskAndStrategy

	// Intermediate channel for pushing result from strategy handler
	HandlerOutputChannel chan WorkerResult

	// Timeout for a single worker task.
	TaskTimeout time.Duration

	// Indicates whether this worker is closed and cannot accept new work.
	Closed bool
}

Represents a worker that is able to process a single message.

func (*Worker) Start

func (w *Worker) Start()

Starts processing a given task using given strategy with this worker. Call to this method blocks until the task is done or timed out.

func (*Worker) Stop

func (w *Worker) Stop()

func (*Worker) String

func (w *Worker) String() string

type WorkerManager

type WorkerManager struct {
	// contains filtered or unexported fields
}

WorkerManager is responsible for splitting the incomming batches of messages between a configured amount of workers. It also keeps track of highest processed offsets and commits them to offset storage with a configurable frequency.

func NewWorkerManager

func NewWorkerManager(id string, config *ConsumerConfig, topicPartition TopicAndPartition, metrics *ConsumerMetrics, closeConsumer chan bool) *WorkerManager

Creates a new WorkerManager with given id using a given ConsumerConfig and responsible for managing given TopicAndPartition.

func (*WorkerManager) GetLargestOffset

func (wm *WorkerManager) GetLargestOffset() int64

Gets the highest offset that has been processed by this WorkerManager.

func (*WorkerManager) IsBatchProcessed

func (wm *WorkerManager) IsBatchProcessed() bool

Asks this WorkerManager whether the current batch is fully processed. Returns true if so, false otherwise.

func (*WorkerManager) Start

func (wm *WorkerManager) Start()

Starts processing incoming batches with this WorkerManager. Processing is possible only in batch-at-once mode. It also launches an offset committer routine. Call to this method blocks.

func (*WorkerManager) Stop

func (wm *WorkerManager) Stop() chan bool

Tells this WorkerManager to finish processing current batch, stop accepting new work and shut down. This method returns immediately and returns a channel which will get the value once the shut down is finished.

func (*WorkerManager) String

func (wm *WorkerManager) String() string

func (*WorkerManager) UpdateLargestOffset

func (wm *WorkerManager) UpdateLargestOffset(offset int64)

Updates the highest offset that has been processed by this WorkerManager with a new value.

type WorkerResult

type WorkerResult interface {
	// Returns an id of task that was processed.
	Id() TaskId

	// Returns true if processing succeeded, false otherwise.
	Success() bool
}

Interface that represents a result of processing incoming message.

type WorkerStrategy

type WorkerStrategy func(*Worker, *Message, TaskId) WorkerResult

Defines what to do with a single Kafka message. Returns a WorkerResult to distinguish successful and unsuccessful processings.

type ZookeeperConfig

type ZookeeperConfig struct {
	/* Zookeeper hosts */
	ZookeeperConnect []string

	/* Zookeeper read timeout */
	ZookeeperTimeout time.Duration

	/* Max retries for any request except CommitOffset. CommitOffset is controlled by ConsumerConfig.OffsetsCommitMaxRetries. */
	MaxRequestRetries int

	/* Backoff to retry any request */
	RequestBackoff time.Duration

	/* kafka Root */
	Root string

	// PanicHandler is a function that will be called when unrecoverable error occurs to give the possibility to perform cleanups, recover from panic etc
	PanicHandler func(error)
}

ZookeeperConfig is used to pass multiple configuration entries to ZookeeperCoordinator.

func NewZookeeperConfig

func NewZookeeperConfig() *ZookeeperConfig

Created a new ZookeeperConfig with sane defaults. Default ZookeeperConnect points to localhost.

func ZookeeperConfigFromFile

func ZookeeperConfigFromFile(filename string) (*ZookeeperConfig, error)

ZookeeperConfigFromFile is a helper function that loads zookeeper configuration information from file. The file accepts the following fields:

zookeeper.connect
zookeeper.kafka.root
zookeeper.connection.timeout
zookeeper.max.request.retries
zookeeper.request.backoff

The configuration file entries should be constructed in key=value syntax. A # symbol at the beginning of a line indicates a comment. Blank lines are ignored. The file should end with a newline character.

type ZookeeperCoordinator

type ZookeeperCoordinator struct {
	// contains filtered or unexported fields
}

ZookeeperCoordinator implements ConsumerCoordinator and OffsetStorage interfaces and is used to coordinate multiple consumers that work within the same consumer group as well as storing and retrieving their offsets.

func NewZookeeperCoordinator

func NewZookeeperCoordinator(Config *ZookeeperConfig) *ZookeeperCoordinator

Creates a new ZookeeperCoordinator with a given configuration. The new created ZookeeperCoordinator does NOT automatically connect to zookeeper, you should call Connect() explicitly

func (*ZookeeperCoordinator) AwaitOnStateBarrier

func (this *ZookeeperCoordinator) AwaitOnStateBarrier(consumerId string, group string, barrierName string,
	barrierSize int, api string, timeout time.Duration) bool

func (*ZookeeperCoordinator) ClaimPartitionOwnership

func (this *ZookeeperCoordinator) ClaimPartitionOwnership(Groupid string, Topic string, Partition int32, consumerThreadId ConsumerThreadId) (bool, error)

Tells the ConsumerCoordinator to claim partition topic Topic and partition Partition for consumerThreadId fetcher that works within a consumer group Group. Returns true if claim is successful, false and error explaining failure otherwise.

func (*ZookeeperCoordinator) CommitOffset

func (this *ZookeeperCoordinator) CommitOffset(Groupid string, Topic string, Partition int32, Offset int64) error

Tells the ConsumerCoordinator to commit offset Offset for topic and partition TopicPartition for consumer group Groupid. Returns error if failed to commit offset.

func (*ZookeeperCoordinator) Connect

func (this *ZookeeperCoordinator) Connect() (err error)

Establish connection to this ConsumerCoordinator. Returns an error if fails to connect, nil otherwise.

func (*ZookeeperCoordinator) DeregisterConsumer

func (this *ZookeeperCoordinator) DeregisterConsumer(Consumerid string, Groupid string) (err error)

Deregisters consumer with Consumerid id that is a part of consumer group Groupid form this ConsumerCoordinator. Returns an error if deregistration failed, nil otherwise.

func (*ZookeeperCoordinator) Disconnect

func (this *ZookeeperCoordinator) Disconnect()

func (*ZookeeperCoordinator) GetAllBrokers

func (this *ZookeeperCoordinator) GetAllBrokers() (brokers []*BrokerInfo, err error)

Gets the information about all Kafka brokers registered in this ConsumerCoordinator. Returns a slice of BrokerInfo and error on failure.

func (*ZookeeperCoordinator) GetAllTopics

func (this *ZookeeperCoordinator) GetAllTopics() (topics []string, err error)

Gets the list of all topics registered in this ConsumerCoordinator. Returns a slice conaining topic names and error on failure.

func (*ZookeeperCoordinator) GetBlueGreenRequest

func (this *ZookeeperCoordinator) GetBlueGreenRequest(Group string) (topics map[string]*BlueGreenDeployment, err error)

Gets all deployed topics for consume group Group from consumer coordinator. Returns a map where keys are notification ids and values are DeployedTopics. May also return an error (e.g. if failed to reach coordinator).

func (*ZookeeperCoordinator) GetConsumerInfo

func (this *ZookeeperCoordinator) GetConsumerInfo(Consumerid string, Groupid string) (info *ConsumerInfo, err error)

Gets the information about consumer with Consumerid id that is a part of consumer group Groupid from this ConsumerCoordinator. Returns ConsumerInfo on success and error otherwise (For example if consumer with given Consumerid does not exist).

func (*ZookeeperCoordinator) GetConsumersInGroup

func (this *ZookeeperCoordinator) GetConsumersInGroup(Groupid string) (consumers []string, err error)

Gets the list of all consumer ids within a consumer group Groupid. Returns a slice containing all consumer ids in group and error on failure.

func (*ZookeeperCoordinator) GetConsumersPerTopic

func (this *ZookeeperCoordinator) GetConsumersPerTopic(Groupid string, ExcludeInternalTopics bool) (consumers map[string][]ConsumerThreadId, err error)

Gets the information about consumers per topic in consumer group Groupid excluding internal topics (such as offsets) if ExcludeInternalTopics = true. Returns a map where keys are topic names and values are slices of consumer ids and fetcher ids associated with this topic and error on failure.

func (*ZookeeperCoordinator) GetOffset

func (this *ZookeeperCoordinator) GetOffset(Groupid string, topic string, partition int32) (offset int64, err error)

Gets the offset for a given topic, partition and consumer group. Returns offset on sucess, error otherwise.

func (*ZookeeperCoordinator) GetPartitionsForTopics

func (this *ZookeeperCoordinator) GetPartitionsForTopics(Topics []string) (partitions map[string][]int32, err error)

Gets the information about existing partitions for a given Topics. Returns a map where keys are topic names and values are slices of partition ids associated with this topic and error on failure.

func (*ZookeeperCoordinator) RegisterConsumer

func (this *ZookeeperCoordinator) RegisterConsumer(Consumerid string, Groupid string, TopicCount TopicsToNumStreams) (err error)

Registers a new consumer with Consumerid id and TopicCount subscription that is a part of consumer group Groupid in this ConsumerCoordinator. Returns an error if registration failed, nil otherwise.

func (*ZookeeperCoordinator) ReleasePartitionOwnership

func (this *ZookeeperCoordinator) ReleasePartitionOwnership(Groupid string, Topic string, Partition int32) error

Tells the ConsumerCoordinator to release partition ownership on topic Topic and partition Partition for consumer group Groupid. Returns error if failed to released partition ownership.

func (*ZookeeperCoordinator) RemoveOldApiRequests

func (this *ZookeeperCoordinator) RemoveOldApiRequests(group string) (err error)

func (*ZookeeperCoordinator) RemoveStateBarrier

func (this *ZookeeperCoordinator) RemoveStateBarrier(group string, stateHash string, api string) error

func (*ZookeeperCoordinator) RequestBlueGreenDeployment

func (this *ZookeeperCoordinator) RequestBlueGreenDeployment(blue BlueGreenDeployment, green BlueGreenDeployment) error

func (*ZookeeperCoordinator) String

func (this *ZookeeperCoordinator) String() string

func (*ZookeeperCoordinator) SubscribeForChanges

func (this *ZookeeperCoordinator) SubscribeForChanges(Groupid string) (events <-chan CoordinatorEvent, err error)

Subscribes for any change that should trigger consumer rebalance on consumer group Groupid in this ConsumerCoordinator. Returns a read-only channel of booleans that will get values on any significant coordinator event (e.g. new consumer appeared, new broker appeared etc.) and error if failed to subscribe.

func (*ZookeeperCoordinator) Unsubscribe

func (this *ZookeeperCoordinator) Unsubscribe()

Tells the ConsumerCoordinator to unsubscribe from events for the consumer it is associated with.

Directories

Path Synopsis
perf
syslog_proto
Package syslog_proto is a generated protocol buffer package.
Package syslog_proto is a generated protocol buffer package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL