broker

package
v5.0.0-...-4117378 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 21, 2025 License: AGPL-3.0 Imports: 26 Imported by: 0

Documentation

Index

Constants

View Source
const (
	OpenerFuncKey = "opener"
	OpenerIDKey   = "openerID"
)

Variables

View Source
var (
	ContextKey = brokerKey{}
)

Functions

func EncodeBrokerMessage

func EncodeBrokerMessage(message Message) []byte

EncodeBrokerMessage joins the metadata (md) and the raw bytes of a broker message into a single []byte

func EncodeProtoWithContext

func EncodeProtoWithContext(ctx context.Context, msg proto.Message) []byte

EncodeProtoWithContext combines the JSON-encoded context metadata and the marshalled proto.Message into a single []byte

func MakeWrappedOpener

func MakeWrappedOpener(opener URLOpener) func(w OpenWrapper) controller.Opener[AsyncQueuePool]

func MustPublish

func MustPublish(ctx context.Context, topic string, message proto.Message, opts ...PublishOption)

MustPublish publishes a message, ignoring any error

func Publish

func Publish(ctx context.Context, topic string, message proto.Message, opts ...PublishOption) error

Publish sends a message to the standard broker. For the moment, it forwards the message to client.Publish

func PublishRaw

func PublishRaw(ctx context.Context, topic string, body []byte, header map[string]string, opts ...PublishOption) error

PublishRaw sends a raw message (body and header) to the standard broker. For the moment, it forwards the message to client.Publish

func Register

func Register(b Broker)

func SubscribeCancellable

func SubscribeCancellable(ctx context.Context, topic string, handler SubscriberHandler, opts ...SubscribeOption) error

Types

type AsyncQueue

type AsyncQueue interface {
	MessageQueue
	Push(ctx context.Context, msg proto.Message) error
}

type AsyncQueuePool

type AsyncQueuePool interface {
	openurl.Resolver[AsyncQueue]
	Del(context.Context, ...map[string]interface{}) (bool, error)
}

func NewWrappedPool

func NewWrappedPool(rootURL string, wrapper func(w OpenWrapper) controller.Opener[AsyncQueuePool]) (AsyncQueuePool, error)

type Broker

func Default

func Default() Broker

func NewBroker

func NewBroker(s string, opts ...Option) Broker

NewBroker wraps a standard broker but prevents it from disconnecting while a service is still running

type Consumer

type Consumer func(...Message)

type Message

type Message interface {
	Unmarshal(ctx context.Context, target proto.Message) (context.Context, error)
	RawData() (map[string]string, []byte)
}

func DecodeToBrokerMessage

func DecodeToBrokerMessage(msg []byte) (Message, error)

DecodeToBrokerMessage tries to parse a combination of json-encoded metadata and a marshalled protobuf

type MessageQueue

type MessageQueue interface {
	Consume(callback func(context.Context, ...Message)) error
	PushRaw(ctx context.Context, message Message) error
	Close(ctx context.Context) error
}

type OpenWrapper

type OpenWrapper func(q AsyncQueue) (AsyncQueue, error)

type Option

type Option func(*Options)

Option definition

func BeforeDisconnect

func BeforeDisconnect(f func() error) Option

BeforeDisconnect registers a function to be triggered before the broker disconnects

func WithChainPublisherInterceptor

func WithChainPublisherInterceptor(interceptors ...PublisherInterceptor) Option

WithChainPublisherInterceptor synchronously modifies the pushed messages

func WithChainSubscriberInterceptor

func WithChainSubscriberInterceptor(interceptors ...SubscriberInterceptor) Option

WithChainSubscriberInterceptor synchronously modifies the received messages

func WithContext

func WithContext(ctx context.Context) Option

type Options

type Options struct {
	Context context.Context
	// contains filtered or unexported fields
}

Options to the broker

type PublishOption

type PublishOption func(options *PublishOptions)

func PublishContext

func PublishContext(ctx context.Context) PublishOption

func WithAsyncPublisherInterceptor

func WithAsyncPublisherInterceptor(queueURL string, fallbackURLs ...string) PublishOption

WithAsyncPublisherInterceptor registers a FIFO-like queue to intercept the pushed messages and send them asynchronously after pre-processing

type PublishOptions

type PublishOptions struct {
	Context context.Context

	MessageQueueURLs []string
}

PublishOptions holds various configurations for publishing

type Publisher

type Publisher func(ctx context.Context, msg *pubsub.Message) error

type PublisherInterceptor

type PublisherInterceptor func(ctx context.Context, msg *pubsub.Message, publisher Publisher) error

type SubscribeOpener

type SubscribeOpener func(context.Context, string, ...SubscribeOption) (*pubsub.Subscription, error)

type SubscribeOption

type SubscribeOption func(*SubscribeOptions)

func Queue

func Queue(name string) SubscribeOption

Queue sets the name of the queue to share messages on

func SubscribeContext

func SubscribeContext(ctx context.Context) SubscribeOption

SubscribeContext sets the context

func WithAsyncQueuePool

func WithAsyncQueuePool(queuePool AsyncQueuePool, data map[string]interface{}) SubscribeOption

WithAsyncQueuePool adapts the subscriber with a queue. If many queues are appended, they are tried in the same order (fallback)

func WithCallee

func WithCallee(file string, line int) SubscribeOption

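WithCallee sets the callee file and line (see the CalleeFile and CalleeLine fields of SubscribeOptions), which serve as hints when handling errors with the default error handler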

func WithCounterName

func WithCounterName(n string) SubscribeOption

WithCounterName sets a custom name for the metrics counter

func WithErrorHandler

func WithErrorHandler(h SubscriptionErrorHandler) SubscribeOption

WithErrorHandler sets an ErrorHandler to catch all broker errors that cannot be handled in the normal way, for example Codec errors

type SubscribeOptions

type SubscribeOptions struct {
	// Subscribers with the same queue name
	// will create a shared subscription where each
	// receives a subset of messages.
	Queue string

	// Other options for implementations of the interface
	// can be stored in a context
	Context context.Context

	// Optional MessageQueue that can debounce/persist
	// received messages and re-process them later on.
	// They should be dynamically opened based on the input context
	AsyncQueuePool []struct {
		AsyncQueuePool AsyncQueuePool
		Resolution     map[string]interface{}
	}

	// Optional name for metrics
	CounterName string

	// Hints used when handling errors with default error handler
	TopicName  string
	CalleeFile string
	CalleeLine int
	// contains filtered or unexported fields
}

SubscribeOptions holds configuration for subscribers

func (*SubscribeOptions) HandleError

func (o *SubscribeOptions) HandleError(ctx context.Context, err error) error

HandleError tries to log an error during a handler execution. If err is nil, it is ignored. If the options have an errorHandler, it is applied and HandleError then returns

func (*SubscribeOptions) HandleErrorToString

func (o *SubscribeOptions) HandleErrorToString(err error) string

type SubscriberHandler

type SubscriberHandler func(ctx context.Context, msg Message) error

type SubscriberInterceptor

type SubscriberInterceptor func(ctx context.Context, msg Message, handler SubscriberHandler) error

func ContextInjectorInterceptor

func ContextInjectorInterceptor() SubscriberInterceptor

func HeaderInjectorInterceptor

func HeaderInjectorInterceptor() SubscriberInterceptor

func TimeoutSubscriberInterceptor

func TimeoutSubscriberInterceptor() SubscriberInterceptor

type SubscriptionErrorHandler

type SubscriptionErrorHandler func(context.Context, *SubscribeOptions, error) error

type TopicOpener

type TopicOpener func(context.Context, string) (*pubsub.Topic, error)

type TypeWithContext

type TypeWithContext[T any] struct {
	Original T
	Ctx      context.Context
}

TypeWithContext composes a generic type and a context

type URLOpener

type URLOpener interface {
	OpenURL(ctx context.Context, u *url.URL) (AsyncQueue, error)
}

type UnSubscriber

type UnSubscriber func() error

func Subscribe

func Subscribe(root context.Context, topic string, handler SubscriberHandler, opts ...SubscribeOption) (UnSubscriber, error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL