Documentation
¶
Index ¶
- Constants
- Variables
- func CheckAndFixVirtualChannel(vchannel string) string
- func ForeachChannel(sourcePChannels, targetPChannels []string, ...) error
- func FormatChanKey(replicateID, channelName string) string
- func GetMsgDispatcherClient(creator FactoryCreator, mqConfig config.MQConfig, ttMsgStream bool) (msgdispatcher.Client, error)
- func GetStreamFactory(creator FactoryCreator, mqConfig config.MQConfig, ttMsgStream bool) (msgstream.Factory, error)
- func GetTSManager() *tsManager
- func GetVChannelByPChannel(pChannel string, vChannels []string) string
- func GreedyConsumeChan(packChan chan *api.ReplicateMsg, forward bool, f func(bool, *api.ReplicateMsg))
- func IsCollectionNotFoundError(err error) bool
- func IsDatabaseNotFoundError(err error) bool
- func IsDroppedObject(name string) bool
- func IsVirtualChannel(vchannel string) bool
- func NewChannelReader(channelName, seekPosition string, dispatchClient msgdispatcher.Client, ...) (api.Reader, error)
- func NewCollectionReader(id string, channelManager api.ChannelManager, metaOp api.MetaOp, ...) (api.Reader, error)
- func NewEtcdOp(etcdServerConfig config.EtcdServerConfig, defaultPartitionName string, ...) (api.MetaOp, error)
- func NewEtcdOpWithAddress(endpoints []string, rootPath, metaPath, defaultPartitionName string, ...) (api.MetaOp, error)
- func NewReplicateChannelManager(dispatchClient msgdispatcher.Client, factory msgstream.Factory, ...) (api.ChannelManager, error)
- func NewTarget(ctx context.Context, config TargetConfig) (api.TargetAPI, error)
- func ParseChanKey(key string) (string, string)
- type Barrier
- type ChannelReader
- type CollectionInfo
- type CollectionReader
- type DefaultFactoryCreator
- type DisptachClientStreamCreator
- func (dcsc *DisptachClientStreamCreator) CheckConnection(ctx context.Context, vchannel string, seekPosition *msgstream.MsgPosition) error
- func (dcsc *DisptachClientStreamCreator) GetChannelLatestMsgID(ctx context.Context, channelName string) ([]byte, error)
- func (dcsc *DisptachClientStreamCreator) GetStreamChan(ctx context.Context, vchannel string, seekPosition *msgstream.MsgPosition) (<-chan *msgstream.MsgPack, io.Closer, error)
- type EtcdOp
- func (e *EtcdOp) GetAllCollection(ctx context.Context, filter api.CollectionFilter) ([]*pb.CollectionInfo, error)
- func (e *EtcdOp) GetAllDroppedObj() map[string]map[string]uint64
- func (e *EtcdOp) GetAllPartition(ctx context.Context, filter api.PartitionFilter) ([]*pb.PartitionInfo, error)
- func (e *EtcdOp) GetCollectionNameByID(ctx context.Context, id int64) string
- func (e *EtcdOp) GetDatabaseInfoForCollection(ctx context.Context, id int64) model.DatabaseInfo
- func (e *EtcdOp) StartWatch()
- func (e *EtcdOp) SubscribeCollectionEvent(taskID string, consumer api.CollectionEventConsumer)
- func (e *EtcdOp) SubscribePartitionEvent(taskID string, consumer api.PartitionEventConsumer)
- func (e *EtcdOp) UnsubscribeEvent(taskID string, eventType api.WatchEventType)
- func (e *EtcdOp) WatchCollection(ctx context.Context, filter api.CollectionFilter)
- func (e *EtcdOp) WatchPartition(ctx context.Context, filter api.PartitionFilter)
- type FactoryCreator
- type FactoryStreamCreator
- func (fsc *FactoryStreamCreator) CheckConnection(ctx context.Context, vchannel string, seekPosition *msgstream.MsgPosition) error
- func (fsc *FactoryStreamCreator) GetChannelLatestMsgID(ctx context.Context, channelName string) ([]byte, error)
- func (fsc *FactoryStreamCreator) GetStreamChan(ctx context.Context, vchannel string, seekPosition *msgstream.MsgPosition) (<-chan *msgstream.MsgPack, io.Closer, error)
- type ShouldReadFunc
- type StreamCloser
- type StreamCreator
- type TargetClient
- func (t *TargetClient) GetCollectionInfo(ctx context.Context, collectionName, databaseName string) (*model.CollectionInfo, error)
- func (t *TargetClient) GetDatabaseName(ctx context.Context, collectionName, databaseName string) (string, error)
- func (t *TargetClient) GetPartitionInfo(ctx context.Context, collectionName, databaseName string) (*model.CollectionInfo, error)
- func (t *TargetClient) UpdateNameMappings(nameMappings map[string]string)
- type TargetConfig
Constants ¶
View Source
const ( AllCollection = "*" AllDatabase = "*" DefaultDatabase = "default" )
View Source
const (
	TomeObject = "_tome" // marker for an object that has been marked as deleted
	SkipCollectionState = pb.CollectionState(-100)
	SkipPartitionState = pb.PartitionState(-100)
)
View Source
const BufferSize = "4"
Variables ¶
View Source
var ( TSMetricVec = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "milvus", Subsystem: "cdc", Name: "center_tt", Help: "the center ts, unit: ms", }, []string{"channel_name"}) )
Functions ¶
func ForeachChannel ¶
func FormatChanKey ¶
func GetMsgDispatcherClient ¶
func GetMsgDispatcherClient(creator FactoryCreator, mqConfig config.MQConfig, ttMsgStream bool) (msgdispatcher.Client, error)
GetMsgDispatcherClient TODO: the client does not include the message at the current position; however, it should include that message when the position is given by the backup tool.
func GetStreamFactory ¶
func GetTSManager ¶
func GetTSManager() *tsManager
func GetVChannelByPChannel ¶
func GreedyConsumeChan ¶
func GreedyConsumeChan(packChan chan *api.ReplicateMsg, forward bool, f func(bool, *api.ReplicateMsg))
func IsDatabaseNotFoundError ¶
func IsDroppedObject ¶
func IsVirtualChannel ¶
func NewChannelReader ¶
func NewCollectionReader ¶
func NewCollectionReader(id string, channelManager api.ChannelManager, metaOp api.MetaOp, seekPosition map[int64]map[string]*msgpb.MsgPosition, shouldReadFunc ShouldReadFunc, readerConfig config.ReaderConfig, ) (api.Reader, error)
func NewEtcdOp ¶
func NewEtcdOp(etcdServerConfig config.EtcdServerConfig, defaultPartitionName string, etcdConfig config.EtcdRetryConfig, target api.TargetAPI, ) (api.MetaOp, error)
func NewEtcdOpWithAddress ¶
func NewReplicateChannelManager ¶
func NewReplicateChannelManager( dispatchClient msgdispatcher.Client, factory msgstream.Factory, client api.TargetAPI, readConfig config.ReaderConfig, metaOp api.MetaOp, replicateMeta api.ReplicateMeta, msgPackCallback func(string, *msgstream.MsgPack), downstream string, ) (api.ChannelManager, error)
func ParseChanKey ¶
Types ¶
type Barrier ¶
type Barrier struct { Dest int BarrierSignalChan chan *model.BarrierSignal CloseChan chan struct{} UpdateFunc func() }
type ChannelReader ¶
type ChannelReader struct { api.DefaultReader // contains filtered or unexported fields }
func (*ChannelReader) QuitRead ¶
func (c *ChannelReader) QuitRead(ctx context.Context)
func (*ChannelReader) StartRead ¶
func (c *ChannelReader) StartRead(ctx context.Context)
type CollectionInfo ¶
type CollectionInfo struct {
// contains filtered or unexported fields
}
type CollectionReader ¶
type CollectionReader struct { api.DefaultReader // contains filtered or unexported fields }
func (*CollectionReader) ErrorChan ¶
func (reader *CollectionReader) ErrorChan() <-chan error
func (*CollectionReader) QuitRead ¶
func (reader *CollectionReader) QuitRead(ctx context.Context)
func (*CollectionReader) StartRead ¶
func (reader *CollectionReader) StartRead(ctx context.Context)
type DefaultFactoryCreator ¶
type DefaultFactoryCreator struct{}
func (*DefaultFactoryCreator) NewKmsFactory ¶
func (d *DefaultFactoryCreator) NewKmsFactory(cfg *config.KafkaConfig) msgstream.Factory
func (*DefaultFactoryCreator) NewPmsFactory ¶
func (d *DefaultFactoryCreator) NewPmsFactory(cfg *config.PulsarConfig) msgstream.Factory
type DisptachClientStreamCreator ¶
type DisptachClientStreamCreator struct {
// contains filtered or unexported fields
}
func NewDisptachClientStreamCreator ¶
func NewDisptachClientStreamCreator(factory msgstream.Factory, dispatchClient msgdispatcher.Client) *DisptachClientStreamCreator
func (*DisptachClientStreamCreator) CheckConnection ¶
func (dcsc *DisptachClientStreamCreator) CheckConnection(ctx context.Context, vchannel string, seekPosition *msgstream.MsgPosition) error
func (*DisptachClientStreamCreator) GetChannelLatestMsgID ¶
func (*DisptachClientStreamCreator) GetStreamChan ¶
type EtcdOp ¶
type EtcdOp struct {
// contains filtered or unexported fields
}
func (*EtcdOp) GetAllCollection ¶
func (e *EtcdOp) GetAllCollection(ctx context.Context, filter api.CollectionFilter) ([]*pb.CollectionInfo, error)
func (*EtcdOp) GetAllPartition ¶
func (e *EtcdOp) GetAllPartition(ctx context.Context, filter api.PartitionFilter) ([]*pb.PartitionInfo, error)
func (*EtcdOp) GetCollectionNameByID ¶
func (*EtcdOp) GetDatabaseInfoForCollection ¶
func (*EtcdOp) StartWatch ¶
func (e *EtcdOp) StartWatch()
func (*EtcdOp) SubscribeCollectionEvent ¶
func (e *EtcdOp) SubscribeCollectionEvent(taskID string, consumer api.CollectionEventConsumer)
func (*EtcdOp) SubscribePartitionEvent ¶
func (e *EtcdOp) SubscribePartitionEvent(taskID string, consumer api.PartitionEventConsumer)
func (*EtcdOp) UnsubscribeEvent ¶
func (e *EtcdOp) UnsubscribeEvent(taskID string, eventType api.WatchEventType)
func (*EtcdOp) WatchCollection ¶
func (e *EtcdOp) WatchCollection(ctx context.Context, filter api.CollectionFilter)
func (*EtcdOp) WatchPartition ¶
func (e *EtcdOp) WatchPartition(ctx context.Context, filter api.PartitionFilter)
type FactoryCreator ¶
type FactoryCreator interface { NewPmsFactory(cfg *config.PulsarConfig) msgstream.Factory NewKmsFactory(cfg *config.KafkaConfig) msgstream.Factory }
func NewDefaultFactoryCreator ¶
func NewDefaultFactoryCreator() FactoryCreator
type FactoryStreamCreator ¶
type FactoryStreamCreator struct {
// contains filtered or unexported fields
}
func (*FactoryStreamCreator) CheckConnection ¶
func (fsc *FactoryStreamCreator) CheckConnection(ctx context.Context, vchannel string, seekPosition *msgstream.MsgPosition) error
func (*FactoryStreamCreator) GetChannelLatestMsgID ¶
func (*FactoryStreamCreator) GetStreamChan ¶
type ShouldReadFunc ¶
type ShouldReadFunc func(*model.DatabaseInfo, *pb.CollectionInfo) bool
type StreamCloser ¶
type StreamCloser func()
func (StreamCloser) Close ¶
func (sc StreamCloser) Close() error
type StreamCreator ¶
type StreamCreator interface { GetStreamChan(ctx context.Context, vchannel string, seekPosition *msgstream.MsgPosition) (<-chan *msgstream.MsgPack, io.Closer, error) CheckConnection(ctx context.Context, vchannel string, seekPosition *msgstream.MsgPosition) error GetChannelLatestMsgID(ctx context.Context, channelName string) ([]byte, error) }
type TargetClient ¶
type TargetClient struct {
// contains filtered or unexported fields
}
func (*TargetClient) GetCollectionInfo ¶
func (t *TargetClient) GetCollectionInfo(ctx context.Context, collectionName, databaseName string) (*model.CollectionInfo, error)
func (*TargetClient) GetDatabaseName ¶
func (*TargetClient) GetPartitionInfo ¶
func (t *TargetClient) GetPartitionInfo(ctx context.Context, collectionName, databaseName string) (*model.CollectionInfo, error)
func (*TargetClient) UpdateNameMappings ¶
func (t *TargetClient) UpdateNameMappings(nameMappings map[string]string)
type TargetConfig ¶
type TargetConfig struct { URI string Token string APIKey string DialConfig util.DialConfig }
Click to show internal directories.
Click to hide internal directories.