choykit

package
v1.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 25, 2021 License: BSD-3-Clause Imports: 33 Imported by: 0

Documentation

Index

Constants

View Source
const (
	StatCommit int = iota
	StatTimer
	StatExec
	StatError
	StatDropped
	NumStats
)
View Source
const (
	NodeServiceShift  = 8
	NodeServiceMask   = 0xFFFF00FF
	NodeDistrictShift = 16
	NodeDistrictMask  = 0xF000FFFF
	NodeInstanceMask  = 0xFFFFFF00
	NodeTypeShift     = 31
	NodeTypeClient    = NodeID(1 << NodeTypeShift)
)
View Source
const (
	PacketFlagError    = 0x0100
	PacketFlagRefer    = 0x0200
	PacketFlagRpc      = 0x0400
	PacketFlagJSONText = 0x0800
	PacketFlagCompress = 0x0001
	PacketFlagEncrypt  = 0x0002

	PacketFlagBitsMask = 0xFF00 // 低8位的标志用于传输处理,完成传输后需要清除,不能再返回给ack
)
View Source
const (
	TimerTickInterval = 10  // every 10ms a tick
	TimerChanCapacity = 256 //
)

Variables

View Source
var (
	ErrExecutorClosed = errors.New("executor is closed")
	ErrExecutorBusy   = errors.New("executor is busy")
)
View Source
var (
	ErrOutboundQueueOverflow   = errors.New("outbound queue overflow")
	ErrPacketContextEmpty      = errors.New("packet dispatch context is empty")
	ErrDestinationNotReachable = errors.New("destination not reachable")
)

Functions

func Backtrace

func Backtrace(message interface{}, fp *os.File)

func Catch

func Catch()

func DateTime

func DateTime() string

func DecodeAsMsg

func DecodeAsMsg(value interface{}, msg proto.Message) error

解码为protobuf消息

func DecodeAsString

func DecodeAsString(value interface{}) string

解码为string

func DecodeU32

func DecodeU32(data []byte) (uint32, error)

解码Uint32

func EncodeNumber

func EncodeNumber(value interface{}, data []byte) []byte

编码数字, `data`需足够容量

func EncodeValue

func EncodeValue(value interface{}) ([]byte, error)

编码一个字符串、字节流、protobuf消息对象 编码后的字节用于传输,不能修改其内容

func GetServiceNames

func GetServiceNames() []string

所有服务类型名

func GetServiceTypeByName

func GetServiceTypeByName(name string) uint8

根据服务类型名获取服务类型

func Now

func Now() time.Time

func ReadFileOption

func ReadFileOption(filename string, opts *Options) error

从文件中读取option

func Register

func Register(service Service)

注册服务

func StartClock

func StartClock()

func StopClock

func StopClock()

func WallClock

func WallClock() *datetime.Clock

Types

type BasicRoutePolicy

type BasicRoutePolicy struct {
	// contains filtered or unexported fields
}

func (*BasicRoutePolicy) IsLoopBack

func (r *BasicRoutePolicy) IsLoopBack(router *Router, pkt *Packet) bool

func (*BasicRoutePolicy) Lookup

func (r *BasicRoutePolicy) Lookup(router *Router, pkt *Packet) Endpoint

路由查询

func (*BasicRoutePolicy) Multicast

func (r *BasicRoutePolicy) Multicast(router *Router, pkt *Packet) bool

广播

type CapturedRunnable

type CapturedRunnable struct {
	F func() error
}

func (*CapturedRunnable) Run

func (r *CapturedRunnable) Run() error

type Codec

type Codec interface {
	Version() uint8
	Clone() Codec

	SetSeqNo(seq uint16)
	SetEncryptKey(key, iv []byte)

	// 消息编解码
	Decode(rd io.Reader, pkt *Packet) (int, error)
	Encode(pkt *Packet, buf *bytes.Buffer) error
}

消息编码器

type Endpoint

type Endpoint interface {
	MessageEndpoint

	RawConn() net.Conn
	Stats() *Stats
	Codec() Codec

	Go(write, read bool)

	SetUserData(interface{})
	UserData() interface{}
}

网络连接端点

type EndpointMap

type EndpointMap struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

线程安全的endpoint map

func NewEndpointMap

func NewEndpointMap() *EndpointMap

func (*EndpointMap) Add

func (e *EndpointMap) Add(node NodeID, endpoint Endpoint)

func (*EndpointMap) Delete

func (e *EndpointMap) Delete(node NodeID) bool

func (*EndpointMap) Get

func (e *EndpointMap) Get(node NodeID) Endpoint

func (*EndpointMap) List

func (e *EndpointMap) List() []Endpoint

func (*EndpointMap) Reset

func (e *EndpointMap) Reset()

func (*EndpointMap) Size

func (e *EndpointMap) Size() int

type Environ

type Environ struct {
	DevelopMode               bool   // 测试/生产环境
	GameID                    string // 游戏ID
	ChannelID                 string // 渠道ID
	ServerID                  string // 服务器ID
	ServerName                string // 服务器名称
	AccessKey                 string //
	MysqlDSN                  string //
	RedisAddr                 string //
	ExecutorCapacity          int    //
	ExecutorConcurrency       int    //
	ContextInboundQueueSize   int    //
	ContextOutboundQueueSize  int    //
	EndpointOutboundQueueSize int    //
	NetEnableEncryption       bool
	NetPublicKeyFile          string //
	NetPrivateKeyFile         string //
	NetPeerPingInterval       int    //
	NetPeerReadTimeout        int    //
	NetSessionReadTimeout     int    //
	NetRpcTimeoutInterval     int    //
}

通用环境变量

func LoadEnviron

func LoadEnviron() *Environ

func NewEnviron

func NewEnviron() *Environ

func (Environ) String

func (e Environ) String() string

type Executor

type Executor struct {
	Scheduler
	// contains filtered or unexported fields
}

Runner执行器

func (*Executor) Busyness

func (e *Executor) Busyness() float32

繁忙度

func (*Executor) Execute

func (e *Executor) Execute(r Runner) error

func (*Executor) Go

func (e *Executor) Go()

func (*Executor) Init

func (e *Executor) Init(queueSize, concurrency int) error

func (*Executor) Shutdown

func (e *Executor) Shutdown()

func (*Executor) Stats

func (e *Executor) Stats() *Stats

type MessageEndpoint

type MessageEndpoint interface {
	NodeID() NodeID
	SetNodeID(NodeID)
	RemoteAddr() string

	// 发送消息
	SendPacket(*Packet) error

	// 关闭读/写
	Close() error
	ForceClose(error)
	IsClosing() bool

	Context() *ServiceContext
	SetContext(*ServiceContext)
}

type MessageSubscriber

type MessageSubscriber struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

消息订阅

func NewMessageSub

func NewMessageSub() *MessageSubscriber

func (*MessageSubscriber) AddSubNode

func (s *MessageSubscriber) AddSubNode(start, end int32, node NodeID)

func (*MessageSubscriber) DeleteNodeSubs

func (s *MessageSubscriber) DeleteNodeSubs(dest NodeID)

func (*MessageSubscriber) DeleteSubNode

func (s *MessageSubscriber) DeleteSubNode(start, end int32, node NodeID)

func (*MessageSubscriber) GetSubNodes

func (s *MessageSubscriber) GetSubNodes(startMsg, endMsg int32) NodeIDSet

func (*MessageSubscriber) HasSubNodes

func (s *MessageSubscriber) HasSubNodes(startMsg, endMsg int32) bool

type MySQLConf

type MySQLConf struct {
	Addr     string
	User     string
	Password string
	Database string
}

MySQL配置

func (*MySQLConf) DSN

func (c *MySQLConf) DSN() string

type NodeID

type NodeID uint32

func MakeNodeID

func MakeNodeID(district uint16, service, instance uint8) NodeID

func MakeSessionNodeID

func MakeSessionNodeID(session uint32) NodeID

func MustParseNodeID

func MustParseNodeID(s string) NodeID

func (NodeID) District

func (n NodeID) District() uint16

区服编号

func (NodeID) Instance

func (n NodeID) Instance() uint8

实例编号

func (NodeID) IsBackend

func (n NodeID) IsBackend() bool

是否backend instance

func (NodeID) IsClient

func (n NodeID) IsClient() bool

是否client session

func (NodeID) Service

func (n NodeID) Service() uint8

服务类型编号

func (*NodeID) SetDistrict

func (n *NodeID) SetDistrict(v uint16)

func (*NodeID) SetInstance

func (n *NodeID) SetInstance(v uint8)

func (*NodeID) SetService

func (n *NodeID) SetService(v uint8)

func (NodeID) String

func (n NodeID) String() string

type NodeIDSet

type NodeIDSet []NodeID

没有重复Node的集合

func (NodeIDSet) Add

func (s NodeIDSet) Add(node NodeID) NodeIDSet

func (NodeIDSet) Copy

func (s NodeIDSet) Copy() []NodeID

func (NodeIDSet) Delete

func (s NodeIDSet) Delete(node NodeID) NodeIDSet

func (NodeIDSet) Has

func (s NodeIDSet) Has(node NodeID) int

type Options

type Options struct {
	ShowVersion       bool   `short:"v" long:"version" description:"version string"`
	List              bool   `short:"l" long:"list" description:"list available services"`
	ConfigFile        string `short:"C" long:"config" description:"load option from file"`
	EnvFile           string `short:"E" long:"envfile" description:"dotenv file path"`
	WorkingDir        string `short:"W" long:"workdir" description:"runtime working directory"`
	ResourceDir       string `short:"R" long:"resdir" description:"resource directory"`
	ServiceType       string `short:"S" long:"service-type" description:"name of service type"`
	ServiceIndex      int    `short:"N" long:"service-index" description:"instance index of this service"`
	ServiceDistrict   int    `short:"D" long:"service-district" description:"district of this service"`
	ServiceDependency string `short:"P" long:"dependency" description:"service dependency list"`
	IsCrossDistrict   bool   `short:"X" long:"cross-district" description:"serve all districts"`
	Interface         string `short:"I" long:"interface" description:"service interface address"`
	LogLevel          string `short:"L" long:"loglevel" description:"debug,info,warn,error,fatal,panic"`
	EtcdAddress       string `short:"F" long:"etcd-addr" description:"etcd host address"`
	EtcdKeySpace      string `short:"K" long:"keyspace" description:"etcd key prefix"`
	EtcdLeaseTTL      int    `long:"lease-ttl" description:"etcd lease key TTL"`
	PprofAddr         string `long:"pprof-addr" description:"pprof http listen address"`
	EnableSysLog      bool   `long:"enable-syslog" description:"enable write log to syslog/eventlog"`
	SysLogParams      string `long:"syslog-param" description:"syslog/eventlog parameters"`
}

命令行选项参数

func NewOptions

func NewOptions() *Options

func ParseOptions

func ParseOptions() (*Options, error)

Parse options from console

type Packet

type Packet struct {
	Command  uint32          `json:"cmd"`            // 消息ID
	Seq      uint16          `json:"seq"`            // 消息序列号
	Flags    uint16          `json:"flg,omitempty"`  // 消息标记位
	Referer  uint32          `json:"ref,omitempty"`  // 引用的client session
	Node     NodeID          `json:"node,omitempty"` // 目标节点
	Body     interface{}     `json:"body,omitempty"` // 消息内容,integer/bytes/string/pb.Message
	Endpoint MessageEndpoint `json:"-"`              // 关联的endpoint
}

应用层消息

func MakePacket

func MakePacket() *Packet

func NewPacket

func NewPacket(node NodeID, command, refer uint32, flags, seq uint16, body interface{}) *Packet

func (*Packet) Ack

func (m *Packet) Ack(msgId int32, ack proto.Message) error

Reply with response message

func (*Packet) Clone

func (m *Packet) Clone() *Packet

func (*Packet) DecodeAsString

func (m *Packet) DecodeAsString() string

解码为string

func (*Packet) DecodeMsg

func (m *Packet) DecodeMsg(msg proto.Message) error

解码成protobuf message

func (*Packet) Encode

func (m *Packet) Encode() ([]byte, error)

func (*Packet) Errno

func (m *Packet) Errno() uint32

func (*Packet) Refuse

func (m *Packet) Refuse(command int32, errno uint32) error

Refuse with errno

func (*Packet) Reply

func (m *Packet) Reply(ack proto.Message) error

func (*Packet) ReplyAny

func (m *Packet) ReplyAny(command uint32, data interface{}) error

func (*Packet) Reset

func (m *Packet) Reset()

func (*Packet) Run

func (m *Packet) Run() error

func (*Packet) SetErrno

func (m *Packet) SetErrno(ec uint32)

func (Packet) String

func (m Packet) String() string

type PacketFilter

type PacketFilter func(*Packet) bool

type PacketHandler

type PacketHandler func(*Packet) error

type PacketQueue

type PacketQueue struct {
	C chan struct{} // notify channel
	// contains filtered or unexported fields
}

A memory-bound packet queue

func NewMessageQueue

func NewMessageQueue() *PacketQueue

func (*PacketQueue) Len

func (q *PacketQueue) Len() int

func (*PacketQueue) Notify

func (q *PacketQueue) Notify()

func (*PacketQueue) Peek

func (q *PacketQueue) Peek() *Packet

func (*PacketQueue) Pop

func (q *PacketQueue) Pop() *Packet

func (*PacketQueue) Push

func (q *PacketQueue) Push(v *Packet)

func (*PacketQueue) Reset

func (q *PacketQueue) Reset()

initializes or clears

type RoutePolicy

type RoutePolicy interface {
	IsLoopBack(*Router, *Packet) bool
	Multicast(*Router, *Packet) bool
	Lookup(*Router, *Packet) Endpoint
}

路由策略

func NewBasicRoutePolicy

func NewBasicRoutePolicy(endpoints *EndpointMap) RoutePolicy

type Router

type Router struct {
	*MessageSubscriber // 消息订阅
	*RoutingTable      // 路由表
	// contains filtered or unexported fields
}

路由器

func NewRouter

func NewRouter(node NodeID) *Router

func (*Router) AddPolicy

func (r *Router) AddPolicy(policy RoutePolicy)

func (*Router) IsLoopBack

func (r *Router) IsLoopBack(pkt *Packet) bool

func (*Router) NodeID

func (r *Router) NodeID() NodeID

func (*Router) Route

func (r *Router) Route(pkt *Packet) error

type RoutingTable

type RoutingTable struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewRoutingTable

func NewRoutingTable() *RoutingTable

func (*RoutingTable) AddEntry

func (r *RoutingTable) AddEntry(src, dst NodeID)

func (*RoutingTable) DeleteDestEntry

func (r *RoutingTable) DeleteDestEntry(dest NodeID)

func (*RoutingTable) DeleteEntry

func (r *RoutingTable) DeleteEntry(src NodeID)

func (*RoutingTable) EntryList

func (r *RoutingTable) EntryList() []RoutingTableEntry

func (*RoutingTable) GetEntry

func (r *RoutingTable) GetEntry(key NodeID) NodeID

type RoutingTableEntry

type RoutingTableEntry struct {
	// contains filtered or unexported fields
}

路由表

type Runnable

type Runnable struct {
	F func() error
}

func (*Runnable) Run

func (r *Runnable) Run() error

type Runner

type Runner interface {
	Run() error
}

Runner是一个可执行对象接口

func NewCapturedRunnable

func NewCapturedRunnable(f func() error) Runner

func NewRunner

func NewRunner(f func() error) Runner

type Scheduler

type Scheduler struct {
	C chan *TimerNode // 到期的定时器
	// contains filtered or unexported fields
}

func (*Scheduler) Cancel

func (s *Scheduler) Cancel(id int32) bool

func (*Scheduler) Go

func (s *Scheduler) Go()

func (*Scheduler) Init

func (s *Scheduler) Init() error

func (*Scheduler) RunAfter

func (s *Scheduler) RunAfter(delay int32, r Runner) int32

创建一个定时器,在`delay`毫秒后运行`r`

func (*Scheduler) RunEvery

func (s *Scheduler) RunEvery(delay int32, r Runner) int32

创建一个定时器,每隔`delay`毫秒运行一次`r`

func (*Scheduler) Shutdown

func (s *Scheduler) Shutdown()

type Service

type Service interface {
	ID() uint8
	Name() string
	NodeID() NodeID
	SetNodeID(NodeID)

	// 初始化、启动和关闭
	Init(*ServiceContext) error
	Startup() error
	Shutdown()

	// 服务上下文
	Context() *ServiceContext

	// 执行
	Execute(Runner) error
	Dispatch(*Packet) error
}

服务

func GetServiceByID

func GetServiceByID(srvType uint8) Service

根据服务ID获取Service对象

func GetServiceByName

func GetServiceByName(name string) Service

根据名称获取Service对象

type ServiceContext

type ServiceContext struct {
	// contains filtered or unexported fields
}

服务的上下文

func NewServiceContext

func NewServiceContext(opt *Options, env *Environ) *ServiceContext

func (*ServiceContext) AddFinalizer

func (c *ServiceContext) AddFinalizer(finalizer func())

func (*ServiceContext) Env

func (c *ServiceContext) Env() *Environ

func (*ServiceContext) Go

func (c *ServiceContext) Go()

func (*ServiceContext) InboundQueue

func (c *ServiceContext) InboundQueue() chan<- *Packet

func (*ServiceContext) IsClosing

func (c *ServiceContext) IsClosing() bool

func (*ServiceContext) Options

func (c *ServiceContext) Options() *Options

func (*ServiceContext) Router

func (c *ServiceContext) Router() *Router

func (*ServiceContext) SendMessage

func (c *ServiceContext) SendMessage(pkt *Packet) error

func (*ServiceContext) Service

func (c *ServiceContext) Service() Service

func (*ServiceContext) SetMessageFilter

func (c *ServiceContext) SetMessageFilter(f PacketFilter) PacketFilter

func (*ServiceContext) Shutdown

func (c *ServiceContext) Shutdown()

func (*ServiceContext) Start

func (c *ServiceContext) Start(srv Service) error

type Stats

type Stats struct {
	// contains filtered or unexported fields
}

func NewStats

func NewStats(n int) *Stats

func (*Stats) Add

func (s *Stats) Add(field int, delta int64) (v int64)

func (*Stats) Clone

func (s *Stats) Clone() *Stats

func (*Stats) Get

func (s *Stats) Get(field int) (v int64)

func (*Stats) Set

func (s *Stats) Set(field int, v int64)

func (*Stats) Values

func (s *Stats) Values() []int64

type TimerHeap

type TimerHeap []*TimerNode

func (TimerHeap) Empty

func (q TimerHeap) Empty() bool

func (TimerHeap) Len

func (q TimerHeap) Len() int

func (TimerHeap) Less

func (q TimerHeap) Less(i, j int) bool

func (TimerHeap) Peek

func (q TimerHeap) Peek() *TimerNode

func (*TimerHeap) Pop

func (q *TimerHeap) Pop() interface{}

func (*TimerHeap) Push

func (q *TimerHeap) Push(x interface{})

func (TimerHeap) Swap

func (q TimerHeap) Swap(i, j int)

func (*TimerHeap) Update

func (q *TimerHeap) Update(item *TimerNode, priority int64)

type TimerNode

type TimerNode struct {
	Priority int64 // absolute expire time

	Index int32  // array index
	R     Runner // timer expire callback function
	// contains filtered or unexported fields
}

Directories

Path Synopsis
This package provides immutable UUID structs and the functions NewV3, NewV4, NewV5 and Parse() for generating versions 3, 4 and 5 UUIDs as specified in RFC 4122.
This package provides immutable UUID structs and the functions NewV3, NewV4, NewV5 and Parse() for generating versions 3, 4 and 5 UUIDs as specified in RFC 4122.
x

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL