carstore

package
v0.0.0-...-953fec9 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 1, 2025 License: Apache-2.0, MIT Imports: 38 Imported by: 0

README

Carstore

Store repos for a zillion users of PDS-like repositories, supporting a more limited set of operations (mainly: firehose in, firehose out).

ScyllaStore

Blocks stored in ScyllaDB. User and PDS metadata stored in gorm (PostgreSQL or sqlite3).

FileCarStore

Store 'car slices' from PDS-sourced subscribeRepo firehose streams to the filesystem. Store metadata in gorm (PostgreSQL or sqlite3). Periodically compact car slices into fewer, larger car slices. User and PDS metadata are stored in gorm (PostgreSQL or sqlite3). FileCarStore was the first production carstore and was used through at least 2024-11.

SQLiteStore

Experimental/demo. Blocks stored in trivial local sqlite3 schema. Minimal reference implementation from which fancy scalable/performant implementations may be derived.

CREATE TABLE IF NOT EXISTS blocks (uid int, cid blob, rev varchar, root blob, block blob, PRIMARY KEY(uid,cid))
CREATE INDEX IF NOT EXISTS blocx_by_rev ON blocks (uid, rev DESC)

INSERT INTO blocks (uid, cid, rev, root, block) VALUES (?, ?, ?, ?, ?) ON CONFLICT (uid,cid) DO UPDATE SET rev=excluded.rev, root=excluded.root, block=excluded.block

SELECT rev, root FROM blocks WHERE uid = ? ORDER BY rev DESC LIMIT 1

SELECT cid,rev,root,block FROM blocks WHERE uid = ? AND rev > ? ORDER BY rev DESC

DELETE FROM blocks WHERE uid = ?

SELECT rev, root FROM blocks WHERE uid = ? AND cid = ? LIMIT 1

SELECT block FROM blocks WHERE uid = ? AND cid = ? LIMIT 1

SELECT length(block) FROM blocks WHERE uid = ? AND cid = ? LIMIT 1

Documentation

Index

Constants

View Source
const BigShardThreshold = 2 << 20
View Source
const MaxSliceLength = 2 << 20

Variables

View Source
var CacheHits int64
View Source
var CacheMiss int64
View Source
var ErrNothingThere = errors.New("nothing to read)")
View Source
var ErrRepoBaseMismatch = fmt.Errorf("attempted a delta session on top of the wrong previous head")

Functions

func BlockDiff

func BlockDiff(ctx context.Context, bs blockstore.Blockstore, oldroot cid.Cid, newcids map[cid.Cid]blockformat.Block, skipcids map[cid.Cid]bool) (map[cid.Cid]bool, error)

func LdWrite

func LdWrite(w io.Writer, d ...[]byte) (int64, error)

LdWrite performs a length-delimited write: the writer receives a Uvarint-encoded length prefix followed by the concatenated data.

func WriteCarHeader

func WriteCarHeader(w io.Writer, root cid.Cid) (int64, error)

Types

type CarShard

type CarShard struct {
	ID        uint `gorm:"primarykey"`
	CreatedAt time.Time

	Root      models.DbCID `gorm:"index"`
	DataStart int64
	Seq       int `gorm:"index:idx_car_shards_seq;index:idx_car_shards_usr_seq,priority:2,sort:desc"`
	Path      string
	Usr       models.Uid `gorm:"index:idx_car_shards_usr;index:idx_car_shards_usr_seq,priority:1"`
	Rev       string
}

type CarStore

type CarStore interface {
	// TODO: not really part of general interface
	CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error)
	// TODO: not really part of general interface
	GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error)

	GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error)
	GetUserRepoRev(ctx context.Context, user models.Uid) (string, error)
	ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error)
	NewDeltaSession(ctx context.Context, user models.Uid, since *string) (*DeltaSession, error)
	ReadOnlySession(user models.Uid) (*DeltaSession, error)
	ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, w io.Writer) error
	Stat(ctx context.Context, usr models.Uid) ([]UserStat, error)
	WipeUserData(ctx context.Context, user models.Uid) error
}

func NewCarStore

func NewCarStore(meta *gorm.DB, roots []string) (CarStore, error)

type CarStoreGormMeta

type CarStoreGormMeta struct {
	// contains filtered or unexported fields
}

func (*CarStoreGormMeta) DeleteShardsAndRefs

func (cs *CarStoreGormMeta) DeleteShardsAndRefs(ctx context.Context, ids []uint) error

func (*CarStoreGormMeta) GetBlockRefsForShards

func (cs *CarStoreGormMeta) GetBlockRefsForShards(ctx context.Context, shardIds []uint) ([]blockRef, error)

func (*CarStoreGormMeta) GetCompactionTargets

func (cs *CarStoreGormMeta) GetCompactionTargets(ctx context.Context, minShardCount int) ([]CompactionTarget, error)

func (*CarStoreGormMeta) GetLastShard

func (cs *CarStoreGormMeta) GetLastShard(ctx context.Context, user models.Uid) (*CarShard, error)

func (*CarStoreGormMeta) GetUserShards

func (cs *CarStoreGormMeta) GetUserShards(ctx context.Context, usr models.Uid) ([]CarShard, error)

Returns all of a user's shards, in ascending order by Seq.

func (*CarStoreGormMeta) GetUserShardsDesc

func (cs *CarStoreGormMeta) GetUserShardsDesc(ctx context.Context, usr models.Uid, minSeq int) ([]CarShard, error)

Returns all of a user's shards, in descending order by Seq.

func (*CarStoreGormMeta) GetUserStaleRefs

func (cs *CarStoreGormMeta) GetUserStaleRefs(ctx context.Context, user models.Uid) ([]staleRef, error)

func (*CarStoreGormMeta) HasUidCid

func (cs *CarStoreGormMeta) HasUidCid(ctx context.Context, user models.Uid, k cid.Cid) (bool, error)

Returns true if any known record matches (Uid, Cid).

func (*CarStoreGormMeta) Init

func (cs *CarStoreGormMeta) Init() error

func (*CarStoreGormMeta) LookupBlockRef

func (cs *CarStoreGormMeta) LookupBlockRef(ctx context.Context, k cid.Cid) (path string, offset int64, user models.Uid, err error)

For a given Cid, look up the block ref. Returns the path of the file written, the offset within that file, and the user associated with the Cid.

func (*CarStoreGormMeta) PutShardAndRefs

func (cs *CarStoreGormMeta) PutShardAndRefs(ctx context.Context, shard *CarShard, brefs []map[string]any, rmcids map[cid.Cid]bool) error

func (*CarStoreGormMeta) SeqForRev

func (cs *CarStoreGormMeta) SeqForRev(ctx context.Context, user models.Uid, sinceRev string) (int, error)

func (*CarStoreGormMeta) SetStaleRef

func (cs *CarStoreGormMeta) SetStaleRef(ctx context.Context, uid models.Uid, staleToKeep []cid.Cid) error

type CompactionStats

type CompactionStats struct {
	TotalRefs     int `json:"totalRefs"`
	StartShards   int `json:"startShards"`
	NewShards     int `json:"newShards"`
	SkippedShards int `json:"skippedShards"`
	ShardsDeleted int `json:"shardsDeleted"`
	DupeCount     int `json:"dupeCount"`
}

type CompactionTarget

type CompactionTarget struct {
	Usr       models.Uid
	NumShards int
}

type DeltaSession

type DeltaSession struct {
	// contains filtered or unexported fields
}

func (*DeltaSession) AllKeysChan

func (ds *DeltaSession) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error)

func (*DeltaSession) BaseCid

func (ds *DeltaSession) BaseCid() cid.Cid

func (*DeltaSession) CalcDiff

func (ds *DeltaSession) CalcDiff(ctx context.Context, skipcids map[cid.Cid]bool) error

func (*DeltaSession) CloseWithRoot

func (ds *DeltaSession) CloseWithRoot(ctx context.Context, root cid.Cid, rev string) ([]byte, error)

CloseWithRoot writes all new blocks into a CAR file with the given cid as the 'root'. It takes no writer argument; the output is returned as a []byte.

func (*DeltaSession) DeleteBlock

func (ds *DeltaSession) DeleteBlock(ctx context.Context, c cid.Cid) error

func (*DeltaSession) Get

func (ds *DeltaSession) Get(ctx context.Context, c cid.Cid) (blockformat.Block, error)

func (*DeltaSession) GetSize

func (ds *DeltaSession) GetSize(ctx context.Context, c cid.Cid) (int, error)

func (*DeltaSession) Has

func (ds *DeltaSession) Has(ctx context.Context, c cid.Cid) (bool, error)

func (*DeltaSession) HashOnRead

func (ds *DeltaSession) HashOnRead(hor bool)

func (*DeltaSession) Put

func (*DeltaSession) PutMany

func (ds *DeltaSession) PutMany(ctx context.Context, bs []blockformat.Block) error

type ExponentialBackoffRetryPolicy

type ExponentialBackoffRetryPolicy struct {
	NumRetries int
	Min, Max   time.Duration
}

ExponentialBackoffRetryPolicy sleeps between attempts. TODO: copied from tango; re-unify?

func (*ExponentialBackoffRetryPolicy) Attempt

func (*ExponentialBackoffRetryPolicy) GetRetryType

func (e *ExponentialBackoffRetryPolicy) GetRetryType(err error) gocql.RetryType

GetRetryType returns the retry type for the given error

type FileCarStore

type FileCarStore struct {
	// contains filtered or unexported fields
}

func (*FileCarStore) CompactUserShards

func (cs *FileCarStore) CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error)

func (*FileCarStore) GetCompactionTargets

func (cs *FileCarStore) GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error)

func (*FileCarStore) GetUserRepoHead

func (cs *FileCarStore) GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error)

func (*FileCarStore) GetUserRepoRev

func (cs *FileCarStore) GetUserRepoRev(ctx context.Context, user models.Uid) (string, error)

func (*FileCarStore) ImportSlice

func (cs *FileCarStore) ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error)

func (*FileCarStore) NewDeltaSession

func (cs *FileCarStore) NewDeltaSession(ctx context.Context, user models.Uid, since *string) (*DeltaSession, error)

func (*FileCarStore) ReadOnlySession

func (cs *FileCarStore) ReadOnlySession(user models.Uid) (*DeltaSession, error)

func (*FileCarStore) ReadUserCar

func (cs *FileCarStore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, shardOut io.Writer) error

TODO: incremental is only ever passed as true; remove the parameter.

func (*FileCarStore) Stat

func (cs *FileCarStore) Stat(ctx context.Context, usr models.Uid) ([]UserStat, error)

func (*FileCarStore) WipeUserData

func (cs *FileCarStore) WipeUserData(ctx context.Context, user models.Uid) error

type LastShardSource

type LastShardSource interface {
	GetLastShard(context.Context, models.Uid) (*CarShard, error)
}

type NonArchivalCarstore

type NonArchivalCarstore struct {
	// contains filtered or unexported fields
}

func NewNonArchivalCarstore

func NewNonArchivalCarstore(db *gorm.DB) (*NonArchivalCarstore, error)

func (*NonArchivalCarstore) CompactUserShards

func (cs *NonArchivalCarstore) CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error)

func (*NonArchivalCarstore) GetCompactionTargets

func (cs *NonArchivalCarstore) GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error)

func (*NonArchivalCarstore) GetUserRepoHead

func (cs *NonArchivalCarstore) GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error)

func (*NonArchivalCarstore) GetUserRepoRev

func (cs *NonArchivalCarstore) GetUserRepoRev(ctx context.Context, user models.Uid) (string, error)

func (*NonArchivalCarstore) HasUidCid

func (cs *NonArchivalCarstore) HasUidCid(ctx context.Context, user models.Uid, k cid.Cid) (bool, error)

func (*NonArchivalCarstore) ImportSlice

func (cs *NonArchivalCarstore) ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error)

func (*NonArchivalCarstore) LookupBlockRef

func (cs *NonArchivalCarstore) LookupBlockRef(ctx context.Context, k cid.Cid) (path string, offset int64, user models.Uid, err error)

func (*NonArchivalCarstore) NewDeltaSession

func (cs *NonArchivalCarstore) NewDeltaSession(ctx context.Context, user models.Uid, since *string) (*DeltaSession, error)

func (*NonArchivalCarstore) ReadOnlySession

func (cs *NonArchivalCarstore) ReadOnlySession(user models.Uid) (*DeltaSession, error)

func (*NonArchivalCarstore) ReadUserCar

func (cs *NonArchivalCarstore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, w io.Writer) error

TODO: incremental is only ever passed as true; remove the parameter.

func (*NonArchivalCarstore) Stat

func (cs *NonArchivalCarstore) Stat(ctx context.Context, usr models.Uid) ([]UserStat, error)

func (*NonArchivalCarstore) WipeUserData

func (cs *NonArchivalCarstore) WipeUserData(ctx context.Context, user models.Uid) error

type SQLiteStore

type SQLiteStore struct {
	// contains filtered or unexported fields
}

func NewSqliteStore

func NewSqliteStore(csdir string) (*SQLiteStore, error)

func (*SQLiteStore) CarStore

func (sqs *SQLiteStore) CarStore() CarStore

func (*SQLiteStore) Close

func (sqs *SQLiteStore) Close() error

func (*SQLiteStore) CompactUserShards

func (sqs *SQLiteStore) CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error)

func (*SQLiteStore) GetCompactionTargets

func (sqs *SQLiteStore) GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error)

func (*SQLiteStore) GetLastShard

func (sqs *SQLiteStore) GetLastShard(ctx context.Context, uid models.Uid) (*CarShard, error)

GetLastShard is needed by NewDeltaSession, indirectly through lastShardCache. What we actually seem to need from it is the last {Rev, Root.CID}.

func (*SQLiteStore) GetUserRepoHead

func (sqs *SQLiteStore) GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error)

func (*SQLiteStore) GetUserRepoRev

func (sqs *SQLiteStore) GetUserRepoRev(ctx context.Context, user models.Uid) (string, error)

func (*SQLiteStore) HasUidCid

func (sqs *SQLiteStore) HasUidCid(ctx context.Context, user models.Uid, bcid cid.Cid) (bool, error)

HasUidCid is needed for the NewDeltaSession userView.

func (*SQLiteStore) ImportSlice

func (sqs *SQLiteStore) ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error)

func (*SQLiteStore) NewDeltaSession

func (sqs *SQLiteStore) NewDeltaSession(ctx context.Context, user models.Uid, since *string) (*DeltaSession, error)

func (*SQLiteStore) Open

func (sqs *SQLiteStore) Open(path string) error

func (*SQLiteStore) ReadOnlySession

func (sqs *SQLiteStore) ReadOnlySession(user models.Uid) (*DeltaSession, error)

func (*SQLiteStore) ReadUserCar

func (sqs *SQLiteStore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, shardOut io.Writer) error

ReadUserCar: the incremental parameter is only ever passed as true.

func (*SQLiteStore) Stat

func (sqs *SQLiteStore) Stat(ctx context.Context, usr models.Uid) ([]UserStat, error)

Stat is only used in a debugging admin handler; don't bother implementing it (for now?).

func (*SQLiteStore) WipeUserData

func (sqs *SQLiteStore) WipeUserData(ctx context.Context, user models.Uid) error

type ScyllaStore

type ScyllaStore struct {
	WriteSession *gocql.Session
	ReadSession  *gocql.Session
	// contains filtered or unexported fields
}

func NewScyllaStore

func NewScyllaStore(addrs []string, keyspace string) (*ScyllaStore, error)

func (*ScyllaStore) CarStore

func (sqs *ScyllaStore) CarStore() CarStore

func (*ScyllaStore) Close

func (sqs *ScyllaStore) Close() error

func (*ScyllaStore) CompactUserShards

func (sqs *ScyllaStore) CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error)

func (*ScyllaStore) GetCompactionTargets

func (sqs *ScyllaStore) GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error)

func (*ScyllaStore) GetLastShard

func (sqs *ScyllaStore) GetLastShard(ctx context.Context, uid models.Uid) (*CarShard, error)

GetLastShard is needed by NewDeltaSession, indirectly through lastShardCache. What we actually seem to need from it is the last {Rev, Root.CID}.

func (*ScyllaStore) GetUserRepoHead

func (sqs *ScyllaStore) GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error)

func (*ScyllaStore) GetUserRepoRev

func (sqs *ScyllaStore) GetUserRepoRev(ctx context.Context, user models.Uid) (string, error)

func (*ScyllaStore) HasUidCid

func (sqs *ScyllaStore) HasUidCid(ctx context.Context, user models.Uid, bcid cid.Cid) (bool, error)

HasUidCid is needed for the NewDeltaSession userView.

func (*ScyllaStore) ImportSlice

func (sqs *ScyllaStore) ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error)

func (*ScyllaStore) NewDeltaSession

func (sqs *ScyllaStore) NewDeltaSession(ctx context.Context, user models.Uid, since *string) (*DeltaSession, error)

func (*ScyllaStore) Open

func (sqs *ScyllaStore) Open() error

func (*ScyllaStore) ReadOnlySession

func (sqs *ScyllaStore) ReadOnlySession(user models.Uid) (*DeltaSession, error)

func (*ScyllaStore) ReadUserCar

func (sqs *ScyllaStore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, shardOut io.Writer) error

ReadUserCar: the incremental parameter is only ever passed as true.

func (*ScyllaStore) Stat

func (sqs *ScyllaStore) Stat(ctx context.Context, usr models.Uid) ([]UserStat, error)

Stat is only used in a debugging admin handler; don't bother implementing it (for now?).

func (*ScyllaStore) WipeUserData

func (sqs *ScyllaStore) WipeUserData(ctx context.Context, user models.Uid) error

type UserStat

type UserStat struct {
	Seq     int
	Root    string
	Created time.Time
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL