Versions in this module Expand all Collapse all v1 v1.0.2 Apr 28, 2024 Changes in this version + const AnyOffset + func NewTestConfig() *sarama.Config + type AsyncProducer struct + func NewAsyncProducer(t ErrorReporter, config *sarama.Config) *AsyncProducer + func (mp *AsyncProducer) AbortTxn() error + func (mp *AsyncProducer) AddMessageToTxn(msg *sarama.ConsumerMessage, groupId string, metadata *string) error + func (mp *AsyncProducer) AddOffsetsToTxn(offsets map[string][]*sarama.PartitionOffsetMetadata, groupId string) error + func (mp *AsyncProducer) AsyncClose() + func (mp *AsyncProducer) BeginTxn() error + func (mp *AsyncProducer) Close() error + func (mp *AsyncProducer) CommitTxn() error + func (mp *AsyncProducer) Errors() <-chan *sarama.ProducerError + func (mp *AsyncProducer) ExpectInputAndFail(err error) *AsyncProducer + func (mp *AsyncProducer) ExpectInputAndSucceed() *AsyncProducer + func (mp *AsyncProducer) ExpectInputWithCheckerFunctionAndFail(cf ValueChecker, err error) *AsyncProducer + func (mp *AsyncProducer) ExpectInputWithCheckerFunctionAndSucceed(cf ValueChecker) *AsyncProducer + func (mp *AsyncProducer) ExpectInputWithMessageCheckerFunctionAndFail(cf MessageChecker, err error) *AsyncProducer + func (mp *AsyncProducer) ExpectInputWithMessageCheckerFunctionAndSucceed(cf MessageChecker) *AsyncProducer + func (mp *AsyncProducer) Input() chan<- *sarama.ProducerMessage + func (mp *AsyncProducer) IsTransactional() bool + func (mp *AsyncProducer) Successes() <-chan *sarama.ProducerMessage + func (mp *AsyncProducer) TxnStatus() sarama.ProducerTxnStatusFlag + type Consumer struct + func NewConsumer(t ErrorReporter, config *sarama.Config) *Consumer + func (c *Consumer) Close() error + func (c *Consumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error) + func (c *Consumer) ExpectConsumePartition(topic string, partition int32, offset int64) *PartitionConsumer + func (c *Consumer) HighWaterMarks() map[string]map[int32]int64 + func (c *Consumer) Partitions(topic string) ([]int32, error) + func (c *Consumer) Pause(topicPartitions map[string][]int32) + func (c *Consumer) PauseAll() + func (c *Consumer) Resume(topicPartitions map[string][]int32) + func (c *Consumer) ResumeAll() + func (c *Consumer) SetTopicMetadata(metadata map[string][]int32) + func (c *Consumer) Topics() ([]string, error) + type ErrorReporter interface + Errorf func(string, ...interface{}) + type MessageChecker func(*sarama.ProducerMessage) error + type PartitionConsumer struct + func (pc *PartitionConsumer) AsyncClose() + func (pc *PartitionConsumer) Close() error + func (pc *PartitionConsumer) Errors() <-chan *sarama.ConsumerError + func (pc *PartitionConsumer) ExpectErrorsDrainedOnClose() *PartitionConsumer + func (pc *PartitionConsumer) ExpectMessagesDrainedOnClose() *PartitionConsumer + func (pc *PartitionConsumer) HighWaterMarkOffset() int64 + func (pc *PartitionConsumer) IsPaused() bool + func (pc *PartitionConsumer) Messages() <-chan *sarama.ConsumerMessage + func (pc *PartitionConsumer) Pause() + func (pc *PartitionConsumer) Resume() + func (pc *PartitionConsumer) YieldError(err error) *PartitionConsumer + func (pc *PartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage) *PartitionConsumer + type SyncProducer struct + func NewSyncProducer(t ErrorReporter, config *sarama.Config) *SyncProducer + func (sp *SyncProducer) AbortTxn() error + func (sp *SyncProducer) AddMessageToTxn(msg *sarama.ConsumerMessage, groupId string, metadata *string) error + func (sp *SyncProducer) AddOffsetsToTxn(offsets map[string][]*sarama.PartitionOffsetMetadata, groupId string) error + func (sp *SyncProducer) BeginTxn() error + func (sp *SyncProducer) Close() error + func (sp *SyncProducer) CommitTxn() error + func (sp *SyncProducer) ExpectSendMessageAndFail(err error) *SyncProducer + func (sp *SyncProducer) ExpectSendMessageAndSucceed() *SyncProducer + func (sp *SyncProducer) ExpectSendMessageWithCheckerFunctionAndFail(cf ValueChecker, err error) *SyncProducer + func (sp *SyncProducer) ExpectSendMessageWithCheckerFunctionAndSucceed(cf ValueChecker) *SyncProducer + func (sp *SyncProducer) ExpectSendMessageWithMessageCheckerFunctionAndFail(cf MessageChecker, err error) *SyncProducer + func (sp *SyncProducer) ExpectSendMessageWithMessageCheckerFunctionAndSucceed(cf MessageChecker) *SyncProducer + func (sp *SyncProducer) IsTransactional() bool + func (sp *SyncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) + func (sp *SyncProducer) SendMessages(msgs []*sarama.ProducerMessage) error + func (sp *SyncProducer) TxnStatus() sarama.ProducerTxnStatusFlag + type TopicConfig struct + func NewTopicConfig() *TopicConfig + func (pc *TopicConfig) SetDefaultPartitions(n int32) + func (pc *TopicConfig) SetPartitions(partitions map[string]int32) + type ValueChecker func(val []byte) error