Documentation ¶
Index ¶
- Constants
- Variables
- func BrokerVIPChannel(isChange bool, brokerAddr string) (borkerAddrNew string)
- func GetLocalIp4() (ip string)
- type Admin
- type AllocateMessageQueueAveragely
- type AllocateMessageQueueStrategy
- type BrokerData
- type Config
- type Consumer
- type ConsumerData
- type ConsumerIdSorter
- type DefaultConsumer
- func (c *DefaultConsumer) RegisterMessageListener(messageListener MessageListener)
- func (c *DefaultConsumer) SendMessageBack(msg MessageExt, delayLevel int) error
- func (c *DefaultConsumer) SendMessageBack1(msg MessageExt, delayLevel int, brokerName string) error
- func (c *DefaultConsumer) Shutdown()
- func (c *DefaultConsumer) Start() error
- func (c *DefaultConsumer) Subscribe(topic string, subExpression string)
- func (c *DefaultConsumer) UnSubscribe(topic string)
- type DefaultProducer
- type DefaultRemotingClient
- type GetConsumerListByGroupRequestHeader
- type GetConsumerListByGroupResponseBody
- type GetRouteInfoRequestHeader
- type HeartbeatData
- type InvokeCallback
- type Message
- type MessageExt
- type MessageListener
- type MessageQueue
- type MessageQueues
- type MixAll
- type MqClient
- type OffsetStore
- type Producer
- type PullMessageRequestHeader
- type PullMessageService
- type PullRequest
- type QueryConsumerOffsetRequestHeader
- type QueueData
- type Rebalance
- type RemoteOffsetStore
- type RemotingClient
- type RemotingCommand
- type ResponseFuture
- type SendCallback
- type SendMessageContext
- type SendMessageRequestHeader
- type SendMessageResponseHeader
- type SendMessageService
- type SendRequest
- type SendResult
- type Service
- type SubscriptionData
- type TopicPublishInfo
- type TopicRouteData
- type UpdateConsumerOffsetRequestHeader
Constants ¶
View Source
const ( BrokerSuspendMaxTimeMillis = 1000 * 15 FlagCommitOffset int32 = 0x1 << 0 FlagSuspend int32 = 0x1 << 1 FlagSubscription int32 = 0x1 << 2 FlagClassFilter int32 = 0x1 << 3 )
View Source
const ( CompressedFlag = (0x1 << 0) MultiTagsFlag = (0x1 << 1) TransactionNotType = (0x0 << 2) TransactionPreparedType = (0x1 << 2) TransactionCommitType = (0x2 << 2) TransactionRollbackType = (0x3 << 2) )
View Source
const ( NameValueSeparator = 1 + iota PropertySeparator )
View Source
const ( RocketmqHomeEnv = "ROCKETMQ_HOME" RocketmqHomeProperty = "rocketmq.home.dir" NamesrvAddrEnv = "NAMESRV_ADDR" NamesrvAddrProperty = "rocketmq.namesrv.addr" MessageCompressLevel = "rocketmq.message.compressLevel" WsDomainName = "192.168.7.101" WsDomainSubgroup = "" WsAddr = "http://" + WsDomainName + ":8080/rocketmq/" + WsDomainSubgroup DefaultTopic = "TBW102" BenchmarkTopic = "BenchmarkTest" DefaultProducerGroup = "DEFAULT_PRODUCER" DefaultConsumerGroup = "DEFAULT_CONSUMER" ToolsConsumerGroup = "TOOLS_CONSUMER" FiltersrvConsumerGroup = "FILTERSRV_CONSUMER" MonitrorConsumerGroup = "__MONITOR_CONSUMER" ClientInnerProducerGroup = "CLIENT_INNER_PRODUCER" SelfTestProducerGroup = "SELF_TEST_P_GROUP" SelfTestConsumerGroup = "SELF_TEST_C_GROUP" SelfTestTopic = "SELF_TEST_TOPIC" OffsetMovedEvent = "OFFSET_MOVED_EVENT" OnsHttpProxyGroup = "CID_ONS-HTTP-PROXY" CidOnsapiPermissionGroup = "CID_ONSAPI_PERMISSION" CidOnsapiOwnerGroup = "CID_ONSAPI_OWNER" CidOnsapiPullGroup = "CID_ONSAPI_PULL" CidRmqSysPerfix = "CID_RMQ_SYS_" Localhost = "127.0.0.1" DefaultCharset = "UTF-8" MasterId = 0 RetryGroupTopicPrefix = "%RETRY%" DlqGroupTopicPerfix = "%DLQ%" SysTopicPerfix = "rmq_sys_" UniqMsgQueryFlag = "_UNIQUE_KEY_QUERY" )
View Source
const ( Sync = iota Async Oneway )
communicationMode: the mode in which a message is sent — Sync, Async, or Oneway.
View Source
const ( CreateJust = iota Running ShutdownAlready StartFailed )
ServiceState: the lifecycle state of a service — CreateJust, Running, ShutdownAlready, or StartFailed.
View Source
const ( RpcType = 0 RpcOneway = 1 )
View Source
const (
// Broker: send a message
SendMsg = 10
// Broker: subscribe to (pull) messages
PullMsg = 11
// Broker: query messages
QueryMESSAGE = 12
// Broker: query the broker offset
QueryBrokerOffset = 13
// Broker: query the consumer offset
QueryConsumerOffset = 14
// Broker: update the consumer offset
UpdateCconsumerOffset = 15
// Broker: update or add a topic
UpdateAndCreateTopic = 17
// Broker: get the configuration of all topics (both slaves and the Namesrv request this configuration from the master)
GetAllTopicConfig = 21
// Broker: get the configuration of all topics (both slaves and the Namesrv request this configuration from the master)
GetTopicConfigList = 22
// Broker: get the list of all topic names
GetTopicNameList = 23
// Broker: update the configuration on the broker
UpdateBrokerConfig = 25
// Broker: get the configuration on the broker
GetBrokerConfig = 26
// Broker: trigger file deletion on the broker
TriggerDeleteFILES = 27
// Broker: get the broker's runtime information
GetBrokerRuntimeInfo = 28
// Broker: query a queue's offset by time
SearchOffsetByTimeStamp = 29
// Broker: query a queue's maximum offset
GetMaxOffset = 30
// Broker: query a queue's minimum offset
GetMinOffset = 31
// Broker: query the time of the earliest message in a queue
GetEarliestMsgStoreTime = 32
// Broker: query a message by message ID
ViewMsgById = 33
// Broker: a client sends a heartbeat and registers itself
HeartBeat = 34
// Broker: a client unregisters
UnregisterClient = 35
// Broker: a consumer sends a message it could not process back to the server
CconsumerSendMsgBack = 36
// Broker: commit or roll back a transaction
EndTransaction = 37
// Broker: get the list of consumer IDs by group name
GetConsumerListByGroup = 38
// Broker: the broker proactively checks the transaction state with the producer
CheckTransactionState = 39
// Broker: the broker notifies consumers that the consumer list has changed
NotifyConsumerIdsChanged = 40
// Broker: a consumer locks queues on the master
LockBatchMq = 41
// Broker: a consumer unlocks queues on the master
UNLockBatchMq = 42
// Broker: get all consumer offsets
GetAllCconsumerOffset = 43
// Broker: get all delay (scheduled) offsets
GetAllDelayOffset = 45
// Namesrv: add a KV config entry to the Namesrv
PutKVConfig = 100
// Namesrv: get a KV config entry from the Namesrv
GetKVConfig = 101
// Namesrv: presumably deletes a KV config entry from the Namesrv (the source comment repeats the GetKVConfig text)
DeleteKVConfig = 102
// Namesrv: register a broker; the data is persisted, and an existing configuration is overwritten
RegisterBroker = 103
// Namesrv: unregister a broker; the data is persisted
UnregisterBroker = 104
// Namesrv: get the broker name and queue count (read and write queues) by topic
GetRouteinfoByTopic = 105
// Namesrv: get information on all broker clusters registered with the name server
GetBrokerClusterInfo = 106
UpdateAndCreateSubscriptionGroup = 200
GetAllSubscriptionGroupConfig = 201
GetTopicStatsInfo = 202
GetConsumerConnList = 203
GetProducerConnList = 204
WipeWritePermOfBroker = 205
// Get the complete topic list from the name server
GetAllTopicListFromNamesrv = 206
// Delete a subscription group from the broker
DeleteSubscriptionGroup = 207
// Get the consume stats (progress) from the broker
GetConsumeStats = 208
// Suspend the consumer's consumption
SuspendConsumer = 209
// Resume the consumer's consumption
ResumeConsumer = 210
// Reset the consumer offset
ResetCconsumerOffsetInConsumer = 211
// Reset the consumer offset
ResetCconsumerOffsetInBroker = 212
// Adjust the size of the consumer thread pool
AdjustCconsumerThreadPoolPOOL = 213
// Query which consumer groups have consumed a message
WhoConsumeTHE_MESSAGE = 214
// Delete a topic's configuration from the broker
DeleteTopicInBroker = 215
// Delete a topic's configuration from the Namesrv
DeleteTopicInNamesrv = 216
// Namesrv: get all server IP information by project
GetKvConfigByValue = 217
// Namesrv: delete all server IP information under the given project group
DeleteKvConfigByValue = 218
// Get the full KV list by namespace
GetKvlistByNamespace = 219
// Reset the offset
ResetCconsumerClientOffset = 220
// Messages subscribed to by the client
GetCconsumerStatusFromClient = 221
// Notify the broker to invoke offset-reset handling
InvokeBrokerToResetOffset = 222
// Notify the broker to invoke the client's subscribed-message handling
InvokeBrokerToGetCconsumerSTATUS = 223
// Broker: query which consumers consume a topic
// 2014-03-21 Add By shijia
QueryTopicConsumeByWho = 300
// Get all topics in the given cluster
// 2014-03-26
GetTopicsByCluster = 224
// Register a filter server with the broker
// 2014-04-06 Add By shijia
RegisterFilterServer = 301
// Register a class with the filter server
// 2014-04-06 Add By shijia
RegisterMsgFilterClass = 302
// Get the time span of messages by topic and group
QueryConsumeTimeSpan = 303
// Get the list of all system built-in topics
GetSysTopicListFromNS = 304
GetSysTopicListFromBroker = 305
// Clean up expired consume queues
CleanExpiredConsumequeue = 306
// Query a consumer's in-memory data through the broker
// 2014-07-19 Add By shijia
GetCconsumerRunningInfo = 307
// Look up the corrected offset (forwarding component)
QueryCorrectionOffset = 308
// Send a message directly to a specific consumer through the broker and consume it immediately; the result is returned to the broker and then to the caller
// 2014-08-11 Add By shijia
ConsumeMsgDirectly = 309
// Broker: send a message, with optimized network packets
SendMsgV2 = 310
// Unitization-related topics
GetUnitTopicList = 311
// Get the list of topics that have unitized subscription groups
GetHasUnitSubTopicList = 312
// Get the list of non-unitized topics that have unitized subscription groups
GetHasUnitSubUnunitTopicList = 313
// Clone one group's consume progress to a new group
CloneGroupOffset = 314
// View the various statistics on the broker
ViewBrokerStatsData = 315
)
View Source
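const (
// Success
Success = 0
// An uncaught exception occurred
SysError = 1
// The system is busy because the thread pool is congested
SysBusy = 2
// The request code is not supported
RequestCodeNotSupported = 3
// The transaction failed; adding it to the DB failed
TransactionFailed = 4
// Broker: flushing to disk timed out
FlushDiskTimeout = 10
// Broker: synchronous double write, the slave is not available
SlaveNotAvailable = 11
// Broker: synchronous double write, timed out waiting for the slave's acknowledgement
FlushSlaveTimeout = 12
// Broker: the message is illegal
MsgIllegal = 13
// Broker, Namesrv: the service is not available; it may be shutting down or there may be a permission problem
ServiceNotAvailable = 14
// Broker, Namesrv: the version is not supported
VersionNotSupported = 15
// Broker, Namesrv: no permission to perform this operation (sending, receiving, or another operation)
NoPermission = 16
// Broker: the topic does not exist
TopicNotExist = 17
// Broker: the topic already exists (when creating a topic)
TopicExistAlready = 18
// Broker: the pull found no message (the requested offset equals the maximum offset, which has no corresponding message)
PullNotFound = 19
// Broker: the message may have been filtered, or the notification was spurious, etc.
PullRetryImmediately = 20
// Broker: the offset requested by the pull is invalid, too small or too large
PullOffsetMoved = 21
// Broker: the queried message was not found
QueryNotFound = 22
// Broker: parsing the subscription failed
SubscriptionParseFailed = 23
// Broker: the subscription does not exist
SubscriptionNotExist = 24
// Broker: the subscription is not the latest
SubscriptionNotLatest = 25
// Broker: the subscription group does not exist
SubscriptionGroupNotExist = 26
// Producer: the transaction should be committed
TransactionShouldCommit = 200
// Producer: the transaction should be rolled back
TransactionShouldRollback = 201
// Producer: the transaction state is unknown
TransactionStateUnknow = 202
// Producer: the producer group is wrong
TransactionStateGroupWrong = 203
// Unitized message: buyerId must be set
NoBuyerId = 204
// Unitized message: the message does not belong to the current unit
NotInCurrentUint = 205
// The consumer is not online
ConsumerNotOnline = 206
// The consumer timed out while consuming the message
ConsumeMsgTimeout = 207
)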
View Source
const ( NormalMsg = iota TransMsgHalf TransMsgCommit DelayMsg )
View Source
const ( SendStatusOK = iota SendStatusFlushDiskTimeout SendStatusFlushSlaveTimeout SendStatusSlaveNotAvailable )
View Source
const ( MemoryFirstThenStore = 0 ReadFromMemory = 1 ReadFromStore = 2 )
View Source
const (
CharacterMaxLength = 255
)
Variables ¶
View Source
var (
ConfigVersion int = -1
)
View Source
var DefaultIp = GetLocalIp4()
View Source
var MessageClientIDSetter = messageClientIDSetter{
// contains filtered or unexported fields
}
View Source
var MessageConst = &messageConst{
PropertyKeys: "KEYS",
PropertyTags: "TAGS",
PropertyWaitStoreMsgOk: "WAIT",
PropertyDelayTimeLevel: "DELAY",
PropertyRetryTopic: "RETRY_TOPIC",
PropertyRealTopic: "REAL_TOPIC",
PropertyRealQueueId: "REAL_QID",
PropertyTransactionPrepared: "TRAN_MSG",
PropertyProducerGroup: "PGROUP",
PropertyMinOffset: "MIN_OFFSET",
PropertyMaxOffset: "MAX_OFFSET",
PropertyBuyerId: "BUYER_ID",
PropertyOriginMessageId: "ORIGIN_MESSAGE_ID",
PropertyTransferFlag: "TRANSFER_FLAG",
PropertyCorrectionFlag: "CORRECTION_FLAG",
PropertyMq2Flag: "MQ2_FLAG",
PropertyReconsumeTime: "RECONSUME_TIME",
PropertyMsgRegion: "MSG_REGION",
PropertyUniqClientMessageIdKeyidx: "UNIQ_KEY",
PropertyMaxReconsumeTimes: "MAX_RECONSUME_TIMES",
PropertyConsumeStartTimeStamp: "CONSUME_START_TIME",
KeySeparator: "",
}
View Source
var PermName = permName{
PermPriority: 0x1 << 3,
PermRead: 0x1 << 2,
PermWrite: 0x1 << 1,
PermInherit: 0x1 << 0,
}
Functions ¶
func BrokerVIPChannel ¶
Types ¶
type AllocateMessageQueueAveragely ¶
type AllocateMessageQueueAveragely struct{}
type AllocateMessageQueueStrategy ¶
type AllocateMessageQueueStrategy interface {
// contains filtered or unexported methods
}
type BrokerData ¶
type Consumer ¶
type Consumer interface {
//Admin
Start() error
Shutdown()
RegisterMessageListener(listener MessageListener)
Subscribe(topic string, subExpression string)
UnSubscribe(topic string)
SendMessageBack(msg MessageExt, delayLevel int) error
SendMessageBack1(msg MessageExt, delayLevel int, brokerName string) error
// contains filtered or unexported methods
}
type ConsumerData ¶
type ConsumerIdSorter ¶
type ConsumerIdSorter []string
func (ConsumerIdSorter) Len ¶
func (r ConsumerIdSorter) Len() int
func (ConsumerIdSorter) Less ¶
func (r ConsumerIdSorter) Less(i, j int) bool
func (ConsumerIdSorter) Swap ¶
func (r ConsumerIdSorter) Swap(i, j int)
type DefaultConsumer ¶
type DefaultConsumer struct {
// contains filtered or unexported fields
}
func (*DefaultConsumer) RegisterMessageListener ¶
func (c *DefaultConsumer) RegisterMessageListener(messageListener MessageListener)
func (*DefaultConsumer) SendMessageBack ¶
func (c *DefaultConsumer) SendMessageBack(msg MessageExt, delayLevel int) error
func (*DefaultConsumer) SendMessageBack1 ¶
func (c *DefaultConsumer) SendMessageBack1(msg MessageExt, delayLevel int, brokerName string) error
func (*DefaultConsumer) Shutdown ¶
func (c *DefaultConsumer) Shutdown()
func (*DefaultConsumer) Start ¶
func (c *DefaultConsumer) Start() error
func (*DefaultConsumer) Subscribe ¶
func (c *DefaultConsumer) Subscribe(topic string, subExpression string)
func (*DefaultConsumer) UnSubscribe ¶
func (c *DefaultConsumer) UnSubscribe(topic string)
type DefaultProducer ¶
type DefaultProducer struct {
// contains filtered or unexported fields
}
func (*DefaultProducer) Send ¶
func (d *DefaultProducer) Send(msg *Message) (*SendResult, error)
func (*DefaultProducer) SendAsync ¶
func (d *DefaultProducer) SendAsync(msg *Message, sendCallback SendCallback) (err error)
func (*DefaultProducer) SendOneway ¶
func (d *DefaultProducer) SendOneway(msg *Message) error
func (*DefaultProducer) Shutdown ¶
func (d *DefaultProducer) Shutdown()
func (*DefaultProducer) Start ¶
func (d *DefaultProducer) Start() error
type DefaultRemotingClient ¶
type DefaultRemotingClient struct {
// contains filtered or unexported fields
}
func (*DefaultRemotingClient) ScanResponseTable ¶
func (d *DefaultRemotingClient) ScanResponseTable()
type GetConsumerListByGroupRequestHeader ¶
type GetConsumerListByGroupRequestHeader struct {
ConsumerGroup string `json:"consumerGroup"`
}
type GetConsumerListByGroupResponseBody ¶
type GetConsumerListByGroupResponseBody struct {
ConsumerIdList []string
}
type GetRouteInfoRequestHeader ¶
type GetRouteInfoRequestHeader struct {
// contains filtered or unexported fields
}
func (*GetRouteInfoRequestHeader) MarshalJSON ¶
func (g *GetRouteInfoRequestHeader) MarshalJSON() ([]byte, error)
type HeartbeatData ¶
type HeartbeatData struct { ClientId string ConsumerDataSet []*ConsumerData }
type InvokeCallback ¶
type InvokeCallback func(responseFuture *ResponseFuture)
type Message ¶
func NewMessage ¶
type MessageExt ¶
type MessageListener ¶
type MessageListener func(msgs []*MessageExt) error
type MessageQueue ¶
type MessageQueue struct {
// contains filtered or unexported fields
}
func NewMessageQueue ¶
func NewMessageQueue(topic string, brokerName string, queueId int32) *MessageQueue
type MessageQueues ¶
type MessageQueues []*MessageQueue
func (MessageQueues) Len ¶
func (m MessageQueues) Len() int
func (MessageQueues) Less ¶
func (m MessageQueues) Less(i, j int) bool
func (MessageQueues) Swap ¶
func (m MessageQueues) Swap(i, j int)
type MqClient ¶
type MqClient struct {
// contains filtered or unexported fields
}
func NewMqClient ¶
func NewMqClient() *MqClient
type OffsetStore ¶
type OffsetStore interface {
// contains filtered or unexported methods
}
type Producer ¶
type Producer interface { Start() error Shutdown() Send(msg *Message) (*SendResult, error) SendAsync(msg *Message, sendCallback SendCallback) error SendOneway(msg *Message) error }
type PullMessageRequestHeader ¶
type PullMessageRequestHeader struct { ConsumerGroup string `json:"consumerGroup"` Topic string `json:"topic"` QueueId int32 `json:"queueId"` QueueOffset int64 `json:"queueOffset"` MaxMsgNums int32 `json:"maxMsgNums"` SysFlag int32 `json:"sysFlag"` CommitOffset int64 `json:"commitOffset"` SuspendTimeoutMillis int64 `json:"suspendTimeoutMillis"` Subscription string `json:"subscription"` SubVersion int64 `json:"subVersion"` }
type PullMessageService ¶
type PullMessageService struct {
// contains filtered or unexported fields
}
func NewPullMessageService ¶
func NewPullMessageService() *PullMessageService
type PullRequest ¶
type PullRequest struct {
// contains filtered or unexported fields
}
type Rebalance ¶
type Rebalance struct {
// contains filtered or unexported fields
}
func NewRebalance ¶
func NewRebalance() *Rebalance
type RemoteOffsetStore ¶
type RemoteOffsetStore struct {
// contains filtered or unexported fields
}
type RemotingClient ¶
type RemotingClient interface {
ScanResponseTable()
// contains filtered or unexported methods
}
func NewDefaultRemotingClient ¶
func NewDefaultRemotingClient() RemotingClient
type RemotingCommand ¶
type RemotingCommand struct {
// header
Code int `json:"code"`
Language string `json:"language"`
Version int `json:"version"`
Opaque int32 `json:"opaque"`
Flag int `json:"flag"`
ExtFields interface{} `json:"extFields"`
// body
Body []byte `json:"body,omitempty"`
// contains filtered or unexported fields
}
type ResponseFuture ¶
type ResponseFuture struct {
// contains filtered or unexported fields
}
type SendCallback ¶
type SendCallback func() error
type SendMessageContext ¶
type SendMessageContext struct {
Message Message
// contains filtered or unexported fields
}
type SendMessageRequestHeader ¶
type SendMessageRequestHeader struct { ProducerGroup string `json:"producerGroup"` Topic string `json:"topic"` DefaultTopic string `json:"defaultTopic"` DefaultTopicQueueNums int `json:"defaultTopicQueueNums"` QueueId int32 `json:"queueId"` SysFlag int `json:"sysFlag"` BornTimestamp int64 `json:"bornTimestamp"` Flag int32 `json:"flag"` Properties string `json:"properties"` ReconsumeTimes int `json:"reconsumeTimes"` UnitMode bool `json:"unitMode"` MaxReconsumeTimes int `json:"maxReconsumeTimes"` }
type SendMessageResponseHeader ¶
type SendMessageResponseHeader struct {
// contains filtered or unexported fields
}
type SendMessageService ¶
type SendMessageService struct {
// contains filtered or unexported fields
}
func NewSendMessageService ¶
func NewSendMessageService() *SendMessageService
type SendRequest ¶
type SendRequest struct {
// contains filtered or unexported fields
}
type SendResult ¶
type SendResult struct {
// contains filtered or unexported fields
}
func NewSendResult ¶
func NewSendResult(sendStatus int, msgId string, offsetMsgId string, messageQueue *MessageQueue, queueOffset int64) *SendResult
func (*SendResult) SendResult ¶
func (s *SendResult) SendResult(SendStatus int, msgId string, messageQueue MessageQueue, queueOffset uint64, transactionId string, offsetMsgId string, regionId string) (ok bool)
type SubscriptionData ¶
type TopicPublishInfo ¶
type TopicPublishInfo struct {
// contains filtered or unexported fields
}
func NewTopicPublishInfo ¶
func NewTopicPublishInfo() *TopicPublishInfo
type TopicRouteData ¶
type TopicRouteData struct { OrderTopicConf string QueueDatas []*QueueData BrokerDatas []*BrokerData }
type UpdateConsumerOffsetRequestHeader ¶
type UpdateConsumerOffsetRequestHeader struct {
// contains filtered or unexported fields
}
Source Files ¶
- admin.go
- consumer.go
- init.go
- message.go
- message_client_id_setter.go
- message_const.go
- message_queue.go
- mix_all.go
- mq_client.go
- net.go
- perm.go
- producer.go
- pull_message.go
- rebalance.go
- remote_cmd.go
- remoting_client.go
- request_code.go
- response_code.go
- send_message.go
- send_message_context.go
- send_message_response_header.go
- send_result.go
- store.go
- topic_publish_info.go
Click to show internal directories.
Click to hide internal directories.