Documentation
¶
Index ¶
- Constants
- Variables
- func DefaultConfluentConsumerConstructor(config SubscriberConfig) (*kafka.Consumer, error)
- func NewConfluentSubscriber(config SubscriberConfig, unmarshaler Unmarshaler, ...) (message.Subscriber, error)
- func NewCustomConfluentSubscriber(config SubscriberConfig, unmarshaler Unmarshaler, ...) (message.Subscriber, error)
- func NewCustomPublisher(producer *kafka.Producer, marshaler Marshaler) (message.Publisher, error)
- func NewPublisher(brokers []string, marshaler Marshaler, kafkaConfigOverwrite kafka.ConfigMap) (message.Publisher, error)
- type ConfluentConsumerConstructor
- type DefaultMarshaler
- type GeneratePartitionKey
- type Marshaler
- type MarshalerUnmarshaler
- type SubscriberConfig
- type Unmarshaler
Constants ¶
View Source
const UUIDHeaderKey = "_watermill_message_uuid"
Variables ¶
View Source
var (
	ErrClosingConsumer = errors.New("closing subscriber")
	ErrNackReceived    = errors.New("closing subscriber")
)
Functions ¶
func DefaultConfluentConsumerConstructor ¶
func DefaultConfluentConsumerConstructor(config SubscriberConfig) (*kafka.Consumer, error)
func NewConfluentSubscriber ¶
func NewConfluentSubscriber(
	config SubscriberConfig,
	unmarshaler Unmarshaler,
	logger watermill.LoggerAdapter,
) (message.Subscriber, error)
func NewCustomConfluentSubscriber ¶
func NewCustomConfluentSubscriber(
	config SubscriberConfig,
	unmarshaler Unmarshaler,
	consumerConstructor ConfluentConsumerConstructor,
	logger watermill.LoggerAdapter,
) (message.Subscriber, error)
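func NewCustomPublisher ¶
func NewCustomPublisher(producer *kafka.Producer, marshaler Marshaler) (message.Publisher, error)
func NewPublisher ¶
func NewPublisher(brokers []string, marshaler Marshaler, kafkaConfigOverwrite kafka.ConfigMap) (message.Publisher, error)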
Types ¶
type ConfluentConsumerConstructor ¶
type ConfluentConsumerConstructor func(config SubscriberConfig) (*kafka.Consumer, error)
type DefaultMarshaler ¶
type DefaultMarshaler struct{}
func (DefaultMarshaler) Marshal ¶
func (DefaultMarshaler) Marshal(topic string, msg *message.Message) (*confluentKafka.Message, error)
func (DefaultMarshaler) Unmarshal ¶
func (DefaultMarshaler) Unmarshal(kafkaMsg *confluentKafka.Message) (*message.Message, error)
type GeneratePartitionKey ¶
type MarshalerUnmarshaler ¶
type MarshalerUnmarshaler interface {
	Marshaler
	Unmarshaler
}
func NewWithPartitioningMarshaler ¶
func NewWithPartitioningMarshaler(generatePartitionKey GeneratePartitionKey) MarshalerUnmarshaler
type SubscriberConfig ¶
type SubscriberConfig struct {
	Brokers              []string
	ConsumerGroup        string
	NoConsumerGroup      bool
	AutoOffsetReset      string
	ConsumersCount       int
	KafkaConfigOverwrite kafka.ConfigMap
}
func (SubscriberConfig) Validate ¶
func (c SubscriberConfig) Validate() error
type Unmarshaler ¶
type Unmarshaler interface {
	Unmarshal(*confluentKafka.Message) (*message.Message, error)
}
Click to show internal directories.
Click to hide internal directories.