Versions in this module Expand all Collapse all v2 v2.2.2 Oct 17, 2020 v2.2.0 Oct 17, 2020 Changes in this version + const StartOffsetNewest + const StartOffsetOldest + var ErrClosed = errors.New("closed") + var ErrMxClosed = errors.New("closed") + var ErrNoData = errors.New("no data") + type BatchConsumer interface + ConsumeBatch func() ([]*proto.Message, error) + type Broker struct + func Dial(nodeAddresses []string, conf BrokerConf) (*Broker, error) + func (b *Broker) BatchConsumer(conf ConsumerConf) (BatchConsumer, error) + func (b *Broker) Close() + func (b *Broker) Consumer(conf ConsumerConf) (Consumer, error) + func (b *Broker) CreateTopic(topics []proto.TopicInfo, timeout time.Duration, validateOnly bool) (*proto.CreateTopicsResp, error) + func (b *Broker) DeleteTopic(topics []string) (*proto.DeleteTopicsResp, error) + func (b *Broker) Metadata() (*proto.MetadataResp, error) + func (b *Broker) OffsetCoordinator(conf OffsetCoordinatorConf) (OffsetCoordinator, error) + func (b *Broker) OffsetEarliest(topic string, partition int32) (offset int64, err error) + func (b *Broker) OffsetLatest(topic string, partition int32) (offset int64, err error) + func (b *Broker) PartitionCount(topic string) (int32, error) + func (b *Broker) Producer(conf ProducerConf) Producer + type BrokerConf struct + AllowTopicCreation bool + ClientID string + DialRetryLimit int + DialRetryWait time.Duration + DialTimeout time.Duration + LeaderRetryLimit int + LeaderRetryWait time.Duration + Log interface{ ... } + Logger Logger + ReadTimeout time.Duration + RetryErrLimit int + RetryErrWait time.Duration + TLSCa []byte + TLSCert []byte + TLSKey []byte + func NewBrokerConf(clientID string) BrokerConf + type Client interface + Close func() + Consumer func(conf ConsumerConf) (Consumer, error) + OffsetCoordinator func(conf OffsetCoordinatorConf) (OffsetCoordinator, error) + OffsetEarliest func(topic string, partition int32) (offset int64, err error) + OffsetLatest func(topic string, partition int32) (offset int64, err error) + Producer func(conf ProducerConf) Producer + type Consumer interface + Consume func() (*proto.Message, error) + type ConsumerConf struct + Logger Logger + MaxFetchSize int32 + MinFetchSize int32 + Partition int32 + RequestTimeout time.Duration + RetryErrLimit int + RetryErrWait time.Duration + RetryLimit int + RetryWait time.Duration + StartOffset int64 + Topic string + func NewConsumerConf(topic string, partition int32) ConsumerConf + type DistributingProducer interface + Distribute func(topic string, messages ...*proto.Message) (offset int64, err error) + func NewHashProducer(p Producer, numPartitions int32) DistributingProducer + func NewRandomProducer(p Producer, numPartitions int32) DistributingProducer + func NewRoundRobinProducer(p Producer, numPartitions int32) DistributingProducer + type Logger interface + Debug func(msg string, args ...interface{}) + Error func(msg string, args ...interface{}) + Info func(msg string, args ...interface{}) + Warn func(msg string, args ...interface{}) + type Mx struct + func Merge(consumers ...Consumer) *Mx + func (p *Mx) Close() + func (p *Mx) Consume() (*proto.Message, error) + func (p *Mx) Workers() int + type OffsetCoordinator interface + Commit func(topic string, partition int32, offset int64) error + Offset func(topic string, partition int32) (offset int64, metadata string, err error) + type OffsetCoordinatorConf struct + ConsumerGroup string + Logger Logger + RetryErrLimit int + RetryErrWait time.Duration + func NewOffsetCoordinatorConf(consumerGroup string) OffsetCoordinatorConf + type Producer interface + Produce func(topic string, partition int32, messages ...*proto.Message) (offset int64, err error) + type ProducerConf struct + Compression proto.Compression + Logger Logger + RequestTimeout time.Duration + RequiredAcks int16 + RetryLimit int + RetryWait time.Duration + func NewProducerConf() ProducerConf Other modules containing this package github.com/Domoryonok/kafka