Documentation ¶
Index ¶
- Variables
- func DecodeInt64(value string) (int64, error)
- func DecodeProto(data string, m proto.Message) error
- func DecodeUint64(value string) (uint64, error)
- func EncodeInt64(value int64) string
- func EncodeProto(m proto.Message) (string, error)
- func EncodeUint64(value uint64) string
- func RegisterMessageIDUnmsarshaler(name string, unmarshaler MessageIDUnmarshaler)
- type BasicMessage
- type BeginTxnMessageBody
- type BeginTxnMessageHeader
- type BroadcastMutableMessage
- type ChanMessageHandler
- type CommitTxnMessageBody
- type CommitTxnMessageHeader
- type CreateCollectionMessageHeader
- type CreatePartitionMessageHeader
- type CreateSegmentMessageBody
- type CreateSegmentMessageHeader
- type DeleteMessageHeader
- type DropCollectionMessageHeader
- type DropPartitionMessageHeader
- type FlushMessageBody
- type FlushMessageHeader
- type Handler
- type ImmutableBeginTxnMessageV2
- type ImmutableCommitTxnMessageV2
- type ImmutableCreateCollectionMessageV1
- type ImmutableCreatePartitionMessageV1
- type ImmutableCreateSegmentMessageV2
- type ImmutableDeleteMessageV1
- type ImmutableDropCollectionMessageV1
- type ImmutableDropPartitionMessageV1
- type ImmutableFlushMessageV2
- type ImmutableInsertMessageV1
- type ImmutableManualFlushMessageV2
- type ImmutableMessage
- type ImmutableRollbackTxnMessageV2
- type ImmutableTimeTickMessageV1
- type ImmutableTxnMessage
- type ImmutableTxnMessageBuilder
- func (b *ImmutableTxnMessageBuilder) Add(msg ImmutableMessage) *ImmutableTxnMessageBuilder
- func (b *ImmutableTxnMessageBuilder) Build(commit ImmutableCommitTxnMessageV2) (ImmutableTxnMessage, error)
- func (b *ImmutableTxnMessageBuilder) EstimateSize() int
- func (b *ImmutableTxnMessageBuilder) ExpiredTimeTick() uint64
- type InsertMessageHeader
- type ManualFlushExtraResponse
- type ManualFlushMessageBody
- type ManualFlushMessageHeader
- type MessageID
- type MessageIDUnmarshaler
- type MessageType
- type MutableBeginTxnMessageV2
- type MutableCommitTxnMessageV2
- type MutableCreateCollectionMessageV1
- type MutableCreatePartitionMessageV1
- type MutableCreateSegmentMessageV2
- type MutableDeleteMessageV1
- type MutableDropCollectionMessageV1
- type MutableDropPartitionMessageV1
- type MutableFlushMessageV2
- type MutableInsertMessageV1
- type MutableMessage
- type MutableRollbackTxnMessageV2
- type MutableTimeTickMessageV1
- type PartitionSegmentAssignment
- type Properties
- type RProperties
- type RollbackTxnMessageBody
- type RollbackTxnMessageHeader
- type SegmentAssignment
- type TimeTickMessageHeader
- type TxnContext
- type TxnID
- type TxnMessageBody
- type TxnMessageHeader
- type TxnState
- type Version
Constants ¶
This section is empty.
Variables ¶
var ( NewTimeTickMessageBuilderV1 = createNewMessageBuilderV1[*TimeTickMessageHeader, *msgpb.TimeTickMsg]() NewInsertMessageBuilderV1 = createNewMessageBuilderV1[*InsertMessageHeader, *msgpb.InsertRequest]() NewDeleteMessageBuilderV1 = createNewMessageBuilderV1[*DeleteMessageHeader, *msgpb.DeleteRequest]() NewCreateCollectionMessageBuilderV1 = createNewMessageBuilderV1[*CreateCollectionMessageHeader, *msgpb.CreateCollectionRequest]() NewDropCollectionMessageBuilderV1 = createNewMessageBuilderV1[*DropCollectionMessageHeader, *msgpb.DropCollectionRequest]() NewCreatePartitionMessageBuilderV1 = createNewMessageBuilderV1[*CreatePartitionMessageHeader, *msgpb.CreatePartitionRequest]() NewDropPartitionMessageBuilderV1 = createNewMessageBuilderV1[*DropPartitionMessageHeader, *msgpb.DropPartitionRequest]() NewCreateSegmentMessageBuilderV2 = createNewMessageBuilderV2[*CreateSegmentMessageHeader, *CreateSegmentMessageBody]() NewFlushMessageBuilderV2 = createNewMessageBuilderV2[*FlushMessageHeader, *FlushMessageBody]() NewManualFlushMessageBuilderV2 = createNewMessageBuilderV2[*ManualFlushMessageHeader, *ManualFlushMessageBody]() NewBeginTxnMessageBuilderV2 = createNewMessageBuilderV2[*BeginTxnMessageHeader, *BeginTxnMessageBody]() NewCommitTxnMessageBuilderV2 = createNewMessageBuilderV2[*CommitTxnMessageHeader, *CommitTxnMessageBody]() NewRollbackTxnMessageBuilderV2 = createNewMessageBuilderV2[*RollbackTxnMessageHeader, *RollbackTxnMessageBody]() )
List all type-safe mutable message builders here.
var ( AsMutableTimeTickMessageV1 = asSpecializedMutableMessage[*TimeTickMessageHeader, *msgpb.TimeTickMsg] AsMutableInsertMessageV1 = asSpecializedMutableMessage[*InsertMessageHeader, *msgpb.InsertRequest] AsMutableDeleteMessageV1 = asSpecializedMutableMessage[*DeleteMessageHeader, *msgpb.DeleteRequest] AsMutableCreateCollectionMessageV1 = asSpecializedMutableMessage[*CreateCollectionMessageHeader, *msgpb.CreateCollectionRequest] AsMutableDropCollectionMessageV1 = asSpecializedMutableMessage[*DropCollectionMessageHeader, *msgpb.DropCollectionRequest] AsMutableCreatePartitionMessageV1 = asSpecializedMutableMessage[*CreatePartitionMessageHeader, *msgpb.CreatePartitionRequest] AsMutableDropPartitionMessageV1 = asSpecializedMutableMessage[*DropPartitionMessageHeader, *msgpb.DropPartitionRequest] AsMutableCreateSegmentMessageV2 = asSpecializedMutableMessage[*CreateSegmentMessageHeader, *CreateSegmentMessageBody] AsMutableFlushMessageV2 = asSpecializedMutableMessage[*FlushMessageHeader, *FlushMessageBody] AsMutableManualFlushMessageV2 = asSpecializedMutableMessage[*ManualFlushMessageHeader, *ManualFlushMessageBody] AsMutableBeginTxnMessageV2 = asSpecializedMutableMessage[*BeginTxnMessageHeader, *BeginTxnMessageBody] AsMutableCommitTxnMessageV2 = asSpecializedMutableMessage[*CommitTxnMessageHeader, *CommitTxnMessageBody] AsMutableRollbackTxnMessageV2 = asSpecializedMutableMessage[*RollbackTxnMessageHeader, *RollbackTxnMessageBody] AsImmutableTimeTickMessageV1 = asSpecializedImmutableMessage[*TimeTickMessageHeader, *msgpb.TimeTickMsg] AsImmutableInsertMessageV1 = asSpecializedImmutableMessage[*InsertMessageHeader, *msgpb.InsertRequest] AsImmutableDeleteMessageV1 = asSpecializedImmutableMessage[*DeleteMessageHeader, *msgpb.DeleteRequest] AsImmutableCreateCollectionMessageV1 = asSpecializedImmutableMessage[*CreateCollectionMessageHeader, *msgpb.CreateCollectionRequest] AsImmutableDropCollectionMessageV1 = asSpecializedImmutableMessage[*DropCollectionMessageHeader, *msgpb.DropCollectionRequest] AsImmutableCreatePartitionMessageV1 = asSpecializedImmutableMessage[*CreatePartitionMessageHeader, *msgpb.CreatePartitionRequest] AsImmutableDropPartitionMessageV1 = asSpecializedImmutableMessage[*DropPartitionMessageHeader, *msgpb.DropPartitionRequest] AsImmutableCreateSegmentMessageV2 = asSpecializedImmutableMessage[*CreateSegmentMessageHeader, *CreateSegmentMessageBody] AsImmutableFlushMessageV2 = asSpecializedImmutableMessage[*FlushMessageHeader, *FlushMessageBody] AsImmutableManualFlushMessageV2 = asSpecializedImmutableMessage[*ManualFlushMessageHeader, *ManualFlushMessageBody] AsImmutableBeginTxnMessageV2 = asSpecializedImmutableMessage[*BeginTxnMessageHeader, *BeginTxnMessageBody] AsImmutableCommitTxnMessageV2 = asSpecializedImmutableMessage[*CommitTxnMessageHeader, *CommitTxnMessageBody] AsImmutableRollbackTxnMessageV2 = asSpecializedImmutableMessage[*RollbackTxnMessageHeader, *RollbackTxnMessageBody] AsImmutableTxnMessage = func(msg ImmutableMessage) ImmutableTxnMessage { underlying, ok := msg.(*immutableTxnMessageImpl) if !ok { return nil } return underlying } )
List all as functions for specialized messages.
var (
ErrInvalidMessageID = errors.New("invalid message id")
)
Functions ¶
func DecodeInt64 ¶
DecodeInt64 decodes string to int64.
func DecodeUint64 ¶
DecodeUint64 decodes string to uint64.
func EncodeProto ¶
EncodeProto encodes proto message to json string.
func RegisterMessageIDUnmsarshaler ¶
func RegisterMessageIDUnmsarshaler(name string, unmarshaler MessageIDUnmarshaler)
RegisterMessageIDUnmsarshaler register the message id unmarshaler.
Types ¶
type BasicMessage ¶
type BasicMessage interface { // MessageType returns the type of message. MessageType() MessageType // Version returns the message version. // 0: old version before streamingnode. // from 1: new version after streamingnode. Version() Version // Message payload. Payload() []byte // EstimateSize returns the estimated size of message. EstimateSize() int // Properties returns the message properties. // Should be used with read-only promise. Properties() RProperties // TimeTick returns the time tick of current message. // Available only when the message's version greater than 0. // Otherwise, it will panic. TimeTick() uint64 // BarrierTimeTick returns the barrier time tick of current message. // 0 by default, no fence. BarrierTimeTick() uint64 // TxnContext returns the transaction context of current message. TxnContext() *TxnContext }
BasicMessage is the basic interface of message.
type BeginTxnMessageBody ¶
type BeginTxnMessageBody = messagespb.BeginTxnMessageBody
type BeginTxnMessageHeader ¶
type BeginTxnMessageHeader = messagespb.BeginTxnMessageHeader
type BroadcastMutableMessage ¶
type BroadcastMutableMessage interface { BasicMessage // BroadcastVChannels returns the target vchannels of the message broadcast. // Those vchannels can be on multi pchannels. BroadcastVChannels() []string // SplitIntoMutableMessage splits the broadcast message into multiple mutable messages. SplitIntoMutableMessage() []MutableMessage }
BroadcastMutableMessage is the broadcast message interface. Indicated the message is broadcasted on various vchannels.
func NewBroadcastMutableMessage ¶
func NewBroadcastMutableMessage(payload []byte, properties map[string]string) BroadcastMutableMessage
NewBroadcastMutableMessage creates a new broadcast mutable message. !!! Only used at server side for streamingcoord internal service, don't use it at client side.
type ChanMessageHandler ¶
type ChanMessageHandler chan ImmutableMessage
ChanMessageHandler is a handler just forward the message into a channel.
func (ChanMessageHandler) Close ¶
func (cmh ChanMessageHandler) Close()
Close is called after all messages are handled or handling is interrupted.
func (ChanMessageHandler) Handle ¶
func (cmh ChanMessageHandler) Handle(ctx context.Context, msg ImmutableMessage) (bool, error)
Handle is the callback for handling message.
type CommitTxnMessageBody ¶
type CommitTxnMessageBody = messagespb.CommitTxnMessageBody
type CommitTxnMessageHeader ¶
type CommitTxnMessageHeader = messagespb.CommitTxnMessageHeader
type CreateCollectionMessageHeader ¶
type CreateCollectionMessageHeader = messagespb.CreateCollectionMessageHeader
type CreatePartitionMessageHeader ¶
type CreatePartitionMessageHeader = messagespb.CreatePartitionMessageHeader
type CreateSegmentMessageBody ¶
type CreateSegmentMessageBody = messagespb.CreateSegmentMessageBody
type CreateSegmentMessageHeader ¶
type CreateSegmentMessageHeader = messagespb.CreateSegmentMessageHeader
type DeleteMessageHeader ¶
type DeleteMessageHeader = messagespb.DeleteMessageHeader
type DropCollectionMessageHeader ¶
type DropCollectionMessageHeader = messagespb.DropCollectionMessageHeader
type DropPartitionMessageHeader ¶
type DropPartitionMessageHeader = messagespb.DropPartitionMessageHeader
type FlushMessageBody ¶
type FlushMessageBody = messagespb.FlushMessageBody
type FlushMessageHeader ¶
type FlushMessageHeader = messagespb.FlushMessageHeader
type Handler ¶
type Handler interface { // Handle is the callback for handling message. // Return true if the message is consumed, false if the message is not consumed. // Should return error if and only if ctx is done. // !!! It's a bad implementation for compatibility for msgstream, // should be removed in the future. Handle(ctx context.Context, msg ImmutableMessage) (bool, error) // Close is called after all messages are handled or handling is interrupted. Close() }
Handler is used to handle message read from log.
type ImmutableBeginTxnMessageV2 ¶
type ImmutableBeginTxnMessageV2 = specializedImmutableMessage[*BeginTxnMessageHeader, *BeginTxnMessageBody]
List all specialized message types.
type ImmutableCommitTxnMessageV2 ¶
type ImmutableCommitTxnMessageV2 = specializedImmutableMessage[*CommitTxnMessageHeader, *CommitTxnMessageBody]
List all specialized message types.
type ImmutableCreateCollectionMessageV1 ¶
type ImmutableCreateCollectionMessageV1 = specializedImmutableMessage[*CreateCollectionMessageHeader, *msgpb.CreateCollectionRequest]
List all specialized message types.
type ImmutableCreatePartitionMessageV1 ¶
type ImmutableCreatePartitionMessageV1 = specializedImmutableMessage[*CreatePartitionMessageHeader, *msgpb.CreatePartitionRequest]
List all specialized message types.
type ImmutableCreateSegmentMessageV2 ¶
type ImmutableCreateSegmentMessageV2 = specializedImmutableMessage[*CreateSegmentMessageHeader, *CreateSegmentMessageBody]
List all specialized message types.
type ImmutableDeleteMessageV1 ¶
type ImmutableDeleteMessageV1 = specializedImmutableMessage[*DeleteMessageHeader, *msgpb.DeleteRequest]
List all specialized message types.
type ImmutableDropCollectionMessageV1 ¶
type ImmutableDropCollectionMessageV1 = specializedImmutableMessage[*DropCollectionMessageHeader, *msgpb.DropCollectionRequest]
List all specialized message types.
type ImmutableDropPartitionMessageV1 ¶
type ImmutableDropPartitionMessageV1 = specializedImmutableMessage[*DropPartitionMessageHeader, *msgpb.DropPartitionRequest]
List all specialized message types.
type ImmutableFlushMessageV2 ¶
type ImmutableFlushMessageV2 = specializedImmutableMessage[*FlushMessageHeader, *FlushMessageBody]
List all specialized message types.
type ImmutableInsertMessageV1 ¶
type ImmutableInsertMessageV1 = specializedImmutableMessage[*InsertMessageHeader, *msgpb.InsertRequest]
List all specialized message types.
type ImmutableManualFlushMessageV2 ¶
type ImmutableManualFlushMessageV2 = specializedImmutableMessage[*ManualFlushMessageHeader, *ManualFlushMessageBody]
List all specialized message types.
type ImmutableMessage ¶
type ImmutableMessage interface { BasicMessage // WALName returns the name of message related wal. WALName() string // VChannel returns the virtual channel of current message. // Available only when the message's version greater than 0. // Return "" if message is can be seen by all vchannels on the pchannel. VChannel() string // MessageID returns the message id of current message. MessageID() MessageID // LastConfirmedMessageID returns the last confirmed message id of current message. // last confirmed message is always a timetick message. // Read from this message id will guarantee the time tick greater than this message is consumed. // Available only when the message's version greater than 0. // Otherwise, it will panic. LastConfirmedMessageID() MessageID }
ImmutableMessage is the read-only message interface. Once a message is persistent by wal or temporary generated by wal, it will be immutable.
func NewImmutableMesasge ¶
func NewImmutableMesasge( id MessageID, payload []byte, properties map[string]string, ) ImmutableMessage
NewImmutableMessage creates a new immutable message. !!! Only used at server side for streaming internal service, don't use it at client side.
type ImmutableRollbackTxnMessageV2 ¶
type ImmutableRollbackTxnMessageV2 = specializedImmutableMessage[*RollbackTxnMessageHeader, *RollbackTxnMessageBody]
List all specialized message types.
type ImmutableTimeTickMessageV1 ¶
type ImmutableTimeTickMessageV1 = specializedImmutableMessage[*TimeTickMessageHeader, *msgpb.TimeTickMsg]
List all specialized message types.
type ImmutableTxnMessage ¶
type ImmutableTxnMessage interface { ImmutableMessage // Begin returns the begin message of the transaction. Begin() ImmutableMessage // Commit returns the commit message of the transaction. Commit() ImmutableMessage // RangeOver iterates over the underlying messages in the transaction. // If visitor return not nil, the iteration will be stopped. RangeOver(visitor func(ImmutableMessage) error) error // Size returns the number of messages in the transaction. Size() int }
ImmutableTxnMessage is the read-only transaction message interface. Once a transaction is committed, the wal will generate a transaction message. The MessageType() is always return MessageTypeTransaction if it's a transaction message.
type ImmutableTxnMessageBuilder ¶
type ImmutableTxnMessageBuilder struct {
// contains filtered or unexported fields
}
ImmutableTxnMessageBuilder is a builder for txn message.
func NewImmutableTxnMessageBuilder ¶
func NewImmutableTxnMessageBuilder(begin ImmutableBeginTxnMessageV2) *ImmutableTxnMessageBuilder
NewImmutableTxnMessageBuilder creates a new txn builder.
func (*ImmutableTxnMessageBuilder) Add ¶
func (b *ImmutableTxnMessageBuilder) Add(msg ImmutableMessage) *ImmutableTxnMessageBuilder
Push pushes a message into the txn builder.
func (*ImmutableTxnMessageBuilder) Build ¶
func (b *ImmutableTxnMessageBuilder) Build(commit ImmutableCommitTxnMessageV2) (ImmutableTxnMessage, error)
Build builds a txn message.
func (*ImmutableTxnMessageBuilder) EstimateSize ¶
func (b *ImmutableTxnMessageBuilder) EstimateSize() int
EstimateSize estimates the size of the txn message.
func (*ImmutableTxnMessageBuilder) ExpiredTimeTick ¶
func (b *ImmutableTxnMessageBuilder) ExpiredTimeTick() uint64
ExpiredTimeTick returns the expired time tick of the txn.
type InsertMessageHeader ¶
type InsertMessageHeader = messagespb.InsertMessageHeader
type ManualFlushExtraResponse ¶
type ManualFlushExtraResponse = messagespb.ManualFlushExtraResponse
type ManualFlushMessageBody ¶
type ManualFlushMessageBody = messagespb.ManualFlushMessageBody
type ManualFlushMessageHeader ¶
type ManualFlushMessageHeader = messagespb.ManualFlushMessageHeader
type MessageID ¶
type MessageID interface { // WALName returns the name of message id related wal. WALName() string // LT less than. LT(MessageID) bool // LTE less than or equal to. LTE(MessageID) bool // EQ Equal to. EQ(MessageID) bool // Marshal marshal the message id. Marshal() string // Convert into string for logging. String() string }
MessageID is the interface for message id.
type MessageIDUnmarshaler ¶
MessageIDUnmarshaler is the unmarshaler for message id.
type MessageType ¶
type MessageType messagespb.MessageType
const ( MessageTypeUnknown MessageType = MessageType(messagespb.MessageType_Unknown) MessageTypeTimeTick MessageType = MessageType(messagespb.MessageType_TimeTick) MessageTypeInsert MessageType = MessageType(messagespb.MessageType_Insert) MessageTypeDelete MessageType = MessageType(messagespb.MessageType_Delete) MessageTypeCreateSegment MessageType = MessageType(messagespb.MessageType_CreateSegment) MessageTypeFlush MessageType = MessageType(messagespb.MessageType_Flush) MessageTypeManualFlush MessageType = MessageType(messagespb.MessageType_ManualFlush) MessageTypeCreateCollection MessageType = MessageType(messagespb.MessageType_CreateCollection) MessageTypeDropCollection MessageType = MessageType(messagespb.MessageType_DropCollection) MessageTypeCreatePartition MessageType = MessageType(messagespb.MessageType_CreatePartition) MessageTypeDropPartition MessageType = MessageType(messagespb.MessageType_DropPartition) MessageTypeTxn MessageType = MessageType(messagespb.MessageType_Txn) MessageTypeBeginTxn MessageType = MessageType(messagespb.MessageType_BeginTxn) MessageTypeCommitTxn MessageType = MessageType(messagespb.MessageType_CommitTxn) MessageTypeRollbackTxn MessageType = MessageType(messagespb.MessageType_RollbackTxn) )
func (MessageType) IsSystem ¶
func (t MessageType) IsSystem() bool
IsSysmtem checks if the MessageType is a system type.
func (MessageType) String ¶
func (t MessageType) String() string
String implements fmt.Stringer interface.
func (MessageType) Valid ¶
func (t MessageType) Valid() bool
Valid checks if the MessageType is valid.
type MutableBeginTxnMessageV2 ¶
type MutableBeginTxnMessageV2 = specializedMutableMessage[*BeginTxnMessageHeader, *BeginTxnMessageBody]
List all specialized message types.
type MutableCommitTxnMessageV2 ¶
type MutableCommitTxnMessageV2 = specializedMutableMessage[*CommitTxnMessageHeader, *CommitTxnMessageBody]
List all specialized message types.
type MutableCreateCollectionMessageV1 ¶
type MutableCreateCollectionMessageV1 = specializedMutableMessage[*CreateCollectionMessageHeader, *msgpb.CreateCollectionRequest]
List all specialized message types.
type MutableCreatePartitionMessageV1 ¶
type MutableCreatePartitionMessageV1 = specializedMutableMessage[*CreatePartitionMessageHeader, *msgpb.CreatePartitionRequest]
List all specialized message types.
type MutableCreateSegmentMessageV2 ¶
type MutableCreateSegmentMessageV2 = specializedMutableMessage[*CreateSegmentMessageHeader, *CreateSegmentMessageBody]
List all specialized message types.
type MutableDeleteMessageV1 ¶
type MutableDeleteMessageV1 = specializedMutableMessage[*DeleteMessageHeader, *msgpb.DeleteRequest]
List all specialized message types.
type MutableDropCollectionMessageV1 ¶
type MutableDropCollectionMessageV1 = specializedMutableMessage[*DropCollectionMessageHeader, *msgpb.DropCollectionRequest]
List all specialized message types.
type MutableDropPartitionMessageV1 ¶
type MutableDropPartitionMessageV1 = specializedMutableMessage[*DropPartitionMessageHeader, *msgpb.DropPartitionRequest]
List all specialized message types.
type MutableFlushMessageV2 ¶
type MutableFlushMessageV2 = specializedMutableMessage[*FlushMessageHeader, *FlushMessageBody]
List all specialized message types.
type MutableInsertMessageV1 ¶
type MutableInsertMessageV1 = specializedMutableMessage[*InsertMessageHeader, *msgpb.InsertRequest]
List all specialized message types.
type MutableMessage ¶
type MutableMessage interface { BasicMessage // VChannel returns the virtual channel of current message. // Available only when the message's version greater than 0. // Return "" if message is can be seen by all vchannels on the pchannel. VChannel() string // WithBarrierTimeTick sets the barrier time tick of current message. // these time tick is used to promised the message will be sent after that time tick. // and the message which timetick is less than it will never concurrent append with it. // !!! preserved for streaming system internal usage, don't call it outside of streaming system. WithBarrierTimeTick(tt uint64) MutableMessage // WithWALTerm sets the wal term of current message. // !!! preserved for streaming system internal usage, don't call it outside of streaming system. WithWALTerm(term int64) MutableMessage // WithLastConfirmed sets the last confirmed message id of current message. // !!! preserved for streaming system internal usage, don't call it outside of streaming system. WithLastConfirmed(id MessageID) MutableMessage // WithLastConfirmedUseMessageID sets the last confirmed message id of current message to be the same as message id. // !!! preserved for streaming system internal usage, don't call it outside of streaming system. WithLastConfirmedUseMessageID() MutableMessage // WithTimeTick sets the time tick of current message. // !!! preserved for streaming system internal usage, don't call it outside of streaming system. WithTimeTick(tt uint64) MutableMessage // WithTxnContext sets the transaction context of current message. // !!! preserved for streaming system internal usage, don't call it outside of streaming system. WithTxnContext(txnCtx TxnContext) MutableMessage // IntoImmutableMessage converts the mutable message to immutable message. IntoImmutableMessage(msgID MessageID) ImmutableMessage }
MutableMessage is the mutable message interface. Message can be modified before it is persistent by wal.
func NewMutableMessage ¶
func NewMutableMessage(payload []byte, properties map[string]string) MutableMessage
NewMutableMessage creates a new mutable message. !!! Only used at server side for streamingnode internal service, don't use it at client side.
type MutableRollbackTxnMessageV2 ¶
type MutableRollbackTxnMessageV2 = specializedMutableMessage[*RollbackTxnMessageHeader, *RollbackTxnMessageBody]
List all specialized message types.
type MutableTimeTickMessageV1 ¶
type MutableTimeTickMessageV1 = specializedMutableMessage[*TimeTickMessageHeader, *msgpb.TimeTickMsg]
List all specialized message types.
type PartitionSegmentAssignment ¶
type PartitionSegmentAssignment = messagespb.PartitionSegmentAssignment
type Properties ¶
type Properties interface { RProperties // Set a key-value pair in Properties. Set(key, value string) }
Properties is the write and readable properties for message.
type RProperties ¶
type RProperties interface { // Get find a value by key. Get(key string) (value string, ok bool) // Exist check if a key exists. Exist(key string) bool // ToRawMap returns the raw map of properties. ToRawMap() map[string]string }
RProperties is the read-only properties for message.
type RollbackTxnMessageBody ¶
type RollbackTxnMessageBody = messagespb.RollbackTxnMessageBody
type RollbackTxnMessageHeader ¶
type RollbackTxnMessageHeader = messagespb.RollbackTxnMessageHeader
type SegmentAssignment ¶
type SegmentAssignment = messagespb.SegmentAssignment
type TimeTickMessageHeader ¶
type TimeTickMessageHeader = messagespb.TimeTickMessageHeader
type TxnContext ¶
TxnContext is the transaction context for message.
func NewTxnContextFromProto ¶
func NewTxnContextFromProto(proto *messagespb.TxnContext) *TxnContext
NewTxnContextFromProto generates TxnContext from proto message.
func (*TxnContext) IntoProto ¶
func (t *TxnContext) IntoProto() *messagespb.TxnContext
IntoProto converts TxnContext to proto message.
type TxnMessageBody ¶
type TxnMessageBody = messagespb.TxnMessageBody
type TxnMessageHeader ¶
type TxnMessageHeader = messagespb.TxnMessageHeader
type TxnState ¶
type TxnState = messagespb.TxnState
const ( TxnStateBegin TxnState = messagespb.TxnState_TxnBegin TxnStateInFlight TxnState = messagespb.TxnState_TxnInFlight TxnStateOnCommit TxnState = messagespb.TxnState_TxnOnCommit TxnStateCommitted TxnState = messagespb.TxnState_TxnCommitted TxnStateOnRollback TxnState = messagespb.TxnState_TxnOnRollback TxnStateRollbacked TxnState = messagespb.TxnState_TxnRollbacked NonTxnID = TxnID(-1) )