disttae

package
v0.6.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 29, 2022 License: Apache-2.0 Imports: 49 Imported by: 0

Documentation

Overview

Copyright 2022 Matrix Origin

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Index

Constants

View Source
const (
	INSERT = iota
	DELETE
)
View Source
const (
	MO_DATABASE_ID_NAME_IDX       = 1
	MO_DATABASE_ID_ACCOUNT_IDX    = 2
	MO_DATABASE_LIST_ACCOUNT_IDX  = 1
	MO_TABLE_ID_NAME_IDX          = 1
	MO_TABLE_ID_DATABASE_ID_IDX   = 2
	MO_TABLE_ID_ACCOUNT_IDX       = 3
	MO_TABLE_LIST_DATABASE_ID_IDX = 1
	MO_TABLE_LIST_ACCOUNT_IDX     = 2
	MO_PRIMARY_OFF                = 2
)
View Source
const (
	HASH_VALUE_FUN string = "hash_value"
	MAX_RANGE_SIZE int64  = 200
)
View Source
const (
	GcCycle = 10 * time.Second
)

Variables

This section is empty.

Functions

This section is empty.

Types

type BlockMeta

type BlockMeta struct {
	Rows    int64
	Info    catalog.BlockInfo
	Zonemap [][64]byte
}

func (BlockMeta) Eq

func (a BlockMeta) Eq(b BlockMeta) bool

type Columns

type Columns []column

func (Columns) Len

func (cols Columns) Len() int

func (Columns) Less

func (cols Columns) Less(i, j int) bool

func (Columns) Swap

func (cols Columns) Swap(i, j int)

type ColumnsIndexDef

type ColumnsIndexDef struct {
	Name    memtable.Text
	Columns []int
}

func NewColumnsIndexDef

func NewColumnsIndexDef(name memtable.Text, cols ...int) ColumnsIndexDef

type DB

type DB struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

DB is implementataion of cache

func (*DB) Update

func (db *DB) Update(ctx context.Context, dnList []DNStore, tbl *table, op client.TxnOperator,
	primaryIdx int, databaseId, tableId uint64, ts timestamp.Timestamp) error

type DNStore

type DNStore = logservice.DNStore

type DataRow

type DataRow struct {
	// contains filtered or unexported fields
}

func (*DataRow) Indexes

func (d *DataRow) Indexes() []memtable.Tuple

func (*DataRow) Key

func (d *DataRow) Key() RowID

func (*DataRow) UniqueIndexes

func (d *DataRow) UniqueIndexes() []memtable.Tuple

func (*DataRow) Value

func (d *DataRow) Value() DataValue

type DataValue

type DataValue struct {
	// contains filtered or unexported fields
}

type Engine

type Engine struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func New

func New(
	ctx context.Context,
	mp *mpool.MPool,
	fs fileservice.FileService,
	cli client.TxnClient,
	idGen IDGenerator,
	getClusterDetails engine.GetClusterDetailsFunc,
) *Engine

func (*Engine) Commit

func (e *Engine) Commit(ctx context.Context, op client.TxnOperator) error

func (*Engine) Create

func (e *Engine) Create(ctx context.Context, name string, op client.TxnOperator) error

func (*Engine) Database

func (e *Engine) Database(ctx context.Context, name string,
	op client.TxnOperator) (engine.Database, error)

func (*Engine) Databases

func (e *Engine) Databases(ctx context.Context, op client.TxnOperator) ([]string, error)

func (*Engine) Delete

func (e *Engine) Delete(ctx context.Context, name string, op client.TxnOperator) error

func (*Engine) Hints

func (e *Engine) Hints() (h engine.Hints)

func (*Engine) New

func (e *Engine) New(ctx context.Context, op client.TxnOperator) error

func (*Engine) NewBlockReader

func (e *Engine) NewBlockReader(ctx context.Context, num int, ts timestamp.Timestamp,
	expr *plan.Expr, ranges [][]byte, tblDef *plan.TableDef) ([]engine.Reader, error)

func (*Engine) Nodes

func (e *Engine) Nodes() (engine.Nodes, error)

func (*Engine) Rollback

func (e *Engine) Rollback(ctx context.Context, op client.TxnOperator) error

type Entry

type Entry struct {
	// contains filtered or unexported fields
}

Entry represents a delete/insert

type IDGenerator

type IDGenerator interface {
	AllocateID(ctx context.Context) (uint64, error)
}

type MVCC

type MVCC interface {
	CheckPoint(ctx context.Context, ts timestamp.Timestamp) error
	Insert(ctx context.Context, primaryKeyIndex int, bat *api.Batch, needCheck bool) error
	Delete(ctx context.Context, bat *api.Batch) error
	BlockList(ctx context.Context, ts timestamp.Timestamp,
		blocks []BlockMeta, entries []Entry) ([]BlockMeta, map[uint64][]int)
	// If blocks is empty, it means no merge operation with the files on s3 is required.
	NewReader(ctx context.Context, readerNumber int, index memtable.Tuple, defs []engine.TableDef,
		tableDef *plan.TableDef, skipBlocks map[uint64]uint8, blks []ModifyBlockMeta,
		ts timestamp.Timestamp, fs fileservice.FileService, entries []Entry) ([]engine.Reader, error)
}

mvcc is the core data structure of cn and is used to maintain multiple versions of logtail data for a table's partition

type ModifyBlockMeta

type ModifyBlockMeta struct {
	// contains filtered or unexported fields
}

type Partition

type Partition struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

a partition corresponds to a dn

func NewPartition

func NewPartition(
	columnsIndexDefs []ColumnsIndexDef,
) *Partition

func (*Partition) BlockList

func (p *Partition) BlockList(ctx context.Context, ts timestamp.Timestamp,
	blocks []BlockMeta, entries []Entry) ([]BlockMeta, map[uint64][]int)

func (*Partition) CheckPoint

func (*Partition) CheckPoint(ctx context.Context, ts timestamp.Timestamp) error

func (*Partition) Delete

func (p *Partition) Delete(ctx context.Context, b *api.Batch) error

func (*Partition) DeleteByBlockID

func (p *Partition) DeleteByBlockID(ctx context.Context, ts timestamp.Timestamp, blockID uint64) error

func (*Partition) GC

func (p *Partition) GC(ts timestamp.Timestamp) error

func (*Partition) Get

func (p *Partition) Get(key types.Rowid, ts timestamp.Timestamp) bool

func (*Partition) GetRowsByIndex

func (p *Partition) GetRowsByIndex(ts timestamp.Timestamp, index memtable.Tuple,
	columns []string, deletes map[types.Rowid]uint8) (rows [][]any, err error)

func (*Partition) GetRowsByIndexPrefix

func (p *Partition) GetRowsByIndexPrefix(ts timestamp.Timestamp, prefix memtable.Tuple) (rows []DataValue, err error)

func (*Partition) Insert

func (p *Partition) Insert(ctx context.Context, primaryKeyIndex int,
	b *api.Batch, needCheck bool) error

func (*Partition) IterDeletedRowIDs

func (p *Partition) IterDeletedRowIDs(ctx context.Context, blockIDs []uint64, ts timestamp.Timestamp, fn func(rowID RowID) bool)

func (*Partition) NewReader

func (p *Partition) NewReader(
	ctx context.Context,
	readerNumber int,
	index memtable.Tuple,
	defs []engine.TableDef,
	tableDef *plan.TableDef,
	skipBlocks map[uint64]uint8,
	blks []ModifyBlockMeta,
	ts timestamp.Timestamp,
	fs fileservice.FileService,
	entries []Entry,
) ([]engine.Reader, error)

type PartitionReader

type PartitionReader struct {
	// contains filtered or unexported fields
}

func (*PartitionReader) Close

func (p *PartitionReader) Close() error

func (*PartitionReader) Read

func (p *PartitionReader) Read(colNames []string, expr *plan.Expr, mp *mpool.MPool) (*batch.Batch, error)

type Partitions

type Partitions []*Partition

type RowID

type RowID types.Rowid

func (RowID) Less

func (r RowID) Less(than RowID) bool

type Transaction

type Transaction struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Transaction represents a transaction

func (*Transaction) IncStatementId

func (txn *Transaction) IncStatementId()

use for solving halloween problem

func (*Transaction) ReadOnly

func (txn *Transaction) ReadOnly() bool

detecting whether a transaction is a read-only transaction

func (*Transaction) RegisterFile

func (txn *Transaction) RegisterFile(fileName string)

func (*Transaction) WriteBatch

func (txn *Transaction) WriteBatch(
	typ int,
	databaseId uint64,
	tableId uint64,
	databaseName string,
	tableName string,
	bat *batch.Batch,
	dnStore DNStore,
	primaryIdx int,
) error

Write used to write data to the transaction buffer insert/delete/update all use this api

func (*Transaction) WriteFile

func (txn *Transaction) WriteFile(typ int, databaseId, tableId uint64,
	databaseName, tableName string, fileName string) error

WriteFile used to add a s3 file information to the transaction buffer insert/delete/update all use this api

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL