channel

package
v0.6.2 Latest
Warning

This package is not in the latest version of its module.

Published: Aug 15, 2024 License: Apache-2.0 Imports: 5 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Channeler

type Channeler interface {
	Channel() (*amqp.Channel, error)
}

Channeler creates a new channel. It is implemented by amqp.Connection and connection.Redialer.

type Option

type Option func(r *Reopener)

Option configures a Reopener.
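The Option signature follows Go's functional options pattern. The sketch below shows how a constructor typically applies such options. It uses only stand-ins: the `reopener` struct, its fields, and the `withPrefetch` and `withHook` options are hypothetical and are not part of this package.

```go
package main

import "fmt"

// reopener is a hypothetical stand-in for the package's Reopener;
// its fields are invented for illustration only.
type reopener struct {
	prefetch int
	hooks    []string
}

// option mirrors the shape of `type Option func(r *Reopener)`.
type option func(r *reopener)

// withPrefetch is a hypothetical option that records a prefetch count.
func withPrefetch(n int) option {
	return func(r *reopener) { r.prefetch = n }
}

// withHook is a hypothetical option that registers a named hook.
func withHook(name string) option {
	return func(r *reopener) { r.hooks = append(r.hooks, name) }
}

// newReopener sets defaults, then applies the options in the order given.
func newReopener(ops ...option) *reopener {
	r := &reopener{prefetch: 1}
	for _, op := range ops {
		op(r)
	}
	return r
}

func main() {
	r := newReopener(withPrefetch(10), withHook("qos"))
	fmt.Println(r.prefetch, r.hooks)
}
```

Because each option is just a function over the struct, later options override earlier ones, and callers that pass no options get the defaults.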

func WithBackoff

func WithBackoff(bo backoff.BackOff) Option

WithBackoff sets the backoff policy used between reconnection attempts.

func WithOpenCallback

func WithOpenCallback(fn func(channel *amqp.Channel) error) Option

WithOpenCallback sets a callback that runs each time a channel is opened. The callback receives the newly created channel and can be used to set up QoS or to register listeners for various events (NotifyClose, NotifyFlow, etc.).

func WithQOS

func WithQOS(prefetchCount, prefetchSize int, global bool) Option

WithQOS creates a callback that sets the channel's QoS on each channel creation.

type Reopener

type Reopener struct {
	*amqp.Channel
	// contains filtered or unexported fields
}

Reopener wraps a channel to add re-open capabilities. It re-opens the channel on channel errors but closes it on graceful shutdown. All methods of the regular channel remain available.

func New

func New(conn Channeler, ops ...Option) (*Reopener, error)

New creates a new Reopener with channel re-open capabilities. It accepts additional options, such as setting up QoS and registering notifications for events.

func (*Reopener) Consume

func (r *Reopener) Consume(
	queue, consumer string,
	autoAck, exclusive, noLocal, noWait bool,
	args amqp.Table,
) <-chan amqp.Delivery

Consume consumes deliveries and re-opens the channel as needed, providing a constant flow of deliveries. When the channel is closed gracefully, it delivers all remaining deliveries and then exits.
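The behavior described for Consume (one continuous delivery stream across channel re-opens, drained on graceful close) can be sketched with plain Go channels. This is not the package's implementation: `consume`, `fakeSubscriptions`, and `errClosed` are hypothetical names, and strings stand in for amqp.Delivery values.

```go
package main

import (
	"errors"
	"fmt"
)

// errClosed is a hypothetical sentinel for a graceful shutdown.
var errClosed = errors.New("closed gracefully")

// consume forwards items from successive subscriptions into one output
// channel. When a subscription's channel closes, it subscribes again.
// It closes the output once subscribe reports errClosed.
func consume(subscribe func() (<-chan string, error)) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for {
			src, err := subscribe()
			if errors.Is(err, errClosed) {
				return
			}
			if err != nil {
				continue // a real implementation would back off here
			}
			for d := range src {
				out <- d // drain everything before re-subscribing
			}
		}
	}()
	return out
}

// fakeSubscriptions simulates a channel that is re-opened once per batch
// and then shut down gracefully.
func fakeSubscriptions(batches ...[]string) func() (<-chan string, error) {
	i := 0
	return func() (<-chan string, error) {
		if i >= len(batches) {
			return nil, errClosed
		}
		ch := make(chan string, len(batches[i]))
		for _, d := range batches[i] {
			ch <- d
		}
		close(ch)
		i++
		return ch, nil
	}
}

func main() {
	stream := consume(fakeSubscriptions([]string{"a", "b"}, []string{"c"}))
	for d := range stream {
		fmt.Println(d)
	}
}
```

The caller ranges over a single channel and never sees the re-subscription; the loop ends only when the stream is closed after the final batch has been drained.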

func (*Reopener) NotifyConsume

func (r *Reopener) NotifyConsume(c chan error) <-chan error

NotifyConsume registers a listener for starts of the consuming process. It sends the result of each start, so every restart of consuming can be observed. The listener channel is closed on graceful shutdown.

func (*Reopener) NotifyReopen

func (r *Reopener) NotifyReopen(c chan error) <-chan error

NotifyReopen registers a listener for re-open attempts. It sends the result of each re-open attempt. The listener channel is closed on graceful shutdown.
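The listener contract shared by NotifyConsume and NotifyReopen (register a channel, receive one result per attempt, see the channel closed on shutdown) can be illustrated with a small stand-in. The `notifier` type, its methods, and `errReopenFailed` are hypothetical and only mimic the documented behavior.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errReopenFailed is a hypothetical failure result.
var errReopenFailed = errors.New("re-open failed")

// notifier is a hypothetical stand-in for the listener bookkeeping.
type notifier struct {
	mu        sync.Mutex
	listeners []chan error
}

// register stores the listener and returns it as receive-only,
// mirroring the shape of NotifyReopen(c chan error) <-chan error.
func (n *notifier) register(c chan error) <-chan error {
	n.mu.Lock()
	defer n.mu.Unlock()
	n.listeners = append(n.listeners, c)
	return c
}

// report sends one attempt result (nil on success) to every listener.
// Sends block, so listeners should be buffered or actively drained.
func (n *notifier) report(err error) {
	n.mu.Lock()
	defer n.mu.Unlock()
	for _, c := range n.listeners {
		c <- err
	}
}

// shutdown closes all listeners, signalling a graceful shutdown.
func (n *notifier) shutdown() {
	n.mu.Lock()
	defer n.mu.Unlock()
	for _, c := range n.listeners {
		close(c)
	}
	n.listeners = nil
}

func main() {
	n := &notifier{}
	results := n.register(make(chan error, 2))
	n.report(errReopenFailed) // first attempt fails
	n.report(nil)             // second attempt succeeds
	n.shutdown()
	for err := range results {
		fmt.Println("re-open result:", err)
	}
}
```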

func (*Reopener) PublishWithContext

func (r *Reopener) PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error

PublishWithContext checks whether the channel is closed and re-opens it if needed. This is useful for publishing events reliably even after channel errors, which close the channel.
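A minimal sketch of the check-then-re-open idea follows. It is an assumption about the general technique, not the package's code: `fakeChannel`, `publisher`, and `demo` are hypothetical, and a string slice stands in for messages sent to the broker.

```go
package main

import (
	"context"
	"fmt"
)

// fakeChannel is a hypothetical stand-in for an AMQP channel.
type fakeChannel struct {
	closed bool
	sent   []string
}

// publisher keeps the current channel and a way to open a new one.
type publisher struct {
	ch    *fakeChannel
	open  func() (*fakeChannel, error)
	opens int // counts how many times a channel was opened
}

// publish re-opens the channel first if it is missing or closed,
// then sends the message on the (possibly new) channel.
func (p *publisher) publish(ctx context.Context, msg string) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	if p.ch == nil || p.ch.closed {
		ch, err := p.open()
		if err != nil {
			return fmt.Errorf("re-open channel: %w", err)
		}
		p.ch = ch
		p.opens++
	}
	p.ch.sent = append(p.ch.sent, msg)
	return nil
}

// demo publishes, simulates a channel error, and publishes again.
// It returns the number of opens and the messages on the final channel.
func demo() (int, []string, error) {
	p := &publisher{open: func() (*fakeChannel, error) { return &fakeChannel{}, nil }}
	ctx := context.Background()
	if err := p.publish(ctx, "first"); err != nil {
		return 0, nil, err
	}
	p.ch.closed = true // simulate a channel error closing the channel
	if err := p.publish(ctx, "second"); err != nil {
		return 0, nil, err
	}
	return p.opens, p.ch.sent, nil
}

func main() {
	opens, sent, err := demo()
	fmt.Println(opens, sent, err)
}
```

In this sketch the second publish succeeds on a freshly opened channel, so the caller does not have to detect the closed channel itself.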
