Versions in this module Expand all Collapse all v2 v2.5.0 Jan 15, 2020 Changes in this version + const Hash + const Manual + const Random + func NewClient(config *Config, partitioner string) (sarama.Client, error) + type AsyncProducer struct + Client sarama.Client + Config *Config + Partition int32 + Producer sarama.AsyncProducer + func GetAsyncProducerMock(t mocks.ErrorReporter) (*AsyncProducer, *mocks.AsyncProducer) + func NewAsyncProducer(config *Config, sClient sarama.Client, partitioner string, wg *sync.WaitGroup) (*AsyncProducer, error) + func (ref *AsyncProducer) Close(async ...bool) error + func (ref *AsyncProducer) GetCloseChannel() <-chan struct{} + func (ref *AsyncProducer) IsClosed() bool + func (ref *AsyncProducer) SendMsgByte(topic string, key []byte, msg []byte, metadata interface{}) + func (ref *AsyncProducer) SendMsgToPartition(topic string, partition int32, key Encoder, msg Encoder, metadata interface{}) + func (ref *AsyncProducer) WaitForClose() + type Config struct + Brokers []string + Cancel context.CancelFunc + Context context.Context + Debug bool + ErrorChan chan *ProducerError + GroupID string + InitialOffset int64 + Partition int32 + Partitioner sarama.PartitionerConstructor + RecvError bool + RecvErrorChan chan error + RecvMessageChan chan *ConsumerMessage + RecvNotification bool + RecvNotificationChan chan *cluster.Notification + RequiredAcks RequiredAcks + SendError bool + SendSuccess bool + SuccessChan chan *ProducerMessage + Topics []string + func NewConfig(log logging.Logger) *Config + func (ref *Config) ConsumerConfig() *cluster.Config + func (ref *Config) ProducerConfig() *sarama.Config + func (ref *Config) SetAcks(acks RequiredAcks) + func (ref *Config) SetBrokers(brokers ...string) + func (ref *Config) SetDebug(val bool) + func (ref *Config) SetErrorChan(val chan *ProducerError) + func (ref *Config) SetGroup(id string) + func (ref *Config) SetInitialOffset(offset int64) + func (ref *Config) SetPartition(val int32) + func (ref *Config) SetPartitioner(val string) + func (ref *Config) SetRecvError(val bool) + func (ref *Config) SetRecvErrorChan(val chan error) + func (ref *Config) SetRecvMessageChan(val chan *ConsumerMessage) + func (ref *Config) SetRecvNotification(val bool) + func (ref *Config) SetRecvNotificationChan(val chan *cluster.Notification) + func (ref *Config) SetSendError(val bool) + func (ref *Config) SetSendSuccess(val bool) + func (ref *Config) SetSuccessChan(val chan *ProducerMessage) + func (ref *Config) SetTLS(tlsConfig *tls.Config) (err error) + func (ref *Config) SetTopics(topics string) + func (ref *Config) ValidateAsyncProducerConfig() error + func (ref *Config) ValidateConsumerConfig() error + func (ref *Config) ValidateSyncProducerConfig() error + type Consumer struct + Config *Config + Consumer clusterConsumer + SConsumer sarama.Consumer + func GetConsumerMock(t mocks.ErrorReporter) *Consumer + func NewConsumer(config *Config, wg *sync.WaitGroup) (*Consumer, error) + func (ref *Consumer) Close() error + func (ref *Consumer) CommitOffsets() error + func (ref *Consumer) GetCloseChannel() <-chan struct{} + func (ref *Consumer) IsClosed() bool + func (ref *Consumer) MarkOffset(msg *ConsumerMessage, metadata string) + func (ref *Consumer) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) + func (ref *Consumer) PrintNotification(note map[string][]int32) + func (ref *Consumer) StartConsumerHandlers() + func (ref *Consumer) StartConsumerManualHandlers(partitionConsumer sarama.PartitionConsumer) + func (ref *Consumer) Subscriptions() map[string][]int32 + func (ref *Consumer) WaitForClose() + type ConsumerMessage struct + Key []byte + Offset int64 + Partition int32 + PrevValue []byte + Timestamp time.Time + Topic string + Value []byte + func (cm *ConsumerMessage) GetKey() string + func (cm *ConsumerMessage) GetOffset() int64 + func (cm *ConsumerMessage) GetPartition() int32 + func (cm *ConsumerMessage) GetPrevValue() []byte + func (cm *ConsumerMessage) GetTopic() string + func (cm *ConsumerMessage) GetValue() []byte + type Encoder interface + type ProducerError struct + Err error + func (ref *ProducerError) Error() error + func (ref *ProducerError) String() string + type ProducerMessage struct + Key Encoder + Metadata interface{} + Offset int64 + Partition int32 + Topic string + Value Encoder + func (pm *ProducerMessage) GetKey() string + func (pm *ProducerMessage) GetOffset() int64 + func (pm *ProducerMessage) GetPartition() int32 + func (pm *ProducerMessage) GetPrevValue() []byte + func (pm *ProducerMessage) GetTopic() string + func (pm *ProducerMessage) GetValue() []byte + func (pm *ProducerMessage) String() string + type ProtoConsumerMessage struct + func NewProtoConsumerMessage(msg *ConsumerMessage, serializer keyval.Serializer) *ProtoConsumerMessage + func (cm *ProtoConsumerMessage) GetKey() string + func (cm *ProtoConsumerMessage) GetOffset() int64 + func (cm *ProtoConsumerMessage) GetPartition() int32 + func (cm *ProtoConsumerMessage) GetPrevValue(msg proto.Message) (prevValueExist bool, err error) + func (cm *ProtoConsumerMessage) GetTopic() string + func (cm *ProtoConsumerMessage) GetValue(msg proto.Message) error + type ProtoProducerMessage struct + Serializer keyval.Serializer + func (ppm *ProtoProducerMessage) GetKey() string + func (ppm *ProtoProducerMessage) GetOffset() int64 + func (ppm *ProtoProducerMessage) GetPartition() int32 + func (ppm *ProtoProducerMessage) GetPrevValue(msg proto.Message) (prevValueExist bool, err error) + func (ppm *ProtoProducerMessage) GetTopic() string + func (ppm *ProtoProducerMessage) GetValue(msg proto.Message) error + type ProtoProducerMessageErr struct + Err error + func (pme *ProtoProducerMessageErr) Error() error + type RequiredAcks int16 + const AcksUnset + const NoResponse + const WaitForAll + const WaitForLocal + type SyncProducer struct + Client sarama.Client + Config *Config + Partition int32 + Producer sarama.SyncProducer + func GetSyncProducerMock(t mocks.ErrorReporter) (*SyncProducer, *mocks.SyncProducer) + func NewSyncProducer(config *Config, sClient sarama.Client, partitioner string, wg *sync.WaitGroup) (*SyncProducer, error) + func (ref *SyncProducer) Close() error + func (ref *SyncProducer) GetCloseChannel() <-chan struct{} + func (ref *SyncProducer) IsClosed() bool + func (ref *SyncProducer) SendMsgByte(topic string, key []byte, msg []byte) (*ProducerMessage, error) + func (ref *SyncProducer) SendMsgToPartition(topic string, partition int32, key sarama.Encoder, msg sarama.Encoder) (*ProducerMessage, error) + func (ref *SyncProducer) WaitForClose() v2.5.0-alpha Dec 13, 2019 Other modules containing this package go.ligato.io/cn-infra