Documentation ¶
Index ¶
- Constants
- Variables
- func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uint64, e error)
- func ScanVolumeFile(dirname string, collection string, id needle.VolumeId, ...) (err error)
- func ScanVolumeFileFrom(version needle.Version, dataFile *os.File, offset int64, ...) (err error)
- func ScanVolumeFileNeedleFrom(version needle.Version, dataFile *os.File, offset int64, ...) (err error)
- func VolumeFileName(dir string, collection string, id int) (fileName string)
- type DiskLocation
- func (l *DiskLocation) Close()
- func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e error)
- func (l *DiskLocation) DeleteVolume(vid needle.VolumeId) error
- func (l *DiskLocation) DestroyEcVolume(vid needle.VolumeId)
- func (l *DiskLocation) FindEcShard(vid needle.VolumeId, shardId erasure_coding.ShardId) (*erasure_coding.EcVolumeShard, bool)
- func (l *DiskLocation) FindEcVolume(vid needle.VolumeId) (*erasure_coding.EcVolume, bool)
- func (l *DiskLocation) FindVolume(vid needle.VolumeId) (*Volume, bool)
- func (l *DiskLocation) LoadEcShard(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId) (err error)
- func (l *DiskLocation) LoadVolume(vid needle.VolumeId, needleMapKind NeedleMapType) bool
- func (l *DiskLocation) SetVolume(vid needle.VolumeId, volume *Volume)
- func (l *DiskLocation) UnloadEcShard(vid needle.VolumeId, shardId erasure_coding.ShardId) bool
- func (l *DiskLocation) UnloadVolume(vid needle.VolumeId) error
- func (l *DiskLocation) VolumesLen() int
- type LevelDbNeedleMap
- func (m *LevelDbNeedleMap) Close()
- func (m *LevelDbNeedleMap) Delete(key NeedleId, offset Offset) error
- func (m *LevelDbNeedleMap) Destroy() error
- func (m *LevelDbNeedleMap) Get(key NeedleId) (element *needle_map.NeedleValue, ok bool)
- func (nm *LevelDbNeedleMap) IndexFileSize() uint64
- func (m *LevelDbNeedleMap) Put(key NeedleId, offset Offset, size uint32) error
- type NeedleMap
- func (nm *NeedleMap) Close()
- func (nm *NeedleMap) Delete(key NeedleId, offset Offset) error
- func (nm *NeedleMap) Destroy() error
- func (nm *NeedleMap) Get(key NeedleId) (element *needle_map.NeedleValue, ok bool)
- func (nm *NeedleMap) IndexFileSize() uint64
- func (nm *NeedleMap) Put(key NeedleId, offset Offset, size uint32) error
- type NeedleMapType
- type NeedleMapper
- type ReplicaPlacement
- type Store
- func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapType, ...) error
- func (s *Store) CheckCompactVolume(volumeId needle.VolumeId) (float64, error)
- func (s *Store) Close()
- func (s *Store) CollectErasureCodingHeartbeat() *master_pb.Heartbeat
- func (s *Store) CollectHeartbeat() *master_pb.Heartbeat
- func (s *Store) CommitCleanupVolume(vid needle.VolumeId) error
- func (s *Store) CommitCompactVolume(vid needle.VolumeId) error
- func (s *Store) CompactVolume(vid needle.VolumeId, preallocate int64, compactionBytePerSecond int64) error
- func (s *Store) DeleteCollection(collection string) (e error)
- func (s *Store) DeleteEcShardNeedle(ctx context.Context, ecVolume *erasure_coding.EcVolume, n *needle.Needle, ...) (int64, error)
- func (s *Store) DeleteVolume(i needle.VolumeId) error
- func (s *Store) DeleteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (uint32, error)
- func (s *Store) DestroyEcVolume(vid needle.VolumeId)
- func (s *Store) EcVolumes() (ecVolumes []*erasure_coding.EcVolume)
- func (s *Store) FindEcVolume(vid needle.VolumeId) (*erasure_coding.EcVolume, bool)
- func (s *Store) FindFreeLocation() (ret *DiskLocation)
- func (s *Store) GetVolume(i needle.VolumeId) *Volume
- func (s *Store) GetVolumeSizeLimit() uint64
- func (s *Store) HasVolume(i needle.VolumeId) bool
- func (s *Store) MarkVolumeReadonly(i needle.VolumeId) error
- func (s *Store) MountEcShards(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId) error
- func (s *Store) MountVolume(i needle.VolumeId) error
- func (s *Store) ReadEcShardNeedle(ctx context.Context, vid needle.VolumeId, n *needle.Needle) (int, error)
- func (s *Store) ReadVolumeNeedle(i needle.VolumeId, n *needle.Needle) (int, error)
- func (s *Store) SetDataCenter(dataCenter string)
- func (s *Store) SetRack(rack string)
- func (s *Store) SetVolumeSizeLimit(x uint64)
- func (s *Store) Status() []*VolumeInfo
- func (s *Store) String() (str string)
- func (s *Store) UnmountEcShards(vid needle.VolumeId, shardId erasure_coding.ShardId) error
- func (s *Store) UnmountVolume(i needle.VolumeId) error
- func (s *Store) WriteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (size uint32, isUnchanged bool, err error)
- type SuperBlock
- type Volume
- func (v *Volume) BinarySearchByAppendAtNs(sinceNs uint64) (offset Offset, isLast bool, err error)
- func (v *Volume) Close()
- func (v *Volume) CommitCompact() error
- func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error
- func (v *Volume) Compact2() error
- func (v *Volume) ContentSize() uint64
- func (v *Volume) DataFile() *os.File
- func (v *Volume) DeletedCount() uint64
- func (v *Volume) DeletedSize() uint64
- func (v *Volume) Destroy() (err error)
- func (v *Volume) FileCount() uint64
- func (v *Volume) FileName() (fileName string)
- func (v *Volume) FileStat() (datSize uint64, idxSize uint64, modTime time.Time)
- func (v *Volume) GetVolumeSyncStatus() *volume_server_pb.VolumeSyncStatusResponse
- func (v *Volume) IncrementalBackup(volumeServer string, grpcDialOption grpc.DialOption) error
- func (v *Volume) IndexFileSize() uint64
- func (v *Volume) MaxFileKey() types.NeedleId
- func (v *Volume) NeedToReplicate() bool
- func (v *Volume) String() string
- func (v *Volume) ToVolumeInformationMessage() *master_pb.VolumeInformationMessage
- func (v *Volume) Version() needle.Version
- type VolumeFileScanner
- type VolumeFileScanner4GenIdx
- type VolumeFileScanner4Vacuum
- type VolumeInfo
Constants ¶
View Source
const (
MAX_TTL_VOLUME_REMOVAL_DELAY = 10 // 10 minutes
)
Variables ¶
View Source
var ErrorNotFound = errors.New("not found")
Functions ¶
func ScanVolumeFile ¶
func ScanVolumeFile(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, volumeFileScanner VolumeFileScanner) (err error)
func ScanVolumeFileFrom ¶
Types ¶
type DiskLocation ¶
type DiskLocation struct {
Directory string
MaxVolumeCount int
sync.RWMutex
// contains filtered or unexported fields
}
func NewDiskLocation ¶
func NewDiskLocation(dir string, maxVolumeCount int) *DiskLocation
func (*DiskLocation) Close ¶
func (l *DiskLocation) Close()
func (*DiskLocation) DeleteCollectionFromDiskLocation ¶
func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e error)
func (*DiskLocation) DeleteVolume ¶
func (l *DiskLocation) DeleteVolume(vid needle.VolumeId) error
func (*DiskLocation) DestroyEcVolume ¶
func (l *DiskLocation) DestroyEcVolume(vid needle.VolumeId)
func (*DiskLocation) FindEcShard ¶
func (l *DiskLocation) FindEcShard(vid needle.VolumeId, shardId erasure_coding.ShardId) (*erasure_coding.EcVolumeShard, bool)
func (*DiskLocation) FindEcVolume ¶
func (l *DiskLocation) FindEcVolume(vid needle.VolumeId) (*erasure_coding.EcVolume, bool)
func (*DiskLocation) FindVolume ¶
func (l *DiskLocation) FindVolume(vid needle.VolumeId) (*Volume, bool)
func (*DiskLocation) LoadEcShard ¶
func (l *DiskLocation) LoadEcShard(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId) (err error)
func (*DiskLocation) LoadVolume ¶
func (l *DiskLocation) LoadVolume(vid needle.VolumeId, needleMapKind NeedleMapType) bool
func (*DiskLocation) SetVolume ¶
func (l *DiskLocation) SetVolume(vid needle.VolumeId, volume *Volume)
func (*DiskLocation) UnloadEcShard ¶
func (l *DiskLocation) UnloadEcShard(vid needle.VolumeId, shardId erasure_coding.ShardId) bool
func (*DiskLocation) UnloadVolume ¶
func (l *DiskLocation) UnloadVolume(vid needle.VolumeId) error
func (*DiskLocation) VolumesLen ¶
func (l *DiskLocation) VolumesLen() int
type LevelDbNeedleMap ¶
type LevelDbNeedleMap struct {
// contains filtered or unexported fields
}
func NewLevelDbNeedleMap ¶
func (*LevelDbNeedleMap) Close ¶
func (m *LevelDbNeedleMap) Close()
func (*LevelDbNeedleMap) Delete ¶
func (m *LevelDbNeedleMap) Delete(key NeedleId, offset Offset) error
func (*LevelDbNeedleMap) Destroy ¶
func (m *LevelDbNeedleMap) Destroy() error
func (*LevelDbNeedleMap) Get ¶
func (m *LevelDbNeedleMap) Get(key NeedleId) (element *needle_map.NeedleValue, ok bool)
func (*LevelDbNeedleMap) IndexFileSize ¶
func (nm *LevelDbNeedleMap) IndexFileSize() uint64
func (*LevelDbNeedleMap) Put ¶
func (m *LevelDbNeedleMap) Put(key NeedleId, offset Offset, size uint32) error
type NeedleMap ¶
type NeedleMap struct {
// contains filtered or unexported fields
}
func NewBtreeNeedleMap ¶
func NewCompactNeedleMap ¶
func (*NeedleMap) Get ¶
func (nm *NeedleMap) Get(key NeedleId) (element *needle_map.NeedleValue, ok bool)
func (*NeedleMap) IndexFileSize ¶
func (nm *NeedleMap) IndexFileSize() uint64
type NeedleMapType ¶
type NeedleMapType int
const (
NeedleMapInMemory NeedleMapType = iota
NeedleMapLevelDb // small memory footprint, 4MB total, 1 write buffer, 3 block buffer
NeedleMapLevelDbMedium // medium memory footprint, 8MB total, 3 write buffer, 5 block buffer
NeedleMapLevelDbLarge // large memory footprint, 12MB total, 4 write buffer, 8 block buffer
)
type NeedleMapper ¶
type NeedleMapper interface { Put(key NeedleId, offset Offset, size uint32) error Get(key NeedleId) (element *needle_map.NeedleValue, ok bool) Delete(key NeedleId, offset Offset) error Close() Destroy() error ContentSize() uint64 DeletedSize() uint64 FileCount() int DeletedCount() int MaxFileKey() NeedleId IndexFileSize() uint64 }
type ReplicaPlacement ¶
func NewReplicaPlacementFromByte ¶
func NewReplicaPlacementFromByte(b byte) (*ReplicaPlacement, error)
func NewReplicaPlacementFromString ¶
func NewReplicaPlacementFromString(t string) (*ReplicaPlacement, error)
func (*ReplicaPlacement) Byte ¶
func (rp *ReplicaPlacement) Byte() byte
func (*ReplicaPlacement) GetCopyCount ¶
func (rp *ReplicaPlacement) GetCopyCount() int
func (*ReplicaPlacement) String ¶
func (rp *ReplicaPlacement) String() string
type Store ¶
type Store struct {
MasterAddress string
Ip string
Port int
PublicUrl string
Locations []*DiskLocation
NeedleMapType NeedleMapType
NewVolumesChan chan master_pb.VolumeShortInformationMessage
DeletedVolumesChan chan master_pb.VolumeShortInformationMessage
NewEcShardsChan chan master_pb.VolumeEcShardInformationMessage
DeletedEcShardsChan chan master_pb.VolumeEcShardInformationMessage
// contains filtered or unexported fields
}
A VolumeServer contains one Store.
func NewStore ¶
func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, needleMapKind NeedleMapType) (s *Store)
func (*Store) CheckCompactVolume ¶
func (*Store) CollectErasureCodingHeartbeat ¶
func (*Store) CollectHeartbeat ¶
func (*Store) CompactVolume ¶
func (*Store) DeleteCollection ¶
func (*Store) DeleteEcShardNeedle ¶
func (*Store) DeleteVolumeNeedle ¶
func (*Store) DestroyEcVolume ¶
func (*Store) EcVolumes ¶
func (s *Store) EcVolumes() (ecVolumes []*erasure_coding.EcVolume)
func (*Store) FindEcVolume ¶
func (*Store) FindFreeLocation ¶
func (s *Store) FindFreeLocation() (ret *DiskLocation)
func (*Store) GetVolumeSizeLimit ¶
func (*Store) MountEcShards ¶
func (*Store) ReadEcShardNeedle ¶
func (*Store) ReadVolumeNeedle ¶
func (*Store) SetDataCenter ¶
func (*Store) SetVolumeSizeLimit ¶
func (*Store) Status ¶
func (s *Store) Status() []*VolumeInfo
func (*Store) UnmountEcShards ¶
type SuperBlock ¶
type SuperBlock struct {
ReplicaPlacement *ReplicaPlacement
Ttl *needle.TTL
CompactionRevision uint16
Extra *master_pb.SuperBlockExtra
// contains filtered or unexported fields
}
Super block currently has 8 bytes allocated for each volume.
Byte 0: version, 1 or 2.
Byte 1: Replica Placement strategy, 000, 001, 002, 010, etc.
Byte 2 and byte 3: Time to live. See TTL for definition.
Byte 4 and byte 5: The number of times the volume has been compacted.
Rest bytes: Reserved.
func ReadSuperBlock ¶
func ReadSuperBlock(dataFile *os.File) (superBlock SuperBlock, err error)
ReadSuperBlock reads from the data file and loads it into the volume's super block.
func (*SuperBlock) BlockSize ¶
func (s *SuperBlock) BlockSize() int
func (*SuperBlock) Bytes ¶
func (s *SuperBlock) Bytes() []byte
func (*SuperBlock) Version ¶
func (s *SuperBlock) Version() needle.Version
type Volume ¶
type Volume struct {
Id needle.VolumeId
Collection string
MemoryMapMaxSizeMB uint32
SuperBlock
// contains filtered or unexported fields
}
func (*Volume) BinarySearchByAppendAtNs ¶
on server side
func (*Volume) CommitCompact ¶
func (*Volume) ContentSize ¶
func (*Volume) DeletedCount ¶
func (*Volume) DeletedSize ¶
func (*Volume) GetVolumeSyncStatus ¶
func (v *Volume) GetVolumeSyncStatus() *volume_server_pb.VolumeSyncStatusResponse
func (*Volume) IncrementalBackup ¶
func (v *Volume) IncrementalBackup(volumeServer string, grpcDialOption grpc.DialOption) error
func (*Volume) IndexFileSize ¶
func (*Volume) MaxFileKey ¶
func (*Volume) NeedToReplicate ¶
func (*Volume) ToVolumeInformationMessage ¶
func (v *Volume) ToVolumeInformationMessage() *master_pb.VolumeInformationMessage
type VolumeFileScanner ¶
type VolumeFileScanner4GenIdx ¶
type VolumeFileScanner4GenIdx struct {
// contains filtered or unexported fields
}
VolumeFileScanner4GenIdx generates the volume idx.
func (*VolumeFileScanner4GenIdx) ReadNeedleBody ¶
func (scanner *VolumeFileScanner4GenIdx) ReadNeedleBody() bool
func (*VolumeFileScanner4GenIdx) VisitNeedle ¶
func (scanner *VolumeFileScanner4GenIdx) VisitNeedle(n *needle.Needle, offset int64) error
func (*VolumeFileScanner4GenIdx) VisitSuperBlock ¶
func (scanner *VolumeFileScanner4GenIdx) VisitSuperBlock(superBlock SuperBlock) error
type VolumeFileScanner4Vacuum ¶
type VolumeFileScanner4Vacuum struct {
// contains filtered or unexported fields
}
func (*VolumeFileScanner4Vacuum) ReadNeedleBody ¶
func (scanner *VolumeFileScanner4Vacuum) ReadNeedleBody() bool
func (*VolumeFileScanner4Vacuum) VisitNeedle ¶
func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset int64) error
func (*VolumeFileScanner4Vacuum) VisitSuperBlock ¶
func (scanner *VolumeFileScanner4Vacuum) VisitSuperBlock(superBlock SuperBlock) error
type VolumeInfo ¶
type VolumeInfo struct { Id needle.VolumeId Size uint64 ReplicaPlacement *ReplicaPlacement Ttl *needle.TTL Collection string Version needle.Version FileCount int DeleteCount int DeletedByteCount uint64 ReadOnly bool CompactRevision uint32 ModifiedAtSecond int64 }
func NewVolumeInfo ¶
func NewVolumeInfo(m *master_pb.VolumeInformationMessage) (vi VolumeInfo, err error)
func NewVolumeInfoFromShort ¶
func NewVolumeInfoFromShort(m *master_pb.VolumeShortInformationMessage) (vi VolumeInfo, err error)
func (VolumeInfo) String ¶
func (vi VolumeInfo) String() string
func (VolumeInfo) ToVolumeInformationMessage ¶
func (vi VolumeInfo) ToVolumeInformationMessage() *master_pb.VolumeInformationMessage
Source Files ¶
- disk_location.go
- disk_location_ec.go
- needle_map.go
- needle_map_leveldb.go
- needle_map_memory.go
- needle_map_metric.go
- replica_placement.go
- store.go
- store_ec.go
- store_ec_delete.go
- store_vacuum.go
- volume.go
- volume_backup.go
- volume_checking.go
- volume_create.go
- volume_create_linux.go
- volume_info.go
- volume_loading.go
- volume_read_write.go
- volume_super_block.go
- volume_vacuum.go
Click to show internal directories.
Click to hide internal directories.