Documentation ¶
Overview ¶
Package ingest contains the new ingestion system for aurora. It currently runs completely independent of the old one, that means that the new system can be ledgers behind/ahead the old system.
Index ¶
- Constants
- type Config
- type ErrReingestRangeConflict
- type Metrics
- type MockFilters
- type OrderBookStream
- type ParallelSystems
- type ProcessorRunner
- func (s *ProcessorRunner) DisableMemoryStatsLogging()
- func (s *ProcessorRunner) EnableMemoryStatsLogging()
- func (s *ProcessorRunner) RunAllProcessorsOnLedger(ledger xdr.LedgerCloseMeta) (stats ledgerStats, err error)
- func (s *ProcessorRunner) RunGenesisStateIngestion() (ingest.StatsChangeProcessorResults, error)
- func (s *ProcessorRunner) RunHistoryArchiveIngestion(checkpointLedger uint32, skipChecks bool, ledgerProtocolVersion uint32, ...) (ingest.StatsChangeProcessorResults, error)
- func (s *ProcessorRunner) RunTransactionProcessorsOnLedgers(ledgers []xdr.LedgerCloseMeta, execInTx bool) (err error)
- func (s *ProcessorRunner) SetHistoryAdapter(historyAdapter historyArchiveAdapterInterface)
- type ProcessorRunnerInterface
- type State
- type System
Constants ¶
const ( // MaxSupportedProtocolVersion defines the maximum supported version of // the Hcnet protocol. MaxSupportedProtocolVersion uint32 = 21 // CurrentVersion reflects the latest version of the ingestion // algorithm. This value is stored in KV store and is used to decide // if there's a need to reprocess the ledger state or reingest data. // // Version history: // - 1: Initial version // - 2: Added the orderbook, offers processors and distributed ingestion. // - 3: Fixed a bug that could potentially result in invalid state // (#1722). Update the version to clear the state. // - 4: Fixed a bug in AccountSignersChanged method. // - 5: Added trust lines. // - 6: Added accounts and accounts data. // - 7: Fixes a bug in AccountSignersChanged method. // - 8: Fixes AccountSigners processor to remove preauth tx signer // when preauth tx is failed. // - 9: Fixes a bug in asset stats processor that counted unauthorized // trustlines. // - 10: Fixes a bug in meta processing (fees are now processed before // everything else). // - 11: Protocol 14: CAP-23 and CAP-33. // - 12: Trigger state rebuild due to `absTime` -> `abs_time` rename // in ClaimableBalances predicates. // - 13: Trigger state rebuild to include more than just authorized assets. // - 14: Trigger state rebuild to include claimable balances in the asset stats processor. // - 15: Fixed bug in asset stat ingestion where clawback is enabled (#3846). // - 16: Extract claimants to a separate table for better performance of // claimable balances for claimant queries. // - 17: Add contract_id column to exp_asset_stats table which is derived by ingesting // contract data ledger entries. // - 18: Ingest contract asset balances so we can keep track of expired / restore asset // balances for asset stats. CurrentVersion = 18 // MaxDBConnections is the size of the postgres connection pool dedicated to Aurora ingestion: // * Ledger ingestion, // * State verifications, // * Metrics updates. MaxDBConnections = 3 // 100 ledgers per flush has shown in stress tests // to be best point on performance curve, default to that. MaxLedgersPerFlush uint32 = 100 )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct { HcnetCoreURL string CaptiveCoreBinaryPath string CaptiveCoreStoragePath string CaptiveCoreToml *ledgerbackend.CaptiveCoreToml CaptiveCoreConfigUseDB bool NetworkPassphrase string HistorySession db.SessionInterface HistoryArchiveURLs []string HistoryArchiveCaching bool DisableStateVerification bool EnableReapLookupTables bool EnableExtendedLogLedgerStats bool MaxReingestRetries int ReingestRetryBackoffSeconds int // The checkpoint frequency will be 64 unless you are using an exotic test setup. CheckpointFrequency uint32 StateVerificationCheckpointFrequency uint32 StateVerificationTimeout time.Duration RoundingSlippageFilter int MaxLedgerPerFlush uint32 SkipTxmeta bool }
type ErrReingestRangeConflict ¶ added in v1.11.1
type ErrReingestRangeConflict struct {
// contains filtered or unexported fields
}
ErrReingestRangeConflict indicates that the reingest range overlaps with aurora's most recently ingested ledger
func (ErrReingestRangeConflict) Error ¶ added in v1.11.1
func (e ErrReingestRangeConflict) Error() string
type Metrics ¶ added in v1.11.1
type Metrics struct { // MaxSupportedProtocolVersion exposes the maximum protocol version // supported by this version. MaxSupportedProtocolVersion prometheus.Gauge // LocalLedger exposes the last ingested ledger by this ingesting instance. LocalLatestLedger prometheus.Gauge // LedgerIngestionDuration exposes timing metrics about the rate and // duration of ledger ingestion (including updating DB and graph). LedgerIngestionDuration prometheus.Summary // LedgerIngestionTradeAggregationDuration exposes timing metrics about the rate and // duration of rebuilding trade aggregation buckets. LedgerIngestionTradeAggregationDuration prometheus.Summary // LedgerIngestionReapLookupTablesDuration exposes timing metrics about the rate and // duration of reaping lookup tables. LedgerIngestionReapLookupTablesDuration prometheus.Summary // StateVerifyDuration exposes timing metrics about the rate and // duration of state verification. StateVerifyDuration prometheus.Summary // StateInvalidGauge exposes state invalid metric. 1 if state is invalid, // 0 otherwise. StateInvalidGauge prometheus.GaugeFunc // StateVerifyLedgerEntriesCount exposes total number of ledger entries // checked by the state verifier by type. StateVerifyLedgerEntriesCount *prometheus.GaugeVec // LedgerStatsCounter exposes ledger stats counters (like number of ops/changes). LedgerStatsCounter *prometheus.CounterVec // ProcessorsRunDuration exposes processors run durations. // Deprecated in favor of: ProcessorsRunDurationSummary. ProcessorsRunDuration *prometheus.CounterVec // ProcessorsRunDurationSummary exposes processors run durations. ProcessorsRunDurationSummary *prometheus.SummaryVec // LoadersRunDurationSummary exposes run durations for the ingestion loaders. LoadersRunDurationSummary *prometheus.SummaryVec // LoadersRunDurationSummary exposes stats for the ingestion loaders. LoadersStatsSummary *prometheus.SummaryVec // ArchiveRequestCounter counts how many http requests are sent to history server HistoryArchiveStatsCounter *prometheus.CounterVec }
type MockFilters ¶ added in v1.11.1
func (*MockFilters) GetFilters ¶ added in v1.11.1
func (m *MockFilters) GetFilters(filterQ history.QFilter, ctx context.Context) []processors.LedgerTransactionFilterer
type OrderBookStream ¶ added in v1.11.1
type OrderBookStream struct { // LatestLedgerGauge exposes the local (order book graph) // latest processed ledger LatestLedgerGauge prometheus.Gauge // contains filtered or unexported fields }
OrderBookStream updates an in memory graph to be consistent with offers in the Aurora DB. Any offers which are created, modified, or removed from the Aurora DB during ingestion will be applied to the in memory order book graph. OrderBookStream assumes that no other component will update the in memory graph. However, it is safe for other go routines to use the in memory graph for read operations.
func NewOrderBookStream ¶ added in v1.11.1
func NewOrderBookStream(historyQ history.IngestionQ, graph orderbook.OBGraph) *OrderBookStream
NewOrderBookStream constructs and initializes an OrderBookStream instance
func (*OrderBookStream) Run ¶ added in v1.11.1
func (o *OrderBookStream) Run(ctx context.Context)
Run will call Update() every 30 seconds until the given context is terminated.
func (*OrderBookStream) Update ¶ added in v1.11.1
func (o *OrderBookStream) Update(ctx context.Context) error
Update will query the Aurora DB for offers which have been created, removed, or updated since the last time Update() was called. Those changes will then be applied to the in memory order book graph. After calling this function, the the in memory order book graph should be consistent with the Aurora DB (assuming no error is returned).
type ParallelSystems ¶ added in v1.11.1
type ParallelSystems struct {
// contains filtered or unexported fields
}
func NewParallelSystems ¶ added in v1.11.1
func NewParallelSystems(config Config, workerCount uint) (*ParallelSystems, error)
func (*ParallelSystems) ReingestRange ¶ added in v1.11.1
func (ps *ParallelSystems) ReingestRange(ledgerRanges []history.LedgerRange, batchSizeSuggestion uint32) error
func (*ParallelSystems) Shutdown ¶ added in v1.11.1
func (ps *ParallelSystems) Shutdown()
type ProcessorRunner ¶ added in v1.11.1
type ProcessorRunner struct {
// contains filtered or unexported fields
}
func (*ProcessorRunner) DisableMemoryStatsLogging ¶ added in v1.11.1
func (s *ProcessorRunner) DisableMemoryStatsLogging()
func (*ProcessorRunner) EnableMemoryStatsLogging ¶ added in v1.11.1
func (s *ProcessorRunner) EnableMemoryStatsLogging()
func (*ProcessorRunner) RunAllProcessorsOnLedger ¶ added in v1.11.1
func (s *ProcessorRunner) RunAllProcessorsOnLedger(ledger xdr.LedgerCloseMeta) ( stats ledgerStats, err error, )
func (*ProcessorRunner) RunGenesisStateIngestion ¶ added in v1.11.1
func (s *ProcessorRunner) RunGenesisStateIngestion() (ingest.StatsChangeProcessorResults, error)
func (*ProcessorRunner) RunHistoryArchiveIngestion ¶ added in v1.11.1
func (s *ProcessorRunner) RunHistoryArchiveIngestion( checkpointLedger uint32, skipChecks bool, ledgerProtocolVersion uint32, bucketListHash xdr.Hash, ) (ingest.StatsChangeProcessorResults, error)
func (*ProcessorRunner) RunTransactionProcessorsOnLedgers ¶ added in v1.11.1
func (s *ProcessorRunner) RunTransactionProcessorsOnLedgers(ledgers []xdr.LedgerCloseMeta, execInTx bool) (err error)
Runs only transaction processors on the inbound list of ledgers. Updates history tables based on transactions. Intentionally do not make effort to insert or purge tx's on history_transactions_filtered_tmp Thus, using this method does not support tx sub processing for the ledgers passed in, i.e. tx submission queue will not see these.
func (*ProcessorRunner) SetHistoryAdapter ¶ added in v1.11.1
func (s *ProcessorRunner) SetHistoryAdapter(historyAdapter historyArchiveAdapterInterface)
type ProcessorRunnerInterface ¶ added in v1.11.1
type ProcessorRunnerInterface interface { SetHistoryAdapter(historyAdapter historyArchiveAdapterInterface) EnableMemoryStatsLogging() DisableMemoryStatsLogging() RunGenesisStateIngestion() (ingest.StatsChangeProcessorResults, error) RunHistoryArchiveIngestion( checkpointLedger uint32, skipChecks bool, ledgerProtocolVersion uint32, bucketListHash xdr.Hash, ) (ingest.StatsChangeProcessorResults, error) RunTransactionProcessorsOnLedgers(ledgers []xdr.LedgerCloseMeta, execInTx bool) error RunAllProcessorsOnLedger(ledger xdr.LedgerCloseMeta) ( stats ledgerStats, err error, ) }
type System ¶
type System interface { Run() RegisterMetrics(*prometheus.Registry) Metrics() Metrics StressTest(numTransactions, changesPerTransaction int) error VerifyRange(fromLedger, toLedger uint32, verifyState bool) error BuildState(sequence uint32, skipChecks bool) error ReingestRange(ledgerRanges []history.LedgerRange, force bool, rebuildTradeAgg bool) error BuildGenesisState() error Shutdown() GetCurrentState() State RebuildTradeAggregationBuckets(fromLedger, toLedger uint32) error }