Documentation ¶
Overview ¶
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Index ¶
- Constants
- Variables
- func Bytes2messageProperties(propertiesBuf []byte) map[string]string
- func ClearProperty(msg *Message, name string)
- func CreateMessageId(addr string, offset int64) (string, error)
- func GetOriginMessageId(msg Message) string
- func GetReconsumeTime(msg *Message) string
- func JoinHostPort(hostBytes []byte, port int32) string
- func MessageProperties2Bytes(properties map[string]string) []byte
- func MessageProperties2String(properties map[string]string) string
- func PutProperty(msg *Message, name string, value string)
- func SetOriginMessageId(msg *Message, originMessageId string)
- func SetProperties(msg *Message, name string, value string)
- func SetPropertiesMap(msg *Message, properties map[string]string)
- func SetReconsumeTime(msg *Message, reconsumeTimes string)
- func SplitHostPort(addr string) (string, int32, error)
- func String2messageProperties(propertiesStr string) map[string]string
- type Message
- func (msg *Message) ClearProperty(name string)
- func (msg *Message) GetKeys() string
- func (msg *Message) GetOriginMessageID() string
- func (msg *Message) GetProperty(name string) string
- func (msg *Message) GetTags() string
- func (msg *Message) PutProperty(name string, value string)
- func (msg *Message) SetDelayTimeLevel(level int)
- func (msg *Message) SetKeys(keys string)
- func (msg *Message) SetTags(tags string)
- func (msg *Message) SetWaitStoreMsgOK(waitStoreMsgOK bool)
- type MessageExt
- type MessageId
- type MessageQueue
- type MessageQueues
Constants ¶
const ( // 消息关键词,多个Key用KEY_SEPARATOR隔开(查询消息使用) PROPERTY_KEYS = "KEYS" // 消息标签,只支持设置一个Tag(服务端消息过滤使用) PROPERTY_TAGS = "TAGS" // 是否等待服务器将消息存储完毕再返回(可能是等待刷盘完成或者等待同步复制到其他服务器) PROPERTY_WAIT_STORE_MSG_OK = "WAIT" // 消息延时投递时间级别,0表示不延时,大于0表示特定延时级别(具体级别在服务器端定义) PROPERTY_DELAY_TIME_LEVEL = "DELAY" // 内部使用 PROPERTY_RETRY_TOPIC = "RETRY_TOPIC" PROPERTY_REAL_TOPIC = "REAL_TOPIC" PROPERTY_REAL_QUEUE_ID = "REAL_QID" PROPERTY_TRANSACTION_PREPARED = "TRAN_MSG" PROPERTY_PRODUCER_GROUP = "PGROUP" PROPERTY_MIN_OFFSET = "MIN_OFFSET" PROPERTY_MAX_OFFSET = "MAX_OFFSET" PROPERTY_BUYER_ID = "BUYER_ID" PROPERTY_ORIGIN_MESSAGE_ID = "ORIGIN_MESSAGE_ID" PROPERTY_TRANSFER_FLAG = "TRANSFER_FLAG" PROPERTY_CORRECTION_FLAG = "CORRECTION_FLAG" PROPERTY_MQ2_FLAG = "MQ2_FLAG" PROPERTY_RECONSUME_TIME = "RECONSUME_TIME" KEY_SEPARATOR = " " )
const ( // 消息ID定长 MSG_ID_LENGTH = 8 + 8 // 存储记录各个字段位置 MessageMagicCodePostion = 4 MessageFlagPostion = 16 MessagePhysicOffsetPostion = 28 MessageStoreTimestampPostion = 56 // 序列化消息属性 NAME_VALUE_SEPARATOR = 1 PROPERTY_SEPARATOR = 2 )
MessageDecoder: 消息解码 Author: yintongqiang Since: 2017/8/16
Variables ¶
var (
MessageMagicCode = 0xAABBCCDD ^ 1880681586 + 8
)
Functions ¶
func Bytes2messageProperties ¶
func ClearProperty ¶
func CreateMessageId ¶
CreateMessageId 解析消息msgId字段,addr是host:port
func GetOriginMessageId ¶
func GetReconsumeTime ¶
Author: yintongqiang Since: 2017/8/16
func MessageProperties2Bytes ¶
func MessageProperties2String ¶
修复string不可见字符问题,使用[]byte Modify: jerrylou, <gunsluo@gmail.com> Since: 2017-08-25
func PutProperty ¶
func SetOriginMessageId ¶
func SetProperties ¶
func SetPropertiesMap ¶
func SetReconsumeTime ¶
func SplitHostPort ¶
SplitHostPort 解析host:port
func String2messageProperties ¶
修复string不可见字符问题,使用[]byte Modify: jerrylou, <gunsluo@gmail.com> Since: 2017-08-25
Types ¶
type Message ¶
type Message struct { Topic string // 消息主题 Flag int32 // 消息标志,系统不做干预,完全由应用决定如何使用 Properties map[string]string // 消息属性,都是系统属性,禁止应用设置 Body []byte // 消息体 }
Message: 消息结构体 Author: yintongqiang Since: 2017/8/9
func (*Message) ClearProperty ¶
func (*Message) GetOriginMessageID ¶
func (*Message) GetProperty ¶
func (*Message) PutProperty ¶
func (*Message) SetDelayTimeLevel ¶
func (*Message) SetWaitStoreMsgOK ¶
type MessageExt ¶
type MessageExt struct { Message // 消息结构体 QueueId int32 // 队列ID<PUT> StoreSize int32 // 存储记录大小 QueueOffset int64 // 队列偏移量 SysFlag int32 // 消息标志位 <PUT> BornTimestamp int64 // 消息在客户端创建时间戳 <PUT> BornHost string // 消息来自哪里 <PUT> StoreTimestamp int64 // 消息在服务器存储时间戳 StoreHost string // 消息存储在哪个服务器 <PUT> MsgId string // 消息ID CommitLogOffset int64 // 消息对应的Commit Log Offset BodyCRC int32 // 消息体CRC ReconsumeTimes int32 // 当前消息被某个订阅组重新消费了几次(订阅组之间独立计数) PreparedTransactionOffset int64 // 事务预处理偏移量 }
MessageExt 消息拓展结构体
func DecodeMessageExt ¶
func DecodeMessageExt(buffer []byte, isReadBody, isCompressBody bool) (*MessageExt, error)
DecodeMessageExt 解析消息体,返回MessageExt
func DecodesMessageExt ¶
func DecodesMessageExt(buffer []byte, isReadBody bool) ([]*MessageExt, error)
DecodesMessageExt 解析消息体,返回多个消息
type MessageId ¶
type MessageId struct { Address string // 消息落地存储,角色为storeHost对应的brokerAddr Offset uint64 // 消息落地存储,物理偏移量, 即 physicOffset、commitLogOffset }
func DecodeMessageId ¶
DecodeMessageId 解析messageId Author: jerrylou, <gunsluo@gmail.com> Since: 2017-08-23
type MessageQueue ¶
type MessageQueue struct { Topic string `json:"topic"` BrokerName string `json:"brokerName"` QueueId int `json:"queueId"` }
func NewMessageQueue ¶
func NewMessageQueue(topic, brokerName string, queueId int) *MessageQueue
func (*MessageQueue) Equal ¶
func (mq *MessageQueue) Equal(other *MessageQueue) bool
func (*MessageQueue) Equals ¶
func (mq *MessageQueue) Equals(v interface{}) bool
func (*MessageQueue) HashBytes ¶
func (mq *MessageQueue) HashBytes() []byte
func (*MessageQueue) Key ¶
func (mq *MessageQueue) Key() string
func (*MessageQueue) String ¶
func (mq *MessageQueue) String() string
type MessageQueues ¶
type MessageQueues []*MessageQueue
func (MessageQueues) Len ¶
func (mqs MessageQueues) Len() int
func (MessageQueues) Less ¶
func (mqs MessageQueues) Less(i, j int) bool
func (MessageQueues) Swap ¶
func (mqs MessageQueues) Swap(i, j int)