Documentation ¶
Index ¶
- Constants
- Variables
- func AllocateEngineIDs(filesRegions []*TableRegion, dataFileSizes []float64, batchSize float64, ...)
- func ExportStatement(ctx context.Context, store storage.ExternalStorage, sqlFile FileInfo, ...) ([]byte, error)
- func IndexAnyByte(s []byte, as *byteSet) int
- func OpenParquetReader(ctx context.Context, store storage.ExternalStorage, path string, size int64) (source.ParquetFile, error)
- func ReadParquetFileRowCount(ctx context.Context, store storage.ExternalStorage, r storage.ReadSeekCloser, ...) (int64, error)
- func ReadUntil(parser Parser, pos int64) error
- func SampleFileCompressRatio(ctx context.Context, fileMeta SourceFileMeta, store storage.ExternalStorage) (float64, error)
- func ToStorageCompressType(compression Compression) (storage.CompressType, error)
- type CSVParser
- func (parser *CSVParser) Close() error
- func (parser *CSVParser) Columns() []string
- func (parser *CSVParser) LastRow() Row
- func (parser *CSVParser) Pos() (pos int64, lastRowID int64)
- func (parser *CSVParser) ReadColumns() error
- func (parser *CSVParser) ReadRow() error
- func (parser *CSVParser) ReadUntilTerminator() ([]byte, int64, error)
- func (parser *CSVParser) RealPos() (int64, error)
- func (parser *CSVParser) RecycleRow(row Row)
- func (parser *CSVParser) SetColumns(columns []string)
- func (parser *CSVParser) SetLogger(logger log.Logger)
- func (parser *CSVParser) SetPos(pos int64, rowID int64) error
- func (parser *CSVParser) SetRowID(rowID int64)
- type CharsetConvertor
- type Chunk
- type ChunkParser
- func (parser *ChunkParser) Close() error
- func (parser *ChunkParser) Columns() []string
- func (parser *ChunkParser) LastRow() Row
- func (parser *ChunkParser) Pos() (pos int64, lastRowID int64)
- func (parser *ChunkParser) ReadRow() error
- func (parser *ChunkParser) RealPos() (int64, error)
- func (parser *ChunkParser) RecycleRow(row Row)
- func (parser *ChunkParser) SetColumns(columns []string)
- func (parser *ChunkParser) SetLogger(logger log.Logger)
- func (parser *ChunkParser) SetPos(pos int64, rowID int64) error
- func (parser *ChunkParser) SetRowID(rowID int64)
- type Compression
- type ExtendColumnData
- type FileHandler
- type FileInfo
- type FileIterator
- type FileRouter
- type MDDatabaseMeta
- type MDLoader
- type MDLoaderSetupConfig
- type MDLoaderSetupOption
- type MDTableMeta
- type ParquetParser
- func (pp *ParquetParser) Close() error
- func (pp *ParquetParser) Columns() []string
- func (pp *ParquetParser) LastRow() Row
- func (pp *ParquetParser) Pos() (pos int64, rowID int64)
- func (pp *ParquetParser) ReadRow() error
- func (pp *ParquetParser) RealPos() (int64, error)
- func (*ParquetParser) RecycleRow(_ Row)
- func (*ParquetParser) SetColumns(_ []string)
- func (pp *ParquetParser) SetLogger(l log.Logger)
- func (pp *ParquetParser) SetPos(pos int64, rowID int64) error
- func (pp *ParquetParser) SetRowID(rowID int64)
- type Parser
- type PooledReader
- type ReadSeekCloser
- type RegexRouter
- type RouteResult
- type Row
- type SourceFileMeta
- type SourceType
- type StringReader
- type TableRegion
- func MakeSourceFileRegion(ctx context.Context, meta *MDTableMeta, fi FileInfo, columns int, ...) ([]*TableRegion, []float64, error)
- func MakeTableRegions(ctx context.Context, meta *MDTableMeta, columns int, cfg *config.Config, ...) ([]*TableRegion, error)
- func SplitLargeFile(ctx context.Context, meta *MDTableMeta, cfg *config.Config, dataFile FileInfo, ...) (prevRowIDMax int64, regions []*TableRegion, dataFileSizes []float64, err error)
Constants ¶
const (
	// TableFileSizeINF is used as the file size for compressed files, because the exact
	// decompressed size of a compressed file cannot be known in advance. For Lightning,
	// 10TB is a relatively large value that strongly affects efficiency; it makes sure
	// compressed files are read until EOF.
	TableFileSizeINF = 10 * 1024 * tableRegionSizeWarningThreshold
	// CompressSizeFactor is used to adjust compressed data size
	CompressSizeFactor = 5
)
const (
	// SchemaSchema is the source type value for a schema file for a DB.
	SchemaSchema = "schema-schema"
	// TableSchema is the source type value for a schema file for a table.
	TableSchema = "table-schema"
	// ViewSchema is the source type value for a schema file for a view.
	ViewSchema = "view-schema"
	// TypeSQL is the source type value for a SQL data file.
	TypeSQL = "sql"
	// TypeCSV is the source type value for a CSV data file.
	TypeCSV = "csv"
	// TypeParquet is the source type value for a parquet data file.
	TypeParquet = "parquet"
	// TypeIgnore is the source type value for an ignored data file.
	TypeIgnore = "ignore"
)
Variables ¶
var (
	// ErrInsertStatementNotFound is the error returned when the insert statement cannot be found.
	ErrInsertStatementNotFound = errors.New("insert statement not found")
)
var (
	// LargestEntryLimit is the max size for reading a file into the buffer.
	LargestEntryLimit int
)
Functions ¶
func AllocateEngineIDs ¶
func AllocateEngineIDs(
	filesRegions []*TableRegion,
	dataFileSizes []float64,
	batchSize float64,
	batchImportRatio float64,
	tableConcurrency float64,
)
AllocateEngineIDs allocates the table engine IDs.
func ExportStatement ¶
func ExportStatement(ctx context.Context, store storage.ExternalStorage, sqlFile FileInfo, characterSet string) ([]byte, error)
ExportStatement exports the SQL statement in the schema file.
func IndexAnyByte ¶
IndexAnyByte returns the index of the first occurrence in s of any byte in as. It returns -1 if there is no byte in common.
func OpenParquetReader ¶
func OpenParquetReader(
	ctx context.Context,
	store storage.ExternalStorage,
	path string,
	size int64,
) (source.ParquetFile, error)
OpenParquetReader opens a parquet file and returns a handle that can at least read the file.
func ReadParquetFileRowCount ¶
func ReadParquetFileRowCount(
	ctx context.Context,
	store storage.ExternalStorage,
	r storage.ReadSeekCloser,
	path string,
) (int64, error)
ReadParquetFileRowCount reads the row count of a parquet file. It is a specialized function that fetches only the row count, quickly.
func ReadUntil ¶
ReadUntil parses the entire file and splits it into continuous chunks of size >= minSize.
func SampleFileCompressRatio ¶
func SampleFileCompressRatio(ctx context.Context, fileMeta SourceFileMeta, store storage.ExternalStorage) (float64, error)
SampleFileCompressRatio samples the compress ratio of the compressed file.
func ToStorageCompressType ¶
func ToStorageCompressType(compression Compression) (storage.CompressType, error)
ToStorageCompressType converts Compression to storage.CompressType.
Types ¶
type CSVParser ¶
type CSVParser struct {
// contains filtered or unexported fields
}
CSVParser is basically a copy of encoding/csv, but special-cased for MySQL-like input.
func NewCSVParser ¶
func NewCSVParser(
	ctx context.Context,
	cfg *config.CSVConfig,
	reader ReadSeekCloser,
	blockBufSize int64,
	ioWorkers *worker.Pool,
	shouldParseHeader bool,
	charsetConvertor *CharsetConvertor,
) (*CSVParser, error)
NewCSVParser creates a CSV parser.
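A minimal sketch of driving a CSVParser over an in-memory file. The import paths assume the br/pkg/lightning layout and may differ between TiDB versions; the CSVConfig field names used here (Separator, Delimiter, Header), the 64 KiB block buffer size, the explicit ReadColumns call to consume the header, and passing a nil CharsetConvertor when no conversion is needed are assumptions of this sketch, not guarantees of the API.

package main

import (
	"context"
	"fmt"
	"io"

	"github.com/pingcap/errors"
	"github.com/pingcap/tidb/br/pkg/lightning/config"
	"github.com/pingcap/tidb/br/pkg/lightning/mydump"
	"github.com/pingcap/tidb/br/pkg/lightning/worker"
)

func main() {
	ctx := context.Background()
	data := "id,name\n1,alice\n2,bob\n"

	csvCfg := &config.CSVConfig{
		Separator: ",",
		Delimiter: `"`,
		Header:    true, // the first line is a column header
	}
	ioWorkers := worker.NewPool(ctx, 1, "io")

	parser, err := mydump.NewCSVParser(
		ctx, csvCfg, mydump.NewStringReader(data),
		64*1024, // block buffer size; value chosen arbitrarily for this sketch
		ioWorkers,
		true, // the file has a header line
		nil,  // no charset conversion (assumed to be allowed)
	)
	if err != nil {
		panic(err)
	}
	defer parser.Close()

	// Consume the header line so Columns() is populated.
	if err := parser.ReadColumns(); err != nil {
		panic(err)
	}
	fmt.Println("columns:", parser.Columns())

	for {
		if err := parser.ReadRow(); err != nil {
			if errors.Cause(err) == io.EOF { // EOF may arrive wrapped
				break
			}
			panic(err)
		}
		row := parser.LastRow()
		fmt.Println(row.RowID, row.Row)
		parser.RecycleRow(row) // return the row to the allocation pool
	}
}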
func (*CSVParser) LastRow ¶
func (parser *CSVParser) LastRow() Row
LastRow is the copy of the row parsed by the last call to ReadRow().
func (*CSVParser) Pos ¶
Pos returns the current file offset. Note: for compressed SQL/CSV files, pos is the position in the uncompressed file.
func (*CSVParser) ReadColumns ¶
ReadColumns reads the columns of this CSV file.
func (*CSVParser) ReadUntilTerminator ¶
ReadUntilTerminator seeks the file until the terminator token is found, and returns:
- the content before the terminator
- the file offset beyond the terminator
- error
Note that the terminator string pattern may appear inside a quoted field; the caller should make sure to handle this case.
func (*CSVParser) RecycleRow ¶
func (parser *CSVParser) RecycleRow(row Row)
RecycleRow places the row object back into the allocation pool.
func (*CSVParser) SetColumns ¶
func (parser *CSVParser) SetColumns(columns []string)
type CharsetConvertor ¶
type CharsetConvertor struct {
// contains filtered or unexported fields
}
CharsetConvertor is used to convert a character set to utf8mb4 encoding. In Lightning, we mainly use it to do the GB18030/GBK -> UTF8MB4 conversion.
func NewCharsetConvertor ¶
func NewCharsetConvertor(dataCharacterSet, dataInvalidCharReplace string) (*CharsetConvertor, error)
NewCharsetConvertor creates a new CharsetConvertor.
func (*CharsetConvertor) Decode ¶
func (cc *CharsetConvertor) Decode(src string) (string, error)
Decode does the charset conversion work from the source character set to utf8mb4. It returns a string as the conversion result, whose length may be less than or greater than that of the original string `src`. TODO: maybe use a generic type later so Decode/Encode can accept both []byte and string.
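A small hedged example of the conversion, assuming "gb18030" is an accepted value for dataCharacterSet (the package documents GB18030/GBK support) and using "\uFFFD" as an arbitrary replacement string for undecodable bytes:

package main

import (
	"fmt"

	"github.com/pingcap/tidb/br/pkg/lightning/mydump"
)

func main() {
	// "gb18030" as the source charset and "\uFFFD" as the replacement for
	// undecodable bytes are choices made for this sketch.
	cc, err := mydump.NewCharsetConvertor("gb18030", "\uFFFD")
	if err != nil {
		panic(err)
	}
	out, err := cc.Decode("\xd6\xd0\xce\xc4") // "中文" encoded in GB18030
	fmt.Println(out, err)                     // 中文 <nil>
}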
type Chunk ¶
type Chunk struct {
	Offset       int64
	EndOffset    int64
	RealOffset   int64
	PrevRowIDMax int64
	RowIDMax     int64
	Columns      []string
}
Chunk represents a portion of the data file.
type ChunkParser ¶
type ChunkParser struct {
// contains filtered or unexported fields
}
ChunkParser is a parser of the data files (the file containing only INSERT statements).
func NewChunkParser ¶
func NewChunkParser(
	ctx context.Context,
	sqlMode mysql.SQLMode,
	reader ReadSeekCloser,
	blockBufSize int64,
	ioWorkers *worker.Pool,
) *ChunkParser
NewChunkParser creates a new parser which can read chunks out of a file.
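A sketch of parsing an in-memory SQL dump with a ChunkParser; each VALUES tuple surfaces as one Row. The mysql.SQLMode import path, the buffer size, and the use of errors.Cause to detect io.EOF are assumptions that may need adjusting per TiDB version.

package main

import (
	"context"
	"fmt"
	"io"

	"github.com/pingcap/errors"
	"github.com/pingcap/tidb/br/pkg/lightning/mydump"
	"github.com/pingcap/tidb/br/pkg/lightning/worker"
	"github.com/pingcap/tidb/pkg/parser/mysql"
)

func main() {
	ctx := context.Background()
	sqlDump := "INSERT INTO `t` VALUES (1, 'a'), (2, 'b');"

	parser := mydump.NewChunkParser(
		ctx,
		mysql.ModeNone,
		mydump.NewStringReader(sqlDump),
		64*1024, // block buffer size; arbitrary for this sketch
		worker.NewPool(ctx, 1, "io"),
	)
	defer parser.Close()

	for {
		if err := parser.ReadRow(); err != nil {
			if errors.Cause(err) == io.EOF {
				break
			}
			panic(err)
		}
		row := parser.LastRow()
		fmt.Println(row.RowID, row.Row) // each VALUES tuple becomes one row
		parser.RecycleRow(row)
	}
}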
func (*ChunkParser) LastRow ¶
func (parser *ChunkParser) LastRow() Row
LastRow is the copy of the row parsed by the last call to ReadRow().
func (*ChunkParser) Pos ¶
Pos returns the current file offset. Note: for compressed SQL/CSV files, pos is the position in the uncompressed file.
func (*ChunkParser) ReadRow ¶
func (parser *ChunkParser) ReadRow() error
ReadRow reads a row from the datafile.
func (*ChunkParser) RecycleRow ¶
func (parser *ChunkParser) RecycleRow(row Row)
RecycleRow places the row object back into the allocation pool.
func (*ChunkParser) SetColumns ¶
func (parser *ChunkParser) SetColumns(columns []string)
type Compression ¶
type Compression int
Compression specifies the compression type.
const (
	// CompressionNone means no compression.
	CompressionNone Compression = iota
	// CompressionGZ is the compression type that uses the GZ algorithm.
	CompressionGZ
	// CompressionLZ4 is the compression type that uses the LZ4 algorithm.
	CompressionLZ4
	// CompressionZStd is the compression type that uses the ZStd algorithm.
	CompressionZStd
	// CompressionXZ is the compression type that uses the XZ algorithm.
	CompressionXZ
	// CompressionLZO is the compression type that uses the LZO algorithm.
	CompressionLZO
	// CompressionSnappy is the compression type that uses the Snappy algorithm.
	CompressionSnappy
)
type ExtendColumnData ¶
ExtendColumnData contains the extended column names and values information for a table.
type FileHandler ¶
FileHandler is the interface for handling a file given its path and size. It is mainly used as a parameter of `FileIterator`.
type FileInfo ¶
type FileInfo struct {
	TableName filter.Table
	FileMeta  SourceFileMeta
}
FileInfo contains the information for a data file in a table.
type FileIterator ¶
type FileIterator interface {
IterateFiles(ctx context.Context, hdl FileHandler) error
}
FileIterator is the interface to iterate files in a data source. Use this interface to customize the file iteration policy.
type FileRouter ¶
type FileRouter interface {
	// Route applies the rule to path. It returns nil if path doesn't match any route rule,
	// and returns an error if path matches a route rule but the captured value for a field is invalid.
	Route(path string) (*RouteResult, error)
}
FileRouter provides operations to apply rules that route a file path to a target schema/table.
func NewDefaultFileRouter ¶
func NewDefaultFileRouter(logger log.Logger) (FileRouter, error)
NewDefaultFileRouter creates a new file router with the default file route rules.
func NewFileRouter ¶
func NewFileRouter(cfg []*config.FileRouteRule, logger log.Logger) (FileRouter, error)
NewFileRouter creates a new file router with the given rules.
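A sketch of routing a mydumper-style file name with the default router; the sample file name is hypothetical, and the Schema/Name fields come from the embedded filter.Table in RouteResult. Import paths assume the br/pkg/lightning layout.

package main

import (
	"fmt"

	"github.com/pingcap/tidb/br/pkg/lightning/log"
	"github.com/pingcap/tidb/br/pkg/lightning/mydump"
)

func main() {
	// The default router understands mydumper-style file names.
	router, err := mydump.NewDefaultFileRouter(log.L())
	if err != nil {
		panic(err)
	}

	// Hypothetical data file name in mydumper layout: <schema>.<table>.<index>.sql
	res, err := router.Route("mydb.mytable.000000001.sql")
	if err != nil {
		panic(err)
	}
	if res == nil {
		fmt.Println("path does not match any route rule")
		return
	}
	fmt.Println(res.Schema, res.Name, res.Type) // mydb mytable, plus the detected source type
}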
type MDDatabaseMeta ¶
type MDDatabaseMeta struct {
	Name       string
	SchemaFile FileInfo
	Tables     []*MDTableMeta
	Views      []*MDTableMeta
	// contains filtered or unexported fields
}
MDDatabaseMeta contains some parsed metadata for a database in the source by MyDumper Loader.
func NewMDDatabaseMeta ¶
func NewMDDatabaseMeta(charSet string) *MDDatabaseMeta
NewMDDatabaseMeta creates a Mydumper database meta with the specified character set.
func (*MDDatabaseMeta) GetSchema ¶
func (m *MDDatabaseMeta) GetSchema(ctx context.Context, store storage.ExternalStorage) string
GetSchema gets the schema SQL for a source database.
type MDLoader ¶
type MDLoader struct {
// contains filtered or unexported fields
}
MDLoader is for 'Mydumper File Loader', which loads the files in the data source and generates a set of metadata.
func NewMyDumpLoader ¶
func NewMyDumpLoader(ctx context.Context, cfg *config.Config, opts ...MDLoaderSetupOption) (*MDLoader, error)
NewMyDumpLoader constructs a MyDumper loader that scans the data source and constructs a set of metadata.
func NewMyDumpLoaderWithStore ¶
func NewMyDumpLoaderWithStore(ctx context.Context, cfg *config.Config, store storage.ExternalStorage, opts ...MDLoaderSetupOption) (*MDLoader, error)
NewMyDumpLoaderWithStore constructs a MyDumper loader with the provided external storage that scans the data source and constructs a set of metadata.
func (*MDLoader) GetDatabases ¶
func (l *MDLoader) GetDatabases() []*MDDatabaseMeta
GetDatabases gets the list of scanned MDDatabaseMeta for the loader.
func (*MDLoader) GetStore ¶
func (l *MDLoader) GetStore() storage.ExternalStorage
GetStore gets the external storage used by the loader.
type MDLoaderSetupConfig ¶
type MDLoaderSetupConfig struct {
	// MaxScanFiles specifies the maximum number of files to scan.
	// If the value is <= 0, all data source files will be scanned.
	MaxScanFiles int
	// ReturnPartialResultOnError specifies whether, on error, the already scanned files
	// are still analyzed and the partial result is returned.
	ReturnPartialResultOnError bool
	// FileIter controls the file iteration policy when constructing an MDLoader.
	FileIter FileIterator
}
MDLoaderSetupConfig stores the configs for setting up an MDLoader and controls its behavior during construction.
func DefaultMDLoaderSetupConfig ¶
func DefaultMDLoaderSetupConfig() *MDLoaderSetupConfig
DefaultMDLoaderSetupConfig generates a default MDLoaderSetupConfig.
type MDLoaderSetupOption ¶
type MDLoaderSetupOption func(cfg *MDLoaderSetupConfig)
MDLoaderSetupOption is the option type for setting up a MDLoaderSetupConfig.
func ReturnPartialResultOnError ¶
func ReturnPartialResultOnError(supportPartialResult bool) MDLoaderSetupOption
ReturnPartialResultOnError generates an option that controls whether to return the partially scanned result on error when setting up an MDLoader.
func WithFileIterator ¶
func WithFileIterator(fileIter FileIterator) MDLoaderSetupOption
WithFileIterator generates an option that specifies the file iteration policy.
func WithMaxScanFiles ¶
func WithMaxScanFiles(maxScanFiles int) MDLoaderSetupOption
WithMaxScanFiles generates an option that limits the maximum number of files to scan when setting up an MDLoader.
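Putting the options together, a hedged sketch of constructing an MDLoader and listing what it found. config.NewConfig, the cfg.Mydumper.SourceDir field, the dump directory path, and the option values are illustrative assumptions.

package main

import (
	"context"
	"fmt"

	"github.com/pingcap/tidb/br/pkg/lightning/config"
	"github.com/pingcap/tidb/br/pkg/lightning/mydump"
)

func main() {
	ctx := context.Background()

	cfg := config.NewConfig()
	cfg.Mydumper.SourceDir = "/path/to/dump" // hypothetical dump directory

	loader, err := mydump.NewMyDumpLoader(
		ctx, cfg,
		mydump.WithMaxScanFiles(10000),          // stop after scanning 10000 files
		mydump.ReturnPartialResultOnError(true), // keep whatever was scanned if an error occurs
	)
	if err != nil {
		panic(err)
	}

	for _, db := range loader.GetDatabases() {
		for _, tbl := range db.Tables {
			fmt.Printf("%s.%s: %d data files, %d bytes\n",
				db.Name, tbl.Name, len(tbl.DataFiles), tbl.TotalSize)
		}
	}
}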
type MDTableMeta ¶
type MDTableMeta struct {
	DB           string
	Name         string
	SchemaFile   FileInfo
	DataFiles    []FileInfo
	TotalSize    int64
	IndexRatio   float64
	IsRowOrdered bool
	// contains filtered or unexported fields
}
MDTableMeta contains some parsed metadata for a table in the source by MyDumper Loader.
func NewMDTableMeta ¶
func NewMDTableMeta(charSet string) *MDTableMeta
NewMDTableMeta creates a Mydumper table meta with the specified character set.
func (*MDTableMeta) GetSchema ¶
func (m *MDTableMeta) GetSchema(ctx context.Context, store storage.ExternalStorage) (string, error)
GetSchema gets the table-creating SQL for a source table.
type ParquetParser ¶
type ParquetParser struct {
	Reader *preader.ParquetReader
	// contains filtered or unexported fields
}
ParquetParser parses a parquet file for import. It implements the Parser interface.
func NewParquetParser ¶
func NewParquetParser(
	ctx context.Context,
	store storage.ExternalStorage,
	r storage.ReadSeekCloser,
	path string,
) (*ParquetParser, error)
NewParquetParser generates a parquet parser.
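A sketch of reading a parquet data file through external storage. storage.NewLocalStorage and the two-argument store.Open shown here are assumptions that may differ across br/pkg/storage versions, and the directory and file name are hypothetical.

package main

import (
	"context"
	"fmt"
	"io"

	"github.com/pingcap/errors"
	"github.com/pingcap/tidb/br/pkg/lightning/mydump"
	"github.com/pingcap/tidb/br/pkg/storage"
)

func main() {
	ctx := context.Background()

	// A local directory treated as external storage; S3/GCS stores are used the same way.
	store, err := storage.NewLocalStorage("/path/to/dump") // hypothetical directory
	if err != nil {
		panic(err)
	}

	const fileName = "mydb.mytable.000000001.parquet" // hypothetical file
	r, err := store.Open(ctx, fileName)               // two-argument Open; newer versions add an options parameter
	if err != nil {
		panic(err)
	}

	parser, err := mydump.NewParquetParser(ctx, store, r, fileName)
	if err != nil {
		panic(err)
	}
	defer parser.Close()

	fmt.Println("columns:", parser.Columns())
	for {
		if err := parser.ReadRow(); err != nil {
			if errors.Cause(err) == io.EOF {
				break
			}
			panic(err)
		}
		fmt.Println(parser.LastRow().Row)
	}
}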
func (*ParquetParser) Close ¶
func (pp *ParquetParser) Close() error
Close closes the parquet file of the parser. It implements the Parser interface.
func (*ParquetParser) Columns ¶
func (pp *ParquetParser) Columns() []string
Columns returns the _lower-case_ column names corresponding to values in the LastRow.
func (*ParquetParser) LastRow ¶
func (pp *ParquetParser) LastRow() Row
LastRow gets the last row parsed by the parser. It implements the Parser interface.
func (*ParquetParser) Pos ¶
func (pp *ParquetParser) Pos() (pos int64, rowID int64)
Pos returns the current row number of the parquet file.
func (*ParquetParser) ReadRow ¶
func (pp *ParquetParser) ReadRow() error
ReadRow reads a row in the parquet file by the parser. It implements the Parser interface.
func (*ParquetParser) RealPos ¶
func (pp *ParquetParser) RealPos() (int64, error)
RealPos implements the Parser interface. For parquet it's equal to Pos().
func (*ParquetParser) RecycleRow ¶
func (*ParquetParser) RecycleRow(_ Row)
RecycleRow implements the Parser interface.
func (*ParquetParser) SetColumns ¶
func (*ParquetParser) SetColumns(_ []string)
SetColumns sets the restored column names for the parser.
func (*ParquetParser) SetLogger ¶
func (pp *ParquetParser) SetLogger(l log.Logger)
SetLogger sets the logger used in the parser. It implements the Parser interface.
func (*ParquetParser) SetPos ¶
func (pp *ParquetParser) SetPos(pos int64, rowID int64) error
SetPos sets the position in a parquet file. It implements the Parser interface.
func (*ParquetParser) SetRowID ¶
func (pp *ParquetParser) SetRowID(rowID int64)
SetRowID sets the rowID in a parquet file when we start a compressed file. It implements the Parser interface.
type Parser ¶
type Parser interface {
	Pos() (pos int64, rowID int64)
	SetPos(pos int64, rowID int64) error
	RealPos() (int64, error)
	Close() error
	ReadRow() error
	LastRow() Row
	RecycleRow(row Row)

	// Columns returns the _lower-case_ column names corresponding to values in
	// the LastRow.
	Columns() []string
	// SetColumns sets the restored column names for the parser.
	SetColumns([]string)

	SetLogger(log.Logger)
	SetRowID(rowID int64)
}
Parser provides some methods to parse a source data file.
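Because CSVParser, ChunkParser, and ParquetParser all implement this interface, checkpoint-style logic can be written once against it. A hedged sketch follows; the package name and helper functions are invented for illustration.

// Package loaderutil is a hypothetical helper package for this sketch.
package loaderutil

import (
	"github.com/pingcap/tidb/br/pkg/lightning/mydump"
)

// readSome reads up to n rows from any Parser implementation and returns the
// position reached, so a later run can resume from it.
func readSome(parser mydump.Parser, n int) (pos int64, rowID int64, err error) {
	for i := 0; i < n; i++ {
		if err := parser.ReadRow(); err != nil {
			return 0, 0, err // io.EOF (possibly wrapped) signals the end of the file
		}
		row := parser.LastRow()
		// ... hand row off to an encoder/writer here ...
		parser.RecycleRow(row)
	}
	pos, rowID = parser.Pos()
	return pos, rowID, nil
}

// resumeAt repositions a freshly created parser at a previously recorded checkpoint.
func resumeAt(parser mydump.Parser, pos, rowID int64) error {
	return parser.SetPos(pos, rowID)
}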
type PooledReader ¶
type PooledReader struct {
// contains filtered or unexported fields
}
PooledReader is a throttled reader wrapper, where Read() calls have an upper limit of concurrency imposed by the given worker pool.
func MakePooledReader ¶
func MakePooledReader(reader ReadSeekCloser, ioWorkers *worker.Pool) PooledReader
MakePooledReader constructs a new PooledReader.
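A small sketch of the call shape: readers wrapped with the same pool share its concurrency limit. The pool size and name are arbitrary, and a StringReader stands in for a real data source.

package main

import (
	"context"
	"fmt"
	"io"

	"github.com/pingcap/tidb/br/pkg/lightning/mydump"
	"github.com/pingcap/tidb/br/pkg/lightning/worker"
)

func main() {
	ctx := context.Background()

	// At most 2 concurrent Read() calls across all readers sharing this pool.
	ioWorkers := worker.NewPool(ctx, 2, "io")

	pr := mydump.MakePooledReader(mydump.NewStringReader("hello, world"), ioWorkers)
	data, err := io.ReadAll(pr)
	fmt.Println(string(data), err)
}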
func (PooledReader) Read ¶
func (pr PooledReader) Read(p []byte) (n int, err error)
Read implements io.Reader
type ReadSeekCloser ¶
ReadSeekCloser = Reader + Seeker + Closer
type RegexRouter ¶
type RegexRouter struct {
// contains filtered or unexported fields
}
RegexRouter is a `FileRouter` implementation that applies a specific regex pattern to the file path. If the pattern matches, each extractor captures its part of the match and sets the value on the target field in `RouteResult`.
func (*RegexRouter) Route ¶
func (r *RegexRouter) Route(path string) (*RouteResult, error)
Route routes a file path to a source file type.
type RouteResult ¶
type RouteResult struct {
	filter.Table
	Key         string
	Compression Compression
	Type        SourceType
}
RouteResult contains the information for a file routing.
type Row ¶
Row is the content of a row.
func (Row) MarshalLogArray ¶
func (row Row) MarshalLogArray(encoder zapcore.ArrayEncoder) error
MarshalLogArray implements the zapcore.ArrayMarshaler interface
type SourceFileMeta ¶
type SourceFileMeta struct {
	Path        string
	Type        SourceType
	Compression Compression
	SortKey     string
	FileSize    int64
	// WARNING: variables below are not persistent
	ExtendData ExtendColumnData
	RealSize   int64
}
SourceFileMeta contains some analyzed metadata for a source file by MyDumper Loader.
type SourceType ¶
type SourceType int
SourceType specifies the source file types.
const (
	// SourceTypeIgnore means this source file is ignored.
	SourceTypeIgnore SourceType = iota
	// SourceTypeSchemaSchema means this source file is a schema file for the DB.
	SourceTypeSchemaSchema
	// SourceTypeTableSchema means this source file is a schema file for the table.
	SourceTypeTableSchema
	// SourceTypeSQL means this source file is a SQL data file.
	SourceTypeSQL
	// SourceTypeCSV means this source file is a CSV data file.
	SourceTypeCSV
	// SourceTypeParquet means this source file is a parquet data file.
	SourceTypeParquet
	// SourceTypeViewSchema means this source file is a schema file for the view.
	SourceTypeViewSchema
)
func (SourceType) String ¶
func (s SourceType) String() string
type StringReader ¶
StringReader is a wrapper around *strings.Reader with an additional Close() method
func NewStringReader ¶
func NewStringReader(s string) StringReader
NewStringReader constructs a new StringReader
type TableRegion ¶
type TableRegion struct {
	EngineID   int32
	DB         string
	Table      string
	FileMeta   SourceFileMeta
	ExtendData ExtendColumnData
	Chunk      Chunk
}
TableRegion contains information for a table region during import.
func MakeSourceFileRegion ¶
func MakeSourceFileRegion(
	ctx context.Context,
	meta *MDTableMeta,
	fi FileInfo,
	columns int,
	cfg *config.Config,
	ioWorkers *worker.Pool,
	store storage.ExternalStorage,
) ([]*TableRegion, []float64, error)
MakeSourceFileRegion creates a new source file region.
func MakeTableRegions ¶
func MakeTableRegions(
	ctx context.Context,
	meta *MDTableMeta,
	columns int,
	cfg *config.Config,
	ioWorkers *worker.Pool,
	store storage.ExternalStorage,
) ([]*TableRegion, error)
MakeTableRegions creates the regions of a table.
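A hedged end-to-end sketch combining the loader with region creation. The dump directory, the worker-pool size, and the fixed column count are illustrative assumptions; a real caller derives the column count from the table schema.

package main

import (
	"context"
	"fmt"

	"github.com/pingcap/tidb/br/pkg/lightning/config"
	"github.com/pingcap/tidb/br/pkg/lightning/mydump"
	"github.com/pingcap/tidb/br/pkg/lightning/worker"
)

func main() {
	ctx := context.Background()

	cfg := config.NewConfig()
	cfg.Mydumper.SourceDir = "/path/to/dump" // hypothetical dump directory

	loader, err := mydump.NewMyDumpLoader(ctx, cfg)
	if err != nil {
		panic(err)
	}

	ioWorkers := worker.NewPool(ctx, 4, "io") // pool size chosen arbitrarily
	const tableColumns = 3                    // hypothetical; derive from the table schema in real code

	for _, db := range loader.GetDatabases() {
		for _, tbl := range db.Tables {
			regions, err := mydump.MakeTableRegions(ctx, tbl, tableColumns, cfg, ioWorkers, loader.GetStore())
			if err != nil {
				panic(err)
			}
			fmt.Printf("%s.%s -> %d regions\n", db.Name, tbl.Name, len(regions))
		}
	}
}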
func SplitLargeFile ¶
func SplitLargeFile(
	ctx context.Context,
	meta *MDTableMeta,
	cfg *config.Config,
	dataFile FileInfo,
	divisor int64,
	prevRowIdxMax int64,
	ioWorker *worker.Pool,
	store storage.ExternalStorage,
) (prevRowIDMax int64, regions []*TableRegion, dataFileSizes []float64, err error)
SplitLargeFile splits a large CSV file into multiple regions; the size of each region is specified by `config.MaxRegionSize`. Note: the file is split coarsely, so the CSV format is required to be strict. For example:
- a CSV file with a header is invalid
- a complete tuple split across multiple lines is invalid
func (*TableRegion) Offset ¶
func (reg *TableRegion) Offset() int64
Offset gets the offset in the file of this table region.
func (*TableRegion) RowIDMin ¶
func (reg *TableRegion) RowIDMin() int64
RowIDMin returns the minimum row ID of this table region.
func (*TableRegion) Rows ¶
func (reg *TableRegion) Rows() int64
Rows returns the row counts of this table region.
func (*TableRegion) Size ¶
func (reg *TableRegion) Size() int64
Size gets the size of this table region.