broker

package
v3.10.78 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 10, 2024 License: Apache-2.0 Imports: 13 Imported by: 17

Documentation

Overview

Package broker is an interface used for asynchronous messaging

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrNotConnected returns when broker used but not connected yet
	ErrNotConnected = errors.New("broker not connected")
	// ErrDisconnected returns when broker disconnected
	ErrDisconnected = errors.New("broker disconnected")
	// DefaultGracefulTimeout
	DefaultGracefulTimeout = 5 * time.Second
)

Functions

func NewContext

func NewContext(ctx context.Context, s Broker) context.Context

NewContext savess broker in context

Types

type BatchHandler

type BatchHandler func(Events) error

BatchHandler is used to process messages in batches via a subscription of a topic.

type Broker

type Broker interface {
	// Name returns broker instance name
	Name() string
	// Init initilize broker
	Init(opts ...Option) error
	// Options returns broker options
	Options() Options
	// Address return configured address
	Address() string
	// Connect connects to broker
	Connect(ctx context.Context) error
	// Disconnect disconnect from broker
	Disconnect(ctx context.Context) error
	// Publish message to broker topic
	Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error
	// Subscribe subscribes to topic message via handler
	Subscribe(ctx context.Context, topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)
	// BatchPublish messages to broker with multiple topics
	BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error
	// BatchSubscribe subscribes to topic messages via handler
	BatchSubscribe(ctx context.Context, topic string, h BatchHandler, opts ...SubscribeOption) (Subscriber, error)
	// String type of broker
	String() string
}

Broker is an interface used for asynchronous messaging.

var DefaultBroker Broker = NewBroker()

DefaultBroker default memory broker

func FromContext

func FromContext(ctx context.Context) (Broker, bool)

FromContext returns broker from passed context

type Event

type Event interface {
	// Context return context.Context for event
	Context() context.Context
	// Topic returns event topic
	Topic() string
	// Message returns broker message
	Message() *Message
	// Ack acknowledge message
	Ack() error
	// Error returns message error (like decoding errors or some other)
	Error() error
	// SetError set event processing error
	SetError(err error)
}

Event is given to a subscription handler for processing

type Events

type Events []Event

Events contains multiple events

func (Events) Ack

func (evs Events) Ack() error

Ack try to ack all events and return

func (Events) SetError

func (evs Events) SetError(err error)

SetError sets error on event

type FuncBatchPublish added in v3.10.63

type FuncBatchPublish func(ctx context.Context, msgs []*Message, opts ...PublishOption) error

type FuncBatchSubscribe added in v3.10.63

type FuncBatchSubscribe func(ctx context.Context, topic string, h BatchHandler, opts ...SubscribeOption) (Subscriber, error)

type FuncPublish added in v3.10.63

type FuncPublish func(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error

type FuncSubscribe added in v3.10.63

type FuncSubscribe func(ctx context.Context, topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)

type Handler

type Handler func(Event) error

Handler is used to process messages via a subscription of a topic.

type HookBatchPublish added in v3.10.63

type HookBatchPublish func(next FuncBatchPublish) FuncBatchPublish

type HookBatchSubscribe added in v3.10.63

type HookBatchSubscribe func(next FuncBatchSubscribe) FuncBatchSubscribe

type HookPublish added in v3.10.63

type HookPublish func(next FuncPublish) FuncPublish

type HookSubscribe added in v3.10.63

type HookSubscribe func(next FuncSubscribe) FuncSubscribe

type Message

type Message struct {
	// Header contains message metadata
	Header metadata.Metadata
	// Body contains message body
	Body codec.RawMessage
}

Message is used to transfer data

func NewMessage

func NewMessage(topic string) *Message

NewMessage create broker message with topic filled

type NoopBroker added in v3.10.41

type NoopBroker struct {
	// contains filtered or unexported fields
}

func NewBroker

func NewBroker(opts ...Option) *NoopBroker

func (*NoopBroker) Address added in v3.10.41

func (b *NoopBroker) Address() string

func (*NoopBroker) BatchPublish added in v3.10.41

func (b *NoopBroker) BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error

func (*NoopBroker) BatchSubscribe added in v3.10.41

func (b *NoopBroker) BatchSubscribe(ctx context.Context, topic string, handler BatchHandler, opts ...SubscribeOption) (Subscriber, error)

func (*NoopBroker) Connect added in v3.10.41

func (b *NoopBroker) Connect(_ context.Context) error

func (*NoopBroker) Disconnect added in v3.10.41

func (b *NoopBroker) Disconnect(_ context.Context) error

func (*NoopBroker) Init added in v3.10.41

func (b *NoopBroker) Init(opts ...Option) error

func (*NoopBroker) Name added in v3.10.41

func (b *NoopBroker) Name() string

func (*NoopBroker) Options added in v3.10.41

func (b *NoopBroker) Options() Options

func (*NoopBroker) Publish added in v3.10.41

func (b *NoopBroker) Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error

func (*NoopBroker) String added in v3.10.41

func (b *NoopBroker) String() string

func (*NoopBroker) Subscribe added in v3.10.41

func (b *NoopBroker) Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error)

type NoopSubscriber added in v3.10.41

type NoopSubscriber struct {
	// contains filtered or unexported fields
}

func (*NoopSubscriber) Options added in v3.10.41

func (s *NoopSubscriber) Options() SubscribeOptions

func (*NoopSubscriber) Topic added in v3.10.41

func (s *NoopSubscriber) Topic() string

func (*NoopSubscriber) Unsubscribe added in v3.10.41

func (s *NoopSubscriber) Unsubscribe(_ context.Context) error

type Option

type Option func(*Options)

Option func

func Addrs

func Addrs(addrs ...string) Option

Addrs sets the host addresses to be used by the broker

func BatchErrorHandler

func BatchErrorHandler(h BatchHandler) Option

BatchErrorHandler will catch all broker errors that cant be handled in normal way, for example Codec errors

func Codec

func Codec(c codec.Codec) Option

Codec sets the codec used for encoding/decoding used where a broker does not support headers

func Context

func Context(ctx context.Context) Option

Context sets the context option

func ErrorHandler

func ErrorHandler(h Handler) Option

ErrorHandler will catch all broker errors that cant be handled in normal way, for example Codec errors

func Hooks added in v3.10.63

func Hooks(h ...options.Hook) Option

Hooks sets hook runs before action

func Logger

func Logger(l logger.Logger) Option

Logger sets the logger

func Meter

func Meter(m meter.Meter) Option

Meter sets the meter

func Name

func Name(n string) Option

Name sets the name

func Register

func Register(r register.Register) Option

Register sets register option

func SetOption

func SetOption(k, v interface{}) Option

SetOption returns a function to setup a context with given value

func TLSConfig

func TLSConfig(t *tls.Config) Option

TLSConfig sets the TLS Config

func Tracer

func Tracer(t tracer.Tracer) Option

Tracer to be used for tracing

type Options

type Options struct {
	// Tracer used for tracing
	Tracer tracer.Tracer
	// Register can be used for clustering
	Register register.Register
	// Codec holds the codec for marshal/unmarshal
	Codec codec.Codec
	// Logger used for logging
	Logger logger.Logger
	// Meter used for metrics
	Meter meter.Meter
	// Context holds external options
	Context context.Context
	// TLSConfig holds tls.TLSConfig options
	TLSConfig *tls.Config
	// ErrorHandler used when broker can't unmarshal incoming message
	ErrorHandler Handler
	// BatchErrorHandler used when broker can't unmashal incoming messages
	BatchErrorHandler BatchHandler
	// Name holds the broker name
	Name string
	// Addrs holds the broker address
	Addrs []string
	// Wait waits for a collection of goroutines to finish
	Wait *sync.WaitGroup
	// GracefulTimeout contains time to wait to finish in flight requests
	GracefulTimeout time.Duration
	// Hooks can be run before broker Publish/BatchPublish and
	// Subscribe/BatchSubscribe methods
	Hooks options.Hooks
}

Options struct

func NewOptions

func NewOptions(opts ...Option) Options

NewOptions create new Options

type PublishOption

type PublishOption func(*PublishOptions)

PublishOption func

func PublishBodyOnly

func PublishBodyOnly(b bool) PublishOption

PublishBodyOnly publish only body of the message

func PublishContext

func PublishContext(ctx context.Context) PublishOption

PublishContext sets the context

func SetPublishOption

func SetPublishOption(k, v interface{}) PublishOption

SetPublishOption returns a function to setup a context with given value

type PublishOptions

type PublishOptions struct {
	// Context holds external options
	Context context.Context
	// BodyOnly flag says the message contains raw body bytes
	BodyOnly bool
}

PublishOptions struct

func NewPublishOptions

func NewPublishOptions(opts ...PublishOption) PublishOptions

NewPublishOptions creates PublishOptions struct

type SubscribeOption

type SubscribeOption func(*SubscribeOptions)

SubscribeOption func

func DisableAutoAck

func DisableAutoAck() SubscribeOption

DisableAutoAck disables auto ack Deprecated

func Queue

func Queue(name string) SubscribeOption

Queue sets the subscribers queue Deprecated

func SetSubscribeOption

func SetSubscribeOption(k, v interface{}) SubscribeOption

SetSubscribeOption returns a function to setup a context with given value

func SubscribeAutoAck

func SubscribeAutoAck(b bool) SubscribeOption

SubscribeAutoAck contol auto acking of messages after they have been handled.

func SubscribeBatchErrorHandler

func SubscribeBatchErrorHandler(h BatchHandler) SubscribeOption

SubscribeBatchErrorHandler will catch all broker errors that cant be handled in normal way, for example Codec errors

func SubscribeBatchSize

func SubscribeBatchSize(n int) SubscribeOption

SubscribeBatchSize specifies max batch size

func SubscribeBatchWait

func SubscribeBatchWait(td time.Duration) SubscribeOption

SubscribeBatchWait specifies max batch wait time

func SubscribeBodyOnly

func SubscribeBodyOnly(b bool) SubscribeOption

SubscribeBodyOnly consumes only body of the message

func SubscribeContext

func SubscribeContext(ctx context.Context) SubscribeOption

SubscribeContext set context

func SubscribeErrorHandler

func SubscribeErrorHandler(h Handler) SubscribeOption

SubscribeErrorHandler will catch all broker errors that cant be handled in normal way, for example Codec errors

func SubscribeGroup

func SubscribeGroup(name string) SubscribeOption

SubscribeGroup sets the name of the queue to share messages on

type SubscribeOptions

type SubscribeOptions struct {
	// Context holds external options
	Context context.Context
	// ErrorHandler used when broker can't unmarshal incoming message
	ErrorHandler Handler
	// BatchErrorHandler used when broker can't unmashal incoming messages
	BatchErrorHandler BatchHandler
	// Group holds consumer group
	Group string
	// AutoAck flag specifies auto ack of incoming message when no error happens
	AutoAck bool
	// BodyOnly flag specifies that message contains only body bytes without header
	BodyOnly bool
	// BatchSize flag specifies max batch size
	BatchSize int
	// BatchWait flag specifies max wait time for batch filling
	BatchWait time.Duration
}

SubscribeOptions struct

func NewSubscribeOptions

func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions

NewSubscribeOptions creates new SubscribeOptions

type Subscriber

type Subscriber interface {
	// Options returns subscriber options
	Options() SubscribeOptions
	// Topic returns topic for subscription
	Topic() string
	// Unsubscribe from topic
	Unsubscribe(ctx context.Context) error
}

Subscriber is a convenience return type for the Subscribe method

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL