README
¶
FluxDB
A historical database for tabular data, with a loader for EOSIO state tables and permissions.
Important: While this is currently used in production at dfuse, this repository will undergo a major refactoring in the upcoming months. We will revisit the project, its design, its role, and the overall architecture. It will most probably be split into smaller pieces: a core module that stores historical data in a fully agnostic fashion (even blockchain-agnostic), and a second component that uses the core API to implement the current set of features for EOSIO.
Installation
Install through dfuse for EOSIO
Usage
Assuming the following has been run already:
export FLUXDB_BASE_URL=https://mainnet.eos.dfuse.io
Get the currency balances of the user account `eoscanadacom` (specified as `scope`) for various currencies (specified as `accounts`).
curl "$FLUXDB_BASE_URL/v0/state/tables/accounts?block_num=26415000&accounts=eosio.token|eosadddddddd|tokenbyeocat|ethsidechain|epraofficial|alibabapoole|hirevibeshvt|oo1122334455|irespotokens|publytoken11|parslseed123|trybenetwork|zkstokensr4u&scope=eoscanadacom&table=accounts&json=true&&token=$DFUSE" | jq .
Features
FluxDB supports parallel ingestion by doing a first sharding pass on the chain history, splitting the tables being mutated into different shards, then ingesting each table linearly.
This allows ingestion of the whole history in a few hours.
Documentation
See the `/v0/state` endpoints under https://docs.dfuse.io/reference/eosio/rest/
Contributing
Issues and PRs related to FluxDB belong in this repository.
See the dfuse-wide contribution guide if you wish to contribute to this code base.
Documentation
¶
Index ¶
- Variables
- func AppBlockNumHigherThanHeadBlockError(ctx context.Context, chosenBlockNum, headBlockNum, lastWrittenBlockNum uint32) *derr.ErrorResponse
- func AppBlockNumHigherThanLIBError(ctx context.Context, chosenBlockNum, lastWrittenBlockNum uint32) *derr.ErrorResponse
- func BlockNum(blockID string) uint32
- func BuildReprocessingPipeline(handler bstream.Handler, blocksStore dstore.Store, startBlockNum uint64, ...) bstream.Source
- func DataABINotFoundError(ctx context.Context, account string, blockNum uint32) *derr.ErrorResponse
- func DataDecodingRowError(ctx context.Context, hexData string) *derr.ErrorResponse
- func DataPublicKeyNotFoundError(ctx context.Context, publicKey string) *derr.ErrorResponse
- func DataRowNotFoundError(ctx context.Context, primaryKey string) *derr.ErrorResponse
- func DataTableNotFoundError(ctx context.Context, account eos.AccountName, table eos.TableName) *derr.ErrorResponse
- func EN(name string) uint64
- func HexBlockNum(blockNum uint32) string
- func HexName(name uint64) string
- func HexRevBlockNum(blockNum uint32) string
- func K(key string) []byte
- func N(name string) uint64
- func NameToString(in uint64) string
- func NewKVStore(dsnString string) (store.KVStore, error)
- func PreprocessBlock(rawBlk *bstream.Block) (interface{}, error)
- func StringToName(s string) (val uint64, err error)
- type ABIRow
- type AuthLinkRow
- type FluxDB
- func (fdb *FluxDB) BuildPipeline(getBlockID bstream.EternalSourceStartBackAtBlock, handler bstream.Handler, ...)
- func (fdb *FluxDB) CheckCleanDBForSharding() error
- func (fdb *FluxDB) Close() error
- func (fdb *FluxDB) FetchLastWrittenBlock(ctx context.Context) (lastWrittenBlock bstream.BlockRef, err error)
- func (fdb *FluxDB) GetABI(ctx context.Context, blockNum uint32, account uint64, ...) (out *ABIRow, err error)
- func (fdb *FluxDB) HasSeenPublicKeyOnce(ctx context.Context, publicKey string) (exists bool, err error)
- func (fdb *FluxDB) HasSeenTableOnce(ctx context.Context, account eos.AccountName, table eos.TableName) (exists bool, err error)
- func (fdb *FluxDB) IndexTables(ctx context.Context) error
- func (fdb *FluxDB) IsReady() bool
- func (fdb *FluxDB) IsSharding() bool
- func (fdb *FluxDB) Launch(devMode bool, httpListenAddr string)
- func (fdb *FluxDB) ReadKeyAccounts(ctx context.Context, blockNum uint32, publicKey string, ...) (accountNames []eos.AccountName, err error)
- func (fdb *FluxDB) ReadLinkedPermissions(ctx context.Context, blockNum uint32, account eos.AccountName, ...) (resp []*LinkedPermission, err error)
- func (fdb *FluxDB) ReadTable(ctx context.Context, r *ReadTableRequest) (resp *ReadTableResponse, err error)
- func (fdb *FluxDB) ReadTableRow(ctx context.Context, r *ReadTableRowRequest) (resp *ReadTableRowResponse, err error)
- func (fdb *FluxDB) ReadTableScopes(ctx context.Context, blockNum uint32, account eos.AccountName, ...) (scopes []eos.Name, err error)
- func (fdb *FluxDB) SetReady()
- func (fdb *FluxDB) SetSharding(shardIndex, shardCount int)
- func (fdb *FluxDB) SetStopBlock(stopBlock uint32)
- func (fdb *FluxDB) VerifyAllShardsWritten() error
- func (fdb *FluxDB) WriteBatch(ctx context.Context, w []*WriteRequest) error
- type FluxDBHandler
- func (p *FluxDBHandler) EnableWriteOnEachIrreversibleStep()
- func (p *FluxDBHandler) EnableWrites()
- func (p *FluxDBHandler) FetchSpeculativeWrites(ctx context.Context, headBlockID string, upToBlockNum uint32) (speculativeWrites []*WriteRequest)
- func (p *FluxDBHandler) HeadBlock(ctx context.Context) bstream.BlockRef
- func (p *FluxDBHandler) InitializeStartBlockID() (startBlock bstream.BlockRef, err error)
- func (p *FluxDBHandler) ProcessBlock(rawBlk *bstream.Block, rawObj interface{}) error
- type KeyAccountRow
- type LinkedPermission
- type ReadTableRequest
- type ReadTableResponse
- type ReadTableRowRequest
- type ReadTableRowResponse
- type ShardInjector
- type Sharder
- type TableDataRow
- type TableIndex
- type TableRow
- type TableScopeRow
- type WriteRequest
Constants ¶
This section is empty.
Variables ¶
var ErrCleanSourceStop = errors.New("clean source stop")
Functions ¶
func AppBlockNumHigherThanHeadBlockError ¶
func AppBlockNumHigherThanHeadBlockError(ctx context.Context, chosenBlockNum, headBlockNum, lastWrittenBlockNum uint32) *derr.ErrorResponse
func AppBlockNumHigherThanLIBError ¶
func AppBlockNumHigherThanLIBError(ctx context.Context, chosenBlockNum, lastWrittenBlockNum uint32) *derr.ErrorResponse
func DataABINotFoundError ¶
func DataDecodingRowError ¶
func DataDecodingRowError(ctx context.Context, hexData string) *derr.ErrorResponse
func DataPublicKeyNotFoundError ¶
func DataPublicKeyNotFoundError(ctx context.Context, publicKey string) *derr.ErrorResponse
func DataRowNotFoundError ¶
func DataRowNotFoundError(ctx context.Context, primaryKey string) *derr.ErrorResponse
account eos.AccountName, table eos.TableName,
func DataTableNotFoundError ¶
func DataTableNotFoundError(ctx context.Context, account eos.AccountName, table eos.TableName) *derr.ErrorResponse
func HexBlockNum ¶
func HexRevBlockNum ¶
func NameToString ¶
func NewKVStore ¶
NewKVStore creates the underlying KV store engine based on the DSN string received.
This exists in the `fluxdb` package since it's shared between the `app` and `cmd` packages.
func PreprocessBlock ¶
func StringToName ¶
Types ¶
type AuthLinkRow ¶
type FluxDB ¶
type FluxDB struct { *shutter.Shutter SpeculativeWritesFetcher func(ctx context.Context, headBlockID string, upToBlockNum uint32) (speculativeWrites []*WriteRequest) HeadBlock func(ctx context.Context) bstream.BlockRef // contains filtered or unexported fields }
func (*FluxDB) BuildPipeline ¶
func (*FluxDB) CheckCleanDBForSharding ¶
func (*FluxDB) FetchLastWrittenBlock ¶
func (*FluxDB) HasSeenPublicKeyOnce ¶
func (*FluxDB) HasSeenTableOnce ¶
func (*FluxDB) IsSharding ¶
func (*FluxDB) ReadKeyAccounts ¶
func (fdb *FluxDB) ReadKeyAccounts( ctx context.Context, blockNum uint32, publicKey string, speculativeWrites []*WriteRequest, ) (accountNames []eos.AccountName, err error)
func (*FluxDB) ReadLinkedPermissions ¶
func (fdb *FluxDB) ReadLinkedPermissions(ctx context.Context, blockNum uint32, account eos.AccountName, speculativeWrites []*WriteRequest) (resp []*LinkedPermission, err error)
func (*FluxDB) ReadTable ¶
func (fdb *FluxDB) ReadTable(ctx context.Context, r *ReadTableRequest) (resp *ReadTableResponse, err error)
func (*FluxDB) ReadTableRow ¶
func (fdb *FluxDB) ReadTableRow(ctx context.Context, r *ReadTableRowRequest) (resp *ReadTableRowResponse, err error)
func (*FluxDB) ReadTableScopes ¶
func (*FluxDB) SetReady ¶
func (fdb *FluxDB) SetReady()
SetReady marks the process as ready, meaning it has crossed the "close to real-time" threshold.
func (*FluxDB) SetSharding ¶
func (*FluxDB) SetStopBlock ¶
func (*FluxDB) VerifyAllShardsWritten ¶
func (*FluxDB) WriteBatch ¶
func (fdb *FluxDB) WriteBatch(ctx context.Context, w []*WriteRequest) error
type FluxDBHandler ¶
type FluxDBHandler struct {
// contains filtered or unexported fields
}
FluxDBHandler is a pipeline that writes to FluxDB.
func NewHandler ¶
func NewHandler(db *FluxDB) *FluxDBHandler
func (*FluxDBHandler) EnableWriteOnEachIrreversibleStep ¶
func (p *FluxDBHandler) EnableWriteOnEachIrreversibleStep()
func (*FluxDBHandler) EnableWrites ¶
func (p *FluxDBHandler) EnableWrites()
func (*FluxDBHandler) FetchSpeculativeWrites ¶
func (p *FluxDBHandler) FetchSpeculativeWrites(ctx context.Context, headBlockID string, upToBlockNum uint32) (speculativeWrites []*WriteRequest)
func (*FluxDBHandler) HeadBlock ¶
func (p *FluxDBHandler) HeadBlock(ctx context.Context) bstream.BlockRef
func (*FluxDBHandler) InitializeStartBlockID ¶
func (p *FluxDBHandler) InitializeStartBlockID() (startBlock bstream.BlockRef, err error)
func (*FluxDBHandler) ProcessBlock ¶
func (p *FluxDBHandler) ProcessBlock(rawBlk *bstream.Block, rawObj interface{}) error
type KeyAccountRow ¶
type LinkedPermission ¶
type ReadTableRequest ¶
type ReadTableRequest struct {
Account, Scope, Table uint64
Key *uint64
BlockNum uint32
Offset, Limit *uint32
SpeculativeWrites []*WriteRequest
}
type ReadTableResponse ¶
type ReadTableRowRequest ¶
type ReadTableRowRequest struct { ReadTableRequest PrimaryKey uint64 }
type ReadTableRowResponse ¶
type ShardInjector ¶
type ShardInjector struct {
// contains filtered or unexported fields
}
func NewShardInjector ¶
func NewShardInjector(shardsStore dstore.Store, db *FluxDB) *ShardInjector
func (*ShardInjector) Run ¶
func (s *ShardInjector) Run() (err error)
type TableDataRow ¶
type TableIndex ¶
type TableIndex struct { AtBlockNum uint32 Squelched uint32 Map map[string]uint32 // Map[primaryKey] => blockNum }
func NewTableIndex ¶
func NewTableIndex() *TableIndex
func NewTableIndexFromBinary ¶
func (*TableIndex) MarshalBinary ¶
func (*TableIndex) String ¶
func (index *TableIndex) String() string
type TableScopeRow ¶
type WriteRequest ¶
type WriteRequest struct { ABIs []*ABIRow AuthLinks []*AuthLinkRow KeyAccounts []*KeyAccountRow TableDatas []*TableDataRow TableScopes []*TableScopeRow BlockNum uint32 BlockID []byte }
func (*WriteRequest) AllWritableRows ¶
func (req *WriteRequest) AllWritableRows() (out []writableRow)