Documentation ¶
Index ¶
- func DefaultRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error)
- func DeleteRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error)
- func ExtractCtx(msg TsMsg, properties map[string]string) (context.Context, trace.Span)
- func InjectCtx(sc context.Context, properties map[string]string)
- func InsertRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error)
- func MsgSpanFromCtx(ctx context.Context, msg TsMsg) (context.Context, trace.Span)
- func NewMqMsgStream(ctx context.Context, receiveBufSize int64, bufSize int64, ...) (*mqMsgStream, error)
- func UnsubscribeChannels(ctx context.Context, factory Factory, subName string, channels []string)
- type BaseMsg
- func (bm *BaseMsg) BeginTs() Timestamp
- func (bm *BaseMsg) EndTs() Timestamp
- func (bm *BaseMsg) HashKeys() []uint32
- func (bm *BaseMsg) Position() *MsgPosition
- func (bm *BaseMsg) SetPosition(position *MsgPosition)
- func (bm *BaseMsg) SetTraceCtx(ctx context.Context)
- func (bm *BaseMsg) TraceCtx() context.Context
- type CommonFactory
- func (f *CommonFactory) NewMsgStream(ctx context.Context) (ms MsgStream, err error)
- func (f *CommonFactory) NewMsgStreamDisposer(ctx context.Context) func([]string, string) error
- func (f *CommonFactory) NewQueryMsgStream(ctx context.Context) (ms MsgStream, err error)
- func (f *CommonFactory) NewTtMsgStream(ctx context.Context) (ms MsgStream, err error)
- type CreateCollectionMsg
- func (cc *CreateCollectionMsg) ID() UniqueID
- func (cc *CreateCollectionMsg) Marshal(input TsMsg) (MarshalType, error)
- func (cc *CreateCollectionMsg) SetID(id UniqueID)
- func (cc *CreateCollectionMsg) SourceID() int64
- func (cc *CreateCollectionMsg) Type() MsgType
- func (cc *CreateCollectionMsg) Unmarshal(input MarshalType) (TsMsg, error)
- type CreatePartitionMsg
- func (cp *CreatePartitionMsg) ID() UniqueID
- func (cp *CreatePartitionMsg) Marshal(input TsMsg) (MarshalType, error)
- func (cp *CreatePartitionMsg) SetID(id UniqueID)
- func (cp *CreatePartitionMsg) SourceID() int64
- func (cp *CreatePartitionMsg) Type() MsgType
- func (cp *CreatePartitionMsg) Unmarshal(input MarshalType) (TsMsg, error)
- type DataNodeTtMsg
- type DeleteMsg
- func (dt *DeleteMsg) CheckAligned() error
- func (dt *DeleteMsg) ID() UniqueID
- func (dt *DeleteMsg) Marshal(input TsMsg) (MarshalType, error)
- func (dt *DeleteMsg) SetID(id UniqueID)
- func (dt *DeleteMsg) SourceID() int64
- func (dt *DeleteMsg) Type() MsgType
- func (dt *DeleteMsg) Unmarshal(input MarshalType) (TsMsg, error)
- type DropCollectionMsg
- func (dc *DropCollectionMsg) ID() UniqueID
- func (dc *DropCollectionMsg) Marshal(input TsMsg) (MarshalType, error)
- func (dc *DropCollectionMsg) SetID(id UniqueID)
- func (dc *DropCollectionMsg) SourceID() int64
- func (dc *DropCollectionMsg) Type() MsgType
- func (dc *DropCollectionMsg) Unmarshal(input MarshalType) (TsMsg, error)
- type DropPartitionMsg
- func (dp *DropPartitionMsg) ID() UniqueID
- func (dp *DropPartitionMsg) Marshal(input TsMsg) (MarshalType, error)
- func (dp *DropPartitionMsg) SetID(id UniqueID)
- func (dp *DropPartitionMsg) SourceID() int64
- func (dp *DropPartitionMsg) Type() MsgType
- func (dp *DropPartitionMsg) Unmarshal(input MarshalType) (TsMsg, error)
- type Factory
- type InsertMsg
- func (it *InsertMsg) CheckAligned() error
- func (it *InsertMsg) ID() UniqueID
- func (it *InsertMsg) IndexMsg(index int) *InsertMsg
- func (it *InsertMsg) IndexRequest(index int) msgpb.InsertRequest
- func (it *InsertMsg) IsColumnBased() bool
- func (it *InsertMsg) IsRowBased() bool
- func (it *InsertMsg) Marshal(input TsMsg) (MarshalType, error)
- func (it *InsertMsg) NRows() uint64
- func (it *InsertMsg) SetID(id UniqueID)
- func (it *InsertMsg) SourceID() int64
- func (it *InsertMsg) Type() MsgType
- func (it *InsertMsg) Unmarshal(input MarshalType) (TsMsg, error)
- type IntPrimaryKey
- type KmsFactory
- func (f *KmsFactory) NewMsgStream(ctx context.Context) (MsgStream, error)
- func (f *KmsFactory) NewMsgStreamDisposer(ctx context.Context) func([]string, string) error
- func (f *KmsFactory) NewQueryMsgStream(ctx context.Context) (MsgStream, error)
- func (f *KmsFactory) NewTtMsgStream(ctx context.Context) (MsgStream, error)
- type MarshalType
- type MessageID
- type MockMqFactory
- type MockMsgStream
- func (_m *MockMsgStream) AsConsumer(channels []string, subName string, ...)
- func (_m *MockMsgStream) AsProducer(channels []string)
- func (_m *MockMsgStream) Broadcast(_a0 *MsgPack) (map[string][]mqwrapper.MessageID, error)
- func (_m *MockMsgStream) Chan() <-chan *MsgPack
- func (_m *MockMsgStream) CheckTopicValid(channel string) error
- func (_m *MockMsgStream) Close()
- func (_m *MockMsgStream) EXPECT() *MockMsgStream_Expecter
- func (_m *MockMsgStream) GetLatestMsgID(channel string) (mqwrapper.MessageID, error)
- func (_m *MockMsgStream) GetProduceChannels() []string
- func (_m *MockMsgStream) Produce(_a0 *MsgPack) error
- func (_m *MockMsgStream) Seek(offset []*msgpb.MsgPosition) error
- func (_m *MockMsgStream) SetRepackFunc(repackFunc RepackFunc)
- type MockMsgStream_AsConsumer_Call
- func (_c *MockMsgStream_AsConsumer_Call) Return() *MockMsgStream_AsConsumer_Call
- func (_c *MockMsgStream_AsConsumer_Call) Run(run func(channels []string, subName string, ...)) *MockMsgStream_AsConsumer_Call
- func (_c *MockMsgStream_AsConsumer_Call) RunAndReturn(run func([]string, string, mqwrapper.SubscriptionInitialPosition)) *MockMsgStream_AsConsumer_Call
- type MockMsgStream_AsProducer_Call
- type MockMsgStream_Broadcast_Call
- func (_c *MockMsgStream_Broadcast_Call) Return(_a0 map[string][]mqwrapper.MessageID, _a1 error) *MockMsgStream_Broadcast_Call
- func (_c *MockMsgStream_Broadcast_Call) Run(run func(_a0 *MsgPack)) *MockMsgStream_Broadcast_Call
- func (_c *MockMsgStream_Broadcast_Call) RunAndReturn(run func(*MsgPack) (map[string][]mqwrapper.MessageID, error)) *MockMsgStream_Broadcast_Call
- type MockMsgStream_Chan_Call
- type MockMsgStream_CheckTopicValid_Call
- func (_c *MockMsgStream_CheckTopicValid_Call) Return(_a0 error) *MockMsgStream_CheckTopicValid_Call
- func (_c *MockMsgStream_CheckTopicValid_Call) Run(run func(channel string)) *MockMsgStream_CheckTopicValid_Call
- func (_c *MockMsgStream_CheckTopicValid_Call) RunAndReturn(run func(string) error) *MockMsgStream_CheckTopicValid_Call
- type MockMsgStream_Close_Call
- type MockMsgStream_Expecter
- func (_e *MockMsgStream_Expecter) AsConsumer(channels interface{}, subName interface{}, position interface{}) *MockMsgStream_AsConsumer_Call
- func (_e *MockMsgStream_Expecter) AsProducer(channels interface{}) *MockMsgStream_AsProducer_Call
- func (_e *MockMsgStream_Expecter) Broadcast(_a0 interface{}) *MockMsgStream_Broadcast_Call
- func (_e *MockMsgStream_Expecter) Chan() *MockMsgStream_Chan_Call
- func (_e *MockMsgStream_Expecter) CheckTopicValid(channel interface{}) *MockMsgStream_CheckTopicValid_Call
- func (_e *MockMsgStream_Expecter) Close() *MockMsgStream_Close_Call
- func (_e *MockMsgStream_Expecter) GetLatestMsgID(channel interface{}) *MockMsgStream_GetLatestMsgID_Call
- func (_e *MockMsgStream_Expecter) GetProduceChannels() *MockMsgStream_GetProduceChannels_Call
- func (_e *MockMsgStream_Expecter) Produce(_a0 interface{}) *MockMsgStream_Produce_Call
- func (_e *MockMsgStream_Expecter) Seek(offset interface{}) *MockMsgStream_Seek_Call
- func (_e *MockMsgStream_Expecter) SetRepackFunc(repackFunc interface{}) *MockMsgStream_SetRepackFunc_Call
- type MockMsgStream_GetLatestMsgID_Call
- func (_c *MockMsgStream_GetLatestMsgID_Call) Return(_a0 mqwrapper.MessageID, _a1 error) *MockMsgStream_GetLatestMsgID_Call
- func (_c *MockMsgStream_GetLatestMsgID_Call) Run(run func(channel string)) *MockMsgStream_GetLatestMsgID_Call
- func (_c *MockMsgStream_GetLatestMsgID_Call) RunAndReturn(run func(string) (mqwrapper.MessageID, error)) *MockMsgStream_GetLatestMsgID_Call
- type MockMsgStream_GetProduceChannels_Call
- func (_c *MockMsgStream_GetProduceChannels_Call) Return(_a0 []string) *MockMsgStream_GetProduceChannels_Call
- func (_c *MockMsgStream_GetProduceChannels_Call) Run(run func()) *MockMsgStream_GetProduceChannels_Call
- func (_c *MockMsgStream_GetProduceChannels_Call) RunAndReturn(run func() []string) *MockMsgStream_GetProduceChannels_Call
- type MockMsgStream_Produce_Call
- type MockMsgStream_Seek_Call
- type MockMsgStream_SetRepackFunc_Call
- func (_c *MockMsgStream_SetRepackFunc_Call) Return() *MockMsgStream_SetRepackFunc_Call
- func (_c *MockMsgStream_SetRepackFunc_Call) Run(run func(repackFunc RepackFunc)) *MockMsgStream_SetRepackFunc_Call
- func (_c *MockMsgStream_SetRepackFunc_Call) RunAndReturn(run func(RepackFunc)) *MockMsgStream_SetRepackFunc_Call
- type MqTtMsgStream
- func (ms *MqTtMsgStream) AsConsumer(channels []string, subName string, ...)
- func (ms MqTtMsgStream) AsProducer(channels []string)
- func (ms MqTtMsgStream) Broadcast(msgPack *MsgPack) (map[string][]MessageID, error)
- func (ms *MqTtMsgStream) Chan() <-chan *MsgPack
- func (ms MqTtMsgStream) CheckTopicValid(channel string) error
- func (ms *MqTtMsgStream) Close()
- func (ms MqTtMsgStream) ComputeProduceChannelIndexes(tsMsgs []TsMsg) [][]int32
- func (ms MqTtMsgStream) GetLatestMsgID(channel string) (MessageID, error)
- func (ms MqTtMsgStream) GetProduceChannels() []string
- func (ms MqTtMsgStream) Produce(msgPack *MsgPack) error
- func (ms *MqTtMsgStream) Seek(msgPositions []*msgpb.MsgPosition) error
- func (ms MqTtMsgStream) SetRepackFunc(repackFunc RepackFunc)
- type MsgPack
- type MsgPosition
- type MsgStream
- type MsgType
- type PmsFactory
- func (f *PmsFactory) NewMsgStream(ctx context.Context) (MsgStream, error)
- func (f *PmsFactory) NewMsgStreamDisposer(ctx context.Context) func([]string, string) error
- func (f *PmsFactory) NewQueryMsgStream(ctx context.Context) (MsgStream, error)
- func (f *PmsFactory) NewTtMsgStream(ctx context.Context) (MsgStream, error)
- type ProtoUDFactory
- type ProtoUnmarshalDispatcher
- type RepackFunc
- type TimeTickMsg
- type Timestamp
- type TsMsg
- type UniqueID
- type UnmarshalDispatcher
- type UnmarshalDispatcherFactory
- type UnmarshalFunc
- type UpsertMsg
- type WastedMockMsgStream
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DefaultRepackFunc ¶
DefaultRepackFunc is used to repack messages after hash by primary key
func DeleteRepackFunc ¶
DeleteRepackFunc is used to repack messages after hash by primary key
func ExtractCtx ¶
ExtractCtx extracts trace span from msg.properties. And it will attach some default tags to the span.
func InsertRepackFunc ¶
InsertRepackFunc is used to repack messages after hash by primary key
func MsgSpanFromCtx ¶
MsgSpanFromCtx extracts the span from context. And it will attach some default tags to the span.
func NewMqMsgStream ¶
func NewMqMsgStream(ctx context.Context, receiveBufSize int64, bufSize int64, client mqwrapper.Client, unmarshal UnmarshalDispatcher) (*mqMsgStream, error)
NewMqMsgStream is used to generate a new mqMsgStream object
Types ¶
type BaseMsg ¶
type BaseMsg struct { Ctx context.Context BeginTimestamp Timestamp EndTimestamp Timestamp HashValues []uint32 MsgPosition *MsgPosition }
BaseMsg is a basic structure that contains begin timestamp, end timestamp and the position of msgstream
func (*BaseMsg) Position ¶
func (bm *BaseMsg) Position() *MsgPosition
Position returns the position of this message pack in msgstream
func (*BaseMsg) SetPosition ¶
func (bm *BaseMsg) SetPosition(position *MsgPosition)
SetPosition is used to set position of this message in msgstream
func (*BaseMsg) SetTraceCtx ¶
SetTraceCtx is used to set context for opentracing
type CommonFactory ¶
type CommonFactory struct { Newer func() (mqwrapper.Client, error) // client constructor DispatcherFactory ProtoUDFactory ReceiveBufSize int64 MQBufSize int64 }
CommonFactory is a Factory for creating message streams with common logic.
It contains a function field named Newer that creates an mqwrapper.Client when called.
func (*CommonFactory) NewMsgStream ¶
func (f *CommonFactory) NewMsgStream(ctx context.Context) (ms MsgStream, err error)
NewMsgStream is used to generate a new Msgstream object
func (*CommonFactory) NewMsgStreamDisposer ¶
NewMsgStreamDisposer returns a function that can be used to dispose of a message stream. The returned function takes a slice of channel names and a subscription name, and disposes of the message stream associated with those arguments.
func (*CommonFactory) NewQueryMsgStream ¶
func (f *CommonFactory) NewQueryMsgStream(ctx context.Context) (ms MsgStream, err error)
NewQueryMsgStream is used to generate a new QueryMsgstream object
func (*CommonFactory) NewTtMsgStream ¶
func (f *CommonFactory) NewTtMsgStream(ctx context.Context) (ms MsgStream, err error)
NewTtMsgStream is used to generate a new TtMsgstream object
type CreateCollectionMsg ¶
type CreateCollectionMsg struct { BaseMsg msgpb.CreateCollectionRequest }
CreateCollectionMsg is a message pack that contains create collection request
func (*CreateCollectionMsg) ID ¶
func (cc *CreateCollectionMsg) ID() UniqueID
ID returns the ID of this message pack
func (*CreateCollectionMsg) Marshal ¶
func (cc *CreateCollectionMsg) Marshal(input TsMsg) (MarshalType, error)
Marshal is used to serialize a message pack to a byte array
func (*CreateCollectionMsg) SetID ¶
func (cc *CreateCollectionMsg) SetID(id UniqueID)
SetID sets the ID of this message pack
func (*CreateCollectionMsg) SourceID ¶
func (cc *CreateCollectionMsg) SourceID() int64
SourceID indicates which component generated this message
func (*CreateCollectionMsg) Type ¶
func (cc *CreateCollectionMsg) Type() MsgType
Type returns the type of this message pack
func (*CreateCollectionMsg) Unmarshal ¶
func (cc *CreateCollectionMsg) Unmarshal(input MarshalType) (TsMsg, error)
Unmarshal is used to deserialize a message pack from a byte array
type CreatePartitionMsg ¶
type CreatePartitionMsg struct { BaseMsg msgpb.CreatePartitionRequest }
CreatePartitionMsg is a message pack that contains create partition request
func (*CreatePartitionMsg) ID ¶
func (cp *CreatePartitionMsg) ID() UniqueID
ID returns the ID of this message pack
func (*CreatePartitionMsg) Marshal ¶
func (cp *CreatePartitionMsg) Marshal(input TsMsg) (MarshalType, error)
Marshal is used to serialize a message pack to a byte array
func (*CreatePartitionMsg) SetID ¶
func (cp *CreatePartitionMsg) SetID(id UniqueID)
SetID sets the ID of this message pack
func (*CreatePartitionMsg) SourceID ¶
func (cp *CreatePartitionMsg) SourceID() int64
SourceID indicates which component generated this message
func (*CreatePartitionMsg) Type ¶
func (cp *CreatePartitionMsg) Type() MsgType
Type returns the type of this message pack
func (*CreatePartitionMsg) Unmarshal ¶
func (cp *CreatePartitionMsg) Unmarshal(input MarshalType) (TsMsg, error)
Unmarshal is used to deserialize a message pack from a byte array
type DataNodeTtMsg ¶
type DataNodeTtMsg struct { BaseMsg msgpb.DataNodeTtMsg }
DataNodeTtMsg is a message pack that contains datanode time tick
func (*DataNodeTtMsg) ID ¶
func (m *DataNodeTtMsg) ID() UniqueID
ID returns the ID of this message pack
func (*DataNodeTtMsg) Marshal ¶
func (m *DataNodeTtMsg) Marshal(input TsMsg) (MarshalType, error)
Marshal is used to serialize a message pack to a byte array
func (*DataNodeTtMsg) SetID ¶
func (m *DataNodeTtMsg) SetID(id UniqueID)
SetID sets the ID of this message pack
func (*DataNodeTtMsg) SourceID ¶
func (m *DataNodeTtMsg) SourceID() int64
SourceID indicates which component generated this message
func (*DataNodeTtMsg) Type ¶
func (m *DataNodeTtMsg) Type() MsgType
Type returns the type of this message pack
func (*DataNodeTtMsg) Unmarshal ¶
func (m *DataNodeTtMsg) Unmarshal(input MarshalType) (TsMsg, error)
Unmarshal is used to deserialize a message pack from a byte array
type DeleteMsg ¶
type DeleteMsg struct { BaseMsg msgpb.DeleteRequest }
DeleteMsg is a message pack that contains delete request
func (*DeleteMsg) CheckAligned ¶
func (*DeleteMsg) Marshal ¶
func (dt *DeleteMsg) Marshal(input TsMsg) (MarshalType, error)
Marshal is used to serialize a message pack to a byte array
type DropCollectionMsg ¶
type DropCollectionMsg struct { BaseMsg msgpb.DropCollectionRequest }
DropCollectionMsg is a message pack that contains drop collection request
func (*DropCollectionMsg) ID ¶
func (dc *DropCollectionMsg) ID() UniqueID
ID returns the ID of this message pack
func (*DropCollectionMsg) Marshal ¶
func (dc *DropCollectionMsg) Marshal(input TsMsg) (MarshalType, error)
Marshal is used to serialize a message pack to a byte array
func (*DropCollectionMsg) SetID ¶
func (dc *DropCollectionMsg) SetID(id UniqueID)
SetID sets the ID of this message pack
func (*DropCollectionMsg) SourceID ¶
func (dc *DropCollectionMsg) SourceID() int64
SourceID indicates which component generated this message
func (*DropCollectionMsg) Type ¶
func (dc *DropCollectionMsg) Type() MsgType
Type returns the type of this message pack
func (*DropCollectionMsg) Unmarshal ¶
func (dc *DropCollectionMsg) Unmarshal(input MarshalType) (TsMsg, error)
Unmarshal is used to deserialize a message pack from a byte array
type DropPartitionMsg ¶
type DropPartitionMsg struct { BaseMsg msgpb.DropPartitionRequest }
DropPartitionMsg is a message pack that contains drop partition request
func (*DropPartitionMsg) ID ¶
func (dp *DropPartitionMsg) ID() UniqueID
ID returns the ID of this message pack
func (*DropPartitionMsg) Marshal ¶
func (dp *DropPartitionMsg) Marshal(input TsMsg) (MarshalType, error)
Marshal is used to serialize a message pack to a byte array
func (*DropPartitionMsg) SetID ¶
func (dp *DropPartitionMsg) SetID(id UniqueID)
SetID sets the ID of this message pack
func (*DropPartitionMsg) SourceID ¶
func (dp *DropPartitionMsg) SourceID() int64
SourceID indicates which component generated this message
func (*DropPartitionMsg) Type ¶
func (dp *DropPartitionMsg) Type() MsgType
Type returns the type of this message pack
func (*DropPartitionMsg) Unmarshal ¶
func (dp *DropPartitionMsg) Unmarshal(input MarshalType) (TsMsg, error)
Unmarshal is used to deserialize a message pack from a byte array
type Factory ¶
type Factory interface { NewMsgStream(ctx context.Context) (MsgStream, error) NewTtMsgStream(ctx context.Context) (MsgStream, error) NewQueryMsgStream(ctx context.Context) (MsgStream, error) NewMsgStreamDisposer(ctx context.Context) func([]string, string) error }
func NewKmsFactory ¶
func NewKmsFactory(config *paramtable.KafkaConfig) Factory
func NewNatsmqFactory ¶
func NewNatsmqFactory() Factory
NewNatsmqFactory creates a new nats-mq factory.
type InsertMsg ¶
type InsertMsg struct { BaseMsg msgpb.InsertRequest }
InsertMsg is a message pack that contains insert request
func (*InsertMsg) CheckAligned ¶
func (*InsertMsg) IndexRequest ¶
func (it *InsertMsg) IndexRequest(index int) msgpb.InsertRequest
func (*InsertMsg) IsColumnBased ¶
func (*InsertMsg) IsRowBased ¶
func (*InsertMsg) Marshal ¶
func (it *InsertMsg) Marshal(input TsMsg) (MarshalType, error)
Marshal is used to serialize a message pack to byte array
type IntPrimaryKey ¶
type IntPrimaryKey = typeutil.IntPrimaryKey
IntPrimaryKey is a short alias for typeutil.IntPrimaryKey
type KmsFactory ¶
type KmsFactory struct { ReceiveBufSize int64 // contains filtered or unexported fields }
func (*KmsFactory) NewMsgStream ¶
func (f *KmsFactory) NewMsgStream(ctx context.Context) (MsgStream, error)
func (*KmsFactory) NewMsgStreamDisposer ¶
func (*KmsFactory) NewQueryMsgStream ¶
func (f *KmsFactory) NewQueryMsgStream(ctx context.Context) (MsgStream, error)
func (*KmsFactory) NewTtMsgStream ¶
func (f *KmsFactory) NewTtMsgStream(ctx context.Context) (MsgStream, error)
type MockMqFactory ¶
func NewMockMqFactory ¶
func NewMockMqFactory() *MockMqFactory
func (MockMqFactory) NewMsgStream ¶
func (m MockMqFactory) NewMsgStream(ctx context.Context) (MsgStream, error)
type MockMsgStream ¶
MockMsgStream is an autogenerated mock type for the MsgStream type
func NewMockMsgStream ¶
func NewMockMsgStream(t mockConstructorTestingTNewMockMsgStream) *MockMsgStream
NewMockMsgStream creates a new instance of MockMsgStream. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func (*MockMsgStream) AsConsumer ¶
func (_m *MockMsgStream) AsConsumer(channels []string, subName string, position mqwrapper.SubscriptionInitialPosition)
AsConsumer provides a mock function with given fields: channels, subName, position
func (*MockMsgStream) AsProducer ¶
func (_m *MockMsgStream) AsProducer(channels []string)
AsProducer provides a mock function with given fields: channels
func (*MockMsgStream) Chan ¶
func (_m *MockMsgStream) Chan() <-chan *MsgPack
Chan provides a mock function with given fields:
func (*MockMsgStream) CheckTopicValid ¶
func (_m *MockMsgStream) CheckTopicValid(channel string) error
CheckTopicValid provides a mock function with given fields: channel
func (*MockMsgStream) Close ¶
func (_m *MockMsgStream) Close()
Close provides a mock function with given fields:
func (*MockMsgStream) EXPECT ¶
func (_m *MockMsgStream) EXPECT() *MockMsgStream_Expecter
func (*MockMsgStream) GetLatestMsgID ¶
func (_m *MockMsgStream) GetLatestMsgID(channel string) (mqwrapper.MessageID, error)
GetLatestMsgID provides a mock function with given fields: channel
func (*MockMsgStream) GetProduceChannels ¶
func (_m *MockMsgStream) GetProduceChannels() []string
GetProduceChannels provides a mock function with given fields:
func (*MockMsgStream) Produce ¶
func (_m *MockMsgStream) Produce(_a0 *MsgPack) error
Produce provides a mock function with given fields: _a0
func (*MockMsgStream) Seek ¶
func (_m *MockMsgStream) Seek(offset []*msgpb.MsgPosition) error
Seek provides a mock function with given fields: offset
func (*MockMsgStream) SetRepackFunc ¶
func (_m *MockMsgStream) SetRepackFunc(repackFunc RepackFunc)
SetRepackFunc provides a mock function with given fields: repackFunc
type MockMsgStream_AsConsumer_Call ¶
MockMsgStream_AsConsumer_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AsConsumer'
func (*MockMsgStream_AsConsumer_Call) Return ¶
func (_c *MockMsgStream_AsConsumer_Call) Return() *MockMsgStream_AsConsumer_Call
func (*MockMsgStream_AsConsumer_Call) Run ¶
func (_c *MockMsgStream_AsConsumer_Call) Run(run func(channels []string, subName string, position mqwrapper.SubscriptionInitialPosition)) *MockMsgStream_AsConsumer_Call
func (*MockMsgStream_AsConsumer_Call) RunAndReturn ¶
func (_c *MockMsgStream_AsConsumer_Call) RunAndReturn(run func([]string, string, mqwrapper.SubscriptionInitialPosition)) *MockMsgStream_AsConsumer_Call
type MockMsgStream_AsProducer_Call ¶
MockMsgStream_AsProducer_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AsProducer'
func (*MockMsgStream_AsProducer_Call) Return ¶
func (_c *MockMsgStream_AsProducer_Call) Return() *MockMsgStream_AsProducer_Call
func (*MockMsgStream_AsProducer_Call) Run ¶
func (_c *MockMsgStream_AsProducer_Call) Run(run func(channels []string)) *MockMsgStream_AsProducer_Call
func (*MockMsgStream_AsProducer_Call) RunAndReturn ¶
func (_c *MockMsgStream_AsProducer_Call) RunAndReturn(run func([]string)) *MockMsgStream_AsProducer_Call
type MockMsgStream_Broadcast_Call ¶
MockMsgStream_Broadcast_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Broadcast'
func (*MockMsgStream_Broadcast_Call) Return ¶
func (_c *MockMsgStream_Broadcast_Call) Return(_a0 map[string][]mqwrapper.MessageID, _a1 error) *MockMsgStream_Broadcast_Call
func (*MockMsgStream_Broadcast_Call) Run ¶
func (_c *MockMsgStream_Broadcast_Call) Run(run func(_a0 *MsgPack)) *MockMsgStream_Broadcast_Call
func (*MockMsgStream_Broadcast_Call) RunAndReturn ¶
func (_c *MockMsgStream_Broadcast_Call) RunAndReturn(run func(*MsgPack) (map[string][]mqwrapper.MessageID, error)) *MockMsgStream_Broadcast_Call
type MockMsgStream_Chan_Call ¶
MockMsgStream_Chan_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Chan'
func (*MockMsgStream_Chan_Call) Return ¶
func (_c *MockMsgStream_Chan_Call) Return(_a0 <-chan *MsgPack) *MockMsgStream_Chan_Call
func (*MockMsgStream_Chan_Call) Run ¶
func (_c *MockMsgStream_Chan_Call) Run(run func()) *MockMsgStream_Chan_Call
func (*MockMsgStream_Chan_Call) RunAndReturn ¶
func (_c *MockMsgStream_Chan_Call) RunAndReturn(run func() <-chan *MsgPack) *MockMsgStream_Chan_Call
type MockMsgStream_CheckTopicValid_Call ¶
MockMsgStream_CheckTopicValid_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckTopicValid'
func (*MockMsgStream_CheckTopicValid_Call) Return ¶
func (_c *MockMsgStream_CheckTopicValid_Call) Return(_a0 error) *MockMsgStream_CheckTopicValid_Call
func (*MockMsgStream_CheckTopicValid_Call) Run ¶
func (_c *MockMsgStream_CheckTopicValid_Call) Run(run func(channel string)) *MockMsgStream_CheckTopicValid_Call
func (*MockMsgStream_CheckTopicValid_Call) RunAndReturn ¶
func (_c *MockMsgStream_CheckTopicValid_Call) RunAndReturn(run func(string) error) *MockMsgStream_CheckTopicValid_Call
type MockMsgStream_Close_Call ¶
MockMsgStream_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close'
func (*MockMsgStream_Close_Call) Return ¶
func (_c *MockMsgStream_Close_Call) Return() *MockMsgStream_Close_Call
func (*MockMsgStream_Close_Call) Run ¶
func (_c *MockMsgStream_Close_Call) Run(run func()) *MockMsgStream_Close_Call
func (*MockMsgStream_Close_Call) RunAndReturn ¶
func (_c *MockMsgStream_Close_Call) RunAndReturn(run func()) *MockMsgStream_Close_Call
type MockMsgStream_Expecter ¶
type MockMsgStream_Expecter struct {
// contains filtered or unexported fields
}
func (*MockMsgStream_Expecter) AsConsumer ¶
func (_e *MockMsgStream_Expecter) AsConsumer(channels interface{}, subName interface{}, position interface{}) *MockMsgStream_AsConsumer_Call
AsConsumer is a helper method to define mock.On call
- channels []string
- subName string
- position mqwrapper.SubscriptionInitialPosition
func (*MockMsgStream_Expecter) AsProducer ¶
func (_e *MockMsgStream_Expecter) AsProducer(channels interface{}) *MockMsgStream_AsProducer_Call
AsProducer is a helper method to define mock.On call
- channels []string
func (*MockMsgStream_Expecter) Broadcast ¶
func (_e *MockMsgStream_Expecter) Broadcast(_a0 interface{}) *MockMsgStream_Broadcast_Call
Broadcast is a helper method to define mock.On call
- _a0 *MsgPack
func (*MockMsgStream_Expecter) Chan ¶
func (_e *MockMsgStream_Expecter) Chan() *MockMsgStream_Chan_Call
Chan is a helper method to define mock.On call
func (*MockMsgStream_Expecter) CheckTopicValid ¶
func (_e *MockMsgStream_Expecter) CheckTopicValid(channel interface{}) *MockMsgStream_CheckTopicValid_Call
CheckTopicValid is a helper method to define mock.On call
- channel string
func (*MockMsgStream_Expecter) Close ¶
func (_e *MockMsgStream_Expecter) Close() *MockMsgStream_Close_Call
Close is a helper method to define mock.On call
func (*MockMsgStream_Expecter) GetLatestMsgID ¶
func (_e *MockMsgStream_Expecter) GetLatestMsgID(channel interface{}) *MockMsgStream_GetLatestMsgID_Call
GetLatestMsgID is a helper method to define mock.On call
- channel string
func (*MockMsgStream_Expecter) GetProduceChannels ¶
func (_e *MockMsgStream_Expecter) GetProduceChannels() *MockMsgStream_GetProduceChannels_Call
GetProduceChannels is a helper method to define mock.On call
func (*MockMsgStream_Expecter) Produce ¶
func (_e *MockMsgStream_Expecter) Produce(_a0 interface{}) *MockMsgStream_Produce_Call
Produce is a helper method to define mock.On call
- _a0 *MsgPack
func (*MockMsgStream_Expecter) Seek ¶
func (_e *MockMsgStream_Expecter) Seek(offset interface{}) *MockMsgStream_Seek_Call
Seek is a helper method to define mock.On call
- offset []*msgpb.MsgPosition
func (*MockMsgStream_Expecter) SetRepackFunc ¶
func (_e *MockMsgStream_Expecter) SetRepackFunc(repackFunc interface{}) *MockMsgStream_SetRepackFunc_Call
SetRepackFunc is a helper method to define mock.On call
- repackFunc RepackFunc
type MockMsgStream_GetLatestMsgID_Call ¶
MockMsgStream_GetLatestMsgID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetLatestMsgID'
func (*MockMsgStream_GetLatestMsgID_Call) Return ¶
func (_c *MockMsgStream_GetLatestMsgID_Call) Return(_a0 mqwrapper.MessageID, _a1 error) *MockMsgStream_GetLatestMsgID_Call
func (*MockMsgStream_GetLatestMsgID_Call) Run ¶
func (_c *MockMsgStream_GetLatestMsgID_Call) Run(run func(channel string)) *MockMsgStream_GetLatestMsgID_Call
func (*MockMsgStream_GetLatestMsgID_Call) RunAndReturn ¶
func (_c *MockMsgStream_GetLatestMsgID_Call) RunAndReturn(run func(string) (mqwrapper.MessageID, error)) *MockMsgStream_GetLatestMsgID_Call
type MockMsgStream_GetProduceChannels_Call ¶
MockMsgStream_GetProduceChannels_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetProduceChannels'
func (*MockMsgStream_GetProduceChannels_Call) Return ¶
func (_c *MockMsgStream_GetProduceChannels_Call) Return(_a0 []string) *MockMsgStream_GetProduceChannels_Call
func (*MockMsgStream_GetProduceChannels_Call) Run ¶
func (_c *MockMsgStream_GetProduceChannels_Call) Run(run func()) *MockMsgStream_GetProduceChannels_Call
func (*MockMsgStream_GetProduceChannels_Call) RunAndReturn ¶
func (_c *MockMsgStream_GetProduceChannels_Call) RunAndReturn(run func() []string) *MockMsgStream_GetProduceChannels_Call
type MockMsgStream_Produce_Call ¶
MockMsgStream_Produce_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Produce'
func (*MockMsgStream_Produce_Call) Return ¶
func (_c *MockMsgStream_Produce_Call) Return(_a0 error) *MockMsgStream_Produce_Call
func (*MockMsgStream_Produce_Call) Run ¶
func (_c *MockMsgStream_Produce_Call) Run(run func(_a0 *MsgPack)) *MockMsgStream_Produce_Call
func (*MockMsgStream_Produce_Call) RunAndReturn ¶
func (_c *MockMsgStream_Produce_Call) RunAndReturn(run func(*MsgPack) error) *MockMsgStream_Produce_Call
type MockMsgStream_Seek_Call ¶
MockMsgStream_Seek_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Seek'
func (*MockMsgStream_Seek_Call) Return ¶
func (_c *MockMsgStream_Seek_Call) Return(_a0 error) *MockMsgStream_Seek_Call
func (*MockMsgStream_Seek_Call) Run ¶
func (_c *MockMsgStream_Seek_Call) Run(run func(offset []*msgpb.MsgPosition)) *MockMsgStream_Seek_Call
func (*MockMsgStream_Seek_Call) RunAndReturn ¶
func (_c *MockMsgStream_Seek_Call) RunAndReturn(run func([]*msgpb.MsgPosition) error) *MockMsgStream_Seek_Call
type MockMsgStream_SetRepackFunc_Call ¶
MockMsgStream_SetRepackFunc_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetRepackFunc'
func (*MockMsgStream_SetRepackFunc_Call) Return ¶
func (_c *MockMsgStream_SetRepackFunc_Call) Return() *MockMsgStream_SetRepackFunc_Call
func (*MockMsgStream_SetRepackFunc_Call) Run ¶
func (_c *MockMsgStream_SetRepackFunc_Call) Run(run func(repackFunc RepackFunc)) *MockMsgStream_SetRepackFunc_Call
func (*MockMsgStream_SetRepackFunc_Call) RunAndReturn ¶
func (_c *MockMsgStream_SetRepackFunc_Call) RunAndReturn(run func(RepackFunc)) *MockMsgStream_SetRepackFunc_Call
type MqTtMsgStream ¶
type MqTtMsgStream struct {
// contains filtered or unexported fields
}
MqTtMsgStream is a msgstream that contains timeticks
func NewMqTtMsgStream ¶
func NewMqTtMsgStream(ctx context.Context, receiveBufSize int64, bufSize int64, client mqwrapper.Client, unmarshal UnmarshalDispatcher) (*MqTtMsgStream, error)
NewMqTtMsgStream is used to generate a new MqTtMsgStream object
func (*MqTtMsgStream) AsConsumer ¶
func (ms *MqTtMsgStream) AsConsumer(channels []string, subName string, position mqwrapper.SubscriptionInitialPosition)
AsConsumer subscribes to channels as a consumer for a MsgStream and seeks to a certain position.
func (MqTtMsgStream) AsProducer ¶
func (ms MqTtMsgStream) AsProducer(channels []string)
AsProducer creates producers to send messages to channels
func (MqTtMsgStream) Broadcast ¶
Broadcast broadcasts the msg pack to all producers and returns the corresponding msg IDs; the returned message IDs serve as markers.
func (*MqTtMsgStream) Chan ¶
func (ms *MqTtMsgStream) Chan() <-chan *MsgPack
func (MqTtMsgStream) CheckTopicValid ¶
func (*MqTtMsgStream) Close ¶
func (ms *MqTtMsgStream) Close()
Close will stop goroutine and free internal producers and consumers
func (MqTtMsgStream) ComputeProduceChannelIndexes ¶
func (MqTtMsgStream) GetLatestMsgID ¶
func (MqTtMsgStream) GetProduceChannels ¶
func (ms MqTtMsgStream) GetProduceChannels() []string
func (*MqTtMsgStream) Seek ¶
func (ms *MqTtMsgStream) Seek(msgPositions []*msgpb.MsgPosition) error
Seek to the specified position
func (MqTtMsgStream) SetRepackFunc ¶
func (ms MqTtMsgStream) SetRepackFunc(repackFunc RepackFunc)
type MsgPack ¶
type MsgPack struct { BeginTs Timestamp EndTs Timestamp Msgs []TsMsg StartPositions []*MsgPosition EndPositions []*MsgPosition }
MsgPack represents a batch of msg in msgstream
type MsgStream ¶
type MsgStream interface { Close() AsProducer(channels []string) Produce(*MsgPack) error SetRepackFunc(repackFunc RepackFunc) GetProduceChannels() []string Broadcast(*MsgPack) (map[string][]MessageID, error) AsConsumer(channels []string, subName string, position mqwrapper.SubscriptionInitialPosition) Chan() <-chan *MsgPack Seek(offset []*MsgPosition) error GetLatestMsgID(channel string) (MessageID, error) CheckTopicValid(channel string) error }
MsgStream is an interface that can be used to produce and consume messages on a message queue
type PmsFactory ¶
type PmsFactory struct { // the following members must be public, so that mapstructure.Decode() can access them PulsarAddress string PulsarWebAddress string ReceiveBufSize int64 PulsarBufSize int64 PulsarAuthPlugin string PulsarAuthParams string PulsarTenant string PulsarNameSpace string // contains filtered or unexported fields }
PmsFactory is a pulsar msgstream factory that implements the Factory interface (msgstream.go)
func NewPmsFactory ¶
func NewPmsFactory(config *paramtable.PulsarConfig) *PmsFactory
func (*PmsFactory) NewMsgStream ¶
func (f *PmsFactory) NewMsgStream(ctx context.Context) (MsgStream, error)
NewMsgStream is used to generate a new Msgstream object
func (*PmsFactory) NewMsgStreamDisposer ¶
func (*PmsFactory) NewQueryMsgStream ¶
func (f *PmsFactory) NewQueryMsgStream(ctx context.Context) (MsgStream, error)
NewQueryMsgStream is used to generate a new QueryMsgstream object
func (*PmsFactory) NewTtMsgStream ¶
func (f *PmsFactory) NewTtMsgStream(ctx context.Context) (MsgStream, error)
NewTtMsgStream is used to generate a new TtMsgstream object
type ProtoUDFactory ¶
type ProtoUDFactory struct{}
ProtoUDFactory is a factory to generate ProtoUnmarshalDispatcher object
func (*ProtoUDFactory) NewUnmarshalDispatcher ¶
func (pudf *ProtoUDFactory) NewUnmarshalDispatcher() *ProtoUnmarshalDispatcher
NewUnmarshalDispatcher returns a new UnmarshalDispatcher
type ProtoUnmarshalDispatcher ¶
type ProtoUnmarshalDispatcher struct {
TempMap map[commonpb.MsgType]UnmarshalFunc
}
ProtoUnmarshalDispatcher is an UnmarshalDispatcher which is used for data of proto type
type RepackFunc ¶
RepackFunc is a function type which is used to repack messages after hashing by primary key
type TimeTickMsg ¶
type TimeTickMsg struct { BaseMsg msgpb.TimeTickMsg }
TimeTickMsg is a message pack that contains time tick only
func (*TimeTickMsg) ID ¶
func (tst *TimeTickMsg) ID() UniqueID
ID returns the ID of this message pack
func (*TimeTickMsg) Marshal ¶
func (tst *TimeTickMsg) Marshal(input TsMsg) (MarshalType, error)
Marshal is used to serialize a message pack to a byte array
func (*TimeTickMsg) SetID ¶
func (tst *TimeTickMsg) SetID(id UniqueID)
SetID sets the ID of this message pack
func (*TimeTickMsg) SourceID ¶
func (tst *TimeTickMsg) SourceID() int64
SourceID indicates which component generated this message
func (*TimeTickMsg) Type ¶
func (tst *TimeTickMsg) Type() MsgType
Type returns the type of this message pack
func (*TimeTickMsg) Unmarshal ¶
func (tst *TimeTickMsg) Unmarshal(input MarshalType) (TsMsg, error)
Unmarshal is used to deserialize a message pack from a byte array
type TsMsg ¶
type TsMsg interface { TraceCtx() context.Context SetTraceCtx(ctx context.Context) ID() UniqueID SetID(id UniqueID) BeginTs() Timestamp EndTs() Timestamp Type() MsgType SourceID() int64 HashKeys() []uint32 Marshal(TsMsg) (MarshalType, error) Unmarshal(MarshalType) (TsMsg, error) Position() *MsgPosition SetPosition(*MsgPosition) }
TsMsg provides methods to get begin timestamp and end timestamp of a message pack
type UnmarshalDispatcher ¶
type UnmarshalDispatcher interface {
Unmarshal(input interface{}, msgType commonpb.MsgType) (TsMsg, error)
}
UnmarshalDispatcher is an interface that contains the method Unmarshal
type UnmarshalDispatcherFactory ¶
type UnmarshalDispatcherFactory interface {
NewUnmarshalDispatcher() *UnmarshalDispatcher
}
UnmarshalDispatcherFactory is a factory to generate an object which implements the interface UnmarshalDispatcher
type UnmarshalFunc ¶
UnmarshalFunc is an interface that has been implemented by each Msg
type UpsertMsg ¶
///////////////////////////////////////Upsert//////////////////////////////////////////
type WastedMockMsgStream ¶
type WastedMockMsgStream struct { MsgStream AsProducerFunc func(channels []string) BroadcastMarkFunc func(*MsgPack) (map[string][]MessageID, error) BroadcastFunc func(*MsgPack) error ChanFunc func() <-chan *MsgPack }
func NewWastedMockMsgStream ¶
func NewWastedMockMsgStream() *WastedMockMsgStream
func (WastedMockMsgStream) AsProducer ¶
func (m WastedMockMsgStream) AsProducer(channels []string)
func (WastedMockMsgStream) Broadcast ¶
func (m WastedMockMsgStream) Broadcast(pack *MsgPack) (map[string][]MessageID, error)
func (WastedMockMsgStream) Chan ¶
func (m WastedMockMsgStream) Chan() <-chan *MsgPack