db

package
v0.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 5, 2022 License: Apache-2.0 Imports: 48 Imported by: 0

Documentation

Index

Constants

View Source
const (
	LockName string = "AOE"
)

Variables

View Source
var (
	ErrClosed            = errors.New("aoe: closed")
	ErrUnsupported       = errors.New("aoe: unsupported")
	ErrNotFound          = errors.New("aoe: notfound")
	ErrUnexpectedWalRole = errors.New("aoe: unexpected wal role setted")
	ErrTimeout           = errors.New("aoe: timeout")
	ErrStaleErr          = errors.New("aoe: stale")
	ErrIdempotence       = metadata.ErrIdempotence
	ErrResourceDeleted   = errors.New("aoe: resource is deleted")
)
View Source
var (
	DefaultFlushInterval     = time.Duration(20) * time.Second
	DefaultNodeFlushInterval = time.Duration(120) * time.Second
)
View Source
var (
	CopyTableFn func(t iface.ITableData, destDir string) error
	CopyFileFn  func(src, dest string) error
)

Functions

func CopyBlockFileToDestDir

func CopyBlockFileToDestDir(file, srcDir, destDir string, idMapFn func(*common.ID) (*common.ID, error)) error

func CopyDataFiles

func CopyDataFiles(tblks, blks, segs []string, srcDir, destDir string, blkMapFn, segMapFn func(*common.ID) (*common.ID, error)) error

func CopySegmentFileToDestDir

func CopySegmentFileToDestDir(file, srcDir, destDir string, idMapFn func(*common.ID) (*common.ID, error)) error

func CopyTBlockFileToDestDir

func CopyTBlockFileToDestDir(file, srcDir, destDir string, idMapFn func(*common.ID) (*common.ID, error)) error

func NewDBSSLoader

func NewDBSSLoader(catalog *metadata.Catalog, replaced *metadata.Database, tables *table.Tables, src string) *ssLoader

func NewDBSSWriter

func NewDBSSWriter(database *metadata.Database, dir string, tables *table.Tables) *ssWriter

func NewReplayHandle

func NewReplayHandle(workDir string, catalog *metadata.Catalog, tables *table.Tables, observer IReplayObserver) *replayHandle

func NewSegmentFilter

func NewSegmentFilter(s *Segment) engine.Filter

func NewSegmentSparseFilter

func NewSegmentSparseFilter(s *Segment) aoe.SparseFilter

func NewSegmentSummarizer

func NewSegmentSummarizer(s *Segment) engine.Summarizer

func ScanMigrationDir

func ScanMigrationDir(path string) (metas []string, tblks []string, blks []string, segs []string, err error)

Types

type Block

type Block struct {
	// Segment this block belongs to
	Host *Segment
	// uint64 representation of block id
	Id uint64
	// string representation of block id
	StrId string
}

Block is a high-level wrapper around the in-memory block type. It provides only the essential interfaces used by the computation layer.

func (*Block) Cardinality

func (blk *Block) Cardinality(_ string) int64

func (*Block) ID

func (blk *Block) ID() string

ID returns the string representation of this block's id.

func (*Block) Prefetch

func (blk *Block) Prefetch(attrs []string)

Prefetch prefetches the given columns of this block. It uses the readahead system call, so if the data is already in the page cache, nothing happens.

func (*Block) Read

func (blk *Block) Read(cs []uint64, attrs []string, compressed []*bytes.Buffer, deCompressed []*bytes.Buffer) (*batch.Batch, error)

Read reads the given columns into memory and builds a pipeline batch, with the given reference count, for use by the computation layer. For memory reuse, two buffer arrays are passed down, and all temporary memory usage (e.g. decompression) takes place in those buffers.

func (*Block) Rows

func (blk *Block) Rows() int64

Rows returns how many rows this block contains currently.

func (*Block) Size

func (blk *Block) Size(attr string) int64

Size returns the memory usage of the given column in this block.

type ColDef

type ColDef = metadata.ColDef

type DB

type DB struct {
	// Working directory of DB
	Dir string
	// Basic options of DB
	Opts *storage.Options
	// FsMgr manages all file related usages including virtual file.
	FsMgr base.IManager
	// IndexBufMgr manages all segment/block indices in memory.
	IndexBufMgr bmgrif.IBufferManager

	// Those two managers not used currently.
	MTBufMgr  bmgrif.IBufferManager
	SSTBufMgr bmgrif.IBufferManager

	// MutationBufMgr is a replacement for MTBufMgr
	MutationBufMgr bb.INodeManager

	Wal wal.ShardAwareWal

	FlushDriver  flusher.Driver
	TimedFlusher wb.IHeartbeater

	// Internal data storage of DB.
	Store struct {
		Mu         *sync.RWMutex
		Catalog    *metadata.Catalog
		DataTables *table.Tables
	}

	DataDir  *os.File
	DBLocker io.Closer

	// Scheduler schedules all the events happening like flush segment, drop table, etc.
	Scheduler sched.Scheduler

	Closed  *atomic.Value
	ClosedC chan struct{}
}

func Open

func Open(dirname string, opts *storage.Options) (db *DB, err error)

func OpenWithWalBroker

func OpenWithWalBroker(dirname string, opts *storage.Options) (db *DB, err error)

func (*DB) AbortTxn

func (d *DB) AbortTxn(txn *TxnCtx) error

func (*DB) ApplySnapshot

func (d *DB) ApplySnapshot(dbName string, path string) error

ApplySnapshot applies a snapshot of the shard stored in `path` to the engine atomically.

func (*DB) Close

func (d *DB) Close() error

func (*DB) CommitTxn

func (d *DB) CommitTxn(txn *TxnCtx) error

func (*DB) CreateDatabaseInTxn

func (d *DB) CreateDatabaseInTxn(txn *TxnCtx, name string) (*metadata.Database, error)

func (*DB) CreateSnapshot

func (d *DB) CreateSnapshot(dbName string, path string, forcesync bool) (uint64, error)

CreateSnapshot relies on the premise that all mutation requests of a database are single-threaded.

func (*DB) CreateTableInTxn

func (d *DB) CreateTableInTxn(txn *TxnCtx, dbName string, schema *TableSchema, indice *IndexSchema) (*metadata.Table, error)

func (*DB) DatabaseNames

func (d *DB) DatabaseNames() []string

func (*DB) DoAppend

func (d *DB) DoAppend(meta *metadata.Table, data *batch.Batch, index *shard.SliceIndex) error

func (*DB) DoCreateSnapshot

func (d *DB) DoCreateSnapshot(database *metadata.Database, path string, forcesync bool) (uint64, error)

func (*DB) DoFlushDatabase

func (d *DB) DoFlushDatabase(meta *metadata.Database) error

func (*DB) DoFlushTable

func (d *DB) DoFlushTable(meta *metadata.Table) error

func (*DB) FlushDatabase

func (d *DB) FlushDatabase(dbName string) error

func (*DB) FlushTable

func (d *DB) FlushTable(dbName, tableName string) error

func (*DB) ForceCompactCatalog

func (d *DB) ForceCompactCatalog() error

func (*DB) GetDBCheckpointId

func (d *DB) GetDBCheckpointId(dbName string) uint64

func (*DB) GetSegmentIds

func (d *DB) GetSegmentIds(dbName string, tableName string) (ids dbi.IDS)

func (*DB) GetShardCheckpointId

func (d *DB) GetShardCheckpointId(shardId uint64) uint64

func (*DB) GetSnapshot

func (d *DB) GetSnapshot(ctx *dbi.GetSnapshotCtx) (*handle.Snapshot, error)

func (*DB) GetTableData

func (d *DB) GetTableData(meta *metadata.Table) (tiface.ITableData, error)

func (*DB) GetTempDir

func (d *DB) GetTempDir() string

func (*DB) IsClosed

func (d *DB) IsClosed() bool

func (*DB) MakeMutationHandle

func (d *DB) MakeMutationHandle(meta *metadata.Table) (iface.MutationHandle, error)

func (*DB) ScheduleGCDatabase

func (d *DB) ScheduleGCDatabase(database *metadata.Database)

func (*DB) ScheduleGCTable

func (d *DB) ScheduleGCTable(meta *metadata.Table)

func (*DB) SpliteDatabaseCheck

func (d *DB) SpliteDatabaseCheck(dbName string, size uint64) (coarseSize uint64, coarseCount uint64, keys [][]byte, ctx []byte, err error)

func (*DB) StartTxn

func (d *DB) StartTxn(index *metadata.LogIndex) *TxnCtx

FIXME: StartTxn should not accept a log index. For create database, the index is not confirmed until the end.

func (*DB) TableIDs

func (d *DB) TableIDs(dbName string) (ids []uint64, err error)

func (*DB) TableIdempotenceCheckAndIndexRewrite

func (d *DB) TableIdempotenceCheckAndIndexRewrite(meta *metadata.Table, index *LogIndex) (*LogIndex, error)

func (*DB) TableNames

func (d *DB) TableNames(dbName string) (names []string)

type IReplayObserver

type IReplayObserver interface {
	OnRemove(string)
}

type IndexId

type IndexId = metadata.IndexId

type IndexInfo

type IndexInfo = metadata.IndexInfo

type IndexSchema

type IndexSchema = metadata.IndexSchema

type IndexT

type IndexT = metadata.IndexT

type LogIndex

type LogIndex = metadata.LogIndex

type RenameTableFactory

type RenameTableFactory = metadata.RenameTableFactory

type SSLoader

type SSLoader = metadata.SSLoader

type SSWriter

type SSWriter interface {
	io.Closer
	metadata.SSWriter
	GetIndex() uint64
}

type Segment

type Segment struct {
	Data iface.ISegment
	Ids  *atomic.Value
}

Segment is a high-level wrapper around the in-memory segment type. It provides only the essential interfaces used by the computation layer.

func (*Segment) Block

func (seg *Segment) Block(id string) aoe.Block

Block returns a block with the given block id.

func (*Segment) Blocks

func (seg *Segment) Blocks() []string

Blocks returns the list of block ids in string type.

func (*Segment) Cardinality

func (seg *Segment) Cardinality(_ string) int64

func (*Segment) ID

func (seg *Segment) ID() string

ID returns the string representation of this segment's id.

func (*Segment) NewFilter

func (seg *Segment) NewFilter() engine.Filter

NewFilter generates a Filter for segment.

func (*Segment) NewSparseFilter

func (seg *Segment) NewSparseFilter() aoe.SparseFilter

NewSparseFilter generates a SparseFilter for segment.

func (*Segment) NewSummarizer

func (seg *Segment) NewSummarizer() engine.Summarizer

NewSummarizer generates a Summarizer for segment.

func (*Segment) Rows

func (seg *Segment) Rows() int64

Rows returns how many rows this segment contains currently.

func (*Segment) Size

func (seg *Segment) Size(attr string) int64

Size returns the memory usage of the given column in this segment.

type SegmentFilter

type SegmentFilter struct {
	// contains filtered or unexported fields
}

SegmentFilter provides segment-level & dense interfaces with bitmap support. (e.g. Eq(string, interface{}) (*roaring64.Bitmap, error), where the inputs are a column name and a value, returns a bitmap telling which rows have that value.)

func (*SegmentFilter) Btw

func (f *SegmentFilter) Btw(attr string, minv interface{}, maxv interface{}) (*roaring64.Bitmap, error)

func (*SegmentFilter) Eq

func (f *SegmentFilter) Eq(attr string, val interface{}) (*roaring64.Bitmap, error)

func (*SegmentFilter) Ge

func (f *SegmentFilter) Ge(attr string, val interface{}) (*roaring64.Bitmap, error)

func (*SegmentFilter) Gt

func (f *SegmentFilter) Gt(attr string, val interface{}) (*roaring64.Bitmap, error)

func (*SegmentFilter) Le

func (f *SegmentFilter) Le(attr string, val interface{}) (*roaring64.Bitmap, error)

func (*SegmentFilter) Lt

func (f *SegmentFilter) Lt(attr string, val interface{}) (*roaring64.Bitmap, error)

func (*SegmentFilter) Ne

func (f *SegmentFilter) Ne(attr string, val interface{}) (*roaring64.Bitmap, error)

type SegmentSparseFilter

type SegmentSparseFilter struct {
	// contains filtered or unexported fields
}

SegmentSparseFilter provides segment-level & sparse interfaces with bitmap support. (e.g. Eq(string, interface{}) ([]string, error), where the inputs are a column name and a value, returns a string array telling which blocks *might* contain that value.)

func (*SegmentSparseFilter) Btw

func (f *SegmentSparseFilter) Btw(attr string, minv interface{}, maxv interface{}) ([]string, error)

func (*SegmentSparseFilter) Eq

func (f *SegmentSparseFilter) Eq(attr string, val interface{}) ([]string, error)

func (*SegmentSparseFilter) Ge

func (f *SegmentSparseFilter) Ge(attr string, val interface{}) ([]string, error)

func (*SegmentSparseFilter) Gt

func (f *SegmentSparseFilter) Gt(attr string, val interface{}) ([]string, error)

func (*SegmentSparseFilter) Le

func (f *SegmentSparseFilter) Le(attr string, val interface{}) ([]string, error)

func (*SegmentSparseFilter) Lt

func (f *SegmentSparseFilter) Lt(attr string, val interface{}) ([]string, error)

func (*SegmentSparseFilter) Ne

func (f *SegmentSparseFilter) Ne(attr string, val interface{}) ([]string, error)

type SegmentSummarizer

type SegmentSummarizer struct {
	// contains filtered or unexported fields
}

SegmentSummarizer provides segment-level aggregations with bitmap support. (e.g. Count(string, *roaring.Bitmap) (uint64, error), where the inputs are a column name and a row filter, returns the number of rows selected by the filter. Sum(string, *roaring.Bitmap) (int64, uint64, error), where the inputs are a column name and a row filter, returns the sum of the counted rows, the count of those rows, and an error if any. The others are similar.)

func (*SegmentSummarizer) Count

func (s *SegmentSummarizer) Count(attr string, filter *roaring.Bitmap) (uint64, error)

func (*SegmentSummarizer) Max

func (s *SegmentSummarizer) Max(attr string, filter *roaring.Bitmap) (interface{}, error)

func (*SegmentSummarizer) Min

func (s *SegmentSummarizer) Min(attr string, filter *roaring.Bitmap) (interface{}, error)

func (*SegmentSummarizer) NullCount

func (s *SegmentSummarizer) NullCount(attr string, filter *roaring.Bitmap) (uint64, error)

func (*SegmentSummarizer) Sum

func (s *SegmentSummarizer) Sum(attr string, filter *roaring.Bitmap) (int64, uint64, error)

type Snapshoter

type Snapshoter interface {
	SSWriter
	SSLoader
}

type Splitter

type Splitter struct {
	// contains filtered or unexported fields
}

func NewSplitter

func NewSplitter(database *metadata.Database, newDBNames []string, rename RenameTableFactory,
	keys [][]byte, ctx []byte, index *LogIndex, dbImpl *DB) *Splitter

func (*Splitter) Close

func (splitter *Splitter) Close() error

func (*Splitter) Commit

func (splitter *Splitter) Commit() error

func (*Splitter) Prepare

func (splitter *Splitter) Prepare() error

func (*Splitter) ScheduleEvents

func (splitter *Splitter) ScheduleEvents(d *DB) error

type TableSchema

type TableSchema = metadata.Schema

type TxnCtx

type TxnCtx = metadata.TxnCtx

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL