Documentation ¶
Index ¶
- Constants
- Variables
- func NewConsumerOptions(name string, offset stream.OffsetSpecification) *stream.ConsumerOptions
- func ParseStreamOptions(raw RawStreamOptions) (*stream.StreamOptions, error)
- func TimestampOffset(t time.Time) stream.OffsetSpecification
- type AMQPChanPublisher
- type AMQPConnectFunc
- type AMQPMessage
- type AMQPProducer
- type BindingArgs
- type ConsumeOptions
- type Handler
- type Producer
- type RawStreamOptions
- type StreamConsumer
- type StreamMessage
- type StreamOptions
Constants ¶
View Source
const ( PublishChannelSize = 1024 RetryMinDelay = 5 * time.Second PublishLogSampleRate = 0.1 MaxRetries = 3 )
Variables ¶
View Source
var ( ErrProducerShuttingDown = errors.New("amqp: producer shutting down") ErrProducerClosed = errors.New("amqp: producer closed") )
View Source
var ByteCapacity = stream.ByteCapacity{}
View Source
var OffsetSpec = stream.OffsetSpecification{}
Functions ¶
func NewConsumerOptions ¶
func NewConsumerOptions(name string, offset stream.OffsetSpecification) *stream.ConsumerOptions
func ParseStreamOptions ¶
func ParseStreamOptions(raw RawStreamOptions) (*stream.StreamOptions, error)
func TimestampOffset ¶
func TimestampOffset(t time.Time) stream.OffsetSpecification
Types ¶
type AMQPChanPublisher ¶ added in v0.3.0
type AMQPChanPublisher interface {
Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error
}
type AMQPConnectFunc ¶ added in v0.3.0
type AMQPConnectFunc func(ctx context.Context, uri string, confirms chan amqp.Confirmation, closed chan *amqp.Error) (AMQPChanPublisher, error)
func NewAMQPConnectFunc ¶ added in v0.3.0
func NewAMQPConnectFunc(setup func(c *amqp.Channel) error) AMQPConnectFunc
type AMQPMessage ¶ added in v0.3.0
type AMQPProducer ¶ added in v0.3.0
type AMQPProducer struct {
// contains filtered or unexported fields
}
func NewAMQPProducer ¶ added in v0.3.0
func NewAMQPProducer(uri string, connectFn AMQPConnectFunc) (*AMQPProducer, error)
func (*AMQPProducer) Publish ¶ added in v0.3.0
func (p *AMQPProducer) Publish(ctx context.Context, msg AMQPMessage) error
func (*AMQPProducer) Shutdown ¶ added in v0.4.0
func (p *AMQPProducer) Shutdown(ctx context.Context) error
Shutdown will try to gracefully stop the background event publishing process, by waiting until all the publish buffer is flushed to the remote broker and all message confirmations have been received. This function must be called only once in a producer or it will panic.
The Publish function must not be called concurrently with Shutdown or the events sent concurrently may be lost (concurrent Publish and Shutdown functions may succeed but the event never really gets sent).
type ConsumeOptions ¶
type ConsumeOptions struct { Stream string *StreamOptions *stream.ConsumerOptions MemorizeOffset bool }
type Handler ¶
type Handler interface {
HandleMessage(msg StreamMessage)
}
type Producer ¶
type Producer interface {
Publish(ctx context.Context, key string, body interface{}, persistent bool) error
}
func NewAMQPExchangeProducer ¶ added in v0.0.3
type RawStreamOptions ¶
type StreamConsumer ¶
type StreamConsumer interface { ConsumeChan(ctx context.Context, opts ConsumeOptions) (<-chan StreamMessage, error) Consume(ctx context.Context, opts ConsumeOptions, handler Handler) error CheckConnection() error Close() error }
func NewStreamConsumer ¶
func NewStreamConsumer(streamUriStr, amqpUriStr string) (StreamConsumer, error)
type StreamMessage ¶
type StreamMessage struct { stream.ConsumerContext *streamAmqp.Message }
type StreamOptions ¶
type StreamOptions struct { stream.StreamOptions Bindings []BindingArgs }
Click to show internal directories.
Click to hide internal directories.