Documentation ¶
Overview ¶
Package amqp provides an AMQP 1.0 client implementation.
AMQP 1.0 is not compatible with AMQP 0-9-1 or 0-10, which are the most common AMQP protocols in use today.
The example below shows how to use this package to connect to a Microsoft Azure Service Bus queue.
Example ¶
package main import ( "context" "fmt" "log" "time" "pack.ag/amqp" ) func main() { // Create client client, err := amqp.Dial("amqps://my-namespace.servicebus.windows.net", amqp.ConnSASLPlain("access-key-name", "access-key"), ) if err != nil { log.Fatal("Dialing AMQP server:", err) } defer client.Close() // Open a session session, err := client.NewSession() if err != nil { log.Fatal("Creating AMQP session:", err) } ctx := context.Background() // Send a message { // Create a sender sender, err := session.NewSender( amqp.LinkTargetAddress("/queue-name"), ) if err != nil { log.Fatal("Creating sender link:", err) } ctx, cancel := context.WithTimeout(ctx, 5*time.Second) // Send message err = sender.Send(ctx, amqp.NewMessage([]byte("Hello!"))) if err != nil { log.Fatal("Sending message:", err) } sender.Close(ctx) cancel() } // Continuously read messages { // Create a receiver receiver, err := session.NewReceiver( amqp.LinkSourceAddress("/queue-name"), amqp.LinkCredit(10), ) if err != nil { log.Fatal("Creating receiver link:", err) } defer func() { ctx, cancel := context.WithTimeout(ctx, 1*time.Second) receiver.Close(ctx) cancel() }() for { // Receive next message msg, err := receiver.Receive(ctx) if err != nil { log.Fatal("Reading message from AMQP:", err) } // Accept message msg.Accept() fmt.Printf("Message received: %s\n", msg.GetData()) } } }
Output:
Index ¶
- Constants
- Variables
- type Annotations
- type ArrayUByte
- type Client
- type ConnOption
- func ConnConnectTimeout(d time.Duration) ConnOption
- func ConnIdleTimeout(d time.Duration) ConnOption
- func ConnMaxFrameSize(n uint32) ConnOption
- func ConnMaxSessions(n int) ConnOption
- func ConnProperty(key, value string) ConnOption
- func ConnSASLAnonymous() ConnOption
- func ConnSASLPlain(username, password string) ConnOption
- func ConnServerHostname(hostname string) ConnOption
- func ConnTLS(enable bool) ConnOption
- func ConnTLSConfig(tc *tls.Config) ConnOption
- type DetachError
- type Error
- type ErrorCondition
- type LinkOption
- func LinkAddress(source string) LinkOption (deprecated)
- func LinkAddressDynamic() LinkOption
- func LinkBatchMaxAge(d time.Duration) LinkOption
- func LinkBatching(enable bool) LinkOption
- func LinkCredit(credit uint32) LinkOption
- func LinkMaxMessageSize(size uint64) LinkOption
- func LinkProperty(key, value string) LinkOption
- func LinkPropertyInt64(key string, value int64) LinkOption
- func LinkReceiverSettle(mode ReceiverSettleMode) LinkOption
- func LinkSelectorFilter(filter string) LinkOption
- func LinkSenderSettle(mode SenderSettleMode) LinkOption
- func LinkSessionFilter(sessionID string) LinkOption
- func LinkSourceAddress(addr string) LinkOption
- func LinkTargetAddress(addr string) LinkOption
- type Message
- type MessageHeader
- type MessageProperties
- type Receiver
- type ReceiverSettleMode
- type Sender
- type SenderSettleMode
- type Session
- type SessionOption
- type UUID
Examples ¶
Constants ¶
const ( DefaultMaxLinks = 4294967296 DefaultWindow = 100 )
Default session options
const ( DefaultLinkCredit = 1 DefaultLinkBatching = true DefaultLinkBatchMaxAge = 5 * time.Second )
Default link options
const ( DefaultIdleTimeout = 1 * time.Minute DefaultMaxFrameSize = 512 DefaultMaxSessions = 65536 )
Default connection options
Variables ¶
var ( // ErrSessionClosed is propagated to Sender/Receivers // when Session.Close() is called. ErrSessionClosed = errors.New("amqp: session closed") // ErrLinkClosed returned by send and receive operations when // Sender.Close() or Receiver.Close() are called. ErrLinkClosed = errors.New("amqp: link closed") )
var ( ErrTimeout = errors.New("amqp: timeout waiting for response") // ErrConnClosed is propagated to Session and Senders/Receivers // when Client.Close() is called. ErrConnClosed = errors.New("amqp: connection closed") )
Errors
Functions ¶
This section is empty.
Types ¶
type Annotations ¶ added in v0.4.0
type Annotations map[interface{}]interface{}
Annotations keys must be of type string, int, or int64.
String keys are encoded as AMQP Symbols.
type ArrayUByte ¶ added in v0.4.0
type ArrayUByte []uint8
ArrayUByte allows encoding []uint8/[]byte as an array rather than binary data.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is an AMQP client connection.
func Dial ¶
func Dial(addr string, opts ...ConnOption) (*Client, error)
Dial connects to an AMQP server.
If the addr includes a scheme, it must be "amqp" or "amqps". If no port is provided, 5672 will be used for "amqp" and 5671 for "amqps".
If addr contains username and password information, it is used as SASL PLAIN credentials, equivalent to passing the ConnSASLPlain option.
func New ¶
func New(conn net.Conn, opts ...ConnOption) (*Client, error)
New establishes an AMQP client connection over conn.
func (*Client) NewSession ¶
func (c *Client) NewSession(opts ...SessionOption) (*Session, error)
NewSession opens a new AMQP session to the server.
type ConnOption ¶
type ConnOption func(*conn) error
ConnOption is a function for configuring an AMQP connection.
func ConnConnectTimeout ¶
func ConnConnectTimeout(d time.Duration) ConnOption
ConnConnectTimeout configures how long to wait for the server during connection establishment.
Once the connection has been established, ConnIdleTimeout applies. If duration is zero, no timeout will be applied.
Default: 0.
func ConnIdleTimeout ¶
func ConnIdleTimeout(d time.Duration) ConnOption
ConnIdleTimeout specifies the maximum period between receiving frames from the peer.
Resolution is milliseconds. A value of zero indicates no timeout. This setting is in addition to TCP keepalives.
Default: 1 minute.
func ConnMaxFrameSize ¶
func ConnMaxFrameSize(n uint32) ConnOption
ConnMaxFrameSize sets the maximum frame size that the connection will accept.
Must be 512 or greater.
Default: 512.
func ConnMaxSessions ¶ added in v0.4.0
func ConnMaxSessions(n int) ConnOption
ConnMaxSessions sets the maximum number of channels.
n must be in the range 1 to 65536.
Default: 65536.
func ConnProperty ¶ added in v0.4.0
func ConnProperty(key, value string) ConnOption
ConnProperty sets an entry in the connection properties map sent to the server.
This option can be used multiple times.
func ConnSASLAnonymous ¶ added in v0.2.0
func ConnSASLAnonymous() ConnOption
ConnSASLAnonymous enables SASL ANONYMOUS authentication for the connection.
func ConnSASLPlain ¶
func ConnSASLPlain(username, password string) ConnOption
ConnSASLPlain enables SASL PLAIN authentication for the connection.
SASL PLAIN transmits credentials in plain text and should only be used on TLS/SSL-enabled connections.
func ConnServerHostname ¶
func ConnServerHostname(hostname string) ConnOption
ConnServerHostname sets the hostname sent in the AMQP Open frame and TLS ServerName (if not otherwise set).
This is useful when the AMQP connection will be established via a pre-established TLS connection as the server may not know which hostname the client is attempting to connect to.
func ConnTLS ¶
func ConnTLS(enable bool) ConnOption
ConnTLS toggles TLS negotiation.
Default: false.
func ConnTLSConfig ¶
func ConnTLSConfig(tc *tls.Config) ConnOption
ConnTLSConfig sets the tls.Config to be used during TLS negotiation.
This option is for advanced usage; in most scenarios, providing a URL scheme of "amqps://" or ConnTLS(true) is sufficient.
type DetachError ¶ added in v0.2.0
type DetachError struct {
RemoteError *Error
}
DetachError is returned by a link (Receiver/Sender) when a detach frame is received.
RemoteError will be nil if the link was detached gracefully.
func (DetachError) Error ¶ added in v0.2.0
func (e DetachError) Error() string
type Error ¶
type Error struct { // A symbolic value indicating the error condition. Condition ErrorCondition // descriptive text about the error condition // // This text supplies any supplementary details not indicated by the condition field. // This text can be logged as an aid to resolving issues. Description string // map carrying information about the error condition Info map[string]interface{} }
Error is an AMQP error.
type ErrorCondition ¶
type ErrorCondition string
ErrorCondition is one of the error conditions defined in the AMQP spec.
const ( // AMQP Errors ErrorInternalError ErrorCondition = "amqp:internal-error" ErrorNotFound ErrorCondition = "amqp:not-found" ErrorDecodeError ErrorCondition = "amqp:decode-error" ErrorResourceLimitExceeded ErrorCondition = "amqp:resource-limit-exceeded" ErrorNotAllowed ErrorCondition = "amqp:not-allowed" ErrorInvalidField ErrorCondition = "amqp:invalid-field" ErrorNotImplemented ErrorCondition = "amqp:not-implemented" ErrorResourceLocked ErrorCondition = "amqp:resource-locked" ErrorPreconditionFailed ErrorCondition = "amqp:precondition-failed" ErrorResourceDeleted ErrorCondition = "amqp:resource-deleted" ErrorIllegalState ErrorCondition = "amqp:illegal-state" ErrorFrameSizeTooSmall ErrorCondition = "amqp:frame-size-too-small" // Connection Errors ErrorConnectionForced ErrorCondition = "amqp:connection:forced" ErrorFramingError ErrorCondition = "amqp:connection:framing-error" ErrorConnectionRedirect ErrorCondition = "amqp:connection:redirect" // Session Errors ErrorWindowViolation ErrorCondition = "amqp:session:window-violation" ErrorErrantLink ErrorCondition = "amqp:session:errant-link" ErrorHandleInUse ErrorCondition = "amqp:session:handle-in-use" ErrorUnattachedHandle ErrorCondition = "amqp:session:unattached-handle" // Link Errors ErrorDetachForced ErrorCondition = "amqp:link:detach-forced" ErrorTransferLimitExceeded ErrorCondition = "amqp:link:transfer-limit-exceeded" ErrorMessageSizeExceeded ErrorCondition = "amqp:link:message-size-exceeded" ErrorLinkRedirect ErrorCondition = "amqp:link:redirect" ErrorStolen ErrorCondition = "amqp:link:stolen" )
Error Conditions
type LinkOption ¶
type LinkOption func(*link) error
LinkOption is a function for configuring an AMQP link.
A link may be a Sender or a Receiver.
func LinkAddress ¶ deprecated, added in v0.2.0
func LinkAddress(source string) LinkOption
LinkAddress sets the link address.
For a Receiver this configures the source address. For a Sender this configures the target address.
Deprecated: use LinkSourceAddress or LinkTargetAddress instead.
func LinkAddressDynamic ¶ added in v0.2.0
func LinkAddressDynamic() LinkOption
LinkAddressDynamic requests a dynamically created address from the server.
func LinkBatchMaxAge ¶
func LinkBatchMaxAge(d time.Duration) LinkOption
LinkBatchMaxAge sets the maximum time between the start of a disposition batch and sending the batch to the server.
func LinkBatching ¶
func LinkBatching(enable bool) LinkOption
LinkBatching toggles batching of message disposition.
When enabled, accepting a message does not send the disposition to the server until the batch is equal to link credit or the batch max age expires.
func LinkCredit ¶
func LinkCredit(credit uint32) LinkOption
LinkCredit specifies the maximum number of unacknowledged messages the sender can transmit.
func LinkMaxMessageSize ¶ added in v0.4.2
func LinkMaxMessageSize(size uint64) LinkOption
LinkMaxMessageSize sets the maximum message size that can be sent or received on the link.
A size of zero indicates no limit.
Default: 0.
func LinkProperty ¶ added in v0.4.1
func LinkProperty(key, value string) LinkOption
LinkProperty sets an entry in the link properties map sent to the server.
This option can be used multiple times.
func LinkPropertyInt64 ¶ added in v0.4.1
func LinkPropertyInt64(key string, value int64) LinkOption
LinkPropertyInt64 sets an entry in the link properties map sent to the server.
This option can be used multiple times.
func LinkReceiverSettle ¶ added in v0.2.0
func LinkReceiverSettle(mode ReceiverSettleMode) LinkOption
LinkReceiverSettle sets the receiver settlement mode.
When the Link is the Sender, this is a request to the remote server.
When the Link is the Receiver, this is the actual settlement mode.
func LinkSelectorFilter ¶ added in v0.3.0
func LinkSelectorFilter(filter string) LinkOption
LinkSelectorFilter sets a selector filter (apache.org:selector-filter:string) on the link source.
func LinkSenderSettle ¶ added in v0.2.0
func LinkSenderSettle(mode SenderSettleMode) LinkOption
LinkSenderSettle sets the sender settlement mode.
When the Link is the Receiver, this is a request to the remote server.
When the Link is the Sender, this is the actual settlement mode.
func LinkSessionFilter ¶ added in v0.4.2
func LinkSessionFilter(sessionID string) LinkOption
LinkSessionFilter sets a session filter (com.microsoft:session-filter) on the link source. This is used in Azure Service Bus to filter messages by session ID on a receiving link.
func LinkSourceAddress ¶ added in v0.4.0
func LinkSourceAddress(addr string) LinkOption
LinkSourceAddress sets the source address.
func LinkTargetAddress ¶ added in v0.4.0
func LinkTargetAddress(addr string) LinkOption
LinkTargetAddress sets the target address.
type Message ¶
type Message struct { // Message format code. // // The upper three octets of a message format code identify a particular message // format. The lowest octet indicates the version of said message format. Any // given version of a format is forwards compatible with all higher versions. Format uint32 // The header section carries standard delivery details about the transfer // of a message through the AMQP network. Header *MessageHeader // The delivery-annotations section is used for delivery-specific non-standard // properties at the head of the message. Delivery annotations convey information // from the sending peer to the receiving peer. DeliveryAnnotations Annotations // The message-annotations section is used for properties of the message which // are aimed at the infrastructure. Annotations Annotations // The properties section is used for a defined set of standard properties of // the message. Properties *MessageProperties // The application-properties section is a part of the bare message used for // structured application data. Intermediaries can use the data within this // structure for the purposes of filtering or routing. ApplicationProperties map[string]interface{} // Data payloads. Data [][]byte // Value payload. Value interface{} // The footer section is used for details about the message or delivery which // can only be calculated or evaluated once the whole bare message has been // constructed or seen (for example message hashes, HMACs, signatures and // encryption details). Footer Annotations // contains filtered or unexported fields }
Message is an AMQP message.
func NewMessage ¶ added in v0.4.0
NewMessage returns a *Message with data as the payload.
This constructor is intended as a helper for basic Messages with a single data payload. It is valid to construct a Message directly for more complex usages.
func (*Message) Accept ¶
Accept notifies the server that the message has been accepted and does not require redelivery.
func (*Message) GetData ¶ added in v0.4.0
GetData returns the first []byte from the Data field or nil if Data is empty.
func (*Message) MarshalBinary ¶ added in v0.4.1
MarshalBinary encodes the message into binary form.
func (*Message) Modify ¶ added in v0.5.0
func (m *Message) Modify(deliveryFailed, undeliverableHere bool, messageAnnotations Annotations) error
Modify notifies the server that the message was not acted upon and should be modified.
deliveryFailed indicates that the server must consider this an unsuccessful delivery attempt and increment the delivery count.
undeliverableHere indicates that the server must not redeliver the message to this link.
messageAnnotations is an optional annotation map to be merged with the existing message annotations, overwriting existing keys if necessary.
type MessageHeader ¶
type MessageHeader struct { Durable bool Priority uint8 TTL time.Duration // from milliseconds FirstAcquirer bool DeliveryCount uint32 }
MessageHeader carries standard delivery details about the transfer of a message.
type MessageProperties ¶
type MessageProperties struct { // Message-id, if set, uniquely identifies a message within the message system. // The message producer is usually responsible for setting the message-id in // such a way that it is assured to be globally unique. A broker MAY discard a // message as a duplicate if the value of the message-id matches that of a // previously received message sent to the same node. MessageID interface{} // uint64, UUID, []byte, or string // The identity of the user responsible for producing the message. // The client sets this value, and it MAY be authenticated by intermediaries. UserID []byte // The to field identifies the node that is the intended destination of the message. // On any given transfer this might not be the node at the receiving end of the link. To string // A common field for summary information about the message content and purpose. Subject string // The address of the node to send replies to. ReplyTo string // This is a client-specific id that can be used to mark or identify messages // between clients. CorrelationID interface{} // uint64, UUID, []byte, or string // The RFC-2046 [RFC2046] MIME type for the message's application-data section // (body). As per RFC-2046 [RFC2046] this can contain a charset parameter defining // the character encoding used: e.g., 'text/plain; charset="utf-8"'. // // For clarity, as per section 7.2.1 of RFC-2616 [RFC2616], where the content type // is unknown the content-type SHOULD NOT be set. This allows the recipient the // opportunity to determine the actual type. Where the section is known to be truly // opaque binary data, the content-type SHOULD be set to application/octet-stream. // // When using an application-data section with a section code other than data, // content-type SHOULD NOT be set. ContentType string // The content-encoding property is used as a modifier to the content-type. 
// When present, its value indicates what additional content encodings have been // applied to the application-data, and thus what decoding mechanisms need to be // applied in order to obtain the media-type referenced by the content-type header // field. // // Content-encoding is primarily used to allow a document to be compressed without // losing the identity of its underlying content type. // // Content-encodings are to be interpreted as per section 3.5 of RFC 2616 [RFC2616]. // Valid content-encodings are registered at IANA [IANAHTTPPARAMS]. // // The content-encoding MUST NOT be set when the application-data section is other // than data. The binary representation of all other application-data section types // is defined completely in terms of the AMQP type system. // // Implementations MUST NOT use the identity encoding. Instead, implementations // SHOULD NOT set this property. Implementations SHOULD NOT use the compress encoding, // except as to remain compatible with messages originally sent with other protocols, // e.g. HTTP or SMTP. // // Implementations SHOULD NOT specify multiple content-encoding values except as to // be compatible with messages originally sent with other protocols, e.g. HTTP or SMTP. ContentEncoding string // An absolute time when this message is considered to be expired. AbsoluteExpiryTime time.Time // An absolute time when this message was created. CreationTime time.Time // Identifies the group the message belongs to. GroupID string // The relative position of this message within its group. GroupSequence uint32 // RFC-1982 sequence number // This is a client-specific id that is used so that client can send replies to this // message to a specific group. ReplyToGroupID string }
MessageProperties is the defined set of properties for AMQP messages.
type Receiver ¶
type Receiver struct {
// contains filtered or unexported fields
}
Receiver receives messages on a single AMQP link.
type ReceiverSettleMode ¶ added in v0.2.0
type ReceiverSettleMode uint8
ReceiverSettleMode specifies how the receiver will settle messages.
const ( // Receiver will spontaneously settle all incoming transfers. ModeFirst ReceiverSettleMode = 0 // Receiver will only settle after sending the disposition to the // sender and receiving a disposition indicating settlement of // the delivery from the sender. ModeSecond ReceiverSettleMode = 1 )
Receiver Settlement Modes
func (*ReceiverSettleMode) String ¶ added in v0.2.0
func (m *ReceiverSettleMode) String() string
type Sender ¶ added in v0.2.0
type Sender struct {
// contains filtered or unexported fields
}
Sender sends messages on a single AMQP link.
func (*Sender) Send ¶ added in v0.2.0
Send sends a Message.
Blocks until the message is sent, ctx completes, or an error occurs.
Send is safe for concurrent use. Since only a single message can be sent on a link at a time, this is most useful when settlement confirmation has been requested (receiver settle mode is "Second"). In this case, additional messages can be sent while the current goroutine is waiting for the confirmation.
type SenderSettleMode ¶ added in v0.2.0
type SenderSettleMode uint8
SenderSettleMode specifies how the sender will settle messages.
const ( // Sender will send all deliveries initially unsettled to the receiver. ModeUnsettled SenderSettleMode = 0 // Sender will send all deliveries settled to the receiver. ModeSettled SenderSettleMode = 1 // Sender MAY send a mixture of settled and unsettled deliveries to the receiver. ModeMixed SenderSettleMode = 2 )
Sender Settlement Modes
func (*SenderSettleMode) String ¶ added in v0.2.0
func (m *SenderSettleMode) String() string
type Session ¶
type Session struct {
// contains filtered or unexported fields
}
Session is an AMQP session.
A session multiplexes links (Senders and Receivers).
func (*Session) Close ¶
Close gracefully closes the session.
If ctx expires while waiting for the server's response, ctx.Err() will be returned. The session will continue to wait for the response until the Client is closed.
func (*Session) NewReceiver ¶
func (s *Session) NewReceiver(opts ...LinkOption) (*Receiver, error)
NewReceiver opens a new receiver link on the session.
type SessionOption ¶ added in v0.4.2
SessionOption is a function for configuring an AMQP session.
func SessionIncomingWindow ¶ added in v0.4.2
func SessionIncomingWindow(window uint32) SessionOption
SessionIncomingWindow sets the maximum number of unacknowledged transfer frames the server can send.
func SessionMaxLinks ¶ added in v0.4.2
func SessionMaxLinks(n int) SessionOption
SessionMaxLinks sets the maximum number of links (Senders/Receivers) allowed on the session.
n must be in the range 1 to 4294967296.
Default: 4294967296.
func SessionOutgoingWindow ¶ added in v0.4.2
func SessionOutgoingWindow(window uint32) SessionOption
SessionOutgoingWindow sets the maximum number of unacknowledged transfer frames the client can send.