cqrs

package
v0.1.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 5, 2023 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewFacade

func NewFacade(
	redisClient redis.UniversalClient, logger Logger, router *message.Router,
	commandsPublisher, eventsPublisher message.Publisher,
	commandsSubscriber message.Subscriber,
	commandHandlers []CommanfHandlerFactory, eventHandlers []EventHandlerFactory,
) (*cqrs.Facade, error)

NewFacade creates a new CQRS Facade. Read more about cqrs component: https://watermill.io/docs/cqrs/

func NewPublisher

func NewPublisher(redisClient redis.UniversalClient, logger Logger) (*redisstream.Publisher, error)

NewPublisher initializes the events publisher based on the Redis stream client.

func NewRouter

func NewRouter(logger Logger, maxRetry int) (*message.Router, error)

NewRouter creates a new cqrs.Router. Detailed documentation: https://watermill.io/docs/messages-router/

func NewSubscriber

func NewSubscriber(redisClient redis.UniversalClient, consumerGroup string, logger Logger) (message.Subscriber, error)

NewSubscriber creates a new subscriber for the given consumer group. If consumer group empty, fan-out mode will be used.

Types

type CommandBus

type CommandBus interface {
	// Send sends command to the command bus.
	Send(ctx context.Context, cmd interface{}) error
}

CommandBus interface

type CommandHandler

type CommandHandler interface {
	// HandlerName is the name used in message.Router while creating handler.
	//
	// It will be also passed to CommandsSubscriberConstructor.
	// May be useful, for example, to create a consumer group per each handler.
	//
	// WARNING: If HandlerName was changed and is used for generating consumer groups,
	// it may result with **reconsuming all messages**!
	HandlerName() string

	NewCommand() interface{}

	Handle(ctx context.Context, cmd interface{}) error
}

CommandHandler receives a command defined by NewCommand and handles it with the Handle method. If using DDD, CommandHandler may modify and persist the aggregate.

In contrast to EventHandler, every Command must have only one CommandHandler.

One instance of CommandHandler is used during handling messages. When multiple commands are delivered at the same time, Handle method can be executed multiple times at the same time. Because of that, Handle method needs to be thread safe!

type CommanfHandlerFactory

type CommanfHandlerFactory func(cb CommandBus, eb EventBus) CommandHandler

CommanfHandlerFactory is a function that creates a new CommandHandler.

type EventBus

type EventBus interface {
	// Publish sends event to the event bus.
	Publish(ctx context.Context, event interface{}) error
}

EventBus interface

type EventHandler

type EventHandler interface {
	// HandlerName is the name used in message.Router while creating handler.
	//
	// It will be also passed to EventsSubscriberConstructor.
	// May be useful, for example, to create a consumer group per each handler.
	//
	// WARNING: If HandlerName was changed and is used for generating consumer groups,
	// it may result with **reconsuming all messages** !!!
	HandlerName() string

	NewEvent() interface{}

	Handle(ctx context.Context, event interface{}) error
}

EventHandler receives events defined by NewEvent and handles them with its Handle method. If using DDD, CommandHandler may modify and persist the aggregate. It can also invoke a process manager, a saga or just build a read model.

In contrast to CommandHandler, every Event can have multiple EventHandlers.

One instance of EventHandler is used during handling messages. When multiple events are delivered at the same time, Handle method can be executed multiple times at the same time. Because of that, Handle method needs to be thread safe!

type EventHandlerFactory

type EventHandlerFactory func(cb CommandBus, eb EventBus) EventHandler

EventHandlerFactory is a function that creates a new EventHandler.

type Logger

type Logger interface {
	Error(msg string, err error, fields watermill.LogFields)
	Info(msg string, fields watermill.LogFields)
	Debug(msg string, fields watermill.LogFields)
	Trace(msg string, fields watermill.LogFields)
	With(fields watermill.LogFields) watermill.LoggerAdapter
}

Logger is an interface, that you need to implement to support Watermill logging. You can use watermill.StdLoggerAdapter as a reference implementation.

func NewLogrusWrapper

func NewLogrusWrapper(log *logrus.Entry) Logger

NewLogrusWrapper returns new instance of logrusWrapper

Directories

Path Synopsis
_example
app
cmd

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL