store

package
v0.21.0
Warning

This package is not in the latest version of its module.

Published: Sep 18, 2023 License: Apache-2.0 Imports: 19 Imported by: 0

Documentation

Constants

const ErrUnchanged errUnchanged = 0

ErrUnchanged is an error that indicates that a requested change was not necessary.

const (
	// HistoryKey is the key where we store the set of keys belonging to a
	// particular deployment. The key should be scoped to a deployment. For
	// example:
	//
	//     /app/collatz/deployment/a47e1a97/key_history
	//     /app/todo/deployment/fd578a20/key_history
	HistoryKey = "key_history"
)

Variables

var File_internal_store_store_proto protoreflect.FileDescriptor
var Unchanged = errors.New("unchanged versioned get")

Unchanged is returned when a versioned Get is cancelled prematurely. A store may cancel a versioned Get, for example, as a form of admission control if the store has too many hanging gets.

Functions

func AddToSet

func AddToSet(ctx context.Context, store Store, name string, element string) error

AddToSet adds an element to the set name in the store.

func AppKey

func AppKey(app string, key string) string

AppKey returns keys in the format "/app/collatz/key", where "collatz" is the application name.

func DeploymentKey

func DeploymentKey(cfg *config.GKEConfig, key string) string

DeploymentKey returns keys in the format "/app/collatz/deployment/123/key", where "collatz" is the application name and 123 is the deployment id.

func GlobalKey

func GlobalKey(key string) string

GlobalKey returns keys in the format "/key".

func ReplicaSetKey added in v0.2.0

func ReplicaSetKey(cfg *config.GKEConfig, replicaSet string, key string) string

ReplicaSetKey returns keys in the format "/app/collatz/deployment/123/replica_set/OddEven/key", where "collatz" is the application name, 123 is the deployment id, and OddEven is the Kubernetes ReplicaSet name.
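
The four key formats above can be illustrated with a minimal sketch. The lowercase helpers below are hypothetical stand-ins that only reproduce the documented formats; they take plain strings where the real DeploymentKey and ReplicaSetKey take a *config.GKEConfig, and the use of path.Join is an assumption, not necessarily how the package builds keys.

```go
package main

import (
	"fmt"
	"path"
)

// Hypothetical helpers that reproduce the documented key layouts.
// The application name and deployment id are passed as plain strings
// here instead of being read from a *config.GKEConfig.

func globalKey(key string) string { return path.Join("/", key) }

func appKey(app, key string) string { return path.Join("/app", app, key) }

func deploymentKey(app, id, key string) string {
	return path.Join("/app", app, "deployment", id, key)
}

func replicaSetKey(app, id, replicaSet, key string) string {
	return path.Join("/app", app, "deployment", id, "replica_set", replicaSet, key)
}

func main() {
	fmt.Println(globalKey("key"))
	fmt.Println(appKey("collatz", "key"))
	fmt.Println(deploymentKey("collatz", "123", "key"))
	fmt.Println(replicaSetKey("collatz", "123", "OddEven", "key"))
}
```

Each scope nests inside the previous one, so every key of a deployment shares the prefix of its application.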

func Sequence

func Sequence(ctx context.Context, store Store, key, value string) (int, error)

Sequence orders a value into a sequence of unique values and returns its index in the sequence. The first unique value is sequenced with index 0, the next with index 1, and so on.

Example
must := func(x int, err error) int {
	if err != nil {
		panic(err)
	}
	return x
}

ctx := context.Background()
store := NewFakeStore()
fmt.Println(must(Sequence(ctx, store, "seq", "a")))
fmt.Println(must(Sequence(ctx, store, "seq", "b")))
fmt.Println(must(Sequence(ctx, store, "seq", "c")))
fmt.Println(must(Sequence(ctx, store, "seq", "a")))
fmt.Println(must(Sequence(ctx, store, "seq", "a")))
fmt.Println(must(Sequence(ctx, store, "seq", "d")))
Output:

0
1
2
0
0
3

Types

type FakeStore

type FakeStore struct {
	// contains filtered or unexported fields
}

FakeStore is a simple, in-memory, map-backed implementation of a Store. It is a test double and is meant only for testing.

func NewFakeStore

func NewFakeStore() *FakeStore

func (*FakeStore) Delete

func (f *FakeStore) Delete(_ context.Context, key string) error

func (*FakeStore) Get

func (f *FakeStore) Get(ctx context.Context, key string, version *Version) (string, *Version, error)

func (*FakeStore) List

func (f *FakeStore) List(_ context.Context) ([]string, error)

func (*FakeStore) Put

func (f *FakeStore) Put(_ context.Context, key, value string, version *Version) (*Version, error)

func (*FakeStore) Writes

func (f *FakeStore) Writes() int

Writes returns the number of modifications made to the store so far.

type SQLStore

type SQLStore struct {
	// contains filtered or unexported fields
}

SQLStore is a sqlite-backed implementation of a Store. All of the data in a SQLStore is stored locally in a single sqlite file. SQLStore's data is stored in two tables: (1) a next_version table that stores a global, monotonically increasing integer-valued version, and (2) a data table that stores keys along with their values and versions.

For example, if we execute the following code,

store, err := NewSQLStore("/tmp/weaver.db")
ctx := context.Background()
_, err = store.Put(ctx, "eggs", "green", nil)
_, err = store.Put(ctx, "ham", "green", nil)
_, err = store.Put(ctx, "sam", "unhappy", nil)
_, err = store.Put(ctx, "cat", "hatted", nil)
_, err = store.Put(ctx, "eggs", "scrambled", nil)
err = store.Delete(ctx, "ham")

then the database stored in "/tmp/weaver.db" would look something like this:

next_version   data
+---------+    +---------+--------------+----------+
| version |    | key     | value        | version  |
+---------+    +---------+--------------+----------+
|       5 |    | "eggs"  | "scrambled"  |        4 |
+---------+    | "sam"   | "unhappy"    |        2 |
               | "cat"   | "hatted"     |        3 |
               +---------+--------------+----------+

All operations on the database are performed as part of strictly serializable transactions.
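
The version bookkeeping shown in the table can be modeled in a few lines. This is a rough sketch, not the SQLStore implementation: the model type and its methods are hypothetical, there is no SQL, locking, or transaction handling, and it assumes (as the table suggests) that a Delete does not advance next_version.

```go
package main

import "fmt"

// row mirrors one row of the data table: a value plus the version at
// which it was last written.
type row struct {
	value   string
	version int
}

// model is a hypothetical in-memory stand-in for the two tables.
type model struct {
	nextVersion int
	data        map[string]row
}

func newModel() *model { return &model{data: map[string]row{}} }

// put stamps the value with the current global version, then advances
// the global version.
func (m *model) put(key, value string) {
	m.data[key] = row{value, m.nextVersion}
	m.nextVersion++
}

// del removes the row and leaves nextVersion untouched (an assumption
// based on the table above).
func (m *model) del(key string) { delete(m.data, key) }

func main() {
	m := newModel()
	m.put("eggs", "green")
	m.put("ham", "green")
	m.put("sam", "unhappy")
	m.put("cat", "hatted")
	m.put("eggs", "scrambled")
	m.del("ham")

	fmt.Println("next_version:", m.nextVersion)
	for _, k := range []string{"eggs", "sam", "cat"} {
		fmt.Println(k, m.data[k].value, m.data[k].version)
	}
}
```

Because the counter is global rather than per key, the second write to "eggs" receives version 4 even though it is only the second version of that key.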

TODO(mwhittaker): This implementation is not the most efficient. We can use some more advanced SQL to make it faster, if needed. For now, it's written to be as simple as possible.

func (*SQLStore) Delete

func (s *SQLStore) Delete(ctx context.Context, key string) error

func (*SQLStore) Get

func (s *SQLStore) Get(ctx context.Context, key string, version *Version) (string, *Version, error)

func (*SQLStore) List

func (s *SQLStore) List(ctx context.Context) ([]string, error)

func (*SQLStore) Put

func (s *SQLStore) Put(ctx context.Context, key, value string, version *Version) (*Version, error)

type SequenceProto

type SequenceProto struct {

	// sequence maps a value to its index in the sequence. For example, if we
	// sequenced "a", "b", and "c" in order, then "a" maps to 0, "b" maps to 1,
	// and "c" maps to 2.
	Sequence map[string]int64 `` /* 158-byte string literal not displayed */
	// contains filtered or unexported fields
}

SequenceProto contains a sequence of unique values.

func (*SequenceProto) Descriptor deprecated

func (*SequenceProto) Descriptor() ([]byte, []int)

Deprecated: Use SequenceProto.ProtoReflect.Descriptor instead.

func (*SequenceProto) GetSequence

func (x *SequenceProto) GetSequence() map[string]int64

func (*SequenceProto) ProtoMessage

func (*SequenceProto) ProtoMessage()

func (*SequenceProto) ProtoReflect

func (x *SequenceProto) ProtoReflect() protoreflect.Message

func (*SequenceProto) Reset

func (x *SequenceProto) Reset()

func (*SequenceProto) String

func (x *SequenceProto) String() string

type SetProto

type SetProto struct {
	Elements [][]byte `protobuf:"bytes,1,rep,name=elements,proto3" json:"elements,omitempty"`
	// contains filtered or unexported fields
}

SetProto contains the set of elements stored under a key in the store.

func (*SetProto) Descriptor deprecated

func (*SetProto) Descriptor() ([]byte, []int)

Deprecated: Use SetProto.ProtoReflect.Descriptor instead.

func (*SetProto) GetElements

func (x *SetProto) GetElements() [][]byte

func (*SetProto) ProtoMessage

func (*SetProto) ProtoMessage()

func (*SetProto) ProtoReflect

func (x *SetProto) ProtoReflect() protoreflect.Message

func (*SetProto) Reset

func (x *SetProto) Reset()

func (*SetProto) String

func (x *SetProto) String() string

type Stale

type Stale struct {
	// contains filtered or unexported fields
}

Stale is the error returned when a versioned Put fails because of a stale version. You can check to see if an error is Stale using errors.Is:

    value, version, err := store.Get(ctx, "key", nil)
    version, err = store.Put(ctx, "key", "value", version)
    if errors.Is(err, Stale{}) {
        // Do something with err.
    }
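
The errors.Is check works on a zero-valued target because the error type supplies its own Is method. The sketch below shows the pattern with a hypothetical staleError type; its string fields and the isStale and wrap helpers are assumptions for illustration, since the real Stale keeps its fields unexported.

```go
package main

import (
	"errors"
	"fmt"
)

// staleError is a hypothetical stand-in for Stale.
type staleError struct {
	stale string // version the caller expected
	newer string // version actually found, if known
}

func (e staleError) Error() string {
	return fmt.Sprintf("stale version %q (newer: %q)", e.stale, e.newer)
}

// Is reports whether target is a staleError, ignoring field values, so
// errors.Is(err, staleError{}) matches any stale error.
func (e staleError) Is(target error) bool {
	_, ok := target.(staleError)
	return ok
}

// isStale reports whether err is, or wraps, a staleError.
func isStale(err error) bool { return errors.Is(err, staleError{}) }

// wrap adds context to err while keeping it discoverable by errors.Is.
func wrap(err error) error { return fmt.Errorf("put failed: %w", err) }

func main() {
	err := wrap(staleError{stale: "v1", newer: "v2"})
	fmt.Println(isStale(err))                 // true
	fmt.Println(isStale(errors.New("other"))) // false
}
```

Without the Is method, errors.Is would fall back to comparing values, and a populated error would never equal the empty target.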

func NewStale

func NewStale(stale Version, newer *Version) Stale

NewStale returns a new Stale error with stale (the stale version the user expected to see) and newer (the newer version that was encountered). If the newer version is unknown, nil can be passed for newer.

func (Stale) Error

func (b Stale) Error() string

Error implements the error interface.

func (Stale) Is

func (b Stale) Is(target error) bool

Is returns true iff target is a Stale error.

type Store

type Store interface {
	// Put writes a value to a key.
	//
	// If version is nil, then the Put is blind (i.e. the write is performed
	// without any version checks). If version is not nil, then the Put
	// succeeds only if the latest version of the key is equal to the provided
	// version. In either case, if the Put succeeds (i.e. the returned error is
	// nil), then the returned version is the version of the value that was
	// just written.
	//
	// If a versioned Put fails because the provided version is stale, then a
	// Stale error is returned.
	Put(ctx context.Context, key, value string, version *Version) (*Version, error)

	// Get gets the value and version associated with a key.
	//
	// If version is nil, then Get simply gets the latest value of the key,
	// along with its version. This is called an unversioned Get. Note that if
	// the key is not present in the store, the returned value is empty, the
	// returned version is Missing, and the returned error is nil.
	//
	// If version is not nil (a versioned Get), then Get blocks until
	// the latest value of the key is newer than the provided version. Once it
	// is, Get returns the value and its version. More precisely, calling
	// store.Get(ctx, key, version) is equivalent to running the following
	// code:
	//
	//     for {
	//         // Note the nil. This is an unversioned Get.
	//         value, latest, err := store.Get(ctx, key, nil)
	//         if err != nil {
	//             return "", nil, err
	//         }
	//         if *version != *latest {
	//             return value, latest, nil
	//         }
	//     }
	//
	// That is, performing a versioned Get is logically equivalent to
	// repeatedly performing an unversioned Get until the returned version is
	// different (and therefore newer because a Store is linearizable). Note
	// that this code demonstrates the semantics of a versioned Get, but is
	// clearly inefficient. Actual implementations of a versioned Get preserve
	// these semantics but are significantly more efficient.
	//
	// Also beware that the version provided to Get and the version returned by
	// Get may not be contiguous. The key may have taken on different values
	// with different versions between the two.
	//
	// A store is permitted to prematurely cancel a versioned Get (e.g., if the
	// store has too many hanging gets). In this case, an error that wraps
	// Unchanged is returned.
	Get(ctx context.Context, key string, version *Version) (string, *Version, error)

	// Delete deletes a value from the store. Unlike Puts, Deletes are always
	// unversioned. Deleting a key that is already missing is a noop and
	// doesn't return an error.
	Delete(ctx context.Context, key string) error

	// List returns the set of keys currently stored in the store. Keys can be
	// returned in any order, and every key appears exactly once in the
	// returned list. For example, store.List(ctx) can return ["a", "b"] or
	// ["b", "a"] but not ["a", "a", "b"]. List is linearizable.
	//
	// Unlike Put, Get, and Delete which are core Store operations, List should
	// only be used for debugging and introspection.
	//
	// TODO(mwhittaker): sanjay@ pointed out that List requires all keys to be
	// present in memory. One alternative he suggested is to provide
	// start/limit keys and constrain the returned keys to this range.
	//
	// TODO(mwhittaker): Implementing List in a linearizable way is
	// challenging. Because List is only for debugging, we may want to
	// guarantee a weaker form of consistency to make it easier to implement.
	List(ctx context.Context) ([]string, error)
}

A Store is a handle to a versioned, linearizable key-value store. When a value is put in the store, the store automatically assigns the value a version.

There are two types of Puts: versioned and unversioned. An unversioned Put, e.g. Put(ctx, key, value, nil), is a blind put that can be used to unilaterally create or update a key. A versioned Put, e.g. Put(ctx, key, value, version), succeeds only if the latest version of the key is equal to the provided version. Versioned Puts can be used to avoid lost updates, a phenomenon in which multiple clients concurrently write to the same key and all but one of the updates are overwritten. Versioned Puts can be viewed as a form of optimistic concurrency control or a form of compare-and-swap.

A special Missing version represents a value that is not present in the store. For example, Put(ctx, key, value, &Missing) writes a value to a key, but only if the key does not exist in the store. Versioned Puts with the Missing version can be used to implement consensus: when multiple clients concurrently perform the Put, only one will succeed.
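
As a rough illustration of consensus via create-if-not-exists, the sketch below has several goroutines race to create the same key. The casStore type, errStale, and elect are hypothetical and far simpler than a real Store (no contexts, no Delete, no blocking Get); only Version and Missing mirror the documented declarations.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

type Version struct{ Opaque string }

var Missing = Version{"__tombstone__"}

var errStale = errors.New("stale version")

type entry struct {
	value   string
	version Version
}

// casStore is a hypothetical in-memory store with versioned puts.
type casStore struct {
	mu   sync.Mutex
	next int
	data map[string]entry
}

func newCASStore() *casStore { return &casStore{data: map[string]entry{}} }

// get returns the latest value and version of key, or Missing if absent.
func (s *casStore) get(key string) (string, Version) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if e, ok := s.data[key]; ok {
		return e.value, e.version
	}
	return "", Missing
}

// put writes value only if key's latest version equals expected.
func (s *casStore) put(key, value string, expected Version) (Version, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	latest := Missing
	if e, ok := s.data[key]; ok {
		latest = e.version
	}
	if latest != expected {
		return Version{}, errStale
	}
	v := Version{fmt.Sprint(s.next)}
	s.next++
	s.data[key] = entry{value, v}
	return v, nil
}

// elect has n proposers race to create key. It returns the chosen value
// and the number of puts that succeeded.
func elect(s *casStore, key string, n int) (string, int) {
	var wg sync.WaitGroup
	var mu sync.Mutex
	wins := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			if _, err := s.put(key, fmt.Sprint(i), Missing); err == nil {
				mu.Lock()
				wins++
				mu.Unlock()
			}
		}(i)
	}
	wg.Wait()
	chosen, _ := s.get(key)
	return chosen, wins
}

func main() {
	chosen, wins := elect(newCASStore(), "leader", 8)
	fmt.Println("successful creates:", wins) // successful creates: 1
	fmt.Println("value was chosen:", chosen != "")
}
```

Which proposer wins depends on scheduling, but exactly one create succeeds, and every later reader observes the same chosen value.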

There are also versioned and unversioned Gets. An unversioned Get, e.g., Get(ctx, key, nil), gets the latest value of a key, along with its version. A versioned Get on the other hand, e.g. Get(ctx, key, version), blocks until the latest version of the key is newer than the provided version. Once it is, the value is returned. Versioned Gets allow you to watch the value of a key and get notified when it changes.
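
One way to picture a blocking versioned Get is with a condition variable that every write signals. The sketch below is an assumption-laden illustration, not how any Store in this package is implemented: watchStore and watchOnce are hypothetical, contexts and premature cancellation are ignored, and only blind puts are supported.

```go
package main

import (
	"fmt"
	"sync"
)

type Version struct{ Opaque string }

var Missing = Version{"__tombstone__"}

// watchStore is a hypothetical in-memory store whose get can block.
type watchStore struct {
	mu       sync.Mutex
	changed  *sync.Cond // signaled on every put
	next     int
	values   map[string]string
	versions map[string]Version
}

func newWatchStore() *watchStore {
	s := &watchStore{values: map[string]string{}, versions: map[string]Version{}}
	s.changed = sync.NewCond(&s.mu)
	return s
}

// put blindly writes value and wakes every blocked get.
func (s *watchStore) put(key, value string) Version {
	s.mu.Lock()
	defer s.mu.Unlock()
	v := Version{fmt.Sprint(s.next)}
	s.next++
	s.values[key] = value
	s.versions[key] = v
	s.changed.Broadcast()
	return v
}

// latest returns key's version, or Missing. The caller must hold s.mu.
func (s *watchStore) latest(key string) Version {
	if v, ok := s.versions[key]; ok {
		return v
	}
	return Missing
}

// get returns immediately if version is nil. Otherwise it blocks until
// the key's latest version differs from *version.
func (s *watchStore) get(key string, version *Version) (string, Version) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for version != nil && s.latest(key) == *version {
		s.changed.Wait()
	}
	return s.values[key], s.latest(key)
}

// watchOnce waits for the first change to key while a put runs
// concurrently, and returns the value it observed.
func watchOnce(s *watchStore, key string) string {
	_, seen := s.get(key, nil) // unversioned get; the key is absent, so seen is Missing
	done := make(chan string)
	go func() {
		value, _ := s.get(key, &seen) // returns once the key has been created
		done <- value
	}()
	s.put(key, "hello")
	return <-done
}

func main() {
	fmt.Println(watchOnce(newWatchStore(), "greeting")) // hello
}
```

The watcher sees "hello" whether it starts waiting before or after the put, because the loop re-checks the version under the lock before blocking.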

Deletes, unlike Puts and Gets, are always unversioned.

Example usage:

// Get, Put, Put, Put, and Delete.
value, version, err := store.Get(ctx, "key", nil)
version, err = store.Put(ctx, "key", "new value", version)
version, err = store.Put(ctx, "key", "newer value", version)
version, err = store.Put(ctx, "key", "newest value", version)
err = store.Delete(ctx, "key")

Example usage:

// Create if not exists.
version, err := store.Put(ctx, "key", "this create succeeds", &Missing)
_, err = store.Put(ctx, "key", "this one doesn't", &Missing)

Example usage:

// Blind Puts.
_, err := store.Put(ctx, "eggs", "green", nil)
_, err = store.Put(ctx, "ham", "green", nil)
_, err = store.Put(ctx, "sam", "not happy", nil)

Example usage:

// Watch.
value, version, err := store.Get(ctx, "key", nil)
for {
    value, version, err = store.Get(ctx, "key", version)
    // Do something with value...
}

Example usage:

// Stale write.
value, version, err := store.Get(ctx, "key", nil)
_, err = store.Put(ctx, "key", "this put succeeds", version)
_, err = store.Put(ctx, "key", "this one doesn't", version)

An implementation of the Store interface should be thread-safe. You should be able to use a single Store from multiple goroutines.

func NewSQLStore

func NewSQLStore(dbfile string) (Store, error)

NewSQLStore returns a new SQLStore with all of its data backed in a sqlite database stored in dbfile. Either dbfile already exists or it doesn't:

  • If dbfile does not exist, then it is created, but the directory in which dbfile is stored (i.e. filepath.Dir(dbfile)) must exist.
  • If dbfile does exist, then the store's data is loaded from it. The file must have been previously created by calling NewSQLStore.

func WithMetrics

func WithMetrics(service string, id string, store Store) Store

WithMetrics returns the provided store, but with methods augmented to update the appropriate store rtmetrics.

type StoreTester

type StoreTester struct {
	Make func(t *testing.T) Store
}

StoreTester is a test suite for Store implementations. Imagine you have a struct MyStore that implements the Store interface. In a file like mystore_test.go, you can use the following code to test that MyStore correctly implements Store:

func TestMyStore(t *testing.T) {
    StoreTester{func(*testing.T) Store { return NewMyStore() }}.Run(t)
}

A StoreTester is constructed with a factory function Make that returns a new instance of a Store every time it is invoked. Make is passed a *testing.T. If in the process of constructing a new Store, an error is encountered, you can call testing.T.Fatal. For example:

    func TestMyStore(t *testing.T) {
        maker := func(t *testing.T) Store {
            store, err := TryToMakeNewMyStore()
            if err != nil {
                t.Fatal(err)
            }
            return store
        }
        StoreTester{maker}.Run(t)
    }

func (StoreTester) Run

func (s StoreTester) Run(t *testing.T)

func (StoreTester) TestBasicOps

func (s StoreTester) TestBasicOps(t *testing.T)

func (StoreTester) TestBlindPuts

func (s StoreTester) TestBlindPuts(t *testing.T)

func (StoreTester) TestBlockedVersionedGet

func (s StoreTester) TestBlockedVersionedGet(t *testing.T)

TestBlockedVersionedGet tests that a blocked versioned get respects the context passed into it. We start a versioned get that watches a missing key that is never updated. This get blocks and should only return when its context has expired.

func (StoreTester) TestBytes

func (s StoreTester) TestBytes(t *testing.T)

func (StoreTester) TestConcurrentCreates

func (s StoreTester) TestConcurrentCreates(t *testing.T)

TestConcurrentCreates spawns n goroutines numbered 0, 1, ..., n-1. The goroutines race to reach consensus on the value of a key, with goroutine i proposing that value i be chosen. This test makes sure that every goroutine reaches consensus on the same value. It also checks that Store implementations are thread-safe.

func (StoreTester) TestConcurrentIncrements

func (s StoreTester) TestConcurrentIncrements(t *testing.T)

TestConcurrentIncrements initializes a key with value 0. It then spawns n goroutines that attempt to read, increment, and update the value. The goroutines loop until they successfully update the value. After all goroutines have terminated, we check to make sure the value of the key is n. This test checks that Store implementations are thread-safe.

func (StoreTester) TestConcurrentVersionedGets

func (s StoreTester) TestConcurrentVersionedGets(t *testing.T)

TestConcurrentVersionedGets spawns a number of getter goroutines and a single putter goroutine. The putter creates, updates, and deletes a key, while the getters watch the changes to the key. The getters and putter coordinate using a ready channel to operate in lockstep. This prevents the putter from writing too quickly and keeps the getters from skipping updates. This test checks that multiple goroutines can simultaneously watch the same key without any problems.

func (StoreTester) TestCreateConflict

func (s StoreTester) TestCreateConflict(t *testing.T)

func (StoreTester) TestDeleteMissing

func (s StoreTester) TestDeleteMissing(t *testing.T)

func (StoreTester) TestGetMissing

func (s StoreTester) TestGetMissing(t *testing.T)

func (StoreTester) TestList

func (s StoreTester) TestList(t *testing.T)

func (StoreTester) TestPutChain

func (s StoreTester) TestPutChain(t *testing.T)

func (StoreTester) TestPutConflict

func (s StoreTester) TestPutConflict(t *testing.T)

func (StoreTester) TestVersionedGetCreated

func (s StoreTester) TestVersionedGetCreated(t *testing.T)

TestVersionedGetCreated spawns two goroutines: one getter and one putter. The getter performs a versioned Get on a missing key with version Missing, and the putter creates the key. This test checks that versioned Gets properly notice the creation of a key.

func (StoreTester) TestVersionedGetDeleted

func (s StoreTester) TestVersionedGetDeleted(t *testing.T)

TestVersionedGetDeleted spawns two goroutines: one getter and one putter. The getter performs a versioned Get on a key, and the putter deletes the key. This test checks that versioned Gets properly notice the deletion of a key.

func (StoreTester) TestVersionedGetMissing

func (s StoreTester) TestVersionedGetMissing(t *testing.T)

TestVersionedGetMissing tests that a stale versioned get on a value that's currently missing returns the value immediately as missing.

func (StoreTester) TestVersionedGetUpdated

func (s StoreTester) TestVersionedGetUpdated(t *testing.T)

TestVersionedGetUpdated spawns two goroutines: one getter and one putter. The putter repeatedly puts a value to an existing key, and then sends the version it put to the getter, waiting for the getter to receive the version. The getter repeatedly issues a versioned Get to get a value, receives a version from the putter, and then compares the gotten version to the version it received from the putter. This test checks that versioned Gets properly notice updates.

type Version

type Version struct {
	Opaque string
}

Version is the version associated with a value in the store. Versions are opaque entities and should not be inspected or interpreted. Versions can be compared for equality (e.g., version == Missing), but otherwise do not satisfy any properties. For example, they shouldn't be compared with one another using an inequality (e.g., version1 < version2). Versions may also not be unique across keys. Versions should only ever be constructed by calling Get and should only ever be used by passing them to Put.

var Missing Version = Version{"__tombstone__"}

Missing is the version associated with a value that does not exist in the store.

func GetProto

func GetProto(ctx context.Context, store Store, key string, value proto.Message,
	version *Version) (*Version, error)

GetProto gets a value from the store and then calls proto.Unmarshal on it. If the value is missing, value is left untouched. See Store.Get for details.

func GetSet

func GetSet(ctx context.Context, store Store, name string, version *Version) (
	[]string, *Version, error)

GetSet returns the elements of a set name from the store.

Notably, if version is nil, then the Get is a non-blocking Get, and if version is not nil, then the Get is a blocking watch.

func PutProto

func PutProto(ctx context.Context, store Store, key string,
	value proto.Message, version *Version) (*Version, error)

PutProto calls proto.Marshal on a value and then puts it into the underlying store. See Store.Put for details.

func UpdateProto

func UpdateProto(ctx context.Context, store Store, key string, obj proto.Message, edit func(version *Version) error) (*Version, error)

UpdateProto atomically applies the read-modify-write operation edit, retrying if there are collisions. You can think of edit as a very basic type of transaction. For example, we can atomically increment a counter up to a limit of 10 like this:

counter := &Counter{Val: 0}
increment := func(*Version) error {
	if counter.Val >= 10 {
		return ErrUnchanged
	}
	counter.Val++
	return nil
}
version, err := UpdateProto(ctx, store, key, counter, increment)
fmt.Println(counter.Val) // The incremented value of the counter.

Specifically, UpdateProto will repeatedly perform the following:

  1. Get the current value and version of key and unmarshal it into the provided proto. If key is currently missing, nothing is unmarshalled, and the version is Missing.
  2. Execute edit, which may update the provided proto. If edit returns a non-nil error, then UpdateProto returns immediately with the error.
  3. Otherwise, UpdateProto performs a versioned Put to atomically write the new value of the proto. If the write succeeds, UpdateProto returns the new version. If the write fails because of a stale version, UpdateProto retries, starting again at Step 1. If the write fails with some other error, UpdateProto aborts and returns the error.
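
The three steps above can be sketched as a retry loop over a compare-and-swap store. This is a simplified analogue under stated assumptions, not the UpdateProto source: update works on integers encoded as strings instead of protos, and casStore, errStale, and incrementAll are hypothetical names introduced for the example.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
	"sync"
)

type Version struct{ Opaque string }

var Missing = Version{"__tombstone__"}

var errStale = errors.New("stale version")

type entry struct {
	value   string
	version Version
}

// casStore is a hypothetical in-memory store with versioned puts.
type casStore struct {
	mu   sync.Mutex
	next int
	data map[string]entry
}

func (s *casStore) get(key string) (string, Version) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if e, ok := s.data[key]; ok {
		return e.value, e.version
	}
	return "", Missing
}

func (s *casStore) put(key, value string, expected Version) (Version, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	latest := Missing
	if e, ok := s.data[key]; ok {
		latest = e.version
	}
	if latest != expected {
		return Version{}, errStale
	}
	v := Version{strconv.Itoa(s.next)}
	s.next++
	s.data[key] = entry{value, v}
	return v, nil
}

// update is a hypothetical integer analogue of UpdateProto.
func update(s *casStore, key string, edit func(n *int) error) (Version, error) {
	for {
		// Step 1: read the current value and version (0 if missing).
		raw, version := s.get(key)
		n := 0
		if version != Missing {
			var err error
			if n, err = strconv.Atoi(raw); err != nil {
				return Version{}, err
			}
		}
		// Step 2: run edit; a non-nil error ends the update immediately.
		if err := edit(&n); err != nil {
			return Version{}, err
		}
		// Step 3: versioned put; retry from step 1 only if stale.
		v, err := s.put(key, strconv.Itoa(n), version)
		if errors.Is(err, errStale) {
			continue
		}
		return v, err
	}
}

// incrementAll runs n concurrent increments and returns the final count.
func incrementAll(n int) int {
	s := &casStore{data: map[string]entry{}}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			inc := func(c *int) error { *c++; return nil }
			if _, err := update(s, "counter", inc); err != nil {
				panic(err)
			}
		}()
	}
	wg.Wait()
	raw, _ := s.get("counter")
	total, _ := strconv.Atoi(raw)
	return total
}

func main() {
	fmt.Println(incrementAll(20)) // 20
}
```

A goroutine that loses a race sees errStale, re-reads the counter, and tries again, so no increment is lost even though no lock is held across the read and the write.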
