Documentation
¶
Index ¶
- func DefaultRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error)
- func DeleteRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error)
- func ExtractCtx(msg TsMsg, properties map[string]string) (context.Context, trace.Span)
- func GetChannelLatestMsgID(ctx context.Context, factory Factory, channelName string) ([]byte, error)
- func InjectCtx(sc context.Context, properties map[string]string)
- func InsertRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error)
- func MsgSpanFromCtx(ctx context.Context, msg TsMsg) (context.Context, trace.Span)
- func NewMqMsgStream(ctx context.Context, receiveBufSize int64, bufSize int64, ...) (*mqMsgStream, error)
- func UnsubscribeChannels(ctx context.Context, factory Factory, subName string, channels []string)
- type AlterIndexMsg
- func (it *AlterIndexMsg) ID() UniqueID
- func (it *AlterIndexMsg) Marshal(input TsMsg) (MarshalType, error)
- func (it *AlterIndexMsg) SetID(id UniqueID)
- func (it *AlterIndexMsg) Size() int
- func (it *AlterIndexMsg) SourceID() int64
- func (it *AlterIndexMsg) Type() MsgType
- func (it *AlterIndexMsg) Unmarshal(input MarshalType) (TsMsg, error)
- type BaseMsg
- func (bm *BaseMsg) BeginTs() Timestamp
- func (bm *BaseMsg) EndTs() Timestamp
- func (bm *BaseMsg) HashKeys() []uint32
- func (bm *BaseMsg) Position() *MsgPosition
- func (bm *BaseMsg) SetPosition(position *MsgPosition)
- func (bm *BaseMsg) SetTraceCtx(ctx context.Context)
- func (bm *BaseMsg) SetTs(ts uint64)
- func (bm *BaseMsg) TraceCtx() context.Context
- type CommonFactory
- type CreateCollectionMsg
- func (cc *CreateCollectionMsg) ID() UniqueID
- func (cc *CreateCollectionMsg) Marshal(input TsMsg) (MarshalType, error)
- func (cc *CreateCollectionMsg) SetID(id UniqueID)
- func (cc *CreateCollectionMsg) Size() int
- func (cc *CreateCollectionMsg) SourceID() int64
- func (cc *CreateCollectionMsg) Type() MsgType
- func (cc *CreateCollectionMsg) Unmarshal(input MarshalType) (TsMsg, error)
- type CreateDatabaseMsg
- func (c *CreateDatabaseMsg) ID() UniqueID
- func (c *CreateDatabaseMsg) Marshal(input TsMsg) (MarshalType, error)
- func (c *CreateDatabaseMsg) SetID(id UniqueID)
- func (c *CreateDatabaseMsg) Size() int
- func (c *CreateDatabaseMsg) SourceID() int64
- func (c *CreateDatabaseMsg) Type() MsgType
- func (c *CreateDatabaseMsg) Unmarshal(input MarshalType) (TsMsg, error)
- type CreateIndexMsg
- func (it *CreateIndexMsg) ID() UniqueID
- func (it *CreateIndexMsg) Marshal(input TsMsg) (MarshalType, error)
- func (it *CreateIndexMsg) SetID(id UniqueID)
- func (it *CreateIndexMsg) Size() int
- func (it *CreateIndexMsg) SourceID() int64
- func (it *CreateIndexMsg) Type() MsgType
- func (it *CreateIndexMsg) Unmarshal(input MarshalType) (TsMsg, error)
- type CreatePartitionMsg
- func (cp *CreatePartitionMsg) ID() UniqueID
- func (cp *CreatePartitionMsg) Marshal(input TsMsg) (MarshalType, error)
- func (cp *CreatePartitionMsg) SetID(id UniqueID)
- func (cp *CreatePartitionMsg) Size() int
- func (cp *CreatePartitionMsg) SourceID() int64
- func (cp *CreatePartitionMsg) Type() MsgType
- func (cp *CreatePartitionMsg) Unmarshal(input MarshalType) (TsMsg, error)
- type DataNodeTtMsg
- func (m *DataNodeTtMsg) ID() UniqueID
- func (m *DataNodeTtMsg) Marshal(input TsMsg) (MarshalType, error)
- func (m *DataNodeTtMsg) SetID(id UniqueID)
- func (m *DataNodeTtMsg) Size() int
- func (m *DataNodeTtMsg) SourceID() int64
- func (m *DataNodeTtMsg) Type() MsgType
- func (m *DataNodeTtMsg) Unmarshal(input MarshalType) (TsMsg, error)
- type DeleteMsg
- func (dt *DeleteMsg) CheckAligned() error
- func (dt *DeleteMsg) ID() UniqueID
- func (dt *DeleteMsg) Marshal(input TsMsg) (MarshalType, error)
- func (dt *DeleteMsg) SetID(id UniqueID)
- func (dt *DeleteMsg) Size() int
- func (dt *DeleteMsg) SourceID() int64
- func (dt *DeleteMsg) Type() MsgType
- func (dt *DeleteMsg) Unmarshal(input MarshalType) (TsMsg, error)
- type DropCollectionMsg
- func (dc *DropCollectionMsg) ID() UniqueID
- func (dc *DropCollectionMsg) Marshal(input TsMsg) (MarshalType, error)
- func (dc *DropCollectionMsg) SetID(id UniqueID)
- func (dc *DropCollectionMsg) Size() int
- func (dc *DropCollectionMsg) SourceID() int64
- func (dc *DropCollectionMsg) Type() MsgType
- func (dc *DropCollectionMsg) Unmarshal(input MarshalType) (TsMsg, error)
- type DropDatabaseMsg
- func (d *DropDatabaseMsg) ID() UniqueID
- func (d *DropDatabaseMsg) Marshal(input TsMsg) (MarshalType, error)
- func (d *DropDatabaseMsg) SetID(id UniqueID)
- func (d *DropDatabaseMsg) Size() int
- func (d *DropDatabaseMsg) SourceID() int64
- func (d *DropDatabaseMsg) Type() MsgType
- func (d *DropDatabaseMsg) Unmarshal(input MarshalType) (TsMsg, error)
- type DropIndexMsg
- func (d *DropIndexMsg) ID() UniqueID
- func (d *DropIndexMsg) Marshal(input TsMsg) (MarshalType, error)
- func (d *DropIndexMsg) SetID(id UniqueID)
- func (d *DropIndexMsg) Size() int
- func (d *DropIndexMsg) SourceID() int64
- func (d *DropIndexMsg) Type() MsgType
- func (d *DropIndexMsg) Unmarshal(input MarshalType) (TsMsg, error)
- type DropPartitionMsg
- func (dp *DropPartitionMsg) ID() UniqueID
- func (dp *DropPartitionMsg) Marshal(input TsMsg) (MarshalType, error)
- func (dp *DropPartitionMsg) SetID(id UniqueID)
- func (dp *DropPartitionMsg) Size() int
- func (dp *DropPartitionMsg) SourceID() int64
- func (dp *DropPartitionMsg) Type() MsgType
- func (dp *DropPartitionMsg) Unmarshal(input MarshalType) (TsMsg, error)
- type Factory
- type FlushMsg
- type InsertMsg
- func (it *InsertMsg) CheckAligned() error
- func (it *InsertMsg) ID() UniqueID
- func (it *InsertMsg) IndexMsg(index int) *InsertMsg
- func (it *InsertMsg) IndexRequest(index int) *msgpb.InsertRequest
- func (it *InsertMsg) IsColumnBased() bool
- func (it *InsertMsg) IsRowBased() bool
- func (it *InsertMsg) Marshal(input TsMsg) (MarshalType, error)
- func (it *InsertMsg) NRows() uint64
- func (it *InsertMsg) SetID(id UniqueID)
- func (it *InsertMsg) Size() int
- func (it *InsertMsg) SourceID() int64
- func (it *InsertMsg) Type() MsgType
- func (it *InsertMsg) Unmarshal(input MarshalType) (TsMsg, error)
- type IntPrimaryKey
- type KmsFactory
- type LoadCollectionMsg
- func (l *LoadCollectionMsg) ID() UniqueID
- func (l *LoadCollectionMsg) Marshal(input TsMsg) (MarshalType, error)
- func (l *LoadCollectionMsg) SetID(id UniqueID)
- func (l *LoadCollectionMsg) Size() int
- func (l *LoadCollectionMsg) SourceID() int64
- func (l *LoadCollectionMsg) Type() MsgType
- func (l *LoadCollectionMsg) Unmarshal(input MarshalType) (TsMsg, error)
- type LoadPartitionsMsg
- func (l *LoadPartitionsMsg) ID() UniqueID
- func (l *LoadPartitionsMsg) Marshal(input TsMsg) (MarshalType, error)
- func (l *LoadPartitionsMsg) SetID(id UniqueID)
- func (l *LoadPartitionsMsg) Size() int
- func (l *LoadPartitionsMsg) SourceID() int64
- func (l *LoadPartitionsMsg) Type() MsgType
- func (l *LoadPartitionsMsg) Unmarshal(input MarshalType) (TsMsg, error)
- type MarshalType
- type MessageID
- type MockFactory
- func (_m *MockFactory) EXPECT() *MockFactory_Expecter
- func (_m *MockFactory) NewMsgStream(ctx context.Context) (MsgStream, error)
- func (_m *MockFactory) NewMsgStreamDisposer(ctx context.Context) func([]string, string) error
- func (_m *MockFactory) NewTtMsgStream(ctx context.Context) (MsgStream, error)
- type MockFactory_Expecter
- type MockFactory_NewMsgStreamDisposer_Call
- func (_c *MockFactory_NewMsgStreamDisposer_Call) Return(_a0 func([]string, string) error) *MockFactory_NewMsgStreamDisposer_Call
- func (_c *MockFactory_NewMsgStreamDisposer_Call) Run(run func(ctx context.Context)) *MockFactory_NewMsgStreamDisposer_Call
- func (_c *MockFactory_NewMsgStreamDisposer_Call) RunAndReturn(run func(context.Context) func([]string, string) error) *MockFactory_NewMsgStreamDisposer_Call
- type MockFactory_NewMsgStream_Call
- func (_c *MockFactory_NewMsgStream_Call) Return(_a0 MsgStream, _a1 error) *MockFactory_NewMsgStream_Call
- func (_c *MockFactory_NewMsgStream_Call) Run(run func(ctx context.Context)) *MockFactory_NewMsgStream_Call
- func (_c *MockFactory_NewMsgStream_Call) RunAndReturn(run func(context.Context) (MsgStream, error)) *MockFactory_NewMsgStream_Call
- type MockFactory_NewTtMsgStream_Call
- func (_c *MockFactory_NewTtMsgStream_Call) Return(_a0 MsgStream, _a1 error) *MockFactory_NewTtMsgStream_Call
- func (_c *MockFactory_NewTtMsgStream_Call) Run(run func(ctx context.Context)) *MockFactory_NewTtMsgStream_Call
- func (_c *MockFactory_NewTtMsgStream_Call) RunAndReturn(run func(context.Context) (MsgStream, error)) *MockFactory_NewTtMsgStream_Call
- type MockMqFactory
- type MockMsgStream
- func (_m *MockMsgStream) AsConsumer(ctx context.Context, channels []string, subName string, ...) error
- func (_m *MockMsgStream) AsProducer(channels []string)
- func (_m *MockMsgStream) Broadcast(_a0 *MsgPack) (map[string][]common.MessageID, error)
- func (_m *MockMsgStream) Chan() <-chan *MsgPack
- func (_m *MockMsgStream) CheckTopicValid(channel string) error
- func (_m *MockMsgStream) Close()
- func (_m *MockMsgStream) EXPECT() *MockMsgStream_Expecter
- func (_m *MockMsgStream) EnableProduce(can bool)
- func (_m *MockMsgStream) GetLatestMsgID(channel string) (common.MessageID, error)
- func (_m *MockMsgStream) GetProduceChannels() []string
- func (_m *MockMsgStream) Produce(_a0 *MsgPack) error
- func (_m *MockMsgStream) Seek(ctx context.Context, msgPositions []*msgpb.MsgPosition, includeCurrentMsg bool) error
- func (_m *MockMsgStream) SetRepackFunc(repackFunc RepackFunc)
- type MockMsgStream_AsConsumer_Call
- func (_c *MockMsgStream_AsConsumer_Call) Return(_a0 error) *MockMsgStream_AsConsumer_Call
- func (_c *MockMsgStream_AsConsumer_Call) Run(run func(ctx context.Context, channels []string, subName string, ...)) *MockMsgStream_AsConsumer_Call
- func (_c *MockMsgStream_AsConsumer_Call) RunAndReturn(...) *MockMsgStream_AsConsumer_Call
- type MockMsgStream_AsProducer_Call
- type MockMsgStream_Broadcast_Call
- func (_c *MockMsgStream_Broadcast_Call) Return(_a0 map[string][]common.MessageID, _a1 error) *MockMsgStream_Broadcast_Call
- func (_c *MockMsgStream_Broadcast_Call) Run(run func(_a0 *MsgPack)) *MockMsgStream_Broadcast_Call
- func (_c *MockMsgStream_Broadcast_Call) RunAndReturn(run func(*MsgPack) (map[string][]common.MessageID, error)) *MockMsgStream_Broadcast_Call
- type MockMsgStream_Chan_Call
- type MockMsgStream_CheckTopicValid_Call
- func (_c *MockMsgStream_CheckTopicValid_Call) Return(_a0 error) *MockMsgStream_CheckTopicValid_Call
- func (_c *MockMsgStream_CheckTopicValid_Call) Run(run func(channel string)) *MockMsgStream_CheckTopicValid_Call
- func (_c *MockMsgStream_CheckTopicValid_Call) RunAndReturn(run func(string) error) *MockMsgStream_CheckTopicValid_Call
- type MockMsgStream_Close_Call
- type MockMsgStream_EnableProduce_Call
- type MockMsgStream_Expecter
- func (_e *MockMsgStream_Expecter) AsConsumer(ctx interface{}, channels interface{}, subName interface{}, ...) *MockMsgStream_AsConsumer_Call
- func (_e *MockMsgStream_Expecter) AsProducer(channels interface{}) *MockMsgStream_AsProducer_Call
- func (_e *MockMsgStream_Expecter) Broadcast(_a0 interface{}) *MockMsgStream_Broadcast_Call
- func (_e *MockMsgStream_Expecter) Chan() *MockMsgStream_Chan_Call
- func (_e *MockMsgStream_Expecter) CheckTopicValid(channel interface{}) *MockMsgStream_CheckTopicValid_Call
- func (_e *MockMsgStream_Expecter) Close() *MockMsgStream_Close_Call
- func (_e *MockMsgStream_Expecter) EnableProduce(can interface{}) *MockMsgStream_EnableProduce_Call
- func (_e *MockMsgStream_Expecter) GetLatestMsgID(channel interface{}) *MockMsgStream_GetLatestMsgID_Call
- func (_e *MockMsgStream_Expecter) GetProduceChannels() *MockMsgStream_GetProduceChannels_Call
- func (_e *MockMsgStream_Expecter) Produce(_a0 interface{}) *MockMsgStream_Produce_Call
- func (_e *MockMsgStream_Expecter) Seek(ctx interface{}, msgPositions interface{}, includeCurrentMsg interface{}) *MockMsgStream_Seek_Call
- func (_e *MockMsgStream_Expecter) SetRepackFunc(repackFunc interface{}) *MockMsgStream_SetRepackFunc_Call
- type MockMsgStream_GetLatestMsgID_Call
- func (_c *MockMsgStream_GetLatestMsgID_Call) Return(_a0 common.MessageID, _a1 error) *MockMsgStream_GetLatestMsgID_Call
- func (_c *MockMsgStream_GetLatestMsgID_Call) Run(run func(channel string)) *MockMsgStream_GetLatestMsgID_Call
- func (_c *MockMsgStream_GetLatestMsgID_Call) RunAndReturn(run func(string) (common.MessageID, error)) *MockMsgStream_GetLatestMsgID_Call
- type MockMsgStream_GetProduceChannels_Call
- func (_c *MockMsgStream_GetProduceChannels_Call) Return(_a0 []string) *MockMsgStream_GetProduceChannels_Call
- func (_c *MockMsgStream_GetProduceChannels_Call) Run(run func()) *MockMsgStream_GetProduceChannels_Call
- func (_c *MockMsgStream_GetProduceChannels_Call) RunAndReturn(run func() []string) *MockMsgStream_GetProduceChannels_Call
- type MockMsgStream_Produce_Call
- type MockMsgStream_Seek_Call
- type MockMsgStream_SetRepackFunc_Call
- func (_c *MockMsgStream_SetRepackFunc_Call) Return() *MockMsgStream_SetRepackFunc_Call
- func (_c *MockMsgStream_SetRepackFunc_Call) Run(run func(repackFunc RepackFunc)) *MockMsgStream_SetRepackFunc_Call
- func (_c *MockMsgStream_SetRepackFunc_Call) RunAndReturn(run func(RepackFunc)) *MockMsgStream_SetRepackFunc_Call
- type MqTtMsgStream
- func (ms *MqTtMsgStream) AsConsumer(ctx context.Context, channels []string, subName string, ...) error
- func (ms MqTtMsgStream) AsProducer(channels []string)
- func (ms MqTtMsgStream) Broadcast(msgPack *MsgPack) (map[string][]MessageID, error)
- func (ms *MqTtMsgStream) Chan() <-chan *MsgPack
- func (ms MqTtMsgStream) CheckTopicValid(channel string) error
- func (ms *MqTtMsgStream) Close()
- func (ms MqTtMsgStream) ComputeProduceChannelIndexes(tsMsgs []TsMsg) [][]int32
- func (ms MqTtMsgStream) EnableProduce(can bool)
- func (ms MqTtMsgStream) GetLatestMsgID(channel string) (MessageID, error)
- func (ms MqTtMsgStream) GetProduceChannels() []string
- func (ms MqTtMsgStream) Produce(msgPack *MsgPack) error
- func (ms *MqTtMsgStream) Seek(ctx context.Context, msgPositions []*MsgPosition, includeCurrentMsg bool) error
- func (ms MqTtMsgStream) SetRepackFunc(repackFunc RepackFunc)
- type MsgPack
- type MsgPosition
- type MsgStream
- type MsgType
- type PmsFactory
- type ProtoUDFactory
- type ProtoUnmarshalDispatcher
- type ReleaseCollectionMsg
- func (r *ReleaseCollectionMsg) ID() UniqueID
- func (r *ReleaseCollectionMsg) Marshal(input TsMsg) (MarshalType, error)
- func (r *ReleaseCollectionMsg) SetID(id UniqueID)
- func (r *ReleaseCollectionMsg) Size() int
- func (r *ReleaseCollectionMsg) SourceID() int64
- func (r *ReleaseCollectionMsg) Type() MsgType
- func (r *ReleaseCollectionMsg) Unmarshal(input MarshalType) (TsMsg, error)
- type ReleasePartitionsMsg
- func (r *ReleasePartitionsMsg) ID() UniqueID
- func (r *ReleasePartitionsMsg) Marshal(input TsMsg) (MarshalType, error)
- func (r *ReleasePartitionsMsg) SetID(id UniqueID)
- func (r *ReleasePartitionsMsg) Size() int
- func (r *ReleasePartitionsMsg) SourceID() int64
- func (r *ReleasePartitionsMsg) Type() MsgType
- func (r *ReleasePartitionsMsg) Unmarshal(input MarshalType) (TsMsg, error)
- type RepackFunc
- type TimeTickMsg
- func (tst *TimeTickMsg) ID() UniqueID
- func (tst *TimeTickMsg) Marshal(input TsMsg) (MarshalType, error)
- func (tst *TimeTickMsg) SetID(id UniqueID)
- func (tst *TimeTickMsg) Size() int
- func (tst *TimeTickMsg) SourceID() int64
- func (tst *TimeTickMsg) Type() MsgType
- func (tst *TimeTickMsg) Unmarshal(input MarshalType) (TsMsg, error)
- type Timestamp
- type TsMsg
- type UniqueID
- type UnmarshalDispatcher
- type UnmarshalDispatcherFactory
- type UnmarshalFunc
- type UpsertMsg
- type WastedMockMsgStream
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DefaultRepackFunc ¶
DefaultRepackFunc is used to repack messages after hashing by primary key
func DeleteRepackFunc ¶
DeleteRepackFunc is used to repack messages after hashing by primary key
func ExtractCtx ¶
ExtractCtx extracts the trace span from the message properties and attaches some default tags to the span.
func GetChannelLatestMsgID ¶
func InsertRepackFunc ¶
InsertRepackFunc is used to repack messages after hashing by primary key
func MsgSpanFromCtx ¶
MsgSpanFromCtx extracts the span from the context and attaches some default tags to the span.
func NewMqMsgStream ¶
func NewMqMsgStream(ctx context.Context, receiveBufSize int64, bufSize int64, client mqwrapper.Client, unmarshal UnmarshalDispatcher, ) (*mqMsgStream, error)
NewMqMsgStream is used to generate a new mqMsgStream object
Types ¶
type AlterIndexMsg ¶
type AlterIndexMsg struct { BaseMsg *milvuspb.AlterIndexRequest }
AlterIndexMsg is a message pack that contains alter index request
func (*AlterIndexMsg) ID ¶
func (it *AlterIndexMsg) ID() UniqueID
ID returns the ID of this message pack
func (*AlterIndexMsg) Marshal ¶
func (it *AlterIndexMsg) Marshal(input TsMsg) (MarshalType, error)
Marshal is used to serialize a message pack to byte array
func (*AlterIndexMsg) SetID ¶
func (it *AlterIndexMsg) SetID(id UniqueID)
SetID sets the ID of this message pack
func (*AlterIndexMsg) Size ¶
func (it *AlterIndexMsg) Size() int
func (*AlterIndexMsg) SourceID ¶
func (it *AlterIndexMsg) SourceID() int64
SourceID indicates which component generated this message
func (*AlterIndexMsg) Type ¶
func (it *AlterIndexMsg) Type() MsgType
Type returns the type of this message pack
func (*AlterIndexMsg) Unmarshal ¶
func (it *AlterIndexMsg) Unmarshal(input MarshalType) (TsMsg, error)
Unmarshal is used to deserialize a message pack from byte array
type BaseMsg ¶
type BaseMsg struct { Ctx context.Context BeginTimestamp Timestamp EndTimestamp Timestamp HashValues []uint32 MsgPosition *MsgPosition // contains filtered or unexported fields }
BaseMsg is a basic structure that contains begin timestamp, end timestamp and the position of msgstream
func (*BaseMsg) Position ¶
func (bm *BaseMsg) Position() *MsgPosition
Position returns the position of this message pack in msgstream
func (*BaseMsg) SetPosition ¶
func (bm *BaseMsg) SetPosition(position *MsgPosition)
SetPosition is used to set position of this message in msgstream
func (*BaseMsg) SetTraceCtx ¶
SetTraceCtx is used to set context for opentracing
type CommonFactory ¶
type CommonFactory struct { Newer func(context.Context) (mqwrapper.Client, error) // client constructor DispatcherFactory ProtoUDFactory ReceiveBufSize int64 MQBufSize int64 }
CommonFactory is a Factory for creating message streams with common logic.
It contains a function field named Newer, which creates an mqwrapper.Client when called.
func (*CommonFactory) NewMsgStream ¶
func (f *CommonFactory) NewMsgStream(ctx context.Context) (ms MsgStream, err error)
NewMsgStream is used to generate a new MsgStream object
func (*CommonFactory) NewMsgStreamDisposer ¶
NewMsgStreamDisposer returns a function that can be used to dispose of a message stream. The returned function takes a slice of channel names and a subscription name, and disposes of the message stream associated with those arguments.
func (*CommonFactory) NewTtMsgStream ¶
func (f *CommonFactory) NewTtMsgStream(ctx context.Context) (ms MsgStream, err error)
NewTtMsgStream is used to generate a new TtMsgStream object
type CreateCollectionMsg ¶
type CreateCollectionMsg struct { BaseMsg *msgpb.CreateCollectionRequest }
CreateCollectionMsg is a message pack that contains create collection request
func (*CreateCollectionMsg) ID ¶
func (cc *CreateCollectionMsg) ID() UniqueID
ID returns the ID of this message pack
func (*CreateCollectionMsg) Marshal ¶
func (cc *CreateCollectionMsg) Marshal(input TsMsg) (MarshalType, error)
Marshal is used to serialize a message pack to byte array
func (*CreateCollectionMsg) SetID ¶
func (cc *CreateCollectionMsg) SetID(id UniqueID)
SetID sets the ID of this message pack
func (*CreateCollectionMsg) Size ¶
func (cc *CreateCollectionMsg) Size() int
func (*CreateCollectionMsg) SourceID ¶
func (cc *CreateCollectionMsg) SourceID() int64
SourceID indicates which component generated this message
func (*CreateCollectionMsg) Type ¶
func (cc *CreateCollectionMsg) Type() MsgType
Type returns the type of this message pack
func (*CreateCollectionMsg) Unmarshal ¶
func (cc *CreateCollectionMsg) Unmarshal(input MarshalType) (TsMsg, error)
Unmarshal is used to deserialize a message pack from byte array
type CreateDatabaseMsg ¶
type CreateDatabaseMsg struct { BaseMsg *milvuspb.CreateDatabaseRequest }
func (*CreateDatabaseMsg) ID ¶
func (c *CreateDatabaseMsg) ID() UniqueID
func (*CreateDatabaseMsg) Marshal ¶
func (c *CreateDatabaseMsg) Marshal(input TsMsg) (MarshalType, error)
func (*CreateDatabaseMsg) SetID ¶
func (c *CreateDatabaseMsg) SetID(id UniqueID)
func (*CreateDatabaseMsg) Size ¶
func (c *CreateDatabaseMsg) Size() int
func (*CreateDatabaseMsg) SourceID ¶
func (c *CreateDatabaseMsg) SourceID() int64
func (*CreateDatabaseMsg) Type ¶
func (c *CreateDatabaseMsg) Type() MsgType
func (*CreateDatabaseMsg) Unmarshal ¶
func (c *CreateDatabaseMsg) Unmarshal(input MarshalType) (TsMsg, error)
type CreateIndexMsg ¶
type CreateIndexMsg struct { BaseMsg *milvuspb.CreateIndexRequest }
CreateIndexMsg is a message pack that contains create index request
func (*CreateIndexMsg) ID ¶
func (it *CreateIndexMsg) ID() UniqueID
ID returns the ID of this message pack
func (*CreateIndexMsg) Marshal ¶
func (it *CreateIndexMsg) Marshal(input TsMsg) (MarshalType, error)
Marshal is used to serialize a message pack to byte array
func (*CreateIndexMsg) SetID ¶
func (it *CreateIndexMsg) SetID(id UniqueID)
SetID sets the ID of this message pack
func (*CreateIndexMsg) Size ¶
func (it *CreateIndexMsg) Size() int
func (*CreateIndexMsg) SourceID ¶
func (it *CreateIndexMsg) SourceID() int64
SourceID indicates which component generated this message
func (*CreateIndexMsg) Type ¶
func (it *CreateIndexMsg) Type() MsgType
Type returns the type of this message pack
func (*CreateIndexMsg) Unmarshal ¶
func (it *CreateIndexMsg) Unmarshal(input MarshalType) (TsMsg, error)
Unmarshal is used to deserialize a message pack from byte array
type CreatePartitionMsg ¶
type CreatePartitionMsg struct { BaseMsg *msgpb.CreatePartitionRequest }
CreatePartitionMsg is a message pack that contains create partition request
func (*CreatePartitionMsg) ID ¶
func (cp *CreatePartitionMsg) ID() UniqueID
ID returns the ID of this message pack
func (*CreatePartitionMsg) Marshal ¶
func (cp *CreatePartitionMsg) Marshal(input TsMsg) (MarshalType, error)
Marshal is used to serialize a message pack to byte array
func (*CreatePartitionMsg) SetID ¶
func (cp *CreatePartitionMsg) SetID(id UniqueID)
SetID sets the ID of this message pack
func (*CreatePartitionMsg) Size ¶
func (cp *CreatePartitionMsg) Size() int
func (*CreatePartitionMsg) SourceID ¶
func (cp *CreatePartitionMsg) SourceID() int64
SourceID indicates which component generated this message
func (*CreatePartitionMsg) Type ¶
func (cp *CreatePartitionMsg) Type() MsgType
Type returns the type of this message pack
func (*CreatePartitionMsg) Unmarshal ¶
func (cp *CreatePartitionMsg) Unmarshal(input MarshalType) (TsMsg, error)
Unmarshal is used to deserialize a message pack from byte array
type DataNodeTtMsg ¶
type DataNodeTtMsg struct { BaseMsg *msgpb.DataNodeTtMsg }
DataNodeTtMsg is a message pack that contains datanode time tick
func (*DataNodeTtMsg) ID ¶
func (m *DataNodeTtMsg) ID() UniqueID
ID returns the ID of this message pack
func (*DataNodeTtMsg) Marshal ¶
func (m *DataNodeTtMsg) Marshal(input TsMsg) (MarshalType, error)
Marshal is used to serialize a message pack to byte array
func (*DataNodeTtMsg) SetID ¶
func (m *DataNodeTtMsg) SetID(id UniqueID)
SetID sets the ID of this message pack
func (*DataNodeTtMsg) Size ¶
func (m *DataNodeTtMsg) Size() int
func (*DataNodeTtMsg) SourceID ¶
func (m *DataNodeTtMsg) SourceID() int64
SourceID indicates which component generated this message
func (*DataNodeTtMsg) Type ¶
func (m *DataNodeTtMsg) Type() MsgType
Type returns the type of this message pack
func (*DataNodeTtMsg) Unmarshal ¶
func (m *DataNodeTtMsg) Unmarshal(input MarshalType) (TsMsg, error)
Unmarshal is used to deserialize a message pack from byte array
type DeleteMsg ¶
type DeleteMsg struct { BaseMsg *msgpb.DeleteRequest }
DeleteMsg is a message pack that contains delete request
func (*DeleteMsg) CheckAligned ¶
func (*DeleteMsg) Marshal ¶
func (dt *DeleteMsg) Marshal(input TsMsg) (MarshalType, error)
Marshal is used to serialize a message pack to byte array
type DropCollectionMsg ¶
type DropCollectionMsg struct { BaseMsg *msgpb.DropCollectionRequest }
DropCollectionMsg is a message pack that contains drop collection request
func (*DropCollectionMsg) ID ¶
func (dc *DropCollectionMsg) ID() UniqueID
ID returns the ID of this message pack
func (*DropCollectionMsg) Marshal ¶
func (dc *DropCollectionMsg) Marshal(input TsMsg) (MarshalType, error)
Marshal is used to serialize a message pack to byte array
func (*DropCollectionMsg) SetID ¶
func (dc *DropCollectionMsg) SetID(id UniqueID)
SetID sets the ID of this message pack
func (*DropCollectionMsg) Size ¶
func (dc *DropCollectionMsg) Size() int
func (*DropCollectionMsg) SourceID ¶
func (dc *DropCollectionMsg) SourceID() int64
SourceID indicates which component generated this message
func (*DropCollectionMsg) Type ¶
func (dc *DropCollectionMsg) Type() MsgType
Type returns the type of this message pack
func (*DropCollectionMsg) Unmarshal ¶
func (dc *DropCollectionMsg) Unmarshal(input MarshalType) (TsMsg, error)
Unmarshal is used to deserialize a message pack from byte array
type DropDatabaseMsg ¶
type DropDatabaseMsg struct { BaseMsg *milvuspb.DropDatabaseRequest }
func (*DropDatabaseMsg) ID ¶
func (d *DropDatabaseMsg) ID() UniqueID
func (*DropDatabaseMsg) Marshal ¶
func (d *DropDatabaseMsg) Marshal(input TsMsg) (MarshalType, error)
func (*DropDatabaseMsg) SetID ¶
func (d *DropDatabaseMsg) SetID(id UniqueID)
func (*DropDatabaseMsg) Size ¶
func (d *DropDatabaseMsg) Size() int
func (*DropDatabaseMsg) SourceID ¶
func (d *DropDatabaseMsg) SourceID() int64
func (*DropDatabaseMsg) Type ¶
func (d *DropDatabaseMsg) Type() MsgType
func (*DropDatabaseMsg) Unmarshal ¶
func (d *DropDatabaseMsg) Unmarshal(input MarshalType) (TsMsg, error)
type DropIndexMsg ¶
type DropIndexMsg struct { BaseMsg *milvuspb.DropIndexRequest }
DropIndexMsg is a message pack that contains drop index request
func (*DropIndexMsg) ID ¶
func (d *DropIndexMsg) ID() UniqueID
func (*DropIndexMsg) Marshal ¶
func (d *DropIndexMsg) Marshal(input TsMsg) (MarshalType, error)
func (*DropIndexMsg) SetID ¶
func (d *DropIndexMsg) SetID(id UniqueID)
func (*DropIndexMsg) Size ¶
func (d *DropIndexMsg) Size() int
func (*DropIndexMsg) SourceID ¶
func (d *DropIndexMsg) SourceID() int64
func (*DropIndexMsg) Type ¶
func (d *DropIndexMsg) Type() MsgType
func (*DropIndexMsg) Unmarshal ¶
func (d *DropIndexMsg) Unmarshal(input MarshalType) (TsMsg, error)
type DropPartitionMsg ¶
type DropPartitionMsg struct { BaseMsg *msgpb.DropPartitionRequest }
DropPartitionMsg is a message pack that contains drop partition request
func (*DropPartitionMsg) ID ¶
func (dp *DropPartitionMsg) ID() UniqueID
ID returns the ID of this message pack
func (*DropPartitionMsg) Marshal ¶
func (dp *DropPartitionMsg) Marshal(input TsMsg) (MarshalType, error)
Marshal is used to serialize a message pack to byte array
func (*DropPartitionMsg) SetID ¶
func (dp *DropPartitionMsg) SetID(id UniqueID)
SetID sets the ID of this message pack
func (*DropPartitionMsg) Size ¶
func (dp *DropPartitionMsg) Size() int
func (*DropPartitionMsg) SourceID ¶
func (dp *DropPartitionMsg) SourceID() int64
SourceID indicates which component generated this message
func (*DropPartitionMsg) Type ¶
func (dp *DropPartitionMsg) Type() MsgType
Type returns the type of this message pack
func (*DropPartitionMsg) Unmarshal ¶
func (dp *DropPartitionMsg) Unmarshal(input MarshalType) (TsMsg, error)
Unmarshal is used to deserialize a message pack from byte array
type Factory ¶
type Factory interface { NewMsgStream(ctx context.Context) (MsgStream, error) NewTtMsgStream(ctx context.Context) (MsgStream, error) NewMsgStreamDisposer(ctx context.Context) func([]string, string) error }
func NewKmsFactory ¶
func NewKmsFactory(config *paramtable.ServiceParam) Factory
func NewNatsmqFactory ¶
func NewNatsmqFactory() Factory
NewNatsmqFactory creates a new nats-mq factory.
func NewRocksmqFactory ¶
func NewRocksmqFactory(path string, cfg *paramtable.ServiceParam) Factory
NewRocksmqFactory creates a new message stream factory based on rocksmq.
type FlushMsg ¶
type FlushMsg struct { BaseMsg *milvuspb.FlushRequest }
type InsertMsg ¶
type InsertMsg struct { BaseMsg *msgpb.InsertRequest }
InsertMsg is a message pack that contains insert request
func (*InsertMsg) CheckAligned ¶
func (*InsertMsg) IndexRequest ¶
func (it *InsertMsg) IndexRequest(index int) *msgpb.InsertRequest
func (*InsertMsg) IsColumnBased ¶
func (*InsertMsg) IsRowBased ¶
func (*InsertMsg) Marshal ¶
func (it *InsertMsg) Marshal(input TsMsg) (MarshalType, error)
Marshal is used to serialize a message pack to byte array
type IntPrimaryKey ¶
type IntPrimaryKey = typeutil.IntPrimaryKey
IntPrimaryKey is a shorthand alias for typeutil.IntPrimaryKey
type KmsFactory ¶
type KmsFactory struct { ReceiveBufSize int64 MQBufSize int64 // contains filtered or unexported fields }
func (*KmsFactory) NewMsgStream ¶
func (f *KmsFactory) NewMsgStream(ctx context.Context) (MsgStream, error)
func (*KmsFactory) NewMsgStreamDisposer ¶
func (*KmsFactory) NewTtMsgStream ¶
func (f *KmsFactory) NewTtMsgStream(ctx context.Context) (MsgStream, error)
type LoadCollectionMsg ¶
type LoadCollectionMsg struct { BaseMsg *milvuspb.LoadCollectionRequest }
LoadCollectionMsg is a message pack that contains load collection request
func (*LoadCollectionMsg) ID ¶
func (l *LoadCollectionMsg) ID() UniqueID
func (*LoadCollectionMsg) Marshal ¶
func (l *LoadCollectionMsg) Marshal(input TsMsg) (MarshalType, error)
func (*LoadCollectionMsg) SetID ¶
func (l *LoadCollectionMsg) SetID(id UniqueID)
func (*LoadCollectionMsg) Size ¶
func (l *LoadCollectionMsg) Size() int
func (*LoadCollectionMsg) SourceID ¶
func (l *LoadCollectionMsg) SourceID() int64
func (*LoadCollectionMsg) Type ¶
func (l *LoadCollectionMsg) Type() MsgType
func (*LoadCollectionMsg) Unmarshal ¶
func (l *LoadCollectionMsg) Unmarshal(input MarshalType) (TsMsg, error)
type LoadPartitionsMsg ¶
type LoadPartitionsMsg struct { BaseMsg *milvuspb.LoadPartitionsRequest }
func (*LoadPartitionsMsg) ID ¶
func (l *LoadPartitionsMsg) ID() UniqueID
func (*LoadPartitionsMsg) Marshal ¶
func (l *LoadPartitionsMsg) Marshal(input TsMsg) (MarshalType, error)
func (*LoadPartitionsMsg) SetID ¶
func (l *LoadPartitionsMsg) SetID(id UniqueID)
func (*LoadPartitionsMsg) Size ¶
func (l *LoadPartitionsMsg) Size() int
func (*LoadPartitionsMsg) SourceID ¶
func (l *LoadPartitionsMsg) SourceID() int64
func (*LoadPartitionsMsg) Type ¶
func (l *LoadPartitionsMsg) Type() MsgType
func (*LoadPartitionsMsg) Unmarshal ¶
func (l *LoadPartitionsMsg) Unmarshal(input MarshalType) (TsMsg, error)
type MockFactory ¶
MockFactory is an autogenerated mock type for the Factory type
func NewMockFactory ¶
func NewMockFactory(t interface { mock.TestingT Cleanup(func()) }) *MockFactory
NewMockFactory creates a new instance of MockFactory. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations. The first argument is typically a *testing.T value.
func (*MockFactory) EXPECT ¶
func (_m *MockFactory) EXPECT() *MockFactory_Expecter
func (*MockFactory) NewMsgStream ¶
func (_m *MockFactory) NewMsgStream(ctx context.Context) (MsgStream, error)
NewMsgStream provides a mock function with given fields: ctx
func (*MockFactory) NewMsgStreamDisposer ¶
NewMsgStreamDisposer provides a mock function with given fields: ctx
func (*MockFactory) NewTtMsgStream ¶
func (_m *MockFactory) NewTtMsgStream(ctx context.Context) (MsgStream, error)
NewTtMsgStream provides a mock function with given fields: ctx
type MockFactory_Expecter ¶
type MockFactory_Expecter struct {
// contains filtered or unexported fields
}
func (*MockFactory_Expecter) NewMsgStream ¶
func (_e *MockFactory_Expecter) NewMsgStream(ctx interface{}) *MockFactory_NewMsgStream_Call
NewMsgStream is a helper method to define mock.On call
- ctx context.Context
func (*MockFactory_Expecter) NewMsgStreamDisposer ¶
func (_e *MockFactory_Expecter) NewMsgStreamDisposer(ctx interface{}) *MockFactory_NewMsgStreamDisposer_Call
NewMsgStreamDisposer is a helper method to define mock.On call
- ctx context.Context
func (*MockFactory_Expecter) NewTtMsgStream ¶
func (_e *MockFactory_Expecter) NewTtMsgStream(ctx interface{}) *MockFactory_NewTtMsgStream_Call
NewTtMsgStream is a helper method to define mock.On call
- ctx context.Context
type MockFactory_NewMsgStreamDisposer_Call ¶
MockFactory_NewMsgStreamDisposer_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'NewMsgStreamDisposer'
func (*MockFactory_NewMsgStreamDisposer_Call) Return ¶
func (_c *MockFactory_NewMsgStreamDisposer_Call) Return(_a0 func([]string, string) error) *MockFactory_NewMsgStreamDisposer_Call
func (*MockFactory_NewMsgStreamDisposer_Call) Run ¶
func (_c *MockFactory_NewMsgStreamDisposer_Call) Run(run func(ctx context.Context)) *MockFactory_NewMsgStreamDisposer_Call
func (*MockFactory_NewMsgStreamDisposer_Call) RunAndReturn ¶
func (_c *MockFactory_NewMsgStreamDisposer_Call) RunAndReturn(run func(context.Context) func([]string, string) error) *MockFactory_NewMsgStreamDisposer_Call
type MockFactory_NewMsgStream_Call ¶
MockFactory_NewMsgStream_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'NewMsgStream'
func (*MockFactory_NewMsgStream_Call) Return ¶
func (_c *MockFactory_NewMsgStream_Call) Return(_a0 MsgStream, _a1 error) *MockFactory_NewMsgStream_Call
func (*MockFactory_NewMsgStream_Call) Run ¶
func (_c *MockFactory_NewMsgStream_Call) Run(run func(ctx context.Context)) *MockFactory_NewMsgStream_Call
func (*MockFactory_NewMsgStream_Call) RunAndReturn ¶
func (_c *MockFactory_NewMsgStream_Call) RunAndReturn(run func(context.Context) (MsgStream, error)) *MockFactory_NewMsgStream_Call
type MockFactory_NewTtMsgStream_Call ¶
MockFactory_NewTtMsgStream_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'NewTtMsgStream'
func (*MockFactory_NewTtMsgStream_Call) Return ¶
func (_c *MockFactory_NewTtMsgStream_Call) Return(_a0 MsgStream, _a1 error) *MockFactory_NewTtMsgStream_Call
func (*MockFactory_NewTtMsgStream_Call) Run ¶
func (_c *MockFactory_NewTtMsgStream_Call) Run(run func(ctx context.Context)) *MockFactory_NewTtMsgStream_Call
func (*MockFactory_NewTtMsgStream_Call) RunAndReturn ¶
func (_c *MockFactory_NewTtMsgStream_Call) RunAndReturn(run func(context.Context) (MsgStream, error)) *MockFactory_NewTtMsgStream_Call
type MockMqFactory ¶
func NewMockMqFactory ¶
func NewMockMqFactory() *MockMqFactory
func (MockMqFactory) NewMsgStream ¶
func (m MockMqFactory) NewMsgStream(ctx context.Context) (MsgStream, error)
func (MockMqFactory) NewTtMsgStream ¶
func (m MockMqFactory) NewTtMsgStream(ctx context.Context) (MsgStream, error)
type MockMsgStream ¶
MockMsgStream is an autogenerated mock type for the MsgStream type
func NewMockMsgStream ¶
func NewMockMsgStream(t interface { mock.TestingT Cleanup(func()) }) *MockMsgStream
NewMockMsgStream creates a new instance of MockMsgStream. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations. The first argument is typically a *testing.T value.
func (*MockMsgStream) AsConsumer ¶
func (_m *MockMsgStream) AsConsumer(ctx context.Context, channels []string, subName string, position common.SubscriptionInitialPosition) error
AsConsumer provides a mock function with given fields: ctx, channels, subName, position
func (*MockMsgStream) AsProducer ¶
func (_m *MockMsgStream) AsProducer(channels []string)
AsProducer provides a mock function with given fields: channels
func (*MockMsgStream) Chan ¶
func (_m *MockMsgStream) Chan() <-chan *MsgPack
Chan provides a mock function with given fields:
func (*MockMsgStream) CheckTopicValid ¶
func (_m *MockMsgStream) CheckTopicValid(channel string) error
CheckTopicValid provides a mock function with given fields: channel
func (*MockMsgStream) Close ¶
func (_m *MockMsgStream) Close()
Close provides a mock function with given fields:
func (*MockMsgStream) EXPECT ¶
func (_m *MockMsgStream) EXPECT() *MockMsgStream_Expecter
func (*MockMsgStream) EnableProduce ¶
func (_m *MockMsgStream) EnableProduce(can bool)
EnableProduce provides a mock function with given fields: can
func (*MockMsgStream) GetLatestMsgID ¶
func (_m *MockMsgStream) GetLatestMsgID(channel string) (common.MessageID, error)
GetLatestMsgID provides a mock function with given fields: channel
func (*MockMsgStream) GetProduceChannels ¶
func (_m *MockMsgStream) GetProduceChannels() []string
GetProduceChannels provides a mock function with given fields:
func (*MockMsgStream) Produce ¶
func (_m *MockMsgStream) Produce(_a0 *MsgPack) error
Produce provides a mock function with given fields: _a0
func (*MockMsgStream) Seek ¶
func (_m *MockMsgStream) Seek(ctx context.Context, msgPositions []*msgpb.MsgPosition, includeCurrentMsg bool) error
Seek provides a mock function with given fields: ctx, msgPositions, includeCurrentMsg
func (*MockMsgStream) SetRepackFunc ¶
func (_m *MockMsgStream) SetRepackFunc(repackFunc RepackFunc)
SetRepackFunc provides a mock function with given fields: repackFunc
type MockMsgStream_AsConsumer_Call ¶
MockMsgStream_AsConsumer_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AsConsumer'
func (*MockMsgStream_AsConsumer_Call) Return ¶
func (_c *MockMsgStream_AsConsumer_Call) Return(_a0 error) *MockMsgStream_AsConsumer_Call
func (*MockMsgStream_AsConsumer_Call) Run ¶
func (_c *MockMsgStream_AsConsumer_Call) Run(run func(ctx context.Context, channels []string, subName string, position common.SubscriptionInitialPosition)) *MockMsgStream_AsConsumer_Call
func (*MockMsgStream_AsConsumer_Call) RunAndReturn ¶
func (_c *MockMsgStream_AsConsumer_Call) RunAndReturn(run func(context.Context, []string, string, common.SubscriptionInitialPosition) error) *MockMsgStream_AsConsumer_Call
type MockMsgStream_AsProducer_Call ¶
MockMsgStream_AsProducer_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AsProducer'
func (*MockMsgStream_AsProducer_Call) Return ¶
func (_c *MockMsgStream_AsProducer_Call) Return() *MockMsgStream_AsProducer_Call
func (*MockMsgStream_AsProducer_Call) Run ¶
func (_c *MockMsgStream_AsProducer_Call) Run(run func(channels []string)) *MockMsgStream_AsProducer_Call
func (*MockMsgStream_AsProducer_Call) RunAndReturn ¶
func (_c *MockMsgStream_AsProducer_Call) RunAndReturn(run func([]string)) *MockMsgStream_AsProducer_Call
type MockMsgStream_Broadcast_Call ¶
MockMsgStream_Broadcast_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Broadcast'
func (*MockMsgStream_Broadcast_Call) Return ¶
func (_c *MockMsgStream_Broadcast_Call) Return(_a0 map[string][]common.MessageID, _a1 error) *MockMsgStream_Broadcast_Call
func (*MockMsgStream_Broadcast_Call) Run ¶
func (_c *MockMsgStream_Broadcast_Call) Run(run func(_a0 *MsgPack)) *MockMsgStream_Broadcast_Call
func (*MockMsgStream_Broadcast_Call) RunAndReturn ¶
func (_c *MockMsgStream_Broadcast_Call) RunAndReturn(run func(*MsgPack) (map[string][]common.MessageID, error)) *MockMsgStream_Broadcast_Call
type MockMsgStream_Chan_Call ¶
MockMsgStream_Chan_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Chan'
func (*MockMsgStream_Chan_Call) Return ¶
func (_c *MockMsgStream_Chan_Call) Return(_a0 <-chan *MsgPack) *MockMsgStream_Chan_Call
func (*MockMsgStream_Chan_Call) Run ¶
func (_c *MockMsgStream_Chan_Call) Run(run func()) *MockMsgStream_Chan_Call
func (*MockMsgStream_Chan_Call) RunAndReturn ¶
func (_c *MockMsgStream_Chan_Call) RunAndReturn(run func() <-chan *MsgPack) *MockMsgStream_Chan_Call
type MockMsgStream_CheckTopicValid_Call ¶
MockMsgStream_CheckTopicValid_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckTopicValid'
func (*MockMsgStream_CheckTopicValid_Call) Return ¶
func (_c *MockMsgStream_CheckTopicValid_Call) Return(_a0 error) *MockMsgStream_CheckTopicValid_Call
func (*MockMsgStream_CheckTopicValid_Call) Run ¶
func (_c *MockMsgStream_CheckTopicValid_Call) Run(run func(channel string)) *MockMsgStream_CheckTopicValid_Call
func (*MockMsgStream_CheckTopicValid_Call) RunAndReturn ¶
func (_c *MockMsgStream_CheckTopicValid_Call) RunAndReturn(run func(string) error) *MockMsgStream_CheckTopicValid_Call
type MockMsgStream_Close_Call ¶
MockMsgStream_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close'
func (*MockMsgStream_Close_Call) Return ¶
func (_c *MockMsgStream_Close_Call) Return() *MockMsgStream_Close_Call
func (*MockMsgStream_Close_Call) Run ¶
func (_c *MockMsgStream_Close_Call) Run(run func()) *MockMsgStream_Close_Call
func (*MockMsgStream_Close_Call) RunAndReturn ¶
func (_c *MockMsgStream_Close_Call) RunAndReturn(run func()) *MockMsgStream_Close_Call
type MockMsgStream_EnableProduce_Call ¶
MockMsgStream_EnableProduce_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'EnableProduce'
func (*MockMsgStream_EnableProduce_Call) Return ¶
func (_c *MockMsgStream_EnableProduce_Call) Return() *MockMsgStream_EnableProduce_Call
func (*MockMsgStream_EnableProduce_Call) Run ¶
func (_c *MockMsgStream_EnableProduce_Call) Run(run func(can bool)) *MockMsgStream_EnableProduce_Call
func (*MockMsgStream_EnableProduce_Call) RunAndReturn ¶
func (_c *MockMsgStream_EnableProduce_Call) RunAndReturn(run func(bool)) *MockMsgStream_EnableProduce_Call
type MockMsgStream_Expecter ¶
type MockMsgStream_Expecter struct {
// contains filtered or unexported fields
}
func (*MockMsgStream_Expecter) AsConsumer ¶
func (_e *MockMsgStream_Expecter) AsConsumer(ctx interface{}, channels interface{}, subName interface{}, position interface{}) *MockMsgStream_AsConsumer_Call
AsConsumer is a helper method to define mock.On call
- ctx context.Context
- channels []string
- subName string
- position common.SubscriptionInitialPosition
func (*MockMsgStream_Expecter) AsProducer ¶
func (_e *MockMsgStream_Expecter) AsProducer(channels interface{}) *MockMsgStream_AsProducer_Call
AsProducer is a helper method to define mock.On call
- channels []string
func (*MockMsgStream_Expecter) Broadcast ¶
func (_e *MockMsgStream_Expecter) Broadcast(_a0 interface{}) *MockMsgStream_Broadcast_Call
Broadcast is a helper method to define mock.On call
- _a0 *MsgPack
func (*MockMsgStream_Expecter) Chan ¶
func (_e *MockMsgStream_Expecter) Chan() *MockMsgStream_Chan_Call
Chan is a helper method to define mock.On call
func (*MockMsgStream_Expecter) CheckTopicValid ¶
func (_e *MockMsgStream_Expecter) CheckTopicValid(channel interface{}) *MockMsgStream_CheckTopicValid_Call
CheckTopicValid is a helper method to define mock.On call
- channel string
func (*MockMsgStream_Expecter) Close ¶
func (_e *MockMsgStream_Expecter) Close() *MockMsgStream_Close_Call
Close is a helper method to define mock.On call
func (*MockMsgStream_Expecter) EnableProduce ¶
func (_e *MockMsgStream_Expecter) EnableProduce(can interface{}) *MockMsgStream_EnableProduce_Call
EnableProduce is a helper method to define mock.On call
- can bool
func (*MockMsgStream_Expecter) GetLatestMsgID ¶
func (_e *MockMsgStream_Expecter) GetLatestMsgID(channel interface{}) *MockMsgStream_GetLatestMsgID_Call
GetLatestMsgID is a helper method to define mock.On call
- channel string
func (*MockMsgStream_Expecter) GetProduceChannels ¶
func (_e *MockMsgStream_Expecter) GetProduceChannels() *MockMsgStream_GetProduceChannels_Call
GetProduceChannels is a helper method to define mock.On call
func (*MockMsgStream_Expecter) Produce ¶
func (_e *MockMsgStream_Expecter) Produce(_a0 interface{}) *MockMsgStream_Produce_Call
Produce is a helper method to define mock.On call
- _a0 *MsgPack
func (*MockMsgStream_Expecter) Seek ¶
func (_e *MockMsgStream_Expecter) Seek(ctx interface{}, msgPositions interface{}, includeCurrentMsg interface{}) *MockMsgStream_Seek_Call
Seek is a helper method to define mock.On call
- ctx context.Context
- msgPositions []*msgpb.MsgPosition
- includeCurrentMsg bool
func (*MockMsgStream_Expecter) SetRepackFunc ¶
func (_e *MockMsgStream_Expecter) SetRepackFunc(repackFunc interface{}) *MockMsgStream_SetRepackFunc_Call
SetRepackFunc is a helper method to define mock.On call
- repackFunc RepackFunc
type MockMsgStream_GetLatestMsgID_Call ¶
MockMsgStream_GetLatestMsgID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetLatestMsgID'
func (*MockMsgStream_GetLatestMsgID_Call) Return ¶
func (_c *MockMsgStream_GetLatestMsgID_Call) Return(_a0 common.MessageID, _a1 error) *MockMsgStream_GetLatestMsgID_Call
func (*MockMsgStream_GetLatestMsgID_Call) Run ¶
func (_c *MockMsgStream_GetLatestMsgID_Call) Run(run func(channel string)) *MockMsgStream_GetLatestMsgID_Call
func (*MockMsgStream_GetLatestMsgID_Call) RunAndReturn ¶
func (_c *MockMsgStream_GetLatestMsgID_Call) RunAndReturn(run func(string) (common.MessageID, error)) *MockMsgStream_GetLatestMsgID_Call
type MockMsgStream_GetProduceChannels_Call ¶
MockMsgStream_GetProduceChannels_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetProduceChannels'
func (*MockMsgStream_GetProduceChannels_Call) Return ¶
func (_c *MockMsgStream_GetProduceChannels_Call) Return(_a0 []string) *MockMsgStream_GetProduceChannels_Call
func (*MockMsgStream_GetProduceChannels_Call) Run ¶
func (_c *MockMsgStream_GetProduceChannels_Call) Run(run func()) *MockMsgStream_GetProduceChannels_Call
func (*MockMsgStream_GetProduceChannels_Call) RunAndReturn ¶
func (_c *MockMsgStream_GetProduceChannels_Call) RunAndReturn(run func() []string) *MockMsgStream_GetProduceChannels_Call
type MockMsgStream_Produce_Call ¶
MockMsgStream_Produce_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Produce'
func (*MockMsgStream_Produce_Call) Return ¶
func (_c *MockMsgStream_Produce_Call) Return(_a0 error) *MockMsgStream_Produce_Call
func (*MockMsgStream_Produce_Call) Run ¶
func (_c *MockMsgStream_Produce_Call) Run(run func(_a0 *MsgPack)) *MockMsgStream_Produce_Call
func (*MockMsgStream_Produce_Call) RunAndReturn ¶
func (_c *MockMsgStream_Produce_Call) RunAndReturn(run func(*MsgPack) error) *MockMsgStream_Produce_Call
type MockMsgStream_Seek_Call ¶
MockMsgStream_Seek_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Seek'
func (*MockMsgStream_Seek_Call) Return ¶
func (_c *MockMsgStream_Seek_Call) Return(_a0 error) *MockMsgStream_Seek_Call
func (*MockMsgStream_Seek_Call) Run ¶
func (_c *MockMsgStream_Seek_Call) Run(run func(ctx context.Context, msgPositions []*msgpb.MsgPosition, includeCurrentMsg bool)) *MockMsgStream_Seek_Call
func (*MockMsgStream_Seek_Call) RunAndReturn ¶
func (_c *MockMsgStream_Seek_Call) RunAndReturn(run func(context.Context, []*msgpb.MsgPosition, bool) error) *MockMsgStream_Seek_Call
type MockMsgStream_SetRepackFunc_Call ¶
MockMsgStream_SetRepackFunc_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetRepackFunc'
func (*MockMsgStream_SetRepackFunc_Call) Return ¶
func (_c *MockMsgStream_SetRepackFunc_Call) Return() *MockMsgStream_SetRepackFunc_Call
func (*MockMsgStream_SetRepackFunc_Call) Run ¶
func (_c *MockMsgStream_SetRepackFunc_Call) Run(run func(repackFunc RepackFunc)) *MockMsgStream_SetRepackFunc_Call
func (*MockMsgStream_SetRepackFunc_Call) RunAndReturn ¶
func (_c *MockMsgStream_SetRepackFunc_Call) RunAndReturn(run func(RepackFunc)) *MockMsgStream_SetRepackFunc_Call
type MqTtMsgStream ¶
type MqTtMsgStream struct {
// contains filtered or unexported fields
}
MqTtMsgStream is a msgstream that contains timeticks
func NewMqTtMsgStream ¶
func NewMqTtMsgStream(ctx context.Context, receiveBufSize int64, bufSize int64, client mqwrapper.Client, unmarshal UnmarshalDispatcher, ) (*MqTtMsgStream, error)
NewMqTtMsgStream is used to generate a new MqTtMsgStream object
func (*MqTtMsgStream) AsConsumer ¶
func (ms *MqTtMsgStream) AsConsumer(ctx context.Context, channels []string, subName string, position common.SubscriptionInitialPosition) error
AsConsumer subscribes to channels as a consumer for a MsgStream and seeks to a certain position.
func (MqTtMsgStream) AsProducer ¶
func (ms MqTtMsgStream) AsProducer(channels []string)
AsProducer creates producers to send messages to channels.
func (MqTtMsgStream) Broadcast ¶
Broadcast broadcasts the msg pack to all producers and returns the corresponding msg IDs; the returned message IDs serve as markers.
func (*MqTtMsgStream) Chan ¶
func (ms *MqTtMsgStream) Chan() <-chan *MsgPack
func (MqTtMsgStream) CheckTopicValid ¶
func (*MqTtMsgStream) Close ¶
func (ms *MqTtMsgStream) Close()
Close will stop goroutine and free internal producers and consumers
func (MqTtMsgStream) ComputeProduceChannelIndexes ¶
func (MqTtMsgStream) EnableProduce ¶
func (ms MqTtMsgStream) EnableProduce(can bool)
func (MqTtMsgStream) GetLatestMsgID ¶
func (MqTtMsgStream) GetProduceChannels ¶
func (ms MqTtMsgStream) GetProduceChannels() []string
func (*MqTtMsgStream) Seek ¶
func (ms *MqTtMsgStream) Seek(ctx context.Context, msgPositions []*MsgPosition, includeCurrentMsg bool) error
Seek to the specified position
func (MqTtMsgStream) SetRepackFunc ¶
func (ms MqTtMsgStream) SetRepackFunc(repackFunc RepackFunc)
type MsgPack ¶
type MsgPack struct { BeginTs Timestamp EndTs Timestamp Msgs []TsMsg StartPositions []*MsgPosition EndPositions []*MsgPosition }
MsgPack represents a batch of msg in msgstream
type MsgStream ¶
type MsgStream interface { Close() AsProducer(channels []string) Produce(*MsgPack) error SetRepackFunc(repackFunc RepackFunc) GetProduceChannels() []string Broadcast(*MsgPack) (map[string][]MessageID, error) AsConsumer(ctx context.Context, channels []string, subName string, position common.SubscriptionInitialPosition) error Chan() <-chan *MsgPack // Seek consume message from the specified position // includeCurrentMsg indicates whether to consume the current message, and in the milvus system, it should be always false Seek(ctx context.Context, msgPositions []*MsgPosition, includeCurrentMsg bool) error GetLatestMsgID(channel string) (MessageID, error) CheckTopicValid(channel string) error EnableProduce(can bool) }
MsgStream is an interface that can be used to produce and consume messages on a message queue
type PmsFactory ¶
type PmsFactory struct { // the following members must be public, so that mapstructure.Decode() can access them PulsarAddress string PulsarWebAddress string ReceiveBufSize int64 MQBufSize int64 PulsarAuthPlugin string PulsarAuthParams string PulsarTenant string PulsarNameSpace string RequestTimeout time.Duration // contains filtered or unexported fields }
PmsFactory is a pulsar msgstream factory that implements the Factory interface (msgstream.go)
func NewPmsFactory ¶
func NewPmsFactory(serviceParam *paramtable.ServiceParam) *PmsFactory
func (*PmsFactory) NewMsgStream ¶
func (f *PmsFactory) NewMsgStream(ctx context.Context) (MsgStream, error)
NewMsgStream is used to generate a new MsgStream object
func (*PmsFactory) NewMsgStreamDisposer ¶
func (*PmsFactory) NewTtMsgStream ¶
func (f *PmsFactory) NewTtMsgStream(ctx context.Context) (MsgStream, error)
NewTtMsgStream is used to generate a new TtMsgstream object
type ProtoUDFactory ¶
type ProtoUDFactory struct{}
ProtoUDFactory is a factory to generate ProtoUnmarshalDispatcher object
func (*ProtoUDFactory) NewUnmarshalDispatcher ¶
func (pudf *ProtoUDFactory) NewUnmarshalDispatcher() *ProtoUnmarshalDispatcher
NewUnmarshalDispatcher returns a new UnmarshalDispatcher
type ProtoUnmarshalDispatcher ¶
type ProtoUnmarshalDispatcher struct {
TempMap map[commonpb.MsgType]UnmarshalFunc
}
ProtoUnmarshalDispatcher is an UnmarshalDispatcher which is used for data of proto type
type ReleaseCollectionMsg ¶
type ReleaseCollectionMsg struct { BaseMsg *milvuspb.ReleaseCollectionRequest }
ReleaseCollectionMsg is a message pack that contains release collection request
func (*ReleaseCollectionMsg) ID ¶
func (r *ReleaseCollectionMsg) ID() UniqueID
func (*ReleaseCollectionMsg) Marshal ¶
func (r *ReleaseCollectionMsg) Marshal(input TsMsg) (MarshalType, error)
func (*ReleaseCollectionMsg) SetID ¶
func (r *ReleaseCollectionMsg) SetID(id UniqueID)
func (*ReleaseCollectionMsg) Size ¶
func (r *ReleaseCollectionMsg) Size() int
func (*ReleaseCollectionMsg) SourceID ¶
func (r *ReleaseCollectionMsg) SourceID() int64
func (*ReleaseCollectionMsg) Type ¶
func (r *ReleaseCollectionMsg) Type() MsgType
func (*ReleaseCollectionMsg) Unmarshal ¶
func (r *ReleaseCollectionMsg) Unmarshal(input MarshalType) (TsMsg, error)
type ReleasePartitionsMsg ¶
type ReleasePartitionsMsg struct { BaseMsg *milvuspb.ReleasePartitionsRequest }
func (*ReleasePartitionsMsg) ID ¶
func (r *ReleasePartitionsMsg) ID() UniqueID
func (*ReleasePartitionsMsg) Marshal ¶
func (r *ReleasePartitionsMsg) Marshal(input TsMsg) (MarshalType, error)
func (*ReleasePartitionsMsg) SetID ¶
func (r *ReleasePartitionsMsg) SetID(id UniqueID)
func (*ReleasePartitionsMsg) Size ¶
func (r *ReleasePartitionsMsg) Size() int
func (*ReleasePartitionsMsg) SourceID ¶
func (r *ReleasePartitionsMsg) SourceID() int64
func (*ReleasePartitionsMsg) Type ¶
func (r *ReleasePartitionsMsg) Type() MsgType
func (*ReleasePartitionsMsg) Unmarshal ¶
func (r *ReleasePartitionsMsg) Unmarshal(input MarshalType) (TsMsg, error)
type RepackFunc ¶
RepackFunc is a function type which is used to repack messages after hashing by primary key
type TimeTickMsg ¶
type TimeTickMsg struct { BaseMsg *msgpb.TimeTickMsg }
TimeTickMsg is a message pack that contains time tick only
func (*TimeTickMsg) ID ¶
func (tst *TimeTickMsg) ID() UniqueID
ID returns the ID of this message pack
func (*TimeTickMsg) Marshal ¶
func (tst *TimeTickMsg) Marshal(input TsMsg) (MarshalType, error)
Marshal is used to serialize a message pack to a byte array
func (*TimeTickMsg) SetID ¶
func (tst *TimeTickMsg) SetID(id UniqueID)
SetID sets the ID of this message pack
func (*TimeTickMsg) Size ¶
func (tst *TimeTickMsg) Size() int
func (*TimeTickMsg) SourceID ¶
func (tst *TimeTickMsg) SourceID() int64
SourceID indicates which component generated this message
func (*TimeTickMsg) Type ¶
func (tst *TimeTickMsg) Type() MsgType
Type returns the type of this message pack
func (*TimeTickMsg) Unmarshal ¶
func (tst *TimeTickMsg) Unmarshal(input MarshalType) (TsMsg, error)
Unmarshal is used to deserialize a message pack from a byte array
type TsMsg ¶
type TsMsg interface { TraceCtx() context.Context SetTraceCtx(ctx context.Context) ID() UniqueID SetID(id UniqueID) BeginTs() Timestamp EndTs() Timestamp Type() MsgType SourceID() int64 HashKeys() []uint32 Marshal(TsMsg) (MarshalType, error) Unmarshal(MarshalType) (TsMsg, error) Position() *MsgPosition SetPosition(*MsgPosition) SetTs(ts uint64) Size() int }
TsMsg provides methods to get begin timestamp and end timestamp of a message pack
func GetTsMsgFromConsumerMsg ¶
func GetTsMsgFromConsumerMsg(unmarshalDispatcher UnmarshalDispatcher, msg common.Message) (TsMsg, error)
GetTsMsgFromConsumerMsg gets a TsMsg from a consumer message
type UnmarshalDispatcher ¶
type UnmarshalDispatcher interface {
Unmarshal(input interface{}, msgType commonpb.MsgType) (TsMsg, error)
}
UnmarshalDispatcher is an interface that contains the method Unmarshal
type UnmarshalDispatcherFactory ¶
type UnmarshalDispatcherFactory interface {
NewUnmarshalDispatcher() *UnmarshalDispatcher
}
UnmarshalDispatcherFactory is a factory to generate an object which implements the UnmarshalDispatcher interface
type UnmarshalFunc ¶
UnmarshalFunc is an interface that has been implemented by each Msg
type UpsertMsg ¶
///////////////////////////////////////Upsert//////////////////////////////////////////
type WastedMockMsgStream ¶
type WastedMockMsgStream struct { MsgStream AsProducerFunc func(channels []string) BroadcastMarkFunc func(*MsgPack) (map[string][]MessageID, error) BroadcastFunc func(*MsgPack) error ChanFunc func() <-chan *MsgPack }
func NewWastedMockMsgStream ¶
func NewWastedMockMsgStream() *WastedMockMsgStream
func (WastedMockMsgStream) AsProducer ¶
func (m WastedMockMsgStream) AsProducer(channels []string)
func (WastedMockMsgStream) Broadcast ¶
func (m WastedMockMsgStream) Broadcast(pack *MsgPack) (map[string][]MessageID, error)
func (WastedMockMsgStream) Chan ¶
func (m WastedMockMsgStream) Chan() <-chan *MsgPack