pubsub

package
v0.0.0-...-cfcd8fe Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 8, 2024 License: MIT Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DefaultLogger

func DefaultLogger(scope Scope, name string, msg string, attributes map[string]any)

func GetAttempts

func GetAttempts(msg *amqp.Delivery) int

func GetMessageHeader

func GetMessageHeader[T any](msg *amqp.Delivery, key string) (value T, ok bool)

func NewConsumerOptionsMetricsThresholdError

func NewConsumerOptionsMetricsThresholdError(v float64) consumerOptionsMetricsThreshold

func NewConsumerOptionsMetricsThresholdErrorFunc

func NewConsumerOptionsMetricsThresholdErrorFunc(f func() float64) consumerOptionsMetricsThreshold

func NewConsumerOptionsMetricsThresholdWarning

func NewConsumerOptionsMetricsThresholdWarning(v float64) consumerOptionsMetricsThreshold

func NewConsumerOptionsMetricsThresholdWarningAndError

func NewConsumerOptionsMetricsThresholdWarningAndError(v1 float64, v2 float64) consumerOptionsMetricsThreshold

func NewConsumerOptionsMetricsThresholdWarningFunc

func NewConsumerOptionsMetricsThresholdWarningFunc(f func() float64) consumerOptionsMetricsThreshold

func NewConsumerOptionsMetricsThresholdWarningFuncAndErrorFunc

func NewConsumerOptionsMetricsThresholdWarningFuncAndErrorFunc(f1 func() float64, f2 func() float64) consumerOptionsMetricsThreshold

func RejectWithRetry

func RejectWithRetry(msg *amqp.Delivery, ttl time.Duration) error

func SetLogger

func SetLogger(cb func(scope Scope, name string, msg string, attributes map[string]any))

Types

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

func NewConnection

func NewConnection(name string, opt ConnectionOptions) (*Connection, error)

func (*Connection) Close

func (c *Connection) Close() error

func (*Connection) IsClosed

func (c *Connection) IsClosed() bool

func (*Connection) ListenConnection

func (c *Connection) ListenConnection() (func(), <-chan *amqp.Connection)

ListenConnection implements the Observable pattern.

type ConnectionOptions

// ConnectionOptions is the options struct accepted by NewConnection.
//
// URI and Config are plain values; the fields under "optional arguments" are
// mo.Option wrappers, and the inline comments state the value used when an
// option is left unset.
type ConnectionOptions struct {
	// NOTE(review): presumably the AMQP broker URI to dial — confirm.
	URI    string
	Config amqp.Config

	// optional arguments
	ReconnectInterval mo.Option[time.Duration] // default 2s
	LazyConnection    mo.Option[bool]          // default false
}

type ConstantRetryStrategy

type ConstantRetryStrategy struct {
	// contains filtered or unexported fields
}

func (*ConstantRetryStrategy) NextBackOff

func (rs *ConstantRetryStrategy) NextBackOff(msg *amqp.Delivery, attempts int) (time.Duration, bool)

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(conn *Connection, name string, opt ConsumerOptions) *Consumer

func (*Consumer) AddBinding

func (c *Consumer) AddBinding(exchangeName string, routingKey string, args mo.Option[amqp.Table]) error

func (*Consumer) Close

func (c *Consumer) Close() error

func (*Consumer) Collect

func (svc *Consumer) Collect(ch chan<- prometheus.Metric)

func (*Consumer) Consume

func (c *Consumer) Consume() <-chan *amqp.Delivery

func (*Consumer) Describe

func (svc *Consumer) Describe(ch chan<- *prometheus.Desc)

func (*Consumer) RemoveBinding

func (c *Consumer) RemoveBinding(exchangeName string, routingKey string, args mo.Option[amqp.Table]) error

type ConsumerOptions

// ConsumerOptions is the options struct accepted by NewConsumer.
//
// Queue, Bindings and Message delegate to their own option structs
// (ConsumerOptionsQueue, ConsumerOptionsBinding, ConsumerOptionsMessage).
// The fields under "optional arguments" are a ConsumerOptionsMetrics value
// plus mo.Option wrappers; the inline comments state the value used when an
// option is left unset.
type ConsumerOptions struct {
	Queue    ConsumerOptionsQueue
	Bindings []ConsumerOptionsBinding
	Message  ConsumerOptionsMessage

	// optional arguments
	Metrics          ConsumerOptionsMetrics
	EnableDeadLetter mo.Option[bool]             // default false
	Defer            mo.Option[time.Duration]    // default no Defer
	ConsumeArgs      mo.Option[amqp.Table]       // default nil
	RetryStrategy    mo.Option[RetryStrategy]    // default no retry
	RetryConsistency mo.Option[RetryConsistency] // default eventually consistent
}

type ConsumerOptionsBinding

// ConsumerOptionsBinding describes a single exchange/routing-key binding.
//
// It is the element type of ConsumerOptions.Bindings and of the bindings
// parameter of Event.SetConsumer. Its three fields mirror the parameters of
// Consumer.AddBinding and Consumer.RemoveBinding.
type ConsumerOptionsBinding struct {
	ExchangeName string
	RoutingKey   string

	// optional arguments
	Args mo.Option[amqp.Table] // default nil
}

type ConsumerOptionsMessage

// ConsumerOptionsMessage holds the message-level settings of a consumer; it
// is the type of ConsumerOptions.Message. Every field is optional.
//
// NOTE(review): the field names suggest AMQP auto-acknowledgement and channel
// QoS prefetch limits — confirm against the consumer implementation.
type ConsumerOptionsMessage struct {
	// optional arguments
	AutoAck       mo.Option[bool] // default false
	PrefetchCount mo.Option[int]  // default 0
	PrefetchSize  mo.Option[int]  // default 0
}

type ConsumerOptionsMetrics

// ConsumerOptionsMetrics holds the metric thresholds of a consumer; it is the
// type of ConsumerOptions.Metrics.
//
// Every field has the unexported type consumerOptionsMetricsThreshold, so
// values can only be obtained from the NewConsumerOptionsMetricsThreshold*
// constructor functions. The fields are grouped by name prefix: main queue,
// dead-letter queue, and retry queue.
//
// NOTE(review): how the thresholds are reported (presumably through the
// Consumer's prometheus Collect/Describe methods) is not visible here — confirm.
type ConsumerOptionsMetrics struct {
	// Thresholds for the main queue.
	QueueMessageBytesThreshold consumerOptionsMetricsThreshold
	QueueMessagesThreshold     consumerOptionsMetricsThreshold

	// Thresholds for the dead-letter queue.
	DeadLetterQueueMessageBytesThreshold consumerOptionsMetricsThreshold
	DeadLetterQueueMessagesThreshold     consumerOptionsMetricsThreshold
	DeadLetterQueueMessageRateThreshold  consumerOptionsMetricsThreshold

	// Thresholds for the retry queue.
	RetryQueueMessageBytesThreshold consumerOptionsMetricsThreshold
	RetryQueueMessagesThreshold     consumerOptionsMetricsThreshold
	RetryQueueMessageRateThreshold  consumerOptionsMetricsThreshold
}

type ConsumerOptionsQueue

// ConsumerOptionsQueue holds the queue settings of a consumer; it is the type
// of ConsumerOptions.Queue.
//
// Name is a plain string; the fields under "optional arguments" are mo.Option
// wrappers, and the inline comments state the value used when an option is
// left unset.
type ConsumerOptionsQueue struct {
	Name string

	// optional arguments
	Durable           mo.Option[bool]       // default true
	AutoDelete        mo.Option[bool]       // default false
	ExclusiveConsumer mo.Option[bool]       // default false
	NoWait            mo.Option[bool]       // default false
	Args              mo.Option[amqp.Table] // default nil
}

type Event

type Event struct {
	// contains filtered or unexported fields
}

func NewEvent

func NewEvent(appName, username, password, host, vhost string) (*Event, error)

func (*Event) Consume

func (e *Event) Consume(msg func(int64, *amqp.Delivery))

func (*Event) Publish

func (e *Event) Publish(eventName string, payload Payload) error

func (*Event) SetConsumer

func (e *Event) SetConsumer(queueName string, bindings []ConsumerOptionsBinding)

type EventData

// EventData is a JSON-serializable record with the keys "id", "name" and
// "payload".
//
// NOTE(review): presumably the envelope written by Event.Publish (which takes
// an event name and a Payload) — confirm against the implementation.
type EventData struct {
	ID      string  `json:"id"`
	Name    string  `json:"name"`
	Payload Payload `json:"payload"`
}

type ExchangeKind

// ExchangeKind is a string-typed exchange kind. It is the value type of
// ProducerOptionsExchange.Kind.
type ExchangeKind string
// The ExchangeKind values declared by this package.
const (
	ExchangeKindDirect  ExchangeKind = "direct"
	ExchangeKindFanout  ExchangeKind = "fanout"
	ExchangeKindTopic   ExchangeKind = "topic"
	ExchangeKindHeaders ExchangeKind = "headers"
)

type ExponentialRetryStrategy

type ExponentialRetryStrategy struct {
	// contains filtered or unexported fields
}

func (*ExponentialRetryStrategy) NextBackOff

func (rs *ExponentialRetryStrategy) NextBackOff(msg *amqp.Delivery, attempts int) (time.Duration, bool)

type LazyRetryStrategy

type LazyRetryStrategy struct {
	// contains filtered or unexported fields
}

func (*LazyRetryStrategy) NextBackOff

func (rs *LazyRetryStrategy) NextBackOff(msg *amqp.Delivery, attempts int) (time.Duration, bool)

type Payload

// Payload is the body of an event: it is the type of EventData.Payload and of
// the payload parameter of Event.Publish. It places no constraint on the
// concrete value. It is spelled with `any`, matching the rest of the package
// API (e.g. DefaultLogger's map[string]any); `any` is an alias for
// interface{}, so the type is unchanged for callers.
type Payload any

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

func NewProducer

func NewProducer(conn *Connection, name string, opt ProducerOptions) *Producer

func (*Producer) Close

func (p *Producer) Close() error

func (*Producer) Publish

func (p *Producer) Publish(routingKey string, mandatory bool, immediate bool, msg amqp.Publishing) error

func (*Producer) PublishWithContext

func (p *Producer) PublishWithContext(ctx context.Context, routingKey string, mandatory bool, immediate bool, msg amqp.Publishing) error

func (*Producer) PublishWithDeferredConfirm

func (p *Producer) PublishWithDeferredConfirm(routingKey string, mandatory bool, immediate bool, msg amqp.Publishing) (*amqp.DeferredConfirmation, error)

func (*Producer) PublishWithDeferredConfirmWithContext

func (p *Producer) PublishWithDeferredConfirmWithContext(ctx context.Context, routingKey string, mandatory bool, immediate bool, msg amqp.Publishing) (*amqp.DeferredConfirmation, error)

type ProducerOptions

// ProducerOptions is the options struct accepted by NewProducer. Its only
// field, Exchange, carries the exchange settings (see ProducerOptionsExchange).
type ProducerOptions struct {
	Exchange ProducerOptionsExchange
}

type ProducerOptionsExchange

// ProducerOptionsExchange holds the exchange settings of a producer; it is
// the type of ProducerOptions.Exchange.
//
// Every field is an mo.Option wrapper, and the inline comments state the
// value used when an option is left unset.
type ProducerOptionsExchange struct {
	Name mo.Option[string]       // default "amq.direct"
	Kind mo.Option[ExchangeKind] // default "direct"

	// optional arguments
	Durable    mo.Option[bool]       // default true
	AutoDelete mo.Option[bool]       // default false
	Internal   mo.Option[bool]       // default false
	NoWait     mo.Option[bool]       // default false
	Args       mo.Option[amqp.Table] // default nil
}

type QueueSetupExchangeOptions

type QueueSetupExchangeOptions struct {
	// contains filtered or unexported fields
}

type QueueSetupOptions

// QueueSetupOptions pairs a QueueSetupExchangeOptions with a
// QueueSetupQueueOptions. Both nested types expose no exported fields.
//
// NOTE(review): no exported function in this package takes or returns this
// type, so its intended use is not visible from the public API — confirm.
type QueueSetupOptions struct {
	Exchange QueueSetupExchangeOptions
	Queue    QueueSetupQueueOptions
}

type QueueSetupQueueOptions

type QueueSetupQueueOptions struct {
	// contains filtered or unexported fields
}

type RetryConsistency

// RetryConsistency is an int-typed retry mode. It is the value type of
// ConsumerOptions.RetryConsistency, whose documented default is
// "eventually consistent".
type RetryConsistency int
// The RetryConsistency values. Note that the zero value is ConsistentRetry,
// which is not the default documented on ConsumerOptions.RetryConsistency.
const (
	ConsistentRetry           RetryConsistency = 0 // slow
	EventuallyConsistentRetry RetryConsistency = 1 // fast, at *least* once
)

type RetryStrategy

// RetryStrategy is the interface returned by NewConstantRetryStrategy,
// NewExponentialRetryStrategy and NewLazyRetryStrategy, and accepted through
// ConsumerOptions.RetryStrategy.
type RetryStrategy interface {
	// NextBackOff takes a delivery and an int (named msg and attempts on the
	// package's implementations) and returns a duration and a bool.
	// NOTE(review): presumably the delay before the next retry and whether a
	// retry should happen at all — confirm against the implementations.
	NextBackOff(*amqp.Delivery, int) (time.Duration, bool)
}

func NewConstantRetryStrategy

func NewConstantRetryStrategy(maxRetry int, interval time.Duration) RetryStrategy

func NewExponentialRetryStrategy

func NewExponentialRetryStrategy(maxRetry int, initialInterval time.Duration, intervalMultiplier float64) RetryStrategy

func NewLazyRetryStrategy

func NewLazyRetryStrategy(maxRetry int) RetryStrategy

NewLazyRetryStrategy returns a retry strategy that will never automatically retry. It will only retry if the message is rejected with a TTL. This is useful if you want to retry the message manually with a custom TTL. To do this, you should use the RejectWithRetry function.

type Scope

// Scope is a string-typed label. It is the type of the first parameter of
// the logger callback (see DefaultLogger and SetLogger).
type Scope string
// The Scope values declared by this package.
const (
	ScopeConnection Scope = "connection"
	ScopeChannel    Scope = "channel"
	ScopeExchange   Scope = "exchange"
	ScopeQueue      Scope = "queue"
	ScopeConsumer   Scope = "consumer"
	ScopeProducer   Scope = "producer"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL