kafka

package
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 8, 2019 License: MIT Imports: 9 Imported by: 8

Documentation

Index

Constants

View Source
const NoSleep time.Duration = -1

NoSleep can be set to SubscriberConfig.NackResendSleep and SubscriberConfig.ReconnectRetrySleep.

View Source
const UUIDHeaderKey = "_watermill_message_uuid"

Variables

This section is empty.

Functions

func DefaultSaramaSubscriberConfig

func DefaultSaramaSubscriberConfig() *sarama.Config

DefaultSaramaSubscriberConfig creates default Sarama config used by Watermill.

Custom config can be passed to NewSubscriber and NewPublisher.

saramaConfig := DefaultSaramaSubscriberConfig()
saramaConfig.Consumer.Offsets.Initial = sarama.OffsetOldest

subscriber, err := NewSubscriber(watermillConfig, saramaConfig, unmarshaler, logger)
// ...

func DefaultSaramaSyncPublisherConfig

func DefaultSaramaSyncPublisherConfig() *sarama.Config

func NewPublisher

func NewPublisher(
	brokers []string,
	marshaler Marshaler,
	overwriteSaramaConfig *sarama.Config,
	logger watermill.LoggerAdapter,
) (message.Publisher, error)

NewPublisher creates a new Kafka Publisher.

func NewSubscriber

func NewSubscriber(
	config SubscriberConfig,
	overwriteSaramaConfig *sarama.Config,
	unmarshaler Unmarshaler,
	logger watermill.LoggerAdapter,
) (message.Subscriber, error)

NewSubscriber creates a new Kafka Subscriber.

Types

type DefaultMarshaler

type DefaultMarshaler struct{}

func (DefaultMarshaler) Marshal

func (DefaultMarshaler) Unmarshal

func (DefaultMarshaler) Unmarshal(kafkaMsg *sarama.ConsumerMessage) (*message.Message, error)

type GeneratePartitionKey

type GeneratePartitionKey func(topic string, msg *message.Message) (string, error)

type Marshaler

type Marshaler interface {
	Marshal(topic string, msg *message.Message) (*sarama.ProducerMessage, error)
}

Marshaler marshals Watermill's message to Kafka message.

type MarshalerUnmarshaler

type MarshalerUnmarshaler interface {
	Marshaler
	Unmarshaler
}

func NewWithPartitioningMarshaler

func NewWithPartitioningMarshaler(generatePartitionKey GeneratePartitionKey) MarshalerUnmarshaler

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

func (*Publisher) Close

func (p *Publisher) Close() error

func (*Publisher) Publish

func (p *Publisher) Publish(topic string, msgs ...*message.Message) error

Publish publishes message to Kafka.

Publish is blocking and wait for ack from Kafka. When one of messages delivery fails - function is interrupted.

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

func (*Subscriber) Close

func (s *Subscriber) Close() error

func (*Subscriber) Subscribe

func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error)

Subscribe subscribers for messages in Kafka.

There are multiple subscribers spawned

func (*Subscriber) SubscribeInitialize

func (s *Subscriber) SubscribeInitialize(topic string) (err error)

type SubscriberConfig

type SubscriberConfig struct {
	// Kafka brokers list.
	Brokers []string

	// Kafka consumer group.
	// When empty, all messages from all partitions will be returned.
	ConsumerGroup string

	// How long after Nack message should be redelivered.
	NackResendSleep time.Duration

	// How long about unsuccessful reconnecting next reconnect will occur.
	ReconnectRetrySleep time.Duration

	InitializeTopicDetails *sarama.TopicDetail
}

func (SubscriberConfig) Validate

func (c SubscriberConfig) Validate() error

type Unmarshaler

type Unmarshaler interface {
	Unmarshal(*sarama.ConsumerMessage) (*message.Message, error)
}

Unmarshaler unmarshals Kafka's message to Watermill's message.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL