README
Tao
Light-weight TCP Asynchronous gOlang framework, version 1.6.0
Requirements
- Golang 1.9 and above
Installation
go get -u -v github.com/Gz3zFork/tao
Usage
A Chat Server Example in 50 Lines
package main

import (
	"fmt"
	"net"

	"github.com/leesper/holmes"
	"github.com/Gz3zFork/tao"
	"github.com/Gz3zFork/tao/examples/chat"
)

// ChatServer is the chatting server.
type ChatServer struct {
	*tao.Server
}

// NewChatServer returns a ChatServer.
func NewChatServer() *ChatServer {
	onConnectOption := tao.OnConnectOption(func(conn tao.WriteCloser) bool {
		holmes.Infoln("on connect")
		return true
	})
	onErrorOption := tao.OnErrorOption(func(conn tao.WriteCloser) {
		holmes.Infoln("on error")
	})
	onCloseOption := tao.OnCloseOption(func(conn tao.WriteCloser) {
		holmes.Infoln("close chat client")
	})
	return &ChatServer{
		tao.NewServer(onConnectOption, onErrorOption, onCloseOption),
	}
}

func main() {
	defer holmes.Start().Stop()
	tao.Register(chat.ChatMessage, chat.DeserializeMessage, chat.ProcessMessage)
	l, err := net.Listen("tcp", fmt.Sprintf("%s:%d", "0.0.0.0", 12345))
	if err != nil {
		holmes.Fatalln("listen error", err)
	}
	chatServer := NewChatServer()
	err = chatServer.Start(l)
	if err != nil {
		holmes.Fatalln("start error", err)
	}
}
Changelog
v1.6.0
- Bugfix: writeLoop() drains all pending messages before exit;
- Renamed getter methods according to Effective Go;
- Bugfix: timer tasks expired forever when the system clock was adjusted by NTP;
- Bugfix: asyncWrite() did not return an error if called after ServerConn or ClientConn closes;
- Provides WorkerSizeOption() for tuning the size of the worker go-routine pool;
- Provides BufferSizeOption() for tuning the size of the buffered channel;
- Provides ReconnectOption() for activating ClientConn's reconnecting mechanism;
- Provides CustomCodecOption() for setting a self-defined codec;
- Provides TLSCredsOption() for running a TLS server;
- Provides OnConnectOption(), OnMessageOption(), OnCloseOption() and OnErrorOption() for setting callbacks for the four situations respectively;
- Uses the standard sync.Map instead of a map guarded by rwmutex;
v1.5.0
- A Go-style redesign of the overall framework, removing more than 500 lines of code;
- Provides new Server, ClientConn and ServerConn structs and a WriteCloser interface;
- Uses the standard context package to manage and propagate request-scoped data across go-routines;
- Graceful stopping: all go-routines are related by context, and they are notified and exit when the server stops or the connection closes;
- Provides new type HandlerFunc func(context.Context, WriteCloser) for defining message handlers;
- Developers can now use NewContextWithMessage() and MessageFromContext() to put and get the message they are about to use in the handler function's context, which also leads to a clearer design;
- Go-routine functions readLoop(), writeLoop() and handleLoop() are all optimized to serve both ServerConn and ClientConn; several deadlock bugs such as blocking on channels are fixed;
- The reconnecting mechanism of ClientConn is redesigned and optimized;
v1.4.0
- Bugfix: failed to reconnect the TLS connection;
- Bugfix: ConnectionMap deadlock problem;
- Optimized the closing process of TCP connections;
- Optimized the closing process of the server;
- More elegant message handler registration interface;
v1.3.0
- Bugfix: fixed inconsistent status caused by reconnecting;
- Bugfix: fixed a crash caused by concurrent access between ServerConnection and TimingWheel during connection closing;
- Lock-free and thread-safe TimingWheel, optimized CPU usage;
- Bugfix: fixed errors when loading the TLS config;
- Added a message-related Context struct; handler functions are registered in HandlerMap directly to simplify the message registration mechanism;
- Combined NewTLSConnection() into NewClientConnection(), providing a consistent API;
- Made WorkerPool a singleton;
- Uses the Holmes logging package instead of glog;
- Added metrics.go: exports critical server information based on the expvar standard package;
- A document about the framework's design principles in Chinese; the English version is under development;
v1.2.0
- More elegant message registration interface;
- TCPConnection reconnects upon closing;
- Bugfix: don't close the client connection when a message is not registered;
- Bugfix: update the heart-beat timestamp in the readLoop() go-routine;
- Bugfix: use Serialize() instead of MarshalBinary() in the Message interface, preventing stack overflows when framework users use gob.Encoder/Decoder;
- Bugfix: serialize application data only when its length is greater than 0;
- New API: SetCodec(), allowing TCPConnection to define its own codec;
- New API: SetDBInitializer(), allowing framework users to define a data access interface;
- Allows framework users to set custom data on TCPConnection;
- Starts a dedicated go-routine for each newly connected client;
- Bugfix: writeLoop() flushes all packets left in the channel when closing;
- Bugfix: servers and client connections wait for all go-routines to exit before shutting down;
- Refactored Server and Connection, adopting a programming-to-interface design;
- Sets a 500ms read timeout to prevent readLoop() from blocking;
v1.1.0
- Added comments to make the code more readable;
- Server max connections limit (defaults to 1000);
- New API: NewTLSTCPServer() for creating a TLS-supported TCP server;
- New feature: SetOnScheduleCallback() lets framework users define scheduled tasks (such as heart beat);
- Supports TypeLengthValueCodec by default, while allowing framework users to develop their own codecs;
v1.0.0
- Completely asynchronous reading, writing and message handling;
- Worker go-routine pool;
- Concurrent data structures and atomic data types;
- Millisecond-precision timer function;
- Transport layer security support;
- Application-level heart-beating protocol;
More Documentation
- Tao - A TCP network programming framework implemented in Go (in Chinese)
- English(TBD)
Documentation
Overview ¶
Package tao implements a light-weight TCP network programming framework.
Server represents a TCP server with various ServerOption supported.
1. Provides custom codec by CustomCodecOption;
2. Provides TLS server by TLSCredsOption;
3. Provides callback on connected by OnConnectOption;
4. Provides callback on message arrived by OnMessageOption;
5. Provides callback on closed by OnCloseOption;
6. Provides callback on error occurred by OnErrorOption;
ServerConn represents a connection on the server side.
ClientConn represents a connection to another server. You can make it reconnectable by passing ReconnectOption when creating it.
AtomicInt64, AtomicInt32 and AtomicBoolean provide concurrency-safe atomic types in a Java-like style, while ConnMap is a go-routine-safe map for connection management.
Every handler function is defined as func(context.Context, WriteCloser). Usually a message and a net ID are carried within the Context; developers can retrieve them by calling the following functions.
func NewContextWithMessage(ctx context.Context, msg Message) context.Context
func MessageFromContext(ctx context.Context) Message
func NewContextWithNetID(ctx context.Context, netID int64) context.Context
func NetIDFromContext(ctx context.Context) int64
Programmers are free to define their own request-scoped data and put them in the context, but must be sure that the data is safe for multiple go-routines to access.
Every message must implement the Message interface and provide a deserialization function:
type Message interface {
	MessageNumber() int32
	Serialize() ([]byte, error)
}
func Deserialize(data []byte) (message Message, err error)
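To make the contract concrete, here is a sketch of a message type that satisfies it. The Message interface is copied locally so the example compiles on its own; EchoMessage, its message number and DeserializeEcho are hypothetical names chosen for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Message is a local copy of the interface shown above.
type Message interface {
	MessageNumber() int32
	Serialize() ([]byte, error)
}

// echoMessageNumber is an arbitrary number picked for this sketch.
const echoMessageNumber int32 = 1

// EchoMessage is a hypothetical message carrying a text payload.
type EchoMessage struct {
	Content string
}

// MessageNumber identifies the message type on the wire.
func (m EchoMessage) MessageNumber() int32 { return echoMessageNumber }

// Serialize turns the payload into bytes.
func (m EchoMessage) Serialize() ([]byte, error) { return []byte(m.Content), nil }

// DeserializeEcho has the same shape as the deserialization function above.
func DeserializeEcho(data []byte) (Message, error) {
	if data == nil {
		return nil, errors.New("nil data")
	}
	return EchoMessage{Content: string(data)}, nil
}

func main() {
	raw, _ := EchoMessage{Content: "hello"}.Serialize()
	msg, err := DeserializeEcho(raw)
	fmt.Println(msg.(EchoMessage).Content, msg.MessageNumber(), err)
}
```

With the real package, a pair like this would presumably be passed to Register together with a handler, as the chat example in the README does with chat.DeserializeMessage.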
There is a TypeLengthValueCodec defined, but one can also define his/her own codec:
type Codec interface {
	Decode(net.Conn) (Message, error)
	Encode(Message) ([]byte, error)
}
TimingWheel is a safe timer for running timed callbacks on connection.
WorkerPool is a go-routine pool for running message handlers; you can fetch it by calling func WorkerPoolInstance() *WorkerPool.
Index ¶
- Constants
- Variables
- func HandleHeartBeat(ctx context.Context, c WriteCloser)
- func LoadTLSConfig(certFile, keyFile string, isSkipVerify bool) (*tls.Config, error)
- func MonitorOn(port int)
- func NetIDFromContext(ctx context.Context) int64
- func NewContextWithMessage(ctx context.Context, msg Message) context.Context
- func NewContextWithNetID(ctx context.Context, netID int64) context.Context
- func Register(msgType int32, unmarshaler func([]byte) (Message, error), ...)
- type AtomicBoolean
- type AtomicInt32
- func (a *AtomicInt32) AddAndGet(delta int32) int32
- func (a *AtomicInt32) CompareAndSet(expect, update int32) bool
- func (a *AtomicInt32) DecrementAndGet() int32
- func (a *AtomicInt32) Get() int32
- func (a *AtomicInt32) GetAndAdd(delta int32) int32
- func (a *AtomicInt32) GetAndDecrement() int32
- func (a *AtomicInt32) GetAndIncrement() int32
- func (a *AtomicInt32) GetAndSet(newValue int32) (oldValue int32)
- func (a *AtomicInt32) IncrementAndGet() int32
- func (a *AtomicInt32) Set(newValue int32)
- func (a *AtomicInt32) String() string
- type AtomicInt64
- func (a *AtomicInt64) AddAndGet(delta int64) int64
- func (a *AtomicInt64) CompareAndSet(expect, update int64) bool
- func (a *AtomicInt64) DecrementAndGet() int64
- func (a *AtomicInt64) Get() int64
- func (a *AtomicInt64) GetAndAdd(delta int64) int64
- func (a *AtomicInt64) GetAndDecrement() int64
- func (a *AtomicInt64) GetAndIncrement() int64
- func (a *AtomicInt64) GetAndSet(newValue int64) int64
- func (a *AtomicInt64) IncrementAndGet() int64
- func (a *AtomicInt64) Set(newValue int64)
- func (a *AtomicInt64) String() string
- type ClientConn
- func (cc *ClientConn) AddPendingTimer(timerID int64)
- func (cc *ClientConn) CancelTimer(timerID int64)
- func (cc *ClientConn) Close()
- func (cc *ClientConn) ContextValue(k interface{}) interface{}
- func (cc *ClientConn) HeartBeat() int64
- func (cc *ClientConn) LocalAddr() net.Addr
- func (cc *ClientConn) Name() string
- func (cc *ClientConn) NetID() int64
- func (cc *ClientConn) RemoteAddr() net.Addr
- func (cc *ClientConn) RunAfter(duration time.Duration, callback func(time.Time, WriteCloser)) int64
- func (cc *ClientConn) RunAt(timestamp time.Time, callback func(time.Time, WriteCloser)) int64
- func (cc *ClientConn) RunEvery(interval time.Duration, callback func(time.Time, WriteCloser)) int64
- func (cc *ClientConn) SetContextValue(k, v interface{})
- func (cc *ClientConn) SetHeartBeat(heart int64)
- func (cc *ClientConn) SetName(name string)
- func (cc *ClientConn) Start()
- func (cc *ClientConn) Write(message Message) error
- func (cc *ClientConn) WriteData(data []byte) error
- type Codec
- type ErrUndefined
- type Handler
- type HandlerFunc
- type Hashable
- type HeartBeatMessage
- type Message
- type MessageHandler
- type OnTimeOut
- type Server
- func (s *Server) Broadcast(msg Message)
- func (s *Server) Conn(id int64) (*ServerConn, bool)
- func (s *Server) ConnsSize() int
- func (s *Server) Sched(dur time.Duration, sched func(time.Time, WriteCloser))
- func (s *Server) Start(l net.Listener) error
- func (s *Server) Stop()
- func (s *Server) Unicast(id int64, msg Message) error
- type ServerConn
- func (sc *ServerConn) AddPendingTimer(timerID int64)
- func (sc *ServerConn) CancelTimer(timerID int64)
- func (sc *ServerConn) Close()
- func (sc *ServerConn) ContextValue(k interface{}) interface{}
- func (sc *ServerConn) HeartBeat() int64
- func (sc *ServerConn) LocalAddr() net.Addr
- func (sc *ServerConn) Name() string
- func (sc *ServerConn) NetID() int64
- func (sc *ServerConn) RemoteAddr() net.Addr
- func (sc *ServerConn) RunAfter(duration time.Duration, callback func(time.Time, WriteCloser)) int64
- func (sc *ServerConn) RunAt(timestamp time.Time, callback func(time.Time, WriteCloser)) int64
- func (sc *ServerConn) RunEvery(interval time.Duration, callback func(time.Time, WriteCloser)) int64
- func (sc *ServerConn) SetContextValue(k, v interface{})
- func (sc *ServerConn) SetHeartBeat(heart int64)
- func (sc *ServerConn) SetName(name string)
- func (sc *ServerConn) Start()
- func (sc *ServerConn) Write(message Message) error
- type ServerOption
- func BufferSizeOption(indicator int) ServerOption
- func CustomCodecOption(codec Codec) ServerOption
- func OnCloseOption(cb func(WriteCloser)) ServerOption
- func OnConnectOption(cb func(WriteCloser) bool) ServerOption
- func OnErrorOption(cb func(WriteCloser)) ServerOption
- func OnMessageOption(cb func(Message, WriteCloser)) ServerOption
- func ReconnectOption() ServerOption
- func TLSCredsOption(config *tls.Config) ServerOption
- func WorkerSizeOption(workerSz int) ServerOption
- type TimingWheel
- type TypeLengthValueCodec
- type UnmarshalFunc
- type WorkerPool
- type WriteCloser
Constants ¶
const (
	// MessageTypeBytes is the length of type header.
	MessageTypeBytes = 4
	// MessageLenBytes is the length of length header.
	MessageLenBytes = 4
	// MessageMaxBytes is the maximum bytes allowed for application data.
	MessageMaxBytes = 1 << 23 // 8M
)
const (
	MaxConnections = 1000
	BufferSize128 = 128
	BufferSize256 = 256
	BufferSize512 = 512
	BufferSize1024 = 1024
)
Definitions of some constants.
const (
// HeartBeat is the default heart beat message number.
HeartBeat = 0
)
Variables ¶
var (
	ErrParameter = errors.New("parameter error")
	ErrNilKey = errors.New("nil key")
	ErrNilValue = errors.New("nil value")
	ErrWouldBlock = errors.New("would block")
	ErrNotHashable = errors.New("not hashable")
	ErrNilData = errors.New("nil data")
	ErrBadData = errors.New("more than 8M data")
	ErrNotRegistered = errors.New("handler not registered")
	ErrServerClosed = errors.New("server has been closed")
)
Error codes returned by failures dealing with server or connection.
Functions ¶
func HandleHeartBeat ¶
func HandleHeartBeat(ctx context.Context, c WriteCloser)
HandleHeartBeat updates connection heart beat timestamp.
func LoadTLSConfig ¶
LoadTLSConfig returns a TLS configuration with the specified cert and key file.
func NetIDFromContext ¶
NetIDFromContext returns a net ID from a Context.
func NewContextWithMessage ¶
NewContextWithMessage returns a new Context that carries message.
func NewContextWithNetID ¶
NewContextWithNetID returns a new Context that carries net ID.
func Register ¶
func Register(msgType int32, unmarshaler func([]byte) (Message, error), handler func(context.Context, WriteCloser))
Register registers the unmarshal and handle functions for msgType. If no unmarshal function is provided, the message will not be parsed. If no handler function is provided, the message will not be handled unless you set a default one by calling SetOnMessageCallback. If Register is called twice for the same msgType, it panics.
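The registration rules above can be sketched with a small map-based registry. This is an illustration under stated assumptions, not the package's code: registry, handlerFunc, dispatch and didPanic are hypothetical names, handlers are simplified to plain functions over bytes, and the sketch is not safe for concurrent registration.

```go
package main

import (
	"errors"
	"fmt"
)

// handlerFunc is a simplified stand-in for a message handler.
type handlerFunc func(payload []byte) string

// registry maps message types to handlers (hypothetical type).
type registry struct {
	handlers map[int32]handlerFunc
}

func newRegistry() *registry {
	return &registry{handlers: make(map[int32]handlerFunc)}
}

// register panics when msgType is registered a second time, matching
// the behavior described for Register.
func (r *registry) register(msgType int32, h handlerFunc) {
	if _, ok := r.handlers[msgType]; ok {
		panic(fmt.Sprintf("message type %d registered twice", msgType))
	}
	r.handlers[msgType] = h
}

// dispatch runs the handler for msgType, or reports that none exists.
func (r *registry) dispatch(msgType int32, payload []byte) (string, error) {
	h, ok := r.handlers[msgType]
	if !ok || h == nil {
		return "", errors.New("handler not registered")
	}
	return h(payload), nil
}

// didPanic reports whether f panicked.
func didPanic(f func()) (panicked bool) {
	defer func() { panicked = recover() != nil }()
	f()
	return false
}

func main() {
	r := newRegistry()
	echo := func(p []byte) string { return "echo: " + string(p) }
	r.register(1, echo)
	out, _ := r.dispatch(1, []byte("hi"))
	_, err := r.dispatch(2, nil)
	fmt.Println(out, err, didPanic(func() { r.register(1, echo) }))
}
```

Panicking on a duplicate registration surfaces a programming mistake at start-up rather than letting one handler silently replace another.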
Types ¶
type AtomicBoolean ¶
type AtomicBoolean int32
AtomicBoolean provides atomic boolean type.
func NewAtomicBoolean ¶
func NewAtomicBoolean(initialValue bool) *AtomicBoolean
NewAtomicBoolean returns an atomic boolean type.
func (*AtomicBoolean) CompareAndSet ¶
func (a *AtomicBoolean) CompareAndSet(oldValue, newValue bool) bool
CompareAndSet atomically compares the boolean with the expected value and, if they are equal, sets it to the updated value.
func (*AtomicBoolean) Get ¶
func (a *AtomicBoolean) Get() bool
Get returns the value of boolean atomically.
func (*AtomicBoolean) GetAndSet ¶
func (a *AtomicBoolean) GetAndSet(newValue bool) bool
GetAndSet sets new value and returns the old atomically.
func (*AtomicBoolean) Set ¶
func (a *AtomicBoolean) Set(newValue bool)
Set sets the value of boolean atomically.
func (*AtomicBoolean) String ¶
func (a *AtomicBoolean) String() string
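Since AtomicBoolean is declared as an int32, one plausible way to build such a type is on top of sync/atomic. The sketch below is an assumption about how a type of this shape could work, not the package's source; atomicBool, newAtomicBool and toInt32 are hypothetical names.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// atomicBool stores false as 0 and true as 1 in an int32 so that the
// sync/atomic int32 primitives can operate on it.
type atomicBool int32

func toInt32(v bool) int32 {
	if v {
		return 1
	}
	return 0
}

func newAtomicBool(initial bool) *atomicBool {
	a := atomicBool(toInt32(initial))
	return &a
}

// Get loads the current value atomically.
func (a *atomicBool) Get() bool {
	return atomic.LoadInt32((*int32)(a)) != 0
}

// Set stores a new value atomically.
func (a *atomicBool) Set(v bool) {
	atomic.StoreInt32((*int32)(a), toInt32(v))
}

// CompareAndSet swaps in newValue only if the current value equals oldValue.
func (a *atomicBool) CompareAndSet(oldValue, newValue bool) bool {
	return atomic.CompareAndSwapInt32((*int32)(a), toInt32(oldValue), toInt32(newValue))
}

// GetAndSet stores v and returns the previous value.
func (a *atomicBool) GetAndSet(v bool) bool {
	return atomic.SwapInt32((*int32)(a), toInt32(v)) != 0
}

func main() {
	closed := newAtomicBool(false)
	// Only the first caller wins the transition from false to true.
	fmt.Println(closed.CompareAndSet(false, true), closed.CompareAndSet(false, true), closed.Get())
}
```

A flag like this is a common way to make sure that a close routine runs only once when several go-routines may try to close the same connection.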
type AtomicInt32 ¶
type AtomicInt32 int32
AtomicInt32 provides atomic int32 type.
func NewAtomicInt32 ¶
func NewAtomicInt32(initialValue int32) *AtomicInt32
NewAtomicInt32 returns an atomic int32 type.
func (*AtomicInt32) AddAndGet ¶
func (a *AtomicInt32) AddAndGet(delta int32) int32
AddAndGet atomically adds delta to the value and returns the new value.
func (*AtomicInt32) CompareAndSet ¶
func (a *AtomicInt32) CompareAndSet(expect, update int32) bool
CompareAndSet atomically compares the int32 with the expected value and, if they are equal, sets it to the updated value.
func (*AtomicInt32) DecrementAndGet ¶
func (a *AtomicInt32) DecrementAndGet() int32
DecrementAndGet atomically decrements the value by 1 and returns the new value.
func (*AtomicInt32) Get ¶
func (a *AtomicInt32) Get() int32
Get returns the value of the int32 atomically.
func (*AtomicInt32) GetAndAdd ¶
func (a *AtomicInt32) GetAndAdd(delta int32) int32
GetAndAdd atomically adds delta to the value and returns the old value.
func (*AtomicInt32) GetAndDecrement ¶
func (a *AtomicInt32) GetAndDecrement() int32
GetAndDecrement atomically decrements the value by 1 and returns the old value.
func (*AtomicInt32) GetAndIncrement ¶
func (a *AtomicInt32) GetAndIncrement() int32
GetAndIncrement atomically increments the value by 1 and returns the old value.
func (*AtomicInt32) GetAndSet ¶
func (a *AtomicInt32) GetAndSet(newValue int32) (oldValue int32)
GetAndSet atomically sets the new value and returns the old one.
func (*AtomicInt32) IncrementAndGet ¶
func (a *AtomicInt32) IncrementAndGet() int32
IncrementAndGet atomically increments the value by 1 and returns the new value.
func (*AtomicInt32) Set ¶
func (a *AtomicInt32) Set(newValue int32)
Set sets the value of the int32 atomically.
func (*AtomicInt32) String ¶
func (a *AtomicInt32) String() string
type AtomicInt64 ¶
type AtomicInt64 int64
AtomicInt64 provides atomic int64 type.
func NewAtomicInt64 ¶
func NewAtomicInt64(initialValue int64) *AtomicInt64
NewAtomicInt64 returns an atomic int64 type.
func (*AtomicInt64) AddAndGet ¶
func (a *AtomicInt64) AddAndGet(delta int64) int64
AddAndGet atomically adds delta to the value and returns the new value.
func (*AtomicInt64) CompareAndSet ¶
func (a *AtomicInt64) CompareAndSet(expect, update int64) bool
CompareAndSet atomically compares the int64 with the expected value and, if they are equal, sets it to the updated value.
func (*AtomicInt64) DecrementAndGet ¶
func (a *AtomicInt64) DecrementAndGet() int64
DecrementAndGet atomically decrements the value by 1 and returns the new value.
func (*AtomicInt64) Get ¶
func (a *AtomicInt64) Get() int64
Get returns the value of the int64 atomically.
func (*AtomicInt64) GetAndAdd ¶
func (a *AtomicInt64) GetAndAdd(delta int64) int64
GetAndAdd atomically adds delta to the value and returns the old value.
func (*AtomicInt64) GetAndDecrement ¶
func (a *AtomicInt64) GetAndDecrement() int64
GetAndDecrement atomically decrements the value by 1 and returns the old value.
func (*AtomicInt64) GetAndIncrement ¶
func (a *AtomicInt64) GetAndIncrement() int64
GetAndIncrement atomically increments the value by 1 and returns the old value.
func (*AtomicInt64) GetAndSet ¶
func (a *AtomicInt64) GetAndSet(newValue int64) int64
GetAndSet atomically sets the new value and returns the old one.
func (*AtomicInt64) IncrementAndGet ¶
func (a *AtomicInt64) IncrementAndGet() int64
IncrementAndGet atomically increments the value by 1 and returns the new value.
func (*AtomicInt64) Set ¶
func (a *AtomicInt64) Set(newValue int64)
Set sets the value of the int64 atomically.
func (*AtomicInt64) String ¶
func (a *AtomicInt64) String() string
type ClientConn ¶
type ClientConn struct {
// contains filtered or unexported fields
}
ClientConn represents a client connection to a TCP server.
func NewClientConn ¶
func NewClientConn(netid int64, c net.Conn, opt ...ServerOption) *ClientConn
NewClientConn returns a new client connection which has not started to serve requests yet.
func (*ClientConn) AddPendingTimer ¶
func (cc *ClientConn) AddPendingTimer(timerID int64)
AddPendingTimer adds a new timer ID to client connection.
func (*ClientConn) CancelTimer ¶
func (cc *ClientConn) CancelTimer(timerID int64)
CancelTimer cancels a timer with the specified ID.
func (*ClientConn) Close ¶
func (cc *ClientConn) Close()
Close gracefully closes the client connection. It blocks until all child go-routines have completed and returned.
func (*ClientConn) ContextValue ¶
func (cc *ClientConn) ContextValue(k interface{}) interface{}
ContextValue gets extra data from client connection.
func (*ClientConn) HeartBeat ¶
func (cc *ClientConn) HeartBeat() int64
HeartBeat gets the heart beats of client connection.
func (*ClientConn) LocalAddr ¶
func (cc *ClientConn) LocalAddr() net.Addr
LocalAddr returns the local address of client connection.
func (*ClientConn) Name ¶
func (cc *ClientConn) Name() string
Name gets the name of client connection.
func (*ClientConn) NetID ¶
func (cc *ClientConn) NetID() int64
NetID returns the net ID of client connection.
func (*ClientConn) RemoteAddr ¶
func (cc *ClientConn) RemoteAddr() net.Addr
RemoteAddr returns the remote address of client connection.
func (*ClientConn) RunAfter ¶
func (cc *ClientConn) RunAfter(duration time.Duration, callback func(time.Time, WriteCloser)) int64
RunAfter runs a callback right after the specified duration has elapsed.
func (*ClientConn) RunAt ¶
func (cc *ClientConn) RunAt(timestamp time.Time, callback func(time.Time, WriteCloser)) int64
RunAt runs a callback at the specified timestamp.
func (*ClientConn) RunEvery ¶
func (cc *ClientConn) RunEvery(interval time.Duration, callback func(time.Time, WriteCloser)) int64
RunEvery runs a callback repeatedly at the specified interval.
func (*ClientConn) SetContextValue ¶
func (cc *ClientConn) SetContextValue(k, v interface{})
SetContextValue sets extra data to client connection.
func (*ClientConn) SetHeartBeat ¶
func (cc *ClientConn) SetHeartBeat(heart int64)
SetHeartBeat sets the heart beats of client connection.
func (*ClientConn) SetName ¶
func (cc *ClientConn) SetName(name string)
SetName sets the name of client connection.
func (*ClientConn) Start ¶
func (cc *ClientConn) Start()
Start starts the client connection, creating go-routines for reading, writing and handling.
func (*ClientConn) Write ¶
func (cc *ClientConn) Write(message Message) error
Write writes a message to the client.
func (*ClientConn) WriteData ¶
func (cc *ClientConn) WriteData(data []byte) error
type Codec ¶
Codec is the interface for message encoders and decoders. Application programmers can define a custom codec themselves.
type ErrUndefined ¶
type ErrUndefined int32
ErrUndefined for undefined message type.
func (ErrUndefined) Error ¶
func (e ErrUndefined) Error() string
type HandlerFunc ¶
type HandlerFunc func(context.Context, WriteCloser)
HandlerFunc serves as an adapter to allow the use of ordinary functions as handlers.
func GetHandlerFunc ¶
func GetHandlerFunc(msgType int32) HandlerFunc
GetHandlerFunc returns the corresponding handler function for msgType.
func (HandlerFunc) Handle ¶
func (f HandlerFunc) Handle(ctx context.Context, c WriteCloser)
Handle calls f(ctx, c)
type Hashable ¶
type Hashable interface {
HashCode() int32
}
Hashable is an interface for hashable objects.
type HeartBeatMessage ¶
type HeartBeatMessage struct {
Timestamp int64
}
HeartBeatMessage for application-level keeping alive.
func (HeartBeatMessage) MessageNumber ¶
func (hbm HeartBeatMessage) MessageNumber() int32
MessageNumber returns message number.
func (HeartBeatMessage) Serialize ¶
func (hbm HeartBeatMessage) Serialize() ([]byte, error)
Serialize serializes HeartBeatMessage into bytes.
type Message ¶
Message represents the structured data that can be handled.
func DeserializeHeartBeat ¶
DeserializeHeartBeat deserializes bytes into Message.
func MessageFromContext ¶
MessageFromContext extracts a message from a Context.
type MessageHandler ¶
type MessageHandler struct {
// contains filtered or unexported fields
}
MessageHandler is a combination of message and its handler function.
type OnTimeOut ¶
type OnTimeOut struct {
	Callback func(time.Time, WriteCloser)
	Ctx context.Context
}
OnTimeOut represents a timed task.
func NewOnTimeOut ¶
NewOnTimeOut returns OnTimeOut.
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server is a server to serve TCP requests.
func NewServer ¶
func NewServer(opt ...ServerOption) *Server
NewServer returns a new TCP server which has not started to serve requests yet.
func ServerFromContext ¶
ServerFromContext returns the server within the context.
func (*Server) Conn ¶
func (s *Server) Conn(id int64) (*ServerConn, bool)
Conn returns a server connection with specified ID.
func (*Server) Start ¶
Start starts the TCP server, accepting new clients and creating a service go-routine for each. The service go-routines read messages and then call the registered handlers to handle them. Start returns when it fails with a fatal error; the listener will be closed when it returns.
type ServerConn ¶
type ServerConn struct {
// contains filtered or unexported fields
}
ServerConn represents a server connection to a TCP server; it implements Conn.
func NewServerConn ¶
func NewServerConn(id int64, s *Server, c net.Conn) *ServerConn
NewServerConn returns a new server connection which has not started to serve requests yet.
func (*ServerConn) AddPendingTimer ¶
func (sc *ServerConn) AddPendingTimer(timerID int64)
AddPendingTimer adds a timer ID to server Connection.
func (*ServerConn) CancelTimer ¶
func (sc *ServerConn) CancelTimer(timerID int64)
CancelTimer cancels a timer with the specified ID.
func (*ServerConn) Close ¶
func (sc *ServerConn) Close()
Close gracefully closes the server connection. It blocks until all child go-routines have completed and returned.
func (*ServerConn) ContextValue ¶
func (sc *ServerConn) ContextValue(k interface{}) interface{}
ContextValue gets extra data from server connection.
func (*ServerConn) HeartBeat ¶
func (sc *ServerConn) HeartBeat() int64
HeartBeat returns the heart beats of server connection.
func (*ServerConn) LocalAddr ¶
func (sc *ServerConn) LocalAddr() net.Addr
LocalAddr returns the local address of server connection.
func (*ServerConn) Name ¶
func (sc *ServerConn) Name() string
Name returns the name of server connection.
func (*ServerConn) NetID ¶
func (sc *ServerConn) NetID() int64
NetID returns net ID of server connection.
func (*ServerConn) RemoteAddr ¶
func (sc *ServerConn) RemoteAddr() net.Addr
RemoteAddr returns the remote address of server connection.
func (*ServerConn) RunAfter ¶
func (sc *ServerConn) RunAfter(duration time.Duration, callback func(time.Time, WriteCloser)) int64
RunAfter runs a callback right after the specified duration has elapsed.
func (*ServerConn) RunAt ¶
func (sc *ServerConn) RunAt(timestamp time.Time, callback func(time.Time, WriteCloser)) int64
RunAt runs a callback at the specified timestamp.
func (*ServerConn) RunEvery ¶
func (sc *ServerConn) RunEvery(interval time.Duration, callback func(time.Time, WriteCloser)) int64
RunEvery runs a callback repeatedly at the specified interval.
func (*ServerConn) SetContextValue ¶
func (sc *ServerConn) SetContextValue(k, v interface{})
SetContextValue sets extra data to server connection.
func (*ServerConn) SetHeartBeat ¶
func (sc *ServerConn) SetHeartBeat(heart int64)
SetHeartBeat sets the heart beats of server connection.
func (*ServerConn) SetName ¶
func (sc *ServerConn) SetName(name string)
SetName sets name of server connection.
func (*ServerConn) Start ¶
func (sc *ServerConn) Start()
Start starts the server connection, creating go-routines for reading, writing and handling.
func (*ServerConn) Write ¶
func (sc *ServerConn) Write(message Message) error
Write writes a message to the client.
type ServerOption ¶
type ServerOption func(*options)
ServerOption sets server options.
func BufferSizeOption ¶
func BufferSizeOption(indicator int) ServerOption
BufferSizeOption returns a ServerOption that sets the size of the buffered channel; for example, an indicator of BufferSize256 means a size of 256.
func CustomCodecOption ¶
func CustomCodecOption(codec Codec) ServerOption
CustomCodecOption returns a ServerOption that will apply a custom Codec.
func OnCloseOption ¶
func OnCloseOption(cb func(WriteCloser)) ServerOption
OnCloseOption returns a ServerOption that sets the callback to call when a client is closed.
func OnConnectOption ¶
func OnConnectOption(cb func(WriteCloser) bool) ServerOption
OnConnectOption returns a ServerOption that sets the callback to call when a new client is connected.
func OnErrorOption ¶
func OnErrorOption(cb func(WriteCloser)) ServerOption
OnErrorOption returns a ServerOption that sets the callback to call when an error occurs.
func OnMessageOption ¶
func OnMessageOption(cb func(Message, WriteCloser)) ServerOption
OnMessageOption returns a ServerOption that sets the callback to call when a new message arrives.
func ReconnectOption ¶
func ReconnectOption() ServerOption
ReconnectOption returns a ServerOption that will make ClientConn reconnectable.
func TLSCredsOption ¶
func TLSCredsOption(config *tls.Config) ServerOption
TLSCredsOption returns a ServerOption that will set TLS credentials for server connections.
func WorkerSizeOption ¶
func WorkerSizeOption(workerSz int) ServerOption
WorkerSizeOption returns a ServerOption that will set the number of go-routines in WorkerPool.
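ServerOption is declared as func(*options), which is the functional options pattern. The sketch below shows how such options are typically applied by a constructor. It is an illustration only: the options fields, the default values and the helper names withBufferSize, withWorkerSize and withReconnect are assumptions, not the package's internals.

```go
package main

import "fmt"

// options collects the tunable settings (fields are hypothetical).
type options struct {
	bufferSize int
	workerSize int
	reconnect  bool
}

// option mutates an options value, in the same style as ServerOption.
type option func(*options)

func withBufferSize(n int) option { return func(o *options) { o.bufferSize = n } }

func withWorkerSize(n int) option { return func(o *options) { o.workerSize = n } }

func withReconnect() option { return func(o *options) { o.reconnect = true } }

// newOptions starts from defaults (invented for this sketch) and applies
// each option in the order given, so later options override earlier ones.
func newOptions(opts ...option) options {
	o := options{bufferSize: 256, workerSize: 20}
	for _, apply := range opts {
		apply(&o)
	}
	return o
}

func main() {
	o := newOptions(withBufferSize(512), withReconnect())
	fmt.Println(o.bufferSize, o.workerSize, o.reconnect)
}
```

The benefit of this design is that a constructor such as NewServer(opt ...ServerOption) can gain new settings without changing its signature, and callers only mention the settings they care about.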
type TimingWheel ¶
type TimingWheel struct {
// contains filtered or unexported fields
}
TimingWheel manages all the timed tasks.
func NewTimingWheel ¶
func NewTimingWheel(ctx context.Context) *TimingWheel
NewTimingWheel returns a *TimingWheel ready for use.
func (*TimingWheel) CancelTimer ¶
func (tw *TimingWheel) CancelTimer(timerID int64)
CancelTimer cancels a timed task with specified timer ID.
func (*TimingWheel) Size ¶
func (tw *TimingWheel) Size() int
Size returns the number of timed tasks.
func (*TimingWheel) TimeOutChannel ¶
func (tw *TimingWheel) TimeOutChannel() chan *OnTimeOut
TimeOutChannel returns the timeout channel.
type TypeLengthValueCodec ¶
type TypeLengthValueCodec struct{}
TypeLengthValueCodec defines a special codec. Format: type-length-value
|4 bytes|4 bytes|n bytes <= 8M|
type UnmarshalFunc ¶
UnmarshalFunc unmarshals bytes into Message.
func GetUnmarshalFunc ¶
func GetUnmarshalFunc(msgType int32) UnmarshalFunc
GetUnmarshalFunc returns the corresponding unmarshal function for msgType.
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
WorkerPool is a pool of go-routines running functions.
func WorkerPoolInstance ¶
func WorkerPoolInstance() *WorkerPool
WorkerPoolInstance returns the global pool.
func (*WorkerPool) Close ¶
func (wp *WorkerPool) Close()
Close closes the pool, stopping it from executing functions.
func (*WorkerPool) Put ¶
func (wp *WorkerPool) Put(k interface{}, cb func()) error
Put appends a function to some worker's channel.
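One way to read "some worker's channel" is that the key k selects the worker, so functions submitted with the same key run in order on the same go-routine. The sketch below implements that idea under stated assumptions: pool, newPool and hashKey are hypothetical names, only int64 and string keys are treated as hashable, and the real WorkerPool may choose workers differently.

```go
package main

import (
	"errors"
	"fmt"
	"hash/fnv"
	"sync"
)

var errNotHashable = errors.New("not hashable")

// pool runs submitted functions on a fixed set of worker go-routines,
// each with its own buffered channel.
type pool struct {
	queues []chan func()
	wg     sync.WaitGroup
}

func newPool(workers, buffer int) *pool {
	p := &pool{queues: make([]chan func(), workers)}
	for i := range p.queues {
		q := make(chan func(), buffer)
		p.queues[i] = q
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for cb := range q {
				cb()
			}
		}()
	}
	return p
}

// hashKey maps supported key types to a number (assumed key types).
func hashKey(k interface{}) (uint32, error) {
	switch v := k.(type) {
	case int64:
		return uint32(v), nil
	case string:
		h := fnv.New32a()
		h.Write([]byte(v))
		return h.Sum32(), nil
	default:
		return 0, errNotHashable
	}
}

// Put sends cb to the worker selected by the key's hash.
func (p *pool) Put(k interface{}, cb func()) error {
	code, err := hashKey(k)
	if err != nil {
		return err
	}
	p.queues[code%uint32(len(p.queues))] <- cb
	return nil
}

// Close stops accepting work and waits for queued functions to finish.
func (p *pool) Close() {
	for _, q := range p.queues {
		close(q)
	}
	p.wg.Wait()
}

func main() {
	p := newPool(4, 16)
	var order []int
	for i := 1; i <= 3; i++ {
		i := i
		p.Put(int64(7), func() { order = append(order, i) })
	}
	p.Close()
	fmt.Println(order)
}
```

Keying by something stable, such as a connection's net ID, would keep the messages of one connection in order while still spreading different connections across workers.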
type WriteCloser ¶
WriteCloser is the interface that groups Write and Close methods.