scanner

v0.4.0
Published: Sep 10, 2024 License: Apache-2.0 Imports: 19 Imported by: 0

README

Flow Batch Scan

A library that makes it easy to scan the entire Flow chain.

How does it work

The Scanner.Scan method performs a full scan of all the addresses on chain, starting at the latest available block, using the provided Cadence script. This takes some time, during which the full scan has to switch to newer reference blocks because the old ones are no longer available. To ensure that the final data is accurate when the scan ends, an incremental scan runs alongside the full scan. The incremental scan looks at new blocks for any accounts that could have had their data (the data the script is looking for) changed during that block. If there are candidates to be scanned, the incremental scanner scans them with the same script and updates the results of the full scan.

In continuous mode, the incremental scan keeps running and scans any candidates that might have changed.
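
One way a result handler could reconcile the two scans is to keep, for each account, the result observed at the highest block. The sketch below assumes that approach. The `result` type and the `merge` helper are hypothetical and are not part of the library.

```go
package main

import "fmt"

// result is a hypothetical record: the script output for one account and
// the block height at which it was observed.
type result struct {
	Value  string
	Height uint64
}

// merge stores r for addr unless a result from a newer block is already
// present, so a slow full-scan batch cannot overwrite fresher data.
func merge(results map[string]result, addr string, r result) {
	if old, ok := results[addr]; ok && old.Height > r.Height {
		return
	}
	results[addr] = r
}

func main() {
	results := map[string]result{}

	// The full scan observes both accounts at height 100.
	merge(results, "0x01", result{Value: "A", Height: 100})
	merge(results, "0x02", result{Value: "B", Height: 100})

	// The incremental scan sees account 0x02 change at height 105.
	merge(results, "0x02", result{Value: "B2", Height: 105})

	// A late full-scan result from an older block is ignored.
	merge(results, "0x02", result{Value: "B", Height: 101})

	fmt.Println(results["0x01"].Value, results["0x02"].Value) // A B2
}
```

Keying on block height keeps the merge independent of the order in which full-scan and incremental batches arrive.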

The library expects 3 components:

  • a Cadence script that accepts an address array as input (addresses: [Address]) and returns any Cadence value as the result.
  • an array of candidate scanners that scan a block range looking for accounts that could have changed, so that the script would now return a different result.
  • a result handler that is called with the results of the script for each address array.

Use case

Any quantity can be scanned for if:

  • it can be observed by a cadence script
  • the change of the quantity can be observed by looking at the transaction results of a block (e.g. events)

Example:

  1. Scanning for contracts deployed on accounts. (see examples/contracts)
  2. Scanning for accounts' FT or NFT balances.
  3. Scanning for public keys added to accounts.

Examples

See the examples folder; it contains a lot of comments. The contract_names example is a one-time scan. The monitoer_contract_deployments example is a continuous scan and builds on the contract_names example.

Documentation

Constants

const DefaultBatchSize = 1000
const DefaultIncrementalScannerBlockLag = 5
const DefaultIncrementalScannerMaxBlockGap = 100

DefaultIncrementalScannerMaxBlockGap is the maximum number of blocks that can be scanned by the incremental scanner. If the gap is larger than this, the incremental scanner will request a full scan.
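
The interaction between the block lag and the maximum block gap can be sketched as a small decision function. This is an illustration only: `plan` and `nextPlan` are hypothetical names, and how the library actually measures the gap is an assumption here.

```go
package main

import "fmt"

const (
	blockLag    uint64 = 5   // same value as DefaultIncrementalScannerBlockLag
	maxBlockGap uint64 = 100 // same value as DefaultIncrementalScannerMaxBlockGap
)

// plan describes what a hypothetical incremental scanner would do next.
type plan struct {
	From, To uint64 // inclusive block range, meaningful only when Scan is true
	Scan     bool   // scan the range From..To for candidates
	FullScan bool   // the gap is too large, request a full scan instead
}

// nextPlan decides the next step from the last handled block height and the
// latest block height reported by the chain.
func nextPlan(lastHandled, latest uint64) plan {
	if latest < blockLag {
		return plan{} // chain is shorter than the lag, nothing to do
	}
	target := latest - blockLag // stay blockLag blocks behind the tip
	if target <= lastHandled {
		return plan{} // no new blocks beyond the lag yet
	}
	if target-lastHandled > maxBlockGap {
		return plan{FullScan: true}
	}
	return plan{From: lastHandled + 1, To: target, Scan: true}
}

func main() {
	fmt.Println(nextPlan(1000, 1003)) // nothing to do yet
	fmt.Println(nextPlan(1000, 1020)) // scan blocks 1001..1015
	fmt.Println(nextPlan(1000, 1200)) // gap of 195 blocks: full scan
}
```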

const DefaultScriptRunnerMaxConcurrentScripts = 20

DefaultScriptRunnerMaxConcurrentScripts is the maximum number of scripts that can run concurrently at any given time. If this is more than the rate limit, some scripts will simply wait. As long as they don't wait too long, this is not a problem.
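
A common way to cap concurrency in Go is a buffered channel used as a semaphore. The sketch below shows that general pattern with a hypothetical `runLimited` helper; it is not taken from the library's script runner.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runLimited runs every job, with at most limit jobs in flight at once, and
// returns the highest concurrency it observed. limit must be positive.
func runLimited(jobs []func(), limit int) int {
	sem := make(chan struct{}, limit)
	var wg sync.WaitGroup
	var running, peak atomic.Int32

	for _, job := range jobs {
		sem <- struct{}{} // waits here while limit jobs are in flight
		wg.Add(1)
		go func(job func()) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot

			// Record the peak number of simultaneously running jobs.
			n := running.Add(1)
			for p := peak.Load(); n > p; p = peak.Load() {
				if peak.CompareAndSwap(p, n) {
					break
				}
			}
			job()
			running.Add(-1)
		}(job)
	}
	wg.Wait()
	return int(peak.Load())
}

func main() {
	jobs := make([]func(), 100)
	for i := range jobs {
		jobs[i] = func() {} // stand-in for running one script
	}
	peak := runLimited(jobs, 20)
	fmt.Println(peak <= 20) // true
}
```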

const DefaultStatusReporterPort = 2112
const FullScanReferenceBlockSwitch = 10 * time.Second

Variables

This section is empty.

Functions

func DefaultExcludeAddress added in v0.3.0

func DefaultExcludeAddress(id flow.ChainID, address flow.Address) bool

Types

type AddressBatch

type AddressBatch struct {
	Addresses   []flow.Address
	BlockHeight uint64
	// contains filtered or unexported fields
}

AddressBatch is a batch of addresses that will be the input to the script being run by the script runner at the given block height.

func NewAddressBatch

func NewAddressBatch(
	addresses []flow.Address,
	blockHeight uint64,
	doneHandling func(),
	isValid func() bool,
) AddressBatch

func (*AddressBatch) DoneHandling

func (b *AddressBatch) DoneHandling()

DoneHandling should be called when the batch has been processed.

func (*AddressBatch) ExcludeAddress

func (b *AddressBatch) ExcludeAddress(address flow.Address)

func (*AddressBatch) IsValid

func (b *AddressBatch) IsValid() bool

IsValid reports whether the batch is still valid. If the batch has been cancelled, it should not be processed.

func (*AddressBatch) Split added in v0.3.0

func (b *AddressBatch) Split() (AddressBatch, AddressBatch)

Split splits the batch into two batches of equal size.

type AddressProvider

type AddressProvider struct {
	// contains filtered or unexported fields
}

AddressProvider is used to get all the addresses that exist at a certain referenceBlockId. This relies on the fact that a certain `endOfAccountsError` will be returned by the `accountStorageUsageScript` if the address doesn't exist yet.

func InitAddressProvider

func InitAddressProvider(
	ctx context.Context,
	chain flow.ChainID,
	blockHeight uint64,
	client client.Client,
	config AddressProviderConfig,
	log zerolog.Logger,
) (*AddressProvider, error)

InitAddressProvider uses bisection to get the last existing address.
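
The bisection idea can be illustrated with `sort.Search` from the standard library. In this sketch the `exists` callback is a stand-in for probing the chain, and `lastExisting` is a hypothetical helper; it assumes existing addresses form a contiguous prefix of the address index space.

```go
package main

import (
	"fmt"
	"sort"
)

// lastExisting returns the largest index in [0, max) for which exists
// returns true, or -1 if no index exists. It assumes exists is true for a
// prefix of the range and false for everything after it.
func lastExisting(max int, exists func(i int) bool) int {
	// sort.Search returns the smallest index for which the predicate is
	// true, so search for the first index that does not exist.
	firstMissing := sort.Search(max, func(i int) bool { return !exists(i) })
	return firstMissing - 1
}

func main() {
	created := 12345 // pretend this many accounts have been created
	probes := 0
	exists := func(i int) bool {
		probes++
		return i < created
	}
	fmt.Println(lastExisting(1<<20, exists)) // 12344
	fmt.Println(probes <= 20)                // true: about log2(max) probes
}
```

Bisection needs roughly log2(max) probes, which matters when every probe is a network call.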

func (*AddressProvider) AddressesLen

func (p *AddressProvider) AddressesLen() uint

func (*AddressProvider) GenerateAddressBatches

func (p *AddressProvider) GenerateAddressBatches(addressChan chan<- []flow.Address, batchSize int)

func (*AddressProvider) GetNextAddress

func (p *AddressProvider) GetNextAddress() (address flow.Address, isOutOfBounds bool)

func (*AddressProvider) LastAddress

func (p *AddressProvider) LastAddress() flow.Address

type AddressProviderConfig added in v0.3.0

type AddressProviderConfig struct {
	ExcludeAddress func(id flow.ChainID, address flow.Address) bool
}

func DefaultAddressProviderConfig added in v0.3.0

func DefaultAddressProviderConfig() AddressProviderConfig

type Component

type Component interface {
	Err() error
	Done() <-chan struct{}
	Start(ctx context.Context) <-chan struct{}
}

type ComponentBase

type ComponentBase struct {
	Logger zerolog.Logger
	// contains filtered or unexported fields
}

func NewComponentWithStart added in v0.3.0

func NewComponentWithStart(
	name string,
	start func(ctx context.Context),
	logger zerolog.Logger,
) *ComponentBase

NewComponentWithStart creates a new component with a start function. The start function will be called when the component is started. The start function should not block.

func (*ComponentBase) Done

func (c *ComponentBase) Done() <-chan struct{}

func (*ComponentBase) Err

func (c *ComponentBase) Err() error

func (*ComponentBase) Finish

func (c *ComponentBase) Finish(err error)

func (*ComponentBase) Start added in v0.3.0

func (c *ComponentBase) Start(ctx context.Context) <-chan struct{}

type Config added in v0.3.0

type Config struct {
	ScriptRunnerConfig
	FullScanRunnerConfig
	IncrementalScannerConfig

	ScriptResultHandler ScriptResultHandler
	Reporter            StatusReporter

	ContinuousScan bool
	BatchSize      int

	Logger zerolog.Logger
}

func DefaultConfig added in v0.3.0

func DefaultConfig() Config

func (Config) WithBatchSize added in v0.3.0

func (c Config) WithBatchSize(
	value int,
) Config

func (Config) WithCandidateScanners added in v0.3.0

func (c Config) WithCandidateScanners(
	value []candidates.CandidateScanner,
) Config

func (Config) WithChainID added in v0.3.0

func (c Config) WithChainID(
	value flow.ChainID,
) Config

func (Config) WithContinuousScan added in v0.3.0

func (c Config) WithContinuousScan(
	value bool,
) Config

func (Config) WithExcludeAddress added in v0.3.0

func (c Config) WithExcludeAddress(
	value func(id flow.ChainID, address flow.Address) bool,
) Config

func (Config) WithHandleScriptError added in v0.3.0

func (c Config) WithHandleScriptError(
	value func(AddressBatch, error) ScriptErrorAction,
) Config

func (Config) WithLogger added in v0.3.0

func (c Config) WithLogger(
	value zerolog.Logger,
) Config

func (Config) WithMaxConcurrentScripts added in v0.3.0

func (c Config) WithMaxConcurrentScripts(
	value int,
) Config

func (Config) WithScript added in v0.3.0

func (c Config) WithScript(
	value []byte,
) Config

func (Config) WithScriptResultHandler added in v0.3.0

func (c Config) WithScriptResultHandler(
	value ScriptResultHandler,
) Config

func (Config) WithStatusReporter added in v0.3.0

func (c Config) WithStatusReporter(
	value StatusReporter,
) Config
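
The With* methods above use value receivers and return a Config, which allows chained calls that leave the original value untouched. A minimal stand-alone sketch of that pattern, using a hypothetical `config` type with two fields rather than the library's Config:

```go
package main

import "fmt"

// config is a hypothetical stand-in with two of the fields shown above.
type config struct {
	BatchSize      int
	ContinuousScan bool
}

// defaultConfig uses 1000, the same value as DefaultBatchSize.
func defaultConfig() config {
	return config{BatchSize: 1000}
}

// withBatchSize has a value receiver, so it modifies and returns a copy.
func (c config) withBatchSize(value int) config {
	c.BatchSize = value
	return c
}

// withContinuousScan follows the same copy-and-return pattern.
func (c config) withContinuousScan(value bool) config {
	c.ContinuousScan = value
	return c
}

func main() {
	base := defaultConfig()
	custom := base.withBatchSize(200).withContinuousScan(true)

	// base is unchanged because each call worked on a copy.
	fmt.Println(base.BatchSize, custom.BatchSize, custom.ContinuousScan) // 1000 200 true
}
```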

type DefaultStatusReporter added in v0.3.0

type DefaultStatusReporter struct {
	*ComponentBase
	// contains filtered or unexported fields
}

func NewStatusReporter

func NewStatusReporter(
	namespace string,
	logger zerolog.Logger,
	options ...StatusReporterOption,
) *DefaultStatusReporter

NewStatusReporter creates a new status reporter that reports the status of the indexer to Prometheus. It starts an HTTP server on the given port that exposes the metrics (unless this is disabled, for the case where you want to serve the metrics yourself). The namespace is used to namespace all metrics. The status reporter will report:

  • the incremental block diff (the difference between the last block height handled by the incremental scanner and the current block height)
  • the incremental block height (the block height last handled by the incremental scanner)
  • if a full scan is currently running (if it is, any data the scanner is tracking is inaccurate)
  • if a full scan is currently running, the progress of the full scan (from 0 to 1)

func (*DefaultStatusReporter) ReportFullScanProgress added in v0.3.0

func (r *DefaultStatusReporter) ReportFullScanProgress(current uint64, total uint64)

func (*DefaultStatusReporter) ReportIncrementalBlockDiff added in v0.3.0

func (r *DefaultStatusReporter) ReportIncrementalBlockDiff(diff uint64)

func (*DefaultStatusReporter) ReportIncrementalBlockHeight added in v0.3.0

func (r *DefaultStatusReporter) ReportIncrementalBlockHeight(height uint64)

func (*DefaultStatusReporter) ReportIsFullScanRunning added in v0.3.0

func (r *DefaultStatusReporter) ReportIsFullScanRunning(running bool)

type FullScan

type FullScan struct {
	*ComponentBase
	// contains filtered or unexported fields
}

type FullScanRunner

type FullScanRunner struct {
	FullScanRunnerConfig
	// contains filtered or unexported fields
}

func NewFullScanRunner

func NewFullScanRunner(
	client client.Client,
	addressBatchChan chan<- AddressBatch,
	batchSize int,
	config FullScanRunnerConfig,
	reporter StatusReporter,
	logger zerolog.Logger,
) *FullScanRunner

func (*FullScanRunner) NewBatch added in v0.3.0

func (r *FullScanRunner) NewBatch(
	blockHeight uint64,
) *FullScan

type FullScanRunnerConfig added in v0.3.0

type FullScanRunnerConfig struct {
	AddressProviderConfig

	ChainID flow.ChainID
}

func DefaultFullScanRunnerConfig added in v0.3.0

func DefaultFullScanRunnerConfig() FullScanRunnerConfig

type IncrementalScanner

type IncrementalScanner struct {
	*ComponentBase
	IncrementalScannerConfig
	// contains filtered or unexported fields
}

func NewIncrementalScanner

func NewIncrementalScanner(
	client client.Client,

	addressBatchChan chan<- AddressBatch,
	requestBatchChan chan<- uint64,

	batchSize int,
	config IncrementalScannerConfig,

	reporter StatusReporter,
	logger zerolog.Logger,

) *IncrementalScanner

func (*IncrementalScanner) LatestHandledBlock

func (r *IncrementalScanner) LatestHandledBlock() uint64

type IncrementalScannerConfig added in v0.3.0

type IncrementalScannerConfig struct {
	CandidateScanners []candidates.CandidateScanner
	// IncrementalScannerBlockLag is the number of blocks the incremental scanner lags behind the latest block from
	// GetLatestBlockHeader. This is to avoid most of the "retry for collection in finalized block" errors.
	// Another way to avoid them is to always use the same access node.
	IncrementalScannerBlockLag uint64

	// IncrementalScannerMaxBlockGap is the maximum number of blocks that can be scanned by the incremental scanner.
	// If the gap is larger than this, the incremental scanner will skip ahead and request a full scan.
	IncrementalScannerMaxBlockGap uint64
}

func DefaultIncrementalScannerConfig added in v0.3.0

func DefaultIncrementalScannerConfig() IncrementalScannerConfig

type NoOpScriptResultHandler

type NoOpScriptResultHandler struct{}

func (NoOpScriptResultHandler) Handle

type NoOpStatusReporter

type NoOpStatusReporter struct{}

func (NoOpStatusReporter) ReportFullScanProgress

func (n NoOpStatusReporter) ReportFullScanProgress(uint64, uint64)

func (NoOpStatusReporter) ReportIncrementalBlockDiff

func (n NoOpStatusReporter) ReportIncrementalBlockDiff(uint64)

func (NoOpStatusReporter) ReportIncrementalBlockHeight

func (n NoOpStatusReporter) ReportIncrementalBlockHeight(uint64)

func (NoOpStatusReporter) ReportIsFullScanRunning

func (n NoOpStatusReporter) ReportIsFullScanRunning(bool)

type ProcessedAddressBatch

type ProcessedAddressBatch struct {
	AddressBatch
	Result cadence.Value
}

ProcessedAddressBatch contains the result of running the script on the given batch of addresses.

type ScanConcluded

type ScanConcluded struct {
	LatestScannedBlockHeight uint64
	// ScanIsComplete is false if a full scan was not completed,
	// this means some accounts may have stale data, or have been missed altogether.
	ScanIsComplete bool
}

type Scanner added in v0.2.0

type Scanner struct {
	Config
	// contains filtered or unexported fields
}

func NewScanner added in v0.2.0

func NewScanner(
	client client.Client,
	config Config,
) *Scanner

func (*Scanner) Scan added in v0.2.0

func (scanner *Scanner) Scan(ctx context.Context) (ScanConcluded, error)

type ScriptErrorAction added in v0.3.0

type ScriptErrorAction interface {
	// contains filtered or unexported methods
}

func DefaultHandleScriptError added in v0.3.0

func DefaultHandleScriptError(_ AddressBatch, err error) ScriptErrorAction

type ScriptErrorActionExclude added in v0.3.0

type ScriptErrorActionExclude struct {
	Addresses []flow.Address
}

type ScriptErrorActionNone added in v0.3.0

type ScriptErrorActionNone struct{}

type ScriptErrorActionRetry added in v0.3.0

type ScriptErrorActionRetry struct{}

type ScriptErrorActionSplit added in v0.3.0

type ScriptErrorActionSplit struct{}

type ScriptErrorActionUnhandled added in v0.3.0

type ScriptErrorActionUnhandled struct{}
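
The action names suggest that a failing batch can be retried, split, or have addresses excluded; their exact semantics are not documented here, so the following is only an interpretation. The sketch shows how repeated splitting can isolate the addresses that make a script fail. `isolate`, `run`, and `failOn` are hypothetical helpers.

```go
package main

import (
	"errors"
	"fmt"
)

// run is a stand-in for executing the script on a batch of addresses.
type run func(addrs []string) error

// failOn returns a run function that fails whenever the batch contains one
// of the given bad addresses.
func failOn(bad map[string]bool) run {
	return func(addrs []string) error {
		for _, a := range addrs {
			if bad[a] {
				return errors.New("script failed for " + a)
			}
		}
		return nil
	}
}

// isolate runs the batch. On error it splits the batch in two halves and
// recurses, returning the addresses that still fail on their own.
func isolate(addrs []string, exec run) (excluded []string) {
	if len(addrs) == 0 || exec(addrs) == nil {
		return nil
	}
	if len(addrs) == 1 {
		return []string{addrs[0]}
	}
	mid := len(addrs) / 2
	excluded = append(excluded, isolate(addrs[:mid], exec)...)
	excluded = append(excluded, isolate(addrs[mid:], exec)...)
	return excluded
}

func main() {
	exec := failOn(map[string]bool{"0x03": true})
	fmt.Println(isolate([]string{"0x01", "0x02", "0x03", "0x04"}, exec)) // [0x03]
}
```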

type ScriptResultHandler

type ScriptResultHandler interface {
	// Handle will be called concurrently for each ProcessedAddressBatch.
	Handle(batch ProcessedAddressBatch) error
}
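
Because Handle is called concurrently, a handler that accumulates state needs its own synchronization. A minimal sketch with a mutex follows; `batchResult` is a hypothetical simplification of ProcessedAddressBatch (plain strings and ints instead of flow.Address and cadence.Value), and `collectingHandler` is not part of the library.

```go
package main

import (
	"fmt"
	"sync"
)

// batchResult is a hypothetical, simplified stand-in for a processed batch.
type batchResult struct {
	Addresses []string
	Values    []int
}

// collectingHandler stores the latest value per address and is safe for
// concurrent use.
type collectingHandler struct {
	mu     sync.Mutex
	values map[string]int
}

func newCollectingHandler() *collectingHandler {
	return &collectingHandler{values: map[string]int{}}
}

// Handle records one value per address in the batch.
func (h *collectingHandler) Handle(b batchResult) error {
	if len(b.Addresses) != len(b.Values) {
		return fmt.Errorf("got %d values for %d addresses", len(b.Values), len(b.Addresses))
	}
	h.mu.Lock()
	defer h.mu.Unlock()
	for i, addr := range b.Addresses {
		h.values[addr] = b.Values[i]
	}
	return nil
}

// Len returns the number of addresses seen so far.
func (h *collectingHandler) Len() int {
	h.mu.Lock()
	defer h.mu.Unlock()
	return len(h.values)
}

func main() {
	h := newCollectingHandler()
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			addr := fmt.Sprintf("0x%02d", i)
			_ = h.Handle(batchResult{Addresses: []string{addr}, Values: []int{i}})
		}(i)
	}
	wg.Wait()
	fmt.Println(h.Len()) // 8
}
```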

type ScriptResultProcessor

type ScriptResultProcessor struct {
	*ComponentBase
	// contains filtered or unexported fields
}

func NewScriptResultProcessor

func NewScriptResultProcessor(
	outChan <-chan ProcessedAddressBatch,
	handler ScriptResultHandler,
	logger zerolog.Logger,
) *ScriptResultProcessor

type ScriptRunner

type ScriptRunner struct {
	*ComponentBase

	ScriptRunnerConfig
	// contains filtered or unexported fields
}

func NewScriptRunner

func NewScriptRunner(
	client client.Client,
	addressBatchChan <-chan AddressBatch,
	resultsChan chan<- ProcessedAddressBatch,
	config ScriptRunnerConfig,
	logger zerolog.Logger,
) *ScriptRunner

type ScriptRunnerConfig added in v0.3.0

type ScriptRunnerConfig struct {
	Script []byte

	MaxConcurrentScripts int
	HandleScriptError    func(AddressBatch, error) ScriptErrorAction
}

func DefaultScriptRunnerConfig added in v0.3.0

func DefaultScriptRunnerConfig() ScriptRunnerConfig

type StatusReporter

type StatusReporter interface {
	ReportIncrementalBlockDiff(diff uint64)
	ReportIncrementalBlockHeight(height uint64)
	ReportIsFullScanRunning(running bool)
	ReportFullScanProgress(current uint64, total uint64)
}
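
A custom reporter only has to provide these four methods. The sketch below keeps the latest values in memory instead of exporting metrics. The `statusReporter` interface is a local copy of the method set above so that the example compiles on its own, and `memoryReporter` is a hypothetical type.

```go
package main

import (
	"fmt"
	"sync"
)

// statusReporter is a local copy of the method set shown above.
type statusReporter interface {
	ReportIncrementalBlockDiff(diff uint64)
	ReportIncrementalBlockHeight(height uint64)
	ReportIsFullScanRunning(running bool)
	ReportFullScanProgress(current uint64, total uint64)
}

// memoryReporter remembers the most recently reported values.
type memoryReporter struct {
	mu       sync.Mutex
	diff     uint64
	height   uint64
	running  bool
	progress float64 // from 0 to 1
}

// Compile-time check that memoryReporter satisfies the interface.
var _ statusReporter = (*memoryReporter)(nil)

func (r *memoryReporter) ReportIncrementalBlockDiff(diff uint64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.diff = diff
}

func (r *memoryReporter) ReportIncrementalBlockHeight(height uint64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.height = height
}

func (r *memoryReporter) ReportIsFullScanRunning(running bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.running = running
}

func (r *memoryReporter) ReportFullScanProgress(current uint64, total uint64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if total == 0 {
		r.progress = 0 // avoid dividing by zero
		return
	}
	r.progress = float64(current) / float64(total)
}

func (r *memoryReporter) String() string {
	r.mu.Lock()
	defer r.mu.Unlock()
	return fmt.Sprintf("height=%d diff=%d fullScan=%t progress=%.2f",
		r.height, r.diff, r.running, r.progress)
}

func main() {
	r := &memoryReporter{}
	r.ReportIsFullScanRunning(true)
	r.ReportFullScanProgress(250, 1000)
	r.ReportIncrementalBlockHeight(1015)
	r.ReportIncrementalBlockDiff(5)
	fmt.Println(r) // height=1015 diff=5 fullScan=true progress=0.25
}
```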

type StatusReporterOption added in v0.2.0

type StatusReporterOption = func(*DefaultStatusReporter)

func WithStartServer added in v0.2.0

func WithStartServer(shouldStartServer bool) StatusReporterOption

func WithStatusReporterPort added in v0.2.0

func WithStatusReporterPort(port int) StatusReporterOption
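
StatusReporterOption follows the functional options pattern: each option is a function that adjusts the reporter during construction. A stand-alone sketch of the pattern with hypothetical lower-case names follows. The default port matches DefaultStatusReporterPort; starting the server by default is an assumption based on the NewStatusReporter description.

```go
package main

import "fmt"

// reporter is a hypothetical stand-in holding the two configurable settings.
type reporter struct {
	port        int
	startServer bool
}

// option mirrors the shape of StatusReporterOption: a function that
// modifies the reporter being built.
type option = func(*reporter)

func withPort(port int) option {
	return func(r *reporter) { r.port = port }
}

func withStartServer(shouldStartServer bool) option {
	return func(r *reporter) { r.startServer = shouldStartServer }
}

// newReporter applies the defaults first, then each option in order.
func newReporter(options ...option) *reporter {
	r := &reporter{
		port:        2112, // same value as DefaultStatusReporterPort
		startServer: true, // assumed default
	}
	for _, apply := range options {
		apply(r)
	}
	return r
}

func main() {
	def := newReporter()
	custom := newReporter(withPort(9090), withStartServer(false))
	fmt.Println(def.port, def.startServer)       // 2112 true
	fmt.Println(custom.port, custom.startServer) // 9090 false
}
```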

Directories

Path Synopsis
examples
