Versions in this module Expand all Collapse all v1 v1.3.5 Jun 6, 2024 Changes in this version + var ErrOutputInNoPublisherHandler = errors.New("returned output messages in a handler without publisher") + func HandlerNameFromCtx(ctx context.Context) string + func PublishTopicFromCtx(ctx context.Context) string + func PublisherNameFromCtx(ctx context.Context) string + func SubscribeTopicFromCtx(ctx context.Context) string + func SubscriberNameFromCtx(ctx context.Context) string + type DuplicateHandlerNameError struct + HandlerName string + func (d DuplicateHandlerNameError) Error() string + type Handler struct + func (h *Handler) AddMiddleware(m ...HandlerMiddleware) + func (h *Handler) Started() chan struct{} + func (h *Handler) Stop() + func (h *Handler) Stopped() chan struct{} + type HandlerFunc func(msg *Message) ([]*Message, error) + var PassthroughHandler HandlerFunc = func(msg *Message) ([]*Message, error) { ... } + type HandlerMiddleware func(h HandlerFunc) HandlerFunc + type Message struct + Metadata Metadata + Payload Payload + UUID string + func NewMessage(uuid string, payload Payload) *Message + func (m *Message) Ack() bool + func (m *Message) Acked() <-chan struct{} + func (m *Message) Context() context.Context + func (m *Message) Copy() *Message + func (m *Message) Equals(toCompare *Message) bool + func (m *Message) Nack() bool + func (m *Message) Nacked() <-chan struct{} + func (m *Message) SetContext(ctx context.Context) + type Messages []*Message + func (m Messages) IDs() []string + type Metadata map[string]string + func (m Metadata) Get(key string) string + func (m Metadata) Set(key, value string) + type NoPublishHandlerFunc func(msg *Message) error + type Payload []byte + type Publisher interface + Close func() error + Publish func(topic string, messages ...*Message) error + type PublisherDecorator func(pub Publisher) (Publisher, error) + func MessageTransformPublisherDecorator(transform func(*Message)) PublisherDecorator + type Router struct + func NewRouter(config RouterConfig, logger watermill.LoggerAdapter) (*Router, error) + func (r *Router) AddHandler(handlerName string, subscribeTopic string, subscriber Subscriber, ...) *Handler + func (r *Router) AddMiddleware(m ...HandlerMiddleware) + func (r *Router) AddNoPublisherHandler(handlerName string, subscribeTopic string, subscriber Subscriber, ...) *Handler + func (r *Router) AddPlugin(p ...RouterPlugin) + func (r *Router) AddPublisherDecorators(dec ...PublisherDecorator) + func (r *Router) AddSubscriberDecorators(dec ...SubscriberDecorator) + func (r *Router) Close() error + func (r *Router) Handlers() map[string]HandlerFunc + func (r *Router) IsClosed() bool + func (r *Router) IsRunning() bool + func (r *Router) Logger() watermill.LoggerAdapter + func (r *Router) Run(ctx context.Context) (err error) + func (r *Router) RunHandlers(ctx context.Context) error + func (r *Router) Running() chan struct{} + type RouterConfig struct + CloseTimeout time.Duration + func (c RouterConfig) Validate() error + type RouterPlugin func(*Router) error + type SubscribeInitializer interface + SubscribeInitialize func(topic string) error + type Subscriber interface + Close func() error + Subscribe func(ctx context.Context, topic string) (<-chan *Message, error) + type SubscriberDecorator func(sub Subscriber) (Subscriber, error) + func MessageTransformSubscriberDecorator(transform func(*Message)) SubscriberDecorator