Documentation ¶
Index ¶
- Constants
- Variables
- func Debug(format string, args ...interface{})
- func Error(format string, args ...interface{})
- func Fatal(format string, args ...interface{})
- func GetLocalIp4() (ip string)
- func HashCode(s string) int32
- func Info(format string, args ...interface{})
- func SetLog(log *logs.BeeLogger)
- func Trace(format string, args ...interface{})
- func UnixNano() int64
- func Warn(format string, args ...interface{})
- type Admin
- type AllocateMessageQueueAveragely
- type AllocateMessageQueueStrategy
- type BrokerData
- type BrokerRuntimeInfo
- type ClusterInfo
- type Config
- type Connection
- type ConsumeStats
- type Consumer
- type ConsumerConnection
- type ConsumerData
- type ConsumerGroup
- type ConsumerIdSorter
- type ConsumerProgress
- type DefalutRemotingClient
- type DefaultAdmin
- func (admin *DefaultAdmin) Close() error
- func (admin *DefaultAdmin) CreateTopic(topicName, clusterName string, readQueueNum int, writeQueueNum int, order bool) (bool, error)
- func (admin *DefaultAdmin) FetchBrokerRuntimeStats(brokerAddr string) (*BrokerRuntimeInfo, error)
- func (admin *DefaultAdmin) MessageTrackDetail(msg *MessageExt) ([]*MessageTrack, error)
- func (admin *DefaultAdmin) QueryConsumerConnection(consumeGroupId string) (*ConsumerConnection, error)
- func (admin *DefaultAdmin) QueryConsumerGroupByTopic(topic string) ([]string, error)
- func (admin *DefaultAdmin) QueryConsumerProgress(consumeGroupId string) (*ConsumerProgress, error)
- func (admin *DefaultAdmin) QueryMessageById(msgId string) (*MessageExt, error)
- func (admin *DefaultAdmin) SearchClusterInfo() (*ClusterInfo, error)
- func (admin *DefaultAdmin) SearchTopic() (*TopicData, error)
- type DefaultConsumer
- func (self *DefaultConsumer) RegisterMessageListener(messageListener MessageListener)
- func (self *DefaultConsumer) SendMessageBack(msg MessageExt, delayLevel int) error
- func (self *DefaultConsumer) SendMessageBack1(msg MessageExt, delayLevel int, brokerName string) error
- func (self *DefaultConsumer) Shutdown()
- func (self *DefaultConsumer) Start() error
- func (self *DefaultConsumer) Subscribe(topic string, subExpression string)
- func (self *DefaultConsumer) UnSubcribe(topic string)
- type GetConsumeStatsRequestHeader
- type GetConsumerConnectionListRequestHeader
- type GetConsumerListByGroupRequestHeader
- type GetConsumerListByGroupResponseBody
- type GetKVConfigRequestHeader
- type GetKVConfigResponseHeader
- type GetRouteInfoRequestHeader
- type GroupList
- type HeartbeatData
- type InvokeCallback
- type Message
- type MessageExt
- type MessageListener
- type MessageQueue
- type MessageQueues
- type MessageTrack
- type MqClient
- type OffsetStore
- type OffsetWrapper
- type PullMessageRequestHeader
- type PullMessageService
- type PullRequest
- type QueryConsumerOffsetRequestHeader
- type QueryTopicConsumeByWhoRequestHeader
- type QueueData
- type Rebalance
- type RemoteOffsetStore
- type RemotingClient
- type RemotingCommand
- type ResponseFuture
- type SubscriptionData
- type TopicData
- type TopicRouteData
- type UpdateConsumerOffsetRequestHeader
- type ViewMessageRequestHeader
Constants ¶
View Source
const ( RETRY_GROUP_TOPIC_PREFIX = "%RETRY%" MASTER_ID = 0 )
View Source
const ( BrokerSuspendMaxTimeMillis = 1000 * 15 FLAG_COMMIT_OFFSET int32 = 0x1 << 0 FLAG_SUSPEND int32 = 0x1 << 1 FLAG_SUBSCRIPTION int32 = 0x1 << 2 FLAG_CLASS_FILTER int32 = 0x1 << 3 )
View Source
const ( NAMESPACE_ORDER_TOPIC_CONFIG = "ORDER_TOPIC_CONFIG" NAMESPACE_PROJECT_CONFIG = "PROJECT_CONFIG" )
View Source
const ( RPC_TYPE int = 0 RPC_ONEWAYint = 1 LANGUAGE = "JAVA" )
View Source
const ( // Broker 发送消息 SEND_MESSAGE = 10 // Broker 订阅消息 PULL_MESSAGE = 11 // Broker 查询消息 QUERY_MESSAGE = 12 // Broker 查询Broker Offset QUERY_BROKER_OFFSET = 13 // Broker 查询Consumer Offset QUERY_CONSUMER_OFFSET = 14 // Broker 更新Consumer Offset UPDATE_CONSUMER_OFFSET = 15 // Broker 更新或者增加一个Topic UPDATE_AND_CREATE_TOPIC = 17 // Broker 获取所有Topic的配置(Slave和Namesrv都会向Master请求此配置) GET_ALL_TOPIC_CONFIG = 21 // Broker 获取所有Topic配置(Slave和Namesrv都会向Master请求此配置) GET_TOPIC_CONFIG_LIST = 22 // Broker 获取所有Topic名称列表 GET_TOPIC_NAME_LIST = 23 // Broker 更新Broker上的配置 UPDATE_BROKER_CONFIG = 25 // Broker 获取Broker上的配置 GET_BROKER_CONFIG = 26 // Broker 触发Broker删除文件 TRIGGER_DELETE_FILES = 27 // Broker 获取Broker运行时信息 GET_BROKER_RUNTIME_INFO = 28 // Broker 根据时间查询队列的Offset SEARCH_OFFSET_BY_TIMESTAMP = 29 // Broker 查询队列最大Offset GET_MAX_OFFSET = 30 // Broker 查询队列最小Offset GET_MIN_OFFSET = 31 // Broker 查询队列最早消息对应时间 GET_EARLIEST_MSG_STORETIME = 32 // Broker 根据消息ID来查询消息 VIEW_MESSAGE_BY_ID = 33 // Broker Client向Client发送心跳,并注册自身 HEART_BEAT = 34 // Broker Client注销 UNREGISTER_CLIENT = 35 // Broker Consumer将处理不了的消息发回服务器 CONSUMER_SEND_MSG_BACK = 36 // Broker Commit或者Rollback事务 END_TRANSACTION = 37 // Broker 获取ConsumerId列表通过GroupName GET_CONSUMER_LIST_BY_GROUP = 38 // Broker 主动向Producer回查事务状态 CHECK_TRANSACTION_STATE = 39 // Broker Broker通知Consumer列表变化 NOTIFY_CONSUMER_IDS_CHANGED = 40 // Broker Consumer向Master锁定队列 LOCK_BATCH_MQ = 41 // Broker Consumer向Master解锁队列 UNLOCK_BATCH_MQ = 42 // Broker 获取所有Consumer Offset GET_ALL_CONSUMER_OFFSET = 43 // Broker 获取所有定时进度 GET_ALL_DELAY_OFFSET = 45 // Namesrv 向Namesrv追加KV配置 PUT_KV_CONFIG = 100 // Namesrv 从Namesrv获取KV配置 GET_KV_CONFIG = 101 // Namesrv 从Namesrv获取KV配置 DELETE_KV_CONFIG = 102 // Namesrv 注册一个Broker,数据都是持久化的,如果存在则覆盖配置 REGISTER_BROKER = 103 // Namesrv 卸载一个Broker,数据都是持久化的 UNREGISTER_BROKER = 104 // Namesrv 根据Topic获取Broker Name、队列数(包含读队列与写队列) GET_ROUTEINTO_BY_TOPIC = 105 // Namesrv 获取注册到Name Server的所有Broker集群信息 GET_BROKER_CLUSTER_INFO = 106 UPDATE_AND_CREATE_SUBSCRIPTIONGROUP = 200 GET_ALL_SUBSCRIPTIONGROUP_CONFIG = 201 GET_TOPIC_STATS_INFO = 202 GET_CONSUMER_CONNECTION_LIST = 203 GET_PRODUCER_CONNECTION_LIST = 204 WIPE_WRITE_PERM_OF_BROKER = 205 // 从Name Server获取完整Topic列表 GET_ALL_TOPIC_LIST_FROM_NAMESERVER = 206 // 从Broker删除订阅组 DELETE_SUBSCRIPTIONGROUP = 207 // 从Broker获取消费状态(进度) GET_CONSUME_STATS = 208 // Suspend Consumer消费过程 SUSPEND_CONSUMER = 209 // Resume Consumer消费过程 RESUME_CONSUMER = 210 // 重置Consumer Offset RESET_CONSUMER_OFFSET_IN_CONSUMER = 211 // 重置Consumer Offset RESET_CONSUMER_OFFSET_IN_BROKER = 212 // 调整Consumer线程池数量 ADJUST_CONSUMER_THREAD_POOL = 213 // 查询消息被哪些消费组消费 WHO_CONSUME_THE_MESSAGE = 214 // 从Broker删除Topic配置 DELETE_TOPIC_IN_BROKER = 215 // 从Namesrv删除Topic配置 DELETE_TOPIC_IN_NAMESRV = 216 // Namesrv 通过 project 获取所有的 server ip 信息 GET_KV_CONFIG_BY_VALUE = 217 // Namesrv 删除指定 project group 下的所有 server ip 信息 DELETE_KV_CONFIG_BY_VALUE = 218 // 通过NameSpace获取所有的KV List GET_KVLIST_BY_NAMESPACE = 219 // offset 重置 RESET_CONSUMER_CLIENT_OFFSET = 220 // 客户端订阅消息 GET_CONSUMER_STATUS_FROM_CLIENT = 221 // 通知 broker 调用 offset 重置处理 INVOKE_BROKER_TO_RESET_OFFSET = 222 // 通知 broker 调用客户端订阅消息处理 INVOKE_BROKER_TO_GET_CONSUMER_STATUS = 223 // Broker 查询topic被谁消费 // 2014-03-21 Add By shijia QUERY_TOPIC_CONSUME_BY_WHO = 300 // 获取指定集群下的所有 topic // 2014-03-26 GET_TOPICS_BY_CLUSTER = 224 // 向Broker注册Filter Server // 2014-04-06 Add By shijia REGISTER_FILTER_SERVER = 301 // 向Filter Server注册Class // 2014-04-06 Add By shijia REGISTER_MESSAGE_FILTER_CLASS = 302 // 根据 topic 和 group 获取消息的时间跨度 QUERY_CONSUME_TIME_SPAN = 303 // 获取所有系统内置 Topic 列表 GET_SYSTEM_TOPIC_LIST_FROM_NS = 304 GET_SYSTEM_TOPIC_LIST_FROM_BROKER = 305 // 清理失效队列 CLEAN_EXPIRED_CONSUMEQUEUE = 306 // 通过Broker查询Consumer内存数据 // 2014-07-19 Add By shijia GET_CONSUMER_RUNNING_INFO = 307 // 查找被修正 offset (转发组件) QUERY_CORRECTION_OFFSET = 308 // 通过Broker直接向某个Consumer发送一条消息,并立刻消费,返回结果给broker,再返回给调用方 // 2014-08-11 Add By shijia CONSUME_MESSAGE_DIRECTLY = 309 // Broker 发送消息,优化网络数据包 SEND_MESSAGE_V2 = 310 // 单元化相关 topic GET_UNIT_TOPIC_LIST = 311 // 获取含有单元化订阅组的 Topic 列表 GET_HAS_UNIT_SUB_TOPIC_LIST = 312 // 获取含有单元化订阅组的非单元化 Topic 列表 GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST = 313 // 克隆某一个组的消费进度到新的组 CLONE_GROUP_OFFSET = 314 // 查看Broker上的各种统计信息 VIEW_BROKER_STATS_DATA = 315 )
View Source
const ( // 成功 SUCCESS = 0 // 发生了未捕获异常 SYSTEM_ERROR = 1 // 由于线程池拥堵,系统繁忙 SYSTEM_BUSY = 2 // 请求代码不支持 REQUEST_CODE_NOT_SUPPORTED = 3 //事务失败,添加db失败 TRANSACTION_FAILED = 4 // Broker 刷盘超时 FLUSH_DISK_TIMEOUT = 10 // Broker 同步双写,Slave不可用 SLAVE_NOT_AVAILABLE = 11 // Broker 同步双写,等待Slave应答超时 FLUSH_SLAVE_TIMEOUT = 12 // Broker 消息非法 MESSAGE_ILLEGAL = 13 // Broker, Namesrv 服务不可用,可能是正在关闭或者权限问题 SERVICE_NOT_AVAILABLE = 14 // Broker, Namesrv 版本号不支持 VERSION_NOT_SUPPORTED = 15 // Broker, Namesrv 无权限执行此操作,可能是发、收、或者其他操作 NO_PERMISSION = 16 // Broker, Topic不存在 TOPIC_NOT_EXIST = 17 // Broker, Topic已经存在,创建Topic TOPIC_EXIST_ALREADY = 18 // Broker 拉消息未找到(请求的Offset等于最大Offset,最大Offset无对应消息) PULL_NOT_FOUND = 19 // Broker 可能被过滤,或者误通知等 PULL_RETRY_IMMEDIATELY = 20 // Broker 拉消息请求的Offset不合法,太小或太大 PULL_OFFSET_MOVED = 21 // Broker 查询消息未找到 QUERY_NOT_FOUND = 22 // Broker 订阅关系解析失败 SUBSCRIPTION_PARSE_FAILED = 23 // Broker 订阅关系不存在 SUBSCRIPTION_NOT_EXIST = 24 // Broker 订阅关系不是最新的 SUBSCRIPTION_NOT_LATEST = 25 // Broker 订阅组不存在 SUBSCRIPTION_GROUP_NOT_EXIST = 26 // Producer 事务应该被提交 TRANSACTION_SHOULD_COMMIT = 200 // Producer 事务应该被回滚 TRANSACTION_SHOULD_ROLLBACK = 201 // Producer 事务状态未知 TRANSACTION_STATE_UNKNOW = 202 // Producer ProducerGroup错误 TRANSACTION_STATE_GROUP_WRONG = 203 // 单元化消息,需要设置 buyerId NO_BUYER_ID = 204 // 单元化消息,非本单元消息 NOT_IN_CURRENT_UNIT = 205 // Consumer不在线 CONSUMER_NOT_ONLINE = 206 // Consumer消费消息超时 CONSUME_MSG_TIMEOUT = 207 // 消息ID定长 Add: tianyuliang Since: 2017/4/18 MSG_ID_LENGTH = 8 + 8 // 属性key与value,在byte[]的分隔符 NAME_VALUE_SEPARATOR = 0x1 // 消息属性在byte[]的分隔符 PROPERTY_SEPARATOR = 0x2 // 消息key PROPERTY_KEYS = "KEYS" // 消息TAG PROPERTY_TAGS = "TAGS" )
View Source
const ( MEMORY_FIRST_THEN_STORE = 0 READ_FROM_MEMORY = 1 READ_FROM_STORE = 2 )
View Source
const (
CompressedFlag = (0x1 << 0)
)
Variables ¶
View Source
var Action = struct { CommitMessage int //消费成功 ReconsumeLater int //消费失败(消息进入重试队列,过一会儿消费) }{ 1, 2, }
Action 共享申请
View Source
var (
ConfigVersion int = -1
)
View Source
var ConsumeTypes = struct { Actively string //主动方式消费 Passively string //被动方式消费 }{ "CONSUME_ACTIVELY", "CONSUME_PASSIVELY", }
View Source
var DEFAULT_IP = GetLocalIp4()
Functions ¶
func GetLocalIp4 ¶
func GetLocalIp4() (ip string)
Types ¶
type Admin ¶
type Admin interface { SearchTopic() (*TopicData, error) SearchClusterInfo() (*ClusterInfo, error) CreateTopic(topicName, clusterName string, readQueueNum int, writeQueueNum int, order bool) (bool, error) QueryMessageById(msgId string) (*MessageExt, error) MessageTrackDetail(msg *MessageExt) ([]*MessageTrack, error) QueryConsumerGroupByTopic(topic string) ([]string, error) QueryConsumerProgress(consumeGroupId string) (*ConsumerProgress, error) QueryConsumerConnection(consumeGroupId string) (*ConsumerConnection, error) FetchBrokerRuntimeStats(brokerAddr string) (*BrokerRuntimeInfo, error) Close() error }
func NewDefaultAdmin ¶
type AllocateMessageQueueAveragely ¶
type AllocateMessageQueueAveragely struct{}
type AllocateMessageQueueStrategy ¶
type AllocateMessageQueueStrategy interface {
// contains filtered or unexported methods
}
type BrokerData ¶
type BrokerData struct { BrokerName string `json:"brokerName"` BrokerAddrs map[int64]string `json:"brokerAddrs"` BrokerAddrsLock sync.RWMutex `json:"-"` }
func (*BrokerData) MarshalJSON ¶
func (mj *BrokerData) MarshalJSON() ([]byte, error)
func (*BrokerData) MarshalJSONBuf ¶
func (mj *BrokerData) MarshalJSONBuf(buf fflib.EncodingBuffer) error
func (*BrokerData) UnmarshalJSON ¶
func (uj *BrokerData) UnmarshalJSON(input []byte) error
func (*BrokerData) UnmarshalJSONFFLexer ¶
func (uj *BrokerData) UnmarshalJSONFFLexer(fs *fflib.FFLexer, state fflib.FFParseState) error
type BrokerRuntimeInfo ¶
type BrokerRuntimeInfo struct { BrokerVersionDesc string `json:"brokerVersionDesc"` BrokerVersion string `json:"brokerVersion"` MsgPutTotalYesterdayMorning string `json:"msgPutTotalYesterdayMorning"` MsgPutTotalTodayMorning string `json:"msgPutTotalTodayMorning"` MsgPutTotalTodayNow string `json:"msgPutTotalTodayNow"` MsgGetTotalYesterdayMorning string `json:"msgGetTotalYesterdayMorning"` MsgGetTotalTodayNow string `json:"msgGetTotalTodayNow"` SendThreadPoolQueueSize string `json:"sendThreadPoolQueueSize"` SendThreadPoolQueueCapacity string `json:"sendThreadPoolQueueCapacity"` MsgGetTotalTodayMorning string `json:"msgGetTotalTodayMorning"` InTps float64 `json:"inTps"` OutTps float64 `json:"outTps"` }
type ClusterInfo ¶
type ClusterInfo struct { BrokerAddrTable map[string]BrokerData `json:"brokerAddrTable"` ClusterAddrTable map[string][]string `json:"clusterAddrTable"` }
ClusterInfo cluster info
func (*ClusterInfo) MarshalJSON ¶
func (mj *ClusterInfo) MarshalJSON() ([]byte, error)
func (*ClusterInfo) MarshalJSONBuf ¶
func (mj *ClusterInfo) MarshalJSONBuf(buf fflib.EncodingBuffer) error
func (*ClusterInfo) UnmarshalJSON ¶
func (uj *ClusterInfo) UnmarshalJSON(input []byte) error
func (*ClusterInfo) UnmarshalJSONFFLexer ¶
func (uj *ClusterInfo) UnmarshalJSONFFLexer(fs *fflib.FFLexer, state fflib.FFParseState) error
type Connection ¶
type ConsumeStats ¶
type ConsumeStats struct { OffsetTable map[MessageQueue]OffsetWrapper `json:"offsetTable"` ConsumeTps int64 `json:"consumeTps"` }
func (*ConsumeStats) MarshalJSON ¶
func (mj *ConsumeStats) MarshalJSON() ([]byte, error)
func (*ConsumeStats) MarshalJSONBuf ¶
func (mj *ConsumeStats) MarshalJSONBuf(buf fflib.EncodingBuffer) error
func (*ConsumeStats) UnmarshalJSON ¶
func (uj *ConsumeStats) UnmarshalJSON(input []byte) error
func (*ConsumeStats) UnmarshalJSONFFLexer ¶
func (uj *ConsumeStats) UnmarshalJSONFFLexer(fs *fflib.FFLexer, state fflib.FFParseState) error
type Consumer ¶
type Consumer interface { //Admin Start() error Shutdown() RegisterMessageListener(listener MessageListener) Subscribe(topic string, subExpression string) UnSubcribe(topic string) SendMessageBack(msg MessageExt, delayLevel int) error SendMessageBack1(msg MessageExt, delayLevel int, brokerName string) error // contains filtered or unexported methods }
type ConsumerConnection ¶
type ConsumerConnection struct { ConnectionSet []*Connection `json:"connectionSet"` SubscriptionTable map[string]*SubscriptionData `json:"subscriptionTable"` ConsumeType string `json:"consumeType"` MessageModel string `json:"messageModel"` ConsumeFromWhere string `json:"consumeFromWhere"` }
type ConsumerData ¶
type ConsumerGroup ¶
type ConsumerGroup struct { BrokerOffset int64 `json:"brokerOffset"` ConsumerOffset int64 `json:"consumerOffset"` Diff int64 `json:"diff"` MessageQueue }
type ConsumerIdSorter ¶
type ConsumerIdSorter []string
func (ConsumerIdSorter) Len ¶
func (self ConsumerIdSorter) Len() int
func (ConsumerIdSorter) Less ¶
func (self ConsumerIdSorter) Less(i, j int) bool
func (ConsumerIdSorter) Swap ¶
func (self ConsumerIdSorter) Swap(i, j int)
type ConsumerProgress ¶
type ConsumerProgress struct { GroupId string `json:"groupId"` Tps int64 `json:"tps"` Diff int64 `json:"diff"` List []*ConsumerGroup `json:"list"` }
type DefalutRemotingClient ¶
type DefalutRemotingClient struct {
// contains filtered or unexported fields
}
func (*DefalutRemotingClient) ScanResponseTable ¶
func (self *DefalutRemotingClient) ScanResponseTable()
type DefaultAdmin ¶
type DefaultAdmin struct {
// contains filtered or unexported fields
}
func (*DefaultAdmin) Close ¶
func (admin *DefaultAdmin) Close() error
func (*DefaultAdmin) CreateTopic ¶
func (admin *DefaultAdmin) CreateTopic(topicName, clusterName string, readQueueNum int, writeQueueNum int, order bool) (bool, error)
CreateTopic 创建topic
func (*DefaultAdmin) FetchBrokerRuntimeStats ¶
func (admin *DefaultAdmin) FetchBrokerRuntimeStats(brokerAddr string) (*BrokerRuntimeInfo, error)
FetchBrokerRuntimeStats 根据broker addr查询在线broker运行统计信息
func (*DefaultAdmin) MessageTrackDetail ¶
func (admin *DefaultAdmin) MessageTrackDetail(msg *MessageExt) ([]*MessageTrack, error)
MessageTrackDetail 根据msgid查询消息轨迹
func (*DefaultAdmin) QueryConsumerConnection ¶
func (admin *DefaultAdmin) QueryConsumerConnection(consumeGroupId string) (*ConsumerConnection, error)
QueryConsumerConnection 根据consumeGroupId查询消费进程
func (*DefaultAdmin) QueryConsumerGroupByTopic ¶
func (admin *DefaultAdmin) QueryConsumerGroupByTopic(topic string) ([]string, error)
QueryConsumerGroupByTopic 根据topic查询消费组
func (*DefaultAdmin) QueryConsumerProgress ¶
func (admin *DefaultAdmin) QueryConsumerProgress(consumeGroupId string) (*ConsumerProgress, error)
QueryConsumerProgress 根据consumeGroupId查询消费进度
func (*DefaultAdmin) QueryMessageById ¶
func (admin *DefaultAdmin) QueryMessageById(msgId string) (*MessageExt, error)
QueryMessageById 根据msgid查询消息详情
func (*DefaultAdmin) SearchClusterInfo ¶
func (admin *DefaultAdmin) SearchClusterInfo() (*ClusterInfo, error)
SearchClusterInfo 查询集群的信息
func (*DefaultAdmin) SearchTopic ¶
func (admin *DefaultAdmin) SearchTopic() (*TopicData, error)
GetTopic 查询topic列表接口
type DefaultConsumer ¶
type DefaultConsumer struct {
// contains filtered or unexported fields
}
func (*DefaultConsumer) RegisterMessageListener ¶
func (self *DefaultConsumer) RegisterMessageListener(messageListener MessageListener)
func (*DefaultConsumer) SendMessageBack ¶
func (self *DefaultConsumer) SendMessageBack(msg MessageExt, delayLevel int) error
func (*DefaultConsumer) SendMessageBack1 ¶
func (self *DefaultConsumer) SendMessageBack1(msg MessageExt, delayLevel int, brokerName string) error
func (*DefaultConsumer) Shutdown ¶
func (self *DefaultConsumer) Shutdown()
func (*DefaultConsumer) Start ¶
func (self *DefaultConsumer) Start() error
func (*DefaultConsumer) Subscribe ¶
func (self *DefaultConsumer) Subscribe(topic string, subExpression string)
func (*DefaultConsumer) UnSubcribe ¶
func (self *DefaultConsumer) UnSubcribe(topic string)
type GetConsumerConnectionListRequestHeader ¶
type GetConsumerConnectionListRequestHeader struct {
ConsumerGroup string `json:"consumerGroup"`
}
type GetConsumerListByGroupRequestHeader ¶
type GetConsumerListByGroupRequestHeader struct {
ConsumerGroup string `json:"consumerGroup"`
}
type GetConsumerListByGroupResponseBody ¶
type GetConsumerListByGroupResponseBody struct {
ConsumerIdList []string `json:"consumerIdList"`
}
type GetKVConfigResponseHeader ¶
type GetKVConfigResponseHeader struct {
Value string `json:"value"`
}
type GetRouteInfoRequestHeader ¶
type GetRouteInfoRequestHeader struct {
Topic string `json:"topic"`
}
type HeartbeatData ¶
type HeartbeatData struct { ClientId string ConsumerDataSet []*ConsumerData }
type InvokeCallback ¶
type InvokeCallback func(responseFuture *ResponseFuture)
type Message ¶
type MessageExt ¶
type MessageExt struct { Message QueueId int32 `json:"queueId"` StoreSize int32 `json:"storeSize"` QueueOffset int64 `json:"queueOffset"` SysFlag int32 `json:"sysFlag"` BornTimestamp int64 `json:"bornTimestamp"` BornHost string `json:"bornHost"` StoreTimestamp int64 `json:"storeTimestamp"` StoreHost string `json:"storeHost"` MsgId string `json:"msgId"` CommitLogOffset int64 `json:"commitLogOffset"` BodyCRC int32 `json:"bodyCRC"` ReconsumeTimes int32 `json:"reconsumeTimes"` PreparedTransactionOffset int64 `json:"preparedTransactionOffset"` }
type MessageListener ¶
type MessageListener func(msgs []*MessageExt) (int, error)
type MessageQueue ¶
type MessageQueue struct { Topic string `json:"topic"` BrokerName string `json:"brokerName"` QueueId int32 `json:"queueId"` }
func (*MessageQueue) MarshalJSON ¶
func (mj *MessageQueue) MarshalJSON() ([]byte, error)
func (*MessageQueue) MarshalJSONBuf ¶
func (mj *MessageQueue) MarshalJSONBuf(buf fflib.EncodingBuffer) error
func (*MessageQueue) UnmarshalJSON ¶
func (uj *MessageQueue) UnmarshalJSON(input []byte) error
func (*MessageQueue) UnmarshalJSONFFLexer ¶
func (uj *MessageQueue) UnmarshalJSONFFLexer(fs *fflib.FFLexer, state fflib.FFParseState) error
type MessageQueues ¶
type MessageQueues []*MessageQueue
func (MessageQueues) Len ¶
func (self MessageQueues) Len() int
func (MessageQueues) Less ¶
func (self MessageQueues) Less(i, j int) bool
func (MessageQueues) Swap ¶
func (self MessageQueues) Swap(i, j int)
type MessageTrack ¶
type MqClient ¶
type MqClient struct {
// contains filtered or unexported fields
}
func NewMqClient ¶
func NewMqClient() *MqClient
type OffsetStore ¶
type OffsetStore interface {
// contains filtered or unexported methods
}
type OffsetWrapper ¶
type OffsetWrapper struct { BrokerOffset int64 `json:"brokerOffset"` ConsumerOffset int64 `json:"consumerOffset"` LastTimestamp int64 `json:"lastTimestamp"` // 消费的最后一条消息对应的时间戳 }
func (*OffsetWrapper) MarshalJSON ¶
func (mj *OffsetWrapper) MarshalJSON() ([]byte, error)
func (*OffsetWrapper) MarshalJSONBuf ¶
func (mj *OffsetWrapper) MarshalJSONBuf(buf fflib.EncodingBuffer) error
func (*OffsetWrapper) UnmarshalJSON ¶
func (uj *OffsetWrapper) UnmarshalJSON(input []byte) error
func (*OffsetWrapper) UnmarshalJSONFFLexer ¶
func (uj *OffsetWrapper) UnmarshalJSONFFLexer(fs *fflib.FFLexer, state fflib.FFParseState) error
type PullMessageRequestHeader ¶
type PullMessageRequestHeader struct { ConsumerGroup string `json:"consumerGroup"` Topic string `json:"topic"` QueueId int32 `json:"queueId"` QueueOffset int64 `json:"queueOffset"` MaxMsgNums int32 `json:"maxMsgNums"` SysFlag int32 `json:"sysFlag"` CommitOffset int64 `json:"commitOffset"` SuspendTimeoutMillis int64 `json:"suspendTimeoutMillis"` Subscription string `json:"subscription"` SubVersion int64 `json:"subVersion"` }
type PullMessageService ¶
type PullMessageService struct {
// contains filtered or unexported fields
}
func NewPullMessageService ¶
func NewPullMessageService() *PullMessageService
type PullRequest ¶
type PullRequest struct {
// contains filtered or unexported fields
}
type QueryTopicConsumeByWhoRequestHeader ¶
type QueryTopicConsumeByWhoRequestHeader struct {
Topic string `json:"topic"`
}
type QueueData ¶
type QueueData struct { BrokerName string ReadQueueNums int32 WriteQueueNums int32 Perm int32 TopicSynFlag int32 }
func (*QueueData) MarshalJSON ¶
func (*QueueData) MarshalJSONBuf ¶
func (mj *QueueData) MarshalJSONBuf(buf fflib.EncodingBuffer) error
func (*QueueData) UnmarshalJSON ¶
func (*QueueData) UnmarshalJSONFFLexer ¶
type Rebalance ¶
type Rebalance struct {
// contains filtered or unexported fields
}
func NewRebalance ¶
func NewRebalance() *Rebalance
type RemoteOffsetStore ¶
type RemoteOffsetStore struct {
// contains filtered or unexported fields
}
type RemotingClient ¶
type RemotingClient interface { ScanResponseTable() // contains filtered or unexported methods }
func NewDefaultRemotingClient ¶
func NewDefaultRemotingClient() RemotingClient
type RemotingCommand ¶
type RemotingCommand struct { //header Code int `json:"code"` Language string `json:"language"` Version int `json:"version"` Opaque int32 `json:"opaque"` Flag int `json:"flag"` ExtFields interface{} `json:"extFields"` //body Body []byte `json:"body,omitempty"` // contains filtered or unexported fields }
type ResponseFuture ¶
type ResponseFuture struct {
// contains filtered or unexported fields
}
type SubscriptionData ¶
type TopicRouteData ¶
type TopicRouteData struct { OrderTopicConf string QueueDatas []*QueueData BrokerDatas []*BrokerData }
func (*TopicRouteData) MarshalJSON ¶
func (mj *TopicRouteData) MarshalJSON() ([]byte, error)
func (*TopicRouteData) MarshalJSONBuf ¶
func (mj *TopicRouteData) MarshalJSONBuf(buf fflib.EncodingBuffer) error
func (*TopicRouteData) UnmarshalJSON ¶
func (uj *TopicRouteData) UnmarshalJSON(input []byte) error
func (*TopicRouteData) UnmarshalJSONFFLexer ¶
func (uj *TopicRouteData) UnmarshalJSONFFLexer(fs *fflib.FFLexer, state fflib.FFParseState) error
type UpdateConsumerOffsetRequestHeader ¶
type UpdateConsumerOffsetRequestHeader struct {
// contains filtered or unexported fields
}
type ViewMessageRequestHeader ¶
type ViewMessageRequestHeader struct {
Offset uint64 `json:"offset"`
}
Source Files ¶
- admin.go
- broker_runtime_info.go
- cluster_info_ffjson.go
- consume_stats_ffjson.go
- consumer.go
- consumer_connection.go
- consumer_group.go
- log.go
- message.go
- message_queue.go
- message_track.go
- mqclient.go
- namespace.go
- net.go
- pull_message.go
- rebalance.go
- remote_cmd.go
- remoting_client.go
- request.go
- request_code.go
- response.go
- response_code.go
- store.go
- topic_route_ffjson.go
- util.go
Click to show internal directories.
Click to hide internal directories.