objectstorage

package
v0.1.3
Published: Sep 28, 2023 License: Apache-2.0, BSD-2-Clause Imports: 18 Imported by: 4

Documentation

Constants

This section is empty.

Variables

var LeakDetection = struct {
	WrapCachedObject                 func(cachedObject *CachedObjectImpl, skipCallerFrames int) CachedObject
	ReportCachedObjectClosedTooOften func(wrappedCachedObject LeakDetectionWrapper, secondCallStack *reflect.CallStack)
	MonitorCachedObjectReleased      func(wrappedCachedObject LeakDetectionWrapper, options *LeakDetectionOptions)
	RegisterCachedObjectRetained     func(wrappedCachedObject LeakDetectionWrapper, options *LeakDetectionOptions)
	RegisterCachedObjectReleased     func(wrappedCachedObject LeakDetectionWrapper, options *LeakDetectionOptions)
}{
	WrapCachedObject:                 wrapCachedObject,
	ReportCachedObjectClosedTooOften: reportCachedObjectClosedTooOften,
	MonitorCachedObjectReleased:      monitorCachedObjectReleased,
	RegisterCachedObjectRetained:     registerCachedObjectRetained,
	RegisterCachedObjectReleased:     registerCachedObjectReleased,
}

Functions

This section is empty.

Types

type CachedObject

type CachedObject interface {
	Key() []byte
	Exists() bool
	Get() (result StorableObject)
	Consume(consumer func(StorableObject), forceRelease ...bool) bool
	Retain() CachedObject

	Release(force ...bool)
	Transaction(callback func(object StorableObject), identifiers ...interface{}) CachedObject
	RTransaction(callback func(object StorableObject), identifiers ...interface{}) CachedObject

	kvstore.BatchWriteObject
	// contains filtered or unexported methods
}

type CachedObjectImpl

type CachedObjectImpl struct {
	// contains filtered or unexported fields
}

func NewEmptyCachedObject

func NewEmptyCachedObject(key []byte) (result *CachedObjectImpl)

NewEmptyCachedObject creates an "empty" CachedObjectImpl that is not part of any ObjectStorage.

This is useful for offering a "filtered view" of the ObjectStorage, where a load operation returns an "empty" value even if the underlying object exists (e.g. the value tangle on top of the normal tangle only returns value transactions in its load operations).

func (*CachedObjectImpl) BatchWrite

func (cachedObject *CachedObjectImpl) BatchWrite(batchedMuts kvstore.BatchedMutations)

BatchWrite checks if the cachedObject should be persisted. If all checks pass, the cachedObject is marshaled and added to the BatchedMutations. Do not call this method for objects that should not be persisted.

func (*CachedObjectImpl) BatchWriteDone

func (cachedObject *CachedObjectImpl) BatchWriteDone()

BatchWriteDone is called after the cachedObject was persisted. It releases the cachedObject from the cache if no consumers are left and it was not modified in the meantime.

func (*CachedObjectImpl) BatchWriteScheduled

func (cachedObject *CachedObjectImpl) BatchWriteScheduled() bool

BatchWriteScheduled returns true if the cachedObject is already scheduled for a BatchWrite operation.

func (*CachedObjectImpl) Consume

func (cachedObject *CachedObjectImpl) Consume(consumer func(StorableObject), forceRelease ...bool) bool

Consume directly consumes the StorableObject. It automatically calls Release() on the object when the callback is done, and it returns true if the callback was called.
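The consume-then-release behaviour described above can be illustrated with a small stand-in. This is a minimal sketch, not the package's implementation: `refCounted` and its fields are hypothetical, and the rule that the callback only runs when the value exists is an assumption made for the example.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// refCounted is a hypothetical stand-in for a cached object. It is not the
// package's CachedObjectImpl and only tracks a consumer count.
type refCounted struct {
	consumers int32
	value     string
	exists    bool
}

// Retain registers one more consumer and returns the same object.
func (r *refCounted) Retain() *refCounted {
	atomic.AddInt32(&r.consumers, 1)
	return r
}

// Release unregisters one consumer.
func (r *refCounted) Release() {
	atomic.AddInt32(&r.consumers, -1)
}

// Consume runs the callback only if the value exists (an assumption of this
// sketch) and always releases afterwards. It reports whether the callback ran.
func (r *refCounted) Consume(consumer func(string)) bool {
	defer r.Release()
	if !r.exists {
		return false
	}
	consumer(r.value)
	return true
}

func main() {
	obj := &refCounted{value: "payload", exists: true}
	called := obj.Retain().Consume(func(v string) { fmt.Println("got", v) })
	fmt.Println(called, atomic.LoadInt32(&obj.consumers))
}
```

Because the release is deferred, the consumer count drops again even when the callback is skipped, so a caller that uses Consume does not need a separate Release call.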

func (*CachedObjectImpl) Exists

func (cachedObject *CachedObjectImpl) Exists() bool

Exists returns true if the StorableObject in this container exists, i.e. it was found in the database and was not marked as deleted.

func (*CachedObjectImpl) Get

func (cachedObject *CachedObjectImpl) Get() (result StorableObject)

Get retrieves the StorableObject that is cached in this container.

func (*CachedObjectImpl) Key

func (cachedObject *CachedObjectImpl) Key() []byte

Key returns the object storage key that is used to address the object.

func (*CachedObjectImpl) RTransaction

func (cachedObject *CachedObjectImpl) RTransaction(callback func(object StorableObject), identifiers ...interface{}) CachedObject

RTransaction is a synchronization primitive that executes the callback together with other RTransactions but never together with a normal Transaction.

The identifiers define the scope of the RTransaction. RTransactions with different scopes run independently of each other, as if they were secured by different mutexes.

It is also possible to provide multiple identifiers; the callback then waits until all of them can be acquired at the same time. In contrast to normal mutexes, where acquiring multiple locks can lead to deadlocks, this method is deadlock-safe.

Note: It is the equivalent of a mutex.RLock/RUnlock.

func (*CachedObjectImpl) Release

func (cachedObject *CachedObjectImpl) Release(force ...bool)

Release releases the object so that it can be picked up by the persistence layer (as soon as all consumers are done).

func (*CachedObjectImpl) ResetBatchWriteScheduled

func (cachedObject *CachedObjectImpl) ResetBatchWriteScheduled()

ResetBatchWriteScheduled resets the flag that the cachedObject is scheduled for a BatchWrite operation.

func (*CachedObjectImpl) Retain

func (cachedObject *CachedObjectImpl) Retain() CachedObject

Retain registers a new consumer for this cached object.

func (*CachedObjectImpl) Transaction

func (cachedObject *CachedObjectImpl) Transaction(callback func(object StorableObject), identifiers ...interface{}) CachedObject

Transaction is a synchronization primitive that executes the callback atomically: if multiple Transactions are started from different goroutines, only one of them runs at a time.

The identifiers define the scope of the Transaction. Transactions with different scopes can run at the same time, as if they were secured by different mutexes.

It is also possible to provide multiple identifiers; the callback then waits until all of them can be acquired at the same time. In contrast to normal mutexes, where acquiring multiple locks can lead to deadlocks, this method is deadlock-safe.

Note: It is the equivalent of a mutex.Lock/Unlock.
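One way to get the "wait until all identifiers can be acquired at the same time" behaviour is an all-or-nothing lock guarded by a condition variable. The following is a minimal sketch under that assumption. `multiLock` is a hypothetical type, not the primitive the package uses internally, and it only covers the exclusive (Transaction-style) case.

```go
package main

import (
	"fmt"
	"sync"
)

// multiLock is a hypothetical all-or-nothing lock keyed by identifier.
// Identifiers must be comparable because they are used as map keys.
type multiLock struct {
	mu   sync.Mutex
	cond *sync.Cond
	held map[interface{}]bool
}

func newMultiLock() *multiLock {
	m := &multiLock{held: make(map[interface{}]bool)}
	m.cond = sync.NewCond(&m.mu)
	return m
}

// anyHeld reports whether at least one of the identifiers is currently taken.
func (m *multiLock) anyHeld(ids []interface{}) bool {
	for _, id := range ids {
		if m.held[id] {
			return true
		}
	}
	return false
}

// Lock blocks until none of the identifiers is held, then takes all of them
// in one step, so a goroutine never waits while holding a partial set.
func (m *multiLock) Lock(ids ...interface{}) {
	m.mu.Lock()
	defer m.mu.Unlock()
	for m.anyHeld(ids) {
		m.cond.Wait()
	}
	for _, id := range ids {
		m.held[id] = true
	}
}

// Unlock frees the identifiers and wakes all waiting goroutines.
func (m *multiLock) Unlock(ids ...interface{}) {
	m.mu.Lock()
	for _, id := range ids {
		delete(m.held, id)
	}
	m.mu.Unlock()
	m.cond.Broadcast()
}

func main() {
	locks := newMultiLock()
	counters := map[string]int{"a": 0, "b": 0}
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			// Alternating the order would risk a deadlock with two plain mutexes.
			ids := []interface{}{"a", "b"}
			if i%2 == 1 {
				ids = []interface{}{"b", "a"}
			}
			locks.Lock(ids...)
			counters["a"]++
			counters["b"]++
			locks.Unlock(ids...)
		}(i)
	}
	wg.Wait()
	fmt.Println(counters["a"], counters["b"]) // prints "100 100"
}
```

Since a goroutine either holds every requested identifier or none of them, the circular wait that causes deadlocks with individually acquired mutexes cannot form.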

type CachedObjects

type CachedObjects []CachedObject

func (CachedObjects) Release

func (cachedObjects CachedObjects) Release(force ...bool)

type ConsumerFunc

type ConsumerFunc = func(key []byte, cachedObject *CachedObjectImpl) bool

type Events

type Events struct {
	ObjectEvicted *event.Event2[[]byte, StorableObject]
}

type Factory

type Factory struct {
	// contains filtered or unexported fields
}

Factory is a utility that offers an API for more compact creation of multiple ObjectStorage instances from within the same package. It automatically configures a new KVStore instance with the corresponding realm and provides it to the created ObjectStorage instances.

func NewFactory

func NewFactory(store kvstore.KVStore, packagePrefix byte) *Factory

NewFactory creates a new Factory with the given ObjectStorage parameters.

func (*Factory) New

func (factory *Factory) New(storagePrefix byte, objectFactory StorableObjectFactory, optionalOptions ...Option) *ObjectStorage

New creates a new ObjectStorage with the given parameters. It combines the store-specific prefix with the package prefix to create a unique realm for the KVStore of the ObjectStorage.
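To make the idea of a combined realm concrete, here is a minimal sketch of how two one-byte prefixes can partition a shared key space. The helpers `realm` and `withRealm` are hypothetical, and the byte order (package prefix first, then storage prefix) is an assumption of this example rather than something the documentation states.

```go
package main

import "fmt"

// realm is a hypothetical helper that joins a package prefix and a storage
// prefix into one key-space prefix. The order is an assumption.
func realm(packagePrefix, storagePrefix byte) []byte {
	return []byte{packagePrefix, storagePrefix}
}

// withRealm prepends the realm to a key, the way a prefixed key-value store
// would before touching the underlying database.
func withRealm(r, key []byte) []byte {
	out := make([]byte, 0, len(r)+len(key))
	out = append(out, r...)
	return append(out, key...)
}

func main() {
	const pkgPrefix byte = 0x01
	blocks := realm(pkgPrefix, 0x0a)
	meta := realm(pkgPrefix, 0x0b)
	// The same object key lands in two disjoint regions of the key space.
	fmt.Printf("%x %x\n", withRealm(blocks, []byte("k")), withRealm(meta, []byte("k"))) // prints "010a6b 010b6b"
}
```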

type IteratorOption

type IteratorOption func(opts *IteratorOptions)

IteratorOption is a function setting an iterator option.

func WithIteratorMaxIterations

func WithIteratorMaxIterations(maxIterations int) IteratorOption

WithIteratorMaxIterations is used to stop the iteration after a certain number of iterations. 0 disables the limit.

func WithIteratorPrefix

func WithIteratorPrefix(prefix []byte) IteratorOption

WithIteratorPrefix is used to iterate a subset of elements with a defined prefix.

func WithIteratorSkipCache

func WithIteratorSkipCache(skipCache bool) IteratorOption

WithIteratorSkipCache is used to skip the elements in the cache.

func WithIteratorSkipStorage

func WithIteratorSkipStorage(skipStorage bool) IteratorOption

WithIteratorSkipStorage is used to skip the elements in the storage.

type IteratorOptions

type IteratorOptions struct {
	// contains filtered or unexported fields
}

IteratorOptions define options for iterations in the object storage.
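The iterator options follow the functional-options pattern: each option is a function that mutates an options struct. The sketch below shows that pattern with hypothetical lowercase stand-ins (`iterOptions`, `withPrefix`, `withMaxIterations`, `forEachKey`). The field names are guesses, since the real fields are unexported, and stopping when the consumer returns false is an assumption about the callback's return value.

```go
package main

import (
	"fmt"
	"strings"
)

// iterOptions is a hypothetical stand-in for the unexported option fields.
type iterOptions struct {
	prefix        string
	maxIterations int
}

// iterOption has the same shape as IteratorOption: a function that sets an option.
type iterOption func(opts *iterOptions)

func withPrefix(prefix string) iterOption {
	return func(opts *iterOptions) { opts.prefix = prefix }
}

func withMaxIterations(n int) iterOption {
	return func(opts *iterOptions) { opts.maxIterations = n }
}

// forEachKey visits the keys that match the prefix. It stops after
// maxIterations visits (0 means no limit) or when the consumer returns false,
// and returns the number of keys visited.
func forEachKey(keys []string, consumer func(key string) bool, options ...iterOption) int {
	opts := &iterOptions{}
	for _, option := range options {
		option(opts)
	}
	visited := 0
	for _, key := range keys {
		if !strings.HasPrefix(key, opts.prefix) {
			continue
		}
		if opts.maxIterations != 0 && visited >= opts.maxIterations {
			break
		}
		visited++
		if !consumer(key) {
			break
		}
	}
	return visited
}

func main() {
	keys := []string{"a1", "a2", "a3", "b1"}
	n := forEachKey(keys, func(key string) bool {
		fmt.Println(key)
		return true
	}, withPrefix("a"), withMaxIterations(2))
	fmt.Println("visited", n)
}
```

Callers that pass no options get the zero-value defaults, which is why new options can be added later without changing existing call sites.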

type LeakDetectionOptions

type LeakDetectionOptions struct {
	MaxConsumersPerObject int
	MaxConsumerHoldTime   time.Duration
}

type LeakDetectionWrapper

type LeakDetectionWrapper interface {
	CachedObject

	Base() *CachedObjectImpl
	GetInternalID() int64
	SetRetainCallStack(callStack *reflect.CallStack)
	GetRetainCallStack() *reflect.CallStack
	GetRetainTime() time.Time
	SetReleaseCallStack(callStack *reflect.CallStack)
	GetReleaseCallStack() *reflect.CallStack
	WasReleased() bool
}

type LeakDetectionWrapperImpl

type LeakDetectionWrapperImpl struct {
	*CachedObjectImpl
	// contains filtered or unexported fields
}

func (*LeakDetectionWrapperImpl) Base

func (wrappedCachedObject *LeakDetectionWrapperImpl) Base() *CachedObjectImpl

func (*LeakDetectionWrapperImpl) Consume

func (wrappedCachedObject *LeakDetectionWrapperImpl) Consume(consumer func(StorableObject), forceRelease ...bool) bool

func (*LeakDetectionWrapperImpl) GetInternalID

func (wrappedCachedObject *LeakDetectionWrapperImpl) GetInternalID() int64

func (*LeakDetectionWrapperImpl) GetReleaseCallStack

func (wrappedCachedObject *LeakDetectionWrapperImpl) GetReleaseCallStack() *reflect.CallStack

func (*LeakDetectionWrapperImpl) GetRetainCallStack

func (wrappedCachedObject *LeakDetectionWrapperImpl) GetRetainCallStack() *reflect.CallStack

func (*LeakDetectionWrapperImpl) GetRetainTime

func (wrappedCachedObject *LeakDetectionWrapperImpl) GetRetainTime() time.Time

func (*LeakDetectionWrapperImpl) RTransaction

func (wrappedCachedObject *LeakDetectionWrapperImpl) RTransaction(callback func(object StorableObject), identifiers ...interface{}) CachedObject

RTransaction is a synchronization primitive that executes the callback together with other RTransactions but never together with a normal Transaction.

The identifiers define the scope of the RTransaction. RTransactions with different scopes run independently of each other, as if they were secured by different mutexes.

It is also possible to provide multiple identifiers; the callback then waits until all of them can be acquired at the same time. In contrast to normal mutexes, where acquiring multiple locks can lead to deadlocks, this method is deadlock-safe.

Note: It is the equivalent of a mutex.RLock/RUnlock.

func (*LeakDetectionWrapperImpl) Release

func (wrappedCachedObject *LeakDetectionWrapperImpl) Release(force ...bool)

func (*LeakDetectionWrapperImpl) Retain

func (wrappedCachedObject *LeakDetectionWrapperImpl) Retain() CachedObject

func (*LeakDetectionWrapperImpl) SetReleaseCallStack

func (wrappedCachedObject *LeakDetectionWrapperImpl) SetReleaseCallStack(releaseCallStack *reflect.CallStack)

func (*LeakDetectionWrapperImpl) SetRetainCallStack

func (wrappedCachedObject *LeakDetectionWrapperImpl) SetRetainCallStack(retainCallStack *reflect.CallStack)

func (*LeakDetectionWrapperImpl) Transaction

func (wrappedCachedObject *LeakDetectionWrapperImpl) Transaction(callback func(object StorableObject), identifiers ...interface{}) CachedObject

Transaction is a synchronization primitive that executes the callback atomically: if multiple Transactions are started from different goroutines, only one of them runs at a time.

The identifiers define the scope of the Transaction. Transactions with different scopes can run at the same time, as if they were secured by different mutexes.

It is also possible to provide multiple identifiers; the callback then waits until all of them can be acquired at the same time. In contrast to normal mutexes, where acquiring multiple locks can lead to deadlocks, this method is deadlock-safe.

Note: It is the equivalent of a mutex.Lock/Unlock.

func (*LeakDetectionWrapperImpl) WasReleased

func (wrappedCachedObject *LeakDetectionWrapperImpl) WasReleased() bool

type ObjectStorage

type ObjectStorage struct {
	Events Events
	// contains filtered or unexported fields
}

ObjectStorage is a manual cache that keeps objects as long as consumers are using them.

func New

func New(store kvstore.KVStore, objectFactory StorableObjectFactory, optionalOptions ...Option) *ObjectStorage

New is the constructor for the ObjectStorage.

func (*ObjectStorage) ComputeIfAbsent

func (objectStorage *ObjectStorage) ComputeIfAbsent(key []byte, remappingFunction func(key []byte) StorableObject) CachedObject

func (*ObjectStorage) Contains

func (objectStorage *ObjectStorage) Contains(key []byte, options ...ReadOption) (result bool)

func (*ObjectStorage) Delete

func (objectStorage *ObjectStorage) Delete(key []byte)

Delete performs a "blind delete", which does not check whether the object exists. It is used to delete without accessing the value log.

func (*ObjectStorage) DeleteEntriesFromStore

func (objectStorage *ObjectStorage) DeleteEntriesFromStore(keys [][]byte) error

DeleteEntriesFromStore deletes entries from the persistence layer.

func (*ObjectStorage) DeleteEntryFromStore

func (objectStorage *ObjectStorage) DeleteEntryFromStore(key []byte)

DeleteEntryFromStore deletes an entry from the persistence layer.

func (*ObjectStorage) DeleteIfPresent

func (objectStorage *ObjectStorage) DeleteIfPresent(key []byte) bool

DeleteIfPresent deletes an element and returns true if the element was deleted.

func (*ObjectStorage) DeleteIfPresentAndReturn

func (objectStorage *ObjectStorage) DeleteIfPresentAndReturn(key []byte) StorableObject

DeleteIfPresentAndReturn deletes an element and returns it. If the element does not exist then the return value is nil.

func (*ObjectStorage) Flush

func (objectStorage *ObjectStorage) Flush()

func (*ObjectStorage) ForEach

func (objectStorage *ObjectStorage) ForEach(consumer func(key []byte, cachedObject CachedObject) bool, options ...IteratorOption)

ForEach calls the consumer function on every object residing within the cache and the underlying persistence layer.

func (*ObjectStorage) ForEachKeyOnly

func (objectStorage *ObjectStorage) ForEachKeyOnly(consumer func(key []byte) bool, options ...IteratorOption)

ForEachKeyOnly calls the consumer function on every storage key residing within the cache and the underlying persistence layer.

func (*ObjectStorage) FreeMemory

func (objectStorage *ObjectStorage) FreeMemory()

FreeMemory copies the content of the internal maps to newly created maps. This is necessary because otherwise the GC is not able to free the memory used by the old maps: "delete" doesn't shrink the maximum memory used by a map, since it only marks the entry as deleted.
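The copy-to-a-fresh-map technique can be shown in isolation. This is a minimal sketch with a hypothetical helper `shrink` operating on a plain map. It does not reproduce the package's internal maps.

```go
package main

import "fmt"

// shrink returns a fresh map that holds only the live entries of m.
// Once the caller drops its reference to m, the old buckets can be collected.
func shrink(m map[string][]byte) map[string][]byte {
	fresh := make(map[string][]byte, len(m))
	for k, v := range m {
		fresh[k] = v
	}
	return fresh
}

func main() {
	cache := make(map[string][]byte)
	for i := 0; i < 100000; i++ {
		cache[fmt.Sprint(i)] = []byte{1}
	}
	// Deleting entries leaves the map's bucket array at its grown size.
	for i := 10; i < 100000; i++ {
		delete(cache, fmt.Sprint(i))
	}
	// Replacing the map lets the GC reclaim the oversized bucket array.
	cache = shrink(cache)
	fmt.Println(len(cache)) // prints "10"
}
```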

func (*ObjectStorage) Get

func (objectStorage *ObjectStorage) Get(key []byte) CachedObject

func (*ObjectStorage) GetSize

func (objectStorage *ObjectStorage) GetSize() int

func (*ObjectStorage) Load

func (objectStorage *ObjectStorage) Load(key []byte) CachedObject

func (*ObjectStorage) LoadObjectFromStore

func (objectStorage *ObjectStorage) LoadObjectFromStore(key []byte) StorableObject

LoadObjectFromStore loads a storable object from the persistence layer.

func (*ObjectStorage) ObjectExistsInStore

func (objectStorage *ObjectStorage) ObjectExistsInStore(key []byte) bool

func (*ObjectStorage) Prune

func (objectStorage *ObjectStorage) Prune() error

func (*ObjectStorage) Put

func (objectStorage *ObjectStorage) Put(object StorableObject) CachedObject

func (*ObjectStorage) ReleaseExecutor

func (objectStorage *ObjectStorage) ReleaseExecutor() (releaseExecutor *timed.Executor)

ReleaseExecutor returns the executor that schedules releases of CachedObjects after the configured CacheTime.

func (*ObjectStorage) Shutdown

func (objectStorage *ObjectStorage) Shutdown()

func (*ObjectStorage) Store

func (objectStorage *ObjectStorage) Store(object StorableObject) CachedObject

func (*ObjectStorage) StoreIfAbsent

func (objectStorage *ObjectStorage) StoreIfAbsent(object StorableObject) (CachedObject, bool)

StoreIfAbsent stores an object only if it was not stored before. In contrast to "ComputeIfAbsent", this method does not access the value log. If the object was not stored, the returned CachedObject is nil and does not need to be released.
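The return contract described above (a value plus a flag, with nil when nothing was stored) can be sketched with a small in-memory stand-in. `memStore` and `entry` are hypothetical types for illustration only. They do not model caching, persistence, or the value log.

```go
package main

import (
	"fmt"
	"sync"
)

// entry is a hypothetical stand-in for a cached object.
type entry struct{ key, value string }

// memStore is a hypothetical in-memory store. It is not the package's ObjectStorage.
type memStore struct {
	mu    sync.Mutex
	items map[string]*entry
}

func newMemStore() *memStore {
	return &memStore{items: make(map[string]*entry)}
}

// storeIfAbsent stores the value only when the key is unused.
// It returns (entry, true) on success and (nil, false) otherwise.
func (s *memStore) storeIfAbsent(key, value string) (*entry, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.items[key]; ok {
		return nil, false
	}
	e := &entry{key: key, value: value}
	s.items[key] = e
	return e, true
}

func main() {
	s := newMemStore()
	first, stored := s.storeIfAbsent("k", "v1")
	fmt.Println(first.value, stored) // prints "v1 true"
	second, stored := s.storeIfAbsent("k", "v2")
	fmt.Println(second == nil, stored) // prints "true false"
}
```

A caller following this contract checks the boolean first and only touches (and later releases) the returned object when it is true.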

type Option

type Option func(*Options)

func CacheTime

func CacheTime(duration time.Duration) Option

func KeysOnly

func KeysOnly(keysOnly bool) Option

func LeakDetectionEnabled

func LeakDetectionEnabled(leakDetectionEnabled bool, options ...LeakDetectionOptions) Option

func LogAccess

func LogAccess(fileName string, commandsFilter ...debug.Command) Option

LogAccess sets up a logger that logs all calls to the underlying store in the given file. It is possible to filter the logged commands by providing an optional filter flag.

func OnEvictionCallback

func OnEvictionCallback(cb func(cachedObject CachedObject)) Option

OnEvictionCallback sets a function that is called on eviction of the object.

func OverrideLeakDetectionWrapper

func OverrideLeakDetectionWrapper(wrapperFunc func(cachedObject *CachedObjectImpl) LeakDetectionWrapper) Option

func PartitionKey

func PartitionKey(keyPartitions ...int) Option

func PersistenceEnabled

func PersistenceEnabled(persistenceEnabled bool) Option

func ReleaseExecutorWorkerCount

func ReleaseExecutorWorkerCount(releaseExecutorWorkerCount int) Option

ReleaseExecutorWorkerCount sets the number of workers that execute the scheduled eviction of the objects in parallel (whenever they become due).

func StoreOnCreation

func StoreOnCreation(store bool) Option

StoreOnCreation writes an object directly to the persistence layer on creation.

type Options

type Options struct {
	// contains filtered or unexported fields
}

type PartitionsManager

type PartitionsManager struct {
	// contains filtered or unexported fields
}

func NewPartitionsManager

func NewPartitionsManager() *PartitionsManager

func (*PartitionsManager) FreeMemory

func (partitionsManager *PartitionsManager) FreeMemory()

FreeMemory copies the content of the internal maps to newly created maps. This is necessary because otherwise the GC is not able to free the memory used by the old maps: "delete" doesn't shrink the maximum memory used by a map, since it only marks the entry as deleted.

func (*PartitionsManager) IsEmpty

func (partitionsManager *PartitionsManager) IsEmpty() bool

func (*PartitionsManager) IsRetained

func (partitionsManager *PartitionsManager) IsRetained(keys []string) bool

func (*PartitionsManager) Release

func (partitionsManager *PartitionsManager) Release(keysToRelease []string) bool

func (*PartitionsManager) Retain

func (partitionsManager *PartitionsManager) Retain(keysToRetain []string)

type ReadOption

type ReadOption func(opts *ReadOptions)

ReadOption is a function setting a read option.

func WithReadSkipCache

func WithReadSkipCache(skipCache bool) ReadOption

WithReadSkipCache is used to skip the elements in the cache.

func WithReadSkipStorage

func WithReadSkipStorage(skipStorage bool) ReadOption

WithReadSkipStorage is used to skip the elements in the storage.

type ReadOptions

type ReadOptions struct {
	// contains filtered or unexported fields
}

ReadOptions define options for Contains calls in the object storage.

type StorableObject

type StorableObject interface {
	// Marks the object as modified, which causes it to be written to the disk (if persistence is enabled).
	// Returns the former state of the boolean.
	SetModified(modified ...bool) (wasSet bool)

	// Returns true if the object was marked as modified.
	IsModified() bool

	// Marks the object to be deleted from the persistence layer.
	// Returns the former state of the boolean.
	//nolint:predeclared // lets keep this for now
	Delete(delete ...bool) (wasSet bool)

	// Returns true if the object was marked as deleted.
	IsDeleted() bool

	// Enables or disables persistence for this object. Objects that have persistence disabled get discarded once they
	// are evicted from the cache.
	// Returns the former state of the boolean.
	Persist(enabled ...bool) (wasSet bool)

	// Returns "true" if this object is going to be persisted.
	ShouldPersist() bool

	// ObjectStorageKey returns the bytes, that are used as a key to store the object in the k/v store.
	ObjectStorageKey() []byte

	// ObjectStorageValue returns the bytes, that are stored in the value part of the k/v store.
	ObjectStorageValue() []byte
}
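Several methods of this interface share one shape: an optional boolean argument and a return value that reports the former state. A minimal sketch of that shape follows. The `flag` type is hypothetical, and treating an omitted argument as true is an assumption of this example.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// flag is a hypothetical atomic boolean with the "optional argument,
// return former state" shape used by SetModified, Delete and Persist.
type flag struct{ v int32 }

// Set stores the given value (true when omitted, an assumption of this
// sketch) and returns the state the flag had before the call.
func (f *flag) Set(value ...bool) (wasSet bool) {
	var next int32 = 1
	if len(value) > 0 && !value[0] {
		next = 0
	}
	return atomic.SwapInt32(&f.v, next) == 1
}

// IsSet reports the current state of the flag.
func (f *flag) IsSet() bool {
	return atomic.LoadInt32(&f.v) == 1
}

func main() {
	var modified flag
	fmt.Println(modified.Set())      // prints "false": the flag was not set before
	fmt.Println(modified.Set())      // prints "true": it was already set
	fmt.Println(modified.Set(false)) // prints "true": former state, now cleared
	fmt.Println(modified.IsSet())    // prints "false"
}
```

Returning the former state lets a caller detect whether its call was the one that changed the flag, without a separate read that could race with other goroutines.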

type StorableObjectFactory

type StorableObjectFactory func(key []byte, data []byte) (result StorableObject, err error)

StorableObjectFactory is the type of the factory method that generically creates StorableObjects. It receives the key and the serialized data of the object and returns an "empty" StorableObject that just has its key set. The ObjectStorage then fully unmarshals the object by calling UnmarshalObjectStorageValue with the data. The data is nevertheless passed to the factory as well, so that it can create different object types depending on the stored data.
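As an illustration of creating different object types depending on the stored data, the sketch below uses the first byte of the data as a type tag. Everything here is hypothetical: the `storable` interface is a reduced stand-in for StorableObject, and the tag layout is an assumption made for the example, not a format the package defines.

```go
package main

import (
	"errors"
	"fmt"
)

// storable is a hypothetical, reduced stand-in for StorableObject.
type storable interface{ Kind() string }

// textObject and counterObject are hypothetical object types that start
// out "empty", with only their key set.
type textObject struct{ key []byte }
type counterObject struct{ key []byte }

func (textObject) Kind() string    { return "text" }
func (counterObject) Kind() string { return "counter" }

// newObject has the same parameter shape as a StorableObjectFactory. It
// inspects the first byte of the data (a hypothetical type tag) to decide
// which concrete type to create.
func newObject(key []byte, data []byte) (storable, error) {
	if len(data) == 0 {
		return nil, errors.New("empty data")
	}
	switch data[0] {
	case 0:
		return &textObject{key: key}, nil
	case 1:
		return &counterObject{key: key}, nil
	default:
		return nil, fmt.Errorf("unknown type tag %d", data[0])
	}
}

func main() {
	obj, err := newObject([]byte("k"), []byte{1, 42})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(obj.Kind()) // prints "counter"
}
```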

type StorableObjectFlags

type StorableObjectFlags struct {
	// contains filtered or unexported fields
}

func (*StorableObjectFlags) Delete

func (of *StorableObjectFlags) Delete(delete ...bool) (wasSet bool)

func (*StorableObjectFlags) IsDeleted

func (of *StorableObjectFlags) IsDeleted() bool

func (*StorableObjectFlags) IsModified

func (of *StorableObjectFlags) IsModified() bool

func (*StorableObjectFlags) Persist

func (of *StorableObjectFlags) Persist(persist ...bool) (wasSet bool)

func (*StorableObjectFlags) SetModified

func (of *StorableObjectFlags) SetModified(modified ...bool) (wasSet bool)

func (*StorableObjectFlags) ShouldPersist

func (of *StorableObjectFlags) ShouldPersist() bool

ShouldPersist returns "true" if this object is going to be persisted.
