Documentation ¶
Index ¶
- Variables
- func DefaultDecodeFunc(ctx context.Context, message Message, args interface{}) error
- func DefaultErrorHandler(err error)
- func GetMiddlewareFromContext(ctx context.Context) []middleware.Middleware
- func MiddlewareWithContext(ctx context.Context, list ...middleware.Middleware) context.Context
- type Client
- type ClientOptionFunc
- type Consumer
- type DecodeFunc
- type EncodeFunc
- type ErrorHandler
- type ErrorHandlerFunc
- type HandleFunc
- type Handler
- type Message
- type MessageOption
- type MessageV1
- type Payload
- type Producer
- type Server
- type ServerOptionFunc
- func ServerOptionWithConcurrencyNum(num int32) ServerOptionFunc
- func ServerOptionWithDecodeFunc(f DecodeFunc) ServerOptionFunc
- func ServerOptionWithErrHandleFunc(f ErrorHandlerFunc) ServerOptionFunc
- func ServerOptionWithErrHandler(handler ErrorHandler) ServerOptionFunc
- func ServerOptionWithMiddleware(ms ...middleware.Middleware) ServerOptionFunc
- func ServerOptionWithScaleThreshold(num int32) ServerOptionFunc
- func ServerOptionWithServerName(name string) ServerOptionFunc
- func ServerOptionWithTimeout(duration time.Duration) ServerOptionFunc
Constants ¶
This section is empty.
Variables ¶
View Source
var ( ErrMessageIsNil = errors.New("message is nil show alloc memory") ErrMessageIsInvalid = errors.New("message is invalid") )
Functions ¶
func DefaultDecodeFunc ¶
func DefaultErrorHandler ¶
func DefaultErrorHandler(err error)
func GetMiddlewareFromContext ¶
func GetMiddlewareFromContext(ctx context.Context) []middleware.Middleware
func MiddlewareWithContext ¶
func MiddlewareWithContext(ctx context.Context, list ...middleware.Middleware) context.Context
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
func NewClient ¶
func NewClient(producer Producer, opts ...ClientOptionFunc) *Client
type ClientOptionFunc ¶
type ClientOptionFunc func(*clientOptions)
func ClientOptionWithEncodeFunc ¶
func ClientOptionWithEncodeFunc(f EncodeFunc) ClientOptionFunc
ClientOptionWithEncodeFunc 中间件
type DecodeFunc ¶
type ErrorHandler ¶
type ErrorHandler interface {
Handle(err error)
}
type ErrorHandlerFunc ¶
type ErrorHandlerFunc func(err error)
func (ErrorHandlerFunc) Handle ¶
func (f ErrorHandlerFunc) Handle(err error)
type HandleFunc ¶
type Message ¶
type Message interface { // Metadata 元数据 Metadata() metadata.Metadata // Payload 荷载信息 Payload() Payload // Err 错误 Err() error // UniKey 消息唯一键 用于消息去重(已经消费的消息不再消费) UniKey() string // Check 检查消息完整性 Check() error // Marshal 序列化消息 Marshal() ([]byte, error) // UnMarshal 解析消息 UnMarshal([]byte) error }
Message 消息定义
func DefaultEncodeFunc ¶
func NewMessage ¶ added in v0.0.2
func NewMessage(payload Payload, opts ...MessageOption) Message
func NewMessageFromByte ¶ added in v0.0.2
type MessageOption ¶ added in v0.0.2
type MessageOption func(x *MessageV1)
func MessageOptionWithMetadata ¶ added in v0.0.2
func MessageOptionWithMetadata(md metadata.Metadata) MessageOption
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
func NewServer ¶
func NewServer(ctx context.Context, consumer Consumer, opts ...ServerOptionFunc) *Server
func (*Server) DecodeFunc ¶
func (x *Server) DecodeFunc() DecodeFunc
func (*Server) ErrHandler ¶
func (x *Server) ErrHandler() ErrorHandler
func (*Server) Subscriber ¶
func (x *Server) Subscriber(topic string, channel string, handler Handler, ms ...middleware.Middleware) error
type ServerOptionFunc ¶
type ServerOptionFunc func(*serverOptions)
func ServerOptionWithConcurrencyNum ¶
func ServerOptionWithConcurrencyNum(num int32) ServerOptionFunc
ServerOptionWithConcurrencyNum 并发数
func ServerOptionWithDecodeFunc ¶
func ServerOptionWithDecodeFunc(f DecodeFunc) ServerOptionFunc
ServerOptionWithDecodeFunc 解码函数
func ServerOptionWithErrHandleFunc ¶
func ServerOptionWithErrHandleFunc(f ErrorHandlerFunc) ServerOptionFunc
ServerOptionWithErrHandleFunc 运行错误处理函数
func ServerOptionWithErrHandler ¶
func ServerOptionWithErrHandler(handler ErrorHandler) ServerOptionFunc
ServerOptionWithErrHandler 运行错误处理器
func ServerOptionWithMiddleware ¶
func ServerOptionWithMiddleware(ms ...middleware.Middleware) ServerOptionFunc
ServerOptionWithMiddleware 中间件
func ServerOptionWithScaleThreshold ¶
func ServerOptionWithScaleThreshold(num int32) ServerOptionFunc
ServerOptionWithScaleThreshold 阈值 任务队列长度超过 这个数字 则会增加 go routine
func ServerOptionWithServerName ¶
func ServerOptionWithServerName(name string) ServerOptionFunc
ServerOptionWithServerName 服务名定义
func ServerOptionWithTimeout ¶
func ServerOptionWithTimeout(duration time.Duration) ServerOptionFunc
ServerOptionWithTimeout 超时控制
Source Files ¶
Click to show internal directories.
Click to hide internal directories.