Documentation ¶
Index ¶
- Constants
- func DefaultSaramaSubscriberConfig() *sarama.Config
- func DefaultSaramaSyncPublisherConfig() *sarama.Config
- func NewPublisher(brokers []string, marshaler Marshaler, overwriteSaramaConfig *sarama.Config, ...) (message.Publisher, error)
- func NewSubscriber(config SubscriberConfig, overwriteSaramaConfig *sarama.Config, ...) (message.Subscriber, error)
- type DefaultMarshaler
- type GeneratePartitionKey
- type Marshaler
- type MarshalerUnmarshaler
- type Publisher
- type Subscriber
- type SubscriberConfig
- type Unmarshaler
Constants ¶
const NoSleep time.Duration = -1
NoSleep can be assigned to SubscriberConfig.NackResendSleep and SubscriberConfig.ReconnectRetrySleep to disable the corresponding sleep.
const UUIDHeaderKey = "_watermill_message_uuid"
Variables ¶
This section is empty.
Functions ¶
func DefaultSaramaSubscriberConfig ¶ added in v0.2.0
DefaultSaramaSubscriberConfig creates the default Sarama config used by Watermill.
Custom config can be passed to NewSubscriber and NewPublisher.
saramaConfig := DefaultSaramaSubscriberConfig()
saramaConfig.Consumer.Offsets.Initial = sarama.OffsetOldest

subscriber, err := NewSubscriber(watermillConfig, saramaConfig, unmarshaler, logger)
// ...
func DefaultSaramaSyncPublisherConfig ¶ added in v0.2.0
func NewPublisher ¶
func NewPublisher(
	brokers []string,
	marshaler Marshaler,
	overwriteSaramaConfig *sarama.Config,
	logger watermill.LoggerAdapter,
) (message.Publisher, error)
NewPublisher creates a new Kafka Publisher.
func NewSubscriber ¶ added in v0.2.0
func NewSubscriber(
	config SubscriberConfig,
	overwriteSaramaConfig *sarama.Config,
	unmarshaler Unmarshaler,
	logger watermill.LoggerAdapter,
) (message.Subscriber, error)
NewSubscriber creates a new Kafka Subscriber.
Types ¶
type DefaultMarshaler ¶
type DefaultMarshaler struct{}
func (DefaultMarshaler) Marshal ¶
func (DefaultMarshaler) Marshal(topic string, msg *message.Message) (*sarama.ProducerMessage, error)
func (DefaultMarshaler) Unmarshal ¶
func (DefaultMarshaler) Unmarshal(kafkaMsg *sarama.ConsumerMessage) (*message.Message, error)
type GeneratePartitionKey ¶
type Marshaler ¶
type Marshaler interface {
Marshal(topic string, msg *message.Message) (*sarama.ProducerMessage, error)
}
Marshaler marshals Watermill's message to Kafka message.
type MarshalerUnmarshaler ¶
type MarshalerUnmarshaler interface {
	Marshaler
	Unmarshaler
}
func NewWithPartitioningMarshaler ¶
func NewWithPartitioningMarshaler(generatePartitionKey GeneratePartitionKey) MarshalerUnmarshaler
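The definition of GeneratePartitionKey is not shown on this page, so the following is only a sketch of the general idea behind a partitioning marshaler: a callback derives a key from each message, and messages with the same key end up on the same partition. The types orderEvent and generateKey, the "customer_id" metadata entry, and the FNV-based partitionFor helper are all hypothetical; the real callback signature and the partitioner used by the client may differ.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// orderEvent is a hypothetical stand-in for a message that carries metadata.
type orderEvent struct {
	Metadata map[string]string
}

// generateKey is an assumed shape for a partition key callback.
type generateKey func(topic string, msg orderEvent) (string, error)

// byCustomer uses the "customer_id" metadata entry as the partition key.
func byCustomer(topic string, msg orderEvent) (string, error) {
	id, ok := msg.Metadata["customer_id"]
	if !ok {
		return "", fmt.Errorf("topic %s: message has no customer_id", topic)
	}
	return id, nil
}

// partitionFor maps a key to a partition by hashing it.
// partitions must be greater than zero. This only illustrates that
// equal keys map to the same partition; it is not the client's algorithm.
func partitionFor(key string, partitions uint32) uint32 {
	h := fnv.New32a()
	h.Write([]byte(key))
	return h.Sum32() % partitions
}

func main() {
	var keyFn generateKey = byCustomer
	for _, id := range []string{"alice", "bob", "alice"} {
		msg := orderEvent{Metadata: map[string]string{"customer_id": id}}
		key, err := keyFn("orders", msg)
		if err != nil {
			panic(err)
		}
		fmt.Printf("key=%s partition=%d\n", key, partitionFor(key, 6))
	}
}
```

Keying by an entity identifier is a common way to keep all events for that entity in order, since ordering in Kafka is only guaranteed within a single partition.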
type Publisher ¶ added in v0.2.0
type Publisher struct {
// contains filtered or unexported fields
}
type Subscriber ¶ added in v0.2.0
type Subscriber struct {
// contains filtered or unexported fields
}
func (*Subscriber) Close ¶ added in v0.2.0
func (s *Subscriber) Close() error
func (*Subscriber) Subscribe ¶ added in v0.2.0
Subscribe subscribes to messages in Kafka.
There are multiple subscribers spawned.
func (*Subscriber) SubscribeInitialize ¶ added in v0.3.0
func (s *Subscriber) SubscribeInitialize(topic string) (err error)
type SubscriberConfig ¶
type SubscriberConfig struct {
	// Kafka brokers list.
	Brokers []string

	// Kafka consumer group.
	// When empty, all messages from all partitions will be returned.
	ConsumerGroup string

	// How long to wait after a Nack before the message is redelivered.
	NackResendSleep time.Duration

	// How long to wait after an unsuccessful reconnect before the next attempt.
	ReconnectRetrySleep time.Duration

	InitializeTopicDetails *sarama.TopicDetail
}
func (SubscriberConfig) Validate ¶
func (c SubscriberConfig) Validate() error
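The page does not say which rules Validate enforces. The sketch below shows one plausible check on a trimmed, hypothetical copy of the config: it rejects an empty broker list and accepts an empty consumer group, since the field comment describes that as a valid setting. The subscriberConfig type, the validate method, and the error text are assumptions.

```go
package main

import (
	"errors"
	"fmt"
)

// subscriberConfig is a trimmed, hypothetical stand-in for SubscriberConfig.
type subscriberConfig struct {
	Brokers       []string
	ConsumerGroup string
}

// validate rejects a config without brokers (an assumed rule).
// An empty ConsumerGroup is allowed.
func (c subscriberConfig) validate() error {
	if len(c.Brokers) == 0 {
		return errors.New("missing brokers")
	}
	return nil
}

func main() {
	configs := []subscriberConfig{
		{},
		{Brokers: []string{"localhost:9092"}},
	}
	for _, c := range configs {
		fmt.Printf("brokers=%v err=%v\n", c.Brokers, c.validate())
	}
}
```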
type Unmarshaler ¶
type Unmarshaler interface {
Unmarshal(*sarama.ConsumerMessage) (*message.Message, error)
}
Unmarshaler unmarshals Kafka's message to Watermill's message.
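To make the Marshaler and Unmarshaler roles concrete, the following standalone sketch round-trips a message through a Kafka-like record, carrying the UUID in a header named after UUIDHeaderKey. The types appMessage, header, and kafkaRecord are simplified, hypothetical stand-ins for message.Message, sarama.ProducerMessage, and sarama.ConsumerMessage, and the header layout is an assumption rather than a description of DefaultMarshaler.

```go
package main

import (
	"errors"
	"fmt"
)

// uuidHeaderKey mirrors the documented UUIDHeaderKey constant.
const uuidHeaderKey = "_watermill_message_uuid"

// appMessage is a hypothetical stand-in for Watermill's message type.
type appMessage struct {
	UUID     string
	Metadata map[string]string
	Payload  []byte
}

// header and kafkaRecord are hypothetical stand-ins for Sarama's record types.
type header struct {
	Key, Value []byte
}

type kafkaRecord struct {
	Topic   string
	Headers []header
	Value   []byte
}

// marshal stores the UUID and metadata as headers and the payload as the value.
func marshal(topic string, msg *appMessage) (*kafkaRecord, error) {
	if _, ok := msg.Metadata[uuidHeaderKey]; ok {
		return nil, fmt.Errorf("metadata key %q is reserved", uuidHeaderKey)
	}
	headers := []header{{Key: []byte(uuidHeaderKey), Value: []byte(msg.UUID)}}
	for k, v := range msg.Metadata {
		headers = append(headers, header{Key: []byte(k), Value: []byte(v)})
	}
	return &kafkaRecord{Topic: topic, Headers: headers, Value: msg.Payload}, nil
}

// unmarshal reverses marshal: the UUID header becomes the message UUID and
// every other header becomes a metadata entry.
func unmarshal(rec *kafkaRecord) (*appMessage, error) {
	msg := &appMessage{Metadata: map[string]string{}, Payload: rec.Value}
	found := false
	for _, h := range rec.Headers {
		if string(h.Key) == uuidHeaderKey {
			msg.UUID = string(h.Value)
			found = true
			continue
		}
		msg.Metadata[string(h.Key)] = string(h.Value)
	}
	if !found {
		return nil, errors.New("record has no UUID header")
	}
	return msg, nil
}

func main() {
	in := &appMessage{
		UUID:     "123",
		Metadata: map[string]string{"trace": "abc"},
		Payload:  []byte("hello"),
	}
	rec, err := marshal("orders", in)
	if err != nil {
		panic(err)
	}
	out, err := unmarshal(rec)
	if err != nil {
		panic(err)
	}
	fmt.Println(out.UUID, out.Metadata["trace"], string(out.Payload))
}
```

Keeping the two directions behind separate interfaces lets a publisher depend only on marshaling and a subscriber only on unmarshaling, while a single type can still implement both.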