Documentation ¶
Index ¶
- Constants
- Variables
- func IsUnprocessableMessageErr(err error) bool
- func NewConsumerOptions(name string, offset stream.OffsetSpecification) *stream.ConsumerOptions
- func ParseStreamOptions(raw RawStreamOptions) (*stream.StreamOptions, error)
- func TimestampOffset(t time.Time) stream.OffsetSpecification
- func UnprocessableMsgErr(reason error) error
- type AMQPChanOps
- type AMQPChanSetup
- type AMQPClient
- type AMQPConnectFunc
- type AMQPConsumer
- type AMQPMessage
- type AMQPMessageHandler
- type AMQPProducer
- type BindingArgs
- type ConsumeOptions
- type Handler
- type PublishResult
- type RawStreamOptions
- type SimpleProducer
- type StreamConsumer
- type StreamMessage
- type StreamOptions
Constants ¶
View Source
const ( PublishChannelSize = 1024 RetryMinDelay = 5 * time.Second PublishLogSampleRate = 0.1 MaxRetries = 3 )
Variables ¶
View Source
var ( ErrProducerShuttingDown = errors.New("amqp: producer shutting down") ErrProducerClosed = errors.New("amqp: producer closed") ErrMaxRetriesReached = errors.New("amqp: publish max retries reached") ErrRetryQueueFull = errors.New("amqp: retry queue full") )
View Source
var ByteCapacity = stream.ByteCapacity{}
View Source
var ErrConsumerClosed = errors.New("amqp: consumer closed")
View Source
var OffsetSpec = stream.OffsetSpecification{}
Functions ¶
func IsUnprocessableMessageErr ¶ added in v0.5.3
func NewConsumerOptions ¶
func NewConsumerOptions(name string, offset stream.OffsetSpecification) *stream.ConsumerOptions
func ParseStreamOptions ¶
func ParseStreamOptions(raw RawStreamOptions) (*stream.StreamOptions, error)
func TimestampOffset ¶
func TimestampOffset(t time.Time) stream.OffsetSpecification
func UnprocessableMsgErr ¶ added in v0.6.0
If reason is not nil, UnprocessableMsgErr wraps it in an unprocessable message error so the consumer does not requeue the message.
Types ¶
type AMQPChanOps ¶ added in v0.4.12
type AMQPChanSetup ¶ added in v0.4.12
type AMQPChanSetup interface { ExchangeBind(destination, key, source string, noWait bool, args amqp.Table) error ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error ExchangeDeclarePassive(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error ExchangeDelete(name string, ifUnused, noWait bool) error ExchangeUnbind(destination, key, source string, noWait bool, args amqp.Table) error QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error) QueueDeclarePassive(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error) QueueDelete(name string, ifUnused, ifEmpty, noWait bool) (int, error) QueueInspect(name string) (amqp.Queue, error) QueueUnbind(name, key, exchange string, args amqp.Table) error Qos(prefetchCount, prefetchSize int, global bool) error }
type AMQPClient ¶ added in v0.4.12
type AMQPClient interface { AMQPProducer AMQPConsumer }
func NewAMQPClient ¶ added in v0.4.12
func NewAMQPClient(uri string, connectFn AMQPConnectFunc) (AMQPClient, error)
type AMQPConnectFunc ¶ added in v0.3.0
type AMQPConnectFunc func(ctx context.Context, uri string, confirms chan amqp.Confirmation, closed chan *amqp.Error) (AMQPChanOps, error)
func NewAMQPConnectFunc ¶ added in v0.3.0
func NewAMQPConnectFunc(setup func(c AMQPChanSetup) error) AMQPConnectFunc
type AMQPConsumer ¶ added in v0.4.12
type AMQPConsumer interface { Consume(queue string, concurrency int, handler AMQPMessageHandler) error Shutdown(context.Context) error }
func NewAMQPConsumer ¶ added in v0.4.12
func NewAMQPConsumer(uri string, connectFn AMQPConnectFunc) (AMQPConsumer, error)
type AMQPMessage ¶ added in v0.3.0
type AMQPMessage struct {
// Exchange and Key of message in the AMQP protocol.
Exchange, Key string
// Body is the payload of the message.
Body interface{}
// Persistent indicates whether this message should be persisted in durable
// storage so that it is not lost on broker restarts.
Persistent bool
// Mandatory means that if the message cannot be routed to a queue, the
// broker should return it to the sender. In other words, the broker will
// try to put the message in at least one queue, and if there's no queue
// bound to receive the message it will fail the publishing.
Mandatory bool
// ResultChan receives the result of the publish operation. It is used to
// guarantee delivery of messages to the broker through confirmation.
ResultChan chan<- PublishResult
// WaitResult simplifies waiting for the result of a publish operation. If
// true, `Publish` will only return after confirmation has been received for
// the specific message. Cannot be specified together with a `ResultChan`.
WaitResult bool
}
type AMQPMessageHandler ¶ added in v0.4.12
AMQPMessageHandler is a function that will be called for each message received.
type AMQPProducer ¶ added in v0.3.0
type AMQPProducer interface { Publish(ctx context.Context, msg AMQPMessage) error Shutdown(context.Context) error }
func NewAMQPProducer ¶ added in v0.3.0
func NewAMQPProducer(uri string, connectFn AMQPConnectFunc) (AMQPProducer, error)
type ConsumeOptions ¶
type ConsumeOptions struct { Stream string *StreamOptions *stream.ConsumerOptions // Whether to memorize the message offset in the stream and use it on // re-connections to continue from the last read message. MemorizeOffset bool }
type Handler ¶
type Handler interface {
HandleMessage(msg StreamMessage)
}
type PublishResult ¶ added in v0.4.13
type PublishResult struct { Message AMQPMessage Error error }
type RawStreamOptions ¶
type SimpleProducer ¶ added in v0.4.12
type SimpleProducer interface {
Publish(ctx context.Context, key string, body interface{}, persistent bool) error
}
func NewAMQPExchangeProducer ¶ added in v0.0.3
func NewAMQPExchangeProducer(ctx context.Context, uri, exchange, keyNs string) (SimpleProducer, error)
func NewAMQPQueueProducer ¶ added in v0.0.3
func NewAMQPQueueProducer(ctx context.Context, uri, queue string) (SimpleProducer, error)
type StreamConsumer ¶
type StreamConsumer interface { ConsumeChan(ctx context.Context, opts ConsumeOptions) (<-chan StreamMessage, error) Consume(ctx context.Context, opts ConsumeOptions, handler Handler) error CheckConnection() error Close() error }
func NewStreamConsumer ¶
func NewStreamConsumer(streamUriStr, amqpUriStr string) (StreamConsumer, error)
type StreamMessage ¶
type StreamMessage struct { stream.ConsumerContext *streamAmqp.Message }
type StreamOptions ¶
type StreamOptions struct { stream.StreamOptions Bindings []BindingArgs }
Click to show internal directories.
Click to hide internal directories.