bus

package
v1.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 21, 2023 License: GPL-3.0 Imports: 5 Imported by: 0

Documentation

Overview

Mediator style message bus adapted to the Go language without requiring the reflect package. Message members are suffixed with an underscore to avoid name conflicts it a command need a field named "Name" for example.

Performance wise, it is lightly slower than direct calls but you get a lower coupling and a way to add middlewares to your handlers.

Index

Constants

This section is empty.

Variables

View Source
var ErrNoHandlerRegistered = errors.New("no_handler_registered")
View Source
var Marshallable = storage.NewDiscriminatedMapper(func(r Request) string { return r.Name_() })

Contains message which can be unmarshalled from a raw string (= those used in the scheduler).

Functions

func On

func On[TSignal Signal](bus Bus, handler SignalHandler[TSignal])

Register a signal handler for the given signal. Multiple signals can be registered for the same signal and will all be called.

func Register

func Register[TResult any, TMsg TypedRequest[TResult]](bus Bus, handler RequestHandler[TResult, TMsg])

Register an handler for a specific request on the provided bus.

func Send

func Send[TResult any, TMsg TypedRequest[TResult]](bus Dispatcher, ctx context.Context, msg TMsg) (TResult, error)

Send the given message to the bus and return the result. This method ensure type safety when dispatching a request.

Types

type Bus

type Bus interface {
	Dispatcher
	Register(Message, NextFunc) // Register an handler for a specific message kind, even if you can use this method directly, you should prefer the typed version bus.Register and bus.On
}

Dispatcher with registration capabilities.

type Command

type Command[T any] struct{}

Request to mutate the system. Implements the TypedRequest interface.

func (Command[T]) Kind_

func (Command[T]) Kind_() MessageKind

type DefaultScheduler

type DefaultScheduler struct {
	// contains filtered or unexported fields
}

func NewScheduler

func NewScheduler(adapter SchedulerAdapter, log log.Logger, bus Dispatcher) *DefaultScheduler

Builds up a new scheduler used to queue messages for later dispatching using the provided adapter.

func (*DefaultScheduler) GetNextPendingJobs

func (s *DefaultScheduler) GetNextPendingJobs(ctx context.Context) ([]ScheduledJob, error)

func (*DefaultScheduler) Process

func (s *DefaultScheduler) Process(ctx context.Context, job ScheduledJob) error

func (*DefaultScheduler) Queue

func (s *DefaultScheduler) Queue(
	ctx context.Context,
	msg Request,
	dedupeName monad.Maybe[string],
	policy JobErrPolicy,
) error

type Dispatcher

type Dispatcher interface {
	Send(ctx context.Context, msg Request) (any, error) // Send the given request message to the bus
	Notify(ctx context.Context, msgs ...Signal) error   // Call every signal handlers attached to the given signals
}

Dispatcher is the interface used to send messages to the bus.

type JobErrPolicy

type JobErrPolicy uint8
const (
	JobErrPolicyRetry  JobErrPolicy = iota // Retry the job if it fails
	JobErrPolicyIgnore                     // Mark the job as done even if an error is returned
)

type Message

type Message interface {
	Name_() string      // Unique name of the message (here to not require reflection)
	Kind_() MessageKind // Type of the message to be able to customize middlewares
}

Message which can be sent in the bus and handled by a registered handler.

type MessageKind

type MessageKind uint8

Represent the kind of a message being dispatched. This is especially useful for middlewares to adapt their behavior depending on the message kind.

For example, a command may need a transaction whereas a query may not.

const (
	MessageKindNotification MessageKind = iota
	MessageKindCommand
	MessageKindQuery
)

type NextFunc

type NextFunc func(context.Context, Message) (any, error)

Generic handler (as seen by middlewares).

type Notification

type Notification struct{}

Message without result implementing the Signal interface.

func (Notification) Kind_

func (Notification) Kind_() MessageKind

type Query

type Query[T any] struct{}

Request to query the system. Implements the TypedRequest interface.

func (Query[T]) Kind_

func (Query[T]) Kind_() MessageKind

type Request

type Request interface {
	Message
	// contains filtered or unexported methods
}

Message which requires a result.

type RequestHandler

type RequestHandler[TResult any, TMsg TypedRequest[TResult]] func(context.Context, TMsg) (TResult, error)

Handler for a specific message.

type ScheduledJob

type ScheduledJob interface {
	ID() string           // Unique id of the job
	Message() Request     // Message to be dispatched
	Policy() JobErrPolicy // What to do when the dispatch has failed
}

Represents a request that has been queued for dispatching.

type Scheduler

type Scheduler interface {
	// Queue a request to be dispatched asynchronously at a later time.
	// The string parameter is the dedupe name and provide a way to avoid multiple
	// messages sharing the same dedupe name to be processed at the same time.
	//
	// You MUST register the type of Request using bus.RegisterForMarshalling[YourRequest]()
	// to make sure the scheduler will be able to (de)serialize the request when persisting.
	Queue(context.Context, Request, monad.Maybe[string], JobErrPolicy) error
}

Enable scheduled dispatching of a message.

type SchedulerAdapter

type SchedulerAdapter interface {
	Setup() error                                                             // Setup the adapter
	Create(context.Context, Request, monad.Maybe[string], JobErrPolicy) error // Create a new scheduled job
	GetNextPendingJobs(context.Context) ([]ScheduledJob, error)               // Get the next pending jobs to be dispatched
	Retry(context.Context, ScheduledJob, error) error                         // Retry the given job with the given reason
	Done(context.Context, ScheduledJob) error                                 // Mark the given job as done
}

Adapter used to store scheduled jobs. Could be anything from a database to a file or an in-memory store.

type Signal

type Signal interface {
	Message
	// contains filtered or unexported methods
}

Signal which do not need a result.

type SignalHandler

type SignalHandler[TSignal Signal] func(context.Context, TSignal) error

Handler for signal.

type TypedRequest

type TypedRequest[T any] interface {
	Request
	// contains filtered or unexported methods
}

Request with a typed result.

type UnitType

type UnitType uint8

Sometimes, you may not need a result type on a request but the RequestHandler expect one, just use this type as the result type and the bus.Unit as the return value.

const Unit UnitType = iota

Constant unit value to return when a request does not need a specific result set. In my mind, it should avoid the cost of allocating something not needed.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL