Documentation ¶
Overview ¶
Copyright (c) 2017-2018 Uber Technologies, Inc.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Package memstore has to put the test factory here since otherwise we would have a memstore -> utils -> memstore import cycle.
Index ¶
- Constants
- func NewHostMemoryManager(memStore *memStoreImpl, totalMemorySize int64) common.HostMemoryManager
- func NewLiveVectorParty(length int, dataType common.DataType, defaultValue common.DataValue, ...) common.LiveVectorParty
- func NewPrimaryKey(keyBytes int, hasEventTime bool, initNumBuckets int, ...) memCom.PrimaryKey
- func UnpinVectorParties(requestedVPs []common.ArchiveVectorParty)
- type ArchiveBatch
- func (b *ArchiveBatch) BlockingDelete(columnID int) common.ArchiveVectorParty
- func (b *ArchiveBatch) BuildIndex(sortColumns []int, primaryKeyColumns []int, pk common.PrimaryKey) error
- func (b *ArchiveBatch) Clone() *ArchiveBatch
- func (b *ArchiveBatch) MarshalJSON() ([]byte, error)
- func (b *ArchiveBatch) RequestVectorParty(columnID int) common.ArchiveVectorParty
- func (b *ArchiveBatch) TryEvict(columnID int) common.ArchiveVectorParty
- func (b *ArchiveBatch) WriteToDisk() error
- type ArchiveJobDetail
- type ArchiveJobDetailMutator
- type ArchiveJobDetailReporter
- type ArchiveStore
- type ArchiveStoreVersion
- type ArchivingJob
- type ArchivingStage
- type BackfillConfig
- type BackfillJob
- type BackfillJobDetail
- type BackfillJobDetailMutator
- type BackfillJobDetailReporter
- type BackfillManager
- func (r *BackfillManager) Append(upsertBatch *memCom.UpsertBatch, redoFile int64, batchOffset uint32) bool
- func (r *BackfillManager) Destruct()
- func (r *BackfillManager) Done(currentRedoFile int64, currentBatchOffset uint32, metaStore metaCom.MetaStore) error
- func (r *BackfillManager) GetLatestRedoFileAndOffset() (int64, uint32)
- func (r *BackfillManager) MarshalJSON() ([]byte, error)
- func (r *BackfillManager) QualifyToTriggerBackfill() bool
- func (r *BackfillManager) ReadUpsertBatch(index, start, length int, schema *memCom.TableSchema) (data [][]interface{}, columnNames []string, err error)
- func (r *BackfillManager) StartBackfill() ([]*memCom.UpsertBatch, int64, uint32)
- func (r *BackfillManager) WaitForBackfillBufferAvailability()
- type BackfillStage
- type BatchStatsReporter
- type CuckooIndex
- func (c *CuckooIndex) AllocatedBytes() uint
- func (c *CuckooIndex) Capacity() uint
- func (c *CuckooIndex) Delete(key memCom.Key)
- func (c *CuckooIndex) Destruct()
- func (c *CuckooIndex) Find(key memCom.Key) (memCom.RecordID, bool)
- func (c *CuckooIndex) FindOrInsert(key memCom.Key, value memCom.RecordID, eventTime uint32) (existingFound bool, recordID memCom.RecordID, err error)
- func (c *CuckooIndex) GetEventTimeCutoff() uint32
- func (c *CuckooIndex) LockForTransfer() memCom.PrimaryKeyData
- func (c *CuckooIndex) Size() uint
- func (c *CuckooIndex) UnlockAfterTransfer()
- func (c *CuckooIndex) Update(key memCom.Key, value memCom.RecordID) bool
- func (c *CuckooIndex) UpdateEventTimeCutoff(cutoff uint32)
- type Job
- type JobDetail
- type JobStatus
- type LiveBatch
- type LiveStore
- func (s *LiveStore) AdvanceLastReadRecord()
- func (s *LiveStore) AdvanceNextWriteRecord() common.RecordID
- func (s *LiveStore) Destruct()
- func (s *LiveStore) GetBatchForRead(id int32) *LiveBatch
- func (s *LiveStore) GetBatchForWrite(id int32) *LiveBatch
- func (s *LiveStore) GetBatchIDs() (batchIDs []int32, numRecordsInLastBatch int)
- func (s *LiveStore) GetMemoryUsageForColumn(valueType common.DataType, columnID int) int
- func (s *LiveStore) LookupKey(keyStrs []string) (common.RecordID, bool)
- func (s *LiveStore) MarshalJSON() ([]byte, error)
- func (s *LiveStore) PurgeBatch(id int32)
- func (s *LiveStore) PurgeBatches(ids []int32)
- type MemStore
- type Option
- type Options
- type PurgeJob
- type PurgeJobDetail
- type PurgeJobDetailMutator
- type PurgeJobDetailReporter
- type PurgeManager
- type PurgeStage
- type RedoLogBrowser
- type Scheduler
- type SnapshotJob
- type SnapshotJobDetail
- type SnapshotJobDetailMutator
- type SnapshotJobDetailReporter
- type SnapshotManager
- func (s *SnapshotManager) ApplyUpsertBatch(redoFile int64, offset uint32, numMutations int, currentRecord common.RecordID)
- func (s *SnapshotManager) Done(currentRedoFile int64, currentBatchOffset uint32, lastNumMutations int, ...) error
- func (s *SnapshotManager) GetLastSnapshotInfo() (int64, uint32, time.Time, common.RecordID)
- func (s *SnapshotManager) MarshalJSON() ([]byte, error)
- func (s *SnapshotManager) QualifyForSnapshot() bool
- func (s *SnapshotManager) SetLastSnapshotInfo(redoLogFile int64, offset uint32, record common.RecordID)
- func (s *SnapshotManager) StartSnapshot() (int64, uint32, int, common.RecordID)
- type SnapshotStage
- type TableShard
- func (shard *TableShard) ApplyUpsertBatch(upsertBatch *common.UpsertBatch, redoLogFile int64, offset uint32, ...) (bool, error)
- func (shard *TableShard) Bootstrap(peerSource client.PeerSource, origin string, topo topology.Topology, ...) error
- func (shard *TableShard) DeleteColumn(columnID int) error
- func (shard *TableShard) Destruct()
- func (shard *TableShard) IsBootstrapped() bool
- func (shard *TableShard) IsDiskDataAvailable() bool
- func (shard *TableShard) LoadMetaData() error
- func (shard *TableShard) LoadSnapshot() error
- func (shard *TableShard) NewRedoLogBrowser() RedoLogBrowser
- func (shard *TableShard) PlayRedoLog()
- func (shard *TableShard) PreloadColumn(columnID int, startDay int, endDay int)
- type TableShardMemoryUsage
- type TestFactoryT
- type TransferableVectorParty
Constants ¶
const BaseBatchID = int32(math.MinInt32)
BaseBatchID is the starting id of all batches.
Variables ¶
This section is empty.
Functions ¶
func NewHostMemoryManager ¶
func NewHostMemoryManager(memStore *memStoreImpl, totalMemorySize int64) common.HostMemoryManager
NewHostMemoryManager is used to initialize a HostMemoryManager.
func NewLiveVectorParty ¶
func NewLiveVectorParty(length int, dataType common.DataType, defaultValue common.DataValue, hostMemoryManager common.HostMemoryManager) common.LiveVectorParty
NewLiveVectorParty creates a LiveVectorParty.
func NewPrimaryKey ¶
func NewPrimaryKey(keyBytes int, hasEventTime bool, initNumBuckets int, hostMemoryManager memCom.HostMemoryManager) memCom.PrimaryKey
NewPrimaryKey creates a primary key data structure. Params:
- keyBytes: number of bytes of the key
- hasEventTime: determines whether the primary key should record event time for expiration
- initNumBuckets: determines the starting number of buckets; set to 0 to use the default
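A minimal usage sketch (key, value, and eventTime are hypothetical; it assumes the returned PrimaryKey exposes FindOrInsert as documented on CuckooIndex below):

	// 12-byte composite key, event-time tracking enabled, default bucket count.
	pk := NewPrimaryKey(12, true, 0, hostMemoryManager)
	found, recordID, err := pk.FindOrInsert(key, value, eventTime)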
func UnpinVectorParties ¶
func UnpinVectorParties(requestedVPs []common.ArchiveVectorParty)
UnpinVectorParties unpins all vector parties in the slice.
Types ¶
type ArchiveBatch ¶
type ArchiveBatch struct {
	common.Batch

	// Size of the batch (number of rows). Notice that compression changes the
	// length of some columns, does not change the size of the batch.
	Size int

	// Version for archive batches.
	Version uint32

	// SeqNum denotes backfill sequence number
	SeqNum uint32

	// For convenience.
	BatchID int32
	Shard   *TableShard
}
ArchiveBatch represents an archive batch.
func (*ArchiveBatch) BlockingDelete ¶
func (b *ArchiveBatch) BlockingDelete(columnID int) common.ArchiveVectorParty
BlockingDelete blocks until all users are finished with the specified column, and then deletes the column from the batch. Returns the vector party deleted if any.
func (*ArchiveBatch) BuildIndex ¶
func (b *ArchiveBatch) BuildIndex(sortColumns []int, primaryKeyColumns []int, pk common.PrimaryKey) error
BuildIndex builds an index over the primary key columns of this archive batch and inserts the record IDs into the given primary key.
func (*ArchiveBatch) Clone ¶
func (b *ArchiveBatch) Clone() *ArchiveBatch
Clone returns a copy of current batch including all references to underlying vector parties. Caller is responsible for holding the lock (if necessary).
func (*ArchiveBatch) MarshalJSON ¶
func (b *ArchiveBatch) MarshalJSON() ([]byte, error)
MarshalJSON marshals an ArchiveBatch into json.
func (*ArchiveBatch) RequestVectorParty ¶
func (b *ArchiveBatch) RequestVectorParty(columnID int) common.ArchiveVectorParty
RequestVectorParty creates (if needed), pins, and returns the requested vector party. On creation it also asynchronously loads the vector party from disk into memory.
Caller must call vp.WaitForDiskLoad() before using it, and call vp.Release() afterwards.
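For example, the pin/load/release protocol looks like this (columnID is hypothetical; error handling elided):

	vp := b.RequestVectorParty(columnID)
	vp.WaitForDiskLoad() // block until the column is loaded into memory
	// ... read from vp ...
	vp.Release()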
func (*ArchiveBatch) TryEvict ¶
func (b *ArchiveBatch) TryEvict(columnID int) common.ArchiveVectorParty
TryEvict attempts to evict and destruct the specified column from the archive batch. It fails fast if the column is currently in use so that the host memory manager can try evicting other VPs immediately. Returns the evicted vector party on success.
func (*ArchiveBatch) WriteToDisk ¶
func (b *ArchiveBatch) WriteToDisk() error
WriteToDisk writes each column of a batch to disk. It happens during the archiving stage for a merged archive batch, so there is no need to lock it.
type ArchiveJobDetail ¶
type ArchiveJobDetail struct {
	JobDetail

	// Current cutoff.
	CurrentCutoff uint32 `json:"currentCutoff"`
	// Stage of the job is running.
	Stage ArchivingStage `json:"stage"`
	// New Cutoff.
	RunningCutoff uint32 `json:"runningCutoff"`
	// Cutoff of last completed archiving job.
	LastCutoff uint32 `json:"lastCutoff"`
}
ArchiveJobDetail represents archiving job status of a table Shard.
type ArchiveJobDetailMutator ¶
type ArchiveJobDetailMutator func(jobDetail *ArchiveJobDetail)
ArchiveJobDetailMutator is the mutator functor to change ArchiveJobDetail.
type ArchiveJobDetailReporter ¶
type ArchiveJobDetailReporter func(key string, mutator ArchiveJobDetailMutator)
ArchiveJobDetailReporter is the functor to apply mutator changes to corresponding JobDetail.
type ArchiveStore ¶
type ArchiveStore struct {
	// The mutex protects the pointer pointing to the current version of archived vector version.
	sync.RWMutex

	PurgeManager *PurgeManager

	// Current version points to the most recent version of vector store version for queries to use.
	CurrentVersion *ArchiveStoreVersion
}
ArchiveStore manages archive store versions. The archive store evolves to a new version after archiving. Readers should follow this locking protocol:
	archiveStore.Users.Add(1)
	// tableShard.ArchiveStore can no longer be accessed directly.
	// continue reading from archiveStore
	archiveStore.Users.Done()
func NewArchiveStore ¶
func NewArchiveStore(shard *TableShard) *ArchiveStore
NewArchiveStore creates a new archive store. The current version is just a placeholder for tests. It will be replaced during recovery.
func (*ArchiveStore) Destruct ¶
func (s *ArchiveStore) Destruct()
Destruct deletes all vectors allocated in C. Caller must detach the Shard first and wait until all users are finished.
func (*ArchiveStore) GetCurrentVersion ¶
func (s *ArchiveStore) GetCurrentVersion() *ArchiveStoreVersion
GetCurrentVersion returns the current SortedVectorStoreVersion and does proper locking. It's used by query and data browsing. Users need to call version.Users.Done() after their work.
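A sketch of the read path under this protocol (batchID is hypothetical; it assumes GetBatchForRead read-locks the batch, as described below):

	version := s.GetCurrentVersion()
	defer version.Users.Done()
	if batch := version.GetBatchForRead(batchID); batch != nil {
		defer batch.RUnlock()
		// ... read from batch ...
	}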
func (*ArchiveStore) MarshalJSON ¶
func (s *ArchiveStore) MarshalJSON() ([]byte, error)
MarshalJSON marshals an ArchiveStore into json.
type ArchiveStoreVersion ¶
type ArchiveStoreVersion struct {
	// The mutex protects the Batches map structure and the archiving cutoff field.
	// It does not protect contents within a batch. Before releasing
	// the VectorStore mutex, user should lock the batch level mutex if necessary
	// to ensure proper protection at batch level.
	sync.RWMutex `json:"-"`

	// Wait group used to prevent this ArchiveStore from being evicted.
	Users sync.WaitGroup `json:"-"`

	// Each batch in the slice is identified by BaseBatchID+index.
	// Index out of bound and nil Batch for archive batches indicates that none of
	// the columns have been loaded into memory from disk.
	Batches map[int32]*ArchiveBatch `json:"batches"`

	// The archiving cutoff used for this version of the sorted store.
	ArchivingCutoff uint32 `json:"archivingCutoff"`
	// contains filtered or unexported fields
}
ArchiveStoreVersion stores a version of archive batches of columnar data.
func NewArchiveStoreVersion ¶
func NewArchiveStoreVersion(cutoff uint32, shard *TableShard) *ArchiveStoreVersion
NewArchiveStoreVersion creates a new empty archive store version given the cutoff.
func (*ArchiveStoreVersion) GetBatchForRead ¶
func (v *ArchiveStoreVersion) GetBatchForRead(batchID int) *ArchiveBatch
GetBatchForRead returns an ArchiveBatch for read; the reader needs to unlock it after use.
func (*ArchiveStoreVersion) MarshalJSON ¶
func (v *ArchiveStoreVersion) MarshalJSON() ([]byte, error)
MarshalJSON marshals an ArchiveStoreVersion into json.
func (*ArchiveStoreVersion) RequestBatch ¶
func (v *ArchiveStoreVersion) RequestBatch(batchID int32) *ArchiveBatch
RequestBatch returns the requested archive batch from the archive store version.
type ArchivingJob ¶
type ArchivingJob struct {
// contains filtered or unexported fields
}
ArchivingJob defines the structure that an archiving job needs.
func (*ArchivingJob) GetIdentifier ¶
func (job *ArchivingJob) GetIdentifier() string
GetIdentifier returns a unique identifier of this job.
func (*ArchivingJob) JobType ¶ added in v0.0.2
func (job *ArchivingJob) JobType() common.JobType
JobType returns the job type.
func (*ArchivingJob) Run ¶
func (job *ArchivingJob) Run() error
Run starts the archiving process and waits for it to finish.
func (*ArchivingJob) String ¶
func (job *ArchivingJob) String() string
String gives a meaningful string representation for this job.
type ArchivingStage ¶
type ArchivingStage string
ArchivingStage represents different stages of a running archive job.
const (
	ArchivingCreatePatch ArchivingStage = "create patch"
	ArchivingMerge       ArchivingStage = "merge"
	ArchivingPurge       ArchivingStage = "purge"
	ArchivingComplete    ArchivingStage = "complete"
)
List of ArchivingStages.
type BackfillConfig ¶ added in v0.0.2
type BackfillConfig struct {
	// max buffer size to hold backfill data
	MaxBufferSize int64 `json:"maxBufferSize"`
	// threshold to trigger backfill
	BackfillThresholdInBytes int64 `json:"backfillThresholdInBytes"`
}
BackfillConfig defines the configs for backfill.
type BackfillJob ¶
type BackfillJob struct {
// contains filtered or unexported fields
}
BackfillJob defines the structure that a backfill job needs.
func (*BackfillJob) GetIdentifier ¶
func (job *BackfillJob) GetIdentifier() string
GetIdentifier returns a unique identifier of this job.
func (*BackfillJob) JobType ¶ added in v0.0.2
func (job *BackfillJob) JobType() common.JobType
JobType returns the job type.
func (*BackfillJob) Run ¶
func (job *BackfillJob) Run() error
Run starts the backfill process and waits for it to finish.
func (*BackfillJob) String ¶
func (job *BackfillJob) String() string
String gives a meaningful string representation for this job.
type BackfillJobDetail ¶
type BackfillJobDetail struct {
	JobDetail

	// Stage of the job is running.
	Stage BackfillStage `json:"stage"`
	// Current redolog file that's being backfilled.
	RedologFile int64 `json:"redologFile"`
	// Batch offset within the RedologFile.
	BatchOffset uint32 `json:"batchOffset"`
}
BackfillJobDetail represents backfill job status of a table Shard.
type BackfillJobDetailMutator ¶
type BackfillJobDetailMutator func(jobDetail *BackfillJobDetail)
BackfillJobDetailMutator is the mutator functor to change BackfillJobDetail.
type BackfillJobDetailReporter ¶
type BackfillJobDetailReporter func(key string, mutator BackfillJobDetailMutator)
BackfillJobDetailReporter is the functor to apply mutator changes to corresponding JobDetail.
type BackfillManager ¶
type BackfillManager struct {
	sync.RWMutex `json:"-"`

	BackfillConfig

	// Name of the table.
	TableName string `json:"-"`
	// The shard id of the table.
	Shard int `json:"-"`

	// queue to hold UpsertBatches to backfill
	UpsertBatches []*memCom.UpsertBatch `json:"-"`
	// keep track of the number of records in backfill queue
	NumRecords int `json:"numRecords"`
	// keep track of the size of the buffer that holds batches to be backfilled
	CurrentBufferSize int64 `json:"currentBufferSize"`
	// keep track of the size of the buffer that holds batches being backfilled
	BackfillingBufferSize int64 `json:"backfillingBufferSize"`
	// keep track of the redo log file of the last batch backfilled
	LastRedoFile int64 `json:"lastRedoFile"`
	// keep track of the offset of the last batch backfilled
	LastBatchOffset uint32 `json:"lastBatchOffset"`
	// keep track of the redo log file of the last batch queued
	CurrentRedoFile int64 `json:"currentRedoFile"`
	// keep track of the offset of the last batch being queued
	CurrentBatchOffset uint32 `json:"currentBatchOffset"`

	AppendCond *sync.Cond `json:"-"`
}
BackfillManager manages the records that need to be put into a backfill queue and merged with sorted batches directly.
func NewBackfillManager ¶
func NewBackfillManager(tableName string, shard int, config BackfillConfig) *BackfillManager
NewBackfillManager creates a new BackfillManager instance.
func (*BackfillManager) Append ¶
func (r *BackfillManager) Append(upsertBatch *memCom.UpsertBatch, redoFile int64, batchOffset uint32) bool
Append appends an upsert batch into the backfill queue. Returns true if the buffer limit has been reached and the caller may need to wait.
func (*BackfillManager) Destruct ¶
func (r *BackfillManager) Destruct()
Destruct sets the Go object references used by the backfill manager to nil to trigger GC earlier.
func (*BackfillManager) Done ¶
func (r *BackfillManager) Done(currentRedoFile int64, currentBatchOffset uint32, metaStore metaCom.MetaStore) error
Done updates the backfill progress both in memory and in metastore.
func (*BackfillManager) GetLatestRedoFileAndOffset ¶
func (r *BackfillManager) GetLatestRedoFileAndOffset() (int64, uint32)
GetLatestRedoFileAndOffset returns the latest redo log file and its batch offset.
func (*BackfillManager) MarshalJSON ¶
func (r *BackfillManager) MarshalJSON() ([]byte, error)
MarshalJSON marshals a BackfillManager into json.
func (*BackfillManager) QualifyToTriggerBackfill ¶
func (r *BackfillManager) QualifyToTriggerBackfill() bool
QualifyToTriggerBackfill decides whether it is OK to trigger the size-based backfill process.
func (*BackfillManager) ReadUpsertBatch ¶
func (r *BackfillManager) ReadUpsertBatch(index, start, length int, schema *memCom.TableSchema) (data [][]interface{}, columnNames []string, err error)
ReadUpsertBatch reads an upsert batch in the backfill queue; the user should not lock the schema.
func (*BackfillManager) StartBackfill ¶
func (r *BackfillManager) StartBackfill() ([]*memCom.UpsertBatch, int64, uint32)
StartBackfill gets a slice of UpsertBatches from the backfill queue and returns the CurrentRedoFile and CurrentBatchOffset.
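A sketch of one backfill cycle built from StartBackfill and Done (metaStore is assumed to be available to the caller):

	batches, redoFile, batchOffset := r.StartBackfill()
	// ... merge batches into the archive store ...
	if err := r.Done(redoFile, batchOffset, metaStore); err != nil {
		// handle the error
	}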
func (*BackfillManager) WaitForBackfillBufferAvailability ¶
func (r *BackfillManager) WaitForBackfillBufferAvailability()
WaitForBackfillBufferAvailability blocks until the backfill buffer is available.
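For example, an ingester might combine Append with this call (a sketch, not the exact ingestion code):

	if r.Append(upsertBatch, redoFile, batchOffset) {
		// The buffer limit was reached; block until the backfill job frees space.
		r.WaitForBackfillBufferAvailability()
	}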
type BackfillStage ¶
type BackfillStage string
BackfillStage represents different stages of a running backfill job.
const (
	BackfillCreatePatch BackfillStage = "create patch"
	BackfillApplyPatch  BackfillStage = "apply patch"
	BackfillPurge       BackfillStage = "purge"
	BackfillComplete    BackfillStage = "complete"
)
List of BackfillStages.
type BatchStatsReporter ¶
type BatchStatsReporter struct {
// contains filtered or unexported fields
}
BatchStatsReporter is used to report batch-level stats such as row count.
func NewBatchStatsReporter ¶
func NewBatchStatsReporter(intervalInSeconds int, memStore MemStore, shardOwner topology.ShardOwner) *BatchStatsReporter
NewBatchStatsReporter creates a new BatchStatsReporter instance.
func (*BatchStatsReporter) Run ¶
func (batchStats *BatchStatsReporter) Run()
Run is a ticker function that reports stats periodically.
func (*BatchStatsReporter) Stop ¶
func (batchStats *BatchStatsReporter) Stop()
Stop stops the stats reporter.
type CuckooIndex ¶
type CuckooIndex struct {
// contains filtered or unexported fields
}
CuckooIndex is an implementation of a hash index using the cuckoo hashing algorithm. Lazy expiration is used to invalidate expired items. CuckooIndex is not thread-safe.
func (*CuckooIndex) AllocatedBytes ¶
func (c *CuckooIndex) AllocatedBytes() uint
AllocatedBytes returns the allocated size of the primary key in bytes.
func (*CuckooIndex) Capacity ¶
func (c *CuckooIndex) Capacity() uint
Capacity returns how many items the current primary key can hold.
func (*CuckooIndex) Delete ¶
func (c *CuckooIndex) Delete(key memCom.Key)
Delete deletes an item with the given key.
func (*CuckooIndex) FindOrInsert ¶
func (c *CuckooIndex) FindOrInsert(key memCom.Key, value memCom.RecordID, eventTime uint32) (existingFound bool, recordID memCom.RecordID, err error)
FindOrInsert finds the existing key or inserts a new (key, value) pair.
func (*CuckooIndex) GetEventTimeCutoff ¶
func (c *CuckooIndex) GetEventTimeCutoff() uint32
GetEventTimeCutoff returns the cutoff event time.
func (*CuckooIndex) LockForTransfer ¶
func (c *CuckooIndex) LockForTransfer() memCom.PrimaryKeyData
LockForTransfer locks the primary key for transfer and returns its PrimaryKeyData.
func (*CuckooIndex) Size ¶
func (c *CuckooIndex) Size() uint
Size returns the current number of items stored in the hash table, including expired items not yet known to the system.
func (*CuckooIndex) UnlockAfterTransfer ¶
func (c *CuckooIndex) UnlockAfterTransfer()
UnlockAfterTransfer releases the transfer lock.
func (*CuckooIndex) Update ¶
func (c *CuckooIndex) Update(key memCom.Key, value memCom.RecordID) bool
Update updates a key with a new recordID. Returns whether the key exists in the primary key.
func (*CuckooIndex) UpdateEventTimeCutoff ¶
func (c *CuckooIndex) UpdateEventTimeCutoff(cutoff uint32)
UpdateEventTimeCutoff updates the event time cutoff.
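A short sketch tying the CuckooIndex methods together (key, value, eventTime, and cutoff are hypothetical):

	found, recordID, err := c.FindOrInsert(key, value, eventTime)
	if err == nil && !found {
		// a new (key, value) pair was inserted at recordID
	}
	c.UpdateEventTimeCutoff(cutoff) // expired items are invalidated lazily
	if existing, ok := c.Find(key); ok {
		// existing is the current RecordID for key
	}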
type JobDetail ¶
type JobDetail struct {
	// Status of archiving for current table Shard.
	Status JobStatus `json:"status"`
	// Time of next archiving job.
	NextRun time.Time `json:"nextRun,omitempty"`
	// Time when the last backfill job finishes.
	LastRun time.Time `json:"lastRun,omitempty"`
	// Error of last run if failed.
	LastError error `json:"lastError,omitempty"`
	// Start time of last archiving job.
	LastStartTime time.Time `json:"lastStartTime,omitempty"`
	// Duration of last archiving job.
	LastDuration time.Duration `json:"lastDuration,omitempty"`
	// Number of records processed.
	NumRecords int `json:"numRecords,omitempty"`
	// Number of days affected.
	NumAffectedDays int `json:"numAffectedDays,omitempty"`
	// Total amount of work,
	// ie, archiving merge: number of days;
	// archiving snapshot: number of records.
	Total int `json:"total,omitempty"`
	// Current finished work.
	Current int `json:"current,omitempty"`
	// Duration for waiting for lock.
	LockDuration time.Duration `json:"lockDuration,omitempty"`
}
JobDetail represents common job status of a table Shard.
type LiveBatch ¶
type LiveBatch struct {
	// The common data structure holding column data.
	common.Batch

	// Capacity of the batch which is decided at the creation time.
	Capacity int

	// maximum of arrival time
	MaxArrivalTime uint32
	// contains filtered or unexported fields
}
LiveBatch represents a live batch.
func (*LiveBatch) GetOrCreateVectorParty ¶
func (b *LiveBatch) GetOrCreateVectorParty(columnID int, locked bool) common.LiveVectorParty
GetOrCreateVectorParty returns LiveVectorParty for the specified column from the live batch. locked specifies whether the batch has been locked. The lock will be left in the same state after the function returns.
func (*LiveBatch) MarshalJSON ¶
func (b *LiveBatch) MarshalJSON() ([]byte, error)
MarshalJSON marshals a LiveBatch into json.
type LiveStore ¶
type LiveStore struct {
	sync.RWMutex

	// The batch id to batch map.
	Batches map[int32]*LiveBatch

	// Number of rows to create for new batches.
	BatchSize int

	// The upper bound of records (exclusive) that can be read by queries.
	LastReadRecord common.RecordID

	// This is the in memory archiving cutoff time high watermark that gets set by the archiving job
	// before each archiving run. Ingestion will not insert/update records that are older than
	// the archiving cutoff watermark.
	ArchivingCutoffHighWatermark uint32

	// Logs.
	RedoLogManager redolog.RedologManager

	// Manage backfill queue during ingestion.
	BackfillManager *BackfillManager

	// Manage snapshot related stats.
	SnapshotManager *SnapshotManager

	// The writer lock is to guarantee single writer to a Shard at all time. To ensure this, writers
	// (ingestion, archiving etc) need to hold this lock at all times. This lock
	// should be acquired before the VectorStore and Batch locks.
	// TODO: if spinning lock performance is a concern we may need to upgrade this
	// to a designated goroutine with a pc-queue channel.
	WriterLock sync.RWMutex

	// Primary key table of the Shard.
	PrimaryKey common.PrimaryKey

	// The position of the next record to be used for writing. Only used by the ingester.
	NextWriteRecord common.RecordID

	// For convenience.
	HostMemoryManager common.HostMemoryManager `json:"-"`
	// contains filtered or unexported fields
}
LiveStore stores live batches of columnar data.
func NewLiveStore ¶
func NewLiveStore(batchSize int, shard *TableShard) *LiveStore
NewLiveStore creates a new live store.
func (*LiveStore) AdvanceLastReadRecord ¶
func (s *LiveStore) AdvanceLastReadRecord()
AdvanceLastReadRecord advances the high watermark of the rows to the next write record.
func (*LiveStore) AdvanceNextWriteRecord ¶
func (s *LiveStore) AdvanceNextWriteRecord() common.RecordID
AdvanceNextWriteRecord reserves space for a record and returns the next available record position to the caller.
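A sketch of the single-writer ingestion path implied by these two methods (per the WriterLock comment in the struct above):

	s.WriterLock.Lock()
	recordID := s.AdvanceNextWriteRecord()
	// ... write column values for recordID into the live batch ...
	s.AdvanceLastReadRecord() // make the new record visible to queries
	s.WriterLock.Unlock()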
func (*LiveStore) Destruct ¶
func (s *LiveStore) Destruct()
Destruct deletes all vectors allocated in C. Caller must detach the Shard first and wait until all users are finished.
func (*LiveStore) GetBatchForRead ¶
func (s *LiveStore) GetBatchForRead(id int32) *LiveBatch
GetBatchForRead returns and read-locks the batch with its ID for reads. Caller must explicitly RUnlock() the returned batch after all reads.
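For example (a sketch of the locking contract; the nil check assumes the batch may be absent):

	if batch := s.GetBatchForRead(id); batch != nil {
		defer batch.RUnlock()
		// ... read from batch ...
	}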
func (*LiveStore) GetBatchForWrite ¶
func (s *LiveStore) GetBatchForWrite(id int32) *LiveBatch
GetBatchForWrite returns and locks the batch with its ID for writes. Caller must explicitly Unlock() the returned batch after all writes.
func (*LiveStore) GetBatchIDs ¶
func (s *LiveStore) GetBatchIDs() (batchIDs []int32, numRecordsInLastBatch int)
GetBatchIDs snapshots the batches and returns a list of batch IDs for read, along with the number of records in the last batch.
func (*LiveStore) GetMemoryUsageForColumn ¶
func (s *LiveStore) GetMemoryUsageForColumn(valueType common.DataType, columnID int) int
GetMemoryUsageForColumn gets the live store memory usage for the given column and data type.
func (*LiveStore) MarshalJSON ¶
func (s *LiveStore) MarshalJSON() ([]byte, error)
MarshalJSON marshals a LiveStore into json.
func (*LiveStore) PurgeBatch ¶
func (s *LiveStore) PurgeBatch(id int32)
PurgeBatch purges the specified batch.
func (*LiveStore) PurgeBatches ¶
func (s *LiveStore) PurgeBatches(ids []int32)
PurgeBatches purges the specified batches.
type MemStore ¶
type MemStore interface {
	common.TableSchemaReader
	bootstrap.Bootstrapable

	// GetMemoryUsageDetails
	GetMemoryUsageDetails() (map[string]TableShardMemoryUsage, error)
	// GetScheduler returns the scheduler for scheduling archiving and backfill jobs.
	GetScheduler() Scheduler
	// GetHostMemoryManager returns the host memory manager
	GetHostMemoryManager() common.HostMemoryManager
	// AddTableShard add a table shard to the memstore
	AddTableShard(table string, shardID int, needPeerCopy bool)
	// GetTableShard gets the data for a pinned table Shard. Caller needs to unpin after use.
	GetTableShard(table string, shardID int) (*TableShard, error)
	// RemoveTableShard removes table shard from memstore
	RemoveTableShard(table string, shardID int)
	// FetchSchema fetches schema from metaStore and updates in-memory copy of table schema,
	// and set up watch channels for metaStore schema changes, used for bootstrapping mem store.
	FetchSchema() error
	// InitShards loads/recovers data for shards initially owned by the current instance.
	InitShards(schedulerOff bool, shardOwner topology.ShardOwner)
	// HandleIngestion logs an upsert batch and applies it to the in-memory store.
	HandleIngestion(table string, shardID int, upsertBatch *common.UpsertBatch) error
	// Archive is the process moving stable records in fact tables from live batches to archive
	// batches.
	Archive(table string, shardID int, cutoff uint32, reporter ArchiveJobDetailReporter) error
	// Backfill is the process of merging records with event time older than cutoff with
	// archive batches.
	Backfill(table string, shardID int, reporter BackfillJobDetailReporter) error
	// Snapshot is the process to write the current content of dimension table live store in memory to disk.
	Snapshot(table string, shardID int, reporter SnapshotJobDetailReporter) error
	// Purge is the process to purge out of retention archive batches.
	Purge(table string, shardID, batchIDStart, batchIDEnd int, reporter PurgeJobDetailReporter) error
}
MemStore defines the interface for managing multiple table shards in memory. The interface exists to support mocking in unit tests.
type Option ¶ added in v0.0.2
type Option func(o *Options)
func WithNumShards ¶ added in v0.0.2
WithNumShards sets numShards in the memstore options.
type Options ¶ added in v0.0.2
type Options struct {
// contains filtered or unexported fields
}
Options holds all necessary context objects used in memstore.
func NewOptions ¶ added in v0.0.2
func NewOptions(bootstrapToken common.BootStrapToken, redoLogMaster *redolog.RedoLogManagerMaster, setters ...Option) Options
NewOptions creates a new Options instance.
type PurgeJob ¶
type PurgeJob struct {
// contains filtered or unexported fields
}
PurgeJob defines the structure that a purge job needs.
func (*PurgeJob) GetIdentifier ¶
GetIdentifier returns a unique identifier of this job.
type PurgeJobDetail ¶
type PurgeJobDetail struct {
	JobDetail

	// Stage of the job is running.
	Stage PurgeStage `json:"stage"`
	// Number of batches purged.
	NumBatches   int `json:"numBatches"`
	BatchIDStart int `json:"batchIDStart"`
	BatchIDEnd   int `json:"batchIDEnd"`
}
PurgeJobDetail represents purge job status of a table shard.
type PurgeJobDetailMutator ¶
type PurgeJobDetailMutator func(jobDetail *PurgeJobDetail)
PurgeJobDetailMutator is the mutator functor to change PurgeJobDetail.
type PurgeJobDetailReporter ¶
type PurgeJobDetailReporter func(key string, mutator PurgeJobDetailMutator)
PurgeJobDetailReporter is the functor to apply mutator changes to corresponding JobDetail.
type PurgeManager ¶
type PurgeManager struct {
	sync.RWMutex `json:"-"`

	// Job Trigger Condition related fields
	// Last purge time.
	LastPurgeTime time.Time     `json:"lastPurgeTime"`
	PurgeInterval time.Duration `json:"purgeInterval"`
	// contains filtered or unexported fields
}
PurgeManager manages the purge related stats and progress.
func NewPurgeManager ¶
func NewPurgeManager(shard *TableShard) *PurgeManager
NewPurgeManager creates a new PurgeManager instance.
func (*PurgeManager) QualifyForPurge ¶
func (p *PurgeManager) QualifyForPurge() bool
QualifyForPurge tells whether we can trigger a purge job.
type PurgeStage ¶
type PurgeStage string
PurgeStage represents different stages of a running purge job.
const (
	PurgeMetaData PurgeStage = "purge metadata"
	PurgeDataFile PurgeStage = "purge data file"
	PurgeMemory   PurgeStage = "purge memory"
	PurgeComplete PurgeStage = "complete"
)
List of PurgeStages.
type RedoLogBrowser ¶
type RedoLogBrowser interface {
	ListLogFiles() ([]int64, error)
	ListUpsertBatch(creationTime int64) ([]int64, error)
	ReadData(creationTime int64, upsertBatchOffset int64, start int, length int) (
		[][]interface{}, []string, int, error)
}
RedoLogBrowser is the interface to list redo log files, upsert batches and read upsert batch data.
type Scheduler ¶
type Scheduler interface {
	Start()
	Stop()
	SubmitJob(job Job) (error, chan error)
	DeleteTable(table string, isFactTable bool)
	GetJobDetails(jobType common.JobType) interface{}
	NewBackfillJob(tableName string, shardID int) Job
	NewArchivingJob(tableName string, shardID int, cutoff uint32) Job
	NewSnapshotJob(tableName string, shardID int) Job
	NewPurgeJob(tableName string, shardID int, batchIDStart int, batchIDEnd int) Job
	EnableJobType(jobType common.JobType, enable bool)
	IsJobTypeEnabled(jobType common.JobType) bool
	utils.RWLocker
}
Scheduler is for scheduling archiving jobs (and later backfill jobs) for table shards in memStore. It scans through all tables and shards to generate list of eligible jobs to run.
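A sketch of submitting a job through this interface (the table name and shard ID are hypothetical, and it assumes the returned channel reports the job's final result):

	job := scheduler.NewArchivingJob("trips", 0, cutoff)
	err, errChan := scheduler.SubmitJob(job)
	if err == nil {
		err = <-errChan // wait for the job result, if desired
	}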
type SnapshotJob ¶
type SnapshotJob struct {
// contains filtered or unexported fields
}
SnapshotJob defines the structure that a snapshot job needs.
func (*SnapshotJob) GetIdentifier ¶
func (job *SnapshotJob) GetIdentifier() string
GetIdentifier returns a unique identifier of this job.
func (*SnapshotJob) JobType ¶ added in v0.0.2
func (job *SnapshotJob) JobType() common.JobType
JobType returns the job type.
func (*SnapshotJob) Run ¶
func (job *SnapshotJob) Run() error
Run starts the snapshot process and waits for it to finish.
func (*SnapshotJob) String ¶
func (job *SnapshotJob) String() string
String gives a meaningful string representation for this job.
type SnapshotJobDetail ¶
type SnapshotJobDetail struct {
	JobDetail

	// Number of mutations in this snapshot.
	NumMutations int `json:"numMutations"`
	// Number of batches written in this snapshot.
	NumBatches int `json:"numBatches"`
	// Current redolog file that's being backfilled.
	RedologFile int64 `json:"redologFile"`
	// Batch offset within the RedologFile.
	BatchOffset uint32 `json:"batchOffset"`
	// Stage of the job is running.
	Stage SnapshotStage `json:"stage"`
}
SnapshotJobDetail represents snapshot job status of a table shard.
type SnapshotJobDetailMutator ¶
type SnapshotJobDetailMutator func(jobDetail *SnapshotJobDetail)
SnapshotJobDetailMutator is the mutator functor to change SnapshotJobDetail.
type SnapshotJobDetailReporter ¶
type SnapshotJobDetailReporter func(key string, mutator SnapshotJobDetailMutator)
SnapshotJobDetailReporter is the functor to apply mutator changes to corresponding JobDetail.
type SnapshotManager ¶
type SnapshotManager struct {
	sync.RWMutex `json:"-"`

	// Number of mutations since last snapshot. Measured as number of rows mutated.
	NumMutations int `json:"numMutations"`

	// Last snapshot time.
	LastSnapshotTime time.Time `json:"'lastSnapshotTime'"`

	// keep track of the redo log file of the last batch snapshotted
	LastRedoFile int64 `json:"lastRedoFile"`
	// keep track of the offset of the last batch snapshotted
	LastBatchOffset uint32 `json:"lastBatchOffset"`
	// keep track of the record position of the last batch snapshotted
	LastRecord common.RecordID

	// keep track of the redo log file of the last batch queued
	CurrentRedoFile int64 `json:"currentRedoFile"`
	// keep track of the offset of the last batch queued
	CurrentBatchOffset uint32 `json:"currentBatchOffset"`
	// keep track of the record position when last batch queued
	CurrentRecord common.RecordID

	// Configs
	SnapshotInterval  time.Duration `json:"snapshotInterval"`
	SnapshotThreshold int           `json:"snapshotThreshold"`
	// contains filtered or unexported fields
}
SnapshotManager manages the snapshot related stats and progress.
func NewSnapshotManager ¶
func NewSnapshotManager(shard *TableShard) *SnapshotManager
NewSnapshotManager creates a new SnapshotManager instance.
func (*SnapshotManager) ApplyUpsertBatch ¶
func (s *SnapshotManager) ApplyUpsertBatch(redoFile int64, offset uint32, numMutations int, currentRecord common.RecordID)
ApplyUpsertBatch advances CurrentRedoFile and CurrentBatchOffset and increments NumMutations after applying an upsert batch to the live store.
func (*SnapshotManager) Done ¶
func (s *SnapshotManager) Done(currentRedoFile int64, currentBatchOffset uint32, lastNumMutations int, currentRecord common.RecordID) error
Done updates the snapshot progress both in memory and in metastore, and updates the number of mutations accordingly.
func (*SnapshotManager) GetLastSnapshotInfo ¶
func (s *SnapshotManager) GetLastSnapshotInfo() (int64, uint32, time.Time, common.RecordID)
GetLastSnapshotInfo returns the last snapshot's redo log file, batch offset, timestamp, and record.
func (*SnapshotManager) MarshalJSON ¶
func (s *SnapshotManager) MarshalJSON() ([]byte, error)
MarshalJSON marshals a SnapshotManager into json.
func (*SnapshotManager) QualifyForSnapshot ¶
func (s *SnapshotManager) QualifyForSnapshot() bool
QualifyForSnapshot tells whether we can trigger a snapshot job.
func (*SnapshotManager) SetLastSnapshotInfo ¶
func (s *SnapshotManager) SetLastSnapshotInfo(redoLogFile int64, offset uint32, record common.RecordID)
SetLastSnapshotInfo updates the last snapshot's redo log file, batch offset, timestamp, and record.
func (*SnapshotManager) StartSnapshot ¶
func (s *SnapshotManager) StartSnapshot() (int64, uint32, int, common.RecordID)
StartSnapshot returns the current redo log file, batch offset, number of mutations, and record position.
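A sketch of a snapshot cycle using these methods (the return order is assumed to follow the signature above):

	if s.QualifyForSnapshot() {
		redoFile, batchOffset, numMutations, record := s.StartSnapshot()
		// ... write the dimension table live store contents to disk ...
		if err := s.Done(redoFile, batchOffset, numMutations, record); err != nil {
			// handle the error
		}
	}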
type SnapshotStage ¶
type SnapshotStage string
SnapshotStage represents different stages of a running snapshot job.
const (
	SnapshotSnapshot SnapshotStage = "snapshot"
	SnapshotCleanup  SnapshotStage = "cleanup"
	SnapshotComplete SnapshotStage = "complete"
)
List of SnapshotStages.
type TableShard ¶
type TableShard struct {
	// Wait group used to prevent the stores from being prematurely deleted.
	Users sync.WaitGroup `json:"-"`

	ShardID int `json:"-"`

	// For convenience, reference to the table schema struct.
	Schema *common.TableSchema `json:"schema"`

	// Live store. Its locks also cover the primary key.
	LiveStore *LiveStore `json:"liveStore"`

	// Archive store.
	ArchiveStore *ArchiveStore `json:"archiveStore"`

	// For convenience.
	HostMemoryManager common.HostMemoryManager `json:"-"`

	BootstrapState bootstrap.BootstrapState `json:"BootstrapState"`

	// BootstrapDetails shows the details of bootstrap.
	BootstrapDetails bootstrap.BootstrapDetails `json:"bootstrapDetails,omitempty"`
	// contains filtered or unexported fields
}
TableShard stores the data for one table shard in memory.
func NewTableShard ¶
func NewTableShard(schema *common.TableSchema, metaStore metaCom.MetaStore, diskStore diskstore.DiskStore, hostMemoryManager common.HostMemoryManager, shard int, options Options) *TableShard
NewTableShard creates and initializes a table shard based on the schema.
func (*TableShard) ApplyUpsertBatch ¶
func (shard *TableShard) ApplyUpsertBatch(upsertBatch *common.UpsertBatch, redoLogFile int64, offset uint32, skipBackfillRows bool) (bool, error)
ApplyUpsertBatch applies the upsert batch to the memstore shard. Returns true if the caller needs to wait for backfill buffer availability.
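For instance (a sketch of the return contract above; skipBackfillRows is set to false here):

	needToWait, err := shard.ApplyUpsertBatch(upsertBatch, redoLogFile, offset, false)
	if err == nil && needToWait {
		shard.LiveStore.BackfillManager.WaitForBackfillBufferAvailability()
	}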
func (*TableShard) Bootstrap ¶ added in v0.0.2
func (shard *TableShard) Bootstrap(
	peerSource client.PeerSource,
	origin string,
	topo topology.Topology,
	topoState *topology.StateSnapshot,
	options bootstrap.Options,
) error
Bootstrap executes bootstrap for the table shard.
func (*TableShard) DeleteColumn ¶
func (shard *TableShard) DeleteColumn(columnID int) error
DeleteColumn deletes the data for the specified column.
func (*TableShard) Destruct ¶
func (shard *TableShard) Destruct()
Destruct destructs the table shard. Caller must detach the shard from memstore first.
func (*TableShard) IsBootstrapped ¶ added in v0.0.2
func (shard *TableShard) IsBootstrapped() bool
IsBootstrapped returns whether this table shard is bootstrapped.
func (*TableShard) IsDiskDataAvailable ¶ added in v0.0.2
func (shard *TableShard) IsDiskDataAvailable() bool
IsDiskDataAvailable returns whether the data is available on disk for the table shard.
func (*TableShard) LoadMetaData ¶
func (shard *TableShard) LoadMetaData() error
LoadMetaData loads metadata for the table Shard from metastore.
func (*TableShard) LoadSnapshot ¶
func (shard *TableShard) LoadSnapshot() error
LoadSnapshot loads shard data from snapshot files.
func (*TableShard) NewRedoLogBrowser ¶
func (shard *TableShard) NewRedoLogBrowser() RedoLogBrowser
NewRedoLogBrowser creates a RedoLogBrowser using fields from the Shard.
func (*TableShard) PlayRedoLog ¶ added in v0.0.2
func (shard *TableShard) PlayRedoLog()
PlayRedoLog loads data for the table Shard from disk store and recovers the Shard for serving.
func (*TableShard) PreloadColumn ¶
func (shard *TableShard) PreloadColumn(columnID int, startDay int, endDay int)
PreloadColumn loads the column into memory and waits for loading to complete within (startDay, endDay]. Note endDay is inclusive but startDay is exclusive.
type TableShardMemoryUsage ¶
type TableShardMemoryUsage struct {
	ColumnMemory     map[string]*common.ColumnMemoryUsage `json:"cols"`
	PrimaryKeyMemory uint                                 `json:"pk"`
}
TableShardMemoryUsage contains the column memory usage and the primary key memory usage.
type TestFactoryT ¶
type TestFactoryT struct {
tests.TestFactoryBase
}
TestFactoryT creates memstore test objects from text files.
func GetFactory ¶ added in v0.0.2
func GetFactory() TestFactoryT
func (TestFactoryT) NewMockMemStore ¶
func (t TestFactoryT) NewMockMemStore() *memStoreImpl
NewMockMemStore returns a new memstore with mocked diskstore and metastore.
type TransferableVectorParty ¶
type TransferableVectorParty interface {
	// GetHostVectorPartySlice slice vector party between [startIndex, startIndex+length) before transfer to gpu
	GetHostVectorPartySlice(startIndex, length int) common.HostVectorPartySlice
}
TransferableVectorParty is a vector party that can be transferred to the GPU for processing.
Source Files ¶
- archive_store.go
- archive_vector_party.go
- archiving.go
- backfill.go
- backfill_manager.go
- batchstats.go
- bootstrap.go
- cuckoo_index.go
- host_memory_manager.go
- ingestion.go
- job_manager.go
- job_status.go
- live_store.go
- live_vector_party.go
- memstore.go
- merge.go
- options.go
- purge.go
- purge_manager.go
- recovery.go
- redo_log_browser.go
- scheduler.go
- schema.go
- snapshot.go
- snapshot_manager.go
- table_shard.go
- test_factory.go
- vector_party.go