Documentation ¶
Overview ¶
Package cmdbus provides a distributed & event-driven Command Bus.
Index ¶
- Constants
- Variables
- func RegisterEvents(r codec.Registerer)
- type Bus
- func (b *Bus[ErrorCode]) Dispatch(ctx context.Context, cmd command.Command, opts ...command.DispatchOption) (err error)
- func (b *Bus[ErrorCode]) Run(ctx context.Context) (<-chan error, error)
- func (b *Bus[ErrorCode]) Subscribe(ctx context.Context, names ...string) (<-chan command.Ctx[any], <-chan error, error)
- type CommandAcceptedData
- type CommandAssignedData
- type CommandDispatchedData
- type CommandExecutedData
- type CommandRequestedData
- type ExecutionError
- type Option
Constants ¶
const (
	// DefaultAssignTimeout is the default timeout for assigning a command to a
	// handler. If a command is not assigned to a handler within this timeout,
	// the command bus returns an error that unwraps to ErrAssignTimeout.
	// The default timeout is 5s. A zero Duration means no timeout.
	DefaultAssignTimeout = 5 * time.Second

	// DefaultReceiveTimeout is the default timeout for receiving commands from
	// the command bus. If the command is not received within this timeout, the
	// command bus returns an error that unwraps to ErrReceiveTimeout.
	// The default timeout is 10s. A zero Duration means no timeout.
	DefaultReceiveTimeout = 10 * time.Second
)
const (
	// CommandDispatched is published by a Bus to dispatch a Command.
	CommandDispatched = "goes.command.dispatched"

	// CommandRequested is published by a Bus to show interest in a dispatched
	// Command.
	CommandRequested = "goes.command.requested"

	// CommandAssigned is published by a Bus to assign a dispatched Command to a
	// Handler.
	CommandAssigned = "goes.command.assigned"

	// CommandAccepted is published by a Bus to notify other Buses that a
	// Command has been accepted.
	CommandAccepted = "goes.command.accepted"

	// CommandExecuted is published by a Bus to notify other Buses that a
	// Command has been executed.
	CommandExecuted = "goes.command.executed"
)
Variables ¶
var (
	// ErrAssignTimeout is returned by a Bus when it fails to assign a Command
	// to a Handler before a given deadline.
	ErrAssignTimeout = errors.New("failed to assign command because of timeout")

	// ErrReceiveTimeout is emitted by a Bus when the ReceiveTimeout (formerly
	// DrainTimeout) is exceeded while receiving the remaining Commands from a
	// canceled Command subscription.
	ErrReceiveTimeout = errors.New("command dropped because of receive timeout")

	// Deprecated: Use ErrReceiveTimeout instead.
	ErrDrainTimeout = ErrReceiveTimeout

	// ErrSubscribed is returned when trying to subscribe to the same
	// commands more than once within a single command bus.
	ErrSubscribed = errors.New("already subscribed to command")
)
Functions ¶
func RegisterEvents ¶
func RegisterEvents(r codec.Registerer)
RegisterEvents registers the command events into a Registry.
Types ¶
type Bus ¶
type Bus[ErrorCode constraints.Integer] struct {
	*handler.Handler
	// contains filtered or unexported fields
}
Bus is an event-driven Command Bus.
func New ¶
func New[ErrorCode constraints.Integer](enc codec.Encoding, events event.Bus, opts ...Option) *Bus[ErrorCode]
New returns an event-driven command bus.
func (*Bus[ErrorCode]) Dispatch ¶
func (b *Bus[ErrorCode]) Dispatch(ctx context.Context, cmd command.Command, opts ...command.DispatchOption) (err error)
Dispatch dispatches a Command to the appropriate handler (Command Bus) using the underlying event Bus to communicate between b and the other Command Buses.
How it works ¶
Dispatch first publishes a CommandDispatched event with the Command Payload encoded in the event Data. Every Command Bus that is currently subscribed to a Command receives the CommandDispatched event and checks if it handles Commands that have the name of the dispatched Command.
A Command Bus that doesn't handle Commands with that name ignores the CommandDispatched event. A Command Bus that does handle such Commands tells Bus b that it wants to handle the Command by publishing a CommandRequested event, which b listens for.
The first of those CommandRequested events that the Bus b receives is used to assign the Command to a Handler. When b receives the first CommandRequested Event, it publishes a CommandAssigned event with the ID of the selected Handler.
The handler Command Buses receive the CommandAssigned event and check whether they are the Handler that is assigned to the Command. The assigned Handler then publishes a final CommandAccepted event to tell Bus b that the Command arrived at its Handler.
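The handshake described above can be modeled without a network or the goes packages. The following is a minimal sketch under stated assumptions: the `message`, `handlerBus`, and `dispatch` names are hypothetical, requests "arrive" in slice order, and only the four event names are taken from the constants of this package.

```go
package main

import "fmt"

// Event names as documented in the Constants section.
const (
	CommandDispatched = "goes.command.dispatched"
	CommandRequested  = "goes.command.requested"
	CommandAssigned   = "goes.command.assigned"
	CommandAccepted   = "goes.command.accepted"
)

// message is a hypothetical stand-in for an event on the underlying event bus.
type message struct {
	Event     string
	Command   string // name of the dispatched command
	HandlerID string // ID of the requesting or assigned handler
}

// handlerBus is a hypothetical stand-in for a subscribed Command Bus.
type handlerBus struct {
	ID      string
	Handles map[string]bool
}

// request answers a CommandDispatched event with a CommandRequested event
// if this bus handles commands with the given name.
func (h handlerBus) request(m message) (message, bool) {
	if m.Event != CommandDispatched || !h.Handles[m.Command] {
		return message{}, false
	}
	return message{Event: CommandRequested, Command: m.Command, HandlerID: h.ID}, true
}

// dispatch simulates the handshake and returns the trace of published events.
func dispatch(cmd string, buses []handlerBus) []message {
	dispatched := message{Event: CommandDispatched, Command: cmd}
	trace := []message{dispatched}

	// Collect requests in the order they reach the dispatching bus.
	var requests []message
	for _, b := range buses {
		if req, ok := b.request(dispatched); ok {
			requests = append(requests, req)
			trace = append(trace, req)
		}
	}
	if len(requests) == 0 {
		return trace // nobody is interested; a real bus would hit the assign timeout
	}

	// The first request wins the assignment; the assigned handler then accepts.
	winner := requests[0].HandlerID
	trace = append(trace,
		message{Event: CommandAssigned, Command: cmd, HandlerID: winner},
		message{Event: CommandAccepted, Command: cmd, HandlerID: winner},
	)
	return trace
}

func main() {
	buses := []handlerBus{
		{ID: "a", Handles: map[string]bool{"other": true}},
		{ID: "b", Handles: map[string]bool{"create-user": true}},
		{ID: "c", Handles: map[string]bool{"create-user": true}},
	}
	for _, m := range dispatch("create-user", buses) {
		fmt.Printf("%-26s handler=%q\n", m.Event, m.HandlerID)
	}
}
```

In this model both "b" and "c" publish a request, but only "b" is assigned and accepts, which mirrors the "first request wins" rule.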
Errors ¶
By default, the error returned by Dispatch doesn't give any information about the execution of the Command because the Bus returns as soon as another Bus accepts a dispatched Command.
To handle errors that happen during the execution of Commands, use the dispatch.Sync() Option to make the dispatch synchronous. A synchronous dispatch waits for and returns the execution error from the executing Bus.
Errors that happen during a synchronous execution are returned by Dispatch as an *ExecutionError. Call ExecError with that error as the argument to unwrap the underlying *ExecutionError:
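var b command.Bus
// dispatch.Sync() makes Dispatch wait for the execution result.
err := b.Dispatch(context.TODO(), command.New(...), dispatch.Sync())
// The type parameter cannot be inferred from err; any is assumed here.
if execError, ok := cmdbus.ExecError[any](err); ok {
	log.Println(execError.Cmd)
	log.Println(execError.Err)
}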
Execution result ¶
By default, Dispatch does not return information about the execution of a Command, but a report.Reporter can be provided with the dispatch.Report() Option. When a Reporter is provided, the dispatch is automatically made synchronous.
Example:
var rep report.Report
var cmd command.Command
err := b.Dispatch(context.TODO(), cmd, dispatch.Report(&rep))
log.Printf("Command: %v", rep.Command())
log.Printf("Runtime: %v", rep.Runtime())
log.Printf("Error: %v", err)
func (*Bus[ErrorCode]) Run ¶ added in v0.1.2
Run runs the command bus until ctx is canceled. If the bus is used before Run has been called, Run is called automatically and its errors are logged to stderr.
func (*Bus[ErrorCode]) Subscribe ¶
func (b *Bus[ErrorCode]) Subscribe(ctx context.Context, names ...string) (<-chan command.Ctx[any], <-chan error, error)
Subscribe returns a channel of Command Contexts and an error channel. The Context channel is registered as a handler for Commands which have one of the specified names.
Callers of Subscribe are responsible for receiving from the returned error channel to prevent a deadlock.
When a Command Bus, which uses the same underlying event Bus as Bus b, dispatches a Command, Bus b tries to assign itself as the handler for that Command. If b is assigned as the handler, a Command Context can be received from the returned channel.
It is guaranteed that only one Command Bus will handle a single Command; when a Command is received from the Context channel, no other Context channel will receive that Command.
When ctx is canceled, the remaining Commands that have already been received are pushed into the Context channel before it is closed. Use the ReceiveTimeout Option (DrainTimeout is deprecated) to specify the timeout after which the remaining Commands are discarded.
type CommandAcceptedData ¶
CommandAcceptedData is the event Data for the CommandAccepted Event.
type CommandAssignedData ¶
CommandAssignedData is the event Data for the CommandAssigned Event.
type CommandDispatchedData ¶
type CommandDispatchedData struct {
	// ID is the unique Command ID.
	ID uuid.UUID

	// Name is the name of the Command.
	Name string

	// AggregateName is the name of the aggregate the Command belongs to.
	// (optional)
	AggregateName string

	// AggregateID is the ID of the aggregate the Command belongs to. (optional)
	AggregateID uuid.UUID

	// Payload is the encoded domain-specific Command Payload.
	Payload []byte
}
CommandDispatchedData is the event Data for the CommandDispatched Event.
type CommandExecutedData ¶
type CommandExecutedData struct {
	ID      uuid.UUID
	Runtime time.Duration
	Error   []byte // *google.protobuf.Any
}
CommandExecutedData is the event Data for the CommandExecuted Event.
type CommandRequestedData ¶
CommandRequestedData is the event Data for the CommandRequested Event.
type ExecutionError ¶
ExecutionError is the error returned by a Bus when doing a synchronous dispatch and the execution of the Command fails.
func ExecError ¶
func ExecError[P any](err error) (*ExecutionError[P], bool)
ExecError unwraps err as an *ExecutionError.
func (*ExecutionError[P]) Error ¶
func (err *ExecutionError[P]) Error() string
Error returns a string representation of the error. The string includes the name of the command and the underlying error that caused the execution to fail.
func (*ExecutionError[P]) Unwrap ¶
func (err *ExecutionError[P]) Unwrap() error
Unwrap returns the underlying error wrapped by *ExecutionError[P]. It makes the error usable with the unwrapping functions of the errors package (errors.Unwrap, errors.Is, errors.As) that were introduced in Go 1.13.
type Option ¶
type Option func(*options)
Option is a command bus option.
func AssignTimeout ¶
AssignTimeout returns an Option that configures the timeout when assigning a Command to a Handler.
A zero Duration means no timeout. The default timeout is 5s.
func DrainTimeout ¶ deprecated
func Filter ¶ added in v0.2.10
Filter returns an Option that adds a filter to the command bus. Filters allow you to restrict the commands that are handled by the bus: By default, the bus handles all commands that it's subscribed to. Filters are called before the bus assigns itself as the handler of a command. If any of the registered filters returns false for a command, the command will not be handled by the bus.
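The filter semantics described above ("any filter returning false rejects the command") amount to a logical AND over predicates. The following sketch illustrates that rule; the `command` struct, the `filter` type, and `allowed` are assumptions made for the example, and the real Filter signature may differ.

```go
package main

import "fmt"

// command is a hypothetical, simplified view of a dispatched command.
type command struct {
	Name          string
	AggregateName string
}

// filter reports whether the bus may handle the given command.
type filter func(command) bool

// allowed returns true only if every filter accepts cmd. Without any filters
// every command is allowed, matching the documented default.
func allowed(cmd command, filters ...filter) bool {
	for _, f := range filters {
		if !f(cmd) {
			return false
		}
	}
	return true
}

func main() {
	onlyUsers := func(c command) bool { return c.AggregateName == "user" }
	noDeletes := func(c command) bool { return c.Name != "delete" }

	cmds := []command{
		{Name: "create", AggregateName: "user"},
		{Name: "delete", AggregateName: "user"},
		{Name: "create", AggregateName: "order"},
	}
	for _, c := range cmds {
		fmt.Printf("%s/%s handled=%v\n", c.AggregateName, c.Name, allowed(c, onlyUsers, noDeletes))
	}
}
```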
func ReceiveTimeout ¶ added in v0.1.2
ReceiveTimeout returns an Option that configures the timeout for receiving a command context from the command bus. If the command is not received from the returned channel within the configured timeout, the command is dropped.
A zero Duration means no timeout. The default timeout is 10s.