goutils

command module
v1.1.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 7, 2022 License: MIT Imports: 7 Imported by: 0

README

github.com/liumingmin/goutils

gotuils目标是快速搭建应用的辅助代码库

ws模块用法

protoc --go_out=. ws/msg.proto

//js  
protoc --js_out=library=protobuf,binary:ws/js  ws/msg.proto

常用工具库

文件 说明
async.go 带超时异步调用
crc16.go 查表法crc16
crc16-kermit.go 算法实现crc16
csv_parse.go csv解析封装
httputils.go httpClient工具
math.go 数学库
models.go 反射创建对象
stringutils.go 字符串处理
struct.go 结构体工具(拷贝、合并)
tags.go 结构体tag工具
utils.go 其他工具类

algorithm

crc16_test.go crc16算法
TestCrc16

t.Log(Crc16([]byte("abcdefg")))
descartes_test.go 笛卡尔组合
TestDescartes

result := DescartesCombine([][]string{{"A", "B"}, {"1", "2", "3"}, {"a", "b", "c", "d"}})
for _, item := range result {
	t.Log(item)
}

cache 缓存模块

mem_cache_test.go 内存缓存
TestMemCacheFunc

ctx := context.Background()

const cacheKey = "UT:%v:%v"

var lCache = cache.New(5*time.Minute, 5*time.Minute)
result, err := MemCacheFunc(ctx, lCache, 60*time.Second, rawGetFunc0, cacheKey, "p1", "p2")
log.Info(ctx, "%v %v %v", result, err, printKind(result))

_memCacheFuncTestMore(ctx, lCache, cacheKey)
rds_cache_test.go Redis缓存
TestRdscCacheFunc

redisDao.InitRedises()
ctx := context.Background()

const cacheKey = "UT:%v:%v"
const RDSC_DB = "rdscdb"

rds := redisDao.Get(RDSC_DB)

result, err := RdsCacheFunc(ctx, rds, 60, rawGetFunc0, cacheKey, "p1", "p2")
log.Info(ctx, "%v %v %v", result, err, printKind(result))

_rdsDeleteCacheTestMore(ctx, rds, cacheKey)
TestRdsCacheMultiFunc

redisDao.InitRedises()
ctx := context.Background()
const RDSC_DB = "rdscdb"

rds := redisDao.Get(RDSC_DB)
result, err := RdsCacheMultiFunc(ctx, rds, 30, getThingsByIds, "multikey:%s", []string{"1", "2", "5", "3", "4", "10"})
if err == nil && result != nil {
	mapValue, ok := result.(map[string]*Thing)
	if ok {
		for key, value := range mapValue {
			log.Info(ctx, "%v===%v", key, value)
		}
	}
}

conf yaml配置模块

container 容器模块

bitmap_test.go 比特位表
TestBitmapExists

bitmap := initTestData()
t.Log(bitmap)

t.Log(bitmap.Exists(122))
t.Log(bitmap.Exists(123))

//data1 := []byte{1, 2, 4, 7}
//data2 := []byte{0, 1, 5}

TestBitmapSet

bitmap := initTestData()

t.Log(bitmap.Exists(1256))

bitmap.Set(1256)

t.Log(bitmap.Exists(1256))
TestBitmapUnionOr

bitmap := initTestData()
bitmap2 := initTestData()
bitmap2.Set(256)

bitmap3 := bitmap.Union(&bitmap2)
t.Log(bitmap3.Exists(256))

bitmap3.Set(562)
t.Log(bitmap3.Exists(562))

t.Log(bitmap.Exists(562))
TestBitmapBitInverse

bitmap := initTestData()

t.Log(bitmap.Exists(66))

bitmap.Inverse()

t.Log(bitmap.Exists(66))

const_hash_test.go 一致性HASH
TestConstHash


var ringchash CHashRing

var configs []CHashNode
for i := 0; i < 10; i++ {
	configs = append(configs, TestNode("node"+strconv.Itoa(i)))
}

ringchash.Adds(configs)

fmt.Println(ringchash.Debug())

fmt.Println("==================================", configs)

fmt.Println(ringchash.Get("jjfdsljk:dfdfd:fds"))

fmt.Println(ringchash.Get("jjfdxxvsljk:dddsaf:xzcv"))
//
fmt.Println(ringchash.Get("fcds:cxc:fdsfd"))
//
fmt.Println(ringchash.Get("vdsafd:32:fdsfd"))

fmt.Println(ringchash.Get("xvd:fs:xcvd"))

var configs2 []CHashNode
for i := 0; i < 2; i++ {
	configs2 = append(configs2, TestNode("node"+strconv.Itoa(10+i)))
}
ringchash.Adds(configs2)
fmt.Println("==================================")
fmt.Println(ringchash.Debug())
fmt.Println(ringchash.Get("jjfdsljk:dfdfd:fds"))

fmt.Println(ringchash.Get("jjfdxxvsljk:dddsaf:xzcv"))
//
fmt.Println(ringchash.Get("fcds:cxc:fdsfd"))
//
fmt.Println(ringchash.Get("vdsafd:32:fdsfd"))

fmt.Println(ringchash.Get("xvd:fs:xcvd"))

ringchash.Del("node0")

fmt.Println("==================================")
fmt.Println(ringchash.Debug())
fmt.Println(ringchash.Get("jjfdsljk:dfdfd:fds"))

fmt.Println(ringchash.Get("jjfdxxvsljk:dddsaf:xzcv"))
//
fmt.Println(ringchash.Get("fcds:cxc:fdsfd"))
//
fmt.Println(ringchash.Get("vdsafd:32:fdsfd"))

fmt.Println(ringchash.Get("xvd:fs:xcvd"))
lighttimer_test.go 轻量级计时器
TestStartTicks

lt := NewLightTimer()
lt.StartTicks(time.Millisecond)

lt.AddTimer(time.Second*time.Duration(2), func(fireSeqNo uint) bool {
	fmt.Println("callback", fireSeqNo, "-")
	if fireSeqNo == 4 {
		return true
	}
	return false
})

time.Sleep(time.Hour)
TestStartTicksDeadline


//NewLightTimerPool

lt := NewLightTimer()
lt.StartTicks(time.Millisecond)

lt.AddTimerWithDeadline(time.Second*time.Duration(2), time.Now().Add(time.Second*5), func(seqNo uint) bool {
	fmt.Println("callback", seqNo, "-")
	if seqNo == 4 {
		return true
	}
	return false
}, func(seqNo uint) bool {
	fmt.Println("end callback", seqNo, "-")
	return true
})

time.Sleep(time.Hour)
TestLtPool

pool := NewLightTimerPool(10, time.Millisecond)

for i := 0; i < 100000; i++ {
	tmp := i
	pool.AddTimerWithDeadline(strconv.Itoa(tmp), time.Second*time.Duration(2), time.Now().Add(time.Second*5), func(seqNo uint) bool {
		fmt.Println("callback", tmp, "-", seqNo, "-")
		if seqNo == 4 {
			return true
		}
		return false
	}, func(seqNo uint) bool {
		fmt.Println("end callback", tmp, "-", seqNo, "-")
		return true
	})
}

time.Sleep(time.Second * 20)

fmt.Println(runtime.NumGoroutine())

time.Sleep(time.Hour)
TestStartTicks2

lt := NewLightTimer()
lt.StartTicks(time.Millisecond)

lt.AddCallback(time.Second*time.Duration(3), func() {
	fmt.Println("invoke once")
})

time.Sleep(time.Hour)

db 数据库

elasticsearch ES搜索引擎
es6 ES6版本API
es_test.go
TestCreateIndexByModel

InitClients()

client := GetEsClient(testUserIndexKey)

err := client.CreateIndexByModel(context.Background(), testUserIndexName, &MappingModel{
	Mappings: map[string]Mapping{
		testUserTypeName: {
			Dynamic: false,
			Properties: map[string]*elasticsearch.MappingProperty{
				"userId": {
					Type:  "text",
					Index: false,
				},
				"nickname": {
					Type:     "text",
					Analyzer: "standard",
					Fields: map[string]*elasticsearch.MappingProperty{
						"std": {
							Type:     "text",
							Analyzer: "standard",
							ExtProps: map[string]interface{}{
								"term_vector": "with_offsets",
							},
						},
						"keyword": {
							Type: "keyword",
						},
					},
				},
				"status": {
					Type: "keyword",
				},
				"pType": {
					Type: "keyword",
				},
			},
		},
	},
	Settings: elasticsearch.MappingSettings{
		SettingsIndex: elasticsearch.SettingsIndex{
			IgnoreMalformed:  true,
			NumberOfReplicas: 1,
			NumberOfShards:   3,
		},
	},
})

t.Log(err)
TestEsInsert

InitClients()

client := GetEsClient(testUserIndexKey)

for i := 0; i < 100; i++ {
	ptype := "normal"
	if i%10 == 5 {
		ptype = "vip"
	}
	status := "valid"
	if i%30 == 2 {
		status = "invalid"
	}
	id := "000000000" + fmt.Sprint(i)
	err := client.Insert(context.Background(), testUserIndexName, testUserTypeName,
		id, testUser{UserId: id, Nickname: "超级棒" + id, Status: status, Type: ptype})
	t.Log(err)
}
TestEsBatchInsert

InitClients()

client := GetEsClient(testUserIndexKey)

ids := make([]string, 0)
items := make([]interface{}, 0)

for i := 0; i < 100; i++ {
	ptype := "normal"
	if i%10 == 5 {
		ptype = "vip"
	}
	status := "valid"
	if i%30 == 2 {
		status = "invalid"
	}
	id := "x00000000" + fmt.Sprint(i)

	ids = append(ids, id)
	items = append(items, &testUser{UserId: id, Nickname: "超级棒" + id, Status: status, Type: ptype})
}

err := client.BatchInsert(context.Background(), testUserIndexName, testUserTypeName, ids, items)
t.Log(err)
TestEsUpdateById

InitClients()
client := GetEsClient(testUserIndexKey)

id := "000000000" + fmt.Sprint(30)

err := client.UpdateById(context.Background(), testUserIndexName, testUserTypeName,
	id, map[string]interface{}{
		"status": "invalid",
	})
t.Log(err)

err = client.UpdateById(context.Background(), testUserIndexName, testUserTypeName,
	id, map[string]interface{}{
		"extField": "ext1234",
	})
t.Log(err)
TestDeleteById

InitClients()
client := GetEsClient(testUserIndexKey)

id := "000000000" + fmt.Sprint(9)

err := client.DeleteById(context.Background(), testUserIndexName, testUserTypeName,
	id)
t.Log(err)
TestQueryEs

InitClients()
client := GetEsClient(testUserIndexKey)

bq := elastic.NewBoolQuery()
bq.Must(elastic.NewMatchQuery("nickname", "超级棒"))

var users []testUser
total := int64(0)
err := client.FindByModel(context.Background(), elasticsearch.QueryModel{
	IndexName: testUserIndexName,
	TypeName:  testUserTypeName,
	Query:     bq,
	Size:      5,
	Results:   &users,
	Total:     &total,
})
bs, _ := json.Marshal(users)
t.Log(len(users), total, string(bs), err)
TestQueryEsQuerySource

InitClients()
client := GetEsClient(testUserIndexKey)

source := `{
	"from":0,
	"size":25,
	"query":{
		"match":{"nickname":"超级"}
	}
}`

var users []testUser
total := int64(0)
err := client.FindBySource(context.Background(), elasticsearch.SourceModel{
	IndexName: testUserIndexName,
	TypeName:  testUserTypeName,
	Source:    source,
	Results:   &users,
	Total:     &total,
})
bs, _ := json.Marshal(users)
t.Log(len(users), total, string(bs), err)
TestAggregateBySource

InitClients()
client := GetEsClient(testUserIndexKey)

source := `{
	"from": 0,
	"size": 0,
	"_source": {
		"includes": [
			"status",
			"pType",
			"COUNT"
		],
		"excludes": []
	},
	"stored_fields": [
		"status",
		"pType"
	],
	"aggregations": {
		"status": {
			"terms": {
				"field": "status",
				"size": 200,
				"min_doc_count": 1,
				"shard_min_doc_count": 0,
				"show_term_doc_count_error": false,
				"order": [
					{
						"_count": "desc"
					},
					{
						"_key": "asc"
					}
				]
			},
			"aggregations": {
				"pType": {
					"terms": {
						"field": "pType",
						"size": 10,
						"min_doc_count": 1,
						"shard_min_doc_count": 0,
						"show_term_doc_count_error": false,
						"order": [
							{
								"_count": "desc"
							},
							{
								"_key": "asc"
							}
						]
					},
					"aggregations": {
						"statusCnt": {
							"value_count": {
								"field": "_index"
							}
						}
					}
				}
			}
		}
	}
}`

var test AggregationTest
client.AggregateBySource(context.Background(), elasticsearch.AggregateModel{
	IndexName: testUserIndexName,
	TypeName:  testUserTypeName,
	Source:    source,
	AggKeys:   []string{"status"},
}, &test)
t.Log(test)
es7 ES7版本API
es_test.go
TestCreateIndexByModel

InitClients()

client := GetEsClient(testUserIndexKey)

err := client.CreateIndexByModel(context.Background(), testUserIndexName, &MappingModel{
	Mapping: Mapping{
		Dynamic: false,
		Properties: map[string]*elasticsearch.MappingProperty{
			"userId": {
				Type:  "text",
				Index: false,
			},
			"nickname": {
				Type:     "text",
				Analyzer: "standard",
				Fields: map[string]*elasticsearch.MappingProperty{
					"std": {
						Type:     "text",
						Analyzer: "standard",
						ExtProps: map[string]interface{}{
							"term_vector": "with_offsets",
						},
					},
					"keyword": {
						Type: "keyword",
					},
				},
			},
			"status": {
				Type: "keyword",
			},
			"pType": {
				Type: "keyword",
			},
		},
	},
	Settings: elasticsearch.MappingSettings{
		SettingsIndex: elasticsearch.SettingsIndex{
			IgnoreMalformed:  true,
			NumberOfReplicas: 2,
			NumberOfShards:   3,
		},
	},
})

t.Log(err)
TestEsInsert

InitClients()
client := GetEsClient(testUserIndexKey)

for i := 0; i < 100; i++ {
	ptype := "normal"
	if i%10 == 5 {
		ptype = "vip"
	}
	status := "valid"
	if i%30 == 2 {
		status = "invalid"
	}
	id := "000000000" + fmt.Sprint(i)
	err := client.Insert(context.Background(), testUserIndexName,
		id, testUser{UserId: id, Nickname: "超级棒" + id, Status: status, Type: ptype})
	t.Log(err)
}
TestEsBatchInsert

InitClients()
client := GetEsClient(testUserIndexKey)

ids := make([]string, 0)
items := make([]interface{}, 0)

for i := 0; i < 100; i++ {
	ptype := "normal"
	if i%10 == 5 {
		ptype = "vip"
	}
	status := "valid"
	if i%30 == 2 {
		status = "invalid"
	}
	id := "x00000000" + fmt.Sprint(i)

	ids = append(ids, id)
	items = append(items, &testUser{UserId: id, Nickname: "超级棒" + id, Status: status, Type: ptype})
}

err := client.BatchInsert(context.Background(), testUserIndexName, ids, items)
t.Log(err)
TestEsUpdateById

InitClients()
client := GetEsClient(testUserIndexKey)

id := "000000000" + fmt.Sprint(30)

err := client.UpdateById(context.Background(), testUserIndexName,
	id, map[string]interface{}{
		"status": "invalid",
	})
t.Log(err)

err = client.UpdateById(context.Background(), testUserIndexName,
	id, map[string]interface{}{
		"extField": "ext1234",
	})
t.Log(err)
TestDeleteById

InitClients()
client := GetEsClient(testUserIndexKey)

id := "000000000" + fmt.Sprint(9)

err := client.DeleteById(context.Background(), testUserIndexName, id)
t.Log(err)
TestQueryEs

InitClients()
client := GetEsClient(testUserIndexKey)

bq := elastic.NewBoolQuery()
bq.Must(elastic.NewMatchQuery("nickname", "超级棒"))

var users []testUser
total := int64(0)
err := client.FindByModel(context.Background(), elasticsearch.QueryModel{
	IndexName: testUserIndexName,
	Query:     bq,
	Size:      5,
	Results:   &users,
	Total:     &total,
})
bs, _ := json.Marshal(users)
t.Log(len(users), total, string(bs), err)
TestQueryEsQuerySource

InitClients()
client := GetEsClient(testUserIndexKey)
source := `{
	"from":0,
	"size":25,
	"query":{
		"match":{"nickname":"超级"}
	}
}`

var users []testUser
total := int64(0)
err := client.FindBySource(context.Background(), elasticsearch.SourceModel{
	IndexName: testUserIndexName,
	Source:    source,
	Results:   &users,
	Total:     &total,
})
bs, _ := json.Marshal(users)
t.Log(len(users), total, string(bs), err)
TestAggregateBySource

InitClients()
client := GetEsClient(testUserIndexKey)
source := `{
	"from": 0,
	"size": 0,
	"_source": {
		"includes": [
			"status",
			"pType",
			"COUNT"
		],
		"excludes": []
	},
	"stored_fields": [
		"status",
		"pType"
	],
	"aggregations": {
		"status": {
			"terms": {
				"field": "status",
				"size": 200,
				"min_doc_count": 1,
				"shard_min_doc_count": 0,
				"show_term_doc_count_error": false,
				"order": [
					{
						"_count": "desc"
					},
					{
						"_key": "asc"
					}
				]
			},
			"aggregations": {
				"pType": {
					"terms": {
						"field": "pType",
						"size": 10,
						"min_doc_count": 1,
						"shard_min_doc_count": 0,
						"show_term_doc_count_error": false,
						"order": [
							{
								"_count": "desc"
							},
							{
								"_key": "asc"
							}
						]
					},
					"aggregations": {
						"statusCnt": {
							"value_count": {
								"field": "_index"
							}
						}
					}
				}
			}
		}
	}
}`

var test AggregationTest
client.AggregateBySource(context.Background(), elasticsearch.AggregateModel{
	IndexName: testUserIndexName,
	Source:    source,
	AggKeys:   []string{"status"},
}, &test)
t.Log(test)
kafka kafka消息队列
kafka_test.go
TestKafkaProducer

InitKafka()
producer := GetProducer("user_producer")
producer.Produce(&sarama.ProducerMessage{
	Topic: userTopic,
	Key:   sarama.ByteEncoder(fmt.Sprint(time.Now().Unix())),
	Value: sarama.ByteEncoder(fmt.Sprint(time.Now().Unix())),
})

time.Sleep(time.Second * 5)
TestKafkaConsumer

InitKafka()

consumer := GetConsumer("user_consumer")
go func() {
	consumer.Consume(userTopic, func(msg *sarama.ConsumerMessage) error {
		fmt.Println(string(msg.Key), "=", string(msg.Value))
		return nil
	}, func(err error) {

	})
}()

producer := GetProducer("user_producer")
for i := 0; i < 10; i++ {
	producer.Produce(&sarama.ProducerMessage{
		Topic: userTopic,
		Key:   sarama.ByteEncoder(fmt.Sprint(i)),
		Value: sarama.ByteEncoder(fmt.Sprint(time.Now().Unix())),
	})
}

time.Sleep(time.Second * 5)
mongo mongo数据库
collection_test.go
TestInsert

ctx := context.Background()
InitClients()
c, _ := MgoClient(dbKey)

op := NewCompCollectionOp(c, dbName, collectionName)
op.Insert(ctx, testUser{
	UserId:   "1",
	Nickname: "超级棒",
	Status:   "valid",
	Type:     "normal",
})

var result interface{}
op.FindOne(ctx, FindModel{
	Query:   bson.M{"user_id": "1"},
	Results: &result,
})

log.Info(ctx, "result: %v", result)
TestUpdate

ctx := context.Background()
InitClients()
c, _ := MgoClient(dbKey)

op := NewCompCollectionOp(c, dbName, collectionName)
op.Update(ctx, bson.M{"user_id": "1"}, bson.M{"$set": bson.M{"nick_name": "超级棒++"}})

var result interface{}
op.FindOne(ctx, FindModel{
	Query:   bson.M{"user_id": "1"},
	Results: &result,
})

log.Info(ctx, "result: %v", result)
TestFind

ctx := context.Background()
InitClients()
c, _ := MgoClient(dbKey)

op := NewCompCollectionOp(c, dbName, collectionName)

var result []bson.M
err := op.Find(ctx, FindModel{
	Query:   bson.M{"user_id": "1"},
	Results: &result,
})
if err != nil {
	log.Error(ctx, "Mgo find err: %v", err)
	return
}

for _, item := range result {
	t.Log(item)
}
TestDelete

ctx := context.Background()
InitClients()
c, _ := MgoClient(dbKey)

op := NewCompCollectionOp(c, dbName, collectionName)
op.Delete(ctx, bson.M{"user_id": "1"})

var result interface{}
op.FindOne(ctx, FindModel{
	Query:   bson.M{"user_id": "1"},
	Results: &result,
})

log.Info(ctx, "result: %v", result)
TestUpert

ctx := context.Background()
InitClients()
c, _ := MgoClient(dbKey)

op := NewCompCollectionOp(c, dbName, collectionName)
err := op.Upsert(ctx, bson.M{"name": "tom2"}, bson.M{"$set": bson.M{"birth": "2020"}}, bson.M{"birth2": "2024"})
t.Log(err)
TestBulkUpdateItems

ctx := context.Background()
InitClients()
c, _ := MgoClient(dbKey)

op := NewCompCollectionOp(c, dbName, collectionName)

err := op.BulkUpdateItems(ctx, []*BulkUpdateItem{
	{Selector: bson.M{"name": "tom"}, Update: bson.M{"$set": bson.M{"birth": "1"}}},
	{Selector: bson.M{"name": "tom1"}, Update: bson.M{"$set": bson.M{"birth2": "2"}}},
})
t.Log(err)
TestBulkUpsertItems

ctx := context.Background()
InitClients()
c, _ := MgoClient(dbKey)

op := NewCompCollectionOp(c, dbName, collectionName)

err := op.BulkUpsertItem(ctx, []*BulkUpsertItem{
	{Selector: bson.M{"name": "tim"}, Replacement: bson.M{"name": "tim", "birth": "3"}},
	{Selector: bson.M{"name": "tim1"}, Replacement: bson.M{"name": "tim1", "birth2": "4"}},
})
t.Log(err)
redis go-redis
list_test.go Redis List工具库
TestList

InitRedises()
rds := Get("rdscdb")
ctx := context.Background()

err := ListPush(ctx, rds, "test_list", "stringvalue")
t.Log(err)
ListPop(rds, []string{"test_list"}, 3600, 1000, func(key, data string) {
	fmt.Println(key, data)
})

err = ListPush(ctx, rds, "test_list", "stringvalue")
t.Log(err)
time.Sleep(time.Second * 20)
lock_test.go Redis 锁工具库
TestRdsAllowActionWithCD

InitRedises()
rds := Get("rdscdb")
ctx := context.Background()

cd, ok := RdsAllowActionWithCD(ctx, rds, "test:action", 2)
t.Log(cd, ok)
cd, ok = RdsAllowActionWithCD(ctx, rds, "test:action", 2)
t.Log(cd, ok)
time.Sleep(time.Second * 3)

cd, ok = RdsAllowActionWithCD(ctx, rds, "test:action", 2)
t.Log(cd, ok)
TestRdsAllowActionByMTs

InitRedises()
rds := Get("rdscdb")
ctx := context.Background()

cd, ok := RdsAllowActionByMTs(ctx, rds, "test:action", 500, 10)
t.Log(cd, ok)
cd, ok = RdsAllowActionByMTs(ctx, rds, "test:action", 500, 10)
t.Log(cd, ok)
time.Sleep(time.Second)

cd, ok = RdsAllowActionByMTs(ctx, rds, "test:action", 500, 10)
t.Log(cd, ok)
TestRdsLockResWithCD

InitRedises()
rds := Get("rdscdb")
ctx := context.Background()

ok := RdsLockResWithCD(ctx, rds, "test:res", "res-1", 3)
t.Log(ok)
ok = RdsLockResWithCD(ctx, rds, "test:res", "res-2", 3)
t.Log(ok)
time.Sleep(time.Second * 4)

ok = RdsLockResWithCD(ctx, rds, "test:res", "res-2", 3)
t.Log(ok)
mq_test.go Redis PubSub工具库
TestMqPSubscribe

InitRedises()
rds := Get("rdscdb")
ctx := context.Background()

MqPSubscribe(ctx, rds, "testkey:*", func(channel string, data string) {
	fmt.Println(channel, data)
}, 10)

err := MqPublish(ctx, rds, "testkey:1", "id:1")
t.Log(err)
err = MqPublish(ctx, rds, "testkey:2", "id:2")
t.Log(err)
err = MqPublish(ctx, rds, "testkey:3", "id:3")
t.Log(err)

time.Sleep(time.Second * 3)
zset_test.go Redis ZSet工具库
TestZDescartes

InitRedises()
rds := Get("rdscdb")
ctx := context.Background()
dimValues := [][]string{{"dim1a", "dim1b"}, {"dim2a", "dim2b", "dim2c", "dim2d"}, {"dim3a", "dim3b", "dim3c"}}

dt, err := csv.ReadCsvToDataTable(ctx, "data.csv", ',',
	[]string{"id", "name", "createtime", "dim1", "dim2", "dim3", "member"}, "id", []string{})
if err != nil {
	t.Log(err)
	return
}

err = ZDescartes(ctx, rds, dimValues, func(strs []string) (string, map[string]int64) {
	dimData := make(map[string]int64)
	for _, row := range dt.Rows() {
		if row.String("dim1") == strs[0] &&
			row.String("dim2") == strs[1] &&
			row.String("dim3") == strs[2] {
			dimData[row.String("member")] = row.Int64("createtime")
		}
	}
	return "rds" + strings.Join(strs, "-"), dimData
}, 1000, 30)

t.Log(err)

log zap日志库

zap_test.go
TestZap

ctx := &gin.Context{}
ctx.Set("__traceId", "aaabbbbbcccc")
//Info(ctx, "我是日志", "name", "管理员")  //json

Info(ctx, "我是日志2")

//Info(ctx, "我是日志3", "name")  //json

Error(ctx, "我是日志4: %v,%v", "管理员", "eee")
TestZapJson

ctx := &gin.Context{}
ctx.Set("__traceId", "aaabbbbbcccc")
Info(ctx, "我是日志 %v", "name", "管理员") //json

Info(ctx, "我是日志3 %v", "管理员") //json
Error(ctx, "我是日志3")          //json
Log(ctx, zapcore.ErrorLevel, "日志啊")
TestPanicLog

testPanicLog()

Info(context.Background(), "catch panic")
TestLevelChange

traceId := strings.Replace(uuid.New().String(), "-", "", -1)
ctx := context.WithValue(context.Background(), LOG_TRADE_ID, traceId)
Error(ctx, LogLess())
Error(ctx, LogLess())
Error(ctx, LogLess())
Error(ctx, LogLess())
Error(ctx, LogLess())
Error(ctx, LogLess())
Error(ctx, LogLess())
Error(ctx, LogLess())

fmt.Println(LogLess(), "============")

Info(ctx, LogMore())
Info(ctx, LogMore())
Info(ctx, LogMore())
Info(ctx, LogMore())
Info(ctx, LogMore())
Info(ctx, LogMore())
Info(ctx, LogMore())
Info(ctx, LogMore())

fmt.Println(LogMore(), "============")

middleware 中间件

limit_conn_test.go 限连接模块
TestLimitConn

router := gin.New()
lr := NewLimitConn(reqHostIp)

router.Use(lr.Incoming(nil, 10, 4))
router.GET("/testurl", func(c *gin.Context) {
	time.Sleep(time.Second)
	fmt.Println("enter")
	c.String(http.StatusOK, "ok!!")
}, lr.Leaving(nil))

safego.Go(func() {
	router.Run(":8081")
})

time.Sleep(time.Second * 3)

for j := 0; j < 5; j++ {
	time.Sleep(time.Second * 1)
	for i := 0; i < 20; i++ {
		safego.Go(func() {
			resp, err := http.Get("http://127.0.0.1:8081/testurl")
			if err != nil {
				fmt.Println(err)
			} else {
				if 200 != resp.StatusCode {
					fmt.Println("点击太快了", resp.StatusCode)
				}
			}

		})
	}
}

//w1 := utils.PerformTestRequest("GET", "/testurl", router)
//if 200 == w1.Code {
//	fmt.Println("okk")
//}
time.Sleep(time.Minute * 20)
limit_req_test.go 限流模块
TestLimitReq

router := gin.New()
lr := NewLimitReq(reqHostIp)

router.Use(lr.Incoming(nil, 10, 4))
router.GET("/testurl", func(c *gin.Context) {
	time.Sleep(time.Second)
	fmt.Println("enter")
	c.String(http.StatusOK, "ok!!")
})

safego.Go(func() {
	router.Run(":8080")
})

time.Sleep(time.Second * 3)

for j := 0; j < 5; j++ {
	time.Sleep(time.Second * 1)
	for i := 0; i < 20; i++ {
		safego.Go(func() {
			resp, err := http.Get("http://127.0.0.1:8080/testurl")
			if err != nil {
				fmt.Println(err)
			} else {
				if 200 != resp.StatusCode {
					fmt.Println("点击太快了", resp.StatusCode)
				}
			}

		})
	}
}

//w1 := utils.PerformTestRequest("GET", "/testurl", router)
//if 200 == w1.Code {
//	fmt.Println("okk")
//}
time.Sleep(time.Minute * 20)
service_handler_test.go service封装器
TestServiceHandler

router := gin.New()
router.POST("/foo", ServiceHandler(serviceFoo, fooReq{}, nil))

router.Run(":8080")

net 网络库

httpx 兼容http1.x和2.0的httpclient
httpclientx_test.go
TestHttpXGet

clientX := getHcx()

for i := 0; i < 3; i++ {
	resp, err := clientX.Get("http://127.0.0.1:8049")
	if err != nil {
		t.Fatal(fmt.Errorf("error making request: %v", err))
	}
	t.Log(resp.StatusCode)
	t.Log(resp.Proto)
}
TestHttpXPost

clientX := getHcx()

for i := 0; i < 3; i++ {
	resp, err := clientX.Get("http://127.0.0.1:8881")
	if err != nil {
		t.Fatal(fmt.Errorf("error making request: %v", err))
	}
	t.Log(resp.StatusCode)
	t.Log(resp.Proto)
}
ip
packet tcp包model
proxy ssh proxy
ssh_client_test.go
TestSshClient

client := getSshClient(t)
defer client.Close()

session, err := client.NewSession()
if err != nil {
	t.Fatalf("Create session failed %v", err)
}
defer session.Close()

// run command and capture stdout/stderr
output, err := session.CombinedOutput("ls -l /data")
if err != nil {
	t.Fatalf("CombinedOutput failed %v", err)
}
t.Log(string(output))
TestMysqlSshClient

client := getSshClient(t)
defer client.Close()

//test时候,打开,会引入mysql包
//mysql.RegisterDialContext("tcp", func(ctx context.Context, addr string) (net.Conn, error) {
//	return client.Dial("tcp", addr)
//})

db, err := sql.Open("", "")
if err != nil {
	t.Fatalf("open db failed %v", err)
}
defer db.Close()

rs, err := db.Query("select  limit 10")
if err != nil {
	t.Fatalf("open db failed %v", err)
}
defer rs.Close()
for rs.Next() {

}
serverx 兼容http1.x和2.0的http server

utils 通用工具库

buffer_invoker 异步调用
buffer_invoker_test.go
TestFuncBuffer

for i := 0; i < 100; i++ {
	item := strconv.Itoa(i)
	safego.Go(func() {
		fb.Invoke("1234", item)
	})
}

fmt.Println("for end1")

time.Sleep(time.Second * 10)

for i := 0; i < 100; i++ {
	item := strconv.Itoa(i)
	safego.Go(func() {
		fb.Invoke("1234", item)
	})
}

fmt.Println("for end2")

time.Sleep(time.Second * 60)
cbk 熔断器
cbk_test.go
TestCbkFailed

InitCbk()

var ok bool
var lastBreaked bool
for j := 0; j < 200; j++ {
	i := j
	//safego.Go(func() {
	err := Impls[SIMPLE].Check("test") //30s 返回一次true尝试
	fmt.Println(i, "Check:", ok)

	if err == nil {
		time.Sleep(time.Millisecond * 10)
		Impls[SIMPLE].Failed("test")

		if i > 105 && lastBreaked {
			Impls[SIMPLE].Succeed("test")
			lastBreaked = false
			fmt.Println(i, "Succeed")
		}
	} else {
		if lastBreaked {
			time.Sleep(time.Second * 10)
		} else {
			lastBreaked = true
		}
	}
	//})
}
csv CSV文件解析为MDB内存表
csv_parse_test.go
TestReadCsvToDataTable

dt, err := ReadCsvToDataTable(context.Background(), `goutils.log`, '\t',
	[]string{"xx", "xx", "xx", "xx"}, "xxx", []string{"xxx"})
if err != nil {
	t.Log(err)
	return
}
for _, r := range dt.Rows() {
	t.Log(r.Data())
}

rs := dt.RowsBy("xxx", "869")
for _, r := range rs {
	t.Log(r.Data())
}

t.Log(dt.Row("17"))
distlock 分布式锁
consullock_test.go
TestAquireConsulLock

l, _ := NewConsulLock("accountId", 10)
//l.Lock(15)
//l.Unlock()
ctx := context.Background()
fmt.Println("try lock 1")

fmt.Println(l.Lock(ctx, 5))
//time.Sleep(time.Second * 6)

//fmt.Println("try lock 2")
//fmt.Println(l.Lock(3))

l2, _ := NewConsulLock("accountId", 10)
fmt.Println("try lock 3")
fmt.Println(l2.Lock(ctx, 15))

l3, _ := NewConsulLock("accountId", 10)
fmt.Println("try lock 4")
fmt.Println(l3.Lock(ctx, 15))

time.Sleep(time.Minute)
filelock_test.go
TestFileLock

test_file_path, _ := os.Getwd()
locked_file := test_file_path

wg := sync.WaitGroup{}

for i := 0; i < 10; i++ {
	wg.Add(1)
	go func(num int) {
		flock := NewFileLock(locked_file, false)
		err := flock.Lock()
		if err != nil {
			wg.Done()
			fmt.Println(err.Error())
			return
		}
		fmt.Printf("output : %d\n", num)
		wg.Done()
	}(i)
}
wg.Wait()
time.Sleep(2 * time.Second)

rdslock_test.go
TestRdsLock

redis.InitRedises()
l, _ := NewRdsLuaLock("rdscdb", "accoutId", 4)
l2, _ := NewRdsLuaLock("rdscdb", "accoutId", 4)
//l.Lock(15)
//l.Unlock()
ctx := context.Background()
fmt.Println(l.Lock(ctx, 5))
fmt.Println("1getlock")
fmt.Println(l2.Lock(ctx, 5))
fmt.Println("2getlock")
time.Sleep(time.Second * 15)

//l2, _ := NewRdsLuaLock("accoutId", 15)

//t.Log(l2.Lock(5))
docgen 文档自动生成
cmd
docgen_test.go
TestGenDocTestUser

sb := strings.Builder{}
sb.WriteString(genDocTestUserQuery())
sb.WriteString(genDocTestUserCreate())
sb.WriteString(genDocTestUserUpdate())
sb.WriteString(genDocTestUserDelete())

GenDoc(context.Background(), "用户管理", "doc/testuser.md", 2, sb.String())
fsm 有限状态机
hc httpclient工具
ismtp 邮件工具
ismtp_test.go
TestSendEmail

emailauth := LoginAuth(
	"from",
	"xxxxxx",
	"mailhost.com",
)

ctype := fmt.Sprintf("Content-Type: %s; charset=%s", "text/plain", "utf-8")

msg := fmt.Sprintf("To: %s\r\nCc: %s\r\nFrom: %s\r\nSubject: %s\r\n%s\r\n\r\n%s",
	strings.Join([]string{"target@mailhost.com"}, ";"),
	"",
	"from@mailhost.com",
	"测试",
	ctype,
	"测试")

err := SendMail("mailhost.com:port", //convert port number from int to string
	emailauth,
	"from@mailhost.com",
	[]string{"target@mailhost.com"},
	[]byte(msg),
)

if err != nil {
	t.Log(err)
	return
}

return
safego 安全的go协程
snowflake
snowflake_test.go 雪花ID生成器
TestSnowflake

n, _ := NewNode(1)
t.Log(n.Generate(), ",", n.Generate(), ",", n.Generate())
tags_test.go 结构体TAG生成器
TestAutoGenTags

fmt.Println(AutoGenTags(testUser{}, map[string]TAG_STYLE{
	"json": TAG_STYLE_SNAKE,
	"bson": TAG_STYLE_UNDERLINE,
	"form": TAG_STYLE_ORIG,
}))

ws websocket客户端和服务端库

js
wss_test.go
TestWssRun

InitServer() //server invoke 服务端调用
InitClient() //client invoke 客户端调用
ctx := context.Background()

const (
	C2S_REQ  = 1
	S2C_RESP = 2
)

//server reg handler
RegisterHandler(C2S_REQ, func(ctx context.Context, connection *Connection, message *Message) error {
	log.Info(ctx, "server recv: %v, %v", message.PMsg().ProtocolId, string(message.PMsg().Data))
	packet := GetPoolMessage(S2C_RESP)
	packet.PMsg().Data = []byte("server response")
	connection.SendMsg(ctx, packet, nil)
	return nil
})

//server start
e := gin.New()
e.GET("/join", func(ctx *gin.Context) {
	connMeta := ConnectionMeta{
		UserId:   ctx.DefaultQuery("uid", ""),
		Typed:    0,
		DeviceId: "",
		Version:  0,
		Charset:  0,
	}
	_, err := AcceptGin(ctx, connMeta, ConnectCbOption(&ConnectCb{connMeta.UserId}),
		SrvUpgraderCompressOption(true),
		CompressionLevelOption(1))
	if err != nil {
		log.Error(ctx, "Accept client connection failed. error: %v", err)
		return
	}
})
go e.Run(":8003")

//client reg handler
RegisterHandler(S2C_RESP, func(ctx context.Context, connection *Connection, message *Message) error {
	log.Info(ctx, "client recv: %v, %v", message.PMsg().ProtocolId, string(message.PMsg().Data))
	return nil
})
//client connect
uid := "100"
url := "ws://127.0.0.1:8003/join?uid=" + uid
conn, _ := DialConnect(context.Background(), url, http.Header{},
	ClientIdOption("server1"),
	ClientDialWssOption(url, false),
	ClientDialCompressOption(true),
	CompressionLevelOption(2),
)
log.Info(ctx, "%v", conn)
time.Sleep(time.Second * 5)

packet := GetPoolMessage(C2S_REQ)
packet.PMsg().Data = []byte("client request")
conn.SendMsg(context.Background(), packet, nil)

time.Sleep(time.Minute * 1)
TestBenchmarkWssRun

InitServer() //server invoke 服务端调用
InitClient() //client invoke 客户端调用
const (
	C2S_REQ  = 1
	S2C_RESP = 2
)

var reqBytes = []byte("client request")
var respBytes = []byte("server response")

RegisterDataMsgType(C2S_REQ, &P_MESSAGE{})
RegisterDataMsgType(S2C_RESP, &P_MESSAGE{})
ctx := context.Background()

//server reg handler
RegisterHandler(C2S_REQ, func(ctx context.Context, connection *Connection, message *Message) error {
	//log.Info(ctx, "server recv: %v, %v", message.PMsg().ProtocolId, string(message.PMsg().Data))
	packet := GetPoolMessage(S2C_RESP)
	dataMsg := packet.DataMsg().(*P_MESSAGE)
	dataMsg.Data = respBytes
	connection.SendMsg(ctx, packet, nil)
	return nil
})

//server start
e := gin.New()
e.GET("/join", func(ctx *gin.Context) {
	connMeta := ConnectionMeta{
		UserId:   ctx.DefaultQuery("uid", ""),
		Typed:    0,
		DeviceId: "",
		Version:  0,
		Charset:  0,
	}
	_, err := AcceptGin(ctx, connMeta, ConnectCbOption(&ConnectCb{connMeta.UserId}),
		SrvUpgraderCompressOption(true),
		CompressionLevelOption(2))
	if err != nil {
		log.Error(ctx, "Accept client connection failed. error: %v", err)
		return
	}
})
go e.Run(":8003")

//client reg handler
RegisterHandler(S2C_RESP, func(ctx context.Context, connection *Connection, message *Message) error {
	//log.Info(ctx, "client recv: %v, %v", message.PMsg().ProtocolId, string(message.PMsg().Data))
	return nil
})
//client connect
url := "ws://127.0.0.1:8003/join?uid=" + "100"
conn, _ := DialConnect(context.Background(), url, http.Header{},
	ClientIdOption("server1"),
	ClientDialWssOption(url, false),
	ClientDialCompressOption(true),
	CompressionLevelOption(2),
)

log.Info(ctx, "%v", conn)
time.Sleep(time.Second * 5)

for i := 0; i < 100000; i++ {
	packet := GetPoolMessage(C2S_REQ)
	dataMsg := packet.DataMsg().(*P_MESSAGE)
	dataMsg.Data = reqBytes
	conn.SendMsg(context.Background(), packet, nil)
	time.Sleep(time.Millisecond * 50)
}

time.Sleep(time.Minute * 3)

Documentation

The Go Gopher

There is no documentation for this package.

Directories

Path Synopsis
db
net
ip
cbk
csv
fsm
hc
ismtp
Package smtp implements the Simple Mail Transfer Protocol as defined in RFC 5321.
Package smtp implements the Simple Mail Transfer Protocol as defined in RFC 5321.
snowflake
Package snowflake provides a very simple Twitter snowflake generator and parser.
Package snowflake provides a very simple Twitter snowflake generator and parser.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL