amqp

package
v0.74.0 Latest
Warning

This package is not in the latest version of its module.

Published: Jan 28, 2025 | License: Apache-2.0 | Imports: 17 | Imported by: 0

Documentation

Overview

Package amqp provides a native consumer for the AMQP protocol.

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Batch

type Batch interface {
	// Messages of the batch.
	Messages() []Message
	// ACK deletes all messages and completes the message tracing spans.
	// If some messages cannot be ACKed, a slice of the failed messages is returned.
	ACK() ([]Message, error)
	// NACK leaves all messages in the queue and completes all message tracing spans.
	// If some messages cannot be NACKed, a slice of the failed messages is returned.
	NACK() ([]Message, error)
}

Batch interface for multiple AMQP messages.
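Both ACK and NACK on a batch return the messages that could not be handled, so a caller can inspect the partial failure instead of treating the batch as all-or-nothing. The following is a minimal sketch of that pattern, not the package's implementation. Message and Batch are local stand-ins that mirror only a subset of the documented interfaces, and fakeMessage, fakeBatch, and failedIDs are hypothetical names introduced for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Message is a local stand-in mirroring a subset of the documented interface.
type Message interface {
	ID() string
	ACK() error
	NACK() error
}

// Batch is a local stand-in mirroring the documented interface.
type Batch interface {
	Messages() []Message
	ACK() ([]Message, error)
	NACK() ([]Message, error)
}

// fakeMessage is a hypothetical in-memory message; failACK forces ACK to fail.
type fakeMessage struct {
	id      string
	failACK bool
}

func (m *fakeMessage) ID() string { return m.id }

func (m *fakeMessage) ACK() error {
	if m.failACK {
		return errors.New("ack failed for message " + m.id)
	}
	return nil
}

func (m *fakeMessage) NACK() error { return nil }

// fakeBatch is a hypothetical batch that acknowledges messages one by one
// and collects the ones that fail.
type fakeBatch struct{ msgs []Message }

func (b *fakeBatch) Messages() []Message { return b.msgs }

func (b *fakeBatch) ACK() ([]Message, error) {
	var failed []Message
	for _, m := range b.msgs {
		if err := m.ACK(); err != nil {
			failed = append(failed, m)
		}
	}
	if len(failed) > 0 {
		return failed, fmt.Errorf("failed to ACK %d message(s)", len(failed))
	}
	return nil, nil
}

func (b *fakeBatch) NACK() ([]Message, error) {
	var failed []Message
	for _, m := range b.msgs {
		if err := m.NACK(); err != nil {
			failed = append(failed, m)
		}
	}
	if len(failed) > 0 {
		return failed, fmt.Errorf("failed to NACK %d message(s)", len(failed))
	}
	return nil, nil
}

// failedIDs ACKs the whole batch and returns the IDs of the messages
// that could not be acknowledged, or nil when everything succeeded.
func failedIDs(b Batch) []string {
	failed, err := b.ACK()
	if err == nil {
		return nil
	}
	ids := make([]string, 0, len(failed))
	for _, m := range failed {
		ids = append(ids, m.ID())
	}
	return ids
}

func main() {
	b := &fakeBatch{msgs: []Message{
		&fakeMessage{id: "1"},
		&fakeMessage{id: "2", failACK: true},
	}}
	fmt.Println(failedIDs(b)) // prints [2]
}
```

What to do with the failed messages (log, retry, or NACK them individually) is an application decision that the documentation above does not prescribe.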

type Component

type Component struct {
	// contains filtered or unexported fields
}

Component implementation of an async component.

func New

func New(url, queue string, proc ProcessorFunc, oo ...OptionFunc) (*Component, error)

New creates a new component with support for functional configuration.

func (*Component) Run

func (c *Component) Run(ctx context.Context) error

Run starts the consumer's message processing loop.

type Message

type Message interface {
	// Context returns the context to be used for processing.
	// Each context carries a logger, which can be retrieved from the context.
	Context() context.Context
	// ID of the message.
	ID() string
	// Body of the message.
	Body() []byte
	// Message will contain the raw AMQP delivery.
	Message() amqp.Delivery
	// Span contains the tracing span of this message.
	Span() trace.Span
	// ACK deletes the message from the queue and completes the tracing span.
	ACK() error
	// NACK leaves the message in the queue and completes the tracing span.
	NACK() error
}

Message interface for an AMQP Delivery.

type OptionFunc

type OptionFunc func(*Component) error

OptionFunc definition for configuring the component in a functional way.

func WithBatching added in v0.74.0

func WithBatching(count uint, timeout time.Duration) OptionFunc

WithBatching option for setting up batching. The count must be greater than 1 and the timeout greater than 0.

func WithConfig added in v0.74.0

func WithConfig(cfg amqp.Config) OptionFunc

WithConfig option for setting AMQP configuration.

func WithRequeue added in v0.74.0

func WithRequeue(requeue bool) OptionFunc

WithRequeue option for adjusting the requeue policy of a message.

func WithRetry added in v0.74.0

func WithRetry(count uint, delay time.Duration) OptionFunc

WithRetry option for setting up retries.

func WithStatsInterval added in v0.74.0

func WithStatsInterval(interval time.Duration) OptionFunc

WithStatsInterval option for setting the interval to retrieve statistics.

type ProcessorFunc

type ProcessorFunc func(context.Context, Batch)

ProcessorFunc definition of an async processor.
