Documentation ¶
Index ¶
- Constants
- Variables
- func AllocateEngineIDs(filesRegions []*TableRegion, dataFileSizes []float64, batchSize float64, ...)
- func ExportStatement(ctx context.Context, store storage.ExternalStorage, sqlFile FileInfo, ...) ([]byte, error)
- func IndexAnyByte(s []byte, as *byteSet) int
- func OpenParquetReader(ctx context.Context, store storage.ExternalStorage, path string, size int64) (source.ParquetFile, error)
- func ReadParquetFileRowCount(ctx context.Context, store storage.ExternalStorage, r storage.ReadSeekCloser, ...) (int64, error)
- type CSVParser
- func (parser *CSVParser) Close() error
- func (parser *CSVParser) Columns() []string
- func (parser *CSVParser) LastRow() Row
- func (parser *CSVParser) Pos() (int64, int64)
- func (parser *CSVParser) ReadColumns() error
- func (parser *CSVParser) ReadRow() error
- func (parser *CSVParser) ReadUntilTerminator() (int64, error)
- func (parser *CSVParser) RecycleRow(row Row)
- func (parser *CSVParser) SetColumns(columns []string)
- func (parser *CSVParser) SetLogger(logger log.Logger)
- func (parser *CSVParser) SetPos(pos int64, rowID int64) error
- type CharsetConvertor
- type Chunk
- type ChunkParser
- func (parser *ChunkParser) Close() error
- func (parser *ChunkParser) Columns() []string
- func (parser *ChunkParser) LastRow() Row
- func (parser *ChunkParser) Pos() (int64, int64)
- func (parser *ChunkParser) ReadRow() error
- func (parser *ChunkParser) RecycleRow(row Row)
- func (parser *ChunkParser) SetColumns(columns []string)
- func (parser *ChunkParser) SetLogger(logger log.Logger)
- func (parser *ChunkParser) SetPos(pos int64, rowID int64) error
- type Compression
- type FileInfo
- type FileRouter
- type MDDatabaseMeta
- type MDLoader
- type MDTableMeta
- type ParquetParser
- func (pp *ParquetParser) Close() error
- func (pp *ParquetParser) Columns() []string
- func (pp *ParquetParser) LastRow() Row
- func (pp *ParquetParser) Pos() (pos int64, rowID int64)
- func (pp *ParquetParser) ReadRow() error
- func (pp *ParquetParser) RecycleRow(row Row)
- func (pp *ParquetParser) SetColumns(cols []string)
- func (pp *ParquetParser) SetLogger(l log.Logger)
- func (pp *ParquetParser) SetPos(pos int64, rowID int64) error
- type Parser
- type PooledReader
- type ReadSeekCloser
- type RegexRouter
- type RouteResult
- type Row
- type SourceFileMeta
- type SourceType
- type StringReader
- type TableRegion
- func MakeTableRegions(ctx context.Context, meta *MDTableMeta, columns int, cfg *config.Config, ...) ([]*TableRegion, error)
- func SplitLargeFile(ctx context.Context, meta *MDTableMeta, cfg *config.Config, dataFile FileInfo, ...) (prevRowIDMax int64, regions []*TableRegion, dataFileSizes []float64, err error)
Constants ¶
const ( SchemaSchema = "schema-schema" TableSchema = "table-schema" ViewSchema = "view-schema" TypeSQL = "sql" TypeCSV = "csv" TypeParquet = "parquet" TypeIgnore = "ignore" )
Variables ¶
var (
ErrInsertStatementNotFound = errors.New("insert statement not found")
)
Functions ¶
func AllocateEngineIDs ¶
func AllocateEngineIDs( filesRegions []*TableRegion, dataFileSizes []float64, batchSize float64, batchImportRatio float64, tableConcurrency float64, )
func ExportStatement ¶
func IndexAnyByte ¶
IndexAnyByte returns the byte index of the first occurrence in s of any of the bytes in the set as. It returns -1 if no byte of s is in the set.
func OpenParquetReader ¶
func OpenParquetReader( ctx context.Context, store storage.ExternalStorage, path string, size int64, ) (source.ParquetFile, error)
func ReadParquetFileRowCount ¶
func ReadParquetFileRowCount( ctx context.Context, store storage.ExternalStorage, r storage.ReadSeekCloser, path string, ) (int64, error)
ReadParquetFileRowCount is a special function that quickly fetches the row count of a parquet file.
Types ¶
type CSVParser ¶
type CSVParser struct {
// contains filtered or unexported fields
}
CSVParser is basically a copy of encoding/csv, but special-cased for MySQL-like input.
func NewCSVParser ¶
func NewCSVParser( cfg *config.CSVConfig, reader ReadSeekCloser, blockBufSize int64, ioWorkers *worker.Pool, shouldParseHeader bool, charsetConvertor *CharsetConvertor, ) (*CSVParser, error)
func (*CSVParser) LastRow ¶
func (parser *CSVParser) LastRow() Row
LastRow is the copy of the row parsed by the last call to ReadRow().
func (*CSVParser) ReadColumns ¶
func (*CSVParser) ReadUntilTerminator ¶
ReadUntilTerminator seeks through the file until the terminator token is found, and returns the file offset just past the terminator. This function is used when dividing a CSV file in strict-format mode.
func (*CSVParser) RecycleRow ¶
func (parser *CSVParser) RecycleRow(row Row)
RecycleRow places the row object back into the allocation pool.
func (*CSVParser) SetColumns ¶
func (parser *CSVParser) SetColumns(columns []string)
type CharsetConvertor ¶
type CharsetConvertor struct {
// contains filtered or unexported fields
}
CharsetConvertor is used to convert data from a given character set to utf8mb4 encoding. In Lightning, it is mainly used for GB18030/GBK -> UTF8MB4 conversion.
func NewCharsetConvertor ¶
func NewCharsetConvertor(dataCharacterSet, dataInvalidCharReplace string) (*CharsetConvertor, error)
NewCharsetConvertor creates a new CharsetConvertor.
func (*CharsetConvertor) Decode ¶
func (cc *CharsetConvertor) Decode(src string) (string, error)
Decode converts src from sourceCharacterSet to utf8mb4 and returns the converted string, which may be shorter or longer than the original string `src`. TODO: maybe use generics later so that Decode/Encode accept both []byte and string.
type Chunk ¶
type Chunk struct { Offset int64 EndOffset int64 PrevRowIDMax int64 RowIDMax int64 Columns []string }
Chunk represents a portion of the data file.
type ChunkParser ¶
type ChunkParser struct {
// contains filtered or unexported fields
}
ChunkParser is a parser of data files (files containing only INSERT statements).
func NewChunkParser ¶
func NewChunkParser( sqlMode mysql.SQLMode, reader ReadSeekCloser, blockBufSize int64, ioWorkers *worker.Pool, ) *ChunkParser
NewChunkParser creates a new parser which can read chunks out of a file.
func (*ChunkParser) LastRow ¶
func (parser *ChunkParser) LastRow() Row
LastRow is the copy of the row parsed by the last call to ReadRow().
func (*ChunkParser) ReadRow ¶
func (parser *ChunkParser) ReadRow() error
ReadRow reads a row from the datafile.
func (*ChunkParser) RecycleRow ¶
func (parser *ChunkParser) RecycleRow(row Row)
RecycleRow places the row object back into the allocation pool.
func (*ChunkParser) SetColumns ¶
func (parser *ChunkParser) SetColumns(columns []string)
type Compression ¶
type Compression int
const ( CompressionNone Compression = iota CompressionGZ CompressionLZ4 CompressionZStd CompressionXZ )
type FileInfo ¶
type FileInfo struct { TableName filter.Table FileMeta SourceFileMeta }
type FileRouter ¶
type FileRouter interface {
	// Route applies the routing rule to path. It returns nil if path doesn't
	// match the rule, and an error if path matches the rule but the captured
	// value for a field is invalid.
	Route(path string) (*RouteResult, error)
}
// RouteRule is a rule to route a file path to a target schema/table.
func NewFileRouter ¶
func NewFileRouter(cfg []*config.FileRouteRule) (FileRouter, error)
type MDDatabaseMeta ¶
type MDDatabaseMeta struct {
	Name       string
	SchemaFile string
	Tables     []*MDTableMeta
	Views      []*MDTableMeta
	// contains filtered or unexported fields
}
type MDLoader ¶
type MDLoader struct {
// contains filtered or unexported fields
}
MDLoader is a Mydumper file loader.
func NewMyDumpLoader ¶
func (*MDLoader) GetDatabases ¶
func (l *MDLoader) GetDatabases() []*MDDatabaseMeta
func (*MDLoader) GetStore ¶
func (l *MDLoader) GetStore() storage.ExternalStorage
type MDTableMeta ¶
type MDTableMeta struct {
	DB           string
	Name         string
	SchemaFile   FileInfo
	DataFiles    []FileInfo
	TotalSize    int64
	IndexRatio   float64
	IsRowOrdered bool
	// contains filtered or unexported fields
}
func (*MDTableMeta) GetSchema ¶
func (m *MDTableMeta) GetSchema(ctx context.Context, store storage.ExternalStorage) (string, error)
type ParquetParser ¶
type ParquetParser struct {
	Reader *preader.ParquetReader
	// contains filtered or unexported fields
}
func NewParquetParser ¶
func NewParquetParser( ctx context.Context, store storage.ExternalStorage, r storage.ReadSeekCloser, path string, ) (*ParquetParser, error)
func (*ParquetParser) Close ¶
func (pp *ParquetParser) Close() error
func (*ParquetParser) Columns ¶
func (pp *ParquetParser) Columns() []string
Columns returns the _lower-case_ column names corresponding to values in the LastRow.
func (*ParquetParser) LastRow ¶
func (pp *ParquetParser) LastRow() Row
func (*ParquetParser) Pos ¶
func (pp *ParquetParser) Pos() (pos int64, rowID int64)
Pos returns the current row number of the parquet file.
func (*ParquetParser) ReadRow ¶
func (pp *ParquetParser) ReadRow() error
func (*ParquetParser) RecycleRow ¶
func (pp *ParquetParser) RecycleRow(row Row)
func (*ParquetParser) SetColumns ¶
func (pp *ParquetParser) SetColumns(cols []string)
SetColumns sets the restored column names on the parser.
func (*ParquetParser) SetLogger ¶
func (pp *ParquetParser) SetLogger(l log.Logger)
type Parser ¶
type Parser interface {
	Pos() (pos int64, rowID int64)
	SetPos(pos int64, rowID int64) error
	Close() error
	ReadRow() error
	LastRow() Row
	RecycleRow(row Row)
	// Columns returns the _lower-case_ column names corresponding to values in
	// the LastRow.
	Columns() []string
	// SetColumns sets the restored column names on the parser.
	SetColumns([]string)
	SetLogger(log.Logger)
}
type PooledReader ¶
type PooledReader struct {
// contains filtered or unexported fields
}
PooledReader is a throttled reader wrapper, where Read() calls have an upper limit of concurrency imposed by the given worker pool.
func MakePooledReader ¶
func MakePooledReader(reader ReadSeekCloser, ioWorkers *worker.Pool) PooledReader
MakePooledReader constructs a new PooledReader.
func (PooledReader) Read ¶
func (pr PooledReader) Read(p []byte) (n int, err error)
Read implements io.Reader
type ReadSeekCloser ¶
ReadSeekCloser combines Reader, Seeker, and Closer.
type RegexRouter ¶
type RegexRouter struct {
// contains filtered or unexported fields
}
`RegexRouter` is a `FileRouter` implementation that applies a specific regex pattern to a file path. If the pattern matches, each extractor captures its part of the match and sets the value of the corresponding target field in `RouteResult`.
func (*RegexRouter) Route ¶
func (r *RegexRouter) Route(path string) (*RouteResult, error)
type RouteResult ¶
type RouteResult struct { filter.Table Key string Compression Compression Type SourceType }
type Row ¶
Row is the content of a row.
func (Row) MarshalLogArray ¶
func (row Row) MarshalLogArray(encoder zapcore.ArrayEncoder) error
MarshalLogArray implements the zapcore.ArrayMarshaler interface
type SourceFileMeta ¶
type SourceFileMeta struct { Path string Type SourceType Compression Compression SortKey string FileSize int64 }
type SourceType ¶
type SourceType int
const ( SourceTypeIgnore SourceType = iota SourceTypeSchemaSchema SourceTypeTableSchema SourceTypeSQL SourceTypeCSV SourceTypeParquet SourceTypeViewSchema )
func (SourceType) String ¶
func (s SourceType) String() string
type StringReader ¶
StringReader is a wrapper around *strings.Reader with an additional Close() method
func NewStringReader ¶
func NewStringReader(s string) StringReader
NewStringReader constructs a new StringReader
type TableRegion ¶
type TableRegion struct { EngineID int32 DB string Table string FileMeta SourceFileMeta Chunk Chunk }
func MakeTableRegions ¶
func MakeTableRegions( ctx context.Context, meta *MDTableMeta, columns int, cfg *config.Config, ioWorkers *worker.Pool, store storage.ExternalStorage, ) ([]*TableRegion, error)
func SplitLargeFile ¶
func SplitLargeFile( ctx context.Context, meta *MDTableMeta, cfg *config.Config, dataFile FileInfo, divisor int64, prevRowIdxMax int64, ioWorker *worker.Pool, store storage.ExternalStorage, ) (prevRowIDMax int64, regions []*TableRegion, dataFileSizes []float64, err error)
SplitLargeFile splits a large CSV file into multiple regions; the size of each region is specified by `config.MaxRegionSize`. Note: the file is split coarsely, so the CSV file must be in strict format. For example:
- a CSV file with a header is invalid
- a complete tuple split across multiple lines is invalid
func (*TableRegion) Offset ¶
func (reg *TableRegion) Offset() int64
func (*TableRegion) RowIDMin ¶
func (reg *TableRegion) RowIDMin() int64
func (*TableRegion) Rows ¶
func (reg *TableRegion) Rows() int64
func (*TableRegion) Size ¶
func (reg *TableRegion) Size() int64