knetty
English | 中文
Introduction
knetty is an event-driven network communication framework written in Go. It supports the TCP, UDP, and WebSocket protocols and, like Netty in Java, is easy to use.
Contents
Installation
To install the knetty package, you need to install Go and set up your Go workspace first.
- First install Go (version 1.18+ is required); then use the following Go command to install knetty.
go get -u github.com/Softwarekang/knetty
import "github.com/Softwarekang/knetty"
Quick Start
# View the knetty code examples.
# Work from the knetty checkout.
# NOTE(review): the leading "/" makes this an absolute path, while the run
# commands later in this guide use ./example/server/...; confirm the intended directory.
cd /example/server
# Print the server start-up code example.
cat server.go
package main
import (
"context"
"errors"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"github.com/Softwarekang/knetty"
"github.com/Softwarekang/knetty/session"
)
// main starts the example knetty TCP server on 127.0.0.1:8000, waits for
// SIGINT or SIGTERM, and then gives the server five seconds to shut down.
func main() {
	knetty.SetPollerNums(8)

	// setting optional options for the server
	options := []knetty.ServerOption{
		knetty.WithServiceNewSessionCallBackFunc(newSessionCallBackFn),
	}

	// creating a new server with network settings such as tcp/udp, address such as 127.0.0.1:8000, and optional options
	server := knetty.NewServer("tcp", "127.0.0.1:8000", options...)

	// Initializing the server in a goroutine so that
	// it won't block the graceful shutdown handling below
	go func() {
		// Report every start-up/serve failure. http.ErrServerClosed is skipped because it is
		// treated here as the normal-shutdown sentinel.
		// NOTE(review): confirm that Server returns http.ErrServerClosed after Shutdown.
		if err := server.Server(); err != nil && !errors.Is(err, http.ErrServerClosed) {
			log.Printf("run server: %s\n", err)
		}
	}()

	// Wait for an interrupt signal to gracefully shut down the server with
	// a timeout of 5 seconds.
	// signal.Notify does not block when sending to the channel, so the channel
	// must be buffered or a signal arriving before the receive below can be lost.
	quit := make(chan os.Signal, 1)
	// kill (no param) default send syscall.SIGTERM
	// kill -2 is syscall.SIGINT
	// kill -9 is syscall.SIGKILL but can't be caught, so don't need to add it
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	log.Printf("server pid:%d", os.Getpid())
	<-quit
	log.Println("shutting down server...")

	// The context is used to inform the server it has 5 seconds to finish
	// the request it is currently handling
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := server.Shutdown(ctx); err != nil {
		log.Fatal("server starting shutdown:", err)
	}
	log.Println("server exiting")
}
// newSessionCallBackFn is the callback handed to WithServiceNewSessionCallBackFunc in main.
// It attaches the codec and the event listener to the session and always returns nil.
func newSessionCallBackFn(s session.Session) error {
	var (
		sessionCodec = &codec{}
		listener     = &helloWorldListener{}
	)
	s.SetCodec(sessionCodec)
	s.SetEventListener(listener)
	return nil
}
// helloWorldListener is the server-side event listener: it writes every
// received package back to the session and prints connection lifecycle events.
type helloWorldListener struct{}

// OnMessage writes pkg back to the session and flushes the session buffer.
// Write and flush failures are logged rather than silently dropped; the
// method always returns session.Normal.
func (e *helloWorldListener) OnMessage(s session.Session, pkg interface{}) session.ExecStatus {
	// syscall.EAGAIN is not reported, matching how the client example's sendHello treats it.
	if _, err := s.WritePkg(pkg); err != nil && !errors.Is(err, syscall.EAGAIN) {
		log.Println(err)
	}
	if err := s.FlushBuffer(); err != nil {
		log.Println(err)
	}
	return session.Normal
}

// OnConnect prints the local and remote addresses of the session.
func (e *helloWorldListener) OnConnect(s session.Session) {
	fmt.Printf("local:%s get a remote:%s connection\n", s.LocalAddr(), s.RemoteAddr())
}

// OnClose prints the session info of the session being closed.
func (e *helloWorldListener) OnClose(s session.Session) {
	fmt.Printf("server session: %s closed\n", s.Info())
}

// OnError prints the session info together with the reported error.
func (e *helloWorldListener) OnError(s session.Session, err error) {
	fmt.Printf("session: %s got err :%v\n", s.Info(), err)
}
// codec is the echo server's session codec; packages are plain strings.
type codec struct{}

// Encode converts a string package into its raw bytes.
// It returns an error when pkg is not a string, instead of silently
// encoding such a value as an empty payload.
func (c codec) Encode(pkg interface{}) ([]byte, error) {
	data, ok := pkg.(string)
	if !ok {
		return nil, errors.New("pkg type must be string")
	}
	return []byte(data), nil
}

// Decode turns the received bytes into a string package.
// Results:
//   - nil input:         nil, 0, error
//   - empty input:       nil, 0, nil
//   - non-empty input:   the whole input as a string, its length, nil
func (c codec) Decode(bytes []byte) (interface{}, int, error) {
	if bytes == nil {
		return nil, 0, errors.New("bytes is nil")
	}
	if len(bytes) == 0 {
		return nil, 0, nil
	}
	return string(bytes), len(bytes), nil
}
# Start the server (the path is relative to the directory the command is run from).
go run ./example/server/server/server.go
# View the client start-up code example.
cat client.go
/*
Copyright 2022 Phoenix
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"errors"
"fmt"
"log"
"syscall"
"time"
"github.com/Softwarekang/knetty"
"github.com/Softwarekang/knetty/session"
)
// main builds a TCP client for 127.0.0.1:8000 with the new-session callback
// installed, runs it, and logs any error returned by Run.
func main() {
	// the only optional setting for the client is the new-session callback
	sessionCallback := knetty.WithClientNewSessionCallBackFunc(newSessionCallBackFn)
	clientOptions := []knetty.ClientOption{sessionCallback}

	client := knetty.NewClient("tcp", "127.0.0.1:8000", clientOptions...)
	err := client.Run()
	if err == nil {
		return
	}
	log.Printf("run client: %s\n", err)
}
// newSessionCallBackFn is the callback handed to WithClientNewSessionCallBackFunc in main.
// It gives the session a codec value and a pkgListener, then returns nil.
func newSessionCallBackFn(s session.Session) error {
	var sessionCodec codec
	s.SetCodec(sessionCodec)
	s.SetEventListener(new(pkgListener))
	return nil
}
// sendHello writes the package "hello" to the session, prints the byte count
// returned by WritePkg, and then flushes the session buffer. Errors are logged,
// not returned; syscall.EAGAIN from WritePkg is not logged.
func sendHello(s session.Session) {
	written, writeErr := s.WritePkg("hello")
	switch writeErr {
	case nil, syscall.EAGAIN:
		// nothing to report
	default:
		log.Println(writeErr)
	}
	fmt.Printf("client session send %v bytes data to server\n", written)

	flushErr := s.FlushBuffer()
	if flushErr == nil {
		return
	}
	log.Println(flushErr)
}
// codec cuts the client's byte stream into five-byte string packages.
type codec struct{}

// Encode returns the bytes of pkg, which must be a non-nil string.
func (c codec) Encode(pkg interface{}) ([]byte, error) {
	switch v := pkg.(type) {
	case nil:
		return nil, errors.New("pkg is illegal")
	case string:
		return []byte(v), nil
	default:
		return nil, errors.New("pkg type must be string")
	}
}

// Decode extracts one five-byte package from the front of bytes.
// Results:
//   - nil input:            nil, 0, error
//   - fewer than 5 bytes:   nil, 0, nil
//   - 5 bytes or more:      the first 5 bytes as a string, 5, nil
func (c codec) Decode(bytes []byte) (interface{}, int, error) {
	const frameLen = 5
	switch {
	case bytes == nil:
		return nil, 0, errors.New("bytes is nil")
	case len(bytes) < frameLen:
		return nil, 0, nil
	}
	frame := string(bytes[:frameLen])
	return frame, len(frame), nil
}
// pkgListener is the client-side event listener: it sends "hello" once the
// connection is up and writes every received package back to the session.
type pkgListener struct{}

// OnMessage prints the received string package, writes it back to the session
// buffer, flushes the buffer, and then sleeps for two seconds.
// Write and flush failures are logged rather than silently dropped, and a
// package that is not a string is logged instead of causing a panic.
// The method always returns session.Normal.
func (e *pkgListener) OnMessage(s session.Session, pkg interface{}) session.ExecStatus {
	data, ok := pkg.(string)
	if !ok {
		log.Printf("client got unexpected pkg type %T\n", pkg)
		return session.Normal
	}
	fmt.Printf("client got data:%s\n", data)

	if _, err := s.WriteBuffer([]byte(data)); err != nil {
		log.Println(err)
	}
	if err := s.FlushBuffer(); err != nil {
		log.Println(err)
	}

	time.Sleep(2 * time.Second)
	return session.Normal
}

// OnConnect prints the local and remote addresses and sends the first "hello".
func (e *pkgListener) OnConnect(s session.Session) {
	fmt.Printf("local:%s get a remote:%s connection\n", s.LocalAddr(), s.RemoteAddr())
	sendHello(s)
}

// OnClose prints the session info of the session being closed.
func (e *pkgListener) OnClose(s session.Session) {
	fmt.Printf("client session: %s closed\n", s.Info())
}

// OnError prints the session info together with the reported error.
func (e *pkgListener) OnError(s session.Session, err error) {
	fmt.Printf("client session: %s got err :%v\n", s.Info(), err)
}
# Start the client (the path is relative to the directory the command is run from).
go run ./example/server/client/client.go
More Detail
Using NewSessionCallBackFunc
definition
/*
NewSessionCallBackFunc is executed when a new session is established.
It should set the parameters the session needs so that the session starts properly.
*/
type NewSessionCallBackFunc func(s session.Session) error
You can set parameters such as codec, event listener and more for the session via the provided API.
// newSessionCallBackFn prepares a freshly created session: it attaches a codec
// and a helloWorldListener, then returns nil.
func newSessionCallBackFn(s session.Session) error {
	sessionCodec, listener := &codec{}, &helloWorldListener{}
	s.SetCodec(sessionCodec)
	s.SetEventListener(listener)
	return nil
}
Using Codec
definition
// Codec encodes and decodes packages for a session.
type Codec interface {
	// Encode converts an object into binary network data.
	Encode(pkg interface{}) ([]byte, error)
	// Decode converts binary network data into an upper-layer protocol object.
	// The three result shapes below distinguish abnormal data, half packets,
	// and normal or sticky packets:
	//   Exception:                nil, 0, err
	//   Half packet:              nil, 0, nil
	//   Normal or sticky packet:  pkg, pkgLen, nil
	Decode([]byte) (interface{}, int, error)
}
Here is an implementation of a codec that frames the stream on the string "hello" and handles half packets, sticky packets, and invalid network data.
// Encode accepts only the string "hello" and returns its bytes.
// A nil pkg, a non-string pkg, and any other string each produce an error.
func (c codec) Encode(pkg interface{}) ([]byte, error) {
	switch v := pkg.(type) {
	case nil:
		return nil, errors.New("pkg is illegal")
	case string:
		// comparing against "hello" also covers the length check
		if v != "hello" {
			return nil, errors.New("pkg string must be \"hello\"")
		}
		return []byte(v), nil
	default:
		return nil, errors.New("pkg type must be string")
	}
}

// Decode extracts one "hello" package from the front of bytes.
// Results:
//   - nil input:                        nil, 0, error
//   - fewer than 5 bytes (half packet): nil, 0, nil
//   - first 5 bytes are not "hello":    nil, 0, error
//   - otherwise:                        "hello", 5, nil (bytes after the first 5 are left for the next call)
func (c codec) Decode(bytes []byte) (interface{}, int, error) {
	const frameLen = 5
	switch {
	case bytes == nil:
		return nil, 0, errors.New("bytes is nil")
	case len(bytes) < frameLen:
		return nil, 0, nil
	}

	frame := string(bytes[:frameLen])
	if frame == "hello" {
		return frame, len(frame), nil
	}
	return nil, 0, errors.New("data is not 'hello'")
}
Using Custom Logger
definition
// Logger is a minimalistic interface that knetty logs messages to.
// Implement it to provide a custom logging writer for knetty to use.
type Logger interface {
	Errorf(format string, args ...interface{})
	Fatalf(format string, args ...interface{})
	Fatal(args ...interface{})
	Infof(format string, args ...interface{})
	Info(args ...interface{})
	Warnf(format string, args ...interface{})
	Debugf(format string, args ...interface{})
	Debug(args ...interface{})
}
// SetLogger installs a custom logger by assigning it to log.DefaultLogger.
func SetLogger(logger log.Logger) {
	log.DefaultLogger = logger
}
Set a custom logger:
// logger must implement the Logger interface
knetty.SetLogger(logger)
Using EventListener
definition
// EventListener receives the events of a session.
// NOTE(review): the listener implementations in the Quick Start of this document
// declare OnMessage with a session.ExecStatus result, which this definition does
// not show — confirm which signature is current.
type EventListener interface {
	// OnMessage runs when the session gets a pkg.
	OnMessage(s Session, pkg interface{})
	// OnConnect runs when the connection is initialized.
	OnConnect(s Session)
	// OnClose runs before the session is closed.
	OnClose(s Session)
	// OnError runs when the session reports an error.
	OnError(s Session, e error)
}
Below is a typical event listener.
// helloWorldListener prints every event it receives from a session.
type helloWorldListener struct{}

// OnMessage prints the received package, which is asserted to be a string.
func (e *helloWorldListener) OnMessage(s session.Session, pkg interface{}) {
	fmt.Println(pkg.(string))
}

// OnConnect prints the local and remote addresses of the session.
func (e *helloWorldListener) OnConnect(s session.Session) {
	local, remote := s.LocalAddr(), s.RemoteAddr()
	fmt.Printf("local:%s get a remote:%s connection\n", local, remote)
}

// OnClose prints a fixed notice that the session closed.
func (e *helloWorldListener) OnClose(s session.Session) {
	fmt.Print("session close\n")
}

// OnError prints the error reported for the session.
func (e *helloWorldListener) OnError(s session.Session, err error) {
	const format = "session got err :%v\n"
	fmt.Printf(format, err)
}
Graceful shutdown
package main
import (
"context"
"errors"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"github.com/Softwarekang/knetty"
"github.com/Softwarekang/knetty/session"
)
// main starts a knetty TCP server on 127.0.0.1:8000, waits for SIGINT or
// SIGTERM, and then gives the server five seconds to shut down gracefully.
func main() {
	// setting optional options for the server
	options := []knetty.ServerOption{
		knetty.WithServiceNewSessionCallBackFunc(newSessionCallBackFn),
	}

	// creating a new server with network settings such as tcp/udp, address such as 127.0.0.1:8000, and optional options
	server := knetty.NewServer("tcp", "127.0.0.1:8000", options...)

	// Initializing the server in a goroutine so that
	// it won't block the graceful shutdown handling below
	go func() {
		// Report every start-up/serve failure. http.ErrServerClosed is skipped because it is
		// treated here as the normal-shutdown sentinel.
		// NOTE(review): confirm that Server returns http.ErrServerClosed after Shutdown.
		if err := server.Server(); err != nil && !errors.Is(err, http.ErrServerClosed) {
			log.Printf("run server: %s\n", err)
		}
	}()

	// Wait for an interrupt signal to gracefully shut down the server with
	// a timeout of 5 seconds.
	// signal.Notify does not block when sending to the channel, so the channel
	// must be buffered or a signal arriving before the receive below can be lost.
	quit := make(chan os.Signal, 1)
	// kill (no param) default send syscall.SIGTERM
	// kill -2 is syscall.SIGINT
	// kill -9 is syscall.SIGKILL but can't be caught, so don't need to add it
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	<-quit
	log.Println("shutting down server...")

	// The context is used to inform the server it has 5 seconds to finish
	// the request it is currently handling
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := server.Shutdown(ctx); err != nil {
		log.Fatal("server starting shutdown:", err)
	}
	log.Println("server exiting")
}
Benchmarks