mydump

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 23, 2023 License: Apache-2.0, Apache-2.0 Imports: 41 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// SchemaSchema is the source type value for schema file for DB.
	SchemaSchema = "schema-schema"
	// TableSchema is the source type value for schema file for table.
	TableSchema = "table-schema"
	// ViewSchema is the source type value for schema file for view.
	ViewSchema = "view-schema"
	// TypeSQL is the source type value for sql data file.
	TypeSQL = "sql"
	// TypeCSV is the source type value for csv data file.
	TypeCSV = "csv"
	// TypeParquet is the source type value for parquet data file.
	TypeParquet = "parquet"
	// TypeIgnore is the source type value for an ignored data file.
	TypeIgnore = "ignore"
)
View Source
const (

	// TableFileSizeINF is the size used for compressed files. For Lightning, 10TB is a relatively big value and will strongly affect efficiency.
	// It's used to make sure compressed files can be read until EOF, because we can't get the exact decompressed size of the compressed files.
	TableFileSizeINF = 10 * 1024 * tableRegionSizeWarningThreshold
)

Variables

View Source
var (
	// ErrInsertStatementNotFound is the error that cannot find the insert statement.
	ErrInsertStatementNotFound = errors.New("insert statement not found")
)
View Source
var (

	// LargestEntryLimit is the max size for reading file to buf
	LargestEntryLimit int
)

Functions

func AllocateEngineIDs

func AllocateEngineIDs(
	filesRegions []*TableRegion,
	dataFileSizes []float64,
	batchSize float64,
	batchImportRatio float64,
	tableConcurrency float64,
)

AllocateEngineIDs allocates the table engine IDs.

func ExportStatement

func ExportStatement(ctx context.Context, store storage.ExternalStorage, sqlFile FileInfo, characterSet string) ([]byte, error)

ExportStatement exports the SQL statement in the schema file.

func IndexAnyByte

func IndexAnyByte(s []byte, as *byteSet) int

IndexAnyByte returns the byte index of the first occurrence in s of any of the bytes in the byte set as. It returns -1 if no byte of s is in as.

func OpenParquetReader

func OpenParquetReader(
	ctx context.Context,
	store storage.ExternalStorage,
	path string,
	size int64,
) (source.ParquetFile, error)

OpenParquetReader opens a parquet file and returns a handle that can at least read the file.

func ReadParquetFileRowCount

func ReadParquetFileRowCount(
	ctx context.Context,
	store storage.ExternalStorage,
	r storage.ReadSeekCloser,
	path string,
) (int64, error)

ReadParquetFileRowCount reads the parquet file row count. It is a special func to fetch parquet file row count fast.

func ReadUntil

func ReadUntil(parser Parser, pos int64) error

ReadUntil reads from the parser until the position pos is reached.

func ToStorageCompressType

func ToStorageCompressType(compression Compression) (storage.CompressType, error)

ToStorageCompressType converts Compression to storage.CompressType.

Types

type CSVParser

type CSVParser struct {
	// contains filtered or unexported fields
}

CSVParser is basically a copy of encoding/csv, but special-cased for MySQL-like input.

func NewCSVParser

func NewCSVParser(
	ctx context.Context,
	cfg *config.CSVConfig,
	reader ReadSeekCloser,
	blockBufSize int64,
	ioWorkers *worker.Pool,
	shouldParseHeader bool,
	charsetConvertor *CharsetConvertor,
) (*CSVParser, error)

NewCSVParser creates a CSV parser.

func (*CSVParser) Close

func (parser *CSVParser) Close() error

func (*CSVParser) Columns

func (parser *CSVParser) Columns() []string

func (*CSVParser) LastRow

func (parser *CSVParser) LastRow() Row

LastRow is the copy of the row parsed by the last call to ReadRow().

func (*CSVParser) Pos

func (parser *CSVParser) Pos() (pos int64, lastRowID int64)

Pos returns the current file offset. Attention: for compressed sql/csv files, pos is the position in uncompressed files

func (*CSVParser) ReadColumns

func (parser *CSVParser) ReadColumns() error

ReadColumns reads the columns of this CSV file.

func (*CSVParser) ReadRow

func (parser *CSVParser) ReadRow() error

ReadRow reads a row from the datafile.

func (*CSVParser) ReadUntilTerminator

func (parser *CSVParser) ReadUntilTerminator() (int64, error)

ReadUntilTerminator seeks the file until the terminator token is found, and returns the file offset beyond the terminator. This function is used in strict-format dividing a CSV file.

func (*CSVParser) RealPos

func (parser *CSVParser) RealPos() (int64, error)

RealPos gets the read position of current reader.

func (*CSVParser) RecycleRow

func (parser *CSVParser) RecycleRow(row Row)

RecycleRow places the row object back into the allocation pool.

func (*CSVParser) SetColumns

func (parser *CSVParser) SetColumns(columns []string)

func (*CSVParser) SetLogger

func (parser *CSVParser) SetLogger(logger log.Logger)

func (*CSVParser) SetPos

func (parser *CSVParser) SetPos(pos int64, rowID int64) error

SetPos changes the reported position and row ID.

func (*CSVParser) SetRowID

func (parser *CSVParser) SetRowID(rowID int64)

SetRowID changes the reported row ID when we first read compressed files.

type CharsetConvertor

type CharsetConvertor struct {
	// contains filtered or unexported fields
}

CharsetConvertor is used to convert a character set to utf8mb4 encoding. In Lightning, we mainly use it to do the GB18030/GBK -> UTF8MB4 conversion.

func NewCharsetConvertor

func NewCharsetConvertor(dataCharacterSet, dataInvalidCharReplace string) (*CharsetConvertor, error)

NewCharsetConvertor creates a new CharsetConvertor.

func (*CharsetConvertor) Decode

func (cc *CharsetConvertor) Decode(src string) (string, error)

Decode does the charset conversion work from sourceCharacterSet to utf8mb4. It will return a string as the conversion result whose length may be less or greater than the original string `src`. TODO: maybe using generic type later to make Decode/Encode accept both []byte and string.

func (*CharsetConvertor) Encode

func (cc *CharsetConvertor) Encode(src string) (string, error)

Encode will encode the data from utf8mb4 to sourceCharacterSet.

type Chunk

type Chunk struct {
	Offset       int64
	EndOffset    int64
	RealOffset   int64
	PrevRowIDMax int64
	RowIDMax     int64
	Columns      []string
}

Chunk represents a portion of the data file.

func ReadChunks

func ReadChunks(parser Parser, minSize int64) ([]Chunk, error)

ReadChunks parses the entire file and splits it into continuous chunks of size >= minSize.

type ChunkParser

type ChunkParser struct {
	// contains filtered or unexported fields
}

ChunkParser is a parser of the data files (the file containing only INSERT statements).

func NewChunkParser

func NewChunkParser(
	ctx context.Context,
	sqlMode mysql.SQLMode,
	reader ReadSeekCloser,
	blockBufSize int64,
	ioWorkers *worker.Pool,
) *ChunkParser

NewChunkParser creates a new parser which can read chunks out of a file.

func (*ChunkParser) Close

func (parser *ChunkParser) Close() error

func (*ChunkParser) Columns

func (parser *ChunkParser) Columns() []string

func (*ChunkParser) LastRow

func (parser *ChunkParser) LastRow() Row

LastRow is the copy of the row parsed by the last call to ReadRow().

func (*ChunkParser) Pos

func (parser *ChunkParser) Pos() (pos int64, lastRowID int64)

Pos returns the current file offset. Attention: for compressed sql/csv files, pos is the position in uncompressed files

func (*ChunkParser) ReadRow

func (parser *ChunkParser) ReadRow() error

ReadRow reads a row from the datafile.

func (*ChunkParser) RealPos

func (parser *ChunkParser) RealPos() (int64, error)

RealPos gets the read position of current reader.

func (*ChunkParser) RecycleRow

func (parser *ChunkParser) RecycleRow(row Row)

RecycleRow places the row object back into the allocation pool.

func (*ChunkParser) SetColumns

func (parser *ChunkParser) SetColumns(columns []string)

func (*ChunkParser) SetLogger

func (parser *ChunkParser) SetLogger(logger log.Logger)

func (*ChunkParser) SetPos

func (parser *ChunkParser) SetPos(pos int64, rowID int64) error

SetPos changes the reported position and row ID.

func (*ChunkParser) SetRowID

func (parser *ChunkParser) SetRowID(rowID int64)

SetRowID changes the reported row ID when we first read compressed files.

type Compression

type Compression int

Compression specifies the compression type.

const (
	// CompressionNone is the compression type with no compression.
	CompressionNone Compression = iota
	// CompressionGZ is the compression type that uses GZ algorithm.
	CompressionGZ
	// CompressionLZ4 is the compression type that uses LZ4 algorithm.
	CompressionLZ4
	// CompressionZStd is the compression type that uses ZStd algorithm.
	CompressionZStd
	// CompressionXZ is the compression type that uses XZ algorithm.
	CompressionXZ
	// CompressionLZO is the compression type that uses LZO algorithm.
	CompressionLZO
	// CompressionSnappy is the compression type that uses Snappy algorithm.
	CompressionSnappy
)

type ExtendColumnData

type ExtendColumnData struct {
	Columns []string
	Values  []string
}

ExtendColumnData contains the extended column names and values information for a table.

type FileHandler

type FileHandler func(ctx context.Context, path string, size int64) error

FileHandler is the function type that handles a file given its path and size. It is mainly used as the parameter of `FileIterator`.

type FileInfo

type FileInfo struct {
	TableName filter.Table
	FileMeta  SourceFileMeta
}

FileInfo contains the information for a data file in a table.

type FileIterator

type FileIterator interface {
	IterateFiles(ctx context.Context, hdl FileHandler) error
}

FileIterator is the interface to iterate files in a data source. Use this interface to customize the file iteration policy.

type FileRouter

type FileRouter interface {
	// Route apply rule to path. Return nil if path doesn't match route rule;
	// return error if path match route rule but the captured value for field is invalid
	Route(path string) (*RouteResult, error)
}

FileRouter provides some operations to apply a rule to route file path to target schema/table

func NewDefaultFileRouter

func NewDefaultFileRouter(logger log.Logger) (FileRouter, error)

NewDefaultFileRouter creates a new file router with the default file route rules.

func NewFileRouter

func NewFileRouter(cfg []*config.FileRouteRule, logger log.Logger) (FileRouter, error)

NewFileRouter creates a new file router with the rule.

type MDDatabaseMeta

type MDDatabaseMeta struct {
	Name       string
	SchemaFile FileInfo
	Tables     []*MDTableMeta
	Views      []*MDTableMeta
	// contains filtered or unexported fields
}

MDDatabaseMeta contains some parsed metadata for a database in the source by MyDumper Loader.

func NewMDDatabaseMeta

func NewMDDatabaseMeta(charSet string) *MDDatabaseMeta

NewMDDatabaseMeta creates a Mydumper database meta with the specified character set.

func (*MDDatabaseMeta) GetSchema

func (m *MDDatabaseMeta) GetSchema(ctx context.Context, store storage.ExternalStorage) string

GetSchema gets the schema SQL for a source database.

type MDLoader

type MDLoader struct {
	// contains filtered or unexported fields
}

MDLoader is for 'Mydumper File Loader', which loads the files in the data source and generates a set of metadata.

func NewMyDumpLoader

func NewMyDumpLoader(ctx context.Context, cfg *config.Config, opts ...MDLoaderSetupOption) (*MDLoader, error)

NewMyDumpLoader constructs a MyDumper loader that scans the data source and constructs a set of metadata.

func NewMyDumpLoaderWithStore

func NewMyDumpLoaderWithStore(ctx context.Context, cfg *config.Config, store storage.ExternalStorage, opts ...MDLoaderSetupOption) (*MDLoader, error)

NewMyDumpLoaderWithStore constructs a MyDumper loader with the provided external storage that scans the data source and constructs a set of metadata.

func (*MDLoader) GetDatabases

func (l *MDLoader) GetDatabases() []*MDDatabaseMeta

GetDatabases gets the list of scanned MDDatabaseMeta for the loader.

func (*MDLoader) GetStore

func (l *MDLoader) GetStore() storage.ExternalStorage

GetStore gets the external storage used by the loader.

type MDLoaderSetupConfig

type MDLoaderSetupConfig struct {
	// MaxScanFiles specifies the maximum number of files to scan.
	// If the value is <= 0, as many data source files as possible will be scanned.
	MaxScanFiles int
	// ReturnPartialResultOnError specifies whether the currently scanned files are analyzed,
	// and return the partial result.
	ReturnPartialResultOnError bool
	// FileIter controls the file iteration policy when constructing a MDLoader.
	FileIter FileIterator
}

MDLoaderSetupConfig stores the configs when setting up a MDLoader. This can control the behavior when constructing an MDLoader.

func DefaultMDLoaderSetupConfig

func DefaultMDLoaderSetupConfig() *MDLoaderSetupConfig

DefaultMDLoaderSetupConfig generates a default MDLoaderSetupConfig.

type MDLoaderSetupOption

type MDLoaderSetupOption func(cfg *MDLoaderSetupConfig)

MDLoaderSetupOption is the option type for setting up a MDLoaderSetupConfig.

func ReturnPartialResultOnError

func ReturnPartialResultOnError(supportPartialResult bool) MDLoaderSetupOption

ReturnPartialResultOnError generates an option that controls whether to return the partially scanned result on error when setting up a MDLoader.

func WithFileIterator

func WithFileIterator(fileIter FileIterator) MDLoaderSetupOption

WithFileIterator generates an option that specifies the file iteration policy.

func WithMaxScanFiles

func WithMaxScanFiles(maxScanFiles int) MDLoaderSetupOption

WithMaxScanFiles generates an option that limits the max scan files when setting up a MDLoader.

type MDTableMeta

type MDTableMeta struct {
	DB         string
	Name       string
	SchemaFile FileInfo
	DataFiles  []FileInfo

	TotalSize    int64
	IndexRatio   float64
	IsRowOrdered bool
	// contains filtered or unexported fields
}

MDTableMeta contains some parsed metadata for a table in the source by MyDumper Loader.

func NewMDTableMeta

func NewMDTableMeta(charSet string) *MDTableMeta

NewMDTableMeta creates a Mydumper table meta with the specified character set.

func (*MDTableMeta) GetSchema

func (m *MDTableMeta) GetSchema(ctx context.Context, store storage.ExternalStorage) (string, error)

GetSchema gets the table-creating SQL for a source table.

type ParquetParser

type ParquetParser struct {
	Reader *preader.ParquetReader
	// contains filtered or unexported fields
}

ParquetParser parses a parquet file for import. It implements the Parser interface.

func NewParquetParser

func NewParquetParser(
	ctx context.Context,
	store storage.ExternalStorage,
	r storage.ReadSeekCloser,
	path string,
) (*ParquetParser, error)

NewParquetParser generates a parquet parser.

func (*ParquetParser) Close

func (pp *ParquetParser) Close() error

Close closes the parquet file of the parser. It implements the Parser interface.

func (*ParquetParser) Columns

func (pp *ParquetParser) Columns() []string

Columns returns the _lower-case_ column names corresponding to values in the LastRow.

func (*ParquetParser) LastRow

func (pp *ParquetParser) LastRow() Row

LastRow gets the last row parsed by the parser. It implements the Parser interface.

func (*ParquetParser) Pos

func (pp *ParquetParser) Pos() (pos int64, rowID int64)

Pos returns the current row number of the parquet file.

func (*ParquetParser) ReadRow

func (pp *ParquetParser) ReadRow() error

ReadRow reads a row in the parquet file by the parser. It implements the Parser interface.

func (*ParquetParser) RealPos

func (pp *ParquetParser) RealPos() (int64, error)

RealPos implements the Parser interface. For parquet it's equal to Pos().

func (*ParquetParser) RecycleRow

func (*ParquetParser) RecycleRow(_ Row)

RecycleRow implements the Parser interface.

func (*ParquetParser) SetColumns

func (*ParquetParser) SetColumns(_ []string)

SetColumns sets the restored column names to the parser.

func (*ParquetParser) SetLogger

func (pp *ParquetParser) SetLogger(l log.Logger)

SetLogger sets the logger used in the parser. It implements the Parser interface.

func (*ParquetParser) SetPos

func (pp *ParquetParser) SetPos(pos int64, rowID int64) error

SetPos sets the position in a parquet file. It implements the Parser interface.

func (*ParquetParser) SetRowID

func (pp *ParquetParser) SetRowID(rowID int64)

SetRowID sets the rowID in a parquet file when we start a compressed file. It implements the Parser interface.

type Parser

type Parser interface {
	Pos() (pos int64, rowID int64)
	SetPos(pos int64, rowID int64) error
	RealPos() (int64, error)
	Close() error
	ReadRow() error
	LastRow() Row
	RecycleRow(row Row)

	// Columns returns the _lower-case_ column names corresponding to values in
	// the LastRow.
	Columns() []string
	// SetColumns sets the restored column names to the parser.
	SetColumns([]string)

	SetLogger(log.Logger)

	SetRowID(rowID int64)
}

Parser provides some methods to parse a source data file.

type PooledReader

type PooledReader struct {
	// contains filtered or unexported fields
}

PooledReader is a throttled reader wrapper, where Read() calls have an upper limit of concurrency imposed by the given worker pool.

func MakePooledReader

func MakePooledReader(reader ReadSeekCloser, ioWorkers *worker.Pool) PooledReader

MakePooledReader constructs a new PooledReader.

func (PooledReader) Close

func (pr PooledReader) Close() error

Close implements io.Closer

func (PooledReader) Read

func (pr PooledReader) Read(p []byte) (n int, err error)

Read implements io.Reader

func (PooledReader) ReadFull

func (pr PooledReader) ReadFull(buf []byte) (n int, err error)

ReadFull is the same as `io.ReadFull(pr)` with less worker recycling.

func (PooledReader) Seek

func (pr PooledReader) Seek(offset int64, whence int) (int64, error)

Seek implements io.Seeker

type ReadSeekCloser

type ReadSeekCloser interface {
	io.Reader
	io.Seeker
	io.Closer
}

ReadSeekCloser = Reader + Seeker + Closer

type RegexRouter

type RegexRouter struct {
	// contains filtered or unexported fields
}

RegexRouter is a `FileRouter` implementation that applies a specific regex pattern to the file path. If the regex pattern matches, each extractor will capture the matched regexp pattern and set the value to the target field in `RouteResult`.

func (*RegexRouter) Route

func (r *RegexRouter) Route(path string) (*RouteResult, error)

Route routes a file path to a source file type.

type RouteResult

type RouteResult struct {
	filter.Table
	Key         string
	Compression Compression
	Type        SourceType
}

RouteResult contains the information for a file routing.

type Row

type Row struct {
	RowID  int64
	Row    []types.Datum
	Length int
}

Row is the content of a row.

func (Row) MarshalLogArray

func (row Row) MarshalLogArray(encoder zapcore.ArrayEncoder) error

MarshalLogArray implements the zapcore.ArrayMarshaler interface

type SourceFileMeta

type SourceFileMeta struct {
	Path        string
	Type        SourceType
	Compression Compression
	SortKey     string
	FileSize    int64
	ExtendData  ExtendColumnData
}

SourceFileMeta contains some analyzed metadata for a source file by MyDumper Loader.

type SourceType

type SourceType int

SourceType specifies the source file types.

const (
	// SourceTypeIgnore means this source file is ignored.
	SourceTypeIgnore SourceType = iota
	// SourceTypeSchemaSchema means this source file is a schema file for the DB.
	SourceTypeSchemaSchema
	// SourceTypeTableSchema means this source file is a schema file for the table.
	SourceTypeTableSchema
	// SourceTypeSQL means this source file is a SQL data file.
	SourceTypeSQL
	// SourceTypeCSV means this source file is a CSV data file.
	SourceTypeCSV
	// SourceTypeParquet means this source file is a parquet data file.
	SourceTypeParquet
	// SourceTypeViewSchema means this source file is a schema file for the view.
	SourceTypeViewSchema
)

func (SourceType) String

func (s SourceType) String() string

type StringReader

type StringReader struct{ *strings.Reader }

StringReader is a wrapper around *strings.Reader with an additional Close() method

func NewStringReader

func NewStringReader(s string) StringReader

NewStringReader constructs a new StringReader

func (StringReader) Close

func (StringReader) Close() error

Close implements io.Closer

type TableRegion

type TableRegion struct {
	EngineID int32

	DB         string
	Table      string
	FileMeta   SourceFileMeta
	ExtendData ExtendColumnData

	Chunk Chunk
}

TableRegion contains information for a table region during import.

func MakeSourceFileRegion

func MakeSourceFileRegion(
	ctx context.Context,
	meta *MDTableMeta,
	fi FileInfo,
	columns int,
	cfg *config.Config,
	ioWorkers *worker.Pool,
	store storage.ExternalStorage,
) ([]*TableRegion, []float64, error)

MakeSourceFileRegion creates a new source file region.

func MakeTableRegions

func MakeTableRegions(
	ctx context.Context,
	meta *MDTableMeta,
	columns int,
	cfg *config.Config,
	ioWorkers *worker.Pool,
	store storage.ExternalStorage,
) ([]*TableRegion, error)

MakeTableRegions creates a new table region.

func SplitLargeFile

func SplitLargeFile(
	ctx context.Context,
	meta *MDTableMeta,
	cfg *config.Config,
	dataFile FileInfo,
	divisor int64,
	prevRowIdxMax int64,
	ioWorker *worker.Pool,
	store storage.ExternalStorage,
) (prevRowIDMax int64, regions []*TableRegion, dataFileSizes []float64, err error)

SplitLargeFile splits a large csv file into multiple regions; the size of each region is specified by `config.MaxRegionSize`. Note: We split the file coarsely, thus the format of the csv file needs to be strict, e.g. - a CSV file with a header is invalid - a complete tuple split into multiple lines is invalid

func (*TableRegion) Offset

func (reg *TableRegion) Offset() int64

Offset gets the offset in the file of this table region.

func (*TableRegion) RowIDMin

func (reg *TableRegion) RowIDMin() int64

RowIDMin returns the minimum row ID of this table region.

func (*TableRegion) Rows

func (reg *TableRegion) Rows() int64

Rows returns the row counts of this table region.

func (*TableRegion) Size

func (reg *TableRegion) Size() int64

Size gets the size of this table region.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL