executor

package
v2.2.2+incompatible Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 19, 2018 License: Apache-2.0 Imports: 26 Imported by: 64

Documentation

Index

Constants

View Source
const RecordsPerRead = 2000
View Source
const WriteChannelCommandDepth = 1000000

NOTE: Access to the TransactionPipe structures is single-threaded, with the exception of the CommandChannel. Modification of the cache contents is performed by dequeuing commands from the CommandChannel in the single Cache thread.

Variables

This section is empty.

Functions

func AppendIntervalTicks

func AppendIntervalTicks(buf []byte, t time.Time, index, intervalsPerDay int64) (outBuf []byte)

func FinishAndWait

func FinishAndWait()

FinishAndWait closes the writtenIndexes channel and waits for the remaining triggers to fire before returning.

func NewIOPlan

func NewIOPlan(fl SortedFileList, pr *planner.ParseResult) (iop *ioplan, err error)

func NewInstanceSetup

func NewInstanceSetup(relRootDir string, options ...bool)

func NewReader

func NewReader(pr *planner.ParseResult) (r *reader, err error)

func StartupCacheAndWAL

func StartupCacheAndWAL(rootDir string) (tgc *TransactionPipe, wf *WALFileType, err error)

func WriteBufferToFile

func WriteBufferToFile(fp stdio.WriterAt, buffer offsetIndexBuffer) error

func WriteBufferToFileIndirect

func WriteBufferToFileIndirect(fp *os.File, buffer offsetIndexBuffer) (err error)

func WriteCSM

func WriteCSM(csm io.ColumnSeriesMap, isVariableLength bool) (err error)

WriteCSM writes the ColumnSeriesMap csm to each destination file and flushes it to disk. isVariableLength is set to true if the record content is of a variable-length type. WriteCSM also verifies that the DataShapeVector of the incoming ColumnSeriesMap matches the on-disk DataShapeVector defined by the file header. WriteCSM will create any files that do not already exist for the given ColumnSeriesMap, based on its TimeBucketKey.

Types

type CacheEntryAlreadyOpenError

type CacheEntryAlreadyOpenError string

WAL Messages

func (CacheEntryAlreadyOpenError) Error

func (msg CacheEntryAlreadyOpenError) Error() string

type CacheImmutableError

type CacheImmutableError string

func (CacheImmutableError) Error

func (msg CacheImmutableError) Error() string

type CachedFP

type CachedFP struct {
	// contains filtered or unexported fields
}

func NewCachedFP

func NewCachedFP() *CachedFP

func (*CachedFP) Close

func (cfp *CachedFP) Close() error

func (*CachedFP) GetFP

func (cfp *CachedFP) GetFP(fileName string) (fp *os.File, err error)

type DestEnum

type DestEnum int8

--- Destination ID

const (
	WAL DestEnum = iota
	CHECKPOINT
)

type FileStatusEnum

type FileStatusEnum int8
const (
	Invalid FileStatusEnum = iota
	OPEN
	CLOSED
)

type IndirectRecordInfo

type IndirectRecordInfo struct {
	Index, Offset, Len int64
}

type InstanceMetadata

type InstanceMetadata struct {
	InstanceID      int64
	RootDir         string
	CatalogDir      *catalog.Directory
	TXNPipe         *TransactionPipe
	WALFile         *WALFileType
	WALWg           sync.WaitGroup
	ShutdownPending bool
	WALBypass       bool
	TriggerMatchers []*trigger.TriggerMatcher
}
var ThisInstance *InstanceMetadata

type MIDEnum

type MIDEnum int8

Message Types for WAL Messages --- Message ID

const (
	TGDATA MIDEnum = iota
	TXNINFO
	STATUS
)

type NotOpenError

type NotOpenError string

func (NotOpenError) Error

func (msg NotOpenError) Error() string

type RecordLengthNotConsistent

type RecordLengthNotConsistent string

func (RecordLengthNotConsistent) Error

func (msg RecordLengthNotConsistent) Error() string

type ReplayStateEnum

type ReplayStateEnum int8
const (
	Invalid2 ReplayStateEnum = iota
	NOTREPLAYED
	REPLAYED
	REPLAYINPROCESS
)

type ShortReadError

type ShortReadError string

func (ShortReadError) Error

func (msg ShortReadError) Error() string

type SingleTargetRequiredForWriter

type SingleTargetRequiredForWriter string

func (SingleTargetRequiredForWriter) Error

type SortedFileList

type SortedFileList []planner.QualifiedFile

func (SortedFileList) Len

func (fl SortedFileList) Len() int

func (SortedFileList) Less

func (fl SortedFileList) Less(i, j int) bool

func (SortedFileList) Swap

func (fl SortedFileList) Swap(i, j int)

type TGIDlist

type TGIDlist []int64

func (TGIDlist) Len

func (tgl TGIDlist) Len() int

func (TGIDlist) Less

func (tgl TGIDlist) Less(i, j int) bool

func (TGIDlist) Swap

func (tgl TGIDlist) Swap(i, j int)

type TransactionPipe

type TransactionPipe struct {
	// contains filtered or unexported fields
}

TransactionPipe stores the contents of the current pending Transaction Group and writes it to WAL when flush() is called

func NewTransactionPipe

func NewTransactionPipe() *TransactionPipe

NewTransactionPipe creates a new transaction pipe that channels all of the write transactions to the WAL and primary writers

func (*TransactionPipe) NewTGID

func (tgc *TransactionPipe) NewTGID() int64

NewTGID monotonically increases the transaction group ID using the current unix epoch nanosecond timestamp

func (*TransactionPipe) TGID

func (tgc *TransactionPipe) TGID() int64

TGID returns the latest transaction group ID

type TxnStatusEnum

type TxnStatusEnum int8

--- Status

const (
	PREPARING TxnStatusEnum = iota
	COMMITINTENDED
	COMMITCOMPLETE
)

type WALCreateError

type WALCreateError string

func (WALCreateError) Error

func (msg WALCreateError) Error() string

type WALFileType

type WALFileType struct {
	// These three fields plus the MID form the WAL Header, written at the beginning of the WAL File
	FileStatus       FileStatusEnum
	ReplayState      ReplayStateEnum
	OwningInstanceID int64
	// End of WAL Header
	RootPath string // Path to the root directory, base of FileName
	FilePath string // WAL file full path

	FilePtr *os.File // Active file pointer to FileName
	// contains filtered or unexported fields
}

func NewWALFile

func NewWALFile(rootDir string, existingFilePath string) (wf *WALFileType, err error)

func (*WALFileType) CanDeleteSafely

func (wf *WALFileType) CanDeleteSafely() bool

func (*WALFileType) CanWrite

func (wf *WALFileType) CanWrite(msg string) bool

func (*WALFileType) Close

func (wf *WALFileType) Close(ReplayStatus ReplayStateEnum)

func (*WALFileType) Delete

func (wf *WALFileType) Delete() (err error)

func (*WALFileType) FullPathToWALKey

func (wf *WALFileType) FullPathToWALKey(fullPath string) (keyPath string)

func (*WALFileType) IsOpen

func (wf *WALFileType) IsOpen() bool

func (*WALFileType) NeedsReplay

func (wf *WALFileType) NeedsReplay() bool

func (*WALFileType) Open

func (wf *WALFileType) Open() error

func (*WALFileType) ReadStatus

func (wf *WALFileType) ReadStatus() (fileStatus FileStatusEnum, replayStatus ReplayStateEnum, OwningInstanceID int64, err error)

func (*WALFileType) Replay

func (wf *WALFileType) Replay(writeData bool) error

func (*WALFileType) RequestFlush

func (wf *WALFileType) RequestFlush()

RequestFlush requests WAL Flush to the WAL writer goroutine if it exists, or just does the work in the same goroutine otherwise. The function blocks if there are no current queued flushes, and returns if there is already one queued which will handle the data present in the write channel, as it will flush as soon as possible.

func (*WALFileType) SyncWAL

func (wf *WALFileType) SyncWAL(WALRefresh, PrimaryRefresh time.Duration, walRotateInterval int)

func (*WALFileType) WALKeyToFullPath

func (wf *WALFileType) WALKeyToFullPath(keyPath string) (fullPath string)

func (*WALFileType) WriteStatus

func (wf *WALFileType) WriteStatus(FileStatus FileStatusEnum, ReplayState ReplayStateEnum)

func (*WALFileType) WriteTransactionInfo

func (wf *WALFileType) WriteTransactionInfo(tid int64, did DestEnum, txnStatus TxnStatusEnum)

type WALTakeOverError

type WALTakeOverError string

func (WALTakeOverError) Error

func (msg WALTakeOverError) Error() string

type WALWriteError

type WALWriteError string

func (WALWriteError) Error

func (msg WALWriteError) Error() string

type WriteCommand

type WriteCommand struct {
	RecordType    io.EnumRecordType
	WALKeyPath    string
	Offset, Index int64
	Data          []byte
}

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

func NewWriter

func NewWriter(tbi *io.TimeBucketInfo, tgc *TransactionPipe, rootCatDir *catalog.Directory) (*Writer, error)

func (*Writer) AddNewYearFile

func (w *Writer) AddNewYearFile(year int16) (err error)

func (*Writer) WriteRecords

func (w *Writer) WriteRecords(ts []time.Time, data []byte)

WriteRecords creates a WriteCommand from the supplied timestamps and data buffer, and sends it over the write channel to be flushed to disk in the WAL sync subroutine. The caller should assume that, by calling WriteRecords directly, the data will be written to the file regardless of whether it satisfies the on-disk data shape, possibly corrupting the data files. It is recommended to call WriteCSM() for any writes, as it is safer.

type WrongSizeError

type WrongSizeError string

func (WrongSizeError) Error

func (msg WrongSizeError) Error() string

Directories

Path Synopsis
package buffile helps batch writes by writing to a temporary in-memory buffer, under the assumption that many writes frequently arrive for the same part of a single file.
package buffile helps batch writes by writing to a temporary in-memory buffer, under the assumption that many writes frequently arrive for the same part of a single file.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL