Documentation ¶
Index ¶
- Variables
- func DecodeInt64(value string) (int64, error)
- func DecodeProto(data string, m proto.Message) error
- func DecodeUint64(value string) (uint64, error)
- func EncodeInt64(value int64) string
- func EncodeProto(m proto.Message) (string, error)
- func EncodeUint64(value uint64) string
- func RegisterMessageIDUnmsarshaler(name string, unmarshaler MessageIDUnmarshaler)
- type BasicMessage
- type ChanMessageHandler
- type CreateCollectionMessageHeader
- type CreatePartitionMessageHeader
- type DeleteMessageHeader
- type DropCollectionMessageHeader
- type DropPartitionMessageHeader
- type FlushMessageBody
- type FlushMessageHeader
- type Handler
- type ImmutableCreateCollectionMessageV1
- type ImmutableCreatePartitionMessageV1
- type ImmutableDeleteMessageV1
- type ImmutableDropCollectionMessageV1
- type ImmutableDropPartitionMessageV1
- type ImmutableFlushMessageV2
- type ImmutableInsertMessageV1
- type ImmutableMessage
- type ImmutableTimeTickMessageV1
- type InsertMessageHeader
- type MessageID
- type MessageIDUnmarshaler
- type MessageType
- type MutableCreateCollectionMessageV1
- type MutableCreatePartitionMessageV1
- type MutableDeleteMessageV1
- type MutableDropCollectionMessageV1
- type MutableDropPartitionMessageV1
- type MutableFlushMessageV2
- type MutableInsertMessageV1
- type MutableMessage
- type MutableTimeTickMessageV1
- type NopCloseHandler
- type PartitionSegmentAssignment
- type Properties
- type RProperties
- type SegmentAssignment
- type TimeTickMessageHeader
- type Version
Constants ¶
This section is empty.
Variables ¶
var (
	NewTimeTickMessageBuilderV1         = createNewMessageBuilderV1[*TimeTickMessageHeader, *msgpb.TimeTickMsg]()
	NewInsertMessageBuilderV1           = createNewMessageBuilderV1[*InsertMessageHeader, *msgpb.InsertRequest]()
	NewDeleteMessageBuilderV1           = createNewMessageBuilderV1[*DeleteMessageHeader, *msgpb.DeleteRequest]()
	NewCreateCollectionMessageBuilderV1 = createNewMessageBuilderV1[*CreateCollectionMessageHeader, *msgpb.CreateCollectionRequest]()
	NewDropCollectionMessageBuilderV1   = createNewMessageBuilderV1[*DropCollectionMessageHeader, *msgpb.DropCollectionRequest]()
	NewCreatePartitionMessageBuilderV1  = createNewMessageBuilderV1[*CreatePartitionMessageHeader, *msgpb.CreatePartitionRequest]()
	NewDropPartitionMessageBuilderV1    = createNewMessageBuilderV1[*DropPartitionMessageHeader, *msgpb.DropPartitionRequest]()
	NewFlushMessageBuilderV2            = createNewMessageBuilderV2[*FlushMessageHeader, *FlushMessageBody]()
)
All type-safe mutable message builders are listed here.
var (
	AsMutableTimeTickMessageV1         = asSpecializedMutableMessage[*TimeTickMessageHeader, *msgpb.TimeTickMsg]
	AsMutableInsertMessageV1           = asSpecializedMutableMessage[*InsertMessageHeader, *msgpb.InsertRequest]
	AsMutableDeleteMessageV1           = asSpecializedMutableMessage[*DeleteMessageHeader, *msgpb.DeleteRequest]
	AsMutableCreateCollectionMessageV1 = asSpecializedMutableMessage[*CreateCollectionMessageHeader, *msgpb.CreateCollectionRequest]
	AsMutableDropCollectionMessageV1   = asSpecializedMutableMessage[*DropCollectionMessageHeader, *msgpb.DropCollectionRequest]
	AsMutableCreatePartitionMessageV1  = asSpecializedMutableMessage[*CreatePartitionMessageHeader, *msgpb.CreatePartitionRequest]
	AsMutableDropPartitionMessageV1    = asSpecializedMutableMessage[*DropPartitionMessageHeader, *msgpb.DropPartitionRequest]
	AsMutableFlushMessageV2            = asSpecializedMutableMessage[*FlushMessageHeader, *FlushMessageBody]

	AsImmutableTimeTickMessageV1         = asSpecializedImmutableMessage[*TimeTickMessageHeader, *msgpb.TimeTickMsg]
	AsImmutableInsertMessageV1           = asSpecializedImmutableMessage[*InsertMessageHeader, *msgpb.InsertRequest]
	AsImmutableDeleteMessageV1           = asSpecializedImmutableMessage[*DeleteMessageHeader, *msgpb.DeleteRequest]
	AsImmutableCreateCollectionMessageV1 = asSpecializedImmutableMessage[*CreateCollectionMessageHeader, *msgpb.CreateCollectionRequest]
	AsImmutableDropCollectionMessageV1   = asSpecializedImmutableMessage[*DropCollectionMessageHeader, *msgpb.DropCollectionRequest]
	AsImmutableCreatePartitionMessageV1  = asSpecializedImmutableMessage[*CreatePartitionMessageHeader, *msgpb.CreatePartitionRequest]
	AsImmutableDropPartitionMessageV1    = asSpecializedImmutableMessage[*DropPartitionMessageHeader, *msgpb.DropPartitionRequest]
	AsImmutableFlushMessageV2            = asSpecializedImmutableMessage[*FlushMessageHeader, *FlushMessageBody]
)
All As functions for specialized messages are listed here.
var (
	ErrInvalidMessageID = errors.New("invalid message id")
)
Functions ¶
func DecodeInt64 ¶
func DecodeInt64(value string) (int64, error)
DecodeInt64 decodes a string to an int64.
func DecodeUint64 ¶
func DecodeUint64(value string) (uint64, error)
DecodeUint64 decodes a string to a uint64.
func EncodeProto ¶
func EncodeProto(m proto.Message) (string, error)
EncodeProto encodes a proto message to a JSON string.
func RegisterMessageIDUnmsarshaler ¶
func RegisterMessageIDUnmsarshaler(name string, unmarshaler MessageIDUnmarshaler)
RegisterMessageIDUnmsarshaler registers the message ID unmarshaler.
Types ¶
type BasicMessage ¶
type BasicMessage interface {
	// MessageType returns the type of message.
	MessageType() MessageType

	// Version returns the message version.
	// 0: old version before streamingnode.
	// from 1: new version after streamingnode.
	Version() Version

	// Message payload.
	Payload() []byte

	// EstimateSize returns the estimated size of message.
	EstimateSize() int

	// Properties returns the message properties.
	// Should be used with read-only promise.
	Properties() RProperties

	// VChannel returns the virtual channel of current message.
	// Available only when the message's version greater than 0.
	// Otherwise, it will panic.
	VChannel() string

	// TimeTick returns the time tick of current message.
	// Available only when the message's version greater than 0.
	// Otherwise, it will panic.
	TimeTick() uint64
}
BasicMessage is the basic interface of message.
type ChanMessageHandler ¶
type ChanMessageHandler chan ImmutableMessage
ChanMessageHandler is a handler that simply forwards each message into a channel.
func (ChanMessageHandler) Close ¶
func (cmh ChanMessageHandler) Close()
Close is called after all messages are handled or handling is interrupted.
func (ChanMessageHandler) Handle ¶
func (cmh ChanMessageHandler) Handle(msg ImmutableMessage)
Handle is the callback for handling message.
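A channel-typed handler can be sketched in a few lines. The example below uses local, hypothetical stand-ins (`ChanHandler`, `rawMessage`, and a one-method `ImmutableMessage`) rather than the package's own types. It assumes that `Handle` sends into the channel and that `Close` closes it; the documentation above does not confirm the second point.

```go
package main

import "fmt"

// ImmutableMessage is a hypothetical stand-in, reduced to one method for this sketch.
type ImmutableMessage interface {
	Payload() []byte
}

// rawMessage is a hypothetical message type used only in this example.
type rawMessage []byte

func (m rawMessage) Payload() []byte { return m }

// ChanHandler mirrors the documented shape: a channel type with Handle and Close.
type ChanHandler chan ImmutableMessage

// Handle forwards the message into the channel and blocks while the channel is full.
func (h ChanHandler) Handle(msg ImmutableMessage) { h <- msg }

// Close closes the channel so a range loop on the consumer side terminates.
// Assumption: the real Close behaves this way.
func (h ChanHandler) Close() { close(h) }

// collect drains the channel until it is closed and returns the payloads as strings.
func collect(h ChanHandler) []string {
	var out []string
	for msg := range h {
		out = append(out, string(msg.Payload()))
	}
	return out
}

func main() {
	h := make(ChanHandler, 2) // buffered so Handle does not block in this demo
	h.Handle(rawMessage("a"))
	h.Handle(rawMessage("b"))
	h.Close()
	fmt.Println(collect(h))
}
```

With an unbuffered channel, `Handle` blocks until a consumer receives, so the producer is throttled by the slowest reader.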
type CreateCollectionMessageHeader ¶
type CreateCollectionMessageHeader = messagespb.CreateCollectionMessageHeader
type CreatePartitionMessageHeader ¶
type CreatePartitionMessageHeader = messagespb.CreatePartitionMessageHeader
type DeleteMessageHeader ¶
type DeleteMessageHeader = messagespb.DeleteMessageHeader
type DropCollectionMessageHeader ¶
type DropCollectionMessageHeader = messagespb.DropCollectionMessageHeader
type DropPartitionMessageHeader ¶
type DropPartitionMessageHeader = messagespb.DropPartitionMessageHeader
type FlushMessageBody ¶
type FlushMessageBody = messagespb.FlushMessageBody
type FlushMessageHeader ¶
type FlushMessageHeader = messagespb.FlushMessageHeader
type Handler ¶
type Handler interface {
	// Handle is the callback for handling message.
	Handle(msg ImmutableMessage)

	// Close is called after all messages are handled or handling is interrupted.
	Close()
}
Handler is used to handle messages read from the log.
type ImmutableCreateCollectionMessageV1 ¶
type ImmutableCreateCollectionMessageV1 = specializedImmutableMessage[*CreateCollectionMessageHeader, *msgpb.CreateCollectionRequest]
List all specialized message types.
type ImmutableCreatePartitionMessageV1 ¶
type ImmutableCreatePartitionMessageV1 = specializedImmutableMessage[*CreatePartitionMessageHeader, *msgpb.CreatePartitionRequest]
List all specialized message types.
type ImmutableDeleteMessageV1 ¶
type ImmutableDeleteMessageV1 = specializedImmutableMessage[*DeleteMessageHeader, *msgpb.DeleteRequest]
List all specialized message types.
type ImmutableDropCollectionMessageV1 ¶
type ImmutableDropCollectionMessageV1 = specializedImmutableMessage[*DropCollectionMessageHeader, *msgpb.DropCollectionRequest]
List all specialized message types.
type ImmutableDropPartitionMessageV1 ¶
type ImmutableDropPartitionMessageV1 = specializedImmutableMessage[*DropPartitionMessageHeader, *msgpb.DropPartitionRequest]
List all specialized message types.
type ImmutableFlushMessageV2 ¶
type ImmutableFlushMessageV2 = specializedImmutableMessage[*FlushMessageHeader, *FlushMessageBody]
List all specialized message types.
type ImmutableInsertMessageV1 ¶
type ImmutableInsertMessageV1 = specializedImmutableMessage[*InsertMessageHeader, *msgpb.InsertRequest]
List all specialized message types.
type ImmutableMessage ¶
type ImmutableMessage interface {
	BasicMessage

	// WALName returns the name of message related wal.
	WALName() string

	// MessageID returns the message id of current message.
	MessageID() MessageID

	// LastConfirmedMessageID returns the last confirmed message id of current message.
	// last confirmed message is always a timetick message.
	// Read from this message id will guarantee the time tick greater than this message is consumed.
	// Available only when the message's version greater than 0.
	// Otherwise, it will panic.
	LastConfirmedMessageID() MessageID
}
ImmutableMessage is the read-only message interface. Once a message has been persisted by the WAL, or temporarily generated by the WAL, it is immutable.
func NewImmutableMesasge ¶
func NewImmutableMesasge(
	id MessageID,
	payload []byte,
	properties map[string]string,
) ImmutableMessage
NewImmutableMesasge creates a new immutable message.
type ImmutableTimeTickMessageV1 ¶
type ImmutableTimeTickMessageV1 = specializedImmutableMessage[*TimeTickMessageHeader, *msgpb.TimeTickMsg]
List all specialized message types.
type InsertMessageHeader ¶
type InsertMessageHeader = messagespb.InsertMessageHeader
type MessageID ¶
type MessageID interface {
	// WALName returns the name of message id related wal.
	WALName() string

	// LT less than.
	LT(MessageID) bool

	// LTE less than or equal to.
	LTE(MessageID) bool

	// EQ Equal to.
	EQ(MessageID) bool

	// Marshal marshal the message id.
	Marshal() string
}
MessageID is the interface for message id.
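To make the ordering contract concrete, here is a minimal sketch of a type that satisfies the interface. `seqID` and its WAL name `"example"` are hypothetical; real WAL implementations define their own ID layout. The comparison methods assume both operands come from the same WAL and panic otherwise.

```go
package main

import (
	"fmt"
	"strconv"
)

// MessageID is reproduced locally so the sketch compiles on its own.
type MessageID interface {
	WALName() string
	LT(MessageID) bool
	LTE(MessageID) bool
	EQ(MessageID) bool
	Marshal() string
}

// seqID is a hypothetical message ID backed by a monotonically increasing integer.
type seqID int64

func (id seqID) WALName() string { return "example" }

// The comparisons use a type assertion, so mixing IDs from different
// implementations panics instead of silently comparing unrelated values.
func (id seqID) LT(other MessageID) bool  { return id < other.(seqID) }
func (id seqID) LTE(other MessageID) bool { return id <= other.(seqID) }
func (id seqID) EQ(other MessageID) bool  { return id == other.(seqID) }

// Marshal renders the ID as a decimal string (an assumed wire format).
func (id seqID) Marshal() string { return strconv.FormatInt(int64(id), 10) }

func main() {
	var a, b MessageID = seqID(1), seqID(2)
	fmt.Println(a.LT(b), a.LTE(a), a.EQ(b), b.Marshal())
}
```

Because the interface exposes only `LT`, `LTE`, and `EQ`, "greater than" checks are written by swapping the operands, for example `b.LT(a)`.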
type MessageIDUnmarshaler ¶
MessageIDUnmarshaler is the unmarshaler for message id.
type MessageType ¶
type MessageType messagespb.MessageType
const (
	MessageTypeUnknown          MessageType = MessageType(messagespb.MessageType_Unknown)
	MessageTypeTimeTick         MessageType = MessageType(messagespb.MessageType_TimeTick)
	MessageTypeInsert           MessageType = MessageType(messagespb.MessageType_Insert)
	MessageTypeDelete           MessageType = MessageType(messagespb.MessageType_Delete)
	MessageTypeFlush            MessageType = MessageType(messagespb.MessageType_Flush)
	MessageTypeCreateCollection MessageType = MessageType(messagespb.MessageType_CreateCollection)
	MessageTypeDropCollection   MessageType = MessageType(messagespb.MessageType_DropCollection)
	MessageTypeCreatePartition  MessageType = MessageType(messagespb.MessageType_CreatePartition)
	MessageTypeDropPartition    MessageType = MessageType(messagespb.MessageType_DropPartition)
)
func (MessageType) String ¶
func (t MessageType) String() string
String implements the fmt.Stringer interface.
func (MessageType) Valid ¶
func (t MessageType) Valid() bool
Valid checks if the MessageType is valid.
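The String and Valid pair follows a common Go enum pattern, sketched below with a hypothetical `kind` type. The numeric values, the names, and the rule that the unknown value counts as invalid are all assumptions made for illustration; the real MessageType derives its values from `messagespb`.

```go
package main

import (
	"fmt"
	"strconv"
)

// kind is a hypothetical enum standing in for MessageType.
type kind int32

const (
	kindUnknown kind = iota
	kindTimeTick
	kindInsert
)

// kindNames holds the display name for every declared value.
var kindNames = map[kind]string{
	kindUnknown:  "UNKNOWN",
	kindTimeTick: "TIME_TICK",
	kindInsert:   "INSERT",
}

// String implements fmt.Stringer and falls back to the numeric value for undeclared kinds.
func (k kind) String() string {
	if name, ok := kindNames[k]; ok {
		return name
	}
	return "kind(" + strconv.Itoa(int(k)) + ")"
}

// Valid reports whether k is a declared value other than the unknown sentinel.
// Treating the sentinel as invalid is an assumption of this sketch.
func (k kind) Valid() bool {
	_, ok := kindNames[k]
	return ok && k != kindUnknown
}

func main() {
	for _, k := range []kind{kindInsert, kindUnknown, kind(99)} {
		fmt.Println(k, k.Valid())
	}
}
```

Checking `Valid` right after decoding a type from message properties keeps undeclared values from reaching a type switch further down.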
type MutableCreateCollectionMessageV1 ¶
type MutableCreateCollectionMessageV1 = specializedMutableMessage[*CreateCollectionMessageHeader, *msgpb.CreateCollectionRequest]
List all specialized message types.
type MutableCreatePartitionMessageV1 ¶
type MutableCreatePartitionMessageV1 = specializedMutableMessage[*CreatePartitionMessageHeader, *msgpb.CreatePartitionRequest]
List all specialized message types.
type MutableDeleteMessageV1 ¶
type MutableDeleteMessageV1 = specializedMutableMessage[*DeleteMessageHeader, *msgpb.DeleteRequest]
List all specialized message types.
type MutableDropCollectionMessageV1 ¶
type MutableDropCollectionMessageV1 = specializedMutableMessage[*DropCollectionMessageHeader, *msgpb.DropCollectionRequest]
List all specialized message types.
type MutableDropPartitionMessageV1 ¶
type MutableDropPartitionMessageV1 = specializedMutableMessage[*DropPartitionMessageHeader, *msgpb.DropPartitionRequest]
List all specialized message types.
type MutableFlushMessageV2 ¶
type MutableFlushMessageV2 = specializedMutableMessage[*FlushMessageHeader, *FlushMessageBody]
List all specialized message types.
type MutableInsertMessageV1 ¶
type MutableInsertMessageV1 = specializedMutableMessage[*InsertMessageHeader, *msgpb.InsertRequest]
List all specialized message types.
type MutableMessage ¶
type MutableMessage interface {
	BasicMessage

	// WithLastConfirmed sets the last confirmed message id of current message.
	// !!! preserved for streaming system internal usage, don't call it outside of streaming system.
	WithLastConfirmed(id MessageID) MutableMessage

	// WithTimeTick sets the time tick of current message.
	// !!! preserved for streaming system internal usage, don't call it outside of streaming system.
	WithTimeTick(tt uint64) MutableMessage

	// IntoImmutableMessage converts the mutable message to immutable message.
	IntoImmutableMessage(msgID MessageID) ImmutableMessage
}
MutableMessage is the mutable message interface. A message can be modified before it is persisted by the WAL.
func NewMutableMessage ¶
func NewMutableMessage(payload []byte, properties map[string]string) MutableMessage
NewMutableMessage creates a new mutable message. It is intended only for the server-side streamingnode internal service; do not use it on the client side.
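The mutable-to-immutable lifecycle can be illustrated with a small sketch: a message is adjusted through chained setters and then sealed with an ID. The `draft` and `sealed` types below are hypothetical and far simpler than the package's interfaces; in particular, storing the time tick in a struct field and using an `int64` ID are assumptions made for brevity.

```go
package main

import "fmt"

// draft is a hypothetical mutable message: it can still be changed before persistence.
type draft struct {
	payload  []byte
	timeTick uint64
}

// sealed is a hypothetical immutable message: it exposes getters only.
type sealed struct {
	id       int64
	payload  []byte
	timeTick uint64
}

func newDraft(payload []byte) *draft { return &draft{payload: payload} }

// WithTimeTick sets the time tick and returns the draft so calls can be chained.
func (d *draft) WithTimeTick(tt uint64) *draft {
	d.timeTick = tt
	return d
}

// IntoSealed attaches an ID and copies the payload so later changes
// to the draft cannot leak into the sealed message.
func (d *draft) IntoSealed(id int64) sealed {
	return sealed{id: id, payload: append([]byte(nil), d.payload...), timeTick: d.timeTick}
}

func (s sealed) ID() int64        { return s.id }
func (s sealed) TimeTick() uint64 { return s.timeTick }

// Payload returns a copy so callers cannot modify the sealed message.
func (s sealed) Payload() []byte { return append([]byte(nil), s.payload...) }

func main() {
	msg := newDraft([]byte("row")).WithTimeTick(100).IntoSealed(7)
	fmt.Println(msg.ID(), msg.TimeTick(), string(msg.Payload()))
}
```

The copy in `IntoSealed` is a design choice of this sketch; whether the real conversion copies or shares the payload is not stated in this documentation.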
type MutableTimeTickMessageV1 ¶
type MutableTimeTickMessageV1 = specializedMutableMessage[*TimeTickMessageHeader, *msgpb.TimeTickMsg]
List all specialized message types.
type NopCloseHandler ¶
type NopCloseHandler struct {
	Handler
}
NopCloseHandler is a handler that does nothing on Close.
func (NopCloseHandler) Close ¶
func (nch NopCloseHandler) Close()
Close is called after all messages are handled or handling is interrupted.
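Wrapping a handler so that Close becomes a no-op relies on struct embedding: the wrapper inherits Handle from the embedded value and shadows Close. The sketch below uses hypothetical local types (`Handler` with a string message, `countingHandler`, `nopClose`) to show the effect.

```go
package main

import "fmt"

// Handler is a simplified stand-in for the package's Handler; messages are strings here.
type Handler interface {
	Handle(msg string)
	Close()
}

// countingHandler is a hypothetical handler that records how often each method ran.
type countingHandler struct {
	handled int
	closed  int
}

func (c *countingHandler) Handle(msg string) { c.handled++ }
func (c *countingHandler) Close()            { c.closed++ }

// nopClose embeds a Handler, promoting its Handle method, and overrides Close.
type nopClose struct {
	Handler
}

// Close intentionally does nothing, so the wrapped handler stays open.
func (nopClose) Close() {}

func main() {
	inner := &countingHandler{}
	var h Handler = nopClose{Handler: inner}
	h.Handle("m1")
	h.Close() // swallowed by the wrapper
	fmt.Println(inner.handled, inner.closed)
}
```

A wrapper like this is useful when a caller wants to lend a handler to a consumer without letting that consumer close it, for instance when the same handler is reused across several reads.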
type PartitionSegmentAssignment ¶
type PartitionSegmentAssignment = messagespb.PartitionSegmentAssignment
type Properties ¶
type Properties interface {
	RProperties

	// Set a key-value pair in Properties.
	Set(key, value string)
}
Properties is the readable and writable set of properties for a message.
type RProperties ¶
type RProperties interface {
	// Get find a value by key.
	Get(key string) (value string, ok bool)

	// Exist check if a key exists.
	Exist(key string) bool

	// ToRawMap returns the raw map of properties.
	ToRawMap() map[string]string
}
RProperties is the read-only view of a message's properties.
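The split between a read-only and a writable interface can be sketched with a plain map. `mapProperties` below is a hypothetical implementation, not the package's own. Note that in this sketch `ToRawMap` returns the underlying map itself, so the read-only guarantee holds by convention only.

```go
package main

import "fmt"

// RProperties and Properties are reproduced locally so the sketch compiles on its own.
type RProperties interface {
	Get(key string) (value string, ok bool)
	Exist(key string) bool
	ToRawMap() map[string]string
}

type Properties interface {
	RProperties
	Set(key, value string)
}

// mapProperties is a hypothetical map-backed implementation of Properties.
type mapProperties map[string]string

func (p mapProperties) Get(key string) (string, bool) {
	v, ok := p[key]
	return v, ok
}

func (p mapProperties) Exist(key string) bool {
	_, ok := p[key]
	return ok
}

// ToRawMap exposes the underlying map without copying (an assumption of this sketch).
func (p mapProperties) ToRawMap() map[string]string { return p }

func (p mapProperties) Set(key, value string) { p[key] = value }

func main() {
	var p Properties = mapProperties{}
	p.Set("k", "v")

	// Handing out the narrower interface removes Set from the caller's view.
	var ro RProperties = p
	v, ok := ro.Get("k")
	fmt.Println(v, ok, ro.Exist("missing"))
}
```

Returning `RProperties` from an accessor signals intent at the type level: readers get lookups without a `Set` method, while code that builds the message keeps the full `Properties` interface.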
type SegmentAssignment ¶
type SegmentAssignment = messagespb.SegmentAssignment
type TimeTickMessageHeader ¶
type TimeTickMessageHeader = messagespb.TimeTickMessageHeader