Versions in this module Expand all Collapse all v0 v0.0.2 Sep 5, 2019 Changes in this version type ArchivingJob + func (job *ArchivingJob) JobType() common.JobType + type BackfillConfig struct + BackfillThresholdInBytes int64 + MaxBufferSize int64 type BackfillJob + func (job *BackfillJob) JobType() common.JobType type Job + JobType func() common.JobType type MemStore + AddTableShard func(table string, shardID int, needPeerCopy bool) + GetHostMemoryManager func() common.HostMemoryManager + RemoveTableShard func(table string, shardID int) + type Option func(o *Options) + func WithNumShards(numShards int) Option + type Options struct + func NewOptions(bootstrapToken common.BootStrapToken, ...) Options type PurgeJob + func (job *PurgeJob) JobType() common.JobType type Scheduler + EnableJobType func(jobType common.JobType, enable bool) + IsJobTypeEnabled func(jobType common.JobType) bool type SnapshotJob + func (job *SnapshotJob) JobType() common.JobType type TableShard + BootstrapDetails bootstrap.BootstrapDetails + BootstrapState bootstrap.BootstrapState + func (shard *TableShard) Bootstrap(peerSource client.PeerSource, origin string, topo topology.Topology, ...) error + func (shard *TableShard) IsBootstrapped() bool + func (shard *TableShard) IsDiskDataAvailable() bool + func (shard *TableShard) PlayRedoLog() type TestFactoryT + func GetFactory() TestFactoryT v0.0.1 Nov 27, 2018 Changes in this version + const BaseBatchID + const CheckExistingCount + const IgnoreCount + const IncrementCount + const UpsertHeader + const VectorPartyHeader + func CalculateVectorBytes(dataType common.DataType, size int) int + func CalculateVectorPartyBytes(dataType common.DataType, size int, hasNulls bool, hasCounts bool) int + func GetPrimaryKeyBytes(primaryKeyValues []common.DataValue, key []byte) error + func MarshalPrimaryKey(pk PrimaryKey) ([]byte, error) + func NewHostMemoryManager(memStore *memStoreImpl, totalMemorySize int64) common.HostMemoryManager + func NewLiveVectorParty(length int, dataType common.DataType, defaultValue common.DataValue, ...) common.LiveVectorParty + func NewVectorPartyArchiveSerializer(hostMemManager common.HostMemoryManager, diskStore diskstore.DiskStore, ...) common.VectorPartySerializer + func NewVectorPartySnapshotSerializer(shard *TableShard, columnID, batchID int, batchVersion uint32, seqNum uint32, ...) common.VectorPartySerializer + func UnpinVectorParties(requestedVPs []common.ArchiveVectorParty) + func VectorPartyEquals(v1 common.VectorParty, v2 common.VectorParty) bool + type ArchiveBatch struct + BatchID int32 + SeqNum uint32 + Shard *TableShard + Size int + Version uint32 + func (b *ArchiveBatch) BlockingDelete(columnID int) common.ArchiveVectorParty + func (b *ArchiveBatch) BuildIndex(sortColumns []int, primaryKeyColumns []int, pk PrimaryKey) error + func (b *ArchiveBatch) Clone() *ArchiveBatch + func (b *ArchiveBatch) MarshalJSON() ([]byte, error) + func (b *ArchiveBatch) RequestVectorParty(columnID int) common.ArchiveVectorParty + func (b *ArchiveBatch) TryEvict(columnID int) common.ArchiveVectorParty + func (b *ArchiveBatch) WriteToDisk() error + type ArchiveJobDetail struct + CurrentCutoff uint32 + LastCutoff uint32 + RunningCutoff uint32 + Stage ArchivingStage + type ArchiveJobDetailMutator func(jobDetail *ArchiveJobDetail) + type ArchiveJobDetailReporter func(key string, mutator ArchiveJobDetailMutator) + type ArchiveStore struct + CurrentVersion *ArchiveStoreVersion + PurgeManager *PurgeManager + func NewArchiveStore(shard *TableShard) ArchiveStore + func (s *ArchiveStore) Destruct() + func (s *ArchiveStore) GetCurrentVersion() *ArchiveStoreVersion + func (s *ArchiveStore) MarshalJSON() ([]byte, error) + type ArchiveStoreVersion struct + ArchivingCutoff uint32 + Batches map[int32]*ArchiveBatch + Users sync.WaitGroup + func NewArchiveStoreVersion(cutoff uint32, shard *TableShard) *ArchiveStoreVersion + func (v *ArchiveStoreVersion) GetBatchForRead(batchID int) *ArchiveBatch + func (v *ArchiveStoreVersion) MarshalJSON() ([]byte, error) + func (v *ArchiveStoreVersion) RequestBatch(batchID int32) *ArchiveBatch + type ArchivingJob struct + func (job *ArchivingJob) GetIdentifier() string + func (job *ArchivingJob) Run() error + func (job *ArchivingJob) String() string + type ArchivingStage string + const ArchivingComplete + const ArchivingCreatePatch + const ArchivingMerge + const ArchivingPurge + type BackfillJob struct + func (job *BackfillJob) GetIdentifier() string + func (job *BackfillJob) Run() error + func (job *BackfillJob) String() string + type BackfillJobDetail struct + BatchOffset uint32 + RedologFile int64 + Stage BackfillStage + type BackfillJobDetailMutator func(jobDetail *BackfillJobDetail) + type BackfillJobDetailReporter func(key string, mutator BackfillJobDetailMutator) + type BackfillManager struct + AppendCond *sync.Cond + BackfillThresholdInBytes int64 + BackfillingBufferSize int64 + CurrentBatchOffset uint32 + CurrentBufferSize int64 + CurrentRedoFile int64 + LastBatchOffset uint32 + LastRedoFile int64 + MaxBufferSize int64 + NumRecords int + Shard int + TableName string + UpsertBatches []*UpsertBatch + func NewBackfillManager(tableName string, shard int, tableConfig metaCom.TableConfig) *BackfillManager + func (r *BackfillManager) Append(upsertBatch *UpsertBatch, redoFile int64, batchOffset uint32) bool + func (r *BackfillManager) Destruct() + func (r *BackfillManager) Done(currentRedoFile int64, currentBatchOffset uint32, ...) error + func (r *BackfillManager) GetLatestRedoFileAndOffset() (int64, uint32) + func (r *BackfillManager) MarshalJSON() ([]byte, error) + func (r *BackfillManager) QualifyToTriggerBackfill() bool + func (r *BackfillManager) ReadUpsertBatch(index, start, length int, schema *TableSchema) (data [][]interface{}, columnNames []string, err error) + func (r *BackfillManager) StartBackfill() ([]*UpsertBatch, int64, uint32) + func (r *BackfillManager) WaitForBackfillBufferAvailability() + type BackfillStage string + const BackfillApplyPatch + const BackfillComplete + const BackfillCreatePatch + const BackfillPurge + type Batch struct + Columns []common.VectorParty + func (b *Batch) Equals(other *Batch) bool + func (b *Batch) GetDataValue(row, columnID int) common.DataValue + func (b *Batch) GetDataValueWithDefault(row, columnID int, defaultValue common.DataValue) common.DataValue + func (b *Batch) GetVectorParty(columnID int) common.VectorParty + func (b *Batch) ReplaceVectorParty(columnID int, vp common.VectorParty) + func (b *Batch) SafeDestruct() + type BatchReader interface + GetDataValue func(row, columnID int) common.DataValue + GetDataValueWithDefault func(row, columnID int, defaultValue common.DataValue) common.DataValue + type BatchStatsReporter struct + func NewBatchStatsReporter(intervalInSeconds int, memStore MemStore, metaStore metastore.MetaStore) *BatchStatsReporter + func (batchStats *BatchStatsReporter) Run() + func (batchStats *BatchStatsReporter) Stop() + type CuckooIndex struct + func (c *CuckooIndex) AllocatedBytes() uint + func (c *CuckooIndex) Capacity() uint + func (c *CuckooIndex) Delete(key Key) + func (c *CuckooIndex) Destruct() + func (c *CuckooIndex) Find(key Key) (RecordID, bool) + func (c *CuckooIndex) FindOrInsert(key Key, value RecordID, eventTime uint32) (existingFound bool, recordID RecordID, err error) + func (c *CuckooIndex) GetEventTimeCutoff() uint32 + func (c *CuckooIndex) LockForTransfer() PrimaryKeyData + func (c *CuckooIndex) Size() uint + func (c *CuckooIndex) UnlockAfterTransfer() + func (c *CuckooIndex) Update(key Key, value RecordID) bool + func (c *CuckooIndex) UpdateEventTimeCutoff(cutoff uint32) + type EnumDict struct + Capacity int + Dict map[string]int + ReverseDict []string + type Job interface + GetIdentifier func() string + Run func() error + String func() string + type JobDetail struct + Current int + LastDuration time.Duration + LastError error + LastRun time.Time + LastStartTime time.Time + LockDuration time.Duration + NextRun time.Time + NumAffectedDays int + NumRecords int + Status JobStatus + Total int + type JobStatus string + const JobFailed + const JobReady + const JobRunning + const JobSucceeded + const JobWaiting + type Key []byte + type LiveBatch struct + Capacity int + MaxArrivalTime uint32 + func (b *LiveBatch) GetOrCreateVectorParty(columnID int, locked bool) common.LiveVectorParty + func (b *LiveBatch) MarshalJSON() ([]byte, error) + type LiveStore struct + ArchivingCutoffHighWatermark uint32 + BackfillManager *BackfillManager + BatchSize int + Batches map[int32]*LiveBatch + HostMemoryManager common.HostMemoryManager + LastReadRecord RecordID + NextWriteRecord RecordID + PrimaryKey PrimaryKey + RedoLogManager RedoLogManager + SnapshotManager *SnapshotManager + WriterLock sync.RWMutex + func NewLiveStore(batchSize int, shard *TableShard) *LiveStore + func (s *LiveStore) AdvanceLastReadRecord() + func (s *LiveStore) AdvanceNextWriteRecord() RecordID + func (s *LiveStore) Destruct() + func (s *LiveStore) GetBatchForRead(id int32) *LiveBatch + func (s *LiveStore) GetBatchForWrite(id int32) *LiveBatch + func (s *LiveStore) GetBatchIDs() (batchIDs []int32, numRecordsInLastBatch int) + func (s *LiveStore) GetMemoryUsageForColumn(valueType common.DataType, columnID int) int + func (s *LiveStore) LookupKey(keyStrs []string) (RecordID, bool) + func (s *LiveStore) MarshalJSON() ([]byte, error) + func (s *LiveStore) PurgeBatch(id int32) + func (s *LiveStore) PurgeBatches(ids []int32) + type MemStore interface + Archive func(table string, shardID int, cutoff uint32, reporter ArchiveJobDetailReporter) error + Backfill func(table string, shardID int, reporter BackfillJobDetailReporter) error + FetchSchema func() error + GetMemoryUsageDetails func() (map[string]TableShardMemoryUsage, error) + GetScheduler func() Scheduler + GetSchema func(table string) (*TableSchema, error) + GetSchemas func() map[string]*TableSchema + GetTableShard func(table string, shardID int) (*TableShard, error) + HandleIngestion func(table string, shardID int, upsertBatch *UpsertBatch) error + InitShards func(schedulerOff bool) + Purge func(table string, shardID, batchIDStart, batchIDEnd int, ...) error + Snapshot func(table string, shardID int, reporter SnapshotJobDetailReporter) error + func NewMemStore(metaStore metastore.MetaStore, diskStore diskstore.DiskStore) MemStore + type PrimaryKey interface + AllocatedBytes func() uint + Capacity func() uint + Delete func(key Key) + Destruct func() + Find func(key Key) (RecordID, bool) + FindOrInsert func(key Key, value RecordID, eventTime uint32) (existingFound bool, recordID RecordID, err error) + GetEventTimeCutoff func() uint32 + LockForTransfer func() PrimaryKeyData + Size func() uint + UnlockAfterTransfer func() + Update func(key Key, value RecordID) bool + UpdateEventTimeCutoff func(eventTimeCutoff uint32) + func NewPrimaryKey(keyBytes int, hasEventTime bool, initNumBuckets int, ...) PrimaryKey + type PrimaryKeyData struct + Data unsafe.Pointer + KeyBytes int + NumBuckets int + NumBytes int + Seeds [numHashes]uint32 + type PurgeJob struct + func (job *PurgeJob) GetIdentifier() string + func (job *PurgeJob) Run() error + func (job *PurgeJob) String() string + type PurgeJobDetail struct + BatchIDEnd int + BatchIDStart int + NumBatches int + Stage PurgeStage + type PurgeJobDetailMutator func(jobDetail *PurgeJobDetail) + type PurgeJobDetailReporter func(key string, mutator PurgeJobDetailMutator) + type PurgeManager struct + LastPurgeTime time.Time + PurgeInterval time.Duration + func NewPurgeManager(shard *TableShard) *PurgeManager + func (p *PurgeManager) QualifyForPurge() bool + type PurgeStage string + const PurgeComplete + const PurgeDataFile + const PurgeMemory + const PurgeMetaData + type RecordID struct + BatchID int32 + Index uint32 + type RedoLogBrowser interface + ListLogFiles func() ([]int64, error) + ListUpsertBatch func(creationTime int64) ([]int64, error) + ReadData func(creationTime int64, upsertBatchOffset int64, start int, length int) ([][]interface{}, []string, int, error) + type RedoLogManager struct + BatchCountPerFile map[int64]uint32 + CurrentFileCreationTime int64 + CurrentRedoLogSize uint32 + MaxEventTimePerFile map[int64]uint32 + MaxRedoLogSize int64 + RotationInterval int64 + SizePerFile map[int64]uint32 + TotalRedoLogSize uint + func NewRedoLogManager(rotationInterval int64, maxRedoLogSize int64, diskStore diskstore.DiskStore, ...) RedoLogManager + func (r *RedoLogManager) Close() + func (r *RedoLogManager) MarshalJSON() ([]byte, error) + func (r *RedoLogManager) NextUpsertBatch() func() (*UpsertBatch, int64, uint32) + func (r *RedoLogManager) PurgeRedologFileAndData(cutoff uint32, redoFileCheckpointed int64, batchOffset uint32) error + func (r *RedoLogManager) UpdateBatchCount(redoFile int64) uint32 + func (r *RedoLogManager) UpdateMaxEventTime(eventTime uint32, redoFile int64) + func (r *RedoLogManager) WriteUpsertBatch(upsertBatch *UpsertBatch) (int64, uint32) + type Scheduler interface + DeleteTable func(table string, isFactTable bool) + GetJobDetails func(jobType common.JobType) interface{} + NewArchivingJob func(tableName string, shardID int, cutoff uint32) Job + NewBackfillJob func(tableName string, shardID int) Job + NewPurgeJob func(tableName string, shardID int, batchIDStart int, batchIDEnd int) Job + NewSnapshotJob func(tableName string, shardID int) Job + Start func() + Stop func() + SubmitJob func(job Job) chan error + type SnapshotJob struct + func (job *SnapshotJob) GetIdentifier() string + func (job *SnapshotJob) Run() error + func (job *SnapshotJob) String() string + type SnapshotJobDetail struct + BatchOffset uint32 + NumBatches int + NumMutations int + RedologFile int64 + Stage SnapshotStage + type SnapshotJobDetailMutator func(jobDetail *SnapshotJobDetail) + type SnapshotJobDetailReporter func(key string, mutator SnapshotJobDetailMutator) + type SnapshotManager struct + CurrentBatchOffset uint32 + CurrentRecord RecordID + CurrentRedoFile int64 + LastBatchOffset uint32 + LastRecord RecordID + LastRedoFile int64 + LastSnapshotTime time.Time + NumMutations int + SnapshotInterval time.Duration + SnapshotThreshold int + func NewSnapshotManager(shard *TableShard) *SnapshotManager + func (s *SnapshotManager) ApplyUpsertBatch(redoFile int64, offset uint32, numMutations int, currentRecord RecordID) + func (s *SnapshotManager) Done(currentRedoFile int64, currentBatchOffset uint32, lastNumMutations int, ...) error + func (s *SnapshotManager) GetLastSnapshotInfo() (int64, uint32, time.Time, RecordID) + func (s *SnapshotManager) MarshalJSON() ([]byte, error) + func (s *SnapshotManager) QualifyForSnapshot() bool + func (s *SnapshotManager) SetLastSnapshotInfo(redoLogFile int64, offset uint32, record RecordID) + func (s *SnapshotManager) StartSnapshot() (int64, uint32, int, RecordID) + type SnapshotStage string + const SnapshotCleanup + const SnapshotComplete + const SnapshotSnapshot + type TableSchema struct + ColumnIDs map[string]int + DefaultValues []*memCom.DataValue + EnumDicts map[string]EnumDict + PrimaryKeyBytes int + PrimaryKeyColumnTypes []memCom.DataType + Schema metaCom.Table + ValueTypeByColumn []memCom.DataType + func NewTableSchema(table *metaCom.Table) *TableSchema + func (t *TableSchema) GetArchivingSortColumns() []int + func (t *TableSchema) GetColumnDeletions() []bool + func (t *TableSchema) GetPrimaryKeyColumns() []int + func (t *TableSchema) GetValueTypeByColumn() []memCom.DataType + func (t *TableSchema) MarshalJSON() ([]byte, error) + func (t *TableSchema) SetDefaultValue(columnID int) + func (t *TableSchema) SetTable(table *metaCom.Table) + type TableShard struct + ArchiveStore ArchiveStore + HostMemoryManager common.HostMemoryManager + LiveStore *LiveStore + Schema *TableSchema + ShardID int + Users sync.WaitGroup + func NewTableShard(schema *TableSchema, metaStore metastore.MetaStore, ...) *TableShard + func (shard *TableShard) ApplyUpsertBatch(upsertBatch *UpsertBatch, redoLogFile int64, offset uint32, ...) (bool, error) + func (shard *TableShard) DeleteColumn(columnID int) error + func (shard *TableShard) Destruct() + func (shard *TableShard) LoadMetaData() + func (shard *TableShard) LoadSnapshot() error + func (shard *TableShard) NewRedoLogBrowser() RedoLogBrowser + func (shard *TableShard) PreloadColumn(columnID int, startDay int, endDay int) + func (shard *TableShard) ReplayRedoLogs() + type TableShardMemoryUsage struct + ColumnMemory map[string]*common.ColumnMemoryUsage + PrimaryKeyMemory uint + type TestFactoryT struct + RootPath string + func (t TestFactoryT) NewMockMemStore() *memStoreImpl + func (t TestFactoryT) ReadArchiveBatch(name string) (*Batch, error) + func (t TestFactoryT) ReadArchiveVectorParty(name string, locker sync.Locker) (*archiveVectorParty, error) + func (t TestFactoryT) ReadBatch(name string) (*Batch, error) + func (t TestFactoryT) ReadLiveBatch(name string) (*Batch, error) + func (t TestFactoryT) ReadLiveVectorParty(name string) (*cLiveVectorParty, error) + func (t TestFactoryT) ReadUpsertBatch(name string) (*UpsertBatch, error) + func (t TestFactoryT) ReadVector(name string) (*Vector, error) + func (t TestFactoryT) ReadVectorParty(name string) (*cVectorParty, error) + type TransferableVectorParty interface + GetHostVectorPartySlice func(startIndex, length int) common.HostVectorPartySlice + type UpsertBatch struct + ArrivalTime uint32 + NumColumns int + NumRows int + func NewUpsertBatch(buffer []byte) (*UpsertBatch, error) + func (u *UpsertBatch) ExtractBackfillBatch(backfillRows []int) *UpsertBatch + func (u *UpsertBatch) GetBool(row int, col int) (bool, bool, error) + func (u *UpsertBatch) GetBuffer() []byte + func (u *UpsertBatch) GetColumnID(col int) (int, error) + func (u *UpsertBatch) GetColumnIndex(columnID int) (int, error) + func (u *UpsertBatch) GetColumnNames(schema *TableSchema) ([]string, error) + func (u *UpsertBatch) GetColumnType(col int) (common.DataType, error) + func (u *UpsertBatch) GetDataValue(row, col int) (common.DataValue, error) + func (u *UpsertBatch) GetEventColumnIndex() int + func (u *UpsertBatch) GetPrimaryKeyBytes(row int, primaryKeyCols []int, key []byte) error + func (u *UpsertBatch) GetPrimaryKeyCols(primaryKeyColumnIDs []int) ([]int, error) + func (u *UpsertBatch) GetValue(row int, col int) (unsafe.Pointer, bool, error) + func (u *UpsertBatch) ReadData(start int, length int) ([][]interface{}, error) + type Vector struct + Bytes int + DataType common.DataType + Size int + func NewVector(dataType common.DataType, size int) *Vector + func (v *Vector) Buffer() unsafe.Pointer + func (v *Vector) CheckAllValid() bool + func (v *Vector) GetBool(index int) bool + func (v *Vector) GetSliceBytesAligned(lowerBound int, upperBound int) (buffer unsafe.Pointer, startIndex int, bytes int) + func (v *Vector) GetValue(index int) unsafe.Pointer + func (v *Vector) LowerBound(first int, last int, value unsafe.Pointer) int + func (v *Vector) SafeDestruct() + func (v *Vector) SetAllValid() + func (v *Vector) SetBool(index int, value bool) + func (v *Vector) SetValue(index int, data unsafe.Pointer) + func (v *Vector) UpperBound(first int, last int, value unsafe.Pointer) int