Documentation ¶
Overview ¶
Package amqp implements an AMQP transport.
Index ¶
- Constants
- func DefaultErrorEncoder(ctx context.Context, err error, deliv *amqp.Delivery, ch Channel, ...)
- func EncodeJSONResponse(ctx context.Context, pub *amqp.Publishing, response interface{}) error
- func EncodeNopResponse(ctx context.Context, pub *amqp.Publishing, response interface{}) error
- func ReplyAndAckErrorEncoder(ctx context.Context, err error, deliv *amqp.Delivery, ch Channel, ...)
- func ReplyErrorEncoder(ctx context.Context, err error, deliv *amqp.Delivery, ch Channel, ...)
- func SingleNackRequeueErrorEncoder(ctx context.Context, err error, deliv *amqp.Delivery, ch Channel, ...)
- type Channel
- type DecodeRequestFunc
- type DecodeResponseFunc
- type DefaultErrorResponse
- type EncodeRequestFunc
- type EncodeResponseFunc
- type ErrorEncoder
- type Publisher
- type PublisherOption
- type PublisherResponseFunc
- type RequestFunc
- func SetConsumeArgs(args amqp.Table) RequestFunc
- func SetConsumeAutoAck(autoAck bool) RequestFunc
- func SetContentEncoding(contentEncoding string) RequestFunc
- func SetContentType(contentType string) RequestFunc
- func SetCorrelationID(cid string) RequestFunc
- func SetNackSleepDuration(duration time.Duration) RequestFunc
- func SetPublishDeliveryMode(dmode uint8) RequestFunc
- func SetPublishExchange(publishExchange string) RequestFunc
- func SetPublishKey(publishKey string) RequestFunc
- type Subscriber
- type SubscriberFinalizerFunc
- type SubscriberOption
- func ServerFinalizer(f ...SubscriberFinalizerFunc) SubscriberOption
- func SubscriberAfter(after ...SubscriberResponseFunc) SubscriberOption
- func SubscriberBefore(before ...RequestFunc) SubscriberOption
- func SubscriberErrorEncoder(ee ErrorEncoder) SubscriberOption
- func SubscriberErrorLogger(logger log.Logger) SubscriberOption
- type SubscriberResponseFunc
Constants ¶
const ( // ContextKeyExchange is the value of the reply Exchange in // amqp.Publish. ContextKeyExchange contextKey = iota // ContextKeyPublishKey is the value of the ReplyTo field in // amqp.Publish. ContextKeyPublishKey // ContextKeyNackSleepDuration is the duration to sleep for if the // service Nack and requeues a message. // This is to prevent sporadic send-resending of message // when a message is constantly Nack'd and requeued. ContextKeyNackSleepDuration // ContextKeyAutoAck is the value of autoAck field when calling // amqp.Channel.Consume. ContextKeyAutoAck // ContextKeyConsumeArgs is the value of consumeArgs field when calling // amqp.Channel.Consume. ContextKeyConsumeArgs )
Variables ¶
This section is empty.
Functions ¶
func DefaultErrorEncoder ¶
func DefaultErrorEncoder(ctx context.Context, err error, deliv *amqp.Delivery, ch Channel, pub *amqp.Publishing)
DefaultErrorEncoder simply ignores the message. It does not reply nor Ack/Nack the message.
func EncodeJSONResponse ¶
func EncodeJSONResponse( ctx context.Context, pub *amqp.Publishing, response interface{}, ) error
EncodeJSONResponse marshals the response as JSON as part of the payload of the AMQP Publishing object.
func EncodeNopResponse ¶
func EncodeNopResponse( ctx context.Context, pub *amqp.Publishing, response interface{}, ) error
EncodeNopResponse is a response function that does nothing.
func ReplyAndAckErrorEncoder ¶
func ReplyAndAckErrorEncoder(ctx context.Context, err error, deliv *amqp.Delivery, ch Channel, pub *amqp.Publishing)
ReplyAndAckErrorEncoder serializes the error message as a DefaultErrorResponse JSON and sends the message to the ReplyTo address then Acks the original message.
func ReplyErrorEncoder ¶
func ReplyErrorEncoder( ctx context.Context, err error, deliv *amqp.Delivery, ch Channel, pub *amqp.Publishing, )
ReplyErrorEncoder serializes the error message as a DefaultErrorResponse JSON and sends the message to the ReplyTo address.
func SingleNackRequeueErrorEncoder ¶
func SingleNackRequeueErrorEncoder(ctx context.Context, err error, deliv *amqp.Delivery, ch Channel, pub *amqp.Publishing)
SingleNackRequeueErrorEncoder issues a Nack to the delivery with multiple flag set as false and requeue flag set as true. It does not reply the message.
Types ¶
type Channel ¶
type Channel interface { Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error Consume(queue, consumer string, autoAck, exclusive, noLocal, noWail bool, args amqp.Table) (<-chan amqp.Delivery, error) }
Channel is a channel interface to make testing possible. It is highly recommended to use *amqp.Channel as the interface implementation.
type DecodeRequestFunc ¶
DecodeRequestFunc extracts a user-domain request object from an AMQP Delivery object. It is designed to be used in AMQP Subscribers.
type DecodeResponseFunc ¶
DecodeResponseFunc extracts a user-domain response object from an AMQP Delivery object. It is designed to be used in AMQP Publishers.
type DefaultErrorResponse ¶
type DefaultErrorResponse struct {
Error string `json:"err"`
}
DefaultErrorResponse is the default structure of responses in the event of an error.
type EncodeRequestFunc ¶
type EncodeRequestFunc func(context.Context, *amqp.Publishing, interface{}) error
EncodeRequestFunc encodes the passed request object into an AMQP Publishing object. It is designed to be used in AMQP Publishers.
type EncodeResponseFunc ¶
type EncodeResponseFunc func(context.Context, *amqp.Publishing, interface{}) error
EncodeResponseFunc encodes the passed reponse object to an AMQP Publishing object. It is designed to be used in AMQP Subscribers.
type ErrorEncoder ¶
type ErrorEncoder func(ctx context.Context, err error, deliv *amqp.Delivery, ch Channel, pub *amqp.Publishing)
ErrorEncoder is responsible for encoding an error to the subscriber reply. Users are encouraged to use custom ErrorEncoders to encode errors to their replies, and will likely want to pass and check for their own error types.
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
Publisher wraps an AMQP channel and queue, and provides a method that implements endpoint.Endpoint.
func NewPublisher ¶
func NewPublisher( ch Channel, q *amqp.Queue, enc EncodeRequestFunc, dec DecodeResponseFunc, options ...PublisherOption, ) *Publisher
NewPublisher constructs a usable Publisher for a single remote method.
type PublisherOption ¶
type PublisherOption func(*Publisher)
PublisherOption sets an optional parameter for clients.
func PublisherAfter ¶
func PublisherAfter(after ...PublisherResponseFunc) PublisherOption
PublisherAfter sets the ClientResponseFuncs applied to the incoming AMQP request prior to it being decoded. This is useful for obtaining anything off of the response and adding onto the context prior to decoding.
func PublisherBefore ¶
func PublisherBefore(before ...RequestFunc) PublisherOption
PublisherBefore sets the RequestFuncs that are applied to the outgoing AMQP request before it's invoked.
func PublisherTimeout ¶
func PublisherTimeout(timeout time.Duration) PublisherOption
PublisherTimeout sets the available timeout for an AMQP request.
type PublisherResponseFunc ¶
PublisherResponseFunc may take information from an AMQP request and make the response available for consumption. PublisherResponseFunc are only executed in publishers, after a request has been made, but prior to it being decoded.
type RequestFunc ¶
RequestFunc may take information from a publisher request and put it into a request context. In Subscribers, RequestFuncs are executed prior to invoking the endpoint.
func SetConsumeArgs ¶
func SetConsumeArgs(args amqp.Table) RequestFunc
SetConsumeArgs returns a RequestFunc that set the arguments for amqp Consume function. It is designed to be used by Publishers.
func SetConsumeAutoAck ¶
func SetConsumeAutoAck(autoAck bool) RequestFunc
SetConsumeAutoAck returns a RequestFunc that sets whether or not to autoAck messages when consuming. When set to false, the publisher will Ack the first message it receives with a matching correlationId. It is designed to be used by Publishers.
func SetContentEncoding ¶
func SetContentEncoding(contentEncoding string) RequestFunc
SetContentEncoding returns a RequestFunc that sets the ContentEncoding field of an AMQP Publishing.
func SetContentType ¶
func SetContentType(contentType string) RequestFunc
SetContentType returns a RequestFunc that sets the ContentType field of an AMQP Publishing.
func SetCorrelationID ¶
func SetCorrelationID(cid string) RequestFunc
SetCorrelationID returns a RequestFunc that sets the CorrelationId field of an AMQP Publishing.
func SetNackSleepDuration ¶
func SetNackSleepDuration(duration time.Duration) RequestFunc
SetNackSleepDuration returns a RequestFunc that sets the amount of time to sleep in the event of a Nack. This has to be used in conjunction with an error encoder that Nack and sleeps. One example is the SingleNackRequeueErrorEncoder. It is designed to be used by Subscribers.
func SetPublishDeliveryMode ¶
func SetPublishDeliveryMode(dmode uint8) RequestFunc
SetPublishDeliveryMode sets the delivery mode of a Publishing. Please refer to AMQP delivery mode constants in the AMQP package.
func SetPublishExchange ¶
func SetPublishExchange(publishExchange string) RequestFunc
SetPublishExchange returns a RequestFunc that sets the Exchange field of an AMQP Publish call.
func SetPublishKey ¶
func SetPublishKey(publishKey string) RequestFunc
SetPublishKey returns a RequestFunc that sets the Key field of an AMQP Publish call.
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
Subscriber wraps an endpoint and provides a handler for AMQP Delivery messages.
func NewSubscriber ¶
func NewSubscriber( e endpoint.Endpoint, dec DecodeRequestFunc, enc EncodeResponseFunc, options ...SubscriberOption, ) *Subscriber
NewSubscriber constructs a new subscriber, which provides a handler for AMQP Delivery messages.
func (Subscriber) ServeDelivery ¶
func (s Subscriber) ServeDelivery(ch Channel) func(deliv *amqp.Delivery)
ServeDelivery handles AMQP Delivery messages It is strongly recommended to use *amqp.Channel as the Channel interface implementation.
type SubscriberFinalizerFunc ¶ added in v0.8.1
ServerFinalizerFunc can be used to perform work at the end of an MQ request, after the response has been written to the client. The principal intended use is for request logging. In addition to the response code provided in the function signature, additional response parameters are provided in the context under keys with the ContextKeyResponse prefix.
type SubscriberOption ¶
type SubscriberOption func(*Subscriber)
SubscriberOption sets an optional parameter for subscribers.
func ServerFinalizer ¶ added in v0.8.1
func ServerFinalizer(f ...SubscriberFinalizerFunc) SubscriberOption
ServerFinalizer is executed at the end of every MQ request. By default, no finalizer is registered.
func SubscriberAfter ¶
func SubscriberAfter(after ...SubscriberResponseFunc) SubscriberOption
SubscriberAfter functions are executed on the subscriber reply after the endpoint is invoked, but before anything is published to the reply.
func SubscriberBefore ¶
func SubscriberBefore(before ...RequestFunc) SubscriberOption
SubscriberBefore functions are executed on the publisher delivery object before the request is decoded.
func SubscriberErrorEncoder ¶
func SubscriberErrorEncoder(ee ErrorEncoder) SubscriberOption
SubscriberErrorEncoder is used to encode errors to the subscriber reply whenever they're encountered in the processing of a request. Clients can use this to provide custom error formatting. By default, errors will be published with the DefaultErrorEncoder.
func SubscriberErrorLogger ¶
func SubscriberErrorLogger(logger log.Logger) SubscriberOption
SubscriberErrorLogger is used to log non-terminal errors. By default, no errors are logged. This is intended as a diagnostic measure. Finer-grained control of error handling, including logging in more detail, should be performed in a custom SubscriberErrorEncoder which has access to the context.
type SubscriberResponseFunc ¶
type SubscriberResponseFunc func(context.Context, *amqp.Delivery, Channel, *amqp.Publishing, ) context.Context
SubscriberResponseFunc may take information from a request context and use it to manipulate a Publisher. SubscriberResponseFuncs are only executed in subscribers, after invoking the endpoint but prior to publishing a reply.
func SetAckAfterEndpoint ¶
func SetAckAfterEndpoint(multiple bool) SubscriberResponseFunc
SetAckAfterEndpoint returns a SubscriberResponseFunc that prompts the service to Ack the Delivery object after successfully evaluating the endpoint, and before it encodes the response. It is designed to be used by Subscribers.