Versions in this module

Expand all | Collapse all

v1

v1.0.0 (Aug 30, 2021)

Changes in this version:

+ var ErrorPaused = errors.New("paused")
+ func CreateTopic(brokerAddr string, topic string, NumPartitions, ReplicationFactor int) error
+ func ListTopics(brokerAddr string) ([]string, error)
+ type Alarm interface
    + Alarm func(message string)
+ type Consumer struct
+ type FileRecover struct
    + func NewFileRecover(topic, savePath string, bufCap int, writeFileIntervalSecond int) (*FileRecover, error)
    + func (m *FileRecover) CloseNotify() chan struct{}
    + func (m *FileRecover) Restore(fn RestoreFunc)
    + func (m *FileRecover) Save(message Message) error
+ type KeyHashPartitionRule struct
    + PartitionNum int
    + func (m *KeyHashPartitionRule) Partition(topic string, key, value []byte) int
+ type Message struct
    + Key []byte
    + Value []byte
+ type PartitionRule interface
    + Partition func(topic string, key, value []byte) int
+ type Producer struct
    + func NewProducer(topic string, partitionRule PartitionRule, alarm Alarm, ...) *Producer
    + func (m *Producer) PublishMessage(msg Message) error
    + func (m *Producer) PublishMessages(msgs []Message, partition int) (int, error)
+ type ProducerRecover interface
    + CloseNotify func() chan struct{}
    + Restore func(fn RestoreFunc)
    + Save func(message Message) error
+ type RestoreFunc func(message *Message, err error, canceled *bool)
+ type RoundRobinPartitionRule struct
    + PartitionNum int
    + func (m *RoundRobinPartitionRule) Partition(topic string, key, value []byte) int