tp

package module
v3.7.0+incompatible Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 9, 2018 License: Apache-2.0 Imports: 34 Imported by: 0

README

Teleport GitHub release report card github issues github closed issues GoDoc view examples

Teleport is a versatile, high-performance and flexible socket framework.

It can be used for peer-peer, rpc, gateway, micro services, push services, game services and so on.

简体中文

Teleport-Framework

Benchmark

Test Case

  • A server and a client process, running on the same machine
  • CPU: Intel Xeon E312xx (Sandy Bridge) 16 cores 2.53GHz
  • Memory: 16G
  • OS: Linux 2.6.32-696.16.1.el6.centos.plus.x86_64, CentOS 6.4
  • Go: 1.9.2
  • Message size: 581 bytes
  • Message codec: protobuf
  • Sent total 1000000 messages

Test Results

  • teleport
client concurrency mean(ms) median(ms) max(ms) min(ms) throughput(TPS)
100 1 0 16 0 75505
500 9 11 97 0 52192
1000 19 24 187 0 50040
2000 39 54 409 0 42551
5000 96 128 1148 0 46367
  • teleport/socket
client concurrency mean(ms) median(ms) max(ms) min(ms) throughput(TPS)
100 0 0 14 0 225682
500 2 1 24 0 212630
1000 4 3 51 0 180733
2000 8 6 64 0 183351
5000 21 18 651 0 133886

test code

  • Profile torch of teleport/socket

tp_socket_profile_torch

svg file

  • Heap torch of teleport/socket

tp_socket_heap_torch

svg file

Version

version status branch
v3 release v3
v2 release v2
v1 release v1

Install

go get -u -f github.com/henrylee2cn/teleport

Feature

  • Server and client are peer-to-peer, have the same API method
  • Support custom communication protocol
  • Support set the size of socket I/O buffer
  • Packet contains both Header and Body two parts
  • Support for customizing head and body coding types separately, e.g JSON Protobuf string
  • Packet Header contains metadata in the same format as http header
  • Support push, pull, reply and other means of communication
  • Support plug-in mechanism, can customize authentication, heartbeat, micro service registration center, statistics, etc.
  • Whether server or client, the peer support reboot and shutdown gracefully
  • Support reverse proxy
  • Detailed log information, support print input and output details
  • Supports setting slow operation alarm threshold
  • Use I/O multiplexing technology
  • Support setting the size of the reading packet (if exceed disconnect it)
  • Provide the context of the handler
  • Client session support automatically redials after disconnection
  • Support network list: tcp, tcp4, tcp6, unix, unixpacket and so on
  • Provide an operating interface to control the connection file descriptor

Example

server.go
package main

import (
    "fmt"
    "time"

    tp "github.com/henrylee2cn/teleport"
)

func main() {
    srv := tp.NewPeer(tp.PeerConfig{
        CountTime:     true,
        ListenAddress: ":9090",
    })
    srv.RoutePull(new(math))
    srv.ListenAndServe()
}

type math struct {
    tp.PullCtx
}

func (m *math) Add(args *[]int) (int, *tp.Rerror) {
    if m.Query().Get("push_status") == "yes" {
        m.Session().Push(
            "/push/status",
            fmt.Sprintf("%d numbers are being added...", len(*args)),
        )
        time.Sleep(time.Millisecond * 10)
    }
    var r int
    for _, a := range *args {
        r += a
    }
    return r, nil
}
client.go
package main

import (
    tp "github.com/henrylee2cn/teleport"
)

func main() {
    tp.SetLoggerLevel("ERROR")
    cli := tp.NewPeer(tp.PeerConfig{})
    defer cli.Close()
    cli.RoutePush(new(push))
    sess, err := cli.Dial(":9090")
    if err != nil {
        tp.Fatalf("%v", err)
    }

    var reply int
    rerr := sess.Pull("/math/add?push_status=yes",
        []int{1, 2, 3, 4, 5},
        &reply,
    ).Rerror()

    if rerr != nil {
        tp.Fatalf("%v", rerr)
    }
    tp.Printf("reply: %d", reply)
}

type push struct {
    tp.PushCtx
}

func (p *push) Status(args *string) *tp.Rerror {
    tp.Printf("server status: %s", *args)
    return nil
}

More Examples

Design

Keywords
  • Peer: A communication instance may be a server or a client
  • Socket: Base on the net.Conn package, add custom package protocol, transfer pipelines and other functions
  • Packet: The corresponding structure of the data package content element
  • Proto: The protocol interface of packet pack/unpack
  • Codec: Serialization interface for Packet.Body
  • XferPipe: Packet bytes encoding pipeline, such as compression, encryption, calibration and so on
  • XferFilter: A interface to handle packet data before transfer
  • Plugin: Plugins that cover all aspects of communication
  • Session: A connection session, with push, pull, reply, close and other methods of operation
  • Context: Handle the received or send packets
  • Pull-Launch: Pull data from the peer
  • Pull-Handle: Handle and reply to the pull of peer
  • Push-Launch: Push data to the peer
  • Push-Handle: Handle the push of peer
  • Router: Router that route the response handler by request information(such as a URI)
Packet

The contents of every one packet:

// in .../teleport/socket package
type (
    type Packet struct {
        // Has unexported fields.
    }
        Packet a socket data packet.
    
    func GetPacket(settings ...PacketSetting) *Packet
    func NewPacket(settings ...PacketSetting) *Packet
    func (p *Packet) Body() interface{}
    func (p *Packet) BodyCodec() byte
    func (p *Packet) Context() context.Context
    func (p *Packet) MarshalBody() ([]byte, error)
    func (p *Packet) Meta() *utils.Args
    func (p *Packet) Ptype() byte
    func (p *Packet) Reset(settings ...PacketSetting)
    func (p *Packet) Seq() string
    func (p *Packet) SetBody(body interface{})
    func (p *Packet) SetBodyCodec(bodyCodec byte)
    func (p *Packet) SetNewBody(newBodyFunc NewBodyFunc)
    func (p *Packet) SetPtype(ptype byte)
    func (p *Packet) SetSeq(seq string)
    func (p *Packet) SetSize(size uint32) error
    func (p *Packet) SetUri(uri string)
    func (p *Packet) SetUriObject(uriObject *url.URL)
    func (p *Packet) Size() uint32
    func (p *Packet) String() string
    func (p *Packet) UnmarshalBody(bodyBytes []byte) error
    func (p *Packet) Uri() string
    func (p *Packet) UriObject() *url.URL
    func (p *Packet) XferPipe() *xfer.XferPipe

    // NewBodyFunc creates a new body by header.
    NewBodyFunc func(Header) interface{}
)

// in .../teleport/xfer package
type (
    // XferPipe transfer filter pipe, handlers from outer-most to inner-most.
    // Note: the length can not be bigger than 255!
    XferPipe struct {
        filters []XferFilter
    }
    // XferFilter handles byte stream of packet when transfer.
    XferFilter interface {
        Id() byte
        OnPack([]byte) ([]byte, error)
        OnUnpack([]byte) ([]byte, error)
    }
)
Codec

The body's codec set.

type Codec interface {
    // Id returns codec id.
    Id() byte
    // Name returns codec name.
    Name() string
    // Marshal returns the encoding of v.
    Marshal(v interface{}) ([]byte, error)
    // Unmarshal parses the encoded data and stores the result
    // in the value pointed to by v.
    Unmarshal(data []byte, v interface{}) error
}
XferPipe

Transfer filter pipe, handles byte stream of packet when transfer.

type (
    // XferPipe transfer filter pipe, handlers from outer-most to inner-most.
    // Note: the length can not be bigger than 255!
    XferPipe struct {
        filters []XferFilter
    }
    // XferFilter handles byte stream of packet when transfer.
    XferFilter interface {
        Id() byte
        OnPack([]byte) ([]byte, error)
        OnUnpack([]byte) ([]byte, error)
    }
)
Plugin

Plug-ins during runtime.

type (
    // Plugin plugin background
    Plugin interface {
        Name() string
    }
    // PreNewPeerPlugin is executed before creating peer.
    PreNewPeerPlugin interface {
        Plugin
        PreNewPeer(*PeerConfig, *PluginContainer) error
    }
    ...
)
Protocol

You can customize your own communication protocol by implementing the interface:

type (
    // Proto pack/unpack protocol scheme of socket packet.
    Proto interface {
        // Version returns the protocol's id and name.
        Version() (byte, string)
        // Pack writes the Packet into the connection.
        // Note: Make sure to write only once or there will be package contamination!
        Pack(*Packet) error
        // Unpack reads bytes from the connection to the Packet.
        // Note: Concurrent unsafe!
        Unpack(*Packet) error
    }
    ProtoFunc func(io.ReadWriter) Proto
)

Next, you can specify the communication protocol in the following ways:

func SetDefaultProtoFunc(socket.ProtoFunc)
type Peer interface {
    ...
    ServeConn(conn net.Conn, protoFunc ...socket.ProtoFunc) Session
    DialContext(ctx context.Context, addr string, protoFunc ...socket.ProtoFunc) (Session, *Rerror)
    Dial(addr string, protoFunc ...socket.ProtoFunc) (Session, *Rerror)
    Listen(protoFunc ...socket.ProtoFunc) error
    ...
}

Usage

Peer(server or client) Demo
// Start a server
var peer1 = tp.NewPeer(tp.PeerConfig{
    ListenAddress: "0.0.0.0:9090", // for server role
})
peer1.Listen()

...

// Start a client
var peer2 = tp.NewPeer(tp.PeerConfig{})
var sess, err = peer2.Dial("127.0.0.1:8080")
Pull-Controller-Struct API template
type Aaa struct {
    tp.PullCtx
}
func (x *Aaa) XxZz(args *<T>) (<T>, *tp.Rerror) {
    ...
    return r, nil
}
  • register it to root router:
// register the pull route: /aaa/xx_zz
peer.RoutePull(new(Aaa))

// or register the pull route: /xx_zz
peer.RoutePullFunc((*Aaa).XxZz)
Pull-Handler-Function API template
func XxZz(ctx tp.PullCtx, args *<T>) (<T>, *tp.Rerror) {
    ...
    return r, nil
}
  • register it to root router:
// register the pull route: /xx_zz
peer.RoutePullFunc(XxZz)
Push-Controller-Struct API template
type Bbb struct {
    tp.PushCtx
}
func (b *Bbb) YyZz(args *<T>) *tp.Rerror {
    ...
    return nil
}
  • register it to root router:
// register the push route: /bbb/yy_zz
peer.RoutePush(new(Bbb))

// or register the push route: /yy_zz
peer.RoutePushFunc((*Bbb).YyZz)
Push-Handler-Function API template
// YyZz register the route: /yy_zz
func YyZz(ctx tp.PushCtx, args *<T>) *tp.Rerror {
    ...
    return nil
}
  • register it to root router:
// register the push route: /yy_zz
peer.RoutePushFunc(YyZz)
Unknown-Pull-Handler-Function API template
func XxxUnknownPull (ctx tp.UnknownPullCtx) (interface{}, *tp.Rerror) {
    ...
    return r, nil
}
  • register it to root router:
// register the unknown pull route: /*
peer.SetUnknownPull(XxxUnknownPull)
Unknown-Push-Handler-Function API template
func XxxUnknownPush(ctx tp.UnknownPushCtx) *tp.Rerror {
    ...
    return nil
}
  • register it to root router:
// register the unknown push route: /*
peer.SetUnknownPush(XxxUnknownPush)
The mapping rule of struct(func) name to URI path:
  • AaBb -> /aa_bb
  • Aa_Bb -> /aa/bb
  • aa_bb -> /aa/bb
  • Aa__Bb -> /aa_bb
  • aa__bb -> /aa_bb
  • ABC_XYZ -> /abc/xyz
  • ABcXYz -> /abc_xyz
  • ABC__XYZ -> /abc_xyz
Plugin Demo
// NewIgnoreCase Returns a ignoreCase plugin.
func NewIgnoreCase() *ignoreCase {
    return &ignoreCase{}
}

type ignoreCase struct{}

var (
    _ tp.PostReadPullHeaderPlugin = new(ignoreCase)
    _ tp.PostReadPushHeaderPlugin = new(ignoreCase)
)

func (i *ignoreCase) Name() string {
    return "ignoreCase"
}

func (i *ignoreCase) PostReadPullHeader(ctx tp.ReadCtx) *tp.Rerror {
    // Dynamic transformation path is lowercase
    ctx.UriObject().Path = strings.ToLower(ctx.UriObject().Path)
    return nil
}

func (i *ignoreCase) PostReadPushHeader(ctx tp.ReadCtx) *tp.Rerror {
    // Dynamic transformation path is lowercase
    ctx.UriObject().Path = strings.ToLower(ctx.UriObject().Path)
    return nil
}
Register above handler and plugin
// add router group
group := peer.SubRoute("test")
// register to test group
group.RoutePull(new(Aaa), NewIgnoreCase())
peer.RoutePullFunc(XxZz, NewIgnoreCase())
group.RoutePush(new(Bbb))
peer.RoutePushFunc(YyZz)
peer.SetUnknownPull(XxxUnknownPull)
peer.SetUnknownPush(XxxUnknownPush)
Config
type PeerConfig struct {
    Network            string        `yaml:"network"              ini:"network"              comment:"Network; tcp, tcp4, tcp6, unix or unixpacket"`
    ListenAddress      string        `yaml:"listen_address"       ini:"listen_address"       comment:"Listen address; for server role"`
    DefaultDialTimeout time.Duration `yaml:"default_dial_timeout" ini:"default_dial_timeout" comment:"Default maximum duration for dialing; for client role; ns,µs,ms,s,m,h"`
    RedialTimes        int32         `yaml:"redial_times"         ini:"redial_times"         comment:"The maximum times of attempts to redial, after the connection has been unexpectedly broken; for client role"`
    DefaultBodyCodec   string        `yaml:"default_body_codec"   ini:"default_body_codec"   comment:"Default body codec type id"`
    DefaultSessionAge  time.Duration `yaml:"default_session_age"  ini:"default_session_age"  comment:"Default session max age, if less than or equal to 0, no time limit; ns,µs,ms,s,m,h"`
    DefaultContextAge  time.Duration `yaml:"default_context_age"  ini:"default_context_age"  comment:"Default PULL or PUSH context max age, if less than or equal to 0, no time limit; ns,µs,ms,s,m,h"`
    SlowCometDuration  time.Duration `yaml:"slow_comet_duration"  ini:"slow_comet_duration"  comment:"Slow operation alarm threshold; ns,µs,ms,s ..."`
    PrintBody          bool          `yaml:"print_body"           ini:"print_body"           comment:"Is print body or not"`
    CountTime          bool          `yaml:"count_time"           ini:"count_time"           comment:"Is count cost time or not"`
}
Optimize
  • SetPacketSizeLimit sets max packet size. If maxSize<=0, set it to max uint32.

    func SetPacketSizeLimit(maxPacketSize uint32)
    
  • SetSocketKeepAlive sets whether the operating system should send keepalive messages on the connection.

    func SetSocketKeepAlive(keepalive bool)
    
  • SetSocketKeepAlivePeriod sets period between keep alives.

    func SetSocketKeepAlivePeriod(d time.Duration)
    
  • SetSocketNoDelay controls whether the operating system should delay packet transmission in hopes of sending fewer packets (Nagle's algorithm). The default is true (no delay), meaning that data is sent as soon as possible after a Write.

    func SetSocketNoDelay(_noDelay bool)
    
  • SetSocketReadBuffer sets the size of the operating system's receive buffer associated with the connection.

    func SetSocketReadBuffer(bytes int)
    
  • SetSocketWriteBuffer sets the size of the operating system's transmit buffer associated with the connection.

    func SetSocketWriteBuffer(bytes int)
    

Extensions

Codec
package import description
json import "github.com/henrylee2cn/teleport/codec" JSON codec(teleport own)
protobuf import "github.com/henrylee2cn/teleport/codec" Protobuf codec(teleport own)
plain import "github.com/henrylee2cn/teleport/codec" Plain text codec(teleport own)
form import "github.com/henrylee2cn/teleport/codec" Form(url encode) codec(teleport own)
Plugin
package import description
auth import "github.com/henrylee2cn/teleport/plugin" A auth plugin for verifying peer at the first time
binder import binder "github.com/henrylee2cn/tp-ext/plugin-binder" Parameter Binding Verification for Struct Handler
heartbeat import heartbeat "github.com/henrylee2cn/tp-ext/plugin-heartbeat" A generic timing heartbeat plugin
proxy import "github.com/henrylee2cn/teleport/plugin" A proxy plugin for handling unknown pulling or pushing
encrypt import encrypt "github.com/henrylee2cn/tp-ext/plugin-encrypt" Encrypting the packet body
Protocol
package import description
fastproto import "github.com/henrylee2cn/teleport/socket A fast socket communication protocol(teleport default protocol)
jsonproto import jsonproto "github.com/henrylee2cn/tp-ext/proto-jsonproto" A JSON socket communication protocol
pbproto import pbproto "github.com/henrylee2cn/tp-ext/proto-pbproto" A Protobuf socket communication protocol
Transfer-Filter
package import description
gzip import "github.com/henrylee2cn/teleport/xfer" Gzip(teleport own)
md5Hash import md5Hash "github.com/henrylee2cn/tp-ext/xfer-md5Hash" Provides a integrity check transfer filter
Module
package import description
cliSession import cliSession "github.com/henrylee2cn/tp-ext/mod-cliSession" Client session with a high efficient and load balanced connection pool
websocket import websocket "github.com/henrylee2cn/tp-ext/mod-websocket" Makes the Teleport framework compatible with websocket protocol as specified in RFC 6455

Extensions Repository

Projects based on Teleport

project description
TP-Micro TP-Micro is a simple, powerful micro service framework based on Teleport
Ants Ants is a highly available micro service platform based on TP-Micro and Teleport
Pholcus Pholcus is a distributed, high concurrency and powerful web crawler software

Business Users

深圳市梦之舵信息技术有限公司    北京风行在线技术有限公司    北京可即时代网络公司

License

Teleport is under Apache v2 License. See the LICENSE file for the full license text

Documentation

Overview

Package tp (teleport) is a versatile, high-performance and flexible TCP socket framework. It can be used for peer-peer, rpc, gateway, micro services, push services, game services and so on.

Copyright 2015-2018 HenryLee. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Index

Constants

View Source
const (
	TypeUndefined byte = 0
	TypePull      byte = 1
	TypeReply     byte = 2 // reply to pull
	TypePush      byte = 3
)

Packet types

View Source
const (
	CodeUnknownError        = -1
	CodeConnClosed          = 102
	CodeWriteFailed         = 104
	CodeDialFailed          = 105
	CodeBadPacket           = 400
	CodeUnauthorized        = 401
	CodeNotFound            = 404
	CodePtypeNotAllowed     = 405
	CodeHandleTimeout       = 408
	CodeInternalServerError = 500
	CodeBadGateway          = 502
)

Internal Framework Rerror code. Note: Recommended custom code is greater than 1000.

View Source
const (
	// MetaRerror reply error metadata key
	MetaRerror = "X-Reply-Error"
	// MetaRealIp real IP metadata key
	MetaRealIp = "X-Real-IP"
	// MetaAcceptBodyCodec the key of body codec that the sender wishes to accept
	MetaAcceptBodyCodec = "X-Accept-Body-Codec"
)

Variables

View Source
var (
	// FirstSweep is first executed.
	// Usage: share github.com/henrylee2cn/goutil/graceful with other project.
	FirstSweep func() error
	// BeforeExiting is executed before process exiting.
	// Usage: share github.com/henrylee2cn/goutil/graceful with other project.
	BeforeExiting func() error
)
View Source
var DefaultProtoFunc = socket.DefaultProtoFunc

DefaultProtoFunc gets the default builder of socket communication protocol

func DefaultProtoFunc() socket.ProtoFunc
View Source
var ErrListenClosed = errors.New("listener is closed")

ErrListenClosed listener is closed error.

View Source
var GetPacket = socket.GetPacket

GetPacket gets a *Packet form packet stack. Note:

newBodyFunc is only for reading form connection;
settings are only for writing to connection.
func GetPacket(settings ...socket.PacketSetting) *socket.Packet
View Source
var GetReadLimit = socket.PacketSizeLimit

GetReadLimit gets the packet size upper limit of reading.

GetReadLimit() uint32
View Source
var PutPacket = socket.PutPacket

PutPacket puts a *socket.Packet to packet stack.

func PutPacket(p *socket.Packet)
View Source
var SetDefaultProtoFunc = socket.SetDefaultProtoFunc

SetDefaultProtoFunc sets the default builder of socket communication protocol

func SetDefaultProtoFunc(protoFunc socket.ProtoFunc)

SetReadLimit sets max packet size. If maxSize<=0, set it to max uint32.

func SetReadLimit(maxPacketSize uint32)
View Source
var SetSocketKeepAlive = socket.SetKeepAlive

SetSocketKeepAlive sets whether the operating system should send keepalive messages on the connection. Note: If have not called the function, the system defaults are used.

func SetSocketKeepAlive(keepalive bool)
View Source
var SetSocketKeepAlivePeriod = socket.SetKeepAlivePeriod

SetSocketKeepAlivePeriod sets period between keep alives. Note: if d<0, don't change the value.

func SetSocketKeepAlivePeriod(d time.Duration)
View Source
var SetSocketNoDelay = socket.SetNoDelay

SetSocketNoDelay controls whether the operating system should delay packet transmission in hopes of sending fewer packets (Nagle's algorithm). The default is true (no delay), meaning that data is sent as soon as possible after a Write.

func SetSocketNoDelay(noDelay bool)
View Source
var SetSocketReadBuffer = socket.SetReadBuffer

SetSocketReadBuffer sets the size of the operating system's receive buffer associated with the connection. Note: if bytes<0, don't change the value.

func SetSocketReadBuffer(bytes int)
View Source
var SetSocketWriteBuffer = socket.SetWriteBuffer

SetSocketWriteBuffer sets the size of the operating system's transmit buffer associated with the connection. Note: if bytes<0, don't change the value.

func SetSocketWriteBuffer(bytes int)
View Source
var SocketReadBuffer = socket.ReadBuffer

SocketReadBuffer returns the size of the operating system's receive buffer associated with the connection. Note: if using the system default value, bytes=-1 and isDefault=true.

func SocketReadBuffer() (bytes int, isDefault bool)
View Source
var SocketWriteBuffer = socket.WriteBuffer

SocketWriteBuffer returns the size of the operating system's transmit buffer associated with the connection. Note: if using the system default value, bytes=-1 and isDefault=true.

func SocketWriteBuffer() (bytes int, isDefault bool)
View Source
var WithAddMeta = socket.WithAddMeta

WithAddMeta adds 'key=value' metadata argument. Multiple values for the same key may be added.

func WithAddMeta(key, value string) socket.PacketSetting
View Source
var WithBody = socket.WithBody

WithBody sets the body object.

func WithBody(body interface{}) socket.PacketSetting
View Source
var WithBodyCodec = socket.WithBodyCodec

WithBodyCodec sets the body codec.

func WithBodyCodec(bodyCodec byte) socket.PacketSetting
View Source
var WithContext = socket.WithContext

WithContext sets the packet handling context.

func WithContext(ctx context.Context) socket.PacketSetting
View Source
var WithNewBody = socket.WithNewBody

WithNewBody resets the function of geting body.

func WithNewBody(newBodyFunc socket.NewBodyFunc) socket.PacketSetting
View Source
var WithPtype = socket.WithPtype

WithPtype sets the packet type.

func WithPtype(ptype byte) socket.PacketSetting
View Source
var WithQuery = socket.WithQuery

WithQuery sets the packet URI query parameter.

func WithQuery(key, value string) socket.PacketSetting
View Source
var WithSeq = socket.WithSeq

WithSeq sets the packet sequence.

func WithSeq(seq uint64) socket.PacketSetting
View Source
var WithSetMeta = socket.WithSetMeta

WithSetMeta sets 'key=value' metadata argument.

func WithSetMeta(key, value string) socket.PacketSetting
View Source
var WithUri = socket.WithUri

WithUri sets the packet URI string.

func WithUri(uri string) socket.PacketSetting
View Source
var WithUriObject = socket.WithUriObject

WithUriObject sets the packet URI object.

func WithUriObject(uriObject *url.URL) socket.PacketSetting
View Source
var WithXferPipe = socket.WithXferPipe

WithXferPipe sets transfer filter pipe.

func WithXferPipe(filterId ...byte) socket.PacketSetting

Functions

func AnywayGo

func AnywayGo(fn func())

AnywayGo similar to go func, but concurrent resources are limited.

func CodeText

func CodeText(rerrCode int32) string

CodeText returns the reply error code text. If the type is undefined returns 'Unknown Error'.

func Criticalf

func Criticalf(format string, args ...interface{})

Criticalf logs a message using CRITICAL as log level.

func Debugf

func Debugf(format string, args ...interface{})

Debugf logs a message using DEBUG as log level.

func Errorf

func Errorf(format string, args ...interface{})

Errorf logs a message using ERROR as log level.

func Fatalf

func Fatalf(format string, args ...interface{})

Fatalf is equivalent to l.Criticalf followed by a call to os.Exit(1).

func GetAcceptBodyCodec

func GetAcceptBodyCodec(meta *utils.Args) (byte, bool)

GetAcceptBodyCodec gets the body codec that the sender wishes to accept. Note: If the specified codec is invalid, the receiver will ignore the mate data.

func Go

func Go(fn func()) bool

Go similar to go func, but return false if insufficient resources.

func GraceSignal

func GraceSignal()

GraceSignal open graceful shutdown or reboot signal.

func Infof

func Infof(format string, args ...interface{})

Infof logs a message using INFO as log level.

func IsConnRerror

func IsConnRerror(rerr *Rerror) bool

IsConnRerror determines whether the error is a connection error

func NewInheritListener

func NewInheritListener(network, laddr string, tlsConfig *tls.Config) (net.Listener, error)

NewInheritListener creates a new listener that can be inherited on reboot.

func NewTlsConfigFromFile

func NewTlsConfigFromFile(tlsCertFile, tlsKeyFile string) (*tls.Config, error)

NewTlsConfigFromFile creates a new TLS config.

func Noticef

func Noticef(format string, args ...interface{})

Noticef logs a message using NOTICE as log level.

func Panicf

func Panicf(format string, args ...interface{})

Panicf is equivalent to l.Criticalf followed by a call to panic().

func Printf

func Printf(format string, args ...interface{})

Printf formats according to a format specifier and writes to standard output. It returns the number of bytes written and any write error encountered.

func Reboot

func Reboot(timeout ...time.Duration)

Reboot all the frame process gracefully. Notes: Windows system are not supported!

func SetGopool

func SetGopool(maxGoroutinesAmount int, maxGoroutineIdleDuration time.Duration)

SetGopool set or reset go pool config. Note: Make sure to call it before calling NewPeer() and Go()

func SetLogger

func SetLogger(logger Logger)

SetLogger sets global logger. Note: Concurrent is not safe!

func SetLoggerLevel

func SetLoggerLevel(level string)

SetLoggerLevel sets the logger's level.

func SetShutdown

func SetShutdown(timeout time.Duration, firstSweep, beforeExiting func() error)

SetShutdown sets the function which is called after the process shutdown, and the time-out period for the process shutdown. If 0<=timeout<5s, automatically use 'MinShutdownTimeout'(5s). If timeout<0, indefinite period. 'firstSweep' is first executed. 'beforeExiting' is executed before process exiting.

func Shutdown

func Shutdown(timeout ...time.Duration)

Shutdown closes all the frame process gracefully. Parameter timeout is used to reset time-out period for the process shutdown.

func ToUriPath

func ToUriPath(name string) string

ToUriPath maps struct(func) name to URI path.

func Tracef

func Tracef(format string, args ...interface{})

Tracef logs a message using TRACE as log level.

func TypeText

func TypeText(typ byte) string

TypeText returns the packet type text. If the type is undefined returns 'Undefined'.

func Warnf

func Warnf(format string, args ...interface{})

Warnf logs a message using WARNING as log level.

func WithAcceptBodyCodec

func WithAcceptBodyCodec(bodyCodec byte) socket.PacketSetting

WithAcceptBodyCodec sets the body codec that the sender wishes to accept. Note: If the specified codec is invalid, the receiver will ignore the mate data.

func WithRealIp

func WithRealIp(ip string) socket.PacketSetting

WithRealIp sets the real IP to metadata.

func WithRerror

func WithRerror(rerr *Rerror) socket.PacketSetting

WithRerror sets the real IP to metadata.

Types

type BasePeer

type BasePeer interface {
	// Close closes peer.
	Close() (err error)
	// CountSession returns the number of sessions.
	CountSession() int
	// GetSession gets the session by id.
	GetSession(sessionId string) (Session, bool)
	// RangeSession ranges all sessions. If fn returns false, stop traversing.
	RangeSession(fn func(sess Session) bool)
	// SetTlsConfig sets the TLS config.
	SetTlsConfig(tlsConfig *tls.Config)
	// SetTlsConfigFromFile sets the TLS config from file.
	SetTlsConfigFromFile(tlsCertFile, tlsKeyFile string) error
	// TlsConfig returns the TLS config.
	TlsConfig() *tls.Config
}

BasePeer peer with the common method set

type BaseSession

type BaseSession interface {
	// Id returns the session id.
	Id() string
	// Peer returns the peer.
	Peer() Peer
	// LocalAddr returns the local network address.
	LocalAddr() net.Addr
	// RemoteAddr returns the remote network address.
	RemoteAddr() net.Addr
	// Swap returns custom data swap of the session(socket).
	Swap() goutil.Map
}

BaseSession a connection session with the common method set.

type EarlyPeer

type EarlyPeer interface {
	BasePeer
	// Router returns the root router of pull or push handlers.
	Router() *Router
	// SubRoute adds handler group.
	SubRoute(pathPrefix string, plugin ...Plugin) *SubRouter
	// RoutePull registers PULL handlers, and returns the paths.
	RoutePull(ctrlStruct interface{}, plugin ...Plugin) []string
	// RoutePullFunc registers PULL handler, and returns the path.
	RoutePullFunc(pullHandleFunc interface{}, plugin ...Plugin) string
	// RoutePush registers PUSH handlers, and returns the paths.
	RoutePush(ctrlStruct interface{}, plugin ...Plugin) []string
	// RoutePushFunc registers PUSH handler, and returns the path.
	RoutePushFunc(pushHandleFunc interface{}, plugin ...Plugin) string
	// SetUnknownPull sets the default handler, which is called when no handler for PULL is found.
	SetUnknownPull(fn func(UnknownPullCtx) (interface{}, *Rerror), plugin ...Plugin)
	// SetUnknownPush sets the default handler, which is called when no handler for PUSH is found.
	SetUnknownPush(fn func(UnknownPushCtx) *Rerror, plugin ...Plugin)
}

EarlyPeer the communication peer that has just been created

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

Handler pull or push handler type info

func (*Handler) ArgElemType

func (h *Handler) ArgElemType() reflect.Type

ArgElemType returns the handler arg elem type.

func (*Handler) IsPull

func (h *Handler) IsPull() bool

IsPull checks if it is pull handler or not.

func (*Handler) IsPush

func (h *Handler) IsPush() bool

IsPush checks if it is push handler or not.

func (*Handler) IsUnknown

func (h *Handler) IsUnknown() bool

IsUnknown checks if it is unknown handler(pull/push) or not.

func (*Handler) Name

func (h *Handler) Name() string

Name returns the handler name.

func (*Handler) NewArgValue

func (h *Handler) NewArgValue() reflect.Value

NewArgValue creates a new arg elem value.

func (*Handler) ReplyType

func (h *Handler) ReplyType() reflect.Type

ReplyType returns the handler reply type

func (*Handler) RouterTypeName

func (h *Handler) RouterTypeName() string

RouterTypeName returns the router type name.

type HandlersMaker

type HandlersMaker func(string, interface{}, *PluginContainer) ([]*Handler, error)

HandlersMaker makes []*Handler

type Logger

type Logger interface {
	// Level returns the logger's level.
	Level() string
	// SetLevel sets the logger's level.
	SetLevel(level string)
	// Printf formats according to a format specifier and writes to standard output.
	// It returns the number of bytes written and any write error encountered.
	Printf(format string, args ...interface{})
	// Fatalf is equivalent to Criticalf followed by a call to os.Exit(1).
	Fatalf(format string, args ...interface{})
	// Panicf is equivalent to Criticalf followed by a call to panic().
	Panicf(format string, args ...interface{})
	// Criticalf logs a message using CRITICAL as log level.
	Criticalf(format string, args ...interface{})
	// Errorf logs a message using ERROR as log level.
	Errorf(format string, args ...interface{})
	// Warnf logs a message using WARNING as log level.
	Warnf(format string, args ...interface{})
	// Noticef logs a message using NOTICE as log level.
	Noticef(format string, args ...interface{})
	// Infof logs a message using INFO as log level.
	Infof(format string, args ...interface{})
	// Debugf logs a message using DEBUG as log level.
	Debugf(format string, args ...interface{})
	// Tracef logs a message using TRACE as log level.
	Tracef(format string, args ...interface{})
}

Logger interface

type Peer

type Peer interface {
	EarlyPeer
	// ListenAndServe turns on the listening service.
	ListenAndServe(protoFunc ...socket.ProtoFunc) error
	// Dial connects with the peer of the destination address.
	Dial(addr string, protoFunc ...socket.ProtoFunc) (Session, *Rerror)
	// DialContext connects with the peer of the destination address, using the provided context.
	DialContext(ctx context.Context, addr string, protoFunc ...socket.ProtoFunc) (Session, *Rerror)
	// ServeConn serves the connection and returns a session.
	// Note: Not support automatically redials after disconnection.
	ServeConn(conn net.Conn, protoFunc ...socket.ProtoFunc) (Session, error)
	// ServeListener serves the listener.
	// Note: The caller ensures that the listener supports graceful shutdown.
	ServeListener(lis net.Listener, protoFunc ...socket.ProtoFunc) error
}

Peer the communication peer which is server or client role

func NewPeer

func NewPeer(cfg PeerConfig, plugin ...Plugin) Peer

NewPeer creates a new peer.

type PeerConfig

type PeerConfig struct {
	Network            string        `yaml:"network"              ini:"network"              comment:"Network; tcp, tcp4, tcp6, unix or unixpacket"`
	ListenAddress      string        `yaml:"listen_address"       ini:"listen_address"       comment:"Listen address; for server role"`
	DefaultDialTimeout time.Duration `` /* 135-byte string literal not displayed */
	RedialTimes        int32         `` /* 172-byte string literal not displayed */
	DefaultBodyCodec   string        `yaml:"default_body_codec"   ini:"default_body_codec"   comment:"Default body codec type id"`
	DefaultSessionAge  time.Duration `` /* 148-byte string literal not displayed */
	DefaultContextAge  time.Duration `` /* 161-byte string literal not displayed */
	SlowCometDuration  time.Duration `yaml:"slow_comet_duration"  ini:"slow_comet_duration"  comment:"Slow operation alarm threshold; ns,µs,ms,s ..."`
	PrintBody          bool          `yaml:"print_body"           ini:"print_body"           comment:"Is print body or not"`
	CountTime          bool          `yaml:"count_time"           ini:"count_time"           comment:"Is count cost time or not"`
	// contains filtered or unexported fields
}

PeerConfig peer config Note:

yaml tag is used for github.com/henrylee2cn/cfgo
ini tag is used for github.com/henrylee2cn/ini

func (*PeerConfig) Reload

func (p *PeerConfig) Reload(bind cfgo.BindFunc) error

Reload Bi-directionally synchronizes config between YAML file and memory.

type Plugin

type Plugin interface {
	Name() string
}

Plugin plugin background

type PluginContainer

type PluginContainer struct {
	// contains filtered or unexported fields
}

PluginContainer plugins container.

func (*PluginContainer) AppendLeft

func (p *PluginContainer) AppendLeft(plugins ...Plugin)

AppendLeft appends plugins on the left side of the pluginContainer.

func (*PluginContainer) AppendRight

func (p *PluginContainer) AppendRight(plugins ...Plugin)

AppendRight appends plugins on the right side of the pluginContainer.

func (*PluginContainer) GetAll

func (p *PluginContainer) GetAll() []Plugin

GetAll returns all activated plugins.

func (*PluginContainer) GetByName

func (p *PluginContainer) GetByName(pluginName string) Plugin

GetByName returns a plugin instance by it's name.

func (*PluginContainer) PostAccept

func (p *PluginContainer) PostAccept(sess PreSession) *Rerror

PostAccept executes the defined plugins after accepting connection.

func (*PluginContainer) PostDial

func (p *PluginContainer) PostDial(sess PreSession) *Rerror

PostDial executes the defined plugins after dialing.

func (*PluginContainer) PostDisconnect

func (p *PluginContainer) PostDisconnect(sess BaseSession) *Rerror

PostDisconnect executes the defined plugins after disconnection.

func (*PluginContainer) PostListen

func (p *PluginContainer) PostListen(addr net.Addr)

PostListen is executed between listening and accepting.

func (*PluginContainer) PostNewPeer

func (p *PluginContainer) PostNewPeer(peer EarlyPeer)

PostNewPeer executes the defined plugins after creating peer.

func (*PluginContainer) PostReadPullBody

func (p *PluginContainer) PostReadPullBody(ctx ReadCtx) *Rerror

PostReadPullBody executes the defined plugins after reading PULL packet body.

func (*PluginContainer) PostReadPullHeader

func (p *PluginContainer) PostReadPullHeader(ctx ReadCtx) *Rerror

PostReadPullHeader executes the defined plugins after reading PULL packet header.

func (*PluginContainer) PostReadPushBody

func (p *PluginContainer) PostReadPushBody(ctx ReadCtx) *Rerror

PostReadPushBody executes the defined plugins after reading PUSH packet body.

func (*PluginContainer) PostReadPushHeader

func (p *PluginContainer) PostReadPushHeader(ctx ReadCtx) *Rerror

PostReadPushHeader executes the defined plugins after reading PUSH packet header.

func (*PluginContainer) PostReadReplyBody

func (p *PluginContainer) PostReadReplyBody(ctx ReadCtx) *Rerror

PostReadReplyBody executes the defined plugins after reading REPLY packet body.

func (*PluginContainer) PostReadReplyHeader

func (p *PluginContainer) PostReadReplyHeader(ctx ReadCtx) *Rerror

PostReadReplyHeader executes the defined plugins after reading REPLY packet header.

func (*PluginContainer) PostReg

func (p *PluginContainer) PostReg(h *Handler)

PostReg executes the defined plugins before registering handler.

func (*PluginContainer) PostWritePull

func (p *PluginContainer) PostWritePull(ctx WriteCtx) *Rerror

PostWritePull executes the defined plugins after successful writing PULL packet.

func (*PluginContainer) PostWritePush

func (p *PluginContainer) PostWritePush(ctx WriteCtx) *Rerror

PostWritePush executes the defined plugins after successful writing PUSH packet.

func (*PluginContainer) PostWriteReply

func (p *PluginContainer) PostWriteReply(ctx WriteCtx)

PostWriteReply executes the defined plugins after successful writing REPLY packet.

func (*PluginContainer) PreNewPeer

func (p *PluginContainer) PreNewPeer(peerConfig *PeerConfig)

PreNewPeer executes the defined plugins before creating peer.

func (*PluginContainer) PreReadHeader

func (p *PluginContainer) PreReadHeader(ctx PreCtx) error

PreReadHeader executes the defined plugins before reading packet header.

func (*PluginContainer) PreReadPullBody

func (p *PluginContainer) PreReadPullBody(ctx ReadCtx) *Rerror

PreReadPullBody executes the defined plugins before reading PULL packet body.

func (*PluginContainer) PreReadPushBody

func (p *PluginContainer) PreReadPushBody(ctx ReadCtx) *Rerror

PreReadPushBody executes the defined plugins before reading PUSH packet body.

func (*PluginContainer) PreReadReplyBody

func (p *PluginContainer) PreReadReplyBody(ctx ReadCtx) *Rerror

PreReadReplyBody executes the defined plugins before reading REPLY packet body.

func (*PluginContainer) PreWritePull

func (p *PluginContainer) PreWritePull(ctx WriteCtx) *Rerror

PreWritePull executes the defined plugins before writing PULL packet.

func (*PluginContainer) PreWritePush

func (p *PluginContainer) PreWritePush(ctx WriteCtx) *Rerror

PreWritePush executes the defined plugins before writing PUSH packet.

func (*PluginContainer) PreWriteReply

func (p *PluginContainer) PreWriteReply(ctx WriteCtx)

PreWriteReply executes the defined plugins before writing REPLY packet.

func (*PluginContainer) Remove

func (p *PluginContainer) Remove(pluginName string) error

Remove removes a plugin by it's name.

type PostAcceptPlugin

type PostAcceptPlugin interface {
	PostAccept(PreSession) *Rerror
}

PostAcceptPlugin is executed after accepting connection.

type PostDialPlugin

type PostDialPlugin interface {
	PostDial(PreSession) *Rerror
}

PostDialPlugin is executed after dialing.

type PostDisconnectPlugin

type PostDisconnectPlugin interface {
	PostDisconnect(BaseSession) *Rerror
}

PostDisconnectPlugin is executed after disconnection.

type PostListenPlugin

type PostListenPlugin interface {
	PostListen() error
}

PostListenPlugin is executed between listening and accepting.

type PostNewPeerPlugin

type PostNewPeerPlugin interface {
	PostNewPeer(EarlyPeer) error
}

PostNewPeerPlugin is executed after creating peer.

type PostReadPullBodyPlugin

type PostReadPullBodyPlugin interface {
	PostReadPullBody(ReadCtx) *Rerror
}

PostReadPullBodyPlugin is executed after reading PULL packet body.

type PostReadPullHeaderPlugin

type PostReadPullHeaderPlugin interface {
	PostReadPullHeader(ReadCtx) *Rerror
}

PostReadPullHeaderPlugin is executed after reading PULL packet header.

type PostReadPushBodyPlugin

type PostReadPushBodyPlugin interface {
	PostReadPushBody(ReadCtx) *Rerror
}

PostReadPushBodyPlugin is executed after reading PUSH packet body.

type PostReadPushHeaderPlugin

type PostReadPushHeaderPlugin interface {
	PostReadPushHeader(ReadCtx) *Rerror
}

PostReadPushHeaderPlugin is executed after reading PUSH packet header.

type PostReadReplyBodyPlugin

type PostReadReplyBodyPlugin interface {
	PostReadReplyBody(ReadCtx) *Rerror
}

PostReadReplyBodyPlugin is executed after reading REPLY packet body.

type PostReadReplyHeaderPlugin

type PostReadReplyHeaderPlugin interface {
	PostReadReplyHeader(ReadCtx) *Rerror
}

PostReadReplyHeaderPlugin is executed after reading REPLY packet header.

type PostRegPlugin

type PostRegPlugin interface {
	PostReg(*Handler) error
}

PostRegPlugin is executed after registering handler.

type PostWritePullPlugin

type PostWritePullPlugin interface {
	PostWritePull(WriteCtx) *Rerror
}

PostWritePullPlugin is executed after successful writing PULL packet.

type PostWritePushPlugin

type PostWritePushPlugin interface {
	PostWritePush(WriteCtx) *Rerror
}

PostWritePushPlugin is executed after successful writing PUSH packet.

type PostWriteReplyPlugin

type PostWriteReplyPlugin interface {
	PostWriteReply(WriteCtx) *Rerror
}

PostWriteReplyPlugin is executed after successful writing REPLY packet.

type PreCtx

type PreCtx interface {
	// Peer returns the peer.
	Peer() Peer
	// Session returns the session.
	Session() Session
	// Ip returns the remote addr.
	Ip() string
	// RealIp returns the the current real remote addr.
	RealIp() string
	// Swap returns custom data swap of context.
	Swap() goutil.Map
	// Context carries a deadline, a cancelation signal, and other values across
	// API boundaries.
	Context() context.Context
}

PreCtx context method set used before reading packet header.

type PreNewPeerPlugin

type PreNewPeerPlugin interface {
	PreNewPeer(*PeerConfig, *PluginContainer) error
}

PreNewPeerPlugin is executed before creating peer.

type PreReadHeaderPlugin

type PreReadHeaderPlugin interface {
	PreReadHeader(PreCtx) error
}

PreReadHeaderPlugin is executed before reading packet header.

type PreReadPullBodyPlugin

type PreReadPullBodyPlugin interface {
	PreReadPullBody(ReadCtx) *Rerror
}

PreReadPullBodyPlugin is executed before reading PULL packet body.

type PreReadPushBodyPlugin

type PreReadPushBodyPlugin interface {
	PreReadPushBody(ReadCtx) *Rerror
}

PreReadPushBodyPlugin is executed before reading PUSH packet body.

type PreReadReplyBodyPlugin

type PreReadReplyBodyPlugin interface {
	PreReadReplyBody(ReadCtx) *Rerror
}

PreReadReplyBodyPlugin is executed before reading REPLY packet body.

type PreSession

type PreSession interface {
	// Peer returns the peer.
	Peer() Peer
	// LocalAddr returns the local network address.
	LocalAddr() net.Addr
	// RemoteAddr returns the remote network address.
	RemoteAddr() net.Addr
	// Swap returns custom data swap of the session(socket).
	Swap() goutil.Map
	// SetId sets the session id.
	SetId(newId string)
	// ControlFD invokes f on the underlying connection's file
	// descriptor or handle.
	// The file descriptor fd is guaranteed to remain valid while
	// f executes but not after f returns.
	ControlFD(f func(fd uintptr)) error
	// ModifySocket modifies the socket.
	// Note:
	// The connection fd is not allowed to change!
	// Inherit the previous session id and custom data swap;
	// If modifiedConn!=nil, reset the net.Conn of the socket;
	// If newProtoFunc!=nil, reset the socket.ProtoFunc of the socket.
	ModifySocket(fn func(conn net.Conn) (modifiedConn net.Conn, newProtoFunc socket.ProtoFunc))
	// GetProtoFunc returns the socket.ProtoFunc
	GetProtoFunc() socket.ProtoFunc
	// Send sends packet to peer, before the formal connection.
	// Note:
	// the external setting seq is invalid, the internal will be forced to set;
	// does not support automatic redial after disconnection.
	Send(uri string, body interface{}, rerr *Rerror, setting ...socket.PacketSetting) *Rerror
	// Receive receives a packet from peer, before the formal connection.
	// Note: does not support automatic redial after disconnection.
	Receive(socket.NewBodyFunc, ...socket.PacketSetting) (*socket.Packet, *Rerror)
	// SessionAge returns the session max age.
	SessionAge() time.Duration
	// ContextAge returns PULL or PUSH context max age.
	ContextAge() time.Duration
	// SetSessionAge sets the session max age.
	SetSessionAge(duration time.Duration)
	// SetContextAge sets PULL or PUSH context max age.
	SetContextAge(duration time.Duration)
}

PreSession a connection session that has not started reading goroutine.

type PreWritePullPlugin

type PreWritePullPlugin interface {
	PreWritePull(WriteCtx) *Rerror
}

PreWritePullPlugin is executed before writing PULL packet.

type PreWritePushPlugin

type PreWritePushPlugin interface {
	PreWritePush(WriteCtx) *Rerror
}

PreWritePushPlugin is executed before writing PUSH packet.

type PreWriteReplyPlugin

type PreWriteReplyPlugin interface {
	PreWriteReply(WriteCtx) *Rerror
}

PreWriteReplyPlugin is executed before writing REPLY packet.

type PullCmd

type PullCmd interface {
	// Context carries a deadline, a cancelation signal, and other values across
	// API boundaries.
	Context() context.Context
	// Output returns writed packet.
	Output() *socket.Packet
	// Rerror returns the pull error.
	Rerror() *Rerror
	// Done returns the chan that indicates whether it has been completed.
	Done() <-chan struct{}
	// Result returns the pull result.
	// Notes:
	//  Inside, <-Done() is automatically called and blocked,
	//  until the pull is completed!
	Result() (interface{}, *Rerror)
	// InputBodyCodec gets the body codec type of the input packet.
	// Notes:
	//  Inside, <-Done() is automatically called and blocked,
	//  until the pull is completed!
	InputBodyCodec() byte
	// InputMeta returns the header metadata of input packet.
	// Notes:
	//  Inside, <-Done() is automatically called and blocked,
	//  until the pull is completed!
	InputMeta() *utils.Args
	// CostTime returns the pulled cost time.
	// If PeerConfig.CountTime=false, always returns 0.
	// Notes:
	//  Inside, <-Done() is automatically called and blocked,
	//  until the pull is completed!
	CostTime() time.Duration
}

PullCmd the command of the pulling operation's response.

func NewFakePullCmd

func NewFakePullCmd(uri string, args, reply interface{}, rerr *Rerror) PullCmd

NewFakePullCmd creates a fake PullCmd.

type PullCtx

type PullCtx interface {

	// Input returns readed packet.
	Input() *socket.Packet
	// GetBodyCodec gets the body codec type of the input packet.
	GetBodyCodec() byte
	// Output returns writed packet.
	Output() *socket.Packet
	// SetBodyCodec sets the body codec for reply packet.
	SetBodyCodec(byte)
	// AddMeta adds the header metadata 'key=value' for reply packet.
	// Multiple values for the same key may be added.
	AddMeta(key, value string)
	// SetMeta sets the header metadata 'key=value' for reply packet.
	SetMeta(key, value string)
	// AddXferPipe appends transfer filter pipe of reply packet.
	AddXferPipe(filterId ...byte)
	// contains filtered or unexported methods
}

PullCtx context method set for handling the pulled packet. For example:

type HomePull struct{ PullCtx }

type PushCtx

type PushCtx interface {

	// GetBodyCodec gets the body codec type of the input packet.
	GetBodyCodec() byte
	// contains filtered or unexported methods
}

PushCtx context method set for handling the pushed packet. For example:

type HomePush struct{ PushCtx }

type ReadCtx

type ReadCtx interface {

	// Input returns readed packet.
	Input() *socket.Packet
	// Rerror returns the handle error.
	Rerror() *Rerror
	// contains filtered or unexported methods
}

ReadCtx context method set for reading packet.

type Rerror

type Rerror struct {
	// Code error code
	Code int32
	// Message error message to the user (optional)
	Message string
	// Detail error's detailed reason (optional)
	Detail string
}

Rerror error only for reply packet

func NewRerror

func NewRerror(code int32, message, detail string) *Rerror

NewRerror creates a *Rerror.

func NewRerrorFromMeta

func NewRerrorFromMeta(meta *utils.Args) *Rerror

NewRerrorFromMeta creates a *Rerror from 'X-Reply-Error' metadata. Return nil if there is no 'X-Reply-Error' in metadata.

func ToRerror

func ToRerror(err error) *Rerror

ToRerror converts error to *Rerror

func (Rerror) Copy

func (r Rerror) Copy() *Rerror

Copy returns the copy of Rerror

func (*Rerror) MarshalJSON

func (r *Rerror) MarshalJSON() ([]byte, error)

MarshalJSON marshals Rerror into JSON, implements json.Marshaler interface.

func (*Rerror) SetDetail

func (r *Rerror) SetDetail(detail string) *Rerror

SetDetail sets the detail field.

func (*Rerror) SetMessage

func (r *Rerror) SetMessage(message string) *Rerror

SetMessage sets the message field.

func (*Rerror) SetToMeta

func (r *Rerror) SetToMeta(meta *utils.Args)

SetToMeta sets self to 'X-Reply-Error' metadata.

func (*Rerror) String

func (r *Rerror) String() string

String prints error info.

func (*Rerror) ToError

func (r *Rerror) ToError() error

ToError converts to error

func (*Rerror) UnmarshalJSON

func (r *Rerror) UnmarshalJSON(b []byte) error

UnmarshalJSON unmarshals a JSON description of self.

type Router

type Router struct {
	// contains filtered or unexported fields
}

Router the router of pull or push handlers.

func (*Router) RoutePull

func (r *Router) RoutePull(pullCtrlStruct interface{}, plugin ...Plugin) []string

RoutePull registers PULL handlers, and returns the paths.

func (*Router) RoutePullFunc

func (r *Router) RoutePullFunc(pullHandleFunc interface{}, plugin ...Plugin) string

RoutePullFunc registers PULL handler, and returns the path.

func (*Router) RoutePush

func (r *Router) RoutePush(pushCtrlStruct interface{}, plugin ...Plugin) []string

RoutePush registers PUSH handlers, and returns the paths.

func (*Router) RoutePushFunc

func (r *Router) RoutePushFunc(pushHandleFunc interface{}, plugin ...Plugin) string

RoutePushFunc registers PUSH handler, and returns the path.

func (*Router) SetUnknownPull

func (r *Router) SetUnknownPull(fn func(UnknownPullCtx) (interface{}, *Rerror), plugin ...Plugin)

SetUnknownPull sets the default handler, which is called when no handler for PULL is found.

func (*Router) SetUnknownPush

func (r *Router) SetUnknownPush(fn func(UnknownPushCtx) *Rerror, plugin ...Plugin)

SetUnknownPush sets the default handler, which is called when no handler for PUSH is found.

func (*Router) SubRoute

func (r *Router) SubRoute(pathPrefix string, plugin ...Plugin) *SubRouter

SubRoute adds handler group.

type Session

type Session interface {
	BaseSession
	// SetId sets the session id.
	SetId(newId string)
	// Close closes the session.
	Close() error
	// Health checks if the session is usable.
	Health() bool
	// AsyncPull sends a packet and receives reply asynchronously.
	// If the args is []byte or *[]byte type, it can automatically fill in the body codec name.
	AsyncPull(
		uri string,
		args interface{},
		reply interface{},
		pullCmdChan chan<- PullCmd,
		setting ...socket.PacketSetting,
	) PullCmd
	// Pull sends a packet and receives reply.
	// Note:
	// If the args is []byte or *[]byte type, it can automatically fill in the body codec name;
	// If the session is a client role and PeerConfig.RedialTimes>0, it is automatically re-called once after a failure.
	Pull(uri string, args interface{}, reply interface{}, setting ...socket.PacketSetting) PullCmd
	// Push sends a packet, but do not receives reply.
	// Note:
	// If the args is []byte or *[]byte type, it can automatically fill in the body codec name;
	// If the session is a client role and PeerConfig.RedialTimes>0, it is automatically re-called once after a failure.
	Push(uri string, args interface{}, setting ...socket.PacketSetting) *Rerror
	// SessionAge returns the session max age.
	SessionAge() time.Duration
	// ContextAge returns PULL or PUSH context max age.
	ContextAge() time.Duration
}

Session a connection session.

type SessionHub

type SessionHub struct {
	// contains filtered or unexported fields
}

SessionHub sessions hub

func (*SessionHub) Delete

func (sh *SessionHub) Delete(id string)

Delete deletes the *session for a id.

func (*SessionHub) Get

func (sh *SessionHub) Get(id string) (*session, bool)

Get gets *session by id. If second returned arg is false, mean the *session is not found.

func (*SessionHub) Len

func (sh *SessionHub) Len() int

Len returns the length of the session hub. Note: the count implemented using sync.Map may be inaccurate.

func (*SessionHub) Random

func (sh *SessionHub) Random() (*session, bool)

Random gets a *session randomly. If third returned arg is false, mean no *session is exist.

func (*SessionHub) Range

func (sh *SessionHub) Range(fn func(*session) bool)

Range calls f sequentially for each id and *session present in the session hub. If fn returns false, stop traversing.

func (*SessionHub) Set

func (sh *SessionHub) Set(sess *session)

Set sets a *session.

type SubRouter

type SubRouter struct {
	// contains filtered or unexported fields
}

SubRouter without the SetUnknownPull and SetUnknownPush methods

func (*SubRouter) Root

func (r *SubRouter) Root() *Router

Root returns the root router.

func (*SubRouter) RoutePull

func (r *SubRouter) RoutePull(pullCtrlStruct interface{}, plugin ...Plugin) []string

RoutePull registers PULL handlers, and returns the paths.

func (*SubRouter) RoutePullFunc

func (r *SubRouter) RoutePullFunc(pullHandleFunc interface{}, plugin ...Plugin) string

RoutePullFunc registers PULL handler, and returns the path.

func (*SubRouter) RoutePush

func (r *SubRouter) RoutePush(pushCtrlStruct interface{}, plugin ...Plugin) []string

RoutePush registers PUSH handlers, and returns the paths.

func (*SubRouter) RoutePushFunc

func (r *SubRouter) RoutePushFunc(pushHandleFunc interface{}, plugin ...Plugin) string

RoutePushFunc registers PUSH handler, and returns the path.

func (*SubRouter) SubRoute

func (r *SubRouter) SubRoute(pathPrefix string, plugin ...Plugin) *SubRouter

SubRoute adds handler group.

func (*SubRouter) ToRouter

func (r *SubRouter) ToRouter() *Router

ToRouter converts to the router which is added the SetUnknownPull and SetUnknownPush methods.

type UnknownPullCtx

type UnknownPullCtx interface {

	// GetBodyCodec gets the body codec type of the input packet.
	GetBodyCodec() byte
	// InputBodyBytes if the input body binder is []byte type, returns it, else returns nil.
	InputBodyBytes() []byte
	// Bind when the raw body binder is []byte type, now binds the input body to v.
	Bind(v interface{}) (bodyCodec byte, err error)
	// SetBodyCodec sets the body codec for reply packet.
	SetBodyCodec(byte)
	// AddMeta adds the header metadata 'key=value' for reply packet.
	// Multiple values for the same key may be added.
	AddMeta(key, value string)
	// SetMeta sets the header metadata 'key=value' for reply packet.
	SetMeta(key, value string)
	// AddXferPipe appends transfer filter pipe of reply packet.
	AddXferPipe(filterId ...byte)
	// contains filtered or unexported methods
}

UnknownPullCtx context method set for handling the unknown pulled packet.

type UnknownPushCtx

type UnknownPushCtx interface {

	// GetBodyCodec gets the body codec type of the input packet.
	GetBodyCodec() byte
	// InputBodyBytes if the input body binder is []byte type, returns it, else returns nil.
	InputBodyBytes() []byte
	// Bind when the raw body binder is []byte type, now binds the input body to v.
	Bind(v interface{}) (bodyCodec byte, err error)
	// contains filtered or unexported methods
}

UnknownPushCtx context method set for handling the unknown pushed packet.

type WriteCtx

type WriteCtx interface {
	PreCtx
	// Output returns writed packet.
	Output() *socket.Packet
	// Rerror returns the handle error.
	Rerror() *Rerror
}

WriteCtx context method set for writing packet.

Directories

Path Synopsis
Package codec is the body's codec set.
Package codec is the body's codec set.
examples
ab
age
Package socket provides a concise, powerful and high-performance TCP.
Package socket provides a concise, powerful and high-performance TCP.
example/pb
Package pb is a generated protocol buffer package.
Package pb is a generated protocol buffer package.
Package xfer is transfer filter set.
Package xfer is transfer filter set.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL