tp

Version: v4.1.0+incompatible
Published: Sep 12, 2018 License: Apache-2.0 Imports: 33 Imported by: 0

README

Teleport

Teleport is a versatile, high-performance and flexible socket framework.

It can be used for peer-to-peer communication, RPC, gateways, micro services, push services, game services and so on.


Benchmark

Test Case

  • A server and a client process, running on the same machine
  • CPU: Intel Xeon E312xx (Sandy Bridge) 16 cores 2.53GHz
  • Memory: 16G
  • OS: Linux 2.6.32-696.16.1.el6.centos.plus.x86_64, CentOS 6.4
  • Go: 1.9.2
  • Message size: 581 bytes
  • Message codec: protobuf
  • Total messages sent: 1,000,000

Test Results

  • teleport

    client concurrency  mean(ms)  median(ms)  max(ms)  min(ms)  throughput(TPS)
    100                 1         0           16       0        75505
    500                 9         11          97       0        52192
    1000                19        24          187      0        50040
    2000                39        54          409      0        42551
    5000                96        128         1148     0        46367

  • teleport/socket

    client concurrency  mean(ms)  median(ms)  max(ms)  min(ms)  throughput(TPS)
    100                 0         0           14       0        225682
    500                 2         1           24       0        212630
    1000                4         3           51       0        180733
    2000                8         6           64       0        183351
    5000                21        18          651      0        133886

  • Profile torch graph of teleport/socket (figure: tp_socket_profile_torch)

  • Heap torch graph of teleport/socket (figure: tp_socket_heap_torch)

Version

version  status   branch
v4       release  v4
v3       release  v3
v2       release  v2
v1       release  v1

Install

go get -u -f github.com/henrylee2cn/teleport

Feature

  • Server and client are peers and share the same API methods
  • Supports custom communication protocols
  • Supports setting the size of the socket I/O buffer
  • A message consists of two parts: Header and Body
  • The message Header carries metadata in the same format as an HTTP header
  • Supports customizing the Body codec separately, e.g. JSON, Protobuf, string
  • Supports push, call, reply and other means of communication
  • Supports a plug-in mechanism for custom authentication, heartbeat, micro service registration center, statistics, etc.
  • Both server and client peers support graceful reboot and shutdown
  • Supports reverse proxy
  • Detailed log information; supports printing input and output details
  • Supports setting a slow operation alarm threshold
  • Uses I/O multiplexing technology
  • Supports limiting the size of a message being read (the connection is dropped if the limit is exceeded)
  • Provides a context for each handler
  • Client sessions support automatic redial after disconnection
  • Supported networks: tcp, tcp4, tcp6, unix, unixpacket and so on
  • Provides an operating interface to control the connection file descriptor

Example

server.go
package main

import (
    "fmt"
    "time"

    tp "github.com/henrylee2cn/teleport"
)

func main() {
    srv := tp.NewPeer(tp.PeerConfig{
        CountTime:  true,
        ListenPort: 9090,
    })
    srv.RouteCall(new(math))
    srv.ListenAndServe()
}

type math struct {
    tp.CallCtx
}

func (m *math) Add(arg *[]int) (int, *tp.Rerror) {
    if m.Query().Get("push_status") == "yes" {
        m.Session().Push(
            "/push/status",
            fmt.Sprintf("%d numbers are being added...", len(*arg)),
        )
        time.Sleep(time.Millisecond * 10)
    }
    var r int
    for _, a := range *arg {
        r += a
    }
    return r, nil
}
client.go
package main

import (
    tp "github.com/henrylee2cn/teleport"
)

func main() {
    tp.SetLoggerLevel("ERROR")
    cli := tp.NewPeer(tp.PeerConfig{})
    defer cli.Close()
    cli.RoutePush(new(push))
    sess, err := cli.Dial(":9090")
    if err != nil {
        tp.Fatalf("%v", err)
    }

    var result int
    rerr := sess.Call("/math/add?push_status=yes",
        []int{1, 2, 3, 4, 5},
        &result,
    ).Rerror()

    if rerr != nil {
        tp.Fatalf("%v", rerr)
    }
    tp.Printf("result: %d", result)
}

type push struct {
    tp.PushCtx
}

func (p *push) Status(arg *string) *tp.Rerror {
    tp.Printf("server status: %s", *arg)
    return nil
}

More Examples

Design

Keywords
  • Peer: A communication instance, which may be a server or a client
  • Socket: Based on net.Conn; adds a custom packet protocol, transfer pipelines and other functions
  • Message: The data structure corresponding to the content of a data packet
  • Proto: The protocol interface for packing and unpacking messages
  • Codec: The serialization interface for Message.Body
  • XferPipe: The encoding pipeline for message bytes, such as compression, encryption, verification and so on
  • XferFilter: An interface that processes message data before transfer
  • Plugin: Plugins that cover all aspects of communication
  • Session: A connection session, with push, call, reply, close and other operations
  • Context: Handles the received or sent messages
  • Call-Launch: Requests data from the peer
  • Call-Handle: Handles and replies to a call from the peer
  • Push-Launch: Pushes data to the peer
  • Push-Handle: Handles a push from the peer
  • Router: Routes a request to its handler based on request information (such as a URI)
Message

The contents of each message:

// in .../teleport/socket package

// Message a socket message data.
type Message struct {
    // Has unexported fields.
}

func GetMessage(settings ...MessageSetting) *Message
func NewMessage(settings ...MessageSetting) *Message
func (m *Message) Body() interface{}
func (m *Message) BodyCodec() byte
func (m *Message) Context() context.Context
func (m *Message) MarshalBody() ([]byte, error)
func (m *Message) Meta() *utils.Args
func (m *Message) Mtype() byte
func (m *Message) Reset(settings ...MessageSetting)
func (m *Message) Seq() string
func (m *Message) SetBody(body interface{})
func (m *Message) SetBodyCodec(bodyCodec byte)
func (m *Message) SetNewBody(newBodyFunc NewBodyFunc)
func (m *Message) SetMtype(mtype byte)
func (m *Message) SetSeq(seq string)
func (m *Message) SetSize(size uint32) error
func (m *Message) SetUri(uri string)
func (m *Message) SetUriObject(uriObject *url.URL)
func (m *Message) Size() uint32
func (m *Message) String() string
func (m *Message) UnmarshalBody(bodyBytes []byte) error
func (m *Message) Uri() string
func (m *Message) UriObject() *url.URL
func (m *Message) XferPipe() *xfer.XferPipe

// NewBodyFunc creates a new body by header.
type NewBodyFunc func(Header) interface{}
Codec

The body's codec set.

type Codec interface {
    // Id returns codec id.
    Id() byte
    // Name returns codec name.
    Name() string
    // Marshal returns the encoding of v.
    Marshal(v interface{}) ([]byte, error)
    // Unmarshal parses the encoded data and stores the result
    // in the value pointed to by v.
    Unmarshal(data []byte, v interface{}) error
}
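
As a rough illustration of the Codec interface above, the following sketch implements it with the standard library's encoding/json. The type demoJSONCodec, its id 'j', its name and the helper roundTrip are made up for this example and are not taken from the teleport codec package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Codec repeats the interface shown above so the sketch is self-contained.
type Codec interface {
	Id() byte
	Name() string
	Marshal(v interface{}) ([]byte, error)
	Unmarshal(data []byte, v interface{}) error
}

// demoJSONCodec is a hypothetical codec; its id and name are placeholders.
type demoJSONCodec struct{}

func (demoJSONCodec) Id() byte     { return 'j' }
func (demoJSONCodec) Name() string { return "demo_json" }

// Marshal encodes v as JSON.
func (demoJSONCodec) Marshal(v interface{}) ([]byte, error) {
	return json.Marshal(v)
}

// Unmarshal decodes JSON data into the value pointed to by v.
func (demoJSONCodec) Unmarshal(data []byte, v interface{}) error {
	return json.Unmarshal(data, v)
}

// roundTrip encodes nums with c and decodes the bytes into a new slice.
func roundTrip(c Codec, nums []int) ([]int, error) {
	data, err := c.Marshal(nums)
	if err != nil {
		return nil, err
	}
	var out []int
	if err := c.Unmarshal(data, &out); err != nil {
		return nil, err
	}
	return out, nil
}

func main() {
	out, err := roundTrip(demoJSONCodec{}, []int{1, 2, 3})
	fmt.Println(out, err)
}
```

A body codec only has to turn a value into bytes and back, so any serializer with this shape could in principle be plugged in the same way.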
XferPipe

The transfer filter pipe processes the byte stream of a message during transfer.

// XferFilter handles byte stream of message when transfer.
type XferFilter interface {
    // Id returns transfer filter id.
    Id() byte
    // Name returns transfer filter name.
    Name() string
    // OnPack performs filtering on packing.
    OnPack([]byte) ([]byte, error)
    // OnUnpack performs filtering on unpacking.
    OnUnpack([]byte) ([]byte, error)
}
// Get returns transfer filter by id.
func Get(id byte) (XferFilter, error)
// GetByName returns transfer filter by name.
func GetByName(name string) (XferFilter, error)

// XferPipe transfer filter pipe, handlers from outer-most to inner-most.
// Note: the length can not be bigger than 255!
type XferPipe struct {
    // Has unexported fields.
}
func NewXferPipe() *XferPipe
func (x *XferPipe) Append(filterId ...byte) error
func (x *XferPipe) AppendFrom(src *XferPipe)
func (x *XferPipe) Ids() []byte
func (x *XferPipe) Len() int
func (x *XferPipe) Names() []string
func (x *XferPipe) OnPack(data []byte) ([]byte, error)
func (x *XferPipe) OnUnpack(data []byte) ([]byte, error)
func (x *XferPipe) Range(callback func(idx int, filter XferFilter) bool)
func (x *XferPipe) Reset()
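
To make the XferFilter contract more concrete, here is a minimal sketch of a filter that compresses on pack and decompresses on unpack, using only the standard library. The type demoGzip, its id and name, and the helper roundTrip are assumptions for this example; this is not the code of the teleport xfer/gzip package.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// XferFilter repeats the interface shown above so the sketch is self-contained.
type XferFilter interface {
	Id() byte
	Name() string
	OnPack([]byte) ([]byte, error)
	OnUnpack([]byte) ([]byte, error)
}

// demoGzip is a hypothetical filter; its id and name are placeholders.
type demoGzip struct{}

func (demoGzip) Id() byte     { return 'g' }
func (demoGzip) Name() string { return "demo_gzip" }

// OnPack compresses the outgoing bytes.
func (demoGzip) OnPack(src []byte) ([]byte, error) {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(src); err != nil {
		return nil, err
	}
	if err := zw.Close(); err != nil { // Close flushes the gzip trailer
		return nil, err
	}
	return buf.Bytes(), nil
}

// OnUnpack decompresses the incoming bytes.
func (demoGzip) OnUnpack(src []byte) ([]byte, error) {
	zr, err := gzip.NewReader(bytes.NewReader(src))
	if err != nil {
		return nil, err
	}
	defer zr.Close()
	return io.ReadAll(zr)
}

// roundTrip packs data with f and unpacks the result again.
func roundTrip(f XferFilter, data []byte) ([]byte, error) {
	packed, err := f.OnPack(data)
	if err != nil {
		return nil, err
	}
	return f.OnUnpack(packed)
}

func main() {
	out, err := roundTrip(demoGzip{}, []byte("hello hello hello"))
	fmt.Printf("%s %v\n", out, err)
}
```

The property that matters for a filter is that OnUnpack inverts OnPack, which is what roundTrip checks.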
Plugin

Plug-ins executed at runtime.

type (
    // Plugin is the base interface that every plugin implements.
    Plugin interface {
        Name() string
    }
    // PreNewPeerPlugin is executed before creating peer.
    PreNewPeerPlugin interface {
        Plugin
        PreNewPeer(*PeerConfig, *PluginContainer) error
    }
    ...
)
Protocol

You can customize your own communication protocol by implementing the interface:

type (
    // Proto pack/unpack protocol scheme of socket message.
    Proto interface {
        // Version returns the protocol's id and name.
        Version() (byte, string)
        // Pack writes the Message into the connection.
        // Note: Make sure to write only once, or the packets will be corrupted!
        Pack(*Message) error
        // Unpack reads bytes from the connection to the Message.
        // Note: Not safe for concurrent use!
        Unpack(*Message) error
    }
    ProtoFunc func(io.ReadWriter) Proto
)

Next, you can specify the communication protocol in the following ways:

func SetDefaultProtoFunc(ProtoFunc)
type Peer interface {
    ...
    ServeConn(conn net.Conn, protoFunc ...ProtoFunc) Session
    DialContext(ctx context.Context, addr string, protoFunc ...ProtoFunc) (Session, *Rerror)
    Dial(addr string, protoFunc ...ProtoFunc) (Session, *Rerror)
    Listen(protoFunc ...ProtoFunc) error
    ...
}

Default protocol RawProto (big endian):

{4 bytes message length}
{1 byte protocol version}
{1 byte transfer pipe length}
{transfer pipe IDs}
# The following data is processed by the transfer pipe
{4 bytes sequence length}
{sequence}
{1 byte message type} // e.g. CALL:1; REPLY:2; PUSH:3
{4 bytes URI length}
{URI}
{4 bytes metadata length}
{metadata(urlencoded)}
{1 byte body codec id}
{body}
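
The layout above can be sketched with encoding/binary as follows. This is only an illustration of the field order, not the rawproto implementation: the frame type and the helpers putChunk and pack are hypothetical, the transfer pipe is left empty, the version and codec values are placeholders, and the sketch assumes that the 4-byte message length counts the whole frame including itself, which the layout does not state.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// frame holds the fields named in the layout above (hypothetical type).
type frame struct {
	protoVersion byte
	seq          string
	mtype        byte // 1 = CALL, 2 = REPLY, 3 = PUSH
	uri          string
	meta         string // metadata, already URL-encoded
	bodyCodec    byte
	body         []byte
}

// putChunk writes a 4-byte big-endian length followed by the data itself.
func putChunk(buf *bytes.Buffer, data []byte) {
	var n [4]byte
	binary.BigEndian.PutUint32(n[:], uint32(len(data)))
	buf.Write(n[:])
	buf.Write(data)
}

// pack serializes f with an empty transfer pipe.
func pack(f frame) []byte {
	// Section that a transfer pipe would process; no filter is applied here.
	var payload bytes.Buffer
	putChunk(&payload, []byte(f.seq))
	payload.WriteByte(f.mtype)
	putChunk(&payload, []byte(f.uri))
	putChunk(&payload, []byte(f.meta))
	payload.WriteByte(f.bodyCodec)
	payload.Write(f.body)

	// Assumption: the length field counts the whole frame, itself included.
	total := 4 + 1 + 1 + payload.Len()
	var out bytes.Buffer
	var n [4]byte
	binary.BigEndian.PutUint32(n[:], uint32(total))
	out.Write(n[:])
	out.WriteByte(f.protoVersion)
	out.WriteByte(0) // transfer pipe length: no filter IDs follow
	out.Write(payload.Bytes())
	return out.Bytes()
}

func main() {
	b := pack(frame{
		protoVersion: 1, // placeholder value
		seq:          "1",
		mtype:        1,
		uri:          "/math/add",
		bodyCodec:    'j', // placeholder codec id
		body:         []byte("[1,2,3]"),
	})
	fmt.Printf("%d bytes: % x\n", len(b), b)
}
```

Every variable-length field is length-prefixed except the body, whose size follows from the total message length.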

Usage

Peer(server or client) Demo
// Start a server
var peer1 = tp.NewPeer(tp.PeerConfig{
    ListenPort: 9090, // for server role
})
peer1.Listen()

...

// Start a client
var peer2 = tp.NewPeer(tp.PeerConfig{})
var sess, err = peer2.Dial("127.0.0.1:9090")
Call-Controller-Struct API template
type Aaa struct {
    tp.CallCtx
}
func (x *Aaa) XxZz(arg *<T>) (<T>, *tp.Rerror) {
    ...
    return r, nil
}
  • register it to root router:
// register the call route: /aaa/xx_zz
peer.RouteCall(new(Aaa))

// or register the call route: /xx_zz
peer.RouteCallFunc((*Aaa).XxZz)
Call-Handler-Function API template
func XxZz(ctx tp.CallCtx, arg *<T>) (<T>, *tp.Rerror) {
    ...
    return r, nil
}
  • register it to root router:
// register the call route: /xx_zz
peer.RouteCallFunc(XxZz)
Push-Controller-Struct API template
type Bbb struct {
    tp.PushCtx
}
func (b *Bbb) YyZz(arg *<T>) *tp.Rerror {
    ...
    return nil
}
  • register it to root router:
// register the push route: /bbb/yy_zz
peer.RoutePush(new(Bbb))

// or register the push route: /yy_zz
peer.RoutePushFunc((*Bbb).YyZz)
Push-Handler-Function API template
// YyZz register the route: /yy_zz
func YyZz(ctx tp.PushCtx, arg *<T>) *tp.Rerror {
    ...
    return nil
}
  • register it to root router:
// register the push route: /yy_zz
peer.RoutePushFunc(YyZz)
Unknown-Call-Handler-Function API template
func XxxUnknownCall(ctx tp.UnknownCallCtx) (interface{}, *tp.Rerror) {
    ...
    return r, nil
}
  • register it to root router:
// register the unknown call route: /*
peer.SetUnknownCall(XxxUnknownCall)
Unknown-Push-Handler-Function API template
func XxxUnknownPush(ctx tp.UnknownPushCtx) *tp.Rerror {
    ...
    return nil
}
  • register it to root router:
// register the unknown push route: /*
peer.SetUnknownPush(XxxUnknownPush)
The mapping rule of struct(func) name to URI path:
  • AaBb -> /aa_bb
  • Aa_Bb -> /aa/bb
  • aa_bb -> /aa/bb
  • Aa__Bb -> /aa_bb
  • aa__bb -> /aa_bb
  • ABC_XYZ -> /abc/xyz
  • ABcXYz -> /abc_xyz
  • ABC__XYZ -> /abc_xyz
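
The mapping rules listed above can be reproduced with a small standalone function. The sketch below is inferred from those eight examples only and is not the library's ToUriPath implementation; the names snake and toURIPath are hypothetical, and inputs outside the listed patterns may be handled differently by the real function.

```go
package main

import (
	"fmt"
	"strings"
)

// snake lower-cases s and inserts '_' before an upper-case letter
// whenever a non-upper-case character precedes the current upper-case run.
func snake(s string) string {
	var b strings.Builder
	seenLower := false
	for i := 0; i < len(s); i++ {
		c := s[i]
		if c >= 'A' && c <= 'Z' {
			if seenLower {
				b.WriteByte('_')
				seenLower = false
			}
		} else {
			seenLower = true
		}
		b.WriteByte(c)
	}
	return strings.ToLower(b.String())
}

// toURIPath maps a struct or func name to a URI path:
// a single '_' becomes '/', a double "__" becomes '_',
// and a camel-case boundary becomes '_'.
func toURIPath(name string) string {
	const marker = "\x00" // temporary stand-in for "__"
	segments := strings.Split(strings.ReplaceAll(name, "__", marker), "_")
	for i, seg := range segments {
		words := strings.Split(seg, marker)
		for j, w := range words {
			words[j] = snake(w)
		}
		segments[i] = strings.Join(words, "_")
	}
	return "/" + strings.Join(segments, "/")
}

func main() {
	for _, name := range []string{"AaBb", "Aa_Bb", "Aa__Bb", "ABC_XYZ", "ABcXYz"} {
		fmt.Println(name, "->", toURIPath(name))
	}
}
```

Replacing the double underscore with a marker first keeps it from being split as two single underscores.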
Plugin Demo
// NewIgnoreCase returns an ignoreCase plugin.
func NewIgnoreCase() *ignoreCase {
    return &ignoreCase{}
}

type ignoreCase struct{}

var (
    _ tp.PostReadCallHeaderPlugin = new(ignoreCase)
    _ tp.PostReadPushHeaderPlugin = new(ignoreCase)
)

func (i *ignoreCase) Name() string {
    return "ignoreCase"
}

func (i *ignoreCase) PostReadCallHeader(ctx tp.ReadCtx) *tp.Rerror {
    // Convert the path to lowercase on the fly
    ctx.UriObject().Path = strings.ToLower(ctx.UriObject().Path)
    return nil
}

func (i *ignoreCase) PostReadPushHeader(ctx tp.ReadCtx) *tp.Rerror {
    // Convert the path to lowercase on the fly
    ctx.UriObject().Path = strings.ToLower(ctx.UriObject().Path)
    return nil
}
Register the above handlers and plugin
// add router group
group := peer.SubRoute("test")
// register to test group
group.RouteCall(new(Aaa), NewIgnoreCase())
peer.RouteCallFunc(XxZz, NewIgnoreCase())
group.RoutePush(new(Bbb))
peer.RoutePushFunc(YyZz)
peer.SetUnknownCall(XxxUnknownCall)
peer.SetUnknownPush(XxxUnknownPush)
Config
type PeerConfig struct {
    Network            string        `yaml:"network"              ini:"network"              comment:"Network; tcp, tcp4, tcp6, unix or unixpacket"`
    LocalIP            string        `yaml:"local_ip"             ini:"local_ip"             comment:"Local IP"`
    ListenPort         uint16        `yaml:"listen_port"          ini:"listen_port"          comment:"Listen port; for server role"`
    DefaultDialTimeout time.Duration `yaml:"default_dial_timeout" ini:"default_dial_timeout" comment:"Default maximum duration for dialing; for client role; ns,µs,ms,s,m,h"`
    RedialTimes        int32         `yaml:"redial_times"         ini:"redial_times"         comment:"The maximum times of attempts to redial, after the connection has been unexpectedly broken; for client role"`
    DefaultBodyCodec   string        `yaml:"default_body_codec"   ini:"default_body_codec"   comment:"Default body codec type id"`
    DefaultSessionAge  time.Duration `yaml:"default_session_age"  ini:"default_session_age"  comment:"Default session max age, if less than or equal to 0, no time limit; ns,µs,ms,s,m,h"`
    DefaultContextAge  time.Duration `yaml:"default_context_age"  ini:"default_context_age"  comment:"Default CALL or PUSH context max age, if less than or equal to 0, no time limit; ns,µs,ms,s,m,h"`
    SlowCometDuration  time.Duration `yaml:"slow_comet_duration"  ini:"slow_comet_duration"  comment:"Slow operation alarm threshold; ns,µs,ms,s ..."`
    PrintDetail        bool          `yaml:"print_detail"         ini:"print_detail"         comment:"Is print body and metadata or not"`
    CountTime          bool          `yaml:"count_time"           ini:"count_time"           comment:"Is count cost time or not"`
}
Optimize
  • SetMessageSizeLimit sets max message size. If maxSize<=0, set it to max uint32.

    func SetMessageSizeLimit(maxMessageSize uint32)
    
  • SetSocketKeepAlive sets whether the operating system should send keepalive messages on the connection.

    func SetSocketKeepAlive(keepalive bool)
    
  • SetSocketKeepAlivePeriod sets period between keep alives.

    func SetSocketKeepAlivePeriod(d time.Duration)
    
  • SetSocketNoDelay controls whether the operating system should delay message transmission in hopes of sending fewer messages (Nagle's algorithm). The default is true (no delay), meaning that data is sent as soon as possible after a Write.

    func SetSocketNoDelay(_noDelay bool)
    
  • SetSocketReadBuffer sets the size of the operating system's receive buffer associated with the connection.

    func SetSocketReadBuffer(bytes int)
    
  • SetSocketWriteBuffer sets the size of the operating system's transmit buffer associated with the connection.

    func SetSocketWriteBuffer(bytes int)
    

Extensions

Codec

package   import                                          description
json      import "github.com/henrylee2cn/teleport/codec"  JSON codec (teleport own)
protobuf  import "github.com/henrylee2cn/teleport/codec"  Protobuf codec (teleport own)
plain     import "github.com/henrylee2cn/teleport/codec"  Plain text codec (teleport own)
form      import "github.com/henrylee2cn/teleport/codec"  Form (URL-encoded) codec (teleport own)

Plugin

package    import                                                               description
auth       import "github.com/henrylee2cn/teleport/plugin/auth"                 An auth plugin for verifying the peer on first contact
binder     import binder "github.com/henrylee2cn/teleport/plugin/binder"        Parameter binding and verification for struct handlers
heartbeat  import heartbeat "github.com/henrylee2cn/teleport/plugin/heartbeat"  A generic timing heartbeat plugin
proxy      import "github.com/henrylee2cn/teleport/plugin/proxy"                A proxy plugin for handling unknown calls or pushes
secure     import secure "github.com/henrylee2cn/teleport/plugin/secure"        Encrypting/decrypting the message body

Protocol

package    import                                                    description
rawproto   import "github.com/henrylee2cn/teleport/proto/rawproto"   A fast socket communication protocol (teleport default protocol)
jsonproto  import "github.com/henrylee2cn/teleport/proto/jsonproto"  A JSON socket communication protocol
pbproto    import "github.com/henrylee2cn/teleport/proto/pbproto"    A Protobuf socket communication protocol

Transfer-Filter

package  import                                              description
gzip     import "github.com/henrylee2cn/teleport/xfer/gzip"  Gzip (teleport own)
md5      import "github.com/henrylee2cn/teleport/xfer/md5"   Provides an integrity check transfer filter

Mixer

package      import                                                      description
multiclient  import "github.com/henrylee2cn/teleport/mixer/multiclient"  Higher throughput client connection pool when transferring large messages (such as downloading files)
websocket    import "github.com/henrylee2cn/teleport/mixer/websocket"    Makes the Teleport framework compatible with the websocket protocol as specified in RFC 6455
html         import html "github.com/xiaoenai/tp-micro/helper/mod-html"  HTML render for http client

Projects based on Teleport

project   description
TP-Micro  TP-Micro is a simple, powerful micro service framework based on Teleport
Pholcus   Pholcus is a distributed, high concurrency and powerful web crawler software

Business Users

深圳市梦之舵信息技术有限公司    平安科技
北京风行在线技术有限公司    北京可即时代网络公司

License

Teleport is released under the Apache v2 License. See the LICENSE file for the full license text.

Documentation

Overview

Package tp (teleport) is a versatile, high-performance and flexible TCP socket framework. It can be used for peer-to-peer communication, RPC, gateways, micro services, push services, game services and so on.

Copyright 2015-2018 HenryLee. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Constants

const (
	TypeUndefined byte = 0
	TypeCall      byte = 1
	TypeReply     byte = 2 // reply to call
	TypePush      byte = 3
)

Message types

const (
	CodeUnknownError        = -1
	CodeConnClosed          = 102
	CodeWriteFailed         = 104
	CodeDialFailed          = 105
	CodeBadMessage          = 400
	CodeUnauthorized        = 401
	CodeNotFound            = 404
	CodeMtypeNotAllowed     = 405
	CodeHandleTimeout       = 408
	CodeInternalServerError = 500
	CodeBadGateway          = 502
)

Internal framework Rerror codes. Note: custom codes are recommended to be greater than 1000.

unknown error code: -1.
sender peer error code range: [100,199].
message handling error code range: [400,499].
receiver peer error code range: [500,599].
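
As a small illustration of the ranges above, the helper below sorts a code into its class. The function codeClass and its return strings are hypothetical and not part of the package; only a few of the constants from the block above are repeated here.

```go
package main

import "fmt"

// A few framework codes copied from the constant block above.
const (
	CodeUnknownError        = -1
	CodeConnClosed          = 102
	CodeNotFound            = 404
	CodeInternalServerError = 500
)

// codeClass names the documented range that a reply error code falls into.
func codeClass(code int32) string {
	switch {
	case code == -1:
		return "unknown"
	case code >= 100 && code <= 199:
		return "sender peer"
	case code >= 400 && code <= 499:
		return "message handling"
	case code >= 500 && code <= 599:
		return "receiver peer"
	case code > 1000:
		return "custom" // recommended range for user-defined codes
	default:
		return "unclassified"
	}
}

func main() {
	for _, c := range []int32{CodeUnknownError, CodeConnClosed, CodeNotFound, CodeInternalServerError, 1001} {
		fmt.Println(c, codeClass(c))
	}
}
```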
const (
	// MetaRerror reply error metadata key
	MetaRerror = "X-Reply-Error"
	// MetaRealIp real IP metadata key
	MetaRealIp = "X-Real-IP"
	// MetaAcceptBodyCodec the key of body codec that the sender wishes to accept
	MetaAcceptBodyCodec = "X-Accept-Body-Codec"
)

Variables

var (
	// FirstSweep is first executed.
	// Usage: share github.com/henrylee2cn/goutil/graceful with other project.
	FirstSweep func() error
	// BeforeExiting is executed before process exiting.
	// Usage: share github.com/henrylee2cn/goutil/graceful with other project.
	BeforeExiting func() error
)
var DefaultProtoFunc = socket.DefaultProtoFunc

DefaultProtoFunc gets the default builder of socket communication protocol

func DefaultProtoFunc() tp.ProtoFunc
var ErrListenClosed = errors.New("listener is closed")

ErrListenClosed listener is closed error.

var GetMessage = socket.GetMessage

GetMessage gets a *Message from the message stack. Note:

newBodyFunc is only for reading from the connection;
settings are only for writing to the connection.
func GetMessage(settings ...MessageSetting) *Message
var GetReadLimit = socket.MessageSizeLimit

GetReadLimit gets the message size upper limit of reading.

func GetReadLimit() uint32
var PutMessage = socket.PutMessage

PutMessage puts a *Message to message stack.

func PutMessage(m *Message)
var SetDefaultProtoFunc = socket.SetDefaultProtoFunc

SetDefaultProtoFunc sets the default builder of socket communication protocol

func SetDefaultProtoFunc(protoFunc tp.ProtoFunc)

SetReadLimit sets max message size. If maxSize<=0, set it to max uint32.

func SetReadLimit(maxMessageSize uint32)
var SetSocketKeepAlive = socket.SetKeepAlive

SetSocketKeepAlive sets whether the operating system should send keepalive messages on the connection. Note: If this function has not been called, the system defaults are used.

func SetSocketKeepAlive(keepalive bool)
var SetSocketKeepAlivePeriod = socket.SetKeepAlivePeriod

SetSocketKeepAlivePeriod sets period between keep alives. Note: if d<0, don't change the value.

func SetSocketKeepAlivePeriod(d time.Duration)
var SetSocketNoDelay = socket.SetNoDelay

SetSocketNoDelay controls whether the operating system should delay packet transmission in hopes of sending fewer packets (Nagle's algorithm). The default is true (no delay), meaning that data is sent as soon as possible after a Write.

func SetSocketNoDelay(noDelay bool)
var SetSocketReadBuffer = socket.SetReadBuffer

SetSocketReadBuffer sets the size of the operating system's receive buffer associated with the connection. Note: if bytes<0, don't change the value.

func SetSocketReadBuffer(bytes int)
var SetSocketWriteBuffer = socket.SetWriteBuffer

SetSocketWriteBuffer sets the size of the operating system's transmit buffer associated with the connection. Note: if bytes<0, don't change the value.

func SetSocketWriteBuffer(bytes int)
var SocketReadBuffer = socket.ReadBuffer

SocketReadBuffer returns the size of the operating system's receive buffer associated with the connection. Note: if using the system default value, bytes=-1 and isDefault=true.

func SocketReadBuffer() (bytes int, isDefault bool)
var SocketWriteBuffer = socket.WriteBuffer

SocketWriteBuffer returns the size of the operating system's transmit buffer associated with the connection. Note: if using the system default value, bytes=-1 and isDefault=true.

func SocketWriteBuffer() (bytes int, isDefault bool)
var WithAddMeta = socket.WithAddMeta

WithAddMeta adds 'key=value' metadata argument. Multiple values for the same key may be added.

func WithAddMeta(key, value string) MessageSetting
var WithBody = socket.WithBody

WithBody sets the body object.

func WithBody(body interface{}) MessageSetting
var WithBodyCodec = socket.WithBodyCodec

WithBodyCodec sets the body codec.

func WithBodyCodec(bodyCodec byte) MessageSetting
var WithContext = socket.WithContext

WithContext sets the message handling context.

func WithContext(ctx context.Context) MessageSetting
var WithMtype = socket.WithMtype

WithMtype sets the message type.

func WithMtype(mtype byte) MessageSetting
var WithNewBody = socket.WithNewBody

WithNewBody resets the function used to get the body.

func WithNewBody(newBodyFunc socket.NewBodyFunc) MessageSetting
var WithQuery = socket.WithQuery

WithQuery sets the message URI query parameter.

func WithQuery(key, value string) MessageSetting
var WithSeq = socket.WithSeq

WithSeq sets the message sequence.

func WithSeq(seq uint64) MessageSetting
var WithSetMeta = socket.WithSetMeta

WithSetMeta sets 'key=value' metadata argument.

func WithSetMeta(key, value string) MessageSetting
var WithUri = socket.WithUri

WithUri sets the message URI string.

func WithUri(uri string) MessageSetting
var WithUriObject = socket.WithUriObject

WithUriObject sets the message URI object.

func WithUriObject(uriObject *url.URL) MessageSetting
var WithXferPipe = socket.WithXferPipe

WithXferPipe sets transfer filter pipe.

func WithXferPipe(filterId ...byte) MessageSetting

Note: WithXferPipe panics if a filterId is not registered.
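
The With* variables in this section all return a MessageSetting, which reads like the functional-options pattern. The sketch below shows that pattern with stand-in types: message, setting, newMessage and the lower-case with* helpers are hypothetical, and treating a setting as a function that mutates the message is an assumption about how MessageSetting works. It also illustrates the difference between adding and setting a metadata value.

```go
package main

import (
	"fmt"
	"net/url"
)

// message is a hypothetical stand-in for the framework's Message type.
type message struct {
	uri   string
	mtype byte
	meta  url.Values
	body  interface{}
}

// setting mutates a message; it plays the role of MessageSetting here.
type setting func(*message)

func withURI(uri string) setting         { return func(m *message) { m.uri = uri } }
func withMtype(mtype byte) setting       { return func(m *message) { m.mtype = mtype } }
func withBody(body interface{}) setting  { return func(m *message) { m.body = body } }

// withAddMeta appends a value, so a key may end up with several values.
func withAddMeta(key, value string) setting {
	return func(m *message) { m.meta.Add(key, value) }
}

// withSetMeta replaces any existing values of the key.
func withSetMeta(key, value string) setting {
	return func(m *message) { m.meta.Set(key, value) }
}

// newMessage applies the settings in the order they are passed.
func newMessage(settings ...setting) *message {
	m := &message{meta: url.Values{}}
	for _, apply := range settings {
		apply(m)
	}
	return m
}

func main() {
	m := newMessage(
		withURI("/math/add"),
		withMtype(1),
		withBody([]int{1, 2, 3}),
		withAddMeta("tag", "a"),
		withAddMeta("tag", "b"),
	)
	fmt.Println(m.uri, m.mtype, m.body, m.meta.Encode())
}
```

Because each option is just a function value, callers pass only the settings they need and new options can be added without changing the constructor signature.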

Functions

func AnywayGo

func AnywayGo(fn func())

AnywayGo is similar to the go statement, but concurrent resources are limited.

func CodeText

func CodeText(rerrCode int32) string

CodeText returns the reply error code text. If the type is undefined returns 'Unknown Error'.

func Criticalf

func Criticalf(format string, a ...interface{})

Criticalf logs a message using CRITICAL as log level.

func Debugf

func Debugf(format string, a ...interface{})

Debugf logs a message using DEBUG as log level.

func Errorf

func Errorf(format string, a ...interface{})

Errorf logs a message using ERROR as log level.

func Fatalf

func Fatalf(format string, a ...interface{})

Fatalf is equivalent to l.Criticalf followed by a call to os.Exit(1).

func GetAcceptBodyCodec

func GetAcceptBodyCodec(meta *utils.Args) (byte, bool)

GetAcceptBodyCodec gets the body codec that the sender wishes to accept. Note: If the specified codec is invalid, the receiver will ignore the metadata.

func GetLoggerLevel

func GetLoggerLevel() string

GetLoggerLevel gets the logger's level.

func Go

func Go(fn func()) bool

Go is similar to the go statement, but returns false if resources are insufficient.

func GraceSignal

func GraceSignal()

GraceSignal opens the graceful shutdown or reboot signal.

func Infof

func Infof(format string, a ...interface{})

Infof logs a message using INFO as log level.

func IsConnRerror

func IsConnRerror(rerr *Rerror) bool

IsConnRerror determines whether the error is a connection error

func NewInheritListener

func NewInheritListener(network, laddr string, tlsConfig *tls.Config) (net.Listener, error)

NewInheritListener creates a new listener that can be inherited on reboot.

func NewTlsConfigFromFile

func NewTlsConfigFromFile(tlsCertFile, tlsKeyFile string) (*tls.Config, error)

NewTlsConfigFromFile creates a new TLS config.

func Noticef

func Noticef(format string, a ...interface{})

Noticef logs a message using NOTICE as log level.

func Panicf

func Panicf(format string, a ...interface{})

Panicf is equivalent to l.Criticalf followed by a call to panic().

func Printf

func Printf(format string, a ...interface{})

Printf formats according to a format specifier and writes to standard output.

func Reboot

func Reboot(timeout ...time.Duration)

Reboot reboots all the frame processes gracefully. Note: Windows is not supported!

func SetGopool

func SetGopool(maxGoroutinesAmount int, maxGoroutineIdleDuration time.Duration)

SetGopool sets or resets the go pool config. Note: Make sure to call it before calling NewPeer() and Go().

func SetLogger

func SetLogger(logger Logger)

SetLogger sets the global logger. Note: Not safe for concurrent use!

func SetLoggerLevel

func SetLoggerLevel(level string)

SetLoggerLevel sets the logger's level.

func SetShutdown

func SetShutdown(timeout time.Duration, firstSweep, beforeExiting func() error)

SetShutdown sets the functions that are called when the process shuts down, and the time-out period for the process shutdown. If 0<=timeout<5s, 'MinShutdownTimeout' (5s) is used automatically. If timeout<0, there is no time limit. 'firstSweep' is executed first. 'beforeExiting' is executed before the process exits.
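
The timeout rule described for SetShutdown can be written down as a small function. This is a sketch of the documented rule only; effectiveTimeout and minShutdownTimeout are hypothetical names, and the package's own handling may differ in detail.

```go
package main

import (
	"fmt"
	"time"
)

// minShutdownTimeout mirrors the 5s minimum named in the description.
const minShutdownTimeout = 5 * time.Second

// effectiveTimeout returns the time-out to use and whether a limit applies.
// A negative input means "wait indefinitely"; values below the minimum
// are raised to the minimum.
func effectiveTimeout(timeout time.Duration) (d time.Duration, limited bool) {
	switch {
	case timeout < 0:
		return 0, false
	case timeout < minShutdownTimeout:
		return minShutdownTimeout, true
	default:
		return timeout, true
	}
}

func main() {
	for _, in := range []time.Duration{-1, 0, 3 * time.Second, 30 * time.Second} {
		d, limited := effectiveTimeout(in)
		fmt.Println(in, "->", d, limited)
	}
}
```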

func Shutdown

func Shutdown(timeout ...time.Duration)

Shutdown closes all the frame processes gracefully. The timeout parameter resets the time-out period for the process shutdown.

func ToUriPath

func ToUriPath(name string) string

ToUriPath maps struct(func) name to URI path.

func Tracef

func Tracef(format string, a ...interface{})

Tracef logs a message using TRACE as log level.

func TryGo

func TryGo(fn func())

TryGo tries to execute the function via goroutine. If there are no concurrent resources, execute it synchronously.
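
The difference between Go (reports false when no resources are left) and TryGo (falls back to running synchronously) can be sketched with a buffered channel used as a semaphore. The pool type and its methods below are hypothetical and only model the documented behavior; they are not the package's goroutine pool.

```go
package main

import (
	"fmt"
	"sync"
)

// pool limits how many functions run concurrently.
type pool struct {
	slots chan struct{}
}

func newPool(max int) *pool {
	return &pool{slots: make(chan struct{}, max)}
}

// goFn starts fn in a new goroutine if a slot is free.
// It returns false, without running fn, when the pool is full.
func (p *pool) goFn(fn func()) bool {
	select {
	case p.slots <- struct{}{}:
		go func() {
			defer func() { <-p.slots }() // release the slot when fn returns
			fn()
		}()
		return true
	default:
		return false
	}
}

// tryGo runs fn in a goroutine when possible, otherwise synchronously.
func (p *pool) tryGo(fn func()) {
	if !p.goFn(fn) {
		fn()
	}
}

func main() {
	p := newPool(2)
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		p.tryGo(func() { defer wg.Done() })
	}
	wg.Wait()
	fmt.Println("all tasks finished")
}
```

The synchronous fallback applies back-pressure to the caller instead of dropping work when the pool is exhausted.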

func TypeText

func TypeText(typ byte) string

TypeText returns the message type text. If the type is undefined returns 'Undefined'.

func Warnf

func Warnf(format string, a ...interface{})

Warnf logs a message using WARNING as log level.

Types

type BasePeer

type BasePeer interface {
	// Close closes peer.
	Close() (err error)
	// CountSession returns the number of sessions.
	CountSession() int
	// GetSession gets the session by id.
	GetSession(sessionId string) (Session, bool)
	// RangeSession ranges all sessions. If fn returns false, stop traversing.
	RangeSession(fn func(sess Session) bool)
	// SetTlsConfig sets the TLS config.
	SetTlsConfig(tlsConfig *tls.Config)
	// SetTlsConfigFromFile sets the TLS config from file.
	SetTlsConfigFromFile(tlsCertFile, tlsKeyFile string) error
	// TlsConfig returns the TLS config.
	TlsConfig() *tls.Config
	// PluginContainer returns the global plugin container.
	PluginContainer() *PluginContainer
}

BasePeer peer with the common method set

type BaseSession

type BaseSession interface {
	// Id returns the session id.
	Id() string
	// Peer returns the peer.
	Peer() Peer
	// LocalAddr returns the local network address.
	LocalAddr() net.Addr
	// RemoteAddr returns the remote network address.
	RemoteAddr() net.Addr
	// Swap returns custom data swap of the session(socket).
	Swap() goutil.Map
}

BaseSession a connection session with the common method set.

type Body

type Body = socket.Body

Body message body interface

type CallCmd

type CallCmd interface {
	// TracePeer trace back the peer.
	TracePeer() (peer Peer, found bool)
	// TraceSession trace back the session.
	TraceSession() (sess Session, found bool)
	// Context carries a deadline, a cancelation signal, and other values across
	// API boundaries.
	Context() context.Context
	// Output returns the written message.
	Output() *Message
	// Rerror returns the call error.
	Rerror() *Rerror
	// Done returns the chan that indicates whether it has been completed.
	Done() <-chan struct{}
	// Reply returns the call reply.
	// Notes:
	//  Inside, <-Done() is automatically called and blocked,
	//  until the call is completed!
	Reply() (interface{}, *Rerror)
	// InputBodyCodec gets the body codec type of the input message.
	// Notes:
	//  Inside, <-Done() is automatically called and blocked,
	//  until the call is completed!
	InputBodyCodec() byte
	// InputMeta returns the header metadata of input message.
	// Notes:
	//  Inside, <-Done() is automatically called and blocked,
	//  until the call is completed!
	InputMeta() *utils.Args
	// CostTime returns the called cost time.
	// If PeerConfig.CountTime=false, always returns 0.
	// Notes:
	//  Inside, <-Done() is automatically called and blocked,
	//  until the call is completed!
	CostTime() time.Duration
}

CallCmd is the command of the calling operation's response.
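
Several CallCmd methods are documented as blocking on <-Done() until the call completes. A minimal sketch of that pattern, with stand-in types, could look like this. The names asyncCall and startCall are hypothetical, and a plain error is used in place of *Rerror.

```go
package main

import "fmt"

// asyncCall is a hypothetical stand-in for a CallCmd.
type asyncCall struct {
	done   chan struct{}
	result interface{}
	err    error
}

// startCall runs work in the background and returns immediately.
func startCall(work func() (interface{}, error)) *asyncCall {
	c := &asyncCall{done: make(chan struct{})}
	go func() {
		c.result, c.err = work()
		close(c.done) // store the result first, then signal completion
	}()
	return c
}

// Done returns a channel that is closed once the call has completed.
func (c *asyncCall) Done() <-chan struct{} { return c.done }

// Reply waits for completion and then returns the result.
func (c *asyncCall) Reply() (interface{}, error) {
	<-c.done
	return c.result, c.err
}

func main() {
	call := startCall(func() (interface{}, error) { return 1 + 2, nil })
	reply, err := call.Reply() // blocks until Done() is closed
	fmt.Println(reply, err)
}
```

Callers that want to wait on several calls at once can select on Done() instead of calling Reply() directly.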

func NewFakeCallCmd

func NewFakeCallCmd(uri string, arg, result interface{}, rerr *Rerror) CallCmd

NewFakeCallCmd creates a fake CallCmd.

type CallCtx

type CallCtx interface {

	// Input returns the message that was read.
	Input() *Message
	// GetBodyCodec gets the body codec type of the input message.
	GetBodyCodec() byte
	// Output returns the written message.
	Output() *Message
	// ReplyBodyCodec initializes and returns the reply message body codec id.
	ReplyBodyCodec() byte
	// SetBodyCodec sets the body codec for reply message.
	SetBodyCodec(byte)
	// AddMeta adds the header metadata 'key=value' for reply message.
	// Multiple values for the same key may be added.
	AddMeta(key, value string)
	// SetMeta sets the header metadata 'key=value' for reply message.
	SetMeta(key, value string)
	// AddXferPipe appends transfer filter pipe of reply message.
	AddXferPipe(filterId ...byte)
	// contains filtered or unexported methods
}

CallCtx context method set for handling the called message. For example:

type HomeCall struct{ CallCtx }
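The one-line example relies on Go interface embedding: the methods of the embedded context are promoted to the controller struct, so handler methods can call them directly. The sketch below shows that mechanism with stand-in types only. `replyCtx`, `mapCtx` and the `Greet` method are hypothetical and are not part of this package.

```go
package main

import "fmt"

// replyCtx is a hypothetical, reduced stand-in for CallCtx.
type replyCtx interface {
	SetMeta(key, value string)
}

// mapCtx is a toy implementation that records metadata in a map.
type mapCtx struct{ meta map[string]string }

func (m *mapCtx) SetMeta(key, value string) { m.meta[key] = value }

// HomeCall embeds the context interface, so its methods are promoted
// and can be called directly on the controller.
type HomeCall struct{ replyCtx }

// Greet is a hypothetical handler method.
func (h *HomeCall) Greet(name *string) (string, error) {
	h.SetMeta("handled-by", "HomeCall.Greet") // promoted from replyCtx
	return "hello " + *name, nil
}

func main() {
	ctx := &mapCtx{meta: map[string]string{}}
	h := &HomeCall{replyCtx: ctx}
	name := "teleport"
	out, _ := h.Greet(&name)
	fmt.Println(out, ctx.meta)
}
```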

type EarlyPeer

type EarlyPeer interface {
	BasePeer
	// Router returns the root router of call or push handlers.
	Router() *Router
	// SubRoute adds handler group.
	SubRoute(pathPrefix string, plugin ...Plugin) *SubRouter
	// RouteCall registers CALL handlers, and returns the paths.
	RouteCall(ctrlStruct interface{}, plugin ...Plugin) []string
	// RouteCallFunc registers CALL handler, and returns the path.
	RouteCallFunc(callHandleFunc interface{}, plugin ...Plugin) string
	// RoutePush registers PUSH handlers, and returns the paths.
	RoutePush(ctrlStruct interface{}, plugin ...Plugin) []string
	// RoutePushFunc registers PUSH handler, and returns the path.
	RoutePushFunc(pushHandleFunc interface{}, plugin ...Plugin) string
	// SetUnknownCall sets the default handler, which is called when no handler for CALL is found.
	SetUnknownCall(fn func(UnknownCallCtx) (interface{}, *Rerror), plugin ...Plugin)
	// SetUnknownPush sets the default handler, which is called when no handler for PUSH is found.
	SetUnknownPush(fn func(UnknownPushCtx) *Rerror, plugin ...Plugin)
}

EarlyPeer the communication peer that has just been created

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

Handler call or push handler type info

func (*Handler) ArgElemType

func (h *Handler) ArgElemType() reflect.Type

ArgElemType returns the handler arg elem type.

func (*Handler) IsCall

func (h *Handler) IsCall() bool

IsCall checks if it is call handler or not.

func (*Handler) IsPush

func (h *Handler) IsPush() bool

IsPush checks if it is push handler or not.

func (*Handler) IsUnknown

func (h *Handler) IsUnknown() bool

IsUnknown checks if it is unknown handler(call/push) or not.

func (*Handler) Name

func (h *Handler) Name() string

Name returns the handler name.

func (*Handler) NewArgValue

func (h *Handler) NewArgValue() reflect.Value

NewArgValue creates a new arg elem value.

func (*Handler) ReplyType

func (h *Handler) ReplyType() reflect.Type

ReplyType returns the handler reply type

func (*Handler) RouterTypeName

func (h *Handler) RouterTypeName() string

RouterTypeName returns the router type name.

type HandlersMaker

type HandlersMaker func(string, interface{}, *PluginContainer) ([]*Handler, error)

HandlersMaker makes []*Handler

type Header = socket.Header

Header message header interface

type Logger

type Logger interface {
	// Level returns the logger's level.
	Level() string
	// SetLevel sets the logger's level.
	SetLevel(level string)
	// Printf formats according to a format specifier and writes to standard output.
	// It returns the number of bytes written and any write error encountered.
	Printf(format string, a ...interface{})
	// Fatalf is equivalent to Criticalf followed by a call to os.Exit(1).
	Fatalf(format string, a ...interface{})
	// Panicf is equivalent to Criticalf followed by a call to panic().
	Panicf(format string, a ...interface{})
	// Criticalf logs a message using CRITICAL as log level.
	Criticalf(format string, a ...interface{})
	// Errorf logs a message using ERROR as log level.
	Errorf(format string, a ...interface{})
	// Warnf logs a message using WARNING as log level.
	Warnf(format string, a ...interface{})
	// Noticef logs a message using NOTICE as log level.
	Noticef(format string, a ...interface{})
	// Infof logs a message using INFO as log level.
	Infof(format string, a ...interface{})
	// Debugf logs a message using DEBUG as log level.
	Debugf(format string, a ...interface{})
	// Tracef logs a message using TRACE as log level.
	Tracef(format string, a ...interface{})
}

Logger interface

func GetLogger

func GetLogger() Logger

GetLogger gets global logger.

type Message

type Message = socket.Message

Message a socket message data.

type MessageSetting

type MessageSetting = socket.MessageSetting

MessageSetting is a pipe function type for setting message.

func WithAcceptBodyCodec

func WithAcceptBodyCodec(bodyCodec byte) MessageSetting

WithAcceptBodyCodec sets the body codec that the sender wishes to accept. Note: If the specified codec is invalid, the receiver will ignore the metadata.

func WithRealIp

func WithRealIp(ip string) MessageSetting

WithRealIp sets the real IP to metadata.

func WithRerror

func WithRerror(rerr *Rerror) MessageSetting

WithRerror sets the reply error (*Rerror) to metadata.

type NewBodyFunc

type NewBodyFunc = socket.NewBodyFunc

NewBodyFunc creates a new body by header.

type Peer

type Peer interface {
	EarlyPeer
	// ListenAndServe turns on the listening service.
	ListenAndServe(protoFunc ...ProtoFunc) error
	// Dial connects with the peer of the destination address.
	Dial(addr string, protoFunc ...ProtoFunc) (Session, *Rerror)
	// DialContext connects with the peer of the destination address, using the provided context.
	DialContext(ctx context.Context, addr string, protoFunc ...ProtoFunc) (Session, *Rerror)
	// ServeConn serves the connection and returns a session.
	// Note:
	//  Does not support automatic redial after disconnection;
	//  Execute the PostAcceptPlugin plugins.
	ServeConn(conn net.Conn, protoFunc ...ProtoFunc) (Session, error)
	// ServeListener serves the listener.
	// Note: The caller ensures that the listener supports graceful shutdown.
	ServeListener(lis net.Listener, protoFunc ...ProtoFunc) error
}

Peer the communication peer which is server or client role

func NewPeer

func NewPeer(cfg PeerConfig, globalLeftPlugin ...Plugin) Peer

NewPeer creates a new peer.

type PeerConfig

type PeerConfig struct {
	Network            string        `yaml:"network"              ini:"network"              comment:"Network; tcp, tcp4, tcp6, unix or unixpacket"`
	LocalIP            string        `yaml:"local_ip"             ini:"local_ip"             comment:"Local IP"`
	ListenPort         uint16        `yaml:"listen_port"          ini:"listen_port"          comment:"Listen port; for server role"`
	DefaultDialTimeout time.Duration `` /* 135-byte string literal not displayed */
	RedialTimes        int32         `` /* 172-byte string literal not displayed */
	DefaultBodyCodec   string        `yaml:"default_body_codec"   ini:"default_body_codec"   comment:"Default body codec type id"`
	DefaultSessionAge  time.Duration `` /* 148-byte string literal not displayed */
	DefaultContextAge  time.Duration `` /* 161-byte string literal not displayed */
	SlowCometDuration  time.Duration `yaml:"slow_comet_duration"  ini:"slow_comet_duration"  comment:"Slow operation alarm threshold; ns,µs,ms,s ..."`
	PrintDetail        bool          `yaml:"print_detail"         ini:"print_detail"         comment:"Is print body and metadata or not"`
	CountTime          bool          `yaml:"count_time"           ini:"count_time"           comment:"Is count cost time or not"`
	// contains filtered or unexported fields
}

PeerConfig is the peer config. Note:

yaml tag is used for github.com/henrylee2cn/cfgo
ini tag is used for github.com/henrylee2cn/ini

func (*PeerConfig) Reload

func (p *PeerConfig) Reload(bind cfgo.BindFunc) error

Reload bi-directionally synchronizes config between the YAML file and memory.

type Plugin

type Plugin interface {
	Name() string
}

Plugin plugin background

type PluginContainer

type PluginContainer struct {
	// contains filtered or unexported fields
}

func (*PluginContainer) AppendLeft

func (p *PluginContainer) AppendLeft(plugins ...Plugin)

AppendLeft appends plugins on the left side of the pluginContainer.

func (*PluginContainer) AppendRight

func (p *PluginContainer) AppendRight(plugins ...Plugin)

AppendRight appends plugins on the right side of the pluginContainer.

func (PluginContainer) GetAll

func (p PluginContainer) GetAll() []Plugin

GetAll returns all activated plugins.

func (PluginContainer) GetByName

func (p PluginContainer) GetByName(pluginName string) Plugin

GetByName returns a plugin instance by its name.

func (*PluginContainer) Remove

func (p *PluginContainer) Remove(pluginName string) error

Remove removes a plugin by its name.
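To make the left/right terminology concrete, here is a small self-contained sketch of an ordered plugin container. It is not the package's PluginContainer: the `container` type, the `named` helper and the rule that left-side plugins come before right-side plugins in GetAll are assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Plugin mirrors the one-method interface documented above.
type Plugin interface{ Name() string }

type named string

func (n named) Name() string { return string(n) }

// container is a hypothetical sketch, not the package's PluginContainer.
// Assumption: plugins on the left side are ordered before those on the right.
type container struct{ left, right []Plugin }

func (c *container) AppendLeft(p ...Plugin)  { c.left = append(c.left, p...) }
func (c *container) AppendRight(p ...Plugin) { c.right = append(c.right, p...) }

// GetAll returns left-side plugins followed by right-side plugins.
func (c *container) GetAll() []Plugin {
	all := make([]Plugin, 0, len(c.left)+len(c.right))
	all = append(all, c.left...)
	return append(all, c.right...)
}

// GetByName returns the first plugin with the given name, or nil.
func (c *container) GetByName(name string) Plugin {
	for _, p := range c.GetAll() {
		if p.Name() == name {
			return p
		}
	}
	return nil
}

// Remove deletes the plugin with the given name from either side.
func (c *container) Remove(name string) error {
	for _, side := range []*[]Plugin{&c.left, &c.right} {
		for i, p := range *side {
			if p.Name() == name {
				*side = append((*side)[:i], (*side)[i+1:]...)
				return nil
			}
		}
	}
	return errors.New("plugin not found: " + name)
}

// names is a small helper for printing.
func names(ps []Plugin) []string {
	out := make([]string, len(ps))
	for i, p := range ps {
		out[i] = p.Name()
	}
	return out
}

func main() {
	c := &container{}
	c.AppendRight(named("stats"))
	c.AppendLeft(named("auth"), named("heartbeat"))
	fmt.Println(names(c.GetAll())) // [auth heartbeat stats]
	_ = c.Remove("heartbeat")
	fmt.Println(names(c.GetAll())) // [auth stats]
}
```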

type PostAcceptPlugin

type PostAcceptPlugin interface {
	PostAccept(PreSession) *Rerror
}

PostAcceptPlugin is executed after accepting connection.

type PostDialPlugin

type PostDialPlugin interface {
	PostDial(PreSession) *Rerror
}

PostDialPlugin is executed after dialing.

type PostDisconnectPlugin

type PostDisconnectPlugin interface {
	PostDisconnect(BaseSession) *Rerror
}

PostDisconnectPlugin is executed after disconnection.

type PostListenPlugin

type PostListenPlugin interface {
	PostListen(net.Addr) error
}

PostListenPlugin is executed between listening and accepting.

type PostNewPeerPlugin

type PostNewPeerPlugin interface {
	PostNewPeer(EarlyPeer) error
}

PostNewPeerPlugin is executed after creating peer.

type PostReadCallBodyPlugin

type PostReadCallBodyPlugin interface {
	PostReadCallBody(ReadCtx) *Rerror
}

PostReadCallBodyPlugin is executed after reading CALL message body.

type PostReadCallHeaderPlugin

type PostReadCallHeaderPlugin interface {
	PostReadCallHeader(ReadCtx) *Rerror
}

PostReadCallHeaderPlugin is executed after reading CALL message header.

type PostReadPushBodyPlugin

type PostReadPushBodyPlugin interface {
	PostReadPushBody(ReadCtx) *Rerror
}

PostReadPushBodyPlugin is executed after reading PUSH message body.

type PostReadPushHeaderPlugin

type PostReadPushHeaderPlugin interface {
	PostReadPushHeader(ReadCtx) *Rerror
}

PostReadPushHeaderPlugin is executed after reading PUSH message header.

type PostReadReplyBodyPlugin

type PostReadReplyBodyPlugin interface {
	PostReadReplyBody(ReadCtx) *Rerror
}

PostReadReplyBodyPlugin is executed after reading REPLY message body.

type PostReadReplyHeaderPlugin

type PostReadReplyHeaderPlugin interface {
	PostReadReplyHeader(ReadCtx) *Rerror
}

PostReadReplyHeaderPlugin is executed after reading REPLY message header.

type PostRegPlugin

type PostRegPlugin interface {
	PostReg(*Handler) error
}

PostRegPlugin is executed after registering handler.

type PostWriteCallPlugin

type PostWriteCallPlugin interface {
	PostWriteCall(WriteCtx) *Rerror
}

PostWriteCallPlugin is executed after successfully writing a CALL message.

type PostWritePushPlugin

type PostWritePushPlugin interface {
	PostWritePush(WriteCtx) *Rerror
}

PostWritePushPlugin is executed after successfully writing a PUSH message.

type PostWriteReplyPlugin

type PostWriteReplyPlugin interface {
	PostWriteReply(WriteCtx) *Rerror
}

PostWriteReplyPlugin is executed after successfully writing a REPLY message.

type PreCtx

type PreCtx interface {
	// Peer returns the peer.
	Peer() Peer
	// Session returns the session.
	Session() Session
	// Ip returns the remote addr.
	Ip() string
	// RealIp returns the current real remote addr.
	RealIp() string
	// Swap returns custom data swap of context.
	Swap() goutil.Map
	// Context carries a deadline, a cancelation signal, and other values across
	// API boundaries.
	Context() context.Context
}

PreCtx context method set used before reading message header.

type PreNewPeerPlugin

type PreNewPeerPlugin interface {
	PreNewPeer(*PeerConfig, *PluginContainer) error
}

PreNewPeerPlugin is executed before creating peer.

type PreReadCallBodyPlugin

type PreReadCallBodyPlugin interface {
	PreReadCallBody(ReadCtx) *Rerror
}

PreReadCallBodyPlugin is executed before reading CALL message body.

type PreReadHeaderPlugin

type PreReadHeaderPlugin interface {
	PreReadHeader(PreCtx) error
}

PreReadHeaderPlugin is executed before reading message header.

type PreReadPushBodyPlugin

type PreReadPushBodyPlugin interface {
	PreReadPushBody(ReadCtx) *Rerror
}

PreReadPushBodyPlugin is executed before reading PUSH message body.

type PreReadReplyBodyPlugin

type PreReadReplyBodyPlugin interface {
	PreReadReplyBody(ReadCtx) *Rerror
}

PreReadReplyBodyPlugin is executed before reading REPLY message body.

type PreSession

type PreSession interface {
	// Peer returns the peer.
	Peer() Peer
	// LocalAddr returns the local network address.
	LocalAddr() net.Addr
	// RemoteAddr returns the remote network address.
	RemoteAddr() net.Addr
	// Swap returns custom data swap of the session(socket).
	Swap() goutil.Map
	// SetId sets the session id.
	SetId(newId string)
	// ControlFD invokes f on the underlying connection's file
	// descriptor or handle.
	// The file descriptor fd is guaranteed to remain valid while
	// f executes but not after f returns.
	ControlFD(f func(fd uintptr)) error
	// ModifySocket modifies the socket.
	// Note:
	// The connection fd is not allowed to change!
	// Inherit the previous session id and custom data swap;
	// If modifiedConn!=nil, reset the net.Conn of the socket;
	// If newProtoFunc!=nil, reset the ProtoFunc of the socket.
	ModifySocket(fn func(conn net.Conn) (modifiedConn net.Conn, newProtoFunc ProtoFunc))
	// GetProtoFunc returns the ProtoFunc
	GetProtoFunc() ProtoFunc
	// Send sends message to peer, before the formal connection.
	// Note:
	// a seq set externally is ignored; the seq is always set internally;
	// does not support automatic redial after disconnection.
	Send(uri string, body interface{}, rerr *Rerror, setting ...MessageSetting) *Rerror
	// Receive receives a message from peer, before the formal connection.
	// Note: does not support automatic redial after disconnection.
	Receive(NewBodyFunc, ...MessageSetting) (*Message, *Rerror)
	// SessionAge returns the session max age.
	SessionAge() time.Duration
	// ContextAge returns CALL or PUSH context max age.
	ContextAge() time.Duration
	// SetSessionAge sets the session max age.
	SetSessionAge(duration time.Duration)
	// SetContextAge sets CALL or PUSH context max age.
	SetContextAge(duration time.Duration)
}

PreSession a connection session that has not started reading goroutine.

type PreWriteCallPlugin

type PreWriteCallPlugin interface {
	PreWriteCall(WriteCtx) *Rerror
}

PreWriteCallPlugin is executed before writing CALL message.

type PreWritePushPlugin

type PreWritePushPlugin interface {
	PreWritePush(WriteCtx) *Rerror
}

PreWritePushPlugin is executed before writing PUSH message.

type PreWriteReplyPlugin

type PreWriteReplyPlugin interface {
	PreWriteReply(WriteCtx) *Rerror
}

PreWriteReplyPlugin is executed before writing REPLY message.

type Proto

type Proto = socket.Proto

Proto pack/unpack protocol scheme of socket message.

type ProtoFunc

type ProtoFunc = socket.ProtoFunc

ProtoFunc function used to create a custom Proto interface.

type PushCtx

type PushCtx interface {

	// GetBodyCodec gets the body codec type of the input message.
	GetBodyCodec() byte
	// contains filtered or unexported methods
}

PushCtx context method set for handling the pushed message. For example:

type HomePush struct{ PushCtx }

type ReadCtx

type ReadCtx interface {

	// Input returns the read message.
	Input() *Message
	// Rerror returns the handle error.
	Rerror() *Rerror
	// contains filtered or unexported methods
}

ReadCtx context method set for reading message.

type Rerror

type Rerror struct {
	// Code error code
	Code int32
	// Message the error message displayed to the user (optional)
	Message string
	// Reason the cause of the error for debugging (optional)
	Reason string
}

Rerror error only for reply message

func NewRerror

func NewRerror(code int32, message, reason string) *Rerror

NewRerror creates a *Rerror.

func NewRerrorFromMeta

func NewRerrorFromMeta(meta *utils.Args) *Rerror

NewRerrorFromMeta creates a *Rerror from 'X-Reply-Error' metadata. Return nil if there is no 'X-Reply-Error' in metadata.

func ToRerror

func ToRerror(err error) *Rerror

ToRerror converts error to *Rerror

func (Rerror) Copy

func (r Rerror) Copy() *Rerror

Copy returns the copy of Rerror

func (*Rerror) MarshalJSON

func (r *Rerror) MarshalJSON() ([]byte, error)

MarshalJSON marshals Rerror into JSON, implements json.Marshaler interface.

func (*Rerror) SetMessage

func (r *Rerror) SetMessage(message string) *Rerror

SetMessage sets the error message displayed to the user.

func (*Rerror) SetReason

func (r *Rerror) SetReason(reason string) *Rerror

SetReason sets the cause of the error for debugging.

func (*Rerror) SetToMeta

func (r *Rerror) SetToMeta(meta *utils.Args)

SetToMeta sets self to 'X-Reply-Error' metadata.

func (*Rerror) String

func (r *Rerror) String() string

String prints error info.

func (*Rerror) ToError

func (r *Rerror) ToError() error

ToError converts to error

func (*Rerror) UnmarshalJSON

func (r *Rerror) UnmarshalJSON(b []byte) error

UnmarshalJSON unmarshals a JSON description of self.

type Router

type Router struct {
	// contains filtered or unexported fields
}

Router the router of call or push handlers.

func (*Router) RouteCall

func (r *Router) RouteCall(callCtrlStruct interface{}, plugin ...Plugin) []string

RouteCall registers CALL handlers, and returns the paths.

func (*Router) RouteCallFunc

func (r *Router) RouteCallFunc(callHandleFunc interface{}, plugin ...Plugin) string

RouteCallFunc registers CALL handler, and returns the path.

func (*Router) RoutePush

func (r *Router) RoutePush(pushCtrlStruct interface{}, plugin ...Plugin) []string

RoutePush registers PUSH handlers, and returns the paths.

func (*Router) RoutePushFunc

func (r *Router) RoutePushFunc(pushHandleFunc interface{}, plugin ...Plugin) string

RoutePushFunc registers PUSH handler, and returns the path.

func (*Router) SetUnknownCall

func (r *Router) SetUnknownCall(fn func(UnknownCallCtx) (interface{}, *Rerror), plugin ...Plugin)

SetUnknownCall sets the default handler, which is called when no handler for CALL is found.

func (*Router) SetUnknownPush

func (r *Router) SetUnknownPush(fn func(UnknownPushCtx) *Rerror, plugin ...Plugin)

SetUnknownPush sets the default handler, which is called when no handler for PUSH is found.

func (*Router) SubRoute

func (r *Router) SubRoute(pathPrefix string, plugin ...Plugin) *SubRouter

SubRoute adds handler group.
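One way to picture a handler group is as a path prefix shared by every handler registered through it. The sketch below implements only that idea; `router`, `group`, `Route` and the way paths are joined are hypothetical and say nothing about how this package derives handler paths from struct or function names.

```go
package main

import (
	"fmt"
	"path"
)

type handler func(arg string) string

// router is a hypothetical sketch: a flat table of handlers keyed by path.
type router struct{ handlers map[string]handler }

// group registers handlers under a shared path prefix.
type group struct {
	root   *router
	prefix string
}

func newRouter() *router { return &router{handlers: map[string]handler{}} }

// SubRoute starts a group below the root.
func (r *router) SubRoute(prefix string) *group {
	return &group{root: r, prefix: path.Join("/", prefix)}
}

// SubRoute nests a group below an existing group.
func (g *group) SubRoute(prefix string) *group {
	return &group{root: g.root, prefix: path.Join(g.prefix, prefix)}
}

// Route registers h in the root table and returns the full path.
func (g *group) Route(name string, h handler) string {
	p := path.Join(g.prefix, name)
	g.root.handlers[p] = h
	return p
}

func main() {
	r := newRouter()
	g := r.SubRoute("/v1").SubRoute("math")
	p := g.Route("add", func(arg string) string { return "added " + arg })
	fmt.Println(p)                  // /v1/math/add
	fmt.Println(r.handlers[p]("1")) // added 1
}
```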

type Session

type Session interface {
	BaseSession
	// SetId sets the session id.
	SetId(newId string)
	// Close closes the session.
	Close() error
	// CloseNotify returns a channel that closes when the connection has gone away.
	CloseNotify() <-chan struct{}
	// Health checks if the session is usable.
	Health() bool
	// AsyncCall sends a message and receives reply asynchronously.
	// If the arg is []byte or *[]byte type, it can automatically fill in the body codec name.
	AsyncCall(
		uri string,
		arg interface{},
		result interface{},
		callCmdChan chan<- CallCmd,
		setting ...MessageSetting,
	) CallCmd
	// Call sends a message and receives reply.
	// Note:
	// If the arg is []byte or *[]byte type, it can automatically fill in the body codec name;
	// If the session is a client role and PeerConfig.RedialTimes>0, it is automatically re-called once after a failure.
	Call(uri string, arg interface{}, result interface{}, setting ...MessageSetting) CallCmd
	// Push sends a message, but does not receive a reply.
	// Note:
	// If the arg is []byte or *[]byte type, it can automatically fill in the body codec name;
	// If the session is a client role and PeerConfig.RedialTimes>0, it is automatically re-called once after a failure.
	Push(uri string, arg interface{}, setting ...MessageSetting) *Rerror
	// SessionAge returns the session max age.
	SessionAge() time.Duration
	// ContextAge returns CALL or PUSH context max age.
	ContextAge() time.Duration
}

Session a connection session.
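AsyncCall takes a `callCmdChan` on which the finished command is delivered, so several calls can be issued at once and collected from one channel. The following sketch shows that fan-out and collect pattern with stand-ins only. `callCmd`, `asyncCall` and `callAll` are hypothetical, and the upper-casing "handler" merely simulates a remote reply.

```go
package main

import (
	"fmt"
	"strings"
)

// callCmd is a hypothetical stand-in for CallCmd: it only carries a URI and a result.
type callCmd struct {
	uri    string
	result string
}

// asyncCall is a toy replacement for Session.AsyncCall. It "handles" the call
// in a goroutine and delivers the completed command on callCmdChan.
func asyncCall(uri, arg string, callCmdChan chan<- *callCmd) {
	go func() {
		callCmdChan <- &callCmd{uri: uri, result: strings.ToUpper(arg)}
	}()
}

// callAll sends every request at once and collects the results by URI.
func callAll(reqs map[string]string) map[string]string {
	// Buffer the channel for all pending calls so that no sender blocks.
	ch := make(chan *callCmd, len(reqs))
	for uri, arg := range reqs {
		asyncCall(uri, arg, ch)
	}
	out := make(map[string]string, len(reqs))
	for range reqs {
		cmd := <-ch // completion order is not guaranteed
		out[cmd.uri] = cmd.result
	}
	return out
}

func main() {
	fmt.Println(callAll(map[string]string{"/a": "x", "/b": "y"})) // map[/a:X /b:Y]
}
```

Sizing the channel buffer to the number of outstanding calls is a design choice of this sketch: it keeps completions from blocking if the collector is slow.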

type SessionHub

type SessionHub struct {
	// contains filtered or unexported fields
}

SessionHub sessions hub

func (*SessionHub) Delete

func (sh *SessionHub) Delete(id string)

Delete deletes the *session for an id.

func (*SessionHub) Get

func (sh *SessionHub) Get(id string) (*session, bool)

Get gets the *session by id. If the second returned value is false, the *session was not found.

func (*SessionHub) Len

func (sh *SessionHub) Len() int

Len returns the length of the session hub. Note: the count implemented using sync.Map may be inaccurate.

func (*SessionHub) Random

func (sh *SessionHub) Random() (*session, bool)

Random gets a *session randomly. If the second returned value is false, no *session exists.

func (*SessionHub) Range

func (sh *SessionHub) Range(fn func(*session) bool)

Range calls fn sequentially for each id and *session present in the session hub. If fn returns false, Range stops traversing.

func (*SessionHub) Set

func (sh *SessionHub) Set(sess *session)

Set sets a *session.
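The note on Len mentions sync.Map, so a registry with this method set can be sketched on top of it. The code below is an illustration, not the package's SessionHub: `hub` and `sess` are hypothetical, Len counts by traversal, and Random simply returns the first entry that Range visits.

```go
package main

import (
	"fmt"
	"sync"
)

// sess is a hypothetical stand-in for the unexported *session type.
type sess struct{ id string }

// hub is a sketch of a session registry on top of sync.Map.
type hub struct{ m sync.Map }

func (h *hub) Set(s *sess)      { h.m.Store(s.id, s) }
func (h *hub) Delete(id string) { h.m.Delete(id) }

// Get returns the session for id and whether it was found.
func (h *hub) Get(id string) (*sess, bool) {
	v, ok := h.m.Load(id)
	if !ok {
		return nil, false
	}
	return v.(*sess), true
}

// Range visits sessions until fn returns false.
func (h *hub) Range(fn func(*sess) bool) {
	h.m.Range(func(_, v interface{}) bool { return fn(v.(*sess)) })
}

// Len counts by traversal; with concurrent Set/Delete the result is only
// an approximation of the size at any single instant.
func (h *hub) Len() int {
	n := 0
	h.Range(func(*sess) bool { n++; return true })
	return n
}

// Random returns an arbitrary session (the first one visited by Range),
// which is not a uniformly random choice.
func (h *hub) Random() (*sess, bool) {
	var picked *sess
	h.Range(func(s *sess) bool { picked = s; return false })
	return picked, picked != nil
}

func main() {
	h := &hub{}
	h.Set(&sess{id: "a"})
	h.Set(&sess{id: "b"})
	_, found := h.Get("a")
	fmt.Println(h.Len(), found) // 2 true
	h.Delete("a")
	_, found = h.Get("a")
	fmt.Println(h.Len(), found) // 1 false
}
```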

type Socket

type Socket = socket.Socket

Socket is a generic stream-oriented network connection.

Multiple goroutines may invoke methods on a Socket simultaneously.

type SubRouter

type SubRouter struct {
	// contains filtered or unexported fields
}

SubRouter is a router without the SetUnknownCall and SetUnknownPush methods.

func (*SubRouter) Root

func (r *SubRouter) Root() *Router

Root returns the root router.

func (*SubRouter) RouteCall

func (r *SubRouter) RouteCall(callCtrlStruct interface{}, plugin ...Plugin) []string

RouteCall registers CALL handlers, and returns the paths.

func (*SubRouter) RouteCallFunc

func (r *SubRouter) RouteCallFunc(callHandleFunc interface{}, plugin ...Plugin) string

RouteCallFunc registers CALL handler, and returns the path.

func (*SubRouter) RoutePush

func (r *SubRouter) RoutePush(pushCtrlStruct interface{}, plugin ...Plugin) []string

RoutePush registers PUSH handlers, and returns the paths.

func (*SubRouter) RoutePushFunc

func (r *SubRouter) RoutePushFunc(pushHandleFunc interface{}, plugin ...Plugin) string

RoutePushFunc registers PUSH handler, and returns the path.

func (*SubRouter) SubRoute

func (r *SubRouter) SubRoute(pathPrefix string, plugin ...Plugin) *SubRouter

SubRoute adds handler group.

func (*SubRouter) ToRouter

func (r *SubRouter) ToRouter() *Router

ToRouter converts to a Router, which adds the SetUnknownCall and SetUnknownPush methods.

type UnknownCallCtx

type UnknownCallCtx interface {

	// GetBodyCodec gets the body codec type of the input message.
	GetBodyCodec() byte
	// InputBodyBytes if the input body binder is []byte type, returns it, else returns nil.
	InputBodyBytes() []byte
	// Bind when the raw body binder is []byte type, now binds the input body to v.
	Bind(v interface{}) (bodyCodec byte, err error)
	// SetBodyCodec sets the body codec for reply message.
	SetBodyCodec(byte)
	// AddMeta adds the header metadata 'key=value' for reply message.
	// Multiple values for the same key may be added.
	AddMeta(key, value string)
	// SetMeta sets the header metadata 'key=value' for reply message.
	SetMeta(key, value string)
	// AddXferPipe appends transfer filter pipe of reply message.
	AddXferPipe(filterId ...byte)
	// contains filtered or unexported methods
}

UnknownCallCtx context method set for handling the unknown called message.

type UnknownPushCtx

type UnknownPushCtx interface {

	// GetBodyCodec gets the body codec type of the input message.
	GetBodyCodec() byte
	// InputBodyBytes if the input body binder is []byte type, returns it, else returns nil.
	InputBodyBytes() []byte
	// Bind when the raw body binder is []byte type, now binds the input body to v.
	Bind(v interface{}) (bodyCodec byte, err error)
	// contains filtered or unexported methods
}

UnknownPushCtx context method set for handling the unknown pushed message.
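Both unknown-handler contexts expose the body as raw bytes and defer decoding to Bind. The sketch below shows what a fallback handler built on that idea could look like. `rawCtx`, `unknownCall` and the codec id `'j'` are hypothetical choices for this example, and JSON stands in for whatever codec the message actually names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

const codecJSON byte = 'j' // hypothetical codec id used only in this sketch

// rawCtx is a hypothetical stand-in holding an undecoded body.
type rawCtx struct {
	codec byte
	body  []byte
}

// InputBodyBytes returns the raw, still-encoded body.
func (c *rawCtx) InputBodyBytes() []byte { return c.body }

// Bind decodes the raw body into v using the codec recorded for the message.
func (c *rawCtx) Bind(v interface{}) (byte, error) {
	if c.codec != codecJSON {
		return c.codec, fmt.Errorf("unsupported body codec %q", c.codec)
	}
	return c.codec, json.Unmarshal(c.body, v)
}

// unknownCall is a fallback handler in the style of the fn passed to SetUnknownCall.
func unknownCall(ctx *rawCtx) (interface{}, error) {
	var arg struct{ A, B int }
	if _, err := ctx.Bind(&arg); err != nil {
		return nil, err
	}
	return arg.A + arg.B, nil
}

func main() {
	ctx := &rawCtx{codec: codecJSON, body: []byte(`{"A":1,"B":2}`)}
	fmt.Println(unknownCall(ctx)) // 3 <nil>
}
```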

type WriteCtx

type WriteCtx interface {
	PreCtx
	// Output returns the written message.
	Output() *Message
	// Rerror returns the handle error.
	Rerror() *Rerror
}

WriteCtx context method set for writing message.

Directories

Path Synopsis
Package codec is the body's codec set.
examples
ab
age
plugin-tps
Package tps statistics requests per second
mixer
multiclient
Package multiclient is a higher throughput client connection pool when transferring large messages (such as downloading files).
websocket
Websocket is an extension package that makes the Teleport framework compatible with websocket protocol as specified in RFC 6455.
websocket/jsonSubProto
Package jsonSubProto is implemented JSON socket communication protocol.
websocket/pbSubProto
Package pbSubProto is implemented PROTOBUF socket communication protocol.
websocket/websocket
Package websocket implements a client and server for the WebSocket protocol as specified in RFC 6455.
plugin
binder
Package binder is Parameter Binding Verification Plugin for Struct Handler.
heartbeat
Heartbeat is a generic timing heartbeat plugin.
ignorecase
Package ignorecase dynamically ignoring the case of path
secure
Package secure encrypting/decrypting the message body.
proto
jsonproto
Package jsonproto is implemented JSON socket communication protocol.
pbproto
Package pbproto is implemented PROTOBUF socket communication protocol.
Package socket provides a concise, powerful and high-performance TCP.
Package xfer is transfer filter set.
md5
Package md5 provides a integrity check transfer filter
