requestreply

package
v1.4.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 13, 2025 License: MIT Imports: 9 Imported by: 1

Documentation

Index

Constants

View Source
const (
	ErrorMetadataKey    = "_watermill_requestreply_error"
	HasErrorMetadataKey = "_watermill_requestreply_has_error"
)
View Source
const OperationIDMetadataKey = "_watermill_requestreply_op_id"

Variables

This section is empty.

Functions

func NewCommandHandler

func NewCommandHandler[Command any](
	handlerName string,
	backend Backend[struct{}],
	handleFunc func(ctx context.Context, cmd *Command) error,
) cqrs.CommandHandler

NewCommandHandler creates a new CommandHandler which supports the request-reply pattern. The result handler is handler compatible with cqrs.CommandHandler.

The logic if a command should be acked or not is based on the logic of the Backend. For example, for the PubSubBackend, it depends on the `PubSubBackendConfig.AckCommandErrors` option.

func NewCommandHandlerWithResult

func NewCommandHandlerWithResult[Command any, Result any](
	handlerName string,
	backend Backend[Result],
	handleFunc func(ctx context.Context, cmd *Command) (Result, error),
) cqrs.CommandHandler

NewCommandHandlerWithResult creates a new CommandHandler which supports the request-reply pattern with a result. The result handler is handler compatible with cqrs.CommandHandler.

In addition to cqrs.CommandHandler, it also allows returning a result from the handler. The result is passed to the Backend implementation and sent to the caller.

The logic if a command should be acked or not is based on the logic of the Backend. For example, for the PubSubBackend, it depends on the `PubSubBackendConfig.AckCommandErrors` option.

The reply is sent to the caller, even if the handler returns an error.

func SendWithReplies

func SendWithReplies[Result any](
	ctx context.Context,
	c CommandBus,
	backend Backend[Result],
	cmd any,
) (replCh <-chan Reply[Result], cancel func(), err error)

SendWithReplies sends command to the command bus and receives a replies of the command handler. It returns a channel with replies, cancel function and error.

SendWithReplies can be cancelled by calling cancel function or by cancelling context or When SendWithReplies is canceled, the returned channel is closed as well. by exceeding the timeout set in the backend (if set). Warning: It's important to cancel the function, because it's listening for the replies in the background. Lack of cancelling the function can lead to subscriber leak.

SendWithReplies can listen for handlers with results (NewCommandHandlerWithResult) and without results (NewCommandHandler). If you are listening for handlers without results, you should pass `NoResult` or `struct{}` as `Result` generic type:

 replyCh, cancel, err := requestreply.SendWithReplies[requestreply.NoResult](
		context.Background(),
		ts.CommandBus,
		ts.RequestReplyBackend,
		&TestCommand{ID: "1"},
	)

If `NewCommandHandlerWithResult` handler returns a specific type, you should pass it as `Result` generic type:

 replyCh, cancel, err := requestreply.SendWithReplies[SomeTypeReturnedByHandler](
		context.Background(),
		ts.CommandBus,
		ts.RequestReplyBackend,
		&TestCommand{ID: "1"},
	)

SendWithReplies will send the replies to the channel until the context is cancelled or the timeout is exceeded. They are multiple cases when more than one reply can be sent:

  • when the handler returns an error, and backend is configured to nack the message on error (for the PubSubBackend, it depends on `PubSubBackendConfig.AckCommandErrors` option.),
  • when you are using fan-out mechanism and commands are handled multiple times,

Types

type Backend

type Backend[Result any] interface {
	ListenForNotifications(ctx context.Context, params BackendListenForNotificationsParams) (<-chan Reply[Result], error)
	OnCommandProcessed(ctx context.Context, params BackendOnCommandProcessedParams[Result]) error
}

type BackendListenForNotificationsParams

type BackendListenForNotificationsParams struct {
	Command     any
	OperationID OperationID
}

type BackendOnCommandProcessedParams

type BackendOnCommandProcessedParams[Result any] struct {
	Command        any
	CommandMessage *message.Message

	HandlerResult Result
	HandleErr     error
}

type BackendPubsubJSONMarshaler

type BackendPubsubJSONMarshaler[Result any] struct{}

func (BackendPubsubJSONMarshaler[Result]) MarshalReply

func (m BackendPubsubJSONMarshaler[Result]) MarshalReply(
	params BackendOnCommandProcessedParams[Result],
) (*message.Message, error)

func (BackendPubsubJSONMarshaler[Result]) UnmarshalReply

func (m BackendPubsubJSONMarshaler[Result]) UnmarshalReply(msg *message.Message) (Reply[Result], error)

type BackendPubsubMarshaler

type BackendPubsubMarshaler[Result any] interface {
	MarshalReply(params BackendOnCommandProcessedParams[Result]) (*message.Message, error)
	UnmarshalReply(msg *message.Message) (reply Reply[Result], err error)
}

type CommandBus

type CommandBus interface {
	SendWithModifiedMessage(ctx context.Context, cmd any, modify func(*message.Message) error) error
}

type CommandHandlerError

type CommandHandlerError struct {
	Err error
}

CommandHandlerError is returned when the command handler returns an error.

func (CommandHandlerError) Error

func (e CommandHandlerError) Error() string

func (CommandHandlerError) Unwrap

func (e CommandHandlerError) Unwrap() error

type NoResult

type NoResult = struct{}

NoResult is a result type for commands that don't have result.

type OperationID

type OperationID string

OperationID is a unique identifier of a command. It correlates commands with replies between the bus and the handler.

type PubSubBackend

type PubSubBackend[Result any] struct {
	// contains filtered or unexported fields
}

PubSubBackend is a Backend that uses Pub/Sub to transport commands and replies.

func NewPubSubBackend

func NewPubSubBackend[Result any](
	config PubSubBackendConfig,
	marshaler BackendPubsubMarshaler[Result],
) (*PubSubBackend[Result], error)

NewPubSubBackend creates a new PubSubBackend.

If you want to use backend together with `NewCommandHandler` (without result), you should pass `NoResult` or `struct{}` as Result type.

func (PubSubBackend[Result]) ListenForNotifications

func (p PubSubBackend[Result]) ListenForNotifications(
	ctx context.Context,
	params BackendListenForNotificationsParams,
) (<-chan Reply[Result], error)

func (PubSubBackend[Result]) OnCommandProcessed

func (p PubSubBackend[Result]) OnCommandProcessed(ctx context.Context, params BackendOnCommandProcessedParams[Result]) error

type PubSubBackendConfig

type PubSubBackendConfig struct {
	Publisher             message.Publisher
	SubscriberConstructor PubSubBackendSubscriberConstructorFn

	GeneratePublishTopic   PubSubBackendGeneratePublishTopicFn
	GenerateSubscribeTopic PubSubBackendGenerateSubscribeTopicFn

	Logger watermill.LoggerAdapter

	ListenForReplyTimeout *time.Duration

	ModifyNotificationMessage PubSubBackendModifyNotificationMessageFn

	OnListenForReplyFinished PubSubBackendOnListenForReplyFinishedFn

	// AckCommandErrors determines if the command should be acked or nacked when handler returns an error.
	// Command will be nacked by default when sending reply fails, you can control this behaviour with the
	// ReplyPublishErrorHandler config option.
	// You should use this option instead of cqrs.CommandProcessorConfig.AckCommandHandlingErrors, as it's aware
	// if error was returned by handler or sending reply failed.
	AckCommandErrors bool

	// ReplyPublishErrorHandler if not nil will be invoked when sending the reply fails. If it returns an error
	// the command will be nacked.
	ReplyPublishErrorHandler ReplyPublishErrorHandler
}

func (*PubSubBackendConfig) Validate

func (p *PubSubBackendConfig) Validate() error

type PubSubBackendGeneratePublishTopicFn

type PubSubBackendGeneratePublishTopicFn func(PubSubBackendPublishParams) (string, error)

type PubSubBackendGenerateSubscribeTopicFn

type PubSubBackendGenerateSubscribeTopicFn func(PubSubBackendSubscribeParams) (string, error)

type PubSubBackendModifyNotificationMessageFn

type PubSubBackendModifyNotificationMessageFn func(msg *message.Message, params PubSubBackendOnCommandProcessedParams) error

type PubSubBackendOnCommandProcessedParams

type PubSubBackendOnCommandProcessedParams struct {
	HandleErr error

	PubSubBackendPublishParams
}

type PubSubBackendOnListenForReplyFinishedFn

type PubSubBackendOnListenForReplyFinishedFn func(ctx context.Context, params PubSubBackendSubscribeParams)

type PubSubBackendPublishParams

type PubSubBackendPublishParams struct {
	Command any

	CommandMessage *message.Message

	OperationID OperationID
}

type PubSubBackendSubscribeParams

type PubSubBackendSubscribeParams struct {
	Command any

	OperationID OperationID
}

type PubSubBackendSubscriberConstructorFn

type PubSubBackendSubscriberConstructorFn func(PubSubBackendSubscribeParams) (message.Subscriber, error)

type Reply

type Reply[Result any] struct {
	// HandlerResult contains the handler result.
	// It's preset only when NewCommandHandlerWithResult is used. If NewCommandHandler is used, HandlerResult is empty.
	//
	// Result is sent even if the handler returns an error.
	HandlerResult Result

	// Error contains the error returned by the command handler or the Backend when handling notification fails.
	// Handling the notification can fail, for example, when unmarshaling the message or if there's a timeout.
	// If listening for a reply times out or the context is canceled, the Error is ReplyTimeoutError.
	//
	// If an error from the handler is returned, CommandHandlerError is returned.
	// If processing was successful, Error is nil.
	Error error

	// NotificationMessage contains the notification message sent after the command is handled.
	// It's present only if the request/reply backend uses a Pub/Sub for notifications (for example, PubSubBackend).
	//
	// Warning: NotificationMessage is nil if a timeout occurs.
	NotificationMessage *message.Message
}

func SendWithReply

func SendWithReply[Result any](
	ctx context.Context,
	c CommandBus,
	backend Backend[Result],
	cmd any,
) (Reply[Result], error)

SendWithReply sends command to the command bus and receives a replies of the command handler. It returns a channel with replies, cancel function and error. If more than one replies are sent, only the first which is received is returned.

If you expect multiple replies, please use SendWithReplies instead.

SendWithReply is blocking until the first reply is received or the context is canceled. SendWithReply can be cancelled by cancelling context or by exceeding the timeout set in the backend (if set).

SendWithReply can listen for handlers with results (NewCommandHandlerWithResult) and without results (NewCommandHandler). If you are listening for handlers without results, you should pass `NoResult` or `struct{}` as `Result` generic type:

 reply, err := requestreply.SendWithReply[requestreply.NoResult](
		context.Background(),
		ts.CommandBus,
		ts.RequestReplyBackend,
		&TestCommand{ID: "1"},
	)

If `NewCommandHandlerWithResult` handler returns a specific type, you should pass it as `Result` generic type:

 reply, err := requestreply.SendWithReply[SomeTypeReturnedByHandler](
		context.Background(),
		ts.CommandBus,
		ts.RequestReplyBackend,
		&TestCommand{ID: "1"},
	)

type ReplyPublishErrorHandler added in v1.3.6

type ReplyPublishErrorHandler func(replyTopic string, notificationMsg *message.Message, err error) error

type ReplyTimeoutError

type ReplyTimeoutError struct {
	Duration time.Duration
	Err      error
}

ReplyTimeoutError is returned when the reply timeout is exceeded.

func (ReplyTimeoutError) Error

func (e ReplyTimeoutError) Error() string

type ReplyUnmarshalError

type ReplyUnmarshalError struct {
	Err error
}

func (ReplyUnmarshalError) Error

func (r ReplyUnmarshalError) Error() string

func (ReplyUnmarshalError) Unwrap

func (r ReplyUnmarshalError) Unwrap() error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL