Documentation ¶
Index ¶
- Constants
- Variables
- func NewConsumerOptions(name string, offset stream.OffsetSpecification) *stream.ConsumerOptions
- func ParseStreamOptions(raw RawStreamOptions) (*stream.StreamOptions, error)
- func TimestampOffset(t time.Time) stream.OffsetSpecification
- type AMQPChanPublisher
- type AMQPConnectFunc
- type AMQPMessage
- type AMQPProducer
- type BindingArgs
- type ConsumeOptions
- type Handler
- type Producer
- type RawStreamOptions
- type StreamConsumer
- type StreamMessage
- type StreamOptions
Constants ¶
View Source
const ( PublishChannelSize = 1024 RetryMinDelay = 5 * time.Second PublishLogSampleRate = 0.1 MaxRetries = 3 )
Variables ¶
View Source
var ByteCapacity = stream.ByteCapacity{}
View Source
var OffsetSpec = stream.OffsetSpecification{}
Functions ¶
func NewConsumerOptions ¶
func NewConsumerOptions(name string, offset stream.OffsetSpecification) *stream.ConsumerOptions
func ParseStreamOptions ¶
func ParseStreamOptions(raw RawStreamOptions) (*stream.StreamOptions, error)
func TimestampOffset ¶
func TimestampOffset(t time.Time) stream.OffsetSpecification
Types ¶
type AMQPChanPublisher ¶ added in v0.3.0
type AMQPChanPublisher interface {
Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error
}
type AMQPConnectFunc ¶ added in v0.3.0
type AMQPConnectFunc func(ctx context.Context, uri string, confirms chan amqp.Confirmation, closed chan *amqp.Error) (AMQPChanPublisher, error)
func NewAMQPConnectFunc ¶ added in v0.3.0
func NewAMQPConnectFunc(setup func(c *amqp.Channel) error) AMQPConnectFunc
type AMQPMessage ¶ added in v0.3.0
type AMQPProducer ¶ added in v0.3.0
type AMQPProducer struct {
// contains filtered or unexported fields
}
func NewAMQPProducer ¶ added in v0.3.0
func NewAMQPProducer(ctx context.Context, uri string, connectFn AMQPConnectFunc) (*AMQPProducer, error)
func (*AMQPProducer) Publish ¶ added in v0.3.0
func (p *AMQPProducer) Publish(ctx context.Context, msg AMQPMessage) error
type ConsumeOptions ¶
type ConsumeOptions struct { Stream string *StreamOptions *stream.ConsumerOptions MemorizeOffset bool }
type Handler ¶
type Handler interface {
HandleMessage(msg StreamMessage)
}
type Producer ¶
type Producer interface {
Publish(ctx context.Context, key string, body interface{}, persistent bool) error
}
func NewAMQPExchangeProducer ¶ added in v0.0.3
type RawStreamOptions ¶
type StreamConsumer ¶
type StreamConsumer interface { ConsumeChan(ctx context.Context, opts ConsumeOptions) (<-chan StreamMessage, error) Consume(ctx context.Context, opts ConsumeOptions, handler Handler) error CheckConnection() error Close() error }
func NewStreamConsumer ¶
func NewStreamConsumer(streamUriStr, amqpUriStr string) (StreamConsumer, error)
type StreamMessage ¶
type StreamMessage struct { stream.ConsumerContext *streamAmqp.Message }
type StreamOptions ¶
type StreamOptions struct { stream.StreamOptions Bindings []BindingArgs }
Click to show internal directories.
Click to hide internal directories.