Documentation ¶
Overview ¶
Mediator style message bus adapted to the Go language without requiring the reflect package. Message members are suffixed with an underscore to avoid name conflicts it a command need a field named "Name" for example.
Performance wise, it is lightly slower than direct calls but you get a lower coupling and a way to add middlewares to your handlers.
Index ¶
- Variables
- func On[TSignal Signal](bus Bus, handler SignalHandler[TSignal])
- func Register[TResult any, TMsg TypedRequest[TResult]](bus Bus, handler RequestHandler[TResult, TMsg])
- func Send[TResult any, TMsg TypedRequest[TResult]](bus Dispatcher, ctx context.Context, msg TMsg) (TResult, error)
- type Bus
- type Command
- type DefaultScheduler
- type Dispatcher
- type JobErrPolicy
- type Message
- type MessageKind
- type NextFunc
- type Notification
- type Query
- type Request
- type RequestHandler
- type ScheduledJob
- type Scheduler
- type SchedulerAdapter
- type Signal
- type SignalHandler
- type TypedRequest
- type UnitType
Constants ¶
This section is empty.
Variables ¶
var ErrNoHandlerRegistered = errors.New("no_handler_registered")
var Marshallable = storage.NewDiscriminatedMapper(func(r Request) string { return r.Name_() })
Contains message which can be unmarshalled from a raw string (= those used in the scheduler).
Functions ¶
func On ¶
func On[TSignal Signal](bus Bus, handler SignalHandler[TSignal])
Register a signal handler for the given signal. Multiple signals can be registered for the same signal and will all be called.
func Register ¶
func Register[TResult any, TMsg TypedRequest[TResult]](bus Bus, handler RequestHandler[TResult, TMsg])
Register an handler for a specific request on the provided bus.
Types ¶
type Bus ¶
type Bus interface { Dispatcher Register(Message, NextFunc) // Register an handler for a specific message kind, even if you can use this method directly, you should prefer the typed version bus.Register and bus.On }
Dispatcher with registration capabilities.
type Command ¶
type Command[T any] struct{}
Request to mutate the system. Implements the TypedRequest interface.
func (Command[T]) Kind_ ¶
func (Command[T]) Kind_() MessageKind
type DefaultScheduler ¶
type DefaultScheduler struct {
// contains filtered or unexported fields
}
func NewScheduler ¶
func NewScheduler(adapter SchedulerAdapter, log log.Logger, bus Dispatcher) *DefaultScheduler
Builds up a new scheduler used to queue messages for later dispatching using the provided adapter.
func (*DefaultScheduler) GetNextPendingJobs ¶
func (s *DefaultScheduler) GetNextPendingJobs(ctx context.Context) ([]ScheduledJob, error)
func (*DefaultScheduler) Process ¶
func (s *DefaultScheduler) Process(ctx context.Context, job ScheduledJob) error
type Dispatcher ¶
type Dispatcher interface { Send(ctx context.Context, msg Request) (any, error) // Send the given request message to the bus Notify(ctx context.Context, msgs ...Signal) error // Call every signal handlers attached to the given signals }
Dispatcher is the interface used to send messages to the bus.
type JobErrPolicy ¶
type JobErrPolicy uint8
const ( JobErrPolicyRetry JobErrPolicy = iota // Retry the job if it fails JobErrPolicyIgnore // Mark the job as done even if an error is returned )
type Message ¶
type Message interface { Name_() string // Unique name of the message (here to not require reflection) Kind_() MessageKind // Type of the message to be able to customize middlewares }
Message which can be sent in the bus and handled by a registered handler.
type MessageKind ¶
type MessageKind uint8
Represent the kind of a message being dispatched. This is especially useful for middlewares to adapt their behavior depending on the message kind.
For example, a command may need a transaction whereas a query may not.
const ( MessageKindNotification MessageKind = iota MessageKindCommand MessageKindQuery )
type Notification ¶
type Notification struct{}
Message without result implementing the Signal interface.
func (Notification) Kind_ ¶
func (Notification) Kind_() MessageKind
type Query ¶
type Query[T any] struct{}
Request to query the system. Implements the TypedRequest interface.
func (Query[T]) Kind_ ¶
func (Query[T]) Kind_() MessageKind
type Request ¶
type Request interface { Message // contains filtered or unexported methods }
Message which requires a result.
type RequestHandler ¶
type RequestHandler[TResult any, TMsg TypedRequest[TResult]] func(context.Context, TMsg) (TResult, error)
Handler for a specific message.
type ScheduledJob ¶
type ScheduledJob interface { ID() string // Unique id of the job Message() Request // Message to be dispatched Policy() JobErrPolicy // What to do when the dispatch has failed }
Represents a request that has been queued for dispatching.
type Scheduler ¶
type Scheduler interface { // Queue a request to be dispatched asynchronously at a later time. // The string parameter is the dedupe name and provide a way to avoid multiple // messages sharing the same dedupe name to be processed at the same time. // // You MUST register the type of Request using bus.RegisterForMarshalling[YourRequest]() // to make sure the scheduler will be able to (de)serialize the request when persisting. Queue(context.Context, Request, monad.Maybe[string], JobErrPolicy) error }
Enable scheduled dispatching of a message.
type SchedulerAdapter ¶
type SchedulerAdapter interface { Setup() error // Setup the adapter Create(context.Context, Request, monad.Maybe[string], JobErrPolicy) error // Create a new scheduled job GetNextPendingJobs(context.Context) ([]ScheduledJob, error) // Get the next pending jobs to be dispatched Retry(context.Context, ScheduledJob, error) error // Retry the given job with the given reason Done(context.Context, ScheduledJob) error // Mark the given job as done }
Adapter used to store scheduled jobs. Could be anything from a database to a file or an in-memory store.
type Signal ¶
type Signal interface { Message // contains filtered or unexported methods }
Signal which do not need a result.
type SignalHandler ¶
Handler for signal.
type TypedRequest ¶
Request with a typed result.