Nervous说明文档
一、总线SDK使用说明
1. 安装
- 将src/nervous目录拷贝到GOPATH的src目录下
- 将nervous/config.json拷贝至某个目录下。config.json中KAFKA_BROKERS为broker地址,其他均为kafka中配置,详情见kafka文档,建议直接保持默认
二、总线 基础功能
1. 示例
n,_:=NewNervous(context.Background(),"./config.json","test917")
wg:=sync.WaitGroup{}
wg.Add(1)
n.Subscribe("test917t")
go func(){
msg,_:=n.Receive("test917t")
fmt.Println(msg.Topic)
fmt.Println(msg.Key)
fmt.Println(msg.Value)
wg.Done()
}()
n.Send("test917t","keyf","fff")
wg.Wait()
n.Close()
2. 接口说明
1. 初始化
/*
初始化总线
@param 上下文,配置文件目录,全局id
@return 总线指针,错误
*/
func NewNervous(ctx context.Context, configAddress string,guid string)(*Nervous,error)
2. 向topic中写入数据
/*
发送消息
@param 发送消息的topic,消息的key(可以为""),消息的json字符串
@return 错误
*/
func (n *Nervous)Send(topic string, key string, value string) error
3. 订阅topic
/*
订阅topic
@param 要订阅的topic
@return 错误
*/
func (n *Nervous)Subscribe(topic string) error
/*
取消订阅topic
@param 要取消订阅的topic
@return 错误
*/
func (n *Nervous)Unsubscribe(topic string) error
4. 从topic中拉取数据
/*
从某个topic获取一条消息
@param 要获取消息的topic
@return 获得的消息,错误
*/
func (n *Nervous)Receive(topic string) (NervousMessage,error)
5. 释放资源
/*
关闭Nervous
@param
@return 错误
*/
func (n *Nervous)Close() error
三、总线 RPC 功能
1. 示例
c,_:=NewNervous(context.Background(),"./config.json","testrpcclient")
c.Run()
s,_:=NewNervous(context.Background(),"./config.json","testrpcserver")
s.Run()
testAdd := func(toAdd ...interface{})(interface{},error){
fmt.Println("in testAdd")
return toAdd[0].(float64)+1,nil
}
s.RPCRegister("testAdd",testAdd)
if result,err:=c.RPCCall(5,500,"testrpcserver","testAdd",2);err!=nil{
fmt.Printf("rpc call error:%v\n",err)
}else{
fmt.Printf("result: %d\n",int(result.(float64)))
}
if result,err:=c.RPCCall(5,500,"testrpcserver","funcList");err!=nil{
fmt.Printf("rpc call error:%v\n",err)
}else{
fmt.Println("rcp",result)
var tmpR []interface{}
tmpR = result.([]interface{})
for i:=0;i<len(tmpR);i++{
fmt.Println(tmpR[i].(string))
}
}
fmt.Println("close rpc client and server")
c.Close()
s.Close()
2. 接口说明
- 1. 初始化
/*
初始化rpc server 和rpc client,server会开始监听rpc请求,并注册基础rpc函数;client则开始监听rpc请求的返回值
@param
@return 错误
*/
func (n *Nervous)Run()error
- 2. 注册方法
/*
向rcp server注册函数
@param 注册的函数名,注册的函数
@return 错误
*/
func (n *Nervous)RPCRegister(registerName string, rpcProcess func(args ...interface{})(interface{},error))error
- 3. 查询方法
/*
查看当前rpc server上所有注册了的函数
@param
@return 函数列表,错误
*/
func (n *Nervous)RPCList()([]string,error)
/*
查看当前rpc server上是否注册了某个函数
@param 查询的函数名
@return 是否存在,错误
*/
func (n *Nervous)RPCContains(funcName string)(bool,error)
- 4. 同步请求
/*
发起rpc请求
@param 目标rpc server的guid,调用的函数名,传入的参数,默认重试5次,每次间隔500ms
@return 返回值,错误
*/
func (n *nervous) RPCCall(targetGuid string, funcName string, params ...interface{}) (interface{}, error)
/*
发起rpc请求
@param 目标rpc server的guid,重试的次数,每次重试等待的毫秒数,调用的函数名,传入的参数
@return 返回值,错误
*/
func (n *nervous) RPCCall(targetGuid string, tryTime int, tryInterval int, funcName string, params ...interface{}) (interface{}, error)