logpoller

package
v0.0.0-...-3c6a880 Latest
Warning

This package is not in the latest version of its module.

Published: Feb 14, 2025 License: MIT Imports: 36 Imported by: 1

Documentation

Overview

Package logpoller is a service for querying EVM log data.

It can be thought of as a more performant and sophisticated version of eth_getLogs https://ethereum.org/en/developers/docs/apis/json-rpc/#eth_getlogs. Having a local table of relevant, continually canonical logs gives us two main advantages:

  • Have hundreds of jobs/clients querying for logs without overloading the underlying RPC provider.
  • Do more sophisticated querying (filter by confirmations/time/log contents, efficiently join between the logs table and other tables on the node, etc.)

Guarantees provided by the poller:

  • Queries always return the logs from the _current_ canonical chain (same as eth_getLogs). In particular, results for unfinalized logs may change between queries, but finalized logs remain stable. The threshold between unfinalized and finalized logs is the finalityDepth parameter, chosen such that, with exceedingly high probability, logs finalityDepth deep cannot be reorged.
  • After calling RegisterFilter with a particular event, it will never miss logs for that event despite node crashes and reorgs. The granularity of the filter is always at least one block (more when backfilling).
  • Old logs stored in the db will only be deleted if all filters matching them have explicit retention periods set, and all of them have expired. Default retention of 0 on any matching filter guarantees permanent retention.
  • After calling Replay(fromBlock), all blocks from that one up to the latest chain tip will be polled with the current filter. This can be used when a job is first added to specify a start block from which you wish to capture existing logs.
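
The finalized/unfinalized threshold in the first guarantee can be illustrated with a small sketch. It assumes a fixed finalityDepth and treats a log as finalized once at least finalityDepth blocks sit between it and the latest block. The helper isFinalized is hypothetical and not part of this package, and the exact boundary condition used by the poller may differ.

```go
package main

import "fmt"

// isFinalized is a hypothetical helper, not part of the logpoller API.
// It reports whether a log in block logBlock is at least finalityDepth
// blocks behind the latest known block (assumed boundary: >=).
func isFinalized(logBlock, latestBlock, finalityDepth int64) bool {
	return latestBlock-logBlock >= finalityDepth
}

func main() {
	const latest, depth = 1000, 50
	for _, b := range []int64{900, 950, 951, 1000} {
		fmt.Printf("block %d finalized: %v\n", b, isFinalized(b, latest, depth))
	}
}
```

Results for logs on the unfinalized side of this threshold may differ from one query to the next, so callers that need stable results typically query with enough confirmations to stay on the finalized side.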

Constants

This section is empty.

Variables

var (
	ErrReplayRequestAborted = pkgerrors.New("aborted, replay request cancelled")
	ErrReplayInProgress     = pkgerrors.New("replay request cancelled, but replay is already in progress")
	ErrLogPollerShutdown    = pkgerrors.New("replay aborted due to log poller shutdown")
)
var (
	ErrUnexpectedCursorFormat = errors.New("unexpected cursor format")
)

Functions

func EvmWord

func EvmWord(i uint64) common.Hash
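
The page gives no description for EvmWord. Judging by its name and signature, it presumably encodes a uint64 as a 32-byte big-endian EVM word, left-padded with zeros, so it can be compared against topic or data-word values. The sketch below illustrates that encoding with a plain [32]byte standing in for common.Hash; it is an assumption about the behaviour, not the package's code.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// evmWord is a hypothetical stand-in for EvmWord. It writes i into the
// last 8 bytes of a zeroed 32-byte word in big-endian order.
func evmWord(i uint64) [32]byte {
	var word [32]byte
	binary.BigEndian.PutUint64(word[24:], i)
	return word
}

func main() {
	// Prints the word as 64 hex characters.
	fmt.Printf("%x\n", evmWord(258))
}
```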

func FilterName

func FilterName(id string, args ...any) string

FilterName is a suggested convenience function for clients to construct unique filter names to populate Name field of struct Filter
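
One way such a naming helper could work is sketched below. The separator format (" - " after the id, ":" between arguments) is an assumption made for illustration, and filterName is a hypothetical local function rather than the package's implementation. The point is only that combining a job-specific id with distinguishing arguments, such as a contract address, yields names unlikely to collide.

```go
package main

import (
	"fmt"
	"strings"
)

// filterName is a hypothetical illustration of a unique-name builder.
// With no args it returns id unchanged; otherwise it appends the args,
// formatted with fmt.Sprint and joined by ":".
func filterName(id string, args ...any) string {
	if len(args) == 0 {
		return id
	}
	parts := make([]string, 0, len(args))
	for _, a := range args {
		parts = append(parts, fmt.Sprint(a))
	}
	return id + " - " + strings.Join(parts, ":")
}

func main() {
	fmt.Println(filterName("OCR2 config tracker", "0xabc", 42))
}
```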

func FormatContractReaderCursor

func FormatContractReaderCursor(log Log) string

FormatContractReaderCursor is exported to ensure cursor structure remains consistent.

func NewAddressFilter

func NewAddressFilter(address common.Address) query.Expression

func NewConfirmationsFilter

func NewConfirmationsFilter(confirmations evmtypes.Confirmations) query.Expression

func NewEventByTopicFilter

func NewEventByTopicFilter(topicIndex uint64, valueComparers []HashedValueComparator) query.Expression

func NewEventByWordFilter

func NewEventByWordFilter(wordIndex int, valueComparers []HashedValueComparator) query.Expression

func NewEventSigFilter

func NewEventSigFilter(hash common.Hash) query.Expression

func NewLogPoller

func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, headTracker HeadTracker, opts Opts) *logPoller

NewLogPoller creates a log poller. Note there is an assumption that blocks can be processed faster than they are produced for the given chain, or the poller will fall behind. Block processing involves the following calls in steady state (without reorgs):

  • eth_getBlockByNumber - headers only (transaction hashes, not full transaction objects),
  • eth_getLogs - get the logs for the block
  • 1 db read latest block - for checking reorgs
  • 1 db tx including block write and logs write to logs.

How fast that can be done depends largely on network speed and the DB, but even for the fastest supported chain, Polygon, which has 2s block times, we need RPCs with roughly <= 500ms latency.
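
As a rough check of the latency figure above, the per-block cost can be estimated from the steady-state steps listed: two RPC round trips plus the database work. The sketch assumes the two RPC calls run sequentially and lumps the DB read and write into a single dbCost value; both function names and the 200ms DB figure are illustrative assumptions.

```go
package main

import (
	"fmt"
	"time"
)

// perBlockCost estimates the steady-state time to process one block,
// assuming two sequential RPC round trips and a combined DB cost.
func perBlockCost(rpcLatency, dbCost time.Duration) time.Duration {
	return 2*rpcLatency + dbCost
}

// keepsUp reports whether the estimated cost fits within one block time.
func keepsUp(blockTime, rpcLatency, dbCost time.Duration) bool {
	return perBlockCost(rpcLatency, dbCost) < blockTime
}

func main() {
	blockTime := 2 * time.Second
	db := 200 * time.Millisecond // assumed, for illustration only
	for _, rpc := range []time.Duration{250 * time.Millisecond, 500 * time.Millisecond, time.Second} {
		fmt.Printf("rpc=%v cost=%v keepsUp=%v\n", rpc, perBlockCost(rpc, db), keepsUp(blockTime, rpc, db))
	}
}
```

Under these assumptions, 500ms RPC latency uses half of a 2s block time on RPC calls alone, which leaves the remainder for database work and any catch-up after reorgs or restarts.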

func Where

func Where(expressions ...query.Expression) ([]query.Expression, error)

Where is a query.Where wrapper that ignores the Key and returns a slice of query.Expression rather than query.KeyFilter. If no expressions are provided, or an error occurs, an empty slice is returned.

Types

type Block

type Block struct {
	EVMChainID *big.Big
	BlockHash  common.Hash
	// Note geth uses int64 internally https://github.com/ethereum/go-ethereum/blob/f66f1a16b3c480d3a43ac7e8a09ab3e362e96ae4/eth/filters/api.go#L340
	BlockNumber          int64
	BlockTimestamp       time.Time
	FinalizedBlockNumber int64
	CreatedAt            time.Time
}

Block represents an unfinalized block used for reorg detection when polling.

type Client

type Client interface {
	HeadByNumber(ctx context.Context, n *big.Int) (*evmtypes.Head, error)
	HeadByHash(ctx context.Context, n common.Hash) (*evmtypes.Head, error)
	BatchCallContext(ctx context.Context, b []rpc.BatchElem) error
	FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error)
	ConfiguredChainID() *big.Int
}

type DSORM

type DSORM struct {
	// contains filtered or unexported fields
}

func NewORM

func NewORM(chainID *big.Int, ds sqlutil.DataSource, lggr logger.Logger) *DSORM

NewORM creates a DSORM scoped to chainID.

func (*DSORM) DeleteBlocksBefore

func (o *DSORM) DeleteBlocksBefore(ctx context.Context, end int64, limit int64) (int64, error)

DeleteBlocksBefore deletes blocks before and including end. When limit is set, it will delete at most limit blocks. Otherwise, it will delete all blocks at once.

func (*DSORM) DeleteExpiredLogs

func (o *DSORM) DeleteExpiredLogs(ctx context.Context, limit int64) (int64, error)

DeleteExpiredLogs removes any logs which either:

  • don't match any currently registered filters, or
  • have a timestamp older than any matching filter's retention, UNLESS there is at least one matching filter with retention=0
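
The retention rule can be sketched as a pure function. This follows the package overview, which states that a log is deleted only when every matching filter has an explicit retention and all of them have expired. The function shouldDelete and its parameters are hypothetical, and the handling of the exact expiry boundary is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// shouldDelete is a hypothetical model of the expiry rule. retentions
// holds the Retention of every registered filter matching the log;
// a value of 0 means "keep forever".
func shouldDelete(age time.Duration, retentions ...time.Duration) bool {
	if len(retentions) == 0 {
		return true // no registered filter matches this log
	}
	for _, r := range retentions {
		if r == 0 || age <= r {
			return false // at least one filter still wants this log
		}
	}
	return true // every matching filter's retention has expired
}

func main() {
	hour := time.Hour
	fmt.Println(shouldDelete(48 * hour))                   // unmatched log
	fmt.Println(shouldDelete(48*hour, 24*hour, 36*hour))   // all expired
	fmt.Println(shouldDelete(48*hour, 24*hour, 72*hour))   // one still live
	fmt.Println(shouldDelete(48*hour, 24*hour, 0))         // permanent retention
}
```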

func (*DSORM) DeleteFilter

func (o *DSORM) DeleteFilter(ctx context.Context, name string) error

DeleteFilter removes all (event, address) pairs associated with the Filter.

func (*DSORM) DeleteLogsAndBlocksAfter

func (o *DSORM) DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error

func (*DSORM) DeleteLogsByRowID

func (o *DSORM) DeleteLogsByRowID(ctx context.Context, rowIDs []uint64) (int64, error)

DeleteLogsByRowID accepts a list of log row IDs to delete.

func (*DSORM) FilteredLogs

func (o *DSORM) FilteredLogs(ctx context.Context, filter []query.Expression, limitAndSort query.LimitAndSort, _ string) ([]Log, error)

func (*DSORM) GetBlocksRange

func (o *DSORM) GetBlocksRange(ctx context.Context, start int64, end int64) ([]Block, error)

func (*DSORM) InsertBlock

func (o *DSORM) InsertBlock(ctx context.Context, blockHash common.Hash, blockNumber int64, blockTimestamp time.Time, finalizedBlock int64) error

InsertBlock is idempotent to support replays.

func (*DSORM) InsertFilter

func (o *DSORM) InsertFilter(ctx context.Context, filter Filter) (err error)

InsertFilter is idempotent.

Each address/event pair must have a unique job id, so it may be removed when the job is deleted. If a second job tries to overwrite the same pair, this should fail.

func (*DSORM) InsertLogs

func (o *DSORM) InsertLogs(ctx context.Context, logs []Log) error

InsertLogs is idempotent to support replays.

func (*DSORM) InsertLogsWithBlock

func (o *DSORM) InsertLogsWithBlock(ctx context.Context, logs []Log, block Block) error

func (*DSORM) LoadFilters

func (o *DSORM) LoadFilters(ctx context.Context) (map[string]Filter, error)

LoadFilters returns all filters for this chain

func (*DSORM) SelectBlockByHash

func (o *DSORM) SelectBlockByHash(ctx context.Context, hash common.Hash) (*Block, error)

func (*DSORM) SelectBlockByNumber

func (o *DSORM) SelectBlockByNumber(ctx context.Context, n int64) (*Block, error)

func (*DSORM) SelectExcessLogIDs

func (o *DSORM) SelectExcessLogIDs(ctx context.Context, limit int64) (results []uint64, err error)

SelectExcessLogIDs finds any logs old enough that MaxLogsKept has been exceeded for every filter they match.

func (*DSORM) SelectIndexedLogs

func (o *DSORM) SelectIndexedLogs(ctx context.Context, address common.Address, eventSig common.Hash, topicIndex int, topicValues []common.Hash, confs evmtypes.Confirmations) ([]Log, error)

func (*DSORM) SelectIndexedLogsByBlockRange

func (o *DSORM) SelectIndexedLogsByBlockRange(ctx context.Context, start, end int64, address common.Address, eventSig common.Hash, topicIndex int, topicValues []common.Hash) ([]Log, error)

SelectIndexedLogsByBlockRange finds the indexed logs in a given block range.

func (*DSORM) SelectIndexedLogsByTxHash

func (o *DSORM) SelectIndexedLogsByTxHash(ctx context.Context, address common.Address, eventSig common.Hash, txHash common.Hash) ([]Log, error)

func (*DSORM) SelectIndexedLogsCreatedAfter

func (o *DSORM) SelectIndexedLogsCreatedAfter(ctx context.Context, address common.Address, eventSig common.Hash, topicIndex int, topicValues []common.Hash, after time.Time, confs evmtypes.Confirmations) ([]Log, error)

func (*DSORM) SelectIndexedLogsTopicGreaterThan

func (o *DSORM) SelectIndexedLogsTopicGreaterThan(ctx context.Context, address common.Address, eventSig common.Hash, topicIndex int, topicValueMin common.Hash, confs evmtypes.Confirmations) ([]Log, error)

func (*DSORM) SelectIndexedLogsTopicRange

func (o *DSORM) SelectIndexedLogsTopicRange(ctx context.Context, address common.Address, eventSig common.Hash, topicIndex int, topicValueMin, topicValueMax common.Hash, confs evmtypes.Confirmations) ([]Log, error)

func (*DSORM) SelectIndexedLogsWithSigsExcluding

func (o *DSORM) SelectIndexedLogsWithSigsExcluding(ctx context.Context, sigA, sigB common.Hash, topicIndex int, address common.Address, startBlock, endBlock int64, confs evmtypes.Confirmations) ([]Log, error)

SelectIndexedLogsWithSigsExcluding queries for logs that have signature A and excludes those that have a corresponding log with signature B. Matching is done on the topic at topicIndex. Both logs must be inside the block range and have the minimum number of evmtypes.Confirmations.
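
The exclusion semantics can be modelled in memory as follows. This is a sketch only: logEntry is a simplified, hypothetical stand-in for Log, topicIndex is assumed to index directly into the Topics slice, and the confirmations check is omitted. The real method runs as a SQL query against the logs table.

```go
package main

import "fmt"

// logEntry is a simplified, hypothetical stand-in for Log.
type logEntry struct {
	EventSig    string
	BlockNumber int64
	Topics      []string // Topics[0] is the event signature
}

// usable reports whether l is in [start, end] and has a topic at topicIndex.
func usable(l logEntry, topicIndex int, start, end int64) bool {
	return l.BlockNumber >= start && l.BlockNumber <= end && topicIndex < len(l.Topics)
}

// logsWithSigExcluding returns sigA logs in the block range that have no
// sigB log in the same range carrying the same topic value at topicIndex.
func logsWithSigExcluding(logs []logEntry, sigA, sigB string, topicIndex int, start, end int64) []logEntry {
	seenB := make(map[string]bool)
	for _, l := range logs {
		if l.EventSig == sigB && usable(l, topicIndex, start, end) {
			seenB[l.Topics[topicIndex]] = true
		}
	}
	var out []logEntry
	for _, l := range logs {
		if l.EventSig == sigA && usable(l, topicIndex, start, end) && !seenB[l.Topics[topicIndex]] {
			out = append(out, l)
		}
	}
	return out
}

func main() {
	logs := []logEntry{
		{EventSig: "Requested", BlockNumber: 1, Topics: []string{"Requested", "id1"}},
		{EventSig: "Requested", BlockNumber: 2, Topics: []string{"Requested", "id2"}},
		{EventSig: "Fulfilled", BlockNumber: 3, Topics: []string{"Fulfilled", "id1"}},
	}
	// Only the request with no matching fulfilment remains.
	fmt.Println(logsWithSigExcluding(logs, "Requested", "Fulfilled", 1, 1, 10))
}
```

A typical use of this pattern is finding requests that have not yet been answered, where A is the request event and B is the response event sharing a request ID topic.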

func (*DSORM) SelectLatestBlock

func (o *DSORM) SelectLatestBlock(ctx context.Context) (*Block, error)

func (*DSORM) SelectLatestBlockByEventSigsAddrsWithConfs

func (o *DSORM) SelectLatestBlockByEventSigsAddrsWithConfs(ctx context.Context, fromBlock int64, eventSigs []common.Hash, addresses []common.Address, confs evmtypes.Confirmations) (int64, error)

SelectLatestBlockByEventSigsAddrsWithConfs finds the latest block number that matches a list of Addresses and list of events. It returns 0 if there is no matching block

func (*DSORM) SelectLatestFinalizedBlock

func (o *DSORM) SelectLatestFinalizedBlock(ctx context.Context) (*Block, error)

func (*DSORM) SelectLatestLogByEventSigWithConfs

func (o *DSORM) SelectLatestLogByEventSigWithConfs(ctx context.Context, eventSig common.Hash, address common.Address, confs evmtypes.Confirmations) (*Log, error)

func (*DSORM) SelectLatestLogEventSigsAddrsWithConfs

func (o *DSORM) SelectLatestLogEventSigsAddrsWithConfs(ctx context.Context, fromBlock int64, addresses []common.Address, eventSigs []common.Hash, confs evmtypes.Confirmations) ([]Log, error)

SelectLatestLogEventSigsAddrsWithConfs finds the latest log by (address, event) combination that matches a list of Addresses and list of events

func (*DSORM) SelectLogs

func (o *DSORM) SelectLogs(ctx context.Context, start, end int64, address common.Address, eventSig common.Hash) ([]Log, error)

SelectLogs finds the logs in a given block range.

func (*DSORM) SelectLogsByBlockRange

func (o *DSORM) SelectLogsByBlockRange(ctx context.Context, start, end int64) ([]Log, error)

func (*DSORM) SelectLogsCreatedAfter

func (o *DSORM) SelectLogsCreatedAfter(ctx context.Context, address common.Address, eventSig common.Hash, after time.Time, confs evmtypes.Confirmations) ([]Log, error)

SelectLogsCreatedAfter finds logs created after some timestamp.

func (*DSORM) SelectLogsDataWordBetween

func (o *DSORM) SelectLogsDataWordBetween(ctx context.Context, address common.Address, eventSig common.Hash, wordIndexMin int, wordIndexMax int, wordValue common.Hash, confs evmtypes.Confirmations) ([]Log, error)

func (*DSORM) SelectLogsDataWordGreaterThan

func (o *DSORM) SelectLogsDataWordGreaterThan(ctx context.Context, address common.Address, eventSig common.Hash, wordIndex int, wordValueMin common.Hash, confs evmtypes.Confirmations) ([]Log, error)

func (*DSORM) SelectLogsDataWordRange

func (o *DSORM) SelectLogsDataWordRange(ctx context.Context, address common.Address, eventSig common.Hash, wordIndex int, wordValueMin, wordValueMax common.Hash, confs evmtypes.Confirmations) ([]Log, error)

func (*DSORM) SelectLogsWithSigs

func (o *DSORM) SelectLogsWithSigs(ctx context.Context, start, end int64, address common.Address, eventSigs []common.Hash) (logs []Log, err error)

SelectLogsWithSigs finds the logs in the given block range with the given event signatures emitted from the given address.

func (*DSORM) SelectOldestBlock

func (o *DSORM) SelectOldestBlock(ctx context.Context, minAllowedBlockNumber int64) (*Block, error)

func (*DSORM) SelectUnmatchedLogIDs

func (o *DSORM) SelectUnmatchedLogIDs(ctx context.Context, limit int64) (ids []uint64, err error)

func (*DSORM) Transact

func (o *DSORM) Transact(ctx context.Context, fn func(*DSORM) error) (err error)

type Exp

type Exp struct {
	Address      common.Address
	EventSig     common.Hash
	Expiration   time.Time
	TimeNow      time.Time
	ShouldDelete bool
}

type Filter

type Filter struct {
	Name         string // see FilterName(id, args) below
	Addresses    evmtypes.AddressArray
	EventSigs    evmtypes.HashArray // list of possible values for eventsig (aka topic1)
	Topic2       evmtypes.HashArray // list of possible values for topic2
	Topic3       evmtypes.HashArray // list of possible values for topic3
	Topic4       evmtypes.HashArray // list of possible values for topic4
	Retention    time.Duration      // maximum amount of time to retain logs
	MaxLogsKept  uint64             // maximum number of logs to retain ( 0 = unlimited )
	LogsPerBlock uint64             // rate limit ( maximum # of logs per block, 0 = unlimited )
}

func (*Filter) Contains

func (filter *Filter) Contains(other *Filter) bool

Contains returns true if this filter already fully contains the filter passed to it.

type HashedValueComparator

type HashedValueComparator struct {
	Values   []common.Hash
	Operator primitives.ComparisonOperator
}

type HeadTracker

type HeadTracker interface {
	services.Service
	LatestAndFinalizedBlock(ctx context.Context) (latest, finalized *evmtypes.Head, err error)
}

type Log

type Log struct {
	EVMChainID     *big.Big
	LogIndex       int64
	BlockHash      common.Hash
	BlockNumber    int64
	BlockTimestamp time.Time
	Topics         pq.ByteaArray
	EventSig       common.Hash
	Address        common.Address
	TxHash         common.Hash
	Data           []byte
	CreatedAt      time.Time
}

Log represents an EVM log.

func (*Log) GetTopics

func (l *Log) GetTopics() []common.Hash

func (*Log) ToGethLog

func (l *Log) ToGethLog() types.Log

type LogPoller

type LogPoller interface {
	services.Service
	Healthy() error
	Replay(ctx context.Context, fromBlock int64) error
	ReplayAsync(fromBlock int64)
	RegisterFilter(ctx context.Context, filter Filter) error
	UnregisterFilter(ctx context.Context, name string) error
	HasFilter(name string) bool
	GetFilters() map[string]Filter
	LatestBlock(ctx context.Context) (Block, error)
	GetBlocksRange(ctx context.Context, numbers []uint64) ([]Block, error)
	FindLCA(ctx context.Context) (*Block, error)
	DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error

	// General querying
	Logs(ctx context.Context, start, end int64, eventSig common.Hash, address common.Address) ([]Log, error)
	LogsWithSigs(ctx context.Context, start, end int64, eventSigs []common.Hash, address common.Address) ([]Log, error)
	LogsCreatedAfter(ctx context.Context, eventSig common.Hash, address common.Address, time time.Time, confs evmtypes.Confirmations) ([]Log, error)
	LatestLogByEventSigWithConfs(ctx context.Context, eventSig common.Hash, address common.Address, confs evmtypes.Confirmations) (*Log, error)
	LatestLogEventSigsAddrsWithConfs(ctx context.Context, fromBlock int64, eventSigs []common.Hash, addresses []common.Address, confs evmtypes.Confirmations) ([]Log, error)
	LatestBlockByEventSigsAddrsWithConfs(ctx context.Context, fromBlock int64, eventSigs []common.Hash, addresses []common.Address, confs evmtypes.Confirmations) (int64, error)

	// Content based querying
	IndexedLogs(ctx context.Context, eventSig common.Hash, address common.Address, topicIndex int, topicValues []common.Hash, confs evmtypes.Confirmations) ([]Log, error)
	IndexedLogsByBlockRange(ctx context.Context, start, end int64, eventSig common.Hash, address common.Address, topicIndex int, topicValues []common.Hash) ([]Log, error)
	IndexedLogsCreatedAfter(ctx context.Context, eventSig common.Hash, address common.Address, topicIndex int, topicValues []common.Hash, after time.Time, confs evmtypes.Confirmations) ([]Log, error)
	IndexedLogsByTxHash(ctx context.Context, eventSig common.Hash, address common.Address, txHash common.Hash) ([]Log, error)
	IndexedLogsTopicGreaterThan(ctx context.Context, eventSig common.Hash, address common.Address, topicIndex int, topicValueMin common.Hash, confs evmtypes.Confirmations) ([]Log, error)
	IndexedLogsTopicRange(ctx context.Context, eventSig common.Hash, address common.Address, topicIndex int, topicValueMin common.Hash, topicValueMax common.Hash, confs evmtypes.Confirmations) ([]Log, error)
	IndexedLogsWithSigsExcluding(ctx context.Context, address common.Address, eventSigA, eventSigB common.Hash, topicIndex int, fromBlock, toBlock int64, confs evmtypes.Confirmations) ([]Log, error)
	LogsDataWordRange(ctx context.Context, eventSig common.Hash, address common.Address, wordIndex int, wordValueMin, wordValueMax common.Hash, confs evmtypes.Confirmations) ([]Log, error)
	LogsDataWordGreaterThan(ctx context.Context, eventSig common.Hash, address common.Address, wordIndex int, wordValueMin common.Hash, confs evmtypes.Confirmations) ([]Log, error)
	LogsDataWordBetween(ctx context.Context, eventSig common.Hash, address common.Address, wordIndexMin, wordIndexMax int, wordValue common.Hash, confs evmtypes.Confirmations) ([]Log, error)

	// chainlink-common query filtering
	FilteredLogs(ctx context.Context, filter []query.Expression, limitAndSort query.LimitAndSort, queryName string) ([]Log, error)
}
var (
	ErrDisabled                 = pkgerrors.New("log poller disabled")
	LogPollerDisabled LogPoller = disabled{}
)

type LogPollerTest

type LogPollerTest interface {
	LogPoller
	PollAndSaveLogs(ctx context.Context, currentBlockNumber int64)
	BackupPollAndSaveLogs(ctx context.Context) error
	Filter(from, to *big.Int, bh *common.Hash) ethereum.FilterQuery
	GetReplayFromBlock(ctx context.Context, requested int64) (int64, error)
	PruneOldBlocks(ctx context.Context) (bool, error)
}

type ORM

type ORM interface {
	InsertLogs(ctx context.Context, logs []Log) error
	InsertLogsWithBlock(ctx context.Context, logs []Log, block Block) error
	InsertFilter(ctx context.Context, filter Filter) error

	LoadFilters(ctx context.Context) (map[string]Filter, error)
	DeleteFilter(ctx context.Context, name string) error

	DeleteLogsByRowID(ctx context.Context, rowIDs []uint64) (int64, error)
	InsertBlock(ctx context.Context, blockHash common.Hash, blockNumber int64, blockTimestamp time.Time, finalizedBlock int64) error
	DeleteBlocksBefore(ctx context.Context, end int64, limit int64) (int64, error)
	DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error
	SelectUnmatchedLogIDs(ctx context.Context, limit int64) (ids []uint64, err error)
	DeleteExpiredLogs(ctx context.Context, limit int64) (int64, error)
	SelectExcessLogIDs(ctx context.Context, limit int64) (rowIDs []uint64, err error)

	GetBlocksRange(ctx context.Context, start int64, end int64) ([]Block, error)
	SelectBlockByNumber(ctx context.Context, blockNumber int64) (*Block, error)
	SelectBlockByHash(ctx context.Context, hash common.Hash) (*Block, error)
	SelectLatestBlock(ctx context.Context) (*Block, error)
	SelectOldestBlock(ctx context.Context, minAllowedBlockNumber int64) (*Block, error)
	SelectLatestFinalizedBlock(ctx context.Context) (*Block, error)

	SelectLogs(ctx context.Context, start, end int64, address common.Address, eventSig common.Hash) ([]Log, error)
	SelectLogsWithSigs(ctx context.Context, start, end int64, address common.Address, eventSigs []common.Hash) ([]Log, error)
	SelectLogsCreatedAfter(ctx context.Context, address common.Address, eventSig common.Hash, after time.Time, confs evmtypes.Confirmations) ([]Log, error)
	SelectLatestLogByEventSigWithConfs(ctx context.Context, eventSig common.Hash, address common.Address, confs evmtypes.Confirmations) (*Log, error)
	SelectLatestLogEventSigsAddrsWithConfs(ctx context.Context, fromBlock int64, addresses []common.Address, eventSigs []common.Hash, confs evmtypes.Confirmations) ([]Log, error)
	SelectLatestBlockByEventSigsAddrsWithConfs(ctx context.Context, fromBlock int64, eventSigs []common.Hash, addresses []common.Address, confs evmtypes.Confirmations) (int64, error)
	SelectLogsByBlockRange(ctx context.Context, start, end int64) ([]Log, error)

	SelectIndexedLogs(ctx context.Context, address common.Address, eventSig common.Hash, topicIndex int, topicValues []common.Hash, confs evmtypes.Confirmations) ([]Log, error)
	SelectIndexedLogsByBlockRange(ctx context.Context, start, end int64, address common.Address, eventSig common.Hash, topicIndex int, topicValues []common.Hash) ([]Log, error)
	SelectIndexedLogsCreatedAfter(ctx context.Context, address common.Address, eventSig common.Hash, topicIndex int, topicValues []common.Hash, after time.Time, confs evmtypes.Confirmations) ([]Log, error)
	SelectIndexedLogsTopicGreaterThan(ctx context.Context, address common.Address, eventSig common.Hash, topicIndex int, topicValueMin common.Hash, confs evmtypes.Confirmations) ([]Log, error)
	SelectIndexedLogsTopicRange(ctx context.Context, address common.Address, eventSig common.Hash, topicIndex int, topicValueMin, topicValueMax common.Hash, confs evmtypes.Confirmations) ([]Log, error)
	SelectIndexedLogsWithSigsExcluding(ctx context.Context, sigA, sigB common.Hash, topicIndex int, address common.Address, startBlock, endBlock int64, confs evmtypes.Confirmations) ([]Log, error)
	SelectIndexedLogsByTxHash(ctx context.Context, address common.Address, eventSig common.Hash, txHash common.Hash) ([]Log, error)
	SelectLogsDataWordRange(ctx context.Context, address common.Address, eventSig common.Hash, wordIndex int, wordValueMin, wordValueMax common.Hash, confs evmtypes.Confirmations) ([]Log, error)
	SelectLogsDataWordGreaterThan(ctx context.Context, address common.Address, eventSig common.Hash, wordIndex int, wordValueMin common.Hash, confs evmtypes.Confirmations) ([]Log, error)
	SelectLogsDataWordBetween(ctx context.Context, address common.Address, eventSig common.Hash, wordIndexMin int, wordIndexMax int, wordValue common.Hash, confs evmtypes.Confirmations) ([]Log, error)

	// FilteredLogs accepts chainlink-common filtering DSL.
	FilteredLogs(ctx context.Context, filter []query.Expression, limitAndSort query.LimitAndSort, queryName string) ([]Log, error)
}

ORM represents the persistent data access layer used by the log poller. At the moment it is a somewhat leaky abstraction, because it exposes some of the database implementation details (e.g. pg.Q). Ideally it should be agnostic and applicable to any persistence layer. Moreover, LogPoller should not be aware of the underlying database implementation and should delegate all queries to the ORM.

type ObservedORM

type ObservedORM struct {
	ORM
	// contains filtered or unexported fields
}

ObservedORM is a decorator layer for the ORM used by LogPoller, responsible for pushing Prometheus metrics that report the duration and result-set size of queries. It doesn't change internal logic, because all calls are delegated to the original ORM.
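
The decorator shape described here can be sketched with a tiny interface. Everything in the example is a hypothetical stand-in: store plays the role of ORM, and plain maps replace the Prometheus metrics. It only illustrates how embedding the interface lets the wrapper delegate every call while timing selected ones.

```go
package main

import (
	"fmt"
	"time"
)

// store is a tiny hypothetical stand-in for the ORM interface.
type store interface {
	SelectLogs(start, end int64) []string
}

// memStore is an in-memory store keyed by block number.
type memStore struct{ logs map[int64]string }

func (m memStore) SelectLogs(start, end int64) []string {
	var out []string
	for b := start; b <= end; b++ {
		if l, ok := m.logs[b]; ok {
			out = append(out, l)
		}
	}
	return out
}

// observedStore embeds store, so any method it does not override is
// delegated unchanged. Maps stand in for real metrics here.
type observedStore struct {
	store
	calls    map[string]int
	elapsed  map[string]time.Duration
	lastSize map[string]int
}

func newObservedStore(inner store) *observedStore {
	return &observedStore{
		store:    inner,
		calls:    map[string]int{},
		elapsed:  map[string]time.Duration{},
		lastSize: map[string]int{},
	}
}

// SelectLogs delegates to the wrapped store and records duration and
// result-set size without altering the result.
func (o *observedStore) SelectLogs(start, end int64) []string {
	began := time.Now()
	res := o.store.SelectLogs(start, end)
	o.calls["SelectLogs"]++
	o.elapsed["SelectLogs"] += time.Since(began)
	o.lastSize["SelectLogs"] = len(res)
	return res
}

func main() {
	o := newObservedStore(memStore{logs: map[int64]string{1: "a", 2: "b", 5: "e"}})
	res := o.SelectLogs(1, 3)
	fmt.Println(res, o.calls["SelectLogs"], o.lastSize["SelectLogs"])
}
```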

func NewObservedORM

func NewObservedORM(chainID *big.Int, ds sqlutil.DataSource, lggr logger.Logger) *ObservedORM

NewObservedORM creates an observed version of the log poller's ORM created by NewORM. Please see ObservedLogPoller for more details on how latencies are measured.

func (*ObservedORM) DeleteBlocksBefore

func (o *ObservedORM) DeleteBlocksBefore(ctx context.Context, end int64, limit int64) (int64, error)

func (*ObservedORM) DeleteExpiredLogs

func (o *ObservedORM) DeleteExpiredLogs(ctx context.Context, limit int64) (int64, error)

func (*ObservedORM) DeleteFilter

func (o *ObservedORM) DeleteFilter(ctx context.Context, name string) error

func (*ObservedORM) DeleteLogsAndBlocksAfter

func (o *ObservedORM) DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error

func (*ObservedORM) DeleteLogsByRowID

func (o *ObservedORM) DeleteLogsByRowID(ctx context.Context, rowIDs []uint64) (int64, error)

func (*ObservedORM) FilteredLogs

func (o *ObservedORM) FilteredLogs(ctx context.Context, filter []query.Expression, limitAndSort query.LimitAndSort, queryName string) ([]Log, error)

func (*ObservedORM) GetBlocksRange

func (o *ObservedORM) GetBlocksRange(ctx context.Context, start int64, end int64) ([]Block, error)

func (*ObservedORM) InsertFilter

func (o *ObservedORM) InsertFilter(ctx context.Context, filter Filter) error

func (*ObservedORM) InsertLogs

func (o *ObservedORM) InsertLogs(ctx context.Context, logs []Log) error

func (*ObservedORM) InsertLogsWithBlock

func (o *ObservedORM) InsertLogsWithBlock(ctx context.Context, logs []Log, block Block) error

func (*ObservedORM) LoadFilters

func (o *ObservedORM) LoadFilters(ctx context.Context) (map[string]Filter, error)

func (*ObservedORM) SelectBlockByNumber

func (o *ObservedORM) SelectBlockByNumber(ctx context.Context, n int64) (*Block, error)

func (*ObservedORM) SelectExcessLogIDs

func (o *ObservedORM) SelectExcessLogIDs(ctx context.Context, limit int64) ([]uint64, error)

func (*ObservedORM) SelectIndexedLogs

func (o *ObservedORM) SelectIndexedLogs(ctx context.Context, address common.Address, eventSig common.Hash, topicIndex int, topicValues []common.Hash, confs evmtypes.Confirmations) ([]Log, error)

func (*ObservedORM) SelectIndexedLogsByBlockRange

func (o *ObservedORM) SelectIndexedLogsByBlockRange(ctx context.Context, start, end int64, address common.Address, eventSig common.Hash, topicIndex int, topicValues []common.Hash) ([]Log, error)

func (*ObservedORM) SelectIndexedLogsByTxHash

func (o *ObservedORM) SelectIndexedLogsByTxHash(ctx context.Context, address common.Address, eventSig common.Hash, txHash common.Hash) ([]Log, error)

func (*ObservedORM) SelectIndexedLogsCreatedAfter

func (o *ObservedORM) SelectIndexedLogsCreatedAfter(ctx context.Context, address common.Address, eventSig common.Hash, topicIndex int, topicValues []common.Hash, after time.Time, confs evmtypes.Confirmations) ([]Log, error)

func (*ObservedORM) SelectIndexedLogsTopicGreaterThan

func (o *ObservedORM) SelectIndexedLogsTopicGreaterThan(ctx context.Context, address common.Address, eventSig common.Hash, topicIndex int, topicValueMin common.Hash, confs evmtypes.Confirmations) ([]Log, error)

func (*ObservedORM) SelectIndexedLogsTopicRange

func (o *ObservedORM) SelectIndexedLogsTopicRange(ctx context.Context, address common.Address, eventSig common.Hash, topicIndex int, topicValueMin, topicValueMax common.Hash, confs evmtypes.Confirmations) ([]Log, error)

func (*ObservedORM) SelectIndexedLogsWithSigsExcluding

func (o *ObservedORM) SelectIndexedLogsWithSigsExcluding(ctx context.Context, sigA, sigB common.Hash, topicIndex int, address common.Address, startBlock, endBlock int64, confs evmtypes.Confirmations) ([]Log, error)

func (*ObservedORM) SelectLatestBlock

func (o *ObservedORM) SelectLatestBlock(ctx context.Context) (*Block, error)

func (*ObservedORM) SelectLatestBlockByEventSigsAddrsWithConfs

func (o *ObservedORM) SelectLatestBlockByEventSigsAddrsWithConfs(ctx context.Context, fromBlock int64, eventSigs []common.Hash, addresses []common.Address, confs evmtypes.Confirmations) (int64, error)

func (*ObservedORM) SelectLatestLogByEventSigWithConfs

func (o *ObservedORM) SelectLatestLogByEventSigWithConfs(ctx context.Context, eventSig common.Hash, address common.Address, confs evmtypes.Confirmations) (*Log, error)

func (*ObservedORM) SelectLatestLogEventSigsAddrsWithConfs

func (o *ObservedORM) SelectLatestLogEventSigsAddrsWithConfs(ctx context.Context, fromBlock int64, addresses []common.Address, eventSigs []common.Hash, confs evmtypes.Confirmations) ([]Log, error)

func (*ObservedORM) SelectLogs

func (o *ObservedORM) SelectLogs(ctx context.Context, start, end int64, address common.Address, eventSig common.Hash) ([]Log, error)

func (*ObservedORM) SelectLogsCreatedAfter

func (o *ObservedORM) SelectLogsCreatedAfter(ctx context.Context, address common.Address, eventSig common.Hash, after time.Time, confs evmtypes.Confirmations) ([]Log, error)

func (*ObservedORM) SelectLogsDataWordBetween

func (o *ObservedORM) SelectLogsDataWordBetween(ctx context.Context, address common.Address, eventSig common.Hash, wordIndexMin int, wordIndexMax int, wordValue common.Hash, confs evmtypes.Confirmations) ([]Log, error)

func (*ObservedORM) SelectLogsDataWordGreaterThan

func (o *ObservedORM) SelectLogsDataWordGreaterThan(ctx context.Context, address common.Address, eventSig common.Hash, wordIndex int, wordValueMin common.Hash, confs evmtypes.Confirmations) ([]Log, error)

func (*ObservedORM) SelectLogsDataWordRange

func (o *ObservedORM) SelectLogsDataWordRange(ctx context.Context, address common.Address, eventSig common.Hash, wordIndex int, wordValueMin, wordValueMax common.Hash, confs evmtypes.Confirmations) ([]Log, error)

func (*ObservedORM) SelectLogsWithSigs

func (o *ObservedORM) SelectLogsWithSigs(ctx context.Context, start, end int64, address common.Address, eventSigs []common.Hash) ([]Log, error)

func (*ObservedORM) SelectOldestBlock

func (o *ObservedORM) SelectOldestBlock(ctx context.Context, minAllowedBlockNumber int64) (*Block, error)

func (*ObservedORM) SelectUnmatchedLogIDs

func (o *ObservedORM) SelectUnmatchedLogIDs(ctx context.Context, limit int64) (ids []uint64, err error)

type Opts

type Opts struct {
	PollPeriod               time.Duration
	UseFinalityTag           bool
	FinalityDepth            int64
	BackfillBatchSize        int64
	RPCBatchSize             int64
	KeepFinalizedBlocksDepth int64
	BackupPollerBlockDelay   int64
	LogPrunePageSize         int64
	ClientErrors             config.ClientErrors
}

type RangeQueryer

type RangeQueryer[T comparable] struct {
	// contains filtered or unexported fields
}

func NewRangeQueryer

func NewRangeQueryer[T comparable](evmChainID *big.Int, ds sqlutil.DataSource, query func(ctx context.Context, r *RangeQueryer[T], lower, upper int64) (rowsAffected int64, err error)) *RangeQueryer[T]

func (*RangeQueryer[T]) AddResults

func (r *RangeQueryer[T]) AddResults(moreResults []T)

func (*RangeQueryer[T]) AllResults

func (r *RangeQueryer[T]) AllResults() []T

func (*RangeQueryer[T]) ExecPagedQuery

func (r *RangeQueryer[T]) ExecPagedQuery(ctx context.Context, limit, end int64) (rowsAffected int64, err error)

ExecPagedQuery runs a query accepting an upper limit block (end) in a fast paged way. limit is the maximum number of results to be returned, but it is also used to break the query up into smaller queries, each restricted to limit blocks. The first range of blocks will be from MIN(block_number) to MIN(block_number) + limit. The iterative process ends once either the limit on results is reached or block_number = end. The query will never be executed on blocks where block_number > end, and it will never be executed on block_number = B unless it has also been executed on all blocks with block_number < B.

r.AddResults(moreResults []T) should be called if this is a query returning results (i.e., SELECT). These will be accumulated in r.acc and can be retrieved later with r.AllResults().
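
The paging loop described above can be sketched without a database. The function pagedQuery and its callback signature are hypothetical; the sketch assumes limit > 0, uses inclusive ranges of limit blocks each, and passes the remaining result budget to the callback. The real method's range boundaries and its handling of an unset limit may differ.

```go
package main

import "fmt"

// pagedQuery is a hypothetical model of the paging loop. It calls query
// on consecutive inclusive block ranges of width limit, starting at
// minBlock, in ascending order. It stops once limit rows have been
// affected or the ranges have covered everything up to end.
func pagedQuery(minBlock, end, limit int64, query func(lower, upper, remaining int64) int64) int64 {
	if limit <= 0 {
		return 0 // the sketch only models the paged case
	}
	var total int64
	for lower := minBlock; lower <= end && total < limit; lower += limit {
		upper := lower + limit - 1
		if upper > end {
			upper = end // never query past end
		}
		total += query(lower, upper, limit-total)
	}
	return total
}

func main() {
	rows := map[int64]bool{} // one row per block, 100..129
	for b := int64(100); b < 130; b++ {
		rows[b] = true
	}
	deleteRange := func(lower, upper, remaining int64) int64 {
		var n int64
		for b := lower; b <= upper && n < remaining; b++ {
			if rows[b] {
				delete(rows, b)
				n++
			}
		}
		return n
	}
	affected := pagedQuery(100, 120, 10, deleteRange)
	fmt.Println(affected, len(rows))
}
```

Because ranges are visited in ascending order, a block is only ever processed after all lower blocks have been, which matches the ordering guarantee stated in the description.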

Directories

Path Synopsis
internal
