storage

package
v0.10.1-rc2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 24, 2022 License: Apache-2.0, MIT Imports: 37 Imported by: 0

Documentation

Index

Constants

View Source
const (
	FilePatternTokenTable   = "{table}"
	FilePatternTokenJobName = "{jobname}"

	DefaultFilePattern = FilePatternTokenTable + ".csv"
)
View Source
const MaxPostgresNameLength = 64
View Source
const PostgresTimestampFormat = "2006-01-02T15:04:05.999Z07:00"

Variables

View Source
var (
	ErrSchemaTooOld = errors.New("database schema is too old and requires migration")
	ErrSchemaTooNew = errors.New("database schema is too new for this version of lily")
	ErrNameTooLong  = errors.New("name exceeds maximum length for postgres application names")
)
View Source
var ErrLockNotAcquired = errors.New("lock not acquired")
View Source
var ErrLockNotReleased = errors.New("lock not released")
View Source
var ErrMarshalUnsupportedType = errors.New("cannot marshal unsupported type")

Note this list is manually updated. Its only significant use is to verify schema compatibility between the version of lily being used and the database being written to.

Functions

func GenerateUpsertStrings

func GenerateUpsertStrings(model interface{}) (string, string)

GenerateUpsertStrings accepts a lily model and returns two strings containing SQL that may be used to upsert the model. The first string is the conflict statement and the second is the update statement.

Example given the below model:

type SomeModel struct {
	Height    int64  `pg:",pk,notnull,use_zero"`
	MinerID   string `pg:",pk,notnull"`
	StateRoot string `pg:",pk,notnull"`
	OwnerID  string `pg:",notnull"`
	WorkerID string `pg:",notnull"`
}

The strings returned are: conflict string:

"(height, miner_id, state_root) DO UPDATE"

update string:

"owner_id" = EXCLUDED.owner_id, "worker_id" = EXCLUDED.worker_id

func LatestSchemaVersion

func LatestSchemaVersion() model.Version

LatestSchemaVersion returns the most recent version of the model schema. It is based on the highest migration version in the highest major schema version

Types

type AdvisoryLock

type AdvisoryLock int64

An AdvisoryLock is a lock that is managed by Postgres but is only enforced by the application. Advisory locks are automatically released at the end of a session. It is safe to hold both a shared and exclusive lock within a single session.

var (
	SchemaLock AdvisoryLock = 1
)

Advisory locks

func (AdvisoryLock) LockExclusive

func (l AdvisoryLock) LockExclusive(ctx context.Context, db *pg.DB) error

LockExclusive tries to acquire a session-scoped exclusive advisory lock.

func (AdvisoryLock) LockShared

func (l AdvisoryLock) LockShared(ctx context.Context, db *pg.DB) error

LockShared tries to acquire a session scoped shared advisory lock.

func (AdvisoryLock) UnlockExclusive

func (l AdvisoryLock) UnlockExclusive(ctx context.Context, db *pg.DB) error

UnlockExclusive releases an exclusive advisory lock.

func (AdvisoryLock) UnlockShared

func (l AdvisoryLock) UnlockShared(ctx context.Context, db *pg.DB) error

UnlockShared releases a shared advisory lock.

type CSVBatch

type CSVBatch struct {
	// contains filtered or unexported fields
}

func (*CSVBatch) PersistModel

func (c *CSVBatch) PersistModel(ctx context.Context, m interface{}) error

type CSVStorage

type CSVStorage struct {
	// contains filtered or unexported fields
}

func NewCSVStorage

func NewCSVStorage(path string, version model.Version, opts CSVStorageOptions) (*CSVStorage, error)

func NewCSVStorageLatest

func NewCSVStorageLatest(path string, opts CSVStorageOptions) (*CSVStorage, error)

func (*CSVStorage) ModelHeaders added in v0.8.6

func (c *CSVStorage) ModelHeaders(v interface{}) ([]string, error)

ModelHeaders returns the column headers used for csv output of the type of model held in v

func (*CSVStorage) PersistBatch

func (c *CSVStorage) PersistBatch(ctx context.Context, ps ...model.Persistable) error

PersistBatch persists a batch of models to CSV, creating new files if they don't already exist, otherwise appending to existing ones.

func (*CSVStorage) WithMetadata

func (c *CSVStorage) WithMetadata(md Metadata) model.Storage

type CSVStorageOptions

type CSVStorageOptions struct {
	OmitHeader  bool
	FilePattern string
}

func DefaultCSVStorageOptions

func DefaultCSVStorageOptions() CSVStorageOptions

type Catalog

type Catalog struct {
	// contains filtered or unexported fields
}

A Catalog holds a list of pre-configured storage systems and can open them when requested.

func NewCatalog

func NewCatalog(cfg config.StorageConf) (*Catalog, error)

func (*Catalog) Connect

func (c *Catalog) Connect(ctx context.Context, name string, md Metadata) (model.Storage, error)

