Documentation ¶
Index ¶
- func NewFacade(redisClient redis.UniversalClient, logger Logger, router *message.Router, ...) (*cqrs.Facade, error)
- func NewPublisher(redisClient redis.UniversalClient, logger Logger) (*redisstream.Publisher, error)
- func NewRouter(logger Logger, maxRetry int) (*message.Router, error)
- func NewSubscriber(redisClient redis.UniversalClient, consumerGroup string, logger Logger) (message.Subscriber, error)
- type CommandBus
- type CommandHandler
- type CommanfHandlerFactory
- type EventBus
- type EventHandler
- type EventHandlerFactory
- type Logger
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewFacade ¶
func NewFacade(
	redisClient redis.UniversalClient,
	logger Logger,
	router *message.Router,
	commandsPublisher, eventsPublisher message.Publisher,
	commandsSubscriber message.Subscriber,
	commandHandlers []CommanfHandlerFactory,
	eventHandlers []EventHandlerFactory,
) (*cqrs.Facade, error)
NewFacade creates a new CQRS Facade. Read more about the CQRS component: https://watermill.io/docs/cqrs/
func NewPublisher ¶
func NewPublisher(redisClient redis.UniversalClient, logger Logger) (*redisstream.Publisher, error)
NewPublisher initializes the events publisher based on the Redis stream client.
func NewRouter ¶
func NewRouter(logger Logger, maxRetry int) (*message.Router, error)
NewRouter creates a new message.Router. Detailed documentation: https://watermill.io/docs/messages-router/
func NewSubscriber ¶
func NewSubscriber(redisClient redis.UniversalClient, consumerGroup string, logger Logger) (message.Subscriber, error)
NewSubscriber creates a new subscriber for the given consumer group. If the consumer group is empty, fan-out mode is used.
Types ¶
type CommandBus ¶
type CommandBus interface {
	// Send sends command to the command bus.
	Send(ctx context.Context, cmd interface{}) error
}
CommandBus is the interface used to send commands to the command bus.
type CommandHandler ¶
type CommandHandler interface {
	// HandlerName is the name used in message.Router while creating handler.
	//
	// It will be also passed to CommandsSubscriberConstructor.
	// May be useful, for example, to create a consumer group per each handler.
	//
	// WARNING: If HandlerName was changed and is used for generating consumer groups,
	// it may result with **reconsuming all messages**!
	HandlerName() string

	NewCommand() interface{}

	Handle(ctx context.Context, cmd interface{}) error
}
CommandHandler receives a command defined by NewCommand and handles it with the Handle method. If using DDD, CommandHandler may modify and persist the aggregate.
In contrast to EventHandler, every Command must have only one CommandHandler.
A single CommandHandler instance is used to handle all messages. When multiple commands are delivered at the same time, the Handle method may be executed concurrently. Because of that, the Handle method must be thread-safe.
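A minimal sketch of the thread-safety requirement, using only the standard library. The CommandHandler interface is copied locally so the example compiles on its own; DepositMoney, depositHandler, and runConcurrentDeposits are hypothetical names for illustration and are not part of this package.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// CommandHandler mirrors the interface documented above. It is declared
// locally so that this sketch compiles without external dependencies.
type CommandHandler interface {
	HandlerName() string
	NewCommand() interface{}
	Handle(ctx context.Context, cmd interface{}) error
}

// DepositMoney is a hypothetical command used only for illustration.
type DepositMoney struct {
	Amount int
}

// depositHandler keeps a running balance that is shared by all Handle calls.
type depositHandler struct {
	mu      sync.Mutex
	balance int
}

func (h *depositHandler) HandlerName() string     { return "depositHandler" }
func (h *depositHandler) NewCommand() interface{} { return &DepositMoney{} }

// Handle may run concurrently, so the shared balance is guarded by a mutex.
func (h *depositHandler) Handle(ctx context.Context, cmd interface{}) error {
	c, ok := cmd.(*DepositMoney)
	if !ok {
		return fmt.Errorf("unexpected command type %T", cmd)
	}
	h.mu.Lock()
	defer h.mu.Unlock()
	h.balance += c.Amount
	return nil
}

// runConcurrentDeposits calls Handle from n goroutines at once and returns
// the final balance.
func runConcurrentDeposits(n int) int {
	h := &depositHandler{}
	var handler CommandHandler = h
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = handler.Handle(context.Background(), &DepositMoney{Amount: 1})
		}()
	}
	wg.Wait()
	h.mu.Lock()
	defer h.mu.Unlock()
	return h.balance
}

func main() {
	fmt.Println(runConcurrentDeposits(100)) // prints 100
}
```

Without the mutex, the concurrent increments would race and the final balance could be lower than the number of delivered commands.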
type CommanfHandlerFactory ¶
type CommanfHandlerFactory func(cb CommandBus, eb EventBus) CommandHandler
CommanfHandlerFactory is a function that creates a new CommandHandler.
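As a rough sketch of how such a factory might look, the function below captures the EventBus it receives and returns a handler that publishes an event for each handled command. The interfaces are copied locally so the example is self-contained; BookRoom, RoomBooked, bookRoomHandler, recordingEventBus, and publishedRoomID are hypothetical names, and the in-memory bus is only a stand-in for a real one.

```go
package main

import (
	"context"
	"fmt"
)

// Local copies of the types documented in this package, declared here so
// the sketch compiles without external dependencies.
type CommandBus interface {
	Send(ctx context.Context, cmd interface{}) error
}

type EventBus interface {
	Publish(ctx context.Context, event interface{}) error
}

type CommandHandler interface {
	HandlerName() string
	NewCommand() interface{}
	Handle(ctx context.Context, cmd interface{}) error
}

type CommanfHandlerFactory func(cb CommandBus, eb EventBus) CommandHandler

// BookRoom and RoomBooked are a hypothetical command and event.
type BookRoom struct{ RoomID string }
type RoomBooked struct{ RoomID string }

// bookRoomHandler publishes RoomBooked after handling a BookRoom command.
type bookRoomHandler struct {
	eventBus EventBus
}

func (h bookRoomHandler) HandlerName() string     { return "bookRoomHandler" }
func (h bookRoomHandler) NewCommand() interface{} { return &BookRoom{} }

func (h bookRoomHandler) Handle(ctx context.Context, cmd interface{}) error {
	c, ok := cmd.(*BookRoom)
	if !ok {
		return fmt.Errorf("unexpected command type %T", cmd)
	}
	return h.eventBus.Publish(ctx, &RoomBooked{RoomID: c.RoomID})
}

// newBookRoomHandler has the CommanfHandlerFactory signature. It ignores the
// command bus because this handler only needs to publish events.
func newBookRoomHandler(cb CommandBus, eb EventBus) CommandHandler {
	return bookRoomHandler{eventBus: eb}
}

// recordingEventBus is an in-memory stand-in that stores published events.
type recordingEventBus struct {
	events []interface{}
}

func (b *recordingEventBus) Publish(ctx context.Context, event interface{}) error {
	b.events = append(b.events, event)
	return nil
}

// publishedRoomID builds a handler through the factory, handles one command,
// and returns the room ID carried by the published event.
func publishedRoomID(roomID string) (string, error) {
	bus := &recordingEventBus{}
	var factory CommanfHandlerFactory = newBookRoomHandler
	handler := factory(nil, bus)
	if err := handler.Handle(context.Background(), &BookRoom{RoomID: roomID}); err != nil {
		return "", err
	}
	if len(bus.events) != 1 {
		return "", fmt.Errorf("expected 1 event, got %d", len(bus.events))
	}
	ev, ok := bus.events[0].(*RoomBooked)
	if !ok {
		return "", fmt.Errorf("unexpected event type %T", bus.events[0])
	}
	return ev.RoomID, nil
}

func main() {
	id, err := publishedRoomID("42")
	fmt.Println(id, err)
}
```

Passing the buses through the factory lets a handler send follow-up commands or publish events without reaching for global state.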
type EventBus ¶
type EventBus interface {
	// Publish sends event to the event bus.
	Publish(ctx context.Context, event interface{}) error
}
EventBus is the interface used to publish events to the event bus.
type EventHandler ¶
type EventHandler interface {
	// HandlerName is the name used in message.Router while creating handler.
	//
	// It will be also passed to EventsSubscriberConstructor.
	// May be useful, for example, to create a consumer group per each handler.
	//
	// WARNING: If HandlerName was changed and is used for generating consumer groups,
	// it may result with **reconsuming all messages** !!!
	HandlerName() string

	NewEvent() interface{}

	Handle(ctx context.Context, event interface{}) error
}
EventHandler receives events defined by NewEvent and handles them with its Handle method. If using DDD, CommandHandler may modify and persist the aggregate. It can also invoke a process manager, a saga or just build a read model.
In contrast to CommandHandler, every Event can have multiple EventHandlers.
A single EventHandler instance is used to handle all messages. When multiple events are delivered at the same time, the Handle method may be executed concurrently. Because of that, the Handle method must be thread-safe.
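The sketch below illustrates the two points above under simplifying assumptions: one event is delivered to several handlers, and each handler keeps its state safe for concurrent use with sync/atomic. The EventHandler interface is copied locally; OrderPlaced, revenueReadModel, orderCounter, dispatch, and runDispatch are hypothetical names, and dispatch is a synchronous stand-in for what a message router would do, not the actual routing mechanism.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
)

// EventHandler mirrors the interface documented above, declared locally so
// the sketch compiles without external dependencies.
type EventHandler interface {
	HandlerName() string
	NewEvent() interface{}
	Handle(ctx context.Context, event interface{}) error
}

// OrderPlaced is a hypothetical event.
type OrderPlaced struct {
	Total int64
}

// revenueReadModel sums order totals, as a simple read model would.
type revenueReadModel struct {
	total int64
}

func (h *revenueReadModel) HandlerName() string   { return "revenueReadModel" }
func (h *revenueReadModel) NewEvent() interface{} { return &OrderPlaced{} }

func (h *revenueReadModel) Handle(ctx context.Context, event interface{}) error {
	e, ok := event.(*OrderPlaced)
	if !ok {
		return fmt.Errorf("unexpected event type %T", event)
	}
	atomic.AddInt64(&h.total, e.Total) // safe under concurrent Handle calls
	return nil
}

// orderCounter counts the same events independently of revenueReadModel.
type orderCounter struct {
	count int64
}

func (h *orderCounter) HandlerName() string   { return "orderCounter" }
func (h *orderCounter) NewEvent() interface{} { return &OrderPlaced{} }

func (h *orderCounter) Handle(ctx context.Context, event interface{}) error {
	if _, ok := event.(*OrderPlaced); !ok {
		return fmt.Errorf("unexpected event type %T", event)
	}
	atomic.AddInt64(&h.count, 1)
	return nil
}

// dispatch delivers one event to every handler in turn.
func dispatch(ctx context.Context, event interface{}, handlers ...EventHandler) error {
	for _, h := range handlers {
		if err := h.Handle(ctx, event); err != nil {
			return fmt.Errorf("%s: %w", h.HandlerName(), err)
		}
	}
	return nil
}

// runDispatch sends three events to both handlers and reports their state.
func runDispatch() (total, count int64, err error) {
	revenue := &revenueReadModel{}
	counter := &orderCounter{}
	for _, amount := range []int64{10, 20, 30} {
		if err := dispatch(context.Background(), &OrderPlaced{Total: amount}, revenue, counter); err != nil {
			return 0, 0, err
		}
	}
	return atomic.LoadInt64(&revenue.total), atomic.LoadInt64(&counter.count), nil
}

func main() {
	total, count, err := runDispatch()
	fmt.Println(total, count, err)
}
```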
type EventHandlerFactory ¶
type EventHandlerFactory func(cb CommandBus, eb EventBus) EventHandler
EventHandlerFactory is a function that creates a new EventHandler.
type Logger ¶
type Logger interface {
	Error(msg string, err error, fields watermill.LogFields)
	Info(msg string, fields watermill.LogFields)
	Debug(msg string, fields watermill.LogFields)
	Trace(msg string, fields watermill.LogFields)
	With(fields watermill.LogFields) watermill.LoggerAdapter
}
Logger is the interface you need to implement to support Watermill logging. You can use watermill.StdLoggerAdapter as a reference implementation.
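A minimal sketch of an implementation with the same method set, assuming local stand-ins for the Watermill types so that it compiles with the standard library alone: LogFields and LoggerAdapter below are local copies, and writerLogger, mergeFields, and demoLogLine are hypothetical names. In real code the methods would take watermill.LogFields and With would return watermill.LoggerAdapter.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"sort"
	"strings"
)

// LogFields and LoggerAdapter are local stand-ins for the Watermill types
// of the same names.
type LogFields map[string]interface{}

type LoggerAdapter interface {
	Error(msg string, err error, fields LogFields)
	Info(msg string, fields LogFields)
	Debug(msg string, fields LogFields)
	Trace(msg string, fields LogFields)
	With(fields LogFields) LoggerAdapter
}

// writerLogger writes one line per log call. The writer must itself be safe
// for concurrent use if the logger is shared between goroutines.
type writerLogger struct {
	out    io.Writer
	fields LogFields
}

// mergeFields returns a new map holding a's entries overridden by b's.
func mergeFields(a, b LogFields) LogFields {
	merged := make(LogFields, len(a)+len(b))
	for k, v := range a {
		merged[k] = v
	}
	for k, v := range b {
		merged[k] = v
	}
	return merged
}

// log formats the level, message, and fields in sorted key order.
func (l writerLogger) log(level, msg string, fields LogFields) {
	merged := mergeFields(l.fields, fields)
	keys := make([]string, 0, len(merged))
	for k := range merged {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	var sb strings.Builder
	sb.WriteString(level + " " + msg)
	for _, k := range keys {
		fmt.Fprintf(&sb, " %s=%v", k, merged[k])
	}
	fmt.Fprintln(l.out, sb.String())
}

func (l writerLogger) Error(msg string, err error, fields LogFields) {
	l.log("ERROR", msg, mergeFields(fields, LogFields{"err": err}))
}
func (l writerLogger) Info(msg string, fields LogFields)  { l.log("INFO", msg, fields) }
func (l writerLogger) Debug(msg string, fields LogFields) { l.log("DEBUG", msg, fields) }
func (l writerLogger) Trace(msg string, fields LogFields) { l.log("TRACE", msg, fields) }

// With returns a logger that adds the given fields to every line.
func (l writerLogger) With(fields LogFields) LoggerAdapter {
	return writerLogger{out: l.out, fields: mergeFields(l.fields, fields)}
}

// demoLogLine logs one message through a derived logger and returns the output.
func demoLogLine() string {
	var buf bytes.Buffer
	logger := writerLogger{out: &buf}.With(LogFields{"component": "router"})
	logger.Info("started", LogFields{"handlers": 2})
	return buf.String()
}

func main() {
	fmt.Print(demoLogLine()) // prints "INFO started component=router handlers=2"
}
```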
func NewLogrusWrapper ¶
NewLogrusWrapper returns a new instance of logrusWrapper.