amqpwrap

package
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 2, 2024 License: MIT Imports: 4 Imported by: 0

Documentation

Overview

Package amqpwrap has some simple wrappers to make it easier to abstract the go-amqp types.

Index

Constants

This section is empty.

Variables

View Source
var ErrConnResetNeeded = errors.New("connection must be reset, link/connection state may be inconsistent")

Functions

func WrapError added in v1.0.1

func WrapError(err error, connID uint64, linkName string, partitionID string) error

Types

type AMQPClient

type AMQPClient interface {
	Close() error
	NewSession(ctx context.Context, opts *amqp.SessionOptions) (AMQPSession, error)
	ID() uint64
}

type AMQPClientWrapper

type AMQPClientWrapper struct {
	ConnID uint64
	Inner  goamqpConn
}

AMQPClientWrapper is a simple interface, implemented by *AMQPClientWrapper It exists only so we can return AMQPSession, which itself only exists so we can return interfaces for AMQPSender and AMQPReceiver from AMQPSession.

func (*AMQPClientWrapper) Close

func (w *AMQPClientWrapper) Close() error

func (*AMQPClientWrapper) ID added in v1.0.1

func (w *AMQPClientWrapper) ID() uint64

func (*AMQPClientWrapper) NewSession

func (w *AMQPClientWrapper) NewSession(ctx context.Context, opts *amqp.SessionOptions) (AMQPSession, error)

type AMQPReceiver

type AMQPReceiver interface {
	IssueCredit(credit uint32) error
	Receive(ctx context.Context, o *amqp.ReceiveOptions) (*amqp.Message, error)
	Prefetched() *amqp.Message

	// settlement functions
	AcceptMessage(ctx context.Context, msg *amqp.Message) error
	RejectMessage(ctx context.Context, msg *amqp.Message, e *amqp.Error) error
	ReleaseMessage(ctx context.Context, msg *amqp.Message) error
	ModifyMessage(ctx context.Context, msg *amqp.Message, options *amqp.ModifyMessageOptions) error

	LinkName() string
	LinkSourceFilterValue(name string) any

	// Credits returns the # of credits still active on this link.
	Credits() uint32

	ConnID() uint64
}

AMQPReceiver is implemented by *amqp.Receiver

type AMQPReceiverCloser

type AMQPReceiverCloser interface {
	AMQPReceiver
	Close(ctx context.Context) error
}

AMQPReceiverCloser is implemented by *amqp.Receiver

type AMQPReceiverWrapper

type AMQPReceiverWrapper struct {
	Inner goamqpReceiver

	ContextWithTimeoutFn ContextWithTimeoutFn
	// contains filtered or unexported fields
}

func (*AMQPReceiverWrapper) AcceptMessage

func (rw *AMQPReceiverWrapper) AcceptMessage(ctx context.Context, msg *amqp.Message) error

settlement functions

func (*AMQPReceiverWrapper) Close

func (rw *AMQPReceiverWrapper) Close(ctx context.Context) error

func (*AMQPReceiverWrapper) ConnID added in v1.0.1

func (rw *AMQPReceiverWrapper) ConnID() uint64

func (*AMQPReceiverWrapper) Credits

func (rw *AMQPReceiverWrapper) Credits() uint32

func (*AMQPReceiverWrapper) IssueCredit

func (rw *AMQPReceiverWrapper) IssueCredit(credit uint32) error

func (*AMQPReceiverWrapper) LinkName

func (rw *AMQPReceiverWrapper) LinkName() string

func (*AMQPReceiverWrapper) LinkSourceFilterValue

func (rw *AMQPReceiverWrapper) LinkSourceFilterValue(name string) any

func (*AMQPReceiverWrapper) ModifyMessage

func (rw *AMQPReceiverWrapper) ModifyMessage(ctx context.Context, msg *amqp.Message, options *amqp.ModifyMessageOptions) error

func (*AMQPReceiverWrapper) Prefetched

func (rw *AMQPReceiverWrapper) Prefetched() *amqp.Message

func (*AMQPReceiverWrapper) Receive

func (rw *AMQPReceiverWrapper) Receive(ctx context.Context, o *amqp.ReceiveOptions) (*amqp.Message, error)

func (*AMQPReceiverWrapper) RejectMessage

func (rw *AMQPReceiverWrapper) RejectMessage(ctx context.Context, msg *amqp.Message, e *amqp.Error) error

func (*AMQPReceiverWrapper) ReleaseMessage

func (rw *AMQPReceiverWrapper) ReleaseMessage(ctx context.Context, msg *amqp.Message) error

type AMQPSender

type AMQPSender interface {
	Send(ctx context.Context, msg *amqp.Message, o *amqp.SendOptions) error
	MaxMessageSize() uint64
	LinkName() string
	ConnID() uint64
}

AMQPSender is implemented by *amqp.Sender

type AMQPSenderCloser

type AMQPSenderCloser interface {
	AMQPSender
	Close(ctx context.Context) error
}

AMQPSenderCloser is implemented by *amqp.Sender

type AMQPSenderWrapper added in v1.0.0

type AMQPSenderWrapper struct {
	Inner                goamqpSender
	ContextWithTimeoutFn ContextWithTimeoutFn
	// contains filtered or unexported fields
}

func (*AMQPSenderWrapper) Close added in v1.0.0

func (sw *AMQPSenderWrapper) Close(ctx context.Context) error

func (*AMQPSenderWrapper) ConnID added in v1.0.1

func (sw *AMQPSenderWrapper) ConnID() uint64

func (*AMQPSenderWrapper) LinkName added in v1.0.0

func (sw *AMQPSenderWrapper) LinkName() string

func (*AMQPSenderWrapper) MaxMessageSize added in v1.0.0

func (sw *AMQPSenderWrapper) MaxMessageSize() uint64

func (*AMQPSenderWrapper) Send added in v1.0.0

func (sw *AMQPSenderWrapper) Send(ctx context.Context, msg *amqp.Message, o *amqp.SendOptions) error

type AMQPSession

type AMQPSession interface {
	Close(ctx context.Context) error
	ConnID() uint64
	NewReceiver(ctx context.Context, source string, partitionID string, opts *amqp.ReceiverOptions) (AMQPReceiverCloser, error)
	NewSender(ctx context.Context, target string, partitionID string, opts *amqp.SenderOptions) (AMQPSenderCloser, error)
}

AMQPSession is a simple interface, implemented by *AMQPSessionWrapper. It exists only so we can return AMQPReceiver/AMQPSender interfaces.

type AMQPSessionWrapper

type AMQPSessionWrapper struct {
	Inner                goamqpSession
	ContextWithTimeoutFn ContextWithTimeoutFn
	// contains filtered or unexported fields
}

func (*AMQPSessionWrapper) Close

func (w *AMQPSessionWrapper) Close(ctx context.Context) error

func (*AMQPSessionWrapper) ConnID added in v1.0.1

func (w *AMQPSessionWrapper) ConnID() uint64

func (*AMQPSessionWrapper) NewReceiver

func (w *AMQPSessionWrapper) NewReceiver(ctx context.Context, source string, partitionID string, opts *amqp.ReceiverOptions) (AMQPReceiverCloser, error)

func (*AMQPSessionWrapper) NewSender

func (w *AMQPSessionWrapper) NewSender(ctx context.Context, target string, partitionID string, opts *amqp.SenderOptions) (AMQPSenderCloser, error)

type ContextWithTimeoutFn added in v1.0.0

type ContextWithTimeoutFn func(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc)

ContextWithTimeoutFn matches the signature for `context.WithTimeout` and is used when we want to stub things out for tests.

type Error added in v1.0.1

type Error struct {
	ConnID      uint64
	LinkName    string
	PartitionID string
	Err         error
}

Error is a wrapper that has the context of which connection and link the error happened with.

func (Error) As added in v1.0.1

func (e Error) As(target any) bool

func (Error) Error added in v1.0.1

func (e Error) Error() string

func (Error) Is added in v1.0.1

func (e Error) Is(target error) bool
type RPCLink interface {
	Close(ctx context.Context) error
	ConnID() uint64
	RPC(ctx context.Context, msg *amqp.Message) (*RPCResponse, error)
	LinkName() string
}

RPCLink is implemented by *rpc.Link

type RPCResponse added in v0.4.0

type RPCResponse struct {
	// Code is the response code - these originate from Service Bus. Some
	// common values are called out below, with the RPCResponseCode* constants.
	Code        int
	Description string
	Message     *amqp.Message
}

RPCResponse is the simplified response structure from an RPC like call

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL