Documentation ¶
Index ¶
- Constants
- Variables
- func NewConsumerOptions(name string, offset stream.OffsetSpecification) *stream.ConsumerOptions
- func ParseStreamOptions(raw RawStreamOptions) (*stream.StreamOptions, error)
- func TimestampOffset(t time.Time) stream.OffsetSpecification
- type AMQPChanOps
- type AMQPChanSetup
- type AMQPClient
- type AMQPConnectFunc
- type AMQPConsumer
- type AMQPMessage
- type AMQPMessageHandler
- type AMQPProducer
- type BindingArgs
- type ConsumeOptions
- type Handler
- type PublishResult
- type RawStreamOptions
- type SimpleProducer
- type StreamConsumer
- type StreamMessage
- type StreamOptions
Constants ¶
View Source
const ( PublishChannelSize = 1024 RetryMinDelay = 5 * time.Second PublishLogSampleRate = 0.1 MaxRetries = 3 )
Variables ¶
View Source
var ( ErrProducerShuttingDown = errors.New("amqp: producer shutting down") ErrProducerClosed = errors.New("amqp: producer closed") ErrMaxRetriesReached = errors.New("amqp: publish max retries reached") ErrRetryQueueFull = errors.New("amqp: retry queue full") )
View Source
var ByteCapacity = stream.ByteCapacity{}
View Source
var ErrConsumerClosed = errors.New("amqp: consumer closed")
View Source
var OffsetSpec = stream.OffsetSpecification{}
Functions ¶
func NewConsumerOptions ¶
func NewConsumerOptions(name string, offset stream.OffsetSpecification) *stream.ConsumerOptions
func ParseStreamOptions ¶
func ParseStreamOptions(raw RawStreamOptions) (*stream.StreamOptions, error)
func TimestampOffset ¶
func TimestampOffset(t time.Time) stream.OffsetSpecification
Types ¶
type AMQPChanOps ¶ added in v0.4.12
type AMQPChanSetup ¶ added in v0.4.12
type AMQPChanSetup interface { ExchangeBind(destination, key, source string, noWait bool, args amqp.Table) error ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error ExchangeDeclarePassive(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error ExchangeDelete(name string, ifUnused, noWait bool) error ExchangeUnbind(destination, key, source string, noWait bool, args amqp.Table) error QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error) QueueDeclarePassive(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error) QueueDelete(name string, ifUnused, ifEmpty, noWait bool) (int, error) QueueInspect(name string) (amqp.Queue, error) QueueUnbind(name, key, exchange string, args amqp.Table) error }
type AMQPClient ¶ added in v0.4.12
type AMQPClient interface { AMQPProducer AMQPConsumer }
func NewAMQPClient ¶ added in v0.4.12
func NewAMQPClient(uri string, connectFn AMQPConnectFunc) (AMQPClient, error)
type AMQPConnectFunc ¶ added in v0.3.0
type AMQPConnectFunc func(ctx context.Context, uri string, confirms chan amqp.Confirmation, closed chan *amqp.Error) (AMQPChanOps, error)
func NewAMQPConnectFunc ¶ added in v0.3.0
func NewAMQPConnectFunc(setup func(c AMQPChanSetup) error) AMQPConnectFunc
type AMQPConsumer ¶ added in v0.4.12
type AMQPConsumer interface { Consume(queue string, concurrency int, handler AMQPMessageHandler) error Shutdown(context.Context) error }
func NewAMQPConsumer ¶ added in v0.4.12
func NewAMQPConsumer(uri string, connectFn AMQPConnectFunc) (AMQPConsumer, error)
type AMQPMessage ¶ added in v0.3.0
type AMQPMessage struct {
Exchange, Key string
Body interface{}
Persistent bool
ResultChan chan<- PublishResult
}
type AMQPMessageHandler ¶ added in v0.4.12
type AMQPProducer ¶ added in v0.3.0
type AMQPProducer interface { Publish(ctx context.Context, msg AMQPMessage) error Shutdown(context.Context) error }
func NewAMQPProducer ¶ added in v0.3.0
func NewAMQPProducer(uri string, connectFn AMQPConnectFunc) (AMQPProducer, error)
type ConsumeOptions ¶
type ConsumeOptions struct { Stream string *StreamOptions *stream.ConsumerOptions // Whether to memorize the message offset in the stream and use it on // re-connections to continue from the last read message. MemorizeOffset bool }
type Handler ¶
type Handler interface {
HandleMessage(msg StreamMessage)
}
type PublishResult ¶ added in v0.4.13
type PublishResult struct { Message AMQPMessage Error error }
type RawStreamOptions ¶
type SimpleProducer ¶ added in v0.4.12
type SimpleProducer interface {
Publish(ctx context.Context, key string, body interface{}, persistent bool) error
}
func NewAMQPExchangeProducer ¶ added in v0.0.3
func NewAMQPExchangeProducer(ctx context.Context, uri, exchange, keyNs string) (SimpleProducer, error)
func NewAMQPQueueProducer ¶ added in v0.0.3
func NewAMQPQueueProducer(ctx context.Context, uri, queue string) (SimpleProducer, error)
type StreamConsumer ¶
type StreamConsumer interface { ConsumeChan(ctx context.Context, opts ConsumeOptions) (<-chan StreamMessage, error) Consume(ctx context.Context, opts ConsumeOptions, handler Handler) error CheckConnection() error Close() error }
func NewStreamConsumer ¶
func NewStreamConsumer(streamUriStr, amqpUriStr string) (StreamConsumer, error)
type StreamMessage ¶
type StreamMessage struct { stream.ConsumerContext *streamAmqp.Message }
type StreamOptions ¶
type StreamOptions struct { stream.StreamOptions Bindings []BindingArgs }
Click to show internal directories.
Click to hide internal directories.