Connect returns a storage that is ready for use. If name is empty, a null storage will be returned

func (*Catalog) ConnectAsDatabase

func (c *Catalog) ConnectAsDatabase(ctx context.Context, name string, md Metadata) (*Database, error)

ConnectAsDatabase returns a storage that is ready to use for reading and writing: `name` must correspond to a Database storage.

type Connector

type Connector interface {
	Connect(context.Context) error
	IsConnected(context.Context) bool
	Close(context.Context) error
}

type Database

type Database struct {
	Clock  clock.Clock
	Upsert bool
	// contains filtered or unexported fields
}

func NewDatabase

func NewDatabase(ctx context.Context, url string, poolSize int, name string, schemaName string, upsert bool) (*Database, error)

func NewDatabaseFromDB

func NewDatabaseFromDB(ctx context.Context, db *pg.DB, schemaName string) (*Database, error)

func (*Database) AsORM

func (d *Database) AsORM() *pg.DB

AsORM returns the underlying *pg.DB. Connect MUST be called before using it. TODO(frrist): this is lazy, but good enough for an MVP.

func (*Database) Close

func (d *Database) Close(ctx context.Context) error

func (*Database) Connect

func (d *Database) Connect(ctx context.Context) error

Connect opens a connection to the database and checks that the schema is compatible with the version required by this version of lily. ErrSchemaTooOld is returned if the database schema is older than the current schema, ErrSchemaTooNew if it is newer.

func (*Database) ConsolidateGaps added in v0.10.0

func (d *Database) ConsolidateGaps(ctx context.Context, minHeight, maxHeight int64, tasks ...string) (map[int64][]string, []int64, error)

ConsolidateGaps returns a map of heights to missing tasks, and a list of heights with which to iterate the map in order.

func (*Database) ExecContext

func (d *Database) ExecContext(c context.Context, query interface{}, params ...interface{}) (pg.Result, error)

func (*Database) GetSchemaVersions

func (d *Database) GetSchemaVersions(ctx context.Context) (model.Version, model.Version, error)

GetSchemaVersions returns the schema version in the database and the latest schema version defined by the available migrations.

func (*Database) IsConnected

func (d *Database) IsConnected(ctx context.Context) bool

func (*Database) MigrateSchema

func (d *Database) MigrateSchema(ctx context.Context) error

MigrateSchema migrates the database schema to the latest version based on the list of migrations available

func (*Database) MigrateSchemaTo

func (d *Database) MigrateSchemaTo(ctx context.Context, target model.Version) error

MigrateSchemaTo migrates the database schema to a specific version. Note that downgrading a schema to an earlier version is destructive and may result in the loss of data.

func (*Database) PersistBatch

func (d *Database) PersistBatch(ctx context.Context, ps ...model.Persistable) error

PersistBatch persists a batch of persistables in a single transaction

func (*Database) QueryGaps added in v0.10.0

func (d *Database) QueryGaps(ctx context.Context, minHeight, maxHeight int64, tasks ...string) ([]*visor.GapReport, error)

func (*Database) SchemaConfig

func (d *Database) SchemaConfig() schemas.Config

func (*Database) SetGapsFilled added in v0.10.0

func (d *Database) SetGapsFilled(ctx context.Context, height int64, tasks ...string) error

SetGapsFilled marks all gaps at height as filled.

func (*Database) VerifyCurrentSchema

func (d *Database) VerifyCurrentSchema(ctx context.Context) error

VerifyCurrentSchema compares the schema present in the database with the models used by lily and returns an error if they are incompatible.

type MemStorage

type MemStorage struct {
	// TODO parallel map?
	Data    map[string][]interface{}
	DataMu  sync.Mutex
	Version model.Version
}

func NewMemStorage

func NewMemStorage(version model.Version) *MemStorage

func NewMemStorageLatest

func NewMemStorageLatest() *MemStorage

func (*MemStorage) PersistBatch

func (j *MemStorage) PersistBatch(ctx context.Context, ps ...model.Persistable) error

func (*MemStorage) PersistModel

func (j *MemStorage) PersistModel(ctx context.Context, m interface{}) error

type Metadata

type Metadata struct {
	JobName string // name of the job using the storage
}

Metadata is additional information that a storage may use to annotate the data it writes

type NullStorage

type NullStorage struct {
}

A NullStorage ignores any requests to persist a model

func (*NullStorage) PersistBatch

func (*NullStorage) PersistBatch(ctx context.Context, p ...model.Persistable) error

func (*NullStorage) PersistModel

func (*NullStorage) PersistModel(ctx context.Context, m interface{}) error

type StorageWithMetadata

type StorageWithMetadata interface {
	// WithMetadata returns a storage configured with the supplied metadata
	WithMetadata(Metadata) model.Storage
}

type TxStorage

type TxStorage struct {
	// contains filtered or unexported fields
}

func (*TxStorage) PersistModel

func (s *TxStorage) PersistModel(ctx context.Context, m interface{}) error

PersistModel persists a single model

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL