Documentation ¶
Overview ¶
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Index ¶
- Constants
- func ParseRequestCode(code int32) string
- func ParseResponseCode(code int32) string
- type CommandCustomHeader
- type ConsumeConcurrentlyStatus
- type KVTable
- type Language
- type OffsetMovedEvent
- type RemotingCommand
- func CreateDefaultResponseCommand(customHeader ...CommandCustomHeader) *RemotingCommand
- func CreateRequestCommand(code int32, customHeader ...CommandCustomHeader) *RemotingCommand
- func CreateResponseCommand(code int32, remark string) *RemotingCommand
- func DecodeRemotingCommand(buf *bytes.Buffer) (*RemotingCommand, error)
- func (rc *RemotingCommand) Bytes() []byte
- func (rc *RemotingCommand) DecodeCommandCustomHeader(commandCustomHeader CommandCustomHeader) error
- func (rc *RemotingCommand) EncodeHeader() []byte
- func (rc *RemotingCommand) IsOnewayRPC() bool
- func (rc *RemotingCommand) IsResponseType() bool
- func (rc *RemotingCommand) MarkOnewayRPC()
- func (rc *RemotingCommand) MarkResponseType()
- func (j *RemotingCommand) MarshalJSON() ([]byte, error)
- func (j *RemotingCommand) MarshalJSONBuf(buf fflib.EncodingBuffer) error
- func (rc *RemotingCommand) String() string
- func (rc *RemotingCommand) Type() RemotingCommandType
- func (j *RemotingCommand) UnmarshalJSON(input []byte) error
- func (j *RemotingCommand) UnmarshalJSONFFLexer(fs *fflib.FFLexer, state fflib.FFParseState) error
- type RemotingCommandType
Constants ¶
const ( SEND_MESSAGE = 10 // Broker 发送消息 PULL_MESSAGE = 11 // Broker 订阅消息 QUERY_MESSAGE = 12 // Broker 查询消息 QUERY_BROKER_OFFSET = 13 // Broker 查询Broker Offset QUERY_CONSUMER_OFFSET = 14 // Broker 查询Consumer Offset UPDATE_CONSUMER_OFFSET = 15 // Broker 更新Consumer Offset UPDATE_AND_CREATE_TOPIC = 17 // Broker 更新或者增加一个Topic GET_ALL_TOPIC_CONFIG = 21 // Broker 获取所有Topic的配置(Slave和Namesrv都会向Master请求此配置) GET_TOPIC_CONFIG_LIST = 22 // Broker 获取所有Topic配置(Slave和Namesrv都会向Master请求此配置) GET_TOPIC_NAME_LIST = 23 // Broker 获取所有Topic名称列表 UPDATE_BROKER_CONFIG = 25 // Broker 更新Broker上的配置 GET_BROKER_CONFIG = 26 // Broker 获取Broker上的配置 TRIGGER_DELETE_FILES = 27 // Broker 触发Broker删除文件 GET_BROKER_RUNTIME_INFO = 28 // Broker 获取Broker运行时信息 SEARCH_OFFSET_BY_TIMESTAMP = 29 // Broker 根据时间查询队列的Offset GET_MAX_OFFSET = 30 // Broker 查询队列最大Offset GET_MIN_OFFSET = 31 // Broker 查询队列最小Offset GET_EARLIEST_MSG_STORETIME = 32 // Broker 查询队列最早消息对应时间 VIEW_MESSAGE_BY_ID = 33 // Broker 根据消息ID来查询消息 HEART_BEAT = 34 // Broker Client向Broker发送心跳,并注册自身 UNREGISTER_CLIENT = 35 // Broker Client注销 CONSUMER_SEND_MSG_BACK = 36 // Broker Consumer将处理不了的消息发回服务器 END_TRANSACTION = 37 // Broker Commit或者Rollback事务 GET_CONSUMER_LIST_BY_GROUP = 38 // Broker 获取ConsumerId列表通过GroupName CHECK_TRANSACTION_STATE = 39 // Broker 主动向Producer回查事务状态 NOTIFY_CONSUMER_IDS_CHANGED = 40 // Broker Broker通知Consumer列表变化 LOCK_BATCH_MQ = 41 // Broker Consumer向Master锁定队列 UNLOCK_BATCH_MQ = 42 // Broker Consumer向Master解锁队列 GET_ALL_CONSUMER_OFFSET = 43 // Broker 获取所有Consumer Offset GET_ALL_DELAY_OFFSET = 45 // Broker 获取所有定时进度 PUT_KV_CONFIG = 100 // Namesrv 向Namesrv追加KV配置 GET_KV_CONFIG = 101 // Namesrv 从Namesrv获取KV配置 DELETE_KV_CONFIG = 102 // Namesrv 从Namesrv删除KV配置 REGISTER_BROKER = 103 // Namesrv 注册一个Broker,数据都是持久化的,如果存在则覆盖配置 UNREGISTER_BROKER = 104 // Namesrv 卸载一个Broker,数据都是持久化的 GET_ROUTEINTO_BY_TOPIC = 105 // Namesrv 根据Topic获取Broker Name、队列数(包含读队列与写队列) GET_BROKER_CLUSTER_INFO = 106 // Namesrv 获取注册到Name Server的所有Broker集群信息 
UPDATE_AND_CREATE_SUBSCRIPTIONGROUP = 200 // 创建或更新订阅组 GET_ALL_SUBSCRIPTIONGROUP_CONFIG = 201 // 订阅组配置 GET_TOPIC_STATS_INFO = 202 // 统计信息,获取Topic统计信息 GET_CONSUMER_CONNECTION_LIST = 203 // Consumer连接管理 GET_PRODUCER_CONNECTION_LIST = 204 // Producer连接管理 WIPE_WRITE_PERM_OF_BROKER = 205 // 优雅地向Broker写数据 GET_ALL_TOPIC_LIST_FROM_NAMESERVER = 206 // 从Name Server获取完整Topic列表 DELETE_SUBSCRIPTIONGROUP = 207 // 从Broker删除订阅组 GET_CONSUME_STATS = 208 // 从Broker获取消费状态(进度) SUSPEND_CONSUMER = 209 // Suspend Consumer消费过程 RESUME_CONSUMER = 210 // Resume Consumer消费过程 RESET_CONSUMER_OFFSET_IN_CONSUMER = 211 // 重置Consumer Offset RESET_CONSUMER_OFFSET_IN_BROKER = 212 // 重置Consumer Offset ADJUST_CONSUMER_THREAD_POOL = 213 // 调整Consumer线程池数量 WHO_CONSUME_THE_MESSAGE = 214 // 查询消息被哪些消费组消费 DELETE_TOPIC_IN_BROKER = 215 // 从Broker删除Topic配置 DELETE_TOPIC_IN_NAMESRV = 216 // 从Namesrv删除Topic配置 GET_KV_CONFIG_BY_VALUE = 217 // Namesrv 通过 project 获取所有的 server ip 信息 DELETE_KV_CONFIG_BY_VALUE = 218 // Namesrv 删除指定 project group 下的所有 server ip 信息 GET_KVLIST_BY_NAMESPACE = 219 // 通过NameSpace获取所有的KV List RESET_CONSUMER_CLIENT_OFFSET = 220 // offset 重置 GET_CONSUMER_STATUS_FROM_CLIENT = 221 // 客户端订阅消息 INVOKE_BROKER_TO_RESET_OFFSET = 222 // 通知 broker 调用 offset 重置处理 INVOKE_BROKER_TO_GET_CONSUMER_STATUS = 223 // 通知 broker 调用客户端订阅消息处理 QUERY_TOPIC_CONSUME_BY_WHO = 300 // Broker 查询topic被谁消费 2014-03-21 Add By shijia GET_TOPICS_BY_CLUSTER = 224 // 获取指定集群下的所有 topic 2014-03-26 REGISTER_FILTER_SERVER = 301 // 向Broker注册Filter Server 2014-04-06 Add By shijia REGISTER_MESSAGE_FILTER_CLASS = 302 // 向Filter Server注册Class 2014-04-06 Add By shijia QUERY_CONSUME_TIME_SPAN = 303 // 根据 topic 和 group 获取消息的时间跨度 GET_SYSTEM_TOPIC_LIST_FROM_NS = 304 // 从Namesrv获取所有系统内置 Topic 列表 GET_SYSTEM_TOPIC_LIST_FROM_BROKER = 305 // 从Broker获取所有系统内置 Topic 列表 CLEAN_EXPIRED_CONSUMEQUEUE = 306 // 清理失效队列 GET_CONSUMER_RUNNING_INFO = 307 // 通过Broker查询Consumer内存数据 2014-07-19 Add By shijia QUERY_CORRECTION_OFFSET = 308 // 查找被修正 offset (转发组件) 
CONSUME_MESSAGE_DIRECTLY = 309 // 通过Broker直接向某个Consumer发送一条消息,并立刻消费,返回结果给broker,再返回给调用方 SEND_MESSAGE_V2 = 310 // Broker 发送消息,优化网络数据包 GET_UNIT_TOPIC_LIST = 311 // 单元化相关 topic GET_HAS_UNIT_SUB_TOPIC_LIST = 312 // 获取含有单元化订阅组的 Topic 列表 GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST = 313 // 获取含有单元化订阅组的非单元化 Topic 列表 CLONE_GROUP_OFFSET = 314 // 克隆某一个组的消费进度到新的组 VIEW_BROKER_STATS_DATA = 315 // 查看Broker上的各种统计信息 )
RequestCode: 内部传输协议码 Author: yintongqiang Since: 2017/8/10
const ( SUCCESS = 0 // 成功 SYSTEM_ERROR = 1 // 发生了未捕获异常 SYSTEM_BUSY = 2 // 由于线程池拥堵,系统繁忙 REQUEST_CODE_NOT_SUPPORTED = 3 // 请求代码不支持 TRANSACTION_FAILED = 4 // 事务失败,添加db失败 FLUSH_DISK_TIMEOUT = 10 // Broker 刷盘超时 SLAVE_NOT_AVAILABLE = 11 // Broker 同步双写,Slave不可用 FLUSH_SLAVE_TIMEOUT = 12 // Broker 同步双写,等待Slave应答超时 MESSAGE_ILLEGAL = 13 // Broker 消息非法 SERVICE_NOT_AVAILABLE = 14 // Broker, Namesrv 服务不可用,可能是正在关闭或者权限问题 VERSION_NOT_SUPPORTED = 15 // Broker, Namesrv 版本号不支持 NO_PERMISSION = 16 // Broker, Namesrv 无权限执行此操作,可能是发、收、或者其他操作 TOPIC_NOT_EXIST = 17 // Broker, Topic不存在 TOPIC_EXIST_ALREADY = 18 // Broker, Topic已经存在,创建Topic PULL_NOT_FOUND = 19 // Broker 拉消息未找到(请求的Offset等于最大Offset,最大Offset无对应消息) PULL_RETRY_IMMEDIATELY = 20 // Broker 可能被过滤,或者误通知等 PULL_OFFSET_MOVED = 21 // Broker 拉消息请求的Offset不合法,太小或太大 QUERY_NOT_FOUND = 22 // Broker 查询消息未找到 SUBSCRIPTION_PARSE_FAILED = 23 // Broker 订阅关系解析失败 SUBSCRIPTION_NOT_EXIST = 24 // Broker 订阅关系不存在 SUBSCRIPTION_NOT_LATEST = 25 // Broker 订阅关系不是最新的 SUBSCRIPTION_GROUP_NOT_EXIST = 26 // Broker 订阅组不存在 TRANSACTION_SHOULD_COMMIT = 200 // producer 事务应该被提交 TRANSACTION_SHOULD_ROLLBACK = 201 // producer 事务应该被回滚 TRANSACTION_STATE_UNKNOW = 202 // producer 事务状态未知 TRANSACTION_STATE_GROUP_WRONG = 203 // producer ProducerGroup错误 NO_BUYER_ID = 204 // 单元化消息,需要设置 buyerId NOT_IN_CURRENT_UNIT = 205 // 单元化消息,非本单元消息 CONSUMER_NOT_ONLINE = 206 // Consumer不在线 CONSUME_MSG_TIMEOUT = 207 // Consumer消费消息超时 )
ResponseCode: 响应码 Author: yintongqiang Since: 2017/8/16
const (
RemotingVersionKey = "boltmq.remoting.version"
)
Variables ¶
This section is empty.
Functions ¶
Types ¶
type CommandCustomHeader ¶
type CommandCustomHeader interface {
CheckFields() error
}
CommandCustomHeader:头信息接口 Author: yintongqiang Since: 2017/8/16
type ConsumeConcurrentlyStatus ¶
type ConsumeConcurrentlyStatus int
ConsumeConcurrentlyStatus: 普通消费状态回执 Author: yintongqiang Since: 2017/8/10
const ( CONSUME_SUCCESS ConsumeConcurrentlyStatus = iota // Success consumption RECONSUME_LATER // Failure consumption,later try to consume )
func (ConsumeConcurrentlyStatus) String ¶
func (cct ConsumeConcurrentlyStatus) String() string
type KVTable ¶
func NewKVTable ¶
func NewKVTable() *KVTable
type OffsetMovedEvent ¶
type OffsetMovedEvent struct { ConsumerGroup string `json:"consumerGroup"` // 消费组名称 MessageQueue message.MessageQueue `json:"messageQueue"` // 消息Queue OffsetRequest int64 `json:"offsetRequest"` // 客户端请求的Offset OffsetNew int64 `json:"offsetNew"` // Broker要求从这个新的Offset开始消费 }
func NewOffsetMovedEvent ¶
func NewOffsetMovedEvent() *OffsetMovedEvent
type RemotingCommand ¶
type RemotingCommand struct { Code int32 `json:"code"` Language string `json:"language"` Version int32 `json:"version"` Opaque int32 `json:"opaque"` Flag int32 `json:"flag"` Remark string `json:"remark"` ExtFields map[string]string `json:"extFields"` // 请求拓展字段 CustomHeader CommandCustomHeader `json:"-"` // 修改字段类型,"CustomHeader"字段不序列化 2017/8/24 Modify by luoji, <gunsluo@gmail.com> Body []byte `json:"-"` // body字段不会被Encode()并进行网络传输 }
RemotingCommand remoting command Author: luoji, <gunsluo@gmail.com> Since: 2017-08-22
func CreateDefaultResponseCommand ¶
func CreateDefaultResponseCommand(customHeader ...CommandCustomHeader) *RemotingCommand
CreateDefaultResponseCommand 只有通信层内部会调用,业务不会调用
func CreateRequestCommand ¶
func CreateRequestCommand(code int32, customHeader ...CommandCustomHeader) *RemotingCommand
CreateRequestCommand 创建客户端请求信息 2017/8/16 Add by yintongqiang
func CreateResponseCommand ¶
func CreateResponseCommand(code int32, remark string) *RemotingCommand
CreateResponseCommand creates a response RemotingCommand with the given code and remark.
func DecodeRemotingCommand ¶
func DecodeRemotingCommand(buf *bytes.Buffer) (*RemotingCommand, error)
DecodeRemotingCommand 解析返回RemotingCommand
func (*RemotingCommand) DecodeCommandCustomHeader ¶
func (rc *RemotingCommand) DecodeCommandCustomHeader(commandCustomHeader CommandCustomHeader) error
func (*RemotingCommand) EncodeHeader ¶
func (rc *RemotingCommand) EncodeHeader() []byte
EncodeHeader 编码头部
func (*RemotingCommand) IsOnewayRPC ¶
func (rc *RemotingCommand) IsOnewayRPC() bool
IsOnewayRPC reports whether the command is a one-way RPC.
func (*RemotingCommand) IsResponseType ¶
func (rc *RemotingCommand) IsResponseType() bool
IsResponseType reports whether the command is a response.
func (*RemotingCommand) MarkOnewayRPC ¶
func (rc *RemotingCommand) MarkOnewayRPC()
MarkOnewayRPC marks the command as a one-way RPC.
func (*RemotingCommand) MarkResponseType ¶
func (rc *RemotingCommand) MarkResponseType()
MarkResponseType marks the command as a response.
func (*RemotingCommand) MarshalJSON ¶
func (j *RemotingCommand) MarshalJSON() ([]byte, error)
MarshalJSON marshals the RemotingCommand to JSON (ffjson template).
func (*RemotingCommand) MarshalJSONBuf ¶
func (j *RemotingCommand) MarshalJSONBuf(buf fflib.EncodingBuffer) error
MarshalJSONBuf marshals the RemotingCommand to JSON, writing into buf (ffjson template).
func (*RemotingCommand) Type ¶
func (rc *RemotingCommand) Type() RemotingCommandType
Type returns the remoting command type.
func (*RemotingCommand) UnmarshalJSON ¶
func (j *RemotingCommand) UnmarshalJSON(input []byte) error
UnmarshalJSON unmarshals JSON into the RemotingCommand (ffjson template).
func (*RemotingCommand) UnmarshalJSONFFLexer ¶
func (j *RemotingCommand) UnmarshalJSONFFLexer(fs *fflib.FFLexer, state fflib.FFParseState) error
UnmarshalJSONFFLexer is the fast JSON unmarshal path using the ffjson lexer (ffjson template).
type RemotingCommandType ¶
type RemotingCommandType int
const ( REQUEST_COMMAND RemotingCommandType = iota // 请求命令 RESPONSE_COMMAND // 响应命令 )
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |