Documentation ¶
Index ¶
- Constants
- func DefaultExcludeAddress(id flow.ChainID, address flow.Address) bool
- type AddressBatch
- type AddressProvider
- type AddressProviderConfig
- type Component
- type ComponentBase
- type Config
- func (c Config) WithBatchSize(value int) Config
- func (c Config) WithCandidateScanners(value []candidates.CandidateScanner) Config
- func (c Config) WithChainID(value flow.ChainID) Config
- func (c Config) WithContinuousScan(value bool) Config
- func (c Config) WithExcludeAddress(value func(id flow.ChainID, address flow.Address) bool) Config
- func (c Config) WithHandleScriptError(value func(AddressBatch, error) ScriptErrorAction) Config
- func (c Config) WithLogger(value zerolog.Logger) Config
- func (c Config) WithMaxConcurrentScripts(value int) Config
- func (c Config) WithScript(value []byte) Config
- func (c Config) WithScriptResultHandler(value ScriptResultHandler) Config
- func (c Config) WithStatusReporter(value StatusReporter) Config
- type DefaultStatusReporter
- func (r *DefaultStatusReporter) ReportFullScanProgress(current uint64, total uint64)
- func (r *DefaultStatusReporter) ReportIncrementalBlockDiff(diff uint64)
- func (r *DefaultStatusReporter) ReportIncrementalBlockHeight(height uint64)
- func (r *DefaultStatusReporter) ReportIsFullScanRunning(running bool)
- type FullScan
- type FullScanRunner
- type FullScanRunnerConfig
- type IncrementalScanner
- type IncrementalScannerConfig
- type NoOpScriptResultHandler
- type NoOpStatusReporter
- type ProcessedAddressBatch
- type ScanConcluded
- type Scanner
- type ScriptErrorAction
- type ScriptErrorActionExclude
- type ScriptErrorActionNone
- type ScriptErrorActionRetry
- type ScriptErrorActionSplit
- type ScriptErrorActionUnhandled
- type ScriptResultHandler
- type ScriptResultProcessor
- type ScriptRunner
- type ScriptRunnerConfig
- type StatusReporter
- type StatusReporterOption
Constants ¶
const DefaultBatchSize = 1000
const DefaultIncrementalScannerBlockLag = 5
const DefaultIncrementalScannerMaxBlockGap = 100
DefaultIncrementalScannerMaxBlockGap is the maximum number of blocks that can be scanned by the incremental scanner. If the gap is larger than this, the incremental scanner will request a full scan.
const DefaultScriptRunnerMaxConcurrentScripts = 20
DefaultScriptRunnerMaxConcurrentScripts is the maximum number of scripts that can be running concurrently at any given time. If this is more than the rate limit, some scripts will be just waiting. As long as they don't wait too long, this is not a problem.
const DefaultStatusReporterPort = 2112
const FullScanReferenceBlockSwitch = 10 * time.Second
Variables ¶
This section is empty.
Functions ¶
func DefaultExcludeAddress ¶ added in v0.3.0
func DefaultExcludeAddress(id flow.ChainID, address flow.Address) bool
Types ¶
type AddressBatch ¶
type AddressBatch struct { Addresses []flow.Address BlockHeight uint64 // contains filtered or unexported fields }
AddressBatch is a batch of addresses that will be the input to the script being run by the script runner at the given block height.
func NewAddressBatch ¶
func NewAddressBatch( addresses []flow.Address, blockHeight uint64, doneHandling func(), isValid func() bool, ) AddressBatch
func (*AddressBatch) DoneHandling ¶
func (b *AddressBatch) DoneHandling()
DoneHandling should be called when the batch has been processed.
func (*AddressBatch) ExcludeAddress ¶
func (b *AddressBatch) ExcludeAddress(address flow.Address)
func (*AddressBatch) IsValid ¶
func (b *AddressBatch) IsValid() bool
IsValid reports whether the batch is still valid. If the batch is cancelled, it should not be processed.
func (*AddressBatch) Split ¶ added in v0.3.0
func (b *AddressBatch) Split() (AddressBatch, AddressBatch)
Split splits the batch into two batches of equal size.
type AddressProvider ¶
type AddressProvider struct {
// contains filtered or unexported fields
}
AddressProvider is used to get all the addresses that exist at a certain referenceBlockId. This relies on the fact that a certain `endOfAccountsError` will be returned by the `accountStorageUsageScript` if the address doesn't exist yet.
func InitAddressProvider ¶
func InitAddressProvider( ctx context.Context, chain flow.ChainID, blockHeight uint64, client client.Client, config AddressProviderConfig, log zerolog.Logger, ) (*AddressProvider, error)
InitAddressProvider uses bisection to get the last existing address.
func (*AddressProvider) AddressesLen ¶
func (p *AddressProvider) AddressesLen() uint
func (*AddressProvider) GenerateAddressBatches ¶
func (p *AddressProvider) GenerateAddressBatches(addressChan chan<- []flow.Address, batchSize int)
func (*AddressProvider) GetNextAddress ¶
func (p *AddressProvider) GetNextAddress() (address flow.Address, isOutOfBounds bool)
func (*AddressProvider) LastAddress ¶
func (p *AddressProvider) LastAddress() flow.Address
type AddressProviderConfig ¶ added in v0.3.0
type AddressProviderConfig struct {
ExcludeAddress func(id flow.ChainID, address flow.Address) bool
}
func DefaultAddressProviderConfig ¶ added in v0.3.0
func DefaultAddressProviderConfig() AddressProviderConfig
type ComponentBase ¶
func NewComponentWithStart ¶ added in v0.3.0
func NewComponentWithStart( name string, start func(ctx context.Context), logger zerolog.Logger, ) *ComponentBase
NewComponentWithStart creates a new component with a start function. The start function will be called when the component is started. The start function should not block.
func (*ComponentBase) Done ¶
func (c *ComponentBase) Done() <-chan struct{}
func (*ComponentBase) Err ¶
func (c *ComponentBase) Err() error
func (*ComponentBase) Finish ¶
func (c *ComponentBase) Finish(err error)
func (*ComponentBase) Start ¶ added in v0.3.0
func (c *ComponentBase) Start(ctx context.Context) <-chan struct{}
type Config ¶ added in v0.3.0
type Config struct { ScriptRunnerConfig FullScanRunnerConfig IncrementalScannerConfig ScriptResultHandler ScriptResultHandler Reporter StatusReporter ContinuousScan bool BatchSize int Logger zerolog.Logger }
func DefaultConfig ¶ added in v0.3.0
func DefaultConfig() Config
func (Config) WithBatchSize ¶ added in v0.3.0
func (Config) WithCandidateScanners ¶ added in v0.3.0
func (c Config) WithCandidateScanners( value []candidates.CandidateScanner, ) Config
func (Config) WithChainID ¶ added in v0.3.0
func (Config) WithContinuousScan ¶ added in v0.3.0
func (Config) WithExcludeAddress ¶ added in v0.3.0
func (Config) WithHandleScriptError ¶ added in v0.3.0
func (c Config) WithHandleScriptError( value func(AddressBatch, error) ScriptErrorAction, ) Config
func (Config) WithLogger ¶ added in v0.3.0
func (Config) WithMaxConcurrentScripts ¶ added in v0.3.0
func (Config) WithScript ¶ added in v0.3.0
func (Config) WithScriptResultHandler ¶ added in v0.3.0
func (c Config) WithScriptResultHandler( value ScriptResultHandler, ) Config
func (Config) WithStatusReporter ¶ added in v0.3.0
func (c Config) WithStatusReporter( value StatusReporter, ) Config
type DefaultStatusReporter ¶ added in v0.3.0
type DefaultStatusReporter struct { *ComponentBase // contains filtered or unexported fields }
func NewStatusReporter ¶
func NewStatusReporter( namespace string, logger zerolog.Logger, options ...StatusReporterOption, ) *DefaultStatusReporter
NewStatusReporter creates a new status reporter that reports the status of the indexer to Prometheus. It will start an HTTP server on the given port that exposes the metrics (unless this is disabled, for the case where you want to serve the metrics yourself). The namespace is used to namespace all metrics. The status reporter will report: - the incremental block diff (the difference between the last block height handled by the incremental scanner and the current block height) - the incremental block height (the block height last handled by the incremental scanner) - whether a full scan is currently running (if it is, any data the scanner is tracking is inaccurate) - if a full scan is currently running, the progress of the full scan (from 0 to 1)
func (*DefaultStatusReporter) ReportFullScanProgress ¶ added in v0.3.0
func (r *DefaultStatusReporter) ReportFullScanProgress(current uint64, total uint64)
func (*DefaultStatusReporter) ReportIncrementalBlockDiff ¶ added in v0.3.0
func (r *DefaultStatusReporter) ReportIncrementalBlockDiff(diff uint64)
func (*DefaultStatusReporter) ReportIncrementalBlockHeight ¶ added in v0.3.0
func (r *DefaultStatusReporter) ReportIncrementalBlockHeight(height uint64)
func (*DefaultStatusReporter) ReportIsFullScanRunning ¶ added in v0.3.0
func (r *DefaultStatusReporter) ReportIsFullScanRunning(running bool)
type FullScan ¶
type FullScan struct { *ComponentBase // contains filtered or unexported fields }
type FullScanRunner ¶
type FullScanRunner struct { FullScanRunnerConfig // contains filtered or unexported fields }
func NewFullScanRunner ¶
func NewFullScanRunner( client client.Client, addressBatchChan chan<- AddressBatch, batchSize int, config FullScanRunnerConfig, reporter StatusReporter, logger zerolog.Logger, ) *FullScanRunner
func (*FullScanRunner) NewBatch ¶ added in v0.3.0
func (r *FullScanRunner) NewBatch( blockHeight uint64, ) *FullScan
type FullScanRunnerConfig ¶ added in v0.3.0
type FullScanRunnerConfig struct { AddressProviderConfig ChainID flow.ChainID }
func DefaultFullScanRunnerConfig ¶ added in v0.3.0
func DefaultFullScanRunnerConfig() FullScanRunnerConfig
type IncrementalScanner ¶
type IncrementalScanner struct { *ComponentBase IncrementalScannerConfig // contains filtered or unexported fields }
func NewIncrementalScanner ¶
func NewIncrementalScanner( client client.Client, addressBatchChan chan<- AddressBatch, requestBatchChan chan<- uint64, batchSize int, config IncrementalScannerConfig, reporter StatusReporter, logger zerolog.Logger, ) *IncrementalScanner
func (*IncrementalScanner) LatestHandledBlock ¶
func (r *IncrementalScanner) LatestHandledBlock() uint64
type IncrementalScannerConfig ¶ added in v0.3.0
type IncrementalScannerConfig struct { CandidateScanners []candidates.CandidateScanner // IncrementalScannerBlockLag is the number of blocks the incremental scanner lags behind the latest block from // GetLatestBlockHeader. This is to avoid most of the "retry for collection in finalized block" errors. // Another way to avoid them is to always use the same access node. IncrementalScannerBlockLag uint64 // IncrementalScannerMaxBlockGap is the maximum number of blocks that can be scanned by the incremental scanner. // If the gap is larger than this, the incremental scanner will skip ahead and request a full scan. IncrementalScannerMaxBlockGap uint64 }
func DefaultIncrementalScannerConfig ¶ added in v0.3.0
func DefaultIncrementalScannerConfig() IncrementalScannerConfig
type NoOpScriptResultHandler ¶
type NoOpScriptResultHandler struct{}
func (NoOpScriptResultHandler) Handle ¶
func (d NoOpScriptResultHandler) Handle(_ ProcessedAddressBatch) error
type NoOpStatusReporter ¶
type NoOpStatusReporter struct{}
func (NoOpStatusReporter) ReportFullScanProgress ¶
func (n NoOpStatusReporter) ReportFullScanProgress(uint64, uint64)
func (NoOpStatusReporter) ReportIncrementalBlockDiff ¶
func (n NoOpStatusReporter) ReportIncrementalBlockDiff(uint64)
func (NoOpStatusReporter) ReportIncrementalBlockHeight ¶
func (n NoOpStatusReporter) ReportIncrementalBlockHeight(uint64)
func (NoOpStatusReporter) ReportIsFullScanRunning ¶
func (n NoOpStatusReporter) ReportIsFullScanRunning(bool)
type ProcessedAddressBatch ¶
type ProcessedAddressBatch struct { AddressBatch Result cadence.Value }
ProcessedAddressBatch contains the result of running the script on the given batch of addresses.
type ScanConcluded ¶
type Scanner ¶ added in v0.2.0
type Scanner struct { Config // contains filtered or unexported fields }
type ScriptErrorAction ¶ added in v0.3.0
type ScriptErrorAction interface {
// contains filtered or unexported methods
}
func DefaultHandleScriptError ¶ added in v0.3.0
func DefaultHandleScriptError(_ AddressBatch, err error) ScriptErrorAction
type ScriptErrorActionExclude ¶ added in v0.3.0
type ScriptErrorActionExclude struct {
Addresses []flow.Address
}
type ScriptErrorActionNone ¶ added in v0.3.0
type ScriptErrorActionNone struct{}
type ScriptErrorActionRetry ¶ added in v0.3.0
type ScriptErrorActionRetry struct{}
type ScriptErrorActionSplit ¶ added in v0.3.0
type ScriptErrorActionSplit struct{}
type ScriptErrorActionUnhandled ¶ added in v0.3.0
type ScriptErrorActionUnhandled struct{}
type ScriptResultHandler ¶
type ScriptResultHandler interface { // Handle will be called concurrently for each ProcessedAddressBatch. Handle(batch ProcessedAddressBatch) error }
type ScriptResultProcessor ¶
type ScriptResultProcessor struct { *ComponentBase // contains filtered or unexported fields }
func NewScriptResultProcessor ¶
func NewScriptResultProcessor( outChan <-chan ProcessedAddressBatch, handler ScriptResultHandler, logger zerolog.Logger, ) *ScriptResultProcessor
type ScriptRunner ¶
type ScriptRunner struct { *ComponentBase ScriptRunnerConfig // contains filtered or unexported fields }
func NewScriptRunner ¶
func NewScriptRunner( client client.Client, addressBatchChan <-chan AddressBatch, resultsChan chan<- ProcessedAddressBatch, config ScriptRunnerConfig, logger zerolog.Logger, ) *ScriptRunner
type ScriptRunnerConfig ¶ added in v0.3.0
type ScriptRunnerConfig struct { Script []byte MaxConcurrentScripts int HandleScriptError func(AddressBatch, error) ScriptErrorAction }
func DefaultScriptRunnerConfig ¶ added in v0.3.0
func DefaultScriptRunnerConfig() ScriptRunnerConfig
type StatusReporter ¶
type StatusReporterOption ¶ added in v0.2.0
type StatusReporterOption = func(*DefaultStatusReporter)
func WithStartServer ¶ added in v0.2.0
func WithStartServer(shouldStartServer bool) StatusReporterOption
func WithStatusReporterPort ¶ added in v0.2.0
func WithStatusReporterPort(port int) StatusReporterOption