knetty

package module
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 16, 2023 License: Apache-2.0 Imports: 11 Imported by: 0

README

knetty

English | 中文

Introduction

knetty is a network communication framework written in Go and based on an event-driven architecture. It supports the TCP, UDP, and WebSocket protocols and is as easy to use as Netty, its Java counterpart.

Contents

Installation

To install the knetty package, you need to install Go and set up your Go workspace first.

  • You first need Go installed (version 1.18+ is required), then you can use the below Go command to install knetty.
go get -u  github.com/Softwarekang/knetty
  • import in your code
import "github.com/Softwarekang/knetty"

Quick Start

# View knetty code examples
# work dir in knetty
cd /example/server
# view server start up code examples
cat server.go
package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/Softwarekang/knetty"
	"github.com/Softwarekang/knetty/session"
)

// main starts a knetty TCP server on 127.0.0.1:8000 and shuts it down
// gracefully when the process receives SIGINT or SIGTERM.
func main() {
	// SetPollerNums returns an error; report it instead of silently dropping it.
	if err := knetty.SetPollerNums(8); err != nil {
		log.Printf("set poller nums: %s\n", err)
	}

	// setting optional options for the server
	options := []knetty.ServerOption{
		knetty.WithServiceNewSessionCallBackFunc(newSessionCallBackFn),
	}

	// creating a new server with network settings such as tcp/udp, address such as 127.0.0.1:8000, and optional options
	server := knetty.NewServer("tcp", "127.0.0.1:8000", options...)
	// Initializing the server in a goroutine so that
	// it won't block the graceful shutdown handling below
	go func() {
		// Log every run error except http.ErrServerClosed, which this example
		// treats as a normal shutdown. The previous condition was inverted and
		// hid real failures such as an address that is already in use.
		if err := server.Server(); err != nil && !errors.Is(err, http.ErrServerClosed) {
			log.Printf("run server: %s\n", err)
		}
	}()

	// Wait for an interrupt signal to gracefully shut down the server.
	// signal.Notify never blocks when sending, so the channel must be buffered
	// or a signal that arrives before the receive below could be lost.
	quit := make(chan os.Signal, 1)
	// kill (no param) default send syscall.SIGTERM
	// kill -2 is syscall.SIGINT
	// kill -9 is syscall.SIGKILL but can't be caught, so don't need to add it
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	log.Printf("server pid:%d", os.Getpid())
	<-quit
	log.Println("shutting down server...")

	// The context is used to inform the server it has 5 seconds to finish
	// the request it is currently handling
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	err := server.Shutdown(ctx)
	// Release the context explicitly: log.Fatal exits the process without
	// running deferred calls.
	cancel()
	if err != nil {
		log.Fatal("server starting shutdown:", err)
	}

	log.Println("server exiting")
}

// newSessionCallBackFn sets the parameters a new session needs in order to
// run: it installs the codec and the helloWorldListener event listener on s.
// It always returns nil.
func newSessionCallBackFn(s session.Session) error {
	s.SetCodec(&codec{})
	s.SetEventListener(&helloWorldListener{})
	return nil
}

// helloWorldListener is the server-side session event listener. It writes
// every received package back to the session and prints connection events.
type helloWorldListener struct {
}

// OnMessage writes pkg back to the session and flushes the session buffer.
// Write and flush failures are logged rather than silently dropped; the
// session status returned is always session.Normal.
func (e *helloWorldListener) OnMessage(s session.Session, pkg interface{}) session.ExecStatus {
	// EAGAIN is not reported, matching how the client example treats WritePkg.
	if _, err := s.WritePkg(pkg); err != nil && !errors.Is(err, syscall.EAGAIN) {
		log.Printf("session: %s write pkg err :%v\n", s.Info(), err)
	}
	if err := s.FlushBuffer(); err != nil {
		log.Printf("session: %s flush buffer err :%v\n", s.Info(), err)
	}
	return session.Normal
}

// OnConnect prints the local and remote addresses of the new connection.
func (e *helloWorldListener) OnConnect(s session.Session) {
	fmt.Printf("local:%s get a remote:%s connection\n", s.LocalAddr(), s.RemoteAddr())
}

// OnClose prints the info of the session that closed.
func (e *helloWorldListener) OnClose(s session.Session) {
	fmt.Printf("server session: %s closed\n", s.Info())
}

// OnError prints the session info together with the error it reported.
func (e *helloWorldListener) OnError(s session.Session, err error) {
	fmt.Printf("session: %s got err :%v\n", s.Info(), err)
}

// codec is a pass-through string codec: packages are strings and are sent on
// the wire as their raw bytes, without any framing.
type codec struct {
}

// Encode converts pkg, which must be a string, into its raw bytes.
// It returns an error for nil or any non-string value instead of silently
// encoding it as empty data.
func (c codec) Encode(pkg interface{}) ([]byte, error) {
	data, ok := pkg.(string)
	if !ok {
		return nil, errors.New("pkg type must be string")
	}

	return []byte(data), nil
}

// Decode turns all of the received bytes into one string package.
// It returns (nil, 0, err) when bytes is nil, (nil, 0, nil) when no data is
// available yet, and (pkg, pkgLen, nil) otherwise.
func (c codec) Decode(bytes []byte) (interface{}, int, error) {
	if bytes == nil {
		return nil, 0, errors.New("bytes is nil")
	}

	if len(bytes) == 0 {
		return nil, 0, nil
	}

	data := string(bytes)
	return data, len(data), nil
}

# start up server 
go run ./example/server/server/server.go
# view client start up code examples
cat client.go
/*
	Copyright 2022 Phoenix

	Licensed under the Apache License, Version 2.0 (the "License");
	you may not use this file except in compliance with the License.
	You may obtain a copy of the License at

	http://www.apache.org/licenses/LICENSE-2.0

	Unless required by applicable law or agreed to in writing, software
	distributed under the License is distributed on an "AS IS" BASIS,
	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
	See the License for the specific language governing permissions and
	limitations under the License.
*/

package main

import (
	"errors"
	"fmt"
	"log"
	"syscall"
	"time"

	"github.com/Softwarekang/knetty"
	"github.com/Softwarekang/knetty/session"
)

// main creates a knetty TCP client for 127.0.0.1:8000 and runs it; an error
// returned by Run is logged.
func main() {
	// setting optional options for the client
	options := []knetty.ClientOption{
		knetty.WithClientNewSessionCallBackFunc(newSessionCallBackFn),
	}
	client := knetty.NewClient("tcp", "127.0.0.1:8000", options...)

	if err := client.Run(); err != nil {
		log.Printf("run client: %s\n", err)
	}
}

// newSessionCallBackFn sets the parameters a new client session needs in
// order to run: it installs the codec and the pkgListener event listener on s.
// It always returns nil.
func newSessionCallBackFn(s session.Session) error {
	s.SetCodec(codec{})
	s.SetEventListener(&pkgListener{})
	return nil
}

// sendHello writes the package "hello" to the session, prints the byte count
// reported by WritePkg, and then flushes the session buffer. Write errors
// other than EAGAIN and all flush errors are logged.
func sendHello(s session.Session) {
	n, err := s.WritePkg("hello")
	// errors.Is also matches an EAGAIN that has been wrapped by another error,
	// which a plain != comparison would report as a failure.
	if err != nil && !errors.Is(err, syscall.EAGAIN) {
		log.Println(err)
	}

	fmt.Printf("client session send %v bytes data to server\n", n)
	if err := s.FlushBuffer(); err != nil {
		log.Println(err)
	}
}

// codec is the client-side codec: it encodes string packages as raw bytes
// and decodes incoming data in fixed 5-byte packages.
type codec struct{}

// Encode returns the raw bytes of pkg. A nil pkg and a non-string pkg are
// each rejected with their own error.
func (c codec) Encode(pkg interface{}) ([]byte, error) {
	switch v := pkg.(type) {
	case nil:
		return nil, errors.New("pkg is illegal")
	case string:
		return []byte(v), nil
	default:
		return nil, errors.New("pkg type must be string")
	}
}

