Documentation ¶
Index ¶
- type AMQPChannel
- type AMQPConnection
- type Connection
- type Consumer
- type DefaultWorker
- type Handler
- type HandlerFunc
- type Middleware
- type Option
- func WithConsumeArgs(consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) Option
- func WithContext(ctx context.Context) Option
- func WithDeclareQueue(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) Option
- func WithExchange(exchange, routingKey string) Option
- func WithHandler(h Handler) Option
- func WithInitFunc(f func(conn AMQPConnection) (AMQPChannel, error)) Option
- func WithLogger(l logger.Logger) Option
- func WithNotify(stateCh chan State) Option
- func WithQos(prefetchCount int, global bool) Option
- func WithQueue(queue string) Option
- func WithRetryPeriod(dur time.Duration) Option
- func WithRetryPeriodFunc(durFunc func(retryCount int) time.Duration) Option
- func WithTmpQueue() Option
- func WithWorker(w Worker) Option
- type ParallelWorker
- type Ready
- type State
- type Unready
- type Worker
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AMQPChannel ¶
type AMQPChannel interface {
	Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error)
	Qos(prefetchCount, prefetchSize int, global bool) error
	NotifyClose(receiver chan *amqp.Error) chan *amqp.Error
	NotifyCancel(c chan string) chan string
	QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)
	QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error
	Close() error
}
type AMQPConnection ¶
type AMQPConnection interface { }
type Connection ¶
type Connection struct {
// contains filtered or unexported fields
}
func NewConnection ¶
func NewConnection(amqpConn AMQPConnection, closeCh chan struct{}) *Connection
func (*Connection) AMQPConnection ¶
func (c *Connection) AMQPConnection() AMQPConnection
func (*Connection) NotifyClose ¶
func (c *Connection) NotifyClose() chan struct{}
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
func (*Consumer) NotifyClosed ¶
func (c *Consumer) NotifyClosed() <-chan struct{}
type DefaultWorker ¶
type Handler ¶
func Wrap ¶
func Wrap(handler Handler, middlewares ...Middleware) Handler
Example ¶
package main

import (
	"context"

	"github.com/makasim/amqpextra/consumer"
	"github.com/makasim/amqpextra/consumer/middleware"
	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	// wrap a handler by some middlewares
	consumer.Wrap(
		consumer.HandlerFunc(func(ctx context.Context, msg amqp.Delivery) interface{} {
			// process message
			msg.Ack(false)
			return nil
		}),
		middleware.HasCorrelationID(),
		middleware.HasReplyTo(),
	)
}
Output:
type HandlerFunc ¶
type Middleware ¶
type Option ¶
type Option func(c *Consumer)
func WithConsumeArgs ¶
func WithContext ¶
func WithDeclareQueue ¶ added in v0.16.2
func WithExchange ¶ added in v0.16.0
func WithHandler ¶ added in v0.16.0
func WithInitFunc ¶
func WithInitFunc(f func(conn AMQPConnection) (AMQPChannel, error)) Option
func WithLogger ¶
func WithNotify ¶ added in v0.16.0
func WithRetryPeriod ¶
func WithRetryPeriodFunc ¶ added in v0.16.4
func WithTmpQueue ¶ added in v0.16.2
func WithTmpQueue() Option
func WithWorker ¶
type ParallelWorker ¶
func NewParallelWorker ¶
func NewParallelWorker(num int) *ParallelWorker
Directories ¶
Path | Synopsis |
---|---|
mock_consumer | Package mock_consumer is a generated GoMock package. |
Click to show internal directories.
Click to hide internal directories.