Documentation ¶
Overview ¶
Package cony is a high-level wrapper around http://github.com/streadway/amqp library, for working declaratively with AMQP. Cony will manage AMQP connect/reconnect to AMQP broker, along with recovery of consumers.
Example ¶
package main import ( "log" "os" "github.com/streadway/amqp" "github.com/vidmed/cony" ) func main() { client := cony.NewClient(cony.URL(os.Getenv("AMQP_URL")), cony.Backoff(cony.DefaultBackoff)) q := &cony.Queue{ Name: "", // autogenerated queue name AutoDelete: true, } exchange := cony.Exchange{ Name: "amq.topic", Durable: true, } b := cony.Binding{ Queue: q, Exchange: exchange, Key: "something.#", } // wrap all declarations and save into slice declarations := []cony.Declaration{ cony.DeclareQueue(q), cony.DeclareExchange(exchange), cony.DeclareBinding(b), } // declare consumer consumer := cony.NewConsumer(q, cony.Qos(10), cony.AutoTag(), cony.AutoAck(), ) // declare publisher publisher := cony.NewPublisher(exchange.Name, "ololo.key", cony.PublishingTemplate(amqp.Publishing{ ContentType: "application/json", AppId: "app1", }), // template amqp.Publising ) // let client know about declarations client.Declare(declarations) // let client know about consumers/publishers client.Consume(consumer) client.Publish(publisher) clientErrs := client.Errors() deliveries := consumer.Deliveries() consumerErrs := consumer.Errors() // connect, reconnect, or exit loop // run network operations such as: // queue, exchange, bidning, consumers declarations for client.Loop() { select { case msg := <-deliveries: log.Println(msg) msg.Ack(false) publisher.Write([]byte("ololo reply")) case err := <-consumerErrs: log.Println("CONSUMER ERROR: ", err) case err := <-clientErrs: log.Println("CLIENT ERROR: ", err) client.Close() } } }
Output:
Index ¶
- Variables
- type BackoffPolicy
- type Backoffer
- type Binding
- type Client
- func (c *Client) Blocking() <-chan amqp.Blocking
- func (c *Client) CancelConsumer(cons *Consumer)
- func (c *Client) CancelPublisher(pub *Publisher)
- func (c *Client) Close()
- func (c *Client) CloseCh() <-chan struct{}
- func (c *Client) Closed() bool
- func (c *Client) Connect() error
- func (c *Client) Connected() bool
- func (c *Client) Connection() (*amqp.Connection, error)
- func (c *Client) Consume(cons *Consumer)
- func (c *Client) Declare(d []Declaration) error
- func (c *Client) Disconnect() error
- func (c *Client) Errors() <-chan error
- func (c *Client) Loop() bool
- func (c *Client) Publish(pub *Publisher)
- func (c *Client) Redeclare() error
- type ClientOpt
- type Consumer
- func (c *Consumer) AutoAck() bool
- func (c *Consumer) Deliveries() <-chan *amqp.Delivery
- func (c *Consumer) Errors() <-chan error
- func (c *Consumer) Exclusive() bool
- func (c *Consumer) NoLocal() bool
- func (c *Consumer) Qos() int
- func (c *Consumer) Serving() bool
- func (c *Consumer) StopCh() <-chan struct{}
- func (c *Consumer) Stopped() bool
- func (c *Consumer) Tag() string
- type ConsumerOpt
- type Declaration
- type Declarer
- type ErrorsBatch
- func (eb *ErrorsBatch) Add(err error)
- func (eb *ErrorsBatch) CheckEPS(k float64) bool
- func (eb *ErrorsBatch) Errors() []error
- func (eb *ErrorsBatch) First() *time.Time
- func (eb *ErrorsBatch) GetEPS() float64
- func (eb *ErrorsBatch) Last() *time.Time
- func (eb *ErrorsBatch) Len() int
- func (eb *ErrorsBatch) Reset()
- func (eb *ErrorsBatch) Snapshot() *ErrorsBatchSnapshot
- func (eb *ErrorsBatch) String() string
- type ErrorsBatchSnapshot
- type Exchange
- type Logger
- type Publisher
- func (p *Publisher) ConfirmTimeout() time.Duration
- func (p *Publisher) Errors() <-chan error
- func (p *Publisher) Exchange() string
- func (p *Publisher) Publish(pub *amqp.Publishing, timeout time.Duration) error
- func (p *Publisher) PublishWithParams(pub *amqp.Publishing, exchange string, key string, timeout time.Duration) error
- func (p *Publisher) RoutingKey() string
- func (p *Publisher) Serving() bool
- func (p *Publisher) StopCh() <-chan struct{}
- func (p *Publisher) Stopped() bool
- func (p *Publisher) Write(b []byte, timeout time.Duration) (int, error)
- type PublisherOpt
- type Queue
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ( // ErrNoConnection is an indicator that currently there is no connection // available ErrNoConnection = errors.New("no connection available") // ErrStopped is an indicator that client is stopped ErrStopped = errors.New("client stopped") // ErrConnected is an indicator that client already connected ErrConnected = errors.New("client already connected") // ErrDisconnectWanted is an indicator that someone called disconnect method ErrDisconnectWanted = errors.New("disconnect wanted") // ErrDisconnectTimeout is an indicator that disconnect timed out ErrDisconnectTimeout = errors.New("disconnect timeout") // ErrFrequentDisconnect is an indicator that there are frequent disconnects ErrFrequentDisconnect = errors.New("too frequent disconnects") // ErrConnectionBlocked is an indicator that connection blocked ErrConnectionBlocked = errors.New("connection blocked") // ErrConnectionClosedByClient is an indicator that connection was closed by this library ErrConnectionClosedByClient = errors.New("connection closed by client") )
var ( // ErrConsumerStopped is an indicator that consumer is stopped ErrConsumerStopped = errors.New("consumer stopped") ErrDeliveriesChannelClosed = errors.New("consumer deliveries serving channel closed") ErrConsumerServingStopped = errors.New("consumer serving stopped") )
var ( // ErrPublisherStopped is an indicator that publisher is stopped // could be returned from Write() and Publish() methods ErrPublisherStopped = errors.New("publisher stopped") ErrConfirmTimeOut = errors.New("confirm timed out") ErrPublishTimeOut = errors.New("publish timed out") ErrPublishingNotConfirmed = errors.New("publishing NOT confirmed") ErrPublisherServingStopped = errors.New("publisher serving stopped") )
Functions ¶
This section is empty.
Types ¶
type BackoffPolicy ¶
type BackoffPolicy struct {
// contains filtered or unexported fields
}
BackoffPolicy is a default Backoffer implementation
type Backoffer ¶
Backoffer is an interface that holds a backoff strategy
var DefaultBackoff Backoffer = BackoffPolicy{ []int{0, 10, 100, 200, 500, 1000, 2000, 3000, 5000}, }
DefaultBackoff See: http://blog.gopheracademy.com/advent-2014/backoff/
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is the main AMQP client wrapper
func (*Client) Blocking ¶
Blocking notifies about the server's TCP flow control of the Connection. Default buffer size is 10. Messages will be dropped if the receiver can't keep up
func (*Client) CancelConsumer ¶ added in v0.3.3
func (*Client) CancelPublisher ¶ added in v0.3.3
func (*Client) CloseCh ¶ added in v0.3.3
func (c *Client) CloseCh() <-chan struct{}
CloseCh notifies that client was closed
func (*Client) Connect ¶ added in v0.3.3
Connect tries to connect to AMQP. If a connection is already established, it returns the error ErrConnected; if not, it tries to connect using the configured Backoff policy. A guard connection goroutine also runs to handle connection troubles: when something goes wrong, the guard connection goroutine calls c.reportErr(err). This wakes up the listener of the client's error channel, which launches the next Loop() iteration, and a new connection is then established.
func (*Client) Connection ¶ added in v0.3.3
func (c *Client) Connection() (*amqp.Connection, error)
func (*Client) Declare ¶
func (c *Client) Declare(d []Declaration) error
Declare is used to declare queues/exchanges/bindings. Declarations are saved and will be re-run every time the Client gets a connection
func (*Client) Disconnect ¶ added in v0.3.3
Disconnect tries to gracefully close the current connection using the closeCurConnectCh channel. This is done so that closeCurrentConnection() is called from only one place - the guard connection goroutine. In case of a timeout or if the client is closing, an error will be returned
func (*Client) Errors ¶
Errors returns AMQP connection level errors. Default buffer size is 100. Messages will be dropped if the receiver can't keep up
func (*Client) Loop ¶
Loop should be run as condition for `for` with receiving from (*Client).Errors()
It will manage the AMQP connection and run queue and exchange declarations and consumers. It will start to return false once (*Client).Close() is called.
Example ¶
package main import ( "log" "time" "github.com/vidmed/cony" ) func main() { client := cony.NewClient(cony.URL("amqp://guest:guest@localhost/")) for client.Loop() { select { case err := <-client.Errors(): log.Println("CLIENT ERROR: ", err) client.Close() } time.Sleep(1 * time.Second) // naive backoff } }
Output:
type ClientOpt ¶
type ClientOpt func(*Client)
ClientOpt is a Client's functional option type
func Backoff ¶
Backoff is a functional option, used to define backoff policy, used in `NewClient` constructor
func BlockingChan ¶
BlockingChan is a functional option, used to initialize blocking reporting channel in client code, maintaining control over buffering, used in `NewClient` constructor
Example ¶
package main import ( "github.com/streadway/amqp" "github.com/vidmed/cony" ) func main() { blockings := make(chan amqp.Blocking, 100) // define custom buffer size cony.NewClient(cony.BlockingChan(blockings)) }
Output:
func Config ¶ added in v0.3.0
Config is a functional option, used to setup extended amqp configuration
func ErrorsChan ¶
ErrorsChan is a functional option, used in the `NewClient` constructor to initialize the error reporting channel in client code, maintaining control over buffer size. Default buffer size is 100. Messages will be dropped if the receiver can't keep up
Example ¶
package main import ( "github.com/vidmed/cony" ) func main() { errors := make(chan error, 100) // define custom buffer size cony.NewClient(cony.ErrorsChan(errors)) }
Output:
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer holds definition for AMQP consumer
func NewConsumer ¶
func NewConsumer(q *Queue, opts ...ConsumerOpt) *Consumer
NewConsumer is the Consumer's constructor
func (*Consumer) Deliveries ¶
Deliveries returns the deliveries shipped to this consumer. This channel is never closed, even on disconnects
func (*Consumer) Serving ¶ added in v0.3.3
Serving reports whether this consumer is serving at the moment
func (*Consumer) StopCh ¶ added in v0.3.3
func (c *Consumer) StopCh() <-chan struct{}
StopCh returns channel which will be closed when consumer stopped
type ConsumerOpt ¶
type ConsumerOpt func(*Consumer)
ConsumerOpt is a consumer's functional option type
func AutoTag ¶
func AutoTag() ConsumerOpt
AutoTag sets an automatically generated tag like this:
fmt.Sprintf(QueueName+"-pid-%d@%s", os.Getpid(), os.Hostname())
func ConsumerLog ¶ added in v0.3.3
func ConsumerLog(l Logger) ConsumerOpt
ConsumerLog is a functional option, used in `NewConsumer` constructor to set the logger
type Declaration ¶
Declaration is a callback type to declare AMQP queue/exchange/binding
func DeclareBinding ¶
func DeclareBinding(b *Binding) Declaration
DeclareBinding is a way to declare AMQP binding between AMQP queue and exchange
func DeclareExchange ¶
func DeclareExchange(e *Exchange) Declaration
DeclareExchange is a way to declare AMQP exchange
func DeclareQueue ¶
func DeclareQueue(q *Queue) Declaration
DeclareQueue is a way to declare AMQP queue
type Declarer ¶
type Declarer interface { QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error }
Declarer is implemented by *amqp.Channel
type ErrorsBatch ¶ added in v0.3.3
type ErrorsBatch struct {
// contains filtered or unexported fields
}
func NewErrorBatch ¶ added in v0.3.3
func NewErrorBatch() *ErrorsBatch
func (*ErrorsBatch) Add ¶ added in v0.3.3
func (eb *ErrorsBatch) Add(err error)
func (*ErrorsBatch) CheckEPS ¶ added in v0.3.3
func (eb *ErrorsBatch) CheckEPS(k float64) bool
EPS - errors per second. CheckEPS takes the wanted coefficient and reports whether the current ErrorsBatch EPS coefficient is greater than the given coefficient
func (*ErrorsBatch) Errors ¶ added in v0.3.3
func (eb *ErrorsBatch) Errors() []error
func (*ErrorsBatch) First ¶ added in v0.3.3
func (eb *ErrorsBatch) First() *time.Time
func (*ErrorsBatch) GetEPS ¶ added in v0.3.3
func (eb *ErrorsBatch) GetEPS() float64
EPS - errors per second. GetEPS returns the current ErrorsBatch EPS coefficient
func (*ErrorsBatch) Last ¶ added in v0.3.3
func (eb *ErrorsBatch) Last() *time.Time
func (*ErrorsBatch) Len ¶ added in v0.3.3
func (eb *ErrorsBatch) Len() int
func (*ErrorsBatch) Reset ¶ added in v0.3.3
func (eb *ErrorsBatch) Reset()
func (*ErrorsBatch) Snapshot ¶ added in v0.3.3
func (eb *ErrorsBatch) Snapshot() *ErrorsBatchSnapshot
func (*ErrorsBatch) String ¶ added in v0.3.3
func (eb *ErrorsBatch) String() string
type ErrorsBatchSnapshot ¶ added in v0.3.3
type Exchange ¶
type Exchange struct { Name string Kind string Durable bool AutoDelete bool Internal bool NoWait bool Args amqp.Table }
Exchange holds the definition of an AMQP exchange
type Logger ¶ added in v0.3.3
type Logger interface { // Log a message at the given level with context key/value pairs Trace(msg string, ctx ...interface{}) Debug(msg string, ctx ...interface{}) Info(msg string, ctx ...interface{}) Warn(msg string, ctx ...interface{}) Error(msg string, ctx ...interface{}) Crit(msg string, ctx ...interface{}) }
A Logger writes key/value pairs to a Handler
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
Publisher holds the definition for AMQP publishing
func NewPublisher ¶
func NewPublisher(exchange string, key string, opts ...PublisherOpt) *Publisher
NewPublisher is a Publisher constructor
func (*Publisher) ConfirmTimeout ¶ added in v0.3.3
func (*Publisher) Publish ¶
Publish is used to publish a custom amqp.Publishing
WARNING: this is a blocking call; it will not return until a connection is available. The only way to stop it is to use the Cancel() method.
func (*Publisher) PublishWithParams ¶ added in v0.3.3
func (p *Publisher) PublishWithParams(pub *amqp.Publishing, exchange string, key string, timeout time.Duration) error
PublishWithParams is used to publish a custom amqp.Publishing with the given exchange and routing key
WARNING: this is a blocking call; it will not return until a connection is available. The only way to stop it is to use the Cancel() method.
func (*Publisher) RoutingKey ¶ added in v0.3.3
func (*Publisher) Serving ¶ added in v0.3.3
Serving reports whether this publisher is serving at the moment
func (*Publisher) StopCh ¶ added in v0.3.3
func (p *Publisher) StopCh() <-chan struct{}
StopCh returns channel which will be closed when publisher stopped
type PublisherOpt ¶
type PublisherOpt func(*Publisher)
PublisherOpt is a functional option type for Publisher
func PublisherConfirmTimeout ¶ added in v0.3.3
func PublisherConfirmTimeout(t time.Duration) PublisherOpt
PublisherConfirmTimeout is a functional option, used in `NewPublisher` constructor to set the confirm timeout
func PublisherLog ¶ added in v0.3.3
func PublisherLog(l Logger) PublisherOpt
PublisherLog is a functional option, used in `NewPublisher` constructor to set the logger
func PublishingTemplate ¶
func PublishingTemplate(t amqp.Publishing) PublisherOpt
PublishingTemplate is a Publisher's functional option. It provides a template amqp.Publishing to save typing.