cloudstorage

package
v0.0.0-...-f2eaa2c Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 16, 2025 License: Apache-2.0 Imports: 35 Imported by: 8

Documentation

Overview

Copyright 2022 PingCAP, Inc.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func IsSchemaFile

func IsSchemaFile(path string) bool

IsSchemaFile checks whether the file is a schema file.

func RemoveEmptyDirs

func RemoveEmptyDirs(
	ctx context.Context,
	id model.ChangeFeedID,
	target string,
) (uint64, error)

RemoveEmptyDirs removes empty directories from external storage.

func RemoveExpiredFiles

func RemoveExpiredFiles(
	ctx context.Context,
	_ model.ChangeFeedID,
	storage storage.ExternalStorage,
	cfg *Config,
	checkpointTs model.Ts,
) (uint64, error)

RemoveExpiredFiles removes expired files from external storage.

Types

type Config

// Config is the configuration for cloud storage sink.
//
// Obtain a default-populated value from NewConfig and overlay sink URI
// parameters with (*Config).Apply.
// NOTE(review): units and valid ranges of the numeric fields (e.g. whether
// FileSize is in bytes) are not visible in this listing — confirm against
// NewConfig/Apply before relying on them.
type Config struct {
	WorkerCount              int
	FlushInterval            time.Duration
	FileSize                 int
	// FileIndexWidth is presumably the value passed as fileIndexWidth to
	// DmlPathKey.GenerateDMLFilePath — TODO confirm.
	FileIndexWidth           int
	// DateSeparator is the date-separator configuration item that
	// FilePathGenerator.GenerateDateStr bases its output on.
	DateSeparator            string
	// FileExpirationDays and FileCleanupCronSpec presumably drive the cleanup
	// performed by RemoveExpiredFiles, which receives a *Config — TODO confirm.
	FileExpirationDays       int
	FileCleanupCronSpec      string
	// EnablePartitionSeparator presumably toggles the {partition-separator}
	// segment of the DML file path — TODO confirm.
	EnablePartitionSeparator bool
	// OutputColumnID is presumably forwarded as the outputColumnID argument of
	// the TableCol/TableDefinition conversion methods — TODO confirm.
	OutputColumnID           bool
	FlushConcurrency         int
}

Config is the configuration for cloud storage sink.

func NewConfig

func NewConfig() *Config

NewConfig returns the default cloud storage sink config.

func (*Config) Apply

func (c *Config) Apply(
	ctx context.Context,
	sinkURI *url.URL,
	replicaConfig *config.ReplicaConfig,
) (err error)

Apply applies the sink URI parameters to the config.

type DmlPathKey

// DmlPathKey is the key of dml path.
type DmlPathKey struct {
	// SchemaPathKey is embedded, so its Schema, Table and TableVersion fields
	// (and its methods) are promoted onto DmlPathKey.
	SchemaPathKey
	// PartitionNum presumably fills the {partition-separator} segment of the
	// DML file path — TODO confirm.
	PartitionNum int64
	// Date presumably fills the {date-separator} segment of the DML file
	// path — TODO confirm.
	Date         string
}

DmlPathKey is the key of dml path.

func (*DmlPathKey) GenerateDMLFilePath

func (d *DmlPathKey) GenerateDMLFilePath(
	idx uint64, extension string, fileIndexWidth int,
) string

GenerateDMLFilePath generates the dml file path.

func (*DmlPathKey) ParseDMLFilePath

func (d *DmlPathKey) ParseDMLFilePath(dateSeparator, path string) (uint64, error)

ParseDMLFilePath parses the dml file path and returns the max file index. DML file path pattern is as follows: {schema}/{table}/{table-version-separator}/{partition-separator}/{date-separator}/, where partition-separator and date-separator could be empty. DML file name pattern is as follows: CDC{num}.extension.

type FilePathGenerator

// FilePathGenerator is used to generate data file path and index file path.
//
// All of its fields are unexported; values are created with
// NewFilePathGenerator.
type FilePathGenerator struct {
	// contains filtered or unexported fields
}

FilePathGenerator is used to generate data file path and index file path.

func NewFilePathGenerator

func NewFilePathGenerator(
	changefeedID model.ChangeFeedID,
	config *Config,
	storage storage.ExternalStorage,
	extension string,
	pdclock pdutil.Clock,
) *FilePathGenerator

NewFilePathGenerator creates a FilePathGenerator.

func (*FilePathGenerator) CheckOrWriteSchema

func (f *FilePathGenerator) CheckOrWriteSchema(
	ctx context.Context,
	table VersionedTableName,
	tableInfo *model.TableInfo,
) error

CheckOrWriteSchema checks whether the schema file exists in the storage and writes the schema file if necessary.

func (*FilePathGenerator) GenerateDataFilePath

func (f *FilePathGenerator) GenerateDataFilePath(
	ctx context.Context, tbl VersionedTableName, date string,
) (string, error)

GenerateDataFilePath generates a canonical path for data file.

func (*FilePathGenerator) GenerateDateStr

func (f *FilePathGenerator) GenerateDateStr() string

GenerateDateStr generates a date string based on the current time and the date-separator configuration item.

func (*FilePathGenerator) GenerateIndexFilePath

func (f *FilePathGenerator) GenerateIndexFilePath(tbl VersionedTableName, date string) string

GenerateIndexFilePath generates a canonical path for index file.

func (*FilePathGenerator) SetClock

func (f *FilePathGenerator) SetClock(pdClock pdutil.Clock)

SetClock replaces the generator's clock; it is intended for use in unit tests.

type SchemaPathKey

// SchemaPathKey is the key of schema path.
type SchemaPathKey struct {
	// Schema and Table name the table the path belongs to.
	// NOTE(review): presumably the {schema} and {table} path segments — confirm.
	Schema       string
	Table        string
	// TableVersion is presumably the value behind the
	// {table-version-separator} path segment — TODO confirm.
	TableVersion uint64
}

SchemaPathKey is the key of schema path.

func (*SchemaPathKey) GetKey

func (s *SchemaPathKey) GetKey() string

GetKey returns the key of schema path.

func (*SchemaPathKey) ParseSchemaFilePath

func (s *SchemaPathKey) ParseSchemaFilePath(path string) (uint32, error)

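ParseSchemaFilePath parses the schema file path and returns the checksum encoded in it. Only a single uint32 is returned, so the parsed table version is presumably recorded on the receiver rather than returned.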

type TableCol

// TableCol denotes the column info for a table definition.
//
// It is serialized to JSON under the keys given in the struct tags. Only
// ColumnName and ColumnType are always emitted; every other key carries
// omitempty and is dropped when the field holds its zero value (an empty
// string, or a nil Default).
type TableCol struct {
	ID        string      `json:"ColumnId,omitempty"`
	Name      string      `json:"ColumnName"`
	Tp        string      `json:"ColumnType"`
	Default   interface{} `json:"ColumnDefault,omitempty"`
	Precision string      `json:"ColumnPrecision,omitempty"`
	Scale     string      `json:"ColumnScale,omitempty"`
	Nullable  string      `json:"ColumnNullable,omitempty"`
	IsPK      string      `json:"ColumnIsPk,omitempty"`
}

TableCol denotes the column info for a table definition.

func (*TableCol) FromTiColumnInfo

func (t *TableCol) FromTiColumnInfo(col *timodel.ColumnInfo, outputColumnID bool)

FromTiColumnInfo converts from TiDB ColumnInfo to TableCol.

func (*TableCol) ToTiColumnInfo

func (t *TableCol) ToTiColumnInfo(colID int64) (*timodel.ColumnInfo, error)

ToTiColumnInfo converts from TableCol to TiDB ColumnInfo.

type TableDefinition

// TableDefinition is the detailed table definition used for cloud storage sink.
// TODO: find a better name for this struct.
//
// The struct tags fix the JSON key of every field; note that Columns is
// serialized as "TableColumns" and TotalColumns as "TableColumnsTotal".
type TableDefinition struct {
	Table        string             `json:"Table"`
	Schema       string             `json:"Schema"`
	// NOTE(review): the distinction between Version and TableVersion is not
	// visible in this listing — confirm against FromDDLEvent/FromTableInfo.
	Version      uint64             `json:"Version"`
	TableVersion uint64             `json:"TableVersion"`
	// Query has a dedicated MarshalWithQuery method; presumably other
	// serialization paths leave it out — TODO confirm.
	Query        string             `json:"Query"`
	Type         timodel.ActionType `json:"Type"`
	Columns      []TableCol         `json:"TableColumns"`
	// TotalColumns is presumably len(Columns) — TODO confirm.
	TotalColumns int                `json:"TableColumnsTotal"`
}

TableDefinition is the detailed table definition used for cloud storage sink. TODO: find a better name for this struct.

func (*TableDefinition) FromDDLEvent

func (t *TableDefinition) FromDDLEvent(event *model.DDLEvent, outputColumnID bool)

FromDDLEvent converts from DDLEvent to TableDefinition.

func (*TableDefinition) FromTableInfo

func (t *TableDefinition) FromTableInfo(
	info *model.TableInfo, tableInfoVersion model.Ts, outputColumnID bool,
)

FromTableInfo converts from TableInfo to TableDefinition.

func (*TableDefinition) GenerateSchemaFilePath

func (t *TableDefinition) GenerateSchemaFilePath() (string, error)

GenerateSchemaFilePath generates the schema file path for TableDefinition.

func (*TableDefinition) IsTableSchema

func (t *TableDefinition) IsTableSchema() bool

IsTableSchema returns whether the TableDefinition is a table schema.

func (*TableDefinition) MarshalWithQuery

func (t *TableDefinition) MarshalWithQuery() ([]byte, error)

MarshalWithQuery marshals TableDefinition with Query field.

func (*TableDefinition) Sum32

func (t *TableDefinition) Sum32(hasher *hash.PositionInertia) (uint32, error)

Sum32 returns the 32-bits hash value of TableDefinition.

func (*TableDefinition) ToDDLEvent

func (t *TableDefinition) ToDDLEvent() (*model.DDLEvent, error)

ToDDLEvent converts from TableDefinition to DDLEvent.

func (*TableDefinition) ToTableInfo

func (t *TableDefinition) ToTableInfo() (*model.TableInfo, error)

ToTableInfo converts from TableDefinition to TableInfo.

type VersionedTableName

// VersionedTableName is used to wrap TableNameWithPhysicTableID with a version.
type VersionedTableName struct {
	// Because we need to generate different file paths for different
	// tables, we need to use the physical table ID instead of the
	// logical table ID (especially when the table is a partitioned table).
	TableNameWithPhysicTableID model.TableName
	// TableInfoVersion is consistent with the version of TableInfo recorded in
	// schema storage. It can either be the finished ts of a DDL event
	// or the checkpoint ts when the processor is restarted.
	TableInfoVersion uint64
}

VersionedTableName is used to wrap TableNameWithPhysicTableID with a version.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL