amqp

package
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 1, 2022 License: MIT Imports: 14 Imported by: 0

Documentation

Index

Constants

View Source
const (
	RequestIDHeader = "requestId"
	AccountIDHeader = "accountId"
	UserIDHeader    = "userId"
)
View Source
const AddressConfigKey = "amqp-url"
View Source
const ReconnectDelay = 5 * time.Second

Variables

This section is empty.

Functions

func Flags

func Flags() *pflag.FlagSet

Types

type Binding

type Binding struct {
	// contains filtered or unexported fields
}

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

func NewBroker

func NewBroker(options ...Option) *Broker

func (*Broker) Consume

func (b *Broker) Consume(wg *sync.WaitGroup)

func (*Broker) Disconnect

func (b *Broker) Disconnect() error

func (*Broker) Initialize

func (b *Broker) Initialize() error

Initialize sets up the connections and declares all required AMQP bindings for producers and consumers.

func (*Broker) Publish

func (b *Broker) Publish(exchange, routingKey string, message *broker.Message) error

func (*Broker) Subscribe

func (b *Broker) Subscribe(exchange, routingKey string, handler broker.Handler) error

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

Connection is a wrapper around amqp.Connection that adds reconnection functionality.

func NewConnection

func NewConnection(addr string) *Connection

func (*Connection) Channel

func (c *Connection) Channel() (channel *amqp.Channel, err error)

func (*Connection) Connect

func (c *Connection) Connect() (err error)

Connect will dial the specified AMQP server addr.

func (*Connection) IsConnected

func (c *Connection) IsConnected() bool

func (*Connection) SetName

func (c *Connection) SetName(name string)

func (*Connection) Shutdown

func (c *Connection) Shutdown()

Shutdown stops the reconnector and terminates any existing connections.

func (*Connection) WaitForConnection

func (c *Connection) WaitForConnection()

type Declaration

type Declaration func(Declarator) error

func AutoBinding

func AutoBinding(routingKey, queue, exchange string) Declaration

func AutoExchange

func AutoExchange(name string) Declaration

func AutoQueue

func AutoQueue(name string) Declaration

func DeclareBinding

func DeclareBinding(b *Binding) Declaration

func DeclareExchange

func DeclareExchange(e *Exchange) Declaration

func DeclareQueue

func DeclareQueue(q *Queue) Declaration

type Declarator

type Declarator interface {
	QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)
	ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error
	QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error
}

Declarator is implemented by amqp.Channel

type Delivery

type Delivery struct {
	amqp.Delivery
}

func (*Delivery) Ack

func (d *Delivery) Ack(multiple bool)

func (*Delivery) Nack

func (d *Delivery) Nack(multiple, requeue bool)

type Event

type Event struct {
	// contains filtered or unexported fields
}

func NewEvent

func NewEvent(queue, routingKey string, delivery amqp.Delivery) *Event

func (*Event) Ack

func (evt *Event) Ack()

func (*Event) Message

func (evt *Event) Message() *broker.Message

func (*Event) Nack

func (evt *Event) Nack(retry bool)

func (*Event) QueueName

func (evt *Event) QueueName() string

func (*Event) RoutingKey

func (evt *Event) RoutingKey() string

func (*Event) SetContext

func (evt *Event) SetContext(ctx context.Context)

type Exchange

type Exchange struct {
	// contains filtered or unexported fields
}

type Option

type Option func(*Options)

func Address

func Address(address string) Option

func ConsumerName

func ConsumerName(name string) Option

func ConsumerQueue

func ConsumerQueue(queue string) Option

func PrefetchCount

func PrefetchCount(prefetch int) Option

type Options

type Options struct {
	Address         string
	PrefetchCount   int
	SubscriberQueue string
	ConsumerName    string
}

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL