event

package
v0.3.1-beta.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 21, 2021 License: MIT Imports: 13 Imported by: 2

Documentation

Index

Constants

View Source
// Tunables exported by the package. The notes on intended use are inferred
// from the names only; verify them against the producer implementation.
const (
	// MaxRetries is 3.
	// NOTE(review): presumably the retry limit for a failed publish — confirm.
	MaxRetries = 3

	// RetryMinDelay is 5 seconds.
	// NOTE(review): presumably the minimum wait between retries — confirm.
	RetryMinDelay = 5 * time.Second

	// PublishChannelSize is 1024.
	// NOTE(review): presumably the capacity of the publish buffer — confirm.
	PublishChannelSize = 1024

	// PublishLogSampleRate is 0.1.
	// NOTE(review): presumably the fraction of publishes that get logged — confirm.
	PublishLogSampleRate = 0.1
)

Variables

View Source
// ErrProducerShuttingDown is a sentinel error carrying the message
// "amqp: producer shutting down". Compare against it with errors.Is.
// NOTE(review): presumably reported once Shutdown has started — confirm.
var ErrProducerShuttingDown = errors.New("amqp: producer shutting down")

// ErrProducerClosed is a sentinel error carrying the message
// "amqp: producer closed". Compare against it with errors.Is.
// NOTE(review): presumably reported after the producer has stopped — confirm.
var ErrProducerClosed = errors.New("amqp: producer closed")
View Source
// ByteCapacity is a package-level stream.ByteCapacity built from an empty
// composite literal.
// NOTE(review): presumably re-exported so callers can build byte-size values
// without importing the stream package themselves — confirm.
var ByteCapacity = stream.ByteCapacity{}

Functions

func NewConsumerOptions

func NewConsumerOptions(name string, offset stream.OffsetSpecification) *stream.ConsumerOptions

func ParseStreamOptions

func ParseStreamOptions(raw RawStreamOptions) (*stream.StreamOptions, error)

func TimestampOffset

func TimestampOffset(t time.Time) stream.OffsetSpecification

Types

type AMQPChanPublisher added in v0.3.0

// AMQPChanPublisher is the single-method publishing interface returned by an
// AMQPConnectFunc.
// NOTE(review): the parameter list looks like it mirrors the Publish method
// of *amqp.Channel, so a real channel would satisfy it — confirm.
type AMQPChanPublisher interface {
	// Publish sends msg to the given exchange with routing key key, passing
	// the mandatory and immediate flags through, and returns any error.
	Publish(
		exchange string,
		key string,
		mandatory bool,
		immediate bool,
		msg amqp.Publishing,
	) error
}

type AMQPConnectFunc added in v0.3.0

// AMQPConnectFunc is a connection function: it takes a context, a broker URI
// and two channels (publish confirmations and close errors), and returns an
// AMQPChanPublisher or an error.
// NOTE(review): presumably the implementation registers confirms and closed
// as notification channels on the connection it opens — confirm.
type AMQPConnectFunc func(
	ctx context.Context,
	uri string,
	confirms chan amqp.Confirmation,
	closed chan *amqp.Error,
) (AMQPChanPublisher, error)

func NewAMQPConnectFunc added in v0.3.0

func NewAMQPConnectFunc(setup func(c *amqp.Channel) error) AMQPConnectFunc

type AMQPMessage added in v0.3.0

// AMQPMessage is the value accepted by AMQPProducer.Publish.
type AMQPMessage struct {
	// Exchange names the destination exchange.
	Exchange string

	// Key is the routing key.
	Key string

	// Body is the message payload.
	// NOTE(review): how it is encoded is not visible here — confirm.
	Body interface{}

	// Persistent flags the message as persistent.
	// NOTE(review): presumably selects the AMQP delivery mode — confirm.
	Persistent bool
}

type AMQPProducer added in v0.3.0

// AMQPProducer is the producer type created by NewAMQPProducer; it exposes
// Publish and Shutdown. Its fields are unexported, so it must be obtained
// through the constructor.
type AMQPProducer struct {
	// contains filtered or unexported fields
}

func NewAMQPProducer added in v0.3.0

func NewAMQPProducer(uri string, connectFn AMQPConnectFunc) (*AMQPProducer, error)

func (*AMQPProducer) Publish added in v0.3.0

func (p *AMQPProducer) Publish(ctx context.Context, msg AMQPMessage) error

func (*AMQPProducer) Shutdown added in v0.4.0

func (p *AMQPProducer) Shutdown(ctx context.Context) error

Shutdown tries to gracefully stop the background event publishing process by waiting until the entire publish buffer has been flushed to the remote broker and all message confirmations have been received. It must be called only once per producer; calling it a second time will panic.

Publish must not be called concurrently with Shutdown; otherwise the events published concurrently may be lost (both the Publish call and the Shutdown call may succeed even though the event is never actually sent).

type BindingArgs

// BindingArgs describes one binding: a routing key, an exchange name and an
// optional table of arguments. StreamOptions carries a slice of these.
type BindingArgs struct {
	// Key is the binding (routing) key and Exchange the exchange name.
	Key, Exchange string

	// Args holds additional binding arguments.
	Args amqp.Table
}

type ConsumeOptions

// ConsumeOptions is the options value passed to StreamConsumer.Consume and
// StreamConsumer.ConsumeChan. It names the stream and embeds pointers to the
// stream-level and consumer-level option sets.
type ConsumeOptions struct {
	// Stream is the name of the stream.
	Stream string
	// Embedded stream options (this package's StreamOptions).
	// NOTE(review): nil handling is not visible here — confirm before relying on it.
	*StreamOptions
	// Embedded consumer options from the stream package.
	*stream.ConsumerOptions
	// NOTE(review): presumably enables storing the consumer offset so a later
	// run can resume from it — confirm.
	MemorizeOffset bool
}

type Handler

// Handler is the callback interface accepted by StreamConsumer.Consume.
type Handler interface {
	// HandleMessage receives a StreamMessage by value and returns nothing.
	HandleMessage(msg StreamMessage)
}

type Producer

// Producer is the publishing interface returned by NewAMQPExchangeProducer
// and NewAMQPQueueProducer.
type Producer interface {
	// Publish sends body under the given key and reports any error.
	// The persistent flag marks the message as persistent.
	// NOTE(review): how body is serialised is not visible here — confirm.
	Publish(
		ctx context.Context,
		key string,
		body interface{},
		persistent bool,
	) error
}

func NewAMQPExchangeProducer added in v0.0.3

func NewAMQPExchangeProducer(ctx context.Context, uri, exchange, keyNs string) (Producer, error)

func NewAMQPQueueProducer added in v0.0.3

func NewAMQPQueueProducer(ctx context.Context, uri, queue string) (Producer, error)

type RawStreamOptions

// RawStreamOptions is the unparsed form of the stream options; it is the
// input of ParseStreamOptions.
type RawStreamOptions struct {
	// Size limits expressed as strings.
	// NOTE(review): the accepted syntax (e.g. unit suffixes) is defined by
	// ParseStreamOptions and not visible here — confirm.
	MaxLengthBytes, MaxSegmentSizeBytes string

	// MaxAge is a duration limit.
	// NOTE(review): presumably the stream's retention age — confirm.
	MaxAge time.Duration
}

type StreamConsumer

// StreamConsumer is the consumer interface returned by NewStreamConsumer.
// It offers a callback-based and a channel-based way to consume, plus
// connection health and lifecycle methods.
type StreamConsumer interface {
	// Consume consumes according to opts, handing messages to handler.
	// NOTE(review): whether it blocks until ctx is done is not visible here — confirm.
	Consume(ctx context.Context, opts ConsumeOptions, handler Handler) error

	// ConsumeChan consumes according to opts and returns a receive-only
	// channel of messages, or an error.
	ConsumeChan(ctx context.Context, opts ConsumeOptions) (<-chan StreamMessage, error)

	// CheckConnection reports a connection problem as an error.
	CheckConnection() error

	// Close releases the consumer and reports any error.
	Close() error
}

func NewStreamConsumer

func NewStreamConsumer(streamUriStr, amqpUriStr string) (StreamConsumer, error)

type StreamMessage

// StreamMessage is the message value delivered to Handler.HandleMessage and
// over the channel returned by StreamConsumer.ConsumeChan. It has no fields
// of its own: it embeds the consumer context and a pointer to the message.
type StreamMessage struct {
	// Embedded consumer context from the stream package.
	stream.ConsumerContext
	// Embedded pointer to the message.
	// NOTE(review): whether it can be nil is not visible here — confirm.
	*streamAmqp.Message
}

type StreamOptions

// StreamOptions extends the stream package's StreamOptions (embedded by
// value) with a list of bindings. ConsumeOptions embeds a pointer to it.
type StreamOptions struct {
	// Embedded options from the stream package.
	stream.StreamOptions
	// Bindings lists the bindings associated with the stream.
	// NOTE(review): presumably declared when the stream is set up — confirm.
	Bindings []BindingArgs
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL