cony

package module
v0.3.3
Published: Jul 26, 2019 License: BSD-2-Clause Imports: 12 Imported by: 9

README

Cony

High-level AMQP 0.9.1 client library. It is a wrapper around the low-level streadway/amqp library.

Goals

Provide a way to work with AMQP declaratively

Requirements

The library uses atomic.Value, so Go 1.4+ is needed.

Thread-safety

Cony is thread-safe as long as streadway/amqp is thread-safe. It is recommended to open one AMQP channel per thread, so with cony that means one Consumer or Publisher per goroutine.

License

BSD 2 clause - see LICENSE for more details.

Documentation

Overview

Package cony is a high-level wrapper around the http://github.com/streadway/amqp library, for working declaratively with AMQP. Cony manages connecting and reconnecting to the AMQP broker, along with recovery of consumers.

Example
package main

import (
	"log"
	"os"
	"time"

	"github.com/streadway/amqp"
	"github.com/vidmed/cony"
)

func main() {
	// URLs takes a list of broker addresses (see the URLs option)
	client := cony.NewClient(cony.URLs([]string{os.Getenv("AMQP_URL")}), cony.Backoff(cony.DefaultBackoff))

	q := &cony.Queue{
		Name:       "", // autogenerated queue name
		AutoDelete: true,
	}

	// Binding.Exchange and DeclareExchange expect *cony.Exchange
	exchange := &cony.Exchange{
		Name:    "amq.topic",
		Durable: true,
	}

	// DeclareBinding expects *cony.Binding
	b := &cony.Binding{
		Queue:    q,
		Exchange: exchange,
		Key:      "something.#",
	}

	// wrap all declarations and save into slice
	declarations := []cony.Declaration{
		cony.DeclareQueue(q),
		cony.DeclareExchange(exchange),
		cony.DeclareBinding(b),
	}

	// declare consumer
	consumer := cony.NewConsumer(q,
		cony.Qos(10),
		cony.AutoTag(),
		cony.AutoAck(),
	)

	// declare publisher
	publisher := cony.NewPublisher(exchange.Name,
		"ololo.key",
		cony.PublishingTemplate(amqp.Publishing{
			ContentType: "application/json",
			AppId:       "app1",
		}), // template amqp.Publishing
	)

	// let client know about declarations
	client.Declare(declarations)

	// let client know about consumers/publishers
	client.Consume(consumer)
	client.Publish(publisher)

	clientErrs := client.Errors()
	deliveries := consumer.Deliveries()
	consumerErrs := consumer.Errors()

	// connect, reconnect, or exit loop
	// run network operations such as:
	// queue, exchange, binding, consumers declarations
	for client.Loop() {
		select {
		case msg := <-deliveries:
			log.Println(msg)
			msg.Ack(false)
			// Write takes a timeout as its second argument
			publisher.Write([]byte("ololo reply"), 5*time.Second)
		case err := <-consumerErrs:
			log.Println("CONSUMER ERROR: ", err)
		case err := <-clientErrs:
			log.Println("CLIENT ERROR: ", err)
			client.Close()
		}
	}

}

Constants

This section is empty.

Variables

var (
	// ErrNoConnection is an indicator that currently there is no connection
	// available
	ErrNoConnection = errors.New("no connection available")
	// ErrStopped is an indicator that client is stopped
	ErrStopped = errors.New("client stopped")
	// ErrConnected is an indicator that client already connected
	ErrConnected = errors.New("client already connected")
	// ErrDisconnectWanted is an indicator that someone called disconnect method
	ErrDisconnectWanted = errors.New("disconnect wanted")
	// ErrDisconnectTimeout is an indicator that disconnect timed out
	ErrDisconnectTimeout = errors.New("disconnect timeout")
	// ErrFrequentDisconnect is an indicator that there are frequent disconnects
	ErrFrequentDisconnect = errors.New("too frequent disconnects")
	// ErrConnectionBlocked is an indicator that connection blocked
	ErrConnectionBlocked = errors.New("connection blocked")
	// ErrConnectionClosedByClient is an indicator that connection was closed by this library
	ErrConnectionClosedByClient = errors.New("connection closed by client")
)
var (
	// ErrConsumerStopped is an indicator that consumer is stopped
	ErrConsumerStopped         = errors.New("consumer stopped")
	ErrDeliveriesChannelClosed = errors.New("consumer deliveries serving channel closed")
	ErrConsumerServingStopped  = errors.New("consumer serving stopped")
)
var (
	// ErrPublisherStopped is an indicator that publisher is stopped
	// could be returned from Write() and Publish() methods
	ErrPublisherStopped        = errors.New("publisher stopped")
	ErrConfirmTimeOut          = errors.New("confirm timed out")
	ErrPublishTimeOut          = errors.New("publish timed out")
	ErrPublishingNotConfirmed  = errors.New("publishing NOT confirmed")
	ErrPublisherServingStopped = errors.New("publisher serving stopped")
)

Functions

This section is empty.

Types

type BackoffPolicy

type BackoffPolicy struct {
	// contains filtered or unexported fields
}

BackoffPolicy is the default Backoffer implementation

func (BackoffPolicy) Backoff

func (b BackoffPolicy) Backoff(n int) time.Duration

Backoff implements Backoffer

type Backoffer

type Backoffer interface {
	Backoff(int) time.Duration
}

Backoffer is the interface that holds a backoff strategy

var DefaultBackoff Backoffer = BackoffPolicy{
	[]int{0, 10, 100, 200, 500, 1000, 2000, 3000, 5000},
}

DefaultBackoff is the default backoff policy. See: http://blog.gopheracademy.com/advent-2014/backoff/

type Binding

type Binding struct {
	Queue    *Queue
	Exchange *Exchange
	Key      string
	NoWait   bool
	Args     amqp.Table
}

Binding is used to declare a binding between an AMQP queue and an AMQP exchange

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is the main AMQP client wrapper

func NewClient

func NewClient(opts ...ClientOpt) *Client

NewClient initializes a new Client

func (*Client) Blocking

func (c *Client) Blocking() <-chan amqp.Blocking

Blocking returns a channel that receives notifications about the server's TCP flow control of the connection. The default buffer size is 10. Notifications are dropped if the receiver can't keep up.

func (*Client) CancelConsumer added in v0.3.3

func (c *Client) CancelConsumer(cons *Consumer)

func (*Client) CancelPublisher added in v0.3.3

func (c *Client) CancelPublisher(pub *Publisher)

func (*Client) Close

func (c *Client) Close()

Close shuts down the client

func (*Client) CloseCh added in v0.3.3

func (c *Client) CloseCh() <-chan struct{}

CloseCh notifies that client was closed

func (*Client) Closed added in v0.3.3

func (c *Client) Closed() bool

Closed reports whether the Client has been closed

func (*Client) Connect added in v0.3.3

func (c *Client) Connect() error

Connect tries to connect to the AMQP broker. If a connection is already established, it returns ErrConnected. Otherwise it tries to connect using the configured backoff policy. A guard goroutine also runs to handle connection problems: when something goes wrong, it calls c.reportErr(err), which wakes up the listener on the client's error channel so that the next Loop() iteration establishes a new connection.

func (*Client) Connected added in v0.3.3

func (c *Client) Connected() bool

func (*Client) Connection added in v0.3.3

func (c *Client) Connection() (*amqp.Connection, error)

func (*Client) Consume

func (c *Client) Consume(cons *Consumer)

Consume is used to declare consumers

func (*Client) Declare

func (c *Client) Declare(d []Declaration) error

Declare is used to declare queues, exchanges and bindings. The declarations are saved and re-run every time the Client gets a connection.
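The "saved and re-run" behaviour can be pictured as a slice of callbacks replayed against each fresh connection. This is only an illustrative sketch, not cony's implementation: `declarer`, `declaration`, `declareQueue`, `fakeChannel` and `runAll` are hypothetical, simplified stand-ins that avoid the amqp dependency.

```go
package main

import "fmt"

// declarer is a simplified, hypothetical stand-in for the Declarer interface.
type declarer interface {
	DeclareQueue(name string) error
}

// declaration mirrors the idea of a callback that declares one entity.
type declaration func(declarer) error

// declareQueue wraps a queue name into a declaration callback.
func declareQueue(name string) declaration {
	return func(d declarer) error { return d.DeclareQueue(name) }
}

// fakeChannel records what was declared on one simulated connection.
type fakeChannel struct {
	queues []string
}

func (f *fakeChannel) DeclareQueue(name string) error {
	f.queues = append(f.queues, name)
	return nil
}

// runAll replays every saved declaration, stopping at the first error.
func runAll(d declarer, saved []declaration) error {
	for _, decl := range saved {
		if err := decl(d); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	saved := []declaration{declareQueue("jobs"), declareQueue("events")}

	// Simulate two connections: the same saved declarations run on each.
	for conn := 1; conn <= 2; conn++ {
		ch := &fakeChannel{}
		if err := runAll(ch, saved); err != nil {
			fmt.Println("declare failed:", err)
			continue
		}
		fmt.Println("connection", conn, "declared", ch.queues)
	}
}
```

Keeping declarations as callbacks means a reconnect needs no extra bookkeeping from the caller: whatever was registered once is replayed as-is.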

func (*Client) Disconnect added in v0.3.3

func (c *Client) Disconnect() error

Disconnect tries to gracefully close the current connection using the closeCurConnectCh channel. This is done so that closeCurrentConnection() is called from a single place, the guard connection goroutine. If the disconnect times out or the client is being closed, an error is returned.

func (*Client) Errors

func (c *Client) Errors() <-chan error

Errors returns AMQP connection-level errors. The default buffer size is 100. Errors are dropped if the receiver can't keep up.

func (*Client) Loop

func (c *Client) Loop() bool

Loop should be used as the condition of a `for` loop whose body receives from (*Client).Errors()

It manages the AMQP connection and runs queue and exchange declarations and consumers. It starts returning false once (*Client).Close() has been called.

Example
package main

import (
	"log"
	"time"

	"github.com/vidmed/cony"
)

func main() {
	client := cony.NewClient(cony.URLs([]string{"amqp://guest:guest@localhost/"}))

	for client.Loop() {
		select {
		case err := <-client.Errors():
			log.Println("CLIENT ERROR: ", err)
			client.Close()
		}
		time.Sleep(1 * time.Second) // naive backoff
	}
}

func (*Client) Publish

func (c *Client) Publish(pub *Publisher)

Publish is used to declare publishers

func (*Client) Redeclare added in v0.3.3

func (c *Client) Redeclare() error

type ClientOpt

type ClientOpt func(*Client)

ClientOpt is a Client's functional option type
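The functional option pattern behind `ClientOpt` can be sketched in a few lines. Everything here is a hypothetical miniature (`client`, `clientOpt`, `withAddrs`, `withErrBufSize`, `newClient`), written only to show how defaults are set first and then overridden by each option in order; the two defaults are the ones this page mentions for the URL and the error buffer.

```go
package main

import "fmt"

// client is a hypothetical miniature of a configurable client.
type client struct {
	addrs      []string
	errBufSize int
}

// clientOpt mutates a client during construction.
type clientOpt func(*client)

// withAddrs overrides the broker addresses.
func withAddrs(addrs []string) clientOpt {
	return func(c *client) { c.addrs = addrs }
}

// withErrBufSize overrides the error channel buffer size.
func withErrBufSize(n int) clientOpt {
	return func(c *client) { c.errBufSize = n }
}

// newClient applies defaults, then every option in the order given.
func newClient(opts ...clientOpt) *client {
	c := &client{
		addrs:      []string{"amqp://guest:guest@localhost/"},
		errBufSize: 100,
	}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	def := newClient()
	custom := newClient(
		withAddrs([]string{"amqp://broker-a/", "amqp://broker-b/"}),
		withErrBufSize(10),
	)
	fmt.Println(def.addrs, def.errBufSize)
	fmt.Println(custom.addrs, custom.errBufSize)
}
```

The benefit of this shape is that new settings can be added later without changing the constructor's signature.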

func Backoff

func Backoff(bo Backoffer) ClientOpt

Backoff is a functional option used in the `NewClient` constructor to define the backoff policy

func BlockingChan

func BlockingChan(blockingChan chan amqp.Blocking) ClientOpt

BlockingChan is a functional option, used to initialize blocking reporting channel in client code, maintaining control over buffering, used in `NewClient` constructor

Example
package main

import (
	"github.com/streadway/amqp"
	"github.com/vidmed/cony"
)

func main() {
	blockings := make(chan amqp.Blocking, 100) // define custom buffer size
	cony.NewClient(cony.BlockingChan(blockings))
}

func Config added in v0.3.0

func Config(config amqp.Config) ClientOpt

Config is a functional option used to set up extended AMQP configuration

func ErrorsChan

func ErrorsChan(errChan chan error) ClientOpt

ErrorsChan is a functional option used in the `NewClient` constructor to supply the error reporting channel from client code, which keeps control over the buffer size in the caller's hands. The default buffer size is 100. Errors are dropped if the receiver can't keep up.

Example
package main

import (
	"github.com/vidmed/cony"
)

func main() {
	errors := make(chan error, 100) // define custom buffer size
	cony.NewClient(cony.ErrorsChan(errors))
}

func Log added in v0.3.3

func Log(l Logger) ClientOpt

Log is a functional option, used in `NewClient` constructor to set the logger

func URLs added in v0.3.3

func URLs(addrs []string) ClientOpt

URLs is a functional option used in the `NewClient` constructor to set the broker addresses. The default URL is amqp://guest:guest@localhost/

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer holds definition for AMQP consumer

func NewConsumer

func NewConsumer(q *Queue, opts ...ConsumerOpt) *Consumer

NewConsumer is the Consumer constructor

func (*Consumer) AutoAck added in v0.3.3

func (c *Consumer) AutoAck() bool

func (*Consumer) Deliveries

func (c *Consumer) Deliveries() <-chan *amqp.Delivery

Deliveries returns the channel of deliveries shipped to this consumer. This channel is never closed, even on disconnects.

func (*Consumer) Errors

func (c *Consumer) Errors() <-chan error

Errors returns channel with AMQP channel level errors

func (*Consumer) Exclusive added in v0.3.3

func (c *Consumer) Exclusive() bool

func (*Consumer) NoLocal added in v0.3.3

func (c *Consumer) NoLocal() bool

func (*Consumer) Qos added in v0.3.3

func (c *Consumer) Qos() int

func (*Consumer) Serving added in v0.3.3

func (c *Consumer) Serving() bool

Serving reports whether this consumer is serving at the moment

func (*Consumer) StopCh added in v0.3.3

func (c *Consumer) StopCh() <-chan struct{}

StopCh returns a channel that is closed when the consumer is stopped
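A channel that is closed on stop pairs naturally with a `select` loop, which matters here because the deliveries channel itself is never closed. The sketch below uses plain Go channels rather than a real Consumer; `consume` and the string payloads are hypothetical stand-ins for a worker reading from `Deliveries()` and `StopCh()`.

```go
package main

import "fmt"

// consume handles messages until stop is closed.
// deliveries is never closed, so stop is the only exit signal.
func consume(deliveries <-chan string, stop <-chan struct{}, handle func(string)) {
	for {
		select {
		case msg := <-deliveries:
			handle(msg)
		case <-stop:
			return
		}
	}
}

func main() {
	deliveries := make(chan string) // unbuffered: a send returns once it is received
	stop := make(chan struct{})
	done := make(chan struct{})
	var got []string

	go func() {
		defer close(done)
		consume(deliveries, stop, func(m string) { got = append(got, m) })
	}()

	deliveries <- "first"
	deliveries <- "second"
	close(stop) // plays the role of the consumer being stopped
	<-done      // wait for the worker, so reading got is safe

	fmt.Println(got)
}
```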

func (*Consumer) Stopped added in v0.3.3

func (c *Consumer) Stopped() bool

Stopped reports whether the Consumer has been stopped

func (*Consumer) Tag added in v0.3.3

func (c *Consumer) Tag() string

type ConsumerOpt

type ConsumerOpt func(*Consumer)

ConsumerOpt is a consumer's functional option type

func AutoAck

func AutoAck() ConsumerOpt

AutoAck sets this consumer to AutoAck mode

func AutoTag

func AutoTag() ConsumerOpt

AutoTag sets an automatically generated tag, like this:

fmt.Sprintf(QueueName+"-pid-%d@%s", os.Getpid(), os.Hostname())
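The one-liner above is shorthand, since `os.Hostname()` returns both a string and an error in Go. A runnable version of the same tag format might look like the sketch below; the `autoTag` helper and the `"unknown"` fallback are assumptions for illustration, not the library's code.

```go
package main

import (
	"fmt"
	"os"
)

// autoTag builds a tag in the "<queue>-pid-<pid>@<host>" format described above.
func autoTag(queue string, pid int, host string) string {
	return fmt.Sprintf("%s-pid-%d@%s", queue, pid, host)
}

func main() {
	host, err := os.Hostname()
	if err != nil {
		host = "unknown" // hypothetical fallback when the hostname is unavailable
	}
	fmt.Println(autoTag("jobs", os.Getpid(), host))
}
```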

func ConsumerLog added in v0.3.3

func ConsumerLog(l Logger) ConsumerOpt

ConsumerLog is a functional option, used in `NewConsumer` constructor to set the logger

func Exclusive

func Exclusive() ConsumerOpt

Exclusive sets this consumer to exclusive mode

func NoLocal

func NoLocal() ConsumerOpt

NoLocal sets this consumer to NoLocal mode.

func Qos

func Qos(count int) ConsumerOpt

Qos sets the Qos (prefetch count) on the consumer's channel

func Tag

func Tag(tag string) ConsumerOpt

Tag sets the consumer tag

type Declaration

type Declaration func(Declarer) error

Declaration is a callback type to declare AMQP queue/exchange/binding

func DeclareBinding

func DeclareBinding(b *Binding) Declaration

DeclareBinding is a way to declare AMQP binding between AMQP queue and exchange

func DeclareExchange

func DeclareExchange(e *Exchange) Declaration

DeclareExchange is a way to declare AMQP exchange

func DeclareQueue

func DeclareQueue(q *Queue) Declaration

DeclareQueue is a way to declare AMQP queue

type Declarer

type Declarer interface {
	QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)
	ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error
	QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error
}

Declarer is implemented by *amqp.Channel

type ErrorsBatch added in v0.3.3

type ErrorsBatch struct {
	// contains filtered or unexported fields
}

func NewErrorBatch added in v0.3.3

func NewErrorBatch() *ErrorsBatch

func (*ErrorsBatch) Add added in v0.3.3

func (eb *ErrorsBatch) Add(err error)

func (*ErrorsBatch) CheckEPS added in v0.3.3

func (eb *ErrorsBatch) CheckEPS(k float64) bool

EPS stands for errors per second. CheckEPS takes the wanted coefficient and reports whether the current EPS coefficient of the ErrorsBatch is greater than the given one.

func (*ErrorsBatch) Errors added in v0.3.3

func (eb *ErrorsBatch) Errors() []error

func (*ErrorsBatch) First added in v0.3.3

func (eb *ErrorsBatch) First() *time.Time

func (*ErrorsBatch) GetEPS added in v0.3.3

func (eb *ErrorsBatch) GetEPS() float64

EPS stands for errors per second. GetEPS returns the current EPS coefficient of the ErrorsBatch.

func (*ErrorsBatch) Last added in v0.3.3

func (eb *ErrorsBatch) Last() *time.Time

func (*ErrorsBatch) Len added in v0.3.3

func (eb *ErrorsBatch) Len() int

func (*ErrorsBatch) Reset added in v0.3.3

func (eb *ErrorsBatch) Reset()

func (*ErrorsBatch) Snapshot added in v0.3.3

func (eb *ErrorsBatch) Snapshot() *ErrorsBatchSnapshot

func (*ErrorsBatch) String added in v0.3.3

func (eb *ErrorsBatch) String() string

type ErrorsBatchSnapshot added in v0.3.3

type ErrorsBatchSnapshot struct {
	Errors    []error
	ErrorsLen int
	First     *time.Time
	Last      *time.Time
	LastReset *time.Time
}

type Exchange

type Exchange struct {
	Name       string
	Kind       string
	Durable    bool
	AutoDelete bool
	Internal   bool
	NoWait     bool
	Args       amqp.Table
}

Exchange holds the definition of an AMQP exchange

type Logger added in v0.3.3

type Logger interface {
	// Log a message at the given level with context key/value pairs
	Trace(msg string, ctx ...interface{})
	Debug(msg string, ctx ...interface{})
	Info(msg string, ctx ...interface{})
	Warn(msg string, ctx ...interface{})
	Error(msg string, ctx ...interface{})
	Crit(msg string, ctx ...interface{})
}

A Logger writes key/value pairs to a Handler
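Any type with these six methods can be passed to the `Log`, `ConsumerLog` and `PublisherLog` options. As a rough sketch, here is an adapter on top of the standard `log` package. The `Logger` interface is redeclared locally so the snippet runs on its own; `stdLogger` and `formatLine` are hypothetical names, and rendering the context as `key=value` pairs is an assumption about how the variadic arguments are meant to be read.

```go
package main

import (
	"fmt"
	"log"
	"os"
	"strings"
)

// Logger is redeclared here to mirror the interface documented above.
type Logger interface {
	Trace(msg string, ctx ...interface{})
	Debug(msg string, ctx ...interface{})
	Info(msg string, ctx ...interface{})
	Warn(msg string, ctx ...interface{})
	Error(msg string, ctx ...interface{})
	Crit(msg string, ctx ...interface{})
}

// formatLine renders "LEVEL msg k1=v1 k2=v2".
// A trailing key without a value is ignored.
func formatLine(level, msg string, ctx []interface{}) string {
	var b strings.Builder
	b.WriteString(level + " " + msg)
	for i := 0; i+1 < len(ctx); i += 2 {
		fmt.Fprintf(&b, " %v=%v", ctx[i], ctx[i+1])
	}
	return b.String()
}

// stdLogger is a hypothetical adapter over the standard library logger.
type stdLogger struct {
	out *log.Logger
}

func (s stdLogger) Trace(msg string, ctx ...interface{}) { s.out.Println(formatLine("TRACE", msg, ctx)) }
func (s stdLogger) Debug(msg string, ctx ...interface{}) { s.out.Println(formatLine("DEBUG", msg, ctx)) }
func (s stdLogger) Info(msg string, ctx ...interface{})  { s.out.Println(formatLine("INFO", msg, ctx)) }
func (s stdLogger) Warn(msg string, ctx ...interface{})  { s.out.Println(formatLine("WARN", msg, ctx)) }
func (s stdLogger) Error(msg string, ctx ...interface{}) { s.out.Println(formatLine("ERROR", msg, ctx)) }
func (s stdLogger) Crit(msg string, ctx ...interface{})  { s.out.Println(formatLine("CRIT", msg, ctx)) }

// Compile-time check that stdLogger satisfies Logger.
var _ Logger = stdLogger{}

func main() {
	l := stdLogger{out: log.New(os.Stderr, "cony ", log.LstdFlags)}
	l.Info("connected", "attempt", 3)
	l.Warn("connection blocked", "reason", "low memory")
}
```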

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher holds the definition for AMQP publishing

func NewPublisher

func NewPublisher(exchange string, key string, opts ...PublisherOpt) *Publisher

NewPublisher is a Publisher constructor

func (*Publisher) ConfirmTimeout added in v0.3.3

func (p *Publisher) ConfirmTimeout() time.Duration

func (*Publisher) Errors added in v0.3.3

func (p *Publisher) Errors() <-chan error

Errors returns channel with AMQP channel level errors

func (*Publisher) Exchange added in v0.3.3

func (p *Publisher) Exchange() string

func (*Publisher) Publish

func (p *Publisher) Publish(pub *amqp.Publishing, timeout time.Duration) error

Publish is used to publish a custom amqp.Publishing

WARNING: this is a blocking call; it will not return until a connection is available. The only way to stop it is to use the Cancel() method.

func (*Publisher) PublishWithParams added in v0.3.3

func (p *Publisher) PublishWithParams(pub *amqp.Publishing, exchange string, key string, timeout time.Duration) error

PublishWithParams is used to publish a custom amqp.Publishing to the given exchange with the given routing key

WARNING: this is a blocking call; it will not return until a connection is available. The only way to stop it is to use the Cancel() method.

func (*Publisher) RoutingKey added in v0.3.3

func (p *Publisher) RoutingKey() string

func (*Publisher) Serving added in v0.3.3

func (p *Publisher) Serving() bool

Serving reports whether this publisher is serving at the moment

func (*Publisher) StopCh added in v0.3.3

func (p *Publisher) StopCh() <-chan struct{}

StopCh returns a channel that is closed when the publisher is stopped

func (*Publisher) Stopped added in v0.3.3

func (p *Publisher) Stopped() bool

Stopped reports whether the Publisher has been stopped

func (*Publisher) Write

func (p *Publisher) Write(b []byte, timeout time.Duration) (int, error)

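Write publishes b as the Publishing.Body, using the publisher's template for everything else. The returned int is always len(b).

The original comment says "Implements io.Writer", but with the timeout parameter in the signature above this method does not match io.Writer's Write(p []byte) (n int, err error).

WARNING: this is a blocking call; it will not return until a connection is available. The only way to stop it is to use the Cancel() method.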

type PublisherOpt

type PublisherOpt func(*Publisher)

PublisherOpt is a functional option type for Publisher

func PublisherConfirmTimeout added in v0.3.3

func PublisherConfirmTimeout(t time.Duration) PublisherOpt

PublisherConfirmTimeout is a functional option, used in `NewPublisher` constructor to set the confirm timeout

func PublisherLog added in v0.3.3

func PublisherLog(l Logger) PublisherOpt

PublisherLog is a functional option, used in `NewPublisher` constructor to set the logger

func PublishingTemplate

func PublishingTemplate(t amqp.Publishing) PublisherOpt

PublishingTemplate is a Publisher functional option. It provides a template amqp.Publishing to save typing.

type Queue

type Queue struct {
	Name       string
	Durable    bool
	AutoDelete bool
	Exclusive  bool
	NoWait     bool
	Args       amqp.Table
	// contains filtered or unexported fields
}

Queue holds the definition of an AMQP queue

Directories

Path Synopsis
examples
