node

package
v0.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 24, 2025 License: MIT Imports: 26 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Actor

type Actor struct {
	// contains filtered or unexported fields
}

func (*Actor) AddEventHandler

func (a *Actor) AddEventHandler(event gcluster.Event, handler EventHandler)

AddEventHandler 添加事件处理器

func (*Actor) AddRouteHandler

func (a *Actor) AddRouteHandler(route int32, handler RouteHandler)

AddRouteHandler 添加路由处理器

func (*Actor) AfterFunc

func (a *Actor) AfterFunc(d time.Duration, f func()) *Timer

AfterFunc 延迟调用,与官方的time.AfterFunc用法一致

func (*Actor) AfterInvoke

func (a *Actor) AfterInvoke(d time.Duration, f func()) *Timer

AfterInvoke 延迟调用(线程安全)

func (*Actor) Deliver

func (a *Actor) Deliver(uid int64, message *gcluster.Message) error

Deliver 投递消息到当前Actor中进行处理

func (*Actor) Destroy

func (a *Actor) Destroy() (ok bool)

Destroy 销毁Actor

func (*Actor) ID

func (a *Actor) ID() string

ID 获取Actor的ID

func (*Actor) Invoke

func (a *Actor) Invoke(fn func())

Invoke 调用函数(Actor内线程安全)

func (*Actor) Kind

func (a *Actor) Kind() string

Kind 获取Actor类型

func (*Actor) Next

func (a *Actor) Next(ctx Context)

Next 投递消息到Actor中进行处理

func (*Actor) PID

func (a *Actor) PID() string

PID 获取Actor的唯一识别ID

func (*Actor) Proxy

func (a *Actor) Proxy() *Proxy

Proxy 获取代理API

func (*Actor) Push

func (a *Actor) Push(uid int64, message *gcluster.Message) error

Push 推送消息到本地Node队列上进行处理

func (*Actor) Spawn

func (a *Actor) Spawn(creator Creator, opts ...ActorOption) (*Actor, error)

Spawn 衍生出一个Actor

type ActorOption

type ActorOption func(o *actorOptions)

func WithActorArgs

func WithActorArgs(args ...any) ActorOption

WithActorArgs 设置传递到Processor中的参数

func WithActorID

func WithActorID(id string) ActorOption

WithActorID 设置Actor编号

func WithActorKind

func WithActorKind(kind string) ActorOption

WithActorKind 设置Actor类型

func WithActorNonDispatch

func WithActorNonDispatch() ActorOption

WithActorNonDispatch 设置Actor不可调度

func WithActorNonWait

func WithActorNonWait() ActorOption

WithActorNonWait 设置Actor无需等待属性(Node组件关关闭时无需等待此Actor结束)

type BaseProcessor

type BaseProcessor struct{}

func (*BaseProcessor) Destroy

func (b *BaseProcessor) Destroy()

Destroy 销毁回调

func (*BaseProcessor) Init

func (b *BaseProcessor) Init()

Init 初始化回调

func (*BaseProcessor) Start

func (b *BaseProcessor) Start()

Start 启动回调

type Context

type Context interface {
	// GID 获取网关ID
	GID() string
	// NID 获取节点ID
	NID() string
	// CID 获取连接ID
	CID() int64
	// UID 获取用户ID
	UID() int64
	// Seq 获取消息序列号
	Seq() int32
	// Route 获取消息路由号
	Route() int32
	// Event 获取事件类型
	Event() gcluster.Event
	// Kind 上下文消息类型
	Kind() Kind
	// Parse 解析消息
	Parse(v interface{}) error
	// Defer 添加defer延迟调用栈
	// 此方法功能与go defer一致,作用域也仅限于当前handler处理函数内,推荐使用Defer方法替代go defer使用
	// 区别在于使用Defer方法可以对调用栈进行取消操作
	// 同时,在调用Task和Next方法是会自动取消调用栈
	// 也可通过Cancel方法进行手动取消调用栈
	// bottom用于标识是否挂载到栈底部
	Defer(fn func(), bottom ...bool)
	// Cancel 取消defer调用栈
	Cancel()
	// Clone 克隆Context
	Clone() Context
	// Task 投递任务
	// 调用此方法会自动取消Defer调用栈的所有执行函数
	Task(fn func(ctx Context))
	// Proxy 获取代理API
	Proxy() *Proxy
	// Context 获取上下文
	Context() context.Context
	// GetIP 获取客户端IP
	GetIP() (string, error)
	// Deliver 投递消息给节点处理
	Deliver(args *gcluster.DeliverArgs) error
	// Reply 回复消息
	Reply(message *gcluster.Message) error
	// Response 响应消息
	Response(message interface{}) error
	// Disconnect 关闭来自网关的连接
	Disconnect(force ...bool) error
	// BindGate 绑定网关
	BindGate(uid ...int64) error
	// UnbindGate 解绑网关
	UnbindGate(uid ...int64) error
	// BindNode 绑定节点
	BindNode(uid ...int64) error
	// UnbindNode 解绑节点
	UnbindNode(uid ...int64) error
	// BindActor 绑定Actor
	BindActor(kind, id string) error
	// UnbindActor 解绑Actor
	UnbindActor(kind string)
	// Next 消息下放
	// 调用此方法会自动取消Defer调用栈的所有执行函数
	Next() error
	// Spawn 衍生出一个新的Actor
	Spawn(creator Creator, opts ...ActorOption) (*Actor, error)
	// Kill 杀死存在的一个Actor
	Kill(kind, id string) bool
	// Actor 获取Actor
	Actor(kind, id string) (*Actor, bool)
	// Invoke 调用函数(线程安全)
	// ctx在全局的处理器中,调用的就是proxy.Invoke
	// ctx在Actor的处理器中,调用的就是actor.Invoke
	Invoke(fn func())
	// AfterFunc 延迟调用,与官方的time.AfterFunc用法一致
	// ctx在全局的处理器中,调用的就是proxy.AfterFunc
	// ctx在Actor的处理器中,调用的就是actor.AfterFunc
	AfterFunc(d time.Duration, f func()) *Timer
	// AfterInvoke 延迟调用(线程安全)
	// ctx在全局的处理器中,调用的就是proxy.AfterInvoke
	// ctx在Actor的处理器中,调用的就是actor.AfterInvoke
	AfterInvoke(d time.Duration, f func()) *Timer
	// NewMeshClient 新建微服务客户端
	// target参数可分为三种种模式:
	// 服务直连模式: 	direct://127.0.0.1:8011
	// 服务直连模式: 	direct://711baf8d-8a06-11ef-b7df-f4f19e1f0070
	// 服务发现模式: 	discovery://service_name
	NewMeshClient(target string) (gtransport.Client, error)
	// contains filtered or unexported methods
}

type Creator

type Creator func(actor *Actor, args ...any) Processor

type EventHandler

type EventHandler func(ctx Context)

type HookHandler

type HookHandler func(proxy *Proxy)

type Kind

type Kind int
const (
	Event   Kind = iota // 事件
	Request             // 请求
)

type Middleware

type Middleware struct {
	// contains filtered or unexported fields
}

func (*Middleware) Next

func (m *Middleware) Next(ctx Context)

Next 下一个中间件

func (*Middleware) Skip

func (m *Middleware) Skip(ctx Context, skip int)

Skip 跳过N个中间件

type MiddlewareHandler

type MiddlewareHandler func(middleware *Middleware, ctx Context)

type Node

type Node struct {
	gmodules.Base
	// contains filtered or unexported fields
}

func NewNode

func NewNode(opts ...Option) *Node

func (*Node) Close

func (n *Node) Close()

Close 关闭节点

func (*Node) Destroy

func (n *Node) Destroy()

Destroy 销毁节点服务器

func (*Node) Init

func (n *Node) Init()

Init 初始化节点

func (*Node) Name

func (n *Node) Name() string

Name 组件名称

func (*Node) Proxy

func (n *Node) Proxy() *Proxy

Proxy 获取节点代理

func (*Node) Start

func (n *Node) Start()

Start 启动节点

type Option

type Option func(o *options)

func WithAddr

func WithAddr(addr string) Option

WithAddr 设置连接地址

func WithCodec

