amqp

package
v0.8.59
Warning

This package is not in the latest version of its module.

Published: Sep 1, 2023 License: GPL-3.0 Imports: 13 Imported by: 0

Documentation


Constants

const (
	ContentTooLarge             = 311
	NoRoute                     = 312
	NoConsumers                 = 313
	ConnectionForced            = 320
	InvalidPath                 = 402
	AccessRefused               = 403
	NotFound                    = 404
	ResourceLocked              = 405
	PreconditionFailed          = 406
	FrameError                  = 501
	SyntaxError                 = 502
	CommandInvalid              = 503
	ChannelError                = 504
	UnexpectedFrame             = 505
	ResourceError               = 506
	NotAllowed                  = 530
	NotImplemented              = 540
	InternalError               = 541
	MaxSizeError                = 551
	MaxHeaderFrameSizeError     = 552
	BadMethodFrameUnknownMethod = 601
	BadMethodFrameUnknownClass  = 602
)

Error codes that can be sent from the server during a connection or channel exception, or used by the client to indicate a class of error such as ErrCredentials. The text of the error is likely more interesting than these constants.

const (
	ExchangeDirect  = "direct"
	ExchangeFanout  = "fanout"
	ExchangeTopic   = "topic"
	ExchangeHeaders = "headers"
)

Constants for standard AMQP 0-9-1 exchange types.

const (
	Transient  uint8 = 1
	Persistent uint8 = 2
)

DeliveryMode. Transient means higher throughput, but messages will not be restored on broker restart. The delivery mode of publishings is unrelated to the durability of the queues they reside on. Transient messages will not be restored to durable queues; persistent messages will be restored to durable queues and lost on non-durable queues during server restart.

This remains typed as uint8 to match Publishing.DeliveryMode. Other delivery modes specific to custom queue implementations are not enumerated here.
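The interaction between delivery mode and queue durability can be sketched as follows. This is only an illustration: the constants and the trimmed Publishing struct are local stand-ins for the package's own declarations, and survivesRestart is a hypothetical helper that is not part of the package.

```go
package main

import "fmt"

// Local mirrors of the package constants so the sketch compiles on its own.
const (
	Transient  uint8 = 1
	Persistent uint8 = 2
)

// Publishing is a trimmed-down stand-in for the package's Publishing type.
type Publishing struct {
	DeliveryMode uint8
	Body         []byte
}

// survivesRestart is a hypothetical helper. Following the rule described
// above, a message is restored after a broker restart only when it is
// persistent AND it resides on a durable queue.
func survivesRestart(p Publishing, queueDurable bool) bool {
	return p.DeliveryMode == Persistent && queueDurable
}

func main() {
	msg := Publishing{DeliveryMode: Persistent, Body: []byte("hello")}
	fmt.Println("persistent, durable queue:    ", survivesRestart(msg, true))
	fmt.Println("persistent, non-durable queue:", survivesRestart(msg, false))

	msg.DeliveryMode = Transient
	fmt.Println("transient, durable queue:     ", survivesRestart(msg, true))
}
```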

Variables

var (
	// ErrClosed is returned when the channel or connection is not open
	ErrClosed = &Error{Code: ChannelError, Reason: "channel/connection is not open"}

	// ErrChannelMax is returned when Connection.Channel has been called enough
	// times that all channel IDs have been exhausted in the client or the
	// server.
	ErrChannelMax = &Error{Code: ChannelError, Reason: "channel id space exhausted"}

	// ErrSASL is returned from Dial when the authentication mechanism could not
	// be negotiated.
	ErrSASL = &Error{Code: AccessRefused, Reason: "SASL could not negotiate a shared mechanism"}

	// ErrCredentials is returned when the authenticated client is not authorized
	// to any vhost.
	ErrCredentials = &Error{Code: AccessRefused, Reason: "username or password not allowed"}

	// ErrVhost is returned when the authenticated user is not permitted to
	// access the requested Vhost.
	ErrVhost = &Error{Code: AccessRefused, Reason: "no access to this vhost"}

	// ErrSyntax is a hard protocol error, indicating an unsupported protocol,
	// implementation or encoding.
	ErrSyntax = &Error{Code: SyntaxError, Reason: "invalid field or value inside of a frame"}

	// ErrFrame is returned when the protocol frame cannot be read from the
	// server, indicating an unsupported protocol or unsupported frame type.
	ErrFrame = &Error{Code: FrameError, Reason: "frame could not be parsed"}

	// ErrCommandInvalid is returned when the server sends an unexpected response
	// to this requested message type. This indicates a bug in this client.
	ErrCommandInvalid = &Error{Code: CommandInvalid, Reason: "unexpected command received"}

	// ErrUnexpectedFrame is returned when something other than a method or
	// heartbeat frame is delivered to the Connection, indicating a bug in the
	// client.
	ErrUnexpectedFrame = &Error{Code: UnexpectedFrame, Reason: "unexpected frame received"}

	// ErrFieldType is returned when writing a message containing a Go type unsupported by AMQP.
	ErrFieldType = &Error{Code: SyntaxError, Reason: "unsupported table field type"}

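	// ErrMaxSize indicates an AMQP message larger than the 16MB limit.
	ErrMaxSize = &Error{Code: MaxSizeError, Reason: "an AMQP message cannot be bigger than 16MB"}

	// ErrMaxHeaderFrameSize indicates an AMQP header larger than the 512-byte limit.
	ErrMaxHeaderFrameSize = &Error{Code: MaxHeaderFrameSizeError, Reason: "an AMQP header cannot be bigger than 512 bytes"}

	// ErrBadMethodFrameUnknownMethod indicates a method frame whose method is not recognized.
	ErrBadMethodFrameUnknownMethod = &Error{Code: BadMethodFrameUnknownMethod, Reason: "Bad method frame, unknown method"}

	// ErrBadMethodFrameUnknownClass indicates a method frame whose class is not recognized.
	ErrBadMethodFrameUnknownClass = &Error{Code: BadMethodFrameUnknownClass, Reason: "Bad method frame, unknown class"}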
)
var Dissector dissecting

Functions

func NewDissector

func NewDissector() api.Dissector

Types

type AMQPPayload

type AMQPPayload struct {
	Data interface{}
}

func (AMQPPayload) MarshalJSON

func (h AMQPPayload) MarshalJSON() ([]byte, error)

type AMQPPayloader

type AMQPPayloader interface {
	MarshalJSON() ([]byte, error)
}

type AMQPWrapper

type AMQPWrapper struct {
	Method  string      `json:"method"`
	Url     string      `json:"url"`
	Details interface{} `json:"details"`
}

type AmqpReader

type AmqpReader struct {
	R io.Reader
}

type BasicConsume

type BasicConsume struct {
	Queue       string `json:"queue"`
	ConsumerTag string `json:"consumerTag"`
	NoLocal     bool   `json:"noLocal"`
	NoAck       bool   `json:"noAck"`
	Exclusive   bool   `json:"exclusive"`
	NoWait      bool   `json:"noWait"`
	Arguments   Table  `json:"arguments"`
	// contains filtered or unexported fields
}

type BasicConsumeOk

type BasicConsumeOk struct {
	ConsumerTag string `json:"consumerTag"`
}

type BasicDeliver

type BasicDeliver struct {
	ConsumerTag string     `json:"consumerTag"`
	DeliveryTag uint64     `json:"deliveryTag"`
	Redelivered bool       `json:"redelivered"`
	Exchange    string     `json:"exchange"`
	RoutingKey  string     `json:"routingKey"`
	Properties  Properties `json:"properties"`
	Body        []byte     `json:"body"`
}

type BasicPublish

type BasicPublish struct {
	Exchange   string     `json:"exchange"`
	RoutingKey string     `json:"routingKey"`
	Mandatory  bool       `json:"mandatory"`
	Immediate  bool       `json:"immediate"`
	Properties Properties `json:"properties"`
	Body       []byte     `json:"body"`
	// contains filtered or unexported fields
}

type Blocking

type Blocking struct {
	Active bool   // TCP pushback active/inactive on server
	Reason string // Server reason for activation
}

Blocking reports the state of the server's TCP flow control on the Connection. When a server hits a memory or disk alarm, it blocks all connections until the resources are reclaimed. Use NotifyBlock on the Connection to receive these events.

type BodyFrame

type BodyFrame struct {
	ChannelId uint16
	Body      []byte
}

Content is the application data we carry from client-to-client via the AMQP server. Content is, roughly speaking, a set of properties plus a binary data part. The set of allowed properties is defined by the Basic class, and these form the "content header frame". The data can be any size, and MAY be broken into several (or many) chunks, each forming a "content body frame".

Looking at the frames for a specific channel, as they pass on the wire, we might see something like this:

[method]
[method] [header] [body] [body]
[method]
...
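Reassembling content from the [header] [body] [body] sequence shown above might look like the following sketch. It assumes the header's Size field carries the total body size; the structs are trimmed local stand-ins for HeaderFrame and BodyFrame, and assembleBody is a hypothetical helper rather than a function exported by this package.

```go
package main

import "fmt"

// Trimmed local stand-ins for the package's HeaderFrame and BodyFrame.
type HeaderFrame struct {
	ChannelId uint16
	Size      uint64 // total size of the content body, in bytes
}

type BodyFrame struct {
	ChannelId uint16
	Body      []byte
}

// assembleBody is a hypothetical helper. It concatenates the body frames that
// belong to the header's channel until Size bytes have been collected, and
// reports whether the content is complete.
func assembleBody(h HeaderFrame, frames []BodyFrame) ([]byte, bool) {
	if h.Size == 0 {
		return nil, true // a header may be followed by zero body frames
	}
	var content []byte
	for _, f := range frames {
		if f.ChannelId != h.ChannelId {
			continue // skip frames that belong to other channels
		}
		content = append(content, f.Body...)
		if uint64(len(content)) >= h.Size {
			break
		}
	}
	return content, uint64(len(content)) == h.Size
}

func main() {
	header := HeaderFrame{ChannelId: 1, Size: 11}
	frames := []BodyFrame{
		{ChannelId: 1, Body: []byte("hello ")},
		{ChannelId: 2, Body: []byte("other channel")},
		{ChannelId: 1, Body: []byte("world")},
	}
	content, complete := assembleBody(header, frames)
	fmt.Printf("%q complete=%v\n", content, complete)
}
```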

type Confirmation

type Confirmation struct {
	DeliveryTag uint64 // A 1-based counter of publishings from when the channel was put in Confirm mode
	Ack         bool   // True when the server successfully received the publishing
}

Confirmation reports the acknowledgement or negative acknowledgement of a publishing, identified by its delivery tag. Use NotifyPublish on the Channel to consume these events.

type ConnectionClose

type ConnectionClose struct {
	ReplyCode uint16 `json:"relyCode"`
	ReplyText string `json:"replyText"`
	ClassId   uint16 `json:"classId"`
	MethodId  uint16 `json:"methodId"`
}

type ConnectionCloseOk

type ConnectionCloseOk struct {
}

type ConnectionStart

type ConnectionStart struct {
	VersionMajor     byte   `json:"versionMajor"`
	VersionMinor     byte   `json:"versionMinor"`
	ServerProperties Table  `json:"serverProperties"`
	Mechanisms       string `json:"mechanisms"`
	Locales          string `json:"locales"`
}

type ConnectionStartOk

type ConnectionStartOk struct {
	ClientProperties Table  `json:"clientProperties"`
	Mechanism        string `json:"mechanism"`
	Response         string `json:"response"`
	Locale           string `json:"locale"`
}

type Decimal

type Decimal struct {
	Scale uint8
	Value int32
}

Decimal matches the AMQP decimal type. Scale is the number of decimal digits: with Scale == 2 and Value == 12345, the Decimal represents 123.45.
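The relationship between Scale and Value can be checked with a short sketch. The Decimal struct is mirrored locally so the example compiles on its own, and toFloat is a hypothetical helper, not a method of the package's type. Converting to float64 can lose precision for some values, so this is for illustration only.

```go
package main

import (
	"fmt"
	"math"
)

// Decimal mirrors the package's type so the sketch is self-contained.
type Decimal struct {
	Scale uint8
	Value int32
}

// toFloat is a hypothetical helper that computes Value / 10^Scale.
func toFloat(d Decimal) float64 {
	return float64(d.Value) / math.Pow10(int(d.Scale))
}

func main() {
	d := Decimal{Scale: 2, Value: 12345}
	fmt.Println(toFloat(d)) // prints 123.45
}
```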

type Error

type Error struct {
	Code    int    // constant code from the specification
	Reason  string // description of the error
	Server  bool   // true when initiated from the server, false when from this library
	Recover bool   // true when this error can be recovered by retrying later or with different parameters
}

Error captures the code and reason a channel or connection has been closed by the server.

func (Error) Error

func (e Error) Error() string
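Because the package's sentinel errors are declared as *Error values, a caller can recover the code from a wrapped error with errors.As. The sketch below mirrors the Error type and two constants locally. The message format in its Error method is an assumption made for this example, not the package's actual output, and codeOf is a hypothetical helper.

```go
package main

import (
	"errors"
	"fmt"
)

// Local mirrors of two of the package's error codes.
const (
	AccessRefused = 403
	ChannelError  = 504
)

// Error mirrors the package's struct so the sketch is self-contained.
type Error struct {
	Code    int
	Reason  string
	Server  bool
	Recover bool
}

// Error implements the error interface. The message format is an assumption
// of this sketch.
func (e Error) Error() string {
	return fmt.Sprintf("amqp error %d: %s", e.Code, e.Reason)
}

var ErrClosed = &Error{Code: ChannelError, Reason: "channel/connection is not open"}

// codeOf is a hypothetical helper. It returns the AMQP code carried by err,
// if err is or wraps an *Error.
func codeOf(err error) (int, bool) {
	var amqpErr *Error
	if errors.As(err, &amqpErr) {
		return amqpErr.Code, true
	}
	return 0, false
}

func main() {
	wrapped := fmt.Errorf("publish failed: %w", ErrClosed)
	if code, ok := codeOf(wrapped); ok {
		fmt.Println("AMQP code:", code, "channel error:", code == ChannelError)
	}
}
```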

type ExchangeDeclare

type ExchangeDeclare struct {
	Exchange   string `json:"exchange"`
	Type       string `json:"type"`
	Passive    bool   `json:"passive"`
	Durable    bool   `json:"durable"`
	AutoDelete bool   `json:"autoDelete"`
	Internal   bool   `json:"internal"`
	NoWait     bool   `json:"noWait"`
	Arguments  Table  `json:"arguments"`
	// contains filtered or unexported fields
}

type ExchangeDeclareOk

type ExchangeDeclareOk struct {
}

type HeaderFrame

type HeaderFrame struct {
	ChannelId uint16
	ClassId   uint16

	Size       uint64
	Properties Properties
	// contains filtered or unexported fields
}

Certain methods (such as Basic.Publish, Basic.Deliver, etc.) are formally defined as carrying content. When a peer sends such a method frame, it always follows it with a content header and zero or more content body frames.

A content header frame has this format:

0          2        4           12               14
+----------+--------+-----------+----------------+------------- - -
| class-id | weight | body size | property flags | property list...
+----------+--------+-----------+----------------+------------- - -
  short     short    long long       short        remainder...

We place content body in distinct frames (rather than including it in the method) so that AMQP may support "zero copy" techniques in which content is never marshalled or encoded. We place the content properties in their own frame so that recipients can selectively discard contents they do not want to process.
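The fixed part of the content header layout above can be decoded with encoding/binary, since AMQP integers are in network (big-endian) byte order. This is a minimal sketch: contentHeader and parseContentHeader are hypothetical names introduced here, and the property list is left undecoded.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// contentHeader is a hypothetical struct holding the fixed fields of a
// content header payload, plus the raw, undecoded property list.
type contentHeader struct {
	ClassID       uint16
	Weight        uint16
	BodySize      uint64
	PropertyFlags uint16
	PropertyList  []byte
}

// parseContentHeader reads the 14 fixed bytes at the offsets shown in the
// diagram: class-id at 0, weight at 2, body size at 4, property flags at 12.
func parseContentHeader(payload []byte) (contentHeader, error) {
	if len(payload) < 14 {
		return contentHeader{}, errors.New("content header shorter than 14 bytes")
	}
	return contentHeader{
		ClassID:       binary.BigEndian.Uint16(payload[0:2]),
		Weight:        binary.BigEndian.Uint16(payload[2:4]),
		BodySize:      binary.BigEndian.Uint64(payload[4:12]),
		PropertyFlags: binary.BigEndian.Uint16(payload[12:14]),
		PropertyList:  payload[14:],
	}, nil
}

func main() {
	payload := []byte{
		0x00, 0x3c, // class-id 60 (Basic)
		0x00, 0x00, // weight
		0, 0, 0, 0, 0, 0, 0, 5, // body size: 5 bytes
		0x00, 0x00, // property flags: no properties present
	}
	h, err := parseContentHeader(payload)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("class=%d weight=%d bodySize=%d flags=%#04x\n",
		h.ClassID, h.Weight, h.BodySize, h.PropertyFlags)
}
```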

type HeartbeatFrame

type HeartbeatFrame struct {
	ChannelId uint16
}

Heartbeating is a technique designed to undo one of TCP/IP's features, namely its ability to recover from a broken physical connection by closing only after a quite long time-out. In some scenarios we need to know very rapidly if a peer is disconnected or not responding for other reasons (e.g. it is looping). Since heartbeating can be done at a low level, we implement this as a special type of frame that peers exchange at the transport level, rather than as a class method.

type Message

type Message interface {
	// contains filtered or unexported methods
}

type MethodFrame

type MethodFrame struct {
	ChannelId uint16
	ClassId   uint16
	MethodId  uint16
	Method    Message
}

Method frames carry the high-level protocol commands (which we call "methods"). One method frame carries one command. The method frame payload has this format:

0          2           4
+----------+-----------+-------------- - -
| class-id | method-id | arguments...
+----------+-----------+-------------- - -
   short      short    ...

To process a method frame, we:

  1. Read the method frame payload.
  2. Unpack it into a structure. A given method always has the same structure, so we can unpack the method rapidly.
  3. Check that the method is allowed in the current context.
  4. Check that the method arguments are valid.
  5. Execute the method.

Method frame bodies are constructed as a list of AMQP data fields (bits, integers, strings and string tables). The marshalling code is trivially generated directly from the protocol specifications, and can be very rapid.
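The first steps of processing a method frame (reading the payload and identifying the method before unpacking its arguments) can be sketched as below. The names methodKey, knownMethods and splitMethodPayload are hypothetical and introduced only for this example. The lookup table is deliberately tiny; a real decoder would cover every class and method in the specification.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// methodKey identifies a method by its class-id and method-id.
type methodKey struct{ classID, methodID uint16 }

// knownMethods is an illustrative subset of the AMQP 0-9-1 method table.
var knownMethods = map[methodKey]string{
	{10, 10}: "connection.start",
	{60, 40}: "basic.publish",
	{60, 60}: "basic.deliver",
}

// splitMethodPayload reads the class-id and method-id (two big-endian shorts)
// and returns them together with the still-encoded arguments.
func splitMethodPayload(payload []byte) (methodKey, []byte, error) {
	if len(payload) < 4 {
		return methodKey{}, nil, errors.New("method payload shorter than 4 bytes")
	}
	key := methodKey{
		classID:  binary.BigEndian.Uint16(payload[0:2]),
		methodID: binary.BigEndian.Uint16(payload[2:4]),
	}
	return key, payload[4:], nil
}

func main() {
	// class-id 60, method-id 40, followed by two argument bytes.
	payload := []byte{0x00, 0x3c, 0x00, 0x28, 0x00, 0x00}
	key, args, err := splitMethodPayload(payload)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	name, ok := knownMethods[key]
	if !ok {
		name = "unknown"
	}
	fmt.Printf("class=%d method=%d (%s), %d argument bytes\n",
		key.classID, key.methodID, name, len(args))
}
```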

type Properties

type Properties struct {
	ContentType     string    `json:"contentType"`     // MIME content type
	ContentEncoding string    `json:"contentEncoding"` // MIME content encoding
	Headers         Table     `json:"headers"`         // Application or header exchange table
	DeliveryMode    uint8     `json:"deliveryMode"`    // queue implementation use - Transient (1) or Persistent (2)
	Priority        uint8     `json:"priority"`        // queue implementation use - 0 to 9
	CorrelationId   string    `json:"correlationId"`   // application use - correlation identifier
	ReplyTo         string    `json:"replyTo"`         // application use - address to reply to (ex: RPC)
	Expiration      string    `json:"expiration"`      // implementation use - message expiration spec
	MessageId       string    `json:"messageId"`       // application use - message identifier
	Timestamp       time.Time `json:"timestamp"`       // application use - message timestamp
	Type            string    `json:"type"`            // application use - message type name
	UserId          string    `json:"userId"`          // application use - creating user id
	AppId           string    `json:"appId"`           // application use - creating application
	// contains filtered or unexported fields
}

Properties is used by header frames to capture routing and header information.

type Publishing

type Publishing struct {
	// Application or exchange specific fields,
	// the headers exchange will inspect this field.
	Headers Table

	// Properties
	ContentType     string    // MIME content type
	ContentEncoding string    // MIME content encoding
	DeliveryMode    uint8     // Transient (0 or 1) or Persistent (2)
	Priority        uint8     // 0 to 9
	CorrelationId   string    // correlation identifier
	ReplyTo         string    // address to reply to (ex: RPC)
	Expiration      string    // message expiration spec
	MessageId       string    // message identifier
	Timestamp       time.Time // message timestamp
	Type            string    // message type name
	UserId          string    // creating user id - ex: "guest"
	AppId           string    // creating application id

	// The application specific payload of the message
	Body []byte
}

Publishing captures the client message sent to the server. The fields outside of the Headers table included in this struct mirror the underlying fields in the content frame. They use native types for convenience and efficiency.

type Queue

type Queue struct {
	Name      string // server confirmed or generated name
	Messages  int    // count of messages not awaiting acknowledgment
	Consumers int    // number of consumers receiving deliveries
}

Queue captures the current state of the queue on the server, as returned from Channel.QueueDeclare or Channel.QueueInspect.

type QueueBind

type QueueBind struct {
	Queue      string `json:"queue"`
	Exchange   string `json:"exchange"`
	RoutingKey string `json:"routingKey"`
	NoWait     bool   `json:"noWait"`
	Arguments  Table  `json:"arguments"`
	// contains filtered or unexported fields
}

type QueueBindOk

type QueueBindOk struct {
}

type QueueDeclare

type QueueDeclare struct {
	Queue      string `json:"queue"`
	Passive    bool   `json:"passive"`
	Durable    bool   `json:"durable"`
	Exclusive  bool   `json:"exclusive"`
	AutoDelete bool   `json:"autoDelete"`
	NoWait     bool   `json:"noWait"`
	Arguments  Table  `json:"arguments"`
	// contains filtered or unexported fields
}

type QueueDeclareOk

type QueueDeclareOk struct {
	Queue         string `json:"queue"`
	MessageCount  uint32 `json:"messageCount"`
	ConsumerCount uint32 `json:"consumerCount"`
}

type Table

type Table map[string]interface{}

Table stores user supplied fields of the following types:

bool
byte
float32
float64
int
int16
int32
int64
nil
string
time.Time
amqp.Decimal
amqp.Table
[]byte
[]interface{} - containing above types

Functions taking a table will immediately fail when the table contains a value of an unsupported type.

The caller must be specific in which precision of integer it wishes to encode.

Use a type assertion when reading values from a table for type conversion.

RabbitMQ expects int32 for integer values.
