driver_amqp

package
v0.3.36 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 28, 2025 License: BSD-3-Clause Imports: 6 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DriverAmqp

type DriverAmqp struct {
	// contains filtered or unexported fields
}

func NewDriverAmqp

func NewDriverAmqp(config *DriverAmqpConfig) (*DriverAmqp, error)

func NewDriverAmqpFromString

func NewDriverAmqpFromString(text string) (*DriverAmqp, error)

func (*DriverAmqp) Close

func (instance *DriverAmqp) Close() error

func (*DriverAmqp) ExchangeBind

func (instance *DriverAmqp) ExchangeBind(settings interface{}) error

ExchangeBind binds an exchange to another exchange to create inter-exchange routing topologies on the server. This can decouple the private topology and routing exchanges from exchanges intended solely for publishing endpoints.

Binding two exchanges with identical arguments will not create duplicate bindings.

Binding one exchange to another with multiple bindings will only deliver a message once. For example if you bind your exchange to `amq.fanout` with two different binding keys, only a single message will be delivered to your exchange even though multiple bindings will match.

Given a message delivered to the source exchange, the message will be forwarded to the destination exchange when the routing key is matched.

ExchangeBind("sell", "MSFT", "trade", false, nil)
ExchangeBind("buy", "AAPL", "trade", false, nil)

Delivery       Source      Key      Destination
example        exchange             exchange
-----------------------------------------------
key: AAPL  --> trade ----> MSFT     sell
                     \---> AAPL --> buy

When noWait is true, do not wait for the server to confirm the binding. If any error occurs the channel will be closed. Add a listener to NotifyClose to handle these errors.

Optional arguments specific to the exchanges bound can also be specified.

func (*DriverAmqp) ExchangeDeclare

func (instance *DriverAmqp) ExchangeDeclare(settings interface{}) error

func (*DriverAmqp) ExchangeDelete

func (instance *DriverAmqp) ExchangeDelete(name string, args ...interface{}) error

func (*DriverAmqp) ExchangeUnbind

func (instance *DriverAmqp) ExchangeUnbind(settings interface{}) error

func (*DriverAmqp) GoString

func (instance *DriverAmqp) GoString() string

func (*DriverAmqp) Info

func (instance *DriverAmqp) Info() string

func (*DriverAmqp) NewEmitter

func (instance *DriverAmqp) NewEmitter(settings interface{}) (mq_commons.IEmitter, error)

NewEmitter generate new emitter

func (*DriverAmqp) NewListener

func (instance *DriverAmqp) NewListener(settings interface{}) (mq_commons.IListener, error)

NewListener generate new message listener/consumer

func (*DriverAmqp) NotifyDisconnection

func (instance *DriverAmqp) NotifyDisconnection(connectionCloseCallback func())

func (*DriverAmqp) Ping

func (instance *DriverAmqp) Ping() (bool, error)

func (*DriverAmqp) QueueBind

func (instance *DriverAmqp) QueueBind(settings interface{}) error

QueueBind binds an exchange to a queue so that publishings to the exchange will be routed to the queue when the publishing routing key matches the binding routing key.

QueueBind("pagers", "alert", "log", false, nil)
QueueBind("emails", "info", "log", false, nil)

Delivery       Exchange  Key       Queue
-----------------------------------------------
key: alert --> log ----> alert --> pagers
key: info ---> log ----> info ---> emails
key: debug --> log       (none)    (dropped)

If a binding with the same key and arguments already exists between the exchange and queue, the attempt to rebind will be ignored and the existing binding will be retained.

In the case that multiple bindings may cause the message to be routed to the same queue, the server will only route the publishing once. This is possible with topic exchanges.

QueueBind("pagers", "alert", "amq.topic", false, nil)
QueueBind("emails", "info", "amq.topic", false, nil)
QueueBind("emails", "#", "amq.topic", false, nil) // match everything

Delivery       Exchange        Key       Queue
-----------------------------------------------
key: alert --> amq.topic ----> alert --> pagers
key: info ---> amq.topic ----> # ------> emails
                         \---> info ---/
key: debug --> amq.topic ----> # ------> emails

It is only possible to bind a durable queue to a durable exchange regardless of whether the queue or exchange is auto-deleted. Bindings between durable queues and exchanges will also be restored on server restart.

If the binding could not complete, an error will be returned and the channel will be closed.

When noWait is false and the queue could not be bound, the channel will be closed with an error.

func (*DriverAmqp) QueueDeclare

func (instance *DriverAmqp) QueueDeclare(settings interface{}) (interface{}, error)

func (*DriverAmqp) QueueDelete

func (instance *DriverAmqp) QueueDelete(name string, args ...interface{}) (int, error)

QueueDelete removes the queue from the server including all bindings then purges the messages based on server configuration, returning the number of messages purged.

func (*DriverAmqp) QueueInspect

func (instance *DriverAmqp) QueueInspect(name string) (interface{}, error)

func (*DriverAmqp) QueuePurge

func (instance *DriverAmqp) QueuePurge(name string, args ...interface{}) (int, error)

QueuePurge removes all messages from the named queue which are not waiting to be acknowledged. Messages that have been delivered but have not yet been acknowledged will not be removed.

When successful, returns the number of messages purged.

func (*DriverAmqp) QueueUnbind

func (instance *DriverAmqp) QueueUnbind(settings interface{}) error

QueueUnbind removes a binding between an exchange and queue matching the key and arguments.

It is possible to send and empty string for the exchange name which means to unbind the queue from the default exchange.

func (*DriverAmqp) RpcCommand

func (instance *DriverAmqp) RpcCommand(rpcChannel string, emitterSettings, listenerSettings interface{}, rawMessage interface{}, callback mq_commons.ListenerHandler, timeout time.Duration) error

func (*DriverAmqp) String

func (instance *DriverAmqp) String() string

type DriverAmqpConfig

type DriverAmqpConfig struct {
	mq_commons.DriverConfig
	Url string `json:"url"`
}

func NewDriverAmqpConfig

func NewDriverAmqpConfig(text string) (*DriverAmqpConfig, error)

type EmitterAmqp

type EmitterAmqp struct {
	// contains filtered or unexported fields
}

func (*EmitterAmqp) Close

func (instance *EmitterAmqp) Close() error

func (*EmitterAmqp) Emit

func (instance *EmitterAmqp) Emit(rawMessage interface{}) error

func (*EmitterAmqp) GoString

func (instance *EmitterAmqp) GoString() string

func (*EmitterAmqp) String

func (instance *EmitterAmqp) String() string

type EmitterAmqpMessage

type EmitterAmqpMessage struct {
	// Application or exchange specific fields,
	// the headers exchange will inspect this field.
	Headers map[string]interface{} `json:"headers,omitempty"`

	// Properties
	ContentType     string    `json:"content-type,omitempty"`     // MIME content type
	ContentEncoding string    `json:"content-encoding,omitempty"` // MIME content encoding
	DeliveryMode    uint8     `json:"delivery-mode,omitempty"`    // Transient (0 or 1) or Persistent (2)
	Priority        uint8     `json:"priority,omitempty"`         // 0 to 9
	CorrelationId   string    `json:"correlation-id,omitempty"`   // correlation identifier
	ReplyTo         string    `json:"reply-to,omitempty"`         // address to reply to (ex: RPC)
	Expiration      string    `json:"expiration,omitempty"`       // message expiration spec
	MessageId       string    `json:"message-id,omitempty"`       // message identifier
	Timestamp       time.Time `json:"timestamp,omitempty"`        // message timestamp
	Type            string    `json:"type,omitempty"`             // message type name
	UserId          string    `json:"user-id,omitempty"`          // creating user id - ex: "guest"
	AppId           string    `json:"app-id,omitempty"`           // creating application id

	// The application specific payload of the message
	Body []byte `json:"body"`
}

EmitterAmqpMessage message to send

type EmitterAmqpSettings

type EmitterAmqpSettings struct {
	Exchange  string `json:"exchange"`
	Key       string `json:"key"`
	Mandatory bool   `json:"mandatory"`
	Immediate bool   `json:"immediate"`
}

EmitterAmqpSettings configuration for emitter

type ExchangeAmqpConfig

type ExchangeAmqpConfig struct {
	Name       string                 `json:"name"`
	Kind       string                 `json:"kind"`
	Durable    bool                   `json:"durable"`
	Internal   bool                   `json:"internal"`
	AutoDelete bool                   `json:"auto-delete"`
	NoWait     bool                   `json:"no-wait"`
	Arguments  map[string]interface{} `json:"args"`
	IsPassive  bool                   `json:"passive"`
}

type ExchangeBindAmqpConfig

type ExchangeBindAmqpConfig struct {
	Destination string                 `json:"destination"`
	Key         string                 `json:"key"`
	Source      string                 `json:"source"`
	NoWait      bool                   `json:"no-wait"`
	Arguments   map[string]interface{} `json:"args"`
}

type ListenerAmqp

type ListenerAmqp struct {
	// contains filtered or unexported fields
}

func (*ListenerAmqp) Close

func (instance *ListenerAmqp) Close() error

func (*ListenerAmqp) GoString

func (instance *ListenerAmqp) GoString() string

func (*ListenerAmqp) Join

func (instance *ListenerAmqp) Join()

func (*ListenerAmqp) JoinTimeout

func (instance *ListenerAmqp) JoinTimeout(d time.Duration)

func (*ListenerAmqp) Listen

func (instance *ListenerAmqp) Listen(callback mq_commons.ListenerHandler) error

func (*ListenerAmqp) String

func (instance *ListenerAmqp) String() string

type ListenerAmqpHandler

type ListenerAmqpHandler struct {
	// contains filtered or unexported fields
}

ListenerAmqpHandler Utility message handler

func NewListenerAmqpHandler

func NewListenerAmqpHandler() *ListenerAmqpHandler

func (*ListenerAmqpHandler) Close

func (instance *ListenerAmqpHandler) Close()

func (*ListenerAmqpHandler) Join

func (instance *ListenerAmqpHandler) Join()

func (*ListenerAmqpHandler) JoinTimeout

func (instance *ListenerAmqpHandler) JoinTimeout(d time.Duration)

type ListenerAmqpMessage

type ListenerAmqpMessage struct {
	// Application or exchange specific fields,
	// the headers exchange will inspect this field.
	Headers map[string]interface{} `json:"headers,omitempty"`

	// Properties
	ContentType     string    `json:"content-type,omitempty"`     // MIME content type
	ContentEncoding string    `json:"content-encoding,omitempty"` // MIME content encoding
	DeliveryMode    uint8     `json:"delivery-mode,omitempty"`    // Transient (0 or 1) or Persistent (2)
	Priority        uint8     `json:"priority,omitempty"`         // 0 to 9
	CorrelationId   string    `json:"correlation-id,omitempty"`   // correlation identifier
	ReplyTo         string    `json:"reply-to,omitempty"`         // address to reply to (ex: RPC)
	Expiration      string    `json:"expiration,omitempty"`       // message expiration spec
	MessageId       string    `json:"message-id,omitempty"`       // message identifier
	Timestamp       time.Time `json:"timestamp,omitempty"`        // message timestamp
	Type            string    `json:"type,omitempty"`             // message type name
	UserId          string    `json:"user-id,omitempty"`          // creating user id - ex: "guest"
	AppId           string    `json:"app-id,omitempty"`           // creating application id

	// Valid only with Channel.Consume
	ConsumerTag string `json:"consumer-tag,omitempty"`

	// Valid only with Channel.Get
	MessageCount uint32 `json:"message-count,omitempty"`

	DeliveryTag uint64 `json:"delivery-tag,omitempty"`
	Redelivered bool   `json:"redelivered"`
	Exchange    string `json:"exchange,omitempty"`    // basic.publish exchange
	RoutingKey  string `json:"routing-key,omitempty"` // basic.publish routing key

	// The application specific payload of the message
	Body []byte `json:"body"`
}

type ListenerAmqpSettings

type ListenerAmqpSettings struct {
	Queue       string                 `json:"queue"`
	ConsumerTag string                 `json:"consumer-tag"`
	NoLocal     bool                   `json:"no-local"`
	AutoAck     bool                   `json:"auto-ack"`
	Exclusive   bool                   `json:"exclusive"`
	NoWait      bool                   `json:"no-wait"`
	Arguments   map[string]interface{} `json:"args"`
}

type QueueAmqpConfig

type QueueAmqpConfig struct {
	Name       string                 `json:"name"`
	Durable    bool                   `json:"durable"`
	Exclusive  bool                   `json:"exclusive"`
	AutoDelete bool                   `json:"auto-delete"`
	NoWait     bool                   `json:"no-wait"`
	Arguments  map[string]interface{} `json:"args"`
	IsPassive  bool                   `json:"passive"`
}

type QueueBindAmqpConfig

type QueueBindAmqpConfig struct {
	Name      string                 `json:"name"`
	Key       string                 `json:"key"`
	Exchange  string                 `json:"exchange"`
	NoWait    bool                   `json:"no-wait"`
	Arguments map[string]interface{} `json:"args"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL