Documentation ¶
Overview ¶
Package logpoller is a service for querying EVM log data.
It can be thought of as a more performant and sophisticated version of eth_getLogs (https://ethereum.org/en/developers/docs/apis/json-rpc/#eth_getlogs). Keeping a local table of relevant, continually canonical logs gives us two main advantages:
- Have hundreds of jobs/clients querying for logs without overloading the underlying RPC provider.
- Do more sophisticated querying (filter by confirmations/time/log contents, efficiently join between the logs table and other tables on the node, etc.)
Guarantees provided by the poller:
- Queries always return the logs from the _current_ canonical chain (same as eth_getLogs). In particular, results for unfinalized logs may change between queries, while finalized logs remain stable. The threshold between unfinalized and finalized logs is the finalityDepth parameter, chosen such that, with exceedingly high probability, logs finalityDepth blocks deep cannot be reorged.
- After calling RegisterFilter with a particular event, the poller will never miss logs for that event, despite node crashes and reorgs. The granularity of the filter is always at least one block (more when backfilling).
- Old logs stored in the db will only be deleted if all filters matching them have explicit retention periods set, and all of those periods have expired. The default retention of 0 on any matching filter guarantees permanent retention.
- After calling Replay(fromBlock), all blocks from fromBlock (inclusive) up to the latest chain tip will be polled with the current filter. This can be used when a job is first added, to specify a start block from which you wish to capture existing logs.
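The finality guarantee above can be illustrated with a small sketch. The function name and the exact cutoff convention (a log counts as finalized once it is at least finalityDepth blocks behind the latest block) are assumptions made for illustration, not taken from the package source.

```go
package main

import "fmt"

// isFinalized reports whether a log in block logBlock is at least
// finalityDepth blocks behind latestBlock. The ">=" cutoff is an
// assumption of this sketch.
func isFinalized(logBlock, latestBlock, finalityDepth int64) bool {
	return latestBlock-logBlock >= finalityDepth
}

func main() {
	const latest, depth = 1000, 50
	for _, b := range []int64{900, 950, 951, 1000} {
		fmt.Printf("block %d finalized: %v\n", b, isFinalized(b, latest, depth))
	}
}
```

Query results for logs on the unfinalized side of this threshold may differ between two calls if a reorg happens in between.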
Index ¶
- Variables
- func EvmWord(i uint64) common.Hash
- func FilterName(id string, args ...any) string
- func NewLogPoller(orm *ORM, ec Client, lggr logger.Logger, pollPeriod time.Duration, ...) *logPoller
- type Client
- type Exp
- type Filter
- type Log
- type LogPoller
- type LogPollerBlock
- type LogPollerTest
- type ORM
- func (o *ORM) DeleteBlocksAfter(start int64, qopts ...pg.QOpt) error
- func (o *ORM) DeleteBlocksBefore(end int64, qopts ...pg.QOpt) error
- func (o *ORM) DeleteExpiredLogs(qopts ...pg.QOpt) error
- func (o *ORM) DeleteFilter(name string, qopts ...pg.QOpt) error
- func (o *ORM) DeleteLogsAfter(start int64, qopts ...pg.QOpt) error
- func (o *ORM) GetBlocksRange(start uint64, end uint64, qopts ...pg.QOpt) ([]LogPollerBlock, error)
- func (o *ORM) InsertBlock(h common.Hash, n int64, t time.Time, qopts ...pg.QOpt) error
- func (o *ORM) InsertFilter(filter Filter, qopts ...pg.QOpt) (err error)
- func (o *ORM) InsertLogs(logs []Log, qopts ...pg.QOpt) error
- func (o *ORM) LoadFilters(qopts ...pg.QOpt) (map[string]Filter, error)
- func (o *ORM) SelectBlockByHash(h common.Hash, qopts ...pg.QOpt) (*LogPollerBlock, error)
- func (o *ORM) SelectBlockByNumber(n int64, qopts ...pg.QOpt) (*LogPollerBlock, error)
- func (o *ORM) SelectDataWordGreaterThan(address common.Address, eventSig common.Hash, wordIndex int, ...) ([]Log, error)
- func (o *ORM) SelectDataWordRange(address common.Address, eventSig common.Hash, wordIndex int, ...) ([]Log, error)
- func (o *ORM) SelectIndexLogsTopicGreaterThan(address common.Address, eventSig common.Hash, topicIndex int, ...) ([]Log, error)
- func (o *ORM) SelectIndexLogsTopicRange(address common.Address, eventSig common.Hash, topicIndex int, ...) ([]Log, error)
- func (o *ORM) SelectIndexedLogs(address common.Address, eventSig common.Hash, topicIndex int, ...) ([]Log, error)
- func (o *ORM) SelectIndexedLogsByBlockRangeFilter(start, end int64, address common.Address, eventSig common.Hash, topicIndex int, ...) ([]Log, error)
- func (o *ORM) SelectIndexedLogsWithSigsExcluding(sigA, sigB common.Hash, topicIndex int, address common.Address, ...) ([]Log, error)
- func (o *ORM) SelectLatestBlock(qopts ...pg.QOpt) (*LogPollerBlock, error)
- func (o *ORM) SelectLatestLogEventSigWithConfs(eventSig common.Hash, address common.Address, confs int, qopts ...pg.QOpt) (*Log, error)
- func (o *ORM) SelectLatestLogEventSigsAddrsWithConfs(fromBlock int64, addresses []common.Address, eventSigs []common.Hash, ...) ([]Log, error)
- func (o *ORM) SelectLogsByBlockRange(start, end int64) ([]Log, error)
- func (o *ORM) SelectLogsByBlockRangeFilter(start, end int64, address common.Address, eventSig common.Hash, ...) ([]Log, error)
- func (o *ORM) SelectLogsWithSigsByBlockRangeFilter(start, end int64, address common.Address, eventSigs []common.Hash, ...) (logs []Log, err error)
- type ObservedLogPoller
- func (o *ObservedLogPoller) IndexedLogs(eventSig common.Hash, address common.Address, topicIndex int, ...) ([]Log, error)
- func (o *ObservedLogPoller) IndexedLogsByBlockRange(start, end int64, eventSig common.Hash, address common.Address, topicIndex int, ...) ([]Log, error)
- func (o *ObservedLogPoller) IndexedLogsTopicGreaterThan(eventSig common.Hash, address common.Address, topicIndex int, ...) ([]Log, error)
- func (o *ObservedLogPoller) IndexedLogsTopicRange(eventSig common.Hash, address common.Address, topicIndex int, ...) ([]Log, error)
- func (o *ObservedLogPoller) IndexedLogsWithSigsExcluding(address common.Address, eventSigA, eventSigB common.Hash, topicIndex int, ...) ([]Log, error)
- func (o *ObservedLogPoller) LatestLogEventSigsAddrsWithConfs(fromBlock int64, eventSigs []common.Hash, addresses []common.Address, ...) ([]Log, error)
- func (o *ObservedLogPoller) LogsDataWordGreaterThan(eventSig common.Hash, address common.Address, wordIndex int, ...) ([]Log, error)
- func (o *ObservedLogPoller) LogsDataWordRange(eventSig common.Hash, address common.Address, wordIndex int, ...) ([]Log, error)
Constants ¶
This section is empty.
Variables ¶
Functions ¶
func FilterName ¶
FilterName is a suggested convenience function that clients can use to construct unique filter names for the Name field of the Filter struct.
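One way such a helper could work is sketched below. filterNameSketch is a hypothetical stand-in with the same signature as FilterName; the separator characters and the example arguments are assumptions, and the real function may format names differently.

```go
package main

import (
	"fmt"
	"strings"
)

// filterNameSketch is a hypothetical stand-in for FilterName. It joins an
// identifier with its stringified arguments, so that names built from
// different arguments differ in the common case.
func filterNameSketch(id string, args ...any) string {
	if len(args) == 0 {
		return id
	}
	parts := make([]string, 0, len(args))
	for _, a := range args {
		parts = append(parts, fmt.Sprint(a))
	}
	return id + " - " + strings.Join(parts, ":")
}

func main() {
	// The job label and arguments here are made up for the example.
	fmt.Println(filterNameSketch("example job", "0xabc", 42))
}
```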
func NewLogPoller ¶
func NewLogPoller(orm *ORM, ec Client, lggr logger.Logger, pollPeriod time.Duration, finalityDepth int64, backfillBatchSize int64, rpcBatchSize int64, keepBlocksDepth int64) *logPoller
NewLogPoller creates a log poller. Note there is an assumption that blocks can be processed faster than they are produced for the given chain, or the poller will fall behind. Block processing involves the following calls in steady state (without reorgs):
- eth_getBlockByNumber - headers only (transaction hashes, not full transaction objects),
- eth_getLogs - get the logs for the block
- 1 db read of the latest block - for checking reorgs
- 1 db tx that writes the block and its logs.
How fast that can be done depends largely on network speed and the DB, but even for the fastest supported chain, Polygon, which has 2s block times, we need RPCs with roughly <= 500ms latency.
Types ¶
type Client ¶
type Client interface {
	HeadByNumber(ctx context.Context, n *big.Int) (*evmtypes.Head, error)
	HeadByHash(ctx context.Context, n common.Hash) (*evmtypes.Head, error)
	BatchCallContext(ctx context.Context, b []rpc.BatchElem) error
	FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error)
	ConfiguredChainID() *big.Int
}
type Filter ¶
type Log ¶
type Log struct {
	EvmChainId     *utils.Big
	LogIndex       int64
	BlockHash      common.Hash
	BlockNumber    int64
	BlockTimestamp time.Time
	Topics         pq.ByteaArray
	EventSig       common.Hash
	Address        common.Address
	TxHash         common.Hash
	Data           []byte
	CreatedAt      time.Time
}
Log represents an EVM log.
type LogPoller ¶
type LogPoller interface {
	services.ServiceCtx
	Replay(ctx context.Context, fromBlock int64) error
	ReplayAsync(fromBlock int64)
	RegisterFilter(filter Filter) error
	UnregisterFilter(name string, q pg.Queryer) error
	LatestBlock(qopts ...pg.QOpt) (int64, error)
	GetBlocksRange(ctx context.Context, numbers []uint64, qopts ...pg.QOpt) ([]LogPollerBlock, error)

	// General querying
	Logs(start, end int64, eventSig common.Hash, address common.Address, qopts ...pg.QOpt) ([]Log, error)
	LogsWithSigs(start, end int64, eventSigs []common.Hash, address common.Address, qopts ...pg.QOpt) ([]Log, error)
	LatestLogByEventSigWithConfs(eventSig common.Hash, address common.Address, confs int, qopts ...pg.QOpt) (*Log, error)
	LatestLogEventSigsAddrsWithConfs(fromBlock int64, eventSigs []common.Hash, addresses []common.Address, confs int, qopts ...pg.QOpt) ([]Log, error)

	// Content based querying
	IndexedLogs(eventSig common.Hash, address common.Address, topicIndex int, topicValues []common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error)
	IndexedLogsByBlockRange(start, end int64, eventSig common.Hash, address common.Address, topicIndex int, topicValues []common.Hash, qopts ...pg.QOpt) ([]Log, error)
	IndexedLogsTopicGreaterThan(eventSig common.Hash, address common.Address, topicIndex int, topicValueMin common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error)
	IndexedLogsTopicRange(eventSig common.Hash, address common.Address, topicIndex int, topicValueMin common.Hash, topicValueMax common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error)
	IndexedLogsWithSigsExcluding(address common.Address, eventSigA, eventSigB common.Hash, topicIndex int, fromBlock, toBlock int64, confs int, qopts ...pg.QOpt) ([]Log, error)
	LogsDataWordRange(eventSig common.Hash, address common.Address, wordIndex int, wordValueMin, wordValueMax common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error)
	LogsDataWordGreaterThan(eventSig common.Hash, address common.Address, wordIndex int, wordValueMin common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error)
}
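Several of the query methods above take a confs argument. A minimal in-memory sketch of what a confirmations filter could look like follows; the logEntry type is a hypothetical, trimmed-down stand-in for Log, and the cutoff (BlockNumber <= latest - confs) is an assumption of this sketch rather than a statement about the package's SQL.

```go
package main

import "fmt"

// logEntry is a hypothetical, trimmed-down stand-in for the package's Log type.
type logEntry struct {
	BlockNumber int64
	LogIndex    int64
}

// withConfs keeps the logs whose block is at least confs blocks behind latest.
func withConfs(logs []logEntry, latest int64, confs int) []logEntry {
	var out []logEntry
	for _, l := range logs {
		if l.BlockNumber <= latest-int64(confs) {
			out = append(out, l)
		}
	}
	return out
}

func main() {
	logs := []logEntry{{BlockNumber: 90}, {BlockNumber: 95}, {BlockNumber: 96}}
	// With the latest block at 100 and 5 confirmations required,
	// only logs at block 95 or earlier pass the filter.
	fmt.Println(len(withConfs(logs, 100, 5)))
}
```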
func NewObservedLogPoller ¶ added in v2.2.0
func NewObservedLogPoller(orm *ORM, ec Client, lggr logger.Logger, pollPeriod time.Duration, finalityDepth int64, backfillBatchSize int64, rpcBatchSize int64, keepBlocksDepth int64) LogPoller
NewObservedLogPoller creates an observed version of the log poller created by NewLogPoller. Please see ObservedLogPoller for more details on how latencies are measured.
type LogPollerBlock ¶
type LogPollerBlock struct {
	EvmChainId *utils.Big
	BlockHash  common.Hash
	// Note geth uses int64 internally https://github.com/ethereum/go-ethereum/blob/f66f1a16b3c480d3a43ac7e8a09ab3e362e96ae4/eth/filters/api.go#L340
	BlockNumber    int64
	BlockTimestamp time.Time
	CreatedAt      time.Time
}
LogPollerBlock represents an unfinalized block used for reorg detection when polling.
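A common way to use stored blocks for reorg detection is to check that each incoming block builds on the block stored at the previous height. The sketch below shows that idea with hypothetical types and string hashes; it is an illustration of the general technique, not the package's implementation.

```go
package main

import "fmt"

// block is a hypothetical stand-in holding only what reorg detection needs.
type block struct {
	Number     int64
	Hash       string
	ParentHash string
}

// reorgDetected reports whether incoming fails to build on the block stored
// locally at the previous height.
func reorgDetected(stored map[int64]block, incoming block) bool {
	prev, ok := stored[incoming.Number-1]
	if !ok {
		return false // nothing stored to compare against
	}
	return prev.Hash != incoming.ParentHash
}

func main() {
	stored := map[int64]block{10: {Number: 10, Hash: "0xaa", ParentHash: "0x99"}}
	canonical := block{Number: 11, Hash: "0xbb", ParentHash: "0xaa"}
	forked := block{Number: 11, Hash: "0xcc", ParentHash: "0xff"}
	fmt.Println(reorgDetected(stored, canonical), reorgDetected(stored, forked))
}
```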
type LogPollerTest ¶
type LogPollerTest interface { LogPoller PollAndSaveLogs(ctx context.Context, currentBlockNumber int64) BackupPollAndSaveLogs(ctx context.Context, backupPollerBlockDelay int64) Filter(from, to *big.Int, bh *common.Hash) ethereum.FilterQuery GetReplayFromBlock(ctx context.Context, requested int64) (int64, error) }
type ORM ¶
type ORM struct {
// contains filtered or unexported fields
}
func (*ORM) DeleteBlocksAfter ¶
DeleteBlocksAfter deletes all blocks after and including start.
func (*ORM) DeleteBlocksBefore ¶
DeleteBlocksBefore deletes all blocks before and including end.
func (*ORM) DeleteExpiredLogs ¶ added in v2.2.0
func (*ORM) DeleteFilter ¶
DeleteFilter removes all (event, address) pairs associated with the Filter.
func (*ORM) GetBlocksRange ¶
func (*ORM) InsertBlock ¶
InsertBlock is idempotent to support replays.
func (*ORM) InsertFilter ¶
InsertFilter is idempotent.
Each address/event pair must have a unique job id, so it may be removed when the job is deleted. If a second job tries to overwrite the same pair, this should fail.
func (*ORM) InsertLogs ¶
InsertLogs is idempotent to support replays.
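Idempotent inserts mean that replaying the same block range a second time leaves the stored data unchanged. A minimal in-memory sketch of that property follows; the store type and the choice of (block hash, log index) as the uniqueness key are assumptions for illustration, and the ORM itself works against a database rather than a map.

```go
package main

import "fmt"

// logKey identifies a log. Using block hash plus log index as the
// uniqueness key is an assumption of this sketch.
type logKey struct {
	BlockHash string
	LogIndex  int64
}

// store is a hypothetical in-memory stand-in for the logs table.
type store struct {
	logs map[logKey][]byte
}

func newStore() *store { return &store{logs: make(map[logKey][]byte)} }

// insertLog adds the log unless one with the same key already exists.
func (s *store) insertLog(k logKey, data []byte) {
	if _, exists := s.logs[k]; exists {
		return // already stored: replaying the same log is a no-op
	}
	s.logs[k] = data
}

func main() {
	s := newStore()
	k := logKey{BlockHash: "0xaa", LogIndex: 0}
	s.insertLog(k, []byte("first"))
	s.insertLog(k, []byte("replayed")) // same key, ignored
	fmt.Println(len(s.logs), string(s.logs[k]))
}
```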
func (*ORM) LoadFilters ¶
LoadFilters returns all filters for this chain.
func (*ORM) SelectBlockByHash ¶
func (*ORM) SelectBlockByNumber ¶
func (*ORM) SelectDataWordGreaterThan ¶
func (*ORM) SelectDataWordRange ¶
func (*ORM) SelectIndexLogsTopicGreaterThan ¶
func (*ORM) SelectIndexLogsTopicRange ¶
func (*ORM) SelectIndexedLogs ¶
func (*ORM) SelectIndexedLogsByBlockRangeFilter ¶
func (o *ORM) SelectIndexedLogsByBlockRangeFilter(start, end int64, address common.Address, eventSig common.Hash, topicIndex int, topicValues []common.Hash, qopts ...pg.QOpt) ([]Log, error)
SelectIndexedLogsByBlockRangeFilter finds the indexed logs in a given block range.
func (*ORM) SelectIndexedLogsWithSigsExcluding ¶ added in v2.1.0
func (o *ORM) SelectIndexedLogsWithSigsExcluding(sigA, sigB common.Hash, topicIndex int, address common.Address, startBlock, endBlock int64, confs int, qopts ...pg.QOpt) ([]Log, error)
SelectIndexedLogsWithSigsExcluding queries for logs that have signature A and excludes those that have a corresponding log with signature B. Matching is done on the topic at the given topic index. Both logs must be inside the block range and have the minimum number of confirmations.
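The matching rule can be sketched in memory as an anti-join on one topic. The eventLog type and string-valued topics below are hypothetical simplifications, and the sketch leaves out the block range and confirmation checks that the real query also applies.

```go
package main

import "fmt"

// eventLog is a hypothetical, simplified log with string-valued topics.
type eventLog struct {
	EventSig string
	Topics   []string
}

// excluding returns the logs with sigA whose topic at topicIndex is not
// shared by any log with sigB.
func excluding(logs []eventLog, sigA, sigB string, topicIndex int) []eventLog {
	seenB := make(map[string]bool)
	for _, l := range logs {
		if l.EventSig == sigB && topicIndex < len(l.Topics) {
			seenB[l.Topics[topicIndex]] = true
		}
	}
	var out []eventLog
	for _, l := range logs {
		if l.EventSig == sigA && topicIndex < len(l.Topics) && !seenB[l.Topics[topicIndex]] {
			out = append(out, l)
		}
	}
	return out
}

func main() {
	logs := []eventLog{
		{EventSig: "Requested", Topics: []string{"Requested", "req1"}},
		{EventSig: "Requested", Topics: []string{"Requested", "req2"}},
		{EventSig: "Fulfilled", Topics: []string{"Fulfilled", "req1"}},
	}
	// Only req2 has no corresponding Fulfilled log.
	for _, l := range excluding(logs, "Requested", "Fulfilled", 1) {
		fmt.Println(l.Topics[1])
	}
}
```

A typical use for this shape of query is finding requests that have not yet been answered, as in the made-up Requested/Fulfilled pair above.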
func (*ORM) SelectLatestBlock ¶
func (o *ORM) SelectLatestBlock(qopts ...pg.QOpt) (*LogPollerBlock, error)
func (*ORM) SelectLatestLogEventSigWithConfs ¶
func (*ORM) SelectLatestLogEventSigsAddrsWithConfs ¶
func (o *ORM) SelectLatestLogEventSigsAddrsWithConfs(fromBlock int64, addresses []common.Address, eventSigs []common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error)
SelectLatestLogEventSigsAddrsWithConfs finds the latest log for each (address, event) combination that matches the given list of addresses and list of events.
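"Latest log per (address, event) combination" can be sketched as a group-by that keeps the highest (block number, log index) in each group. The types below are hypothetical simplifications, the ordering rule is an assumption, and the fromBlock and confs parameters of the real method are left out.

```go
package main

import "fmt"

// addrLog is a hypothetical, simplified log record.
type addrLog struct {
	Address     string
	EventSig    string
	BlockNumber int64
	LogIndex    int64
}

// pair is the grouping key.
type pair struct{ Address, EventSig string }

// latestPerPair keeps, for each (address, event) pair, the log with the
// highest block number, breaking ties by log index.
func latestPerPair(logs []addrLog) map[pair]addrLog {
	latest := make(map[pair]addrLog)
	for _, l := range logs {
		k := pair{l.Address, l.EventSig}
		cur, ok := latest[k]
		if !ok || l.BlockNumber > cur.BlockNumber ||
			(l.BlockNumber == cur.BlockNumber && l.LogIndex > cur.LogIndex) {
			latest[k] = l
		}
	}
	return latest
}

func main() {
	logs := []addrLog{
		{Address: "0x01", EventSig: "Transfer", BlockNumber: 5, LogIndex: 0},
		{Address: "0x01", EventSig: "Transfer", BlockNumber: 7, LogIndex: 2},
		{Address: "0x02", EventSig: "Transfer", BlockNumber: 6, LogIndex: 1},
	}
	got := latestPerPair(logs)
	fmt.Println(len(got), got[pair{"0x01", "Transfer"}].BlockNumber)
}
```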
func (*ORM) SelectLogsByBlockRange ¶
func (*ORM) SelectLogsByBlockRangeFilter ¶
func (o *ORM) SelectLogsByBlockRangeFilter(start, end int64, address common.Address, eventSig common.Hash, qopts ...pg.QOpt) ([]Log, error)
SelectLogsByBlockRangeFilter finds the logs in a given block range.
func (*ORM) SelectLogsWithSigsByBlockRangeFilter ¶
func (o *ORM) SelectLogsWithSigsByBlockRangeFilter(start, end int64, address common.Address, eventSigs []common.Hash, qopts ...pg.QOpt) (logs []Log, err error)
SelectLogsWithSigsByBlockRangeFilter finds the logs in the given block range with the given event signatures emitted from the given address.
type ObservedLogPoller ¶ added in v2.2.0
type ObservedLogPoller struct {
	LogPoller
	// contains filtered or unexported fields
}
ObservedLogPoller is a decorator layer for LogPoller, responsible for pushing histogram metrics for some of the queries. It doesn't change internal logic, because all calls are delegated to the underlying LogPoller.