Documentation ¶
Index ¶
- func CtxWithOriginalMessage(ctx context.Context, msg *message.Message) context.Context
- func FullyQualifiedStructName(v interface{}) string
- func NamedStruct(fallback func(v interface{}) string) func(v interface{}) string
- func OriginalMessageFromCtx(ctx context.Context) *message.Message
- func StructName(v interface{}) string
- type CommandBus
- type CommandBusConfig
- type CommandBusGeneratePublishTopicFn
- type CommandBusGeneratePublishTopicParams
- type CommandBusOnSendFn
- type CommandBusOnSendParams
- type CommandEventMarshaler
- type CommandHandler
- type CommandProcessor
- type CommandProcessorConfig
- type CommandProcessorGenerateSubscribeTopicFn
- type CommandProcessorGenerateSubscribeTopicParams
- type CommandProcessorOnHandleFn
- type CommandProcessorOnHandleParams
- type CommandProcessorSubscriberConstructorFn
- type CommandProcessorSubscriberConstructorParams
- type CommandsSubscriberConstructordeprecated
- type DuplicateCommandHandlerError
- type EventBus
- type EventBusConfig
- type EventGroupProcessor
- type EventGroupProcessorConfig
- type EventGroupProcessorGenerateSubscribeTopicFn
- type EventGroupProcessorGenerateSubscribeTopicParams
- type EventGroupProcessorOnHandleFn
- type EventGroupProcessorOnHandleParams
- type EventGroupProcessorSubscriberConstructorFn
- type EventGroupProcessorSubscriberConstructorParams
- type EventHandler
- type EventProcessor
- type EventProcessorConfig
- type EventProcessorGenerateSubscribeTopicFn
- type EventProcessorGenerateSubscribeTopicParams
- type EventProcessorOnHandleFn
- type EventProcessorOnHandleParams
- type EventProcessorSubscriberConstructorFn
- type EventProcessorSubscriberConstructorParams
- type EventsSubscriberConstructor
- type Facadedeprecated
- type FacadeConfigdeprecated
- type GenerateEventPublishTopicFn
- type GenerateEventPublishTopicParams
- type GroupEventHandler
- type JSONMarshaler
- type NoProtoMessageError
- type NonPointerError
- type OnEventSendFn
- type OnEventSendParams
- type ProtobufMarshaler
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CtxWithOriginalMessage ¶ added in v1.3.5
CtxWithOriginalMessage returns a new context with the original message attached.
func FullyQualifiedStructName ¶ added in v1.0.0
func FullyQualifiedStructName(v interface{}) string
FullyQualifiedStructName name returns object name in format [package].[type name]. It ignores if the value is a pointer or not.
func NamedStruct ¶ added in v1.0.0
NamedStruct returns the name from a message implementing the following interface:
type namedStruct interface { Name() string }
It ignores if the value is a pointer or not.
func OriginalMessageFromCtx ¶ added in v1.3.5
OriginalMessageFromCtx returns the original message that was received by the event/command handler.
func StructName ¶ added in v1.0.0
func StructName(v interface{}) string
StructName name returns struct name in format [type name]. It ignores if the value is a pointer or not.
Types ¶
type CommandBus ¶
type CommandBus struct {
// contains filtered or unexported fields
}
CommandBus transports commands to command handlers.
func NewCommandBus ¶
func NewCommandBus( publisher message.Publisher, generateTopic func(commandName string) string, marshaler CommandEventMarshaler, ) (*CommandBus, error)
NewCommandBus creates a new CommandBus. Deprecated: use NewCommandBusWithConfig instead.
func NewCommandBusWithConfig ¶ added in v1.3.0
func NewCommandBusWithConfig(publisher message.Publisher, config CommandBusConfig) (*CommandBus, error)
NewCommandBusWithConfig creates a new CommandBus.
func (CommandBus) Send ¶
func (c CommandBus) Send(ctx context.Context, cmd any) error
Send sends command to the command bus.
func (CommandBus) SendWithModifiedMessage ¶ added in v1.3.5
type CommandBusConfig ¶ added in v1.3.0
type CommandBusConfig struct { // GeneratePublishTopic is used to generate topic for publishing command. GeneratePublishTopic CommandBusGeneratePublishTopicFn // OnSend is called before publishing the command. // The *message.Message can be modified. // // This option is not required. OnSend CommandBusOnSendFn // Marshaler is used to marshal and unmarshal commands. // It is required. Marshaler CommandEventMarshaler // Logger instance used to log. // If not provided, watermill.NopLogger is used. Logger watermill.LoggerAdapter }
func (CommandBusConfig) Validate ¶ added in v1.3.0
func (c CommandBusConfig) Validate() error
type CommandBusGeneratePublishTopicFn ¶ added in v1.3.0
type CommandBusGeneratePublishTopicFn func(CommandBusGeneratePublishTopicParams) (string, error)
type CommandBusGeneratePublishTopicParams ¶ added in v1.3.0
type CommandBusOnSendFn ¶ added in v1.3.0
type CommandBusOnSendFn func(params CommandBusOnSendParams) error
type CommandBusOnSendParams ¶ added in v1.3.0
type CommandEventMarshaler ¶
type CommandEventMarshaler interface { // Marshal marshals Command or Event to Watermill's message. Marshal(v interface{}) (*message.Message, error) // Unmarshal unmarshals watermill's message to v Command or Event. Unmarshal(msg *message.Message, v interface{}) (err error) // Name returns the name of Command or Event. // Name is used to determine, that received command or event is event which we want to handle. Name(v interface{}) string // NameFromMessage returns the name of Command or Event from Watermill's message (generated by Marshal). // // When we have Command or Event marshaled to Watermill's message, // we should use NameFromMessage instead of Name to avoid unnecessary unmarshaling. NameFromMessage(msg *message.Message) string }
CommandEventMarshaler marshals Commands and Events to Watermill's messages and vice versa. Payload of the command needs to be marshaled to []bytes.
type CommandHandler ¶
type CommandHandler interface { // HandlerName is the name used in message.Router while creating handler. // // It will be also passed to CommandsSubscriberConstructor. // May be useful, for example, to create a consumer group per each handler. // // WARNING: If HandlerName was changed and is used for generating consumer groups, // it may result with **reconsuming all messages**! HandlerName() string NewCommand() any Handle(ctx context.Context, cmd any) error }
CommandHandler receives a command defined by NewCommand and handles it with the Handle method. If using DDD, CommandHandler may modify and persist the aggregate.
In contrast to EventHandler, every Command must have only one CommandHandler.
One instance of CommandHandler is used during handling messages. When multiple commands are delivered at the same time, Handle method can be executed multiple times at the same time. Because of that, Handle method needs to be thread safe!
func NewCommandHandler ¶ added in v1.3.0
func NewCommandHandler[Command any]( handlerName string, handleFunc func(ctx context.Context, cmd *Command) error, ) CommandHandler
NewCommandHandler creates a new CommandHandler implementation based on provided function and command type inferred from function argument.
type CommandProcessor ¶
type CommandProcessor struct {
// contains filtered or unexported fields
}
CommandProcessor determines which CommandHandler should handle the command received from the command bus.
func NewCommandProcessor ¶
func NewCommandProcessor( handlers []CommandHandler, generateTopic func(commandName string) string, subscriberConstructor CommandsSubscriberConstructor, marshaler CommandEventMarshaler, logger watermill.LoggerAdapter, ) (*CommandProcessor, error)
NewCommandProcessor creates a new CommandProcessor. Deprecated. Use NewCommandProcessorWithConfig instead.
func NewCommandProcessorWithConfig ¶ added in v1.3.0
func NewCommandProcessorWithConfig(router *message.Router, config CommandProcessorConfig) (*CommandProcessor, error)
func (*CommandProcessor) AddHandlers ¶ added in v1.3.0
func (p *CommandProcessor) AddHandlers(handlers ...CommandHandler) error
AddHandlers adds a new CommandHandler to the CommandProcessor and adds it to the router.
func (CommandProcessor) AddHandlersToRouter ¶
func (p CommandProcessor) AddHandlersToRouter(r *message.Router) error
AddHandlersToRouter adds the CommandProcessor's handlers to the given router. It should be called only once per CommandProcessor instance.
It is required to call AddHandlersToRouter only if command processor is created with NewCommandProcessor (disableRouterAutoAddHandlers is set to true). Deprecated: please migrate to command processor created by NewCommandProcessorWithConfig.
func (CommandProcessor) Handlers ¶
func (p CommandProcessor) Handlers() []CommandHandler
Handlers returns the CommandProcessor's handlers.
type CommandProcessorConfig ¶ added in v1.3.0
type CommandProcessorConfig struct { // GenerateSubscribeTopic is used to generate topic for subscribing command. GenerateSubscribeTopic CommandProcessorGenerateSubscribeTopicFn // SubscriberConstructor is used to create subscriber for CommandHandler. SubscriberConstructor CommandProcessorSubscriberConstructorFn // OnHandle is called before handling command. // OnHandle works in a similar way to middlewares: you can inject additional logic before and after handling a command. // // Because of that, you need to explicitly call params.Handler.Handle() to handle the command. // func(params CommandProcessorOnHandleParams) (err error) { // // logic before handle // // (...) // // err := params.Handler.Handle(params.Message.Context(), params.Command) // // // logic after handle // // (...) // // return err // } // // This option is not required. OnHandle CommandProcessorOnHandleFn // Marshaler is used to marshal and unmarshal commands. // It is required. Marshaler CommandEventMarshaler // Logger instance used to log. // If not provided, watermill.NopLogger is used. Logger watermill.LoggerAdapter // If true, CommandProcessor will ack messages even if CommandHandler returns an error. // If RequestReplyBackend is not null and sending reply fails, the message will be nack-ed anyway. // // Warning: It's not recommended to use this option when you are using requestreply component // (requestreply.NewCommandHandler or requestreply.NewCommandHandlerWithResult), as it may ack the // command when sending reply failed. // // When you are using requestreply, you should use requestreply.PubSubBackendConfig.AckCommandErrors. AckCommandHandlingErrors bool // contains filtered or unexported fields }
func (CommandProcessorConfig) Validate ¶ added in v1.3.0
func (c CommandProcessorConfig) Validate() error
type CommandProcessorGenerateSubscribeTopicFn ¶ added in v1.3.0
type CommandProcessorGenerateSubscribeTopicFn func(CommandProcessorGenerateSubscribeTopicParams) (string, error)
type CommandProcessorGenerateSubscribeTopicParams ¶ added in v1.3.0
type CommandProcessorGenerateSubscribeTopicParams struct { CommandName string CommandHandler CommandHandler }
type CommandProcessorOnHandleFn ¶ added in v1.3.0
type CommandProcessorOnHandleFn func(params CommandProcessorOnHandleParams) error
type CommandProcessorOnHandleParams ¶ added in v1.3.0
type CommandProcessorOnHandleParams struct { Handler CommandHandler CommandName string Command any // Message is never nil and can be modified. Message *message.Message }
type CommandProcessorSubscriberConstructorFn ¶ added in v1.3.0
type CommandProcessorSubscriberConstructorFn func(CommandProcessorSubscriberConstructorParams) (message.Subscriber, error)
CommandProcessorSubscriberConstructorFn creates subscriber for CommandHandler. It allows you to create a separate customized Subscriber for every command handler.
type CommandProcessorSubscriberConstructorParams ¶ added in v1.3.0
type CommandProcessorSubscriberConstructorParams struct { HandlerName string Handler CommandHandler }
type CommandsSubscriberConstructor
deprecated
added in
v0.4.0
type CommandsSubscriberConstructor func(handlerName string) (message.Subscriber, error)
CommandsSubscriberConstructor creates subscriber for CommandHandler. It allows you to create a separate customized Subscriber for every command handler.
Deprecated: please use CommandProcessorSubscriberConstructorFn instead.
type DuplicateCommandHandlerError ¶ added in v0.4.0
type DuplicateCommandHandlerError struct {
CommandName string
}
DuplicateCommandHandlerError occurs when a handler with the same name already exists.
func (DuplicateCommandHandlerError) Error ¶ added in v0.4.0
func (d DuplicateCommandHandlerError) Error() string
type EventBus ¶
type EventBus struct {
// contains filtered or unexported fields
}
EventBus transports events to event handlers.
func NewEventBus ¶
func NewEventBus( publisher message.Publisher, generateTopic func(eventName string) string, marshaler CommandEventMarshaler, ) (*EventBus, error)
NewEventBus creates a new CommandBus. Deprecated: use NewEventBusWithConfig instead.
func NewEventBusWithConfig ¶ added in v1.3.0
func NewEventBusWithConfig(publisher message.Publisher, config EventBusConfig) (*EventBus, error)
NewEventBusWithConfig creates a new EventBus.
type EventBusConfig ¶ added in v1.3.0
type EventBusConfig struct { // GeneratePublishTopic is used to generate topic name for publishing event. GeneratePublishTopic GenerateEventPublishTopicFn // OnPublish is called before sending the event. // The *message.Message can be modified. // // This option is not required. OnPublish OnEventSendFn // Marshaler is used to marshal and unmarshal events. // It is required. Marshaler CommandEventMarshaler // Logger instance used to log. // If not provided, watermill.NopLogger is used. Logger watermill.LoggerAdapter }
func (EventBusConfig) Validate ¶ added in v1.3.0
func (c EventBusConfig) Validate() error
type EventGroupProcessor ¶ added in v1.3.0
type EventGroupProcessor struct {
// contains filtered or unexported fields
}
EventGroupProcessor determines which EventHandler should handle event received from event bus. Compared to EventProcessor, EventGroupProcessor allows to have multiple handlers that share the same subscriber instance.
func NewEventGroupProcessorWithConfig ¶ added in v1.3.0
func NewEventGroupProcessorWithConfig(router *message.Router, config EventGroupProcessorConfig) (*EventGroupProcessor, error)
NewEventGroupProcessorWithConfig creates a new EventGroupProcessor.
func (*EventGroupProcessor) AddHandlersGroup ¶ added in v1.3.0
func (p *EventGroupProcessor) AddHandlersGroup(groupName string, handlers ...GroupEventHandler) error
AddHandlersGroup adds a new list of GroupEventHandler to the EventGroupProcessor and adds it to the router.
Compared to AddHandlers, AddHandlersGroup allows to have multiple handlers that share the same subscriber instance.
It's allowed to have multiple handlers for the same event type in one group, but we recommend to not do that. Please keep in mind that those handlers will be processed within the same message. If first handler succeeds and the second fails, the message will be re-delivered and the first will be re-executed.
Handlers group needs to be unique within the EventProcessor instance.
Handler group name is used as handler's name in router.
type EventGroupProcessorConfig ¶ added in v1.3.0
type EventGroupProcessorConfig struct { // GenerateSubscribeTopic is used to generate topic for subscribing to events for handler groups. // This option is required for EventProcessor if handler groups are used. GenerateSubscribeTopic EventGroupProcessorGenerateSubscribeTopicFn // SubscriberConstructor is used to create subscriber for GroupEventHandler. // This function is called for every events group once - thanks to that it's possible to have one subscription per group. // It's useful, when we are processing events from one stream and we want to do it in order. SubscriberConstructor EventGroupProcessorSubscriberConstructorFn // OnHandle is called before handling event. // OnHandle works in a similar way to middlewares: you can inject additional logic before and after handling an event. // // Because of that, you need to explicitly call params.Handler.Handle() to handle the event. // // func(params EventGroupProcessorOnHandleParams) (err error) { // // logic before handle // // (...) // // err := params.Handler.Handle(params.Message.Context(), params.Event) // // // logic after handle // // (...) // // return err // } // // This option is not required. OnHandle EventGroupProcessorOnHandleFn // AckOnUnknownEvent is used to decide if message should be acked if event has no handler defined. AckOnUnknownEvent bool // Marshaler is used to marshal and unmarshal events. // It is required. Marshaler CommandEventMarshaler // Logger instance used to log. // If not provided, watermill.NopLogger is used. Logger watermill.LoggerAdapter }
func (EventGroupProcessorConfig) Validate ¶ added in v1.3.0
func (c EventGroupProcessorConfig) Validate() error
type EventGroupProcessorGenerateSubscribeTopicFn ¶ added in v1.3.0
type EventGroupProcessorGenerateSubscribeTopicFn func(EventGroupProcessorGenerateSubscribeTopicParams) (string, error)
type EventGroupProcessorGenerateSubscribeTopicParams ¶ added in v1.3.0
type EventGroupProcessorGenerateSubscribeTopicParams struct { EventGroupName string EventGroupHandlers []GroupEventHandler }
type EventGroupProcessorOnHandleFn ¶ added in v1.3.0
type EventGroupProcessorOnHandleFn func(params EventGroupProcessorOnHandleParams) error
type EventGroupProcessorOnHandleParams ¶ added in v1.3.0
type EventGroupProcessorSubscriberConstructorFn ¶ added in v1.3.0
type EventGroupProcessorSubscriberConstructorFn func(EventGroupProcessorSubscriberConstructorParams) (message.Subscriber, error)
type EventGroupProcessorSubscriberConstructorParams ¶ added in v1.3.0
type EventGroupProcessorSubscriberConstructorParams struct { EventGroupName string EventGroupHandlers []GroupEventHandler }
type EventHandler ¶
type EventHandler interface { // HandlerName is the name used in message.Router while creating handler. // // It will be also passed to EventsSubscriberConstructor. // May be useful, for example, to create a consumer group per each handler. // // WARNING: If HandlerName was changed and is used for generating consumer groups, // it may result with **reconsuming all messages** !!! HandlerName() string NewEvent() any Handle(ctx context.Context, event any) error }
EventHandler receives events defined by NewEvent and handles them with its Handle method. If using DDD, CommandHandler may modify and persist the aggregate. It can also invoke a process manager, a saga or just build a read model.
In contrast to CommandHandler, every Event can have multiple EventHandlers.
One instance of EventHandler is used during handling messages. When multiple events are delivered at the same time, Handle method can be executed multiple times at the same time. Because of that, Handle method needs to be thread safe!
func NewEventHandler ¶ added in v1.3.0
func NewEventHandler[T any]( handlerName string, handleFunc func(ctx context.Context, event *T) error, ) EventHandler
NewEventHandler creates a new EventHandler implementation based on provided function and event type inferred from function argument.
type EventProcessor ¶
type EventProcessor struct {
// contains filtered or unexported fields
}
EventProcessor determines which EventHandler should handle event received from event bus.
func NewEventProcessor ¶
func NewEventProcessor( individualHandlers []EventHandler, generateTopic func(eventName string) string, subscriberConstructor EventsSubscriberConstructor, marshaler CommandEventMarshaler, logger watermill.LoggerAdapter, ) (*EventProcessor, error)
NewEventProcessor creates a new EventProcessor. Deprecated. Use NewEventProcessorWithConfig instead.
func NewEventProcessorWithConfig ¶ added in v1.3.0
func NewEventProcessorWithConfig(router *message.Router, config EventProcessorConfig) (*EventProcessor, error)
NewEventProcessorWithConfig creates a new EventProcessor.
func (*EventProcessor) AddHandlers ¶ added in v1.3.0
func (p *EventProcessor) AddHandlers(handlers ...EventHandler) error
AddHandlers adds a new EventHandler to the EventProcessor and adds it to the router.
func (EventProcessor) AddHandlersToRouter ¶
func (p EventProcessor) AddHandlersToRouter(r *message.Router) error
AddHandlersToRouter adds the EventProcessor's handlers to the given router. It should be called only once per EventProcessor instance.
It is required to call AddHandlersToRouter only if command processor is created with NewEventProcessor (disableRouterAutoAddHandlers is set to true). Deprecated: please migrate to event processor created by NewEventProcessorWithConfig.
func (EventProcessor) Handlers ¶
func (p EventProcessor) Handlers() []EventHandler
type EventProcessorConfig ¶ added in v1.3.0
type EventProcessorConfig struct { // GenerateSubscribeTopic is used to generate topic for subscribing to events. // If event processor is using handler groups, GenerateSubscribeTopic is used instead. GenerateSubscribeTopic EventProcessorGenerateSubscribeTopicFn // SubscriberConstructor is used to create subscriber for EventHandler. // // This function is called for every EventHandler instance. // If you want to re-use one subscriber for multiple handlers, use GroupEventProcessor instead. SubscriberConstructor EventProcessorSubscriberConstructorFn // OnHandle is called before handling event. // OnHandle works in a similar way to middlewares: you can inject additional logic before and after handling an event. // // Because of that, you need to explicitly call params.Handler.Handle() to handle the event. // // func(params EventProcessorOnHandleParams) (err error) { // // logic before handle // // (...) // // err := params.Handler.Handle(params.Message.Context(), params.Event) // // // logic after handle // // (...) // // return err // } // // This option is not required. OnHandle EventProcessorOnHandleFn // AckOnUnknownEvent is used to decide if message should be acked if event has no handler defined. AckOnUnknownEvent bool // Marshaler is used to marshal and unmarshal events. // It is required. Marshaler CommandEventMarshaler // Logger instance used to log. // If not provided, watermill.NopLogger is used. Logger watermill.LoggerAdapter // contains filtered or unexported fields }
func (EventProcessorConfig) Validate ¶ added in v1.3.0
func (c EventProcessorConfig) Validate() error
type EventProcessorGenerateSubscribeTopicFn ¶ added in v1.3.0
type EventProcessorGenerateSubscribeTopicFn func(EventProcessorGenerateSubscribeTopicParams) (string, error)
type EventProcessorGenerateSubscribeTopicParams ¶ added in v1.3.0
type EventProcessorGenerateSubscribeTopicParams struct { EventName string EventHandler EventHandler }
type EventProcessorOnHandleFn ¶ added in v1.3.0
type EventProcessorOnHandleFn func(params EventProcessorOnHandleParams) error
type EventProcessorOnHandleParams ¶ added in v1.3.0
type EventProcessorOnHandleParams struct { Handler EventHandler Event any EventName string // Message is never nil and can be modified. Message *message.Message }
type EventProcessorSubscriberConstructorFn ¶ added in v1.3.0
type EventProcessorSubscriberConstructorFn func(EventProcessorSubscriberConstructorParams) (message.Subscriber, error)
type EventProcessorSubscriberConstructorParams ¶ added in v1.3.0
type EventProcessorSubscriberConstructorParams struct { HandlerName string EventHandler EventHandler }
type EventsSubscriberConstructor ¶ added in v0.4.0
type EventsSubscriberConstructor func(handlerName string) (message.Subscriber, error)
EventsSubscriberConstructor creates a subscriber for EventHandler. It allows you to create separated customized Subscriber for every command handler.
When handler groups are used, handler group is passed as handlerName. Deprecated: please use EventProcessorSubscriberConstructorFn instead.
type Facade
deprecated
type Facade struct {
// contains filtered or unexported fields
}
Deprecated: use CommandHandler and EventHandler instead.
Facade is a facade for creating the Command and Event buses and processors. It was created to avoid boilerplate, when using CQRS in the standard way. You can also create buses and processors manually, drawing inspiration from how it's done in NewFacade.
func NewFacade
deprecated
func NewFacade(config FacadeConfig) (*Facade, error)
Deprecated: use CommandHandler and EventHandler instead.
func (Facade) CommandBus ¶
func (f Facade) CommandBus() *CommandBus
func (Facade) CommandEventMarshaler ¶
func (f Facade) CommandEventMarshaler() CommandEventMarshaler
type FacadeConfig
deprecated
type FacadeConfig struct { // GenerateCommandsTopic generates topic name based on the command name. // Command name is generated by CommandEventMarshaler's Name method. // // It allows you to use topic per command or one topic for every command. GenerateCommandsTopic func(commandName string) string // CommandHandlers return command handlers which should be executed. CommandHandlers func(commandBus *CommandBus, eventBus *EventBus) []CommandHandler // CommandsPublisher is Publisher used to publish commands. CommandsPublisher message.Publisher // CommandsSubscriberConstructor is constructor for subscribers which will subscribe for messages. // It will be called for every command handler. // It allows you to create separated customized Subscriber for every command handler. CommandsSubscriberConstructor CommandsSubscriberConstructor // GenerateEventsTopic generates topic name based on the event name. // Event name is generated by CommandEventMarshaler's Name method. // // It allows you to use topic per command or one topic for every command. GenerateEventsTopic func(eventName string) string // EventHandlers return event handlers which should be executed. EventHandlers func(commandBus *CommandBus, eventBus *EventBus) []EventHandler // EventsPublisher is Publisher used to publish commands. EventsPublisher message.Publisher // EventsSubscriberConstructor is constructor for subscribers which will subscribe for messages. // It will be called for every event handler. // It allows you to create separated customized Subscriber for every event handler. EventsSubscriberConstructor EventsSubscriberConstructor // Router is a Watermill router, which will be used to handle events and commands. // Router handlers will be automatically generated by AddHandlersToRouter of Command and Event handlers. Router *message.Router CommandEventMarshaler CommandEventMarshaler Logger watermill.LoggerAdapter }
Deprecated: use CommandProcessor and EventProcessor instead.
func (FacadeConfig) CommandsEnabled ¶
func (c FacadeConfig) CommandsEnabled() bool
func (FacadeConfig) EventsEnabled ¶
func (c FacadeConfig) EventsEnabled() bool
func (FacadeConfig) Validate ¶
func (c FacadeConfig) Validate() error
type GenerateEventPublishTopicFn ¶ added in v1.3.0
type GenerateEventPublishTopicFn func(GenerateEventPublishTopicParams) (string, error)
type GenerateEventPublishTopicParams ¶ added in v1.3.0
type GroupEventHandler ¶ added in v1.3.0
type GroupEventHandler interface { NewEvent() interface{} Handle(ctx context.Context, event interface{}) error }
func NewGroupEventHandler ¶ added in v1.3.0
func NewGroupEventHandler[T any](handleFunc func(ctx context.Context, event *T) error) GroupEventHandler
NewGroupEventHandler creates a new GroupEventHandler implementation based on provided function and event type inferred from function argument.
type JSONMarshaler ¶
func (JSONMarshaler) Marshal ¶
func (m JSONMarshaler) Marshal(v interface{}) (*message.Message, error)
func (JSONMarshaler) Name ¶
func (m JSONMarshaler) Name(cmdOrEvent interface{}) string
func (JSONMarshaler) NameFromMessage ¶
func (m JSONMarshaler) NameFromMessage(msg *message.Message) string
type NoProtoMessageError ¶
type NoProtoMessageError struct {
// contains filtered or unexported fields
}
NoProtoMessageError is returned when the given value does not implement proto.Message.
func (NoProtoMessageError) Error ¶
func (e NoProtoMessageError) Error() string
type NonPointerError ¶
func (NonPointerError) Error ¶
func (e NonPointerError) Error() string
type OnEventSendFn ¶ added in v1.3.0
type OnEventSendFn func(params OnEventSendParams) error
type OnEventSendParams ¶ added in v1.3.0
type ProtobufMarshaler ¶
ProtobufMarshaler is the default Protocol Buffers marshaler.
func (ProtobufMarshaler) Marshal ¶
func (m ProtobufMarshaler) Marshal(v interface{}) (*message.Message, error)
Marshal marshals the given protobuf's message into watermill's Message.
func (ProtobufMarshaler) Name ¶
func (m ProtobufMarshaler) Name(cmdOrEvent interface{}) string
Name returns the command or event's name.
func (ProtobufMarshaler) NameFromMessage ¶
func (m ProtobufMarshaler) NameFromMessage(msg *message.Message) string
NameFromMessage returns the metadata name value for a given Message.