fluxdb

package
v0.1.1-docker Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 12, 2020 License: Apache-2.0 Imports: 37 Imported by: 0

README

FluxDB

A historical database for tabular data, with a loader for EOSIO state tables & permissions.

Important: While this is currently used in production at dfuse, this repository will undergo a major refactoring in the upcoming months. We will revisit the project, its design, its role and the overall architecture. It will most probably be split into smaller pieces: a core module that stores historical data in a truly agnostic fashion (even blockchain-agnostic), and a second component that uses the core API to implement the current set of features for EOSIO.

Installation

Install through dfuse for EOSIO

Usage

Assuming the following has been run already:

export FLUXDB_BASE_URL=https://mainnet.eos.dfuse.io

Get currency balances for the user account eoscanadacom (specified as scope) on various currencies (specified as accounts).

curl "$FLUXDB_BASE_URL/v0/state/tables/accounts?block_num=26415000&accounts=eosio.token|eosadddddddd|tokenbyeocat|ethsidechain|epraofficial|alibabapoole|hirevibeshvt|oo1122334455|irespotokens|publytoken11|parslseed123|trybenetwork|zkstokensr4u&scope=eoscanadacom&table=accounts&json=true&token=$DFUSE" | jq .

Features

FluxDB supports parallel ingestion by doing a first sharding pass on the chain history, splitting the different tables being mutated in different shards, then ingesting linearly each table.

This allows ingestion of the whole history in a few hours.

Documentation

See the /v0/state endpoints under https://docs.dfuse.io/reference/eosio/rest/

Contributing

Issues and PRs related to FluxDB belong in this repository.

See the dfuse-wide contribution guide if you wish to contribute to this code base.

Documentation

Index

Constants

This section is empty.

Variables

View Source
// ErrCleanSourceStop is a sentinel error value carrying the message
// "clean source stop".
// NOTE(review): presumably signals an intentional, non-failure stop of a
// block source — confirm against the code that returns or checks it.
var ErrCleanSourceStop = errors.New("clean source stop")

Functions

func AppBlockNumHigherThanHeadBlockError

func AppBlockNumHigherThanHeadBlockError(ctx context.Context, chosenBlockNum, headBlockNum, lastWrittenBlockNum uint32) *derr.ErrorResponse

func AppBlockNumHigherThanLIBError

func AppBlockNumHigherThanLIBError(ctx context.Context, chosenBlockNum, lastWrittenBlockNum uint32) *derr.ErrorResponse

func BlockNum

func BlockNum(blockID string) uint32

func BuildReprocessingPipeline

func BuildReprocessingPipeline(handler bstream.Handler, blocksStore dstore.Store, startBlockNum uint64, numBlocksBeforeStart uint64, parallelDownloadCount int) bstream.Source

func DataABINotFoundError

func DataABINotFoundError(ctx context.Context, account string, blockNum uint32) *derr.ErrorResponse

func DataDecodingRowError

func DataDecodingRowError(ctx context.Context, hexData string) *derr.ErrorResponse

func DataPublicKeyNotFoundError

func DataPublicKeyNotFoundError(ctx context.Context, publicKey string) *derr.ErrorResponse

func DataRowNotFoundError

func DataRowNotFoundError(ctx context.Context, primaryKey string) *derr.ErrorResponse

account eos.AccountName, table eos.TableName,

func DataTableNotFoundError

func DataTableNotFoundError(ctx context.Context, account eos.AccountName, table eos.TableName) *derr.ErrorResponse

func EN

func EN(name string) uint64

func HexBlockNum

func HexBlockNum(blockNum uint32) string

func HexName

func HexName(name uint64) string

func HexRevBlockNum

func HexRevBlockNum(blockNum uint32) string

func K

func K(key string) []byte

func N

func N(name string) uint64

func NameToString

func NameToString(in uint64) string

func NewKVStore

func NewKVStore(dsnString string) (store.KVStore, error)

NewKVStore creates the underlying KV store engine based on the DSN string received.

This exists in `fluxdb` package since it's shared between `app` and `cmd` packages.

func PreprocessBlock

func PreprocessBlock(rawBlk *bstream.Block) (interface{}, error)

func StringToName

func StringToName(s string) (val uint64, err error)

Types

type ABIRow

// ABIRow pairs an account with ABI bytes. It is the row type returned by
// FluxDB.GetABI, carried in WriteRequest.ABIs, and attached to
// ReadTableResponse and ReadTableRowResponse.
type ABIRow struct {
	// Account identifies the owning account as a uint64
	// (presumably an encoded EOSIO name — confirm).
	Account   uint64
	BlockNum  uint32 // in Read operation only
	// PackedABI holds the ABI as raw bytes (presumably in its packed binary
	// form, going by the field name — confirm).
	PackedABI []byte
}

