protobus

v0.0.0-...-14080fc
Published: Jun 15, 2020 License: MIT Imports: 14 Imported by: 0

README

ProtoBus

ProtoBus is a Command Bus-like messaging layer built on top of Watermill.

Its purpose is to provide a simpler, Command Bus-like API for working with Watermill.

It supports the Request-Reply pattern over messaging and has been tested on RabbitMQ 3.8.3.

Documentation

Constants

const (
	RoutingKeyRequestPrefix = "protobus.amqp.request."
	RoutingKeyReplyPrefix   = "protobus.amqp.reply."
)
const (
	// Header/Metadata keys that mark the reason and context for why the message was deemed poisoned.
	HeaderDeadLetterReason      = "HeaderDeadLetterReason"
	HeaderDeadLetterStackTrace  = "DeadLetterStackTrace"
	HeaderDeadLetterSourceTopic = "DeadLetterSourceTopic"
	HeaderDeadLetterHandler     = "DeadLetterHandler"
	HeaderDeadLetterSubscriber  = "DeadLetterSubscriber"

	// Custom Header/Metadata
	HeaderReplyTo = "ProtoBusReplyTo"

	// Custom Context Keys
	ContextRouteId = "ProtoBusRouteId"
)

Variables

var ErrInvalidDeadLetterQueueTopic = errors.New("invalid dead letter queue topic")

ErrInvalidDeadLetterQueueTopic occurs when the topic supplied to the DeadLetterQueue constructor is invalid.

Functions

func DeadLetterQueue

func DeadLetterQueue(pub message.Publisher, topic string) (message.HandlerMiddleware, error)

DeadLetterQueue provides a middleware that salvages unprocessable messages and publishes them on a separate topic. The main middleware chain then continues on, business as usual.

func DeadLetterQueueWithFilter

func DeadLetterQueueWithFilter(pub message.Publisher, topic string, shouldGoToDeadLetterQueue func(err error) bool) (message.HandlerMiddleware, error)

DeadLetterQueueWithFilter is just like DeadLetterQueue, but accepts a function that decides which errors qualify for the dead letter queue.
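
As an illustration of the filter argument, the following is a minimal sketch of a predicate with the `func(err error) bool` shape that DeadLetterQueueWithFilter accepts. The error values `ErrValidation` and `ErrTimeout` are hypothetical and not part of this package; the sketch assumes that returning true means the message qualifies for the dead letter queue.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrValidation is a hypothetical sentinel for failures that retrying cannot fix.
var ErrValidation = errors.New("validation failed")

// ErrTimeout is a hypothetical sentinel for a transient failure.
var ErrTimeout = errors.New("timeout")

// shouldGoToDeadLetterQueue has the predicate shape expected by
// DeadLetterQueueWithFilter. Here only validation failures qualify.
func shouldGoToDeadLetterQueue(err error) bool {
	return errors.Is(err, ErrValidation)
}

func main() {
	wrapped := fmt.Errorf("decode order: %w", ErrValidation)
	fmt.Println(shouldGoToDeadLetterQueue(wrapped))    // true
	fmt.Println(shouldGoToDeadLetterQueue(ErrTimeout)) // false
}
```

Using `errors.Is` lets the predicate recognize the sentinel even when a handler has wrapped it with extra context.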

func RandString

func RandString(n ...int) string

Types

type AMQPEndpoint

type AMQPEndpoint struct {
	// contains filtered or unexported fields
}

AMQPEndpoint is a `PubSub` generator for a given endpoint (URI). It should implement Endpoint, RPCServerEndpoint, and RPCClientEndpoint.

func NewAMQPEndpoint

func NewAMQPEndpoint(uri, groupId string, logger watermill.LoggerAdapter) (*AMQPEndpoint, error)

Creates a new instance of the AMQP endpoint. The AMQP endpoint creates a durable `PubSub` with the `fanout` exchange type.

`uri` Required. AMQP connection string.

`groupId` Optional. Default: empty string.

If supplied, GroupId is used to prefix the queue name (GroupId + ExchangeName). This is required
if you want to have competing consumers.

`logger` Optional. Default: watermill.StdLogger

func (*AMQPEndpoint) Publisher

func (endpoint *AMQPEndpoint) Publisher() (message.Publisher, error)

Returns the `Publisher` for this `Endpoint`. The `Publisher` is created only once and then reused.

func (*AMQPEndpoint) RPCServerPublish

func (endpoint *AMQPEndpoint) RPCServerPublish(topic string, msg *message.Message) error

func (*AMQPEndpoint) RPCServerSubscriber

func (endpoint *AMQPEndpoint) RPCServerSubscriber() (message.Subscriber, error)

func (*AMQPEndpoint) SendAndWait

func (endpoint *AMQPEndpoint) SendAndWait(topic string, request *message.Message) (*message.Message, error)

func (*AMQPEndpoint) String

func (endpoint *AMQPEndpoint) String() string

func (*AMQPEndpoint) Subscriber

func (endpoint *AMQPEndpoint) Subscriber(name string) (message.Subscriber, error)

Returns a new `Subscriber` for the given exchange. This creates a new `queue` and binds it to the exchange if one is not yet available. If `GroupId` is not empty, the `queue` name is `GroupId.ExchangeName`; otherwise the `ExchangeName` is used as the queue name.
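
The queue naming rule described for Subscriber can be sketched as a small function. This is an illustration only: the helper name `queueName` is hypothetical, and the "." separator is an assumption read from the `GroupId.ExchangeName` notation rather than from the package source.

```go
package main

import "fmt"

// queueName mirrors the documented rule: prefix the exchange name with the
// group ID when one is set, otherwise use the exchange name on its own.
// The "." separator is an assumption, not confirmed by the package source.
func queueName(groupID, exchangeName string) string {
	if groupID == "" {
		return exchangeName
	}
	return groupID + "." + exchangeName
}

func main() {
	fmt.Println(queueName("", "order.created"))        // order.created
	fmt.Println(queueName("billing", "order.created")) // billing.order.created
}
```

Under this rule, subscribers that share a group ID read from the same queue and so compete for messages, while subscribers with different group IDs each get their own queue bound to the fanout exchange.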

type Application

type Application struct {
	// contains filtered or unexported fields
}

Application is the ProtoBus application: a facade that exports only a fine-grained set of properties and functions, to simplify working with Watermill's CQRS.

func New

func New(endpoint Endpoint, marshaler marshaler.Marshaler, logger watermill.LoggerAdapter) *Application

Creates a new instance of the bus for the given endpoint.

`endpoint` Required.

`logger` Optional. Default: watermill.StdLogger

func (*Application) OnEvent

func (app *Application) OnEvent(event string, handler func(Ctx) error) error

Event handler

func (*Application) OnRequest

func (app *Application) OnRequest(topic string, handler func(Ctx) (interface{}, error)) error

Request-reply pattern handler. The handler is expected to return the payload to send as the reply.

func (*Application) Send

func (app *Application) Send(topic string, payload interface{}, metadata ...map[string]string) error

Publishes a message in a 'fire and forget' manner.

func (*Application) SendAndWait

func (app *Application) SendAndWait(topic string, payload interface{}, metadata ...map[string]string) (Ctx, error)

Send message and wait for reply
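
To make the request-reply flow concrete, here is a toy in-memory sketch of the pattern using Go channels. It is not how ProtoBus implements SendAndWait, which goes through a message broker; the `request` type, `serve`, `sendAndWait`, and `demo` are all hypothetical names, and the `replyTo` channel merely stands in for a reply destination carried with the message.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
	"time"
)

// request is a hypothetical in-memory message. replyTo stands in for the
// reply destination that a broker-based setup would carry with the message.
type request struct {
	payload string
	replyTo chan string
}

// serve plays the server role: it runs handler for every request and sends
// the returned payload back as the reply. Failed requests get no reply here.
func serve(requests <-chan request, handler func(payload string) (string, error)) {
	for req := range requests {
		reply, err := handler(req.payload)
		if err != nil {
			continue
		}
		req.replyTo <- reply // buffered, so the server never blocks here
	}
}

// sendAndWait plays the client role: publish the request, then block until
// the reply arrives or the timeout expires.
func sendAndWait(requests chan<- request, payload string, timeout time.Duration) (string, error) {
	replyTo := make(chan string, 1)
	requests <- request{payload: payload, replyTo: replyTo}
	select {
	case reply := <-replyTo:
		return reply, nil
	case <-time.After(timeout):
		return "", errors.New("timed out waiting for reply")
	}
}

// demo wires one server and one client together.
func demo() (string, error) {
	requests := make(chan request)
	defer close(requests) // stops the serve goroutine
	go serve(requests, func(payload string) (string, error) {
		return strings.ToUpper(payload), nil
	})
	return sendAndWait(requests, "ping", time.Second)
}

func main() {
	reply, err := demo()
	fmt.Println(reply, err) // PING <nil>
}
```

The timeout matters in any request-reply design: if the handler fails or the reply is lost, the caller should get an error rather than block forever.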

func (*Application) SetErrorHandler

func (app *Application) SetErrorHandler(eh *ErrorHandler)

Sets the ErrorHandler to use when an error occurs. This ErrorHandler installs middlewares at the handler level.

func (*Application) Start

func (app *Application) Start(context context.Context) error

Start the watermill router

type Ctx

type Ctx interface {
	// get payload
	Payload() []byte
	// get headers
	Headers() map[string]string
	// set header
	SetHeader(k string, v string)
	// get header
	Header(k string) string
	// unmarshal and bind to the param
	Parse(v interface{}) error
	// send the message to publisher
	Send(t string, v interface{}) error
	SendAndWait(t string, v interface{}, h ...map[string]string) (Ctx, error)
}
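
Because handlers only depend on the Ctx interface, they can be exercised without a broker. The sketch below repeats the interface locally and adds a hypothetical `fakeCtx` for tests. It assumes JSON payloads, whereas the real encoding depends on the marshaler passed to New; the `orderCreated` type and the topic name `invoice.requested` are made up for illustration.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Ctx repeats the interface shown above so the sketch compiles on its own.
type Ctx interface {
	Payload() []byte
	Headers() map[string]string
	SetHeader(k string, v string)
	Header(k string) string
	Parse(v interface{}) error
	Send(t string, v interface{}) error
	SendAndWait(t string, v interface{}, h ...map[string]string) (Ctx, error)
}

// fakeCtx is a hypothetical in-memory Ctx for tests (JSON payloads assumed).
type fakeCtx struct {
	payload []byte
	headers map[string]string
	sent    map[string]interface{} // topic -> last payload passed to Send
}

func newFakeCtx(payload string) *fakeCtx {
	return &fakeCtx{
		payload: []byte(payload),
		headers: map[string]string{},
		sent:    map[string]interface{}{},
	}
}

func (c *fakeCtx) Payload() []byte                    { return c.payload }
func (c *fakeCtx) Headers() map[string]string         { return c.headers }
func (c *fakeCtx) SetHeader(k string, v string)       { c.headers[k] = v }
func (c *fakeCtx) Header(k string) string             { return c.headers[k] }
func (c *fakeCtx) Parse(v interface{}) error          { return json.Unmarshal(c.payload, v) }
func (c *fakeCtx) Send(t string, v interface{}) error { c.sent[t] = v; return nil }
func (c *fakeCtx) SendAndWait(t string, v interface{}, h ...map[string]string) (Ctx, error) {
	return nil, errors.New("SendAndWait is not supported by fakeCtx")
}

// orderCreated is a hypothetical event payload.
type orderCreated struct {
	ID string `json:"id"`
}

// onOrderCreated has the func(Ctx) error shape that OnEvent accepts.
func onOrderCreated(ctx Ctx) error {
	var evt orderCreated
	if err := ctx.Parse(&evt); err != nil {
		return fmt.Errorf("parse order.created: %w", err)
	}
	if evt.ID == "" {
		return errors.New("order.created: missing id")
	}
	return ctx.Send("invoice.requested", evt.ID)
}

func main() {
	ctx := newFakeCtx(`{"id":"42"}`)
	err := onOrderCreated(ctx)
	fmt.Println(err, ctx.sent["invoice.requested"]) // <nil> 42
}
```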

type Endpoint

type Endpoint interface {
	// Return new Subscriber for specified topic
	Subscriber(name string) (message.Subscriber, error)
	// Publisher for this endpoint
	Publisher() (message.Publisher, error)
	// Return a human-readable string that describes this endpoint
	String() string
}

type ErrorHandler

type ErrorHandler struct {
	// Retry middleware
	Retry *middleware.Retry
	// Dead Letter Queue (aka. Poison Queue)
	// Function to generate dead letter queue name, if it's nil then no dead letter will be configured.
	DeadLetterNameFunc func(topic string) string
	DeadLetterFilter   func(err error) bool
}
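
The role of DeadLetterNameFunc can be illustrated with a dependency-free sketch. The type `errorHandlerSketch` is a local stand-in that copies only that one field, and `deadLetterTopic` is a hypothetical helper showing the documented nil behavior; the ".dead_letter" suffix is an arbitrary naming choice, not a package convention.

```go
package main

import "fmt"

// errorHandlerSketch is a local stand-in for the DeadLetterNameFunc field of
// ErrorHandler. The other fields are omitted to avoid external dependencies.
type errorHandlerSketch struct {
	DeadLetterNameFunc func(topic string) string
}

// deadLetterTopic is a hypothetical helper: it reports the dead letter topic
// for a source topic, or false when no name function is configured.
func deadLetterTopic(h errorHandlerSketch, topic string) (string, bool) {
	if h.DeadLetterNameFunc == nil {
		return "", false // nil means no dead letter queue is configured
	}
	return h.DeadLetterNameFunc(topic), true
}

func main() {
	withDLQ := errorHandlerSketch{
		DeadLetterNameFunc: func(topic string) string { return topic + ".dead_letter" },
	}
	name, ok := deadLetterTopic(withDLQ, "order.created")
	fmt.Println(name, ok) // order.created.dead_letter true

	name, ok = deadLetterTopic(errorHandlerSketch{}, "order.created")
	fmt.Println(name == "", ok) // true false
}
```

Deriving the name from the source topic gives each topic its own dead letter destination, which keeps poisoned messages from different handlers apart.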

type RPCClientEndpoint

type RPCClientEndpoint interface {
	// Send request to given topic, wait and return reply
	SendAndWait(name string, request *message.Message) (reply *message.Message, err error)
}

type RPCServerEndpoint

type RPCServerEndpoint interface {
	// Return the Subscriber that the RPC server listens on
	RPCServerSubscriber() (message.Subscriber, error)
	// Publish reply message
	RPCServerPublish(topic string, msg *message.Message) error
}
