storage

package
v0.0.0-...-623d1b0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 1, 2024 License: Apache-2.0 Imports: 11 Imported by: 2

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrEventChannelClosed = errors.New("event channel closed")
	ErrObjectDeleted      = errors.New("object deleted")
)
View Source
var (
	ErrNotFound      = status.Error(codes.NotFound, "not found")
	ErrAlreadyExists = status.Error(codes.AlreadyExists, "already exists")
	ErrConflict      = lo.Must(status.New(codes.Aborted, "conflict").WithDetails(ErrDetailsConflict)).Err()
)
View Source
var (
	ErrDetailsConflict      = &errdetails.ErrorInfo{Reason: "CONFLICT"}
	ErrDetailsDiscontinuity = &errdetails.ErrorInfo{Reason: "DISCONTINUITY"}
)

Functions

func GetStoreBuilder

func GetStoreBuilder[T ~string](name T) func(...any) (any, error)

func IsAlreadyExists

func IsAlreadyExists(err error) bool

Use this instead of errors.Is(err, ErrAlreadyExists). The implementation of Is() for grpc status errors compares the error message, which can result in false negatives.

func IsConflict

func IsConflict(err error) bool

Use this instead of errors.Is(err, ErrConflict). The status code is too generic to identify conflict errors, so there are additional details added to conflict errors to disambiguate them.

func IsDiscontinuity

func IsDiscontinuity(err error) bool

func IsNotFound

func IsNotFound(err error) bool

Use this instead of errors.Is(err, ErrNotFound). The implementation of Is() for grpc status errors compares the error message, which can result in false negatives.

func NewWatchContext

func NewWatchContext[T any](
	base context.Context,
	eventC <-chan WatchEvent[T],
) context.Context

Returns a context that listens on a watch event channel and closes its Done channel when the object is deleted. This context should have exclusive read access to the event channel to avoid missing events.

func RegisterStoreBuilder

func RegisterStoreBuilder[T ~string](name T, builder func(...any) (any, error))

Types

type DeleteOpt

type DeleteOpt interface{ ApplyDeleteOption(*DeleteOptions) }

type DeleteOptions

type DeleteOptions struct {
	// Delete only if the latest Revision matches
	Revision *int64
}

func (*DeleteOptions) Apply

func (o *DeleteOptions) Apply(opts ...DeleteOpt)

type GetOpt

type GetOpt interface{ ApplyGetOption(*GetOptions) }

type GetOptions

type GetOptions struct {
	// If set, will return the value at the specified revision instead of
	// the current value.
	Revision *int64

	// If non-nil, will be set to the current revision of the key after the Get
	// operation completes successfully. If an error occurs, no changes
	// will be made to the value.
	RevisionOut *int64
}

func (*GetOptions) Apply

func (o *GetOptions) Apply(opts ...GetOpt)

type HistoryOpt

type HistoryOpt interface{ ApplyHistoryOption(*HistoryOptions) }

type HistoryOptions

type HistoryOptions struct {
	// Specifies the latest modification revision to include in the returned
	// history. The history will contain all revisions of the key, starting at
	// the most recent creation revision, and ending at either the specified
	// revision, or the most recent modification revision of the key. If the
	// specified revision is before the latest creation revision, and the
	// key has multiple creation revisions (due to a delete and re-create),
	// then the history will instead start at the most recent creation
	// revision that is <= the specified revision.
	Revision *int64
	// Include the values in the response, not just the metadata. This could
	// have performance implications, so use with caution.
	IncludeValues bool
}

func (*HistoryOptions) Apply

func (o *HistoryOptions) Apply(opts ...HistoryOpt)

type IncludeValuesOpt

type IncludeValuesOpt bool

func IncludeValues

func IncludeValues(include bool) IncludeValuesOpt

IncludeValues can be used for HistoryOptions.

func (IncludeValuesOpt) ApplyHistoryOption

func (i IncludeValuesOpt) ApplyHistoryOption(opts *HistoryOptions)

type KeyRevision

type KeyRevision[T any] interface {
	Key() string
	SetKey(string)

	// If values were requested, returns the value at this revision. Otherwise,
	// returns the zero value for T.
	// Note that if the value has a revision field, it will *not*
	// be populated, and should be set manually if needed using the Revision()
	// method.
	Value() T
	// Returns the revision of this key. Larger values are newer, but the
	// revision number should otherwise be treated as an opaque value.
	Revision() int64
	// Returns the timestamp of this revision. This may or may not always be
	// available, depending on if the underlying store supports it.
	Timestamp() time.Time
}

type KeyRevisionImpl

type KeyRevisionImpl[T any] struct {
	K    string
	V    T
	Rev  int64
	Time time.Time
}

func (*KeyRevisionImpl[T]) Key

func (k *KeyRevisionImpl[T]) Key() string

func (*KeyRevisionImpl[T]) Revision

func (k *KeyRevisionImpl[T]) Revision() int64

func (*KeyRevisionImpl[T]) SetKey

func (k *KeyRevisionImpl[T]) SetKey(key string)

func (*KeyRevisionImpl[T]) Timestamp

func (k *KeyRevisionImpl[T]) Timestamp() time.Time

func (*KeyRevisionImpl[T]) Value

func (k *KeyRevisionImpl[T]) Value() T

type KeyValueStore

type KeyValueStore = KeyValueStoreT[[]byte]

type KeyValueStoreBroker

type KeyValueStoreBroker interface {
	KeyValueStore(namespace string) KeyValueStore
}

type KeyValueStoreT

type KeyValueStoreT[T any] interface {
	Put(ctx context.Context, key string, value T, opts ...PutOpt) error
	Get(ctx context.Context, key string, opts ...GetOpt) (T, error)

	// Starts a watch on the specified key. The returned channel will receive
	// events for the key until the context is canceled, after which the
	// channel will be closed. This function does not block. An error will only
	// be returned if the key is invalid or the watch fails to start.
	//
	// When the watch is started, the current value of the key will be sent
	// if and only if both of the following conditions are met:
	// 1. A revision is explicitly set in the watch options. If no revision is
	//    specified, only future events will be sent. Revision 0 is equivalent
	//    to the oldest revision among all keys matching the prefix, not
	//    including deleted keys.
	// 2. The key exists; or in prefix mode, there is at least one key matching
	//    the prefix.
	//
	// In most cases a starting revision should be specified, as this will
	// ensure no events are missed.
	//
	// This function can be called multiple times for the same key, prefix, or
	// overlapping prefixes. Each call will initiate a separate watch, and events
	// are always replicated to all active watches.
	//
	// The channels are buffered to hold at least 64 events. Ensure that events
	// are read from the channel in a timely manner if a large volume of events
	// is expected; otherwise it will block and events may be delayed, or be
	// dropped by the backend.
	Watch(ctx context.Context, key string, opts ...WatchOpt) (<-chan WatchEvent[KeyRevision[T]], error)
	Delete(ctx context.Context, key string, opts ...DeleteOpt) error
	ListKeys(ctx context.Context, prefix string, opts ...ListOpt) ([]string, error)
	History(ctx context.Context, key string, opts ...HistoryOpt) ([]KeyRevision[T], error)
}

type KeyValueStoreTBroker

type KeyValueStoreTBroker[T any] interface {
	KeyValueStore(namespace string) KeyValueStoreT[T]
}

type LimitOpt

type LimitOpt int64

func WithLimit

func WithLimit(limit int64) LimitOpt

WithLimit can be used for ListKeysOptions.

func (LimitOpt) ApplyListOption

func (l LimitOpt) ApplyListOption(opts *ListKeysOptions)

type ListKeysOptions

type ListKeysOptions struct {
	// Maximum number of keys to return
	Limit *int64
}

func (*ListKeysOptions) Apply

func (o *ListKeysOptions) Apply(opts ...ListOpt)

type ListOpt

type ListOpt interface{ ApplyListOption(*ListKeysOptions) }

type Lock

type Lock interface {
	// Lock acquires a lock on the key. If the lock is already held, it will block until the lock is acquired or
	// the context fails.
	// Lock returns an error if the context expires or an unrecoverable error occurs when trying to acquire the lock.
	Lock(ctx context.Context) (expired chan struct{}, err error)

	// TryLock tries to acquire the lock on the key and reports whether it succeeded.
	// It blocks until at least one attempt was made to acquire the lock, and returns acquired=false and no error
	// if the lock is known to be held by someone else.
	TryLock(ctx context.Context) (acquired bool, expired chan struct{}, err error)

	// Unlock releases the lock on the key in a non-blocking fashion.
	// It spawns a goroutine that will perform the unlock mechanism until it succeeds or the lock is
	// expired by the server.
	// It immediately signals to the lock's original expired channel that the lock is released.
	Unlock() error
}

Lock is a distributed lock that can be used to coordinate access to a resource or interest in such a resource. Locks provide the following liveness and atomicity guarantees to prevent distributed deadlocks and guarantee atomicity in the critical section.

  - Liveness A: A lock is always eventually released when the process holding it crashes or exits unexpectedly.
  - Liveness B: A lock is always eventually released when its backend store is unavailable.
  - Atomicity A: No two processes or threads can hold the same lock at the same time.
  - Atomicity B: Any call to unlock will always eventually release the lock.

type PrefixOpt

type PrefixOpt bool

func (PrefixOpt) ApplyWatchOption

func (p PrefixOpt) ApplyWatchOption(opts *WatchOptions)

type PutOpt

type PutOpt interface{ ApplyPutOption(*PutOptions) }

type PutOptions

type PutOptions struct {
	// Put only if the latest Revision matches
	Revision *int64

	// If non-nil, will be set to the updated revision of the key after the Put
	// operation completes successfully. If an error occurs, no changes
	// will be made to the value.
	RevisionOut *int64
}

func (*PutOptions) Apply

func (o *PutOptions) Apply(opts ...PutOpt)

type RevisionOpt

type RevisionOpt int64

func WithRevision

func WithRevision(rev int64) RevisionOpt

WithRevision can be used for GetOptions, PutOptions, WatchOptions, HistoryOptions, or DeleteOptions.

func (RevisionOpt) ApplyDeleteOption

func (r RevisionOpt) ApplyDeleteOption(opts *DeleteOptions)

func (RevisionOpt) ApplyGetOption

func (r RevisionOpt) ApplyGetOption(opts *GetOptions)

func (RevisionOpt) ApplyHistoryOption

func (r RevisionOpt) ApplyHistoryOption(opts *HistoryOptions)

func (RevisionOpt) ApplyPutOption

func (r RevisionOpt) ApplyPutOption(opts *PutOptions)

func (RevisionOpt) ApplyWatchOption

func (r RevisionOpt) ApplyWatchOption(opts *WatchOptions)

type RevisionOutOpt

type RevisionOutOpt struct {
	// contains filtered or unexported fields
}

func WithRevisionOut

func WithRevisionOut(out *int64) RevisionOutOpt

WithRevisionOut can be used for GetOptions or PutOptions.

func (RevisionOutOpt) ApplyGetOption

func (r RevisionOutOpt) ApplyGetOption(opts *GetOptions)

func (RevisionOutOpt) ApplyPutOption

func (r RevisionOutOpt) ApplyPutOption(opts *PutOptions)

type ValueStoreT

type ValueStoreT[T any] interface {
	Put(ctx context.Context, value T, opts ...PutOpt) error
	Get(ctx context.Context, opts ...GetOpt) (T, error)
	Watch(ctx context.Context, opts ...WatchOpt) (<-chan WatchEvent[KeyRevision[T]], error)
	Delete(ctx context.Context, opts ...DeleteOpt) error
	History(ctx context.Context, opts ...HistoryOpt) ([]KeyRevision[T], error)
}

type WatchEvent

type WatchEvent[T any] struct {
	EventType WatchEventType
	Current   T
	Previous  T
}

type WatchEventType

type WatchEventType string
const (
	// An operation that creates a new key OR modifies an existing key.
	//
	// NB: The Watch API does not distinguish between create and modify events.
	// It is not practical (nor desired, in most cases) to provide this info
	// to the caller, because it cannot be guaranteed to be accurate in all cases.
	// Because of the inability to make this guarantee, any client code that
	// relies on this distinction would be highly likely to end up in an invalid
	// state after a sufficient amount of time, or after issuing a watch request
	// on a key that has a complex and/or truncated history. However, in certain
	// cases, clients may be able to correlate events with out-of-band information
	// to reliably disambiguate Put events. This is necessarily an implementation
	// detail and may not always be possible.
	WatchEventPut WatchEventType = "Put"

	// An operation that removes an existing key.
	//
	// Delete events make few guarantees, as different backends handle deletes
	// differently. Backends are not required to discard revision history, or
	// to stop sending events for a key after it has been deleted. Keys may
	// be recreated after a delete event, in which case a Put event will follow.
	// Such events may or may not contain a previous revision value, depending
	// on implementation details of the backend (they will always contain a
	// current revision value, though).
	WatchEventDelete WatchEventType = "Delete"
)

type WatchOpt

type WatchOpt interface{ ApplyWatchOption(*WatchOptions) }

func WithPrefix

func WithPrefix() WatchOpt

type WatchOptions

type WatchOptions struct {
	// Starting revision for the watch. If not specified, will start at the
	// latest revision.
	Revision *int64

	// If true, all keys under the same prefix will be watched.
	// When prefix mode is disabled (default), events will only be sent for
	// the single key specified in the request.
	// If used in combination with the revision option, will effectively
	// "replay" the history of all keys under the prefix starting at the
	// specified revision.
	// Care should be taken when using this option, especially in combination
	// with a past revision, as it could cause performance issues.
	Prefix bool
}

func (*WatchOptions) Apply

func (o *WatchOptions) Apply(opts ...WatchOpt)

Directories

Path Synopsis
drivers
crds Module
etcd Module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL