Documentation ¶
Overview ¶
Package broker is an interface used for asynchronous messaging
Index ¶
- Variables
- func NewContext(ctx context.Context, s Broker) context.Context
- type BatchHandler
- type Broker
- type Event
- type Events
- type Handler
- type Message
- type Option
- func Addrs(addrs ...string) Option
- func BatchErrorHandler(h BatchHandler) Option
- func Codec(c codec.Codec) Option
- func Context(ctx context.Context) Option
- func ErrorHandler(h Handler) Option
- func Logger(l logger.Logger) Option
- func Meter(m meter.Meter) Option
- func Name(n string) Option
- func Register(r register.Register) Option
- func SetOption(k, v interface{}) Option
- func TLSConfig(t *tls.Config) Option
- func Tracer(t tracer.Tracer) Option
- type Options
- type PublishOption
- type PublishOptions
- type RawMessage
- type SubscribeOption
- func DisableAutoAck() SubscribeOption
- func Queue(name string) SubscribeOption
- func SetSubscribeOption(k, v interface{}) SubscribeOption
- func SubscribeAutoAck(b bool) SubscribeOption
- func SubscribeBatchErrorHandler(h BatchHandler) SubscribeOption
- func SubscribeBatchSize(n int) SubscribeOption
- func SubscribeBatchWait(td time.Duration) SubscribeOption
- func SubscribeBodyOnly(b bool) SubscribeOption
- func SubscribeContext(ctx context.Context) SubscribeOption
- func SubscribeErrorHandler(h Handler) SubscribeOption
- func SubscribeGroup(name string) SubscribeOption
- type SubscribeOptions
- type Subscriber
Constants ¶
This section is empty.
Variables ¶
var ( // ErrNotConnected returns when broker used but not connected yet ErrNotConnected = errors.New("broker not connected") // ErrDisconnected returns when broker disconnected ErrDisconnected = errors.New("broker disconnected") )
Functions ¶
Types ¶
type BatchHandler ¶ added in v3.5.0
BatchHandler is used to process messages in batches via a subscription of a topic.
type Broker ¶
type Broker interface { // Name returns broker instance name Name() string // Init initilize broker Init(opts ...Option) error // Options returns broker options Options() Options // Address return configured address Address() string // Connect connects to broker Connect(ctx context.Context) error // Disconnect disconnect from broker Disconnect(ctx context.Context) error // Publish message to broker topic Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error // Subscribe subscribes to topic message via handler Subscribe(ctx context.Context, topic string, h Handler, opts ...SubscribeOption) (Subscriber, error) // BatchPublish messages to broker with multiple topics BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error // BatchSubscribe subscribes to topic messages via handler BatchSubscribe(ctx context.Context, topic string, h BatchHandler, opts ...SubscribeOption) (Subscriber, error) // String type of broker String() string }
Broker is an interface used for asynchronous messaging.
func FromContext ¶
FromContext returns broker from passed context
type Event ¶
type Event interface { // Topic returns event topic Topic() string // Message returns broker message Message() *Message // Ack acknowledge message Ack() error // Error returns message error (like decoding errors or some other) Error() error // SetError set event processing error SetError(err error) }
Event is given to a subscription handler for processing
type Message ¶
type Message struct { // Header contains message metadata Header metadata.Metadata // Body contains message body Body RawMessage }
Message is used to transfer data
func NewMessage ¶ added in v3.5.1
NewMessage create broker message with topic filled
type Option ¶
type Option func(*Options)
Option func
func BatchErrorHandler ¶ added in v3.5.0
func BatchErrorHandler(h BatchHandler) Option
BatchErrorHandler will catch all broker errors that cant be handled in normal way, for example Codec errors
func Codec ¶
Codec sets the codec used for encoding/decoding used where a broker does not support headers
func ErrorHandler ¶
ErrorHandler will catch all broker errors that cant be handled in normal way, for example Codec errors
type Options ¶
type Options struct { // Tracer used for tracing Tracer tracer.Tracer // Register can be used for clustering Register register.Register // Codec holds the codec for marshal/unmarshal Codec codec.Codec // Logger used for logging Logger logger.Logger // Meter used for metrics Meter meter.Meter // Context holds external options Context context.Context // TLSConfig holds tls.TLSConfig options TLSConfig *tls.Config // ErrorHandler used when broker can't unmarshal incoming message ErrorHandler Handler // BatchErrorHandler used when broker can't unmashal incoming messages BatchErrorHandler BatchHandler // Name holds the broker name Name string // Addrs holds the broker address Addrs []string }
Options struct
type PublishOption ¶
type PublishOption func(*PublishOptions)
PublishOption func
func PublishBodyOnly ¶ added in v3.1.0
func PublishBodyOnly(b bool) PublishOption
PublishBodyOnly publish only body of the message
func PublishContext ¶
func PublishContext(ctx context.Context) PublishOption
PublishContext sets the context
func SetPublishOption ¶
func SetPublishOption(k, v interface{}) PublishOption
SetPublishOption returns a function to setup a context with given value
type PublishOptions ¶
type PublishOptions struct { // Context holds external options Context context.Context // BodyOnly flag says the message contains raw body bytes BodyOnly bool }
PublishOptions struct
func NewPublishOptions ¶
func NewPublishOptions(opts ...PublishOption) PublishOptions
NewPublishOptions creates PublishOptions struct
type RawMessage ¶ added in v3.4.3
type RawMessage []byte
RawMessage is a raw encoded JSON value. It implements Marshaler and Unmarshaler and can be used to delay decoding or precompute a encoding.
func (*RawMessage) MarshalJSON ¶ added in v3.4.3
func (m *RawMessage) MarshalJSON() ([]byte, error)
MarshalJSON returns m as the JSON encoding of m.
func (*RawMessage) UnmarshalJSON ¶ added in v3.4.3
func (m *RawMessage) UnmarshalJSON(data []byte) error
UnmarshalJSON sets *m to a copy of data.
type SubscribeOption ¶
type SubscribeOption func(*SubscribeOptions)
SubscribeOption func
func DisableAutoAck ¶
func DisableAutoAck() SubscribeOption
DisableAutoAck disables auto ack Deprecated
func SetSubscribeOption ¶
func SetSubscribeOption(k, v interface{}) SubscribeOption
SetSubscribeOption returns a function to setup a context with given value
func SubscribeAutoAck ¶
func SubscribeAutoAck(b bool) SubscribeOption
SubscribeAutoAck contol auto acking of messages after they have been handled.
func SubscribeBatchErrorHandler ¶ added in v3.5.0
func SubscribeBatchErrorHandler(h BatchHandler) SubscribeOption
SubscribeBatchErrorHandler will catch all broker errors that cant be handled in normal way, for example Codec errors
func SubscribeBatchSize ¶ added in v3.5.4
func SubscribeBatchSize(n int) SubscribeOption
SubscribeBatchSize specifies max batch size
func SubscribeBatchWait ¶ added in v3.5.4
func SubscribeBatchWait(td time.Duration) SubscribeOption
SubscribeBatchWait specifies max batch wait time
func SubscribeBodyOnly ¶ added in v3.1.0
func SubscribeBodyOnly(b bool) SubscribeOption
SubscribeBodyOnly consumes only body of the message
func SubscribeContext ¶
func SubscribeContext(ctx context.Context) SubscribeOption
SubscribeContext set context
func SubscribeErrorHandler ¶
func SubscribeErrorHandler(h Handler) SubscribeOption
SubscribeErrorHandler will catch all broker errors that cant be handled in normal way, for example Codec errors
func SubscribeGroup ¶
func SubscribeGroup(name string) SubscribeOption
SubscribeGroup sets the name of the queue to share messages on
type SubscribeOptions ¶
type SubscribeOptions struct { // Context holds external options Context context.Context // ErrorHandler used when broker can't unmarshal incoming message ErrorHandler Handler // BatchErrorHandler used when broker can't unmashal incoming messages BatchErrorHandler BatchHandler // Group holds consumer group Group string // AutoAck flag specifies auto ack of incoming message when no error happens AutoAck bool // BodyOnly flag specifies that message contains only body bytes without header BodyOnly bool // BatchSize flag specifies max batch size BatchSize int // BatchWait flag specifies max wait time for batch filling BatchWait time.Duration }
SubscribeOptions struct
func NewSubscribeOptions ¶
func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions
NewSubscribeOptions creates new SubscribeOptions
type Subscriber ¶
type Subscriber interface { // Options returns subscriber options Options() SubscribeOptions // Topic returns topic for subscription Topic() string // Unsubscribe from topic Unsubscribe(ctx context.Context) error }
Subscriber is a convenience return type for the Subscribe method