pubsub

package module
v0.0.0-...-8887b7f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 16, 2025 License: Apache-2.0 Imports: 12 Imported by: 0

README

PubSub

本项目的灵感来源于: lileio/pubsub, 吸取了其动态方法以及中间件的思路并提供配置的支持以应用不同的消息队列所需的配置.

支持的消息队列:

  • RocketMq生态: AliYun RocketMQ V4; Apache RocketMQ V4

TODO:

  • 事务消息处理.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrMissHandler = errors.New("handler cannot be nil")
)

Functions

func Logger

func Logger() log.ComponentLogger

Logger returns the logger for the package.

func RegisterProvider

func RegisterProvider(name string, builder providerBuildFunc)

RegisterProvider register a provider builder.

func SetLogger

func SetLogger(l log.ComponentLogger)

SetLogger sets the logger for the package.

Types

type Client

type Client struct {
	ServiceName string
	Provider    Provider
	Middleware  []Middleware
}

func New

func New(cfg *conf.Configuration) (*Client, error)

New creates a new pubsub client.

func (*Client) On

func (c *Client) On(opts HandlerOptions) error

On registers a handler function for processing messages. This function uses reflection to verify the correctness of the handler's signature and implements the necessary conversions and validations.

func (*Client) OnRaw

func (c *Client) OnRaw(opts HandlerOptions) error

OnRaw register a HandlerOptions with MessageHandler function for processing messages.

func (*Client) Publish

func (c *Client) Publish(ctx context.Context, opts PublishOptions, data any) (err error)

func (*Client) Stop

func (c *Client) Stop(ctx context.Context) error

type Handler

type Handler any

Handler is custom callback for subscriber. The handler function format is. func(ctx context.Context, custom CustomModel,m *Message) error

type HandlerOptions

type HandlerOptions struct {
	Name string
	// ServiceName identify the service of Provider, will use this to find the service or Config
	ServiceName string
	// The function to invoke
	Handler Handler
	// Decode JSON objects from message body instead of protobuf.
	JSON bool
}

HandlerOptions is the options for handler

type LoggerMiddleware

type LoggerMiddleware struct {
	Logger log.ComponentLogger
}

func (LoggerMiddleware) PublishInterceptor

func (o LoggerMiddleware) PublishInterceptor(ctx context.Context, serviceName string, next PublishHandler) PublishHandler

func (LoggerMiddleware) SubscribeInterceptor

func (o LoggerMiddleware) SubscribeInterceptor(opts *HandlerOptions, next MessageHandler) MessageHandler

type Message

type Message struct {
	ID          string
	Metadata    map[string]string
	Data        []byte
	PublishTime *time.Time
}

Message is the message struct

type MessageHandler

type MessageHandler func(ctx context.Context, m *Message) error

MessageHandler is the internal or raw message handler

type Middleware

type Middleware interface {
	SubscribeInterceptor(opts *HandlerOptions, next MessageHandler) MessageHandler
	PublishInterceptor(ctx context.Context, serviceName string, next PublishHandler) PublishHandler
}

Middleware is an interface to provide subscriber and publisher interceptors

type Provider

type Provider interface {
	woocoo.Server
	// GetMQType get the type name of message queue provider
	GetMQType() string
	Publish(ctx context.Context, opts PublishOptions, m *Message) error
	Subscribe(opts HandlerOptions, handler MessageHandler) error
}

Provider is an interface for message queue provider. this is also a wrapper for the sdk of mq officer client.

type PublishHandler

type PublishHandler func(ctx context.Context, m *Message) error

PublishHandler wraps a call to publish, for interception

type PublishOptions

type PublishOptions struct {
	// ServiceName identify the service of Provider, will use this to find the service or Config
	ServiceName string
	// Metadata is a key-value pair to pass to the Provider
	Metadata map[string]string
	// Decode JSON objects from message body instead of protobuf.
	JSON bool
}

type RecoveryHandlerFunc

type RecoveryHandlerFunc func(p interface{}) (err error)

RecoveryHandlerFunc is a function that recovers from the panic `p` by returning an `error`.

type RecoveryMiddleware

type RecoveryMiddleware struct {
	RecoveryHandlerFunc RecoveryHandlerFunc
}

RecoveryMiddleware is middleware for recovering from panics

func (RecoveryMiddleware) PublishInterceptor

func (o RecoveryMiddleware) PublishInterceptor(ctx context.Context, serviceName string, next PublishHandler) PublishHandler

PublishInterceptor adds recovery to the publisher

func (RecoveryMiddleware) SubscribeInterceptor

func (o RecoveryMiddleware) SubscribeInterceptor(opts *HandlerOptions, next MessageHandler) MessageHandler

SubscribeInterceptor returns a subscriber middleware with added logging via Zap

Directories

Path Synopsis
contrib
mock/rocketmq Module
provider

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL