internal

package
v0.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 25, 2024 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Copy added in v0.1.1

func Copy(source interface{}, destin interface{})

Deep copy

Types

type MockAsyncProducer added in v0.1.1

type MockAsyncProducer struct {
	TxnStatusFlag         sarama.ProducerTxnStatusFlag
	AbortTxnCalled        int
	AbortTxnError         error
	BeginTxnCalled        int
	BeginTxnError         error
	CommitTxnCalled       int
	CommitTxnError        error
	AddMessageToTxnCalled int
	AddMessageToTxnError  error
	CloseCalled           int
	InputChan             chan *sarama.ProducerMessage
}

MockAsyncProducer is a mock implementation of sarama.AsyncProducer

func (*MockAsyncProducer) AbortTxn added in v0.1.1

func (t *MockAsyncProducer) AbortTxn() error

func (*MockAsyncProducer) AddMessageToTxn added in v0.1.1

func (t *MockAsyncProducer) AddMessageToTxn(msg *sarama.ConsumerMessage, groupId string, metadata *string) error

func (*MockAsyncProducer) AddOffsetsToTxn added in v0.1.1

func (t *MockAsyncProducer) AddOffsetsToTxn(offsets map[string][]*sarama.PartitionOffsetMetadata, groupId string) error

func (*MockAsyncProducer) AsyncClose added in v0.1.1

func (t *MockAsyncProducer) AsyncClose()

func (*MockAsyncProducer) BeginTxn added in v0.1.1

func (t *MockAsyncProducer) BeginTxn() error

func (*MockAsyncProducer) Close added in v0.1.1

func (t *MockAsyncProducer) Close() error

func (*MockAsyncProducer) CommitTxn added in v0.1.1

func (t *MockAsyncProducer) CommitTxn() error

func (*MockAsyncProducer) Errors added in v0.1.1

func (t *MockAsyncProducer) Errors() <-chan *sarama.ProducerError

func (*MockAsyncProducer) Input added in v0.1.1

func (t *MockAsyncProducer) Input() chan<- *sarama.ProducerMessage

func (*MockAsyncProducer) IsTransactional added in v0.1.1

func (t *MockAsyncProducer) IsTransactional() bool

func (*MockAsyncProducer) Successes added in v0.1.1

func (t *MockAsyncProducer) Successes() <-chan *sarama.ProducerMessage

func (*MockAsyncProducer) TxnStatus added in v0.1.1

type MockConsumerGroupClaim added in v0.1.1

type MockConsumerGroupClaim struct {
	DataChan chan *sarama.ConsumerMessage
}

MockConsumerGroupClaim is a mock implementation of sarama.ConsumerGroupClaim

func (*MockConsumerGroupClaim) HighWaterMarkOffset added in v0.1.1

func (t *MockConsumerGroupClaim) HighWaterMarkOffset() int64

func (*MockConsumerGroupClaim) InitialOffset added in v0.1.1

func (t *MockConsumerGroupClaim) InitialOffset() int64

func (*MockConsumerGroupClaim) Messages added in v0.1.1

func (t *MockConsumerGroupClaim) Messages() <-chan *sarama.ConsumerMessage

func (*MockConsumerGroupClaim) Partition added in v0.1.1

func (t *MockConsumerGroupClaim) Partition() int32

func (*MockConsumerGroupClaim) Topic added in v0.1.1

func (t *MockConsumerGroupClaim) Topic() string

type MockConsumerGroupHandler added in v0.1.1

type MockConsumerGroupHandler struct {
	SetupCalled        int
	CleanupCalled      int
	ConsumeClaimCalled int
	ConsumeClaimError  error
}

func (*MockConsumerGroupHandler) Cleanup added in v0.1.1

func (*MockConsumerGroupHandler) ConsumeClaim added in v0.1.1

func (*MockConsumerGroupHandler) Setup added in v0.1.1

type MockConsumerGroupSession added in v0.1.1

type MockConsumerGroupSession struct {
	Ctx               context.Context
	ResetOffsetCalled int
}

MockConsumerGroupSession is a mock implementation of sarama.ConsumerGroupSession

func (*MockConsumerGroupSession) Claims added in v0.1.1

func (t *MockConsumerGroupSession) Claims() map[string][]int32

func (*MockConsumerGroupSession) Commit added in v0.1.1

func (t *MockConsumerGroupSession) Commit()

func (*MockConsumerGroupSession) Context added in v0.1.1

func (*MockConsumerGroupSession) GenerationID added in v0.1.1

func (t *MockConsumerGroupSession) GenerationID() int32

func (*MockConsumerGroupSession) MarkMessage added in v0.1.1

func (t *MockConsumerGroupSession) MarkMessage(msg *sarama.ConsumerMessage, metadata string)

func (*MockConsumerGroupSession) MarkOffset added in v0.1.1

func (t *MockConsumerGroupSession) MarkOffset(topic string, partition int32, offset int64, metadata string)

func (*MockConsumerGroupSession) MemberID added in v0.1.1

func (t *MockConsumerGroupSession) MemberID() string

func (*MockConsumerGroupSession) ResetOffset added in v0.1.1

func (t *MockConsumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string)

type ProducerPool added in v0.1.1

type ProducerPool struct {
	// contains filtered or unexported fields
}

ProducerPool is a pool of producers that can be used to produce messages to Kafka for one set of brokers. It is not related to Transaction, Transactional Producer implements by configProvider.

Will be overridden Returns.Errors to true, because of monitoring errors.

func NewProducerPool added in v0.1.1

func NewProducerPool(brokers []string, configProvider func() *sarama.Config) *ProducerPool

func (*ProducerPool) Close added in v0.1.1

func (p *ProducerPool) Close()

func (*ProducerPool) Producers added in v0.1.1

func (p *ProducerPool) Producers() map[common.Topic][]sarama.AsyncProducer

func (*ProducerPool) Return added in v0.1.1

func (p *ProducerPool) Return(producer sarama.AsyncProducer, topic common.Topic)

Return returns a producer to the pool.

func (*ProducerPool) Take added in v0.1.1

func (p *ProducerPool) Take(topic common.Topic) (producer sarama.AsyncProducer)

Take returns a producer for a given topic. If the producer does not exist, it creates a new one.

type StreamConsumer

type StreamConsumer struct {
	// contains filtered or unexported fields
}

StreamConsumer is a consumer that consumes messages from a Kafka topic and produces them to other topics. It implements the sarama.ConsumerGroupHandler interface.

func NewStreamConsumer

func NewStreamConsumer(
	origin common.Topic, groupId string,
	brokers []string, config *sarama.Config, producerConfig *sarama.Config,
) *StreamConsumer

func (*StreamConsumer) AddDestination

func (consumer *StreamConsumer) AddDestination(dest common.Topic, serializer common.MessageSerializer)

func (*StreamConsumer) Cleanup

func (consumer *StreamConsumer) Cleanup(session sarama.ConsumerGroupSession) error

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited.

func (*StreamConsumer) CloseGroup added in v0.1.1

func (consumer *StreamConsumer) CloseGroup()

func (*StreamConsumer) ConsumeClaim

func (consumer *StreamConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). NOTE: This must not be called within a goroutine, already handled as goroutines by sarama.

func (*StreamConsumer) Destinations added in v0.1.1

func (consumer *StreamConsumer) Destinations() []common.Topic

func (*StreamConsumer) HandleTxnError added in v0.1.1

func (consumer *StreamConsumer) HandleTxnError(producer sarama.AsyncProducer, message *sarama.ConsumerMessage, session sarama.ConsumerGroupSession, err error, defaulthandler func() error)

HandleTxnError handles transaction errors, this exported method is only for testing purposes.

func (*StreamConsumer) MessageSerializers added in v0.1.1

func (consumer *StreamConsumer) MessageSerializers() []common.MessageSerializer

func (*StreamConsumer) ProducerPool added in v0.1.1

func (consumer *StreamConsumer) ProducerPool() *ProducerPool

func (*StreamConsumer) Setup

func (consumer *StreamConsumer) Setup(session sarama.ConsumerGroupSession) error

Setup is run at the beginning of a new session, before ConsumeClaim.

func (*StreamConsumer) StartAsGroup

func (consumer *StreamConsumer) StartAsGroup(ctx context.Context, handler sarama.ConsumerGroupHandler)

func (*StreamConsumer) StartAsGroupSelf added in v0.1.1

func (consumer *StreamConsumer) StartAsGroupSelf(ctx context.Context)

func (*StreamConsumer) Transaction added in v0.1.1

func (consumer *StreamConsumer) Transaction(producer sarama.AsyncProducer, message *sarama.ConsumerMessage, session sarama.ConsumerGroupSession)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL