Documentation ¶
Index ¶
- type AMQPChannel
- type AMQPConnection
- type Connection
- type Consumer
- type DefaultWorker
- type Handler
- type HandlerFunc
- type Middleware
- type Option
- func WithConsumeArgs(consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) Option
- func WithContext(ctx context.Context) Option
- func WithDeclareQueue(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) Option
- func WithExchange(exchange, routingKey string) Option
- func WithHandler(h Handler) Option
- func WithInitFunc(f func(conn AMQPConnection) (AMQPChannel, error)) Option
- func WithLogger(l logger.Logger) Option
- func WithNotify(stateCh chan State) Option
- func WithQos(prefetchCount int, global bool) Option
- func WithQueue(queue string) Option
- func WithRetryPeriod(dur time.Duration) Option
- func WithRetryPeriodFunc(durFunc func(retryCount int) time.Duration) Option
- func WithTmpQueue() Option
- func WithWorker(w Worker) Option
- type ParallelWorker
- type Ready
- type State
- type Unready
- type Worker
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AMQPChannel ¶
type AMQPChannel interface { Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error) Qos(prefetchCount, prefetchSize int, global bool) error NotifyClose(receiver chan *amqp.Error) chan *amqp.Error NotifyCancel(c chan string) chan string QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error) QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error Cancel(consumer string, noWait bool) error Close() error }
type AMQPConnection ¶
type AMQPConnection interface { }
type Connection ¶
type Connection struct {
// contains filtered or unexported fields
}
func NewConnection ¶
func NewConnection(amqpConn AMQPConnection, closeCh chan struct{}) *Connection
func (*Connection) AMQPConnection ¶
func (c *Connection) AMQPConnection() AMQPConnection
func (*Connection) NotifyClose ¶
func (c *Connection) NotifyClose() chan struct{}
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
func (*Consumer) NotifyClosed ¶
func (c *Consumer) NotifyClosed() <-chan struct{}
type DefaultWorker ¶
type Handler ¶
func Wrap ¶
func Wrap(handler Handler, middlewares ...Middleware) Handler
Example ¶
package main import ( "context" "github.com/latifrons/amqpextra/consumer" "github.com/latifrons/amqpextra/consumer/middleware" amqp "github.com/rabbitmq/amqp091-go" ) func main() { // wrap a handler by some middlewares consumer.Wrap( consumer.HandlerFunc(func(ctx context.Context, msg amqp.Delivery) interface{} { // process message msg.Ack(false) return nil }), middleware.HasCorrelationID(), middleware.HasReplyTo(), ) }
Output:
type HandlerFunc ¶
type Middleware ¶
type Option ¶
type Option func(c *Consumer)
func WithConsumeArgs ¶
func WithContext ¶
func WithDeclareQueue ¶
func WithExchange ¶
func WithHandler ¶
func WithInitFunc ¶
func WithInitFunc(f func(conn AMQPConnection) (AMQPChannel, error)) Option
func WithLogger ¶
func WithNotify ¶
func WithRetryPeriod ¶
func WithRetryPeriodFunc ¶
func WithTmpQueue ¶
func WithTmpQueue() Option
func WithWorker ¶
type ParallelWorker ¶
func NewParallelWorker ¶
func NewParallelWorker(num int) *ParallelWorker
Directories ¶
Path | Synopsis |
---|---|
Package mock_consumer is a generated GoMock package.
|
Package mock_consumer is a generated GoMock package. |
Click to show internal directories.
Click to hide internal directories.