broker

package
v3.8.18 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 21, 2022 License: Apache-2.0 Imports: 15 Imported by: 18

Documentation

Overview

Package broker is an interface used for asynchronous messaging

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrNotConnected returns when broker used but not connected yet
	ErrNotConnected = errors.New("broker not connected")
	// ErrDisconnected returns when broker disconnected
	ErrDisconnected = errors.New("broker disconnected")
)

Functions

func NewContext

func NewContext(ctx context.Context, s Broker) context.Context

NewContext savess broker in context

Types

type BatchHandler

type BatchHandler func(Events) error

BatchHandler is used to process messages in batches via a subscription of a topic.

type Broker

type Broker interface {
	// Name returns broker instance name
	Name() string
	// Init initilize broker
	Init(opts ...Option) error
	// Options returns broker options
	Options() Options
	// Address return configured address
	Address() string
	// Connect connects to broker
	Connect(ctx context.Context) error
	// Disconnect disconnect from broker
	Disconnect(ctx context.Context) error
	// Publish message to broker topic
	Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error
	// Subscribe subscribes to topic message via handler
	Subscribe(ctx context.Context, topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)
	// BatchPublish messages to broker with multiple topics
	BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error
	// BatchSubscribe subscribes to topic messages via handler
	BatchSubscribe(ctx context.Context, topic string, h BatchHandler, opts ...SubscribeOption) (Subscriber, error)
	// String type of broker
	String() string
}

Broker is an interface used for asynchronous messaging.

var DefaultBroker Broker = NewBroker()

DefaultBroker default memory broker

func FromContext

func FromContext(ctx context.Context) (Broker, bool)

FromContext returns broker from passed context

func NewBroker

func NewBroker(opts ...Option) Broker

NewBroker return new memory broker

type Event

type Event interface {
	// Topic returns event topic
	Topic() string
	// Message returns broker message
	Message() *Message
	// Ack acknowledge message
	Ack() error
	// Error returns message error (like decoding errors or some other)
	Error() error
	// SetError set event processing error
	SetError(err error)
}

Event is given to a subscription handler for processing

type Events

type Events []Event

Events contains multiple events

func (Events) Ack

func (evs Events) Ack() error

func (Events) SetError

func (evs Events) SetError(err error)

type Handler

type Handler func(Event) error

Handler is used to process messages via a subscription of a topic.

type Message

type Message struct {
	// Header contains message metadata
	Header metadata.Metadata
	// Body contains message body
	Body RawMessage
}

Message is used to transfer data

func NewMessage

func NewMessage(topic string) *Message

NewMessage create broker message with topic filled

type Option

type Option func(*Options)

Option func

func Addrs

func Addrs(addrs ...string) Option

Addrs sets the host addresses to be used by the broker

func BatchErrorHandler

func BatchErrorHandler(h BatchHandler) Option

BatchErrorHandler will catch all broker errors that cant be handled in normal way, for example Codec errors

func Codec

func Codec(c codec.Codec) Option

Codec sets the codec used for encoding/decoding used where a broker does not support headers

func Context

func Context(ctx context.Context) Option

Context sets the context option

func ErrorHandler

func ErrorHandler(h Handler) Option

ErrorHandler will catch all broker errors that cant be handled in normal way, for example Codec errors

func Logger

func Logger(l logger.Logger) Option

Logger sets the logger

func Meter

func Meter(m meter.Meter) Option

Meter sets the meter

func Name

func Name(n string) Option

Name sets the name

func Register

func Register(r register.Register) Option

Register sets register option

func SetOption

func SetOption(k, v interface{}) Option

SetOption returns a function to setup a context with given value

func TLSConfig

func TLSConfig(t *tls.Config) Option

TLSConfig sets the TLS Config

func Tracer

func Tracer(t tracer.Tracer) Option

Tracer to be used for tracing

type Options

type Options struct {
	// Tracer used for tracing
	Tracer tracer.Tracer
	// Register can be used for clustering
	Register register.Register
	// Codec holds the codec for marshal/unmarshal
	Codec codec.Codec
	// Logger used for logging
	Logger logger.Logger
	// Meter used for metrics
	Meter meter.Meter
	// Context holds external options
	Context context.Context
	// TLSConfig holds tls.TLSConfig options
	TLSConfig *tls.Config
	// ErrorHandler used when broker can't unmarshal incoming message
	ErrorHandler Handler
	// BatchErrorHandler used when broker can't unmashal incoming messages
	BatchErrorHandler BatchHandler
	// Name holds the broker name
	Name string
	// Addrs holds the broker address
	Addrs []string
}

Options struct

func NewOptions

func NewOptions(opts ...Option) Options

NewOptions create new Options

type PublishOption

type PublishOption func(*PublishOptions)

PublishOption func

func PublishBodyOnly

func PublishBodyOnly(b bool) PublishOption

PublishBodyOnly publish only body of the message

func PublishContext

func PublishContext(ctx context.Context) PublishOption

PublishContext sets the context

func SetPublishOption

func SetPublishOption(k, v interface{}) PublishOption

SetPublishOption returns a function to setup a context with given value

type PublishOptions

type PublishOptions struct {
	// Context holds external options
	Context context.Context
	// BodyOnly flag says the message contains raw body bytes
	BodyOnly bool
}

PublishOptions struct

func NewPublishOptions

func NewPublishOptions(opts ...PublishOption) PublishOptions

NewPublishOptions creates PublishOptions struct

type RawMessage

type RawMessage []byte

RawMessage is a raw encoded JSON value. It implements Marshaler and Unmarshaler and can be used to delay decoding or precompute a encoding.

func (*RawMessage) MarshalJSON

func (m *RawMessage) MarshalJSON() ([]byte, error)

MarshalJSON returns m as the JSON encoding of m.

func (*RawMessage) UnmarshalJSON

func (m *RawMessage) UnmarshalJSON(data []byte) error

UnmarshalJSON sets *m to a copy of data.

type SubscribeOption

type SubscribeOption func(*SubscribeOptions)

SubscribeOption func

func DisableAutoAck

func DisableAutoAck() SubscribeOption

DisableAutoAck disables auto ack Deprecated

func Queue

func Queue(name string) SubscribeOption

Queue sets the subscribers queue Deprecated

func SetSubscribeOption

func SetSubscribeOption(k, v interface{}) SubscribeOption

SetSubscribeOption returns a function to setup a context with given value

func SubscribeAutoAck

func SubscribeAutoAck(b bool) SubscribeOption

SubscribeAutoAck contol auto acking of messages after they have been handled.

func SubscribeBatchErrorHandler

func SubscribeBatchErrorHandler(h BatchHandler) SubscribeOption

SubscribeBatchErrorHandler will catch all broker errors that cant be handled in normal way, for example Codec errors

func SubscribeBatchSize

func SubscribeBatchSize(n int) SubscribeOption

SubscribeBatchSize specifies max batch size

func SubscribeBatchWait

func SubscribeBatchWait(td time.Duration) SubscribeOption

SubscribeBatchWait specifies max batch wait time

func SubscribeBodyOnly

func SubscribeBodyOnly(b bool) SubscribeOption

SubscribeBodyOnly consumes only body of the message

func SubscribeContext

func SubscribeContext(ctx context.Context) SubscribeOption

SubscribeContext set context

func SubscribeErrorHandler

func SubscribeErrorHandler(h Handler) SubscribeOption

SubscribeErrorHandler will catch all broker errors that cant be handled in normal way, for example Codec errors

func SubscribeGroup

func SubscribeGroup(name string) SubscribeOption

SubscribeGroup sets the name of the queue to share messages on

type SubscribeOptions

type SubscribeOptions struct {
	// Context holds external options
	Context context.Context
	// ErrorHandler used when broker can't unmarshal incoming message
	ErrorHandler Handler
	// BatchErrorHandler used when broker can't unmashal incoming messages
	BatchErrorHandler BatchHandler
	// Group holds consumer group
	Group string
	// AutoAck flag specifies auto ack of incoming message when no error happens
	AutoAck bool
	// BodyOnly flag specifies that message contains only body bytes without header
	BodyOnly bool
	// BatchSize flag specifies max batch size
	BatchSize int
	// BatchWait flag specifies max wait time for batch filling
	BatchWait time.Duration
}

SubscribeOptions struct

func NewSubscribeOptions

func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions

NewSubscribeOptions creates new SubscribeOptions

type Subscriber

type Subscriber interface {
	// Options returns subscriber options
	Options() SubscribeOptions
	// Topic returns topic for subscription
	Topic() string
	// Unsubscribe from topic
	Unsubscribe(ctx context.Context) error
}

Subscriber is a convenience return type for the Subscribe method

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL