Package db

Documentation

Overview

Package db implements Juno storage interfaces with gorocksdb.

Record Encoding Format

 Offset | Field                           | Size
--------+---------------------------------+---------------
      0 | encoding version                | 1 byte
--------+---------------------------------+---------------
      1 | flag                            | 1 byte
--------+---------------------------------+---------------
      2 | reserved                        | 2 bytes
--------+---------------------------------+---------------
      4 | expiration time                 | 4 bytes
--------+---------------------------------+---------------
      8 | version                         | 4 bytes
--------+---------------------------------+---------------
     12 | creation time                   | 4 bytes
--------+---------------------------------+---------------
     16 | last modification time          | 8 bytes
--------+---------------------------------+---------------
     24 | request Id of the last modifier | 16 bytes
--------+---------------------------------+---------------
     40 | request Id of the originator    | 16 bytes
--------+---------------------------------+---------------
     56 | encapsulating payload           | ...

Record Flag
  bit |           0|           1|           2|           3|           4|           5|           6|           7
------+------------+------------+------------+------------+------------+------------+------------+------------+
      | MarkDelete |
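
As an illustration of the layout above, a minimal header-decoding sketch (a hypothetical helper, not part of the package API; the real decoder is (*Record).Decode, and big-endian byte order is an assumption):

import (
	"encoding/binary"
	"errors"
)

// decodeHeader is a hypothetical illustration of the 56-byte fixed header above.
func decodeHeader(buf []byte) (flag uint8, expiration, version, creation uint32,
	lastModified uint64, payload []byte, err error) {
	if len(buf) < 56 {
		return 0, 0, 0, 0, 0, nil, errors.New("record shorter than 56-byte header")
	}
	// buf[0] holds the encoding version; buf[2:4] are reserved.
	flag = buf[1] // bit 0: MarkDelete
	expiration = binary.BigEndian.Uint32(buf[4:8])
	version = binary.BigEndian.Uint32(buf[8:12])
	creation = binary.BigEndian.Uint32(buf[12:16])
	lastModified = binary.BigEndian.Uint64(buf[16:24])
	// buf[24:40] and buf[40:56] carry the last-modifier and originator request IDs.
	payload = buf[56:]
	return flag, expiration, version, creation, lastModified, payload, nil
}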

Storage Key Format

----------------------------+------------+--------
  namespace length (1 byte) |  namespace |  key
----------------------------+------------+--------
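
A sketch of assembling a storage key in this format (a hypothetical helper; the package's DecodeRecordKeyNoShardID performs the reverse mapping, and MAX_NAMESPACE_LENGTH bounds the 1-byte length field):

import "fmt"

// encodeStorageKey is a hypothetical illustration of the layout above.
func encodeStorageKey(namespace, key []byte) ([]byte, error) {
	if len(namespace) > MAX_NAMESPACE_LENGTH {
		return nil, fmt.Errorf("namespace length %d exceeds %d", len(namespace), MAX_NAMESPACE_LENGTH)
	}
	buf := make([]byte, 0, 1+len(namespace)+len(key))
	buf = append(buf, byte(len(namespace))) // 1-byte namespace length
	buf = append(buf, namespace...)
	buf = append(buf, key...)
	return buf, nil
}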

Index

Constants

const (
	MAX_NAMESPACE_LENGTH int = 255
)

Variables

var DBConfig = defaultFlashConfig

Functions

func ConfigBlockCache

func ConfigBlockCache() *gorocksdb.BlockBasedTableOptions

func DecodeRecordKey

func DecodeRecordKey(sskey []byte) ([]byte, []byte, error)

func DecodeRecordKeyNoShardID

func DecodeRecordKeyNoShardID(storageKey []byte) ([]byte, []byte, error)

func Finalize

func Finalize()

func Initialize

func Initialize(
	numShards int, numMicroShards int, numMicroShardGroups int,
	numPrefixDbs int, zoneId int, nodeId int, shardMap shard.Map, lruCacheSizeInMB int)

Only called once during startup.
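
A start-up sketch, assuming the shard map and the numeric parameters come from service configuration (all values below are placeholders, and the import paths are placeholders as well):

import (
	db "path/to/db"         // placeholder import path for this package
	"path/to/cluster/shard" // placeholder import path for the shard package
)

func startStorage(shardMap shard.Map) {
	db.Initialize(
		1024,     // numShards (placeholder)
		0,        // numMicroShards
		0,        // numMicroShardGroups
		1,        // numPrefixDbs
		0,        // zoneId
		0,        // nodeId
		shardMap, // shard ownership map from cluster configuration
		1024,     // lruCacheSizeInMB
	)
	db.GetDB().Setup()
	// ... serve traffic ...
	db.GetDB().Shutdown()
	db.Finalize()
}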

func NewRocksDBptions

func NewRocksDBptions() *gorocksdb.Options

Note: rocksdb C binding does not support getters for option types
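
For example, the two helpers above can be combined when opening a rocksdb instance directly (a sketch; the database path is a placeholder):

opts := NewRocksDBptions()                          // options built from DBConfig
opts.SetBlockBasedTableFactory(ConfigBlockCache())  // attach the shared block-cache settings
rdb, err := gorocksdb.OpenDb(opts, "/data/juno/db-0") // placeholder path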

func SetEnableMircoShardId

func SetEnableMircoShardId(flag bool)

func WriteSliceTrackerStats

func WriteSliceTrackerStats(w io.Writer)

Types

type Config

type Config struct {

	// rocksdb: size_t write_buffer_size (Default: 64M)
	// Amount of data to build up in memory (backed by an unsorted log
	// on disk) before converting to a sorted on-disk file
	WriteBufferSize int

	// rocksdb: int max_write_buffer_number (2) (min: 2)
	// The maximum number of write buffers that are built up in memory.
	// The default and the minimum number is 2, so that when 1 write buffer
	// is being flushed to storage, new writes can continue to the other
	// write buffer.
	// If max_write_buffer_number > 3, writing will be slowed down to
	// options.delayed_write_rate if we are writing to the last write buffer
	// allowed.
	MaxWriteBufferNumber int

	// rocksdb: int min_write_buffer_number_to_merge (Default: 1)
	// The minimum number of write buffers that will be merged together
	// before writing to storage.  If set to 1, then
	// all write buffers are flushed to L0 as individual files and this increases
	// read amplification because a get request has to check in all of these
	// files. Also, an in-memory merge may result in writing less
	// data to storage if there are duplicate records in each of these
	// individual write buffers.
	MinWriteBufferNumberToMerge int

	// rocksdb: int level0_file_num_compaction_trigger (Default: 4)
	// Number of files to trigger level-0 compaction. A value <0 means that
	// level-0 compaction will not be triggered by number of files at all.
	Level0FileNumCompactionTrigger int

	// rocksdb: int level0_slowdown_writes_trigger (Default: 20)
	// Soft limit on number of level-0 files. We start slowing down writes at this
	// point. A value <0 means that no write slowdown will be triggered by
	// the number of files in level-0.
	Level0SlowdownWritesTrigger int

	// rocksdb: int level0_stop_writes_trigger (Default: 36)
	// Maximum number of level-0 files.  We stop writes at this point.
	Level0StopWritesTrigger int

	// rocksdb: unsigned int stats_dump_period_sec (Default: 600)
	// if not zero, dump rocksdb.stats to LOG every stats_dump_period_sec
	StatsDumpPeriodSec uint

	// rocksdb: uint64_t max_bytes_for_level_base (Default: 256M)
	// Control maximum total data size for a level.
	// max_bytes_for_level_base is the max total for level-1.
	// Maximum number of bytes for level L can be calculated as
	// (max_bytes_for_level_base) * (max_bytes_for_level_multiplier ^ (L-1))
	// For example, if max_bytes_for_level_base is 200MB, and if
	// max_bytes_for_level_multiplier is 10, total data size for level-1
	// will be 200MB, total file size for level-2 will be 2GB,
	// and total file size for level-3 will be 20GB
	MaxBytesForLevelBase uint64

	// rocksdb: double max_bytes_for_level_multiplier (Default: 10)
	MaxBytesForLevelMultiplier float64

	// rocksdb: uint64_t target_file_size_base (Default: 64M)
	TargetFileSizeBase uint64

	// rocksdb: int target_file_size_multiplier (Default: 1)
	TargetFileSizeMultiplier int

	// rocksdb: size_t keep_log_file_num (Default: 1000)
	KeepLogFileNum int

	// rocksdb: int max_background_flushes (Default: 1)
	// Maximum number of concurrent background memtable flush jobs, submitted to
	// the HIGH priority thread pool.
	//
	// By default, all background jobs (major compaction and memtable flush) go
	// to the LOW priority pool. If this option is set to a positive number,
	// memtable flush jobs will be submitted to the HIGH priority pool.
	// It is important when the same Env is shared by multiple db instances.
	// Without a separate pool, long running major compaction jobs could
	// potentially block memtable flush jobs of other db instances, leading to
	// unnecessary Put stalls.
	//
	// If you're increasing this, also consider increasing number of threads in
	// HIGH priority thread pool. For more information, see
	// Env::SetBackgroundThreads
	MaxBackgroundFlushes int

	// rocksdb: int max_background_compactions (Default: 1)
	// Maximum number of concurrent background compaction jobs, submitted to
	// the default LOW priority thread pool.
	// We first try to schedule compactions based on
	// `base_background_compactions`. If the compaction cannot catch up, we
	// will increase number of compaction threads up to
	// `max_background_compactions`.
	// If you're increasing this, also consider increasing number of threads in
	// LOW priority thread pool. For more information, see
	// Env::SetBackgroundThreads
	MaxBackgroundCompactions int

	// rocksdb CompressionType compression (Default: kSnappyCompression)
	// if it's supported. If snappy is not linked
	// with the library, the default is kNoCompression.
	// Typical speeds of kSnappyCompression on an Intel(R) Core(TM)2 2.4GHz:
	//    ~200-500MB/s compression
	//    ~400-800MB/s decompression
	// Note that these speeds are significantly faster than most
	// persistent storage speeds, and therefore it is typically never
	// worth switching to kNoCompression.  Even if the input data is
	// incompressible, the kSnappyCompression implementation will
	// efficiently detect that and will switch to uncompressed mode.
	//  kNoCompression = 0x0,
	//  kSnappyCompression = 0x1,
	//  kZlibCompression = 0x2,
	//  kBZip2Compression = 0x3,
	//  kLZ4Compression = 0x4,
	//  kLZ4HCCompression = 0x5,
	//  kXpressCompression = 0x6,
	//  kZSTD = 0x7
	Compression gorocksdb.CompressionType

	//	DebugInfoLogLevel = InfoLogLevel(0)
	//	InfoInfoLogLevel  = InfoLogLevel(1)
	//	WarnInfoLogLevel  = InfoLogLevel(2)
	//	ErrorInfoLogLevel = InfoLogLevel(3)
	//	FatalInfoLogLevel = InfoLogLevel(4)
	InfoLogLevel gorocksdb.InfoLogLevel

	// write option
	// rocksdb: bool sync (Default: false)
	// If true, the write will be flushed from the operating system
	// buffer cache (by calling WritableFile::Sync()) before the write
	// is considered complete.  If this flag is true, writes will be
	// slower.
	//
	// If this flag is false, and the machine crashes, some recent
	// writes may be lost.  Note that if it is just the process that
	// crashes (i.e., the machine does not reboot), no writes will be
	// lost even if sync==false.
	//
	// In other words, a DB write with sync==false has similar
	// crash semantics as the "write()" system call.  A DB write
	// with sync==true has similar crash semantics to a "write()"
	// system call followed by "fdatasync()".
	//
	WriteSync bool

	// write option
	// rocksdb: bool disableWAL (Default: false)
	// If true, writes will not first go to the write ahead log,
	// and the write may get lost after a crash.
	WriteDisableWAL bool

	RandomizeWriteBuffer bool

	RateBytesPerSec int64

	HighPriorityBackgroundThreads int

	LowPriorityBackgroundThreads int

	NewLRUCacheSizeInMB int

	DbPaths []DbPath

	WalDir string
}

Based on rocksdb 5.5.1
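
A sketch of tuning a few DBConfig fields before Initialize is called (the values and path are placeholders; Validate is the documented consistency check):

DBConfig.WriteBufferSize = 64 * 1024 * 1024 // 64M, matching the rocksdb default
DBConfig.MaxWriteBufferNumber = 4
DBConfig.Compression = gorocksdb.SnappyCompression
DBConfig.DbPaths = []DbPath{{Path: "/data/juno", TargetSize: 256 << 30}} // placeholder path and size
if err := DBConfig.Validate(); err != nil {
	// reject the configuration before opening any database
}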

func (*Config) GetPaths

func (c *Config) GetPaths() []string

func (*Config) OnLoad

func (cfg *Config) OnLoad()

func (*Config) Validate

func (cfg *Config) Validate() (err error)

type DBError

type DBError struct {
	// contains filtered or unexported fields
}

func NewDBError

func NewDBError(e error) *DBError

func (*DBError) Error

func (e *DBError) Error() string

type DbPath

type DbPath struct {
	Path       string
	TargetSize uint64
}

type IDBSharding

type IDBSharding interface {
	// contains filtered or unexported methods
}

type IDatabase

type IDatabase interface {
	Setup()
	TruncateExpired()
	Shutdown()

	Put(id RecordID, value []byte) error
	Get(id RecordID, fetchExpired bool) (*Record, error)
	GetRecord(id RecordID, rec *Record) (recExists bool, err error)
	Delete(id RecordID) error

	IsPresent(id RecordID) (bool, error, *Record)
	IsRecordPresent(id RecordID, rec *Record) (bool, error)

	ReplicateSnapshot(shardId shard.ID, r *redist.Replicator, mshardid int32) bool
	ShardSupported(shardId shard.ID) bool
	UpdateRedistShards(shards shard.Map)
	UpdateShards(shards shard.Map)

	WriteProperty(propKey string, w io.Writer)
	GetIntProperty(propKey string) uint64
}

func GetDB

func GetDB() IDatabase
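
A minimal read/write sketch against the returned IDatabase (the namespace, key, and shard ID are placeholders, and the value passed to Put is assumed to be an already-encoded record):

var (
	buf          bytes.Buffer
	shardId      shard.ID // assumed to come from the shard map
	encodedValue []byte   // assumed: a record encoded as described in the Overview
)
id := NewRecordIDWithBuffer(&buf, shardId, 0, []byte("ns"), []byte("key"))

d := GetDB()
if err := d.Put(id, encodedValue); err != nil {
	// handle write error
}
if rec, err := d.Get(id, false /* fetchExpired */); err == nil && rec != nil {
	rec.PrettyPrint(os.Stdout)
}
_ = d.Delete(id)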

type MicroShardGroupStats

type MicroShardGroupStats struct {
	// contains filtered or unexported fields
}

type Record

type Record struct {
	RecordHeader
	Payload proto.Payload
	// contains filtered or unexported fields
}

func (*Record) ClearMarkedDelete

func (rec *Record) ClearMarkedDelete()

func (*Record) Decode

func (rec *Record) Decode(data []byte) error

TODO: validate the slices.

func (*Record) DecodeFrom

func (rec *Record) DecodeFrom(holder valueHolderI) error

func (*Record) EncodeRedistMsg

func (rec *Record) EncodeRedistMsg(shardId shard.ID, ns []byte, key []byte, row *proto.RawMessage) (err error)

func (*Record) EncodeToBuffer

func (rec *Record) EncodeToBuffer(buffer *bytes.Buffer) error

func (*Record) EncodingSize

func (rec *Record) EncodingSize() int

func (*Record) IsExpired

func (rec *Record) IsExpired() (expired bool)

func (*Record) IsMarkedDelete

func (rec *Record) IsMarkedDelete() bool

func (*Record) MarkDelete

func (rec *Record) MarkDelete()

func (*Record) PrettyPrint

func (rec *Record) PrettyPrint(w io.Writer)

func (*Record) ResetRecord

func (b *Record) ResetRecord()

func (*Record) String

func (b *Record) String() string

type RecordHeader

type RecordHeader struct {
	Version              uint32
	CreationTime         uint32
	LastModificationTime uint64
	ExpirationTime       uint32

	OriginatorRequestId proto.RequestId
	RequestId           proto.RequestId
	// contains filtered or unexported fields
}

type RecordID

type RecordID []byte

func NewRecordIDWithBuffer

func NewRecordIDWithBuffer(buf *bytes.Buffer, shardId shard.ID, microShardId uint8,
	namespace []byte, key []byte) RecordID

func (*RecordID) GetKey

func (id *RecordID) GetKey() []byte

func (*RecordID) GetKeyWithoutShardID

func (id *RecordID) GetKeyWithoutShardID() []byte

func (*RecordID) GetShardID

func (id *RecordID) GetShardID() shard.ID

func (RecordID) Key

func (recId RecordID) Key() uint32

func (RecordID) String

func (recId RecordID) String() string
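
A sketch of taking a RecordID apart again (the literals are placeholders; that DecodeRecordKeyNoShardID returns namespace then key is an assumption based on the storage-key layout in the Overview):

var (
	buf     bytes.Buffer
	shardId shard.ID // placeholder
)
id := NewRecordIDWithBuffer(&buf, shardId, 0, []byte("ns"), []byte("key"))

sid := id.GetShardID()
ns, key, err := DecodeRecordKeyNoShardID(id.GetKeyWithoutShardID())
if err == nil {
	fmt.Printf("shard=%v ns=%s key=%s\n", sid, ns, key)
}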

type RocksDB

type RocksDB struct {
	// contains filtered or unexported fields
}

func (*RocksDB) Delete

func (r *RocksDB) Delete(recId RecordID) error

func (*RocksDB) Get

func (r *RocksDB) Get(recId RecordID, fetchExpired bool) (*Record, error)

func (*RocksDB) GetIntProperty

func (r *RocksDB) GetIntProperty(propKey string) uint64

func (*RocksDB) GetRecord

func (r *RocksDB) GetRecord(recId RecordID, rec *Record) (exist bool, err error)

It is the caller's responsibility to 1) zero rec before calling, and 2) free rec.holder afterwards if it is not nil.
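
A sketch of that calling pattern, reusing one Record across lookups (that ResetRecord clears the record, including its internal holder, between calls is an assumption):

var rec Record // zero value, as required above
for _, id := range ids { // ids: a slice of RecordID (placeholder)
	if exists, err := GetDB().GetRecord(id, &rec); err == nil && exists {
		// use rec ...
	}
	rec.ResetRecord() // assumption: releases the internal holder and zeroes the record
}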

func (*RocksDB) IsPresent

func (r *RocksDB) IsPresent(key RecordID) (bool, error, *Record)

func (*RocksDB) IsRecordPresent

func (r *RocksDB) IsRecordPresent(recId RecordID, rec *Record) (existAndNotExpired bool, err error)

It is the caller's responsibility to 1) zero rec before calling, and 2) free rec.holder afterwards if it is not nil.

func (*RocksDB) LogCalTransaction

func (r *RocksDB) LogCalTransaction(startT time.Time, name string, err error)

func (*RocksDB) Put

func (r *RocksDB) Put(id RecordID, value []byte) error

func (*RocksDB) ReplicateSnapshot

func (r *RocksDB) ReplicateSnapshot(shardId shard.ID, rb *redist.Replicator, mshardid int32) bool

Runs in a separate goroutine. Only one goroutine may run per instance at a time, and it must be able to abort.
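
A launch sketch matching that description (the Replicator and shard ID are assumed to be supplied by the redist watcher; -1 for mshardid is a placeholder):

go func(shardId shard.ID, r *redist.Replicator) {
	if ok := GetDB().ReplicateSnapshot(shardId, r, -1); !ok {
		// replication was aborted or failed
	}
}(shardId, replicator) // shardId and replicator: placeholders provided elsewhere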

func (*RocksDB) Setup

func (r *RocksDB) Setup()

Initial setup.

func (*RocksDB) ShardSupported

func (r *RocksDB) ShardSupported(shardId shard.ID) bool

func (*RocksDB) Shutdown

func (r *RocksDB) Shutdown()

TODO: need lock?

func (*RocksDB) TruncateExpired

func (r *RocksDB) TruncateExpired()

func (*RocksDB) UpdateRedistShards

func (r *RocksDB) UpdateRedistShards(shards shard.Map)

Called by the redist watcher to update the rocksdb instances.

func (*RocksDB) UpdateShards

func (r *RocksDB) UpdateShards(shards shard.Map)

func (*RocksDB) WriteProperty

func (r *RocksDB) WriteProperty(propKey string, w io.Writer)
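
A sketch of reading rocksdb statistics through these wrappers ("rocksdb.estimate-num-keys" and "rocksdb.stats" are standard rocksdb property names):

n := GetDB().GetIntProperty("rocksdb.estimate-num-keys")
fmt.Printf("approximate key count: %d\n", n)
GetDB().WriteProperty("rocksdb.stats", os.Stdout)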

type ShardFilter

type ShardFilter struct {
	// contains filtered or unexported fields
}

func (*ShardFilter) Disable

func (s *ShardFilter) Disable()

func (*ShardFilter) SetCompactionFilter

func (s *ShardFilter) SetCompactionFilter(opts *gorocksdb.Options, enable bool)

func (*ShardFilter) SetShardNum

func (s *ShardFilter) SetShardNum(shardNum int32)

type ShardingBase

type ShardingBase struct {
}

type ShardingByInstance

type ShardingByInstance struct {
	ShardingBase
	// contains filtered or unexported fields
}

type ShardingByPrefix

type ShardingByPrefix struct {
	ShardingBase
	DbNames     []string
	PrefixBytes int
	// contains filtered or unexported fields
}

func GetPrefixDB

func GetPrefixDB() *ShardingByPrefix

func (*ShardingByPrefix) CompactRangeByShard

func (s *ShardingByPrefix) CompactRangeByShard(shardId shard.ID) error

func (*ShardingByPrefix) DeleteFilesByShard

func (s *ShardingByPrefix) DeleteFilesByShard(shardId shard.ID) error

Directories

Path Synopsis
main.go
