broker

package
v4.0.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 1, 2023 License: Apache-2.0 Imports: 22 Imported by: 1

Documentation

Overview

Package broker is an interface used for asynchronous messaging

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrNotConnected returns when broker used but not connected yet
	ErrNotConnected = errors.New("broker not connected")
	// ErrDisconnected returns when broker disconnected
	ErrDisconnected = errors.New("broker disconnected")
	// ErrInvalidMessage returns when message has nvalid format
	ErrInvalidMessage = errors.New("broker message has invalid format")
)

Functions

func BodyOnly added in v4.0.6

func BodyOnly(b bool) options.Option

BodyOnly transfer only body without

func ErrorHandler

func ErrorHandler(h interface{}) options.Option

ErrorHandler will catch all broker errors that cant be handled in normal way, for example Codec errors

func NewContext

func NewContext(ctx context.Context, s Broker) context.Context

NewContext savess broker in context

func PublishTopic added in v4.0.6

func PublishTopic(t string) options.Option

PublishTopic pass topic for messages

func SubscribeAutoAck

func SubscribeAutoAck(b bool) options.Option

SubscribeAutoAck contol auto acking of messages after they have been handled.

func SubscribeBatchSize

func SubscribeBatchSize(n int) options.Option

SubscribeBatchSize specifies max batch size

func SubscribeBatchWait

func SubscribeBatchWait(td time.Duration) options.Option

SubscribeBatchWait specifies max batch wait time

func SubscribeQueueGroup added in v4.0.2

func SubscribeQueueGroup(n string) options.Option

SubscribeQueueGroup sets the shared queue name distributed messages across subscribers

func ValidateSubscriber added in v4.0.2

func ValidateSubscriber(sub interface{}) error

ValidateSubscriber func signature

Types

type Broker

type Broker interface {
	// Name returns broker instance name
	Name() string
	// Init initilize broker
	Init(opts ...options.Option) error
	// Options returns broker options
	Options() Options
	// Address return configured address
	Address() string
	// Connect connects to broker
	Connect(ctx context.Context) error
	// Disconnect disconnect from broker
	Disconnect(ctx context.Context) error
	// Publish message, msg can be single broker.Message or []broker.Message
	Publish(ctx context.Context, msg interface{}, opts ...options.Option) error
	// Subscribe subscribes to topic message via handler
	Subscribe(ctx context.Context, topic string, handler interface{}, opts ...options.Option) (Subscriber, error)
	// String type of broker
	String() string
}

Broker is an interface used for asynchronous messaging.

var DefaultBroker Broker = NewBroker()

DefaultBroker default memory broker

func FromContext

func FromContext(ctx context.Context) (Broker, bool)

FromContext returns broker from passed context

type MemoryBroker added in v4.0.6

type MemoryBroker struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewBroker

func NewBroker(opts ...options.Option) *MemoryBroker

NewBroker return new memory broker

func (*MemoryBroker) Address added in v4.0.6

func (m *MemoryBroker) Address() string

func (*MemoryBroker) Connect added in v4.0.6

func (m *MemoryBroker) Connect(ctx context.Context) error

func (*MemoryBroker) Disconnect added in v4.0.6

func (m *MemoryBroker) Disconnect(ctx context.Context) error

func (*MemoryBroker) Init added in v4.0.6

func (m *MemoryBroker) Init(opts ...options.Option) error

func (*MemoryBroker) Name added in v4.0.6

func (m *MemoryBroker) Name() string

func (*MemoryBroker) Options added in v4.0.6

func (m *MemoryBroker) Options() Options

func (*MemoryBroker) Publish added in v4.0.6

func (m *MemoryBroker) Publish(ctx context.Context, message interface{}, opts ...options.Option) error

func (*MemoryBroker) String added in v4.0.6

func (m *MemoryBroker) String() string

func (*MemoryBroker) Subscribe added in v4.0.6

func (m *MemoryBroker) Subscribe(ctx context.Context, topic string, handler interface{}, opts ...options.Option) (Subscriber, error)

type Message

type Message interface {
	// Context for the message
	Context() context.Context
	// Topic
	Topic() string
	// Header returns message headers
	Header() metadata.Metadata
	// Body returns broker message may be []byte slice or some go struct
	Body() interface{}
	// Ack acknowledge message
	Ack() error
	// Error returns message error (like decoding errors or some other)
	// In this case Body contains raw []byte from broker
	Error() error
}

Message is given to a subscription handler for processing

type MessageHandler added in v4.0.6

type MessageHandler func(Message) error

MessageHandler func signature for single message processing

type MessagesHandler added in v4.0.6

type MessagesHandler func([]Message) error

MessagesHandler func signature for batch message processing

type Options

type Options struct {
	// Tracer used for tracing
	Tracer tracer.Tracer
	// Register can be used for clustering
	Register register.Register
	// Codecs holds the codec for marshal/unmarshal
	Codecs map[string]codec.Codec
	// Logger used for logging
	Logger logger.Logger
	// Meter used for metrics
	Meter meter.Meter
	// Context holds external options
	Context context.Context
	// TLSConfig holds tls.TLSConfig options
	TLSConfig *tls.Config
	// ErrorHandler used when broker have error while processing message
	ErrorHandler interface{}
	// Name holds the broker name
	Name string
	// Address holds the broker address
	Address []string
}

Options struct

func NewOptions

func NewOptions(opts ...options.Option) Options

NewOptions create new Options

type PublishOptions

type PublishOptions struct {
	// Context holds external options
	Context context.Context
	// BodyOnly flag says the message contains raw body bytes
	BodyOnly bool
	// Message metadata usually passed as message headers
	Metadata metadata.Metadata
	// Content-Type of message for marshal
	ContentType string
	// Topic destination
	Topic string
}

PublishOptions struct

func NewPublishOptions

func NewPublishOptions(opts ...options.Option) PublishOptions

NewPublishOptions creates PublishOptions struct

type SubscribeOptions

type SubscribeOptions struct {
	// Context holds external options
	Context context.Context
	// ErrorHandler used when broker have error while processing message
	ErrorHandler interface{}
	// QueueGroup holds consumer group
	QueueGroup string
	// AutoAck flag specifies auto ack of incoming message when no error happens
	AutoAck bool
	// BodyOnly flag specifies that message contains only body bytes without header
	BodyOnly bool
	// BatchSize flag specifies max batch size
	BatchSize int
	// BatchWait flag specifies max wait time for batch filling
	BatchWait time.Duration
}

SubscribeOptions struct

func NewSubscribeOptions

func NewSubscribeOptions(opts ...options.Option) SubscribeOptions

NewSubscribeOptions creates new SubscribeOptions

type Subscriber

type Subscriber interface {
	// Options returns subscriber options
	Options() SubscribeOptions
	// Topic returns topic for subscription
	Topic() string
	// Unsubscribe from topic
	Unsubscribe(ctx context.Context) error
}

Subscriber is a convenience return type for the Subscribe method

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL