Documentation ¶
Overview ¶
Package cmdbus provides a distributed & event-driven Command Bus.
Constants ¶
const (
	// DefaultAssignTimeout is the default timeout when assigning a Command to a
	// Handler.
	DefaultAssignTimeout = 5 * time.Second

	// DefaultDrainTimeout is the default timeout for accepting Commands after
	// the used context is canceled.
	DefaultDrainTimeout = 10 * time.Second
)
const (
	// CommandDispatched is published by a Bus to dispatch a Command.
	CommandDispatched = "goes.command.dispatched"

	// CommandRequested is published by a Bus to show interest in a dispatched
	// Command.
	CommandRequested = "goes.command.requested"

	// CommandAssigned is published by a Bus to assign a dispatched Command to a
	// Handler.
	CommandAssigned = "goes.command.assigned"

	// CommandAccepted is published by a Bus to notify other Buses that a
	// Command has been accepted.
	CommandAccepted = "goes.command.accepted"

	// CommandExecuted is published by a Bus to notify other Buses that a
	// Command has been executed.
	CommandExecuted = "goes.command.executed"
)
Variables ¶
var (
	// ErrAssignTimeout is returned by a Bus when it fails to assign a Command
	// to a Handler before a given deadline.
	ErrAssignTimeout = errors.New("failed to assign command because of timeout")

	// ErrDrainTimeout is emitted by a Bus when the DrainTimeout is exceeded
	// when receiving remaining Commands from a canceled Command subscription.
	ErrDrainTimeout = errors.New("dropped command because of timeout")

	// ErrDispatchCanceled is returned by a Bus when the dispatch was canceled
	// by the provided Context.
	ErrDispatchCanceled = errors.New("dispatch canceled")
)
Functions ¶
func RegisterEvents ¶
RegisterEvents registers the command events into a Registry.
Types ¶
type Bus ¶
type Bus struct {
// contains filtered or unexported fields
}
Bus is an Event-driven Command Bus.
func (*Bus) Dispatch ¶
func (b *Bus) Dispatch(ctx context.Context, cmd command.Command, opts ...command.DispatchOption) error
Dispatch dispatches a Command to the appropriate handler (Command Bus) using the underlying Event Bus to communicate between b and the other Command Buses.
How it works ¶
Dispatch first publishes a CommandDispatched Event with the Command Payload encoded in the Event Data. Every Command Bus that is currently subscribed to a Command receives the CommandDispatched Event and checks if it handles Commands that have the name of the dispatched Command.
If a Command Bus doesn't handle Commands with that name, it ignores the CommandDispatched Event. If it does handle such Commands, it tells Bus b that it wants to handle the Command by publishing a CommandRequested Event, which b listens for.
The first of those CommandRequested Events that the Bus b receives is used to assign the Command to a Handler. When b receives the first CommandRequested Event, it publishes a CommandAssigned Event with the ID of the selected Handler.
The handler Command Buses receive the CommandAssigned Event and check whether they are the Handler that is assigned to the Command. The assigned Handler then publishes a final CommandAccepted Event to tell Bus b that the Command has arrived at its Handler.
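The handshake described above can be sketched with a toy in-memory event bus. This is only an illustration under stated assumptions, not the package's implementation: the event struct, eventBus, addHandler and dispatch are hypothetical names, events are delivered synchronously in subscription order, and timeouts such as ErrAssignTimeout are left out. Only the four event names are taken from the package constants.

```go
package main

import "fmt"

// Event names copied from the package constants.
const (
	CommandDispatched = "goes.command.dispatched"
	CommandRequested  = "goes.command.requested"
	CommandAssigned   = "goes.command.assigned"
	CommandAccepted   = "goes.command.accepted"
)

// event is a simplified, hypothetical stand-in for the real Event type.
type event struct {
	Name      string
	CommandID string // ID of the dispatched Command
	Command   string // Command name (only set on CommandDispatched)
	HandlerID string // requesting, assigned, or accepting handler
}

// eventBus is a toy Event Bus that delivers events synchronously.
type eventBus struct{ subs []func(event) }

func (b *eventBus) subscribe(fn func(event)) { b.subs = append(b.subs, fn) }

func (b *eventBus) publish(e event) {
	for _, fn := range b.subs {
		fn(e)
	}
}

// addHandler registers a handler bus that handles Commands named cmdName.
func addHandler(bus *eventBus, id, cmdName string) {
	bus.subscribe(func(e event) {
		switch {
		case e.Name == CommandDispatched && e.Command == cmdName:
			// Show interest in the dispatched Command.
			bus.publish(event{Name: CommandRequested, CommandID: e.CommandID, HandlerID: id})
		case e.Name == CommandAssigned && e.HandlerID == id:
			// This handler was selected: confirm that the Command arrived.
			bus.publish(event{Name: CommandAccepted, CommandID: e.CommandID, HandlerID: id})
		}
	})
}

// dispatch runs the handshake and returns the ID of the accepting handler.
// It reports false if no handler requested the Command.
func dispatch(bus *eventBus, cmdID, cmdName string) (string, bool) {
	var assigned, accepted string
	bus.subscribe(func(e event) {
		if e.CommandID != cmdID {
			return
		}
		switch {
		case e.Name == CommandRequested && assigned == "":
			assigned = e.HandlerID // the first request wins
			bus.publish(event{Name: CommandAssigned, CommandID: cmdID, HandlerID: assigned})
		case e.Name == CommandAccepted:
			accepted = e.HandlerID
		}
	})
	bus.publish(event{Name: CommandDispatched, CommandID: cmdID, Command: cmdName})
	return accepted, accepted != ""
}

func main() {
	bus := &eventBus{}
	addHandler(bus, "handler-a", "order.place")
	addHandler(bus, "handler-b", "order.place")
	id, ok := dispatch(bus, "cmd-1", "order.place")
	fmt.Println(id, ok)
}
```

In this sketch a Command without any interested handler simply reports false. Based on the description of ErrAssignTimeout, the real Bus would presumably fail such a dispatch once the assign timeout expires.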
Errors ¶
By default, the error returned by Dispatch doesn't give any information about the execution of the Command because the Bus returns as soon as another Bus accepts a dispatched Command.
To handle errors that happen during the execution of Commands, use the dispatch.Sync() Option to make the dispatch synchronous. A synchronous dispatch waits for and returns the execution error from the executing Bus.
Errors that occur during a synchronous execution are returned by Dispatch as an *ExecutionError. Pass the returned error to ExecError to unwrap the underlying *ExecutionError:
var b command.Bus
err := b.Dispatch(context.TODO(), command.New(...), dispatch.Sync())
if execError, ok := cmdbus.ExecError(err); ok {
	log.Println(execError.Cmd)
	log.Println(execError.Err)
}
Execution result ¶
By default, Dispatch does not return information about the execution of a Command, but a report.Reporter can be provided with the dispatch.Report() Option. When a Reporter is provided, the dispatch is automatically made synchronous.
Example:
var rep report.Report
var cmd command.Command
err := b.Dispatch(context.TODO(), cmd, dispatch.Report(&rep))
log.Printf("Command: %v", rep.Command())
log.Printf("Runtime: %v", rep.Runtime())
log.Printf("Error: %v", err)
func (*Bus) Subscribe ¶
func (b *Bus) Subscribe(ctx context.Context, names ...string) (<-chan command.Context, <-chan error, error)
Subscribe returns a channel of Command Contexts and an error channel. The Context channel is registered as a handler for Commands which have one of the specified names.
Callers of Subscribe are responsible for receiving from the returned error channel to prevent a deadlock.
When a Command Bus, which uses the same underlying Event Bus as Bus b, dispatches a Command, Bus b tries to assign itself as the handler for that Command. If b is assigned as the handler, a Command Context can be received from the returned channel.
It is guaranteed that only one Command Bus will handle a single Command; when a Command is received from the Context channel, no other Context channel will receive that Command.
When ctx is canceled, the remaining Commands that have already been received are pushed into the Context channel before it is closed. Use the DrainTimeout Option to specify the timeout after which the remaining Commands are discarded.
type CommandAcceptedData ¶
CommandAcceptedData is the Event Data for the CommandAccepted Event.
type CommandAssignedData ¶
CommandAssignedData is the Event Data for the CommandAssigned Event.
type CommandDispatchedData ¶
type CommandDispatchedData struct {
	// ID is the unique Command ID.
	ID uuid.UUID

	// Name is the name of the Command.
	Name string

	// AggregateName is the name of the Aggregate the Command belongs to.
	// (optional)
	AggregateName string

	// AggregateID is the ID of the Aggregate the Command belongs to. (optional)
	AggregateID uuid.UUID

	// Payload is the encoded domain-specific Command Payload.
	Payload []byte
}
CommandDispatchedData is the Event Data for the CommandDispatched Event.
type CommandExecutedData ¶
CommandExecutedData is the Event Data for the CommandExecuted Event.
type CommandRequestedData ¶
CommandRequestedData is the Event Data for the CommandRequested Event.
type ExecutionError ¶
ExecutionError is the error returned by a Bus when doing a synchronous dispatch and the execution of the Command fails.
func ExecError ¶
func ExecError(err error) (*ExecutionError, bool)
ExecError unwraps err as an *ExecutionError.
func (*ExecutionError) Error ¶
func (err *ExecutionError) Error() string
func (*ExecutionError) Unwrap ¶
func (err *ExecutionError) Unwrap() error
type Option ¶
type Option func(*Bus)
Option is a Command Bus option.
func AssignTimeout ¶
AssignTimeout returns an Option that configures the timeout when assigning a Command to a Handler.
A zero Duration means no timeout. The default timeout is 5s.
func Debug ¶
Debug enables verbose logging for debugging purposes. An optional id may be specified to annotate the debug output.
func DrainTimeout ¶
DrainTimeout returns an Option that configures the timeout when accepting the remaining Commands after the Context that's used to subscribe to Commands is canceled.
A zero Duration means no timeout. The default timeout is 10s.