gstatem
一、前言
Golang
语言初学者, 习惯了写 Erlang
程序, 想试试使用 Golang
能否实现相似的流程
为什么?
通常 Go Routine
用于单个功能实现, 一个任务会根据功能类别拆分成多个 Go Routine
同步进行逻辑处理
特别是 http
服务是无状态的, 一次 http
请求可能会拆成多个 Go Routine
, 完成任务后通过 context
维护这些协程的生命周期
而下次相同或类似的请求, 会把之前的步骤再来一遍, 但如果是做某些实时共享状态和数据的服务可能就不太适合?
比如游戏服务很多时候有玩家或房间等数据状态需要持续读写和同步, 并用状态来控制游戏的流程和限制
至此参照 Erlang OTP gen_statem
状态机进程服务思路, 编写了 Golang gstatem
区别?
状态机服务协程可以为某个上下文缓存数据和状态, 根据收到的 消息类型
、消息内容
和 协程当前的状态
更新缓存的数据
无需频繁的读写数据库, 比如某个游戏的用户在很多情况下数据是临时的, 而到了某个状态用户的部分数据才需要做入库持久化
又比如用户可以有自己的协程和n个业务逻辑协程, 用户协程用于维护用户数据, 业务协程用于维护该用户的业务逻辑
特色?
无论是 Erlang OTP gen_statem
还是 Golang gstatem
, 相较于 Golang
的多个 Go Routine
同步进行, 前者是线性的
Erlang OTP gen_statem
和 Golang gstatem
是 actor model
, 可以理解为一个邮箱, 每次都是根据先来后到的顺序规则一个个读取和处理邮件
数据的处理只需要读取已缓存的上下文的部分数据即可, 而无需频繁读取数据库和做重复的前置流程
和 Erlang OTP gen_statem
一样, 为协程提供同步异步等通信方案, 同时可以控制协程携带的数据和状态
并且每次处理完数据后, 可以根据业务需求决定上下文接下来处于什么状态, 多久后进入什么状态, 从而控制业务流程
二、例子
参考用于调试的 gstatem_example 仓库
三、使用
go get gitee.com/gelomen/gstatem
四、父级上下文
先创建属于整个程序的顶级上下文: topCtx
, 并创建属于某类服务的父级上下文: fooParentCtx
例:
package main
func main() {
// 启动顶级上下文
topCtx, topCancel := context.WithCancel(context.Background())
// 启动 foo server 父级上下文
fooParentCtx, fooParentCancel := foo_server.StartServerParent(topCtx)
...
}
创建某类型服务的父级 context
时, 定义用于处理服务的函数:
Name
, 该类服务父级名字
InitFunc
, 初始化函数, 用于自定义该类服务的初始化数据
CastFunc
, 异步消息处理函数, 用于自定义该类服务处理 gstatem.Cast
消息
CallFunc
, 同步消息处理函数, 用于自定义该类服务处理 gstatem.Call
消息
InfoFunc
, 同步消息处理函数, 用于自定义该类服务处理 gstatem.Info
消息
StateTimeoutFunc
, 状态超时消息处理函数, 用于自定义该类服务 Action
状态超时消息
TerminateFunc
, 终结函数, 用于自定义处理协程关闭是的清理工作
例:
package foo_server
// StartServerParent 启动 foo server 父级上下文
func StartServerParent(parentCtx context.Context) (context.Context, context.CancelFunc) {
ParentData := gstatem.ParentData{
Name: "FooServer",
InitFunc: HandleInit,
CastFunc: HandleCast,
CallFunc: HandleCall,
InfoFunc: HandleInfo,
StateTimeoutFunc: HandleStateTimeout,
TerminateFunc: HandleTerminate,
}
return gstatem.ParentStart(parentCtx, ParentData)
}
五、子级上下文
创建某类型服务子级上下文和协程
- 根据实际需求, 自定义传入该子级上下文的初始化数据
- 初始化
gstatem.StateData
, 传入初始化数据和协程通道大小
- 自定义协程通道大小必须大于
0
, 否则使用默认值 1000
- 调用该函数后, 若第三个返回值返回
true
, 则代表协程创建成功, 并返回该子级上下文和子级取消函数
例:
package foo_server
// StartServer 启动服务
func StartServer(parentCtx context.Context, name string) (context.Context, context.CancelFunc, bool) {
initData := initDataS{fooName: name}
statemData := &gstatem.Data{
InitData: initData,
MsgChanSize: 2000,
}
return gstatem.ChildStart(parentCtx, statemData)
}
六、Context / Cancel 存储
每个上下文有属于自己的 n 个上下文, 这就需要一个全局变量来做上下文存储, 以便在每次处理请求时能复用已经创建过的上下文和协程
key
为属于该服务器类型和唯一标识组成的任意类型
1. 保存
传入 key
, 以及已经创建的子级上下文和子级取消函数
package gstatem_context
// Set 保存某个唯一标识的上下文存储信息
func Set(key any, context context.Context, cancel context.CancelFunc) {
ctxData := contextData{
context: context,
cancel: cancel,
}
contextStore.Store(key, ctxData)
}
2. 删除
传入 key
package gstatem_context
// Del 删除某个唯一标识的上下文存储信息
func Del(key any) {
contextStore.Delete(key)
}
3. 同时获取
传入 key
, 返回上下文、取消函数 和 是否存在字段
// Get 同时获取某个唯一标识的 context.Context 和 context.CancelFunc
func Get(key any) (context.Context, context.CancelFunc, bool) {
if ctxDataAny, ok := contextStore.Load(key); ok {
ctxData := toContextData(ctxDataAny)
ctx := ctxData.context
cancel := ctxData.cancel
return ctx, cancel, true
}
return nil, nil, false
}
4. 获取 Context
传入 key
, 返回上下文和是否存在的字段
// Context 获取某个唯一标识的 context.Context
func Context(key any) (context.Context, bool) {
if ctxDataAny, ok := contextStore.Load(key); ok {
ctxData := toContextData(ctxDataAny)
ctx := ctxData.context
return ctx, true
}
return nil, false
}
5. 获取 CancelFunc
传入 key
, 返回取消函数和是否存在的字段
// Cancel 获取某个唯一标识的 context.CancelFunc
func Cancel(key any) (context.CancelFunc, bool) {
if ctxDataAny, ok := contextStore.Load(key); ok {
ctxData := toContextData(ctxDataAny)
cancel := ctxData.cancel
return cancel, true
}
return nil, false
}
七、创建协程
- 根据当前服务类型和请求发起者唯一标识, 组合成任意类型的唯一
key
- 在请求发起者请求数据时, 根据
key
查找上下文, 从而复用或创建新的协程
例:
func foo(parentCtx context.Context, key any) (context.Context, context.cancelFunc, bool) {
// 根据服务类型和请求发起者标识组成的key获取上下文和取消函数
if ctx, cancel, ok := gstatem_context.Get(key); ok {
// 找得到上下文, 复用上下文和协程
return ctx, cancel, true
} else {
// 找不到上下文, 创建新的协程和上下文
ctx, cancel, ok := foo_server:StartServer(parentCtx, key)
if !ok {
// 创建协程失败
return nil, nil, false
}
// 创建协程成功, 保存上下文和取消函数
gstatem_context.Set(key, ctx, cancel)
return ctx, cancle, true
}
}
八、协程消息
有异步消息、同步消息、普通消息等多种请求方式
1. 异步消息
发送消息到指定上下文的协程, 无需等待上下文协程返回值, 协程的父级上下文定义的 gstatem.ParentData.CastFunc
回调函数会收到消息
package gstatem
// Cast 异步消息
func Cast(ctx context.Context, castData any) {
statemData := ctx.Value(DataKey).(*Data)
statemMsg := msg{Type: msgTypeCast, Msg: castData}
go func() { _ = try(func() { statemData.channel <- statemMsg }) }()
}
将上下文和请求数据传入即可, 请求数据的结构由具体项目指定
2. 同步消息
发送消息到指定上下文的协程, 并等待上下文协程返回值, 协程的父级上下文定义的 gstatem.ParentData.CallFunc
回调函数会收到消息
package gstatem
// Call 同步消息
func Call(ctx context.Context, callData any, timeouts ...duration) (any, error) {
statemData := ctx.Value(DataKey).(*Data)
// 创建响应通道
replyChan := make(chan any)
defer close(replyChan)
// 发送消息
statemMsg := msg{Type: msgTypeCall, Msg: callData, replyChan: replyChan}
err := try(func() { go func() { statemData.channel <- statemMsg }() })
if err != nil {
return nil, err
}
// 获取超时时间间隔
var timeout duration
if len(timeouts) > 0 {
timeout = timeouts[0]
} else {
timeout = defaultCallTimeout
}
// 等待响应
timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
select {
case <-timeoutCtx.Done():
err := fmt.Errorf("Call timeout, callData: %v\n", callData)
return nil, err
case reply := <-replyChan:
return reply, nil
}
}
3. 普通消息
通常用于协程发送给自己的定时消息, 协程的父级上下文定义的 gstatem.ParentData.InfoFunc
回调函数会收到消息
package gstatem
// Info 内部消息
func Info(ctx context.Context, infoData any) {
statemData := ctx.Value(DataKey).(*Data)
statemMsg := msg{Type: msgTypeInfo, Msg: infoData}
go func() { _ = try(func() { statemData.channel <- statemMsg }) }()
}
4. 定时消息
基于普通消息, 启动定时器调用普通消息函数发送消息, 并返回 *time.Timer
, 可调用 *time.Stop()
取消定时
协程的父级上下文定义的 gstatem.ParentData.InfoFunc
回调函数会收到消息
package gstatem
// SendAfter 启动定时器 n 秒后发送消息
func SendAfter(ctx context.Context, n duration, infoData any) *time.Timer {
timer := time.AfterFunc(n, func() {
Info(ctx, infoData)
})
return timer
}
九、回调函数
实现以下六个回调函数, 并赋值到创建父级上下文的 gstatem.ParentData
(这里 父级上下文)
// 初始化回调函数
type initHandlerFunc func(key any, initData any) Return
// 异步消息回调函数
type castHandlerFunc func(ctx context.Context, msg any, state any, serverData any) Return
// 同步消息回调函数
type callHandlerFunc func(ctx context.Context, msg any, state any, serverData any) Return
// 普通消息回调函数
type infoHandlerFunc func(ctx context.Context, msg any, state any, serverData any) Return
// 状态超时回调函数
type stateTimeoutHandlerFunc func(ctx context.Context, msg any, state any, serverData any) Return
// 终结回调函数
type terminateHandlerFunc func(closeMsg CloseMsg, state any, serverData any)
1. 初始化回调函数
实现协程初始化回调函数逻辑
- 先查询子级上下文(当前协程)唯一标识是否已存在, 若存在则结束协程, 该判断为可选
- 获取创建协程时的初始化数据, 组合成当前项目协程的缓存数据
- 返还初始化成功或失败, 若成功要返回新状态和服务数据
例:
// HandleInit 协程初始化
func HandleInit(key any, sourceInitData any) gstatem.Return {
// 查询是否已经存在相同的协程(非必要)
if _, ok := gstatem_context.Context(key); ok {
// 已经存在, 关闭协程
return gstatem.Return{
Type: gstatem.ReturnTypeStop,
}
} else {
// 不存在, 初始化服务数据
initData := sourceInitData.(initDataS)
...
fooServerData := &fooServerData{
...
}
...
return gstatem.Return{
Type: gstatem.ReturnTypeInitOk,
NewState: FooSrvStateXXX,
ServerData: fooServerData,
}
}
}
如上所示, 初始化可以返回两种类型
gstatem.ReturnTypeInitOk
// 或
gstatem.ReturnTypeStop
2. 异步消息回调函数
- 自定义异步消息请求数据的结构
fooReqMsgType
- 将收到的消息转为该自定义结构
- 根据自定义结构取值和处理逻辑
例:
// FooCast 异步消息
func FooCast(ctx context.Context) {
msg := make(fooReqMsgType)
// 组合 msg 数据, 有当前项目和 `fooReqMsgType` 结构决定
...
gstatem.Cast(ctx, msg)
}
该上下文协程的 HandleCast
会收到消息
例:
// HandleCast 异步消息回调函数
func HandleCast(ctx context.Context, msg any, state any, serverData any) gstatem.Return {
fooServerData := serverData.(*fooServerData)
fooMsg := msg.(fooReqMsgType)
statemReturn := gstatem.Return{}
...
return statemReturn
}
3. 同步消息回调函数
- 自定义同步消息请求数据的结构
fooReqMsgType
- 将收到的消息转为该自定义结构
- 根据自定义结构取值和处理逻辑
例:
// FooCall 同步消息
func FooCall(ctx context.Context, context *gin.Context) (any, error) {
msg := make(fooReqMsgType)
// 组合 msg 数据, 由当前项目和 `fooReqMsgType` 结构决定
...
return gstatem.Call(ctx, msg)
}
该上下文协程的 HandleCall
会收到消息
例:
// HandleCall 同步消息回调函数
func HandleCall(ctx context.Context, msg any, state any, serverData any) gstatem.Return {
fooServerData := serverData.(fooServerData)
fooMsg := msg.(fooReqMsgType)
statemReturn := gstatem.Return{}
...
return statemReturn
}
4. 普通消息回调函数
- 自定义同步消息请求数据的结构
fooReqMsgType
- 将收到的消息转为该自定义结构
- 根据自定义结构取值和处理逻辑
例:
// FooInfo 普通消息
func FooInfo(ctx context.Context, context *gin.Context) {
msg := make(fooReqMsgType)
// 组合 msg 数据, 有当前项目和 `fooReqMsgType` 结构决定
...
gstatem.Info(ctx, msg)
}
该上下文协程的 HandleInfo
会收到消息
例:
// HandleInfo 普通消息回调函数
func HandleInfo(ctx context.Context, msg any, state any, serverData any) gstatem.Return {
fooServerData := serverData.(fooServerData)
fooMsg := msg.(fooReqMsgType)
statemReturn := gstatem.Return{}
...
return statemReturn
}
5. 状态超时回调函数
- 由回调函数的返回值
gstatem.Return.Action
触发
Action
说明 回调返回动作
例:
// HandleStateTimeout 状态超时回调函数
func HandleStateTimeout(ctx context.Context, msg any, state any, serverData any) gstatem.Return {
fooServerData := serverData.(fooServerData)
fooTimeoutMsg := msg.(fooStateTimeoutMsg)
statemReturn := gstatem.Return{}
...
return statemReturn
}
6. 终结回调函数
协程关闭前会调用该回调函数, 以便协程做清理工作
例:
// HandleTerminate 协程终结
func HandleTerminate(closeMsg gstatem.CloseMsg, state any, serverData any) {
fooServerData := serverData.(fooServerData)
...
log.Infof("Book server userName: %s terminal, state: %s, closeMsg: %s", serverData.userName, state, closeMsg)
}
十、回调返回
回调函数的返回统一结果:
// Return statem server 回调返回结构
type Return struct {
Type returnType // 回调返回类型
NewState any // 新状态
ServerData any // 服务数据
Action Action // 回调返回动作
ReplyMsg any // 响应消息
msgType msgType // 消息类型
replyChan chan any // 响应通道
}
// Action statem server 回调返回动作
type Action struct {
ActionType actionType // 动作类型
StateTime duration // 状态时间
TimeoutMsg any // 超时消息
}
1. 类型
// statem server 回调返回类型
type returnType = int8
const (
ReturnTypeInitOk = iota + 1 // 初始化完成
ReturnTypeKeepState // 保持状态
ReturnTypeNextState // 下个状态
ReturnTypeKeepStateAndData // 保持状态和数据
ReturnTypeStop // 停止
)
1) 初始化完成
例:
gstatem.Return{
Type: gstatem.ReturnTypeInitOk,
}
只用于初始化回调函数返回, 当返回该类型时, 协程就正常创建成功, 同时 gstatem
会读取 Return.NewState
和 Return.ServerData
并更新
2) 保持状态
例:
gstatem.Return{
Type: gstatem.ReturnTypeKeepState,
}
当返回该类型时, 表示协程状态不变, gstatem
不会读取 Return.NewState
字段, 但依然会读取 Return.ServerData
更新服务数据
3) 下个状态
例:
gstatem.Return{
Type: gstatem.ReturnTypeNextState,
}
当返回该类型时, 表示协程进入新的状态, 同时 gstatem
会读取 Return.NewState
和 Return.ServerData
并更新
4) 保持状态和数据
例:
gstatem.Return{
Type: gstatem.ReturnTypeKeepStateAndData,
}
当返回该类型时, 表示协程什么都不改变, gstatem
什么都不读取和更新
例:
5) 停止
gstatem.Return{
Type: gstatem.ReturnTypeStop,
}
当返回该类型时, 表现关闭协程, 同时 gstatem
会读取 Return.NewState
和 Return.ServerData
并更新
2. 状态
状态的类型是 any
, 由具体项目决定状态的类型
例:
gstatem.Return{
NewState: fooServerStateXXX,
}
3. 服务数据
服务数据的类型是 any
, 由具体项目决定状态的类型, 是保存协程缓存数据的结构
例:
gstatem.Return{
ServerData: fooServerData
}
4. 动作
动作用于控制协程服务的状态, 比如多久后状态超时并进入某个状态, 或者取消当前的状态定时
gstatem.Return{
Action: gstatem.Action{
ActionType actionType // 动作类型
StateTime duration // 状态时间
TimeoutMsg any // 超时消息
}
}
// ActionType statem server 回调返回动作类型
type ActionType = int8
const (
ActionTypeStateTimeout = iota + 1 // 当前状态定时
ActionTypeCancelStateTimeout // 取消状态定时
)
1) 状态定时
设置动作类型为 ActionTypeStateTimeout
, 指定状态持续时间 StateTime
和 超时后收到消息 TimeoutMsg
例:
gstatem.Action{
ActionType: ActionTypeStateTimeout,
StateTime: 3 * time.Second,
TimeoutMsg: fooStateTimeoutMsg,
}
当时间到时, 状态超时回调函数将会收到消息, 此时可以进行状态的变化或其他逻辑处理 状态超时回调函数
2) 取消定时
设置动作类型为 ActionTypeCancelStateTimeout
, 其他无需设置
例:
gstatem.Action{
ActionType: ActionTypeCancelStateTimeout,
}
此时会把协程的状态定时取消, 状态超时回调函数也不会收到消息
5. 消息响应
- 当消息类型为
Call
同步类型时, 会获取 gstatem.Return
里的 ReplyMsg
和 replyChan
进行响应通道消息发送
Call
请求逻辑处理, 在最后 gstatem.Return
给 ReplyMsg
响应消息知道赋值即可
例:
package foo
// HandleCall 同步消息回调函数
func HandleCall(ctx context.Context, msg any, state any, serverData any) gstatem.Return {
serverData := serverData.(fooServerData)
fooMsg := msg.(ReqMsgType)
statemReturn := gstatem.Return{}
...
replyMsg := fooRespMsgType
statemReturn.ReplyMsg = replyMsg
...
return statemReturn
}
十一、上下文和通道清理
协程在关闭时会自动调用上下文的 context.CancelFunc
和 关闭该协程的通道, 无需具体服务在终结回调函数处理
package gstatem
// 服务 Routine
func serverRoutine(ctx context.Context, cancel context.CancelFunc) {
statemData := ctx.Value(DataKey).(*Data)
defer handleTerminate(ctx)
defer func(sd *Data) {
if r := recover(); r != nil {
...
}
sd.cancelMsg = ctx.Err().Error()
}(statemData)
defer func(sd *Data) {
close(sd.channel)
if ctx.Err() == nil {
cancel()
}
}(statemData)
// 初始化
handleInit(ctx, cancel)
// 服务循环
serverLoop(ctx, cancel)
}
EOF