broker

package
v3.2.21
Warning

This package is not in the latest version of its module.

Published: Mar 14, 2021 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Overview

Package broker defines the interface used for asynchronous messaging.


Constants

This section is empty.

Variables

This section is empty.

Functions

func NewContext

func NewContext(ctx context.Context, s Broker) context.Context

NewContext saves the broker in the context.

Types

type Broker

type Broker interface {
	Name() string
	Init(...Option) error
	Options() Options
	Address() string
	Connect(context.Context) error
	Disconnect(context.Context) error
	Publish(context.Context, string, *Message, ...PublishOption) error
	Subscribe(context.Context, string, Handler, ...SubscribeOption) (Subscriber, error)
	String() string
}

Broker is an interface used for asynchronous messaging.

var (
	// DefaultBroker default broker
	DefaultBroker Broker = NewBroker()
)

func FromContext

func FromContext(ctx context.Context) (Broker, bool)

FromContext returns the broker stored in the passed context. The bool result reports whether a broker was found.
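As a minimal sketch of how NewContext and FromContext could pair up, the following stores a broker under an unexported key type. The reduced Broker interface, brokerKey, namedBroker, roundTrip, and hasBroker are local stand-ins written for illustration; the package's actual storage mechanism may differ.

```go
package main

import "context"

// Broker is a local stand-in for the documented interface, reduced to a
// single method so the sketch compiles on its own.
type Broker interface {
	Name() string
}

// brokerKey is a hypothetical unexported key type. Using a private type
// avoids collisions with values stored by other packages.
type brokerKey struct{}

// NewContext returns a copy of ctx that carries b.
func NewContext(ctx context.Context, b Broker) context.Context {
	return context.WithValue(ctx, brokerKey{}, b)
}

// FromContext returns the broker stored in ctx; the bool is false when
// ctx carries no broker.
func FromContext(ctx context.Context) (Broker, bool) {
	b, ok := ctx.Value(brokerKey{}).(Broker)
	return b, ok
}

// namedBroker is a hypothetical Broker used only by roundTrip.
type namedBroker string

func (n namedBroker) Name() string { return string(n) }

// roundTrip stores a broker in a context and reads its name back.
func roundTrip(name string) (string, bool) {
	ctx := NewContext(context.Background(), namedBroker(name))
	b, ok := FromContext(ctx)
	if !ok {
		return "", false
	}
	return b.Name(), true
}

// hasBroker reports whether a plain background context carries a broker.
func hasBroker() bool {
	_, ok := FromContext(context.Background())
	return ok
}
```

Calling roundTrip with a name returns that name and true, while hasBroker returns false because nothing was stored. Checking the bool before using the broker avoids a nil dereference.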

func NewBroker

func NewBroker(opts ...Option) Broker

NewBroker returns a new in-memory broker.
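To illustrate the Connect, Subscribe, Publish, Disconnect lifecycle, here is a heavily reduced in-memory broker sketch. It is not the package's memory broker: memoryBroker, errNotConnected, roundTrip, and publishBeforeConnect are hypothetical, Handler is simplified to take the message directly instead of an Event, and rejecting calls before Connect is an assumption of the sketch.

```go
package main

import (
	"context"
	"errors"
	"sync"
)

// Message mirrors the documented struct; Header is simplified to a plain map.
type Message struct {
	Header map[string]string
	Body   []byte
}

// Handler is simplified here to receive the message directly.
type Handler func(topic string, m *Message) error

// errNotConnected is an assumption of this sketch, not a documented error.
var errNotConnected = errors.New("broker not connected")

// memoryBroker is a hypothetical, heavily reduced in-memory broker.
type memoryBroker struct {
	mu        sync.RWMutex
	connected bool
	handlers  map[string][]Handler
}

func newMemoryBroker() *memoryBroker {
	return &memoryBroker{handlers: make(map[string][]Handler)}
}

func (b *memoryBroker) Connect(ctx context.Context) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.connected = true
	return nil
}

func (b *memoryBroker) Disconnect(ctx context.Context) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.connected = false
	return nil
}

// Subscribe registers h for topic.
func (b *memoryBroker) Subscribe(ctx context.Context, topic string, h Handler) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	if !b.connected {
		return errNotConnected
	}
	b.handlers[topic] = append(b.handlers[topic], h)
	return nil
}

// Publish delivers m synchronously to every handler registered for topic.
func (b *memoryBroker) Publish(ctx context.Context, topic string, m *Message) error {
	b.mu.RLock()
	if !b.connected {
		b.mu.RUnlock()
		return errNotConnected
	}
	// Copy the handler slice so handlers run without holding the lock.
	hs := append([]Handler(nil), b.handlers[topic]...)
	b.mu.RUnlock()
	for _, h := range hs {
		if err := h(topic, m); err != nil {
			return err
		}
	}
	return nil
}

// roundTrip connects, subscribes, publishes body, and returns what the handler saw.
func roundTrip(body string) (string, error) {
	ctx := context.Background()
	b := newMemoryBroker()
	if err := b.Connect(ctx); err != nil {
		return "", err
	}
	defer b.Disconnect(ctx)
	var got string
	h := func(topic string, m *Message) error { got = string(m.Body); return nil }
	if err := b.Subscribe(ctx, "greetings", h); err != nil {
		return "", err
	}
	err := b.Publish(ctx, "greetings", &Message{Body: []byte(body)})
	return got, err
}

// publishBeforeConnect shows the error this sketch returns when Connect was skipped.
func publishBeforeConnect() error {
	return newMemoryBroker().Publish(context.Background(), "greetings", &Message{})
}
```

Delivery here is synchronous, so a handler error surfaces directly from Publish. A real broker may deliver asynchronously and report handler failures elsewhere.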

type Event

type Event interface {
	Topic() string
	Message() *Message
	Ack() error
	Error() error
}

Event is given to a subscription handler for processing

type Handler

type Handler func(Event) error

Handler is used to process messages received via a subscription to a topic.
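A handler can be exercised without a running broker by passing it a fake Event. The sketch below redeclares Message, Event, and Handler locally with the documented shapes (Header simplified to a plain map); testEvent and ackNonEmpty are hypothetical names used only for illustration.

```go
package main

import "errors"

// Message mirrors the documented struct; Header is simplified to a plain map.
type Message struct {
	Header map[string]string
	Body   []byte
}

// Event and Handler repeat the documented declarations.
type Event interface {
	Topic() string
	Message() *Message
	Ack() error
	Error() error
}

type Handler func(Event) error

// testEvent is a hypothetical Event that records whether Ack was called.
type testEvent struct {
	topic string
	msg   *Message
	acked bool
}

func (e *testEvent) Topic() string     { return e.topic }
func (e *testEvent) Message() *Message { return e.msg }
func (e *testEvent) Ack() error        { e.acked = true; return nil }
func (e *testEvent) Error() error      { return nil }

// ackNonEmpty rejects empty bodies and acknowledges everything else.
func ackNonEmpty(e Event) error {
	if len(e.Message().Body) == 0 {
		return errors.New("empty message body")
	}
	return e.Ack()
}
```

Assigning ackNonEmpty to a Handler variable and calling it with a testEvent shows that a non-empty message is acknowledged, while an empty one returns an error and stays unacknowledged.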

type Message

type Message struct {
	Header metadata.Metadata // contains message metadata
	Body   []byte            // contains message body
}

Message is used to transfer data

type Option

type Option func(*Options)

Option is a function that configures Options.

func Addrs

func Addrs(addrs ...string) Option

Addrs sets the host addresses to be used by the broker

func Codec

func Codec(c codec.Codec) Option

Codec sets the codec used for encoding and decoding messages where a broker does not support headers.

func Context

func Context(ctx context.Context) Option

Context sets the context option

func ErrorHandler

func ErrorHandler(h Handler) Option

ErrorHandler catches all broker errors that can't be handled in the normal way, for example Codec errors.
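One way to picture the error-handler path is a dispatcher that tries to decode an incoming payload and hands decode failures to a separate handler. This is a sketch under stated assumptions: encoding/json stands in for the configured codec, and rawEvent and dispatch are hypothetical helpers, not part of the package.

```go
package main

import "encoding/json"

// Message mirrors the documented struct; Header is simplified to a plain map.
type Message struct {
	Header map[string]string
	Body   []byte
}

// Event and Handler repeat the documented declarations.
type Event interface {
	Topic() string
	Message() *Message
	Ack() error
	Error() error
}

type Handler func(Event) error

// rawEvent is a hypothetical Event that carries an optional decode error.
type rawEvent struct {
	topic string
	msg   *Message
	err   error
}

func (e *rawEvent) Topic() string     { return e.topic }
func (e *rawEvent) Message() *Message { return e.msg }
func (e *rawEvent) Ack() error        { return nil }
func (e *rawEvent) Error() error      { return e.err }

// dispatch decodes raw as a JSON-encoded Message. Decode failures are
// routed to onError with the raw bytes as the body; if onError is nil the
// decode error is returned to the caller.
func dispatch(topic string, raw []byte, handle, onError Handler) error {
	msg := &Message{}
	if err := json.Unmarshal(raw, msg); err != nil {
		if onError == nil {
			return err
		}
		return onError(&rawEvent{topic: topic, msg: &Message{Body: raw}, err: err})
	}
	return handle(&rawEvent{topic: topic, msg: msg})
}
```

The regular handler only ever sees messages that decoded cleanly. The error handler receives the undecodable payload together with the cause via Error().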

func Logger

func Logger(l logger.Logger) Option

Logger sets the logger

func Meter added in v3.1.6

func Meter(m meter.Meter) Option

Meter sets the meter

func Name added in v3.2.0

func Name(n string) Option

Name sets the name

func Register added in v3.2.0

func Register(r register.Register) Option

Register sets the register option.

func SetOption

func SetOption(k, v interface{}) Option

SetOption returns a function that sets up a context with the given value.
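A plausible reading of SetOption is that it stores an arbitrary key/value pair in Options.Context, which the Options struct describes as holding external options. The sketch below assumes exactly that; the reduced Options type, prefetchKey, and prefetchFrom are hypothetical and defined locally.

```go
package main

import "context"

// Options is reduced to the Context field for this sketch.
type Options struct {
	Context context.Context
}

type Option func(*Options)

// SetOption stores v under k in the options context. Creating a background
// context when none is set is an assumption of this sketch.
func SetOption(k, v interface{}) Option {
	return func(o *Options) {
		if o.Context == nil {
			o.Context = context.Background()
		}
		o.Context = context.WithValue(o.Context, k, v)
	}
}

// prefetchKey is a hypothetical implementation-specific option key.
type prefetchKey struct{}

// prefetchFrom applies opts and reads the prefetch value back, the way a
// broker implementation might read its own private settings.
func prefetchFrom(opts ...Option) (int, bool) {
	var o Options
	for _, fn := range opts {
		fn(&o)
	}
	if o.Context == nil {
		return 0, false
	}
	n, ok := o.Context.Value(prefetchKey{}).(int)
	return n, ok
}
```

This pattern lets a specific broker accept settings that the shared Options struct has no field for. Keys should be comparable, preferably unexported struct types, because context.WithValue panics on non-comparable keys.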

func TLSConfig

func TLSConfig(t *tls.Config) Option

TLSConfig sets the TLS Config

func Tracer added in v3.1.6

func Tracer(t tracer.Tracer) Option

Tracer sets the tracer to be used for tracing.

type Options

type Options struct {
	// Tracer used for tracing
	Tracer tracer.Tracer
	// Register can be used for clustering
	Register register.Register
	// Codec holds the codec for marshal/unmarshal
	Codec codec.Codec
	// Logger used for logging
	Logger logger.Logger
	// Meter used for metrics
	Meter meter.Meter
	// Context holds external options
	Context context.Context
	// TLSConfig holds tls.Config options
	TLSConfig *tls.Config
	// ErrorHandler used when broker can't unmarshal incoming message
	ErrorHandler Handler
	// Name holds the broker name
	Name string
	// Addrs holds the broker addresses
	Addrs []string
}

Options holds the broker options.

func NewOptions

func NewOptions(opts ...Option) Options

NewOptions creates a new Options value and applies the given options to it.
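Option, the option constructors, and NewOptions follow the functional options pattern. A minimal sketch, with Options reduced to two fields and no defaults (any defaults the real NewOptions sets are not shown on this page and are not modeled here):

```go
package main

// Options is reduced to two of the documented fields.
type Options struct {
	Name  string
	Addrs []string
}

// Option mutates an Options value.
type Option func(*Options)

// Name sets the broker name.
func Name(n string) Option {
	return func(o *Options) { o.Name = n }
}

// Addrs sets the host addresses to be used by the broker.
func Addrs(addrs ...string) Option {
	return func(o *Options) { o.Addrs = addrs }
}

// NewOptions applies opts in order, so a later option overrides an earlier one.
func NewOptions(opts ...Option) Options {
	var options Options
	for _, o := range opts {
		o(&options)
	}
	return options
}
```

With these stand-ins, NewOptions(Name("orders"), Addrs("127.0.0.1:4222")) yields an Options value with both fields set. Passing Name twice keeps the last value.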

type PublishOption

type PublishOption func(*PublishOptions)

PublishOption is a function that configures PublishOptions.

func PublishBodyOnly added in v3.1.0

func PublishBodyOnly(b bool) PublishOption

PublishBodyOnly publishes only the body of the message.

func PublishContext

func PublishContext(ctx context.Context) PublishOption

PublishContext sets the context

func SetPublishOption

func SetPublishOption(k, v interface{}) PublishOption

SetPublishOption returns a function that sets up a context with the given value.

type PublishOptions

type PublishOptions struct {
	// Context holds external options
	Context context.Context
	// BodyOnly flag says the message contains raw body bytes
	BodyOnly bool
}

PublishOptions holds the options for a Publish call.

func NewPublishOptions

func NewPublishOptions(opts ...PublishOption) PublishOptions

NewPublishOptions creates a new PublishOptions value.

type SubscribeOption

type SubscribeOption func(*SubscribeOptions)

SubscribeOption is a function that configures SubscribeOptions.

func DisableAutoAck

func DisableAutoAck() SubscribeOption

DisableAutoAck disables auto ack

func Queue

func Queue(name string) SubscribeOption

Queue sets the subscriber's queue.

Deprecated.

func SetSubscribeOption

func SetSubscribeOption(k, v interface{}) SubscribeOption

SetSubscribeOption returns a function that sets up a context with the given value.

func SubscribeAutoAck

func SubscribeAutoAck(b bool) SubscribeOption

SubscribeAutoAck sets whether messages are acknowledged automatically after they have been handled.

func SubscribeBodyOnly added in v3.1.0

func SubscribeBodyOnly(b bool) SubscribeOption

SubscribeBodyOnly consumes only the body of the message.

func SubscribeContext

func SubscribeContext(ctx context.Context) SubscribeOption

SubscribeContext sets the context.

func SubscribeErrorHandler

func SubscribeErrorHandler(h Handler) SubscribeOption

SubscribeErrorHandler catches all broker errors that can't be handled in the normal way, for example Codec errors.

func SubscribeGroup

func SubscribeGroup(name string) SubscribeOption

SubscribeGroup sets the name of the queue to share messages on
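Sharing messages on a queue usually means that subscribers in the same group split the stream between them, while each distinct group still sees every message. The sketch below models that with round-robin delivery. The topic and subscriber types are hypothetical, and the actual distribution strategy depends on the broker implementation.

```go
package main

// subscriber records the message bodies delivered to it.
type subscriber struct {
	group    string
	received []string
}

// topic is a hypothetical dispatcher for a single topic.
type topic struct {
	order  []string                 // group names in first-subscription order
	groups map[string][]*subscriber // members of each group
	next   map[string]int           // round-robin cursor per group
}

func newTopic() *topic {
	return &topic{
		groups: make(map[string][]*subscriber),
		next:   make(map[string]int),
	}
}

// subscribe adds a new member to group and returns it.
func (t *topic) subscribe(group string) *subscriber {
	if _, ok := t.groups[group]; !ok {
		t.order = append(t.order, group)
	}
	s := &subscriber{group: group}
	t.groups[group] = append(t.groups[group], s)
	return s
}

// publish delivers body to exactly one member of every group.
func (t *topic) publish(body string) {
	for _, g := range t.order {
		members := t.groups[g]
		s := members[t.next[g]%len(members)]
		s.received = append(s.received, body)
		t.next[g]++
	}
}
```

With two subscribers in a "workers" group and one in an "audit" group, publishing two messages gives each worker one message and the audit subscriber both.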

type SubscribeOptions

type SubscribeOptions struct {
	// Context holds external options
	Context context.Context
	// ErrorHandler used when broker can't unmarshal incoming message
	ErrorHandler Handler
	// Group holds consumer group
	Group string
	// AutoAck flag specifies auto ack of incoming message when no error happens
	AutoAck bool
	// BodyOnly flag specifies that message contains only body bytes without header
	BodyOnly bool
}

SubscribeOptions holds the options for a Subscribe call.
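The AutoAck field comment says a message is acknowledged automatically when no error happens. A minimal sketch of that rule, assuming the broker checks the flag after the handler returns; ackCounter, deliver, succeed, and fail are hypothetical helpers, and SubscribeOptions is reduced to the one field.

```go
package main

import "errors"

// SubscribeOptions is reduced to the AutoAck field for this sketch.
type SubscribeOptions struct {
	AutoAck bool
}

type SubscribeOption func(*SubscribeOptions)

// SubscribeAutoAck sets the AutoAck flag.
func SubscribeAutoAck(b bool) SubscribeOption {
	return func(o *SubscribeOptions) { o.AutoAck = b }
}

// ackCounter is a hypothetical message that counts acknowledgements.
type ackCounter struct {
	acks int
}

func (a *ackCounter) Ack() error { a.acks++; return nil }

// deliver runs handle and acknowledges msg only when AutoAck is enabled and
// handle returned no error. With AutoAck disabled the handler is expected
// to call Ack itself.
func deliver(o SubscribeOptions, msg *ackCounter, handle func() error) error {
	if err := handle(); err != nil {
		return err
	}
	if o.AutoAck {
		return msg.Ack()
	}
	return nil
}

// succeed and fail are stand-in handlers.
func succeed() error { return nil }
func fail() error    { return errors.New("handler failed") }
```

Under this rule a failed handler leaves the message unacknowledged, which gives the broker a chance to redeliver it if the underlying transport supports that.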

func NewSubscribeOptions

func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions

NewSubscribeOptions creates new SubscribeOptions

type Subscriber

type Subscriber interface {
	Options() SubscribeOptions
	Topic() string
	Unsubscribe(context.Context) error
}

Subscriber is a convenience return type for the Subscribe method
