message

package
v0.0.0-...-ba2213e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 21, 2019 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
// Message property name constants and the key separator.
const (
	// Message keywords; multiple keys are separated by KEY_SEPARATOR (used when querying messages).
	PROPERTY_KEYS = "KEYS"

	// Message tag; only a single tag may be set (used for server-side message filtering).
	PROPERTY_TAGS = "TAGS"

	// Whether to wait until the server has finished storing the message before returning
	// (this may mean waiting for a disk flush to complete or for synchronous replication to other servers).
	PROPERTY_WAIT_STORE_MSG_OK = "WAIT"

	// Delay level for delayed delivery: 0 means no delay, a value greater than 0 selects a
	// specific delay level (the concrete levels are defined on the server side).
	PROPERTY_DELAY_TIME_LEVEL = "DELAY"

	// For internal use.
	PROPERTY_RETRY_TOPIC          = "RETRY_TOPIC"
	PROPERTY_REAL_TOPIC           = "REAL_TOPIC"
	PROPERTY_REAL_QUEUE_ID        = "REAL_QID"
	PROPERTY_TRANSACTION_PREPARED = "TRAN_MSG"
	PROPERTY_PRODUCER_GROUP       = "PGROUP"
	PROPERTY_MIN_OFFSET           = "MIN_OFFSET"
	PROPERTY_MAX_OFFSET           = "MAX_OFFSET"
	PROPERTY_BUYER_ID             = "BUYER_ID"
	PROPERTY_ORIGIN_MESSAGE_ID    = "ORIGIN_MESSAGE_ID"
	PROPERTY_TRANSFER_FLAG        = "TRANSFER_FLAG"
	PROPERTY_CORRECTION_FLAG      = "CORRECTION_FLAG"
	PROPERTY_MQ2_FLAG             = "MQ2_FLAG"
	PROPERTY_RECONSUME_TIME       = "RECONSUME_TIME"
	KEY_SEPARATOR                 = " "
)
View Source
// Constants used when encoding and decoding messages.
const (
	// Fixed length of a message ID (8 + 8 = 16).
	// NOTE(review): the unit (bytes vs. encoded characters) is not visible here; confirm against the ID codec.
	MSG_ID_LENGTH = 8 + 8

	// Positions of individual fields within a stored record.
	// NOTE(review): presumably byte offsets from the start of the record; confirm against the decoder.
	MessageMagicCodePostion      = 4
	MessageFlagPostion           = 16
	MessagePhysicOffsetPostion   = 28
	MessageStoreTimestampPostion = 56

	// Separators used when serializing message properties.
	NAME_VALUE_SEPARATOR = 1
	PROPERTY_SEPARATOR   = 2
)

MessageDecoder: 消息解码 Author: yintongqiang Since: 2017/8/16

Variables

View Source
// MessageMagicCode is the message magic code.
//
// Go gives the binary operators ^ and + the same precedence and evaluates them
// left to right, so the value is (0xAABBCCDD ^ 1880681586) + 8 = 0xDAA320B7.
// The parentheses only make that grouping explicit; the value is unchanged.
//
// NOTE(review): in languages where + binds tighter than ^ (e.g. Java), the
// same unparenthesized text evaluates to 0xAABBCCDD ^ (1880681586 + 8) =
// 0xDAA320A7. If this expression was ported from such code, confirm which
// value is intended before relying on cross-implementation compatibility.
var MessageMagicCode = (0xAABBCCDD ^ 1880681586) + 8

Functions

func Bytes2messageProperties

func Bytes2messageProperties(propertiesBuf []byte) map[string]string

func ClearProperty

func ClearProperty(msg *Message, name string)

func CreateMessageId

func CreateMessageId(addr string, offset int64) (string, error)

CreateMessageId 解析消息msgId字段;addr是host:port

func GetOriginMessageId

func GetOriginMessageId(msg Message) string

func GetReconsumeTime

func GetReconsumeTime(msg *Message) string

func JoinHostPort

func JoinHostPort(hostBytes []byte, port int32) string

JoinHostPort 连接host:port

func MessageProperties2Bytes

func MessageProperties2Bytes(properties map[string]string) []byte

func MessageProperties2String

func MessageProperties2String(properties map[string]string) string

修复string不可见字符问题,使用[]byte Modify: jerrylou, <gunsluo@gmail.com> Since: 2017-08-25

func ParseTopicFilterType

func ParseTopicFilterType(sysFlag int32) stgcommon.TopicFilterType

func PutProperty

func PutProperty(msg *Message, name string, value string)

func SetOriginMessageId

func SetOriginMessageId(msg *Message, originMessageId string)

func SetProperties

func SetProperties(msg *Message, name string, value string)

func SetPropertiesMap

func SetPropertiesMap(msg *Message, properties map[string]string)

func SetReconsumeTime

func SetReconsumeTime(msg *Message, reconsumeTimes string)

func SplitHostPort

func SplitHostPort(addr string) (string, int32, error)

SplitHostPort 解析host:port

func String2messageProperties

func String2messageProperties(propertiesStr string) map[string]string

修复string不可见字符问题,使用[]byte Modify: jerrylou, <gunsluo@gmail.com> Since: 2017-08-25

Types

type Message

// Message is the message structure.
type Message struct {
	Topic      string            // Message topic.
	Flag       int32             // Message flag; the system does not interfere, the application fully decides how to use it.
	Properties map[string]string // Message properties; these are all system properties and applications must not set them.
	Body       []byte            // Message body.
}

Message: 消息结构体 Author: yintongqiang Since: 2017/8/9

func NewMessage

func NewMessage(topic string, tags string, body []byte) *Message

func (*Message) ClearProperty

func (msg *Message) ClearProperty(name string)

func (*Message) GetKeys

func (self *Message) GetKeys() string

func (*Message) GetOriginMessageID

func (self *Message) GetOriginMessageID() string

func (*Message) GetProperty

func (self *Message) GetProperty(name string) string

func (*Message) GetTags

func (self *Message) GetTags() string

func (*Message) PutProperty

func (self *Message) PutProperty(name string, value string)

func (*Message) SetDelayTimeLevel

func (self *Message) SetDelayTimeLevel(level int)

func (*Message) SetKeys

func (self *Message) SetKeys(keys string)

func (*Message) SetTags

func (self *Message) SetTags(tags string)

func (*Message) SetWaitStoreMsgOK

func (self *Message) SetWaitStoreMsgOK(waitStoreMsgOK bool)

type MessageExt

// MessageExt is the extended message structure: it embeds Message and adds
// queue, storage and origin metadata.
type MessageExt struct {
	Message                          // Embedded message structure.
	QueueId                   int32  // Queue ID <PUT>.
	StoreSize                 int32  // Size of the stored record.
	QueueOffset               int64  // Queue offset.
	SysFlag                   int32  // Message flag bits <PUT>.
	BornTimestamp             int64  // Timestamp at which the message was created on the client <PUT>.
	BornHost                  string // Where the message came from <PUT>.
	StoreTimestamp            int64  // Timestamp at which the message was stored on the server.
	StoreHost                 string // Which server the message is stored on <PUT>.
	MsgId                     string // Message ID.
	CommitLogOffset           int64  // Commit Log offset of the message.
	BodyCRC                   int32  // CRC of the message body.
	ReconsumeTimes            int32  // How many times this message has been re-consumed by a subscription group (counted independently per group).
	PreparedTransactionOffset int64  // Prepared-transaction offset.
}

MessageExt 消息拓展结构体

func DecodeMessageExt

func DecodeMessageExt(buffer []byte, isReadBody, isCompressBody bool) (*MessageExt, error)

DecodeMessageExt 解析消息体,返回MessageExt

func DecodesMessageExt

func DecodesMessageExt(buffer []byte, isReadBody bool) ([]*MessageExt, error)

DecodesMessageExt 解析消息体,返回多个消息

func (*MessageExt) Encode

func (msgExt *MessageExt) Encode() ([]byte, error)

Encode 编码MessageExt

func (*MessageExt) ToString

func (m *MessageExt) ToString() string

ToString 打印消息Message的数据

type MessageId

// MessageId holds the address and offset that make up a message ID.
type MessageId struct {
	Address string // Where the message is persisted: the brokerAddr corresponding to the storeHost role.
	Offset  uint64 // Where the message is persisted: the physical offset, i.e. physicOffset / commitLogOffset.
}

func DecodeMessageId

func DecodeMessageId(msgId string) (*MessageId, error)

DecodeMessageId 解析messageId Author: jerrylou, <gunsluo@gmail.com> Since: 2017-08-23

type MessageQueue

// MessageQueue holds a topic, a broker name and a queue ID.
// The struct tags give the JSON field names used when (un)marshalling.
type MessageQueue struct {
	Topic      string `json:"topic"`      // Topic name.
	BrokerName string `json:"brokerName"` // Broker name.
	QueueId    int    `json:"queueId"`    // Queue ID.
}

func NewDefaultMessageQueue

func NewDefaultMessageQueue(topic, brokerName string, queueId int) *MessageQueue

func NewMessageQueue

func NewMessageQueue() *MessageQueue

func (MessageQueue) Equal

func (mq MessageQueue) Equal(other MessageQueue) bool

func (MessageQueue) Equals

func (mq MessageQueue) Equals(v interface{}) bool

func (MessageQueue) HashBytes

func (mq MessageQueue) HashBytes() []byte

func (MessageQueue) Key

func (mq MessageQueue) Key() string

func (MessageQueue) ToString

func (mq MessageQueue) ToString() string

type MessageQueues

// MessageQueues is a slice of *MessageQueue.
// NOTE(review): it declares Len, Less and Swap methods, presumably so it can be used with package sort; confirm against the implementation.
type MessageQueues []*MessageQueue

func (MessageQueues) Len

func (self MessageQueues) Len() int

func (MessageQueues) Less

func (self MessageQueues) Less(i, j int) bool

func (MessageQueues) Swap

func (self MessageQueues) Swap(i, j int)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL