Documentation ¶
Overview ¶
Package flatfs is a Datastore implementation that stores all objects in a two-level directory structure in the local file system, regardless of the hierarchy of the keys.
Package flatfs is a Datastore implementation that stores all objects in a two-level directory structure in the local file system, regardless of the hierarchy of the keys.
Index ¶
- Constants
- Variables
- func Create(path string, fun *ShardIdV1) error
- func Deljl(key string)
- func DirIsEmpty(name string) (bool, error)
- func DowngradeV1toV0(path string) error
- func Jl(key string)
- func Lz4_compress(val []byte) (value []byte)
- func Lz4_decompress(data []byte) (value []byte)
- func Move(oldPath string, newPath string, out io.Writer) error
- func Pr()
- func Snappy_compress(val []byte) (value []byte)
- func Snappy_decompress(data []byte) (value []byte)
- func UpgradeV0toV1(path string, prefixLen int) error
- func WriteReadme(dir string, id *ShardIdV1) error
- func WriteShardFunc(dir string, id *ShardIdV1) error
- func Zip_compress(val []byte) (value []byte)
- func Zip_decompress(data []byte) (value []byte)
- func Zlib_compress(val []byte) (value []byte)
- func Zlib_decompress(data []byte) (value []byte)
- func Zstd_compress(val []byte) (value []byte)
- func Zstd_decompress(data []byte) (value []byte)
- type ConcurrentMap
- func (m ConcurrentMap[K, V]) Clear()
- func (m ConcurrentMap[K, V]) Count() int
- func (m ConcurrentMap[K, V]) Get(key K) (V, bool)
- func (m ConcurrentMap[K, V]) GetShard(key K) *ConcurrentMapShared[K, V]
- func (m ConcurrentMap[K, V]) Has(key K) bool
- func (m ConcurrentMap[K, V]) IsEmpty() bool
- func (m ConcurrentMap[K, V]) Items() map[K]V
- func (m ConcurrentMap[K, V]) Iter() <-chan Tuple[K, V] (deprecated)
- func (m ConcurrentMap[K, V]) IterBuffered() <-chan Tuple[K, V]
- func (m ConcurrentMap[K, V]) IterCb(fn IterCb[K, V])
- func (m ConcurrentMap[K, V]) Keys() []K
- func (m ConcurrentMap[K, V]) MSet(data map[K]V)
- func (m ConcurrentMap[K, V]) MarshalJSON() ([]byte, error)
- func (m ConcurrentMap[K, V]) Pop(key K) (v V, exists bool)
- func (m ConcurrentMap[K, V]) Remove(key K)
- func (m ConcurrentMap[K, V]) RemoveCb(key K, cb RemoveCb[K, V]) bool
- func (m ConcurrentMap[K, V]) Set(key K, value V)
- func (m ConcurrentMap[K, V]) SetIfAbsent(key K, value V) bool
- func (m *ConcurrentMap[K, V]) UnmarshalJSON(b []byte) (err error)
- func (m ConcurrentMap[K, V]) Upsert(key K, value V, cb UpsertCb[V]) (res V)
- type ConcurrentMapShared
- type Datastore
- func (fs *Datastore) Accuracy() string
- func (fs *Datastore) Batch(_ context.Context) (datastore.Batch, error)
- func (fs *Datastore) Close() error
- func (fs *Datastore) Delete(ctx context.Context, key datastore.Key) error
- func (fs *Datastore) DiskUsage(ctx context.Context) (uint64, error)
- func (fs *Datastore) Get(ctx context.Context, key datastore.Key) (value []byte, err error)
- func (fs *Datastore) GetSize(ctx context.Context, key datastore.Key) (size int, err error)
- func (fs *Datastore) Get_writer(dir string, path string) (err error)
- func (fs *Datastore) Has(ctx context.Context, key datastore.Key) (exists bool, err error)
- func (fs *Datastore) Put(ctx context.Context, key datastore.Key, value []byte) error
- func (fs *Datastore) Query(ctx context.Context, q query.Query) (query.Results, error)
- func (fs *Datastore) ShardStr() string
- func (fs *Datastore) Sync(ctx context.Context, prefix datastore.Key) error
- func (fs *Datastore) WriteBlockhotFile(hot map[string]int, doSync bool)
- type IterCb
- type RemoveCb
- type ShardFunc
- type ShardIdV1
- type Stringer
- type Tuple
- type UpsertCb
Constants ¶
const PREFIX = "/repo/flatfs/shard/"
const README_FN = "_README"
const SHARDING_FN = "SHARDING"
const SyncThreadsMax = 16
Don't block more than 16 threads on a sync operation. 16 should be able to saturate most RAIDs: in the case of two used disks per write (RAID 1, 5) and a queue depth of 2, 16 concurrent Sync calls should be able to saturate a 16-HDD RAID. TODO: benchmark it out, maybe provide a tweak parameter.
Variables ¶
var ( // DiskUsageFile is the name of the file to cache the size of the // datastore in disk DiskUsageFile = "diskUsage.cache" // DiskUsageFilesAverage is the maximum number of files per folder // to stat in order to calculate the size of the datastore. // The size of the rest of the files in a folder will be assumed // to be the average of the values obtained. This includes // regular files and directories. DiskUsageFilesAverage = 2000 // DiskUsageCalcTimeout is the maximum time to spend // calculating the DiskUsage upon a start when no // DiskUsageFile is present. // If this period did not suffice to read the size of the datastore, // the remaining sizes will be estimated. DiskUsageCalcTimeout = 5 * time.Minute // RetryDelay is a timeout for a backoff on retrying operations // that fail due to transient errors like too many file descriptors open. RetryDelay = time.Millisecond * 200 // RetryAttempts is the maximum number of retries that will be attempted // before giving up. RetryAttempts = 6 )
var ( ErrDatastoreExists = errors.New("datastore already exists") ErrDatastoreDoesNotExist = errors.New("datastore directory does not exist") ErrShardingFileMissing = fmt.Errorf("%s file not found in datastore", SHARDING_FN) ErrClosed = errors.New("datastore closed") ErrInvalidKey = errors.New("key not supported by flatfs") )
var IPFS_DEF_SHARD = NextToLast(2)
var IPFS_DEF_SHARD_STR = IPFS_DEF_SHARD.String()
var README_IPFS_DEF_SHARD = `` /* 1123-byte string literal not displayed */
var SHARD_COUNT = 32
Functions ¶
func DirIsEmpty ¶
From: http://stackoverflow.com/questions/30697324/how-to-check-if-directory-on-path-is-empty
func DowngradeV1toV0 ¶
func Lz4_decompress ¶
func Snappy_decompress ¶
func UpgradeV0toV1 ¶
func WriteReadme ¶
func WriteShardFunc ¶
func Zip_decompress ¶
func Zlib_decompress ¶
func Zstd_decompress ¶
Types ¶
type ConcurrentMap ¶
type ConcurrentMap[K comparable, V any] struct { // contains filtered or unexported fields }
A "thread" safe map of type string:Anything. To avoid lock bottlenecks this map is divided into several (SHARD_COUNT) map shards.
func NewStringer ¶
func NewStringer[K Stringer, V any]() ConcurrentMap[K, V]
Creates a new concurrent map.
func NewWithCustomShardingFunction ¶
func NewWithCustomShardingFunction[K comparable, V any](sharding func(key K) uint32) ConcurrentMap[K, V]
Creates a new concurrent map.
func (ConcurrentMap[K, V]) Clear ¶
func (m ConcurrentMap[K, V]) Clear()
Clear removes all items from map.
func (ConcurrentMap[K, V]) Count ¶
func (m ConcurrentMap[K, V]) Count() int
Count returns the number of elements within the map.
func (ConcurrentMap[K, V]) Get ¶
func (m ConcurrentMap[K, V]) Get(key K) (V, bool)
Get retrieves an element from map under given key.
func (ConcurrentMap[K, V]) GetShard ¶
func (m ConcurrentMap[K, V]) GetShard(key K) *ConcurrentMapShared[K, V]
GetShard returns shard under given key
func (ConcurrentMap[K, V]) Has ¶
func (m ConcurrentMap[K, V]) Has(key K) bool
Looks up an item under specified key
func (ConcurrentMap[K, V]) IsEmpty ¶
func (m ConcurrentMap[K, V]) IsEmpty() bool
IsEmpty checks if map is empty.
func (ConcurrentMap[K, V]) Items ¶
func (m ConcurrentMap[K, V]) Items() map[K]V
Items returns all items as map[K]V
func (ConcurrentMap[K, V]) Iter
deprecated
func (m ConcurrentMap[K, V]) Iter() <-chan Tuple[K, V]
Iter returns an iterator which could be used in a for range loop.
Deprecated: using IterBuffered() will give better performance.
func (ConcurrentMap[K, V]) IterBuffered ¶
func (m ConcurrentMap[K, V]) IterBuffered() <-chan Tuple[K, V]
IterBuffered returns a buffered iterator which could be used in a for range loop.
func (ConcurrentMap[K, V]) IterCb ¶
func (m ConcurrentMap[K, V]) IterCb(fn IterCb[K, V])
Callback based iterator, cheapest way to read all elements in a map.
func (ConcurrentMap[K, V]) Keys ¶
func (m ConcurrentMap[K, V]) Keys() []K
Keys returns all keys as []K
func (ConcurrentMap[K, V]) MSet ¶
func (m ConcurrentMap[K, V]) MSet(data map[K]V)
func (ConcurrentMap[K, V]) MarshalJSON ¶
func (m ConcurrentMap[K, V]) MarshalJSON() ([]byte, error)
Reveals ConcurrentMap "private" variables to json marshal.
func (ConcurrentMap[K, V]) Pop ¶
func (m ConcurrentMap[K, V]) Pop(key K) (v V, exists bool)
Pop removes an element from the map and returns it
func (ConcurrentMap[K, V]) Remove ¶
func (m ConcurrentMap[K, V]) Remove(key K)
Remove removes an element from the map.
func (ConcurrentMap[K, V]) RemoveCb ¶
func (m ConcurrentMap[K, V]) RemoveCb(key K, cb RemoveCb[K, V]) bool
RemoveCb locks the shard containing the key, retrieves its current value and calls the callback with those params. If the callback returns true and the element exists, it will remove it from the map. Returns the value returned by the callback (even if the element was not present in the map).
func (ConcurrentMap[K, V]) Set ¶
func (m ConcurrentMap[K, V]) Set(key K, value V)
Sets the given value under the specified key.
func (ConcurrentMap[K, V]) SetIfAbsent ¶
func (m ConcurrentMap[K, V]) SetIfAbsent(key K, value V) bool
Sets the given value under the specified key if no value was associated with it.
func (*ConcurrentMap[K, V]) UnmarshalJSON ¶
func (m *ConcurrentMap[K, V]) UnmarshalJSON(b []byte) (err error)
Reverse process of Marshal.
func (ConcurrentMap[K, V]) Upsert ¶
func (m ConcurrentMap[K, V]) Upsert(key K, value V, cb UpsertCb[V]) (res V)
Insert or Update - updates existing element or inserts a new one using UpsertCb
type ConcurrentMapShared ¶
type ConcurrentMapShared[K comparable, V any] struct { // contains filtered or unexported fields }
A "thread" safe string to anything map.
type Datastore ¶
type Datastore struct {
// contains filtered or unexported fields
}
Datastore implements the go-datastore Interface. Note this datastore cannot guarantee order of concurrent write operations to the same key. See the explanation in Put(). 数据存储实现go数据存储接口。请注意,此数据存储不能保证对同一密钥执行并发写操作的顺序。请参见Put()中的说明。
func CreateOrOpen ¶
convenience method
func (*Datastore) Accuracy ¶
Accuracy returns a string representing the accuracy of the DiskUsage() result, the value returned is implementation defined and for informational purposes only
func (*Datastore) Delete ¶
Delete removes a key/value from the Datastore. Please read the Put() explanation about the handling of concurrent write operations to the same key.
func (*Datastore) DiskUsage ¶
DiskUsage implements the PersistentDatastore interface and returns the current disk usage in bytes used by this datastore.
The size is approximate and may slightly differ from the real disk values.
func (*Datastore) Put ¶
Put stores a key/value in the datastore.
Note, that we do not guarantee order of write operations (Put or Delete) to the same key in this datastore.
For example, in the case of two concurrent Puts, we only guarantee that one of them will come through, but cannot assure which one, even if one arrived slightly later than the other. In the case of a concurrent Put and a Delete operation, we cannot guarantee which one will win.
type IterCb ¶
type IterCb[K comparable, V any] func(key K, v V)
Iterator callback, called for every key,value found in maps. RLock is held for all calls for a given shard, therefore the callback sees a consistent view of a shard, but not across the shards.
type RemoveCb ¶
RemoveCb is a callback executed in a map.RemoveCb() call, while Lock is held. If it returns true, the element will be removed from the map.
type ShardIdV1 ¶
type ShardIdV1 struct {
// contains filtered or unexported fields
}
func NextToLast ¶
func ParseShardFunc ¶
func ReadShardFunc ¶
type Stringer ¶
type Stringer interface { fmt.Stringer comparable }
type Tuple ¶
type Tuple[K comparable, V any] struct { Key K Val V }
Used by the Iter & IterBuffered functions to wrap two variables together over a channel.