storage

package
v0.6.1-1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 13, 2017 License: GPL-3.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

View Source
// Untyped integer constants enumerated with iota:
// NEW_JOB is 0 and CLOSE is 1.
//
// NOTE(review): how these values are consumed is not visible on this
// page; presumably they tag internal event/message kinds. Confirm
// against the package source.
const (
	NEW_JOB = iota
	CLOSE
)
View Source
const (
	// DefaultKeepBackupsCount is the default number of backups to keep.
	//
	// The value assumes hourly backups, so 24 keeps a day's worth.
	// If backups are taken every 5 minutes instead, 24 backups cover
	// only two hours' worth.
	DefaultKeepBackupsCount int = 24
)

Variables

View Source
var (
	// DefaultMaxSize is the default maximum size of a queue.
	// Further Pushes will result in an error.
	//
	// This is known as "back pressure" and is important to
	// prevent bugs in one component from taking down the
	// entire system.
	DefaultMaxSize = uint64(100000)
)
View Source
var (
	// ONE is an 8-byte slice; as declared here it is zero-filled.
	//
	// NOTE(review): the name suggests it is meant to hold an encoded
	// 64-bit value of 1 (filled in elsewhere, e.g. an init function),
	// but that is not visible on this page. Confirm before relying on
	// its contents. Being a package-level slice, it is mutable by any
	// caller.
	ONE = make([]byte, 8)
)
View Source
var (
	// ValidQueueName matches strings made up entirely of one or more
	// ASCII letters, digits, '.', '_' or '-'. The \A and \z anchors pin
	// the match to the very start and end of the text, so the empty
	// string and names containing any other character do not match.
	ValidQueueName = regexp.MustCompile(`\A[a-zA-Z0-9._-]+\z`)
)

Functions

func DefaultOptions

func DefaultOptions() *gorocksdb.Options

Types

type Backpressure

// Backpressure describes a queue together with its current and maximum
// sizes. The type has an Error() string method, so a Backpressure value
// can be used as an error.
//
// NOTE(review): presumably this is the error returned by a Push onto a
// queue that has reached its maximum size; confirm against the
// implementation.
type Backpressure struct {
	// QueueName is the name of the queue concerned.
	QueueName   string
	// CurrentSize is the queue's size when the value was created
	// (presumably at the time of the rejected push — confirm).
	CurrentSize uint64
	// MaxSize is the queue's size limit.
	MaxSize     uint64
}

func (Backpressure) Error

func (bp Backpressure) Error() string

type BackupInfo

// BackupInfo holds metadata about a single database backup. Values of
// this type are passed to the callback given to Store.EachBackup.
type BackupInfo struct {
	// Id identifies the backup.
	Id        int64
	// FileCount is the number of files in the backup.
	FileCount int32
	// Size is the backup's size (presumably in bytes — TODO confirm).
	Size      int64
	// Timestamp is when the backup was taken (presumably Unix
	// seconds — TODO confirm).
	Timestamp int64
}

type Queue

// Queue is a named queue of byte-slice payloads.
//
// NOTE(review): the notes on individual methods below are inferred from
// the signatures only; the implementation is not visible on this page.
type Queue interface {
	// Name returns the queue's name.
	Name() string
	// Size returns the queue's size (presumably the number of
	// elements — confirm).
	Size() uint64
	// Push adds a payload to the queue.
	// NOTE(review): the meaning of the uint8 argument is not visible
	// here (presumably a priority) — confirm.
	Push(uint8, []byte) error
	// Pop removes and returns a payload from the queue.
	// NOTE(review): behavior on an empty queue is not visible here.
	Pop() ([]byte, error)
	// BPop is the context-aware variant of Pop (presumably it blocks
	// until a payload is available or the context is done — confirm).
	BPop(context.Context) ([]byte, error)
	// Clear empties the queue (the uint64 is presumably the number of
	// elements removed — confirm).
	Clear() (uint64, error)

	// Please note that k/vs are NOT safe to use outside of the func.
	// You must copy the values if you want to stash them for later use.
	//
	//	  cpy = make([]byte, len(k))
	//	  copy(cpy, k)
	//
	// Each calls the func for every element; Page does so for a
	// sub-range (the two int64 arguments are presumably a start offset
	// and a count — confirm).
	Each(func(index int, k, v []byte) error) error
	Page(int64, int64, func(index int, k, v []byte) error) error

	// Delete removes the elements with the given keys.
	Delete(keys [][]byte) error
}

type QueueWaiter

// QueueWaiter exposes no exported fields.
//
// NOTE(review): its role is not visible on this page; presumably it
// represents a caller waiting on a queue (see Queue.BPop) — confirm.
type QueueWaiter struct {
	// contains filtered or unexported fields
}

type SortedSet

// SortedSet is a named collection of payloads whose elements are
// addressed either by a raw key or by a (timestamp, jid) pair.
//
// NOTE(review): the ordering and key encoding are not visible on this
// page; presumably elements are ordered by timestamp — confirm.
type SortedSet interface {
	// Name returns the set's name.
	Name() string
	// Size returns the set's size (presumably the element count —
	// confirm).
	Size() int64
	// Clear empties the set (the int64 is presumably the number of
	// elements removed — confirm).
	Clear() (int64, error)

	// AddElement stores payload under the given timestamp and jid.
	AddElement(timestamp string, jid string, payload []byte) error

	// Get returns the data stored under key.
	Get(key []byte) ([]byte, error)
	// Page calls the func for a sub-range of elements (the two int64
	// arguments are presumably a start offset and a count — confirm).
	Page(int64, int64, func(index int, key []byte, data []byte) error) error
	// Each calls the func for every element.
	Each(func(idx int, key []byte, data []byte) error) error

	// Remove deletes the element stored under key.
	Remove(key []byte) error
	// RemoveElement deletes the element identified by timestamp and jid.
	RemoveElement(timestamp string, jid string) error
	// RemoveBefore removes elements relative to the given timestamp
	// (presumably those earlier than it, returning their payloads —
	// confirm).
	RemoveBefore(timestamp string) ([][]byte, error)

	// Move the given key from this SortedSet to the given
	// SortedSet atomically.  The given func may mutate the payload and
	// return a new tstamp.
	MoveTo(SortedSet, string, string, func([]byte) (string, []byte, error)) error
}

type Store

// Store is the top-level handle to the job database. It gives access to
// the named queues, four SortedSets (retries, scheduled, working and
// dead), processing counters, and backup management.
//
// NOTE(review): notes on individual methods that go beyond the
// signatures are hedged; the implementation is not visible on this page.
type Store interface {
	// Close closes the store.
	Close() error
	// Retries, Scheduled, Working and Dead each return one of the
	// store's SortedSets.
	Retries() SortedSet
	Scheduled() SortedSet
	Working() SortedSet
	Dead() SortedSet
	// GetQueue returns the queue with the given name.
	// NOTE(review): presumably the name must match ValidQueueName and
	// the queue is created on demand — confirm.
	GetQueue(string) (Queue, error)
	// EachQueue calls the func for each queue.
	EachQueue(func(Queue))
	// Stats returns string key/value statistics about the store.
	Stats() map[string]string
	// EnqueueAll and EnqueueFrom take elements from the given
	// SortedSet (presumably pushing them onto their queues; EnqueueFrom
	// for the single element with the given key — confirm).
	EnqueueAll(SortedSet) error
	EnqueueFrom(SortedSet, []byte) error

	// History calls fn once per day with that day's processed and
	// failed counts, for the given number of days.
	History(days int, fn func(day string, procCnt int64, failCnt int64)) error
	// Success and Failure record one outcome (presumably incrementing
	// the counters that Processed and Failures report — confirm).
	Success() error
	Processed() int64
	Failure() error
	Failures() int64

	// creates a backup of the current database
	Backup() error
	// EachBackup calls the func with the BackupInfo of each backup.
	EachBackup(func(bi BackupInfo)) error
	// RestoreFromLatest restores the database from the latest backup.
	RestoreFromLatest() error
	// PurgeOldBackups removes old backups (the int is presumably the
	// number of backups to keep, cf. DefaultKeepBackupsCount — confirm).
	PurgeOldBackups(int) error

	// Clear the database of all job data.
	// Equivalent to Redis's FLUSHDB
	Flush() error
}

func Open

func Open(dbtype string, path string) (Store, error)

func OpenRocks

func OpenRocks(path string) (Store, error)

type Transaction

// Transaction exposes no exported fields.
//
// NOTE(review): its role is not visible on this page; presumably it
// wraps a database write batch/transaction — confirm.
type Transaction struct {
	// contains filtered or unexported fields
}

Directories

Path Synopsis
adapted from https://github.com/cngkaygusuz/BrodalOkasakiHeap
adapted from https://github.com/cngkaygusuz/BrodalOkasakiHeap

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL