riverdb

package module
v0.2.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 4, 2023 License: MIT Imports: 31 Imported by: 0

README

RiverDB

Static Badge GitHub License

 ███████   ██                         ███████   ██████
░██░░░░██ ░░             To the moon ░██░░░░██ ░█░░░░██
░██   ░██  ██ ██    ██  █████  ██████░██    ░██░█   ░██
░███████  ░██░██   ░██ ██░░░██░░██░░█░██    ░██░██████
░██░░░██  ░██░░██ ░██ ░███████ ░██ ░ ░██    ░██░█░░░░ ██
░██  ░░██ ░██ ░░████  ░██░░░░  ░██   ░██    ██ ░█    ░██
░██   ░░██░██  ░░██   ░░██████░███   ░███████  ░███████
░░     ░░ ░░    ░░     ░░░░░░ ░░░    ░░░░░░░   ░░░░░░░

RiverDB is a lightweight, embeddable key-value NoSQL database based on the Bitcask model and a write-ahead log (WAL). Its features are:

  • ACID transactions
  • record ttl
  • custom key sorting rules
  • range matching and iteration
  • event watcher
  • batch write and delete
  • tar.gz backup and recovery from a backup

RiverDB can be used as a standalone database or as an underlying storage engine.

The project is still under testing, so stability cannot be guaranteed.

install

It is an embeddable database, so you can use it directly in your code without any network transport.

go get -u github.com/246859/river

how to use

quick start

This is a simple example of the put and get operations.

import (
	"fmt"
	riverdb "github.com/246859/river"
)

// main opens a river db stored in the "riverdb" directory, writes a single
// key-value pair and reads it back.
func main() {
	// open the river db
	db, err := riverdb.Open(riverdb.DefaultOptions, riverdb.WithDir("riverdb"))
	if err != nil {
		panic(err)
	}
	// the db can no longer be used once it is closed, so close it on exit
	defer db.Close()

	// put a key-value pair; a ttl of 0 means the key is persisted
	err = db.Put([]byte("key"), []byte("value"), 0)
	if err != nil {
		panic(err)
	}

	// get value from key; ErrKeyNotFound is returned for missing or expired keys
	value, err := db.Get([]byte("key"))
	if err != nil {
		panic(err)
	}
	fmt.Println(string(value))
}

Remember to close the db when you are done with it.

iteration

Iteration in riverdb is key-only: the range handler receives keys, not values.

import (
    "fmt"
    riverdb "github.com/246859/river"
)

// main stores one key, reads it back, and then iterates over the keys in the
// db with Range, printing each key as text.
func main() {
	// open the river db
	db, err := riverdb.Open(riverdb.DefaultOptions, riverdb.WithDir("riverdb"))
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// put key-value pairs
	err = db.Put([]byte("key"), []byte("value"), 0)
	if err != nil {
		panic(err)
	}

	// get value from key
	value, err := db.Get([]byte("key"))
	if err != nil {
		panic(err)
	}
	fmt.Println(string(value))

	// Iteration is key-only: the handler receives keys, not values.
	err = db.Range(riverdb.RangeOptions{
		Min:     nil,
		Max:     nil,
		Pattern: nil,
		Descend: false,
	}, func(key riverdb.Key) bool {
		// Key is an alias of []byte; convert it so the key prints as text
		// rather than as a list of byte values.
		fmt.Println(string(key))
		// NOTE: the bool result is handed back to Range; check the
		// RangeHandler contract for whether false continues or stops.
		return false
	})
	if err != nil {
		panic(err)
	}
}
transaction

Use transactions through the Begin, Commit and RollBack APIs.

import (
	"errors"
	"fmt"
	riverdb "github.com/246859/river"
)

// main runs a read-only transaction that looks up a key. A missing key is
// reported and the transaction is committed. Any other error rolls the
// transaction back instead, and it is never committed after a rollback.
func main() {
	// open the river db
	db, err := riverdb.Open(riverdb.DefaultOptions, riverdb.WithDir("riverdb"))
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// begin a read-only transaction
	txn, err := db.Begin(true)
	if err != nil {
		panic(err)
	}

	_, err = txn.Get([]byte("notfund"))
	switch {
	case err == nil:
		// the key exists; nothing more to do before committing
	case errors.Is(err, riverdb.ErrKeyNotFound):
		fmt.Println("key not found")
	default:
		// Unexpected failure: discard the transaction. It must not be
		// committed afterwards, so bail out here instead of falling through.
		if rbErr := txn.RollBack(); rbErr != nil {
			panic(rbErr)
		}
		panic(err)
	}

	if err := txn.Commit(); err != nil {
		panic(err)
	}
}

batch operation

For large amounts of data, batch operations perform better than calling db.Put or db.Del directly.

import (
	"fmt"
	riverdb "github.com/246859/river"
	"os"
	"path/filepath"
	"strconv"
	"strings"
)

// main writes 1000 records through a Batch, deletes them again and prints how
// many records the batch affected.
func main() {
	// open db
	db, err := riverdb.Open(riverdb.DefaultOptions, riverdb.WithDir(filepath.Join(os.TempDir(), "example")))
	if err != nil {
		panic(err)
	}
	// close
	defer db.Close()

	// open batch: records are split into batches of Size, and fsync is
	// called once after all batches have finished
	batch, err := db.Batch(riverdb.BatchOption{
		Size:        500,
		SyncOnFlush: true,
	})
	if err != nil {
		panic(err)
	}

	const n = 1000
	rs := make([]riverdb.Record, 0, n)
	ks := make([]riverdb.Key, 0, n)
	for i := 0; i < n; i++ {
		k := []byte(strconv.Itoa(i))
		rs = append(rs, riverdb.Record{
			K:   k,
			V:   []byte(strings.Repeat("a", i)),
			TTL: 0,
		})
		ks = append(ks, k)
	}

	// write all
	if err := batch.WriteAll(rs); err != nil {
		panic(err)
	}

	// delete all
	if err := batch.DeleteAll(ks); err != nil {
		panic(err)
	}

	// wait for the batch to finish; Flush closes the batch and syncs the db
	if err := batch.Flush(); err != nil {
		panic(err)
	}

	// 1000 writes + 1000 deletes: prints 2000
	fmt.Println(batch.Effected())
}
backup & recover

Backup only archives the data files in the data dir.

import (
	riverdb "github.com/246859/river"
	"os"
	"path/filepath"
)

// main archives the db's data files into example.tar.gz in the temp dir and
// then restores the db from that archive.
func main() {
	dataDir := filepath.Join(os.TempDir(), "example")
	db, err := riverdb.Open(riverdb.DefaultOptions, riverdb.WithDir(dataDir))
	if err != nil {
		panic(err)
	}
	defer db.Close()

	archive := filepath.Join(os.TempDir(), "example.tar.gz")

	// Backup compresses the data wal files into the tar.gz archive.
	if err = db.Backup(archive); err != nil {
		panic(err)
	}

	// Recover purges the current data and overwrites it with the backup.
	if err = db.Recover(archive); err != nil {
		panic(err)
	}
}
statistics

Like the examples above, this example needs the fmt, os, path/filepath and riverdb imports.
// main prints a snapshot of the db statistics: data size, hint file size,
// number of keys and number of records.
func main() {
	// open db
	db, err := riverdb.Open(riverdb.DefaultOptions, riverdb.WithDir(filepath.Join(os.TempDir(), "example")))
	if err != nil {
		panic(err)
	}
	// close
	defer db.Close()

	// statistics at this moment; RecordNums is usually greater than KeyNums
	// because the bitcask model is append-only
	stats := db.Stats()
	fmt.Println(stats.DataSize)
	fmt.Println(stats.HintSize)
	fmt.Println(stats.KeyNums)
	fmt.Println(stats.RecordNums)
}
watcher

You can modify which events are watched in the db options (Options.WatchEvents).

import (
	"fmt"
	riverdb "github.com/246859/river"
	"os"
	"path/filepath"
	"sync"
	"time"
)

// main registers a named watcher for put events, writes one key and prints
// every event delivered to the watcher until the watcher is closed.
func main() {
	// open db
	db, err := riverdb.Open(riverdb.DefaultOptions, riverdb.WithDir(filepath.Join(os.TempDir(), "example")))
	if err != nil {
		panic(err)
	}
	// defer Close only after Open succeeded; db is not usable on error
	defer db.Close()

	// DB.Watcher takes the watcher name followed by the events to watch
	watcher, err := db.Watcher("example", riverdb.PutEvent)
	if err != nil {
		panic(err)
	}

	// Obtain the event channel before writing. This ensures the listener
	// exists when the put happens, and the error is handled on this goroutine.
	listen, err := watcher.Listen()
	if err != nil {
		panic(err)
	}

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		// presumably the channel is closed by watcher.Close(), which ends
		// this loop; wg.Wait below relies on that
		for event := range listen {
			fmt.Println(event)
		}
	}()

	if err := db.Put([]byte("hello world"), []byte("world"), 0); err != nil {
		panic(err)
	}

	// give the listener time to receive the event, then stop the watcher
	time.Sleep(time.Second)
	if err := watcher.Close(); err != nil {
		panic(err)
	}

	wg.Wait()
}
merge

You can use db.Merge to proactively clean up redundant data in the database.

import (
	riverdb "github.com/246859/river"
	"os"
	"path/filepath"
)

// main opens the db and runs a full merge to clean up redundant data entries.
func main() {
	// open db
	db, err := riverdb.Open(riverdb.DefaultOptions, riverdb.WithDir(filepath.Join(os.TempDir(), "example")))
	if err != nil {
		panic(err)
	}
	// defer Close only after Open succeeded; db is not usable on error
	defer db.Close()

	// domerge=true: the merged data replaces the files in the data dir
	if err := db.Merge(true); err != nil {
		panic(err)
	}
}

Set Options.MergeCheckpoint = 0 if you want to disable the automatic merge check-up job.

benchmark

go to view the benchmark

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors of the DB; compare against them with errors.Is.
var (
	// ErrKeyNotFound is returned by DB.Get when the key does not exist or has expired.
	ErrKeyNotFound    = errors.New("key not found")
	// ErrNilKey is re-exported from the entry package; nil keys are not allowed.
	ErrNilKey         = entry.ErrNilKey
	// ErrDBClosed reports that the db is already closed; presumably returned by
	// operations on a db after Close — confirm.
	ErrDBClosed       = errors.New("db is already closed")
	// ErrDirAlreadyUsed reports that the data dir is already used by another process.
	ErrDirAlreadyUsed = errors.New("dir already used by another process")
)
View Source
// Transaction errors.
var (
	// ErrTxnClosed is returned by Txn.RollBack when the transaction has already
	// been committed successfully; the caller may ignore it in that case.
	ErrTxnClosed   = errors.New("transaction is closed")
	// ErrTxnReadonly presumably rejects writes in a transaction started with
	// Begin(true) — confirm.
	ErrTxnReadonly = errors.New("transaction is read-only")
	// ErrTxnConflict reports a transaction conflict (per its message).
	ErrTxnConflict = errors.New("transaction is conflict")
)
View Source
// Event watcher errors.
var (
	// ErrWatcherClosed reports use of a watcher that is already closed (per its message).
	ErrWatcherClosed   = errors.New("watcher is closed")
	// ErrInvalidEvent reports an invalid EventType (per its message).
	ErrInvalidEvent    = errors.New("invalid event")
	// ErrWatcherDisabled presumably reports that event watching is disabled
	// (Options.WatchSize == 0) — confirm.
	ErrWatcherDisabled = errors.New("event watcher disabled")
)
View Source
// DefaultOptions is the base configuration passed to Open. Dir is left empty
// here, so callers supply it (for example with WithDir).
var DefaultOptions = Options{
	// a single data file holds at most defaultMaxFileSize bytes
	MaxSize:         defaultMaxFileSize,
	// block cache size derived from the max file size expressed in MB
	BlockCache:      defaultMaxFileSize / types.MB,
	// no sync per write; sync happens when FsyncThreshold is reached
	Fsync:           false,
	FsyncThreshold:  blockSize * (defaultMaxFileSize / types.MB) / 4,
	Compare:         index.DefaultCompare,
	// watch queue size; 0 would disable the event watcher
	WatchSize:       2000,
	WatchEvents:     []EventType{PutEvent, DelEvent},
	// auto-merge checkpoint (0 disables it); presumably compared with the
	// Stats RecordNums / KeyNums ratio — confirm
	MergeCheckpoint: 3.5,
}
View Source
// ErrBatchClosed reports use of a Batch that is closed; presumably returned
// after Flush, which closes the Batch — confirm.
var ErrBatchClosed = errors.New("batch is closed")
View Source
// Merge errors.
var (
	// ErrMergedNotFinished reports that a merge has not finished (per its message).
	ErrMergedNotFinished = errors.New("merged not finished")
)

Functions

func FastRand

func FastRand() uint32

FastRand is a fast thread local random function.

Types

type Batch added in v0.2.0

// Batch writes or deletes records in batches. It is created by DB.Batch and
// closed by Flush.
type Batch struct {
	// contains filtered or unexported fields
}

Batch is an operator for writing or deleting records in batches.

func (*Batch) DeleteAll added in v0.2.0

func (ba *Batch) DeleteAll(keys []Key) error

DeleteAll deletes all records matching the given keys from the db in a batchTxn.

func (*Batch) Effected added in v0.2.0

func (ba *Batch) Effected() int64

Effected returns how many records have been affected.

func (*Batch) Error added in v0.2.0

func (ba *Batch) Error() error

Error returns the error that occurred in the batch operation.

func (*Batch) Flush added in v0.2.0

func (ba *Batch) Flush() error

Flush closes the Batch, waits for the remaining batchTxns to complete, and finally calls db.Sync to flush the db.

func (*Batch) WriteAll added in v0.2.0

func (ba *Batch) WriteAll(records []Record) error

WriteAll writes all given records to the db in a batchTxn.

type BatchOption added in v0.2.0

// BatchOption configures a Batch created by DB.Batch.
type BatchOption struct {
	// Size is the size of each batch; the given records are split into
	// batches of this size (presumably a number of records — confirm).
	Size int64
	// SyncPerBatch calls Fsync after each batch has been written.
	SyncPerBatch bool
	// SyncOnFlush calls Fsync after all batches have finished. Enabling both
	// SyncPerBatch and SyncOnFlush at the same time is not recommended.
	// If both SyncPerBatch and SyncOnFlush are false, the batch applies the
	// sync rules in db.options.
	SyncOnFlush bool
}

type BitFlag added in v0.2.0

// BitFlag is a 64-bit mask used to store different status information.
type BitFlag uint64

BitFlag is a 64-bit mask used to store different status information.

func (*BitFlag) Check added in v0.2.0

func (bf *BitFlag) Check(flags ...uint64) bool

func (*BitFlag) Revoke added in v0.2.0

func (bf *BitFlag) Revoke(flags ...uint64)

func (*BitFlag) Store added in v0.2.0

func (bf *BitFlag) Store(flags ...uint64)

type DB

// DB is a river db instance, which stores its wal files in a specific data
// directory. It is obtained from Open or OpenWithCtx.
type DB struct {
	// contains filtered or unexported fields
}

DB represents a db instance, which stores its wal files in a specific data directory.

func Open

func Open(options Options, opts ...Option) (*DB, error)

Open returns a river db instance

func OpenWithCtx added in v0.1.0

func OpenWithCtx(ctx context.Context, options Options, opts ...Option) (*DB, error)

OpenWithCtx returns a river db instance with context

func (*DB) Backup added in v0.1.0

func (db *DB) Backup(destpath string) error

Backup uses tar and gzip to compress the data wal files into destpath.

func (*DB) Batch added in v0.2.0

func (db *DB) Batch(opt BatchOption) (*Batch, error)

Batch provides the ability to write or delete entries in batches, and is update-only. It splits the given records into several batches and runs a new goroutine that handles each batch in a batchTxn. Each batch's records are held in memory temporarily, then written to the database together, and finally the db index is updated if the commit succeeds.

func (*DB) Begin

func (db *DB) Begin(readonly bool) (*Txn, error)

Begin begins a new transaction

func (*DB) Close

func (db *DB) Close() error

Close closes the db. Once the db is closed, it can no longer be used.

func (*DB) Del

func (db *DB) Del(key Key) error

Del removes the key-value pair matching the given key from the db. It returns nil if the key does not exist.

func (*DB) Expire

func (db *DB) Expire(key Key, ttl time.Duration) error

Expire updates the TTL of the specified key. If ttl <= 0, the key will never expire.

func (*DB) Get

func (db *DB) Get(key Key) (Value, error)

Get returns the value matching the given key. If the key has expired or is not found, it returns ErrKeyNotFound. A nil Key is not allowed.

func (*DB) Merge

func (db *DB) Merge(domerge bool) error

Merge cleans up redundant data entries in the db, shrinking the db size. If domerge is false, it only records the merged data and does not move it into the data dir.

func (*DB) Purge

func (db *DB) Purge() error

Purge removes all entries from the data wal.

func (*DB) Put

func (db *DB) Put(key Key, value Value, ttl time.Duration) error

Put puts a key-value pair into the db, overwriting the value if the key already exists. A nil key is invalid, but a nil value is allowed and is stored as an empty []byte. If ttl == 0, the key is persisted; if ttl < 0, the key keeps its previous TTL.

func (*DB) Range

func (db *DB) Range(option RangeOptions, handler RangeHandler) error

Range iterates over all the keys that match the given RangeOptions and calls handler for each key.

func (*DB) Recover added in v0.1.0

func (db *DB) Recover(srcpath string) error

Recover recovers wal files from the specified tar.gz archive. It purges the current data and overwrites it with the backup.

func (*DB) Stats added in v0.2.0

func (db *DB) Stats() Stats

func (*DB) Sync

func (db *DB) Sync() error

Sync syncs written buffer to disk

func (*DB) TTL

func (db *DB) TTL(key Key) (time.Duration, error)

TTL returns the remaining time to live of the specified key.

func (*DB) Watcher added in v0.2.2

func (db *DB) Watcher(name string, events ...EventType) (*Watcher, error)

Watcher returns a new event watcher with the given name and event types. If events is empty, the events configured in the db options are used.

type Event added in v0.1.0

// Event represents a pushed event delivered to watchers.
type Event struct {
	// Type identifies what kind of event occurred.
	Type  EventType
	// Value is the event payload; presumably its contents depend on Type — confirm.
	Value any
}

Event represents a push event

type EventType added in v0.1.0

// EventType identifies the kind of an Event. Values start at 1 (1 + iota), so
// the zero value matches none of the declared events.
type EventType uint
const (
	// PutEvent = 1; presumably emitted for put operations — confirm.
	PutEvent EventType = 1 + iota
	// DelEvent = 2; presumably emitted for delete operations — confirm.
	DelEvent
	// RollbackEvent = 3; presumably emitted for transaction rollbacks — confirm.
	RollbackEvent
	// MergeEvent = 4; presumably emitted for merges — confirm.
	MergeEvent
	// BackupEvent = 5; presumably emitted for backups — confirm.
	BackupEvent
	// RecoverEvent = 6; presumably emitted for recoveries — confirm.
	RecoverEvent
)

func (EventType) String added in v0.2.2

func (e EventType) String() string

type Key

// Key is the db key type, an alias of []byte.
type Key = []byte

Key is the db key type.

type Option

// Option applies a change to the given Options; Open accepts any number of
// them after its Options argument.
type Option func(option *Options)

Option applies changes to the given Options.

func WithBlockCache

func WithBlockCache(block uint32) Option

func WithClosedGc

func WithClosedGc(gc bool) Option

func WithCompare

func WithCompare(compare index.Compare) Option

func WithDir

func WithDir(dir string) Option

func WithFsync

func WithFsync(sync bool) Option

func WithFsyncThreshold

func WithFsyncThreshold(threshold int64) Option

func WithMaxSize

func WithMaxSize(size int64) Option

func WithWatchEvent added in v0.1.0

func WithWatchEvent(events ...EventType) Option

func WithWatchSize added in v0.1.0

func WithWatchSize(size int) Option

type Options

// Options represents the db configuration.
type Options struct {
	// Dir is the data dir that stores the data files.
	Dir string
	// MaxSize is the max size in bytes that a single data file can hold.
	MaxSize int64
	// BlockCache is the wal block cache size.
	BlockCache uint32
	// Fsync calls sync on every write.
	Fsync bool
	// FsyncThreshold triggers a sync once the threshold is reached.
	FsyncThreshold int64
	// WatchSize is the size of the watch queue for kv events; the event
	// watcher is disabled if it is 0.
	WatchSize int
	// WatchEvents specifies which events to watch.
	WatchEvents []EventType
	// Compare decides how keys are sorted.
	Compare index.Compare
	// ClosedGc manually runs gc after the db is closed, to release memory used by the index.
	ClosedGc bool
	// MergeCheckpoint is the checkpoint of auto merge; auto merge is disabled if it is 0.
	MergeCheckpoint float64
	// contains filtered or unexported fields
}

Options represents the db configuration.

type RangeHandler

// RangeHandler is called with each key visited by Range; iteration is key-only.
type RangeHandler = func(key Key) bool

RangeHandler is called for each key visited while iterating over the db.

type RangeOptions

// RangeOptions is an alias of index.RangeOption.
type RangeOptions = index.RangeOption

RangeOptions is alias of index.RangeOption

type Record added in v0.2.0

// Record is a user-oriented struct representing an entry in the db.
type Record struct {
	// K is the record key.
	K   Key
	// V is the record value.
	V   Value
	// TTL is the record's time to live; presumably 0 means the key is
	// persisted, as with DB.Put — confirm.
	TTL time.Duration
}

Record is a user-oriented struct representing an entry data in db

type Stats added in v0.2.0

// Stats represents simple statistics about the db at a given moment.
type Stats struct {
	// KeyNums is the number of keys in the db.
	KeyNums int64
	// RecordNums is the number of records in the db. Because the bitcask model
	// is append-only, it is usually greater than KeyNums; the ratio
	// RecordNums / KeyNums can be used to decide whether the wal files need merging.
	RecordNums int64
	// DataSize is the real data size.
	DataSize int64
	// HintSize is the hint file size.
	HintSize int64
}

Stats represents a simple statistics information of db at a moment

type Txn

// Txn is a transaction started by DB.Begin.
type Txn struct {
	// contains filtered or unexported fields
}

Txn represents a transaction

func (*Txn) Commit

func (txn *Txn) Commit() error

Commit commits the transaction. Once a transaction has been committed successfully, its effects on the database are permanent. If the process crashes, the db reloads the transaction data from the wal and updates the in-memory index.

func (*Txn) Del

func (txn *Txn) Del(key Key) error

func (*Txn) Expire

func (txn *Txn) Expire(key Key, ttl time.Duration) error

func (*Txn) Get

func (txn *Txn) Get(key Key) (Value, error)

func (*Txn) Put

func (txn *Txn) Put(key Key, value Value, ttl time.Duration) error

func (*Txn) Range

func (txn *Txn) Range(opt RangeOptions, handler RangeHandler) error

func (*Txn) RollBack

func (txn *Txn) RollBack() error

RollBack preserves the consistency of the database. If the transaction has already been committed successfully, RollBack returns ErrTxnClosed, which can be ignored. Otherwise, RollBack writes a flag record to make sure the data written in this transaction is never seen, then discards the transaction.

func (*Txn) TTL

func (txn *Txn) TTL(key Key) (time.Duration, error)

type Value

// Value is the db value type, an alias of []byte.
type Value = []byte

Value is the db value type.

type Watcher added in v0.2.2

// Watcher is an event watcher created by DB.Watcher. It should be closed once
// it is no longer needed; otherwise it is closed when the db is closed.
type Watcher struct {
	// contains filtered or unexported fields
}

Watcher is an event watcher. It should be closed once it is no longer needed; otherwise it is closed when the db is closed.

func (*Watcher) Close added in v0.2.2

func (w *Watcher) Close() error

Close closes the watcher.

func (*Watcher) Listen added in v0.2.2

func (w *Watcher) Listen() (<-chan *Event, error)

func (*Watcher) Name added in v0.2.2

func (w *Watcher) Name() string

Directories

Path Synopsis
pkg
crc
str

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL