Documentation ¶
Constants ¶
This section is empty.
Variables ¶
var (
ErrKafClosed = errors.New("kaf object is closed")
)
Functions ¶
This section is empty.
Types ¶
type KafReader ¶
type KafReader interface {
	Close() error
	CommitMessages(ctx context.Context, msgs ...kafka.Message) error
	FetchMessage(ctx context.Context) (kafka.Message, error)
}
KafReader abstracts the real Kafka reader so that tests can substitute a mock.
type KafReaderFunc ¶
type KafReaderFunc func(config *kafka.ReaderConfig) KafReader
type KafRecv ¶
type KafRecv struct {
// contains filtered or unexported fields
}
func NewKafRecv ¶
func NewKafRecv(kafkaUrl, component, topic string, cb KafRecvFunc) *KafRecv
NewKafRecv creates a KafRecv and starts a goroutine that handles incoming messages on the given topic, passing each message to the callback cb.
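Since the callback is invoked from the receiver's goroutine rather than from the caller's, any state it shares with other code probably needs synchronization. The sketch below shows one way to write such a callback. The names recvFunc, collector, and newCollector are hypothetical, recvFunc is a local stand-in with the same shape as KafRecvFunc, and a plain goroutine stands in for the receiver so that the example runs without a broker.

```go
package main

import (
	"fmt"
	"sync"
)

// recvFunc has the same shape as KafRecvFunc; it is declared locally so the
// sketch compiles on its own.
type recvFunc func(key, value []byte)

// collector is a hypothetical callback target that groups values by key.
// The mutex guards the map because handle runs on another goroutine.
type collector struct {
	mu     sync.Mutex
	values map[string][]string
}

func newCollector() *collector {
	return &collector{values: make(map[string][]string)}
}

// handle matches the callback signature. Converting to string copies the
// bytes, so the stored data does not alias the slices passed in.
func (c *collector) handle(key, value []byte) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.values[string(key)] = append(c.values[string(key)], string(value))
}

// get returns a copy of the values seen so far for key.
func (c *collector) get(key string) []string {
	c.mu.Lock()
	defer c.mu.Unlock()
	return append([]string(nil), c.values[key]...)
}

func main() {
	c := newCollector()
	var cb recvFunc = c.handle

	// In real use, cb would be passed as the last argument to NewKafRecv.
	// Here a goroutine plays the role of the receiver.
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		cb([]byte("order-1"), []byte("created"))
		cb([]byte("order-1"), []byte("paid"))
	}()
	wg.Wait()

	fmt.Println(c.get("order-1"))
}
```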
func NewKafRecvFromLatest ¶ added in v0.1.15
func NewKafRecvFromLatest(kafkaUrl, component, topic string, cb KafRecvFunc) *KafRecv
NewKafRecvFromLatest creates a KafRecv that receives only new messages, starting from the latest offset. This differs from NewKafRecv, which receives messages from the beginning.
type KafRecvFunc ¶
type KafRecvFunc func(key, value []byte)
type KafSend ¶
type KafSend struct {
// contains filtered or unexported fields
}
func NewKafSend ¶
NewKafSend creates a KafSend and starts a goroutine that handles outgoing messages and reacts to connectivity failures.
func (*KafSend) Close ¶
func (ks *KafSend) Close()
Close closes the KafSend and terminates its message-handling goroutine.
func (*KafSend) SendMessage ¶
SendMessage passes a message to the goroutine that sends outgoing messages.
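The general pattern described for KafSend (a background goroutine that drains a queue, a send method that feeds it, and a Close that stops it) can be sketched as follows. This is not the package's implementation, which is unexported and not shown here. The names sender, newSender, send, close, message, and errClosed are hypothetical, and errClosed only plays the role that ErrKafClosed presumably plays in the real package.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errClosed is a stand-in for a sentinel such as ErrKafClosed.
var errClosed = errors.New("sender is closed")

// message is a hypothetical key/value pair queued for sending.
type message struct {
	key, value []byte
}

// sender is a hypothetical type illustrating the pattern; it is not KafSend.
type sender struct {
	mu     sync.Mutex
	closed bool
	queue  chan message
	done   chan struct{}
	write  func(message) error
}

// newSender starts the goroutine that drains the queue through write.
func newSender(write func(message) error) *sender {
	s := &sender{
		queue: make(chan message, 16),
		done:  make(chan struct{}),
		write: write,
	}
	go s.loop()
	return s
}

// loop runs until the queue is closed and fully drained.
func (s *sender) loop() {
	defer close(s.done)
	for m := range s.queue {
		if err := s.write(m); err != nil {
			// A real sender might retry or reconnect at this point.
			fmt.Println("write failed:", err)
		}
	}
}

// send queues a message, or returns errClosed after close has been called.
// Holding the mutex while queueing prevents a send on a closed channel.
func (s *sender) send(key, value []byte) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		return errClosed
	}
	s.queue <- message{key: key, value: value}
	return nil
}

// close stops accepting messages and waits for the goroutine to finish.
// Calling it more than once is safe.
func (s *sender) close() {
	s.mu.Lock()
	if !s.closed {
		s.closed = true
		close(s.queue)
	}
	s.mu.Unlock()
	<-s.done
}

func main() {
	s := newSender(func(m message) error {
		fmt.Printf("sent %s=%s\n", m.key, m.value)
		return nil
	})
	_ = s.send([]byte("k"), []byte("v"))
	s.close()
	fmt.Println(errors.Is(s.send([]byte("k2"), []byte("v2")), errClosed))
}
```

Waiting on the done channel in close means that messages queued before the call are still written before it returns, which is one reasonable choice among several.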
type KafWriter ¶
type KafWriter interface {
	Close() error
	WriteMessages(ctx context.Context, msgs ...kafka.Message) error
}
KafWriter abstracts the real Kafka writer so that tests can substitute a mock.
type KafWriterFunc ¶
type KafWriterFunc func(config *kafka.WriterConfig) KafWriter