Documentation ¶
Overview ¶
Package cony is a high-level wrapper around the http://github.com/streadway/amqp library for working with AMQP declaratively. Cony manages connecting and reconnecting to the AMQP broker, and recovers consumers after a reconnect.
Example ¶
package main

import (
	"log"
	"os"

	"github.com/assembla/cony"
	"github.com/streadway/amqp"
)

func main() {
	client := cony.NewClient(cony.URL(os.Getenv("AMQP_URL")), cony.Backoff(cony.DefaultBackoff))

	q := &cony.Queue{
		Name:       "", // autogenerated queue name
		AutoDelete: true,
	}

	exchange := cony.Exchange{
		Name:    "amq.topic",
		Durable: true,
	}

	b := cony.Binding{
		Queue:    q,
		Exchange: exchange,
		Key:      "something.#",
	}

	// wrap all declarations and save into slice
	declarations := []cony.Declaration{
		cony.DeclareQueue(q),
		cony.DeclareExchange(exchange),
		cony.DeclareBinding(b),
	}

	// declare consumer
	consumer := cony.NewConsumer(q,
		cony.Qos(10),
		cony.AutoTag(),
		cony.AutoAck(),
	)

	// declare publisher
	publisher := cony.NewPublisher(exchange.Name,
		"ololo.key",
		cony.PublishingTemplate(amqp.Publishing{
			ContentType: "application/json",
			AppId:       "app1",
		}), // template amqp.Publishing
	)

	// let client know about declarations
	client.Declare(declarations)

	// let client know about consumers/publishers
	client.Consume(consumer)
	client.Publish(publisher)

	clientErrs := client.Errors()
	deliveries := consumer.Deliveries()
	consumerErrs := consumer.Errors()

	// connect, reconnect, or exit loop
	// run network operations such as:
	// queue, exchange, binding, consumers declarations
	for client.Loop() {
		select {
		case msg := <-deliveries:
			log.Println(msg)
			msg.Ack(false)
			publisher.Write([]byte("ololo reply"))
		case err := <-consumerErrs:
			log.Println("CONSUMER ERROR: ", err)
		case err := <-clientErrs:
			log.Println("CLIENT ERROR: ", err)
			client.Close()
		}
	}
}
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrNoConnection is an indicator that currently there is no connection
	// available
	ErrNoConnection = errors.New("No connection available")
)
var ErrPublisherDead = errors.New("Publisher is dead")
ErrPublisherDead indicates that the publisher was canceled. It can be returned from the Write() and Publish() methods.
Functions ¶
This section is empty.
Types ¶
type BackoffPolicy ¶
type BackoffPolicy struct {
// contains filtered or unexported fields
}
BackoffPolicy is the default Backoffer implementation
type Backoffer ¶
Backoffer is an interface that holds a backoff strategy
var DefaultBackoff Backoffer = BackoffPolicy{
	[]int{0, 10, 100, 200, 500, 1000, 2000, 3000, 5000},
}
DefaultBackoff See: http://blog.gopheracademy.com/advent-2014/backoff/
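As a rough illustration of how a fixed table of delays like the one in DefaultBackoff can drive reconnect backoff, the sketch below maps an attempt number to a wait time and reuses the last entry once the table runs out. It assumes the values are milliseconds, which this page does not state. `tablePolicy` and its `Delay` method are hypothetical names, not part of cony, and the jitter discussed in the linked post is left out.

```go
package main

import (
	"fmt"
	"time"
)

// tablePolicy is a hypothetical stand-in for a table-driven backoff policy.
// Assumption: each entry is a delay in milliseconds.
type tablePolicy struct {
	ms []int
}

// Delay returns the wait before reconnect attempt n (0-based).
// Attempts past the end of the table reuse the last entry.
func (p tablePolicy) Delay(n int) time.Duration {
	if len(p.ms) == 0 {
		return 0
	}
	if n < 0 {
		n = 0
	}
	if n >= len(p.ms) {
		n = len(p.ms) - 1
	}
	return time.Duration(p.ms[n]) * time.Millisecond
}

func main() {
	p := tablePolicy{ms: []int{0, 10, 100, 200, 500, 1000, 2000, 3000, 5000}}
	for _, attempt := range []int{0, 1, 4, 20} {
		fmt.Printf("attempt %d: wait %v\n", attempt, p.Delay(attempt))
	}
}
```

Clamping at the last entry keeps a long outage from growing the delay without bound.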
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is the main AMQP client wrapper
func (*Client) Blocking ¶
Blocking reports the server's TCP flow control notifications for the connection. The default buffer size is 10. Messages are dropped if the receiver can't keep up.
func (*Client) Declare ¶
func (c *Client) Declare(d []Declaration) error
Declare is used to declare queues, exchanges, and bindings. Declarations are saved and re-run every time the Client gets a connection.
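The save-and-replay behavior described for Declare can be pictured with the following minimal sketch. It is not cony's implementation: `declaration`, `fakeChannel`, `client`, `declare`, `onConnect`, and `declareQueue` are all hypothetical stand-ins, and the callback signature is an assumption, since this page does not show the definition of Declaration.

```go
package main

import "fmt"

// fakeChannel records declared names; it stands in for a broker channel.
type fakeChannel struct {
	declared []string
}

// declaration is a hypothetical callback type.
// Assumption: it runs against a channel and may fail.
type declaration func(ch *fakeChannel) error

// client keeps declarations so they can be replayed on every new connection.
type client struct {
	saved []declaration
}

// declare saves the declarations for later replay.
func (c *client) declare(ds []declaration) {
	c.saved = append(c.saved, ds...)
}

// onConnect replays every saved declaration, in order, against a fresh channel.
func (c *client) onConnect(ch *fakeChannel) error {
	for _, d := range c.saved {
		if err := d(ch); err != nil {
			return err
		}
	}
	return nil
}

// declareQueue builds a declaration that records a queue name.
func declareQueue(name string) declaration {
	return func(ch *fakeChannel) error {
		ch.declared = append(ch.declared, "queue:"+name)
		return nil
	}
}

func main() {
	c := &client{}
	c.declare([]declaration{declareQueue("jobs"), declareQueue("events")})

	// Simulate two separate connections; each starts with nothing declared.
	for i := 1; i <= 2; i++ {
		ch := &fakeChannel{}
		if err := c.onConnect(ch); err != nil {
			fmt.Println("declare failed:", err)
			return
		}
		fmt.Printf("connection %d declared %v\n", i, ch.declared)
	}
}
```

Storing closures rather than results is what makes the replay possible: nothing touches the broker until a connection exists.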
func (*Client) Errors ¶
Errors returns AMQP connection-level errors. The default buffer size is 100. Messages are dropped if the receiver can't keep up.
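Dropping messages when the receiver can't keep up is commonly done in Go with a non-blocking send on a buffered channel. The sketch below shows that idiom under the assumption that cony does something similar; `report` is a hypothetical helper, not a cony function.

```go
package main

import "fmt"

// report tries to deliver err without blocking.
// It returns false when the buffer is full and the error was dropped.
func report(ch chan<- error, err error) bool {
	select {
	case ch <- err:
		return true
	default:
		return false
	}
}

func main() {
	errs := make(chan error, 2) // small buffer to make the drop visible
	for i := 0; i < 3; i++ {
		ok := report(errs, fmt.Errorf("error %d", i))
		fmt.Printf("error %d delivered: %v\n", i, ok)
	}
	fmt.Println("buffered:", len(errs))
}
```

With nobody reading from `errs`, the first two errors fit in the buffer and the third is dropped, so a slow receiver never stalls the sender.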
func (*Client) Loop ¶
Loop should be used as the condition of a `for` loop whose body receives from (*Client).Errors().
It manages the AMQP connection and runs queue and exchange declarations and consumers. It starts returning false once (*Client).Close() has been called.
Example ¶
package main

import (
	"log"
	"time"

	"github.com/assembla/cony"
)

func main() {
	client := cony.NewClient(cony.URL("amqp://guest:guest@localhost/"))

	for client.Loop() {
		select {
		case err := <-client.Errors():
			log.Println("CLIENT ERROR: ", err)
			client.Close()
		}
		time.Sleep(1 * time.Second) // naive backoff
	}
}
type ClientOpt ¶
type ClientOpt func(*Client)
ClientOpt is a Client's functional option type
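For readers new to the functional option pattern that ClientOpt follows, here is a minimal, self-contained sketch. The `client` struct, `withURL`, `withErrorsChan`, and `newClient` are hypothetical stand-ins rather than cony's own code; only the shape `func(*Client)` and the default error buffer size of 100 come from this page.

```go
package main

import "fmt"

// client is a hypothetical stand-in with two configurable fields.
type client struct {
	url  string
	errs chan error
}

// clientOpt mirrors the shape of ClientOpt: a function that mutates the client.
type clientOpt func(*client)

// withURL returns an option that sets the broker address.
func withURL(u string) clientOpt {
	return func(c *client) { c.url = u }
}

// withErrorsChan returns an option that replaces the default error channel.
func withErrorsChan(ch chan error) clientOpt {
	return func(c *client) { c.errs = ch }
}

// newClient sets defaults first, then applies the options in order.
func newClient(opts ...clientOpt) *client {
	c := &client{errs: make(chan error, 100)}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	c := newClient(
		withURL("amqp://guest:guest@localhost/"),
		withErrorsChan(make(chan error, 5)),
	)
	fmt.Println(c.url, cap(c.errs))
}
```

Because defaults are applied before the options run, callers only pass the settings they want to change.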
func Backoff ¶
Backoff is a functional option for the `NewClient` constructor that defines the backoff policy.
func BlockingChan ¶
BlockingChan is a functional option for the `NewClient` constructor. It lets client code supply the channel used for blocking notifications and so keep control over its buffering.
Example ¶
package main

import (
	"github.com/assembla/cony"
	"github.com/streadway/amqp"
)

func main() {
	blockings := make(chan amqp.Blocking, 100) // define custom buffer size
	cony.NewClient(cony.BlockingChan(blockings))
}
func Config ¶ added in v0.3.0
Config is a functional option used to set up extended AMQP configuration.
func ErrorsChan ¶
ErrorsChan is a functional option for the `NewClient` constructor. It lets client code supply the error reporting channel and so keep control over its buffer size. The default buffer size is 100. Messages are dropped if the receiver can't keep up.
Example ¶
package main

import (
	"github.com/assembla/cony"
)

func main() {
	errors := make(chan error, 100) // define custom buffer size
	cony.NewClient(cony.ErrorsChan(errors))
}
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer holds the definition of an AMQP consumer
func NewConsumer ¶
func NewConsumer(q *Queue, opts ...ConsumerOpt) *Consumer
NewConsumer is the Consumer constructor
func (*Consumer) Cancel ¶
func (c *Consumer) Cancel()
Cancel this consumer.
This will CLOSE the Deliveries() channel
func (*Consumer) Deliveries ¶
Deliveries returns the channel of deliveries shipped to this consumer. This channel is not closed on disconnects.
type ConsumerOpt ¶
type ConsumerOpt func(*Consumer)
ConsumerOpt is a consumer's functional option type
func AutoTag ¶
func AutoTag() ConsumerOpt
AutoTag sets an automatically generated consumer tag, built like this:
fmt.Sprintf(QueueName+"-pid-%d@%s", os.Getpid(), os.Hostname())
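The expression above is shorthand: `os.Hostname()` returns both a name and an error, so it cannot be passed to `fmt.Sprintf` directly. A runnable sketch of the same tag format might look like this. `autoTag` is a hypothetical helper, and falling back to "unknown" when the hostname lookup fails is an assumption, not documented cony behavior.

```go
package main

import (
	"fmt"
	"os"
)

// autoTag is a hypothetical helper that builds "<queue>-pid-<pid>@<host>".
func autoTag(queueName string) string {
	host, err := os.Hostname()
	if err != nil {
		host = "unknown" // assumption: fall back rather than fail
	}
	// The queue name is passed as an argument, not spliced into the format
	// string, so a '%' in the name cannot be misread as a verb.
	return fmt.Sprintf("%s-pid-%d@%s", queueName, os.Getpid(), host)
}

func main() {
	fmt.Println(autoTag("jobs"))
}
```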
type Declaration ¶
Declaration is a callback type used to declare an AMQP queue, exchange, or binding
func DeclareExchange ¶
func DeclareExchange(e Exchange) Declaration
DeclareExchange is a way to declare an AMQP exchange
func DeclareExchangeBinding ¶ added in v1.0.0
func DeclareExchangeBinding(b ExchangeBinding) Declaration
func DeclareQueue ¶
func DeclareQueue(q *Queue) Declaration
DeclareQueue is a way to declare an AMQP queue
func DeclareQueueBinding ¶ added in v1.0.0
func DeclareQueueBinding(b QueueBinding) Declaration
DeclareQueueBinding is a way to declare an AMQP binding between an AMQP queue and an exchange
type Declarer ¶
type Declarer interface {
	QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)
	ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error
	QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error
	ExchangeBind(destination, key, source string, noWait bool, args amqp.Table) error
}
Declarer is implemented by *amqp.Channel
type ExchangeBinding ¶ added in v1.0.0
type ExchangeBinding struct {
	SourceExchange      Exchange
	DestinationExchange Exchange
	Keys                []string
	Args                amqp.Table
}
ExchangeBinding is used to declare a binding from one AMQP exchange to another
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
Publisher holds the definition for AMQP publishing
func NewPublisher ¶
func NewPublisher(exchange string, key string, opts ...PublisherOpt) *Publisher
NewPublisher is the Publisher constructor
func (*Publisher) Publish ¶
func (p *Publisher) Publish(pub amqp.Publishing) error
Publish is used to publish a custom amqp.Publishing
WARNING: this is a blocking call; it does not return until a connection is available. The only way to stop it is to call the Cancel() method.
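To make the warning concrete, here is a minimal sketch of a call that blocks until a connection is available and is released by a cancel. It is an illustration, not cony's implementation: `publisher`, its `ready` and `stop` channels, `publish`, `cancel`, and `errDead` are hypothetical, with `errDead` loosely modeled on ErrPublisherDead.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errDead is a hypothetical error modeled on ErrPublisherDead.
var errDead = errors.New("publisher is dead")

// publisher is a hypothetical stand-in.
type publisher struct {
	ready chan struct{} // assumption: closed once a connection is available
	stop  chan struct{} // closed by cancel
	once  sync.Once     // makes cancel safe to call more than once
}

func newPublisher() *publisher {
	return &publisher{ready: make(chan struct{}), stop: make(chan struct{})}
}

// publish blocks until the connection is ready or the publisher is canceled.
func (p *publisher) publish(body string) error {
	select {
	case <-p.ready:
		return nil // a real publisher would send body here
	case <-p.stop:
		return errDead
	}
}

// cancel releases every pending and future publish call.
func (p *publisher) cancel() {
	p.once.Do(func() { close(p.stop) })
}

func main() {
	p := newPublisher()
	done := make(chan error, 1)
	go func() { done <- p.publish("hello") }()

	// No connection ever appears in this sketch, so only cancel can unblock it.
	p.cancel()
	fmt.Println("publish returned:", <-done)
}
```

In practice this means a blocking publish should run in a goroutine the caller can cancel, for example during shutdown.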
func (*Publisher) PublishWithRoutingKey ¶ added in v0.3.2
func (p *Publisher) PublishWithRoutingKey(pub amqp.Publishing, key string) error
PublishWithRoutingKey is used to publish a custom amqp.Publishing with a custom routing key
WARNING: this is a blocking call; it does not return until a connection is available. The only way to stop it is to call the Cancel() method.
type PublisherOpt ¶
type PublisherOpt func(*Publisher)
PublisherOpt is a functional option type for Publisher
func PublishingTemplate ¶
func PublishingTemplate(t amqp.Publishing) PublisherOpt
PublishingTemplate is a Publisher functional option. It provides a template amqp.Publishing, which saves typing.
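One way to picture how a publishing template saves typing: the shared fields are set once, and each message copies the template and fills in only its body. The sketch below assumes that is roughly how the template is applied. `publishing` is a hypothetical, trimmed-down stand-in for amqp.Publishing, and `fromTemplate` is not a cony function.

```go
package main

import "fmt"

// publishing is a hypothetical, trimmed-down stand-in for amqp.Publishing.
type publishing struct {
	ContentType string
	AppId       string
	Body        []byte
}

// fromTemplate copies the template and fills in only the body.
// The template is passed by value, so the caller's copy is left untouched.
func fromTemplate(tmpl publishing, body []byte) publishing {
	msg := tmpl
	msg.Body = body
	return msg
}

func main() {
	tmpl := publishing{ContentType: "application/json", AppId: "app1"}

	first := fromTemplate(tmpl, []byte(`{"n":1}`))
	second := fromTemplate(tmpl, []byte(`{"n":2}`))

	fmt.Println(first.ContentType, first.AppId, string(first.Body))
	fmt.Println(second.ContentType, second.AppId, string(second.Body))
}
```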