Documentation
Index ¶
- Constants
- type Consumer
- func (c *Consumer) Commit(msg *proto.Message) error
- func (c *Consumer) ConsumeChannel() <-chan *proto.Message
- func (c *Consumer) GetCurrentLag() int64
- func (c *Consumer) GetCurrentLoad() int
- func (c *Consumer) GetCurrentTopicClaims() (map[string]bool, error)
- func (c *Consumer) PrintState()
- func (c *Consumer) Terminate(release bool) bool
- func (c *Consumer) Terminated() bool
- func (c *Consumer) TopicClaims() <-chan map[string]bool
- type ConsumerOptions
- func NewConsumerOptions() ConsumerOptions
- type KafkaCluster
- func Dial(name string, brokers []string) (*KafkaCluster, error)
- func (c *KafkaCluster) NewMarshaler(clientID, groupID string) (*Marshaler, error)
- func (c *KafkaCluster) Terminate()
- func (c *KafkaCluster) Terminated() bool
- type Marshaler
- func (m *Marshaler) ClaimPartition(topicName string, partID int) bool
- func (m *Marshaler) ClientID() string
- func (m *Marshaler) CommitOffsets(topicName string, partID int, lastOffset int64) error
- func (m *Marshaler) GetLastPartitionClaim(topicName string, partID int) PartitionClaim
- func (m *Marshaler) GetPartitionClaim(topicName string, partID int) PartitionClaim
- func (m *Marshaler) GetPartitionOffsets(topicName string, partID int) (PartitionOffsets, error)
- func (m *Marshaler) GroupID() string
- func (m *Marshaler) Heartbeat(topicName string, partID int, lastOffset int64) error
- func (m *Marshaler) IsClaimed(topicName string, partID int) bool
- func (m *Marshaler) NewConsumer(topicNames []string, options ConsumerOptions) (*Consumer, error)
- func (m *Marshaler) Partitions(topicName string) int
- func (m *Marshaler) PrintState()
- func (m *Marshaler) ReleasePartition(topicName string, partID int, lastOffset int64) error
- func (m *Marshaler) Terminate()
- func (m *Marshaler) Terminated() bool
- func (m *Marshaler) Topics() []string
- type PartitionClaim
- type PartitionOffsets
Constants ¶
const (
	// MarshalTopic is the main topic used for coordination. This must be constant across all
	// consumers that you want to coordinate.
	MarshalTopic = "__marshal"

	// HeartbeatInterval is the main timing used to determine how "chatty" the system is and how
	// fast it responds to failures of consumers. THIS VALUE MUST BE THE SAME BETWEEN ALL CONSUMERS
	// as it is critical to coordination.
	HeartbeatInterval = 60 // Measured in seconds.
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer allows you to safely consume data from a given topic in such a way that you don't need to worry about partitions and can safely split the load across as many processes as might be consuming from this topic. However, you should ONLY create one Consumer per topic in your application!
func (*Consumer) Commit ¶
Commit is called when you've finished processing a message. In the at-least-once consumption case, this will allow the "last processed offset" to move forward so that we can never see this message again. This operation does nothing for at-most-once consumption, as the commit happens in the Consume phase. TODO: AMO description is wrong.
func (*Consumer) ConsumeChannel ¶
ConsumeChannel returns a read-only channel. Messages that are retrieved from Kafka will be made available in this channel.
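Together, ConsumeChannel and Commit form the core at-least-once consumption loop. A minimal sketch, assuming an already-constructed *Consumer named consumer (see NewConsumer below) and a process function of your own:

```go
// At-least-once consumption sketch: `consumer` and `process` are assumptions,
// not part of this package's API.
for !consumer.Terminated() {
	msg, ok := <-consumer.ConsumeChannel()
	if !ok {
		break // channel closed; the consumer is shutting down
	}
	if err := process(msg); err != nil {
		// Do NOT commit on failure, so the message can be seen again.
		continue
	}
	// Mark the message processed so the last-processed offset can advance.
	if err := consumer.Commit(msg); err != nil {
		log.Printf("commit failed: %v", err)
	}
}
```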
func (*Consumer) GetCurrentLag ¶
GetCurrentLag returns the number of messages that this consumer is lagging by. Note that this value can be unstable at the beginning of a run, as we might not have claimed all of the partitions we will end up claiming, or we might have overclaimed and need to back off. Ideally this will settle towards 0. If it continues to rise, that implies there isn't enough consumer capacity.
func (*Consumer) GetCurrentLoad ¶
GetCurrentLoad returns a number representing the "load" of this consumer. Think of this like a load average in Unix systems: the number is loosely related to how much work the system is doing, but by itself it doesn't tell you much.
func (*Consumer) GetCurrentTopicClaims ¶
GetCurrentTopicClaims returns the topics that are currently claimed by this consumer. It is only relevant when ClaimEntireTopic is set.
func (*Consumer) PrintState ¶
func (c *Consumer) PrintState()
PrintState outputs the status of the consumer.
func (*Consumer) Terminate ¶
Terminate instructs the consumer to clean up and allow other consumers to begin consuming. (If you do not call this method before exiting, things will still work, but more slowly.)
func (*Consumer) Terminated ¶
Terminated returns whether or not this consumer has been terminated.
func (*Consumer) TopicClaims ¶
TopicClaims returns a read-only channel that receives updates for topic claims. It is only relevant when ClaimEntireTopic is set.
type ConsumerOptions ¶
type ConsumerOptions struct {
	// FastReclaim instructs the consumer to attempt to reclaim any partitions
	// that are presently claimed by the ClientID/GroupID we have. This is useful
	// for situations where your ClientID is predictable/stable and you want to
	// minimize churn during restarts. This is dangerous if you have two copies
	// of your application running with the same ClientID/GroupID.
	// TODO: Create an instance ID for Marshaler such that we can detect when
	// someone else has decided to use our Client/Group.
	//
	// Note that this option ignores MaximumClaims, so it is possible to
	// exceed the claim limit if the ClientID previously held more claims.
	FastReclaim bool

	// ClaimEntireTopic makes Marshal handle claims on the entire topic rather than
	// on a per-partition basis. This is used with sharded produce/consume setups.
	// Defaults to false.
	ClaimEntireTopic bool

	// GreedyClaims indicates whether we should attempt to claim all unclaimed
	// partitions on start. This is appropriate in low QPS type environments.
	// Defaults to false/off.
	GreedyClaims bool

	// StrictOrdering tells the consumer that only a single message per partition
	// is allowed to be in-flight at a time. In order to consume the next message
	// you must commit the existing message. This option has a strong penalty to
	// consumption parallelism.
	StrictOrdering bool

	// The maximum number of claims this Consumer is allowed to hold simultaneously.
	// MaximumClaims indicates the maximum number of partitions to be claimed when
	// ClaimEntireTopic is set to false. Otherwise, it indicates the maximum number
	// of topics to claim.
	// Set to 0 (default) to allow an unlimited number of claims.
	//
	// Using this option will leave some partitions/topics completely unclaimed
	// if the number of Consumers in this GroupID falls below the number of
	// partitions/topics that exist.
	//
	// Note this limit does not apply to claims made via FastReclaim.
	MaximumClaims int
}
ConsumerOptions represents all of the options that a consumer can be configured with.
func NewConsumerOptions ¶
func NewConsumerOptions() ConsumerOptions
NewConsumerOptions returns a default set of options for the Consumer.
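A typical pattern is to start from the defaults and override individual fields. A sketch, assuming m is an existing *Marshaler and the package is imported as marshal:

```go
options := marshal.NewConsumerOptions()
options.GreedyClaims = true // claim aggressively; suitable for low-QPS environments
options.MaximumClaims = 4   // never hold more than four partition claims

consumer, err := m.NewConsumer([]string{"some-topic"}, options)
if err != nil {
	log.Fatalf("failed to create consumer: %v", err)
}
```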
type KafkaCluster ¶
type KafkaCluster struct {
// contains filtered or unexported fields
}
KafkaCluster is a user-agnostic view of the world. It connects to a Kafka cluster and runs rationalizers to observe the complete world state.
func Dial ¶
func Dial(name string, brokers []string) (*KafkaCluster, error)
Dial returns a new cluster object which can be used to instantiate a number of Marshalers that all use the same cluster.
func (*KafkaCluster) NewMarshaler ¶
func (c *KafkaCluster) NewMarshaler(clientID, groupID string) (*Marshaler, error)
NewMarshaler creates a Marshaler off of an existing cluster. This is more efficient if you're creating multiple instances, since they can share the same underlying cluster.
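Putting Dial and NewMarshaler together, a startup sequence might look like the following sketch. The cluster name, broker addresses, client/group IDs, and topic name are placeholders for your own environment:

```go
cluster, err := marshal.Dial("my-cluster", []string{"broker1:9092", "broker2:9092"})
if err != nil {
	log.Fatalf("failed to dial cluster: %v", err)
}
defer cluster.Terminate()

m, err := cluster.NewMarshaler("client-1", "my-consumer-group")
if err != nil {
	log.Fatalf("failed to create marshaler: %v", err)
}
defer m.Terminate()

consumer, err := m.NewConsumer([]string{"some-topic"}, marshal.NewConsumerOptions())
if err != nil {
	log.Fatalf("failed to create consumer: %v", err)
}
// Pass true so our claims are released promptly and others can take over.
defer consumer.Terminate(true)
```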
func (*KafkaCluster) Terminate ¶
func (c *KafkaCluster) Terminate()
Terminate is called when we're done with the marshaler and want to shut down.
func (*KafkaCluster) Terminated ¶
func (c *KafkaCluster) Terminated() bool
Terminated returns whether or not we have been terminated.
type Marshaler ¶
type Marshaler struct {
// contains filtered or unexported fields
}
Marshaler is the coordinator type. It is designed to be used once per (client, group) and is thread safe. Creating one of these will create connections to your Kafka cluster and begin actively monitoring the coordination topic.
func NewMarshaler ¶
NewMarshaler connects to a cluster (given broker addresses) and prepares to handle marshalling requests. Given the way this system works, the marshaler has to process all messages in the topic before it's safely able to begin operating. This might take a while. NOTE: If you are creating multiple marshalers in your program, you should instead call Dial and then use the NewMarshaler method on that object.
func (*Marshaler) ClaimPartition ¶
ClaimPartition is how you can actually claim a partition. If you call this, Marshal will attempt to claim the partition on your behalf. This is a low-level function; you probably want to use a MarshaledConsumer instead. Returns a bool indicating whether the claim succeeded and whether you may proceed.
func (*Marshaler) CommitOffsets ¶
CommitOffsets will commit the partition offsets to Kafka so it's available in the long-term storage of the offset coordination system. Note: this method does not ensure that this Marshal instance owns the topic/partition in question.
func (*Marshaler) GetLastPartitionClaim ¶
func (m *Marshaler) GetLastPartitionClaim(topicName string, partID int) PartitionClaim
GetLastPartitionClaim returns a PartitionClaim structure for a given partition. The structure describes the consumer that is currently or most recently claiming this partition. This is a copy of the claim structure, so changing it cannot change the world state.
func (*Marshaler) GetPartitionClaim ¶
func (m *Marshaler) GetPartitionClaim(topicName string, partID int) PartitionClaim
GetPartitionClaim returns a PartitionClaim structure for a given partition. The structure describes the consumer that is currently claiming this partition. This is a copy of the claim structure, so changing it cannot change the world state.
func (*Marshaler) GetPartitionOffsets ¶
func (m *Marshaler) GetPartitionOffsets(topicName string, partID int) (PartitionOffsets, error)
GetPartitionOffsets returns the current state of a topic/partition. This has to hit Kafka three times to ask about a partition, but it returns the full set of information needed to calculate consumer lag.
func (*Marshaler) Heartbeat ¶
Heartbeat will send an update for other people to know that we're still alive and still owning this partition. Returns an error if anything has gone wrong (at which point we can no longer assert we have the lock).
func (*Marshaler) IsClaimed ¶
IsClaimed returns whether or not a partition is currently claimed by any consumer in our group (including ourselves). A topic/partition that does not exist is considered to be unclaimed.
func (*Marshaler) NewConsumer ¶
func (m *Marshaler) NewConsumer(topicNames []string, options ConsumerOptions) (*Consumer, error)
NewConsumer instantiates a consumer object for a given topic. You must create a separate consumer for every individual topic that you want to consume from. Please see the documentation on ConsumerBehavior.
func (*Marshaler) Partitions ¶
Partitions returns the count of how many partitions are in a given topic. Returns 0 if a topic is unknown.
func (*Marshaler) PrintState ¶
func (m *Marshaler) PrintState()
PrintState will take the current state of the Marshal world and print it verbosely to the logging output. This is used in the rare case where we're self-terminating or on request from the user.
func (*Marshaler) ReleasePartition ¶
ReleasePartition will send an update for other people to know that we're done with a partition. Returns an error if anything has gone wrong (at which point we can no longer assert we have the lock).
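For the rare case where you drive the low-level API directly instead of using a Consumer, the claim lifecycle is: claim, heartbeat well within HeartbeatInterval while you work, then release. A sketch, assuming m is a *Marshaler and the package is imported as marshal:

```go
if !m.ClaimPartition("some-topic", 0) {
	return // claim failed; another consumer owns the partition
}

var lastOffset int64
ticker := time.NewTicker(marshal.HeartbeatInterval * time.Second / 2)
defer ticker.Stop()

for done := false; !done; {
	select {
	case <-ticker.C:
		// Heartbeat at twice the required rate so the claim never expires.
		if err := m.Heartbeat("some-topic", 0, lastOffset); err != nil {
			return // we can no longer assert we hold the claim
		}
	default:
		// ... consume one message, update lastOffset, set done when finished ...
	}
}

// Hand the partition back, recording how far we got.
if err := m.ReleasePartition("some-topic", 0, lastOffset); err != nil {
	log.Printf("release failed: %v", err)
}
```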
func (*Marshaler) Terminate ¶
func (m *Marshaler) Terminate()
Terminate is called when we're done with the marshaler and want to shut down.
func (*Marshaler) Terminated ¶
Terminated returns whether or not we have been terminated.
type PartitionClaim ¶
type PartitionClaim struct {
	LastHeartbeat int64
	LastOffset    int64
	ClientID      string
	GroupID       string
	// contains filtered or unexported fields
}
PartitionClaim contains claim information about a given partition.
type PartitionOffsets ¶
PartitionOffsets is a record of offsets for a given partition. Contains information combined from Kafka and our current state.
A Kafka partition consists of N messages with offsets. In the basic case, you can think of an offset like an array index. With log compaction and other trickery it acts more like a sparse array, but it's a close enough metaphor.
We keep track of four values for offsets:
offsets       1     2     3     7     9    10    11
partition  [ msg1, msg2, msg3, msg4, msg5, msg6, msg7, ... ]
              ^                 ^                       ^
              \- Earliest       |                       |
                                \- Current              \- Latest
In this example, Earliest is 1, which is the "oldest" offset within the partition. At any given time this offset might become invalid (if a log rolls, for example), so we might update it.
Current is 7, which is the offset of the NEXT message i.e. this message has not been consumed yet.
Latest is 12, which is the offset that Kafka will assign to the message that next gets committed to the partition. This offset does not yet exist, and might never.
Committed is the value recorded in Kafka's committed offsets system.
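These four values are enough to approximate consumer lag: the distance between the next message to be consumed (Current) and the next offset Kafka will assign (Latest). A self-contained sketch; the struct below mirrors the values described above and is an assumption, not the package's actual definition, and with log compaction the true message count may be smaller than the offset distance:

```go
package main

import "fmt"

// PartitionOffsets mirrors the four values described above; the real struct
// in this package may differ.
type PartitionOffsets struct {
	Earliest  int64 // oldest offset still present in the partition
	Current   int64 // offset of the next message to be consumed
	Latest    int64 // offset Kafka will assign to the next produced message
	Committed int64 // value recorded in Kafka's committed-offsets system
}

// Lag is the offset distance between what has been produced and what has
// been consumed.
func (o PartitionOffsets) Lag() int64 {
	return o.Latest - o.Current
}

func main() {
	// The example from the diagram: Earliest=1, Current=7, Latest=12.
	o := PartitionOffsets{Earliest: 1, Current: 7, Latest: 12, Committed: 7}
	fmt.Println("lag:", o.Lag()) // 12 - 7 = 5
}
```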