Documentation ¶
Overview ¶
Package ingest contains the new ingestion system for Millennium. It currently runs completely independently of the old one, which means the new system can be several ledgers behind or ahead of the old system.
Index ¶
- Constants
- Variables
- type Config
- type Metrics
- type OrderBookStream
- type ParallelSystems
- type ProcessorRunner
- func (s *ProcessorRunner) DisableMemoryStatsLogging()
- func (s *ProcessorRunner) EnableMemoryStatsLogging()
- func (s *ProcessorRunner) RunAllProcessorsOnLedger(sequence uint32) (io.StatsChangeProcessorResults, io.StatsLedgerTransactionProcessorResults, ...)
- func (s *ProcessorRunner) RunHistoryArchiveIngestion(checkpointLedger uint32) (io.StatsChangeProcessorResults, error)
- func (s *ProcessorRunner) RunTransactionProcessorsOnLedger(ledger uint32) (io.StatsLedgerTransactionProcessorResults, error)
- func (s *ProcessorRunner) SetHistoryAdapter(historyAdapter adapters.HistoryArchiveAdapterInterface)
- func (s *ProcessorRunner) SetLedgerBackend(ledgerBackend ledgerbackend.LedgerBackend)
- type ProcessorRunnerInterface
- type System
Constants ¶
const (
	// MaxSupportedProtocolVersion defines the maximum supported version of
	// the AiBlocks protocol.
	MaxSupportedProtocolVersion = 15

	// CurrentVersion reflects the latest version of the ingestion
	// algorithm. This value is stored in KV store and is used to decide
	// if there's a need to reprocess the ledger state or reingest data.
	//
	// Version history:
	// - 1: Initial version
	// - 2: Added the orderbook, offers processors and distributed ingestion.
	// - 3: Fixed a bug that could potentially result in invalid state
	//      (#1722). Update the version to clear the state.
	// - 4: Fixed a bug in AccountSignersChanged method.
	// - 5: Added trust lines.
	// - 6: Added accounts and accounts data.
	// - 7: Fixes a bug in AccountSignersChanged method.
	// - 8: Fixes AccountSigners processor to remove preauth tx signer
	//      when preauth tx is failed.
	// - 9: Fixes a bug in asset stats processor that counted unauthorized
	//      trustlines.
	// - 10: Fixes a bug in meta processing (fees are now processed before
	//      everything else).
	// - 11: Protocol 14: CAP-23 and CAP-33.
	// - 12: Trigger state rebuild due to `absTime` -> `abs_time` rename
	//      in ClaimableBalances predicates.
	CurrentVersion = 12

	// MaxDBConnections is the size of the postgres connection pool dedicated to Millennium ingestion:
	//  * Ledger ingestion,
	//  * State verifications,
	//  * Metrics updates.
	MaxDBConnections = 3
)
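The CurrentVersion comment says the stored value decides whether state must be reprocessed. A minimal sketch of such a check follows. The helper name needsStateRebuild and the "stored version is lower" rule are assumptions made for illustration; the package's actual decision logic is not shown in this documentation.

```go
package main

import "fmt"

// currentVersion mirrors the documented CurrentVersion constant.
const currentVersion = 12

// needsStateRebuild is a hypothetical helper. It assumes that a stored
// version lower than the running code's version means the ledger state
// was produced by an older algorithm and should be rebuilt.
func needsStateRebuild(storedVersion int) bool {
	return storedVersion < currentVersion
}

func main() {
	for _, stored := range []int{0, 11, 12} {
		fmt.Printf("stored=%d rebuild=%t\n", stored, needsStateRebuild(stored))
	}
}
```

Under this assumption, a stored version higher than the running one (data written by a newer binary) is not treated as a rebuild trigger and would need separate handling.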
Variables ¶
var (
	// ErrReingestRangeConflict indicates that the reingest range overlaps with
	// millennium's most recently ingested ledger
	ErrReingestRangeConflict = errors.New("reingest range overlaps with millennium ingestion")
)
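Callers would typically detect this sentinel with errors.Is. The sketch below redeclares the error locally so it compiles on its own. The function checkReingestRange and its overlap rule (the most recently ingested ledger falls inside the requested range) are assumptions, one plausible reading of "overlaps", not the package's confirmed check.

```go
package main

import (
	"errors"
	"fmt"
)

// errReingestRangeConflict is a local stand-in for ErrReingestRangeConflict.
var errReingestRangeConflict = errors.New("reingest range overlaps with millennium ingestion")

// checkReingestRange is hypothetical: it reports a conflict when the most
// recently ingested ledger lies inside [fromLedger, toLedger].
func checkReingestRange(fromLedger, toLedger, lastIngested uint32) error {
	if lastIngested >= fromLedger && lastIngested <= toLedger {
		return errReingestRangeConflict
	}
	return nil
}

func main() {
	err := checkReingestRange(1, 10, 5)
	if errors.Is(err, errReingestRangeConflict) {
		fmt.Println("conflict: pick a range that excludes the latest ingested ledger")
	}
}
```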
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
	CoreSession                 *db.Session
	AiBlocksCoreURL             string
	AiBlocksCoreCursor          string
	EnableCaptiveCore           bool
	AiBlocksCoreBinaryPath      string
	AiBlocksCoreConfigPath      string
	RemoteCaptiveCoreURL        string
	NetworkPassphrase           string
	HistorySession              *db.Session
	HistoryArchiveURL           string
	DisableStateVerification    bool
	MaxReingestRetries          int
	ReingestRetryBackoffSeconds int
}
type Metrics ¶
type Metrics struct {
	// LedgerIngestionDuration exposes timing metrics about the rate and
	// duration of ledger ingestion (including updating DB and graph).
	LedgerIngestionDuration prometheus.Summary

	// StateVerifyDuration exposes timing metrics about the rate and
	// duration of state verification.
	StateVerifyDuration prometheus.Summary

	// StateInvalidGauge exposes state invalid metric. 1 if state is invalid,
	// 0 otherwise.
	StateInvalidGauge prometheus.GaugeFunc

	// LedgerStatsCounter exposes ledger stats counters (like number of ops/changes).
	LedgerStatsCounter *prometheus.CounterVec
}
type OrderBookStream ¶
type OrderBookStream struct {
	// LatestLedgerGauge exposes the local (order book graph)
	// latest processed ledger
	LatestLedgerGauge prometheus.Gauge
	// contains filtered or unexported fields
}
OrderBookStream updates an in-memory graph to be consistent with offers in the Millennium DB. Any offers which are created, modified, or removed from the Millennium DB during ingestion will be applied to the in-memory order book graph. OrderBookStream assumes that no other component will update the in-memory graph. However, it is safe for other goroutines to use the in-memory graph for read operations.
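The single-writer, many-readers contract described here can be illustrated with a read-write mutex. This is only a sketch of the general pattern: the type offerGraph, its fields, and its methods are hypothetical, and the real orderbook graph may synchronize differently.

```go
package main

import (
	"fmt"
	"sync"
)

// offerGraph is a hypothetical stand-in for the in-memory order book graph.
type offerGraph struct {
	mu     sync.RWMutex
	offers map[int64]string // offer ID -> human-readable description
}

func newOfferGraph() *offerGraph {
	return &offerGraph{offers: map[int64]string{}}
}

// apply is meant to be called by the single writer only.
func (g *offerGraph) apply(upserts map[int64]string, removed []int64) {
	g.mu.Lock()
	defer g.mu.Unlock()
	for id, offer := range upserts {
		g.offers[id] = offer
	}
	for _, id := range removed {
		delete(g.offers, id)
	}
}

// count is a read operation and may be called from any goroutine.
func (g *offerGraph) count() int {
	g.mu.RLock()
	defer g.mu.RUnlock()
	return len(g.offers)
}

func main() {
	g := newOfferGraph()
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() { // concurrent readers
			defer wg.Done()
			_ = g.count()
		}()
	}
	g.apply(map[int64]string{1: "sell 10 A for B", 2: "sell 5 B for A"}, nil)
	g.apply(nil, []int64{1})
	wg.Wait()
	fmt.Println("offers in graph:", g.count())
}
```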
func NewOrderBookStream ¶
func NewOrderBookStream(historyQ history.IngestionQ, graph orderbook.OBGraph) *OrderBookStream
NewOrderBookStream constructs and initializes an OrderBookStream instance.
func (*OrderBookStream) Run ¶
func (o *OrderBookStream) Run(ctx context.Context)
Run will call Update() every 30 seconds until the given context is terminated.
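A periodic loop of this kind is commonly written with time.Ticker and a select on ctx.Done(). The sketch below shows that general pattern with a short interval so it finishes quickly. The names runEvery and demoRun are hypothetical, and whether the real Run performs an immediate first update is not stated here, so the sketch waits for the first tick.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runEvery calls update once per interval until ctx is done, then returns
// the number of times update was invoked.
func runEvery(ctx context.Context, interval time.Duration, update func() error) int {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	calls := 0
	for {
		select {
		case <-ctx.Done():
			return calls
		case <-ticker.C:
			calls++
			if err := update(); err != nil {
				// Log and keep going; the next tick retries.
				fmt.Println("update failed:", err)
			}
		}
	}
}

// demoRun drives runEvery with a 10ms interval for roughly 200ms.
func demoRun() int {
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()
	return runEvery(ctx, 10*time.Millisecond, func() error { return nil })
}

func main() {
	fmt.Println("updates performed:", demoRun())
}
```

For the documented behavior the interval would be 30 * time.Second, and the caller would cancel the context to stop the loop.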
func (*OrderBookStream) Update ¶
func (o *OrderBookStream) Update() error
Update will query the Millennium DB for offers which have been created, removed, or updated since the last time Update() was called. Those changes will then be applied to the in-memory order book graph. After calling this function, the in-memory order book graph should be consistent with the Millennium DB (assuming no error is returned).
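The incremental behavior can be sketched as "apply only rows modified after the last ledger seen". Everything in this example is hypothetical: the offerRow shape, the Deleted flag, and the lastLedger bookkeeping are assumptions chosen to illustrate the idea, not the package's actual queries or types.

```go
package main

import "fmt"

// offerRow is a hypothetical shape for an offer row returned by the DB.
type offerRow struct {
	ID                 int64
	LastModifiedLedger uint32
	Deleted            bool
}

// graphStream tracks the graph contents and the newest ledger applied so far.
type graphStream struct {
	graph      map[int64]offerRow
	lastLedger uint32
}

// update applies every row modified after lastLedger and returns how many
// rows were applied. Rows at or before lastLedger are skipped as already seen.
func (s *graphStream) update(rows []offerRow) int {
	applied := 0
	newest := s.lastLedger
	for _, row := range rows {
		if row.LastModifiedLedger <= s.lastLedger {
			continue
		}
		if row.Deleted {
			delete(s.graph, row.ID)
		} else {
			s.graph[row.ID] = row
		}
		if row.LastModifiedLedger > newest {
			newest = row.LastModifiedLedger
		}
		applied++
	}
	s.lastLedger = newest
	return applied
}

func main() {
	s := &graphStream{graph: map[int64]offerRow{}}
	first := s.update([]offerRow{{ID: 1, LastModifiedLedger: 5}, {ID: 2, LastModifiedLedger: 6}})
	second := s.update([]offerRow{{ID: 1, LastModifiedLedger: 7, Deleted: true}, {ID: 2, LastModifiedLedger: 6}})
	fmt.Println(first, second, len(s.graph))
}
```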
type ParallelSystems ¶
type ParallelSystems struct {
// contains filtered or unexported fields
}
func NewParallelSystems ¶
func NewParallelSystems(config Config, workerCount uint) (*ParallelSystems, error)
func (*ParallelSystems) ReingestRange ¶
func (ps *ParallelSystems) ReingestRange(fromLedger, toLedger uint32, batchSizeSuggestion uint32) error
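The signature suggests that the requested range is divided into batches of roughly batchSizeSuggestion ledgers, which the workers then process. A minimal sketch of such a split follows. The names ledgerRange and splitRange are hypothetical, and the real implementation may adjust the batch size (for example, to align with checkpoint boundaries), which this sketch does not attempt.

```go
package main

import "fmt"

// ledgerRange is an inclusive range of ledger sequence numbers.
type ledgerRange struct {
	from, to uint32
}

// splitRange divides [from, to] into consecutive batches of at most
// batchSize ledgers. It returns nil for an empty range or a zero batch size.
func splitRange(from, to, batchSize uint32) []ledgerRange {
	if batchSize == 0 || from > to {
		return nil
	}
	var batches []ledgerRange
	start := from
	for {
		end := start + batchSize - 1
		// Clamp to the end of the range; end < start detects uint32 overflow.
		if end > to || end < start {
			end = to
		}
		batches = append(batches, ledgerRange{from: start, to: end})
		if end == to {
			return batches
		}
		start = end + 1
	}
}

func main() {
	for _, b := range splitRange(1, 10, 4) {
		fmt.Printf("batch %d-%d\n", b.from, b.to)
	}
}
```

Each batch could then be handed to one of the workerCount workers, for instance over a channel.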
type ProcessorRunner ¶
type ProcessorRunner struct {
// contains filtered or unexported fields
}
func (*ProcessorRunner) DisableMemoryStatsLogging ¶
func (s *ProcessorRunner) DisableMemoryStatsLogging()
func (*ProcessorRunner) EnableMemoryStatsLogging ¶
func (s *ProcessorRunner) EnableMemoryStatsLogging()
func (*ProcessorRunner) RunAllProcessorsOnLedger ¶
func (s *ProcessorRunner) RunAllProcessorsOnLedger(sequence uint32) (io.StatsChangeProcessorResults, io.StatsLedgerTransactionProcessorResults, error)
func (*ProcessorRunner) RunHistoryArchiveIngestion ¶
func (s *ProcessorRunner) RunHistoryArchiveIngestion(checkpointLedger uint32) (io.StatsChangeProcessorResults, error)
func (*ProcessorRunner) RunTransactionProcessorsOnLedger ¶
func (s *ProcessorRunner) RunTransactionProcessorsOnLedger(ledger uint32) (io.StatsLedgerTransactionProcessorResults, error)
func (*ProcessorRunner) SetHistoryAdapter ¶
func (s *ProcessorRunner) SetHistoryAdapter(historyAdapter adapters.HistoryArchiveAdapterInterface)
func (*ProcessorRunner) SetLedgerBackend ¶
func (s *ProcessorRunner) SetLedgerBackend(ledgerBackend ledgerbackend.LedgerBackend)
type ProcessorRunnerInterface ¶
type ProcessorRunnerInterface interface {
	SetLedgerBackend(ledgerBackend ledgerbackend.LedgerBackend)
	SetHistoryAdapter(historyAdapter adapters.HistoryArchiveAdapterInterface)
	EnableMemoryStatsLogging()
	DisableMemoryStatsLogging()
	RunHistoryArchiveIngestion(checkpointLedger uint32) (io.StatsChangeProcessorResults, error)
	RunTransactionProcessorsOnLedger(sequence uint32) (io.StatsLedgerTransactionProcessorResults, error)
	RunAllProcessorsOnLedger(sequence uint32) (
		io.StatsChangeProcessorResults,
		io.StatsLedgerTransactionProcessorResults,
		error,
	)
}