Documentation ¶
Index ¶
- Constants
- func AppendIntervalTicks(buf []byte, t time.Time, index, intervalsPerDay int64) (outBuf []byte)
- func FinishAndWait()
- func FullPathToWALKey(rootPath, fullPath string) (keyPath string)
- func GetTimeFromTicks(intervalStart uint64, intervalsPerDay, intervalTicks uint32) (sec uint64, nanosec uint32)
- func NewByIntervalTicks(buffer []byte, numWords, recordLength int) sort.Interface
- func NewDeleter(pr *planner.ParseResult) (de *deleter, err error)
- func NewIOPlan(fl SortedFileList, pr *planner.ParseResult) (iop *ioplan, err error)
- func ParseTGData(TG_Serialized []byte, rootPath string) (TGID int64, wtSets []wal.WTSet)
- func RewriteBuffer(buffer []byte, varRecLen, numVarRecords uint32, intervalsPerDay uint32, ...) []byte
- func StartupCacheAndWAL(rootDir string, owningInstanceID int64, rs ReplicationSender, walBypass bool, ...) (tgc *TransactionPipe, wf *WALFileType, err error)
- func TimeOfVariableRecord(buf []byte, cursor int, rowLength int) time.Time
- func WriteBufferToFile(fp stdio.WriterAt, buffer wal.OffsetIndexBuffer) error
- func WriteBufferToFileIndirect(fp *os.File, buffer wal.OffsetIndexBuffer, varRecLen int) (err error)
- func WriteCSM(csm io.ColumnSeriesMap, isVariableLength bool) (err error)
- func WriteCSMInner(csm io.ColumnSeriesMap, isVariableLength bool) (err error)
- type ByIntervalTicks
- type CacheEntryAlreadyOpenError
- type CacheImmutableError
- type CachedFP
- type DestEnum
- type IndirectRecordInfo
- type InstanceMetadata
- type MIDEnum
- type NotOpenError
- type Reader
- type RecordLengthNotConsistent
- type ReplicationSender
- type SingleTargetRequiredForWriter
- type SortedFileList
- type TGIDlist
- type TransactionGroup
- type TransactionPipe
- type TxnStatusEnum
- type WALCreateError
- type WALFileType
- func (wf *WALFileType) CanWrite(msg string, callersInstanceID int64) bool
- func (wf *WALFileType) CreateCheckpoint() error
- func (wf *WALFileType) Delete(callersInstanceID int64) (err error)
- func (wf *WALFileType) FlushCommandsToWAL(tgc *TransactionPipe, writeCommands []*wal.WriteCommand) (err error)
- func (wf *WALFileType) FlushToWAL(tgc *TransactionPipe) (err error)
- func (wf *WALFileType) IsOpen() bool
- func (wf *WALFileType) NeedsReplay() bool
- func (wf *WALFileType) Replay(writeData bool) error
- func (wf *WALFileType) RequestFlush()
- func (wf *WALFileType) SyncWAL(WALRefresh, PrimaryRefresh time.Duration, walRotateInterval int)
- func (wf *WALFileType) WriteStatus(FileStatus wal.FileStatusEnum, ReplayState wal.ReplayStateEnum)
- func (wf *WALFileType) WriteTransactionInfo(tid int64, did DestEnum, txnStatus TxnStatusEnum)
- type WALTakeOverError
- type WALWriteError
- type Writer
- type WrongSizeError
Constants ¶
const RecordsPerRead = 8192
const WriteChannelCommandDepth = 1000000
NOTE: Access to the TransactionPipe structures is single threaded with the exception of the CommandChannel. Modification of the cache contents is performed by de-queueing commands from the CommandChannel in the single Cache thread.
Variables ¶
This section is empty.
Functions ¶
func AppendIntervalTicks ¶
func FinishAndWait ¶
func FinishAndWait()
FinishAndWait closes the writtenIndexes channel and waits for the remaining triggers to fire before returning.
func FullPathToWALKey ¶ added in v4.0.1
func GetTimeFromTicks ¶
func GetTimeFromTicks(intervalStart uint64, intervalsPerDay, intervalTicks uint32) (sec uint64, nanosec uint32)
GetTimeFromTicks takes two time components (the start of the interval and the number of interval ticks to the timestamp) and returns the epoch time in seconds, plus the number of nanoseconds of fractional time within the last second as a remainder.
func NewByIntervalTicks ¶
func NewDeleter ¶
func NewDeleter(pr *planner.ParseResult) (de *deleter, err error)
func NewIOPlan ¶
func NewIOPlan(fl SortedFileList, pr *planner.ParseResult) (iop *ioplan, err error)
func ParseTGData ¶ added in v4.1.0
func RewriteBuffer ¶
func RewriteBuffer(buffer []byte, varRecLen, numVarRecords uint32, intervalsPerDay uint32, intervalStartEpoch uint64) []byte
RewriteBuffer converts variable_length records to the result buffer.
Variable records in a file are laid out as [Actual Data (VarRecLen-4 bytes), Interval Ticks (4 bytes)]. RewriteBuffer converts this binary data to the format [EpochSecond (8 bytes), Actual Data (VarRecLen-4 bytes), Nanoseconds (4 bytes)].
buffer +-----------------------VarRecLen [byte]---+-----------------------+ + Actual Data(Ask,Bid, etc.) | IntervalTicks(4byte) | +------------------------------------------+------------------------+
↓ rewriteBuffer
rbTemp (= temporary result buffer) +--------------------+--VarRecLen + 8 [byte]-----+-------------------+ + EpochSecond(8byte) | Actual Data(Ask,Bid, etc) | Nanosecond(4byte) | +--------------------+----------------------------+------------------+
func StartupCacheAndWAL ¶
func StartupCacheAndWAL(rootDir string, owningInstanceID int64, rs ReplicationSender, walBypass bool, shutdownPending *bool, walWG *sync.WaitGroup) (tgc *TransactionPipe, wf *WALFileType, err error)
func TimeOfVariableRecord ¶
func WriteBufferToFile ¶
func WriteBufferToFile(fp stdio.WriterAt, buffer wal.OffsetIndexBuffer) error
func WriteCSM ¶
func WriteCSM(csm io.ColumnSeriesMap, isVariableLength bool) (err error)
WriteCSM writes ColumnSeriesMap (csm) to each destination file, and flush it to the disk, isVariableLength is set to true if the record content is variable-length type. WriteCSM also verifies the DataShapeVector of the incoming ColumnSeriesMap matches the on-disk DataShapeVector defined by the file header. WriteCSM will create any files if they do not already exist for the given ColumnSeriesMap based on its TimeBucketKey.
func WriteCSMInner ¶ added in v4.1.0
func WriteCSMInner(csm io.ColumnSeriesMap, isVariableLength bool) (err error)
Types ¶
type ByIntervalTicks ¶
type ByIntervalTicks struct {
// contains filtered or unexported fields
}
ByIntervalTicks implements a custom sort for variable-length records. It sorts by the last 4 bytes (= intervalTicks) of each record.
func (*ByIntervalTicks) Len ¶
func (ei *ByIntervalTicks) Len() int
func (*ByIntervalTicks) Less ¶
func (ei *ByIntervalTicks) Less(i, j int) bool
Less reports whether the element with index i should sort before the element with index j.
func (*ByIntervalTicks) Swap ¶
func (ei *ByIntervalTicks) Swap(i, j int)
Swap swaps the elements with indexes i and j.
type CacheEntryAlreadyOpenError ¶
type CacheEntryAlreadyOpenError string
WAL Messages
func (CacheEntryAlreadyOpenError) Error ¶
func (msg CacheEntryAlreadyOpenError) Error() string
type CacheImmutableError ¶
type CacheImmutableError string
func (CacheImmutableError) Error ¶
func (msg CacheImmutableError) Error() string
type CachedFP ¶
type CachedFP struct {
// contains filtered or unexported fields
}
func NewCachedFP ¶
func NewCachedFP() *CachedFP
type IndirectRecordInfo ¶
type IndirectRecordInfo struct {
Index, Offset, Len int64
}
type InstanceMetadata ¶
type InstanceMetadata struct { // RootDir is the absolute path to the data directory RootDir string CatalogDir *catalog.Directory TXNPipe *TransactionPipe WALFile *WALFileType TriggerMatchers []*trigger.TriggerMatcher }
var ThisInstance *InstanceMetadata
func NewInstanceSetup ¶
func NewInstanceSetup(relRootDir string, rs ReplicationSender, walRotateInterval int, options ...bool, ) (metadata *InstanceMetadata, shutdownPending *bool, walWG *sync.WaitGroup)
type NotOpenError ¶
type NotOpenError string
func (NotOpenError) Error ¶
func (msg NotOpenError) Error() string
type Reader ¶ added in v4.0.1
type Reader struct { IOPMap map[TimeBucketKey]*ioplan // contains filtered or unexported fields }
type RecordLengthNotConsistent ¶
type RecordLengthNotConsistent string
func (RecordLengthNotConsistent) Error ¶
func (msg RecordLengthNotConsistent) Error() string
type ReplicationSender ¶ added in v4.1.0
type ReplicationSender interface {
Send(transactionGroup []byte)
}
type SingleTargetRequiredForWriter ¶
type SingleTargetRequiredForWriter string
func (SingleTargetRequiredForWriter) Error ¶
func (msg SingleTargetRequiredForWriter) Error() string
type SortedFileList ¶
type SortedFileList []planner.QualifiedFile
func (SortedFileList) Len ¶
func (fl SortedFileList) Len() int
func (SortedFileList) Less ¶
func (fl SortedFileList) Less(i, j int) bool
func (SortedFileList) Swap ¶
func (fl SortedFileList) Swap(i, j int)
type TransactionGroup ¶ added in v4.1.0
type TransactionPipe ¶
type TransactionPipe struct {
// contains filtered or unexported fields
}
TransactionPipe stores the contents of the current pending Transaction Group and writes it to WAL when flush() is called
func NewTransactionPipe ¶
func NewTransactionPipe() *TransactionPipe
NewTransactionPipe creates a new transaction pipe that channels all of the write transactions to the WAL and primary writers
func (*TransactionPipe) IncrementTGID ¶ added in v4.0.2
func (tgc *TransactionPipe) IncrementTGID() int64
IncrementTGID increments the transaction group ID and returns the new value
func (*TransactionPipe) TGID ¶
func (tgc *TransactionPipe) TGID() int64
TGID returns the latest transaction group ID
type TxnStatusEnum ¶
type TxnStatusEnum int8
--- Status
const ( PREPARING TxnStatusEnum = iota COMMITINTENDED COMMITCOMPLETE )
type WALCreateError ¶
type WALCreateError string
func (WALCreateError) Error ¶
func (msg WALCreateError) Error() string
type WALFileType ¶
type WALFileType struct { // These three fields plus the MID form the WAL Header, written at the beginning of the WAL File FileStatus wal.FileStatusEnum ReplayState wal.ReplayStateEnum OwningInstanceID int64 // End of WAL Header RootPath string // Path to the root directory, base of FileName FilePath string // WAL file full path FilePtr *os.File // Active file pointer to FileName ReplicationSender ReplicationSender // send messages to replica servers // contains filtered or unexported fields }
func NewWALFile ¶
func NewWALFile(rootDir string, owningInstanceID int64, rs ReplicationSender, walBypass bool, shutdownPending *bool, walWaitGroup *sync.WaitGroup, ) (wf *WALFileType, err error)
func TakeOverWALFile ¶ added in v4.0.1
func TakeOverWALFile(rootDir, fileName string) (wf *WALFileType, err error)
TakeOverWALFile opens an existing wal file and returns WALFileType for it.
func (*WALFileType) CanWrite ¶
func (wf *WALFileType) CanWrite(msg string, callersInstanceID int64) bool
func (*WALFileType) CreateCheckpoint ¶ added in v4.0.1
func (wf *WALFileType) CreateCheckpoint() error
CreateCheckpoint flushes all primary dirty pages to disk, and so closes out the previous WAL state. Note: this is not goroutine-safe with FlushToWAL, so the caller must make sure the two calls are serialized.
func (*WALFileType) Delete ¶
func (wf *WALFileType) Delete(callersInstanceID int64) (err error)
func (*WALFileType) FlushCommandsToWAL ¶ added in v4.1.0
func (wf *WALFileType) FlushCommandsToWAL(tgc *TransactionPipe, writeCommands []*wal.WriteCommand) (err error)
func (*WALFileType) FlushToWAL ¶ added in v4.0.1
func (wf *WALFileType) FlushToWAL(tgc *TransactionPipe) (err error)
A.k.a. Commit transaction
func (*WALFileType) IsOpen ¶
func (wf *WALFileType) IsOpen() bool
func (*WALFileType) NeedsReplay ¶
func (wf *WALFileType) NeedsReplay() bool
func (*WALFileType) Replay ¶
func (wf *WALFileType) Replay(writeData bool) error
func (*WALFileType) RequestFlush ¶
func (wf *WALFileType) RequestFlush()
RequestFlush requests a WAL flush from the WAL writer goroutine if it exists, or otherwise does the work in the calling goroutine. The function blocks if there are no currently queued flushes, and returns immediately if one is already queued, since that flush will handle the data present in the write channel as soon as possible.
func (*WALFileType) SyncWAL ¶
func (wf *WALFileType) SyncWAL(WALRefresh, PrimaryRefresh time.Duration, walRotateInterval int)
func (*WALFileType) WriteStatus ¶
func (wf *WALFileType) WriteStatus(FileStatus wal.FileStatusEnum, ReplayState wal.ReplayStateEnum)
func (*WALFileType) WriteTransactionInfo ¶
func (wf *WALFileType) WriteTransactionInfo(tid int64, did DestEnum, txnStatus TxnStatusEnum)
type WALTakeOverError ¶
type WALTakeOverError string
func (WALTakeOverError) Error ¶
func (msg WALTakeOverError) Error() string
type WALWriteError ¶
type WALWriteError string
func (WALWriteError) Error ¶
func (msg WALWriteError) Error() string
type Writer ¶
type Writer struct {
// contains filtered or unexported fields
}
A Writer is produced so that it complies with the parsed query results, including a possible date range restriction. If there is a date range restriction, the write() routine should produce an error when an out-of-bounds write is attempted.
func NewWriter ¶
func NewWriter(tbi *io.TimeBucketInfo, tgc *TransactionPipe, rootCatDir *catalog.Directory, walFile *WALFileType, ) (*Writer, error)
func (*Writer) WriteRecords ¶
WriteRecords creates a WriteCommand from the supplied timestamp and data buffer, and sends it over the write channel to be flushed to disk in the WAL sync subroutine. The caller should assume that, when WriteRecords is called directly, the data will be written to the file regardless of whether it satisfies the on-disk data shape, possibly corrupting the data files. It is recommended to call WriteCSM() for any writes, as it is safer.
type WrongSizeError ¶
type WrongSizeError string
func (WrongSizeError) Error ¶
func (msg WrongSizeError) Error() string
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
package buffile helps batch writes by writing to a temporary in-memory buffer, under the assumption that many writes frequently arrive for the same part of a single file.
|
package buffile helps batch writes by writing to a temporary in-memory buffer, under the assumption that many writes frequently arrive for the same part of a single file. |