Documentation
¶
Index ¶
- Constants
- Variables
- func DeadLetterQueue(pub message.Publisher, topic string) (message.HandlerMiddleware, error)
- func DeadLetterQueueWithFilter(pub message.Publisher, topic string, ...) (message.HandlerMiddleware, error)
- func RandString(n ...int) string
- type AMQPEndpoint
- func (endpoint *AMQPEndpoint) Publisher() (message.Publisher, error)
- func (endpoint *AMQPEndpoint) RPCServerPublish(topic string, msg *message.Message) error
- func (endpoint *AMQPEndpoint) RPCServerSubscriber() (message.Subscriber, error)
- func (endpoint *AMQPEndpoint) SendAndWait(topic string, request *message.Message) (*message.Message, error)
- func (endpoint *AMQPEndpoint) String() string
- func (endpoint *AMQPEndpoint) Subscriber(name string) (message.Subscriber, error)
- type Application
- func (app *Application) OnEvent(event string, handler func(Ctx) error) error
- func (app *Application) OnRequest(topic string, handler func(Ctx) (interface{}, error)) error
- func (app *Application) Send(topic string, payload interface{}, metadata ...map[string]string) error
- func (app *Application) SendAndWait(topic string, payload interface{}, metadata ...map[string]string) (Ctx, error)
- func (app *Application) SetErrorHandler(eh *ErrorHandler)
- func (app *Application) Start(context context.Context) error
- type Ctx
- type Endpoint
- type ErrorHandler
- type RPCClientEndpoint
- type RPCServerEndpoint
Constants ¶
const ( RoutingKeyRequestPrefix = "protobus.amqp.request." RoutingKeyReplyPrefix = "protobus.amqp.reply." )
const ( // Header/Metadata keys which marks the reason and context why the message was deemed poisoned. HeaderDeadLetterReason = "HeaderDeadLetterReason" HeaderDeadLetterStackTrace = "DeadLetterStackTrace" HeaderDeadLetterSourceTopic = "DeadLetterSourceTopic" HeaderDeadLetterHandler = "DeadLetterHandler" HeaderDeadLetterSubscriber = "DeadLetterSubscriber" // Custom Header/Metadata HeaderReplyTo = "ProtoBusReplyTo" // Custom Context Keys ContextRouteId = "ProtoBusRouteId" )
Variables ¶
var ErrInvalidDeadLetterQueueTopic = errors.New("invalid dead letter queue topic")
ErrInvalidDeadLetterQueueTopic occurs when the topic supplied to the DeadLetterQueue constructor is invalid.
Functions ¶
func DeadLetterQueue ¶
DeadLetterQueue provides a middleware that salvages unprocessable messages and published them on a separate topic. The main middleware chain then continues on, business as usual.
func DeadLetterQueueWithFilter ¶
func DeadLetterQueueWithFilter(pub message.Publisher, topic string, shouldGoToDeadLetterQueue func(err error) bool) (message.HandlerMiddleware, error)
DeadLetterQueueWithFilter is just like DeadLetterQueue, but accepts a function that decides which errors qualify for the dead letter queue.
func RandString ¶
Types ¶
type AMQPEndpoint ¶
type AMQPEndpoint struct {
// contains filtered or unexported fields
}
`PubSub` generator for given endpoint (uri) Should implement Endpoint, RPCServer and RPCCLient
func NewAMQPEndpoint ¶
func NewAMQPEndpoint(uri, groupId string, logger watermill.LoggerAdapter) (*AMQPEndpoint, error)
Create new instance of AMQP endpoint. The AMQP Endpoint will create durable `PubSub` with `fanout` exchange type `uri` Required. AMQP connection string `groupId` Optional. Default: nil
If supplied, GroupId will be used to prefix the queue name (GroupId + ExchangeName). This is required if you want to have competing consumers.
`logger` Optional. Default: watermill.StdLogger
func (*AMQPEndpoint) Publisher ¶
func (endpoint *AMQPEndpoint) Publisher() (message.Publisher, error)
Return `Publisher` for this `Endpoint`. The `Publisher` will only created once and reused.
func (*AMQPEndpoint) RPCServerPublish ¶
func (endpoint *AMQPEndpoint) RPCServerPublish(topic string, msg *message.Message) error
func (*AMQPEndpoint) RPCServerSubscriber ¶
func (endpoint *AMQPEndpoint) RPCServerSubscriber() (message.Subscriber, error)
func (*AMQPEndpoint) SendAndWait ¶
func (*AMQPEndpoint) String ¶
func (endpoint *AMQPEndpoint) String() string
func (*AMQPEndpoint) Subscriber ¶
func (endpoint *AMQPEndpoint) Subscriber(name string) (message.Subscriber, error)
Return new `Subscriber` for given exchange, this will create new `queue` and bind it to the exchange if not yet available If `GroupId` is not empty then `queue` name will use `GroupId.ExchangeName`, otherwise will use `ExchangeName` as queue name.
type Application ¶
type Application struct {
// contains filtered or unexported fields
}
ProtoBus ProtoBusApplication. An facade that export only a fine grained property/functions, to simplified work using Watermill's CQRS.
func New ¶
func New(endpoint Endpoint, marshaler marshaler.Marshaler, logger watermill.LoggerAdapter) *Application
Create new instance of bus for given endpoint `endpoint` Required. `logger` Optional. Default: watermill.StdLogger
func (*Application) OnEvent ¶
func (app *Application) OnEvent(event string, handler func(Ctx) error) error
Event handler
func (*Application) OnRequest ¶
func (app *Application) OnRequest(topic string, handler func(Ctx) (interface{}, error)) error
Request-reply pattern handler, return payload as reply is expected
func (*Application) Send ¶
func (app *Application) Send(topic string, payload interface{}, metadata ...map[string]string) error
Publish message in 'fire & forget' manner
func (*Application) SendAndWait ¶
func (app *Application) SendAndWait(topic string, payload interface{}, metadata ...map[string]string) (Ctx, error)
Send message and wait for reply
func (*Application) SetErrorHandler ¶
func (app *Application) SetErrorHandler(eh *ErrorHandler)
Set ErrorHandler to use when error occurs. This ErrorHandler will set middlewares on handler level.
type Ctx ¶
type Ctx interface { // get payload Payload() []byte // get headers Headers() map[string]string // set header SetHeader(k string, v string) // get header Header(k string) string // unmarshal and bind to the param Parse(v interface{}) error // send the message to publisher Send(t string, v interface{}) error SendAndWait(t string, v interface{}, h ...map[string]string) (Ctx, error) }