Documentation
¶
Index ¶
- func DeserializeToMQWrapperID(msgID []byte, walName string) (common.MessageID, error)
- func MustGetCommonpbMsgTypeFromMessageType(t message.MessageType) commonpb.MsgType
- func MustGetMQWrapperIDFromMessage(messageID message.MessageID) common.MessageID
- func MustGetMessageIDFromMQWrapperID(commonMessageID common.MessageID) message.MessageID
- func MustGetMessageIDFromMQWrapperIDBytes(walName string, msgIDBytes []byte) message.MessageID
- func NewCreateSegmentMessageBody(msg message.ImmutableMessage) (msgstream.TsMsg, error)
- func NewFlushMessageBody(msg message.ImmutableMessage) (msgstream.TsMsg, error)
- func NewManualFlushMessageBody(msg message.ImmutableMessage) (msgstream.TsMsg, error)
- func NewMsgPackFromMessage(msgs ...message.ImmutableMessage) (*msgstream.MsgPack, error)
- func NewMsgPackFromMutableMessageV1(msg message.MutableMessage) (msgstream.TsMsg, error)
- type BaseMsgPackAdaptorHandler
- type CreateSegmentMessageBody
- func (t CreateSegmentMessageBody) ID() msgstream.UniqueID
- func (t CreateSegmentMessageBody) Marshal(msgstream.TsMsg) (msgstream.MarshalType, error)
- func (t CreateSegmentMessageBody) SetID(id msgstream.UniqueID)
- func (t CreateSegmentMessageBody) SetTs(ts uint64)
- func (t CreateSegmentMessageBody) Size() int
- func (t CreateSegmentMessageBody) SourceID() int64
- func (t CreateSegmentMessageBody) Type() commonpb.MsgType
- func (t CreateSegmentMessageBody) Unmarshal(msgstream.MarshalType) (msgstream.TsMsg, error)
- type FlushMessageBody
- func (t FlushMessageBody) ID() msgstream.UniqueID
- func (t FlushMessageBody) Marshal(msgstream.TsMsg) (msgstream.MarshalType, error)
- func (t FlushMessageBody) SetID(id msgstream.UniqueID)
- func (t FlushMessageBody) SetTs(ts uint64)
- func (t FlushMessageBody) Size() int
- func (t FlushMessageBody) SourceID() int64
- func (t FlushMessageBody) Type() commonpb.MsgType
- func (t FlushMessageBody) Unmarshal(msgstream.MarshalType) (msgstream.TsMsg, error)
- type ManualFlushMessageBody
- func (t ManualFlushMessageBody) ID() msgstream.UniqueID
- func (t ManualFlushMessageBody) Marshal(msgstream.TsMsg) (msgstream.MarshalType, error)
- func (t ManualFlushMessageBody) SetID(id msgstream.UniqueID)
- func (t ManualFlushMessageBody) SetTs(ts uint64)
- func (t ManualFlushMessageBody) Size() int
- func (t ManualFlushMessageBody) SourceID() int64
- func (t ManualFlushMessageBody) Type() commonpb.MsgType
- func (t ManualFlushMessageBody) Unmarshal(msgstream.MarshalType) (msgstream.TsMsg, error)
- type MsgPackAdaptorHandler
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DeserializeToMQWrapperID ¶
DeserializeToMQWrapperID deserializes messageID bytes to common.MessageID TODO: should be removed in future after common.MessageID is removed
func MustGetCommonpbMsgTypeFromMessageType ¶
func MustGetCommonpbMsgTypeFromMessageType(t message.MessageType) commonpb.MsgType
MustGetCommonpbMsgTypeFromMessageType returns the commonpb.MsgType from message.MessageType.
func MustGetMQWrapperIDFromMessage ¶
MustGetMQWrapperIDFromMessage converts message.MessageID to common.MessageID TODO: should be removed in future after common.MessageID is removed
func MustGetMessageIDFromMQWrapperID ¶
MustGetMessageIDFromMQWrapperID converts common.MessageID to message.MessageID TODO: should be removed in future after common.MessageID is removed
func NewCreateSegmentMessageBody ¶
func NewCreateSegmentMessageBody(msg message.ImmutableMessage) (msgstream.TsMsg, error)
func NewFlushMessageBody ¶
func NewFlushMessageBody(msg message.ImmutableMessage) (msgstream.TsMsg, error)
func NewManualFlushMessageBody ¶
func NewManualFlushMessageBody(msg message.ImmutableMessage) (msgstream.TsMsg, error)
func NewMsgPackFromMessage ¶
func NewMsgPackFromMessage(msgs ...message.ImmutableMessage) (*msgstream.MsgPack, error)
FromMessageToMsgPack converts message to msgpack. Same TimeTick must be sent with same msgpack. !!! Msgs must be keep same time tick. TODO: remove this function after remove the msgstream implementation.
func NewMsgPackFromMutableMessageV1 ¶
func NewMsgPackFromMutableMessageV1(msg message.MutableMessage) (msgstream.TsMsg, error)
Types ¶
type BaseMsgPackAdaptorHandler ¶
type BaseMsgPackAdaptorHandler struct { Logger *log.MLogger Channel chan *msgstream.MsgPack Pendings []message.ImmutableMessage // pendings hold the vOld message which has same time tick. PendingMsgPack *typeutil.MultipartQueue[*msgstream.MsgPack] // pendingMsgPack hold unsent msgPack. }
BaseMsgPackAdaptorHandler is the handler for message pack.
func NewBaseMsgPackAdaptorHandler ¶
func NewBaseMsgPackAdaptorHandler() *BaseMsgPackAdaptorHandler
NewBaseMsgPackAdaptorHandler create a new base message pack adaptor handler.
func (*BaseMsgPackAdaptorHandler) GenerateMsgPack ¶
func (m *BaseMsgPackAdaptorHandler) GenerateMsgPack(msg message.ImmutableMessage)
GenerateMsgPack generate msgPack from message.
type CreateSegmentMessageBody ¶
type CreateSegmentMessageBody struct { CreateSegmentMessage message.ImmutableCreateSegmentMessageV2 // contains filtered or unexported fields }
func (CreateSegmentMessageBody) Marshal ¶
func (t CreateSegmentMessageBody) Marshal(msgstream.TsMsg) (msgstream.MarshalType, error)
type FlushMessageBody ¶
type FlushMessageBody struct { FlushMessage message.ImmutableFlushMessageV2 // contains filtered or unexported fields }
type ManualFlushMessageBody ¶
type ManualFlushMessageBody struct { ManualFlushMessage message.ImmutableManualFlushMessageV2 // contains filtered or unexported fields }
type MsgPackAdaptorHandler ¶
type MsgPackAdaptorHandler struct {
// contains filtered or unexported fields
}
MsgPackAdaptorHandler is the handler for message pack.
func NewMsgPackAdaptorHandler ¶
func NewMsgPackAdaptorHandler() *MsgPackAdaptorHandler
NewMsgPackAdaptorHandler create a new message pack adaptor handler.
func (*MsgPackAdaptorHandler) Chan ¶
func (m *MsgPackAdaptorHandler) Chan() <-chan *msgstream.MsgPack
Chan is the channel for message.
func (*MsgPackAdaptorHandler) Close ¶
func (m *MsgPackAdaptorHandler) Close()
Close is the callback for closing message.
func (*MsgPackAdaptorHandler) Handle ¶
func (m *MsgPackAdaptorHandler) Handle(ctx context.Context, msg message.ImmutableMessage) (bool, error)
Handle is the callback for handling message.