rmq

package module
v0.3.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 10, 2022 License: Unlicense Imports: 9 Imported by: 0

README

Go Report Card Go Reference

RMQ:

This library provides some wrappers around github.com/rabbitmq/amqp091-go.

Connection:

connection := rmq.NewDefaultConnection(context.Background(), "amqp://test:test@localhost:5672")
// or use rmq.NewConnection with a callback to construct the connection with custom options
err := connection.Connect(context.TODO())

if err != nil {
	log.Fatal(err)
}

Working with schema:

schema, err := connection.Schema() // creates a new schema with separate channel inside
if err != nil {
	log.Fatal(err)
}

// declare an exchange
err = schema.Exchange.Declare(&rmq.DeclareParams{Name: "test-exchange", Kind: rmq.DirectExchange})
if err != nil {
	log.Fatal(err)
}

// declare two queues
err = schema.Queue.DeclareMulti(&rmq.DeclareParams{Name: "test-q1"}, &rmq.DeclareParams{Name: "test-q2"})
if err != nil {
	log.Fatal(err)
}
// bind queues to exchange
err = schema.Queue.BindMulti(
	&rmq.QueueBindParams{Name: "test-q1", Key: "rk1", Exchange: "test-exchange"},
	&rmq.QueueBindParams{Name: "test-q2", Key: "rk1", Exchange: "test-exchange"},
)

if err != nil {
	log.Fatal(err)
}

Consumer:

consumer := rmq.NewConsumer(connection, &rmq.ConsumerConfig{
		WorkersCount: 3, // 3 worker goroutines will be started
		Synchronous:  false, // run handler in single goroutine or not
	})
//define a message handler (use defaults or write own)
handler := rmq.NewDefaultMessageHandler(func(ctx context.Context, channel *amqp.Channel, msg *amqp.Delivery) (rmq.MsgAction, error) {
	fmt.Println(msg.Body)
	return rmq.ActionAck, nil
})
// start worker
err := consumer.StartWorkersGroup(&rmq.ConsumeParams{Queue: "test"}, handler)
// or use consumer.StartWorker(...) for single consuming process
if err != nil {
	log.Fatal(err)
}

Publisher:

// create a new publisher instance
publisher := rmq.NewPublisher(connection, &rmq.PublisherConfig{
	MaxChannelsCount: 10, // max pool channels count
})
err = publisher.Init()

if err != nil {
	log.Fatal(err)
}
err = publisher.Publish(context.TODO(), &rmq.PublishMessage{
				ExchangeName: "main_exchange",
				RoutingKey:   "main",
				Publishing: amqp.Publishing{
					ContentType: "application/octet-stream",
					Body:        []byte("test test"),
				},
})

if err != nil {
	log.Fatal(err)
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AfterHandleFunc

type AfterHandleFunc func(ctx context.Context, channel *amqp.Channel, msg *amqp.Delivery, action MsgAction) error

AfterHandleFunc - see DefaultMessageHandler for usage

type AmqpConnectionConstructor

type AmqpConnectionConstructor func() (*amqp.Connection, error)

AmqpConnectionConstructor - wrap method for amqp.Connection creation (cases for amqp.DialConfig or other methods)

type BeforeHandleFunc

type BeforeHandleFunc func(ctx context.Context, channel *amqp.Channel, msg *amqp.Delivery) error

BeforeHandleFunc - see DefaultMessageHandler for usage

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

Connection - wrapped connection struct

func NewConnection

func NewConnection(ctx context.Context, constructor AmqpConnectionConstructor) *Connection

NewConnection - creates a new Connection

func NewDefaultConnection

func NewDefaultConnection(ctx context.Context, dsn string) *Connection

NewDefaultConnection - creates new Connection instance with amqp.Dial method for connection inside

func (*Connection) Channel

func (cn *Connection) Channel() (*amqp.Channel, error)

Channel - wrap for amqp.Connection Channel method

func (*Connection) Close

func (cn *Connection) Close() error

Close - connection close wrapped method

func (*Connection) Conn

func (cn *Connection) Conn() *amqp.Connection

Conn - connection getter

func (*Connection) Connect

func (cn *Connection) Connect(ctx context.Context) error

Connect - establishes a connection with rmq; the context is needed for deadline/timeout handling

func (*Connection) IsClosed

func (cn *Connection) IsClosed() bool

IsClosed - wrap for amqp.Connection IsClosed method

func (*Connection) NotifyClose

func (cn *Connection) NotifyClose(receiver chan *amqp.Error) chan *amqp.Error

NotifyClose - wrap for amqp.Connection NotifyClose method

func (*Connection) Schema

func (cn *Connection) Schema() (*Schema, error)

Schema - creates a new schema object with new channel inside

type ConnectionCfg

type ConnectionCfg struct {
	// ReconnectTimeout - period, when process try to establish connection again.
	ReconnectTimeout time.Duration
}

ConnectionCfg - main connection config; it is currently not used, because reconnect is not supported

type ConsumeParams

type ConsumeParams struct {
	Queue, Consumer                     string
	AutoAck, Exclusive, NoLocal, NoWait bool
	Args                                amqp.Table
}

ConsumeParams - wraps the arguments of the amqp.Channel Consume method

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer - instance for consuming process

func NewConsumer

func NewConsumer(connection *Connection, cfg *ConsumerConfig) *Consumer

NewConsumer - creates new consumer with default params

func (*Consumer) StartWorker

func (cnr *Consumer) StartWorker(ctx context.Context, params *ConsumeParams, handler MessageHandler) error

StartWorker - starts single consumer worker on a single queue

func (*Consumer) StartWorkersGroup

func (cnr *Consumer) StartWorkersGroup(params *ConsumeParams, handler MessageHandler) (err error)

StartWorkersGroup - start group of workers for single queue

type ConsumerConfig

type ConsumerConfig struct {
	// number of consuming workers
	WorkersCount int
	// run message handling in a single goroutine or in worker loop
	Synchronous bool
}

ConsumerConfig - main consumer config

type DeclareParams

type DeclareParams struct {
	// Primitive name
	Name string
	// Kind - for exchange only
	Kind                                 ExchangeKind
	Durable, AutoDelete, NoWait, Passive bool
	// Internal - for exchange only
	Internal bool
	// Exclusive - for queue only
	Exclusive bool
	Args      amqp.Table
}

DeclareParams - common declare params

func (*DeclareParams) WithDeadLetterExchange

func (dp *DeclareParams) WithDeadLetterExchange(name string) *DeclareParams

WithDeadLetterExchange - sets `x-dead-letter-exchange` param

func (*DeclareParams) WithDeadLetterRk

func (dp *DeclareParams) WithDeadLetterRk(key string) *DeclareParams

WithDeadLetterRk - sets `x-dead-letter-routing-key` param

type DefaultMessageHandler

type DefaultMessageHandler struct {
	// BeforeHandle - custom before handle func
	BeforeHandleFunc BeforeHandleFunc
	// HandleFunc - custom handle func
	HandleFunc HandleFunc
	// AfterHandleFunc - custom after handle func
	AfterHandleFunc AfterHandleFunc
}

DefaultMessageHandler - default message handler

func NewDefaultMessageHandler

func NewDefaultMessageHandler(handleFunc HandleFunc) *DefaultMessageHandler

NewDefaultMessageHandler - DefaultMessageHandler constructor with custom handle func as required value

func (*DefaultMessageHandler) AfterHandle

func (dmh *DefaultMessageHandler) AfterHandle(
	ctx context.Context,
	msg *amqp.Delivery,
	channel *amqp.Channel,
	action MsgAction,
) error

AfterHandle - if AfterHandleFunc specified -> runs it, else check HandleResult for errors and ack or nack msg

func (*DefaultMessageHandler) BeforeHandle

func (dmh *DefaultMessageHandler) BeforeHandle(ctx context.Context, channel *amqp.Channel, msg *amqp.Delivery) error

BeforeHandle - if BeforeHandleFunc specified -> runs it

func (*DefaultMessageHandler) DoMsgAction

func (dmh *DefaultMessageHandler) DoMsgAction(msg *amqp.Delivery, action MsgAction) (err error)

DoMsgAction - performs the ack or nack action on the message that was read

func (*DefaultMessageHandler) Handle

func (dmh *DefaultMessageHandler) Handle(ctx context.Context, channel *amqp.Channel, msg *amqp.Delivery) (err error)

Handle - main handle function; wraps the result of HandleFunc into MessageResultContainer. If HandleFunc is nil, it returns nil and the msg will be nacked by the AfterHandle event

type DelayedRetryMessageHandler

type DelayedRetryMessageHandler struct {
	*DefaultMessageHandler
	// DelayQueueRoutingKey - queue routing key for delayed messages resend
	DelayQueueRoutingKey string
	// DelayExchangeName - exchange name for delayed messages resend
	DelayExchangeName string
	// Delay - time interval for message expiration in delay queue
	Delay time.Duration
	// MaxRetriesCount - maximum count of retries before message will be rejected
	MaxRetriesCount int64
}

DelayedRetryMessageHandler - extended DefaultMessageHandler with delay logic

func NewDelayedRetryMessageHandler

func NewDelayedRetryMessageHandler(
	delayExchangeName, delayRk string,
	delay time.Duration,
	retriesCount int64,
	handleFunc HandleFunc,
) *DelayedRetryMessageHandler

NewDelayedRetryMessageHandler - DelayedRetryMessageHandler constructor with custom handle func as required value

func (*DelayedRetryMessageHandler) Handle

func (fmh *DelayedRetryMessageHandler) Handle(ctx context.Context, channel *amqp.Channel, msg *amqp.Delivery) (err error)

Handle - redeclared DefaultMessageHandler.Handle method; the main difference is in the error handling logic. If HandleFunc returns err, by default the message will be rejected, but if the message has an x-death header and count < MaxRetriesCount, the message will be acknowledged and resent to the delay queue

type DeleteParams

type DeleteParams struct {
	Name                      string
	IfUnused, IfEmpty, NoWait bool
}

DeleteParams - common deletion params

type ExchangeBindParams

type ExchangeBindParams struct {
	Destination, Key, Source string
	NoWait                   bool
	Args                     amqp.Table
}

ExchangeBindParams - amqp.Channel().ExchangeBind(...) params

type ExchangeKind

type ExchangeKind string

ExchangeKind - type for exchange kind

const (
	// DirectExchange - kind direct
	DirectExchange ExchangeKind = "direct"
	// FanoutExchange - kind fanout
	FanoutExchange ExchangeKind = "fanout"
	// TopicExchange  - kind topic
	TopicExchange ExchangeKind = "topic"
	// HeadersExchange - kind headers
	HeadersExchange ExchangeKind = "headers"
)

func (ExchangeKind) String

func (ek ExchangeKind) String() string

String - conversion ExchangeKind to string

type ExchangeManager

type ExchangeManager struct {
	// contains filtered or unexported fields
}

ExchangeManager - exchanges manager

func NewExchangeManager

func NewExchangeManager(channel *amqp.Channel) *ExchangeManager

NewExchangeManager - ExchangeManager constructor

func (*ExchangeManager) Bind

func (em *ExchangeManager) Bind(bindParams *ExchangeBindParams) (err error)

Bind - bind exchange

func (*ExchangeManager) BindMulti

func (em *ExchangeManager) BindMulti(bindParams ...*ExchangeBindParams) (err error)

BindMulti - binds more than one exchange

func (*ExchangeManager) Declare

func (em *ExchangeManager) Declare(declareParams *DeclareParams) (err error)

Declare - declare exchange

func (*ExchangeManager) DeclareMulti

func (em *ExchangeManager) DeclareMulti(declareParams ...*DeclareParams) (err error)

DeclareMulti - declares more than one exchange

func (*ExchangeManager) Delete

func (em *ExchangeManager) Delete(deleteParams *DeleteParams) (err error)

Delete - deletes exchange

func (*ExchangeManager) DeleteMulti

func (em *ExchangeManager) DeleteMulti(deleteParams ...*DeleteParams) (err error)

DeleteMulti - deletes more than one exchange

func (*ExchangeManager) Unbind

func (em *ExchangeManager) Unbind(bindParams *ExchangeBindParams) (err error)

Unbind - unbind exchange

func (*ExchangeManager) UnbindMulti

func (em *ExchangeManager) UnbindMulti(unbindParams ...*ExchangeBindParams) (err error)

UnbindMulti - unbind more than one exchange

type HandleFunc

type HandleFunc func(ctx context.Context, channel *amqp.Channel, msg *amqp.Delivery) (MsgAction, error)

HandleFunc - see DefaultMessageHandler for usage

type MessageHandler

type MessageHandler interface {
	Handle(ctx context.Context, channel *amqp.Channel, msg *amqp.Delivery) error
}

MessageHandler - Base message handler interface

type MsgAction

type MsgAction int

MsgAction - type for declaring action after msg handle (ack, nack, etc)

const (
	// ActionAck - acknowledge message
	ActionAck MsgAction = 1 + iota
	// ActionNack - not acknowledge without requeue
	ActionNack
	// ActionRequeue - not acknowledge with requeue
	ActionRequeue
	// ActionReject - reject message
	ActionReject
)

type Preset

type Preset interface {
	Apply(*amqp.Channel, *Schema) error
}

Preset - interface for apply any set of params to schema

type PublishMessage

type PublishMessage struct {
	ExchangeName, RoutingKey string
	Mandatory, Immediate     bool
	Publishing               amqp.Publishing
}

PublishMessage - struct with params from amqp.Channel().Publish(...) method

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher - struct of publisher

func NewPublisher

func NewPublisher(connection *Connection, cfg *PublisherConfig) *Publisher

NewPublisher - publisher constructor

func (*Publisher) Close

func (p *Publisher) Close()

Close - closes active connection and channels pool

func (*Publisher) Init

func (p *Publisher) Init() error

Init - runs background tasks and initializes the first element of the pool

func (*Publisher) Pool

func (p *Publisher) Pool() *puddle.Pool

Pool - channels pool getter

func (*Publisher) Publish

func (p *Publisher) Publish(ctx context.Context, msg *PublishMessage) error

Publish - publish a message to exchange

type PublisherConfig

type PublisherConfig struct {
	// MaxChannelsCount max channels count
	MaxChannelsCount int32
	// CleanUp interval - task for closing idle channels in pool
	CleanUpInterval time.Duration
	// Max idle time per one channel. Set this param smaller than CleanUpInterval
	MaxIdleTime time.Duration
}

PublisherConfig - main publisher config

type QueueBindParams

type QueueBindParams struct {
	Name, Key, Exchange string
	NoWait              bool
	Args                amqp.Table
}

QueueBindParams - amqp.Channel().QueueBind(...) params

type QueueManager

type QueueManager struct {
	// contains filtered or unexported fields
}

QueueManager - queue manager

func NewQueueManager

func NewQueueManager(channel *amqp.Channel) *QueueManager

NewQueueManager - QueueManager constructor

func (*QueueManager) Bind

func (qs *QueueManager) Bind(bindParams *QueueBindParams) (err error)

Bind - bind queue

func (*QueueManager) BindMulti

func (qs *QueueManager) BindMulti(bindParams ...*QueueBindParams) (err error)

BindMulti - bind more than one queue

func (*QueueManager) Declare

func (qs *QueueManager) Declare(declareParams *DeclareParams) (q amqp.Queue, err error)

Declare - declare queue

func (*QueueManager) DeclareMulti

func (qs *QueueManager) DeclareMulti(declareParams ...*DeclareParams) (err error)

DeclareMulti - declares more than one queue

func (*QueueManager) Delete

func (qs *QueueManager) Delete(params *DeleteParams) (msgCount int, err error)

Delete - deletes queue

func (*QueueManager) DeleteMulti

func (qs *QueueManager) DeleteMulti(deleteParams ...*DeleteParams) (err error)

DeleteMulti - deletes more than one queue

func (*QueueManager) Inspect

func (qs *QueueManager) Inspect(name string) (q amqp.Queue, err error)

Inspect - amqp.Channel().QueueInspect(...) wrap

func (*QueueManager) Purge

func (qs *QueueManager) Purge(name string, noWait bool) (msgCount int, err error)

Purge - amqp.Channel().QueuePurge() wrap

func (*QueueManager) Unbind

func (qs *QueueManager) Unbind(bindParams *QueueBindParams) (err error)

Unbind - unbind queue

func (*QueueManager) UnbindMulti

func (qs *QueueManager) UnbindMulti(bindParams ...*QueueBindParams) (err error)

UnbindMulti - unbind more than one queue

type Schema

type Schema struct {
	Queue    *QueueManager
	Exchange *ExchangeManager
	// contains filtered or unexported fields
}

Schema - struct for access to the managers

func GetSchema

func GetSchema(channel *amqp.Channel) *Schema

GetSchema - creates a new Schema instance

func (*Schema) ApplyPresets

func (sc *Schema) ApplyPresets(presets ...Preset) (err error)

ApplyPresets - apply presets to schema

Directories

Path Synopsis
example

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL