rocketmq

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 9, 2020 License: MIT Imports: 18 Imported by: 0

README

Introduction

A RocketMQ client for Go (golang) that supports both producers and consumers.

Import package

import "github.com/sevenNt/rocketmq"

Getting started

Getting message with consumer
// Consumer example: subscribe to a topic and print every message received
// during a fixed time window.
group := "dev-VodHotClacSrcData"
topic := "canal_vod_collect__video_collected_count_live"
var timeSleep = 30 * time.Second
conf := &rocketmq.Config{
    // The Config field is named Namesrv; this example passes several
    // name server addresses separated by semicolons.
    Namesrv:      "192.168.7.101:9876;192.168.7.102:9876;192.168.7.103:9876",
    ClientIp:     "192.168.1.23",
    InstanceName: "DEFAULT",
}

consumer, err := rocketmq.NewDefaultConsumer(group, conf)
if err != nil {
    return err
}
consumer.Subscribe(topic, "*")
consumer.RegisterMessageListener(
    func(msgs []*rocketmq.MessageExt) error {
        for i, msg := range msgs {
            fmt.Println("msg", i, msg.Topic, msg.Flag, msg.Properties, string(msg.Body))
        }
        fmt.Println("Consume success!")
        return nil
    })
// Start returns an error; do not ignore it.
if err := consumer.Start(); err != nil {
    return err
}

// Keep the calling goroutine alive while messages are delivered to the listener.
time.Sleep(timeSleep)
Sending message with producer
  • Synchronous sending
// Synchronous send example: Send returns the broker's result or an error.
group := "dev-VodHotClacSrcData"
topic := "canal_vod_collect__video_collected_count_live"
conf := &rocketmq.Config{
    // The Config field is named Namesrv; this example passes several
    // name server addresses separated by semicolons.
    Namesrv:      "192.168.7.101:9876;192.168.7.102:9876;192.168.7.103:9876",
    ClientIp:     "192.168.1.23",
    InstanceName: "DEFAULT",
}

producer, err := rocketmq.NewDefaultProducer(group, conf)
// Check the constructor error before touching producer.
if err != nil {
    return fmt.Errorf("NewDefaultProducer err: %w", err)
}
if err := producer.Start(); err != nil {
    return fmt.Errorf("producer start err: %w", err)
}
msg := rocketmq.NewMessage(topic, []byte("Hello RocketMQ!"))
sendResult, err := producer.Send(msg)
if err != nil {
    // Wrap instead of replacing so the underlying cause is preserved.
    return fmt.Errorf("Sync sending fail: %w", err)
}
fmt.Println("Sync sending success!, ", sendResult)
  • Asynchronous sending
// Asynchronous send example: SendAsync takes a SendCallback (func() error).
group := "dev-VodHotClacSrcData"
topic := "canal_vod_collect__video_collected_count_live"
conf := &rocketmq.Config{
    // The Config field is named Namesrv; this example passes several
    // name server addresses separated by semicolons.
    Namesrv:      "192.168.7.101:9876;192.168.7.102:9876;192.168.7.103:9876",
    ClientIp:     "192.168.1.23",
    InstanceName: "DEFAULT",
}
producer, err := rocketmq.NewDefaultProducer(group, conf)
// Check the constructor error before touching producer.
if err != nil {
    return err
}
if err := producer.Start(); err != nil {
    return err
}
msg := rocketmq.NewMessage(topic, []byte("Hello RocketMQ!"))
sendCallback := func() error {
    fmt.Println("I am callback")
    return nil
}
if err := producer.SendAsync(msg, sendCallback); err != nil {
    return err
}
fmt.Println("Async sending success!")
  • Oneway sending
// Oneway send example: SendOneway returns only an error, no send result.
group := "dev-VodHotClacSrcData"
topic := "canal_vod_collect__video_collected_count_live"
conf := &rocketmq.Config{
    // The Config field is named Namesrv; this example passes several
    // name server addresses separated by semicolons.
    Namesrv:      "192.168.7.101:9876;192.168.7.102:9876;192.168.7.103:9876",
    ClientIp:     "192.168.1.23",
    InstanceName: "DEFAULT",
}
producer, err := rocketmq.NewDefaultProducer(group, conf)
// Check the constructor error before touching producer.
if err != nil {
    return err
}
if err := producer.Start(); err != nil {
    return err
}
msg := rocketmq.NewMessage(topic, []byte("Hello RocketMQ!"))
if err := producer.SendOneway(msg); err != nil {
    return err
}
fmt.Println("Oneway sending success!")

Documentation

Index

Constants

View Source
// A timeout constant plus four int32 flag bits. Each Flag* constant sets
// exactly one distinct bit (bits 0 through 3), so the flags can be OR-ed
// together into a single int32.
const (
	// Evaluates to 15000; the name indicates the unit is milliseconds.
	BrokerSuspendMaxTimeMillis       = 1000 * 15
	FlagCommitOffset           int32 = 0x1 << 0
	FlagSuspend                int32 = 0x1 << 1
	FlagSubscription           int32 = 0x1 << 2
	FlagClassFilter            int32 = 0x1 << 3
)
View Source
// Bit-field constants. CompressedFlag and MultiTagsFlag occupy bit 0 and
// bit 1. The Transaction*Type constants are the four values (0-3) of the
// two-bit field located at bits 2-3.
// NOTE(review): presumably these are applied to a message's SysFlag — confirm.
const (
	CompressedFlag          = (0x1 << 0)
	MultiTagsFlag           = (0x1 << 1)
	TransactionNotType      = (0x0 << 2)
	TransactionPreparedType = (0x1 << 2)
	TransactionCommitType   = (0x2 << 2)
	TransactionRollbackType = (0x3 << 2)
)
View Source
// Separator values generated with iota starting at 1:
// NameValueSeparator is 1 and PropertySeparator is 2.
// NOTE(review): presumably used as control-character delimiters when
// serializing message properties — confirm against the encoder.
const (
	NameValueSeparator = 1 + iota
	PropertySeparator
)
View Source
// Well-known string names used by the client: environment variable and
// property keys, built-in topic and group names, and topic name prefixes.
// Several identifiers contain misspellings ("Perfix", "Monitror"); they are
// exported names, so the spelling is part of the public API.
const (
	RocketmqHomeEnv          = "ROCKETMQ_HOME"
	RocketmqHomeProperty     = "rocketmq.home.dir"
	NamesrvAddrEnv           = "NAMESRV_ADDR"
	NamesrvAddrProperty      = "rocketmq.namesrv.addr"
	MessageCompressLevel     = "rocketmq.message.compressLevel"
	WsDomainName             = "192.168.7.101"
	WsDomainSubgroup         = ""
	// Built from the two constants above; with the empty subgroup it
	// evaluates to "http://192.168.7.101:8080/rocketmq/".
	WsAddr                   = "http://" + WsDomainName + ":8080/rocketmq/" + WsDomainSubgroup
	DefaultTopic             = "TBW102"
	BenchmarkTopic           = "BenchmarkTest"
	DefaultProducerGroup     = "DEFAULT_PRODUCER"
	DefaultConsumerGroup     = "DEFAULT_CONSUMER"
	ToolsConsumerGroup       = "TOOLS_CONSUMER"
	FiltersrvConsumerGroup   = "FILTERSRV_CONSUMER"
	MonitrorConsumerGroup    = "__MONITOR_CONSUMER"
	ClientInnerProducerGroup = "CLIENT_INNER_PRODUCER"
	SelfTestProducerGroup    = "SELF_TEST_P_GROUP"
	SelfTestConsumerGroup    = "SELF_TEST_C_GROUP"
	SelfTestTopic            = "SELF_TEST_TOPIC"
	OffsetMovedEvent         = "OFFSET_MOVED_EVENT"
	OnsHttpProxyGroup        = "CID_ONS-HTTP-PROXY"
	CidOnsapiPermissionGroup = "CID_ONSAPI_PERMISSION"
	CidOnsapiOwnerGroup      = "CID_ONSAPI_OWNER"
	CidOnsapiPullGroup       = "CID_ONSAPI_PULL"
	CidRmqSysPerfix          = "CID_RMQ_SYS_"

	Localhost      = "127.0.0.1"
	DefaultCharset = "UTF-8"
	MasterId       = 0

	// Topic name prefixes.
	RetryGroupTopicPrefix = "%RETRY%"
	DlqGroupTopicPerfix   = "%DLQ%"
	SysTopicPerfix        = "rmq_sys_"
	UniqMsgQueryFlag      = "_UNIQUE_KEY_QUERY"
)
View Source
// iota-generated enumeration: Sync is 0, Async is 1, Oneway is 2.
// NOTE(review): presumably selects how a message is sent — confirm.
const (
	Sync = iota
	Async
	Oneway
)

communicationMode

View Source
// iota-generated enumeration: CreateJust is 0, Running is 1,
// ShutdownAlready is 2, StartFailed is 3.
// NOTE(review): presumably the lifecycle state of a client service — confirm.
const (
	CreateJust = iota
	Running
	ShutdownAlready
	StartFailed
)

ServiceState

View Source
// RPC-related values: RpcType is 0 and RpcOneway is 1.
// NOTE(review): presumably bit positions within a remoting command's flag
// field — confirm against the remoting encoder.
const (
	RpcType   = 0
	RpcOneway = 1
)
View Source
// Numeric operation codes. Per the comments on each constant, a code
// identifies an operation addressed to a broker or to the name server.
// NOTE(review): presumably these are the RocketMQ remoting request codes —
// confirm against the protocol definition.
const (
	// Broker: send a message
	SendMsg = 10
	// Broker: subscribe to messages
	PullMsg = 11
	// Broker: query messages
	QueryMESSAGE = 12
	// Broker: query the broker offset
	QueryBrokerOffset = 13
	// Broker: query the consumer offset
	QueryConsumerOffset = 14
	// Broker: update the consumer offset
	UpdateCconsumerOffset = 15
	// Broker: update or add a topic
	UpdateAndCreateTopic = 17
	// Broker: get the configuration of all topics (both Slave and Namesrv request this configuration from the Master)
	GetAllTopicConfig = 21
	// Broker: get the configuration of all topics (both Slave and Namesrv request this configuration from the Master)
	GetTopicConfigList = 22
	// Broker: get the list of all topic names
	GetTopicNameList = 23
	// Broker: update the configuration on the broker
	UpdateBrokerConfig = 25
	// Broker: get the configuration on the broker
	GetBrokerConfig = 26
	// Broker: trigger the broker to delete files
	TriggerDeleteFILES = 27
	// Broker: get broker runtime information
	GetBrokerRuntimeInfo = 28
	// Broker: look up a queue's offset by timestamp
	SearchOffsetByTimeStamp = 29
	// Broker: query the maximum offset of a queue
	GetMaxOffset = 30
	// Broker: query the minimum offset of a queue
	GetMinOffset = 31
	// Broker: query the time of the earliest message in a queue
	GetEarliestMsgStoreTime = 32
	// Broker: look up a message by message ID
	ViewMsgById = 33
	// Broker: client sends a heartbeat and registers itself
	HeartBeat = 34
	// Broker: client unregisters
	UnregisterClient = 35
	// Broker: consumer sends a message it could not process back to the server
	CconsumerSendMsgBack = 36
	// Broker: commit or roll back a transaction
	EndTransaction = 37
	// Broker: get the list of consumer IDs by group name
	GetConsumerListByGroup = 38
	// Broker: broker proactively checks the transaction state with the producer
	CheckTransactionState = 39
	// Broker: broker notifies consumers that the consumer list has changed
	NotifyConsumerIdsChanged = 40
	// Broker: consumer locks queues on the Master
	LockBatchMq = 41
	// Broker: consumer unlocks queues on the Master
	UNLockBatchMq = 42
	// Broker: get all consumer offsets
	GetAllCconsumerOffset = 43
	// Broker: get all scheduled (delay) progress
	GetAllDelayOffset = 45
	// Namesrv: add a KV config to Namesrv
	PutKVConfig = 100
	// Namesrv: get a KV config from Namesrv
	GetKVConfig = 101
	// Namesrv: get a KV config from Namesrv
	// NOTE(review): translated as written; the constant name suggests "delete" — confirm.
	DeleteKVConfig = 102
	// Namesrv: register a broker; the data is persisted, and an existing configuration is overwritten
	RegisterBroker = 103
	// Namesrv: unregister a broker; the data is persisted
	UnregisterBroker = 104
	// Namesrv: get the broker name and queue counts (read and write queues) by topic
	GetRouteinfoByTopic = 105
	// Namesrv: get information on all broker clusters registered with the Name Server
	GetBrokerClusterInfo             = 106
	UpdateAndCreateSubscriptionGroup = 200
	GetAllSubscriptionGroupConfig    = 201
	GetTopicStatsInfo                = 202
	GetConsumerConnList              = 203
	GetProducerConnList              = 204
	WipeWritePermOfBroker            = 205

	// Get the complete topic list from the Name Server
	GetAllTopicListFromNamesrv = 206
	// Delete a subscription group from the broker
	DeleteSubscriptionGroup = 207
	// Get the consume status (progress) from the broker
	GetConsumeStats = 208
	// Suspend the consumer's consumption
	SuspendConsumer = 209
	// Resume the consumer's consumption
	ResumeConsumer = 210
	// Reset the consumer offset
	ResetCconsumerOffsetInConsumer = 211
	// Reset the consumer offset
	ResetCconsumerOffsetInBroker = 212
	// Adjust the size of the consumer thread pool
	AdjustCconsumerThreadPoolPOOL = 213
	// Query which consumer groups have consumed a message
	WhoConsumeTHE_MESSAGE = 214

	// Delete a topic configuration from the broker
	DeleteTopicInBroker = 215
	// Delete a topic configuration from Namesrv
	DeleteTopicInNamesrv = 216
	// Namesrv: get all server IP information by project
	GetKvConfigByValue = 217
	// Namesrv: delete all server IP information under the given project group
	DeleteKvConfigByValue = 218
	// Get the full KV list by namespace
	GetKvlistByNamespace = 219

	// Offset reset
	ResetCconsumerClientOffset = 220
	// Client subscribes to messages
	GetCconsumerStatusFromClient = 221
	// Tell the broker to invoke offset-reset handling
	InvokeBrokerToResetOffset = 222
	// Tell the broker to invoke the client message-subscription handling
	InvokeBrokerToGetCconsumerSTATUS = 223

	// Broker: query who consumes a topic
	// 2014-03-21 Add By shijia
	QueryTopicConsumeByWho = 300

	// Get all topics under the given cluster
	// 2014-03-26
	GetTopicsByCluster = 224

	// Register a Filter Server with the broker
	// 2014-04-06 Add By shijia
	RegisterFilterServer = 301
	// Register a class with the Filter Server
	// 2014-04-06 Add By shijia
	RegisterMsgFilterClass = 302
	// Get the time span of messages by topic and group
	QueryConsumeTimeSpan = 303
	// Get the list of all built-in system topics
	GetSysTopicListFromNS     = 304
	GetSysTopicListFromBroker = 305

	// Clean up expired queues
	CleanExpiredConsumequeue = 306

	// Query a consumer's in-memory data through the broker
	// 2014-07-19 Add By shijia
	GetCconsumerRunningInfo = 307

	// Look up the corrected offset (forwarding component)
	QueryCorrectionOffset = 308

	// Send a message through the broker directly to a specific consumer and consume it immediately; the result is returned to the broker, which returns it to the caller
	// 2014-08-11 Add By shijia
	ConsumeMsgDirectly = 309

	// Broker: send a message, with optimized network packets
	SendMsgV2 = 310

	// Unit-mode (unitized) related topics
	GetUnitTopicList = 311
	// Get the list of topics that have unit-mode subscription groups
	GetHasUnitSubTopicList = 312
	// Get the list of non-unit-mode topics that have unit-mode subscription groups
	GetHasUnitSubUnunitTopicList = 313
	// Clone the consume progress of one group to a new group
	CloneGroupOffset = 314

	// View the various statistics on the broker
	ViewBrokerStatsData = 315
)
View Source
// Numeric result codes. Per the comments on each constant, a code reports
// the outcome of an operation (success, system errors, broker/name server
// conditions, producer transaction states, consumer conditions).
// NOTE(review): presumably the RocketMQ remoting response codes — confirm
// against the protocol definition.
const (
	// Success
	Success = 0
	// An uncaught exception occurred
	SysError = 1
	// System busy because the thread pool is congested
	SysBusy = 2
	// Request code not supported
	RequestCodeNotSupported = 3
	// Transaction failed; adding to the DB failed
	TransactionFailed = 4
	// Broker: flushing to disk timed out
	FlushDiskTimeout = 10
	// Broker: synchronous double write, slave not available
	SlaveNotAvailable = 11
	// Broker: synchronous double write, timed out waiting for the slave's reply
	FlushSlaveTimeout = 12
	// Broker: illegal message
	MsgIllegal = 13
	// Broker, Namesrv: service not available; it may be shutting down or there may be a permission problem
	ServiceNotAvailable = 14
	// Broker, Namesrv: version not supported
	VersionNotSupported = 15
	// Broker, Namesrv: no permission to perform this operation; could be sending, receiving, or another operation
	NoPermission = 16
	// Broker: topic does not exist
	TopicNotExist = 17
	// Broker: topic already exists (creating a topic)
	TopicExistAlready = 18
	// Broker: pulled message not found (the requested offset equals the maximum offset, which has no corresponding message)
	PullNotFound = 19
	// Broker: possibly filtered out, or a spurious notification, etc.
	PullRetryImmediately = 20
	// Broker: the offset requested for the pull is invalid, too small or too large
	PullOffsetMoved = 21
	// Broker: queried message not found
	QueryNotFound = 22
	// Broker: failed to parse the subscription
	SubscriptionParseFailed = 23
	// Broker: subscription does not exist
	SubscriptionNotExist = 24
	// Broker: subscription is not the latest
	SubscriptionNotLatest = 25
	// Broker: subscription group does not exist
	SubscriptionGroupNotExist = 26
	// Producer: the transaction should be committed
	TransactionShouldCommit = 200
	// Producer: the transaction should be rolled back
	TransactionShouldRollback = 201
	// Producer: transaction state unknown
	TransactionStateUnknow = 202
	// Producer: wrong ProducerGroup
	TransactionStateGroupWrong = 203
	// Unit-mode message; buyerId must be set
	NoBuyerId = 204
	// Unit-mode message; not a message of the current unit
	NotInCurrentUint = 205
	// Consumer is not online
	ConsumerNotOnline = 206
	// Consumer timed out while consuming the message
	ConsumeMsgTimeout = 207
)
View Source
// iota-generated enumeration: NormalMsg is 0, TransMsgHalf is 1,
// TransMsgCommit is 2, DelayMsg is 3.
// NOTE(review): presumably classifies the kind of a message — confirm.
const (
	NormalMsg = iota
	TransMsgHalf
	TransMsgCommit
	DelayMsg
)
View Source
// iota-generated send status values: SendStatusOK is 0,
// SendStatusFlushDiskTimeout is 1, SendStatusFlushSlaveTimeout is 2,
// SendStatusSlaveNotAvailable is 3.
// NOTE(review): presumably the values passed as sendStatus to NewSendResult — confirm.
const (
	SendStatusOK = iota
	SendStatusFlushDiskTimeout
	SendStatusFlushSlaveTimeout
	SendStatusSlaveNotAvailable
)
View Source
// Three explicit values, 0 through 2.
// NOTE(review): the names suggest these select where an offset is read from
// (memory, store, or memory with store fallback) — confirm against the
// offset store implementation.
const (
	MemoryFirstThenStore = 0
	ReadFromMemory       = 1
	ReadFromStore        = 2
)
View Source
// CharacterMaxLength is a length limit of 255.
// NOTE(review): what is validated against this limit is not visible here — confirm.
const (
	CharacterMaxLength = 255
)

Variables

View Source
// ConfigVersion is a mutable package-level int that starts at -1.
// NOTE(review): where it is updated and read is not visible here — confirm.
var (
	ConfigVersion int = -1
)
View Source
// DefaultIp holds the result of GetLocalIp4, evaluated once during package
// initialization.
var DefaultIp = GetLocalIp4()
View Source
var MessageClientIDSetter = messageClientIDSetter{
	// contains filtered or unexported fields
}
View Source
// MessageConst is a package-level pointer to a messageConst value that maps
// each Property* field to the string key shown on its line.
// NOTE(review): presumably these are the keys used in Message.Properties — confirm.
var MessageConst = &messageConst{
	PropertyKeys:                      "KEYS",
	PropertyTags:                      "TAGS",
	PropertyWaitStoreMsgOk:            "WAIT",
	PropertyDelayTimeLevel:            "DELAY",
	PropertyRetryTopic:                "RETRY_TOPIC",
	PropertyRealTopic:                 "REAL_TOPIC",
	PropertyRealQueueId:               "REAL_QID",
	PropertyTransactionPrepared:       "TRAN_MSG",
	PropertyProducerGroup:             "PGROUP",
	PropertyMinOffset:                 "MIN_OFFSET",
	PropertyMaxOffset:                 "MAX_OFFSET",
	PropertyBuyerId:                   "BUYER_ID",
	PropertyOriginMessageId:           "ORIGIN_MESSAGE_ID",
	PropertyTransferFlag:              "TRANSFER_FLAG",
	PropertyCorrectionFlag:            "CORRECTION_FLAG",
	PropertyMq2Flag:                   "MQ2_FLAG",
	PropertyReconsumeTime:             "RECONSUME_TIME",
	PropertyMsgRegion:                 "MSG_REGION",
	PropertyUniqClientMessageIdKeyidx: "UNIQ_KEY",
	PropertyMaxReconsumeTimes:         "MAX_RECONSUME_TIMES",
	PropertyConsumeStartTimeStamp:     "CONSUME_START_TIME",

	// NOTE(review): shown as an empty string here; verify this is intended
	// and not whitespace lost in rendering.
	KeySeparator: "",
}
View Source
// PermName holds permission bit masks, one distinct bit each:
// PermInherit is 1, PermWrite is 2, PermRead is 4, PermPriority is 8.
// NOTE(review): presumably tested against QueueData.Perm — confirm.
var PermName = permName{
	PermPriority: 0x1 << 3,
	PermRead:     0x1 << 2,
	PermWrite:    0x1 << 1,
	PermInherit:  0x1 << 0,
}

Functions

func BrokerVIPChannel

func BrokerVIPChannel(isChange bool, brokerAddr string) (borkerAddrNew string)

func GetLocalIp4

func GetLocalIp4() (ip string)

GetLocalIp4 returns the local IPv4 address.

Types

type Admin

type Admin interface {
	// contains filtered or unexported methods
}

type AllocateMessageQueueAveragely

type AllocateMessageQueueAveragely struct{}

type AllocateMessageQueueStrategy

type AllocateMessageQueueStrategy interface {
	// contains filtered or unexported methods
}

type BrokerData

type BrokerData struct {
	BrokerName      string
	BrokerAddrs     map[string]string
	BrokerAddrsLock sync.RWMutex
}

type Config

type Config struct {
	Namesrv      string
	ClientIp     string
	InstanceName string
}

type Consumer

type Consumer interface {
	Start() error
	Shutdown()
	RegisterMessageListener(listener MessageListener)
	Subscribe(topic string, subExpression string)
	UnSubscribe(topic string)
	SendMessageBack(msg MessageExt, delayLevel int) error
	SendMessageBack1(msg MessageExt, delayLevel int, brokerName string) error
	// contains filtered or unexported methods
}

func NewDefaultConsumer

func NewDefaultConsumer(consumerGroup string, conf *Config) (Consumer, error)

type ConsumerData

type ConsumerData struct {
	GroupName           string
	ConsumerType        string
	MessageModel        string
	ConsumeFromWhere    string
	SubscriptionDataSet []*SubscriptionData
	UnitMode            bool
}

type ConsumerIdSorter

type ConsumerIdSorter []string

func (ConsumerIdSorter) Len

func (r ConsumerIdSorter) Len() int

func (ConsumerIdSorter) Less

func (r ConsumerIdSorter) Less(i, j int) bool

func (ConsumerIdSorter) Swap

func (r ConsumerIdSorter) Swap(i, j int)

type DefaultConsumer

type DefaultConsumer struct {
	// contains filtered or unexported fields
}

func (*DefaultConsumer) RegisterMessageListener

func (c *DefaultConsumer) RegisterMessageListener(messageListener MessageListener)

func (*DefaultConsumer) SendMessageBack

func (c *DefaultConsumer) SendMessageBack(msg MessageExt, delayLevel int) error

func (*DefaultConsumer) SendMessageBack1

func (c *DefaultConsumer) SendMessageBack1(msg MessageExt, delayLevel int, brokerName string) error

func (*DefaultConsumer) Shutdown

func (c *DefaultConsumer) Shutdown()

func (*DefaultConsumer) Start

func (c *DefaultConsumer) Start() error

func (*DefaultConsumer) Subscribe

func (c *DefaultConsumer) Subscribe(topic string, subExpression string)

func (*DefaultConsumer) UnSubscribe

func (c *DefaultConsumer) UnSubscribe(topic string)

type DefaultProducer

type DefaultProducer struct {
	// contains filtered or unexported fields
}

func (*DefaultProducer) Send

func (d *DefaultProducer) Send(msg *Message) (*SendResult, error)

func (*DefaultProducer) SendAsync

func (d *DefaultProducer) SendAsync(msg *Message, sendCallback SendCallback) (err error)

func (*DefaultProducer) SendOneway

func (d *DefaultProducer) SendOneway(msg *Message) error

func (*DefaultProducer) Shutdown

func (d *DefaultProducer) Shutdown()

func (*DefaultProducer) Start

func (d *DefaultProducer) Start() error

type DefaultRemotingClient

type DefaultRemotingClient struct {
	// contains filtered or unexported fields
}

func (*DefaultRemotingClient) ScanResponseTable

func (d *DefaultRemotingClient) ScanResponseTable()

type GetConsumerListByGroupRequestHeader

type GetConsumerListByGroupRequestHeader struct {
	ConsumerGroup string `json:"consumerGroup"`
}

type GetConsumerListByGroupResponseBody

type GetConsumerListByGroupResponseBody struct {
	ConsumerIdList []string
}

type GetRouteInfoRequestHeader

type GetRouteInfoRequestHeader struct {
	// contains filtered or unexported fields
}

func (*GetRouteInfoRequestHeader) MarshalJSON

func (g *GetRouteInfoRequestHeader) MarshalJSON() ([]byte, error)

type HeartbeatData

type HeartbeatData struct {
	ClientId        string
	ConsumerDataSet []*ConsumerData
}

type InvokeCallback

type InvokeCallback func(responseFuture *ResponseFuture)

type Message

type Message struct {
	Topic      string
	Flag       int32
	Properties map[string]string
	Body       []byte
}

func NewMessage

func NewMessage(topic string, body []byte) *Message

type MessageExt

type MessageExt struct {
	Message
	QueueId       int32
	StoreSize     int32
	QueueOffset   int64
	SysFlag       int32
	BornTimestamp int64
	// bornHost
	StoreTimestamp int64
	// storeHost
	MsgId                     string
	CommitLogOffset           int64
	BodyCRC                   int32
	ReconsumeTimes            int32
	PreparedTransactionOffset int64
}

type MessageListener

type MessageListener func(msgs []*MessageExt) error

type MessageQueue

type MessageQueue struct {
	// contains filtered or unexported fields
}

func NewMessageQueue

func NewMessageQueue(topic string, brokerName string, queueId int32) *MessageQueue

type MessageQueues

type MessageQueues []*MessageQueue

func (MessageQueues) Len

func (m MessageQueues) Len() int

func (MessageQueues) Less

func (m MessageQueues) Less(i, j int) bool

func (MessageQueues) Swap

func (m MessageQueues) Swap(i, j int)

type MixAll

type MixAll struct{}

type MqClient

type MqClient struct {
	// contains filtered or unexported fields
}

func NewMqClient

func NewMqClient() *MqClient

type OffsetStore

type OffsetStore interface {
	// contains filtered or unexported methods
}

type Producer

type Producer interface {
	Start() error
	Shutdown()
	Send(msg *Message) (*SendResult, error)
	SendAsync(msg *Message, sendCallback SendCallback) error
	SendOneway(msg *Message) error
}

func NewDefaultProducer

func NewDefaultProducer(producerGroup string, conf *Config) (Producer, error)

type PullMessageRequestHeader

type PullMessageRequestHeader struct {
	ConsumerGroup        string `json:"consumerGroup"`
	Topic                string `json:"topic"`
	QueueId              int32  `json:"queueId"`
	QueueOffset          int64  `json:"queueOffset"`
	MaxMsgNums           int32  `json:"maxMsgNums"`
	SysFlag              int32  `json:"sysFlag"`
	CommitOffset         int64  `json:"commitOffset"`
	SuspendTimeoutMillis int64  `json:"suspendTimeoutMillis"`
	Subscription         string `json:"subscription"`
	SubVersion           int64  `json:"subVersion"`
}

type PullMessageService

type PullMessageService struct {
	// contains filtered or unexported fields
}

func NewPullMessageService

func NewPullMessageService() *PullMessageService

type PullRequest

type PullRequest struct {
	// contains filtered or unexported fields
}

type QueryConsumerOffsetRequestHeader

type QueryConsumerOffsetRequestHeader struct {
	ConsumerGroup string `json:"consumerGroup"`
	Topic         string `json:"topic"`
	QueueId       int32  `json:"queueId"`
}

type QueueData

type QueueData struct {
	BrokerName     string
	ReadQueueNums  int32
	WriteQueueNums int32
	Perm           int
	TopicSynFlag   int32
}

type Rebalance

type Rebalance struct {
	// contains filtered or unexported fields
}

func NewRebalance

func NewRebalance() *Rebalance

type RemoteOffsetStore

type RemoteOffsetStore struct {
	// contains filtered or unexported fields
}

type RemotingClient

type RemotingClient interface {
	ScanResponseTable()
	// contains filtered or unexported methods
}

func NewDefaultRemotingClient

func NewDefaultRemotingClient() RemotingClient

type RemotingCommand

type RemotingCommand struct {
	// header
	Code     int    `json:"code"`
	Language string `json:"language"`
	Version  int    `json:"version"`
	Opaque   int32  `json:"opaque"`
	Flag     int    `json:"flag"`

	ExtFields interface{} `json:"extFields"`
	// body
	Body []byte `json:"body,omitempty"`
	// contains filtered or unexported fields
}

type ResponseFuture

type ResponseFuture struct {
	// contains filtered or unexported fields
}

type SendCallback

type SendCallback func() error

type SendMessageContext

type SendMessageContext struct {
	Message Message
	// contains filtered or unexported fields
}

type SendMessageRequestHeader

type SendMessageRequestHeader struct {
	ProducerGroup         string `json:"producerGroup"`
	Topic                 string `json:"topic"`
	DefaultTopic          string `json:"defaultTopic"`
	DefaultTopicQueueNums int    `json:"defaultTopicQueueNums"`
	QueueId               int32  `json:"queueId"`
	SysFlag               int    `json:"sysFlag"`
	BornTimestamp         int64  `json:"bornTimestamp"`
	Flag                  int32  `json:"flag"`
	Properties            string `json:"properties"`
	ReconsumeTimes        int    `json:"reconsumeTimes"`
	UnitMode              bool   `json:"unitMode"`
	MaxReconsumeTimes     int    `json:"maxReconsumeTimes"`
}

type SendMessageResponseHeader

type SendMessageResponseHeader struct {
	// contains filtered or unexported fields
}

type SendMessageService

type SendMessageService struct {
	// contains filtered or unexported fields
}

func NewSendMessageService

func NewSendMessageService() *SendMessageService

type SendRequest

type SendRequest struct {
	// contains filtered or unexported fields
}

type SendResult

type SendResult struct {
	// contains filtered or unexported fields
}

func NewSendResult

func NewSendResult(sendStatus int, msgId string, offsetMsgId string, messageQueue *MessageQueue, queueOffset int64) *SendResult

func (*SendResult) SendResult

func (s *SendResult) SendResult(SendStatus int, msgId string, messageQueue MessageQueue, queueOffset uint64,
	transactionId string, offsetMsgId string, regionId string) (ok bool)

type Service

type Service interface {
	// contains filtered or unexported methods
}

type SubscriptionData

type SubscriptionData struct {
	Topic           string
	SubString       string
	ClassFilterMode bool
	TagsSet         []string
	CodeSet         []string
	SubVersion      int64
}

type TopicPublishInfo

type TopicPublishInfo struct {
	// contains filtered or unexported fields
}

func NewTopicPublishInfo

func NewTopicPublishInfo() *TopicPublishInfo

type TopicRouteData

type TopicRouteData struct {
	OrderTopicConf string
	QueueDatas     []*QueueData
	BrokerDatas    []*BrokerData
}

type UpdateConsumerOffsetRequestHeader

type UpdateConsumerOffsetRequestHeader struct {
	// contains filtered or unexported fields
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL