amqp

package
v2.0.0-RC2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 28, 2020 License: Apache-2.0 Imports: 12 Imported by: 2

Documentation

Overview

Package amqp implements an AMQP binding using pack.ag/amqp module

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewReceiver

func NewReceiver(amqp *amqp.Receiver) protocol.Receiver

NewReceiver create a new Receiver which wraps an amqp.Receiver in a binding.Receiver

func NewSender

func NewSender(amqpSender *amqp.Sender, options ...SenderOptionFunc) protocol.Sender

NewSender creates a new Sender which wraps an amqp.Sender in a binding.Sender

func WriteMessage

func WriteMessage(ctx context.Context, m binding.Message, amqpMessage *amqp.Message, transformers ...binding.Transformer) error

WriteMessage fills the provided amqpMessage with the message m. Using context you can tweak the encoding processing (more details on binding.Write documentation).

Types

type Message

type Message struct {
	AMQP *amqp.Message
	// contains filtered or unexported fields
}

Message implements binding.Message by wrapping an *amqp.Message. This message *can* be read several times safely

func NewMessage

func NewMessage(message *amqp.Message) *Message

NewMessage wrap an *amqp.Message in a binding.Message. The returned message *can* be read several times safely

func (*Message) Finish

func (m *Message) Finish(err error) error

func (*Message) GetAttribute

func (m *Message) GetAttribute(k spec.Kind) (spec.Attribute, interface{})

func (*Message) GetExtension

func (m *Message) GetExtension(name string) interface{}

func (*Message) ReadBinary

func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter) error

func (*Message) ReadEncoding

func (m *Message) ReadEncoding() binding.Encoding

func (*Message) ReadStructured

func (m *Message) ReadStructured(ctx context.Context, encoder binding.StructuredWriter) error

type Option

type Option func(*Protocol) error

Option is the function signature required to be considered an amqp.Option.

func WithConnOpt

func WithConnOpt(opt amqp.ConnOption) Option

WithConnOpt sets a connection option for amqp

func WithConnSASLPlain

func WithConnSASLPlain(username, password string) Option

WithConnSASLPlain sets SASLPlain connection option for amqp

func WithReceiverLinkOption

func WithReceiverLinkOption(opt amqp.LinkOption) Option

WithReceiverLinkOption sets a link option for amqp

func WithSenderLinkOption

func WithSenderLinkOption(opt amqp.LinkOption) Option

WithSenderLinkOption sets a link option for amqp

func WithSessionOpt

func WithSessionOpt(opt amqp.SessionOption) Option

WithSessionOpt sets a session option for amqp

type Protocol

type Protocol struct {

	// AMQP
	Client  *amqp.Client
	Session *amqp.Session

	Node string

	// Sender
	Sender                  *sender
	SenderContextDecorators []func(context.Context) context.Context

	// Receiver
	Receiver *receiver
	// contains filtered or unexported fields
}

func NewProtocol

func NewProtocol(server, queue string, connOption []amqp.ConnOption, sessionOption []amqp.SessionOption, opts ...Option) (*Protocol, error)

NewProtocol creates a new amqp transport.

func NewProtocolFromClient

func NewProtocolFromClient(client *amqp.Client, session *amqp.Session, queue string, opts ...Option) (*Protocol, error)

NewProtocolFromClient creates a new amqp transport.

func (*Protocol) Close

func (t *Protocol) Close(ctx context.Context) (err error)

func (*Protocol) Receive

func (t *Protocol) Receive(ctx context.Context) (binding.Message, error)

func (*Protocol) Send

func (t *Protocol) Send(ctx context.Context, in binding.Message, transformers ...binding.Transformer) error

type SenderOptionFunc

type SenderOptionFunc func(sender *sender)

SenderOptionFunc is the type of amqp.Sender options

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL