reader

package
v5.4.2+incompatible
Published: Jun 7, 2022 License: Apache-2.0 Imports: 6 Imported by: 0

Documentation

Constants

const (
	KafkaReadTimeout = 10 * time.Minute
	KafkaWaitTimeout = 11 * time.Minute
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	KafkaAddr []string
	// The reader only returns binlogs whose CommitTs is greater than CommitTS.
	CommitTS int64
	Offset   int64 // Kafka offset to start reading from
	// If Topic is empty, the drainer's default name <ClusterID>_obinlog is used.
	Topic     string
	ClusterID string
	// Size of the reader's internal message buffer. Defaults to 1.
	// To have the reader buffer messages internally, set only this field
	// and leave SaramaBufferSize at its default of 1.
	MessageBufferSize int
	// Size of sarama's internal message buffer.
	SaramaBufferSize int
}

Config holds the configuration for a Reader.
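The field comments above describe how an empty Topic falls back to a name derived from ClusterID. The following is a minimal sketch of that rule, not the package's own code: the Config type is redeclared locally so the example compiles without the module, resolveTopic is a hypothetical helper, and the broker address and cluster ID are made-up values.

```go
package main

import "fmt"

// Config mirrors the documented fields so this sketch is self-contained;
// real code would import the reader package instead.
type Config struct {
	KafkaAddr         []string
	CommitTS          int64
	Offset            int64
	Topic             string
	ClusterID         string
	MessageBufferSize int
	SaramaBufferSize  int
}

// resolveTopic is a hypothetical helper (not part of the package).
// It returns cfg.Topic when set, otherwise the documented default
// "<ClusterID>_obinlog".
func resolveTopic(cfg *Config) string {
	if cfg.Topic != "" {
		return cfg.Topic
	}
	return cfg.ClusterID + "_obinlog"
}

func main() {
	cfg := &Config{
		KafkaAddr:         []string{"127.0.0.1:9092"}, // assumed broker address
		ClusterID:         "example-cluster",          // made-up cluster ID
		MessageBufferSize: 16,                         // buffer inside the reader
		SaramaBufferSize:  1,                          // left at the documented default
	}
	fmt.Println(resolveTopic(cfg))
}
```

Following the advice in the field comments, only MessageBufferSize is raised here and SaramaBufferSize stays at 1.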

type KafkaSeeker

type KafkaSeeker struct {
	// contains filtered or unexported fields
}

KafkaSeeker seeks offsets in Kafka topics by a given condition.

func NewKafkaSeeker

func NewKafkaSeeker(addr []string, config *sarama.Config) (*KafkaSeeker, error)

NewKafkaSeeker creates an instance of KafkaSeeker

func (*KafkaSeeker) Close

func (ks *KafkaSeeker) Close()

Close releases resources of KafkaSeeker

func (*KafkaSeeker) Seek

func (ks *KafkaSeeker) Seek(topic string, ts int64, partitions []int32) (offsets []int64, err error)

Seek finds the first offset whose binlog CommitTs is greater than ts.
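To make the "first offset with CommitTs greater than ts" condition concrete, here is a small in-memory sketch. It does not reproduce how KafkaSeeker talks to Kafka: the slice stands in for one partition, firstOffsetAfter is a hypothetical name, and the sketch assumes CommitTs values do not decrease as offsets grow.

```go
package main

import (
	"fmt"
	"sort"
)

// firstOffsetAfter returns the index of the first entry in commitTs that is
// strictly greater than ts, or len(commitTs) when no such entry exists.
// commitTs[i] stands in for the CommitTs of the binlog stored at offset i
// and is assumed to be sorted in non-decreasing order.
func firstOffsetAfter(commitTs []int64, ts int64) int64 {
	idx := sort.Search(len(commitTs), func(i int) bool {
		return commitTs[i] > ts
	})
	return int64(idx)
}

func main() {
	// One simulated partition: offsets 0..4 with these CommitTs values.
	partition := []int64{100, 105, 105, 110, 120}
	fmt.Println(firstOffsetAfter(partition, 105)) // prints 3
}
```

Binary search is used because, under the ordering assumption, the predicate "CommitTs > ts" flips from false to true exactly once along the partition.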

type Message

type Message struct {
	Binlog *pb.Binlog
	Offset int64 // kafka offset
}

Message is a message read from the Reader.

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader reads binlogs from Kafka.

func NewReader

func NewReader(cfg *Config) (r *Reader, err error)

NewReader creates an instance of Reader

func (*Reader) Close

func (r *Reader) Close()

Close shuts down the reader

func (*Reader) Messages

func (r *Reader) Messages() (msgs <-chan *Message)

Messages returns a channel that delivers the unread buffered messages.
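A receive-only channel like this one is typically consumed in a loop. The sketch below shows one possible pattern under stated assumptions: Message is a local stand-in (the real type carries a *pb.Binlog), drain is a hypothetical helper, and because this page does not say whether the channel is closed when the Reader shuts down, the loop also stops after an idle timeout.

```go
package main

import (
	"fmt"
	"time"
)

// Message is a local stand-in for the package's Message type.
type Message struct {
	CommitTs int64 // stand-in for the binlog payload
	Offset   int64 // Kafka offset
}

// drain reads from msgs until the channel is closed or no message arrives
// within idle, and returns the offsets it received, in order.
func drain(msgs <-chan *Message, idle time.Duration) []int64 {
	var offsets []int64
	for {
		select {
		case m, ok := <-msgs:
			if !ok {
				return offsets // channel closed by the producer
			}
			offsets = append(offsets, m.Offset)
		case <-time.After(idle):
			return offsets // nothing arrived within the idle window
		}
	}
}

func main() {
	ch := make(chan *Message, 3)
	for i := int64(0); i < 3; i++ {
		ch <- &Message{CommitTs: 100 + i, Offset: i}
	}
	close(ch)
	fmt.Println(drain(ch, 50*time.Millisecond)) // prints [0 1 2]
}
```

Recording the last Offset seen gives a caller a value it could later pass as Config.Offset to resume reading.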
