Documentation ¶
Index ¶
- Variables
- func DefaultDecodeFunc[T ITopic](ctx context.Context, bs []byte) (T, error)
- func DefaultEncodeFunc[T ITopic](ctx context.Context, dst T) ([]byte, error)
- func DefaultSubscriberCheckFunc[T ITopic](ctx context.Context, dst T) bool
- func DefaultSubscriberHandleFunc[T ITopic](ctx context.Context, dst T) error
- func Extract(ctx context.Context, p propagation.TextMapPropagator, metadata metadata.MD) (baggage.Baggage, sdktrace.SpanContext)
- func Inject(ctx context.Context, p propagation.TextMapPropagator, metadata metadata.MD)
- type DecodeFunc
- type EncodeFunc
- type Event
- type IMsgBus
- type IPublisher
- type ISharedStorage
- type ISubscriber
- type ITopic
- type ITopicManager
- type MetadataSupplier
- type OTELOption
- type OTELOptions
- type Publisher
- type PublisherOption
- type PublisherOptions
- type Subscriber
- type SubscriberCheckFunc
- type SubscriberHandleFunc
- type SubscriberOption
- type SubscriberOptions
- type TopicManager
Constants ¶
This section is empty.
Variables ¶
Functions ¶
func DefaultDecodeFunc ¶
func DefaultEncodeFunc ¶
func Extract ¶
func Extract(ctx context.Context, p propagation.TextMapPropagator, metadata metadata.MD) ( baggage.Baggage, sdktrace.SpanContext, )
Extract extracts the metadata from ctx.
func Inject ¶
func Inject(ctx context.Context, p propagation.TextMapPropagator, metadata metadata.MD)
Inject injects cross-cutting concerns from the ctx into the metadata.
Types ¶
type IMsgBus ¶
type IMsgBus interface { // Push 推入数据 Push(ctx context.Context, topic string, bs []byte) error // Pop 以阻塞的方式获取数据 // blockTimeout 为 0 则永久阻塞 直到 context 退出 或 数据到达 Pop(ctx context.Context, topic, channel string, blockTimeout time.Duration) ([]byte, error) // AddChannel 为 topic 添加 channel AddChannel(ctx context.Context, topic string, channel string) error // RemoveChannel 删除 Channel, channel 下的数据也应该被删除 RemoveChannel(ctx context.Context, topic string, channel string) error // ListChannel 列出 Topic 下所有 Channel ListChannel(ctx context.Context, topic string) ([]string, error) }
type IPublisher ¶
func NewPublisher ¶
func NewPublisher[T ITopic](client IMsgBus, topicManager ITopicManager, otelOptions *OTELOptions, opts ...PublisherOption[T]) IPublisher[T]
type ISharedStorage ¶
type ISubscriber ¶
type ISubscriber[T ITopic] interface { Handle(ctx context.Context) error // Close 取消订阅,这个操作必须成功 Close(ctx context.Context) }
func NewSubscriber ¶
func NewSubscriber[T ITopic](topic, channel string, client IMsgBus, otelOptions *OTELOptions, topicManager ITopicManager, opts ...SubscriberOption[T]) ISubscriber[T]
type ITopicManager ¶
type ITopicManager interface { // Register 注册 Register(ctx context.Context, topic string, channel string, uuid string, ttl time.Duration) error // Unregister 取消注册 Unregister(ctx context.Context, topic string, channel string, uuid string) }
ITopicManager 用于监听 topic channel 当 channel
func NewTopicManager ¶
func NewTopicManager(ctx context.Context, client IMsgBus, cas try_lock.CASCommand, storage ISharedStorage) ITopicManager
type MetadataSupplier ¶
type MetadataSupplier struct {
// contains filtered or unexported fields
}
func NewMetadataSupplier ¶
func NewMetadataSupplier(metadata metadata.MD) *MetadataSupplier
func (*MetadataSupplier) Get ¶
func (s *MetadataSupplier) Get(key string) string
func (*MetadataSupplier) Keys ¶
func (s *MetadataSupplier) Keys() []string
func (*MetadataSupplier) Set ¶
func (s *MetadataSupplier) Set(key, value string)
type OTELOption ¶
type OTELOption func(o *OTELOptions)
func WithOTELOptionPropagator ¶
func WithOTELOptionPropagator(propagator propagation.TextMapPropagator) OTELOption
type OTELOptions ¶
type OTELOptions struct {
// contains filtered or unexported fields
}
func NewOTELOptions ¶
func NewOTELOptions(opts ...OTELOption) *OTELOptions
func (*OTELOptions) ConsumerStartSpan ¶
type PublisherOption ¶
type PublisherOption[T ITopic] func(o *PublisherOptions[T])
func WithEncodeFunc ¶
func WithEncodeFunc[T ITopic](f EncodeFunc[T]) PublisherOption[T]
type PublisherOptions ¶
type PublisherOptions[T ITopic] struct { Encode EncodeFunc[T] }
type Subscriber ¶
type Subscriber[T ITopic] struct { // contains filtered or unexported fields }
func (*Subscriber[T]) Close ¶
func (x *Subscriber[T]) Close(ctx context.Context)
type SubscriberHandleFunc ¶
type SubscriberOption ¶
type SubscriberOption[T ITopic] func(o *SubscriberOptions[T])
func WithCheckFunc ¶
func WithCheckFunc[T ITopic](f SubscriberCheckFunc[T]) SubscriberOption[T]
func WithDecodeFunc ¶
func WithDecodeFunc[T ITopic](f DecodeFunc[T]) SubscriberOption[T]
func WithHandleFunc ¶
func WithHandleFunc[T ITopic](f SubscriberHandleFunc[T]) SubscriberOption[T]
type SubscriberOptions ¶
type SubscriberOptions[T ITopic] struct { HandleEvent SubscriberHandleFunc[T] CheckEvent SubscriberCheckFunc[T] Decode DecodeFunc[T] }
type TopicManager ¶
type TopicManager struct {
// contains filtered or unexported fields
}
func (*TopicManager) Unregister ¶
Source Files ¶
Click to show internal directories.
Click to hide internal directories.