Documentation ¶
Index ¶
- Constants
- func DefaultSaramaSubscriberConfig() *sarama.Config
- func DefaultSaramaSyncPublisherConfig() *sarama.Config
- func MessagePartitionFromCtx(ctx context.Context) (int32, bool)
- func MessagePartitionOffsetFromCtx(ctx context.Context) (int64, bool)
- func MessageTimestampFromCtx(ctx context.Context) (time.Time, bool)
- type DefaultMarshaler
- type GeneratePartitionKey
- type Marshaler
- type MarshalerUnmarshaler
- type PartitionOffset
- type Publisher
- type PublisherConfig
- type Subscriber
- type SubscriberConfig
- type Unmarshaler
Constants ¶
const NoSleep time.Duration = -1
NoSleep can be set to SubscriberConfig.NackResendSleep and SubscriberConfig.ReconnectRetrySleep.
const UUIDHeaderKey = "_watermill_message_uuid"
Variables ¶
This section is empty.
Functions ¶
func DefaultSaramaSubscriberConfig ¶
DefaultSaramaSubscriberConfig creates default Sarama config used by Watermill.
Custom config can be passed to NewSubscriber and NewPublisher.
saramaConfig := DefaultSaramaSubscriberConfig() saramaConfig.Consumer.Offsets.Initial = sarama.OffsetOldest subscriberConfig.OverwriteSaramaConfig = saramaConfig subscriber, err := NewSubscriber(subscriberConfig, logger) // ...
func MessagePartitionFromCtx ¶
MessagePartitionFromCtx returns Kafka partition of the consumed message
func MessagePartitionOffsetFromCtx ¶
MessagePartitionOffsetFromCtx returns Kafka partition offset of the consumed message
Types ¶
type DefaultMarshaler ¶
type DefaultMarshaler struct{}
func (DefaultMarshaler) Marshal ¶
func (DefaultMarshaler) Marshal(topic string, msg *message.Message) (*sarama.ProducerMessage, error)
func (DefaultMarshaler) Unmarshal ¶
func (DefaultMarshaler) Unmarshal(kafkaMsg *sarama.ConsumerMessage) (*message.Message, error)
type GeneratePartitionKey ¶
type Marshaler ¶
type Marshaler interface {
Marshal(topic string, msg *message.Message) (*sarama.ProducerMessage, error)
}
Marshaler marshals Watermill's message to Kafka message.
type MarshalerUnmarshaler ¶
type MarshalerUnmarshaler interface { Marshaler Unmarshaler }
func NewWithPartitioningMarshaler ¶
func NewWithPartitioningMarshaler(generatePartitionKey GeneratePartitionKey) MarshalerUnmarshaler
type PartitionOffset ¶
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
func NewPublisher ¶
func NewPublisher( config PublisherConfig, logger watermill.LoggerAdapter, ) (*Publisher, error)
NewPublisher creates a new Kafka Publisher.
type PublisherConfig ¶
type PublisherConfig struct { // Kafka brokers list. Brokers []string // Marshaler is used to marshal messages from Watermill format into Kafka format. Marshaler Marshaler // OverwriteSaramaConfig holds additional sarama settings. OverwriteSaramaConfig *sarama.Config // If true then each sent message will be wrapped with Opentelemetry tracing, provided by otelsarama. TracingEnabled bool }
func (PublisherConfig) Validate ¶
func (c PublisherConfig) Validate() error
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
func NewSubscriber ¶
func NewSubscriber( config SubscriberConfig, logger watermill.LoggerAdapter, ) (*Subscriber, error)
NewSubscriber creates a new Kafka Subscriber.
func (*Subscriber) Close ¶
func (s *Subscriber) Close() error
func (*Subscriber) PartitionOffset ¶
func (s *Subscriber) PartitionOffset(topic string) (PartitionOffset, error)
func (*Subscriber) Subscribe ¶
Subscribe subscribers for messages in Kafka.
There are multiple subscribers spawned
func (*Subscriber) SubscribeInitialize ¶
func (s *Subscriber) SubscribeInitialize(topic string) (err error)
type SubscriberConfig ¶
type SubscriberConfig struct { // Kafka brokers list. Brokers []string // Unmarshaler is used to unmarshal messages from Kafka format into Watermill format. Unmarshaler Unmarshaler // OverwriteSaramaConfig holds additional sarama settings. OverwriteSaramaConfig *sarama.Config // Kafka consumer group. // When empty, all messages from all partitions will be returned. ConsumerGroup string // How long after Nack message should be redelivered. NackResendSleep time.Duration // How long about unsuccessful reconnecting next reconnect will occur. ReconnectRetrySleep time.Duration InitializeTopicDetails *sarama.TopicDetail // If true then each consumed message will be wrapped with Opentelemetry tracing, provided by otelsarama. TracingEnabled bool }
func (SubscriberConfig) Validate ¶
func (c SubscriberConfig) Validate() error
type Unmarshaler ¶
type Unmarshaler interface {
Unmarshal(*sarama.ConsumerMessage) (*message.Message, error)
}
Unmarshaler unmarshals Kafka's message to Watermill's message.