amqp

package
v1.0.2-beta.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 7, 2022 License: MIT, MIT Imports: 19 Imported by: 0

Documentation

Overview

Copyright (C) 2017 Kale Blankenship Portions Copyright (c) Microsoft Corporation

Copyright (C) 2017 Kale Blankenship Portions Copyright (c) Microsoft Corporation

Copyright (C) 2017 Kale Blankenship Portions Copyright (c) Microsoft Corporation

Copyright (C) 2017 Kale Blankenship Portions Copyright (c) Microsoft Corporation

Copyright (C) 2017 Kale Blankenship Portions Copyright (c) Microsoft Corporation

Copyright (C) 2017 Kale Blankenship Portions Copyright (c) Microsoft Corporation

Copyright (C) 2017 Kale Blankenship Portions Copyright (c) Microsoft Corporation

Copyright (C) 2017 Kale Blankenship Portions Copyright (c) Microsoft Corporation

Copyright (C) 2017 Kale Blankenship Portions Copyright (c) Microsoft Corporation

Copyright (C) 2017 Kale Blankenship Portions Copyright (c) Microsoft Corporation

Copyright (C) 2017 Kale Blankenship Portions Copyright (c) Microsoft Corporation

Copyright (C) 2017 Kale Blankenship Portions Copyright (c) Microsoft Corporation

Index

Constants

View Source
const (
	// Sender will send all deliveries initially unsettled to the receiver.
	ModeUnsettled = encoding.ModeUnsettled

	// Sender will send all deliveries settled to the receiver.
	ModeSettled = encoding.ModeSettled

	// Sender MAY send a mixture of settled and unsettled deliveries to the receiver.
	ModeMixed = encoding.ModeMixed
)

Sender Settlement Modes

View Source
const (
	// Receiver will spontaneously settle all incoming transfers.
	ModeFirst = encoding.ModeFirst

	// Receiver will only settle after sending the disposition to the
	// sender and receiving a disposition indicating settlement of
	// the delivery from the sender.
	ModeSecond = encoding.ModeSecond
)

Receiver Settlement Modes

View Source
const (
	// No terminus state is retained durably.
	DurabilityNone = encoding.DurabilityNone

	// Only the existence and configuration of the terminus is
	// retained durably.
	DurabilityConfiguration = encoding.DurabilityConfiguration

	// In addition to the existence and configuration of the
	// terminus, the unsettled state for durable messages is
	// retained durably.
	DurabilityUnsettledState = encoding.DurabilityUnsettledState
)

Durability Policies

View Source
const (
	// The expiry timer starts when terminus is detached.
	ExpiryLinkDetach = encoding.ExpiryLinkDetach

	// The expiry timer starts when the most recently
	// associated session is ended.
	ExpirySessionEnd = encoding.ExpirySessionEnd

	// The expiry timer starts when most recently associated
	// connection is closed.
	ExpiryConnectionClose = encoding.ExpiryConnectionClose

	// The terminus never expires.
	ExpiryNever = encoding.ExpiryNever
)

Expiry Policies

Variables

View Source
var (
	// ErrSessionClosed is propagated to Sender/Receivers
	// when Session.Close() is called.
	ErrSessionClosed = errors.New("amqp: session closed")

	// ErrLinkClosed is returned by send and receive operations when
	// Sender.Close() or Receiver.Close() are called.
	ErrLinkClosed = errors.New("amqp: link closed")
)

Errors

Functions

This section is empty.

Types

type Address added in v1.1.0

type Address = string

AMQPAddress corresponds to the 'address' type in the AMQP spec. <type name="address-string" class="restricted" source="string" provides="address"/>

type Annotations

type Annotations = encoding.Annotations

Annotations keys must be of type string, int, or int64.

String keys are encoded as AMQP Symbols.

type Binary added in v1.1.0

type Binary = []byte

AMQPBinary corresponds to the `binary` type in the AMQP spec.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is an AMQP client connection.

func Dial

func Dial(addr string, opts *ConnOptions) (*Client, error)

Dial connects to an AMQP server.

If the addr includes a scheme, it must be "amqp", "amqps", or "amqp+ssl". If no port is provided, 5672 will be used for "amqp" and 5671 for "amqps" or "amqp+ssl".

If username and password information is not empty it's used as SASL PLAIN credentials, equal to passing ConnSASLPlain option.

opts: pass nil to accept the default values.

func New

func New(conn net.Conn, opts *ConnOptions) (*Client, error)

New establishes an AMQP client connection over conn. opts: pass nil to accept the default values.

func (*Client) Close

func (c *Client) Close() error

Close disconnects the connection.

func (*Client) NewSession

func (c *Client) NewSession(ctx context.Context, opts *SessionOptions) (*Session, error)

NewSession opens a new AMQP session to the server. Returns ErrConnClosed if the underlying connection has been closed. opts: pass nil to accept the default values.

type ConnOptions added in v1.1.0

type ConnOptions struct {
	// ContainerID sets the container-id to use when opening the connection.
	//
	// A container ID will be randomly generated if this option is not used.
	ContainerID string

	// HostName sets the hostname sent in the AMQP
	// Open frame and TLS ServerName (if not otherwise set).
	HostName string

	// IdleTimeout specifies the maximum period in milliseconds between
	// receiving frames from the peer.
	//
	// Specify a value less than zero to disable idle timeout.
	//
	// Default: 1 minute.
	IdleTimeout time.Duration

	// MaxFrameSize sets the maximum frame size that
	// the connection will accept.
	//
	// Must be 512 or greater.
	//
	// Default: 512.
	MaxFrameSize uint32

	// MaxSessions sets the maximum number of channels.
	// The value must be greater than zero.
	//
	// Default: 65535.
	MaxSessions uint16

	// Properties sets an entry in the connection properties map sent to the server.
	Properties map[string]interface{}

	// SASLType contains the specified SASL authentication mechanism.
	SASLType SASLType

	// Timeout configures how long to wait for the
	// server during connection establishment.
	//
	// Once the connection has been established, IdleTimeout
	// applies. If duration is zero, no timeout will be applied.
	//
	// Default: 0.
	Timeout time.Duration

	// TLSConfig sets the tls.Config to be used during
	// TLS negotiation.
	//
	// This option is for advanced usage, in most scenarios
	// providing a URL scheme of "amqps://" is sufficient.
	TLSConfig *tls.Config
	// contains filtered or unexported fields
}

ConnOptions contains the optional settings for configuring an AMQP connection.

type ConnectionError added in v1.1.0

type ConnectionError struct {
	// contains filtered or unexported fields
}

ConnectionError is propagated to Session and Senders/Receivers when the connection has been closed or is no longer functional.

func (*ConnectionError) Error added in v1.1.0

func (c *ConnectionError) Error() string

type DetachError

type DetachError struct {
	RemoteError *Error
}

DetachError is returned by a link (Receiver/Sender) when a detach frame is received.

RemoteError will be nil if the link was detached gracefully.

func (*DetachError) Error

func (e *DetachError) Error() string

type Durability

type Durability = encoding.Durability

Durability specifies the durability of a link.

type Error

type Error = encoding.Error

type ErrorCondition

