streamhelper

package
v1.1.0-beta.0...-d5dab77
Published: Jan 16, 2025 License: Apache-2.0 Imports: 52 Imported by: 0

Documentation

Overview

Copyright 2021 PingCAP, Inc. Licensed under Apache-2.0.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CheckPointsOf

func CheckPointsOf(task string) string

CheckPointsOf returns the checkpoint prefix of some store. Normally it would be <prefix>/checkpoint/<task-name>/.

func CheckRegionConsistency

func CheckRegionConsistency(startKey, endKey []byte, regions []RegionWithLeader) error

func GlobalCheckpointOf

func GlobalCheckpointOf(task string) string

GlobalCheckpointOf returns the path to the "global" checkpoint of some task. Normally it would be <prefix>/checkpoint/<task-name>/central_globa.

func LastErrorPrefixOf

func LastErrorPrefixOf(task string) string

LastErrorPrefixOf makes the prefix for searching the last error of some task.

func NewClusterCollector

func NewClusterCollector(ctx context.Context, srv LogBackupService) *clusterCollector

NewClusterCollector creates a new cluster collector. A collector is the structure that transforms region information into checkpoint information by requesting the checkpoints of regions from the store.

func OwnerManagerForLogBackup

func OwnerManagerForLogBackup(ctx context.Context, etcdCli *clientv3.Client) owner.Manager

func Pause

func Pause(task string) string

Pause returns the path for pausing the task. Normally it would be <prefix>/pause/<task-name>.

func PrefixOfPause

func PrefixOfPause() string

PrefixOfPause returns the prefix for pausing the task. Normally it would be <prefix>/pause/

func PrefixOfTask

func PrefixOfTask() string

PrefixOfTask is the prefix of all tasks. It would be `<prefix>/info/`.

func RangeKeyOf

func RangeKeyOf(name string, startKey []byte) string

RangeKeyOf returns the path for ranges of some task. Normally it would be <prefix>/ranges/<task-name(string)>/<start-key(binary)> -> <end-key(binary)>

func RangesOf

func RangesOf(name string) string

RangesOf returns the path prefix for the ranges of some task. Normally it would be <prefix>/ranges/<task-name(string)>/. The trailing slash is essential; without it we may scan the ranges of other tasks whose names share the same prefix.
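To see why the trailing slash matters, the sketch below rebuilds the documented layout with a made-up prefix and runs a prefix scan over a few keys. The constant `keyPrefix` and the helpers `rangesOf` and `scan` are hypothetical stand-ins, not the package's own code; `scan` only imitates a prefix read on the metadata store.

```go
package main

import (
	"fmt"
	"path"
	"strings"
)

// keyPrefix is a hypothetical stand-in for the package's <prefix>.
const keyPrefix = "/demo"

// rangesOf mirrors the documented layout <prefix>/ranges/<task-name>/.
// path.Join drops trailing slashes, so the slash is appended explicitly.
func rangesOf(task string) string {
	return path.Join(keyPrefix, "ranges", task) + "/"
}

// scan returns the keys that start with prefix, imitating a prefix read.
func scan(keys []string, prefix string) []string {
	var out []string
	for _, k := range keys {
		if strings.HasPrefix(k, prefix) {
			out = append(out, k)
		}
	}
	return out
}

func main() {
	keys := []string{
		"/demo/ranges/t1/a",
		"/demo/ranges/t1/m",
		"/demo/ranges/t10/a", // belongs to a different task named "t10"
	}
	withSlash := scan(keys, rangesOf("t1"))
	withoutSlash := scan(keys, strings.TrimSuffix(rangesOf("t1"), "/"))
	fmt.Println(len(withSlash), len(withoutSlash)) // prints "2 3"
}
```

Without the slash, the scan for task `t1` also picks up the key that belongs to `t10`.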

func StorageCheckpointOf

func StorageCheckpointOf(task string) string

StorageCheckpointOf gets the prefix path of the `storage checkpoint status` of a task. Normally it would be <prefix>/storage-checkpoint/<task>.

func TaskOf

func TaskOf(name string) string

TaskOf returns the path of tasks. Normally it would be <prefix>/info/<task-name(string)> -> <task(protobuf)>

Types

type AdvancerExt

type AdvancerExt struct {
	MetaDataClient
}

func (AdvancerExt) Begin

func (t AdvancerExt) Begin(ctx context.Context, ch chan<- TaskEvent) error

func (AdvancerExt) ClearV3GlobalCheckpointForTask

func (t AdvancerExt) ClearV3GlobalCheckpointForTask(ctx context.Context, taskName string) error

func (AdvancerExt) GetGlobalCheckpointForTask

func (t AdvancerExt) GetGlobalCheckpointForTask(ctx context.Context, taskName string) (uint64, error)

func (AdvancerExt) UploadV3GlobalCheckpointForTask

func (t AdvancerExt) UploadV3GlobalCheckpointForTask(ctx context.Context, taskName string, checkpoint uint64) error

type AdvancerLockResolver

type AdvancerLockResolver struct {
	*tikv.BaseRegionLockResolver
}

func (*AdvancerLockResolver) GetStore

func (l *AdvancerLockResolver) GetStore() tikv.Storage

If we don't implement GetStore here, it won't compile.

func (*AdvancerLockResolver) ResolveLocksInOneRegion

func (l *AdvancerLockResolver) ResolveLocksInOneRegion(
	bo *tikv.Backoffer, locks []*txnlock.Lock, loc *tikv.KeyLocation) (*tikv.KeyLocation, error)

ResolveLocksInOneRegion tries to resolve expired locks. It checks the status of the txn and resolves the lock if the txn is expired; otherwise it does nothing.

type Checkpoint

type Checkpoint struct {
	ID      uint64 `json:"id,omitempty"`
	Version uint64 `json:"epoch_version,omitempty"`
	TS      uint64 `json:"ts"`

	IsGlobal bool `json:"-"`
}

Checkpoint is the polymorphic checkpoint type. The `ID` and `Version` fields imply the type of this checkpoint: when ID == 0 and Version == 0, it is the task start ts; when ID != 0 and Version == 0, it is the store level checkpoint; when ID != 0 and Version != 0, it is the region level checkpoint.
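The struct tags above determine how a Checkpoint looks under `encoding/json`. The sketch below copies the struct and marshals two values; the `encode` helper is illustrative only, and whether the package itself stores checkpoints as JSON is not stated on this page.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Checkpoint copies the struct definition shown above.
type Checkpoint struct {
	ID      uint64 `json:"id,omitempty"`
	Version uint64 `json:"epoch_version,omitempty"`
	TS      uint64 `json:"ts"`

	IsGlobal bool `json:"-"`
}

// encode marshals a checkpoint and returns the JSON text.
func encode(cp Checkpoint) string {
	b, err := json.Marshal(cp)
	if err != nil {
		panic(err) // not expected for a struct of plain integers
	}
	return string(b)
}

func main() {
	// Zero ID and Version are omitted; TS is always present.
	fmt.Println(encode(Checkpoint{TS: 42}))
	// IsGlobal is tagged "-" and never appears in the output.
	fmt.Println(encode(Checkpoint{ID: 1, Version: 2, TS: 3, IsGlobal: true}))
}
```

The first call prints `{"ts":42}` and the second prints `{"id":1,"epoch_version":2,"ts":3}`.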

func ParseCheckpoint

func ParseCheckpoint(task string, key, value []byte) (Checkpoint, error)

ParseCheckpoint parses the checkpoint from a key & value pair.

func (Checkpoint) Type

func (cp Checkpoint) Type() CheckpointType

Type returns the type(provider) of the checkpoint.
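The rules in the Checkpoint description can be sketched as a small classifier. This is not the package's implementation: the `classify` function is a hypothetical name, and two choices are assumptions that the page does not spell out (checking `IsGlobal` first, and treating ID == 0 with Version != 0 as invalid).

```go
package main

import "fmt"

// CheckpointType copies the constants listed under "type CheckpointType".
type CheckpointType int

const (
	CheckpointTypeStore CheckpointType = iota
	CheckpointTypeRegion
	CheckpointTypeTask
	CheckpointTypeGlobal
	CheckpointTypeInvalid
)

// Checkpoint keeps only the fields that matter for classification.
type Checkpoint struct {
	ID       uint64
	Version  uint64
	TS       uint64
	IsGlobal bool
}

// classify applies the documented ID/Version rules.
func classify(cp Checkpoint) CheckpointType {
	switch {
	case cp.IsGlobal: // assumption: the global flag takes precedence
		return CheckpointTypeGlobal
	case cp.ID == 0 && cp.Version == 0:
		return CheckpointTypeTask
	case cp.ID != 0 && cp.Version == 0:
		return CheckpointTypeStore
	case cp.ID != 0 && cp.Version != 0:
		return CheckpointTypeRegion
	default: // assumption: ID == 0 with Version != 0 is not a valid shape
		return CheckpointTypeInvalid
	}
}

func main() {
	fmt.Println(classify(Checkpoint{TS: 1}) == CheckpointTypeTask)
	fmt.Println(classify(Checkpoint{ID: 7, TS: 1}) == CheckpointTypeStore)
	fmt.Println(classify(Checkpoint{ID: 7, Version: 3, TS: 1}) == CheckpointTypeRegion)
}
```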

type CheckpointAdvancer

type CheckpointAdvancer struct {
	// contains filtered or unexported fields
}

CheckpointAdvancer is the central node for advancing the checkpoint of log backup. It's a part of "checkpoint v3". Generally, it scans the regions in the task range and collects checkpoints from the TiKV stores.

                                        ┌──────┐
                                  ┌────►│ TiKV │
                                  │     └──────┘
                                  │
                                  │
┌──────────┐GetLastFlushTSOfRegion│     ┌──────┐
│ Advancer ├──────────────────────┼────►│ TiKV │
└────┬─────┘                      │     └──────┘
     │                            │
     │                            │
     │                            │     ┌──────┐
     │                            └────►│ TiKV │
     │                                  └──────┘
     │
     │ UploadCheckpointV3   ┌──────────────────┐
     └─────────────────────►│  PD              │
                            └──────────────────┘
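The diagram shows many per-region flush timestamps going in and a single checkpoint going out. The sketch below shows one way such a reduction could work, under the assumption (not stated on this page) that the uploaded checkpoint is the minimum of the collected region checkpoints. The names `regionCheckpoint` and `globalCheckpoint` are hypothetical.

```go
package main

import "fmt"

// regionCheckpoint is a hypothetical record of one region's last flush TS.
type regionCheckpoint struct {
	RegionID uint64
	TS       uint64
}

// globalCheckpoint returns the smallest TS among the regions.
// The boolean is false when no region has reported anything yet.
func globalCheckpoint(cps []regionCheckpoint) (uint64, bool) {
	if len(cps) == 0 {
		return 0, false
	}
	lowest := cps[0].TS
	for _, cp := range cps[1:] {
		if cp.TS < lowest {
			lowest = cp.TS
		}
	}
	return lowest, true
}

func main() {
	cps := []regionCheckpoint{
		{RegionID: 1, TS: 120},
		{RegionID: 2, TS: 95}, // the slowest region bounds the result
		{RegionID: 3, TS: 130},
	}
	ts, ok := globalCheckpoint(cps)
	fmt.Println(ts, ok) // prints "95 true"
}
```

Taking the minimum means the result never claims more progress than the slowest region has made.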

func NewCheckpointAdvancer

func NewCheckpointAdvancer(env Env) *CheckpointAdvancer

NewCheckpointAdvancer creates a checkpoint advancer with the env.

func (*CheckpointAdvancer) CalculateGlobalCheckpointLight

func (c *CheckpointAdvancer) CalculateGlobalCheckpointLight(ctx context.Context,
	threshold time.Duration) (spans.Valued, error)

func (*CheckpointAdvancer) Config

func (c *CheckpointAdvancer) Config() config.Config

Config returns the current config.

func (*CheckpointAdvancer) GetCheckpointInRange

func (c *CheckpointAdvancer) GetCheckpointInRange(ctx context.Context, start, end []byte,
	collector *clusterCollector) error

GetCheckpointInRange scans the regions in the range and collects them into the collector.

func (*CheckpointAdvancer) GetInResolvingLock

func (c *CheckpointAdvancer) GetInResolvingLock() bool

GetInResolvingLock is only used for tests.

func (*CheckpointAdvancer) HasSubscriptions

func (c *CheckpointAdvancer) HasSubscriptions() bool

HasSubscriptions returns whether the advancer is associated with a subscriber.

func (*CheckpointAdvancer) HasTask

func (c *CheckpointAdvancer) HasTask() bool

HasTask returns whether the advancer has been bound to a task.

func (*CheckpointAdvancer) Name

func (c *CheckpointAdvancer) Name() string

Name implements daemon.Interface.

func (*CheckpointAdvancer) OnBecomeOwner

func (c *CheckpointAdvancer) OnBecomeOwner(ctx context.Context)

OnBecomeOwner implements daemon.Interface. It is called when the tidb-server becomes the owner.

func (*CheckpointAdvancer) OnStart

func (c *CheckpointAdvancer) OnStart(ctx context.Context)

OnStart implements daemon.Interface, which will be called when log backup service starts.

func (*CheckpointAdvancer) OnStop

func (c *CheckpointAdvancer) OnStop()

func (*CheckpointAdvancer) OnTick

func (c *CheckpointAdvancer) OnTick(ctx context.Context) (err error)

OnTick advances the inner logical clock of the advancer. It is synchronous: it returns only after all the events triggered by the clock have been handled. It is generally panic-free, so you may not need to recover from a panic here.

func (*CheckpointAdvancer) SpawnSubscriptionHandler

func (c *CheckpointAdvancer) SpawnSubscriptionHandler(ctx context.Context)

func (*CheckpointAdvancer) StartTaskListener

func (c *CheckpointAdvancer) StartTaskListener(ctx context.Context)

StartTaskListener starts the task listener for the advancer. While no task is detected, the advancer does nothing. Call this before beginning the tick loop.

func (*CheckpointAdvancer) TEST_registerCallbackForSubscriptions

func (c *CheckpointAdvancer) TEST_registerCallbackForSubscriptions(f func()) int

func (*CheckpointAdvancer) UpdateConfig

func (c *CheckpointAdvancer) UpdateConfig(newConf config.Config)

UpdateConfig updates the config for the advancer. Note that this should be called before starting the loop, because there are no locks. TODO: support updating the config while the advancer is working (maybe by applying changes at the beginning of each tick, and adding locks).

func (*CheckpointAdvancer) UpdateConfigWith

func (c *CheckpointAdvancer) UpdateConfigWith(f func(*config.Config))

UpdateConfigWith updates the config by modifying the current config.

func (*CheckpointAdvancer) UpdateLastCheckpoint

func (c *CheckpointAdvancer) UpdateLastCheckpoint(p *checkpoint)

UpdateLastCheckpoint modifies the checkpoint in ticking.

func (*CheckpointAdvancer) WithCheckpoints

func (c *CheckpointAdvancer) WithCheckpoints(f func(*spans.ValueSortedFull))

type CheckpointType

type CheckpointType int
const (
	CheckpointTypeStore CheckpointType = iota
	CheckpointTypeRegion
	CheckpointTypeTask
	CheckpointTypeGlobal
	CheckpointTypeInvalid
)

type Env

type Env interface {
	// The region scanner provides the region information.
	TiKVClusterMeta
	// LogBackupService connects to the TiKV, so we can collect the region checkpoints.
	LogBackupService
	// StreamMeta connects to the metadata service (normally PD).
	StreamMeta
	// GCLockResolver tries to resolve locks when the region checkpoint has stopped.
	tikv.RegionLockResolver
}

Env is the interface required by the advancer.

func CliEnv

func CliEnv(cli *utils.StoreManager, tikvStore tikv.Storage, etcdCli *clientv3.Client) Env

CliEnv creates the Env for CLI usage.

func TiDBEnv

func TiDBEnv(tikvStore tikv.Storage, pdCli pd.Client, etcdCli *clientv3.Client, conf *config.Config) (Env, error)

TiDBEnv creates the Env by TiDB config.

type EventType

type EventType int
const (
	EventAdd EventType = iota
	EventDel
	EventErr
	EventPause
	EventResume
)

func (EventType) String

func (t EventType) String() string

type FlushSubscriber

type FlushSubscriber struct {
	// contains filtered or unexported fields
}

FlushSubscriber maintains the state of subscribing to the cluster.

func NewSubscriber

func NewSubscriber(dialer LogBackupService, cluster TiKVClusterMeta, config ...SubscriberConfig) *FlushSubscriber

NewSubscriber creates a new subscriber via the environment and optional configs.

func (*FlushSubscriber) Clear

func (f *FlushSubscriber) Clear()

Clear clears all the subscriptions.

func (*FlushSubscriber) Drop

func (f *FlushSubscriber) Drop()

Drop terminates the lifetime of the subscriber. The subscriber is no longer usable afterwards.

func (*FlushSubscriber) Events

func (f *FlushSubscriber) Events() <-chan spans.Valued

Events returns the output channel of the events.

func (*FlushSubscriber) HandleErrors

func (f *FlushSubscriber) HandleErrors(ctx context.Context)

HandleErrors executes the handlers over all pending errors. Note that the handlers may fail to handle some pending errors; in that case, you can fetch the errors via the `PendingErrors` call.

func (*FlushSubscriber) PendingErrors

func (f *FlushSubscriber) PendingErrors() error

func (*FlushSubscriber) UpdateStoreTopology

func (f *FlushSubscriber) UpdateStoreTopology(ctx context.Context) error

UpdateStoreTopology fetches the current store topology and tries to adapt the subscription state to it.

type LogBackupService

type LogBackupService interface {
	// GetLogBackupClient gets the log backup client.
	GetLogBackupClient(ctx context.Context, storeID uint64) (logbackup.LogBackupClient, error)
	// Disable log backup client connection cache.
	ClearCache(ctx context.Context, storeID uint64) error
}

type MetaDataClient

type MetaDataClient struct {
	*clientv3.Client
}

MetaDataClient is the client for operations over metadata.

func NewMetaDataClient

func NewMetaDataClient(c *clientv3.Client) *MetaDataClient

func (*MetaDataClient) CleanLastErrorOfTask

func (c *MetaDataClient) CleanLastErrorOfTask(ctx context.Context, taskName string) error

func (*MetaDataClient) DeleteTask

func (c *MetaDataClient) DeleteTask(ctx context.Context, taskName string) error

DeleteTask deletes a task, along with its metadata.

func (*MetaDataClient) GetAllTasks

func (c *MetaDataClient) GetAllTasks(ctx context.Context) ([]Task, error)

GetAllTasks gets all tasks from the metadata storage.

func (*MetaDataClient) GetAllTasksWithRevision

func (c *MetaDataClient) GetAllTasksWithRevision(ctx context.Context) ([]Task, int64, error)

func (*MetaDataClient) GetTask

func (c *MetaDataClient) GetTask(ctx context.Context, taskName string) (*Task, error)

GetTask gets the basic task handle from the metadata storage.

func (*MetaDataClient) GetTaskCount

func (c *MetaDataClient) GetTaskCount(ctx context.Context) (int, error)

GetTaskCount gets the count of tasks from the metadata storage.

func (*MetaDataClient) GetTaskWithPauseStatus

func (c *MetaDataClient) GetTaskWithPauseStatus(ctx context.Context, taskName string) (*Task, bool, error)

func (*MetaDataClient) PauseTask

func (c *MetaDataClient) PauseTask(ctx context.Context, taskName string) error

func (*MetaDataClient) PutTask

func (c *MetaDataClient) PutTask(ctx context.Context, task TaskInfo) error

PutTask puts a task into the metadata storage.

func (*MetaDataClient) ResumeTask

func (c *MetaDataClient) ResumeTask(ctx context.Context, taskName string) error

func (*MetaDataClient) TaskByInfo

type PDRegionScanner

type PDRegionScanner struct {
	pd.Client
}

PDRegionScanner is a simple wrapper over PD to adapt the requirement of `RegionScan`.

func (PDRegionScanner) BlockGCUntil

func (c PDRegionScanner) BlockGCUntil(ctx context.Context, at uint64) (uint64, error)

BlockGCUntil updates the service GC safe point for the cluster. It returns the minimal service GC safe point across all services. If the argument is `0`, this would remove the service safe point.

func (PDRegionScanner) FetchCurrentTS

func (c PDRegionScanner) FetchCurrentTS(ctx context.Context) (uint64, error)

TODO: It should be able to synchronize the current TS with the PD.

func (PDRegionScanner) RegionScan

func (c PDRegionScanner) RegionScan(ctx context.Context, key, endKey []byte, limit int) ([]RegionWithLeader, error)

RegionScan gets a list of regions, starts from the region that contains key. Limit limits the maximum number of regions returned.

func (PDRegionScanner) Stores

func (c PDRegionScanner) Stores(ctx context.Context) ([]Store, error)

func (PDRegionScanner) UnblockGC

func (c PDRegionScanner) UnblockGC(ctx context.Context) error

type Range

type Range = kv.KeyRange

type Ranges

type Ranges = []Range

Ranges is a vector of [start_key, end_key) pairs.
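Half-open means the start key belongs to the range and the end key does not. The sketch below checks membership with `bytes.Compare`. The `keyRange` type and `contains` method are hypothetical stand-ins for `kv.KeyRange`, and treating an empty end key as "no upper bound" is an assumption borrowed from common key-range conventions, not something this page states.

```go
package main

import (
	"bytes"
	"fmt"
)

// keyRange is a hypothetical stand-in for kv.KeyRange.
type keyRange struct {
	StartKey []byte
	EndKey   []byte
}

// contains reports whether key lies in [StartKey, EndKey).
// Assumption: an empty EndKey means the range has no upper bound.
func (r keyRange) contains(key []byte) bool {
	if bytes.Compare(key, r.StartKey) < 0 {
		return false
	}
	return len(r.EndKey) == 0 || bytes.Compare(key, r.EndKey) < 0
}

func main() {
	r := keyRange{StartKey: []byte("a"), EndKey: []byte("c")}
	fmt.Println(r.contains([]byte("a"))) // prints "true": start is inclusive
	fmt.Println(r.contains([]byte("b"))) // prints "true"
	fmt.Println(r.contains([]byte("c"))) // prints "false": end is exclusive
}
```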

type RegionIter

type RegionIter struct {

	// The max slice size returned by `Next`.
	// This can be changed before calling `Next` each time,
	// however no thread safety provided.
	PageSize int
	// contains filtered or unexported fields
}

func IterateRegion

func IterateRegion(cli TiKVClusterMeta, startKey, endKey []byte) *RegionIter

IterateRegion creates an iterator over the region range.

func (*RegionIter) Done

func (r *RegionIter) Done() bool

Done checks whether the iteration is done.

func (*RegionIter) Next

func (r *RegionIter) Next(ctx context.Context) ([]RegionWithLeader, error)

Next gets the next page of regions.

func (*RegionIter) String

func (r *RegionIter) String() string

type RegionWithLeader

type RegionWithLeader struct {
	Region *metapb.Region
	Leader *metapb.Peer
}

type Store

type Store struct {
	ID     uint64
	BootAt uint64
}

type StoreCheckpoints

type StoreCheckpoints struct {
	HasCheckpoint    bool
	Checkpoint       uint64
	FailureSubRanges []kv.KeyRange
}

func (*StoreCheckpoints) String

func (s *StoreCheckpoints) String() string

type StreamMeta

type StreamMeta interface {
	// Begin begins listening to the task event changes.
	Begin(ctx context.Context, ch chan<- TaskEvent) error
	// UploadV3GlobalCheckpointForTask uploads the global checkpoint to the meta store.
	UploadV3GlobalCheckpointForTask(ctx context.Context, taskName string, checkpoint uint64) error
	// GetGlobalCheckpointForTask gets the global checkpoint from the meta store.
	GetGlobalCheckpointForTask(ctx context.Context, taskName string) (uint64, error)
	// ClearV3GlobalCheckpointForTask clears the global checkpoint from the meta store.
	ClearV3GlobalCheckpointForTask(ctx context.Context, taskName string) error
	PauseTask(ctx context.Context, taskName string) error
}

StreamMeta connects to the metadata service (normally PD). It provides the global checkpoint information.

type SubscriberConfig

type SubscriberConfig func(*FlushSubscriber)

SubscriberConfig is a config which could be applied to the subscriber.

func WithMasterContext

func WithMasterContext(ctx context.Context) SubscriberConfig

WithMasterContext sets the "master context" for the subscriber. That context would be the "background" context for every subtask created by the subscription manager.

type Task

type Task struct {
	Info backuppb.StreamBackupTaskInfo
	// contains filtered or unexported fields
}

Task represents a remote "task" object returned by a task query. It is associated with the client that created it, and hence is able to fetch remote fields like `ranges`.

func NewTask

func NewTask(client *MetaDataClient, info backuppb.StreamBackupTaskInfo) *Task

func (*Task) GetGlobalCheckPointTS

func (t *Task) GetGlobalCheckPointTS(ctx context.Context) (uint64, error)

GetGlobalCheckPointTS gets the global checkpoint timestamp according to log task.

func (*Task) GetStorageCheckpoint

func (t *Task) GetStorageCheckpoint(ctx context.Context) (uint64, error)

func (*Task) IsPaused

func (t *Task) IsPaused(ctx context.Context) (bool, error)

func (*Task) LastError

func (t *Task) LastError(ctx context.Context) (map[uint64]backuppb.StreamBackupError, error)

func (*Task) NextBackupTSList

func (t *Task) NextBackupTSList(ctx context.Context) ([]Checkpoint, error)

NextBackupTSList lists the backup ts of each store.

func (*Task) Pause

func (t *Task) Pause(ctx context.Context) error

Pause is a shorthand for `metaCli.PauseTask`.

func (*Task) Ranges

func (t *Task) Ranges(ctx context.Context) (Ranges, error)

Ranges tries to fetch the range from the metadata storage.

func (*Task) Resume

func (t *Task) Resume(ctx context.Context) error

Resume is a shorthand for `metaCli.ResumeTask`.

func (*Task) UploadGlobalCheckpoint

func (t *Task) UploadGlobalCheckpoint(ctx context.Context, ts uint64) error

type TaskEvent

type TaskEvent struct {
	Type   EventType
	Name   string
	Info   *backuppb.StreamBackupTaskInfo
	Ranges []kv.KeyRange
	Err    error
}

func (*TaskEvent) String

func (t *TaskEvent) String() string

type TaskInfo

type TaskInfo struct {
	PBInfo  backuppb.StreamBackupTaskInfo
	Ranges  Ranges
	Pausing bool
}

TaskInfo is a task info with extra information.

func NewTaskInfo

func NewTaskInfo(name string) *TaskInfo

NewTaskInfo creates a new task info with the name.

func (*TaskInfo) Check

func (t *TaskInfo) Check() (*TaskInfo, error)

func (*TaskInfo) FromTS

func (t *TaskInfo) FromTS(ts uint64) *TaskInfo

FromTS sets the initial version of the stream backup, and returns itself.

func (*TaskInfo) ToStorage

func (t *TaskInfo) ToStorage(backend *backuppb.StorageBackend) *TaskInfo

ToStorage points the backup task at the given external storage.

func (*TaskInfo) UntilTS

func (t *TaskInfo) UntilTS(ts uint64) *TaskInfo

UntilTS sets the terminal version of the stream backup, and returns itself.

func (*TaskInfo) WithRange

func (t *TaskInfo) WithRange(startKey, endKey []byte) *TaskInfo

WithRange adds a backup range to the task, and returns itself.

func (*TaskInfo) WithRanges

func (t *TaskInfo) WithRanges(ranges ...Range) *TaskInfo

WithRanges adds some ranges to the task, and returns itself.

func (*TaskInfo) WithTableFilter

func (t *TaskInfo) WithTableFilter(filterChain ...string) *TaskInfo

WithTableFilter adds the table filter of the stream backup, and returns itself. When the schema version changes, TiDB should change the ranges of the task according to the table filter.

func (*TaskInfo) ZapTaskInfo

func (t *TaskInfo) ZapTaskInfo() []zap.Field

type TiKVClusterMeta

type TiKVClusterMeta interface {
	// RegionScan gets a list of regions, starts from the region that contains key.
	// Limit limits the maximum number of regions returned.
	RegionScan(ctx context.Context, key, endKey []byte, limit int) ([]RegionWithLeader, error)

	// Stores returns the store metadata from the cluster.
	Stores(ctx context.Context) ([]Store, error)

	// Updates the service GC safe point for the cluster.
	// Returns the latest service GC safe point.
	// If the argument is `0`, this would remove the service safe point.
	// NOTE: once we support multi tasks, perhaps we need to allow the caller to provide a namespace.
	// For now, all tasks (exactly one task in fact) use the same checkpoint.
	BlockGCUntil(ctx context.Context, at uint64) (uint64, error)

	// UnblockGC used to remove the service GC safe point in PD.
	UnblockGC(ctx context.Context) error

	FetchCurrentTS(ctx context.Context) (uint64, error)
}
