amqp

package module
v2.15.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 6, 2024 License: Apache-2.0 Imports: 11 Imported by: 8

Documentation

Overview

Package amqp implements an AMQP binding using the pack.ag/amqp module.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewReceiver

func NewReceiver(amqp *amqp.Receiver) protocol.Receiver

NewReceiver creates a new Receiver which wraps an amqp.Receiver in a protocol.Receiver.

func NewSender

func NewSender(amqpSender *amqp.Sender, options ...SenderOptionFunc) protocol.Sender

NewSender creates a new Sender which wraps an amqp.Sender in a protocol.Sender.

func WriteMessage

func WriteMessage(ctx context.Context, m binding.Message, amqpMessage *amqp.Message, transformers ...binding.Transformer) error

WriteMessage fills the provided amqpMessage with the message m. Using the context you can tweak the encoding process (more details in the binding.Write documentation).

Types

type Message

type Message struct {
	AMQP    *amqp.Message
	AMQPrcv *amqp.Receiver
	// contains filtered or unexported fields
}

Message implements binding.Message by wrapping an *amqp.Message. This message *can* be read several times safely.

func NewMessage

func NewMessage(message *amqp.Message, receiver *amqp.Receiver) *Message

NewMessage wraps an *amqp.Message in a binding.Message. The returned message *can* be read several times safely.

func (*Message) Finish

func (m *Message) Finish(err error) error

func (*Message) GetAttribute

func (m *Message) GetAttribute(k spec.Kind) (spec.Attribute, interface{})

func (*Message) GetExtension

func (m *Message) GetExtension(name string) interface{}

func (*Message) ReadBinary

func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter) error

func (*Message) ReadEncoding

func (m *Message) ReadEncoding() binding.Encoding

func (*Message) ReadStructured

func (m *Message) ReadStructured(ctx context.Context, encoder binding.StructuredWriter) error

type Option

type Option func(*Protocol) error

Option is the function signature required to be considered an amqp.Option.

func WithConnOpt

func WithConnOpt(opt amqp.ConnOption) Option

WithConnOpt sets a connection option for amqp

func WithConnSASLPlain

func WithConnSASLPlain(username, password string) Option

WithConnSASLPlain sets SASLPlain connection option for amqp

func WithReceiverLinkOption

func WithReceiverLinkOption(opt amqp.LinkOption) Option

WithReceiverLinkOption sets a receiver link option for amqp.

func WithSenderLinkOption

func WithSenderLinkOption(opt amqp.LinkOption) Option

WithSenderLinkOption sets a sender link option for amqp.

func WithSessionOpt

func WithSessionOpt(opt amqp.SessionOption) Option

WithSessionOpt sets a session option for amqp

type Protocol

type Protocol struct {

	// AMQP
	Client  *amqp.Client
	Session *amqp.Session

	Node string

	// Sender
	Sender                  *sender
	SenderContextDecorators []func(context.Context) context.Context

	// Receiver
	Receiver *receiver
	// contains filtered or unexported fields
}

func NewProtocol

func NewProtocol(server, queue string, connOption []amqp.ConnOption, sessionOption []amqp.SessionOption, opts ...Option) (*Protocol, error)

NewProtocol creates a new amqp transport.

func NewProtocolFromClient

func NewProtocolFromClient(client *amqp.Client, session *amqp.Session, queue string, opts ...Option) (*Protocol, error)

NewProtocolFromClient creates a new amqp transport from an existing client and session.

func NewReceiverProtocol added in v2.4.0

func NewReceiverProtocol(server, address string, connOption []amqp.ConnOption, sessionOption []amqp.SessionOption, opts ...Option) (*Protocol, error)

NewReceiverProtocol creates a new receiver amqp transport.

func NewReceiverProtocolFromClient added in v2.4.0

func NewReceiverProtocolFromClient(client *amqp.Client, session *amqp.Session, address string, opts ...Option) (*Protocol, error)

NewReceiverProtocolFromClient creates a new receiver amqp transport from an existing client and session.

func NewSenderProtocol added in v2.4.0

func NewSenderProtocol(server, address string, connOption []amqp.ConnOption, sessionOption []amqp.SessionOption, opts ...Option) (*Protocol, error)

NewSenderProtocol creates a new sender amqp transport.

func NewSenderProtocolFromClient added in v2.4.0

func NewSenderProtocolFromClient(client *amqp.Client, session *amqp.Session, address string, opts ...Option) (*Protocol, error)

NewSenderProtocolFromClient creates a new sender amqp transport from an existing client and session.

func (*Protocol) Close

func (t *Protocol) Close(ctx context.Context) (err error)

func (*Protocol) Receive

func (t *Protocol) Receive(ctx context.Context) (binding.Message, error)

func (*Protocol) Send

func (t *Protocol) Send(ctx context.Context, in binding.Message, transformers ...binding.Transformer) error

type SenderOptionFunc

type SenderOptionFunc func(sender *sender)

SenderOptionFunc is the type of amqp.Sender options

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL