Documentation
Index ¶
- type DriverAmqp
- func (instance *DriverAmqp) Close() error
- func (instance *DriverAmqp) ExchangeBind(settings interface{}) error
- func (instance *DriverAmqp) ExchangeDeclare(settings interface{}) error
- func (instance *DriverAmqp) ExchangeDelete(name string, args ...interface{}) error
- func (instance *DriverAmqp) ExchangeUnbind(settings interface{}) error
- func (instance *DriverAmqp) GoString() string
- func (instance *DriverAmqp) Info() string
- func (instance *DriverAmqp) NewEmitter(settings interface{}) (mq_commons.IEmitter, error)
- func (instance *DriverAmqp) NewListener(settings interface{}) (mq_commons.IListener, error)
- func (instance *DriverAmqp) NotifyDisconnection(connectionCloseCallback func())
- func (instance *DriverAmqp) Ping() (bool, error)
- func (instance *DriverAmqp) QueueBind(settings interface{}) error
- func (instance *DriverAmqp) QueueDeclare(settings interface{}) (interface{}, error)
- func (instance *DriverAmqp) QueueDelete(name string, args ...interface{}) (int, error)
- func (instance *DriverAmqp) QueueInspect(name string) (interface{}, error)
- func (instance *DriverAmqp) QueuePurge(name string, args ...interface{}) (int, error)
- func (instance *DriverAmqp) QueueUnbind(settings interface{}) error
- func (instance *DriverAmqp) RpcCommand(rpcChannel string, emitterSettings, listenerSettings interface{}, ...) error
- func (instance *DriverAmqp) String() string
- type DriverAmqpConfig
- type EmitterAmqp
- type EmitterAmqpMessage
- type EmitterAmqpSettings
- type ExchangeAmqpConfig
- type ExchangeBindAmqpConfig
- type ListenerAmqp
- func (instance *ListenerAmqp) Close() error
- func (instance *ListenerAmqp) GoString() string
- func (instance *ListenerAmqp) Join()
- func (instance *ListenerAmqp) JoinTimeout(d time.Duration)
- func (instance *ListenerAmqp) Listen(callback mq_commons.ListenerHandler) error
- func (instance *ListenerAmqp) String() string
- type ListenerAmqpHandler
- type ListenerAmqpMessage
- type ListenerAmqpSettings
- type QueueAmqpConfig
- type QueueBindAmqpConfig
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DriverAmqp ¶
type DriverAmqp struct {
// contains filtered or unexported fields
}
func NewDriverAmqp ¶
func NewDriverAmqp(config *DriverAmqpConfig) (*DriverAmqp, error)
func NewDriverAmqpFromString ¶
func NewDriverAmqpFromString(text string) (*DriverAmqp, error)
func (*DriverAmqp) Close ¶
func (instance *DriverAmqp) Close() error
func (*DriverAmqp) ExchangeBind ¶
func (instance *DriverAmqp) ExchangeBind(settings interface{}) error
ExchangeBind binds an exchange to another exchange to create inter-exchange routing topologies on the server. This can decouple the private topology and routing exchanges from exchanges intended solely for publishing endpoints.
Binding two exchanges with identical arguments will not create duplicate bindings.
Binding one exchange to another with multiple bindings will only deliver a message once. For example, if you bind your exchange to `amq.fanout` with two different binding keys, only a single message will be delivered to your exchange even though multiple bindings match.
Given a message delivered to the source exchange, the message will be forwarded to the destination exchange when the routing key is matched.
    ExchangeBind("sell", "MSFT", "trade", false, nil)
    ExchangeBind("buy", "AAPL", "trade", false, nil)

    Delivery       Source      Key      Destination
    example        exchange             exchange
    -----------------------------------------------
    key: AAPL  --> trade ----> MSFT     sell
                         \---> AAPL --> buy
When noWait is true, do not wait for the server to confirm the binding. If any error occurs the channel will be closed. Add a listener to NotifyClose to handle these errors.
Optional arguments specific to the exchanges bound can also be specified.
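The binding rules above can be illustrated with a small in-memory model. This is only a sketch: `topology`, `bind`, and `route` are hypothetical names that do not exist in this package, the model matches routing keys by exact equality (as a direct source exchange would), and no broker is contacted.

```go
package main

import "fmt"

// binding is one exchange-to-exchange binding: a message published to source
// with a routing key equal to key is forwarded to destination.
type binding struct {
	destination, key, source string
}

// topology is a hypothetical stand-in for the broker's binding table.
type topology struct {
	bindings []binding
}

// bind records a binding and ignores an exact duplicate.
func (t *topology) bind(destination, key, source string) {
	b := binding{destination, key, source}
	for _, existing := range t.bindings {
		if existing == b {
			return
		}
	}
	t.bindings = append(t.bindings, b)
}

// route lists the destination exchanges reached from source for key.
// Each destination appears at most once, in binding order.
func (t *topology) route(source, key string) []string {
	seen := map[string]bool{}
	var out []string
	for _, b := range t.bindings {
		if b.source == source && b.key == key && !seen[b.destination] {
			seen[b.destination] = true
			out = append(out, b.destination)
		}
	}
	return out
}

func main() {
	topo := &topology{}
	topo.bind("sell", "MSFT", "trade")
	topo.bind("buy", "AAPL", "trade")
	topo.bind("buy", "AAPL", "trade") // identical arguments: no duplicate binding

	fmt.Println(topo.route("trade", "AAPL")) // [buy]
	fmt.Println(topo.route("trade", "MSFT")) // [sell]
	fmt.Println(len(topo.bindings))          // 2
}
```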
func (*DriverAmqp) ExchangeDeclare ¶
func (instance *DriverAmqp) ExchangeDeclare(settings interface{}) error
func (*DriverAmqp) ExchangeDelete ¶
func (instance *DriverAmqp) ExchangeDelete(name string, args ...interface{}) error
func (*DriverAmqp) ExchangeUnbind ¶
func (instance *DriverAmqp) ExchangeUnbind(settings interface{}) error
func (*DriverAmqp) GoString ¶
func (instance *DriverAmqp) GoString() string
func (*DriverAmqp) Info ¶
func (instance *DriverAmqp) Info() string
func (*DriverAmqp) NewEmitter ¶
func (instance *DriverAmqp) NewEmitter(settings interface{}) (mq_commons.IEmitter, error)
NewEmitter creates a new emitter.
func (*DriverAmqp) NewListener ¶
func (instance *DriverAmqp) NewListener(settings interface{}) (mq_commons.IListener, error)
NewListener creates a new message listener (consumer).
func (*DriverAmqp) NotifyDisconnection ¶
func (instance *DriverAmqp) NotifyDisconnection(connectionCloseCallback func())
func (*DriverAmqp) Ping ¶
func (instance *DriverAmqp) Ping() (bool, error)
func (*DriverAmqp) QueueBind ¶
func (instance *DriverAmqp) QueueBind(settings interface{}) error
QueueBind binds an exchange to a queue so that publishings to the exchange will be routed to the queue when the publishing routing key matches the binding routing key.
    QueueBind("pagers", "alert", "log", false, nil)
    QueueBind("emails", "info", "log", false, nil)

    Delivery       Exchange  Key       Queue
    -----------------------------------------------
    key: alert --> log ----> alert --> pagers
    key: info ---> log ----> info ---> emails
    key: debug --> log       (none)    (dropped)
If a binding with the same key and arguments already exists between the exchange and queue, the attempt to rebind will be ignored and the existing binding will be retained.
In the case that multiple bindings may cause the message to be routed to the same queue, the server will only route the publishing once. This is possible with topic exchanges.
    QueueBind("pagers", "alert", "amq.topic", false, nil)
    QueueBind("emails", "info", "amq.topic", false, nil)
    QueueBind("emails", "#", "amq.topic", false, nil) // match everything

    Delivery       Exchange        Key       Queue
    -----------------------------------------------
    key: alert --> amq.topic ----> alert --> pagers
    key: info ---> amq.topic ----> # ------> emails
                             \---> info ---/
    key: debug --> amq.topic ----> # ------> emails
It is only possible to bind a durable queue to a durable exchange regardless of whether the queue or exchange is auto-deleted. Bindings between durable queues and exchanges will also be restored on server restart.
If the binding could not complete, an error will be returned and the channel will be closed.
When noWait is false and the queue could not be bound, the channel will be closed with an error.
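To make the "routed only once" behaviour of topic exchanges concrete, here is a minimal sketch of topic-key matching and de-duplicated routing. It assumes the usual AMQP 0-9-1 topic rules (dot-separated words, `*` for exactly one word, `#` for zero or more words). `topicMatch`, `queueBinding`, and `routeTopic` are illustrative names, not part of this package, and nothing here talks to a broker.

```go
package main

import (
	"fmt"
	"strings"
)

// topicMatch reports whether a routing key matches a binding pattern.
func topicMatch(pattern, key string) bool {
	return matchWords(strings.Split(pattern, "."), strings.Split(key, "."))
}

// matchWords compares pattern words against key words recursively.
func matchWords(p, k []string) bool {
	if len(p) == 0 {
		return len(k) == 0
	}
	if p[0] == "#" {
		// "#" either absorbs nothing, or absorbs one word and is tried again.
		if matchWords(p[1:], k) {
			return true
		}
		return len(k) > 0 && matchWords(p, k[1:])
	}
	if len(k) == 0 {
		return false
	}
	if p[0] == "*" || p[0] == k[0] {
		return matchWords(p[1:], k[1:])
	}
	return false
}

// queueBinding is a hypothetical record of one queue bound with one pattern.
type queueBinding struct {
	queue, pattern string
}

// routeTopic lists the queues that receive a publishing with the given key.
// A queue matched by several bindings is listed only once.
func routeTopic(bindings []queueBinding, key string) []string {
	seen := map[string]bool{}
	var out []string
	for _, b := range bindings {
		if topicMatch(b.pattern, key) && !seen[b.queue] {
			seen[b.queue] = true
			out = append(out, b.queue)
		}
	}
	return out
}

func main() {
	bindings := []queueBinding{
		{"pagers", "alert"},
		{"emails", "info"},
		{"emails", "#"}, // match everything
	}
	fmt.Println(routeTopic(bindings, "info"))  // [emails]
	fmt.Println(routeTopic(bindings, "debug")) // [emails]
	fmt.Println(routeTopic(bindings, "alert")) // [pagers emails]
}
```

In this model the key `info` matches both the `info` and `#` bindings on `emails`, yet the queue is listed once. Because `#` matches every key, the key `alert` reaches `emails` as well as `pagers`.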
func (*DriverAmqp) QueueDeclare ¶
func (instance *DriverAmqp) QueueDeclare(settings interface{}) (interface{}, error)
func (*DriverAmqp) QueueDelete ¶
func (instance *DriverAmqp) QueueDelete(name string, args ...interface{}) (int, error)
QueueDelete removes the queue from the server including all bindings then purges the messages based on server configuration, returning the number of messages purged.
func (*DriverAmqp) QueueInspect ¶
func (instance *DriverAmqp) QueueInspect(name string) (interface{}, error)
func (*DriverAmqp) QueuePurge ¶
func (instance *DriverAmqp) QueuePurge(name string, args ...interface{}) (int, error)
QueuePurge removes all messages from the named queue which are not waiting to be acknowledged. Messages that have been delivered but have not yet been acknowledged will not be removed.
When successful, returns the number of messages purged.
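The distinction between ready and unacknowledged messages can be sketched with a toy queue. The `queue` type and its `purge` method below are hypothetical and model only the counting rule described above; they are not this package's implementation.

```go
package main

import "fmt"

// queue is a hypothetical in-memory model of a broker queue.
type queue struct {
	ready   []string // messages waiting to be delivered
	unacked []string // messages delivered but not yet acknowledged
}

// purge drops every ready message and reports how many were removed.
// Unacknowledged messages are left untouched.
func (q *queue) purge() int {
	n := len(q.ready)
	q.ready = nil
	return n
}

func main() {
	q := &queue{
		ready:   []string{"m2", "m3"},
		unacked: []string{"m1"},
	}
	fmt.Println(q.purge(), len(q.unacked)) // 2 1
}
```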
func (*DriverAmqp) QueueUnbind ¶
func (instance *DriverAmqp) QueueUnbind(settings interface{}) error
QueueUnbind removes a binding between an exchange and queue matching the key and arguments.
It is possible to send an empty string for the exchange name, which unbinds the queue from the default exchange.
func (*DriverAmqp) RpcCommand ¶
func (instance *DriverAmqp) RpcCommand(rpcChannel string, emitterSettings, listenerSettings interface{}, rawMessage interface{}, callback mq_commons.ListenerHandler, timeout time.Duration) error
func (*DriverAmqp) String ¶
func (instance *DriverAmqp) String() string
type DriverAmqpConfig ¶
type DriverAmqpConfig struct {
    mq_commons.DriverConfig
    Url string `json:"url"`
}
func NewDriverAmqpConfig ¶
func NewDriverAmqpConfig(text string) (*DriverAmqpConfig, error)
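The `json:"url"` tag suggests the `text` argument is a JSON document, although this page does not say so. Under that assumption, the following sketch decodes such a document with the standard library. `amqpConfig` and `parseConfig` are local stand-ins, the embedded `mq_commons.DriverConfig` is left out because its fields are not shown here, and the URL is only a sample value.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// amqpConfig mirrors only the Url field of DriverAmqpConfig.
type amqpConfig struct {
	Url string `json:"url"`
}

// parseConfig decodes a JSON document and rejects a missing URL.
// The validation rule is an assumption made for this sketch.
func parseConfig(text string) (*amqpConfig, error) {
	var c amqpConfig
	if err := json.Unmarshal([]byte(text), &c); err != nil {
		return nil, fmt.Errorf("invalid config: %w", err)
	}
	if c.Url == "" {
		return nil, errors.New("invalid config: missing url")
	}
	return &c, nil
}

func main() {
	c, err := parseConfig(`{"url": "amqp://guest:guest@localhost:5672/"}`)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(c.Url)
}
```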
type EmitterAmqp ¶
type EmitterAmqp struct {
// contains filtered or unexported fields
}
func (*EmitterAmqp) Close ¶
func (instance *EmitterAmqp) Close() error
func (*EmitterAmqp) Emit ¶
func (instance *EmitterAmqp) Emit(rawMessage interface{}) error
func (*EmitterAmqp) GoString ¶
func (instance *EmitterAmqp) GoString() string
func (*EmitterAmqp) String ¶
func (instance *EmitterAmqp) String() string
type EmitterAmqpMessage ¶
type EmitterAmqpMessage struct {
    // Application or exchange specific fields,
    // the headers exchange will inspect this field.
    Headers map[string]interface{} `json:"headers,omitempty"`

    // Properties
    ContentType     string    `json:"content-type,omitempty"`     // MIME content type
    ContentEncoding string    `json:"content-encoding,omitempty"` // MIME content encoding
    DeliveryMode    uint8     `json:"delivery-mode,omitempty"`    // Transient (0 or 1) or Persistent (2)
    Priority        uint8     `json:"priority,omitempty"`         // 0 to 9
    CorrelationId   string    `json:"correlation-id,omitempty"`   // correlation identifier
    ReplyTo         string    `json:"reply-to,omitempty"`         // address to reply to (ex: RPC)
    Expiration      string    `json:"expiration,omitempty"`       // message expiration spec
    MessageId       string    `json:"message-id,omitempty"`       // message identifier
    Timestamp       time.Time `json:"timestamp,omitempty"`        // message timestamp
    Type            string    `json:"type,omitempty"`             // message type name
    UserId          string    `json:"user-id,omitempty"`          // creating user id - ex: "guest"
    AppId           string    `json:"app-id,omitempty"`           // creating application id

    // The application specific payload of the message
    Body []byte `json:"body"`
}
EmitterAmqpMessage is the message to send.
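The JSON tags determine how such a message looks when serialized. The sketch below uses a local `message` type that copies a subset of the fields and tags shown above (it is not the package's type) and encodes it with `encoding/json`, which renders a `[]byte` body as a base64 string and drops empty `omitempty` fields.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// message mirrors a subset of EmitterAmqpMessage's fields and JSON tags.
type message struct {
	Headers       map[string]interface{} `json:"headers,omitempty"`
	ContentType   string                 `json:"content-type,omitempty"`
	DeliveryMode  uint8                  `json:"delivery-mode,omitempty"`
	CorrelationId string                 `json:"correlation-id,omitempty"`
	ReplyTo       string                 `json:"reply-to,omitempty"`
	Body          []byte                 `json:"body"`
}

// encode returns the JSON form of m as a string.
func encode(m message) (string, error) {
	b, err := json.Marshal(m)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	out, err := encode(message{
		ContentType:  "text/plain",
		DeliveryMode: 2, // persistent
		Body:         []byte("hello"),
	})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(out)
	// {"content-type":"text/plain","delivery-mode":2,"body":"aGVsbG8="}
}
```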
type EmitterAmqpSettings ¶
type EmitterAmqpSettings struct {
    Exchange  string `json:"exchange"`
    Key       string `json:"key"`
    Mandatory bool   `json:"mandatory"`
    Immediate bool   `json:"immediate"`
}
EmitterAmqpSettings is the configuration for an emitter.
type ExchangeAmqpConfig ¶
type ExchangeBindAmqpConfig ¶
type ListenerAmqp ¶
type ListenerAmqp struct {
// contains filtered or unexported fields
}
func (*ListenerAmqp) Close ¶
func (instance *ListenerAmqp) Close() error
func (*ListenerAmqp) GoString ¶
func (instance *ListenerAmqp) GoString() string
func (*ListenerAmqp) Join ¶
func (instance *ListenerAmqp) Join()
func (*ListenerAmqp) JoinTimeout ¶
func (instance *ListenerAmqp) JoinTimeout(d time.Duration)
func (*ListenerAmqp) Listen ¶
func (instance *ListenerAmqp) Listen(callback mq_commons.ListenerHandler) error
func (*ListenerAmqp) String ¶
func (instance *ListenerAmqp) String() string
type ListenerAmqpHandler ¶
type ListenerAmqpHandler struct {
// contains filtered or unexported fields
}
ListenerAmqpHandler is a utility message handler.
func NewListenerAmqpHandler ¶
func NewListenerAmqpHandler() *ListenerAmqpHandler
func (*ListenerAmqpHandler) Close ¶
func (instance *ListenerAmqpHandler) Close()
func (*ListenerAmqpHandler) Join ¶
func (instance *ListenerAmqpHandler) Join()
func (*ListenerAmqpHandler) JoinTimeout ¶
func (instance *ListenerAmqpHandler) JoinTimeout(d time.Duration)
type ListenerAmqpMessage ¶
type ListenerAmqpMessage struct {
    // Application or exchange specific fields,
    // the headers exchange will inspect this field.
    Headers map[string]interface{} `json:"headers,omitempty"`

    // Properties
    ContentType     string    `json:"content-type,omitempty"`     // MIME content type
    ContentEncoding string    `json:"content-encoding,omitempty"` // MIME content encoding
    DeliveryMode    uint8     `json:"delivery-mode,omitempty"`    // Transient (0 or 1) or Persistent (2)
    Priority        uint8     `json:"priority,omitempty"`         // 0 to 9
    CorrelationId   string    `json:"correlation-id,omitempty"`   // correlation identifier
    ReplyTo         string    `json:"reply-to,omitempty"`         // address to reply to (ex: RPC)
    Expiration      string    `json:"expiration,omitempty"`       // message expiration spec
    MessageId       string    `json:"message-id,omitempty"`       // message identifier
    Timestamp       time.Time `json:"timestamp,omitempty"`        // message timestamp
    Type            string    `json:"type,omitempty"`             // message type name
    UserId          string    `json:"user-id,omitempty"`          // creating user id - ex: "guest"
    AppId           string    `json:"app-id,omitempty"`           // creating application id

    // Valid only with Channel.Consume
    ConsumerTag string `json:"consumer-tag,omitempty"`

    // Valid only with Channel.Get
    MessageCount uint32 `json:"message-count,omitempty"`

    DeliveryTag uint64 `json:"delivery-tag,omitempty"`
    Redelivered bool   `json:"redelivered"`
    Exchange    string `json:"exchange,omitempty"`    // basic.publish exchange
    RoutingKey  string `json:"routing-key,omitempty"` // basic.publish routing key

    // The application specific payload of the message
    Body []byte `json:"body"`
}