Documentation ¶
Index ¶
- Constants
- Variables
- func Bytes2messageProperties(propertiesBuf []byte) map[string]string
- func ClearProperty(msg *Message, name string)
- func CreateMessageId(addr string, offset int64) (string, error)
- func GetOriginMessageId(msg Message) string
- func GetReconsumeTime(msg *Message) string
- func JoinHostPort(hostBytes []byte, port int32) string
- func MessageProperties2Bytes(properties map[string]string) []byte
- func MessageProperties2String(properties map[string]string) string
- func ParseTopicFilterType(sysFlag int32) stgcommon.TopicFilterType
- func PutProperty(msg *Message, name string, value string)
- func SetOriginMessageId(msg *Message, originMessageId string)
- func SetProperties(msg *Message, name string, value string)
- func SetPropertiesMap(msg *Message, properties map[string]string)
- func SetReconsumeTime(msg *Message, reconsumeTimes string)
- func SplitHostPort(addr string) (string, int32, error)
- func String2messageProperties(propertiesStr string) map[string]string
- type Message
- func (msg *Message) ClearProperty(name string)
- func (self *Message) GetKeys() string
- func (self *Message) GetOriginMessageID() string
- func (self *Message) GetProperty(name string) string
- func (self *Message) GetTags() string
- func (self *Message) PutProperty(name string, value string)
- func (self *Message) SetDelayTimeLevel(level int)
- func (self *Message) SetKeys(keys string)
- func (self *Message) SetTags(tags string)
- func (self *Message) SetWaitStoreMsgOK(waitStoreMsgOK bool)
- type MessageExt
- type MessageId
- type MessageQueue
- type MessageQueues
Constants ¶
View Source
const ( // 消息关键词,多个Key用KEY_SEPARATOR隔开(查询消息使用) PROPERTY_KEYS = "KEYS" // 消息标签,只支持设置一个Tag(服务端消息过滤使用) PROPERTY_TAGS = "TAGS" // 是否等待服务器将消息存储完毕再返回(可能是等待刷盘完成或者等待同步复制到其他服务器) PROPERTY_WAIT_STORE_MSG_OK = "WAIT" // 消息延时投递时间级别,0表示不延时,大于0表示特定延时级别(具体级别在服务器端定义) PROPERTY_DELAY_TIME_LEVEL = "DELAY" // 内部使用 PROPERTY_RETRY_TOPIC = "RETRY_TOPIC" PROPERTY_REAL_TOPIC = "REAL_TOPIC" PROPERTY_REAL_QUEUE_ID = "REAL_QID" PROPERTY_TRANSACTION_PREPARED = "TRAN_MSG" PROPERTY_PRODUCER_GROUP = "PGROUP" PROPERTY_MIN_OFFSET = "MIN_OFFSET" PROPERTY_MAX_OFFSET = "MAX_OFFSET" PROPERTY_BUYER_ID = "BUYER_ID" PROPERTY_ORIGIN_MESSAGE_ID = "ORIGIN_MESSAGE_ID" PROPERTY_TRANSFER_FLAG = "TRANSFER_FLAG" PROPERTY_CORRECTION_FLAG = "CORRECTION_FLAG" PROPERTY_MQ2_FLAG = "MQ2_FLAG" PROPERTY_RECONSUME_TIME = "RECONSUME_TIME" KEY_SEPARATOR = " " )
View Source
const ( // 消息ID定长 MSG_ID_LENGTH = 8 + 8 // 存储记录各个字段位置 MessageMagicCodePostion = 4 MessageFlagPostion = 16 MessagePhysicOffsetPostion = 28 MessageStoreTimestampPostion = 56 // 序列化消息属性 NAME_VALUE_SEPARATOR = 1 PROPERTY_SEPARATOR = 2 )
MessageDecoder: 消息解码。(Author: yintongqiang;Since: 2017/8/16)
Variables ¶
View Source
var (
MessageMagicCode = 0xAABBCCDD ^ 1880681586 + 8
)
Functions ¶
func Bytes2messageProperties ¶
func ClearProperty ¶
func CreateMessageId ¶
CreateMessageId 根据addr和offset生成消息msgId;addr的格式是host:port
func GetOriginMessageId ¶
func GetReconsumeTime ¶
func MessageProperties2Bytes ¶
func MessageProperties2String ¶
修复string不可见字符问题,使用[]byte。(Modify: jerrylou, <gunsluo@gmail.com>;Since: 2017-08-25)
func ParseTopicFilterType ¶
func ParseTopicFilterType(sysFlag int32) stgcommon.TopicFilterType
func PutProperty ¶
func SetOriginMessageId ¶
func SetProperties ¶
func SetPropertiesMap ¶
func SetReconsumeTime ¶
func SplitHostPort ¶
SplitHostPort 解析host:port
func String2messageProperties ¶
修复string不可见字符问题,使用[]byte。(Modify: jerrylou, <gunsluo@gmail.com>;Since: 2017-08-25)
Types ¶
type Message ¶
type Message struct { Topic string // 消息主题 Flag int32 // 消息标志,系统不做干预,完全由应用决定如何使用 Properties map[string]string // 消息属性,都是系统属性,禁止应用设置 Body []byte // 消息体 }
Message: 消息结构体。(Author: yintongqiang;Since: 2017/8/9)
func (*Message) ClearProperty ¶
func (*Message) GetOriginMessageID ¶
func (*Message) GetProperty ¶
func (*Message) PutProperty ¶
func (*Message) SetDelayTimeLevel ¶
func (*Message) SetWaitStoreMsgOK ¶
type MessageExt ¶
type MessageExt struct { Message // 消息结构体 QueueId int32 // 队列ID<PUT> StoreSize int32 // 存储记录大小 QueueOffset int64 // 队列偏移量 SysFlag int32 // 消息标志位 <PUT> BornTimestamp int64 // 消息在客户端创建时间戳 <PUT> BornHost string // 消息来自哪里 <PUT> StoreTimestamp int64 // 消息在服务器存储时间戳 StoreHost string // 消息存储在哪个服务器 <PUT> MsgId string // 消息ID CommitLogOffset int64 // 消息对应的Commit Log Offset BodyCRC int32 // 消息体CRC ReconsumeTimes int32 // 当前消息被某个订阅组重新消费了几次(订阅组之间独立计数) PreparedTransactionOffset int64 // 事务预处理偏移量 }
MessageExt 消息拓展结构体
func DecodeMessageExt ¶
func DecodeMessageExt(buffer []byte, isReadBody, isCompressBody bool) (*MessageExt, error)
DecodeMessageExt 解析消息体,返回MessageExt
func DecodesMessageExt ¶
func DecodesMessageExt(buffer []byte, isReadBody bool) ([]*MessageExt, error)
DecodesMessageExt 解析消息体,返回多个消息
type MessageId ¶
type MessageId struct { Address string // 消息落地存储,角色为storeHost对应的brokerAddr Offset uint64 // 消息落地存储,物理偏移量, 即 physicOffset、commitLogOffset }
func DecodeMessageId ¶
DecodeMessageId 解析messageId。(Author: jerrylou, <gunsluo@gmail.com>;Since: 2017-08-23)
type MessageQueue ¶
type MessageQueue struct { Topic string `json:"topic"` BrokerName string `json:"brokerName"` QueueId int `json:"queueId"` }
func NewDefaultMessageQueue ¶
func NewDefaultMessageQueue(topic, brokerName string, queueId int) *MessageQueue
func NewMessageQueue ¶
func NewMessageQueue() *MessageQueue
func (MessageQueue) Equal ¶
func (mq MessageQueue) Equal(other MessageQueue) bool
func (MessageQueue) Equals ¶
func (mq MessageQueue) Equals(v interface{}) bool
func (MessageQueue) HashBytes ¶
func (mq MessageQueue) HashBytes() []byte
func (MessageQueue) Key ¶
func (mq MessageQueue) Key() string
func (MessageQueue) ToString ¶
func (mq MessageQueue) ToString() string
type MessageQueues ¶
type MessageQueues []*MessageQueue
func (MessageQueues) Len ¶
func (self MessageQueues) Len() int
func (MessageQueues) Less ¶
func (self MessageQueues) Less(i, j int) bool
func (MessageQueues) Swap ¶
func (self MessageQueues) Swap(i, j int)
Source Files ¶
Click to show internal directories.
Click to hide internal directories.