Documentation ¶
Overview ¶
Copyright 2022 PingCAP, Inc.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, See the License for the specific language governing permissions and limitations under the License.
Index ¶
- func IsSchemaFile(path string) bool
- func RemoveEmptyDirs(ctx context.Context, id model.ChangeFeedID, target string) (uint64, error)
- func RemoveExpiredFiles(ctx context.Context, _ model.ChangeFeedID, storage storage.ExternalStorage, ...) (uint64, error)
- type Config
- type DmlPathKey
- type FilePathGenerator
- func (f *FilePathGenerator) CheckOrWriteSchema(ctx context.Context, table VersionedTableName, tableInfo *model.TableInfo) error
- func (f *FilePathGenerator) GenerateDataFilePath(ctx context.Context, tbl VersionedTableName, date string) (string, error)
- func (f *FilePathGenerator) GenerateDateStr() string
- func (f *FilePathGenerator) GenerateIndexFilePath(tbl VersionedTableName, date string) string
- func (f *FilePathGenerator) SetClock(pdClock pdutil.Clock)
- type SchemaPathKey
- type TableCol
- type TableDefinition
- func (t *TableDefinition) FromDDLEvent(event *model.DDLEvent, outputColumnID bool)
- func (t *TableDefinition) FromTableInfo(info *model.TableInfo, tableInfoVersion model.Ts, outputColumnID bool)
- func (t *TableDefinition) GenerateSchemaFilePath() (string, error)
- func (t *TableDefinition) IsTableSchema() bool
- func (t *TableDefinition) MarshalWithQuery() ([]byte, error)
- func (t *TableDefinition) Sum32(hasher *hash.PositionInertia) (uint32, error)
- func (t *TableDefinition) ToDDLEvent() (*model.DDLEvent, error)
- func (t *TableDefinition) ToTableInfo() (*model.TableInfo, error)
- type VersionedTableName
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func IsSchemaFile ¶
IsSchemaFile checks whether the file is a schema file.
func RemoveEmptyDirs ¶
RemoveEmptyDirs removes empty directories from external storage.
func RemoveExpiredFiles ¶
func RemoveExpiredFiles( ctx context.Context, _ model.ChangeFeedID, storage storage.ExternalStorage, cfg *Config, checkpointTs model.Ts, ) (uint64, error)
RemoveExpiredFiles removes expired files from external storage.
Types ¶
type Config ¶
type Config struct { WorkerCount int FlushInterval time.Duration FileSize int FileIndexWidth int DateSeparator string FileExpirationDays int FileCleanupCronSpec string EnablePartitionSeparator bool OutputColumnID bool FlushConcurrency int }
Config is the configuration for cloud storage sink.
type DmlPathKey ¶
type DmlPathKey struct { SchemaPathKey PartitionNum int64 Date string }
DmlPathKey is the key of dml path.
func (*DmlPathKey) GenerateDMLFilePath ¶
func (d *DmlPathKey) GenerateDMLFilePath( idx uint64, extension string, fileIndexWidth int, ) string
GenerateDMLFilePath generates the dml file path.
func (*DmlPathKey) ParseDMLFilePath ¶
func (d *DmlPathKey) ParseDMLFilePath(dateSeparator, path string) (uint64, error)
ParseDMLFilePath parses the dml file path and returns the max file index. DML file path pattern is as follows: {schema}/{table}/{table-version-separator}/{partition-separator}/{date-separator}/, where partition-separator and date-separator could be empty. DML file name pattern is as follows: CDC{num}.extension.
type FilePathGenerator ¶
type FilePathGenerator struct {
// contains filtered or unexported fields
}
FilePathGenerator is used to generate data file path and index file path.
func NewFilePathGenerator ¶
func NewFilePathGenerator( changefeedID model.ChangeFeedID, config *Config, storage storage.ExternalStorage, extension string, pdclock pdutil.Clock, ) *FilePathGenerator
NewFilePathGenerator creates a FilePathGenerator.
func (*FilePathGenerator) CheckOrWriteSchema ¶
func (f *FilePathGenerator) CheckOrWriteSchema( ctx context.Context, table VersionedTableName, tableInfo *model.TableInfo, ) error
CheckOrWriteSchema checks whether the schema file exists in the storage and writes the schema file if necessary.
func (*FilePathGenerator) GenerateDataFilePath ¶
func (f *FilePathGenerator) GenerateDataFilePath( ctx context.Context, tbl VersionedTableName, date string, ) (string, error)
GenerateDataFilePath generates a canonical path for data file.
func (*FilePathGenerator) GenerateDateStr ¶
func (f *FilePathGenerator) GenerateDateStr() string
GenerateDateStr generates a date string based on the current time and the date-separator configuration item.
func (*FilePathGenerator) GenerateIndexFilePath ¶
func (f *FilePathGenerator) GenerateIndexFilePath(tbl VersionedTableName, date string) string
GenerateIndexFilePath generates a canonical path for index file.
func (*FilePathGenerator) SetClock ¶
func (f *FilePathGenerator) SetClock(pdClock pdutil.Clock)
SetClock is used for unit tests.
type SchemaPathKey ¶
SchemaPathKey is the key of schema path.
func (*SchemaPathKey) GetKey ¶
func (s *SchemaPathKey) GetKey() string
GetKey returns the key of schema path.
func (*SchemaPathKey) ParseSchemaFilePath ¶
func (s *SchemaPathKey) ParseSchemaFilePath(path string) (uint32, error)
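ParseSchemaFilePath parses the schema file path and returns the checksum. Note that the signature returns a single uint32, so the table version is not part of the return value; presumably it is recorded on the receiver (verify against the implementation).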
type TableCol ¶
type TableCol struct { ID string `json:"ColumnId,omitempty"` Name string `json:"ColumnName" ` Tp string `json:"ColumnType"` Default interface{} `json:"ColumnDefault,omitempty"` Precision string `json:"ColumnPrecision,omitempty"` Scale string `json:"ColumnScale,omitempty"` Nullable string `json:"ColumnNullable,omitempty"` IsPK string `json:"ColumnIsPk,omitempty"` }
TableCol denotes the column info for a table definition.
func (*TableCol) FromTiColumnInfo ¶
func (t *TableCol) FromTiColumnInfo(col *timodel.ColumnInfo, outputColumnID bool)
FromTiColumnInfo converts from TiDB ColumnInfo to TableCol.
func (*TableCol) ToTiColumnInfo ¶
func (t *TableCol) ToTiColumnInfo(colID int64) (*timodel.ColumnInfo, error)
ToTiColumnInfo converts from TableCol to TiDB ColumnInfo.
type TableDefinition ¶
type TableDefinition struct { Table string `json:"Table"` Schema string `json:"Schema"` Version uint64 `json:"Version"` TableVersion uint64 `json:"TableVersion"` Query string `json:"Query"` Type timodel.ActionType `json:"Type"` Columns []TableCol `json:"TableColumns"` TotalColumns int `json:"TableColumnsTotal"` }
TableDefinition is the detailed table definition used for cloud storage sink. TODO: find a better name for this struct.
func (*TableDefinition) FromDDLEvent ¶
func (t *TableDefinition) FromDDLEvent(event *model.DDLEvent, outputColumnID bool)
FromDDLEvent converts from DDLEvent to TableDefinition.
func (*TableDefinition) FromTableInfo ¶
func (t *TableDefinition) FromTableInfo( info *model.TableInfo, tableInfoVersion model.Ts, outputColumnID bool, )
FromTableInfo converts from TableInfo to TableDefinition.
func (*TableDefinition) GenerateSchemaFilePath ¶
func (t *TableDefinition) GenerateSchemaFilePath() (string, error)
GenerateSchemaFilePath generates the schema file path for TableDefinition.
func (*TableDefinition) IsTableSchema ¶
func (t *TableDefinition) IsTableSchema() bool
IsTableSchema returns whether the TableDefinition is a table schema.
func (*TableDefinition) MarshalWithQuery ¶
func (t *TableDefinition) MarshalWithQuery() ([]byte, error)
MarshalWithQuery marshals TableDefinition with Query field.
func (*TableDefinition) Sum32 ¶
func (t *TableDefinition) Sum32(hasher *hash.PositionInertia) (uint32, error)
Sum32 returns the 32-bits hash value of TableDefinition.
func (*TableDefinition) ToDDLEvent ¶
func (t *TableDefinition) ToDDLEvent() (*model.DDLEvent, error)
ToDDLEvent converts from TableDefinition to DDLEvent.
func (*TableDefinition) ToTableInfo ¶
func (t *TableDefinition) ToTableInfo() (*model.TableInfo, error)
ToTableInfo converts from TableDefinition to TableInfo.
type VersionedTableName ¶
type VersionedTableName struct { // Because we need to generate different file paths for different // tables, we need to use the physical table ID instead of the // logical table ID.(Especially when the table is a partitioned table). TableNameWithPhysicTableID model.TableName // TableInfoVersion is consistent with the version of TableInfo recorded in // schema storage. It can either be finished ts of a DDL event, // or be the checkpoint ts when processor is restarted. TableInfoVersion uint64 }
VersionedTableName is used to wrap TableNameWithPhysicTableID with a version.