forwarder

package
v1.1.0
Published: Nov 8, 2023 License: Apache-2.0, MIT Imports: 7 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	// ForwarderTopic is a topic on which the forwarder will be listening to enveloped messages to forward.
	// Defaults to `forwarder_topic`.
	ForwarderTopic string

	// Middlewares are used to decorate forwarder's handler function.
	Middlewares []message.HandlerMiddleware

	// CloseTimeout determines how long the router should wait for handlers to finish when closing.
	CloseTimeout time.Duration

	// AckWhenCannotUnwrap enables acking of messages which cannot be unwrapped from an envelope.
	AckWhenCannotUnwrap bool

	// Router is a router used by the forwarder.
	// If not provided, a new router will be created.
	//
	// If router is provided, it's not necessary to call `Forwarder.Run()` if the router is started with `router.Run()`.
	Router *message.Router
}

func (*Config) Validate

func (c *Config) Validate() error

type Forwarder

type Forwarder struct {
	// contains filtered or unexported fields
}

Forwarder subscribes to the topic provided in the config and publishes each message it receives to the destination topic embedded in that message's envelope.

func NewForwarder

func NewForwarder(subscriberIn message.Subscriber, publisherOut message.Publisher,
	logger watermill.LoggerAdapter, config Config) (*Forwarder, error)

NewForwarder creates a forwarder which will subscribe to the topic provided in the config using the provided subscriber. It will publish messages received on this subscription to the destination topic embedded in the enveloped message using the provided publisher.

The provided subscriber and publisher can come from different Watermill Pub/Sub implementations, e.g. a MySQL subscriber and a Google Pub/Sub publisher.

Note: By default the forwarder nacks every message that was not sent using a decorated publisher, because such a message cannot be unwrapped from an envelope. You can change this behavior by passing a middleware which acks them instead, or by enabling `AckWhenCannotUnwrap` in the config.
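The unwrap-then-forward behavior described above can be modeled with the standard library alone. This is a minimal sketch, not the package's implementation: the `envelope` layout, the `outcome` type, `publishFunc`, and `forward` are all hypothetical names introduced for illustration, and the real envelope format is unexported.

```go
package main

import (
	"encoding/json"
	"errors"
)

// envelope is a hypothetical wire format for this sketch only; the real
// package keeps its envelope type unexported.
type envelope struct {
	DestinationTopic string `json:"destination_topic"`
	Payload          string `json:"payload"`
}

// outcome models what a handler reports back to the subscriber.
type outcome int

const (
	acked outcome = iota
	nacked
)

// publishFunc stands in for the outgoing publisher.
type publishFunc func(topic string, payload string) error

// forward unwraps one raw message and publishes its payload to the embedded
// destination topic. ackWhenCannotUnwrap mirrors the Config field of that name.
func forward(raw []byte, publish publishFunc, ackWhenCannotUnwrap bool) (outcome, error) {
	var env envelope
	err := json.Unmarshal(raw, &env)
	if err == nil && env.DestinationTopic == "" {
		err = errors.New("envelope has no destination topic")
	}
	if err != nil {
		if ackWhenCannotUnwrap {
			// Drop the message instead of asking for redelivery.
			return acked, nil
		}
		return nacked, err
	}
	if err := publish(env.DestinationTopic, env.Payload); err != nil {
		// Publishing failed, so the message should be redelivered.
		return nacked, err
	}
	return acked, nil
}
```

In this model, a message that is not a valid envelope is nacked unless `ackWhenCannotUnwrap` is set, which is the trade-off the note above describes: nacking keeps the message for redelivery, while acking discards it.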

func (*Forwarder) Close

func (f *Forwarder) Close() error

Close stops forwarder's handler.

func (*Forwarder) Run

func (f *Forwarder) Run(ctx context.Context) error

Run runs forwarder's handler responsible for forwarding messages. This call is blocking while the forwarder is running. ctx will be propagated to the forwarder's subscription.

To stop Run() you should call Close() on the forwarder.

func (*Forwarder) Running

func (f *Forwarder) Running() chan struct{}

Running returns a channel which is closed once the forwarder is running.
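The `Run` / `Running` / `Close` trio suggests a common calling pattern: start `Run` in a goroutine, wait on `Running()`, and call `Close()` to unblock `Run`. The sketch below illustrates that pattern with a hypothetical `worker` type that only mimics the method shapes; it is not the forwarder itself and does not subscribe to anything.

```go
package main

import (
	"context"
	"sync"
)

// worker is a hypothetical stand-in with the same method shapes as Forwarder.
type worker struct {
	running   chan struct{}
	closing   chan struct{}
	closeOnce sync.Once
}

func newWorker() *worker {
	return &worker{running: make(chan struct{}), closing: make(chan struct{})}
}

// Run blocks until Close is called. In this sketch ctx is accepted but unused;
// the real forwarder propagates it to its subscription. Run must be called
// only once here, because it closes the running channel.
func (w *worker) Run(ctx context.Context) error {
	close(w.running) // tell anyone waiting on Running() that work has started
	<-w.closing
	return nil
}

// Running returns a channel that is closed once Run has started.
func (w *worker) Running() chan struct{} { return w.running }

// Close unblocks Run. It is safe to call more than once.
func (w *worker) Close() error {
	w.closeOnce.Do(func() { close(w.closing) })
	return nil
}

// startAndStop shows the calling pattern from the caller's side.
func startAndStop() error {
	w := newWorker()
	done := make(chan error, 1)
	go func() { done <- w.Run(context.Background()) }()

	<-w.Running() // from here on it is safe to start publishing

	if err := w.Close(); err != nil {
		return err
	}
	return <-done // Run returns once Close has been called
}
```

Waiting on `Running()` before publishing avoids a race in which messages are sent before the handler has subscribed.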

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher decorates a `message.Publisher`: its `Publish` method wraps each sent message in an envelope carrying the destination topic and sends it to the forwarder topic provided in the config.
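The decorator idea can be illustrated without the library. The following is a minimal sketch under stated assumptions: the pared-down `publisher` interface, the JSON `envelope` layout, `envelopingPublisher`, and `recordingPublisher` are hypothetical and do not reflect the package's actual types or wire format.

```go
package main

import (
	"encoding/json"
)

// publisher is a pared-down stand-in for a Pub/Sub publisher interface.
type publisher interface {
	Publish(topic string, payloads ...string) error
}

// envelope is a hypothetical wire format used only in this sketch.
type envelope struct {
	DestinationTopic string `json:"destination_topic"`
	Payload          string `json:"payload"`
}

// envelopingPublisher redirects every message to forwarderTopic and records
// the originally requested topic inside the envelope.
type envelopingPublisher struct {
	wrapped        publisher
	forwarderTopic string
}

func (p envelopingPublisher) Publish(topic string, payloads ...string) error {
	enveloped := make([]string, 0, len(payloads))
	for _, payload := range payloads {
		raw, err := json.Marshal(envelope{DestinationTopic: topic, Payload: payload})
		if err != nil {
			return err
		}
		enveloped = append(enveloped, string(raw))
	}
	return p.wrapped.Publish(p.forwarderTopic, enveloped...)
}

// recordingPublisher keeps what it was asked to publish, for demonstration.
type recordingPublisher struct {
	topics   []string
	payloads []string
}

func (r *recordingPublisher) Publish(topic string, payloads ...string) error {
	for _, payload := range payloads {
		r.topics = append(r.topics, topic)
		r.payloads = append(r.payloads, payload)
	}
	return nil
}

// unwrap decodes an enveloped payload back into its parts.
func unwrap(raw string) (envelope, error) {
	var env envelope
	err := json.Unmarshal([]byte(raw), &env)
	return env, err
}
```

Callers keep publishing to their intended topic; only the decorator knows that the message first travels through the forwarder topic.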

func NewPublisher

func NewPublisher(publisher message.Publisher, config PublisherConfig) *Publisher

func (*Publisher) Close

func (p *Publisher) Close() error

func (*Publisher) Publish

func (p *Publisher) Publish(ctx context.Context, topic string, messages ...*message.Message) error

type PublisherConfig

type PublisherConfig struct {
	// ForwarderTopic is a topic which the forwarder is listening to. Publisher will send enveloped messages to this topic.
	// Defaults to `forwarder_topic`.
	ForwarderTopic string
}

func (*PublisherConfig) Validate

func (c *PublisherConfig) Validate() error
