Documentation ¶
Overview ¶
Package ingest contains the ingestion system for horizon. This system takes data produced by the connected stellar-core database, transforms it and inserts it into the horizon database.
Index ¶
- Constants
- type Address
- type AssetStats
- type BatchInsertBuilder
- type Config
- type Cursor
- func (c *Cursor) BeforeAndAfter(target xdr.LedgerKey) (before *xdr.LedgerEntry, after *xdr.LedgerEntry, err error)
- func (c *Cursor) InLedger() bool
- func (c *Cursor) InOperation() bool
- func (c *Cursor) InTransaction() bool
- func (c *Cursor) Ledger() *core.LedgerHeader
- func (c *Cursor) LedgerID() int64
- func (c *Cursor) LedgerRange() (start int64, end int64)
- func (c *Cursor) LedgerSequence() int32
- func (c *Cursor) NextLedger() bool
- func (c *Cursor) NextOp() bool
- func (c *Cursor) NextTx() bool
- func (c *Cursor) Operation() *xdr.Operation
- func (c *Cursor) OperationChanges() xdr.LedgerEntryChanges
- func (c *Cursor) OperationCount() int
- func (c *Cursor) OperationID() int64
- func (c *Cursor) OperationOrder() int32
- func (c *Cursor) OperationResult() *xdr.OperationResultTr
- func (c *Cursor) OperationSourceAccount() xdr.AccountId
- func (c *Cursor) OperationType() xdr.OperationType
- func (c *Cursor) Operations() []xdr.Operation
- func (c *Cursor) SuccessfulLedgerOperationCount() (ret int)
- func (c *Cursor) SuccessfulTransactionCount() (ret int)
- func (c *Cursor) Transaction() *core.Transaction
- func (c *Cursor) TransactionFee() *core.TransactionFee
- func (c *Cursor) TransactionID() int64
- func (c *Cursor) TransactionMetaBundle() *meta.Bundle
- func (c *Cursor) TransactionSourceAccount() xdr.AccountId
- type EffectIngestion
- type IngesterMetrics
- type Ingestion
- func (ingest *Ingestion) Clear(start int64, end int64) error
- func (ingest *Ingestion) ClearAll() error
- func (ingest *Ingestion) Close() error
- func (ingest *Ingestion) Effect(address Address, opid int64, order int, typ history.EffectType, ...) error
- func (ingest *Ingestion) Flush() error
- func (ingest *Ingestion) Ledger(id int64, header *core.LedgerHeader, txs int, ops int)
- func (ingest *Ingestion) Operation(id int64, txid int64, order int32, source xdr.AccountId, typ xdr.OperationType, ...) error
- func (ingest *Ingestion) OperationParticipants(op int64, aids []xdr.AccountId)
- func (ingest *Ingestion) Rollback() (err error)
- func (ingest *Ingestion) Start() (err error)
- func (ingest *Ingestion) Trade(opid int64, order int32, buyer xdr.AccountId, trade xdr.ClaimOfferAtom, ...) error
- func (ingest *Ingestion) Transaction(id int64, tx *core.Transaction, fee *core.TransactionFee)
- func (ingest *Ingestion) TransactionParticipants(tx int64, aids []xdr.AccountId)
- func (ingest *Ingestion) UpdateAccountIDs(tables []TableName) error
- type LedgerBundle
- type Session
- type System
- func (i *System) Backfill(n uint) error
- func (i *System) ClearAll() error
- func (i *System) RebaseHistory() error
- func (i *System) ReingestAll() (int, error)
- func (i *System) ReingestOutdated() (n int, err error)
- func (i *System) ReingestRange(start, end int32) (int, error)
- func (i *System) ReingestSingle(sequence int32) error
- func (i *System) Tick() *Session
- type TableName
Constants ¶
const ( // CurrentVersion reflects the latest version of the ingestion // algorithm. As rows are ingested into the horizon database, this version is // used to tag them. In the future, any breaking changes introduced by a // developer should be accompanied by an increase in this value. // // Scripts, that have yet to be ported to this codebase can then be leveraged // to re-ingest old data with the new algorithm, providing a seamless // transition when the ingested data's structure changes. CurrentVersion = 14 )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Address ¶
type Address string
Address is a type of a param provided to BatchInsertBuilder that gets exchanged to record ID in a DB.
type AssetStats ¶
type AssetStats struct { CoreSession *db.Session HistorySession *db.Session // contains filtered or unexported fields }
AssetStats tracks and updates all the assets modified during a cycle of ingestion.
func (*AssetStats) AddAllAssetsFromCore ¶
func (assetStats *AssetStats) AddAllAssetsFromCore() (int, error)
AddAllAssets adds all assets to update list. Used in initialization of stats.
func (*AssetStats) IngestOperation ¶
IngestOperation updates the assetsModified using the passed in operation
func (*AssetStats) UpdateAssetStats ¶
func (assetStats *AssetStats) UpdateAssetStats() error
UpdateAssetStats updates the db with the latest asset stats for the assets that were modified
type BatchInsertBuilder ¶
type BatchInsertBuilder struct { TableName TableName Columns []string // contains filtered or unexported fields }
BatchInsertBuilder works like sq.InsertBuilder but has a better support for batching large number of rows.
func (*BatchInsertBuilder) GetAddresses ¶
func (b *BatchInsertBuilder) GetAddresses() (adds []Address)
func (*BatchInsertBuilder) ReplaceAddressesWithIDs ¶
func (b *BatchInsertBuilder) ReplaceAddressesWithIDs(mapping map[Address]int64)
func (*BatchInsertBuilder) Values ¶
func (b *BatchInsertBuilder) Values(params ...interface{})
type Config ¶
type Config struct { // EnableAssetStats is a feature flag that determines whether to calculate // asset stats in this ingestion system. EnableAssetStats bool }
Config allows passing some configuration values to System and Session.
type Cursor ¶
type Cursor struct { // FirstLedger is the beginning of the range of ledgers (inclusive) that will // attempt to be ingested in this session. FirstLedger int32 // LastLedger is the end of the range of ledgers (inclusive) that will // attempt to be ingested in this session. LastLedger int32 // CoreDB is the stellar-core db that data is ingested from. CoreDB *db.Session Metrics *IngesterMetrics AssetStats *AssetStats // Err is the error that caused this iteration to fail, if any. Err error // contains filtered or unexported fields }
Cursor iterates through a stellar core database's ledgers
func (*Cursor) BeforeAndAfter ¶
func (c *Cursor) BeforeAndAfter(target xdr.LedgerKey) ( before *xdr.LedgerEntry, after *xdr.LedgerEntry, err error, )
BeforeAndAfter loads the ledger entry for `target` before the current operation was applied and after the operation was applied.
func (*Cursor) InOperation ¶
InOperation returns true if the cursor is on a operation. Will return false after advancing to a new transaction but before advancing on to the transaciton's first operation.
func (*Cursor) InTransaction ¶
InTransaction returns true if the cursor is pointing to a transaction. This will return false after advancing to a new ledger but prior to advancing into the ledger's first transaction.
func (*Cursor) Ledger ¶
func (c *Cursor) Ledger() *core.LedgerHeader
Ledger returns the current ledger
func (*Cursor) LedgerRange ¶
LedgerRange returns the beginning and end of id values that map to the current ledger. Useful for clearing a ledgers worth of data.
func (*Cursor) LedgerSequence ¶
LedgerSequence returns the current ledger's sequence
func (*Cursor) NextLedger ¶
NextLedger advances `c` to the next ledger in the iteration, loading a new LedgerBundle from the core database. Returns false if an error occurs or the iteration is complete.
func (*Cursor) NextOp ¶
NextOp advances `c` to the next operation in the current transaction. Returns false if the current transaction has nothing left to visit.
func (*Cursor) NextTx ¶
NextTx advances `c` to the next transaction in the current ledger. Returns false if the current ledger has no transactions left to visit.
func (*Cursor) OperationChanges ¶
func (c *Cursor) OperationChanges() xdr.LedgerEntryChanges
OperationChanges returns all of LedgerEntryChanges that occurred in the course of applying the current operation.
func (*Cursor) OperationCount ¶
OperationCount returns the count of operations in the current transaction
func (*Cursor) OperationID ¶
OperationID returns the current operations id, as used by the history system.
func (*Cursor) OperationOrder ¶
OperationOrder returns the order of the current operation amongst the current transaction's operations.
func (*Cursor) OperationResult ¶
func (c *Cursor) OperationResult() *xdr.OperationResultTr
OperationResult returns the current operation's result record
func (*Cursor) OperationSourceAccount ¶
OperationSourceAccount returns the current operation's effective source account (i.e. default's to the transaction's source account).
func (*Cursor) OperationType ¶
func (c *Cursor) OperationType() xdr.OperationType
OperationType returns the current operation type
func (*Cursor) Operations ¶
Operations returns the current transactions operations.
func (*Cursor) SuccessfulLedgerOperationCount ¶
SuccessfulLedgerOperationCount returns the count of operations in the current ledger
func (*Cursor) SuccessfulTransactionCount ¶
SuccessfulTransactionCount returns the count of transactions in the current ledger that succeeded.
func (*Cursor) Transaction ¶
func (c *Cursor) Transaction() *core.Transaction
Transaction returns the current transaction
func (*Cursor) TransactionFee ¶
func (c *Cursor) TransactionFee() *core.TransactionFee
TransactionFee returns the txfeehistory row for the current transaction.
func (*Cursor) TransactionID ¶
TransactionID returns the current tranaction's id, as used by the history system.
func (*Cursor) TransactionMetaBundle ¶
TransactionMetaBundle provides easier access to the meta data regarding the application of the current transaction.
func (*Cursor) TransactionSourceAccount ¶
TransactionSourceAccount returns the current transaction's source account id
type EffectIngestion ¶
type EffectIngestion struct { Dest *Ingestion OperationID int64 // contains filtered or unexported fields }
EffectIngestion is a helper struct to smooth the ingestion of effects. this struct will track what the correct operation to use and order to use when adding effects into an ingestion.
func (*EffectIngestion) Add ¶
func (ei *EffectIngestion) Add(aid xdr.AccountId, typ history.EffectType, details interface{}) bool
Add writes an effect to the database while automatically tracking the index to use.
func (*EffectIngestion) Finish ¶
func (ei *EffectIngestion) Finish() error
Finish marks this ingestion as complete, returning any error that was recorded.
type IngesterMetrics ¶
type IngesterMetrics struct { ClearLedgerTimer metrics.Timer IngestLedgerTimer metrics.Timer LoadLedgerTimer metrics.Timer }
IngesterMetrics tracks all the metrics for the ingestion subsystem
type Ingestion ¶
type Ingestion struct { // DB is the sql connection to be used for writing any rows into the horizon // database. DB *db.Session // contains filtered or unexported fields }
Ingestion receives write requests from a Session
func (*Ingestion) Clear ¶
Clear removes a range of data from the history database, exclusive of the end id provided.
func (*Ingestion) Effect ¶
func (ingest *Ingestion) Effect(address Address, opid int64, order int, typ history.EffectType, details interface{}) error
Effect adds a new row into the `history_effects` table.
func (*Ingestion) Flush ¶
Flush writes the currently buffered rows to the db, and if successful starts a new transaction.
func (*Ingestion) Operation ¶
func (ingest *Ingestion) Operation( id int64, txid int64, order int32, source xdr.AccountId, typ xdr.OperationType, details map[string]interface{}, ) error
Operation ingests the provided operation data into a new row in the `history_operations` table
func (*Ingestion) OperationParticipants ¶
OperationParticipants ingests the provided accounts `aids` as participants of operation with id `op`, creating a new row in the `history_operation_participants` table.
func (*Ingestion) Trade ¶
func (ingest *Ingestion) Trade( opid int64, order int32, buyer xdr.AccountId, trade xdr.ClaimOfferAtom, ledgerClosedAt int64, ) error
Trade records a trade into the history_trades table
func (*Ingestion) Transaction ¶
func (ingest *Ingestion) Transaction( id int64, tx *core.Transaction, fee *core.TransactionFee, )
Transaction ingests the provided transaction data into a new row in the `history_transactions` table
func (*Ingestion) TransactionParticipants ¶
TransactionParticipants ingests the provided account ids as participants of transaction with id `tx`, creating a new row in the `history_transaction_participants` table.
func (*Ingestion) UpdateAccountIDs ¶
UpdateAccountIDs updates IDs of the accounts before inserting objects into DB.
type LedgerBundle ¶
type LedgerBundle struct { Sequence int32 Header core.LedgerHeader TransactionFees []core.TransactionFee Transactions []core.Transaction }
LedgerBundle represents a single ledger's worth of novelty created by one ledger close
type Session ¶
type Session struct { // Config allows passing some configuration values to System. Config Config Cursor *Cursor Ingestion *Ingestion // Network is the passphrase for the network being imported Network string // StellarCoreURL is the http endpoint of the stellar-core that data is being // ingested from. StellarCoreURL string // ClearExisting causes the session to clear existing data from the horizon db // when the session is run. ClearExisting bool // SkipCursorUpdate causes the session to skip // reporting the "last imported ledger" cursor to // stellar-core SkipCursorUpdate bool // Metrics is a reference to where the session should record its metric information Metrics *IngesterMetrics // AssetStats calculates asset stats AssetStats *AssetStats // Err is the error that caused this session to fail, if any. Err error // Ingested is the number of ledgers that were successfully ingested during // this session. Ingested int }
Session represents a single attempt at ingesting data into the history database.
type System ¶
type System struct { // Config allows passing some configuration values to System. Config Config // HorizonDB is the connection to the horizon database that ingested data will // be written to. HorizonDB *db.Session // CoreDB is the stellar-core db that data is ingested from. CoreDB *db.Session Metrics IngesterMetrics // Network is the passphrase for the network being imported Network string // StellarCoreURL is the http endpoint of the stellar-core that data is being // ingested from. StellarCoreURL string // SkipCursorUpdate causes the ingestor to skip // reporting the "last imported ledger" cursor to // stellar-core SkipCursorUpdate bool // HistoryRetentionCount is the desired minimum number of ledgers to // keep in the history database, working backwards from the latest core // ledger. 0 represents "all ledgers". HistoryRetentionCount uint // contains filtered or unexported fields }
System represents the data ingestion subsystem of horizon.
func New ¶
New initializes the ingester, causing it to begin polling the stellar-core database for now ledgers and ingesting data into the horizon database.
func (*System) Backfill ¶
Backfill ingests history in reverse chronological order, from the current horizon elder query for `n` ledgers
func (*System) ClearAll ¶
ClearAll removes all previously ingested historical data from the horizon database.
func (*System) RebaseHistory ¶
RebaseHistory re-establishes horizon's history database by clearing it, ingesting the latest ledger in stellar-core then backfilling as many ledgers as possible
func (*System) ReingestAll ¶
ReingestAll re-ingests all ledgers
func (*System) ReingestOutdated ¶
ReingestOutdated finds old ledgers and reimports them.
func (*System) ReingestRange ¶
ReingestRange reingests a range of ledgers, from `start` to `end`, inclusive.
func (*System) ReingestSingle ¶
ReingestSingle re-ingests a single ledger
type TableName ¶
type TableName string
const ( AssetStatsTableName TableName = "asset_stats" EffectsTableName TableName = "history_effects" LedgersTableName TableName = "history_ledgers" OperationParticipantsTableName TableName = "history_operation_participants" OperationsTableName TableName = "history_operations" TradesTableName TableName = "history_trades" TransactionParticipantsTableName TableName = "history_transaction_participants" TransactionsTableName TableName = "history_transactions" )
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
Package participants contains functions to derive a set of "participant" addresses for various data structures in the Stellar network's ledger.
|
Package participants contains functions to derive a set of "participant" addresses for various data structures in the Stellar network's ledger. |