cmdbus

package
v0.2.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 4, 2022 License: Apache-2.0 Imports: 16 Imported by: 1

Documentation

Overview

Package cmdbus provides a distributed & event-driven Command Bus.

Index

Constants

View Source
const (
	// DefaultAssignTimeout is the default timeout for assigning a command to a
	// handler. If a command is not assigned to a handler within this timeout,
	// the command bus returns an error that unwraps to ErrAssignTimeout.
	// The default timeout is 5s. A zero Duration means no timeout.
	DefaultAssignTimeout = 5 * time.Second

	// DefaultReceiveTimeout is the default timeout for receiving commands from
	// the command bus. If the command is not received within this timeout, the
	// command bus returns an error that unwraps to ErrReceiveTimeout.
	// The default timeout is 10s. A zero Duration means no timeout.
	DefaultReceiveTimeout = 10 * time.Second
)
View Source
const (
	// CommandDispatched is published by a Bus to dispatch a Command.
	CommandDispatched = "goes.command.dispatched"

	// CommandRequested is published by a Bus to show interest in a dispatched
	// Command.
	CommandRequested = "goes.command.requested"

	// CommandAssigned is published by a Bus to assign a dispatched Command to a
	// Handler.
	CommandAssigned = "goes.command.assigned"

	// CommandAccepted is published by a Bus to notify other Buses that a
	// Command has been accepted.
	CommandAccepted = "goes.command.accepted"

	// CommandExecuted is published by a Bus to notify other Buses that a
	// Command has been executed.
	CommandExecuted = "goes.command.executed"
)

Variables

View Source
var (
	// ErrAssignTimeout is returned by a Bus when it fails to assign a Command
	// to a Handler before a given deadline.
	ErrAssignTimeout = errors.New("failed to assign command because of timeout")

	// ErrReceiveTimeout is emitted by a Bus when the DrainTimeout is exceeded
	// when receiving remaining Commands from a canceled Command subscription.
	ErrReceiveTimeout = errors.New("command dropped because of receive timeout")

	// Deprecated: Use ErrReceiveTimeout instead.
	ErrDrainTimeout = ErrReceiveTimeout

	// ErrAlreadySubscribed is returned when trying to subscribe to the same
	// commands more than once within a single command bus.
	ErrSubscribed = errors.New("already subscribed to command")
)

Functions

func RegisterEvents

func RegisterEvents(r codec.Registerer)

RegisterEvents registers the command events into a Registry.

Types

type Bus

type Bus struct {
	*handler.Handler
	// contains filtered or unexported fields
}

Bus is an event-driven Command Bus.

func New

func New(enc codec.Encoding, events event.Bus, opts ...Option) *Bus

New returns an event-driven command bus.

func (*Bus) Dispatch

func (b *Bus) Dispatch(ctx context.Context, cmd command.Command, opts ...command.DispatchOption) (err error)

Dispatch dispatches a Command to the appropriate handler (Command Bus) using the underlying event Bus to communicate between b and the other Command Buses.

How it works

Dispatch first publishes a CommandDispatched event with the Command Payload encoded in the event Data. Every Command Bus that is currently subscribed to a Command receives the CommandDispatched event and checks if it handles Commands that have the name of the dispatched Command.

If a Command Bus doesn't handle Commands with that name, they just ignore the CommandDispatched event, but if they're instructed to handle such Commands, they tell the Bus b that they want to handle the Command by publishing a CommandRequested event which the Bus b will listen for.

The first of those CommandRequested events that the Bus b receives is used to assign the Command to a Handler. When b receives the first CommandRequested Event, it publishes a CommandAssigned event with the ID of the selected Handler.

The handler Command Buses receive the CommandAssigned event and check if they're Handler that is assigned to the Command. The assigned Handler then publishes a final CommandAccepted event to tell the Bus b that the Command arrived at its Handler.

Errors

By default, the error returned by Dispatch doesn't give any information about the execution of the Command because the Bus returns as soon as another Bus accepts a dispatched Command.

To handle errors that happen during the execution of Commands, use the dispatch.Sync() Option to make the dispatch synchronous. A synchronous dispatch waits for and returns the execution error from the executing Bus.

Errors that happen during a synchronous excecution are then also returned by Dispatch as an *ExecutionError. Call ExecError with that error as the argument to unwrap the underlying *ExecutionError:

var b command.Bus
err := b.Dispatch(context.TODO(), command.New(...))
if execError, ok := cmdbus.ExecError(err); ok {
	log.Println(execError.Cmd)
	log.Println(execError.Err)
}

Execution result

By default, Dispatch does not return information about the execution of a Command, but a report.Reporter can be provided with the dispatch.Report() Option. When a Reporter is provided, the dispatch is automatically made synchronous.

Example:

var rep report.Report
var cmd command.Command
err := b.Dispatch(context.TODO(), cmd, dispatch.Report(&rep))
log.Println(fmt.Sprintf("Command: %v", rep.Command()))
log.Println(fmt.Sprintf("Runtime: %v", rep.Runtime()))
log.Println(fmt.Sprintf("Error: %v", err))

func (*Bus) Run added in v0.1.2

func (b *Bus) Run(ctx context.Context) (<-chan error, error)

Run runs the command bus until ctx is canceled. If the bus is used before Run has been called, Run will be called automtically and the errors are logged to stderr.

func (*Bus) Subscribe

func (b *Bus) Subscribe(ctx context.Context, names ...string) (<-chan command.Ctx[any], <-chan error, error)

Subscribe returns a channel of Command Contexts and an error channel. The Context channel channel is registered as a handler for Commands which have one of the specified names.

Callers of Subscribe are responsible for receiving from the returned error channel to prevent a deadlock.

When a Command Bus, which uses the same underlying event Bus as Bus b, dispatches a Command, Bus b tries to assign itself as the handler for that Command. If b is assigned as the handler, a Command Context can be received from the returned channel.

It is guaranteed that only one Command Bus will handle a single Command; when a Command is received from the Context channel, no other Context channel will receive that Command.

When ctx is canceled, the remaining Commands that have already been received are pushed into the Context channel before it is closed. Use the DrainTimeout Option to specify the timeout after which the remaining Commands are being discarded.

type CommandAcceptedData

type CommandAcceptedData struct {
	ID    uuid.UUID
	BusID uuid.UUID
}

CommandAcceptedData is the event Data for the CommandAccepted Event.

type CommandAssignedData

type CommandAssignedData struct {
	ID    uuid.UUID
	BusID uuid.UUID
}

CommandAssignedData is the event Data for the CommandAssigned Event.

type CommandDispatchedData

type CommandDispatchedData struct {
	// ID is the unique Command ID.
	ID uuid.UUID

	// Name is the name of the Command.
	Name string

	// AggregateName is the name of the  aggregate the Command belongs to.
	// (optional)
	AggregateName string

	// AggregateID is the ID of the aggregate the Command belongs to. (optional)
	AggregateID uuid.UUID

	// Payload is the encoded domain-specific Command Payload.
	Payload []byte
}

CommandDispatchedData is the event Data for the CommandDispatched Event.

type CommandExecutedData

type CommandExecutedData struct {
	ID      uuid.UUID
	Runtime time.Duration
	Error   string
}

CommandExecutedData is the event Data for the CommandExecuted Event.

type CommandRequestedData

type CommandRequestedData struct {
	ID    uuid.UUID
	BusID uuid.UUID
}

CommandRequestedData is the event Data for the CommandRequested Event.

type ExecutionError

type ExecutionError[P any] struct {
	Cmd command.Of[P]
	Err error
}

ExecutionError is the error returned by a Bus when doing a synchronous dispatch and the execution of the Command fails.

func ExecError

func ExecError[P any](err error) (*ExecutionError[P], bool)

ExecError unwraps err as an *ExecutionError.

func (*ExecutionError[P]) Error

func (err *ExecutionError[P]) Error() string

func (*ExecutionError[P]) Unwrap

func (err *ExecutionError[P]) Unwrap() error

type Option

type Option func(*Bus)

Option is a command bus option.

func AssignTimeout

func AssignTimeout(dur time.Duration) Option

AssignTimeout returns an Option that configures the timeout when assigning a Command to a Handler. A zero Duration means no timeout.

A zero Duration means no timeout. The default timeout is 5s.

func Debug

func Debug(debug bool) Option

Debug returns an Option that toggles the debug mode of the command bus.

func DrainTimeout deprecated

func DrainTimeout(dur time.Duration) Option

Deprecated: Use ReceiveTimeout instead.

func ReceiveTimeout added in v0.1.2

func ReceiveTimeout(dur time.Duration) Option

ReceiveTimeout returns an Option that configures the timeout for receiving a command context from the command bus. If the command is not received from the returned channel within the configured timeout, the command is dropped.

A zero Duration means no timeout. The default timeout is 10s.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL