message

package
v2.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 7, 2019 License: MIT Imports: 16 Imported by: 0

Documentation

Index

Constants

View Source
// Bit flags and transaction-type values. As the shift expressions show,
// Compress is bit 0, MultiTags is bit 1, and the transaction type is a
// two-bit field occupying bits 2-3.
// NOTE(review): presumably these are combined into a message's system flag
// (cf. Ext.SysFlag) — confirm against the encoder/decoder.
const (
	// Compress is bit 0 (value 1).
	Compress                = 1
	// MultiTags is bit 1 (value 2).
	MultiTags               = 1 << 1
	// TransactionNotType is the zero value of the bits 2-3 field.
	TransactionNotType      = 0
	// TransactionPreparedType is 1 in the bits 2-3 field (value 4).
	TransactionPreparedType = 0x1 << 2
	// TransactionCommitType is 2 in the bits 2-3 field (value 8).
	TransactionCommitType   = 0x2 << 2
	// TransactionRollbackType is 3 in the bits 2-3 field (value 12).
	TransactionRollbackType = 0x3 << 2
)

predefined tags

View Source
// Predefined property key strings.
// NOTE(review): presumably these are the keys used in the
// map[string]string Properties of Message/Batch/Data (e.g. by GetTags,
// GetUniqID, SetDelayTimeLevel) — confirm against the accessors.
const (
	PropertyKeys                      = "KEYS"
	PropertyTags                      = "TAGS"
	PropertyWaitStoreMsgOK            = "WAIT"
	PropertyDelayTimeLevel            = "DELAY"
	PropertyRetryTopic                = "RETRY_TOPIC"
	PropertyRealTopic                 = "REAL_TOPIC"
	PropertyRealQueueID               = "REAL_QID"
	PropertyTransactionPrepared       = "TRAN_MSG"
	PropertyProducerGroup             = "PGROUP"
	PropertyMinOffset                 = "MIN_OFFSET"
	PropertyMaxOffset                 = "MAX_OFFSET"
	PropertyBuyerID                   = "BUYER_ID"
	PropertyOriginMessageID           = "ORIGIN_MESSAGE_ID"
	PropertyTransferFlag              = "TRANSFER_FLAG"
	PropertyCorrectionFlag            = "CORRECTION_FLAG"
	PropertyMQ2Flag                   = "MQ2_FLAG"
	PropertyReconsumeTime             = "RECONSUME_TIME"
	PropertyMsgRegion                 = "MSG_REGION"
	PropertyTraceSwitch               = "TRACE_ON"
	PropertyUniqClientMessageIDKeyidx = "UNIQ_KEY"
	PropertyMaxReconsumeTimes         = "MAX_RECONSUME_TIMES"
	PropertyConsumeStartTimestamp     = "CONSUME_START_TIME"

	// KeySep is a single space.
	// NOTE(review): presumably the separator SetKeys uses to join multiple
	// keys into the PropertyKeys value — confirm.
	KeySep = " "
)

predefined keys

View Source
// Byte offsets, magic value and separators.
//
// The four *Postion constants are byte offsets that coincide with prefix
// sums of the field widths listed in BodySizePosition: 4 is the offset after
// the first width, 16 after the first four, 28 after the first six, and 56
// after the first ten.
// NOTE(review): presumably these describe the encoded message layout that
// Decode reads — confirm against the decoder.
const (
	MagicCodePostion      = 4
	FlagPostion           = 16
	PhysicOffsetPostion   = 28
	StoreTimestampPostion = 56
	// In Go, ^ and + have the same precedence and associate left to right,
	// so this evaluates as (0xAABBCCDD ^ 1880681586) + 8 = 0xDAA320B7.
	// NOTE(review): in languages where + binds tighter than ^ (C, Java) the
	// same text means 0xAABBCCDD ^ (1880681586 + 8) = 0xDAA320A7 — confirm
	// which value the wire format expects.
	MagicCode             = 0xAABBCCDD ^ 1880681586 + 8
	// BodySizePosition is the sum of the fourteen widths below: 84.
	BodySizePosition      = 4 +
		4 +
		4 +
		4 +
		4 +
		8 +
		8 +
		4 +
		8 +
		8 +
		8 +
		8 +
		4 +
		8 /* 14 Prepared Transaction Offset */

	// NameValueSep and PropertySep are the single bytes 0x01 and 0x02.
	// NOTE(review): presumably the name/value and per-property delimiters
	// used by Properties2String / String2Properties — confirm.
	NameValueSep = byte(1)
	PropertySep  = byte(2)
)

predefined consts

Variables

This section is empty.

Functions

func CreateMessageID

func CreateMessageID(storeHost *Addr, commitOffset int64) string

CreateMessageID creates an id from the store host address and the message's committed offset; it returns a string of length 32.

func CreateUniqID

func CreateUniqID() string

CreateUniqID returns the global unique id

func GetUniqID

func GetUniqID(properties map[string]string) string

GetUniqID returns the unique id from the properties

func IsMessageID

func IsMessageID(id string) bool

IsMessageID returns true if the id follows these rules: 1. its length is 32; 2. every character is a hex character.

func Properties2Bytes

func Properties2Bytes(properties map[string]string) []byte

Properties2Bytes converts properties to byte array

func Properties2String

func Properties2String(properties map[string]string) string

Properties2String converts properties to string

func SortQueue

func SortQueue(queues []*Queue)

SortQueue sorts the consume queues.

func String2Properties

func String2Properties(properties string) map[string]string

String2Properties converts string to map

Types

type Addr

// Addr is an IP address: the host as a byte slice plus a 16-bit port.
type Addr struct {
	Host []byte
	Port uint16
}

Addr the ip address

func ParseMessageID

func ParseMessageID(id string) (addr Addr, commitOffset int64, err error)

ParseMessageID parses the id and returns the IP address and the commit offset.

func (*Addr) String

func (addr *Addr) String() string

type Batch

// Batch is the batch message representation: one topic, a slice of Data
// entries, a flag and a properties map. ToMessage converts it to a *Message.
type Batch struct {
	Topic      string
	Datas      []Data
	Flag       int32
	Properties map[string]string
}

Batch the batch message representation

func (*Batch) ToMessage

func (b *Batch) ToMessage() (*Message, error)

ToMessage converts the batch to a message.

type Data

// Data is one entry of a Batch: a body with its own properties and flag.
type Data struct {
	Body       []byte
	Properties map[string]string
	Flag       int32
}

Data the data in the batch

type Ext

// Ext is the message representation with storage information. It embeds
// Message, so Message's fields and methods are promoted onto Ext.
type Ext struct {
	Message
	StoreSize                 int32
	QueueOffset               int64
	SysFlag                   int32
	BornTimestamp             int64
	BornHost                  Addr
	StoreTimestamp            int64
	StoreHost                 Addr
	MsgID                     string
	CommitLogOffset           int64
	BodyCRC                   int32
	ReconsumeTimes            int32
	PreparedTransactionOffset int64
	QueueID                   uint8
}

Ext is the message representation with storage information.

func Decode

func Decode(d []byte) (msgs []*Ext, err error)

Decode decodes the bytes to messages

func (*Ext) String

func (m *Ext) String() string

type GUID

// GUID is the global unique id generator. Obtain one with NewGenerator and
// call Create to produce an id.
type GUID struct {
	// contains filtered or unexported fields
}

GUID the global unique id generator

func NewGenerator

func NewGenerator() *GUID

NewGenerator creates the guid generator

func (*GUID) Create

func (g *GUID) Create() string

Create creates a new global unique id as a hex-encoded string of length 32.

fixString + hex-encoded(id)

the id's content is following:

|<- unix time ->|<- increment num ->|
+---------------+-------------------+
|    4 bytes    |      3 bytes      |
+---------------+-------------------+

type Message

// Message is a message: a topic, a body, a string-keyed properties map and
// a flag.
type Message struct {
	Topic      string
	Body       []byte
	Properties map[string]string
	Flag       int32
}

Message the message

func (*Message) ClearProperty

func (m *Message) ClearProperty(k string)

ClearProperty remove the property

func (*Message) GetConsumeStartTimestamp

func (m *Message) GetConsumeStartTimestamp() (timestamp int64, ok bool)

GetConsumeStartTimestamp returns the property of the consuming start timestamp

func (*Message) GetDelayTimeLevel

func (m *Message) GetDelayTimeLevel() int

GetDelayTimeLevel returns the property of the delay time level

func (*Message) GetProperty

func (m *Message) GetProperty(k string) string

GetProperty gets the property by the specified key.

func (*Message) GetTags

func (m *Message) GetTags() string

GetTags returns the property of the tags.

func (*Message) GetUniqID

func (m *Message) GetUniqID() string

GetUniqID returns the unique id from the properties

func (*Message) GetWaitStoreMsgOK

func (m *Message) GetWaitStoreMsgOK() bool

GetWaitStoreMsgOK returns the property of the waiting store msg ok.

func (*Message) PutProperty

func (m *Message) PutProperty(k, v string)

PutProperty update property

func (*Message) SetConsumeStartTimestamp

func (m *Message) SetConsumeStartTimestamp(timestamp int64)

SetConsumeStartTimestamp update the property of the consuming start timestamp

func (*Message) SetDelayTimeLevel

func (m *Message) SetDelayTimeLevel(l int)

SetDelayTimeLevel update the property of the delay time level

func (*Message) SetKey

func (m *Message) SetKey(keys string)

SetKey update keys

func (*Message) SetKeys

func (m *Message) SetKeys(ks []string)

SetKeys updates the property of the keys with multiple values, separated by spaces.

func (*Message) SetTags

func (m *Message) SetTags(tags string)

SetTags set the property of the tags

func (*Message) SetUniqID

func (m *Message) SetUniqID(uniqID string)

SetUniqID set the unique id

func (*Message) SetWaitStoreMsgOK

func (m *Message) SetWaitStoreMsgOK(ok bool)

SetWaitStoreMsgOK update the property of the waiting store msg ok

func (*Message) String

func (m *Message) String() string

type Queue

// Queue is the consume queue in the broker, identified by topic, broker
// name and queue id.
type Queue struct {
	Topic      string
	BrokerName string
	QueueID    uint8
}

Queue the consume queue in the broker

func (*Queue) HashKey

func (q *Queue) HashKey() string

HashKey returns the hash key of the instance

func (*Queue) String

func (q *Queue) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL