Documentation ¶
Index ¶
- Constants
- func Flags() *pflag.FlagSet
- type Binding
- type Broker
- func (b *Broker) Consume(wg *sync.WaitGroup)
- func (b *Broker) Disconnect() error
- func (b *Broker) EnsureExchange(exchange string)
- func (b *Broker) HasConsumer() bool
- func (b *Broker) Initialize() error
- func (b *Broker) Publish(exchange, routingKey string, message *broker.Message) error
- func (b *Broker) Subscribe(exchange, routingKey string, handler broker.Handler) error
- type Connection
- type Declaration
- type Declarator
- type Delivery
- type Event
- type Exchange
- type Option
- type Options
- type Queue
Constants ¶
View Source
const (
	RequestIDHeader = "requestId"
	AccountIDHeader = "accountId"
	UserIDHeader    = "userId"
)
View Source
const AddressConfigKey = "amqp-url"
View Source
const ReconnectDelay = 5 * time.Second
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Broker ¶
type Broker struct {
// contains filtered or unexported fields
}
func (*Broker) Disconnect ¶
func (*Broker) EnsureExchange ¶ added in v1.1.3
func (*Broker) HasConsumer ¶ added in v1.3.1
func (*Broker) Initialize ¶
Initialize will set up the connections and declare all required AMQP bindings for producers and consumers.
type Connection ¶
type Connection struct {
// contains filtered or unexported fields
}
Connection is a wrapper for amqp.Connection that adds reconnection functionality.
func NewConnection ¶
func NewConnection(addr string) *Connection
func (*Connection) Connect ¶
func (c *Connection) Connect() (err error)
Connect will dial the specified AMQP server addr.
func (*Connection) IsConnected ¶
func (c *Connection) IsConnected() bool
func (*Connection) SetName ¶
func (c *Connection) SetName(name string)
func (*Connection) Shutdown ¶
func (c *Connection) Shutdown()
Shutdown shuts down the reconnector and terminates any existing connections.
func (*Connection) WaitForConnection ¶
func (c *Connection) WaitForConnection()
type Declaration ¶
type Declaration func(Declarator) error
func AutoBinding ¶
func AutoBinding(routingKey, queue, exchange string) Declaration
func AutoExchange ¶
func AutoExchange(name string) Declaration
func AutoQueue ¶
func AutoQueue(name string) Declaration
func DeclareBinding ¶
func DeclareBinding(b *Binding) Declaration
func DeclareExchange ¶
func DeclareExchange(e *Exchange) Declaration
func DeclareQueue ¶
func DeclareQueue(q *Queue) Declaration
type Declarator ¶
type Declarator interface {
	QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)
	ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error
	QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error
}
Declarator is implemented by amqp.Channel
type Event ¶
type Event struct {
// contains filtered or unexported fields
}
func (*Event) NackWithTimeout ¶ added in v1.7.4
func (*Event) RoutingKey ¶
func (*Event) SetContext ¶
type Option ¶
type Option func(*Options)
func ConsumerName ¶
func ConsumerQueue ¶
func PrefetchCount ¶
Click to show internal directories.
Click to hide internal directories.