README ¶
Tao --- 轻量级TCP异步框架,Go语言实现
Light-weight TCP Asynchronous gOlang framework
Announcing Tao 1.4 - Release Notes
- bugfix:TLS重连失败问题;
bugfix: failed to reconnect the TLS connection; - bugfix:ConnectionMap死锁问题;
bugfix: ConnectionMap dead-lock problem; - 优化TCP网络连接的关闭过程;
Optimize the closing process of TCP connection; - 优化服务器的关闭过程;
Optimize the closing process of server; - 更优雅的消息处理注册接口;
More elegant message handler register interface;
Announcing Tao 1.3 - Release Notes
- bugfix:修复断线重连状态不一致问题;
bugfix: fixed inconsistent status caused by reconnecting; - bugfix:修复ServerConnection和TimingWheel在连接关闭时并发访问导致崩溃问题;
bugfix: fixed a corruption caused by concurrent accessing between ServerConnection and TimingWheel during connection closing; - 无锁且线程安全的TimingWheel,优化CPU占用率;
Lock-free and thread-safe TimingWheel, optimized occupancy rate; - bugfix:修复TLS配置文件读取函数;
bugfix: Fixed errors when loading TLS config; - 添加消息相关的Context结构;简化消息注册机制,直接注册处理函数到HandlerMap;
A message-related Context struct added; Register handler functions in HandlerMap directly to simplify message registration mechanism; - 合并NewClientConnection()和NewTLSClientConnection(),提供一致的API;
Combine NewTLSConnection() into NewClientConnection(), providing a consistent API; - 工作者线程池改造成单例模式;
Make WorkerPool a singleton pattern; - 使用Holmes日志库代替glog;
Using Holmes logging package instead of glog; - 添加metrics.go:基于expvar标准包导出服务器关键信息;
Add metrics.go: exporting critical server information based on expvar standard pacakge; - 编写中文版框架设计原理文档,英文版正在翻译中;
A document about framework designing principles in Chinese, English version under developed;
Announcing Tao 1.2 - Release Notes
- 更优雅的消息注册接口;
More elegant message register interface; - TCPConnection的断线重连机制;
TCPConnection reconnecting upon closing; - bugfix:协议未注册时不关闭客户端连接;
bugfix: Don't close client when messages not registered; - bugfix:在readLoop()协程中处理心跳时间戳更新;
bugfix: Updating heart-beat timestamp in readLoop() go-routine; - bugfix:Message接口使用Serialize()替代之前的MarshalBinary(),以免框架使用者使用gob.Encoder/Decoder的时候栈溢出;
bugfix: Use Serialize() instead of MarshalBinary() in Message interface, preventing stack overflows when framework users use gob.Encoder/Decoder; - bugfix:当应用层数据长度大于0时才对其进行序列化;
bugfix: Serialize application data when its length greater than 0; - 新API:SetCodec(),允许TCPConnection自定义编解码器;
New API: SetCodec() allowing TCPConnection defines its own codec; - 新API:SetDBInitializer(),允许框架使用者定义数据访问接口;
New API: SetDBInitializer() allowing framework users define data access interface; - 允许框架使用者在TCPConnection上设置自定义数据;
Allowing framework users setting custom data on TCPConnection; - 为新客户端连接的启动单独开辟一个对应的go协程;
Allocating a corresponding go-routine for newly-connected clients respectively; - bugfix:写事件循环在连接关闭时将信道中的数据全部发送出去;
bugfix: writeLoop() flushes all packets left in channel when performing closing; - bugfix:服务器和客户端连接等待所有go协程关闭后再退出;
bugfix: Servers and client connections wait for the exits of all go-routines before shutting down; - 重构Server和Connection,采用针对接口编程的设计;
Refactoring Server and Connection, adopting a programming-by-interface design; - 设置500毫秒读超时,防止readLoop()发生阻塞;
Setting 500ms read-timeout prevents readLoop() from blocking;
Announcing Tao 1.1 - Release Notes
- 添加注释,提高代码可读性;
Add comments, make it more readable; - 限制服务器的最大并发连接数(默认1000);
Server max connections limit (default to 1000); - 新API:NewTLSTCPServer() 创建传输层安全的TCP服务器;
New API: NewTLSTCPServer() for creating TLS-supported TCP server; - 新特性:SetOnScheduleCallback() 由框架使用者来定义计划任务(比如心跳);
New Feature: SetOnScheduleCallback() make scheduled task managed by framwork users(such as heart beat); - 新特性:支持默认的消息编解码器TypeLengthValueCodec,并允许框架使用者开发自定义编解码器;
Support TypeLengthValueCodec by default, while allowing framework users develop their own codecs;
Announcing Tao 1.0 - Release Notes
- 完全异步的读,写以及消息处理;
Completely asynchronous reading, writing and message handling; - 工作者协程池;
Worker go-routine pool; - 并发数据结构和原子数据类型;
Concurrent data structure and atomic data types; - 毫秒精度的定时器功能;
Millisecond-precision timer function; - 传输层安全支持;
Transport layer security support; - 应用层心跳协议;
Application-level heart-beating protocol;
Documentation
- Tao - Go语言实现的TCP网络编程框架
- English(TBD)
Chat Server Example
package main
import (
"fmt"
"runtime"
"github.com/leesper/tao"
"github.com/leesper/tao/examples/chat"
"github.com/reechou/holmes"
)
type ChatServer struct {
tao.Server
}
func NewChatServer(addr string) *ChatServer {
return &ChatServer {
tao.NewTCPServer(addr),
}
}
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
defer holmes.Start().Stop()
tao.MonitorOn(12345)
tao.Register(chat.CHAT_MESSAGE, chat.DeserializeChatMessage, chat.ProcessChatMessage)
chatServer := NewChatServer(fmt.Sprintf("%s:%d", "0.0.0.0", 18341))
chatServer.SetOnConnectCallback(func(conn tao.Connection) bool {
holmes.Info("%s", "On connect")
return true
})
chatServer.SetOnErrorCallback(func() {
holmes.Info("%s", "On error")
})
chatServer.SetOnCloseCallback(func(conn tao.Connection) {
holmes.Info("%s", "Closing chat client")
})
chatServer.Start()
}
TODO list:
- Add more use-case examples;
- Add logger support;
Documentation ¶
Overview ¶
Worker pool is a pool of go-routines running for executing callbacks,
each client's message handler is permanently hashed into one specified worker to execute, so it is in-order for each client's perspective.
Index ¶
- Constants
- Variables
- func GetHandler(msgType int32) handlerFunc
- func GetUnmarshaler(msgType int32) unmarshalFunc
- func LoadTLSConfig(certFile, keyFile string, isSkipVerify bool) (tls.Config, error)
- func MonitorOn(port int)
- func ProcessHeartBeatMessage(ctx Context, conn Connection)
- func Register(msgType int32, unmarshaler func([]byte) (Message, error), ...)
- func Undefined(msgType int32) error
- type AtomicBoolean
- type AtomicInt32
- func (a *AtomicInt32) AddAndGet(delta int32) int32
- func (a *AtomicInt32) CompareAndSet(expect, update int32) bool
- func (a *AtomicInt32) DecrementAndGet() int32
- func (a *AtomicInt32) Get() int32
- func (a *AtomicInt32) GetAndAdd(delta int32) int32
- func (a *AtomicInt32) GetAndDecrement() int32
- func (a *AtomicInt32) GetAndIncrement() int32
- func (a *AtomicInt32) GetAndSet(newValue int32) (oldValue int32)
- func (a *AtomicInt32) IncrementAndGet() int32
- func (a *AtomicInt32) Set(newValue int32)
- func (a *AtomicInt32) String() string
- type AtomicInt64
- func (a *AtomicInt64) AddAndGet(delta int64) int64
- func (a *AtomicInt64) CompareAndSet(expect, update int64) bool
- func (a *AtomicInt64) DecrementAndGet() int64
- func (a *AtomicInt64) Get() int64
- func (a *AtomicInt64) GetAndAdd(delta int64) int64
- func (a *AtomicInt64) GetAndDecrement() int64
- func (a *AtomicInt64) GetAndIncrement() int64
- func (a *AtomicInt64) GetAndSet(newValue int64) int64
- func (a *AtomicInt64) IncrementAndGet() int64
- func (a *AtomicInt64) Set(newValue int64)
- func (a *AtomicInt64) String() string
- type ClientConnection
- func (client *ClientConnection) CancelTimer(timerId int64)
- func (client *ClientConnection) Close()
- func (client *ClientConnection) GetCloseChannel() chan struct{}
- func (client *ClientConnection) GetExtraData() interface{}
- func (client *ClientConnection) GetHeartBeat() int64
- func (client *ClientConnection) GetMessageCodec() Codec
- func (client *ClientConnection) GetMessageHandlerChannel() chan MessageHandler
- func (client *ClientConnection) GetMessageSendChannel() chan []byte
- func (client *ClientConnection) GetName() string
- func (client *ClientConnection) GetNetId() int64
- func (client *ClientConnection) GetOnCloseCallback() onCloseFunc
- func (client *ClientConnection) GetOnConnectCallback() onConnectFunc
- func (client *ClientConnection) GetOnErrorCallback() onErrorFunc
- func (client *ClientConnection) GetOnMessageCallback() onMessageFunc
- func (client *ClientConnection) GetPendingTimers() []int64
- func (client *ClientConnection) GetRawConn() net.Conn
- func (client *ClientConnection) GetRemoteAddress() net.Addr
- func (client *ClientConnection) GetTimeOutChannel() chan *OnTimeOut
- func (client *ClientConnection) GetTimingWheel() *TimingWheel
- func (client *ClientConnection) IsRunning() bool
- func (client *ClientConnection) RunAfter(duration time.Duration, callback func(time.Time, interface{})) int64
- func (client *ClientConnection) RunAt(timestamp time.Time, callback func(time.Time, interface{})) int64
- func (client *ClientConnection) RunEvery(interval time.Duration, callback func(time.Time, interface{})) int64
- func (client *ClientConnection) SetExtraData(extra interface{})
- func (client *ClientConnection) SetHeartBeat(beat int64)
- func (client *ClientConnection) SetMessageCodec(codec Codec)
- func (client *ClientConnection) SetName(name string)
- func (client *ClientConnection) SetNetId(netid int64)
- func (client *ClientConnection) SetOnCloseCallback(callback func(Connection))
- func (client *ClientConnection) SetOnConnectCallback(callback func(Connection) bool)
- func (client *ClientConnection) SetOnErrorCallback(callback func())
- func (client *ClientConnection) SetOnMessageCallback(callback func(Message, Connection))
- func (client *ClientConnection) SetPendingTimers(pending []int64)
- func (client *ClientConnection) Start()
- func (client *ClientConnection) Write(message Message) error
- type Codec
- type ConcurrentMap
- func (cm *ConcurrentMap) Clear()
- func (cm *ConcurrentMap) ContainsKey(k interface{}) bool
- func (cm *ConcurrentMap) Get(k interface{}) (interface{}, bool)
- func (cm *ConcurrentMap) IsEmpty() bool
- func (cm *ConcurrentMap) IterItems() <-chan Item
- func (cm *ConcurrentMap) IterKeys() <-chan interface{}
- func (cm *ConcurrentMap) IterValues() <-chan interface{}
- func (cm *ConcurrentMap) Put(k, v interface{}) error
- func (cm *ConcurrentMap) PutIfAbsent(k, v interface{}) error
- func (cm *ConcurrentMap) Remove(k interface{}) error
- func (cm *ConcurrentMap) Size() int
- type Connection
- type ConnectionMap
- type Context
- type ErrorUndefined
- type Hashable
- type HeartBeatMessage
- type Item
- type Message
- type MessageHandler
- type OnTimeOut
- type Server
- type ServerConnection
- func (conn *ServerConnection) CancelTimer(timerId int64)
- func (conn *ServerConnection) Close()
- func (conn *ServerConnection) GetCloseChannel() chan struct{}
- func (conn *ServerConnection) GetExtraData() interface{}
- func (conn *ServerConnection) GetHeartBeat() int64
- func (conn *ServerConnection) GetMessageCodec() Codec
- func (conn *ServerConnection) GetMessageHandlerChannel() chan MessageHandler
- func (conn *ServerConnection) GetMessageSendChannel() chan []byte
- func (conn *ServerConnection) GetName() string
- func (conn *ServerConnection) GetNetId() int64
- func (conn *ServerConnection) GetOnCloseCallback() onCloseFunc
- func (conn *ServerConnection) GetOnConnectCallback() onConnectFunc
- func (conn *ServerConnection) GetOnErrorCallback() onErrorFunc
- func (conn *ServerConnection) GetOnMessageCallback() onMessageFunc
- func (conn *ServerConnection) GetOwner() *TCPServer
- func (conn *ServerConnection) GetPendingTimers() []int64
- func (conn *ServerConnection) GetRawConn() net.Conn
- func (conn *ServerConnection) GetRemoteAddress() net.Addr
- func (conn *ServerConnection) GetTimeOutChannel() chan *OnTimeOut
- func (conn *ServerConnection) GetTimingWheel() *TimingWheel
- func (conn *ServerConnection) IsRunning() bool
- func (conn *ServerConnection) RunAfter(duration time.Duration, callback func(time.Time, interface{})) int64
- func (conn *ServerConnection) RunAt(timestamp time.Time, callback func(time.Time, interface{})) int64
- func (conn *ServerConnection) RunEvery(interval time.Duration, callback func(time.Time, interface{})) int64
- func (conn *ServerConnection) SetExtraData(extra interface{})
- func (conn *ServerConnection) SetHeartBeat(beat int64)
- func (conn *ServerConnection) SetMessageCodec(codec Codec)
- func (conn *ServerConnection) SetName(name string)
- func (conn *ServerConnection) SetNetId(netid int64)
- func (conn *ServerConnection) SetOnCloseCallback(callback func(Connection))
- func (conn *ServerConnection) SetOnConnectCallback(callback func(Connection) bool)
- func (conn *ServerConnection) SetOnErrorCallback(callback func())
- func (conn *ServerConnection) SetOnMessageCallback(callback func(Message, Connection))
- func (conn *ServerConnection) SetPendingTimers(pending []int64)
- func (conn *ServerConnection) Start()
- func (conn *ServerConnection) Write(message Message) error
- type TCPServer
- func (server *TCPServer) Close()
- func (server *TCPServer) GetConnectionMap() *ConnectionMap
- func (server *TCPServer) GetConnections() []Connection
- func (server *TCPServer) GetOnCloseCallback() onCloseFunc
- func (server *TCPServer) GetOnConnectCallback() onConnectFunc
- func (server *TCPServer) GetOnErrorCallback() onErrorFunc
- func (server *TCPServer) GetOnMessageCallback() onMessageFunc
- func (server *TCPServer) GetOnScheduleCallback() (time.Duration, onScheduleFunc)
- func (server *TCPServer) GetServerAddress() string
- func (server *TCPServer) GetTimeOutChannel() chan *OnTimeOut
- func (server *TCPServer) GetTimingWheel() *TimingWheel
- func (server *TCPServer) IsRunning() bool
- func (server *TCPServer) SetOnCloseCallback(callback func(Connection))
- func (server *TCPServer) SetOnConnectCallback(callback func(Connection) bool)
- func (server *TCPServer) SetOnErrorCallback(callback func())
- func (server *TCPServer) SetOnMessageCallback(callback func(Message, Connection))
- func (server *TCPServer) SetOnScheduleCallback(duration time.Duration, callback func(time.Time, interface{}))
- func (server *TCPServer) Start()
- type TLSTCPServer
- func (server *TLSTCPServer) Close()
- func (server *TLSTCPServer) GetConnectionMap() *ConnectionMap
- func (server *TLSTCPServer) GetConnections() []Connection
- func (server *TLSTCPServer) GetOnCloseCallback() onCloseFunc
- func (server *TLSTCPServer) GetOnConnectCallback() onConnectFunc
- func (server *TLSTCPServer) GetOnErrorCallback() onErrorFunc
- func (server *TLSTCPServer) GetOnMessageCallback() onMessageFunc
- func (server *TLSTCPServer) GetOnScheduleCallback() (time.Duration, onScheduleFunc)
- func (server *TLSTCPServer) GetServerAddress() string
- func (server *TLSTCPServer) GetTimingWheel() *TimingWheel
- func (server *TLSTCPServer) IsRunning() bool
- func (server *TLSTCPServer) SetOnCloseCallback(callback func(Connection))
- func (server *TLSTCPServer) SetOnConnectCallback(callback func(Connection) bool)
- func (server *TLSTCPServer) SetOnErrorCallback(callback func())
- func (server *TLSTCPServer) SetOnMessageCallback(callback func(Message, Connection))
- func (server *TLSTCPServer) SetOnScheduleCallback(duration time.Duration, callback func(time.Time, interface{}))
- func (server *TLSTCPServer) Start()
- type TimingWheel
- type TypeLengthValueCodec
- type WorkerPool
Constants ¶
const ( NTYPE = 4 NLEN = 4 MAXLEN = 1 << 23 // 8M )
const ( WORKERS = 20 MAX_CONNECTIONS = 5000 )
const (
HEART_BEAT = 0
)
0 is the preserved heart beat message number, you can define your own.
const INITIAL_SHARD_SIZE = 16
Variables ¶
var ( ErrorParameter error = errors.New("Parameter error") ErrorNilKey error = errors.New("Nil key") ErrorNilValue error = errors.New("Nil value") ErrorWouldBlock error = errors.New("Would block") ErrorNotHashable error = errors.New("Not hashable") ErrorNilData error = errors.New("Nil data") ErrorIllegalData error = errors.New("More than 8M data") ErrorNotImplemented error = errors.New("Not implemented") ErrorConnClosed error = errors.New("Connection closed") )
Functions ¶
func GetHandler ¶
func GetHandler(msgType int32) handlerFunc
GetHandler returns the corresponding handler function for msgType.
func GetUnmarshaler ¶
func GetUnmarshaler(msgType int32) unmarshalFunc
GetUnmarshaler returns the corresponding unmarshal function for msgType.
func LoadTLSConfig ¶
func ProcessHeartBeatMessage ¶
func ProcessHeartBeatMessage(ctx Context, conn Connection)
func Register ¶
func Register(msgType int32, unmarshaler func([]byte) (Message, error), handler func(Context, Connection))
Register registers the unmarshal and handle functions for msgType. If no unmarshal function provided, the message will not be parsed. If no handler function provided, the message will not be handled unless you set a default one by calling SetOnMessageCallback. If Register being called twice on one msgType, it will panics.
Types ¶
type AtomicBoolean ¶
type AtomicBoolean int32
func NewAtomicBoolean ¶
func NewAtomicBoolean(initialValue bool) *AtomicBoolean
func (*AtomicBoolean) CompareAndSet ¶
func (a *AtomicBoolean) CompareAndSet(oldValue, newValue bool) bool
func (*AtomicBoolean) Get ¶
func (a *AtomicBoolean) Get() bool
func (*AtomicBoolean) GetAndSet ¶
func (a *AtomicBoolean) GetAndSet(newValue bool) bool
func (*AtomicBoolean) Set ¶
func (a *AtomicBoolean) Set(newValue bool)
func (*AtomicBoolean) String ¶
func (a *AtomicBoolean) String() string
type AtomicInt32 ¶
type AtomicInt32 int32
func NewAtomicInt32 ¶
func NewAtomicInt32(initialValue int32) *AtomicInt32
func (*AtomicInt32) AddAndGet ¶
func (a *AtomicInt32) AddAndGet(delta int32) int32
func (*AtomicInt32) CompareAndSet ¶
func (a *AtomicInt32) CompareAndSet(expect, update int32) bool
func (*AtomicInt32) DecrementAndGet ¶
func (a *AtomicInt32) DecrementAndGet() int32
func (*AtomicInt32) Get ¶
func (a *AtomicInt32) Get() int32
func (*AtomicInt32) GetAndAdd ¶
func (a *AtomicInt32) GetAndAdd(delta int32) int32
func (*AtomicInt32) GetAndDecrement ¶
func (a *AtomicInt32) GetAndDecrement() int32
func (*AtomicInt32) GetAndIncrement ¶
func (a *AtomicInt32) GetAndIncrement() int32
func (*AtomicInt32) GetAndSet ¶
func (a *AtomicInt32) GetAndSet(newValue int32) (oldValue int32)
func (*AtomicInt32) IncrementAndGet ¶
func (a *AtomicInt32) IncrementAndGet() int32
func (*AtomicInt32) Set ¶
func (a *AtomicInt32) Set(newValue int32)
func (*AtomicInt32) String ¶
func (a *AtomicInt32) String() string
type AtomicInt64 ¶
type AtomicInt64 int64
func NewAtomicInt64 ¶
func NewAtomicInt64(initialValue int64) *AtomicInt64
func (*AtomicInt64) AddAndGet ¶
func (a *AtomicInt64) AddAndGet(delta int64) int64
func (*AtomicInt64) CompareAndSet ¶
func (a *AtomicInt64) CompareAndSet(expect, update int64) bool
func (*AtomicInt64) DecrementAndGet ¶
func (a *AtomicInt64) DecrementAndGet() int64
func (*AtomicInt64) Get ¶
func (a *AtomicInt64) Get() int64
func (*AtomicInt64) GetAndAdd ¶
func (a *AtomicInt64) GetAndAdd(delta int64) int64
func (*AtomicInt64) GetAndDecrement ¶
func (a *AtomicInt64) GetAndDecrement() int64
func (*AtomicInt64) GetAndIncrement ¶
func (a *AtomicInt64) GetAndIncrement() int64
func (*AtomicInt64) GetAndSet ¶
func (a *AtomicInt64) GetAndSet(newValue int64) int64
func (*AtomicInt64) IncrementAndGet ¶
func (a *AtomicInt64) IncrementAndGet() int64
func (*AtomicInt64) Set ¶
func (a *AtomicInt64) Set(newValue int64)
func (*AtomicInt64) String ¶
func (a *AtomicInt64) String() string
type ClientConnection ¶
type ClientConnection struct {
// contains filtered or unexported fields
}
implements Connection
func (*ClientConnection) CancelTimer ¶
func (client *ClientConnection) CancelTimer(timerId int64)
func (*ClientConnection) Close ¶
func (client *ClientConnection) Close()
func (*ClientConnection) GetCloseChannel ¶
func (client *ClientConnection) GetCloseChannel() chan struct{}
func (*ClientConnection) GetExtraData ¶
func (client *ClientConnection) GetExtraData() interface{}
func (*ClientConnection) GetHeartBeat ¶
func (client *ClientConnection) GetHeartBeat() int64
func (*ClientConnection) GetMessageCodec ¶
func (client *ClientConnection) GetMessageCodec() Codec
func (*ClientConnection) GetMessageHandlerChannel ¶
func (client *ClientConnection) GetMessageHandlerChannel() chan MessageHandler
func (*ClientConnection) GetMessageSendChannel ¶
func (client *ClientConnection) GetMessageSendChannel() chan []byte
func (*ClientConnection) GetName ¶
func (client *ClientConnection) GetName() string
func (*ClientConnection) GetNetId ¶
func (client *ClientConnection) GetNetId() int64
func (*ClientConnection) GetOnCloseCallback ¶
func (client *ClientConnection) GetOnCloseCallback() onCloseFunc
func (*ClientConnection) GetOnConnectCallback ¶
func (client *ClientConnection) GetOnConnectCallback() onConnectFunc
func (*ClientConnection) GetOnErrorCallback ¶
func (client *ClientConnection) GetOnErrorCallback() onErrorFunc
func (*ClientConnection) GetOnMessageCallback ¶
func (client *ClientConnection) GetOnMessageCallback() onMessageFunc
func (*ClientConnection) GetPendingTimers ¶
func (client *ClientConnection) GetPendingTimers() []int64
func (*ClientConnection) GetRawConn ¶
func (client *ClientConnection) GetRawConn() net.Conn
func (*ClientConnection) GetRemoteAddress ¶
func (client *ClientConnection) GetRemoteAddress() net.Addr
func (*ClientConnection) GetTimeOutChannel ¶
func (client *ClientConnection) GetTimeOutChannel() chan *OnTimeOut
func (*ClientConnection) GetTimingWheel ¶
func (client *ClientConnection) GetTimingWheel() *TimingWheel
func (*ClientConnection) IsRunning ¶
func (client *ClientConnection) IsRunning() bool
func (*ClientConnection) SetExtraData ¶
func (client *ClientConnection) SetExtraData(extra interface{})
func (*ClientConnection) SetHeartBeat ¶
func (client *ClientConnection) SetHeartBeat(beat int64)
func (*ClientConnection) SetMessageCodec ¶
func (client *ClientConnection) SetMessageCodec(codec Codec)
func (*ClientConnection) SetName ¶
func (client *ClientConnection) SetName(name string)
func (*ClientConnection) SetNetId ¶
func (client *ClientConnection) SetNetId(netid int64)
func (*ClientConnection) SetOnCloseCallback ¶
func (client *ClientConnection) SetOnCloseCallback(callback func(Connection))
func (*ClientConnection) SetOnConnectCallback ¶
func (client *ClientConnection) SetOnConnectCallback(callback func(Connection) bool)
func (*ClientConnection) SetOnErrorCallback ¶
func (client *ClientConnection) SetOnErrorCallback(callback func())
func (*ClientConnection) SetOnMessageCallback ¶
func (client *ClientConnection) SetOnMessageCallback(callback func(Message, Connection))
func (*ClientConnection) SetPendingTimers ¶
func (client *ClientConnection) SetPendingTimers(pending []int64)
func (*ClientConnection) Start ¶
func (client *ClientConnection) Start()
func (*ClientConnection) Write ¶
func (client *ClientConnection) Write(message Message) error
type Codec ¶
Codec is the interface for message coder and decoder. Application programmer can define a custom codec themselves.
type ConcurrentMap ¶
type ConcurrentMap struct {
// contains filtered or unexported fields
}
func NewConcurrentMap ¶
func NewConcurrentMap() *ConcurrentMap
func (*ConcurrentMap) Clear ¶
func (cm *ConcurrentMap) Clear()
func (*ConcurrentMap) ContainsKey ¶
func (cm *ConcurrentMap) ContainsKey(k interface{}) bool
func (*ConcurrentMap) Get ¶
func (cm *ConcurrentMap) Get(k interface{}) (interface{}, bool)
func (*ConcurrentMap) IsEmpty ¶
func (cm *ConcurrentMap) IsEmpty() bool
func (*ConcurrentMap) IterItems ¶
func (cm *ConcurrentMap) IterItems() <-chan Item
func (*ConcurrentMap) IterKeys ¶
func (cm *ConcurrentMap) IterKeys() <-chan interface{}
func (*ConcurrentMap) IterValues ¶
func (cm *ConcurrentMap) IterValues() <-chan interface{}
func (*ConcurrentMap) Put ¶
func (cm *ConcurrentMap) Put(k, v interface{}) error
func (*ConcurrentMap) PutIfAbsent ¶
func (cm *ConcurrentMap) PutIfAbsent(k, v interface{}) error
func (*ConcurrentMap) Remove ¶
func (cm *ConcurrentMap) Remove(k interface{}) error
func (*ConcurrentMap) Size ¶
func (cm *ConcurrentMap) Size() int
type Connection ¶
type Connection interface { SetNetId(netid int64) GetNetId() int64 SetName(name string) GetName() string SetHeartBeat(beat int64) GetHeartBeat() int64 SetExtraData(extra interface{}) GetExtraData() interface{} SetMessageCodec(codec Codec) GetMessageCodec() Codec SetPendingTimers(pending []int64) GetPendingTimers() []int64 SetOnConnectCallback(callback func(Connection) bool) GetOnConnectCallback() onConnectFunc SetOnMessageCallback(callback func(Message, Connection)) GetOnMessageCallback() onMessageFunc SetOnErrorCallback(callback func()) GetOnErrorCallback() onErrorFunc SetOnCloseCallback(callback func(Connection)) GetOnCloseCallback() onCloseFunc Start() Close() IsRunning() bool Write(message Message) error RunAt(t time.Time, cb func(time.Time, interface{})) int64 RunAfter(d time.Duration, cb func(time.Time, interface{})) int64 RunEvery(i time.Duration, cb func(time.Time, interface{})) int64 GetTimingWheel() *TimingWheel CancelTimer(timerId int64) GetRawConn() net.Conn GetMessageSendChannel() chan []byte GetMessageHandlerChannel() chan MessageHandler GetCloseChannel() chan struct{} GetTimeOutChannel() chan *OnTimeOut GetRemoteAddress() net.Addr }
func NewClientConnection ¶
func NewServerConnection ¶
func NewServerConnection(netid int64, server *TCPServer, conn net.Conn) Connection
type ConnectionMap ¶
func NewConnectionMap ¶
func NewConnectionMap() *ConnectionMap
func (*ConnectionMap) Clear ¶
func (cm *ConnectionMap) Clear()
func (*ConnectionMap) Get ¶
func (cm *ConnectionMap) Get(k int64) (Connection, bool)
func (*ConnectionMap) IsEmpty ¶
func (cm *ConnectionMap) IsEmpty() bool
func (*ConnectionMap) Put ¶
func (cm *ConnectionMap) Put(k int64, v Connection)
func (*ConnectionMap) Remove ¶
func (cm *ConnectionMap) Remove(k int64)
func (*ConnectionMap) Size ¶
func (cm *ConnectionMap) Size() int
type Context ¶
type Context struct {
// contains filtered or unexported fields
}
Context is the context info for every handler function. Handler function handles the business logic about message. We can find the client connection who sent this message by netid and send back responses.
func NewContext ¶
type ErrorUndefined ¶
type ErrorUndefined struct {
// contains filtered or unexported fields
}
func (ErrorUndefined) Error ¶
func (eu ErrorUndefined) Error() string
type HeartBeatMessage ¶
type HeartBeatMessage struct {
Timestamp int64
}
HeartBeatMessage for long-term connection keeping alive.
func (HeartBeatMessage) MessageNumber ¶
func (hbm HeartBeatMessage) MessageNumber() int32
func (HeartBeatMessage) Serialize ¶
func (hbm HeartBeatMessage) Serialize() ([]byte, error)
type Message ¶
Message represents the structured data that can be handled.
type MessageHandler ¶
type MessageHandler struct {
// contains filtered or unexported fields
}
type OnTimeOut ¶
func NewOnTimeOut ¶
type Server ¶
type Server interface { IsRunning() bool GetConnections() []Connection GetConnectionMap() *ConnectionMap GetTimingWheel() *TimingWheel GetServerAddress() string Start() Close() SetOnScheduleCallback(time.Duration, func(time.Time, interface{})) GetOnScheduleCallback() (time.Duration, onScheduleFunc) SetOnConnectCallback(func(Connection) bool) GetOnConnectCallback() onConnectFunc SetOnMessageCallback(func(Message, Connection)) GetOnMessageCallback() onMessageFunc SetOnCloseCallback(func(Connection)) GetOnCloseCallback() onCloseFunc SetOnErrorCallback(func()) GetOnErrorCallback() onErrorFunc }
func NewTCPServer ¶
func NewTLSTCPServer ¶
type ServerConnection ¶
type ServerConnection struct {
// contains filtered or unexported fields
}
func (*ServerConnection) CancelTimer ¶
func (conn *ServerConnection) CancelTimer(timerId int64)
func (*ServerConnection) Close ¶
func (conn *ServerConnection) Close()
func (*ServerConnection) GetCloseChannel ¶
func (conn *ServerConnection) GetCloseChannel() chan struct{}
func (*ServerConnection) GetExtraData ¶
func (conn *ServerConnection) GetExtraData() interface{}
func (*ServerConnection) GetHeartBeat ¶
func (conn *ServerConnection) GetHeartBeat() int64
func (*ServerConnection) GetMessageCodec ¶
func (conn *ServerConnection) GetMessageCodec() Codec
func (*ServerConnection) GetMessageHandlerChannel ¶
func (conn *ServerConnection) GetMessageHandlerChannel() chan MessageHandler
func (*ServerConnection) GetMessageSendChannel ¶
func (conn *ServerConnection) GetMessageSendChannel() chan []byte
func (*ServerConnection) GetName ¶
func (conn *ServerConnection) GetName() string
func (*ServerConnection) GetNetId ¶
func (conn *ServerConnection) GetNetId() int64
func (*ServerConnection) GetOnCloseCallback ¶
func (conn *ServerConnection) GetOnCloseCallback() onCloseFunc
func (*ServerConnection) GetOnConnectCallback ¶
func (conn *ServerConnection) GetOnConnectCallback() onConnectFunc
func (*ServerConnection) GetOnErrorCallback ¶
func (conn *ServerConnection) GetOnErrorCallback() onErrorFunc
func (*ServerConnection) GetOnMessageCallback ¶
func (conn *ServerConnection) GetOnMessageCallback() onMessageFunc
func (*ServerConnection) GetOwner ¶
func (conn *ServerConnection) GetOwner() *TCPServer
func (*ServerConnection) GetPendingTimers ¶
func (conn *ServerConnection) GetPendingTimers() []int64
func (*ServerConnection) GetRawConn ¶
func (conn *ServerConnection) GetRawConn() net.Conn
func (*ServerConnection) GetRemoteAddress ¶
func (conn *ServerConnection) GetRemoteAddress() net.Addr
func (*ServerConnection) GetTimeOutChannel ¶
func (conn *ServerConnection) GetTimeOutChannel() chan *OnTimeOut
func (*ServerConnection) GetTimingWheel ¶
func (conn *ServerConnection) GetTimingWheel() *TimingWheel
func (*ServerConnection) IsRunning ¶
func (conn *ServerConnection) IsRunning() bool
func (*ServerConnection) SetExtraData ¶
func (conn *ServerConnection) SetExtraData(extra interface{})
func (*ServerConnection) SetHeartBeat ¶
func (conn *ServerConnection) SetHeartBeat(beat int64)
func (*ServerConnection) SetMessageCodec ¶
func (conn *ServerConnection) SetMessageCodec(codec Codec)
func (*ServerConnection) SetName ¶
func (conn *ServerConnection) SetName(name string)
func (*ServerConnection) SetNetId ¶
func (conn *ServerConnection) SetNetId(netid int64)
func (*ServerConnection) SetOnCloseCallback ¶
func (conn *ServerConnection) SetOnCloseCallback(callback func(Connection))
func (*ServerConnection) SetOnConnectCallback ¶
func (conn *ServerConnection) SetOnConnectCallback(callback func(Connection) bool)
func (*ServerConnection) SetOnErrorCallback ¶
func (conn *ServerConnection) SetOnErrorCallback(callback func())
func (*ServerConnection) SetOnMessageCallback ¶
func (conn *ServerConnection) SetOnMessageCallback(callback func(Message, Connection))
func (*ServerConnection) SetPendingTimers ¶
func (conn *ServerConnection) SetPendingTimers(pending []int64)
func (*ServerConnection) Start ¶
func (conn *ServerConnection) Start()
func (*ServerConnection) Write ¶
func (conn *ServerConnection) Write(message Message) error
type TCPServer ¶
type TCPServer struct {
// contains filtered or unexported fields
}
func (*TCPServer) GetConnectionMap ¶
func (server *TCPServer) GetConnectionMap() *ConnectionMap
func (*TCPServer) GetConnections ¶
func (server *TCPServer) GetConnections() []Connection
func (*TCPServer) GetOnCloseCallback ¶
func (server *TCPServer) GetOnCloseCallback() onCloseFunc
func (*TCPServer) GetOnConnectCallback ¶
func (server *TCPServer) GetOnConnectCallback() onConnectFunc
func (*TCPServer) GetOnErrorCallback ¶
func (server *TCPServer) GetOnErrorCallback() onErrorFunc
func (*TCPServer) GetOnMessageCallback ¶
func (server *TCPServer) GetOnMessageCallback() onMessageFunc
func (*TCPServer) GetOnScheduleCallback ¶
func (*TCPServer) GetServerAddress ¶
func (*TCPServer) GetTimeOutChannel ¶
func (*TCPServer) GetTimingWheel ¶
func (server *TCPServer) GetTimingWheel() *TimingWheel
func (*TCPServer) SetOnCloseCallback ¶
func (server *TCPServer) SetOnCloseCallback(callback func(Connection))
func (*TCPServer) SetOnConnectCallback ¶
func (server *TCPServer) SetOnConnectCallback(callback func(Connection) bool)
func (*TCPServer) SetOnErrorCallback ¶
func (server *TCPServer) SetOnErrorCallback(callback func())
func (*TCPServer) SetOnMessageCallback ¶
func (server *TCPServer) SetOnMessageCallback(callback func(Message, Connection))
func (*TCPServer) SetOnScheduleCallback ¶
type TLSTCPServer ¶
type TLSTCPServer struct { *TCPServer // contains filtered or unexported fields }
func (*TLSTCPServer) Close ¶
func (server *TLSTCPServer) Close()
func (*TLSTCPServer) GetConnectionMap ¶
func (server *TLSTCPServer) GetConnectionMap() *ConnectionMap
func (*TLSTCPServer) GetConnections ¶
func (server *TLSTCPServer) GetConnections() []Connection
func (*TLSTCPServer) GetOnCloseCallback ¶
func (server *TLSTCPServer) GetOnCloseCallback() onCloseFunc
func (*TLSTCPServer) GetOnConnectCallback ¶
func (server *TLSTCPServer) GetOnConnectCallback() onConnectFunc
func (*TLSTCPServer) GetOnErrorCallback ¶
func (server *TLSTCPServer) GetOnErrorCallback() onErrorFunc
func (*TLSTCPServer) GetOnMessageCallback ¶
func (server *TLSTCPServer) GetOnMessageCallback() onMessageFunc
func (*TLSTCPServer) GetOnScheduleCallback ¶
func (server *TLSTCPServer) GetOnScheduleCallback() (time.Duration, onScheduleFunc)
func (*TLSTCPServer) GetServerAddress ¶
func (server *TLSTCPServer) GetServerAddress() string
func (*TLSTCPServer) GetTimingWheel ¶
func (server *TLSTCPServer) GetTimingWheel() *TimingWheel
func (*TLSTCPServer) IsRunning ¶
func (server *TLSTCPServer) IsRunning() bool
func (*TLSTCPServer) SetOnCloseCallback ¶
func (server *TLSTCPServer) SetOnCloseCallback(callback func(Connection))
func (*TLSTCPServer) SetOnConnectCallback ¶
func (server *TLSTCPServer) SetOnConnectCallback(callback func(Connection) bool)
func (*TLSTCPServer) SetOnErrorCallback ¶
func (server *TLSTCPServer) SetOnErrorCallback(callback func())
func (*TLSTCPServer) SetOnMessageCallback ¶
func (server *TLSTCPServer) SetOnMessageCallback(callback func(Message, Connection))
func (*TLSTCPServer) SetOnScheduleCallback ¶
func (server *TLSTCPServer) SetOnScheduleCallback(duration time.Duration, callback func(time.Time, interface{}))
func (*TLSTCPServer) Start ¶
func (server *TLSTCPServer) Start()
type TimingWheel ¶
type TimingWheel struct {
// contains filtered or unexported fields
}
func NewTimingWheel ¶
func NewTimingWheel() *TimingWheel
func (*TimingWheel) CancelTimer ¶
func (tw *TimingWheel) CancelTimer(timerId int64)
func (*TimingWheel) GetTimeOutChannel ¶
func (tw *TimingWheel) GetTimeOutChannel() chan *OnTimeOut
func (*TimingWheel) Size ¶
func (tw *TimingWheel) Size() int
func (*TimingWheel) Stop ¶
func (tw *TimingWheel) Stop()
type TypeLengthValueCodec ¶
type TypeLengthValueCodec struct{}
Format: type-length-value |4 bytes|4 bytes|n bytes <= 8M|
func (TypeLengthValueCodec) Decode ¶
func (codec TypeLengthValueCodec) Decode(c Connection) (Message, error)
Decode decodes the bytes data into Message
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
<<singleton>>
func WorkerPoolInstance ¶
func WorkerPoolInstance() *WorkerPool
func (*WorkerPool) Close ¶
func (wp *WorkerPool) Close()
func (*WorkerPool) Put ¶
func (wp *WorkerPool) Put(k interface{}, cb func()) error