Documentation ¶
Overview ¶
Package memstore keeps the test factory in this package; placing it elsewhere would create a memstore -> utils -> memstore import cycle.
Index ¶
- Constants
- func CalculateVectorBytes(dataType common.DataType, size int) int
- func CalculateVectorPartyBytes(dataType common.DataType, size int, hasNulls bool, hasCounts bool) int
- func GetPrimaryKeyBytes(primaryKeyValues []common.DataValue, key []byte) error
- func MarshalPrimaryKey(pk PrimaryKey) ([]byte, error)
- func NewHostMemoryManager(memStore *memStoreImpl, totalMemorySize int64) common.HostMemoryManager
- func NewLiveVectorParty(length int, dataType common.DataType, defaultValue common.DataValue, ...) common.LiveVectorParty
- func NewVectorPartyArchiveSerializer(hostMemManager common.HostMemoryManager, diskStore diskstore.DiskStore, ...) common.VectorPartySerializer
- func NewVectorPartySnapshotSerializer(shard *TableShard, columnID, batchID int, batchVersion uint32, seqNum uint32, ...) common.VectorPartySerializer
- func UnpinVectorParties(requestedVPs []common.ArchiveVectorParty)
- func VectorPartyEquals(v1 common.VectorParty, v2 common.VectorParty) bool
- type ArchiveBatch
- func (b *ArchiveBatch) BlockingDelete(columnID int) common.ArchiveVectorParty
- func (b *ArchiveBatch) BuildIndex(sortColumns []int, primaryKeyColumns []int, pk PrimaryKey) error
- func (b *ArchiveBatch) Clone() *ArchiveBatch
- func (b *ArchiveBatch) MarshalJSON() ([]byte, error)
- func (b *ArchiveBatch) RequestVectorParty(columnID int) common.ArchiveVectorParty
- func (b *ArchiveBatch) TryEvict(columnID int) common.ArchiveVectorParty
- func (b *ArchiveBatch) WriteToDisk() error
- type ArchiveJobDetail
- type ArchiveJobDetailMutator
- type ArchiveJobDetailReporter
- type ArchiveStore
- type ArchiveStoreVersion
- type ArchivingJob
- type ArchivingStage
- type BackfillJob
- type BackfillJobDetail
- type BackfillJobDetailMutator
- type BackfillJobDetailReporter
- type BackfillManager
- func (r *BackfillManager) Append(upsertBatch *UpsertBatch, redoFile int64, batchOffset uint32) bool
- func (r *BackfillManager) Destruct()
- func (r *BackfillManager) Done(currentRedoFile int64, currentBatchOffset uint32, ...) error
- func (r *BackfillManager) GetLatestRedoFileAndOffset() (int64, uint32)
- func (r *BackfillManager) MarshalJSON() ([]byte, error)
- func (r *BackfillManager) QualifyToTriggerBackfill() bool
- func (r *BackfillManager) ReadUpsertBatch(index, start, length int, schema *TableSchema) (data [][]interface{}, columnNames []string, err error)
- func (r *BackfillManager) StartBackfill() ([]*UpsertBatch, int64, uint32)
- func (r *BackfillManager) WaitForBackfillBufferAvailability()
- type BackfillStage
- type Batch
- func (b *Batch) Equals(other *Batch) bool
- func (b *Batch) GetDataValue(row, columnID int) common.DataValue
- func (b *Batch) GetDataValueWithDefault(row, columnID int, defaultValue common.DataValue) common.DataValue
- func (b *Batch) GetVectorParty(columnID int) common.VectorParty
- func (b *Batch) ReplaceVectorParty(columnID int, vp common.VectorParty)
- func (b *Batch) SafeDestruct()
- type BatchReader
- type BatchStatsReporter
- type CuckooIndex
- func (c *CuckooIndex) AllocatedBytes() uint
- func (c *CuckooIndex) Capacity() uint
- func (c *CuckooIndex) Delete(key Key)
- func (c *CuckooIndex) Destruct()
- func (c *CuckooIndex) Find(key Key) (RecordID, bool)
- func (c *CuckooIndex) FindOrInsert(key Key, value RecordID, eventTime uint32) (existingFound bool, recordID RecordID, err error)
- func (c *CuckooIndex) GetEventTimeCutoff() uint32
- func (c *CuckooIndex) LockForTransfer() PrimaryKeyData
- func (c *CuckooIndex) Size() uint
- func (c *CuckooIndex) UnlockAfterTransfer()
- func (c *CuckooIndex) Update(key Key, value RecordID) bool
- func (c *CuckooIndex) UpdateEventTimeCutoff(cutoff uint32)
- type EnumDict
- type Job
- type JobDetail
- type JobStatus
- type Key
- type LiveBatch
- type LiveStore
- func (s *LiveStore) AdvanceLastReadRecord()
- func (s *LiveStore) AdvanceNextWriteRecord() RecordID
- func (s *LiveStore) Destruct()
- func (s *LiveStore) GetBatchForRead(id int32) *LiveBatch
- func (s *LiveStore) GetBatchForWrite(id int32) *LiveBatch
- func (s *LiveStore) GetBatchIDs() (batchIDs []int32, numRecordsInLastBatch int)
- func (s *LiveStore) GetMemoryUsageForColumn(valueType common.DataType, columnID int) int
- func (s *LiveStore) LookupKey(keyStrs []string) (RecordID, bool)
- func (s *LiveStore) MarshalJSON() ([]byte, error)
- func (s *LiveStore) PurgeBatch(id int32)
- func (s *LiveStore) PurgeBatches(ids []int32)
- type MemStore
- type PrimaryKey
- type PrimaryKeyData
- type PurgeJob
- type PurgeJobDetail
- type PurgeJobDetailMutator
- type PurgeJobDetailReporter
- type PurgeManager
- type PurgeStage
- type RecordID
- type RedoLogBrowser
- type RedoLogManager
- func (r *RedoLogManager) Close()
- func (r *RedoLogManager) MarshalJSON() ([]byte, error)
- func (r *RedoLogManager) NextUpsertBatch() func() (*UpsertBatch, int64, uint32)
- func (r *RedoLogManager) PurgeRedologFileAndData(cutoff uint32, redoFileCheckpointed int64, batchOffset uint32) error
- func (r *RedoLogManager) UpdateBatchCount(redoFile int64) uint32
- func (r *RedoLogManager) UpdateMaxEventTime(eventTime uint32, redoFile int64)
- func (r *RedoLogManager) WriteUpsertBatch(upsertBatch *UpsertBatch) (int64, uint32)
- type Scheduler
- type SnapshotJob
- type SnapshotJobDetail
- type SnapshotJobDetailMutator
- type SnapshotJobDetailReporter
- type SnapshotManager
- func (s *SnapshotManager) ApplyUpsertBatch(redoFile int64, offset uint32, numMutations int, currentRecord RecordID)
- func (s *SnapshotManager) Done(currentRedoFile int64, currentBatchOffset uint32, lastNumMutations int, ...) error
- func (s *SnapshotManager) GetLastSnapshotInfo() (int64, uint32, time.Time, RecordID)
- func (s *SnapshotManager) MarshalJSON() ([]byte, error)
- func (s *SnapshotManager) QualifyForSnapshot() bool
- func (s *SnapshotManager) SetLastSnapshotInfo(redoLogFile int64, offset uint32, record RecordID)
- func (s *SnapshotManager) StartSnapshot() (int64, uint32, int, RecordID)
- type SnapshotStage
- type TableSchema
- func (t *TableSchema) GetArchivingSortColumns() []int
- func (t *TableSchema) GetColumnDeletions() []bool
- func (t *TableSchema) GetPrimaryKeyColumns() []int
- func (t *TableSchema) GetValueTypeByColumn() []memCom.DataType
- func (t *TableSchema) MarshalJSON() ([]byte, error)
- func (t *TableSchema) SetDefaultValue(columnID int)
- func (t *TableSchema) SetTable(table *metaCom.Table)
- type TableShard
- func (shard *TableShard) ApplyUpsertBatch(upsertBatch *UpsertBatch, redoLogFile int64, offset uint32, ...) (bool, error)
- func (shard *TableShard) DeleteColumn(columnID int) error
- func (shard *TableShard) Destruct()
- func (shard *TableShard) LoadMetaData()
- func (shard *TableShard) LoadSnapshot() error
- func (shard *TableShard) NewRedoLogBrowser() RedoLogBrowser
- func (shard *TableShard) PreloadColumn(columnID int, startDay int, endDay int)
- func (shard *TableShard) ReplayRedoLogs()
- type TableShardMemoryUsage
- type TestFactoryT
- func (t TestFactoryT) NewMockMemStore() *memStoreImpl
- func (t TestFactoryT) ReadArchiveBatch(name string) (*Batch, error)
- func (t TestFactoryT) ReadArchiveVectorParty(name string, locker sync.Locker) (*archiveVectorParty, error)
- func (t TestFactoryT) ReadBatch(name string) (*Batch, error)
- func (t TestFactoryT) ReadLiveBatch(name string) (*Batch, error)
- func (t TestFactoryT) ReadLiveVectorParty(name string) (*cLiveVectorParty, error)
- func (t TestFactoryT) ReadUpsertBatch(name string) (*UpsertBatch, error)
- func (t TestFactoryT) ReadVector(name string) (*Vector, error)
- func (t TestFactoryT) ReadVectorParty(name string) (*cVectorParty, error)
- type TransferableVectorParty
- type UpsertBatch
- func (u *UpsertBatch) ExtractBackfillBatch(backfillRows []int) *UpsertBatch
- func (u *UpsertBatch) GetBool(row int, col int) (bool, bool, error)
- func (u *UpsertBatch) GetBuffer() []byte
- func (u *UpsertBatch) GetColumnID(col int) (int, error)
- func (u *UpsertBatch) GetColumnIndex(columnID int) (int, error)
- func (u *UpsertBatch) GetColumnNames(schema *TableSchema) ([]string, error)
- func (u *UpsertBatch) GetColumnType(col int) (common.DataType, error)
- func (u *UpsertBatch) GetDataValue(row, col int) (common.DataValue, error)
- func (u *UpsertBatch) GetEventColumnIndex() int
- func (u *UpsertBatch) GetPrimaryKeyBytes(row int, primaryKeyCols []int, key []byte) error
- func (u *UpsertBatch) GetPrimaryKeyCols(primaryKeyColumnIDs []int) ([]int, error)
- func (u *UpsertBatch) GetValue(row int, col int) (unsafe.Pointer, bool, error)
- func (u *UpsertBatch) ReadData(start int, length int) ([][]interface{}, error)
- type Vector
- func (v *Vector) Buffer() unsafe.Pointer
- func (v *Vector) CheckAllValid() bool
- func (v *Vector) GetBool(index int) bool
- func (v *Vector) GetSliceBytesAligned(lowerBound int, upperBound int) (buffer unsafe.Pointer, startIndex int, bytes int)
- func (v *Vector) GetValue(index int) unsafe.Pointer
- func (v *Vector) LowerBound(first int, last int, value unsafe.Pointer) int
- func (v *Vector) SafeDestruct()
- func (v *Vector) SetAllValid()
- func (v *Vector) SetBool(index int, value bool)
- func (v *Vector) SetValue(index int, data unsafe.Pointer)
- func (v *Vector) UpperBound(first int, last int, value unsafe.Pointer) int
Constants ¶
const (
    // IgnoreCount skips setting value counts.
    IgnoreCount common.ValueCountsUpdateMode = iota
    // IncrementCount only increments the count.
    IncrementCount
    // CheckExistingCount also checks the existing count.
    CheckExistingCount
)
const BaseBatchID = int32(math.MinInt32)
BaseBatchID is the starting id of all batches.
const UpsertHeader uint32 = 0xADDAFEED
UpsertHeader is the magic header written into the beginning of each redo log file.
const VectorPartyHeader uint32 = 0xFADEFACE
VectorPartyHeader is the magic header written into the beginning of each vector party file.
Variables ¶
This section is empty.
Functions ¶
func CalculateVectorBytes ¶
CalculateVectorBytes calculates bytes the vector will occupy given data type and size without actual allocation.
func CalculateVectorPartyBytes ¶
func CalculateVectorPartyBytes(dataType common.DataType, size int, hasNulls bool, hasCounts bool) int
CalculateVectorPartyBytes calculates the number of bytes the vector party will occupy. Note: data types backed by Go memory report usage only when a value is actually set, so this function reports 0 for them.
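For example, a rough host-memory estimate for a single archive column; common.Uint32 is assumed here to be one of the DataType constants defined in the common package:

func estimateColumnBytes() int {
    // 1,000,000 rows with both null and count vectors in addition to the value vector.
    return CalculateVectorPartyBytes(common.Uint32, 1000000, true, true)
}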
func GetPrimaryKeyBytes ¶
GetPrimaryKeyBytes returns primary key bytes for a given row.
func MarshalPrimaryKey ¶
func MarshalPrimaryKey(pk PrimaryKey) ([]byte, error)
MarshalPrimaryKey marshals a PrimaryKey into JSON. We cannot define MarshalJSON for PrimaryKey since a pointer cannot be the receiver.
func NewHostMemoryManager ¶
func NewHostMemoryManager(memStore *memStoreImpl, totalMemorySize int64) common.HostMemoryManager
NewHostMemoryManager is used to init a HostMemoryManager.
func NewLiveVectorParty ¶
func NewLiveVectorParty(length int, dataType common.DataType, defaultValue common.DataValue, hostMemoryManager common.HostMemoryManager) common.LiveVectorParty
NewLiveVectorParty creates LiveVectorParty
func NewVectorPartyArchiveSerializer ¶
func NewVectorPartyArchiveSerializer(hostMemManager common.HostMemoryManager, diskStore diskstore.DiskStore, table string, shardID int, columnID int, batchID int, batchVersion uint32, seqNum uint32) common.VectorPartySerializer
NewVectorPartyArchiveSerializer returns a new VectorPartySerializer
func NewVectorPartySnapshotSerializer ¶
func NewVectorPartySnapshotSerializer( shard *TableShard, columnID, batchID int, batchVersion uint32, seqNum uint32, redoLogFile int64, offset uint32) common.VectorPartySerializer
NewVectorPartySnapshotSerializer returns a new VectorPartySerializer
func UnpinVectorParties ¶
func UnpinVectorParties(requestedVPs []common.ArchiveVectorParty)
UnpinVectorParties unpins all vector parties in the slice.
func VectorPartyEquals ¶
func VectorPartyEquals(v1 common.VectorParty, v2 common.VectorParty) bool
VectorPartyEquals compares two vector parties, including the case where either is nil.
Types ¶
type ArchiveBatch ¶
type ArchiveBatch struct {
    Batch

    // Size of the batch (number of rows). Notice that compression changes the
    // length of some columns, but does not change the size of the batch.
    Size int

    // Version for archive batches.
    Version uint32

    // SeqNum denotes the backfill sequence number.
    SeqNum uint32

    // For convenience.
    BatchID int32
    Shard   *TableShard
}
ArchiveBatch represents an archive batch.
func (*ArchiveBatch) BlockingDelete ¶
func (b *ArchiveBatch) BlockingDelete(columnID int) common.ArchiveVectorParty
BlockingDelete blocks until all users are finished with the specified column, and then deletes the column from the batch. Returns the vector party deleted if any.
func (*ArchiveBatch) BuildIndex ¶
func (b *ArchiveBatch) BuildIndex(sortColumns []int, primaryKeyColumns []int, pk PrimaryKey) error
BuildIndex builds an index over the primary key columns of this archive batch and inserts the record IDs into the given primary key.
func (*ArchiveBatch) Clone ¶
func (b *ArchiveBatch) Clone() *ArchiveBatch
Clone returns a copy of current batch including all references to underlying vector parties. Caller is responsible for holding the lock (if necessary).
func (*ArchiveBatch) MarshalJSON ¶
func (b *ArchiveBatch) MarshalJSON() ([]byte, error)
MarshalJSON marshals an ArchiveBatch into JSON.
func (*ArchiveBatch) RequestVectorParty ¶
func (b *ArchiveBatch) RequestVectorParty(columnID int) common.ArchiveVectorParty
RequestVectorParty creates (if needed), pins, and returns the requested vector party. On creation it also asynchronously loads the vector party from disk into memory.
Caller must call vp.WaitForDiskLoad() before using it, and call vp.Release() afterwards.
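A minimal in-package sketch of this protocol, assuming an ArchiveStoreVersion obtained elsewhere and that the returned common.ArchiveVectorParty exposes WaitForDiskLoad and Release as described above:

func readArchiveColumn(version *ArchiveStoreVersion, batchID int32, columnID int) {
    batch := version.RequestBatch(batchID)
    if batch == nil {
        return
    }
    vp := batch.RequestVectorParty(columnID) // creates, pins, and kicks off the disk load if needed
    if vp == nil {
        return
    }
    vp.WaitForDiskLoad() // block until the column is resident in memory
    defer vp.Release()   // unpin so the host memory manager may evict it later
    // ... read values from vp ...
}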
func (*ArchiveBatch) TryEvict ¶
func (b *ArchiveBatch) TryEvict(columnID int) common.ArchiveVectorParty
TryEvict attempts to evict and destruct the specified column from the archive batch. It will fail fast if the column is currently in use so that host memory manager can try evicting other VPs immediately. Returns vector party evicted if succeeded.
func (*ArchiveBatch) WriteToDisk ¶
func (b *ArchiveBatch) WriteToDisk() error
WriteToDisk writes each column of a batch to disk. It happens during the archiving stage for the merged archive batch, so there is no need to lock it.
type ArchiveJobDetail ¶
type ArchiveJobDetail struct {
    JobDetail

    // Current cutoff.
    CurrentCutoff uint32 `json:"currentCutoff"`
    // Stage of the job is running.
    Stage ArchivingStage `json:"stage"`
    // New Cutoff.
    RunningCutoff uint32 `json:"runningCutoff"`
    // Cutoff of last completed archiving job.
    LastCutoff uint32 `json:"lastCutoff"`
}
ArchiveJobDetail represents archiving job status of a table Shard.
type ArchiveJobDetailMutator ¶
type ArchiveJobDetailMutator func(jobDetail *ArchiveJobDetail)
ArchiveJobDetailMutator is the mutator functor to change ArchiveJobDetail.
type ArchiveJobDetailReporter ¶
type ArchiveJobDetailReporter func(key string, mutator ArchiveJobDetailMutator)
ArchiveJobDetailReporter is the functor to apply mutator changes to corresponding JobDetail.
type ArchiveStore ¶
type ArchiveStore struct {
    // The mutex protects the pointer pointing to the current version of archived vector version.
    sync.RWMutex

    PurgeManager *PurgeManager

    // Current version points to the most recent version of vector store version for queries to use.
    CurrentVersion *ArchiveStoreVersion
}
ArchiveStore manages archive store versions. The archive store version evolves to a new version after archiving. Readers should follow the following locking protocol:
archiveStore.Users.Add(1)
// tableShard.ArchiveStore can no longer be accessed directly.
// continue reading from archiveStore
archiveStore.Users.Done()
func NewArchiveStore ¶
func NewArchiveStore(shard *TableShard) ArchiveStore
NewArchiveStore creates a new archive store. The current version is just a placeholder for tests. It will be replaced during recovery.
func (*ArchiveStore) Destruct ¶
func (s *ArchiveStore) Destruct()
Destruct deletes all vectors allocated in C. Caller must detach the Shard first and wait until all users are finished.
func (*ArchiveStore) GetCurrentVersion ¶
func (s *ArchiveStore) GetCurrentVersion() *ArchiveStoreVersion
GetCurrentVersion returns the current SortedVectorStoreVersion and does proper locking. It is used by queries and data browsing. Users need to call version.Users.Done() after their work.
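A read-side sketch, assuming the batch read lock is released with RUnlock (the batch mutex is the embedded sync.RWMutex):

func readFromCurrentVersion(store *ArchiveStore, batchID int, columnID int) {
    version := store.GetCurrentVersion() // pins the version for this user
    defer version.Users.Done()           // release the pin when the read is finished

    batch := version.GetBatchForRead(batchID)
    if batch == nil {
        return
    }
    defer batch.RUnlock() // reader needs to unlock after use
    vp := batch.GetVectorParty(columnID)
    _ = vp // ... read values from the vector party ...
}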
func (*ArchiveStore) MarshalJSON ¶
func (s *ArchiveStore) MarshalJSON() ([]byte, error)
MarshalJSON marshals an ArchiveStore into JSON.
type ArchiveStoreVersion ¶
type ArchiveStoreVersion struct {
    // The mutex
    // protects the Batches map structure and the archiving cutoff field.
    // It does not protect contents within a batch. Before releasing
    // the VectorStore mutex, user should lock the batch level mutex if necessary
    // to ensure proper protection at batch level.
    sync.RWMutex `json:"-"`

    // Wait group used to prevent this ArchiveStore from being evicted.
    Users sync.WaitGroup `json:"-"`

    // Each batch in the slice is identified by BaseBatchID+index.
    // Index out of bound and nil Batch for archive batches indicates that none of
    // the columns have been loaded into memory from disk.
    Batches map[int32]*ArchiveBatch `json:"batches"`

    // The archiving cutoff used for this version of the sorted store.
    ArchivingCutoff uint32 `json:"archivingCutoff"`
    // contains filtered or unexported fields
}
ArchiveStoreVersion stores a version of archive batches of columnar data.
func NewArchiveStoreVersion ¶
func NewArchiveStoreVersion(cutoff uint32, shard *TableShard) *ArchiveStoreVersion
NewArchiveStoreVersion creates a new empty archive store version given cutoff.
func (*ArchiveStoreVersion) GetBatchForRead ¶
func (v *ArchiveStoreVersion) GetBatchForRead(batchID int) *ArchiveBatch
GetBatchForRead returns an archive batch for reads; the reader needs to unlock it after use.
func (*ArchiveStoreVersion) MarshalJSON ¶
func (v *ArchiveStoreVersion) MarshalJSON() ([]byte, error)
MarshalJSON marshals an ArchiveStoreVersion into JSON.
func (*ArchiveStoreVersion) RequestBatch ¶
func (v *ArchiveStoreVersion) RequestBatch(batchID int32) *ArchiveBatch
RequestBatch returns the requested archive batch from the archive store version.
type ArchivingJob ¶
type ArchivingJob struct {
// contains filtered or unexported fields
}
ArchivingJob defines the structure that an archiving job needs.
func (*ArchivingJob) GetIdentifier ¶
func (job *ArchivingJob) GetIdentifier() string
GetIdentifier returns a unique identifier of this job.
func (*ArchivingJob) Run ¶
func (job *ArchivingJob) Run() error
Run starts the archiving process and waits for it to finish.
func (*ArchivingJob) String ¶
func (job *ArchivingJob) String() string
String gives meaningful string representation for this job
type ArchivingStage ¶
type ArchivingStage string
ArchivingStage represents different stages of a running archive job.
const (
    ArchivingCreatePatch ArchivingStage = "create patch"
    ArchivingMerge       ArchivingStage = "merge"
    ArchivingPurge       ArchivingStage = "purge"
    ArchivingComplete    ArchivingStage = "complete"
)
List of ArchivingStages.
type BackfillJob ¶
type BackfillJob struct {
// contains filtered or unexported fields
}
BackfillJob defines the structure that a backfill job needs.
func (*BackfillJob) GetIdentifier ¶
func (job *BackfillJob) GetIdentifier() string
GetIdentifier returns a unique identifier of this job.
func (*BackfillJob) Run ¶
func (job *BackfillJob) Run() error
Run starts the backfill process and waits for it to finish.
func (*BackfillJob) String ¶
func (job *BackfillJob) String() string
String gives meaningful string representation for this job
type BackfillJobDetail ¶
type BackfillJobDetail struct {
    JobDetail

    // Stage of the job is running.
    Stage BackfillStage `json:"stage"`
    // Current redolog file that's being backfilled.
    RedologFile int64 `json:"redologFile"`
    // Batch offset within the RedologFile.
    BatchOffset uint32 `json:"batchOffset"`
}
BackfillJobDetail represents backfill job status of a table Shard.
type BackfillJobDetailMutator ¶
type BackfillJobDetailMutator func(jobDetail *BackfillJobDetail)
BackfillJobDetailMutator is the mutator functor to change BackfillJobDetail.
type BackfillJobDetailReporter ¶
type BackfillJobDetailReporter func(key string, mutator BackfillJobDetailMutator)
BackfillJobDetailReporter is the functor to apply mutator changes to corresponding JobDetail.
type BackfillManager ¶
type BackfillManager struct {
    sync.RWMutex `json:"-"`

    // Name of the table.
    TableName string `json:"-"`

    // The shard id of the table.
    Shard int `json:"-"`

    // queue to hold UpsertBatches to backfill
    UpsertBatches []*UpsertBatch `json:"-"`

    // keep track of the number of records in backfill queue
    NumRecords int `json:"numRecords"`

    // keep track of the size of the buffer that holds batches to be backfilled
    CurrentBufferSize int64 `json:"currentBufferSize"`

    // keep track of the size of the buffer that holds batches being backfilled
    BackfillingBufferSize int64 `json:"backfillingBufferSize"`

    // max buffer size to hold backfill data
    MaxBufferSize int64 `json:"maxBufferSize"`

    // threshold to trigger backfill
    BackfillThresholdInBytes int64 `json:"backfillThresholdInBytes"`

    // keep track of the redo log file of the last batch backfilled
    LastRedoFile int64 `json:"lastRedoFile"`

    // keep track of the offset of the last batch backfilled
    LastBatchOffset uint32 `json:"lastBatchOffset"`

    // keep track of the redo log file of the last batch queued
    CurrentRedoFile int64 `json:"currentRedoFile"`

    // keep track of the offset of the last batch being queued
    CurrentBatchOffset uint32 `json:"currentBatchOffset"`

    AppendCond *sync.Cond `json:"-"`
}
BackfillManager manages the records that need to be put into a backfill queue and merged with sorted batches directly.
func NewBackfillManager ¶
func NewBackfillManager(tableName string, shard int, tableConfig metaCom.TableConfig) *BackfillManager
NewBackfillManager creates a new BackfillManager instance.
func (*BackfillManager) Append ¶
func (r *BackfillManager) Append(upsertBatch *UpsertBatch, redoFile int64, batchOffset uint32) bool
Append appends an upsert batch into the backfill queue. Returns true if buffer limit has been reached and caller may need to wait
func (*BackfillManager) Destruct ¶
func (r *BackfillManager) Destruct()
Destruct sets the Go object references used by the backfill manager to nil to trigger GC earlier.
func (*BackfillManager) Done ¶
func (r *BackfillManager) Done(currentRedoFile int64, currentBatchOffset uint32, metaStore metastore.MetaStore) error
Done updates the backfill progress both in memory and in metastore.
func (*BackfillManager) GetLatestRedoFileAndOffset ¶
func (r *BackfillManager) GetLatestRedoFileAndOffset() (int64, uint32)
GetLatestRedoFileAndOffset returns latest redofile and its batch offset
func (*BackfillManager) MarshalJSON ¶
func (r *BackfillManager) MarshalJSON() ([]byte, error)
MarshalJSON marshals a BackfillManager into json.
func (*BackfillManager) QualifyToTriggerBackfill ¶
func (r *BackfillManager) QualifyToTriggerBackfill() bool
QualifyToTriggerBackfill decides whether it is OK to trigger the size-based backfill process.
func (*BackfillManager) ReadUpsertBatch ¶
func (r *BackfillManager) ReadUpsertBatch(index, start, length int, schema *TableSchema) (data [][]interface{}, columnNames []string, err error)
ReadUpsertBatch reads an upsert batch in the backfill queue; the caller should not lock the schema.
func (*BackfillManager) StartBackfill ¶
func (r *BackfillManager) StartBackfill() ([]*UpsertBatch, int64, uint32)
StartBackfill gets a slice of UpsertBatches from backfill queue and returns the CurrentRedoFile and CurrentBatchOffset.
func (*BackfillManager) WaitForBackfillBufferAvailability ¶
func (r *BackfillManager) WaitForBackfillBufferAvailability()
WaitForBackfillBufferAvailability blocks until backfill buffer is available
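A sketch of how the ingestion path and the backfill job might drive these calls together, assuming the upsert batch with its redo log position and a metastore.MetaStore handle are available in the surrounding code (the actual merge into archive batches is omitted):

func queueAndBackfill(m *BackfillManager, batch *UpsertBatch, redoFile int64, offset uint32, metaStore metastore.MetaStore) error {
    // Ingestion side: queue the batch and back off when the buffer is full.
    if m.Append(batch, redoFile, offset) {
        m.WaitForBackfillBufferAvailability()
    }

    // Backfill job side: drain the queue, merge it into archive batches (omitted),
    // then checkpoint the progress both in memory and in the metastore.
    batches, currentRedoFile, currentOffset := m.StartBackfill()
    _ = batches
    return m.Done(currentRedoFile, currentOffset, metaStore)
}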
type BackfillStage ¶
type BackfillStage string
BackfillStage represents different stages of a running backfill job.
const (
    BackfillCreatePatch BackfillStage = "create patch"
    BackfillApplyPatch  BackfillStage = "apply patch"
    BackfillPurge       BackfillStage = "purge"
    BackfillComplete    BackfillStage = "complete"
)
List of BackfillStages.
type Batch ¶
type Batch struct {
    // Batch mutex is locked in reader mode by queries during the entire transfer
    // to ensure row level consistent read. It is locked in writer mode only for
    // updates from ingestion, and for modifications to the columns slice itself
    // (e.g., adding new columns). Appends will update LastReadBatchID and
    // NumRecordsInLastWriteBatch to make newly added records visible only at the last
    // step, therefore the batch does not need to be locked for appends.
    // For sorted batches this is also locked in writer mode for initiating loading
    // from disk (vector party creation, Loader/Users initialization), and for
    // vector party detaching during eviction.
    sync.RWMutex

    // For live batches, index out of bound and nil VectorParty indicates
    // mode 0 for the corresponding VectorParty.
    // For archive batches, index out of bound and nil VectorParty indicates that
    // the corresponding VectorParty has not been loaded into memory from disk.
    Columns []common.VectorParty
}
Batch represents a sorted or live batch.
func (*Batch) Equals ¶
func (b *Batch) Equals(other *Batch) bool
Equals checks whether two batches are the same. Note that both batches should have all their columns loaded into memory before comparison, so this function should only be called for unit tests.
func (*Batch) GetDataValue ¶
func (b *Batch) GetDataValue(row, columnID int) common.DataValue
GetDataValue reads a value from the underlying columns.
func (*Batch) GetDataValueWithDefault ¶
func (b *Batch) GetDataValueWithDefault(row, columnID int, defaultValue common.DataValue) common.DataValue
GetDataValueWithDefault reads a value from the underlying columns; if it is missing, the passed default value is returned instead.
func (*Batch) GetVectorParty ¶
func (b *Batch) GetVectorParty(columnID int) common.VectorParty
GetVectorParty returns the VectorParty for the specified column from the batch. It requires the batch to be locked for reading.
func (*Batch) ReplaceVectorParty ¶
func (b *Batch) ReplaceVectorParty(columnID int, vp common.VectorParty)
ReplaceVectorParty replaces the VectorParty for the specified column in the archive batch. Existing copies will be destructed. vp can be specified as nil to purge the existing copy. It requires the batch to be locked for writing.
func (*Batch) SafeDestruct ¶
func (b *Batch) SafeDestruct()
SafeDestruct destructs all vector parties of this batch.
type BatchReader ¶
type BatchReader interface {
    GetDataValue(row, columnID int) common.DataValue
    GetDataValueWithDefault(row, columnID int, defaultValue common.DataValue) common.DataValue
}
BatchReader defines the interface to retrieve a DataValue given a row index and column index.
type BatchStatsReporter ¶
type BatchStatsReporter struct {
// contains filtered or unexported fields
}
BatchStatsReporter is used to report batch level stats like row count
func NewBatchStatsReporter ¶
func NewBatchStatsReporter(intervalInSeconds int, memStore MemStore, metaStore metastore.MetaStore) *BatchStatsReporter
NewBatchStatsReporter creates a new BatchStatsReporter instance.
func (*BatchStatsReporter) Run ¶
func (batchStats *BatchStatsReporter) Run()
Run is a ticker function that reports stats periodically.
func (*BatchStatsReporter) Stop ¶
func (batchStats *BatchStatsReporter) Stop()
Stop stops the stats reporter.
type CuckooIndex ¶
type CuckooIndex struct {
// contains filtered or unexported fields
}
CuckooIndex is an implementation of a hash index using the cuckoo hashing algorithm. Lazy expiration is used to invalidate expired items. CuckooIndex is not thread-safe.
func (*CuckooIndex) AllocatedBytes ¶
func (c *CuckooIndex) AllocatedBytes() uint
AllocatedBytes returns the allocated size of primary key in bytes.
func (*CuckooIndex) Capacity ¶
func (c *CuckooIndex) Capacity() uint
Capacity returns how many items current primary key can hold.
func (*CuckooIndex) Delete ¶
func (c *CuckooIndex) Delete(key Key)
Delete deletes an item with the given key.
func (*CuckooIndex) Find ¶
func (c *CuckooIndex) Find(key Key) (RecordID, bool)
Find looks up a record given key
func (*CuckooIndex) FindOrInsert ¶
func (c *CuckooIndex) FindOrInsert(key Key, value RecordID, eventTime uint32) (existingFound bool, recordID RecordID, err error)
FindOrInsert finds the existing key or inserts a new (key, value) pair.
func (*CuckooIndex) GetEventTimeCutoff ¶
func (c *CuckooIndex) GetEventTimeCutoff() uint32
GetEventTimeCutoff returns the cutoff event time.
func (*CuckooIndex) LockForTransfer ¶
func (c *CuckooIndex) LockForTransfer() PrimaryKeyData
LockForTransfer locks primary key for transfer and returns PrimaryKeyData
func (*CuckooIndex) Size ¶
func (c *CuckooIndex) Size() uint
Size returns the current number of items stored in the hash table, including expired items not yet known to the system.
func (*CuckooIndex) UnlockAfterTransfer ¶
func (c *CuckooIndex) UnlockAfterTransfer()
UnlockAfterTransfer releases the transfer lock.
func (*CuckooIndex) Update ¶
func (c *CuckooIndex) Update(key Key, value RecordID) bool
Update updates a key with a new recordID. Return whether key exists in the primary key or not.
func (*CuckooIndex) UpdateEventTimeCutoff ¶
func (c *CuckooIndex) UpdateEventTimeCutoff(cutoff uint32)
UpdateEventTimeCutoff updates eventTimeCutoff
type EnumDict ¶
type EnumDict struct {
    // Either 0x100 for small_enum, or 0x10000 for big_enum.
    Capacity    int            `json:"capacity"`
    Dict        map[string]int `json:"dict"`
    ReverseDict []string       `json:"reverseDict"`
}
EnumDict contains the mappings between enum strings and numbers.
type JobDetail ¶
type JobDetail struct {
    // Status of archiving for current table Shard.
    Status JobStatus `json:"status"`
    // Time of next archiving job.
    NextRun time.Time `json:"nextRun,omitempty"`
    // Time when the last backfill job finishes.
    LastRun time.Time `json:"lastRun,omitempty"`
    // Error of last run if failed.
    LastError error `json:"lastError,omitempty"`
    // Start time of last archiving job.
    LastStartTime time.Time `json:"lastStartTime,omitempty"`
    // Duration of last archiving job.
    LastDuration time.Duration `json:"lastDuration,omitempty"`
    // Number of records processed.
    NumRecords int `json:"numRecords,omitempty"`
    // Number of days affected.
    NumAffectedDays int `json:"numAffectedDays,omitempty"`
    // Total amount of work,
    // ie, archiving merge: number of days,
    // archiving snapshot: number of records.
    Total int `json:"total,omitempty"`
    // Current finished work.
    Current int `json:"current,omitempty"`
    // Duration for waiting for lock.
    LockDuration time.Duration `json:"lockDuration,omitempty"`
}
JobDetail represents common job status of a table Shard.
type LiveBatch ¶
type LiveBatch struct {
    // The common data structure holding column data.
    Batch

    // Capacity of the batch which is decided at the creation time.
    Capacity int

    // maximum of arrival time
    MaxArrivalTime uint32
    // contains filtered or unexported fields
}
LiveBatch represents a live batch.
func (*LiveBatch) GetOrCreateVectorParty ¶
func (b *LiveBatch) GetOrCreateVectorParty(columnID int, locked bool) common.LiveVectorParty
GetOrCreateVectorParty returns LiveVectorParty for the specified column from the live batch. locked specifies whether the batch has been locked. The lock will be left in the same state after the function returns.
func (*LiveBatch) MarshalJSON ¶
MarshalJSON marshals a LiveBatch into json.
type LiveStore ¶
type LiveStore struct {
    sync.RWMutex

    // The batch id to batch map.
    Batches map[int32]*LiveBatch

    // Number of rows to create for new batches.
    BatchSize int

    // The upper bound of records (exclusive) that can be read by queries.
    LastReadRecord RecordID

    // This is the in memory archiving cutoff time high watermark that gets set by the archiving job
    // before each archiving run. Ingestion will not insert/update records that are older than
    // the archiving cutoff watermark.
    ArchivingCutoffHighWatermark uint32

    // Logs.
    RedoLogManager RedoLogManager

    // Manage backfill queue during ingestion.
    BackfillManager *BackfillManager

    // Manage snapshot related stats.
    SnapshotManager *SnapshotManager

    // The writer lock is to guarantee single writer to a Shard at all time. To ensure this, writers
    // (ingestion, archiving etc) need to hold this lock at all times. This lock
    // should be acquired before the VectorStore and Batch locks.
    // TODO: if spinning lock performance is a concern we may need to upgrade this
    // to a designated goroutine with a pc-queue channel.
    WriterLock sync.RWMutex

    // Primary key table of the Shard.
    PrimaryKey PrimaryKey

    // The position of the next record to be used for writing. Only used by the ingester.
    NextWriteRecord RecordID

    // For convenience.
    HostMemoryManager common.HostMemoryManager `json:"-"`
    // contains filtered or unexported fields
}
LiveStore stores live batches of columnar data.
func NewLiveStore ¶
func NewLiveStore(batchSize int, shard *TableShard) *LiveStore
NewLiveStore creates a new live store.
func (*LiveStore) AdvanceLastReadRecord ¶
func (s *LiveStore) AdvanceLastReadRecord()
AdvanceLastReadRecord advances the high watermark of the rows to the next write record.
func (*LiveStore) AdvanceNextWriteRecord ¶
func (s *LiveStore) AdvanceNextWriteRecord() RecordID
AdvanceNextWriteRecord reserves space for a record and returns the next available record position back to the caller.
func (*LiveStore) Destruct ¶
func (s *LiveStore) Destruct()
Destruct deletes all vectors allocated in C. Caller must detach the Shard first and wait until all users are finished.
func (*LiveStore) GetBatchForRead ¶
GetBatchForRead returns and read locks the batch with its ID for reads. Caller must explicitly RUnlock() the returned batch after all reads.
func (*LiveStore) GetBatchForWrite ¶
GetBatchForWrite returns and locks the batch with its ID for writes. Caller must explicitly Unlock() the returned batch after all writes.
func (*LiveStore) GetBatchIDs ¶
GetBatchIDs snapshots the batches and returns the list of batch IDs for reads, together with the number of records in the last batch (batchIDs[len(batchIDs)-1]).
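A query-style sketch over the live store, assuming each batch is released with RUnlock after reading (the batch mutex is the embedded sync.RWMutex):

func scanLiveStore(s *LiveStore, columnID int) {
    batchIDs, numRecordsInLastBatch := s.GetBatchIDs()
    for i, id := range batchIDs {
        batch := s.GetBatchForRead(id)
        if batch == nil {
            continue
        }
        rows := batch.Capacity
        if i == len(batchIDs)-1 {
            rows = numRecordsInLastBatch // only these rows of the last batch are visible
        }
        for row := 0; row < rows; row++ {
            value := batch.GetDataValue(row, columnID)
            _ = value // ... consume the common.DataValue ...
        }
        batch.RUnlock() // caller must explicitly RUnlock after all reads
    }
}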
func (*LiveStore) GetMemoryUsageForColumn ¶
GetMemoryUsageForColumn gets the live store memory usage for the given data type and column.
func (*LiveStore) MarshalJSON ¶
MarshalJSON marshals a LiveStore into json.
func (*LiveStore) PurgeBatch ¶
PurgeBatch purges the specified batch.
func (*LiveStore) PurgeBatches ¶
PurgeBatches purges the specified batches.
type MemStore ¶
type MemStore interface {
    // GetMemoryUsageDetails
    GetMemoryUsageDetails() (map[string]TableShardMemoryUsage, error)
    // GetScheduler returns the scheduler for scheduling archiving and backfill jobs.
    GetScheduler() Scheduler
    // GetTableShard gets the data for a pinned table Shard. Caller needs to unpin after use.
    GetTableShard(table string, shardID int) (*TableShard, error)
    // GetSchema returns schema for a table.
    GetSchema(table string) (*TableSchema, error)
    // GetSchemas returns all table schemas.
    GetSchemas() map[string]*TableSchema
    // FetchSchema fetches schema from metaStore and updates in-memory copy of table schema,
    // and set up watch channels for metaStore schema changes, used for bootstrapping mem store.
    FetchSchema() error
    // InitShards loads/recovers data for shards initially owned by the current instance.
    InitShards(schedulerOff bool)
    // HandleIngestion logs an upsert batch and applies it to the in-memory store.
    HandleIngestion(table string, shardID int, upsertBatch *UpsertBatch) error
    // Archive is the process moving stable records in fact tables from live batches to archive
    // batches.
    Archive(table string, shardID int, cutoff uint32, reporter ArchiveJobDetailReporter) error
    // Backfill is the process of merging records with event time older than cutoff with
    // archive batches.
    Backfill(table string, shardID int, reporter BackfillJobDetailReporter) error
    // Snapshot is the process to write the current content of dimension table live store in memory to disk.
    Snapshot(table string, shardID int, reporter SnapshotJobDetailReporter) error
    // Purge is the process to purge out of retention archive batches.
    Purge(table string, shardID, batchIDStart, batchIDEnd int, reporter PurgeJobDetailReporter) error

    // Provide exclusive access to read/write data protected by MemStore.
    utils.RWLocker
}
MemStore defines the interface for managing multiple table shards in memory. This is for mocking in unit tests
type PrimaryKey ¶
type PrimaryKey interface {
    // Find looks up a value given a key.
    Find(key Key) (RecordID, bool)
    // FindOrInsert finds the existing key or inserts a new (key, value) pair.
    FindOrInsert(key Key, value RecordID, eventTime uint32) (existingFound bool, recordID RecordID, err error)
    // Update updates a key with a new recordID. Returns whether the key exists in the primary key or not.
    Update(key Key, value RecordID) bool
    // Delete deletes a key if it exists.
    Delete(key Key)
    // UpdateEventTimeCutoff updates the cutoff event time.
    UpdateEventTimeCutoff(eventTimeCutoff uint32)
    // GetEventTimeCutoff returns the cutoff event time.
    GetEventTimeCutoff() uint32
    // LockForTransfer locks the primary key for transferring data;
    // the caller should unlock by calling UnlockAfterTransfer when done.
    LockForTransfer() PrimaryKeyData
    // UnlockAfterTransfer unlocks the primary key.
    UnlockAfterTransfer()
    // Destruct cleans up all existing resources used by the primary key.
    Destruct()
    // Size returns the current number of items.
    Size() uint
    // Capacity returns how many items the current primary key can hold.
    Capacity() uint
    // AllocatedBytes returns the size of the primary key in bytes.
    AllocatedBytes() uint
}
PrimaryKey is an interface for primary key index
func NewPrimaryKey ¶
func NewPrimaryKey(keyBytes int, hasEventTime bool, initNumBuckets int, hostMemoryManager common.HostMemoryManager) PrimaryKey
NewPrimaryKey creates a primary key data structure. A usage sketch follows the parameter list. Params:
- keyBytes, number of bytes of key
- hasEventTime determines whether the primary key should record event time for expiration
- initNumBuckets determines the starting number of buckets, setting to 0 to use default
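A usage sketch, assuming Key is convertible from raw key bytes and that a host memory manager and record position are supplied by the caller:

func indexRecord(hostMemoryManager common.HostMemoryManager, rawKey []byte, recordID RecordID, eventTime uint32) (RecordID, error) {
    // 16-byte keys, event-time expiration enabled, default bucket count.
    // Call pk.Destruct() when the index is no longer needed.
    pk := NewPrimaryKey(16, true, 0, hostMemoryManager)

    existingFound, existing, err := pk.FindOrInsert(Key(rawKey), recordID, eventTime)
    if err != nil {
        return RecordID{}, err
    }
    if existingFound {
        return existing, nil // the key was already indexed
    }
    return recordID, nil
}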
type PrimaryKeyData ¶
type PrimaryKeyData struct {
    Data       unsafe.Pointer
    NumBytes   int
    Seeds      [numHashes]uint32
    KeyBytes   int
    NumBuckets int
}
PrimaryKeyData holds the data for transferring to GPU for query purposes
type PurgeJob ¶
type PurgeJob struct {
// contains filtered or unexported fields
}
PurgeJob defines the structure that a purge job needs.
func (*PurgeJob) GetIdentifier ¶
GetIdentifier returns a unique identifier of this job.
type PurgeJobDetail ¶
type PurgeJobDetail struct {
    JobDetail

    // Stage of the job is running.
    Stage PurgeStage `json:"stage"`
    // Number of batches purged.
    NumBatches   int `json:"numBatches"`
    BatchIDStart int `json:"batchIDStart"`
    BatchIDEnd   int `json:"batchIDEnd"`
}
PurgeJobDetail represents purge job status of a table shard.
type PurgeJobDetailMutator ¶
type PurgeJobDetailMutator func(jobDetail *PurgeJobDetail)
PurgeJobDetailMutator is the mutator functor to change PurgeJobDetail.
type PurgeJobDetailReporter ¶
type PurgeJobDetailReporter func(key string, mutator PurgeJobDetailMutator)
PurgeJobDetailReporter is the functor to apply mutator changes to corresponding JobDetail.
type PurgeManager ¶
type PurgeManager struct {
    sync.RWMutex `json:"-"`

    // Job Trigger Condition related fields.

    // Last purge time.
    LastPurgeTime time.Time     `json:"lastPurgeTime"`
    PurgeInterval time.Duration `json:"purgeInterval"`
    // contains filtered or unexported fields
}
PurgeManager manages the purge related stats and progress.
func NewPurgeManager ¶
func NewPurgeManager(shard *TableShard) *PurgeManager
NewPurgeManager creates a new PurgeManager instance.
func (*PurgeManager) QualifyForPurge ¶
func (p *PurgeManager) QualifyForPurge() bool
QualifyForPurge tells whether we can trigger a purge job.
type PurgeStage ¶
type PurgeStage string
PurgeStage represents different stages of a running purge job.
const (
    PurgeMetaData PurgeStage = "purge metadata"
    PurgeDataFile PurgeStage = "purge data file"
    PurgeMemory   PurgeStage = "purge memory"
    PurgeComplete PurgeStage = "complete"
)
List of purge stages
type RecordID ¶
RecordID represents a record location: BatchID identifies the inflated vector, and the offset locates the record inside that vector.
type RedoLogBrowser ¶
type RedoLogBrowser interface {
    ListLogFiles() ([]int64, error)
    ListUpsertBatch(creationTime int64) ([]int64, error)
    ReadData(creationTime int64, upsertBatchOffset int64, start int, length int) (
        [][]interface{}, []string, int, error)
}
RedoLogBrowser is the interface to list redo log files, upsert batches and read upsert batch data.
type RedoLogManager ¶
type RedoLogManager struct {
    // The lock is to protect MaxEventTimePerFile.
    sync.RWMutex `json:"-"`

    // The time interval of redo file rotations.
    RotationInterval int64 `json:"rotationInterval"`

    // The limit of redo file size to trigger rotations.
    MaxRedoLogSize int64 `json:"maxRedoLogSize"`

    // Current redo log size.
    CurrentRedoLogSize uint32 `json:"currentRedoLogSize"`

    // Size of all redologs.
    TotalRedoLogSize uint `json:"totalRedologSize"`

    // The map with redo log creation time as the key and max event time as the value. Readers
    // need to hold the reader lock in accessing the field.
    MaxEventTimePerFile map[int64]uint32 `json:"maxEventTimePerFile"`

    // redo log creation time -> batch count mapping.
    // Readers need to hold the reader lock in accessing the field.
    BatchCountPerFile map[int64]uint32 `json:"batchCountPerFile"`

    // SizePerFile
    SizePerFile map[int64]uint32 `json:"sizePerFile"`

    // Current file creation time in milliseconds.
    CurrentFileCreationTime int64 `json:"currentFileCreationTime"`
    // contains filtered or unexported fields
}
RedoLogManager manages the redo log file append, rotation, purge. It is used by ingestion, recovery and archiving. Accessor must hold the TableShard.WriterLock to access it.
func NewRedoLogManager ¶
func NewRedoLogManager(rotationInterval int64, maxRedoLogSize int64, diskStore diskstore.DiskStore, tableName string, shard int) RedoLogManager
NewRedoLogManager creates a new RedoLogManager instance.
func (*RedoLogManager) MarshalJSON ¶
func (r *RedoLogManager) MarshalJSON() ([]byte, error)
MarshalJSON marshals a RedoLogManager into json.
func (*RedoLogManager) NextUpsertBatch ¶
func (r *RedoLogManager) NextUpsertBatch() func() (*UpsertBatch, int64, uint32)
NextUpsertBatch returns a functor that can be used to iterate over redo logs on disk and returns one UpsertBatch at each call. It returns nil to indicate the end of the upsert batch stream.
Any failure in file reading and upsert batch creation will trigger system panic.
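A recovery-style sketch that drains the iterator, assuming the caller supplies a function that applies each batch:

func replayRedoLogs(r *RedoLogManager, apply func(batch *UpsertBatch, redoFile int64, offset uint32)) {
    next := r.NextUpsertBatch()
    for {
        batch, redoFile, offset := next()
        if batch == nil {
            break // end of the upsert batch stream
        }
        apply(batch, redoFile, offset)
    }
}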
func (*RedoLogManager) PurgeRedologFileAndData ¶
func (r *RedoLogManager) PurgeRedologFileAndData(cutoff uint32, redoFileCheckpointed int64, batchOffset uint32) error
PurgeRedologFileAndData purges disk files and in memory data of redologs that are eligible to be purged.
func (*RedoLogManager) UpdateBatchCount ¶
func (r *RedoLogManager) UpdateBatchCount(redoFile int64) uint32
UpdateBatchCount saves/updates batch counts for the given redolog
func (*RedoLogManager) UpdateMaxEventTime ¶
func (r *RedoLogManager) UpdateMaxEventTime(eventTime uint32, redoFile int64)
UpdateMaxEventTime updates the max event time of the current redo log file. redoFile is the key to the corresponding redo file that needs to have the maxEventTime updated. redoFile == 0 is used in serving ingestion requests where the current file's max event time is updated. redoFile != 0 is used in recovery where the redo log file loaded from disk needs to get its max event time calculated.
func (*RedoLogManager) WriteUpsertBatch ¶
func (r *RedoLogManager) WriteUpsertBatch(upsertBatch *UpsertBatch) (int64, uint32)
WriteUpsertBatch saves an upsert batch into disk before applying it. Any errors from diskStore will trigger system panic.
type Scheduler ¶
type Scheduler interface {
    Start()
    Stop()
    SubmitJob(job Job) chan error
    DeleteTable(table string, isFactTable bool)
    GetJobDetails(jobType common.JobType) interface{}
    NewBackfillJob(tableName string, shardID int) Job
    NewArchivingJob(tableName string, shardID int, cutoff uint32) Job
    NewSnapshotJob(tableName string, shardID int) Job
    NewPurgeJob(tableName string, shardID int, batchIDStart int, batchIDEnd int) Job
    utils.RWLocker
}
Scheduler is for scheduling archiving jobs (and later backfill jobs) for table shards in memStore. It scans through all tables and shards to generate list of eligible jobs to run.
type SnapshotJob ¶
type SnapshotJob struct {
// contains filtered or unexported fields
}
SnapshotJob defines the structure that a snapshot job needs.
func (*SnapshotJob) GetIdentifier ¶
func (job *SnapshotJob) GetIdentifier() string
GetIdentifier returns a unique identifier of this job.
func (*SnapshotJob) Run ¶
func (job *SnapshotJob) Run() error
Run starts the snapshot process and waits for it to finish.
func (*SnapshotJob) String ¶
func (job *SnapshotJob) String() string
String gives meaningful string representation for this job
type SnapshotJobDetail ¶
type SnapshotJobDetail struct {
    JobDetail

    // Number of mutations in this snapshot.
    NumMutations int `json:"numMutations"`
    // Number of batches written in this snapshot.
    NumBatches int `json:"numBatches"`
    // Current redolog file that's being snapshotted.
    RedologFile int64 `json:"redologFile"`
    // Batch offset within the RedologFile.
    BatchOffset uint32 `json:"batchOffset"`
    // Stage of the job is running.
    Stage SnapshotStage `json:"stage"`
}
SnapshotJobDetail represents snapshot job status of a table shard.
type SnapshotJobDetailMutator ¶
type SnapshotJobDetailMutator func(jobDetail *SnapshotJobDetail)
SnapshotJobDetailMutator is the mutator functor to change SnapshotJobDetail.
type SnapshotJobDetailReporter ¶
type SnapshotJobDetailReporter func(key string, mutator SnapshotJobDetailMutator)
SnapshotJobDetailReporter is the functor to apply mutator changes to corresponding JobDetail.
type SnapshotManager ¶
type SnapshotManager struct {
    sync.RWMutex `json:"-"`

    // Number of mutations since last snapshot. Measured as number of rows mutated.
    NumMutations int `json:"numMutations"`

    // Last snapshot time.
    LastSnapshotTime time.Time `json:"'lastSnapshotTime'"`

    // keep track of the redo log file of the last batch snapshotted
    LastRedoFile int64 `json:"lastRedoFile"`

    // keep track of the offset of the last batch snapshotted
    LastBatchOffset uint32 `json:"lastBatchOffset"`

    // keep track of the record position of the last batch snapshotted
    LastRecord RecordID

    // keep track of the redo log file of the last batch queued
    CurrentRedoFile int64 `json:"currentRedoFile"`

    // keep track of the offset of the last batch queued
    CurrentBatchOffset uint32 `json:"currentBatchOffset"`

    // keep track of the record position when last batch queued
    CurrentRecord RecordID

    // Configs.
    SnapshotInterval  time.Duration `json:"snapshotInterval"`
    SnapshotThreshold int           `json:"snapshotThreshold"`
    // contains filtered or unexported fields
}
SnapshotManager manages the snapshot related stats and progress.
func NewSnapshotManager ¶
func NewSnapshotManager(shard *TableShard) *SnapshotManager
NewSnapshotManager creates a new SnapshotManager instance.
func (*SnapshotManager) ApplyUpsertBatch ¶
func (s *SnapshotManager) ApplyUpsertBatch(redoFile int64, offset uint32, numMutations int, currentRecord RecordID)
ApplyUpsertBatch advances CurrentRedoLogFile and CurrentBatchOffset and increments NumMutations after applying an upsert batch to live store.
func (*SnapshotManager) Done ¶
func (s *SnapshotManager) Done(currentRedoFile int64, currentBatchOffset uint32, lastNumMutations int, currentRecord RecordID) error
Done updates the snapshot progress both in memory and in metastore and updates number of mutations accordingly.
func (*SnapshotManager) GetLastSnapshotInfo ¶
func (s *SnapshotManager) GetLastSnapshotInfo() (int64, uint32, time.Time, RecordID)
GetLastSnapshotInfo returns the last snapshot's redo log file, offset, timestamp, and last record.
func (*SnapshotManager) MarshalJSON ¶
func (s *SnapshotManager) MarshalJSON() ([]byte, error)
MarshalJSON marshals a SnapshotManager into JSON.
func (*SnapshotManager) QualifyForSnapshot ¶
func (s *SnapshotManager) QualifyForSnapshot() bool
QualifyForSnapshot tells whether we can trigger a snapshot job.
func (*SnapshotManager) SetLastSnapshotInfo ¶
func (s *SnapshotManager) SetLastSnapshotInfo(redoLogFile int64, offset uint32, record RecordID)
SetLastSnapshotInfo updates the last snapshot's redo log file, offset, timestamp, and last record.
func (*SnapshotManager) StartSnapshot ¶
func (s *SnapshotManager) StartSnapshot() (int64, uint32, int, RecordID)
StartSnapshot returns the current redo log file, batch offset, number of mutations, and current record position.
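A sketch of a snapshot run driven by these calls, with the actual snapshot writing abstracted behind a hypothetical writeSnapshot callback supplied by the caller:

func maybeSnapshot(m *SnapshotManager, writeSnapshot func(redoFile int64, offset uint32, record RecordID) error) error {
    if !m.QualifyForSnapshot() {
        return nil
    }
    redoFile, offset, numMutations, record := m.StartSnapshot()
    if err := writeSnapshot(redoFile, offset, record); err != nil {
        return err
    }
    // Persist progress and subtract the mutations covered by this snapshot.
    return m.Done(redoFile, offset, numMutations, record)
}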
type SnapshotStage ¶
type SnapshotStage string
SnapshotStage represents different stages of a running snapshot job.
const (
    SnapshotSnapshot SnapshotStage = "snapshot"
    SnapshotCleanup  SnapshotStage = "cleanup"
    SnapshotComplete SnapshotStage = "complete"
)
List of SnapshotStages
type TableSchema ¶
type TableSchema struct {
    sync.RWMutex `json:"-"`

    // Main schema of the table. Mutable.
    Schema metaCom.Table `json:"schema"`

    // Maps from column names to their IDs. Mutable.
    ColumnIDs map[string]int `json:"columnIDs"`

    // Maps from enum column names to their case dictionaries. Mutable.
    EnumDicts map[string]EnumDict `json:"enumDicts"`

    // DataType for each column ordered by column ID. Mutable.
    ValueTypeByColumn []memCom.DataType `json:"valueTypeByColumn"`

    // Number of bytes in the primary key. Immutable.
    PrimaryKeyBytes int `json:"primaryKeyBytes"`

    // Types of each primary key column. Immutable.
    PrimaryKeyColumnTypes []memCom.DataType `json:"primaryKeyColumnTypes"`

    // Default values of each column. Mutable. Nil means default value is not set.
    DefaultValues []*memCom.DataValue `json:"-"`
}
TableSchema stores metadata of the table such as columns and primary keys. It also stores the dictionaries for enum columns.
func NewTableSchema ¶
func NewTableSchema(table *metaCom.Table) *TableSchema
NewTableSchema creates a new table schema object from metaStore table object, this does not set enum cases.
func (*TableSchema) GetArchivingSortColumns ¶
func (t *TableSchema) GetArchivingSortColumns() []int
GetArchivingSortColumns makes a copy of the Schema.ArchivingSortColumns so callers don't have to hold a read lock to access it.
func (*TableSchema) GetColumnDeletions ¶
func (t *TableSchema) GetColumnDeletions() []bool
GetColumnDeletions returns a boolean slice that indicates whether a column has been deleted. Callers need to hold a read lock.
func (*TableSchema) GetPrimaryKeyColumns ¶
func (t *TableSchema) GetPrimaryKeyColumns() []int
GetPrimaryKeyColumns makes a copy of the Schema.PrimaryKeyColumns so callers don't have to hold a read lock to access it.
func (*TableSchema) GetValueTypeByColumn ¶
func (t *TableSchema) GetValueTypeByColumn() []memCom.DataType
GetValueTypeByColumn makes a copy of the ValueTypeByColumn so callers don't have to hold a read lock to access it.
func (*TableSchema) MarshalJSON ¶
func (t *TableSchema) MarshalJSON() ([]byte, error)
MarshalJSON marshals TableSchema into json.
func (*TableSchema) SetDefaultValue ¶
func (t *TableSchema) SetDefaultValue(columnID int)
SetDefaultValue parses the default value string if present and sets it on the TableSchema. The schema lock should be acquired and released by the caller, and the enum dict should already be created/updated before calling this function.
func (*TableSchema) SetTable ¶
func (t *TableSchema) SetTable(table *metaCom.Table)
SetTable sets an updated table and updates the TableSchema; the caller should acquire the lock before calling.
type TableShard ¶
type TableShard struct {
    // Wait group used to prevent the stores from being prematurely deleted.
    Users sync.WaitGroup `json:"-"`

    ShardID int `json:"-"`

    // For convenience, reference to the table schema struct.
    Schema *TableSchema `json:"schema"`

    // Live store. Its locks also cover the primary key.
    LiveStore *LiveStore `json:"liveStore"`

    // Archive store.
    ArchiveStore ArchiveStore `json:"archiveStore"`

    // For convenience.
    HostMemoryManager common.HostMemoryManager `json:"-"`
    // contains filtered or unexported fields
}
TableShard stores the data for one table shard in memory.
func NewTableShard ¶
func NewTableShard(schema *TableSchema, metaStore metastore.MetaStore, diskStore diskstore.DiskStore, hostMemoryManager common.HostMemoryManager, shard int) *TableShard
NewTableShard creates and initiates a table shard based on the schema.
func (*TableShard) ApplyUpsertBatch ¶
func (shard *TableShard) ApplyUpsertBatch(upsertBatch *UpsertBatch, redoLogFile int64, offset uint32, skipBackfillRows bool) (bool, error)
ApplyUpsertBatch applies the upsert batch to the memstore shard. Returns true if caller needs to wait for availability of backfill buffer
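A simplified ingestion sketch for a single shard: write to the redo log first, then apply the batch to the in-memory store. Lock handling (e.g., LiveStore.WriterLock) is omitted here:

func ingest(shard *TableShard, batch *UpsertBatch) error {
    // Persist the batch in the redo log and record its position.
    redoFile, offset := shard.LiveStore.RedoLogManager.WriteUpsertBatch(batch)
    needToWaitForBackfill, err := shard.ApplyUpsertBatch(batch, redoFile, offset, false)
    if err != nil {
        return err
    }
    if needToWaitForBackfill {
        shard.LiveStore.BackfillManager.WaitForBackfillBufferAvailability()
    }
    return nil
}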
func (*TableShard) DeleteColumn ¶
func (shard *TableShard) DeleteColumn(columnID int) error
DeleteColumn deletes the data for the specified column.
func (*TableShard) Destruct ¶
func (shard *TableShard) Destruct()
Destruct destructs the table shard. Caller must detach the shard from memstore first.
func (*TableShard) LoadMetaData ¶
func (shard *TableShard) LoadMetaData()
LoadMetaData loads metadata for the table Shard from metastore.
func (*TableShard) LoadSnapshot ¶
func (shard *TableShard) LoadSnapshot() error
LoadSnapshot loads shard data from snapshot files.
func (*TableShard) NewRedoLogBrowser ¶
func (shard *TableShard) NewRedoLogBrowser() RedoLogBrowser
NewRedoLogBrowser creates a RedoLogBrowser using fields from the Shard.
func (*TableShard) PreloadColumn ¶
func (shard *TableShard) PreloadColumn(columnID int, startDay int, endDay int)
PreloadColumn loads the column into memory and waits for completion of loading within (startDay, endDay]. Note endDay is inclusive but startDay is exclusive.
func (*TableShard) ReplayRedoLogs ¶
func (shard *TableShard) ReplayRedoLogs()
ReplayRedoLogs loads data for the table Shard from disk store and recovers the Shard for serving.
type TableShardMemoryUsage ¶
type TableShardMemoryUsage struct {
    ColumnMemory     map[string]*common.ColumnMemoryUsage `json:"cols"`
    PrimaryKeyMemory uint                                 `json:"pk"`
}
TableShardMemoryUsage contains the memory usage of columns and the primary key.
type TestFactoryT ¶
type TestFactoryT struct {
    RootPath string
    utils.FileSystem
}
TestFactoryT creates memstore test objects from text file
func (TestFactoryT) NewMockMemStore ¶
func (t TestFactoryT) NewMockMemStore() *memStoreImpl
NewMockMemStore returns a new memstore with mocked diskstore and metastore.
func (TestFactoryT) ReadArchiveBatch ¶
func (t TestFactoryT) ReadArchiveBatch(name string) (*Batch, error)
ReadArchiveBatch reads a batch and prunes every column.
func (TestFactoryT) ReadArchiveVectorParty ¶
func (t TestFactoryT) ReadArchiveVectorParty(name string, locker sync.Locker) (*archiveVectorParty, error)
ReadArchiveVectorParty loads a vector party and prunes it after construction.
func (TestFactoryT) ReadBatch ¶
func (t TestFactoryT) ReadBatch(name string) (*Batch, error)
ReadBatch returns a batch given a batch name. The batch will be searched for under the testing/data/batches folder. Prune tells whether the columns need to be pruned after column construction.
func (TestFactoryT) ReadLiveBatch ¶
func (t TestFactoryT) ReadLiveBatch(name string) (*Batch, error)
ReadLiveBatch reads a batch and skips pruning for every column.
func (TestFactoryT) ReadLiveVectorParty ¶
func (t TestFactoryT) ReadLiveVectorParty(name string) (*cLiveVectorParty, error)
ReadLiveVectorParty loads a vector party and skips pruning.
func (TestFactoryT) ReadUpsertBatch ¶
func (t TestFactoryT) ReadUpsertBatch(name string) (*UpsertBatch, error)
ReadUpsertBatch returns a pointer to UpsertBatch given the upsert batch name.
func (TestFactoryT) ReadVector ¶
func (t TestFactoryT) ReadVector(name string) (*Vector, error)
ReadVector returns a vector given vector name. Vector will be searched under testing/data/vectors folder.
func (TestFactoryT) ReadVectorParty ¶
func (t TestFactoryT) ReadVectorParty(name string) (*cVectorParty, error)
ReadVectorParty returns a vector party given vector party name. Vector party will be searched under testing/data/vps folder. Prune tells whether to prune this column.
type TransferableVectorParty ¶
type TransferableVectorParty interface {
    // GetHostVectorPartySlice slices the vector party between [startIndex, startIndex+length) before transfer to GPU.
    GetHostVectorPartySlice(startIndex, length int) common.HostVectorPartySlice
}
TransferableVectorParty is a vector party that can be transferred to GPU for processing.
type UpsertBatch ¶
type UpsertBatch struct {
    // Number of rows in the batch, must be between 0 and 65535.
    NumRows int

    // Number of columns.
    NumColumns int

    // Arrival Time of Upsert Batch.
    ArrivalTime uint32
    // contains filtered or unexported fields
}
UpsertBatch stores and indexes a serialized upsert batch of data on a particular table. It is used for both client-server data transfer and redo logging. In redo logs each batch is prepended by a 4-byte buffer size. The serialized buffer of the batch is in the following format:
[uint32] magic_number
[uint32] buffer_size
<begin of buffer>
[int32]  version_number
[int32]  num_of_rows
[uint16] num_of_columns
<reserve 14 bytes>
[uint32] arrival_time
[uint32] column_offset_0 ... [uint32] column_offset_x+1
[uint32] column_reserved_field1_0 ... [uint32] column_reserved_field1_x
[uint32] column_reserved_field2_0 ... [uint32] column_reserved_field2_x
[uint32] column_data_type_0 ... [uint32] column_data_type_x
[uint16] column_id_0 ... [uint16] column_id_x
[uint8]  column_mode_0 ... [uint8] column_mode_x
(optional) [uint8] null_vector_0
(optional) [padding to 4 byte alignment] [uint32] offset_vector_0
[padding for 8 byte alignment] value_vector_0
...
[padding for 8 byte alignment]
<end of buffer>
Each component in the serialized buffer is byte aligned (not pointer aligned or bit aligned). All serialized numbers are written in little-endian. The struct is used for both client serialization and server deserialization. See https://github.com/uber/aresdb/wiki/redo_logs for more details.
Note: only fixed size values are supported currently.
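A sketch of deserializing and walking a batch, given a raw buffer without the 4-byte size prefix:

func dumpUpsertBatch(buffer []byte) error {
    batch, err := NewUpsertBatch(buffer)
    if err != nil {
        return err
    }
    for row := 0; row < batch.NumRows; row++ {
        for col := 0; col < batch.NumColumns; col++ {
            value, err := batch.GetDataValue(row, col)
            if err != nil {
                return err
            }
            _ = value // ... inspect the common.DataValue ...
        }
    }
    return nil
}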
func NewUpsertBatch ¶
func NewUpsertBatch(buffer []byte) (*UpsertBatch, error)
NewUpsertBatch deserializes an upsert batch on the server. buffer does not contain the 4-byte buffer size.
func (*UpsertBatch) ExtractBackfillBatch ¶
func (u *UpsertBatch) ExtractBackfillBatch(backfillRows []int) *UpsertBatch
ExtractBackfillBatch extracts given rows and stores in a new UpsertBatch The returned new UpsertBatch is not fully serialized and can only be used for structured reads.
func (*UpsertBatch) GetBool ¶
GetBool returns the data (boolean type) stored at (row, col), and the validity of the value.
func (*UpsertBatch) GetBuffer ¶
func (u *UpsertBatch) GetBuffer() []byte
GetBuffer returns the underlying buffer used to construct the upsert batch.
func (*UpsertBatch) GetColumnID ¶
func (u *UpsertBatch) GetColumnID(col int) (int, error)
GetColumnID returns the logical id of a column.
func (*UpsertBatch) GetColumnIndex ¶
func (u *UpsertBatch) GetColumnIndex(columnID int) (int, error)
GetColumnIndex returns the local index of a column given a logical index id.
func (*UpsertBatch) GetColumnNames ¶
func (u *UpsertBatch) GetColumnNames(schema *TableSchema) ([]string, error)
GetColumnNames reads the column names in the UpsertBatch; the caller should not lock the schema.
func (*UpsertBatch) GetColumnType ¶
func (u *UpsertBatch) GetColumnType(col int) (common.DataType, error)
GetColumnType returns the data type of a column.
func (*UpsertBatch) GetDataValue ¶
func (u *UpsertBatch) GetDataValue(row, col int) (common.DataValue, error)
GetDataValue returns the DataValue for the given row and col index. It first checks the validity of the value, then checks whether it is a boolean column to decide whether to load a bool value or another value type.
func (*UpsertBatch) GetEventColumnIndex ¶
func (u *UpsertBatch) GetEventColumnIndex() int
GetEventColumnIndex returns the column index of event time
func (*UpsertBatch) GetPrimaryKeyBytes ¶
func (u *UpsertBatch) GetPrimaryKeyBytes(row int, primaryKeyCols []int, key []byte) error
GetPrimaryKeyBytes returns primary key bytes for a given row. Note primaryKeyCols contains column positions within this upsert batch, not primary key columnIDs.
func (*UpsertBatch) GetPrimaryKeyCols ¶
func (u *UpsertBatch) GetPrimaryKeyCols(primaryKeyColumnIDs []int) ([]int, error)
GetPrimaryKeyCols converts primary key columnIDs to cols in this upsert batch.
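A sketch combining the two calls to produce per-row primary key bytes from an upsert batch and its table schema:

func primaryKeysOf(batch *UpsertBatch, schema *TableSchema) ([][]byte, error) {
    // Map schema-level primary key column IDs to column positions in this batch.
    pkCols, err := batch.GetPrimaryKeyCols(schema.GetPrimaryKeyColumns())
    if err != nil {
        return nil, err
    }
    keys := make([][]byte, 0, batch.NumRows)
    for row := 0; row < batch.NumRows; row++ {
        key := make([]byte, schema.PrimaryKeyBytes)
        if err := batch.GetPrimaryKeyBytes(row, pkCols, key); err != nil {
            return nil, err
        }
        keys = append(keys, key)
    }
    return keys, nil
}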
type Vector ¶
type Vector struct {
    // The data type of the value stored in the vector.
    DataType common.DataType

    // Max number of values that can be stored in the vector.
    Size int

    // Allocated size of the vector in bytes.
    Bytes int
    // contains filtered or unexported fields
}
Vector stores a batch of columnar data (values, nulls, or counts) for a column.
func NewVector ¶
NewVector creates a vector with the specified bits per unit and size(capacity). The majority of its storage space is managed in C.
func (*Vector) CheckAllValid ¶
CheckAllValid checks whether all bits are 1 in a bool typed vector.
func (*Vector) GetBool ¶
GetBool returns the bool value for the specified index. index bound is not checked!
func (*Vector) GetSliceBytesAligned ¶
func (v *Vector) GetSliceBytesAligned(lowerBound int, upperBound int) (buffer unsafe.Pointer, startIndex int, bytes int)
GetSliceBytesAligned calculates the number of bytes of a slice of the vector, represented by [lowerBound, upperBound), aligned to 64 bytes. It returns the buffer pointer, the new start index (start entry in the vector), and the length in bytes.
func (*Vector) GetValue ¶
GetValue returns the data value for the specified index. index bound is not checked! The return value points to the internal buffer location that stores the value.
func (*Vector) LowerBound ¶
LowerBound returns the index of the first element in vector[first, last) that is greater than or equal to the given value. The result is only valid if vector[first, last) is fully sorted in ascending order. If all values in the given range are less than the given value, LowerBound returns last. Note that first/last are not checked against the vector bound.
func (*Vector) SafeDestruct ¶
func (v *Vector) SafeDestruct()
SafeDestruct destructs this vector's storage space managed in C.
func (*Vector) SetAllValid ¶
func (v *Vector) SetAllValid()
SetAllValid sets all bits to 1 in a bool typed vector.
func (*Vector) SetValue ¶
SetValue sets the data value for the specified index. index bound is not checked! data points to a buffer (in UpsertBatch for instance) that contains the value to be set.
func (*Vector) UpperBound ¶
UpperBound returns the index of the first element in vector[first, last) that is greater than the given value. The result is only valid if vector[first, last) is fully sorted in ascending order. If all values in the given range are less than or equal to the given value, UpperBound returns last. Note that first/last are not checked against the vector bound.
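A sketch of an equal-range lookup on a sorted, fully populated uint32 vector; unsafe.Pointer is used because values are passed by pointer, and bounds are not checked by the vector:

func equalRange(v *Vector, target uint32) (lower, upper int) {
    // [lower, upper) brackets every element equal to target, assuming the
    // vector is sorted ascending and all Size entries are populated.
    lower = v.LowerBound(0, v.Size, unsafe.Pointer(&target))
    upper = v.UpperBound(0, v.Size, unsafe.Pointer(&target))
    return lower, upper
}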
Source Files ¶
- archive_store.go
- archive_vector_party.go
- archiving.go
- backfill.go
- backfill_manager.go
- batch.go
- batchstats.go
- cuckoo_index.go
- host_memory_manager.go
- ingestion.go
- job_manager.go
- job_status.go
- live_store.go
- live_vector_party.go
- memstore.go
- merge.go
- primary_key.go
- purge.go
- purge_manager.go
- recovery.go
- redo_log_browser.go
- redo_log_manager.go
- scheduler.go
- schema.go
- snapshot.go
- snapshot_manager.go
- table_shard.go
- test_factory.go
- upsert_batch.go
- vector.go
- vector_party.go
- vector_party_serializer.go