fatchoy

package
v1.0.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 14, 2021 License: BSD-3-Clause Imports: 32 Imported by: 0

README

fatchoy

一些基础组件的定义和实现

其它模块需要依赖这个包的定义

Documentation

Index

Constants

View Source
// Names of runtime and network tuning settings. Each constant's value is the
// same string as its identifier.
// NOTE(review): presumably these are the keys looked up through Environ — confirm.
const (
	RUNTIME_EXECUTOR_CAPACITY      = "RUNTIME_EXECUTOR_CAPACITY"
	RUNTIME_EXECUTOR_CONCURRENCY   = "RUNTIME_EXECUTOR_CONCURRENCY"
	RUNTIME_CONTEXT_INBOUND_SIZE   = "RUNTIME_CONTEXT_INBOUND_SIZE"
	RUNTIME_CONTEXT_OUTBOUND_SIZE  = "RUNTIME_CONTEXT_OUTBOUND_SIZE"
	RUNTIME_ENDPOINT_OUTBOUND_SIZE = "RUNTIME_ENDPOINT_OUTBOUND_SIZE"
	NET_PEER_PING_INTERVAL         = "NET_PEER_PING_INTERVAL"
	NET_PEER_READ_INTERVAL         = "NET_PEER_READ_INTERVAL"
	NET_RPC_TTL                    = "NET_RPC_TTL"
	NET_SESSION_READ_TIMEOUT       = "NET_SESSION_READ_TIMEOUT"
	NET_INTERFACES                 = "NET_INTERFACES"
)
View Source
// Counter indices of type int, numbered consecutively from zero via iota.
// NOTE(review): presumably used as the index argument of Stats.Get/Set/Add — confirm.
const (
	StatCommit int = iota
	StatTimer
	StatExec
	StatError
	StatDropped
	// NumStats is one past the last index above, i.e. the number of Stat* indices.
	NumStats
)
View Source
// Bit-layout constants for NodeID (a uint32).
//
//   - NodeServiceShift: bit offset 16.
//   - NodeServiceMask: every bit set except bits 16-23.
//   - NodeInstanceMask: every bit set except bits 0-15.
//   - NodeTypeShift: bit offset 31.
//   - NodeTypeClient: a NodeID with only bit 31 set.
//
// NOTE(review): presumably the service number lives in bits 16-23 and the
// instance number in bits 0-15, with each mask AND-ed in to clear its field
// before a new value is written — confirm against SetService/SetInstance.
const (
	NodeServiceShift = 16
	NodeServiceMask  = 0xFF00FFFF
	NodeInstanceMask = 0xFFFF0000
	NodeTypeShift    = 31
	NodeTypeClient   = NodeID(1 << NodeTypeShift)
)
View Source
// Single-bit flag values; each constant sets a distinct bit, so they can be
// OR-ed together.
// NOTE(review): presumably these are the bits stored in Packet.Flag — confirm.
const (
	PacketFlagError      = 0x0100
	PacketFlagRpc        = 0x0400
	PacketFlagJSONText   = 0x0800
	PacketFlagCompressed = 0x0001
	PacketFlagEncrypted  = 0x0002
)
View Source
// Timer tuning constants.
const (
	TimerPrecision    = 10  // timer precision is 10ms
	TimerChanCapacity = 128 // NOTE(review): presumably the buffer size of the expired-timer channel — confirm
)

Variables

View Source
// Sentinel errors created with errors.New; the message of each one is the
// string literal shown. Which calls return them is not visible here.
var (
	ErrOutboundQueueOverflow   = errors.New("outbound queue overflow")
	ErrPacketContextEmpty      = errors.New("packet dispatch context is empty")
	ErrDestinationNotReachable = errors.New("destination not reachable")
)
View Source
// ErrExecutorClosed is a sentinel error created with errors.New.
// NOTE(review): presumably returned by Executor.Execute after Shutdown — confirm.
var (
	ErrExecutorClosed = errors.New("executor is closed")
)

Functions

func Backtrace

func Backtrace(message interface{}, f *os.File)

func Catch

func Catch()

func DateTime

func DateTime() string

func DecodeAsMsg

func DecodeAsMsg(value interface{}, msg proto.Message) error

解码为protobuf消息

func DecodeAsString

func DecodeAsString(value interface{}) string

解码为string

func DecodeU32

func DecodeU32(data []byte) (uint32, error)

解码Uint32

func EncodeNumber

func EncodeNumber(value interface{}, data []byte) []byte

编码数字, `data`需足够容量

func EncodeValue

func EncodeValue(value interface{}) ([]byte, error)

编码一个字符串、字节流、protobuf消息对象。编码后的字节用于传输,不能修改其内容。

func GetServiceNames

func GetServiceNames() []string

所有服务类型名

func GetServiceTypeByName

func GetServiceTypeByName(name string) uint8

根据服务类型名获取服务类型

func Now

func Now() time.Time

func ParseNetInterface

func ParseNetInterface(text string) *protocol.InterfaceAddr

解析地址格式,对外地址@bind地址:端口,如example.com@0.0.0.0:9527

func Register

func Register(service Service)

注册服务

func StartClock

func StartClock()

开启时钟

func StopClock

func StopClock()

关闭时钟

func WallClock

func WallClock() *datetime.Clock

Types

type BasicRoutePolicy added in v1.0.4

type BasicRoutePolicy struct {
	// contains filtered or unexported fields
}

func (*BasicRoutePolicy) IsLoopBack added in v1.0.4

func (r *BasicRoutePolicy) IsLoopBack(router *Router, pkt *Packet) bool

func (*BasicRoutePolicy) Lookup added in v1.0.4

func (r *BasicRoutePolicy) Lookup(router *Router, pkt *Packet) Endpoint

路由查询

func (*BasicRoutePolicy) Multicast added in v1.0.4

func (r *BasicRoutePolicy) Multicast(router *Router, pkt *Packet) bool

广播

type CapturedRunnable

// CapturedRunnable holds a single function value. The package documentation
// describes it as a runner that captures panics.
type CapturedRunnable struct {
	// F is the stored function. NOTE(review): presumably invoked by Run — confirm.
	F func() error
}

会捕获panic的runner

func (*CapturedRunnable) Run

func (r *CapturedRunnable) Run() error

type Endpoint

// Endpoint is a network connection endpoint (per the package documentation).
// It embeds MessageEndpoint and adds access to the raw connection, a Stats
// value, the codec, and an attached user-data value.
type Endpoint interface {
	MessageEndpoint

	RawConn() net.Conn
	Stats() *Stats
	Encoder() ProtocolCodec

	// NOTE(review): presumably starts the write and/or read loops selected by
	// the two flags — confirm against an implementation.
	Go(write, read bool)

	SetUserData(interface{})
	UserData() interface{}
}

网络连接端点

type EndpointMap

type EndpointMap struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

线程安全的endpoint map

func NewEndpointMap

func NewEndpointMap() *EndpointMap

func (*EndpointMap) Add

func (e *EndpointMap) Add(node NodeID, endpoint Endpoint)

func (*EndpointMap) Delete

func (e *EndpointMap) Delete(node NodeID) bool

func (*EndpointMap) Get

func (e *EndpointMap) Get(node NodeID) Endpoint

func (*EndpointMap) List

func (e *EndpointMap) List() []Endpoint

func (*EndpointMap) Reset

func (e *EndpointMap) Reset()

func (*EndpointMap) Size

func (e *EndpointMap) Size() int

type Environ

// Environ is the environment of the process; per the package documentation,
// code inside the package obtains its variables and parameters through it.
// It embeds protocol.Environ and dotenv.Env.
type Environ struct {
	protocol.Environ
	dotenv.Env
}

进程的环境, 代码内部都使用environ获取变量参数

func LoadEnviron

func LoadEnviron() *Environ

加载环境变量

func NewEnviron

func NewEnviron() *Environ

func (*Environ) IsProd

func (e *Environ) IsProd() bool

func (*Environ) SetByOption

func (e *Environ) SetByOption(opt *Options)

command line option只是一种设置environ的手段

func (Environ) String

func (e Environ) String() string

type Executor

type Executor struct {
	Scheduler
	// contains filtered or unexported fields
}

Runner执行器

func (*Executor) Busyness

func (e *Executor) Busyness() float32

繁忙度

func (*Executor) Execute

func (e *Executor) Execute(r Runner) error

func (*Executor) Go

func (e *Executor) Go()

func (*Executor) Init

func (e *Executor) Init(queueSize, concurrency int32) error

func (*Executor) Shutdown

func (e *Executor) Shutdown()

func (*Executor) Stats

func (e *Executor) Stats() *Stats

type MessageEndpoint

// MessageEndpoint is the message-level view of an endpoint: node identity,
// remote address, packet sending, closing, and the owning ServiceContext.
type MessageEndpoint interface {
	NodeID() NodeID
	SetNodeID(NodeID)
	RemoteAddr() string

	// Send a message.
	SendPacket(*Packet) error

	// Close reading/writing.
	Close() error
	ForceClose(error)
	IsClosing() bool

	Context() *ServiceContext
	SetContext(*ServiceContext)
}

type MessageSubscriber

type MessageSubscriber struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

消息订阅

func NewMessageSub

func NewMessageSub() *MessageSubscriber

func (*MessageSubscriber) AddSubNode

func (s *MessageSubscriber) AddSubNode(start, end int32, node NodeID)

func (*MessageSubscriber) DeleteNodeSubs added in v1.0.4

func (s *MessageSubscriber) DeleteNodeSubs(dest NodeID)

func (*MessageSubscriber) DeleteSubNode added in v1.0.4

func (s *MessageSubscriber) DeleteSubNode(start, end int32, node NodeID)

func (*MessageSubscriber) GetSubNodes added in v1.0.4

func (s *MessageSubscriber) GetSubNodes(startMsg, endMsg int32) NodeIDSet

func (*MessageSubscriber) HasSubNodes added in v1.0.4

func (s *MessageSubscriber) HasSubNodes(startMsg, endMsg int32) bool

type MySQLConf

// MySQLConf is the MySQL configuration: address, user, password and database
// name, all stored as plain strings.
type MySQLConf struct {
	Addr     string
	User     string
	Password string
	Database string
}

MySQL配置

func (*MySQLConf) DSN

func (c *MySQLConf) DSN() string

type NetInterface

type NetInterface protocol.InterfaceAddr

func (NetInterface) AdvertiseInterface

func (i NetInterface) AdvertiseInterface() string

func (NetInterface) Interface

func (i NetInterface) Interface() string

type NodeID

type NodeID uint32

节点号

func MakeNodeID

func MakeNodeID(service uint8, instance uint16) NodeID

func MakeSessionNodeID

func MakeSessionNodeID(session uint32) NodeID

func MustParseNodeID

func MustParseNodeID(s string) NodeID

func (NodeID) Instance

func (n NodeID) Instance() uint16

实例编号

func (NodeID) IsBackend

func (n NodeID) IsBackend() bool

是否backend instance

func (NodeID) Service

func (n NodeID) Service() uint8

服务类型编号

func (*NodeID) SetInstance

func (n *NodeID) SetInstance(v uint16)

func (*NodeID) SetService

func (n *NodeID) SetService(v uint8)

func (NodeID) String

func (n NodeID) String() string

type NodeIDSet

type NodeIDSet []NodeID

没有重复Node的集合

func (NodeIDSet) Add added in v1.0.4

func (s NodeIDSet) Add(node NodeID) NodeIDSet

func (NodeIDSet) Copy

func (s NodeIDSet) Copy() []NodeID

func (NodeIDSet) Delete

func (s NodeIDSet) Delete(node NodeID) NodeIDSet

func (NodeIDSet) Has

func (s NodeIDSet) Has(node NodeID) int

type Options

// Options holds the command-line arguments. Each field's struct tag gives its
// short and/or long flag name and a description string.
// NOTE(review): the tag format looks like the one used by go-flags style
// parsers — confirm which parser ParseOptions uses.
type Options struct {
	ShowVersion       bool   `short:"v" long:"version" description:"version string"`
	List              bool   `short:"l" long:"list" description:"list available services"`
	EnvFile           string `short:"E" long:"envfile" description:"dotenv file path"`
	WorkingDir        string `short:"W" long:"workdir" description:"runtime working directory"`
	ResourceDir       string `short:"R" long:"resdir" description:"resource directory"`
	ServiceType       string `short:"S" long:"service-type" description:"name of service type"`
	ServiceIndex      uint16 `short:"N" long:"service-index" description:"instance index of this service"`
	ServiceDependency string `short:"P" long:"dependency" description:"service dependency list"`
	LogLevel          string `short:"L" long:"loglevel" description:"debug,info,warn,error,fatal,panic"`
	EtcdAddress       string `short:"F" long:"etcd-addr" description:"etcd host address"`
	EtcdKeySpace      string `short:"K" long:"keyspace" description:"etcd key prefix"`
	EtcdLeaseTTL      int    `long:"lease-ttl" description:"etcd lease key TTL"`
	PprofAddr         string `long:"pprof-addr" description:"pprof http listen address"`
	EnableSysLog      bool   `long:"enable-syslog" description:"enable write log to syslog/eventlog"`
	SysLogParams      string `long:"syslog-param" description:"syslog/eventlog parameters"`
}

命令行参数

func NewOptions

func NewOptions() *Options

func ParseOptions

func ParseOptions() (*Options, error)

Parse options from console

type Packet

// Packet is an application-layer message. Endpoint is excluded from JSON
// output (tag "-"); Flag, Node and Body are omitted from JSON when empty.
type Packet struct {
	Command  uint32          `json:"cmd"`            // message ID
	Seq      uint16          `json:"seq"`            // sequence number
	Flag     uint16          `json:"flg,omitempty"`  // flag bits
	Node     NodeID          `json:"node,omitempty"` // destination node
	Body     interface{}     `json:"body,omitempty"` // message content: number/string/bytes/pb.Message
	Endpoint MessageEndpoint `json:"-"`              // associated endpoint
}

应用层消息

func MakePacket

func MakePacket() *Packet

func NewPacket

func NewPacket(node NodeID, command uint32, flag, seq uint16, body interface{}) *Packet

func (*Packet) Ack

func (m *Packet) Ack(msgId int32, ack proto.Message) error

Reply with response message

func (*Packet) Clone

func (m *Packet) Clone() *Packet

func (*Packet) DecodeBodyAsString

func (m *Packet) DecodeBodyAsString() string

解码为string

func (*Packet) DecodeMsg

func (m *Packet) DecodeMsg(msg proto.Message) error

解码成protobuf message

func (*Packet) EncodeBody

func (m *Packet) EncodeBody() ([]byte, error)

func (*Packet) Errno

func (m *Packet) Errno() uint32

func (*Packet) Refuse

func (m *Packet) Refuse(command int32, errno uint32) error

返回一个错误码消息

func (*Packet) Reply

func (m *Packet) Reply(ack proto.Message) error

func (*Packet) ReplyAny

func (m *Packet) ReplyAny(command uint32, data interface{}) error

func (*Packet) Reset

func (m *Packet) Reset()

func (*Packet) Run

func (m *Packet) Run() error

func (*Packet) SetErrno

func (m *Packet) SetErrno(ec uint32)

func (Packet) String

func (m Packet) String() string

type PacketFilter

type PacketFilter func(*Packet) bool

type PacketHandler

type PacketHandler func(*Packet) error

type PacketQueue

type PacketQueue struct {
	C chan struct{} // notify channel
	// contains filtered or unexported fields
}

一个无边界限制的Packet队列

func NewPacketQueue

func NewPacketQueue() *PacketQueue

func (*PacketQueue) Len

func (q *PacketQueue) Len() int

func (*PacketQueue) Notify

func (q *PacketQueue) Notify()

func (*PacketQueue) Peek

func (q *PacketQueue) Peek() *Packet

取出队列头部元素

func (*PacketQueue) Pop

func (q *PacketQueue) Pop() *Packet

弹出队列

func (*PacketQueue) Push

func (q *PacketQueue) Push(v *Packet)

压入队列

func (*PacketQueue) Reset

func (q *PacketQueue) Reset()

type ProtocolCodec

// ProtocolCodec combines ProtocolEncoder and ProtocolDecoder. Per the package
// documentation, one codec is used from multiple goroutines, so an
// implementation must be safe for concurrent use.
type ProtocolCodec interface {
	ProtocolEncoder
	ProtocolDecoder
}

消息编解码,同样一个codec会在多个goroutine执行,需要多线程安全

type ProtocolDecoder

// ProtocolDecoder decodes a packet from a reader into pkt.
// NOTE(review): the int result is presumably the number of bytes consumed — confirm.
type ProtocolDecoder interface {
	Unmarshal(r io.Reader, pkt *Packet) (int, error)
}

type ProtocolEncoder

// ProtocolEncoder encodes pkt and writes it to w, reporting any error.
type ProtocolEncoder interface {
	Marshal(w io.Writer, pkt *Packet) error
}

type RoutePolicy

// RoutePolicy is a routing policy. Every method receives the Router and the
// Packet being routed.
// NOTE(review): presumably the bool results mean "this policy handled the
// packet" — confirm against Router.Route.
type RoutePolicy interface {
	IsLoopBack(*Router, *Packet) bool
	Multicast(*Router, *Packet) bool
	Lookup(*Router, *Packet) Endpoint
}

路由策略

func NewBasicRoutePolicy added in v1.0.4

func NewBasicRoutePolicy(endpoints *EndpointMap) RoutePolicy

type Router

// Router is the packet router. It embeds a message subscription registry and
// a routing table, so their methods are promoted onto Router.
type Router struct {
	*MessageSubscriber // message subscriptions
	*RoutingTable      // routing table
	// contains filtered or unexported fields
}

路由器

func NewRouter

func NewRouter(node NodeID) *Router

func (*Router) AddPolicy

func (r *Router) AddPolicy(policy RoutePolicy)

func (*Router) IsLoopBack

func (r *Router) IsLoopBack(pkt *Packet) bool

func (*Router) NodeID

func (r *Router) NodeID() NodeID

func (*Router) Route

func (r *Router) Route(pkt *Packet) error

type RoutingTable

type RoutingTable struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewRoutingTable

func NewRoutingTable() *RoutingTable

func (*RoutingTable) AddEntry

func (r *RoutingTable) AddEntry(src, dst NodeID)

func (*RoutingTable) DeleteDestEntry

func (r *RoutingTable) DeleteDestEntry(dest NodeID)

func (*RoutingTable) DeleteEntry

func (r *RoutingTable) DeleteEntry(src NodeID)

func (*RoutingTable) EntryList

func (r *RoutingTable) EntryList() []RoutingTableEntry

func (*RoutingTable) GetEntry

func (r *RoutingTable) GetEntry(key NodeID) NodeID

type RoutingTableEntry

type RoutingTableEntry struct {
	// contains filtered or unexported fields
}

路由表

type Runnable

// Runnable holds a single function value.
type Runnable struct {
	// F is the stored function. NOTE(review): presumably invoked by Run — confirm.
	F func() error
}

func (*Runnable) Run

func (r *Runnable) Run() error

type Runner

// Runner is an executable object: anything with a Run method that returns an error.
type Runner interface {
	Run() error
}

Runner是一个可执行对象

func NewCapturedRunnable

func NewCapturedRunnable(f func() error) Runner

func NewRunner

func NewRunner(f func() error) Runner

type Scheduler

// Scheduler is a timer scheduler; see RunAfter and RunEvery for creating timers.
type Scheduler struct {
	C chan *TimerNode // timers that have expired
	// contains filtered or unexported fields
}

func (*Scheduler) Cancel

func (s *Scheduler) Cancel(id int32) bool

func (*Scheduler) Go

func (s *Scheduler) Go()

func (*Scheduler) Init

func (s *Scheduler) Init() error

func (*Scheduler) RunAfter

func (s *Scheduler) RunAfter(interval int32, r Runner) int32

创建一个定时器,在`interval`毫秒后运行`r`

func (*Scheduler) RunEvery

func (s *Scheduler) RunEvery(interval int32, r Runner) int32

创建一个定时器,每隔`interval`毫秒运行一次`r`

func (*Scheduler) Shutdown

func (s *Scheduler) Shutdown()

type Service

// Service is the interface every service implements: identity, lifecycle,
// access to its ServiceContext, and execution/dispatch entry points.
type Service interface {
	ID() uint8
	Name() string
	NodeID() NodeID
	SetNodeID(NodeID)

	// Initialization, startup and shutdown.
	Init(*ServiceContext) error
	Startup() error
	Shutdown()

	// Service context.
	Context() *ServiceContext

	// Execution.
	Execute(Runner) error
	Dispatch(*Packet) error
}

服务

func GetServiceByID

func GetServiceByID(srvType uint8) Service

根据服务ID获取Service对象

func GetServiceByName

func GetServiceByName(name string) Service

根据名称获取Service对象

type ServiceContext

type ServiceContext struct {
	// contains filtered or unexported fields
}

服务的上下文

func NewServiceContext

func NewServiceContext(env *Environ) *ServiceContext

func (*ServiceContext) AddFinalizer

func (c *ServiceContext) AddFinalizer(finalizer func())

func (*ServiceContext) Env

func (c *ServiceContext) Env() *Environ

func (*ServiceContext) Go

func (c *ServiceContext) Go()

func (*ServiceContext) InboundQueue

func (c *ServiceContext) InboundQueue() chan<- *Packet

func (*ServiceContext) IsClosing

func (c *ServiceContext) IsClosing() bool

func (*ServiceContext) Router

func (c *ServiceContext) Router() *Router

func (*ServiceContext) SendMessage

func (c *ServiceContext) SendMessage(pkt *Packet) error

func (*ServiceContext) Service

func (c *ServiceContext) Service() Service

func (*ServiceContext) SetMessageFilter

func (c *ServiceContext) SetMessageFilter(f PacketFilter) PacketFilter

func (*ServiceContext) Shutdown

func (c *ServiceContext) Shutdown()

func (*ServiceContext) Start

func (c *ServiceContext) Start(srv Service) error

type Stats

type Stats struct {
	// contains filtered or unexported fields
}

一组计数器

func NewStats

func NewStats(n int) *Stats

func (*Stats) Add

func (s *Stats) Add(i int, delta int64) int64

func (*Stats) Clone

func (s *Stats) Clone() *Stats

func (*Stats) Copy

func (s *Stats) Copy() []int64

func (*Stats) Get

func (s *Stats) Get(i int) int64

func (*Stats) Set

func (s *Stats) Set(i int, v int64)

type TimerHeap

type TimerHeap []*TimerNode

func (TimerHeap) Empty

func (q TimerHeap) Empty() bool

func (TimerHeap) Len

func (q TimerHeap) Len() int

func (TimerHeap) Less

func (q TimerHeap) Less(i, j int) bool

func (TimerHeap) Peek

func (q TimerHeap) Peek() *TimerNode

func (*TimerHeap) Pop

func (q *TimerHeap) Pop() interface{}

func (*TimerHeap) Push

func (q *TimerHeap) Push(x interface{})

func (TimerHeap) Swap

func (q TimerHeap) Swap(i, j int)

func (*TimerHeap) Update

func (q *TimerHeap) Update(item *TimerNode, ts int64)

type TimerNode

// TimerNode is one timer entry; TimerHeap is a slice of *TimerNode.
type TimerNode struct {
	ExpireTs int64  // expiry time
	R        Runner // runner executed after the timer expires
	Index    int32  // array index
	// contains filtered or unexported fields
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL