consumer

package
v0.0.2-beta-2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 27, 2020 License: MIT Imports: 8 Imported by: 3

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Builder

type Builder interface {
	Configure(*Config)
	Config() *Config
	Build() (Consumer, error)
}

func NewBuilder

func NewBuilder() Builder

type Config

type Config struct {
	Id               string
	GroupId          string
	BootstrapServers []string
	MetricsReporter  metrics.Reporter
	Logger           log.PrefixedLogger
	*sarama.Config
}

func NewConsumerConfig

func NewConsumerConfig() *Config

type Consumer

type Consumer interface {
	Consume(tps []string, handler ReBalanceHandler) (chan Partition, error)
	Errors() <-chan *Error
	Close() error
}

func NewConsumer

func NewConsumer(config *Config) (Consumer, error)

type Error

type Error struct {
	// contains filtered or unexported fields
}

func (*Error) Error

func (p *Error) Error() string

func (*Error) String

func (p *Error) String() string

type Event

type Event interface {
	String() string
}

type Offset

type Offset int64
const (
	Earliest Offset = -2
	Latest   Offset = -1
)

func (Offset) String

func (o Offset) String() string

type OffsetManager

type OffsetManager struct {
	Client sarama.Client
}

func (OffsetManager) OffsetValid

func (m OffsetManager) OffsetValid(tp TopicPartition, offset int64) (isValid bool, valid int64, err error)

type Partition

type Partition interface {
	Wait() chan<- bool
	Records() <-chan *Record
	Partition() TopicPartition
	MarkOffset(offset int64)
	CommitOffset(*Record) error
}

type PartitionAllocated

type PartitionAllocated struct {
	// contains filtered or unexported fields
}

func (*PartitionAllocated) String

func (p *PartitionAllocated) String() string

func (*PartitionAllocated) TopicPartitions

func (p *PartitionAllocated) TopicPartitions() []TopicPartition

type PartitionConsumer

type PartitionConsumer interface {
	Consume(topic string, partition int32, offset Offset) (<-chan Event, error)
	Errors() <-chan *Error
	GetOldestOffset(topic string, partition int32) (int64, error)
	GetLatestOffset(topic string, partition int32) (int64, error)
	Close() error
	Id() string
}

func NewPartitionConsumer

func NewPartitionConsumer(c *Config) (PartitionConsumer, error)

type PartitionConsumerBuilder

type PartitionConsumerBuilder interface {
	Configure(*Config)
	Config() *Config
	Build() (PartitionConsumer, error)
}

func NewPartitionConsumerBuilder

func NewPartitionConsumerBuilder() PartitionConsumerBuilder

type PartitionEnd

type PartitionEnd struct {
	// contains filtered or unexported fields
}

func (*PartitionEnd) String

func (p *PartitionEnd) String() string

func (*PartitionEnd) TopicPartitions

func (p *PartitionEnd) TopicPartitions() []TopicPartition

type PartitionRemoved

type PartitionRemoved struct {
	// contains filtered or unexported fields
}

func (*PartitionRemoved) String

func (p *PartitionRemoved) String() string

func (*PartitionRemoved) TopicPartitions

func (p *PartitionRemoved) TopicPartitions() []TopicPartition

type ReBalanceHandler

type ReBalanceHandler interface {
	OnPartitionRevoked(ctx context.Context, revoked []TopicPartition) error
	OnPartitionAssigned(ctx context.Context, assigned []TopicPartition) error
}

type Record

type Record struct {
	Key, Value     []byte
	Topic          string
	Partition      int32
	Offset         int64
	Timestamp      time.Time       // only set if kafka is version 0.10+, inner message timestamp
	BlockTimestamp time.Time       // only set if kafka is version 0.10+, outer (compressed) block timestamp
	Headers        []*RecordHeader // only set if kafka is version 0.11+
	UUID           uuid.UUID
}

func (*Record) RecordKey

func (r *Record) RecordKey() interface{}

func (*Record) RecordValue

func (r *Record) RecordValue() interface{}

func (*Record) String

func (r *Record) String() string

type RecordHeader

type RecordHeader struct {
	Key   []byte
	Value []byte
}

type TopicPartition

type TopicPartition struct {
	Topic     string
	Partition int32
}

func (TopicPartition) String

func (tp TopicPartition) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL