Documentation ¶
Index ¶
- type AfterHandleFunc
- type AmqpConnectionConstructor
- type BeforeHandleFunc
- type Connection
- func (cn *Connection) Channel() (*amqp.Channel, error)
- func (cn *Connection) Close() error
- func (cn *Connection) Conn() *amqp.Connection
- func (cn *Connection) Connect(ctx context.Context) error
- func (cn *Connection) IsClosed() bool
- func (cn *Connection) NotifyClose(receiver chan *amqp.Error) chan *amqp.Error
- func (cn *Connection) Schema() (*Schema, error)
- type ConnectionCfg
- type ConsumeParams
- type Consumer
- type ConsumerConfig
- type DeclareParams
- type DefaultMessageHandler
- func (dmh *DefaultMessageHandler) AfterHandle(ctx context.Context, msg *amqp.Delivery, channel *amqp.Channel, ...) error
- func (dmh *DefaultMessageHandler) BeforeHandle(ctx context.Context, channel *amqp.Channel, msg *amqp.Delivery) error
- func (dmh *DefaultMessageHandler) DoMsgAction(msg *amqp.Delivery, action MsgAction) (err error)
- func (dmh *DefaultMessageHandler) Handle(ctx context.Context, channel *amqp.Channel, msg *amqp.Delivery) (err error)
- type DelayedRetryMessageHandler
- type DeleteParams
- type ExchangeBindParams
- type ExchangeKind
- type ExchangeManager
- func (em *ExchangeManager) Bind(bindParams *ExchangeBindParams) (err error)
- func (em *ExchangeManager) BindMulti(bindParams ...*ExchangeBindParams) (err error)
- func (em *ExchangeManager) Declare(declareParams *DeclareParams) (err error)
- func (em *ExchangeManager) DeclareMulti(declareParams ...*DeclareParams) (err error)
- func (em *ExchangeManager) Delete(deleteParams *DeleteParams) (err error)
- func (em *ExchangeManager) DeleteMulti(deleteParams ...*DeleteParams) (err error)
- func (em *ExchangeManager) Unbind(bindParams *ExchangeBindParams) (err error)
- func (em *ExchangeManager) UnbindMulti(unbindParams ...*ExchangeBindParams) (err error)
- type HandleFunc
- type MessageHandler
- type MsgAction
- type Preset
- type PublishMessage
- type Publisher
- type PublisherConfig
- type QueueBindParams
- type QueueManager
- func (qs *QueueManager) Bind(bindParams *QueueBindParams) (err error)
- func (qs *QueueManager) BindMulti(bindParams ...*QueueBindParams) (err error)
- func (qs *QueueManager) Declare(declareParams *DeclareParams) (q amqp.Queue, err error)
- func (qs *QueueManager) DeclareMulti(declareParams ...*DeclareParams) (err error)
- func (qs *QueueManager) Delete(params *DeleteParams) (msgCount int, err error)
- func (qs *QueueManager) DeleteMulti(deleteParams ...*DeleteParams) (err error)
- func (qs *QueueManager) Inspect(name string) (q amqp.Queue, err error)
- func (qs *QueueManager) Purge(name string, noWait bool) (msgCount int, err error)
- func (qs *QueueManager) Unbind(bindParams *QueueBindParams) (err error)
- func (qs *QueueManager) UnbindMulti(bindParams ...*QueueBindParams) (err error)
- type Schema
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AfterHandleFunc ¶
type AfterHandleFunc func(ctx context.Context, channel *amqp.Channel, msg *amqp.Delivery, action MsgAction) error
AfterHandleFunc - see DefaultMessageHandler for usage
type AmqpConnectionConstructor ¶
type AmqpConnectionConstructor func() (*amqp.Connection, error)
AmqpConnectionConstructor - wrap method for amqp.Connection creation (cases for amqp.DialConfig or other methods)
type BeforeHandleFunc ¶
BeforeHandleFunc - see DefaultMessageHandler for usage
type Connection ¶
type Connection struct {
// contains filtered or unexported fields
}
Connection - wrapped connection struct
func NewConnection ¶
func NewConnection(ctx context.Context, constructor AmqpConnectionConstructor) *Connection
NewConnection - creates a new Connection
func NewDefaultConnection ¶
func NewDefaultConnection(ctx context.Context, dsn string) *Connection
NewDefaultConnection - creates new Connection instance with amqp.Dial method for connection inside
func (*Connection) Channel ¶
func (cn *Connection) Channel() (*amqp.Channel, error)
Channel - wrap for amqp.Connection Channel method
func (*Connection) Close ¶
func (cn *Connection) Close() error
Close - connection close wrapped method
func (*Connection) Connect ¶
func (cn *Connection) Connect(ctx context.Context) error
Connect - establishes a connection with RabbitMQ; the context is needed for deadline/timeout handling
func (*Connection) IsClosed ¶
func (cn *Connection) IsClosed() bool
IsClosed - wrap for amqp.Connection IsClosed method
func (*Connection) NotifyClose ¶
func (cn *Connection) NotifyClose(receiver chan *amqp.Error) chan *amqp.Error
NotifyClose - wrap for amqp.Connection NotifyClose method
func (*Connection) Schema ¶
func (cn *Connection) Schema() (*Schema, error)
Schema - creates a new schema object with new channel inside
type ConnectionCfg ¶
type ConnectionCfg struct {
	// ReconnectTimeout - period after which the process tries to establish the connection again.
	ReconnectTimeout time.Duration
}
ConnectionCfg - main connection config; it is currently not used, because reconnect is not supported
type ConsumeParams ¶
type ConsumeParams struct {
Queue, Consumer string
AutoAck, Exclusive, NoLocal, NoWait bool
Args amqp.Table
}
ConsumeParams - wraps the arguments of the amqp.Channel Consume method
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer - instance for consuming process
func NewConsumer ¶
func NewConsumer(connection *Connection, cfg *ConsumerConfig) *Consumer
NewConsumer - creates new consumer with default params
func (*Consumer) StartWorker ¶
func (cnr *Consumer) StartWorker(ctx context.Context, params *ConsumeParams, handler MessageHandler) error
StartWorker - starts single consumer worker on a single queue
func (*Consumer) StartWorkersGroup ¶
func (cnr *Consumer) StartWorkersGroup(params *ConsumeParams, handler MessageHandler) (err error)
StartWorkersGroup - start group of workers for single queue
type ConsumerConfig ¶
type ConsumerConfig struct {
	// number of consuming workers
	WorkersCount int
	// run message handling in a single goroutine or in worker loop
	Synchronous bool
}
ConsumerConfig - main consumer config
type DeclareParams ¶
type DeclareParams struct {
	// Primitive name
	Name string
	// Kind - for exchange only
	Kind ExchangeKind
	Durable, AutoDelete, NoWait, Passive bool
	// Internal - for exchange only
	Internal bool
	// Exclusive - for queue only
	Exclusive bool
	Args amqp.Table
}
DeclareParams - common declare params
func (*DeclareParams) WithDeadLetterExchange ¶
func (dp *DeclareParams) WithDeadLetterExchange(name string) *DeclareParams
WithDeadLetterExchange - sets `x-dead-letter-exchange` param
func (*DeclareParams) WithDeadLetterRk ¶
func (dp *DeclareParams) WithDeadLetterRk(key string) *DeclareParams
WithDeadLetterRk - sets `x-dead-letter-routing-key` param
type DefaultMessageHandler ¶
type DefaultMessageHandler struct {
	// BeforeHandleFunc - custom before handle func
	BeforeHandleFunc BeforeHandleFunc
	// HandleFunc - custom handle func
	HandleFunc HandleFunc
	// AfterHandleFunc - custom after handle func
	AfterHandleFunc AfterHandleFunc
}
DefaultMessageHandler - default message handler
func NewDefaultMessageHandler ¶
func NewDefaultMessageHandler(handleFunc HandleFunc) *DefaultMessageHandler
NewDefaultMessageHandler - DefaultMessageHandler constructor with custom handle func as required value
func (*DefaultMessageHandler) AfterHandle ¶
func (dmh *DefaultMessageHandler) AfterHandle( ctx context.Context, msg *amqp.Delivery, channel *amqp.Channel, action MsgAction, ) error
AfterHandle - if AfterHandleFunc specified -> runs it, else check HandleResult for errors and ack or nack msg
func (*DefaultMessageHandler) BeforeHandle ¶
func (dmh *DefaultMessageHandler) BeforeHandle(ctx context.Context, channel *amqp.Channel, msg *amqp.Delivery) error
BeforeHandle - if BeforeHandleFunc specified -> runs it
func (*DefaultMessageHandler) DoMsgAction ¶
func (dmh *DefaultMessageHandler) DoMsgAction(msg *amqp.Delivery, action MsgAction) (err error)
DoMsgAction - acks or nacks the message that was read, according to the given action
func (*DefaultMessageHandler) Handle ¶
func (dmh *DefaultMessageHandler) Handle(ctx context.Context, channel *amqp.Channel, msg *amqp.Delivery) (err error)
Handle - main handle function; wraps the result of HandleFunc in a MessageResultContainer. If HandleFunc is nil, it returns nil and the msg will be nacked by the AfterHandle event
type DelayedRetryMessageHandler ¶
type DelayedRetryMessageHandler struct {
	*DefaultMessageHandler
	// DelayQueueRoutingKey - queue routing key for delayed messages resend
	DelayQueueRoutingKey string
	// DelayExchangeName - exchange name for delayed messages resend
	DelayExchangeName string
	// Delay - time interval for message expiration in delay queue
	Delay time.Duration
	// MaxRetriesCount - maximum count of retries before message will be rejected
	MaxRetriesCount int64
}
DelayedRetryMessageHandler - extended DefaultMessageHandler with delay logic
func NewDelayedRetryMessageHandler ¶
func NewDelayedRetryMessageHandler( delayExchangeName, delayRk string, delay time.Duration, retriesCount int64, handleFunc HandleFunc, ) *DelayedRetryMessageHandler
NewDelayedRetryMessageHandler - DelayedRetryMessageHandler constructor with custom handle func as required value
func (*DelayedRetryMessageHandler) Handle ¶
func (fmh *DelayedRetryMessageHandler) Handle(ctx context.Context, channel *amqp.Channel, msg *amqp.Delivery) (err error)
Handle - redeclared DefaultMessageHandler.Handle method; the main difference is in the error handling logic. If HandleFunc returns err, by default the message will be rejected, but if the message has an x-death header and count < MaxRetriesCount, the message will be acknowledged and resent to the delay queue
type DeleteParams ¶
DeleteParams - common deletion params
type ExchangeBindParams ¶
ExchangeBindParams - amqp.Channel().ExchangeBind(...) params
type ExchangeKind ¶
type ExchangeKind string
ExchangeKind - type for exchange kind
const (
	// DirectExchange - kind direct
	DirectExchange ExchangeKind = "direct"
	// FanoutExchange - kind fanout
	FanoutExchange ExchangeKind = "fanout"
	// TopicExchange - kind topic
	TopicExchange ExchangeKind = "topic"
	// HeadersExchange - kind headers
	HeadersExchange ExchangeKind = "headers"
)
func (ExchangeKind) String ¶
func (ek ExchangeKind) String() string
String - converts ExchangeKind to string
type ExchangeManager ¶
type ExchangeManager struct {
// contains filtered or unexported fields
}
ExchangeManager - exchanges manager
func NewExchangeManager ¶
func NewExchangeManager(channel *amqp.Channel) *ExchangeManager
NewExchangeManager - ExchangeManager constructor
func (*ExchangeManager) Bind ¶
func (em *ExchangeManager) Bind(bindParams *ExchangeBindParams) (err error)
Bind - bind exchange
func (*ExchangeManager) BindMulti ¶
func (em *ExchangeManager) BindMulti(bindParams ...*ExchangeBindParams) (err error)
BindMulti - binds more than one exchange
func (*ExchangeManager) Declare ¶
func (em *ExchangeManager) Declare(declareParams *DeclareParams) (err error)
Declare - declare exchange
func (*ExchangeManager) DeclareMulti ¶
func (em *ExchangeManager) DeclareMulti(declareParams ...*DeclareParams) (err error)
DeclareMulti - declares more than one exchange
func (*ExchangeManager) Delete ¶
func (em *ExchangeManager) Delete(deleteParams *DeleteParams) (err error)
Delete - deletes exchange
func (*ExchangeManager) DeleteMulti ¶
func (em *ExchangeManager) DeleteMulti(deleteParams ...*DeleteParams) (err error)
DeleteMulti - deletes more than one exchange
func (*ExchangeManager) Unbind ¶
func (em *ExchangeManager) Unbind(bindParams *ExchangeBindParams) (err error)
Unbind - unbind exchange
func (*ExchangeManager) UnbindMulti ¶
func (em *ExchangeManager) UnbindMulti(unbindParams ...*ExchangeBindParams) (err error)
UnbindMulti - unbind more than one exchange
type HandleFunc ¶
type HandleFunc func(ctx context.Context, channel *amqp.Channel, msg *amqp.Delivery) (MsgAction, error)
HandleFunc - see DefaultMessageHandler for usage
type MessageHandler ¶
type MessageHandler interface {
Handle(ctx context.Context, channel *amqp.Channel, msg *amqp.Delivery) error
}
MessageHandler - Base message handler interface
type MsgAction ¶
type MsgAction int
MsgAction - type for declaring action after msg handle (ack, nack, etc)
type PublishMessage ¶
type PublishMessage struct {
ExchangeName, RoutingKey string
Mandatory, Immediate bool
Publishing amqp.Publishing
}
PublishMessage - struct with params from amqp.Channel().Publish(...) method
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
Publisher - struct of publisher
func NewPublisher ¶
func NewPublisher(connection *Connection, cfg *PublisherConfig) *Publisher
NewPublisher - publisher constructor
func (*Publisher) Close ¶
func (p *Publisher) Close()
Close - closes active connection and channels pool
type PublisherConfig ¶
type PublisherConfig struct {
	// MaxChannelsCount max channels count
	MaxChannelsCount int32
	// CleanUp interval - task for closing idle channels in pool
	CleanUpInterval time.Duration
	// Max idle time per one channel. Set this param smaller than CleanUpInterval
	MaxIdleTime time.Duration
}
PublisherConfig - main publisher config
type QueueBindParams ¶
QueueBindParams - amqp.Channel().QueueBind(...) params
type QueueManager ¶
type QueueManager struct {
// contains filtered or unexported fields
}
QueueManager - queue manager
func NewQueueManager ¶
func NewQueueManager(channel *amqp.Channel) *QueueManager
NewQueueManager - QueueManager constructor
func (*QueueManager) Bind ¶
func (qs *QueueManager) Bind(bindParams *QueueBindParams) (err error)
Bind - bind queue
func (*QueueManager) BindMulti ¶
func (qs *QueueManager) BindMulti(bindParams ...*QueueBindParams) (err error)
BindMulti - bind more than one queue
func (*QueueManager) Declare ¶
func (qs *QueueManager) Declare(declareParams *DeclareParams) (q amqp.Queue, err error)
Declare - declare queue
func (*QueueManager) DeclareMulti ¶
func (qs *QueueManager) DeclareMulti(declareParams ...*DeclareParams) (err error)
DeclareMulti - declares more than one queue
func (*QueueManager) Delete ¶
func (qs *QueueManager) Delete(params *DeleteParams) (msgCount int, err error)
Delete - deletes queue
func (*QueueManager) DeleteMulti ¶
func (qs *QueueManager) DeleteMulti(deleteParams ...*DeleteParams) (err error)
DeleteMulti - deletes more than one queue
func (*QueueManager) Inspect ¶
func (qs *QueueManager) Inspect(name string) (q amqp.Queue, err error)
Inspect - amqp.Channel().QueueInspect(...) wrap
func (*QueueManager) Purge ¶
func (qs *QueueManager) Purge(name string, noWait bool) (msgCount int, err error)
Purge - amqp.Channel().QueuePurge() wrap
func (*QueueManager) Unbind ¶
func (qs *QueueManager) Unbind(bindParams *QueueBindParams) (err error)
Unbind - unbind queue
func (*QueueManager) UnbindMulti ¶
func (qs *QueueManager) UnbindMulti(bindParams ...*QueueBindParams) (err error)
UnbindMulti - unbind more than one queue
type Schema ¶
type Schema struct {
	Queue    *QueueManager
	Exchange *ExchangeManager
	// contains filtered or unexported fields
}
Schema - struct that provides access to the queue and exchange managers
func (*Schema) ApplyPresets ¶
ApplyPresets - apply presets to schema