func WithCodec(codec gencoding.Codec) Option

WithCodec 设置编解码器

func WithContext

func WithContext(ctx context.Context) Option

WithContext 设置上下文

func WithEncryptor

func WithEncryptor(encryptor gcrypto.Encryptor) Option

WithEncryptor 设置消息加密器

func WithID

func WithID(id string) Option

WithID 设置实例ID

func WithLocator

func WithLocator(locator glocate.Locator) Option

WithLocator 设置定位器

func WithName

func WithName(name string) Option

WithName 设置实例名称

func WithRegistry

func WithRegistry(r gregistry.Registry) Option

WithRegistry 设置服务注册器

func WithTimeout

func WithTimeout(timeout time.Duration) Option

WithTimeout 设置RPC调用超时时间

func WithTransporter

func WithTransporter(transporter gtransport.Transporter) Option

WithTransporter 设置消息传输器

func WithWeight

func WithWeight(weight int) Option

WithWeight 设置权重

type Processor

type Processor interface {
	// Init 初始化回调
	Init()
	// Start 启动回调
	Start()
	// Destroy 销毁回调
	Destroy()
}

type Proxy

type Proxy struct {
	// contains filtered or unexported fields
}

func (*Proxy) Actor

func (p *Proxy) Actor(kind, id string) (*Actor, bool)

Actor 获取Actor

func (*Proxy) AddEventHandler

func (p *Proxy) AddEventHandler(event gcluster.Event, handler EventHandler)

AddEventHandler 添加事件处理器

func (*Proxy) AddHookListener

func (p *Proxy) AddHookListener(hook gcluster.Hook, handler HookHandler)

AddHookListener 添加钩子监听器

func (*Proxy) AddInternalRouteHandler

func (p *Proxy) AddInternalRouteHandler(route int32, stateful bool, handler RouteHandler, middlewares ...MiddlewareHandler)

AddInternalRouteHandler 添加内部路由处理器(node节点间路由消息处理)

func (*Proxy) AddRouteHandler

func (p *Proxy) AddRouteHandler(route int32, stateful bool, handler RouteHandler, middlewares ...MiddlewareHandler)

AddRouteHandler 添加路由处理器

func (*Proxy) AddServiceProvider

func (p *Proxy) AddServiceProvider(name string, desc interface{}, provider interface{})

AddServiceProvider 添加服务提供者

func (*Proxy) AfterFunc

func (p *Proxy) AfterFunc(d time.Duration, f func()) *Timer

AfterFunc 延迟调用,与官方的time.AfterFunc用法一致

func (*Proxy) AfterInvoke

func (p *Proxy) AfterInvoke(d time.Duration, f func()) *Timer

AfterInvoke 延迟调用(线程安全)

func (*Proxy) AskGate

func (p *Proxy) AskGate(ctx context.Context, gid string, uid int64) (string, bool, error)

AskGate 检测用户是否在给定的网关上

func (*Proxy) AskNode

func (p *Proxy) AskNode(ctx context.Context, uid int64, name, nid string) (string, bool, error)

AskNode 检测用户是否在给定的节点上

func (*Proxy) BindActor

func (p *Proxy) BindActor(uid int64, kind, id string) error

BindActor 绑定Actor

func (*Proxy) BindGate

func (p *Proxy) BindGate(ctx context.Context, gid string, cid, uid int64) error

BindGate 绑定网关

func (*Proxy) BindNode

func (p *Proxy) BindNode(ctx context.Context, uid int64, nameAndNID ...string) error

BindNode 绑定节点 单个用户可以绑定到多个节点服务器上,相同名称的节点服务器只能绑定一个,多次绑定会到相同名称的节点服务器会覆盖之前的绑定。 绑定操作会通过发布订阅方式同步到网关服务器和其他相关节点服务器上。

func (*Proxy) Broadcast

func (p *Proxy) Broadcast(ctx context.Context, args *gcluster.BroadcastArgs) error

Broadcast 推送广播消息

func (*Proxy) Deliver

func (p *Proxy) Deliver(ctx context.Context, args *gcluster.DeliverArgs) error

Deliver 投递消息给节点处理

func (*Proxy) Disconnect

func (p *Proxy) Disconnect(ctx context.Context, args *gcluster.DisconnectArgs) error

Disconnect 断开连接

func (*Proxy) FetchGateList

func (p *Proxy) FetchGateList(ctx context.Context, states ...gcluster.State) ([]*gregistry.ServiceInstance, error)

FetchGateList 拉取网关列表

func (*Proxy) FetchNodeList

func (p *Proxy) FetchNodeList(ctx context.Context, states ...gcluster.State) ([]*gregistry.ServiceInstance, error)

FetchNodeList 拉取节点列表

func (*Proxy) GetID

func (p *Proxy) GetID() string

GetID 获取当前节点ID

func (*Proxy) GetIP

func (p *Proxy) GetIP(ctx context.Context, args *gcluster.GetIPArgs) (string, error)

GetIP 获取客户端IP

func (*Proxy) GetName

func (p *Proxy) GetName() string

GetName 获取当前节点名称

func (*Proxy) GetState

func (p *Proxy) GetState() gcluster.State

GetState 获取当前节点状态

func (*Proxy) HasGate

func (p *Proxy) HasGate(gid string) bool

HasGate 检测是否存在某个网关

func (*Proxy) HasNode

func (p *Proxy) HasNode(nid string) bool

HasNode 检测是否存在某个节点

func (*Proxy) Invoke

func (p *Proxy) Invoke(fn func())

Invoke 调用函数(线程安全)

func (*Proxy) IsOnline

func (p *Proxy) IsOnline(ctx context.Context, args *gcluster.IsOnlineArgs) (bool, error)

IsOnline 检测是否在线

func (*Proxy) Kill

func (p *Proxy) Kill(kind, id string) bool

Kill 杀死存在的一个Actor

func (*Proxy) LocateGate

func (p *Proxy) LocateGate(ctx context.Context, uid int64) (string, error)

LocateGate 定位用户所在网关

func (*Proxy) LocateNode

func (p *Proxy) LocateNode(ctx context.Context, uid int64, name string) (string, error)

LocateNode 定位用户所在节点

func (*Proxy) Multicast

func (p *Proxy) Multicast(ctx context.Context, args *gcluster.MulticastArgs) error

Multicast 推送组播消息

func (*Proxy) NewMeshClient

func (p *Proxy) NewMeshClient(target string) (gtransport.Client, error)

NewMeshClient 新建微服务客户端 target参数可分为三种种模式: 服务直连模式: direct://127.0.0.1:8011 服务直连模式: direct://711baf8d-8a06-11ef-b7df-f4f19e1f0070 服务发现模式: discovery://service_name

func (*Proxy) PackBuffer

func (p *Proxy) PackBuffer(message any) ([]byte, error)

PackBuffer 打包Buffer

func (*Proxy) PackMessage

func (p *Proxy) PackMessage(message *gcluster.Message) ([]byte, error)

PackMessage 打包消息

func (*Proxy) Push

func (p *Proxy) Push(ctx context.Context, args *gcluster.PushArgs) error

Push 推送消息

func (*Proxy) RouteGroup

func (p *Proxy) RouteGroup(groups ...func(group *RouterGroup)) *RouterGroup

RouteGroup 路由组

func (*Proxy) Router

func (p *Proxy) Router() *Router

Router 路由器

func (*Proxy) SetDefaultRouteHandler

func (p *Proxy) SetDefaultRouteHandler(handler RouteHandler)

SetDefaultRouteHandler 设置默认路由处理器,所有未注册的路由均走默认路由处理器

func (*Proxy) SetState

func (p *Proxy) SetState(state gcluster.State) error

SetState 设置当前节点状态

func (*Proxy) Spawn

func (p *Proxy) Spawn(creator Creator, opts ...ActorOption) (*Actor, error)

Spawn 衍生出一个新的Actor

func (*Proxy) Stat

func (p *Proxy) Stat(ctx context.Context, kind gsession.Kind) (int64, error)

Stat 统计会话总数

func (*Proxy) Trigger

func (p *Proxy) Trigger() *Trigger

Trigger 事件触发器

func (*Proxy) UnbindActor

func (p *Proxy) UnbindActor(uid int64, kind string)

UnbindActor 解绑Actor

func (*Proxy) UnbindGate

func (p *Proxy) UnbindGate(ctx context.Context, uid int64) error

UnbindGate 解绑网关

func (*Proxy) UnbindNode

func (p *Proxy) UnbindNode(ctx context.Context, uid int64, nameAndNID ...string) error

UnbindNode 解绑节点 解绑时会对对应名称的节点服务器进行解绑,解绑时会对解绑节点ID进行校验,不匹配则解绑失败。 解绑操作会通过发布订阅方式同步到网关服务器和其他相关节点服务器上。

type RouteHandler

type RouteHandler func(ctx Context)

type RouteOptions

type RouteOptions struct {
	// 是否有状态路由,默认无状态
	// 无状态路由消息会根据负载均衡策略分配到不同的节点服务器进行处理
	// 有状态路由消息会在绑定节点服务器后,固定路由到绑定的节点服务器进行处理
	Stateful bool

	// 是否内部的路由,默认非内部
	// 外部路由可在客户端、网关、节点间进行消息流转
	// 内部路由仅限于在节点间进行消息流转
	Internal bool

	// 是否受限的路由,默认不受限
	// 仅对无状态路由生效
	// 受限的路由在节点状态变更为cluster.Hang或cluster.Shut时,不会路由到该节点;网关层会优先选取其他处于cluster.Work状态的节点;若无cluster.Work状态的节点则选取cluster.Busy节点
	// 非受限路由不受节点状态影响
	Restricted bool

	// 路由中间件
	Middlewares []MiddlewareHandler
}

type Router

type Router struct {
	// contains filtered or unexported fields
}

func (*Router) AddInternalRouteHandler

func (r *Router) AddInternalRouteHandler(route int32, stateful bool, handler RouteHandler, middlewares ...MiddlewareHandler)

AddInternalRouteHandler 添加内部路由处理器(node节点间路由消息处理)

func (*Router) AddRouteHandler

func (r *Router) AddRouteHandler(route int32, stateful bool, handler RouteHandler, middlewares ...MiddlewareHandler)

AddRouteHandler 添加路由处理器

func (*Router) CheckRouteStateful

func (r *Router) CheckRouteStateful(route int32) (stateful bool, exist bool)

CheckRouteStateful 是否为有状态路由

func (*Router) Group

func (r *Router) Group(groups ...func(group *RouterGroup)) *RouterGroup

Group 路由组

func (*Router) HasDefaultRouteHandler

func (r *Router) HasDefaultRouteHandler() bool

HasDefaultRouteHandler 是否存在默认路由处理器

func (*Router) SetDefaultRouteHandler

func (r *Router) SetDefaultRouteHandler(handler RouteHandler)

SetDefaultRouteHandler 设置默认路由处理器,所有未注册的路由均走默认路由处理器

type RouterGroup

type RouterGroup struct {
	// contains filtered or unexported fields
}

func (*RouterGroup) AddInternalRouteHandler

func (g *RouterGroup) AddInternalRouteHandler(route int32, stateful bool, handler RouteHandler, middlewares ...MiddlewareHandler) *RouterGroup

AddInternalRouteHandler 添加内部路由处理器(node节点间路由消息处理)

func (*RouterGroup) AddRouteHandler

func (g *RouterGroup) AddRouteHandler(route int32, stateful bool, handler RouteHandler, middlewares ...MiddlewareHandler) *RouterGroup

AddRouteHandler 添加路由处理器

func (*RouterGroup) Middleware

func (g *RouterGroup) Middleware(middlewares ...MiddlewareHandler) *RouterGroup

Middleware 添加中间件

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

type SchedulingModel

type SchedulingModel string

SchedulingModel 调度模型

type Timer

type Timer struct {
	// contains filtered or unexported fields
}

func (*Timer) Stop

func (t *Timer) Stop() (ok bool)

Stop 停止定时器

type Trigger

type Trigger struct {
	// contains filtered or unexported fields
}

func (*Trigger) AddEventHandler

func (e *Trigger) AddEventHandler(event gcluster.Event, handler EventHandler)

AddEventHandler 添加事件处理器

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL