Documentation ¶
Index ¶
- Constants
- type Client
- type HandlerRegistrar
- type Message
- type MessageHandler
- type MsgHandlerAdapter
- type MsgHandlerSelector
- type NetListenHandler
- type ProcessMessageHandler
- type Queue
- type QueueBroadcastor
- type QueueMessageHandler
- type ReadMessageHandler
- type Server
- type ServerOption
- func WithDefaultQueueSize(defaultQueueSize int) ServerOption
- func WithHeartbeatDuration(heartbeatDuration time.Duration) ServerOption
- func WithMsgHandlerSelector(msgHandlerSelector MsgHandlerSelector) ServerOption
- func WithNetwork(network string) ServerOption
- func WithSocketWriteDeadline(socketWriteDeadline time.Duration) ServerOption
- func WithTopicQueueSize(topicQueueSize map[string]int) ServerOption
- type TimeoutReadWriteCloser
- type TopicQueue
Constants ¶
const ( //请求授权 REQ_AUTH string = "REQ_AUTH" //响应授权 RES_AUTH string = "RES_AUTH" )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client interface { // Close 关闭连接 Close() error // Submit 发起请求,不需要等待服务器返回处理结果(并行安全), // expireAfterNowMillis用于设置消息的过期时间,单位为ms。过期时间 = 服务端接收消息的时间 + expireAfterNowMillis, // 当expireAfterNowMillis=0时,提交的消息无过期时间。 Submit(topic string, payload []byte, expireAfterNowMillis uint32) error // SubmitWithAsyncCallback 发起请求,不等待服务器返回处理结果,通过callback异步处理服务端响应(并行不安全), // expireAfterNowMillis用于设置消息的过期时间,单位为ms。过期时间 = 服务端接收消息的时间 + expireAfterNowMillis, // 当expireAfterNowMillis=0时,提交的消息无过期时间。 // callback 异步处理服务端响应。 SubmitWithAsyncCallback(topic string, payload []byte, expireAfterNowMillis uint32, callback func(msg *common.Msg, occuredErr error)) error // SubmitWaitReply 发起请求,需要等待服务器返回处理结果(并行不安全) // expireAfterNowMillis用于设置消息的过期时间,单位为ms。过期时间 = 服务端接收消息的时间 + expireAfterNowMillis // 当expireAfterNowMillis=0时,提交的消息无过期时间 SubmitWaitReply(topic string, payload []byte, expireAfterNowMillis uint32) (*common.Msg, error) // Query 查询topic对应的消息,不删除消息(并行不安全) Query(topic string, payload []byte) (*common.Msg, error) // QueryPop查询topic对应的消息,并删除消息(并行不安全) QueryPop(topic string, payload []byte) (*common.Msg, error) // QueryStream 发起流请求,通过Receive接收服务器返回处理结果,一个消息只能被一个client查询(并行不安全) QueryStream(topic string, payload []byte) (receive func() (*common.Msg, error), err error) // QueryStreamGroup 发起流请求,通过Receive接收服务器返回处理结果,每个消息可以被多个client查询(并行不安全) QueryStreamGroup(topic string, payload []byte) (receive func() (*common.Msg, error), err error) // Auth 请求授权 Auth(payload []byte) error SetTimeOut(timeout time.Duration) }
func NewClientByConn ¶ added in v1.0.0
func NewClientByConn(conn TimeoutReadWriteCloser, clientId uint32) Client
type HandlerRegistrar ¶
type HandlerRegistrar interface { // RegisterGlobalPreHandler 注册全局的消息处理器,优先执行。 RegisterGlobalPreHandler(handlers ...MessageHandler) // RegisterHandler 具体的消息处理器,在全局的消息处理器后执行。 RegisterHandler(topic string, handlers ...MessageHandler) GetHandlers(topic string) []MessageHandler }
HandlerRegistrar 消息处理器,注册中心。
type MessageHandler ¶
type MessageHandler interface { // OnReceive 处理接收到的消息。 // 如果返回的newMessage为nil,则继续执行后续的Handler。 // 如果newMessage的Type为RESPONSE,则直接将newMessage通知客户端,终止后续handler。 // 如果newMessage不为nil,则将newMessage视为新的消息(或指令),放入队列。 OnReceive(requestMsg *Message) (newMessage *Message) // OnResponse 当服务端向客户端返回消息时,执行。当存在多个MessageHandler时,按照注册时的反序执行。 OnResponse(responseMsg *Message, requestMsg *Message) (replyToClientMsg *Message) }
MessageHandler 消息处理
var DefaultMsgHandler MessageHandler = &defaultMsgHandler{}
DefaultMsgHandler 默认消息处理,什么都不做。
var SuccessResponseMsgHandler MessageHandler = &successResponseMsgHandler{}
SuccessResponseMsgHandler POST请求,默认返回成功。
func NewSimpleAuthMsgHandler ¶
func NewSimpleAuthMsgHandler(authorizeLogic func(msg *Message) bool, authCodeGen func(msg *Message) string) MessageHandler
NewSimpleAuthMsgHandler 授权设备接入。 创建设备接入授权Handler(简单版本,可仿造该版本随意定制)。 authorizeLogic:决定是否授权。 authCodeGen:生成授权码。
type MsgHandlerAdapter ¶ added in v1.1.0
type MsgHandlerAdapter struct { }
MsgHandlerAdapter 实现了默认的OnResponse方法。
func (MsgHandlerAdapter) OnResponse ¶ added in v1.1.0
func (h MsgHandlerAdapter) OnResponse(responseMsg *Message, requestMsg *Message) (replyToClientMsg *Message)
type MsgHandlerSelector ¶ added in v1.1.1
type NetListenHandler ¶
type NetListenHandler struct { flowprocess.TaskHandlerAdapter // contains filtered or unexported fields }
Node0,监听端口
func (*NetListenHandler) Handle ¶
func (h *NetListenHandler) Handle(inTask flowprocess.Task, dispatch func(outTask flowprocess.Task) error) (err error)
type ProcessMessageHandler ¶
type ProcessMessageHandler struct { flowprocess.TaskHandlerAdapter // contains filtered or unexported fields }
Node2 处理和回复消息
func (*ProcessMessageHandler) Handle ¶
func (h *ProcessMessageHandler) Handle(inTask flowprocess.Task, dispatch func(outTask flowprocess.Task) error) error
type Queue ¶
type Queue interface { Len() int PushFront(msg *Message) PushBack(msg *Message) PopFront() (*Message, bool) PopBack() (*Message, bool) Front() (*Message, bool) Back() (*Message, bool) Wait() <-chan struct{} Size() int }
func NewListQueue ¶ added in v1.1.0
func NewOrderQueue ¶ added in v1.1.0
type QueueBroadcastor ¶ added in v1.1.0
type QueueBroadcastor struct {
// contains filtered or unexported fields
}
QueueBroadcastor 将sourceQueue里的数据,广播给多个接收者。例如: sourceQueue --msgs(goroutine0)--|---msgs---> [splitQueue1] --msgs(goroutine1)--> broadcast to Target1
|---msgs---> [splitQueue2] --msgs(goroutine2)--> broadcast to Target2 |---msgs---> [splitQueue3] --msgs(goroutine3)--> broadcast to Target3
func NewQueueBroadcastor ¶ added in v1.1.0
type QueueMessageHandler ¶
type QueueMessageHandler struct { flowprocess.TaskHandlerAdapter // contains filtered or unexported fields }
Node3 将消息放入队列, 单goroutine运行,一般不用加锁
func NewQueueMessageHandler ¶ added in v1.1.0
func NewQueueMessageHandler(server *server) *QueueMessageHandler
func (*QueueMessageHandler) Handle ¶
func (h *QueueMessageHandler) Handle(inTask flowprocess.Task, dispatch func(outTask flowprocess.Task) error) error
type ReadMessageHandler ¶
type ReadMessageHandler struct { flowprocess.TaskHandlerAdapter // contains filtered or unexported fields }
Node1,组装消息
func (*ReadMessageHandler) Handle ¶
func (h *ReadMessageHandler) Handle(inTask flowprocess.Task, dispatch func(outTask flowprocess.Task) error) (err error)
type Server ¶
type Server interface { HandlerRegistrar Start() error // Submit 在Server启动后,直接提交消息 Submit(msg *common.Msg) (*common.Msg, error) Close() error }
func NewServer ¶
func NewServer(address string, options ...ServerOption) Server
type ServerOption ¶
type ServerOption func(s *server)
func WithDefaultQueueSize ¶
func WithDefaultQueueSize(defaultQueueSize int) ServerOption
WithDefaultQueueSize 配置默认的topicQueueSize
func WithHeartbeatDuration ¶ added in v1.0.0
func WithHeartbeatDuration(heartbeatDuration time.Duration) ServerOption
WithHeartbeatDuration 心跳周期
func WithMsgHandlerSelector ¶ added in v1.1.1
func WithMsgHandlerSelector(msgHandlerSelector MsgHandlerSelector) ServerOption
func WithNetwork ¶ added in v1.1.0
func WithNetwork(network string) ServerOption
WithNetwork 必须是 "tcp", "tcp4", "tcp6", "unix", "unixpacket".
func WithSocketWriteDeadline ¶
func WithSocketWriteDeadline(socketWriteDeadline time.Duration) ServerOption
WithSocketWriteDeadline 配置端口超时
func WithTopicQueueSize ¶
func WithTopicQueueSize(topicQueueSize map[string]int) ServerOption
WithTopicQueueSize 配置topicQueueSize