Documentation ¶
Index ¶
- Constants
- Variables
- type DiskEntryReader
- type DiskStorage
- func (s *DiskStorage) Close() error
- func (s *DiskStorage) Entries(lo, hi, maxSize uint64) ([]raftpb.Entry, error)
- func (s *DiskStorage) FirstIndex() (uint64, error)
- func (s *DiskStorage) LastIndex() (uint64, error)
- func (s *DiskStorage) NewEntryReaderAt(raftIndex uint64) (EntryReader, error)
- func (s *DiskStorage) SaveRaftEntries(entries []raftpb.Entry) error
- func (s *DiskStorage) Snapshot() (raftpb.Snapshot, error)
- func (s *DiskStorage) StartPurgeLog()
- func (s *DiskStorage) StopPurgeLog()
- func (s *DiskStorage) Term(i uint64) (uint64, error)
- func (s *DiskStorage) TruncateSuffix(raftIndex uint64) error
- type EntryReader
- type Index
- type IndexEntry
- type MemoryEventReader
- type MemoryStorage
- func (s *MemoryStorage) Close() error
- func (s *MemoryStorage) Entries(lo, hi, maxSize uint64) ([]raftpb.Entry, error)
- func (s *MemoryStorage) FirstIndex() (uint64, error)
- func (s *MemoryStorage) LastIndex() (uint64, error)
- func (s *MemoryStorage) NewEntryReaderAt(raftIndex uint64) (EntryReader, error)
- func (s *MemoryStorage) SaveRaftEntries(entries []raftpb.Entry) error
- func (s *MemoryStorage) Snapshot() (raftpb.Snapshot, error)
- func (s *MemoryStorage) StartPurgeLog()
- func (s *MemoryStorage) StopPurgeLog()
- func (s *MemoryStorage) Term(i uint64) (uint64, error)
- func (s *MemoryStorage) TruncateSuffix(i uint64) error
- type MetaStorage
- type MetaStore
- func (s *MetaStore) Close2() error
- func (s *MetaStore) Delete(key []byte) error
- func (s *MetaStore) Get(key []byte) ([]byte, error)
- func (s *MetaStore) GetFde(preGtidEventIndex uint64) ([]byte, error)
- func (s *MetaStore) GetGtidSet(flavor string, key string) (gomysql.GTIDSet, error)
- func (s *MetaStore) GetNextBinlogFile(startRaftIndex uint64) (string, error)
- func (s *MetaStore) GetPreviousGtidSet(slaveExecutedGtids *gomysql.MysqlGTIDSet) (uint64, error)
- func (s *MetaStore) InitialState() (pb.HardState, pb.ConfState, error)
- func (s *MetaStore) SaveHardState(st pb.HardState) error
- func (s *MetaStore) Set(key []byte, value []byte) error
- func (s *MetaStore) SetBinlogProgress(appliedIndex uint64, executedGtidSet gomysql.GTIDSet) error
- func (s *MetaStore) SetGtidSet(flavor string, key string, gtidSet gomysql.GTIDSet) error
- func (s *MetaStore) SetPreviousGtidSet(raftIndex uint64, previousGtidSet *gomysql.MysqlGTIDSet) error
- func (s *MetaStore) UpdatePugedGtidset(firstIndex uint64) error
- type MmapFile
- type RWFile
- type Segment
- type SegmentStatus
- type Storage
Constants ¶
const ( //FileMode is default file mode FileMode = 0600 //DirMode is default dir mode DirMode = 0700 )
const ( //MetadataFile is the file name of storage metadata MetadataFile = "kingbus_meta.db" //BucketName is the bucket name using in bolt db BucketName = "kingbus_meta_bucket" )
const ( //ReadonlySegmentPattern is the read only segment file name pattern ReadonlySegmentPattern = "%020d-%020d.log" //ReadonlyIndexPattern is the read only index file name pattern ReadonlyIndexPattern = "%020d-%020d.index" //ReadWriteSegmentPattern is the read write segment file name pattern ReadWriteSegmentPattern = "%020d-inprogress.log" //ReadWriteIndexPattern is the read only index file name pattern ReadWriteIndexPattern = "%020d-inprogress.index" //RecordLengthSize is the record length size RecordLengthSize int = 4 )
const ( //HardStateKey represents hard state in raft HardStateKey = "hard_state" //ConfStateKey represents conf state in raft ConfStateKey = "conf_state" //ExecutedGtidSetKey represents executed gtid set key ExecutedGtidSetKey = "executed_gtids" //GtidPurgedKey represents gtid purged key GtidPurgedKey = "gtid_purged" //FdePrefix is the key prefix of FORMAT_DESCRIPTION_EVENT FdePrefix = "fde" //NextBinlogPrefix is the key prefix of nex binlog file name NextBinlogPrefix = "next_binlog" //MasterInfoKey is master info key MasterInfoKey = "master_info" //PgePrefix if the key prefix of previous gtid event PgePrefix = "pre_gtids" //SyncerArgsKey is the key of syncer start args SyncerArgsKey = "syncer_args" //NeedRecoverKey is the key of storage need recover NeedRecoverKey = "ds_need_recover" //RaftClusterKey is the key of raft cluster information RaftClusterKey = "raft_cluster" //AppliedIndexKey is the key of applied index AppliedIndexKey = "apply_index" )
const (
//IndexEntrySize is the size of IndexEntry
IndexEntrySize = 12
)
Variables ¶
var ( //ErrKeyNotFound return for key is not found ErrKeyNotFound = errors.New("storage:key is not found") //ErrKeyIsNil return for key is nil ErrKeyIsNil = errors.New("storage:key is nil") //ErrArgsIllegal return for args are illegal ErrArgsIllegal = errors.New("storage:args are illegal") //ErrNotContain return for gtids not be contained ErrNotContain = errors.New("storage:gtids not be contained") //ErrOutOfBound return for index out of bound ErrOutOfBound = errors.New("storage:index out of bound") //ErrNotContinuous return for index is not continuous ErrNotContinuous = errors.New("storage:index is not continuous") //ErrClosed return for segment is closed ErrClosed = errors.New("storage:segment is closed") //ErrNotWritable return for segment not writable ErrNotWritable = errors.New("storage:segment not writable") )
var ( //SegmentSize is the size of segment file SegmentSize int64 = 1024 * 1024 * 1024 //1GB )
Functions ¶
This section is empty.
Types ¶
type DiskEntryReader ¶
type DiskEntryReader struct {
// contains filtered or unexported fields
}
DiskEntryReader is a raft entry reader
func (*DiskEntryReader) GetNext ¶
func (r *DiskEntryReader) GetNext() (*raftpb.Entry, error)
GetNext get the next raft entry
func (*DiskEntryReader) NextRaftIndex ¶
func (r *DiskEntryReader) NextRaftIndex() uint64
NextRaftIndex get the next raft index
type DiskStorage ¶
type DiskStorage struct { Dir string Mu sync.RWMutex Segments []*Segment ReserveSegmentCount int MetaStorage // contains filtered or unexported fields }
DiskStorage is the Storage store data in disk
func NewDiskStorage ¶
func NewDiskStorage(dir string, reserveSizeInGB int) (*DiskStorage, error)
NewDiskStorage create a disk storage
func (*DiskStorage) Entries ¶
func (s *DiskStorage) Entries(lo, hi, maxSize uint64) ([]raftpb.Entry, error)
Entries get raft entries in [lo,hi)
func (*DiskStorage) FirstIndex ¶
func (s *DiskStorage) FirstIndex() (uint64, error)
FirstIndex get first raft index
func (*DiskStorage) LastIndex ¶
func (s *DiskStorage) LastIndex() (uint64, error)
LastIndex get last raft index
func (*DiskStorage) NewEntryReaderAt ¶
func (s *DiskStorage) NewEntryReaderAt(raftIndex uint64) (EntryReader, error)
NewEntryReaderAt create a DiskEntryReader at raftIndex
func (*DiskStorage) SaveRaftEntries ¶
func (s *DiskStorage) SaveRaftEntries(entries []raftpb.Entry) error
SaveRaftEntries save raft entries in storage
func (*DiskStorage) Snapshot ¶
func (s *DiskStorage) Snapshot() (raftpb.Snapshot, error)
Snapshot not implement
func (*DiskStorage) StartPurgeLog ¶
func (s *DiskStorage) StartPurgeLog()
StartPurgeLog implements purge the expired log files
func (*DiskStorage) Term ¶
func (s *DiskStorage) Term(i uint64) (uint64, error)
Term get raft term of raft index
func (*DiskStorage) TruncateSuffix ¶
func (s *DiskStorage) TruncateSuffix(raftIndex uint64) error
TruncateSuffix truncate raft entry to the raft index is i, include i.
type EntryReader ¶
EntryReader is a raft entry reader interface
type Index ¶
type Index struct {
// contains filtered or unexported fields
}
Index is a index of segment file
type IndexEntry ¶
IndexEntry is index entry in index file
func (*IndexEntry) Marshal ¶
func (i *IndexEntry) Marshal() ([]byte, error)
Marshal encode index entry into byte
func (*IndexEntry) Unmarshal ¶
func (i *IndexEntry) Unmarshal(data []byte) error
Unmarshal decode byte into index entry
type MemoryEventReader ¶
type MemoryEventReader struct { }
MemoryEventReader not implement
func (*MemoryEventReader) GetNext ¶
func (s *MemoryEventReader) GetNext() (*raftpb.Entry, error)
GetNext not implement
func (*MemoryEventReader) NextRaftIndex ¶
func (s *MemoryEventReader) NextRaftIndex() uint64
NextRaftIndex not implement
type MemoryStorage ¶
type MemoryStorage struct { MetaStorage // contains filtered or unexported fields }
MemoryStorage is Storage implemented in memory
func NewMemoryStorage ¶
func NewMemoryStorage(dir string) (*MemoryStorage, error)
NewMemoryStorage create a memory storage
func (*MemoryStorage) Entries ¶
func (s *MemoryStorage) Entries(lo, hi, maxSize uint64) ([]raftpb.Entry, error)
Entries get raft entries in [lo,hi)
func (*MemoryStorage) FirstIndex ¶
func (s *MemoryStorage) FirstIndex() (uint64, error)
FirstIndex get first raft index
func (*MemoryStorage) LastIndex ¶
func (s *MemoryStorage) LastIndex() (uint64, error)
LastIndex get last raft index
func (*MemoryStorage) NewEntryReaderAt ¶
func (s *MemoryStorage) NewEntryReaderAt(raftIndex uint64) (EntryReader, error)
NewEntryReaderAt not implement
func (*MemoryStorage) SaveRaftEntries ¶
func (s *MemoryStorage) SaveRaftEntries(entries []raftpb.Entry) error
SaveRaftEntries save raft entries in storage
func (*MemoryStorage) Snapshot ¶
func (s *MemoryStorage) Snapshot() (raftpb.Snapshot, error)
Snapshot do not support
func (*MemoryStorage) StartPurgeLog ¶
func (s *MemoryStorage) StartPurgeLog()
StartPurgeLog not implement
func (*MemoryStorage) StopPurgeLog ¶
func (s *MemoryStorage) StopPurgeLog()
StopPurgeLog not implement
func (*MemoryStorage) Term ¶
func (s *MemoryStorage) Term(i uint64) (uint64, error)
Term get raft term of raft index
func (*MemoryStorage) TruncateSuffix ¶
func (s *MemoryStorage) TruncateSuffix(i uint64) error
TruncateSuffix not implement
type MetaStorage ¶
type MetaStorage interface { InitialState() (raftpb.HardState, raftpb.ConfState, error) SaveHardState(st raftpb.HardState) error Get(key []byte) ([]byte, error) Set(key, value []byte) error Delete(key []byte) error //key is key+flavor SetGtidSet(flavor string, key string, gtidSet gomysql.GTIDSet) error GetGtidSet(flavor string, key string) (gomysql.GTIDSet, error) SetBinlogProgress(appliedIndex uint64, executedGtidSet gomysql.GTIDSet) error GetFde(preGtidEventIndex uint64) ([]byte, error) //get the raft index of PreviousGtidSet GetPreviousGtidSet(slaveExecutedGtids *gomysql.MysqlGTIDSet) (uint64, error) SetPreviousGtidSet(raftIndex uint64, previousGtidSet *gomysql.MysqlGTIDSet) error GetNextBinlogFile(startRaftIndex uint64) (string, error) UpdatePugedGtidset(firstIndex uint64) error Close2() error }
MetaStorage is metadata storage interface
type MetaStore ¶
MetaStore is a metadata store
func NewMetaStore ¶
NewMetaStore create a meta store
func (*MetaStore) GetGtidSet ¶
GetGtidSet get gtid set
func (*MetaStore) GetNextBinlogFile ¶
GetNextBinlogFile get the next binlog file name
func (*MetaStore) GetPreviousGtidSet ¶
func (s *MetaStore) GetPreviousGtidSet(slaveExecutedGtids *gomysql.MysqlGTIDSet) (uint64, error)
GetPreviousGtidSet Iterate over all Previous Gtid keys in order, and read only the Previous_gtids_log_event, to find the last one, that is the subset of slaveExecutedGtids. Since every binary log file begins with a Previous_gtids_log_event, that contains all GTIDs in all previous binary logs. We also ask for the first GTID in the binary log to know if we should send the FD event with the "created" field cleared or not.
func (*MetaStore) InitialState ¶
InitialState init raft state
func (*MetaStore) SaveHardState ¶
SaveHardState save hard state in raft
func (*MetaStore) SetBinlogProgress ¶
SetBinlogProgress save appliedIndex and executedGtidSet at the same time
func (*MetaStore) SetGtidSet ¶
SetGtidSet set mysql gtid
func (*MetaStore) SetPreviousGtidSet ¶
func (s *MetaStore) SetPreviousGtidSet(raftIndex uint64, previousGtidSet *gomysql.MysqlGTIDSet) error
SetPreviousGtidSet set previous gtid set
func (*MetaStore) UpdatePugedGtidset ¶
UpdatePugedGtidset update purged gtid when segment purged or updated by master gtid_purged
type MmapFile ¶
type MmapFile struct {
// contains filtered or unexported fields
}
MmapFile is the file mmaped in os
type RWFile ¶
type RWFile struct {
// contains filtered or unexported fields
}
RWFile is a index file with RW mode
type Segment ¶
type Segment struct { Mu sync.RWMutex FirstIndex uint64 LastIndex uint64 LogFile *MmapFile LogIndex *Index Status SegmentStatus }
Segment is segment file
type SegmentStatus ¶
type SegmentStatus int8
SegmentStatus is the status of segment file
const ( //SegmentReadOnly represents segment file read only SegmentReadOnly SegmentStatus = iota //SegmentRDWR represents segment file read write SegmentRDWR //SegmentClosed represents segment file is closed SegmentClosed )
type Storage ¶
type Storage interface { // entries returns a slice of log entries in the range [lo,hi). // MaxSize limits the total size of the log entries returned, but // entries returns at least one entry if any. Entries(lo, hi, maxSize uint64) ([]raftpb.Entry, error) // Term returns the term of entry i, which must be in the range // [FirstIndex()-1, LastIndex()]. The term of the entry before // FirstIndex is retained for matching purposes even though the // rest of that entry may not be available.SaveHardState Term(i uint64) (uint64, error) // LastIndex returns the index of the last entry in the log. LastIndex() (uint64, error) // FirstIndex returns the index of the first log entry that is // possibly available via entries (older entries have been incorporated // into the latest Snapshot; if storage only contains the dummy entry the // first log entry is not available). FirstIndex() (uint64, error) // Snapshot returns the most recent snapshot. // If snapshot is temporarily unavailable, it should return ErrSnapshotTemporarilyUnavailable, // so raft state machine could know that Storage needs some time to prepare // snapshot and call Snapshot later. Snapshot() (raftpb.Snapshot, error) SaveRaftEntries(entries []raftpb.Entry) error TruncateSuffix(i uint64) error StartPurgeLog() StopPurgeLog() MetaStorage NewEntryReaderAt(raftIndex uint64) (EntryReader, error) Close() error }
Storage is kingbus server storage