amqp

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 7, 2017 License: MIT Imports: 17 Imported by: 0

README

pack.ag/amqp

Go Report Card Coverage Status Build Status Build status GoDoc MIT licensed

pack.ag/amqp is an AMQP 1.0 client implementation for Go.

AMQP 1.0 is not compatible with AMQP 0-9-1 or 0-10, which are the most common AMQP protocols in use today.

This project is currently alpha status, though is currently being used by my employer in a pre-production capacity. The current focus is reading from Microsoft Azure's Service Bus. Only receive operations have been implemented. Producer operations will be implemented later.

API is subject to change until 1.0.0. If you choose to use this library, please vendor it.


Example Usage

package mypackage

import (
	"context"
	"fmt"
	"log"

	"pack.ag/amqp"
)

func main() {
	// Create client
	client, err := amqp.Dial("amqps://my-namespace.servicebus.windows.net", amqp.ConnSASLPlain("access-key-name", "access-key"))
	if err != nil {
		log.Fatal("Dialing AMQP server:", err)
	}
	defer client.Close()

	// Open a session
	session, err := client.NewSession()
	if err != nil {
		log.Fatal("Creating AMQP session:", err)
	}

	// Create a receiver
	receiver, err := session.NewReceiver(
		amqp.LinkSource("/queue-name"),
		amqp.LinkCredit(10),
	)
	if err != nil {
		fmt.Println(err)
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	for {
		// Receive next message
		msg, err := receiver.Receive(ctx)
		if err != nil {
			log.Fatal("Reading message from AMQP:", err)
		}

		// Accept message
		msg.Accept()

		fmt.Printf("Message received: %s\n", msg.Data)
	}
}

Notable Bugs/Shortcomings
  • Closing a sessions does not send an end performative.
  • Testing is lacking. Only fuzz testing is currently being performed.
Features - Short Term
  • Set sender filters to support Azure Event Hubs.
Features - Medium Term
  • Support message producer operations.
Other Notes

By default, this package depends only on the standard library. Building with the pkgerrors tag will cause errors to be created/wrapped by the github.com/pkg/errors library. This can be useful for debugging and when used in a project using github.com/pkg/errors.

Documentation

Overview

Package amqp provides an AMQP 1.0 client implementation.

AMQP 1.0 is not compatible with AMQP 0-9-1 or 0-10, which are the most common AMQP protocols in use today.

The example below shows how to use this package to connect to a Microsoft Azure Service Bus queue.

Example
package main

import (
	"context"
	"fmt"
	"log"

	"pack.ag/amqp"
)

func main() {
	// Create client
	client, err := amqp.Dial("amqps://my-namespace.servicebus.windows.net", amqp.ConnSASLPlain("access-key-name", "access-key"))
	if err != nil {
		log.Fatal("Dialing AMQP server:", err)
	}
	defer client.Close()

	// Open a session
	session, err := client.NewSession()
	if err != nil {
		log.Fatal("Creating AMQP session:", err)
	}

	// Create a receiver
	receiver, err := session.NewReceiver(
		amqp.LinkSource("/queue-name"),
		amqp.LinkCredit(10),
	)
	if err != nil {
		fmt.Println(err)
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	for {
		// Receive next message
		msg, err := receiver.Receive(ctx)
		if err != nil {
			log.Fatal("Reading message from AMQP:", err)
		}

		// Accept message
		msg.Accept()

		fmt.Printf("Message received: %s\n", msg.Data)
	}
}
Output:

Index

Examples

Constants

View Source
const (
	DefaultLinkCredit      = 1
	DefaultLinkBatching    = true
	DefaultLinkBatchMaxAge = 5 * time.Second
)

Default link options

View Source
const (
	DefaultMaxFrameSize = 512
	DefaultIdleTimeout  = 1 * time.Minute
)

Default connection options

Variables

View Source
var (
	ErrTimeout = errorNew("timeout waiting for response")
)

Errors

Functions

This section is empty.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is an AMQP client connection.

func Dial

func Dial(addr string, opts ...ConnOption) (*Client, error)

Dial connects to an AMQP server.

If the addr includes a scheme, it must be "amqp" or "amqps". TLS will be negotiated when the scheme is "amqps".

If no port is provided, 5672 will be used.

func New

func New(conn net.Conn, opts ...ConnOption) (*Client, error)

New establishes an AMQP client connection over conn.

func (*Client) Close

func (c *Client) Close() error

Close disconnects the connection.

func (*Client) NewSession

func (c *Client) NewSession() (*Session, error)

NewSession opens a new AMQP session to the server.

type ConnOption

type ConnOption func(*conn) error

ConnOption is an function for configuring an AMQP connection.

func ConnConnectTimeout

func ConnConnectTimeout(d time.Duration) ConnOption

ConnConnectTimeout configures how long to wait for the server during connection establishment.

Once the connection has been established, ConnIdleTimeout applies. If duration is zero, no timeout will be applied.

Default: 0.

func ConnIdleTimeout

func ConnIdleTimeout(d time.Duration) ConnOption

ConnIdleTimeout specifies the maximum period between receiving frames from the peer.

Resolution is milliseconds. A value of zero indicates no timeout. This setting is in addition to TCP keepalives.

Default: 1 minute.

func ConnMaxFrameSize

func ConnMaxFrameSize(n uint32) ConnOption

ConnMaxFrameSize sets the maximum frame size that the connection will accept.

Must be 512 or greater.

Default: 512.

func ConnSASLPlain

func ConnSASLPlain(username, password string) ConnOption

ConnSASLPlain enables SASL PLAIN authentication for the connection.

SASL PLAIN transmits credentials in plain text and should only be used on TLS/SSL enabled connection.

func ConnServerHostname

func ConnServerHostname(hostname string) ConnOption

ConnServerHostname sets the hostname sent in the AMQP Open frame and TLS ServerName (if not otherwise set).

This is useful when the AMQP connection will be established via a pre-established TLS connection as the server may not know which hostname the client is attempting to connect to.

func ConnTLS

func ConnTLS(enable bool) ConnOption

ConnTLS toggles TLS negotiation.

Default: false.

func ConnTLSConfig

func ConnTLSConfig(tc *tls.Config) ConnOption

ConnTLSConfig sets the tls.Config to be used during TLS negotiation.

This option is for advanced usage, in most scenarios providing a URL scheme of "amqps://" or ConnTLS(true) is sufficient.

type ErrDetach

type ErrDetach struct {
	RemoteError *Error
}

ErrDetach is returned by a link (Receiver) when a detach frame is received.

RemoteError will be nil if the link was detached gracefully.

func (ErrDetach) Error

func (e ErrDetach) Error() string

type Error

type Error struct {

	// A symbolic value indicating the error condition.
	Condition ErrorCondition

	// descriptive text about the error condition
	//
	// This text supplies any supplementary details not indicated by the condition field.
	// This text can be logged as an aid to resolving issues.
	Description string

	// map carrying information about the error condition
	Info map[string]interface{}
}

Error is an AMQP error.

type ErrorCondition

type ErrorCondition string

ErrorCondition is one of the error conditions defined in the AMQP spec.

const (
	// AMQP Errors
	ErrorInternalError         ErrorCondition = "amqp:internal-error"
	ErrorNotFound              ErrorCondition = "amqp:not-found"
	ErrorUnauthorizedAccess    ErrorCondition = "amqp:unauthorized-access"
	ErrorDecodeError           ErrorCondition = "amqp:decode-error"
	ErrorResourceLimitExceeded ErrorCondition = "amqp:resource-limit-exceeded"
	ErrorNotAllowed            ErrorCondition = "amqp:not-allowed"
	ErrorInvalidField          ErrorCondition = "amqp:invalid-field"
	ErrorNotImplemented        ErrorCondition = "amqp:not-implemented"
	ErrorResourceLocked        ErrorCondition = "amqp:resource-locked"
	ErrorPreconditionFailed    ErrorCondition = "amqp:precondition-failed"
	ErrorResourceDeleted       ErrorCondition = "amqp:resource-deleted"
	ErrorIllegalState          ErrorCondition = "amqp:illegal-state"
	ErrorFrameSizeTooSmall     ErrorCondition = "amqp:frame-size-too-small"

	// Connection Errors
	ErrorConnectionForced   ErrorCondition = "amqp:connection:forced"
	ErrorFramingError       ErrorCondition = "amqp:connection:framing-error"
	ErrorConnectionRedirect ErrorCondition = "amqp:connection:redirect"

	// Session Errors
	ErrorWindowViolation  ErrorCondition = "amqp:session:window-violation"
	ErrorErrantLink       ErrorCondition = "amqp:session:errant-link"
	ErrorHandleInUse      ErrorCondition = "amqp:session:handle-in-use"
	ErrorUnattachedHandle ErrorCondition = "amqp:session:unattached-handle"

	// Link Errors
	ErrorDetachForced          ErrorCondition = "amqp:link:detach-forced"
	ErrorTransferLimitExceeded ErrorCondition = "amqp:link:transfer-limit-exceeded"
	ErrorMessageSizeExceeded   ErrorCondition = "amqp:link:message-size-exceeded"
	ErrorLinkRedirect          ErrorCondition = "amqp:link:redirect"
	ErrorStolen                ErrorCondition = "amqp:link:stolen"
)

Error Conditions

type LinkOption

type LinkOption func(*link) error

LinkOption is an function for configuring an AMQP links.

A link may be a Sender or a Receiver. Only Receiver is currently implemented.

func LinkBatchMaxAge

func LinkBatchMaxAge(d time.Duration) LinkOption

LinkBatchMaxAge sets the maximum time between the start of a disposition batch and sending the batch to the server.

func LinkBatching

func LinkBatching(enable bool) LinkOption

LinkBatching toggles batching of message disposition.

When enabled, accepting a message does not send the disposition to the server until the batch is equal to link credit or the batch max age expires.

func LinkCredit

func LinkCredit(credit uint32) LinkOption

LinkCredit specifies the maximum number of unacknowledged messages the sender can transmit.

func LinkSource

func LinkSource(source string) LinkOption

LinkSource sets the source address.

type Message

type Message struct {
	// The header section carries standard delivery details about the transfer
	// of a message through the AMQP network.
	Header MessageHeader

	// The delivery-annotations section is used for delivery-specific non-standard
	// properties at the head of the message. Delivery annotations convey information
	// from the sending peer to the receiving peer.
	DeliveryAnnotations map[interface{}]interface{}

	// The message-annotations section is used for properties of the message which
	// are aimed at the infrastructure.
	Annotations map[interface{}]interface{}

	// The properties section is used for a defined set of standard properties of
	// the message.
	Properties MessageProperties

	// The application-properties section is a part of the bare message used for
	// structured application data. Intermediaries can use the data within this
	// structure for the purposes of filtering or routing.
	ApplicationProperties map[string]interface{}

	// Message payload.
	Data []byte

	// The footer section is used for details about the message or delivery which
	// can only be calculated or evaluated once the whole bare message has been
	// constructed or seen (for example message hashes, HMACs, signatures and
	// encryption details).
	Footer map[interface{}]interface{}
	// contains filtered or unexported fields
}

Message is an AMQP message.

func (*Message) Accept

func (m *Message) Accept()

Accept notifies the server that the message has been accepted and does not require redelivery.

func (*Message) Reject

func (m *Message) Reject()

Reject notifies the server that the message is invalid.

func (*Message) Release

func (m *Message) Release()

Release releases the message back to the server. The message may be redelivered to this or another consumer.

type MessageHeader

type MessageHeader struct {
	Durable       bool
	Priority      uint8
	TTL           time.Duration // from milliseconds
	FirstAcquirer bool
	DeliveryCount uint32
}

MessageHeader carries standard delivery details about the transfer of a message.

type MessageProperties

type MessageProperties struct {
	MessageID          interface{} // uint64, UUID, []byte, or string
	UserID             []byte
	To                 string
	Subject            string
	ReplyTo            string
	CorrelationID      interface{} // uint64, UUID, []byte, or string
	ContentType        string
	ContentEncoding    string
	AbsoluteExpiryTime time.Time
	CreationTime       time.Time
	GroupID            string
	GroupSequence      uint32 // RFC-1982 sequence number
	ReplyToGroupID     string
}

MessageProperties is the defined set of properties for AMQP messages.

type Receiver

type Receiver struct {
	// contains filtered or unexported fields
}

Receiver receives messages on a single AMQP link.

func (*Receiver) Close

func (r *Receiver) Close() error

Close closes the Receiver and AMQP link.

func (*Receiver) Receive

func (r *Receiver) Receive(ctx context.Context) (*Message, error)

Receive returns the next message from the sender.

Blocks until a message is received, ctx completes, or an error occurs.

type Session

type Session struct {
	// contains filtered or unexported fields
}

Session is an AMQP session.

A session multiplexes Receivers.

func (*Session) Close

func (s *Session) Close() error

Close closes the session.

func (*Session) NewReceiver

func (s *Session) NewReceiver(opts ...LinkOption) (*Receiver, error)

NewReceiver opens a new receiver link on the session.

Directories

Path Synopsis
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL