Documentation ¶
Index ¶
- Variables
- func ParseConnectionString(zookeeper string) (nodes []string, chroot string)
- type Config
- type Consumergroup
- func (cg *Consumergroup) CommitOffset(topic string, partition int32, offset int64) error
- func (cg *Consumergroup) Create() error
- func (cg *Consumergroup) Delete() error
- func (cg *Consumergroup) Exists() (bool, error)
- func (cg *Consumergroup) FetchOffset(topic string, partition int32) (int64, error)
- func (cg *Consumergroup) Instance(id string) *ConsumergroupInstance
- func (cg *Consumergroup) Instances() (ConsumergroupInstanceList, error)
- func (cg *Consumergroup) NewInstance() *ConsumergroupInstance
- func (cg *Consumergroup) PartitionOwner(topic string, partition int32) (*ConsumergroupInstance, error)
- func (cg *Consumergroup) Topics() (TopicList, error)
- func (cg *Consumergroup) WatchInstances() (ConsumergroupInstanceList, <-chan struct{}, error)
- type ConsumergroupInstance
- func (cgi *ConsumergroupInstance) ClaimPartition(topic string, partition int32) error
- func (cgi *ConsumergroupInstance) Deregister() error
- func (cgi *ConsumergroupInstance) Register(topics []string) error
- func (cgi *ConsumergroupInstance) Registered() (bool, error)
- func (cgi *ConsumergroupInstance) Registration() (*Registration, error)
- func (cgi *ConsumergroupInstance) ReleasePartition(topic string, partition int32) error
- type ConsumergroupInstanceList
- type ConsumergroupList
- type Kazoo
- func (kz *Kazoo) BrokerList() ([]string, error)
- func (kz *Kazoo) Brokers() (map[int32]string, error)
- func (kz *Kazoo) Close() error
- func (kz *Kazoo) Consumergroup(name string) *Consumergroup
- func (kz *Kazoo) Consumergroups() (ConsumergroupList, error)
- func (kz *Kazoo) Controller() (int32, error)
- func (kz *Kazoo) Topic(topic string) *Topic
- func (kz *Kazoo) Topics() (TopicList, error)
- type Partition
- type PartitionList
- type RegPattern
- type RegVersion
- type Registration
- type Topic
- type TopicList
Constants ¶
This section is empty.
Variables ¶
var ( ErrRunningInstances = errors.New("Cannot deregister a consumergroup with running instances") ErrInstanceAlreadyRegistered = errors.New("Cannot register consumer instance because it already is registered") ErrInstanceNotRegistered = errors.New("Cannot deregister consumer instance because it not registered") ErrPartitionClaimedByOther = errors.New("Cannot claim partition: it is already claimed by another instance") ErrPartitionNotClaimed = errors.New("Cannot release partition: it is not claimed by this instance") )
var (
FailedToClaimPartition = errors.New("Failed to claim partition for this consumer instance. Do you have a rogue consumer running?")
)
Functions ¶
func ParseConnectionString ¶
ParseConnectionString parses a zookeeper connection string in the form of host1:2181,host2:2181/chroot and returns the list of servers, and the chroot.
Types ¶
type Config ¶
type Config struct { // The chroot the Kafka installation is registerde under. Defaults to "". Chroot string // The amount of time the Zookeeper client can be disconnected from the Zookeeper cluster // before the cluster will get rid of watches and ephemeral nodes. Defaults to 1 second. Timeout time.Duration }
Config holds configuration values f.
type Consumergroup ¶
type Consumergroup struct { Name string // contains filtered or unexported fields }
Consumergroup represents a high-level consumer that is registered in Zookeeper,
func (*Consumergroup) CommitOffset ¶
func (cg *Consumergroup) CommitOffset(topic string, partition int32, offset int64) error
CommitOffset commits an offset to a group/topic/partition
func (*Consumergroup) Create ¶
func (cg *Consumergroup) Create() error
Create registers the consumergroup in zookeeper
func (*Consumergroup) Delete ¶
func (cg *Consumergroup) Delete() error
Delete removes the consumergroup from zookeeper
func (*Consumergroup) Exists ¶
func (cg *Consumergroup) Exists() (bool, error)
Exists checks whether the consumergroup has been registered in Zookeeper
func (*Consumergroup) FetchOffset ¶
func (cg *Consumergroup) FetchOffset(topic string, partition int32) (int64, error)
FetchOffset retrieves an offset to a group/topic/partition
func (*Consumergroup) Instance ¶
func (cg *Consumergroup) Instance(id string) *ConsumergroupInstance
Instance instantiates a new ConsumergroupInstance inside this consumer group, using an existing ID.
func (*Consumergroup) Instances ¶
func (cg *Consumergroup) Instances() (ConsumergroupInstanceList, error)
Instances returns a map of all running instances inside this consumergroup.
func (*Consumergroup) NewInstance ¶
func (cg *Consumergroup) NewInstance() *ConsumergroupInstance
NewInstance instantiates a new ConsumergroupInstance inside this consumer group, using a newly generated ID.
func (*Consumergroup) PartitionOwner ¶
func (cg *Consumergroup) PartitionOwner(topic string, partition int32) (*ConsumergroupInstance, error)
PartitionOwner returns the ConsumergroupInstance that has claimed the given partition. This can be nil if nobody has claime dit yet.
func (*Consumergroup) Topics ¶
func (cg *Consumergroup) Topics() (TopicList, error)
Topics retrieves the list of topics the consumergroup has claimed ownership of at some point.
func (*Consumergroup) WatchInstances ¶
func (cg *Consumergroup) WatchInstances() (ConsumergroupInstanceList, <-chan struct{}, error)
WatchInstances returns a ConsumergroupInstanceList, and a channel that will be closed as soon the instance list changes.
type ConsumergroupInstance ¶
type ConsumergroupInstance struct { ID string // contains filtered or unexported fields }
ConsumergroupInstance represents an instance of a Consumergroup.
func (*ConsumergroupInstance) ClaimPartition ¶
func (cgi *ConsumergroupInstance) ClaimPartition(topic string, partition int32) error
Claim claims a topic/partition ownership for a consumer ID within a group. If the partition is already claimed by another running instance, it will return ErrAlreadyClaimed.
func (*ConsumergroupInstance) Deregister ¶
func (cgi *ConsumergroupInstance) Deregister() error
Deregister removes the registration of the instance from zookeeper.
func (*ConsumergroupInstance) Register ¶
func (cgi *ConsumergroupInstance) Register(topics []string) error
Register registers the consumergroup instance in Zookeeper.
func (*ConsumergroupInstance) Registered ¶
func (cgi *ConsumergroupInstance) Registered() (bool, error)
Registered checks whether the consumergroup instance is registered in Zookeeper.
func (*ConsumergroupInstance) Registration ¶
func (cgi *ConsumergroupInstance) Registration() (*Registration, error)
Registered returns current registration of the consumer group instance.
func (*ConsumergroupInstance) ReleasePartition ¶
func (cgi *ConsumergroupInstance) ReleasePartition(topic string, partition int32) error
ReleasePartition releases a claim to a partition.
type ConsumergroupInstanceList ¶
type ConsumergroupInstanceList []*ConsumergroupInstance
func (ConsumergroupInstanceList) Find ¶
func (cgil ConsumergroupInstanceList) Find(id string) *ConsumergroupInstance
Find returns the consumergroup instance with the given ID if it exists in the list. Otherwise it will return `nil`.
func (ConsumergroupInstanceList) Len ¶
func (cgil ConsumergroupInstanceList) Len() int
func (ConsumergroupInstanceList) Less ¶
func (cgil ConsumergroupInstanceList) Less(i, j int) bool
func (ConsumergroupInstanceList) Swap ¶
func (cgil ConsumergroupInstanceList) Swap(i, j int)
type ConsumergroupList ¶
type ConsumergroupList []*Consumergroup
func (ConsumergroupList) Find ¶
func (cgl ConsumergroupList) Find(name string) *Consumergroup
Find returns the consumergroup with the given name if it exists in the list. Otherwise it will return `nil`.
func (ConsumergroupList) Len ¶
func (cgl ConsumergroupList) Len() int
func (ConsumergroupList) Less ¶
func (cgl ConsumergroupList) Less(i, j int) bool
func (ConsumergroupList) Swap ¶
func (cgl ConsumergroupList) Swap(i, j int)
type Kazoo ¶
type Kazoo struct {
// contains filtered or unexported fields
}
Kazoo interacts with the Kafka metadata in Zookeeper
func NewKazooFromConnectionString ¶
NewKazooFromConnectionString creates a new connection instance based on a zookeeer connection string that can include a chroot.
func (*Kazoo) BrokerList ¶
BrokerList returns a slice of broker addresses that can be used to connect to the Kafka cluster, e.g. using `sarama.NewAsyncProducer()`.
func (*Kazoo) Brokers ¶
Brokers returns a map of all the brokers that make part of the Kafka cluster that is regeistered in Zookeeper.
func (*Kazoo) Consumergroup ¶
func (kz *Kazoo) Consumergroup(name string) *Consumergroup
Consumergroup instantiates a new consumergroup.
func (*Kazoo) Consumergroups ¶
func (kz *Kazoo) Consumergroups() (ConsumergroupList, error)
Consumergroups returns all the registered consumergroups
func (*Kazoo) Controller ¶
Controller returns what broker is currently acting as controller of the Kafka cluster
type Partition ¶
Partition interacts with Kafka's partition metadata in Zookeeper.
func (*Partition) ISR ¶
ISR returns the broker IDs of the current in-sync replica set for the partition
func (*Partition) Leader ¶
Leader returns the broker ID of the broker that is currently the leader for the partition.
func (*Partition) UnderReplicated ¶
func (*Partition) UsesPreferredReplica ¶
type PartitionList ¶
type PartitionList []*Partition
func (PartitionList) Len ¶
func (pl PartitionList) Len() int
func (PartitionList) Less ¶
func (pl PartitionList) Less(i, j int) bool
func (PartitionList) Swap ¶
func (pl PartitionList) Swap(i, j int)
type RegPattern ¶
type RegPattern string
const ( RegPatternStatic RegPattern = "static" RegPatternWhiteList RegPattern = "white_list" RegPatternBlackList RegPattern = "black_list" )
type Registration ¶
type Registration struct { Pattern RegPattern `json:"pattern"` Subscription map[string]int `json:"subscription"` Timestamp int64 `json:"timestamp"` Version RegVersion `json:"version"` }
type Topic ¶
type Topic struct { Name string // contains filtered or unexported fields }
Topic interacts with Kafka's topic metadata in Zookeeper.
func (*Topic) Partitions ¶
func (t *Topic) Partitions() (PartitionList, error)
Partitions returns a map of all partitions for the topic.