Versions in this module
Expand all Collapse all

v0

v0.1.1 Nov 12, 2018
Changes in this version
+ const UUIDHeaderKey
+ var ErrClosingConsumer = errors.New("closing subscriber")
+ var ErrNackReceived = errors.New("closing subscriber")
+ func DefaultConfluentConsumerConstructor(config SubscriberConfig) (*kafka.Consumer, error)
+ func NewConfluentSubscriber(config SubscriberConfig, unmarshaler Unmarshaler, ...) (message.Subscriber, error)
+ func NewCustomConfluentSubscriber(config SubscriberConfig, unmarshaler Unmarshaler, ...) (message.Subscriber, error)
+ func NewCustomPublisher(producer *kafka.Producer, marshaler Marshaler) (message.Publisher, error)
+ func NewPublisher(brokers []string, marshaler Marshaler, kafkaConfigOverwrite kafka.ConfigMap) (message.Publisher, error)
+ type ConfluentConsumerConstructor func(config SubscriberConfig) (*kafka.Consumer, error)
+ type DefaultMarshaler struct
    + func (DefaultMarshaler) Marshal(topic string, msg *message.Message) (*confluentKafka.Message, error)
    + func (DefaultMarshaler) Unmarshal(kafkaMsg *confluentKafka.Message) (*message.Message, error)
+ type GeneratePartitionKey func(topic string, msg *message.Message) (string, error)
+ type Marshaler interface
    + Marshal func(topic string, msg *message.Message) (*confluentKafka.Message, error)
+ type MarshalerUnmarshaler interface
    + func NewWithPartitioningMarshaler(generatePartitionKey GeneratePartitionKey) MarshalerUnmarshaler
+ type SubscriberConfig struct
    + AutoOffsetReset string
    + Brokers []string
    + ConsumerGroup string
    + ConsumersCount int
    + KafkaConfigOverwrite kafka.ConfigMap
    + NoConsumerGroup bool
    + func (c SubscriberConfig) Validate() error
+ type Unmarshaler interface
    + Unmarshal func(*confluentKafka.Message) (*message.Message, error)

v0.1.0 Nov 12, 2018