Documentation ¶
Index ¶
- Variables
- func HandlerNameFromCtx(ctx context.Context) string
- func PublisherNameFromCtx(ctx context.Context) string
- func SubscriberNameFromCtx(ctx context.Context) string
- type DuplicateHandlerNameError
- type HandlerFunc
- type HandlerMiddleware
- type Message
- func (m *Message) Ack() bool
- func (m *Message) Acked() <-chan struct{}
- func (m *Message) Context() context.Context
- func (m *Message) Copy() *Message
- func (m *Message) Equals(toCompare *Message) bool
- func (m *Message) Nack() bool
- func (m *Message) Nacked() <-chan struct{}
- func (m *Message) SetContext(ctx context.Context)
- type Messages
- type Metadata
- type Payload
- type PubSub
- type Publisher
- type PublisherDecorator
- type Router
- func (r *Router) AddHandler(handlerName string, subscribeTopic string, subscriber Subscriber, ...)
- func (r *Router) AddMiddleware(m ...HandlerMiddleware)
- func (r *Router) AddNoPublisherHandler(handlerName string, subscribeTopic string, subscriber Subscriber, ...)
- func (r *Router) AddPlugin(p ...RouterPlugin)
- func (r *Router) AddPublisherDecorators(dec ...PublisherDecorator)
- func (r *Router) AddSubscriberDecorators(dec ...SubscriberDecorator)
- func (r *Router) Close() error
- func (r *Router) Logger() watermill.LoggerAdapter
- func (r *Router) Run() (err error)
- func (r *Router) Running() chan struct{}
- type RouterConfig
- type RouterPlugin
- type SubscribeInitializer
- type Subscriber
- type SubscriberDecorator
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrOutputInNoPublisherHandler happens when a handler func returned some messages in a no-publisher handler.
	// todo: maybe change the handler func signature in no-publisher handler so that there's no possibility for this
	ErrOutputInNoPublisherHandler = errors.New("returned output messages in a handler without publisher")
)
Functions ¶
func HandlerNameFromCtx ¶ added in v0.3.0
func PublisherNameFromCtx ¶ added in v0.3.0
func SubscriberNameFromCtx ¶ added in v0.3.0
Types ¶
type DuplicateHandlerNameError ¶ added in v0.3.0
type DuplicateHandlerNameError struct {
HandlerName string
}
func (DuplicateHandlerNameError) Error ¶ added in v0.3.0
func (d DuplicateHandlerNameError) Error() string
type HandlerFunc ¶
HandlerFunc is the function called when a message is received.
msg.Ack() is called automatically when HandlerFunc returns no error. When HandlerFunc returns an error, msg.Nack() is called. If msg.Ack() was already called in the handler and HandlerFunc then returns an error, msg.Nack() is not sent, because the Ack was already sent.
HandlerFuncs are executed in parallel when multiple messages are received (because msg.Ack() was sent in HandlerFunc or the Subscriber supports multiple consumers).
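The acknowledgement rule above can be illustrated with a small self-contained sketch. The `msg` type, the `process` function and the three handlers are hypothetical stand-ins written for this example; they are not the router's actual implementation, and the stand-in is not safe for concurrent use.

```go
package main

import (
	"errors"
	"fmt"
)

// msg is a hypothetical stand-in for message.Message that only records
// which acknowledgement was sent first.
type msg struct {
	state string // "", "acked" or "nacked"
}

// Ack marks the message as acked unless a Nack was already sent.
func (m *msg) Ack() bool {
	if m.state == "nacked" {
		return false
	}
	m.state = "acked"
	return true
}

// Nack marks the message as nacked unless an Ack was already sent.
func (m *msg) Nack() bool {
	if m.state == "acked" {
		return false
	}
	m.state = "nacked"
	return true
}

type handlerFunc func(m *msg) ([]*msg, error)

// process mimics the documented rule: Ack on success, Nack on error.
func process(h handlerFunc, m *msg) string {
	if _, err := h(m); err != nil {
		m.Nack() // has no effect if the handler already sent an Ack
	} else {
		m.Ack()
	}
	return m.state
}

var (
	okHandler   handlerFunc = func(m *msg) ([]*msg, error) { return nil, nil }
	failHandler handlerFunc = func(m *msg) ([]*msg, error) { return nil, errors.New("boom") }
	// ackThenFail acks inside the handler and then returns an error.
	ackThenFail handlerFunc = func(m *msg) ([]*msg, error) {
		m.Ack()
		return nil, errors.New("boom")
	}
)

func main() {
	fmt.Println(process(okHandler, &msg{}))   // acked
	fmt.Println(process(failHandler, &msg{})) // nacked
	fmt.Println(process(ackThenFail, &msg{})) // acked
}
```

The third case shows why an explicit Ack inside the handler wins: the later Nack is rejected because an Ack was already sent.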
type HandlerMiddleware ¶
type HandlerMiddleware func(h HandlerFunc) HandlerFunc
HandlerMiddleware lets us write decorator-like wrappers around HandlerFunc. It can run code before the handler (for example, to modify the consumed message) or after it (to modify produced messages, ack/nack the consumed message, handle errors, log, etc.).
It can be attached to the router with the `AddMiddleware` method.
Example:
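func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc {
	// The parameter is named msg so that it does not shadow the message package.
	return func(msg *message.Message) ([]*message.Message, error) {
		// Runs before the wrapped handler.
		fmt.Println("executed before handler")

		producedMessages, err := h(msg)

		// Runs after the wrapped handler has returned.
		fmt.Println("executed after handler")

		return producedMessages, err
	}
}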
type Message ¶
type Message struct {
	// UUID is a unique identifier of the message.
	//
	// It is only used by Watermill for debugging.
	// UUID can be empty.
	UUID string

	// Metadata contains the message metadata.
	//
	// Can be used to store data which doesn't require unmarshaling the entire payload.
	// It is something similar to HTTP request's headers.
	//
	// Metadata is marshaled and will be saved to PubSub.
	Metadata Metadata

	// Payload is the message's payload.
	Payload Payload
	// contains filtered or unexported fields
}
func NewMessage ¶
func (*Message) Ack ¶
Ack sends the message's acknowledgement.
Ack is non-blocking and idempotent. It returns false if a Nack has already been sent.
func (*Message) Acked ¶
func (m *Message) Acked() <-chan struct{}
Acked returns a channel that is closed when the acknowledgement is sent.
Usage:
select {
case <-message.Acked():
	// ack received
case <-message.Nacked():
	// nack received
}
func (*Message) Context ¶ added in v0.2.0
Context returns the message's context. To change the context, use SetContext.
The returned context is always non-nil; it defaults to the background context.
func (*Message) Copy ¶ added in v0.2.0
Copy copies the whole message except its Ack/Nack state. The context is not propagated to the copy.
func (*Message) Equals ¶ added in v0.2.0
Equals reports whether two messages are equal. Acks/Nacks are not compared.
func (*Message) Nack ¶
Nack sends the message's negative acknowledgement.
Nack is non-blocking and idempotent. It returns false if an Ack has already been sent.
func (*Message) Nacked ¶
func (m *Message) Nacked() <-chan struct{}
Nacked returns a channel that is closed when the negative acknowledgement is sent.
Usage:
select {
case <-message.Acked():
	// ack received
case <-message.Nacked():
	// nack received
}
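One way to obtain the behavior documented for Ack, Nack, Acked and Nacked (non-blocking, idempotent, mutually exclusive, signalled by closing a channel) is sketched below. The `sketchMessage` type, `newSketchMessage` and `outcome` are hypothetical names used only for this illustration; the library's own implementation may differ.

```go
package main

import (
	"fmt"
	"sync"
)

type ackState int

const (
	noAckSent ackState = iota
	ackSent
	nackSent
)

// sketchMessage is a hypothetical stand-in for message.Message.
type sketchMessage struct {
	mu     sync.Mutex
	state  ackState
	ackCh  chan struct{}
	nackCh chan struct{}
}

func newSketchMessage() *sketchMessage {
	return &sketchMessage{ackCh: make(chan struct{}), nackCh: make(chan struct{})}
}

// Ack closes the ack channel once. Repeated calls return true,
// and a call made after Nack returns false.
func (m *sketchMessage) Ack() bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	switch m.state {
	case nackSent:
		return false
	case ackSent:
		return true
	}
	m.state = ackSent
	close(m.ackCh)
	return true
}

// Nack mirrors Ack for the negative acknowledgement.
func (m *sketchMessage) Nack() bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	switch m.state {
	case ackSent:
		return false
	case nackSent:
		return true
	}
	m.state = nackSent
	close(m.nackCh)
	return true
}

func (m *sketchMessage) Acked() <-chan struct{}  { return m.ackCh }
func (m *sketchMessage) Nacked() <-chan struct{} { return m.nackCh }

// outcome blocks until one of the two channels is closed.
func outcome(m *sketchMessage) string {
	select {
	case <-m.Acked():
		return "acked"
	case <-m.Nacked():
		return "nacked"
	}
}

func main() {
	m := newSketchMessage()
	fmt.Println(m.Ack(), m.Ack(), m.Nack()) // true true false
	fmt.Println(outcome(m))                 // acked
}
```

Closing a channel instead of sending on it means any number of goroutines can wait on Acked() or Nacked() and all of them are released.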
func (*Message) SetContext ¶ added in v0.2.0
SetContext sets the provided context on the message.
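The Context and SetContext contract (a context that is never nil and falls back to the background context) can be sketched as follows. The `sketchMessage` type, the `ctxKey` type and the helper functions are hypothetical and exist only for this example.

```go
package main

import (
	"context"
	"fmt"
)

// ctxKey is a private key type, which avoids collisions between context values.
type ctxKey string

// sketchMessage is a hypothetical stand-in for message.Message.
type sketchMessage struct {
	ctx context.Context
}

// Context never returns nil: it falls back to the background context.
func (m *sketchMessage) Context() context.Context {
	if m.ctx != nil {
		return m.ctx
	}
	return context.Background()
}

// SetContext replaces the message's context.
func (m *sketchMessage) SetContext(ctx context.Context) {
	m.ctx = ctx
}

// defaultIsNonNil reports whether a fresh message already has a usable context.
func defaultIsNonNil() bool {
	return (&sketchMessage{}).Context() != nil
}

// traceIDAfterSet stores id in the message context and reads it back.
func traceIDAfterSet(id string) string {
	m := &sketchMessage{}
	m.SetContext(context.WithValue(m.Context(), ctxKey("trace_id"), id))
	v, _ := m.Context().Value(ctxKey("trace_id")).(string)
	return v
}

func main() {
	fmt.Println(defaultIsNonNil())          // true
	fmt.Println(traceIDAfterSet("abc-123")) // abc-123
}
```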
type PubSub ¶
type PubSub interface {
	Close() error
	// contains filtered or unexported methods
}
func NewPubSub ¶
func NewPubSub(publisher Publisher, subscriber Subscriber) PubSub
type Publisher ¶
type Publisher interface {
	// Close should flush unsent messages, if publisher is async.
	Close() error
	// contains filtered or unexported methods
}
type PublisherDecorator ¶ added in v0.3.0
PublisherDecorator wraps the underlying Publisher, adding some functionality.
func MessageTransformPublisherDecorator ¶ added in v0.3.0
func MessageTransformPublisherDecorator(transform func(*Message)) PublisherDecorator
MessageTransformPublisherDecorator creates a publisher decorator that calls transform on each message that passes through the publisher.
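A transform decorator of this kind might be built roughly as shown below. The listing above hides the Publisher's methods and the PublisherDecorator definition, so the `Publish(topic string, messages ...*Message) error` method and the decorator signature (mirroring SubscriberDecorator) are assumptions, as are all helper names. The types are local stand-ins, not the library's.

```go
package main

import "fmt"

// Message is a simplified stand-in for message.Message.
type Message struct {
	Metadata map[string]string
	Payload  []byte
}

// Publisher is an assumed shape; the real interface hides its methods in the listing.
type Publisher interface {
	Publish(topic string, messages ...*Message) error
	Close() error
}

// PublisherDecorator is assumed to mirror SubscriberDecorator.
type PublisherDecorator func(pub Publisher) (Publisher, error)

// recordingPublisher keeps published messages in memory.
type recordingPublisher struct{ published []*Message }

func (p *recordingPublisher) Publish(topic string, messages ...*Message) error {
	p.published = append(p.published, messages...)
	return nil
}

func (p *recordingPublisher) Close() error { return nil }

// transformPublisher applies transform to each message, then delegates.
type transformPublisher struct {
	Publisher
	transform func(*Message)
}

func (t transformPublisher) Publish(topic string, messages ...*Message) error {
	for _, m := range messages {
		t.transform(m)
	}
	return t.Publisher.Publish(topic, messages...)
}

// transformDecorator is a hypothetical analogue of MessageTransformPublisherDecorator.
func transformDecorator(transform func(*Message)) PublisherDecorator {
	return func(pub Publisher) (Publisher, error) {
		return transformPublisher{Publisher: pub, transform: transform}, nil
	}
}

// publishWithTag publishes one message through a decorated publisher
// and returns the metadata value the transform added.
func publishWithTag() string {
	rec := &recordingPublisher{}
	decorate := transformDecorator(func(m *Message) { m.Metadata["source"] = "sketch" })
	pub, err := decorate(rec)
	if err != nil {
		return ""
	}
	if err := pub.Publish("topic", &Message{Metadata: map[string]string{}}); err != nil {
		return ""
	}
	return rec.published[0].Metadata["source"]
}

func main() {
	fmt.Println(publishWithTag()) // sketch
}
```

Embedding the wrapped Publisher lets the decorator override only Publish while Close passes through unchanged.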
type Router ¶
type Router struct {
// contains filtered or unexported fields
}
func NewRouter ¶
func NewRouter(config RouterConfig, logger watermill.LoggerAdapter) (*Router, error)
func (*Router) AddHandler ¶
func (r *Router) AddHandler(
	handlerName string,
	subscribeTopic string,
	subscriber Subscriber,
	publishTopic string,
	publisher Publisher,
	handlerFunc HandlerFunc,
)
AddHandler adds a new handler.
handlerName must be unique. For now, it is used only for debugging.
subscribeTopic is the topic from which the handler will receive messages.
publishTopic is the topic to which the router will publish messages returned by handlerFunc. If a handler needs to publish to multiple topics, it is recommended to inject a Publisher into the handler, or to implement middleware that catches the messages and publishes them to a topic chosen, for example, from their metadata.
subscriber is the Subscriber from which messages will be consumed, and publisher is the Publisher to which messages returned by handlerFunc will be published. If you need a single PubSub object built from a separate Publisher and Subscriber, you can create one by calling message.NewPubSub(publisher, subscriber).
func (*Router) AddMiddleware ¶
func (r *Router) AddMiddleware(m ...HandlerMiddleware)
AddMiddleware adds a new middleware to the router.
The order of middlewares matters. The middleware added first is executed first.
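The ordering rule can be made concrete with a small sketch. The types below are local stand-ins for HandlerFunc and HandlerMiddleware, and `trace`, `chain` and `runChain` are hypothetical helpers. The sketch shows one way to compose middlewares so that the first one added runs first; it does not claim to reproduce the router's internals.

```go
package main

import (
	"fmt"
	"strings"
)

// Local stand-ins for the package's types.
type Message struct{ Payload string }
type HandlerFunc func(msg *Message) ([]*Message, error)
type HandlerMiddleware func(h HandlerFunc) HandlerFunc

// trace returns a middleware that records when it runs.
func trace(name string, log *[]string) HandlerMiddleware {
	return func(h HandlerFunc) HandlerFunc {
		return func(msg *Message) ([]*Message, error) {
			*log = append(*log, name+":before")
			out, err := h(msg)
			*log = append(*log, name+":after")
			return out, err
		}
	}
}

// chain wraps h so that the first middleware in the list is the outermost,
// which means its "before" part runs first.
func chain(h HandlerFunc, middlewares ...HandlerMiddleware) HandlerFunc {
	for i := len(middlewares) - 1; i >= 0; i-- {
		h = middlewares[i](h)
	}
	return h
}

// runChain executes a handler wrapped by two middlewares and returns the call order.
func runChain() string {
	var log []string
	handler := func(msg *Message) ([]*Message, error) {
		log = append(log, "handler")
		return nil, nil
	}
	wrapped := chain(handler, trace("first", &log), trace("second", &log))
	_, _ = wrapped(&Message{Payload: "hello"})
	return strings.Join(log, " ")
}

func main() {
	fmt.Println(runChain())
	// first:before second:before handler second:after first:after
}
```

Note that the "after" parts unwind in reverse, so the middleware added first is also the last one to see the handler's result.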
func (*Router) AddNoPublisherHandler ¶
func (r *Router) AddNoPublisherHandler(
	handlerName string,
	subscribeTopic string,
	subscriber Subscriber,
	handlerFunc HandlerFunc,
)
AddNoPublisherHandler adds a new handler. This handler cannot return messages. If it returns any message, an error occurs and a Nack is sent.
handlerName must be unique. For now, it is used only for debugging.
subscribeTopic is the topic from which the handler will receive messages.
subscriber is the Subscriber from which messages will be consumed.
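The "no output allowed" rule could be enforced by a wrapper along the following lines. This is only a sketch with local stand-in types; `rejectOutput`, `silent` and `noisy` are hypothetical names, and the error value is redeclared locally with the same text as ErrOutputInNoPublisherHandler.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copy of the package's error value, redeclared for this sketch.
var ErrOutputInNoPublisherHandler = errors.New("returned output messages in a handler without publisher")

// Local stand-ins for the package's types.
type Message struct{ Payload string }
type HandlerFunc func(msg *Message) ([]*Message, error)

// rejectOutput wraps h so that any returned messages are turned into an error.
// A caller that Nacks on error would then Nack the consumed message.
func rejectOutput(h HandlerFunc) HandlerFunc {
	return func(msg *Message) ([]*Message, error) {
		out, err := h(msg)
		if err != nil {
			return nil, err
		}
		if len(out) > 0 {
			return nil, ErrOutputInNoPublisherHandler
		}
		return nil, nil
	}
}

var (
	// silent consumes the message and produces nothing.
	silent HandlerFunc = func(msg *Message) ([]*Message, error) { return nil, nil }
	// noisy incorrectly returns an output message.
	noisy HandlerFunc = func(msg *Message) ([]*Message, error) {
		return []*Message{{Payload: "unexpected"}}, nil
	}
)

func main() {
	_, err := rejectOutput(silent)(&Message{})
	fmt.Println(err) // <nil>
	_, err = rejectOutput(noisy)(&Message{})
	fmt.Println(err) // returned output messages in a handler without publisher
}
```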
func (*Router) AddPlugin ¶
func (r *Router) AddPlugin(p ...RouterPlugin)
func (*Router) AddPublisherDecorators ¶ added in v0.3.0
func (r *Router) AddPublisherDecorators(dec ...PublisherDecorator)
AddPublisherDecorators wraps the router's Publisher. The first decorator is the innermost, i.e. calls the original publisher.
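The "first decorator is the innermost" rule follows from applying decorators in the order given, as the sketch below shows. The Publisher interface here is deliberately simplified, its `Publish` signature is an assumption, and `publishFunc`, `named`, `decorate` and `callOrder` are hypothetical helpers.

```go
package main

import (
	"fmt"
	"strings"
)

// Publisher is a simplified stand-in with an assumed Publish signature.
type Publisher interface {
	Publish(topic string, payload string) error
}

// PublisherDecorator is assumed to mirror SubscriberDecorator.
type PublisherDecorator func(pub Publisher) (Publisher, error)

// publishFunc adapts a plain function to the Publisher interface.
type publishFunc func(topic, payload string) error

func (f publishFunc) Publish(topic, payload string) error { return f(topic, payload) }

// named returns a decorator that records its name before delegating.
func named(name string, calls *[]string) PublisherDecorator {
	return func(pub Publisher) (Publisher, error) {
		return publishFunc(func(topic, payload string) error {
			*calls = append(*calls, name)
			return pub.Publish(topic, payload)
		}), nil
	}
}

// decorate applies decorators in order, so the first one wraps the original
// publisher directly and each later one wraps the result.
func decorate(pub Publisher, decorators ...PublisherDecorator) (Publisher, error) {
	for _, dec := range decorators {
		wrapped, err := dec(pub)
		if err != nil {
			return nil, err
		}
		pub = wrapped
	}
	return pub, nil
}

// callOrder publishes once through two decorators and returns who was called, in order.
func callOrder() string {
	var calls []string
	original := publishFunc(func(topic, payload string) error {
		calls = append(calls, "original")
		return nil
	})
	pub, err := decorate(original, named("first", &calls), named("second", &calls))
	if err != nil {
		return err.Error()
	}
	if err := pub.Publish("topic", "payload"); err != nil {
		return err.Error()
	}
	return strings.Join(calls, " > ")
}

func main() {
	fmt.Println(callOrder()) // second > first > original
}
```

The last decorator added is the outermost, so it sees each call first, while the first decorator is the one that talks to the original publisher.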
func (*Router) AddSubscriberDecorators ¶ added in v0.3.0
func (r *Router) AddSubscriberDecorators(dec ...SubscriberDecorator)
AddSubscriberDecorators wraps the router's Subscriber. The first decorator is the innermost, i.e. calls the original subscriber.
func (*Router) Logger ¶
func (r *Router) Logger() watermill.LoggerAdapter
func (*Router) Run ¶
Run runs all plugins and handlers and starts subscribing to provided topics. This call is blocking while the router is running.
When all handlers have stopped (for example, because subscriptions or the connection were closed), the router will also stop.
To stop Run(), call Close() on the router.
type RouterConfig ¶
type RouterConfig struct {
	// CloseTimeout determines how long router should work for handlers when closing.
	CloseTimeout time.Duration
}
func (RouterConfig) Validate ¶
func (c RouterConfig) Validate() error
type RouterPlugin ¶
RouterPlugin is a function that is executed when the Router starts.
type SubscribeInitializer ¶ added in v0.3.0
type SubscribeInitializer interface {
	// SubscribeInitialize can be called to initialize a subscription before consuming.
	// When Subscribe is called before Publish, SubscribeInitialize should not be required.
	//
	// Not every Pub/Sub requires this initialization, and it may be optional for performance improvements etc.
	// For detailed SubscribeInitialize functionality, please check the Pub/Sub's godoc.
	//
	// Implementing SubscribeInitialize is not obligatory.
	SubscribeInitialize(topic string) error
}
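Because implementing SubscribeInitialize is optional, calling code would typically detect it with a type assertion. The sketch below uses local stand-in interfaces and two hypothetical subscribers (`plainSub`, `initSub`); `initializeIfSupported` is an illustrative helper, not part of the package.

```go
package main

import "fmt"

// Local stand-ins for the package's interfaces (Subscriber is reduced to Close).
type Subscriber interface {
	Close() error
}

type SubscribeInitializer interface {
	SubscribeInitialize(topic string) error
}

// plainSub does not implement SubscribeInitializer.
type plainSub struct{}

func (plainSub) Close() error { return nil }

// initSub implements SubscribeInitializer and remembers initialized topics.
type initSub struct{ initialized []string }

func (s *initSub) Close() error { return nil }

func (s *initSub) SubscribeInitialize(topic string) error {
	s.initialized = append(s.initialized, topic)
	return nil
}

// initializeIfSupported calls SubscribeInitialize only when the subscriber
// implements it, and reports whether the call was made.
func initializeIfSupported(sub Subscriber, topic string) (bool, error) {
	initializer, ok := sub.(SubscribeInitializer)
	if !ok {
		return false, nil
	}
	return true, initializer.SubscribeInitialize(topic)
}

func main() {
	called, err := initializeIfSupported(plainSub{}, "orders")
	fmt.Println(called, err) // false <nil>

	sub := &initSub{}
	called, err = initializeIfSupported(sub, "orders")
	fmt.Println(called, err, sub.initialized) // true <nil> [orders]
}
```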
type Subscriber ¶
type Subscriber interface {
	// Close closes all subscriptions with their output channels and flushes offsets etc. when needed.
	Close() error
	// contains filtered or unexported methods
}
type SubscriberDecorator ¶ added in v0.3.0
type SubscriberDecorator func(sub Subscriber) (Subscriber, error)
SubscriberDecorator wraps the underlying Subscriber, adding some functionality.
func MessageTransformSubscriberDecorator ¶ added in v0.3.0
func MessageTransformSubscriberDecorator(transform func(*Message)) SubscriberDecorator
MessageTransformSubscriberDecorator creates a subscriber decorator that calls transform on each message that passes through the subscriber.