cmdbus

package
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 1, 2022 License: MIT Imports: 16 Imported by: 1

Documentation

Overview

Package cmdbus provides a distributed & event-driven Command Bus.

Index

Constants

View Source
const (
	// DefaultAssignTimeout is the default timeout when assigning a Command to a
	// Handler.
	DefaultAssignTimeout = 5 * time.Second

	// DefaultDrainTimeout is the default timeout for accepting Commands after
	// the used context is canceled.
	DefaultDrainTimeout = 10 * time.Second
)
View Source
const (
	// CommandDispatched is published by a Bus to dispatch a Command.
	CommandDispatched = "goes.command.dispatched"

	// CommandRequested is published by a Bus to show interest in a dispatched
	// Command.
	CommandRequested = "goes.command.requested"

	// CommandAssigned is published by a Bus to assign a dispatched Command to a
	// Handler.
	CommandAssigned = "goes.command.assigned"

	// CommandAccepted is published by a Bus to notify other Buses that a
	// Command has been accepted.
	CommandAccepted = "goes.command.accepted"

	// CommandExecuted is published by a Bus to notify other Buses that a
	// Command has been executed.
	CommandExecuted = "goes.command.executed"
)

Variables

View Source
var (
	// ErrAssignTimeout is returned by a Bus when it fails to assign a Command
	// to a Handler before a given deadline.
	ErrAssignTimeout = errors.New("failed to assign command because of timeout")

	// ErrDrainTimeout is emitted by a Bus when the DrainTimeout is exceeded
	// when receiving remaining Commands from a canceled Command subscription.
	ErrDrainTimeout = errors.New("dropped command because of timeout")

	// ErrDispatchCanceled is returned by a Bus when the dispatch was canceled
	// by the provided Context.
	ErrDispatchCanceled = errors.New("dispatch canceled")
)

Functions

func RegisterEvents

func RegisterEvents(reg *codec.Registry)

RegisterEvents registers the command events into a Registry.

Types

type Bus

type Bus struct {
	// contains filtered or unexported fields
}

Bus is an Event-driven Command Bus.

func New

func New(cmdEnc codec.Encoding, eventReg *codec.Registry, events event.Bus, opts ...Option) *Bus

New returns an event-driven command bus.

func (*Bus) Dispatch

func (b *Bus) Dispatch(ctx context.Context, cmd command.Command, opts ...command.DispatchOption) error

Dispatch dispatches a Command to the appropriate handler (Command Bus) using the underlying Event Bus to communicate between b and the other Command Buses.

How it works

Dispatch first publishes a CommandDispatched Event with the Command Payload encoded in the Event Data. Every Command Bus that is currently subscribed to a Command receives the CommandDispatched Event and checks if it handles Commands that have the name of the dispatched Command.

If a Command Bus doesn't handle Commands with that name, they just ignore the CommandDispatched Event, but if they're instructed to handle such Commands, they tell the Bus b that they want to handle the Command by publishing a CommandRequested Event which the Bus b will listen for.

The first of those CommandRequested Events that the Bus b receives is used to assign the Command to a Handler. When b receives the first CommandRequested Event, it publishes a CommandAssigned Event with the ID of the selected Handler.

The handler Command Buses receive the CommandAssigned Event and check if they're Handler that is assigned to the Command. The assigned Handler then publishes a final CommandAccepted Event to tell the Bus b that the Command arrived at its Handler.

Errors

By default, the error returned by Dispatch doesn't give any information about the execution of the Command because the Bus returns as soon as another Bus accepts a dispatched Command.

To handle errors that happen during the execution of Commands, use the dispatch.Sync() Option to make the dispatch synchronous. A synchronous dispatch waits for and returns the execution error from the executing Bus.

Errors that happen during a synchronous excecution are then also returned by Dispatch as an *ExecutionError. Call ExecError with that error as the argument to unwrap the underlying *ExecutionError:

var b command.Bus
err := b.Dispatch(context.TODO(), command.New(...))
if execError, ok := cmdbus.ExecError(err); ok {
	log.Println(execError.Cmd)
	log.Println(execError.Err)
}

Execution result

By default, Dispatch does not return information about the execution of a Command, but a report.Reporter can be provided with the dispatch.Report() Option. When a Reporter is provided, the dispatch is automatically made synchronous.

Example:

var rep report.Report
var cmd command.Command
err := b.Dispatch(context.TODO(), cmd, dispatch.Report(&rep))
log.Println(fmt.Sprintf("Command: %v", rep.Command()))
log.Println(fmt.Sprintf("Runtime: %v", rep.Runtime()))
log.Println(fmt.Sprintf("Error: %v", err))

func (*Bus) Subscribe

func (b *Bus) Subscribe(ctx context.Context, names ...string) (<-chan command.Context, <-chan error, error)

Subscribe returns a channel of Command Contexts and an error channel. The Context channel channel is registered as a handler for Commands which have one of the specified names.

Callers of Subscribe are responsible for receiving from the returned error channel to prevent a deadlock.

When a Command Bus, which uses the same underlying Event Bus as Bus b, dispatches a Command, Bus b tries to assign itself as the handler for that Command. If b is assigned as the handler, a Command Context can be received from the returned channel.

It is guaranteed that only one Command Bus will handle a single Command; when a Command is received from the Context channel, no other Context channel will receive that Command.

When ctx is canceled, the remaining Commands that have already been received are pushed into the Context channel before it is closed. Use the DrainTimeout Option to specify the timeout after which the remaining Commands are being discarded.

type CommandAcceptedData

type CommandAcceptedData struct {
	ID        uuid.UUID
	HandlerID uuid.UUID
}

CommandAcceptedData is the Event Data for the CommandAccepted Event.

type CommandAssignedData

type CommandAssignedData struct {
	ID        uuid.UUID
	HandlerID uuid.UUID
}

CommandAssignedData is the Event Data for the CommandAssigned Event.

type CommandDispatchedData

type CommandDispatchedData struct {
	// ID is the unique Command ID.
	ID uuid.UUID

	// Name is the name of the Command.
	Name string

	// AggregateName is the name of the  Aggregate the Command belongs to.
	// (optional)
	AggregateName string

	// AggregateID is the ID of the Aggregate the Command belongs to. (optional)
	AggregateID uuid.UUID

	// Payload is the encoded domain-specific Command Payload.
	Payload []byte
}

CommandDispatchedData is the Event Data for the CommandDispatched Event.

type CommandExecutedData

type CommandExecutedData struct {
	ID      uuid.UUID
	Runtime time.Duration
	Error   string
}

CommandExecutedData is the Event Data for the CommandExecuted Event.

type CommandRequestedData

type CommandRequestedData struct {
	ID        uuid.UUID
	HandlerID uuid.UUID
}

CommandRequestedData is the Event Data for the CommandRequested Event.

type ExecutionError

type ExecutionError struct {
	Cmd command.Command
	Err error
}

ExecutionError is the error returned by a Bus when doing a synchronous dispatch and the execution of the Command fails.

func ExecError

func ExecError(err error) (*ExecutionError, bool)

ExecError unwraps err as an *ExecutionError.

func (*ExecutionError) Error

func (err *ExecutionError) Error() string

func (*ExecutionError) Unwrap

func (err *ExecutionError) Unwrap() error

type Option

type Option func(*Bus)

Option is a Command Bus option.

func AssignTimeout

func AssignTimeout(dur time.Duration) Option

AssignTimeout returns an Option that configures the timeout when assigning a Command to a Handler. A zero Duration means no timeout.

A zero Duration means no timeout. The default timeout is 5s.

func Debug

func Debug(id string) Option

Debug enables verbose logging for debugging purposes. Optional id may be specified to annotate debug output.

func DrainTimeout

func DrainTimeout(dur time.Duration) Option

DrainTimeout returns an Option that configures the timeout when accepting the remaining Commands after the Context that's used to subscribe to Commands is canceled.

A zero Duration means no timeout. The default timeout is 10s.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL