riverdb

package module
v0.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 1, 2023 License: MIT Imports: 31 Imported by: 0

README

RiverDB

Static Badge GitHub License

 ███████   ██                         ███████   ██████
░██░░░░██ ░░             To the moon ░██░░░░██ ░█░░░░██
░██   ░██  ██ ██    ██  █████  ██████░██    ░██░█   ░██
░███████  ░██░██   ░██ ██░░░██░░██░░█░██    ░██░██████
░██░░░██  ░██░░██ ░██ ░███████ ░██ ░ ░██    ░██░█░░░░ ██
░██  ░░██ ░██ ░░████  ░██░░░░  ░██   ░██    ██ ░█    ░██
░██   ░░██░██  ░░██   ░░██████░███   ░███████  ░███████
░░     ░░ ░░    ░░     ░░░░░░ ░░░    ░░░░░░░   ░░░░░░░

RiverDB is a lightweight, embeddable key-value NoSQL database based on the Bitcask model and a write-ahead log (WAL). Its features include:

  • ACID transactions
  • record TTL
  • custom key sorting rules
  • range matching and iteration
  • event watcher
  • batch write and delete
  • tar.gz backup and recovery from backup

RiverDB can be used as a standalone database or as an underlying storage engine.

The project is still under testing, and its stability cannot be guaranteed yet.

install

It is an embeddable database, so you can use it directly in your code without any network transport.

go get -u github.com/246859/river

how to use

quick start

This is a simple example of using the put and get operations.

import (
	"fmt"
	riverdb "github.com/246859/river"
)

func main() {
	// Open (or create) a database whose files live in the "riverdb" directory.
	db, err := riverdb.Open(riverdb.DefaultOptions, riverdb.WithDir("riverdb"))
	if err != nil {
		panic(err)
	}
	defer db.Close()

	key, val := []byte("key"), []byte("value")

	// A ttl of 0 persists the entry without expiration.
	if err = db.Put(key, val, 0); err != nil {
		panic(err)
	}

	// Read the value back and print it as text.
	got, err := db.Get(key)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(got))
}

Remember to close the db when you are done with it.

iteration

RiverDB iteration is key-only: the range handler receives keys, not values.

import (
    "fmt"
    riverdb "github.com/246859/river"
)

func main() {
	// open the river db
	db, err := riverdb.Open(riverdb.DefaultOptions, riverdb.WithDir("riverdb"))
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// put key-value pairs
	err = db.Put([]byte("key"), []byte("value"), 0)
	if err != nil {
		panic(err)
	}

	// get value from key
	value, err := db.Get([]byte("key"))
	if err != nil {
		panic(err)
	}
	fmt.Println(string(value))

	// Iterate over the keys. Min, Max and Pattern are left nil here;
	// presumably nil means "no bound / no filter" — confirm in index.RangeOption.
	err = db.Range(riverdb.RangeOptions{
		Min:     nil,
		Max:     nil,
		Pattern: nil,
		Descend: false,
	}, func(key riverdb.Key) bool {
		// Key is a []byte: convert it so it prints as text instead of a list of byte values.
		fmt.Println(string(key))
		// NOTE: the returned bool controls iteration; check the RangeHandler
		// semantics to decide whether to continue or stop.
		return false
	})
	if err != nil {
		panic(err)
	}
}
transaction

Use transactions through the Begin, Commit, and RollBack APIs.

import (
	"errors"
	"fmt"
	riverdb "github.com/246859/river"
)

func main() {
	// open the river db
	db, err := riverdb.Open(riverdb.DefaultOptions, riverdb.WithDir("riverdb"))
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// begin a read-only transaction (the argument of Begin is `readonly`)
	txn, err := db.Begin(true)
	if err != nil {
		panic(err)
	}

	_, err = txn.Get([]byte("notfund"))
	switch {
	case errors.Is(err, riverdb.ErrKeyNotFound):
		// a missing key is an expected outcome, not a failure
		fmt.Println("key not found")
	case err != nil:
		// unexpected failure: discard the transaction instead of committing it
		_ = txn.RollBack()
		panic(err)
	}

	// commit only on the paths that did not roll back
	if err := txn.Commit(); err != nil {
		panic(err)
	}
}

batch operation

Batch operations have better performance than calling db.Put or db.Del directly.

import (
	"fmt"
	riverdb "github.com/246859/river"
	"os"
	"path/filepath"
	"strconv"
	"strings"
)

func main() {
	// open db
	db, err := riverdb.Open(riverdb.DefaultOptions, riverdb.WithDir(filepath.Join(os.TempDir(), "example")))
	if err != nil {
		panic(err)
	}
	// close
	defer db.Close()

	// open a batch with a per-batch size of 500 that calls Fsync once, after all batches finish
	batch, err := db.Batch(riverdb.BatchOption{
		Size:        500,
		SyncOnFlush: true,
	})
	if err != nil {
		panic(err)
	}

	const n = 1000
	rs := make([]riverdb.Record, 0, n)
	ks := make([]riverdb.Key, 0, n)
	for i := 0; i < n; i++ {
		rs = append(rs, riverdb.Record{
			K:   []byte(strconv.Itoa(i)),
			V:   []byte(strings.Repeat("a", i)),
			TTL: 0,
		})
		ks = append(ks, rs[i].K)
	}

	// write all
	if err := batch.WriteAll(rs); err != nil {
		panic(err)
	}

	// delete all
	if err := batch.DeleteAll(ks); err != nil {
		panic(err)
	}

	// wait for the batch to finish
	if err := batch.Flush(); err != nil {
		panic(err)
	}

	// expected to print 2000: 1000 written records + 1000 deleted records
	fmt.Println(batch.Effected())
}
backup & recover

Backup archives only the data in the data directory.

import (
	riverdb "github.com/246859/river"
	"os"
	"path/filepath"
)

func main() {
	dataDir := filepath.Join(os.TempDir(), "example")
	archive := filepath.Join(os.TempDir(), "example.tar.gz")

	db, err := riverdb.Open(riverdb.DefaultOptions, riverdb.WithDir(dataDir))
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// Compress the data WAL files into a tar.gz archive.
	if err = db.Backup(archive); err != nil {
		panic(err)
	}

	// Restore from the archive; this purges the current data first.
	if err = db.Recover(archive); err != nil {
		panic(err)
	}
}
statistic
func main() {
	// open db
	db, err := riverdb.Open(riverdb.DefaultOptions, riverdb.WithDir(filepath.Join(os.TempDir(), "example")))
	if err != nil {
		panic(err)
	}
	// close
	defer db.Close()
	
    // statistic
	stats := db.Stats()
	fmt.Println(stats.DataSize)
	fmt.Println(stats.HintSize)
	fmt.Println(stats.KeyNums)
	fmt.Println(stats.RecordNums)
}
watcher

You can choose which events to watch in the db options (see WithWatchEvent).

import (
	"fmt"
	riverdb "github.com/246859/river"
	"os"
	"path/filepath"
	"time"
)

func main() {
	// open db
	db, err := riverdb.Open(riverdb.DefaultOptions, riverdb.WithDir(filepath.Join(os.TempDir(), "example")))
	if err != nil {
		panic(err)
	}

	// Subscribe before writing so that no events are missed. Watch fails when
	// the watcher is disabled (presumably ErrWatcherDisabled when WatchSize is 0).
	watch, err := db.Watch()
	if err != nil {
		panic(err)
	}

	done := make(chan struct{})
	go func() {
		defer close(done)
		// The loop ends when the watch channel is closed; this example
		// relies on db.Close to do that.
		for event := range watch {
			fmt.Printf("%+v\n", event)
		}
	}()

	if err := db.Put([]byte("1"), []byte("1"), 0); err != nil {
		panic(err)
	}
	if err := db.Put([]byte("2"), []byte("3"), 0); err != nil {
		panic(err)
	}
	if err := db.Del([]byte("2")); err != nil {
		panic(err)
	}

	// Give the watcher time to deliver the events, then close the db to end the watch loop.
	time.Sleep(time.Second * 2)
	if err := db.Close(); err != nil {
		panic(err)
	}
	<-done
}

