broker

package
v4.4.8 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 21, 2025 License: AGPL-3.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func MustPublish

func MustPublish(ctx context.Context, topic string, message proto.Message, opts ...PublishOption)

MustPublish publishes a message, ignoring any error.

func Publish

func Publish(ctx context.Context, topic string, message proto.Message, opts ...PublishOption) error

Publish sends a message to the standard broker. For the moment, it forwards the message to client.Publish.

func PublishRaw

func PublishRaw(ctx context.Context, topic string, body []byte, header map[string]string, opts ...PublishOption) error

PublishRaw sends a message to the standard broker. For the moment, it forwards the message to client.Publish.

func Register

func Register(b Broker)

func SubscribeCancellable

func SubscribeCancellable(ctx context.Context, topic string, handler SubscriberHandler, opts ...SubscribeOption) error

Types

type Broker

func Default

func Default() Broker

func NewBroker

func NewBroker(s string, opts ...Option) Broker

NewBroker wraps a standard broker but prevents it from disconnecting while there is still a service running.

type Message

type Message interface {
	Unmarshal(target proto.Message) (context.Context, error)
	RawData() (map[string]string, []byte)
}

type MessageQueue added in v4.2.6

type MessageQueue interface {
	Consume(callback func(...Message)) error
	PushRaw(ctx context.Context, message Message) error
}

type Option

type Option func(*Options)

Option definition

func BeforeDisconnect

func BeforeDisconnect(f func() error) Option

BeforeDisconnect registers a function to be triggered before the broker disconnects.

func WithContext

func WithContext(ctx context.Context) Option

type Options

type Options struct {
	Context context.Context
	// contains filtered or unexported fields
}

Options to the broker

type PublishOption

type PublishOption func(options *PublishOptions)

func PublishContext

func PublishContext(ctx context.Context) PublishOption

type PublishOptions

type PublishOptions struct {
	Context context.Context
}

type SubscribeOpener

type SubscribeOpener func(string, ...SubscribeOption) (*pubsub.Subscription, error)

type SubscribeOption

type SubscribeOption func(*SubscribeOptions)

func HandleError

func HandleError(h func(error)) SubscribeOption

HandleError sets an ErrorHandler to catch all broker errors that cannot be handled in the normal way, for example Codec errors.

func Queue

func Queue(name string) SubscribeOption

Queue sets the name of the queue to share messages on

func SubscribeContext

func SubscribeContext(ctx context.Context) SubscribeOption

SubscribeContext sets the context.

func WithCounterName added in v4.2.6

func WithCounterName(n string) SubscribeOption

WithCounterName adds a custom id for metrics counter name

func WithLocalQueue added in v4.2.6

func WithLocalQueue(q MessageQueue) SubscribeOption

WithLocalQueue passes a FIFO queue to absorb input

type SubscribeOptions

type SubscribeOptions struct {
	// Handler executed when errors occur processing messages
	ErrorHandler func(error)

	// Subscribers with the same queue name
	// will create a shared subscription where each
	// receives a subset of messages.
	Queue string

	// Other options for implementations of the interface
	// can be stored in a context
	Context context.Context

	// Optional MessageQueue that can debounce/persist
	// received messages and re-process them later on
	MessageQueue MessageQueue

	// Optional name for metrics
	CounterName string
}

type SubscriberHandler

type SubscriberHandler func(Message) error

type TopicOpener

type TopicOpener func(context.Context, string) (*pubsub.Topic, error)

type UnSubscriber

type UnSubscriber func() error

func Subscribe

func Subscribe(ctx context.Context, topic string, handler SubscriberHandler, opts ...SubscribeOption) (UnSubscriber, error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL