README ¶
Socket
A concise, powerful and high-performance connection socket.
Feature
- The server and client are peer-to-peer interfaces
- Supports setting the size of the socket I/O buffer
- Support custom communication protocol
- Support custom transfer filter pipe (Such as gzip, encrypt, verify...)
- Message contains both Header and Body
- Supports custom encoding types, e.g. JSON, Protobuf
- Header contains the status code and its description text
- Each socket is assigned an id
- Provides Socket Hub, Socket pool and Message stack
- Supports setting the size limit of messages being read (the connection is closed if the limit is exceeded)
- Provide an operating interface to control the connection file descriptor
Benchmark
Test Case
- A server and a client process, running on the same machine
- CPU: Intel Xeon E312xx (Sandy Bridge) 16 cores 2.53GHz
- Memory: 16G
- OS: Linux 2.6.32-696.16.1.el6.centos.plus.x86_64, CentOS 6.4
- Go: 1.9.2
- Message size: 581 bytes
- Message codec: protobuf
- Sent total 1000000 messages
Test Results
- teleport/socket
client concurrency | mean(ms) | median(ms) | max(ms) | min(ms) | throughput(TPS) |
---|---|---|---|---|---|
100 | 0 | 0 | 14 | 0 | 225682 |
500 | 2 | 1 | 24 | 0 | 212630 |
1000 | 4 | 3 | 51 | 0 | 180733 |
2000 | 8 | 6 | 64 | 0 | 183351 |
5000 | 21 | 18 | 651 | 0 | 133886 |
- Profile torch of teleport/socket
- Heap torch of teleport/socket
Example
server.go
package main
import (
"log"
"net"
"github.com/henrylee2cn/teleport/v6/socket"
"github.com/henrylee2cn/teleport/v6/socket/example/pb"
)
//go:generate go build $GOFILE
// main runs the example server: it listens on TCP 0.0.0.0:8000 and serves
// every accepted connection in its own goroutine. For each request it reads a
// pb.PbTest body, rewrites its A and B fields, and writes the same message
// back as the response.
func main() {
	socket.SetNoDelay(false)
	socket.SetMessageSizeLimit(512)
	lis, err := net.Listen("tcp", "0.0.0.0:8000")
	if err != nil {
		log.Fatalf("[SVR] listen err: %v", err)
	}
	log.Printf("listen tcp 0.0.0.0:8000")
	for {
		conn, err := lis.Accept()
		if err != nil {
			log.Fatalf("[SVR] accept err: %v", err)
		}
		go func(s socket.Socket) {
			log.Printf("accept %s", s.ID())
			defer s.Close()
			// pbTest is reused for every request on this connection; the
			// new-body callback zeroes it each time it is invoked.
			var pbTest = new(pb.PbTest)
			for {
				// read request
				var message = socket.GetMessage(socket.WithNewBody(
					func(header socket.Header) interface{} {
						*pbTest = pb.PbTest{}
						return pbTest
					}),
				)
				// Errors are kept local to this goroutine rather than being
				// written to the accept loop's captured err variable.
				if err := s.ReadMessage(message); err != nil {
					log.Printf("[SVR] read request err: %v", err)
					// Hand the message back to the pool on the error path too.
					socket.PutMessage(message)
					return
				}
				// write response
				// A becomes A+B; B is then derived from the updated A, which
				// yields the original A-B.
				pbTest.A = pbTest.A + pbTest.B
				pbTest.B = pbTest.A - pbTest.B*2
				message.SetBody(pbTest)
				if err := s.WriteMessage(message); err != nil {
					log.Printf("[SVR] write response err: %v", err)
				} else {
					log.Printf("[SVR] write response: %v", message)
				}
				socket.PutMessage(message)
			}
		}(socket.GetSocket(conn))
	}
}
client.go
package main
import (
"log"
"net"
"github.com/henrylee2cn/teleport/v6/codec"
"github.com/henrylee2cn/teleport/v6/socket"
"github.com/henrylee2cn/teleport/v6/socket/example/pb"
)
//go:generate go build $GOFILE
// main runs the example client: it dials 127.0.0.1:8000, sends one request
// carrying a pb.PbTest body, then reads and logs the response.
func main() {
	conn, dialErr := net.Dial("tcp", "127.0.0.1:8000")
	if dialErr != nil {
		log.Fatalf("[CLI] dial err: %v", dialErr)
	}
	sock := socket.GetSocket(conn)
	defer sock.Close()

	msg := socket.GetMessage()
	defer socket.PutMessage(msg)

	// newBody supplies the object the response body is read into.
	newBody := func(socket.Header) interface{} {
		return new(pb.PbTest)
	}

	var seq int32
	for seq = 0; seq < 1; seq++ {
		// write request
		msg.Reset()
		msg.SetMtype(0)
		msg.SetBodyCodec(codec.ID_JSON)
		msg.SetSeq(seq)
		msg.SetServiceMethod("/a/b")
		msg.SetBody(&pb.PbTest{A: 10, B: 2})
		if err := sock.WriteMessage(msg); err != nil {
			log.Printf("[CLI] write request err: %v", err)
			continue
		}
		log.Printf("[CLI] write request: %v", msg)

		// read response
		msg.Reset(socket.WithNewBody(newBody))
		if err := sock.ReadMessage(msg); err != nil {
			log.Printf("[CLI] read response err: %v", err)
		} else {
			log.Printf("[CLI] read response: %v", msg)
		}
	}
}
Keywords
- Message: The corresponding structure of the data package
- Proto: The protocol interface of message pack/unpack
- Codec: Serialization interface for Body
- XferPipe: A series of pipelines to handle message data before transfer
- XferFilter: An interface to handle message data before transfer
Message
The contents of every message:
// in .../teleport/socket package
type (
// Message is a socket message interface.
// NOTE(review): the package documentation for Message shows Reset returning
// Message and additional AsHeader/AsBody methods; this excerpt may be out of
// date — confirm.
Message interface {
// Header is an operation interface of required message fields.
// NOTE: Must be supported by Proto interface.
Header
// Body is an operation interface of optional message fields.
// SUGGEST: For feature completeness, the protocol interface should support it.
Body
// XferPipe returns the transfer filter pipe, handlers from outer-most to inner-most.
// SUGGEST: The length can not be bigger than 255!
XferPipe() *xfer.XferPipe
// Size returns the size of message.
// SUGGEST: For better statistics, Proto interfaces should support it.
Size() uint32
// SetSize sets the size of message.
// If the size is too big, returns error.
// SUGGEST: For better statistics, Proto interfaces should support it.
SetSize(size uint32) error
// Reset resets itself.
Reset(settings ...MessageSetting)
// Context returns the message handling context.
Context() context.Context
// String returns printing message information.
String() string
}
// Header is an operation interface of required message fields.
// NOTE: Must be supported by Proto interface.
// NOTE(review): the package documentation for Header also lists StatusOK,
// Status and SetStatus; this excerpt may be out of date — confirm.
Header interface {
// Seq returns the message sequence.
Seq() int32
// SetSeq sets the message sequence.
SetSeq(int32)
// Mtype returns the message type, such as CALL, REPLY, PUSH.
Mtype() byte
// SetMtype sets the message type, such as CALL, REPLY, PUSH.
SetMtype(byte)
// ServiceMethod returns the service method.
// SUGGEST: max len ≤ 255!
ServiceMethod() string
// SetServiceMethod sets the service method.
// SUGGEST: max len ≤ 255!
SetServiceMethod(string)
// Meta returns the metadata.
// SUGGEST: urlencoded string max len ≤ 65535!
Meta() *utils.Args
}
// Body is an operation interface of optional message fields.
// SUGGEST: For feature completeness, the protocol interface should support it.
Body interface {
// BodyCodec returns the body codec type id.
BodyCodec() byte
// SetBodyCodec sets the body codec type id.
SetBodyCodec(bodyCodec byte)
// Body returns the body object.
Body() interface{}
// SetBody sets the body object.
SetBody(body interface{})
// SetNewBody resets the function of getting body.
// NOTE: NewBodyFunc is only for reading from connection.
SetNewBody(NewBodyFunc)
// MarshalBody returns the encoding of body.
// NOTE: when the body is a stream of bytes, no marshalling is done.
MarshalBody() ([]byte, error)
// UnmarshalBody unmarshals the encoded data to the body.
// NOTE:
// seq, mtype, uri must already be set;
// if body=nil, try to use newBodyFunc to create a new one;
// when the body is a stream of bytes, no unmarshalling is done.
UnmarshalBody(bodyBytes []byte) error
}
// NewBodyFunc creates a new body by header,
// and only for reading from connection.
NewBodyFunc func(Header) interface{}
)
// in .../teleport/xfer package
type (
// XferPipe is the transfer filter pipe, handlers from outer-most to inner-most.
// NOTE: the length can not be bigger than 255!
XferPipe struct {
// filters holds the pipe's filters.
filters []XferFilter
}
// XferFilter handles the byte stream of a message during transfer.
XferFilter interface {
// ID returns the filter's one-byte id.
// NOTE(review): presumably the id written as one of the "transfer pipe IDs"
// in the wire format — confirm.
ID() byte
// OnPack takes a byte slice and returns a byte slice or an error.
// NOTE(review): presumably applied to outgoing data when packing — confirm.
OnPack([]byte) ([]byte, error)
// OnUnpack takes a byte slice and returns a byte slice or an error.
// NOTE(review): presumably the inverse of OnPack, applied when unpacking — confirm.
OnUnpack([]byte) ([]byte, error)
}
)
Protocol
You can customize your own communication protocol by implementing the interface:
type (
// Proto is the pack/unpack protocol scheme of a socket message.
// NOTE: Implementation specifications for Message interface should be complied with.
Proto interface {
// Version returns the protocol's id and name.
Version() (byte, string)
// Pack writes the Message into the connection.
// NOTE: Make sure to write only once or there will be package contamination!
Pack(Message) error
// Unpack reads bytes from the connection to the Message.
// NOTE: Not safe for concurrent use!
Unpack(Message) error
}
// IOWithReadBuffer implements buffered I/O with buffered reader.
// It embeds io.ReadWriter and declares no further methods.
IOWithReadBuffer interface {
io.ReadWriter
}
// ProtoFunc is the function type used to create a custom Proto from an IOWithReadBuffer.
ProtoFunc func(IOWithReadBuffer) Proto
)
Next, you can specify the communication protocol in the following ways:
func SetDefaultProtoFunc(ProtoFunc)
func GetSocket(net.Conn, ...ProtoFunc) Socket
func NewSocket(net.Conn, ...ProtoFunc) Socket
Default protocol RawProto (Big Endian):
{4 bytes message length}
{1 byte protocol version} # 6
{1 byte transfer pipe length}
{transfer pipe IDs}
# The following is handled data by transfer pipe
{1 byte sequence length}
{sequence (HEX 36 string of int32)}
{1 byte message type} # e.g. CALL:1; REPLY:2; PUSH:3
{1 byte service method length}
{service method}
{2 bytes status length}
{status(urlencoded)}
{2 bytes metadata length}
{metadata(urlencoded)}
{1 byte body codec id}
{body}
Optimize
-
SetMessageSizeLimit sets the maximum message size. If maxMessageSize is 0, the limit is set to the maximum uint32 value.
func SetMessageSizeLimit(maxMessageSize uint32)
-
SetKeepAlive sets whether the operating system should send keepalive messages on the connection.
func SetKeepAlive(keepalive bool)
-
SetKeepAlivePeriod sets period between keep alives.
func SetKeepAlivePeriod(d time.Duration)
-
SetNoDelay controls whether the operating system should delay message transmission in hopes of sending fewer messages (Nagle's algorithm). The default is true (no delay), meaning that data is sent as soon as possible after a Write.
func SetNoDelay(_noDelay bool)
-
SetReadBuffer sets the size of the operating system's receive buffer associated with the connection.
func SetReadBuffer(bytes int)
-
SetWriteBuffer sets the size of the operating system's transmit buffer associated with the connection.
func SetWriteBuffer(bytes int)
Documentation ¶
Overview ¶
Package socket provides a concise, powerful and high-performance TCP socket.
Copyright 2017 HenryLee. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Index ¶
- Variables
- func MessageSizeLimit() uint32
- func PutMessage(m Message)
- func ReadBuffer() (bytes int, isDefault bool)
- func SetDefaultProtoFunc(protoFunc ProtoFunc)
- func SetKeepAlive(keepalive bool)
- func SetKeepAlivePeriod(d time.Duration)
- func SetMessageSizeLimit(maxMessageSize uint32)
- func SetNoDelay(_noDelay bool)
- func SetReadBuffer(bytes int)
- func SetWriteBuffer(bytes int)
- func TryOptimize(conn net.Conn)
- func WriteBuffer() (bytes int, isDefault bool)
- type Body
- type Header
- type IOWithReadBuffer
- type Message
- type MessageSetting
- func WithAddMeta(key, value string) MessageSetting
- func WithBody(body interface{}) MessageSetting
- func WithBodyCodec(bodyCodec byte) MessageSetting
- func WithContext(ctx context.Context) MessageSetting
- func WithDelMeta(key string) MessageSetting
- func WithNewBody(newBodyFunc NewBodyFunc) MessageSetting
- func WithNothing() MessageSetting
- func WithServiceMethod(serviceMethod string) MessageSetting
- func WithSetMeta(key, value string) MessageSetting
- func WithStatus(stat *Status) MessageSetting
- func WithXferPipe(filterID ...byte) MessageSetting
- type NewBodyFunc
- type Proto
- type ProtoFunc
- type Socket
- type SocketHub
- func (sh *SocketHub) ChangeID(newID string, socket Socket)
- func (sh *SocketHub) Delete(id string)
- func (sh *SocketHub) Get(id string) (Socket, bool)
- func (sh *SocketHub) Len() int
- func (sh *SocketHub) Random() (Socket, bool)
- func (sh *SocketHub) Range(f func(Socket) bool)
- func (sh *SocketHub) Set(socket Socket)
- type Status
- type UnsafeSocket
Constants ¶
This section is empty.
Variables ¶
var ( // NewStatus creates a message status with code, msg and cause. // NOTE: // code=0 means no error // TYPE: // func NewStatus(code int32, msg string, cause interface{}) *Status NewStatus = status.New // NewStatusWithStack creates a message status with code, msg and cause and stack. // NOTE: // code=0 means no error // TYPE: // func NewStatusWithStack(code int32, msg string, cause interface{}) *Status NewStatusWithStack = status.NewWithStack )
var (
	// ErrExceedMessageSizeLimit is the error whose text reports that the size
	// of a package exceeds the limit.
	ErrExceedMessageSizeLimit = errors.New("Size of package exceeds limit")
)
var ErrProactivelyCloseSocket = errors.New("socket is closed proactively")
ErrProactivelyCloseSocket is the error indicating that the socket was closed proactively.
// RawProtoFunc builds a rawProto (id 6, name "raw") that uses rw as both its
// reader and its writer.
var RawProtoFunc = func(rw IOWithReadBuffer) Proto {
	p := new(rawProto)
	p.id, p.name = 6, "raw"
	p.r, p.w = rw, rw
	return p
}
RawProtoFunc is the creation function of the fast socket protocol. NOTE: it is the default protocol.
Functions ¶
func MessageSizeLimit ¶
func MessageSizeLimit() uint32
MessageSizeLimit gets the message size upper limit of reading.
func ReadBuffer ¶
ReadBuffer returns the size of the operating system's receive buffer associated with the connection. NOTE: if using the system default value, bytes=-1 and isDefault=true.
func SetDefaultProtoFunc ¶
func SetDefaultProtoFunc(protoFunc ProtoFunc)
SetDefaultProtoFunc sets the default builder of socket communication protocol
func SetKeepAlive ¶
func SetKeepAlive(keepalive bool)
SetKeepAlive sets whether the operating system should send keepalive messages on the connection. NOTE: If this function has not been called, the system defaults are used.
func SetKeepAlivePeriod ¶
SetKeepAlivePeriod sets period between keep alives. NOTE: if d<0, don't change the value.
func SetMessageSizeLimit ¶
func SetMessageSizeLimit(maxMessageSize uint32)
SetMessageSizeLimit sets the maximum message size. If maxMessageSize is 0, the limit is set to the maximum uint32 value.
func SetNoDelay ¶
func SetNoDelay(_noDelay bool)
SetNoDelay controls whether the operating system should delay message transmission in hopes of sending fewer messages (Nagle's algorithm). The default is true (no delay), meaning that data is sent as soon as possible after a Write.
func SetReadBuffer ¶
func SetReadBuffer(bytes int)
SetReadBuffer sets the size of the operating system's receive buffer associated with the connection. NOTE: if bytes<0, don't change the value.
func SetWriteBuffer ¶
func SetWriteBuffer(bytes int)
SetWriteBuffer sets the size of the operating system's transmit buffer associated with the connection. NOTE: if bytes<0, don't change the value.
func TryOptimize ¶
TryOptimize attempts to set KeepAlive, KeepAlivePeriod, ReadBuffer, WriteBuffer or NoDelay.
func WriteBuffer ¶
WriteBuffer returns the size of the operating system's transmit buffer associated with the connection. NOTE: if using the system default value, bytes=-1 and isDefault=true.
Types ¶
type Body ¶
type Body interface { // BodyCodec returns the body codec type id. BodyCodec() byte // SetBodyCodec sets the body codec type id. SetBodyCodec(bodyCodec byte) // Body returns the body object. Body() interface{} // SetBody sets the body object. SetBody(body interface{}) // SetNewBody resets the function of geting body. // NOTE: NewBodyFunc is only for reading form connection; SetNewBody(NewBodyFunc) // MarshalBody returns the encoding of body. // NOTE: when the body is a stream of bytes, no marshalling is done. MarshalBody() ([]byte, error) // UnmarshalBody unmarshals the encoded data to the body. // NOTE: // seq, mtype, uri must be setted already; // if body=nil, try to use newBodyFunc to create a new one; // when the body is a stream of bytes, no unmarshalling is done. UnmarshalBody(bodyBytes []byte) error }
Body is an operation interface of optional message fields. SUGGEST: For features complete, the protocol interface should support it.
type Header ¶
type Header interface { // Seq returns the message sequence. Seq() int32 // SetSeq sets the message sequence. SetSeq(int32) // Mtype returns the message type, such as CALL, REPLY, PUSH. Mtype() byte // Mtype sets the message type, such as CALL, REPLY, PUSH. SetMtype(byte) // ServiceMethod returns the serviec method. // SUGGEST: max len ≤ 255! ServiceMethod() string // SetServiceMethod sets the serviec method. // SUGGEST: max len ≤ 255! SetServiceMethod(string) // StatusOK returns the message status is OK or not. StatusOK() bool // Status returns the message status with code, msg, cause or stack. // NOTE: // If it is nil and autoInit = true, assign a new object; // Should use StatusOK to judge whether it is ok. Status(autoInit ...bool) *Status // SetStatus the message status. SetStatus(*Status) // Meta returns the metadata. // SUGGEST: urlencoded string max len ≤ 65535! Meta() *utils.Args }
Header is an operation interface of required message fields. NOTE: Must be supported by Proto interface.
type IOWithReadBuffer ¶
type IOWithReadBuffer interface { io.ReadWriter }
IOWithReadBuffer implements buffered I/O with buffered reader.
type Message ¶
type Message interface { // Reset resets and returns itself. Reset(settings ...MessageSetting) Message // Header is an operation interface of required message fields. // NOTE: Must be supported by Proto interface. Header // Body is an operation interface of optional message fields. // SUGGEST: For features complete, the protocol interface should support it. Body // XferPipe transfer filter pipe, handlers from outer-most to inner-most. // SUGGEST: The length can not be bigger than 255! XferPipe() *xfer.XferPipe // Size returns the size of message. // SUGGEST: For better statistics, Proto interfaces should support it. Size() uint32 // SetSize sets the size of message. // If the size is too big, returns error. // SUGGEST: For better statistics, Proto interfaces should support it. SetSize(size uint32) error // Context returns the message handling context. Context() context.Context // String returns printing message information. String() string // AsHeader converts it to Header interface. AsHeader() Header // AsBody converts it to Body interface. AsBody() Body // contains filtered or unexported methods }
Message a socket message interface.
func GetMessage ¶
func GetMessage(settings ...MessageSetting) Message
GetMessage gets a *message from the message pool. NOTE:
newBodyFunc is only for reading from the connection; settings are only for writing to the connection.
func NewMessage ¶
func NewMessage(settings ...MessageSetting) Message
NewMessage creates a new *message. NOTE:
NewBody is only for reading from the connection; settings are only for writing to the connection.
type MessageSetting ¶
type MessageSetting func(Message)
MessageSetting is a pipe function type for setting message, and only for writing to connection.
func WithAddMeta ¶
func WithAddMeta(key, value string) MessageSetting
WithAddMeta adds 'key=value' metadata argument. Multiple values for the same key may be added. SUGGEST: urlencoded string max len ≤ 65535!
func WithBodyCodec ¶
func WithBodyCodec(bodyCodec byte) MessageSetting
WithBodyCodec sets the body codec.
func WithContext ¶
func WithContext(ctx context.Context) MessageSetting
WithContext sets the message handling context.
func WithDelMeta ¶
func WithDelMeta(key string) MessageSetting
WithDelMeta deletes metadata argument.
func WithNewBody ¶
func WithNewBody(newBodyFunc NewBodyFunc) MessageSetting
WithNewBody resets the function of getting the body.
NOTE: newBodyFunc is only for reading from the connection.
func WithServiceMethod ¶
func WithServiceMethod(serviceMethod string) MessageSetting
WithServiceMethod sets the message service method. SUGGEST: max len ≤ 255!
func WithSetMeta ¶
func WithSetMeta(key, value string) MessageSetting
WithSetMeta sets 'key=value' metadata argument. SUGGEST: urlencoded string max len ≤ 65535!
func WithXferPipe ¶
func WithXferPipe(filterID ...byte) MessageSetting
WithXferPipe sets transfer filter pipe. NOTE: Panic if the filterID is not registered. SUGGEST: The length can not be bigger than 255!
type NewBodyFunc ¶
type NewBodyFunc func(Header) interface{}
NewBodyFunc creates a new body by header, and is only for reading from the connection.
type Proto ¶
type Proto interface { // Version returns the protocol's id and name. Version() (byte, string) // Pack writes the Message into the connection. // NOTE: Make sure to write only once or there will be package contamination! Pack(Message) error // Unpack reads bytes from the connection to the Message. // NOTE: Concurrent unsafe! Unpack(Message) error }
Proto pack/unpack protocol scheme of socket message. NOTE: Implementation specifications for Message interface should be complied with.
type ProtoFunc ¶
type ProtoFunc func(IOWithReadBuffer) Proto
ProtoFunc function used to create a custom Proto interface.
func DefaultProtoFunc ¶
func DefaultProtoFunc() ProtoFunc
DefaultProtoFunc gets the default builder of socket communication protocol
type Socket ¶
type Socket interface { // ControlFD invokes f on the underlying connection's file // descriptor or handle. // The file descriptor fd is guaranteed to remain valid while // f executes but not after f returns. ControlFD(f func(fd uintptr)) error // LocalAddr returns the local network address. LocalAddr() net.Addr // RemoteAddr returns the remote network address. RemoteAddr() net.Addr // SetDeadline sets the read and write deadlines associated // with the connection. It is equivalent to calling both // SetReadDeadline and SetWriteDeadline. // // A deadline is an absolute time after which I/O operations // fail with a timeout (see type Error) instead of // blocking. The deadline applies to all future and pending // I/O, not just the immediately following call to Read or // Write. After a deadline has been exceeded, the connection // can be refreshed by setting a deadline in the future. // // An idle timeout can be implemented by repeatedly extending // the deadline after successful Read or Write calls. // // A zero value for t means I/O operations will not time out. SetDeadline(t time.Time) error // SetReadDeadline sets the deadline for future Read calls // and any currently-blocked Read call. // A zero value for t means Read will not time out. SetReadDeadline(t time.Time) error // SetWriteDeadline sets the deadline for future Write calls // and any currently-blocked Write call. // Even if write times out, it may return n > 0, indicating that // some of the data was successfully written. // A zero value for t means Write will not time out. SetWriteDeadline(t time.Time) error // WriteMessage writes header and body to the connection. // NOTE: must be safe for concurrent use by multiple goroutines. WriteMessage(message Message) error // ReadMessage reads header and body from the connection. // NOTE: must be safe for concurrent use by multiple goroutines. ReadMessage(message Message) error // Read reads data from the connection. 
// Read can be made to time out and return an Error with Timeout() == true // after a fixed time limit; see SetDeadline and SetReadDeadline. Read(b []byte) (n int, err error) // Write writes data to the connection. // Write can be made to time out and return an Error with Timeout() == true // after a fixed time limit; see SetDeadline and SetWriteDeadline. Write(b []byte) (n int, err error) // Close closes the connection socket. // Any blocked Read or Write operations will be unblocked and return errors. Close() error // Swap returns custom data swap of the socket. // NOTE: // If newSwap is not empty, reset the swap and return it. Swap(newSwap ...goutil.Map) goutil.Map // SwapLen returns the amount of custom data of the socket. SwapLen() int // ID returns the socket id. ID() string // SetID sets the socket id. SetID(string) // Reset reset net.Conn and ProtoFunc. Reset(netConn net.Conn, protoFunc ...ProtoFunc) // Raw returns the raw net.Conn Raw() net.Conn }
Socket is a generic stream-oriented network connection. NOTE:
Multiple goroutines may invoke methods on a Socket simultaneously.
type SocketHub ¶
type SocketHub struct {
// contains filtered or unexported fields
}
SocketHub is a hub of sockets.
func (*SocketHub) ChangeID ¶
ChangeID changes the socket id. NOTE: if the old id is remoteAddr, won't delete the index from socketHub.
func (*SocketHub) Get ¶
Get gets a Socket by id. If the second returned arg is false, it means the Socket was not found.
func (*SocketHub) Len ¶
Len returns the length of the socket hub. NOTE: the count implemented using sync.Map may be inaccurate.
func (*SocketHub) Random ¶
Random gets a Socket randomly. If the second returned arg is false, it means no Socket exists.
type UnsafeSocket ¶
type UnsafeSocket interface { Socket // RawLocked returns the raw net.Conn, // can be called in ProtoFunc. // NOTE: // Make sure the external is locked before calling RawLocked() net.Conn }
UnsafeSocket has more unsafe methods than Socket interface.