benchmark

goos: windows
goarch: amd64
pkg: github.com/246859/river
cpu: 11th Gen Intel(R) Core(TM) i7-11800H @ 2.30GHz
BenchmarkDB_B
BenchmarkDB_B/benchmarkDB_Put_value_16B
BenchmarkDB_B/benchmarkDB_Put_value_16B-16                108243             11050 ns/op           25658 B/op         27 allocs/op
BenchmarkDB_B/benchmarkDB_Put_value_64B
BenchmarkDB_B/benchmarkDB_Put_value_64B-16                 91654             11353 ns/op           25688 B/op         27 allocs/op
BenchmarkDB_B/benchmarkDB_Put_value_128B
BenchmarkDB_B/benchmarkDB_Put_value_128B-16                99914             12012 ns/op           25744 B/op         27 allocs/op
BenchmarkDB_B/benchmarkDB_Put_value_256B
BenchmarkDB_B/benchmarkDB_Put_value_256B-16                89656             12834 ns/op           25929 B/op         28 allocs/op
BenchmarkDB_B/benchmarkDB_Put_value_512B
BenchmarkDB_B/benchmarkDB_Put_value_512B-16                79682             14194 ns/op           26328 B/op         28 allocs/op
BenchmarkDB_B/benchmarkDB_Get_B
BenchmarkDB_B/benchmarkDB_Get_B-16                        472155              2478 ns/op            1322 B/op          5 allocs/op
BenchmarkDB_B/benchmarkDB_Del_B
BenchmarkDB_B/benchmarkDB_Del_B-16                        642699              1752 ns/op             427 B/op          7 allocs/op
BenchmarkDB_KB
BenchmarkDB_KB/benchmarkDB_Put_value_1KB
BenchmarkDB_KB/benchmarkDB_Put_value_1KB-16                58642             20106 ns/op           26844 B/op         28 allocs/op
BenchmarkDB_KB/benchmarkDB_Put_value_16KB
BenchmarkDB_KB/benchmarkDB_Put_value_16KB-16               13563             85796 ns/op           45704 B/op         28 allocs/op
BenchmarkDB_KB/benchmarkDB_Put_value_64KB
BenchmarkDB_KB/benchmarkDB_Put_value_64KB-16                3909            296592 ns/op          151887 B/op         32 allocs/op
BenchmarkDB_KB/benchmarkDB_Put_value_256KB
BenchmarkDB_KB/benchmarkDB_Put_value_256KB-16               1083           1208055 ns/op          725232 B/op         39 allocs/op
BenchmarkDB_KB/benchmarkDB_Put_value_512KB
BenchmarkDB_KB/benchmarkDB_Put_value_512KB-16                466           2341486 ns/op         1636059 B/op         45 allocs/op
BenchmarkDB_KB/benchmarkDB_Get_KB
BenchmarkDB_KB/benchmarkDB_Get_KB-16                       38292             27245 ns/op           41288 B/op          5 allocs/op
BenchmarkDB_KB/benchmarkDB_Del_KB
BenchmarkDB_KB/benchmarkDB_Del_KB-16                     1091222              1041 ns/op             266 B/op          6 allocs/op
BenchmarkDB_MB
BenchmarkDB_MB/benchmarkDB_Put_value_1MB
BenchmarkDB_MB/benchmarkDB_Put_value_1MB-16                  217           4825615 ns/op         1607356 B/op         50 allocs/op
BenchmarkDB_MB/benchmarkDB_Put_value_8MB
BenchmarkDB_MB/benchmarkDB_Put_value_8MB-16                   94          35765929 ns/op        16788028 B/op        164 allocs/op
BenchmarkDB_MB/benchmarkDB_Put_value_16MB
BenchmarkDB_MB/benchmarkDB_Put_value_16MB-16                  27          63377081 ns/op        41576377 B/op        259 allocs/op
BenchmarkDB_MB/benchmarkDB_Put_value_32MB
BenchmarkDB_MB/benchmarkDB_Put_value_32MB-16                  37         170156170 ns/op        117083367 B/op       651 allocs/op
BenchmarkDB_MB/benchmarkDB_Put_value_64MB
BenchmarkDB_MB/benchmarkDB_Put_value_64MB-16                   4         250742625 ns/op        194236838 B/op       901 allocs/op
BenchmarkDB_MB/benchmarkDB_Get_MB
BenchmarkDB_MB/benchmarkDB_Get_MB-16                        2660            381140 ns/op          805636 B/op         14 allocs/op
BenchmarkDB_MB/benchmarkDB_Del_MB
BenchmarkDB_MB/benchmarkDB_Del_MB-16                     1749214               669.4 ns/op           234 B/op          6 allocs/op
BenchmarkDB_Mixed
BenchmarkDB_Mixed/benchmarkDB_Put_Mixed_64B
BenchmarkDB_Mixed/benchmarkDB_Put_Mixed_64B-16             96477             11841 ns/op           25712 B/op         27 allocs/op
BenchmarkDB_Mixed/benchmarkDB_Put_Mixed_128KB
BenchmarkDB_Mixed/benchmarkDB_Put_Mixed_128KB-16            2374            611411 ns/op          314365 B/op         35 allocs/op
BenchmarkDB_Mixed/benchmarkDB_Put_Mixed_MB
BenchmarkDB_Mixed/benchmarkDB_Put_Mixed_MB-16                226           4977799 ns/op         3562019 B/op         57 allocs/op
BenchmarkDB_Mixed/benchmarkDB_Get_Mixed
BenchmarkDB_Mixed/benchmarkDB_Get_Mixed-16                 60637             20533 ns/op           34072 B/op          5 allocs/op
BenchmarkDB_Mixed/benchmarkDB_Del_Mixed
BenchmarkDB_Mixed/benchmarkDB_Del_Mixed-16               1073124              1046 ns/op             262 B/op          6 allocs/op
PASS
ok      github.com/246859/river 49.126s

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrKeyNotFound    = errors.New("key not found")
	ErrNilKey         = entry.ErrNilKey
	ErrDBClosed       = errors.New("db is already closed")
	ErrDirAlreadyUsed = errors.New("dir already used by another process")
)
View Source
var (
	ErrTxnClosed   = errors.New("transaction is closed")
	ErrTxnReadonly = errors.New("transaction is read-only")
	ErrTxnConflict = errors.New("transaction is conflict")
)
View Source
var DefaultOptions = Options{
	MaxSize:        defaultMaxFileSize,
	BlockCache:     defaultMaxFileSize / types.MB,
	Fsync:          false,
	FsyncThreshold: blockSize * (defaultMaxFileSize / types.MB),
	Compare:        index.DefaultCompare,
	WatchSize:      2000,
	WatchEvents:    []EventType{PutEvent, DelEvent},
}
View Source
var ErrBatchClosed = errors.New("batch is closed")
View Source
var (
	ErrMergedNotFinished = errors.New("merged not finished")
)
View Source
var (
	ErrWatcherDisabled = errors.New("event watcher disabled")
)

Functions

func FastRand

func FastRand() uint32

FastRand is a fast thread local random function.

Types

type Batch added in v0.2.0

type Batch struct {
	// contains filtered or unexported fields
}

Batch performs batch write and delete operations on the db.

func (*Batch) DeleteAll added in v0.2.0

func (ba *Batch) DeleteAll(keys []Key) error

DeleteAll deletes all records matching the given keys from the db in batch transactions.

func (*Batch) Effected added in v0.2.0

func (ba *Batch) Effected() int64

Effected returns how many records have been affected.

func (*Batch) Error added in v0.2.0

func (ba *Batch) Error() error

Error returns the error that occurred in the batch operation.

func (*Batch) Flush added in v0.2.0

func (ba *Batch) Flush() error

Flush closes the Batch, waits for the remaining batch transactions to complete, and finally calls db.Sync to flush the db.

func (*Batch) WriteAll added in v0.2.0

func (ba *Batch) WriteAll(records []Record) error

WriteAll writes all the given records to the db in batch transactions.

type BatchOption added in v0.2.0

type BatchOption struct {
	// size of per batch
	Size int64
	// call Fsync after per batch has been written
	SyncPerBatch bool
	// call Fsync after all batch finished, not recommended enabled both SyncPerBatch and SyncOnFlush simultaneously
	// if both of SyncPerBatch and SyncOnFlush is false, batch will apply sync rules in db.options
	SyncOnFlush bool
}

type BitFlag added in v0.2.0

type BitFlag uint64

BitFlag is a 64-bit mask used to store different status information.

func (*BitFlag) Check added in v0.2.0

func (bf *BitFlag) Check(flags ...uint64) bool

func (*BitFlag) Revoke added in v0.2.0

func (bf *BitFlag) Revoke(flags ...uint64)

func (*BitFlag) Store added in v0.2.0

func (bf *BitFlag) Store(flags ...uint64)

type DB

type DB struct {
	// contains filtered or unexported fields
}

DB represents a db instance, which stores its WAL files in a specific data directory.

func Open

func Open(options Options, opts ...Option) (*DB, error)

Open returns a river db instance

func OpenWithCtx added in v0.1.0

func OpenWithCtx(ctx context.Context, options Options, opts ...Option) (*DB, error)

OpenWithCtx returns a river db instance with context

func (*DB) Backup added in v0.1.0

func (db *DB) Backup(destpath string) error

Backup uses tar and gzip to compress the data WAL files into an archive at destpath.

func (*DB) Batch added in v0.2.0

func (db *DB) Batch(opt BatchOption) (*Batch, error)

Batch provides the ability to write or delete entries in batches and is update-only. It splits the given records into several batches and runs a new goroutine to handle each batch in a batch transaction. The records of each batch are held in memory temporarily and then written to the database together; finally, the db index is updated if the commit succeeds.

func (*DB) Begin

func (db *DB) Begin(readonly bool) (*Txn, error)

Begin begins a new transaction

func (*DB) Close

func (db *DB) Close() error

Close closes the db. Once the db is closed, it can no longer be used.

func (*DB) Del

func (db *DB) Del(key Key) error

Del removes the key-value pair matching the given key from the db. It returns nil if the key does not exist.

func (*DB) Expire

func (db *DB) Expire(key Key, ttl time.Duration) error

Expire updates the TTL of the specified key. If ttl <= 0, the key will never expire.

func (*DB) Get

func (db *DB) Get(key Key) (Value, error)

Get returns the value matching the given key. If the key has expired or is not found, it returns ErrKeyNotFound. A nil key is not allowed.

func (*DB) Merge

func (db *DB) Merge(domerge bool) error

Merge cleans up redundant data entries in the db, shrinking its size. If domerge is false, it only records the merged data and does not replace the files in the data directory with it.

func (*DB) Purge

func (db *DB) Purge() error

Purge removes all entries from the data WAL.

func (*DB) Put

func (db *DB) Put(key Key, value Value, ttl time.Duration) error

Put puts a key-value pair into the db, overwriting the value if the key already exists. A nil key is invalid, but a nil value is allowed and will be stored as an empty []byte. If ttl == 0, the key is persisted without expiration; if ttl < 0, the key keeps its previous TTL.

func (*DB) Range

func (db *DB) Range(option RangeOptions, handler RangeHandler) error

Range iterates over all keys that match the given RangeOptions and calls handler for each key.

func (*DB) Recover added in v0.1.0

func (db *DB) Recover(srcpath string) error

Recover recovers the WAL files from the specified tar.gz archive. It purges the current data and overwrites it with the backup.

func (*DB) Stats added in v0.2.0

func (db *DB) Stats() Stats

func (*DB) Sync

func (db *DB) Sync() error

Sync syncs the written buffers to disk.

func (*DB) TTL

func (db *DB) TTL(key Key) (time.Duration, error)

TTL returns the remaining time to live of the specified key.

func (*DB) Watch added in v0.1.0

func (db *DB) Watch() (<-chan *Event, error)

type Event added in v0.1.0

type Event struct {
	Type  EventType
	Value any
}

Event represents a push event

type EventType added in v0.1.0

type EventType uint
const (
	PutEvent EventType = 1 + iota
	DelEvent
	RollbackEvent
	MergeEvent
	BackupEvent
	RecoverEvent
)

type Key

type Key = []byte

Key db key type

type Option

type Option func(option *Options)

Option applies changes to the given Options.

func WithBlockCache

func WithBlockCache(block uint32) Option

func WithClosedGc

func WithClosedGc(gc bool) Option

func WithCompare

func WithCompare(compare index.Compare) Option

func WithDir

func WithDir(dir string) Option

func WithFsync

func WithFsync(sync bool) Option

func WithFsyncThreshold

func WithFsyncThreshold(threshold int64) Option

func WithMaxSize

func WithMaxSize(size int64) Option

func WithWatchEvent added in v0.1.0

func WithWatchEvent(events ...EventType) Option

func WithWatchSize added in v0.1.0

func WithWatchSize(size int) Option

type Options

type Options struct {
	// data dir that stores data files
	Dir string
	// max bytes size of the single data file can hold
	MaxSize int64
	// wal block cache size
	BlockCache uint32
	// call sync per write
	Fsync bool
	// call sync when reach the threshold
	FsyncThreshold int64
	// kv put/get events size for watch queue, disabled if is 0
	WatchSize int
	// specified events to watch
	WatchEvents []EventType
	// decide how to sort keys
	Compare index.Compare
	// manually gc after closed db
	ClosedGc bool
	// contains filtered or unexported fields
}

Options represents the db configuration.

type RangeHandler

type RangeHandler = func(key Key) bool

RangeHandler is called for each key while iterating over the db.

type RangeOptions

type RangeOptions = index.RangeOption

RangeOptions is alias of index.RangeOption

type Record added in v0.2.0

type Record struct {
	K   Key
	V   Value
	TTL time.Duration
}

Record is a user-oriented struct representing an entry data in db

type Stats added in v0.2.0

type Stats struct {
	// number of key in db
	KeyNums int64
	// number of values in db, due to bitcask model is append-only, it usually greater than KeyNums.
	// normally, result of RecordNums / KeyNums can be used to determine if is needed to merge the wal files
	RecordNums int64
	// real data size
	DataSize int64
	// hint file size
	HintSize int64
}

Stats represents a simple statistics information of db at a moment

type Txn

type Txn struct {
	// contains filtered or unexported fields
}

Txn represents a transaction

func (*Txn) Commit

func (txn *Txn) Commit() error

Commit commits the transaction. Once a transaction has been committed successfully, its effects on the database are permanent. If the process crashes, the db reloads the transaction data from the WAL and updates the in-memory index.

func (*Txn) Del

func (txn *Txn) Del(key Key) error

func (*Txn) Expire

func (txn *Txn) Expire(key Key, ttl time.Duration) error

func (*Txn) Get

func (txn *Txn) Get(key Key) (Value, error)

func (*Txn) Put

func (txn *Txn) Put(key Key, value Value, ttl time.Duration) error

func (*Txn) Range

func (txn *Txn) Range(opt RangeOptions, handler RangeHandler) error

func (*Txn) RollBack

func (txn *Txn) RollBack() error

RollBack preserves the consistency of the database. If the transaction has already been committed successfully, RollBack returns ErrTxnClosed and has no effect. Otherwise, it writes a flag record to make sure the data written in this transaction will never be seen, then discards the transaction.

func (*Txn) TTL

func (txn *Txn) TTL(key Key) (time.Duration, error)

type Value

type Value = []byte

Value db value type

Directories

Path Synopsis
pkg
crc
str

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL