amqpwrap

package
v1.7.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 2, 2024 License: MIT Imports: 4 Imported by: 0

Documentation

Overview

Package amqpwrap has some simple wrappers to make it easier to abstract the go-amqp types.

Index

Constants

This section is empty.

Variables

View Source
var ErrConnResetNeeded = errors.New("connection must be reset, link/connection state may be inconsistent")

Functions

This section is empty.

Types

type AMQPClient

type AMQPClient interface {
	Close() error
	NewSession(ctx context.Context, opts *amqp.SessionOptions) (AMQPSession, error)
	Name() string
}

type AMQPClientWrapper

type AMQPClientWrapper struct {
	Inner goamqpConn
	ID    string
}

AMQPClientWrapper is a simple interface, implemented by *AMQPClientWrapper It exists only so we can return AMQPSession, which itself only exists so we can return interfaces for AMQPSender and AMQPReceiver from AMQPSession.

func (*AMQPClientWrapper) Close

func (w *AMQPClientWrapper) Close() error

func (*AMQPClientWrapper) Name added in v1.2.0

func (w *AMQPClientWrapper) Name() string

func (*AMQPClientWrapper) NewSession

func (w *AMQPClientWrapper) NewSession(ctx context.Context, opts *amqp.SessionOptions) (AMQPSession, error)

type AMQPReceiver

type AMQPReceiver interface {
	IssueCredit(credit uint32) error
	Receive(ctx context.Context, o *amqp.ReceiveOptions) (*amqp.Message, error)
	Prefetched() *amqp.Message

	// settlement functions
	AcceptMessage(ctx context.Context, msg *amqp.Message) error
	RejectMessage(ctx context.Context, msg *amqp.Message, e *amqp.Error) error
	ReleaseMessage(ctx context.Context, msg *amqp.Message) error
	ModifyMessage(ctx context.Context, msg *amqp.Message, options *amqp.ModifyMessageOptions) error

	LinkName() string
	LinkSourceFilterValue(name string) any

	// Credits returns the # of credits still active on this link.
	Credits() uint32
}

AMQPReceiver is implemented by *amqp.Receiver

type AMQPReceiverCloser

type AMQPReceiverCloser interface {
	AMQPReceiver
	Close(ctx context.Context) error
}

AMQPReceiverCloser is implemented by *amqp.Receiver

type AMQPReceiverWrapper added in v1.1.0

type AMQPReceiverWrapper struct {
	Inner goamqpReceiver

	ContextWithTimeoutFn ContextWithTimeoutFn
	// contains filtered or unexported fields
}

func (*AMQPReceiverWrapper) AcceptMessage added in v1.1.0

func (rw *AMQPReceiverWrapper) AcceptMessage(ctx context.Context, msg *amqp.Message) error

settlement functions

func (*AMQPReceiverWrapper) Close added in v1.1.0

func (rw *AMQPReceiverWrapper) Close(ctx context.Context) error

func (*AMQPReceiverWrapper) Credits added in v1.1.0

func (rw *AMQPReceiverWrapper) Credits() uint32

func (*AMQPReceiverWrapper) IssueCredit added in v1.1.0

func (rw *AMQPReceiverWrapper) IssueCredit(credit uint32) error

func (*AMQPReceiverWrapper) LinkName added in v1.1.0

func (rw *AMQPReceiverWrapper) LinkName() string

func (*AMQPReceiverWrapper) LinkSourceFilterValue added in v1.1.0

func (rw *AMQPReceiverWrapper) LinkSourceFilterValue(name string) any

func (*AMQPReceiverWrapper) ModifyMessage added in v1.1.0

func (rw *AMQPReceiverWrapper) ModifyMessage(ctx context.Context, msg *amqp.Message, options *amqp.ModifyMessageOptions) error

func (*AMQPReceiverWrapper) Prefetched added in v1.1.0

func (rw *AMQPReceiverWrapper) Prefetched() *amqp.Message

func (*AMQPReceiverWrapper) Receive added in v1.1.0

