base

module
v2.0.15 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 10, 2024 License: MIT

README

base

cache 本地内存缓存组件,会同步到本地目录,重启后从本地加载到内存
package main

import (
	"time"

	"github.com/grpc-boot/base/v2/cache"
	"github.com/grpc-boot/base/v2/kind/msg"
	"github.com/grpc-boot/base/v2/utils"
)

// main demonstrates the local cache: it opens a cache backed by the ./cache
// directory, fetches "index:conf" through cache.CommonGet with a loader
// callback, and prints the fields of the resulting map.
func main() {
	cacheDir := "./cache"
	// NOTE(review): the 5s argument is presumably the sync interval — confirm
	// against cache.New.
	store := cache.New(cacheDir, time.Second*5)

	// Manually sync the data to the local directory on exit; the cache also
	// syncs automatically while the program is running.
	defer store.Flush()

	// loadConf produces the value for the key; the sleep simulates a slow
	// backing source.
	loadConf := func() (msg.Map, error) {
		time.Sleep(time.Second)

		fresh := msg.Map{
			"rate":       3.14,
			"text":       "cache test",
			"updated_at": time.Now().Unix(),
		}
		return fresh, nil
	}

	loaded := cache.CommonGet[msg.Map](store, "index:conf", 10, loadConf)

	settings := msg.MsgMap(loaded)
	utils.Green(
		"rate: %.2f text: %s updated at: %d",
		settings.Float("rate"),
		settings.String("text"),
		settings.Int("updated_at"),
	)
}
gored:操作 Redis

缓存处理

package gored

import (
	"context"
	"testing"
	"time"

	"github.com/grpc-boot/base/v2/kind/msg"
)

func init() {
	o := DefaultOptions()
	SetRedis("redis", o)
}

// TestGetItemWithCache fetches the "cache" item through
// GetItemWithCacheTimeout, supplying a loader that returns a fixed map, and
// logs the resulting map. The test fails if the call reports an error.
func TestGetItemWithCache(t *testing.T) {
	red, _ := GetRedis("redis")

	// loader is the callback handed to GetItemWithCacheTimeout; it always
	// succeeds with the same record.
	loader := func() (msg.Map, error) {
		return msg.Map{
			"id":   10086,
			"name": "移动",
		}, nil
	}

	item, err := GetItemWithCacheTimeout(time.Second, red, "cache", time.Now().Unix(), 6, loader)
	if err != nil {
		t.Fatalf("want nil, got %v", err)
	}

	t.Logf("value: %v", item.Map())
}

// TestAcquire tries to acquire the "acquire" token from Redis and, when a
// positive token is returned, releases it again. Each Redis call runs under
// its own one-second context.
func TestAcquire(t *testing.T) {
	red, _ := GetRedis("redis")

	acquireCtx, cancelAcquire := context.WithTimeout(context.Background(), time.Second)
	defer cancelAcquire()

	cmd := Acquire(acquireCtx, red, "acquire", 10)
	if err := DealCmdErr(cmd); err != nil {
		t.Fatalf("want nil, got %v", err)
	}

	token := cmd.Val()
	if token <= 0 {
		// Nothing was acquired, so there is nothing to release.
		t.Logf("do not acquire token")
		return
	}

	t.Logf("acquire token: %d", token)

	releaseCtx, cancelRelease := context.WithTimeout(context.Background(), time.Second)
	defer cancelRelease()

	if err := DealCmdErr(Release(releaseCtx, red, "acquire", token)); err != nil {
		t.Fatalf("want nil, got %v", err)
	}
}

令牌桶限速

// TestGetToken calls SecondLimitByToken 100 times against the "token" key,
// failing on the first command error and logging every call that was granted
// a token.
func TestGetToken(t *testing.T) {
	red, _ := GetRedis("redis")

	const attempts = 100
	for n := 0; n < attempts; n++ {
		cmd := SecondLimitByToken(context.Background(), red, "token", 3, 1, 6)
		if err := DealCmdErr(cmd); err != nil {
			t.Fatalf("want nil, got %v", err)
		}

		if !cmd.Val() {
			continue
		}
		t.Logf("got token")
	}
}
运行时开启关闭pprof,当系统出现问题时可以实时开启pprof定位系统问题
package main

import (
	"context"
	"fmt"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/grpc-boot/base/v2/internal"
	"github.com/grpc-boot/base/v2/utils"
)

// router is a minimal http.Handler used by the demo server.
type router struct{}

// ServeHTTP answers every request with the body "ok"; a write error is
// deliberately ignored because the demo has nothing useful to do with it.
func (*router) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	body := internal.String2Bytes(`ok`)
	_, _ = w.Write(body)
}

// main runs a demo HTTP server on :8080 and lets an operator toggle pprof at
// runtime with signals:
//
//   - SIGUSR1 starts pprof on :8081 (ignored if it is already running);
//   - SIGUSR2 stops pprof (ignored if it is not running);
//   - SIGINT, SIGTERM, SIGHUP or SIGQUIT leave the loop, after which the
//     deferred function shuts the HTTP server down with a 10-second deadline.
func main() {
	server := http.Server{
		Addr:    ":8080",
		Handler: &router{},
	}

	go func() {
		err := server.ListenAndServe()
		// ErrServerClosed is the normal result of Shutdown; anything else
		// means the listener failed.
		if err != http.ErrServerClosed {
			panic(err)
		}
	}()

	defer func() {
		ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
		defer cancel()
		err := server.Shutdown(ctx)
		if err != nil {
			fmt.Printf("shutdown server error:%v", err)
		}
	}()

	var sig = make(chan os.Signal, 1)
	// Subscribe only to the signals handled below. Calling Notify without a
	// signal list relays every incoming signal, including ones a process
	// receives routinely (SIGURG from runtime preemption, SIGCHLD, SIGWINCH);
	// each of those would fall into the default case and stop the server.
	signal.Notify(
		sig,
		syscall.SIGUSR1,
		syscall.SIGUSR2,
		syscall.SIGINT,
		syscall.SIGTERM,
		syscall.SIGHUP,
		syscall.SIGQUIT,
	)

	for {
		val := <-sig
		switch val {
		case syscall.SIGUSR1:
			if utils.PprofIsRun() {
				continue
			}

			// Started on its own goroutine so the signal loop keeps running
			// while pprof is being served.
			go func() {
				err := utils.StartPprof(":8081", nil)
				if err != nil {
					fmt.Printf("start pprof error:%v", err)
				}
			}()
		case syscall.SIGUSR2:
			if !utils.PprofIsRun() {
				continue
			}

			err := utils.StopPprofWithTimeout(10)
			if err != nil {
				fmt.Printf("stop pprof error:%v", err)
			}
		default:
			// A termination signal: stop receiving signals and return so the
			// deferred shutdown runs.
			signal.Stop(sig)
			return
		}
	}
}
## 开启pprof
kill -USR1 ${pid}

## 关闭pprof
kill -USR2 ${pid}
utils.Timeout帮助方法,简化超时实现逻辑
// TestTimeout checks both outcomes of Timeout: a 500ms task under a 1s limit
// must return nil, and a 200ms task under a 100ms limit must return
// context.DeadlineExceeded.
func TestTimeout(t *testing.T) {
	// Finishes well inside the one-second limit.
	withinLimit := func(args ...any) {
		time.Sleep(time.Millisecond * 500)
	}
	if err := Timeout(time.Second, withinLimit); err != nil {
		t.Fatalf("want nil, got %v", err)
	}

	// Takes twice as long as the 100ms limit allows.
	overLimit := func(args ...any) {
		time.Sleep(time.Millisecond * 200)
	}
	if err := Timeout(time.Millisecond*100, overLimit); err != context.DeadlineExceeded {
		t.Fatalf("want err, got %v", err)
	}
}
utils.Recover 帮助方法,避免未知 panic 导致进程宕掉
// TestRecover runs a panicking function through Recover on a separate
// goroutine and waits for that goroutine to finish before returning.
//
// NOTE(review): Recover is presumably expected to trap the panic so the
// process survives; its implementation is not visible here — confirm.
func TestRecover(t *testing.T) {
	done := make(chan struct{})

	go func() {
		// Signal completion once Recover has returned.
		defer close(done)

		Recover("recover test", func(args ...any) {
			panic("panic with test")
		})
	}()

	// Without this wait the test could return before the goroutine is ever
	// scheduled, so the panic path would not be exercised at all.
	<-done
}
utils.Join 可以对int、int32等数值类型进行Join
// TestJoin logs the result of joining a string slice with strings.Join next
// to the results of joining []int and []int32 slices with Join, all using
// "," as the separator.
func TestJoin(t *testing.T) {
	// Baseline: the standard library handles string slices.
	words := []string{"s1", "s2"}
	t.Logf("res1: %s", strings.Join(words, ","))

	// Join takes the separator first and the numeric values as variadic
	// arguments.
	ints := []int{1, 2, 45}
	t.Logf("resInt: %s", Join(",", ints...))

	int32s := []int32{1, 2, 45}
	t.Logf("resInt32: %s", Join(",", int32s...))
}
utils.Acquire基于原子操作的超时锁
// TestAcquire has 32 goroutines contend for one int64 lock word through
// Acquire/Release for 60 seconds. A worker that obtains a positive token
// releases it only when the simulated operation succeeds.
func TestAcquire(t *testing.T) {
	var (
		workers  = 32
		runFor   = time.Second * 60
		stopped  atomic.Bool
		lockWord int64
		pending  sync.WaitGroup
	)

	// operationOK stands in for the guarded work (e.g. loading data from the
	// database into a cache) and succeeds about half of the time.
	operationOK := func() bool {
		return rand.Int63()%2 == 0
	}

	pending.Add(workers)

	for n := 0; n < workers; n++ {
		go func() {
			defer pending.Done()

			for !stopped.Load() {
				// Release the lock when the operation succeeds. On failure
				// keep holding it, so other workers cannot immediately
				// re-acquire the lock and retry the operation.
				if token := Acquire(&lockWord, time.Second*3); token > 0 && operationOK() {
					Release(&lockWord, token)
				}
			}
		}()
	}

	// Tell every worker to leave its loop once the run time has elapsed.
	time.AfterFunc(runFor, func() {
		stopped.Store(true)
	})

	pending.Wait()
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL