Documentation ¶
Overview ¶
Package go_kafka_client provides a high-level Kafka consumer implementation that takes a different approach from the Java/Scala high-level consumer. The primary differences are:
- a workers concept that enforces at-least-once processing before offsets are committed;
- an improved rebalancing algorithm that closes obsolete connections and opens new ones without stopping the whole consumer;
- graceful shutdown that notifies the client once it is over;
- batch processing;
- static partition configuration, which lets a consumer start with a predefined set of partitions and never take part in rebalancing.
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

	http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Index ¶
- Constants
- Variables
- func CreateMultiplePartitionsTopic(zk string, topicName string, numPartitions int)
- func Critical(contextName interface{}, message interface{})
- func Criticalf(contextName interface{}, message interface{}, params ...interface{})
- func Debug(contextName interface{}, message interface{})
- func Debugf(contextName interface{}, message interface{}, params ...interface{})
- func Error(contextName interface{}, message interface{})
- func Errorf(contextName interface{}, message interface{}, params ...interface{})
- func Info(contextName interface{}, message interface{})
- func Infof(contextName interface{}, message interface{}, params ...interface{})
- func LoadConfiguration(path string) (map[string]string, error)
- func Trace(contextName interface{}, message interface{})
- func Tracef(contextName interface{}, message interface{}, params ...interface{})
- func Warn(contextName interface{}, message interface{})
- func Warnf(contextName interface{}, message interface{}, params ...interface{})
- type BlackList
- type BrokerInfo
- type Consumer
- type ConsumerConfig
- type ConsumerCoordinator
- type ConsumerInfo
- type ConsumerThreadId
- type CoordinatorEvent
- type DeployedTopics
- type FailedAttemptCallback
- type FailedCallback
- type FailedDecision
- type FailureCounter
- type Message
- type ProcessingFailedResult
- type StaticTopicsToNumStreams
- type SuccessfulResult
- type Task
- type TaskId
- type TestKafkaCluster
- type TestKafkaServer
- type TimedOutResult
- type TopicAndPartition
- type TopicFilter
- type TopicInfo
- type TopicPartitionData
- type TopicSwitch
- type TopicsToNumStreams
- type WhiteList
- type WildcardTopicsToNumStreams
- type Worker
- type WorkerManager
- type WorkerResult
- type WorkerStrategy
- type ZookeeperConfig
- type ZookeeperCoordinator
- func (zc *ZookeeperCoordinator) ClaimPartitionOwnership(Groupid string, Topic string, Partition int32, ...) (bool, error)
- func (zc *ZookeeperCoordinator) CommitOffset(Groupid string, TopicPartition *TopicAndPartition, Offset int64) error
- func (zc *ZookeeperCoordinator) Connect() error
- func (zc *ZookeeperCoordinator) DeployTopics(Group string, Topics DeployedTopics) error
- func (zc *ZookeeperCoordinator) DeregisterConsumer(Consumerid string, Groupid string) error
- func (zc *ZookeeperCoordinator) GetAllBrokers() ([]*BrokerInfo, error)
- func (zc *ZookeeperCoordinator) GetAllTopics() ([]string, error)
- func (zc *ZookeeperCoordinator) GetConsumerInfo(Consumerid string, Groupid string) (*ConsumerInfo, error)
- func (zc *ZookeeperCoordinator) GetConsumersInGroup(Groupid string) ([]string, error)
- func (zc *ZookeeperCoordinator) GetConsumersPerTopic(Groupid string, ExcludeInternalTopics bool) (map[string][]ConsumerThreadId, error)
- func (zc *ZookeeperCoordinator) GetNewDeployedTopics(Group string) ([]*DeployedTopics, error)
- func (zc *ZookeeperCoordinator) GetOffsetForTopicPartition(Groupid string, TopicPartition *TopicAndPartition) (int64, error)
- func (zc *ZookeeperCoordinator) GetPartitionsForTopics(Topics []string) (map[string][]int32, error)
- func (zc *ZookeeperCoordinator) NotifyConsumerGroup(Groupid string, ConsumerId string) error
- func (zc *ZookeeperCoordinator) RegisterConsumer(Consumerid string, Groupid string, TopicCount TopicsToNumStreams) error
- func (zc *ZookeeperCoordinator) ReleasePartitionOwnership(Groupid string, Topic string, Partition int32) error
- func (zc *ZookeeperCoordinator) String() string
- func (zc *ZookeeperCoordinator) SubscribeForChanges(Groupid string) (<-chan CoordinatorEvent, error)
- func (zc *ZookeeperCoordinator) Unsubscribe()
Constants ¶
const (
	InvalidOffset int64 = -1

	// Reset the offset to the smallest offset if it is out of range
	SmallestOffset = "smallest"

	// Reset the offset to the largest offset if it is out of range
	LargestOffset = "largest"

	// Zookeeper offset storage configuration string
	ZookeeperOffsetStorage = "zookeeper"

	// Kafka offset storage configuration string
	KafkaOffsetStorage = "kafka"
)
const (
	/*
		Range partitioning works on a per-topic basis. For each topic, we lay out the available partitions in numeric order and the consumer threads in lexicographic order. We then divide the number of partitions by the total number of consumer streams (threads) to determine the number of partitions to assign to each consumer. If it does not evenly divide, then the first few consumers will have one extra partition.

		For example, suppose there are two consumers C1 and C2 with two streams each, and there are five available partitions (p0, p1, p2, p3, p4). So each consumer thread will get at least one partition and the first consumer thread will get one extra partition. So the assignment will be:
			p0 -> C1-0
			p1 -> C1-0
			p2 -> C1-1
			p3 -> C2-0
			p4 -> C2-1
	*/
	RangeStrategy = "range"

	/*
		The round-robin partition assignor lays out all the available partitions and all the available consumer threads. It then proceeds to do a round-robin assignment from partition to consumer thread. If the subscriptions of all consumer instances are identical, then the partitions will be uniformly distributed. (i.e., the partition ownership counts will be within a delta of exactly one across all consumer threads.)

		(For simplicity of implementation) the assignor is allowed to assign a given topic-partition to any consumer instance and thread-id within that instance. Therefore, round-robin assignment is allowed only if:
			a) Every topic has the same number of streams within a consumer instance
			b) The set of subscribed topics is identical for every consumer instance within the group.
	*/
	RoundRobinStrategy = "roundrobin"
)
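To make the two strategies concrete, here is a simplified, self-contained sketch of both assignment rules as the comments above describe them. It is not the package's implementation: rangeAssign and roundRobinAssign are hypothetical names, thread ids are plain strings in the C1-0 style used in the comment, and the round-robin variant simply sorts topic-partition names to obtain a fixed layout.

```go
package main

import (
	"fmt"
	"sort"
)

// rangeAssign sketches range assignment for one topic: partitions in numeric
// order, threads in lexicographic order, and the first
// len(partitions) % len(threads) threads each receive one extra partition.
func rangeAssign(partitions []int32, threads []string) map[int32]string {
	ps := append([]int32(nil), partitions...)
	sort.Slice(ps, func(i, j int) bool { return ps[i] < ps[j] })
	ts := append([]string(nil), threads...)
	sort.Strings(ts)

	assignment := make(map[int32]string, len(ps))
	if len(ts) == 0 {
		return assignment
	}
	perThread, extra := len(ps)/len(ts), len(ps)%len(ts)
	next := 0
	for i, t := range ts {
		n := perThread
		if i < extra {
			n++ // the first few threads absorb the remainder
		}
		for k := 0; k < n; k++ {
			assignment[ps[next]] = t
			next++
		}
	}
	return assignment
}

// roundRobinAssign sketches round-robin assignment: topic-partitions in a
// fixed (here: sorted) order are dealt one at a time to threads in
// lexicographic order.
func roundRobinAssign(topicPartitions []string, threads []string) map[string]string {
	tps := append([]string(nil), topicPartitions...)
	sort.Strings(tps)
	ts := append([]string(nil), threads...)
	sort.Strings(ts)

	assignment := make(map[string]string, len(tps))
	if len(ts) == 0 {
		return assignment
	}
	for i, tp := range tps {
		assignment[tp] = ts[i%len(ts)]
	}
	return assignment
}

func main() {
	threads := []string{"C1-0", "C1-1", "C2-0", "C2-1"}
	fmt.Println(rangeAssign([]int32{0, 1, 2, 3, 4}, threads))
	fmt.Println(roundRobinAssign([]string{"t-0", "t-1", "t-2", "t-3", "t-4"}, threads))
}
```

With the example from the comment (five partitions, threads C1-0, C1-1, C2-0, C2-1), rangeAssign gives C1-0 two partitions and every other thread one, while roundRobinAssign hands the fifth partition back to C1-0 after one full pass over the threads.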
const (
	Regular          = "Regular"
	NewTopicDeployed = "NewTopicDeployed"
)
Variables ¶
var Logger, _ = log.LoggerFromConfigAsBytes([]byte(config))
Functions ¶
func CreateMultiplePartitionsTopic ¶
func CreateMultiplePartitionsTopic(zk string, topicName string, numPartitions int)
Convenience utility that creates a topic topicName with numPartitions partitions in the Zookeeper located at zk (format: host:port). Note that this requires an Apache Kafka 0.8.1 binary distribution, located through the KAFKA_PATH environment variable.
func Criticalf ¶
func Criticalf(contextName interface{}, message interface{}, params ...interface{})
func LoadConfiguration ¶
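func LoadConfiguration(path string) (map[string]string, error)
LoadConfiguration reads the configuration file at path into a map of string keys to string values. TODO: we need a file -> ConsumerConfig parser, not a file -> map one.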
Types ¶
type BlackList ¶
type BlackList struct {
// contains filtered or unexported fields
}
BlackList is a topic filter that will match every topic that does not match a given regex
func NewBlackList ¶
Creates a new BlackList topic filter for a given regex
type BrokerInfo ¶
BrokerInfo holds general information about a Kafka broker, as stored in the consumer coordinator.
func (*BrokerInfo) String ¶
func (b *BrokerInfo) String() string
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer is a high-level Kafka consumer designed to work within a consumer group. It subscribes to coordinator events and is able to balance load within a consumer group.
func NewConsumer ¶
func NewConsumer(config *ConsumerConfig) *Consumer
NewConsumer creates a new Consumer with a given configuration. Creating a Consumer does not start fetching immediately.
func (*Consumer) Close ¶
Tells the Consumer to close all existing connections and stop. This method does NOT block; instead, it returns a channel that receives a single value once closing has finished.
func (*Consumer) StartStatic ¶
Starts consuming the specified topics using a configured number of goroutines for each topic.
func (*Consumer) StartStaticPartitions ¶
Starts consuming given topic-partitions using ConsumerConfig.NumConsumerFetchers goroutines for each topic.
func (*Consumer) StartWildcard ¶
func (c *Consumer) StartWildcard(topicFilter TopicFilter, numStreams int)
Starts consuming all topics which correspond to a given topicFilter using numStreams goroutines for each topic.
type ConsumerConfig ¶
type ConsumerConfig struct {
	// A string that uniquely identifies a set of consumers within the same consumer group.
	Groupid string

	// A string that uniquely identifies a consumer within a group. Generated automatically if not set.
	// Set this explicitly only for testing purposes.
	Consumerid string

	// The socket timeout for network requests. Its value should be at least FetchWaitMaxMs.
	SocketTimeout time.Duration

	// The maximum number of bytes to attempt to fetch.
	FetchMessageMaxBytes int32

	// The number of goroutines used to fetch data.
	NumConsumerFetchers int

	// Max number of message batches buffered for consumption; each batch can be up to FetchBatchSize.
	QueuedMaxMessages int32

	// Max number of retries during rebalance.
	RebalanceMaxRetries int32

	// The minimum amount of data the server should return for a fetch request.
	// If insufficient data is available, the request will block.
	FetchMinBytes int32

	// The maximum amount of time the server will block before answering the fetch request
	// if there isn't sufficient data to immediately satisfy FetchMinBytes.
	FetchWaitMaxMs int32

	// Backoff time between retries during rebalance.
	RebalanceBackoff time.Duration

	// Backoff time to refresh the leader of a partition after it loses the current leader.
	RefreshLeaderBackoff time.Duration

	// Retry the offset commit up to this many times on failure.
	OffsetsCommitMaxRetries int

	// Try to commit offsets every OffsetCommitInterval. If the previous offset commit for a
	// partition is still in progress, update the next offset to commit and continue. This way,
	// if the coordinator is slow, only the highest offsets are committed rather than the whole
	// offset history.
	OffsetCommitInterval time.Duration

	// Specify whether offsets should be committed to "zookeeper" (default) or "kafka".
	OffsetsStorage string

	// What to do if an offset is out of range.
	// SmallestOffset: automatically reset the offset to the smallest offset.
	// LargestOffset: automatically reset the offset to the largest offset.
	// Defaults to LargestOffset.
	AutoOffsetReset string

	// Client id specified by the Kafka consumer client, used to distinguish different clients.
	Clientid string

	// Whether messages from internal topics (such as offsets) should be excluded,
	// i.e. not exposed to the consumer.
	ExcludeInternalTopics bool

	// Strategy for assigning partitions to consumer streams.
	// Possible values: RangeStrategy, RoundRobinStrategy.
	PartitionAssignmentStrategy string

	// Number of workers per partition to process consumed messages.
	NumWorkers int

	// Number of times a worker retries processing a failed message.
	MaxWorkerRetries int

	// Worker retry threshold. Incremented each time a worker fails to process a message
	// within MaxWorkerRetries. When this threshold is hit within WorkerThresholdTimeWindow,
	// WorkerFailureCallback is called, letting the user decide whether the consumer should stop.
	WorkerRetryThreshold int32

	// Resets WorkerRetryThreshold if it isn't hit within this period.
	WorkerThresholdTimeWindow time.Duration

	// Callback executed when WorkerRetryThreshold is exceeded within WorkerThresholdTimeWindow.
	WorkerFailureCallback FailedCallback

	// Callback executed when a Worker fails to process a message after MaxWorkerRetries
	// and WorkerRetryThreshold has not been hit.
	WorkerFailedAttemptCallback FailedAttemptCallback

	// Worker timeout to process a single message.
	WorkerTaskTimeout time.Duration

	// Backoff between worker attempts to process a single message.
	WorkerBackoff time.Duration

	// Maximum wait time to gracefully stop a worker manager.
	WorkerManagersStopTimeout time.Duration

	// A function that defines a user-specified action on a single message. This function is
	// responsible for the actual message processing. Consumer panics if Strategy is not set.
	Strategy WorkerStrategy

	// Number of messages to accumulate before flushing them to workers.
	FetchBatchSize int

	// Timeout to accumulate messages. Flushes the accumulated batch to workers even if it is
	// not yet full. The timer resets after each flush, so it is not triggered if FetchBatchSize
	// is reached before the timeout.
	FetchBatchTimeout time.Duration

	// Backoff between fetch requests if no messages were fetched by the previous fetch.
	RequeueAskNextBackoff time.Duration

	// Maximum fetch retries if no messages were fetched by the previous fetch.
	FetchMaxRetries int

	// Maximum retries to fetch topic metadata from one broker.
	FetchTopicMetadataRetries int

	// Backoff for the fetch topic metadata request if the previous request failed.
	FetchTopicMetadataBackoff time.Duration

	// Backoff between two fetch requests for one fetch routine. Needed to prevent the fetcher
	// from querying the broker too frequently.
	FetchRequestBackoff time.Duration

	// Coordinator used to coordinate the consumer's actions, e.g. trigger rebalance events,
	// store offsets and consumer metadata, etc.
	Coordinator ConsumerCoordinator

	// Indicates whether the client supports blue-green deployment. This config entry is needed
	// because blue-green deployment won't work with the RoundRobin partition assignment
	// strategy. Defaults to true.
	BlueGreenDeploymentEnabled bool
}
ConsumerConfig defines configuration options for Consumer
func DefaultConsumerConfig ¶
func DefaultConsumerConfig() *ConsumerConfig
DefaultConsumerConfig creates a ConsumerConfig with sane defaults. Note that several required config entries (like Strategy and callbacks) are still not set.
func (*ConsumerConfig) String ¶
func (c *ConsumerConfig) String() string
func (*ConsumerConfig) Validate ¶
func (c *ConsumerConfig) Validate() error
Validates this ConsumerConfig. Returns a corresponding error if the ConsumerConfig is invalid and nil otherwise.
type ConsumerCoordinator ¶
type ConsumerCoordinator interface {
	// Establishes a connection to this ConsumerCoordinator.
	// Returns an error if it fails to connect, nil otherwise.
	Connect() error

	// Registers a new consumer with id Consumerid and subscription TopicCount as part of consumer
	// group Group in this ConsumerCoordinator. Returns an error if registration failed, nil otherwise.
	RegisterConsumer(Consumerid string, Group string, TopicCount TopicsToNumStreams) error

	// Deregisters the consumer with id Consumerid that is part of consumer group Group from this
	// ConsumerCoordinator. Returns an error if deregistration failed, nil otherwise.
	DeregisterConsumer(Consumerid string, Group string) error

	// Gets information about the consumer with id Consumerid that is part of consumer group Group
	// from this ConsumerCoordinator. Returns ConsumerInfo on success and an error otherwise
	// (for example, if a consumer with the given Consumerid does not exist).
	GetConsumerInfo(Consumerid string, Group string) (*ConsumerInfo, error)

	// Gets information about consumers per topic in consumer group Group, excluding internal topics
	// (such as offsets) if ExcludeInternalTopics = true. Returns a map where keys are topic names and
	// values are slices of consumer ids and fetcher ids associated with each topic, and an error on failure.
	GetConsumersPerTopic(Group string, ExcludeInternalTopics bool) (map[string][]ConsumerThreadId, error)

	// Gets the list of all consumer ids within consumer group Group.
	// Returns a slice containing all consumer ids in the group, and an error on failure.
	GetConsumersInGroup(Group string) ([]string, error)

	// Gets the list of all topics registered in this ConsumerCoordinator.
	// Returns a slice containing topic names, and an error on failure.
	GetAllTopics() ([]string, error)

	// Gets information about existing partitions for the given Topics. Returns a map where keys are
	// topic names and values are slices of partition ids associated with each topic, and an error on failure.
	GetPartitionsForTopics(Topics []string) (map[string][]int32, error)

	// Gets information about all Kafka brokers registered in this ConsumerCoordinator.
	// Returns a slice of BrokerInfo, and an error on failure.
	GetAllBrokers() ([]*BrokerInfo, error)

	// Gets the offset for the given TopicPartition and consumer group Group.
	// Returns the offset on success, an error otherwise.
	GetOffsetForTopicPartition(Group string, TopicPartition *TopicAndPartition) (int64, error)

	// TODO not sure if we still need this
	NotifyConsumerGroup(Group string, ConsumerId string) error

	// Subscribes for any change that should trigger a consumer rebalance on consumer group Group in
	// this ConsumerCoordinator, or trigger a topic switch. Returns a read-only channel of CoordinatorEvent
	// that will get values on any significant coordinator event (e.g. a new consumer appeared, a new
	// broker appeared, etc.), and an error if it failed to subscribe.
	SubscribeForChanges(Group string) (<-chan CoordinatorEvent, error)

	GetNewDeployedTopics(Group string) ([]*DeployedTopics, error)

	// Tells the ConsumerCoordinator to unsubscribe from events for the consumer it is associated with.
	Unsubscribe()

	// Tells the ConsumerCoordinator to claim topic Topic and partition Partition for the fetcher
	// ConsumerThreadId that works within consumer group Group. Returns true if the claim is
	// successful, and false with an error explaining the failure otherwise.
	ClaimPartitionOwnership(Group string, Topic string, Partition int32, ConsumerThreadId ConsumerThreadId) (bool, error)

	// Tells the ConsumerCoordinator to release partition ownership of topic Topic and partition
	// Partition for consumer group Group. Returns an error if it failed to release partition ownership.
	ReleasePartitionOwnership(Group string, Topic string, Partition int32) error

	// Tells the ConsumerCoordinator to commit offset Offset for topic and partition TopicPartition
	// for consumer group Group. Returns an error if it failed to commit the offset.
	CommitOffset(Group string, TopicPartition *TopicAndPartition, Offset int64) error
}
ConsumerCoordinator is used to coordinate the actions of multiple consumers within the same consumer group. It is responsible for keeping track of live consumers, managing their offsets and assigning partitions to consume. The current default ConsumerCoordinator is ZookeeperCoordinator; more implementations may be added in the future.
type ConsumerInfo ¶
type ConsumerInfo struct {
	Version int16
	Subscription map[string]int
	Pattern string
	Timestamp int64
}
ConsumerInfo holds general information about a Kafka consumer, as stored in the consumer coordinator.
func (*ConsumerInfo) String ¶
func (c *ConsumerInfo) String() string
type ConsumerThreadId ¶
ConsumerThreadId identifies a consumer routine. The consumer coordinator uses it to keep track of which consumer routine consumes a particular topic-partition.
func (*ConsumerThreadId) String ¶
func (c *ConsumerThreadId) String() string
type CoordinatorEvent ¶
type CoordinatorEvent string
type DeployedTopics ¶
type FailedAttemptCallback ¶
type FailedAttemptCallback func(*Task, WorkerResult) FailedDecision
type FailedCallback ¶
type FailedCallback func(*WorkerManager) FailedDecision
type FailedDecision ¶
type FailedDecision int32
const (
	CommitOffsetAndContinue FailedDecision = iota
	DoNotCommitOffsetAndContinue
	CommitOffsetAndStop
	DoNotCommitOffsetAndStop
)
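As the constant names suggest, each FailedDecision combines two independent choices: whether to commit the offset of the failed message, and whether to stop the consumer. The sketch below spells that out; effects is a hypothetical helper written for illustration, and the FailedDecision declaration is copied locally so the block is self-contained.

```go
package main

import "fmt"

// FailedDecision mirrors the package's type and constants.
type FailedDecision int32

const (
	CommitOffsetAndContinue FailedDecision = iota
	DoNotCommitOffsetAndContinue
	CommitOffsetAndStop
	DoNotCommitOffsetAndStop
)

// effects is a hypothetical helper that decodes a decision into its two
// parts, as implied by the constant names: commit the offset? stop consuming?
func effects(d FailedDecision) (commit, stop bool) {
	switch d {
	case CommitOffsetAndContinue:
		return true, false
	case DoNotCommitOffsetAndContinue:
		return false, false
	case CommitOffsetAndStop:
		return true, true
	case DoNotCommitOffsetAndStop:
		return false, true
	}
	panic(fmt.Sprintf("unknown FailedDecision %d", d))
}

func main() {
	all := []FailedDecision{CommitOffsetAndContinue, DoNotCommitOffsetAndContinue, CommitOffsetAndStop, DoNotCommitOffsetAndStop}
	for _, d := range all {
		commit, stop := effects(d)
		fmt.Printf("decision %d: commit=%v stop=%v\n", d, commit, stop)
	}
}
```

A FailedAttemptCallback that wants to skip a poison message and keep going would, under this reading, return CommitOffsetAndContinue.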
type FailureCounter ¶
type FailureCounter struct {
// contains filtered or unexported fields
}
func NewFailureCounter ¶
func NewFailureCounter(FailedThreshold int32, WorkerThresholdTimeWindow time.Duration) *FailureCounter
func (*FailureCounter) Failed ¶
func (f *FailureCounter) Failed() bool
type ProcessingFailedResult ¶
type ProcessingFailedResult struct {
// contains filtered or unexported fields
}
func NewProcessingFailedResult ¶
func NewProcessingFailedResult(id TaskId) *ProcessingFailedResult
func (*ProcessingFailedResult) Id ¶
func (wr *ProcessingFailedResult) Id() TaskId
func (*ProcessingFailedResult) String ¶
func (sr *ProcessingFailedResult) String() string
func (*ProcessingFailedResult) Success ¶
func (wr *ProcessingFailedResult) Success() bool
type StaticTopicsToNumStreams ¶
TopicsToNumStreams implementation representing a static consumer subscription.
func (*StaticTopicsToNumStreams) GetConsumerThreadIdsPerTopic ¶
func (tc *StaticTopicsToNumStreams) GetConsumerThreadIdsPerTopic() map[string][]ConsumerThreadId
Creates a map describing consumer subscription where keys are topic names and values are slices of ConsumerThreadIds used to fetch these topics.
func (*StaticTopicsToNumStreams) GetTopicsToNumStreamsMap ¶
func (tc *StaticTopicsToNumStreams) GetTopicsToNumStreamsMap() map[string]int
Creates a map describing consumer subscription where keys are topic names and values are number of fetchers used to fetch these topics.
func (*StaticTopicsToNumStreams) Pattern ¶
func (tc *StaticTopicsToNumStreams) Pattern() string
Returns a pattern describing this TopicsToNumStreams.
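GetConsumerThreadIdsPerTopic expands the topic-to-stream-count map into per-topic thread ids. A sketch of that expansion, assuming thread ids are numbered 0..n-1 per consumer and printed as consumerId-n (the C1-0 style used in the RangeStrategy comment); threadId and threadIdsPerTopic are hypothetical stand-ins for ConsumerThreadId and the real method.

```go
package main

import (
	"fmt"
	"sort"
)

// threadId is a hypothetical stand-in for ConsumerThreadId.
type threadId struct {
	Consumer string
	ThreadId int
}

func (t threadId) String() string { return fmt.Sprintf("%s-%d", t.Consumer, t.ThreadId) }

// threadIdsPerTopic expands topic -> number of streams into
// topic -> thread ids, numbering the streams 0..n-1 for one consumer.
func threadIdsPerTopic(consumerId string, topicsToNumStreams map[string]int) map[string][]threadId {
	result := make(map[string][]threadId, len(topicsToNumStreams))
	for topic, n := range topicsToNumStreams {
		ids := make([]threadId, 0, n)
		for i := 0; i < n; i++ {
			ids = append(ids, threadId{Consumer: consumerId, ThreadId: i})
		}
		result[topic] = ids
	}
	return result
}

func main() {
	ids := threadIdsPerTopic("consumer-1", map[string]int{"logs": 2, "metrics": 1})
	topics := make([]string, 0, len(ids))
	for t := range ids {
		topics = append(topics, t)
	}
	sort.Strings(topics) // map iteration order is random; sort for stable output
	for _, t := range topics {
		fmt.Println(t, ids[t])
	}
}
```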
type SuccessfulResult ¶
type SuccessfulResult struct {
// contains filtered or unexported fields
}
func NewSuccessfulResult ¶
func NewSuccessfulResult(id TaskId) *SuccessfulResult
func (*SuccessfulResult) Id ¶
func (wr *SuccessfulResult) Id() TaskId
func (*SuccessfulResult) String ¶
func (sr *SuccessfulResult) String() string
func (*SuccessfulResult) Success ¶
func (wr *SuccessfulResult) Success() bool
type TaskId ¶
type TaskId struct {
	TopicPartition TopicAndPartition
	Offset int64
}
type TestKafkaCluster ¶
type TestKafkaCluster struct {
	Path string
	Servers []*TestKafkaServer
}
func StartTestKafkaCluster ¶
func StartTestKafkaCluster(size int, zookeeperPort int) (*TestKafkaCluster, error)
func (*TestKafkaCluster) Stop ¶
func (c *TestKafkaCluster) Stop()
type TestKafkaServer ¶
func (*TestKafkaServer) Addr ¶
func (k *TestKafkaServer) Addr() string
type TimedOutResult ¶
type TimedOutResult struct {
// contains filtered or unexported fields
}
func (*TimedOutResult) Id ¶
func (wr *TimedOutResult) Id() TaskId
func (*TimedOutResult) String ¶
func (sr *TimedOutResult) String() string
func (*TimedOutResult) Success ¶
func (wr *TimedOutResult) Success() bool
type TopicAndPartition ¶
Type representing a single Kafka topic and partition
func (*TopicAndPartition) String ¶
func (tp *TopicAndPartition) String() string
type TopicFilter ¶
type TopicFilter interface {
// contains filtered or unexported methods
}
Either a WhiteList or BlackList consumer topic filter.
type TopicPartitionData ¶
type TopicPartitionData struct {
	TopicPartition TopicAndPartition
	Data *sarama.FetchResponseBlock
}
Data fetched from a Kafka broker for a particular topic and partition.
type TopicSwitch ¶
type TopicSwitch struct {
	ConsumerId string
	DesiredPattern string
	TopicsToNumStreamsMap map[string]int
}
TODO ???
func (*TopicSwitch) GetConsumerThreadIdsPerTopic ¶
func (tc *TopicSwitch) GetConsumerThreadIdsPerTopic() map[string][]ConsumerThreadId
func (*TopicSwitch) GetTopicsToNumStreamsMap ¶
func (tc *TopicSwitch) GetTopicsToNumStreamsMap() map[string]int
func (*TopicSwitch) Pattern ¶
func (tc *TopicSwitch) Pattern() string
type TopicsToNumStreams ¶
type TopicsToNumStreams interface {
	// Creates a map describing consumer subscription where keys are topic names and values are number of fetchers used to fetch these topics.
	GetTopicsToNumStreamsMap() map[string]int

	// Creates a map describing consumer subscription where keys are topic names and values are slices of ConsumerThreadIds used to fetch these topics.
	GetConsumerThreadIdsPerTopic() map[string][]ConsumerThreadId

	// Returns a pattern describing this TopicsToNumStreams.
	Pattern() string
}
TopicsToNumStreams describes a Consumer's subscription, as stored in the consumer coordinator.
func NewTopicsToNumStreams ¶
func NewTopicsToNumStreams(Groupid string, Consumerid string, Coordinator ConsumerCoordinator, ExcludeInternalTopics bool) (TopicsToNumStreams, error)
Constructs a new TopicsToNumStreams for consumer with Consumerid id that works within consumer group Groupid. Uses Coordinator to get consumer information. Returns error if fails to retrieve consumer information from Coordinator.
type WhiteList ¶
type WhiteList struct {
// contains filtered or unexported fields
}
WhiteList is a topic filter that will match every topic that matches a given regex
func NewWhiteList ¶
Creates a new WhiteList topic filter for a given regex
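WhiteList and BlackList differ only in whether a regex match admits or rejects a topic. The sketch below models that with the standard regexp package; topicFilter, whiteList, blackList and their constructors are hypothetical stand-ins (the real TopicFilter methods are unexported), and anchoring the expression to the whole topic name is a choice made here, not documented behavior.

```go
package main

import (
	"fmt"
	"regexp"
)

// topicFilter is a hypothetical stand-in for the package's TopicFilter.
type topicFilter interface {
	allowed(topic string) bool
}

type whiteList struct{ re *regexp.Regexp }
type blackList struct{ re *regexp.Regexp }

// A whitelist admits topics that match; a blacklist admits topics that don't.
func (w whiteList) allowed(topic string) bool { return w.re.MatchString(topic) }
func (b blackList) allowed(topic string) bool { return !b.re.MatchString(topic) }

// The expression is anchored so that "logs" does not also match "logs-archive".
func newWhiteList(expr string) whiteList { return whiteList{regexp.MustCompile("^(?:" + expr + ")$")} }
func newBlackList(expr string) blackList { return blackList{regexp.MustCompile("^(?:" + expr + ")$")} }

// filterTopics keeps the topics the filter admits, preserving their order.
func filterTopics(f topicFilter, topics []string) []string {
	var out []string
	for _, t := range topics {
		if f.allowed(t) {
			out = append(out, t)
		}
	}
	return out
}

func main() {
	topics := []string{"logs", "logs-archive", "metrics", "__consumer_offsets"}
	fmt.Println(filterTopics(newWhiteList("logs|metrics"), topics))
	fmt.Println(filterTopics(newBlackList("__.*"), topics))
}
```

In the package, a filter like this would be passed to Consumer.StartWildcard together with the number of streams per matched topic.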
type WildcardTopicsToNumStreams ¶
type WildcardTopicsToNumStreams struct {
	Coordinator ConsumerCoordinator
	ConsumerId string
	TopicFilter TopicFilter
	NumStreams int
	ExcludeInternalTopics bool
}
TopicsToNumStreams implementation representing either whitelist or blacklist consumer subscription.
func (*WildcardTopicsToNumStreams) GetConsumerThreadIdsPerTopic ¶
func (tc *WildcardTopicsToNumStreams) GetConsumerThreadIdsPerTopic() map[string][]ConsumerThreadId
Creates a map describing consumer subscription where keys are topic names and values are slices of ConsumerThreadIds used to fetch these topics.
func (*WildcardTopicsToNumStreams) GetTopicsToNumStreamsMap ¶
func (tc *WildcardTopicsToNumStreams) GetTopicsToNumStreamsMap() map[string]int
Creates a map describing consumer subscription where keys are topic names and values are number of fetchers used to fetch these topics.
func (*WildcardTopicsToNumStreams) Pattern ¶
func (tc *WildcardTopicsToNumStreams) Pattern() string
Returns a pattern describing this TopicsToNumStreams.
type Worker ¶
type Worker struct {
	OutputChannel chan WorkerResult
	TaskTimeout time.Duration
	Closed bool
}
func (*Worker) Start ¶
func (w *Worker) Start(task *Task, strategy WorkerStrategy)
type WorkerManager ¶
type WorkerManager struct {
	Id string
	Config *ConsumerConfig
	Strategy WorkerStrategy
	FailureHook FailedCallback
	FailedAttemptHook FailedAttemptCallback
	Workers []*Worker
	AvailableWorkers chan *Worker
	CurrentBatch map[TaskId]*Task //TODO inspect for race conditions
	InputChannel chan []*Message
	TopicPartition TopicAndPartition
	LargestOffset int64
	FailCounter *FailureCounter
	// contains filtered or unexported fields
}
func NewWorkerManager ¶
func NewWorkerManager(id string, config *ConsumerConfig, topicPartition TopicAndPartition, wmsIdleTimer metrics.Timer, batchDurationTimer metrics.Timer, activeWorkersCounter metrics.Counter, pendingWMsTasksCounter metrics.Counter) *WorkerManager
func (*WorkerManager) IsBatchProcessed ¶
func (wm *WorkerManager) IsBatchProcessed() bool
func (*WorkerManager) Start ¶
func (wm *WorkerManager) Start()
func (*WorkerManager) Stop ¶
func (wm *WorkerManager) Stop() chan bool
func (*WorkerManager) String ¶
func (wm *WorkerManager) String() string
type WorkerResult ¶
type WorkerStrategy ¶
type WorkerStrategy func(*Worker, *Message, TaskId) WorkerResult
type ZookeeperConfig ¶
type ZookeeperConfig struct {
	// Zookeeper hosts
	ZookeeperConnect []string

	// Zookeeper read timeout
	ZookeeperTimeout time.Duration

	// Max retries to claim one partition
	MaxClaimPartitionRetries int

	// Backoff to retry to claim partition
	ClaimPartitionBackoff time.Duration
}
ZookeeperConfig is used to pass multiple configuration entries to ZookeeperCoordinator.
func NewZookeeperConfig ¶
func NewZookeeperConfig() *ZookeeperConfig
Creates a new ZookeeperConfig with sane defaults. The default ZookeeperConnect points to localhost.
type ZookeeperCoordinator ¶
type ZookeeperCoordinator struct {
// contains filtered or unexported fields
}
ZookeeperCoordinator implements ConsumerCoordinator interface and is used to coordinate multiple consumers that work within the same consumer group.
func NewZookeeperCoordinator ¶
func NewZookeeperCoordinator(Config *ZookeeperConfig) *ZookeeperCoordinator
Creates a new ZookeeperCoordinator with a given configuration.
func (*ZookeeperCoordinator) ClaimPartitionOwnership ¶
func (zc *ZookeeperCoordinator) ClaimPartitionOwnership(Groupid string, Topic string, Partition int32, consumerThreadId ConsumerThreadId) (bool, error)
Tells the ConsumerCoordinator to claim topic Topic and partition Partition for the fetcher consumerThreadId that works within consumer group Groupid. Returns true if the claim is successful, and false with an error explaining the failure otherwise.
func (*ZookeeperCoordinator) CommitOffset ¶
func (zc *ZookeeperCoordinator) CommitOffset(Groupid string, TopicPartition *TopicAndPartition, Offset int64) error
Tells the ConsumerCoordinator to commit offset Offset for topic and partition TopicPartition for consumer group Groupid. Returns error if failed to commit offset.
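OffsetCommitInterval in ConsumerConfig describes how commits are coalesced: only the highest processed offset per partition is sent, rather than every intermediate offset. A minimal sketch of that bookkeeping, assuming a commit function with the same role as CommitOffset; offsetCommitter, ack and tick are hypothetical names, and tick would be driven by a ticker firing every OffsetCommitInterval.

```go
package main

import (
	"fmt"
	"sync"
)

// offsetCommitter keeps, per topic-partition, the highest offset that is
// waiting to be committed. Acks that arrive while a commit is in progress
// simply raise the next offset to commit.
type offsetCommitter struct {
	mu      sync.Mutex
	pending map[string]int64 // topic-partition -> highest offset awaiting commit
	commit  func(tp string, offset int64) error
}

func newOffsetCommitter(commit func(string, int64) error) *offsetCommitter {
	return &offsetCommitter{pending: make(map[string]int64), commit: commit}
}

// ack records that offset was processed; lower or equal offsets are ignored.
func (c *offsetCommitter) ack(tp string, offset int64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if cur, ok := c.pending[tp]; !ok || offset > cur {
		c.pending[tp] = offset
	}
}

// tick commits the highest pending offset per partition. A failed commit is
// re-queued unless a higher offset has been acked in the meantime.
func (c *offsetCommitter) tick() {
	c.mu.Lock()
	batch := c.pending
	c.pending = make(map[string]int64)
	c.mu.Unlock()
	for tp, off := range batch {
		if err := c.commit(tp, off); err != nil {
			c.ack(tp, off) // retry on the next tick
		}
	}
}

func main() {
	oc := newOffsetCommitter(func(tp string, offset int64) error {
		fmt.Printf("commit %s at offset %d\n", tp, offset)
		return nil
	})
	for _, off := range []int64{100, 101, 102} {
		oc.ack("logs-0", off)
	}
	oc.ack("logs-1", 7)
	oc.tick() // one commit per partition: logs-0 at 102, logs-1 at 7 (map order varies)
}
```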
func (*ZookeeperCoordinator) Connect ¶
func (zc *ZookeeperCoordinator) Connect() error
Establishes a connection to this ConsumerCoordinator. Returns an error if it fails to connect, nil otherwise.
func (*ZookeeperCoordinator) DeployTopics ¶
func (zc *ZookeeperCoordinator) DeployTopics(Group string, Topics DeployedTopics) error
func (*ZookeeperCoordinator) DeregisterConsumer ¶
func (zc *ZookeeperCoordinator) DeregisterConsumer(Consumerid string, Groupid string) error
Deregisters the consumer with id Consumerid that is part of consumer group Groupid from this ConsumerCoordinator. Returns an error if deregistration failed, nil otherwise.
func (*ZookeeperCoordinator) GetAllBrokers ¶
func (zc *ZookeeperCoordinator) GetAllBrokers() ([]*BrokerInfo, error)
Gets the information about all Kafka brokers registered in this ConsumerCoordinator. Returns a slice of BrokerInfo and error on failure.
func (*ZookeeperCoordinator) GetAllTopics ¶
func (zc *ZookeeperCoordinator) GetAllTopics() ([]string, error)
Gets the list of all topics registered in this ConsumerCoordinator. Returns a slice containing topic names, and an error on failure.
func (*ZookeeperCoordinator) GetConsumerInfo ¶
func (zc *ZookeeperCoordinator) GetConsumerInfo(Consumerid string, Groupid string) (*ConsumerInfo, error)
Gets the information about consumer with Consumerid id that is a part of consumer group Groupid from this ConsumerCoordinator. Returns ConsumerInfo on success and error otherwise (For example if consumer with given Consumerid does not exist).
func (*ZookeeperCoordinator) GetConsumersInGroup ¶
func (zc *ZookeeperCoordinator) GetConsumersInGroup(Groupid string) ([]string, error)
Gets the list of all consumer ids within a consumer group Groupid. Returns a slice containing all consumer ids in group and error on failure.
func (*ZookeeperCoordinator) GetConsumersPerTopic ¶
func (zc *ZookeeperCoordinator) GetConsumersPerTopic(Groupid string, ExcludeInternalTopics bool) (map[string][]ConsumerThreadId, error)
Gets the information about consumers per topic in consumer group Groupid excluding internal topics (such as offsets) if ExcludeInternalTopics = true. Returns a map where keys are topic names and values are slices of consumer ids and fetcher ids associated with this topic and error on failure.
func (*ZookeeperCoordinator) GetNewDeployedTopics ¶
func (zc *ZookeeperCoordinator) GetNewDeployedTopics(Group string) ([]*DeployedTopics, error)
func (*ZookeeperCoordinator) GetOffsetForTopicPartition ¶
func (zc *ZookeeperCoordinator) GetOffsetForTopicPartition(Groupid string, TopicPartition *TopicAndPartition) (int64, error)
Gets the offset for the given TopicPartition and consumer group Groupid. Returns the offset on success, an error otherwise.
func (*ZookeeperCoordinator) GetPartitionsForTopics ¶
func (zc *ZookeeperCoordinator) GetPartitionsForTopics(Topics []string) (map[string][]int32, error)
Gets information about existing partitions for the given Topics. Returns a map where keys are topic names and values are slices of partition ids associated with each topic, and an error on failure.
func (*ZookeeperCoordinator) NotifyConsumerGroup ¶
func (zc *ZookeeperCoordinator) NotifyConsumerGroup(Groupid string, ConsumerId string) error
TODO not sure if we need this
func (*ZookeeperCoordinator) RegisterConsumer ¶
func (zc *ZookeeperCoordinator) RegisterConsumer(Consumerid string, Groupid string, TopicCount TopicsToNumStreams) error
Registers a new consumer with Consumerid id and TopicCount subscription that is a part of consumer group Groupid in this ConsumerCoordinator. Returns an error if registration failed, nil otherwise.
func (*ZookeeperCoordinator) ReleasePartitionOwnership ¶
func (zc *ZookeeperCoordinator) ReleasePartitionOwnership(Groupid string, Topic string, Partition int32) error
Tells the ConsumerCoordinator to release partition ownership of topic Topic and partition Partition for consumer group Groupid. Returns an error if it failed to release partition ownership.
func (*ZookeeperCoordinator) String ¶
func (zc *ZookeeperCoordinator) String() string
func (*ZookeeperCoordinator) SubscribeForChanges ¶
func (zc *ZookeeperCoordinator) SubscribeForChanges(Groupid string) (<-chan CoordinatorEvent, error)
Subscribes for any change that should trigger a consumer rebalance on consumer group Groupid in this ConsumerCoordinator. Returns a read-only channel of CoordinatorEvent that will get values on any significant coordinator event (e.g. a new consumer appeared, a new broker appeared, etc.), and an error if it failed to subscribe.
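A consumer typically reads the channel returned by SubscribeForChanges in a loop. The sketch below assumes that Regular and NewTopicDeployed are the CoordinatorEvent values delivered on it (suggested by the constant names but not documented), and that the channel is closed when the subscription ends; handleEvents and its two hooks are hypothetical.

```go
package main

import "fmt"

// CoordinatorEvent mirrors the package's string type; typing the constants
// as CoordinatorEvent is an assumption made for this sketch.
type CoordinatorEvent string

const (
	Regular          CoordinatorEvent = "Regular"
	NewTopicDeployed CoordinatorEvent = "NewTopicDeployed"
)

// handleEvents drains events until the channel is closed and dispatches
// each one to a rebalance hook or a topic-switch hook.
func handleEvents(events <-chan CoordinatorEvent, rebalance, switchTopics func()) {
	for e := range events {
		switch e {
		case Regular:
			rebalance()
		case NewTopicDeployed:
			switchTopics()
		default:
			fmt.Println("ignoring unknown event", e)
		}
	}
}

func main() {
	events := make(chan CoordinatorEvent, 2)
	events <- Regular
	events <- NewTopicDeployed
	close(events) // stands in for the end of the subscription
	handleEvents(events,
		func() { fmt.Println("rebalancing") },
		func() { fmt.Println("switching topics") })
}
```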
func (*ZookeeperCoordinator) Unsubscribe ¶
func (zc *ZookeeperCoordinator) Unsubscribe()
Tells the ConsumerCoordinator to unsubscribe from events for the consumer it is associated with.