Documentation ¶
Index ¶
- Constants
- Variables
- func NewConsumerOptions(name string, offset stream.OffsetSpecification) *stream.ConsumerOptions
- func ParseStreamOptions(raw RawStreamOptions) (*stream.StreamOptions, error)
- func TimestampOffset(t time.Time) stream.OffsetSpecification
- type BindingArgs
- type ConsumeOptions
- type Handler
- type Producer
- type RawStreamOptions
- type StreamConsumer
- type StreamMessage
- type StreamOptions
Constants ¶
View Source
const (
	PublishChannelSize   = 1024
	RetryMinDelay        = 5 * time.Second
	PublishLogSampleRate = 0.1
	MaxRetries           = 3
)
Variables ¶
View Source
var ByteCapacity = stream.ByteCapacity{}
View Source
var OffsetSpec = stream.OffsetSpecification{}
Functions ¶
func NewConsumerOptions ¶
func NewConsumerOptions(name string, offset stream.OffsetSpecification) *stream.ConsumerOptions
func ParseStreamOptions ¶
func ParseStreamOptions(raw RawStreamOptions) (*stream.StreamOptions, error)
func TimestampOffset ¶
func TimestampOffset(t time.Time) stream.OffsetSpecification
Types ¶
type ConsumeOptions ¶
type ConsumeOptions struct {
	Stream string
	*StreamOptions
	*stream.ConsumerOptions
	MemorizeOffset bool
}
type Handler ¶
type Handler interface {
	HandleMessage(msg StreamMessage)
}
type Producer ¶
type Producer interface {
	Publish(ctx context.Context, key string, body interface{}, persistent bool) error
}
func NewAMQPExchangeProducer ¶ added in v0.0.3
type RawStreamOptions ¶
type StreamConsumer ¶
type StreamConsumer interface {
	ConsumeChan(ctx context.Context, opts ConsumeOptions) (<-chan StreamMessage, error)
	Consume(ctx context.Context, opts ConsumeOptions, handler Handler) error
	CheckConnection() error
	Close() error
}
func NewStreamConsumer ¶
func NewStreamConsumer(streamUriStr, amqpUriStr string) (StreamConsumer, error)
type StreamMessage ¶
type StreamMessage struct {
	stream.ConsumerContext
	*streamAmqp.Message
}
type StreamOptions ¶
type StreamOptions struct {
	stream.StreamOptions
	Bindings []BindingArgs
}
Click to show internal directories.
Click to hide internal directories.