dagstore

Version: v0.0.0-...-537c012

Note: this package is not in the latest version of its module.

Published: Aug 24, 2023 License: Apache-2.0, MIT Imports: 26 Imported by: 0

README

DAG store

This README will be populated soon. In the meantime, please refer to the design document.

License

Dual-licensed: MIT, Apache Software License v2, by way of the Permissive License Stack.

Documentation

Constants

This section is empty.

Variables

var (
	// ErrShardUnknown is the error returned when the requested shard is
	// not known to the DAG store.
	ErrShardUnknown = errors.New("shard not found")

	// ErrShardExists is the error returned upon registering a duplicate shard.
	ErrShardExists = errors.New("shard already exists")

	// ErrShardInitializationFailed is returned when shard initialization fails.
	ErrShardInitializationFailed = errors.New("shard initialization failed")

	// ErrShardInUse is returned when the user attempts to destroy a shard that
	// is in use.
	ErrShardInUse = errors.New("shard in use")
)
var (
	// StoreNamespace is the namespace under which shard state will be persisted.
	StoreNamespace = ds.NewKey("dagstore")
)

Functions

func RecoverImmediately

func RecoverImmediately(ctx context.Context, dagst *DAGStore, failureCh chan ShardResult, maxAttempts uint64, onDone func())

RecoverImmediately reads DAGStore failures from failureCh and attempts to recover each failed shard immediately, up to maxAttempts times per unique shard.

Attempt tracking does not survive restarts. When the passed context is done, the failure handler stops and calls the given `onDone` function before returning. Call this function from a dedicated goroutine, because it runs an event loop that does not return until then.
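
The behaviour described above (per-shard attempt counting, exit on context cancellation, `onDone` called before returning) can be sketched with standard-library types only. This is not the package's implementation: `failure`, `recoverLoop`, `recoverFn`, and `simulate` are hypothetical stand-ins, and exiting when the channel is closed is an addition made so the demo terminates.

```go
package main

import (
	"context"
	"fmt"
)

// failure is a hypothetical stand-in for dagstore.ShardResult.
type failure struct {
	Key string
	Err error
}

// recoverLoop is a hypothetical analogue of RecoverImmediately. It reads
// failures until ctx is done (or, for this demo only, until failureCh is
// closed), calls recoverFn at most maxAttempts times per key, and calls
// onDone just before returning.
func recoverLoop(ctx context.Context, failureCh <-chan failure, maxAttempts uint64, recoverFn func(key string), onDone func()) {
	defer onDone()
	attempts := make(map[string]uint64) // in memory only, so lost on restart
	for {
		select {
		case <-ctx.Done():
			return
		case f, ok := <-failureCh:
			if !ok {
				return
			}
			if attempts[f.Key] >= maxAttempts {
				continue // attempts exhausted for this shard
			}
			attempts[f.Key]++
			recoverFn(f.Key)
		}
	}
}

// simulate feeds a fixed sequence of failing keys through recoverLoop and
// returns how many recoveries were triggered per key.
func simulate(keys []string, maxAttempts uint64) map[string]int {
	ch := make(chan failure)
	done := make(chan struct{})
	recovered := make(map[string]int)
	go recoverLoop(context.Background(), ch, maxAttempts,
		func(key string) { recovered[key]++ },
		func() { close(done) })
	for _, k := range keys {
		ch <- failure{Key: k, Err: fmt.Errorf("shard %s failed", k)}
	}
	close(ch)
	<-done // onDone has run, so reading recovered is safe
	return recovered
}

func main() {
	got := simulate([]string{"a", "a", "a", "b"}, 2)
	fmt.Println(got["a"], got["b"]) // prints "2 1"
}
```

In the sketch, the third failure for shard "a" is ignored because its two allowed attempts are already used.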

Types

type AcquireOpts

type AcquireOpts struct {
}

type AllShardsInfo

type AllShardsInfo map[shard.Key]ShardInfo

type Config

type Config struct {
	// TransientsDir is the path to directory where local transient files will
	// be created for remote mounts.
	TransientsDir string

	// IndexRepo is the full index repo to use.
	IndexRepo index.FullIndexRepo

	TopLevelIndex index.Inverted

	// Datastore is the datastore where shard state will be persisted.
	Datastore ds.Datastore

	// MountRegistry contains the set of recognized mount types.
	MountRegistry *mount.Registry

	// TraceCh is a channel where the caller desires to be notified of every
	// shard operation. Publishing to this channel blocks the event loop, so the
	// caller must ensure the channel is serviced appropriately.
	//
	// Note: Not actively consuming from this channel will make the event
	// loop block.
	TraceCh chan<- Trace

	// FailureCh is a channel to be notified every time that a shard moves to
	// ShardStateErrored. A nil value will send no failure notifications.
	// Failure events can be used to evaluate the error and call
	// DAGStore.RecoverShard if deemed recoverable.
	//
	// Note: Not actively consuming from this channel will make the event
	// loop block.
	FailureCh chan<- ShardResult

	// MaxConcurrentIndex is the maximum number of indexing jobs that can
	// run concurrently. 0 (default) disables throttling.
	MaxConcurrentIndex int

	// MaxConcurrentReadyFetches is the maximum number of fetches that will
	// run concurrently for mounts that are reporting themselves as ready for
	// immediate fetch. 0 (default) disables throttling.
	MaxConcurrentReadyFetches int

	// RecoverOnStart specifies whether failed shards should be recovered
	// on start.
	RecoverOnStart RecoverOnStartPolicy

	// ShardIndexer sets a custom callback for determining the index
	// mapping of CID->Offset that should be registered for a shard.
	ShardIndexer ShardIndexer
}
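
The TraceCh and FailureCh comments warn that an unserviced channel blocks the event loop. One way to keep such a channel serviced is a dedicated consumer goroutine. The sketch below uses only the standard library: `event`, `drain`, and `countDrained` are hypothetical names, and `event` stands in for `Trace` or `ShardResult`.

```go
package main

import (
	"fmt"
	"sync"
)

// event is a hypothetical stand-in for dagstore.Trace or dagstore.ShardResult.
type event struct{ Key string }

// drain starts a goroutine that consumes ch until it is closed, passing each
// event to handle. The returned WaitGroup completes once the channel has been
// fully consumed.
func drain(ch <-chan event, handle func(event)) *sync.WaitGroup {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for ev := range ch {
			handle(ev)
		}
	}()
	return &wg
}

// countDrained publishes n events and returns how many the consumer handled.
func countDrained(n int) int {
	ch := make(chan event, 16) // a buffer absorbs short bursts
	seen := 0
	wg := drain(ch, func(event) { seen++ })
	for i := 0; i < n; i++ {
		ch <- event{Key: fmt.Sprintf("shard-%d", i)}
	}
	close(ch)
	wg.Wait()
	return seen
}

func main() {
	fmt.Println(countDrained(100)) // prints "100"
}
```

The handler should return quickly. If handling is slow, the producer still blocks once the buffer fills.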

type DAGStore

type DAGStore struct {

	// TopLevelIndex is the top-level (cid -> []shards) index that maps a cid to all the shards it is present in.
	TopLevelIndex index.Inverted
	// contains filtered or unexported fields
}

DAGStore is the central object of the DAG store.

func NewDAGStore

func NewDAGStore(cfg Config) (*DAGStore, error)

NewDAGStore constructs a new DAG store with the supplied configuration.

You must call Start for processing to begin.

func (*DAGStore) AcquireShard

func (d *DAGStore) AcquireShard(ctx context.Context, key shard.Key, out chan ShardResult, _ AcquireOpts) error

AcquireShard acquires access to the specified shard. The ShardResult delivered on the supplied channel carries a ShardAccessor, an object that enables various patterns of access to the data contained within the shard.

This operation may resolve near-instantaneously if the shard is available locally. If not, the shard data may be fetched from its mount.

This method returns an error synchronously if preliminary validation fails. Otherwise, it queues the shard for acquisition. The caller should monitor the supplied channel for a result.
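
Because the outcome arrives on a channel, callers usually wait for it while also watching their own context. A minimal sketch of that wait, using only the standard library; `result`, `await`, and `awaitDemo` are hypothetical names, with `result` standing in for `ShardResult`:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// result is a hypothetical stand-in for dagstore.ShardResult.
type result struct {
	Key string
	Err error
}

// await waits for one result on out, or gives up when ctx is done.
// It returns the error carried by the result, if any.
func await(ctx context.Context, out <-chan result) (result, error) {
	select {
	case res := <-out:
		return res, res.Err
	case <-ctx.Done():
		return result{}, ctx.Err()
	}
}

// awaitDemo calls await once with a ready result and once with a channel that
// never delivers. It returns the key received and whether the second call
// ended with a deadline error.
func awaitDemo() (string, bool) {
	out := make(chan result, 1) // buffered so the sender does not block
	out <- result{Key: "shard-1"}
	res, err := await(context.Background(), out)
	if err != nil {
		return "", false
	}

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	_, err = await(ctx, make(chan result))
	return res.Key, errors.Is(err, context.DeadlineExceeded)
}

func main() {
	key, timedOut := awaitDemo()
	fmt.Println(key, timedOut) // prints "shard-1 true"
}
```

Giving the output channel a buffer of one is a design choice in this sketch: it lets the sender complete even if the caller has already stopped waiting.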

func (*DAGStore) AllShardsInfo

func (d *DAGStore) AllShardsInfo() AllShardsInfo

AllShardsInfo returns the current state of all registered shards, as well as any errors.

func (*DAGStore) Close

func (d *DAGStore) Close() error

func (*DAGStore) DestroyShard

func (d *DAGStore) DestroyShard(ctx context.Context, key shard.Key, out chan ShardResult, _ DestroyOpts) error

func (*DAGStore) GC

func (d *DAGStore) GC(ctx context.Context) (*GCResult, error)

GC performs DAG store garbage collection by reclaiming transient files of shards that are currently available but inactive, or errored.

GC runs with exclusivity from the event loop.

func (*DAGStore) GetIterableIndex

func (d *DAGStore) GetIterableIndex(key shard.Key) (carindex.IterableIndex, error)

func (*DAGStore) GetShardInfo

func (d *DAGStore) GetShardInfo(k shard.Key) (ShardInfo, error)

GetShardInfo returns the current state of the shard with key k.

If the shard is not known, ErrShardUnknown is returned.

func (*DAGStore) RecoverShard

func (d *DAGStore) RecoverShard(ctx context.Context, key shard.Key, out chan ShardResult, _ RecoverOpts) error

RecoverShard recovers a shard in ShardStateErrored state.

If the shard referenced by the key doesn't exist, an error is returned immediately and no result is delivered on the supplied channel.

If the shard is not in the ShardStateErrored state, the operation is accepted but an error will be returned quickly on the supplied channel.

Otherwise, the recovery operation will be queued and the supplied channel will be notified when it completes.

TODO add an operation identifier to ShardResult -- starts to look like a Trace event?

func (*DAGStore) RegisterShard

func (d *DAGStore) RegisterShard(
	ctx context.Context, key shard.Key, mnt mount.Mount, out chan ShardResult, opts RegisterOpts,
) error

RegisterShard initiates the registration of a new shard.

This method returns an error synchronously if preliminary validation fails. Otherwise, it queues the shard for registration. The caller should monitor the supplied channel for a result.

func (*DAGStore) ShardsContainingMultihash

func (d *DAGStore) ShardsContainingMultihash(ctx context.Context, h mh.Multihash) ([]shard.Key, error)

func (*DAGStore) Start

func (d *DAGStore) Start(ctx context.Context) error

Start starts a DAG store.

func (*DAGStore) Stats

func (d *DAGStore) Stats() map[ShardState]int

type DestroyOpts

type DestroyOpts struct {
}

type GCResult

type GCResult struct {
	// Shards includes an entry for every shard whose transient was reclaimed.
	// Nil error values indicate success.
	Shards map[shard.Key]error
}

GCResult is the result of performing a GC operation. It holds the results from deleting unused transients.

func (*GCResult) ShardFailures

func (e *GCResult) ShardFailures() int

ShardFailures returns the number of shards whose transient reclaim failed.
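
Given the documented shape of GCResult (one entry per shard, nil error on success), counting failures amounts to counting non-nil errors. The sketch below shows one plausible way to do that. It is not the package's source: `gcResult`, `shardFailures`, and `sample` are hypothetical, and string keys stand in for `shard.Key`.

```go
package main

import (
	"errors"
	"fmt"
)

// gcResult mirrors the documented shape of GCResult, with string keys
// standing in for shard.Key.
type gcResult struct {
	Shards map[string]error
}

// shardFailures counts the entries whose error is non-nil.
func (r *gcResult) shardFailures() int {
	n := 0
	for _, err := range r.Shards {
		if err != nil {
			n++
		}
	}
	return n
}

// sample builds a result with two successful reclaims and one failure.
func sample() *gcResult {
	return &gcResult{Shards: map[string]error{
		"shard-a": nil,
		"shard-b": errors.New("remove transient: permission denied"),
		"shard-c": nil,
	}}
}

func main() {
	res := sample()
	failed := res.shardFailures()
	fmt.Printf("reclaimed=%d failed=%d\n", len(res.Shards)-failed, failed)
	// prints "reclaimed=2 failed=1"
}
```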

type Interface

type Interface interface {
	Start(ctx context.Context) error
	RegisterShard(ctx context.Context, key shard.Key, mnt mount.Mount, out chan ShardResult, opts RegisterOpts) error
	DestroyShard(ctx context.Context, key shard.Key, out chan ShardResult, _ DestroyOpts) error
	AcquireShard(ctx context.Context, key shard.Key, out chan ShardResult, _ AcquireOpts) error
	RecoverShard(ctx context.Context, key shard.Key, out chan ShardResult, _ RecoverOpts) error
	GetShardInfo(k shard.Key) (ShardInfo, error)
	GetIterableIndex(key shard.Key) (carindex.IterableIndex, error)
	AllShardsInfo() AllShardsInfo
	ShardsContainingMultihash(ctx context.Context, h mh.Multihash) ([]shard.Key, error)
	GC(ctx context.Context) (*GCResult, error)
	Close() error
}

Interface is the publicly exposed interface of the DAGStore. It exists for mocking or DI purposes.

type OpType

type OpType int
const (
	OpShardRegister OpType = iota
	OpShardInitialize
	OpShardMakeAvailable
	OpShardDestroy
	OpShardAcquire
	OpShardFail
	OpShardRelease
	OpShardRecover
)

func (OpType) String

func (o OpType) String() string

type PersistedShard

type PersistedShard struct {
	Key           string     `json:"k"`
	URL           string     `json:"u"`
	TransientPath string     `json:"t"`
	State         ShardState `json:"s"`
	Lazy          bool       `json:"l"`
	Error         string     `json:"e"`
}
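
The struct tags above determine the persisted JSON keys. The sketch below copies the field layout into local stand-in types (`persistedShard`, `shardState`) to show the resulting encoding with `encoding/json`. The field values are made up for illustration, and `encode` is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// shardState is a local stand-in for ShardState (a byte-sized integer).
type shardState byte

// persistedShard copies the documented fields and JSON tags of PersistedShard.
type persistedShard struct {
	Key           string     `json:"k"`
	URL           string     `json:"u"`
	TransientPath string     `json:"t"`
	State         shardState `json:"s"`
	Lazy          bool       `json:"l"`
	Error         string     `json:"e"`
}

// encode serializes p using the short keys declared in the struct tags.
func encode(p persistedShard) (string, error) {
	b, err := json.Marshal(p)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	s, err := encode(persistedShard{
		Key:   "shard-1",              // illustrative value
		URL:   "example://shard-1",    // illustrative value, not a real mount URL
		State: 2,                      // numeric value of ShardStateAvailable
	})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(s)
	// prints {"k":"shard-1","u":"example://shard-1","t":"","s":2,"l":false,"e":""}
}
```

With these stand-in types the state is written as a number, since a byte-sized integer type without a custom marshaller encodes as a JSON number.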

PersistedShard is the persistent representation of the Shard.

type ReadBlockstore

type ReadBlockstore interface {
	Has(context.Context, cid.Cid) (bool, error)
	Get(context.Context, cid.Cid) (blocks.Block, error)
	GetSize(context.Context, cid.Cid) (int, error)
	AllKeysChan(ctx context.Context) (<-chan cid.Cid, error)
	HashOnRead(enabled bool)
}

ReadBlockstore is a read-only view of Blockstores. This will be implemented by the CARv2 indexed blockstore.

type RecoverOnStartPolicy

type RecoverOnStartPolicy int

RecoverOnStartPolicy specifies the recovery policy for failed shards on DAGStore start.

const (
	// DoNotRecover will not recover any failed shards on start. Recovery
	// must be performed manually.
	DoNotRecover RecoverOnStartPolicy = iota

	// RecoverOnAcquire will automatically queue a recovery for a failed shard
	// on the first acquire attempt, and will park that acquire while recovery
	// is in progress.
	RecoverOnAcquire

	// RecoverNow will eagerly trigger a recovery for all failed shards
	// upon start.
	RecoverNow
)

type RecoverOpts

type RecoverOpts struct {
}

type RegisterOpts

type RegisterOpts struct {
	// ExistingTransient can be supplied when registering a shard to indicate
	// that there's already an existing local transient copy that can be used
	// for indexing.
	ExistingTransient string

	// LazyInitialization defers shard indexing to the first access instead of
	// performing it at registration time. Use this option when fetching the
	// asset is expensive.
	//
	// When true, the registration channel will fire as soon as the DAG store
	// has acknowledged the inclusion of the shard, without waiting for any
	// indexing to happen.
	LazyInitialization bool
}

type Shard

type Shard struct {
	// contains filtered or unexported fields
}

Shard encapsulates the state of a shard within the DAG store.

func (*Shard) MarshalJSON

func (s *Shard) MarshalJSON() ([]byte, error)

MarshalJSON returns a serialized representation of the state. It must be called with a shard lock (read, at least), such as from inside the event loop, as it accesses mutable state.

func (*Shard) UnmarshalJSON

func (s *Shard) UnmarshalJSON(b []byte) error

type ShardAccessor

type ShardAccessor struct {
	// contains filtered or unexported fields
}

ShardAccessor provides various means to access the data contained in a shard.

func NewShardAccessor

func NewShardAccessor(data mount.Reader, idx index.Index, s *Shard) (*ShardAccessor, error)

func (*ShardAccessor) Blockstore

func (sa *ShardAccessor) Blockstore() (ReadBlockstore, error)

func (*ShardAccessor) Close

func (sa *ShardAccessor) Close() error

Close terminates this shard accessor, releasing any resources associated with it, and decrementing internal refcounts.

func (*ShardAccessor) Reader

func (sa *ShardAccessor) Reader() io.Reader

Reader returns an io.Reader that can be used to read the data from the shard.

func (*ShardAccessor) Shard

func (sa *ShardAccessor) Shard() shard.Key

type ShardIndexer

type ShardIndexer func(context.Context, shard.Key, mount.Reader) (carindex.Index, error)

type ShardInfo

type ShardInfo struct {
	ShardState
	Error error
	// contains filtered or unexported fields
}

type ShardResult

type ShardResult struct {
	Key      shard.Key
	Error    error
	Accessor *ShardAccessor
}

ShardResult encapsulates a result from an asynchronous operation.

type ShardState

type ShardState byte
const (
	// ShardStateNew indicates that a shard has just been registered and is
	// about to be processed for activation.
	ShardStateNew ShardState = iota

	// ShardStateInitializing indicates that the shard is being initialized
	// by being fetched from the mount and being indexed.
	ShardStateInitializing

	// ShardStateAvailable indicates that the shard has been initialized and is
	// active for serving queries. There are no active shard readers.
	ShardStateAvailable

	// ShardStateServing indicates the shard has active readers and thus is
	// currently actively serving requests.
	ShardStateServing

	// ShardStateRecovering indicates that the shard is recovering from an
	// errored state. Such recoveries are always initiated by the user through
	// DAGStore.RecoverShard().
	ShardStateRecovering ShardState = 0x80

	// ShardStateErrored indicates that an unexpected error was encountered
	// during a shard operation, and therefore the shard needs to be recovered.
	ShardStateErrored ShardState = 0xf0

	// ShardStateUnknown indicates that it's not possible to determine the state
	// of the shard. This state is currently unused, but it's reserved.
	ShardStateUnknown ShardState = 0xff
)
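
The constant block mixes `iota` with explicit values, so the numeric values are not contiguous. The sketch below re-declares the constants locally, without the package's String method, to show the values that result. `values` is a hypothetical helper.

```go
package main

import "fmt"

// ShardState and its constants are re-declared here exactly as listed above
// so that the program compiles on its own.
type ShardState byte

const (
	ShardStateNew ShardState = iota
	ShardStateInitializing
	ShardStateAvailable
	ShardStateServing
	ShardStateRecovering ShardState = 0x80
	ShardStateErrored    ShardState = 0xf0
	ShardStateUnknown    ShardState = 0xff
)

// values returns the numeric value of every state in declaration order.
func values() []int {
	states := []ShardState{
		ShardStateNew, ShardStateInitializing, ShardStateAvailable,
		ShardStateServing, ShardStateRecovering, ShardStateErrored,
		ShardStateUnknown,
	}
	out := make([]int, len(states))
	for i, s := range states {
		out[i] = int(s)
	}
	return out
}

func main() {
	fmt.Println(values()) // prints "[0 1 2 3 128 240 255]"
}
```

The first four states take the values 0 through 3 from `iota`, while the recovering, errored, and unknown states are pinned to 0x80, 0xf0, and 0xff.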

func (ShardState) String

func (ss ShardState) String() string

type Trace

type Trace struct {
	Key   shard.Key
	Op    OpType
	After ShardInfo
}

Directories

Path Synopsis
Package throttle includes throttlers for composing in various contexts, such as inside Mounts for costly operations, and within the DAG store itself.
