Documentation ¶
Index ¶
- Constants
- func NewCommandHandler[Command any](handlerName string, backend Backend[struct{}], ...) cqrs.CommandHandler
- func NewCommandHandlerWithResult[Command any, Result any](handlerName string, backend Backend[Result], ...) cqrs.CommandHandler
- func SendWithReplies[Result any](ctx context.Context, c CommandBus, backend Backend[Result], cmd any) (replCh <-chan Reply[Result], cancel func(), err error)
- type Backend
- type BackendListenForNotificationsParams
- type BackendOnCommandProcessedParams
- type BackendPubsubJSONMarshaler
- type BackendPubsubMarshaler
- type CommandBus
- type CommandHandlerError
- type NoResult
- type OperationID
- type PubSubBackend
- type PubSubBackendConfig
- type PubSubBackendGeneratePublishTopicFn
- type PubSubBackendGenerateSubscribeTopicFn
- type PubSubBackendModifyNotificationMessageFn
- type PubSubBackendOnCommandProcessedParams
- type PubSubBackendOnListenForReplyFinishedFn
- type PubSubBackendPublishParams
- type PubSubBackendSubscribeParams
- type PubSubBackendSubscriberConstructorFn
- type Reply
- type ReplyTimeoutError
- type ReplyUnmarshalError
Constants ¶
const ( ErrorMetadataKey = "_watermill_requestreply_error" HasErrorMetadataKey = "_watermill_requestreply_has_error" )
const OperationIDMetadataKey = "_watermill_requestreply_op_id"
Variables ¶
This section is empty.
Functions ¶
func NewCommandHandler ¶
func NewCommandHandler[Command any]( handlerName string, backend Backend[struct{}], handleFunc func(ctx context.Context, cmd *Command) error, ) cqrs.CommandHandler
NewCommandHandler creates a new CommandHandler which supports the request-reply pattern. The result handler is handler compatible with cqrs.CommandHandler.
The logic if a command should be acked or not is based on the logic of the Backend. For example, for the PubSubBackend, it depends on the `PubSubBackendConfig.AckCommandErrors` option.
func NewCommandHandlerWithResult ¶
func NewCommandHandlerWithResult[Command any, Result any]( handlerName string, backend Backend[Result], handleFunc func(ctx context.Context, cmd *Command) (Result, error), ) cqrs.CommandHandler
NewCommandHandlerWithResult creates a new CommandHandler which supports the request-reply pattern with a result. The result handler is handler compatible with cqrs.CommandHandler.
In addition to cqrs.CommandHandler, it also allows returninga result from the handler. The result is passed to the Backend implementation and sent to the caller.
The logic if a command should be acked or not is based on the logic of the Backend. For example, for the PubSubBackend, it depends on the `PubSubBackendConfig.AckCommandErrors` option.
The reply is sent to the caller, even if the handler returns an error.
func SendWithReplies ¶
func SendWithReplies[Result any]( ctx context.Context, c CommandBus, backend Backend[Result], cmd any, ) (replCh <-chan Reply[Result], cancel func(), err error)
SendWithReplies sends command to the command bus and receives a replies of the command handler. It returns a channel with replies, cancel function and error.
SendWithReplies can be cancelled by calling cancel function or by cancelling context or When SendWithReplies is canceled, the returned channel is closed as well. by exceeding the timeout set in the backend (if set). Warning: It's important to cancel the function, because it's listening for the replies in the background. Lack of cancelling the function can lead to subscriber leak.
SendWithReplies can listen for handlers with results (NewCommandHandlerWithResult) and without results (NewCommandHandler). If you are listening for handlers without results, you should pass `NoResult` or `struct{}` as `Result` generic type:
replyCh, cancel, err := requestreply.SendWithReplies[requestreply.NoResult]( context.Background(), ts.CommandBus, ts.RequestReplyBackend, &TestCommand{ID: "1"}, )
If `NewCommandHandlerWithResult` handler returns a specific type, you should pass it as `Result` generic type:
replyCh, cancel, err := requestreply.SendWithReplies[SomeTypeReturnedByHandler]( context.Background(), ts.CommandBus, ts.RequestReplyBackend, &TestCommand{ID: "1"}, )
SendWithReplies will send the replies to the channel until the context is cancelled or the timeout is exceeded. They are multiple cases when more than one reply can be sent:
- when the handler returns an error, and backend is configured to nack the message on error (for the PubSubBackend, it depends on `PubSubBackendConfig.AckCommandErrors` option.),
- when you are using fan-out mechanism and commands are handled multiple times,
Types ¶
type Backend ¶
type Backend[Result any] interface { ListenForNotifications(ctx context.Context, params BackendListenForNotificationsParams) (<-chan Reply[Result], error) OnCommandProcessed(ctx context.Context, params BackendOnCommandProcessedParams[Result]) error }
type BackendListenForNotificationsParams ¶
type BackendListenForNotificationsParams struct { Command any OperationID OperationID }
type BackendPubsubJSONMarshaler ¶
type BackendPubsubJSONMarshaler[Result any] struct{}
func (BackendPubsubJSONMarshaler[Result]) MarshalReply ¶
func (m BackendPubsubJSONMarshaler[Result]) MarshalReply( params BackendOnCommandProcessedParams[Result], ) (*message.Message, error)
func (BackendPubsubJSONMarshaler[Result]) UnmarshalReply ¶
func (m BackendPubsubJSONMarshaler[Result]) UnmarshalReply(msg *message.Message) (Reply[Result], error)
type BackendPubsubMarshaler ¶
type CommandBus ¶
type CommandHandlerError ¶
type CommandHandlerError struct {
Err error
}
CommandHandlerError is returned when the command handler returns an error.
func (CommandHandlerError) Error ¶
func (e CommandHandlerError) Error() string
func (CommandHandlerError) Unwrap ¶
func (e CommandHandlerError) Unwrap() error
type NoResult ¶
type NoResult = struct{}
NoResult is a result type for commands that don't have result.
type OperationID ¶
type OperationID string
OperationID is a unique identifier of a command. It correlates commands with replies between the bus and the handler.
type PubSubBackend ¶
type PubSubBackend[Result any] struct { // contains filtered or unexported fields }
PubSubBackend is a Backend that uses Pub/Sub to transport commands and replies.
func NewPubSubBackend ¶
func NewPubSubBackend[Result any]( config PubSubBackendConfig, marshaler BackendPubsubMarshaler[Result], ) (*PubSubBackend[Result], error)
NewPubSubBackend creates a new PubSubBackend.
If you want to use backend together with `NewCommandHandler` (without result), you should pass `NoResult` or `struct{}` as Result type.
func (PubSubBackend[Result]) ListenForNotifications ¶
func (p PubSubBackend[Result]) ListenForNotifications( ctx context.Context, params BackendListenForNotificationsParams, ) (<-chan Reply[Result], error)
func (PubSubBackend[Result]) OnCommandProcessed ¶
func (p PubSubBackend[Result]) OnCommandProcessed(ctx context.Context, params BackendOnCommandProcessedParams[Result]) error
type PubSubBackendConfig ¶
type PubSubBackendConfig struct { Publisher message.Publisher SubscriberConstructor PubSubBackendSubscriberConstructorFn GeneratePublishTopic PubSubBackendGeneratePublishTopicFn GenerateSubscribeTopic PubSubBackendGenerateSubscribeTopicFn Logger watermill.LoggerAdapter ListenForReplyTimeout *time.Duration ModifyNotificationMessage PubSubBackendModifyNotificationMessageFn OnListenForReplyFinished PubSubBackendOnListenForReplyFinishedFn // AckCommandErrors determines if the command should be acked or nacked when handler returns an error. // Command will be always nacked, when sending reply fails. // You should use this option instead of cqrs.CommandProcessorConfig.AckCommandHandlingErrors, as it's aware // if error was returned by handler or sending reply failed. AckCommandErrors bool }
func (*PubSubBackendConfig) Validate ¶
func (p *PubSubBackendConfig) Validate() error
type PubSubBackendGeneratePublishTopicFn ¶
type PubSubBackendGeneratePublishTopicFn func(PubSubBackendPublishParams) (string, error)
type PubSubBackendGenerateSubscribeTopicFn ¶
type PubSubBackendGenerateSubscribeTopicFn func(PubSubBackendSubscribeParams) (string, error)
type PubSubBackendModifyNotificationMessageFn ¶
type PubSubBackendModifyNotificationMessageFn func(msg *message.Message, params PubSubBackendOnCommandProcessedParams) error
type PubSubBackendOnCommandProcessedParams ¶
type PubSubBackendOnCommandProcessedParams struct { HandleErr error PubSubBackendPublishParams }
type PubSubBackendOnListenForReplyFinishedFn ¶
type PubSubBackendOnListenForReplyFinishedFn func(ctx context.Context, params PubSubBackendSubscribeParams)
type PubSubBackendPublishParams ¶
type PubSubBackendPublishParams struct { Command any CommandMessage *message.Message OperationID OperationID }
type PubSubBackendSubscribeParams ¶
type PubSubBackendSubscribeParams struct { Command any OperationID OperationID }
type PubSubBackendSubscriberConstructorFn ¶
type PubSubBackendSubscriberConstructorFn func(PubSubBackendSubscribeParams) (message.Subscriber, error)
type Reply ¶
type Reply[Result any] struct { // HandlerResult contains the handler result. // It's preset only when NewCommandHandlerWithResult is used. If NewCommandHandler is used, HandlerResult is empty. // // Result is sent even if the handler returns an error. HandlerResult Result // Error contains the error returned by the command handler or the Backend when handling notification fails. // Handling the notification can fail, for example, when unmarshaling the message or if there's a timeout. // If listening for a reply times out or the context is canceled, the Error is ReplyTimeoutError. // // If an error from the handler is returned, CommandHandlerError is returned. // If processing was successful, Error is nil. Error error // NotificationMessage contains the notification message sent after the command is handled. // It's present only if the request/reply backend uses a Pub/Sub for notifications (for example, PubSubBackend). // // Warning: NotificationMessage is nil if a timeout occurs. NotificationMessage *message.Message }
func SendWithReply ¶
func SendWithReply[Result any]( ctx context.Context, c CommandBus, backend Backend[Result], cmd any, ) (Reply[Result], error)
SendWithReply sends command to the command bus and receives a replies of the command handler. It returns a channel with replies, cancel function and error. If more than one replies are sent, only the first which is received is returned.
If you expect multiple replies, please use SendWithReplies instead.
SendWithReply is blocking until the first reply is received or the context is canceled. SendWithReply can be cancelled by cancelling context or by exceeding the timeout set in the backend (if set).
SendWithReply can listen for handlers with results (NewCommandHandlerWithResult) and without results (NewCommandHandler). If you are listening for handlers without results, you should pass `NoResult` or `struct{}` as `Result` generic type:
reply, err := requestreply.SendWithReply[requestreply.NoResult]( context.Background(), ts.CommandBus, ts.RequestReplyBackend, &TestCommand{ID: "1"}, )
If `NewCommandHandlerWithResult` handler returns a specific type, you should pass it as `Result` generic type:
reply, err := requestreply.SendWithReply[SomeTypeReturnedByHandler]( context.Background(), ts.CommandBus, ts.RequestReplyBackend, &TestCommand{ID: "1"}, )
type ReplyTimeoutError ¶
ReplyTimeoutError is returned when the reply timeout is exceeded.
func (ReplyTimeoutError) Error ¶
func (e ReplyTimeoutError) Error() string
type ReplyUnmarshalError ¶
type ReplyUnmarshalError struct {
Err error
}
func (ReplyUnmarshalError) Error ¶
func (r ReplyUnmarshalError) Error() string
func (ReplyUnmarshalError) Unwrap ¶
func (r ReplyUnmarshalError) Unwrap() error