executor

package
v4.1.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 24, 2021 License: Apache-2.0 Imports: 28 Imported by: 0

Documentation

Index

Constants

View Source
const RecordsPerRead = 8192
View Source
const WriteChannelCommandDepth = 1000000

NOTE: Access to the TransactionPipe structures is single threaded with the exception of the CommandChannel. Modification of the cache contents is performed by de-queueing commands from the CommandChannel in the single Cache thread.

Variables

This section is empty.

Functions

func FullPathToWALKey added in v4.0.1

func FullPathToWALKey(rootPath, fullPath string) (keyPath string)

func GetTimeFromTicks

func GetTimeFromTicks(intervalStart uint64, intervalsPerDay, intervalTicks uint32) (sec uint64, nanosec uint32)

GetTimeFromTicks Takes two time components, the start of the interval and the number of interval ticks to the timestamp and returns an epoch time (seconds) and the number of nanoseconds of fractional time within the last second as a remainder

func NewByIntervalTicks

func NewByIntervalTicks(buffer []byte, numWords, recordLength int) sort.Interface

func NewDeleter

func NewDeleter(pr *planner.ParseResult) (de *deleter, err error)

func NewIOPlan

func NewIOPlan(fl SortedFileList, pr *planner.ParseResult) (iop *ioplan, err error)

func ParseTGData added in v4.1.0

func ParseTGData(TG_Serialized []byte, rootPath string) (TGID int64, wtSets []wal.WTSet)

func RewriteBuffer

func RewriteBuffer(buffer []byte, varRecLen, numVarRecords uint32, intervalsPerDay uint32, intervalStartEpoch uint64) []byte

RewriteBuffer converts variable_length records to the result buffer.

variable records in a file: [Actual Data (VarRecLen-4 byte) , Interval Ticks(4 byte) ] rewriteBuffer converts the binary data to [EpochSecond(8 byte), Actual Data(VarRecLen-4 byte), Nanoseconds(4 byte) ] format.

buffer +-----------------------VarRecLen [byte]---+-----------------------+ + Actual Data(Ask,Bid, etc.) | IntevalTicks(4byte) | +------------------------------------------+------------------------+

↓ rewriteBuffer

rbTemp (= temporary result buffer) +--------------------+--VarRecLen + 8 [byte]-----+-------------------+ + EpochSecond(8byte) | Actual Data(Ask,Bid, etc) | Nanosecond(4byte) | +--------------------+----------------------------+------------------+

func TimeOfVariableRecord

func TimeOfVariableRecord(buf []byte, cursor int, rowLength int) time.Time

func WriteBufferToFile

func WriteBufferToFile(fp stdio.WriterAt, buffer wal.OffsetIndexBuffer) error

func WriteBufferToFileIndirect

func WriteBufferToFileIndirect(fp *os.File, buffer wal.OffsetIndexBuffer, varRecLen int,
) (err error)

func WriteCSM

func WriteCSM(csm io.ColumnSeriesMap, isVariableLength bool) (err error)

WriteCSM writes ColumnSeriesMap (csm) to each destination file, and flush it to the disk, isVariableLength is set to true if the record content is variable-length type. WriteCSM also verifies the DataShapeVector of the incoming ColumnSeriesMap matches the on-disk DataShapeVector defined by the file header. WriteCSM will create any files if they do not already exist for the given ColumnSeriesMap based on its TimeBucketKey.

Types

type ByIntervalTicks

type ByIntervalTicks struct {
	// contains filtered or unexported fields
}

ByIntervalTicks implements a custom sort for Variable length record. Sort by the last 4 byte (= intervalTicks) of each record.

func (*ByIntervalTicks) Len

func (ei *ByIntervalTicks) Len() int

func (*ByIntervalTicks) Less

func (ei *ByIntervalTicks) Less(i, j int) bool

Less reports whether the element with index i should sort before the element with index j.

func (*ByIntervalTicks) Swap

func (ei *ByIntervalTicks) Swap(i, j int)

Swap swaps the elements with indexes i and j.

type CacheEntryAlreadyOpenError

type CacheEntryAlreadyOpenError string

WAL Messages

func (CacheEntryAlreadyOpenError) Error

func (msg CacheEntryAlreadyOpenError) Error() string

type CacheImmutableError

type CacheImmutableError string

func (CacheImmutableError) Error

func (msg CacheImmutableError) Error() string

type CachedFP

type CachedFP struct {
	// contains filtered or unexported fields
}

func NewCachedFP

func NewCachedFP() *CachedFP

func (*CachedFP) Close

func (cfp *CachedFP) Close() error

func (*CachedFP) GetFP

func (cfp *CachedFP) GetFP(fileName string) (fp *os.File, err error)

type DestEnum

type DestEnum int8

--- Destination ID

const (
	WAL DestEnum = iota
	CHECKPOINT
)

type ErrorWriter added in v4.1.2

type ErrorWriter struct{}