type ErrorCondition = encoding.ErrorCondition
const (
	// AMQP Errors
	ErrorInternalError         ErrorCondition = "amqp:internal-error"
	ErrorNotFound              ErrorCondition = "amqp:not-found"
	ErrorUnauthorizedAccess    ErrorCondition = "amqp:unauthorized-access"
	ErrorDecodeError           ErrorCondition = "amqp:decode-error"
	ErrorResourceLimitExceeded ErrorCondition = "amqp:resource-limit-exceeded"
	ErrorNotAllowed            ErrorCondition = "amqp:not-allowed"
	ErrorInvalidField          ErrorCondition = "amqp:invalid-field"
	ErrorNotImplemented        ErrorCondition = "amqp:not-implemented"
	ErrorResourceLocked        ErrorCondition = "amqp:resource-locked"
	ErrorPreconditionFailed    ErrorCondition = "amqp:precondition-failed"
	ErrorResourceDeleted       ErrorCondition = "amqp:resource-deleted"
	ErrorIllegalState          ErrorCondition = "amqp:illegal-state"
	ErrorFrameSizeTooSmall     ErrorCondition = "amqp:frame-size-too-small"

	// Connection Errors
	ErrorConnectionForced   ErrorCondition = "amqp:connection:forced"
	ErrorFramingError       ErrorCondition = "amqp:connection:framing-error"
	ErrorConnectionRedirect ErrorCondition = "amqp:connection:redirect"

	// Session Errors
	ErrorWindowViolation  ErrorCondition = "amqp:session:window-violation"
	ErrorErrantLink       ErrorCondition = "amqp:session:errant-link"
	ErrorHandleInUse      ErrorCondition = "amqp:session:handle-in-use"
	ErrorUnattachedHandle ErrorCondition = "amqp:session:unattached-handle"

	// Link Errors
	ErrorDetachForced          ErrorCondition = "amqp:link:detach-forced"
	ErrorTransferLimitExceeded ErrorCondition = "amqp:link:transfer-limit-exceeded"
	ErrorMessageSizeExceeded   ErrorCondition = "amqp:link:message-size-exceeded"
	ErrorLinkRedirect          ErrorCondition = "amqp:link:redirect"
	ErrorStolen                ErrorCondition = "amqp:link:stolen"
)

Error Conditions

type ExpiryPolicy

type ExpiryPolicy = encoding.ExpiryPolicy

ExpiryPolicy specifies when the expiry timer of a terminus starts counting down from the timeout value.

If the link is subsequently re-attached before the terminus is expired, then the count down is aborted. If the conditions for the terminus-expiry-policy are subsequently re-met, the expiry timer restarts from its originally configured timeout value.

type LinkFilter added in v1.1.0

type LinkFilter func(encoding.Filter)

LinkFilter is an advanced API for setting non-standard source filters. Please file an issue or open a PR if a standard filter is missing from this library.

The name is the key for the filter map. It will be encoded as an AMQP symbol type.

The code is the descriptor of the described type value. The domain-id and descriptor-id should be concatenated together. If 0 is passed as the code, the name will be used as the descriptor.

The value is the value of the descriped types. Acceptable types for value are specific to the filter.

Example:

The standard selector-filter is defined as:

<descriptor name="apache.org:selector-filter:string" code="0x0000468C:0x00000004"/>

In this case the name is "apache.org:selector-filter:string" and the code is 0x0000468C00000004.

LinkSourceFilter("apache.org:selector-filter:string", 0x0000468C00000004, exampleValue)

References:

http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-filter-set
http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#section-descriptor-values

func LinkFilterSelector added in v1.1.0

func LinkFilterSelector(filter string) LinkFilter

LinkFilterSelector creates or updates the selector filter (apache.org:selector-filter:string) for this LinkFilter.

func LinkFilterSource added in v1.1.0

func LinkFilterSource(name string, code uint64, value interface{}) LinkFilter

LinkFilterSource creates or updates the named filter for this LinkFilter.

type Message

