Documentation ¶
Index ¶
- func Consume(ctx context.Context, reader Consumer, handler Handler, ...) error
- func GenerateCodecs(codecs map[string]string) (map[string]*goavro.Codec, error)
- func InitKafkaWriter(ctx context.Context, topic string) (*kafka.Writer, *kafka.Dialer, error)
- func InstrumentKafka(ctx context.Context)
- func TLSDialer() (*kafka.Dialer, *x509.Certificate, error)
- type Consumer
- type ErrorHandler
- type Handler
- type Reader
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Consume ¶
func Consume(ctx context.Context, reader Consumer, handler Handler, errorHandler ErrorHandler) error
Consume implements the consumer loop.
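The documentation does not describe the loop's internals, so the following is only a minimal sketch of one plausible shape: fetch a message, pass it to the handler, route failures to the error handler, then commit. `Message` is a local stand-in for `kafka.Message`, the `Handler` signature is hypothetical (its definition is not shown on this page), and `consumeSketch`, `fakeConsumer`, and `runDemo` are illustrative names that are not part of the package.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
)

// Message is a local stand-in for kafka.Message so the sketch builds
// without the Kafka client library.
type Message struct {
	Offset int64
	Value  []byte
}

// Consumer mirrors only the two documented methods this sketch uses.
type Consumer interface {
	FetchMessage(ctx context.Context) (Message, error)
	CommitMessages(ctx context.Context, messages ...Message) error
}

// Handler is hypothetical: its real definition is not shown in the docs.
type Handler interface {
	Handle(ctx context.Context, message Message) error
}

// ErrorHandler follows the documented shape, using the stand-in Message.
type ErrorHandler interface {
	Handle(ctx context.Context, message Message, errorMessage error) error
}

// consumeSketch fetches, handles, and commits messages until the reader
// reports io.EOF or an unrecoverable error occurs.
func consumeSketch(ctx context.Context, reader Consumer, handler Handler, errorHandler ErrorHandler) error {
	for {
		msg, err := reader.FetchMessage(ctx)
		if errors.Is(err, io.EOF) {
			return nil // assumption: EOF means the reader is exhausted or closed
		}
		if err != nil {
			return err
		}
		if err := handler.Handle(ctx, msg); err != nil {
			// Assumption: a non-nil result from the error handler stops the loop.
			if herr := errorHandler.Handle(ctx, msg, err); herr != nil {
				return herr
			}
		}
		if err := reader.CommitMessages(ctx, msg); err != nil {
			return err
		}
	}
}

// fakeConsumer is an in-memory Consumer used to exercise the loop.
type fakeConsumer struct {
	pending   []Message
	committed []int64
}

func (f *fakeConsumer) FetchMessage(ctx context.Context) (Message, error) {
	if err := ctx.Err(); err != nil {
		return Message{}, err
	}
	if len(f.pending) == 0 {
		return Message{}, io.EOF
	}
	m := f.pending[0]
	f.pending = f.pending[1:]
	return m, nil
}

func (f *fakeConsumer) CommitMessages(ctx context.Context, messages ...Message) error {
	for _, m := range messages {
		f.committed = append(f.committed, m.Offset)
	}
	return nil
}

// rejectEmpty fails on empty payloads and counts the rest.
type rejectEmpty struct{ handled int }

func (h *rejectEmpty) Handle(ctx context.Context, m Message) error {
	if len(m.Value) == 0 {
		return errors.New("empty payload")
	}
	h.handled++
	return nil
}

// countErrors records failures and lets the loop continue.
type countErrors struct{ seen int }

func (e *countErrors) Handle(ctx context.Context, m Message, errorMessage error) error {
	e.seen++
	return nil
}

// runDemo feeds three messages (one of them empty) through the loop.
func runDemo() (handled, failed, committed int, err error) {
	reader := &fakeConsumer{pending: []Message{
		{Offset: 0, Value: []byte("a")},
		{Offset: 1},
		{Offset: 2, Value: []byte("c")},
	}}
	h, e := &rejectEmpty{}, &countErrors{}
	err = consumeSketch(context.Background(), reader, h, e)
	return h.handled, e.seen, len(reader.committed), err
}

func main() {
	handled, failed, committed, err := runDemo()
	fmt.Println(handled, failed, committed, err)
}
```

Committing only after the handler (or error handler) has run gives at-least-once processing in this sketch; whether the real Consume commits at the same point is not stated here.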
func GenerateCodecs ¶
func GenerateCodecs(codecs map[string]string) (map[string]*goavro.Codec, error)
GenerateCodecs creates a map from codec name to the corresponding Avro codec.
func InitKafkaWriter ¶
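func InitKafkaWriter(ctx context.Context, topic string) (*kafka.Writer, *kafka.Dialer, error)
InitKafkaWriter creates a Kafka writer for the given topic.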
func InstrumentKafka ¶
func InstrumentKafka(ctx context.Context)
InstrumentKafka sets up instrumentation and metrics around the Kafka connection.
func TLSDialer ¶
func TLSDialer() (*kafka.Dialer, *x509.Certificate, error)
TLSDialer creates a Kafka dialer that connects over TLS. It requires the KAFKA_SSL_CERTIFICATE_LOCATION and KAFKA_SSL_KEY_LOCATION environment variables to be set.
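As a rough illustration of the environment-variable requirement, the sketch below loads a key pair from the two documented variables using only the standard library. It is not the package's implementation: `tlsConfigFromEnv` and its injected `getenv` parameter are hypothetical, and the sketch assumes both variables hold paths to PEM files. In kafka-go, a `*tls.Config` like the one returned here would typically be assigned to the `TLS` field of a `kafka.Dialer`.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
	"os"
)

// tlsConfigFromEnv builds a TLS client config from the certificate and key
// files named by the two documented environment variables. getenv is
// injected (normally os.Getenv) so the lookup can be exercised in tests.
func tlsConfigFromEnv(getenv func(string) string) (*tls.Config, *x509.Certificate, error) {
	certPath := getenv("KAFKA_SSL_CERTIFICATE_LOCATION")
	keyPath := getenv("KAFKA_SSL_KEY_LOCATION")
	if certPath == "" || keyPath == "" {
		return nil, nil, errors.New("KAFKA_SSL_CERTIFICATE_LOCATION and KAFKA_SSL_KEY_LOCATION must be set")
	}

	// Assumption: both files are PEM encoded.
	pair, err := tls.LoadX509KeyPair(certPath, keyPath)
	if err != nil {
		return nil, nil, fmt.Errorf("load key pair: %w", err)
	}

	// The first certificate in the chain is the leaf.
	leaf, err := x509.ParseCertificate(pair.Certificate[0])
	if err != nil {
		return nil, nil, fmt.Errorf("parse certificate: %w", err)
	}

	cfg := &tls.Config{
		Certificates: []tls.Certificate{pair},
		MinVersion:   tls.VersionTLS12,
	}
	return cfg, leaf, nil
}

func main() {
	cfg, leaf, err := tlsConfigFromEnv(os.Getenv)
	if err != nil {
		fmt.Println("TLS setup failed:", err)
		return
	}
	fmt.Println("loaded certificate for", leaf.Subject.CommonName,
		"with", len(cfg.Certificates), "key pair(s)")
}
```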
Types ¶
type Consumer ¶
type Consumer interface {
ReadMessage(ctx context.Context) (kafka.Message, error)
FetchMessage(ctx context.Context) (kafka.Message, error)
CommitMessages(ctx context.Context, messages ...kafka.Message) error
Close() error
}
Consumer defines methods for consuming kafka messages.
type ErrorHandler ¶
type ErrorHandler interface {
Handle(ctx context.Context, message kafkago.Message, errorMessage error) error
}
ErrorHandler defines an error handler.
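A minimal sketch of what an ErrorHandler implementation might look like: it logs each failure and tolerates a fixed number of them before returning an error. `Message` is a local stand-in for the Kafka message type, `budgetErrorHandler` and `demoBudget` are hypothetical names, and the sketch assumes that a non-nil return value tells the caller to stop, which this page does not confirm.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Message is a local stand-in for the Kafka message type.
type Message struct {
	Topic  string
	Offset int64
}

// ErrorHandler follows the documented shape, using the stand-in Message.
type ErrorHandler interface {
	Handle(ctx context.Context, message Message, errorMessage error) error
}

// budgetErrorHandler tolerates up to limit failures and then starts
// returning an error that wraps the most recent cause.
type budgetErrorHandler struct {
	limit int
	seen  int
}

func (h *budgetErrorHandler) Handle(ctx context.Context, message Message, errorMessage error) error {
	h.seen++
	fmt.Printf("handler error at %s@%d: %v\n", message.Topic, message.Offset, errorMessage)
	if h.seen > h.limit {
		return fmt.Errorf("error budget of %d exceeded: %w", h.limit, errorMessage)
	}
	return nil
}

// Compile-time check that the sketch satisfies the interface.
var _ ErrorHandler = (*budgetErrorHandler)(nil)

// demoBudget reports the results of two consecutive failures with a budget of one.
func demoBudget() (first, second error) {
	h := &budgetErrorHandler{limit: 1}
	ctx := context.Background()
	cause := errors.New("decode failed")
	first = h.Handle(ctx, Message{Topic: "events", Offset: 7}, cause)
	second = h.Handle(ctx, Message{Topic: "events", Offset: 8}, cause)
	return first, second
}

func main() {
	first, second := demoBudget()
	fmt.Println("first:", first)
	fmt.Println("second:", second)
}
```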
type Reader ¶
type Reader struct {
// contains filtered or unexported fields
}
Reader is an implementation of the kafka.Consumer interface.
func NewKafkaReader ¶
NewKafkaReader creates a new kafka reader for groupID and topic.
func (*Reader) CommitMessages ¶
CommitMessages commits the list of messages passed as argument.
func (*Reader) FetchMessage ¶
FetchMessage reads and returns the next message. FetchMessage does not commit offsets automatically when using consumer groups.