Documentation ¶
Overview ¶
Copyright 2021 PingCAP, Inc. Licensed under Apache-2.0.
Copyright 2021 PingCAP, Inc. Licensed under Apache-2.0.
Copyright 2021 PingCAP, Inc. Licensed under Apache-2.0.
Index ¶
- func CheckPointsOf(task string) string
- func CheckRegionConsistency(startKey, endKey []byte, regions []RegionWithLeader) error
- func GlobalCheckpointOf(task string) string
- func LastErrorPrefixOf(task string) string
- func NewClusterCollector(ctx context.Context, srv LogBackupService) *clusterCollector
- func OwnerManagerForLogBackup(ctx context.Context, etcdCli *clientv3.Client) owner.Manager
- func Pause(task string) string
- func PrefixOfPause() string
- func PrefixOfTask() string
- func RangeKeyOf(name string, startKey []byte) string
- func RangesOf(name string) string
- func StorageCheckpointOf(task string) string
- func TaskOf(name string) string
- type AdvancerExt
- func (t AdvancerExt) Begin(ctx context.Context, ch chan<- TaskEvent) error
- func (t AdvancerExt) ClearV3GlobalCheckpointForTask(ctx context.Context, taskName string) error
- func (t AdvancerExt) GetGlobalCheckpointForTask(ctx context.Context, taskName string) (uint64, error)
- func (t AdvancerExt) UploadV3GlobalCheckpointForTask(ctx context.Context, taskName string, checkpoint uint64) error
- type AdvancerLockResolver
- type Checkpoint
- type CheckpointAdvancer
- func (c *CheckpointAdvancer) CalculateGlobalCheckpointLight(ctx context.Context, threshold time.Duration) (spans.Valued, error)
- func (c *CheckpointAdvancer) Config() config.Config
- func (c *CheckpointAdvancer) GetCheckpointInRange(ctx context.Context, start, end []byte, collector *clusterCollector) error
- func (c *CheckpointAdvancer) GetInResolvingLock() bool
- func (c *CheckpointAdvancer) HasSubscriptions() bool
- func (c *CheckpointAdvancer) HasTask() bool
- func (c *CheckpointAdvancer) Name() string
- func (c *CheckpointAdvancer) OnBecomeOwner(ctx context.Context)
- func (c *CheckpointAdvancer) OnStart(ctx context.Context)
- func (c *CheckpointAdvancer) OnStop()
- func (c *CheckpointAdvancer) OnTick(ctx context.Context) (err error)
- func (c *CheckpointAdvancer) SpawnSubscriptionHandler(ctx context.Context)
- func (c *CheckpointAdvancer) StartTaskListener(ctx context.Context)
- func (c *CheckpointAdvancer) TEST_registerCallbackForSubscriptions(f func()) int
- func (c *CheckpointAdvancer) UpdateConfig(newConf config.Config)
- func (c *CheckpointAdvancer) UpdateConfigWith(f func(*config.Config))
- func (c *CheckpointAdvancer) UpdateLastCheckpoint(p *checkpoint)
- func (c *CheckpointAdvancer) WithCheckpoints(f func(*spans.ValueSortedFull))
- type CheckpointType
- type Env
- type EventType
- type FlushSubscriber
- func (f *FlushSubscriber) Clear()
- func (f *FlushSubscriber) Drop()
- func (f *FlushSubscriber) Events() <-chan spans.Valued
- func (f *FlushSubscriber) HandleErrors(ctx context.Context)
- func (f *FlushSubscriber) PendingErrors() error
- func (f *FlushSubscriber) UpdateStoreTopology(ctx context.Context) error
- type LogBackupService
- type MetaDataClient
- func (c *MetaDataClient) CleanLastErrorOfTask(ctx context.Context, taskName string) error
- func (c *MetaDataClient) DeleteTask(ctx context.Context, taskName string) error
- func (c *MetaDataClient) GetAllTasks(ctx context.Context) ([]Task, error)
- func (c *MetaDataClient) GetAllTasksWithRevision(ctx context.Context) ([]Task, int64, error)
- func (c *MetaDataClient) GetTask(ctx context.Context, taskName string) (*Task, error)
- func (c *MetaDataClient) GetTaskCount(ctx context.Context) (int, error)
- func (c *MetaDataClient) GetTaskWithPauseStatus(ctx context.Context, taskName string) (*Task, bool, error)
- func (c *MetaDataClient) PauseTask(ctx context.Context, taskName string) error
- func (c *MetaDataClient) PutTask(ctx context.Context, task TaskInfo) error
- func (c *MetaDataClient) ResumeTask(ctx context.Context, taskName string) error
- func (c *MetaDataClient) TaskByInfo(t backuppb.StreamBackupTaskInfo) *Task
- type PDRegionScanner
- func (c PDRegionScanner) BlockGCUntil(ctx context.Context, at uint64) (uint64, error)
- func (c PDRegionScanner) FetchCurrentTS(ctx context.Context) (uint64, error)
- func (c PDRegionScanner) RegionScan(ctx context.Context, key, endKey []byte, limit int) ([]RegionWithLeader, error)
- func (c PDRegionScanner) Stores(ctx context.Context) ([]Store, error)
- func (c PDRegionScanner) UnblockGC(ctx context.Context) error
- type Range
- type Ranges
- type RegionIter
- type RegionWithLeader
- type Store
- type StoreCheckpoints
- type StreamMeta
- type SubscriberConfig
- type Task
- func (t *Task) GetGlobalCheckPointTS(ctx context.Context) (uint64, error)
- func (t *Task) GetStorageCheckpoint(ctx context.Context) (uint64, error)
- func (t *Task) IsPaused(ctx context.Context) (bool, error)
- func (t *Task) LastError(ctx context.Context) (map[uint64]backuppb.StreamBackupError, error)
- func (t *Task) NextBackupTSList(ctx context.Context) ([]Checkpoint, error)
- func (t *Task) Pause(ctx context.Context) error
- func (t *Task) Ranges(ctx context.Context) (Ranges, error)
- func (t *Task) Resume(ctx context.Context) error
- func (t *Task) UploadGlobalCheckpoint(ctx context.Context, ts uint64) error
- type TaskEvent
- type TaskInfo
- func (t *TaskInfo) Check() (*TaskInfo, error)
- func (t *TaskInfo) FromTS(ts uint64) *TaskInfo
- func (t *TaskInfo) ToStorage(backend *backuppb.StorageBackend) *TaskInfo
- func (t *TaskInfo) UntilTS(ts uint64) *TaskInfo
- func (t *TaskInfo) WithRange(startKey, endKey []byte) *TaskInfo
- func (t *TaskInfo) WithRanges(ranges ...Range) *TaskInfo
- func (t *TaskInfo) WithTableFilter(filterChain ...string) *TaskInfo
- func (t *TaskInfo) ZapTaskInfo() []zap.Field
- type TiKVClusterMeta
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CheckPointsOf ¶
CheckpointOf returns the checkpoint prefix of some store. Normally it would be <prefix>/checkpoint/<task-name>/.
func CheckRegionConsistency ¶
func CheckRegionConsistency(startKey, endKey []byte, regions []RegionWithLeader) error
func GlobalCheckpointOf ¶
GlobalCheckpointOf returns the path to the "global" checkpoint of some task. Normally it would be <prefix>/checkpoint/<task-name>/central_globa.
func LastErrorPrefixOf ¶
LastErrorPrefixOf make the prefix for searching last error by some task.
func NewClusterCollector ¶
func NewClusterCollector(ctx context.Context, srv LogBackupService) *clusterCollector
NewClusterCollector creates a new cluster collector. collectors are the structure transform region information to checkpoint information, by requesting the checkpoint of regions in the store.
func Pause ¶
Pause returns the path for pausing the task. Normally it would be <prefix>/pause/<task-name>.
func PrefixOfPause ¶
func PrefixOfPause() string
PrefixOfPause returns the prefix for pausing the task. Normally it would be <prefix>/pause/
func PrefixOfTask ¶
func PrefixOfTask() string
PrefixOfTask is the prefix of all task It would be `<prefix>/info/`
func RangeKeyOf ¶
RangeKeyOf returns the path for ranges of some task. Normally it would be <prefix>/ranges/<task-name(string)>/<start-key(binary)> -> <end-key(binary)>
func RangesOf ¶
RangesOf returns the path prefix for some task. Normally it would be <prefix>/ranges/<task-name(string)>/ the trailling slash is essential or we may scan ranges of tasks with same prefix.
func StorageCheckpointOf ¶
StorageCheckpointOf get the prefix path of the `storage checkpoint status` of a task. Normally it would be <prefix>/storage-checkpoint/<task>.
Types ¶
type AdvancerExt ¶
type AdvancerExt struct {
MetaDataClient
}
func (AdvancerExt) Begin ¶
func (t AdvancerExt) Begin(ctx context.Context, ch chan<- TaskEvent) error
func (AdvancerExt) ClearV3GlobalCheckpointForTask ¶
func (t AdvancerExt) ClearV3GlobalCheckpointForTask(ctx context.Context, taskName string) error
func (AdvancerExt) GetGlobalCheckpointForTask ¶
func (AdvancerExt) UploadV3GlobalCheckpointForTask ¶
type AdvancerLockResolver ¶
type AdvancerLockResolver struct {
*tikv.BaseRegionLockResolver
}
func (*AdvancerLockResolver) GetStore ¶
func (l *AdvancerLockResolver) GetStore() tikv.Storage
If we don't implement GetStore here, it won't complie.
func (*AdvancerLockResolver) ResolveLocksInOneRegion ¶
func (l *AdvancerLockResolver) ResolveLocksInOneRegion( bo *tikv.Backoffer, locks []*txnlock.Lock, loc *tikv.KeyLocation) (*tikv.KeyLocation, error)
ResolveLocksInOneRegion tries to resolve expired locks with this method. It will check status of the txn. Resolve the lock if txn is expired, Or do nothing.
type Checkpoint ¶
type Checkpoint struct { ID uint64 `json:"id,omitempty"` Version uint64 `json:"epoch_version,omitempty"` TS uint64 `json:"ts"` IsGlobal bool `json:"-"` }
Checkpoint is the polymorphic checkpoint type. The `ID` and `Version` implies the type of this checkpoint: When ID == 0 and Version == 0, it is the task start ts. When ID != 0 and Version == 0, it is the store level checkpoint. When ID != 0 and Version != 0, it is the region level checkpoint.
func ParseCheckpoint ¶
func ParseCheckpoint(task string, key, value []byte) (Checkpoint, error)
ParseCheckpoint parses the checkpoint from a key & value pair.
func (Checkpoint) Type ¶
func (cp Checkpoint) Type() CheckpointType
Type returns the type(provider) of the checkpoint.
type CheckpointAdvancer ¶
type CheckpointAdvancer struct {
// contains filtered or unexported fields
}
CheckpointAdvancer is the central node for advancing the checkpoint of log backup. It's a part of "checkpoint v3". Generally, it scan the regions in the task range, collect checkpoints from tikvs.
┌──────┐ ┌────►│ TiKV │ │ └──────┘ │ │ ┌──────────┐GetLastFlushTSOfRegion│ ┌──────┐ │ Advancer ├──────────────────────┼────►│ TiKV │ └────┬─────┘ │ └──────┘ │ │ │ │ │ │ ┌──────┐ │ └────►│ TiKV │ │ └──────┘ │ │ UploadCheckpointV3 ┌──────────────────┐ └─────────────────────►│ PD │ └──────────────────┘
func NewCheckpointAdvancer ¶
func NewCheckpointAdvancer(env Env) *CheckpointAdvancer
NewCheckpointAdvancer creates a checkpoint advancer with the env.
func (*CheckpointAdvancer) CalculateGlobalCheckpointLight ¶
func (*CheckpointAdvancer) Config ¶
func (c *CheckpointAdvancer) Config() config.Config
Config returns the current config.
func (*CheckpointAdvancer) GetCheckpointInRange ¶
func (c *CheckpointAdvancer) GetCheckpointInRange(ctx context.Context, start, end []byte, collector *clusterCollector) error
GetCheckpointInRange scans the regions in the range, collect them to the collector.
func (*CheckpointAdvancer) GetInResolvingLock ¶
func (c *CheckpointAdvancer) GetInResolvingLock() bool
GetInResolvingLock only used for test.
func (*CheckpointAdvancer) HasSubscriptions ¶
func (c *CheckpointAdvancer) HasSubscriptions() bool
HasSubscriptions returns whether the advancer is associated with a subscriber.
func (*CheckpointAdvancer) HasTask ¶
func (c *CheckpointAdvancer) HasTask() bool
HasTask returns whether the advancer has been bound to a task.
func (*CheckpointAdvancer) Name ¶
func (c *CheckpointAdvancer) Name() string
Name implements daemon.Interface.
func (*CheckpointAdvancer) OnBecomeOwner ¶
func (c *CheckpointAdvancer) OnBecomeOwner(ctx context.Context)
OnBecomeOwner implements daemon.Interface. If the tidb-server become owner, this function will be called.
func (*CheckpointAdvancer) OnStart ¶
func (c *CheckpointAdvancer) OnStart(ctx context.Context)
OnStart implements daemon.Interface, which will be called when log backup service starts.
func (*CheckpointAdvancer) OnStop ¶
func (c *CheckpointAdvancer) OnStop()
func (*CheckpointAdvancer) OnTick ¶
func (c *CheckpointAdvancer) OnTick(ctx context.Context) (err error)
OnTick advances the inner logic clock for the advancer. It's synchronous: this would only return after the events triggered by the clock has all been done. It's generally panic-free, you may not need to trying recover a panic here.
func (*CheckpointAdvancer) SpawnSubscriptionHandler ¶
func (c *CheckpointAdvancer) SpawnSubscriptionHandler(ctx context.Context)
func (*CheckpointAdvancer) StartTaskListener ¶
func (c *CheckpointAdvancer) StartTaskListener(ctx context.Context)
StartTaskListener starts the task listener for the advancer. When no task detected, advancer would do nothing, please call this before begin the tick loop.
func (*CheckpointAdvancer) TEST_registerCallbackForSubscriptions ¶
func (c *CheckpointAdvancer) TEST_registerCallbackForSubscriptions(f func()) int
func (*CheckpointAdvancer) UpdateConfig ¶
func (c *CheckpointAdvancer) UpdateConfig(newConf config.Config)
UpdateConfig updates the config for the advancer. Note this should be called before starting the loop, because there isn't locks, TODO: support updating config when advancer starts working. (Maybe by applying changes at begin of ticking, and add locks.)
func (*CheckpointAdvancer) UpdateConfigWith ¶
func (c *CheckpointAdvancer) UpdateConfigWith(f func(*config.Config))
UpdateConfigWith updates the config by modifying the current config.
func (*CheckpointAdvancer) UpdateLastCheckpoint ¶
func (c *CheckpointAdvancer) UpdateLastCheckpoint(p *checkpoint)
UpdateLastCheckpoint modify the checkpoint in ticking.
func (*CheckpointAdvancer) WithCheckpoints ¶
func (c *CheckpointAdvancer) WithCheckpoints(f func(*spans.ValueSortedFull))
type CheckpointType ¶
type CheckpointType int
const ( CheckpointTypeStore CheckpointType = iota CheckpointTypeRegion CheckpointTypeTask CheckpointTypeGlobal CheckpointTypeInvalid )
type Env ¶
type Env interface { // The region scanner provides the region information. TiKVClusterMeta // LogBackupService connects to the TiKV, so we can collect the region checkpoints. LogBackupService // StreamMeta connects to the metadata service (normally PD). StreamMeta // GCLockResolver try to resolve locks when region checkpoint stopped. tikv.RegionLockResolver }
Env is the interface required by the advancer.
type FlushSubscriber ¶
type FlushSubscriber struct {
// contains filtered or unexported fields
}
FlushSubscriber maintains the state of subscribing to the cluster.
func NewSubscriber ¶
func NewSubscriber(dialer LogBackupService, cluster TiKVClusterMeta, config ...SubscriberConfig) *FlushSubscriber
NewSubscriber creates a new subscriber via the environment and optional configs.
func (*FlushSubscriber) Clear ¶
func (f *FlushSubscriber) Clear()
Clear clears all the subscriptions.
func (*FlushSubscriber) Drop ¶
func (f *FlushSubscriber) Drop()
Drop terminates the lifetime of the subscriber. This subscriber would be no more usable.
func (*FlushSubscriber) Events ¶
func (f *FlushSubscriber) Events() <-chan spans.Valued
Events returns the output channel of the events.
func (*FlushSubscriber) HandleErrors ¶
func (f *FlushSubscriber) HandleErrors(ctx context.Context)
HandleErrors execute the handlers over all pending errors. Note that the handler may cannot handle the pending errors, at that time, you can fetch the errors via `PendingErrors` call.
func (*FlushSubscriber) PendingErrors ¶
func (f *FlushSubscriber) PendingErrors() error
func (*FlushSubscriber) UpdateStoreTopology ¶
func (f *FlushSubscriber) UpdateStoreTopology(ctx context.Context) error
UpdateStoreTopology fetches the current store topology and try to adapt the subscription state with it.
type LogBackupService ¶
type MetaDataClient ¶
MetaDataClient is the client for operations over metadata.
func NewMetaDataClient ¶
func NewMetaDataClient(c *clientv3.Client) *MetaDataClient
func (*MetaDataClient) CleanLastErrorOfTask ¶
func (c *MetaDataClient) CleanLastErrorOfTask(ctx context.Context, taskName string) error
func (*MetaDataClient) DeleteTask ¶
func (c *MetaDataClient) DeleteTask(ctx context.Context, taskName string) error
DeleteTask deletes a task, along with its metadata.
func (*MetaDataClient) GetAllTasks ¶
func (c *MetaDataClient) GetAllTasks(ctx context.Context) ([]Task, error)
GetAllTasks get all of tasks from metadata storage.
func (*MetaDataClient) GetAllTasksWithRevision ¶
func (*MetaDataClient) GetTaskCount ¶
func (c *MetaDataClient) GetTaskCount(ctx context.Context) (int, error)
GetTaskCount get the count of tasks from metadata storage.
func (*MetaDataClient) GetTaskWithPauseStatus ¶
func (*MetaDataClient) PauseTask ¶
func (c *MetaDataClient) PauseTask(ctx context.Context, taskName string) error
func (*MetaDataClient) PutTask ¶
func (c *MetaDataClient) PutTask(ctx context.Context, task TaskInfo) error
PutTask put a task to the metadata storage.
func (*MetaDataClient) ResumeTask ¶
func (c *MetaDataClient) ResumeTask(ctx context.Context, taskName string) error
func (*MetaDataClient) TaskByInfo ¶
func (c *MetaDataClient) TaskByInfo(t backuppb.StreamBackupTaskInfo) *Task
type PDRegionScanner ¶
PDRegionScanner is a simple wrapper over PD to adapt the requirement of `RegionScan`.
func (PDRegionScanner) BlockGCUntil ¶
Updates the service GC safe point for the cluster. Returns the minimal service GC safe point across all services. If the arguments is `0`, this would remove the service safe point.
func (PDRegionScanner) FetchCurrentTS ¶
func (c PDRegionScanner) FetchCurrentTS(ctx context.Context) (uint64, error)
TODO: It should be able to synchoronize the current TS with the PD.
func (PDRegionScanner) RegionScan ¶
func (c PDRegionScanner) RegionScan(ctx context.Context, key, endKey []byte, limit int) ([]RegionWithLeader, error)
RegionScan gets a list of regions, starts from the region that contains key. Limit limits the maximum number of regions returned.
type RegionIter ¶
type RegionIter struct { // The max slice size returned by `Next`. // This can be changed before calling `Next` each time, // however no thread safety provided. PageSize int // contains filtered or unexported fields }
func IterateRegion ¶
func IterateRegion(cli TiKVClusterMeta, startKey, endKey []byte) *RegionIter
IterateRegion creates an iterater over the region range.
func (*RegionIter) Done ¶
func (r *RegionIter) Done() bool
Done checks whether the iteration is done.
func (*RegionIter) Next ¶
func (r *RegionIter) Next(ctx context.Context) ([]RegionWithLeader, error)
Next get the next page of regions.
func (*RegionIter) String ¶
func (r *RegionIter) String() string
type StoreCheckpoints ¶
type StoreCheckpoints struct { HasCheckpoint bool Checkpoint uint64 FailureSubRanges []kv.KeyRange }
func (*StoreCheckpoints) String ¶
func (s *StoreCheckpoints) String() string
type StreamMeta ¶
type StreamMeta interface { // Begin begins listen the task event change. Begin(ctx context.Context, ch chan<- TaskEvent) error // UploadV3GlobalCheckpointForTask uploads the global checkpoint to the meta store. UploadV3GlobalCheckpointForTask(ctx context.Context, taskName string, checkpoint uint64) error // GetGlobalCheckpointForTask gets the global checkpoint from the meta store. GetGlobalCheckpointForTask(ctx context.Context, taskName string) (uint64, error) // ClearV3GlobalCheckpointForTask clears the global checkpoint to the meta store. ClearV3GlobalCheckpointForTask(ctx context.Context, taskName string) error PauseTask(ctx context.Context, taskName string) error }
StreamMeta connects to the metadata service (normally PD). It provides the global checkpoint information.
type SubscriberConfig ¶
type SubscriberConfig func(*FlushSubscriber)
SubscriberConfig is a config which cloud be applied into the subscriber.
func WithMasterContext ¶
func WithMasterContext(ctx context.Context) SubscriberConfig
WithMasterContext sets the "master context" for the subscriber, that context would be the "background" context for every subtasks created by the subscription manager.
type Task ¶
type Task struct { Info backuppb.StreamBackupTaskInfo // contains filtered or unexported fields }
Task presents a remote "task" object. returned by a query of task. Associated to the client created it, hence be able to fetch remote fields like `ranges`.
func NewTask ¶
func NewTask(client *MetaDataClient, info backuppb.StreamBackupTaskInfo) *Task
func (*Task) GetGlobalCheckPointTS ¶
GetGlobalCheckPointTS gets the global checkpoint timestamp according to log task.
func (*Task) GetStorageCheckpoint ¶
func (*Task) NextBackupTSList ¶
func (t *Task) NextBackupTSList(ctx context.Context) ([]Checkpoint, error)
NextBackupTSList lists the backup ts of each store.
type TaskEvent ¶
type TaskInfo ¶
type TaskInfo struct { PBInfo backuppb.StreamBackupTaskInfo Ranges Ranges Pausing bool }
TaskInfo is a task info with extra information.
func NewTaskInfo ¶
NewTask creates a new task with the name.
func (*TaskInfo) ToStorage ¶
func (t *TaskInfo) ToStorage(backend *backuppb.StorageBackend) *TaskInfo
ToStorage indicates the backup task to the external storage.
func (*TaskInfo) UntilTS ¶
UntilTS set the terminal version of the stream backup, and return itself.
func (*TaskInfo) WithRanges ¶
WithRanges adds some ranges to the task, and return itself.
func (*TaskInfo) WithTableFilter ¶
WithTableFilterHint adds the table filter of the stream backup, and return itself. When schama version changed, TiDB should change the ranges of the task according to the table filter.
func (*TaskInfo) ZapTaskInfo ¶
type TiKVClusterMeta ¶
type TiKVClusterMeta interface { // RegionScan gets a list of regions, starts from the region that contains key. // Limit limits the maximum number of regions returned. RegionScan(ctx context.Context, key, endKey []byte, limit int) ([]RegionWithLeader, error) // Stores returns the store metadata from the cluster. Stores(ctx context.Context) ([]Store, error) // Updates the service GC safe point for the cluster. // Returns the latest service GC safe point. // If the arguments is `0`, this would remove the service safe point. // NOTE: once we support multi tasks, perhaps we need to allow the caller to provide a namespace. // For now, all tasks (exactly one task in fact) use the same checkpoint. BlockGCUntil(ctx context.Context, at uint64) (uint64, error) // UnblockGC used to remove the service GC safe point in PD. UnblockGC(ctx context.Context) error FetchCurrentTS(ctx context.Context) (uint64, error) }