go-library
简介
go-library是基于Go 1.18开发的后端工具包,开箱即用。
特性
- 集成了mysql(读写分离)、redis(单点、哨兵、集群、codis)、mongodb(副本集、分片)、memcached、kafka等组件
- 支持http mock
- 支持定时任务
- 所有基础组件库的调用都默认开启了mon打点监测,可以很方便地监测应用程序所依赖服务的状态。
快速开始
安装:
go env -w GOPRIVATE="git.danlansky.cn"
go env -w GOINSECURE="git.danlansky.cn"
git config --global url."http://git.danlansky.cn".insteadOf "https://git.danlansky.cn"
go get gitee.com/danlansky/go-library
1.使用mysql
import (
"encoding/json"
"fmt"
"gitee.com/danlansky/go-library/logs"
"gitee.com/danlansky/go-library/repository/mysqldb"
"time"
)
// AppChannel is the row model for the app_channel table. Each field is bound
// to the column named in its gorm tag and is serialized under its json tag.
type AppChannel struct {
	Appid         int       `gorm:"column:appid" json:"appid"`
	AdminUserid   int       `gorm:"column:admin_userid" json:"admin_userid"`
	ChannelName   string    `gorm:"column:channel_name" json:"channel_name"`
	ChannelFromId string    `gorm:"column:channel_fromid" json:"channel_fromid"`
	CreateTime    time.Time `gorm:"column:createTime" json:"createTime"`
	DeleteTime    time.Time `gorm:"column:deleteTime" json:"deleteTime"`
	Id            int       `gorm:"column:id" json:"id"`
}

// TableName reports the name of the table backing AppChannel.
// NOTE(review): presumably consulted by gorm to override its default table
// naming - confirm against the ORM version in use.
func (AppChannel) TableName() string {
	const table = "app_channel"
	return table
}
// TestMysql demonstrates the MySQL helper: it initializes logging, builds a
// handle from a mysqldb.Setting (Source plus Replica address), installs the
// DB logger, and then loads the row with id 30 one thousand times, printing
// it as JSON each time.
func TestMysql() {
	// The logger only needs to be initialized once, globally.
	// The settings type lives in the imported logs package (not "log").
	logs.InitLog(logs.Setting{
		RuntimeRootPath: "/your_path/",
		LocalIp:         "127.0.0.1",
		Project:         "test",
	})

	// Many more options exist (see the code); only the required ones are listed.
	var setting = mysqldb.Setting{
		Name:     "test",
		Type:     "mysql",
		Source:   "127.0.0.1:3306",
		Replica:  "127.0.0.1:3307",
		Username: "username",
		Password: "password",
		Database: "test",
	}
	db := mysqldb.InitMysqlDB(setting)

	// Install the logger; by default slow SQL statements are logged at warn level.
	db.SetLogger(logs.DbLogger{})

	var appinfo AppChannel
	for i := 0; i < 1000; i++ {
		db.Db.Model(appinfo).Where("id=?", 30).First(&appinfo)

		// Report an encoding failure instead of printing an empty line.
		res, err := json.Marshal(appinfo)
		if err != nil {
			fmt.Println(err)
			continue
		}
		fmt.Println(string(res))
	}
}
2.使用redis
import (
"time"
"gitee.com/danlansky/go-library/repository/redis"
)
// TestRedis builds a client from a redis.Setting and performs 1000 Set/Get
// round trips on the key "test_c".
func TestRedis() {
	// See the Setting options for details; codis, sentinel and redis cluster
	// modes are supported.
	setting := redis.Setting{
		Name: "test",
		Type: "redis",
		Addr: []string{"127.0.0.1:6380"},
	}
	client := redis.InitRedis(setting)

	const rounds = 1000
	for n := 0; n < rounds; n++ {
		// Both return values of each call are discarded.
		_, _ = client.Set("test_c", "value1")
		_, _ = client.Get("test_c")
	}
}
3.使用memcached
import (
"fmt"
"gitee.com/danlansky/go-library/repository/memcached"
"time"
)
// TestMemcached builds a client from a memcached.Setting, then 1000 times
// stores "value" under "test_c" and reads it back, printing either the
// stored bytes or the error from each step.
func TestMemcached() {
	// See the Setting options for details.
	setting := memcached.Setting{
		Name: "test",
		Addr: []string{"127.0.0.1:11214"},
	}
	client := memcached.InitMemcached(setting)

	var (
		entry memcached.Item
		err   error
	)
	for round := 0; round < 1000; round++ {
		entry = memcached.Item{
			Key:        "test_c",
			Value:      []byte("value"),
			Expiration: 100,
		}
		// A failed Set is reported but the read below is still attempted.
		if err = client.Set(&entry); err != nil {
			fmt.Println(err)
		}

		fetched, getErr := client.Get("test_c")
		if getErr != nil {
			fmt.Println(getErr)
			continue
		}
		fmt.Println(string(fetched.Value))
	}
}
4.使用mongo
import (
"context"
"fmt"
"github.com/qiniu/qmgo/field"
"gitee.com/danlansky/go-library/repository/mongo"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"time"
)
// UserInfo is the document shape used by the mongo example. It embeds
// field.DefaultField inline and adds three application fields.
// NOTE(review): the fields contributed by field.DefaultField are not visible
// here - see the qmgo field package.
type UserInfo struct {
	field.DefaultField `bson:",inline"`

	Name   string `bson:"name"`
	Age    int    `bson:"age"`
	Talent string `bson:"talent"`
}
// TestMongoPool demonstrates the mongo helper: it connects using a
// mongo.Setting, then looks up one document by _id 1000 times in
// test_db.test_collection and prints each result. Any connection, id-parsing,
// query or close error is printed.
func TestMongoPool() {
	// See the Setting options for details.
	setting := mongo.Setting{
		Name: "test2",
		Type: "single",
		Addr: []string{"127.0.0.1:27017"},
	}
	mongoPool, err := mongo.InitMongo(setting)
	if err != nil {
		fmt.Println(err)
		return
	}
	// Close the client on every exit path and surface a failed close.
	defer func() {
		if closeErr := mongoPool.Client.Close(context.Background()); closeErr != nil {
			fmt.Println(closeErr)
		}
	}()

	objID, err := primitive.ObjectIDFromHex("626e26ef1445fe382a501ddc")
	if err != nil {
		fmt.Println(err)
		return
	}

	// The context and collection handle do not change between iterations.
	ctx := context.Background()
	coll := mongoPool.Client.Database("test_db").Collection("test_collection")

	for i := 0; i < 1000; i++ {
		// A fresh value per iteration so a failed lookup never prints stale data.
		var res UserInfo

		// Find only reports to mon when NewQueryMonOpt is passed; the same
		// applies to insert and replace.
		query := coll.Find(ctx, bson.M{"_id": objID}, mongo.NewQueryMonOpt("test_db", "test_collection"))
		if err := query.One(&res); err != nil {
			fmt.Println(err)
			continue
		}
		fmt.Println(res)
	}
}
5.使用http调用
// AutoGenerated is the top-level JSON envelope decoded by the HTTP example:
// a reason, a numeric code, a status string and the nested result payload.
type AutoGenerated struct {
	Reason string `json:"reason"`
	Code   int    `json:"code"`
	Status string `json:"status"`
	Result Result `json:"result"`
}
// Data is a single entry of a Result: a document id, a title and a date, all
// carried as strings.
type Data struct {
	Docid string `json:"docid"`
	Title string `json:"title"`
	Date  string `json:"date"`
}
// Result wraps the list of Data entries found under the "data" JSON key.
type Result struct {
	Data []Data `json:"data"`
}
// TestContext is the minimal context value the HTTP example hands to
// SetMockParam in place of a real request context.
type TestContext struct{}

// GetStringMapString returns a fixed single-entry map
// ("test_traffic_wemedia" -> "470"); the traffic argument is not consulted.
func (TestContext) GetStringMapString(traffic string) map[string]string {
	mockParams := make(map[string]string, 1)
	mockParams["test_traffic_wemedia"] = "470"
	return mockParams
}
// TestHttp builds an httpclient with a one-second timeout and three retries,
// then 100 times calls GetByRes twice - once directly and once after
// SetMockParam - printing the bytes returned by the mock call.
func TestHttp() {
	const target = "http://www.baidu.com"

	// query builds a fresh parameter map for every request.
	query := func() map[string]string {
		return map[string]string{"media_id": "1868751", "start": "0", "end": "1"}
	}

	client := httpclient.New(httpclient.URLSetting{
		Timeout:    1 * time.Second,
		RetryCount: 3,
	})

	var decoded AutoGenerated
	for n := 0; n < 100; n++ {
		// Non-mock scenario.
		_, _ = client.GetByRes(target, query(), &decoded)

		// Mock scenario. In real code pass the gin.Context here (this
		// requires the gin mock middleware from this package to be in use).
		body, _ := client.SetMockParam("test_traffic_wemedia", TestContext{}).GetByRes(target, query(), &decoded)
		fmt.Println(string(body))
	}
}
6.kafka
生产者示例:
// TestSyncProducer demonstrates the synchronous producer: it sends the same
// JSON message to "test_topic" 1000 times, printing any send error, and
// closes the producer when done. If the producer cannot be created the error
// is printed and the function returns without sending.
func TestSyncProducer() {
	var setting = ProducerSetting{
		Hosts: []string{
			"127.0.0.1:9092",
		},
		Topic: "test_topic",
		// For synchronous production both ReturnError and ReturnSuccess
		// must be set to true.
		ReturnError:   true,
		ReturnSuccess: true,
	}
	producer, err := NewSyncProducer(setting)
	if err != nil {
		fmt.Println(err)
		// Stop here: the producer must not be used after a failed construction.
		return
	}
	defer producer.Close()

	for i := 0; i < 1000; i++ {
		if err := producer.SendMsg("test_topic", "data", "{\"test\":\"value\"}"); err != nil {
			fmt.Println(err)
		}
	}
}
// TestAsyncProducer demonstrates the asynchronous producer: it sends the same
// JSON message to "test_topic" 1000 times, printing any error returned by
// SendMsg, and closes the producer when done. If the producer cannot be
// created the error is printed and the function returns without sending.
func TestAsyncProducer() {
	var setting = ProducerSetting{
		Hosts: []string{
			"127.0.0.1:9092",
		},
		Topic: "test_topic",
		// Recommended: enable only ReturnError.
		ReturnError: true,
		// Callback invoked on error; it may log, report to mon, etc.
		ErrorCallback: func(err error) {
			fmt.Println(err)
		},
	}
	producer, err := NewAsyncProducer(setting)
	if err != nil {
		fmt.Println(err)
		// Stop here: the producer must not be used after a failed construction.
		return
	}
	defer producer.Close()

	for i := 0; i < 1000; i++ {
		if err := producer.SendMsg("test_topic", "data", "{\"test\":\"value\"}"); err != nil {
			fmt.Println(err)
		}
	}
}
消费者示例:
// MsgHandler is the example implementation of the ConsumerHandler interface;
// every hook simply prints to standard output.
type MsgHandler struct{}

// SetupHook prints a marker line for the setup phase.
func (h *MsgHandler) SetupHook() {
	fmt.Printf("%s\n", "test set up")
}

// CleanUpHook prints a marker line for the cleanup phase.
func (h *MsgHandler) CleanUpHook() {
	fmt.Printf("%s\n", "test clean up")
}

// HandleMsg prints the key and the value of one message. Only the
// per-message logic belongs here: per the library's note, MarkMessage and
// offset commits are already taken care of.
func (h *MsgHandler) HandleMsg(key, msg string) {
	fmt.Printf("get key %s\n", key)
	fmt.Printf("get value %s\n", msg)
	// write mysql...
}
// TestNewConsumer1 starts a group consumer that auto-commits offsets (which
// can easily lead to messages being consumed more than once).
func TestNewConsumer1(t *testing.T) {
	// Errors reported by the consumer are simply printed.
	onError := func(err error) {
		fmt.Println(err)
	}

	setting := ConsumerSetting{
		Hosts:   []string{"127.0.0.1:9092"},
		Topic:   "test_topic",
		GroupId: "test_group_id",
		// Offset -1: when no offset is stored, start from the newest position.
		// Offset -2: when no offset is stored, start from the oldest position.
		// Kafka keys stored offsets by the (group id, topic, partition) triple.
		Offset:        -1,
		ReturnError:   true,
		ErrorCallback: onError,
		// After HandleMsg finishes each message, MarkMessage records its
		// offset; offsets are committed automatically once the commit
		// interval is reached.
		AutoCommit: true,
	}

	consumer, err := NewConsumer(setting)
	if err != nil {
		fmt.Println(err)
		return
	}

	handler := &MsgHandler{}
	consumer.StartGroupConsume(handler)
}
// TestNewConsumer2 starts a group consumer with auto-commit disabled, i.e.
// offsets are committed manually.
func TestNewConsumer2(t *testing.T) {
	// Errors reported by the consumer are simply printed.
	onError := func(err error) {
		fmt.Println(err)
	}

	setting := ConsumerSetting{
		Hosts:         []string{"127.0.0.1:9092"},
		Topic:         "test_topic",
		GroupId:       "test_group_id",
		ReturnError:   true,
		ErrorCallback: onError,
		// After HandleMsg finishes each message, MarkMessage records its
		// offset, and Commit is additionally called explicitly.
		AutoCommit: false,
	}

	consumer, err := NewConsumer(setting)
	if err != nil {
		fmt.Println(err)
		return
	}

	handler := &MsgHandler{}
	consumer.StartGroupConsume(handler)
}
7.crontab
import (
"fmt"
"gitee.com/danlansky/go-library/cron"
"time"
)
// TestCrontab registers one task on the "* * * * *" schedule, then prints the
// scheduler's IsRunning state three times: before StartAsync, one second
// after it, and one second after Stop.
func TestCrontab() {
	tab := cron.NewCrontab()

	// The task body just sleeps for two seconds.
	job := func() { time.Sleep(2 * time.Second) }
	_ = tab.AddTaskFunc("test", "* * * * *", job)

	sched := tab.Scheduler
	report := func() { fmt.Println(sched.IsRunning()) }

	report()

	// Asynchronous: does not block the current process.
	tab.StartAsync()
	time.Sleep(time.Second)
	report()

	time.Sleep(time.Second)
	tab.Stop()
	time.Sleep(time.Second)
	report()
}
8.其他
-
log:日志结构已处理好。使用示例:
import (
"gitee.com/danlansky/go-library/logs"
"go.uber.org/zap"
"time"
)
// TestLog initializes logging from a logs.Setting and writes 1000 access,
// latency and error entries. Per the library's note, the default log
// location is /home/service/{service_name}/logs/.
func TestLog() {
	logs.InitLog(logs.Setting{
		// Defaults to /home/service/ when not overridden.
		RuntimeRootPath: "your_path",
		LocalIp:         "127.0.0.1",
		Project:         "go-scaffold-demo",
	})

	const rounds = 1000
	for n := 0; n < rounds; n++ {
		logs.AccessLog("test-access", zap.String("key", "value"))
		logs.LatencyLog("test-latency")
		logs.ErrorLog("test-error")
	}
}
-
middleware:middleware中提供了一些写好的gin中间件,例如限流、accesslog、接口mon等
-
utils:提供了一些常用工具,uuid、本地缓存等
遗留项
- runtime监控,包含goroutine数量、GC时间次数等监控。计划使用一个单独的goroutine定时抓取runtime信息,上报到一点mon服务
- crontab监控。原生gocron不支持运行状态查看、插件,计划在gocron的基础上进行二次开发
- ...