Documentation ¶
Index ¶
- Constants
- func ExtractAMQPHeaders(ctx context.Context, headers map[string]interface{}) context.Context
- func Flags() *pflag.FlagSet
- func InjectAMQPHeaders(ctx context.Context) map[string]interface{}
- type Binding
- type Broker
- func (b *Broker) Consume(wg *sync.WaitGroup)
- func (b *Broker) Disconnect() error
- func (b *Broker) EnsureExchange(exchange string)
- func (b *Broker) HasConsumer() bool
- func (b *Broker) Initialize() error
- func (b *Broker) Publish(exchange, routingKey string, message *broker.Message) error
- func (b *Broker) Subscribe(exchange, routingKey string, handler broker.Handler) error
- type Connection
- type Declaration
- type Declarator
- type Delivery
- type Event
- type Exchange
- type HeadersCarrier
- type Option
- type Options
- type Queue
Constants ¶
View Source
const ( RequestIDHeader = "requestId" AccountIDHeader = "accountId" UserIDHeader = "userId" InternalHeader = "internal" )
View Source
const AddressConfigKey = "amqp-url"
View Source
const ReconnectDelay = 5 * time.Second
Variables ¶
This section is empty.
Functions ¶
func ExtractAMQPHeaders ¶ added in v1.8.1
ExtractAMQPHeaders extracts the tracing information from the AMQP headers and puts it into the context.
func InjectAMQPHeaders ¶ added in v1.8.1
InjectAMQPHeaders injects the tracing information from the context into the header map.
Types ¶
type Broker ¶
type Broker struct {
// contains filtered or unexported fields
}
func (*Broker) Disconnect ¶
func (*Broker) EnsureExchange ¶ added in v1.1.3
func (*Broker) HasConsumer ¶ added in v1.3.1
func (*Broker) Initialize ¶
Initialize will set up the connections and declare all required AMQP bindings for producers and consumers.
type Connection ¶
type Connection struct {
// contains filtered or unexported fields
}
Connection is a wrapper for amqp.Connection that adds reconnection functionality.
func NewConnection ¶
func NewConnection(addr string) *Connection
func (*Connection) Connect ¶
func (c *Connection) Connect() (err error)
Connect will dial the specified AMQP server addr.
func (*Connection) IsConnected ¶
func (c *Connection) IsConnected() bool
func (*Connection) SetName ¶
func (c *Connection) SetName(name string)
func (*Connection) Shutdown ¶
func (c *Connection) Shutdown()
Shutdown shuts down the reconnector and terminates any existing connections.
func (*Connection) WaitForConnection ¶
func (c *Connection) WaitForConnection()
type Declaration ¶
type Declaration func(Declarator) error
func AutoBinding ¶
func AutoBinding(routingKey, queue, exchange string) Declaration
func AutoExchange ¶
func AutoExchange(name string) Declaration
func AutoQueue ¶
func AutoQueue(name string) Declaration
func DeclareBinding ¶
func DeclareBinding(b *Binding) Declaration
func DeclareExchange ¶
func DeclareExchange(e *Exchange) Declaration
func DeclareQueue ¶
func DeclareQueue(q *Queue) Declaration
type Declarator ¶
type Declarator interface { QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error }
Declarator is implemented by amqp.Channel
type Event ¶
type Event struct {
// contains filtered or unexported fields
}
func (*Event) NackWithTimeout ¶ added in v1.7.4
func (*Event) RoutingKey ¶
func (*Event) SetContext ¶
type HeadersCarrier ¶ added in v1.8.1
type HeadersCarrier map[string]interface{}
func (HeadersCarrier) Get ¶ added in v1.8.1
func (a HeadersCarrier) Get(key string) string
func (HeadersCarrier) Keys ¶ added in v1.8.1
func (a HeadersCarrier) Keys() []string
func (HeadersCarrier) Set ¶ added in v1.8.1
func (a HeadersCarrier) Set(key string, value string)
type Option ¶
type Option func(*Options)
func ConsumerName ¶
func ConsumerQueue ¶
func PrefetchCount ¶
Source Files ¶
Click to show internal directories.
Click to hide internal directories.