type AuthLinkRow

// AuthLinkRow is the element type of WriteRequest.AuthLinks.
// NOTE(review): presumably records that (Account, Contract, Action) is linked
// to PermissionName — confirm against the writer and reader code.
type AuthLinkRow struct {
	// Deletion presumably marks this row as a removal rather than an
	// insertion/update — confirm.
	Deletion bool

	// Account, Contract and Action are uint64 values (presumably encoded
	// EOSIO names — confirm).
	Account  uint64
	Contract uint64
	Action   uint64

	// PermissionName is a uint64 value (presumably an encoded EOSIO
	// permission name — confirm).
	PermissionName uint64
}

type FluxDB

// FluxDB is the database handle returned by New, which takes a store.KVStore.
// It embeds *shutter.Shutter and exposes two function-valued fields that
// callers can set.
type FluxDB struct {
	*shutter.Shutter

	// SpeculativeWritesFetcher returns write requests for the given head block
	// ID and upToBlockNum. Its signature is the same as
	// (*FluxDBHandler).FetchSpeculativeWrites.
	SpeculativeWritesFetcher func(ctx context.Context, headBlockID string, upToBlockNum uint32) (speculativeWrites []*WriteRequest)
	// HeadBlock returns a block reference. Its signature is the same as
	// (*FluxDBHandler).HeadBlock.
	HeadBlock                func(ctx context.Context) bstream.BlockRef
	// contains filtered or unexported fields
}

func New

func New(kvStore store.KVStore) *FluxDB

func (*FluxDB) BuildPipeline

func (fdb *FluxDB) BuildPipeline(getBlockID bstream.EternalSourceStartBackAtBlock, handler bstream.Handler, liveSource bool, blocksStore dstore.Store, publisherAddr string, parallelDownloadCount int)

func (*FluxDB) CheckCleanDBForSharding

func (fdb *FluxDB) CheckCleanDBForSharding() error

func (*FluxDB) Close

func (fdb *FluxDB) Close() error

func (*FluxDB) FetchLastWrittenBlock

func (fdb *FluxDB) FetchLastWrittenBlock(ctx context.Context) (lastWrittenBlock bstream.BlockRef, err error)

func (*FluxDB) GetABI

func (fdb *FluxDB) GetABI(ctx context.Context, blockNum uint32, account uint64, speculativeWrites []*WriteRequest) (out *ABIRow, err error)

func (*FluxDB) HasSeenPublicKeyOnce

func (fdb *FluxDB) HasSeenPublicKeyOnce(
	ctx context.Context,
	publicKey string,
) (exists bool, err error)

func (*FluxDB) HasSeenTableOnce

func (fdb *FluxDB) HasSeenTableOnce(
	ctx context.Context,
	account eos.AccountName,
	table eos.TableName,
) (exists bool, err error)

func (*FluxDB) IndexTables

func (fdb *FluxDB) IndexTables(ctx context.Context) error

func (*FluxDB) IsReady

func (fdb *FluxDB) IsReady() bool

func (*FluxDB) IsSharding

func (fdb *FluxDB) IsSharding() bool

func (*FluxDB) Launch

func (fdb *FluxDB) Launch(devMode bool, httpListenAddr string)

func (*FluxDB) ReadKeyAccounts

func (fdb *FluxDB) ReadKeyAccounts(
	ctx context.Context,
	blockNum uint32,
	publicKey string,
	speculativeWrites []*WriteRequest,
) (accountNames []eos.AccountName, err error)

func (*FluxDB) ReadLinkedPermissions

func (fdb *FluxDB) ReadLinkedPermissions(ctx context.Context, blockNum uint32, account eos.AccountName, speculativeWrites []*WriteRequest) (resp []*LinkedPermission, err error)

func (*FluxDB) ReadTable

func (fdb *FluxDB) ReadTable(ctx context.Context, r *ReadTableRequest) (resp *ReadTableResponse, err error)

func (*FluxDB) ReadTableRow

func (fdb *FluxDB) ReadTableRow(ctx context.Context, r *ReadTableRowRequest) (resp *ReadTableRowResponse, err error)

func (*FluxDB) ReadTableScopes

func (fdb *FluxDB) ReadTableScopes(
	ctx context.Context,
	blockNum uint32,
	account eos.AccountName,
	table eos.TableName,
	speculativeWrites []*WriteRequest,
) (scopes []eos.Name, err error)

func (*FluxDB) SetReady

func (fdb *FluxDB) SetReady()

SetReady marks the process as ready, meaning it has crossed the "close to real-time" threshold.

func (*FluxDB) SetSharding

func (fdb *FluxDB) SetSharding(shardIndex, shardCount int)

func (*FluxDB) SetStopBlock

func (fdb *FluxDB) SetStopBlock(stopBlock uint32)

func (*FluxDB) VerifyAllShardsWritten

func (fdb *FluxDB) VerifyAllShardsWritten() error

func (*FluxDB) WriteBatch

func (fdb *FluxDB) WriteBatch(ctx context.Context, w []*WriteRequest) error

type FluxDBHandler

// FluxDBHandler is constructed by NewHandler from a *FluxDB. All of its
// state is unexported; it is driven through its methods (ProcessBlock,
// EnableWrites, FetchSpeculativeWrites, HeadBlock, ...).
type FluxDBHandler struct {
	// contains filtered or unexported fields
}

FluxDBHandler is a pipeline that writes to FluxDB.

func NewHandler

func NewHandler(db *FluxDB) *FluxDBHandler

func (*FluxDBHandler) EnableWriteOnEachIrreversibleStep

func (p *FluxDBHandler) EnableWriteOnEachIrreversibleStep()

func (*FluxDBHandler) EnableWrites

func (p *FluxDBHandler) EnableWrites()

func (*FluxDBHandler) FetchSpeculativeWrites

func (p *FluxDBHandler) FetchSpeculativeWrites(ctx context.Context, headBlockID string, upToBlockNum uint32) (speculativeWrites []*WriteRequest)

func (*FluxDBHandler) HeadBlock

func (p *FluxDBHandler) HeadBlock(ctx context.Context) bstream.BlockRef

func (*FluxDBHandler) InitializeStartBlockID

func (p *FluxDBHandler) InitializeStartBlockID() (startBlock bstream.BlockRef, err error)

func (*FluxDBHandler) ProcessBlock

func (p *FluxDBHandler) ProcessBlock(rawBlk *bstream.Block, rawObj interface{}) error

type KeyAccountRow

// KeyAccountRow is the element type of WriteRequest.KeyAccounts.
// NOTE(review): presumably associates PublicKey with an (Account, Permission)
// pair — confirm against the writer and FluxDB.ReadKeyAccounts.
type KeyAccountRow struct {
	// PublicKey is the key in string form.
	PublicKey  string
	// Account and Permission are uint64 values (presumably encoded EOSIO
	// names — confirm).
	Account    uint64
	Permission uint64
	// Deletion presumably marks this row as a removal — confirm.
	Deletion   bool
}

type LinkedPermission

// LinkedPermission is the element type returned by
// FluxDB.ReadLinkedPermissions. All fields are strings and serialize to JSON
// under the keys "contract", "action" and "permission_name".
type LinkedPermission struct {
	Contract       string `json:"contract"`
	Action         string `json:"action"`
	PermissionName string `json:"permission_name"`
}

type ReadTableRequest

// ReadTableRequest is the argument to FluxDB.ReadTable and is embedded in
// ReadTableRowRequest.
type ReadTableRequest struct {
	// Account, Scope and Table select the table as uint64 values
	// (presumably encoded EOSIO names — confirm).
	Account, Scope, Table uint64
	// Key is a pointer, so it may be nil (presumably meaning "not set" — confirm).
	Key                   *uint64
	// BlockNum is the block number the read applies to.
	BlockNum              uint32
	// Offset and Limit are pointers, so each may be nil (presumably meaning
	// "no offset" / "no limit" — confirm).
	Offset, Limit         *uint32
	// SpeculativeWrites carries write requests supplied with the read; the
	// same element type is returned by FluxDB.SpeculativeWritesFetcher.
	SpeculativeWrites     []*WriteRequest
}

type ReadTableResponse

// ReadTableResponse is the result type of FluxDB.ReadTable: an ABI row plus
// a slice of table rows.
type ReadTableResponse struct {
	ABI  *ABIRow
	Rows []*TableRow
}

type ReadTableRowRequest

// ReadTableRowRequest is the argument to FluxDB.ReadTableRow. It embeds
// ReadTableRequest and adds the primary key of the single row requested.
type ReadTableRowRequest struct {
	ReadTableRequest
	PrimaryKey uint64
}

type ReadTableRowResponse

// ReadTableRowResponse is the result type of FluxDB.ReadTableRow: an ABI row
// plus a single table row.
type ReadTableRowResponse struct {
	ABI *ABIRow
	Row *TableRow
}

type ShardInjector

// ShardInjector is constructed by NewShardInjector from a shards dstore.Store
// and a *FluxDB, and is executed through its Run method. All of its state is
// unexported.
type ShardInjector struct {
	// contains filtered or unexported fields
}

func NewShardInjector

func NewShardInjector(shardsStore dstore.Store, db *FluxDB) *ShardInjector

func (*ShardInjector) Run

func (s *ShardInjector) Run() (err error)

type Sharder

// Sharder is constructed by NewSharder from a shards dstore.Store, a shard
// count and a start/stop block range; blocks are fed to it through
// ProcessBlock. All of its state is unexported.
type Sharder struct {
	// contains filtered or unexported fields
}

func NewSharder

func NewSharder(shardsStore dstore.Store, shardCount int, startBlock, stopBlock uint32) *Sharder

func (*Sharder) ProcessBlock

func (s *Sharder) ProcessBlock(rawBlk *bstream.Block, rawObj interface{}) error

type TableDataRow

// TableDataRow is the element type of WriteRequest.TableDatas.
// NOTE(review): presumably one table-row mutation identified by
// (Account, Scope, Table, PrimKey) — confirm against the writer.
type TableDataRow struct {
	// Account, Scope, Table and PrimKey are uint64 values (the first three
	// presumably encoded EOSIO names — confirm).
	Account, Scope, Table, PrimKey uint64
	// Payer is a uint64 value (presumably the encoded payer account name — confirm).
	Payer                          uint64
	// Deletion presumably marks this row as a removal — confirm.
	Deletion                       bool
	// Data holds the row payload as raw bytes.
	Data                           []byte
}

type TableIndex

// TableIndex maps primary keys to block numbers. Instances come from
// NewTableIndex or NewTableIndexFromBinary and are serialized with
// MarshalBinary.
type TableIndex struct {
	// AtBlockNum is a block number; NewTableIndexFromBinary takes a parameter
	// of the same name (presumably the block the index was taken at — confirm).
	AtBlockNum uint32
	// NOTE(review): meaning of Squelched is not visible here — confirm.
	Squelched  uint32
	Map        map[string]uint32 // Map[primaryKey] => blockNum
}

func NewTableIndex

func NewTableIndex() *TableIndex

func NewTableIndexFromBinary

func NewTableIndexFromBinary(ctx context.Context, tableKey string, atBlockNum uint32, buffer []byte) (*TableIndex, error)

func (*TableIndex) MarshalBinary

func (index *TableIndex) MarshalBinary(ctx context.Context, tableKey string) ([]byte, error)

func (*TableIndex) String

func (index *TableIndex) String() string

type TableRow

// TableRow is the row type carried by ReadTableResponse.Rows and
// ReadTableRowResponse.Row.
type TableRow struct {
	// Key is the row key as a uint64.
	Key      uint64
	// Payer is a uint64 value (presumably the encoded payer account name — confirm).
	Payer    uint64
	// Data holds the row payload as raw bytes.
	Data     []byte
	// BlockNum is a block number associated with the row (presumably the
	// block at which this row version was written — confirm).
	BlockNum uint32
}

type TableScopeRow

// TableScopeRow is the element type of WriteRequest.TableScopes.
// NOTE(review): presumably records the existence (or removal) of a scope
// within an account's table — confirm against FluxDB.ReadTableScopes.
type TableScopeRow struct {
	// Account, Scope and Table are uint64 values (presumably encoded EOSIO
	// names — confirm).
	Account, Scope, Table uint64
	// Deletion presumably marks this row as a removal — confirm.
	Deletion              bool
	// Payer is a uint64 value (presumably the encoded payer account name — confirm).
	Payer                 uint64
}

type WriteRequest

// WriteRequest groups rows of several kinds together with a block number and
// block ID. Slices of WriteRequest are accepted by FluxDB.WriteBatch and are
// also passed to read methods as speculative writes; AllWritableRows returns
// its rows as one slice.
type WriteRequest struct {
	// ABIs holds the ABI rows of this request.
	ABIs []*ABIRow

	// AuthLinks, KeyAccounts, TableDatas and TableScopes hold the remaining
	// row kinds of this request.
	AuthLinks   []*AuthLinkRow
	KeyAccounts []*KeyAccountRow
	TableDatas  []*TableDataRow
	TableScopes []*TableScopeRow

	// BlockNum and BlockID identify a block (presumably the one that produced
	// these rows — confirm). BlockID is stored as raw bytes.
	BlockNum uint32
	BlockID  []byte
}

func (*WriteRequest) AllWritableRows

func (req *WriteRequest) AllWritableRows() (out []writableRow)

Directories

Path Synopsis
app
kv

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL