Documentation ¶
Index ¶
- Constants
- Variables
- func AllocateEngineIDs(filesRegions []*TableRegion, dataFileSizes []float64, batchSize float64, ...)
- func CalculateBatchSize(mydumperBatchSize float64, isRowOrdered bool, totalSize float64) float64
- func ExportStatement(ctx context.Context, store storage.ExternalStorage, sqlFile FileInfo, ...) ([]byte, error)
- func IndexAnyByte(s []byte, as *byteSet) int
- func OpenParquetReader(ctx context.Context, store storage.ExternalStorage, path string, size int64) (source.ParquetFile, error)
- func OpenReader(ctx context.Context, fileMeta *SourceFileMeta, store storage.ExternalStorage, ...) (reader storage.ReadSeekCloser, err error)
- func ReadParquetFileRowCountByFile(ctx context.Context, store storage.ExternalStorage, fileMeta SourceFileMeta) (int64, error)
- func ReadUntil(parser Parser, pos int64) error
- func SampleFileCompressRatio(ctx context.Context, fileMeta SourceFileMeta, store storage.ExternalStorage) (float64, error)
- func SampleParquetRowSize(ctx context.Context, fileMeta SourceFileMeta, store storage.ExternalStorage) (float64, error)
- func ToStorageCompressType(compression Compression) (storage.CompressType, error)
- type CSVParser
- func (parser *CSVParser) Close() error
- func (parser *CSVParser) Columns() []string
- func (parser *CSVParser) LastRow() Row
- func (parser *CSVParser) Pos() (pos int64, lastRowID int64)
- func (parser *CSVParser) ReadColumns() error
- func (parser *CSVParser) ReadRow() error
- func (parser *CSVParser) ReadUntilTerminator() ([]byte, int64, error)
- func (parser *CSVParser) RecycleRow(row Row)
- func (parser *CSVParser) ScannedPos() (int64, error)
- func (parser *CSVParser) SetColumns(columns []string)
- func (parser *CSVParser) SetLogger(logger log.Logger)
- func (parser *CSVParser) SetPos(pos int64, rowID int64) error
- func (parser *CSVParser) SetRowID(rowID int64)
- type CharsetConvertor
- type Chunk
- type ChunkParser
- func (parser *ChunkParser) Close() error
- func (parser *ChunkParser) Columns() []string
- func (parser *ChunkParser) LastRow() Row
- func (parser *ChunkParser) Pos() (pos int64, lastRowID int64)
- func (parser *ChunkParser) ReadRow() error
- func (parser *ChunkParser) RecycleRow(row Row)
- func (parser *ChunkParser) ScannedPos() (int64, error)
- func (parser *ChunkParser) SetColumns(columns []string)
- func (parser *ChunkParser) SetLogger(logger log.Logger)
- func (parser *ChunkParser) SetPos(pos int64, rowID int64) error
- func (parser *ChunkParser) SetRowID(rowID int64)
- type Compression
- type DataDivideConfig
- type ExtendColumnData
- type FileHandler
- type FileInfo
- type FileIterator
- type FileRouter
- type LoaderConfig
- type MDDatabaseMeta
- type MDLoader
- type MDLoaderSetupConfig
- type MDLoaderSetupOption
- type MDTableMeta
- type ParquetParser
- func (pp *ParquetParser) Close() error
- func (pp *ParquetParser) Columns() []string
- func (pp *ParquetParser) LastRow() Row
- func (pp *ParquetParser) Pos() (pos int64, rowID int64)
- func (pp *ParquetParser) ReadRow() error
- func (*ParquetParser) RecycleRow(_ Row)
- func (pp *ParquetParser) ScannedPos() (int64, error)
- func (*ParquetParser) SetColumns(_ []string)
- func (pp *ParquetParser) SetLogger(l log.Logger)
- func (pp *ParquetParser) SetPos(pos int64, rowID int64) error
- func (pp *ParquetParser) SetRowID(rowID int64)
- type Parser
- type PooledReader
- type ReadSeekCloser
- type RegexRouter
- type RouteResult
- type Row
- type SchemaImporter
- type SourceFileMeta
- type SourceType
- type StringReader
- type TableRegion
- func MakeSourceFileRegion(ctx context.Context, cfg *DataDivideConfig, fi FileInfo) ([]*TableRegion, []float64, error)
- func MakeTableRegions(ctx context.Context, cfg *DataDivideConfig) ([]*TableRegion, error)
- func SplitLargeCSV(ctx context.Context, cfg *DataDivideConfig, dataFile FileInfo) (regions []*TableRegion, dataFileSizes []float64, err error)
Constants ¶
const ( // TableFileSizeINF for compressed size, for lightning 10TB is a relatively big value and will strongly affect efficiency // It's used to make sure compressed files can be read until EOF. Because we can't get the exact decompressed size of the compressed files. TableFileSizeINF = 10 * 1024 * tableRegionSizeWarningThreshold // CompressSizeFactor is used to adjust compressed data size CompressSizeFactor = 5 )
const ( // SchemaSchema is the source type value for schema file for DB. SchemaSchema = "schema-schema" // TableSchema is the source type value for schema file for table. TableSchema = "table-schema" // ViewSchema is the source type value for schema file for view. ViewSchema = "view-schema" // TypeSQL is the source type value for sql data file. TypeSQL = "sql" // TypeCSV is the source type value for csv data file. TypeCSV = "csv" // TypeParquet is the source type value for parquet data file. TypeParquet = "parquet" // TypeIgnore is the source type value for an ignored data file. TypeIgnore = "ignore" )
Variables ¶
var ( // ErrInsertStatementNotFound is the error that cannot find the insert statement. ErrInsertStatementNotFound = errors.New("insert statement not found") )
var ( // LargestEntryLimit is the max size for reading file to buf LargestEntryLimit int )
Functions ¶
func AllocateEngineIDs ¶
func AllocateEngineIDs( filesRegions []*TableRegion, dataFileSizes []float64, batchSize float64, batchImportRatio float64, engineConcurrency float64, )
AllocateEngineIDs allocates the table engine IDs.
func CalculateBatchSize ¶
CalculateBatchSize calculates batch size according to row order and file size.
func ExportStatement ¶
func ExportStatement(ctx context.Context, store storage.ExternalStorage, sqlFile FileInfo, characterSet string) ([]byte, error)
ExportStatement exports the SQL statement in the schema file.
func IndexAnyByte ¶
IndexAnyByte returns the byte index of the first occurrence in s of any of the bytes in as. It returns -1 if s contains none of the bytes in as.
func OpenParquetReader ¶
func OpenParquetReader( ctx context.Context, store storage.ExternalStorage, path string, size int64, ) (source.ParquetFile, error)
OpenParquetReader opens a parquet file and returns a handle that can at least read the file.
func OpenReader ¶
func OpenReader( ctx context.Context, fileMeta *SourceFileMeta, store storage.ExternalStorage, decompressCfg storage.DecompressConfig, ) (reader storage.ReadSeekCloser, err error)
OpenReader opens a reader for the given file and storage.
func ReadParquetFileRowCountByFile ¶
func ReadParquetFileRowCountByFile( ctx context.Context, store storage.ExternalStorage, fileMeta SourceFileMeta, ) (int64, error)
ReadParquetFileRowCountByFile reads the parquet file row count through fileMeta.
func ReadUntil ¶
ReadUntil reads rows from parser until the parser's position reaches pos.
func SampleFileCompressRatio ¶
func SampleFileCompressRatio(ctx context.Context, fileMeta SourceFileMeta, store storage.ExternalStorage) (float64, error)
SampleFileCompressRatio samples the compress ratio of the compressed file.
func SampleParquetRowSize ¶
func SampleParquetRowSize(ctx context.Context, fileMeta SourceFileMeta, store storage.ExternalStorage) (float64, error)
SampleParquetRowSize samples row size of the parquet file.
func ToStorageCompressType ¶
func ToStorageCompressType(compression Compression) (storage.CompressType, error)
ToStorageCompressType converts Compression to storage.CompressType.
Types ¶
type CSVParser ¶
type CSVParser struct {
// contains filtered or unexported fields
}
CSVParser is basically a copy of encoding/csv, but special-cased for MySQL-like input.
func NewCSVParser ¶
func NewCSVParser( ctx context.Context, cfg *config.CSVConfig, reader ReadSeekCloser, blockBufSize int64, ioWorkers *worker.Pool, shouldParseHeader bool, charsetConvertor *CharsetConvertor, ) (*CSVParser, error)
NewCSVParser creates a CSV parser. The ownership of the reader is transferred to the parser.
func (*CSVParser) LastRow ¶
func (parser *CSVParser) LastRow() Row
LastRow is the copy of the row parsed by the last call to ReadRow().
func (*CSVParser) Pos ¶
Pos returns the current file offset. Attention: for compressed sql/csv files, pos is the position in uncompressed files
func (*CSVParser) ReadColumns ¶
ReadColumns reads the columns of this CSV file.
func (*CSVParser) ReadUntilTerminator ¶
ReadUntilTerminator seeks the file until the terminator token is found, and returns: (1) the content including the terminator, or the content read before an error was encountered; (2) the file offset just past the terminator, or the offset at which the error was encountered; (3) the error, if any. Note that the terminator string pattern may be the content of a field, which means it's inside quotes. The caller should make sure to handle this case.
func (*CSVParser) RecycleRow ¶
func (parser *CSVParser) RecycleRow(row Row)
RecycleRow places the row object back into the allocation pool.
func (*CSVParser) ScannedPos ¶
ScannedPos gets the read position of the current reader. It always returns the position in the underlying file, whether that file is compressed or not.
func (*CSVParser) SetColumns ¶
func (parser *CSVParser) SetColumns(columns []string)
type CharsetConvertor ¶
type CharsetConvertor struct {
// contains filtered or unexported fields
}
CharsetConvertor is used to convert a character set to utf8mb4 encoding. In Lightning, we mainly use it to do the GB18030/GBK -> UTF8MB4 conversion.
func NewCharsetConvertor ¶
func NewCharsetConvertor(dataCharacterSet, dataInvalidCharReplace string) (*CharsetConvertor, error)
NewCharsetConvertor creates a new CharsetConvertor.
func (*CharsetConvertor) Decode ¶
func (cc *CharsetConvertor) Decode(src string) (string, error)
Decode does the charset conversion work from sourceCharacterSet to utf8mb4. It will return a string as the conversion result whose length may be less or greater than the original string `src`. TODO: maybe using generic type later to make Decode/Encode accept both []byte and string.
type Chunk ¶
type Chunk struct { Offset int64 // for parquet file, it's the total row count // see makeParquetFileRegion EndOffset int64 RealOffset int64 // we estimate row-id range of the chunk using file-size divided by some factor(depends on column count) // after estimation, we will rebase them for all chunks of this table in this instance, // then it's rebased again based on all instances of parallel import. // allocatable row-id is in range (PrevRowIDMax, RowIDMax]. // PrevRowIDMax will be increased during local encoding PrevRowIDMax int64 RowIDMax int64 // only assigned when using strict-mode for CSV files and the file contains header Columns []string }
Chunk represents a portion of the data file.
type ChunkParser ¶
type ChunkParser struct {
// contains filtered or unexported fields
}
ChunkParser is a parser of the data files (the file containing only INSERT statements).
func NewChunkParser ¶
func NewChunkParser( ctx context.Context, sqlMode mysql.SQLMode, reader ReadSeekCloser, blockBufSize int64, ioWorkers *worker.Pool, ) *ChunkParser
NewChunkParser creates a new parser which can read chunks out of a file.
func (*ChunkParser) LastRow ¶
func (parser *ChunkParser) LastRow() Row
LastRow is the copy of the row parsed by the last call to ReadRow().
func (*ChunkParser) Pos ¶
Pos returns the current file offset. Attention: for compressed sql/csv files, pos is the position in uncompressed files
func (*ChunkParser) ReadRow ¶
func (parser *ChunkParser) ReadRow() error
ReadRow reads a row from the datafile.
func (*ChunkParser) RecycleRow ¶
func (parser *ChunkParser) RecycleRow(row Row)
RecycleRow places the row object back into the allocation pool.
func (*ChunkParser) ScannedPos ¶
ScannedPos gets the read position of the current reader. It always returns the position in the underlying file, whether that file is compressed or not.
func (*ChunkParser) SetColumns ¶
func (parser *ChunkParser) SetColumns(columns []string)
type Compression ¶
type Compression int
Compression specifies the compression type.
const ( // CompressionNone is the compression type that with no compression. CompressionNone Compression = iota // CompressionGZ is the compression type that uses GZ algorithm. CompressionGZ // CompressionLZ4 is the compression type that uses LZ4 algorithm. CompressionLZ4 // CompressionZStd is the compression type that uses ZStd algorithm. CompressionZStd // CompressionXZ is the compression type that uses XZ algorithm. CompressionXZ // CompressionLZO is the compression type that uses LZO algorithm. CompressionLZO // CompressionSnappy is the compression type that uses Snappy algorithm. CompressionSnappy )
func ParseCompressionOnFileExtension ¶
func ParseCompressionOnFileExtension(filename string) Compression
ParseCompressionOnFileExtension parses the compression type from the file extension.
type DataDivideConfig ¶
type DataDivideConfig struct { ColumnCnt int // limit of engine size, we have a complex algorithm to calculate the best engine size, see AllocateEngineIDs. EngineDataSize int64 // max chunk size(inside this file we named it region which collides with TiKV region) MaxChunkSize int64 // number of concurrent workers to divide data files Concurrency int // number of engine runs concurrently, need this to calculate the best engine size for pipelining local-sort and import. // todo: remove those 2 params, the algorithm seems useless, since we can import concurrently now, the foundation // assumption of the algorithm is broken. EngineConcurrency int // used together with prev param. it is 0.75 nearly all the time, see Mydumper.BatchImportRatio. // this variable is defined as speed-write-to-TiKV / speed-to-do-local-sort BatchImportRatio float64 // used to split large CSV files, to limit concurrency of data read/seek operations // when nil, no limit. IOWorkers *worker.Pool // we need it read row-count for parquet, and to read line terminator to split large CSV files Store storage.ExternalStorage TableMeta *MDTableMeta // only used when split large CSV files. StrictFormat bool DataCharacterSet string DataInvalidCharReplace string ReadBlockSize int64 CSV config.CSVConfig }
DataDivideConfig is the config used to divide data files into chunks/engines (regions in this context).
func NewDataDivideConfig ¶
func NewDataDivideConfig(cfg *config.Config, columns int, ioWorkers *worker.Pool, store storage.ExternalStorage, meta *MDTableMeta, ) *DataDivideConfig
NewDataDivideConfig creates a new DataDivideConfig from lightning cfg.
type ExtendColumnData ¶
ExtendColumnData contains the extended column names and values information for a table.
type FileHandler ¶
FileHandler is the interface to handle a file given its path and size. It is mainly used as a parameter of the `FileIterator`.
type FileInfo ¶
type FileInfo struct { TableName filter.Table FileMeta SourceFileMeta }
FileInfo contains the information for a data file in a table.
type FileIterator ¶
type FileIterator interface {
IterateFiles(ctx context.Context, hdl FileHandler) error
}
FileIterator is the interface to iterate files in a data source. Use this interface to customize the file iteration policy.
type FileRouter ¶
type FileRouter interface { // Route apply rule to path. Return nil if path doesn't match route rule; // return error if path match route rule but the captured value for field is invalid Route(path string) (*RouteResult, error) }
FileRouter provides some operations to apply a rule to route file path to target schema/table
func NewDefaultFileRouter ¶
func NewDefaultFileRouter(logger log.Logger) (FileRouter, error)
NewDefaultFileRouter creates a new file router with the default file route rules.
func NewFileRouter ¶
func NewFileRouter(cfg []*config.FileRouteRule, logger log.Logger) (FileRouter, error)
NewFileRouter creates a new file router with the rule.
type LoaderConfig ¶
type LoaderConfig struct { // SourceID is the unique identifier for the data source, it's used in DM only. // must be used together with Routes. SourceID string // SourceURL is the URL of the data source. SourceURL string // Routes is the routing rules for the tables, exclusive with FileRouters. // it's deprecated in lightning, but still used in DM. // when used this, DefaultFileRules must be true. Routes config.Routes // CharacterSet is the character set of the schema sql files. CharacterSet string // Filter is the filter for the tables, files related to filtered-out tables are not loaded. // must be specified, else all tables are filtered out, see config.GetDefaultFilter. Filter []string FileRouters []*config.FileRouteRule // CaseSensitive indicates whether Routes and Filter are case-sensitive. CaseSensitive bool // DefaultFileRules indicates whether to use the default file routing rules. // If it's true, the default file routing rules will be appended to the FileRouters. // a little confusing, but it's true only when FileRouters is empty. DefaultFileRules bool }
LoaderConfig is the configuration for constructing a MDLoader.
func NewLoaderCfg ¶
func NewLoaderCfg(cfg *config.Config) LoaderConfig
NewLoaderCfg creates loader config from lightning config.
type MDDatabaseMeta ¶
type MDDatabaseMeta struct { Name string SchemaFile FileInfo Tables []*MDTableMeta Views []*MDTableMeta // contains filtered or unexported fields }
MDDatabaseMeta contains some parsed metadata for a database in the source by MyDumper Loader.
func NewMDDatabaseMeta ¶
func NewMDDatabaseMeta(charSet string) *MDDatabaseMeta
NewMDDatabaseMeta creates a Mydumper database meta with the specified character set.
func (*MDDatabaseMeta) GetSchema ¶
func (m *MDDatabaseMeta) GetSchema(ctx context.Context, store storage.ExternalStorage) string
GetSchema gets the schema SQL for a source database.
type MDLoader ¶
type MDLoader struct {
// contains filtered or unexported fields
}
MDLoader is for 'Mydumper File Loader', which loads the files in the data source and generates a set of metadata.
func NewLoader ¶
func NewLoader(ctx context.Context, cfg LoaderConfig, opts ...MDLoaderSetupOption) (*MDLoader, error)
NewLoader constructs a MyDumper loader that scans the data source and constructs a set of metadata.
func NewLoaderWithStore ¶
func NewLoaderWithStore(ctx context.Context, cfg LoaderConfig, store storage.ExternalStorage, opts ...MDLoaderSetupOption) (*MDLoader, error)
NewLoaderWithStore constructs a MyDumper loader with the provided external storage that scans the data source and constructs a set of metadata.
func (*MDLoader) GetDatabases ¶
func (l *MDLoader) GetDatabases() []*MDDatabaseMeta
GetDatabases gets the list of scanned MDDatabaseMeta for the loader.
func (*MDLoader) GetStore ¶
func (l *MDLoader) GetStore() storage.ExternalStorage
GetStore gets the external storage used by the loader.
type MDLoaderSetupConfig ¶
type MDLoaderSetupConfig struct { // MaxScanFiles specifies the maximum number of files to scan. // If the value is <= 0, it means the number of data source files will be scanned as many as possible. MaxScanFiles int // ReturnPartialResultOnError specifies whether the currently scanned files are analyzed, // and return the partial result. ReturnPartialResultOnError bool // FileIter controls the file iteration policy when constructing a MDLoader. FileIter FileIterator }
MDLoaderSetupConfig stores the configs when setting up a MDLoader. This can control the behavior when constructing an MDLoader.
func DefaultMDLoaderSetupConfig ¶
func DefaultMDLoaderSetupConfig() *MDLoaderSetupConfig
DefaultMDLoaderSetupConfig generates a default MDLoaderSetupConfig.
type MDLoaderSetupOption ¶
type MDLoaderSetupOption func(cfg *MDLoaderSetupConfig)
MDLoaderSetupOption is the option type for setting up a MDLoaderSetupConfig.
func ReturnPartialResultOnError ¶
func ReturnPartialResultOnError(supportPartialResult bool) MDLoaderSetupOption
ReturnPartialResultOnError generates an option that controls whether to return the partially scanned result on error when setting up a MDLoader.
func WithFileIterator ¶
func WithFileIterator(fileIter FileIterator) MDLoaderSetupOption
WithFileIterator generates an option that specifies the file iteration policy.
func WithMaxScanFiles ¶
func WithMaxScanFiles(maxScanFiles int) MDLoaderSetupOption
WithMaxScanFiles generates an option that limits the max scan files when setting up a MDLoader.
type MDTableMeta ¶
type MDTableMeta struct { DB string Name string SchemaFile FileInfo DataFiles []FileInfo TotalSize int64 IndexRatio float64 // default to true, and if we do precheck, this var is updated using data sampling result, so it's not accurate. IsRowOrdered bool // contains filtered or unexported fields }
MDTableMeta contains some parsed metadata for a table in the source by MyDumper Loader.
func NewMDTableMeta ¶
func NewMDTableMeta(charSet string) *MDTableMeta
NewMDTableMeta creates a Mydumper table meta with the specified character set.
func (*MDTableMeta) GetSchema ¶
func (m *MDTableMeta) GetSchema(ctx context.Context, store storage.ExternalStorage) (string, error)
GetSchema gets the table-creating SQL for a source table.
type ParquetParser ¶
type ParquetParser struct { Reader *preader.ParquetReader // contains filtered or unexported fields }
ParquetParser parses a parquet file for import. It implements the Parser interface.
func NewParquetParser ¶
func NewParquetParser( ctx context.Context, store storage.ExternalStorage, r storage.ReadSeekCloser, path string, ) (*ParquetParser, error)
NewParquetParser generates a parquet parser.
func (*ParquetParser) Close ¶
func (pp *ParquetParser) Close() error
Close closes the parquet file of the parser. It implements the Parser interface.
func (*ParquetParser) Columns ¶
func (pp *ParquetParser) Columns() []string
Columns returns the _lower-case_ column names corresponding to values in the LastRow.
func (*ParquetParser) LastRow ¶
func (pp *ParquetParser) LastRow() Row
LastRow gets the last row parsed by the parser. It implements the Parser interface.
func (*ParquetParser) Pos ¶
func (pp *ParquetParser) Pos() (pos int64, rowID int64)
Pos returns the current row number of the parquet file.
func (*ParquetParser) ReadRow ¶
func (pp *ParquetParser) ReadRow() error
ReadRow reads a row in the parquet file by the parser. It implements the Parser interface.
func (*ParquetParser) RecycleRow ¶
func (*ParquetParser) RecycleRow(_ Row)
RecycleRow implements the Parser interface.
func (*ParquetParser) ScannedPos ¶
func (pp *ParquetParser) ScannedPos() (int64, error)
ScannedPos implements the Parser interface. For parquet it's parquet file's reader current position.
func (*ParquetParser) SetColumns ¶
func (*ParquetParser) SetColumns(_ []string)
SetColumns sets the restored column names on the parser.
func (*ParquetParser) SetLogger ¶
func (pp *ParquetParser) SetLogger(l log.Logger)
SetLogger sets the logger used in the parser. It implements the Parser interface.
func (*ParquetParser) SetPos ¶
func (pp *ParquetParser) SetPos(pos int64, rowID int64) error
SetPos sets the position in a parquet file. It implements the Parser interface.
func (*ParquetParser) SetRowID ¶
func (pp *ParquetParser) SetRowID(rowID int64)
SetRowID sets the rowID in a parquet file when we start a compressed file. It implements the Parser interface.
type Parser ¶
type Parser interface { // Pos returns the position that the parser has already handled. It's mainly used for checkpoint. // For normal files it's the file offset we handled. // For parquet files it's the row count we handled. // For compressed files it's the uncompressed file offset we handled. // TODO: replace pos with a new structure to specify position offset and rows offset Pos() (pos int64, rowID int64) SetPos(pos int64, rowID int64) error // ScannedPos always returns the current file reader pointer's location ScannedPos() (int64, error) Close() error ReadRow() error LastRow() Row RecycleRow(row Row) // Columns returns the _lower-case_ column names corresponding to values in // the LastRow. Columns() []string // SetColumns sets restored column names to parser SetColumns([]string) SetLogger(log.Logger) SetRowID(rowID int64) }
Parser provides some methods to parse a source data file.
type PooledReader ¶
type PooledReader struct {
// contains filtered or unexported fields
}
PooledReader is a throttled reader wrapper, where Read() calls have an upper limit of concurrency imposed by the given worker pool.
func MakePooledReader ¶
func MakePooledReader(reader ReadSeekCloser, ioWorkers *worker.Pool) PooledReader
MakePooledReader constructs a new PooledReader.
func (PooledReader) Read ¶
func (pr PooledReader) Read(p []byte) (n int, err error)
Read implements io.Reader
type ReadSeekCloser ¶
ReadSeekCloser = Reader + Seeker + Closer
type RegexRouter ¶
type RegexRouter struct {
// contains filtered or unexported fields
}
RegexRouter is a `FileRouter` implementation that applies a specific regex pattern to the file path. If the regex pattern matches, each extractor captures the matched regexp pattern and sets the value of its target field in `RouteResult`.
func (*RegexRouter) Route ¶
func (r *RegexRouter) Route(path string) (*RouteResult, error)
Route routes a file path to a source file type.
type RouteResult ¶
type RouteResult struct { filter.Table Key string Compression Compression Type SourceType }
RouteResult contains the information for a file routing.
type Row ¶
type Row struct { // RowID is the row id of the row. // as objects of this struct is reused, this RowID is increased when reading // next row. RowID int64 Row []types.Datum Length int }
Row is the content of a row.
func (Row) MarshalLogArray ¶
func (row Row) MarshalLogArray(encoder zapcore.ArrayEncoder) error
MarshalLogArray implements the zapcore.ArrayMarshaler interface
type SchemaImporter ¶
type SchemaImporter struct {
// contains filtered or unexported fields
}
SchemaImporter is used to import schema from dump files.
func NewSchemaImporter ¶
func NewSchemaImporter(logger log.Logger, sqlMode mysql.SQLMode, db *sql.DB, store storage.ExternalStorage, concurrency int) *SchemaImporter
NewSchemaImporter creates a new SchemaImporter instance.
func (*SchemaImporter) Run ¶
func (si *SchemaImporter) Run(ctx context.Context, dbMetas []*MDDatabaseMeta) (err error)
Run imports all schemas from the given database metas.
type SourceFileMeta ¶
type SourceFileMeta struct { Path string Type SourceType Compression Compression SortKey string // FileSize is the size of the file in the storage. FileSize int64 // WARNING: variables below are not persistent ExtendData ExtendColumnData // RealSize is same as FileSize if the file is not compressed and not parquet. // If the file is compressed, RealSize is the estimated uncompressed size. // If the file is parquet, RealSize is the estimated data size after convert // to row oriented storage. RealSize int64 Rows int64 // only for parquet }
SourceFileMeta contains some analyzed metadata for a source file by MyDumper Loader.
type SourceType ¶
type SourceType int
SourceType specifies the source file types.
const ( // SourceTypeIgnore means this source file is ignored. SourceTypeIgnore SourceType = iota // SourceTypeSchemaSchema means this source file is a schema file for the DB. SourceTypeSchemaSchema // SourceTypeTableSchema means this source file is a schema file for the table. SourceTypeTableSchema // SourceTypeSQL means this source file is a SQL data file. SourceTypeSQL // SourceTypeCSV means this source file is a CSV data file. SourceTypeCSV // SourceTypeParquet means this source file is a parquet data file. SourceTypeParquet // SourceTypeViewSchema means this source file is a schema file for the view. SourceTypeViewSchema )
func (SourceType) String ¶
func (s SourceType) String() string
type StringReader ¶
StringReader is a wrapper around *strings.Reader with an additional Close() method
func NewStringReader ¶
func NewStringReader(s string) StringReader
NewStringReader constructs a new StringReader
type TableRegion ¶
type TableRegion struct { EngineID int32 DB string Table string FileMeta SourceFileMeta ExtendData ExtendColumnData Chunk Chunk }
TableRegion contains information for a table region during import.
func MakeSourceFileRegion ¶
func MakeSourceFileRegion( ctx context.Context, cfg *DataDivideConfig, fi FileInfo, ) ([]*TableRegion, []float64, error)
MakeSourceFileRegion creates a new source file region.
func MakeTableRegions ¶
func MakeTableRegions( ctx context.Context, cfg *DataDivideConfig, ) ([]*TableRegion, error)
MakeTableRegions creates a new table region. The row-id range of the returned TableRegions increases monotonically.
func SplitLargeCSV ¶
func SplitLargeCSV( ctx context.Context, cfg *DataDivideConfig, dataFile FileInfo, ) (regions []*TableRegion, dataFileSizes []float64, err error)
SplitLargeCSV splits a large csv file into multiple regions; the size of each region is specified by `config.MaxRegionSize`. Note: the file is split coarsely, so the format of the csv file must be strict. For example, a CSV file with a header is invalid, and a complete tuple split across multiple lines is invalid.
func (*TableRegion) Offset ¶
func (reg *TableRegion) Offset() int64
Offset gets the offset in the file of this table region.
func (*TableRegion) RowIDMin ¶
func (reg *TableRegion) RowIDMin() int64
RowIDMin returns the minimum row ID of this table region.
func (*TableRegion) Rows ¶
func (reg *TableRegion) Rows() int64
Rows returns the row counts of this table region.
func (*TableRegion) Size ¶
func (reg *TableRegion) Size() int64
Size gets the size of this table region.