Documentation ¶
Index ¶
- func NewMockConsumer(topics *admin.Topics) *mockConsumer
- func NewMockPartitionConsumer(topics *admin.Topics, offsets offsets.Manager) *mockPartitionConsumer
- type Builder
- type BuilderOption
- type Config
- type Consumer
- type Error
- type Event
- type MockConsumerBuilder
- type MockPartitionConsumerBuilder
- type Offset
- type Partition
- type PartitionAllocated
- type PartitionConsumer
- type PartitionConsumerBuilder
- type PartitionEnd
- type PartitionRemoved
- type ReBalanceHandler
- type TopicPartition
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewMockConsumer ¶
Types ¶
type Builder ¶
type Builder interface { Config() *Config Build(options ...BuilderOption) (Consumer, error) }
func NewBuilder ¶
func NewBuilder() Builder
func NewMockConsumerBuilder ¶
type BuilderOption ¶
type BuilderOption func(config *Config)
func BuilderWithGroupId ¶
func BuilderWithGroupId(id string) BuilderOption
func BuilderWithId ¶
func BuilderWithId(id string) BuilderOption
func BuilderWithLogger ¶
func BuilderWithLogger(logger log.Logger) BuilderOption
func BuilderWithMetricsReporter ¶
func BuilderWithMetricsReporter(reporter metrics.Reporter) BuilderOption
type Config ¶
type Config struct { Id string GroupId string BootstrapServers []string MetricsReporter metrics.Reporter Logger log.Logger *sarama.Config }
func NewConsumerConfig ¶
func NewConsumerConfig() *Config
type Consumer ¶
type Consumer interface { Consume(tps []string, handler ReBalanceHandler) (chan Partition, error) Errors() <-chan *Error Close() error }
func NewConsumer ¶
type MockConsumerBuilder ¶
type MockConsumerBuilder struct { Builder // contains filtered or unexported fields }
func (*MockConsumerBuilder) Build ¶
func (mb *MockConsumerBuilder) Build(options ...BuilderOption) (Consumer, error)
type MockPartitionConsumerBuilder ¶
type MockPartitionConsumerBuilder struct { PartitionConsumerBuilder // contains filtered or unexported fields }
func (*MockPartitionConsumerBuilder) Build ¶
func (mb *MockPartitionConsumerBuilder) Build(options ...BuilderOption) (PartitionConsumer, error)
type PartitionAllocated ¶
type PartitionAllocated struct {
// contains filtered or unexported fields
}
func (*PartitionAllocated) String ¶
func (p *PartitionAllocated) String() string
func (*PartitionAllocated) TopicPartitions ¶
func (p *PartitionAllocated) TopicPartitions() []TopicPartition
type PartitionConsumer ¶
type PartitionConsumer interface { Consume(topic string, partition int32, offset Offset) (<-chan Event, error) Errors() <-chan *Error Close() error Id() string }
func NewPartitionConsumer ¶
func NewPartitionConsumer(c *Config) (PartitionConsumer, error)
type PartitionConsumerBuilder ¶
type PartitionConsumerBuilder interface { Config() *Config Build(options ...BuilderOption) (PartitionConsumer, error) }
func NewMockPartitionConsumerBuilder ¶
func NewMockPartitionConsumerBuilder(topics *admin.Topics, offsets offsets.Manager) PartitionConsumerBuilder
func NewPartitionConsumerBuilder ¶
func NewPartitionConsumerBuilder() PartitionConsumerBuilder
type PartitionEnd ¶
type PartitionEnd struct {
// contains filtered or unexported fields
}
func (*PartitionEnd) String ¶
func (p *PartitionEnd) String() string
func (*PartitionEnd) TopicPartitions ¶
func (p *PartitionEnd) TopicPartitions() []TopicPartition
type PartitionRemoved ¶
type PartitionRemoved struct {
// contains filtered or unexported fields
}
func (*PartitionRemoved) String ¶
func (p *PartitionRemoved) String() string
func (*PartitionRemoved) TopicPartitions ¶
func (p *PartitionRemoved) TopicPartitions() []TopicPartition
type ReBalanceHandler ¶
type ReBalanceHandler interface { OnPartitionRevoked(ctx context.Context, revoked []TopicPartition) error OnPartitionAssigned(ctx context.Context, assigned []TopicPartition) error }
type TopicPartition ¶
func (TopicPartition) String ¶
func (tp TopicPartition) String() string
Click to show internal directories.
Click to hide internal directories.