Versions in this module Expand all Collapse all v1 v1.0.1 May 19, 2024 Changes in this version + var DefaultCodec encoding.Codec = nil + func Marshal(codec encoding.Codec, msg Any) ([]byte, error) + func Unmarshal(codec encoding.Codec, inputData []byte, outValue interface{}) error + type Any interface + type Binder func() Any + type Broker interface + Address func() string + Connect func() error + Disconnect func() error + Init func(...Option) error + Name func() string + Options func() Options + Publish func(ctx context.Context, topic string, msg Any, opts ...PublishOption) error + Subscribe func(topic string, handler Handler, binder Binder, opts ...SubscribeOption) (Subscriber, error) + type Event interface + Ack func() error + Error func() error + Message func() *Message + RawMessage func() interface{} + Topic func() string + type Handler func(ctx context.Context, evt Event) error + type Headers map[string]string + type Message struct + Body Any + Headers Headers + func (m Message) GetHeader(key string) string + func (m Message) GetHeaders() Headers + type Option func(*Options) + func OptionContextWithValue(k, v interface{}) Option + func WithAddress(addressList ...string) Option + func WithCodec(name string) Option + func WithEnableSecure(enable bool) Option + func WithErrorHandler(handler Handler) Option + func WithGlobalPropagator() Option + func WithGlobalTracerProvider() Option + func WithOptionContext(ctx context.Context) Option + func WithPropagator(propagators propagation.TextMapPropagator) Option + func WithTLSConfig(config *tls.Config) Option + func WithTracerProvider(provider trace.TracerProvider, tracerName string) Option + type Options struct + Addrs []string + Codec encoding.Codec + Context context.Context + ErrorHandler Handler + Secure bool + TLSConfig *tls.Config + Tracings []tracing.Option + func NewOptions() Options + func NewOptionsAndApply(opts ...Option) Options + func (o *Options) Apply(opts ...Option) + type PublishOption func(*PublishOptions) + func PublishContextWithValue(k, v interface{}) PublishOption + func WithPublishContext(ctx context.Context) PublishOption + type PublishOptions struct + Context context.Context + func NewPublishOptions(opts ...PublishOption) PublishOptions + func (o *PublishOptions) Apply(opts ...PublishOption) + type SubscribeOption func(*SubscribeOptions) + func DisableAutoAck() SubscribeOption + func SubscribeContextWithValue(k, v interface{}) SubscribeOption + func WithQueueName(name string) SubscribeOption + func WithSubscribeContext(ctx context.Context) SubscribeOption + type SubscribeOptions struct + AutoAck bool + Context context.Context + Queue string + func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions + func (o *SubscribeOptions) Apply(opts ...SubscribeOption) + type Subscriber interface + Options func() SubscribeOptions + Topic func() string + Unsubscribe func(removeFromManager bool) error + func Subscribe[T any](broker Broker, topic string, ...) (Subscriber, error) + type SubscriberMap map[string]Subscriber + type SubscriberSyncMap struct + func NewSubscriberSyncMap() *SubscriberSyncMap + func (sm *SubscriberSyncMap) Add(topic string, sub Subscriber) + func (sm *SubscriberSyncMap) Clear() + func (sm *SubscriberSyncMap) ForceClear() + func (sm *SubscriberSyncMap) Foreach(fnc func(topic string, sub Subscriber)) + func (sm *SubscriberSyncMap) Get(topic string) Subscriber + func (sm *SubscriberSyncMap) Remove(topic string) error + func (sm *SubscriberSyncMap) RemoveOnly(topic string) bool