mydump

package
v1.1.0-beta.0...-1a455d0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 3, 2025 License: Apache-2.0 Imports: 53 Imported by: 0

Documentation

Index

Constants

View Source
const (

	// TableFileSizeINF for compressed size, for lightning 10TB is a relatively big value and will strongly affect efficiency
	// It's used to make sure compressed files can be read until EOF. Because we can't get the exact decompressed size of the compressed files.
	TableFileSizeINF = 10 * 1024 * tableRegionSizeWarningThreshold
	// CompressSizeFactor is used to adjust compressed data size
	CompressSizeFactor = 5
)
View Source
const (
	// SchemaSchema is the source type value for schema file for DB.
	SchemaSchema = "schema-schema"
	// TableSchema is the source type value for schema file for table.
	TableSchema = "table-schema"
	// ViewSchema is the source type value for schema file for view.
	ViewSchema = "view-schema"
	// TypeSQL is the source type value for sql data file.
	TypeSQL = "sql"
	// TypeCSV is the source type value for csv data file.
	TypeCSV = "csv"
	// TypeParquet is the source type value for parquet data file.
	TypeParquet = "parquet"
	// TypeIgnore is the source type value for a ignored data file.
	TypeIgnore = "ignore"
)

Variables

View Source
var (
	// ErrInsertStatementNotFound is the error that cannot find the insert statement.
	ErrInsertStatementNotFound = errors.New("insert statement not found")
)
View Source
var (

	// LargestEntryLimit is the max size for reading file to buf
	LargestEntryLimit int
)

Functions

func AllocateEngineIDs

func AllocateEngineIDs(
	filesRegions []*TableRegion,
	dataFileSizes []float64,
	batchSize float64,
	batchImportRatio float64,
	engineConcurrency float64,
)

AllocateEngineIDs allocates the table engine IDs.

func CalculateBatchSize

func CalculateBatchSize(mydumperBatchSize float64, isRowOrdered bool, totalSize float64) float64

CalculateBatchSize calculates batch size according to row order and file size.

func ExportStatement

func ExportStatement(ctx context.Context, store storage.ExternalStorage,
	sqlFile FileInfo, characterSet string) ([]byte, error)

ExportStatement exports the SQL statement in the schema file.

func IndexAnyByte

func IndexAnyByte(s []byte, as *byteSet) int

IndexAnyByte returns the byte index of the first occurrence in s of any of the byte points in chars. It returns -1 if there is no code point in common.

func OpenParquetReader

func OpenParquetReader(
	ctx context.Context,
	store storage.ExternalStorage,
	path string,
	size int64,
) (source.ParquetFile, error)

OpenParquetReader opens a parquet file and returns a handle that can at least read the file.

func OpenReader

func OpenReader(
	ctx context.Context,
	fileMeta *SourceFileMeta,
	store storage.ExternalStorage,
	decompressCfg storage.DecompressConfig,
) (reader storage.ReadSeekCloser, err error)

OpenReader opens a reader for the given file and storage.

func ReadParquetFileRowCountByFile

func ReadParquetFileRowCountByFile(
	ctx context.Context,
	store storage.ExternalStorage,
	fileMeta SourceFileMeta,
) (int64, error)

ReadParquetFileRowCountByFile reads the parquet file row count through fileMeta.

func ReadUntil

func ReadUntil(parser Parser, pos int64) error

ReadUntil parses the entire file and splits it into continuous chunks of size >= minSize.

func SampleFileCompressRatio

func SampleFileCompressRatio(ctx context.Context, fileMeta SourceFileMeta, store storage.ExternalStorage) (float64, error)

SampleFileCompressRatio samples the compress ratio of the compressed file.

func SampleParquetRowSize

func SampleParquetRowSize(ctx context.Context, fileMeta SourceFileMeta, store storage.ExternalStorage) (float64, error)

SampleParquetRowSize samples row size of the parquet file.

func ToStorageCompressType

func ToStorageCompressType(compression Compression) (storage.CompressType, error)

ToStorageCompressType converts Compression to storage.CompressType.

Types

type CSVParser

type CSVParser struct {
	// contains filtered or unexported fields
}

CSVParser is basically a copy of encoding/csv, but special-cased for MySQL-like input.

func NewCSVParser

func NewCSVParser(
	ctx context.Context,
	cfg *config.CSVConfig,
	reader ReadSeekCloser,
	blockBufSize int64,
	ioWorkers *worker.Pool,
	shouldParseHeader bool,
	charsetConvertor *CharsetConvertor,
) (*CSVParser, error)

NewCSVParser creates a CSV parser. The ownership of the reader is transferred to the parser.

func (*CSVParser) Close

func (parser *CSVParser) Close() error

func (*CSVParser) Columns

func (parser *CSVParser) Columns() []string

func (*CSVParser) LastRow

func (parser *CSVParser) LastRow() Row

LastRow is the copy of the row parsed by the last call to ReadRow().

func (*CSVParser) Pos

func (parser *CSVParser) Pos() (pos int64, lastRowID int64)

Pos returns the current file offset. Attention: for compressed sql/csv files, pos is the position in uncompressed files

func (*CSVParser) ReadColumns

func (parser *CSVParser) ReadColumns() error

ReadColumns reads the columns of this CSV file.

func (*CSVParser) ReadRow

func (parser *CSVParser) ReadRow() error

ReadRow reads a row from the datafile.

func (*CSVParser) ReadUntilTerminator

func (parser *CSVParser) ReadUntilTerminator() ([]byte, int64, error)

ReadUntilTerminator seeks the file until the terminator token is found, and returns - the content with terminator, or the content read before meet error - the file offset beyond the terminator, or the offset when meet error - error Note that the terminator string pattern may be the content of a field, which means it's inside quotes. Caller should make sure to handle this case.

func (*CSVParser) RecycleRow

func (parser *CSVParser) RecycleRow(row Row)

RecycleRow places the row object back into the allocation pool.

func (*CSVParser) ScannedPos

func (parser *CSVParser) ScannedPos() (int64, error)

ScannedPos gets the read position of current reader. this always returns the position of the underlying file, either compressed or not.

func (*CSVParser) SetColumns

func (parser *CSVParser) SetColumns(columns []string)

func (*CSVParser) SetLogger

func (parser *CSVParser) SetLogger(logger log.Logger)

func (*CSVParser) SetPos

func (parser *CSVParser) SetPos(pos int64, rowID int64) error

SetPos changes the reported position and row ID.

func (*CSVParser) SetRowID

func (parser *CSVParser) SetRowID(rowID int64)

SetRowID changes the reported row ID when we firstly read compressed files.

type CharsetConvertor

type CharsetConvertor struct {
	// contains filtered or unexported fields
}

CharsetConvertor is used to convert a character set to utf8mb4 encoding. In Lightning, we mainly use it to do the GB18030/GBK -> UTF8MB4 conversion.

func NewCharsetConvertor

func NewCharsetConvertor(dataCharacterSet, dataInvalidCharReplace string) (*CharsetConvertor, error)

NewCharsetConvertor creates a new CharsetConvertor.

func (*CharsetConvertor) Decode

func (cc *CharsetConvertor) Decode(src string) (string, error)

Decode does the charset conversion work from sourceCharacterSet to utf8mb4. It will return a string as the conversion result whose length may be less or greater than the original string `src`. TODO: maybe using generic type later to make Decode/Encode accept both []byte and string.

func (*CharsetConvertor) Encode

func (cc *CharsetConvertor) Encode(src string) (string, error)

Encode will encode the data from utf8mb4 to sourceCharacterSet.

type Chunk

type Chunk struct {
	Offset int64
	// for parquet file, it's the total row count
	// see makeParquetFileRegion
	EndOffset  int64
	RealOffset int64
	// we estimate row-id range of the chunk using file-size divided by some factor(depends on column count)
	// after estimation, we will rebase them for all chunks of this table in this instance,
	// then it's rebased again based on all instances of parallel import.
	// allocatable row-id is in range (PrevRowIDMax, RowIDMax].
	// PrevRowIDMax will be increased during local encoding
	PrevRowIDMax int64
	RowIDMax     int64
	// only assigned when using strict-mode for CSV files and the file contains header
	Columns []string
}

Chunk represents a portion of the data file.

func ReadChunks

func ReadChunks(parser Parser, minSize int64) ([]Chunk, error)

ReadChunks parses the entire file and splits it into continuous chunks of size >= minSize.

type ChunkParser

type ChunkParser struct {
	// contains filtered or unexported fields
}

ChunkParser is a parser of the data files (the file containing only INSERT statements).

func NewChunkParser

func NewChunkParser(
	ctx context.Context,
	sqlMode mysql.SQLMode,
	reader ReadSeekCloser,
	blockBufSize int64,
	ioWorkers *worker.Pool,
) *ChunkParser

NewChunkParser creates a new parser which can read chunks out of a file.

func (*ChunkParser) Close

func (parser *ChunkParser) Close() error

func (*ChunkParser) Columns

func (parser *ChunkParser) Columns() []string

func (*ChunkParser) LastRow

func (parser *ChunkParser) LastRow() Row

LastRow is the copy of the row parsed by the last call to ReadRow().

func (*ChunkParser) Pos

func (parser *ChunkParser) Pos() (pos int64, lastRowID int64)

Pos returns the current file offset. Attention: for compressed sql/csv files, pos is the position in uncompressed files

func (*ChunkParser) ReadRow

func (parser *ChunkParser) ReadRow() error

ReadRow reads a row from the datafile.

func (*ChunkParser) RecycleRow

func (parser *ChunkParser) RecycleRow(row Row)

RecycleRow places the row object back into the allocation pool.

func (*ChunkParser) ScannedPos

func (parser *ChunkParser) ScannedPos() (int64, error)

ScannedPos gets the read position of current reader. this always returns the position of the underlying file, either compressed or not.

func (*ChunkParser) SetColumns

func (parser *ChunkParser) SetColumns(columns []string)

func (*ChunkParser) SetLogger

func (parser *ChunkParser) SetLogger(logger log.Logger)

func (*ChunkParser) SetPos

func (parser *ChunkParser) SetPos(pos int64, rowID int64) error

SetPos changes the reported position and row ID.

func (*ChunkParser) SetRowID

func (parser *ChunkParser) SetRowID(rowID int64)

SetRowID changes the reported row ID when we firstly read compressed files.

type Compression

type Compression int

Compression specifies the compression type.

const (
	// CompressionNone is the compression type that with no compression.
	CompressionNone Compression = iota
	// CompressionGZ is the compression type that uses GZ algorithm.
	CompressionGZ
	// CompressionLZ4 is the compression type that uses LZ4 algorithm.
	CompressionLZ4
	// CompressionZStd is the compression type that uses ZStd algorithm.
	CompressionZStd
	// CompressionXZ is the compression type that uses XZ algorithm.
	CompressionXZ
	// CompressionLZO is the compression type that uses LZO algorithm.
	CompressionLZO
	// CompressionSnappy is the compression type that uses Snappy algorithm.
	CompressionSnappy
)

func ParseCompressionOnFileExtension

func ParseCompressionOnFileExtension(filename string) Compression

ParseCompressionOnFileExtension parses the compression type from the file extension.

type DataDivideConfig

type DataDivideConfig struct {
	ColumnCnt int
	// limit of engine size, we have a complex algorithm to calculate the best engine size, see AllocateEngineIDs.
	EngineDataSize int64
	// max chunk size(inside this file we named it region which collides with TiKV region)
	MaxChunkSize int64
	// number of concurrent workers to dive data files
	Concurrency int
	// number of engine runs concurrently, need this to calculate the best engine size for pipelining local-sort and import.
	// todo: remove those 2 params, the algorithm seems useless, since we can import concurrently now, the foundation
	// assumption of the algorithm is broken.
	EngineConcurrency int
	// used together with prev param. it is 0.75 nearly all the time, see Mydumper.BatchImportRatio.
	// this variable is defined as speed-write-to-TiKV / speed-to-do-local-sort
	BatchImportRatio float64
	// used to split large CSV files, to limit concurrency of data read/seek operations
	// when nil, no limit.
	IOWorkers *worker.Pool
	// we need it read row-count for parquet, and to read line terminator to split large CSV files
	Store     storage.ExternalStorage
	TableMeta *MDTableMeta

	// only used when split large CSV files.
	StrictFormat           bool
	DataCharacterSet       string
	DataInvalidCharReplace string
	ReadBlockSize          int64
	CSV                    config.CSVConfig
}

DataDivideConfig config used to divide data files into chunks/engines(regions in this context).

func NewDataDivideConfig

func NewDataDivideConfig(cfg *config.Config,
	columns int,
	ioWorkers *worker.Pool,
	store storage.ExternalStorage,
	meta *MDTableMeta,
) *DataDivideConfig

NewDataDivideConfig creates a new DataDivideConfig from lightning cfg.

type ExtendColumnData

type ExtendColumnData struct {
	Columns []string
	Values  []string
}

ExtendColumnData contains the extended column names and values information for a table.

type FileHandler

type FileHandler func(ctx context.Context, path string, size int64) error

FileHandler is the interface to handle the file give the path and size. It is mainly used in the `FileIterator` as parameters.

type FileInfo

type FileInfo struct {
	TableName filter.Table
	FileMeta  SourceFileMeta
}

FileInfo contains the information for a data file in a table.

type FileIterator

type FileIterator interface {
	IterateFiles(ctx context.Context, hdl FileHandler) error
}

FileIterator is the interface to iterate files in a data source. Use this interface to customize the file iteration policy.

type FileRouter

type FileRouter interface {
	// Route apply rule to path. Return nil if path doesn't match route rule;
	// return error if path match route rule but the captured value for field is invalid
	Route(path string) (*RouteResult, error)
}

FileRouter provides some operations to apply a rule to route file path to target schema/table

func NewDefaultFileRouter

func NewDefaultFileRouter(logger log.Logger) (FileRouter, error)

NewDefaultFileRouter creates a new file router with the default file route rules.

func NewFileRouter

func NewFileRouter(cfg []*config.FileRouteRule, logger log.Logger) (FileRouter, error)

NewFileRouter creates a new file router with the rule.

type LoaderConfig

type LoaderConfig struct {
	// SourceID is the unique identifier for the data source, it's used in DM only.
	// must be used together with Routes.
	SourceID string
	// SourceURL is the URL of the data source.
	SourceURL string
	// Routes is the routing rules for the tables, exclusive with FileRouters.
	// it's deprecated in lightning, but still used in DM.
	// when used this, DefaultFileRules must be true.
	Routes config.Routes
	// CharacterSet is the character set of the schema sql files.
	CharacterSet string
	// Filter is the filter for the tables, files related to filtered-out tables are not loaded.
	// must be specified, else all tables are filtered out, see config.GetDefaultFilter.
	Filter      []string
	FileRouters []*config.FileRouteRule
	// CaseSensitive indicates whether Routes and Filter are case-sensitive.
	CaseSensitive bool
	// DefaultFileRules indicates whether to use the default file routing rules.
	// If it's true, the default file routing rules will be appended to the FileRouters.
	// a little confusing, but it's true only when FileRouters is empty.
	DefaultFileRules bool
}

LoaderConfig is the configuration for constructing a MDLoader.

func NewLoaderCfg

func NewLoaderCfg(cfg *config.Config) LoaderConfig

NewLoaderCfg creates loader config from lightning config.

type MDDatabaseMeta

type MDDatabaseMeta struct {
	Name       string
	SchemaFile FileInfo
	Tables     []*MDTableMeta
	Views      []*MDTableMeta
	// contains filtered or unexported fields
}

MDDatabaseMeta contains some parsed metadata for a database in the source by MyDumper Loader.

func NewMDDatabaseMeta

func NewMDDatabaseMeta(charSet string) *MDDatabaseMeta

NewMDDatabaseMeta creates an Mydumper database meta with specified character set.

func (*MDDatabaseMeta) GetSchema

func (m *MDDatabaseMeta) GetSchema(ctx context.Context, store storage.ExternalStorage) string

GetSchema gets the schema SQL for a source database.

type MDLoader

type MDLoader struct {
	// contains filtered or unexported fields
}

MDLoader is for 'Mydumper File Loader', which loads the files in the data source and generates a set of metadata.

func NewLoader

func NewLoader(ctx context.Context, cfg LoaderConfig, opts ...MDLoaderSetupOption) (*MDLoader, error)

NewLoader constructs a MyDumper loader that scanns the data source and constructs a set of metadatas.

func NewLoaderWithStore

func NewLoaderWithStore(ctx context.Context, cfg LoaderConfig,
	store storage.ExternalStorage, opts ...MDLoaderSetupOption) (*MDLoader, error)

NewLoaderWithStore constructs a MyDumper loader with the provided external storage that scanns the data source and constructs a set of metadatas.

func (*MDLoader) GetDatabases

func (l *MDLoader) GetDatabases() []*MDDatabaseMeta

GetDatabases gets the list of scanned MDDatabaseMeta for the loader.

func (*MDLoader) GetStore

func (l *MDLoader) GetStore() storage.ExternalStorage

GetStore gets the external storage used by the loader.

type MDLoaderSetupConfig

type MDLoaderSetupConfig struct {
	// MaxScanFiles specifies the maximum number of files to scan.
	// If the value is <= 0, it means the number of data source files will be scanned as many as possible.
	MaxScanFiles int
	// ReturnPartialResultOnError specifies whether the currently scanned files are analyzed,
	// and return the partial result.
	ReturnPartialResultOnError bool
	// FileIter controls the file iteration policy when constructing a MDLoader.
	FileIter FileIterator
}

MDLoaderSetupConfig stores the configs when setting up a MDLoader. This can control the behavior when constructing an MDLoader.

func DefaultMDLoaderSetupConfig

func DefaultMDLoaderSetupConfig() *MDLoaderSetupConfig

DefaultMDLoaderSetupConfig generates a default MDLoaderSetupConfig.

type MDLoaderSetupOption

type MDLoaderSetupOption func(cfg *MDLoaderSetupConfig)

MDLoaderSetupOption is the option type for setting up a MDLoaderSetupConfig.

func ReturnPartialResultOnError

func ReturnPartialResultOnError(supportPartialResult bool) MDLoaderSetupOption

ReturnPartialResultOnError generates an option that controls whether return the partial scanned result on error when setting up a MDLoader.

func WithFileIterator

func WithFileIterator(fileIter FileIterator) MDLoaderSetupOption

WithFileIterator generates an option that specifies the file iteration policy.

func WithMaxScanFiles

func WithMaxScanFiles(maxScanFiles int) MDLoaderSetupOption

WithMaxScanFiles generates an option that limits the max scan files when setting up a MDLoader.

type MDTableMeta

type MDTableMeta struct {
	DB         string
	Name       string
	SchemaFile FileInfo
	DataFiles  []FileInfo

	TotalSize  int64
	IndexRatio float64
	// default to true, and if we do precheck, this var is updated using data sampling result, so it's not accurate.
	IsRowOrdered bool
	// contains filtered or unexported fields
}

MDTableMeta contains some parsed metadata for a table in the source by MyDumper Loader.

func NewMDTableMeta

func NewMDTableMeta(charSet string) *MDTableMeta

NewMDTableMeta creates an Mydumper table meta with specified character set.

func (*MDTableMeta) GetSchema

func (m *MDTableMeta) GetSchema(ctx context.Context, store storage.ExternalStorage) (string, error)

GetSchema gets the table-creating SQL for a source table.

type ParquetParser

type ParquetParser struct {
	Reader *preader.ParquetReader
	// contains filtered or unexported fields
}

ParquetParser parses a parquet file for import It implements the Parser interface.

func NewParquetParser

func NewParquetParser(
	ctx context.Context,
	store storage.ExternalStorage,
	r storage.ReadSeekCloser,
	path string,
) (*ParquetParser, error)

NewParquetParser generates a parquet parser.

func (*ParquetParser) Close

func (pp *ParquetParser) Close() error

Close closes the parquet file of the parser. It implements the Parser interface.

func (*ParquetParser) Columns

func (pp *ParquetParser) Columns() []string

Columns returns the _lower-case_ column names corresponding to values in the LastRow.

func (*ParquetParser) LastRow

func (pp *ParquetParser) LastRow() Row

LastRow gets the last row parsed by the parser. It implements the Parser interface.

func (*ParquetParser) Pos

func (pp *ParquetParser) Pos() (pos int64, rowID int64)

Pos returns the currently row number of the parquet file

func (*ParquetParser) ReadRow

func (pp *ParquetParser) ReadRow() error

ReadRow reads a row in the parquet file by the parser. It implements the Parser interface.

func (*ParquetParser) RecycleRow

func (*ParquetParser) RecycleRow(_ Row)

RecycleRow implements the Parser interface.

func (*ParquetParser) ScannedPos

func (pp *ParquetParser) ScannedPos() (int64, error)

ScannedPos implements the Parser interface. For parquet it's parquet file's reader current position.

func (*ParquetParser) SetColumns

func (*ParquetParser) SetColumns(_ []string)

SetColumns set restored column names to parser

func (*ParquetParser) SetLogger

func (pp *ParquetParser) SetLogger(l log.Logger)

SetLogger sets the logger used in the parser. It implements the Parser interface.

func (*ParquetParser) SetPos

func (pp *ParquetParser) SetPos(pos int64, rowID int64) error

SetPos sets the position in a parquet file. It implements the Parser interface.

func (*ParquetParser) SetRowID

func (pp *ParquetParser) SetRowID(rowID int64)

SetRowID sets the rowID in a parquet file when we start a compressed file. It implements the Parser interface.

type Parser

type Parser interface {
	// Pos returns means the position that parser have already handled. It's mainly used for checkpoint.
	// For normal files it's the file offset we handled.
	// For parquet files it's the row count we handled.
	// For compressed files it's the uncompressed file offset we handled.
	// TODO: replace pos with a new structure to specify position offset and rows offset
	Pos() (pos int64, rowID int64)
	SetPos(pos int64, rowID int64) error
	// ScannedPos always returns the current file reader pointer's location
	ScannedPos() (int64, error)
	Close() error
	ReadRow() error
	LastRow() Row
	RecycleRow(row Row)

	// Columns returns the _lower-case_ column names corresponding to values in
	// the LastRow.
	Columns() []string
	// SetColumns set restored column names to parser
	SetColumns([]string)

	SetLogger(log.Logger)

	SetRowID(rowID int64)
}

Parser provides some methods to parse a source data file.

type PooledReader

type PooledReader struct {
	// contains filtered or unexported fields
}

PooledReader is a throttled reader wrapper, where Read() calls have an upper limit of concurrency imposed by the given worker pool.

func MakePooledReader

func MakePooledReader(reader ReadSeekCloser, ioWorkers *worker.Pool) PooledReader

MakePooledReader constructs a new PooledReader.

func (PooledReader) Close

func (pr PooledReader) Close() error

Close implements io.Closer

func (PooledReader) Read

func (pr PooledReader) Read(p []byte) (n int, err error)

Read implements io.Reader

func (PooledReader) ReadFull

func (pr PooledReader) ReadFull(buf []byte) (n int, err error)

ReadFull is same as `io.ReadFull(pr)` with less worker recycling

func (PooledReader) Seek

func (pr PooledReader) Seek(offset int64, whence int) (int64, error)

Seek implements io.Seeker

type ReadSeekCloser

type ReadSeekCloser interface {
	io.Reader
	io.Seeker
	io.Closer
}

ReadSeekCloser = Reader + Seeker + Closer

type RegexRouter

type RegexRouter struct {
	// contains filtered or unexported fields
}

RegexRouter is a `FileRouter` implement that apply specific regex pattern to filepath. if regex pattern match, then each extractors with capture the matched regexp pattern and set value to target field in `RouteResult`

func (*RegexRouter) Route

func (r *RegexRouter) Route(path string) (*RouteResult, error)

Route routes a file path to a source file type.

type RouteResult

type RouteResult struct {
	filter.Table
	Key         string
	Compression Compression
	Type        SourceType
}

RouteResult contains the information for a file routing.

type Row

type Row struct {
	// RowID is the row id of the row.
	// as objects of this struct is reused, this RowID is increased when reading
	// next row.
	RowID  int64
	Row    []types.Datum
	Length int
}

Row is the content of a row.

func (Row) MarshalLogArray

func (row Row) MarshalLogArray(encoder zapcore.ArrayEncoder) error

MarshalLogArray implements the zapcore.ArrayMarshaler interface

type SchemaImporter

type SchemaImporter struct {
	// contains filtered or unexported fields
}

SchemaImporter is used to import schema from dump files.

func NewSchemaImporter

func NewSchemaImporter(logger log.Logger, sqlMode mysql.SQLMode, db *sql.DB, store storage.ExternalStorage, concurrency int) *SchemaImporter

NewSchemaImporter creates a new SchemaImporter instance.

func (*SchemaImporter) Run

func (si *SchemaImporter) Run(ctx context.Context, dbMetas []*MDDatabaseMeta) (err error)

Run imports all schemas from the given database metas.

type SourceFileMeta

type SourceFileMeta struct {
	Path        string
	Type        SourceType
	Compression Compression
	SortKey     string
	// FileSize is the size of the file in the storage.
	FileSize int64
	// WARNING: variables below are not persistent
	ExtendData ExtendColumnData
	// RealSize is same as FileSize if the file is not compressed and not parquet.
	// If the file is compressed, RealSize is the estimated uncompressed size.
	// If the file is parquet, RealSize is the estimated data size after convert
	// to row oriented storage.
	RealSize int64
	Rows     int64 // only for parquet
}

SourceFileMeta contains some analyzed metadata for a source file by MyDumper Loader.

type SourceType

type SourceType int

SourceType specifies the source file types.

const (
	// SourceTypeIgnore means this source file is ignored.
	SourceTypeIgnore SourceType = iota
	// SourceTypeSchemaSchema means this source file is a schema file for the DB.
	SourceTypeSchemaSchema
	// SourceTypeTableSchema means this source file is a schema file for the table.
	SourceTypeTableSchema
	// SourceTypeSQL means this source file is a SQL data file.
	SourceTypeSQL
	// SourceTypeCSV means this source file is a CSV data file.
	SourceTypeCSV
	// SourceTypeParquet means this source file is a parquet data file.
	SourceTypeParquet
	// SourceTypeViewSchema means this source file is a schema file for the view.
	SourceTypeViewSchema
)

func (SourceType) String

func (s SourceType) String() string

type StringReader

type StringReader struct{ *strings.Reader }

StringReader is a wrapper around *strings.Reader with an additional Close() method

func NewStringReader

func NewStringReader(s string) StringReader

NewStringReader constructs a new StringReader

func (StringReader) Close

func (StringReader) Close() error

Close implements io.Closer

type TableRegion

type TableRegion struct {
	EngineID int32

	DB         string
	Table      string
	FileMeta   SourceFileMeta
	ExtendData ExtendColumnData

	Chunk Chunk
}

TableRegion contains information for a table region during import.

func MakeSourceFileRegion

func MakeSourceFileRegion(
	ctx context.Context,
	cfg *DataDivideConfig,
	fi FileInfo,
) ([]*TableRegion, []float64, error)

MakeSourceFileRegion create a new source file region.

func MakeTableRegions

func MakeTableRegions(
	ctx context.Context,
	cfg *DataDivideConfig,
) ([]*TableRegion, error)

MakeTableRegions create a new table region. row-id range of returned TableRegion is increasing monotonically

func SplitLargeCSV

func SplitLargeCSV(
	ctx context.Context,
	cfg *DataDivideConfig,
	dataFile FileInfo,
) (regions []*TableRegion, dataFileSizes []float64, err error)

SplitLargeCSV splits a large csv file into multiple regions, the size of each regions is specified by `config.MaxRegionSize`. Note: We split the file coarsely, thus the format of csv file is needed to be strict. e.g. - CSV file with header is invalid - a complete tuple split into multiple lines is invalid

func (*TableRegion) Offset

func (reg *TableRegion) Offset() int64

Offset gets the offset in the file of this table region.

func (*TableRegion) RowIDMin

func (reg *TableRegion) RowIDMin() int64

RowIDMin returns the minimum row ID of this table region.

func (*TableRegion) Rows

func (reg *TableRegion) Rows() int64

Rows returns the row counts of this table region.

func (*TableRegion) Size

func (reg *TableRegion) Size() int64

Size gets the size of this table region.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL