storage

package
v0.15.0-1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 6, 2020 License: GPL-3.0 Imports: 19 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrNilValue = errors.New("Nil value not allowed")
)
View Source
var (
	ValidQueueName = regexp.MustCompile(`\A[a-zA-Z0-9._-]+\z`)
)

Functions

func BootRedis

func BootRedis(path string, sock string) (func(), error)

func NewEntry

func NewEntry(score float64, value []byte) *setEntry

func StopRedis

func StopRedis(sock string) error

Types

type BackupInfo

type BackupInfo struct {
	Id        int64
	FileCount int32
	Size      int64
	Timestamp int64
}

type KV

type KV interface {
	Get(key string) ([]byte, error)
	Set(key string, value []byte) error
}

type Queue

type Queue interface {
	Name() string
	Size() uint64

	Add(job *client.Job) error
	Push(priority uint8, data []byte) error

	Pop() ([]byte, error)
	BPop(context.Context) ([]byte, error)
	Clear() (uint64, error)

	Each(func(index int, data []byte) error) error
	Page(start int64, count int64, fn func(index int, data []byte) error) error

	Delete(keys [][]byte) error
}

type Redis

type Redis interface {
	Redis() *redis.Client
}

type SortedEntry

type SortedEntry interface {
	Value() []byte
	Key() ([]byte, error)
	Job() (*client.Job, error)
}

type SortedSet

type SortedSet interface {
	Name() string
	Size() uint64
	Clear() error

	Add(job *client.Job) error
	AddElement(timestamp string, jid string, payload []byte) error

	Get(key []byte) (SortedEntry, error)
	Page(start int, count int, fn func(index int, e SortedEntry) error) (int, error)
	Each(fn func(idx int, e SortedEntry) error) error

	// bool is whether or not the element was actually removed from the sset.
	// the scheduler and other things can be operating on the sset concurrently
	// so we need to be careful about the data changing under us.
	Remove(key []byte) (bool, error)
	RemoveElement(timestamp string, jid string) (bool, error)
	RemoveBefore(timestamp string) ([][]byte, error)

	// Move the given key from this SortedSet to the given
	// SortedSet atomically.  The given func may mutate the payload and
	// return a new tstamp.
	MoveTo(sset SortedSet, entry SortedEntry, newtime time.Time) error
}

type Store

type Store interface {
	Close() error
	Retries() SortedSet
	Scheduled() SortedSet
	Working() SortedSet
	Dead() SortedSet
	GetQueue(string) (Queue, error)
	EachQueue(func(Queue))
	Stats() map[string]string
	EnqueueAll(SortedSet) error
	EnqueueFrom(SortedSet, []byte) error

	History(days int, fn func(day string, procCnt uint64, failCnt uint64)) error
	Success() error
	Failure() error
	TotalProcessed() uint64
	TotalFailures() uint64

	// Clear the database of all job data.
	// Equivalent to Redis's FLUSHDB
	Flush() error

	Raw() KV
	Redis
}

func Open

func Open(dbtype string, path string) (Store, error)

func OpenRedis

func OpenRedis(sock string) (Store, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL