socket

package
v5.0.0+incompatible
Published: Dec 11, 2018 License: Apache-2.0 Imports: 20 Imported by: 126

README

Socket

A concise, powerful and high-performance connection socket.

Features

  • The server and client are peer-to-peer interfaces
  • Supports setting the size of the socket I/O buffer
  • Supports custom communication protocols
  • Supports custom transfer filter pipes (such as gzip, encrypt, verify...)
  • Message contains both Header and Body
  • Supports custom encoding types, e.g. JSON, Protobuf
  • Header contains the status code and its description text
  • Each socket is assigned an id
  • Provides Socket Hub, Socket pool and Message stack
  • Supports limiting the size of messages read (the connection is closed if the limit is exceeded)
  • Provides an operating interface to control the connection file descriptor

Benchmark

Test Case

  • A server and a client process, running on the same machine
  • CPU: Intel Xeon E312xx (Sandy Bridge) 16 cores 2.53GHz
  • Memory: 16G
  • OS: Linux 2.6.32-696.16.1.el6.centos.plus.x86_64, CentOS 6.4
  • Go: 1.9.2
  • Message size: 581 bytes
  • Message codec: protobuf
  • Sent total 1000000 messages

Test Results

  • teleport/socket
client concurrency   mean(ms)   median(ms)   max(ms)   min(ms)   throughput(TPS)
100                  0          0            14        0         225682
500                  2          1            24        0         212630
1000                 4          3            51        0         180733
2000                 8          6            64        0         183351
5000                 21         18           651       0         133886

  • Profile torch of teleport/socket (figure: tp_socket_profile_torch)

  • Heap torch of teleport/socket (figure: tp_socket_heap_torch)

Example

server.go
package main

import (
    "log"
    "net"

    "github.com/henrylee2cn/teleport/socket"
    "github.com/henrylee2cn/teleport/socket/example/pb"
)

//go:generate go build $GOFILE

func main() {
    socket.SetNoDelay(false)
    socket.SetMessageSizeLimit(512)
    lis, err := net.Listen("tcp", "0.0.0.0:8000")
    if err != nil {
        log.Fatalf("[SVR] listen err: %v", err)
    }
    log.Printf("listen tcp 0.0.0.0:8000")
    for {
        conn, err := lis.Accept()
        if err != nil {
            log.Fatalf("[SVR] accept err: %v", err)
        }
        go func(s socket.Socket) {
            log.Printf("accept %s", s.ID())
            defer s.Close()
            var pbTest = new(pb.PbTest)
            for {
                // read request
                var message = socket.GetMessage(socket.WithNewBody(
                    func(header socket.Header) interface{} {
                        *pbTest = pb.PbTest{}
                        return pbTest
                    }),
                )
                err = s.ReadMessage(message)
                if err != nil {
                    log.Printf("[SVR] read request err: %v", err)
                    return
                }

                // write response
                pbTest.A = pbTest.A + pbTest.B
                pbTest.B = pbTest.A - pbTest.B*2
                message.SetBody(pbTest)

                err = s.WriteMessage(message)
                if err != nil {
                    log.Printf("[SVR] write response err: %v", err)
                } else {
                    log.Printf("[SVR] write response: %v", message)
                }
                socket.PutMessage(message)
            }
        }(socket.GetSocket(conn))
    }
}
client.go
package main

import (
    "log"
    "net"

    "github.com/henrylee2cn/teleport/codec"
    "github.com/henrylee2cn/teleport/socket"
    "github.com/henrylee2cn/teleport/socket/example/pb"
)

//go:generate go build $GOFILE

func main() {
    conn, err := net.Dial("tcp", "127.0.0.1:8000")
    if err != nil {
        log.Fatalf("[CLI] dial err: %v", err)
    }
    s := socket.GetSocket(conn)
    defer s.Close()
    var message = socket.GetMessage()
    defer socket.PutMessage(message)
    for i := int32(0); i < 1; i++ {
        // write request
        message.Reset()
        message.SetMtype(0)
        message.SetBodyCodec(codec.ID_JSON)
        message.SetSeq(i)
        message.SetServiceMethod("/a/b")
        message.SetBody(&pb.PbTest{A: 10, B: 2})
        err = s.WriteMessage(message)
        if err != nil {
            log.Printf("[CLI] write request err: %v", err)
            continue
        }
        log.Printf("[CLI] write request: %v", message)

        // read response
        message.Reset(socket.WithNewBody(
            func(header socket.Header) interface{} {
                return new(pb.PbTest)
            }),
        )
        err = s.ReadMessage(message)
        if err != nil {
            log.Printf("[CLI] read response err: %v", err)
        } else {
            log.Printf("[CLI] read response: %v", message)
        }
    }
}

More Examples

Keywords

  • Message: The corresponding structure of the data package
  • Proto: The protocol interface of message pack/unpack
  • Codec: Serialization interface for Body
  • XferPipe: A series of pipelines to handle message data before transfer
  • XferFilter: An interface to handle message data before transfer

Message

The contents of each message:

// in .../teleport/socket package
type (
    // Message a socket message interface.
    Message interface {
        // Header is an operation interface of required message fields.
        // NOTE: Must be supported by Proto interface.
        Header

        // Body is an operation interface of optional message fields.
        // SUGGEST: For features complete, the protocol interface should support it.
        Body

        // XferPipe transfer filter pipe, handlers from outer-most to inner-most.
        // SUGGEST: The length can not be bigger than 255!
        XferPipe() *xfer.XferPipe

        // Size returns the size of message.
        // SUGGEST: For better statistics, Proto interfaces should support it.
        Size() uint32

        // SetSize sets the size of message.
        // If the size is too big, returns error.
        // SUGGEST: For better statistics, Proto interfaces should support it.
        SetSize(size uint32) error

        // Reset resets itself.
        Reset(settings ...MessageSetting)

        // Context returns the message handling context.
        Context() context.Context
        
        // String returns printing message information.
        String() string
    }

    // Header is an operation interface of required message fields.
    // NOTE: Must be supported by Proto interface.
    Header interface {
        // Seq returns the message sequence.
        Seq() int32
        // SetSeq sets the message sequence.
        SetSeq(int32)
        // Mtype returns the message type, such as CALL, REPLY, PUSH.
        Mtype() byte
        // SetMtype sets the message type, such as CALL, REPLY, PUSH.
        SetMtype(byte)
        // ServiceMethod returns the service method.
        // SUGGEST: max len ≤ 255!
        ServiceMethod() string
        // SetServiceMethod sets the service method.
        // SUGGEST: max len ≤ 255!
        SetServiceMethod(string)
        // Meta returns the metadata.
        // SUGGEST: urlencoded string max len ≤ 65535!
        Meta() *utils.Args
    }

    // Body is an operation interface of optional message fields.
    // SUGGEST: For features complete, the protocol interface should support it.
    Body interface {
        // BodyCodec returns the body codec type id.
        BodyCodec() byte
        // SetBodyCodec sets the body codec type id.
        SetBodyCodec(bodyCodec byte)
        // Body returns the body object.
        Body() interface{}
        // SetBody sets the body object.
        SetBody(body interface{})
        // SetNewBody resets the function of getting body.
        //  NOTE: NewBodyFunc is only for reading from connection;
        SetNewBody(NewBodyFunc)
        // MarshalBody returns the encoding of body.
        // NOTE: when the body is a stream of bytes, no marshalling is done.
        MarshalBody() ([]byte, error)
        // UnmarshalBody unmarshals the encoded data to the body.
        // NOTE:
        //  seq, mtype, uri must already be set;
        //  if body=nil, try to use newBodyFunc to create a new one;
        //  when the body is a stream of bytes, no unmarshalling is done.
        UnmarshalBody(bodyBytes []byte) error
    }

    // NewBodyFunc creates a new body by header,
    // and only for reading from connection.
    NewBodyFunc func(Header) interface{}
)

// in .../teleport/xfer package
type (
    // XferPipe transfer filter pipe, handlers from outer-most to inner-most.
    // NOTE: the length can not be bigger than 255!
    XferPipe struct {
        filters []XferFilter
    }
    // XferFilter handles byte stream of message when transfer.
    XferFilter interface {
        ID() byte
        OnPack([]byte) ([]byte, error)
        OnUnpack([]byte) ([]byte, error)
    }
)
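
As a rough illustration of how a transfer filter pipe could behave, the sketch below re-declares the XferFilter interface shown above and chains two filters using only the standard library. The gzipFilter and xorFilter types, their IDs, the pipe type and the pack/unpack helpers are hypothetical names introduced here, and the ordering (pack from inner-most to outer-most, unpack in the reverse direction) is an assumption rather than a statement about the xfer package.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// XferFilter mirrors the interface shown above.
type XferFilter interface {
	ID() byte
	OnPack([]byte) ([]byte, error)
	OnUnpack([]byte) ([]byte, error)
}

// gzipFilter is a hypothetical filter; the ID 'g' is an arbitrary choice.
type gzipFilter struct{}

func (gzipFilter) ID() byte { return 'g' }

func (gzipFilter) OnPack(src []byte) ([]byte, error) {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(src); err != nil {
		return nil, err
	}
	if err := zw.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

func (gzipFilter) OnUnpack(src []byte) ([]byte, error) {
	zr, err := gzip.NewReader(bytes.NewReader(src))
	if err != nil {
		return nil, err
	}
	defer zr.Close()
	return io.ReadAll(zr)
}

// xorFilter is a toy stand-in for an "encrypt" filter; it is not secure.
type xorFilter struct{ key byte }

func (xorFilter) ID() byte { return 'x' }

func (f xorFilter) OnPack(src []byte) ([]byte, error) {
	dst := make([]byte, len(src))
	for i, b := range src {
		dst[i] = b ^ f.key
	}
	return dst, nil
}

func (f xorFilter) OnUnpack(src []byte) ([]byte, error) { return f.OnPack(src) }

// pipe is a simplified stand-in for XferPipe: filters[0] is the outer-most.
type pipe struct{ filters []XferFilter }

// pack applies the inner-most filter first (assumed ordering).
func (p *pipe) pack(data []byte) ([]byte, error) {
	var err error
	for i := len(p.filters) - 1; i >= 0; i-- {
		if data, err = p.filters[i].OnPack(data); err != nil {
			return nil, err
		}
	}
	return data, nil
}

// unpack undoes pack by applying the outer-most filter first.
func (p *pipe) unpack(data []byte) ([]byte, error) {
	var err error
	for _, f := range p.filters {
		if data, err = f.OnUnpack(data); err != nil {
			return nil, err
		}
	}
	return data, nil
}

func main() {
	p := &pipe{filters: []XferFilter{xorFilter{key: 0x5A}, gzipFilter{}}}
	packed, err := p.pack([]byte("hello, teleport"))
	if err != nil {
		fmt.Println("pack:", err)
		return
	}
	plain, err := p.unpack(packed)
	if err != nil {
		fmt.Println("unpack:", err)
		return
	}
	fmt.Printf("%d bytes on the wire, round trip ok: %v\n", len(packed), string(plain) == "hello, teleport")
}
```

Whatever order a real pipe uses, unpack has to run the filters in exactly the reverse order of pack, which is why the two loops walk the slice in opposite directions.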

Protocol

You can customize your own communication protocol by implementing the interface:

type (
    // Proto pack/unpack protocol scheme of socket message.
    // NOTE: Implementation specifications for Message interface should be complied with.
    Proto interface {
        // Version returns the protocol's id and name.
        Version() (byte, string)
        // Pack writes the Message into the connection.
        // NOTE: Make sure to write only once or there will be package contamination!
        Pack(Message) error
        // Unpack reads bytes from the connection to the Message.
        // NOTE: Concurrent unsafe!
        Unpack(Message) error
    }
    // IOWithReadBuffer implements buffered I/O with buffered reader.
    IOWithReadBuffer interface {
        io.ReadWriter
    }
    // ProtoFunc function used to create a custom Proto interface.
    ProtoFunc func(IOWithReadBuffer) Proto
)

Next, you can specify the communication protocol in the following ways:

func SetDefaultProtoFunc(ProtoFunc)
func GetSocket(net.Conn, ...ProtoFunc) Socket
func NewSocket(net.Conn, ...ProtoFunc) Socket

Default protocol RawProto(Big Endian):

# raw protocol format(Big Endian):
{4 bytes message length}
{1 byte protocol version}
{1 byte transfer pipe length}
{transfer pipe IDs}
# The following is handled data by transfer pipe
{1 byte sequence length}
{sequence}
{1 byte message type} # e.g. CALL:1; REPLY:2; PUSH:3
{1 byte service method length}
{service method}
{2 bytes metadata length}
{metadata(urlencoded)}
{1 byte body codec id}
{body}
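
The layout above can be sketched as a pair of encode/decode helpers built on the standard library. This is only an illustration of the field order, not the package's rawProto code: the Frame type and both functions are hypothetical, the sequence is assumed to be carried as text, the 4-byte length is assumed to count the bytes that follow it, and no transfer filters are applied to the payload.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"io"
)

// Frame is a hypothetical in-memory view of one raw-protocol message.
type Frame struct {
	Version       byte
	PipeIDs       []byte
	Seq           string // assumed to be sent as text
	Mtype         byte
	ServiceMethod string
	Meta          string // urlencoded
	BodyCodec     byte
	Body          []byte
}

// encodeFrame lays the fields out in the order listed above.
// Assumption: the length prefix counts everything after itself.
func encodeFrame(f Frame) ([]byte, error) {
	if len(f.PipeIDs) > 255 || len(f.Seq) > 255 || len(f.ServiceMethod) > 255 || len(f.Meta) > 65535 {
		return nil, errors.New("a length-prefixed field is too long")
	}
	rest := []byte{f.Version, byte(len(f.PipeIDs))}
	rest = append(rest, f.PipeIDs...)
	rest = append(rest, byte(len(f.Seq)))
	rest = append(rest, f.Seq...)
	rest = append(rest, f.Mtype, byte(len(f.ServiceMethod)))
	rest = append(rest, f.ServiceMethod...)
	rest = append(rest, byte(len(f.Meta)>>8), byte(len(f.Meta))) // big endian
	rest = append(rest, f.Meta...)
	rest = append(rest, f.BodyCodec)
	rest = append(rest, f.Body...)
	out := make([]byte, 4, 4+len(rest))
	binary.BigEndian.PutUint32(out, uint32(len(rest)))
	return append(out, rest...), nil
}

// cursor walks a byte slice and remembers the first error.
type cursor struct {
	buf []byte
	err error
}

func (c *cursor) take(n int) []byte {
	if c.err != nil {
		return nil
	}
	if n > len(c.buf) {
		c.err = io.ErrUnexpectedEOF
		return nil
	}
	out := c.buf[:n]
	c.buf = c.buf[n:]
	return out
}

func (c *cursor) byte1() byte {
	b := c.take(1)
	if len(b) == 0 {
		return 0
	}
	return b[0]
}

// decodeFrame reverses encodeFrame under the same assumptions.
func decodeFrame(b []byte) (Frame, error) {
	var f Frame
	if len(b) < 4 {
		return f, io.ErrUnexpectedEOF
	}
	if int(binary.BigEndian.Uint32(b)) != len(b)-4 {
		return f, errors.New("length prefix does not match frame size")
	}
	c := &cursor{buf: b[4:]}
	f.Version = c.byte1()
	f.PipeIDs = append([]byte(nil), c.take(int(c.byte1()))...)
	f.Seq = string(c.take(int(c.byte1())))
	f.Mtype = c.byte1()
	f.ServiceMethod = string(c.take(int(c.byte1())))
	metaLen := c.take(2)
	if c.err != nil {
		return f, c.err
	}
	f.Meta = string(c.take(int(binary.BigEndian.Uint16(metaLen))))
	f.BodyCodec = c.byte1()
	if c.err != nil {
		return f, c.err
	}
	f.Body = append([]byte(nil), c.buf...) // the body is whatever remains
	return f, nil
}

func main() {
	// 'r' is the id used by NewRawProtoFunc; 'j' is a placeholder codec id.
	raw, err := encodeFrame(Frame{Version: 'r', Seq: "1", Mtype: 1,
		ServiceMethod: "/a/b", Meta: "k=v", BodyCodec: 'j', Body: []byte(`{"A":10,"B":2}`)})
	if err != nil {
		fmt.Println("encode:", err)
		return
	}
	back, err := decodeFrame(raw)
	if err != nil {
		fmt.Println("decode:", err)
		return
	}
	fmt.Printf("header bytes: % x\n", raw[:6])
	fmt.Printf("decoded: %+v\n", back)
}
```

Because the body has no length field of its own, a decoder can only find its end from the outer message length, which is why the body comes last in the layout.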

Optimize

  • SetMessageSizeLimit sets the maximum message size. If maxMessageSize is 0, the limit is set to the maximum uint32 value.

    func SetMessageSizeLimit(maxMessageSize uint32)
    
  • SetKeepAlive sets whether the operating system should send keepalive messages on the connection.

    func SetKeepAlive(keepalive bool)
    
  • SetKeepAlivePeriod sets period between keep alives.

    func SetKeepAlivePeriod(d time.Duration)
    
  • SetNoDelay controls whether the operating system should delay message transmission in hopes of sending fewer messages (Nagle's algorithm). The default is true (no delay), meaning that data is sent as soon as possible after a Write.

    func SetNoDelay(_noDelay bool)
    
  • SetReadBuffer sets the size of the operating system's receive buffer associated with the connection.

    func SetReadBuffer(bytes int)
    
  • SetWriteBuffer sets the size of the operating system's transmit buffer associated with the connection.

    func SetWriteBuffer(bytes int)
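
For orientation, the setters above have the same names as per-connection methods on the standard library's *net.TCPConn. The sketch below applies those standard-library options to a loopback connection; that the socket package forwards its package-level settings to calls like these is an assumption, and tuneTCP and tuneLoopback are hypothetical helper names.

```go
package main

import (
	"fmt"
	"net"
	"time"
)

// tuneTCP applies to one connection the kind of options that the
// package-level setters above are assumed to configure.
func tuneTCP(c *net.TCPConn) error {
	if err := c.SetNoDelay(true); err != nil { // true disables Nagle's algorithm
		return err
	}
	if err := c.SetKeepAlive(true); err != nil {
		return err
	}
	if err := c.SetKeepAlivePeriod(30 * time.Second); err != nil {
		return err
	}
	if err := c.SetReadBuffer(64 << 10); err != nil { // 64 KiB receive buffer
		return err
	}
	return c.SetWriteBuffer(64 << 10) // 64 KiB transmit buffer
}

// tuneLoopback opens a throwaway loopback connection and tunes it.
func tuneLoopback() error {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return err
	}
	defer ln.Close()
	conn, err := net.Dial("tcp", ln.Addr().String())
	if err != nil {
		return err
	}
	defer conn.Close()
	tcp, ok := conn.(*net.TCPConn)
	if !ok {
		return fmt.Errorf("unexpected connection type %T", conn)
	}
	return tuneTCP(tcp)
}

func main() {
	if err := tuneLoopback(); err != nil {
		fmt.Println("tune:", err)
		return
	}
	fmt.Println("options applied")
}
```

The buffer sizes and keep-alive period here are arbitrary illustration values; the operating system may round or cap them.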
    

Documentation

Overview

Package socket provides a concise, powerful and high-performance TCP socket.

Copyright 2017 HenryLee. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.


Constants

This section is empty.

Variables

var (

	// ErrExceedMessageSizeLimit error
	ErrExceedMessageSizeLimit = errors.New("Size of package exceeds limit")
)
var ErrProactivelyCloseSocket = errors.New("socket is closed proactively")

ErrProactivelyCloseSocket is the error for a socket that was closed proactively.

var NewRawProtoFunc = func(rw IOWithReadBuffer) Proto {
	return &rawProto{
		id:   'r',
		name: "raw",
		r:    rw,
		w:    rw,
	}
}

NewRawProtoFunc is the creation function of the fast socket protocol. NOTE: it is the default protocol.

Functions

func MessageSizeLimit

func MessageSizeLimit() uint32

MessageSizeLimit gets the message size upper limit of reading.

func PutMessage

func PutMessage(m Message)

PutMessage puts a *message back into the message pool.

func ReadBuffer

func ReadBuffer() (bytes int, isDefault bool)

ReadBuffer returns the size of the operating system's receive buffer associated with the connection. NOTE: if using the system default value, bytes=-1 and isDefault=true.

func SetDefaultProtoFunc

func SetDefaultProtoFunc(protoFunc ProtoFunc)

SetDefaultProtoFunc sets the default builder of socket communication protocol

func SetKeepAlive

func SetKeepAlive(keepalive bool)

SetKeepAlive sets whether the operating system should send keepalive messages on the connection. NOTE: If the function has not been called, the system defaults are used.

func SetKeepAlivePeriod

func SetKeepAlivePeriod(d time.Duration)

SetKeepAlivePeriod sets period between keep alives. NOTE: if d<0, don't change the value.

func SetMessageSizeLimit

func SetMessageSizeLimit(maxMessageSize uint32)

SetMessageSizeLimit sets the maximum message size. If maxMessageSize is 0, the limit is set to the maximum uint32 value.

func SetNoDelay

func SetNoDelay(_noDelay bool)

SetNoDelay controls whether the operating system should delay message transmission in hopes of sending fewer messages (Nagle's algorithm). The default is true (no delay), meaning that data is sent as soon as possible after a Write.

func SetReadBuffer

func SetReadBuffer(bytes int)

SetReadBuffer sets the size of the operating system's receive buffer associated with the connection. NOTE: if bytes<0, don't change the value.

func SetWriteBuffer

func SetWriteBuffer(bytes int)

SetWriteBuffer sets the size of the operating system's transmit buffer associated with the connection. NOTE: if bytes<0, don't change the value.

func WriteBuffer

func WriteBuffer() (bytes int, isDefault bool)

WriteBuffer returns the size of the operating system's transmit buffer associated with the connection. NOTE: if using the system default value, bytes=-1 and isDefault=true.

Types

type Body

type Body interface {
	// BodyCodec returns the body codec type id.
	BodyCodec() byte
	// SetBodyCodec sets the body codec type id.
	SetBodyCodec(bodyCodec byte)
	// Body returns the body object.
	Body() interface{}
	// SetBody sets the body object.
	SetBody(body interface{})
	// SetNewBody resets the function of getting body.
	//  NOTE: NewBodyFunc is only for reading from connection;
	SetNewBody(NewBodyFunc)
	// MarshalBody returns the encoding of body.
	// NOTE: when the body is a stream of bytes, no marshalling is done.
	MarshalBody() ([]byte, error)
	// UnmarshalBody unmarshals the encoded data to the body.
	// NOTE:
	//  seq, mtype, uri must already be set;
	//  if body=nil, try to use newBodyFunc to create a new one;
	//  when the body is a stream of bytes, no unmarshalling is done.
	UnmarshalBody(bodyBytes []byte) error
	// contains filtered or unexported methods
}

Body is an operation interface of optional message fields. SUGGEST: For features complete, the protocol interface should support it.

type Header

type Header interface {
	// Seq returns the message sequence.
	Seq() int32
	// SetSeq sets the message sequence.
	SetSeq(int32)
	// Mtype returns the message type, such as CALL, REPLY, PUSH.
	Mtype() byte
	// SetMtype sets the message type, such as CALL, REPLY, PUSH.
	SetMtype(byte)
	// ServiceMethod returns the service method.
	// SUGGEST: max len ≤ 255!
	ServiceMethod() string
	// SetServiceMethod sets the service method.
	// SUGGEST: max len ≤ 255!
	SetServiceMethod(string)
	// Meta returns the metadata.
	// SUGGEST: urlencoded string max len ≤ 65535!
	Meta() *utils.Args
}

Header is an operation interface of required message fields. NOTE: Must be supported by Proto interface.

type IOWithReadBuffer

type IOWithReadBuffer interface {
	io.ReadWriter
}

IOWithReadBuffer implements buffered I/O with buffered reader.

type Message

type Message interface {
	// Header is an operation interface of required message fields.
	// NOTE: Must be supported by Proto interface.
	Header

	// Body is an operation interface of optional message fields.
	// SUGGEST: For features complete, the protocol interface should support it.
	Body

	// XferPipe transfer filter pipe, handlers from outer-most to inner-most.
	// SUGGEST: The length can not be bigger than 255!
	XferPipe() *xfer.XferPipe

	// Size returns the size of message.
	// SUGGEST: For better statistics, Proto interfaces should support it.
	Size() uint32

	// SetSize sets the size of message.
	// If the size is too big, returns error.
	// SUGGEST: For better statistics, Proto interfaces should support it.
	SetSize(size uint32) error

	// Reset resets itself.
	Reset(settings ...MessageSetting)

	// Context returns the message handling context.
	Context() context.Context

	// String returns printing message information.
	String() string
}

Message is a socket message interface.

func GetMessage

func GetMessage(settings ...MessageSetting) Message

GetMessage gets a *message from the message pool. NOTE:

newBodyFunc is only for reading from connection;
settings are only for writing to connection.
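
The get/put pairing can be pictured with the standard library's sync.Pool. This is a minimal sketch of the pattern only: the message struct, getMessage and putMessage are hypothetical stand-ins, and the package's own pool may differ in detail.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a hypothetical, heavily simplified message type.
type message struct {
	seq           int32
	serviceMethod string
	body          interface{}
}

// reset clears every field so no data leaks between users of the pool.
func (m *message) reset() { *m = message{} }

var messagePool = sync.Pool{
	New: func() interface{} { return new(message) },
}

// getMessage takes a message from the pool, allocating one if it is empty.
func getMessage() *message {
	m := messagePool.Get().(*message)
	m.reset()
	return m
}

// putMessage clears the message and hands it back for reuse.
// The caller must not touch m afterwards.
func putMessage(m *message) {
	m.reset()
	messagePool.Put(m)
}

func main() {
	m := getMessage()
	m.seq = 7
	m.serviceMethod = "/a/b"
	fmt.Printf("in use: %+v\n", *m)
	putMessage(m)

	again := getMessage()
	fmt.Printf("fresh:  %+v\n", *again)
	putMessage(again)
}
```

Pooling only pays off when messages are returned promptly, which is why the examples earlier call PutMessage (or defer it) once a message has been written or logged.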

func NewMessage

func NewMessage(settings ...MessageSetting) Message

NewMessage creates a new *message. NOTE:

NewBody is only for reading from connection;
settings are only for writing to connection.

type MessageSetting

type MessageSetting func(Message)

MessageSetting is a pipe function type for setting message, and only for writing to connection.
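
MessageSetting follows the functional-options pattern: each With* function returns a closure that mutates the message, and the constructor applies them in order. The sketch below reproduces that pattern with hypothetical lowercase names (message, setting, withMtype, withServiceMethod, withAddMeta, newMessage) rather than the package's own types.

```go
package main

import "fmt"

// message is a hypothetical, simplified message.
type message struct {
	mtype         byte
	serviceMethod string
	meta          map[string][]string
}

// setting plays the role of MessageSetting.
type setting func(*message)

func withMtype(t byte) setting {
	return func(m *message) { m.mtype = t }
}

func withServiceMethod(s string) setting {
	return func(m *message) { m.serviceMethod = s }
}

// withAddMeta appends a value, so one key can carry several values.
func withAddMeta(key, value string) setting {
	return func(m *message) { m.meta[key] = append(m.meta[key], value) }
}

// newMessage builds a message and applies the settings left to right.
func newMessage(settings ...setting) *message {
	m := &message{meta: make(map[string][]string)}
	for _, apply := range settings {
		if apply != nil {
			apply(m)
		}
	}
	return m
}

func main() {
	m := newMessage(
		withMtype(1),
		withServiceMethod("/a/b"),
		withAddMeta("trace", "abc"),
		withAddMeta("trace", "def"),
	)
	fmt.Printf("%+v\n", *m)
}
```

Since settings are applied left to right, a later setting for the same field overrides an earlier one, while additive settings such as withAddMeta accumulate.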

func WithAddMeta

func WithAddMeta(key, value string) MessageSetting

WithAddMeta adds 'key=value' metadata argument. Multiple values for the same key may be added. SUGGEST: urlencoded string max len ≤ 65535!

func WithBody

func WithBody(body interface{}) MessageSetting

WithBody sets the body object.

func WithBodyCodec

func WithBodyCodec(bodyCodec byte) MessageSetting

WithBodyCodec sets the body codec.

func WithContext

func WithContext(ctx context.Context) MessageSetting

WithContext sets the message handling context.

func WithMtype

func WithMtype(mtype byte) MessageSetting

WithMtype sets the message type.

func WithNewBody

func WithNewBody(newBodyFunc NewBodyFunc) MessageSetting

WithNewBody resets the function of getting body.

NOTE: newBodyFunc is only for reading from connection.

func WithNothing

func WithNothing() MessageSetting

WithNothing does nothing.

func WithServiceMethod

func WithServiceMethod(serviceMethod string) MessageSetting

WithServiceMethod sets the message service method. SUGGEST: max len ≤ 255!

func WithSetMeta

func WithSetMeta(key, value string) MessageSetting

WithSetMeta sets 'key=value' metadata argument. SUGGEST: urlencoded string max len ≤ 65535!

func WithXferPipe

func WithXferPipe(filterID ...byte) MessageSetting

WithXferPipe sets transfer filter pipe. NOTE: Panic if the filterID is not registered. SUGGEST: The length can not be bigger than 255!

type NewBodyFunc

type NewBodyFunc func(Header) interface{}

NewBodyFunc creates a new body from the header; it is used only when reading from the connection.
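
Because the function receives the header before the body is decoded, it can pick a concrete body type per service method. The sketch below shows that idea with JSON and local stand-in types; header, newBodyFunc, simpleHeader, addArgs, echoArgs, bodyFor and decodeBody are all hypothetical names, and the service method strings are made up for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// header is a cut-down stand-in for the Header interface.
type header interface {
	ServiceMethod() string
}

// newBodyFunc mirrors the shape of NewBodyFunc.
type newBodyFunc func(header) interface{}

type simpleHeader struct{ method string }

func (h simpleHeader) ServiceMethod() string { return h.method }

type addArgs struct{ A, B int }
type echoArgs struct{ Text string }

// bodyFor chooses the destination type from the service method.
func bodyFor(h header) interface{} {
	switch h.ServiceMethod() {
	case "/math/add":
		return new(addArgs)
	case "/echo":
		return new(echoArgs)
	default:
		return new(map[string]interface{}) // generic fallback
	}
}

// decodeBody asks newBody for a destination, then unmarshals into it.
func decodeBody(h header, newBody newBodyFunc, raw []byte) (interface{}, error) {
	body := newBody(h)
	if err := json.Unmarshal(raw, body); err != nil {
		return nil, err
	}
	return body, nil
}

func main() {
	body, err := decodeBody(simpleHeader{"/math/add"}, bodyFor, []byte(`{"A":10,"B":2}`))
	if err != nil {
		fmt.Println("decode:", err)
		return
	}
	fmt.Printf("%T %+v\n", body, body)
}
```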

type Proto

type Proto interface {
	// Version returns the protocol's id and name.
	Version() (byte, string)
	// Pack writes the Message into the connection.
	// NOTE: Make sure to write only once or there will be package contamination!
	Pack(Message) error
	// Unpack reads bytes from the connection to the Message.
	// NOTE: Concurrent unsafe!
	Unpack(Message) error
}

Proto pack/unpack protocol scheme of socket message. NOTE: Implementation specifications for Message interface should be complied with.

type ProtoFunc

type ProtoFunc func(IOWithReadBuffer) Proto

ProtoFunc function used to create a custom Proto interface.

func DefaultProtoFunc

func DefaultProtoFunc() ProtoFunc

DefaultProtoFunc gets the default builder of socket communication protocol

type RawConn

type RawConn interface {
	// Raw returns the raw net.Conn
	Raw() net.Conn
}

RawConn provides access to the raw net.Conn.

type Socket

type Socket interface {
	// ControlFD invokes f on the underlying connection's file
	// descriptor or handle.
	// The file descriptor fd is guaranteed to remain valid while
	// f executes but not after f returns.
	ControlFD(f func(fd uintptr)) error
	// LocalAddr returns the local network address.
	LocalAddr() net.Addr
	// RemoteAddr returns the remote network address.
	RemoteAddr() net.Addr
	// SetDeadline sets the read and write deadlines associated
	// with the connection. It is equivalent to calling both
	// SetReadDeadline and SetWriteDeadline.
	//
	// A deadline is an absolute time after which I/O operations
	// fail with a timeout (see type Error) instead of
	// blocking. The deadline applies to all future and pending
	// I/O, not just the immediately following call to Read or
	// Write. After a deadline has been exceeded, the connection
	// can be refreshed by setting a deadline in the future.
	//
	// An idle timeout can be implemented by repeatedly extending
	// the deadline after successful Read or Write calls.
	//
	// A zero value for t means I/O operations will not time out.
	SetDeadline(t time.Time) error
	// SetReadDeadline sets the deadline for future Read calls
	// and any currently-blocked Read call.
	// A zero value for t means Read will not time out.
	SetReadDeadline(t time.Time) error
	// SetWriteDeadline sets the deadline for future Write calls
	// and any currently-blocked Write call.
	// Even if write times out, it may return n > 0, indicating that
	// some of the data was successfully written.
	// A zero value for t means Write will not time out.
	SetWriteDeadline(t time.Time) error
	// WriteMessage writes header and body to the connection.
	// NOTE: must be safe for concurrent use by multiple goroutines.
	WriteMessage(message Message) error
	// ReadMessage reads header and body from the connection.
	// NOTE: must be safe for concurrent use by multiple goroutines.
	ReadMessage(message Message) error
	// Read reads data from the connection.
	// Read can be made to time out and return an Error with Timeout() == true
	// after a fixed time limit; see SetDeadline and SetReadDeadline.
	Read(b []byte) (n int, err error)
	// Write writes data to the connection.
	// Write can be made to time out and return an Error with Timeout() == true
	// after a fixed time limit; see SetDeadline and SetWriteDeadline.
	Write(b []byte) (n int, err error)
	// Close closes the connection socket.
	// Any blocked Read or Write operations will be unblocked and return errors.
	Close() error
	// Swap returns custom data swap of the socket.
	Swap() goutil.Map
	// SwapLen returns the amount of custom data of the socket.
	SwapLen() int
	// ID returns the socket id.
	ID() string
	// SetID sets the socket id.
	SetID(string)
	// Reset resets net.Conn and ProtoFunc.
	Reset(netConn net.Conn, protoFunc ...ProtoFunc)
}

Socket is a generic stream-oriented network connection.

Multiple goroutines may invoke methods on a Socket simultaneously.
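
The deadline comments above mention implementing an idle timeout by extending the deadline after each successful read. A minimal sketch of that technique on a plain net.Conn follows, using net.Pipe so it runs without a network; readUntilIdle, isTimeout and idleDemo are hypothetical helper names, and the same loop should carry over to any type exposing SetReadDeadline and Read.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"time"
)

// readUntilIdle keeps reading and pushes the deadline forward before
// every Read, so it returns only once the peer has been silent for idle
// (or another error occurs).
func readUntilIdle(c net.Conn, idle time.Duration) (int, error) {
	buf := make([]byte, 1024)
	total := 0
	for {
		if err := c.SetReadDeadline(time.Now().Add(idle)); err != nil {
			return total, err
		}
		n, err := c.Read(buf)
		total += n
		if err != nil {
			return total, err
		}
	}
}

// isTimeout reports whether err is a network timeout.
func isTimeout(err error) bool {
	var ne net.Error
	return errors.As(err, &ne) && ne.Timeout()
}

// idleDemo sends one message over an in-memory pipe and then goes quiet.
func idleDemo() (int, error) {
	client, server := net.Pipe()
	defer client.Close()
	defer server.Close()
	go client.Write([]byte("ping")) // one message, then silence
	return readUntilIdle(server, 100*time.Millisecond)
}

func main() {
	n, err := idleDemo()
	fmt.Println("bytes read before idle timeout:", n)
	fmt.Println("stopped by timeout:", isTimeout(err))
}
```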

func GetSocket

func GetSocket(c net.Conn, protoFunc ...ProtoFunc) Socket

GetSocket gets a Socket from the pool and resets it.

func NewSocket

func NewSocket(c net.Conn, protoFunc ...ProtoFunc) Socket

NewSocket wraps a net.Conn as a Socket.

type SocketHub

type SocketHub struct {
	// contains filtered or unexported fields
}

SocketHub is a hub of sockets.

func NewSocketHub

func NewSocketHub() *SocketHub

NewSocketHub creates a new sockets hub.

func (*SocketHub) ChangeID

func (sh *SocketHub) ChangeID(newID string, socket Socket)

ChangeID changes the socket id. NOTE: if the old id is remoteAddr, won't delete the index from socketHub.

func (*SocketHub) Delete

func (sh *SocketHub) Delete(id string)

Delete deletes the Socket for an id.

func (*SocketHub) Get

func (sh *SocketHub) Get(id string) (Socket, bool)

Get gets a Socket by id. If the second returned value is false, the Socket was not found.

func (*SocketHub) Len

func (sh *SocketHub) Len() int

Len returns the length of the socket hub. NOTE: the count implemented using sync.Map may be inaccurate.

func (*SocketHub) Random

func (sh *SocketHub) Random() (Socket, bool)

Random gets a Socket randomly. If the second returned value is false, no Socket exists.

func (*SocketHub) Range

func (sh *SocketHub) Range(f func(Socket) bool)

Range calls f sequentially for each id and Socket present in the socket hub. If f returns false, range stops the iteration.

func (*SocketHub) Set

func (sh *SocketHub) Set(socket Socket)

Set sets a Socket.
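
The hub's method set maps naturally onto sync.Map, which the Len note above mentions. The sketch below is a simplified re-implementation for illustration only: conn, fakeSocket and hub are hypothetical types, Len counts by ranging (which is why it can be inaccurate under concurrent changes), and Random here simply returns the first entry a Range visits, with no claim about how the real hub picks one.

```go
package main

import (
	"fmt"
	"sync"
)

// conn is a cut-down stand-in for Socket: only the id matters here.
type conn interface {
	ID() string
}

type fakeSocket struct{ id string }

func (s fakeSocket) ID() string { return s.id }

// hub indexes connections by id.
type hub struct{ m sync.Map }

func (h *hub) Set(c conn) { h.m.Store(c.ID(), c) }

func (h *hub) Get(id string) (conn, bool) {
	v, ok := h.m.Load(id)
	if !ok {
		return nil, false
	}
	return v.(conn), true
}

func (h *hub) Delete(id string) { h.m.Delete(id) }

// Len counts entries by ranging; concurrent Set/Delete can skew it.
func (h *hub) Len() int {
	n := 0
	h.m.Range(func(_, _ interface{}) bool {
		n++
		return true
	})
	return n
}

// Range calls f for each connection until f returns false.
func (h *hub) Range(f func(conn) bool) {
	h.m.Range(func(_, v interface{}) bool { return f(v.(conn)) })
}

// Random returns an arbitrary connection (simplification: the first
// one visited by Range).
func (h *hub) Random() (conn, bool) {
	var picked conn
	h.m.Range(func(_, v interface{}) bool {
		picked = v.(conn)
		return false
	})
	return picked, picked != nil
}

func main() {
	h := &hub{}
	h.Set(fakeSocket{"a"})
	h.Set(fakeSocket{"b"})
	fmt.Println("len:", h.Len())
	if c, ok := h.Get("a"); ok {
		fmt.Println("found:", c.ID())
	}
	h.Delete("a")
	fmt.Println("len after delete:", h.Len())
}
```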

Directories

Path Synopsis
pb
