Versions in this module Expand all Collapse all v1 v1.1.1 Feb 4, 2020 v1.1.0 Dec 16, 2019 Changes in this version + const OffsetNewset + const OffsetOldest + var ErrBrokersIsEmpty = errors.New("brokers is empty") + var ErrGroupIDIsEmpty = errors.New("groupId is empty") + var ErrTopicIsEmpty = errors.New("topic is empty") + type AsyncProducer struct + func NewAsyncProducer(brokers string) (*AsyncProducer, error) + func (ap *AsyncProducer) BatchSend(topic string, messages map[string]string) + func (ap *AsyncProducer) Close() error + func (ap *AsyncProducer) Send(topic, key, message string) + type Client struct + func NewClient(brokers string) (*Client, error) + func (c *Client) CreatePartitions(topic string, totalPartition int32) error + func (c *Client) CreateTopic(topic string, partitions int32, replicationFactor int16, ...) error + func (c *Client) DeleteRecord(topic string, partitionID int32, endOffset int64) error + func (c *Client) DeleteTopic(topic string) error + func (c *Client) DescribeConsumerGroup(group string) (*sarama.GroupDescription, error) + func (c *Client) DescribeTopic(topic string) ([]*sarama.PartitionMetadata, error) + func (c *Client) GetNewestOffset(topic string, partitionID int32) (int64, error) + func (c *Client) GetOldestOffset(topic string, partitionID int32) (int64, error) + func (c *Client) ListConsumerGroups() (map[string]string, error) + func (c *Client) ListTopics() ([]string, error) + type Consumer struct + func NewConsumer(brokers, topic, groupID string, defaultOffset int) (*Consumer, error) + func (c *Consumer) Close() + func (c *Consumer) CommitOffset(msg *sarama.ConsumerMessage) + func (c *Consumer) Errors() <-chan error + func (c *Consumer) Messages() <-chan *sarama.ConsumerMessage + type SyncProducer struct + func NewSyncProducer(brokers string) (*SyncProducer, error) + func (sp *SyncProducer) BatchSend(topic string, messages map[string]string) error + func (sp *SyncProducer) Close() error + func (sp *SyncProducer) Send(topic, key, message string) (partitionID int32, offset int64, err error)