func (*ErrorWriter) WriteCSM added in v4.1.2

func (w *ErrorWriter) WriteCSM(csm io.ColumnSeriesMap, isVariableLength bool) error

type IndirectRecordInfo

type IndirectRecordInfo struct {
	Index, Offset, Len int64
}

type InstanceMetadata

type InstanceMetadata struct {
	CatalogDir *catalog.Directory
	WALFile    *WALFileType
}
var ThisInstance *InstanceMetadata

func NewInstanceSetup

func NewInstanceSetup(relRootDir string, rs ReplicationSender, tm []*trigger.TriggerMatcher,
	walRotateInterval int, options ...bool,
) (metadata *InstanceMetadata, shutdownPending *bool, walWG *sync.WaitGroup)

type MIDEnum

type MIDEnum int8

Message Types for WAL Messages --- Message ID

const (
	TGDATA MIDEnum = iota
	TXNINFO
	STATUS
)

type NotOpenError

type NotOpenError string

func (NotOpenError) Error

func (msg NotOpenError) Error() string

type Reader added in v4.0.1

type Reader struct {
	IOPMap map[TimeBucketKey]*ioplan
	// contains filtered or unexported fields
}

func NewReader

func NewReader(pr *planner.ParseResult) (r *Reader, err error)

func (*Reader) Read added in v4.0.1

func (r *Reader) Read() (csm ColumnSeriesMap, err error)

type RecordLengthNotConsistent

type RecordLengthNotConsistent string

func (RecordLengthNotConsistent) Error

func (msg RecordLengthNotConsistent) Error() string

type ReplicationSender added in v4.1.0

type ReplicationSender interface {
	Send(transactionGroup []byte)
}

type SingleTargetRequiredForWriter

type SingleTargetRequiredForWriter string

func (SingleTargetRequiredForWriter) Error

type SortedFileList

type SortedFileList []planner.QualifiedFile

func (SortedFileList) Len

func (fl SortedFileList) Len() int

func (SortedFileList) Less

func (fl SortedFileList) Less(i, j int) bool

func (SortedFileList) Swap

func (fl SortedFileList) Swap(i, j int)

type TGIDlist

type TGIDlist []int64

func (TGIDlist) Len

func (tgl TGIDlist) Len() int

func (TGIDlist) Less

func (tgl TGIDlist) Less(i, j int) bool

func (TGIDlist) Swap

func (tgl TGIDlist) Swap(i, j int)

type TransactionGroup added in v4.1.0

type TransactionGroup struct {
	// A "locally unique" transaction group identifier, can be a clock value
	ID int64
	//The contents of the WTSets
	WTGroup []wal.WTSet
	//MD5 checksum of the TG contents prior to the checksum
	Checksum [16]byte
}

type TransactionPipe

type TransactionPipe struct {
	// contains filtered or unexported fields
}

TransactionPipe stores the contents of the current pending Transaction Group and writes it to WAL when flush() is called

func NewTransactionPipe

func NewTransactionPipe() *TransactionPipe

NewTransactionPipe creates a new transaction pipe that channels all of the write transactions to the WAL and primary writers

func (*TransactionPipe) IncrementTGID added in v4.0.2

func (tgc *TransactionPipe) IncrementTGID() int64

IncrementTGID increments the transaction group ID and returns the new value

func (*TransactionPipe) TGID

func (tgc *TransactionPipe) TGID() int64

TGID returns the latest transaction group ID

type TriggerPluginDispatcher added in v4.1.2

type TriggerPluginDispatcher struct {
	// contains filtered or unexported fields
}

func NewTriggerPluginDispatcher added in v4.1.2

func NewTriggerPluginDispatcher(triggerMatchers []*trigger.TriggerMatcher) *TriggerPluginDispatcher

func (*TriggerPluginDispatcher) AppendRecord added in v4.1.2

func (tpd *TriggerPluginDispatcher) AppendRecord(keyPath string, record []byte)

AppendRecord collects the record from the serialized buffer.

func (*TriggerPluginDispatcher) DispatchRecords added in v4.1.2

func (tpd *TriggerPluginDispatcher) DispatchRecords()

DispatchRecords iterates over the registered triggers and fire the event if the file path matches the condition. This is meant to be run in a separate goroutine and recovers from panics in the triggers.

type TxnStatusEnum

type TxnStatusEnum int8

--- Status

const (
	PREPARING TxnStatusEnum = iota
	COMMITINTENDED
	COMMITCOMPLETE
)

type WALCreateError

type WALCreateError string

func (WALCreateError) Error

func (msg WALCreateError) Error() string

type WALFileType

type WALFileType struct {
	// These three fields plus the MID form the WAL Header, written at the beginning of the WAL File
	FileStatus       wal.FileStatusEnum
	ReplayState      wal.ReplayStateEnum
	OwningInstanceID int64

	FilePtr *os.File // Active file pointer to FileName

	ReplicationSender ReplicationSender // send messages to replica servers
	// contains filtered or unexported fields
}

func NewWALFile

func NewWALFile(rootDir string, owningInstanceID int64, rs ReplicationSender,
	walBypass bool, shutdownPending *bool, walWaitGroup *sync.WaitGroup, tpd *TriggerPluginDispatcher,
	txnPipe *TransactionPipe) (wf *WALFileType, err error)

func TakeOverWALFile added in v4.0.1

func TakeOverWALFile(rootDir, fileName string) (wf *WALFileType, err error)

TakeOverWALFile opens an existing wal file and returns WALFileType for it.

func (*WALFileType) CanWrite

func (wf *WALFileType) CanWrite(msg string, callersInstanceID int64) bool

func (*WALFileType) CreateCheckpoint added in v4.0.1

func (wf *WALFileType) CreateCheckpoint() error

CreateCheckpoint flushes all primary dirty pages to disk, and so closes out the previous WAL state to end. Note, this is not goroutine-safe with FlushToWAL and caller should make sure it is streamlined.

func (*WALFileType) Delete

func (wf *WALFileType) Delete(callersInstanceID int64) (err error)

func (*WALFileType) FinishAndWait added in v4.1.2

func (wf *WALFileType) FinishAndWait()

FinishAndWait closes the writtenIndexes channel, and waits for the remaining triggers to fire, returning

func (*WALFileType) FlushCommandsToWAL added in v4.1.0

func (wf *WALFileType) FlushCommandsToWAL(writeCommands []*wal.WriteCommand) (err error)

func (*WALFileType) FlushToWAL added in v4.0.1

func (wf *WALFileType) FlushToWAL() (err error)

A.k.a. Commit transaction

func (*WALFileType) IsOpen

func (wf *WALFileType) IsOpen() bool

func (*WALFileType) NeedsReplay

func (wf *WALFileType) NeedsReplay() bool

func (*WALFileType) QueueWriteCommand added in v4.1.2

func (wf *WALFileType) QueueWriteCommand(wc *wal.WriteCommand)

func (*WALFileType) Replay

func (wf *WALFileType) Replay(writeData bool) error

func (*WALFileType) RequestFlush

func (wf *WALFileType) RequestFlush()

RequestFlush requests WAL Flush to the WAL writer goroutine if it exists, or just does the work in the same goroutine otherwise. The function blocks if there are no current queued flushes, and returns if there is already one queued which will handle the data present in the write channel, as it will flush as soon as possible.

func (*WALFileType) SyncWAL

func (wf *WALFileType) SyncWAL(WALRefresh, PrimaryRefresh time.Duration, walRotateInterval int)

func (*WALFileType) WriteCommand added in v4.1.2

func (wf *WALFileType) WriteCommand(rt io.EnumRecordType, tbiAbsPath string, varRecLen int, offset, index int64,
	data []byte, ds []io.DataShape,
) *wal.WriteCommand

func (*WALFileType) WriteStatus

func (wf *WALFileType) WriteStatus(FileStatus wal.FileStatusEnum, ReplayState wal.ReplayStateEnum)

func (*WALFileType) WriteTransactionInfo

func (wf *WALFileType) WriteTransactionInfo(tid int64, did DestEnum, txnStatus TxnStatusEnum)

type WALTakeOverError

type WALTakeOverError string

func (WALTakeOverError) Error

func (msg WALTakeOverError) Error() string

type WALWriteError

type WALWriteError string

func (WALWriteError) Error

func (msg WALWriteError) Error() string

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

Writer is produced that complies with the parsed query results, including a possible date range restriction. If there is a date range restriction, the write() routine should produce an error when an out-of-bounds write is tried.

func NewWriter

func NewWriter(rootCatDir *catalog.Directory, walFile *WALFileType) (*Writer, error)

func (*Writer) WriteCSM added in v4.1.2

func (w *Writer) WriteCSM(csm io.ColumnSeriesMap, isVariableLength bool) error

WriteCSM has the same logic as the executor.WriteCSM function. In order to improve testability, use this function instead of the static WriteCSM function.

func (*Writer) WriteRecords

func (w *Writer) WriteRecords(ts []time.Time, data []byte, dsWithEpoch []DataShape, tbi *io.TimeBucketInfo) error

WriteRecords creates a WriteCommand from the supplied timestamp and data buffer, and sends it over the write channel to be flushed to disk in the WAL sync subroutine. The caller should assume that by calling WriteRecords directly, the data will be written to the file regardless if it satisfies the on-disk data shape, possible corrupting the data files. It is recommended to call WriteCSM() for any writes as it is safer.

type WrongSizeError

type WrongSizeError string

func (WrongSizeError) Error

func (msg WrongSizeError) Error() string

Directories

Path Synopsis
package buffile helps batch write by writes to the tempobary in-memory buffer under the assumption that many writes come in to the part of single file frequently.
package buffile helps batch write by writes to the tempobary in-memory buffer under the assumption that many writes come in to the part of single file frequently.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL