Documentation
¶
Index ¶
- Constants
- Variables
- func EncodeBrokerMessage(message Message) []byte
- func EncodeProtoWithContext(ctx context.Context, msg proto.Message) []byte
- func MakeWrappedOpener(opener URLOpener) func(w OpenWrapper) controller.Opener[AsyncQueuePool]
- func MustPublish(ctx context.Context, topic string, message proto.Message, ...)
- func Publish(ctx context.Context, topic string, message proto.Message, ...) error
- func PublishRaw(ctx context.Context, topic string, body []byte, header map[string]string, ...) error
- func Register(b Broker)
- func SubscribeCancellable(ctx context.Context, topic string, handler SubscriberHandler, ...) error
- type AsyncQueue
- type AsyncQueuePool
- type Broker
- type Consumer
- type Message
- type MessageQueue
- type OpenWrapper
- type Option
- type Options
- type PublishOption
- type PublishOptions
- type Publisher
- type PublisherInterceptor
- type SubscribeOpener
- type SubscribeOption
- func Queue(name string) SubscribeOption
- func SubscribeContext(ctx context.Context) SubscribeOption
- func WithAsyncQueuePool(queuePool AsyncQueuePool, data map[string]interface{}) SubscribeOption
- func WithCallee(file string, line int) SubscribeOption
- func WithCounterName(n string) SubscribeOption
- func WithErrorHandler(h SubscriptionErrorHandler) SubscribeOption
- type SubscribeOptions
- type SubscriberHandler
- type SubscriberInterceptor
- type SubscriptionErrorHandler
- type TopicOpener
- type TypeWithContext
- type URLOpener
- type UnSubscriber
Constants ¶
const ( OpenerFuncKey = "opener" OpenerIDKey = "openerID" )
Variables ¶
var (
ContextKey = brokerKey{}
)
Functions ¶
func EncodeBrokerMessage ¶
EncodeBrokerMessage joins the metadata and the raw bytes of a broker message into a single []byte
func EncodeProtoWithContext ¶
EncodeProtoWithContext combines json-encoded context metadata and a marshalled proto.Message into a single []byte
func MakeWrappedOpener ¶
func MakeWrappedOpener(opener URLOpener) func(w OpenWrapper) controller.Opener[AsyncQueuePool]
func MustPublish ¶
MustPublish publishes a message ignoring the error
func Publish ¶
Publish sends a message to the standard broker. For the moment, it forwards the message to client.Publish
func PublishRaw ¶
func PublishRaw(ctx context.Context, topic string, body []byte, header map[string]string, opts ...PublishOption) error
PublishRaw sends a message to the standard broker. For the moment, it forwards the message to client.Publish
func SubscribeCancellable ¶
func SubscribeCancellable(ctx context.Context, topic string, handler SubscriberHandler, opts ...SubscribeOption) error
Types ¶
type AsyncQueue ¶
type AsyncQueuePool ¶
type AsyncQueuePool interface { openurl.Resolver[AsyncQueue] Del(context.Context, ...map[string]interface{}) (bool, error) }
func NewWrappedPool ¶
func NewWrappedPool(rootURL string, wrapper func(w OpenWrapper) controller.Opener[AsyncQueuePool]) (AsyncQueuePool, error)
type Broker ¶
type Broker interface { PublishRaw(context.Context, string, []byte, map[string]string, ...PublishOption) error Publish(context.Context, string, proto.Message, ...PublishOption) error Subscribe(context.Context, string, SubscriberHandler, ...SubscribeOption) (UnSubscriber, error) }
type Message ¶
type Message interface { Unmarshal(ctx context.Context, target proto.Message) (context.Context, error) RawData() (map[string]string, []byte) }
func DecodeToBrokerMessage ¶
DecodeToBrokerMessage tries to parse a combination of json-encoded metadata and a marshalled protobuf
type MessageQueue ¶
type OpenWrapper ¶
type OpenWrapper func(q AsyncQueue) (AsyncQueue, error)
type Option ¶
type Option func(*Options)
Option definition
func BeforeDisconnect ¶
BeforeDisconnect registers all functions to be triggered before the broker disconnects
func WithChainPublisherInterceptor ¶
func WithChainPublisherInterceptor(interceptors ...PublisherInterceptor) Option
WithChainPublisherInterceptor synchronously modifies the pushed messages
func WithChainSubscriberInterceptor ¶
func WithChainSubscriberInterceptor(interceptors ...SubscriberInterceptor) Option
WithChainSubscriberInterceptor synchronously modifies the received messages
func WithContext ¶
type PublishOption ¶
type PublishOption func(options *PublishOptions)
func PublishContext ¶
func PublishContext(ctx context.Context) PublishOption
func WithAsyncPublisherInterceptor ¶
func WithAsyncPublisherInterceptor(queueURL string, fallbackURLs ...string) PublishOption
WithAsyncPublisherInterceptor registers a FIFO-like queue to intercept the pushed messages and send them asynchronously after pre-processing
type PublishOptions ¶
PublishOptions holds various configurations for publishing
type PublisherInterceptor ¶
type SubscribeOpener ¶
type SubscribeOpener func(context.Context, string, ...SubscribeOption) (*pubsub.Subscription, error)
type SubscribeOption ¶
type SubscribeOption func(*SubscribeOptions)
func Queue ¶
func Queue(name string) SubscribeOption
Queue sets the name of the queue to share messages on
func SubscribeContext ¶
func SubscribeContext(ctx context.Context) SubscribeOption
SubscribeContext sets the context
func WithAsyncQueuePool ¶
func WithAsyncQueuePool(queuePool AsyncQueuePool, data map[string]interface{}) SubscribeOption
WithAsyncQueuePool adapts the subscriber with a queue. If many queues are appended, they are tried in the same order (fallback)
func WithCallee ¶
func WithCallee(file string, line int) SubscribeOption
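WithCallee records the callee's file and line (presumably the CalleeFile and CalleeLine hints of SubscribeOptions, used when handling errors with the default error handler)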
func WithCounterName ¶
func WithCounterName(n string) SubscribeOption
WithCounterName sets a custom name for the metrics counter
func WithErrorHandler ¶
func WithErrorHandler(h SubscriptionErrorHandler) SubscribeOption
WithErrorHandler sets an ErrorHandler to catch all broker errors that can't be handled in the normal way, for example Codec errors
type SubscribeOptions ¶
type SubscribeOptions struct { // Subscribers with the same queue name // will create a shared subscription where each // receives a subset of messages. Queue string // Other options for implementations of the interface // can be stored in a context Context context.Context // Optional MessageQueue that can debounce/persist // received messages and re-process them later on. // They should be dynamically opened based on the input context AsyncQueuePool []struct { AsyncQueuePool AsyncQueuePool Resolution map[string]interface{} } // Optional name for metrics CounterName string // Hints used when handling errors with default error handler TopicName string CalleeFile string CalleeLine int // contains filtered or unexported fields }
SubscribeOptions holds configuration for subscribers
func (*SubscribeOptions) HandleError ¶
func (o *SubscribeOptions) HandleError(ctx context.Context, err error) error
HandleError tries to log an error during a handler execution. If err is nil, it is ignored. If the options have an errorHandler, it is applied and HandleError then returns
func (*SubscribeOptions) HandleErrorToString ¶
func (o *SubscribeOptions) HandleErrorToString(err error) string
type SubscriberInterceptor ¶
type SubscriberInterceptor func(ctx context.Context, msg Message, handler SubscriberHandler) error
func ContextInjectorInterceptor ¶
func ContextInjectorInterceptor() SubscriberInterceptor
func HeaderInjectorInterceptor ¶
func HeaderInjectorInterceptor() SubscriberInterceptor
func TimeoutSubscriberInterceptor ¶
func TimeoutSubscriberInterceptor() SubscriberInterceptor
type SubscriptionErrorHandler ¶
type SubscriptionErrorHandler func(context.Context, *SubscribeOptions, error) error
type TypeWithContext ¶
TypeWithContext composes a generic type and a context
type UnSubscriber ¶
type UnSubscriber func() error
func Subscribe ¶
func Subscribe(root context.Context, topic string, handler SubscriberHandler, opts ...SubscribeOption) (UnSubscriber, error)