Documentation ¶
Index ¶
- Constants
- type FileRedoLogManager
- func (r *FileRedoLogManager) AppendToRedoLog(upsertBatch *common.UpsertBatch) (int64, uint32)
- func (r *FileRedoLogManager) CheckpointRedolog(cutoff uint32, redoFileCheckpointed int64, batchOffset uint32) error
- func (r *FileRedoLogManager) Close()
- func (r *FileRedoLogManager) GetBatchReceived() int
- func (r *FileRedoLogManager) GetBatchRecovered() int
- func (r *FileRedoLogManager) GetNumFiles() int
- func (r *FileRedoLogManager) GetTotalSize() int
- func (r *FileRedoLogManager) IsAppendEnabled() bool
- func (r *FileRedoLogManager) Iterator() (NextUpsertFunc, error)
- func (r *FileRedoLogManager) MarshalJSON() ([]byte, error)
- func (r *FileRedoLogManager) UpdateMaxEventTime(eventTime uint32, redoFile int64)
- func (r *FileRedoLogManager) WaitForRecoveryDone()
- type NextUpsertBatchInfo
- type NextUpsertFunc
- type RedoLogManagerMaster
- type RedologManager
Constants ¶
const UpsertHeader uint32 = 0xADDAFEED
UpsertHeader is the magic header written into the beginning of each redo log file.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type FileRedoLogManager ¶
type FileRedoLogManager struct { // The lock is to protect MaxEventTimePerFile. sync.RWMutex `json:"-"` // The time interval of redo file rotations. RotationInterval int64 `json:"rotationInterval"` // The limit of redo file size to trigger rotations. MaxRedoLogSize int64 `json:"maxRedoLogSize"` // Current redo log size CurrentRedoLogSize uint32 `json:"currentRedoLogSize"` // size of all redologs TotalRedoLogSize uint `json:"totalRedologSize"` // The map with redo log creation time as the key and max event time as the value. Readers // need to hold the reader lock in accessing the field. MaxEventTimePerFile map[int64]uint32 `json:"maxEventTimePerFile"` // redo log creation time -> batch count mapping. // Readers need to hold the reader lock in accessing the field. BatchCountPerFile map[int64]uint32 `json:"batchCountPerFile"` // SizePerFile SizePerFile map[int64]uint32 `json:"sizePerFile"` // Current file creation time in milliseconds. CurrentFileCreationTime int64 `json:"currentFileCreationTime"` // contains filtered or unexported fields }
FileRedoLogManager manages appending to, rotating, and purging redo log files. It is used by ingestion, recovery, and archiving. Accessors must hold the TableShard.WriterLock to access it.
func (*FileRedoLogManager) AppendToRedoLog ¶
func (r *FileRedoLogManager) AppendToRedoLog(upsertBatch *common.UpsertBatch) (int64, uint32)
AppendToRedoLog saves an upsert batch to disk before applying it. Any error from diskStore will trigger a system panic.
func (*FileRedoLogManager) CheckpointRedolog ¶
func (r *FileRedoLogManager) CheckpointRedolog(cutoff uint32, redoFileCheckpointed int64, batchOffset uint32) error
CheckpointRedolog purges disk files and in memory data of redologs that are eligible to be purged.
func (*FileRedoLogManager) Close ¶
func (r *FileRedoLogManager) Close()
Close closes the current log file.
func (*FileRedoLogManager) GetBatchReceived ¶
func (r *FileRedoLogManager) GetBatchReceived() int
func (*FileRedoLogManager) GetBatchRecovered ¶
func (r *FileRedoLogManager) GetBatchRecovered() int
func (*FileRedoLogManager) GetNumFiles ¶
func (r *FileRedoLogManager) GetNumFiles() int
func (*FileRedoLogManager) GetTotalSize ¶
func (r *FileRedoLogManager) GetTotalSize() int
GetTotalSize returns the total size in bytes of all redo log files.
func (*FileRedoLogManager) IsAppendEnabled ¶
func (r *FileRedoLogManager) IsAppendEnabled() bool
IsAppendEnabled returns whether appending is enabled
func (*FileRedoLogManager) Iterator ¶
func (r *FileRedoLogManager) Iterator() (NextUpsertFunc, error)
Iterator returns a functor that can be used to iterate over redo logs on disk and returns one UpsertBatch at each call. It returns nil to indicate the end of the upsert batch stream.
Any failure in file reading and upsert batch creation will trigger system panic.
func (*FileRedoLogManager) MarshalJSON ¶
func (r *FileRedoLogManager) MarshalJSON() ([]byte, error)
MarshalJSON marshals a FileRedoLogManager into JSON.
func (*FileRedoLogManager) UpdateMaxEventTime ¶
func (r *FileRedoLogManager) UpdateMaxEventTime(eventTime uint32, redoFile int64)
UpdateMaxEventTime updates the max event time of the current redo log file. redoFile is the key to the corresponding redo file that needs to have the maxEventTime updated. redoFile == 0 is used in serving ingestion requests where the current file's max event time is updated. redoFile != 0 is used in recovery where the redo log file loaded from disk needs to get its max event time calculated.
func (*FileRedoLogManager) WaitForRecoveryDone ¶
func (r *FileRedoLogManager) WaitForRecoveryDone()
type NextUpsertBatchInfo ¶
type NextUpsertBatchInfo struct { // upsertbatch Batch *common.UpsertBatch // Redo log File id used to save this batch RedoLogFile int64 // offset of this batch in above redo log file BatchOffset uint32 // If this batch is coming from recovery Recovery bool }
NextUpsertBatchInfo contains an UpsertBatch and its related info from the redo log.
type RedoLogManagerMaster ¶
type RedoLogManagerMaster struct { sync.Mutex // namespace of the datanode Namespace string // redolog config RedoLogConfig *common.RedoLogConfig // contains filtered or unexported fields }
RedoLogManagerMaster is the master class that creates shard-level redolog managers.
func NewKafkaRedoLogManagerMaster ¶
func NewKafkaRedoLogManagerMaster(namespace string, cfg *common.RedoLogConfig, diskStore diskstore.DiskStore, metaStore metaCom.MetaStore, consumer sarama.Consumer) (*RedoLogManagerMaster, error)
NewKafkaRedoLogManagerMaster is a convenience function for creating a RedoLogManagerMaster when the Kafka consumer can be passed in from outside.
func NewRedoLogManagerMaster ¶
func NewRedoLogManagerMaster(namespace string, c *common.RedoLogConfig, diskStore diskstore.DiskStore, metaStore metaCom.MetaStore) (*RedoLogManagerMaster, error)
NewRedoLogManagerMaster creates a RedoLogManagerMaster instance.
func (*RedoLogManagerMaster) Close ¶
func (m *RedoLogManagerMaster) Close(table string, shard int)
Close closes the redolog manager of one table shard.
func (*RedoLogManagerMaster) NewRedologManager ¶
func (m *RedoLogManagerMaster) NewRedologManager(table string, shard int, unsharded bool, tableConfig *metaCom.TableConfig) (RedologManager, error)
NewRedologManager creates a compositeRedoLogManager for the specified table/shard. Each table/shard should have only one compositeRedoLogManager.
func (*RedoLogManagerMaster) Stop ¶
func (m *RedoLogManagerMaster) Stop()
Stop closes all shard redolog managers and the Kafka consumer.
type RedologManager ¶
type RedologManager interface { // IsAppendEnabled returns whether appending is enabled IsAppendEnabled() bool // Iterator will return a next function to iterate through all coming upsert batches Iterator() (NextUpsertFunc, error) // Block call to wait for recovery finish WaitForRecoveryDone() // UpdateMaxEventTime update max eventime of given redolog file UpdateMaxEventTime(eventTime uint32, redoFile int64) // CheckpointRedolog checkpoint event time cutoff (from archiving) and redologFileID and batchOffset (from backfill) // to redolog manager CheckpointRedolog(cutoff uint32, redoFileCheckpointed int64, batchOffset uint32) error // Append the upsertbatch into redolog AppendToRedoLog(upsertBatch *common.UpsertBatch) (int64, uint32) // Get total bytes of all redo log files GetTotalSize() int // Get total number of redo log files GetNumFiles() int // Get number of batch received after recovery GetBatchReceived() int // Get number of batch recovered for recovery GetBatchRecovered() int // Close free resources held by redolog manager Close() }
RedologManager defines the redolog manager interface.