Documentation
¶
Index ¶
- func Copy(source interface{}, destin interface{})
- type MockAsyncProducer
- func (t *MockAsyncProducer) AbortTxn() error
- func (t *MockAsyncProducer) AddMessageToTxn(msg *sarama.ConsumerMessage, groupId string, metadata *string) error
- func (t *MockAsyncProducer) AddOffsetsToTxn(offsets map[string][]*sarama.PartitionOffsetMetadata, groupId string) error
- func (t *MockAsyncProducer) AsyncClose()
- func (t *MockAsyncProducer) BeginTxn() error
- func (t *MockAsyncProducer) Close() error
- func (t *MockAsyncProducer) CommitTxn() error
- func (t *MockAsyncProducer) Errors() <-chan *sarama.ProducerError
- func (t *MockAsyncProducer) Input() chan<- *sarama.ProducerMessage
- func (t *MockAsyncProducer) IsTransactional() bool
- func (t *MockAsyncProducer) Successes() <-chan *sarama.ProducerMessage
- func (t *MockAsyncProducer) TxnStatus() sarama.ProducerTxnStatusFlag
- type MockConsumerGroupClaim
- type MockConsumerGroupHandler
- type MockConsumerGroupSession
- func (t *MockConsumerGroupSession) Claims() map[string][]int32
- func (t *MockConsumerGroupSession) Commit()
- func (t *MockConsumerGroupSession) Context() context.Context
- func (t *MockConsumerGroupSession) GenerationID() int32
- func (t *MockConsumerGroupSession) MarkMessage(msg *sarama.ConsumerMessage, metadata string)
- func (t *MockConsumerGroupSession) MarkOffset(topic string, partition int32, offset int64, metadata string)
- func (t *MockConsumerGroupSession) MemberID() string
- func (t *MockConsumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string)
- type ProducerPool
- type StreamConsumer
- func (consumer *StreamConsumer) AddDestination(dest common.Topic, serializer common.MessageSerializer)
- func (consumer *StreamConsumer) Cleanup(session sarama.ConsumerGroupSession) error
- func (consumer *StreamConsumer) CloseGroup()
- func (consumer *StreamConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (consumer *StreamConsumer) Destinations() []common.Topic
- func (consumer *StreamConsumer) HandleTxnError(producer sarama.AsyncProducer, message *sarama.ConsumerMessage, ...)
- func (consumer *StreamConsumer) MessageSerializers() []common.MessageSerializer
- func (consumer *StreamConsumer) ProducerPool() *ProducerPool
- func (consumer *StreamConsumer) Setup(session sarama.ConsumerGroupSession) error
- func (consumer *StreamConsumer) StartAsGroup(ctx context.Context, handler sarama.ConsumerGroupHandler)
- func (consumer *StreamConsumer) StartAsGroupSelf(ctx context.Context)
- func (consumer *StreamConsumer) Transaction(producer sarama.AsyncProducer, message *sarama.ConsumerMessage, ...)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type MockAsyncProducer ¶ added in v0.1.1
type MockAsyncProducer struct { TxnStatusFlag sarama.ProducerTxnStatusFlag AbortTxnCalled int AbortTxnError error BeginTxnCalled int BeginTxnError error CommitTxnCalled int CommitTxnError error AddMessageToTxnCalled int AddMessageToTxnError error CloseCalled int InputChan chan *sarama.ProducerMessage }
MockAsyncProducer is a mock implementation of sarama.AsyncProducer
func (*MockAsyncProducer) AbortTxn ¶ added in v0.1.1
func (t *MockAsyncProducer) AbortTxn() error
func (*MockAsyncProducer) AddMessageToTxn ¶ added in v0.1.1
func (t *MockAsyncProducer) AddMessageToTxn(msg *sarama.ConsumerMessage, groupId string, metadata *string) error
func (*MockAsyncProducer) AddOffsetsToTxn ¶ added in v0.1.1
func (t *MockAsyncProducer) AddOffsetsToTxn(offsets map[string][]*sarama.PartitionOffsetMetadata, groupId string) error
func (*MockAsyncProducer) AsyncClose ¶ added in v0.1.1
func (t *MockAsyncProducer) AsyncClose()
func (*MockAsyncProducer) BeginTxn ¶ added in v0.1.1
func (t *MockAsyncProducer) BeginTxn() error
func (*MockAsyncProducer) Close ¶ added in v0.1.1
func (t *MockAsyncProducer) Close() error
func (*MockAsyncProducer) CommitTxn ¶ added in v0.1.1
func (t *MockAsyncProducer) CommitTxn() error
func (*MockAsyncProducer) Errors ¶ added in v0.1.1
func (t *MockAsyncProducer) Errors() <-chan *sarama.ProducerError
func (*MockAsyncProducer) Input ¶ added in v0.1.1
func (t *MockAsyncProducer) Input() chan<- *sarama.ProducerMessage
func (*MockAsyncProducer) IsTransactional ¶ added in v0.1.1
func (t *MockAsyncProducer) IsTransactional() bool
func (*MockAsyncProducer) Successes ¶ added in v0.1.1
func (t *MockAsyncProducer) Successes() <-chan *sarama.ProducerMessage
func (*MockAsyncProducer) TxnStatus ¶ added in v0.1.1
func (t *MockAsyncProducer) TxnStatus() sarama.ProducerTxnStatusFlag
type MockConsumerGroupClaim ¶ added in v0.1.1
type MockConsumerGroupClaim struct {
DataChan chan *sarama.ConsumerMessage
}
MockConsumerGroupClaim is a mock implementation of sarama.ConsumerGroupClaim
func (*MockConsumerGroupClaim) HighWaterMarkOffset ¶ added in v0.1.1
func (t *MockConsumerGroupClaim) HighWaterMarkOffset() int64
func (*MockConsumerGroupClaim) InitialOffset ¶ added in v0.1.1
func (t *MockConsumerGroupClaim) InitialOffset() int64
func (*MockConsumerGroupClaim) Messages ¶ added in v0.1.1
func (t *MockConsumerGroupClaim) Messages() <-chan *sarama.ConsumerMessage
func (*MockConsumerGroupClaim) Partition ¶ added in v0.1.1
func (t *MockConsumerGroupClaim) Partition() int32
func (*MockConsumerGroupClaim) Topic ¶ added in v0.1.1
func (t *MockConsumerGroupClaim) Topic() string
type MockConsumerGroupHandler ¶ added in v0.1.1
type MockConsumerGroupHandler struct { SetupCalled int CleanupCalled int ConsumeClaimCalled int ConsumeClaimError error }
func (*MockConsumerGroupHandler) Cleanup ¶ added in v0.1.1
func (t *MockConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error
func (*MockConsumerGroupHandler) ConsumeClaim ¶ added in v0.1.1
func (t *MockConsumerGroupHandler) ConsumeClaim(sarama.ConsumerGroupSession, sarama.ConsumerGroupClaim) error
func (*MockConsumerGroupHandler) Setup ¶ added in v0.1.1
func (t *MockConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error
type MockConsumerGroupSession ¶ added in v0.1.1
MockConsumerGroupSession is a mock implementation of sarama.ConsumerGroupSession
func (*MockConsumerGroupSession) Claims ¶ added in v0.1.1
func (t *MockConsumerGroupSession) Claims() map[string][]int32
func (*MockConsumerGroupSession) Commit ¶ added in v0.1.1
func (t *MockConsumerGroupSession) Commit()
func (*MockConsumerGroupSession) Context ¶ added in v0.1.1
func (t *MockConsumerGroupSession) Context() context.Context
func (*MockConsumerGroupSession) GenerationID ¶ added in v0.1.1
func (t *MockConsumerGroupSession) GenerationID() int32
func (*MockConsumerGroupSession) MarkMessage ¶ added in v0.1.1
func (t *MockConsumerGroupSession) MarkMessage(msg *sarama.ConsumerMessage, metadata string)
func (*MockConsumerGroupSession) MarkOffset ¶ added in v0.1.1
func (t *MockConsumerGroupSession) MarkOffset(topic string, partition int32, offset int64, metadata string)
func (*MockConsumerGroupSession) MemberID ¶ added in v0.1.1
func (t *MockConsumerGroupSession) MemberID() string
func (*MockConsumerGroupSession) ResetOffset ¶ added in v0.1.1
func (t *MockConsumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string)
type ProducerPool ¶ added in v0.1.1
type ProducerPool struct {
// contains filtered or unexported fields
}
ProducerPool is a pool of producers that can be used to produce messages to Kafka for one set of brokers. The pool itself is not tied to transactions; whether its producers are transactional is determined by the configuration returned by configProvider.
func NewProducerPool ¶ added in v0.1.1
func NewProducerPool(brokers []string, configProvider func() *sarama.Config) *ProducerPool
func (*ProducerPool) Close ¶ added in v0.1.1
func (p *ProducerPool) Close()
func (*ProducerPool) Producers ¶ added in v0.1.1
func (p *ProducerPool) Producers() map[common.Topic][]sarama.AsyncProducer
func (*ProducerPool) Return ¶ added in v0.1.1
func (p *ProducerPool) Return(producer sarama.AsyncProducer, topic common.Topic)
Return returns a producer to the pool.
func (*ProducerPool) Take ¶ added in v0.1.1
func (p *ProducerPool) Take(topic common.Topic) (producer sarama.AsyncProducer)
Take returns a producer for a given topic. If the producer does not exist, it creates a new one.
type StreamConsumer ¶
type StreamConsumer struct {
// contains filtered or unexported fields
}
StreamConsumer is a consumer that consumes messages from a Kafka topic and produces them to other topics. It implements the sarama.ConsumerGroupHandler interface.
func NewStreamConsumer ¶
func (*StreamConsumer) AddDestination ¶
func (consumer *StreamConsumer) AddDestination(dest common.Topic, serializer common.MessageSerializer)
func (*StreamConsumer) Cleanup ¶
func (consumer *StreamConsumer) Cleanup(session sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited.
func (*StreamConsumer) CloseGroup ¶ added in v0.1.1
func (consumer *StreamConsumer) CloseGroup()
func (*StreamConsumer) ConsumeClaim ¶
func (consumer *StreamConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). NOTE: Do not call this within a goroutine; sarama already runs it in goroutines.
func (*StreamConsumer) Destinations ¶ added in v0.1.1
func (consumer *StreamConsumer) Destinations() []common.Topic
func (*StreamConsumer) HandleTxnError ¶ added in v0.1.1
func (consumer *StreamConsumer) HandleTxnError(producer sarama.AsyncProducer, message *sarama.ConsumerMessage, session sarama.ConsumerGroupSession, err error, defaulthandler func() error)
HandleTxnError handles transaction errors. This method is exported only for testing purposes.
func (*StreamConsumer) MessageSerializers ¶ added in v0.1.1
func (consumer *StreamConsumer) MessageSerializers() []common.MessageSerializer
func (*StreamConsumer) ProducerPool ¶ added in v0.1.1
func (consumer *StreamConsumer) ProducerPool() *ProducerPool
func (*StreamConsumer) Setup ¶
func (consumer *StreamConsumer) Setup(session sarama.ConsumerGroupSession) error
Setup is run at the beginning of a new session, before ConsumeClaim.
func (*StreamConsumer) StartAsGroup ¶
func (consumer *StreamConsumer) StartAsGroup(ctx context.Context, handler sarama.ConsumerGroupHandler)
func (*StreamConsumer) StartAsGroupSelf ¶ added in v0.1.1
func (consumer *StreamConsumer) StartAsGroupSelf(ctx context.Context)
func (*StreamConsumer) Transaction ¶ added in v0.1.1
func (consumer *StreamConsumer) Transaction(producer sarama.AsyncProducer, message *sarama.ConsumerMessage, session sarama.ConsumerGroupSession)