tao

package module
v0.0.0-...-4217d3b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 21, 2016 License: Apache-2.0 Imports: 21 Imported by: 0

README

Tao --- 轻量级TCP异步框架,Go语言实现

Light-weight TCP Asynchronous gOlang framework

Announcing Tao 1.4 - Release Notes

  1. bugfix:TLS重连失败问题;
    bugfix: failed to reconnect the TLS connection;
  2. bugfix:ConnectionMap死锁问题;
    bugfix: ConnectionMap dead-lock problem;
  3. 优化TCP网络连接的关闭过程;
    Optimize the closing process of TCP connection;
  4. 优化服务器的关闭过程;
    Optimize the closing process of server;
  5. 更优雅的消息处理注册接口;
    More elegant message handler register interface;

Announcing Tao 1.3 - Release Notes

  1. bugfix:修复断线重连状态不一致问题;
    bugfix: fixed inconsistent status caused by reconnecting;
  2. bugfix:修复ServerConnection和TimingWheel在连接关闭时并发访问导致崩溃问题;
    bugfix: fixed a corruption caused by concurrent accessing between ServerConnection and TimingWheel during connection closing;
  3. 无锁且线程安全的TimingWheel,优化CPU占用率;
    Lock-free and thread-safe TimingWheel, optimized occupancy rate;
  4. bugfix:修复TLS配置文件读取函数;
    bugfix: Fixed errors when loading TLS config;
  5. 添加消息相关的Context结构;简化消息注册机制,直接注册处理函数到HandlerMap;
    A message-related Context struct added; Register handler functions in HandlerMap directly to simplify message registration mechanism;
  6. 合并NewClientConnection()和NewTLSClientConnection(),提供一致的API;
    Combine NewTLSConnection() into NewClientConnection(), providing a consistent API;
  7. 工作者线程池改造成单例模式;
    Make WorkerPool a singleton pattern;
  8. 使用Holmes日志库代替glog;
    Using Holmes logging package instead of glog;
  9. 添加metrics.go:基于expvar标准包导出服务器关键信息;
    Add metrics.go: exporting critical server information based on expvar standard pacakge;
  10. 编写中文版框架设计原理文档,英文版正在翻译中;
    A document about framework designing principles in Chinese, English version under developed;

Announcing Tao 1.2 - Release Notes

  1. 更优雅的消息注册接口;
    More elegant message register interface;
  2. TCPConnection的断线重连机制;
    TCPConnection reconnecting upon closing;
  3. bugfix:协议未注册时不关闭客户端连接;
    bugfix: Don't close client when messages not registered;
  4. bugfix:在readLoop()协程中处理心跳时间戳更新;
    bugfix: Updating heart-beat timestamp in readLoop() go-routine;
  5. bugfix:Message接口使用Serialize()替代之前的MarshalBinary(),以免框架使用者使用gob.Encoder/Decoder的时候栈溢出;
    bugfix: Use Serialize() instead of MarshalBinary() in Message interface, preventing stack overflows when framework users use gob.Encoder/Decoder;
  6. bugfix:当应用层数据长度大于0时才对其进行序列化;
    bugfix: Serialize application data when its length greater than 0;
  7. 新API:SetCodec(),允许TCPConnection自定义编解码器;
    New API: SetCodec() allowing TCPConnection defines its own codec;
  8. 新API:SetDBInitializer(),允许框架使用者定义数据访问接口;
    New API: SetDBInitializer() allowing framework users define data access interface;
  9. 允许框架使用者在TCPConnection上设置自定义数据;
    Allowing framework users setting custom data on TCPConnection;
  10. 为新客户端连接的启动单独开辟一个对应的go协程;
    Allocating a corresponding go-routine for newly-connected clients respectively;
  11. bugfix:写事件循环在连接关闭时将信道中的数据全部发送出去;
    bugfix: writeLoop() flushes all packets left in channel when performing closing;
  12. bugfix:服务器和客户端连接等待所有go协程关闭后再退出;
    bugfix: Servers and client connections wait for the exits of all go-routines before shutting down;
  13. 重构Server和Connection,采用针对接口编程的设计;
    Refactoring Server and Connection, adopting a programming-by-interface design;
  14. 设置500毫秒读超时,防止readLoop()发生阻塞;
    Setting 500ms read-timeout prevents readLoop() from blocking;

Announcing Tao 1.1 - Release Notes

  1. 添加注释,提高代码可读性;
    Add comments, make it more readable;
  2. 限制服务器的最大并发连接数(默认1000);
    Server max connections limit (default to 1000);
  3. 新API:NewTLSTCPServer() 创建传输层安全的TCP服务器;
    New API: NewTLSTCPServer() for creating TLS-supported TCP server;
  4. 新特性:SetOnScheduleCallback() 由框架使用者来定义计划任务(比如心跳);
    New Feature: SetOnScheduleCallback() make scheduled task managed by framwork users(such as heart beat);
  5. 新特性:支持默认的消息编解码器TypeLengthValueCodec,并允许框架使用者开发自定义编解码器;
    Support TypeLengthValueCodec by default, while allowing framework users develop their own codecs;

Announcing Tao 1.0 - Release Notes

  1. 完全异步的读,写以及消息处理;
    Completely asynchronous reading, writing and message handling;
  2. 工作者协程池;
    Worker go-routine pool;
  3. 并发数据结构和原子数据类型;
    Concurrent data structure and atomic data types;
  4. 毫秒精度的定时器功能;
    Millisecond-precision timer function;
  5. 传输层安全支持;
    Transport layer security support;
  6. 应用层心跳协议;
    Application-level heart-beating protocol;
Documentation
  1. Tao - Go语言实现的TCP网络编程框架
  2. English(TBD)
Chat Server Example
package main

import (
  "fmt"
  "runtime"
  "github.com/leesper/tao"
  "github.com/leesper/tao/examples/chat"
  "github.com/reechou/holmes"
)

type ChatServer struct {
  tao.Server
}

func NewChatServer(addr string) *ChatServer {
  return &ChatServer {
    tao.NewTCPServer(addr),
  }
}

func main() {
  runtime.GOMAXPROCS(runtime.NumCPU())
  defer holmes.Start().Stop()
  tao.MonitorOn(12345)

  tao.Register(chat.CHAT_MESSAGE, chat.DeserializeChatMessage, chat.ProcessChatMessage)

  chatServer := NewChatServer(fmt.Sprintf("%s:%d", "0.0.0.0", 18341))

  chatServer.SetOnConnectCallback(func(conn tao.Connection) bool {
    holmes.Info("%s", "On connect")
    return true
  })

  chatServer.SetOnErrorCallback(func() {
    holmes.Info("%s", "On error")
  })

  chatServer.SetOnCloseCallback(func(conn tao.Connection) {
    holmes.Info("%s", "Closing chat client")
  })

  chatServer.Start()
}
TODO list:
  1. Add more use-case examples;
  2. Add logger support;

Documentation

Overview

Worker pool is a pool of go-routines running for executing callbacks,

each client's message handler is permanently hashed into one specified worker to execute, so it is in-order for each client's perspective.

Index

Constants

View Source
const (
	NTYPE  = 4
	NLEN   = 4
	MAXLEN = 1 << 23 // 8M
)
View Source
const (
	WORKERS         = 20
	MAX_CONNECTIONS = 5000
)
View Source
const (
	HEART_BEAT = 0
)

0 is the preserved heart beat message number, you can define your own.

View Source
const INITIAL_SHARD_SIZE = 16

Variables

View Source
var (
	ErrorParameter      error = errors.New("Parameter error")
	ErrorNilKey         error = errors.New("Nil key")
	ErrorNilValue       error = errors.New("Nil value")
	ErrorWouldBlock     error = errors.New("Would block")
	ErrorNotHashable    error = errors.New("Not hashable")
	ErrorNilData        error = errors.New("Nil data")
	ErrorIllegalData    error = errors.New("More than 8M data")
	ErrorNotImplemented error = errors.New("Not implemented")
	ErrorConnClosed     error = errors.New("Connection closed")
)

Functions

func GetHandler

func GetHandler(msgType int32) handlerFunc

GetHandler returns the corresponding handler function for msgType.

func GetUnmarshaler

func GetUnmarshaler(msgType int32) unmarshalFunc

GetUnmarshaler returns the corresponding unmarshal function for msgType.

func LoadTLSConfig

func LoadTLSConfig(certFile, keyFile string, isSkipVerify bool) (tls.Config, error)

func MonitorOn

func MonitorOn(port int)

func ProcessHeartBeatMessage

func ProcessHeartBeatMessage(ctx Context, conn Connection)

func Register

func Register(msgType int32, unmarshaler func([]byte) (Message, error), handler func(Context, Connection))

Register registers the unmarshal and handle functions for msgType. If no unmarshal function provided, the message will not be parsed. If no handler function provided, the message will not be handled unless you set a default one by calling SetOnMessageCallback. If Register being called twice on one msgType, it will panics.

func Undefined

func Undefined(msgType int32) error

Types

type AtomicBoolean

type AtomicBoolean int32

func NewAtomicBoolean

func NewAtomicBoolean(initialValue bool) *AtomicBoolean

func (*AtomicBoolean) CompareAndSet

func (a *AtomicBoolean) CompareAndSet(oldValue, newValue bool) bool

func (*AtomicBoolean) Get

func (a *AtomicBoolean) Get() bool

func (*AtomicBoolean) GetAndSet

func (a *AtomicBoolean) GetAndSet(newValue bool) bool

func (*AtomicBoolean) Set

func (a *AtomicBoolean) Set(newValue bool)

func (*AtomicBoolean) String

func (a *AtomicBoolean) String() string

type AtomicInt32

type AtomicInt32 int32

func NewAtomicInt32

func NewAtomicInt32(initialValue int32) *AtomicInt32

func (*AtomicInt32) AddAndGet

func (a *AtomicInt32) AddAndGet(delta int32) int32

func (*AtomicInt32) CompareAndSet

func (a *AtomicInt32) CompareAndSet(expect, update int32) bool

func (*AtomicInt32) DecrementAndGet

func (a *AtomicInt32) DecrementAndGet() int32

func (*AtomicInt32) Get

func (a *AtomicInt32) Get() int32

func (*AtomicInt32) GetAndAdd

func (a *AtomicInt32) GetAndAdd(delta int32) int32

func (*AtomicInt32) GetAndDecrement

func (a *AtomicInt32) GetAndDecrement() int32

func (*AtomicInt32) GetAndIncrement

func (a *AtomicInt32) GetAndIncrement() int32

func (*AtomicInt32) GetAndSet

func (a *AtomicInt32) GetAndSet(newValue int32) (oldValue int32)

func (*AtomicInt32) IncrementAndGet

func (a *AtomicInt32) IncrementAndGet() int32

func (*AtomicInt32) Set

func (a *AtomicInt32) Set(newValue int32)

func (*AtomicInt32) String

func (a *AtomicInt32) String() string

type AtomicInt64

type AtomicInt64 int64

func NewAtomicInt64

func NewAtomicInt64(initialValue int64) *AtomicInt64

func (*AtomicInt64) AddAndGet

func (a *AtomicInt64) AddAndGet(delta int64) int64

func (*AtomicInt64) CompareAndSet

func (a *AtomicInt64) CompareAndSet(expect, update int64) bool

func (*AtomicInt64) DecrementAndGet

func (a *AtomicInt64) DecrementAndGet() int64

func (*AtomicInt64) Get

func (a *AtomicInt64) Get() int64

func (*AtomicInt64) GetAndAdd

func (a *AtomicInt64) GetAndAdd(delta int64) int64

func (*AtomicInt64) GetAndDecrement

func (a *AtomicInt64) GetAndDecrement() int64

func (*AtomicInt64) GetAndIncrement

func (a *AtomicInt64) GetAndIncrement() int64

func (*AtomicInt64) GetAndSet

func (a *AtomicInt64) GetAndSet(newValue int64) int64

func (*AtomicInt64) IncrementAndGet

func (a *AtomicInt64) IncrementAndGet() int64

func (*AtomicInt64) Set

func (a *AtomicInt64) Set(newValue int64)

func (*AtomicInt64) String

func (a *AtomicInt64) String() string

type ClientConnection

type ClientConnection struct {
	// contains filtered or unexported fields
}

implements Connection

func (*ClientConnection) CancelTimer

func (client *ClientConnection) CancelTimer(timerId int64)

func (*ClientConnection) Close

func (client *ClientConnection) Close()

func (*ClientConnection) GetCloseChannel

func (client *ClientConnection) GetCloseChannel() chan struct{}

func (*ClientConnection) GetExtraData

func (client *ClientConnection) GetExtraData() interface{}

func (*ClientConnection) GetHeartBeat

func (client *ClientConnection) GetHeartBeat() int64

func (*ClientConnection) GetMessageCodec

func (client *ClientConnection) GetMessageCodec() Codec

func (*ClientConnection) GetMessageHandlerChannel

func (client *ClientConnection) GetMessageHandlerChannel() chan MessageHandler

func (*ClientConnection) GetMessageSendChannel

func (client *ClientConnection) GetMessageSendChannel() chan []byte

func (*ClientConnection) GetName

func (client *ClientConnection) GetName() string

func (*ClientConnection) GetNetId

func (client *ClientConnection) GetNetId() int64

func (*ClientConnection) GetOnCloseCallback

func (client *ClientConnection) GetOnCloseCallback() onCloseFunc

func (*ClientConnection) GetOnConnectCallback

func (client *ClientConnection) GetOnConnectCallback() onConnectFunc

func (*ClientConnection) GetOnErrorCallback

func (client *ClientConnection) GetOnErrorCallback() onErrorFunc

func (*ClientConnection) GetOnMessageCallback

func (client *ClientConnection) GetOnMessageCallback() onMessageFunc

func (*ClientConnection) GetPendingTimers

func (client *ClientConnection) GetPendingTimers() []int64

func (*ClientConnection) GetRawConn

func (client *ClientConnection) GetRawConn() net.Conn

func (*ClientConnection) GetRemoteAddress

func (client *ClientConnection) GetRemoteAddress() net.Addr

func (*ClientConnection) GetTimeOutChannel

func (client *ClientConnection) GetTimeOutChannel() chan *OnTimeOut

func (*ClientConnection) GetTimingWheel

func (client *ClientConnection) GetTimingWheel() *TimingWheel

func (*ClientConnection) IsRunning

func (client *ClientConnection) IsRunning() bool

func (*ClientConnection) RunAfter

func (client *ClientConnection) RunAfter(duration time.Duration, callback func(time.Time, interface{})) int64

func (*ClientConnection) RunAt

func (client *ClientConnection) RunAt(timestamp time.Time, callback func(time.Time, interface{})) int64

func (*ClientConnection) RunEvery

func (client *ClientConnection) RunEvery(interval time.Duration, callback func(time.Time, interface{})) int64

func (*ClientConnection) SetExtraData

func (client *ClientConnection) SetExtraData(extra interface{})

func (*ClientConnection) SetHeartBeat

func (client *ClientConnection) SetHeartBeat(beat int64)

func (*ClientConnection) SetMessageCodec

func (client *ClientConnection) SetMessageCodec(codec Codec)

func (*ClientConnection) SetName

func (client *ClientConnection) SetName(name string)

func (*ClientConnection) SetNetId

func (client *ClientConnection) SetNetId(netid int64)

func (*ClientConnection) SetOnCloseCallback

func (client *ClientConnection) SetOnCloseCallback(callback func(Connection))

func (*ClientConnection) SetOnConnectCallback

func (client *ClientConnection) SetOnConnectCallback(callback func(Connection) bool)

func (*ClientConnection) SetOnErrorCallback

func (client *ClientConnection) SetOnErrorCallback(callback func())

func (*ClientConnection) SetOnMessageCallback

func (client *ClientConnection) SetOnMessageCallback(callback func(Message, Connection))

func (*ClientConnection) SetPendingTimers

func (client *ClientConnection) SetPendingTimers(pending []int64)

func (*ClientConnection) Start

func (client *ClientConnection) Start()

func (*ClientConnection) Write

func (client *ClientConnection) Write(message Message) error

type Codec

type Codec interface {
	Decode(Connection) (Message, error)
	Encode(Message) ([]byte, error)
}

Codec is the interface for message coder and decoder. Application programmer can define a custom codec themselves.

type ConcurrentMap

type ConcurrentMap struct {
	// contains filtered or unexported fields
}

func NewConcurrentMap

func NewConcurrentMap() *ConcurrentMap

func (*ConcurrentMap) Clear

func (cm *ConcurrentMap) Clear()

func (*ConcurrentMap) ContainsKey

func (cm *ConcurrentMap) ContainsKey(k interface{}) bool

func (*ConcurrentMap) Get

func (cm *ConcurrentMap) Get(k interface{}) (interface{}, bool)

func (*ConcurrentMap) IsEmpty

func (cm *ConcurrentMap) IsEmpty() bool

func (*ConcurrentMap) IterItems

func (cm *ConcurrentMap) IterItems() <-chan Item

func (*ConcurrentMap) IterKeys

func (cm *ConcurrentMap) IterKeys() <-chan interface{}

func (*ConcurrentMap) IterValues

func (cm *ConcurrentMap) IterValues() <-chan interface{}

func (*ConcurrentMap) Put

func (cm *ConcurrentMap) Put(k, v interface{}) error

func (*ConcurrentMap) PutIfAbsent

func (cm *ConcurrentMap) PutIfAbsent(k, v interface{}) error

func (*ConcurrentMap) Remove

func (cm *ConcurrentMap) Remove(k interface{}) error

func (*ConcurrentMap) Size

func (cm *ConcurrentMap) Size() int

type Connection

type Connection interface {
	SetNetId(netid int64)
	GetNetId() int64

	SetName(name string)
	GetName() string

	SetHeartBeat(beat int64)
	GetHeartBeat() int64

	SetExtraData(extra interface{})
	GetExtraData() interface{}

	SetMessageCodec(codec Codec)
	GetMessageCodec() Codec

	SetPendingTimers(pending []int64)
	GetPendingTimers() []int64

	SetOnConnectCallback(callback func(Connection) bool)
	GetOnConnectCallback() onConnectFunc
	SetOnMessageCallback(callback func(Message, Connection))
	GetOnMessageCallback() onMessageFunc
	SetOnErrorCallback(callback func())
	GetOnErrorCallback() onErrorFunc
	SetOnCloseCallback(callback func(Connection))
	GetOnCloseCallback() onCloseFunc

	Start()
	Close()
	IsRunning() bool
	Write(message Message) error

	RunAt(t time.Time, cb func(time.Time, interface{})) int64
	RunAfter(d time.Duration, cb func(time.Time, interface{})) int64
	RunEvery(i time.Duration, cb func(time.Time, interface{})) int64
	GetTimingWheel() *TimingWheel

	CancelTimer(timerId int64)

	GetRawConn() net.Conn
	GetMessageSendChannel() chan []byte
	GetMessageHandlerChannel() chan MessageHandler
	GetCloseChannel() chan struct{}
	GetTimeOutChannel() chan *OnTimeOut

	GetRemoteAddress() net.Addr
}

func NewClientConnection

func NewClientConnection(netid int64, reconnectable bool, c net.Conn, tconf *tls.Config) Connection

func NewServerConnection

func NewServerConnection(netid int64, server *TCPServer, conn net.Conn) Connection

type ConnectionMap

type ConnectionMap struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewConnectionMap

func NewConnectionMap() *ConnectionMap

func (*ConnectionMap) Clear

func (cm *ConnectionMap) Clear()

func (*ConnectionMap) Get

func (cm *ConnectionMap) Get(k int64) (Connection, bool)

func (*ConnectionMap) IsEmpty

func (cm *ConnectionMap) IsEmpty() bool

func (*ConnectionMap) Put

func (cm *ConnectionMap) Put(k int64, v Connection)

func (*ConnectionMap) Remove

func (cm *ConnectionMap) Remove(k int64)

func (*ConnectionMap) Size

func (cm *ConnectionMap) Size() int

type Context

type Context struct {
	// contains filtered or unexported fields
}

Context is the context info for every handler function. Handler function handles the business logic about message. We can find the client connection who sent this message by netid and send back responses.

func NewContext

func NewContext(msg Message, id int64) Context

func (Context) Id

func (ctx Context) Id() int64

func (Context) Message

func (ctx Context) Message() Message

type ErrorUndefined

type ErrorUndefined struct {
	// contains filtered or unexported fields
}

func (ErrorUndefined) Error

func (eu ErrorUndefined) Error() string

type Hashable

type Hashable interface {
	HashCode() int32
}

type HeartBeatMessage

type HeartBeatMessage struct {
	Timestamp int64
}

HeartBeatMessage for long-term connection keeping alive.

func (HeartBeatMessage) MessageNumber

func (hbm HeartBeatMessage) MessageNumber() int32

func (HeartBeatMessage) Serialize

func (hbm HeartBeatMessage) Serialize() ([]byte, error)

type Item

type Item struct {
	Key, Value interface{}
}

type Message

type Message interface {
	MessageNumber() int32
	Serialize() ([]byte, error)
}

Message represents the structured data that can be handled.

func DeserializeHeartBeatMessage

func DeserializeHeartBeatMessage(data []byte) (message Message, err error)

type MessageHandler

type MessageHandler struct {
	// contains filtered or unexported fields
}

type OnTimeOut

type OnTimeOut struct {
	Callback  func(time.Time, interface{})
	ExtraData interface{}
}

func NewOnTimeOut

func NewOnTimeOut(extra interface{}, cb func(time.Time, interface{})) *OnTimeOut

type Server

type Server interface {
	IsRunning() bool
	GetConnections() []Connection
	GetConnectionMap() *ConnectionMap
	GetTimingWheel() *TimingWheel
	GetServerAddress() string
	Start()
	Close()

	SetOnScheduleCallback(time.Duration, func(time.Time, interface{}))
	GetOnScheduleCallback() (time.Duration, onScheduleFunc)
	SetOnConnectCallback(func(Connection) bool)
	GetOnConnectCallback() onConnectFunc
	SetOnMessageCallback(func(Message, Connection))
	GetOnMessageCallback() onMessageFunc
	SetOnCloseCallback(func(Connection))
	GetOnCloseCallback() onCloseFunc
	SetOnErrorCallback(func())
	GetOnErrorCallback() onErrorFunc
}

func NewTCPServer

func NewTCPServer(addr string) Server

func NewTLSTCPServer

func NewTLSTCPServer(addr, cert, key string) Server

type ServerConnection

type ServerConnection struct {
	// contains filtered or unexported fields
}

func (*ServerConnection) CancelTimer

func (conn *ServerConnection) CancelTimer(timerId int64)

func (*ServerConnection) Close

func (conn *ServerConnection) Close()

func (*ServerConnection) GetCloseChannel

func (conn *ServerConnection) GetCloseChannel() chan struct{}

func (*ServerConnection) GetExtraData

func (conn *ServerConnection) GetExtraData() interface{}

func (*ServerConnection) GetHeartBeat

func (conn *ServerConnection) GetHeartBeat() int64

func (*ServerConnection) GetMessageCodec

func (conn *ServerConnection) GetMessageCodec() Codec

func (*ServerConnection) GetMessageHandlerChannel

func (conn *ServerConnection) GetMessageHandlerChannel() chan MessageHandler

func (*ServerConnection) GetMessageSendChannel

func (conn *ServerConnection) GetMessageSendChannel() chan []byte

func (*ServerConnection) GetName

func (conn *ServerConnection) GetName() string

func (*ServerConnection) GetNetId

func (conn *ServerConnection) GetNetId() int64

func (*ServerConnection) GetOnCloseCallback

func (conn *ServerConnection) GetOnCloseCallback() onCloseFunc

func (*ServerConnection) GetOnConnectCallback

func (conn *ServerConnection) GetOnConnectCallback() onConnectFunc

func (*ServerConnection) GetOnErrorCallback

func (conn *ServerConnection) GetOnErrorCallback() onErrorFunc

func (*ServerConnection) GetOnMessageCallback

func (conn *ServerConnection) GetOnMessageCallback() onMessageFunc

func (*ServerConnection) GetOwner

func (conn *ServerConnection) GetOwner() *TCPServer

func (*ServerConnection) GetPendingTimers

func (conn *ServerConnection) GetPendingTimers() []int64

func (*ServerConnection) GetRawConn

func (conn *ServerConnection) GetRawConn() net.Conn

func (*ServerConnection) GetRemoteAddress

func (conn *ServerConnection) GetRemoteAddress() net.Addr

func (*ServerConnection) GetTimeOutChannel

func (conn *ServerConnection) GetTimeOutChannel() chan *OnTimeOut

func (*ServerConnection) GetTimingWheel

func (conn *ServerConnection) GetTimingWheel() *TimingWheel

func (*ServerConnection) IsRunning

func (conn *ServerConnection) IsRunning() bool

func (*ServerConnection) RunAfter

func (conn *ServerConnection) RunAfter(duration time.Duration, callback func(time.Time, interface{})) int64

func (*ServerConnection) RunAt

func (conn *ServerConnection) RunAt(timestamp time.Time, callback func(time.Time, interface{})) int64

func (*ServerConnection) RunEvery

func (conn *ServerConnection) RunEvery(interval time.Duration, callback func(time.Time, interface{})) int64

func (*ServerConnection) SetExtraData

func (conn *ServerConnection) SetExtraData(extra interface{})

func (*ServerConnection) SetHeartBeat

func (conn *ServerConnection) SetHeartBeat(beat int64)

func (*ServerConnection) SetMessageCodec

func (conn *ServerConnection) SetMessageCodec(codec Codec)

func (*ServerConnection) SetName

func (conn *ServerConnection) SetName(name string)

func (*ServerConnection) SetNetId

func (conn *ServerConnection) SetNetId(netid int64)

func (*ServerConnection) SetOnCloseCallback

func (conn *ServerConnection) SetOnCloseCallback(callback func(Connection))

func (*ServerConnection) SetOnConnectCallback

func (conn *ServerConnection) SetOnConnectCallback(callback func(Connection) bool)

func (*ServerConnection) SetOnErrorCallback

func (conn *ServerConnection) SetOnErrorCallback(callback func())

func (*ServerConnection) SetOnMessageCallback

func (conn *ServerConnection) SetOnMessageCallback(callback func(Message, Connection))

func (*ServerConnection) SetPendingTimers

func (conn *ServerConnection) SetPendingTimers(pending []int64)

func (*ServerConnection) Start

func (conn *ServerConnection) Start()

func (*ServerConnection) Write

func (conn *ServerConnection) Write(message Message) error

type TCPServer

type TCPServer struct {
	// contains filtered or unexported fields
}

func (*TCPServer) Close

func (server *TCPServer) Close()

wait until all connections closed

func (*TCPServer) GetConnectionMap

func (server *TCPServer) GetConnectionMap() *ConnectionMap

func (*TCPServer) GetConnections

func (server *TCPServer) GetConnections() []Connection

func (*TCPServer) GetOnCloseCallback

func (server *TCPServer) GetOnCloseCallback() onCloseFunc

func (*TCPServer) GetOnConnectCallback

func (server *TCPServer) GetOnConnectCallback() onConnectFunc

func (*TCPServer) GetOnErrorCallback

func (server *TCPServer) GetOnErrorCallback() onErrorFunc

func (*TCPServer) GetOnMessageCallback

func (server *TCPServer) GetOnMessageCallback() onMessageFunc

func (*TCPServer) GetOnScheduleCallback

func (server *TCPServer) GetOnScheduleCallback() (time.Duration, onScheduleFunc)

func (*TCPServer) GetServerAddress

func (server *TCPServer) GetServerAddress() string

func (*TCPServer) GetTimeOutChannel

func (server *TCPServer) GetTimeOutChannel() chan *OnTimeOut

func (*TCPServer) GetTimingWheel

func (server *TCPServer) GetTimingWheel() *TimingWheel

func (*TCPServer) IsRunning

func (server *TCPServer) IsRunning() bool

func (*TCPServer) SetOnCloseCallback

func (server *TCPServer) SetOnCloseCallback(callback func(Connection))

func (*TCPServer) SetOnConnectCallback

func (server *TCPServer) SetOnConnectCallback(callback func(Connection) bool)

func (*TCPServer) SetOnErrorCallback

func (server *TCPServer) SetOnErrorCallback(callback func())

func (*TCPServer) SetOnMessageCallback

func (server *TCPServer) SetOnMessageCallback(callback func(Message, Connection))

func (*TCPServer) SetOnScheduleCallback

func (server *TCPServer) SetOnScheduleCallback(duration time.Duration, callback func(time.Time, interface{}))

func (*TCPServer) Start

func (server *TCPServer) Start()

type TLSTCPServer

type TLSTCPServer struct {
	*TCPServer
	// contains filtered or unexported fields
}

func (*TLSTCPServer) Close

func (server *TLSTCPServer) Close()

func (*TLSTCPServer) GetConnectionMap

func (server *TLSTCPServer) GetConnectionMap() *ConnectionMap

func (*TLSTCPServer) GetConnections

func (server *TLSTCPServer) GetConnections() []Connection

func (*TLSTCPServer) GetOnCloseCallback

func (server *TLSTCPServer) GetOnCloseCallback() onCloseFunc

func (*TLSTCPServer) GetOnConnectCallback

func (server *TLSTCPServer) GetOnConnectCallback() onConnectFunc

func (*TLSTCPServer) GetOnErrorCallback

func (server *TLSTCPServer) GetOnErrorCallback() onErrorFunc

func (*TLSTCPServer) GetOnMessageCallback

func (server *TLSTCPServer) GetOnMessageCallback() onMessageFunc

func (*TLSTCPServer) GetOnScheduleCallback

func (server *TLSTCPServer) GetOnScheduleCallback() (time.Duration, onScheduleFunc)

func (*TLSTCPServer) GetServerAddress

func (server *TLSTCPServer) GetServerAddress() string

func (*TLSTCPServer) GetTimingWheel

func (server *TLSTCPServer) GetTimingWheel() *TimingWheel

func (*TLSTCPServer) IsRunning

func (server *TLSTCPServer) IsRunning() bool

func (*TLSTCPServer) SetOnCloseCallback

func (server *TLSTCPServer) SetOnCloseCallback(callback func(Connection))

func (*TLSTCPServer) SetOnConnectCallback

func (server *TLSTCPServer) SetOnConnectCallback(callback func(Connection) bool)

func (*TLSTCPServer) SetOnErrorCallback

func (server *TLSTCPServer) SetOnErrorCallback(callback func())

func (*TLSTCPServer) SetOnMessageCallback

func (server *TLSTCPServer) SetOnMessageCallback(callback func(Message, Connection))

func (*TLSTCPServer) SetOnScheduleCallback

func (server *TLSTCPServer) SetOnScheduleCallback(duration time.Duration, callback func(time.Time, interface{}))

func (*TLSTCPServer) Start

func (server *TLSTCPServer) Start()

type TimingWheel

type TimingWheel struct {
	// contains filtered or unexported fields
}

func NewTimingWheel

func NewTimingWheel() *TimingWheel

func (*TimingWheel) AddTimer

func (tw *TimingWheel) AddTimer(when time.Time, interv time.Duration, to *OnTimeOut) int64

func (*TimingWheel) CancelTimer

func (tw *TimingWheel) CancelTimer(timerId int64)

func (*TimingWheel) GetTimeOutChannel

func (tw *TimingWheel) GetTimeOutChannel() chan *OnTimeOut

func (*TimingWheel) Size

func (tw *TimingWheel) Size() int

func (*TimingWheel) Stop

func (tw *TimingWheel) Stop()

type TypeLengthValueCodec

type TypeLengthValueCodec struct{}

Format: type-length-value |4 bytes|4 bytes|n bytes <= 8M|

func (TypeLengthValueCodec) Decode

func (codec TypeLengthValueCodec) Decode(c Connection) (Message, error)

Decode decodes the bytes data into Message

func (TypeLengthValueCodec) Encode

func (codec TypeLengthValueCodec) Encode(msg Message) ([]byte, error)

Encode encodes the message into bytes data.

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

<<singleton>>

func WorkerPoolInstance

func WorkerPoolInstance() *WorkerPool

func (*WorkerPool) Close

func (wp *WorkerPool) Close()

func (*WorkerPool) Put

func (wp *WorkerPool) Put(k interface{}, cb func()) error

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL