kv

package
v0.0.27
Published: Nov 24, 2021 License: Apache-2.0 Imports: 17 Imported by: 0

README

Proposed design for JetStream-based KV services

This is a proposal to design KV services on top of JetStream. It requires a KV client to be written; the basic features are:

  • Multiple named buckets full of keys with n historical values kept per key
  • Put and Get string(k)=string(v) values
  • Deleting a key by adding a delete operation to the key, preserving history
  • Per key TTL
  • Watching a specific key or the entire bucket for live updates
  • Encoders and Decoders that transform both keys and values
  • A read-cache that builds an in-memory cache for fast reads
  • Ability to read from regional read replicas maintained by JS Mirroring (planned)
  • read-after-write safety unless read replicas are used
  • Valid keys are \A[-/_a-zA-Z0-9]+\z after encoding
  • Valid buckets are ^[a-zA-Z0-9_-]+$
  • Custom Stream Names and Stream ingest subjects to cater for different domains, mirrors and imports
  • Keys starting with _kv are reserved for internal use

This is an implementation of this concept focused on the CLI; nats.go and others will have to build language-specific interfaces focused on performance and end users.

Design

Storage

Given a bucket CONFIGURATION we will have:

  • A stream called KV_CONFIGURATION with subjects $KV.CONFIGURATION.*
  • The stream has its Max Messages Per Subject limit set to the history size, with optional placement, replicas (R) and max age for the TTL
  • Getting a value uses the new JSApiMsgGetRequest feature to get the last message for a subject
  • We store headers as per the table below

Headers

Header             Description
KV-Origin-Cluster  The cluster the client that created the value was connected to
KV-Operation       DEL if this is a delete operation, otherwise blank meaning PUT
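
To illustrate this layout, here is a minimal sketch using the nats.go JetStream API; the bucket name CONFIGURATION, the history of 5, the 24 hour TTL, the replica count and the key auth are illustrative assumptions, not values mandated by the design.

package main

import (
	"fmt"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		panic(err)
	}
	defer nc.Close()

	js, err := nc.JetStream()
	if err != nil {
		panic(err)
	}

	// Backing stream for bucket CONFIGURATION: KV_CONFIGURATION with subjects
	// $KV.CONFIGURATION.*, history kept as Max Messages Per Subject and the
	// TTL expressed as MaxAge (all values here are illustrative).
	_, err = js.AddStream(&nats.StreamConfig{
		Name:              "KV_CONFIGURATION",
		Subjects:          []string{"$KV.CONFIGURATION.*"},
		MaxMsgsPerSubject: 5,
		MaxAge:            24 * time.Hour,
		Replicas:          3,
	})
	if err != nil {
		panic(err)
	}

	// A Get is a last-message-for-subject lookup against the stream.
	msg, err := js.GetLastMsg("KV_CONFIGURATION", "$KV.CONFIGURATION.auth")
	if err != nil {
		panic(err)
	}

	fmt.Println(string(msg.Data), msg.Header.Get("KV-Operation"))
}
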
Watchers

Watchers can either be per key or for the entire bucket.

For watching a key we simply send key updates over the watch, starting with the latest value if one exists, or nothing. Only the last result for a subject is sent - NumPending==0.

For watching the bucket we will send a nil if the bucket is empty, otherwise every result including historical ones; deletes should be handled via the Operation() value.
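
A minimal sketch of a key watch using this package's own interfaces; the import path, bucket and key names are assumptions for illustration. A nil entry marks the end of the currently available data and deletes arrive with Operation() set to DeleteOperation.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/nats-io/jsm.go/kv" // assumed import path for this package
	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	store, err := kv.NewClient(nc, "CONFIGURATION")
	if err != nil {
		log.Fatal(err)
	}
	defer store.Close()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Watch a single key, the latest value (if any) is delivered first.
	watch, err := store.Watch(ctx, "auth")
	if err != nil {
		log.Fatal(err)
	}
	defer watch.Close()

	for entry := range watch.Channel() {
		if entry == nil {
			// End of the currently available data was reached.
			continue
		}

		if entry.Operation() == kv.DeleteOperation {
			fmt.Printf("%s was deleted\n", entry.Key())
			continue
		}

		fmt.Printf("%s = %s\n", entry.Key(), entry.Value())
	}
}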

Read Replicas

A read replica is a mirror stream of the primary stream. The KV client is configured to do its reads against the named replica bucket, but all writes go to the main bucket for the KV based on the naming above.

This will inevitably break the read-after-write promise and should be made clear to clients.

Local read caches are to be built from the primary bucket, not the replica, unless this is a read-only KV client, in which case they are built from the given stream.

To assist with configuring and discovering replicas, I think once we implement them we should add some values to the bucket:

_kv/replicas: [bob_sfo, bob_lon]
_kv/replicas/cluster/sfo/read: bob_sfo
_kv/replicas/cluster/lon/read: bob_lon

So a KV client in sfo can quickly discover if there is a read replica in its cluster by doing a single lookup.

The _kv/replicas key is there to help tooling also manage all replicas when acting on the main bucket. Maintaining this list is tricky as there can be a race, but we just have to confirm read-after-write. Eventually there will be a per-subject aware Nats-Expected-Last-Sequence header which we can use to update the list only if it has not changed since reading it.
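
A hedged sketch of creating and consuming a regional replica with the documented options; the replica bucket name CONFIGURATION_SFO, the cluster name sfo, the import path and the exact interaction between WithMirroredBucket and WithPlacementCluster are assumptions for illustration.

package main

import (
	"fmt"
	"log"

	"github.com/nats-io/jsm.go/kv" // assumed import path for this package
	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	// Create (or load) a replica bucket placed in the sfo cluster that
	// mirrors the main CONFIGURATION bucket.
	replica, err := kv.NewBucket(nc, "CONFIGURATION_SFO",
		kv.WithMirroredBucket("CONFIGURATION"),
		kv.WithPlacementCluster("sfo"))
	if err != nil {
		log.Fatal(err)
	}
	replica.Close()

	// Readers in sfo use a read-only client against the replica, accepting
	// that read-after-write safety is lost.
	ro, err := kv.NewRoClient(nc, "CONFIGURATION_SFO")
	if err != nil {
		log.Fatal(err)
	}
	defer ro.Close()

	entry, err := ro.Get("auth")
	if err != nil {
		log.Fatal(err)
	}

	fmt.Printf("%s = %s\n", entry.Key(), entry.Value())
}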

Local Cache

A local cache wraps the JetStream storage with one that passes most calls through to JetStream but stores results from Get() in a local cache to serve later.

It will also start a bucket watch that proactively builds and maintains a cache of the entire bucket locally in memory. The cache will only be used when ready - meaning the watch has delivered the last message in the watched bucket - to avoid serving stale data. Later we might detect stale cache states and go out of the ready state proactively.
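
A short sketch of enabling the cache through the documented WithLocalCache() option; the import path, bucket and key names are assumptions. Until the watch is ready, reads fall through to JetStream as described above.

package main

import (
	"fmt"
	"log"

	"github.com/nats-io/jsm.go/kv" // assumed import path for this package
	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	// WithLocalCache() starts a bucket watch that maintains an in-memory
	// copy of the entire bucket for fast reads.
	store, err := kv.NewClient(nc, "CONFIGURATION", kv.WithLocalCache())
	if err != nil {
		log.Fatal(err)
	}
	defer store.Close()

	entry, err := store.Get("auth")
	if err != nil {
		log.Fatal(err)
	}

	fmt.Printf("%s = %s\n", entry.Key(), entry.Value())
}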

Documentation

Index

Constants

const (
	// PutOperation represents a PUT of data into the kv bucket
	PutOperation Operation = "PUT"

	// DeleteOperation represents a delete of a specific key in a bucket
	DeleteOperation Operation = "DEL"

	// ValidKeyPattern is a regular expression a key should match, post encoding, to be valid
	ValidKeyPattern = `\A[-/_=\.a-zA-Z0-9]+\z`

	// ValidBucketPattern is a regular expression bucket names should match
	ValidBucketPattern = `\A[a-zA-Z0-9_-]+\z`

	// ReservedKeyPrefix is a prefix for keys reserved for internal use
	ReservedKeyPrefix = "_kv"
)
const (
	// DefaultTimeout is the default timeout used when waiting for the backend, override using WithTimeout()
	DefaultTimeout = 2 * time.Second

	// DefaultHistory is how many historical values are kept per key
	DefaultHistory uint64 = 1

	// DefaultMaxBucketSize is the maximum size for the entire bucket, -1 for unlimited
	DefaultMaxBucketSize int64 = -1

	// DefaultMaxValueSize is the maximum size for individual values
	DefaultMaxValueSize int32 = -1
)

Variables

var (
	// ErrUnknownKey is returned when a key does not exist
	ErrUnknownKey = errors.New("unknown key")

	// ErrInvalidKey is returned for keys that do not match ValidKeyPattern
	ErrInvalidKey = errors.New("invalid key")

	// ErrInvalidBucketName is returned when trying to access buckets that do not match ValidBucketPattern
	ErrInvalidBucketName = errors.New("invalid bucket name")

	// ErrUnknownBucket is returned when a bucket could not be found
	ErrUnknownBucket = errors.New("unknown bucket")
)

Functions

func IsReservedKey

func IsReservedKey(key string) bool

IsReservedKey determines if key is a reserved key

func IsValidBucket

func IsValidBucket(bucket string) bool

IsValidBucket determines if bucket is a valid bucket name

func IsValidKey

func IsValidKey(key string) bool

IsValidKey determines if key is a valid key
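
A small sketch of the validation helpers; the import path and the keys shown are assumptions for illustration, with the expected results noted as comments.

package kv_test

import (
	"fmt"

	"github.com/nats-io/jsm.go/kv" // assumed import path for this package
)

func ExampleIsValidKey() {
	fmt.Println(kv.IsValidKey("auth/username"))    // matches ValidKeyPattern, expected true
	fmt.Println(kv.IsValidKey("has spaces"))       // spaces are not allowed, expected false
	fmt.Println(kv.IsReservedKey("_kv/replicas"))  // starts with ReservedKeyPrefix, expected true
	fmt.Println(kv.IsValidBucket("CONFIGURATION")) // matches ValidBucketPattern, expected true
}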

Types

type Codec

type Codec interface {
	Encoder
	Decoder
}

Codec encodes/decodes values using Encoders and Decoders

type Decoder

type Decoder interface {
	Decode(value []byte) ([]byte, error)
}

Decoder decodes values when they are read back

type Encoder

type Encoder interface {
	Encode(value []byte) ([]byte, error)
}

Encoder encodes values before saving
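
As an illustration, a minimal hypothetical Codec that base64 encodes values on write and decodes them on read could look like the sketch below; it satisfies Codec because it implements both Encoder and Decoder, and could be passed to a client via WithCodec.

package kv_test

import (
	"encoding/base64"
)

// base64Codec is a hypothetical Codec that stores values base64 encoded.
type base64Codec struct{}

// Encode is called before values are saved.
func (base64Codec) Encode(value []byte) ([]byte, error) {
	return []byte(base64.StdEncoding.EncodeToString(value)), nil
}

// Decode is called when stored values are read back.
func (base64Codec) Decode(value []byte) ([]byte, error) {
	return base64.StdEncoding.DecodeString(string(value))
}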

type Entry deprecated

type Entry interface {
	// Bucket is the bucket the data was loaded from
	Bucket() string
	// Key is the key that was retrieved
	Key() string
	// Value is the retrieved value
	Value() []byte
	// Created is the time the data was received in the bucket
	Created() time.Time
	// Sequence is a unique sequence for this value
	Sequence() uint64
	// Delta is the distance from the latest value. If history is enabled this is effectively the index of the historical value, 0 for the latest, 1 for the previous one, etc.
	Delta() uint64
	// Operation is the kind of operation this result represents
	Operation() Operation
}

Deprecated: this is now deprecated, please use the KV feature in nats.go

type GenericEntry

type GenericEntry struct {
	Bucket    string `json:"bucket"`
	Key       string `json:"key"`
	Val       []byte `json:"val"`
	Created   int64  `json:"created"`
	Seq       uint64 `json:"seq"`
	Operation string `json:"operation"`
}

GenericEntry is a generic, non implementation-specific, representation of an Entry

type KV deprecated

type KV interface {
	// Put saves a value into a key
	Put(key string, val []byte, opts ...PutOption) (seq uint64, err error)

	// Delete marks the key as deleted, history is retained subject to configured history limit
	Delete(key string) error

	// Purge marks the key as deleted and removes history, after this operation 1 historic value is kept - the purge
	Purge(key string) error

	// Destroy removes the entire bucket and all data, KV cannot be used after
	Destroy() error

	RoKV
}

KV is a read-write interface to a single key-value store bucket

Deprecated: this is now deprecated, please use the KV feature in nats.go

func NewBucket deprecated

func NewBucket(nc *nats.Conn, bucket string, opts ...Option) (KV, error)

NewBucket creates or loads a bucket. If the bucket already exists the existing bucket configuration is not reconciled

Deprecated: this is now deprecated, please use the KV feature in nats.go

func NewClient deprecated

func NewClient(nc *nats.Conn, bucket string, opts ...Option) (KV, error)

NewClient creates a new read-write client

Deprecated: this is now deprecated, please use the KV feature in nats.go
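
A basic read-write sketch using this client; the import path, bucket and key names are assumptions for illustration.

package kv_test

import (
	"fmt"
	"log"

	"github.com/nats-io/jsm.go/kv" // assumed import path for this package
	"github.com/nats-io/nats.go"
)

func ExampleNewClient() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	// NewClient loads an existing bucket, NewBucket would create it first.
	store, err := kv.NewClient(nc, "CONFIGURATION")
	if err != nil {
		log.Fatal(err)
	}
	defer store.Close()

	seq, err := store.Put("auth", []byte("secret"))
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("stored at sequence", seq)

	entry, err := store.Get("auth")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%s = %s\n", entry.Key(), entry.Value())

	// Delete retains history subject to the history limit, Purge removes it.
	if err = store.Delete("auth"); err != nil {
		log.Fatal(err)
	}
}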

type Logger

type Logger interface {
	Debugf(format string, a ...interface{})
	Infof(format string, a ...interface{})
	Warnf(format string, a ...interface{})
	Errorf(format string, a ...interface{})
}

Logger is a custom logger

type Operation

type Operation string

Operation is the type of action taken on a key

type Option

type Option func(o *options) error

Option configures the KV client

func WithCodec

func WithCodec(c Codec) Option

WithCodec sets a value encoder/decoder, multiple codecs can be set and will be called in order, programs that read and write values can set this to do bi-directional encoding and decoding

func WithDecoder

func WithDecoder(d Decoder) Option

WithDecoder sets a value decoder, multiple decoders can be set and will be called in order, programs that just read values can use this to avoid configuring encoders

func WithEncoder

func WithEncoder(e Encoder) Option

WithEncoder sets a value encoder, multiple encoders can be set and will be called in order, programs that just write values can use this to avoid configuring decoders

func WithHistory

func WithHistory(h uint64) Option

WithHistory sets the number of historic values to keep for a key

func WithLocalCache

func WithLocalCache() Option

WithLocalCache creates a local in-memory cache of the entire bucket that's kept up to date in real time using a watch

func WithLogger

func WithLogger(log Logger) Option

WithLogger sets a logger to use, STDOUT logging otherwise

func WithMaxBucketSize

func WithMaxBucketSize(s int64) Option

WithMaxBucketSize limits the entire bucket to a specific size

func WithMaxValueSize

func WithMaxValueSize(s int32) Option

WithMaxValueSize sets the largest value size that can be placed in the bucket, including some header overhead

func WithMirroredBucket

func WithMirroredBucket(b string) Option

WithMirroredBucket creates a read replica that mirrors a specified bucket

func WithPlacementCluster

func WithPlacementCluster(c string) Option

WithPlacementCluster places the bucket in a specific cluster

func WithReplicas

func WithReplicas(r uint) Option

WithReplicas sets the number of replicas to keep for a bucket

func WithStreamName

func WithStreamName(n string) Option

WithStreamName overrides the usual stream name that is formed as KV_<BUCKET>

func WithStreamSubjectPrefix

func WithStreamSubjectPrefix(p string) Option

WithStreamSubjectPrefix overrides the usual stream subject changing the `kv.*.*` to `<prefix>.*.*`

func WithTTL

func WithTTL(ttl time.Duration) Option

WithTTL sets the maximum time a value will be kept in the bucket

func WithTimeout

func WithTimeout(t time.Duration) Option

WithTimeout sets the timeout for calls to the storage layer

type PutOption

type PutOption func(o *putOptions)

PutOption is an option passed to Put, reserved for future work like put only if the last value had sequence x

func OnlyIfLastValueSequence

func OnlyIfLastValueSequence(seq uint64) PutOption

OnlyIfLastValueSequence ensures the put will only succeed if the last set value for the key had this sequence
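
A sketch of an optimistic update built on OnlyIfLastValueSequence; updateIfUnchanged is a hypothetical helper and the import path is an assumption.

package kv_test

import (
	"github.com/nats-io/jsm.go/kv" // assumed import path for this package
)

// updateIfUnchanged writes a new value only if nobody else updated the key
// since we read it - a hypothetical compare-and-set style helper.
func updateIfUnchanged(store kv.KV, key string, val []byte) error {
	// Read the current value and remember its sequence.
	entry, err := store.Get(key)
	if err != nil {
		return err
	}

	// The put only succeeds if the last stored value still has that sequence.
	_, err = store.Put(key, val, kv.OnlyIfLastValueSequence(entry.Sequence()))
	return err
}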

type RoKV deprecated

type RoKV interface {
	// Get gets a key from the store
	Get(key string) (Entry, error)

	// History retrieves historic values for a key
	History(ctx context.Context, key string) ([]Entry, error)

	// Watch a key for updates, the same Entry might be delivered more than once, a nil entry means end of available data was reached
	Watch(ctx context.Context, key string) (Watch, error)

	// Close releases in-memory resources held by the KV, called automatically if the context used to create it is canceled
	Close() error

	// Status retrieves the status of the bucket
	Status() (Status, error)
}

RoKV is a read-only interface to a single key-value store bucket

Deprecated: this is now deprecated, please use the KV feature in nats.go

func NewRoClient deprecated

func NewRoClient(nc *nats.Conn, bucket string, opts ...Option) (RoKV, error)

NewRoClient creates a read-only key-value store client.

Deprecated: this is now deprecated, please use the KV feature in nats.go
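
A sketch of retrieving a key's history with a read-only client; the import path, bucket and key names are assumptions for illustration.

package kv_test

import (
	"context"
	"fmt"
	"log"

	"github.com/nats-io/jsm.go/kv" // assumed import path for this package
	"github.com/nats-io/nats.go"
)

func ExampleNewRoClient() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	ro, err := kv.NewRoClient(nc, "CONFIGURATION")
	if err != nil {
		log.Fatal(err)
	}
	defer ro.Close()

	entries, err := ro.History(context.Background(), "auth")
	if err != nil {
		log.Fatal(err)
	}

	for _, e := range entries {
		fmt.Printf("seq %d delta %d op %s value %s\n", e.Sequence(), e.Delta(), e.Operation(), e.Value())
	}
}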

type Status

type Status interface {
	// Bucket the name of the bucket
	Bucket() string

	// Values is how many messages are in the bucket, including historical values
	Values() uint64

	// History returns the configured history kept per key
	History() int64

	// TTL is how long the bucket keeps values for
	TTL() time.Duration

	// BucketLocation returns the name of the cluster holding the read replica of the data
	BucketLocation() string

	// Replicas returns how many times data in the bucket is replicated at storage
	Replicas() (ok int, failed int)

	// Keys returns a list of all keys in the bucket - not possible now
	Keys() ([]string, error)

	// BackingStore is a backend specific name for the underlying storage - eg. stream name
	BackingStore() string

	// MirrorStatus is the status of a read replica, error when not accessing a replica
	MirrorStatus() (lag int64, active bool, err error)

	// MaxBucketSize is the configured maximum size of the bucket in bytes
	MaxBucketSize() int64

	// MaxValueSize is the configured maximum size of a single value in bytes
	MaxValueSize() int32
}
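
A short sketch of inspecting a bucket through Status(); printStatus is a hypothetical helper and the import path is an assumption.

package kv_test

import (
	"fmt"

	"github.com/nats-io/jsm.go/kv" // assumed import path for this package
)

// printStatus reports some of the documented Status values for a bucket.
func printStatus(store kv.RoKV) error {
	status, err := store.Status()
	if err != nil {
		return err
	}

	ok, failed := status.Replicas()
	fmt.Printf("bucket %s backed by %s: %d values, history %d, TTL %s, replicas ok %d failed %d\n",
		status.Bucket(), status.BackingStore(), status.Values(), status.History(), status.TTL(), ok, failed)

	return nil
}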

type Storage deprecated

type Storage interface {
	KV

	Bucket() string
	BucketSubject() string
	CreateBucket() error
}

Deprecated: this is now deprecated, please use the KV feature in nats.go

type Watch deprecated

type Watch interface {
	// Channel returns a channel to read changes from
	Channel() chan Entry

	// Close must be called to dispose of resources, called if the context used to create the watch is canceled
	Close() error
}

Watch observes a bucket and reports any changes via NextValue or Channel

Deprecated: this is now deprecated, please use the KV feature in nats.go