// Decode extracts one 5-byte package from the front of bytes.
// Results: (nil, 0, err) for nil input, (nil, 0, nil) while fewer than five
// bytes are available, and (pkg, 5, nil) once a full package is present.
func (c codec) Decode(bytes []byte) (interface{}, int, error) {
	const pkgSize = 5

	switch {
	case bytes == nil:
		return nil, 0, errors.New("bytes is nil")
	case len(bytes) < pkgSize:
		return nil, 0, nil
	}

	// Anything after the first package is left for a later Decode call.
	head := string(bytes[:pkgSize])
	return head, len(head), nil
}

// pkgListener is the client-side session event listener. It sends "hello"
// when the connection is established and writes every received package back
// to the session.
type pkgListener struct {
}

// OnMessage prints the received string package, writes it back to the
// session, flushes the buffer, and then pauses for two seconds before
// returning session.Normal. A pkg that is not a string is logged and skipped
// instead of causing a panic; write and flush errors are logged.
func (e *pkgListener) OnMessage(s session.Session, pkg interface{}) session.ExecStatus {
	data, ok := pkg.(string)
	if !ok {
		log.Printf("client got unexpected pkg type %T\n", pkg)
		return session.Normal
	}

	fmt.Printf("client got data:%s\n", data)
	// Report each failure separately; previously the write error was
	// overwritten and the flush error was discarded by an empty if body.
	if _, err := s.WriteBuffer([]byte(data)); err != nil {
		log.Println(err)
	}
	if err := s.FlushBuffer(); err != nil {
		log.Println(err)
	}
	time.Sleep(2 * time.Second)
	return session.Normal
}

// OnConnect prints the local and remote addresses of the connection and
// sends the first "hello" package.
func (e *pkgListener) OnConnect(s session.Session) {
	fmt.Printf("local:%s get a remote:%s connection\n", s.LocalAddr(), s.RemoteAddr())
	sendHello(s)
}

// OnClose prints the info of the session that closed.
func (e *pkgListener) OnClose(s session.Session) {
	fmt.Printf("client session: %s closed\n", s.Info())

}

// OnError prints the session info together with the error it reported.
func (e *pkgListener) OnError(s session.Session, err error) {
	fmt.Printf("client session: %s got err :%v\n", s.Info(), err)
}

# start up client
go run ./example/server/client/client.go

More Detail

Using NewSessionCallBackFunc

definition

/*
	NewSessionCallBackFunc is executed when a new session is established.
	Use it to set the parameters the session needs, so that the session
	starts properly.
*/
type NewSessionCallBackFunc func(s session.Session) error

You can set parameters such as codec, event listener and more for the session via the provided API.

// newSessionCallBackFn sets the parameters a new session needs in order to
// run: it installs the codec and the helloWorldListener event listener on s.
// It always returns nil.
func newSessionCallBackFn(s session.Session) error {
	s.SetCodec(&codec{})
	s.SetEventListener(&helloWorldListener{})
	return nil
}
Using Codec

definition

// Codec converts between upper-layer protocol objects and binary network
// data for a session.
type Codec interface {
	// Encode converts an object into binary network data.
	Encode(pkg interface{}) ([]byte, error)

	// Decode converts binary network data into an upper-layer protocol object.
	// The three result shapes below distinguish invalid data, half packets,
	// and normal or sticky packets:
	//   Exception:                nil, 0, err
	//   Half packet:              nil, 0, nil
	//   Normal and sticky packet: pkg, pkgLen, nil
	Decode([]byte) (interface{}, int, error)
}

Here is an implementation of a codec for the fixed string "hello" that handles half packets, sticky packets, and invalid network data.

// Encode accepts only the string "hello" and returns its raw bytes.
// A nil pkg, a non-string pkg, and any other string are each rejected with
// their own error.
func (c codec) Encode(pkg interface{}) ([]byte, error) {
	switch v := pkg.(type) {
	case nil:
		return nil, errors.New("pkg is illegal")
	case string:
		if v != "hello" {
			return nil, errors.New("pkg string must be \"hello\"")
		}
		return []byte(v), nil
	default:
		return nil, errors.New("pkg type must be string")
	}
}

// Decode extracts one "hello" package from the front of bytes.
// Results: (nil, 0, err) for nil input or data that does not start with
// "hello", (nil, 0, nil) while fewer than five bytes are available (half
// packet), and ("hello", 5, nil) otherwise.
func (c codec) Decode(bytes []byte) (interface{}, int, error) {
	const want = "hello"

	switch {
	case bytes == nil:
		return nil, 0, errors.New("bytes is nil")
	case len(bytes) < len(want):
		return nil, 0, nil
	}

	// Only the first package is consumed; trailing bytes of a sticky packet
	// are left for a later Decode call.
	head := string(bytes[:len(want)])
	if head != want {
		return nil, 0, errors.New("data is not 'hello'")
	}
	return head, len(head), nil
}
Using Custom Logger

definition

// Logger is a minimal interface that knetty logs its messages to.
// Implement it to provide a custom logging backend for knetty to use.
type Logger interface {
	Errorf(format string, args ...interface{})
	Fatalf(format string, args ...interface{})
	Fatal(args ...interface{})
	Infof(format string, args ...interface{})
	Info(args ...interface{})
	Warnf(format string, args ...interface{})
	Debugf(format string, args ...interface{})
	Debug(args ...interface{})
}

// SetLogger replaces log.DefaultLogger with the given custom logger.
func SetLogger(logger log.Logger) {
	log.DefaultLogger = logger
}

set custom logger

// logger must impl Logger Interface
knetty.SetLogger(logger)
Using EventListener

definition

// EventListener receives the message and lifecycle events of a session.
// NOTE(review): the listener examples elsewhere in this document declare
// OnMessage with a session.ExecStatus return value — confirm that this
// definition matches the current interface.
type EventListener interface {
	// OnMessage runs when the session gets a pkg
	OnMessage(s Session, pkg interface{})
	// OnConnect runs when the connection is initialized
	OnConnect(s Session)
	// OnClose runs before the session is closed
	OnClose(s Session)
	// OnError runs when the session encounters an error
	OnError(s Session, e error)
}

Below is a typical event listener.

// helloWorldListener is a typical event listener: it prints every received
// string package and the connection lifecycle events of the session.
type helloWorldListener struct {
}

// OnMessage prints the received package. A pkg that is not a string is
// reported instead of causing a panic through an unchecked type assertion.
// NOTE(review): other listener examples in this document return
// session.ExecStatus from OnMessage — confirm which signature is current.
func (e *helloWorldListener) OnMessage(s session.Session, pkg interface{}) {
	data, ok := pkg.(string)
	if !ok {
		fmt.Printf("session got unexpected pkg type %T\n", pkg)
		return
	}
	fmt.Println(data)
}

// OnConnect prints the local and remote addresses of the new connection.
func (e *helloWorldListener) OnConnect(s session.Session) {
	fmt.Printf("local:%s get a remote:%s connection\n", s.LocalAddr(), s.RemoteAddr())
}

// OnClose prints a notice that the session closed.
func (e *helloWorldListener) OnClose(s session.Session) {
	fmt.Printf("session close\n")
}

// OnError prints the error reported for the session.
func (e *helloWorldListener) OnError(s session.Session, err error) {
	fmt.Printf("session got err :%v\n", err)
}
Graceful shutdown
package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/Softwarekang/knetty"
	"github.com/Softwarekang/knetty/session"
)

// main starts a knetty TCP server on 127.0.0.1:8000 and shuts it down
// gracefully when the process receives SIGINT or SIGTERM.
func main() {
	// setting optional options for the server
	options := []knetty.ServerOption{
		knetty.WithServiceNewSessionCallBackFunc(newSessionCallBackFn),
	}

	// creating a new server with network settings such as tcp/udp, address such as 127.0.0.1:8000, and optional options
	server := knetty.NewServer("tcp", "127.0.0.1:8000", options...)
	// Initializing the server in a goroutine so that
	// it won't block the graceful shutdown handling below
	go func() {
		// Log every run error except http.ErrServerClosed, which this example
		// treats as a normal shutdown. The previous condition was inverted and
		// hid real failures such as an address that is already in use.
		if err := server.Server(); err != nil && !errors.Is(err, http.ErrServerClosed) {
			log.Printf("run server: %s\n", err)
		}
	}()

	// Wait for an interrupt signal to gracefully shut down the server.
	// signal.Notify never blocks when sending, so the channel must be buffered
	// or a signal that arrives before the receive below could be lost.
	quit := make(chan os.Signal, 1)
	// kill (no param) default send syscall.SIGTERM
	// kill -2 is syscall.SIGINT
	// kill -9 is syscall.SIGKILL but can't be caught, so don't need to add it
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	<-quit
	log.Println("shutting down server...")

	// The context is used to inform the server it has 5 seconds to finish
	// the request it is currently handling
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	err := server.Shutdown(ctx)
	// Release the context explicitly: log.Fatal exits the process without
	// running deferred calls.
	cancel()
	if err != nil {
		log.Fatal("server starting shutdown:", err)
	}

	log.Println("server exiting")
}

Benchmarks

Documentation

Overview

Package knetty .

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func SetLogger

func SetLogger(logger log.Logger)

SetLogger sets a custom logger.

func SetPollerNums

func SetPollerNums(n int) error

SetPollerNums sets the number of reactor goroutines.

Types

type Client

type Client struct {
	ClientOptions
	// contains filtered or unexported fields
}

func NewClient

func NewClient(network, address string, opts ...ClientOption) *Client

NewClient initializes the client. The network and address parameters are required: network is one of tcp, udp, or websocket, and address looks like 127.0.0.1:8000 or localhost:8000.

func (*Client) Run

func (c *Client) Run() error

func (*Client) Shutdown

func (c *Client) Shutdown(ctx context.Context) error

Shutdown closes the client within the maximum time allowed by ctx; otherwise it returns a timeout error.

type ClientOption

type ClientOption func(options *ClientOptions)

ClientOption option for client

func WithClientNewSessionCallBackFunc

func WithClientNewSessionCallBackFunc(f NewSessionCallBackFunc) ClientOption

WithClientNewSessionCallBackFunc set newSessionCallBackFunc

type ClientOptions

type ClientOptions struct {
	// contains filtered or unexported fields
}

ClientOptions options for client

type NewSessionCallBackFunc

type NewSessionCallBackFunc func(s session.Session) error

NewSessionCallBackFunc is executed when a new session is established. Use it to set the parameters the session needs, so that the session starts properly.

type Server

type Server struct {
	ServerOptions
	// contains filtered or unexported fields
}

Server for knetty

func NewServer

func NewServer(network, address string, opts ...ServerOption) *Server

NewServer initializes the server. The network and address parameters are required: network is one of tcp, udp, or websocket, and address looks like 127.0.0.1:8000 or localhost:8000.

func (*Server) Server

func (s *Server) Server() error

Server listens on the configured address and runs the event loop.

func (*Server) Shutdown

func (s *Server) Shutdown(ctx context.Context) error

Shutdown stop server

type ServerOption

type ServerOption func(*ServerOptions)

ServerOption option for server

func WithServiceNewSessionCallBackFunc

func WithServiceNewSessionCallBackFunc(f NewSessionCallBackFunc) ServerOption

WithServiceNewSessionCallBackFunc set newSessionCallBackFunc

type ServerOptions

type ServerOptions struct {
	// contains filtered or unexported fields
}

ServerOptions options for server

Directories

Path Synopsis
example
internal
net
net/connection
Package connection implements tcp, udp and other protocols for network connection.
Package connection implements tcp, udp and other protocols for network connection.
net/poll
Package poll impl io multiplexing on different systems.
Package poll impl io multiplexing on different systems.
pkg
err
Package err wrapped err for knetty
Package err wrapped err for knetty
log
math
Package math common mathematical methods
Package math common mathematical methods
net
Package net general net func
Package net general net func
pool/ringbuffer
Package ring_buffer implement buffer cache pool to improve memory reuse.
Package ring_buffer implement buffer cache pool to improve memory reuse.
syscall
Package syscall wrapped syscall for knetty
Package syscall wrapped syscall for knetty
utils
Package utils generic tool method implementation.
Package utils generic tool method implementation.
Package session for knetty
Package session for knetty

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL