Documentation ¶
Index ¶
- Variables
- type Consumer
- type Manager
- func (m *Manager) Accumulation(topic, group string) (int64, int64, error)
- func (m *Manager) BrokerAddrs() []string
- func (m *Manager) BrokersList() []int32
- func (m *Manager) Close() error
- func (m *Manager) CommitOffset(topic, group string, offsets map[int32]int64) error
- func (m *Manager) CreateTopic(topic string, replications int32, partitions int32) error
- func (m *Manager) DeleteTopic(topic string) error
- func (m *Manager) ExistTopic(topic string) (bool, error)
- func (m *Manager) FetchGroupOffsets(topic, group string) (map[int32]int64, error)
- func (m *Manager) FetchTopicOffsets(topic string, time int64) (map[int32]int64, error)
- func (m *Manager) RefreshMetadata() error
- func (m *Manager) Topics() (topics []string, err error)
- func (m *Manager) UpdateTopic(topic string, partitions int) error
- type Producer
Constants ¶
This section is empty.
Variables ¶
var (
	ErrTimeout          = errors.New("timeout")
	ErrClosed           = errors.New("consumer closed")
	ErrBadAck           = errors.New("bad ack")
	ErrNewConsumer      = errors.New("new kafka consumer failed")
	ErrIdcNotExist      = errors.New("idc not exist")
	ErrInvaildPartition = errors.New("invaild partition")
	ErrInvaildOffset    = errors.New("invaild offset")
	ErrEmptyAddr        = errors.New("empty addrs")
)
Functions ¶
This section is empty.
Types ¶
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
func NewConsumer ¶
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
func NewManager ¶
NewManager creates a Kafka manager from the given ZooKeeper address of the Kafka cluster.
func (*Manager) Accumulation ¶
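Accumulation returns information about the messages that have accumulated (the unconsumed backlog) for the given topic and group.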
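As a rough illustration of what accumulation (consumer lag) means, the sketch below sums, per partition, the gap between a topic's next-write offset and a group's committed offset. The two maps have the shape returned by FetchTopicOffsets and FetchGroupOffsets; the helper name backlog and the sample numbers are hypothetical, and this is not necessarily how Accumulation itself is computed or what its two int64 results represent.

```go
package main

import "fmt"

// backlog is a hypothetical helper that estimates how many messages a group
// has not yet consumed.
//   topicOffsets: partition -> offset at which the next message will be written
//   groupOffsets: partition -> offset committed by the consumer group
func backlog(topicOffsets, groupOffsets map[int32]int64) int64 {
	var total int64
	for partition, end := range topicOffsets {
		committed, ok := groupOffsets[partition]
		if !ok || committed < 0 {
			// No usable commit for this partition; this sketch skips it
			// rather than guessing where consumption would start.
			continue
		}
		if end > committed {
			total += end - committed
		}
	}
	return total
}

func main() {
	topic := map[int32]int64{0: 120, 1: 80}
	group := map[int32]int64{0: 100, 1: 80}
	fmt.Println(backlog(topic, group)) // prints 20
}
```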
func (*Manager) BrokerAddrs ¶
BrokerAddrs returns the broker addresses from the manager's cached data.
func (*Manager) BrokersList ¶
BrokersList returns the list of broker IDs from the manager's cached data.
func (*Manager) CommitOffset ¶
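CommitOffset can currently only be used to set the initial offsets of a newly created group. Once the group has completed join-group, the Kafka server checks whether the ConsumerGroupGeneration and ConsumerID in the OffsetCommitRequest are valid, so this API will return a failure. If new requirements arise, consider whether this API needs to be redesigned.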
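One way to use this for a brand-new group is to fetch the topic's offsets and commit them as the group's starting position. The sketch below assumes only the two method signatures listed in the index; the offsetAPI interface, the seedGroup helper, the fakeManager stand-in, and the topic and group names are all hypothetical. The value -1 passed as the time argument follows the Kafka protocol convention for "latest"; whether this package interprets it the same way is an assumption.

```go
package main

import "fmt"

// offsetAPI lists the two Manager methods this sketch relies on, using the
// signatures shown in the index. A *Manager would satisfy it.
type offsetAPI interface {
	FetchTopicOffsets(topic string, time int64) (map[int32]int64, error)
	CommitOffset(topic, group string, offsets map[int32]int64) error
}

// seedGroup is a hypothetical helper: it reads the topic's per-partition
// offsets for the given time and commits them for a group that has not
// joined yet.
func seedGroup(m offsetAPI, topic, group string, time int64) (map[int32]int64, error) {
	offsets, err := m.FetchTopicOffsets(topic, time)
	if err != nil {
		return nil, fmt.Errorf("fetch topic offsets: %w", err)
	}
	if err := m.CommitOffset(topic, group, offsets); err != nil {
		return nil, fmt.Errorf("commit offsets: %w", err)
	}
	return offsets, nil
}

// fakeManager stands in for *Manager so the sketch runs without a cluster.
type fakeManager struct {
	committed map[int32]int64
}

func (f *fakeManager) FetchTopicOffsets(topic string, time int64) (map[int32]int64, error) {
	return map[int32]int64{0: 42, 1: 7}, nil
}

func (f *fakeManager) CommitOffset(topic, group string, offsets map[int32]int64) error {
	f.committed = offsets
	return nil
}

func main() {
	f := &fakeManager{}
	offsets, err := seedGroup(f, "events", "new-group", -1)
	if err != nil {
		fmt.Println("seed failed:", err)
		return
	}
	fmt.Println(len(offsets), f.committed[0]) // prints "2 42"
}
```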
func (*Manager) CreateTopic ¶
CreateTopic creates a topic with the given name.
func (*Manager) DeleteTopic ¶
DeleteTopic marks the given topic for deletion.
func (*Manager) ExistTopic ¶
ExistTopic reports whether the given topic exists.
func (*Manager) FetchGroupOffsets ¶
func (*Manager) FetchTopicOffsets ¶
The returned offset is the offset at which the next message will be written to the topic.
func (*Manager) RefreshMetadata ¶
RefreshMetadata refreshes the available Kafka metadata.