Discover Packages
git.oschina.net/cloudzone/smartgo
stgclient
package
Version:
v0.0.0-...-130f5e9
Opens a new window with list of versions in this module.
Published: Apr 25, 2018
License: Apache-2.0
Opens a new window with license information.
Imports: 7
Opens a new window with list of imports.
Imported by: 3
Opens a new window with list of known importers.
README
README
¶
stgclient
Read the docs
创建topic
1、import "git.oschina.net/cloudzone/smartgo/stgclient/process"
2、创建发送实例process.NewDefaultMQProducer("producerGroupId")
3、设置stgregistry地址SetNamesrvAddr(namesrvAddr)
4、建立链接调用Start()
方法
5、调用创建topic的方法CreateTopic(stgcommon.DEFAULT_TOPIC, "topicName", 8)
stgcommon.DEFAULT_TOPIC
为密钥需先import "git.oschina.net/cloudzone/smartgo/stgcommon"
topicName
为创建topic的名称
8
为读写的队列数
发送同步消息
请求
1、import "git.oschina.net/cloudzone/smartgo/stgclient/process"
2、创建发送实例process.NewDefaultMQProducer("producerGroupId")
3、设置stgregistry地址SetNamesrvAddr(namesrvAddr)
4、建立链接调用Start()
方法
5、调用实例的Send方法Send(message.NewMessage("topicName", "tagName", []byte("msgbody")))
先import "git.oschina.net/cloudzone/smartgo/stgcommon/message"
创建Message的参数topicName 为创建topic的名称,tagName为标签名称,msgBody为消息内容。
响应
Send方法返回值为(*SendResult, error)
SendResult中SendStatus值说明
SEND_OK
发送成功并同步到SLAVE成功
FLUSH_DISK_TIMEOUT
刷盘超时
FLUSH_SLAVE_TIMEOUT
同步到SLAVE超时
SLAVE_NOT_AVAILABLE SLAVE
不可用
发送异步消息
请求
1、import "git.oschina.net/cloudzone/smartgo/stgclient/process"
2、创建发送实例process.NewDefaultMQProducer("producerGroupId")
3、设置stgregistry地址SetNamesrvAddr(namesrvAddr)
4、建立链接调用Start()
方法
5、调用实例的SendCallBack方法SendCallBack(message.NewMessage("topicName", "tagName", []byte("msgbody")), func(sendResult *process.SendResult, err error) {})
先import "git.oschina.net/cloudzone/smartgo/stgcommon/message"
创建Message的参数topicName 为创建topic的名称,tagName为标签名称,msgBody为消息内容。
响应
SendCallBack回调函数func(sendResult *process.SendResult, err error)
SendResult中SendStatus值说明
SEND_OK
发送成功并同步到SLAVE成功
FLUSH_DISK_TIMEOUT
刷盘超时
FLUSH_SLAVE_TIMEOUT
同步到SLAVE超时
SLAVE_NOT_AVAILABLE SLAVE
不可用
发送OneWay消息
请求
1、import "git.oschina.net/cloudzone/smartgo/stgclient/process"
2、创建发送实例process.NewDefaultMQProducer("producerGroupId")
3、设置stgregistry地址SetNamesrvAddr(namesrvAddr)
4、建立链接调用Start()
方法
5、调用实例的SendOneWay方法SendOneWay(message.NewMessage("topicName", "tagName", []byte("msgbody")))
先import "git.oschina.net/cloudzone/smartgo/stgcommon/message"
创建Message的参数topicName 为创建topic的名称,tagName为标签名称,msgBody为消息内容。
响应
Push消费
1、import "git.oschina.net/cloudzone/smartgo/stgclient/process"
2、创建消费实例process.NewDefaultMQPushConsumer("consumerGroupId")
3、设置实例消费位置SetConsumeFromWhere(heartbeat.CONSUME_FROM_LAST_OFFSET)
先 import "git.oschina.net/cloudzone/smartgo/stgcommon/protocol/heartbeat"
CONSUME_FROM_LAST_OFFSET
一个新的订阅组第一次启动从队列的最后位置开始消费,后续再启动接着上次消费的进度开始消费。
CONSUME_FROM_FIRST_OFFSET
一个新的订阅组第一次启动从队列的最前位置开始消费,后续再启动接着上次消费的进度开始消费。
CONSUME_FROM_TIMESTAMP
一个新的订阅组第一次启动从指定时间点开始消费,后续再启动接着上次消费的进度开始消费,时间点设置参见DefaultMQPushConsumer.ConsumeTimestamp
参数。
4、设置消费模式SetMessageModel(heartbeat.CLUSTERING)
CLUSTERING
集群消费。
BROADCASTING
广播消费。
5、设置stgregistry地址SetNamesrvAddr(namesrvAddr)
6、设置订阅topic和tagSubscribe("topicName", "tagName")
6、设置监听器RegisterMessageListener(&MessageListenerImpl{})
普通消息需MessageListenerImpl
实现MessageListenerConcurrently
的接口
顺序消息需MessageListenerImpl
实现MessageListenerOrderly
的接口
7、建立链接调用Start()
方法
Pull消费
1、import "git.oschina.net/cloudzone/smartgo/stgclient/process"
2、创建消费实例process.NewDefaultMQPullConsumer("consumerGroupId")
3、设置stgregistry地址SetNamesrvAddr(namesrvAddr)
4、建立链接调用Start()
方法
5、调用实例的FetchSubscribeMessageQueues("topicName")
方法,拿到topic的所有队列
6、调用实例的Pull(mq, "tagA", 0, 32)
方法
mq
为队列结构体
tagA
为tag的标签名称
0
为该队列offset,需自行维护
32
为一次拉取数量。
Expand ▾
Collapse ▴
Documentation
¶
View Source
const (
VIRTUAL_APPGROUP_PREFIX = "%%PROJECT_%s%%"
)
type ClientConfig struct {
NamesrvAddr string
InstanceName string
ClientIP string
ClientCallbackExecutorThreads int
PollNameServerInterval int
HeartbeatBrokerInterval int
PersistConsumerOffsetInterval int
}
发送状态枚举
Author: yintongqiang
Since: 2017/8/8
type MQAdmin interface {
CreateTopic(key, newTopic string , queueNum int ) error
}
MQAdmin MQ管理
注意:
(1)MQAdmin接口不能引入"git.oschina.net/cloudzone/smartgo/stgcommon/message"包,否则产生循环引用
(2)把属于MQAdmin接口的部分方法移入MQAdminExtInner接口,用来解决包循环引用的问题
(3)移动的方法包括 searchOffset()、maxOffset()、minOffset()、earliestMsgStoreTime()、viewMessage()、queryMessage()
Author: tianyuliang
Since: 2017/11/2
Source Files
¶
Directories
¶
Click to show internal directories.
Click to hide internal directories.