stgclient

package
v0.0.0-...-ba2213e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 21, 2019 License: Apache-2.0 Imports: 7 Imported by: 0

README

stgclient

Read the docs

创建topic

  • 1、import "git.oschina.net/cloudzone/smartgo/stgclient/process"
  • 2、创建发送实例process.NewDefaultMQProducer("producerGroupId")
  • 3、设置stgregistry地址SetNamesrvAddr(namesrvAddr)
  • 4、建立链接调用Start()方法
  • 5、调用创建topic的方法CreateTopic(stgcommon.DEFAULT_TOPIC, "topicName", 8)
    • stgcommon.DEFAULT_TOPIC 为密钥需先import "git.oschina.net/cloudzone/smartgo/stgcommon"
    • topicName 为创建topic的名称
    • 8为读写的队列数

发送同步消息

  • 请求
  • 1、import "git.oschina.net/cloudzone/smartgo/stgclient/process"

  • 2、创建发送实例process.NewDefaultMQProducer("producerGroupId")

  • 3、设置stgregistry地址SetNamesrvAddr(namesrvAddr)

  • 4、建立链接调用Start()方法

  • 5、调用实例的Send方法Send(message.NewMessage("topicName", "tagName", []byte("msgbody")))

    • import "git.oschina.net/cloudzone/smartgo/stgcommon/message"
    • 创建Message的参数topicName 为创建topic的名称,tagName为标签名称,msgBody为消息内容。
  • 响应
    • Send方法返回值为(*SendResult, error)
    • SendResult中SendStatus值说明
      • SEND_OK 发送成功并同步到SLAVE成功
      • FLUSH_DISK_TIMEOUT 刷盘超时
      • FLUSH_SLAVE_TIMEOUT 同步到SLAVE超时
      • SLAVE_NOT_AVAILABLE SLAVE不可用

发送异步消息

  • 请求
  • 1、import "git.oschina.net/cloudzone/smartgo/stgclient/process"

  • 2、创建发送实例process.NewDefaultMQProducer("producerGroupId")

  • 3、设置stgregistry地址SetNamesrvAddr(namesrvAddr)

  • 4、建立链接调用Start()方法

  • 5、调用实例的SendCallBack方法SendCallBack(message.NewMessage("topicName", "tagName", []byte("msgbody")), func(sendResult *process.SendResult, err error) {})

    • import "git.oschina.net/cloudzone/smartgo/stgcommon/message"
    • 创建Message的参数topicName 为创建topic的名称,tagName为标签名称,msgBody为消息内容。
  • 响应
    • SendCallBack回调函数func(sendResult *process.SendResult, err error)
    • SendResult中SendStatus值说明
      • SEND_OK 发送成功并同步到SLAVE成功
      • FLUSH_DISK_TIMEOUT 刷盘超时
      • FLUSH_SLAVE_TIMEOUT 同步到SLAVE超时
      • SLAVE_NOT_AVAILABLE SLAVE不可用

发送OneWay消息

  • 请求
  • 1、import "git.oschina.net/cloudzone/smartgo/stgclient/process"

  • 2、创建发送实例process.NewDefaultMQProducer("producerGroupId")

  • 3、设置stgregistry地址SetNamesrvAddr(namesrvAddr)

  • 4、建立链接调用Start()方法

  • 5、调用实例的SendOneWay方法SendOneWay(message.NewMessage("topicName", "tagName", []byte("msgbody")))

    • import "git.oschina.net/cloudzone/smartgo/stgcommon/message"
    • 创建Message的参数topicName 为创建topic的名称,tagName为标签名称,msgBody为消息内容。
  • 响应
    • SendOneWay有错误返回error

Push消费

  • 1、import "git.oschina.net/cloudzone/smartgo/stgclient/process"
  • 2、创建消费实例process.NewDefaultMQPushConsumer("consumerGroupId")
  • 3、设置实例消费位置SetConsumeFromWhere(heartbeat.CONSUME_FROM_LAST_OFFSET)
    • import "git.oschina.net/cloudzone/smartgo/stgcommon/protocol/heartbeat"
    • CONSUME_FROM_LAST_OFFSET 一个新的订阅组第一次启动从队列的最后位置开始消费,后续再启动接着上次消费的进度开始消费。
    • CONSUME_FROM_FIRST_OFFSET 一个新的订阅组第一次启动从队列的最前位置开始消费,后续再启动接着上次消费的进度开始消费。
    • CONSUME_FROM_TIMESTAMP 一个新的订阅组第一次启动从指定时间点开始消费,后续再启动接着上次消费的进度开始消费,时间点设置参见DefaultMQPushConsumer.ConsumeTimestamp参数。
  • 4、设置消费模式SetMessageModel(heartbeat.CLUSTERING)
    • CLUSTERING 集群消费。
    • BROADCASTING 广播消费。
  • 5、设置stgregistry地址SetNamesrvAddr(namesrvAddr)
  • 6、设置订阅topic和tagSubscribe("topicName", "tagName")
  • 6、设置监听器RegisterMessageListener(&MessageListenerImpl{})
    • 普通消息需MessageListenerImpl实现MessageListenerConcurrently的接口
    • 顺序消息需MessageListenerImpl实现MessageListenerOrderly的接口
  • 7、建立链接调用Start()方法

Pull消费

  • 1、import "git.oschina.net/cloudzone/smartgo/stgclient/process"
  • 2、创建消费实例process.NewDefaultMQPullConsumer("consumerGroupId")
  • 3、设置stgregistry地址SetNamesrvAddr(namesrvAddr)
  • 4、建立链接调用Start()方法
  • 5、调用实例的FetchSubscribeMessageQueues("topicName")方法,拿到topic的所有队列
  • 6、调用实例的Pull(mq, "tagA", 0, 32)方法
    • mq为队列结构体
    • tagA为tag的标签名称
    • 0为该队列offset,需自行维护
    • 32为一次拉取数量。

Documentation

Index

Constants

View Source
const (
	VIRTUAL_APPGROUP_PREFIX = "%%PROJECT_%s%%"
)

Variables

This section is empty.

Functions

func BuildWithProjectGroup

func BuildWithProjectGroup(origin string, projectGroup string) string

func ClearProjectGroup

func ClearProjectGroup(origin string, projectGroup string) string

func GetLocalAddress

func GetLocalAddress() string

Types

type ClientConfig

type ClientConfig struct {
	NamesrvAddr                   string
	InstanceName                  string
	ClientIP                      string
	ClientCallbackExecutorThreads int
	PollNameServerInterval        int
	HeartbeatBrokerInterval       int
	PersistConsumerOffsetInterval int
}

发送状态枚举 Author: yintongqiang Since: 2017/8/8

func NewClientConfig

func NewClientConfig(namesrvAddr string) *ClientConfig

func (*ClientConfig) BuildMQClientId

func (conf *ClientConfig) BuildMQClientId() string

func (*ClientConfig) ChangeInstanceNameToPID

func (client *ClientConfig) ChangeInstanceNameToPID()

func (*ClientConfig) CloneClientConfig

func (client *ClientConfig) CloneClientConfig() *ClientConfig

func (*ClientConfig) ResetClientConfig

func (client *ClientConfig) ResetClientConfig(cc *ClientConfig)

type MQAdmin

type MQAdmin interface {

	// 创建Topic
	// key 消息队列已存在的topic
	// newTopic 需新建的topic
	// queueNum 读写队列的数量
	CreateTopic(key, newTopic string, queueNum int) error
}

MQAdmin MQ管理

注意: (1)MQAdmin接口不能引入"git.oschina.net/cloudzone/smartgo/stgcommon/message"包,否则产生循环引用 (2)把属于MQAdmin接口的部分方法移入MQAdminExtInner接口,用来解决包循环引用的问题 (3)移动的方法包括 searchOffset()、maxOffset()、minOffset()、earliestMsgStoreTime()、viewMessage()、queryMessage()

Author: tianyuliang Since: 2017/11/2

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL