Documentation
¶
Index ¶
- Constants
- Variables
- type AMQP
- type BindArgs
- type ConsumerMessage
- type DeclareArgs
- type Delivery
- type Emitter
- type ListenConfig
- type Message
- type OnStateChangeFunc
- type Option
- func AMQPProvider(provider AMQP) Option
- func Attempts(attempts int) Option
- func BreakerInterval(interval time.Duration) Option
- func BreakerTimeout(timeout time.Duration) Option
- func Durable(durable bool) Option
- func OnStateChange(fn OnStateChangeFunc) Option
- func PassiveExchange(isExchangePassive bool) Option
- func PrefetchCount(count int) Option
- func PrefetchSize(size int) Option
- func QosGlobal(global bool) Option
- func Sleep(sleep time.Duration) Option
- func Threshold(threshold uint32) Option
- type Rabbus
Constants ¶
const ( // Transient means higher throughput but messages will not be restored on broker restart. Transient uint8 = 1 // Persistent messages will be restored to durable queues and lost on non-durable queues during server restart. Persistent uint8 = 2 // ContentTypeJSON define json content type. ContentTypeJSON = "application/json" // ContentTypePlain define plain text content type. ContentTypePlain = "plain/text" // ExchangeDirect indicates the exchange is of direct type. ExchangeDirect = "direct" // ExchangeFanout indicates the exchange is of fanout type. ExchangeFanout = "fanout" // ExchangeTopic indicates the exchange is of topic type. ExchangeTopic = "topic" )
Variables ¶
var ( // ErrMissingExchange is returned when exchange name is not passed as parameter. ErrMissingExchange = errors.New("Missing field exchange") // ErrMissingKind is returned when exchange type is not passed as parameter. ErrMissingKind = errors.New("Missing field kind") // ErrMissingQueue is returned when queue name is not passed as parameter. ErrMissingQueue = errors.New("Missing field queue") // ErrMissingHandler is returned when function handler is not passed as parameter. ErrMissingHandler = errors.New("Missing field handler") // ErrUnsupportedArguments is returned when more than the permitted arguments is passed to a function. ErrUnsupportedArguments = errors.New("Unsupported arguments size") )
Functions ¶
This section is empty.
Types ¶
type AMQP ¶
type AMQP interface { // Publish wraps amqp.Publish method Publish(exchange, key string, opts amqp.Publishing) error // CreateConsumer creates a amqp consumer CreateConsumer(exchange, key, kind, queue string, durable bool, declareArgs, bindArgs amqp.Table) (<-chan amqp.Delivery, error) // WithExchange creates a amqp exchange WithExchange(exchange, kind string, durable bool) error // WithQos wrapper over amqp.Qos method WithQos(count, size int, global bool) error // NotifyClose wrapper over notifyClose method NotifyClose(c chan *amqp.Error) chan *amqp.Error // Close closes the running amqp connection and channel Close() error }
AMQP exposes a interface for interacting with AMQP broker
type BindArgs ¶
type BindArgs struct {
// contains filtered or unexported fields
}
BindArgs is the wrapper for AMQP Table class to set common queue bind values
type ConsumerMessage ¶
type ConsumerMessage struct { ContentType string ContentEncoding string // DeliveryMode queue implementation use, non-persistent (1) or persistent (2) DeliveryMode uint8 // Priority queue implementation use, 0 to 9 Priority uint8 // CorrelationId application use, correlation identifier CorrelationId string // ReplyTo application use, address to to reply to (ex: RPC) ReplyTo string // Expiration implementation use, message expiration spec Expiration string // MessageId application use, message identifier MessageId string // Timestamp application use, message timestamp Timestamp time.Time // Type application use, message type name Type string // ConsumerTag valid only with Channel.Consume ConsumerTag string // MessageCount valid only with Channel.Get MessageCount uint32 DeliveryTag uint64 Redelivered bool Exchange string // Headers application or header exchange table Headers map[string]interface{} // Key basic.publish routing key Key string Body []byte // contains filtered or unexported fields }
ConsumerMessage captures the fields for a previously delivered message resident in a queue to be delivered by the server to a consumer.
func (ConsumerMessage) Ack ¶
func (cm ConsumerMessage) Ack(multiple bool) error
Ack delegates an acknowledgement through the Acknowledger interface that the client or server has finished work on a delivery. All deliveries in AMQP must be acknowledged. If you called Channel.Consume with autoAck true then the server will be automatically ack each message and this method should not be called. Otherwise, you must call Delivery.Ack after you have successfully processed this delivery. When multiple is true, this delivery and all prior unacknowledged deliveries on the same channel will be acknowledged. This is useful for batch processing of deliveries. An error will indicate that the acknowledge could not be delivered to the channel it was sent from. Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every delivery that is not automatically acknowledged.
func (ConsumerMessage) Nack ¶
func (cm ConsumerMessage) Nack(multiple, requeue bool) error
Nack negatively acknowledge the delivery of message(s) identified by the delivery tag from either the client or server. When multiple is true, nack messages up to and including delivered messages up until the delivery tag delivered on the same channel. When requeue is true, request the server to deliver this message to a different consumer. If it is not possible or requeue is false, the message will be dropped or delivered to a server configured dead-letter queue. This method must not be used to select or requeue messages the client wishes not to handle, rather it is to inform the server that the client is incapable of handling this message at this time. Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every delivery that is not automatically acknowledged.
func (ConsumerMessage) Reject ¶
func (cm ConsumerMessage) Reject(requeue bool) error
Reject delegates a negatively acknowledgement through the Acknowledger interface. When requeue is true, queue this message to be delivered to a consumer on a different channel. When requeue is false or the server is unable to queue this message, it will be dropped. If you are batch processing deliveries, and your server supports it, prefer Delivery.Nack. Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every delivery that is not automatically acknowledged.
type DeclareArgs ¶
type DeclareArgs struct {
// contains filtered or unexported fields
}
DeclareArgs is the queue declaration values builder
func NewDeclareArgs ¶
func NewDeclareArgs() *DeclareArgs
NewDeclareArgs creates new queue declaration values builder
func (*DeclareArgs) With ¶
func (a *DeclareArgs) With(name string, value interface{}) *DeclareArgs
With sets the value by name
func (*DeclareArgs) WithMessageTTL ¶
func (a *DeclareArgs) WithMessageTTL(d time.Duration) *DeclareArgs
WithMessageTTL sets Queue message TTL. See details at https://www.rabbitmq.com/ttl.html#message-ttl-using-x-args
type Emitter ¶
type Emitter interface { // EmitAsync emits a message to RabbitMQ, but does not wait for the response from broker. EmitAsync() chan<- Message // EmitErr returns an error if encoding payload fails, or if after circuit breaker is open or retries attempts exceed. EmitErr() <-chan error // EmitOk returns true when the message was sent. EmitOk() <-chan struct{} }
Emitter exposes a interface for publishing messages to AMQP broker
type ListenConfig ¶
type ListenConfig struct { // Exchange the exchange name. Exchange string // Kind the exchange type. Kind string // Key the routing key name. Key string // PassiveExchange determines a passive exchange connection it uses // amqp's ExchangeDeclarePassive instead the default ExchangeDeclare PassiveExchange bool // Queue the queue name Queue string // DeclareArgs is a list of arguments accepted for when declaring the queue. // See https://www.rabbitmq.com/queues.html#optional-arguments for more info. DeclareArgs *DeclareArgs // BindArgs is a list of arguments accepted for when binding the exchange to the queue BindArgs *BindArgs }
ListenConfig carries fields for listening messages.
type Message ¶
type Message struct { // Exchange the exchange name. Exchange string // Kind the exchange type. Kind string // Key the routing key name. Key string // Payload the message payload. Payload []byte // DeliveryMode indicates if the is Persistent or Transient. DeliveryMode uint8 // ContentType the message content-type. ContentType string // Headers the message application headers Headers map[string]interface{} // ContentEncoding the message encoding. ContentEncoding string }
Message carries fields for sending messages.
type OnStateChangeFunc ¶
type OnStateChangeFunc func(name, from, to string)
OnStateChangeFunc is the callback function when circuit breaker state changes.
type Option ¶
Option represents an option you can pass to New. See the documentation for the individual options.
func AMQPProvider ¶
AMQPProvider expose a interface for interacting with amqp broker
func BreakerInterval ¶
BreakerInterval is the cyclic period of the closed state for CircuitBreaker to clear the internal counts, If Interval is 0, CircuitBreaker doesn't clear the internal counts during the closed state.
func BreakerTimeout ¶
BreakerTimeout is the period of the open state, after which the state of CircuitBreaker becomes half-open. If Timeout is 0, the timeout value of CircuitBreaker is set to 60 seconds.
func OnStateChange ¶
func OnStateChange(fn OnStateChangeFunc) Option
OnStateChange is called whenever the state of CircuitBreaker changes.
func PassiveExchange ¶
PassiveExchange forces passive connection with all exchanges using amqp's ExchangeDeclarePassive instead the default ExchangeDeclare
func PrefetchCount ¶
PrefetchCount limit the number of unacknowledged messages.
func PrefetchSize ¶
PrefetchSize when greater than zero, the server will try to keep at least that many bytes of deliveries flushed to the network before receiving acknowledgments from the consumers.
func QosGlobal ¶
QosGlobal when global is true, these Qos settings apply to all existing and future consumers on all channels on the same connection. When false, the Channel.Qos settings will apply to all existing and future consumers on this channel. RabbitMQ does not implement the global flag.
type Rabbus ¶
type Rabbus struct { AMQP // contains filtered or unexported fields }
Rabbus interpret (implement) Rabbus interface definition
func New ¶
New returns a new Rabbus configured with the variables from the config parameter, or returning an non-nil err if an error occurred while creating connection and channel.
func (*Rabbus) EmitAsync ¶
EmitAsync emits a message to RabbitMQ, but does not wait for the response from broker.
func (*Rabbus) EmitErr ¶
EmitErr returns an error if encoding payload fails, or if after circuit breaker is open or retries attempts exceed.
func (*Rabbus) EmitOk ¶
func (r *Rabbus) EmitOk() <-chan struct{}
EmitOk returns true when the message was sent.
func (*Rabbus) Listen ¶
func (r *Rabbus) Listen(c ListenConfig) (chan ConsumerMessage, error)
Listen to a message from RabbitMQ, returns an error if exchange, queue name and function handler not passed or if an error occurred while creating amqp consumer.