Documentation ¶
Overview ¶
Package cony is a high-level wrapper around the http://github.com/streadway/amqp library for working declaratively with AMQP. Cony manages connecting and reconnecting to the AMQP broker, along with recovery of consumers.
Example ¶
package main

import (
	"log"
	"os"

	"github.com/assembla/cony"
	"github.com/streadway/amqp"
)

func main() {
	client := cony.NewClient(cony.URL(os.Getenv("AMQP_URL")), cony.Backoff(cony.DefaultBackoff))

	q := &cony.Queue{
		Name:       "", // autogenerated queue name
		AutoDelete: true,
	}

	exchange := cony.Exchange{
		Name:    "amq.topic",
		Durable: true,
	}

	b := cony.Binding{
		Queue:    q,
		Exchange: exchange,
		Key:      "something.#",
	}

	// wrap all declarations and save into slice
	declarations := []cony.Declaration{
		cony.DeclareQueue(q),
		cony.DeclareExchange(exchange),
		cony.DeclareBinding(b),
	}

	// declare consumer
	consumer := cony.NewConsumer(q,
		cony.Qos(10),
		cony.AutoTag(),
		cony.AutoAck(),
	)

	// declare publisher
	publisher := cony.NewPublisher(exchange.Name,
		"ololo.key",
		cony.PublishingTemplate(amqp.Publishing{
			ContentType: "application/json",
			AppId:       "app1",
		}), // template amqp.Publishing
	)

	// let client know about declarations
	client.Declare(declarations)

	// let client know about consumers/publishers
	client.Consume(consumer)
	client.Publish(publisher)

	clientErrs := client.Errors()
	deliveries := consumer.Deliveries()
	consumerErrs := consumer.Errors()

	// connect, reconnect, or exit loop
	// run network operations such as:
	// queue, exchange, binding, consumers declarations
	for client.Loop() {
		select {
		case msg := <-deliveries:
			log.Println(msg)
			msg.Ack(false)
			publisher.Write([]byte("ololo reply"))
		case err := <-consumerErrs:
			log.Println("CONSUMER ERROR: ", err)
		case err := <-clientErrs:
			log.Println("CLIENT ERROR: ", err)
			client.Close()
		}
	}
}
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrNoConnection is an indicator that currently there is no connection
	// available
	ErrNoConnection = errors.New("No connection available")
)
var ErrPublisherDead = errors.New("Publisher is dead")
ErrPublisherDead indicates that the publisher was canceled. It can be returned from the Write() and Publish() methods.
Functions ¶
This section is empty.
Types ¶
type BackoffPolicy ¶
type BackoffPolicy struct {
// contains filtered or unexported fields
}
BackoffPolicy is a default Backoffer implementation
type Backoffer ¶
Backoffer is an interface that holds a backoff strategy
var DefaultBackoff Backoffer = BackoffPolicy{
	[]int{0, 10, 100, 200, 500, 1000, 2000, 3000, 5000},
}
DefaultBackoff See: http://blog.gopheracademy.com/advent-2014/backoff/
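To illustrate how a table-driven policy such as DefaultBackoff might turn a retry count into a delay, here is a minimal sketch. It is not cony's implementation: the type `tablePolicy`, its method `Delay`, and the reading of the table values as milliseconds are all assumptions made for this example.

```go
package main

import (
	"fmt"
	"time"
)

// tablePolicy is a hypothetical stand-in for a table-driven backoff policy.
// Each entry is assumed to be a delay in milliseconds.
type tablePolicy struct {
	ms []int
}

// Delay returns the wait before retry number n (0-based).
// Retries past the end of the table reuse the last entry.
func (p tablePolicy) Delay(n int) time.Duration {
	if len(p.ms) == 0 {
		return 0
	}
	if n < 0 {
		n = 0
	}
	if n >= len(p.ms) {
		n = len(p.ms) - 1
	}
	return time.Duration(p.ms[n]) * time.Millisecond
}

func main() {
	p := tablePolicy{ms: []int{0, 10, 100, 200, 500, 1000, 2000, 3000, 5000}}
	for _, n := range []int{0, 1, 4, 20} {
		fmt.Printf("retry %d: wait %v\n", n, p.Delay(n))
	}
}
```

Clamping to the last entry keeps the delay bounded however long the outage lasts. A production policy would usually add random jitter on top, which this sketch leaves out.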
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is the main AMQP client wrapper
func (*Client) Blocking ¶
Blocking reports the server's TCP flow control notifications for the connection. The default buffer size is 10. Messages are dropped if the receiver can't keep up.
func (*Client) Declare ¶
func (c *Client) Declare(d []Declaration)
Declare is used to declare queues, exchanges, and bindings. Declarations are saved and re-run every time the Client gets a connection.
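The save-and-replay behavior can be pictured with the sketch below. It is a simplified model, not cony's code: `declaration`, `session`, `declare`, and `onConnect` are hypothetical names, and stopping at the first failing declaration is an assumption.

```go
package main

import "fmt"

// declaration is a hypothetical callback that declares one broker object.
type declaration func() error

// session is a hypothetical stand-in for a client that remembers declarations.
type session struct {
	declarations []declaration
}

// declare only saves the callbacks; nothing runs yet.
func (s *session) declare(d []declaration) {
	s.declarations = append(s.declarations, d...)
}

// onConnect replays every saved declaration, stopping at the first error.
func (s *session) onConnect() error {
	for _, d := range s.declarations {
		if err := d(); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	runs := 0
	s := &session{}
	s.declare([]declaration{func() error { runs++; return nil }})

	_ = s.onConnect() // first connection
	_ = s.onConnect() // reconnect after a drop
	fmt.Println("declaration ran", runs, "times")
}
```

Replaying on every connection matters for objects such as auto-delete queues, which may no longer exist on the broker after a reconnect.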
func (*Client) Errors ¶
Errors returns AMQP connection-level errors. The default buffer size is 100. Messages are dropped if the receiver can't keep up.
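The drop-when-full behavior described for Blocking and Errors is commonly built from a non-blocking send into a buffered channel. A minimal sketch, assuming that approach; `report` and `errDemo` are hypothetical names and not part of cony:

```go
package main

import (
	"errors"
	"fmt"
)

// errDemo is a placeholder error used only by this sketch.
var errDemo = errors.New("demo error")

// report tries to hand err to the receiver without blocking.
// It returns false when the buffer is full and the error was dropped.
func report(ch chan<- error, err error) bool {
	select {
	case ch <- err:
		return true
	default:
		return false
	}
}

func main() {
	errs := make(chan error, 2) // small buffer so the drop is easy to see
	for i := 1; i <= 3; i++ {
		fmt.Printf("send %d delivered: %v\n", i, report(errs, errDemo))
	}
}
```

With nobody receiving, the first two sends fill the buffer and the third is dropped. A consumer that cares about every error should therefore drain the channel promptly or supply a larger buffer.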
func (*Client) Loop ¶
Loop should be used as the condition of a `for` loop whose body receives from (*Client).Errors().
It manages the AMQP connection and runs queue and exchange declarations and consumers. It starts returning false once (*Client).Close() has been called.
Example ¶
package main

import (
	"log"
	"time"

	"github.com/assembla/cony"
)

func main() {
	client := cony.NewClient(cony.URL("amqp://guest:guest@localhost/"))

	for client.Loop() {
		select {
		case err := <-client.Errors():
			log.Println("CLIENT ERROR: ", err)
			client.Close()
		}
		time.Sleep(1 * time.Second) // naive backoff
	}
}
type ClientOpt ¶
type ClientOpt func(*Client)
ClientOpt is a Client's functional option type
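For readers new to the pattern, the sketch below shows how a constructor can apply options of this shape. Every name in it (`client`, `clientOpt`, `withAddr`, `withErrs`, `newClient`) is hypothetical, and the order "defaults first, then options" is an assumption rather than a statement about cony's internals.

```go
package main

import "fmt"

// client is a hypothetical stand-in for a type configured through options.
type client struct {
	addr string
	errs chan error
}

// clientOpt mirrors the shape of ClientOpt: a function that mutates the client.
type clientOpt func(*client)

// withAddr is a hypothetical option that sets the broker address.
func withAddr(addr string) clientOpt {
	return func(c *client) { c.addr = addr }
}

// withErrs is a hypothetical option that lets the caller own the error channel.
func withErrs(ch chan error) clientOpt {
	return func(c *client) { c.errs = ch }
}

// newClient sets defaults first, then applies the options in order,
// so a later option overrides an earlier one.
func newClient(opts ...clientOpt) *client {
	c := &client{
		addr: "amqp://localhost/",
		errs: make(chan error, 100),
	}
	for _, o := range opts {
		o(c)
	}
	return c
}

func main() {
	c := newClient(withAddr("amqp://example.invalid/"), withErrs(make(chan error, 5)))
	fmt.Println(c.addr, cap(c.errs))
}
```

The benefit of this design is that new settings can be added later without changing the constructor's signature.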
func Backoff ¶
Backoff is a functional option used in the `NewClient` constructor to define the backoff policy
func BlockingChan ¶
BlockingChan is a functional option used in the `NewClient` constructor to supply the blocking reporting channel from client code, which leaves control over buffering with the caller
Example ¶
package main

import (
	"github.com/assembla/cony"
	"github.com/streadway/amqp"
)

func main() {
	blockings := make(chan amqp.Blocking, 100) // define custom buffer size

	cony.NewClient(cony.BlockingChan(blockings))
}
func ErrorsChan ¶
ErrorsChan is a functional option used in the `NewClient` constructor to supply the error reporting channel from client code, which leaves control over the buffer size with the caller. The default buffer size is 100. Messages are dropped if the receiver can't keep up.
Example ¶
package main

import (
	"github.com/assembla/cony"
)

func main() {
	errors := make(chan error, 100) // define custom buffer size

	cony.NewClient(cony.ErrorsChan(errors))
}
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer holds definition for AMQP consumer
func NewConsumer ¶
func NewConsumer(q *Queue, opts ...ConsumerOpt) *Consumer
NewConsumer is the Consumer's constructor
func (*Consumer) Cancel ¶
func (c *Consumer) Cancel()
Cancel this consumer.
This will CLOSE the Deliveries() channel
func (*Consumer) Deliveries ¶
Deliveries returns the deliveries shipped to this consumer. This channel is never closed, even on disconnects
type ConsumerOpt ¶
type ConsumerOpt func(*Consumer)
ConsumerOpt is a consumer's functional option type
func AutoTag ¶
func AutoTag() ConsumerOpt
AutoTag sets an automatically generated tag, like this:
fmt.Sprintf(QueueName+"-pid-%d@%s", os.Getpid(), os.Hostname())
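The one-liner above is shorthand: `os.Hostname` returns a string and an error, so it cannot be passed to `fmt.Sprintf` directly. A compilable sketch of the same tag format follows. The helper `autoTag` and its "unknown" fallback are assumptions of this example, not cony's behavior.

```go
package main

import (
	"fmt"
	"os"
)

// autoTag builds a tag of the form "<queue>-pid-<pid>@<host>".
// os.Hostname returns (string, error), so the error is handled explicitly.
// Falling back to "unknown" is an assumption of this sketch.
func autoTag(queueName string) string {
	host, err := os.Hostname()
	if err != nil {
		host = "unknown"
	}
	// The queue name is passed as an argument rather than concatenated into
	// the format string, so a "%" in the name cannot be misread as a verb.
	return fmt.Sprintf("%s-pid-%d@%s", queueName, os.Getpid(), host)
}

func main() {
	fmt.Println(autoTag("jobs"))
}
```

Including the process ID and host name makes it easier to tell consumers apart when inspecting a queue on the broker.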
type Declaration ¶
Declaration is a callback type to declare an AMQP queue/exchange/binding
func DeclareBinding ¶
func DeclareBinding(b Binding) Declaration
DeclareBinding is a way to declare an AMQP binding between an AMQP queue and exchange
func DeclareExchange ¶
func DeclareExchange(e Exchange) Declaration
DeclareExchange is a way to declare AMQP exchange
func DeclareQueue ¶
func DeclareQueue(q *Queue) Declaration
DeclareQueue is a way to declare AMQP queue
type Declarer ¶
type Declarer interface {
	QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)
	ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error
	QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error
}
Declarer is implemented by *amqp.Channel
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
Publisher holds the definition for AMQP publishing
func NewPublisher ¶
func NewPublisher(exchange string, key string, opts ...PublisherOpt) *Publisher
NewPublisher is a Publisher constructor
func (*Publisher) Publish ¶
func (p *Publisher) Publish(pub amqp.Publishing) error
Publish is used to publish a custom amqp.Publishing
WARNING: this is a blocking call. It will not return until a connection is available. The only way to stop it is to use the Cancel() method.
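The interaction between a blocking publish and Cancel() can be modeled with two channels, as in the sketch below. This is a simplified model under stated assumptions, not cony's implementation: the `publisher` type, its `ready` and `stop` channels, and `errDead` (standing in for ErrPublisherDead) are all hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errDead stands in for ErrPublisherDead in this sketch.
var errDead = errors.New("publisher is dead")

// publisher is a hypothetical model: a receive from ready means a connection
// is available, and stop is closed by cancel.
type publisher struct {
	ready chan struct{}
	stop  chan struct{}
}

func newPublisher() *publisher {
	return &publisher{ready: make(chan struct{}), stop: make(chan struct{})}
}

// publish blocks until a connection is signalled or the publisher is canceled.
func (p *publisher) publish(body []byte) error {
	select {
	case <-p.ready:
		return nil // a real implementation would send body here
	case <-p.stop:
		return errDead
	}
}

// cancel unblocks every pending and future publish call. Call it only once.
func (p *publisher) cancel() { close(p.stop) }

func main() {
	p := newPublisher()
	done := make(chan error, 1)
	go func() { done <- p.publish([]byte("hello")) }()

	time.Sleep(10 * time.Millisecond) // no connection ever arrives
	p.cancel()
	fmt.Println(<-done)
}
```

Because the call blocks, it is usually run in its own goroutine, with the cancel call kept reachable from the code that handles shutdown.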
type PublisherOpt ¶
type PublisherOpt func(*Publisher)
PublisherOpt is a functional option type for Publisher
func PublishingTemplate ¶
func PublishingTemplate(t amqp.Publishing) PublisherOpt
PublishingTemplate is a Publisher's functional option. It provides a template amqp.Publishing to save typing.
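One way to picture what a publishing template saves is the sketch below, which copies a template by value and fills in only the body. The `message` type is a hypothetical, trimmed-down stand-in for amqp.Publishing, and `fromTemplate` is an assumed helper, not part of cony.

```go
package main

import "fmt"

// message is a hypothetical, trimmed-down stand-in for amqp.Publishing.
type message struct {
	ContentType string
	AppID       string
	Body        []byte
}

// fromTemplate receives the template by value, so setting Body here
// changes only the copy and leaves the caller's template untouched.
func fromTemplate(tmpl message, body []byte) message {
	tmpl.Body = body
	return tmpl
}

func main() {
	tmpl := message{ContentType: "application/json", AppID: "app1"}
	m := fromTemplate(tmpl, []byte(`{"ok":true}`))
	fmt.Println(m.ContentType, m.AppID, string(m.Body))
}
```

Each message then carries the shared headers without the caller repeating them on every publish.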