Documentation ¶
Index ¶
- Constants
- Variables
- func BlockDiff(ctx context.Context, bs blockstore.Blockstore, oldroot cid.Cid, ...) (map[cid.Cid]bool, error)
- func LdWrite(w io.Writer, d ...[]byte) (int64, error)
- func WriteCarHeader(w io.Writer, root cid.Cid) (int64, error)
- type CarShard
- type CarStore
- type CarStoreGormMeta
- func (cs *CarStoreGormMeta) DeleteShardsAndRefs(ctx context.Context, ids []uint) error
- func (cs *CarStoreGormMeta) GetBlockRefsForShards(ctx context.Context, shardIds []uint) ([]blockRef, error)
- func (cs *CarStoreGormMeta) GetCompactionTargets(ctx context.Context, minShardCount int) ([]CompactionTarget, error)
- func (cs *CarStoreGormMeta) GetLastShard(ctx context.Context, user models.Uid) (*CarShard, error)
- func (cs *CarStoreGormMeta) GetUserShards(ctx context.Context, usr models.Uid) ([]CarShard, error)
- func (cs *CarStoreGormMeta) GetUserShardsDesc(ctx context.Context, usr models.Uid, minSeq int) ([]CarShard, error)
- func (cs *CarStoreGormMeta) GetUserStaleRefs(ctx context.Context, user models.Uid) ([]staleRef, error)
- func (cs *CarStoreGormMeta) HasUidCid(ctx context.Context, user models.Uid, k cid.Cid) (bool, error)
- func (cs *CarStoreGormMeta) Init() error
- func (cs *CarStoreGormMeta) LookupBlockRef(ctx context.Context, k cid.Cid) (path string, offset int64, user models.Uid, err error)
- func (cs *CarStoreGormMeta) PutShardAndRefs(ctx context.Context, shard *CarShard, brefs []map[string]any, ...) error
- func (cs *CarStoreGormMeta) SeqForRev(ctx context.Context, user models.Uid, sinceRev string) (int, error)
- func (cs *CarStoreGormMeta) SetStaleRef(ctx context.Context, uid models.Uid, staleToKeep []cid.Cid) error
- type CompactionStats
- type CompactionTarget
- type DeltaSession
- func (ds *DeltaSession) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error)
- func (ds *DeltaSession) BaseCid() cid.Cid
- func (ds *DeltaSession) CalcDiff(ctx context.Context, skipcids map[cid.Cid]bool) error
- func (ds *DeltaSession) CloseWithRoot(ctx context.Context, root cid.Cid, rev string) ([]byte, error)
- func (ds *DeltaSession) DeleteBlock(ctx context.Context, c cid.Cid) error
- func (ds *DeltaSession) Get(ctx context.Context, c cid.Cid) (blockformat.Block, error)
- func (ds *DeltaSession) GetSize(ctx context.Context, c cid.Cid) (int, error)
- func (ds *DeltaSession) Has(ctx context.Context, c cid.Cid) (bool, error)
- func (ds *DeltaSession) HashOnRead(hor bool)
- func (ds *DeltaSession) Put(ctx context.Context, b blockformat.Block) error
- func (ds *DeltaSession) PutMany(ctx context.Context, bs []blockformat.Block) error
- type ExponentialBackoffRetryPolicy
- type FileCarStore
- func (cs *FileCarStore) CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error)
- func (cs *FileCarStore) GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error)
- func (cs *FileCarStore) GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error)
- func (cs *FileCarStore) GetUserRepoRev(ctx context.Context, user models.Uid) (string, error)
- func (cs *FileCarStore) ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error)
- func (cs *FileCarStore) NewDeltaSession(ctx context.Context, user models.Uid, since *string) (*DeltaSession, error)
- func (cs *FileCarStore) ReadOnlySession(user models.Uid) (*DeltaSession, error)
- func (cs *FileCarStore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, ...) error
- func (cs *FileCarStore) Stat(ctx context.Context, usr models.Uid) ([]UserStat, error)
- func (cs *FileCarStore) WipeUserData(ctx context.Context, user models.Uid) error
- type LastShardSource
- type NonArchivalCarstore
- func (cs *NonArchivalCarstore) CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error)
- func (cs *NonArchivalCarstore) GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error)
- func (cs *NonArchivalCarstore) GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error)
- func (cs *NonArchivalCarstore) GetUserRepoRev(ctx context.Context, user models.Uid) (string, error)
- func (cs *NonArchivalCarstore) HasUidCid(ctx context.Context, user models.Uid, k cid.Cid) (bool, error)
- func (cs *NonArchivalCarstore) ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error)
- func (cs *NonArchivalCarstore) LookupBlockRef(ctx context.Context, k cid.Cid) (path string, offset int64, user models.Uid, err error)
- func (cs *NonArchivalCarstore) NewDeltaSession(ctx context.Context, user models.Uid, since *string) (*DeltaSession, error)
- func (cs *NonArchivalCarstore) ReadOnlySession(user models.Uid) (*DeltaSession, error)
- func (cs *NonArchivalCarstore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, ...) error
- func (cs *NonArchivalCarstore) Stat(ctx context.Context, usr models.Uid) ([]UserStat, error)
- func (cs *NonArchivalCarstore) WipeUserData(ctx context.Context, user models.Uid) error
- type SQLiteStore
- func (sqs *SQLiteStore) CarStore() CarStore
- func (sqs *SQLiteStore) Close() error
- func (sqs *SQLiteStore) CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error)
- func (sqs *SQLiteStore) GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error)
- func (sqs *SQLiteStore) GetLastShard(ctx context.Context, uid models.Uid) (*CarShard, error)
- func (sqs *SQLiteStore) GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error)
- func (sqs *SQLiteStore) GetUserRepoRev(ctx context.Context, user models.Uid) (string, error)
- func (sqs *SQLiteStore) HasUidCid(ctx context.Context, user models.Uid, bcid cid.Cid) (bool, error)
- func (sqs *SQLiteStore) ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error)
- func (sqs *SQLiteStore) NewDeltaSession(ctx context.Context, user models.Uid, since *string) (*DeltaSession, error)
- func (sqs *SQLiteStore) Open(path string) error
- func (sqs *SQLiteStore) ReadOnlySession(user models.Uid) (*DeltaSession, error)
- func (sqs *SQLiteStore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, ...) error
- func (sqs *SQLiteStore) Stat(ctx context.Context, usr models.Uid) ([]UserStat, error)
- func (sqs *SQLiteStore) WipeUserData(ctx context.Context, user models.Uid) error
- type ScyllaStore
- func (sqs *ScyllaStore) CarStore() CarStore
- func (sqs *ScyllaStore) Close() error
- func (sqs *ScyllaStore) CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error)
- func (sqs *ScyllaStore) GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error)
- func (sqs *ScyllaStore) GetLastShard(ctx context.Context, uid models.Uid) (*CarShard, error)
- func (sqs *ScyllaStore) GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error)
- func (sqs *ScyllaStore) GetUserRepoRev(ctx context.Context, user models.Uid) (string, error)
- func (sqs *ScyllaStore) HasUidCid(ctx context.Context, user models.Uid, bcid cid.Cid) (bool, error)
- func (sqs *ScyllaStore) ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error)
- func (sqs *ScyllaStore) NewDeltaSession(ctx context.Context, user models.Uid, since *string) (*DeltaSession, error)
- func (sqs *ScyllaStore) Open() error
- func (sqs *ScyllaStore) ReadOnlySession(user models.Uid) (*DeltaSession, error)
- func (sqs *ScyllaStore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, ...) error
- func (sqs *ScyllaStore) Stat(ctx context.Context, usr models.Uid) ([]UserStat, error)
- func (sqs *ScyllaStore) WipeUserData(ctx context.Context, user models.Uid) error
- type UserStat
Constants ¶
const BigShardThreshold = 2 << 20
const MaxSliceLength = 2 << 20
Variables ¶
var CacheHits int64
var CacheMiss int64
var ErrNothingThere = errors.New("nothing to read)")
var ErrRepoBaseMismatch = fmt.Errorf("attempted a delta session on top of the wrong previous head")
Functions ¶
func BlockDiff ¶
func BlockDiff(ctx context.Context, bs blockstore.Blockstore, oldroot cid.Cid, newcids map[cid.Cid]blockformat.Block, skipcids map[cid.Cid]bool) (map[cid.Cid]bool, error)
Types ¶
type CarShard ¶
type CarShard struct { ID uint `gorm:"primarykey"` CreatedAt time.Time Root models.DbCID `gorm:"index"` DataStart int64 Seq int `gorm:"index:idx_car_shards_seq;index:idx_car_shards_usr_seq,priority:2,sort:desc"` Path string Usr models.Uid `gorm:"index:idx_car_shards_usr;index:idx_car_shards_usr_seq,priority:1"` Rev string }
type CarStore ¶
type CarStore interface { // TODO: not really part of general interface CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error) // TODO: not really part of general interface GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error) GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error) GetUserRepoRev(ctx context.Context, user models.Uid) (string, error) ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error) NewDeltaSession(ctx context.Context, user models.Uid, since *string) (*DeltaSession, error) ReadOnlySession(user models.Uid) (*DeltaSession, error) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, w io.Writer) error Stat(ctx context.Context, usr models.Uid) ([]UserStat, error) WipeUserData(ctx context.Context, user models.Uid) error }
type CarStoreGormMeta ¶
type CarStoreGormMeta struct {
// contains filtered or unexported fields
}
func (*CarStoreGormMeta) DeleteShardsAndRefs ¶
func (cs *CarStoreGormMeta) DeleteShardsAndRefs(ctx context.Context, ids []uint) error
func (*CarStoreGormMeta) GetBlockRefsForShards ¶
func (cs *CarStoreGormMeta) GetBlockRefsForShards(ctx context.Context, shardIds []uint) ([]blockRef, error)
func (*CarStoreGormMeta) GetCompactionTargets ¶
func (cs *CarStoreGormMeta) GetCompactionTargets(ctx context.Context, minShardCount int) ([]CompactionTarget, error)
func (*CarStoreGormMeta) GetLastShard ¶
func (*CarStoreGormMeta) GetUserShards ¶
return all of a users's shards, ascending by Seq
func (*CarStoreGormMeta) GetUserShardsDesc ¶
func (cs *CarStoreGormMeta) GetUserShardsDesc(ctx context.Context, usr models.Uid, minSeq int) ([]CarShard, error)
return all of a users's shards, descending by Seq
func (*CarStoreGormMeta) GetUserStaleRefs ¶
func (*CarStoreGormMeta) HasUidCid ¶
func (cs *CarStoreGormMeta) HasUidCid(ctx context.Context, user models.Uid, k cid.Cid) (bool, error)
Return true if any known record matches (Uid, Cid)
func (*CarStoreGormMeta) Init ¶
func (cs *CarStoreGormMeta) Init() error
func (*CarStoreGormMeta) LookupBlockRef ¶
func (cs *CarStoreGormMeta) LookupBlockRef(ctx context.Context, k cid.Cid) (path string, offset int64, user models.Uid, err error)
For some Cid, lookup the block ref. Return the path of the file written, the offset within the file, and the user associated with the Cid.
func (*CarStoreGormMeta) PutShardAndRefs ¶
func (*CarStoreGormMeta) SetStaleRef ¶
type CompactionStats ¶
type CompactionTarget ¶
type DeltaSession ¶
type DeltaSession struct {
// contains filtered or unexported fields
}
func (*DeltaSession) AllKeysChan ¶
func (ds *DeltaSession) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error)
func (*DeltaSession) BaseCid ¶
func (ds *DeltaSession) BaseCid() cid.Cid
func (*DeltaSession) CalcDiff ¶
func (ds *DeltaSession) CalcDiff(ctx context.Context, skipcids map[cid.Cid]bool) error
func (*DeltaSession) CloseWithRoot ¶
func (ds *DeltaSession) CloseWithRoot(ctx context.Context, root cid.Cid, rev string) ([]byte, error)
CloseWithRoot writes all new blocks in a car file to the writer with the given cid as the 'root'
func (*DeltaSession) DeleteBlock ¶
func (ds *DeltaSession) DeleteBlock(ctx context.Context, c cid.Cid) error
func (*DeltaSession) Get ¶
func (ds *DeltaSession) Get(ctx context.Context, c cid.Cid) (blockformat.Block, error)
func (*DeltaSession) GetSize ¶
func (ds *DeltaSession) GetSize(ctx context.Context, c cid.Cid) (int, error)
func (*DeltaSession) Has ¶
func (ds *DeltaSession) Has(ctx context.Context, c cid.Cid) (bool, error)
func (*DeltaSession) HashOnRead ¶
func (ds *DeltaSession) HashOnRead(hor bool)
func (*DeltaSession) Put ¶
func (ds *DeltaSession) Put(ctx context.Context, b blockformat.Block) error
func (*DeltaSession) PutMany ¶
func (ds *DeltaSession) PutMany(ctx context.Context, bs []blockformat.Block) error
type ExponentialBackoffRetryPolicy ¶
TODO: copied from tango, re-unify? ExponentialBackoffRetryPolicy sleeps between attempts
func (*ExponentialBackoffRetryPolicy) Attempt ¶
func (e *ExponentialBackoffRetryPolicy) Attempt(q gocql.RetryableQuery) bool
func (*ExponentialBackoffRetryPolicy) GetRetryType ¶
func (e *ExponentialBackoffRetryPolicy) GetRetryType(err error) gocql.RetryType
GetRetryType returns the retry type for the given error
type FileCarStore ¶
type FileCarStore struct {
// contains filtered or unexported fields
}
func (*FileCarStore) CompactUserShards ¶
func (cs *FileCarStore) CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error)
func (*FileCarStore) GetCompactionTargets ¶
func (cs *FileCarStore) GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error)
func (*FileCarStore) GetUserRepoHead ¶
func (*FileCarStore) GetUserRepoRev ¶
func (*FileCarStore) ImportSlice ¶
func (cs *FileCarStore) ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error)
func (*FileCarStore) NewDeltaSession ¶
func (cs *FileCarStore) NewDeltaSession(ctx context.Context, user models.Uid, since *string) (*DeltaSession, error)
func (*FileCarStore) ReadOnlySession ¶
func (cs *FileCarStore) ReadOnlySession(user models.Uid) (*DeltaSession, error)
func (*FileCarStore) ReadUserCar ¶
func (cs *FileCarStore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, shardOut io.Writer) error
TODO: incremental is only ever called true, remove the param
func (*FileCarStore) WipeUserData ¶
type LastShardSource ¶
type NonArchivalCarstore ¶
type NonArchivalCarstore struct {
// contains filtered or unexported fields
}
func NewNonArchivalCarstore ¶
func NewNonArchivalCarstore(db *gorm.DB) (*NonArchivalCarstore, error)
func (*NonArchivalCarstore) CompactUserShards ¶
func (cs *NonArchivalCarstore) CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error)
func (*NonArchivalCarstore) GetCompactionTargets ¶
func (cs *NonArchivalCarstore) GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error)
func (*NonArchivalCarstore) GetUserRepoHead ¶
func (*NonArchivalCarstore) GetUserRepoRev ¶
func (*NonArchivalCarstore) ImportSlice ¶
func (cs *NonArchivalCarstore) ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error)
func (*NonArchivalCarstore) LookupBlockRef ¶
func (*NonArchivalCarstore) NewDeltaSession ¶
func (cs *NonArchivalCarstore) NewDeltaSession(ctx context.Context, user models.Uid, since *string) (*DeltaSession, error)
func (*NonArchivalCarstore) ReadOnlySession ¶
func (cs *NonArchivalCarstore) ReadOnlySession(user models.Uid) (*DeltaSession, error)
func (*NonArchivalCarstore) ReadUserCar ¶
func (cs *NonArchivalCarstore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, w io.Writer) error
TODO: incremental is only ever called true, remove the param
func (*NonArchivalCarstore) WipeUserData ¶
type SQLiteStore ¶
type SQLiteStore struct {
// contains filtered or unexported fields
}
func NewSqliteStore ¶
func NewSqliteStore(csdir string) (*SQLiteStore, error)
func (*SQLiteStore) CarStore ¶
func (sqs *SQLiteStore) CarStore() CarStore
func (*SQLiteStore) Close ¶
func (sqs *SQLiteStore) Close() error
func (*SQLiteStore) CompactUserShards ¶
func (sqs *SQLiteStore) CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error)
func (*SQLiteStore) GetCompactionTargets ¶
func (sqs *SQLiteStore) GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error)
func (*SQLiteStore) GetLastShard ¶
GetLastShard nedeed for NewDeltaSession indirectly through lastShardCache What we actually seem to need from this: last {Rev, Root.CID}
func (*SQLiteStore) GetUserRepoHead ¶
func (*SQLiteStore) GetUserRepoRev ¶
func (*SQLiteStore) ImportSlice ¶
func (sqs *SQLiteStore) ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error)
func (*SQLiteStore) NewDeltaSession ¶
func (sqs *SQLiteStore) NewDeltaSession(ctx context.Context, user models.Uid, since *string) (*DeltaSession, error)
func (*SQLiteStore) Open ¶
func (sqs *SQLiteStore) Open(path string) error
func (*SQLiteStore) ReadOnlySession ¶
func (sqs *SQLiteStore) ReadOnlySession(user models.Uid) (*DeltaSession, error)
func (*SQLiteStore) ReadUserCar ¶
func (sqs *SQLiteStore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, shardOut io.Writer) error
ReadUserCar incremental is only ever called true
func (*SQLiteStore) Stat ¶
Stat is only used in a debugging admin handler don't bother implementing it (for now?)
func (*SQLiteStore) WipeUserData ¶
type ScyllaStore ¶
type ScyllaStore struct { WriteSession *gocql.Session ReadSession *gocql.Session // contains filtered or unexported fields }
func NewScyllaStore ¶
func NewScyllaStore(addrs []string, keyspace string) (*ScyllaStore, error)
func (*ScyllaStore) CarStore ¶
func (sqs *ScyllaStore) CarStore() CarStore
func (*ScyllaStore) Close ¶
func (sqs *ScyllaStore) Close() error
func (*ScyllaStore) CompactUserShards ¶
func (sqs *ScyllaStore) CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error)
func (*ScyllaStore) GetCompactionTargets ¶
func (sqs *ScyllaStore) GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error)
func (*ScyllaStore) GetLastShard ¶
GetLastShard nedeed for NewDeltaSession indirectly through lastShardCache What we actually seem to need from this: last {Rev, Root.CID}
func (*ScyllaStore) GetUserRepoHead ¶
func (*ScyllaStore) GetUserRepoRev ¶
func (*ScyllaStore) ImportSlice ¶
func (sqs *ScyllaStore) ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error)
func (*ScyllaStore) NewDeltaSession ¶
func (sqs *ScyllaStore) NewDeltaSession(ctx context.Context, user models.Uid, since *string) (*DeltaSession, error)
func (*ScyllaStore) Open ¶
func (sqs *ScyllaStore) Open() error
func (*ScyllaStore) ReadOnlySession ¶
func (sqs *ScyllaStore) ReadOnlySession(user models.Uid) (*DeltaSession, error)
func (*ScyllaStore) ReadUserCar ¶
func (sqs *ScyllaStore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, shardOut io.Writer) error
ReadUserCar incremental is only ever called true
func (*ScyllaStore) Stat ¶
Stat is only used in a debugging admin handler don't bother implementing it (for now?)