cony

v1.0.0

Published: Apr 23, 2017 License: BSD-2-Clause Imports: 8 Imported by: 0

README

Cony

High-level AMQP 0.9.1 client library. It is a wrapper around the low-level streadway/amqp library.

Goals

Provide a way to work with AMQP declaratively

Requirements

The library uses atomic.Value, so Go 1.4+ is needed.

Thread-safety

Cony is thread-safe as long as streadway/amqp is thread-safe. It is recommended to open one AMQP channel per thread, so with cony that means one Consumer/Publisher per goroutine.

License

BSD 2 clause - see LICENSE for more details.

Documentation

Overview

Package cony is a high-level wrapper around the http://github.com/streadway/amqp library for working declaratively with AMQP. Cony manages connecting and reconnecting to the AMQP broker, along with recovery of consumers.

Example
package main

import (
	"log"
	"os"

	"github.com/assembla/cony"
	"github.com/streadway/amqp"
)

func main() {
	client := cony.NewClient(cony.URL(os.Getenv("AMQP_URL")), cony.Backoff(cony.DefaultBackoff))

	q := &cony.Queue{
		Name:       "", // autogenerated queue name
		AutoDelete: true,
	}

	exchange := cony.Exchange{
		Name:    "amq.topic",
		Durable: true,
	}

	b := cony.QueueBinding{
		Queue:    q,
		Exchange: exchange,
		Keys:     []string{"something.#"},
	}

	// wrap all declarations and save them into a slice
	declarations := []cony.Declaration{
		cony.DeclareQueue(q),
		cony.DeclareExchange(exchange),
		cony.DeclareQueueBinding(b),
	}

	// declare consumer; no cony.AutoAck() here because each delivery
	// is acknowledged manually with msg.Ack in the loop below
	consumer := cony.NewConsumer(q,
		cony.Qos(10),
		cony.AutoTag(),
	)

	// declare publisher
	publisher := cony.NewPublisher(exchange.Name,
		"ololo.key",
		cony.PublishingTemplate(amqp.Publishing{
			ContentType: "application/json",
			AppId:       "app1",
		}), // template amqp.Publishing
	)

	// let client know about declarations
	client.Declare(declarations)

	// let client know about consumers/publishers
	client.Consume(consumer)
	client.Publish(publisher)

	clientErrs := client.Errors()
	deliveries := consumer.Deliveries()
	consumerErrs := consumer.Errors()

	// connect, reconnect, or exit the loop
	// runs network operations such as
	// queue, exchange, binding, and consumer declarations
	for client.Loop() {
		select {
		case msg := <-deliveries:
			log.Println(msg)
			msg.Ack(false)
			publisher.Write([]byte("ololo reply"))
		case err := <-consumerErrs:
			log.Println("CONSUMER ERROR: ", err)
		case err := <-clientErrs:
			log.Println("CLIENT ERROR: ", err)
			client.Close()
		}
	}

}

Variables

var (
	// ErrNoConnection indicates that there is currently no connection
	// available
	ErrNoConnection = errors.New("No connection available")
)

var ErrPublisherDead = errors.New("Publisher is dead")

ErrPublisherDead indicates that the publisher was canceled. It can be returned from the Write() and Publish() methods.

Types

type BackoffPolicy

type BackoffPolicy struct {
	// contains filtered or unexported fields
}

BackoffPolicy is the default Backoffer implementation.

func (BackoffPolicy) Backoff

func (b BackoffPolicy) Backoff(n int) time.Duration

Backoff implements Backoffer

type Backoffer

type Backoffer interface {
	Backoff(int) time.Duration
}

Backoffer is the interface that holds a backoff strategy.

var DefaultBackoff Backoffer = BackoffPolicy{
	[]int{0, 10, 100, 200, 500, 1000, 2000, 3000, 5000},
}

DefaultBackoff is the default backoff policy. See: http://blog.gopheracademy.com/advent-2014/backoff/
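
Any type with a Backoff(int) time.Duration method can be passed to the Backoff option. The following is a minimal sketch of a custom policy, not the library's implementation: the fields of BackoffPolicy are unexported, so the table lookup, the millisecond unit, and the name tableBackoff are all assumptions made for illustration. The Backoffer interface is redeclared locally so the example runs without the package.

```go
package main

import (
	"fmt"
	"time"
)

// Backoffer mirrors the interface documented above.
type Backoffer interface {
	Backoff(int) time.Duration
}

// tableBackoff is a hypothetical policy: delays are given in milliseconds
// (an assumption), indexed by attempt number and clamped to the last entry.
type tableBackoff struct {
	millis []int
}

// Backoff returns the delay to wait before attempt n (0-based).
func (t tableBackoff) Backoff(n int) time.Duration {
	if len(t.millis) == 0 {
		return 0
	}
	if n < 0 {
		n = 0
	}
	if n >= len(t.millis) {
		n = len(t.millis) - 1 // keep using the largest delay
	}
	return time.Duration(t.millis[n]) * time.Millisecond
}

func main() {
	var bo Backoffer = tableBackoff{millis: []int{0, 10, 100, 200, 500, 1000, 2000, 3000, 5000}}
	for _, attempt := range []int{0, 1, 4, 20} {
		fmt.Println("attempt", attempt, "wait", bo.Backoff(attempt))
	}
}
```

With the real package, a value like this would presumably be wired in as cony.NewClient(cony.Backoff(myPolicy)).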

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is the main AMQP client wrapper.

func NewClient

func NewClient(opts ...ClientOpt) *Client

NewClient initializes a new Client.

func (*Client) Blocking

func (c *Client) Blocking() <-chan amqp.Blocking

Blocking returns a channel that receives notifications about the server's TCP flow control on the Connection. The default buffer size is 10. Messages are dropped if the receiver can't keep up.

func (*Client) Close

func (c *Client) Close()

Close shuts down the client.

func (*Client) Consume

func (c *Client) Consume(cons *Consumer)

Consume is used to declare consumers.

func (*Client) Declare

func (c *Client) Declare(d []Declaration) error

Declare is used to declare queues, exchanges, and bindings. The declarations are saved and re-run every time the Client gets a connection.
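
To illustrate what "saved and re-run" means, here is a minimal sketch that stores declarations as closures and replays them against each new channel. It is not the library's code: the declarer interface is deliberately reduced to a single method, and fakeChannel, declareQueue, and replay are hypothetical names used only for this example.

```go
package main

import "fmt"

// declarer is a reduced stand-in for the Declarer interface.
type declarer interface {
	QueueDeclare(name string) error
}

// declaration mirrors the shape of the Declaration callback type.
type declaration func(declarer) error

// fakeChannel records what was declared on one simulated connection.
type fakeChannel struct{ declared []string }

func (f *fakeChannel) QueueDeclare(name string) error {
	f.declared = append(f.declared, name)
	return nil
}

// declareQueue wraps a queue name in a declaration closure.
func declareQueue(name string) declaration {
	return func(d declarer) error { return d.QueueDeclare(name) }
}

// replay runs every saved declaration against d, stopping at the first error.
func replay(saved []declaration, d declarer) error {
	for _, decl := range saved {
		if err := decl(d); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	saved := []declaration{declareQueue("jobs"), declareQueue("events")}
	for conn := 1; conn <= 2; conn++ { // two simulated (re)connections
		ch := &fakeChannel{}
		if err := replay(saved, ch); err != nil {
			fmt.Println("declare failed:", err)
			return
		}
		fmt.Println("connection", conn, "declared", ch.declared)
	}
}
```

Because the declarations are plain functions, replaying them after a reconnect needs no extra bookkeeping beyond keeping the slice around.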

func (*Client) Errors

func (c *Client) Errors() <-chan error

Errors returns AMQP connection-level errors. The default buffer size is 100. Messages are dropped if the receiver can't keep up.
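
The "dropped if the receiver can't keep up" behavior is what a non-blocking channel send produces. The sketch below shows that general Go technique; it is an assumption about how such dropping could be done, not a copy of the library's internals, and report is a hypothetical helper.

```go
package main

import (
	"errors"
	"fmt"
)

// report tries to deliver err without blocking and reports whether it was
// queued. When the buffer is full the error is dropped.
func report(ch chan<- error, err error) bool {
	select {
	case ch <- err:
		return true
	default:
		return false
	}
}

func main() {
	errs := make(chan error, 2) // tiny buffer to make the drop visible
	dropped := 0
	for i := 0; i < 5; i++ {
		if !report(errs, errors.New("connection lost")) {
			dropped++
		}
	}
	fmt.Println("queued:", len(errs), "dropped:", dropped)
}
```

A consequence for callers: if every error matters, the error channel should be drained promptly or given a larger buffer.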

func (*Client) Loop

func (c *Client) Loop() bool

Loop should be used as the condition of a `for` loop whose body receives from (*Client).Errors().

It manages the AMQP connection and runs queue and exchange declarations and consumers. It starts returning false once (*Client).Close() has been called.

Example
package main

import (
	"log"
	"time"

	"github.com/assembla/cony"
)

func main() {
	client := cony.NewClient(cony.URL("amqp://guest:guest@localhost/"))

	for client.Loop() {
		select {
		case err := <-client.Errors():
			log.Println("CLIENT ERROR: ", err)
			client.Close()
		}
		time.Sleep(1 * time.Second) // naive backoff
	}
}

func (*Client) Publish

func (c *Client) Publish(pub *Publisher)

Publish is used to declare publishers.

type ClientOpt

type ClientOpt func(*Client)

ClientOpt is a Client's functional option type
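
For readers new to the pattern, here is a minimal sketch of how functional options of this shape typically work. The client struct, its addr field, withURL, and newClient are hypothetical stand-ins; only the default URL value is taken from this documentation.

```go
package main

import "fmt"

// client is a stand-in for the real Client; its field is hypothetical.
type client struct {
	addr string
}

// clientOpt mirrors the shape of ClientOpt: a function that mutates a client.
type clientOpt func(*client)

// withURL returns an option that overrides the broker address.
func withURL(addr string) clientOpt {
	return func(c *client) { c.addr = addr }
}

// newClient applies defaults first, then each option in order.
func newClient(opts ...clientOpt) *client {
	c := &client{addr: "amqp://guest:guest@localhost/"} // documented default URL
	for _, o := range opts {
		o(c)
	}
	return c
}

func main() {
	fmt.Println(newClient().addr)
	fmt.Println(newClient(withURL("amqp://user:pass@broker.example/")).addr)
}
```

Options are applied in the order given, so a later option overrides an earlier one that touches the same setting.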

func Backoff

func Backoff(bo Backoffer) ClientOpt

Backoff is a functional option used to define the backoff policy. It is used in the `NewClient` constructor.

func BlockingChan

func BlockingChan(blockingChan chan amqp.Blocking) ClientOpt

BlockingChan is a functional option used to supply the blocking-notification channel from client code, so that the caller keeps control over buffering. It is used in the `NewClient` constructor.

Example
package main

import (
	"github.com/assembla/cony"
	"github.com/streadway/amqp"
)

func main() {
	blockings := make(chan amqp.Blocking, 100) // define custom buffer size
	cony.NewClient(cony.BlockingChan(blockings))
}

func Config added in v0.3.0

func Config(config amqp.Config) ClientOpt

Config is a functional option used to set up extended AMQP configuration.

func ErrorsChan

func ErrorsChan(errChan chan error) ClientOpt

ErrorsChan is a functional option used to supply the error-reporting channel from client code, so that the caller keeps control over the buffer size. It is used in the `NewClient` constructor. The default buffer size is 100. Messages are dropped if the receiver can't keep up.

Example
package main

import (
	"github.com/assembla/cony"
)

func main() {
	errors := make(chan error, 100) // define custom buffer size
	cony.NewClient(cony.ErrorsChan(errors))
}

func URL

func URL(addr string) ClientOpt

URL is a functional option used in the `NewClient` constructor. The default URL is amqp://guest:guest@localhost/

Example
package main

import (
	"github.com/assembla/cony"
)

func main() {
	cony.NewClient(cony.URL("amqp://guest:guest@localhost/"))
}

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer holds the definition of an AMQP consumer.

func NewConsumer

func NewConsumer(q *Queue, opts ...ConsumerOpt) *Consumer

NewConsumer is the Consumer's constructor.

func (*Consumer) Cancel

func (c *Consumer) Cancel()

Cancel this consumer.

This closes the Deliveries() channel.

func (*Consumer) Deliveries

func (c *Consumer) Deliveries() <-chan amqp.Delivery

Deliveries returns the channel of deliveries shipped to this consumer. This channel is never closed on disconnects; only Cancel() closes it.
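
Since the channel stays open across disconnects and is closed only by Cancel(), a receive loop can treat a closed channel as its signal to stop. The sketch below shows the comma-ok receive inside a select; strings stand in for amqp.Delivery values, and consume is a hypothetical helper, not part of the package.

```go
package main

import "fmt"

// consume handles deliveries until the channel is closed and returns how
// many messages it saw. Strings stand in for amqp.Delivery values.
func consume(deliveries <-chan string, errs <-chan error) int {
	handled := 0
	for {
		select {
		case msg, ok := <-deliveries:
			if !ok {
				return handled // channel closed, e.g. after Cancel()
			}
			fmt.Println("got:", msg)
			handled++
		case err := <-errs:
			fmt.Println("consumer error:", err)
		}
	}
}

func main() {
	deliveries := make(chan string, 2)
	deliveries <- "first"
	deliveries <- "second"
	close(deliveries) // stands in for the effect of Cancel()

	// A nil error channel never becomes ready, so only deliveries are read.
	fmt.Println("handled:", consume(deliveries, nil))
}
```

Without the ok check, a select on a closed channel would keep receiving zero values in a tight loop.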

func (*Consumer) Errors

func (c *Consumer) Errors() <-chan error

Errors returns a channel with AMQP channel-level errors.

type ConsumerOpt

type ConsumerOpt func(*Consumer)

ConsumerOpt is a consumer's functional option type

func AutoAck

func AutoAck() ConsumerOpt

AutoAck sets this consumer to AutoAck mode.

func AutoTag

func AutoTag() ConsumerOpt

AutoTag sets an automatically generated tag of the form:

fmt.Sprintf(QueueName+"-pid-%d@%s", os.Getpid(), os.Hostname())
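
The expression above is shorthand: os.Hostname returns two values, so it cannot be passed directly as a format argument. A runnable sketch of the same tag shape follows. The function name autoTag and the "unknown" fallback host are assumptions for illustration, not the library's behavior.

```go
package main

import (
	"fmt"
	"os"
)

// autoTag builds a consumer tag in the "<queue>-pid-<pid>@<host>" shape.
func autoTag(queueName string) string {
	host, err := os.Hostname()
	if err != nil {
		host = "unknown" // assumption: fallback when the hostname is unavailable
	}
	// The queue name is passed as an argument rather than concatenated into
	// the format string, so a '%' in the name cannot corrupt the output.
	return fmt.Sprintf("%s-pid-%d@%s", queueName, os.Getpid(), host)
}

func main() {
	fmt.Println(autoTag("jobs"))
}
```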

func Exclusive

func Exclusive() ConsumerOpt

Exclusive sets this consumer to exclusive mode.

func NoLocal

func NoLocal() ConsumerOpt

NoLocal sets this consumer to NoLocal mode.

func Qos

func Qos(count int) ConsumerOpt

Qos sets the QoS prefetch count on the consumer's channel.

func Tag

func Tag(tag string) ConsumerOpt

Tag sets the consumer tag.

type Declaration

type Declaration func(Declarer) error

Declaration is a callback type used to declare an AMQP queue, exchange, or binding.

func DeclareExchange

func DeclareExchange(e Exchange) Declaration

DeclareExchange is a way to declare an AMQP exchange.

func DeclareExchangeBinding added in v1.0.0

func DeclareExchangeBinding(b ExchangeBinding) Declaration

func DeclareQueue

func DeclareQueue(q *Queue) Declaration

DeclareQueue is a way to declare an AMQP queue.

func DeclareQueueBinding added in v1.0.0

func DeclareQueueBinding(b QueueBinding) Declaration

DeclareQueueBinding is a way to declare an AMQP binding between an AMQP queue and an exchange.

type Declarer

type Declarer interface {
	QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)
	ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error
	QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error
	ExchangeBind(destination, key, source string, noWait bool, args amqp.Table) error
}

Declarer is implemented by *amqp.Channel

type Exchange

type Exchange struct {
	Name       string
	Kind       string
	Durable    bool
	AutoDelete bool
	Args       amqp.Table
}

Exchange holds the definition of an AMQP exchange.

type ExchangeBinding added in v1.0.0

type ExchangeBinding struct {
	SourceExchange      Exchange
	DestinationExchange Exchange
	Keys                []string
	Args                amqp.Table
}

ExchangeBinding is used to declare a binding from one AMQP Exchange to another.

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher holds the definition for AMQP publishing.

func NewPublisher

func NewPublisher(exchange string, key string, opts ...PublisherOpt) *Publisher

NewPublisher is a Publisher constructor

func (*Publisher) Cancel

func (p *Publisher) Cancel()

Cancel this publisher

func (*Publisher) Publish

func (p *Publisher) Publish(pub amqp.Publishing) error

Publish is used to publish a custom amqp.Publishing.

WARNING: this is a blocking call; it will not return until a connection is available. The only way to stop it is to call the Cancel() method.
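
The blocking behavior and the Cancel() escape hatch can be pictured as a select over two channels. This is a minimal sketch under stated assumptions, not the library's implementation: the publisher struct, its ready and stop channels, and the lowercase method names are all hypothetical. Here ready is never signalled, which simulates an unreachable broker.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errPublisherDead = errors.New("Publisher is dead")

// publisher is a stand-in for the real Publisher.
type publisher struct {
	ready chan struct{} // would be signalled once a connection is available
	stop  chan struct{} // closed by cancel
}

func newPublisher() *publisher {
	return &publisher{ready: make(chan struct{}), stop: make(chan struct{})}
}

// publish blocks until a connection is ready or the publisher is canceled.
func (p *publisher) publish(body []byte) error {
	select {
	case <-p.ready:
		return nil // a real implementation would send body here
	case <-p.stop:
		return errPublisherDead
	}
}

// cancel unblocks all pending and future publish calls. It must be called
// at most once in this sketch, because closing a closed channel panics.
func (p *publisher) cancel() { close(p.stop) }

func main() {
	p := newPublisher()
	done := make(chan error, 1)
	go func() { done <- p.publish([]byte("hello")) }()

	time.AfterFunc(50*time.Millisecond, p.cancel) // give up after a short wait
	fmt.Println(<-done)
}
```

In application code this suggests calling Publish from its own goroutine when the caller cannot afford to wait for a reconnect.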

func (*Publisher) PublishWithRoutingKey added in v0.3.2

func (p *Publisher) PublishWithRoutingKey(pub amqp.Publishing, key string) error

PublishWithRoutingKey is used to publish a custom amqp.Publishing with a custom routing key.

WARNING: this is a blocking call; it will not return until a connection is available. The only way to stop it is to call the Cancel() method.

func (*Publisher) Write

func (p *Publisher) Write(b []byte) (int, error)

Write publishes b using the publishing template, with b set as the Publishing.Body. The returned int is always len(b).

Write implements io.Writer.

WARNING: this is a blocking call; it will not return until a connection is available. The only way to stop it is to call the Cancel() method.
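
Implementing io.Writer means a publisher can be handed to anything that writes bytes, such as a JSON encoder. The sketch below demonstrates the template-plus-body idea with stand-in types: message replaces amqp.Publishing, and templateWriter with its sent slice is hypothetical and only records what would be published.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
)

// message is a stand-in for amqp.Publishing with only two fields.
type message struct {
	ContentType string
	Body        []byte
}

// templateWriter copies its template for each Write and fills in the body.
type templateWriter struct {
	template message
	sent     []message // stands in for the broker
}

// Write records one message per call and always reports len(b).
func (w *templateWriter) Write(b []byte) (int, error) {
	m := w.template
	m.Body = append([]byte(nil), b...) // copy, since callers may reuse b
	w.sent = append(w.sent, m)
	return len(b), nil
}

func main() {
	w := &templateWriter{template: message{ContentType: "application/json"}}
	var out io.Writer = w
	if err := json.NewEncoder(out).Encode(map[string]int{"id": 7}); err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Printf("%s %s", w.sent[0].ContentType, w.sent[0].Body)
}
```

Each Write call becomes one message, so writers that split output across several calls would produce several messages.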

type PublisherOpt

type PublisherOpt func(*Publisher)

PublisherOpt is a functional option type for Publisher

func PublishingTemplate

func PublishingTemplate(t amqp.Publishing) PublisherOpt

PublishingTemplate is a Publisher's functional option. It provides a template amqp.Publishing to save typing.

type Queue

type Queue struct {
	Name       string
	Durable    bool
	AutoDelete bool
	Exclusive  bool
	Args       amqp.Table
	// contains filtered or unexported fields
}

Queue holds the definition of an AMQP queue.

type QueueBinding added in v1.0.0

type QueueBinding struct {
	Queue    *Queue
	Exchange Exchange
	Keys     []string
	Args     amqp.Table
}

QueueBinding is used to declare a binding between an AMQP Queue and an AMQP Exchange.

Directories

Path Synopsis
examples