splitter

package
v0.0.0-...-ddbfbf2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 21, 2025 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
const DefaultChannelBuffer = 1024

DefaultChannelBuffer is the default channel buffer size.

View Source
const (
	// SplitThreshold is the threshold for splitting
	SplitThreshold = 1000
)

Variables

This section is empty.

Functions

func GetSplitFields

func GetSplitFields(table *model.TableInfo, splitFields []string) ([]*model.ColumnInfo, error)

GetSplitFields returns the fields used to split a table into chunks, ordered by primary key, unique key, index, and then plain columns.

Types

type BucketIterator

type BucketIterator struct {
	// contains filtered or unexported fields
}

BucketIterator is the struct that implements the bucket-based chunk iterator.

func NewBucketIterator

func NewBucketIterator(ctx context.Context, progressID string, table *common.TableDiff, dbConn *sql.DB) (*BucketIterator, error)

NewBucketIterator returns a new BucketIterator.

func NewBucketIteratorWithCheckpoint

func NewBucketIteratorWithCheckpoint(
	ctx context.Context,
	progressID string,
	table *common.TableDiff,
	dbConn *sql.DB,
	startRange *RangeInfo,
	bucketSpliterPool *utils.WorkerPool,
) (*BucketIterator, error)

NewBucketIteratorWithCheckpoint returns a new BucketIterator that starts from the checkpointed range startRange.

func (*BucketIterator) Close

func (s *BucketIterator) Close()

Close closes the iterator

func (*BucketIterator) GetIndexID

func (s *BucketIterator) GetIndexID() int64

GetIndexID returns the index ID.

func (*BucketIterator) Next

func (s *BucketIterator) Next() (*chunk.Range, error)

Next returns the next chunk.

type ChunkIterator

type ChunkIterator interface {
	// Next seeks the next chunk; it returns nil when the end is reached.
	Next() (*chunk.Range, error)

	// Close closes the current iterator.
	Close()
}

ChunkIterator lazily generates the next chunk for a single table.

type LimitIterator

type LimitIterator struct {
	// contains filtered or unexported fields
}

LimitIterator is the limit-based chunk iterator.

func NewLimitIterator

func NewLimitIterator(ctx context.Context, progressID string, table *common.TableDiff, dbConn *sql.DB) (*LimitIterator, error)

NewLimitIterator returns a new LimitIterator.

func NewLimitIteratorWithCheckpoint

func NewLimitIteratorWithCheckpoint(
	ctx context.Context,
	progressID string,
	table *common.TableDiff,
	dbConn *sql.DB,
	startRange *RangeInfo,
) (*LimitIterator, error)

NewLimitIteratorWithCheckpoint returns a new LimitIterator that starts from the checkpointed range startRange.

func (*LimitIterator) Close

func (lmt *LimitIterator) Close()

Close closes the iterator.

func (*LimitIterator) GetIndexID

func (lmt *LimitIterator) GetIndexID() int64

GetIndexID returns the current index ID.

func (*LimitIterator) Next

func (lmt *LimitIterator) Next() (*chunk.Range, error)

Next returns the next chunk.

type RandomIterator

type RandomIterator struct {
	// contains filtered or unexported fields
}

RandomIterator is used to iterate over a table in random order.

func NewRandomIterator

func NewRandomIterator(ctx context.Context, progressID string, table *common.TableDiff, dbConn *sql.DB) (*RandomIterator, error)

NewRandomIterator returns a new RandomIterator.

func NewRandomIteratorWithCheckpoint

func NewRandomIteratorWithCheckpoint(
	ctx context.Context,
	progressID string,
	table *common.TableDiff,
	dbConn *sql.DB,
	startRange *RangeInfo,
) (*RandomIterator, error)

NewRandomIteratorWithCheckpoint returns a new RandomIterator that starts from the checkpointed range startRange.

func (*RandomIterator) Close

func (s *RandomIterator) Close()

Close closes the iterator.

func (*RandomIterator) Next

func (s *RandomIterator) Next() (*chunk.Range, error)

Next returns the next chunk.

type RangeInfo

type RangeInfo struct {
	ChunkRange *chunk.Range `json:"chunk-range"`
	// for bucket checkpoint
	IndexID int64 `json:"index-id"`

	ProgressID string `json:"progress-id"`
}

RangeInfo represents the unit of chunk processing. It is the only entry point for checkpointing.

func FromNode

func FromNode(n *checkpoints.Node) *RangeInfo

FromNode converts a checkpoints.Node into a RangeInfo.

func (*RangeInfo) Copy

func (r *RangeInfo) Copy() *RangeInfo

Copy returns a copy of RangeInfo

func (*RangeInfo) GetBucketIndexLeft

func (r *RangeInfo) GetBucketIndexLeft() int

GetBucketIndexLeft returns the BucketIndexLeft

func (*RangeInfo) GetBucketIndexRight

func (r *RangeInfo) GetBucketIndexRight() int

GetBucketIndexRight returns the BucketIndexRight

func (*RangeInfo) GetChunk

func (r *RangeInfo) GetChunk() *chunk.Range

GetChunk returns the chunk

func (*RangeInfo) GetChunkIndex

func (r *RangeInfo) GetChunkIndex() int

GetChunkIndex returns the ChunkIndex

func (*RangeInfo) GetTableIndex

func (r *RangeInfo) GetTableIndex() int

GetTableIndex returns the index of the table among the table diffs. IMPORTANT: the order of the tables must be preserved across checkpoints, so the config info must also be saved to the checkpoint file.

func (*RangeInfo) ToNode

func (r *RangeInfo) ToNode() *checkpoints.Node

ToNode converts the RangeInfo into a checkpoints.Node.

func (*RangeInfo) Update

func (r *RangeInfo) Update(column, lower, upper string, updateLower, updateUpper bool, collation, limits string)

Update updates the current RangeInfo

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL