requeuer

package
v1.4.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 2, 2024 License: MIT Imports: 7 Imported by: 1

Documentation

Index

Constants

View Source
const RetriesKey = "_watermill_requeuer_retries"

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	// Subscriber is the subscriber to consume messages from. Required.
	Subscriber message.Subscriber

	// SubscribeTopic is the topic related to the Subscriber to consume messages from. Required.
	SubscribeTopic string

	// Publisher is the publisher to publish requeued messages to. Required.
	Publisher message.Publisher

	// GeneratePublishTopic is the topic related to the Publisher to publish the requeued message to.
	// For example, it could be a constant, or taken from the message's metadata.
	// Required.
	GeneratePublishTopic func(params GeneratePublishTopicParams) (string, error)

	// Delay is the duration to wait before requeueing the message. Optional.
	// The default is no delay.
	//
	// This can be useful to avoid requeueing messages too quickly, for example, to avoid
	// requeueing a message that failed to process due to a temporary issue.
	//
	// Avoid setting this to a very high value, as it will block the message processing.
	Delay time.Duration

	// Router is the custom router to run the requeue handler on. Optional.
	Router *message.Router
}

Config is the configuration for the Requeuer.

type GeneratePublishTopicParams

type GeneratePublishTopicParams struct {
	Message *message.Message
}

GeneratePublishTopicParams are the parameters passed to the GeneratePublishTopic function.

type Requeuer

type Requeuer struct {
	// contains filtered or unexported fields
}

Requeuer is a component that moves messages from one topic to another. It can be used to requeue messages that failed to process.

func NewRequeuer

func NewRequeuer(
	config Config,
	logger watermill.LoggerAdapter,
) (*Requeuer, error)

NewRequeuer creates a new Requeuer with the provided Config. It's not started automatically. You need to call Run on the returned Requeuer.

func (*Requeuer) Run

func (r *Requeuer) Run(ctx context.Context) error

Run runs the Requeuer.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL