relay

Version: v1.1.1
Published: Sep 29, 2020 | License: MIT | Imports: 16 | Imported by: 0

README

relay

Relay is a Go framework for task queues, and makes writing code for publishers and consumers very simple. It is a wrapper around the AMQP protocol, and relies on a message broker like RabbitMQ.

The reason Relay exists is that AMQP is a tedious protocol to deal with, and the high level abstraction of a task queue is often something that is desirable. With Relay, you simply Publish objects into task queues, and Consume them on the other end.

Features

  • Simple to use, hides the AMQP details
  • Flexible encoding and decoding support
  • Configuration changes instead of code changes

Documentation

See the online documentation here: http://godoc.org/github.com/armon/relay.

Example

Here is an example of a simple publisher:

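conf := &relay.Config{Addr: "rabbitmq"}
conn, err := relay.New(conf)
if err != nil {
    log.Fatalf("connect: %v", err)
}
defer conn.Close()

pub, err := conn.Publisher("tasks")
if err != nil {
    log.Fatalf("publisher: %v", err)
}
defer pub.Close()

if err := pub.Publish("this is a test message"); err != nil {
    log.Fatalf("publish: %v", err)
}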

Here is an example of a simple consumer:

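conf := &relay.Config{Addr: "rabbitmq"}
conn, err := relay.New(conf)
if err != nil {
    log.Fatalf("connect: %v", err)
}
defer conn.Close()

cons, err := conn.Consumer("tasks")
if err != nil {
    log.Fatalf("consumer: %v", err)
}
defer cons.Close()

var msg string
for {
    // ConsumeAck acknowledges the message as soon as it is received
    if err := cons.ConsumeAck(&msg); err != nil {
        log.Fatalf("consume: %v", err)
    }
    fmt.Printf("Got msg: %s\n", msg)
}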

Here is an example of a consumer using prefetching and multi Acks:

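conf := &relay.Config{Addr: "rabbitmq", PrefetchCount: 5, EnableMultiAck: true}
conn, err := relay.New(conf)
if err != nil {
    log.Fatalf("connect: %v", err)
}
defer conn.Close()

cons, err := conn.Consumer("tasks")
if err != nil {
    log.Fatalf("consumer: %v", err)
}
defer cons.Close()

var msg string
for {
    // Consume 5 messages
    for i := 0; i < 5; i++ {
        if err := cons.Consume(&msg); err != nil {
            log.Fatalf("consume: %v", err)
        }
        fmt.Printf("Got msg: %s\n", msg)
    }

    // Acks the last 5 messages
    if err := cons.Ack(); err != nil {
        log.Fatalf("ack: %v", err)
    }
}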

Documentation

Variables

var ChannelClosed = fmt.Errorf("Channel closed!")

Returned to indicate a closed channel

var TimedOut = fmt.Errorf("Timeout")

Returned to indicate a read timeout

Functions

func IsDecodeFailure

func IsDecodeFailure(err error) bool

IsDecodeFailure is a helper to determine if the error returned is a deserialization error.

Types

type Config

type Config struct {
	Addr                  string        // Host address to dial
	Port                  int           // Host port to dial
	Vhost                 string        // Broker Vhost
	Username              string        // Broker username
	Password              string        // Broker password
	EnableTLS             bool          // Broker TLS connection
	PrefetchCount         int           // How many messages to prefetch. If < 1, defaults to 1.
	EnableMultiAck        bool          // Controls if we allow multi acks
	DisablePublishConfirm bool          // Disables confirmations of publish
	DisablePersistence    bool          // Disables message persistence
	Exchange              string        // Custom exchange. Defaults to "relay"
	ExchangeType          string        // Type of exchange. Defaults to "direct"
	Serializer            Serializer    // Defaults to GOBSerializer
	MessageTTL            time.Duration // Optional, attempts to put a TTL on message life
	QueueTTL              time.Duration // Optional, attempts to make a TTL on a queue life
	WithoutPrefix         bool          // If false (the default), the "relay." prefix is used for legacy support
	AutoDelete            bool          // Optional, deletes the queue after disconnect; always applies to unnamed queues
}

Config is passed into New when creating a Relay to tune various parameters around broker interactions.

func ConfigFromURI

func ConfigFromURI(amqpUri string) (*Config, error)

ConfigFromURI attempts to parse the given AMQP URI according to the spec and return a relay config based on it. See http://www.rabbitmq.com/uri-spec.html.

Default values for the fields are:

Scheme: amqp
Host: localhost
Port: 5672
Username: guest
Password: guest
Vhost: /
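The way these defaults combine with an explicit URI can be sketched with the standard library alone. The sketch below is not the package's implementation: `uriConfig` and `parseAMQPURI` are hypothetical names, the struct only mirrors the connection fields of Config, and the `amqps` handling (TLS on, port 5671) is an assumption taken from the RabbitMQ URI spec rather than from this page. It also simplifies the vhost rule by keeping the default when the path is empty or just "/".

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// uriConfig is a hypothetical stand-in for the connection fields of Config.
type uriConfig struct {
	Addr     string
	Port     int
	Vhost    string
	Username string
	Password string
	TLS      bool
}

// parseAMQPURI starts from the documented defaults and overrides each field
// that the URI sets explicitly.
func parseAMQPURI(raw string) (*uriConfig, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return nil, err
	}
	cfg := &uriConfig{
		Addr:     "localhost",
		Port:     5672,
		Vhost:    "/",
		Username: "guest",
		Password: "guest",
	}
	switch u.Scheme {
	case "amqp":
		// Plain connection, defaults already apply.
	case "amqps":
		// Assumption from the URI spec: TLS with default port 5671.
		cfg.TLS = true
		cfg.Port = 5671
	default:
		return nil, fmt.Errorf("unsupported scheme %q", u.Scheme)
	}
	if host := u.Hostname(); host != "" {
		cfg.Addr = host
	}
	if port := u.Port(); port != "" {
		n, err := strconv.Atoi(port)
		if err != nil {
			return nil, fmt.Errorf("invalid port %q: %v", port, err)
		}
		cfg.Port = n
	}
	if u.User != nil {
		if name := u.User.Username(); name != "" {
			cfg.Username = name
		}
		if pw, ok := u.User.Password(); ok {
			cfg.Password = pw
		}
	}
	// The vhost is the path without its leading slash; url.Parse has
	// already percent-decoded it, so "/%2f" yields the vhost "/".
	if len(u.Path) > 1 {
		cfg.Vhost = u.Path[1:]
	}
	return cfg, nil
}

func main() {
	cfg, err := parseAMQPURI("amqp://broker.example.com/tasks")
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Printf("%+v\n", *cfg)
}
```

Any field the URI leaves out keeps the default listed above, so `amqp://broker.example.com/tasks` still connects as guest/guest on port 5672.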

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer is a type that is used only for consuming messages from a single queue. Multiple Consumers can be multiplexed over a single Relay.

func (*Consumer) Ack

func (c *Consumer) Ack() error

Ack will send an acknowledgement to the server that the last message returned by Consume was processed. If EnableMultiAck is true, then all messages up to the last consumed one will be acknowledged.

func (*Consumer) Close

func (c *Consumer) Close() error

Close will shutdown the Consumer. Any messages that are still in flight will be Nack'ed.

func (*Consumer) Consume

func (c *Consumer) Consume(out interface{}) error

Consume will consume the next available message. The message must be acknowledged with Ack() or Nack() before the next call to Consume unless EnableMultiAck is true.

func (*Consumer) ConsumeAck

func (c *Consumer) ConsumeAck(out interface{}) error

ConsumeAck will consume the next message and acknowledge that the message has been received. This prevents the message from being redelivered, and no call to Ack() or Nack() is needed.

func (*Consumer) ConsumeTimeout

func (c *Consumer) ConsumeTimeout(out interface{}, timeout time.Duration) error

ConsumeTimeout will consume the next available message or time out waiting. The message must be acknowledged with Ack() or Nack() before the next call to Consume unless EnableMultiAck is true.
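A common use of a timed read is draining a queue until it goes quiet. The sketch below assumes that an expired wait is reported with the TimedOut value listed under Variables. So that it runs without a broker, it uses a local stand-in error (`errTimedOut`), a small interface (`timeoutConsumer`) that copies the documented method signatures, and an in-memory `fakeConsumer`. All three names are hypothetical and not part of the package.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTimedOut is a local stand-in for the package's TimedOut variable.
var errTimedOut = errors.New("Timeout")

// timeoutConsumer lists the documented Consumer methods this sketch needs.
type timeoutConsumer interface {
	ConsumeTimeout(out interface{}, timeout time.Duration) error
	Ack() error
}

// drain reads and acknowledges messages until no message arrives within
// idle. A timeout ends the loop normally; any other error is returned.
func drain(c timeoutConsumer, idle time.Duration) ([]string, error) {
	var got []string
	for {
		var msg string
		err := c.ConsumeTimeout(&msg, idle)
		if errors.Is(err, errTimedOut) {
			return got, nil
		}
		if err != nil {
			return got, err
		}
		if err := c.Ack(); err != nil {
			return got, err
		}
		got = append(got, msg)
	}
}

// fakeConsumer is a hypothetical in-memory consumer used only for the demo.
type fakeConsumer struct {
	queue []string
	acked int
}

func (f *fakeConsumer) ConsumeTimeout(out interface{}, _ time.Duration) error {
	if len(f.queue) == 0 {
		return errTimedOut
	}
	*(out.(*string)) = f.queue[0]
	f.queue = f.queue[1:]
	return nil
}

func (f *fakeConsumer) Ack() error {
	f.acked++
	return nil
}

func main() {
	c := &fakeConsumer{queue: []string{"a", "b", "c"}}
	msgs, err := drain(c, 10*time.Millisecond)
	fmt.Println(msgs, c.acked, err)
}
```

With a real Consumer, the comparison would be made against the package's TimedOut variable instead of the local stand-in.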

func (*Consumer) Nack

func (c *Consumer) Nack() error

Nack will send a negative acknowledgement to the server that the last message returned by Consume was not processed and should be redelivered. If EnableMultiAck is true, then all messages up to the last consumed one will be negatively acknowledged.

type GOBSerializer

type GOBSerializer struct{}

GOBSerializer implements the Serializer interface and uses the GOB format.

func (*GOBSerializer) ContentType

func (*GOBSerializer) ContentType() string

func (*GOBSerializer) RelayDecode

func (*GOBSerializer) RelayDecode(r io.Reader, o interface{}) error

func (*GOBSerializer) RelayEncode

func (*GOBSerializer) RelayEncode(w io.Writer, e interface{}) error

type JSONSerializer

type JSONSerializer struct{}

JSONSerializer implements the Serializer interface and uses JSON.

func (*JSONSerializer) ContentType

func (*JSONSerializer) ContentType() string

func (*JSONSerializer) RelayDecode

func (*JSONSerializer) RelayDecode(r io.Reader, o interface{}) error

func (*JSONSerializer) RelayEncode

func (*JSONSerializer) RelayEncode(w io.Writer, e interface{}) error

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher is a type that is used only for publishing messages to a single queue. Multiple Publishers can be multiplexed over a single Relay.

func (*Publisher) Close

func (p *Publisher) Close() error

Close will shut down the Publisher.

func (*Publisher) Publish

func (p *Publisher) Publish(in interface{}) error

Publish will send the message to the server to be consumed.

type Relay

type Relay struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func New

func New(c *Config) (*Relay, error)

New will create a new Relay that can be used to create new publishers or consumers. The caller should no longer modify the configuration once New is invoked, nor should it be shared between multiple relays.

func (*Relay) Broker

func (r *Relay) Broker() broker.Broker

Broker is used to wrap a Relay connection in one that is Broker compatible.

func (*Relay) Close

func (r *Relay) Close() error

Close will shut down the Relay. It is best to Close all Consumers and Publishers first, because closing the Relay closes the underlying connections.

func (*Relay) Consumer

func (r *Relay) Consumer(queue string) (*Consumer, error)

Consumer will return a new handle that can be used to consume messages from a given queue.

func (*Relay) ConsumerWithRoutingKey

func (r *Relay) ConsumerWithRoutingKey(queue string, routingKey string) (*Consumer, error)

ConsumerWithRoutingKey will return a new handle that can be used to consume messages from a given queue and routing key.

func (*Relay) Publisher

func (r *Relay) Publisher(queue string) (*Publisher, error)

Publisher will return a new handle that can be used to publish messages to the given queue.

func (*Relay) PublisherWithRoutingKey

func (r *Relay) PublisherWithRoutingKey(queue string, routingKey string) (*Publisher, error)

PublisherWithRoutingKey will return a new handle that can be used to publish messages to the given queue and routing key.

func (*Relay) RetryBroker

func (r *Relay) RetryBroker(attempts int, min, max time.Duration) (*retryBroker, error)

RetryBroker returns a new retrying broker with the given settings.

type Serializer

type Serializer interface {
	ContentType() string
	RelayEncode(io.Writer, interface{}) error
	RelayDecode(io.Reader, interface{}) error
}

Serializer interface is used to encode and decode messages. If not provided, a default serializer using gob is provided.
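Any type with these three methods can act as a serializer. As an illustration, the sketch below implements an XML variant with the standard library. The `serializer` interface is a local copy of the one above so the example compiles on its own; `xmlSerializer`, `task`, and `roundTrip` are hypothetical names, not part of the package.

```go
package main

import (
	"bytes"
	"encoding/xml"
	"fmt"
	"io"
)

// serializer is a local copy of the Serializer interface documented above.
type serializer interface {
	ContentType() string
	RelayEncode(io.Writer, interface{}) error
	RelayDecode(io.Reader, interface{}) error
}

// xmlSerializer is a hypothetical serializer that encodes messages as XML.
type xmlSerializer struct{}

func (*xmlSerializer) ContentType() string { return "application/xml" }

func (*xmlSerializer) RelayEncode(w io.Writer, e interface{}) error {
	return xml.NewEncoder(w).Encode(e)
}

func (*xmlSerializer) RelayDecode(r io.Reader, o interface{}) error {
	return xml.NewDecoder(r).Decode(o)
}

// Compile-time check that xmlSerializer has all three methods.
var _ serializer = (*xmlSerializer)(nil)

// task is an example message type.
type task struct {
	ID   int
	Name string
}

// roundTrip encodes in with s and decodes the bytes back into out, which
// is what a publisher and a consumer do on either side of the broker.
func roundTrip(s serializer, in, out interface{}) error {
	var buf bytes.Buffer
	if err := s.RelayEncode(&buf, in); err != nil {
		return err
	}
	return s.RelayDecode(&buf, out)
}

func main() {
	in := task{ID: 7, Name: "resize"}
	var out task
	if err := roundTrip(&xmlSerializer{}, &in, &out); err != nil {
		fmt.Println("round trip failed:", err)
		return
	}
	fmt.Printf("%+v\n", out)
}
```

Since the method set matches the interface above, a value such as `&xmlSerializer{}` should be usable as the Serializer field of Config, with both publisher and consumer configured the same way.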
