amqprpc

v5.0.0-...-83d63e8
Published: Dec 11, 2024 License: MIT Imports: 19 Imported by: 0

README

AMQP RPC
RabbitMQ RPC Framework

Description

This is a framework for using RabbitMQ as a client/server RPC setup together with the Go amqp implementation. The framework can manage a fully functional message queue setup with reconnects, disconnects, graceful shutdown and other stability mechanisms. With this in place, RabbitMQ can be used as transport and service discovery to quickly get up and running with a microservice architecture.

Since the framework handles the majority of the communication with RabbitMQ and the Go library, the user does not need to know the details of those systems. However, since a few interfaces expose types from the Go amqp package and the nomenclature is specific to RabbitMQ, some prior experience is preferred.

Project status

This project has been used in production since October 2018 handling millions of requests both as server and client.

Server

The server is inspired by the HTTP library where the user maps a RabbitMQ binding to a handler function. A response writer is passed to the handler which may be used as an io.Writer to write the response.

This is an example of how to get up and running with a server responding to all messages published to the given routing key.

server := NewServer("amqp://guest:guest@localhost:5672")

server.Bind(DirectBinding("routing_key", func(ctx context.Context, rw *ResponseWriter, d amqp.Delivery) {
    // Print the body and headers of the delivery.
    fmt.Println(d.Body, d.Headers)

    // Add a response to the publisher
    fmt.Fprint(rw, "Handled")
}))

server.ListenAndServe()

The example above uses a DirectBinding but all the supported bindings are provided via an interface where the exchange type will be set to the proper type.

server.Bind(DirectBinding("routing_key", handleFunc))
server.Bind(TopicBinding("queue-name", "routing_key.#", handleFunc))
server.Bind(HeadersBinding("queue-name", amqp.Table{"x-match": "all", "foo": "bar"}, handleFunc))

If the default values don't give the desired result, you can set up the binding manually.

customBinding := CreateBinding("oh-sweet-queue", DefaultExchangeNameDirect, handleFunc).
    WithPrefetchCount(100).
    WithAutoAck(false)

server.Bind(customBinding)

The server will not connect until ListenAndServe() is called. This means that you can configure settings for the server until that point. The settings can be changed by calling chainable methods.

server := NewServer("amqp://guest:guest@localhost:5672").
    WithLogger(logger).
    WithTLS(&tls.Config{})

QoS is by default set to a prefetch count of 10. If you want to change this you can modify the binding by setting the PrefetchCount to something else.

Client

The client is built around channels to be able to handle as many requests as possible without the need to set up multiple clients. All published messages get a unique correlation ID which is used to determine where a response should be delivered, no matter what order it is received in.
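As an illustration of the correlation ID idea, here is a minimal standard-library sketch. It is not the package's implementation: the `response` and `pending` types and their methods are hypothetical stand-ins, and the real client works with amqp deliveries rather than strings.

```go
package main

import (
	"fmt"
	"sync"
)

// response is a hypothetical stand-in for a delivery carrying a correlation ID.
type response struct {
	CorrelationID string
	Body          string
}

// pending maps correlation IDs to the channel of the caller waiting on them.
type pending struct {
	mu      sync.Mutex
	waiters map[string]chan response
}

func newPending() *pending {
	return &pending{waiters: make(map[string]chan response)}
}

// register creates a buffered channel for id so that dispatch never blocks.
func (p *pending) register(id string) <-chan response {
	ch := make(chan response, 1)
	p.mu.Lock()
	p.waiters[id] = ch
	p.mu.Unlock()
	return ch
}

// dispatch hands r to the waiter with the matching ID and reports whether
// such a waiter existed.
func (p *pending) dispatch(r response) bool {
	p.mu.Lock()
	ch, ok := p.waiters[r.CorrelationID]
	delete(p.waiters, r.CorrelationID)
	p.mu.Unlock()
	if ok {
		ch <- r
	}
	return ok
}

func main() {
	p := newPending()
	first := p.register("id-1")
	second := p.register("id-2")

	// Responses arrive in the opposite order of the requests.
	p.dispatch(response{CorrelationID: "id-2", Body: "second"})
	p.dispatch(response{CorrelationID: "id-1", Body: "first"})

	fmt.Println((<-first).Body, (<-second).Body) // first second
}
```

Each caller only ever reads from its own channel, so the arrival order of responses does not matter.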

The client takes a Request as input which can hold all required information about how to route and handle the message and response.

client := NewClient("amqp://guest:guest@localhost:5672")

request := NewRequest().
    WithRoutingKey("routing_key").
    WithBody("This is my body")

response, err := client.Send(request)
if err != nil {
    slog.Error(err.Error())
}

slog.Info(string(response.Body))

The client does not connect when it is created; instead, it connects when the first request is published (when calling Send()). This allows you to configure connection-related parameters such as the timeout by chaining methods.

// Set timeout after NewClient is invoked by chaining.
client := NewClient("amqp://guest:guest@localhost:5672").
    WithTimeout(5000 * time.Millisecond)

// Will not connect and may be changed until this call.
client.Send(NewRequest().WithRoutingKey("routing_key"))

Example of available methods for chaining.

client := NewClient("amqp://guest:guest@localhost:5672").
    WithLogger(logger).
    WithDialConfig(amqp.Config{}).
    WithTLS(&tls.Config{}).
    WithReplyToConsumerArgs(amqp.Table{}).
    WithConfirmMode(false).
    WithTimeout(10 * time.Second)

Confirm mode

Confirm mode can be set on the client and makes the client wait for an Ack from the amqp-server. This makes sending requests more reliable at the cost of some performance, as each publishing must be confirmed by the amqp-server. You can read more in the RabbitMQ documentation on publisher confirms.

The client is in confirm mode by default. Use WithConfirmMode to control this setting.

client := NewClient("amqp://guest:guest@localhost:5672").
    WithConfirmMode(true)

Request

The Request type is used as input to the client's Send function and holds all the information about how to route and handle the request. All the properties may be set with chainable methods to make it easy to construct. A Request may be re-used as many times as desired.

request := NewRequest().
    WithBody(`{"hello":"world"}`).
    WithContentType("application/json").
    WithContext(context.TODO()).
    WithExchange("custom.exchange").
    WithRoutingKey("routing_key").
    WithHeaders(amqp.Table{}).
    WithTimeout(5 * time.Second).
    WithMandatory(true).
    WithResponse(true)

By default the request will have Reply set to true to expect a reply to the message.

The Request also implements the io.Writer interface, so it can be passed directly to anything that writes to an io.Writer, such as an encoder.

request := NewRequest()

err := json.NewEncoder(request).Encode(serializableObject)
if err != nil {
    panic(err)
}

Timeout

You can set a timeout on requests with Client.WithTimeout(duration), Request.WithTimeout(duration) or by using a context with a deadline. Timeouts set on the request take precedence.

When a request expects a reply (Request.Reply), the outgoing message is also assigned an Expiration corresponding to the request’s deadline. This ensures the message won’t remain in the queue after the client has stopped waiting. If you set a deadline on the context, and that deadline is shorter than the timeout, the Expiration will be set to that deadline.

Context

By default a context.Background() will be set on the request. You should set your own context so that any cancellation is propagated. Cancelling the context will cancel the request and Client.Send will unblock and return with the error from the Cause set by the cancellation. It will not wait for confirmations or responses.

// r is assumed to be an incoming *http.Request.
ctx, cancel := context.WithCancelCause(r.Context())
cancel(errors.New("my error"))

request := NewRequest().
    WithContext(ctx)

_, err := client.Send(request)
fmt.Println(err) // my error

Sender

The client invokes a default SendFunc while calling Send() where all the RabbitMQ communication is handled and the message is published. The sender is attached to a public field (Sender) and may be overridden. This enables you to handle unit testing without the need to implement an interface. This framework comes with a package named amqprpctest which helps you create a client with your own custom send function.

unitTestSendFunc := func(r *Request) (*amqp.Delivery, error) {
    fmt.Println("will not connect or publish anything")

    mockResponse := &amqp.Delivery{
        Body: []byte("this is my mock response"),
    }

    return mockResponse, nil
}

client := amqprpctest.NewTestClient(unitTestSendFunc)

Middlewares

Both the client and the server aim to only handle the connectivity and routing within the RPC setup. This means that there's no code interacting with the request or the responses before or after a request is published or received. To handle this in a dynamic way and allow the user to affect as much as possible for each request, both the server and the client are built around middlewares.

Server middlewares

Server middlewares can be hooked either to a specific handler or to all messages for the entire server. Middlewares can be chained to be executed in a specific order.

The middleware is inspired by the HTTP library where the middleware is defined as a function that takes a handler function as input and returns the same handler func. This makes it possible to add middlewares in any order.

The handler function HandlerFunc takes a context, a response writer and a delivery as input not returning anything. The ServerMiddlewareFunc thus takes this type as input and returns the same type.

type HandlerFunc func(context.Context, *ResponseWriter, amqp.Delivery)

type ServerMiddlewareFunc func(next HandlerFunc) HandlerFunc

To execute the next handler just call the received function.

server := NewServer("amqp://guest:guest@localhost:5672")

// Define a custom middleware to use for the server.
func myMiddleware(next HandlerFunc) HandlerFunc {
    // Preinitialization of middleware here.

    return func(ctx context.Context, rw *ResponseWriter, d amqp.Delivery) {
        fmt.Println("this will be executed before the actual handler")

        // Execute the handler.
        next(ctx, rw, d)

        fmt.Println("this will be executed after the actual handler")
    }
}

// Add a middleware to a specific handler.
server.Bind(DirectBinding("foobar", myMiddleware(myHandler)))

// Add multiple middlewares to a specific handler by chaining them.
server.Bind(
    DirectBinding(
        "foobar",
        ServerMiddlewareChain(
            myHandler,
            middlewareOne,
            middlewareTwo,
            middlewareThree,
        ),
    ),
)

// Add middleware to all handlers on the server.
server.AddMiddleware(myMiddleware)

server.ListenAndServe()

Note that the middleware folder in this project contains an example of how to recover from a panicking handler with a middleware.

Client middlewares

The client supports middlewares which may be executed before or after a request has been published. This is a great way to, for example, enrich headers or handle returned errors.

The sender function SendFunc takes a Request as input and returns an amqp.Delivery pointer and an error. The ClientMiddlewareFunc thus takes this type as input and returns the same type.

type SendFunc func(r *Request) (d *amqp.Delivery, e error)

type ClientMiddlewareFunc func(next SendFunc) SendFunc

A middleware can be added to the client to be executed for all requests or attached to the Request type instead.

func myMiddleware(next SendFunc) SendFunc {
    return func(r *Request) (*amqp.Delivery, error) {
        r.Publishing.Headers["enriched-header"] = "yes"
        r.Publishing.AppId = "my-app"

        return next(r)
    }
}

// Add the middleware to all request made with the client.
client := NewClient("amqp://guest:guest@localhost:5672").
    AddMiddleware(myMiddleware)

// Add the middleware to a single request.
request := NewRequest().
    WithRoutingKey("routing_key").
    AddMiddleware(myMiddleware)

client.Send(request)

Due to the way middlewares are added and chained the middlewares attached to the request will always be executed after the middlewares attached to the client.
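The ordering can be illustrated with a small standard-library sketch. It assumes the client's middlewares are simply placed in front of the request's middlewares before wrapping the final send function; `sendFunc`, `middleware`, `chain`, `tag` and `order` are hypothetical names, not the package's API.

```go
package main

import (
	"fmt"
	"strings"
)

// sendFunc is a simplified stand-in for SendFunc that only records a trace.
type sendFunc func(trace *[]string) error

type middleware func(next sendFunc) sendFunc

// chain wraps final so that the first middleware in m runs first.
func chain(final sendFunc, m ...middleware) sendFunc {
	for i := len(m) - 1; i >= 0; i-- {
		final = m[i](final)
	}

	return final
}

// tag returns a middleware that records name before calling next.
func tag(name string) middleware {
	return func(next sendFunc) sendFunc {
		return func(trace *[]string) error {
			*trace = append(*trace, name)
			return next(trace)
		}
	}
}

// order chains the client middlewares followed by the request middlewares
// around a fake publish step and returns the order in which they ran.
func order(clientMW, requestMW []middleware) string {
	var trace []string

	publish := func(trace *[]string) error {
		*trace = append(*trace, "publish")
		return nil
	}

	all := append(append([]middleware{}, clientMW...), requestMW...)
	_ = chain(publish, all...)(&trace)

	return strings.Join(trace, " > ")
}

func main() {
	fmt.Println(order(
		[]middleware{tag("client-1"), tag("client-2")},
		[]middleware{tag("request-1")},
	))
	// client-1 > client-2 > request-1 > publish
}
```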

For more examples of client middlewares, see examples/middleware.

Connection hooks

You often want to know when a connection has been established and, when it comes to RabbitMQ, also perform some post-connection setup. This is possible because both the server and the client hold a list of OnStartedFunc callbacks, registered with OnStarted. Each function receives the incoming connection, outgoing connection, incoming channel and outgoing channel.

type OnStartedFunc func(inputConn, outputConn *amqp.Connection, inputChannel, outputChannel *amqp.Channel)

server := NewServer("amqp://guest:guest@localhost:5672")

server.OnStarted(func(inConn, outConn *amqp.Connection, inChan, outChan *amqp.Channel) {
    // Do something after connection here...
})

server.ListenAndServe()

Both the server and the client follow the recommendations for RabbitMQ connections which means separate connections for incoming and outgoing traffic and separate channels for consuming and publishing messages. Because of this the signature looks the same way for both the server and the client.

TLS

Since this framework's main responsibility is to handle connections, it ships with a type named Certificates which can hold the RabbitMQ message bus CA and the client's certificate and private key. This is just a convenient way to keep track of certificates to use, but it also implements a method named TLSConfig() which returns a *tls.Config. This may then be used to connect with a secured protocol (AMQPS) to the message bus.

certificates := Certificates{
    CA: "/path/to/rootCA.pem",
}

// Now we can get the TLS configuration to use for the client and server.
uri := "amqps://guest:guest@localhost:5671"

server := NewServer(uri).WithTLS(certificates.TLSConfig())
client := NewClient(uri).WithTLS(certificates.TLSConfig())

server.ListenAndServe()

Logging

You can specify your own slog.Logger instance. By default amqp-rpc will log errors using the logger from slog.Default(). Some logs will contain data contained in an amqp.Delivery or amqp.Publishing, including any headers. If you want to avoid logging some of the fields you can use an slog.Handler to filter out the fields you don't want to log.

The library will log using two different levels: slog.LevelDebug and slog.LevelInfo.

If you want to use something other than slog for logging, you can implement a slog.Handler wrapper that wraps your preferred logging implementation.

logger := slog.New(
	slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
		Level: slog.LevelDebug,
	}),
)

server := NewServer(url).
    WithLogger(logger)

client := NewClient(url).
    WithLogger(logger)

This is perfect when using a logger which supports debugging as a separate method such as the logrus logger which has Debugf and Errorf methods.

Examples

There are a few examples included in the examples folder. For more information about how to customize your setup, see the documentation.

Documentation

Constants

const (
	ExchangeTypeDirect  = "direct"
	ExchangeTypeTopic   = "topic"
	ExchangeTypeHeaders = "headers"
)

The default exchange types that are available in RabbitMQ.

const (
	DefaultExchangeNameDirect  = "amq.direct"
	DefaultExchangeNameTopic   = "amq.topic"
	DefaultExchangeNameHeaders = "amq.match"
)

The default exchanges that are available in RabbitMQ.

const (
	QueueTypeClassic = "classic"
	QueueTypeQuorum  = "quorum"
)

The different queue types that are available in RabbitMQ.

Variables

var (
	// ErrRequestReturned can be returned by Client#Send() when the server
	// returns the message. For example when mandatory is set but the message
	// can not be routed.
	ErrRequestReturned = errors.New("publishing returned")

	// ErrRequestRejected can be returned by Client#Send() when the server Nacks
	// the message. This can happen if there is some problem inside the amqp
	// server. To check if the error returned is an ErrRequestRejected error, use
	// errors.Is(err, ErrRequestRejected).
	ErrRequestRejected = errors.New("publishing Nacked")

	// ErrRequestTimeout is an error returned when a client request does not
	// receive a response within the client timeout duration. To check if the
	// error returned is an ErrRequestTimeout error, use errors.Is(err,
	// ErrRequestTimeout).
	ErrRequestTimeout = errors.New("request timed out")
)

var ErrUnexpectedConnClosed = errors.New("unexpected connection close without specific error")

ErrUnexpectedConnClosed is returned by ListenAndServe() if the server shuts down without calling Stop() and if AMQP does not give an error when said shutdown happens.

Functions

func ContextWithQueueName

func ContextWithQueueName(ctx context.Context, queueName string) context.Context

ContextWithQueueName adds the given queueName to the provided context.

func ContextWithShutdownChan

func ContextWithShutdownChan(ctx context.Context, ch chan struct{}) context.Context

ContextWithShutdownChan adds a shutdown chan to the given context.

func QueueNameFromContext

func QueueNameFromContext(ctx context.Context) (string, bool)

QueueNameFromContext returns the queue name for the current request.

func ShutdownChanFromContext

func ShutdownChanFromContext(ctx context.Context) (chan struct{}, bool)

ShutdownChanFromContext returns the shutdown chan.

Types

type AwareAcknowledger

type AwareAcknowledger struct {
	Acknowledger amqp.Acknowledger
	Handled      bool
}

AwareAcknowledger implements the amqp.Acknowledger interface with the addition that it can tell if a message has been acked in any way.

func NewAwareAcknowledger

func NewAwareAcknowledger(acknowledger amqp.Acknowledger) *AwareAcknowledger

NewAwareAcknowledger returns the passed acknowledger as an AwareAcknowledger.

func (*AwareAcknowledger) Ack

func (a *AwareAcknowledger) Ack(tag uint64, multiple bool) error

Ack passes the Ack down to the underlying Acknowledger.

func (*AwareAcknowledger) Nack

func (a *AwareAcknowledger) Nack(tag uint64, multiple, requeue bool) error

Nack passes the Nack down to the underlying Acknowledger.

func (*AwareAcknowledger) Reject

func (a *AwareAcknowledger) Reject(tag uint64, requeue bool) error

Reject passes the Reject down to the underlying Acknowledger.
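To show why it is useful to know whether a message has already been handled, here is a standard-library sketch of the wrapper idea. It is not the package's code: `acknowledger` mirrors the method set of amqp.Acknowledger, `countingAck` is a test double, and `finish` is a hypothetical fallback that acks only when the handler did not ack, nack or reject itself. Setting the flag before forwarding the call is an assumption.

```go
package main

import "fmt"

// acknowledger mirrors the method set of amqp.Acknowledger.
type acknowledger interface {
	Ack(tag uint64, multiple bool) error
	Nack(tag uint64, multiple, requeue bool) error
	Reject(tag uint64, requeue bool) error
}

// countingAck is a test double that only counts calls.
type countingAck struct{ acks, nacks, rejects int }

func (c *countingAck) Ack(uint64, bool) error        { c.acks++; return nil }
func (c *countingAck) Nack(uint64, bool, bool) error { c.nacks++; return nil }
func (c *countingAck) Reject(uint64, bool) error     { c.rejects++; return nil }

// awareAck forwards every call and remembers that the message was handled.
type awareAck struct {
	inner   acknowledger
	handled bool
}

func (a *awareAck) Ack(tag uint64, multiple bool) error {
	a.handled = true
	return a.inner.Ack(tag, multiple)
}

func (a *awareAck) Nack(tag uint64, multiple, requeue bool) error {
	a.handled = true
	return a.inner.Nack(tag, multiple, requeue)
}

func (a *awareAck) Reject(tag uint64, requeue bool) error {
	a.handled = true
	return a.inner.Reject(tag, requeue)
}

// finish acks the message unless the handler already handled it.
func finish(a *awareAck, tag uint64) error {
	if a.handled {
		return nil
	}

	return a.Ack(tag, false)
}

func main() {
	inner := &countingAck{}
	a := &awareAck{inner: inner}

	_ = a.Reject(1, false) // The handler rejected the message itself,
	_ = finish(a, 1)       // so no extra Ack is sent afterwards.

	fmt.Println(inner.acks, inner.rejects) // 0 1
}
```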

type Certificates

type Certificates struct {
	Cert string
	Key  string
	CA   string
}

Certificates represents the certificate, the key and the CA to use when using RabbitMQ with TLS or the certificate and key when using as TLS configuration for RPC server. The fields should be the path to files stored on disk and will be passed to ioutil.ReadFile and tls.LoadX509KeyPair.

func (*Certificates) TLSConfig

func (c *Certificates) TLSConfig() *tls.Config

TLSConfig will return a *tls.Config type based on the files set in the Certificates type.

type Client

type Client struct {

	// Sender is the main send function called after all middlewares have been
	// chained and called. This field can be overridden to simplify testing.
	Sender SendFunc
	// contains filtered or unexported fields
}

Client represents an AMQP client used within a RPC framework. This client can be used to communicate with RPC servers.

func NewClient

func NewClient(url string) *Client

NewClient will return a pointer to a new Client. There are two ways to manage the connection that will be used by the client (i.e. when using TLS).

The first one is to use the Certificates type and just pass the filenames to the client certificate, key and the server CA. If this is done the function will handle the reading of the files.

It is also possible to create a custom amqp.Config with whatever configuration is desired; it will be used as the dial configuration when connecting to the message bus.

func (*Client) AddMiddleware

func (c *Client) AddMiddleware(m ClientMiddlewareFunc) *Client

AddMiddleware will add a middleware which will be executed on request.

func (*Client) OnStarted

func (c *Client) OnStarted(f OnStartedFunc)

OnStarted can be used to hook into the connections/channels that the client is using. This can be useful if you want more control over amqp directly. Note that since the client is lazy and won't connect until the first .Send(), the provided OnStartedFunc won't be called until then. Also note that this is blocking and the client won't continue its startup until this function has finished executing.

client := NewClient(url)
client.OnStarted(func(inConn, outConn *amqp.Connection, inChan, outChan *amqp.Channel) {
	// Do something with amqp connections/channels.
})

func (*Client) Send

func (c *Client) Send(r *Request) (*amqp.Delivery, error)

Send will send a Request by using an amqp.Publishing.

func (*Client) Stop

func (c *Client) Stop()

Stop will gracefully disconnect from AMQP. It is not guaranteed that all in-flight requests or responses are handled before the disconnect. Instead, the user should ensure that all calls to c.Send() have returned before calling c.Stop().

func (*Client) WithConfirmMode

func (c *Client) WithConfirmMode(confirmMode bool) *Client

WithConfirmMode sets the confirm-mode on the client. This causes the client to wait for confirmations, and if none arrives or the confirmation is marked as Nack, Client#Send() returns a corresponding error.

func (*Client) WithDialConfig

func (c *Client) WithDialConfig(dc amqp.Config) *Client

WithDialConfig sets the dial config used for the client.

func (*Client) WithDialTimeout

func (c *Client) WithDialTimeout(timeout time.Duration) *Client

WithDialTimeout sets the DialTimeout and handshake deadlines to timeout.

func (*Client) WithLogger

func (c *Client) WithLogger(logger *slog.Logger) *Client

WithLogger sets the logger to use for error and debug logging. By default the library will log errors using the logger from slog.Default. Some logs will contain data contained in an amqp.Delivery or amqp.Publishing, including any headers. If you want to avoid logging some of the fields you can use an slog.Handler to filter out the fields you don't want to log.

func (*Client) WithMaxRetries

func (c *Client) WithMaxRetries(n int) *Client

WithMaxRetries sets the maximum amount of times the client will retry sending the request before giving up and returning the error to the caller of c.Send(). This retry will persist during reconnects.

func (*Client) WithName

func (c *Client) WithName(name string) *Client

WithName sets the name of the client. It will be used in the creation of the reply-to queue, consumer tags and connection names.

func (*Client) WithReplyToConsumerArgs

func (c *Client) WithReplyToConsumerArgs(args amqp.Table) *Client

WithReplyToConsumerArgs will set the consumer args used when the client starts its reply-to consumer.

func (*Client) WithReplyToQueueDeclareArgs

func (c *Client) WithReplyToQueueDeclareArgs(args amqp.Table) *Client

WithReplyToQueueDeclareArgs will set the settings used when declaring the reply-to queue.

func (*Client) WithTLS

func (c *Client) WithTLS(tlsConfig *tls.Config) *Client

WithTLS sets the TLS config in the dial config for the client.

func (*Client) WithTimeout

func (c *Client) WithTimeout(t time.Duration) *Client

WithTimeout will set the client timeout used when publishing messages. t will be rounded using the duration's Round function to the nearest multiple of a millisecond. Rounding will be away from zero.

type ClientMiddlewareFunc

type ClientMiddlewareFunc func(next SendFunc) SendFunc

ClientMiddlewareFunc represents a function that can be used as a middleware.

type ExchangeDeclareSettings

type ExchangeDeclareSettings struct {
	// Name is the name of the exchange.
	Name string

	// Type is the exchange type.
	Type string

	// Durable sets the durable flag. Durable exchanges survive server restart.
	Durable bool

	// AutoDelete sets the auto-delete flag, this ensures the exchange is
	// deleted when it isn't bound to any more.
	AutoDelete bool

	// Args sets the arguments table used.
	Args amqp.Table
}

ExchangeDeclareSettings is the settings that will be used when a handler is mapped to a fanout exchange and an exchange is declared.

type HandlerBinding

type HandlerBinding struct {
	// QueueName is the name of the queue that the handler should be bound to.
	QueueName string

	// ExchangeName is the exchange that the queue should be bound to.
	ExchangeName string

	// RoutingKey is the routing key that the queue should be bound to.
	RoutingKey string

	// BindHeaders is the headers that the queue should be bound to.
	BindHeaders amqp.Table

	// Handler is the function that should be called when a message is
	// received.
	Handler HandlerFunc

	// QueueDurable sets the durable flag. Should not be used together with
	// QueueExclusive since that will fail in a future RabbitMQ version.
	// Setting it to false does not work with quorum queues.
	QueueDurable bool

	// Set the queue to be automatically deleted when the last consumer stops.
	QueueAutoDelete bool

	// SkipQueueDeclare will skip the queue declaration. This can be useful when
	// you are migrating queue types or arguments and want to avoid the
	// PRECONDITION_FAILED error.
	SkipQueueDeclare bool

	// QueueExclusive sets the exclusive flag when declaring the queue.
	// Exclusive queues can only be used by the connection that created them.
	// If exclusive is true, the queue will be deleted during a network failure
	// making incoming requests fail before the reconnect and redeclare
	// happens.
	QueueExclusive bool

	// QueueDeclareArgs sets any extra queue arguments.
	QueueDeclareArgs amqp.Table

	// AutoAck sets the auto-ack flag on the consumer.
	AutoAck bool

	// PrefetchCount sets the prefetch count for the consumer. This is only
	// really usable when AutoAck is also set to false.
	PrefetchCount int

	// ExclusiveConsumer sets the exclusive flag when starting the consumer.
	// This ensures that the bound handler is the only consumer of its queue.
	// This works only with classic queues and setting this to true while using
	// a quorum queue will silently allow multiple consumers.
	ExclusiveConsumer bool

	// ConsumerArgs sets any extra consumer arguments.
	ConsumerArgs amqp.Table
}

HandlerBinding holds information about how an exchange and a queue should be declared and bound. If the ExchangeName is not defined (an empty string), the queue will not be bound to the exchange but assumed to use the default match.

func CreateBinding

func CreateBinding(queueName, exchangeName string, handler HandlerFunc) HandlerBinding

CreateBinding returns a HandlerBinding with default values set.

func DirectBinding

func DirectBinding(routingKey string, handler HandlerFunc) HandlerBinding

DirectBinding returns a HandlerBinding to use for direct exchanges where each routing key will be mapped to one handler.

func HeadersBinding

func HeadersBinding(queueName string, headers amqp.Table, handler HandlerFunc) HandlerBinding

HeadersBinding returns a HandlerBinding to use for header exchanges that will match on specific headers. The headers are specified as an amqp.Table. The default exchange amq.match will be used.

func TopicBinding

func TopicBinding(queueName, routingKey string, handler HandlerFunc) HandlerBinding

TopicBinding returns a HandlerBinding to use for topic exchanges. The default exchange (amq.topic) will be used. The topic is matched on the routing key.

func (HandlerBinding) WithAutoAck

func (b HandlerBinding) WithAutoAck(autoAck bool) HandlerBinding

WithAutoAck sets the auto-ack flag on the consumer.

func (HandlerBinding) WithBindHeaders

func (b HandlerBinding) WithBindHeaders(headers amqp.Table) HandlerBinding

WithBindHeaders sets the headers that the queue should be bound to.

func (HandlerBinding) WithConsumerArg

func (b HandlerBinding) WithConsumerArg(key string, val any) HandlerBinding

WithConsumerArg sets one extra consumer argument, this ensures that the consumer default arguments are not overwritten.

func (HandlerBinding) WithConsumerArgs

func (b HandlerBinding) WithConsumerArgs(args amqp.Table) HandlerBinding

WithConsumerArgs sets any extra consumer arguments.

func (HandlerBinding) WithExchangeName

func (b HandlerBinding) WithExchangeName(name string) HandlerBinding

WithExchangeName sets the exchange that the queue should be bound to.

func (HandlerBinding) WithExclusiveConsumer

func (b HandlerBinding) WithExclusiveConsumer(exclusive bool) HandlerBinding

WithExclusiveConsumer sets the exclusive flag when starting the consumer. This ensures that the bound handler is the only consumer of its queue. This works only with classic queues and setting this to true while using a quorum queue will silently allow multiple consumers.

func (HandlerBinding) WithHandler

func (b HandlerBinding) WithHandler(handler HandlerFunc) HandlerBinding

WithHandler sets the function that should be called when a message is received.

func (HandlerBinding) WithPrefetchCount

func (b HandlerBinding) WithPrefetchCount(count int) HandlerBinding

WithPrefetchCount sets the prefetch count for the consumer.

func (HandlerBinding) WithQueueAutoDelete

func (b HandlerBinding) WithQueueAutoDelete(autoDelete bool) HandlerBinding

WithQueueAutoDelete sets the queue to be automatically deleted when the last consumer stops.

func (HandlerBinding) WithQueueDeclareArg

func (b HandlerBinding) WithQueueDeclareArg(key string, val any) HandlerBinding

WithQueueDeclareArg sets one queue argument, this ensures that the queue default arguments are not overwritten.

func (HandlerBinding) WithQueueDeclareArgs

func (b HandlerBinding) WithQueueDeclareArgs(args amqp.Table) HandlerBinding

WithQueueDeclareArgs sets extra queue arguments.

func (HandlerBinding) WithQueueDurable

func (b HandlerBinding) WithQueueDurable(durable bool) HandlerBinding

WithQueueDurable sets the durable flag for the queue.

func (HandlerBinding) WithQueueExclusive

func (b HandlerBinding) WithQueueExclusive(exclusive bool) HandlerBinding

WithQueueExclusive sets the exclusive flag when declaring the queue. Exclusive queues can only be used by the connection that created them. If exclusive is true, the queue will be deleted during a network failure making incoming requests fail before the reconnect and redeclare happens.

func (HandlerBinding) WithQueueName

func (b HandlerBinding) WithQueueName(name string) HandlerBinding

WithQueueName sets the name of the queue that the handler should be bound to.

func (HandlerBinding) WithRoutingKey

func (b HandlerBinding) WithRoutingKey(key string) HandlerBinding

WithRoutingKey sets the routing key that the queue should be bound to.

func (HandlerBinding) WithSkipQueueDeclare

func (b HandlerBinding) WithSkipQueueDeclare(skip bool) HandlerBinding

WithSkipQueueDeclare sets the flag to skip the queue declaration.

type HandlerFunc

type HandlerFunc func(context.Context, *ResponseWriter, amqp.Delivery)

HandlerFunc is the function that handles all requests based on the routing key.

func ServerMiddlewareChain

func ServerMiddlewareChain(next HandlerFunc, m ...ServerMiddlewareFunc) HandlerFunc

ServerMiddlewareChain will attach all given middlewares to your HandlerFunc. The middlewares will be executed in the same order as your input.

For example:

s := NewServer("url")

s.Bind(DirectBinding(
	"foobar",
	ServerMiddlewareChain(
		myHandler,
		middlewareOne,
		middlewareTwo,
		middlewareThree,
	),
))

type MockAcknowledger

type MockAcknowledger struct {
	Acks    int
	Nacks   int
	Rejects int
	OnAckFn func() error
}

MockAcknowledger is a mocked amqp.Acknowledger, useful for tests.

func (*MockAcknowledger) Ack

func (ma *MockAcknowledger) Ack(_ uint64, _ bool) error

Ack increases Acks.

func (*MockAcknowledger) Nack

func (ma *MockAcknowledger) Nack(_ uint64, _, _ bool) error

Nack increases Nacks.

func (*MockAcknowledger) Reject

func (ma *MockAcknowledger) Reject(_ uint64, _ bool) error

Reject increases Rejects.

type OnStartedFunc

type OnStartedFunc func(inputConn, outputConn *amqp.Connection, inputChannel, outputChannel *amqp.Channel)

OnStartedFunc can be registered at Server.OnStarted(f) and Client.OnStarted(f). This is used when you want to do more setup on the connections and/or channels from amqp, for example setting Qos, NotifyPublish etc.

type Request

type Request struct {
	// Exchange is the exchange to which the request will be published when
	// passing it to the client's send function.
	Exchange string

	// Routing key is the routing key that will be used in the amqp.Publishing
	// request.
	RoutingKey string

	// Mandatory will set the mandatory flag on the request. When this is true
	// the request must be routable or the Send will return a NO_ROUTE amqp
	// error.
	Mandatory bool

	// Reply is a boolean value telling if the request should wait for a reply
	// or just send the request without waiting.
	Reply bool

	// Timeout is the time we should wait after a request is published before
	// we assume the request got lost.
	Timeout time.Duration

	// Publishing is the publishing that is going to be published.
	Publishing amqp.Publishing

	// Context is a context which you can use to pass data from where the
	// request is created to middlewares. By default this will be a
	// context.Background()
	Context context.Context //nolint:containedctx // Needed in the struct.
	// contains filtered or unexported fields
}

Request is a request to perform with the client.

func NewRequest

func NewRequest() *Request

NewRequest will generate a new request to be published. The default request will use the content type "text/plain" and always wait for reply.

func (*Request) AddMiddleware

func (r *Request) AddMiddleware(m ClientMiddlewareFunc) *Request

AddMiddleware will add a middleware which will be executed when the request is published.

func (*Request) WithBody

func (r *Request) WithBody(b string) *Request

WithBody will convert a string to a byte slice and use it as the body of the request.

func (*Request) WithContentType

func (r *Request) WithContentType(ct string) *Request

WithContentType will update the content type passed in the header of the request. This value will be set as the ContentType in the amqp.Publishing type but also preserved as a header value.

func (*Request) WithContext

func (r *Request) WithContext(ctx context.Context) *Request

WithContext will set the context on the request.

func (*Request) WithCorrelationID

func (r *Request) WithCorrelationID(id string) *Request

WithCorrelationID will add/overwrite the correlation ID used for the request and set it on the Publishing. This string must be unique for each request.
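When supplying your own correlation ID, one way to meet the uniqueness requirement is to derive it from random bytes. The helper below is a minimal sketch using only the standard library. newCorrelationID is a hypothetical name and not part of this package.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// newCorrelationID returns 16 random bytes encoded as 32 hex characters.
// Collisions are possible in theory but extremely unlikely in practice.
func newCorrelationID() (string, error) {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		return "", err
	}
	return hex.EncodeToString(b), nil
}

func main() {
	id, err := newCorrelationID()
	if err != nil {
		panic(err)
	}
	// The value could then be passed to WithCorrelationID(id).
	fmt.Println(id)
}
```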

func (*Request) WithExchange

func (r *Request) WithExchange(e string) *Request

WithExchange will set the exchange to which the request will be published.

func (*Request) WithHeaders

func (r *Request) WithHeaders(h amqp.Table) *Request

WithHeaders will set the full amqp.Table as the headers for the request. Note that this will overwrite anything previously set on the headers.

func (*Request) WithMandatory

func (r *Request) WithMandatory(val bool) *Request

WithMandatory will set the mandatory flag on the request. When this is true, the request must be routable, e.g. a queue must be bound in such a way that RabbitMQ can route the message to that queue.

func (*Request) WithResponse

func (r *Request) WithResponse(wr bool) *Request

WithResponse sets the value determining whether the request should wait for a response or not. A request that does not require a response will only catch errors occurring before the request has been published.

func (*Request) WithRoutingKey

func (r *Request) WithRoutingKey(rk string) *Request

WithRoutingKey will set the routing key for the request.

func (*Request) WithTimeout

func (r *Request) WithTimeout(t time.Duration) *Request

WithTimeout will set the client timeout used when publishing messages. t will be rounded using the duration's Round function to the nearest multiple of a millisecond. Halfway values are rounded away from zero.
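The rounding can be reproduced with the standard library alone. This sketch only demonstrates time.Duration.Round with a millisecond unit. roundTimeout and roundedMillis are hypothetical helpers, not functions of this package.

```go
package main

import (
	"fmt"
	"time"
)

// roundTimeout applies the rounding described above: nearest millisecond,
// with halfway values rounded away from zero.
func roundTimeout(t time.Duration) time.Duration {
	return t.Round(time.Millisecond)
}

// roundedMillis takes a timeout in microseconds and reports the rounded
// value in whole milliseconds.
func roundedMillis(micros int64) int64 {
	return roundTimeout(time.Duration(micros) * time.Microsecond).Milliseconds()
}

func main() {
	for _, t := range []time.Duration{
		1400 * time.Microsecond, // below the halfway point, rounds down
		1500 * time.Microsecond, // exactly halfway, rounds up
		2 * time.Second,         // already a whole number of milliseconds
	} {
		fmt.Println(t, "->", roundTimeout(t))
	}
}
```

One consequence is that a timeout below half a millisecond rounds to zero, so sub-millisecond timeouts are probably best avoided.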

func (*Request) Write

func (r *Request) Write(p []byte) (int, error)

Write will write p to the Body of the request's amqp.Publishing. It is safe to call Write multiple times.

func (*Request) WriteHeader

func (r *Request) WriteHeader(header string, value interface{})

WriteHeader will write a header for the specified key.

type RequestMap

type RequestMap struct {
	// contains filtered or unexported fields
}

RequestMap keeps track of requests based on their DeliveryTag and/or CorrelationID.

func (*RequestMap) Delete

func (m *RequestMap) Delete(r *Request)

Delete will remove r from m.

func (*RequestMap) GetByCorrelationID

func (m *RequestMap) GetByCorrelationID(key string) (*Request, bool)

GetByCorrelationID returns the request with the provided correlation id.

func (*RequestMap) GetByDeliveryTag

func (m *RequestMap) GetByDeliveryTag(key uint64) (*Request, bool)

GetByDeliveryTag returns the request with the provided delivery tag.

func (*RequestMap) Set

func (m *RequestMap) Set(r *Request)

Set will add r to m so it can be fetched later using its correlation id or delivery tag.
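The fields of RequestMap are unexported, so the following is only a sketch of how a structure with this method set could work: two maps pointing at the same request, guarded by a mutex. The request and requestMap types here are hypothetical stand-ins, not the library's types.

```go
package main

import (
	"fmt"
	"sync"
)

// request is a trimmed stand-in for Request holding only the two lookup keys.
type request struct {
	correlationID string
	deliveryTag   uint64
}

// requestMap indexes the same *request under both keys.
// It is an illustrative sketch, not the library's RequestMap.
type requestMap struct {
	mu              sync.RWMutex
	byCorrelationID map[string]*request
	byDeliveryTag   map[uint64]*request
}

func newRequestMap() *requestMap {
	return &requestMap{
		byCorrelationID: make(map[string]*request),
		byDeliveryTag:   make(map[uint64]*request),
	}
}

// Set stores r under both its correlation ID and its delivery tag.
func (m *requestMap) Set(r *request) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.byCorrelationID[r.correlationID] = r
	m.byDeliveryTag[r.deliveryTag] = r
}

func (m *requestMap) GetByCorrelationID(key string) (*request, bool) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	r, ok := m.byCorrelationID[key]
	return r, ok
}

func (m *requestMap) GetByDeliveryTag(key uint64) (*request, bool) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	r, ok := m.byDeliveryTag[key]
	return r, ok
}

// Delete removes r from both indexes.
func (m *requestMap) Delete(r *request) {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.byCorrelationID, r.correlationID)
	delete(m.byDeliveryTag, r.deliveryTag)
}

func main() {
	m := newRequestMap()
	r := &request{correlationID: "abc", deliveryTag: 7}
	m.Set(r)
	_, byID := m.GetByCorrelationID("abc")
	_, byTag := m.GetByDeliveryTag(7)
	m.Delete(r)
	_, after := m.GetByCorrelationID("abc")
	fmt.Println(byID, byTag, after)
}
```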

type ResponseWriter

type ResponseWriter struct {
	Publishing *amqp.Publishing
}

ResponseWriter is used by a handler to construct an RPC response. The ResponseWriter may NOT be used after the handler has returned.

Because the ResponseWriter implements io.Writer you can for example use it to write json:

encoder := json.NewEncoder(responseWriter)
encoder.Encode(dataObject)
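A slightly fuller sketch of the same idea, with the error from Encode checked. To stay runnable without a broker it writes into bodyWriter, a hypothetical stand-in that is assumed to behave like ResponseWriter by appending each Write to a body. The reply type is likewise invented for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// bodyWriter is a stand-in for ResponseWriter: each Write appends to body.
type bodyWriter struct {
	body []byte
}

func (w *bodyWriter) Write(p []byte) (int, error) {
	w.body = append(w.body, p...)
	return len(p), nil
}

// reply is a sample payload type.
type reply struct {
	Status string `json:"status"`
	Count  int    `json:"count"`
}

// encodeReply writes v as JSON to a fresh bodyWriter and returns the body.
func encodeReply(v reply) (string, error) {
	w := &bodyWriter{}
	if err := json.NewEncoder(w).Encode(v); err != nil {
		return "", err
	}
	return string(w.body), nil
}

func main() {
	out, err := encodeReply(reply{Status: "ok", Count: 2})
	if err != nil {
		panic(err)
	}
	fmt.Print(out)
}
```

Note that json.Encoder.Encode terminates the JSON value with a newline, so the body ends in "\n".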

func (*ResponseWriter) Write

func (rw *ResponseWriter) Write(p []byte) (int, error)

Write will write the response Body of the amqp.Publishing. It is safe to call Write multiple times.

func (*ResponseWriter) WriteHeader

func (rw *ResponseWriter) WriteHeader(header string, value interface{})

WriteHeader will write a header for the specified key.

type SendFunc

type SendFunc func(r *Request) (d *amqp.Delivery, e error)

SendFunc represents the function that Send does. It takes a Request as input and returns a delivery and an error.

func ClientMiddlewareChain

func ClientMiddlewareChain(next SendFunc, m ...ClientMiddlewareFunc) SendFunc

ClientMiddlewareChain will attach all given middlewares to your SendFunc. The middlewares will be executed in the same order as your input.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server represents an AMQP server used within the RPC framework. The server uses bindings to keep a list of handler functions.

func NewServer

func NewServer(url string) *Server

NewServer will return a pointer to a new Server.

func (*Server) AddMiddleware

func (s *Server) AddMiddleware(m ServerMiddlewareFunc) *Server

AddMiddleware will add a ServerMiddleware to the list of middlewares to be triggered before the handle func for each request.

func (*Server) Bind

func (s *Server) Bind(binding HandlerBinding)

Bind will add a HandlerBinding to the list of bindings to serve.

func (*Server) ListenAndServe

func (s *Server) ListenAndServe()

ListenAndServe will dial the RabbitMQ message bus, set up all the channels, consume from all RPC server queues and monitor the connection to ensure the server is always connected.

func (*Server) OnStarted

func (s *Server) OnStarted(f OnStartedFunc)

OnStarted can be used to hook into the connections/channels that the server is using. This can be useful if you want more control over amqp directly. The OnStartedFunc will be executed after ListenAndServe is called. Note that this function is blocking and the server won't continue its startup until it has finished executing.

server := NewServer(url)
server.OnStarted(func(inConn, outConn *amqp.Connection, inChan, outChan *amqp.Channel) {
	// Do something with amqp connections/channels.
})

func (*Server) Restart

func (s *Server) Restart()

Restart will gracefully disconnect from AMQP exactly like `Stop` but instead of returning from `ListenAndServe` it will set everything up again from scratch and start listening again. This can be useful if a server restart is wanted without running `ListenAndServe` in a loop.

func (*Server) Stop

func (s *Server) Stop()

Stop will gracefully disconnect from AMQP after draining first incoming then outgoing messages. This method won't wait for server shutdown to complete, you should instead wait for ListenAndServe to exit.
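The "call Stop, then wait for ListenAndServe" pattern can be sketched as follows. fakeServer is a hypothetical stand-in that exposes only the two methods involved, with ListenAndServe blocking until Stop is called. With the real Server, the body of runAndStop should apply unchanged.

```go
package main

import (
	"fmt"
	"sync"
)

// fakeServer is a stand-in exposing only the two methods the pattern needs.
type fakeServer struct {
	stop chan struct{}
	once sync.Once
}

func newFakeServer() *fakeServer {
	return &fakeServer{stop: make(chan struct{})}
}

// ListenAndServe blocks until Stop has been called.
func (s *fakeServer) ListenAndServe() { <-s.stop }

// Stop signals shutdown and returns immediately.
func (s *fakeServer) Stop() { s.once.Do(func() { close(s.stop) }) }

// runAndStop starts the server in a goroutine, requests a stop, and returns
// true once ListenAndServe has exited.
func runAndStop() bool {
	s := newFakeServer()
	done := make(chan struct{})
	go func() {
		defer close(done) // closed when ListenAndServe has exited
		s.ListenAndServe()
	}()

	s.Stop() // returns without waiting for shutdown to complete
	<-done   // block here until ListenAndServe has actually exited
	return true
}

func main() {
	fmt.Println("stopped:", runAndStop())
}
```

In a service, Stop would typically be triggered from a signal handler while the main goroutine waits on done.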

func (*Server) WithDialConfig

func (s *Server) WithDialConfig(c amqp.Config) *Server

WithDialConfig sets the dial config used for the server.

func (*Server) WithDialTimeout

func (s *Server) WithDialTimeout(timeout time.Duration) *Server

WithDialTimeout sets the DialTimeout and handshake deadline to timeout.

func (*Server) WithExchanges

func (s *Server) WithExchanges(exchanges ...ExchangeDeclareSettings) *Server

WithExchanges adds exchanges to the list of exchanges that should be declared on startup.

func (*Server) WithLogger

func (s *Server) WithLogger(logger *slog.Logger) *Server

WithLogger sets the logger to use for error and debug logging. By default the library will log errors using the logger from slog.Default. Some logs will contain data contained in an amqp.Delivery or amqp.Publishing, including any headers. If you want to avoid logging some of the fields you can use an slog.Handler to filter out the fields you don't want to log.
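One way to filter fields is the ReplaceAttr option of the standard slog handlers. The sketch below drops attributes by key. The key "headers" is an assumption made for illustration, since the attribute names the library actually logs are not listed here, and newFilteredLogger, demo and hasText are hypothetical helpers.

```go
package main

import (
	"bytes"
	"fmt"
	"log/slog"
	"strings"
)

// newFilteredLogger returns a text logger writing to buf that drops every
// attribute whose key is listed in drop.
func newFilteredLogger(buf *bytes.Buffer, drop ...string) *slog.Logger {
	hidden := make(map[string]bool, len(drop))
	for _, k := range drop {
		hidden[k] = true
	}

	opts := &slog.HandlerOptions{
		ReplaceAttr: func(_ []string, a slog.Attr) slog.Attr {
			if hidden[a.Key] {
				return slog.Attr{} // a zero Attr is discarded by the handler
			}
			return a
		},
	}
	return slog.New(slog.NewTextHandler(buf, opts))
}

// demo logs one record with a sensitive attribute and returns the output.
func demo() string {
	var buf bytes.Buffer
	logger := newFilteredLogger(&buf, "headers") // "headers" is an assumed key
	logger.Error("request failed",
		"routing_key", "foobar",
		"headers", "Authorization=secret",
	)
	return buf.String()
}

// hasText reports whether out contains sub.
func hasText(out, sub string) bool { return strings.Contains(out, sub) }

func main() {
	out := demo()
	fmt.Print(out)
	fmt.Println("leaks secret:", hasText(out, "secret"))
}
```

A logger built this way could then be handed to WithLogger. It is worth checking real log output first to find the attribute keys that need filtering.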

func (*Server) WithName

func (s *Server) WithName(name string) *Server

WithName sets the connection name prefix for the server. Every connection the server uses will have this prefix in its connection name. The connection name is `myprefix.publisher` and `myprefix.consumer` for the publisher and consumer connection respectively. This can be overridden by using the `WithDialConfig` method to set the connection_name property.

func (*Server) WithRestartChan

func (s *Server) WithRestartChan(ch chan struct{}) *Server

WithRestartChan will add a channel to the server that will trigger a restart when it's triggered.

func (*Server) WithTLS

func (s *Server) WithTLS(tlsConfig *tls.Config) *Server

WithTLS sets the TLS config in the dial config for the server.

type ServerMiddlewareFunc

type ServerMiddlewareFunc func(next HandlerFunc) HandlerFunc

ServerMiddlewareFunc represents a function that can be used as a middleware.

For example:

func myMiddle(next HandlerFunc) HandlerFunc {

	// Preinitialization of middleware here.

	return func(ctx context.Context, rw *ResponseWriter, d amqp.Delivery) {
		// Before handler execution here.

		// Execute the handler.
		next(ctx, rw, d)

		// After execution here.
	}
}

s := NewServer("url")

// Add middleware to specific handler.
s.Bind(DirectBinding("foobar", myMiddle(myHandler)))

// Add middleware to all handlers on the server.
s.AddMiddleware(myMiddle)

Directories

Path Synopsis
examples
