tp

package module
v3.3.3+incompatible Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 11, 2018 License: Apache-2.0 Imports: 33 Imported by: 0

README

Teleport GitHub release report card github issues github closed issues GoDoc view examples view Go网络编程群

Teleport is a versatile, high-performance and flexible socket framework.

It can be used for peer-peer, rpc, gateway, micro services, push services, game services and so on.

简体中文

Teleport-Framework

Benchmark

Test Case

  • A server and a client process, running on the same machine
  • CPU: Intel Xeon E312xx (Sandy Bridge) 16 cores 2.53GHz
  • Memory: 16G
  • OS: Linux 2.6.32-696.16.1.el6.centos.plus.x86_64, CentOS 6.4
  • Go: 1.9.2
  • Message size: 581 bytes
  • Message codec: protobuf
  • Sent total 1000000 messages

Test Results

  • teleport
client concurrency mean(ms) median(ms) max(ms) min(ms) throughput(TPS)
100 1 0 16 0 75505
500 9 11 97 0 52192
1000 19 24 187 0 50040
2000 39 54 409 0 42551
5000 96 128 1148 0 46367
  • teleport/socket
client concurrency mean(ms) median(ms) max(ms) min(ms) throughput(TPS)
100 0 0 14 0 225682
500 2 1 24 0 212630
1000 4 3 51 0 180733
2000 8 6 64 0 183351
5000 21 18 651 0 133886

test code

  • Profile torch of teleport/socket

tp_socket_profile_torch

svg file

  • Heap torch of teleport/socket

tp_socket_heap_torch

svg file

1. Version

version status branch
v3 release v3
v2 release v2
v1 release v1

2. Install

go get -u github.com/henrylee2cn/teleport

3. Feature

  • Server and client are peer-to-peer, have the same API method
  • Support custom communication protocol
  • Support set the size of socket I/O buffer
  • Packet contains both Header and Body two parts
  • Support for customizing head and body coding types separately, e.g JSON Protobuf string
  • Packet Header contains metadata in the same format as http header
  • Support push, pull, reply and other means of communication
  • Support plug-in mechanism, can customize authentication, heartbeat, micro service registration center, statistics, etc.
  • Whether server or client, the peer support reboot and shutdown gracefully
  • Support reverse proxy
  • Detailed log information, support print input and output details
  • Supports setting slow operation alarm threshold
  • Use I/O multiplexing technology
  • Support setting the size of the reading packet (if exceed disconnect it)
  • Provide the context of the handler
  • Client session support automatically redials after disconnection
  • Support network list: tcp, tcp4, tcp6, unix, unixpacket and so on

4. Design

4.1 Keywords
  • Peer: A communication instance may be a server or a client
  • Socket: Base on the net.Conn package, add custom package protocol, transfer pipelines and other functions
  • Packet: The corresponding structure of the data package content element
  • Proto: The protocol interface of packet pack/unpack
  • Codec: Serialization interface for Packet.Body
  • XferPipe: Packet bytes encoding pipeline, such as compression, encryption, calibration and so on
  • XferFilter: A interface to handle packet data before transfer
  • Plugin: Plugins that cover all aspects of communication
  • Session: A connection session, with push, pull, reply, close and other methods of operation
  • Context: Handle the received or send packets
  • Pull-Launch: Pull data from the peer
  • Pull-Handle: Handle and reply to the pull of peer
  • Push-Launch: Push data to the peer
  • Push-Handle: Handle the push of peer
  • Router: Router that route the response handler by request information(such as a URI)
4.2 Packet

The contents of every one packet:

// in socket package
type (
    // Packet a socket data packet.
    Packet struct {
        // packet sequence
        seq uint64
        // packet type, such as PULL, PUSH, REPLY
        ptype byte
        // URL string
        uri string
        // URL object
        url *url.URL
        // metadata
        meta *utils.Args
        // body codec type
        bodyCodec byte
        // body object
        body interface{}
        // newBodyFunc creates a new body by packet type and URI.
        // Note:
        //  only for writing packet;
        //  should be nil when reading packet.
        newBodyFunc NewBodyFunc
        // XferPipe transfer filter pipe, handlers from outer-most to inner-most.
        // Note: the length can not be bigger than 255!
        xferPipe *xfer.XferPipe
        // packet size
        size uint32
        // ctx is the packet handling context,
        // carries a deadline, a cancelation signal,
        // and other values across API boundaries.
        ctx context.Context
        // stack
        next *Packet
    }

    // NewBodyFunc creates a new body by header info.
    NewBodyFunc func(seq uint64, ptype byte, uri string) interface{}
)
4.3 Codec

The body's codec set.

type Codec interface {
    // Id returns codec id.
    Id() byte
    // Name returns codec name.
    Name() string
    // Marshal returns the encoding of v.
    Marshal(v interface{}) ([]byte, error)
    // Unmarshal parses the encoded data and stores the result
    // in the value pointed to by v.
    Unmarshal(data []byte, v interface{}) error
}
4.4 XferPipe

Transfer filter pipe, handles byte stream of packet when transfer.

type (
    // XferPipe transfer filter pipe, handlers from outer-most to inner-most.
    // Note: the length can not be bigger than 255!
    XferPipe struct {
        filters []XferFilter
    }
    // XferFilter handles byte stream of packet when transfer.
    XferFilter interface {
        Id() byte
        OnPack([]byte) ([]byte, error)
        OnUnpack([]byte) ([]byte, error)
    }
)
4.5 Plugin

Plug-ins during runtime.

type (
    // Plugin plugin background
    Plugin interface {
        Name() string
    }
    // PreNewPeerPlugin is executed before creating peer.
    PreNewPeerPlugin interface {
        Plugin
        PreNewPeer(*PeerConfig, *PluginContainer) error
    }
    ...
)
4.6 Protocol

You can customize your own communication protocol by implementing the interface:

type (
    // Proto pack/unpack protocol scheme of socket packet.
    Proto interface {
        // Version returns the protocol's id and name.
        Version() (byte, string)
        // Pack writes the Packet into the connection.
        // Note: Make sure to write only once or there will be package contamination!
        Pack(*Packet) error
        // Unpack reads bytes from the connection to the Packet.
        // Note: Concurrent unsafe!
        Unpack(*Packet) error
    }
    ProtoFunc func(io.ReadWriter) Proto
)

Next, you can specify the communication protocol in the following ways:

func SetDefaultProtoFunc(socket.ProtoFunc)
type Peer interface {
    ...
    ServeConn(conn net.Conn, protoFunc ...socket.ProtoFunc) Session
    DialContext(ctx context.Context, addr string, protoFunc ...socket.ProtoFunc) (Session, *Rerror)
    Dial(addr string, protoFunc ...socket.ProtoFunc) (Session, *Rerror)
    Listen(protoFunc ...socket.ProtoFunc) error
    ...
}

5. Usage

5.1 Peer(server or client) Demo
// Start a server
var peer1 = tp.NewPeer(tp.PeerConfig{
    ListenAddress: "0.0.0.0:9090", // for server role
})
peer1.Listen()

...

// Start a client
var peer2 = tp.NewPeer(tp.PeerConfig{})
var sess, err = peer2.Dial("127.0.0.1:8080")
5.2 PullController Model Demo
type Aaa struct {
    tp.PullCtx
}
// XxZz register the route: /aaa/xx_zz
func (x *Aaa) XxZz(args *<T>) (<T>, *tp.Rerror) {
    ...
    return r, nil
}
// YyZz register the route: /aaa/yy_zz
func (x *Aaa) YyZz(args *<T>) (<T>, *tp.Rerror) {
    ...
    return r, nil
}
5.3 PushController Model Demo
type Bbb struct {
    tp.PushCtx
}
// XxZz register the route: /bbb/yy_zz
func (b *Bbb) XxZz(args *<T>) *tp.Rerror {
    ...
    return r, nil
}
// YyZz register the route: /bbb/yy_zz
func (b *Bbb) YyZz(args *<T>) *tp.Rerror {
    ...
    return r, nil
}
5.4 UnknownPullHandler Type Demo
func XxxUnknownPull (ctx tp.UnknownPullCtx) (interface{}, *tp.Rerror) {
    ...
    return r, nil
}
5.5 UnknownPushHandler Type Demo
func XxxUnknownPush(ctx tp.UnknownPushCtx) *tp.Rerror {
    ...
    return nil
}
5.6 Plugin Demo
// NewIgnoreCase Returns a ignoreCase plugin.
func NewIgnoreCase() *ignoreCase {
    return &ignoreCase{}
}

type ignoreCase struct{}

var (
    _ tp.PostReadPullHeaderPlugin = new(ignoreCase)
    _ tp.PostReadPushHeaderPlugin = new(ignoreCase)
)

func (i *ignoreCase) Name() string {
    return "ignoreCase"
}

func (i *ignoreCase) PostReadPullHeader(ctx tp.ReadCtx) *tp.Rerror {
    // Dynamic transformation path is lowercase
    ctx.Url().Path = strings.ToLower(ctx.Url().Path)
    return nil
}

func (i *ignoreCase) PostReadPushHeader(ctx tp.ReadCtx) *tp.Rerror {
    // Dynamic transformation path is lowercase
    ctx.Url().Path = strings.ToLower(ctx.Url().Path)
    return nil
}
5.7 Register above handler and plugin
// add router group
group := peer.SubRoute("test")
// register to test group
group.RoutePull(new(Aaa), NewIgnoreCase())
group.RoutePush(new(Bbb))
peer.SetUnknownPull(XxxUnknownPull)
peer.SetUnknownPush(XxxUnknownPush)
5.8 Config
type PeerConfig struct {
    Network            string        `yaml:"network"              ini:"network"              comment:"Network; tcp, tcp4, tcp6, unix or unixpacket"`
    ListenAddress      string        `yaml:"listen_address"       ini:"listen_address"       comment:"Listen address; for server role"`
    DefaultDialTimeout time.Duration `yaml:"default_dial_timeout" ini:"default_dial_timeout" comment:"Default maximum duration for dialing; for client role; ns,µs,ms,s,m,h"`
    RedialTimes        int32         `yaml:"redial_times"         ini:"redial_times"         comment:"The maximum times of attempts to redial, after the connection has been unexpectedly broken; for client role"`
    DefaultBodyCodec   string        `yaml:"default_body_codec"   ini:"default_body_codec"   comment:"Default body codec type id"`
    DefaultSessionAge  time.Duration `yaml:"default_session_age"  ini:"default_session_age"  comment:"Default session max age, if less than or equal to 0, no time limit; ns,µs,ms,s,m,h"`
    DefaultContextAge  time.Duration `yaml:"default_context_age"  ini:"default_context_age"  comment:"Default PULL or PUSH context max age, if less than or equal to 0, no time limit; ns,µs,ms,s,m,h"`
    SlowCometDuration  time.Duration `yaml:"slow_comet_duration"  ini:"slow_comet_duration"  comment:"Slow operation alarm threshold; ns,µs,ms,s ..."`
    PrintBody          bool          `yaml:"print_body"           ini:"print_body"           comment:"Is print body or not"`
    CountTime          bool          `yaml:"count_time"           ini:"count_time"           comment:"Is count cost time or not"`
}
5.9 Optimize
  • SetPacketSizeLimit sets max packet size. If maxSize<=0, set it to max uint32.

    func SetPacketSizeLimit(maxPacketSize uint32)
    
  • SetSocketKeepAlive sets whether the operating system should send keepalive messages on the connection.

    func SetSocketKeepAlive(keepalive bool)
    
  • SetSocketKeepAlivePeriod sets period between keep alives.

    func SetSocketKeepAlivePeriod(d time.Duration)
    
  • SetSocketNoDelay controls whether the operating system should delay packet transmission in hopes of sending fewer packets (Nagle's algorithm). The default is true (no delay), meaning that data is sent as soon as possible after a Write.

    func SetSocketNoDelay(_noDelay bool)
    
  • SetSocketReadBuffer sets the size of the operating system's receive buffer associated with the connection.

    func SetSocketReadBuffer(bytes int)
    
  • SetSocketWriteBuffer sets the size of the operating system's transmit buffer associated with the connection.

    func SetSocketWriteBuffer(bytes int)
    

6. Example

server.go
package main

import (
    "fmt"
    "time"

    tp "github.com/henrylee2cn/teleport"
)

func main() {
    svr := tp.NewPeer(tp.PeerConfig{
        CountTime:     true,
        ListenAddress: ":9090",
    })
    svr.RoutePull(new(math))
    svr.Listen()
}

type math struct {
    tp.PullCtx
}

func (m *math) Add(args *[]int) (int, *tp.Rerror) {
    if m.Query().Get("push_status") == "yes" {
        m.Session().Push(
            "/push/status",
            fmt.Sprintf("%d numbers are being added...", len(*args)),
        )
        time.Sleep(time.Millisecond * 10)
    }
    var r int
    for _, a := range *args {
        r += a
    }
    return r, nil
}
client.go
package main

import (
    tp "github.com/henrylee2cn/teleport"
)

func main() {
    tp.SetLoggerLevel("ERROR")
    cli := tp.NewPeer(tp.PeerConfig{})
    defer cli.Close()
    cli.RoutePush(new(push))
    sess, err := cli.Dial(":9090")
    if err != nil {
        tp.Fatalf("%v", err)
    }

    var reply int
    rerr := sess.Pull("/math/add?push_status=yes",
        []int{1, 2, 3, 4, 5},
        &reply,
    ).Rerror()

    if rerr != nil {
        tp.Fatalf("%v", rerr)
    }
    tp.Printf("reply: %d", reply)
}

type push struct {
    tp.PushCtx
}

func (p *push) Status(args *string) *tp.Rerror {
    tp.Printf("server status: %s", *args)
    return nil
}

More

7. Extensions

Codec
package import description
json import "github.com/henrylee2cn/teleport/codec" JSON codec(teleport own)
protobuf import "github.com/henrylee2cn/teleport/codec" Protobuf codec(teleport own)
string import "github.com/henrylee2cn/teleport/codec" String codec(teleport own)
Plugin
package import description
auth import "github.com/henrylee2cn/teleport/plugin" A auth plugin for verifying peer at the first time
binder import binder "github.com/henrylee2cn/tp-ext/plugin-binder" Parameter Binding Verification for Struct Handler
heartbeat import heartbeat "github.com/henrylee2cn/tp-ext/plugin-heartbeat" A generic timing heartbeat plugin
proxy import "github.com/henrylee2cn/teleport/plugin" A proxy plugin for handling unknown pulling or pushing
Protocol
package import description
fastproto import "github.com/henrylee2cn/teleport/socket A fast socket communication protocol(teleport default protocol)
jsonproto import jsonproto "github.com/henrylee2cn/tp-ext/proto-jsonproto" A JSON socket communication protocol
Transfer-Filter
package import description
gzip import "github.com/henrylee2cn/teleport/xfer" Gzip(teleport own)
md5Hash import md5Hash "github.com/henrylee2cn/tp-ext/xfer-md5Hash" Provides a integrity check transfer filter
Module
package import description
cliSession import cliSession "github.com/henrylee2cn/tp-ext/mod-cliSession" Client session which has connection pool
websocket import websocket "github.com/henrylee2cn/tp-ext/mod-websocket" Makes the Teleport framework compatible with websocket protocol as specified in RFC 6455

Extensions Repository

8. Projects based on Teleport

project description
ant Ant is a simple and flexible microservice framework based on Teleport
ants Ants is a highly available microservice platform based on Ant and Teleport
pholcus Pholcus is a distributed, high concurrency and powerful web crawler software

9. Business Users

深圳市梦之舵信息技术有限公司    北京风行在线技术有限公司    北京可即时代网络公司

10. License

Teleport is under Apache v2 License. See the LICENSE file for the full license text

Documentation

Overview

Package tp (teleport) is a versatile, high-performance and flexible TCP socket framework. It can be used for peer-peer, rpc, gateway, micro services, push services, game services and so on.

Copyright 2015-2017 HenryLee. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Index

Constants

View Source
const (
	TypeUndefined byte = 0
	TypePull      byte = 1
	TypeReply     byte = 2 // reply to pull
	TypePush      byte = 3
)

Packet types

View Source
const (
	CodeUnknownError    = -1
	CodeDialFailed      = 105
	CodeConnClosed      = 102
	CodeWriteFailed     = 104
	CodeBadPacket       = 400
	CodeNotFound        = 404
	CodePtypeNotAllowed = 405
	CodeHandleTimeout   = 408
)

Internal Framework Rerror code. Note: Recommended custom code is greater than 1000.

View Source
const (
	// MetaRerrorKey reply error metadata key
	MetaRerrorKey = "X-Reply-Error"
	// MetaRealId real ID metadata key
	MetaRealId = "X-Real-ID"
	// MetaRealIp real IP metadata key
	MetaRealIp = "X-Real-IP"
)

Variables

View Source
var (
	// FirstSweep is first executed.
	// Usage: share github.com/henrylee2cn/goutil/graceful with other project.
	FirstSweep func() error
	// BeforeExiting is executed before process exiting.
	// Usage: share github.com/henrylee2cn/goutil/graceful with other project.
	BeforeExiting func() error
)
View Source
var DefaultProtoFunc = socket.DefaultProtoFunc

DefaultProtoFunc gets the default builder of socket communication protocol

func DefaultProtoFunc() socket.ProtoFunc
View Source
var ErrListenClosed = errors.New("listener is closed")

ErrListenClosed listener is closed error.

View Source
var GetPacket = socket.GetPacket

GetPacket gets a *Packet form packet stack. Note:

newBodyFunc is only for reading form connection;
settings are only for writing to connection.
func GetPacket(settings ...socket.PacketSetting) *socket.Packet
View Source
var GetReadLimit = socket.PacketSizeLimit

GetReadLimit gets the packet size upper limit of reading.

GetReadLimit() uint32
View Source
var PutPacket = socket.PutPacket

PutPacket puts a *socket.Packet to packet stack.

func PutPacket(p *socket.Packet)
View Source
var SetDefaultProtoFunc = socket.SetDefaultProtoFunc

SetDefaultProtoFunc sets the default builder of socket communication protocol

func SetDefaultProtoFunc(protoFunc socket.ProtoFunc)

SetReadLimit sets max packet size. If maxSize<=0, set it to max uint32.

func SetReadLimit(maxPacketSize uint32)
View Source
var SetSocketKeepAlive = socket.SetKeepAlive

SetSocketKeepAlive sets whether the operating system should send keepalive messages on the connection. Note: If have not called the function, the system defaults are used.

func SetSocketKeepAlive(keepalive bool)
View Source
var SetSocketKeepAlivePeriod = socket.SetKeepAlivePeriod

SetSocketKeepAlivePeriod sets period between keep alives. Note: if d<0, don't change the value.

func SetSocketKeepAlivePeriod(d time.Duration)
View Source
var SetSocketNoDelay = socket.SetNoDelay

SetSocketNoDelay controls whether the operating system should delay packet transmission in hopes of sending fewer packets (Nagle's algorithm). The default is true (no delay), meaning that data is sent as soon as possible after a Write.

func SetSocketNoDelay(noDelay bool)
View Source
var SetSocketReadBuffer = socket.SetReadBuffer

SetSocketReadBuffer sets the size of the operating system's receive buffer associated with the connection. Note: if bytes<0, don't change the value.

func SetSocketReadBuffer(bytes int)
View Source
var SetSocketWriteBuffer = socket.SetWriteBuffer

SetSocketWriteBuffer sets the size of the operating system's transmit buffer associated with the connection. Note: if bytes<0, don't change the value.

func SetSocketWriteBuffer(bytes int)
View Source
var SocketReadBuffer = socket.ReadBuffer

SocketReadBuffer returns the size of the operating system's receive buffer associated with the connection. Note: if using the system default value, bytes=-1 and isDefault=true.

func SocketReadBuffer() (bytes int, isDefault bool)
View Source
var SocketWriteBuffer = socket.WriteBuffer

SocketWriteBuffer returns the size of the operating system's transmit buffer associated with the connection. Note: if using the system default value, bytes=-1 and isDefault=true.

func SocketWriteBuffer() (bytes int, isDefault bool)
View Source
var WithAddMeta = socket.WithAddMeta

WithAddMeta adds 'key=value' metadata argument. Multiple values for the same key may be added.

func WithAddMeta(key, value string) socket.PacketSetting
View Source
var WithBody = socket.WithBody

WithBody sets the body object.

func WithBody(body interface{}) socket.PacketSetting
View Source
var WithBodyCodec = socket.WithBodyCodec

WithBodyCodec sets the body codec.

func WithBodyCodec(bodyCodec byte) socket.PacketSetting
View Source
var WithContext = socket.WithContext

WithContext sets the packet handling context.

func WithContext(ctx context.Context) socket.PacketSetting
View Source
var WithNewBody = socket.WithNewBody

WithNewBody resets the function of geting body.

func WithNewBody(newBodyFunc socket.NewBodyFunc) socket.PacketSetting
View Source
var WithPtype = socket.WithPtype

WithPtype sets the packet type.

func WithPtype(ptype byte) socket.PacketSetting
View Source
var WithQuery = socket.WithQuery

WithQuery sets the packet URL query parameter.

func WithQuery(key, value string) socket.PacketSetting
View Source
var WithSeq = socket.WithSeq

WithSeq sets the packet sequence.

func WithSeq(seq uint64) socket.PacketSetting
View Source
var WithSetMeta = socket.WithSetMeta

WithSetMeta sets 'key=value' metadata argument.

func WithSetMeta(key, value string) socket.PacketSetting
View Source
var WithUri = socket.WithUri

WithUri sets the packet URL string.

func WithUri(uri string) socket.PacketSetting
View Source
var WithXferPipe = socket.WithXferPipe

WithXferPipe sets transfer filter pipe.

func WithXferPipe(filterId ...byte) socket.PacketSetting

Functions

func AnywayGo

func AnywayGo(fn func())

AnywayGo similar to go func, but concurrent resources are limited.

func CodeText

func CodeText(rerrCode int32) string

CodeText returns the reply error code text. If the type is undefined returns 'Unknown Error'.

func Criticalf

func Criticalf(format string, args ...interface{})

Criticalf logs a message using CRITICAL as log level.

func Debugf

func Debugf(format string, args ...interface{})

Debugf logs a message using DEBUG as log level.

func Errorf

func Errorf(format string, args ...interface{})

Errorf logs a message using ERROR as log level.

func Fatalf

func Fatalf(format string, args ...interface{})

Fatalf is equivalent to l.Criticalf followed by a call to os.Exit(1).

func Go

func Go(fn func()) bool

Go similar to go func, but return false if insufficient resources.

func GraceSignal

func GraceSignal()

GraceSignal open graceful shutdown or reboot signal.

func Infof

func Infof(format string, args ...interface{})

Infof logs a message using INFO as log level.

func IsConnRerror

func IsConnRerror(rerr *Rerror) bool

IsConnRerror determines whether the error is a connection error

func NewInheritListener

func NewInheritListener(network, laddr string, tlsConfig *tls.Config) (net.Listener, error)

NewInheritListener creates a new listener that can be inherited on reboot.

func NewTlsConfigFromFile

func NewTlsConfigFromFile(tlsCertFile, tlsKeyFile string) (*tls.Config, error)

NewTlsConfigFromFile creates a new TLS config.

func Noticef

func Noticef(format string, args ...interface{})

Noticef logs a message using NOTICE as log level.

func Panicf

func Panicf(format string, args ...interface{})

Panicf is equivalent to l.Criticalf followed by a call to panic().

func Printf

func Printf(format string, args ...interface{})

Printf formats according to a format specifier and writes to standard output. It returns the number of bytes written and any write error encountered.

func Reboot

func Reboot(timeout ...time.Duration)

Reboot all the frame process gracefully. Notes: Windows system are not supported!

func SetGopool

func SetGopool(maxGoroutinesAmount int, maxGoroutineIdleDuration time.Duration)

SetGopool set or reset go pool config. Note: Make sure to call it before calling NewPeer() and Go()

func SetLogger

func SetLogger(logger Logger)

SetLogger sets global logger. Note: Concurrent is not safe!

func SetLoggerLevel

func SetLoggerLevel(level string)

SetLoggerLevel sets the logger's level.

func SetShutdown

func SetShutdown(timeout time.Duration, firstSweep, beforeExiting func() error)

SetShutdown sets the function which is called after the process shutdown, and the time-out period for the process shutdown. If 0<=timeout<5s, automatically use 'MinShutdownTimeout'(5s). If timeout<0, indefinite period. 'firstSweep' is first executed. 'beforeExiting' is executed before process exiting.

func Shutdown

func Shutdown(timeout ...time.Duration)

Shutdown closes all the frame process gracefully. Parameter timeout is used to reset time-out period for the process shutdown.

func Tracef

func Tracef(format string, args ...interface{})

Tracef logs a message using TRACE as log level.

func TypeText

func TypeText(typ byte) string

TypeText returns the packet type text. If the type is undefined returns 'Undefined'.

func Warnf

func Warnf(format string, args ...interface{})

Warnf logs a message using WARNING as log level.

func WithRealId

func WithRealId(id string) socket.PacketSetting

WithRealId sets the real ID to metadata.

func WithRealIp

func WithRealIp(ip string) socket.PacketSetting

WithRealIp sets the real IP to metadata.

Types

type BaseCtx

type BaseCtx interface {
	PreCtx
	// Seq returns the input packet sequence.
	Seq() uint64
	// PeekMeta peeks the header metadata for the input packet.
	PeekMeta(key string) []byte
	// VisitMeta calls f for each existing metadata.
	//
	// f must not retain references to key and value after returning.
	// Make key and/or value copies if you need storing them after returning.
	VisitMeta(f func(key, value []byte))
	// CopyMeta returns the input packet metadata copy.
	CopyMeta() *utils.Args
	// Uri returns the input packet uri.
	Uri() string
	// ChangeUri changes the input packet uri.
	ChangeUri(string)
	// Url returns the input packet uri object.
	Url() *url.URL
	// Path returns the input packet uri path.
	Path() string
	// Query returns the input packet uri query object.
	Query() url.Values
}

BaseCtx common context method set.

type BasePeer

type BasePeer interface {
	// Close closes peer.
	Close() (err error)
	// CountSession returns the number of sessions.
	CountSession() int
	// Dial connects with the peer of the destination address.
	Dial(addr string, protoFunc ...socket.ProtoFunc) (Session, *Rerror)
	// DialContext connects with the peer of the destination address, using the provided context.
	DialContext(ctx context.Context, addr string, protoFunc ...socket.ProtoFunc) (Session, *Rerror)
	// GetSession gets the session by id.
	GetSession(sessionId string) (Session, bool)
	// RangeSession ranges all sessions. If fn returns false, stop traversing.
	RangeSession(fn func(sess Session) bool)
	// ServeConn serves the connection and returns a session.
	ServeConn(conn net.Conn, protoFunc ...socket.ProtoFunc) (Session, error)
	// SetTlsConfig sets the TLS config.
	SetTlsConfig(tlsConfig *tls.Config)
	// SetTlsConfigFromFile sets the TLS config from file.
	SetTlsConfigFromFile(tlsCertFile, tlsKeyFile string) error
	// TlsConfig returns the TLS config.
	TlsConfig() *tls.Config
}

BasePeer peer with the common method set

type BaseSession

type BaseSession interface {
	// Id returns the session id.
	Id() string
	// Peer returns the peer.
	Peer() Peer
	// LocalIp returns the local peer ip.
	LocalIp() string
	// RemoteIp returns the remote peer ip.
	RemoteIp() string
	// Public returns temporary public data of session(socket).
	Public() goutil.Map
	// PublicLen returns the length of public data of session(socket).
	PublicLen() int
}

BaseSession a connection session with the common method set.

type EarlyPeer

type EarlyPeer interface {
	BasePeer
	// Router returns the root router of pull or push handlers.
	Router() *Router
	// SubRoute adds handler group.
	SubRoute(pathPrefix string, plugin ...Plugin) *SubRouter
	// RoutePull registers PULL handler.
	RoutePull(ctrlStruct interface{}, plugin ...Plugin)
	// RoutePush registers PUSH handler.
	RoutePush(ctrlStruct interface{}, plugin ...Plugin)
	// SetUnknownPull sets the default handler, which is called when no handler for PULL is found.
	SetUnknownPull(fn func(UnknownPullCtx) (interface{}, *Rerror), plugin ...Plugin)
	// SetUnknownPush sets the default handler, which is called when no handler for PUSH is found.
	SetUnknownPush(fn func(UnknownPushCtx) *Rerror, plugin ...Plugin)
}

EarlyPeer the communication peer that has just been created

type EarlySession

type EarlySession interface {
	BaseSession
	// SetId sets the session id.
	SetId(newId string)
	// Conn returns the connection.
	Conn() net.Conn
	// ResetConn resets the connection.
	// Note:
	// only reset net.Conn, but not reset socket.ProtoFunc;
	// inherit the previous session id.
	ResetConn(net.Conn, ...socket.ProtoFunc)
	// GetProtoFunc returns the socket.ProtoFunc
	GetProtoFunc() socket.ProtoFunc
	// Send sends packet to peer, before the formal connection.
	// Note:
	// the external setting seq is invalid, the internal will be forced to set;
	// does not support automatic redial after disconnection.
	Send(uri string, body interface{}, rerr *Rerror, setting ...socket.PacketSetting) *Rerror
	// Receive receives a packet from peer, before the formal connection.
	// Note: does not support automatic redial after disconnection.
	Receive(socket.NewBodyFunc, ...socket.PacketSetting) (*socket.Packet, *Rerror)
	// SessionAge returns the session max age.
	SessionAge() time.Duration
	// ContextAge returns PULL or PUSH context max age.
	ContextAge() time.Duration
	// SetSessionAge sets the session max age.
	SetSessionAge(duration time.Duration)
	// SetContextAge sets PULL or PUSH context max age.
	SetContextAge(duration time.Duration)
}

EarlySession a connection session that has not started reading goroutine.

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

Handler pull or push handler type info

func (*Handler) ArgElemType

func (h *Handler) ArgElemType() reflect.Type

ArgElemType returns the handler arg elem type.

func (*Handler) IsPull

func (h *Handler) IsPull() bool

IsPull checks if it is pull handler or not.

func (*Handler) IsPush

func (h *Handler) IsPush() bool

IsPush checks if it is push handler or not.

func (*Handler) IsUnknown

func (h *Handler) IsUnknown() bool

IsUnknown checks if it is unknown handler(pull/push) or not.

func (*Handler) Name

func (h *Handler) Name() string

Name returns the handler name.

func (*Handler) NewArgValue

func (h *Handler) NewArgValue() reflect.Value

NewArgValue creates a new arg elem value.

func (*Handler) ReplyType

func (h *Handler) ReplyType() reflect.Type

ReplyType returns the handler reply type

func (*Handler) RouterTypeName

func (h *Handler) RouterTypeName() string

RouterTypeName returns the router type name.

type HandlersMaker

type HandlersMaker func(string, interface{}, *PluginContainer) ([]*Handler, error)

HandlersMaker makes []*Handler

type Logger

type Logger interface {
	// Level returns the logger's level.
	Level() string
	// SetLevel sets the logger's level.
	SetLevel(level string)
	// Printf formats according to a format specifier and writes to standard output.
	// It returns the number of bytes written and any write error encountered.
	Printf(format string, args ...interface{})
	// Fatalf is equivalent to Criticalf followed by a call to os.Exit(1).
	Fatalf(format string, args ...interface{})
	// Panicf is equivalent to Criticalf followed by a call to panic().
	Panicf(format string, args ...interface{})
	// Criticalf logs a message using CRITICAL as log level.
	Criticalf(format string, args ...interface{})
	// Errorf logs a message using ERROR as log level.
	Errorf(format string, args ...interface{})
	// Warnf logs a message using WARNING as log level.
	Warnf(format string, args ...interface{})
	// Noticef logs a message using NOTICE as log level.
	Noticef(format string, args ...interface{})
	// Infof logs a message using INFO as log level.
	Infof(format string, args ...interface{})
	// Debugf logs a message using DEBUG as log level.
	Debugf(format string, args ...interface{})
	// Tracef logs a message using TRACE as log level.
	Tracef(format string, args ...interface{})
}

Logger interface

type Peer

type Peer interface {
	EarlyPeer
	// Listen turns on the listening service.
	Listen(protoFunc ...socket.ProtoFunc) error
}

Peer the communication peer which is server or client role

func NewPeer

func NewPeer(cfg PeerConfig, plugin ...Plugin) Peer

NewPeer creates a new peer.

type PeerConfig

type PeerConfig struct {
	Network            string        `yaml:"network"              ini:"network"              comment:"Network; tcp, tcp4, tcp6, unix or unixpacket"`
	ListenAddress      string        `yaml:"listen_address"       ini:"listen_address"       comment:"Listen address; for server role"`
	DefaultDialTimeout time.Duration `` /* 135-byte string literal not displayed */
	RedialTimes        int32         `` /* 172-byte string literal not displayed */
	DefaultBodyCodec   string        `yaml:"default_body_codec"   ini:"default_body_codec"   comment:"Default body codec type id"`
	DefaultSessionAge  time.Duration `` /* 148-byte string literal not displayed */
	DefaultContextAge  time.Duration `` /* 161-byte string literal not displayed */
	SlowCometDuration  time.Duration `yaml:"slow_comet_duration"  ini:"slow_comet_duration"  comment:"Slow operation alarm threshold; ns,µs,ms,s ..."`
	PrintBody          bool          `yaml:"print_body"           ini:"print_body"           comment:"Is print body or not"`
	CountTime          bool          `yaml:"count_time"           ini:"count_time"           comment:"Is count cost time or not"`
	// contains filtered or unexported fields
}

PeerConfig peer config Note:

yaml tag is used for github.com/henrylee2cn/cfgo
ini tag is used for github.com/henrylee2cn/ini

func (*PeerConfig) Reload

func (p *PeerConfig) Reload(bind cfgo.BindFunc) error

Reload Bi-directionally synchronizes config between YAML file and memory.

type Plugin

type Plugin interface {
	Name() string
}

Plugin plugin background

type PluginContainer

type PluginContainer struct {
	// contains filtered or unexported fields
}

PluginContainer plugins container.

func (*PluginContainer) AppendLeft

func (p *PluginContainer) AppendLeft(plugins ...Plugin)

AppendLeft appends plugins on the left side of the pluginContainer.

func (*PluginContainer) AppendRight

func (p *PluginContainer) AppendRight(plugins ...Plugin)

AppendRight appends plugins on the right side of the pluginContainer.

func (*PluginContainer) GetAll

func (p *PluginContainer) GetAll() []Plugin

GetAll returns all activated plugins.

func (*PluginContainer) GetByName

func (p *PluginContainer) GetByName(pluginName string) Plugin

GetByName returns a plugin instance by it's name.

func (*PluginContainer) PostAccept

func (p *PluginContainer) PostAccept(sess EarlySession) *Rerror

PostAccept executes the defined plugins after accepting connection.

func (*PluginContainer) PostDial

func (p *PluginContainer) PostDial(sess EarlySession) *Rerror

PostDial executes the defined plugins after dialing.

func (*PluginContainer) PostDisconnect

func (p *PluginContainer) PostDisconnect(sess BaseSession) *Rerror

PostDisconnect executes the defined plugins after disconnectingy.

func (*PluginContainer) PostListen

func (p *PluginContainer) PostListen(addr net.Addr)

PostListen is executed between listening and accepting.

func (*PluginContainer) PostNewPeer

func (p *PluginContainer) PostNewPeer(peer EarlyPeer)

PostNewPeer executes the defined plugins after creating peer.

func (*PluginContainer) PostReadPullBody

func (p *PluginContainer) PostReadPullBody(ctx ReadCtx) *Rerror

PostReadPullBody executes the defined plugins after reading PULL packet body.

func (*PluginContainer) PostReadPullHeader

func (p *PluginContainer) PostReadPullHeader(ctx ReadCtx) *Rerror

PostReadPullHeader executes the defined plugins after reading PULL packet header.

func (*PluginContainer) PostReadPushBody

func (p *PluginContainer) PostReadPushBody(ctx ReadCtx) *Rerror

PostReadPushBody executes the defined plugins after reading PUSH packet body.

func (*PluginContainer) PostReadPushHeader

func (p *PluginContainer) PostReadPushHeader(ctx ReadCtx) *Rerror

PostReadPushHeader executes the defined plugins after reading PUSH packet header.

func (*PluginContainer) PostReadReplyBody

func (p *PluginContainer) PostReadReplyBody(ctx ReadCtx) *Rerror

PostReadReplyBody executes the defined plugins after reading REPLY packet body.

func (*PluginContainer) PostReadReplyHeader

func (p *PluginContainer) PostReadReplyHeader(ctx ReadCtx) *Rerror

PostReadReplyHeader executes the defined plugins after reading REPLY packet header.

func (*PluginContainer) PostReg

func (p *PluginContainer) PostReg(h *Handler)

PostReg executes the defined plugins before registering handler.

func (*PluginContainer) PostWritePull

func (p *PluginContainer) PostWritePull(ctx WriteCtx) *Rerror

PostWritePull executes the defined plugins after successful writing PULL packet.

func (*PluginContainer) PostWritePush

func (p *PluginContainer) PostWritePush(ctx WriteCtx) *Rerror

PostWritePush executes the defined plugins after successful writing PUSH packet.

func (*PluginContainer) PostWriteReply

func (p *PluginContainer) PostWriteReply(ctx WriteCtx) *Rerror

PostWriteReply executes the defined plugins after successful writing REPLY packet.

func (*PluginContainer) PreNewPeer

func (p *PluginContainer) PreNewPeer(peerConfig *PeerConfig)

PreNewPeer executes the defined plugins before creating peer.

func (*PluginContainer) PreReadHeader

func (p *PluginContainer) PreReadHeader(ctx PreCtx) *Rerror

PreReadHeader executes the defined plugins before reading packet header.

func (*PluginContainer) PreReadPullBody

func (p *PluginContainer) PreReadPullBody(ctx ReadCtx) *Rerror

PreReadPullBody executes the defined plugins before reading PULL packet body.

func (*PluginContainer) PreReadPushBody

func (p *PluginContainer) PreReadPushBody(ctx ReadCtx) *Rerror

PreReadPushBody executes the defined plugins before reading PUSH packet body.

func (*PluginContainer) PreReadReplyBody

func (p *PluginContainer) PreReadReplyBody(ctx ReadCtx) *Rerror

PreReadReplyBody executes the defined plugins before reading REPLY packet body.

func (*PluginContainer) PreWritePull

func (p *PluginContainer) PreWritePull(ctx WriteCtx) *Rerror

PreWritePull executes the defined plugins before writing PULL packet.

func (*PluginContainer) PreWritePush

func (p *PluginContainer) PreWritePush(ctx WriteCtx) *Rerror

PreWritePush executes the defined plugins before writing PUSH packet.

func (*PluginContainer) PreWriteReply

func (p *PluginContainer) PreWriteReply(ctx WriteCtx) *Rerror

PreWriteReply executes the defined plugins before writing REPLY packet.

func (*PluginContainer) Remove

func (p *PluginContainer) Remove(pluginName string) error

Remove removes a plugin by it's name.

type PostAcceptPlugin

type PostAcceptPlugin interface {
	Plugin
	PostAccept(EarlySession) *Rerror
}

PostAcceptPlugin is executed after accepting connection.

type PostDialPlugin

type PostDialPlugin interface {
	Plugin
	PostDial(EarlySession) *Rerror
}

PostDialPlugin is executed after dialing.

type PostDisconnectPlugin

type PostDisconnectPlugin interface {
	Plugin
	PostDisconnect(BaseSession) *Rerror
}

PostDisconnectPlugin is executed after disconnectingy.

type PostListenPlugin

type PostListenPlugin interface {
	Plugin
	PostListen() error
}

PostListenPlugin is executed between listening and accepting.

type PostNewPeerPlugin

type PostNewPeerPlugin interface {
	Plugin
	PostNewPeer(EarlyPeer) error
}

PostNewPeerPlugin is executed after creating peer.

type PostReadPullBodyPlugin

type PostReadPullBodyPlugin interface {
	Plugin
	PostReadPullBody(ReadCtx) *Rerror
}

PostReadPullBodyPlugin is executed after reading PULL packet body.

type PostReadPullHeaderPlugin

type PostReadPullHeaderPlugin interface {
	Plugin
	PostReadPullHeader(ReadCtx) *Rerror
}

PostReadPullHeaderPlugin is executed after reading PULL packet header.

type PostReadPushBodyPlugin

type PostReadPushBodyPlugin interface {
	Plugin
	PostReadPushBody(ReadCtx) *Rerror
}

PostReadPushBodyPlugin is executed after reading PUSH packet body.

type PostReadPushHeaderPlugin

type PostReadPushHeaderPlugin interface {
	Plugin
	PostReadPushHeader(ReadCtx) *Rerror
}

PostReadPushHeaderPlugin is executed after reading PUSH packet header.

type PostReadReplyBodyPlugin

type PostReadReplyBodyPlugin interface {
	Plugin
	PostReadReplyBody(ReadCtx) *Rerror
}

PostReadReplyBodyPlugin is executed after reading REPLY packet body.

type PostReadReplyHeaderPlugin

type PostReadReplyHeaderPlugin interface {
	Plugin
	PostReadReplyHeader(ReadCtx) *Rerror
}

PostReadReplyHeaderPlugin is executed after reading REPLY packet header.

type PostRegPlugin

type PostRegPlugin interface {
	Plugin
	PostReg(*Handler) error
}

PostRegPlugin is executed after registering handler.

type PostWritePullPlugin

type PostWritePullPlugin interface {
	Plugin
	PostWritePull(WriteCtx) *Rerror
}

PostWritePullPlugin is executed after successful writing PULL packet.

type PostWritePushPlugin

type PostWritePushPlugin interface {
	Plugin
	PostWritePush(WriteCtx) *Rerror
}

PostWritePushPlugin is executed after successful writing PUSH packet.

type PostWriteReplyPlugin

type PostWriteReplyPlugin interface {
	Plugin
	PostWriteReply(WriteCtx) *Rerror
}

PostWriteReplyPlugin is executed after successful writing REPLY packet.

type PreCtx

type PreCtx interface {
	// Peer returns the peer.
	Peer() Peer
	// Session returns the session.
	Session() Session
	// Id returns the session id.
	Id() string
	// RealId returns the real remote id.
	RealId() string
	// Ip returns the remote addr.
	Ip() string
	// RealIp returns the the current real remote addr.
	RealIp() string
	// Public returns temporary public data of context.
	Public() goutil.Map
	// PublicLen returns the length of public data of context.
	PublicLen() int
	// Rerror returns the handle error.
	Rerror() *Rerror
	// Context carries a deadline, a cancelation signal, and other values across
	// API boundaries.
	Context() context.Context
}

PreCtx context method set used before reading packet header.

type PreNewPeerPlugin

type PreNewPeerPlugin interface {
	Plugin
	PreNewPeer(*PeerConfig, *PluginContainer) error
}

PreNewPeerPlugin is executed before creating peer.

type PreReadHeaderPlugin

type PreReadHeaderPlugin interface {
	Plugin
	PreReadHeader(PreCtx) *Rerror
}

PreReadHeaderPlugin is executed before reading packet header.

type PreReadPullBodyPlugin

type PreReadPullBodyPlugin interface {
	Plugin
	PreReadPullBody(ReadCtx) *Rerror
}

PreReadPullBodyPlugin is executed before reading PULL packet body.

type PreReadPushBodyPlugin

type PreReadPushBodyPlugin interface {
	Plugin
	PreReadPushBody(ReadCtx) *Rerror
}

PreReadPushBodyPlugin is executed before reading PUSH packet body.

type PreReadReplyBodyPlugin

type PreReadReplyBodyPlugin interface {
	Plugin
	PreReadReplyBody(ReadCtx) *Rerror
}

PreReadReplyBodyPlugin is executed before reading REPLY packet body.

type PreWritePullPlugin

type PreWritePullPlugin interface {
	Plugin
	PreWritePull(WriteCtx) *Rerror
}

PreWritePullPlugin is executed before writing PULL packet.

type PreWritePushPlugin

type PreWritePushPlugin interface {
	Plugin
	PreWritePush(WriteCtx) *Rerror
}

PreWritePushPlugin is executed before writing PUSH packet.

type PreWriteReplyPlugin

type PreWriteReplyPlugin interface {
	Plugin
	PreWriteReply(WriteCtx) *Rerror
}

PreWriteReplyPlugin is executed before writing REPLY packet.

type PullCmd

type PullCmd interface {
	// Output returns writed packet.
	Output() *socket.Packet
	// Context carries a deadline, a cancelation signal, and other values across
	// API boundaries.
	Context() context.Context
	// Result returns the pull result.
	Result() (interface{}, *Rerror)
	// Rerror returns the pull error.
	Rerror() *Rerror
	// InputMeta returns the header metadata of input packet.
	InputMeta() *utils.Args
	// CostTime returns the pulled cost time.
	// If PeerConfig.CountTime=false, always returns 0.
	CostTime() time.Duration
}

PullCmd the command of the pulling operation's response.

func NewFakePullCmd

func NewFakePullCmd(uri string, args, reply interface{}, rerr *Rerror) PullCmd

NewFakePullCmd creates a fake PullCmd.

type PullCtx

type PullCtx interface {
	BaseCtx
	// Input returns readed packet.
	Input() *socket.Packet
	// GetBodyCodec gets the body codec type of the input packet.
	GetBodyCodec() byte
	// Output returns writed packet.
	Output() *socket.Packet
	// SetBodyCodec sets the body codec for reply packet.
	SetBodyCodec(byte)
	// AddMeta adds the header metadata 'key=value' for reply packet.
	// Multiple values for the same key may be added.
	AddMeta(key, value string)
	// SetMeta sets the header metadata 'key=value' for reply packet.
	SetMeta(key, value string)
	// AddXferPipe appends transfer filter pipe of reply packet.
	AddXferPipe(filterId ...byte)
}

PullCtx context method set for handling the pulled packet. For example:

type HomePull struct{ PullCtx }

type PushCtx

type PushCtx interface {
	BaseCtx
	// GetBodyCodec gets the body codec type of the input packet.
	GetBodyCodec() byte
}

PushCtx context method set for handling the pushed packet. For example:

type HomePush struct{ PushCtx }

type ReadCtx

type ReadCtx interface {
	BaseCtx
	// Input returns readed packet.
	Input() *socket.Packet
}

ReadCtx context method set for reading packet.

type Rerror

type Rerror struct {
	// Code error code
	Code int32
	// Message error message to the user (optional)
	Message string
	// Detail error's detailed reason (optional)
	Detail string
}

Rerror error only for reply packet

func NewRerror

func NewRerror(code int32, message, detail string) *Rerror

NewRerror creates a *Rerror.

func NewRerrorFromMeta

func NewRerrorFromMeta(meta *utils.Args) *Rerror

NewRerrorFromMeta creates a *Rerror from 'X-Reply-Error' metadata. Return nil if there is no 'X-Reply-Error' in metadata.

func ToRerror

func ToRerror(err error) *Rerror

ToRerror converts error to *Rerror

func (Rerror) Copy

func (r Rerror) Copy() *Rerror

Copy returns the copy of Rerror

func (*Rerror) MarshalJSON

func (r *Rerror) MarshalJSON() ([]byte, error)

MarshalJSON marshals Rerror into JSON, implements json.Marshaler interface.

func (*Rerror) SetToMeta

func (r *Rerror) SetToMeta(meta *utils.Args)

SetToMeta sets self to 'X-Reply-Error' metadata.

func (*Rerror) String

func (r *Rerror) String() string

String prints error info.

func (*Rerror) ToError

func (r *Rerror) ToError() error

ToError converts to error

func (*Rerror) UnmarshalJSON

func (r *Rerror) UnmarshalJSON(b []byte) error

UnmarshalJSON unmarshals a JSON description of self.

type Router

type Router struct {
	// contains filtered or unexported fields
}

Router the router of pull or push handlers.

func (*Router) RoutePull

func (r *Router) RoutePull(ctrlStruct interface{}, plugin ...Plugin)

RoutePull registers PULL handler.

func (*Router) RoutePush

func (r *Router) RoutePush(ctrlStruct interface{}, plugin ...Plugin)

RoutePush registers PUSH handler.

func (*Router) SetUnknownPull

func (r *Router) SetUnknownPull(fn func(UnknownPullCtx) (interface{}, *Rerror), plugin ...Plugin)

SetUnknownPull sets the default handler, which is called when no handler for PULL is found.

func (*Router) SetUnknownPush

func (r *Router) SetUnknownPush(fn func(UnknownPushCtx) *Rerror, plugin ...Plugin)

SetUnknownPush sets the default handler, which is called when no handler for PUSH is found.

func (*Router) SubRoute

func (r *Router) SubRoute(pathPrefix string, plugin ...Plugin) *SubRouter

SubRoute adds handler group.

type Session

type Session interface {
	BaseSession
	// SetId sets the session id.
	SetId(newId string)
	// Close closes the session.
	Close() error
	// Health checks if the session is usable.
	Health() bool
	// AsyncPull sends a packet and receives reply asynchronously.
	// If the args is []byte or *[]byte type, it can automatically fill in the body codec name.
	AsyncPull(uri string, args interface{}, reply interface{}, done chan PullCmd, setting ...socket.PacketSetting)
	// Pull sends a packet and receives reply.
	// Note:
	// If the args is []byte or *[]byte type, it can automatically fill in the body codec name;
	// If the session is a client role and PeerConfig.RedialTimes>0, it is automatically re-called once after a failure.
	Pull(uri string, args interface{}, reply interface{}, setting ...socket.PacketSetting) PullCmd
	// Push sends a packet, but do not receives reply.
	// Note:
	// If the args is []byte or *[]byte type, it can automatically fill in the body codec name;
	// If the session is a client role and PeerConfig.RedialTimes>0, it is automatically re-called once after a failure.
	Push(uri string, args interface{}, setting ...socket.PacketSetting) *Rerror
	// SessionAge returns the session max age.
	SessionAge() time.Duration
	// ContextAge returns PULL or PUSH context max age.
	ContextAge() time.Duration
}

Session a connection session.

type SessionHub

type SessionHub struct {
	// contains filtered or unexported fields
}

SessionHub sessions hub

func (*SessionHub) Delete

func (sh *SessionHub) Delete(id string)

Delete deletes the *session for a id.

func (*SessionHub) Get

func (sh *SessionHub) Get(id string) (*session, bool)

Get gets *session by id. If second returned arg is false, mean the *session is not found.

func (*SessionHub) Len

func (sh *SessionHub) Len() int

Len returns the length of the session hub. Note: the count implemented using sync.Map may be inaccurate.

func (*SessionHub) Random

func (sh *SessionHub) Random() (*session, bool)

Random gets a *session randomly. If third returned arg is false, mean no *session is exist.

func (*SessionHub) Range

func (sh *SessionHub) Range(fn func(*session) bool)

Range calls f sequentially for each id and *session present in the session hub. If fn returns false, stop traversing.

func (*SessionHub) Set

func (sh *SessionHub) Set(sess *session)

Set sets a *session.

type SubRouter

type SubRouter struct {
	// contains filtered or unexported fields
}

SubRouter without the SetUnknownPull and SetUnknownPush methods

func (*SubRouter) Root

func (r *SubRouter) Root() *Router

Root returns the root router.

func (*SubRouter) RoutePull

func (r *SubRouter) RoutePull(ctrlStruct interface{}, plugin ...Plugin)

RoutePull registers PULL handler.

func (*SubRouter) RoutePush

func (r *SubRouter) RoutePush(ctrlStruct interface{}, plugin ...Plugin)

RoutePush registers PUSH handler.

func (*SubRouter) SubRoute

func (r *SubRouter) SubRoute(pathPrefix string, plugin ...Plugin) *SubRouter

SubRoute adds handler group.

func (*SubRouter) ToRouter

func (r *SubRouter) ToRouter() *Router

ToRouter converts to the router which is added the SetUnknownPull and SetUnknownPush methods.

type UnknownPullCtx

type UnknownPullCtx interface {
	BaseCtx
	// GetBodyCodec gets the body codec type of the input packet.
	GetBodyCodec() byte
	// InputBodyBytes if the input body binder is []byte type, returns it, else returns nil.
	InputBodyBytes() []byte
	// Bind when the raw body binder is []byte type, now binds the input body to v.
	Bind(v interface{}) (bodyCodec byte, err error)
	// SetBodyCodec sets the body codec for reply packet.
	SetBodyCodec(byte)
	// AddMeta adds the header metadata 'key=value' for reply packet.
	// Multiple values for the same key may be added.
	AddMeta(key, value string)
	// SetMeta sets the header metadata 'key=value' for reply packet.
	SetMeta(key, value string)
	// AddXferPipe appends transfer filter pipe of reply packet.
	AddXferPipe(filterId ...byte)
}

UnknownPullCtx context method set for handling the unknown pulled packet.

type UnknownPushCtx

type UnknownPushCtx interface {
	BaseCtx
	// GetBodyCodec gets the body codec type of the input packet.
	GetBodyCodec() byte
	// InputBodyBytes if the input body binder is []byte type, returns it, else returns nil.
	InputBodyBytes() []byte
	// Bind when the raw body binder is []byte type, now binds the input body to v.
	Bind(v interface{}) (bodyCodec byte, err error)
}

UnknownPushCtx context method set for handling the unknown pushed packet.

type WriteCtx

type WriteCtx interface {
	PreCtx
	// Output returns writed packet.
	Output() *socket.Packet
}

WriteCtx context method set for writing packet.

Directories

Path Synopsis
Package codec is the body's codec set.
Package codec is the body's codec set.
samples
ab
age
Package socket provides a concise, powerful and high-performance TCP.
Package socket provides a concise, powerful and high-performance TCP.
example/pb
Package pb is a generated protocol buffer package.
Package pb is a generated protocol buffer package.
Package xfer is transfer filter set.
Package xfer is transfer filter set.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL