cnet

package module
v0.0.0-...-435a7a5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 26, 2020 License: MIT Imports: 12 Imported by: 0

README

cnet

介绍

本框架从evio和gnet派生,更改了部分api与处理逻辑。
基于epoll事件分发的网络服务框架,仅支持linux 下运行。

How to use
go get -u github.com/cuckooemm/cnet

TCP

type serverTcpCallback struct {
	connTotal, connected, close int64
	spanDown, spanUp            int64
}
// 链接accept时回调
func (sc *serverTcpCallback) OnConnOpened(c Conn) (out []byte, op Operation) {
	var str = "hello client, welcome to connection\n"
	out = []byte(str)
	atomic.AddInt64(&sc.spanUp, int64(len(out)))
	atomic.AddInt64(&sc.connected, 1)
	atomic.AddInt64(&sc.connTotal, 1)
	fmt.Println("conned - ", sc.connected, "connTotal - ", sc.connTotal, "close - ", sc.close)
	fmt.Println("spanDown - ", sc.spanDown, "spanUp - ", sc.spanUp)
	return
}

// 链接关闭时回调
func (sc *serverTcpCallback) OnConnClosed(c Conn, err error) (op Operation) {
	if err != nil {
		fmt.Println("connection err: ", err)
	}
	atomic.AddInt64(&sc.connected, -1)
	atomic.AddInt64(&sc.close, 1)
	return
}

// 链接可读时回调 
func (sc *serverTcpCallback) ConnHandler(c Conn) (out []byte, op Operation) {
	var rcv = c.Read()
	var l = len(rcv)
	atomic.AddInt64(&sc.spanDown, int64(l))
	fmt.Printf("已收到 %s client 发来长度为: %d 的信息: %s\n", c.RemoteAddr().String(), l, rcv)
	out = []byte("receive ")
	c.ShiftN(l)
	out = append(out, rcv...)
	atomic.AddInt64(&sc.spanUp, int64(len(out)))
	return
}

// 唤醒conn时触发 c.wake()
func (sc *serverTcpCallback) OnWakenHandler(c Conn) (out []byte, op Operation) {
	return nil, None
}

func main(){
    var (
    	call serverTcpCallback
        addr = ":8000"
        err error
    )   
    if err = TcpService(&call, addr, TcpOption{TcpKeepAlive: time.Minute, MultiCore: 4, ReusePort: true}); err != nil{
        // err handler...
    }
}

UDP

type serverUdpCallback struct {
	spanDown, spanUp int64
}

func (sc *serverUdpCallback) PackHandler(pack []byte, p Pconn) (out []byte, op Operation) {
	fmt.Printf("receive message :%s of client: %s", pack, p.RemoteAddr())
	atomic.AddInt64(&sc.spanDown, int64(len(pack)))
	out = append(out, []byte("reply: ")...)
	out = append(out, pack...)
	atomic.AddInt64(&sc.spanUp, int64(len(out)))
	return
}

func (sc *serverUdpCallback) SendErr(remoteAddr string, err error) {
	fmt.Printf("send error: %v of client: %s", err, remoteAddr)
}

func main(){
	var (
		c        cnet.Cnet
		err      error
	)
	c = cnet.Cnet{
		Network:      cnet.Tcp,
		Callback:     &serverCallback{},
		Addr:         ":8000",
		MultiCore:    4,
		TcpKeepAlive: time.Minute,
		ReusePort:    true,
	}
	if err = c.Listener(); err != nil {
		println(err)
	}
}

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrServerShutdown    = errors.New("service is going to be shutdown")
	ErrUnSupportProtocol = errors.New("unsupported protocol")
)

Functions

func TcpService

func TcpService(callback IEventCallback, addr string, opt TcpOption) error

func UdpService

func UdpService(callback IEventCallback, addr string, opt UdpOption) error

Types

type Cnet

type Cnet struct {
	// protocol
	Network Network
	// address
	Addr string
	// reuseport
	ReusePort bool
	// event-loop number
	MultiCore int
	// tco keepAlive
	TcpKeepAlive time.Duration
	// callback
	Callback IEventCallback
	// log
	Logger Logger
}

func (*Cnet) Listener

func (c *Cnet) Listener() error

type Conn

type Conn interface {
	// 返回用户定义数据。
	Expand() map[string]interface{}
	// 设置用户定义的数据
	SetExpand(data map[string]interface{})
	// 协议
	Network() string
	// 连接的本地套接字地址
	LocalAddr() string
	// 连接的远程对端地址
	RemoteAddr() string

	// Read从入站环形缓冲区中读取所有数据,而不会移动“read”指针,不会淘汰缓冲区数据,直到调用ResetBuffer方法为止 。
	Read() (int, []byte)

	// ResetBuffer重置入站环形缓冲区
	ResetBuffer()

	// ReadN从入站环形缓冲区和读取具有给定长度的字节,不会移动“读取”指针,直到调用ShiftN方法,它才会从缓冲区中逐出数据,
	ReadN(n int) (int, []byte)

	// ShiftN将“read”指针移入给定长度的缓冲区中。
	ShiftN(n int) int

	// BufferLength 返回入站环形缓冲区中可用数据的长度。
	BufferLength() int

	// AsyncWrite异步将数据写入客户端/连接,通常在单个goroutine中而不是事件循环goroutine中调用它。
	AsyncWrite([]byte) error

	// 唤醒会为此连接触发一个React事件。
	Wake() error

	// 关闭当前连接
	Close() error
}

type IEventCallback

type IEventCallback interface {
	// tcp
	// 链接连接时回调
	OnConnOpened(c Conn) (out []byte, op Operation)
	// 链接关闭时回调
	OnConnClosed(c Conn, err error) (op Operation)
	// 读事件触发
	ConnHandler(c Conn) (out []byte, op Operation)
	// 唤醒conn时触发 c.wake
	OnWakenHandler(c Conn) (out []byte, op Operation)
	// udp
	// 读事件触发
	PackHandler(pack []byte, p Pconn) (out []byte, op Operation)
	// 发送数据错误时触发
	SendErr(remoteAddr string, err error)
}

type IEventTcpLoopGroup

type IEventTcpLoopGroup interface {
	// contains filtered or unexported methods
}

type Logger

type Logger interface {
	Printf(format string, args ...interface{})
}

type Network

type Network int
const (
	Tcp Network = iota
	Udp
)

type Operation

type Operation int
const (
	None Operation = iota
	Close
	Shutdown
)

type Pconn

type Pconn interface {
	// 协议
	Network() string
	// 连接的本地套接字地址
	LocalAddr() (addr string)

	// 连接的远程对端地址
	RemoteAddr() (addr string)

	// SendTo为UDP套接字写入数据,它允许您在各个goroutine中将数据发送回UDP套接字。
	SendTo(buf []byte) error
}

type TcpOption

type TcpOption struct {
	ReusePort    bool
	MultiCore    int
	Logger       Logger
	TcpKeepAlive time.Duration
}

type UdpOption

type UdpOption struct {
	ReusePort bool
	MultiCore int
	Logger    Logger
}

Directories

Path Synopsis
example
buf

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL