storage

package
v1.0.2-alpha Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 30, 2021 License: GPL-3.0, GPL-3.0 Imports: 23 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ChunkProcessors       = 8
	DefaultBranches int64 = 128
)
View Source
const (
	DataChunk = 0
	TreeChunk = 1
)
View Source
const (
	BMTHash  = "BMT"
	SHA3Hash = "SHA3" // http://golang.org/pkg/hash/#Hash
)

Variables

View Source
var ZeroKey = Key(common.Hash{}.Bytes())

Functions

func BytesToU64

func BytesToU64(data []byte) uint64

func IsZeroKey

func IsZeroKey(key Key) bool

func NewDpaChunkStore

func NewDpaChunkStore(localStore, netStore ChunkStore) *dpaChunkStore

func U64ToBytes

func U64ToBytes(val uint64) []byte

Types

type Chunk

type Chunk struct {
	Key    Key            // always
	SData  []byte         // nil if request, to be supplied by dpa
	Size   int64          // size of the data covered by the subtree encoded in this chunk
	Source Peer           // peer
	C      chan bool      // to signal data delivery by the dpa
	Req    *RequestStatus // request Status needed by netStore
	// contains filtered or unexported fields
}

Chunk also serves as a request object passed to ChunkStores in case it is a retrieval request, Data is nil and Size is 0 Note that Size is not the size of the data chunk, which is Data.Size() but the size of the subtree encoded in the chunk 0 if request, to be supplied by the dpa

func NewChunk

func NewChunk(key Key, rs *RequestStatus) *Chunk

func (*Chunk) String

func (self *Chunk) String() string

String() for pretty printing

type ChunkStore

type ChunkStore interface {
	Put(*Chunk) // effectively there is no error even if there is an error
	Get(Key) (*Chunk, error)
	Close()
}

The ChunkStore interface is implemented by :

- MemStore: a memory cache - DbStore: local disk/db store - LocalStore: a combination (sequence of) memStore and dbStore - NetStore: cloud storage abstraction layer - DPA: local requests for swarm storage and retrieval

type Chunker

type Chunker interface {
	Joiner
	Splitter
}

type ChunkerParams

type ChunkerParams struct {
	Branches int64
	Hash     string
}

func NewChunkerParams

func NewChunkerParams() *ChunkerParams

type CloudStore

type CloudStore interface {
	Store(*Chunk)
	Deliver(*Chunk)
	Retrieve(*Chunk)
}

backend engine for cloud store It can be aggregate dispatching to several parallel implementations: bzz/network/forwarder. forwarder or IPFS or IPΞS

type DPA

type DPA struct {
	ChunkStore

	Chunker Chunker
	// contains filtered or unexported fields
}

func NewDPA

func NewDPA(store ChunkStore, params *ChunkerParams) *DPA

func NewLocalDPA

func NewLocalDPA(datadir string) (*DPA, error)

for testing locally

func (*DPA) Retrieve

func (self *DPA) Retrieve(key Key) LazySectionReader

Public API. Main entry point for document retrieval directly. Used by the FS-aware API and httpaccess Chunk retrieval blocks on netStore requests with a timeout so reader will report error if retrieval of chunks within requested range time out.

func (*DPA) Start

func (self *DPA) Start()

func (*DPA) Stop

func (self *DPA) Stop()

func (*DPA) Store

func (self *DPA) Store(data io.Reader, size int64, swg *sync.WaitGroup, wwg *sync.WaitGroup) (key Key, err error)

Public API. Main entry point for document storage directly. Used by the FS-aware API and httpaccess

type DbStore

type DbStore struct {
	// contains filtered or unexported fields
}

func NewDbStore

func NewDbStore(path string, hash SwarmHasher, capacity uint64, radius int) (s *DbStore, err error)

func (*DbStore) Cleanup

func (s *DbStore) Cleanup()

func (*DbStore) Close

func (s *DbStore) Close()

func (*DbStore) Counter

func (s *DbStore) Counter() uint64

func (*DbStore) Export

func (s *DbStore) Export(out io.Writer) (int64, error)

Export writes all chunks from the store to a tar archive, returning the number of chunks written.

func (*DbStore) Get

func (s *DbStore) Get(key Key) (chunk *Chunk, err error)

func (*DbStore) Import

func (s *DbStore) Import(in io.Reader) (int64, error)

Import reads chunks into the store from a tar archive, returning the number of chunks read.

func (*DbStore) NewSyncIterator

func (self *DbStore) NewSyncIterator(state DbSyncState) (si *dbSyncIterator, err error)

initialises a sync iterator from a syncToken (passed in with the handshake)

func (*DbStore) Put

func (s *DbStore) Put(chunk *Chunk)

type DbSyncState

type DbSyncState struct {
	Start, Stop Key
	First, Last uint64
}
describes a section of the DbStore representing the unsynced

domain relevant to a peer Start - Stop designate a continuous area Keys in an address space typically the addresses closer to us than to the peer but not closer another closer peer in between From - To designates a time interval typically from the last disconnect till the latest connection (real time traffic is relayed)

type HashWithLength

type HashWithLength struct {
	hash.Hash
}

func (*HashWithLength) ResetWithLength

func (self *HashWithLength) ResetWithLength(length []byte)

type Hasher

type Hasher func() hash.Hash

type Joiner

type Joiner interface {
	/*
	   Join reconstructs original content based on a root key.
	   When joining, the caller gets returned a Lazy SectionReader, which is
	   seekable and implements on-demand fetching of chunks as and where it is read.
	   New chunks to retrieve are coming to caller via the Chunk channel, which the caller provides.
	   If an error is encountered during joining, it appears as a reader error.
	   The SectionReader.
	   As a result, partial reads from a document are possible even if other parts
	   are corrupt or lost.
	   The chunks are not meant to be validated by the chunker when joining. This
	   is because it is left to the DPA to decide which sources are trusted.
	*/
	Join(key Key, chunkC chan *Chunk) LazySectionReader
}

type Key

type Key []byte

func (Key) Hex

func (key Key) Hex() string

func (Key) Log

func (key Key) Log() string

func (Key) MarshalJSON

func (key Key) MarshalJSON() (out []byte, err error)

func (Key) Size

func (x Key) Size() uint

func (Key) String

func (key Key) String() string

func (*Key) UnmarshalJSON

func (key *Key) UnmarshalJSON(value []byte) error

type LDBDatabase

type LDBDatabase struct {
	// contains filtered or unexported fields
}

func NewLDBDatabase

func NewLDBDatabase(file string) (*LDBDatabase, error)

func (*LDBDatabase) Close

func (self *LDBDatabase) Close()

func (*LDBDatabase) Delete

func (self *LDBDatabase) Delete(key []byte) error

func (*LDBDatabase) Get

func (self *LDBDatabase) Get(key []byte) ([]byte, error)

func (*LDBDatabase) LastKnownTD

func (self *LDBDatabase) LastKnownTD() []byte

func (*LDBDatabase) NewIterator

func (self *LDBDatabase) NewIterator() iterator.Iterator

func (*LDBDatabase) Put

func (self *LDBDatabase) Put(key []byte, value []byte)

func (*LDBDatabase) Write

func (self *LDBDatabase) Write(batch *leveldb.Batch) error

type LazyChunkReader

type LazyChunkReader struct {
	// contains filtered or unexported fields
}

LazyChunkReader implements LazySectionReader

func (*LazyChunkReader) Read

func (self *LazyChunkReader) Read(b []byte) (read int, err error)

Read keeps a cursor so cannot be called simulateously, see ReadAt

func (*LazyChunkReader) ReadAt

func (self *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error)

read at can be called numerous times concurrent reads are allowed Size() needs to be called synchronously on the LazyChunkReader first

func (*LazyChunkReader) Seek

func (s *LazyChunkReader) Seek(offset int64, whence int) (int64, error)

func (*LazyChunkReader) Size

func (self *LazyChunkReader) Size(quitC chan bool) (n int64, err error)

Size is meant to be called on the LazySectionReader

type LazySectionReader

type LazySectionReader interface {
	Size(chan bool) (int64, error)
	io.Seeker
	io.Reader
	io.ReaderAt
}

Size, Seek, Read, ReadAt

type LazyTestSectionReader

type LazyTestSectionReader struct {
	*io.SectionReader
}

func (*LazyTestSectionReader) Size

func (self *LazyTestSectionReader) Size(chan bool) (int64, error)

type LocalStore

type LocalStore struct {
	DbStore ChunkStore
	// contains filtered or unexported fields
}

LocalStore is a combination of inmemory db over a disk persisted db implements a Get/Put with fallback (caching) logic using any 2 ChunkStores

func NewLocalStore

func NewLocalStore(hash SwarmHasher, params *StoreParams) (*LocalStore, error)

This constructor uses MemStore and DbStore as components

func (*LocalStore) CacheCounter

func (self *LocalStore) CacheCounter() uint64

func (*LocalStore) Close

func (self *LocalStore) Close()

Close local store

func (*LocalStore) DbCounter

func (self *LocalStore) DbCounter() uint64

func (*LocalStore) Get

func (self *LocalStore) Get(key Key) (chunk *Chunk, err error)

Get(chunk *Chunk) looks up a chunk in the local stores This method is blocking until the chunk is retrieved so additional timeout may be needed to wrap this call if ChunkStores are remote and can have long latency

func (*LocalStore) Put

func (self *LocalStore) Put(chunk *Chunk)

LocalStore is itself a chunk store unsafe, in that the data is not integrity checked

type MemStore

type MemStore struct {
	// contains filtered or unexported fields
}

func NewMemStore

func NewMemStore(d *DbStore, capacity uint) (m *MemStore)

func (*MemStore) Close

func (s *MemStore) Close()

Close memstore

func (*MemStore) Counter

func (s *MemStore) Counter() uint

func (*MemStore) Get

func (s *MemStore) Get(hash Key) (chunk *Chunk, err error)

func (*MemStore) Put

func (s *MemStore) Put(entry *Chunk)

entry (not its copy) is going to be in MemStore

type NetStore

type NetStore struct {
	// contains filtered or unexported fields
}

NetStore is a cloud storage access abstaction layer for swarm it contains the shared logic of network served chunk store/retrieval requests both local (coming from DPA api) and remote (coming from peers via bzz protocol) it implements the ChunkStore interface and embeds LocalStore

It is called by the bzz protocol instances via Depo (the store/retrieve request handler) a protocol instance is running on each peer, so this is heavily parallelised. NetStore falls back to a backend (CloudStorage interface) implemented by bzz/network/forwarder. forwarder or IPFS or IPΞS

func NewNetStore

func NewNetStore(hash SwarmHasher, lstore *LocalStore, cloud CloudStore, params *StoreParams) *NetStore

netstore contructor, takes path argument that is used to initialise dbStore, the persistent (disk) storage component of LocalStore the second argument is the hive, the connection/logistics manager for the node

func (*NetStore) Close

func (self *NetStore) Close()

Close netstore

func (*NetStore) Get

func (self *NetStore) Get(key Key) (*Chunk, error)

retrieve logic common for local and network chunk retrieval requests

func (*NetStore) Put

func (self *NetStore) Put(entry *Chunk)

store logic common to local and network chunk store requests ~ unsafe put in localdb no check if exists no extra copy no hash validation the chunk is forced to propagate (Cloud.Store) even if locally found! caller needs to make sure if that is wanted

type Peer

type Peer interface{}

Peer is the recorded as Source on the chunk should probably not be here? but network should wrap chunk object

type PyramidChunker

type PyramidChunker struct {
	// contains filtered or unexported fields
}

func NewPyramidChunker

func NewPyramidChunker(params *ChunkerParams) (self *PyramidChunker)

func (*PyramidChunker) Append

func (self *PyramidChunker) Append(key Key, data io.Reader, chunkC chan *Chunk, storageWG, processorWG *sync.WaitGroup) (Key, error)

func (*PyramidChunker) Join

func (self *PyramidChunker) Join(key Key, chunkC chan *Chunk) LazySectionReader

func (*PyramidChunker) Split

func (self *PyramidChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, storageWG, processorWG *sync.WaitGroup) (Key, error)

type RequestStatus

type RequestStatus struct {
	Key        Key
	Source     Peer
	C          chan bool
	Requesters map[uint64][]interface{}
}

each chunk when first requested opens a record associated with the request next time a request for the same chunk arrives, this record is updated this request status keeps track of the request ID-s as well as the requesting peers and has a channel that is closed when the chunk is retrieved. Multiple local callers can wait on this channel (or combined with a timeout, block with a select).

type Splitter

type Splitter interface {
	/*
	   When splitting, data is given as a SectionReader, and the key is a hashSize long byte slice (Key), the root hash of the entire content will fill this once processing finishes.
	   New chunks to store are coming to caller via the chunk storage channel, which the caller provides.
	   wg is a Waitgroup (can be nil) that can be used to block until the local storage finishes
	   The caller gets returned an error channel, if an error is encountered during splitting, it is fed to errC error channel.
	   A closed error signals process completion at which point the key can be considered final if there were no errors.
	*/
	Split(io.Reader, int64, chan *Chunk, *sync.WaitGroup, *sync.WaitGroup) (Key, error)

	/* This is the first step in making files mutable (not chunks)..
	   Append allows adding more data chunks to the end of the already existsing file.
	   The key for the root chunk is supplied to load the respective tree.
	   Rest of the parameters behave like Split.
	*/
	Append(Key, io.Reader, chan *Chunk, *sync.WaitGroup, *sync.WaitGroup) (Key, error)
}

Chunker is the interface to a component that is responsible for disassembling and assembling larger data and indended to be the dependency of a DPA storage system with fixed maximum chunksize.

It relies on the underlying chunking model.

When calling Split, the caller provides a channel (chan *Chunk) on which it receives chunks to store. The DPA delegates to storage layers (implementing ChunkStore interface).

Split returns an error channel, which the caller can monitor. After getting notified that all the data has been split (the error channel is closed), the caller can safely read or save the root key. Optionally it times out if not all chunks get stored or not the entire stream of data has been processed. By inspecting the errc channel the caller can check if any explicit errors (typically IO read/write failures) occurred during splitting.

When calling Join with a root key, the caller gets returned a seekable lazy reader. The caller again provides a channel on which the caller receives placeholder chunks with missing data. The DPA is supposed to forward this to the chunk stores and notify the chunker if the data has been delivered (i.e. retrieved from memory cache, disk-persisted db or cloud based swarm delivery). As the seekable reader is used, the chunker then puts these together the relevant parts on demand.

type StoreParams

type StoreParams struct {
	ChunkDbPath   string
	DbCapacity    uint64
	CacheCapacity uint
	Radius        int
}

func NewDefaultStoreParams

func NewDefaultStoreParams() (self *StoreParams)

create params with default values

func (*StoreParams) Init

func (self *StoreParams) Init(path string)

this can only finally be set after all config options (file, cmd line, env vars) have been evaluated

type SwarmHash

type SwarmHash interface {
	hash.Hash
	ResetWithLength([]byte)
}

type SwarmHasher

type SwarmHasher func() SwarmHash

func MakeHashFunc

func MakeHashFunc(hash string) SwarmHasher

type TreeChunker

type TreeChunker struct {
	// contains filtered or unexported fields
}

func NewTreeChunker

func NewTreeChunker(params *ChunkerParams) (self *TreeChunker)

func (*TreeChunker) Append

func (self *TreeChunker) Append(key Key, data io.Reader, chunkC chan *Chunk, swg, wwg *sync.WaitGroup) (Key, error)

func (*TreeChunker) Join

func (self *TreeChunker) Join(key Key, chunkC chan *Chunk) LazySectionReader

implements the Joiner interface

func (*TreeChunker) Split

func (self *TreeChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, swg, wwg *sync.WaitGroup) (Key, error)

type TreeEntry

type TreeEntry struct {
	// contains filtered or unexported fields
}

Entry to create a tree node

func NewTreeEntry

func NewTreeEntry(pyramid *PyramidChunker) *TreeEntry

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL