Documentation ¶
Index ¶
- func DefaultLogger(scope Scope, name string, msg string, attributes map[string]any)
- func GetAttempts(msg *amqp.Delivery) int
- func GetMessageHeader[T any](msg *amqp.Delivery, key string) (value T, ok bool)
- func NewConsumerOptionsMetricsThresholdError(v float64) consumerOptionsMetricsThreshold
- func NewConsumerOptionsMetricsThresholdErrorFunc(f func() float64) consumerOptionsMetricsThreshold
- func NewConsumerOptionsMetricsThresholdWarning(v float64) consumerOptionsMetricsThreshold
- func NewConsumerOptionsMetricsThresholdWarningAndError(v1 float64, v2 float64) consumerOptionsMetricsThreshold
- func NewConsumerOptionsMetricsThresholdWarningFunc(f func() float64) consumerOptionsMetricsThreshold
- func NewConsumerOptionsMetricsThresholdWarningFuncAndErrorFunc(f1 func() float64, f2 func() float64) consumerOptionsMetricsThreshold
- func RejectWithRetry(msg *amqp.Delivery, ttl time.Duration) error
- func SetLogger(cb func(scope Scope, name string, msg string, attributes map[string]any))
- type Connection
- type ConnectionOptions
- type ConstantRetryStrategy
- type Consumer
- func (c *Consumer) AddBinding(exchangeName string, routingKey string, args mo.Option[amqp.Table]) error
- func (c *Consumer) Close() error
- func (svc *Consumer) Collect(ch chan<- prometheus.Metric)
- func (c *Consumer) Consume() <-chan *amqp.Delivery
- func (svc *Consumer) Describe(ch chan<- *prometheus.Desc)
- func (c *Consumer) RemoveBinding(exchangeName string, routingKey string, args mo.Option[amqp.Table]) error
- type ConsumerOptions
- type ConsumerOptionsBinding
- type ConsumerOptionsMessage
- type ConsumerOptionsMetrics
- type ConsumerOptionsQueue
- type Event
- type EventData
- type ExchangeKind
- type ExponentialRetryStrategy
- type LazyRetryStrategy
- type Payload
- type Producer
- func (p *Producer) Close() error
- func (p *Producer) Publish(routingKey string, mandatory bool, immediate bool, msg amqp.Publishing) error
- func (p *Producer) PublishWithContext(ctx context.Context, routingKey string, mandatory bool, immediate bool, ...) error
- func (p *Producer) PublishWithDeferredConfirm(routingKey string, mandatory bool, immediate bool, msg amqp.Publishing) (*amqp.DeferredConfirmation, error)
- func (p *Producer) PublishWithDeferredConfirmWithContext(ctx context.Context, routingKey string, mandatory bool, immediate bool, ...) (*amqp.DeferredConfirmation, error)
- type ProducerOptions
- type ProducerOptionsExchange
- type QueueSetupExchangeOptions
- type QueueSetupOptions
- type QueueSetupQueueOptions
- type RetryConsistency
- type RetryStrategy
- type Scope
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DefaultLogger ¶
func GetAttempts ¶
func GetMessageHeader ¶
func NewConsumerOptionsMetricsThresholdError ¶
func NewConsumerOptionsMetricsThresholdError(v float64) consumerOptionsMetricsThreshold
func NewConsumerOptionsMetricsThresholdErrorFunc ¶
func NewConsumerOptionsMetricsThresholdErrorFunc(f func() float64) consumerOptionsMetricsThreshold
func NewConsumerOptionsMetricsThresholdWarning ¶
func NewConsumerOptionsMetricsThresholdWarning(v float64) consumerOptionsMetricsThreshold
func NewConsumerOptionsMetricsThresholdWarningFunc ¶
func NewConsumerOptionsMetricsThresholdWarningFunc(f func() float64) consumerOptionsMetricsThreshold
Types ¶
type Connection ¶
type Connection struct {
// contains filtered or unexported fields
}
func NewConnection ¶
func NewConnection(name string, opt ConnectionOptions) (*Connection, error)
func (*Connection) Close ¶
func (c *Connection) Close() error
func (*Connection) IsClosed ¶
func (c *Connection) IsClosed() bool
func (*Connection) ListenConnection ¶
func (c *Connection) ListenConnection() (func(), <-chan *amqp.Connection)
ListenConnection implements the Observable pattern.
type ConnectionOptions ¶
type ConstantRetryStrategy ¶
type ConstantRetryStrategy struct {
// contains filtered or unexported fields
}
func (*ConstantRetryStrategy) NextBackOff ¶
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
func NewConsumer ¶
func NewConsumer(conn *Connection, name string, opt ConsumerOptions) *Consumer
func (*Consumer) AddBinding ¶
func (*Consumer) Collect ¶
func (svc *Consumer) Collect(ch chan<- prometheus.Metric)
func (*Consumer) Describe ¶
func (svc *Consumer) Describe(ch chan<- *prometheus.Desc)
type ConsumerOptions ¶
type ConsumerOptions struct {
	Queue    ConsumerOptionsQueue
	Bindings []ConsumerOptionsBinding
	Message  ConsumerOptionsMessage

	// optional arguments
	Metrics          ConsumerOptionsMetrics
	EnableDeadLetter mo.Option[bool]             // default false
	Defer            mo.Option[time.Duration]    // default no Defer
	ConsumeArgs      mo.Option[amqp.Table]       // default nil
	RetryStrategy    mo.Option[RetryStrategy]    // default no retry
	RetryConsistency mo.Option[RetryConsistency] // default eventually consistent
}
type ConsumerOptionsBinding ¶
type ConsumerOptionsMessage ¶
type ConsumerOptionsMetrics ¶
type ConsumerOptionsMetrics struct {
	QueueMessageBytesThreshold consumerOptionsMetricsThreshold
	QueueMessagesThreshold     consumerOptionsMetricsThreshold

	DeadLetterQueueMessageBytesThreshold consumerOptionsMetricsThreshold
	DeadLetterQueueMessagesThreshold     consumerOptionsMetricsThreshold
	DeadLetterQueueMessageRateThreshold  consumerOptionsMetricsThreshold

	RetryQueueMessageBytesThreshold consumerOptionsMetricsThreshold
	RetryQueueMessagesThreshold     consumerOptionsMetricsThreshold
	RetryQueueMessageRateThreshold  consumerOptionsMetricsThreshold
}
type ConsumerOptionsQueue ¶
type Event ¶
type Event struct {
// contains filtered or unexported fields
}
func (*Event) SetConsumer ¶
func (e *Event) SetConsumer(queueName string, bindings []ConsumerOptionsBinding)
type ExchangeKind ¶
type ExchangeKind string
const (
	ExchangeKindDirect  ExchangeKind = "direct"
	ExchangeKindFanout  ExchangeKind = "fanout"
	ExchangeKindTopic   ExchangeKind = "topic"
	ExchangeKindHeaders ExchangeKind = "headers"
)
type ExponentialRetryStrategy ¶
type ExponentialRetryStrategy struct {
// contains filtered or unexported fields
}
func (*ExponentialRetryStrategy) NextBackOff ¶
type LazyRetryStrategy ¶
type LazyRetryStrategy struct {
// contains filtered or unexported fields
}
func (*LazyRetryStrategy) NextBackOff ¶
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
func NewProducer ¶
func NewProducer(conn *Connection, name string, opt ProducerOptions) *Producer
func (*Producer) PublishWithContext ¶
func (*Producer) PublishWithDeferredConfirm ¶
func (p *Producer) PublishWithDeferredConfirm(routingKey string, mandatory bool, immediate bool, msg amqp.Publishing) (*amqp.DeferredConfirmation, error)
func (*Producer) PublishWithDeferredConfirmWithContext ¶
type ProducerOptions ¶
type ProducerOptions struct {
Exchange ProducerOptionsExchange
}
type ProducerOptionsExchange ¶
type ProducerOptionsExchange struct {
	Name mo.Option[string]       // default "amq.direct"
	Kind mo.Option[ExchangeKind] // default "direct"

	// optional arguments
	Durable    mo.Option[bool]       // default true
	AutoDelete mo.Option[bool]       // default false
	Internal   mo.Option[bool]       // default false
	NoWait     mo.Option[bool]       // default false
	Args       mo.Option[amqp.Table] // default nil
}
type QueueSetupExchangeOptions ¶
type QueueSetupExchangeOptions struct {
// contains filtered or unexported fields
}
type QueueSetupOptions ¶
type QueueSetupOptions struct {
	Exchange QueueSetupExchangeOptions
	Queue    QueueSetupQueueOptions
}
type QueueSetupQueueOptions ¶
type QueueSetupQueueOptions struct {
// contains filtered or unexported fields
}
type RetryConsistency ¶
type RetryConsistency int
const (
	ConsistentRetry           RetryConsistency = 0 // slow
	EventuallyConsistentRetry RetryConsistency = 1 // fast, at *least* once
)
type RetryStrategy ¶
func NewConstantRetryStrategy ¶
func NewConstantRetryStrategy(maxRetry int, interval time.Duration) RetryStrategy
func NewExponentialRetryStrategy ¶
func NewExponentialRetryStrategy(maxRetry int, initialInterval time.Duration, intervalMultiplier float64) RetryStrategy
func NewLazyRetryStrategy ¶
func NewLazyRetryStrategy(maxRetry int) RetryStrategy
LazyRetryStrategy is a retry strategy that never retries automatically. A message is retried only when it is rejected with a TTL, which is useful when you want to retry manually with a custom TTL. To do this, use the RejectWithRetry function.
Click to show internal directories.
Click to hide internal directories.