go-timeline
a timeline(消息同步模型) service write by golang, thinks aliyun tablestore !!!
depends:
- mongo: >= 5.0
- redis: >= 6.2
framework:
architecture
data
mongodb主要有2个文档:
chat_sync
: 离线消息库(同步库),扩散写。一个消息写N份,当客户端成功拉取消息后,应删除离线消息,释放存储(目前为了调试方便,不会删除)
chat_history
: 持久化存储库(一般使用mysql,这里为了简单直接使用mongo实现),扩散读,一个消息只写一份。有2个作用:
- 消息漫游。参考QQ消息漫游功能,可以任意查看几个月前,甚至一年前或者任意时间的历史消息。
- web支持。因为web没有存储能力(无法缓存timeline同步位点),所以可以不用从 chat_sync 走同步流程,直接通过 chat_history 拉历史消息显示即可。
文档结构:
type Message struct {
Id string `bson:"id,omitempty"` // id,非mongo的对象id
Seq int64 `bson:"seq,omitempty"` // 连续递增序号
Message map[string]interface{} `bson:"message,omitempty"` // 数据内容
}
- 离线消息库中:id代表收件人的id,具体的发送者信息、消息内容等需要自己解析Message,timeline服务并不限制存储的结构。
- 持久存储库:id代表会话(会话关系timeline不存储,需要上游服务自行实现),私聊的会话的id为:
samllUserId:bigUserId
,群聊会话的ID就是 groupId
。故查询的时候,直接按照该规则查询即可。
消息序号生成
timeline 序号生成直接采用 redis 实现,保证同一个id下,seq严格递增即可(且连续)。
example
场景
假设有如下聊天场景:
a -> b: 吃了吗?
b -> a: 吃了
// group_a has member [a, b, c, d]
a -> group_a: 大家好
c -> group_a: 报三围
a -> group_a: 初次见面,多多指教
- a给b发送私聊消息
- b回复a
- a在群中发送一条消息
- c在群中发送一条消息
- a在群中发送一条消息
下面介绍主要接口实现。
发消息
message_test.go: TestMessageUseCase_Send:
var (
user1, user2, user3, user4 = "a", "b", "c", "d"
group = "group_a"
)
assert.NoError(t, send(mc, "a", "b", "吃了吗?"))
assert.NoError(t, send(mc, "b", "a", "吃了"))
assert.NoError(t, sendGroup(mc, "a", "group_a", []string{"a", "c", "d"}, "大家好"))
assert.NoError(t, sendGroup(mc, "c", "group_a", []string{"a", "c", "d"}, "报三围"))
assert.NoError(t, sendGroup(mc, "a", "group_a", []string{"a", "c", "d"}, "初次见面,多多指教"))
同步消息(扩散写,存N份)
message_test.go: TestMessageUseCase_GetSyncMessage:
lastRead = 0
msgResult, err := mc.GetSyncMessage(context.Background(), user1, lastRead, math.MaxInt64)
message_test.go:132: [seq=20] a -> b: 吃了吗?
message_test.go:132: [seq=21] b -> a: 吃了
message_test.go:132: [seq=22] a -> group_a: 大家好
message_test.go:132: [seq=23] c -> group_a: 报三围
message_test.go:132: [seq=24] a -> group_a: 初次见面,多多指教
message_test.go:132: [seq=16] a -> b: 吃了吗?
message_test.go:132: [seq=17] b -> a: 吃了
message_test.go:132: [seq=13] a -> group_a: 大家好
message_test.go:132: [seq=14] c -> group_a: 报三围
message_test.go:132: [seq=15] a -> group_a: 初次见面,多多指教
message_test.go:132: [seq=13] a -> group_a: 大家好
message_test.go:132: [seq=14] c -> group_a: 报三围
message_test.go:132: [seq=15] a -> group_a: 初次见面,多多指教
查询历史消息(扩散读,只存一份)
lastRead = 0
msgResult, err := mc.GetSingleHistoryMessage(context.Background(), "a", "b", lastRead, 10)
message_test.go:132: [seq=13] a -> b: 吃了吗?
message_test.go:132: [seq=14] b -> a: 吃了
lastRead = 0
msgResult, err := mc.GetGroupHistoryMessage(context.Background(), "group_a", lastRead, 10)
message_test.go:132: [seq=22] a -> group_a: 大家好
message_test.go:132: [seq=23] c -> group_a: 报三围
message_test.go:132: [seq=24] a -> group_a: 初次见面,多多指教
screenhost
mongodb
client
see html client:
Usage
grpc
- go mod
go get github.com/gomicroim/go-timeline/api@latest
- grpc client example by use etcd
package main
import (
"context"
"encoding/json"
"flag"
"github.com/go-kratos/kratos/contrib/registry/etcd/v2"
"github.com/go-kratos/kratos/v2/registry"
"github.com/go-kratos/kratos/v2/transport/grpc"
v1 "github.com/gomicroim/go-timeline/api/v1"
clientv3 "go.etcd.io/etcd/client/v3"
"log"
)
var (
endpoints = flag.String("etcd", "127.0.0.1:2379", "-etcd=127.0.0.1:2379")
)
func main() {
flag.Parse()
// init etcd conn
etcdClient, err := clientv3.New(clientv3.Config{
Endpoints: []string{*endpoints},
})
if err != nil {
panic(err)
}
dis := etcd.New(etcdClient)
log.Println("connect etcd", "etcd", *endpoints)
// new timeline client
client, err := NewTimelineClient("go-timeline", dis)
if err != nil {
panic(err)
}
log.Println("send msg")
msgData := map[string]interface{}{"text:": "hello"}
buffer, _ := json.Marshal(msgData)
// call grpc `send`
res, err := client.Send(context.Background(), &v1.SendMessageRequest{
From: "user_a",
To: "user_z",
Message: string(buffer),
})
if err != nil {
panic(err)
}
log.Println("send success,result:", res)
}
func NewTimelineClient(serviceName string, discovery registry.Discovery) (v1.TimelineClient, error) {
rpcUserEndpoint := "discovery:///" + serviceName
conn, err := grpc.DialInsecure(context.Background(),
grpc.WithEndpoint(rpcUserEndpoint),
grpc.WithDiscovery(discovery),
)
if err != nil {
return nil, err
}
return v1.NewTimelineClient(conn), nil
}
- discovery
http
/timeline/send
request:
curl -X POST http://localhost:8000/timeline/send \
-H 'Content-Type:application/json' \
-d '{"from":"user_a","to":"user_b","message":"{\"from\": \"user_a\",\"to\":\"user_b\",\"text\":\"在吗?\"}"}'
{
"from":"user_a",
"to":"user_b",
"message":"{\"from\": \"user_a\",\"to\":\"user_b\",\"text\":\"在吗?\"}"
}
response:
{"sequence":"3"}
/timeline/sendGroup
request:
curl -X POST http://localhost:8000/timeline/sendGroup \
-H 'Content-Type:application/json' \
-d '{"group_name":"test_group","group_members":["user_a","user_b","user_c","user_d"],"message":"{\"from\":\"user_a\",\"to\":\"test_group\",\"is_group\":true,\"text\":\"大家好\"}"}'
{
"group_name": "test_group",
"group_members": ["user_a", "user_b", "user_c", "user_d"],
"message": "{\"from\":\"user_a\",\"to\":\"test_group\",\"is_group\":true,\"text\":\"大家好\"}"
}
response:
{"failedMembers":[]}
/timeline/sync
request:
curl -X GET http://localhost:8000/timeline/sync?member=user_a&last_read=0&count=10
response:
{
"entrySet": [{
"sequence": "3",
"message": "{\"from\":\"user_a\",\"text\":\"在吗?\",\"to\":\"user_b\"}"
}, {
"sequence": "4",
"message": "{\"from\":\"user_a\",\"is_group\":true,\"text\":\"大家好\",\"to\":\"test_group\"}"
}]
}
/timeline/history/single/{from}/{to}
request:
curl -X GET http://localhost:8000/timeline/history/single/user_a/user_b
response:
{
"entrySet": [{
"sequence": "7",
"message": "{\"from\":\"user_a\",\"text\":\"在吗?\",\"to\":\"user_b\"}"
}]
}
/timeline/history/group/{group}
request:
curl -X GET http://localhost:8000/timeline/history/group/test_group
response:
{
"entrySet": [{
"sequence": "1",
"message": "{\"from\":\"user_a\",\"is_group\":true,\"text\":\"大家好\",\"to\":\"test_group\"}"
}]
}