Documentation ¶
Overview ¶
Package zk is a helper library that manages Kafka cluster metadata and consumer metadata.
Index ¶
- Constants
- Variables
- func ClusterPath(cluster string) string
- func DbusCheckpointRoot(cluster string) string
- func DbusClusterRoot(cluster string) string
- func DbusConfig(cluster string) string
- func DbusConfigDir(cluster string) string
- func DefaultZkSessionTimeout() time.Duration
- func TimestampToTime(ts string) time.Time
- type ActorList
- type BrokerInfo
- type BrokerZnode
- type Config
- type ConsumerMeta
- type ConsumerZnode
- type ControllerMeta
- type EsCluster
- type KatewayMeta
- type KguardMeta
- type Orchestrator
- func (this *Orchestrator) ActorRegistered(id string) (bool, error)
- func (this *Orchestrator) ClaimResource(actorId, root, resource string) (err error)
- func (this *Orchestrator) JobQueueCluster(topic string) (string, error)
- func (this *Orchestrator) RegisterActor(id string, val []byte) error
- func (this *Orchestrator) ReleaseResource(actorId, root, resource string) error
- func (this *Orchestrator) ResignActor(id string) error
- func (this *Orchestrator) WatchActors() (ActorList, <-chan zk.Event, error)
- func (this *Orchestrator) WatchResources(path string) (ResourceList, <-chan zk.Event, error)
- func (this *Orchestrator) WebhookInfo(topic string) (*WebhookMeta, error)
- type PartitionOffset
- type RedisCluster
- type RedisInstance
- type ResourceList
- type TopicConfig
- type TopicConfigInfo
- type TopicConfigMeta
- type TopicZnode
- type WebhookMeta
- type ZkCluster
- func (this *ZkCluster) AddTopic(topic string, ts *sla.TopicSla) (output []string, err error)
- func (this *ZkCluster) AlterTopic(topic string, ts *sla.TopicSla) (output []string, err error)
- func (this *ZkCluster) Broker(id int) (b *BrokerZnode)
- func (this *ZkCluster) BrokerList() []string
- func (this *ZkCluster) Brokers() map[string]*BrokerZnode
- func (this *ZkCluster) Chroot() string
- func (this *ZkCluster) ClusterInfoPath() string
- func (this *ZkCluster) ConfiggedTopics() map[string]TopicConfigMeta
- func (this *ZkCluster) ConsumerGroupOffsetPath(group string) string
- func (this *ZkCluster) ConsumerGroupRoot(group string) string
- func (this *ZkCluster) ConsumerGroups() map[string]map[string]*ConsumerZnode
- func (this *ZkCluster) ConsumerGroupsOfTopic(topic string) (map[string][]ConsumerMeta, error)
- func (this *ZkCluster) ConsumerOffsetsOfGroup(group string) map[string]map[string]int64
- func (this *ZkCluster) ConsumersByGroup(groupPattern string) map[string][]ConsumerMeta
- func (this *ZkCluster) DeleteTopic(topic string) (output []string, err error)
- func (this *ZkCluster) GetTopicConfigPath(topic string) string
- func (this *ZkCluster) Isr(topic string, partitionId int32) ([]int, time.Time, time.Time)
- func (this *ZkCluster) ListChildren(recursive bool) ([]string, error)
- func (this *ZkCluster) Name() string
- func (this *ZkCluster) NamedBrokerList() []string
- func (this *ZkCluster) NamedZkConnectAddr() string
- func (this *ZkCluster) OnlyNamedBrokerList() []string
- func (this *ZkCluster) OwnersOfGroupByTopic(group, topic string) map[string]string
- func (this *ZkCluster) Partitions(topic string) []int32
- func (this *ZkCluster) RegisterBroker(id int, host string, port int) error
- func (this *ZkCluster) RegisteredInfo() ZkCluster
- func (this *ZkCluster) ResetConsumerGroupOffset(topic, group, partition string, offset int64) error
- func (this *ZkCluster) SetNickname(name string)
- func (this *ZkCluster) SetPriority(priority int)
- func (this *ZkCluster) SetPublic(public bool)
- func (this *ZkCluster) SetReplicas(replicas int)
- func (this *ZkCluster) SetRetention(retention int)
- func (this *ZkCluster) SimpleConsumeKafkaTopic(topic string, msgChan chan<- *sarama.ConsumerMessage) error
- func (this *ZkCluster) TailMessage(topic string, partitionID int32, lastN int) ([][]byte, error)
- func (this *ZkCluster) TopicConfigInfo(topic string) (tci TopicConfigInfo, err error)
- func (this *ZkCluster) TopicConfigRoot() string
- func (this *ZkCluster) Topics() ([]string, error)
- func (this *ZkCluster) TopicsCtime() map[string]time.Time
- func (this *ZkCluster) TotalConsumerOffsets(topicPattern string) (total int64)
- func (this *ZkCluster) UnregisterBroker(id int) error
- func (this *ZkCluster) WatchTopics() ([]string, <-chan zk.Event, error)
- func (this *ZkCluster) ZkConnectAddr() string
- func (this *ZkCluster) ZkZone() *ZkZone
- func (this *ZkCluster) ZombieConsumerGroups(autofix bool) (groups []string)
- type ZkStat
- type ZkTimestamp
- type ZkZone
- func (this *ZkZone) AddRedis(host string, port int)
- func (this *ZkZone) AddRedisCluster(cluster string, instances []RedisInstance)
- func (this *ZkZone) AllRedis() []string
- func (this *ZkZone) AllRedisClusters() []RedisCluster
- func (this *ZkZone) CallSOS(caller string, msg string)
- func (this *ZkZone) ChildrenWithData(path string) map[string]zkData
- func (this *ZkZone) Close()
- func (this *ZkZone) ClusterPath(name string) string
- func (this *ZkZone) Clusters() map[string]string
- func (this *ZkZone) Conn() *zk.Conn
- func (this *ZkZone) Connect() (err error)
- func (this *ZkZone) CreateDbusCluster(name string) error
- func (this *ZkZone) CreateEphemeralZnode(path string, data []byte) error
- func (this *ZkZone) CreateEsCluster(name string) error
- func (this *ZkZone) CreateJobQueue(topic, cluster string) error
- func (this *ZkZone) CreateOrUpdateWebhook(topic string, hook WebhookMeta) error
- func (this *ZkZone) CreatePermenantZnode(path string, data []byte) error
- func (this *ZkZone) DefaultDbusCluster() (cluster string)
- func (this *ZkZone) DelRedis(host string, port int)
- func (this *ZkZone) DelRedisCluster(cluster string)
- func (this *ZkZone) DeleteRecursive(node string) (err error)
- func (this *ZkZone) DiscoverClusters(rootPath string) ([]string, error)
- func (this *ZkZone) EnsurePathExists(path string) error
- func (this *ZkZone) Errors() []error
- func (this *ZkZone) FlushKatewayMetrics(katewayId string, key string, data []byte) error
- func (this *ZkZone) ForSortedBrokers(fn func(cluster string, brokers map[string]*BrokerZnode))
- func (this *ZkZone) ForSortedClusters(fn func(zkcluster *ZkCluster))
- func (this *ZkZone) ForSortedControllers(fn func(cluster string, controller *ControllerMeta))
- func (this *ZkZone) ForSortedDbusClusters(fn func(name string, data []byte))
- func (this *ZkZone) ForSortedEsClusters(fn func(*EsCluster))
- func (this *ZkZone) HostBelongs(hostIp string) (liveClusters, registeredClusters []string)
- func (this *ZkZone) KatewayInfoById(id string) *KatewayMeta
- func (this *ZkZone) KatewayInfos() ([]*KatewayMeta, error)
- func (this *ZkZone) KatewayJobClusterConfig() (data []byte, err error)
- func (this *ZkZone) KatewayMysqlDsn() (string, error)
- func (this *ZkZone) KguardInfos() ([]*KguardMeta, error)
- func (this *ZkZone) LoadKatewayMetrics(katewayId string, key string) ([]byte, error)
- func (this *ZkZone) Name() string
- func (this *ZkZone) NewCluster(cluster string) *ZkCluster
- func (this *ZkZone) NewEsCluster(name string) *EsCluster
- func (this *ZkZone) NewOrchestrator() *Orchestrator
- func (this *ZkZone) NewclusterWithPath(cluster, path string) *ZkCluster
- func (this *ZkZone) Ping() error
- func (this *ZkZone) PublicClusters() []*ZkCluster
- func (this *ZkZone) RegisterCluster(name, path string) error
- func (this *ZkZone) ResetErrors()
- func (this *ZkZone) RunZkFourLetterCommand(cmd string) map[string]string
- func (this *ZkZone) SessionEvents() (<-chan zk.Event, bool)
- func (this *ZkZone) SessionTimeout() time.Duration
- func (this *ZkZone) ZkAddrList() []string
- func (this *ZkZone) ZkAddrs() string
Constants ¶
const ( KatewayIdsRoot = "/_kateway/ids" KatewayMysqlPath = "/_kateway/mysql" PubsubJobConfig = "/_kateway/orchestrator/jobconfig" PubsubJobQueues = "/_kateway/orchestrator/jobs" PubsubActors = "/_kateway/orchestrator/actors/ids" PubsubJobQueueOwners = "/_kateway/orchestrator/actors/job_owners" PubsubWebhooks = "/_kateway/orchestrator/webhooks" PubsubWebhooksOff = "/_kateway/orchestrator/webhooks_off" PubsubWebhookOwners = "/_kateway/orchestrator/actors/webhook_owners" KguardLeaderPath = "_kguard/leader" ConsumersPath = "/consumers" BrokerIdsPath = "/brokers/ids" BrokerTopicsPath = "/brokers/topics" ControllerPath = "/controller" ControllerEpochPath = "/controller_epoch" BrokerSequenceIdPath = "/brokers/seqid" EntityConfigChangesPath = "/config/changes" TopicConfigPath = "/config/topics" EntityConfigPath = "/config" DeleteTopicsPath = "/admin/delete_topics" RedisMonPath = "/redis" RedisClusterRoot = "/rediscluster" DbusRoot = "/dbus" )
Variables ¶
var ( ErrDupConnect = errors.New("connect while being connected") ErrClaimedByOthers = errors.New("claimed by others") ErrNotClaimed = errors.New("release non-claimed") )
var PanicHandler func(interface{})
Functions ¶
func ClusterPath ¶
func DbusCheckpointRoot ¶
func DbusClusterRoot ¶
func DbusConfig ¶
func DbusConfigDir ¶
func DefaultZkSessionTimeout ¶
func TimestampToTime ¶
Types ¶
type BrokerInfo ¶
func (*BrokerInfo) Addr ¶
func (this *BrokerInfo) Addr() string
func (*BrokerInfo) NamedAddr ¶
func (this *BrokerInfo) NamedAddr() string
type BrokerZnode ¶
type BrokerZnode struct { Id string `json:-` JmxPort int `json:"jmx_port"` Timestamp string `json:"timestamp"` Endpoints []string `json:"endpoints"` Host string `json:"host"` Port int `json:"port"` Version int `json:"version"` }
func (*BrokerZnode) Addr ¶
func (b *BrokerZnode) Addr() string
func (*BrokerZnode) NamedAddr ¶
func (this *BrokerZnode) NamedAddr() (string, bool)
func (BrokerZnode) NamedString ¶
func (b BrokerZnode) NamedString() string
func (BrokerZnode) String ¶
func (b BrokerZnode) String() string
func (*BrokerZnode) Uptime ¶
func (b *BrokerZnode) Uptime() time.Time
type Config ¶
func DefaultConfig ¶
type ConsumerMeta ¶
type ConsumerMeta struct { Group string Online bool Topic string PartitionId string Mtime ZkTimestamp ConsumerOffset int64 OldestOffset int64 ProducerOffset int64 // newest offset Lag int64 ConsumerZnode *ConsumerZnode }
type ConsumerZnode ¶
type ConsumerZnode struct { Id string `json:-` Version int `json:"version"` Subscription map[string]int `json:"subscription"` // topic:count Pattern string `json:"pattern"` Timestamp interface{} `json:"timestamp"` }
func (*ConsumerZnode) ClientRealIP ¶
func (c *ConsumerZnode) ClientRealIP() (ip string)
func (*ConsumerZnode) Host ¶
func (c *ConsumerZnode) Host() string
func (*ConsumerZnode) String ¶
func (c *ConsumerZnode) String() string
func (*ConsumerZnode) Topics ¶
func (c *ConsumerZnode) Topics() []string
func (*ConsumerZnode) Uptime ¶
func (c *ConsumerZnode) Uptime() time.Time
type ControllerMeta ¶
type ControllerMeta struct { Broker *BrokerZnode Mtime ZkTimestamp Epoch string }
func (*ControllerMeta) String ¶
func (c *ControllerMeta) String() string
type EsCluster ¶
type EsCluster struct { Name string // contains filtered or unexported fields }
func (*EsCluster) FirstBootstrapNode ¶
type KatewayMeta ¶
type KatewayMeta struct { Id string `json:"id"` Zone string `json:"zone"` Ver string `json:"ver"` Build string `json:"build"` BuiltAt string `json:"builtat"` Arch string `json:"arch"` Host string `json:"host"` Ip string `json:"ip"` Cpu string `json:"cpu"` PubAddr string `json:"pub"` SPubAddr string `json:"spub"` SubAddr string `json:"sub"` SSubAddr string `json:"ssub"` ManAddr string `json:"man"` SManAddr string `json:"sman"` DebugAddr string `json:"debug"` Ctime time.Time `json:"-"` }
type Orchestrator ¶
type Orchestrator struct {
*ZkZone
}
func (*Orchestrator) ActorRegistered ¶
func (this *Orchestrator) ActorRegistered(id string) (bool, error)
func (*Orchestrator) ClaimResource ¶
func (this *Orchestrator) ClaimResource(actorId, root, resource string) (err error)
func (*Orchestrator) JobQueueCluster ¶
func (this *Orchestrator) JobQueueCluster(topic string) (string, error)
func (*Orchestrator) RegisterActor ¶
func (this *Orchestrator) RegisterActor(id string, val []byte) error
func (*Orchestrator) ReleaseResource ¶
func (this *Orchestrator) ReleaseResource(actorId, root, resource string) error
func (*Orchestrator) ResignActor ¶
func (this *Orchestrator) ResignActor(id string) error
func (*Orchestrator) WatchActors ¶
func (this *Orchestrator) WatchActors() (ActorList, <-chan zk.Event, error)
func (*Orchestrator) WatchResources ¶
func (this *Orchestrator) WatchResources(path string) (ResourceList, <-chan zk.Event, error)
func (*Orchestrator) WebhookInfo ¶
func (this *Orchestrator) WebhookInfo(topic string) (*WebhookMeta, error)
type PartitionOffset ¶
type RedisCluster ¶
type RedisCluster struct { Name string `json:"name"` Desciption string `json:"desc"` Members []RedisInstance `json:"members"` }
type RedisInstance ¶
func (RedisInstance) String ¶
func (ri RedisInstance) String() string
type ResourceList ¶
type ResourceList []string
func (ResourceList) Len ¶
func (this ResourceList) Len() int
func (ResourceList) Less ¶
func (this ResourceList) Less(i, j int) bool
func (ResourceList) Swap ¶
func (this ResourceList) Swap(i, j int)
type TopicConfig ¶
type TopicConfig struct {
Config TopicConfigInfo `json:"config"`
}
type TopicConfigInfo ¶
type TopicConfigInfo struct {
RetentionMs string `json:"retention.ms"`
}
func (TopicConfigInfo) RetentionSeconds ¶
func (tci TopicConfigInfo) RetentionSeconds() time.Duration
type TopicConfigMeta ¶
type TopicZnode ¶
type WebhookMeta ¶
func (*WebhookMeta) Bytes ¶
func (this *WebhookMeta) Bytes() []byte
func (*WebhookMeta) From ¶
func (this *WebhookMeta) From(b []byte) error
type ZkCluster ¶
type ZkCluster struct { Nickname string `json:"nickname"` Roster []BrokerInfo `json:"roster"` // manually registered brokers Replicas int `json:"replicas"` Priority int `json:"priority"` Public bool `json:"public"` Retention int `json:"retention"` // in hours // contains filtered or unexported fields }
ZkCluster is a kafka cluster that has a chroot path in Zookeeper.
func (*ZkCluster) AlterTopic ¶
func (*ZkCluster) Broker ¶
func (this *ZkCluster) Broker(id int) (b *BrokerZnode)
func (*ZkCluster) BrokerList ¶
func (*ZkCluster) Brokers ¶
func (this *ZkCluster) Brokers() map[string]*BrokerZnode
Returns online {brokerId: broker}.
func (*ZkCluster) ClusterInfoPath ¶
func (*ZkCluster) ConfiggedTopics ¶
func (this *ZkCluster) ConfiggedTopics() map[string]TopicConfigMeta
ConfiggedTopics returns topics and their configs in zk:/config/topics that have non-default configuration.
func (*ZkCluster) ConsumerGroupOffsetPath ¶
func (*ZkCluster) ConsumerGroupRoot ¶
func (*ZkCluster) ConsumerGroups ¶
func (this *ZkCluster) ConsumerGroups() map[string]map[string]*ConsumerZnode
Returns {groupName: {consumerId: consumer}}
func (*ZkCluster) ConsumerGroupsOfTopic ¶
func (this *ZkCluster) ConsumerGroupsOfTopic(topic string) (map[string][]ConsumerMeta, error)
Returns {consumerGroup: consumerInfo}
func (*ZkCluster) ConsumerOffsetsOfGroup ¶
Returns {topic: {partitionId: offset}}
func (*ZkCluster) ConsumersByGroup ¶
func (this *ZkCluster) ConsumersByGroup(groupPattern string) map[string][]ConsumerMeta
Returns {consumerGroup: consumerInfo}
func (*ZkCluster) DeleteTopic ¶
func (*ZkCluster) GetTopicConfigPath ¶
func (*ZkCluster) ListChildren ¶
func (*ZkCluster) NamedBrokerList ¶
func (*ZkCluster) NamedZkConnectAddr ¶
func (*ZkCluster) OnlyNamedBrokerList ¶
OnlyNamedBrokerList only returns the brokers that have internal reverse DNS records.
func (*ZkCluster) OwnersOfGroupByTopic ¶
Returns {partitionId: consumerId}, where consumerId is /consumers/$group/ids/$consumerId.
func (*ZkCluster) Partitions ¶
func (*ZkCluster) RegisterBroker ¶
func (*ZkCluster) RegisteredInfo ¶
Get registered cluster info from zk.
func (*ZkCluster) ResetConsumerGroupOffset ¶
func (*ZkCluster) SetNickname ¶
func (*ZkCluster) SetPriority ¶
func (*ZkCluster) SetReplicas ¶
func (*ZkCluster) SetRetention ¶
func (*ZkCluster) SimpleConsumeKafkaTopic ¶
func (this *ZkCluster) SimpleConsumeKafkaTopic(topic string, msgChan chan<- *sarama.ConsumerMessage) error
func (*ZkCluster) TailMessage ¶
func (*ZkCluster) TopicConfigInfo ¶
func (this *ZkCluster) TopicConfigInfo(topic string) (tci TopicConfigInfo, err error)
func (*ZkCluster) TopicConfigRoot ¶
func (*ZkCluster) TotalConsumerOffsets ¶
func (*ZkCluster) UnregisterBroker ¶
func (*ZkCluster) WatchTopics ¶
func (*ZkCluster) ZkConnectAddr ¶
The zookeeper.connect= value for kafka's server.properties.
func (*ZkCluster) ZombieConsumerGroups ¶
type ZkStat ¶
type ZkStat struct { Version string Latency string Connections string Outstanding string Mode string // S=standalone, L=leader, F=follower Znodes string Received string Sent string }
func ParseStatResult ¶
Parse `zk stat` output into ZkStat struct.
type ZkTimestamp ¶
type ZkTimestamp int64
func (ZkTimestamp) Time ¶
func (this ZkTimestamp) Time() time.Time
type ZkZone ¶
type ZkZone struct {
// contains filtered or unexported fields
}
ZkZone represents a single Zookeeper ensemble where many kafka clusters can reside each of which has a different chroot path.
func NewZkZone ¶
NewZkZone creates a new ZkZone instance. All ephemeral nodes and watchers are automatically maintained even after the zk connection is lost and reconnected.
func (*ZkZone) AddRedisCluster ¶
func (this *ZkZone) AddRedisCluster(cluster string, instances []RedisInstance)
func (*ZkZone) AllRedisClusters ¶
func (this *ZkZone) AllRedisClusters() []RedisCluster
func (*ZkZone) ChildrenWithData ¶
Returns {childName: zkData}
func (*ZkZone) ClusterPath ¶
ClusterPath returns the zk chroot path of a cluster.
func (*ZkZone) CreateDbusCluster ¶
func (*ZkZone) CreateEphemeralZnode ¶
func (*ZkZone) CreateEsCluster ¶
func (*ZkZone) CreateJobQueue ¶
func (*ZkZone) CreateOrUpdateWebhook ¶
func (this *ZkZone) CreateOrUpdateWebhook(topic string, hook WebhookMeta) error
func (*ZkZone) CreatePermenantZnode ¶
func (*ZkZone) DefaultDbusCluster ¶
func (*ZkZone) DelRedisCluster ¶
func (*ZkZone) DeleteRecursive ¶
func (*ZkZone) DiscoverClusters ¶
DiscoverClusters finds all possible kafka clusters.
func (*ZkZone) EnsurePathExists ¶
func (*ZkZone) FlushKatewayMetrics ¶
func (*ZkZone) ForSortedBrokers ¶
func (this *ZkZone) ForSortedBrokers(fn func(cluster string, brokers map[string]*BrokerZnode))
func (*ZkZone) ForSortedClusters ¶
func (*ZkZone) ForSortedControllers ¶
func (this *ZkZone) ForSortedControllers(fn func(cluster string, controller *ControllerMeta))
func (*ZkZone) ForSortedDbusClusters ¶
func (*ZkZone) ForSortedEsClusters ¶
func (*ZkZone) HostBelongs ¶
func (*ZkZone) KatewayInfoById ¶
func (this *ZkZone) KatewayInfoById(id string) *KatewayMeta
func (*ZkZone) KatewayInfos ¶
func (this *ZkZone) KatewayInfos() ([]*KatewayMeta, error)
KatewayInfos returns the meta of online kateway instances, sorted by id.
func (*ZkZone) KatewayJobClusterConfig ¶
func (*ZkZone) KatewayMysqlDsn ¶
func (*ZkZone) KguardInfos ¶
func (this *ZkZone) KguardInfos() ([]*KguardMeta, error)
func (*ZkZone) LoadKatewayMetrics ¶
func (*ZkZone) NewCluster ¶
func (*ZkZone) NewEsCluster ¶
func (*ZkZone) NewOrchestrator ¶
func (this *ZkZone) NewOrchestrator() *Orchestrator
func (*ZkZone) NewclusterWithPath ¶
func (*ZkZone) PublicClusters ¶
func (*ZkZone) RegisterCluster ¶
func (*ZkZone) ResetErrors ¶
func (this *ZkZone) ResetErrors()
func (*ZkZone) RunZkFourLetterCommand ¶
Returns {zkHost: outputLines}
func (*ZkZone) SessionEvents ¶
SessionEvents returns zk connection events.