amqp

package
v0.1.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 10, 2023 License: MIT Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewBroker

func NewBroker(config Config) (broker.IBroker, error)

Types

type Broker

type Broker struct {
	*broker.Broker
	// contains filtered or unexported fields
}

func (*Broker) Close

func (b *Broker) Close() error

func (*Broker) Send

func (b *Broker) Send(ctx context.Context, name string, value interface{}) error

func (*Broker) SendDelay

func (b *Broker) SendDelay(ctx context.Context, name string, value interface{}, delay time.Duration) error

func (*Broker) Worker

func (b *Broker) Worker() error

type Config

type Config struct {
	// URL This specification defines an "amqp" URI scheme.
	// example amqp://user:pass@host:5672/vhost
	// The "amqp" URI scheme: https://www.rabbitmq.com/uri-spec.html
	URL string
	// TLSClientConfig specifies the client configuration of the TLS connection
	// when establishing a tls transport.
	// If the URL uses an amqp scheme, then an empty tls.Config with the
	// ServerName from the URL is used.
	TLSClientConfig *tls.Config
	// Queue name
	Queue string
	// Exchange default queue name
	Exchange string
	// ExchangeType default "direct".
	// The common types are "direct", "fanout", "topic" and "headers".
	ExchangeType string
	// RoutingKey default queue name
	RoutingKey string
	// When reconnecting to the server after connection failure, default 5s
	ReconnectDelay time.Duration
	// When setting up the channel after a channel exception, default 2s
	ReInitDelay time.Duration
	// When resending messages the server didn't confirm, default 5s
	ResendDelay time.Duration
	// Whether to enable "RabbitMQ Delayed Message Plugin"
	// When enabled, delayed messages will be delivered by plug-in
	// Need RabbitMQ Enabling the Plugin: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
	DelayedMessagePlugin bool
	// The maximum number of times the message is re-consumed. default 16 times.
	RetryMaxReconsume uint8
	//The Duration of backoff to apply between retries. default 2^retry*100ms
	RetryBackoff func(retry uint8) time.Duration
	// Custom codec
	Codec codec.Codec
	// Define the concurrency number of worker processes, default runtime.NumCPU()*2
	Concurrency int
	// A Logger represents an active logging object that generates lines of output to an io.Writer
	Logger log.ILogger
}

Config is used in DialConfig and Open to specify the desired tuning parameters used during a connection open handshake. The negotiated tuning will be stored in the returned connection's Config field.

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(s *Session) *Consumer

func (*Consumer) GetBuffer

func (c *Consumer) GetBuffer() chan amqp.Delivery

type Session

type Session struct {
	// contains filtered or unexported fields
}

Session This exports a Session object that wraps this library. It automatically reconnects when the connection fails, and blocks all pushes until the connection succeeds. It also confirms every outgoing message, so none are lost. It doesn't automatically ack each message, but leaves that to the parent process, since it is usage-dependent.

Try running this in one terminal, and `rabbitmq-server` in another. Stop & restart RabbitMQ to see how the queue reacts.

func NewSession

func NewSession(config *Config) *Session

NewSession creates a new consumer state instance, and automatically attempts to connect to the server.

func (*Session) Close

func (s *Session) Close() error

Close will cleanly shut down the channel and connection.

func (*Session) Push

func (s *Session) Push(ctx context.Context, data []byte, delayMs int64) error

Push will push data onto the queue, and wait for a confirm. If no confirms are received until within the resendTimeout, it continuously re-sends messages until a confirm is received. This will block until the server sends a confirm. Errors are only returned if the push action itself fails, see UnsafePush.

func (*Session) Stream

func (s *Session) Stream() (<-chan amqp.Delivery, error)

Stream will continuously put queue items on the channel. It is required to call delivery.Ack when it has been successfully processed, or delivery.Nack when it fails. Ignoring this will cause data to build up on the server.

func (*Session) UnsafePush

func (s *Session) UnsafePush(ctx context.Context, data []byte, delayMs int64) error

UnsafePush will push to the queue without checking for confirmation. It returns an error if it fails to connect. No guarantees are provided for whether the server will receive the message.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL