Documentation ¶
Index ¶
- Constants
- Variables
- func CopyBlockFileToDestDir(file, srcDir, destDir string, idMapFn func(*common.ID) (*common.ID, error)) error
- func CopyDataFiles(tblks, blks, segs []string, srcDir, destDir string, ...) error
- func CopySegmentFileToDestDir(file, srcDir, destDir string, idMapFn func(*common.ID) (*common.ID, error)) error
- func CopyTBlockFileToDestDir(file, srcDir, destDir string, idMapFn func(*common.ID) (*common.ID, error)) error
- func NewDBSSLoader(catalog *metadata.Catalog, replaced *metadata.Database, tables *table.Tables, ...) *ssLoader
- func NewDBSSWriter(database *metadata.Database, dir string, tables *table.Tables) *ssWriter
- func NewReplayHandle(workDir string, catalog *metadata.Catalog, tables *table.Tables, ...) *replayHandle
- func NewSegmentFilter(s *Segment) engine.Filter
- func NewSegmentSparseFilter(s *Segment) aoe.SparseFilter
- func NewSegmentSummarizer(s *Segment) engine.Summarizer
- func ScanMigrationDir(path string) (metas []string, tblks []string, blks []string, segs []string, err error)
- type Block
- type ColDef
- type DB
- func (d *DB) AbortTxn(txn *TxnCtx) error
- func (d *DB) ApplySnapshot(dbName string, path string) error
- func (d *DB) Close() error
- func (d *DB) CommitTxn(txn *TxnCtx) error
- func (d *DB) CreateDatabaseInTxn(txn *TxnCtx, name string) (*metadata.Database, error)
- func (d *DB) CreateSnapshot(dbName string, path string, forcesync bool) (uint64, error)
- func (d *DB) CreateTableInTxn(txn *TxnCtx, dbName string, schema *TableSchema, indice *IndexSchema) (*metadata.Table, error)
- func (d *DB) DatabaseNames() []string
- func (d *DB) DoAppend(meta *metadata.Table, data *batch.Batch, index *shard.SliceIndex) error
- func (d *DB) DoCreateSnapshot(database *metadata.Database, path string, forcesync bool) (uint64, error)
- func (d *DB) DoFlushDatabase(meta *metadata.Database) error
- func (d *DB) DoFlushTable(meta *metadata.Table) error
- func (d *DB) FlushDatabase(dbName string) error
- func (d *DB) FlushTable(dbName, tableName string) error
- func (d *DB) ForceCompactCatalog() error
- func (d *DB) GetDBCheckpointId(dbName string) uint64
- func (d *DB) GetSegmentIds(dbName string, tableName string) (ids dbi.IDS)
- func (d *DB) GetShardCheckpointId(shardId uint64) uint64
- func (d *DB) GetSnapshot(ctx *dbi.GetSnapshotCtx) (*handle.Snapshot, error)
- func (d *DB) GetTableData(meta *metadata.Table) (tiface.ITableData, error)
- func (d *DB) GetTempDir() string
- func (d *DB) IsClosed() bool
- func (d *DB) MakeMutationHandle(meta *metadata.Table) (iface.MutationHandle, error)
- func (d *DB) ScheduleGCDatabase(database *metadata.Database)
- func (d *DB) ScheduleGCTable(meta *metadata.Table)
- func (d *DB) SpliteDatabaseCheck(dbName string, size uint64) (coarseSize uint64, coarseCount uint64, keys [][]byte, ctx []byte, err error)
- func (d *DB) StartTxn(index *metadata.LogIndex) *TxnCtx
- func (d *DB) TableIDs(dbName string) (ids []uint64, err error)
- func (d *DB) TableIdempotenceCheckAndIndexRewrite(meta *metadata.Table, index *LogIndex) (*LogIndex, error)
- func (d *DB) TableNames(dbName string) (names []string)
- type IReplayObserver
- type IndexId
- type IndexInfo
- type IndexSchema
- type IndexT
- type LogIndex
- type RenameTableFactory
- type SSLoader
- type SSWriter
- type Segment
- func (seg *Segment) Block(id string) aoe.Block
- func (seg *Segment) Blocks() []string
- func (seg *Segment) Cardinality(_ string) int64
- func (seg *Segment) ID() string
- func (seg *Segment) NewFilter() engine.Filter
- func (seg *Segment) NewSparseFilter() aoe.SparseFilter
- func (seg *Segment) NewSummarizer() engine.Summarizer
- func (seg *Segment) Rows() int64
- func (seg *Segment) Size(attr string) int64
- type SegmentFilter
- func (f *SegmentFilter) Btw(attr string, minv interface{}, maxv interface{}) (*roaring64.Bitmap, error)
- func (f *SegmentFilter) Eq(attr string, val interface{}) (*roaring64.Bitmap, error)
- func (f *SegmentFilter) Ge(attr string, val interface{}) (*roaring64.Bitmap, error)
- func (f *SegmentFilter) Gt(attr string, val interface{}) (*roaring64.Bitmap, error)
- func (f *SegmentFilter) Le(attr string, val interface{}) (*roaring64.Bitmap, error)
- func (f *SegmentFilter) Lt(attr string, val interface{}) (*roaring64.Bitmap, error)
- func (f *SegmentFilter) Ne(attr string, val interface{}) (*roaring64.Bitmap, error)
- type SegmentSparseFilter
- func (f *SegmentSparseFilter) Btw(attr string, minv interface{}, maxv interface{}) ([]string, error)
- func (f *SegmentSparseFilter) Eq(attr string, val interface{}) ([]string, error)
- func (f *SegmentSparseFilter) Ge(attr string, val interface{}) ([]string, error)
- func (f *SegmentSparseFilter) Gt(attr string, val interface{}) ([]string, error)
- func (f *SegmentSparseFilter) Le(attr string, val interface{}) ([]string, error)
- func (f *SegmentSparseFilter) Lt(attr string, val interface{}) ([]string, error)
- func (f *SegmentSparseFilter) Ne(attr string, val interface{}) ([]string, error)
- type SegmentSummarizer
- func (s *SegmentSummarizer) Count(attr string, filter *roaring.Bitmap) (uint64, error)
- func (s *SegmentSummarizer) Max(attr string, filter *roaring.Bitmap) (interface{}, error)
- func (s *SegmentSummarizer) Min(attr string, filter *roaring.Bitmap) (interface{}, error)
- func (s *SegmentSummarizer) NullCount(attr string, filter *roaring.Bitmap) (uint64, error)
- func (s *SegmentSummarizer) Sum(attr string, filter *roaring.Bitmap) (int64, uint64, error)
- type Snapshoter
- type Splitter
- type TableSchema
- type TxnCtx
Constants ¶
const (
LockName string = "AOE"
)
Variables ¶
var ( ErrClosed = errors.New("aoe: closed") ErrUnsupported = errors.New("aoe: unsupported") ErrNotFound = errors.New("aoe: notfound") ErrUnexpectedWalRole = errors.New("aoe: unexpected wal role setted") ErrTimeout = errors.New("aoe: timeout") ErrStaleErr = errors.New("aoe: stale") ErrIdempotence = metadata.ErrIdempotence ErrResourceDeleted = errors.New("aoe: resource is deleted") )
var ( DefaultFlushInterval = time.Duration(20) * time.Second DefaultNodeFlushInterval = time.Duration(120) * time.Second )
var ( CopyTableFn func(t iface.ITableData, destDir string) error CopyFileFn func(src, dest string) error )
Functions ¶
func CopyBlockFileToDestDir ¶
func CopyDataFiles ¶
func CopyTBlockFileToDestDir ¶
func NewDBSSLoader ¶
func NewDBSSLoader(catalog *metadata.Catalog, replaced *metadata.Database, tables *table.Tables, src string) *ssLoader
func NewDBSSWriter ¶
func NewDBSSWriter(database *metadata.Database, dir string, tables *table.Tables) *ssWriter
func NewReplayHandle ¶
func NewReplayHandle(workDir string, catalog *metadata.Catalog, tables *table.Tables, observer IReplayObserver) *replayHandle
func NewSegmentFilter ¶
func NewSegmentSparseFilter ¶
func NewSegmentSparseFilter(s *Segment) aoe.SparseFilter
func NewSegmentSummarizer ¶
func NewSegmentSummarizer(s *Segment) engine.Summarizer
Types ¶
type Block ¶
type Block struct { // Segment this block belongs to Host *Segment // uint64 representation of block id Id uint64 // string representation of block id StrId string }
Block is a high-level wrapper of the block type in memory. It only provides some essential interfaces used by the computation layer.
func (*Block) Cardinality ¶
func (*Block) Prefetch ¶
Prefetch prefetches the given columns of this block. We use the readahead system call, so if the data is already in the page cache, nothing happens.
func (*Block) Read ¶
func (blk *Block) Read(cs []uint64, attrs []string, compressed []*bytes.Buffer, deCompressed []*bytes.Buffer) (*batch.Batch, error)
Read reads the given columns into memory and builds a pipeline batch with the given reference count, used by the computation layer. For memory reuse, two buffer arrays are passed down, and all temporary memory usage (e.g. decompression) takes place in those buffers.
type DB ¶
type DB struct { // Working directory of DB Dir string // Basic options of DB Opts *storage.Options // FsMgr manages all file related usages including virtual file. FsMgr base.IManager // IndexBufMgr manages all segment/block indices in memory. IndexBufMgr bmgrif.IBufferManager // Those two managers not used currently. MTBufMgr bmgrif.IBufferManager SSTBufMgr bmgrif.IBufferManager // MutationBufMgr is a replacement for MTBufMgr MutationBufMgr bb.INodeManager Wal wal.ShardAwareWal FlushDriver flusher.Driver TimedFlusher wb.IHeartbeater // Internal data storage of DB. Store struct { Mu *sync.RWMutex Catalog *metadata.Catalog DataTables *table.Tables } DataDir *os.File DBLocker io.Closer // Scheduler schedules all the events happening like flush segment, drop table, etc. Scheduler sched.Scheduler Closed *atomic.Value ClosedC chan struct{} }
func OpenWithWalBroker ¶
func (*DB) ApplySnapshot ¶
ApplySnapshot applies a snapshot of the shard stored in `path` to engine atomically.
func (*DB) CreateDatabaseInTxn ¶
func (*DB) CreateSnapshot ¶
This relies on the premise that all mutation requests of a database are single-threaded.
func (*DB) CreateTableInTxn ¶
func (d *DB) CreateTableInTxn(txn *TxnCtx, dbName string, schema *TableSchema, indice *IndexSchema) (*metadata.Table, error)
func (*DB) DatabaseNames ¶
func (*DB) DoCreateSnapshot ¶
func (*DB) DoFlushDatabase ¶
func (*DB) DoFlushTable ¶
func (*DB) FlushDatabase ¶
func (*DB) FlushTable ¶
func (*DB) ForceCompactCatalog ¶
func (*DB) GetDBCheckpointId ¶
func (*DB) GetSegmentIds ¶
func (*DB) GetShardCheckpointId ¶
func (*DB) GetSnapshot ¶
func (*DB) GetTableData ¶
func (d *DB) GetTableData(meta *metadata.Table) (tiface.ITableData, error)
func (*DB) GetTempDir ¶
func (*DB) MakeMutationHandle ¶
func (d *DB) MakeMutationHandle(meta *metadata.Table) (iface.MutationHandle, error)
func (*DB) ScheduleGCDatabase ¶
func (d *DB) ScheduleGCDatabase(database *metadata.Database)
func (*DB) ScheduleGCTable ¶
func (d *DB) ScheduleGCTable(meta *metadata.Table)
func (*DB) SpliteDatabaseCheck ¶
func (*DB) StartTxn ¶
FIXME: start txn should not accept a log index. For create database, the index is not confirmed until the end.
func (*DB) TableIdempotenceCheckAndIndexRewrite ¶
func (*DB) TableNames ¶
type IReplayObserver ¶
type IReplayObserver interface {
OnRemove(string)
}
type IndexSchema ¶
type IndexSchema = metadata.IndexSchema
type RenameTableFactory ¶
type RenameTableFactory = metadata.RenameTableFactory
type Segment ¶
Segment is a high-level wrapper of the segment type in memory. It only provides some essential interfaces used by the computation layer.
func (*Segment) Cardinality ¶
func (*Segment) NewSparseFilter ¶
func (seg *Segment) NewSparseFilter() aoe.SparseFilter
NewSparseFilter generates a SparseFilter for segment.
func (*Segment) NewSummarizer ¶
func (seg *Segment) NewSummarizer() engine.Summarizer
NewSummarizer generates a Summarizer for segment.
type SegmentFilter ¶
type SegmentFilter struct {
// contains filtered or unexported fields
}
SegmentFilter provides segment-level & dense interfaces with bitmap support. (e.g. Eq(string, interface{}) (*roaring64.Bitmap, error) where inputs are column name and value, returns a bitmap telling which rows have the same value.)
func (*SegmentFilter) Btw ¶
func (f *SegmentFilter) Btw(attr string, minv interface{}, maxv interface{}) (*roaring64.Bitmap, error)
func (*SegmentFilter) Eq ¶
func (f *SegmentFilter) Eq(attr string, val interface{}) (*roaring64.Bitmap, error)
func (*SegmentFilter) Ge ¶
func (f *SegmentFilter) Ge(attr string, val interface{}) (*roaring64.Bitmap, error)
func (*SegmentFilter) Gt ¶
func (f *SegmentFilter) Gt(attr string, val interface{}) (*roaring64.Bitmap, error)
func (*SegmentFilter) Le ¶
func (f *SegmentFilter) Le(attr string, val interface{}) (*roaring64.Bitmap, error)
type SegmentSparseFilter ¶
type SegmentSparseFilter struct {
// contains filtered or unexported fields
}
SegmentSparseFilter provides segment-level & sparse interfaces with bitmap support. (e.g. Eq(string, interface{}) ([]string, error) where inputs are column name and value, returns a string array telling which blocks *might* have the same value.)
func (*SegmentSparseFilter) Btw ¶
func (f *SegmentSparseFilter) Btw(attr string, minv interface{}, maxv interface{}) ([]string, error)
func (*SegmentSparseFilter) Eq ¶
func (f *SegmentSparseFilter) Eq(attr string, val interface{}) ([]string, error)
func (*SegmentSparseFilter) Ge ¶
func (f *SegmentSparseFilter) Ge(attr string, val interface{}) ([]string, error)
func (*SegmentSparseFilter) Gt ¶
func (f *SegmentSparseFilter) Gt(attr string, val interface{}) ([]string, error)
func (*SegmentSparseFilter) Le ¶
func (f *SegmentSparseFilter) Le(attr string, val interface{}) ([]string, error)
type SegmentSummarizer ¶
type SegmentSummarizer struct {
// contains filtered or unexported fields
}
SegmentSummarizer provides segment-level aggregations with bitmap support. (e.g. Count(string, *roaring.Bitmap) (uint64, error) where inputs are column name and row filter, returns the number of rows selected by the filter. Sum(string, *roaring.Bitmap) (int64, uint64, error) where inputs are column name and row filter, returns the sum of the selected rows, the count of those rows, and an error, if any. Others are similar.)
func (*SegmentSummarizer) Max ¶
func (s *SegmentSummarizer) Max(attr string, filter *roaring.Bitmap) (interface{}, error)
func (*SegmentSummarizer) Min ¶
func (s *SegmentSummarizer) Min(attr string, filter *roaring.Bitmap) (interface{}, error)
type Snapshoter ¶
type Splitter ¶
type Splitter struct {
// contains filtered or unexported fields
}
func NewSplitter ¶
func (*Splitter) ScheduleEvents ¶
type TableSchema ¶
type TableSchema = metadata.Schema