Documentation ¶
Index ¶
- Constants
- Variables
- func ContextWithQueueName(ctx context.Context, queueName string) context.Context
- func ContextWithShutdownChan(ctx context.Context, ch chan struct{}) context.Context
- func QueueNameFromContext(ctx context.Context) (string, bool)
- func ShutdownChanFromContext(ctx context.Context) (chan struct{}, bool)
- type AwareAcknowledger
- type Certificates
- type Client
- func (c *Client) AddMiddleware(m ClientMiddlewareFunc) *Client
- func (c *Client) OnStarted(f OnStartedFunc)
- func (c *Client) Send(r *Request) (*amqp.Delivery, error)
- func (c *Client) Stop()
- func (c *Client) WithConfirmMode(confirmMode bool) *Client
- func (c *Client) WithDialConfig(dc amqp.Config) *Client
- func (c *Client) WithDialTimeout(timeout time.Duration) *Client
- func (c *Client) WithLogger(logger *slog.Logger) *Client
- func (c *Client) WithMaxRetries(n int) *Client
- func (c *Client) WithName(name string) *Client
- func (c *Client) WithReplyToConsumerArgs(args amqp.Table) *Client
- func (c *Client) WithReplyToQueueDeclareArgs(args amqp.Table) *Client
- func (c *Client) WithTLS(tlsConfig *tls.Config) *Client
- func (c *Client) WithTimeout(t time.Duration) *Client
- type ClientMiddlewareFunc
- type ExchangeDeclareSettings
- type HandlerBinding
- func CreateBinding(queueName, exchangeName string, handler HandlerFunc) HandlerBinding
- func DirectBinding(routingKey string, handler HandlerFunc) HandlerBinding
- func HeadersBinding(queueName string, headers amqp.Table, handler HandlerFunc) HandlerBinding
- func TopicBinding(queueName, routingKey string, handler HandlerFunc) HandlerBinding
- func (b HandlerBinding) WithAutoAck(autoAck bool) HandlerBinding
- func (b HandlerBinding) WithBindHeaders(headers amqp.Table) HandlerBinding
- func (b HandlerBinding) WithConsumerArg(key string, val any) HandlerBinding
- func (b HandlerBinding) WithConsumerArgs(args amqp.Table) HandlerBinding
- func (b HandlerBinding) WithExchangeName(name string) HandlerBinding
- func (b HandlerBinding) WithExclusiveConsumer(exclusive bool) HandlerBinding
- func (b HandlerBinding) WithHandler(handler HandlerFunc) HandlerBinding
- func (b HandlerBinding) WithPrefetchCount(count int) HandlerBinding
- func (b HandlerBinding) WithQueueAutoDelete(autoDelete bool) HandlerBinding
- func (b HandlerBinding) WithQueueDeclareArg(key string, val any) HandlerBinding
- func (b HandlerBinding) WithQueueDeclareArgs(args amqp.Table) HandlerBinding
- func (b HandlerBinding) WithQueueDurable(durable bool) HandlerBinding
- func (b HandlerBinding) WithQueueExclusive(exclusive bool) HandlerBinding
- func (b HandlerBinding) WithQueueName(name string) HandlerBinding
- func (b HandlerBinding) WithRoutingKey(key string) HandlerBinding
- func (b HandlerBinding) WithSkipQueueDeclare(skip bool) HandlerBinding
- type HandlerFunc
- type MockAcknowledger
- type OnStartedFunc
- type Request
- func (r *Request) AddMiddleware(m ClientMiddlewareFunc) *Request
- func (r *Request) WithBody(b string) *Request
- func (r *Request) WithContentType(ct string) *Request
- func (r *Request) WithContext(ctx context.Context) *Request
- func (r *Request) WithCorrelationID(id string) *Request
- func (r *Request) WithExchange(e string) *Request
- func (r *Request) WithHeaders(h amqp.Table) *Request
- func (r *Request) WithMandatory(val bool) *Request
- func (r *Request) WithResponse(wr bool) *Request
- func (r *Request) WithRoutingKey(rk string) *Request
- func (r *Request) WithTimeout(t time.Duration) *Request
- func (r *Request) Write(p []byte) (int, error)
- func (r *Request) WriteHeader(header string, value interface{})
- type RequestMap
- type ResponseWriter
- type SendFunc
- type Server
- func (s *Server) AddMiddleware(m ServerMiddlewareFunc) *Server
- func (s *Server) Bind(binding HandlerBinding)
- func (s *Server) ListenAndServe()
- func (s *Server) OnStarted(f OnStartedFunc)
- func (s *Server) Restart()
- func (s *Server) Stop()
- func (s *Server) WithDialConfig(c amqp.Config) *Server
- func (s *Server) WithDialTimeout(timeout time.Duration) *Server
- func (s *Server) WithExchanges(exchanges ...ExchangeDeclareSettings) *Server
- func (s *Server) WithLogger(logger *slog.Logger) *Server
- func (s *Server) WithName(name string) *Server
- func (s *Server) WithRestartChan(ch chan struct{}) *Server
- func (s *Server) WithTLS(tlsConfig *tls.Config) *Server
- type ServerMiddlewareFunc
Constants ¶
const ( ExchangeTypeDirect = "direct" ExchangeTypeTopic = "topic" ExchangeTypeHeaders = "headers" )
The default exchange types that are available in RabbitMQ.
const ( DefaultExchangeNameDirect = "amq.direct" DefaultExchangeNameTopic = "amq.topic" DefaultExchangeNameHeaders = "amq.match" )
The default exchanges that are available in RabbitMQ.
const ( QueueTypeClassic = "classic" QueueTypeQuorum = "quorum" )
The different queue types that are available in RabbitMQ.
Variables ¶
var ( // ErrRequestReturned can be returned by Client#Send() when the server // returns the message. For example when mandatory is set but the message // can not be routed. ErrRequestReturned = errors.New("publishing returned") // ErrRequestRejected can be returned by Client#Send() when the server Nacks // the message. This can happen if there is some problem inside the amqp // server. To check if the error returned is an ErrRequestRejected error, use // errors.Is(err, ErrRequestRejected). ErrRequestRejected = errors.New("publishing Nacked") // ErrRequestTimeout is an error returned when a client request does not // receive a response within the client timeout duration. To check if the // error returned is an ErrRequestTimeout error, use errors.Is(err, // ErrRequestTimeout). ErrRequestTimeout = errors.New("request timed out") )
var ErrUnexpectedConnClosed = errors.New("unexpected connection close without specific error")
ErrUnexpectedConnClosed is returned by ListenAndServe() if the server shuts down without calling Stop() and if AMQP does not give an error when said shutdown happens.
Functions ¶
func ContextWithQueueName ¶
ContextWithQueueName adds the given queueName to the provided context.
func ContextWithShutdownChan ¶
ContextWithShutdownChan adds a shutdown chan to the given context.
func QueueNameFromContext ¶
QueueNameFromContext returns the queue name for the current request.
func ShutdownChanFromContext ¶
ShutdownChanFromContext returns the shutdown chan.
Types ¶
type AwareAcknowledger ¶
type AwareAcknowledger struct { Acknowledger amqp.Acknowledger Handled bool }
AwareAcknowledger implements the amqp.Acknowledger interface with the addition that it can tell if a message has been acked in any way.
func NewAwareAcknowledger ¶
func NewAwareAcknowledger(acknowledger amqp.Acknowledger) *AwareAcknowledger
NewAwareAcknowledger returns the passed acknowledger as an AwareAcknowledger.
func (*AwareAcknowledger) Ack ¶
func (a *AwareAcknowledger) Ack(tag uint64, multiple bool) error
Ack passes the Ack down to the underlying Acknowledger.
type Certificates ¶
Certificates represents the certificate, the key and the CA to use when using RabbitMQ with TLS or the certificate and key when using as TLS configuration for RPC server. The fields should be the path to files stored on disk and will be passed to ioutil.ReadFile and tls.LoadX509KeyPair.
func (*Certificates) TLSConfig ¶
func (c *Certificates) TLSConfig() *tls.Config
TLSConfig will return a *tls.Config type based on the files set in the Certificates type.
type Client ¶
type Client struct { // Sender is the main send function called after all middlewares have been // chained and called. This field can be overridden to simplify testing. Sender SendFunc // contains filtered or unexported fields }
Client represents an AMQP client used within a RPC framework. This client can be used to communicate with RPC servers.
func NewClient ¶
NewClient will return a pointer to a new Client. There are two ways to manage the connection that will be used by the client (i.e. when using TLS).
The first one is to use the Certificates type and just pass the filenames to the client certificate, key and the server CA. If this is done the function will handle the reading of the files.
It is also possible to create a custom amqp.Config with whatever configuration desired and that will be used as dial configuration when connecting to the message bus.
func (*Client) AddMiddleware ¶
func (c *Client) AddMiddleware(m ClientMiddlewareFunc) *Client
AddMiddleware will add a middleware which will be executed on request.
func (*Client) OnStarted ¶
func (c *Client) OnStarted(f OnStartedFunc)
OnStarted can be used to hook into the connections/channels that the client is using. This can be useful if you want more control over amqp directly. Note that since the client is lazy and won't connect until the first .Send() the provided OnStartedFunc won't be called until then. Also note that this is blocking and the client won't continue its startup until this function has finished executing.
client := NewClient(url) client.OnStarted(func(inConn, outConn *amqp.Connection, inChan, outChan *amqp.Channel) { // Do something with amqp connections/channels. })
func (*Client) Stop ¶
func (c *Client) Stop()
Stop will gracefully disconnect from AMQP. It is not guaranteed that all in flight requests or responses are handled before the disconnect. Instead the user should ensure that all calls to c.Send() have returned before calling c.Stop().
func (*Client) WithConfirmMode ¶
WithConfirmMode sets the confirm-mode on the client. This causes the client to wait for confirmations, and if none arrives or the confirmation is marked as Nack, Client#Send() returns a corresponding error.
func (*Client) WithDialConfig ¶
WithDialConfig sets the dial config used for the client.
func (*Client) WithDialTimeout ¶
WithDialTimeout sets the DialTimeout and handshake deadlines to timeout.
func (*Client) WithLogger ¶
WithLogger sets the logger to use for error and debug logging. By default the library will log errors using the logger from slog.Default. Some logs will contain data contained in an amqp.Delivery or amqp.Publishing, including any headers. If you want to avoid logging some of the fields you can use an slog.Handler to filter out the fields you don't want to log.
func (*Client) WithMaxRetries ¶
WithMaxRetries sets the maximum amount of times the client will retry sending the request before giving up and returning the error to the caller of c.Send(). This retry will persist during reconnects.
func (*Client) WithName ¶
WithName sets the name of the client, it will be used in the creation of the reply-to queue, consumer tags and connection names.
func (*Client) WithReplyToConsumerArgs ¶
WithReplyToConsumerArgs will set the consumer args used when the client starts its reply-to consumer.
func (*Client) WithReplyToQueueDeclareArgs ¶
WithReplyToQueueDeclareArgs will set the settings used when declaring the reply-to queue.
type ClientMiddlewareFunc ¶
ClientMiddlewareFunc represents a function that can be used as a middleware.
type ExchangeDeclareSettings ¶
type ExchangeDeclareSettings struct { // Name is the name of the exchange. Name string // Type is the exchange type. Type string // Durable sets the durable flag. Durable exchanges survive server restart. Durable bool // AutoDelete sets the auto-delete flag, this ensures the exchange is // deleted when it isn't bound to any more. AutoDelete bool // Args sets the arguments table used. Args amqp.Table }
ExchangeDeclareSettings is the settings that will be used when a handler is mapped to a fanout exchange and an exchange is declared.
type HandlerBinding ¶
type HandlerBinding struct { // QueueName is the name of the queue that the handler should be bound to. QueueName string // ExchangeName is the exchange that the queue should be bound to. ExchangeName string // RoutingKey is the routing key that the queue should be bound to. RoutingKey string // BindHeaders is the headers that the queue should be bound to. BindHeaders amqp.Table // Handler is the function that should be called when a message is // received. Handler HandlerFunc // QueueDurable sets the durable flag. Should not be used together with // QueueExclusive since that will fail in a future RabbitMQ version. // Setting it to false does not work with quorum queues. QueueDurable bool // Set the queue to be automatically deleted when the last consumer stops. QueueAutoDelete bool // SkipQueueDeclare will skip the queue declaration. This can be useful when // you are migrating queue types or arguments and want to avoid the // PRECONDITION_FAILED error. SkipQueueDeclare bool // QueueExclusive sets the exclusive flag when declaring the queue. // Exclusive queues can only be used by the connection that created them. // If exclusive is true, the queue will be deleted during a network failure // making incoming requests fail before the reconnect and redeclare // happens. QueueExclusive bool // QueueDeclareArgs sets any extra queue arguments. QueueDeclareArgs amqp.Table // AutoAck sets the auto-ack flag on the consumer. AutoAck bool // PrefetchCount sets the prefetch count for the consumer. This is only // really usable when AutoAck is also set to false. PrefetchCount int // ExclusiveConsumer sets the exclusive flag when starting the consumer. // This ensures that the bound handler is the only consumer of its queue. // This works only with classic queues and setting this to true while using // a quorum queue will silently allow multiple consumers. ExclusiveConsumer bool // ConsumerArgs sets any extra consumer arguments. ConsumerArgs amqp.Table }
HandlerBinding holds information about how an exchange and a queue should be declared and bound. If the ExchangeName is not defined (an empty string), the queue will not be bound to the exchange but assumed to use the default match.
func CreateBinding ¶
func CreateBinding(queueName, exchangeName string, handler HandlerFunc) HandlerBinding
CreateBinding returns a HandlerBinding with default values set.
func DirectBinding ¶
func DirectBinding(routingKey string, handler HandlerFunc) HandlerBinding
DirectBinding returns a HandlerBinding to use for direct exchanges where each routing key will be mapped to one handler.
func HeadersBinding ¶
func HeadersBinding(queueName string, headers amqp.Table, handler HandlerFunc) HandlerBinding
HeadersBinding returns a HandlerBinding to use for header exchanges that will match on specific headers. The headers are specified as an amqp.Table. The default exchange amq.match will be used.
func TopicBinding ¶
func TopicBinding(queueName, routingKey string, handler HandlerFunc) HandlerBinding
TopicBinding returns a HandlerBinding to use for topic exchanges. The default exchange (amq.topic) will be used. The topic is matched on the routing key.
func (HandlerBinding) WithAutoAck ¶
func (b HandlerBinding) WithAutoAck(autoAck bool) HandlerBinding
WithAutoAck sets the auto-ack flag on the consumer.
func (HandlerBinding) WithBindHeaders ¶
func (b HandlerBinding) WithBindHeaders(headers amqp.Table) HandlerBinding
WithBindHeaders sets the headers that the queue should be bound to.
func (HandlerBinding) WithConsumerArg ¶
func (b HandlerBinding) WithConsumerArg(key string, val any) HandlerBinding
WithConsumerArg sets one extra consumer argument, this ensures that the consumer default arguments are not overwritten.
func (HandlerBinding) WithConsumerArgs ¶
func (b HandlerBinding) WithConsumerArgs(args amqp.Table) HandlerBinding
WithConsumerArgs sets any extra consumer arguments.
func (HandlerBinding) WithExchangeName ¶
func (b HandlerBinding) WithExchangeName(name string) HandlerBinding
WithExchangeName sets the exchange that the queue should be bound to.
func (HandlerBinding) WithExclusiveConsumer ¶
func (b HandlerBinding) WithExclusiveConsumer(exclusive bool) HandlerBinding
WithExclusiveConsumer sets the exclusive flag when starting the consumer. This ensures that the bound handler is the only consumer of its queue. This works only with classic queues and setting this to true while using a quorum queue will silently allow multiple consumers.
func (HandlerBinding) WithHandler ¶
func (b HandlerBinding) WithHandler(handler HandlerFunc) HandlerBinding
WithHandler sets the function that should be called when a message is received.
func (HandlerBinding) WithPrefetchCount ¶
func (b HandlerBinding) WithPrefetchCount(count int) HandlerBinding
WithPrefetchCount sets the prefetch count for the consumer.
func (HandlerBinding) WithQueueAutoDelete ¶
func (b HandlerBinding) WithQueueAutoDelete(autoDelete bool) HandlerBinding
WithQueueAutoDelete sets the queue to be automatically deleted when the last consumer stops.
func (HandlerBinding) WithQueueDeclareArg ¶
func (b HandlerBinding) WithQueueDeclareArg(key string, val any) HandlerBinding
WithQueueDeclareArg sets one queue argument, this ensures that the queue default arguments are not overwritten.
func (HandlerBinding) WithQueueDeclareArgs ¶
func (b HandlerBinding) WithQueueDeclareArgs(args amqp.Table) HandlerBinding
WithQueueDeclareArgs sets extra queue arguments.
func (HandlerBinding) WithQueueDurable ¶
func (b HandlerBinding) WithQueueDurable(durable bool) HandlerBinding
WithQueueDurable sets the durable flag for the queue.
func (HandlerBinding) WithQueueExclusive ¶
func (b HandlerBinding) WithQueueExclusive(exclusive bool) HandlerBinding
WithQueueExclusive sets the exclusive flag when declaring the queue. Exclusive queues can only be used by the connection that created them. If exclusive is true, the queue will be deleted during a network failure making incoming requests fail before the reconnect and redeclare happens.
func (HandlerBinding) WithQueueName ¶
func (b HandlerBinding) WithQueueName(name string) HandlerBinding
WithQueueName sets the name of the queue that the handler should be bound to.
func (HandlerBinding) WithRoutingKey ¶
func (b HandlerBinding) WithRoutingKey(key string) HandlerBinding
WithRoutingKey sets the routing key that the queue should be bound to.
func (HandlerBinding) WithSkipQueueDeclare ¶
func (b HandlerBinding) WithSkipQueueDeclare(skip bool) HandlerBinding
WithSkipQueueDeclare sets the flag to skip the queue declaration.
type HandlerFunc ¶
type HandlerFunc func(context.Context, *ResponseWriter, amqp.Delivery)
HandlerFunc is the function that handles all requests based on the routing key.
func ServerMiddlewareChain ¶
func ServerMiddlewareChain(next HandlerFunc, m ...ServerMiddlewareFunc) HandlerFunc
ServerMiddlewareChain will attach all given middlewares to your HandlerFunc. The middlewares will be executed in the same order as your input.
For example:
s := New("url") s.Bind(DirectBinding( "foobar", ServerMiddlewareChain( myHandler, middlewareOne, middlewareTwo, middlewareThree, ), ))
type MockAcknowledger ¶
MockAcknowledger is a mocked amqp.Acknowledger, useful for tests.
func (*MockAcknowledger) Ack ¶
func (ma *MockAcknowledger) Ack(_ uint64, _ bool) error
Ack increases Acks.
type OnStartedFunc ¶
type OnStartedFunc func(inputConn, outputConn *amqp.Connection, inputChannel, outputChannel *amqp.Channel)
OnStartedFunc can be registered at Server.OnStarted(f) and Client.OnStarted(f). This is used when you want to do more setup on the connections and/or channels from amqp, for example setting Qos, NotifyPublish etc.
type Request ¶
type Request struct { // Exchange is the exchange to which the request will be published when // passing it to the client's send function. Exchange string // Routing key is the routing key that will be used in the amqp.Publishing // request. RoutingKey string // Mandatory will set the mandatory flag on the request. When this is true // the request must be routable or the Send will return a NO_ROUTE amqp // error. Mandatory bool // Reply is a boolean value telling if the request should wait for a reply // or just send the request without waiting. Reply bool // Timeout is the time we should wait after a request is published before // we assume the request got lost. Timeout time.Duration // Publishing is the publishing that is going to be published. Publishing amqp.Publishing // Context is a context which you can use to pass data from where the // request is created to middlewares. By default this will be a // context.Background() Context context.Context //nolint:containedctx // Needed in the struct. // contains filtered or unexported fields }
Request is a request to perform with the client.
func NewRequest ¶
func NewRequest() *Request
NewRequest will generate a new request to be published. The default request will use the content type "text/plain" and always wait for reply.
func (*Request) AddMiddleware ¶
func (r *Request) AddMiddleware(m ClientMiddlewareFunc) *Request
AddMiddleware will add a middleware which will be executed when the request is published.
func (*Request) WithBody ¶
WithBody will convert a string to a byte slice and add as the body passed for the request.
func (*Request) WithContentType ¶
WithContentType will update the content type passed in the header of the request. This value will be set as the ContentType in the amqp.Publishing type but also preserved as a header value.
func (*Request) WithContext ¶
WithContext will set the context on the request.
func (*Request) WithCorrelationID ¶
WithCorrelationID will add/overwrite the correlation ID used for the request and set it on the Publishing. This string must be unique for each request.
func (*Request) WithExchange ¶
WithExchange will set the exchange on to which the request will be published.
func (*Request) WithHeaders ¶
WithHeaders will set the full amqp.Table as the headers for the request. Note that this will overwrite anything previously set on the headers.
func (*Request) WithMandatory ¶
WithMandatory will set the mandatory flag on the request. When this is true, the request must be routable, e.g. a queue must be bound in such a way that RabbitMQ can route the message to that queue.
func (*Request) WithResponse ¶
WithResponse sets the value determining whether the request should wait for a response or not. A request that does not require a response will only catch errors occurring before the request has been published.
func (*Request) WithRoutingKey ¶
WithRoutingKey will set the routing key for the request.
func (*Request) WithTimeout ¶
WithTimeout will set the client timeout used when publishing messages. t will be rounded using the duration's Round function to the nearest multiple of a millisecond. Rounding will be away from zero.
func (*Request) Write ¶
Write will write the response Body of the amqp.Publishing. It is safe to call Write multiple times.
func (*Request) WriteHeader ¶
WriteHeader will write a header for the specified key.
type RequestMap ¶
type RequestMap struct {
// contains filtered or unexported fields
}
RequestMap keeps track of requests based on their DeliveryTag and/or CorrelationID.
func (*RequestMap) GetByCorrelationID ¶
func (m *RequestMap) GetByCorrelationID(key string) (*Request, bool)
GetByCorrelationID returns the request with the provided correlation id.
func (*RequestMap) GetByDeliveryTag ¶
func (m *RequestMap) GetByDeliveryTag(key uint64) (*Request, bool)
GetByDeliveryTag returns the request with the provided delivery tag.
func (*RequestMap) Set ¶
func (m *RequestMap) Set(r *Request)
Set will add r to m so it can be fetched later using its correlation id or delivery tag.
type ResponseWriter ¶
type ResponseWriter struct {
Publishing *amqp.Publishing
}
ResponseWriter is used by a handler to construct an RPC response. The ResponseWriter may NOT be used after the handler has returned.
Because the ResponseWriter implements io.Writer you can for example use it to write json:
encoder := json.NewEncoder(responseWriter) encoder.Encode(dataObject)
func (*ResponseWriter) Write ¶
func (rw *ResponseWriter) Write(p []byte) (int, error)
Write will write the response Body of the amqp.Publishing. It is safe to call Write multiple times.
func (*ResponseWriter) WriteHeader ¶
func (rw *ResponseWriter) WriteHeader(header string, value interface{})
WriteHeader will write a header for the specified key.
type SendFunc ¶
SendFunc represents the function that Send does. It takes a Request as input and returns a delivery and an error.
func ClientMiddlewareChain ¶
func ClientMiddlewareChain(next SendFunc, m ...ClientMiddlewareFunc) SendFunc
ClientMiddlewareChain will attach all given middlewares to your SendFunc. The middlewares will be executed in the same order as your input.
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server represents an AMQP server used within the RPC framework. The server uses bindings to keep a list of handler functions.
func (*Server) AddMiddleware ¶
func (s *Server) AddMiddleware(m ServerMiddlewareFunc) *Server
AddMiddleware will add a ServerMiddleware to the list of middlewares to be triggered before the handle func for each request.
func (*Server) Bind ¶
func (s *Server) Bind(binding HandlerBinding)
Bind will add a HandlerBinding to the list of servers to serve.
func (*Server) ListenAndServe ¶
func (s *Server) ListenAndServe()
ListenAndServe will dial the RabbitMQ message bus, set up all the channels, consume from all RPC server queues and monitor the connection to ensure the server is always connected.
func (*Server) OnStarted ¶
func (s *Server) OnStarted(f OnStartedFunc)
OnStarted can be used to hook into the connections/channels that the server is using. This can be useful if you want more control over amqp directly. The OnStartedFunc will be executed after ListenAndServe is called. Note that this function is blocking and the server won't continue its startup until it has finished executing.
server := NewServer(url) server.OnStarted(func(inConn, outConn *amqp.Connection, inChan, outChan *amqp.Channel) { // Do something with amqp connections/channels. })
func (*Server) Restart ¶
func (s *Server) Restart()
Restart will gracefully disconnect from AMQP exactly like `Stop` but instead of returning from `ListenAndServe` it will set everything up again from scratch and start listening again. This can be useful if a server restart is wanted without running `ListenAndServe` in a loop.
func (*Server) Stop ¶
func (s *Server) Stop()
Stop will gracefully disconnect from AMQP after draining first incoming then outgoing messages. This method won't wait for server shutdown to complete, you should instead wait for ListenAndServe to exit.
func (*Server) WithDialConfig ¶
WithDialConfig sets the dial config used for the server.
func (*Server) WithDialTimeout ¶
WithDialTimeout sets the DialTimeout and handshake deadline to timeout.
func (*Server) WithExchanges ¶
func (s *Server) WithExchanges(exchanges ...ExchangeDeclareSettings) *Server
WithExchanges adds exchanges to the list of exchanges that should be declared on startup.
func (*Server) WithLogger ¶
WithLogger sets the logger to use for error and debug logging. By default the library will log errors using the logger from slog.Default. Some logs will contain data contained in an amqp.Delivery or amqp.Publishing, including any headers. If you want to avoid logging some of the fields you can use an slog.Handler to filter out the fields you don't want to log.
func (*Server) WithName ¶
WithName sets the connection name prefix for the server. Every connection the server uses will have this prefix in its connection name. The connection name is `myprefix.publisher` and `myprefix.consumer` for the publisher and consumer connection respectively. This can be overridden by using the `WithDialConfig` method to set the connection_name property.
func (*Server) WithRestartChan ¶
WithRestartChan will add a channel to the server that will trigger a restart when it's triggered.
type ServerMiddlewareFunc ¶
type ServerMiddlewareFunc func(next HandlerFunc) HandlerFunc
ServerMiddlewareFunc represents a function that can be used as a middleware.
For example:
func myMiddle(next HandlerFunc) HandlerFunc { // Preinitialization of middleware here. return func(ctx context.Context, rw *ResponseWriter, d amqp.Delivery) { // Before handler execution here. // Execute the handler. next(ctx, rw, d) // After execution here. } } s := New("url") // Add middleware to specific handler. s.Bind(DirectBinding("foobar", myMiddle(HandlerFunc))) // Add middleware to all handlers on the server. s.AddMiddleware(myMiddle)