Documentation ¶
Overview ¶
Package harego contains the logic for communicating with RabbitMQ.
Client ¶
A Client wraps an exchange and a queue in a concurrency-safe manner for managing all communications with RabbitMQ. If the Client doesn't have a queue name set, it will not create a queue. The zero value is not usable, so a Client should be constructed with the NewClient function.
The only requirement for a Client to operate is a connector that returns a connection to the broker. There is also a helper function that can create a new connection from the address of the broker. The Client creates one worker by default for publishing and consuming messages. The default exchange of the Client is "default" and it is set with the "topic" type. The default delivery method is persistent. You can use the provided ConfigFunc functions to change the Client's behaviour.
You should call the Close() method when you are done with this object, otherwise it leaks goroutines.
NewClient ¶
NewClient returns a Client instance. You can configure the object by providing ConfigFunc functions. See the ConfigFunc documentation for more information.
client, err := harego.NewClient(harego.URLConnector("amqp://"),
	harego.Workers(6),
)
// handle error
Publish ¶
Queue names are not needed for publishing messages. If the client is set up with multiple workers, each Publish call will go through the next available worker.
Consume ¶
To use the Consume method, you need to provide a queue name. The Client will bind the queue to the exchange, then Consume calls the handler with the next available message on the next available worker. It stops handling messages when the context is done or the client is closed. Consume internally creates another worker for requeueing messages. By default, messages are consumed with autoAck set to false.
Make sure you have enough workers so that Consume is not blocked by handler delays.
Index ¶
- Constants
- Variables
- func AutoDelete(c *Client) error
- func ExclusiveQueue(c *Client) error
- func Internal(c *Client) error
- func NoWait(c *Client) error
- func NotDurable(c *Client) error
- type AckType
- type Channel
- type Client
- type ConfigFunc
- func ConsumerName(name string) ConfigFunc
- func ExchangeName(name string) ConfigFunc
- func PrefetchCount(i int) ConfigFunc
- func PrefetchSize(i int) ConfigFunc
- func QueueArgs(args amqp.Table) ConfigFunc
- func QueueName(name string) ConfigFunc
- func RoutingKey(key string) ConfigFunc
- func WithDeliveryMode(mode DeliveryMode) ConfigFunc
- func WithExchangeType(t ExchangeType) ConfigFunc
- func Workers(n int) ConfigFunc
- type Connector
- type DeliveryMode
- type ExchangeType
- type HandlerFunc
- type RabbitMQ
Examples ¶
Constants ¶
const (
	// DeliveryModeTransient means higher throughput but messages will not be
	// restored on broker restart. The delivery mode of publishings is
	// unrelated to the durability of the queues they reside on. Transient
	// messages will not be restored to durable queues.
	DeliveryModeTransient = DeliveryMode(amqp.Transient)

	// DeliveryModePersistent messages will be restored to durable queues and
	// lost on non-durable queues during server restart.
	DeliveryModePersistent = DeliveryMode(amqp.Persistent)
)
Variables ¶
var (
	// ErrInput is returned when an input is invalid.
	ErrInput = errors.New("invalid input")

	// ErrNilHnadler is returned when the handler is nil.
	ErrNilHnadler = errors.New("handler can not be nil")

	// ErrClosed is returned when the Client is closed and is being reused.
	ErrClosed = errors.New("exchange is already closed")

	// ErrAlreadyConfigured is returned when an already configured client is
	// about to receive new configuration.
	ErrAlreadyConfigured = errors.New("client is already configured")
)
Functions ¶
func AutoDelete ¶
AutoDelete marks the exchange and queues with the autoDelete property, which causes the messages to be automatically removed from the queue when consumed.
func ExclusiveQueue ¶ added in v0.3.0
ExclusiveQueue marks the queue as exclusive. Exclusive queues are only accessible by the connection that declares them and will be deleted when the connection closes. Channels on other connections will receive an error when attempting to declare, bind, consume, purge or delete a queue with the same name.
func NoWait ¶
NoWait marks the exchange as noWait. When noWait is true, the exchange is declared without waiting for a confirmation from the server. The channel may be closed as a result of an error.
func NotDurable ¶
NotDurable marks the exchange and the queue as not durable. The default is durable.
Types ¶
type AckType ¶
type AckType int
AckType specifies how the message is acknowledged to RabbitMQ.
const (
	// AckTypeAck causes the message to be removed in broker. The multiple value
	// is false, causing the broker to act on one message.
	AckTypeAck AckType = iota

	// AckTypeNack causes the message to be requeued in broker. The multiple
	// value is false, causing the broker to act on one message.
	AckTypeNack

	// AckTypeReject causes the message to be dropped in broker. AckTypeNack
	// must not be used to select or requeue messages the client wishes not to
	// handle, rather it is to inform the server that the client is incapable
	// of handling this message at this time.
	AckTypeReject

	// AckTypeRequeue causes the message to be requeued back to the end of the
	// queue.
	AckTypeRequeue
)
type Channel ¶
type Channel interface {
	// ExchangeDeclare declares an exchange on the server. If the exchange does
	// not already exist, the server will create it. If the exchange exists,
	// the server verifies that it is of the provided type, durability and
	// auto-delete flags.
	ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error

	// Publish sends a Publishing from the client to an exchange on the server.
	Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error

	// QueueDeclare declares a queue to hold messages and deliver to consumers.
	// Declaring creates a queue if it doesn't already exist, or ensures that
	// an existing queue matches the same parameters.
	QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)

	// QueueBind binds an exchange to a queue so that publishings to the
	// exchange will be routed to the queue when the publishing routing key
	// matches the binding routing key.
	QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error

	// Consume immediately starts delivering queued messages.
	Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error)

	// Qos controls how many messages or how many bytes the server will try to
	// keep on the network for consumers before receiving delivery acks. The
	// intent of Qos is to make sure the network buffers stay full between the
	// server and client.
	Qos(prefetchCount, prefetchSize int, global bool) error

	// Close initiates a clean channel closure by sending a close message with
	// the error code set to '200'.
	Close() error

	// NotifyClose registers a listener for when the server sends a channel or
	// connection exception in the form of a Connection.Close or Channel.Close
	// method. Connection exceptions will be broadcast to all open channels and
	// all channels will be closed, where channel exceptions will only be
	// broadcast to listeners to this channel.
	NotifyClose(receiver chan *amqp.Error) chan *amqp.Error
}
A Channel can operate queues. This is a subset of the amqp.Channel API.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is a concurrency-safe construct for publishing messages to exchanges and consuming messages from queues. It creates multiple workers for safe communication. The zero value is not usable.
func NewClient ¶
func NewClient(connector Connector, conf ...ConfigFunc) (*Client, error)
NewClient returns a Client capable of publishing and consuming messages. The default Client config uses the "default" exchange of the "topic" type, both exchange and queues will be marked as "durable", messages will be persistent, and the consumer gets a random name. The connector value should provide a live connection. The connector value is also used during the reconnection process.
func (*Client) Consume ¶
func (c *Client) Consume(ctx context.Context, handler HandlerFunc) error
Consume is a blocking call that passes each message to the handler and stops handling messages when the context is done. The AckType returned by the handler decides whether the message is acknowledged, rejected or requeued. If the context is cancelled, the Client remains operational but no messages will be delivered to this handler. Consume returns an error if you don't specify a queue name.
type ConfigFunc ¶
ConfigFunc is a function for setting up the Client. A config function returns an error if the client is already started. You should not use this type outside of the NewClient function call.
func ConsumerName ¶
func ConsumerName(name string) ConfigFunc
ConsumerName sets the consumer name of the consuming queue.
func ExchangeName ¶
func ExchangeName(name string) ConfigFunc
ExchangeName sets the exchange name. For each worker, an additional string will be appended for the worker number.
func PrefetchCount ¶
func PrefetchCount(i int) ConfigFunc
PrefetchCount sets how many items should be prefetched for consumption. With a prefetch count greater than zero, the server will deliver that many messages to consumers before acknowledgments are received. The server ignores this option when consumers are started with noAck because no acknowledgments are expected or sent.
func PrefetchSize ¶
func PrefetchSize(i int) ConfigFunc
PrefetchSize sets the prefetch size of the Qos. If it is greater than zero, the server will try to keep at least that many bytes of deliveries flushed to the network before receiving acknowledgments from the consumers.
func QueueArgs ¶ added in v0.3.1
func QueueArgs(args amqp.Table) ConfigFunc
QueueArgs sets the args passed to the QueueDeclare method.
func RoutingKey ¶
func RoutingKey(key string) ConfigFunc
RoutingKey sets the routing key of the queue.
func WithDeliveryMode ¶
func WithDeliveryMode(mode DeliveryMode) ConfigFunc
WithDeliveryMode sets the default delivery mode of messages.
func WithExchangeType ¶
func WithExchangeType(t ExchangeType) ConfigFunc
WithExchangeType sets the exchange type. The default is ExchangeTypeTopic.
type Connector ¶ added in v0.3.0
A Connector should return a live connection. It will be called during the Client initialisation and during the reconnection process.
func AMQPConnector ¶ added in v0.3.0
func AMQPConnector(r *amqp.Connection) Connector
AMQPConnector uses r every time the Client needs a new connection. You should make sure r stays alive.
func URLConnector ¶ added in v0.3.0
URLConnector creates a new connection from url.
type DeliveryMode ¶
type DeliveryMode uint8
DeliveryMode is the delivery mode of an amqp.Publishing message.
func (DeliveryMode) IsValid ¶
func (d DeliveryMode) IsValid() bool
IsValid returns true if the object is within the valid boundaries.
func (DeliveryMode) String ¶
func (i DeliveryMode) String() string
type ExchangeType ¶
type ExchangeType int
ExchangeType is the kind of exchange.
const (
	// ExchangeTypeDirect defines a direct exchange.
	ExchangeTypeDirect ExchangeType = iota

	// ExchangeTypeFanout defines a fanout exchange.
	ExchangeTypeFanout

	// ExchangeTypeTopic defines a topic exchange.
	ExchangeTypeTopic

	// ExchangeTypeHeaders defines a headers exchange.
	ExchangeTypeHeaders
)
func (ExchangeType) IsValid ¶
func (e ExchangeType) IsValid() bool
IsValid returns true if the object is within the valid boundaries.
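For an iota-based type like this one, a boundary check typically compares the value against the first and last declared constants. The sketch below shows that idea with a local stand-in type (exchangeType and isValid are hypothetical names); it is an assumption about how such a check can be written, not the package's actual code.

```go
package main

import "fmt"

// exchangeType is a local stand-in that mirrors the order of the
// ExchangeType constants listed above.
type exchangeType int

const (
	direct exchangeType = iota
	fanout
	topic
	headers
)

// isValid reports whether e lies between the first and the last declared
// constant, inclusive.
func (e exchangeType) isValid() bool {
	return e >= direct && e <= headers
}

func main() {
	fmt.Println(topic.isValid())            // a declared constant
	fmt.Println(exchangeType(42).isValid()) // outside the declared range
}
```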
func (ExchangeType) String ¶
func (e ExchangeType) String() string
type HandlerFunc ¶
A HandlerFunc receives a message when it is available. The returned AckType dictates how to deal with the message. The delay can be 0 or any duration. The consumer will sleep this amount before sending Ack. If the user needs to requeue the message, they may mutate the message if required. Mutate the msg at your own peril.
Example (Ack) ¶
ExampleHandlerFunc_ack instructs the consumer to drop the message immediately.
package main

import (
	"fmt"
	"time"

	"github.com/blokur/harego"
	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	var fn harego.HandlerFunc = func(msg *amqp.Delivery) (harego.AckType, time.Duration) {
		// logic for working with msg.Body goes here.
		return harego.AckTypeAck, 0
	}
	got, delay := fn(&amqp.Delivery{})
	fmt.Printf("Got %s and will delay for %s", got, delay)
}
Output: Got AckTypeAck and will delay for 0s
Example (Reject) ¶
ExampleHandlerFunc_reject instructs the consumer to reject the message after 100ms. This will cause the consumer to sleep, therefore you need to make sure there are enough workers to respond to other tasks.
package main

import (
	"fmt"
	"time"

	"github.com/blokur/harego"
	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	var fn harego.HandlerFunc = func(*amqp.Delivery) (harego.AckType, time.Duration) {
		// logic for working with msg.Body goes here.
		return harego.AckTypeReject, 100 * time.Millisecond
	}
	got, delay := fn(&amqp.Delivery{})
	fmt.Printf("Got %s and will delay for %s", got, delay)
}
Output: Got AckTypeReject and will delay for 100ms
Example (Requeue) ¶
ExampleHandlerFunc_requeue instructs the consumer to put the message at the end of the queue after 1 second.
package main

import (
	"fmt"
	"time"

	"github.com/blokur/harego"
	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	var fn harego.HandlerFunc = func(*amqp.Delivery) (harego.AckType, time.Duration) {
		// logic for working with msg.Body goes here.
		return harego.AckTypeRequeue, time.Second
	}
	got, delay := fn(&amqp.Delivery{})
	fmt.Printf("Got %s and will delay for %s", got, delay)
}
Output: Got AckTypeRequeue and will delay for 1s