Documentation ¶
Overview ¶
Package go_kafka_client provides a high-level Kafka consumer implementation that takes a different approach from the Java/Scala high-level consumer.
The primary differences are: a worker concept that enforces at-least-once processing before offsets are committed; an improved rebalancing algorithm that closes obsolete connections and opens new ones without stopping the whole consumer; graceful shutdown that notifies the client when it has finished; batch processing; and static partition configuration, which lets a consumer start with a predefined set of partitions and never take part in rebalancing.
Index ¶
- Constants
- Variables
- func BootstrapBrokers(coordinator ConsumerCoordinator) ([]string, error)
- func CreateMultiplePartitionsTopic(zk string, topicName string, numPartitions int)
- func Critical(tag interface{}, message interface{})
- func Criticalf(tag interface{}, message interface{}, params ...interface{})
- func Debug(tag interface{}, message interface{})
- func Debugf(tag interface{}, message interface{}, params ...interface{})
- func EnsureHasLeader(zkConnect string, topic string)
- func Error(tag interface{}, message interface{})
- func Errorf(tag interface{}, message interface{}, params ...interface{})
- func Info(tag interface{}, message interface{})
- func Infof(tag interface{}, message interface{}, params ...interface{})
- func LoadConfiguration(Path string) (map[string]string, error)
- func Trace(tag interface{}, message interface{})
- func Tracef(tag interface{}, message interface{}, params ...interface{})
- func Warn(tag interface{}, message interface{})
- func Warnf(tag interface{}, message interface{}, params ...interface{})
- type BlackList
- type BlueGreenDeployment
- type BrokerInfo
- type ByteDecoder
- type ByteEncoder
- type CachedSchemaRegistryClient
- func (this *CachedSchemaRegistryClient) GetByID(id int32) (avro.Schema, error)
- func (this *CachedSchemaRegistryClient) GetLatestSchemaMetadata(subject string) (*SchemaMetadata, error)
- func (this *CachedSchemaRegistryClient) GetVersion(subject string, schema avro.Schema) (int32, error)
- func (this *CachedSchemaRegistryClient) Register(subject string, schema avro.Schema) (int32, error)
- type CompatibilityLevel
- type ConstantPartitioner
- type Consumer
- func (c *Consumer) Close() <-chan bool
- func (c *Consumer) StartStatic(topicCountMap map[string]int)
- func (c *Consumer) StartStaticPartitions(topicPartitionMap map[string][]int32)
- func (c *Consumer) StartWildcard(topicFilter TopicFilter, numStreams int)
- func (c *Consumer) StateSnapshot() *StateSnapshot
- func (c *Consumer) String() string
- type ConsumerConfig
- type ConsumerCoordinator
- type ConsumerGroupApi
- type ConsumerInfo
- type ConsumerThreadId
- type CoordinatorEvent
- type Decoder
- type DefaultLogger
- func (dl *DefaultLogger) Critical(message string, params ...interface{})
- func (dl *DefaultLogger) Debug(message string, params ...interface{})
- func (dl *DefaultLogger) Error(message string, params ...interface{})
- func (dl *DefaultLogger) Info(message string, params ...interface{})
- func (dl *DefaultLogger) Trace(message string, params ...interface{})
- func (dl *DefaultLogger) Warn(message string, params ...interface{})
- type Encoder
- type ErrorMessage
- type FailedAttemptCallback
- type FailedCallback
- type FailedDecision
- type FailedMessage
- type FailureCounter
- type FixedPartitioner
- type GetSchemaResponse
- type GetSubjectVersionResponse
- type HashPartitioner
- type Int32Decoder
- type Int32Encoder
- type KafkaAvroDecoder
- type KafkaAvroEncoder
- type KafkaLogger
- type LogLevel
- type LowLevelClient
- type MarathonEventProducer
- type MarathonEventProducerConfig
- type Message
- type MirrorMaker
- type MirrorMakerConfig
- type Partitioner
- type PartitionerConstructor
- type ProcessingFailedResult
- type Producer
- type ProducerConfig
- type ProducerConstructor
- type ProducerMessage
- type RandomPartitioner
- type RegisterSchemaResponse
- type RoundRobinPartitioner
- type SaramaClient
- func (this *SaramaClient) Close()
- func (this *SaramaClient) Fetch(topic string, partition int32, offset int64) ([]*Message, error)
- func (this *SaramaClient) GetAvailableOffset(topic string, partition int32, offsetTime string) (int64, error)
- func (this *SaramaClient) Initialize() error
- func (this *SaramaClient) IsOffsetOutOfRange(err error) bool
- func (this *SaramaClient) String() string
- type SaramaPartitioner
- type SaramaPartitionerFactory
- type SaramaProducer
- type SchemaMetadata
- type SchemaRegistryClient
- type SiestaClient
- func (this *SiestaClient) Close()
- func (this *SiestaClient) Fetch(topic string, partition int32, offset int64) ([]*Message, error)
- func (this *SiestaClient) GetAvailableOffset(topic string, partition int32, offsetTime string) (int64, error)
- func (this *SiestaClient) Initialize() error
- func (this *SiestaClient) IsOffsetOutOfRange(err error) bool
- func (this *SiestaClient) String() string
- type StateSnapshot
- type StaticTopicsToNumStreams
- type StringDecoder
- type StringEncoder
- type SuccessfulResult
- type SyslogMessage
- type SyslogProducer
- type SyslogProducerConfig
- type Task
- type TaskId
- type TimedOutResult
- type TopicAndPartition
- type TopicFilter
- type TopicInfo
- type TopicsToNumStreams
- type WhiteList
- type WildcardTopicsToNumStreams
- type Worker
- type WorkerManager
- type WorkerResult
- type WorkerStrategy
- type ZookeeperConfig
- type ZookeeperCoordinator
- func (this *ZookeeperCoordinator) AwaitOnStateBarrier(consumerId string, group string, barrierName string, barrierSize int, ...) bool
- func (this *ZookeeperCoordinator) ClaimPartitionOwnership(Groupid string, Topic string, Partition int32, ...) (bool, error)
- func (this *ZookeeperCoordinator) CommitOffset(Groupid string, TopicPartition *TopicAndPartition, Offset int64) error
- func (this *ZookeeperCoordinator) Connect() (err error)
- func (this *ZookeeperCoordinator) DeregisterConsumer(Consumerid string, Groupid string) (err error)
- func (this *ZookeeperCoordinator) Disconnect()
- func (this *ZookeeperCoordinator) GetAllBrokers() (brokers []*BrokerInfo, err error)
- func (this *ZookeeperCoordinator) GetAllTopics() (topics []string, err error)
- func (this *ZookeeperCoordinator) GetBlueGreenRequest(Group string) (topics map[string]*BlueGreenDeployment, err error)
- func (this *ZookeeperCoordinator) GetConsumerInfo(Consumerid string, Groupid string) (info *ConsumerInfo, err error)
- func (this *ZookeeperCoordinator) GetConsumersInGroup(Groupid string) (consumers []string, err error)
- func (this *ZookeeperCoordinator) GetConsumersPerTopic(Groupid string, ExcludeInternalTopics bool) (consumers map[string][]ConsumerThreadId, err error)
- func (this *ZookeeperCoordinator) GetOffsetForTopicPartition(Groupid string, TopicPartition *TopicAndPartition) (offset int64, err error)
- func (this *ZookeeperCoordinator) GetPartitionsForTopics(Topics []string) (partitions map[string][]int32, err error)
- func (this *ZookeeperCoordinator) RegisterConsumer(Consumerid string, Groupid string, TopicCount TopicsToNumStreams) (err error)
- func (this *ZookeeperCoordinator) ReleasePartitionOwnership(Groupid string, Topic string, Partition int32) error
- func (this *ZookeeperCoordinator) RemoveOldApiRequests(group string) (err error)
- func (this *ZookeeperCoordinator) RemoveStateBarrier(group string, stateHash string, api string) error
- func (this *ZookeeperCoordinator) RequestBlueGreenDeployment(blue BlueGreenDeployment, green BlueGreenDeployment) error
- func (this *ZookeeperCoordinator) String() string
- func (this *ZookeeperCoordinator) SubscribeForChanges(Groupid string) (events <-chan CoordinatorEvent, err error)
- func (this *ZookeeperCoordinator) Unsubscribe()
Constants ¶
const (
	// Offset with invalid value
	InvalidOffset int64 = -1
	// Reset the offset to the smallest offset if it is out of range
	SmallestOffset = "smallest"
	// Reset the offset to the largest offset if it is out of range
	LargestOffset = "largest"
	// Zookeeper offset storage configuration string
	ZookeeperOffsetStorage = "zookeeper"
	// Kafka offset storage configuration string
	KafkaOffsetStorage = "kafka"
)
const (
	/* Range partitioning works on a per-topic basis. For each topic, we lay out the available partitions in numeric order
	and the consumer threads in lexicographic order. We then divide the number of partitions by the total number of
	consumer streams (threads) to determine the number of partitions to assign to each consumer. If it does not evenly
	divide, then the first few consumers will have one extra partition. For example, suppose there are two consumers C1
	and C2 with two streams each, and there are five available partitions (p0, p1, p2, p3, p4). So each consumer thread
	will get at least one partition and the first consumer thread will get one extra partition. So the assignment will be:
	p0 -> C1-0, p1 -> C1-0, p2 -> C1-1, p3 -> C2-0, p4 -> C2-1 */
	RangeStrategy = "range"

	/* The round-robin partition assignor lays out all the available partitions and all the available consumer threads. It
	then proceeds to do a round-robin assignment from partition to consumer thread. If the subscriptions of all consumer
	instances are identical, then the partitions will be uniformly distributed. (i.e., the partition ownership counts will
	be within a delta of exactly one across all consumer threads.)

	(For simplicity of implementation) the assignor is allowed to assign a given topic-partition to any consumer instance
	and thread-id within that instance. Therefore, round-robin assignment is allowed only if:
	a) Every topic has the same number of streams within a consumer instance
	b) The set of subscribed topics is identical for every consumer instance within the group. */
	RoundRobinStrategy = "roundrobin"
)
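The two strategies described in the comments above can be sketched for a single topic as follows. This is a simplified illustration, not the package's implementation: the helper names rangeAssign and roundRobinAssign are hypothetical, consumer threads are represented as plain strings, and the round-robin ordering is assumed to be plain sorted order.

```go
package main

import (
	"fmt"
	"sort"
)

// rangeAssign is a hypothetical helper that applies the range strategy to one
// topic: partitions are sorted numerically, threads lexicographically, and
// each thread receives a contiguous run of partitions. The first
// len(partitions)%len(threads) threads get one extra partition.
func rangeAssign(partitions []int32, threads []string) map[int32]string {
	parts := sortedPartitions(partitions)
	ids := sortedThreads(threads)
	assignment := make(map[int32]string, len(parts))
	if len(ids) == 0 {
		return assignment
	}
	perThread, extra := len(parts)/len(ids), len(parts)%len(ids)
	next := 0
	for i, id := range ids {
		count := perThread
		if i < extra {
			count++
		}
		for ; count > 0; count-- {
			assignment[parts[next]] = id
			next++
		}
	}
	return assignment
}

// roundRobinAssign is a hypothetical helper that deals the sorted partitions
// to the sorted threads one at a time, wrapping around at the end.
func roundRobinAssign(partitions []int32, threads []string) map[int32]string {
	parts := sortedPartitions(partitions)
	ids := sortedThreads(threads)
	assignment := make(map[int32]string, len(parts))
	if len(ids) == 0 {
		return assignment
	}
	for i, p := range parts {
		assignment[p] = ids[i%len(ids)]
	}
	return assignment
}

// sortedPartitions returns a numerically sorted copy of in.
func sortedPartitions(in []int32) []int32 {
	out := append([]int32(nil), in...)
	sort.Slice(out, func(i, j int) bool { return out[i] < out[j] })
	return out
}

// sortedThreads returns a lexicographically sorted copy of in.
func sortedThreads(in []string) []string {
	out := append([]string(nil), in...)
	sort.Strings(out)
	return out
}

func main() {
	partitions := []int32{0, 1, 2, 3, 4}
	threads := []string{"C1-0", "C1-1", "C2-0", "C2-1"}
	ranged := rangeAssign(partitions, threads)
	robin := roundRobinAssign(partitions, threads)
	for _, p := range partitions {
		fmt.Printf("p%d: range -> %s, roundrobin -> %s\n", p, ranged[p], robin[p])
	}
}
```

With the five partitions and four threads from the comment, the range column reproduces the assignment listed there, while round-robin hands the fifth partition back to the first thread.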
const (
	GET_SCHEMA_BY_ID             = "/schemas/ids/%d"
	GET_SUBJECTS                 = "/subjects"
	GET_SUBJECT_VERSIONS         = "/subjects/%s/versions"
	GET_SPECIFIC_SUBJECT_VERSION = "/subjects/%s/versions/%s"
	REGISTER_NEW_SCHEMA          = "/subjects/%s/versions"
	CHECK_IS_REGISTERED          = "/subjects/%s"
	TEST_COMPATIBILITY           = "/compatibility/subjects/%s/versions/%s"
	CONFIG                       = "/config"
)
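These constants are fmt-style templates for schema registry request paths. A minimal sketch of how such templates can be filled in is shown below; the helper names schemaByIDPath and subjectVersionPath, and the subject name used in main, are hypothetical and not part of the package.

```go
package main

import "fmt"

// Two of the path templates, copied from the constants above.
const (
	GET_SCHEMA_BY_ID             = "/schemas/ids/%d"
	GET_SPECIFIC_SUBJECT_VERSION = "/subjects/%s/versions/%s"
)

// schemaByIDPath is a hypothetical helper that fills in the schema id.
func schemaByIDPath(id int32) string {
	return fmt.Sprintf(GET_SCHEMA_BY_ID, id)
}

// subjectVersionPath is a hypothetical helper that fills in the subject and
// version. It does no URL escaping, so callers are assumed to pass safe values.
func subjectVersionPath(subject, version string) string {
	return fmt.Sprintf(GET_SPECIFIC_SUBJECT_VERSION, subject, version)
}

func main() {
	fmt.Println(schemaByIDPath(42))                         // /schemas/ids/42
	fmt.Println(subjectVersionPath("orders-value", "latest")) // /subjects/orders-value/versions/latest
}
```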
const (
	SCHEMA_REGISTRY_V1_JSON               = "application/vnd.schemaregistry.v1+json"
	SCHEMA_REGISTRY_V1_JSON_WEIGHTED      = "application/vnd.schemaregistry.v1+json"
	SCHEMA_REGISTRY_MOST_SPECIFIC_DEFAULT = "application/vnd.schemaregistry.v1+json"
	SCHEMA_REGISTRY_DEFAULT_JSON          = "application/vnd.schemaregistry+json"
	SCHEMA_REGISTRY_DEFAULT_JSON_WEIGHTED = "application/vnd.schemaregistry+json qs=0.9"
	JSON                                  = "application/json"
	JSON_WEIGHTED                         = "application/json qs=0.5"
	GENERIC_REQUEST                       = "application/octet-stream"
)
Variables ¶
var PREFERRED_RESPONSE_TYPES = []string{SCHEMA_REGISTRY_V1_JSON, SCHEMA_REGISTRY_DEFAULT_JSON, JSON}
Functions ¶
func BootstrapBrokers ¶
func BootstrapBrokers(coordinator ConsumerCoordinator) ([]string, error)
BootstrapBrokers queries the ConsumerCoordinator for all known brokers in the cluster to be used later as a bootstrap list for the LowLevelClient.
func CreateMultiplePartitionsTopic ¶
Convenience utility that creates a topic named topicName with numPartitions partitions in the Zookeeper instance located at zk (in host:port format). Note that this requires the Apache Kafka 0.8.1 binary distribution to be available through the KAFKA_PATH environment variable.
func Critical ¶
func Critical(tag interface{}, message interface{})
Writes a given message with a given tag to log with level Critical.
func Criticalf ¶
func Criticalf(tag interface{}, message interface{}, params ...interface{})
Formats a given message according to given params with a given tag to log with level Critical.
func Debug ¶
func Debug(tag interface{}, message interface{})
Writes a given message with a given tag to log with level Debug.
func Debugf ¶
func Debugf(tag interface{}, message interface{}, params ...interface{})
Formats a given message according to given params with a given tag to log with level Debug.
func EnsureHasLeader ¶
Blocks until a leader appears for every partition of the given topic. This is used only by tests, to avoid the error "In the middle of a leadership election, there is currently no leader for this partition and hence it is unavailable for writes".
func Error ¶
func Error(tag interface{}, message interface{})
Writes a given message with a given tag to log with level Error.
func Errorf ¶
func Errorf(tag interface{}, message interface{}, params ...interface{})
Formats a given message according to given params with a given tag to log with level Error.
func Info ¶
func Info(tag interface{}, message interface{})
Writes a given message with a given tag to log with level Info.
func Infof ¶
func Infof(tag interface{}, message interface{}, params ...interface{})
Formats a given message according to given params with a given tag to log with level Info.
func LoadConfiguration ¶
Loads a property file located at Path. Returns a map[string]string or error.
func Trace ¶
func Trace(tag interface{}, message interface{})
Writes a given message with a given tag to log with level Trace.
func Tracef ¶
func Tracef(tag interface{}, message interface{}, params ...interface{})
Formats a given message according to given params with a given tag to log with level Trace.
Types ¶
type BlackList ¶
type BlackList struct {
// contains filtered or unexported fields
}
BlackList is a topic filter that will match every topic that does not match a given regex
func NewBlackList ¶
Creates a new BlackList topic filter for a given regex
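The inverse-match behaviour of a blacklist filter can be sketched with the standard regexp package. The type blackList and its methods below are hypothetical stand-ins, not the package's BlackList, and anchoring the expression to the whole topic name is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"regexp"
)

// blackList is a hypothetical stand-in for a blacklist topic filter.
type blackList struct {
	pattern *regexp.Regexp
}

// newBlackList compiles expr. The expression is anchored so that it must
// match the whole topic name (an assumption made for this sketch).
func newBlackList(expr string) (*blackList, error) {
	re, err := regexp.Compile("^(?:" + expr + ")$")
	if err != nil {
		return nil, err
	}
	return &blackList{pattern: re}, nil
}

// allowed reports whether topic passes the filter, that is, whether it does
// NOT match the blacklisted expression.
func (b *blackList) allowed(topic string) bool {
	return !b.pattern.MatchString(topic)
}

func main() {
	filter, err := newBlackList("test-.*")
	if err != nil {
		fmt.Println("invalid regex:", err)
		return
	}
	for _, topic := range []string{"orders", "test-1", "payments"} {
		fmt.Printf("%s allowed: %v\n", topic, filter.allowed(topic))
	}
}
```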
type BlueGreenDeployment ¶
type BlueGreenDeployment struct {
	// Comma separated list of topics to consume from
	Topics string
	// Either black_list, white_list or static
	Pattern string
	// Consumer group to switch to
	Group string
}
BlueGreenDeployment contains the information needed to do a successful blue-green deployment. Topics holds a comma-separated list of new topics to consume if the pattern is static, or a regex if the pattern is a wildcard (black_list or white_list).
type BrokerInfo ¶
General information about a Kafka broker, as stored in the consumer coordinator.
func (*BrokerInfo) String ¶
func (b *BrokerInfo) String() string
type ByteDecoder ¶
type ByteDecoder struct{}
func (*ByteDecoder) Decode ¶
func (this *ByteDecoder) Decode(bytes []byte) (interface{}, error)
type ByteEncoder ¶
type ByteEncoder struct{}
func (*ByteEncoder) Encode ¶
func (this *ByteEncoder) Encode(what interface{}) ([]byte, error)
type CachedSchemaRegistryClient ¶
type CachedSchemaRegistryClient struct {
// contains filtered or unexported fields
}
func NewCachedSchemaRegistryClient ¶
func NewCachedSchemaRegistryClient(registryURL string) *CachedSchemaRegistryClient
func (*CachedSchemaRegistryClient) GetByID ¶
func (this *CachedSchemaRegistryClient) GetByID(id int32) (avro.Schema, error)
func (*CachedSchemaRegistryClient) GetLatestSchemaMetadata ¶
func (this *CachedSchemaRegistryClient) GetLatestSchemaMetadata(subject string) (*SchemaMetadata, error)
func (*CachedSchemaRegistryClient) GetVersion ¶
type CompatibilityLevel ¶
type CompatibilityLevel string
const (
	BackwardCompatibilityLevel CompatibilityLevel = "BACKWARD"
	ForwardCompatibilityLevel  CompatibilityLevel = "FORWARD"
	FullCompatibilityLevel     CompatibilityLevel = "FULL"
	NoneCompatibilityLevel     CompatibilityLevel = "NONE"
)
type ConstantPartitioner ¶
type ConstantPartitioner struct {
Constant int32
}
ConstantPartitioner implements the Partitioner interface by just returning a constant value.
func (*ConstantPartitioner) Partition ¶
func (p *ConstantPartitioner) Partition(key Encoder, numPartitions int32) (int32, error)
func (*ConstantPartitioner) RequiresConsistency ¶
func (p *ConstantPartitioner) RequiresConsistency() bool
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer is a high-level Kafka consumer designed to work within a consumer group. It subscribes to coordinator events and is able to balance load within a consumer group.
func NewConsumer ¶
func NewConsumer(config *ConsumerConfig) *Consumer
NewConsumer creates a new Consumer with a given configuration. Creating a Consumer does not start fetching immediately.
func (*Consumer) Close ¶
Tells the Consumer to close all existing connections and stop. This method is NOT blocking but returns a channel which will get a single value once the closing is finished.
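The non-blocking close described above follows a common Go pattern: start the shutdown in the background and return a channel that receives one value when it is done. The sketch below uses a hypothetical closer type in place of a real Consumer, and the sleep merely simulates the work of closing connections.

```go
package main

import (
	"fmt"
	"time"
)

// shutdownTimeout is how long the caller in this sketch is willing to wait.
const shutdownTimeout = time.Second

// closer is a hypothetical stand-in for a consumer with a non-blocking Close.
type closer struct{}

// Close starts shutting down in the background and returns immediately.
// The returned channel receives a single value once shutdown has finished.
func (c *closer) Close() <-chan bool {
	done := make(chan bool, 1) // buffered so the goroutine never blocks
	go func() {
		time.Sleep(10 * time.Millisecond) // simulate closing connections
		done <- true
	}()
	return done
}

// waitClosed blocks until done delivers a value or timeout elapses, and
// reports whether shutdown completed in time.
func waitClosed(done <-chan bool, timeout time.Duration) bool {
	select {
	case <-done:
		return true
	case <-time.After(timeout):
		return false
	}
}

func main() {
	c := &closer{}
	done := c.Close() // returns at once; other cleanup could run here
	if waitClosed(done, shutdownTimeout) {
		fmt.Println("shut down gracefully")
	} else {
		fmt.Println("timed out waiting for shutdown")
	}
}
```

Returning a channel rather than blocking lets the caller combine the wait with a timeout or with other shutdown signals in a single select.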
func (*Consumer) StartStatic ¶
Starts consuming the specified topics using the configured number of goroutines for each topic.
func (*Consumer) StartStaticPartitions ¶
Starts consuming given topic-partitions using ConsumerConfig.NumConsumerFetchers goroutines for each topic.
func (*Consumer) StartWildcard ¶
func (c *Consumer) StartWildcard(topicFilter TopicFilter, numStreams int)
Starts consuming all topics which correspond to a given topicFilter using numStreams goroutines for each topic.
func (*Consumer) StateSnapshot ¶
func (c *Consumer) StateSnapshot() *StateSnapshot
Returns a state snapshot for this consumer. The state snapshot contains a set of metrics split by topic and partition.
type ConsumerConfig ¶
type ConsumerConfig struct {
	/* A string that uniquely identifies a set of consumers within the same consumer group */
	Groupid string

	/* A string that uniquely identifies a consumer within a group. Generated automatically if not set.
	Set this explicitly only for testing purposes. */
	Consumerid string

	/* The socket timeout for network requests. Its value should be at least FetchWaitMaxMs. */
	SocketTimeout time.Duration

	/* The maximum number of bytes to attempt to fetch */
	FetchMessageMaxBytes int32

	/* The number of goroutines used to fetch data */
	NumConsumerFetchers int

	/* Max number of message batches buffered for consumption, each batch can be up to FetchBatchSize */
	QueuedMaxMessages int32

	/* Max number of retries during rebalance */
	RebalanceMaxRetries int32

	/* The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will block */
	FetchMinBytes int32

	/* The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy FetchMinBytes */
	FetchWaitMaxMs int32

	/* Backoff time between retries during rebalance */
	RebalanceBackoff time.Duration

	/* Backoff time to refresh the leader of a partition after it loses the current leader */
	RefreshLeaderBackoff time.Duration

	/* Retry the offset commit up to this many times on failure. */
	OffsetsCommitMaxRetries int

	/* Try to commit offset every OffsetCommitInterval. If the previous offset commit for a partition is still in progress,
	updates the next offset to commit and continues. This way it does not commit all the offset history if the coordinator
	is slow, but only the highest offsets. */
	OffsetCommitInterval time.Duration

	/* Specify whether offsets should be committed to "zookeeper" (default) or "kafka". */
	OffsetsStorage string

	/* What to do if an offset is out of range.
	SmallestOffset : automatically reset the offset to the smallest offset.
	LargestOffset : automatically reset the offset to the largest offset.
	Defaults to LargestOffset. */
	AutoOffsetReset string

	/* Client id is specified by the kafka consumer client, used to distinguish different clients. */
	Clientid string

	/* Whether messages from internal topics (such as offsets) should be exposed to the consumer. */
	ExcludeInternalTopics bool

	/* Select a strategy for assigning partitions to consumer streams. Possible values: RangeStrategy, RoundRobinStrategy */
	PartitionAssignmentStrategy string

	/* Number of workers per partition to process consumed messages. */
	NumWorkers int

	/* Times to retry processing a failed message by a worker. */
	MaxWorkerRetries int

	/* Worker retry threshold. Increments each time a worker fails to process a message within MaxWorkerRetries.
	When this threshold is hit within a WorkerThresholdTimeWindow, WorkerFailureCallback is called, letting the user
	decide whether the consumer should stop. */
	WorkerRetryThreshold int32

	/* Resets WorkerRetryThreshold if it isn't hit within this period. */
	WorkerThresholdTimeWindow time.Duration

	/* Callback executed when WorkerRetryThreshold is exceeded within WorkerThresholdTimeWindow */
	WorkerFailureCallback FailedCallback

	/* Callback executed when a Worker failed to process the message after MaxWorkerRetries and WorkerRetryThreshold is not hit */
	WorkerFailedAttemptCallback FailedAttemptCallback

	/* Worker timeout to process a single message. */
	WorkerTaskTimeout time.Duration

	/* Backoff between worker attempts to process a single message. */
	WorkerBackoff time.Duration

	/* Maximum wait time to gracefully stop a worker manager */
	WorkerManagersStopTimeout time.Duration

	/* A function which defines a user-specified action on a single message. This function is responsible for actual message processing.
	Consumer panics if Strategy is not set. */
	Strategy WorkerStrategy

	/* Number of messages to accumulate before flushing them to workers */
	FetchBatchSize int

	/* Timeout to accumulate messages. Flushes the accumulated batch to workers even if it is not yet full.
	Resets after each flush, meaning this won't be triggered if FetchBatchSize is reached before the timeout. */
	FetchBatchTimeout time.Duration

	/* Backoff between fetch requests if no messages were fetched from a previous fetch. */
	RequeueAskNextBackoff time.Duration

	/* Buffer size for ask next channel. This value shouldn't be less than the number of partitions per fetch routine. */
	AskNextChannelSize int

	/* Maximum fetch retries if no messages were fetched from a previous fetch */
	FetchMaxRetries int

	/* Maximum retries to fetch topic metadata from one broker. */
	FetchTopicMetadataRetries int

	/* Backoff for fetch topic metadata request if the previous request failed. */
	FetchTopicMetadataBackoff time.Duration

	/* Backoff between two fetch requests for one fetch routine. Needed to prevent the fetcher from querying the broker too frequently. */
	FetchRequestBackoff time.Duration

	/* Coordinator used to coordinate consumer's actions, e.g. trigger rebalance events, store offsets and consumer metadata etc. */
	Coordinator ConsumerCoordinator

	/* Indicates whether the client supports blue-green deployment.
	This config entry is needed because blue-green deployment won't work with the RoundRobin partition assignment strategy.
	Defaults to true. */
	BlueGreenDeploymentEnabled bool

	/* Time to wait after consumer has registered itself in group */
	DeploymentTimeout time.Duration

	/* Service coordinator barrier timeout */
	BarrierTimeout time.Duration

	/* Low Level Kafka Client implementation. */
	LowLevelClient LowLevelClient

	/* Message keys decoder */
	KeyDecoder Decoder

	/* Message values decoder */
	ValueDecoder Decoder
}
ConsumerConfig defines configuration options for Consumer
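The interplay of FetchBatchSize and FetchBatchTimeout (flush when the batch is full, or when the timeout elapses, with the timer reset after every flush) can be sketched as below. This is an illustration of the idea using plain strings as messages; the batch function and demoTimeout are hypothetical and this is not the package's accumulator.

```go
package main

import (
	"fmt"
	"time"
)

// demoTimeout is deliberately long so that the example below flushes on size
// (and on input close) rather than on time.
const demoTimeout = time.Minute

// batch is a hypothetical accumulator. It groups messages from in into slices
// of up to size items (size must be positive). A partial batch is flushed when
// timeout elapses since the last flush, or when in is closed.
func batch(in <-chan string, size int, timeout time.Duration) <-chan []string {
	out := make(chan []string)
	go func() {
		defer close(out)
		buf := make([]string, 0, size)
		timer := time.NewTimer(timeout)
		defer timer.Stop()

		// flush emits the current batch (if any) and restarts the timer, so a
		// size-triggered flush also postpones the next timeout-triggered one.
		flush := func() {
			if len(buf) > 0 {
				out <- buf
				buf = make([]string, 0, size)
			}
			if !timer.Stop() {
				select {
				case <-timer.C: // drain a value that already fired
				default:
				}
			}
			timer.Reset(timeout)
		}

		for {
			select {
			case msg, ok := <-in:
				if !ok {
					if len(buf) > 0 {
						out <- buf
					}
					return
				}
				buf = append(buf, msg)
				if len(buf) >= size {
					flush()
				}
			case <-timer.C:
				flush()
			}
		}
	}()
	return out
}

func main() {
	in := make(chan string, 5)
	for _, m := range []string{"a", "b", "c", "d", "e"} {
		in <- m
	}
	close(in)
	for b := range batch(in, 2, demoTimeout) {
		fmt.Println(len(b), b)
	}
}
```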
func ConsumerConfigFromFile ¶
func ConsumerConfigFromFile(filename string) (*ConsumerConfig, error)
ConsumerConfigFromFile is a helper function that loads a consumer's configuration from file. The file accepts the following fields:
group.id
consumer.id
fetch.message.max.bytes
num.consumer.fetchers
rebalance.max.retries
queued.max.message.chunks
fetch.min.bytes
fetch.wait.max.ms
rebalance.backoff
refresh.leader.backoff
offset.commit.max.retries
offset.commit.interval
offsets.storage
auto.offset.reset
exclude.internal.topics
partition.assignment.strategy
num.workers
max.worker.retries
worker.retry.threshold
worker.threshold.time.window
worker.task.timeout
worker.backoff
worker.managers.stop.timeout
fetch.batch.size
fetch.batch.timeout
requeue.ask.next.backoff
fetch.max.retries
fetch.topic.metadata.retries
fetch.topic.metadata.backoff
fetch.request.backoff
blue.green.deployment.enabled
The configuration file entries should be constructed in key=value syntax. A # symbol at the beginning of a line indicates a comment. Blank lines are ignored. The file should end with a newline character.
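A reader for the file format described above (key=value entries, # comments, blank lines ignored) can be sketched with the standard library. The function names parseProperties and parsePropertiesString are hypothetical, and trimming whitespace around keys and values is an assumption of this sketch rather than documented behaviour of LoadConfiguration.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// parseProperties is a hypothetical reader for key=value property files.
// Blank lines and lines starting with '#' are skipped; any other line
// without '=' is reported as an error.
func parseProperties(r io.Reader) (map[string]string, error) {
	props := make(map[string]string)
	scanner := bufio.NewScanner(r)
	lineNo := 0
	for scanner.Scan() {
		lineNo++
		line := strings.TrimSpace(scanner.Text())
		if line == "" || strings.HasPrefix(line, "#") {
			continue
		}
		parts := strings.SplitN(line, "=", 2) // values may contain '='
		if len(parts) != 2 {
			return nil, fmt.Errorf("line %d: expected key=value, got %q", lineNo, line)
		}
		props[strings.TrimSpace(parts[0])] = strings.TrimSpace(parts[1])
	}
	if err := scanner.Err(); err != nil {
		return nil, err
	}
	return props, nil
}

// parsePropertiesString is a convenience wrapper for in-memory input.
func parsePropertiesString(s string) (map[string]string, error) {
	return parseProperties(strings.NewReader(s))
}

func main() {
	props, err := parsePropertiesString("# consumer settings\ngroup.id=my-group\n\nnum.workers=4\n")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(props["group.id"], props["num.workers"])
}
```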
func DefaultConsumerConfig ¶
func DefaultConsumerConfig() *ConsumerConfig
DefaultConsumerConfig creates a ConsumerConfig with sane defaults. Note that several required config entries (like Strategy and callbacks) are still not set.
func (*ConsumerConfig) String ¶
func (c *ConsumerConfig) String() string
func (*ConsumerConfig) Validate ¶
func (c *ConsumerConfig) Validate() error
Validate this ConsumerConfig. Returns a corresponding error if the ConsumerConfig is invalid and nil otherwise.
type ConsumerCoordinator ¶
type ConsumerCoordinator interface {
	/* Establish connection to this ConsumerCoordinator. Returns an error if it fails to connect, nil otherwise. */
	Connect() error

	/* Close connection to this ConsumerCoordinator. */
	Disconnect()

	/* Registers a new consumer with Consumerid id and TopicCount subscription that is a part of consumer group Group in this ConsumerCoordinator.
	Returns an error if registration failed, nil otherwise. */
	RegisterConsumer(Consumerid string, Group string, TopicCount TopicsToNumStreams) error

	/* Deregisters the consumer with Consumerid id that is a part of consumer group Group from this ConsumerCoordinator.
	Returns an error if deregistration failed, nil otherwise. */
	DeregisterConsumer(Consumerid string, Group string) error

	/* Gets the information about the consumer with Consumerid id that is a part of consumer group Group from this ConsumerCoordinator.
	Returns ConsumerInfo on success and error otherwise (for example, if a consumer with the given Consumerid does not exist). */
	GetConsumerInfo(Consumerid string, Group string) (*ConsumerInfo, error)

	/* Gets the information about consumers per topic in consumer group Group, excluding internal topics (such as offsets) if ExcludeInternalTopics = true.
	Returns a map where keys are topic names and values are slices of consumer ids and fetcher ids associated with this topic, and error on failure. */
	GetConsumersPerTopic(Group string, ExcludeInternalTopics bool) (map[string][]ConsumerThreadId, error)

	/* Gets the list of all consumer ids within a consumer group Group.
	Returns a slice containing all consumer ids in the group and error on failure. */
	GetConsumersInGroup(Group string) ([]string, error)

	/* Gets the list of all topics registered in this ConsumerCoordinator.
	Returns a slice containing topic names and error on failure. */
	GetAllTopics() ([]string, error)

	/* Gets the information about existing partitions for the given Topics.
	Returns a map where keys are topic names and values are slices of partition ids associated with this topic, and error on failure. */
	GetPartitionsForTopics(Topics []string) (map[string][]int32, error)

	/* Gets the information about all Kafka brokers registered in this ConsumerCoordinator.
	Returns a slice of BrokerInfo and error on failure. */
	GetAllBrokers() ([]*BrokerInfo, error)

	/* Gets the offset for a given TopicPartition and consumer group Group.
	Returns offset on success, error otherwise. */
	GetOffsetForTopicPartition(Group string, TopicPartition *TopicAndPartition) (int64, error)

	/* Subscribes for any change that should trigger consumer rebalance on consumer group Group in this ConsumerCoordinator, or trigger a topic switch.
	Returns a read-only channel of CoordinatorEvent that will get values on any significant coordinator event (e.g. new consumer appeared, new broker appeared etc.)
	and error if failed to subscribe. */
	SubscribeForChanges(Group string) (<-chan CoordinatorEvent, error)

	/* Gets all deployed topics for consumer group Group from the consumer coordinator.
	Returns a map where keys are notification ids and values are the corresponding BlueGreenDeployment entries.
	May also return an error (e.g. if failed to reach coordinator). */
	GetBlueGreenRequest(Group string) (map[string]*BlueGreenDeployment, error)

	/* Implements classic barrier synchronization primitive via service coordinator facilities */
	AwaitOnStateBarrier(consumerId string, group string, stateHash string, barrierSize int, api string, timeout time.Duration) bool

	/* Removes state barrier */
	RemoveStateBarrier(group string, stateHash string, api string) error

	/* Tells the ConsumerCoordinator to unsubscribe from events for the consumer it is associated with. */
	Unsubscribe()

	/* Tells the ConsumerCoordinator to claim partition Partition of topic Topic for ConsumerThreadId fetcher that works within a consumer group Group.
	Returns true if the claim is successful, false and an error explaining the failure otherwise. */
	ClaimPartitionOwnership(Group string, Topic string, Partition int32, ConsumerThreadId ConsumerThreadId) (bool, error)

	/* Tells the ConsumerCoordinator to release partition ownership on topic Topic and partition Partition for consumer group Group.
	Returns error if failed to release partition ownership. */
	ReleasePartitionOwnership(Group string, Topic string, Partition int32) error

	/* Tells the ConsumerCoordinator to commit offset Offset for topic and partition TopicPartition for consumer group Group.
	Returns error if failed to commit offset. */
	CommitOffset(Group string, TopicPartition *TopicAndPartition, Offset int64) error

	/* Removes old api objects */
	RemoveOldApiRequests(group string) error
}
ConsumerCoordinator is used to coordinate the actions of multiple consumers within the same consumer group. It is responsible for keeping track of live consumers, managing their offsets, and assigning partitions to consume. The current default ConsumerCoordinator is ZookeeperCoordinator. More implementations can be added in the future.
type ConsumerGroupApi ¶
type ConsumerGroupApi string
const (
	BlueGreenDeploymentAPI ConsumerGroupApi = "blue_green"
	Rebalance              ConsumerGroupApi = "rebalance"
)
type ConsumerInfo ¶
type ConsumerInfo struct {
	Version      int16
	Subscription map[string]int
	Pattern      string
	Timestamp    int64
}
General information about a Kafka consumer, as stored in the consumer coordinator.
func (*ConsumerInfo) String ¶
func (c *ConsumerInfo) String() string
type ConsumerThreadId ¶
Consumer routine id. Used to keep track of what consumer routine consumes a particular topic-partition in consumer coordinator.
func (*ConsumerThreadId) String ¶
func (c *ConsumerThreadId) String() string
type CoordinatorEvent ¶
type CoordinatorEvent string
CoordinatorEvent is sent by consumer coordinator representing some state change.
const (
	// A regular coordinator event that should normally trigger consumer rebalance.
	Regular CoordinatorEvent = "Regular"
	// A coordinator event that informs a consumer group of new deployed topics.
	BlueGreenRequest CoordinatorEvent = "BlueGreenRequest"
)
type DefaultLogger ¶
type DefaultLogger struct {
// contains filtered or unexported fields
}
Default implementation of KafkaLogger interface used in this client.
func NewDefaultLogger ¶
func NewDefaultLogger(Level LogLevel) *DefaultLogger
Creates a new DefaultLogger that is configured to write messages to console with minimum log level Level.
func (*DefaultLogger) Critical ¶
func (dl *DefaultLogger) Critical(message string, params ...interface{})
Formats a given message according to given params to log with level Critical.
func (*DefaultLogger) Debug ¶
func (dl *DefaultLogger) Debug(message string, params ...interface{})
Formats a given message according to given params to log with level Debug.
func (*DefaultLogger) Error ¶
func (dl *DefaultLogger) Error(message string, params ...interface{})
Formats a given message according to given params to log with level Error.
func (*DefaultLogger) Info ¶
func (dl *DefaultLogger) Info(message string, params ...interface{})
Formats a given message according to given params to log with level Info.
func (*DefaultLogger) Trace ¶
func (dl *DefaultLogger) Trace(message string, params ...interface{})
Formats a given message according to given params to log with level Trace.
func (*DefaultLogger) Warn ¶
func (dl *DefaultLogger) Warn(message string, params ...interface{})
Formats a given message according to given params to log with level Warn.
type ErrorMessage ¶
func (*ErrorMessage) Error ¶
func (this *ErrorMessage) Error() string
type FailedAttemptCallback ¶
type FailedAttemptCallback func(*Task, WorkerResult) FailedDecision
A callback that is triggered when a worker fails to process a single message.
type FailedCallback ¶
type FailedCallback func(*WorkerManager) FailedDecision
A callback that is triggered when a worker fails to process ConsumerConfig.WorkerRetryThreshold messages within ConsumerConfig.WorkerThresholdTimeWindow
type FailedDecision ¶
type FailedDecision int32
Defines what to do when a worker fails to process a message.
const (
	// Tells the worker manager to ignore the failure and continue normally.
	CommitOffsetAndContinue FailedDecision = iota
	// Tells the worker manager to continue processing new messages but not to commit offset that failed.
	DoNotCommitOffsetAndContinue
	// Tells the worker manager to commit offset and stop processing the current batch.
	CommitOffsetAndStop
	// Tells the worker manager not to commit offset and stop processing the current batch.
	DoNotCommitOffsetAndStop
)
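As a rough illustration of how a callback might choose between these decisions, the sketch below redeclares FailedDecision locally so that it compiles on its own. The maxRetries budget and the decide function are hypothetical names invented for this example, and the policy itself is an assumption, not the library's behavior.

```go
package main

import "fmt"

// FailedDecision mirrors the documented type; it is redeclared here only so
// the sketch is self-contained.
type FailedDecision int32

const (
	CommitOffsetAndContinue FailedDecision = iota
	DoNotCommitOffsetAndContinue
	CommitOffsetAndStop
	DoNotCommitOffsetAndStop
)

// maxRetries is a hypothetical retry budget, not a library setting.
const maxRetries = 3

// decide is a hypothetical policy: while the task is under the retry budget,
// keep its offset uncommitted; once the budget is used up, commit the offset
// and move on.
func decide(retries int) FailedDecision {
	if retries < maxRetries {
		return DoNotCommitOffsetAndContinue
	}
	return CommitOffsetAndContinue
}

func main() {
	for _, retries := range []int{0, 3} {
		fmt.Println(retries, decide(retries))
	}
}
```

In the real package a function like this would be wrapped in a FailedAttemptCallback and would read the retry count from Task.Retries.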
type FailedMessage ¶
type FailedMessage struct {
// contains filtered or unexported fields
}
type FailureCounter ¶
type FailureCounter struct {
// contains filtered or unexported fields
}
A counter used to track whether we reached the configurable threshold of failed messages within a given time window.
func NewFailureCounter ¶
func NewFailureCounter(FailedThreshold int32, WorkerThresholdTimeWindow time.Duration) *FailureCounter
Creates a new FailureCounter with threshold FailedThreshold and time window WorkerThresholdTimeWindow.
func (*FailureCounter) Failed ¶
func (f *FailureCounter) Failed() bool
Tells this FailureCounter to increment the number of failures by one. Returns true if the threshold is reached, false otherwise.
type FixedPartitioner ¶
type FixedPartitioner struct{}
FixedPartitioner sends messages to the partitions that correspond to their message keys.
func (*FixedPartitioner) Partition ¶
func (this *FixedPartitioner) Partition(key []byte, numPartitions int32) (int32, error)
func (*FixedPartitioner) RequiresConsistency ¶
func (this *FixedPartitioner) RequiresConsistency() bool
type GetSchemaResponse ¶
type GetSchemaResponse struct {
Schema string
}
type HashPartitioner ¶
type HashPartitioner struct {
// contains filtered or unexported fields
}
HashPartitioner implements the Partitioner interface. If the key is nil, or fails to encode, then a random partition is chosen. Otherwise the FNV-1a hash of the encoded bytes is used modulo the number of partitions. This ensures that messages with the same key always end up on the same partition.
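A minimal sketch of the hashing step, using the standard library's hash/fnv package. The partitionFor helper is a hypothetical name; it takes the modulus on the unsigned hash value, which may not match how the library handles the sign, so the partition numbers it produces should not be assumed to equal HashPartitioner's. The nil-key fallback to a random partition is left out.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor hashes key with 32-bit FNV-1a and maps the hash onto
// [0, numPartitions). numPartitions must be positive.
func partitionFor(key []byte, numPartitions int32) int32 {
	h := fnv.New32a()
	h.Write(key) // Write on a hash.Hash never returns an error.
	return int32(h.Sum32() % uint32(numPartitions))
}

func main() {
	// The same key maps to the same partition on every call.
	fmt.Println(partitionFor([]byte("user-1"), 8))
	fmt.Println(partitionFor([]byte("user-1"), 8))
	fmt.Println(partitionFor([]byte("user-2"), 8))
}
```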
func (*HashPartitioner) Partition ¶
func (this *HashPartitioner) Partition(key []byte, numPartitions int32) (int32, error)
func (*HashPartitioner) RequiresConsistency ¶
func (this *HashPartitioner) RequiresConsistency() bool
type Int32Decoder ¶
type Int32Decoder struct{}
func (*Int32Decoder) Decode ¶
func (this *Int32Decoder) Decode(bytes []byte) (interface{}, error)
type Int32Encoder ¶
type Int32Encoder struct{}
func (*Int32Encoder) Encode ¶
func (this *Int32Encoder) Encode(what interface{}) ([]byte, error)
type KafkaAvroDecoder ¶
type KafkaAvroDecoder struct {
// contains filtered or unexported fields
}
func NewKafkaAvroDecoder ¶
func NewKafkaAvroDecoder(url string) *KafkaAvroDecoder
func (*KafkaAvroDecoder) Decode ¶
func (this *KafkaAvroDecoder) Decode(bytes []byte) (interface{}, error)
type KafkaAvroEncoder ¶
type KafkaAvroEncoder struct {
// contains filtered or unexported fields
}
func NewKafkaAvroEncoder ¶
func NewKafkaAvroEncoder(url string) *KafkaAvroEncoder
func (*KafkaAvroEncoder) Encode ¶
func (this *KafkaAvroEncoder) Encode(obj interface{}) ([]byte, error)
type KafkaLogger ¶
type KafkaLogger interface {
	//Formats a given message according to given params to log with level Trace.
	Trace(message string, params ...interface{})
	//Formats a given message according to given params to log with level Debug.
	Debug(message string, params ...interface{})
	//Formats a given message according to given params to log with level Info.
	Info(message string, params ...interface{})
	//Formats a given message according to given params to log with level Warn.
	Warn(message string, params ...interface{})
	//Formats a given message according to given params to log with level Error.
	Error(message string, params ...interface{})
	//Formats a given message according to given params to log with level Critical.
	Critical(message string, params ...interface{})
}
Logger interface. Lets you plug in your custom logging library instead of using the built-in one.
var Logger KafkaLogger = NewDefaultLogger(InfoLevel)
Logger used by this client. Defaults to the built-in logger with Info log level.
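To show what plugging in a custom logger could look like, the sketch below redeclares the KafkaLogger interface locally and implements it with a hypothetical bufferLogger that writes formatted lines to an in-memory buffer. The line format is an assumption made for this example.

```go
package main

import (
	"bytes"
	"fmt"
)

// KafkaLogger is redeclared locally so the sketch compiles on its own.
type KafkaLogger interface {
	Trace(message string, params ...interface{})
	Debug(message string, params ...interface{})
	Info(message string, params ...interface{})
	Warn(message string, params ...interface{})
	Error(message string, params ...interface{})
	Critical(message string, params ...interface{})
}

// bufferLogger is a hypothetical logger that collects lines in memory.
type bufferLogger struct{ buf bytes.Buffer }

func (l *bufferLogger) log(level, message string, params ...interface{}) {
	fmt.Fprintf(&l.buf, "[%s] %s\n", level, fmt.Sprintf(message, params...))
}

func (l *bufferLogger) Trace(m string, p ...interface{})    { l.log("trace", m, p...) }
func (l *bufferLogger) Debug(m string, p ...interface{})    { l.log("debug", m, p...) }
func (l *bufferLogger) Info(m string, p ...interface{})     { l.log("info", m, p...) }
func (l *bufferLogger) Warn(m string, p ...interface{})     { l.log("warn", m, p...) }
func (l *bufferLogger) Error(m string, p ...interface{})    { l.log("error", m, p...) }
func (l *bufferLogger) Critical(m string, p ...interface{}) { l.log("critical", m, p...) }

// Compile-time check that bufferLogger satisfies the interface.
var _ KafkaLogger = (*bufferLogger)(nil)

func main() {
	l := &bufferLogger{}
	var logger KafkaLogger = l
	logger.Info("consumer %s started", "c1")
	fmt.Print(l.buf.String()) // [info] consumer c1 started
}
```

With the real package, a value like this would presumably be assigned to the package-level Logger variable before starting a consumer.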
type LogLevel ¶
type LogLevel string
Represents a logging level
const (
	//Use TraceLevel for debugging to find problems in functions, variables etc.
	TraceLevel LogLevel = "trace"
	//Use DebugLevel for detailed system reports and diagnostic messages.
	DebugLevel LogLevel = "debug"
	//Use InfoLevel for general information about a running application.
	InfoLevel LogLevel = "info"
	//Use WarnLevel to indicate small errors and failures that should not happen normally but are recovered automatically.
	WarnLevel LogLevel = "warn"
	//Use ErrorLevel to indicate severe errors that affect application workflow and are not handled automatically.
	ErrorLevel LogLevel = "error"
	//Use CriticalLevel to indicate fatal errors that may cause data corruption or loss.
	CriticalLevel LogLevel = "critical"
)
type LowLevelClient ¶
type LowLevelClient interface {
	// This will be called right after connecting to ConsumerCoordinator so this client can initialize itself
	// with bootstrap broker list for example. May return an error to signal this client is unable to work with given configuration.
	Initialize() error
	// This will be called each time the fetch request to Kafka should be issued. Topic, partition and offset are self-explanatory.
	// Should return a slice of Messages and an error if a fetch error occurred.
	// Note that for performance reasons it makes sense to keep open broker connections and reuse them on every fetch call.
	Fetch(topic string, partition int32, offset int64) ([]*Message, error)
	// This will be called when call to Fetch returns an error. As every client has different error mapping we ask here explicitly
	// whether the returned error is an OffsetOutOfRange error. Should return true if it is, false otherwise.
	IsOffsetOutOfRange(error) bool
	// This will be called to handle OffsetOutOfRange error. OffsetTime will be either "smallest" or "largest".
	// Should return a corresponding offset value and an error if it occurred.
	GetAvailableOffset(topic string, partition int32, offsetTime string) (int64, error)
	// This will be called to gracefully shutdown this client.
	Close()
}
LowLevelClient is a low-level Kafka client that manages broker connections, is responsible for fetching metadata, and is able to handle Fetch and Offset requests. TODO: not sure that's a good name for this interface.
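The contract between Fetch, IsOffsetOutOfRange and GetAvailableOffset may be easier to see with a toy implementation. The sketch below redeclares the interface and a trimmed Message type locally; memoryClient and errOutOfRange are hypothetical names, and the client serves a single in-memory log instead of talking to brokers.

```go
package main

import (
	"errors"
	"fmt"
)

// Message is a trimmed local copy of the documented type.
type Message struct {
	Value     []byte
	Topic     string
	Partition int32
	Offset    int64
}

// LowLevelClient is redeclared locally so the sketch compiles on its own.
type LowLevelClient interface {
	Initialize() error
	Fetch(topic string, partition int32, offset int64) ([]*Message, error)
	IsOffsetOutOfRange(error) bool
	GetAvailableOffset(topic string, partition int32, offsetTime string) (int64, error)
	Close()
}

var errOutOfRange = errors.New("offset out of range")

// memoryClient is a hypothetical client backed by a slice, where the slice
// index plays the role of the offset.
type memoryClient struct{ log [][]byte }

func (c *memoryClient) Initialize() error { return nil }

func (c *memoryClient) Fetch(topic string, partition int32, offset int64) ([]*Message, error) {
	if offset < 0 || offset > int64(len(c.log)) {
		return nil, errOutOfRange
	}
	var out []*Message
	for i := offset; i < int64(len(c.log)); i++ {
		out = append(out, &Message{Value: c.log[i], Topic: topic, Partition: partition, Offset: i})
	}
	return out, nil
}

func (c *memoryClient) IsOffsetOutOfRange(err error) bool { return errors.Is(err, errOutOfRange) }

func (c *memoryClient) GetAvailableOffset(topic string, partition int32, offsetTime string) (int64, error) {
	switch offsetTime {
	case "smallest":
		return 0, nil
	case "largest":
		return int64(len(c.log)), nil
	}
	return 0, fmt.Errorf("unknown offsetTime %q", offsetTime)
}

func (c *memoryClient) Close() {}

var _ LowLevelClient = (*memoryClient)(nil)

func main() {
	c := &memoryClient{log: [][]byte{[]byte("a"), []byte("b")}}
	_, err := c.Fetch("test", 0, 10)
	if c.IsOffsetOutOfRange(err) {
		// Recover by restarting from the smallest available offset.
		offset, _ := c.GetAvailableOffset("test", 0, "smallest")
		msgs, _ := c.Fetch("test", 0, offset)
		fmt.Println(len(msgs)) // 2
	}
}
```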
type MarathonEventProducer ¶
type MarathonEventProducer struct {
// contains filtered or unexported fields
}
func NewMarathonEventProducer ¶
func NewMarathonEventProducer(config *MarathonEventProducerConfig) *MarathonEventProducer
func (*MarathonEventProducer) Start ¶
func (this *MarathonEventProducer) Start()
func (*MarathonEventProducer) Stop ¶
func (this *MarathonEventProducer) Stop()
func (*MarathonEventProducer) String ¶
func (this *MarathonEventProducer) String() string
type MarathonEventProducerConfig ¶
type MarathonEventProducerConfig struct {
	// Marathon event producer config.
	ProducerConfig *ProducerConfig
	// Destination topic for all incoming messages.
	Topic string
	// Kafka Broker List host:port,host:port
	BrokerList string
	//HTTP endpoint binding port
	Port int
	// HTTP endpoint url pattern to listen, e.g. "/marathon".
	Pattern string
	// URL to Confluent Schema Registry. This triggers all messages to be sent in Avro format.
	SchemaRegistryUrl string
	// Avro schema to use when producing messages in Avro format.
	AvroSchema avro.Schema
	// Function that generates producer instances
	ProducerConstructor ProducerConstructor
}
func NewMarathonEventProducerConfig ¶
func NewMarathonEventProducerConfig() *MarathonEventProducerConfig
Creates an empty MarathonEventProducerConfig.
type Message ¶
type Message struct {
	// Partition key.
	Key []byte
	// Message value.
	Value []byte
	// Decoded message key
	DecodedKey interface{}
	// Decoded message value
	DecodedValue interface{}
	// Topic this message came from.
	Topic string
	// Partition this message came from.
	Partition int32
	// Message offset.
	Offset int64
}
Single Kafka message that is sent to user-defined Strategy
type MirrorMaker ¶
type MirrorMaker struct {
// contains filtered or unexported fields
}
MirrorMaker is a tool to mirror source Kafka cluster into a target (mirror) Kafka cluster. It uses a Kafka consumer to consume messages from the source cluster, and re-publishes those messages to the target cluster.
func NewMirrorMaker ¶
func NewMirrorMaker(config *MirrorMakerConfig) *MirrorMaker
Creates a new MirrorMaker using given MirrorMakerConfig.
func (*MirrorMaker) AddTiming ¶
func (this *MirrorMaker) AddTiming(record *avro.GenericRecord, tag string, now int64) *avro.GenericRecord
func (*MirrorMaker) Errors ¶
func (this *MirrorMaker) Errors() <-chan *FailedMessage
func (*MirrorMaker) Start ¶
func (this *MirrorMaker) Start()
Starts the MirrorMaker. This method is blocking and should probably be run in a separate goroutine.
type MirrorMakerConfig ¶
type MirrorMakerConfig struct {
	// Whitelist of topics to mirror. Exactly one whitelist or blacklist is allowed.
	Whitelist string
	// Blacklist of topics to mirror. Exactly one whitelist or blacklist is allowed.
	Blacklist string
	// Consumer configurations to consume from a source cluster.
	ConsumerConfigs []string
	// Embedded producer config.
	ProducerConfig string
	// Number of producer instances.
	NumProducers int
	// Number of consumption streams.
	NumStreams int
	// Flag to preserve partition number. E.g. if message was read from partition 5 it'll be written to partition 5. Note that this can affect performance.
	PreservePartitions bool
	// Flag to preserve message order. E.g. message sequence 1, 2, 3, 4, 5 will remain 1, 2, 3, 4, 5 in destination topic. Note that this can affect performance.
	PreserveOrder bool
	// Destination topic prefix. E.g. if message was read from topic "test" and prefix is "dc1_" it'll be written to topic "dc1_test".
	TopicPrefix string
	// Number of messages that are buffered between the consumer and producer.
	ChannelSize int
	// Message keys encoder for producer
	KeyEncoder Encoder
	// Message values encoder for producer
	ValueEncoder Encoder
	// Message keys decoder for consumer
	KeyDecoder Decoder
	// Message values decoder for consumer
	ValueDecoder Decoder
	// Function that generates producer instances
	ProducerConstructor ProducerConstructor
	// Path to producer configuration, that is responsible for logging timings
	// Defines whether add timings to message or not.
	// Note: used only for avro encoded messages
	TimingsProducerConfig string
}
MirrorMakerConfig defines configuration options for MirrorMaker
func NewMirrorMakerConfig ¶
func NewMirrorMakerConfig() *MirrorMakerConfig
Creates an empty MirrorMakerConfig.
type Partitioner ¶
type Partitioner interface {
	Partition(key []byte, numPartitions int32) (int32, error)
	RequiresConsistency() bool
}
func NewFixedPartitioner ¶
func NewFixedPartitioner() Partitioner
func NewHashPartitioner ¶
func NewHashPartitioner() Partitioner
func NewRandomPartitioner ¶
func NewRandomPartitioner() Partitioner
func NewRoundRobinPartitioner ¶
func NewRoundRobinPartitioner() Partitioner
type PartitionerConstructor ¶
type PartitionerConstructor func() Partitioner
type ProcessingFailedResult ¶
type ProcessingFailedResult struct {
// contains filtered or unexported fields
}
An implementation of WorkerResult interface representing a failure to process incoming message.
func NewProcessingFailedResult ¶
func NewProcessingFailedResult(id TaskId) *ProcessingFailedResult
Creates a new ProcessingFailedResult for given TaskId.
func (*ProcessingFailedResult) Id ¶
func (wr *ProcessingFailedResult) Id() TaskId
Returns an id of task that was processed.
func (*ProcessingFailedResult) String ¶
func (sr *ProcessingFailedResult) String() string
func (*ProcessingFailedResult) Success ¶
func (wr *ProcessingFailedResult) Success() bool
Always returns false for ProcessingFailedResult.
type Producer ¶
type Producer interface {
	Errors() <-chan *FailedMessage
	Successes() <-chan *ProducerMessage
	Input() chan<- *ProducerMessage
	Close() error
	AsyncClose()
}
func NewSaramaProducer ¶
func NewSaramaProducer(conf *ProducerConfig) Producer
type ProducerConfig ¶
type ProducerConfig struct {
	Clientid string
	BrokerList []string
	SendBufferSize int
	CompressionCodec string
	FlushByteCount int
	FlushTimeout time.Duration
	BatchSize int
	MaxMessageBytes int
	MaxMessagesPerRequest int
	Acks int
	RetryBackoff time.Duration
	Timeout time.Duration
	Partitioner PartitionerConstructor
	KeyEncoder Encoder
	ValueEncoder Encoder
	AckSuccesses bool
}
func DefaultProducerConfig ¶
func DefaultProducerConfig() *ProducerConfig
func ProducerConfigFromFile ¶
func ProducerConfigFromFile(filename string) (*ProducerConfig, error)
ProducerConfigFromFile is a helper function that loads a producer's configuration information from file. The file accepts the following fields:
client.id
metadata.broker.list
send.buffer.size
compression.codec
flush.byte.count
flush.timeout
batch.size
max.message.bytes
max.messages.per.request
acks
retry.backoff
timeout
The configuration file entries should be constructed in key=value syntax. A # symbol at the beginning of a line indicates a comment. Blank lines are ignored. The file should end with a newline character.
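The file format described above (key=value entries, # comments, blank lines ignored) can be parsed along these lines. This is a minimal sketch, not the library's loader: parseProperties is a hypothetical helper, and the sample values in main are made up for illustration.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// parseProperties reads key=value lines, skipping blank lines and lines that
// start with #. Only the first = on a line separates key from value.
func parseProperties(content string) (map[string]string, error) {
	props := make(map[string]string)
	scanner := bufio.NewScanner(strings.NewReader(content))
	for scanner.Scan() {
		line := strings.TrimSpace(scanner.Text())
		if line == "" || strings.HasPrefix(line, "#") {
			continue
		}
		parts := strings.SplitN(line, "=", 2)
		if len(parts) != 2 {
			return nil, fmt.Errorf("malformed line %q", line)
		}
		props[strings.TrimSpace(parts[0])] = strings.TrimSpace(parts[1])
	}
	return props, scanner.Err()
}

func main() {
	// Sample values are assumptions chosen for this example.
	content := "# producer settings\nclient.id=example-producer\n\nmetadata.broker.list=localhost:9092\nacks=1\n"
	props, err := parseProperties(content)
	if err != nil {
		panic(err)
	}
	fmt.Println(props["client.id"], props["acks"]) // example-producer 1
}
```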
func (*ProducerConfig) Validate ¶
func (this *ProducerConfig) Validate() error
type ProducerConstructor ¶
type ProducerConstructor func(config *ProducerConfig) Producer
type ProducerMessage ¶
type RandomPartitioner ¶
type RandomPartitioner struct {
// contains filtered or unexported fields
}
RandomPartitioner implements the Partitioner interface by choosing a random partition each time.
func (*RandomPartitioner) Partition ¶
func (this *RandomPartitioner) Partition(key []byte, numPartitions int32) (int32, error)
func (*RandomPartitioner) RequiresConsistency ¶
func (this *RandomPartitioner) RequiresConsistency() bool
type RegisterSchemaResponse ¶
type RegisterSchemaResponse struct {
Id int32
}
type RoundRobinPartitioner ¶
type RoundRobinPartitioner struct {
// contains filtered or unexported fields
}
RoundRobinPartitioner implements the Partitioner interface by walking through the available partitions one at a time.
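Walking through partitions one at a time can be sketched as below. The roundRobin type is a hypothetical stand-in written for this example, not the library's RoundRobinPartitioner, and it is not safe for concurrent use.

```go
package main

import "fmt"

// roundRobin is a hypothetical partitioner that ignores the key and hands
// out partitions 0, 1, ..., numPartitions-1 in turn.
type roundRobin struct{ next int32 }

func (r *roundRobin) Partition(key []byte, numPartitions int32) (int32, error) {
	if numPartitions <= 0 {
		return -1, fmt.Errorf("numPartitions must be positive, got %d", numPartitions)
	}
	p := r.next % numPartitions
	r.next = (p + 1) % numPartitions
	return p, nil
}

func main() {
	r := &roundRobin{}
	for i := 0; i < 4; i++ {
		p, _ := r.Partition(nil, 3)
		fmt.Println(p) // 0, 1, 2, then 0 again
	}
}
```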
func (*RoundRobinPartitioner) Partition ¶
func (this *RoundRobinPartitioner) Partition(key []byte, numPartitions int32) (int32, error)
func (*RoundRobinPartitioner) RequiresConsistency ¶
func (this *RoundRobinPartitioner) RequiresConsistency() bool
type SaramaClient ¶
type SaramaClient struct {
// contains filtered or unexported fields
}
SaramaClient implements LowLevelClient and uses github.com/Shopify/sarama as underlying implementation.
func NewSaramaClient ¶
func NewSaramaClient(config *ConsumerConfig) *SaramaClient
Creates a new SaramaClient using a given ConsumerConfig.
func (*SaramaClient) Fetch ¶
This will be called each time the fetch request to Kafka should be issued. Topic, partition and offset are self-explanatory. Returns slice of Messages and an error if a fetch error occurred.
func (*SaramaClient) GetAvailableOffset ¶
func (this *SaramaClient) GetAvailableOffset(topic string, partition int32, offsetTime string) (int64, error)
This will be called to handle OffsetOutOfRange error. OffsetTime will be either "smallest" or "largest".
func (*SaramaClient) Initialize ¶
func (this *SaramaClient) Initialize() error
This will be called right after connecting to ConsumerCoordinator so this client can initialize itself with bootstrap broker list for example. May return an error to signal this client is unable to work with given configuration.
func (*SaramaClient) IsOffsetOutOfRange ¶
func (this *SaramaClient) IsOffsetOutOfRange(err error) bool
Checks whether the given error indicates an OffsetOutOfRange error.
func (*SaramaClient) String ¶
func (this *SaramaClient) String() string
Returns a string representation of this SaramaClient.
type SaramaPartitioner ¶
type SaramaPartitioner struct {
// contains filtered or unexported fields
}
func (*SaramaPartitioner) RequiresConsistency ¶
func (this *SaramaPartitioner) RequiresConsistency() bool
type SaramaPartitionerFactory ¶
type SaramaPartitionerFactory struct {
// contains filtered or unexported fields
}
func (*SaramaPartitionerFactory) PartitionerConstructor ¶
func (this *SaramaPartitionerFactory) PartitionerConstructor() sarama.Partitioner
type SaramaProducer ¶
type SaramaProducer struct {
// contains filtered or unexported fields
}
func (*SaramaProducer) AsyncClose ¶
func (this *SaramaProducer) AsyncClose()
func (*SaramaProducer) Close ¶
func (this *SaramaProducer) Close() error
func (*SaramaProducer) Errors ¶
func (this *SaramaProducer) Errors() <-chan *FailedMessage
func (*SaramaProducer) Input ¶
func (this *SaramaProducer) Input() chan<- *ProducerMessage
func (*SaramaProducer) Successes ¶
func (this *SaramaProducer) Successes() <-chan *ProducerMessage
type SchemaMetadata ¶
type SchemaRegistryClient ¶
type SiestaClient ¶
type SiestaClient struct {
// contains filtered or unexported fields
}
SiestaClient implements LowLevelClient and uses github.com/stealthly/siesta as underlying implementation.
func NewSiestaClient ¶
func NewSiestaClient(config *ConsumerConfig) *SiestaClient
Creates a new SiestaClient using a given ConsumerConfig.
func (*SiestaClient) Fetch ¶
This will be called each time the fetch request to Kafka should be issued. Topic, partition and offset are self-explanatory. Returns slice of Messages and an error if a fetch error occurred.
func (*SiestaClient) GetAvailableOffset ¶
func (this *SiestaClient) GetAvailableOffset(topic string, partition int32, offsetTime string) (int64, error)
This will be called to handle OffsetOutOfRange error. OffsetTime will be either "smallest" or "largest".
func (*SiestaClient) Initialize ¶
func (this *SiestaClient) Initialize() error
This will be called right after connecting to ConsumerCoordinator so this client can initialize itself with bootstrap broker list for example. May return an error to signal this client is unable to work with given configuration.
func (*SiestaClient) IsOffsetOutOfRange ¶
func (this *SiestaClient) IsOffsetOutOfRange(err error) bool
Checks whether the given error indicates an OffsetOutOfRange error.
func (*SiestaClient) String ¶
func (this *SiestaClient) String() string
Returns a string representation of this SiestaClient.
type StateSnapshot ¶
type StateSnapshot struct {
	// Metrics are a map where keys are event names and values are maps holding event values grouped by meters (count, min, max, etc.).
	Metrics map[string]map[string]float64
	// Offsets are a map where keys are topics and values are maps where keys are partitions and values are offsets for these topic-partitions.
	Offsets map[string]map[int32]int64
}
Represents a consumer state snapshot.
type StaticTopicsToNumStreams ¶
type StaticTopicsToNumStreams struct {
	// Consumer id string.
	ConsumerId string
	// Map where keys are topic names and values are number of fetcher routines responsible for processing these topics.
	TopicsToNumStreamsMap map[string]int
}
TopicsToNumStreams implementation representing a static consumer subscription.
func (*StaticTopicsToNumStreams) GetConsumerThreadIdsPerTopic ¶
func (tc *StaticTopicsToNumStreams) GetConsumerThreadIdsPerTopic() map[string][]ConsumerThreadId
Creates a map describing consumer subscription where keys are topic names and values are slices of ConsumerThreadIds used to fetch these topics.
func (*StaticTopicsToNumStreams) GetTopicsToNumStreamsMap ¶
func (tc *StaticTopicsToNumStreams) GetTopicsToNumStreamsMap() map[string]int
Creates a map describing consumer subscription where keys are topic names and values are number of fetchers used to fetch these topics.
func (*StaticTopicsToNumStreams) Pattern ¶
func (tc *StaticTopicsToNumStreams) Pattern() string
Returns a pattern describing this TopicsToNumStreams.
type StringDecoder ¶
type StringDecoder struct{}
func (*StringDecoder) Decode ¶
func (this *StringDecoder) Decode(bytes []byte) (interface{}, error)
type StringEncoder ¶
type StringEncoder struct{}
func (*StringEncoder) Encode ¶
func (this *StringEncoder) Encode(what interface{}) ([]byte, error)
type SuccessfulResult ¶
type SuccessfulResult struct {
// contains filtered or unexported fields
}
An implementation of WorkerResult interface representing a successfully processed incoming message.
func NewSuccessfulResult ¶
func NewSuccessfulResult(id TaskId) *SuccessfulResult
Creates a new SuccessfulResult for given TaskId.
func (*SuccessfulResult) Id ¶
func (wr *SuccessfulResult) Id() TaskId
Returns an id of task that was processed.
func (*SuccessfulResult) String ¶
func (sr *SuccessfulResult) String() string
func (*SuccessfulResult) Success ¶
func (wr *SuccessfulResult) Success() bool
Always returns true for SuccessfulResult.
type SyslogMessage ¶
type SyslogProducer ¶
type SyslogProducer struct {
// contains filtered or unexported fields
}
func NewSyslogProducer ¶
func NewSyslogProducer(config *SyslogProducerConfig) *SyslogProducer
func (*SyslogProducer) Start ¶
func (this *SyslogProducer) Start()
func (*SyslogProducer) Stop ¶
func (this *SyslogProducer) Stop()
func (*SyslogProducer) String ¶
func (this *SyslogProducer) String() string
type SyslogProducerConfig ¶
type SyslogProducerConfig struct {
	// Syslog producer config.
	ProducerConfig *ProducerConfig
	// Number of producer instances.
	NumProducers int
	// Number of messages that are buffered to produce.
	ChannelSize int
	Topic string
	// Receive messages from this TCP address and post them to topic.
	TCPAddr string
	// Receive messages from this UDP address and post them to topic.
	UDPAddr string
	// Kafka Broker List host:port,host:port
	BrokerList string
	// Transformer func(message syslogparser.LogParts, topic string) *sarama.ProducerMessage
	Transformer func(message *SyslogMessage, topic string) *sarama.ProducerMessage
}
SyslogProducerConfig defines configuration options for SyslogProducer
func NewSyslogProducerConfig ¶
func NewSyslogProducerConfig() *SyslogProducerConfig
Creates an empty SyslogProducerConfig.
type Task ¶
type Task struct {
	// A message that should be processed.
	Msg *Message
	// Number of retries used for this task.
	Retries int
	// A worker that is responsible for processing this task.
	Callee *Worker
}
Represents a single task for a worker.
type TaskId ¶
type TaskId struct {
	// Message's topic and partition
	TopicPartition TopicAndPartition
	// Message's offset
	Offset int64
}
Type representing a task id. Consists of the topic, partition and offset of the message being processed.
type TimedOutResult ¶
type TimedOutResult struct {
// contains filtered or unexported fields
}
An implementation of WorkerResult interface representing a timeout to process incoming message.
func (*TimedOutResult) Id ¶
func (wr *TimedOutResult) Id() TaskId
Returns an id of task that was processed.
func (*TimedOutResult) String ¶
func (sr *TimedOutResult) String() string
func (*TimedOutResult) Success ¶
func (wr *TimedOutResult) Success() bool
Always returns false for TimedOutResult.
type TopicAndPartition ¶
Type representing a single Kafka topic and partition
func (*TopicAndPartition) String ¶
func (tp *TopicAndPartition) String() string
type TopicFilter ¶
type TopicFilter interface {
	Regex() string
	TopicAllowed(topic string, excludeInternalTopics bool) bool
}
Either a WhiteList or BlackList consumer topic filter.
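A whitelist-style filter could be built on the standard regexp package roughly as follows. The sketch redeclares TopicFilter locally; the whitelist type, the full-match anchoring of the pattern, and the treatment of "__consumer_offsets" as the only internal topic are all assumptions made for this example and may differ from how WhiteList behaves.

```go
package main

import (
	"fmt"
	"regexp"
)

// TopicFilter is redeclared locally so the sketch compiles on its own.
type TopicFilter interface {
	Regex() string
	TopicAllowed(topic string, excludeInternalTopics bool) bool
}

// internalOffsetsTopic is Kafka's internal offsets topic; treating it as the
// only internal topic is an assumption of this sketch.
const internalOffsetsTopic = "__consumer_offsets"

// whitelist is a hypothetical filter that allows topics fully matching a regex.
type whitelist struct {
	raw      string
	compiled *regexp.Regexp
}

func newWhitelist(pattern string) (*whitelist, error) {
	re, err := regexp.Compile("^(?:" + pattern + ")$")
	if err != nil {
		return nil, err
	}
	return &whitelist{raw: pattern, compiled: re}, nil
}

func (w *whitelist) Regex() string { return w.raw }

func (w *whitelist) TopicAllowed(topic string, excludeInternalTopics bool) bool {
	if excludeInternalTopics && topic == internalOffsetsTopic {
		return false
	}
	return w.compiled.MatchString(topic)
}

var _ TopicFilter = (*whitelist)(nil)

func main() {
	w, err := newWhitelist("logs_.*")
	if err != nil {
		panic(err)
	}
	fmt.Println(w.TopicAllowed("logs_app", true)) // true
	fmt.Println(w.TopicAllowed("metrics", true))  // false
}
```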
type TopicsToNumStreams ¶
type TopicsToNumStreams interface {
	//Creates a map describing consumer subscription where keys are topic names and values are number of fetchers used to fetch these topics.
	GetTopicsToNumStreamsMap() map[string]int
	//Creates a map describing consumer subscription where keys are topic names and values are slices of ConsumerThreadIds used to fetch these topics.
	GetConsumerThreadIdsPerTopic() map[string][]ConsumerThreadId
	//Returns a pattern describing this TopicsToNumStreams.
	Pattern() string
}
Information on Consumer subscription. Used to keep it in consumer coordinator.
func NewStaticTopicsToNumStreams ¶
func NewStaticTopicsToNumStreams(consumerId string, topics string, pattern string, numStreams int, excludeInternalTopics bool, coordinator ConsumerCoordinator) TopicsToNumStreams
Constructs a new TopicsToNumStreams representing a static subscription for the consumer with id consumerId. Uses coordinator to get consumer information.
func NewTopicsToNumStreams ¶
func NewTopicsToNumStreams(Groupid string, Consumerid string, Coordinator ConsumerCoordinator, ExcludeInternalTopics bool) (TopicsToNumStreams, error)
Constructs a new TopicsToNumStreams for consumer with Consumerid id that works within consumer group Groupid. Uses Coordinator to get consumer information. Returns error if fails to retrieve consumer information from Coordinator.
type WhiteList ¶
type WhiteList struct {
// contains filtered or unexported fields
}
WhiteList is a topic filter that will match every topic for a given regex
func NewWhiteList ¶
Creates a new WhiteList topic filter for a given regex
type WildcardTopicsToNumStreams ¶
type WildcardTopicsToNumStreams struct {
	Coordinator ConsumerCoordinator
	ConsumerId string
	TopicFilter TopicFilter
	NumStreams int
	ExcludeInternalTopics bool
}
TopicsToNumStreams implementation representing either whitelist or blacklist consumer subscription.
func (*WildcardTopicsToNumStreams) GetConsumerThreadIdsPerTopic ¶
func (tc *WildcardTopicsToNumStreams) GetConsumerThreadIdsPerTopic() map[string][]ConsumerThreadId
Creates a map describing consumer subscription where keys are topic names and values are slices of ConsumerThreadIds used to fetch these topics.
func (*WildcardTopicsToNumStreams) GetTopicsToNumStreamsMap ¶
func (tc *WildcardTopicsToNumStreams) GetTopicsToNumStreamsMap() map[string]int
Creates a map describing consumer subscription where keys are topic names and values are number of fetchers used to fetch these topics.
func (*WildcardTopicsToNumStreams) Pattern ¶
func (tc *WildcardTopicsToNumStreams) Pattern() string
Returns a pattern describing this TopicsToNumStreams.
type Worker ¶
type Worker struct {
	// Channel to write processing results to.
	OutputChannel chan WorkerResult
	// Timeout for a single worker task.
	TaskTimeout time.Duration
	// Indicates whether this worker is closed and cannot accept new work.
	Closed bool
}
Represents a worker that is able to process a single message.
func (*Worker) Start ¶
func (w *Worker) Start(task *Task, strategy WorkerStrategy)
Starts processing a given task using given strategy with this worker. Call to this method blocks until the task is done or timed out.
type WorkerManager ¶
type WorkerManager struct {
// contains filtered or unexported fields
}
WorkerManager is responsible for splitting the incoming batches of messages between a configured number of workers. It also keeps track of the highest processed offsets and commits them to offset storage with a configurable frequency.
func NewWorkerManager ¶
func NewWorkerManager(id string, config *ConsumerConfig, topicPartition TopicAndPartition, metrics *consumerMetrics) *WorkerManager
Creates a new WorkerManager with given id using a given ConsumerConfig and responsible for managing given TopicAndPartition.
func (*WorkerManager) GetLargestOffset ¶
func (wm *WorkerManager) GetLargestOffset() int64
Gets the highest offset that has been processed by this WorkerManager.
func (*WorkerManager) IsBatchProcessed ¶
func (wm *WorkerManager) IsBatchProcessed() bool
Asks this WorkerManager whether the current batch is fully processed. Returns true if so, false otherwise.
func (*WorkerManager) Start ¶
func (wm *WorkerManager) Start()
Starts processing incoming batches with this WorkerManager. Processing is possible only in batch-at-once mode. It also launches an offset committer routine. Call to this method blocks.
func (*WorkerManager) Stop ¶
func (wm *WorkerManager) Stop() chan bool
Tells this WorkerManager to finish processing the current batch, stop accepting new work and shut down. This method returns immediately with a channel that receives a value once the shutdown is finished.
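The shape of this Stop method, returning at once and signalling completion on a channel, is a common Go pattern. The sketch below shows it with a hypothetical manager type; it does not reproduce WorkerManager's internals, and calling Stop twice on this toy version would panic.

```go
package main

import "fmt"

// manager is a hypothetical stand-in that only models the shutdown handshake.
type manager struct {
	stop chan struct{}
	done chan bool
}

func newManager() *manager {
	m := &manager{stop: make(chan struct{}), done: make(chan bool, 1)}
	go m.run()
	return m
}

// run waits for the stop signal, then reports that shutdown has finished.
func (m *manager) run() {
	<-m.stop
	// A real manager would finish the current batch here.
	m.done <- true
}

// Stop requests shutdown and returns immediately; the returned channel
// receives a value once run has finished.
func (m *manager) Stop() chan bool {
	close(m.stop)
	return m.done
}

func main() {
	m := newManager()
	finished := <-m.Stop() // Block until shutdown is complete.
	fmt.Println("stopped:", finished)
}
```

A caller that does not want to block can keep the returned channel and select on it together with a timeout.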
func (*WorkerManager) String ¶
func (wm *WorkerManager) String() string
func (*WorkerManager) UpdateLargestOffset ¶
func (wm *WorkerManager) UpdateLargestOffset(offset int64)
Updates the highest offset that has been processed by this WorkerManager with a new value.
type WorkerResult ¶
type WorkerResult interface {
	// Returns an id of task that was processed.
	Id() TaskId
	// Returns true if processing succeeded, false otherwise.
	Success() bool
}
Interface that represents a result of processing incoming message.
type WorkerStrategy ¶
type WorkerStrategy func(*Worker, *Message, TaskId) WorkerResult
Defines what to do with a single Kafka message. Returns a WorkerResult to distinguish successful from unsuccessful processing.
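A strategy is just a function with this signature. The sketch below redeclares trimmed versions of the involved types so it compiles on its own; the fields of TopicAndPartition, the result type and the nonEmpty strategy are assumptions made for illustration.

```go
package main

import "fmt"

// Local stand-ins for the documented types. The TopicAndPartition fields are
// assumed, since this page does not list them.
type TopicAndPartition struct {
	Topic     string
	Partition int32
}

type TaskId struct {
	TopicPartition TopicAndPartition
	Offset         int64
}

type Message struct {
	Value  []byte
	Topic  string
	Offset int64
}

type Worker struct{}

type WorkerResult interface {
	Id() TaskId
	Success() bool
}

type WorkerStrategy func(*Worker, *Message, TaskId) WorkerResult

// result is a hypothetical WorkerResult carrying a success flag.
type result struct {
	id TaskId
	ok bool
}

func (r result) Id() TaskId    { return r.id }
func (r result) Success() bool { return r.ok }

// nonEmpty is a hypothetical strategy that treats empty payloads as failures.
var nonEmpty WorkerStrategy = func(_ *Worker, msg *Message, id TaskId) WorkerResult {
	return result{id: id, ok: len(msg.Value) > 0}
}

func main() {
	id := TaskId{TopicPartition: TopicAndPartition{Topic: "test", Partition: 0}, Offset: 42}
	res := nonEmpty(&Worker{}, &Message{Value: []byte("payload"), Topic: "test", Offset: 42}, id)
	fmt.Println(res.Success()) // true
}
```

With the real package, a strategy would more likely return NewSuccessfulResult(id) or NewProcessingFailedResult(id) instead of a custom result type.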
type ZookeeperConfig ¶
type ZookeeperConfig struct {
	/* Zookeeper hosts */
	ZookeeperConnect []string
	/* Zookeeper read timeout */
	ZookeeperTimeout time.Duration
	/* Max retries for any request except CommitOffset. CommitOffset is controlled by ConsumerConfig.OffsetsCommitMaxRetries. */
	MaxRequestRetries int
	/* Backoff to retry any request */
	RequestBackoff time.Duration
	/* kafka Root */
	Root string
}
ZookeeperConfig is used to pass multiple configuration entries to ZookeeperCoordinator.
func NewZookeeperConfig ¶
func NewZookeeperConfig() *ZookeeperConfig
Creates a new ZookeeperConfig with sane defaults. Default ZookeeperConnect points to localhost.
func ZookeeperConfigFromFile ¶
func ZookeeperConfigFromFile(filename string) (*ZookeeperConfig, error)
ZookeeperConfigFromFile is a helper function that loads zookeeper configuration information from file. The file accepts the following fields:
zookeeper.connect
zookeeper.kafka.Root
zookeeper.connection.timeout
zookeeper.max.request.retries
zookeeper.request.backoff
The configuration file entries should be constructed in key=value syntax. A # symbol at the beginning of a line indicates a comment. Blank lines are ignored. The file should end with a newline character.
type ZookeeperCoordinator ¶
type ZookeeperCoordinator struct {
// contains filtered or unexported fields
}
ZookeeperCoordinator implements ConsumerCoordinator interface and is used to coordinate multiple consumers that work within the same consumer group.
func NewZookeeperCoordinator ¶
func NewZookeeperCoordinator(Config *ZookeeperConfig) *ZookeeperCoordinator
Creates a new ZookeeperCoordinator with a given configuration. The newly created ZookeeperCoordinator does NOT automatically connect to Zookeeper; you should call Connect() explicitly.
func (*ZookeeperCoordinator) AwaitOnStateBarrier ¶
func (*ZookeeperCoordinator) ClaimPartitionOwnership ¶
func (this *ZookeeperCoordinator) ClaimPartitionOwnership(Groupid string, Topic string, Partition int32, consumerThreadId ConsumerThreadId) (bool, error)
Tells the ConsumerCoordinator to claim partition Partition of topic Topic for the consumerThreadId fetcher that works within consumer group Groupid. Returns true if the claim is successful; otherwise returns false and an error explaining the failure.
func (*ZookeeperCoordinator) CommitOffset ¶
func (this *ZookeeperCoordinator) CommitOffset(Groupid string, TopicPartition *TopicAndPartition, Offset int64) error
Tells the ConsumerCoordinator to commit offset Offset for topic and partition TopicPartition for consumer group Groupid. Returns error if failed to commit offset.
func (*ZookeeperCoordinator) Connect ¶
func (this *ZookeeperCoordinator) Connect() (err error)
Establish connection to this ConsumerCoordinator. Returns an error if fails to connect, nil otherwise.
func (*ZookeeperCoordinator) DeregisterConsumer ¶
func (this *ZookeeperCoordinator) DeregisterConsumer(Consumerid string, Groupid string) (err error)
Deregisters consumer with Consumerid id that is a part of consumer group Groupid from this ConsumerCoordinator. Returns an error if deregistration failed, nil otherwise.
func (*ZookeeperCoordinator) Disconnect ¶
func (this *ZookeeperCoordinator) Disconnect()
func (*ZookeeperCoordinator) GetAllBrokers ¶
func (this *ZookeeperCoordinator) GetAllBrokers() (brokers []*BrokerInfo, err error)
Gets the information about all Kafka brokers registered in this ConsumerCoordinator. Returns a slice of BrokerInfo and error on failure.
func (*ZookeeperCoordinator) GetAllTopics ¶
func (this *ZookeeperCoordinator) GetAllTopics() (topics []string, err error)
Gets the list of all topics registered in this ConsumerCoordinator. Returns a slice containing topic names and an error on failure.
func (*ZookeeperCoordinator) GetBlueGreenRequest ¶
func (this *ZookeeperCoordinator) GetBlueGreenRequest(Group string) (topics map[string]*BlueGreenDeployment, err error)
Gets all deployed topics for consumer group Group from the consumer coordinator. Returns a map where keys are notification ids and values are the corresponding BlueGreenDeployment entries. May also return an error (e.g. if it failed to reach the coordinator).
func (*ZookeeperCoordinator) GetConsumerInfo ¶
func (this *ZookeeperCoordinator) GetConsumerInfo(Consumerid string, Groupid string) (info *ConsumerInfo, err error)
Gets the information about the consumer with id Consumerid that is part of consumer group Groupid from this ConsumerCoordinator. Returns ConsumerInfo on success and an error otherwise (for example, if a consumer with the given Consumerid does not exist).
func (*ZookeeperCoordinator) GetConsumersInGroup ¶
func (this *ZookeeperCoordinator) GetConsumersInGroup(Groupid string) (consumers []string, err error)
Gets the list of all consumer ids within a consumer group Groupid. Returns a slice containing all consumer ids in group and error on failure.
func (*ZookeeperCoordinator) GetConsumersPerTopic ¶
func (this *ZookeeperCoordinator) GetConsumersPerTopic(Groupid string, ExcludeInternalTopics bool) (consumers map[string][]ConsumerThreadId, err error)
Gets the information about consumers per topic in consumer group Groupid excluding internal topics (such as offsets) if ExcludeInternalTopics = true. Returns a map where keys are topic names and values are slices of consumer ids and fetcher ids associated with this topic and error on failure.
func (*ZookeeperCoordinator) GetOffsetForTopicPartition ¶
func (this *ZookeeperCoordinator) GetOffsetForTopicPartition(Groupid string, TopicPartition *TopicAndPartition) (offset int64, err error)
Gets the offset for a given TopicPartition and consumer group Groupid. Returns the offset on success, an error otherwise.
func (*ZookeeperCoordinator) GetPartitionsForTopics ¶
func (this *ZookeeperCoordinator) GetPartitionsForTopics(Topics []string) (partitions map[string][]int32, err error)
Gets the information about existing partitions for the given Topics. Returns a map where keys are topic names and values are slices of partition ids associated with each topic, and an error on failure.
func (*ZookeeperCoordinator) RegisterConsumer ¶
func (this *ZookeeperCoordinator) RegisterConsumer(Consumerid string, Groupid string, TopicCount TopicsToNumStreams) (err error)
Registers a new consumer with Consumerid id and TopicCount subscription that is a part of consumer group Groupid in this ConsumerCoordinator. Returns an error if registration failed, nil otherwise.
func (*ZookeeperCoordinator) ReleasePartitionOwnership ¶
func (this *ZookeeperCoordinator) ReleasePartitionOwnership(Groupid string, Topic string, Partition int32) error
Tells the ConsumerCoordinator to release partition ownership on topic Topic and partition Partition for consumer group Groupid. Returns an error if it failed to release partition ownership.
func (*ZookeeperCoordinator) RemoveOldApiRequests ¶
func (this *ZookeeperCoordinator) RemoveOldApiRequests(group string) (err error)
func (*ZookeeperCoordinator) RemoveStateBarrier ¶
func (this *ZookeeperCoordinator) RemoveStateBarrier(group string, stateHash string, api string) error
func (*ZookeeperCoordinator) RequestBlueGreenDeployment ¶
func (this *ZookeeperCoordinator) RequestBlueGreenDeployment(blue BlueGreenDeployment, green BlueGreenDeployment) error
func (*ZookeeperCoordinator) String ¶
func (this *ZookeeperCoordinator) String() string
func (*ZookeeperCoordinator) SubscribeForChanges ¶
func (this *ZookeeperCoordinator) SubscribeForChanges(Groupid string) (events <-chan CoordinatorEvent, err error)
Subscribes for any change that should trigger a consumer rebalance on consumer group Groupid in this ConsumerCoordinator. Returns a receive-only channel of CoordinatorEvent values that receives a value on any significant coordinator event (e.g. a new consumer appeared, a new broker appeared, etc.) and an error if it failed to subscribe.
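A caller usually drains the returned channel in a loop and reacts to each event. The following is a minimal sketch of such a loop under stated assumptions: `coordinatorEvent` is a local stand-in for the package's CoordinatorEvent type (whose definition is not shown in this section), the event values are invented, and `watchEvents` is a hypothetical helper.

```go
package main

import "fmt"

// coordinatorEvent is a local stand-in for the package's CoordinatorEvent type.
type coordinatorEvent string

// watchEvents calls rebalance for every event received until the events
// channel is closed or stop is signalled. It returns the number of events handled.
func watchEvents(events <-chan coordinatorEvent, stop <-chan struct{}, rebalance func(coordinatorEvent)) int {
	handled := 0
	for {
		select {
		case ev, ok := <-events:
			if !ok {
				// The channel was closed; nothing more will arrive.
				return handled
			}
			rebalance(ev)
			handled++
		case <-stop:
			return handled
		}
	}
}

func main() {
	// In real use the channel would come from SubscribeForChanges;
	// here it is filled by hand with invented event values.
	events := make(chan coordinatorEvent, 2)
	events <- "consumer-added"
	events <- "broker-added"
	close(events)

	n := watchEvents(events, make(chan struct{}), func(ev coordinatorEvent) {
		fmt.Println("rebalance triggered by:", ev)
	})
	fmt.Println("handled:", n)
}
```

The separate stop channel lets the caller end the loop during shutdown without waiting for the coordinator to close the event channel.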
func (*ZookeeperCoordinator) Unsubscribe ¶
func (this *ZookeeperCoordinator) Unsubscribe()
Tells the ConsumerCoordinator to unsubscribe from events for the consumer it is associated with.
Source Files ¶
- api.go
- avro_encoder_decoder.go
- consumer.go
- consumer_config.go
- encoder_decoder.go
- fetcher.go
- filter.go
- logger.go
- low_level_client.go
- marathon_event_producer.go
- message_buffer.go
- metrics.go
- mirror_maker.go
- partition_assignment.go
- producer.go
- producer_sarama.go
- schema_registry.go
- structs.go
- syslog_producer.go
- testing_utils.go
- topics.go
- utils.go
- workers.go
- zk_coordinator.go
Directories ¶
Path | Synopsis |
---|---|
syslog_proto | Package syslog_proto is a generated protocol buffer package. |