event

package
v0.4.20-beta Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 8, 2022 License: MIT Imports: 14 Imported by: 2

Documentation

Index

Constants

View Source
// Package-level tuning constants. Where they are consumed is not visible in
// this listing; notes marked NOTE(review) are inferred from the names only.
const (
	// PublishChannelSize is the untyped integer constant 1024.
	// NOTE(review): presumably a channel buffer size used by the producer — confirm.
	PublishChannelSize   = 1024
	// RetryMinDelay is a duration of five seconds (5 * time.Second).
	// NOTE(review): presumably the minimum wait before a retry — confirm.
	RetryMinDelay        = 5 * time.Second
	// PublishLogSampleRate is the untyped floating-point constant 0.1.
	// NOTE(review): presumably the fraction of publishes that get logged — confirm.
	PublishLogSampleRate = 0.1
	// MaxRetries is the untyped integer constant 3.
	// NOTE(review): likely the retry limit behind ErrMaxRetriesReached — confirm.
	MaxRetries           = 3
)

Variables

View Source
// Sentinel errors created with errors.New; every message carries the "amqp: "
// prefix. Which operations return them is not shown in this listing; callers
// would normally match them with errors.Is.
var (
	// ErrProducerShuttingDown has the message "amqp: producer shutting down".
	ErrProducerShuttingDown = errors.New("amqp: producer shutting down")
	// ErrProducerClosed has the message "amqp: producer closed".
	ErrProducerClosed       = errors.New("amqp: producer closed")
	// ErrMaxRetriesReached has the message "amqp: publish max retries reached".
	ErrMaxRetriesReached    = errors.New("amqp: publish max retries reached")
	// ErrRetryQueueFull has the message "amqp: retry queue full".
	ErrRetryQueueFull       = errors.New("amqp: retry queue full")
)
View Source
// ByteCapacity is a zero-value stream.ByteCapacity exposed at package level.
// NOTE(review): presumably re-exported so callers can reach the stream
// client's byte-size helpers without importing it directly — confirm.
var ByteCapacity = stream.ByteCapacity{}
View Source
// ErrConsumerClosed is a sentinel error with the message
// "amqp: consumer closed". Which calls return it is not shown in this listing.
var ErrConsumerClosed = errors.New("amqp: consumer closed")

Functions

func NewConsumerOptions

func NewConsumerOptions(name string, offset stream.OffsetSpecification) *stream.ConsumerOptions

func ParseStreamOptions

func ParseStreamOptions(raw RawStreamOptions) (*stream.StreamOptions, error)

func TimestampOffset

func TimestampOffset(t time.Time) stream.OffsetSpecification

Types

type AMQPChanOps added in v0.4.12

// AMQPChanOps is the pair of channel operations (publish and consume) that an
// AMQPConnectFunc returns to its caller.
// NOTE(review): the signatures look like a subset of the AMQP client's channel
// methods — confirm against the imported amqp package.
type AMQPChanOps interface {
	// Publish sends msg to exchange with routing key key, passing through the
	// mandatory and immediate flags.
	Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error
	// Consume starts consuming from queue and returns a receive-only channel
	// of deliveries.
	Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error)
}

type AMQPChanSetup added in v0.4.12

// AMQPChanSetup groups the exchange and queue topology operations (declare,
// bind, unbind, delete, inspect). It is the argument type of the setup
// callback accepted by NewAMQPConnectFunc.
// NOTE(review): the signatures look like they mirror the AMQP client's channel
// methods — confirm against the imported amqp package.
type AMQPChanSetup interface {
	// Exchange operations.
	ExchangeBind(destination, key, source string, noWait bool, args amqp.Table) error
	ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error
	ExchangeDeclarePassive(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error
	ExchangeDelete(name string, ifUnused, noWait bool) error
	ExchangeUnbind(destination, key, source string, noWait bool, args amqp.Table) error
	// Queue operations.
	QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error
	QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)
	QueueDeclarePassive(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)
	// QueueDelete returns an int alongside the error.
	// NOTE(review): presumably the number of purged messages — confirm.
	QueueDelete(name string, ifUnused, ifEmpty, noWait bool) (int, error)
	QueueInspect(name string) (amqp.Queue, error)
	// QueueUnbind is the only bind/unbind method here without a noWait flag.
	QueueUnbind(name, key, exchange string, args amqp.Table) error
}

type AMQPClient added in v0.4.12

// AMQPClient combines AMQPProducer and AMQPConsumer in one interface.
// Both embedded interfaces declare Shutdown(context.Context) error with the
// same signature; embedding interfaces with overlapping methods requires
// Go 1.14 or later.
type AMQPClient interface {
	AMQPProducer
	AMQPConsumer
}

func NewAMQPClient added in v0.4.12

func NewAMQPClient(uri string, connectFn AMQPConnectFunc) (AMQPClient, error)

type AMQPConnectFunc added in v0.3.0

// AMQPConnectFunc is the connection callback accepted by NewAMQPClient,
// NewAMQPConsumer and NewAMQPProducer. Given a context and a broker URI it
// returns the channel operations to use, or an error.
// NOTE(review): confirms and closed are presumably registered for publisher
// confirmations and connection/channel close notifications — confirm.
type AMQPConnectFunc func(ctx context.Context, uri string, confirms chan amqp.Confirmation, closed chan *amqp.Error) (AMQPChanOps, error)

func NewAMQPConnectFunc added in v0.3.0

func NewAMQPConnectFunc(setup func(c AMQPChanSetup) error) AMQPConnectFunc

type AMQPConsumer added in v0.4.12

// AMQPConsumer consumes deliveries from a queue and can be shut down.
type AMQPConsumer interface {
	// Consume registers handler for deliveries from queue.
	// NOTE(review): concurrency is presumably the number of parallel handler
	// invocations, and whether the call blocks is not shown here — confirm.
	Consume(queue string, concurrency int, handler AMQPMessageHandler) error
	// Shutdown stops the consumer.
	// NOTE(review): presumably ctx bounds how long shutdown may take — confirm.
	Shutdown(context.Context) error
}

func NewAMQPConsumer added in v0.4.12

func NewAMQPConsumer(uri string, connectFn AMQPConnectFunc) (AMQPConsumer, error)

type AMQPMessage added in v0.3.0

// AMQPMessage is the unit passed to AMQPProducer.Publish.
type AMQPMessage struct {
	// Exchange and Key are the target exchange name and routing key.
	Exchange, Key string
	// Body is the message payload as an arbitrary value.
	// NOTE(review): how it is serialized is not shown here — confirm.
	Body          interface{}
	// Persistent is a boolean flag.
	// NOTE(review): presumably selects persistent delivery mode — confirm.
	Persistent    bool
	// ResultChan is a send-only channel of PublishResult.
	// NOTE(review): presumably optional and used to report the publish
	// outcome asynchronously — confirm.
	ResultChan    chan<- PublishResult
}

type AMQPMessageHandler added in v0.4.12

// AMQPMessageHandler is the callback type passed to AMQPConsumer.Consume. It
// receives one amqp.Delivery and returns an error.
// NOTE(review): how a non-nil error affects ack/nack is not shown here — confirm.
type AMQPMessageHandler func(amqp.Delivery) error

type AMQPProducer added in v0.3.0

// AMQPProducer publishes AMQPMessage values and can be shut down.
type AMQPProducer interface {
	// Publish submits msg for publishing.
	// NOTE(review): whether a nil error means "queued" or "confirmed by the
	// broker" is not shown here — confirm.
	Publish(ctx context.Context, msg AMQPMessage) error
	// Shutdown stops the producer.
	// NOTE(review): presumably ctx bounds how long shutdown may take — confirm.
	Shutdown(context.Context) error
}

func NewAMQPProducer added in v0.3.0

func NewAMQPProducer(uri string, connectFn AMQPConnectFunc) (AMQPProducer, error)

type BindingArgs

// BindingArgs describes one binding: a routing key, an exchange name and an
// optional argument table. StreamOptions.Bindings holds a slice of these.
type BindingArgs struct {
	Key      string
	Exchange string
	Args     amqp.Table
}

type ConsumeOptions

// ConsumeOptions is the options value taken by StreamConsumer.Consume and
// StreamConsumer.ConsumeChan.
//
// Both embedded option sets are pointers, so accessing a promoted field
// through a nil embedded pointer panics; populate them before use.
type ConsumeOptions struct {
	// Stream is the stream name.
	Stream string
	// StreamOptions is this package's StreamOptions type (embedded by pointer).
	*StreamOptions
	// ConsumerOptions is the stream client's consumer options (embedded by
	// pointer); NewConsumerOptions returns a value of this type.
	*stream.ConsumerOptions
	// Whether to memorize the message offset in the stream and use it on
	// re-connections to continue from the last read message.
	MemorizeOffset bool
}

type Handler

// Handler is the callback interface accepted by StreamConsumer.Consume.
type Handler interface {
	// HandleMessage receives one StreamMessage. It returns nothing, so it
	// cannot report an error to the caller.
	HandleMessage(msg StreamMessage)
}

type PublishResult added in v0.4.13

// PublishResult pairs a message with an error. It is the element type of
// AMQPMessage.ResultChan.
// NOTE(review): presumably a nil Error means the publish succeeded — confirm.
type PublishResult struct {
	// Message holds an AMQPMessage by value.
	Message AMQPMessage
	// Error is the error associated with Message.
	Error   error
}

type RawStreamOptions

// RawStreamOptions is the input to ParseStreamOptions, with the two size
// limits given as strings.
// NOTE(review): the accepted string format (e.g. unit suffixes) is not shown
// here — confirm against ParseStreamOptions.
type RawStreamOptions struct {
	// MaxLengthBytes is a size limit expressed as a string.
	MaxLengthBytes      string
	// MaxSegmentSizeBytes is a size limit expressed as a string.
	MaxSegmentSizeBytes string
	// MaxAge is a duration.
	MaxAge              time.Duration
}

type SimpleProducer added in v0.4.12

// SimpleProducer is the single-method producer returned by
// NewAMQPExchangeProducer and NewAMQPQueueProducer. Unlike
// AMQPProducer.Publish, its Publish takes no exchange argument.
type SimpleProducer interface {
	// Publish sends body under key.
	// NOTE(review): persistent presumably selects persistent delivery — confirm.
	Publish(ctx context.Context, key string, body interface{}, persistent bool) error
}

func NewAMQPExchangeProducer added in v0.0.3

func NewAMQPExchangeProducer(ctx context.Context, uri, exchange, keyNs string) (SimpleProducer, error)

func NewAMQPQueueProducer added in v0.0.3

func NewAMQPQueueProducer(ctx context.Context, uri, queue string) (SimpleProducer, error)

type StreamConsumer

// StreamConsumer is the stream-reading interface returned by NewStreamConsumer.
type StreamConsumer interface {
	// ConsumeChan returns a receive-only channel of StreamMessage for the
	// stream described by opts.
	// NOTE(review): whether the channel is closed when ctx ends is not shown
	// here — confirm.
	ConsumeChan(ctx context.Context, opts ConsumeOptions) (<-chan StreamMessage, error)
	// Consume takes the same options as ConsumeChan but passes messages to
	// handler instead of returning a channel.
	Consume(ctx context.Context, opts ConsumeOptions, handler Handler) error
	// CheckConnection returns an error.
	// NOTE(review): presumably non-nil when the connection is unhealthy — confirm.
	CheckConnection() error
	// Close returns an error; what it releases is not shown in this listing.
	Close() error
}

func NewStreamConsumer

func NewStreamConsumer(streamUriStr, amqpUriStr string) (StreamConsumer, error)

type StreamMessage

// StreamMessage is what stream consumers receive. It embeds the stream
// client's ConsumerContext (by value) and a pointer to the AMQP message, so
// the fields and methods of both are promoted onto StreamMessage.
type StreamMessage struct {
	stream.ConsumerContext
	*streamAmqp.Message
}

type StreamOptions

// StreamOptions extends the stream client's StreamOptions (embedded by value)
// with a list of bindings.
// Note that ParseStreamOptions returns *stream.StreamOptions, the embedded
// type, not this wrapper.
type StreamOptions struct {
	stream.StreamOptions
	// Bindings lists exchange bindings.
	// NOTE(review): presumably applied to the stream when it is declared — confirm.
	Bindings []BindingArgs
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL