overloader

package
v6.3.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 4, 2021 License: Apache-2.0 Imports: 5 Imported by: 0

README

overloader

A plugin to protect erpc from overload.

Test
package overloader

import (
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"github.com/fyyang/erpc/v6"
	"github.com/stretchr/testify/assert"
)

// Home is the call controller used by this test. It embeds erpc.CallCtx and
// is registered on the server peer through RouteCall in TestPlugin.
type Home struct {
	erpc.CallCtx
}

// Test is the handler invoked by the test client. It echoes the received
// argument map back to the caller under the "arg" key and always reports
// success (nil status).
func (h *Home) Test(arg *map[string]string) (map[string]interface{}, *erpc.Status) {
	reply := make(map[string]interface{}, 1)
	reply["arg"] = *arg
	return reply, nil
}

// TestPlugin runs the overloader plugin against a real server peer listening
// on port 9090 and checks, in four phases, that:
//
//  1. a single connection making a single call is not limited;
//  2. with MaxConn = 1, one of two concurrent connections is rejected;
//  3. with a handler limit of 1 per interval, one of two concurrent calls is rejected;
//  4. after Update raises the limits, 10 connections x 200 calls all succeed.
func TestPlugin(t *testing.T) {
	ol := New(LimitConfig{
		MaxConn:     1,
		QPSInterval: 100 * time.Millisecond,
		MaxTotalQPS: 2,
		MaxHandlerQPS: []HandlerLimit{
			{ServiceMethod: "/home/test", MaxQPS: 1},
		},
	})
	// Server
	srv := erpc.NewPeer(
		erpc.PeerConfig{ListenPort: 9090, CountTime: true},
		ol,
	)
	srv.RouteCall(new(Home))
	go srv.ListenAndServe()
	time.Sleep(time.Second) // give the listener time to come up

	// Client
	cli := erpc.NewPeer(
		erpc.PeerConfig{CountTime: true},
	)

	// testClient opens connNum concurrent connections and, on every healthy
	// one, fires totalQPS concurrent calls to /home/test. It blocks until all
	// goroutines have finished and returns how many connections were found
	// closed after dialing (olConnCount) and how many calls returned a
	// non-OK status (olQPSCount).
	testClient := func(connNum, totalQPS int) (olConnCount, olQPSCount int64) {
		var rejectedConns, rejectedCalls int64
		var connWG sync.WaitGroup
		connWG.Add(connNum)
		for index := 0; index < connNum; index++ {
			go func() {
				defer connWG.Done()
				sess, dialStat := cli.Dial(":9090")
				if !dialStat.OK() {
					// t.Fatal/FailNow may only be called from the goroutine
					// running the test, so record the failure and bail out
					// of this worker instead.
					t.Error(dialStat)
					return
				}
				defer sess.Close()
				// Short pause before the health check so a server-side
				// rejection has time to close the connection.
				time.Sleep(time.Millisecond)
				if !sess.Health() {
					atomic.AddInt64(&rejectedConns, 1)
					t.Logf("connNum:%d, totalQPS:%d, dial: Connection Closed", connNum, totalQPS)
					return
				}
				var callWG sync.WaitGroup
				callWG.Add(totalQPS)
				for i := 0; i < totalQPS; i++ {
					go func() {
						defer callWG.Done()
						// Goroutine-local status: sharing one variable across
						// the concurrent callers would be a data race.
						callStat := sess.Call("/home/test", nil, nil).Status()
						if !callStat.OK() {
							atomic.AddInt64(&rejectedCalls, 1)
							t.Logf("connNum:%d, totalQPS:%d, call:%s", connNum, totalQPS, callStat)
						}
					}()
				}
				// Wait for every call before the deferred sess.Close runs.
				callWG.Wait()
			}()
		}
		connWG.Wait()
		return atomic.LoadInt64(&rejectedConns), atomic.LoadInt64(&rejectedCalls)
	}

	{
		olConnCount, olQPSCount := testClient(1, 1)
		assert.Equal(t, int64(0), olConnCount)
		assert.Equal(t, int64(0), olQPSCount)
	}
	{
		time.Sleep(time.Second)
		assert.Equal(t, 0, srv.CountSession())
		olConnCount, olQPSCount := testClient(2, 1)
		assert.Equal(t, int64(1), olConnCount)
		assert.Equal(t, int64(0), olQPSCount)
	}
	{
		time.Sleep(time.Second)
		assert.Equal(t, 0, srv.CountSession())
		olConnCount, olQPSCount := testClient(1, 2)
		assert.Equal(t, int64(0), olConnCount)
		assert.Equal(t, int64(1), olQPSCount)
	}
	{
		ol.Update(LimitConfig{
			MaxConn:     100,
			QPSInterval: time.Second,
			MaxTotalQPS: 2000,
		})
		time.Sleep(time.Second) // Wait for one cycle to take effect
		assert.Equal(t, 0, srv.CountSession())
		olConnCount, olQPSCount := testClient(10, 200)
		assert.Equal(t, int64(0), olConnCount)
		assert.Equal(t, int64(0), olQPSCount)
	}
}

test command:

go test -v -run=TestPlugin

Documentation

Overview

Package overloader is a plugin to protect erpc from overload.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type HandlerLimit

// HandlerLimit is the QPS overload limit for a single handler.
type HandlerLimit struct {
	// ServiceMethod identifies the limited handler (the README test uses "/home/test").
	ServiceMethod string
	// MaxQPS is the handler's request ceiling; presumably counted per
	// LimitConfig.QPSInterval — confirm against the implementation.
	MaxQPS        int32
}

HandlerLimit is the QPS overload limit for a single handler.

type LimitConfig

// LimitConfig is the set of overload limits passed to New and Update.
type LimitConfig struct {
	// MaxConn is the connection limit (the README test expects the second of
	// two concurrent connections to be closed when this is 1).
	MaxConn       int32
	// QPSInterval is the time window used for QPS limiting; presumably the
	// counting period for the two QPS limits below — confirm against the implementation.
	QPSInterval   time.Duration
	// MaxTotalQPS is the request ceiling across all handlers.
	MaxTotalQPS   int32
	// MaxHandlerQPS lists additional per-handler ceilings.
	MaxHandlerQPS []HandlerLimit
}

LimitConfig is the overload limitation configuration.

type Overloader

// Overloader is a plug-in that protects erpc from overload.
type Overloader struct {
	// contains filtered or unexported fields
}

Overloader is a plug-in that protects erpc from overload.

func New

func New(initLimitConfig LimitConfig) *Overloader

New creates a plug-in to protect erpc from overload.

func (*Overloader) LimitConfig

func (o *Overloader) LimitConfig() LimitConfig

LimitConfig returns the overload limitation condition.

func (*Overloader) Name

func (o *Overloader) Name() string

Name returns the plugin name.

func (*Overloader) PostAccept

func (o *Overloader) PostAccept(_ erpc.PreSession) *erpc.Status

PostAccept checks for connection overload. If overloaded, it prints an error log and closes the connection.

func (*Overloader) PostDial

func (o *Overloader) PostDial(sess erpc.PreSession, isRedial bool) *erpc.Status

PostDial checks for connection overload. If overloaded, it prints an error log and closes the connection.

func (*Overloader) PostDisconnect

func (o *Overloader) PostDisconnect(_ erpc.BaseSession) *erpc.Status

PostDisconnect releases connection count.

func (*Overloader) PostReadCallHeader

func (o *Overloader) PostReadCallHeader(ctx erpc.ReadCtx) *erpc.Status

PostReadCallHeader checks for PULL QPS overload. If overloaded, it prints an error log and replies with an error.

func (*Overloader) PostReadPushHeader

func (o *Overloader) PostReadPushHeader(ctx erpc.ReadCtx) *erpc.Status

PostReadPushHeader checks for PUSH QPS overload. If overloaded, it prints a warning log.

func (*Overloader) Update

func (o *Overloader) Update(newLimitConfig LimitConfig)

Update updates the overload limitation condition.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL