Documentation ¶
Overview ¶
Package broker is an interface used for asynchronous messaging
Index ¶
- Variables
- func NewContext(ctx context.Context, s Broker) context.Context
- type BatchHandler
- type Broker
- type Event
- type Events
- type FuncBatchPublish
- type FuncBatchSubscribe
- type FuncPublish
- type FuncSubscribe
- type Handler
- type HookBatchPublish
- type HookBatchSubscribe
- type HookPublish
- type HookSubscribe
- type Message
- type NoopBroker
- func (b *NoopBroker) Address() string
- func (b *NoopBroker) BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error
- func (b *NoopBroker) BatchSubscribe(ctx context.Context, topic string, handler BatchHandler, ...) (Subscriber, error)
- func (b *NoopBroker) Connect(_ context.Context) error
- func (b *NoopBroker) Disconnect(_ context.Context) error
- func (b *NoopBroker) Init(opts ...Option) error
- func (b *NoopBroker) Name() string
- func (b *NoopBroker) Options() Options
- func (b *NoopBroker) Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error
- func (b *NoopBroker) String() string
- func (b *NoopBroker) Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error)
- type NoopSubscriber
- type Option
- func Addrs(addrs ...string) Option
- func BatchErrorHandler(h BatchHandler) Option
- func Codec(c codec.Codec) Option
- func Context(ctx context.Context) Option
- func ErrorHandler(h Handler) Option
- func Hooks(h ...options.Hook) Option
- func Logger(l logger.Logger) Option
- func Meter(m meter.Meter) Option
- func Name(n string) Option
- func Register(r register.Register) Option
- func SetOption(k, v interface{}) Option
- func TLSConfig(t *tls.Config) Option
- func Tracer(t tracer.Tracer) Option
- type Options
- type PublishOption
- type PublishOptions
- type SubscribeOption
- func DisableAutoAck() SubscribeOption
- func Queue(name string) SubscribeOption
- func SetSubscribeOption(k, v interface{}) SubscribeOption
- func SubscribeAutoAck(b bool) SubscribeOption
- func SubscribeBatchErrorHandler(h BatchHandler) SubscribeOption
- func SubscribeBatchSize(n int) SubscribeOption
- func SubscribeBatchWait(td time.Duration) SubscribeOption
- func SubscribeBodyOnly(b bool) SubscribeOption
- func SubscribeContext(ctx context.Context) SubscribeOption
- func SubscribeErrorHandler(h Handler) SubscribeOption
- func SubscribeGroup(name string) SubscribeOption
- type SubscribeOptions
- type Subscriber
Constants ¶
This section is empty.
Variables ¶
var ( // ErrNotConnected is returned when the broker is used but not connected yet ErrNotConnected = errors.New("broker not connected") // ErrDisconnected is returned when the broker is disconnected ErrDisconnected = errors.New("broker disconnected") // DefaultGracefulTimeout DefaultGracefulTimeout = 5 * time.Second )
Functions ¶
Types ¶
type BatchHandler ¶
BatchHandler is used to process messages in batches via a subscription of a topic.
type Broker ¶
type Broker interface { // Name returns broker instance name Name() string // Init initializes broker Init(opts ...Option) error // Options returns broker options Options() Options // Address returns configured address Address() string // Connect connects to broker Connect(ctx context.Context) error // Disconnect disconnects from broker Disconnect(ctx context.Context) error // Publish message to broker topic Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error // Subscribe subscribes to topic message via handler Subscribe(ctx context.Context, topic string, h Handler, opts ...SubscribeOption) (Subscriber, error) // BatchPublish messages to broker with multiple topics BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error // BatchSubscribe subscribes to topic messages via handler BatchSubscribe(ctx context.Context, topic string, h BatchHandler, opts ...SubscribeOption) (Subscriber, error) // String type of broker String() string }
Broker is an interface used for asynchronous messaging.
type Event ¶
type Event interface { // Context returns context.Context for event Context() context.Context // Topic returns event topic Topic() string // Message returns broker message Message() *Message // Ack acknowledges message Ack() error // Error returns message error (like decoding errors or some other) Error() error // SetError sets event processing error SetError(err error) }
Event is given to a subscription handler for processing
type FuncBatchPublish ¶ added in v3.10.63
type FuncBatchPublish func(ctx context.Context, msgs []*Message, opts ...PublishOption) error
type FuncBatchSubscribe ¶ added in v3.10.63
type FuncBatchSubscribe func(ctx context.Context, topic string, h BatchHandler, opts ...SubscribeOption) (Subscriber, error)
type FuncPublish ¶ added in v3.10.63
type FuncSubscribe ¶ added in v3.10.63
type FuncSubscribe func(ctx context.Context, topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)
type HookBatchPublish ¶ added in v3.10.63
type HookBatchPublish func(next FuncBatchPublish) FuncBatchPublish
type HookBatchSubscribe ¶ added in v3.10.63
type HookBatchSubscribe func(next FuncBatchSubscribe) FuncBatchSubscribe
type HookPublish ¶ added in v3.10.63
type HookPublish func(next FuncPublish) FuncPublish
type HookSubscribe ¶ added in v3.10.63
type HookSubscribe func(next FuncSubscribe) FuncSubscribe
type Message ¶
type Message struct { // Header contains message metadata Header metadata.Metadata // Body contains message body Body codec.RawMessage }
Message is used to transfer data
func NewMessage ¶
NewMessage creates a broker message with topic filled
type NoopBroker ¶ added in v3.10.41
type NoopBroker struct {
// contains filtered or unexported fields
}
func NewBroker ¶
func NewBroker(opts ...Option) *NoopBroker
func (*NoopBroker) Address ¶ added in v3.10.41
func (b *NoopBroker) Address() string
func (*NoopBroker) BatchPublish ¶ added in v3.10.41
func (b *NoopBroker) BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error
func (*NoopBroker) BatchSubscribe ¶ added in v3.10.41
func (b *NoopBroker) BatchSubscribe(ctx context.Context, topic string, handler BatchHandler, opts ...SubscribeOption) (Subscriber, error)
func (*NoopBroker) Connect ¶ added in v3.10.41
func (b *NoopBroker) Connect(_ context.Context) error
func (*NoopBroker) Disconnect ¶ added in v3.10.41
func (b *NoopBroker) Disconnect(_ context.Context) error
func (*NoopBroker) Init ¶ added in v3.10.41
func (b *NoopBroker) Init(opts ...Option) error
func (*NoopBroker) Name ¶ added in v3.10.41
func (b *NoopBroker) Name() string
func (*NoopBroker) Options ¶ added in v3.10.41
func (b *NoopBroker) Options() Options
func (*NoopBroker) Publish ¶ added in v3.10.41
func (b *NoopBroker) Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error
func (*NoopBroker) String ¶ added in v3.10.41
func (b *NoopBroker) String() string
func (*NoopBroker) Subscribe ¶ added in v3.10.41
func (b *NoopBroker) Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error)
type NoopSubscriber ¶ added in v3.10.41
type NoopSubscriber struct {
// contains filtered or unexported fields
}
func (*NoopSubscriber) Options ¶ added in v3.10.41
func (s *NoopSubscriber) Options() SubscribeOptions
func (*NoopSubscriber) Topic ¶ added in v3.10.41
func (s *NoopSubscriber) Topic() string
func (*NoopSubscriber) Unsubscribe ¶ added in v3.10.41
func (s *NoopSubscriber) Unsubscribe(_ context.Context) error
type Option ¶
type Option func(*Options)
Option func
func BatchErrorHandler ¶
func BatchErrorHandler(h BatchHandler) Option
BatchErrorHandler will catch all broker errors that can't be handled in the normal way, for example Codec errors
func Codec ¶
Codec sets the codec used for encoding/decoding where a broker does not support headers
func ErrorHandler ¶
ErrorHandler will catch all broker errors that can't be handled in the normal way, for example Codec errors
type Options ¶
type Options struct { // Tracer used for tracing Tracer tracer.Tracer // Register can be used for clustering Register register.Register // Codec holds the codec for marshal/unmarshal Codec codec.Codec // Logger used for logging Logger logger.Logger // Meter used for metrics Meter meter.Meter // Context holds external options Context context.Context // TLSConfig holds tls.Config options TLSConfig *tls.Config // ErrorHandler used when broker can't unmarshal incoming message ErrorHandler Handler // BatchErrorHandler used when broker can't unmarshal incoming messages BatchErrorHandler BatchHandler // Name holds the broker name Name string // Addrs holds the broker address Addrs []string // Wait waits for a collection of goroutines to finish Wait *sync.WaitGroup // GracefulTimeout contains time to wait to finish in flight requests GracefulTimeout time.Duration // Hooks can be run before broker Publish/BatchPublish and // Subscribe/BatchSubscribe methods Hooks options.Hooks }
Options struct
type PublishOption ¶
type PublishOption func(*PublishOptions)
PublishOption func
func PublishBodyOnly ¶
func PublishBodyOnly(b bool) PublishOption
PublishBodyOnly publish only body of the message
func PublishContext ¶
func PublishContext(ctx context.Context) PublishOption
PublishContext sets the context
func SetPublishOption ¶
func SetPublishOption(k, v interface{}) PublishOption
SetPublishOption returns a function to setup a context with given value
type PublishOptions ¶
type PublishOptions struct { // Context holds external options Context context.Context // BodyOnly flag says the message contains raw body bytes BodyOnly bool }
PublishOptions struct
func NewPublishOptions ¶
func NewPublishOptions(opts ...PublishOption) PublishOptions
NewPublishOptions creates PublishOptions struct
type SubscribeOption ¶
type SubscribeOption func(*SubscribeOptions)
SubscribeOption func
func DisableAutoAck ¶
func DisableAutoAck() SubscribeOption
DisableAutoAck disables auto ack. Deprecated.
func SetSubscribeOption ¶
func SetSubscribeOption(k, v interface{}) SubscribeOption
SetSubscribeOption returns a function to setup a context with given value
func SubscribeAutoAck ¶
func SubscribeAutoAck(b bool) SubscribeOption
SubscribeAutoAck controls auto acking of messages after they have been handled.
func SubscribeBatchErrorHandler ¶
func SubscribeBatchErrorHandler(h BatchHandler) SubscribeOption
SubscribeBatchErrorHandler will catch all broker errors that can't be handled in the normal way, for example Codec errors
func SubscribeBatchSize ¶
func SubscribeBatchSize(n int) SubscribeOption
SubscribeBatchSize specifies max batch size
func SubscribeBatchWait ¶
func SubscribeBatchWait(td time.Duration) SubscribeOption
SubscribeBatchWait specifies max batch wait time
func SubscribeBodyOnly ¶
func SubscribeBodyOnly(b bool) SubscribeOption
SubscribeBodyOnly consumes only body of the message
func SubscribeContext ¶
func SubscribeContext(ctx context.Context) SubscribeOption
SubscribeContext sets the context
func SubscribeErrorHandler ¶
func SubscribeErrorHandler(h Handler) SubscribeOption
SubscribeErrorHandler will catch all broker errors that can't be handled in the normal way, for example Codec errors
func SubscribeGroup ¶
func SubscribeGroup(name string) SubscribeOption
SubscribeGroup sets the name of the queue to share messages on
type SubscribeOptions ¶
type SubscribeOptions struct { // Context holds external options Context context.Context // ErrorHandler used when broker can't unmarshal incoming message ErrorHandler Handler // BatchErrorHandler used when broker can't unmarshal incoming messages BatchErrorHandler BatchHandler // Group holds consumer group Group string // AutoAck flag specifies auto ack of incoming message when no error happens AutoAck bool // BodyOnly flag specifies that message contains only body bytes without header BodyOnly bool // BatchSize flag specifies max batch size BatchSize int // BatchWait flag specifies max wait time for batch filling BatchWait time.Duration }
SubscribeOptions struct
func NewSubscribeOptions ¶
func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions
NewSubscribeOptions creates new SubscribeOptions
type Subscriber ¶
type Subscriber interface { // Options returns subscriber options Options() SubscribeOptions // Topic returns topic for subscription Topic() string // Unsubscribe from topic Unsubscribe(ctx context.Context) error }
Subscriber is a convenience return type for the Subscribe method