ttlworker

package
v1.1.0-beta.0...-d5dab77 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 16, 2025 License: Apache-2.0 Imports: 44 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// AttachStatsCollector is a replaceable package-level hook that attaches the
// stats collector to a session's SQL executor. The default defined here is a
// no-op: it hands the executor it receives straight back to the caller.
var AttachStatsCollector = func(exec sqlexec.SQLExecutor) sqlexec.SQLExecutor {
	return exec
}

AttachStatsCollector attaches the stats collector for the session. This function is registered in BootstrapSession in /session/session.go.

View Source
// DetachStatsCollector is a replaceable package-level hook that removes the
// stats collector from a session's SQL executor. The default defined here is
// a no-op: it hands the executor it receives straight back to the caller.
var DetachStatsCollector = func(exec sqlexec.SQLExecutor) sqlexec.SQLExecutor {
	return exec
}

DetachStatsCollector removes the stats collector for the session. This function is registered in BootstrapSession in /session/session.go.

Functions

func GetDelayMetricRecords

func GetDelayMetricRecords(ctx context.Context, se session.Session, now time.Time) (map[int64]*metrics.DelayMetricsRecord, error)

GetDelayMetricRecords gets the records of TTL delay metrics

func NewScanSession

func NewScanSession(se session.Session, tbl *cache.PhysicalTable, expire time.Time) (*ttlTableSession, func(), error)

NewScanSession creates a session for scan

Types

type JobManager

// JobManager schedules and manages the ttl jobs on this instance.
type JobManager struct {
	// contains filtered or unexported fields
}

JobManager schedules and manages the ttl jobs on this instance

func NewJobManager

func NewJobManager(id string, sessPool util.SessionPool, store kv.Storage, etcdCli *clientv3.Client, leaderFunc func() bool) (manager *JobManager)

NewJobManager creates a new ttl job manager

func (*JobManager) DoGC

func (m *JobManager) DoGC(ctx context.Context, se session.Session, now time.Time)

DoGC deletes some old TTL job histories and redundant scan tasks

func (*JobManager) Error

func (w *JobManager) Error() error

func (*JobManager) GetCommandCli

func (m *JobManager) GetCommandCli() client.CommandClient

GetCommandCli returns the command client

func (*JobManager) GetNotificationCli

func (m *JobManager) GetNotificationCli() client.NotificationClient

GetNotificationCli returns the notification client

func (*JobManager) Send

func (w *JobManager) Send() chan<- any

func (*JobManager) Start

func (w *JobManager) Start()

func (*JobManager) Status

func (w *JobManager) Status() workerStatus

func (*JobManager) Stop

func (w *JobManager) Stop()

func (*JobManager) WaitStopped

func (w *JobManager) WaitStopped(ctx context.Context, timeout time.Duration) error

type SubmitTTLManagerJobRequest

// SubmitTTLManagerJobRequest is the request to submit a TTL job to manager.
type SubmitTTLManagerJobRequest struct {
	// TableID indicates the parent table id.
	TableID int64
	// PhysicalID indicates the physical table id.
	PhysicalID int64
	// RequestID indicates the request id of the job.
	RequestID string
	// RespCh indicates the channel for response. It is send-only from the
	// holder's side and carries an error value.
	RespCh chan<- error
}

SubmitTTLManagerJobRequest is the request to submit a TTL job to manager

type TTLJobAdapter

// TTLJobAdapter is used to submit TTL job and trace job status.
type TTLJobAdapter interface {
	// Now returns the current time with system timezone.
	Now() (time.Time, error)
	// CanSubmitJob returns whether a new job can be created for the specified table,
	// identified by its parent table id and physical table id.
	CanSubmitJob(tableID, physicalID int64) bool
	// SubmitJob submits a new job for the given table under the given request id
	// and returns a trace for it, or an error.
	// NOTE(review): the exact meaning of watermark is not visible here; confirm
	// against the implementation.
	SubmitJob(ctx context.Context, tableID, physicalID int64, requestID string, watermark time.Time) (*TTLJobTrace, error)
	// GetJob returns the job to trace, looked up by table ids and request id.
	GetJob(ctx context.Context, tableID, physicalID int64, requestID string) (*TTLJobTrace, error)
}

TTLJobAdapter is used to submit TTL job and trace job status

func NewManagerJobAdapter

func NewManagerJobAdapter(store kv.Storage, sessPool util.SessionPool, requestCh chan<- *SubmitTTLManagerJobRequest) TTLJobAdapter

NewManagerJobAdapter creates a managerJobAdapter

type TTLJobTrace

// TTLJobTrace contains some TTL job information to trace.
type TTLJobTrace struct {
	// RequestID is the request id given when the job was submitted; it can be
	// used to trace a job.
	RequestID string
	// Finished indicates whether the job is finished.
	Finished bool
	// Summary indicates the summary of the job.
	Summary *TTLSummary
}

TTLJobTrace contains some TTL job information to trace

type TTLSummary

// TTLSummary is the summary for TTL job.
type TTLSummary struct {
	// Row counters: total, successful and errored rows.
	TotalRows   uint64 `json:"total_rows"`
	SuccessRows uint64 `json:"success_rows"`
	ErrorRows   uint64 `json:"error_rows"`

	// Scan task counters: total, scheduled and finished scan tasks.
	TotalScanTask     int `json:"total_scan_task"`
	ScheduledScanTask int `json:"scheduled_scan_task"`
	FinishedScanTask  int `json:"finished_scan_task"`

	// ScanTaskErr is omitted from the JSON output when empty; SummaryText is
	// never included in the JSON output (tag "-").
	ScanTaskErr string `json:"scan_task_err,omitempty"`
	SummaryText string `json:"-"`
}

TTLSummary is the summary for TTL job

type TTLTimerData

// TTLTimerData is the data stored in each timer for TTL.
// Both fields are serialized to JSON under snake_case keys.
type TTLTimerData struct {
	TableID    int64 `json:"table_id"`
	PhysicalID int64 `json:"physical_id"`
}

TTLTimerData is the data stored in each timer for TTL

type TTLTimersSyncer

// TTLTimersSyncer is used to sync timers for ttl.
type TTLTimersSyncer struct {
	// contains filtered or unexported fields
}

TTLTimersSyncer is used to sync timers for ttl

func NewTTLTimerSyncer

func NewTTLTimerSyncer(pool util.SessionPool, cli timerapi.TimerClient) *TTLTimersSyncer

NewTTLTimerSyncer creates a new TTLTimersSyncer

func (*TTLTimersSyncer) GetCachedTimerRecord

func (g *TTLTimersSyncer) GetCachedTimerRecord(key string) (r *timerapi.TimerRecord, ok bool)

GetCachedTimerRecord returns a cached timer by key

func (*TTLTimersSyncer) GetLastSyncInfo

func (g *TTLTimersSyncer) GetLastSyncInfo() (time.Time, int64)

GetLastSyncInfo returns last sync time and information schema version

func (*TTLTimersSyncer) ManualTriggerTTLTimer

func (g *TTLTimersSyncer) ManualTriggerTTLTimer(ctx context.Context, tbl *cache.PhysicalTable) (func() (string, bool, error), error)

ManualTriggerTTLTimer triggers a TTL job for a physical table and returns a function that waits for the job to be done. The returned function returns a bool value that indicates whether the job is finished.

func (*TTLTimersSyncer) Reset

func (g *TTLTimersSyncer) Reset()

Reset resets the syncer's state

func (*TTLTimersSyncer) SetDelayDeleteInterval

func (g *TTLTimersSyncer) SetDelayDeleteInterval(interval time.Duration)

SetDelayDeleteInterval sets the interval for delaying the deletion of a timer. It's better not to delete a timer immediately when the related table does not exist. The reason is that the information schema is synced asynchronously, so a newly created table's meta may not have been synced to the current node yet.

func (*TTLTimersSyncer) SyncTimers

func (g *TTLTimersSyncer) SyncTimers(ctx context.Context, is infoschema.InfoSchema)

SyncTimers syncs timers with TTL tables

type TaskTerminateReason

// TaskTerminateReason indicates the reason why the task is terminated.
type TaskTerminateReason string

TaskTerminateReason indicates the reason why the task is terminated.

// Values of TaskTerminateReason.
const (
	// ReasonTaskFinished indicates the task is finished.
	ReasonTaskFinished TaskTerminateReason = "finished"
	// ReasonError indicates the task is terminated because of an error.
	ReasonError TaskTerminateReason = "error"
	// ReasonWorkerStop indicates the task is terminated because the scan worker stops.
	// We should reschedule this task in another worker or TiDB again.
	ReasonWorkerStop TaskTerminateReason = "workerStop"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL