command

package
v0.1.0 Latest
Warning

This package is not in the latest version of its module.

Published: Feb 11, 2022 License: MIT Imports: 8 Imported by: 7

README

Command System

goes provides a distributed, event-driven command system for inter-process command handling. The command system communicates and coordinates between different services/processes to ensure that a command is sent to the appropriate handler.

The command system is only needed in applications that consist of multiple (micro-)services that need to dispatch commands to one another over a network.

Setup

The command system communicates between processes using the Event System. Use the cmdbus.New constructor to create the event-driven command bus.

package example

import (
  "context"

  "github.com/modernice/goes/codec"
  "github.com/modernice/goes/command/cmdbus"
  "github.com/modernice/goes/event/eventbus"
)

func example() {
  ereg := codec.New() // Event registry
  creg := codec.New() // Command registry
  ebus := eventbus.New() // In-memory event bus

  cbus := cmdbus.New(creg, ereg, ebus)

  // Subscribe to commands
  commands, errs, err := cbus.Subscribe(context.TODO(), ...)

  // Dispatch commands (err is already declared above, so assign with =)
  err = cbus.Dispatch(context.TODO(), ...)
}

Define Commands

Just like events, commands need to be defined and, in most cases, registered in a command registry so that their payloads can be encoded and decoded.

// Package auth is an example authentication service.
package auth

const UserAggregate = "auth.user"

type User struct { ... } // User aggregate

// Events
const (
  UserRegistered = "auth.user.registered"
)

type UserRegisteredData struct { ... }

func (u *User) Register(name, email string) error {
  // Implementation ...
  return nil
}

// Commands
const (
  RegisterUserCmd = "auth.user.register"
)

type registerUserPayload struct {
  Name  string
  Email string
}

// RegisterUser returns the command to register a user with the given name and
// email address.
func RegisterUser(id uuid.UUID, name, email string) command.Command {
  return command.New(
    RegisterUserCmd,
    registerUserPayload{Name: name, Email: email},
    command.Aggregate(UserAggregate, id), // Bind the command to an aggregate
  )
}

func RegisterCommands(r *codec.Registry) {
  gr := codec.Gob(r)
  gr.GobRegister(RegisterUserCmd, func() interface{} {
    return registerUserPayload{}
  })
}
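
To illustrate why a payload is registered under its command name, here is a minimal sketch that uses only the standard library. The registry type and its register, encode and decode methods are hypothetical stand-ins, not the goes codec implementation, and the factory returns a pointer so that encoding/gob can decode into it.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// registerUserPayload mirrors the payload type from the example above.
type registerUserPayload struct {
	Name  string
	Email string
}

// registry is a hypothetical stand-in for a command registry. It maps a
// command name to a factory that returns a pointer to an empty payload.
type registry struct {
	factories map[string]func() interface{}
}

func newRegistry() *registry {
	return &registry{factories: make(map[string]func() interface{})}
}

// register associates a command name with a payload factory.
func (r *registry) register(name string, factory func() interface{}) {
	r.factories[name] = factory
}

// encode gob-encodes the payload of a registered command.
func (r *registry) encode(name string, payload interface{}) ([]byte, error) {
	if _, ok := r.factories[name]; !ok {
		return nil, fmt.Errorf("command %q is not registered", name)
	}
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(payload); err != nil {
		return nil, fmt.Errorf("encode %q payload: %w", name, err)
	}
	return buf.Bytes(), nil
}

// decode uses the factory registered for name to create an empty payload
// and fills it from the gob-encoded bytes.
func (r *registry) decode(name string, b []byte) (interface{}, error) {
	factory, ok := r.factories[name]
	if !ok {
		return nil, fmt.Errorf("command %q is not registered", name)
	}
	payload := factory()
	if err := gob.NewDecoder(bytes.NewReader(b)).Decode(payload); err != nil {
		return nil, fmt.Errorf("decode %q payload: %w", name, err)
	}
	return payload, nil
}

// roundTrip encodes a payload on the sending side and decodes it on the
// receiving side, which only knows the command name.
func roundTrip(name, email string) (registerUserPayload, error) {
	r := newRegistry()
	r.register("auth.user.register", func() interface{} { return &registerUserPayload{} })

	b, err := r.encode("auth.user.register", registerUserPayload{Name: name, Email: email})
	if err != nil {
		return registerUserPayload{}, err
	}
	decoded, err := r.decode("auth.user.register", b)
	if err != nil {
		return registerUserPayload{}, err
	}
	return *decoded.(*registerUserPayload), nil
}
```

The receiving side never sees the Go type on the wire. It looks up the factory by command name, which is why every service that handles a command has to register its payload first.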

Dispatch Commands

The command.Bus.Dispatch method dispatches commands between services. Commands can be created with command.New(), which accepts the name of the command and the command payload.

package example

import (
  "context"

  "github.com/modernice/goes/command"
)

func dispatchCommand(bus command.Bus) {
  cmd := command.New("cmd-name", somePayload{...})
  err := bus.Dispatch(context.TODO(), cmd)
}

Synchronous Dispatch

By default, dispatches are asynchronous: Dispatch blocks only until a command handler has been selected, not until the command has actually been handled. Errors that occur while the command is being handled are therefore not reported back to the dispatcher. To block the dispatch until the command has been handled, pass the dispatch.Sync() dispatch option. With this option, the command.Bus.Dispatch method also returns the error of the command handler.

package example

import (
  "context"

  "github.com/modernice/goes/command"
  "github.com/modernice/goes/command/cmdbus/dispatch"
)

func syncDispatch(bus command.Bus) {
  cmd := command.New("cmd-name", somePayload{...})
  err := bus.Dispatch(context.TODO(), cmd, dispatch.Sync())
}

Command Handling

The command.Bus.Subscribe method subscribes to commands and returns two channels: one for the incoming commands and one for any asynchronous errors of the command bus. These channels allow for flexible handling of commands and errors, but they require quite a bit of boilerplate code to actually handle commands. The command.Handler type provides convenience methods around the command bus to allow for simpler command handling.

// Package auth is an example authentication service.
package auth

// All the previous code ...

func HandleCommands(
  ctx context.Context,
  bus command.Bus,
  repo aggregate.Repository,
) <-chan error {
  h := command.NewHandler(bus)

  registerErrors := h.MustHandle(
    ctx, RegisterUserCmd,
    func(ctx command.Context) error {
      load := ctx.Payload().(registerUserPayload)

      u := NewUser(ctx.AggregateID())

      if err := repo.Fetch(ctx, u); err != nil {
        return fmt.Errorf("fetch user: %w [id=%v]", err, ctx.AggregateID())
      }

      if err := u.Register(load.Name, load.Email); err != nil {
        return err
      }

      if err := repo.Save(ctx, u); err != nil {
        return fmt.Errorf("save user: %w", err)
      }

      return nil
    },
  )

  return registerErrors
}
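
For comparison, the boilerplate that command.Handler hides looks roughly like the loop below. This is a standard-library sketch under stated assumptions: cmd is a simplified, hypothetical stand-in for command.Context, and handleLoop and runLoop are illustrative names that do not exist in goes.

```go
package main

import (
	"context"
	"errors"
)

// cmd is a simplified, hypothetical stand-in for command.Context.
type cmd struct {
	name    string
	payload interface{}
}

// handleLoop receives commands and bus errors until ctx is canceled or the
// commands channel is closed. Handler errors and bus errors are forwarded to
// the returned channel, which is closed when the loop exits.
func handleLoop(
	ctx context.Context,
	commands <-chan cmd,
	busErrs <-chan error,
	handle func(cmd) error,
) <-chan error {
	out := make(chan error)

	// push forwards err to out and reports false if ctx was canceled first.
	push := func(err error) bool {
		select {
		case out <- err:
			return true
		case <-ctx.Done():
			return false
		}
	}

	go func() {
		defer close(out)
		for {
			select {
			case <-ctx.Done():
				return
			case err, ok := <-busErrs:
				if !ok {
					busErrs = nil // a nil channel is never selected again
					continue
				}
				if !push(err) {
					return
				}
			case c, ok := <-commands:
				if !ok {
					return
				}
				if err := handle(c); err != nil && !push(err) {
					return
				}
			}
		}
	}()

	return out
}

// runLoop feeds two commands through handleLoop and collects the errors.
func runLoop() []string {
	commands := make(chan cmd, 2)
	commands <- cmd{name: "ok"}
	commands <- cmd{name: "fail"}
	close(commands)

	errs := handleLoop(context.Background(), commands, nil, func(c cmd) error {
		if c.name == "fail" {
			return errors.New("handle " + c.name + ": boom")
		}
		return nil
	})

	var got []string
	for err := range errs {
		got = append(got, err.Error())
	}
	return got
}
```

Both channels have to be served from the same select statement. Reading only the command channel would leave bus errors unreceived, and the sender of those errors could block.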

Built-in Commands

goes provides built-in, commonly needed commands and command handlers.

package example

import (
  "context"
  "log"

  "github.com/modernice/goes/aggregate/repository"
  "github.com/modernice/goes/codec"
  "github.com/modernice/goes/command/builtin"
  "github.com/modernice/goes/command/cmdbus"
  "github.com/modernice/goes/event"
)

func example(ebus event.Bus) {
  ereg := codec.New() // Event registry
  creg := codec.New() // Command registry
  cbus := cmdbus.New(creg, ereg, ebus)
  repo := repository.New(ebus)
  
  builtin.RegisterEvents(ereg)
  builtin.RegisterCommands(creg)

  errs := builtin.MustHandle(context.TODO(), cbus, repo)

  for err := range errs {
    log.Printf("Failed to handle built-in command: %v", err)
  }
}

Delete an Aggregate

The builtin.DeleteAggregate() command deletes an aggregate by deleting its event stream from the event store.

package example

import (
  "context"

  "github.com/google/uuid"
  "github.com/modernice/goes/command"
  "github.com/modernice/goes/command/builtin"
)

func example(bus command.Bus) {
  userID := uuid.New() // Get this from somewhere
  cmd := builtin.DeleteAggregate("auth.user", userID)

  err := bus.Dispatch(context.TODO(), cmd)
  // handle err
}

Documentation

Index

Constants

This section is empty.

Variables

var (
	// ErrAlreadyFinished is returned when a Command is finished multiple times.
	ErrAlreadyFinished = errors.New("command already finished")
)
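
The sketch below shows one way a caller might treat a repeated finish as harmless by checking for this sentinel with errors.Is. It is an illustration only: cmdContext, finish and finishOnce are hypothetical stand-ins, and errAlreadyFinished is a local copy of ErrAlreadyFinished. How goes guards Finish internally is not shown in this documentation.

```go
package main

import (
	"errors"
	"sync"
)

// errAlreadyFinished stands in for command.ErrAlreadyFinished.
var errAlreadyFinished = errors.New("command already finished")

// cmdContext is a hypothetical, simplified command context.
type cmdContext struct {
	mu       sync.Mutex
	finished bool
}

// finish marks the command as handled. Only the first call succeeds.
func (c *cmdContext) finish() error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.finished {
		return errAlreadyFinished
	}
	c.finished = true
	return nil
}

// finishOnce treats a repeated finish as harmless and returns any other error.
func finishOnce(c *cmdContext) error {
	if err := c.finish(); err != nil && !errors.Is(err, errAlreadyFinished) {
		return err
	}
	return nil
}
```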

Functions

func Handle

func Handle(ctx context.Context, bus Bus, name string, handler func(Context) error) (<-chan error, error)

Handle is a shortcut for

NewHandler(bus).Handle(ctx, name, handler)

func MustHandle

func MustHandle(ctx context.Context, bus Bus, name string, handler func(Context) error) <-chan error

MustHandle is a shortcut for

NewHandler(bus).MustHandle(ctx, name, handler)

Types

type Bus

type Bus interface {
	// Dispatch sends the Command to the appropriate subscriber. Dispatch must
	// only return nil if the Command has been successfully received by a
	// subscriber.
	Dispatch(context.Context, Command, ...DispatchOption) error

	// Subscribe subscribes to Commands with the given names and returns a
	// channel of Contexts. Implementations of Bus must ensure that Commands
	// aren't received by multiple subscribers.
	Subscribe(ctx context.Context, names ...string) (<-chan Context, <-chan error, error)
}

A Bus dispatches Commands to appropriate handlers.
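
The two guarantees in the interface comments (Dispatch returns nil only once a subscriber has received the Command, and a Command is never received by more than one subscriber) can be illustrated in a single process with an unbuffered channel. This is a hypothetical sketch that assumes string payloads; localBus is not how cmdbus works, since cmdbus coordinates over the event bus.

```go
package main

import (
	"context"
	"sync"
	"sync/atomic"
	"time"
)

// localBus is a hypothetical in-process bus. All subscribers of a command
// name share one unbuffered channel, so each command is received exactly once.
type localBus struct {
	mu    sync.Mutex
	chans map[string]chan string
}

func newLocalBus() *localBus {
	return &localBus{chans: make(map[string]chan string)}
}

// channel returns the shared channel for the given command name.
func (b *localBus) channel(name string) chan string {
	b.mu.Lock()
	defer b.mu.Unlock()
	ch, ok := b.chans[name]
	if !ok {
		ch = make(chan string)
		b.chans[name] = ch
	}
	return ch
}

// subscribe returns the channel on which commands with the given name arrive.
func (b *localBus) subscribe(name string) <-chan string {
	return b.channel(name)
}

// dispatch returns nil only after a subscriber has received the payload.
func (b *localBus) dispatch(ctx context.Context, name, payload string) error {
	select {
	case b.channel(name) <- payload:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// runBus dispatches ten commands to two competing subscribers and then one
// command that nobody subscribed to.
func runBus() (received int64, unhandledErr error) {
	bus := newLocalBus()
	done := make(chan struct{})
	var wg sync.WaitGroup

	for i := 0; i < 2; i++ {
		ch := bus.subscribe("greet")
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-ch:
					atomic.AddInt64(&received, 1)
				case <-done:
					return
				}
			}
		}()
	}

	for i := 0; i < 10; i++ {
		if err := bus.dispatch(context.Background(), "greet", "hello"); err != nil {
			return atomic.LoadInt64(&received), err
		}
	}
	close(done)
	wg.Wait()

	// Nobody subscribed to "unhandled", so this dispatch fails on timeout.
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
	defer cancel()
	return atomic.LoadInt64(&received), bus.dispatch(ctx, "unhandled", "hello")
}
```

Because a send on an unbuffered channel completes only when a receiver takes the value, a successful dispatch implies receipt, and a dispatch without any subscriber fails once its context expires.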

type Cmd

type Cmd struct {
	Data Data
}

Cmd is the implementation of Command.

func New

func New(name string, pl interface{}, opts ...Option) Cmd

New returns a new command with the given name and payload.

func (Cmd) Aggregate

func (cmd Cmd) Aggregate() (uuid.UUID, string)

Aggregate returns the attached aggregate data.

func (Cmd) ID

func (cmd Cmd) ID() uuid.UUID

ID returns the command id.

func (Cmd) Name

func (cmd Cmd) Name() string

Name returns the command name.

func (Cmd) Payload

func (cmd Cmd) Payload() interface{}

Payload returns the command payload.

type Command

type Command interface {
	// ID returns the Command ID.
	ID() uuid.UUID

	// Name returns the Command name.
	Name() string

	// Payload returns the Command Payload.
	Payload() interface{}

	// Aggregate returns the attached aggregate data.
	Aggregate() (id uuid.UUID, name string)
}

A Command represents a command in the business model of an application or service. Commands can be dispatched through a Bus to handlers of such Commands.

type Context

type Context interface {
	context.Context
	Command

	// AggregateID returns the UUID of the attached aggregate, or uuid.Nil.
	AggregateID() uuid.UUID

	// AggregateName returns the name of the attached aggregate, or an empty string.
	AggregateName() string

	// Finish should be called after the Command has been handled so that the
	// Bus that dispatched the Command can be notified about the execution
	// result.
	Finish(context.Context, ...finish.Option) error
}

Context is the context for handling Commands.

type Data

type Data struct {
	ID            uuid.UUID
	Name          string
	Payload       interface{}
	AggregateName string
	AggregateID   uuid.UUID
}

Data contains the actual fields of Cmd.

type DispatchConfig

type DispatchConfig struct {
	// A synchronous dispatch waits for the execution of the Command to finish
	// and returns the execution error if there was any.
	//
	// A dispatch is automatically made synchronous when Reporter is non-nil.
	Synchronous bool

	// If Reporter is not nil, the Bus will report the execution result of a
	// Command to Reporter by calling Reporter.Report().
	//
	// A non-nil Reporter makes the dispatch synchronous.
	Reporter Reporter
}

DispatchConfig is the configuration for dispatching a Command.

type DispatchOption

type DispatchOption func(*DispatchConfig)

DispatchOption is an option for dispatching Commands.
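
A DispatchOption is a functional option: a function that mutates a DispatchConfig. The following sketch mirrors that pattern with local types. The lowercase names, the reportTo option and the string-based reporter are assumptions made to keep the example self-contained; the real Reporter receives a report.Report, and where goes applies the "non-nil Reporter implies synchronous" rule is not shown in this documentation.

```go
package main

// reporter is a simplified stand-in for command.Reporter. The real interface
// receives a report.Report; a string is used here to stay self-contained.
type reporter interface {
	Report(result string)
}

// dispatchConfig mirrors the fields of command.DispatchConfig.
type dispatchConfig struct {
	Synchronous bool
	Reporter    reporter
}

// dispatchOption mirrors command.DispatchOption.
type dispatchOption func(*dispatchConfig)

// syncOption is a local equivalent of the dispatch.Sync() option.
func syncOption() dispatchOption {
	return func(cfg *dispatchConfig) { cfg.Synchronous = true }
}

// reportTo is a hypothetical option that sets the Reporter.
func reportTo(r reporter) dispatchOption {
	return func(cfg *dispatchConfig) { cfg.Reporter = r }
}

// configure applies the options in order and then enforces the documented
// rule that a non-nil Reporter makes the dispatch synchronous.
func configure(opts ...dispatchOption) dispatchConfig {
	var cfg dispatchConfig
	for _, opt := range opts {
		opt(&cfg)
	}
	if cfg.Reporter != nil {
		cfg.Synchronous = true
	}
	return cfg
}

// recorder is a reporter that remembers the last result.
type recorder struct{ last string }

func (r *recorder) Report(result string) { r.last = result }
```

Calling configure() yields an asynchronous dispatch, while configure(syncOption()) and configure(reportTo(&recorder{})) both yield a synchronous one.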

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

Handler can be used to subscribe to and handle Commands.

func NewHandler

func NewHandler(bus Bus) *Handler

NewHandler returns a Handler for Commands that uses the provided Bus to subscribe to Commands.

func (*Handler) Handle

func (h *Handler) Handle(ctx context.Context, name string, handler func(Context) error) (<-chan error, error)

Handle registers the function handler as a handler for the given Command name. Handle subscribes to the underlying Bus for Commands with that name. When Handler is selected as the handler for a dispatched Command, handler is called with a Command Context which contains the Command and its Payload.

If Handle fails to subscribe to the Command, a nil channel and the error from the Bus are returned. Otherwise, a channel of asynchronous Command errors and a nil error are returned.

When handler returns a non-nil error, that error is pushed into the returned error channel. Asynchronous errors from the underlying Command Bus are pushed into the error channel as well. Callers must receive from the returned error channel to prevent the handler from blocking indefinitely.

When ctx is canceled, the returned error channel is closed.
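
Because every call to Handle returns its own error channel and each one has to be drained, a service with several handlers may want to merge them into a single channel. The fan-in below is a generic standard-library sketch. mergeErrors and countMerged are hypothetical helpers and are not part of goes.

```go
package main

import (
	"errors"
	"sync"
)

// mergeErrors forwards the errors of all input channels to one output
// channel. The output is closed after every input channel has been closed.
func mergeErrors(chans ...<-chan error) <-chan error {
	out := make(chan error)

	var wg sync.WaitGroup
	wg.Add(len(chans))
	for _, ch := range chans {
		go func(ch <-chan error) {
			defer wg.Done()
			for err := range ch {
				out <- err
			}
		}(ch)
	}

	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}

// countMerged simulates the error channels of two handlers and counts the
// errors that arrive on the merged channel.
func countMerged() int {
	a := make(chan error, 2)
	b := make(chan error, 1)
	a <- errors.New("register user: invalid email")
	a <- errors.New("register user: name taken")
	b <- errors.New("delete user: not found")
	close(a)
	close(b)

	n := 0
	for range mergeErrors(a, b) {
		n++
	}
	return n
}
```

The merged channel still has to be received from, for example in a single logging goroutine, since an unread error would block the forwarding goroutines and, in turn, the handlers.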

func (*Handler) MustHandle

func (h *Handler) MustHandle(ctx context.Context, name string, handler func(Context) error) <-chan error

MustHandle does the same as Handle, but panics if the subscription fails.

type Option

type Option func(*Cmd)

Option is a command option.

func Aggregate

func Aggregate(name string, id uuid.UUID) Option

Aggregate returns an Option that links a Command to an Aggregate.

func ID

func ID(id uuid.UUID) Option

ID returns an Option that overrides the auto-generated UUID of a Command.

type Reporter

type Reporter interface {
	Report(report.Report)
}

A Reporter reports execution results of a Command.

Directories

Path Synopsis
Package cmdbus provides a distributed & event-driven Command Bus.
Package mock_command is a generated GoMock package.
