Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Builder ¶
func NewBuilder ¶
func NewBuilder() Builder
type Config ¶
type Config struct { Id string GroupId string BootstrapServers []string MetricsReporter metrics.Reporter Logger log.PrefixedLogger *sarama.Config }
func NewConsumerConfig ¶
func NewConsumerConfig() *Config
type Consumer ¶
type Consumer interface { Consume(tps []string, handler ReBalanceHandler) (chan Partition, error) Errors() <-chan *Error Close() error }
func NewConsumer ¶
type OffsetManager ¶
func (OffsetManager) OffsetValid ¶
func (m OffsetManager) OffsetValid(tp TopicPartition, offset int64) (isValid bool, valid int64, err error)
type PartitionAllocated ¶
type PartitionAllocated struct {
// contains filtered or unexported fields
}
func (*PartitionAllocated) String ¶
func (p *PartitionAllocated) String() string
func (*PartitionAllocated) TopicPartitions ¶
func (p *PartitionAllocated) TopicPartitions() []TopicPartition
type PartitionConsumer ¶
type PartitionConsumer interface { Consume(topic string, partition int32, offset Offset) (<-chan Event, error) Errors() <-chan *Error GetOldestOffset(topic string, partition int32) (int64, error) GetLatestOffset(topic string, partition int32) (int64, error) Close() error Id() string }
func NewPartitionConsumer ¶
func NewPartitionConsumer(c *Config) (PartitionConsumer, error)
type PartitionConsumerBuilder ¶
type PartitionConsumerBuilder interface { Configure(*Config) Config() *Config Build() (PartitionConsumer, error) }
func NewPartitionConsumerBuilder ¶
func NewPartitionConsumerBuilder() PartitionConsumerBuilder
type PartitionEnd ¶
type PartitionEnd struct {
// contains filtered or unexported fields
}
func (*PartitionEnd) String ¶
func (p *PartitionEnd) String() string
func (*PartitionEnd) TopicPartitions ¶
func (p *PartitionEnd) TopicPartitions() []TopicPartition
type PartitionRemoved ¶
type PartitionRemoved struct {
// contains filtered or unexported fields
}
func (*PartitionRemoved) String ¶
func (p *PartitionRemoved) String() string
func (*PartitionRemoved) TopicPartitions ¶
func (p *PartitionRemoved) TopicPartitions() []TopicPartition
type ReBalanceHandler ¶
type ReBalanceHandler interface { OnPartitionRevoked(ctx context.Context, revoked []TopicPartition) error OnPartitionAssigned(ctx context.Context, assigned []TopicPartition) error }
type Record ¶
type Record struct {
Key, Value []byte
Topic string
Partition int32
Offset int64
Timestamp time.Time // inner message timestamp; only set if Kafka is version 0.10+
BlockTimestamp time.Time // outer (compressed) block timestamp; only set if Kafka is version 0.10+
Headers []*RecordHeader // only set if Kafka is version 0.11+
UUID uuid.UUID
}
func (*Record) RecordValue ¶
func (r *Record) RecordValue() interface{}
type RecordHeader ¶
type TopicPartition ¶
func (TopicPartition) String ¶
func (tp TopicPartition) String() string
Click to show internal directories.
Click to hide internal directories.