func (rw *AMQPReceiverWrapper) Receive(ctx context.Context, o *amqp.ReceiveOptions) (*amqp.Message, error)

func (*AMQPReceiverWrapper) RejectMessage added in v1.1.0

func (rw *AMQPReceiverWrapper) RejectMessage(ctx context.Context, msg *amqp.Message, e *amqp.Error) error

func (*AMQPReceiverWrapper) ReleaseMessage added in v1.1.0

func (rw *AMQPReceiverWrapper) ReleaseMessage(ctx context.Context, msg *amqp.Message) error

type AMQPSender

type AMQPSender interface {
	Send(ctx context.Context, msg *amqp.Message, o *amqp.SendOptions) error
	MaxMessageSize() uint64
	LinkName() string
}

AMQPSender is implemented by *amqp.Sender

type AMQPSenderCloser

type AMQPSenderCloser interface {
	AMQPSender
	Close(ctx context.Context) error
}

AMQPSenderCloser is implemented by *amqp.Sender

type AMQPSenderWrapper added in v1.3.0

type AMQPSenderWrapper struct {
	Inner                AMQPSenderCloser
	ContextWithTimeoutFn ContextWithTimeoutFn
}

func (*AMQPSenderWrapper) Close added in v1.3.0

func (sw *AMQPSenderWrapper) Close(ctx context.Context) error

func (*AMQPSenderWrapper) LinkName added in v1.3.0

func (sw *AMQPSenderWrapper) LinkName() string

func (*AMQPSenderWrapper) MaxMessageSize added in v1.3.0

func (sw *AMQPSenderWrapper) MaxMessageSize() uint64

func (*AMQPSenderWrapper) Send added in v1.3.0

func (sw *AMQPSenderWrapper) Send(ctx context.Context, msg *amqp.Message, o *amqp.SendOptions) error

type AMQPSession

type AMQPSession interface {
	Close(ctx context.Context) error
	NewReceiver(ctx context.Context, source string, opts *amqp.ReceiverOptions) (AMQPReceiverCloser, error)
	NewSender(ctx context.Context, target string, opts *amqp.SenderOptions) (AMQPSenderCloser, error)
}

AMQPSession is a simple interface, implemented by *AMQPSessionWrapper. It exists only so we can return AMQPReceiver/AMQPSender interfaces.

type AMQPSessionWrapper

type AMQPSessionWrapper struct {
	Inner                goamqpSession
	ContextWithTimeoutFn ContextWithTimeoutFn
}

func (*AMQPSessionWrapper) Close

func (w *AMQPSessionWrapper) Close(ctx context.Context) error

func (*AMQPSessionWrapper) NewReceiver

func (w *AMQPSessionWrapper) NewReceiver(ctx context.Context, source string, opts *amqp.ReceiverOptions) (AMQPReceiverCloser, error)

func (*AMQPSessionWrapper) NewSender

func (w *AMQPSessionWrapper) NewSender(ctx context.Context, target string, opts *amqp.SenderOptions) (AMQPSenderCloser, error)

type Closeable added in v1.2.0

type Closeable interface {
	Close(ctx context.Context) error
}

Closeable is implemented by pretty much any AMQP link/client including our own higher level Receiver/Sender.

type ContextWithTimeoutFn added in v1.3.0

type ContextWithTimeoutFn func(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc)

ContextWithTimeoutFn matches the signature for `context.WithTimeout` and is used when we want to stub things out for tests.

type RPCLink interface {
	Close(ctx context.Context) error
	RPC(ctx context.Context, msg *amqp.Message) (*RPCResponse, error)
}

RPCLink is implemented by *rpc.Link

type RPCResponse added in v1.2.0

type RPCResponse struct {
	// Code is the response code - these originate from Service Bus. Some
	// common values are called out below, with the RPCResponseCode* constants.
	Code        int
	Description string
	Message     *amqp.Message
}

RPCResponse is the simplified response structure from an RPC like call

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL