Documentation ¶
Index ¶
- func Copy(source interface{}, destin interface{})
- type MockAsyncProducer
- func (t *MockAsyncProducer) AbortTxn() error
- func (t *MockAsyncProducer) AddMessageToTxn(msg *sarama.ConsumerMessage, groupId string, metadata *string) error
- func (t *MockAsyncProducer) AddOffsetsToTxn(offsets map[string][]*sarama.PartitionOffsetMetadata, groupId string) error
- func (t *MockAsyncProducer) AsyncClose()
- func (t *MockAsyncProducer) BeginTxn() error
- func (t *MockAsyncProducer) Close() error
- func (t *MockAsyncProducer) CommitTxn() error
- func (t *MockAsyncProducer) Errors() <-chan *sarama.ProducerError
- func (t *MockAsyncProducer) Input() chan<- *sarama.ProducerMessage
- func (t *MockAsyncProducer) IsTransactional() bool
- func (t *MockAsyncProducer) Successes() <-chan *sarama.ProducerMessage
- func (t *MockAsyncProducer) TxnStatus() sarama.ProducerTxnStatusFlag
- type MockConsumerGroupClaim
- type MockConsumerGroupHandler
- type MockConsumerGroupSession
- func (t *MockConsumerGroupSession) Claims() map[string][]int32
- func (t *MockConsumerGroupSession) Commit()
- func (t *MockConsumerGroupSession) Context() context.Context
- func (t *MockConsumerGroupSession) GenerationID() int32
- func (t *MockConsumerGroupSession) MarkMessage(msg *sarama.ConsumerMessage, metadata string)
- func (t *MockConsumerGroupSession) MarkOffset(topic string, partition int32, offset int64, metadata string)
- func (t *MockConsumerGroupSession) MemberID() string
- func (t *MockConsumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string)
- type ProducerPool
- type StreamConsumer
- func (consumer *StreamConsumer) AddDestination(dest common.Topic, serializer common.MessageSerializer)
- func (consumer *StreamConsumer) Cleanup(session sarama.ConsumerGroupSession) error
- func (consumer *StreamConsumer) CloseGroup()
- func (consumer *StreamConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (consumer *StreamConsumer) Destinations() []common.Topic
- func (consumer *StreamConsumer) HandleTxnError(producer sarama.AsyncProducer, message *sarama.ConsumerMessage, ...)
- func (consumer *StreamConsumer) MessageSerializers() []common.MessageSerializer
- func (consumer *StreamConsumer) ProducerPool() *ProducerPool
- func (consumer *StreamConsumer) Setup(session sarama.ConsumerGroupSession) error
- func (consumer *StreamConsumer) StartAsGroup(ctx context.Context, handler sarama.ConsumerGroupHandler)
- func (consumer *StreamConsumer) StartAsGroupSelf(ctx context.Context)
- func (consumer *StreamConsumer) Transaction(producer sarama.AsyncProducer, message *sarama.ConsumerMessage, ...)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type MockAsyncProducer ¶ added in v0.1.1
type MockAsyncProducer struct { TxnStatusFlag sarama.ProducerTxnStatusFlag AbortTxnCalled int AbortTxnError error BeginTxnCalled int BeginTxnError error CommitTxnCalled int CommitTxnError error AddMessageToTxnCalled int AddMessageToTxnError error CloseCalled int InputChan chan *sarama.ProducerMessage }
MockAsyncProducer is a mock implementation of sarama.AsyncProducer
func (*MockAsyncProducer) AbortTxn ¶ added in v0.1.1
func (t *MockAsyncProducer) AbortTxn() error
func (*MockAsyncProducer) AddMessageToTxn ¶ added in v0.1.1
func (t *MockAsyncProducer) AddMessageToTxn(msg *sarama.ConsumerMessage, groupId string, metadata *string) error
func (*MockAsyncProducer) AddOffsetsToTxn ¶ added in v0.1.1
func (t *MockAsyncProducer) AddOffsetsToTxn(offsets map[string][]*sarama.PartitionOffsetMetadata, groupId string) error
func (*MockAsyncProducer) AsyncClose ¶ added in v0.1.1
func (t *MockAsyncProducer) AsyncClose()
func (*MockAsyncProducer) BeginTxn ¶ added in v0.1.1
func (t *MockAsyncProducer) BeginTxn() error
func (*MockAsyncProducer) Close ¶ added in v0.1.1
func (t *MockAsyncProducer) Close() error
func (*MockAsyncProducer) CommitTxn ¶ added in v0.1.1
func (t *MockAsyncProducer) CommitTxn() error
func (*MockAsyncProducer) Errors ¶ added in v0.1.1
func (t *MockAsyncProducer) Errors() <-chan *sarama.ProducerError
func (*MockAsyncProducer) Input ¶ added in v0.1.1
func (t *MockAsyncProducer) Input() chan<- *sarama.ProducerMessage
func (*MockAsyncProducer) IsTransactional ¶ added in v0.1.1
func (t *MockAsyncProducer) IsTransactional() bool
func (*MockAsyncProducer) Successes ¶ added in v0.1.1
func (t *MockAsyncProducer) Successes() <-chan *sarama.ProducerMessage
func (*MockAsyncProducer) TxnStatus ¶ added in v0.1.1
func (t *MockAsyncProducer) TxnStatus() sarama.ProducerTxnStatusFlag
type MockConsumerGroupClaim ¶ added in v0.1.1
type MockConsumerGroupClaim struct {
DataChan chan *sarama.ConsumerMessage
}
MockConsumerGroupClaim is a mock implementation of sarama.ConsumerGroupClaim
func (*MockConsumerGroupClaim) HighWaterMarkOffset ¶ added in v0.1.1
func (t *MockConsumerGroupClaim) HighWaterMarkOffset() int64
func (*MockConsumerGroupClaim) InitialOffset ¶ added in v0.1.1
func (t *MockConsumerGroupClaim) InitialOffset() int64
func (*MockConsumerGroupClaim) Messages ¶ added in v0.1.1
func (t *MockConsumerGroupClaim) Messages() <-chan *sarama.ConsumerMessage
func (*MockConsumerGroupClaim) Partition ¶ added in v0.1.1
func (t *MockConsumerGroupClaim) Partition() int32
func (*MockConsumerGroupClaim) Topic ¶ added in v0.1.1
func (t *MockConsumerGroupClaim) Topic() string
type MockConsumerGroupHandler ¶ added in v0.1.1
type MockConsumerGroupHandler struct { SetupCalled int CleanupCalled int ConsumeClaimCalled int ConsumeClaimError error }
func (*MockConsumerGroupHandler) Cleanup ¶ added in v0.1.1
func (t *MockConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error
func (*MockConsumerGroupHandler) ConsumeClaim ¶ added in v0.1.1
func (t *MockConsumerGroupHandler) ConsumeClaim(sarama.ConsumerGroupSession, sarama.ConsumerGroupClaim) error
func (*MockConsumerGroupHandler) Setup ¶ added in v0.1.1
func (t *MockConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error
type MockConsumerGroupSession ¶ added in v0.1.1
MockConsumerGroupSession is a mock implementation of sarama.ConsumerGroupSession
func (*MockConsumerGroupSession) Claims ¶ added in v0.1.1
func (t *MockConsumerGroupSession) Claims() map[string][]int32
func (*MockConsumerGroupSession) Commit ¶ added in v0.1.1
func (t *MockConsumerGroupSession) Commit()
func (*MockConsumerGroupSession) Context ¶ added in v0.1.1
func (t *MockConsumerGroupSession) Context() context.Context
func (*MockConsumerGroupSession) GenerationID ¶ added in v0.1.1
func (t *MockConsumerGroupSession) GenerationID() int32
func (*MockConsumerGroupSession) MarkMessage ¶ added in v0.1.1
func (t *MockConsumerGroupSession) MarkMessage(msg *sarama.ConsumerMessage, metadata string)
func (*MockConsumerGroupSession) MarkOffset ¶ added in v0.1.1
func (t *MockConsumerGroupSession) MarkOffset(topic string, partition int32, offset int64, metadata string)
func (*MockConsumerGroupSession) MemberID ¶ added in v0.1.1
func (t *MockConsumerGroupSession) MemberID() string
func (*MockConsumerGroupSession) ResetOffset ¶ added in v0.1.1
func (t *MockConsumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string)
type ProducerPool ¶ added in v0.1.1
type ProducerPool struct {
// contains filtered or unexported fields
}
ProducerPool is a pool of producers that can be used to produce messages to Kafka for one set of brokers. The pool itself is not related to transactions; whether its producers are transactional is determined by the configuration returned by configProvider.
Producer.Return.Errors in the provided configuration is overridden to true so that producer errors can be monitored.
func NewProducerPool ¶ added in v0.1.1
func NewProducerPool(brokers []string, configProvider func() *sarama.Config) *ProducerPool
func (*ProducerPool) Close ¶ added in v0.1.1
func (p *ProducerPool) Close()
func (*ProducerPool) Producers ¶ added in v0.1.1
func (p *ProducerPool) Producers() map[common.Topic][]sarama.AsyncProducer
func (*ProducerPool) Return ¶ added in v0.1.1
func (p *ProducerPool) Return(producer sarama.AsyncProducer, topic common.Topic)
Return returns a producer to the pool.
func (*ProducerPool) Take ¶ added in v0.1.1
func (p *ProducerPool) Take(topic common.Topic) (producer sarama.AsyncProducer)
Take returns a producer for a given topic. If the producer does not exist, it creates a new one.
type StreamConsumer ¶
type StreamConsumer struct {
// contains filtered or unexported fields
}
StreamConsumer is a consumer that consumes messages from a Kafka topic and produces them to other topics. It implements the sarama.ConsumerGroupHandler interface.
func NewStreamConsumer ¶
func (*StreamConsumer) AddDestination ¶
func (consumer *StreamConsumer) AddDestination(dest common.Topic, serializer common.MessageSerializer)
func (*StreamConsumer) Cleanup ¶
func (consumer *StreamConsumer) Cleanup(session sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited.
func (*StreamConsumer) CloseGroup ¶ added in v0.1.1
func (consumer *StreamConsumer) CloseGroup()
func (*StreamConsumer) ConsumeClaim ¶
func (consumer *StreamConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). NOTE: Do not call this from within a separate goroutine; sarama already invokes ConsumeClaim in its own goroutine.
func (*StreamConsumer) Destinations ¶ added in v0.1.1
func (consumer *StreamConsumer) Destinations() []common.Topic
func (*StreamConsumer) HandleTxnError ¶ added in v0.1.1
func (consumer *StreamConsumer) HandleTxnError(producer sarama.AsyncProducer, message *sarama.ConsumerMessage, session sarama.ConsumerGroupSession, err error, defaulthandler func() error)
HandleTxnError handles transaction errors. This method is exported only for testing purposes.
func (*StreamConsumer) MessageSerializers ¶ added in v0.1.1
func (consumer *StreamConsumer) MessageSerializers() []common.MessageSerializer
func (*StreamConsumer) ProducerPool ¶ added in v0.1.1
func (consumer *StreamConsumer) ProducerPool() *ProducerPool
func (*StreamConsumer) Setup ¶
func (consumer *StreamConsumer) Setup(session sarama.ConsumerGroupSession) error
Setup is run at the beginning of a new session, before ConsumeClaim.
func (*StreamConsumer) StartAsGroup ¶
func (consumer *StreamConsumer) StartAsGroup(ctx context.Context, handler sarama.ConsumerGroupHandler)
func (*StreamConsumer) StartAsGroupSelf ¶ added in v0.1.1
func (consumer *StreamConsumer) StartAsGroupSelf(ctx context.Context)
func (*StreamConsumer) Transaction ¶ added in v0.1.1
func (consumer *StreamConsumer) Transaction(producer sarama.AsyncProducer, message *sarama.ConsumerMessage, session sarama.ConsumerGroupSession)