Documentation ¶
Index ¶
- type AccumulationInfo
- type GroupConfig
- type GroupInfo
- type Metadata
- func (m *Metadata) Accumulation(queue, group string) (int64, int64, error)
- func (m *Metadata) AddGroup(group string, queue string, write bool, read bool, url string, ips []string) error
- func (m *Metadata) AddQueue(queue string, idcs []string) error
- func (m *Metadata) Close()
- func (m *Metadata) DelQueue(queue string) error
- func (m *Metadata) DeleteGroup(group string, queue string) error
- func (m *Metadata) ExistGroup(queue, group string) bool
- func (m *Metadata) ExistQueue(queue string) bool
- func (m *Metadata) GetBrokerAddrsByIdc(idcs ...string) map[string][]string
- func (m *Metadata) GetGroupConfig(group string, queue string) (*GroupConfig, error)
- func (m *Metadata) GetGroupMap() map[string][]string
- func (m *Metadata) GetProxyConfigByID(id int) (string, error)
- func (m *Metadata) GetQueueConfig(queue string) *QueueConfig
- func (m *Metadata) GetQueueInfo(queues ...string) ([]*QueueInfo, error)
- func (m *Metadata) GetQueueMap() map[string][]string
- func (m *Metadata) GetQueues() (queues []string)
- func (m *Metadata) LoadMetrics() ([]byte, error)
- func (m *Metadata) LocalManager() *kafka.Manager
- func (m *Metadata) Proxys() (map[string]string, error)
- func (m *Metadata) RefreshMetadata() error
- func (m *Metadata) RegisterService(id int, data string) error
- func (m *Metadata) ResetOffset(queue string, group string, time int64) error
- func (m *Metadata) SaveMetrics(data string) error
- func (m *Metadata) UpdateGroupConfig(group string, queue string, write bool, read bool, url string, ips []string) error
- type Queue
- type QueueConfig
- type QueueInfo
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AccumulationInfo ¶
type GroupConfig ¶
type GroupConfig struct { Group string `json:"group,omitempty"` Queue string `json:"queue,omitempty"` Write bool `json:"write"` Read bool `json:"read"` Url string `json:"url"` Ips []string `json:"ips"` }
func (*GroupConfig) Load ¶
func (c *GroupConfig) Load(data []byte) error
func (*GroupConfig) String ¶
func (c *GroupConfig) String() string
type GroupInfo ¶
type GroupInfo struct { Group string `json:"group"` Queues []*GroupConfig `json:"queues,omitempty"` }
type Metadata ¶
type Metadata struct {
// contains filtered or unexported fields
}
func NewMetadata ¶
NewMetadata returns a new Metadata instance.
func (*Metadata) Accumulation ¶
func (*Metadata) AddGroup ¶
func (m *Metadata) AddGroup(group string, queue string, write bool, read bool, url string, ips []string) error
AddGroup adds a group to the given queue.
func (*Metadata) DeleteGroup ¶
DeleteGroup deletes the given group.
func (*Metadata) ExistGroup ¶
ExistGroup reports whether the given group exists.
func (*Metadata) GetBrokerAddrsByIdc ¶
func (*Metadata) GetGroupConfig ¶
func (m *Metadata) GetGroupConfig(group string, queue string) (*GroupConfig, error)
func (*Metadata) GetGroupMap ¶
GetGroupMap returns the queue names of each group.
func (*Metadata) GetProxyConfigByID ¶
GetProxyConfigByID returns the config of the proxy with the given ID.
func (*Metadata) GetQueueConfig ¶
func (m *Metadata) GetQueueConfig(queue string) *QueueConfig
No deep copy is made; for now this does not appear to be necessary.
func (*Metadata) GetQueueInfo ¶
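TODO: when the HTTP API is revised later, change the returned data structure at the same time; this would simplify the logic as much as possible.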
func (*Metadata) GetQueueMap ¶
GetQueueMap returns the group names of each queue.
func (*Metadata) LoadMetrics ¶
func (*Metadata) LocalManager ¶
LocalManager returns the Kafka manager of the local IDC.
func (*Metadata) RefreshMetadata ¶
RefreshMetadata refreshes the metadata from ZooKeeper.
func (*Metadata) RegisterService ¶
RegisterService registers the service with ZooKeeper.
func (*Metadata) ResetOffset ¶
ResetOffset resets the offset of the given queue and group by time.
func (*Metadata) SaveMetrics ¶
type Queue ¶
type Queue interface { Create(queue string, idcs []string) error Update(queue string) error Delete(queue string) error Lookup(queue string, group string) ([]*QueueInfo, error) AddGroup(group string, queue string, write bool, read bool, url string, ips []string) error UpdateGroup(group string, queue string, write bool, read bool, url string, ips []string) error DeleteGroup(group string, queue string) error LookupGroup(group string) ([]*GroupInfo, error) GetSingleGroup(group string, queue string) (*GroupConfig, error) SendMessage(queue string, group string, data []byte, flag uint64) (id string, err error) RecvMessage(queue string, group string) (id string, data []byte, flag uint64, err error) AckMessage(queue string, group string, id string) error AccumulationStatus() ([]AccumulationInfo, error) Proxys() (map[string]string, error) GetProxyConfigByID(id int) (string, error) UpTime() int64 Version() string Close() }
type QueueConfig ¶
type QueueConfig struct { Queue string `json:"queue"` Ctime int64 `json:"ctime"` Length int64 `json:"length"` Groups map[string]GroupConfig `json:"groups,omitempty"` Idcs []string `json:"idcs,omitempty"` }
func (*QueueConfig) Parse ¶
func (q *QueueConfig) Parse(data []byte) error
func (*QueueConfig) String ¶
func (q *QueueConfig) String() string
Click to show internal directories.
Click to hide internal directories.