message

package
v0.0.0-...-572c485 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 12, 2024 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Type-safe message builder constructors, one per message type.
// Every entry is produced by a generic factory instantiated with a
// (header type, second type) pair; all V1 builders pair a header with a
// msgpb request type, while the flush builder is V2 and pairs
// FlushMessageHeader with FlushMessageBody.
var (
	NewTimeTickMessageBuilderV1         = createNewMessageBuilderV1[*TimeTickMessageHeader, *msgpb.TimeTickMsg]()
	NewInsertMessageBuilderV1           = createNewMessageBuilderV1[*InsertMessageHeader, *msgpb.InsertRequest]()
	NewDeleteMessageBuilderV1           = createNewMessageBuilderV1[*DeleteMessageHeader, *msgpb.DeleteRequest]()
	NewCreateCollectionMessageBuilderV1 = createNewMessageBuilderV1[*CreateCollectionMessageHeader, *msgpb.CreateCollectionRequest]()
	NewDropCollectionMessageBuilderV1   = createNewMessageBuilderV1[*DropCollectionMessageHeader, *msgpb.DropCollectionRequest]()
	NewCreatePartitionMessageBuilderV1  = createNewMessageBuilderV1[*CreatePartitionMessageHeader, *msgpb.CreatePartitionRequest]()
	NewDropPartitionMessageBuilderV1    = createNewMessageBuilderV1[*DropPartitionMessageHeader, *msgpb.DropPartitionRequest]()
	NewFlushMessageBuilderV2            = createNewMessageBuilderV2[*FlushMessageHeader, *FlushMessageBody]()
)

List all type-safe mutable message builders here.

View Source
// As* functions for specialized messages, one mutable and one immutable
// variant per message type. Each is an instantiation of a generic helper with
// the same type pair used by the matching message builder.
var (
	// Mutable variants.
	AsMutableTimeTickMessageV1         = asSpecializedMutableMessage[*TimeTickMessageHeader, *msgpb.TimeTickMsg]
	AsMutableInsertMessageV1           = asSpecializedMutableMessage[*InsertMessageHeader, *msgpb.InsertRequest]
	AsMutableDeleteMessageV1           = asSpecializedMutableMessage[*DeleteMessageHeader, *msgpb.DeleteRequest]
	AsMutableCreateCollectionMessageV1 = asSpecializedMutableMessage[*CreateCollectionMessageHeader, *msgpb.CreateCollectionRequest]
	AsMutableDropCollectionMessageV1   = asSpecializedMutableMessage[*DropCollectionMessageHeader, *msgpb.DropCollectionRequest]
	AsMutableCreatePartitionMessageV1  = asSpecializedMutableMessage[*CreatePartitionMessageHeader, *msgpb.CreatePartitionRequest]
	AsMutableDropPartitionMessageV1    = asSpecializedMutableMessage[*DropPartitionMessageHeader, *msgpb.DropPartitionRequest]
	AsMutableFlushMessageV2            = asSpecializedMutableMessage[*FlushMessageHeader, *FlushMessageBody]

	// Immutable variants.
	AsImmutableTimeTickMessageV1         = asSpecializedImmutableMessage[*TimeTickMessageHeader, *msgpb.TimeTickMsg]
	AsImmutableInsertMessageV1           = asSpecializedImmutableMessage[*InsertMessageHeader, *msgpb.InsertRequest]
	AsImmutableDeleteMessageV1           = asSpecializedImmutableMessage[*DeleteMessageHeader, *msgpb.DeleteRequest]
	AsImmutableCreateCollectionMessageV1 = asSpecializedImmutableMessage[*CreateCollectionMessageHeader, *msgpb.CreateCollectionRequest]
	AsImmutableDropCollectionMessageV1   = asSpecializedImmutableMessage[*DropCollectionMessageHeader, *msgpb.DropCollectionRequest]
	AsImmutableCreatePartitionMessageV1  = asSpecializedImmutableMessage[*CreatePartitionMessageHeader, *msgpb.CreatePartitionRequest]
	AsImmutableDropPartitionMessageV1    = asSpecializedImmutableMessage[*DropPartitionMessageHeader, *msgpb.DropPartitionRequest]
	AsImmutableFlushMessageV2            = asSpecializedImmutableMessage[*FlushMessageHeader, *FlushMessageBody]
)

List all As* functions for specialized messages.

View Source
var (
	// ErrInvalidMessageID is the sentinel error for an invalid message id.
	// NOTE(review): the functions that return it are not visible here; callers
	// should presumably match it with errors.Is — confirm.
	ErrInvalidMessageID = errors.New("invalid message id")
)

Functions

func DecodeInt64

func DecodeInt64(value string) (int64, error)

DecodeInt64 decodes string to int64.

func DecodeProto

func DecodeProto(data string, m proto.Message) error

DecodeProto decodes the string data into the proto message m.

func DecodeUint64

func DecodeUint64(value string) (uint64, error)

DecodeUint64 decodes string to uint64.

func EncodeInt64

func EncodeInt64(value int64) string

EncodeInt64 encodes int64 to string.

func EncodeProto

func EncodeProto(m proto.Message) (string, error)

EncodeProto encodes proto message to json string.

func EncodeUint64

func EncodeUint64(value uint64) string

EncodeUint64 encodes uint64 to string.

func RegisterMessageIDUnmsarshaler

func RegisterMessageIDUnmsarshaler(name string, unmarshaler MessageIDUnmarshaler)

RegisterMessageIDUnmsarshaler registers the message id unmarshaler.

Types

type BasicMessage

type BasicMessage interface {
	// MessageType returns the type of the message.
	MessageType() MessageType

	// Version returns the message version.
	// 0: old version, from before streamingnode.
	// 1 and above: new version, from after streamingnode.
	Version() Version

	// Payload returns the message payload.
	Payload() []byte

	// EstimateSize returns the estimated size of the message.
	EstimateSize() int

	// Properties returns the message properties.
	// The result must be treated as read-only.
	Properties() RProperties

	// VChannel returns the virtual channel of the current message.
	// Available only when the message's version is greater than 0;
	// otherwise it panics.
	VChannel() string

	// TimeTick returns the time tick of the current message.
	// Available only when the message's version is greater than 0;
	// otherwise it panics.
	TimeTick() uint64
}

BasicMessage is the basic interface of message.

type ChanMessageHandler

type ChanMessageHandler chan ImmutableMessage

ChanMessageHandler is a handler that just forwards the message into a channel.

func (ChanMessageHandler) Close

func (cmh ChanMessageHandler) Close()

Close is called after all messages are handled or handling is interrupted.

func (ChanMessageHandler) Handle

func (cmh ChanMessageHandler) Handle(msg ImmutableMessage)

Handle is the callback for handling message.

type CreateCollectionMessageHeader

type CreateCollectionMessageHeader = messagespb.CreateCollectionMessageHeader

type CreatePartitionMessageHeader

type CreatePartitionMessageHeader = messagespb.CreatePartitionMessageHeader

type DeleteMessageHeader

type DeleteMessageHeader = messagespb.DeleteMessageHeader

type DropCollectionMessageHeader

type DropCollectionMessageHeader = messagespb.DropCollectionMessageHeader

type DropPartitionMessageHeader

type DropPartitionMessageHeader = messagespb.DropPartitionMessageHeader

type FlushMessageBody

type FlushMessageBody = messagespb.FlushMessageBody

type FlushMessageHeader

type FlushMessageHeader = messagespb.FlushMessageHeader

type Handler

// Handler is used to handle messages read from the log.
type Handler interface {
	// Handle is the callback for handling a message.
	Handle(msg ImmutableMessage)

	// Close is called after all messages are handled or when handling is interrupted.
	Close()
}

Handler is used to handle message read from log.

type ImmutableCreateCollectionMessageV1

type ImmutableCreateCollectionMessageV1 = specializedImmutableMessage[*CreateCollectionMessageHeader, *msgpb.CreateCollectionRequest]

List all specialized message types.

type ImmutableCreatePartitionMessageV1

type ImmutableCreatePartitionMessageV1 = specializedImmutableMessage[*CreatePartitionMessageHeader, *msgpb.CreatePartitionRequest]

List all specialized message types.

type ImmutableDeleteMessageV1

type ImmutableDeleteMessageV1 = specializedImmutableMessage[*DeleteMessageHeader, *msgpb.DeleteRequest]

List all specialized message types.

type ImmutableDropCollectionMessageV1

type ImmutableDropCollectionMessageV1 = specializedImmutableMessage[*DropCollectionMessageHeader, *msgpb.DropCollectionRequest]

List all specialized message types.

type ImmutableDropPartitionMessageV1

type ImmutableDropPartitionMessageV1 = specializedImmutableMessage[*DropPartitionMessageHeader, *msgpb.DropPartitionRequest]

List all specialized message types.

type ImmutableFlushMessageV2

type ImmutableFlushMessageV2 = specializedImmutableMessage[*FlushMessageHeader, *FlushMessageBody]

List all specialized message types.

type ImmutableInsertMessageV1

type ImmutableInsertMessageV1 = specializedImmutableMessage[*InsertMessageHeader, *msgpb.InsertRequest]

List all specialized message types.

type ImmutableMessage

type ImmutableMessage interface {
	BasicMessage

	// WALName returns the name of the wal that the message belongs to.
	WALName() string

	// MessageID returns the message id of the current message.
	MessageID() MessageID

	// LastConfirmedMessageID returns the last confirmed message id of the current message.
	// The last confirmed message is always a timetick message.
	// Reading from this message id guarantees that messages with a time tick
	// greater than this message are consumed.
	// Available only when the message's version is greater than 0;
	// otherwise it panics.
	LastConfirmedMessageID() MessageID
}

ImmutableMessage is the read-only message interface. Once a message is persisted by the wal or temporarily generated by the wal, it is immutable.

func NewImmutableMesasge

func NewImmutableMesasge(
	id MessageID,
	payload []byte,
	properties map[string]string,
) ImmutableMessage

NewImmutableMessage creates a new immutable message.

type ImmutableTimeTickMessageV1

type ImmutableTimeTickMessageV1 = specializedImmutableMessage[*TimeTickMessageHeader, *msgpb.TimeTickMsg]

List all specialized message types.

type InsertMessageHeader

type InsertMessageHeader = messagespb.InsertMessageHeader

type MessageID

type MessageID interface {
	// WALName returns the name of the wal that the message id belongs to.
	WALName() string

	// LT is the less-than comparison against another message id.
	LT(MessageID) bool

	// LTE is the less-than-or-equal-to comparison against another message id.
	LTE(MessageID) bool

	// EQ is the equality comparison against another message id.
	EQ(MessageID) bool

	// Marshal marshals the message id into a string.
	Marshal() string
}

MessageID is the interface for message id.

func UnmarshalMessageID

func UnmarshalMessageID(name string, b string) (MessageID, error)

UnmarshalMessageID unmarshals the message id.

type MessageIDUnmarshaler

type MessageIDUnmarshaler = func(b string) (MessageID, error)

MessageIDUnmarshaler is the unmarshaler for message id.

type MessageType

// MessageType is the type of a message; it is a distinct type whose
// underlying type is messagespb.MessageType.
type MessageType messagespb.MessageType

func (MessageType) String

func (t MessageType) String() string

String implements fmt.Stringer interface.

func (MessageType) Valid

func (t MessageType) Valid() bool

Valid checks if the MessageType is valid.

type MutableCreateCollectionMessageV1

type MutableCreateCollectionMessageV1 = specializedMutableMessage[*CreateCollectionMessageHeader, *msgpb.CreateCollectionRequest]

List all specialized message types.

type MutableCreatePartitionMessageV1

type MutableCreatePartitionMessageV1 = specializedMutableMessage[*CreatePartitionMessageHeader, *msgpb.CreatePartitionRequest]

List all specialized message types.

type MutableDeleteMessageV1

type MutableDeleteMessageV1 = specializedMutableMessage[*DeleteMessageHeader, *msgpb.DeleteRequest]

List all specialized message types.

type MutableDropCollectionMessageV1

type MutableDropCollectionMessageV1 = specializedMutableMessage[*DropCollectionMessageHeader, *msgpb.DropCollectionRequest]

List all specialized message types.

type MutableDropPartitionMessageV1

type MutableDropPartitionMessageV1 = specializedMutableMessage[*DropPartitionMessageHeader, *msgpb.DropPartitionRequest]

List all specialized message types.

type MutableFlushMessageV2

type MutableFlushMessageV2 = specializedMutableMessage[*FlushMessageHeader, *FlushMessageBody]

List all specialized message types.

type MutableInsertMessageV1

type MutableInsertMessageV1 = specializedMutableMessage[*InsertMessageHeader, *msgpb.InsertRequest]

List all specialized message types.

type MutableMessage

type MutableMessage interface {
	BasicMessage

	// WithLastConfirmed sets the last confirmed message id of the current message.
	// !!! Reserved for streaming system internal usage; don't call it outside of the streaming system.
	WithLastConfirmed(id MessageID) MutableMessage

	// WithTimeTick sets the time tick of the current message.
	// !!! Reserved for streaming system internal usage; don't call it outside of the streaming system.
	WithTimeTick(tt uint64) MutableMessage

	// IntoImmutableMessage converts the mutable message into an immutable message
	// with the given message id.
	IntoImmutableMessage(msgID MessageID) ImmutableMessage
}

MutableMessage is the mutable message interface. A message can be modified before it is persisted by the wal.

func NewMutableMessage

func NewMutableMessage(payload []byte, properties map[string]string) MutableMessage

NewMutableMessage creates a new mutable message. Only used at server side for streamingnode internal service, don't use it at client side.

type MutableTimeTickMessageV1

type MutableTimeTickMessageV1 = specializedMutableMessage[*TimeTickMessageHeader, *msgpb.TimeTickMsg]

List all specialized message types.

type NopCloseHandler

type NopCloseHandler struct {
	// Handler is embedded, so its methods are promoted; NopCloseHandler
	// declares its own Close method, which takes precedence over the
	// embedded one.
	Handler
}

NopCloseHandler is a handler that does nothing when closed.

func (NopCloseHandler) Close

func (nch NopCloseHandler) Close()

Close is called after all messages are handled or handling is interrupted.

type PartitionSegmentAssignment

type PartitionSegmentAssignment = messagespb.PartitionSegmentAssignment

type Properties

type Properties interface {
	// RProperties provides the read-only accessors.
	RProperties

	// Set sets a key-value pair in Properties.
	Set(key, value string)
}

Properties is the readable and writable properties of a message.

type RProperties

type RProperties interface {
	// Get finds a value by key; ok reports whether the key was found.
	Get(key string) (value string, ok bool)

	// Exist checks whether a key exists.
	Exist(key string) bool

	// ToRawMap returns the raw map of properties.
	ToRawMap() map[string]string
}

RProperties is the read-only properties for message.

type SegmentAssignment

type SegmentAssignment = messagespb.SegmentAssignment

type TimeTickMessageHeader

type TimeTickMessageHeader = messagespb.TimeTickMessageHeader

type Version

type Version int // Message version, used for compatibility.
var (
	VersionOld Version = 0 // Old version, from before streamingnode.
	VersionV1  Version = 1 // Message marshal/unmarshal still uses msgstream.
	VersionV2  Version = 2 // Message marshal/unmarshal does not rely on msgstream.
)

func (Version) GT

func (v Version) GT(v2 Version) bool

func (Version) String

func (v Version) String() string

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL