kaf

package
v0.1.48 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 30, 2022 License: MIT Imports: 7 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrKafClosed = errors.New("kaf object is closed")
)

Functions

This section is empty.

Types

type KafReader

type KafReader interface {
	Close() error
	CommitMessages(ctx context.Context, msgs ...kafka.Message) error
	FetchMessage(ctx context.Context) (kafka.Message, error)
}

Interface for the real Kafka Reader to allow mock for testing.

type KafReaderFunc

type KafReaderFunc func(config *kafka.ReaderConfig) KafReader

type KafRecv

type KafRecv struct {
	// contains filtered or unexported fields
}

func NewKafRecv

func NewKafRecv(kafkaUrl, component, topic string, cb KafRecvFunc) *KafRecv

Create a KafRecv and start a goroutine to handle incoming messages on the given topic, passing each incoming message to the callback handler.

func NewKafRecvFromLatest added in v0.1.15

func NewKafRecvFromLatest(kafkaUrl, component, topic string, cb KafRecvFunc) *KafRecv

Create a KafRecv and receive from LATEST/new messages. This is different from NewKafRecv which receives from beginning of msgs

func (*KafRecv) Close

func (kr *KafRecv) Close()

Close and terminate the goroutine message handler.

type KafRecvFunc

type KafRecvFunc func(key, value []byte)

type KafSend

type KafSend struct {
	// contains filtered or unexported fields
}

func NewKafSend

func NewKafSend(kafkaUrl, topic string) *KafSend

Create a KafSend and start a goroutine to handle outgoing messages and react to connectivity failures.

func (*KafSend) Close

func (ks *KafSend) Close()

Close and terminate the goroutine message handler.

func (*KafSend) SendMessage

func (ks *KafSend) SendMessage(key, val []byte) error

The API to pass a message to the goroutine sending outgoing messages.

type KafWriter

type KafWriter interface {
	Close() error
	WriteMessages(ctx context.Context, msgs ...kafka.Message) error
}

Interface for the real Kafka Writer to allow mock for testing.

type KafWriterFunc

type KafWriterFunc func(config *kafka.WriterConfig) KafWriter

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL