Documentation ¶
Index ¶
- Constants
- Variables
- func Backtrace(message interface{}, f *os.File)
- func Catch()
- func DateTime() string
- func DecodeAsMsg(value interface{}, msg proto.Message) error
- func DecodeAsString(value interface{}) string
- func DecodeU32(data []byte) (uint32, error)
- func EncodeNumber(value interface{}, data []byte) []byte
- func EncodeValue(value interface{}) ([]byte, error)
- func GetServiceNames() []string
- func GetServiceTypeByName(name string) uint8
- func Now() time.Time
- func ParseNetInterface(text string) *protocol.InterfaceAddr
- func Register(service Service)
- func StartClock()
- func StopClock()
- func WallClock() *datetime.Clock
- type BasicRoutePolicy
- type CapturedRunnable
- type Endpoint
- type EndpointMap
- type Environ
- type Executor
- type MessageEndpoint
- type MessageSubscriber
- func (s *MessageSubscriber) AddSubNode(start, end int32, node NodeID)
- func (s *MessageSubscriber) DeleteNodeSubs(dest NodeID)
- func (s *MessageSubscriber) DeleteSubNode(start, end int32, node NodeID)
- func (s *MessageSubscriber) GetSubNodes(startMsg, endMsg int32) NodeIDSet
- func (s *MessageSubscriber) HasSubNodes(startMsg, endMsg int32) bool
- type MySQLConf
- type NetInterface
- type NodeID
- type NodeIDSet
- type Options
- type Packet
- func (m *Packet) Ack(msgId int32, ack proto.Message) error
- func (m *Packet) Clone() *Packet
- func (m *Packet) DecodeBodyAsString() string
- func (m *Packet) DecodeMsg(msg proto.Message) error
- func (m *Packet) EncodeBody() ([]byte, error)
- func (m *Packet) Errno() uint32
- func (m *Packet) Refuse(command int32, errno uint32) error
- func (m *Packet) Reply(ack proto.Message) error
- func (m *Packet) ReplyAny(command uint32, data interface{}) error
- func (m *Packet) Reset()
- func (m *Packet) Run() error
- func (m *Packet) SetErrno(ec uint32)
- func (m Packet) String() string
- type PacketFilter
- type PacketHandler
- type PacketQueue
- type ProtocolCodec
- type ProtocolDecoder
- type ProtocolEncoder
- type RoutePolicy
- type Router
- type RoutingTable
- type RoutingTableEntry
- type Runnable
- type Runner
- type Scheduler
- type Service
- type ServiceContext
- func (c *ServiceContext) AddFinalizer(finalizer func())
- func (c *ServiceContext) Env() *Environ
- func (c *ServiceContext) Go()
- func (c *ServiceContext) InboundQueue() chan<- *Packet
- func (c *ServiceContext) IsClosing() bool
- func (c *ServiceContext) Router() *Router
- func (c *ServiceContext) SendMessage(pkt *Packet) error
- func (c *ServiceContext) Service() Service
- func (c *ServiceContext) SetMessageFilter(f PacketFilter) PacketFilter
- func (c *ServiceContext) Shutdown()
- func (c *ServiceContext) Start(srv Service) error
- type Stats
- type TimerHeap
- func (q TimerHeap) Empty() bool
- func (q TimerHeap) Len() int
- func (q TimerHeap) Less(i, j int) bool
- func (q TimerHeap) Peek() *TimerNode
- func (q *TimerHeap) Pop() interface{}
- func (q *TimerHeap) Push(x interface{})
- func (q TimerHeap) Swap(i, j int)
- func (q *TimerHeap) Update(item *TimerNode, ts int64)
- type TimerNode
Constants ¶
View Source
const ( RUNTIME_EXECUTOR_CAPACITY = "RUNTIME_EXECUTOR_CAPACITY" RUNTIME_EXECUTOR_CONCURRENCY = "RUNTIME_EXECUTOR_CONCURRENCY" RUNTIME_CONTEXT_INBOUND_SIZE = "RUNTIME_CONTEXT_INBOUND_SIZE" RUNTIME_CONTEXT_OUTBOUND_SIZE = "RUNTIME_CONTEXT_OUTBOUND_SIZE" RUNTIME_ENDPOINT_OUTBOUND_SIZE = "RUNTIME_ENDPOINT_OUTBOUND_SIZE" NET_PEER_PING_INTERVAL = "NET_PEER_PING_INTERVAL" NET_PEER_READ_INTERVAL = "NET_PEER_READ_INTERVAL" NET_RPC_TTL = "NET_RPC_TTL" NET_SESSION_READ_TIMEOUT = "NET_SESSION_READ_TIMEOUT" NET_INTERFACES = "NET_INTERFACES" )
View Source
const ( StatCommit int = iota StatTimer StatExec StatError StatDropped NumStats )
View Source
const ( NodeServiceShift = 16 NodeServiceMask = 0xFF00FFFF NodeInstanceMask = 0xFFFF0000 NodeTypeShift = 31 NodeTypeClient = NodeID(1 << NodeTypeShift) )
View Source
const ( PacketFlagError = 0x0100 PacketFlagRpc = 0x0400 PacketFlagJSONText = 0x0800 PacketFlagCompressed = 0x0001 PacketFlagEncrypted = 0x0002 )
View Source
const ( TimerPrecision = 10 // 精度为10ms TimerChanCapacity = 128 // )
Variables ¶
View Source
var ( ErrOutboundQueueOverflow = errors.New("outbound queue overflow") ErrPacketContextEmpty = errors.New("packet dispatch context is empty") ErrDestinationNotReachable = errors.New("destination not reachable") )
View Source
var (
ErrExecutorClosed = errors.New("executor is closed")
)
Functions ¶
func EncodeValue ¶
编码一个字符串、字节流、protobuf消息对象 编码后的字节用于传输,不能修改其内容
func ParseNetInterface ¶
func ParseNetInterface(text string) *protocol.InterfaceAddr
解析地址格式,对外地址@bind地址:端口,如example.com@0.0.0.0:9527
Types ¶
type BasicRoutePolicy ¶ added in v1.0.4
type BasicRoutePolicy struct {
// contains filtered or unexported fields
}
func (*BasicRoutePolicy) IsLoopBack ¶ added in v1.0.4
func (r *BasicRoutePolicy) IsLoopBack(router *Router, pkt *Packet) bool
type CapturedRunnable ¶
type CapturedRunnable struct {
F func() error
}
会捕获panic的runner
func (*CapturedRunnable) Run ¶
func (r *CapturedRunnable) Run() error
type Endpoint ¶
type Endpoint interface { MessageEndpoint RawConn() net.Conn Stats() *Stats Encoder() ProtocolCodec Go(write, read bool) SetUserData(interface{}) UserData() interface{} }
网络连接端点
type EndpointMap ¶
线程安全的endpoint map
func NewEndpointMap ¶
func NewEndpointMap() *EndpointMap
func (*EndpointMap) Add ¶
func (e *EndpointMap) Add(node NodeID, endpoint Endpoint)
func (*EndpointMap) Delete ¶
func (e *EndpointMap) Delete(node NodeID) bool
func (*EndpointMap) Get ¶
func (e *EndpointMap) Get(node NodeID) Endpoint
func (*EndpointMap) List ¶
func (e *EndpointMap) List() []Endpoint
func (*EndpointMap) Reset ¶
func (e *EndpointMap) Reset()
func (*EndpointMap) Size ¶
func (e *EndpointMap) Size() int
type Environ ¶
进程的环境, 代码内部都使用environ获取变量参数
func NewEnviron ¶
func NewEnviron() *Environ
func (*Environ) SetByOption ¶
command line option只是一种设置environ的手段
type Executor ¶
type Executor struct { Scheduler // contains filtered or unexported fields }
Runner执行器
type MessageEndpoint ¶
type MessageEndpoint interface { NodeID() NodeID SetNodeID(NodeID) RemoteAddr() string // 发送消息 SendPacket(*Packet) error // 关闭读/写 Close() error ForceClose(error) IsClosing() bool Context() *ServiceContext SetContext(*ServiceContext) }
type MessageSubscriber ¶
消息订阅
func NewMessageSub ¶
func NewMessageSub() *MessageSubscriber
func (*MessageSubscriber) AddSubNode ¶
func (s *MessageSubscriber) AddSubNode(start, end int32, node NodeID)
func (*MessageSubscriber) DeleteNodeSubs ¶ added in v1.0.4
func (s *MessageSubscriber) DeleteNodeSubs(dest NodeID)
func (*MessageSubscriber) DeleteSubNode ¶ added in v1.0.4
func (s *MessageSubscriber) DeleteSubNode(start, end int32, node NodeID)
func (*MessageSubscriber) GetSubNodes ¶ added in v1.0.4
func (s *MessageSubscriber) GetSubNodes(startMsg, endMsg int32) NodeIDSet
func (*MessageSubscriber) HasSubNodes ¶ added in v1.0.4
func (s *MessageSubscriber) HasSubNodes(startMsg, endMsg int32) bool
type NetInterface ¶
type NetInterface protocol.InterfaceAddr
func (NetInterface) AdvertiseInterface ¶
func (i NetInterface) AdvertiseInterface() string
func (NetInterface) Interface ¶
func (i NetInterface) Interface() string
type NodeID ¶
type NodeID uint32
节点号
func MakeNodeID ¶
func MakeSessionNodeID ¶
func MustParseNodeID ¶
func (*NodeID) SetInstance ¶
func (*NodeID) SetService ¶
type Options ¶
type Options struct { ShowVersion bool `short:"v" long:"version" description:"version string"` List bool `short:"l" long:"list" description:"list available services"` EnvFile string `short:"E" long:"envfile" description:"dotenv file path"` WorkingDir string `short:"W" long:"workdir" description:"runtime working directory"` ResourceDir string `short:"R" long:"resdir" description:"resource directory"` ServiceType string `short:"S" long:"service-type" description:"name of service type"` ServiceIndex uint16 `short:"N" long:"service-index" description:"instance index of this service"` ServiceDependency string `short:"P" long:"dependency" description:"service dependency list"` LogLevel string `short:"L" long:"loglevel" description:"debug,info,warn,error,fatal,panic"` EtcdAddress string `short:"F" long:"etcd-addr" description:"etcd host address"` EtcdKeySpace string `short:"K" long:"keyspace" description:"etcd key prefix"` EtcdLeaseTTL int `long:"lease-ttl" description:"etcd lease key TTL"` PprofAddr string `long:"pprof-addr" description:"pprof http listen address"` EnableSysLog bool `long:"enable-syslog" description:"enable write log to syslog/eventlog"` SysLogParams string `long:"syslog-param" description:"syslog/eventlog parameters"` }
命令行参数
func NewOptions ¶
func NewOptions() *Options
type Packet ¶
type Packet struct { Command uint32 `json:"cmd"` // 消息ID Seq uint16 `json:"seq"` // 序列号 Flag uint16 `json:"flg,omitempty"` // 标记位 Node NodeID `json:"node,omitempty"` // 目标节点 Body interface{} `json:"body,omitempty"` // 消息内容,number/string/bytes/pb.Message Endpoint MessageEndpoint `json:"-"` // 关联的endpoint }
应用层消息
func MakePacket ¶
func MakePacket() *Packet
func (*Packet) EncodeBody ¶
type PacketFilter ¶
type PacketHandler ¶
type PacketQueue ¶
type PacketQueue struct { C chan struct{} // notify channel // contains filtered or unexported fields }
一个无边界限制的Packet队列
func NewPacketQueue ¶
func NewPacketQueue() *PacketQueue
func (*PacketQueue) Len ¶
func (q *PacketQueue) Len() int
func (*PacketQueue) Notify ¶
func (q *PacketQueue) Notify()
func (*PacketQueue) Reset ¶
func (q *PacketQueue) Reset()
type ProtocolCodec ¶
type ProtocolCodec interface { ProtocolEncoder ProtocolDecoder }
消息编解码,同样一个codec会在多个goroutine执行,需要多线程安全
type ProtocolDecoder ¶
type RoutePolicy ¶
type RoutePolicy interface { IsLoopBack(*Router, *Packet) bool Multicast(*Router, *Packet) bool Lookup(*Router, *Packet) Endpoint }
路由策略
func NewBasicRoutePolicy ¶ added in v1.0.4
func NewBasicRoutePolicy(endpoints *EndpointMap) RoutePolicy
type Router ¶
type Router struct { *MessageSubscriber // 消息订阅 *RoutingTable // 路由表 // contains filtered or unexported fields }
路由器
func (*Router) AddPolicy ¶
func (r *Router) AddPolicy(policy RoutePolicy)
func (*Router) IsLoopBack ¶
type RoutingTable ¶
func NewRoutingTable ¶
func NewRoutingTable() *RoutingTable
func (*RoutingTable) AddEntry ¶
func (r *RoutingTable) AddEntry(src, dst NodeID)
func (*RoutingTable) DeleteDestEntry ¶
func (r *RoutingTable) DeleteDestEntry(dest NodeID)
func (*RoutingTable) DeleteEntry ¶
func (r *RoutingTable) DeleteEntry(src NodeID)
func (*RoutingTable) EntryList ¶
func (r *RoutingTable) EntryList() []RoutingTableEntry
func (*RoutingTable) GetEntry ¶
func (r *RoutingTable) GetEntry(key NodeID) NodeID
type RoutingTableEntry ¶
type RoutingTableEntry struct {
// contains filtered or unexported fields
}
路由表
type Scheduler ¶
type Scheduler struct { C chan *TimerNode // 到期的定时器 // contains filtered or unexported fields }
type Service ¶
type Service interface { ID() uint8 Name() string NodeID() NodeID SetNodeID(NodeID) // 初始化、启动和关闭 Init(*ServiceContext) error Startup() error Shutdown() // 服务上下文 Context() *ServiceContext // 执行 Execute(Runner) error Dispatch(*Packet) error }
服务
type ServiceContext ¶
type ServiceContext struct {
// contains filtered or unexported fields
}
服务的上下文
func NewServiceContext ¶
func NewServiceContext(env *Environ) *ServiceContext
func (*ServiceContext) AddFinalizer ¶
func (c *ServiceContext) AddFinalizer(finalizer func())
func (*ServiceContext) Env ¶
func (c *ServiceContext) Env() *Environ
func (*ServiceContext) Go ¶
func (c *ServiceContext) Go()
func (*ServiceContext) InboundQueue ¶
func (c *ServiceContext) InboundQueue() chan<- *Packet
func (*ServiceContext) IsClosing ¶
func (c *ServiceContext) IsClosing() bool
func (*ServiceContext) Router ¶
func (c *ServiceContext) Router() *Router
func (*ServiceContext) SendMessage ¶
func (c *ServiceContext) SendMessage(pkt *Packet) error
func (*ServiceContext) Service ¶
func (c *ServiceContext) Service() Service
func (*ServiceContext) SetMessageFilter ¶
func (c *ServiceContext) SetMessageFilter(f PacketFilter) PacketFilter
func (*ServiceContext) Shutdown ¶
func (c *ServiceContext) Shutdown()
func (*ServiceContext) Start ¶
func (c *ServiceContext) Start(srv Service) error
Click to show internal directories.
Click to hide internal directories.