Documentation ¶
Index ¶
- Constants
- type Channel
- func (channel *Channel) AddUnackedMessage(dTag uint64, cTag string, queue string, message *amqp.Message)
- func (channel *Channel) GetConsumersCount() int
- func (channel *Channel) GetQos() *qos.AmqpQos
- func (channel *Channel) NextDeliveryTag() uint64
- func (channel *Channel) SendContent(method amqp.Method, message *amqp.Message)
- func (channel *Channel) SendMethod(method amqp.Method)
- type Connection
- type Server
- func (srv *Server) GetConnections() map[uint64]*Connection
- func (srv *Server) GetProtoVersion() string
- func (srv *Server) GetStatus() ServerState
- func (srv *Server) GetVhost(name string) *VirtualHost
- func (srv *Server) GetVhosts() map[string]*VirtualHost
- func (srv *Server) Start()
- func (srv *Server) Stop()
- type ServerState
- type UnackedMessage
- type VirtualHost
- func (vhost *VirtualHost) AppendExchange(ex *exchange.Exchange)
- func (vhost *VirtualHost) AppendQueue(qu *queue.Queue) error
- func (vhost *VirtualHost) DeleteQueue(queueName string, ifUnused bool, ifEmpty bool) (uint64, error)
- func (vhost *VirtualHost) GetDefaultExchange() *exchange.Exchange
- func (vhost *VirtualHost) GetExchange(name string) *exchange.Exchange
- func (vhost *VirtualHost) GetExchanges() map[string]*exchange.Exchange
- func (vhost *VirtualHost) GetName() string
- func (vhost *VirtualHost) GetQueue(name string) *queue.Queue
- func (vhost *VirtualHost) GetQueues() map[string]*queue.Queue
- func (vhost *VirtualHost) NewQueue(name string, connID uint64, exclusive bool, autoDelete bool, durable bool, ...) *queue.Queue
- func (vhost *VirtualHost) PersistBinding(binding *binding.Binding)
- func (vhost *VirtualHost) RemoveBindings(bindings []*binding.Binding)
- func (vhost *VirtualHost) Stop() error
Constants ¶
const ( ConnStart = iota ConnStartOK ConnSecure ConnSecureOK ConnTune ConnTuneOK ConnOpen ConnOpenOK ConnCloseOK ConnClosed )
connection status list
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Channel ¶
type Channel struct {
// contains filtered or unexported fields
}
Channel is an implementation of the AMQP channel entity. Within a single socket connection, there can be multiple independent threads of control, called "channels".
func NewChannel ¶
func NewChannel(id uint16, conn *Connection) *Channel
NewChannel returns new instance of Channel
func (*Channel) AddUnackedMessage ¶
func (channel *Channel) AddUnackedMessage(dTag uint64, cTag string, queue string, message *amqp.Message)
AddUnackedMessage adds a message to the unacked queue.
func (*Channel) GetConsumersCount ¶
GetConsumersCount returns consumers count on channel
func (*Channel) NextDeliveryTag ¶
NextDeliveryTag returns next delivery tag for current channel
func (*Channel) SendContent ¶
SendContent sends a message to consumers or returns it to publishers.
func (*Channel) SendMethod ¶
SendMethod sends a method to the client. The method will be packed into a frame and sent to the outgoing channel.
type Connection ¶
type Connection struct {
// contains filtered or unexported fields
}
Connection represents AMQP-connection
func NewConnection ¶
func NewConnection(server *Server, netConn *net.TCPConn) (connection *Connection)
NewConnection returns new instance of amqp Connection
func (*Connection) GetChannels ¶
func (conn *Connection) GetChannels() map[uint16]*Channel
func (*Connection) GetID ¶
func (conn *Connection) GetID() uint64
func (*Connection) GetRemoteAddr ¶
func (conn *Connection) GetRemoteAddr() net.Addr
func (*Connection) GetUsername ¶
func (conn *Connection) GetUsername() string
func (*Connection) GetVirtualHost ¶
func (conn *Connection) GetVirtualHost() *VirtualHost
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server implements AMQP server
func NewServer ¶
func NewServer(host string, port string, protoVersion string, config *config.Config) (server *Server)
NewServer returns new instance of AMQP Server
func (*Server) GetConnections ¶
func (srv *Server) GetConnections() map[uint64]*Connection
func (*Server) GetProtoVersion ¶
func (*Server) GetStatus ¶
func (srv *Server) GetStatus() ServerState
func (*Server) GetVhost ¶
func (srv *Server) GetVhost(name string) *VirtualHost
func (*Server) GetVhosts ¶
func (srv *Server) GetVhosts() map[string]*VirtualHost
type ServerState ¶
type ServerState int
const ( Stopped ServerState = iota Running Stopping )
server state statuses
type UnackedMessage ¶
type UnackedMessage struct {
// contains filtered or unexported fields
}
UnackedMessage represents the unacknowledged message
type VirtualHost ¶
type VirtualHost struct {
// contains filtered or unexported fields
}
VirtualHost represents an AMQP virtual host. Each virtual host is the "parent" of its queues and exchanges.
func NewVhost ¶
func NewVhost(name string, system bool, srv *Server) *VirtualHost
NewVhost returns a new instance of VirtualHost. When instantiating a virtual host we: 1) init system exchanges; 2) load durable exchanges, queues and bindings from server storage; 3) load persisted messages from the message store into all initiated queues; 4) run the confirm loop. Only after that is the vhost in the running state. msgStoragePersistent, msgStorageTransient
func (*VirtualHost) AppendExchange ¶
func (vhost *VirtualHost) AppendExchange(ex *exchange.Exchange)
AppendExchange appends a new exchange and persists it if it is durable.
func (*VirtualHost) AppendQueue ¶
func (vhost *VirtualHost) AppendQueue(qu *queue.Queue) error
AppendQueue appends a new queue and its bindings into the default exchange, and persists the queue if it is durable.
func (*VirtualHost) DeleteQueue ¶
func (vhost *VirtualHost) DeleteQueue(queueName string, ifUnused bool, ifEmpty bool) (uint64, error)
DeleteQueue deletes the queue, and all bindings to that queue, from the virtual host. The queue will also be removed from server storage.
func (*VirtualHost) GetDefaultExchange ¶
func (vhost *VirtualHost) GetDefaultExchange() *exchange.Exchange
GetDefaultExchange returns default exchange
func (*VirtualHost) GetExchange ¶
func (vhost *VirtualHost) GetExchange(name string) *exchange.Exchange
GetExchange returns exchange by name or nil if not exists
func (*VirtualHost) GetExchanges ¶
func (vhost *VirtualHost) GetExchanges() map[string]*exchange.Exchange
func (*VirtualHost) GetName ¶
func (vhost *VirtualHost) GetName() string
func (*VirtualHost) GetQueue ¶
func (vhost *VirtualHost) GetQueue(name string) *queue.Queue
GetQueue returns queue by name or nil if not exists
func (*VirtualHost) GetQueues ¶
func (vhost *VirtualHost) GetQueues() map[string]*queue.Queue
GetQueues returns all of the vhost's queues.
func (*VirtualHost) NewQueue ¶
func (vhost *VirtualHost) NewQueue(name string, connID uint64, exclusive bool, autoDelete bool, durable bool, shardSize int) *queue.Queue
NewQueue returns a new instance of a queue built from the given params. We can't just use queue.NewQueue, because we need to set msgStorage on the queue.
func (*VirtualHost) PersistBinding ¶
func (vhost *VirtualHost) PersistBinding(binding *binding.Binding)
PersistBinding stores the binding into server storage.
func (*VirtualHost) RemoveBindings ¶
func (vhost *VirtualHost) RemoveBindings(bindings []*binding.Binding)
RemoveBindings removes the given bindings from server storage.
func (*VirtualHost) Stop ¶
func (vhost *VirtualHost) Stop() error
Stop properly stops the virtual host. TODO: properly stop the confirm loop.