mydump

package
v0.0.0-...-41b4272 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 15, 2022 License: Apache-2.0, Apache-2.0 Imports: 36 Imported by: 0

Documentation

Index

Constants

View Source
const (
	SchemaSchema = "schema-schema"
	TableSchema  = "table-schema"
	ViewSchema   = "view-schema"
	TypeSQL      = "sql"
	TypeCSV      = "csv"
	TypeParquet  = "parquet"
	TypeIgnore   = "ignore"
)

Variables

View Source
var (
	ErrInsertStatementNotFound = errors.New("insert statement not found")
)

Functions

func AllocateEngineIDs

func AllocateEngineIDs(
	filesRegions []*TableRegion,
	dataFileSizes []float64,
	batchSize float64,
	batchImportRatio float64,
	tableConcurrency float64,
)

func ExportStatement

func ExportStatement(ctx context.Context, store storage.ExternalStorage, sqlFile FileInfo, characterSet string) ([]byte, error)

func IndexAnyByte

func IndexAnyByte(s []byte, as *byteSet) int

IndexAnyByte returns the index of the first occurrence in s of any of the bytes in the set as. It returns -1 if s contains none of the bytes in as.

func OpenParquetReader

func OpenParquetReader(
	ctx context.Context,
	store storage.ExternalStorage,
	path string,
	size int64,
) (source.ParquetFile, error)

func ReadParquetFileRowCount

func ReadParquetFileRowCount(
	ctx context.Context,
	store storage.ExternalStorage,
	r storage.ReadSeekCloser,
	path string,
) (int64, error)

ReadParquetFileRowCount is a special function that fetches the row count of a parquet file quickly.

Types

type CSVParser

type CSVParser struct {
	// contains filtered or unexported fields
}

CSVParser is basically a copy of encoding/csv, but special-cased for MySQL-like input.

func NewCSVParser

func NewCSVParser(
	cfg *config.CSVConfig,
	reader ReadSeekCloser,
	blockBufSize int64,
	ioWorkers *worker.Pool,
	shouldParseHeader bool,
	charsetConvertor *CharsetConvertor,
) (*CSVParser, error)

func (*CSVParser) Close

func (parser *CSVParser) Close() error

func (*CSVParser) Columns

func (parser *CSVParser) Columns() []string

func (*CSVParser) LastRow

func (parser *CSVParser) LastRow() Row

LastRow is the copy of the row parsed by the last call to ReadRow().

func (*CSVParser) Pos

func (parser *CSVParser) Pos() (int64, int64)

Pos returns the current file offset and row ID.

func (*CSVParser) ReadColumns

func (parser *CSVParser) ReadColumns() error

func (*CSVParser) ReadRow

func (parser *CSVParser) ReadRow() error

ReadRow reads a row from the datafile.

func (*CSVParser) ReadUntilTerminator

func (parser *CSVParser) ReadUntilTerminator() (int64, error)

ReadUntilTerminator seeks the file until the terminator token is found, and returns the file offset beyond the terminator. This function is used when dividing a CSV file in strict-format mode.

func (*CSVParser) RecycleRow

func (parser *CSVParser) RecycleRow(row Row)

RecycleRow places the row object back into the allocation pool.

func (*CSVParser) SetColumns

func (parser *CSVParser) SetColumns(columns []string)

func (*CSVParser) SetLogger

func (parser *CSVParser) SetLogger(logger log.Logger)

func (*CSVParser) SetPos

func (parser *CSVParser) SetPos(pos int64, rowID int64) error

SetPos changes the reported position and row ID.

type CharsetConvertor

type CharsetConvertor struct {
	// contains filtered or unexported fields
}

CharsetConvertor is used to convert a character set to utf8mb4 encoding. In Lightning, we mainly use it to do the GB18030/GBK -> UTF8MB4 conversion.

func NewCharsetConvertor

func NewCharsetConvertor(dataCharacterSet, dataInvalidCharReplace string) (*CharsetConvertor, error)

NewCharsetConvertor creates a new CharsetConvertor.

func (*CharsetConvertor) Decode

func (cc *CharsetConvertor) Decode(src string) (string, error)

Decode does the charset conversion work from sourceCharacterSet to utf8mb4. It returns the converted string, which may be shorter or longer than the original string `src`. TODO: maybe use a generic type later to make Decode/Encode accept both []byte and string.

func (*CharsetConvertor) Encode

func (cc *CharsetConvertor) Encode(src string) (string, error)

Encode will encode the data from utf8mb4 to sourceCharacterSet.

type Chunk

type Chunk struct {
	Offset       int64
	EndOffset    int64
	PrevRowIDMax int64
	RowIDMax     int64
	Columns      []string
}

Chunk represents a portion of the data file.

func ReadChunks

func ReadChunks(parser Parser, minSize int64) ([]Chunk, error)

ReadChunks parses the entire file and splits it into continuous chunks of size >= minSize.

type ChunkParser

type ChunkParser struct {
	// contains filtered or unexported fields
}

ChunkParser is a parser of the data files (the file containing only INSERT statements).

func NewChunkParser

func NewChunkParser(
	sqlMode mysql.SQLMode,
	reader ReadSeekCloser,
	blockBufSize int64,
	ioWorkers *worker.Pool,
) *ChunkParser

NewChunkParser creates a new parser which can read chunks out of a file.

func (*ChunkParser) Close

func (parser *ChunkParser) Close() error

func (*ChunkParser) Columns

func (parser *ChunkParser) Columns() []string

func (*ChunkParser) LastRow

func (parser *ChunkParser) LastRow() Row

LastRow is the copy of the row parsed by the last call to ReadRow().

func (*ChunkParser) Pos

func (parser *ChunkParser) Pos() (int64, int64)

Pos returns the current file offset and row ID.

func (*ChunkParser) ReadRow

func (parser *ChunkParser) ReadRow() error

ReadRow reads a row from the datafile.

func (*ChunkParser) RecycleRow

func (parser *ChunkParser) RecycleRow(row Row)

RecycleRow places the row object back into the allocation pool.

func (*ChunkParser) SetColumns

func (parser *ChunkParser) SetColumns(columns []string)

func (*ChunkParser) SetLogger

func (parser *ChunkParser) SetLogger(logger log.Logger)

func (*ChunkParser) SetPos

func (parser *ChunkParser) SetPos(pos int64, rowID int64) error

SetPos changes the reported position and row ID.

type Compression

type Compression int
const (
	CompressionNone Compression = iota
	CompressionGZ
	CompressionLZ4
	CompressionZStd
	CompressionXZ
)

type FileInfo

type FileInfo struct {
	TableName filter.Table
	FileMeta  SourceFileMeta
}

type FileRouter

type FileRouter interface {
	// Route applies the rule to path. It returns nil if path doesn't match the route rule;
	// it returns an error if path matches the route rule but the captured value for a field is invalid.
	Route(path string) (*RouteResult, error)
}

// RouteRule is a rule to route file path to target schema/table

func NewFileRouter

func NewFileRouter(cfg []*config.FileRouteRule) (FileRouter, error)

type MDDatabaseMeta

type MDDatabaseMeta struct {
	Name       string
	SchemaFile string
	Tables     []*MDTableMeta
	Views      []*MDTableMeta
	// contains filtered or unexported fields
}

type MDLoader

type MDLoader struct {
	// contains filtered or unexported fields
}

MDLoader is the Mydumper file loader.

func NewMyDumpLoader

func NewMyDumpLoader(ctx context.Context, cfg *config.Config) (*MDLoader, error)

func NewMyDumpLoaderWithStore

func NewMyDumpLoaderWithStore(ctx context.Context, cfg *config.Config, store storage.ExternalStorage) (*MDLoader, error)

func (*MDLoader) GetDatabases

func (l *MDLoader) GetDatabases() []*MDDatabaseMeta

func (*MDLoader) GetStore

func (l *MDLoader) GetStore() storage.ExternalStorage

type MDTableMeta

type MDTableMeta struct {
	DB         string
	Name       string
	SchemaFile FileInfo
	DataFiles  []FileInfo

	TotalSize    int64
	IndexRatio   float64
	IsRowOrdered bool
	// contains filtered or unexported fields
}

func (*MDTableMeta) GetSchema

func (m *MDTableMeta) GetSchema(ctx context.Context, store storage.ExternalStorage) (string, error)

type ParquetParser

type ParquetParser struct {
	Reader *preader.ParquetReader
	// contains filtered or unexported fields
}

func NewParquetParser

func NewParquetParser(
	ctx context.Context,
	store storage.ExternalStorage,
	r storage.ReadSeekCloser,
	path string,
) (*ParquetParser, error)

func (*ParquetParser) Close

func (pp *ParquetParser) Close() error

func (*ParquetParser) Columns

func (pp *ParquetParser) Columns() []string

Columns returns the _lower-case_ column names corresponding to values in the LastRow.

func (*ParquetParser) LastRow

func (pp *ParquetParser) LastRow() Row

func (*ParquetParser) Pos

func (pp *ParquetParser) Pos() (pos int64, rowID int64)

Pos returns the current row number of the parquet file.

func (*ParquetParser) ReadRow

func (pp *ParquetParser) ReadRow() error

func (*ParquetParser) RecycleRow

func (pp *ParquetParser) RecycleRow(row Row)

func (*ParquetParser) SetColumns

func (pp *ParquetParser) SetColumns(cols []string)

SetColumns sets the restored column names on the parser.

func (*ParquetParser) SetLogger

func (pp *ParquetParser) SetLogger(l log.Logger)

func (*ParquetParser) SetPos

func (pp *ParquetParser) SetPos(pos int64, rowID int64) error

type Parser

type Parser interface {
	Pos() (pos int64, rowID int64)
	SetPos(pos int64, rowID int64) error
	Close() error
	ReadRow() error
	LastRow() Row
	RecycleRow(row Row)

	// Columns returns the _lower-case_ column names corresponding to values in
	// the LastRow.
	Columns() []string
	// SetColumns sets the restored column names on the parser.
	SetColumns([]string)

	SetLogger(log.Logger)
}

type PooledReader

type PooledReader struct {
	// contains filtered or unexported fields
}

PooledReader is a throttled reader wrapper, where Read() calls have an upper limit of concurrency imposed by the given worker pool.

func MakePooledReader

func MakePooledReader(reader ReadSeekCloser, ioWorkers *worker.Pool) PooledReader

MakePooledReader constructs a new PooledReader.

func (PooledReader) Close

func (pr PooledReader) Close() error

Close implements io.Closer

func (PooledReader) Read

func (pr PooledReader) Read(p []byte) (n int, err error)

Read implements io.Reader

func (PooledReader) ReadFull

func (pr PooledReader) ReadFull(buf []byte) (n int, err error)

ReadFull is the same as `io.ReadFull(pr)`, but with less worker recycling.

func (PooledReader) Seek

func (pr PooledReader) Seek(offset int64, whence int) (int64, error)

Seek implements io.Seeker

type ReadSeekCloser

type ReadSeekCloser interface {
	io.Reader
	io.Seeker
	io.Closer
}

ReadSeekCloser = Reader + Seeker + Closer

type RegexRouter

type RegexRouter struct {
	// contains filtered or unexported fields
}

`RegexRouter` is a `FileRouter` implementation that applies a specific regex pattern to the file path. If the regex pattern matches, each extractor captures the matched regexp pattern and sets the value on the target field in `RouteResult`.

func (*RegexRouter) Route

func (r *RegexRouter) Route(path string) (*RouteResult, error)

type RouteResult

type RouteResult struct {
	filter.Table
	Key         string
	Compression Compression
	Type        SourceType
}

type Row

type Row struct {
	RowID  int64
	Row    []types.Datum
	Length int
}

Row is the content of a row.

func (Row) MarshalLogArray

func (row Row) MarshalLogArray(encoder zapcore.ArrayEncoder) error

MarshalLogArray implements the zapcore.ArrayMarshaler interface

type SourceFileMeta

type SourceFileMeta struct {
	Path        string
	Type        SourceType
	Compression Compression
	SortKey     string
	FileSize    int64
}

type SourceType

type SourceType int
const (
	SourceTypeIgnore SourceType = iota
	SourceTypeSchemaSchema
	SourceTypeTableSchema
	SourceTypeSQL
	SourceTypeCSV
	SourceTypeParquet
	SourceTypeViewSchema
)

func (SourceType) String

func (s SourceType) String() string

type StringReader

type StringReader struct{ *strings.Reader }

StringReader is a wrapper around *strings.Reader with an additional Close() method

func NewStringReader

func NewStringReader(s string) StringReader

NewStringReader constructs a new StringReader

func (StringReader) Close

func (sr StringReader) Close() error

Close implements io.Closer

type TableRegion

type TableRegion struct {
	EngineID int32

	DB       string
	Table    string
	FileMeta SourceFileMeta

	Chunk Chunk
}

func MakeTableRegions

func MakeTableRegions(
	ctx context.Context,
	meta *MDTableMeta,
	columns int,
	cfg *config.Config,
	ioWorkers *worker.Pool,
	store storage.ExternalStorage,
) ([]*TableRegion, error)

func SplitLargeFile

func SplitLargeFile(
	ctx context.Context,
	meta *MDTableMeta,
	cfg *config.Config,
	dataFile FileInfo,
	divisor int64,
	prevRowIdxMax int64,
	ioWorker *worker.Pool,
	store storage.ExternalStorage,
) (prevRowIDMax int64, regions []*TableRegion, dataFileSizes []float64, err error)

SplitLargeFile splits a large CSV file into multiple regions; the size of each region is specified by `config.MaxRegionSize`. Note: we split the file coarsely, so the format of the CSV file needs to be strict. For example: (1) a CSV file with a header is invalid; (2) a complete tuple split across multiple lines is invalid.

func (*TableRegion) Offset

func (reg *TableRegion) Offset() int64

func (*TableRegion) RowIDMin

func (reg *TableRegion) RowIDMin() int64

func (*TableRegion) Rows

func (reg *TableRegion) Rows() int64

func (*TableRegion) Size

func (reg *TableRegion) Size() int64

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL