Documentation
¶
Index ¶
- Constants
- Variables
- func GetDropCollectionMsgID(collectionID int64) string
- func GetDropPartitionMsgID(collectionID int64, partitionID int64) string
- func GetSimpleAttributions(pairs []*commonpb.KeyValuePair) []entity.CollectionAttribute
- type AlterDatabaseParam
- type AlterIndexParam
- type BaseTaskMsg
- type ChannelManager
- type CollectionEventConsumer
- type CollectionFilter
- type CreateCollectionParam
- type CreateDatabaseParam
- type CreateIndexParam
- type CreatePartitionParam
- type CreateRoleParam
- type CreateUserParam
- type DataFormatter
- type DataHandler
- type DefaultChannelManager
- func (d *DefaultChannelManager) AddDroppedCollection(ids []int64)
- func (d *DefaultChannelManager) AddDroppedPartition(ids []int64)
- func (d *DefaultChannelManager) AddPartition(ctx context.Context, dbInfo *model.DatabaseInfo, ...) error
- func (d *DefaultChannelManager) GetChannelChan() <-chan string
- func (d *DefaultChannelManager) GetChannelLatestMsgID(ctx context.Context, channelName string) ([]byte, error)
- func (d *DefaultChannelManager) GetEventChan() <-chan *ReplicateAPIEvent
- func (d *DefaultChannelManager) GetMsgChan(pChannel string) <-chan *ReplicateMsg
- func (d *DefaultChannelManager) SetCtx(ctx context.Context)
- func (d *DefaultChannelManager) StartReadCollection(ctx context.Context, db *model.DatabaseInfo, info *pb.CollectionInfo, ...) error
- func (d *DefaultChannelManager) StopReadCollection(ctx context.Context, info *pb.CollectionInfo) error
- type DefaultDataFormatter
- type DefaultDataHandler
- func (d *DefaultDataHandler) AlterDatabase(ctx context.Context, param *AlterDatabaseParam) error
- func (d *DefaultDataHandler) AlterIndex(ctx context.Context, param *AlterIndexParam) error
- func (d *DefaultDataHandler) CreateCollection(ctx context.Context, param *CreateCollectionParam) error
- func (d *DefaultDataHandler) CreateDatabase(ctx context.Context, param *CreateDatabaseParam) error
- func (d *DefaultDataHandler) CreateIndex(ctx context.Context, param *CreateIndexParam) error
- func (d *DefaultDataHandler) CreatePartition(ctx context.Context, param *CreatePartitionParam) error
- func (d *DefaultDataHandler) CreateRole(ctx context.Context, param *CreateRoleParam) error
- func (d *DefaultDataHandler) CreateUser(ctx context.Context, param *CreateUserParam) error
- func (d *DefaultDataHandler) Delete(ctx context.Context, param *DeleteParam) error
- func (d *DefaultDataHandler) DeleteUser(ctx context.Context, param *DeleteUserParam) error
- func (d *DefaultDataHandler) DescribeCollection(ctx context.Context, param *DescribeCollectionParam) error
- func (d *DefaultDataHandler) DescribeDatabase(ctx context.Context, param *DescribeDatabaseParam) error
- func (d *DefaultDataHandler) DescribePartition(ctx context.Context, param *DescribePartitionParam) error
- func (d *DefaultDataHandler) DropCollection(ctx context.Context, param *DropCollectionParam) error
- func (d *DefaultDataHandler) DropDatabase(ctx context.Context, param *DropDatabaseParam) error
- func (d *DefaultDataHandler) DropIndex(ctx context.Context, param *DropIndexParam) error
- func (d *DefaultDataHandler) DropPartition(ctx context.Context, param *DropPartitionParam) error
- func (d *DefaultDataHandler) DropRole(ctx context.Context, param *DropRoleParam) error
- func (d *DefaultDataHandler) Flush(ctx context.Context, param *FlushParam) error
- func (d *DefaultDataHandler) Insert(ctx context.Context, param *InsertParam) error
- func (d *DefaultDataHandler) LoadCollection(ctx context.Context, param *LoadCollectionParam) error
- func (d *DefaultDataHandler) LoadPartitions(ctx context.Context, param *LoadPartitionsParam) error
- func (d *DefaultDataHandler) OperatePrivilege(ctx context.Context, param *OperatePrivilegeParam) error
- func (d *DefaultDataHandler) OperateUserRole(ctx context.Context, param *OperateUserRoleParam) error
- func (d *DefaultDataHandler) ReleaseCollection(ctx context.Context, param *ReleaseCollectionParam) error
- func (d *DefaultDataHandler) ReleasePartitions(ctx context.Context, param *ReleasePartitionsParam) error
- func (d *DefaultDataHandler) ReplicateMessage(ctx context.Context, param *ReplicateMessageParam) error
- func (d *DefaultDataHandler) UpdateUser(ctx context.Context, param *UpdateUserParam) error
- type DefaultMessageManager
- type DefaultMetaOp
- func (d *DefaultMetaOp) GetAllCollection(ctx context.Context, filter CollectionFilter) ([]*pb.CollectionInfo, error)
- func (d *DefaultMetaOp) GetAllDroppedObj() map[string]map[string]uint64
- func (d *DefaultMetaOp) GetAllPartition(ctx context.Context, filter PartitionFilter) ([]*pb.PartitionInfo, error)
- func (d *DefaultMetaOp) GetCollectionNameByID(ctx context.Context, id int64) string
- func (d *DefaultMetaOp) GetDatabaseInfoForCollection(ctx context.Context, id int64) model.DatabaseInfo
- func (d *DefaultMetaOp) StartWatch()
- func (d *DefaultMetaOp) SubscribeCollectionEvent(taskID string, consumer CollectionEventConsumer)
- func (d *DefaultMetaOp) SubscribePartitionEvent(taskID string, consumer PartitionEventConsumer)
- func (d *DefaultMetaOp) UnsubscribeEvent(taskID string, eventType WatchEventType)
- func (d *DefaultMetaOp) WatchCollection(ctx context.Context, filter CollectionFilter)
- func (d *DefaultMetaOp) WatchPartition(ctx context.Context, filter PartitionFilter)
- type DefaultReader
- type DefaultTargetAPI
- func (d *DefaultTargetAPI) GetCollectionInfo(ctx context.Context, collectionName, databaseName string) (*model.CollectionInfo, error)
- func (d *DefaultTargetAPI) GetDatabaseName(ctx context.Context, collectionName, databaseName string) (string, error)
- func (d *DefaultTargetAPI) GetPartitionInfo(ctx context.Context, collectionName string, databaseName string) (*model.CollectionInfo, error)
- type DefaultWriter
- func (d *DefaultWriter) HandleOpMessagePack(ctx context.Context, msgPack *msgstream.MsgPack) ([]byte, error)
- func (d *DefaultWriter) HandleReplicateAPIEvent(ctx context.Context, apiEvent *ReplicateAPIEvent) error
- func (d *DefaultWriter) HandleReplicateMessage(ctx context.Context, channelName string, msgPack *msgstream.MsgPack) ([]byte, []byte, error)
- func (d *DefaultWriter) RecoveryMetaMsg(ctx context.Context, taskID string) error
- type DeleteParam
- type DeleteUserParam
- type DescribeCollectionParam
- type DescribeDatabaseParam
- type DescribePartitionParam
- type DropCollectionParam
- type DropDatabaseParam
- type DropIndexParam
- type DropPartitionParam
- type DropRoleParam
- type FlushParam
- type InsertParam
- type LoadCollectionParam
- type LoadPartitionsParam
- type MessageManager
- type MetaMsg
- type MetaMsgType
- type MetaOp
- type MsgBaseParam
- type OperatePrivilegeParam
- type OperateUserRoleParam
- type PartitionEventConsumer
- type PartitionFilter
- type Reader
- type ReleaseCollectionParam
- type ReleasePartitionsParam
- type ReplicateAPIEvent
- type ReplicateAPIEventType
- type ReplicateMessage
- type ReplicateMessageParam
- type ReplicateMeta
- type ReplicateMsg
- type ReplicateParam
- type ReplicateStore
- type SimpleAttribution
- type TargetAPI
- type TaskDropCollectionMsg
- type TaskDropPartitionMsg
- type UpdateUserParam
- type WatchEventType
- type Writer
Constants ¶
View Source
const IndexKeyMmap = "mmap.enabled"
Variables ¶
View Source
var EmptyMsgPack = &ReplicateMsg{}
Functions ¶
func GetDropCollectionMsgID ¶
func GetDropPartitionMsgID ¶
func GetSimpleAttributions ¶
func GetSimpleAttributions(pairs []*commonpb.KeyValuePair) []entity.CollectionAttribute
Types ¶
type AlterDatabaseParam ¶
type AlterDatabaseParam struct { ReplicateParam *milvuspb.AlterDatabaseRequest }
type AlterIndexParam ¶
type AlterIndexParam struct { ReplicateParam *milvuspb.AlterIndexRequest }
type BaseTaskMsg ¶
type BaseTaskMsg struct { TaskID string `json:"task_id"` MsgID string `json:"msg_id"` TargetChannels []string `json:"target_channels"` ReadyChannels []string `json:"ready_channels"` }
func (BaseTaskMsg) IsReady ¶
func (msg BaseTaskMsg) IsReady() bool
type ChannelManager ¶
type ChannelManager interface { SetCtx(ctx context.Context) AddDroppedCollection(ids []int64) AddDroppedPartition(ids []int64) StartReadCollection(ctx context.Context, db *model.DatabaseInfo, info *pb.CollectionInfo, seekPositions []*msgpb.MsgPosition) error StopReadCollection(ctx context.Context, info *pb.CollectionInfo) error AddPartition(ctx context.Context, dbInfo *model.DatabaseInfo, collectionInfo *pb.CollectionInfo, partitionInfo *pb.PartitionInfo) error GetChannelChan() <-chan string GetMsgChan(pChannel string) <-chan *ReplicateMsg GetEventChan() <-chan *ReplicateAPIEvent GetChannelLatestMsgID(ctx context.Context, channelName string) ([]byte, error) }
ChannelManager a target must promise a manager
type CollectionEventConsumer ¶
type CollectionEventConsumer CollectionFilter
type CollectionFilter ¶
type CollectionFilter func(*pb.CollectionInfo) bool
CollectionFilter the filter will be used before the collection is filled the schema info
type CreateCollectionParam ¶
type CreateCollectionParam struct { MsgBaseParam ReplicateParam Schema *entity.Schema ShardsNum int32 ConsistencyLevel commonpb.ConsistencyLevel Properties []*commonpb.KeyValuePair }
type CreateDatabaseParam ¶
type CreateDatabaseParam struct { ReplicateParam *milvuspb.CreateDatabaseRequest }
type CreateIndexParam ¶
type CreateIndexParam struct { ReplicateParam *milvuspb.CreateIndexRequest }
type CreatePartitionParam ¶
type CreatePartitionParam struct { MsgBaseParam ReplicateParam CollectionName string PartitionName string }
type CreateRoleParam ¶
type CreateRoleParam struct { ReplicateParam *milvuspb.CreateRoleRequest }
type CreateUserParam ¶
type CreateUserParam struct { ReplicateParam *milvuspb.CreateCredentialRequest }
type DataFormatter ¶
type DataHandler ¶
type DataHandler interface { CreateCollection(ctx context.Context, param *CreateCollectionParam) error DropCollection(ctx context.Context, param *DropCollectionParam) error CreatePartition(ctx context.Context, param *CreatePartitionParam) error DropPartition(ctx context.Context, param *DropPartitionParam) error // Deprecated Insert(ctx context.Context, param *InsertParam) error // Deprecated Delete(ctx context.Context, param *DeleteParam) error Flush(ctx context.Context, param *FlushParam) error LoadCollection(ctx context.Context, param *LoadCollectionParam) error ReleaseCollection(ctx context.Context, param *ReleaseCollectionParam) error LoadPartitions(ctx context.Context, param *LoadPartitionsParam) error ReleasePartitions(ctx context.Context, param *ReleasePartitionsParam) error CreateIndex(ctx context.Context, param *CreateIndexParam) error DropIndex(ctx context.Context, param *DropIndexParam) error AlterIndex(ctx context.Context, param *AlterIndexParam) error CreateDatabase(ctx context.Context, param *CreateDatabaseParam) error DropDatabase(ctx context.Context, param *DropDatabaseParam) error AlterDatabase(ctx context.Context, param *AlterDatabaseParam) error ReplicateMessage(ctx context.Context, param *ReplicateMessageParam) error DescribeCollection(ctx context.Context, param *DescribeCollectionParam) error DescribeDatabase(ctx context.Context, param *DescribeDatabaseParam) error DescribePartition(ctx context.Context, param *DescribePartitionParam) error CreateUser(ctx context.Context, param *CreateUserParam) error DeleteUser(ctx context.Context, param *DeleteUserParam) error UpdateUser(ctx context.Context, param *UpdateUserParam) error CreateRole(ctx context.Context, param *CreateRoleParam) error DropRole(ctx context.Context, param *DropRoleParam) error OperateUserRole(ctx context.Context, param *OperateUserRoleParam) error OperatePrivilege(ctx context.Context, param *OperatePrivilegeParam) error }
type DefaultChannelManager ¶
type DefaultChannelManager struct{}
func (*DefaultChannelManager) AddDroppedCollection ¶
func (d *DefaultChannelManager) AddDroppedCollection(ids []int64)
func (*DefaultChannelManager) AddDroppedPartition ¶
func (d *DefaultChannelManager) AddDroppedPartition(ids []int64)
func (*DefaultChannelManager) AddPartition ¶
func (d *DefaultChannelManager) AddPartition(ctx context.Context, dbInfo *model.DatabaseInfo, collectionInfo *pb.CollectionInfo, partitionInfo *pb.PartitionInfo) error
func (*DefaultChannelManager) GetChannelChan ¶
func (d *DefaultChannelManager) GetChannelChan() <-chan string
func (*DefaultChannelManager) GetChannelLatestMsgID ¶
func (*DefaultChannelManager) GetEventChan ¶
func (d *DefaultChannelManager) GetEventChan() <-chan *ReplicateAPIEvent
func (*DefaultChannelManager) GetMsgChan ¶
func (d *DefaultChannelManager) GetMsgChan(pChannel string) <-chan *ReplicateMsg
func (*DefaultChannelManager) SetCtx ¶
func (d *DefaultChannelManager) SetCtx(ctx context.Context)
func (*DefaultChannelManager) StartReadCollection ¶
func (d *DefaultChannelManager) StartReadCollection(ctx context.Context, db *model.DatabaseInfo, info *pb.CollectionInfo, seekPositions []*msgpb.MsgPosition) error
func (*DefaultChannelManager) StopReadCollection ¶
func (d *DefaultChannelManager) StopReadCollection(ctx context.Context, info *pb.CollectionInfo) error
type DefaultDataFormatter ¶
type DefaultDataFormatter struct{}
type DefaultDataHandler ¶
type DefaultDataHandler struct{}
func (*DefaultDataHandler) AlterDatabase ¶
func (d *DefaultDataHandler) AlterDatabase(ctx context.Context, param *AlterDatabaseParam) error
func (*DefaultDataHandler) AlterIndex ¶
func (d *DefaultDataHandler) AlterIndex(ctx context.Context, param *AlterIndexParam) error
func (*DefaultDataHandler) CreateCollection ¶
func (d *DefaultDataHandler) CreateCollection(ctx context.Context, param *CreateCollectionParam) error
func (*DefaultDataHandler) CreateDatabase ¶
func (d *DefaultDataHandler) CreateDatabase(ctx context.Context, param *CreateDatabaseParam) error
func (*DefaultDataHandler) CreateIndex ¶
func (d *DefaultDataHandler) CreateIndex(ctx context.Context, param *CreateIndexParam) error
func (*DefaultDataHandler) CreatePartition ¶
func (d *DefaultDataHandler) CreatePartition(ctx context.Context, param *CreatePartitionParam) error
func (*DefaultDataHandler) CreateRole ¶
func (d *DefaultDataHandler) CreateRole(ctx context.Context, param *CreateRoleParam) error
func (*DefaultDataHandler) CreateUser ¶
func (d *DefaultDataHandler) CreateUser(ctx context.Context, param *CreateUserParam) error
func (*DefaultDataHandler) Delete ¶
func (d *DefaultDataHandler) Delete(ctx context.Context, param *DeleteParam) error
func (*DefaultDataHandler) DeleteUser ¶
func (d *DefaultDataHandler) DeleteUser(ctx context.Context, param *DeleteUserParam) error
func (*DefaultDataHandler) DescribeCollection ¶
func (d *DefaultDataHandler) DescribeCollection(ctx context.Context, param *DescribeCollectionParam) error
func (*DefaultDataHandler) DescribeDatabase ¶
func (d *DefaultDataHandler) DescribeDatabase(ctx context.Context, param *DescribeDatabaseParam) error
func (*DefaultDataHandler) DescribePartition ¶
func (d *DefaultDataHandler) DescribePartition(ctx context.Context, param *DescribePartitionParam) error
func (*DefaultDataHandler) DropCollection ¶
func (d *DefaultDataHandler) DropCollection(ctx context.Context, param *DropCollectionParam) error
func (*DefaultDataHandler) DropDatabase ¶
func (d *DefaultDataHandler) DropDatabase(ctx context.Context, param *DropDatabaseParam) error
func (*DefaultDataHandler) DropIndex ¶
func (d *DefaultDataHandler) DropIndex(ctx context.Context, param *DropIndexParam) error
func (*DefaultDataHandler) DropPartition ¶
func (d *DefaultDataHandler) DropPartition(ctx context.Context, param *DropPartitionParam) error
func (*DefaultDataHandler) DropRole ¶
func (d *DefaultDataHandler) DropRole(ctx context.Context, param *DropRoleParam) error
func (*DefaultDataHandler) Flush ¶
func (d *DefaultDataHandler) Flush(ctx context.Context, param *FlushParam) error
func (*DefaultDataHandler) Insert ¶
func (d *DefaultDataHandler) Insert(ctx context.Context, param *InsertParam) error
func (*DefaultDataHandler) LoadCollection ¶
func (d *DefaultDataHandler) LoadCollection(ctx context.Context, param *LoadCollectionParam) error
func (*DefaultDataHandler) LoadPartitions ¶
func (d *DefaultDataHandler) LoadPartitions(ctx context.Context, param *LoadPartitionsParam) error
func (*DefaultDataHandler) OperatePrivilege ¶
func (d *DefaultDataHandler) OperatePrivilege(ctx context.Context, param *OperatePrivilegeParam) error
func (*DefaultDataHandler) OperateUserRole ¶
func (d *DefaultDataHandler) OperateUserRole(ctx context.Context, param *OperateUserRoleParam) error
func (*DefaultDataHandler) ReleaseCollection ¶
func (d *DefaultDataHandler) ReleaseCollection(ctx context.Context, param *ReleaseCollectionParam) error
func (*DefaultDataHandler) ReleasePartitions ¶
func (d *DefaultDataHandler) ReleasePartitions(ctx context.Context, param *ReleasePartitionsParam) error
func (*DefaultDataHandler) ReplicateMessage ¶
func (d *DefaultDataHandler) ReplicateMessage(ctx context.Context, param *ReplicateMessageParam) error
func (*DefaultDataHandler) UpdateUser ¶
func (d *DefaultDataHandler) UpdateUser(ctx context.Context, param *UpdateUserParam) error
type DefaultMessageManager ¶
type DefaultMessageManager struct{}
func (*DefaultMessageManager) Close ¶
func (d *DefaultMessageManager) Close(channelName string)
func (*DefaultMessageManager) ReplicateMessage ¶
func (d *DefaultMessageManager) ReplicateMessage(message *ReplicateMessage)
type DefaultMetaOp ¶
type DefaultMetaOp struct{}
func (*DefaultMetaOp) GetAllCollection ¶
func (d *DefaultMetaOp) GetAllCollection(ctx context.Context, filter CollectionFilter) ([]*pb.CollectionInfo, error)
func (*DefaultMetaOp) GetAllDroppedObj ¶
func (d *DefaultMetaOp) GetAllDroppedObj() map[string]map[string]uint64
func (*DefaultMetaOp) GetAllPartition ¶
func (d *DefaultMetaOp) GetAllPartition(ctx context.Context, filter PartitionFilter) ([]*pb.PartitionInfo, error)
func (*DefaultMetaOp) GetCollectionNameByID ¶
func (d *DefaultMetaOp) GetCollectionNameByID(ctx context.Context, id int64) string
func (*DefaultMetaOp) GetDatabaseInfoForCollection ¶
func (d *DefaultMetaOp) GetDatabaseInfoForCollection(ctx context.Context, id int64) model.DatabaseInfo
func (*DefaultMetaOp) StartWatch ¶
func (d *DefaultMetaOp) StartWatch()
func (*DefaultMetaOp) SubscribeCollectionEvent ¶
func (d *DefaultMetaOp) SubscribeCollectionEvent(taskID string, consumer CollectionEventConsumer)
func (*DefaultMetaOp) SubscribePartitionEvent ¶
func (d *DefaultMetaOp) SubscribePartitionEvent(taskID string, consumer PartitionEventConsumer)
func (*DefaultMetaOp) UnsubscribeEvent ¶
func (d *DefaultMetaOp) UnsubscribeEvent(taskID string, eventType WatchEventType)
func (*DefaultMetaOp) WatchCollection ¶
func (d *DefaultMetaOp) WatchCollection(ctx context.Context, filter CollectionFilter)
func (*DefaultMetaOp) WatchPartition ¶
func (d *DefaultMetaOp) WatchPartition(ctx context.Context, filter PartitionFilter)
type DefaultReader ¶
type DefaultReader struct{}
DefaultReader All CDCReader implements should combine it
func (*DefaultReader) ErrorChan ¶
func (d *DefaultReader) ErrorChan() <-chan error
func (*DefaultReader) QuitRead ¶
func (d *DefaultReader) QuitRead(ctx context.Context)
func (*DefaultReader) StartRead ¶
func (d *DefaultReader) StartRead(ctx context.Context)
StartRead the return value is nil, and if you receive the data from the nil chan, will block forever, not panic
type DefaultTargetAPI ¶
type DefaultTargetAPI struct{}
func (*DefaultTargetAPI) GetCollectionInfo ¶
func (d *DefaultTargetAPI) GetCollectionInfo(ctx context.Context, collectionName, databaseName string) (*model.CollectionInfo, error)
func (*DefaultTargetAPI) GetDatabaseName ¶
func (*DefaultTargetAPI) GetPartitionInfo ¶
func (d *DefaultTargetAPI) GetPartitionInfo(ctx context.Context, collectionName string, databaseName string) (*model.CollectionInfo, error)
type DefaultWriter ¶
type DefaultWriter struct{}
func (*DefaultWriter) HandleOpMessagePack ¶
func (*DefaultWriter) HandleReplicateAPIEvent ¶
func (d *DefaultWriter) HandleReplicateAPIEvent(ctx context.Context, apiEvent *ReplicateAPIEvent) error
func (*DefaultWriter) HandleReplicateMessage ¶
func (*DefaultWriter) RecoveryMetaMsg ¶
func (d *DefaultWriter) RecoveryMetaMsg(ctx context.Context, taskID string) error
type DeleteParam ¶
type DeleteParam struct { MsgBaseParam ReplicateParam CollectionName string PartitionName string Column entity.Column }
type DeleteUserParam ¶
type DeleteUserParam struct { ReplicateParam *milvuspb.DeleteCredentialRequest }
type DescribeCollectionParam ¶
type DescribeCollectionParam struct { ReplicateParam Name string }
type DescribeDatabaseParam ¶
type DescribeDatabaseParam struct { ReplicateParam Name string }
type DescribePartitionParam ¶
type DescribePartitionParam struct { ReplicateParam CollectionName string PartitionName string }
type DropCollectionParam ¶
type DropCollectionParam struct { MsgBaseParam ReplicateParam CollectionName string }
type DropDatabaseParam ¶
type DropDatabaseParam struct { ReplicateParam *milvuspb.DropDatabaseRequest }
type DropIndexParam ¶
type DropIndexParam struct { ReplicateParam *milvuspb.DropIndexRequest }
type DropPartitionParam ¶
type DropPartitionParam struct { MsgBaseParam ReplicateParam CollectionName string PartitionName string }
type DropRoleParam ¶
type DropRoleParam struct { ReplicateParam *milvuspb.DropRoleRequest }
type FlushParam ¶
type FlushParam struct { ReplicateParam *milvuspb.FlushRequest }
type InsertParam ¶
type InsertParam struct { MsgBaseParam ReplicateParam CollectionName string PartitionName string Columns []entity.Column }
type LoadCollectionParam ¶
type LoadCollectionParam struct { ReplicateParam *milvuspb.LoadCollectionRequest }
type LoadPartitionsParam ¶
type LoadPartitionsParam struct { ReplicateParam *milvuspb.LoadPartitionsRequest }
type MessageManager ¶
type MessageManager interface { ReplicateMessage(message *ReplicateMessage) Close(channelName string) }
type MetaMsg ¶
type MetaMsg struct { Base BaseTaskMsg `json:"base"` Type MetaMsgType `json:"type"` Data map[string]interface{} `json:"data"` }
type MetaMsgType ¶
type MetaMsgType int
const ( DropCollectionMetaMsgType MetaMsgType = iota + 1 DropPartitionMetaMsgType )
type MetaOp ¶
type MetaOp interface { // WatchCollection its implementation should make sure it's only called once. The WatchPartition is same WatchCollection(ctx context.Context, filter CollectionFilter) WatchPartition(ctx context.Context, filter PartitionFilter) StartWatch() // SubscribeCollectionEvent an event only is consumed once. The SubscribePartitionEvent is same // TODO need to consider the many target, maybe try the method a meta op corresponds to a target SubscribeCollectionEvent(taskID string, consumer CollectionEventConsumer) SubscribePartitionEvent(taskID string, consumer PartitionEventConsumer) UnsubscribeEvent(taskID string, eventType WatchEventType) GetAllCollection(ctx context.Context, filter CollectionFilter) ([]*pb.CollectionInfo, error) GetAllPartition(ctx context.Context, filter PartitionFilter) ([]*pb.PartitionInfo, error) GetAllDroppedObj() map[string]map[string]uint64 GetCollectionNameByID(ctx context.Context, id int64) string GetDatabaseInfoForCollection(ctx context.Context, id int64) model.DatabaseInfo }
MetaOp meta operation
type MsgBaseParam ¶
type OperatePrivilegeParam ¶
type OperatePrivilegeParam struct { ReplicateParam *milvuspb.OperatePrivilegeRequest }
type OperateUserRoleParam ¶
type OperateUserRoleParam struct { ReplicateParam *milvuspb.OperateUserRoleRequest }
type PartitionEventConsumer ¶
type PartitionEventConsumer PartitionFilter
type PartitionFilter ¶
type PartitionFilter func(info *pb.PartitionInfo) bool
type ReleaseCollectionParam ¶
type ReleaseCollectionParam struct { ReplicateParam *milvuspb.ReleaseCollectionRequest }
type ReleasePartitionsParam ¶
type ReleasePartitionsParam struct { ReplicateParam *milvuspb.ReleasePartitionsRequest }
type ReplicateAPIEvent ¶
type ReplicateAPIEvent struct { EventType ReplicateAPIEventType CollectionInfo *pb.CollectionInfo PartitionInfo *pb.PartitionInfo ReplicateInfo *commonpb.ReplicateInfo ReplicateParam ReplicateParam TaskID string MsgID string Error error }
type ReplicateAPIEventType ¶
type ReplicateAPIEventType int
const ( ReplicateCreateCollection ReplicateAPIEventType = iota + 1 ReplicateDropCollection ReplicateCreatePartition ReplicateDropPartition ReplicateError = 100 )
func (ReplicateAPIEventType) String ¶
func (r ReplicateAPIEventType) String() string
type ReplicateMessage ¶
type ReplicateMessage struct { Ctx context.Context Param *ReplicateMessageParam SuccessFunc func(param *ReplicateMessageParam) FailFunc func(param *ReplicateMessageParam, err error) }
type ReplicateMessageParam ¶
type ReplicateMessageParam struct { MsgBaseParam ReplicateParam ChannelName string BeginTs, EndTs uint64 MsgsBytes [][]byte StartPositions, EndPositions []*msgpb.MsgPosition TargetMsgPosition string }
type ReplicateMeta ¶
type ReplicateMeta interface { UpdateTaskDropCollectionMsg(ctx context.Context, msg TaskDropCollectionMsg) (bool, error) GetTaskDropCollectionMsg(ctx context.Context, taskID string, msgID string) ([]TaskDropCollectionMsg, error) UpdateTaskDropPartitionMsg(ctx context.Context, msg TaskDropPartitionMsg) (bool, error) GetTaskDropPartitionMsg(ctx context.Context, taskID string, msgID string) ([]TaskDropPartitionMsg, error) RemoveTaskMsg(ctx context.Context, taskID string, msgID string) error }
type ReplicateMsg ¶
type ReplicateMsg struct { // source collection and channel info CollectionName string CollectionID int64 PChannelName string TaskID string MsgPack *msgstream.MsgPack }
func GetReplicateMsg ¶
type ReplicateParam ¶
type ReplicateParam struct {
Database string
}
type ReplicateStore ¶
type SimpleAttribution ¶
func (SimpleAttribution) KeyValue ¶
func (s SimpleAttribution) KeyValue() (string, string)
func (SimpleAttribution) Valid ¶
func (s SimpleAttribution) Valid() error
type TargetAPI ¶
type TargetAPI interface { GetCollectionInfo(ctx context.Context, collectionName, databaseName string) (*model.CollectionInfo, error) GetPartitionInfo(ctx context.Context, collectionName, databaseName string) (*model.CollectionInfo, error) GetDatabaseName(ctx context.Context, collectionName, databaseName string) (string, error) }
type TaskDropCollectionMsg ¶
type TaskDropCollectionMsg struct { Base BaseTaskMsg `mapstructure:"-"` DatabaseName string `mapstructure:"database_name"` CollectionName string `mapstructure:"collection_name"` DropTS uint64 `mapstructure:"drop_ts"` }
func GetTaskDropCollectionMsg ¶
func GetTaskDropCollectionMsg(msg MetaMsg) (TaskDropCollectionMsg, error)
func (TaskDropCollectionMsg) ConvertToMetaMsg ¶
func (msg TaskDropCollectionMsg) ConvertToMetaMsg() (MetaMsg, error)
type TaskDropPartitionMsg ¶
type TaskDropPartitionMsg struct { Base BaseTaskMsg `mapstructure:"-"` DatabaseName string `mapstructure:"database_name"` CollectionName string `mapstructure:"collection_name"` PartitionName string `mapstructure:"partition_name"` DropTS uint64 `mapstructure:"drop_ts"` }
func GetTaskDropPartitionMsg ¶
func GetTaskDropPartitionMsg(msg MetaMsg) (TaskDropPartitionMsg, error)
func (TaskDropPartitionMsg) ConvertToMetaMsg ¶
func (msg TaskDropPartitionMsg) ConvertToMetaMsg() (MetaMsg, error)
type UpdateUserParam ¶
type UpdateUserParam struct { ReplicateParam *milvuspb.UpdateCredentialRequest }
type WatchEventType ¶
type WatchEventType int
const ( CollectionEventType WatchEventType = iota + 1 PartitionEventType )
type Writer ¶
type Writer interface { HandleReplicateAPIEvent(ctx context.Context, apiEvent *ReplicateAPIEvent) error HandleReplicateMessage(ctx context.Context, channelName string, msgPack *msgstream.MsgPack) ([]byte, []byte, error) HandleOpMessagePack(ctx context.Context, msgPack *msgstream.MsgPack) ([]byte, error) RecoveryMetaMsg(ctx context.Context, taskID string) error }
Click to show internal directories.
Click to hide internal directories.