Documentation ¶
Index ¶
- Constants
- func FullPathToWALKey(rootPath, fullPath string) (keyPath string)
- func GetTimeFromTicks(intervalStart uint64, intervalsPerDay, intervalTicks uint32) (sec uint64, nanosec uint32)
- func NewByIntervalTicks(buffer []byte, numWords, recordLength int) sort.Interface
- func NewDeleter(pr *planner.ParseResult) (de *deleter, err error)
- func NewIOPlan(fl SortedFileList, pr *planner.ParseResult) (iop *ioplan, err error)
- func ParseTGData(TG_Serialized []byte, rootPath string) (TGID int64, wtSets []wal.WTSet)
- func RewriteBuffer(buffer []byte, varRecLen, numVarRecords uint32, intervalsPerDay uint32, ...) []byte
- func TimeOfVariableRecord(buf []byte, cursor int, rowLength int) time.Time
- func WriteBufferToFile(fp stdio.WriterAt, buffer wal.OffsetIndexBuffer) error
- func WriteBufferToFileIndirect(fp *os.File, buffer wal.OffsetIndexBuffer, varRecLen int) (err error)
- func WriteCSM(csm io.ColumnSeriesMap, isVariableLength bool) (err error)
- type ByIntervalTicks
- type CacheEntryAlreadyOpenError
- type CacheImmutableError
- type CachedFP
- type DestEnum
- type ErrorWriter
- type IndirectRecordInfo
- type InstanceMetadata
- type MIDEnum
- type NotOpenError
- type Reader
- type RecordLengthNotConsistent
- type ReplicationSender
- type SingleTargetRequiredForWriter
- type SortedFileList
- type TGIDlist
- type TransactionGroup
- type TransactionPipe
- type TriggerPluginDispatcher
- type TxnStatusEnum
- type WALCleaner
- type WALCreateError
- type WALFileType
- func (wf *WALFileType) CanWrite(msg string, callersInstanceID int64) (bool, error)
- func (wf *WALFileType) CreateCheckpoint() error
- func (wf *WALFileType) Delete(callersInstanceID int64) (err error)
- func (wf *WALFileType) FinishAndWait()
- func (wf *WALFileType) FlushCommandsToWAL(writeCommands []*wal.WriteCommand) (err error)
- func (wf *WALFileType) FlushToWAL() (err error)
- func (wf *WALFileType) IsOpen() bool
- func (wf *WALFileType) NeedsReplay() (bool, error)
- func (wf *WALFileType) QueueWriteCommand(wc *wal.WriteCommand)
- func (wf *WALFileType) Replay(dryRun bool) error
- func (wf *WALFileType) RequestFlush()
- func (wf *WALFileType) SyncWAL(WALRefresh, PrimaryRefresh time.Duration, walRotateInterval int)
- func (wf *WALFileType) WriteCommand(rt io.EnumRecordType, tbiAbsPath string, varRecLen int, offset, index int64, ...) *wal.WriteCommand
- func (wf *WALFileType) WriteStatus(FileStatus wal.FileStatusEnum, ReplayState wal.ReplayStateEnum)
- func (wf *WALFileType) WriteTransactionInfo(tid int64, did DestEnum, txnStatus TxnStatusEnum)
- type WALTakeOverError
- type WALWriteError
- type Writer
- type WrongSizeError
Constants ¶
const RecordsPerRead = 8192
const WriteChannelCommandDepth = 1000000
NOTE: Access to the TransactionPipe structures is single threaded with the exception of the CommandChannel. Modification of the cache contents is performed by de-queueing commands from the CommandChannel in the single Cache thread.
Variables ¶
This section is empty.
Functions ¶
func FullPathToWALKey ¶ added in v4.0.1
func GetTimeFromTicks ¶
func GetTimeFromTicks(intervalStart uint64, intervalsPerDay, intervalTicks uint32) (sec uint64, nanosec uint32)
GetTimeFromTicks takes two time components, the start of the interval and the number of interval ticks to the timestamp, and returns an epoch time (seconds) and the number of nanoseconds of fractional time within the last second as a remainder.
func NewByIntervalTicks ¶
func NewDeleter ¶
func NewDeleter(pr *planner.ParseResult) (de *deleter, err error)
func NewIOPlan ¶
func NewIOPlan(fl SortedFileList, pr *planner.ParseResult) (iop *ioplan, err error)
func ParseTGData ¶ added in v4.1.0
func RewriteBuffer ¶
func RewriteBuffer(buffer []byte, varRecLen, numVarRecords uint32, intervalsPerDay uint32, intervalStartEpoch uint64) []byte
RewriteBuffer converts variable_length records to the result buffer.
Variable records in a file are laid out as [Actual Data (VarRecLen-4 bytes), Interval Ticks (4 bytes)]. RewriteBuffer converts the binary data to the [EpochSecond (8 bytes), Actual Data (VarRecLen-4 bytes), Nanoseconds (4 bytes)] format.
buffer +-----------------------VarRecLen [byte]---+-----------------------+ + Actual Data(Ask,Bid, etc.) | IntervalTicks(4byte) | +------------------------------------------+------------------------+
↓ rewriteBuffer
rbTemp (= temporary result buffer) +--------------------+--VarRecLen + 8 [byte]-----+-------------------+ + EpochSecond(8byte) | Actual Data(Ask,Bid, etc) | Nanosecond(4byte) | +--------------------+----------------------------+------------------+
func TimeOfVariableRecord ¶
func WriteBufferToFile ¶
func WriteBufferToFile(fp stdio.WriterAt, buffer wal.OffsetIndexBuffer) error
func WriteCSM ¶
func WriteCSM(csm io.ColumnSeriesMap, isVariableLength bool) (err error)
WriteCSM writes the ColumnSeriesMap (csm) to each destination file and flushes it to disk. isVariableLength is set to true if the record content is of variable-length type. WriteCSM also verifies that the DataShapeVector of the incoming ColumnSeriesMap matches the on-disk DataShapeVector defined by the file header. WriteCSM will create any files that do not already exist for the given ColumnSeriesMap, based on its TimeBucketKey.
Types ¶
type ByIntervalTicks ¶
type ByIntervalTicks struct {
// contains filtered or unexported fields
}
ByIntervalTicks implements a custom sort for variable-length records. It sorts by the last 4 bytes (= intervalTicks) of each record.
func (*ByIntervalTicks) Len ¶
func (ei *ByIntervalTicks) Len() int
func (*ByIntervalTicks) Less ¶
func (ei *ByIntervalTicks) Less(i, j int) bool
Less reports whether the element with index i should sort before the element with index j.
func (*ByIntervalTicks) Swap ¶
func (ei *ByIntervalTicks) Swap(i, j int)
Swap swaps the elements with indexes i and j.
type CacheEntryAlreadyOpenError ¶
type CacheEntryAlreadyOpenError string
WAL Messages
func (CacheEntryAlreadyOpenError) Error ¶
func (msg CacheEntryAlreadyOpenError) Error() string
type CacheImmutableError ¶
type CacheImmutableError string
func (CacheImmutableError) Error ¶
func (msg CacheImmutableError) Error() string
type CachedFP ¶
type CachedFP struct {
// contains filtered or unexported fields
}
func NewCachedFP ¶
func NewCachedFP() *CachedFP
type ErrorWriter ¶ added in v4.1.2
type ErrorWriter struct{}
func (*ErrorWriter) WriteCSM ¶ added in v4.1.2
func (w *ErrorWriter) WriteCSM(csm io.ColumnSeriesMap, isVariableLength bool) error
type IndirectRecordInfo ¶
type IndirectRecordInfo struct {
Index, Offset, Len int64
}
type InstanceMetadata ¶
type InstanceMetadata struct { CatalogDir *catalog.Directory WALFile *WALFileType }
var ThisInstance *InstanceMetadata
func NewInstanceSetup ¶
func NewInstanceSetup(relRootDir string, rs ReplicationSender, tm []*trigger.TriggerMatcher, walRotateInterval int, options ...bool, ) (metadata *InstanceMetadata, shutdownPending *bool, walWG *sync.WaitGroup, err error)
type NotOpenError ¶
type NotOpenError string
func (NotOpenError) Error ¶
func (msg NotOpenError) Error() string
type Reader ¶ added in v4.0.1
type Reader struct { IOPMap map[TimeBucketKey]*ioplan // contains filtered or unexported fields }
type RecordLengthNotConsistent ¶
type RecordLengthNotConsistent string
func (RecordLengthNotConsistent) Error ¶
func (msg RecordLengthNotConsistent) Error() string
type ReplicationSender ¶ added in v4.1.0
type ReplicationSender interface {
Send(transactionGroup []byte)
}
type SingleTargetRequiredForWriter ¶
type SingleTargetRequiredForWriter string
func (SingleTargetRequiredForWriter) Error ¶
func (msg SingleTargetRequiredForWriter) Error() string
type SortedFileList ¶
type SortedFileList []planner.QualifiedFile
func (SortedFileList) Len ¶
func (fl SortedFileList) Len() int
func (SortedFileList) Less ¶
func (fl SortedFileList) Less(i, j int) bool
func (SortedFileList) Swap ¶
func (fl SortedFileList) Swap(i, j int)
type TransactionGroup ¶ added in v4.1.0
type TransactionPipe ¶
type TransactionPipe struct {
// contains filtered or unexported fields
}
TransactionPipe stores the contents of the current pending Transaction Group and writes it to WAL when flush() is called
func NewTransactionPipe ¶
func NewTransactionPipe() *TransactionPipe
NewTransactionPipe creates a new transaction pipe that channels all of the write transactions to the WAL and primary writers
func (*TransactionPipe) IncrementTGID ¶ added in v4.0.2
func (tgc *TransactionPipe) IncrementTGID() int64
IncrementTGID increments the transaction group ID and returns the new value
func (*TransactionPipe) TGID ¶
func (tgc *TransactionPipe) TGID() int64
TGID returns the latest transaction group ID
type TriggerPluginDispatcher ¶ added in v4.1.2
type TriggerPluginDispatcher struct {
// contains filtered or unexported fields
}
func NewTriggerPluginDispatcher ¶ added in v4.1.2
func NewTriggerPluginDispatcher(triggerMatchers []*trigger.TriggerMatcher) *TriggerPluginDispatcher
func (*TriggerPluginDispatcher) AppendRecord ¶ added in v4.1.2
func (tpd *TriggerPluginDispatcher) AppendRecord(keyPath string, record []byte)
AppendRecord collects the record from the serialized buffer.
func (*TriggerPluginDispatcher) DispatchRecords ¶ added in v4.1.2
func (tpd *TriggerPluginDispatcher) DispatchRecords()
DispatchRecords iterates over the registered triggers and fires the event if the file path matches the condition. This is meant to be run in a separate goroutine and recovers from panics in the triggers.
type TxnStatusEnum ¶
type TxnStatusEnum int8
--- Status
const ( PREPARING TxnStatusEnum = iota COMMITINTENDED COMMITCOMPLETE )
type WALCleaner ¶ added in v4.1.7
type WALCleaner struct {
// contains filtered or unexported fields
}
func NewWALCleaner ¶ added in v4.1.7
func NewWALCleaner(ignoreFile string, myInstanceID int64) *WALCleaner
func (*WALCleaner) CleanupOldWALFiles ¶ added in v4.1.7
func (c *WALCleaner) CleanupOldWALFiles(walfileAbsPaths []string) error
type WALCreateError ¶
type WALCreateError string
func (WALCreateError) Error ¶
func (msg WALCreateError) Error() string
type WALFileType ¶
type WALFileType struct { // These three fields plus the MID form the WAL Header, written at the beginning of the WAL File FileStatus wal.FileStatusEnum ReplayState wal.ReplayStateEnum OwningInstanceID int64 FilePtr *os.File // Active file pointer to FileName ReplicationSender ReplicationSender // send messages to replica servers // contains filtered or unexported fields }
func NewWALFile ¶
func NewWALFile(rootDir string, owningInstanceID int64, rs ReplicationSender, walBypass bool, shutdownPending *bool, walWaitGroup *sync.WaitGroup, tpd *TriggerPluginDispatcher, txnPipe *TransactionPipe) (wf *WALFileType, err error)
func TakeOverWALFile ¶ added in v4.0.1
func TakeOverWALFile(filePath string) (wf *WALFileType, err error)
TakeOverWALFile opens an existing wal file and returns WALFileType for it.
func (*WALFileType) CanWrite ¶
func (wf *WALFileType) CanWrite(msg string, callersInstanceID int64) (bool, error)
func (*WALFileType) CreateCheckpoint ¶ added in v4.0.1
func (wf *WALFileType) CreateCheckpoint() error
CreateCheckpoint flushes all primary dirty pages to disk, thereby closing out the previous WAL state. Note that this is not goroutine-safe with respect to FlushToWAL; the caller must make sure the two calls are serialized.
func (*WALFileType) Delete ¶
func (wf *WALFileType) Delete(callersInstanceID int64) (err error)
func (*WALFileType) FinishAndWait ¶ added in v4.1.2
func (wf *WALFileType) FinishAndWait()
FinishAndWait closes the writtenIndexes channel and waits for the remaining triggers to fire before returning.
func (*WALFileType) FlushCommandsToWAL ¶ added in v4.1.0
func (wf *WALFileType) FlushCommandsToWAL(writeCommands []*wal.WriteCommand) (err error)
func (*WALFileType) FlushToWAL ¶ added in v4.0.1
func (wf *WALFileType) FlushToWAL() (err error)
A.k.a. Commit transaction
func (*WALFileType) IsOpen ¶
func (wf *WALFileType) IsOpen() bool
func (*WALFileType) NeedsReplay ¶
func (wf *WALFileType) NeedsReplay() (bool, error)
func (*WALFileType) QueueWriteCommand ¶ added in v4.1.2
func (wf *WALFileType) QueueWriteCommand(wc *wal.WriteCommand)
func (*WALFileType) Replay ¶
func (wf *WALFileType) Replay(dryRun bool) error
Replay loads this WAL File's unwritten transactions to the primary store and marks it completely processed. We do this in two passes: in the first pass we collect the Transaction Group IDs that are not yet durably written to the primary store. In the second pass, we write the data into the Primary Store directly and then flush the results. Finally, we close the WAL File and mark it completely written.
1) First WAL Pass: Locate unwritten TGIDs 2) Second WAL Pass: Load the open TG data into the Primary Data files 3) Flush the TG Cache to primary and mark this WAL File completely processed
Note that the TG Data for any given TGID should appear in the WAL only once. We verify it in the first pass.
func (*WALFileType) RequestFlush ¶
func (wf *WALFileType) RequestFlush()
RequestFlush requests WAL Flush to the WAL writer goroutine if it exists, or just does the work in the same goroutine otherwise. The function blocks if there are no current queued flushes, and returns if there is already one queued which will handle the data present in the write channel, as it will flush as soon as possible.
func (*WALFileType) SyncWAL ¶
func (wf *WALFileType) SyncWAL(WALRefresh, PrimaryRefresh time.Duration, walRotateInterval int)
func (*WALFileType) WriteCommand ¶ added in v4.1.2
func (wf *WALFileType) WriteCommand(rt io.EnumRecordType, tbiAbsPath string, varRecLen int, offset, index int64, data []byte, ds []io.DataShape, ) *wal.WriteCommand
func (*WALFileType) WriteStatus ¶
func (wf *WALFileType) WriteStatus(FileStatus wal.FileStatusEnum, ReplayState wal.ReplayStateEnum)
func (*WALFileType) WriteTransactionInfo ¶
func (wf *WALFileType) WriteTransactionInfo(tid int64, did DestEnum, txnStatus TxnStatusEnum)
type WALTakeOverError ¶
type WALTakeOverError string
func (WALTakeOverError) Error ¶
func (msg WALTakeOverError) Error() string
type WALWriteError ¶
type WALWriteError string
func (WALWriteError) Error ¶
func (msg WALWriteError) Error() string
type Writer ¶
type Writer struct {
// contains filtered or unexported fields
}
A Writer is produced to comply with the parsed query results, including a possible date range restriction. If there is a date range restriction, the write() routine should produce an error when an out-of-bounds write is attempted.
func NewWriter ¶
func NewWriter(rootCatDir *catalog.Directory, walFile *WALFileType) (*Writer, error)
func (*Writer) WriteCSM ¶ added in v4.1.2
func (w *Writer) WriteCSM(csm io.ColumnSeriesMap, isVariableLength bool) error
WriteCSM has the same logic as the executor.WriteCSM function. In order to improve testability, use this function instead of the static WriteCSM function.
func (*Writer) WriteRecords ¶
func (w *Writer) WriteRecords(ts []time.Time, data []byte, dsWithEpoch []DataShape, tbi *io.TimeBucketInfo) error
WriteRecords creates a WriteCommand from the supplied timestamp and data buffer, and sends it over the write channel to be flushed to disk in the WAL sync subroutine. The caller should assume that, by calling WriteRecords directly, the data will be written to the file regardless of whether it satisfies the on-disk data shape, possibly corrupting the data files. It is recommended to call WriteCSM() for any writes, as it is safer.
type WrongSizeError ¶
type WrongSizeError string
func (WrongSizeError) Error ¶
func (msg WrongSizeError) Error() string
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
package buffile helps batch writes by writing to a temporary in-memory buffer, under the assumption that many writes frequently arrive for the same part of a single file.
|
package buffile helps batch writes by writing to a temporary in-memory buffer, under the assumption that many writes frequently arrive for the same part of a single file. |