Documentation ¶
Index ¶
- Variables
- func BuildConnectionString(nodes []string) string
- func BuildConnectionStringWithChroot(nodes []string, chroot string) string
- func ParseConnectionString(zookeeper string) (nodes []string, chroot string)
- type Config
- type Consumergroup
- func (cg *Consumergroup) CommitOffset(topic string, partition int32, offset int64) error
- func (cg *Consumergroup) Create() error
- func (cg *Consumergroup) Delete() error
- func (cg *Consumergroup) Exists() (bool, error)
- func (cg *Consumergroup) FetchAllOffsets() (map[string]map[int32]int64, error)
- func (cg *Consumergroup) FetchOffset(topic string, partition int32) (int64, error)
- func (cg *Consumergroup) Instance(id string) *ConsumergroupInstance
- func (cg *Consumergroup) Instances() (ConsumergroupInstanceList, error)
- func (cg *Consumergroup) NewInstance() *ConsumergroupInstance
- func (cg *Consumergroup) PartitionOwner(topic string, partition int32) (*ConsumergroupInstance, error)
- func (cg *Consumergroup) ResetOffsets() error
- func (cg *Consumergroup) Topics() (TopicList, error)
- func (cg *Consumergroup) WatchInstances() (ConsumergroupInstanceList, <-chan zk.Event, error)
- func (cg *Consumergroup) WatchPartitionOwner(topic string, partition int32) (*ConsumergroupInstance, <-chan zk.Event, error)
- type ConsumergroupInstance
- func (cgi *ConsumergroupInstance) ClaimPartition(topic string, partition int32) error
- func (cgi *ConsumergroupInstance) Deregister() error
- func (cgi *ConsumergroupInstance) Register(topics []string) error
- func (cgi *ConsumergroupInstance) RegisterWithSubscription(subscriptionJSON []byte) error
- func (cgi *ConsumergroupInstance) Registered() (bool, error)
- func (cgi *ConsumergroupInstance) Registration() (*Registration, error)
- func (cgi *ConsumergroupInstance) ReleasePartition(topic string, partition int32) error
- func (cgi *ConsumergroupInstance) UpdateRegistration(topics []string) error
- func (cgi *ConsumergroupInstance) WatchRegistration() (*Registration, <-chan zk.Event, error)
- type ConsumergroupInstanceList
- type ConsumergroupList
- type Kazoo
- func (kz *Kazoo) BrokerList() ([]string, error)
- func (kz *Kazoo) Brokers() (map[int32]string, error)
- func (kz *Kazoo) Close() error
- func (kz *Kazoo) Consumergroup(name string) *Consumergroup
- func (kz *Kazoo) Consumergroups() (ConsumergroupList, error)
- func (kz *Kazoo) Controller() (int32, error)
- func (kz *Kazoo) CreateTopic(name string, partitionCount int, replicationFactor int, ...) error
- func (kz *Kazoo) DeleteTopic(name string) error
- func (kz *Kazoo) DeleteTopicSync(name string, timeout time.Duration) error
- func (kz *Kazoo) Topic(topic string) *Topic
- func (kz *Kazoo) Topics() (TopicList, error)
- func (kz *Kazoo) WatchTopics() (TopicList, <-chan zk.Event, error)
- type Partition
- func (p *Partition) ISR() ([]int32, error)
- func (p *Partition) Key() string
- func (p *Partition) Leader() (int32, error)
- func (p *Partition) PreferredReplica() int32
- func (p *Partition) Topic() *Topic
- func (p *Partition) UnderReplicated() (bool, error)
- func (p *Partition) UsesPreferredReplica() (bool, error)
- type PartitionList
- type RegPattern
- type RegVersion
- type Registration
- type Topic
- func (t *Topic) Config() (map[string]string, error)
- func (t *Topic) Exists() (bool, error)
- func (t *Topic) Partition(id int32, replicas []int32) *Partition
- func (t *Topic) Partitions() (PartitionList, error)
- func (t *Topic) Watch() (<-chan zk.Event, error)
- func (t *Topic) WatchPartitions() (PartitionList, <-chan zk.Event, error)
- type TopicList
Constants ¶
This section is empty.
Variables ¶
var ( ErrRunningInstances = errors.New("Cannot deregister a consumergroup with running instances") ErrInstanceAlreadyRegistered = errors.New("Consumer instance already registered") ErrInstanceNotRegistered = errors.New("Consumer instance not registered") ErrPartitionClaimedByOther = errors.New("Cannot claim partition: it is already claimed by another instance") ErrPartitionNotClaimed = errors.New("Cannot release partition: it is not claimed by this instance") )
var ( ErrTopicExists = errors.New("Topic already exists") ErrTopicMarkedForDelete = errors.New("Topic is already marked for deletion") ErrDeletionTimedOut = errors.New("Timed out while waiting for a topic to be deleted") )
var ( ErrInvalidPartitionCount = errors.New("Number of partitions must be larger than 0") ErrInvalidReplicationFactor = errors.New("Replication factor must be between 1 and the number of brokers") ErrInvalidReplicaCount = errors.New("All partitions must have the same number of replicas") ErrReplicaBrokerOverlap = errors.New("All replicas for a partition must be on separate brokers") ErrInvalidBroker = errors.New("Replica assigned to invalid broker") ErrMissingPartitionID = errors.New("Partition ids must be sequential starting from 0") ErrDuplicatePartitionID = errors.New("Each partition must have a unique ID") )
var (
FailedToClaimPartition = errors.New("Failed to claim partition for this consumer instance. Do you have a rogue consumer running?")
)
Functions ¶
func BuildConnectionString ¶
BuildConnectionString builds a Zookeeper connection string for a list of nodes. Returns a string like "zk1:2181,zk2:2181,zk3:2181"
func BuildConnectionStringWithChroot ¶
BuildConnectionStringWithChroot builds a Zookeeper connection string for a list of nodes and a chroot. The chroot should start with "/". Returns a string like "zk1:2181,zk2:2181,zk3:2181/chroot"
func ParseConnectionString ¶
ParseConnectionString parses a zookeeper connection string in the form of host1:2181,host2:2181/chroot and returns the list of servers, and the chroot.
Types ¶
type Config ¶
type Config struct { // The chroot the Kafka installation is registered under. Defaults to "". Chroot string // The amount of time the Zookeeper client can be disconnected from the Zookeeper cluster // before the cluster will get rid of watches and ephemeral nodes. Defaults to 1 second. Timeout time.Duration // Logger Logger zk.Logger }
Config holds configuration values for a Kazoo instance.
type Consumergroup ¶
type Consumergroup struct { Name string // contains filtered or unexported fields }
Consumergroup represents a high-level consumer that is registered in Zookeeper.
func (*Consumergroup) CommitOffset ¶
func (cg *Consumergroup) CommitOffset(topic string, partition int32, offset int64) error
CommitOffset commits an offset to a group/topic/partition
func (*Consumergroup) Create ¶
func (cg *Consumergroup) Create() error
Create registers the consumergroup in zookeeper
func (*Consumergroup) Delete ¶
func (cg *Consumergroup) Delete() error
Delete removes the consumergroup from zookeeper
func (*Consumergroup) Exists ¶
func (cg *Consumergroup) Exists() (bool, error)
Exists checks whether the consumergroup has been registered in Zookeeper
func (*Consumergroup) FetchAllOffsets ¶
func (cg *Consumergroup) FetchAllOffsets() (map[string]map[int32]int64, error)
FetchAllOffsets retrieves all the committed offsets for a group
func (*Consumergroup) FetchOffset ¶
func (cg *Consumergroup) FetchOffset(topic string, partition int32) (int64, error)
FetchOffset retrieves the offset for a group/topic/partition
func (*Consumergroup) Instance ¶
func (cg *Consumergroup) Instance(id string) *ConsumergroupInstance
Instance instantiates a new ConsumergroupInstance inside this consumer group, using an existing ID.
func (*Consumergroup) Instances ¶
func (cg *Consumergroup) Instances() (ConsumergroupInstanceList, error)
Instances returns a list of all running instances inside this consumergroup.
func (*Consumergroup) NewInstance ¶
func (cg *Consumergroup) NewInstance() *ConsumergroupInstance
NewInstance instantiates a new ConsumergroupInstance inside this consumer group, using a newly generated ID.
func (*Consumergroup) PartitionOwner ¶
func (cg *Consumergroup) PartitionOwner(topic string, partition int32) (*ConsumergroupInstance, error)
PartitionOwner returns the ConsumergroupInstance that has claimed the given partition. This can be nil if nobody has claimed it yet.
func (*Consumergroup) ResetOffsets ¶
func (cg *Consumergroup) ResetOffsets() error
func (*Consumergroup) Topics ¶
func (cg *Consumergroup) Topics() (TopicList, error)
Topics retrieves the list of topics the consumergroup has claimed ownership of at some point.
func (*Consumergroup) WatchInstances ¶
func (cg *Consumergroup) WatchInstances() (ConsumergroupInstanceList, <-chan zk.Event, error)
WatchInstances returns a ConsumergroupInstanceList, and a channel that will be closed as soon as the instance list changes.
func (*Consumergroup) WatchPartitionOwner ¶
func (cg *Consumergroup) WatchPartitionOwner(topic string, partition int32) (*ConsumergroupInstance, <-chan zk.Event, error)
WatchPartitionOwner retrieves which instance currently owns the partition, and sets a Zookeeper watch to be notified of changes. If the partition currently does not have an owner, the function returns nil for every return value. In this case it should be safe to claim the partition for an instance.
type ConsumergroupInstance ¶
type ConsumergroupInstance struct { ID string // contains filtered or unexported fields }
ConsumergroupInstance represents an instance of a Consumergroup.
func (*ConsumergroupInstance) ClaimPartition ¶
func (cgi *ConsumergroupInstance) ClaimPartition(topic string, partition int32) error
ClaimPartition claims ownership of a topic/partition for a consumer ID within a group. If the partition is already claimed by another running instance, it will return ErrPartitionClaimedByOther.
func (*ConsumergroupInstance) Deregister ¶
func (cgi *ConsumergroupInstance) Deregister() error
Deregister removes the registration of the instance from zookeeper.
func (*ConsumergroupInstance) Register ¶
func (cgi *ConsumergroupInstance) Register(topics []string) error
Register registers the consumergroup instance in Zookeeper.
func (*ConsumergroupInstance) RegisterWithSubscription ¶
func (cgi *ConsumergroupInstance) RegisterWithSubscription(subscriptionJSON []byte) error
RegisterWithSubscription registers the consumer instance in Zookeeper, with its subscription.
func (*ConsumergroupInstance) Registered ¶
func (cgi *ConsumergroupInstance) Registered() (bool, error)
Registered checks whether the consumergroup instance is registered in Zookeeper.
func (*ConsumergroupInstance) Registration ¶
func (cgi *ConsumergroupInstance) Registration() (*Registration, error)
Registration returns the current registration of the consumer group instance.
func (*ConsumergroupInstance) ReleasePartition ¶
func (cgi *ConsumergroupInstance) ReleasePartition(topic string, partition int32) error
ReleasePartition releases a claim to a partition.
func (*ConsumergroupInstance) UpdateRegistration ¶
func (cgi *ConsumergroupInstance) UpdateRegistration(topics []string) error
UpdateRegistration updates a consumer group member registration. If the consumer group member has not been registered yet, then an error is returned.
func (*ConsumergroupInstance) WatchRegistration ¶
func (cgi *ConsumergroupInstance) WatchRegistration() (*Registration, <-chan zk.Event, error)
WatchRegistration returns the current registration of the consumer group instance, and a channel that will be closed as soon as the registration changes.
type ConsumergroupInstanceList ¶
type ConsumergroupInstanceList []*ConsumergroupInstance
ConsumergroupInstanceList implements the sortable interface on top of a consumer instance list
func (ConsumergroupInstanceList) Find ¶
func (cgil ConsumergroupInstanceList) Find(id string) *ConsumergroupInstance
Find returns the consumergroup instance with the given ID if it exists in the list. Otherwise it will return `nil`.
func (ConsumergroupInstanceList) Len ¶
func (cgil ConsumergroupInstanceList) Len() int
func (ConsumergroupInstanceList) Less ¶
func (cgil ConsumergroupInstanceList) Less(i, j int) bool
func (ConsumergroupInstanceList) Swap ¶
func (cgil ConsumergroupInstanceList) Swap(i, j int)
type ConsumergroupList ¶
type ConsumergroupList []*Consumergroup
ConsumergroupList implements the sortable interface on top of a consumer group list
func (ConsumergroupList) Find ¶
func (cgl ConsumergroupList) Find(name string) *Consumergroup
Find returns the consumergroup with the given name if it exists in the list. Otherwise it will return `nil`.
func (ConsumergroupList) Len ¶
func (cgl ConsumergroupList) Len() int
func (ConsumergroupList) Less ¶
func (cgl ConsumergroupList) Less(i, j int) bool
func (ConsumergroupList) Swap ¶
func (cgl ConsumergroupList) Swap(i, j int)
type Kazoo ¶
type Kazoo struct {
// contains filtered or unexported fields
}
Kazoo interacts with the Kafka metadata in Zookeeper
func NewKazooFromConnectionString ¶
NewKazooFromConnectionString creates a new connection instance based on a zookeeper connection string that can include a chroot.
func (*Kazoo) BrokerList ¶
BrokerList returns a slice of broker addresses that can be used to connect to the Kafka cluster, e.g. using `sarama.NewAsyncProducer()`.
func (*Kazoo) Brokers ¶
Brokers returns a map of all the brokers that are part of the Kafka cluster that is registered in Zookeeper.
func (*Kazoo) Consumergroup ¶
func (kz *Kazoo) Consumergroup(name string) *Consumergroup
Consumergroup instantiates a new consumergroup.
func (*Kazoo) Consumergroups ¶
func (kz *Kazoo) Consumergroups() (ConsumergroupList, error)
Consumergroups returns all the registered consumergroups
func (*Kazoo) Controller ¶
Controller returns what broker is currently acting as controller of the Kafka cluster
func (*Kazoo) CreateTopic ¶
func (kz *Kazoo) CreateTopic(name string, partitionCount int, replicationFactor int, topicConfig map[string]string) error
CreateTopic creates a new kafka topic with the specified parameters and properties
func (*Kazoo) DeleteTopic ¶
DeleteTopic marks a kafka topic for deletion. Deleting a topic is asynchronous and DeleteTopic will return before Kafka actually does the deletion.
func (*Kazoo) DeleteTopicSync ¶
DeleteTopicSync marks a kafka topic for deletion and waits until it is deleted before returning.
type Partition ¶
Partition interacts with Kafka's partition metadata in Zookeeper.
func (*Partition) ISR ¶
ISR returns the broker IDs of the current in-sync replica set for the partition
func (*Partition) Key ¶
Key returns a unique identifier for the partition, using the form "topic/partition".
func (*Partition) Leader ¶
Leader returns the broker ID of the broker that is currently the leader for the partition.
func (*Partition) PreferredReplica ¶
PreferredReplica returns the preferred replica for this partition.
func (*Partition) UnderReplicated ¶
func (*Partition) UsesPreferredReplica ¶
type PartitionList ¶
type PartitionList []*Partition
PartitionList is a type that implements the sortable interface for a list of Partition instances
func (PartitionList) Len ¶
func (pl PartitionList) Len() int
func (PartitionList) Less ¶
func (pl PartitionList) Less(i, j int) bool
func (PartitionList) Swap ¶
func (pl PartitionList) Swap(i, j int)
type RegPattern ¶
type RegPattern string
const ( RegPatternStatic RegPattern = "static" RegPatternWhiteList RegPattern = "white_list" RegPatternBlackList RegPattern = "black_list" )
type Registration ¶
type Registration struct { Pattern RegPattern `json:"pattern"` Subscription map[string]int `json:"subscription"` Timestamp int64 `json:"timestamp"` Version RegVersion `json:"version"` }
type Topic ¶
type Topic struct { Name string // contains filtered or unexported fields }
Topic interacts with Kafka's topic metadata in Zookeeper.
func (*Topic) Partitions ¶
func (t *Topic) Partitions() (PartitionList, error)
Partitions returns a list of all partitions for the topic.
func (*Topic) WatchPartitions ¶
func (t *Topic) WatchPartitions() (PartitionList, <-chan zk.Event, error)
WatchPartitions returns a list of all partitions for the topic, and watches the topic for changes.