Documentation ¶
Index ¶
- Constants
- func ForgetAboutNode(ctx context.Context, ioctx *rados.IOContext, nodename string) bte.BTE
- func IsAddressHot(addr uint64) bool
- func NewJournalProvider(cfg configprovider.Configuration, ccfg configprovider.ClusterConfiguration) (jprovider.JournalProvider, bte.BTE)
- func ParseObjectName(s string) *objName
- func UUIDSliceToArr(id []byte) [16]byte
- type CJournalProvider
- func (jp *CJournalProvider) Barrier(ctx context.Context, cp jprovider.Checkpoint) bte.BTE
- func (jp *CJournalProvider) ForgetAboutNode(ctx context.Context, nodename string) bte.BTE
- func (jp *CJournalProvider) GetLatestCheckpoint() jprovider.Checkpoint
- func (jp *CJournalProvider) Insert(ctx context.Context, rng *configprovider.MashRange, ...) (checkpoint jprovider.Checkpoint, err bte.BTE)
- func (jp *CJournalProvider) Nodename() string
- func (jp *CJournalProvider) ObtainNodeJournals(ctx context.Context, nodename string) (jprovider.JournalIterator, bte.BTE)
- func (jp *CJournalProvider) ReleaseAllOurJournals(ctx context.Context) bte.BTE
- func (jp *CJournalProvider) ReleaseDisjointCheckpoint(ctx context.Context, cp jprovider.Checkpoint) bte.BTE
- func (jp *CJournalProvider) ReleaseJournalEntries(ctx context.Context, nodename string, upto jprovider.Checkpoint, ...) bte.BTE
- func (jp *CJournalProvider) WaitForCheckpoint(ctx context.Context, checkpoint jprovider.Checkpoint) bte.BTE
- type CJrecord
- type CacheItem
- type CephCache
- type CephSegment
- type CephStorageProvider
- func (sp *CephStorageProvider) BackgroundCleanup(uuids [][]byte) error
- func (sp *CephStorageProvider) CreateDatabase(cfg configprovider.Configuration, overwrite bool) error
- func (sp *CephStorageProvider) CreateJournalProvider(ournodename string) (jprovider.JournalProvider, bte.BTE)
- func (sp *CephStorageProvider) GetStreamVersion(ctx context.Context, uuid []byte) (uint64, error)
- func (sp *CephStorageProvider) Initialize(cfg configprovider.Configuration, rm *rez.RezManager)
- func (sp *CephStorageProvider) LockCoreSegment(uuid []byte) bprovider.Segment
- func (sp *CephStorageProvider) LockVectorSegment(uuid []byte) bprovider.Segment
- func (sp *CephStorageProvider) ObliterateStreamMetadata(uuid []byte)
- func (sp *CephStorageProvider) Read(ctx context.Context, uuid []byte, address uint64, buffer []byte) ([]byte, error)
- func (sp *CephStorageProvider) ReadSuperBlock(ctx context.Context, uuid []byte, version uint64, buffer []byte) ([]byte, error)
- func (sp *CephStorageProvider) SetStreamVersion(uuid []byte, version uint64)
- func (sp *CephStorageProvider) WriteSuperBlock(uuid []byte, version uint64, buffer []byte)
- type CheckpointHeap
Constants ¶
const ADDR_LOCK_SIZE = 0x1000000000
4096 blocks per addr lock
const ADDR_OBJ_SIZE = 0x0001000000
const CJournalProviderNamespace = "journalprovider"
const INITIAL_COLD_BASE_ADDRESS = 0
const INITIAL_HOT_BASE_ADDRESS = 0x8000000000000000
const MAX_EXPECTED_OBJECT_SIZE = 20485
Just over the DBSIZE
const METADATA_BASE = 0xFF00000000000000
We know we won't get any addresses here, because this is the relocation base as well
const MaxDistinctRanges = 64
const MaxObjectSize = 16 * 1024 * 1024
Keeping RADOS objects small is a good idea
const NUM_COLD_HANDLES = 128
const NUM_HOT_HANDLES = 128
const OFFSET_MASK = 0xFFFFFF
const RADOS_CACHE_SIZE = 512
The number of RADOS blocks to cache (up to 16MB each, probably only 1.6MB each)
const R_ADDRMASK = ^(uint64(R_CHUNKSIZE) - 1)
const R_CHUNKSIZE = 1 << 17
This is the size of the readahead/readbehind window for caching. It rounds down, so it is sometimes readahead and sometimes readbehind.
const R_OFFSETMASK = (uint64(R_CHUNKSIZE) - 1)
const SBLOCKS_PER_CHUNK = 1 << SBLOCK_CHUNK_SHIFT
const SBLOCK_CHUNK_MASK = 0xFFFFF
const SBLOCK_CHUNK_SHIFT = 20
Makes 16MB for 16B sblocks
const SBLOCK_SIZE = 16
const SEGCACHE_SIZE = 100 * 1024
const WCACHE_SIZE = 1 << 20
1MB for the write cache; we are unlikely to ever hit this limit in practice.
const WORTH_CACHING = OFFSET_MASK - MAX_EXPECTED_OBJECT_SIZE
This is how many uuid/address pairs we will keep to facilitate appending to segments instead of creating new ones.
Variables ¶
This section is empty.
Functions ¶
func ForgetAboutNode ¶
Delete all journals associated with a node and also remove the tombstone, allowing the same node name to be used again
func IsAddressHot ¶
func NewJournalProvider ¶
func NewJournalProvider(cfg configprovider.Configuration, ccfg configprovider.ClusterConfiguration) (jprovider.JournalProvider, bte.BTE)
func ParseObjectName ¶
func ParseObjectName(s string) *objName
func UUIDSliceToArr ¶
Types ¶
type CJournalProvider ¶
type CJournalProvider struct {
// contains filtered or unexported fields
}
We also need some other metadata that we can use to learn what checkpoint number to start from
func (*CJournalProvider) Barrier ¶
func (jp *CJournalProvider) Barrier(ctx context.Context, cp jprovider.Checkpoint) bte.BTE
func (*CJournalProvider) ForgetAboutNode ¶
func (*CJournalProvider) GetLatestCheckpoint ¶
func (jp *CJournalProvider) GetLatestCheckpoint() jprovider.Checkpoint
func (*CJournalProvider) Insert ¶
func (jp *CJournalProvider) Insert(ctx context.Context, rng *configprovider.MashRange, jr *jprovider.JournalRecord) (checkpoint jprovider.Checkpoint, err bte.BTE)
func (*CJournalProvider) Nodename ¶
func (jp *CJournalProvider) Nodename() string
func (*CJournalProvider) ObtainNodeJournals ¶
func (jp *CJournalProvider) ObtainNodeJournals(ctx context.Context, nodename string) (jprovider.JournalIterator, bte.BTE)
Used by a node taking control of a range, returns an iterator over all unreleased journals
func (*CJournalProvider) ReleaseAllOurJournals ¶
func (jp *CJournalProvider) ReleaseAllOurJournals(ctx context.Context) bte.BTE
Similar to ForgetAboutNode, but still keeps the node name tombstone.
func (*CJournalProvider) ReleaseDisjointCheckpoint ¶
func (jp *CJournalProvider) ReleaseDisjointCheckpoint(ctx context.Context, cp jprovider.Checkpoint) bte.BTE
func (*CJournalProvider) ReleaseJournalEntries ¶
func (jp *CJournalProvider) ReleaseJournalEntries(ctx context.Context, nodename string, upto jprovider.Checkpoint, rng *configprovider.MashRange) bte.BTE
Used by both the recovering nodes and the generating nodes. Given that the same journal can be processed by two different nodes across different ranges, it is important that the provider only frees resources associated with old checkpoints once they have been released across the entire range of the journal. The checkpoint is EXCLUSIVE.
func (*CJournalProvider) WaitForCheckpoint ¶
func (jp *CJournalProvider) WaitForCheckpoint(ctx context.Context, checkpoint jprovider.Checkpoint) bte.BTE
type CJrecord ¶
type CJrecord struct { R *jprovider.JournalRecord C uint64 }
func (*CJrecord) MarshalMsg ¶
MarshalMsg implements msgp.Marshaler
type CephSegment ¶
type CephSegment struct {
// contains filtered or unexported fields
}
func (*CephSegment) BaseAddress ¶
func (seg *CephSegment) BaseAddress() uint64
Returns the address of the first free word in the segment when it was locked
func (*CephSegment) Flush ¶
func (seg *CephSegment) Flush()
Block until all writes are complete. Note this does not imply a flush of the underlying files.
func (*CephSegment) Unlock ¶
func (seg *CephSegment) Unlock()
Unlocks the segment for the StorageProvider to give to other consumers. Implies a flush.
type CephStorageProvider ¶
type CephStorageProvider struct {
// contains filtered or unexported fields
}
func (*CephStorageProvider) BackgroundCleanup ¶
func (sp *CephStorageProvider) BackgroundCleanup(uuids [][]byte) error
func (*CephStorageProvider) CreateDatabase ¶
func (sp *CephStorageProvider) CreateDatabase(cfg configprovider.Configuration, overwrite bool) error
Called to create the database for the first time. This doesn't lock, but nobody else would be trying to do the same thing at the same time, so that is acceptable.
func (*CephStorageProvider) CreateJournalProvider ¶
func (sp *CephStorageProvider) CreateJournalProvider(ournodename string) (jprovider.JournalProvider, bte.BTE)
func (*CephStorageProvider) GetStreamVersion ¶
Gets the version of a stream. Returns 0 if none exists.
func (*CephStorageProvider) Initialize ¶
func (sp *CephStorageProvider) Initialize(cfg configprovider.Configuration, rm *rez.RezManager)
Called at startup of a normal run
func (*CephStorageProvider) LockCoreSegment ¶
func (sp *CephStorageProvider) LockCoreSegment(uuid []byte) bprovider.Segment
func (*CephStorageProvider) LockVectorSegment ¶
func (sp *CephStorageProvider) LockVectorSegment(uuid []byte) bprovider.Segment
func (*CephStorageProvider) ObliterateStreamMetadata ¶
func (sp *CephStorageProvider) ObliterateStreamMetadata(uuid []byte)
func (*CephStorageProvider) Read ¶
func (sp *CephStorageProvider) Read(ctx context.Context, uuid []byte, address uint64, buffer []byte) ([]byte, error)
Read the blob into the given buffer
func (*CephStorageProvider) ReadSuperBlock ¶
func (sp *CephStorageProvider) ReadSuperBlock(ctx context.Context, uuid []byte, version uint64, buffer []byte) ([]byte, error)
Reads the given version of the superblock into the buffer. TODO: consider whether we want to cache this.
func (*CephStorageProvider) SetStreamVersion ¶
func (sp *CephStorageProvider) SetStreamVersion(uuid []byte, version uint64)
Sets the version of a stream. If the version is in the past, this is essentially a rollback: although no space is freed, the consecutive version numbers can be reused. Note: you must make sure not to call ReadSuperBlock on versions higher than the one returned by GetStreamVersion, because those reads might succeed.
func (*CephStorageProvider) WriteSuperBlock ¶
func (sp *CephStorageProvider) WriteSuperBlock(uuid []byte, version uint64, buffer []byte)
Writes a superblock of the given version
type CheckpointHeap ¶
type CheckpointHeap []uint64
func (CheckpointHeap) Len ¶
func (h CheckpointHeap) Len() int
func (CheckpointHeap) Less ¶
func (h CheckpointHeap) Less(i, j int) bool
func (*CheckpointHeap) Peek ¶
func (h *CheckpointHeap) Peek() uint64
func (*CheckpointHeap) Pop ¶
func (h *CheckpointHeap) Pop() interface{}
func (*CheckpointHeap) Push ¶
func (h *CheckpointHeap) Push(x interface{})
func (CheckpointHeap) Swap ¶
func (h CheckpointHeap) Swap(i, j int)