storer

package
v2.1.0-rc1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 9, 2024 License: BSD-3-Clause Imports: 57 Imported by: 2

Documentation

Index

Constants

View Source
const SampleSize = 16

Variables

View Source
var ErrDBQuit = errors.New("db quit")

Functions

func Compact

func Compact(ctx context.Context, basePath string, opts *Options, validate bool) error

Compact minimizes sharky disk usage by, using the current sharky locations from the storer, relocating chunks starting from the end of the used slots to the first available slots.

func ValidatePinCollectionChunks

func ValidatePinCollectionChunks(ctx context.Context, basePath, pin, location string, opts *Options) error

ValidatePinCollectionChunks collects all chunk addresses that are present in a pin collection but are either invalid or missing altogether.

func ValidateReserve added in v2.1.0

func ValidateReserve(ctx context.Context, basePath string, opts *Options) error

Validate ensures that all retrievalIndex chunks are correctly stored in sharky.

func ValidateRetrievalIndex added in v2.1.0

func ValidateRetrievalIndex(ctx context.Context, basePath string, opts *Options) error

ValidateRetrievalIndex ensures that all retrievalIndex chunks are correctly stored in sharky.

Types

type BinC

type BinC struct {
	Address swarm.Address
	BinID   uint64
	BatchID []byte
}

BinC is the result returned from the SubscribeBin channel that contains the chunk address and the binID

type CacheStat

type CacheStat struct {
	Size     int
	Capacity int
}

type CacheStore

type CacheStore interface {
	// Lookup method provides a storage.Getter wrapped around the underlying
	// ChunkStore which will update cache related indexes if required on successful
	// lookups.
	Lookup() storage.Getter
	// Cache method provides a storage.Putter which will add the chunks to cache.
	// This will add the chunk to underlying store as well as new indexes which
	// will keep track of the chunk in the cache.
	Cache() storage.Putter
}

CacheStore is a logical component of the storer that deals with cache content.

type ChunkStoreStat

type ChunkStoreStat struct {
	TotalChunks    int
	SharedSlots    int
	ReferenceCount int
}

type DB

type DB struct {
	// contains filtered or unexported fields
}

DB implements all the component stores described above.

func New

func New(ctx context.Context, dirPath string, opts *Options) (*DB, error)

New returns a newly constructed DB object which implements all the above component stores.

func (*DB) Cache

func (db *DB) Cache() storage.Putter

Cache is the implementation of the CacheStore.Cache method.

func (*DB) CacheShallowCopy

func (db *DB) CacheShallowCopy(ctx context.Context, store transaction.Storage, addrs ...swarm.Address) error

CacheShallowCopy creates cache entries with the expectation that the chunk already exists in the chunkstore.

func (*DB) ChunkStore

func (db *DB) ChunkStore() storage.ReadOnlyChunkStore

func (*DB) Close

func (db *DB) Close() error

func (*DB) DebugInfo

func (db *DB) DebugInfo(ctx context.Context) (Info, error)

func (*DB) DeletePin

func (db *DB) DeletePin(ctx context.Context, root swarm.Address) (err error)

DeletePin is the implementation of the PinStore.DeletePin method.

func (*DB) DeleteSession

func (db *DB) DeleteSession(tagID uint64) error

DeleteSession is the implementation of the UploadStore.DeleteSession method.

func (*DB) DirectUpload

func (db *DB) DirectUpload() PutterSession

DirectUpload is the implementation of the NetStore.DirectUpload method.

func (*DB) Download

func (db *DB) Download(cache bool) storage.Getter

Download is the implementation of the NetStore.Download method.

func (*DB) EvictBatch

func (db *DB) EvictBatch(ctx context.Context, batchID []byte) error

EvictBatch evicts all chunks belonging to a batch from the reserve.

func (*DB) HasPin

func (db *DB) HasPin(root swarm.Address) (has bool, err error)

HasPin is the implementation of the PinStore.HasPin method.

func (*DB) IsWithinStorageRadius

func (db *DB) IsWithinStorageRadius(addr swarm.Address) bool

func (*DB) IteratePinCollection

func (db *DB) IteratePinCollection(root swarm.Address, iterateFn func(swarm.Address) (bool, error)) error

func (*DB) ListSessions

func (db *DB) ListSessions(offset, limit int) ([]SessionInfo, error)

ListSessions is the implementation of the UploadStore.ListSessions method.

func (*DB) Lock added in v2.1.0

func (db *DB) Lock(strs ...string) func()

func (*DB) Lookup

func (db *DB) Lookup() storage.Getter

Lookup is the implementation of the CacheStore.Lookup method.

func (*DB) Metrics

func (db *DB) Metrics() []prometheus.Collector

Metrics returns set of prometheus collectors.

func (*DB) NewCollection

func (db *DB) NewCollection(ctx context.Context) (PutterSession, error)

NewCollection is the implementation of the PinStore.NewCollection method.

func (*DB) NewSession

func (db *DB) NewSession() (SessionInfo, error)

NewSession is the implementation of UploadStore.NewSession method.

func (*DB) PinIntegrity

func (db *DB) PinIntegrity() *PinIntegrity

func (*DB) Pins

func (db *DB) Pins() (address []swarm.Address, err error)

Pins is the implementation of the PinStore.Pins method.

func (*DB) PusherFeed

func (db *DB) PusherFeed() <-chan *pusher.Op

PusherFeed is the implementation of the NetStore.PusherFeed method.

func (*DB) Report

func (db *DB) Report(ctx context.Context, chunk swarm.Chunk, state storage.ChunkState) error

Report implements the storage.PushReporter by wrapping the internal reporter with a transaction.

func (*DB) ReserveGet

func (db *DB) ReserveGet(ctx context.Context, addr swarm.Address, batchID []byte) (ch swarm.Chunk, err error)

func (*DB) ReserveHas

func (db *DB) ReserveHas(addr swarm.Address, batchID []byte) (has bool, err error)

func (*DB) ReserveIterateChunks

func (db *DB) ReserveIterateChunks(cb func(swarm.Chunk) (bool, error)) error

func (*DB) ReserveLastBinIDs

func (db *DB) ReserveLastBinIDs() ([]uint64, uint64, error)

ReserveLastBinIDs returns all of the highest binIDs from all the bins in the reserve and the epoch time of the reserve.

func (*DB) ReservePutter

func (db *DB) ReservePutter() storage.Putter

ReservePutter returns a Putter for inserting chunks into the reserve.

func (*DB) ReserveSample

func (db *DB) ReserveSample(
	ctx context.Context,
	anchor []byte,
	storageRadius uint8,
	consensusTime uint64,
	minBatchBalance *big.Int,
) (Sample, error)

ReserveSample generates the sample of reserve storage of a node required for the storage incentives agent to participate in the lottery round. In order to generate this sample we need to iterate through all the chunks in the node's reserve and calculate the transformed hashes of all the chunks using the anchor as the salt. In order to generate the transformed hashes, we will use the std hmac keyed-hash implementation by using the anchor as the key. Nodes need to calculate the sample in the most optimal way and there are time restrictions. The lottery round is a time based round, so nodes participating in the round need to perform this calculation within the round limits. In order to optimize this we use a simple pipeline pattern: Iterate chunk addresses -> Get the chunk data and calculate transformed hash -> Assemble the sample

func (*DB) ReserveSize

func (db *DB) ReserveSize() int

func (*DB) ReserveSizeWithinRadius

func (db *DB) ReserveSizeWithinRadius() uint64

func (*DB) Session

func (db *DB) Session(tagID uint64) (SessionInfo, error)

Session is the implementation of the UploadStore.Session method.

func (*DB) SetRetrievalService

func (db *DB) SetRetrievalService(r retrieval.Interface)

func (*DB) StartReserveWorker

func (db *DB) StartReserveWorker(ctx context.Context, s Syncer, radius func() (uint8, error))

func (*DB) Storage added in v2.1.0

func (db *DB) Storage() transaction.Storage

func (*DB) StorageRadius

func (db *DB) StorageRadius() uint8

func (*DB) SubscribeBin

func (db *DB) SubscribeBin(ctx context.Context, bin uint8, start uint64) (<-chan *BinC, func(), <-chan error)

SubscribeBin returns a channel that feeds all the chunks in the reserve from a certain bin between a start and end binIDs.

func (*DB) SubscribePush

func (db *DB) SubscribePush(ctx context.Context) (<-chan swarm.Chunk, func())

func (*DB) Upload

func (db *DB) Upload(ctx context.Context, pin bool, tagID uint64) (PutterSession, error)

Upload is the implementation of UploadStore.Upload method.

type Debugger

type Debugger interface {
	DebugInfo(context.Context) (Info, error)
}

Debugger is a helper interface which can be used to debug the storer.

type Info

type Info struct {
	Upload     UploadStat
	Pinning    PinningStat
	Cache      CacheStat
	Reserve    ReserveStat
	ChunkStore ChunkStoreStat
}

type LocalStore

type LocalStore interface {
	ChunkStore() storage.ReadOnlyChunkStore
}

LocalStore is a read-only ChunkStore. It can be used to check if chunk is known locally, but it cannot tell what is the context of the chunk (whether it is pinned, uploaded, etc.).

type NetStore

type NetStore interface {
	// DirectUpload provides a session which can be used to push chunks directly
	// to the network.
	DirectUpload() PutterSession
	// Download provides a getter which can be used to download data. If the data
	// is found locally, its returned immediately, otherwise it is retrieved from
	// the network.
	Download(cache bool) storage.Getter
	// PusherFeed is the feed for direct push chunks. This can be used by the
	// pusher component to push out the chunks.
	PusherFeed() <-chan *pusher.Op
}

NetStore is a logical component of the storer that deals with network. It will push/retrieve chunks from the network.

type Options

type Options struct {
	// These are options related to levelDB. Currently, the underlying storage used is levelDB.
	LdbStats                  atomic.Pointer[prometheus.HistogramVec]
	LdbOpenFilesLimit         uint64
	LdbBlockCacheCapacity     uint64
	LdbWriteBufferSize        uint64
	LdbDisableSeeksCompaction bool
	Logger                    log.Logger
	Tracer                    *tracing.Tracer

	Address        swarm.Address
	WarmupDuration time.Duration
	Batchstore     postage.Storer
	ValidStamp     postage.ValidStampFn
	RadiusSetter   topology.SetStorageRadiuser
	StateStore     storage.StateStorer

	ReserveCapacity       int
	ReserveWakeUpDuration time.Duration
	ReserveMinEvictCount  uint64

	CacheCapacity      uint64
	CacheMinEvictCount uint64
}

Options provides a container to configure different things in the storer.

type PinIntegrity

type PinIntegrity struct {
	Store  storage.Store
	Sharky *sharky.Store
}

func (*PinIntegrity) Check

func (p *PinIntegrity) Check(ctx context.Context, logger log.Logger, pin string, out chan PinStat)

type PinIterator

type PinIterator interface {
	IteratePinCollection(root swarm.Address, iterateFn func(swarm.Address) (bool, error)) error
}

PinIterator is a helper interface which can be used to iterate over all the chunks in a pinning collection.

type PinStat

type PinStat struct {
	Ref                     swarm.Address
	Total, Missing, Invalid int
}

type PinStore

type PinStore interface {
	// NewCollection can be used to create a new PutterSession which writes a new
	// pinning collection. The address passed in during the Done of the session is
	// used as the root referencce.
	NewCollection(context.Context) (PutterSession, error)
	// DeletePin deletes all the chunks associated with the collection pointed to
	// by the swarm.Address passed in.
	DeletePin(context.Context, swarm.Address) error
	// Pins returns all the root references of pinning collections.
	Pins() ([]swarm.Address, error)
	// HasPin is a helper which checks if a collection exists with the root
	// reference passed in.
	HasPin(swarm.Address) (bool, error)
}

PinStore is a logical component of the storer which deals with pinning functionality.

type PinningStat

type PinningStat struct {
	TotalCollections int
	TotalChunks      int
}

type PutterSession

type PutterSession interface {
	storage.Putter
	// Done is used to close the session and optionally assign a swarm.Address to
	// this session.
	Done(swarm.Address) error
	// Cleanup is used to cleanup any state related to this session in case of
	// any error.
	Cleanup() error
}

PutterSession provides a session around the storage.Putter. The session on successful completion commits all the operations or in case of error, rolls back the state.

type RadiusChecker

type RadiusChecker interface {
	IsWithinStorageRadius(addr swarm.Address) bool
	StorageRadius() uint8
}

RadiusChecker provides the radius related functionality.

type Reserve

type Reserve interface {
	ReserveStore
	EvictBatch(ctx context.Context, batchID []byte) error
	ReserveSample(context.Context, []byte, uint8, uint64, *big.Int) (Sample, error)
	ReserveSize() int
}

Reserve is a logical component of the storer that deals with reserve content. It will implement all the core functionality required for the protocols.

type ReserveIterator

type ReserveIterator interface {
	ReserveIterateChunks(cb func(swarm.Chunk) (bool, error)) error
}

ReserveIterator is a helper interface which can be used to iterate over all the chunks in the reserve.

type ReserveStat

type ReserveStat struct {
	SizeWithinRadius int
	TotalSize        int
	Capacity         int
	LastBinIDs       []uint64
	Epoch            uint64
}

type ReserveStore

type ReserveStore interface {
	ReserveGet(ctx context.Context, addr swarm.Address, batchID []byte) (swarm.Chunk, error)
	ReserveHas(addr swarm.Address, batchID []byte) (bool, error)
	ReservePutter() storage.Putter
	SubscribeBin(ctx context.Context, bin uint8, start uint64) (<-chan *BinC, func(), <-chan error)
	ReserveLastBinIDs() ([]uint64, uint64, error)
	RadiusChecker
}

ReserveStore is a logical component of the storer that deals with reserve content. It will implement all the core functionality required for the protocols.

type Sample

type Sample struct {
	Stats SampleStats
	Items []SampleItem
}

func MakeSampleUsingChunks

func MakeSampleUsingChunks(chunks []swarm.Chunk, anchor []byte) (Sample, error)

MakeSampleUsingChunks returns Sample constructed using supplied chunks.

func RandSample

func RandSample(t *testing.T, anchor []byte) Sample

RandSample returns Sample with random values.

type SampleItem

type SampleItem struct {
	TransformedAddress swarm.Address
	ChunkAddress       swarm.Address
	ChunkData          []byte
	Stamp              *postage.Stamp
}

type SampleStats

type SampleStats struct {
	TotalDuration             time.Duration
	TotalIterated             int64
	IterationDuration         time.Duration
	SampleInserts             int64
	NewIgnored                int64
	InvalidStamp              int64
	BelowBalanceIgnored       int64
	TaddrDuration             time.Duration
	ValidStampDuration        time.Duration
	BatchesBelowValueDuration time.Duration
	RogueChunk                int64
	ChunkLoadDuration         time.Duration
	ChunkLoadFailed           int64
	StampLoadFailed           int64
}

type SessionInfo

type SessionInfo = upload.TagItem

SessionInfo is a type which exports the storer tag object. This object stores all the relevant information about a particular session.

type Syncer

type Syncer interface {
	// Number of active historical syncing jobs.
	SyncRate() float64
	Start(context.Context)
}

type UploadStat

type UploadStat struct {
	TotalUploaded uint64
	TotalSynced   uint64
	PendingUpload uint64
}

type UploadStore

type UploadStore interface {
	// Upload provides a PutterSession which is tied to the tagID. Optionally if
	// users requests to pin the data, a new pinning collection is created.
	Upload(ctx context.Context, pin bool, tagID uint64) (PutterSession, error)
	// NewSession can be used to obtain a tag ID to use for a new Upload session.
	NewSession() (SessionInfo, error)
	// Session will show the information about the session.
	Session(tagID uint64) (SessionInfo, error)
	// DeleteSession will delete the session info associated with the tag id.
	DeleteSession(tagID uint64) error
	// ListSessions will list all the Sessions currently being tracked.
	ListSessions(offset, limit int) ([]SessionInfo, error)
}

UploadStore is a logical component of the storer which deals with the upload of data to swarm.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL