Documentation ¶
Overview ¶
Package go_kafka_client provides a high-level Kafka consumer implementation and introduces different approach than Java/Scala high-level consumer.
Primary differences include workers concept enforcing at least once processing before committing offsets, improved rebalancing algorithm - closing obsolete connections and opening new connections without stopping the whole consumer, graceful shutdown support notifying client when it is over, batch processing, static partitions configuration support allowing to start a consumer with a predefined set of partitions never caring about rebalancing.
Index ¶
- Constants
- func BootstrapBrokers(coordinator ConsumerCoordinator) ([]string, error)
- func CreateMultiplePartitionsTopic(zk string, topicName string, numPartitions int)
- func Critical(tag interface{}, message interface{})
- func Criticalf(tag interface{}, message interface{}, params ...interface{})
- func Debug(tag interface{}, message interface{})
- func Debugf(tag interface{}, message interface{}, params ...interface{})
- func EnsureHasLeader(zkConnect string, topic string)
- func Error(tag interface{}, message interface{})
- func Errorf(tag interface{}, message interface{}, params ...interface{})
- func Info(tag interface{}, message interface{})
- func Infof(tag interface{}, message interface{}, params ...interface{})
- func LoadConfiguration(Path string) (map[string]string, error)
- func Trace(tag interface{}, message interface{})
- func Tracef(tag interface{}, message interface{}, params ...interface{})
- func Warn(tag interface{}, message interface{})
- func Warnf(tag interface{}, message interface{}, params ...interface{})
- type BlackList
- type BlueGreenDeployment
- type BrokerInfo
- type ByteDecoder
- type ByteEncoder
- type CodahaleKafkaReporter
- type Consumer
- func (c *Consumer) Close() <-chan bool
- func (c *Consumer) Metrics() *ConsumerMetrics
- func (c *Consumer) StartStatic(topicCountMap map[string]int)
- func (c *Consumer) StartStaticPartitions(topicPartitionMap map[string][]int32)
- func (c *Consumer) StartWildcard(topicFilter TopicFilter, numStreams int)
- func (c *Consumer) StateSnapshot() *StateSnapshot
- func (c *Consumer) String() string
- type ConsumerConfig
- type ConsumerCoordinator
- type ConsumerGroupApi
- type ConsumerInfo
- type ConsumerMetrics
- type ConsumerThreadId
- type CoordinatorEvent
- type Decoder
- type DefaultLogger
- func (dl *DefaultLogger) Critical(message string, params ...interface{})
- func (dl *DefaultLogger) Debug(message string, params ...interface{})
- func (dl *DefaultLogger) Error(message string, params ...interface{})
- func (dl *DefaultLogger) GetLogLevel() LogLevel
- func (dl *DefaultLogger) Info(message string, params ...interface{})
- func (dl *DefaultLogger) IsAllowed(loglevel LogLevel) bool
- func (dl *DefaultLogger) Trace(message string, params ...interface{})
- func (dl *DefaultLogger) Warn(message string, params ...interface{})
- type EmptyEmitter
- type Encoder
- type ErrorType
- type FailedAttemptCallback
- type FailedCallback
- type FailedDecision
- type FailureCounter
- type GroupWatch
- type KafkaLogEmitter
- func (k *KafkaLogEmitter) Close()
- func (k *KafkaLogEmitter) Critical(message string, params ...interface{})
- func (k *KafkaLogEmitter) Debug(message string, params ...interface{})
- func (k *KafkaLogEmitter) Emit(logLine *avro.LogLine)
- func (k *KafkaLogEmitter) Error(message string, params ...interface{})
- func (k *KafkaLogEmitter) Info(message string, params ...interface{})
- func (k *KafkaLogEmitter) Trace(message string, params ...interface{})
- func (k *KafkaLogEmitter) Warn(message string, params ...interface{})
- type KafkaLogEmitterConfig
- type KafkaLogger
- type KafkaMetricReporter
- type LogEmitter
- type LogLevel
- type LowLevelClient
- type Message
- type MirrorMaker
- type MirrorMakerConfig
- type OffsetStorage
- type ProcessingFailedResult
- type RoutinePool
- type SiestaClient
- func (this *SiestaClient) Close()
- func (this *SiestaClient) CommitOffset(group string, topic string, partition int32, offset int64) error
- func (this *SiestaClient) Fetch(topic string, partition int32, offset int64) ([]*Message, error)
- func (this *SiestaClient) GetAvailableOffset(topic string, partition int32, offsetTime string) (int64, error)
- func (this *SiestaClient) GetErrorType(err error) ErrorType
- func (this *SiestaClient) GetOffset(group string, topic string, partition int32) (int64, error)
- func (this *SiestaClient) Initialize() error
- func (this *SiestaClient) String() string
- type StateSnapshot
- type StaticTopicsToNumStreams
- type StringDecoder
- type StringEncoder
- type SuccessfulResult
- type Task
- type TaskAndStrategy
- type TaskId
- type TimedOutResult
- type TopicAndPartition
- type TopicFilter
- type TopicInfo
- type TopicsToNumStreams
- type WhiteList
- type WildcardTopicsToNumStreams
- type Worker
- type WorkerManager
- type WorkerResult
- type WorkerStrategy
- type ZookeeperConfig
- type ZookeeperCoordinator
- func (this *ZookeeperCoordinator) AwaitOnStateBarrier(consumerId string, group string, barrierName string, barrierSize int, ...) bool
- func (this *ZookeeperCoordinator) ClaimPartitionOwnership(Groupid string, Topic string, Partition int32, ...) (bool, error)
- func (this *ZookeeperCoordinator) CommitOffset(Groupid string, Topic string, Partition int32, Offset int64) error
- func (this *ZookeeperCoordinator) Connect() (err error)
- func (this *ZookeeperCoordinator) DeregisterConsumer(Consumerid string, Groupid string) (err error)
- func (this *ZookeeperCoordinator) Disconnect()
- func (this *ZookeeperCoordinator) GetAllBrokers() (brokers []*BrokerInfo, err error)
- func (this *ZookeeperCoordinator) GetAllTopics() (topics []string, err error)
- func (this *ZookeeperCoordinator) GetBlueGreenRequest(Group string) (topics map[string]*BlueGreenDeployment, err error)
- func (this *ZookeeperCoordinator) GetConsumerInfo(Consumerid string, Groupid string) (info *ConsumerInfo, err error)
- func (this *ZookeeperCoordinator) GetConsumersInGroup(Groupid string) (consumers []string, err error)
- func (this *ZookeeperCoordinator) GetConsumersPerTopic(Groupid string, ExcludeInternalTopics bool) (consumers map[string][]ConsumerThreadId, err error)
- func (this *ZookeeperCoordinator) GetOffset(Groupid string, topic string, partition int32) (offset int64, err error)
- func (this *ZookeeperCoordinator) GetPartitionsForTopics(Topics []string) (partitions map[string][]int32, err error)
- func (this *ZookeeperCoordinator) RegisterConsumer(Consumerid string, Groupid string, TopicCount TopicsToNumStreams) (err error)
- func (this *ZookeeperCoordinator) ReleasePartitionOwnership(Groupid string, Topic string, Partition int32) error
- func (this *ZookeeperCoordinator) RemoveOldApiRequests(group string) (err error)
- func (this *ZookeeperCoordinator) RemoveStateBarrier(group string, stateHash string, api string) error
- func (this *ZookeeperCoordinator) RequestBlueGreenDeployment(blue BlueGreenDeployment, green BlueGreenDeployment) error
- func (this *ZookeeperCoordinator) String() string
- func (this *ZookeeperCoordinator) SubscribeForChanges(Groupid string) (events <-chan CoordinatorEvent, err error)
- func (this *ZookeeperCoordinator) Unsubscribe()
Constants ¶
const ( // Offset with invalid value InvalidOffset int64 = -1 // Reset the offset to the smallest offset if it is out of range SmallestOffset = "smallest" // Reset the offset to the largest offset if it is out of range LargestOffset = "largest" )
const ( /* Logtypeid field value for LogLine indicating Trace log level */ TraceLogTypeId int64 = 1 /* Logtypeid field value for LogLine indicating Debug log level */ DebugLogTypeId int64 = 2 /* Logtypeid field value for LogLine indicating Info log level */ InfoLogTypeId int64 = 3 /* Logtypeid field value for LogLine indicating Warn log level */ WarnLogTypeId int64 = 4 /* Logtypeid field value for LogLine indicating Error log level */ ErrorLogTypeId int64 = 5 /* Logtypeid field value for LogLine indicating Critical log level */ CriticalLogTypeId int64 = 6 /* Logtypeid field value for LogLine indicating Metrics data */ MetricsLogTypeId int64 = 7 )
const ( /* Range partitioning works on a per-topic basis. For each topic, we lay out the available partitions in numeric order and the consumer threads in lexicographic order. We then divide the number of partitions by the total number of consumer streams (threads) to determine the number of partitions to assign to each consumer. If it does not evenly divide, then the first few consumers will have one extra partition. For example, suppose there are two consumers C1 and C2 with two streams each, and there are five available partitions (p0, p1, p2, p3, p4). So each consumer thread will get at least one partition and the first consumer thread will get one extra partition. So the assignment will be: p0 -> C1-0, p1 -> C1-0, p2 -> C1-1, p3 -> C2-0, p4 -> C2-1 */ RangeStrategy = "range" /* The round-robin partition assignor lays out all the available partitions and all the available consumer threads. It then proceeds to do a round-robin assignment from partition to consumer thread. If the subscriptions of all consumer instances are identical, then the partitions will be uniformly distributed. (i.e., the partition ownership counts will be within a delta of exactly one across all consumer threads.) (For simplicity of implementation) the assignor is allowed to assign a given topic-partition to any consumer instance and thread-id within that instance. Therefore, round-robin assignment is allowed only if: a) Every topic has the same number of streams within a consumer instance b) The set of subscribed topics is identical for every consumer instance within the group. */ RoundRobinStrategy = "roundrobin" )
Variables ¶
This section is empty.
Functions ¶
func BootstrapBrokers ¶
func BootstrapBrokers(coordinator ConsumerCoordinator) ([]string, error)
BootstrapBrokers queries the ConsumerCoordinator for all known brokers in the cluster to be used later as a bootstrap list for the LowLevelClient.
func CreateMultiplePartitionsTopic ¶
Convenience utility to create a topic topicName with numPartitions partitions in Zookeeper located at zk (format should be host:port). Please note that this requires Apache Kafka 0.8.1 binary distribution available through KAFKA_PATH environment variable
func Critical ¶
func Critical(tag interface{}, message interface{})
Writes a given message with a given tag to log with level Critical.
func Criticalf ¶
func Criticalf(tag interface{}, message interface{}, params ...interface{})
Formats a given message according to given params with a given tag to log with level Critical.
func Debug ¶
func Debug(tag interface{}, message interface{})
Writes a given message with a given tag to log with level Debug.
func Debugf ¶
func Debugf(tag interface{}, message interface{}, params ...interface{})
Formats a given message according to given params with a given tag to log with level Debug.
func EnsureHasLeader ¶
blocks until the leader for every partition of a given topic appears this is used by tests only to avoid "In the middle of a leadership election, there is currently no leader for this partition and hence it is unavailable for writes"
func Error ¶
func Error(tag interface{}, message interface{})
Writes a given message with a given tag to log with level Error.
func Errorf ¶
func Errorf(tag interface{}, message interface{}, params ...interface{})
Formats a given message according to given params with a given tag to log with level Error.
func Info ¶
func Info(tag interface{}, message interface{})
Writes a given message with a given tag to log with level Info.
func Infof ¶
func Infof(tag interface{}, message interface{}, params ...interface{})
Formats a given message according to given params with a given tag to log with level Info.
func LoadConfiguration ¶
Loads a property file located at Path. Returns a map[string]string or error.
func Trace ¶
func Trace(tag interface{}, message interface{})
Writes a given message with a given tag to log with level Trace.
func Tracef ¶
func Tracef(tag interface{}, message interface{}, params ...interface{})
Formats a given message according to given params with a given tag to log with level Trace.
Types ¶
type BlackList ¶
type BlackList struct {
// contains filtered or unexported fields
}
BlackList is a topic filter that will match every topic that does not match a given regex
func NewBlackList ¶
Creates a new BlackList topic filter for a given regex
type BlueGreenDeployment ¶
type BlueGreenDeployment struct { // Comma separated list of topics to consume from Topics string // Either black_list, white_list or static Pattern string //Consumer group to switch to Group string }
DeployedTopics contain information needed to do a successful blue-green deployment. It contains a comma-separated list of new topics to consume if the pattern is static and regex for wildcard consuming.
type BrokerInfo ¶
General information about Kafka broker. Used to keep it in consumer coordinator.
func (*BrokerInfo) String ¶
func (b *BrokerInfo) String() string
type ByteDecoder ¶
type ByteDecoder struct{}
func (*ByteDecoder) Decode ¶
func (this *ByteDecoder) Decode(bytes []byte) (interface{}, error)
type ByteEncoder ¶
type ByteEncoder struct{}
func (*ByteEncoder) Encode ¶
func (this *ByteEncoder) Encode(what interface{}) ([]byte, error)
type CodahaleKafkaReporter ¶
type CodahaleKafkaReporter struct {
// contains filtered or unexported fields
}
func NewCodahaleKafkaReporter ¶
func NewCodahaleKafkaReporter(topic string, schemaRegistryUrl string, producerConfig *producer.ProducerConfig, connectorConfig *siesta.ConnectorConfig) (*CodahaleKafkaReporter, error)
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer is a high-level Kafka consumer designed to work within a consumer group. It subscribes to coordinator events and is able to balance load within a consumer group.
func NewConsumer ¶
func NewConsumer(config *ConsumerConfig) *Consumer
NewConsumer creates a new Consumer with a given configuration. Creating a Consumer does not start fetching immediately.
func (*Consumer) Close ¶
Tells the Consumer to close all existing connections and stop. This method is NOT blocking but returns a channel which will get a single value once the closing is finished.
func (*Consumer) Metrics ¶
func (c *Consumer) Metrics() *ConsumerMetrics
func (*Consumer) StartStatic ¶
Starts consuming specified topics using a configured amount of goroutines for each topic.
func (*Consumer) StartStaticPartitions ¶
Starts consuming given topic-partitions using ConsumerConfig.NumConsumerFetchers goroutines for each topic.
func (*Consumer) StartWildcard ¶
func (c *Consumer) StartWildcard(topicFilter TopicFilter, numStreams int)
Starts consuming all topics which correspond to a given topicFilter using numStreams goroutines for each topic.
func (*Consumer) StateSnapshot ¶
func (c *Consumer) StateSnapshot() *StateSnapshot
Returns a state snapshot for this consumer. State snapshot contains a set of metrics splitted by topics and partitions.
type ConsumerConfig ¶
type ConsumerConfig struct { /* A string that uniquely identifies a set of consumers within the same consumer group */ Groupid string /* A string that uniquely identifies a consumer within a group. Generated automatically if not set. Set this explicitly for only testing purpose. */ Consumerid string /* The socket timeout for network requests. Its value should be at least FetchWaitMaxMs. */ SocketTimeout time.Duration /* The maximum number of bytes to attempt to fetch */ FetchMessageMaxBytes int32 /* The number of goroutines used to fetch data */ NumConsumerFetchers int /* Max number of message batches buffered for consumption, each batch can be up to FetchBatchSize */ QueuedMaxMessages int32 /* Max number of retries during rebalance */ RebalanceMaxRetries int32 /* The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will block */ FetchMinBytes int32 /* The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy FetchMinBytes */ FetchWaitMaxMs int32 /* Backoff time between retries during rebalance */ RebalanceBackoff time.Duration /* Backoff time to refresh the leader of a partition after it loses the current leader */ RefreshLeaderBackoff time.Duration /* Retry the offset commit up to this many times on failure. */ OffsetsCommitMaxRetries int /* Try to commit offset every OffsetCommitInterval. If previous offset commit for a partition is still in progress updates the next offset to commit and continues. This way it does not commit all the offset history if the coordinator is slow, but only the highest offsets. */ OffsetCommitInterval time.Duration /* What to do if an offset is out of range. SmallestOffset : automatically reset the offset to the smallest offset. LargestOffset : automatically reset the offset to the largest offset. Defaults to LargestOffset. */ AutoOffsetReset string /* Client id is specified by the kafka consumer client, used to distinguish different clients. */ Clientid string /* Whether messages from internal topics (such as offsets) should be exposed to the consumer. */ ExcludeInternalTopics bool /* Select a strategy for assigning partitions to consumer streams. Possible values: RangeStrategy, RoundRobinStrategy */ PartitionAssignmentStrategy string /* Amount of workers per partition to process consumed messages. */ NumWorkers int /* Times to retry processing a failed message by a worker. */ MaxWorkerRetries int /* Worker retry threshold. Increments each time a worker fails to process a message within MaxWorkerRetries. When this threshold is hit within a WorkerThresholdTimeWindow, WorkerFailureCallback is called letting the user to decide whether the consumer should stop. */ WorkerRetryThreshold int32 /* Resets WorkerRetryThreshold if it isn't hit within this period. */ WorkerThresholdTimeWindow time.Duration /* Callback executed when WorkerRetryThreshold exceeded within WorkerThresholdTimeWindow */ WorkerFailureCallback FailedCallback /* Callback executed when Worker failed to process the message after MaxWorkerRetries and WorkerRetryThreshold is not hit */ WorkerFailedAttemptCallback FailedAttemptCallback /* Worker timeout to process a single message. */ WorkerTaskTimeout time.Duration /* Backoff between worker attempts to process a single message. */ WorkerBackoff time.Duration /* Maximum wait time to gracefully stop a worker manager */ WorkerManagersStopTimeout time.Duration /* A function which defines a user-specified action on a single message. This function is responsible for actual message processing. Consumer panics if Strategy is not set. */ Strategy WorkerStrategy /* Number of messages to accumulate before flushing them to workers */ FetchBatchSize int /* Timeout to accumulate messages. Flushes accumulated batch to workers even if it is not yet full. Resets after each flush meaning this won't be triggered if FetchBatchSize is reached before timeout. */ FetchBatchTimeout time.Duration /* Backoff between fetch requests if no messages were fetched from a previous fetch. */ RequeueAskNextBackoff time.Duration /* Buffer size for ask next channel. This value shouldn't be less than number of partitions per fetch routine. */ AskNextChannelSize int /* Maximum fetch retries if no messages were fetched from a previous fetch */ FetchMaxRetries int /* Maximum retries to fetch topic metadata from one broker. */ FetchTopicMetadataRetries int /* Backoff for fetch topic metadata request if the previous request failed. */ FetchTopicMetadataBackoff time.Duration /* Backoff between two fetch requests for one fetch routine. Needed to prevent fetcher from querying the broker too frequently. */ FetchRequestBackoff time.Duration /* Coordinator used to coordinate consumer's actions, e.g. trigger rebalance events, store offsets and consumer metadata etc. */ Coordinator ConsumerCoordinator /* OffsetStorage is used to store and retrieve consumer offsets. */ OffsetStorage OffsetStorage /* Indicates whether the client supports blue-green deployment. This config entry is needed because blue-green deployment won't work with RoundRobin partition assignment strategy. Defaults to true. */ BlueGreenDeploymentEnabled bool /* Time to wait after consumer has registered itself in group */ DeploymentTimeout time.Duration /* Service coordinator barrier timeout */ BarrierTimeout time.Duration /* Low Level Kafka Client implementation. */ LowLevelClient LowLevelClient /* Message keys decoder */ KeyDecoder Decoder /* Message values decoder */ ValueDecoder Decoder /* Flag for debug mode */ Debug bool /* Metrics Prefix if the client wants to organize the way metric names are emitted. (optional) */ MetricsPrefix string /* Config to skip corrupted messages. If set to true the consumer will increment the topic-partition offset by 1 on each corrupted response until the corrupted part of data is over. Turned off by default. */ SkipCorruptedMessages bool /* RoutinePoolSize defines the size of routine pools created within this consumer. */ RoutinePoolSize int }
ConsumerConfig defines configuration options for Consumer
func ConsumerConfigFromFile ¶
func ConsumerConfigFromFile(filename string) (*ConsumerConfig, error)
ConsumerConfigFromFile is a helper function that loads a consumer's configuration from file. The file accepts the following fields:
group.id consumer.id fetch.message.max.bytes num.consumer.fetchers rebalance.max.retries queued.max.message.chunks fetch.min.bytes fetch.wait.max.ms rebalance.backoff refresh.leader.backoff offset.commit.max.retries offset.commit.interval offsets.storage auto.offset.reset exclude.internal.topics partition.assignment.strategy num.workers max.worker.retries worker.retry.threshold worker.threshold.time.window worker.task.timeout worker.backoff worker.managers.stop.timeout fetch.batch.size fetch.batch.timeout requeue.ask.next.backoff fetch.max.retries fetch.topic.metadata.retries fetch.topic.metadata.backoff fetch.request.backoff blue.green.deployment.enabled
The configuration file entries should be constructed in key=value syntax. A # symbol at the beginning of a line indicates a comment. Blank lines are ignored. The file should end with a newline character.
func DefaultConsumerConfig ¶
func DefaultConsumerConfig() *ConsumerConfig
DefaultConsumerConfig creates a ConsumerConfig with sane defaults. Note that several required config entries (like Strategy and callbacks) are still not set.
func (*ConsumerConfig) String ¶
func (c *ConsumerConfig) String() string
func (*ConsumerConfig) Validate ¶
func (c *ConsumerConfig) Validate() error
Validate this ConsumerConfig. Returns a corresponding error if the ConsumerConfig is invalid and nil otherwise.
type ConsumerCoordinator ¶
type ConsumerCoordinator interface { /* Establish connection to this ConsumerCoordinator. Returns an error if fails to connect, nil otherwise. */ Connect() error /* Close connection to this ConsumerCoordinator. */ Disconnect() /* Registers a new consumer with Consumerid id and TopicCount subscription that is a part of consumer group Group in this ConsumerCoordinator. Returns an error if registration failed, nil otherwise. */ RegisterConsumer(Consumerid string, Group string, TopicCount TopicsToNumStreams) error /* Deregisters consumer with Consumerid id that is a part of consumer group Group form this ConsumerCoordinator. Returns an error if deregistration failed, nil otherwise. */ DeregisterConsumer(Consumerid string, Group string) error /* Gets the information about consumer with Consumerid id that is a part of consumer group Group from this ConsumerCoordinator. Returns ConsumerInfo on success and error otherwise (For example if consumer with given Consumerid does not exist). */ GetConsumerInfo(Consumerid string, Group string) (*ConsumerInfo, error) /* Gets the information about consumers per topic in consumer group Group excluding internal topics (such as offsets) if ExcludeInternalTopics = true. Returns a map where keys are topic names and values are slices of consumer ids and fetcher ids associated with this topic and error on failure. */ GetConsumersPerTopic(Group string, ExcludeInternalTopics bool) (map[string][]ConsumerThreadId, error) /* Gets the list of all consumer ids within a consumer group Group. Returns a slice containing all consumer ids in group and error on failure. */ GetConsumersInGroup(Group string) ([]string, error) /* Gets the list of all topics registered in this ConsumerCoordinator. Returns a slice conaining topic names and error on failure. */ GetAllTopics() ([]string, error) /* Gets the information about existing partitions for a given Topics. Returns a map where keys are topic names and values are slices of partition ids associated with this topic and error on failure. */ GetPartitionsForTopics(Topics []string) (map[string][]int32, error) /* Gets the information about all Kafka brokers registered in this ConsumerCoordinator. Returns a slice of BrokerInfo and error on failure. */ GetAllBrokers() ([]*BrokerInfo, error) /* Subscribes for any change that should trigger consumer rebalance on consumer group Group in this ConsumerCoordinator or trigger topic switch. Returns a read-only channel of CoordinatorEvent that will get values on any significant coordinator event (e.g. new consumer appeared, new broker appeared etc.) and error if failed to subscribe. */ SubscribeForChanges(Group string) (<-chan CoordinatorEvent, error) /* Requests that a blue/green deployment be done.*/ RequestBlueGreenDeployment(blue BlueGreenDeployment, green BlueGreenDeployment) error /* Gets all deployed topics for consume group Group from consumer coordinator. Returns a map where keys are notification ids and values are DeployedTopics. May also return an error (e.g. if failed to reach coordinator). */ GetBlueGreenRequest(Group string) (map[string]*BlueGreenDeployment, error) /* Implements classic barrier synchronization primitive via service coordinator facilities */ AwaitOnStateBarrier(consumerId string, group string, stateHash string, barrierSize int, api string, timeout time.Duration) bool /* Removes state barrier */ RemoveStateBarrier(group string, stateHash string, api string) error /* Tells the ConsumerCoordinator to unsubscribe from events for the consumer it is associated with. */ Unsubscribe() /* Tells the ConsumerCoordinator to claim partition topic Topic and partition Partition for ConsumerThreadId fetcher that works within a consumer group Group. Returns true if claim is successful, false and error explaining failure otherwise. */ ClaimPartitionOwnership(Group string, Topic string, Partition int32, ConsumerThreadId ConsumerThreadId) (bool, error) /* Tells the ConsumerCoordinator to release partition ownership on topic Topic and partition Partition for consumer group Group. Returns error if failed to released partition ownership. */ ReleasePartitionOwnership(Group string, Topic string, Partition int32) error /* Removes old api objects */ RemoveOldApiRequests(group string) error }
ConsumerCoordinator is used to coordinate actions of multiple consumers within the same consumer group. It is responsible for keeping track of alive consumers, manages their offsets and assigns partitions to consume. The current default ConsumerCoordinator is ZookeeperCoordinator. More of them can be added in future.
type ConsumerGroupApi ¶
type ConsumerGroupApi string
const ( BlueGreenDeploymentAPI ConsumerGroupApi = "blue_green" Rebalance ConsumerGroupApi = "rebalance" )
type ConsumerInfo ¶
type ConsumerInfo struct { Version int16 `json:"version"` Subscription map[string]int `json:"subscription"` Pattern string `json:"pattern"` Timestamp int64 `json:"timestamp,string"` }
General information about Kafka consumer. Used to keep it in consumer coordinator.
func (*ConsumerInfo) String ¶
func (c *ConsumerInfo) String() string
type ConsumerMetrics ¶
type ConsumerMetrics struct {
// contains filtered or unexported fields
}
type ConsumerThreadId ¶
Consumer routine id. Used to keep track of what consumer routine consumes a particular topic-partition in consumer coordinator.
func (*ConsumerThreadId) String ¶
func (c *ConsumerThreadId) String() string
type CoordinatorEvent ¶
type CoordinatorEvent string
CoordinatorEvent is sent by consumer coordinator representing some state change.
const ( // A regular coordinator event that should normally trigger consumer rebalance. Regular CoordinatorEvent = "Regular" // Coordinator event that should trigger consumer re-registrer Reinitialize CoordinatorEvent = "Reinitialize" // A coordinator event that informs a consumer group of new deployed topics. BlueGreenRequest CoordinatorEvent = "BlueGreenRequest" )
type DefaultLogger ¶
type DefaultLogger struct {
// contains filtered or unexported fields
}
Default implementation of KafkaLogger interface used in this client.
func NewDefaultLogger ¶
func NewDefaultLogger(Level LogLevel) *DefaultLogger
Creates a new DefaultLogger that is configured to write messages to console with minimum log level Level.
func (*DefaultLogger) Critical ¶
func (dl *DefaultLogger) Critical(message string, params ...interface{})
Formats a given message according to given params to log with level Critical.
func (*DefaultLogger) Debug ¶
func (dl *DefaultLogger) Debug(message string, params ...interface{})
Formats a given message according to given params to log with level Debug.
func (*DefaultLogger) Error ¶
func (dl *DefaultLogger) Error(message string, params ...interface{})
Formats a given message according to given params to log with level Error.
func (*DefaultLogger) GetLogLevel ¶
func (dl *DefaultLogger) GetLogLevel() LogLevel
func (*DefaultLogger) Info ¶
func (dl *DefaultLogger) Info(message string, params ...interface{})
Formats a given message according to given params to log with level Info.
func (*DefaultLogger) IsAllowed ¶
func (dl *DefaultLogger) IsAllowed(loglevel LogLevel) bool
Formats a given message according to given params to log with level Critical.
func (*DefaultLogger) Trace ¶
func (dl *DefaultLogger) Trace(message string, params ...interface{})
Formats a given message according to given params to log with level Trace.
func (*DefaultLogger) Warn ¶
func (dl *DefaultLogger) Warn(message string, params ...interface{})
Formats a given message according to given params to log with level Warn.
type EmptyEmitter ¶
type EmptyEmitter string
EmptyEmitter implements emitter and ignores all incoming messages. Used not to break anyone.
func NewEmptyEmitter ¶
func NewEmptyEmitter() *EmptyEmitter
NewEmptyEmitter creates a new EmptyEmitter.
func (*EmptyEmitter) Emit ¶
func (e *EmptyEmitter) Emit(*avro.LogLine)
Does nothing. Ignores given message.
type FailedAttemptCallback ¶
type FailedAttemptCallback func(*Task, WorkerResult) FailedDecision
A callback that is triggered when a worker fails to process a single message.
type FailedCallback ¶
type FailedCallback func(*WorkerManager) FailedDecision
A callback that is triggered when a worker fails to process ConsumerConfig.WorkerRetryThreshold messages within ConsumerConfig.WorkerThresholdTimeWindow
type FailedDecision ¶
type FailedDecision int32
Defines what to do when worker fails to process a message.
const ( // Tells the worker manager to ignore the failure and continue normally. CommitOffsetAndContinue FailedDecision = iota // Tells the worker manager to continue processing new messages but not to commit offset that failed. DoNotCommitOffsetAndContinue // Tells the worker manager to commit offset and stop processing the current batch. CommitOffsetAndStop // Tells the worker manager not to commit offset and stop processing the current batch. DoNotCommitOffsetAndStop )
type FailureCounter ¶
type FailureCounter struct {
// contains filtered or unexported fields
}
A counter used to track whether we reached the configurable threshold of failed messages within a given time window.
func NewFailureCounter ¶
func NewFailureCounter(FailedThreshold int32, WorkerThresholdTimeWindow time.Duration) *FailureCounter
Creates a new FailureCounter with threshold FailedThreshold and time window WorkerThresholdTimeWindow.
func (*FailureCounter) Failed ¶
func (f *FailureCounter) Failed() bool
Tells this FailureCounter to increment a number of failures by one. Returns true if threshold is reached, false otherwise.
type GroupWatch ¶
type GroupWatch struct {
// contains filtered or unexported fields
}
type KafkaLogEmitter ¶
type KafkaLogEmitter struct {
// contains filtered or unexported fields
}
KafkaLogEmitter implements LogEmitter and KafkaLogger and sends all structured log data to a Kafka topic encoded as Avro.
func NewKafkaLogEmitter ¶
func NewKafkaLogEmitter(config *KafkaLogEmitterConfig) (*KafkaLogEmitter, error)
NewKafkaLogEmitter creates a new KafkaLogEmitter with a provided configuration.
func (*KafkaLogEmitter) Close ¶
func (k *KafkaLogEmitter) Close()
Close closes the underlying producer. The KafkaLogEmitter won't be usable anymore after call to this.
func (*KafkaLogEmitter) Critical ¶
func (k *KafkaLogEmitter) Critical(message string, params ...interface{})
Critical formats a given message according to given params to log with level Critical and produces to a given Kafka topic.
func (*KafkaLogEmitter) Debug ¶
func (k *KafkaLogEmitter) Debug(message string, params ...interface{})
Debug formats a given message according to given params to log with level Debug and produces to a given Kafka topic.
func (*KafkaLogEmitter) Emit ¶
func (k *KafkaLogEmitter) Emit(logLine *avro.LogLine)
Emit emits a single entry to a given destination topic.
func (*KafkaLogEmitter) Error ¶
func (k *KafkaLogEmitter) Error(message string, params ...interface{})
Error formats a given message according to given params to log with level Error and produces to a given Kafka topic.
func (*KafkaLogEmitter) Info ¶
func (k *KafkaLogEmitter) Info(message string, params ...interface{})
Info formats a given message according to given params to log with level Info and produces to a given Kafka topic.
func (*KafkaLogEmitter) Trace ¶
func (k *KafkaLogEmitter) Trace(message string, params ...interface{})
Trace formats a given message according to given params to log with level Trace and produces to a given Kafka topic.
func (*KafkaLogEmitter) Warn ¶
func (k *KafkaLogEmitter) Warn(message string, params ...interface{})
Warn formats a given message according to given params to log with level Warn and produces to a given Kafka topic.
type KafkaLogEmitterConfig ¶
type KafkaLogEmitterConfig struct { // LogLevel to use for KafkaLogEmitter LogLevel LogLevel // LogLine event source. Used only when called as a Logger interface. Source string // LogLine tags. Used only when called as a logger interface. Tags map[string]string // Topic to emit logs to. Topic string // Confluent Avro schema registry URL. SchemaRegistryUrl string // Producer config that will be used by this emitter. Note that ValueEncoder WILL BE replaced by KafkaAvroEncoder. ProducerConfig *producer.ProducerConfig // Siesta connector config that will be used by this emitter ConnectorConfig *siesta.ConnectorConfig // ProducerCloseTimeout is the maximum time to wait until the producer closes gracefully ProducerCloseTimeout time.Duration }
KafkaLogEmitterConfig provides multiple configuration entries for KafkaLogEmitter.
func NewKafkaLogEmitterConfig ¶
func NewKafkaLogEmitterConfig() *KafkaLogEmitterConfig
NewKafkaLogEmitterConfig creates a new KafkaLogEmitterConfig with log level set to Info.
type KafkaLogger ¶
type KafkaLogger interface { //Formats a given message according to given params to log with level Trace. Trace(message string, params ...interface{}) //Formats a given message according to given params to log with level Debug. Debug(message string, params ...interface{}) //Formats a given message according to given params to log with level Info. Info(message string, params ...interface{}) //Formats a given message according to given params to log with level Warn. Warn(message string, params ...interface{}) //Formats a given message according to given params to log with level Error. Error(message string, params ...interface{}) //Formats a given message according to given params to log with level Critical. Critical(message string, params ...interface{}) GetLogLevel() LogLevel IsAllowed(logLevel LogLevel) bool }
Logger interface. Lets you plug-in your custom logging library instead of using built-in one.
var Logger KafkaLogger = NewDefaultLogger(InfoLevel)
Logger used by this client. Defaults to build-in logger with Info log level.
type KafkaMetricReporter ¶
type KafkaMetricReporter struct {
// contains filtered or unexported fields
}
func NewKafkaMetricReporter ¶
func NewKafkaMetricReporter(topic string, producerConfig *producer.ProducerConfig, connectorConfig *siesta.ConnectorConfig) (*KafkaMetricReporter, error)
type LogEmitter ¶
type LogEmitter interface { // Emit sends a single LogLine to some destination. Emit(*avro.LogLine) // Close closes the given emitter. Close() }
LogEmitter is an interface that handles structured log data.
var EmitterLogs LogEmitter = NewEmptyEmitter()
LogEmitter used by this client. Defaults to empty emitter that ignores all incoming data.
type LogLevel ¶
type LogLevel string
Represents a logging level
const ( //Use TraceLevel for debugging to find problems in functions, variables etc. TraceLevel LogLevel = "trace" //Use DebugLevel for detailed system reports and diagnostic messages. DebugLevel LogLevel = "debug" //Use InfoLevel for general information about a running application. InfoLevel LogLevel = "info" //Use WarnLevel to indicate small errors and failures that should not happen normally but are recovered automatically. WarnLevel LogLevel = "warn" //Use ErrorLevel to indicate severe errors that affect application workflow and are not handled automatically. ErrorLevel LogLevel = "error" //Use CriticalLevel to indicate fatal errors that may cause data corruption or loss. CriticalLevel LogLevel = "critical" )
type LowLevelClient ¶
type LowLevelClient interface { // This will be called right after connecting to ConsumerCoordinator so this client can initialize itself // with bootstrap broker list for example. May return an error to signal this client is unable to work with given configuration. Initialize() error // This will be called each time the fetch request to Kafka should be issued. Topic, partition and offset are self-explanatory. // Should return a slice of Messages and an error if a fetch error occurred. // Note that for performance reasons it makes sense to keep open broker connections and reuse them on every fetch call. Fetch(topic string, partition int32, offset int64) ([]*Message, error) // This will be called when call to Fetch returns an error. As every client has different error mapping we ask here explicitly // what kind of error was returned. GetErrorType(error) ErrorType // This will be called to handle OffsetOutOfRange error. OffsetTime will be either "smallest" or "largest". // Should return a corresponding offset value and an error if it occurred. GetAvailableOffset(topic string, partition int32, offsetTime string) (int64, error) // This will be called to gracefully shutdown this client. Close() }
LowLevelClient is a low-level Kafka client that manages broker connections, responsible to fetch metadata and is able to handle Fetch and Offset requests. TODO not sure that's a good name for this interface
type Message ¶
type Message struct { // Partition key. Key []byte // Message value. Value []byte // Decoded message key DecodedKey interface{} // Decoded message value DecodedValue interface{} // Topic this message came from. Topic string // Partition this message came from. Partition int32 // Message offset. Offset int64 // HighwaterMarkOffset is an offset of the last message in this topic-partition. HighwaterMarkOffset int64 }
Single Kafka message that is sent to user-defined Strategy
type MirrorMaker ¶
type MirrorMaker struct {
// contains filtered or unexported fields
}
MirrorMaker is a tool to mirror source Kafka cluster into a target (mirror) Kafka cluster. It uses a Kafka consumer to consume messages from the source cluster, and re-publishes those messages to the target cluster.
func NewMirrorMaker ¶
func NewMirrorMaker(config *MirrorMakerConfig) *MirrorMaker
Creates a new MirrorMaker using given MirrorMakerConfig.
func (*MirrorMaker) Start ¶
func (this *MirrorMaker) Start()
Starts the MirrorMaker. This method is blocking and should probably be run in a separate goroutine.
type MirrorMakerConfig ¶
type MirrorMakerConfig struct { // Whitelist of topics to mirror. Exactly one whitelist or blacklist is allowed. Whitelist string // Blacklist of topics to mirror. Exactly one whitelist or blacklist is allowed. Blacklist string // Consumer configurations to consume from a source cluster. ConsumerConfigs []string // Embedded producer config. ProducerConfig string // Number of producer instances. NumProducers int // Number of consumption streams. NumStreams int // Flag to preserve partition number. E.g. if message was read from partition 5 it'll be written to partition 5. Note that this can affect performance. PreservePartitions bool // Flag to preserve message order. E.g. message sequence 1, 2, 3, 4, 5 will remain 1, 2, 3, 4, 5 in destination topic. Note that this can affect performance. PreserveOrder bool // Destination topic prefix. E.g. if message was read from topic "test" and prefix is "dc1_" it'll be written to topic "dc1_test". TopicPrefix string // Number of messages that are buffered between the consumer and producer. ChannelSize int // Message keys encoder for producer KeyEncoder producer.Serializer // Message values encoder for producer ValueEncoder producer.Serializer // Message keys decoder for consumer KeyDecoder Decoder // Message values decoder for consumer ValueDecoder Decoder }
MirrorMakerConfig defines configuration options for MirrorMaker
func NewMirrorMakerConfig ¶
func NewMirrorMakerConfig() *MirrorMakerConfig
Creates an empty MirrorMakerConfig.
type OffsetStorage ¶
type OffsetStorage interface { // Gets the offset for a given group, topic and partition. // May return an error if fails to retrieve the offset. GetOffset(group string, topic string, partition int32) (int64, error) // Commits the given offset for a given group, topic and partition. // May return an error if fails to commit the offset. CommitOffset(group string, topic string, partition int32, offset int64) error }
OffsetStorage is used to store and retrieve consumer offsets.
type ProcessingFailedResult ¶
type ProcessingFailedResult struct {
// contains filtered or unexported fields
}
An implementation of WorkerResult interface representing a failure to process incoming message.
func NewProcessingFailedResult ¶
func NewProcessingFailedResult(id TaskId) *ProcessingFailedResult
Creates a new ProcessingFailedResult for given TaskId.
func (*ProcessingFailedResult) Id ¶
func (wr *ProcessingFailedResult) Id() TaskId
Returns an id of task that was processed.
func (*ProcessingFailedResult) String ¶
func (sr *ProcessingFailedResult) String() string
func (*ProcessingFailedResult) Success ¶
func (wr *ProcessingFailedResult) Success() bool
Always returns false for ProcessingFailedResult.
type RoutinePool ¶
type RoutinePool struct {
// contains filtered or unexported fields
}
func NewRoutinePool ¶
func NewRoutinePool(size int) *RoutinePool
func (*RoutinePool) Do ¶
func (rp *RoutinePool) Do(f func())
func (*RoutinePool) Stop ¶
func (rp *RoutinePool) Stop()
type SiestaClient ¶
type SiestaClient struct {
// contains filtered or unexported fields
}
SiestaClient implements LowLevelClient and OffsetStorage and uses github.com/elodina/siesta as underlying implementation.
func NewSiestaClient ¶
func NewSiestaClient(config *ConsumerConfig) *SiestaClient
Creates a new SiestaClient using a given ConsumerConfig.
func (*SiestaClient) CommitOffset ¶
func (this *SiestaClient) CommitOffset(group string, topic string, partition int32, offset int64) error
Commits the given offset for a given group, topic and partition. May return an error if fails to commit the offset.
func (*SiestaClient) Fetch ¶
This will be called each time the fetch request to Kafka should be issued. Topic, partition and offset are self-explanatory. Returns slice of Messages and an error if a fetch error occurred.
func (*SiestaClient) GetAvailableOffset ¶
func (this *SiestaClient) GetAvailableOffset(topic string, partition int32, offsetTime string) (int64, error)
This will be called to handle OffsetOutOfRange error. OffsetTime will be either "smallest" or "largest".
func (*SiestaClient) GetErrorType ¶
func (this *SiestaClient) GetErrorType(err error) ErrorType
Tells the caller what kind of error it is.
func (*SiestaClient) GetOffset ¶
Gets the offset for a given group, topic and partition. May return an error if fails to retrieve the offset.
func (*SiestaClient) Initialize ¶
func (this *SiestaClient) Initialize() error
This will be called right after connecting to ConsumerCoordinator so this client can initialize itself with bootstrap broker list for example. May return an error to signal this client is unable to work with given configuration.
func (*SiestaClient) String ¶
func (this *SiestaClient) String() string
Returns a string representation of this SiestaClient.
type StateSnapshot ¶
type StateSnapshot struct { // Metrics are a map where keys are event names and values are maps holding event values grouped by meters (count, min, max, etc.). Metrics map[string]map[string]float64 // Offsets are a map where keys are topics and values are maps where keys are partitions and values are offsets for these topic-partitions. Offsets map[string]map[int32]int64 }
Represents a consumer state snapshot.
type StaticTopicsToNumStreams ¶
type StaticTopicsToNumStreams struct { // Consumer id string. ConsumerId string // Map where keys are topic names and values are number of fetcher routines responsible for processing these topics. TopicsToNumStreamsMap map[string]int }
TopicsToNumStreams implementation representing a static consumer subscription.
func (*StaticTopicsToNumStreams) GetConsumerThreadIdsPerTopic ¶
func (tc *StaticTopicsToNumStreams) GetConsumerThreadIdsPerTopic() map[string][]ConsumerThreadId
Creates a map describing consumer subscription where keys are topic names and values are slices of ConsumerThreadIds used to fetch these topics.
func (*StaticTopicsToNumStreams) GetTopicsToNumStreamsMap ¶
func (tc *StaticTopicsToNumStreams) GetTopicsToNumStreamsMap() map[string]int
Creates a map descibing consumer subscription where keys are topic names and values are number of fetchers used to fetch these topics.
func (*StaticTopicsToNumStreams) Pattern ¶
func (tc *StaticTopicsToNumStreams) Pattern() string
Returns a pattern describing this TopicsToNumStreams.
type StringDecoder ¶
type StringDecoder struct{}
func (*StringDecoder) Decode ¶
func (this *StringDecoder) Decode(bytes []byte) (interface{}, error)
type StringEncoder ¶
type StringEncoder struct{}
func (*StringEncoder) Encode ¶
func (this *StringEncoder) Encode(what interface{}) ([]byte, error)
type SuccessfulResult ¶
type SuccessfulResult struct {
// contains filtered or unexported fields
}
An implementation of WorkerResult interface representing a successfully processed incoming message.
func NewSuccessfulResult ¶
func NewSuccessfulResult(id TaskId) *SuccessfulResult
Creates a new SuccessfulResult for given TaskId.
func (*SuccessfulResult) Id ¶
func (wr *SuccessfulResult) Id() TaskId
Returns an id of task that was processed.
func (*SuccessfulResult) String ¶
func (sr *SuccessfulResult) String() string
func (*SuccessfulResult) Success ¶
func (wr *SuccessfulResult) Success() bool
Always returns true for SuccessfulResult.
type Task ¶
type Task struct { // A message that should be processed. Msg *Message // Number of retries used for this task. Retries int // A worker that is responsible for processing this task. Callee *Worker }
Represents a single task for a worker.
type TaskAndStrategy ¶
type TaskAndStrategy struct { WorkerTask *Task Strategy WorkerStrategy }
type TaskId ¶
type TaskId struct { // Message's topic and partition TopicPartition TopicAndPartition // Message's offset Offset int64 }
Type representing a task id. Consists from topic, partition and offset of a message being processed.
type TimedOutResult ¶
type TimedOutResult struct {
// contains filtered or unexported fields
}
An implementation of WorkerResult interface representing a timeout to process incoming message.
func (*TimedOutResult) Id ¶
func (wr *TimedOutResult) Id() TaskId
Returns an id of task that was processed.
func (*TimedOutResult) String ¶
func (sr *TimedOutResult) String() string
func (*TimedOutResult) Success ¶
func (wr *TimedOutResult) Success() bool
Always returns false for TimedOutResult.
type TopicAndPartition ¶
Type representing a single Kafka topic and partition
func (*TopicAndPartition) String ¶
func (tp *TopicAndPartition) String() string
type TopicFilter ¶
type TopicFilter interface { Regex() string TopicAllowed(topic string, excludeInternalTopics bool) bool }
Either a WhiteList or BlackList consumer topic filter.
type TopicsToNumStreams ¶
type TopicsToNumStreams interface { //Creates a map descibing consumer subscription where keys are topic names and values are number of fetchers used to fetch these topics. GetTopicsToNumStreamsMap() map[string]int //Creates a map describing consumer subscription where keys are topic names and values are slices of ConsumerThreadIds used to fetch these topics. GetConsumerThreadIdsPerTopic() map[string][]ConsumerThreadId //Returns a pattern describing this TopicsToNumStreams. Pattern() string }
Information on Consumer subscription. Used to keep it in consumer coordinator.
func NewStaticTopicsToNumStreams ¶
func NewStaticTopicsToNumStreams(consumerId string, topics string, pattern string, numStreams int, excludeInternalTopics bool, coordinator ConsumerCoordinator) TopicsToNumStreams
Constructs a new TopicsToNumStreams for consumer with Consumerid id that works within consumer group Groupid. Uses Coordinator to get consumer information. Returns error if fails to retrieve consumer information from Coordinator.
func NewTopicsToNumStreams ¶
func NewTopicsToNumStreams(Groupid string, Consumerid string, Coordinator ConsumerCoordinator, ExcludeInternalTopics bool) (TopicsToNumStreams, error)
Constructs a new TopicsToNumStreams for consumer with Consumerid id that works within consumer group Groupid. Uses Coordinator to get consumer information. Returns error if fails to retrieve consumer information from Coordinator.
type WhiteList ¶
type WhiteList struct {
// contains filtered or unexported fields
}
WhiteList is a topic filter that will match every topic for a given regex
func NewWhiteList ¶
Creates a new WhiteList topic filter for a given regex
type WildcardTopicsToNumStreams ¶
type WildcardTopicsToNumStreams struct { Coordinator ConsumerCoordinator ConsumerId string TopicFilter TopicFilter NumStreams int ExcludeInternalTopics bool }
TopicsToNumStreams implementation representing either whitelist or blacklist consumer subscription.
func (*WildcardTopicsToNumStreams) GetConsumerThreadIdsPerTopic ¶
func (tc *WildcardTopicsToNumStreams) GetConsumerThreadIdsPerTopic() map[string][]ConsumerThreadId
Creates a map describing consumer subscription where keys are topic names and values are slices of ConsumerThreadIds used to fetch these topics.
func (*WildcardTopicsToNumStreams) GetTopicsToNumStreamsMap ¶
func (tc *WildcardTopicsToNumStreams) GetTopicsToNumStreamsMap() map[string]int
Creates a map descibing consumer subscription where keys are topic names and values are number of fetchers used to fetch these topics.
func (*WildcardTopicsToNumStreams) Pattern ¶
func (tc *WildcardTopicsToNumStreams) Pattern() string
Returns a pattern describing this TopicsToNumStreams.
type Worker ¶
type Worker struct { // Channel to write tasks to. InputChannel chan *TaskAndStrategy // Channel to write processing results to. OutputChannel chan WorkerResult // Intermediate channel for pushing result to strategy handler HandlerInputChannel chan *TaskAndStrategy // Intermediate channel for pushing result from strategy handler HandlerOutputChannel chan WorkerResult // Timeout for a single worker task. TaskTimeout time.Duration // Indicates whether this worker is closed and cannot accept new work. Closed bool }
Represents a worker that is able to process a single message.
type WorkerManager ¶
type WorkerManager struct {
// contains filtered or unexported fields
}
WorkerManager is responsible for splitting the incomming batches of messages between a configured amount of workers. It also keeps track of highest processed offsets and commits them to offset storage with a configurable frequency.
func NewWorkerManager ¶
func NewWorkerManager(id string, config *ConsumerConfig, topicPartition TopicAndPartition, metrics *ConsumerMetrics, closeConsumer chan bool) *WorkerManager
Creates a new WorkerManager with given id using a given ConsumerConfig and responsible for managing given TopicAndPartition.
func (*WorkerManager) GetLargestOffset ¶
func (wm *WorkerManager) GetLargestOffset() int64
Gets the highest offset that has been processed by this WorkerManager.
func (*WorkerManager) IsBatchProcessed ¶
func (wm *WorkerManager) IsBatchProcessed() bool
Asks this WorkerManager whether the current batch is fully processed. Returns true if so, false otherwise.
func (*WorkerManager) Start ¶
func (wm *WorkerManager) Start()
Starts processing incoming batches with this WorkerManager. Processing is possible only in batch-at-once mode. It also launches an offset committer routine. Call to this method blocks.
func (*WorkerManager) Stop ¶
func (wm *WorkerManager) Stop() chan bool
Tells this WorkerManager to finish processing current batch, stop accepting new work and shut down. This method returns immediately and returns a channel which will get the value once the shut down is finished.
func (*WorkerManager) String ¶
func (wm *WorkerManager) String() string
func (*WorkerManager) UpdateLargestOffset ¶
func (wm *WorkerManager) UpdateLargestOffset(offset int64)
Updates the highest offset that has been processed by this WorkerManager with a new value.
type WorkerResult ¶
type WorkerResult interface { // Returns an id of task that was processed. Id() TaskId // Returns true if processing succeeded, false otherwise. Success() bool }
Interface that represents a result of processing incoming message.
type WorkerStrategy ¶
type WorkerStrategy func(*Worker, *Message, TaskId) WorkerResult
Defines what to do with a single Kafka message. Returns a WorkerResult to distinguish successful and unsuccessful processings.
type ZookeeperConfig ¶
type ZookeeperConfig struct { /* Zookeeper hosts */ ZookeeperConnect []string /* Zookeeper session timeout */ ZookeeperSessionTimeout time.Duration /* Max retries for any request except CommitOffset. CommitOffset is controlled by ConsumerConfig.OffsetsCommitMaxRetries. */ MaxRequestRetries int /* Backoff to retry any request */ RequestBackoff time.Duration /* kafka Root */ Root string // PanicHandler is a function that will be called when unrecoverable error occurs to give the possibility to perform cleanups, recover from panic etc PanicHandler func(error) }
ZookeeperConfig is used to pass multiple configuration entries to ZookeeperCoordinator.
func NewZookeeperConfig ¶
func NewZookeeperConfig() *ZookeeperConfig
Created a new ZookeeperConfig with sane defaults. Default ZookeeperConnect points to localhost.
func ZookeeperConfigFromFile ¶
func ZookeeperConfigFromFile(filename string) (*ZookeeperConfig, error)
ZookeeperConfigFromFile is a helper function that loads zookeeper configuration information from file. The file accepts the following fields:
zookeeper.connect zookeeper.kafka.root zookeeper.connection.timeout zookeeper.max.request.retries zookeeper.request.backoff
The configuration file entries should be constructed in key=value syntax. A # symbol at the beginning of a line indicates a comment. Blank lines are ignored. The file should end with a newline character.
type ZookeeperCoordinator ¶
type ZookeeperCoordinator struct {
// contains filtered or unexported fields
}
ZookeeperCoordinator implements ConsumerCoordinator and OffsetStorage interfaces and is used to coordinate multiple consumers that work within the same consumer group as well as storing and retrieving their offsets.
func NewZookeeperCoordinator ¶
func NewZookeeperCoordinator(Config *ZookeeperConfig) *ZookeeperCoordinator
Creates a new ZookeeperCoordinator with a given configuration. The new created ZookeeperCoordinator does NOT automatically connect to zookeeper, you should call Connect() explicitly
func (*ZookeeperCoordinator) AwaitOnStateBarrier ¶
func (*ZookeeperCoordinator) ClaimPartitionOwnership ¶
func (this *ZookeeperCoordinator) ClaimPartitionOwnership(Groupid string, Topic string, Partition int32, consumerThreadId ConsumerThreadId) (bool, error)
Tells the ConsumerCoordinator to claim partition topic Topic and partition Partition for consumerThreadId fetcher that works within a consumer group Group. Returns true if claim is successful, false and error explaining failure otherwise.
func (*ZookeeperCoordinator) CommitOffset ¶
func (this *ZookeeperCoordinator) CommitOffset(Groupid string, Topic string, Partition int32, Offset int64) error
Tells the ConsumerCoordinator to commit offset Offset for topic and partition TopicPartition for consumer group Groupid. Returns error if failed to commit offset.
func (*ZookeeperCoordinator) Connect ¶
func (this *ZookeeperCoordinator) Connect() (err error)
Establish connection to this ConsumerCoordinator. Returns an error if fails to connect, nil otherwise.
func (*ZookeeperCoordinator) DeregisterConsumer ¶
func (this *ZookeeperCoordinator) DeregisterConsumer(Consumerid string, Groupid string) (err error)
Deregisters consumer with Consumerid id that is a part of consumer group Groupid form this ConsumerCoordinator. Returns an error if deregistration failed, nil otherwise.
func (*ZookeeperCoordinator) Disconnect ¶
func (this *ZookeeperCoordinator) Disconnect()
func (*ZookeeperCoordinator) GetAllBrokers ¶
func (this *ZookeeperCoordinator) GetAllBrokers() (brokers []*BrokerInfo, err error)
Gets the information about all Kafka brokers registered in this ConsumerCoordinator. Returns a slice of BrokerInfo and error on failure.
func (*ZookeeperCoordinator) GetAllTopics ¶
func (this *ZookeeperCoordinator) GetAllTopics() (topics []string, err error)
Gets the list of all topics registered in this ConsumerCoordinator. Returns a slice conaining topic names and error on failure.
func (*ZookeeperCoordinator) GetBlueGreenRequest ¶
func (this *ZookeeperCoordinator) GetBlueGreenRequest(Group string) (topics map[string]*BlueGreenDeployment, err error)
Gets all deployed topics for consume group Group from consumer coordinator. Returns a map where keys are notification ids and values are DeployedTopics. May also return an error (e.g. if failed to reach coordinator).
func (*ZookeeperCoordinator) GetConsumerInfo ¶
func (this *ZookeeperCoordinator) GetConsumerInfo(Consumerid string, Groupid string) (info *ConsumerInfo, err error)
Gets the information about consumer with Consumerid id that is a part of consumer group Groupid from this ConsumerCoordinator. Returns ConsumerInfo on success and error otherwise (For example if consumer with given Consumerid does not exist).
func (*ZookeeperCoordinator) GetConsumersInGroup ¶
func (this *ZookeeperCoordinator) GetConsumersInGroup(Groupid string) (consumers []string, err error)
Gets the list of all consumer ids within a consumer group Groupid. Returns a slice containing all consumer ids in group and error on failure.
func (*ZookeeperCoordinator) GetConsumersPerTopic ¶
func (this *ZookeeperCoordinator) GetConsumersPerTopic(Groupid string, ExcludeInternalTopics bool) (consumers map[string][]ConsumerThreadId, err error)
Gets the information about consumers per topic in consumer group Groupid excluding internal topics (such as offsets) if ExcludeInternalTopics = true. Returns a map where keys are topic names and values are slices of consumer ids and fetcher ids associated with this topic and error on failure.
func (*ZookeeperCoordinator) GetOffset ¶
func (this *ZookeeperCoordinator) GetOffset(Groupid string, topic string, partition int32) (offset int64, err error)
Gets the offset for a given topic, partition and consumer group. Returns offset on sucess, error otherwise.
func (*ZookeeperCoordinator) GetPartitionsForTopics ¶
func (this *ZookeeperCoordinator) GetPartitionsForTopics(Topics []string) (partitions map[string][]int32, err error)
Gets the information about existing partitions for a given Topics. Returns a map where keys are topic names and values are slices of partition ids associated with this topic and error on failure.
func (*ZookeeperCoordinator) RegisterConsumer ¶
func (this *ZookeeperCoordinator) RegisterConsumer(Consumerid string, Groupid string, TopicCount TopicsToNumStreams) (err error)
Registers a new consumer with Consumerid id and TopicCount subscription that is a part of consumer group Groupid in this ConsumerCoordinator. Returns an error if registration failed, nil otherwise.
func (*ZookeeperCoordinator) ReleasePartitionOwnership ¶
func (this *ZookeeperCoordinator) ReleasePartitionOwnership(Groupid string, Topic string, Partition int32) error
Tells the ConsumerCoordinator to release partition ownership on topic Topic and partition Partition for consumer group Groupid. Returns error if failed to released partition ownership.
func (*ZookeeperCoordinator) RemoveOldApiRequests ¶
func (this *ZookeeperCoordinator) RemoveOldApiRequests(group string) (err error)
func (*ZookeeperCoordinator) RemoveStateBarrier ¶
func (this *ZookeeperCoordinator) RemoveStateBarrier(group string, stateHash string, api string) error
func (*ZookeeperCoordinator) RequestBlueGreenDeployment ¶
func (this *ZookeeperCoordinator) RequestBlueGreenDeployment(blue BlueGreenDeployment, green BlueGreenDeployment) error
func (*ZookeeperCoordinator) String ¶
func (this *ZookeeperCoordinator) String() string
func (*ZookeeperCoordinator) SubscribeForChanges ¶
func (this *ZookeeperCoordinator) SubscribeForChanges(Groupid string) (events <-chan CoordinatorEvent, err error)
Subscribes for any change that should trigger consumer rebalance on consumer group Groupid in this ConsumerCoordinator. Returns a read-only channel of booleans that will get values on any significant coordinator event (e.g. new consumer appeared, new broker appeared etc.) and error if failed to subscribe.
func (*ZookeeperCoordinator) Unsubscribe ¶
func (this *ZookeeperCoordinator) Unsubscribe()
Tells the ConsumerCoordinator to unsubscribe from events for the consumer it is associated with.
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
Godeps
|
|
_workspace/src/github.com/Shopify/sarama
Package sarama provides client libraries for the Kafka 0.8 protocol.
|
Package sarama provides client libraries for the Kafka 0.8 protocol. |
_workspace/src/github.com/Shopify/sarama/mocks
Package mocks provides mocks that can be used for testing applications that use Sarama.
|
Package mocks provides mocks that can be used for testing applications that use Sarama. |
_workspace/src/github.com/cihub/seelog
Package seelog implements logging functionality with flexible dispatching, filtering, and formatting.
|
Package seelog implements logging functionality with flexible dispatching, filtering, and formatting. |
_workspace/src/github.com/eapache/go-resiliency/breaker
Package breaker implements the circuit-breaker resiliency pattern for Go.
|
Package breaker implements the circuit-breaker resiliency pattern for Go. |
_workspace/src/github.com/eapache/queue
Package queue provides a fast, ring-buffer queue based on the version suggested by Dariusz Górecki.
|
Package queue provides a fast, ring-buffer queue based on the version suggested by Dariusz Górecki. |
_workspace/src/github.com/golang/snappy
Package snappy implements the snappy block-based compression format.
|
Package snappy implements the snappy block-based compression format. |
_workspace/src/github.com/rcrowley/go-metrics
Go port of Coda Hale's Metrics library <https://github.com/rcrowley/go-metrics> Coda Hale's original work: <https://github.com/codahale/metrics>
|
Go port of Coda Hale's Metrics library <https://github.com/rcrowley/go-metrics> Coda Hale's original work: <https://github.com/codahale/metrics> |
_workspace/src/github.com/rcrowley/go-metrics/exp
Hook go-metrics into expvar on any /debug/metrics request, load all vars from the registry into expvar, and execute regular expvar handler
|
Hook go-metrics into expvar on any /debug/metrics request, load all vars from the registry into expvar, and execute regular expvar handler |
_workspace/src/github.com/rcrowley/go-metrics/stathat
Metrics output to StatHat.
|
Metrics output to StatHat. |
_workspace/src/github.com/samuel/go-zookeeper/zk
Package zk is a native Go client library for the ZooKeeper orchestration service.
|
Package zk is a native Go client library for the ZooKeeper orchestration service. |
_workspace/src/gopkg.in/cihub/seelog.v2
Package seelog implements logging functionality with flexible dispatching, filtering, and formatting.
|
Package seelog implements logging functionality with flexible dispatching, filtering, and formatting. |