type Message struct {
	// Message format code.
	//
	// The upper three octets of a message format code identify a particular message
	// format. The lowest octet indicates the version of said message format. Any
	// given version of a format is forwards compatible with all higher versions.
	Format uint32

	// The DeliveryTag can be up to 32 octets of binary data.
	// Note that when mode one is enabled there will be no delivery tag.
	DeliveryTag []byte

	// The header section carries standard delivery details about the transfer
	// of a message through the AMQP network.
	Header *MessageHeader

	// The delivery-annotations section is used for delivery-specific non-standard
	// properties at the head of the message. Delivery annotations convey information
	// from the sending peer to the receiving peer.
	DeliveryAnnotations encoding.Annotations

	// The message-annotations section is used for properties of the message which
	// are aimed at the infrastructure.
	Annotations encoding.Annotations

	// The properties section is used for a defined set of standard properties of
	// the message.
	Properties *MessageProperties

	// The application-properties section is a part of the bare message used for
	// structured application data. Intermediaries can use the data within this
	// structure for the purposes of filtering or routing.
	ApplicationProperties map[string]interface{}

	// Data payloads.
	// A data section contains opaque binary data.
	Data [][]byte

	// Value payload.
	// An amqp-value section contains a single AMQP value.
	Value interface{}

	// Sequence will contain AMQP sequence sections from the body of the message.
	// An amqp-sequence section contains an AMQP sequence.
	Sequence [][]interface{}

	// The footer section is used for details about the message or delivery which
	// can only be calculated or evaluated once the whole bare message has been
	// constructed or seen (for example message hashes, HMACs, signatures and
	// encryption details).
	Footer encoding.Annotations

	// Mark the message as settled when LinkSenderSettle is ModeMixed.
	//
	// This field is ignored when LinkSenderSettle is not ModeMixed.
	SendSettled bool
	// contains filtered or unexported fields
}

Message is an AMQP message.

func NewMessage

func NewMessage(data []byte) *Message

NewMessage returns a *Message with data as the payload.

This constructor is intended as a helper for basic Messages with a single data payload. It is valid to construct a Message directly for more complex usages.

func (*Message) GetData

func (m *Message) GetData() []byte

GetData returns the first []byte from the Data field or nil if Data is empty.

func (*Message) LinkName

func (m *Message) LinkName() string

LinkName returns the receiving link name or the empty string.

func (*Message) Marshal

func (m *Message) Marshal(wr *buffer.Buffer) error

func (*Message) MarshalBinary

func (m *Message) MarshalBinary() ([]byte, error)

MarshalBinary encodes the message into binary form.

func (*Message) Unmarshal

func (m *Message) Unmarshal(r *buffer.Buffer) error

func (*Message) UnmarshalBinary

func (m *Message) UnmarshalBinary(data []byte) error

UnmarshalBinary decodes the message from binary form.

type MessageHeader

type MessageHeader struct {
	Durable       bool
	Priority      uint8
	TTL           time.Duration // from milliseconds
	FirstAcquirer bool
	DeliveryCount uint32
}

MessageHeader carries standard delivery details about the transfer of a message.

func (*MessageHeader) Marshal

func (h *MessageHeader) Marshal(wr *buffer.Buffer) error

func (*MessageHeader) Unmarshal

func (h *MessageHeader) Unmarshal(r *buffer.Buffer) error

type MessageID added in v1.1.0

type MessageID = interface{}

AMQPMessageID corresponds to the 'message-id' type in the AMQP spec. Internally it can be one of the following: - uint64: <type name="message-id-ulong" class="restricted" source="ulong" provides="message-id"/> - amqp.UUID: <type name="message-id-uuid" class="restricted" source="uuid" provides="message-id"/> - []byte: <type name="message-id-binary" class="restricted" source="binary" provides="message-id"/> - string: <type name="message-id-string" class="restricted" source="string" provides="message-id"/>

type MessageProperties

type MessageProperties struct {
	// Message-id, if set, uniquely identifies a message within the message system.
	// The message producer is usually responsible for setting the message-id in
	// such a way that it is assured to be globally unique. A broker MAY discard a
	// message as a duplicate if the value of the message-id matches that of a
	// previously received message sent to the same node.
	MessageID MessageID // uint64, UUID, []byte, or string

	// The identity of the user responsible for producing the message.
	// The client sets this value, and it MAY be authenticated by intermediaries.
	UserID Binary

	// The to field identifies the node that is the intended destination of the message.
	// On any given transfer this might not be the node at the receiving end of the link.
	To *Address

	// A common field for summary information about the message content and purpose.
	Subject *string

	// The address of the node to send replies to.
	ReplyTo *Address

	// This is a client-specific id that can be used to mark or identify messages
	// between clients.
	CorrelationID MessageID // uint64, UUID, []byte, or string

	// The RFC-2046 [RFC2046] MIME type for the message's application-data section
	// (body). As per RFC-2046 [RFC2046] this can contain a charset parameter defining
	// the character encoding used: e.g., 'text/plain; charset="utf-8"'.
	//
	// For clarity, as per section 7.2.1 of RFC-2616 [RFC2616], where the content type
	// is unknown the content-type SHOULD NOT be set. This allows the recipient the
	// opportunity to determine the actual type. Where the section is known to be truly
	// opaque binary data, the content-type SHOULD be set to application/octet-stream.
	//
	// When using an application-data section with a section code other than data,
	// content-type SHOULD NOT be set.
	ContentType *Symbol

	// The content-encoding property is used as a modifier to the content-type.
	// When present, its value indicates what additional content encodings have been
	// applied to the application-data, and thus what decoding mechanisms need to be
	// applied in order to obtain the media-type referenced by the content-type header
	// field.
	//
	// Content-encoding is primarily used to allow a document to be compressed without
	// losing the identity of its underlying content type.
	//
	// Content-encodings are to be interpreted as per section 3.5 of RFC 2616 [RFC2616].
	// Valid content-encodings are registered at IANA [IANAHTTPPARAMS].
	//
	// The content-encoding MUST NOT be set when the application-data section is other
	// than data. The binary representation of all other application-data section types
	// is defined completely in terms of the AMQP type system.
	//
	// Implementations MUST NOT use the identity encoding. Instead, implementations
	// SHOULD NOT set this property. Implementations SHOULD NOT use the compress encoding,
	// except as to remain compatible with messages originally sent with other protocols,
	// e.g. HTTP or SMTP.
	//
	// Implementations SHOULD NOT specify multiple content-encoding values except as to
	// be compatible with messages originally sent with other protocols, e.g. HTTP or SMTP.
	ContentEncoding *Symbol

	// An absolute time when this message is considered to be expired.
	AbsoluteExpiryTime *time.Time

	// An absolute time when this message was created.
	CreationTime *time.Time

	// Identifies the group the message belongs to.
	GroupID *string

	// The relative position of this message within its group.
	GroupSequence *SequenceNumber // RFC-1982 sequence number

	// This is a client-specific id that is used so that client can send replies to this
	// message to a specific group.
	ReplyToGroupID *string
}

MessageProperties is the defined set of properties for AMQP messages.

func (*MessageProperties) Marshal

func (p *MessageProperties) Marshal(wr *buffer.Buffer) error

func (*MessageProperties) Unmarshal

func (p *MessageProperties) Unmarshal(r *buffer.Buffer) error

type ModifyMessageOptions added in v1.1.0

type ModifyMessageOptions struct {
	// DeliveryFailed indicates that the server must consider this an
	// unsuccessful delivery attempt and increment the delivery count.
	DeliveryFailed bool

	// UndeliverableHere indicates that the server must not redeliver
	// the message to this link.
	UndeliverableHere bool

	// Annotations is an optional annotation map to be merged
	// with the existing message annotations, overwriting existing keys
	// if necessary.
	Annotations Annotations
}

ModifyMessageOptions contains the optional parameters to ModifyMessage.

type Receiver

type Receiver struct {
	// contains filtered or unexported fields
}

Receiver receives messages on a single AMQP link.

func (*Receiver) AcceptMessage

func (r *Receiver) AcceptMessage(ctx context.Context, msg *Message) error

Accept notifies the server that the message has been accepted and does not require redelivery.

func (*Receiver) Address

func (r *Receiver) Address() string

Address returns the link's address.

func (*Receiver) Close

func (r *Receiver) Close(ctx context.Context) error

Close closes the Receiver and AMQP link.

