Documentation ¶
Index ¶
- Variables
- func RecoverImmediately(ctx context.Context, dagst *DAGStore, failureCh chan ShardResult, ...)
- type AcquireOpts
- type AllShardsInfo
- type Config
- type DAGStore
- func (d *DAGStore) AcquireShard(ctx context.Context, key shard.Key, out chan ShardResult, _ AcquireOpts) error
- func (d *DAGStore) AllShardsInfo() AllShardsInfo
- func (d *DAGStore) Close() error
- func (d *DAGStore) DestroyShard(ctx context.Context, key shard.Key, out chan ShardResult, _ DestroyOpts) error
- func (d *DAGStore) GC(ctx context.Context) (*GCResult, error)
- func (d *DAGStore) GetIterableIndex(key shard.Key) (carindex.IterableIndex, error)
- func (d *DAGStore) GetShardInfo(k shard.Key) (ShardInfo, error)
- func (d *DAGStore) RecoverShard(ctx context.Context, key shard.Key, out chan ShardResult, _ RecoverOpts) error
- func (d *DAGStore) RegisterShard(ctx context.Context, key shard.Key, mnt mount.Mount, out chan ShardResult, ...) error
- func (d *DAGStore) ShardsContainingMultihash(ctx context.Context, h mh.Multihash) ([]shard.Key, error)
- func (d *DAGStore) Start(ctx context.Context) error
- func (d *DAGStore) Stats() map[ShardState]int
- type DestroyOpts
- type GCResult
- type Interface
- type OpType
- type PersistedShard
- type ReadBlockstore
- type RecoverOnStartPolicy
- type RecoverOpts
- type RegisterOpts
- type Shard
- type ShardAccessor
- type ShardIndexer
- type ShardInfo
- type ShardResult
- type ShardState
- type Trace
Constants ¶
This section is empty.
Variables ¶
var ( // ErrShardUnknown is the error returned when the requested shard is // not known to the DAG store. ErrShardUnknown = errors.New("shard not found") // ErrShardExists is the error returned upon registering a duplicate shard. ErrShardExists = errors.New("shard already exists") // ErrShardInitializationFailed is returned when shard initialization fails. ErrShardInitializationFailed = errors.New("shard initialization failed") // ErrShardInUse is returned when the user attempts to destroy a shard that // is in use. ErrShardInUse = errors.New("shard in use") )
var ( // StoreNamespace is the namespace under which shard state will be persisted. StoreNamespace = ds.NewKey("dagstore") )
Functions ¶
func RecoverImmediately ¶
func RecoverImmediately(ctx context.Context, dagst *DAGStore, failureCh chan ShardResult, maxAttempts uint64, onDone func())
RecoverImmediately takes a failureCh where DAGStore failures are sent, and attempts to recover the shard immediately up until maxAttempts for each unique shard.
Attempt tracking does not survive restarts. When the passed context fires, the failure handler will yield and the given `onDone` function is called before returning. It is recommended to call this method from a dedicated goroutine, as it runs an infinite event loop.
Types ¶
type AcquireOpts ¶
type AcquireOpts struct { }
type AllShardsInfo ¶
type Config ¶
type Config struct { // TransientsDir is the path to directory where local transient files will // be created for remote mounts. TransientsDir string // IndexRepo is the full index repo to use. IndexRepo index.FullIndexRepo TopLevelIndex index.Inverted // Datastore is the datastore where shard state will be persisted. Datastore ds.Datastore // MountRegistry contains the set of recognized mount types. MountRegistry *mount.Registry // TraceCh is a channel where the caller desires to be notified of every // shard operation. Publishing to this channel blocks the event loop, so the // caller must ensure the channel is serviced appropriately. // // Note: Not actively consuming from this channel will make the event // loop block. TraceCh chan<- Trace // FailureCh is a channel to be notified every time that a shard moves to // ShardStateErrored. A nil value will send no failure notifications. // Failure events can be used to evaluate the error and call // DAGStore.RecoverShard if deemed recoverable. // // Note: Not actively consuming from this channel will make the event // loop block. FailureCh chan<- ShardResult // MaxConcurrentIndex is the maximum indexing jobs that can // run concurrently. 0 (default) disables throttling. MaxConcurrentIndex int // MaxConcurrentReadyFetches is the maximum number of fetches that will // run concurrently for mounts that are reporting themselves as ready for // immediate fetch. 0 (default) disables throttling. MaxConcurrentReadyFetches int // RecoverOnStart specifies whether failed shards should be recovered // on start. RecoverOnStart RecoverOnStartPolicy // ShardIndexer sets a custom callback for determining the index // mapping of CID->Offset that should be registered for a shard. ShardIndexer ShardIndexer }
type DAGStore ¶
type DAGStore struct { // TopLevelIndex is the top level (cid -> []shards) index that maps a cid to all the shards that is present in. TopLevelIndex index.Inverted // contains filtered or unexported fields }
DAGStore is the central object of the DAG store.
func NewDAGStore ¶
NewDAGStore constructs a new DAG store with the supplied configuration.
You must call Start for processing to begin.
func (*DAGStore) AcquireShard ¶
func (d *DAGStore) AcquireShard(ctx context.Context, key shard.Key, out chan ShardResult, _ AcquireOpts) error
AcquireShard acquires access to the specified shard, and returns a ShardAccessor, an object that enables various patterns of access to the data contained within the shard.
This operation may resolve near-instantaneously if the shard is available locally. If not, the shard data may be fetched from its mount.
This method returns an error synchronously if preliminary validation fails. Otherwise, it queues the shard for acquisition. The caller should monitor supplied channel for a result.
func (*DAGStore) AllShardsInfo ¶
func (d *DAGStore) AllShardsInfo() AllShardsInfo
AllShardsInfo returns the current state of all registered shards, as well as any errors.
func (*DAGStore) DestroyShard ¶
func (d *DAGStore) DestroyShard(ctx context.Context, key shard.Key, out chan ShardResult, _ DestroyOpts) error
func (*DAGStore) GC ¶
GC performs DAG store garbage collection by reclaiming transient files of shards that are currently available but inactive, or errored.
GC runs with exclusivity from the event loop.
func (*DAGStore) GetIterableIndex ¶
func (*DAGStore) GetShardInfo ¶
GetShardInfo returns the current state of shard with key k.
If the shard is not known, ErrShardUnknown is returned.
func (*DAGStore) RecoverShard ¶
func (d *DAGStore) RecoverShard(ctx context.Context, key shard.Key, out chan ShardResult, _ RecoverOpts) error
RecoverShard recovers a shard in ShardStateErrored state.
If the shard referenced by the key doesn't exist, an error is returned immediately and no result is delivered on the supplied channel.
If the shard is not in the ShardStateErrored state, the operation is accepted but an error will be returned quickly on the supplied channel.
Otherwise, the recovery operation will be queued and the supplied channel will be notified when it completes.
TODO add an operation identifier to ShardResult -- starts to look like
a Trace event?
func (*DAGStore) RegisterShard ¶
func (d *DAGStore) RegisterShard( ctx context.Context, key shard.Key, mnt mount.Mount, out chan ShardResult, opts RegisterOpts, ) error
RegisterShard initiates the registration of a new shard.
This method returns an error synchronously if preliminary validation fails. Otherwise, it queues the shard for registration. The caller should monitor supplied channel for a result.
func (*DAGStore) ShardsContainingMultihash ¶
func (*DAGStore) Stats ¶
func (d *DAGStore) Stats() map[ShardState]int
type DestroyOpts ¶
type DestroyOpts struct { }
type GCResult ¶
type GCResult struct { // Shards includes an entry for every shard whose transient was reclaimed. // Nil error values indicate success. Shards map[shard.Key]error }
GCResult is the result of performing a GC operation. It holds the results from deleting unused transients.
func (*GCResult) ShardFailures ¶
ShardFailures returns the number of shards whose transient reclaim failed.
type Interface ¶
type Interface interface { Start(ctx context.Context) error RegisterShard(ctx context.Context, key shard.Key, mnt mount.Mount, out chan ShardResult, opts RegisterOpts) error DestroyShard(ctx context.Context, key shard.Key, out chan ShardResult, _ DestroyOpts) error AcquireShard(ctx context.Context, key shard.Key, out chan ShardResult, _ AcquireOpts) error RecoverShard(ctx context.Context, key shard.Key, out chan ShardResult, _ RecoverOpts) error GetShardInfo(k shard.Key) (ShardInfo, error) GetIterableIndex(key shard.Key) (carindex.IterableIndex, error) AllShardsInfo() AllShardsInfo ShardsContainingMultihash(ctx context.Context, h mh.Multihash) ([]shard.Key, error) GC(ctx context.Context) (*GCResult, error) Close() error }
Interface is the publicly exposed interface of the DAGStore. It exists for mocking or DI purposes.
type PersistedShard ¶
type PersistedShard struct { Key string `json:"k"` URL string `json:"u"` TransientPath string `json:"t"` State ShardState `json:"s"` Lazy bool `json:"l"` Error string `json:"e"` }
PersistedShard is the persistent representation of the Shard.
type ReadBlockstore ¶
type ReadBlockstore interface { Has(context.Context, cid.Cid) (bool, error) Get(context.Context, cid.Cid) (blocks.Block, error) GetSize(context.Context, cid.Cid) (int, error) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) HashOnRead(enabled bool) }
ReadBlockstore is a read-only view of Blockstores. This will be implemented by the CARv2 indexed blockstore.
type RecoverOnStartPolicy ¶
type RecoverOnStartPolicy int
RecoverOnStartPolicy specifies the recovery policy for failed shards on DAGStore start.
const ( // DoNotRecover will not recover any failed shards on start. Recovery // must be performed manually. DoNotRecover RecoverOnStartPolicy = iota // RecoverOnAcquire will automatically queue a recovery for a failed shard // on the first acquire attempt, and will park that acquire while recovery // is in progress. RecoverOnAcquire // RecoverNow will eagerly trigger a recovery for all failed shards // upon start. RecoverNow )
type RecoverOpts ¶
type RecoverOpts struct { }
type RegisterOpts ¶
type RegisterOpts struct { // ExistingTransient can be supplied when registering a shard to indicate // that there's already an existing local transient copy that can be used // for indexing. ExistingTransient string // LazyInitialization defers shard indexing to the first access instead of // performing it at registration time. Use this option when fetching the // asset is expensive. // // When true, the registration channel will fire as soon as the DAG store // has acknowledged the inclusion of the shard, without waiting for any // indexing to happen. LazyInitialization bool }
type Shard ¶
type Shard struct {
// contains filtered or unexported fields
}
Shard encapsulates the state of a shard within the DAG store.
func (*Shard) MarshalJSON ¶
MarshalJSON returns a serialized representation of the state. It must be called with a shard lock (read, at least), such as from inside the event loop, as it accesses mutable state.
func (*Shard) UnmarshalJSON ¶
type ShardAccessor ¶
type ShardAccessor struct {
// contains filtered or unexported fields
}
ShardAccessor provides various means to access the data contained in a shard.
func NewShardAccessor ¶
func (*ShardAccessor) Blockstore ¶
func (sa *ShardAccessor) Blockstore() (ReadBlockstore, error)
func (*ShardAccessor) Close ¶
func (sa *ShardAccessor) Close() error
Close terminates this shard accessor, releasing any resources associated with it, and decrementing internal refcounts.
func (*ShardAccessor) Reader ¶
func (sa *ShardAccessor) Reader() io.Reader
Reader returns an io.Reader that can be used to read the data from the shard.
func (*ShardAccessor) Shard ¶
func (sa *ShardAccessor) Shard() shard.Key
type ShardIndexer ¶
type ShardInfo ¶
type ShardInfo struct { ShardState Error error // contains filtered or unexported fields }
type ShardResult ¶
type ShardResult struct { Key shard.Key Error error Accessor *ShardAccessor }
ShardResult encapsulates a result from an asynchronous operation.
type ShardState ¶
type ShardState byte
const ( // ShardStateNew indicates that a shard has just been registered and is // about to be processed for activation. ShardStateNew ShardState = iota // ShardStateInitializing indicates that the shard is being initialized // by being fetched from the mount and being indexed. ShardStateInitializing // ShardStateAvailable indicates that the shard has been initialized and is // active for serving queries. There are no active shard readers. ShardStateAvailable // ShardStateServing indicates the shard has active readers and thus is // currently actively serving requests. ShardStateServing // ShardStateRecovering indicates that the shard is recovering from an // errored state. Such recoveries are always initiated by the user through // DAGStore.RecoverShard(). ShardStateRecovering ShardState = 0x80 // ShardStateErrored indicates that an unexpected error was encountered // during a shard operation, and therefore the shard needs to be recovered. ShardStateErrored ShardState = 0xf0 // ShardStateUnknown indicates that it's not possible to determine the state // of the shard. This state is currently unused, but it's reserved. ShardStateUnknown ShardState = 0xff )
func (ShardState) String ¶
func (ss ShardState) String() string
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
Package throttle includes throttlers for composing in various contexts, such as inside Mounts for costly operations, and within the DAG store itself.
|
Package throttle includes throttlers for composing in various contexts, such as inside Mounts for costly operations, and within the DAG store itself. |