Documentation ¶
Index ¶
- Constants
- Variables
- func DefaultDialer(network, addr string) (net.Conn, error)
- type AwareAcknowledger
- type Certificates
- type Client
- func (c *Client) AddMiddleware(m ClientMiddlewareFunc) *Client
- func (c *Client) OnStarted(f OnStartedFunc)
- func (c *Client) Send(r *Request) (*amqp.Delivery, error)
- func (c *Client) Stop()
- func (c *Client) WithConfirmMode(confirmMode bool) *Client
- func (c *Client) WithConsumeSettings(s ConsumeSettings) *Client
- func (c *Client) WithDebugLogger(f LogFunc) *Client
- func (c *Client) WithDialConfig(dc amqp.Config) *Client
- func (c *Client) WithErrorLogger(f LogFunc) *Client
- func (c *Client) WithMaxRetries(n int) *Client
- func (c *Client) WithPublishSettings(s PublishSettings) *Client
- func (c *Client) WithQueueDeclareSettings(s QueueDeclareSettings) *Client
- func (c *Client) WithTLS(tls *tls.Config) *Client
- func (c *Client) WithTimeout(t time.Duration) *Client
- type ClientMiddlewareFunc
- type ConsumeSettings
- type Dialer
- type ExchangeDeclareSettings
- type HandlerBinding
- func DirectBinding(routingKey string, handler HandlerFunc) HandlerBinding
- func FanoutBinding(exchangeName string, handler HandlerFunc) HandlerBinding
- func HeadersBinding(queueName string, headers amqp.Table, handler HandlerFunc) HandlerBinding
- func TopicBinding(queueName, routingKey string, handler HandlerFunc) HandlerBinding
- type HandlerFunc
- type LogFunc
- type MockAcknowledger
- type OnStartedFunc
- type PublishSettings
- type QueueDeclareSettings
- type Request
- func (r *Request) AddMiddleware(m ClientMiddlewareFunc) *Request
- func (r *Request) AfterTimeout() <-chan time.Time
- func (r *Request) WithBody(b string) *Request
- func (r *Request) WithContentType(ct string) *Request
- func (r *Request) WithContext(ctx context.Context) *Request
- func (r *Request) WithCorrelationID(id string) *Request
- func (r *Request) WithExchange(e string) *Request
- func (r *Request) WithHeaders(h amqp.Table) *Request
- func (r *Request) WithResponse(wr bool) *Request
- func (r *Request) WithRoutingKey(rk string) *Request
- func (r *Request) WithTimeout(t time.Duration) *Request
- func (r *Request) Write(p []byte) (int, error)
- func (r *Request) WriteHeader(header string, value interface{})
- type RequestMap
- type ResponseWriter
- type SendFunc
- type Server
- func (s *Server) AddMiddleware(m ServerMiddlewareFunc) *Server
- func (s *Server) Bind(binding HandlerBinding)
- func (s *Server) ListenAndServe()
- func (s *Server) OnStarted(f OnStartedFunc)
- func (s *Server) Stop()
- func (s *Server) WithAutoAck(b bool) *Server
- func (s *Server) WithConsumeSettings(settings ConsumeSettings) *Server
- func (s *Server) WithDebugLogger(f LogFunc) *Server
- func (s *Server) WithDialConfig(c amqp.Config) *Server
- func (s *Server) WithErrorLogger(f LogFunc) *Server
- func (s *Server) WithExchangeDeclareSettings(settings ExchangeDeclareSettings) *Server
- func (s *Server) WithQoSPrefetchCount(c int) *Server
- func (s *Server) WithQueueDeclareSettings(settings QueueDeclareSettings) *Server
- func (s *Server) WithTLS(tls *tls.Config) *Server
- type ServerMiddlewareFunc
Constants ¶
const ( ExchangeDirect = "direct" ExchangeFanout = "fanout" ExchangeTopic = "topic" ExchangeHeaders = "headers" )
Exchanges are enteties where messages are published. This defines the available entities based on https://www.rabbitmq.com/tutorials/amqp-concepts.html.
const ( // CtxQueueName can be used to get the queue name from the context.Context // inside the HandlerFunc. CtxQueueName ctxKey = "queue_name" )
Variables ¶
var ( // ErrRequestReturned can be returned by Client#Send() when the server // returns the message. For example when mandatory is set but the message // can not be routed. ErrRequestReturned = errors.New("publishing returned") // ErrRequestRejected can be returned by Client#Send() when the server Nacks // the message. This can happen if there is some problem inside the amqp // server. To check if the error returned is a ErrRequestReturned error, use // errors.Is(err, ErrRequestRejected). ErrRequestRejected = errors.New("publishing Nacked") // ErrRequestTimeout is an error returned when a client request does not // receive a response within the client timeout duration. To check if the // error returned is an ErrRequestTimeout error, use errors.Is(err, // ErrRequestTimeout). ErrRequestTimeout = errors.New("request timed out") )
var ErrUnexpectedConnClosed = errors.New("unexpected connection close without specific error")
ErrUnexpectedConnClosed is returned by ListenAndServe() if the server shuts down without calling Stop() and if AMQP does not give an error when said shutdown happens.
Functions ¶
Types ¶
type AwareAcknowledger ¶ added in v0.10.1
type AwareAcknowledger struct { Acknowledger amqp.Acknowledger Handled bool }
AwareAcknowledger implements the amqp.Acknowledger interface with the addition that it can tell if a message has been acked in any way.
func NewAwareAcknowledger ¶ added in v0.10.1
func NewAwareAcknowledger(acknowledger amqp.Acknowledger) *AwareAcknowledger
NewAwareAcknowledger returns the passed acknowledger as an AwareAcknowledger.
func (*AwareAcknowledger) Ack ¶ added in v0.10.1
func (a *AwareAcknowledger) Ack(tag uint64, multiple bool) error
Ack passes the Ack down to the underlying Acknowledger.
type Certificates ¶
Certificates represents the certificate, the key and the CA to use when using RabbitMQ with TLS or the certificate and key when using as TLS configuration for RPC server. The fields should be the path to files stored on disk and will be passed to ioutil.ReadFile and tls.LoadX509KeyPair.
func (*Certificates) TLSConfig ¶
func (c *Certificates) TLSConfig() *tls.Config
TLSConfig will return a *tls.Config type based on the files set in the Certificates type.
type Client ¶
type Client struct { // Sender is the main send function called after all middlewares has been // chained and called. This field can be overridden to simplify testing. Sender SendFunc // contains filtered or unexported fields }
Client represents an AMQP client used within a RPC framework. This client can be used to communicate with RPC servers.
func NewClient ¶
NewClient will return a pointer to a new Client. There are two ways to manage the connection that will be used by the client (i.e. when using TLS).
The first one is to use the Certificates type and just pass the filenames to the client certificate, key and the server CA. If this is done the function will handle the reading of the files.
It is also possible to create a custom amqp.Config with whatever configuration desired and that will be used as dial configuration when connection to the message bus.
func (*Client) AddMiddleware ¶
func (c *Client) AddMiddleware(m ClientMiddlewareFunc) *Client
AddMiddleware will add a middleware which will be executed on request.
func (*Client) OnStarted ¶
func (c *Client) OnStarted(f OnStartedFunc)
OnStarted can be used to hook into the connections/channels that the client is using. This can be useful if you want more control over amqp directly. Note that since the client is lazy and won't connect until the first .Send() the provided OnStartedFunc won't be called until then. Also note that this is blocking and the client won't continue it's startup until this function has finished executing.
client := NewClient(url) client.OnStarted(func(inConn, outConn *amqp.Connection, inChan, outChan *amqp.Channel) { // Do something with amqp connections/channels. })
func (*Client) Stop ¶
func (c *Client) Stop()
Stop will gracefully disconnect from AMQP. It is not guaranteed that all in flight requests or responses are handled before the disconnect. Instead the user should ensure that all calls to c.Send() has returned before calling c.Stop().
func (*Client) WithConfirmMode ¶ added in v0.11.0
WithConfirmMode sets the confirm-mode on the client. This causes the client to wait for confirmations, and if none arrives or the confirmation is marked as Nack, Client#Send() returns a corresponding error.
func (*Client) WithConsumeSettings ¶
func (c *Client) WithConsumeSettings(s ConsumeSettings) *Client
WithConsumeSettings will set the settings used when consuming in the client globally.
func (*Client) WithDebugLogger ¶
WithDebugLogger sets the logger to use for debug logging.
func (*Client) WithDialConfig ¶
WithDialConfig sets the dial config used for the client.
func (*Client) WithErrorLogger ¶
WithErrorLogger sets the logger to use for error logging.
func (*Client) WithMaxRetries ¶ added in v0.10.1
WithMaxRetries sets the maximum amount of times the client will retry sending the request before giving up and returning the error to the caller of c.Send(). This retry will persist during reconnects.
func (*Client) WithPublishSettings ¶
func (c *Client) WithPublishSettings(s PublishSettings) *Client
WithPublishSettings will set the client publishing settings when publishing messages.
func (*Client) WithQueueDeclareSettings ¶
func (c *Client) WithQueueDeclareSettings(s QueueDeclareSettings) *Client
WithQueueDeclareSettings will set the settings used when declaring queues for the client globally.
type ClientMiddlewareFunc ¶
ClientMiddlewareFunc represents a function that can be used as a middleware.
type ConsumeSettings ¶
type ConsumeSettings struct { // Consumer sets the consumer tag used when consuming. Consumer string // AutoAck sets the auto-ack flag. When this is set to false, you must // manually ack any deliveries. This is always true for the Client when // consuming replies. AutoAck bool // Exclusive sets the exclusive flag. When this is set to true, no other // instances can consume from a given queue. This has no affect on the // Client when consuming replies where it's always set to true so that no // two clients can consume from the same reply-to queue. Exclusive bool // QoSPrefetchCount sets the prefetch-count. Set this to a value to ensure // that amqp-rpc won't prefetch all messages in the queue. This has no // effect on the Client which will always try to fetch everything. QoSPrefetchCount int // QoSPrefetchSize sets the prefetch-size. Set this to a value to ensure // that amqp-rpc won't prefetch all messages in the queue. QoSPrefetchSize int // Args sets the arguments table used. Args amqp.Table }
ConsumeSettings is the settings that will be used when the consumption on a specified queue is started.
type ExchangeDeclareSettings ¶
type ExchangeDeclareSettings struct { // Durable sets the durable flag. Durable exchanges survives server restart. Durable bool // AutoDelete sets the auto-delete flag, this ensures the exchange is // deleted when it isn't bound to any more. AutoDelete bool // Args sets the arguments table used. Args amqp.Table }
ExchangeDeclareSettings is the settings that will be used when a handler is mapped to a fanout exchange and an exchange is declared.
type HandlerBinding ¶
type HandlerBinding struct { QueueName string ExchangeName string ExchangeType string RoutingKey string BindHeaders amqp.Table Handler HandlerFunc }
HandlerBinding holds information about how an exchange and a queue should be declared and bound. If the ExchangeName is not defined (an empty string), the queue will not be bound to the exchange but assumed to use the default match.
func DirectBinding ¶
func DirectBinding(routingKey string, handler HandlerFunc) HandlerBinding
DirectBinding returns a HandlerBinding to use for direct exchanges where each routing key will be mapped to one handler.
func FanoutBinding ¶
func FanoutBinding(exchangeName string, handler HandlerFunc) HandlerBinding
FanoutBinding returns a HandlerBinding to use for fanout exchanges. These exchanges does not use the routing key. We do not use the default exchange (amq.fanout) since this would broadcast all messages everywhere.
func HeadersBinding ¶
func HeadersBinding(queueName string, headers amqp.Table, handler HandlerFunc) HandlerBinding
HeadersBinding returns a HandlerBinding to use for header exchanges that will match on specific headers. The heades are specified as an amqp.Table. The default exchange amq.match will be used.
func TopicBinding ¶
func TopicBinding(queueName, routingKey string, handler HandlerFunc) HandlerBinding
TopicBinding returns a HandlerBinding to use for topic exchanges. The default exchange (amq.topic) will be used. The topic is matched on the routing key.
type HandlerFunc ¶
type HandlerFunc func(context.Context, *ResponseWriter, amqp.Delivery)
HandlerFunc is the function that handles all request based on the routing key.
func ServerMiddlewareChain ¶
func ServerMiddlewareChain(next HandlerFunc, m ...ServerMiddlewareFunc) HandlerFunc
ServerMiddlewareChain will attatch all given middlewares to your HandlerFunc. The middlewares will be executed in the same order as your input.
For example:
s := New("url") s.Bind(DirectBinding( "foobar", ServerMiddlewareChain( myHandler, middlewareOne, middlewareTwo, middlewareThree, ), ))
type LogFunc ¶
type LogFunc func(format string, args ...interface{})
LogFunc is used for logging in amqp-rpc. It makes it possible to define your own logging.
Here is an example where the logger from the log package is used:
debugLogger := log.New(os.Stdout, "DEBUG - ", log.LstdFlags) errorLogger := log.New(os.Stdout, "ERROR - ", log.LstdFlags) server := NewServer(url) server.WithErrorLogger(errorLogger.Printf) server.WithDebugLogger(debugLogger.Printf)
It can also be used with for example a Logrus logger:
logger := logrus.New() logger.SetLevel(logrus.DebugLevel) logger.Formatter = &logrus.JSONFormatter{} s.WithErrorLogger(logger.Warnf) s.WithDebugLogger(logger.Debugf) client := NewClient(url) client.WithErrorLogger(logger.Errorf) client.WithDebugLogger(logger.Debugf)
type MockAcknowledger ¶ added in v0.10.1
MockAcknowledger is a mocked amqp.Acknowledger, useful for tests.
func (*MockAcknowledger) Ack ¶ added in v0.10.1
func (ma *MockAcknowledger) Ack(tag uint64, multiple bool) error
Ack increases Acks.
type OnStartedFunc ¶
type OnStartedFunc func(inputConn, outputConn *amqp.Connection, inputChannel, outputChannel *amqp.Channel)
OnStartedFunc can be registered at Server.OnStarted(f) and Client.OnStarted(f). This is used when you want to do more setup on the connections and/or channels from amqp, for example setting Qos, NotifyPublish etc.
type PublishSettings ¶
type PublishSettings struct { // Mandatory sets the mandatory flag. When this is true a Publish call will // be returned if it's not routable by the exchange. Mandatory bool // Immediate sets the immediate flag. When this is true a Publish call will // be returned if a consumer isn't directly available. Immediate bool // ConfirmMode puts the channel that messages are published over in // confirm mode. This makes sending requests more reliable at the cost // of some performance. Each publishing must be confirmed by the server. // See https://www.rabbitmq.com/confirms.html#publisher-confirms ConfirmMode bool }
PublishSettings is the settings that will be used when a message is about to be published to the message bus. These settings are only used by the Client and never by the Server. For the server, Mandatory or Immediate can be set on the ResponseWriter instead.
type QueueDeclareSettings ¶
type QueueDeclareSettings struct { // DeleteWhenUnused sets the auto-delete flag. It's recommended to have this // set to false so that amqp-rpc can reconnect and use the same queue while // keeping any messages in the queue. DeleteWhenUnused bool // Durable sets the durable flag. It's recommended to have this set to false // and instead use ha-mode for queues and messages. Durable bool // Exclusive sets the exclusive flag when declaring queues. This flag has // no effect on Clients reply-to queues which are never exclusive so it // can support reconnects properly. Exclusive bool // Args sets the arguments table used. Args amqp.Table }
QueueDeclareSettings is the settings that will be used when the response any kind of queue is declared. Se documentation for amqp.QueueDeclare for more information about these settings.
type Request ¶
type Request struct { // Exchange is the exchange to which the rquest will be published when // passing it to the clients send function. Exchange string // Routing key is the routing key that will be used in the amqp.Publishing // request. RoutingKey string // Reply is a boolean value telling if the request should wait for a reply // or just send the request without waiting. Reply bool // Timeout is the time we should wait after a request is published before // we assume the request got lost. Timeout time.Duration // Publishing is the publising that are going to be published. Publishing amqp.Publishing // Context is a context which you can use to pass data from where the // request is created to middlewares. By default this will be a // context.Background() Context context.Context // contains filtered or unexported fields }
Request is a requet to perform with the client.
func NewRequest ¶
func NewRequest() *Request
NewRequest will generate a new request to be published. The default request will use the content type "text/plain" and always wait for reply.
func (*Request) AddMiddleware ¶
func (r *Request) AddMiddleware(m ClientMiddlewareFunc) *Request
AddMiddleware will add a middleware which will be executed when the request is published.
func (*Request) AfterTimeout ¶ added in v0.11.0
AfterTimeout waits for the duration of the timeout.
func (*Request) WithBody ¶
WithBody will convert a string to a byte slice and add as the body passed for the request.
func (*Request) WithContentType ¶
WithContentType will update the content type passed in the header of the request. This value will bee set as the ContentType in the amqp.Publishing type but also preserved as a header value.
func (*Request) WithContext ¶
WithContext will set the context on the request.
func (*Request) WithCorrelationID ¶
WithCorrelationID will add/overwrite the correlation ID used for the request and set it on the Publishing.
func (*Request) WithExchange ¶
WithExchange will set the exchange on to which the request will be published.
func (*Request) WithHeaders ¶
WithHeaders will set the full amqp.Table as the headers for the request. Note that this will overwrite anything previously set on the headers.
func (*Request) WithResponse ¶
WithResponse sets the value determining wether the request should wait for a response or not. A request that does not require a response will only catch errors occurring before the reuqest has been published.
func (*Request) WithRoutingKey ¶
WithRoutingKey will set the routing key for the request.
func (*Request) WithTimeout ¶
WithTimeout will set the client timeout used when publishing messages. t will be rounded using the duration's Round function to the nearest multiple of a millisecond. Rounding will be away from zero.
func (*Request) Write ¶
Write will write the response Body of the amqp.Publishing. It is safe to call Write multiple times.
func (*Request) WriteHeader ¶
WriteHeader will write a header for the specified key.
type RequestMap ¶ added in v0.11.0
type RequestMap struct {
// contains filtered or unexported fields
}
RequestMap keeps track of requests based on their DeliveryTag and/or CorrelationID.
func (*RequestMap) Delete ¶ added in v0.11.0
func (m *RequestMap) Delete(r *Request)
Delete will remove r from m.
func (*RequestMap) GetByCorrelationID ¶ added in v0.11.0
func (m *RequestMap) GetByCorrelationID(key string) (*Request, bool)
GetByCorrelationID returns the request with the provided correlation id.
func (*RequestMap) GetByDeliveryTag ¶ added in v0.11.0
func (m *RequestMap) GetByDeliveryTag(key uint64) (*Request, bool)
GetByDeliveryTag returns the request with the provided delivery tag.
func (*RequestMap) Set ¶ added in v0.11.0
func (m *RequestMap) Set(r *Request)
Set will add r to m so it can be fetched later using it's correlation id or delivery tag.
type ResponseWriter ¶
type ResponseWriter struct { Publishing *amqp.Publishing Mandatory bool Immediate bool }
ResponseWriter is used by a handler to construct an RPC response. The ResponseWriter may NOT be used after the handler has returned.
Because the ResponseWriter implements io.Writer you can for example use it to write json:
encoder := json.NewEncoder(responseWriter) encoder.Encode(dataObject)
func NewResponseWriter ¶
func NewResponseWriter(p *amqp.Publishing) *ResponseWriter
NewResponseWriter will create a new response writer with given amqp.Publishing.
func (*ResponseWriter) Write ¶
func (rw *ResponseWriter) Write(p []byte) (int, error)
Write will write the response Body of the amqp.Publishing. It is safe to call Write multiple times.
func (*ResponseWriter) WriteHeader ¶
func (rw *ResponseWriter) WriteHeader(header string, value interface{})
WriteHeader will write a header for the specified key.
type SendFunc ¶
SendFunc represents the function that Send does. It takes a Request as input and returns a delivery and an error.
func ClientMiddlewareChain ¶
func ClientMiddlewareChain(next SendFunc, m ...ClientMiddlewareFunc) SendFunc
ClientMiddlewareChain will attatch all given middlewares to your SendFunc. The middlewares will be executed in the same order as your input.
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server represents an AMQP server used within the RPC framework. The server uses bindings to keep a list of handler functions.
func (*Server) AddMiddleware ¶
func (s *Server) AddMiddleware(m ServerMiddlewareFunc) *Server
AddMiddleware will add a ServerMiddleware to the list of middlewares to be triggered before the handle func for each request.
func (*Server) Bind ¶
func (s *Server) Bind(binding HandlerBinding)
Bind will add a HandlerBinding to the list of servers to serve.
func (*Server) ListenAndServe ¶
func (s *Server) ListenAndServe()
ListenAndServe will dial the RabbitMQ message bus, set up all the channels, consume from all RPC server queues and monitor to connection to ensure the server is always connected.
func (*Server) OnStarted ¶
func (s *Server) OnStarted(f OnStartedFunc)
OnStarted can be used to hook into the connections/channels that the server is using. This can be useful if you want more control over amqp directly. The OnStartedFunc will be executed after ListenAndServe is called. Note that this function is blocking and the server won't continue it's startup until it has finished executing.
server := NewServer(url) server.OnStarted(func(inConn, outConn *amqp.Connection, inChan, outChan *amqp.Channel) { // Do something with amqp connections/channels. })
func (*Server) Stop ¶
func (s *Server) Stop()
Stop will gracefully disconnect from AMQP after draining first incoming then outgoing messages. This method won't wait for server shutdown to complete, you should instead wait for ListenAndServe to exit.
func (*Server) WithAutoAck ¶ added in v0.10.1
WithAutoAck sets the AMQP servers auto-ack mode.
func (*Server) WithConsumeSettings ¶ added in v0.10.1
func (s *Server) WithConsumeSettings(settings ConsumeSettings) *Server
WithConsumeSettings sets configuration used when the server wants to start consuming from a queue.
func (*Server) WithDebugLogger ¶
WithDebugLogger sets the logger to use for debug logging.
func (*Server) WithDialConfig ¶
WithDialConfig sets the dial config used for the server.
func (*Server) WithErrorLogger ¶
WithErrorLogger sets the logger to use for error logging.
func (*Server) WithExchangeDeclareSettings ¶ added in v0.10.1
func (s *Server) WithExchangeDeclareSettings(settings ExchangeDeclareSettings) *Server
WithExchangeDeclareSettings sets configuration used when the server wants to declare exchanges.
func (*Server) WithQoSPrefetchCount ¶ added in v0.12.0
WithQoSPrefetchCount sets the AMQP servers QoS pre-fetch count.
func (*Server) WithQueueDeclareSettings ¶ added in v0.10.1
func (s *Server) WithQueueDeclareSettings(settings QueueDeclareSettings) *Server
WithQueueDeclareSettings sets configuration used when the server wants to declare queues.
type ServerMiddlewareFunc ¶
type ServerMiddlewareFunc func(next HandlerFunc) HandlerFunc
ServerMiddlewareFunc represent a function that can be used as a middleware.
For example:
func myMiddle(next HandlerFunc) HandlerFunc { // Preinitialization of middleware here. return func(ctx context.Context, rw *ResponseWriter d amqp.Delivery) { // Before handler execution here. // Execute the handler. next(ctx, rw, d) // After execution here. } } s := New("url") // Add middleware to specific handler. s.Bind(DirectBinding("foobar", myMiddle(HandlerFunc))) // Add middleware to all handlers on the server. s.AddMiddleware(myMiddle)