Documentation ¶
Index ¶
- Constants
- Variables
- func GenerateUpsertStrings(model interface{}) (string, string)
- func LatestSchemaVersion() model.Version
- type AdvisoryLock
- type CSVBatch
- type CSVStorage
- type CSVStorageOptions
- type Catalog
- type Connector
- type Database
- func (d *Database) AsORM() *pg.DB
- func (d *Database) Close(ctx context.Context) error
- func (d *Database) Connect(ctx context.Context) error
- func (d *Database) ConsolidateGaps(ctx context.Context, minHeight, maxHeight int64, tasks ...string) (map[int64][]string, []int64, error)
- func (d *Database) ExecContext(c context.Context, query interface{}, params ...interface{}) (pg.Result, error)
- func (d *Database) GetSchemaVersions(ctx context.Context) (model.Version, model.Version, error)
- func (d *Database) IsConnected(ctx context.Context) bool
- func (d *Database) MigrateSchema(ctx context.Context) error
- func (d *Database) MigrateSchemaTo(ctx context.Context, target model.Version) error
- func (d *Database) PersistBatch(ctx context.Context, ps ...model.Persistable) error
- func (d *Database) QueryGaps(ctx context.Context, minHeight, maxHeight int64, tasks ...string) ([]*visor.GapReport, error)
- func (d *Database) SchemaConfig() schemas.Config
- func (d *Database) SetGapsFilled(ctx context.Context, height int64, tasks ...string) error
- func (d *Database) VerifyCurrentSchema(ctx context.Context) error
- type MemStorage
- type Metadata
- type NullStorage
- type StorageWithMetadata
- type TxStorage
Constants ¶
const ( FilePatternTokenTable = "{table}" FilePatternTokenJobName = "{jobname}" DefaultFilePattern = FilePatternTokenTable + ".csv" )
const MaxPostgresNameLength = 64
const PostgresTimestampFormat = "2006-01-02T15:04:05.999Z07:00"
Variables ¶
var ( ErrSchemaTooOld = errors.New("database schema is too old and requires migration") ErrSchemaTooNew = errors.New("database schema is too new for this version of lily") ErrNameTooLong = errors.New("name exceeds maximum length for postgres application names") )
var ErrLockNotAcquired = errors.New("lock not acquired")
var ErrLockNotReleased = errors.New("lock not released")
var ErrMarshalUnsupportedType = errors.New("cannot marshal unsupported type")
var Models = []interface{}{ (*blocks.BlockHeader)(nil), (*blocks.BlockParent)(nil), (*blocks.DrandBlockEntrie)(nil), (*datacap.DataCapBalance)(nil), (*miner.MinerBeneficiary)(nil), (*miner.MinerSectorDeal)(nil), (*miner.MinerSectorInfoV7)(nil), (*miner.MinerSectorInfoV1_6)(nil), (*miner.MinerSectorPost)(nil), (*miner.MinerPreCommitInfo)(nil), (*miner.MinerPreCommitInfoV9)(nil), (*miner.MinerSectorEvent)(nil), (*miner.MinerCurrentDeadlineInfo)(nil), (*miner.MinerFeeDebt)(nil), (*miner.MinerLockedFund)(nil), (*miner.MinerInfo)(nil), (*market.MarketDealProposal)(nil), (*market.MarketDealState)(nil), (*messages.Message)(nil), (*messages.BlockMessage)(nil), (*messages.Receipt)(nil), (*messages.MessageGasEconomy)(nil), (*messages.ParsedMessage)(nil), (*messages.InternalMessage)(nil), (*messages.InternalParsedMessage)(nil), (*messages.VMMessage)(nil), (*messages.ActorEvent)(nil), (*messages.MessageParam)(nil), (*messages.ReceiptReturn)(nil), (*multisig.MultisigTransaction)(nil), (*power.ChainPower)(nil), (*power.PowerActorClaim)(nil), (*reward.ChainReward)(nil), (*common.Actor)(nil), (*common.ActorState)(nil), (*init_.IDAddress)(nil), (*derived.GasOutputs)(nil), (*chain.ChainEconomics)(nil), (*chain.ChainConsensus)(nil), (*msapprovals.MultisigApproval)(nil), (*verifreg.VerifiedRegistryVerifier)(nil), (*verifreg.VerifiedRegistryVerifiedClient)(nil), (*verifreg.VerifiedRegistryClaim)(nil), (*fevm.FEVMActorStats)(nil), (*fevm.FEVMBlockHeader)(nil), (*fevm.FEVMReceipt)(nil), (*fevm.FEVMTransaction)(nil), (*fevm.FEVMContract)(nil), (*fevm.FEVMTrace)(nil), (*actordumps.FEVMActorDump)(nil), (*actordumps.MinerActorDump)(nil), }
Note this list is manually updated. Its only significant use is to verify schema compatibility between the version of lily being used and the database being written to.
Functions ¶
func GenerateUpsertStrings ¶
GenerateUpsertStrings accepts a lily model and returns two strings containing SQL that may be used to upsert the model. The first string is the conflict statement and the second is the update statement.
Example given the below model:
type SomeModel struct { Height int64 `pg:",pk,notnull,use_zero"` MinerID string `pg:",pk,notnull"` StateRoot string `pg:",pk,notnull"` OwnerID string `pg:",notnull"` WorkerID string `pg:",notnull"` }
The strings returned are: conflict string:
"(height, miner_id, state_root) DO UPDATE"
update string:
"owner_id" = EXCLUDED.owner_id, "worker_id" = EXCLUDED.worker_id
func LatestSchemaVersion ¶
LatestSchemaVersion returns the most recent version of the model schema. It is based on the highest migration version in the highest major schema version.
Types ¶
type AdvisoryLock ¶
type AdvisoryLock int64
An AdvisoryLock is a lock that is managed by Postgres but is only enforced by the application. Advisory locks are automatically released at the end of a session. It is safe to hold both a shared and exclusive lock within a single session.
var (
SchemaLock AdvisoryLock = 1
)
Advisory locks
func (AdvisoryLock) LockExclusive ¶
func (l AdvisoryLock) LockExclusive(ctx context.Context, db *pg.DB) error
LockExclusive tries to acquire a session scoped exclusive advisory lock.
func (AdvisoryLock) LockShared ¶
func (l AdvisoryLock) LockShared(ctx context.Context, db *pg.DB) error
LockShared tries to acquire a session scoped shared advisory lock.
func (AdvisoryLock) UnlockExclusive ¶
func (l AdvisoryLock) UnlockExclusive(ctx context.Context, db *pg.DB) error
UnlockExclusive releases an exclusive advisory lock.
func (AdvisoryLock) UnlockShared ¶
func (l AdvisoryLock) UnlockShared(ctx context.Context, db *pg.DB) error
UnlockShared releases a shared advisory lock.
type CSVStorage ¶
type CSVStorage struct {
// contains filtered or unexported fields
}
func NewCSVStorage ¶
func NewCSVStorage(path string, version model.Version, opts CSVStorageOptions) (*CSVStorage, error)
func NewCSVStorageLatest ¶
func NewCSVStorageLatest(path string, opts CSVStorageOptions) (*CSVStorage, error)
func (*CSVStorage) ModelHeaders ¶ added in v0.8.6
func (c *CSVStorage) ModelHeaders(v interface{}) ([]string, error)
ModelHeaders returns the column headers used for csv output of the type of model held in v
func (*CSVStorage) PersistBatch ¶
func (c *CSVStorage) PersistBatch(ctx context.Context, ps ...model.Persistable) error
PersistBatch persists a batch of models to CSV, creating new files if they don't already exist, otherwise appending to existing ones.
func (*CSVStorage) WithMetadata ¶
func (c *CSVStorage) WithMetadata(md Metadata) model.Storage
type CSVStorageOptions ¶
func DefaultCSVStorageOptions ¶
func DefaultCSVStorageOptions() CSVStorageOptions
type Catalog ¶
type Catalog struct {
// contains filtered or unexported fields
}
A Catalog holds a list of pre-configured storage systems and can open them when requested.
func NewCatalog ¶
func NewCatalog(cfg config.StorageConf) (*Catalog, error)
type Database ¶
func NewDatabase ¶
func NewDatabaseFromDB ¶
func (*Database) AsORM ¶
func (d *Database) AsORM() *pg.DB
MUST call Connect before using. TODO(frrist): this is lazy, but good enough for an MVP.
func (*Database) Connect ¶
Connect opens a connection to the database and checks that the schema is compatible with the version required by this version of lily. ErrSchemaTooOld is returned if the database schema is older than the current schema, ErrSchemaTooNew if it is newer.
func (*Database) ConsolidateGaps ¶ added in v0.10.0
func (d *Database) ConsolidateGaps(ctx context.Context, minHeight, maxHeight int64, tasks ...string) (map[int64][]string, []int64, error)
ConsolidateGaps returns a map of heights to missing tasks, along with a list of heights that can be used to iterate the map in order.
func (*Database) ExecContext ¶
func (*Database) GetSchemaVersions ¶
GetSchemaVersions returns the schema version in the database and the latest schema version defined by the available migrations.
func (*Database) MigrateSchema ¶
MigrateSchema migrates the database schema to the latest version based on the list of migrations available
func (*Database) MigrateSchemaTo ¶
MigrateSchemaTo migrates the database schema to a specific version. Note that downgrading a schema to an earlier version is destructive and may result in the loss of data.
func (*Database) PersistBatch ¶
PersistBatch persists a batch of persistables in a single transaction
func (*Database) SchemaConfig ¶
func (*Database) SetGapsFilled ¶ added in v0.10.0
SetGapsFilled marks all gaps at height as filled.
type MemStorage ¶
type MemStorage struct { // TODO parallel map? Data map[string][]interface{} DataMu sync.Mutex Version model.Version }
func NewMemStorage ¶
func NewMemStorage(version model.Version) *MemStorage
func NewMemStorageLatest ¶
func NewMemStorageLatest() *MemStorage
func (*MemStorage) PersistBatch ¶
func (j *MemStorage) PersistBatch(ctx context.Context, ps ...model.Persistable) error
func (*MemStorage) PersistModel ¶
func (j *MemStorage) PersistModel(ctx context.Context, m interface{}) error
type Metadata ¶
type Metadata struct {
JobName string // name of the job using the storage
}
Metadata is additional information that a storage may use to annotate the data it writes
type NullStorage ¶
type NullStorage struct { }
A NullStorage ignores any requests to persist a model
func (*NullStorage) PersistBatch ¶
func (*NullStorage) PersistBatch(ctx context.Context, p ...model.Persistable) error
func (*NullStorage) PersistModel ¶
func (*NullStorage) PersistModel(ctx context.Context, m interface{}) error