socket

package
v0.5.1-0...-236ff9b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 5, 2017 License: Apache-2.0, Apache-2.0 Imports: 17 Imported by: 0

README

Socket

A concise, powerful and high-performance TCP connection socket.

Feature

  • The server and client are peer-to-peer interfaces
  • With I/O buffer
  • Packet contains both Header and Body
  • Supports custom encoding types, e.g JSON Protobuf
  • Header and Body can use different coding types
  • Body supports gzip compression
  • Header contains the status code and its description text
  • Each socket is assigned an id
  • Provides Socket hub, Socket pool and *Packet stack

Packet

HeaderLength | HeaderCodecId | Header | BodyLength | BodyCodecId | Body

Notes:

  • HeaderLength: uint32, 4 bytes, big endian
  • BodyLength: uint32, 4 bytes, big endian
  • HeaderCodecId: uint8, 1 byte
  • BodyCodecId: uint8, 1 byte
type Packet struct {
	// HeaderCodec header codec string
	HeaderCodec string
	// BodyCodec body codec string
	BodyCodec string
	// header content
	Header *Header `json:"header"`
	// body content
	Body interface{} `json:"body"`
	// header length
	HeaderLength int64 `json:"header_length"`
	// body length
	BodyLength int64 `json:"body_length"`
	// HeaderLength + BodyLength
	Length int64 `json:"length"`
}

Header

type Header struct {
	// Packet id
	Id string
	// Service type
	Type int32
	// Service URI
	Uri string
	// Body gizp compression level
	Gzip int32
	// As reply, it indicates the service status code
	StatusCode int32
	// As reply, it indicates the service status text
	Status string
}

Demo

server.go
package main

import (
	"log"
	"net"
	"time"

	"github.com/henrylee2cn/teleport/socket"
)

func main() {
	lis, err := net.Listen("tcp", "0.0.0.0:8000")
	if err != nil {
		log.Fatalf("[SVR] listen err: %v", err)
	}
	log.Printf("listen tcp 0.0.0.0:8000")
	for {
		conn, err := lis.Accept()
		if err != nil {
			log.Fatalf("[SVR] accept err: %v", err)
		}
		go func(s socket.Socket) {
			log.Printf("accept %s", s.Id())
			defer s.Close()
			for {
				// read request
				var packet = socket.GetPacket(func(_ *socket.Header) interface{} {
					return new(map[string]string)
				})
				err = s.ReadPacket(packet)
				if err != nil {
					log.Printf("[SVR] read request err: %v", err)
					return
				} else {
					log.Printf("[SVR] read request: %v", packet)
				}

				// write response
				packet.HeaderCodec = "json"
				packet.BodyCodec = "json"
				packet.Header.StatusCode = 200
				packet.Header.Status = "ok"
				packet.Body = time.Now()
				err = s.WritePacket(packet)
				if err != nil {
					log.Printf("[SVR] write response err: %v", err)
				}
				log.Printf("[SVR] write response: %v", packet)
				socket.PutPacket(packet)
			}
		}(socket.GetSocket(conn))
	}
}
client.go
package main

import (
	"log"
	"net"

	"github.com/henrylee2cn/teleport/socket"
)

func main() {
	conn, err := net.Dial("tcp", "127.0.0.1:8000")
	if err != nil {
		log.Fatalf("[CLI] dial err: %v", err)
	}
	s := socket.GetSocket(conn)
	defer s.Close()
	var packet = socket.GetPacket(nil)
	defer socket.PutPacket(packet)
	for i := 0; i < 10; i++ {
		// write request
		packet.Reset(nil)
		packet.HeaderCodec = "json"
		packet.BodyCodec = "json"
		packet.Header.Seq = 1
		packet.Header.Uri = "/a/b"
		packet.Header.Gzip = 5
		packet.Body = map[string]string{"a": "A"}
		err = s.WritePacket(packet)
		if err != nil {
			log.Printf("[CLI] write request err: %v", err)
			continue
		}
		log.Printf("[CLI] write request: %v", packet)

		// read response
		packet.Reset(func(_ *socket.Header) interface{} {
			return new(string)
		})
		err = s.ReadPacket(packet)
		if err != nil {
			log.Printf("[CLI] read response err: %v", err)
		} else {
			log.Printf("[CLI] read response: %v", packet)
		}
	}
}

Documentation

Overview

Package socket is a generated protocol buffer package.

It is generated from these files:
	header.proto

It has these top-level messages:
	Header

Socket package provides a concise, powerful and high-performance TCP socket.

Copyright 2017 HenryLee. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Packet protocol:

HeaderLength | HeaderCodecId | Header | BodyLength | BodyCodecId | Body

Notes:

HeaderLength: uint32, 4 bytes, big endian
BodyLength: uint32, 4 bytes, big endian
HeaderCodecId: uint8, 1 byte
BodyCodecId: uint8, 1 byte

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrInvalidLengthHeader = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowHeader   = fmt.Errorf("proto: integer overflow")
)
View Source
var ErrProactivelyCloseSocket = errors.New("socket is closed proactively")

ErrProactivelyCloseSocket proactively close the socket error.

Functions

func AddCodecToBytes

func AddCodecToBytes(codecId byte, body []byte) []byte

AddCodecToBytes adds codec id to body bytes.

func NewSocket

func NewSocket(c net.Conn, id ...string) *socket

NewSocket wraps a net.Conn as a Socket.

func PutPacket

func PutPacket(p *Packet)

PutPacket puts a *Packet to packet stack.

func SetDefaultBodyCodec

func SetDefaultBodyCodec(codecName string)

SetDefaultBodyCodec set the default header codec. Note:

If the codec.Codec named 'codecName' is not registered, it will panic;
It is not safe to call it concurrently.

func SetDefaultHeaderCodec

func SetDefaultHeaderCodec(codecName string)

SetDefaultHeaderCodec set the default header codec. Note:

If the codec.Codec named 'codecName' is not registered, it will panic;
It is not safe to call it concurrently.

func Unmarshal

func Unmarshal(b []byte, v interface{}, isGzip bool) (codecName string, err error)

Unmarshal unmarshals bytes to header or body receiver.

Types

type GzipDecoder

type GzipDecoder struct {
	// contains filtered or unexported fields
}

func (*GzipDecoder) Decode

func (g *GzipDecoder) Decode(gzipLevel int, v interface{}) error

func (*GzipDecoder) Name

func (g *GzipDecoder) Name() string

type GzipEncoder

type GzipEncoder struct {
	// contains filtered or unexported fields
}

func (*GzipEncoder) Encode

func (g *GzipEncoder) Encode(gzipLevel int, v interface{}) error

func (*GzipEncoder) Id

func (g *GzipEncoder) Id() byte
type Header struct {
	Seq        uint64 `protobuf:"varint,1,opt,name=seq,proto3" json:"seq,omitempty"`
	Type       int32  `protobuf:"varint,2,opt,name=type,proto3" json:"type,omitempty"`
	Uri        string `protobuf:"bytes,3,opt,name=uri,proto3" json:"uri,omitempty"`
	Gzip       int32  `protobuf:"varint,4,opt,name=gzip,proto3" json:"gzip,omitempty"`
	StatusCode int32  `protobuf:"varint,5,opt,name=status_code,json=statusCode,proto3" json:"status_code,omitempty"`
	Status     string `protobuf:"bytes,6,opt,name=status,proto3" json:"status,omitempty"`
}

func (*Header) Descriptor

func (*Header) Descriptor() ([]byte, []int)

func (*Header) GetGzip

func (m *Header) GetGzip() int32

func (*Header) GetSeq

func (m *Header) GetSeq() uint64

func (*Header) GetStatus

func (m *Header) GetStatus() string

func (*Header) GetStatusCode

func (m *Header) GetStatusCode() int32

func (*Header) GetType

func (m *Header) GetType() int32

func (*Header) GetUri

func (m *Header) GetUri() string

func (*Header) Marshal

func (m *Header) Marshal() (dAtA []byte, err error)

func (*Header) MarshalTo

func (m *Header) MarshalTo(dAtA []byte) (int, error)

func (*Header) ProtoMessage

func (*Header) ProtoMessage()

func (*Header) Reset

func (m *Header) Reset()

func (*Header) Size

func (m *Header) Size() (n int)

func (*Header) String

func (m *Header) String() string

func (*Header) Unmarshal

func (m *Header) Unmarshal(dAtA []byte) error

type Packet

type Packet struct {
	// HeaderCodec header codec name
	HeaderCodec string `json:"header_codec"`
	// BodyCodec body codec name
	BodyCodec string `json:"body_codec"`
	// header content
	Header *Header `json:"header"`
	// body content
	Body interface{} `json:"body"`
	// header length
	HeaderLength int64 `json:"header_length"`
	// body length
	BodyLength int64 `json:"body_length"`
	// HeaderLength + BodyLength
	Length int64 `json:"length"`
	// contains filtered or unexported fields
}

Packet provides header and body's containers for receiving and sending packet.

func GetPacket

func GetPacket(bodyGetting func(*Header) interface{}, settings ...PacketSetting) *Packet

GetPacket gets a *Packet form packet stack. Note:

bodyGetting is only for reading form connection;
settings are only for writing to connection.

func GetReceiverPacket

func GetReceiverPacket(bodyGetting func(*Header) interface{}) *Packet

GetReceiverPacket returns a packet for sending.

func GetSenderPacket

func GetSenderPacket(typ int32, uri string, body interface{}, setting ...PacketSetting) *Packet

GetSenderPacket returns a packet for sending.

func NewPacket

func NewPacket(bodyGetting func(*Header) interface{}, settings ...PacketSetting) *Packet

NewPacket creates a new *Packet. Note:

bodyGetting is only for reading form connection;
settings are only for writing to connection.

func NewReceiverPacket

func NewReceiverPacket(bodyGetting func(*Header) interface{}) *Packet

NewReceiverPacket returns a packet for sending.

func NewSenderPacket

func NewSenderPacket(typ int32, uri string, body interface{}, setting ...PacketSetting) *Packet

NewSenderPacket returns a packet for sending.

func (*Packet) BodyCodecId

func (p *Packet) BodyCodecId() byte

BodyCodecId returns packet body codec id.

func (*Packet) HeaderCodecId

func (p *Packet) HeaderCodecId() byte

HeaderCodecId returns packet header codec id.

func (*Packet) Reset

func (p *Packet) Reset(bodyGetting func(*Header) interface{}, settings ...PacketSetting)

Reset resets itself. Note:

bodyGetting is only for reading form connection;
settings are only for writing to connection.

func (*Packet) ResetBodyGetting

func (p *Packet) ResetBodyGetting(bodyGetting func(*Header) interface{})

ResetBodyGetting resets the function of geting body.

func (*Packet) String

func (p *Packet) String() string

String returns printing text.

type PacketSetting

type PacketSetting func(*Packet)

PacketSetting sets Header field.

func WithBodyCodec

func WithBodyCodec(codecName string) PacketSetting

WithBodyCodec sets body codec name.

func WithBodyGzip

func WithBodyGzip(gzipLevel int32) PacketSetting

WithBodyGzip sets body gzip level.

func WithHeaderCodec

func WithHeaderCodec(codecName string) PacketSetting

WithHeaderCodec sets header codec name.

func WithStatus

func WithStatus(code int32, text string) PacketSetting

WithStatus sets header status.

type Socket

type Socket interface {
	// LocalAddr returns the local network address.
	LocalAddr() net.Addr

	// RemoteAddr returns the remote network address.
	RemoteAddr() net.Addr

	// SetDeadline sets the read and write deadlines associated
	// with the connection. It is equivalent to calling both
	// SetReadDeadline and SetWriteDeadline.
	//
	// A deadline is an absolute time after which I/O operations
	// fail with a timeout (see type Error) instead of
	// blocking. The deadline applies to all future and pending
	// I/O, not just the immediately following call to Read or
	// Write. After a deadline has been exceeded, the connection
	// can be refreshed by setting a deadline in the future.
	//
	// An idle timeout can be implemented by repeatedly extending
	// the deadline after successful Read or Write calls.
	//
	// A zero value for t means I/O operations will not time out.
	SetDeadline(t time.Time) error

	// SetReadDeadline sets the deadline for future Read calls
	// and any currently-blocked Read call.
	// A zero value for t means Read will not time out.
	SetReadDeadline(t time.Time) error

	// SetWriteDeadline sets the deadline for future Write calls
	// and any currently-blocked Write call.
	// Even if write times out, it may return n > 0, indicating that
	// some of the data was successfully written.
	// A zero value for t means Write will not time out.
	SetWriteDeadline(t time.Time) error

	// WritePacket writes header and body to the connection.
	// WritePacket can be made to time out and return an Error with Timeout() == true
	// after a fixed time limit; see SetDeadline and SetWriteDeadline.
	// Note: must be safe for concurrent use by multiple goroutines.
	WritePacket(packet *Packet) error

	// ReadPacket reads header and body from the connection.
	// Note: must be safe for concurrent use by multiple goroutines.
	ReadPacket(packet *Packet) error

	// Read reads data from the connection.
	// Read can be made to time out and return an Error with Timeout() == true
	// after a fixed time limit; see SetDeadline and SetReadDeadline.
	Read(b []byte) (n int, err error)

	// Write writes data to the connection.
	// Write can be made to time out and return an Error with Timeout() == true
	// after a fixed time limit; see SetDeadline and SetWriteDeadline.
	Write(b []byte) (n int, err error)

	// Close closes the connection socket.
	// Any blocked Read or Write operations will be unblocked and return errors.
	Close() error

	// Public returns temporary public data of Socket.
	Public() goutil.Map
	// PublicLen returns the length of public data of Socket.
	PublicLen() int
	// Id returns the socket id.
	Id() string
	// ChangeId changes the socket id.
	ChangeId(string)
}

Socket is a generic stream-oriented network connection.

Multiple goroutines may invoke methods on a Socket simultaneously.

func GetSocket

func GetSocket(c net.Conn, id ...string) Socket

GetSocket gets a Socket from pool, and reset it.

type SocketHub

type SocketHub struct {
	// contains filtered or unexported fields
}

SocketHub sockets hub

func NewSocketHub

func NewSocketHub() *SocketHub

NewSocketHub creates a new sockets hub.

func (*SocketHub) ChangeId

func (sh *SocketHub) ChangeId(newId string, socket Socket)

ChangeId changes the socket id.

func (*SocketHub) Delete

func (sh *SocketHub) Delete(id string)

Delete deletes the Socket for a id.

func (*SocketHub) Get

func (sh *SocketHub) Get(id string) (Socket, bool)

Get gets Socket by id. If second returned arg is false, mean the Socket is not found.

func (*SocketHub) Len

func (sh *SocketHub) Len() int

Len returns the length of the socket hub. Note: the count implemented using sync.Map may be inaccurate.

func (*SocketHub) Random

func (sh *SocketHub) Random() (Socket, bool)

Random gets a Socket randomly. If third returned arg is false, mean no Socket is exist.

func (*SocketHub) Range

func (sh *SocketHub) Range(f func(Socket) bool)

Range calls f sequentially for each id and Socket present in the socket hub. If f returns false, range stops the iteration.

func (*SocketHub) Set

func (sh *SocketHub) Set(socket Socket)

Set sets a Socket.

Directories

Path Synopsis
pb
Package pb is a generated protocol buffer package.
Package pb is a generated protocol buffer package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL