amqprpc

Version: v1.0.1
Published: Sep 10, 2021 License: MIT Imports: 18 Imported by: 7

README

RabbitMQ RPC Framework

Description

This is a framework to use RabbitMQ as a client/server RPC setup together with the Go amqp implementation. The framework can manage a fully functional message queue setup with reconnects, disconnects, graceful shutdown and other stability mechanisms. By providing this, RabbitMQ can be used as transport and service discovery to quickly get up and running with a microservice architecture.

Since the framework handles the majority of the communication with RabbitMQ and the Go library, the user does not need to know details about those systems. However, since a few interfaces expose the Go package types and the nomenclature is specific to RabbitMQ, some prior experience is preferred.

Project status

This project has been used in production since October 2018, handling millions of requests both as server and client.

Server

The server is inspired by the HTTP library where the user maps a RabbitMQ binding to a handler function. A response writer is passed to the handler which may be used as an io.Writer to write the response.

This is an example of how to get up and running with a server responding to all messages published to the given routing key.

server := NewServer("amqp://guest:guest@localhost:5672")

server.Bind(DirectBinding("routing_key", func(c context.Context, rw *ResponseWriter, d amqp.Delivery) {
    // Print what the body and header was
    fmt.Println(d.Body, d.Headers)

    // Add a response to the publisher
    fmt.Fprint(rw, "Handled")
}))

server.ListenAndServe()

The example above uses a DirectBinding but all the supported bindings are provided via an interface where the exchange type will be set to the proper type.

server.Bind(DirectBinding("routing_key", handleFunc))
server.Bind(FanoutBinding("fanout-exchange", handleFunc))
server.Bind(TopicBinding("queue-name", "routing_key.#", handleFunc))
server.Bind(HeadersBinding("queue-name", amqp.Table{"x-match": "all", "foo": "bar"}, handleFunc))

If the default variables don't give the desired result, you can set up the binding with the type manually.

customBinding := HandlerBinding{
    QueueName:    "oh-sweet-queue",
    ExchangeName: "my-exchange",
    ExchangeType: ExchangeDirect,
    RoutingKey:   "my-key",
    BindHeaders:  amqp.Table{},
    Handler:      handleFunc,
}

server.Bind(customBinding)

The server will not connect until ListenAndServe() is called. This means that you can configure settings for the server until that point. The settings can be changed by calling chainable methods.

server := NewServer("amqp://guest:guest@localhost:5672").
    WithConsumeSettings(ConsumeSettings{}).
    WithQueueDeclareSettings(QueueDeclareSettings{}).
    WithExchangeDeclareSettings(ExchangeDeclareSettings{}).
    WithDebugLogger(log.Printf).
    WithErrorLogger(log.Printf).
    WithDialConfig(amqp.Config{}).
    WithTLS(&tls.Config{})
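The chainable style used above can be sketched with a small stand-in type. The server type, its fields and the withPrefetch and withDebugLogger methods below are hypothetical and only illustrate the pattern of returning the receiver from each setter; they are not part of this package.

```go
package main

import "log"

// server is a hypothetical stand-in, not the amqprpc Server type.
type server struct {
	url      string
	prefetch int
	debug    func(format string, args ...interface{})
}

// newServer only stores settings; nothing connects at this point.
func newServer(url string) *server {
	return &server{url: url, prefetch: 10}
}

// withPrefetch returns the receiver so that calls can be chained.
func (s *server) withPrefetch(n int) *server {
	s.prefetch = n
	return s
}

// withDebugLogger also returns the receiver.
func (s *server) withDebugLogger(f func(format string, args ...interface{})) *server {
	s.debug = f
	return s
}

func main() {
	s := newServer("amqp://guest:guest@localhost:5672").
		withPrefetch(50).
		withDebugLogger(log.Printf)

	s.debug("prefetch=%d url=%s", s.prefetch, s.url)
}
```

Because every setter only mutates the struct, all configuration stays cheap and side-effect free until the point where a connection is actually needed.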

QoS is by default set to a prefetch count of 10 and a prefetch size of 0 (no limit). If you want to change this you can use the WithConsumeSettings(settings) function.

Client

The client is built around channels to be able to handle as many requests as possible without the need to setup multiple clients. All messages published get a unique correlation ID which is used to determine where a response should be passed no matter what order it's received.
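One way to picture correlation-ID based routing is a map from correlation ID to the channel of the waiting caller. This is a minimal sketch under that assumption; the reply and pending types are hypothetical and do not reflect how the client is implemented internally.

```go
package main

import (
	"fmt"
	"sync"
)

// reply is a hypothetical stand-in for a delivery received on the reply queue.
type reply struct {
	CorrelationID string
	Body          string
}

// pending maps correlation IDs to the channel of the waiting caller.
type pending struct {
	mu      sync.Mutex
	waiters map[string]chan reply
}

func newPending() *pending {
	return &pending{waiters: make(map[string]chan reply)}
}

// register creates a buffered channel for id so that dispatch never blocks.
func (p *pending) register(id string) <-chan reply {
	ch := make(chan reply, 1)
	p.mu.Lock()
	p.waiters[id] = ch
	p.mu.Unlock()
	return ch
}

// dispatch hands r to the caller waiting for its correlation ID and reports
// whether anyone was waiting.
func (p *pending) dispatch(r reply) bool {
	p.mu.Lock()
	ch, ok := p.waiters[r.CorrelationID]
	delete(p.waiters, r.CorrelationID)
	p.mu.Unlock()
	if ok {
		ch <- r
	}
	return ok
}

func main() {
	p := newPending()
	first := p.register("id-1")
	second := p.register("id-2")

	// Replies arrive in the opposite order of the requests.
	p.dispatch(reply{CorrelationID: "id-2", Body: "second"})
	p.dispatch(reply{CorrelationID: "id-1", Body: "first"})

	fmt.Println((<-first).Body, (<-second).Body)
}
```

Each caller only ever reads from its own channel, so the arrival order of replies does not matter.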

The client takes a Request as input which can hold all required information about how to route and handle the message and response.

client := NewClient("amqp://guest:guest@localhost:5672")

request := NewRequest().
    WithRoutingKey("routing_key").
    WithBody("This is my body")

response, err := client.Send(request)
if err != nil {
    log.Fatal(err.Error())
}

log.Print(string(response.Body))

The client will not connect while being created. Instead, this happens when the first request is published (while calling Send()). This allows you to configure connection-related parameters such as timeout by chaining the methods.

// Set timeout after NewClient is invoked by chaining.
client := NewClient("amqp://guest:guest@localhost:5672").
    WithTimeout(5000 * time.Millisecond)

// Will not connect and may be changed until this call.
client.Send(NewRequest().WithRoutingKey("routing_key"))

Example of available methods for chaining.

client := NewClient("amqp://guest:guest@localhost:5672").
    WithDebugLogger(log.Printf).
    WithErrorLogger(log.Printf).
    WithDialConfig(amqp.Config{}).
    WithTLS(&tls.Config{}).
    WithQueueDeclareSettings(QueueDeclareSettings{}).
    WithConsumeSettings(ConsumeSettings{}).
    WithPublishSettings(PublishSettings{}).
    WithConfirmMode(true).
    WithTimeout(10 * time.Second)

Confirm mode

Confirm mode can be set on the client and will make the client wait for an Ack from the amqp-server. This makes sending requests more reliable at the cost of some performance, as each publishing must be confirmed by the amqp-server. You can read more at https://www.rabbitmq.com/confirms.html#publisher-confirms.

The client is set in confirm mode by default.

You can use WithPublishSettings or WithConfirmMode to control this setting.

client := NewClient("amqp://guest:guest@localhost:5672").
    WithConfirmMode(true)

client := NewClient("amqp://guest:guest@localhost:5672").
    WithPublishSettings(PublishSettings{ConfirmMode: true})
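Waiting for a confirmation can be pictured as a select over a confirmation channel and a timer. The sketch below uses a hypothetical confirmation type and local sentinel errors; it does not talk to a broker and is not the client's actual implementation.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Local sentinels for this sketch only.
var (
	errRejected = errors.New("publishing Nacked")
	errTimeout  = errors.New("request timed out")
)

// confirmation is a hypothetical stand-in for a broker confirmation.
type confirmation struct{ Ack bool }

// awaitConfirm blocks until a confirmation arrives or timeout elapses.
func awaitConfirm(confirms <-chan confirmation, timeout time.Duration) error {
	select {
	case c := <-confirms:
		if !c.Ack {
			return errRejected
		}
		return nil
	case <-time.After(timeout):
		return errTimeout
	}
}

func main() {
	confirms := make(chan confirmation, 1)

	// A Nack from the broker is reported as an error.
	confirms <- confirmation{Ack: false}
	fmt.Println(awaitConfirm(confirms, time.Second))

	// No confirmation at all ends in a timeout.
	fmt.Println(awaitConfirm(confirms, 10*time.Millisecond))
}
```

The extra round trip per publishing is where the performance cost of confirm mode comes from.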

Request

The Request type is used as input to the client's Send function and holds all the information about how to route and handle the request. All the properties may be set with chainable methods to make it easy to construct. A Request may be re-used as many times as desired.

request := NewRequest().
    WithBody(`{"hello":"world"}`).
    WithContentType("application/json").
    WithContext(context.TODO()).
    WithExchange("custom.exchange").
    WithRoutingKey("routing_key").
    WithHeaders(amqp.Table{}).
    WithCorrelationID("custom-correlation-id").
    WithTimeout(5 * time.Second).
    WithResponse(true)

By default a context.Background() will be added and WithResponse() will be set to true.

WithTimeout will also set the Expiration on the publishing since there is no point in handling the message after the timeout has expired. Setting WithResponse(false) will ensure that no Expiration is set.

The Request also implements the io.Writer interface which makes it possible to use directly like that.

request := NewRequest()

err := json.NewEncoder(request).Encode(serializableObject)
if err != nil {
    panic(err)
}
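Implementing io.Writer only requires a Write method. The following sketch uses a hypothetical request type with a byte-slice body to show why an encoder can write straight into it; the real Request type has more fields and may behave differently.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
)

// request is a hypothetical stand-in for a type with a byte-slice body.
type request struct {
	Body []byte
}

// Compile-time check that *request satisfies io.Writer.
var _ io.Writer = (*request)(nil)

// Write appends p to the body, so repeated calls accumulate data.
func (r *request) Write(p []byte) (int, error) {
	r.Body = append(r.Body, p...)
	return len(p), nil
}

func main() {
	r := &request{}

	payload := map[string]string{"hello": "world"}
	if err := json.NewEncoder(r).Encode(payload); err != nil {
		panic(err)
	}

	fmt.Printf("%s", r.Body)
}
```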

Note: If you request a response when sending to a fanout exchange, the response will be the first one sent by any of the subscribers. There's currently no way to stream multiple responses for the same request.

Sender

The client invokes a default SendFunc while calling Send() where all the RabbitMQ communication is handled and the message is published. The sender is attached to a public field (Sender) and may be overridden. This enables you to handle unit testing without the need to implement an interface. This framework comes with a package named amqprpctest which helps you create a client with your own custom send function.

unitTestSendFunc := func(r *Request) (*amqp.Delivery, error) {
    fmt.Println("will not connect or publish anything")

    mockResponse := &amqp.Delivery{
        Body: []byte("this is my mock response"),
    }

    return mockResponse, nil
}

client := amqprpctest.NewTestClient(unitTestSendFunc)
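The idea of a replaceable sender can be sketched without any broker. The request, delivery and client types below are hypothetical stand-ins that only mirror the shape described above (a public function field that Send delegates to).

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins for the request, delivery and client types.
type request struct{ RoutingKey string }
type delivery struct{ Body []byte }

type sendFunc func(r *request) (*delivery, error)

type client struct {
	// Sender performs the actual send and may be replaced in tests.
	Sender sendFunc
}

// newClient installs a default sender that would normally talk to the broker.
func newClient() *client {
	return &client{Sender: func(*request) (*delivery, error) {
		return nil, errors.New("no broker available in this sketch")
	}}
}

// Send delegates to whatever sender is currently installed.
func (c *client) Send(r *request) (*delivery, error) {
	return c.Sender(r)
}

func main() {
	c := newClient()

	// Replace the sender so that nothing is published.
	c.Sender = func(r *request) (*delivery, error) {
		return &delivery{Body: []byte("mock reply for " + r.RoutingKey)}, nil
	}

	d, err := c.Send(&request{RoutingKey: "routing_key"})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(d.Body))
}
```

Code under test keeps calling Send as usual, so no interface has to be introduced just for testing.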

Middlewares

Both the client and the server aim to only handle the connectivity and routing within the RPC setup. This means that there's no code interacting with the request or the responses before or after a request is published or received. To handle this in a dynamic way and allow the user to affect as much as possible for each request, both the server and the client are built around middlewares.

Server middlewares

Server middlewares can be hooked both to a specific handler or to all messages for the entire server. Middlewares can be chained to be executed in a specific order.

The middleware is inspired by the HTTP library where the middleware is defined as a function that takes a handler function as input and returns the same handler func. This makes it possible to add middlewares in any order.

The handler function HandlerFunc takes a context, a response writer and a delivery as input not returning anything. The ServerMiddlewareFunc thus takes this type as input and returns the same type.

type HandlerFunc func(context.Context, *ResponseWriter, amqp.Delivery)

type ServerMiddlewareFunc func(next HandlerFunc) HandlerFunc

To execute the next handler just call the received function.

server := NewServer("amqp://guest:guest@localhost:5672")

// Define a custom middleware to use for the server.
func myMiddleware(next HandlerFunc) HandlerFunc {
    // Preinitialization of middleware here.

    return func(ctx context.Context, rw *ResponseWriter, d amqp.Delivery) {
        fmt.Println("this will be executed before the actual handler")

        // Execute the handler.
        next(ctx, rw, d)

        fmt.Println("this will be executed after the actual handler")
    }
}

// Add a middleware to specific handler.
server.Bind(DirectBinding("foobar", myMiddleware(myHandler)))

// Add multiple middlewares to a specific handler by chaining them.
server.Bind(
    DirectBinding(
        "foobar",
        ServerMiddlewareChain(
            myHandler,
            middlewareOne,
            middlewareTwo,
            middlewareThree,
        ),
    ),
)

// Add middleware to all handlers on the server.
server.AddMiddleware(myMiddleware)

server.ListenAndServe()

Note that the middleware folder in this project contains an example of how to recover from a panic in a handler with a middleware.
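The general shape of a recovery middleware can be sketched as follows. The handlerFunc signature here is deliberately simplified and hypothetical (a string in, a string out) so that the example runs on its own; it is not the middleware shipped in the middleware folder.

```go
package main

import "fmt"

// handlerFunc is a simplified, hypothetical handler signature.
type handlerFunc func(body string) string

// middlewareFunc wraps one handler in another.
type middlewareFunc func(next handlerFunc) handlerFunc

// Compile-time check that recoverMiddleware has the middleware shape.
var _ middlewareFunc = recoverMiddleware

// recoverMiddleware turns a panic in next into a regular reply.
func recoverMiddleware(next handlerFunc) handlerFunc {
	return func(body string) (reply string) {
		defer func() {
			if r := recover(); r != nil {
				reply = fmt.Sprintf("recovered: %v", r)
			}
		}()

		return next(body)
	}
}

func main() {
	h := recoverMiddleware(func(body string) string {
		panic("handler failed on " + body)
	})

	fmt.Println(h("ping"))
}
```

The deferred function runs even when next panics, which is what lets the wrapper replace the reply instead of crashing the process.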

Client middlewares

The client supports middlewares which may be executed before or after a request has been published. This is a great way to e.g. enrich headers or handle returned errors.

The sender function SendFunc takes a Request as input and returns an amqp.Delivery pointer and an error. The ClientMiddlewareFunc thus takes this type as input and returns the same type.

type SendFunc func(r *Request) (d *amqp.Delivery, e error)

type ClientMiddlewareFunc func(next SendFunc) SendFunc

A middleware can be added to the client to be executed for all requests or attached to the Request type instead.

func myMiddleware(next SendFunc) SendFunc {
    return func(r *Request) (*amqp.Delivery, error) {
        r.Publishing.Headers["enriched-header"] = "yes"
        r.Publishing.AppId = "my-app"

        return next(r)
    }
}

// Add the middleware to all request made with the client.
client := NewClient("amqp://guest:guest@localhost:5672").
    AddMiddleware(myMiddleware)

// Add the middleware to a single request.
request := NewRequest().
    WithRoutingKey("routing_key").
    AddMiddleware(myMiddleware)

client.Send(request)

Due to the way middlewares are added and chained the middlewares attached to the request will always be executed after the middlewares attached to the client.
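The ordering can be illustrated with a small chain helper. This is a sketch of one way to obtain that order, using hypothetical request, sendFunc and middlewareFunc types; the actual chaining inside the client may be implemented differently.

```go
package main

import "fmt"

// Hypothetical stand-ins; trace records the order of execution.
type request struct{ trace []string }
type sendFunc func(r *request) error
type middlewareFunc func(next sendFunc) sendFunc

// chain wraps next so that the first middleware in m runs first.
func chain(next sendFunc, m ...middlewareFunc) sendFunc {
	for i := len(m) - 1; i >= 0; i-- {
		next = m[i](next)
	}
	return next
}

// tag returns a middleware that records its name before calling next.
func tag(name string) middlewareFunc {
	return func(next sendFunc) sendFunc {
		return func(r *request) error {
			r.trace = append(r.trace, name)
			return next(r)
		}
	}
}

// send puts client middlewares outermost and request middlewares inside,
// so client middlewares run before request middlewares.
func send(r *request, clientMW, requestMW []middlewareFunc) error {
	publish := func(r *request) error {
		r.trace = append(r.trace, "publish")
		return nil
	}
	return chain(chain(publish, requestMW...), clientMW...)(r)
}

func main() {
	r := &request{}
	if err := send(r, []middlewareFunc{tag("client")}, []middlewareFunc{tag("request")}); err != nil {
		panic(err)
	}
	fmt.Println(r.trace)
}
```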

For more examples of client middlewares, see examples/middleware.

Connection hooks

You often want to know when a connection has been established and, when it comes to RabbitMQ, also perform some post-connection setup. This is enabled by the fact that both the server and the client hold a list of OnStarted functions. Each function receives the incoming connection, outgoing connection, incoming channel and outgoing channel.

type OnStartedFunc func(inputConn, outputConn *amqp.Connection, inputChannel, outputChannel *amqp.Channel)

As an example this is a great place to do some initial QoS setup.

server := NewServer("amqp://guest:guest@localhost:5672")

func setupQoS(_, _ *amqp.Connection, inChan, _ *amqp.Channel) {
    err := inChan.Qos(
        10,   // Prefetch count
        1024, // Prefetch size
        true, // Global
    )

    if err != nil {
        panic(err.Error())
    }
}

// Setup QoS when the connection is established.
server.OnStarted(setupQoS)

server.ListenAndServe()

Both the server and the client follow the recommendations for RabbitMQ connections, which means separate connections for incoming and outgoing traffic and separate channels for consuming and publishing messages. Because of this, the signature looks the same for both the server and the client.
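A list of start hooks can be sketched like this. The conn, channel and server types are hypothetical stand-ins for the amqp connection, amqp channel and the server; the sketch only shows hooks being collected and then called in order once the connections and channels exist.

```go
package main

import "fmt"

// Hypothetical stand-ins for *amqp.Connection and *amqp.Channel.
type conn struct{ name string }
type channel struct{ prefetch int }

type onStartedFunc func(inConn, outConn *conn, inChan, outChan *channel)

type server struct {
	onStarted []onStartedFunc
}

// OnStarted registers f to run once connections and channels exist.
func (s *server) OnStarted(f onStartedFunc) {
	s.onStarted = append(s.onStarted, f)
}

// start runs the hooks in registration order, one at a time, so startup
// only continues once every hook has returned.
func (s *server) start() (inChan, outChan *channel) {
	inConn, outConn := &conn{name: "in"}, &conn{name: "out"}
	inChan, outChan = &channel{}, &channel{}

	for _, f := range s.onStarted {
		f(inConn, outConn, inChan, outChan)
	}

	return inChan, outChan
}

func main() {
	s := &server{}
	s.OnStarted(func(_, _ *conn, inChan, _ *channel) {
		inChan.prefetch = 10
	})

	inChan, outChan := s.start()
	fmt.Println(inChan.prefetch, outChan.prefetch)
}
```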

TLS

Since this framework's main responsibility is to handle connections, it's shipped with a type named Certificates which can hold the RabbitMQ message bus CA and the client's certificate and private key. This is just a convenient way to keep track of certificates to use, but it also implements a method named TLSConfig() which returns a *tls.Config. This may then be used to connect with a secured protocol (AMQPS) to the message bus.

certificates := Certificates{
    CA: "/path/to/rootCA.pem",
}

// Now we can get the TLS configuration to use for the client and server.
uri := "amqps://guest:guest@localhost:5671"

server := NewServer(uri).WithTLS(certificates.TLSConfig())
client := NewClient(uri).WithTLS(certificates.TLSConfig())

server.ListenAndServe()

Logging

You can specify two optional loggers, one for debugging and one for errors or unexpected behaviour. By default only error logging is turned on and is logged via the log package's standard logging.

You can provide your own logging function for both error and debug on both the client and the server.

debugLogger := log.New(os.Stdout, "DEBUG - ", log.LstdFlags)
errorLogger := log.New(os.Stdout, "ERROR - ", log.LstdFlags)

server := NewServer(url).
    WithErrorLogger(errorLogger.Printf).
    WithDebugLogger(debugLogger.Printf)

client := NewClient(url).
    WithErrorLogger(errorLogger.Printf).
    WithDebugLogger(debugLogger.Printf)

This is perfect when using a logger which supports debugging as a separate method such as the logrus logger which has Debugf and Errorf methods.

Examples

There are a few examples included in the examples folder. For more information about how to customize your setup, see the documentation.

Documentation

Constants

const (
	ExchangeDirect  = "direct"
	ExchangeFanout  = "fanout"
	ExchangeTopic   = "topic"
	ExchangeHeaders = "headers"
)

Exchanges are entities where messages are published. This defines the available entities based on https://www.rabbitmq.com/tutorials/amqp-concepts.html.

const (
	// CtxQueueName can be used to get the queue name from the context.Context
	// inside the HandlerFunc.
	CtxQueueName ctxKey = "queue_name"
)

Variables

var (
	// ErrRequestReturned can be returned by Client#Send() when the server
	// returns the message. For example when mandatory is set but the message
	// can not be routed.
	ErrRequestReturned = errors.New("publishing returned")

	// ErrRequestRejected can be returned by Client#Send() when the server Nacks
	// the message. This can happen if there is some problem inside the amqp
	// server. To check if the error returned is an ErrRequestRejected error, use
	// errors.Is(err, ErrRequestRejected).
	ErrRequestRejected = errors.New("publishing Nacked")

	// ErrRequestTimeout is an error returned when a client request does not
	// receive a response within the client timeout duration. To check if the
	// error returned is an ErrRequestTimeout error, use errors.Is(err,
	// ErrRequestTimeout).
	ErrRequestTimeout = errors.New("request timed out")
)
var ErrUnexpectedConnClosed = errors.New("unexpected connection close without specific error")

ErrUnexpectedConnClosed is returned by ListenAndServe() if the server shuts down without calling Stop() and if AMQP does not give an error when said shutdown happens.
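Checking for these errors with errors.Is also works when the error has been wrapped on the way up. The sketch below uses local sentinel errors as stand-ins for the package variables, and the classify and wrap helpers and their labels are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// Local sentinels standing in for the package-level errors.
var (
	errRequestTimeout  = errors.New("request timed out")
	errRequestRejected = errors.New("publishing Nacked")
)

// wrap adds context while keeping the original error in the chain.
func wrap(err error) error {
	return fmt.Errorf("send to %q: %w", "routing_key", err)
}

// classify maps an error to a short, hypothetical action label.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, errRequestTimeout):
		return "retry"
	case errors.Is(err, errRequestRejected):
		return "report"
	default:
		return "fail"
	}
}

func main() {
	fmt.Println(classify(wrap(errRequestTimeout)))
	fmt.Println(classify(errRequestRejected))
	fmt.Println(classify(nil))
}
```

Comparing with == would miss the wrapped case, which is why errors.Is is the safer check.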

Functions

func DefaultDialer

func DefaultDialer(network, addr string) (net.Conn, error)

DefaultDialer is the RPC server default implementation of a dialer.

Types

type AwareAcknowledger added in v0.10.1

type AwareAcknowledger struct {
	Acknowledger amqp.Acknowledger
	Handled      bool
}

AwareAcknowledger implements the amqp.Acknowledger interface with the addition that it can tell if a message has been acked in any way.

func NewAwareAcknowledger added in v0.10.1

func NewAwareAcknowledger(acknowledger amqp.Acknowledger) *AwareAcknowledger

NewAwareAcknowledger returns the passed acknowledger as an AwareAcknowledger.

func (*AwareAcknowledger) Ack added in v0.10.1

func (a *AwareAcknowledger) Ack(tag uint64, multiple bool) error

Ack passes the Ack down to the underlying Acknowledger.

func (*AwareAcknowledger) Nack added in v0.10.1

func (a *AwareAcknowledger) Nack(tag uint64, multiple, requeue bool) error

Nack passes the Nack down to the underlying Acknowledger.

func (*AwareAcknowledger) Reject added in v0.10.1

func (a *AwareAcknowledger) Reject(tag uint64, requeue bool) error

Reject passes the Reject down to the underlying Acknowledger.
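The wrapping idea can be sketched with a local interface that has the same three methods. The types below are hypothetical stand-ins, and this sketch marks a delivery as handled before forwarding the call regardless of the returned error, which may differ from what AwareAcknowledger does.

```go
package main

import "fmt"

// acknowledger mirrors the three methods used for acknowledging deliveries.
type acknowledger interface {
	Ack(tag uint64, multiple bool) error
	Nack(tag uint64, multiple, requeue bool) error
	Reject(tag uint64, requeue bool) error
}

// countingAcknowledger is a test double that only counts calls.
type countingAcknowledger struct{ acks, nacks, rejects int }

func (c *countingAcknowledger) Ack(uint64, bool) error        { c.acks++; return nil }
func (c *countingAcknowledger) Nack(uint64, bool, bool) error { c.nacks++; return nil }
func (c *countingAcknowledger) Reject(uint64, bool) error     { c.rejects++; return nil }

// awareAcknowledger forwards every call and remembers that one happened.
type awareAcknowledger struct {
	inner   acknowledger
	handled bool
}

// Compile-time check that the wrapper satisfies the same interface.
var _ acknowledger = (*awareAcknowledger)(nil)

func (a *awareAcknowledger) Ack(tag uint64, multiple bool) error {
	a.handled = true
	return a.inner.Ack(tag, multiple)
}

func (a *awareAcknowledger) Nack(tag uint64, multiple, requeue bool) error {
	a.handled = true
	return a.inner.Nack(tag, multiple, requeue)
}

func (a *awareAcknowledger) Reject(tag uint64, requeue bool) error {
	a.handled = true
	return a.inner.Reject(tag, requeue)
}

func main() {
	inner := &countingAcknowledger{}
	aware := &awareAcknowledger{inner: inner}

	fmt.Println(aware.handled) // Nothing has been acknowledged yet.

	if err := aware.Ack(1, false); err != nil {
		panic(err)
	}

	fmt.Println(aware.handled, inner.acks)
}
```

A caller can inspect the flag after a handler returns to decide whether the delivery still needs to be acknowledged.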

type Certificates

type Certificates struct {
	Cert string
	Key  string
	CA   string
}

Certificates represents the certificate, the key and the CA to use when using RabbitMQ with TLS or the certificate and key when using as TLS configuration for RPC server. The fields should be the path to files stored on disk and will be passed to ioutil.ReadFile and tls.LoadX509KeyPair.

func (*Certificates) TLSConfig

func (c *Certificates) TLSConfig() *tls.Config

TLSConfig will return a *tls.Config type based on the files set in the Certificates type.

type Client

type Client struct {

	// Sender is the main send function called after all middlewares have been
	// chained and called. This field can be overridden to simplify testing.
	Sender SendFunc
	// contains filtered or unexported fields
}

Client represents an AMQP client used within an RPC framework. This client can be used to communicate with RPC servers.

func NewClient

func NewClient(url string) *Client

NewClient will return a pointer to a new Client. There are two ways to manage the connection that will be used by the client (i.e. when using TLS).

The first one is to use the Certificates type and just pass the filenames to the client certificate, key and the server CA. If this is done the function will handle the reading of the files.

It is also possible to create a custom amqp.Config with whatever configuration is desired; it will be used as the dial configuration when connecting to the message bus.

func (*Client) AddMiddleware

func (c *Client) AddMiddleware(m ClientMiddlewareFunc) *Client

AddMiddleware will add a middleware which will be executed on request.

func (*Client) OnStarted

func (c *Client) OnStarted(f OnStartedFunc)

OnStarted can be used to hook into the connections/channels that the client is using. This can be useful if you want more control over amqp directly. Note that since the client is lazy and won't connect until the first .Send(), the provided OnStartedFunc won't be called until then. Also note that this is blocking and the client won't continue its startup until this function has finished executing.

client := NewClient(url)
client.OnStarted(func(inConn, outConn *amqp.Connection, inChan, outChan *amqp.Channel) {
	// Do something with amqp connections/channels.
})

func (*Client) Send

func (c *Client) Send(r *Request) (*amqp.Delivery, error)

Send will send a Request by using an amqp.Publishing.

func (*Client) Stop

func (c *Client) Stop()

Stop will gracefully disconnect from AMQP. It is not guaranteed that all in-flight requests or responses are handled before the disconnect. Instead, the user should ensure that all calls to c.Send() have returned before calling c.Stop().

func (*Client) WithConfirmMode added in v0.11.0

func (c *Client) WithConfirmMode(confirmMode bool) *Client

WithConfirmMode sets the confirm-mode on the client. This causes the client to wait for confirmations, and if none arrives or the confirmation is marked as Nack, Client#Send() returns a corresponding error.

func (*Client) WithConsumeSettings

func (c *Client) WithConsumeSettings(s ConsumeSettings) *Client

WithConsumeSettings will set the settings used when consuming in the client globally.

func (*Client) WithDebugLogger

func (c *Client) WithDebugLogger(f LogFunc) *Client

WithDebugLogger sets the logger to use for debug logging.

func (*Client) WithDialConfig

func (c *Client) WithDialConfig(dc amqp.Config) *Client

WithDialConfig sets the dial config used for the client.

func (*Client) WithErrorLogger

func (c *Client) WithErrorLogger(f LogFunc) *Client

WithErrorLogger sets the logger to use for error logging.

func (*Client) WithMaxRetries added in v0.10.1

func (c *Client) WithMaxRetries(n int) *Client

WithMaxRetries sets the maximum amount of times the client will retry sending the request before giving up and returning the error to the caller of c.Send(). This retry will persist during reconnects.

func (*Client) WithPublishSettings

func (c *Client) WithPublishSettings(s PublishSettings) *Client

WithPublishSettings will set the client publishing settings when publishing messages.

func (*Client) WithQueueDeclareSettings

func (c *Client) WithQueueDeclareSettings(s QueueDeclareSettings) *Client

WithQueueDeclareSettings will set the settings used when declaring queues for the client globally.

func (*Client) WithTLS

func (c *Client) WithTLS(tls *tls.Config) *Client

WithTLS sets the TLS config in the dial config for the client.

func (*Client) WithTimeout

func (c *Client) WithTimeout(t time.Duration) *Client

WithTimeout will set the client timeout used when publishing messages. t will be rounded using the duration's Round function to the nearest multiple of a millisecond. Rounding will be away from zero.
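The rounding behaviour described here is that of time.Duration.Round from the standard library. The roundTimeout helper below is a hypothetical name used only to demonstrate it.

```go
package main

import (
	"fmt"
	"time"
)

// roundTimeout rounds t to the nearest millisecond; halfway values are
// rounded away from zero by time.Duration.Round.
func roundTimeout(t time.Duration) time.Duration {
	return t.Round(time.Millisecond)
}

func main() {
	for _, t := range []time.Duration{
		1499 * time.Microsecond,
		1500 * time.Microsecond,
		2 * time.Second,
	} {
		fmt.Println(t, "->", roundTimeout(t))
	}
}
```

With this rounding, 1499 microseconds becomes 1 millisecond while 1500 microseconds becomes 2 milliseconds, and values that are already whole milliseconds are unchanged.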

type ClientMiddlewareFunc

type ClientMiddlewareFunc func(next SendFunc) SendFunc

ClientMiddlewareFunc represents a function that can be used as a middleware.

type ConsumeSettings

type ConsumeSettings struct {
	// Consumer sets the consumer tag used when consuming.
	Consumer string

	// AutoAck sets the auto-ack flag. When this is set to false, you must
	// manually ack any deliveries. This is always true for the Client when
	// consuming replies.
	AutoAck bool

	// Exclusive sets the exclusive flag. When this is set to true, no other
	// instances can consume from a given queue. This has no effect on the
	// Client when consuming replies where it's always set to true so that no
	// two clients can consume from the same reply-to queue.
	Exclusive bool

	// QoSPrefetchCount sets the prefetch-count. Set this to a value to ensure
	// that amqp-rpc won't prefetch all messages in the queue. This has no
	// effect on the Client which will always try to fetch everything.
	QoSPrefetchCount int

	// QoSPrefetchSize sets the prefetch-size. Set this to a value to ensure
	// that amqp-rpc won't prefetch all messages in the queue.
	QoSPrefetchSize int

	// Args sets the arguments table used.
	Args amqp.Table
}

ConsumeSettings is the settings that will be used when the consumption on a specified queue is started.

type Dialer

type Dialer func(string, string) (net.Conn, error)

Dialer is a function returning a connection used to connect to the message bus.

type ExchangeDeclareSettings

type ExchangeDeclareSettings struct {
	// Durable sets the durable flag. Durable exchanges survive server restart.
	Durable bool

	// AutoDelete sets the auto-delete flag, this ensures the exchange is
	// deleted when it isn't bound to any more.
	AutoDelete bool

	// Args sets the arguments table used.
	Args amqp.Table
}

ExchangeDeclareSettings is the settings that will be used when a handler is mapped to a fanout exchange and an exchange is declared.

type HandlerBinding

type HandlerBinding struct {
	QueueName    string
	ExchangeName string
	ExchangeType string
	RoutingKey   string
	BindHeaders  amqp.Table
	Handler      HandlerFunc
}

HandlerBinding holds information about how an exchange and a queue should be declared and bound. If the ExchangeName is not defined (an empty string), the queue will not be bound to the exchange but assumed to use the default match.

func DirectBinding

func DirectBinding(routingKey string, handler HandlerFunc) HandlerBinding

DirectBinding returns a HandlerBinding to use for direct exchanges where each routing key will be mapped to one handler.

func FanoutBinding

func FanoutBinding(exchangeName string, handler HandlerFunc) HandlerBinding

FanoutBinding returns a HandlerBinding to use for fanout exchanges. These exchanges do not use the routing key. We do not use the default exchange (amq.fanout) since this would broadcast all messages everywhere.

func HeadersBinding

func HeadersBinding(queueName string, headers amqp.Table, handler HandlerFunc) HandlerBinding

HeadersBinding returns a HandlerBinding to use for header exchanges that will match on specific headers. The headers are specified as an amqp.Table. The default exchange amq.match will be used.

func TopicBinding

func TopicBinding(queueName, routingKey string, handler HandlerFunc) HandlerBinding

TopicBinding returns a HandlerBinding to use for topic exchanges. The default exchange (amq.topic) will be used. The topic is matched on the routing key.

type HandlerFunc

type HandlerFunc func(context.Context, *ResponseWriter, amqp.Delivery)

HandlerFunc is the function that handles all requests based on the routing key.

func ServerMiddlewareChain

func ServerMiddlewareChain(next HandlerFunc, m ...ServerMiddlewareFunc) HandlerFunc

ServerMiddlewareChain will attach all given middlewares to your HandlerFunc. The middlewares will be executed in the same order as your input.

For example:

s := NewServer("url")

s.Bind(DirectBinding(
	"foobar",
	ServerMiddlewareChain(
		myHandler,
		middlewareOne,
		middlewareTwo,
		middlewareThree,
	),
))

type LogFunc

type LogFunc func(format string, args ...interface{})

LogFunc is used for logging in amqp-rpc. It makes it possible to define your own logging.

Here is an example where the logger from the log package is used:

debugLogger := log.New(os.Stdout, "DEBUG - ", log.LstdFlags)
errorLogger := log.New(os.Stdout, "ERROR - ", log.LstdFlags)

server := NewServer(url)
server.WithErrorLogger(errorLogger.Printf)
server.WithDebugLogger(debugLogger.Printf)

It can also be used with for example a Logrus logger:

logger := logrus.New()
logger.SetLevel(logrus.DebugLevel)
logger.Formatter = &logrus.JSONFormatter{}

s.WithErrorLogger(logger.Warnf)
s.WithDebugLogger(logger.Debugf)

client := NewClient(url)
client.WithErrorLogger(logger.Errorf)
client.WithDebugLogger(logger.Debugf)

type MockAcknowledger added in v0.10.1

type MockAcknowledger struct {
	Acks    int
	Nacks   int
	Rejects int
}

MockAcknowledger is a mocked amqp.Acknowledger, useful for tests.

func (*MockAcknowledger) Ack added in v0.10.1

func (ma *MockAcknowledger) Ack(tag uint64, multiple bool) error

Ack increases Acks.

func (*MockAcknowledger) Nack added in v0.10.1

func (ma *MockAcknowledger) Nack(tag uint64, multiple, requeue bool) error

Nack increases Nacks.

func (*MockAcknowledger) Reject added in v0.10.1

func (ma *MockAcknowledger) Reject(tag uint64, requeue bool) error

Reject increases Rejects.

type OnStartedFunc

type OnStartedFunc func(inputConn, outputConn *amqp.Connection, inputChannel, outputChannel *amqp.Channel)

OnStartedFunc can be registered at Server.OnStarted(f) and Client.OnStarted(f). This is used when you want to do more setup on the connections and/or channels from amqp, for example setting Qos, NotifyPublish etc.

type PublishSettings

type PublishSettings struct {
	// Mandatory sets the mandatory flag. When this is true a Publish call will
	// be returned if it's not routable by the exchange.
	Mandatory bool

	// Immediate sets the immediate flag. When this is true a Publish call will
	// be returned if a consumer isn't directly available.
	Immediate bool

	// ConfirmMode puts the channel that messages are published over in
	// confirm mode. This makes sending requests more reliable at the cost
	// of some performance. Each publishing must be confirmed by the server.
	// See https://www.rabbitmq.com/confirms.html#publisher-confirms
	ConfirmMode bool
}

PublishSettings is the settings that will be used when a message is about to be published to the message bus. These settings are only used by the Client and never by the Server. For the server, Mandatory or Immediate can be set on the ResponseWriter instead.

type QueueDeclareSettings

type QueueDeclareSettings struct {
	// DeleteWhenUnused sets the auto-delete flag. It's recommended to have this
	// set to false so that amqp-rpc can reconnect and use the same queue while
	// keeping any messages in the queue.
	DeleteWhenUnused bool

	// Durable sets the durable flag. It's recommended to have this set to false
	// and instead use ha-mode for queues and messages.
	Durable bool

	// Exclusive sets the exclusive flag when declaring queues. This flag has
	// no effect on Clients reply-to queues which are never exclusive so it
	// can support reconnects properly.
	Exclusive bool

	// Args sets the arguments table used.
	Args amqp.Table
}

QueueDeclareSettings is the settings that will be used when any kind of queue, including the response queue, is declared. See the documentation for amqp.QueueDeclare for more information about these settings.

type Request

type Request struct {
	// Exchange is the exchange to which the request will be published when
	// passing it to the client's Send function.
	Exchange string

	// Routing key is the routing key that will be used in the amqp.Publishing
	// request.
	RoutingKey string

	// Reply is a boolean value telling if the request should wait for a reply
	// or just send the request without waiting.
	Reply bool

	// Timeout is the time we should wait after a request is published before
	// we assume the request got lost.
	Timeout time.Duration

	// Publishing is the publishing that is going to be published.
	Publishing amqp.Publishing

	// Context is a context which you can use to pass data from where the
	// request is created to middlewares. By default this will be a
	// context.Background()
	Context context.Context
	// contains filtered or unexported fields
}

Request is a request to perform with the client.

func NewRequest

func NewRequest() *Request

NewRequest will generate a new request to be published. The default request will use the content type "text/plain" and always wait for reply.

func (*Request) AddMiddleware

func (r *Request) AddMiddleware(m ClientMiddlewareFunc) *Request

AddMiddleware will add a middleware which will be executed when the request is published.

func (*Request) AfterTimeout added in v0.11.0

func (r *Request) AfterTimeout() <-chan time.Time

AfterTimeout waits for the duration of the timeout.

func (*Request) WithBody

func (r *Request) WithBody(b string) *Request

WithBody will convert a string to a byte slice and add as the body passed for the request.

func (*Request) WithContentType

func (r *Request) WithContentType(ct string) *Request

WithContentType will update the content type passed in the header of the request. This value will be set as the ContentType in the amqp.Publishing type but also preserved as a header value.

func (*Request) WithContext

func (r *Request) WithContext(ctx context.Context) *Request

WithContext will set the context on the request.

func (*Request) WithCorrelationID

func (r *Request) WithCorrelationID(id string) *Request

WithCorrelationID will add/overwrite the correlation ID used for the request and set it on the Publishing.

func (*Request) WithExchange

func (r *Request) WithExchange(e string) *Request

WithExchange will set the exchange on to which the request will be published.

func (*Request) WithHeaders

func (r *Request) WithHeaders(h amqp.Table) *Request

WithHeaders will set the full amqp.Table as the headers for the request. Note that this will overwrite anything previously set on the headers.

func (*Request) WithResponse

func (r *Request) WithResponse(wr bool) *Request

WithResponse sets the value determining whether the request should wait for a response or not. A request that does not require a response will only catch errors occurring before the request has been published.

func (*Request) WithRoutingKey

func (r *Request) WithRoutingKey(rk string) *Request

WithRoutingKey will set the routing key for the request.

func (*Request) WithTimeout

func (r *Request) WithTimeout(t time.Duration) *Request

WithTimeout will set the client timeout used when publishing messages. t will be rounded using the duration's Round function to the nearest multiple of a millisecond. Rounding will be away from zero.

func (*Request) Write

func (r *Request) Write(p []byte) (int, error)

Write will write to the Body of the request's amqp.Publishing. It is safe to call Write multiple times.

func (*Request) WriteHeader

func (r *Request) WriteHeader(header string, value interface{})

WriteHeader will write a header for the specified key.

type RequestMap added in v0.11.0

type RequestMap struct {
	// contains filtered or unexported fields
}

RequestMap keeps track of requests based on their DeliveryTag and/or CorrelationID.

func (*RequestMap) Delete added in v0.11.0

func (m *RequestMap) Delete(r *Request)

Delete will remove r from m.

func (*RequestMap) GetByCorrelationID added in v0.11.0

func (m *RequestMap) GetByCorrelationID(key string) (*Request, bool)

GetByCorrelationID returns the request with the provided correlation id.

func (*RequestMap) GetByDeliveryTag added in v0.11.0

func (m *RequestMap) GetByDeliveryTag(key uint64) (*Request, bool)

GetByDeliveryTag returns the request with the provided delivery tag.

func (*RequestMap) Set added in v0.11.0

func (m *RequestMap) Set(r *Request)

Set will add r to m so it can be fetched later using its correlation id or delivery tag.
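To illustrate the idea of one request being reachable through two keys, here is a rough sketch. The request and requestMap types below are simplified, hypothetical stand-ins; the package's actual fields and locking strategy are unexported and may differ.

```go
package main

import (
	"fmt"
	"sync"
)

// request is a stand-in that holds only the two keys the map indexes on.
type request struct {
	correlationID string
	deliveryTag   uint64
}

// requestMap is a hypothetical sketch, not the package's implementation.
// It keeps two indexes that point at the same request.
type requestMap struct {
	mu            sync.RWMutex
	byCorrelation map[string]*request
	byDeliveryTag map[uint64]*request
}

func newRequestMap() *requestMap {
	return &requestMap{
		byCorrelation: make(map[string]*request),
		byDeliveryTag: make(map[uint64]*request),
	}
}

// Set stores r under both of its keys.
func (m *requestMap) Set(r *request) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.byCorrelation[r.correlationID] = r
	m.byDeliveryTag[r.deliveryTag] = r
}

// GetByCorrelationID looks r up by its correlation id.
func (m *requestMap) GetByCorrelationID(key string) (*request, bool) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	r, ok := m.byCorrelation[key]
	return r, ok
}

// GetByDeliveryTag looks r up by its delivery tag.
func (m *requestMap) GetByDeliveryTag(key uint64) (*request, bool) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	r, ok := m.byDeliveryTag[key]
	return r, ok
}

// Delete removes r from both indexes.
func (m *requestMap) Delete(r *request) {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.byCorrelation, r.correlationID)
	delete(m.byDeliveryTag, r.deliveryTag)
}

func main() {
	m := newRequestMap()
	r := &request{correlationID: "abc", deliveryTag: 7}
	m.Set(r)

	_, ok := m.GetByDeliveryTag(7)
	fmt.Println(ok) // true

	m.Delete(r)
	_, ok = m.GetByCorrelationID("abc")
	fmt.Println(ok) // false
}
```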

type ResponseWriter

type ResponseWriter struct {
	Publishing *amqp.Publishing
	Mandatory  bool
	Immediate  bool
}

ResponseWriter is used by a handler to construct an RPC response. The ResponseWriter may NOT be used after the handler has returned.

Because the ResponseWriter implements io.Writer you can for example use it to write json:

encoder := json.NewEncoder(responseWriter)
encoder.Encode(dataObject)
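A slightly fuller sketch of the same pattern, with the encoding error checked. So that it runs without a broker, responseWriter below is a hypothetical stand-in for *ResponseWriter; it assumes each Write appends to the response body. The reply type and writeJSON helper are likewise illustrative names.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
)

// responseWriter is a stand-in for *amqprpc.ResponseWriter. It assumes that
// every call to Write appends to the response body.
type responseWriter struct {
	body []byte
}

func (rw *responseWriter) Write(p []byte) (int, error) {
	rw.body = append(rw.body, p...)
	return len(p), nil
}

// reply is an example payload.
type reply struct {
	Status string `json:"status"`
}

// writeJSON encodes v as JSON into w and returns any encoding error.
func writeJSON(w io.Writer, v interface{}) error {
	return json.NewEncoder(w).Encode(v)
}

func main() {
	rw := &responseWriter{}
	if err := writeJSON(rw, reply{Status: "ok"}); err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	// json.Encoder terminates each encoded value with a newline.
	fmt.Printf("%s", rw.body)
}
```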

func NewResponseWriter

func NewResponseWriter(p *amqp.Publishing) *ResponseWriter

NewResponseWriter will create a new response writer with given amqp.Publishing.

func (*ResponseWriter) Write

func (rw *ResponseWriter) Write(p []byte) (int, error)

Write will write the response Body of the amqp.Publishing. It is safe to call Write multiple times.

func (*ResponseWriter) WriteHeader

func (rw *ResponseWriter) WriteHeader(header string, value interface{})

WriteHeader will write a header for the specified key.

type SendFunc

type SendFunc func(r *Request) (d *amqp.Delivery, e error)

SendFunc represents the function that Send does. It takes a Request as input and returns a delivery and an error.

func ClientMiddlewareChain

func ClientMiddlewareChain(next SendFunc, m ...ClientMiddlewareFunc) SendFunc

ClientMiddlewareChain will attach all given middlewares to your SendFunc. The middlewares will be executed in the same order as your input.
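One common way to get "same order as input" execution is to wrap the send function starting from the last middleware, so that the first one ends up outermost. The sketch below shows that technique with simplified stand-in types (Request, Delivery, MiddlewareFunc, chain and tag are all hypothetical names for illustration); it is not necessarily how the package implements it.

```go
package main

import "fmt"

// Request and Delivery are simplified stand-ins for *amqprpc.Request and
// *amqp.Delivery so that the sketch runs without a broker.
type Request struct{ Trace []string }
type Delivery struct{ Body string }

type SendFunc func(r *Request) (*Delivery, error)
type MiddlewareFunc func(next SendFunc) SendFunc

// chain wraps next with the given middlewares. Wrapping from last to first
// makes the first middleware the outermost one, so it runs first.
func chain(next SendFunc, m ...MiddlewareFunc) SendFunc {
	for i := len(m) - 1; i >= 0; i-- {
		next = m[i](next)
	}
	return next
}

// tag returns a middleware that records its name before calling next.
func tag(name string) MiddlewareFunc {
	return func(next SendFunc) SendFunc {
		return func(r *Request) (*Delivery, error) {
			r.Trace = append(r.Trace, name)
			return next(r)
		}
	}
}

func main() {
	send := func(r *Request) (*Delivery, error) {
		r.Trace = append(r.Trace, "send")
		return &Delivery{Body: "ok"}, nil
	}

	req := &Request{}
	if _, err := chain(send, tag("first"), tag("second"))(req); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(req.Trace) // [first second send]
}
```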

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server represents an AMQP server used within the RPC framework. The server uses bindings to keep a list of handler functions.

func NewServer

func NewServer(url string) *Server

NewServer will return a pointer to a new Server.

func (*Server) AddMiddleware

func (s *Server) AddMiddleware(m ServerMiddlewareFunc) *Server

AddMiddleware will add a ServerMiddleware to the list of middlewares to be triggered before the handle func for each request.

func (*Server) Bind

func (s *Server) Bind(binding HandlerBinding)

Bind will add a HandlerBinding to the list of bindings to serve.

func (*Server) ListenAndServe

func (s *Server) ListenAndServe()

ListenAndServe will dial the RabbitMQ message bus, set up all the channels, consume from all RPC server queues and monitor the connection to ensure the server is always connected.

func (*Server) OnStarted

func (s *Server) OnStarted(f OnStartedFunc)

OnStarted can be used to hook into the connections/channels that the server is using. This can be useful if you want more control over amqp directly. The OnStartedFunc will be executed after ListenAndServe is called. Note that this function is blocking and the server won't continue its startup until it has finished executing.

server := NewServer(url)
server.OnStarted(func(inConn, outConn *amqp.Connection, inChan, outChan *amqp.Channel) {
	// Do something with amqp connections/channels.
})

func (*Server) Stop

func (s *Server) Stop()

Stop will gracefully disconnect from AMQP after draining first incoming then outgoing messages. This method won't wait for server shutdown to complete; you should instead wait for ListenAndServe to exit.
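A typical way to wait for ListenAndServe to exit is to run it in a goroutine and close a channel when it returns. The sketch below shows that pattern using fakeServer, a hypothetical stand-in whose ListenAndServe simply blocks until Stop is called; newFakeServer and serveInBackground are illustrative names, not part of this package.

```go
package main

import (
	"fmt"
	"sync"
)

// fakeServer is a stand-in for *amqprpc.Server. Its ListenAndServe blocks
// until Stop has been called, which is the only behavior this sketch needs.
type fakeServer struct {
	stop chan struct{}
	once sync.Once
}

func newFakeServer() *fakeServer {
	return &fakeServer{stop: make(chan struct{})}
}

func (s *fakeServer) ListenAndServe() { <-s.stop }

// Stop signals shutdown and returns immediately.
func (s *fakeServer) Stop() { s.once.Do(func() { close(s.stop) }) }

// serveInBackground runs ListenAndServe in a goroutine and returns a channel
// that is closed once ListenAndServe has returned.
func serveInBackground(s *fakeServer) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		s.ListenAndServe()
	}()
	return done
}

func main() {
	s := newFakeServer()
	done := serveInBackground(s)

	s.Stop() // does not wait for shutdown
	<-done   // blocks until ListenAndServe has exited

	fmt.Println("server stopped")
}
```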

func (*Server) WithAutoAck added in v0.10.1

func (s *Server) WithAutoAck(b bool) *Server

WithAutoAck sets the AMQP server's auto-ack mode.

func (*Server) WithConsumeSettings added in v0.10.1

func (s *Server) WithConsumeSettings(settings ConsumeSettings) *Server

WithConsumeSettings sets configuration used when the server wants to start consuming from a queue.

func (*Server) WithDebugLogger

func (s *Server) WithDebugLogger(f LogFunc) *Server

WithDebugLogger sets the logger to use for debug logging.

func (*Server) WithDialConfig

func (s *Server) WithDialConfig(c amqp.Config) *Server

WithDialConfig sets the dial config used for the server.

func (*Server) WithErrorLogger

func (s *Server) WithErrorLogger(f LogFunc) *Server

WithErrorLogger sets the logger to use for error logging.

func (*Server) WithExchangeDeclareSettings added in v0.10.1

func (s *Server) WithExchangeDeclareSettings(settings ExchangeDeclareSettings) *Server

WithExchangeDeclareSettings sets configuration used when the server wants to declare exchanges.

func (*Server) WithQoSPrefetchCount added in v0.12.0

func (s *Server) WithQoSPrefetchCount(c int) *Server

WithQoSPrefetchCount sets the AMQP server's QoS pre-fetch count.

func (*Server) WithQueueDeclareSettings added in v0.10.1

func (s *Server) WithQueueDeclareSettings(settings QueueDeclareSettings) *Server

WithQueueDeclareSettings sets configuration used when the server wants to declare queues.

func (*Server) WithTLS

func (s *Server) WithTLS(tls *tls.Config) *Server

WithTLS sets the TLS config in the dial config for the server.

type ServerMiddlewareFunc

type ServerMiddlewareFunc func(next HandlerFunc) HandlerFunc

ServerMiddlewareFunc represents a function that can be used as a middleware.

For example:

func myMiddle(next HandlerFunc) HandlerFunc {

	// Preinitialization of middleware here.

	return func(ctx context.Context, rw *ResponseWriter, d amqp.Delivery) {
		// Before handler execution here.

		// Execute the handler.
		next(ctx, rw, d)

		// After execution here.
	}
}

s := NewServer("url")

// Add middleware to specific handler.
s.Bind(DirectBinding("foobar", myMiddle(HandlerFunc)))

// Add middleware to all handlers on the server.
s.AddMiddleware(myMiddle)

Directories

Path Synopsis
examples
