go-library

module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 20, 2024 License: GPL-3.0

README

go-library

简介

so-scaffold是基于go1.18开发的后端工具包,开箱即用。

特性

  • 集成了mysql(读写分离)、redis(单点、哨兵、集群、codis)、mongodb(副本集、分片)、memcached、kafka等组件
  • 支持http mock
  • 支持定时任务
  • 所有基础组件库的调用都默认开启了mon打点监测,可以很方便的监测应用程序依赖服务的状态。

快速开始

安装:

go env -w GOPRIVATE="git.danlansky.cn"
go env -w GOINSECURE="git.danlansky.cn"
git config --global url."http://git.danlansky.cn".insteadOf "https://git.danlansky.cn"
go get gitee.com/danlansky/go-library
1.使用mysql

import (
	"encoding/json"
	"fmt"
	"gitee.com/danlansky/go-library/logs"
	"gitee.com/danlansky/go-library/repository/mysqldb"
	"time"
)

type AppChannel struct {
	Appid         int       `gorm:"column:appid" json:"appid"`
	AdminUserid   int       `gorm:"column:admin_userid" json:"admin_userid"`
	ChannelName   string    `gorm:"column:channel_name" json:"channel_name"`
	ChannelFromId string    `gorm:"column:channel_fromid" json:"channel_fromid"`
	CreateTime    time.Time `gorm:"column:createTime" json:"createTime"`
	DeleteTime    time.Time `gorm:"column:deleteTime" json:"deleteTime"`
	Id            int       `gorm:"column:id" json:"id"`
}

func (a AppChannel) TableName() string {
	return "app_channel"
}

func TestMysql() {
	// 可以只在全局初始化一次Metric
	logs.InitLog(log.Setting{
		RuntimeRootPath: "/your_path/",
		LocalIp:         "127.0.0.1",
		Project:         "test",
	})
  // 还有很多配置项,具体可以看代码。这里只列了必须项
	var setting = mysqldb.Setting{
		Name:     "test",
		Type:     "mysql",
		Source:   "127.0.0.1:3306",
		Replica:  "127.0.0.1:3307",
		Username: "username",
		Password: "password",
		Database: "test",
	}
	db := mysqldb.InitMysqlDB(setting)
	// 初始化logger,默认会打印sql的慢日志,级别为warn
	db.SetLogger(logs.DbLogger{})

	var appinfo AppChannel

	var res []byte
	for i := 0; i < 1000; i++ {
		db.Db.Model(appinfo).Where("id=?", 30).First(&appinfo)
		res, _ = json.Marshal(appinfo)
		fmt.Println(string(res))
	}
}
2.使用redis
import (
	"time"
  "gitee.com/danlansky/go-library/repository/redis"
)

func TestRedis() {
  // 具体见配置项,支持codis、sentinel、redis cluster模式
	var setting = redis.Setting{
		Name: "test",
		Type: "redis",
		Addr: []string{"127.0.0.1:6380"},
	}
	redis_ := redis.InitRedis(setting)

	for i := 0; i < 1000; i++ {
		_, _ = redis_.Set("test_c", "value1")
		_, _ = redis_.Get("test_c")
	}
}
3.使用memcached
import (
	"fmt"
	"gitee.com/danlansky/go-library/repository/memcached"
	"time"
)

func TestMemcached() {
	// 具体见配置项
	var setting = memcached.Setting{
		Name: "test",
		Addr: []string{"127.0.0.1:11214"},
	}

	mem := memcached.InitMemcached(setting)
	var item memcached.Item
	var err error
	for i := 0; i < 1000; i++ {
		item = memcached.Item{
			Key:        "test_c",
			Value:      []byte("value"),
			Expiration: 100,
		}
		err = mem.Set(&item)
		if err != nil {
			fmt.Println(err)
		}

		resPoint, err := mem.Get("test_c")
		if err != nil {
			fmt.Println(err)
		} else {
			fmt.Println(string((*resPoint).Value))
		}
	}
}
4.使用mongo
import (
	"context"
	"fmt"
	"github.com/qiniu/qmgo/field"
	"gitee.com/danlansky/go-library/repository/mongo"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/bson/primitive"
	"time"
)

type UserInfo struct {
	field.DefaultField `bson:",inline"`
	Name               string `bson:"name"`
	Age                int    `bson:"age"`
	Talent             string `bson:"talent"`
}

func TestMongoPool() {
  // 具体见配置项
	setting := mongo.Setting{
		Name: "test2",
		Type: "single",
		Addr: []string{"127.0.0.1:27017"},
	}
	mongoPool, err := mongo.InitMongo(setting)
	if err != nil {
		fmt.Println(err)
		return
	}
	var res UserInfo
	objId, _ := primitive.ObjectIDFromHex("626e26ef1445fe382a501ddc")
	for i := 0; i < 1000; i++ {
		ctx := context.Background()
		cli := mongoPool.Client.Database("test_db").Collection("test_collection")
    // Find时,要加NewQueryMonOpt才会进行大点,insert、replace同理
		cli.Find(ctx, bson.M{"_id": objId}, mongo.NewQueryMonOpt("test_db", "test_collection")).One(&res)
		fmt.Println(res)

	}
	_ = mongoPool.Client.Close(context.Background())
}
5.使用http调用

type AutoGenerated struct {
	Reason string `json:"reason"`
	Code   int    `json:"code"`
	Status string `json:"status"`
	Result Result `json:"result"`
}
type Data struct {
	Docid string `json:"docid"`
	Title string `json:"title"`
	Date  string `json:"date"`
}
type Result struct {
	Data []Data `json:"data"`
}

type TestContext struct{}

func (c TestContext) GetStringMapString(traffic string) map[string]string {
	return map[string]string{
		"test_traffic_wemedia": "470",
	}
}

func TestHttp() {
	var setting = httpclient.URLSetting{
		Timeout:    1 * time.Second,
		RetryCount: 3,
	}

	client := httpclient.New(setting)
	var res AutoGenerated
	var res1 []byte
	for i := 0; i < 100; i++ {
		// 非mock场景
		_, _ = client.GetByRes("http://www.baidu.com",
			map[string]string{"media_id": "1868751", "start": "0", "end": "1"}, &res)
		// mock场景。实际开发,这里传gin.Context,(前提是要先使用了本包中为gin写的mock中间件)
		res1, _ = client.SetMockParam("test_traffic_wemedia", TestContext{}).GetByRes("http://www.baidu.com",
			map[string]string{"media_id": "1868751", "start": "0", "end": "1"}, &res)
		fmt.Println(string(res1))
	}
}

6.kafka

生产者示例:

// TestSyncProducer 同步生产者
func TestSyncProducer() {
	var setting = ProducerSetting{
		Hosts: []string{
			"127.0.0.1:9092",
		},
		Topic:         "test_topic",
       // 同步生产时,ReturnError和ReturnSuccess都必须指定为true
		ReturnError:   true,
		ReturnSuccess: true,
	}

	producer, err := NewSyncProducer(setting)
	if err != nil {
		fmt.Println(err)
	}
	for i := 0; i < 1000; i++ {
		err = producer.SendMsg("test_topic", "data", "{\"test\":\"value\"}")
		if err != nil {
			fmt.Println(err)
		}
	}
	producer.Close()
}

// TestAsyncProducer 异步生产者
func TestAsyncProducer() {
	var setting = ProducerSetting{
		Hosts: []string{
			"127.0.0.1:9092",
		},
		Topic:       "test_topic",
       // 建议仅指定ReturnError为true
		ReturnError: true,
       // error时的回调处理,可以是日志、mon打点等
		ErrorCallback: func(err error) {
			fmt.Println(err)
		},
	}

	producer, err := NewAsyncProducer(setting)
	if err != nil {
		fmt.Println(err)
	}
	for i := 0; i < 1000; i++ {
		err = producer.SendMsg("test_topic", "data", "{\"test\":\"value\"}")
		if err != nil {
			fmt.Println(err)
		}
	}
	producer.Close()
}

消费者示例:

// MsgHandler 实现ConsumerHandler接口
type MsgHandler struct{}

func (c *MsgHandler) SetupHook() {
	fmt.Println("test set up")
}

func (c *MsgHandler) CleanUpHook() {
	fmt.Println("test clean up")
}

func (c *MsgHandler) HandleMsg(key, msg string) {
    // 仅需要写单条消息的处理逻辑即可。MarkMessage和Commit offset已经做了处理,不需要再考虑
	fmt.Println("get key", key)
	fmt.Println("get value", msg)
	// write mysql...
}

// TestNewConsumer1 自动commit offset(易导致消息重复消费)
func TestNewConsumer1(t *testing.T) {

	var setting = ConsumerSetting{
		Hosts: []string{
			"127.0.0.1:9092",
		},
		Topic:       "test_topic",
		GroupId:     "test_group_id",
       // offset=-1,代表无偏移量时从最新位置开始消费
       // offset=-2,代表无偏移量时从最原始的位置开始消费
       // kafka使用groupid-topic-partition三元组标记offset
		Offset:      -1,
		ReturnError: true,
		ErrorCallback: func(err error) {
			fmt.Println(err)
		},
       // 每条消息被HandleMsg处理完后,都会调用MarkMessage记录offset位置
       // 达到commit interval后自动commit
		AutoCommit: true,
	}

	consumer, err := NewConsumer(setting)
	if err != nil {
		fmt.Println(err)
		return
	}

	var msgHandler = MsgHandler{}

	consumer.StartGroupConsume(&msgHandler)
}


// TestNewConsumer2 手动commit
func TestNewConsumer2(t *testing.T) {

	var setting = ConsumerSetting{
		Hosts: []string{
			"127.0.0.1:9092",
		},
		Topic:       "test_topic",
		GroupId:     "test_group_id",
		ReturnError: true,
		ErrorCallback: func(err error) {
			fmt.Println(err)
		},
       // 每条消息被HandleMsg处理完后,都会调用MarkMessage记录offset位置
       // 另外还会显式调用一次Commit
		AutoCommit: false,
	}

	consumer, err := NewConsumer(setting)
	if err != nil {
		fmt.Println(err)
		return
	}

	var msgHandler = MsgHandler{}

	consumer.StartGroupConsume(&msgHandler)

}
7.crontab
import (
	"fmt"
	"gitee.com/danlansky/go-library/cron"
	"time"
)


func TestCrontab() {
	s := cron.NewCrontab()
	_ = s.AddTaskFunc("test", "* * * * *", func() { time.Sleep(2 * time.Second) })

	j := s.Scheduler
	fmt.Println(j.IsRunning())
	
  // 异步,不阻塞当前进程
	s.StartAsync()

	time.Sleep(time.Second)
	fmt.Println(j.IsRunning())

	time.Sleep(time.Second)
	s.Stop()

	time.Sleep(1 * time.Second)
	fmt.Println(j.IsRunning())

}
8.其他
  • log:日志结构已处理好。使用示例:

    import (
    	"gitee.com/danlansky/go-library/logs"
    	"go.uber.org/zap"
    	"time"
    )
    
    // TestLog 默认日志记录位置:/home/service/{service_name}/logs/
    func TestLog() {
    	var setting = logs.Setting{
           // 默认为/home/service/
    		RuntimeRootPath: "your_path",
    		LocalIp:         "127.0.0.1",
    		Project:         "go-scaffold-demo",
    	}
    	logs.InitLog(setting)
    	for i := 0; i < 1000; i++ {
    		logs.AccessLog("test-access", zap.String("key", "value"))
    		logs.LatencyLog("test-latency")
    		logs.ErrorLog("test-error")
    	}
    }
    
  • middleware:middleware中提供了一些写好的gin中间件,例如限流、accesslog、接口mon等

  • utils:提供了一些常用工具,uuid、本地缓存等

遗留项

  • runtime监控,包含goroutine数量、GC时间次数等监控。计划使用一个单独的goroutine定时抓取runtime信息,上报到一点mon服务
  • crontab监控。原生gocron不支持运行状态查看、插件,计划在gocron的基础上进行二次开发
  • ...

Directories

Path Synopsis
base
db
repository
union

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL