xmsgbus

package
v1.2.2 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 29, 2024 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Package-level error values exported by xmsgbus.
// NOTE(review): every message is a constant string, so errors.New would be
// the more idiomatic constructor than fmt.Errorf here.
var (
	// ErrNoData is the error value with the message "no data".
	ErrNoData      = fmt.Errorf("no data")
	// ErrCheckFailed is the error value with the message "check failed".
	ErrCheckFailed = fmt.Errorf("check failed")
	// ErrPopTimeout is the error value with the message "pop timeout".
	ErrPopTimeout  = fmt.Errorf("pop timeout")
)

Functions

func DefaultDecodeFunc

func DefaultDecodeFunc[T ITopic](ctx context.Context, bs []byte) (T, error)

func DefaultEncodeFunc

func DefaultEncodeFunc[T ITopic](ctx context.Context, dst T) ([]byte, error)

func DefaultSubscriberCheckFunc

func DefaultSubscriberCheckFunc[T ITopic](ctx context.Context, dst T) bool

func DefaultSubscriberHandleFunc

func DefaultSubscriberHandleFunc[T ITopic](ctx context.Context, dst T) error

func Extract

Extract extracts the metadata from ctx.

func Inject

func Inject(ctx context.Context, p propagation.TextMapPropagator, metadata metadata.MD)

Inject injects cross-cutting concerns from the ctx into the metadata.

Types

type DecodeFunc

// DecodeFunc is a function that takes a context and raw bytes bs and returns
// a value of type T or an error.
type DecodeFunc[T ITopic] func(ctx context.Context, bs []byte) (T, error)

type EncodeFunc

// EncodeFunc is a function that takes a context and a value dst of type T and
// returns its byte representation or an error.
type EncodeFunc[T ITopic] func(ctx context.Context, dst T) ([]byte, error)

type Event

// Event groups a topic name, a raw payload and its accompanying metadata.
// NOTE(review): presumably the envelope that travels over the bus — confirm
// against the publisher/subscriber implementation.
type Event struct {
	// Metadata is the metadata.MD attached to the event.
	Metadata metadata.MD
	// Topic is the topic as a string.
	Topic    string
	// Payload is the raw bytes of the event.
	Payload  []byte
}

type IMsgBus

// IMsgBus is the message bus abstraction: it pushes raw bytes to a topic,
// pops them from a topic/channel pair, and manages a topic's channels.
type IMsgBus interface {
	// Push pushes data to the topic.
	Push(ctx context.Context, topic string, bs []byte) error
	// Pop retrieves data in a blocking manner.
	// If blockTimeout is 0 it blocks indefinitely, until the context is done or data arrives.
	// NOTE(review): ackFn presumably acknowledges the returned data — confirm against the implementations.
	Pop(ctx context.Context, topic, channel string, blockTimeout time.Duration) (data []byte, ackFn func(), err error)
	// AddChannel adds a channel to the topic.
	AddChannel(ctx context.Context, topic string, channel string) error
	// RemoveChannel removes a channel; the data under that channel should be deleted as well.
	RemoveChannel(ctx context.Context, topic string, channel string) error
	// ListChannel lists all channels under the topic.
	ListChannel(ctx context.Context, topic string) ([]string, error)
}

type IPublisher

// IPublisher publishes events of type T.
type IPublisher[T ITopic] interface {
	// Publish publishes the given event and returns an error on failure.
	Publish(ctx context.Context, event T) error
}

func NewPublisher

func NewPublisher[T ITopic](client IMsgBus, topicManager ITopicManager, otelOptions *OTELOptions, opts ...PublisherOption[T]) IPublisher[T]

type ISharedStorage

// ISharedStorage is a key/value storage abstraction with expiring writes,
// prefix listing and deletion.
type ISharedStorage interface {
	// SetEx sets a value and also sets its expiration time.
	SetEx(ctx context.Context, key string, value interface{}, ttl time.Duration) error
	// Keys lists all keys that match the given prefix.
	Keys(ctx context.Context, prefix string) ([]string, error)
	// Del deletes the key.
	Del(ctx context.Context, key string) error
}

type ISubscriber

// ISubscriber is the subscriber side of the bus for events of type T.
type ISubscriber[T ITopic] interface {
	// Handle takes a context and returns an error.
	// NOTE(review): presumably processes incoming events for the subscription — confirm against Subscriber.Handle.
	Handle(ctx context.Context) error
	// Close cancels the subscription; this operation must succeed (it returns no error).
	Close(ctx context.Context)
}

func NewSubscriber

func NewSubscriber[T ITopic](topic, channel string, client IMsgBus, otelOptions *OTELOptions, topicManager ITopicManager, opts ...SubscriberOption[T]) ISubscriber[T]

type ITopic

// ITopic is the constraint used by the package's generic types: any type
// that can report its topic.
type ITopic interface {
	// Topic returns the topic as a string.
	Topic() string
}

type ITopicManager

// ITopicManager registers and unregisters a uuid under a topic/channel pair.
type ITopicManager interface {
	// Register performs the registration.
	// NOTE(review): ttl is presumably the lifetime of the registration — confirm against TopicManager.Register.
	Register(ctx context.Context, topic string, channel string, uuid string, ttl time.Duration) error
	// Unregister cancels the registration; it returns no error.
	Unregister(ctx context.Context, topic string, channel string, uuid string)
}

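ITopicManager is used to watch the channels of a topic. (The upstream doc comment is cut off at this point, after the words "when a channel".)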

func NewTopicManager

func NewTopicManager(ctx context.Context, client IMsgBus, cas try_lock.CASCommand, storage ISharedStorage) ITopicManager

type MetadataSupplier

type MetadataSupplier struct {
	// contains filtered or unexported fields
}

func NewMetadataSupplier

func NewMetadataSupplier(metadata metadata.MD) *MetadataSupplier

func (*MetadataSupplier) Get

func (s *MetadataSupplier) Get(key string) string

func (*MetadataSupplier) Keys

func (s *MetadataSupplier) Keys() []string

func (*MetadataSupplier) Set

func (s *MetadataSupplier) Set(key, value string)

type OTELOption

// OTELOption is a functional option that mutates an *OTELOptions.
type OTELOption func(o *OTELOptions)

func WithOTELOptionPropagator

func WithOTELOptionPropagator(propagator propagation.TextMapPropagator) OTELOption

type OTELOptions

type OTELOptions struct {
	// contains filtered or unexported fields
}

func NewOTELOptions

func NewOTELOptions(opts ...OTELOption) *OTELOptions

func (*OTELOptions) ConsumerStartSpan

func (x *OTELOptions) ConsumerStartSpan(ctx context.Context, topic string, attributes ...attribute.KeyValue) (context.Context, trace.Span)

func (*OTELOptions) ProducerStartSpan

func (x *OTELOptions) ProducerStartSpan(ctx context.Context, topic string, attributes ...attribute.KeyValue) (context.Context, trace.Span)

type Publisher

type Publisher[T ITopic] struct {
	// contains filtered or unexported fields
}

func (*Publisher[T]) Publish

func (x *Publisher[T]) Publish(ctx context.Context, event T) error

type PublisherOption

// PublisherOption is a functional option that mutates a *PublisherOptions[T].
type PublisherOption[T ITopic] func(o *PublisherOptions[T])

func WithEncodeFunc

func WithEncodeFunc[T ITopic](f EncodeFunc[T]) PublisherOption[T]

type PublisherOptions

// PublisherOptions holds the configurable parts of a publisher.
type PublisherOptions[T ITopic] struct {
	// Encode is the EncodeFunc[T] of the publisher.
	// NOTE(review): presumably set via WithEncodeFunc — confirm.
	Encode EncodeFunc[T]
}

type Subscriber

type Subscriber[T ITopic] struct {
	// contains filtered or unexported fields
}

func (*Subscriber[T]) Close

func (x *Subscriber[T]) Close(ctx context.Context)

func (*Subscriber[T]) Handle

func (x *Subscriber[T]) Handle(ctx context.Context) (err error)

type SubscriberCheckFunc

// SubscriberCheckFunc is a function that takes a context and a value dst of
// type T and reports a boolean result.
type SubscriberCheckFunc[T ITopic] func(ctx context.Context, dst T) bool

type SubscriberHandleFunc

// SubscriberHandleFunc is a function that takes a context and a value dst of
// type T and returns an error.
type SubscriberHandleFunc[T ITopic] func(ctx context.Context, dst T) error

type SubscriberOption

// SubscriberOption is a functional option that mutates a *SubscriberOptions[T].
type SubscriberOption[T ITopic] func(o *SubscriberOptions[T])

func WithCheckFunc

func WithCheckFunc[T ITopic](f SubscriberCheckFunc[T]) SubscriberOption[T]

func WithDecodeFunc

func WithDecodeFunc[T ITopic](f DecodeFunc[T]) SubscriberOption[T]

func WithHandleFunc

func WithHandleFunc[T ITopic](f SubscriberHandleFunc[T]) SubscriberOption[T]

type SubscriberOptions

// SubscriberOptions holds the configurable callbacks of a subscriber.
// NOTE(review): the fields are presumably set via WithHandleFunc,
// WithCheckFunc and WithDecodeFunc respectively — confirm.
type SubscriberOptions[T ITopic] struct {
	// HandleEvent is the SubscriberHandleFunc[T] callback.
	HandleEvent SubscriberHandleFunc[T]
	// CheckEvent is the SubscriberCheckFunc[T] callback.
	CheckEvent  SubscriberCheckFunc[T]
	// Decode is the DecodeFunc[T] callback.
	Decode      DecodeFunc[T]
}

type TopicManager

type TopicManager struct {
	// contains filtered or unexported fields
}

func (*TopicManager) Register

func (x *TopicManager) Register(ctx context.Context, topic string, channel string, uuid string, ttl time.Duration) error

func (*TopicManager) Unregister

func (x *TopicManager) Unregister(ctx context.Context, topic string, channel string, uuid string)

Directories

Path Synopsis
impl

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL