amqp

package
v1.2.1-RC.2 Latest
Warning

This package is not in the latest version of its module.

Published: Dec 2, 2024 | License: Apache-2.0 | Imports: 9 | Imported by: 0

Documentation

Overview

Package amqp implements the CloudEvents transport using AMQP.

Constants

const (
	// TransportName is the name of this transport.
	TransportName = "AMQP"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Encoding

type Encoding int32

Encoding is the encoding to use for the amqp transport.

const (
	// Default allows amqp transport implementation to pick.
	Default Encoding = iota
	// BinaryV02 is Binary CloudEvents spec v0.2.
	BinaryV02
	// StructuredV02 is Structured CloudEvents spec v0.2.
	StructuredV02
	// BinaryV03 is Binary CloudEvents spec v0.3.
	BinaryV03
	// StructuredV03 is Structured CloudEvents spec v0.3.
	StructuredV03
	// BinaryV1 is Binary CloudEvents spec v1.0.
	BinaryV1
	// StructuredV1 is Structured CloudEvents spec v1.0.
	StructuredV1
	// Unknown is an unrecognized encoding.
	Unknown
)

func (Encoding) String

func (e Encoding) String() string

String pretty-prints the encoding as a string.

func (Encoding) Version

func (e Encoding) Version() string

Version pretty-prints the encoding version as a string.
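
The following is a minimal, self-contained sketch of how an enum like Encoding can provide String and Version. The Encoding type and its constants are mirrored locally so the example compiles without the SDK. The returned strings are assumptions made for illustration; the documentation above does not show what the real methods return.

```go
package main

import "fmt"

// Encoding is a local stand-in that mirrors the documented amqp.Encoding type.
type Encoding int32

const (
	Default Encoding = iota
	BinaryV02
	StructuredV02
	BinaryV03
	StructuredV03
	BinaryV1
	StructuredV1
	Unknown
)

// Version returns a spec-version label. The label text is an assumption.
func (e Encoding) Version() string {
	switch e {
	case Default:
		return "Default"
	case BinaryV02, StructuredV02:
		return "v0.2"
	case BinaryV03, StructuredV03:
		return "v0.3"
	case BinaryV1, StructuredV1:
		return "v1.0"
	default:
		return "Unknown"
	}
}

// String combines the content mode with the version. The format is an assumption.
func (e Encoding) String() string {
	switch e {
	case Default:
		return "Default Encoding " + e.Version()
	case BinaryV02, BinaryV03, BinaryV1:
		return "Binary Encoding " + e.Version()
	case StructuredV02, StructuredV03, StructuredV1:
		return "Structured Encoding " + e.Version()
	default:
		return "Unknown Encoding"
	}
}

func main() {
	// %s invokes String because Encoding satisfies fmt.Stringer.
	for e := Default; e <= Unknown; e++ {
		fmt.Printf("%d: %s\n", e, e)
	}
}
```

Because the constants are declared with iota, their numeric values follow declaration order, so logging the String form is usually more readable than logging the raw integer.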

type Message

type Message struct {
	ContentType           string
	ApplicationProperties map[string]interface{}
	Body                  []byte
}

func (Message) CloudEventsVersion

func (m Message) CloudEventsVersion() string

TODO: update this to work with AMQP
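
As an illustration only, one way a spec version could be read from a binary-mode message is to look it up in the application properties. The sketch below mirrors the Message struct locally and assumes the attribute is stored under the key "cloudEvents:specversion", following the CloudEvents AMQP protocol binding. The helper specVersion is hypothetical and is not the package's CloudEventsVersion implementation.

```go
package main

import "fmt"

// Message is a local stand-in that mirrors the documented amqp.Message struct.
type Message struct {
	ContentType           string
	ApplicationProperties map[string]interface{}
	Body                  []byte
}

// specVersionKey is an assumed property name taken from the CloudEvents AMQP
// protocol binding; the package documentation above does not confirm it.
const specVersionKey = "cloudEvents:specversion"

// specVersion is a hypothetical helper. It returns the spec version stored in
// the application properties, or "" if the property is missing or not a string.
func specVersion(m Message) string {
	// Indexing a nil map is safe, and the comma-ok assertion never panics.
	s, _ := m.ApplicationProperties[specVersionKey].(string)
	return s
}

func main() {
	m := Message{
		ContentType:           "application/json",
		ApplicationProperties: map[string]interface{}{specVersionKey: "1.0"},
		Body:                  []byte(`{"hello":"world"}`),
	}
	fmt.Printf("spec version: %q\n", specVersion(m))
}
```

A structured-mode message carries its attributes inside the body instead, so this lookup would return an empty string for it.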

type Option

type Option func(*Transport) error

Option is the function signature required to be considered an amqp.Option.

func WithConnOpt

func WithConnOpt(opt amqp.ConnOption) Option

WithConnOpt sets a connection option for the amqp client.

func WithConnSASLPlain

func WithConnSASLPlain(username, password string) Option

WithConnSASLPlain sets the SASL PLAIN connection option for amqp, using the given username and password.

func WithEncoding

func WithEncoding(encoding Encoding) Option

WithEncoding sets the encoding for amqp transport.

func WithReceiverLinkOption

func WithReceiverLinkOption(opt amqp.LinkOption) Option

WithReceiverLinkOption sets a link option for the amqp receiver link.

func WithSenderLinkOption

func WithSenderLinkOption(opt amqp.LinkOption) Option

WithSenderLinkOption sets a link option for the amqp sender link.

func WithSessionOpt

func WithSessionOpt(opt amqp.SessionOption) Option

WithSessionOpt sets a session option for amqp.
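
The With* functions above follow the functional options pattern: each returns an Option that mutates a Transport and may return an error. The sketch below shows the pattern in isolation with local stand-in types. The reduced Transport struct, the newTransport constructor, and the error text are hypothetical; the real New also dials the AMQP server, which is omitted here.

```go
package main

import (
	"errors"
	"fmt"
)

// Encoding is a local stand-in that mirrors the documented constants.
type Encoding int32

const (
	Default Encoding = iota
	BinaryV02
	StructuredV02
	BinaryV03
	StructuredV03
	BinaryV1
	StructuredV1
	Unknown
)

// Transport is a reduced stand-in holding only the fields this sketch needs.
type Transport struct {
	Encoding Encoding
	Node     string
}

// Option has the same signature as the documented amqp.Option.
type Option func(*Transport) error

// WithEncoding returns an Option that sets the transport encoding.
// The nil check and its error message are assumptions.
func WithEncoding(encoding Encoding) Option {
	return func(t *Transport) error {
		if t == nil {
			return errors.New("amqp encoding option cannot be applied to a nil transport")
		}
		t.Encoding = encoding
		return nil
	}
}

// newTransport is a hypothetical constructor that applies options in order
// and stops at the first error.
func newTransport(queue string, opts ...Option) (*Transport, error) {
	t := &Transport{Node: queue}
	for _, opt := range opts {
		if err := opt(t); err != nil {
			return nil, err
		}
	}
	return t, nil
}

func main() {
	tr, err := newTransport("/queue/example", WithEncoding(BinaryV1))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(tr.Node, tr.Encoding)
}
```

Returning an error from each option lets a constructor reject an invalid configuration before any connection is attempted.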

type Transport

type Transport struct {
	binding.BindingTransport

	// Encoding
	Encoding Encoding

	// AMQP
	Client  *amqp.Client
	Session *amqp.Session
	Sender  *amqp.Sender
	Node    string

	// Receiver
	Receiver transport.Receiver
	// Converter is invoked if the incoming transport receives an undecodable
	// message.
	Converter transport.Converter
	// contains filtered or unexported fields
}

func New

func New(server, queue string, opts ...Option) (*Transport, error)

New creates a new amqp transport.

func (*Transport) Close

func (t *Transport) Close() error

func (*Transport) HasConverter

func (t *Transport) HasConverter() bool

HasConverter implements Transport.HasConverter.

func (*Transport) HasTracePropagation

func (t *Transport) HasTracePropagation() bool

HasTracePropagation implements Transport.HasTracePropagation.

func (*Transport) SetConverter

func (t *Transport) SetConverter(c transport.Converter)

SetConverter implements Transport.SetConverter.

func (*Transport) StartReceiver

func (t *Transport) StartReceiver(ctx context.Context) error

StartReceiver implements Transport.StartReceiver. NOTE: This is a blocking call.
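
Because StartReceiver blocks, callers typically run it in a goroutine and stop it through the context. The sketch below shows that shape with a hypothetical stand-in, blockingReceiver, in place of the real method. It assumes the call returns once ctx is cancelled, which the documentation above does not state explicitly. The helper runFor is also hypothetical.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// blockingReceiver is a hypothetical stand-in for Transport.StartReceiver.
// It blocks until ctx is cancelled and then returns the context's error.
func blockingReceiver(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

// runFor starts the blocking call in a goroutine, lets it run for d, cancels
// the context, and returns whatever the call returned.
func runFor(start func(context.Context) error, d time.Duration) error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Buffered so the goroutine can finish even if nobody reads the result.
	errCh := make(chan error, 1)
	go func() { errCh <- start(ctx) }()

	time.Sleep(d) // stand-in for the caller's other work
	cancel()
	return <-errCh
}

func main() {
	err := runFor(blockingReceiver, 10*time.Millisecond)
	fmt.Println("receiver stopped:", err)
}
```

With a real transport, the function passed to runFor would be the StartReceiver method value, since it has the same signature.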