Documentation ¶
Index ¶
- Variables
- func Marshal(codec encoding.Codec, msg Any) ([]byte, error)
- func Unmarshal(codec encoding.Codec, inputData []byte, outValue interface{}) error
- type Any
- type Binder
- type Broker
- type Event
- type Handler
- type Headers
- type Message
- type Option
- func OptionContextWithValue(k, v interface{}) Option
- func WithAddress(addressList ...string) Option
- func WithCodec(name string) Option
- func WithEnableSecure(enable bool) Option
- func WithErrorHandler(handler Handler) Option
- func WithGlobalPropagator() Option
- func WithGlobalTracerProvider() Option
- func WithOptionContext(ctx context.Context) Option
- func WithPropagator(propagators propagation.TextMapPropagator) Option
- func WithTLSConfig(config *tls.Config) Option
- func WithTracerProvider(provider trace.TracerProvider, tracerName string) Option
- type Options
- type PublishOption
- type PublishOptions
- type RequestOption
- type RequestOptions
- type SubscribeOption
- type SubscribeOptions
- type Subscriber
- type SubscriberMap
- type SubscriberSyncMap
- func (sm *SubscriberSyncMap) Add(topic string, sub Subscriber)
- func (sm *SubscriberSyncMap) Clear()
- func (sm *SubscriberSyncMap) ForceClear()
- func (sm *SubscriberSyncMap) Foreach(fnc func(topic string, sub Subscriber))
- func (sm *SubscriberSyncMap) Get(topic string) Subscriber
- func (sm *SubscriberSyncMap) Remove(topic string) error
- func (sm *SubscriberSyncMap) RemoveOnly(topic string) bool
Constants ¶
This section is empty.
Variables ¶
View Source
var (
DefaultCodec encoding.Codec = nil
)
Functions ¶
Types ¶
type Broker ¶
type Broker interface { Name() string Options() Options Address() string Init(...Option) error Connect() error Disconnect() error Publish(ctx context.Context, topic string, msg Any, opts ...PublishOption) error Subscribe(topic string, handler Handler, binder Binder, opts ...SubscribeOption) (Subscriber, error) Request(ctx context.Context, topic string, msg Any, opts ...RequestOption) (Any, error) }
type Message ¶
func (Message) GetHeaders ¶
type Option ¶
type Option func(*Options)
func OptionContextWithValue ¶
func OptionContextWithValue(k, v interface{}) Option
func WithEnableSecure ¶
func WithErrorHandler ¶
func WithGlobalPropagator ¶
func WithGlobalPropagator() Option
func WithGlobalTracerProvider ¶
func WithGlobalTracerProvider() Option
func WithOptionContext ¶
func WithPropagator ¶
func WithPropagator(propagators propagation.TextMapPropagator) Option
func WithTLSConfig ¶
func WithTracerProvider ¶
func WithTracerProvider(provider trace.TracerProvider, tracerName string) Option
type Options ¶
type Options struct { Addrs []string Codec encoding.Codec ErrorHandler Handler Secure bool TLSConfig *tls.Config Context context.Context Tracings []tracing.Option }
func NewOptions ¶
func NewOptions() Options
func NewOptionsAndApply ¶
type PublishOption ¶
type PublishOption func(*PublishOptions)
func PublishContextWithValue ¶
func PublishContextWithValue(k, v interface{}) PublishOption
func WithPublishContext ¶
func WithPublishContext(ctx context.Context) PublishOption
type PublishOptions ¶
func NewPublishOptions ¶
func NewPublishOptions(opts ...PublishOption) PublishOptions
func (*PublishOptions) Apply ¶
func (o *PublishOptions) Apply(opts ...PublishOption)
type RequestOption ¶
type RequestOption func(*RequestOptions)
func RequestContextWithValue ¶
func RequestContextWithValue(k, v interface{}) RequestOption
func WithRequestContext ¶
func WithRequestContext(ctx context.Context) RequestOption
type RequestOptions ¶
func NewRequestOptions ¶
func NewRequestOptions(opts ...RequestOption) RequestOptions
func (*RequestOptions) Apply ¶
func (o *RequestOptions) Apply(opts ...RequestOption)
type SubscribeOption ¶
type SubscribeOption func(*SubscribeOptions)
func DisableAutoAck ¶
func DisableAutoAck() SubscribeOption
func SubscribeContextWithValue ¶
func SubscribeContextWithValue(k, v interface{}) SubscribeOption
func WithQueueName ¶
func WithQueueName(name string) SubscribeOption
func WithSubscribeContext ¶
func WithSubscribeContext(ctx context.Context) SubscribeOption
type SubscribeOptions ¶
func NewSubscribeOptions ¶
func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions
func (*SubscribeOptions) Apply ¶
func (o *SubscribeOptions) Apply(opts ...SubscribeOption)
type Subscriber ¶
type Subscriber interface { // Options . Options() SubscribeOptions // Topic . Topic() string // Unsubscribe . Unsubscribe(removeFromManager bool) error }
Subscriber .
type SubscriberMap ¶
type SubscriberMap map[string]Subscriber
type SubscriberSyncMap ¶
func NewSubscriberSyncMap ¶
func NewSubscriberSyncMap() *SubscriberSyncMap
func (*SubscriberSyncMap) Add ¶
func (sm *SubscriberSyncMap) Add(topic string, sub Subscriber)
func (*SubscriberSyncMap) Clear ¶
func (sm *SubscriberSyncMap) Clear()
func (*SubscriberSyncMap) ForceClear ¶
func (sm *SubscriberSyncMap) ForceClear()
func (*SubscriberSyncMap) Foreach ¶
func (sm *SubscriberSyncMap) Foreach(fnc func(topic string, sub Subscriber))
func (*SubscriberSyncMap) Get ¶
func (sm *SubscriberSyncMap) Get(topic string) Subscriber
func (*SubscriberSyncMap) Remove ¶
func (sm *SubscriberSyncMap) Remove(topic string) error
func (*SubscriberSyncMap) RemoveOnly ¶
func (sm *SubscriberSyncMap) RemoveOnly(topic string) bool
Click to show internal directories.
Click to hide internal directories.