Documentation ¶
Overview ¶
Package amqp provides an AMQP 1.0 client implementation.
AMQP 1.0 is not compatible with AMQP 0-9-1 or 0-10, which are the most common AMQP protocols in use today.
The example below shows how to use this package to connect to a Microsoft Azure Service Bus queue.
Example ¶
package main import ( "context" "fmt" "log" "pack.ag/amqp" ) func main() { // Create client client, err := amqp.Dial("amqps://my-namespace.servicebus.windows.net", amqp.ConnSASLPlain("access-key-name", "access-key")) if err != nil { log.Fatal("Dialing AMQP server:", err) } defer client.Close() // Open a session session, err := client.NewSession() if err != nil { log.Fatal("Creating AMQP session:", err) } // Create a receiver receiver, err := session.NewReceiver( amqp.LinkSource("/queue-name"), amqp.LinkCredit(10), ) if err != nil { fmt.Println(err) } ctx, cancel := context.WithCancel(context.Background()) defer cancel() for { // Receive next message msg, err := receiver.Receive(ctx) if err != nil { log.Fatal("Reading message from AMQP:", err) } // Accept message msg.Accept() fmt.Printf("Message received: %s\n", msg.Data) } }
Output:
Index ¶
- Constants
- Variables
- type Client
- type ConnOption
- func ConnConnectTimeout(d time.Duration) ConnOption
- func ConnIdleTimeout(d time.Duration) ConnOption
- func ConnMaxFrameSize(n uint32) ConnOption
- func ConnSASLPlain(username, password string) ConnOption
- func ConnServerHostname(hostname string) ConnOption
- func ConnTLS(enable bool) ConnOption
- func ConnTLSConfig(tc *tls.Config) ConnOption
- type ErrDetach
- type Error
- type ErrorCondition
- type LinkOption
- type Message
- type MessageHeader
- type MessageProperties
- type Receiver
- type Session
Examples ¶
Constants ¶
const ( DefaultLinkCredit = 1 DefaultLinkBatching = true DefaultLinkBatchMaxAge = 5 * time.Second )
Default link options
const ( DefaultMaxFrameSize = 512 DefaultIdleTimeout = 1 * time.Minute )
Default connection options
Variables ¶
var (
ErrTimeout = errorNew("timeout waiting for response")
)
Errors
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is an AMQP client connection.
func Dial ¶
func Dial(addr string, opts ...ConnOption) (*Client, error)
Dial connects to an AMQP server.
If the addr includes a scheme, it must be "amqp" or "amqps". TLS will be negotiated when the scheme is "amqps".
If no port is provided, 5672 will be used.
func New ¶
func New(conn net.Conn, opts ...ConnOption) (*Client, error)
New establishes an AMQP client connection over conn.
func (*Client) NewSession ¶
NewSession opens a new AMQP session to the server.
type ConnOption ¶
type ConnOption func(*conn) error
ConnOption is an function for configuring an AMQP connection.
func ConnConnectTimeout ¶
func ConnConnectTimeout(d time.Duration) ConnOption
ConnConnectTimeout configures how long to wait for the server during connection establishment.
Once the connection has been established, ConnIdleTimeout applies. If duration is zero, no timeout will be applied.
Default: 0.
func ConnIdleTimeout ¶
func ConnIdleTimeout(d time.Duration) ConnOption
ConnIdleTimeout specifies the maximum period between receiving frames from the peer.
Resolution is milliseconds. A value of zero indicates no timeout. This setting is in addition to TCP keepalives.
Default: 1 minute.
func ConnMaxFrameSize ¶
func ConnMaxFrameSize(n uint32) ConnOption
ConnMaxFrameSize sets the maximum frame size that the connection will accept.
Must be 512 or greater.
Default: 512.
func ConnSASLPlain ¶
func ConnSASLPlain(username, password string) ConnOption
ConnSASLPlain enables SASL PLAIN authentication for the connection.
SASL PLAIN transmits credentials in plain text and should only be used on TLS/SSL enabled connection.
func ConnServerHostname ¶
func ConnServerHostname(hostname string) ConnOption
ConnServerHostname sets the hostname sent in the AMQP Open frame and TLS ServerName (if not otherwise set).
This is useful when the AMQP connection will be established via a pre-established TLS connection as the server may not know which hostname the client is attempting to connect to.
func ConnTLS ¶
func ConnTLS(enable bool) ConnOption
ConnTLS toggles TLS negotiation.
Default: false.
func ConnTLSConfig ¶
func ConnTLSConfig(tc *tls.Config) ConnOption
ConnTLSConfig sets the tls.Config to be used during TLS negotiation.
This option is for advanced usage, in most scenarios providing a URL scheme of "amqps://" or ConnTLS(true) is sufficient.
type ErrDetach ¶
type ErrDetach struct {
RemoteError *Error
}
ErrDetach is returned by a link (Receiver) when a detach frame is received.
RemoteError will be nil if the link was detached gracefully.
type Error ¶
type Error struct { // A symbolic value indicating the error condition. Condition ErrorCondition // descriptive text about the error condition // // This text supplies any supplementary details not indicated by the condition field. // This text can be logged as an aid to resolving issues. Description string // map carrying information about the error condition Info map[string]interface{} }
Error is an AMQP error.
type ErrorCondition ¶
type ErrorCondition string
ErrorCondition is one of the error conditions defined in the AMQP spec.
const ( // AMQP Errors ErrorInternalError ErrorCondition = "amqp:internal-error" ErrorNotFound ErrorCondition = "amqp:not-found" ErrorDecodeError ErrorCondition = "amqp:decode-error" ErrorResourceLimitExceeded ErrorCondition = "amqp:resource-limit-exceeded" ErrorNotAllowed ErrorCondition = "amqp:not-allowed" ErrorInvalidField ErrorCondition = "amqp:invalid-field" ErrorNotImplemented ErrorCondition = "amqp:not-implemented" ErrorResourceLocked ErrorCondition = "amqp:resource-locked" ErrorPreconditionFailed ErrorCondition = "amqp:precondition-failed" ErrorResourceDeleted ErrorCondition = "amqp:resource-deleted" ErrorIllegalState ErrorCondition = "amqp:illegal-state" ErrorFrameSizeTooSmall ErrorCondition = "amqp:frame-size-too-small" // Connection Errors ErrorConnectionForced ErrorCondition = "amqp:connection:forced" ErrorFramingError ErrorCondition = "amqp:connection:framing-error" ErrorConnectionRedirect ErrorCondition = "amqp:connection:redirect" // Session Errors ErrorWindowViolation ErrorCondition = "amqp:session:window-violation" ErrorErrantLink ErrorCondition = "amqp:session:errant-link" ErrorHandleInUse ErrorCondition = "amqp:session:handle-in-use" ErrorUnattachedHandle ErrorCondition = "amqp:session:unattached-handle" // Link Errors ErrorDetachForced ErrorCondition = "amqp:link:detach-forced" ErrorTransferLimitExceeded ErrorCondition = "amqp:link:transfer-limit-exceeded" ErrorMessageSizeExceeded ErrorCondition = "amqp:link:message-size-exceeded" ErrorLinkRedirect ErrorCondition = "amqp:link:redirect" ErrorStolen ErrorCondition = "amqp:link:stolen" )
Error Conditions
type LinkOption ¶
type LinkOption func(*link) error
LinkOption is an function for configuring an AMQP links.
A link may be a Sender or a Receiver. Only Receiver is currently implemented.
func LinkBatchMaxAge ¶
func LinkBatchMaxAge(d time.Duration) LinkOption
LinkBatchMaxAge sets the maximum time between the start of a disposition batch and sending the batch to the server.
func LinkBatching ¶
func LinkBatching(enable bool) LinkOption
LinkBatching toggles batching of message disposition.
When enabled, accepting a message does not send the disposition to the server until the batch is equal to link credit or the batch max age expires.
func LinkCredit ¶
func LinkCredit(credit uint32) LinkOption
LinkCredit specifies the maximum number of unacknowledged messages the sender can transmit.
type Message ¶
type Message struct { // The header section carries standard delivery details about the transfer // of a message through the AMQP network. Header MessageHeader // The delivery-annotations section is used for delivery-specific non-standard // properties at the head of the message. Delivery annotations convey information // from the sending peer to the receiving peer. DeliveryAnnotations map[interface{}]interface{} // The message-annotations section is used for properties of the message which // are aimed at the infrastructure. Annotations map[interface{}]interface{} // The properties section is used for a defined set of standard properties of // the message. Properties MessageProperties // The application-properties section is a part of the bare message used for // structured application data. Intermediaries can use the data within this // structure for the purposes of filtering or routing. ApplicationProperties map[string]interface{} // Message payload. Data []byte // can only be calculated or evaluated once the whole bare message has been // constructed or seen (for example message hashes, HMACs, signatures and // encryption details). Footer map[interface{}]interface{} // contains filtered or unexported fields }
Message is an AMQP message.
func (*Message) Accept ¶
func (m *Message) Accept()
Accept notifies the server that the message has been accepted and does not require redelivery.
type MessageHeader ¶
type MessageHeader struct { Durable bool Priority uint8 TTL time.Duration // from milliseconds FirstAcquirer bool DeliveryCount uint32 }
MessageHeader carries standard delivery details about the transfer of a message.
type MessageProperties ¶
type MessageProperties struct { MessageID interface{} // uint64, UUID, []byte, or string UserID []byte To string Subject string ReplyTo string CorrelationID interface{} // uint64, UUID, []byte, or string ContentType string ContentEncoding string AbsoluteExpiryTime time.Time CreationTime time.Time GroupID string GroupSequence uint32 // RFC-1982 sequence number ReplyToGroupID string }
MessageProperties is the defined set of properties for AMQP messages.
type Receiver ¶
type Receiver struct {
// contains filtered or unexported fields
}
Receiver receives messages on a single AMQP link.
type Session ¶
type Session struct {
// contains filtered or unexported fields
}
Session is an AMQP session.
A session multiplexes Receivers.
func (*Session) NewReceiver ¶
func (s *Session) NewReceiver(opts ...LinkOption) (*Receiver, error)
NewReceiver opens a new receiver link on the session.