kafka

package
v0.0.0-...-bfe46a2
Warning

This package is not in the latest version of its module.

Published: Dec 17, 2024 License: MPL-2.0 Imports: 18 Imported by: 4

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func Consume

func Consume(ctx context.Context, reader Consumer, handler Handler, errorHandler ErrorHandler) error

Consume implements the consumer loop.

func GenerateCodecs

func GenerateCodecs(codecs map[string]string) (map[string]*goavro.Codec, error)

GenerateCodecs creates a map from codec name to Avro codec.
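As a rough illustration of the name-to-codec mapping, the sketch below compiles each schema string and keys the result by the same name, failing on the first schema that does not compile. It is not this package's implementation. `buildCodecs`, `jsonSchema`, and `parseJSONSchema` are hypothetical, and the stand-in constructor only checks that the schema is valid JSON so the example needs no external module. With goavro, the constructor passed in would presumably be `goavro.NewCodec`.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// buildCodecs (hypothetical) compiles every schema with newCodec and returns
// the results keyed by the same names. It stops at the first failing schema.
func buildCodecs[C any](schemas map[string]string, newCodec func(string) (C, error)) (map[string]C, error) {
	out := make(map[string]C, len(schemas))
	for name, schema := range schemas {
		codec, err := newCodec(schema)
		if err != nil {
			return nil, fmt.Errorf("codec %q: %w", name, err)
		}
		out[name] = codec
	}
	return out, nil
}

// jsonSchema is a stand-in "codec" that only records the raw schema text.
type jsonSchema struct{ raw string }

// parseJSONSchema is a stand-in constructor: it accepts any valid JSON.
func parseJSONSchema(s string) (*jsonSchema, error) {
	if !json.Valid([]byte(s)) {
		return nil, fmt.Errorf("schema is not valid JSON")
	}
	return &jsonSchema{raw: s}, nil
}

func main() {
	codecs, err := buildCodecs(map[string]string{
		"user": `{"type":"record","name":"user","fields":[{"name":"id","type":"string"}]}`,
	}, parseJSONSchema)
	fmt.Println(len(codecs), err)
}
```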

func InitKafkaWriter

func InitKafkaWriter(ctx context.Context, topic string) (*kafka.Writer, *kafka.Dialer, error)

InitKafkaWriter creates a Kafka writer for the given topic.

func InstrumentKafka

func InstrumentKafka(ctx context.Context)

InstrumentKafka sets up instrumentation and metrics around the Kafka connection.

func TLSDialer

func TLSDialer() (*kafka.Dialer, *x509.Certificate, error)

TLSDialer creates a Kafka dialer over TLS. It requires the KAFKA_SSL_CERTIFICATE_LOCATION and KAFKA_SSL_KEY_LOCATION environment variables to be set.

Types

type Consumer

type Consumer interface {
	ReadMessage(ctx context.Context) (kafka.Message, error)
	FetchMessage(ctx context.Context) (kafka.Message, error)
	CommitMessages(ctx context.Context, messages ...kafka.Message) error
	Close() error
}

Consumer defines methods for consuming Kafka messages.

type ErrorHandler

type ErrorHandler interface {
	Handle(ctx context.Context, message kafkago.Message, errorMessage error) error
}

ErrorHandler defines an error handler.

type Handler

type Handler interface {
	Handle(ctx context.Context, message kafkago.Message) error
}

Handler defines a handler.

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader is an implementation of the kafka.Consumer interface.

func NewKafkaReader

func NewKafkaReader(ctx context.Context, groupID string, topic string) (*Reader, error)

NewKafkaReader creates a new Kafka reader for groupID and topic.

func (*Reader) Close

func (k *Reader) Close() error

func (*Reader) CommitMessages

func (k *Reader) CommitMessages(ctx context.Context, messages ...kafka.Message) error

CommitMessages commits the list of messages passed as argument.

func (*Reader) FetchMessage

func (k *Reader) FetchMessage(ctx context.Context) (kafka.Message, error)

FetchMessage reads and returns the next message. FetchMessage does not commit offsets automatically when using consumer groups.

func (*Reader) ReadMessage

func (k *Reader) ReadMessage(ctx context.Context) (kafka.Message, error)

ReadMessage reads Kafka messages.

Directories

Path Synopsis
Package mockdialer is a generated GoMock package.