If ctx expires while waiting for servers response, ctx.Err() will be returned. The session will continue to wait for the response until the Session or Client is closed.

func (*Receiver) DrainCredit

func (r *Receiver) DrainCredit(ctx context.Context) error

DrainCredit sets the drain flag on the next flow frame and waits for the drain to be acknowledged.

func (*Receiver) IssueCredit

func (r *Receiver) IssueCredit(credit uint32) error

IssueCredit adds credits to be requested in the next flow request.

func (*Receiver) LinkName

func (r *Receiver) LinkName() string

LinkName returns associated link name or an empty string if link is not defined.

func (*Receiver) LinkSourceFilterValue

func (r *Receiver) LinkSourceFilterValue(name string) interface{}

LinkSourceFilterValue retrieves the specified link source filter value or nil if it doesn't exist.

func (*Receiver) ModifyMessage

func (r *Receiver) ModifyMessage(ctx context.Context, msg *Message, options *ModifyMessageOptions) error

Modify notifies the server that the message was not acted upon and should be modifed.

func (*Receiver) Prefetched

func (r *Receiver) Prefetched() *Message

Prefetched returns the next message that is stored in the Receiver's prefetch cache. It does NOT wait for the remote sender to send messages and returns immediately if the prefetch cache is empty. To receive from the prefetch and wait for messages from the remote Sender use `Receive`.

When using ModeSecond, you *must* take an action on the message by calling one of the following: AcceptMessage, RejectMessage, ReleaseMessage, ModifyMessage. When using ModeFirst, the message is spontaneously Accepted at reception.

func (*Receiver) Receive

func (r *Receiver) Receive(ctx context.Context) (*Message, error)

Receive returns the next message from the sender.

Blocks until a message is received, ctx completes, or an error occurs. When using ModeSecond, you *must* take an action on the message by calling one of the following: AcceptMessage, RejectMessage, ReleaseMessage, ModifyMessage. When using ModeFirst, the message is spontaneously Accepted at reception.

func (*Receiver) RejectMessage

func (r *Receiver) RejectMessage(ctx context.Context, msg *Message, e *Error) error

Reject notifies the server that the message is invalid.

Rejection error is optional.

func (*Receiver) ReleaseMessage

func (r *Receiver) ReleaseMessage(ctx context.Context, msg *Message) error

Release releases the message back to the server. The message may be redelivered to this or another consumer.

type ReceiverOptions added in v1.1.0

type ReceiverOptions struct {
	// LinkBatching toggles batching of message disposition.
	//
	// When enabled, accepting a message does not send the disposition
	// to the server until the batch is equal to link credit or the
	// batch max age expires.
	//
	// Default: false.
	Batching bool

	// BatchMaxAge sets the maximum time between the start
	// of a disposition batch and sending the batch to the server.
	//
	// Has no effect when Batching is false.
	//
	// Default: 5 seconds.
	BatchMaxAge time.Duration

	// Capabilities is the list of extension capabilities the receiver supports/desires.
	Capabilities []string

	// Credit specifies the maximum number of unacknowledged messages
	// the sender can transmit.
	//
	// Default: 1.
	Credit uint32

	// Durability indicates what state of the receiver will be retained durably.
	//
	// Default: DurabilityNone.
	Durability Durability

	// DynamicAddress indicates a dynamic address is to be used.
	// Any specified address will be ignored.
	//
	// Default: false.
	DynamicAddress bool

	// ExpiryPolicy determines when the expiry timer of the sender starts counting
	// down from the timeout value.  If the link is subsequently re-attached before
	// the timeout is reached, the count down is aborted.
	//
	// Default: ExpirySessionEnd.
	ExpiryPolicy ExpiryPolicy

	// ExpiryTimeout is the duration in seconds that the sender will be retained.
	//
	// Default: 0.
	ExpiryTimeout uint32

	// Filters contains the desired filters for this receiver.
	// If the peer cannot fulfill the filters the link will be detached.
	Filters []LinkFilter

	// ManualCredits enables manual credit management for this link.
	// Credits can be added with IssueCredit(), and links can also be
	// drained with DrainCredit().
	ManualCredits bool

	// MaxMessageSize sets the maximum message size that can
	// be received on the link.
	//
	// A size of zero indicates no limit.
	//
	// Default: 0.
	MaxMessageSize uint64

	// Name sets the name of the link.
	//
	// Link names must be unique per-connection and direction.
	//
	// Default: randomly generated.
	Name string

	// Properties sets an entry in the link properties map sent to the server.
	Properties map[string]interface{}

	// RequestedSenderSettleMode sets the requested sender settlement mode.
	//
	// If a settlement mode is explicitly set and the server does not
	// honor it an error will be returned during link attachment.
	//
	// Default: Accept the settlement mode set by the server, commonly ModeMixed.
	RequestedSenderSettleMode *SenderSettleMode

	// SettlementMode sets the settlement mode in use by this receiver.
	//
	// Default: ModeFirst.
	SettlementMode *ReceiverSettleMode

	// TargetAddress specifies the target address for this receiver.
	TargetAddress string
}

type ReceiverSettleMode

type ReceiverSettleMode = encoding.ReceiverSettleMode

ReceiverSettleMode specifies how the receiver will settle messages.

type SASLType added in v1.1.0

type SASLType func(c *conn) error

SASLType represents a SASL configuration to use during authentication.

func SASLTypeAnonymous added in v1.1.0

func SASLTypeAnonymous() SASLType

ConnSASLAnonymous enables SASL ANONYMOUS authentication for the connection.

func SASLTypeExternal added in v1.1.0

func SASLTypeExternal(resp string) SASLType

ConnSASLExternal enables SASL EXTERNAL authentication for the connection. The value for resp is dependent on the type of authentication (empty string is common for TLS). See https://datatracker.ietf.org/doc/html/rfc4422#appendix-A for additional info.

func SASLTypePlain added in v1.1.0

func SASLTypePlain(username, password string) SASLType

ConnSASLPlain enables SASL PLAIN authentication for the connection.

SASL PLAIN transmits credentials in plain text and should only be used on TLS/SSL enabled connection.

func SASLTypeXOAUTH2 added in v1.1.0

func SASLTypeXOAUTH2(username, bearer string, saslMaxFrameSizeOverride uint32) SASLType

ConnSASLXOAUTH2 enables SASL XOAUTH2 authentication for the connection.

The saslMaxFrameSizeOverride parameter allows the limit that governs the maximum frame size this client will allow itself to generate to be raised for the sasl-init frame only. Set this when the size of the size of the SASL XOAUTH2 initial client response (which contains the username and bearer token) would otherwise breach the 512 byte min-max-frame-size (http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#definition-MIN-MAX-FRAME-SIZE). Pass -1 to keep the default.

SASL XOAUTH2 transmits the bearer in plain text and should only be used on TLS/SSL enabled connection.

type Sender

type Sender struct {
	// contains filtered or unexported fields
}

Sender sends messages on a single AMQP link.

func (*Sender) Address

func (s *Sender) Address() string

Address returns the link's address.

func (*Sender) Close

func (s *Sender) Close(ctx context.Context) error

Close closes the Sender and AMQP link.

func (*Sender) LinkName

func (s *Sender) LinkName() string

LinkName() is the name of the link used for this Sender.

func (*Sender) MaxMessageSize

func (s *Sender) MaxMessageSize() uint64

MaxMessageSize is the maximum size of a single message.

func (*Sender) Send

func (s *Sender) Send(ctx context.Context, msg *Message) error

Send sends a Message.

Blocks until the message is sent, ctx completes, or an error occurs.

Send is safe for concurrent use. Since only a single message can be sent on a link at a time, this is most useful when settlement confirmation has been requested (receiver settle mode is "Second"). In this case, additional messages can be sent while the current goroutine is waiting for the confirmation.

type SenderOptions added in v1.1.0

type SenderOptions struct {
	// Capabilities is the list of extension capabilities the sender supports/desires.
	Capabilities []string

	// Durability indicates what state of the sender will be retained durably.
	//
	// Default: DurabilityNone.
	Durability Durability

	// DynamicAddress indicates a dynamic address is to be used.
	// Any specified address will be ignored.
	//
	// Default: false.
	DynamicAddress bool

	// ExpiryPolicy determines when the expiry timer of the sender starts counting
	// down from the timeout value.  If the link is subsequently re-attached before
	// the timeout is reached, the count down is aborted.
	//
	// Default: ExpirySessionEnd.
	ExpiryPolicy ExpiryPolicy

	// ExpiryTimeout is the duration in seconds that the sender will be retained.
	//
	// Default: 0.
	ExpiryTimeout uint32

	// IgnoreDispositionErrors controls automatic detach on disposition errors.
	//
	// Default: false.
	IgnoreDispositionErrors bool

	// Name sets the name of the link.
	//
	// Link names must be unique per-connection and direction.
	//
	// Default: randomly generated.
	Name string

	// Properties sets an entry in the link properties map sent to the server.
	Properties map[string]interface{}

	// RequestedReceiverSettleMode sets the requested receiver settlement mode.
	//
	// If a settlement mode is explicitly set and the server does not
	// honor it an error will be returned during link attachment.
	//
	// Default: Accept the settlement mode set by the server, commonly ModeFirst.
	RequestedReceiverSettleMode *ReceiverSettleMode

	// SettlementMode sets the settlement mode in use by this sender.
	//
	// Default: ModeMixed.
	SettlementMode *SenderSettleMode

	// SourceAddress specifies the source address for this sender.
	SourceAddress string
}

type SenderSettleMode

type SenderSettleMode = encoding.SenderSettleMode

SenderSettleMode specifies how the sender will settle messages.

type SequenceNumber added in v1.1.0

type SequenceNumber = uint32

AMQPSequenceNumber corresponds to the `sequence-no` type in the AMQP spec. <type name="sequence-no" class="restricted" source="uint"/>

type Session

type Session struct {
	// contains filtered or unexported fields
}

Session is an AMQP session.

A session multiplexes Receivers.

func (*Session) Close

func (s *Session) Close(ctx context.Context) error

Close gracefully closes the session.

If ctx expires while waiting for servers response, ctx.Err() will be returned. The session will continue to wait for the response until the Client is closed.

func (*Session) NewReceiver

func (s *Session) NewReceiver(ctx context.Context, source string, opts *ReceiverOptions) (*Receiver, error)

NewReceiver opens a new receiver link on the session. opts: pass nil to accept the default values.

func (*Session) NewSender

func (s *Session) NewSender(ctx context.Context, target string, opts *SenderOptions) (*Sender, error)

NewSender opens a new sender link on the session. opts: pass nil to accept the default values.

type SessionOptions added in v1.1.0

type SessionOptions struct {
	// IncomingWindow sets the maximum number of unacknowledged
	// transfer frames the server can send.
	IncomingWindow uint32

	// OutgoingWindow sets the maximum number of unacknowledged
	// transfer frames the client can send.
	OutgoingWindow uint32

	// MaxLinks sets the maximum number of links (Senders/Receivers)
	// allowed on the session.
	//
	// Minimum: 1.
	// Default: 4294967295.
	MaxLinks uint32
}

SessionOption contains the optional settings for configuring an AMQP session.

type Symbol added in v1.1.0

type Symbol = string

AMQPSymbol corresponds to the 'symbol' type in the AMQP spec. <type name="symbol" class="primitive"/> And either: - variable-width, 1 byte size up to 2^8 - 1 seven bit ASCII characters representing a symbolic value - variable-width, 4 byte size up to 2^32 - 1 seven bit ASCII characters representing a symbolic value

type UUID

type UUID = encoding.UUID

Directories

Path Synopsis
internal
log

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL