Documentation ¶
Overview ¶
Package logpoller is a service for querying EVM log data.
It can be thought of as a more performant and sophisticated version of eth_getLogs https://ethereum.org/en/developers/docs/apis/json-rpc/#eth_getlogs. Having a local table of relevant, continually canonical logs allows us to 2 main advantages:
- Have hundreds of jobs/clients querying for logs without overloading the underlying RPC provider.
- Do more sophisticated querying (filter by confirmations/time/log contents, efficiently join between the logs table and other tables on the node, etc.)
Guarantees provided by the poller:
- Queries always return the logs from the _current_ canonical chain (same as eth_getLogs). In particular that means that querying unfinalized logs may change between queries but finalized logs remain stable. The threshold between unfinalized and finalized logs is the finalityDepth parameter, chosen such that with exceedingly high probability logs finalityDepth deep cannot be reorged.
- After calling RegisterFilter with a particular event, it will never miss logs for that event despite node crashes and reorgs. The granularity of the filter is always at least one block (more when backfilling).
- Old logs stored in the db will only be deleted if all filters matching them have explicit retention periods set, and all of them have expired. Default retention of 0 on any matching filter guarantees permanent retention.
- After calling Replay(fromBlock), all blocks including that one to the latest chain tip will be polled with the current filter. This can be used on first time job add to specify a start block from which you wish to capture existing logs.
Index ¶
- Variables
- func EvmWord(i uint64) common.Hash
- func FilterName(id string, args ...any) string
- func NewAddressFilter(address common.Address) query.Expression
- func NewConfirmationsFilter(confirmations evmtypes.Confirmations) query.Expression
- func NewEventByTopicFilter(topicIndex uint64, valueComparers []primitives.ValueComparator) query.Expression
- func NewEventByWordFilter(eventSig common.Hash, wordIndex uint8, ...) query.Expression
- func NewEventSigFilter(hash common.Hash) query.Expression
- func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, headTracker HeadTracker, opts Opts) *logPoller
- type Client
- type DSORM
- func (o *DSORM) DeleteBlocksBefore(ctx context.Context, end int64, limit int64) (int64, error)
- func (o *DSORM) DeleteExpiredLogs(ctx context.Context, limit int64) (int64, error)
- func (o *DSORM) DeleteFilter(ctx context.Context, name string) error
- func (o *DSORM) DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error
- func (o *DSORM) FilteredLogs(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort, ...) ([]Log, error)
- func (o *DSORM) GetBlocksRange(ctx context.Context, start int64, end int64) ([]LogPollerBlock, error)
- func (o *DSORM) InsertBlock(ctx context.Context, blockHash common.Hash, blockNumber int64, ...) error
- func (o *DSORM) InsertFilter(ctx context.Context, filter Filter) (err error)
- func (o *DSORM) InsertLogs(ctx context.Context, logs []Log) error
- func (o *DSORM) InsertLogsWithBlock(ctx context.Context, logs []Log, block LogPollerBlock) error
- func (o *DSORM) LoadFilters(ctx context.Context) (map[string]Filter, error)
- func (o *DSORM) SelectBlockByHash(ctx context.Context, hash common.Hash) (*LogPollerBlock, error)
- func (o *DSORM) SelectBlockByNumber(ctx context.Context, n int64) (*LogPollerBlock, error)
- func (o *DSORM) SelectIndexedLogs(ctx context.Context, address common.Address, eventSig common.Hash, ...) ([]Log, error)
- func (o *DSORM) SelectIndexedLogsByBlockRange(ctx context.Context, start, end int64, address common.Address, ...) ([]Log, error)
- func (o *DSORM) SelectIndexedLogsByTxHash(ctx context.Context, address common.Address, eventSig common.Hash, ...) ([]Log, error)
- func (o *DSORM) SelectIndexedLogsCreatedAfter(ctx context.Context, address common.Address, eventSig common.Hash, ...) ([]Log, error)
- func (o *DSORM) SelectIndexedLogsTopicGreaterThan(ctx context.Context, address common.Address, eventSig common.Hash, ...) ([]Log, error)
- func (o *DSORM) SelectIndexedLogsTopicRange(ctx context.Context, address common.Address, eventSig common.Hash, ...) ([]Log, error)
- func (o *DSORM) SelectIndexedLogsWithSigsExcluding(ctx context.Context, sigA, sigB common.Hash, topicIndex int, ...) ([]Log, error)
- func (o *DSORM) SelectLatestBlock(ctx context.Context) (*LogPollerBlock, error)
- func (o *DSORM) SelectLatestBlockByEventSigsAddrsWithConfs(ctx context.Context, fromBlock int64, eventSigs []common.Hash, ...) (int64, error)
- func (o *DSORM) SelectLatestLogByEventSigWithConfs(ctx context.Context, eventSig common.Hash, address common.Address, ...) (*Log, error)
- func (o *DSORM) SelectLatestLogEventSigsAddrsWithConfs(ctx context.Context, fromBlock int64, addresses []common.Address, ...) ([]Log, error)
- func (o *DSORM) SelectLogs(ctx context.Context, start, end int64, address common.Address, ...) ([]Log, error)
- func (o *DSORM) SelectLogsByBlockRange(ctx context.Context, start, end int64) ([]Log, error)
- func (o *DSORM) SelectLogsCreatedAfter(ctx context.Context, address common.Address, eventSig common.Hash, ...) ([]Log, error)
- func (o *DSORM) SelectLogsDataWordBetween(ctx context.Context, address common.Address, eventSig common.Hash, ...) ([]Log, error)
- func (o *DSORM) SelectLogsDataWordGreaterThan(ctx context.Context, address common.Address, eventSig common.Hash, ...) ([]Log, error)
- func (o *DSORM) SelectLogsDataWordRange(ctx context.Context, address common.Address, eventSig common.Hash, ...) ([]Log, error)
- func (o *DSORM) SelectLogsWithSigs(ctx context.Context, start, end int64, address common.Address, ...) (logs []Log, err error)
- func (o *DSORM) SelectOldestBlock(ctx context.Context, minAllowedBlockNumber int64) (*LogPollerBlock, error)
- func (o *DSORM) Transact(ctx context.Context, fn func(*DSORM) error) (err error)
- type Exp
- type Filter
- type HeadTracker
- type Log
- type LogPoller
- type LogPollerBlock
- type LogPollerTest
- type ORM
- type ObservedORM
- func (o *ObservedORM) DeleteBlocksBefore(ctx context.Context, end int64, limit int64) (int64, error)
- func (o *ObservedORM) DeleteExpiredLogs(ctx context.Context, limit int64) (int64, error)
- func (o *ObservedORM) DeleteFilter(ctx context.Context, name string) error
- func (o *ObservedORM) DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error
- func (o *ObservedORM) FilteredLogs(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort, ...) ([]Log, error)
- func (o *ObservedORM) GetBlocksRange(ctx context.Context, start int64, end int64) ([]LogPollerBlock, error)
- func (o *ObservedORM) InsertFilter(ctx context.Context, filter Filter) error
- func (o *ObservedORM) InsertLogs(ctx context.Context, logs []Log) error
- func (o *ObservedORM) InsertLogsWithBlock(ctx context.Context, logs []Log, block LogPollerBlock) error
- func (o *ObservedORM) LoadFilters(ctx context.Context) (map[string]Filter, error)
- func (o *ObservedORM) SelectBlockByNumber(ctx context.Context, n int64) (*LogPollerBlock, error)
- func (o *ObservedORM) SelectIndexedLogs(ctx context.Context, address common.Address, eventSig common.Hash, ...) ([]Log, error)
- func (o *ObservedORM) SelectIndexedLogsByBlockRange(ctx context.Context, start, end int64, address common.Address, ...) ([]Log, error)
- func (o *ObservedORM) SelectIndexedLogsByTxHash(ctx context.Context, address common.Address, eventSig common.Hash, ...) ([]Log, error)
- func (o *ObservedORM) SelectIndexedLogsCreatedAfter(ctx context.Context, address common.Address, eventSig common.Hash, ...) ([]Log, error)
- func (o *ObservedORM) SelectIndexedLogsTopicGreaterThan(ctx context.Context, address common.Address, eventSig common.Hash, ...) ([]Log, error)
- func (o *ObservedORM) SelectIndexedLogsTopicRange(ctx context.Context, address common.Address, eventSig common.Hash, ...) ([]Log, error)
- func (o *ObservedORM) SelectIndexedLogsWithSigsExcluding(ctx context.Context, sigA, sigB common.Hash, topicIndex int, ...) ([]Log, error)
- func (o *ObservedORM) SelectLatestBlock(ctx context.Context) (*LogPollerBlock, error)
- func (o *ObservedORM) SelectLatestBlockByEventSigsAddrsWithConfs(ctx context.Context, fromBlock int64, eventSigs []common.Hash, ...) (int64, error)
- func (o *ObservedORM) SelectLatestLogByEventSigWithConfs(ctx context.Context, eventSig common.Hash, address common.Address, ...) (*Log, error)
- func (o *ObservedORM) SelectLatestLogEventSigsAddrsWithConfs(ctx context.Context, fromBlock int64, addresses []common.Address, ...) ([]Log, error)
- func (o *ObservedORM) SelectLogs(ctx context.Context, start, end int64, address common.Address, ...) ([]Log, error)
- func (o *ObservedORM) SelectLogsCreatedAfter(ctx context.Context, address common.Address, eventSig common.Hash, ...) ([]Log, error)
- func (o *ObservedORM) SelectLogsDataWordBetween(ctx context.Context, address common.Address, eventSig common.Hash, ...) ([]Log, error)
- func (o *ObservedORM) SelectLogsDataWordGreaterThan(ctx context.Context, address common.Address, eventSig common.Hash, ...) ([]Log, error)
- func (o *ObservedORM) SelectLogsDataWordRange(ctx context.Context, address common.Address, eventSig common.Hash, ...) ([]Log, error)
- func (o *ObservedORM) SelectLogsWithSigs(ctx context.Context, start, end int64, address common.Address, ...) ([]Log, error)
- func (o *ObservedORM) SelectOldestBlock(ctx context.Context, minAllowedBlockNumber int64) (*LogPollerBlock, error)
- type Opts
Constants ¶
This section is empty.
Variables ¶
var ( ErrReplayRequestAborted = pkgerrors.New("aborted, replay request cancelled") ErrReplayInProgress = pkgerrors.New("replay request cancelled, but replay is already in progress") ErrLogPollerShutdown = pkgerrors.New("replay aborted due to log poller shutdown") ErrFinalityViolated = pkgerrors.New("finality violated") )
var (
ErrUnexpectedCursorFormat = errors.New("unexpected cursor format")
)
Functions ¶
func FilterName ¶
FilterName is a suggested convenience function for clients to construct unique filter names to populate Name field of struct Filter
func NewAddressFilter ¶ added in v2.13.0
func NewAddressFilter(address common.Address) query.Expression
func NewConfirmationsFilter ¶ added in v2.13.0
func NewConfirmationsFilter(confirmations evmtypes.Confirmations) query.Expression
func NewEventByTopicFilter ¶ added in v2.13.0
func NewEventByTopicFilter(topicIndex uint64, valueComparers []primitives.ValueComparator) query.Expression
func NewEventByWordFilter ¶ added in v2.13.0
func NewEventByWordFilter(eventSig common.Hash, wordIndex uint8, valueComparers []primitives.ValueComparator) query.Expression
func NewEventSigFilter ¶ added in v2.13.0
func NewEventSigFilter(hash common.Hash) query.Expression
func NewLogPoller ¶
func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, headTracker HeadTracker, opts Opts) *logPoller
NewLogPoller creates a log poller. Note there is an assumption that blocks can be processed faster than they are produced for the given chain, or the poller will fall behind. Block processing involves the following calls in steady state (without reorgs):
- eth_getBlockByNumber - headers only (transaction hashes, not full transaction objects),
- eth_getLogs - get the logs for the block
- 1 db read latest block - for checking reorgs
- 1 db tx including block write and logs write to logs.
How fast that can be done depends largely on network speed and DB, but even for the fastest support chain, polygon, which has 2s block times, we need RPCs roughly with <= 500ms latency
Types ¶
type Client ¶
type Client interface { HeadByNumber(ctx context.Context, n *big.Int) (*evmtypes.Head, error) HeadByHash(ctx context.Context, n common.Hash) (*evmtypes.Head, error) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) ConfiguredChainID() *big.Int }
type DSORM ¶ added in v2.12.0
type DSORM struct {
// contains filtered or unexported fields
}
func (*DSORM) DeleteBlocksBefore ¶ added in v2.12.0
DeleteBlocksBefore delete blocks before and including end. When limit is set, it will delete at most limit blocks. Otherwise, it will delete all blocks at once.
func (*DSORM) DeleteExpiredLogs ¶ added in v2.12.0
func (*DSORM) DeleteFilter ¶ added in v2.12.0
DeleteFilter removes all events,address pairs associated with the Filter
func (*DSORM) DeleteLogsAndBlocksAfter ¶ added in v2.12.0
func (*DSORM) FilteredLogs ¶ added in v2.13.0
func (o *DSORM) FilteredLogs(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort, _ string) ([]Log, error)
TODO flaky BCF-3258
func (*DSORM) GetBlocksRange ¶ added in v2.12.0
func (*DSORM) InsertBlock ¶ added in v2.12.0
func (o *DSORM) InsertBlock(ctx context.Context, blockHash common.Hash, blockNumber int64, blockTimestamp time.Time, finalizedBlock int64) error
InsertBlock is idempotent to support replays.
func (*DSORM) InsertFilter ¶ added in v2.12.0
InsertFilter is idempotent.
Each address/event pair must have a unique job id, so it may be removed when the job is deleted. If a second job tries to overwrite the same pair, this should fail.
func (*DSORM) InsertLogs ¶ added in v2.12.0
InsertLogs is idempotent to support replays.
func (*DSORM) InsertLogsWithBlock ¶ added in v2.12.0
func (*DSORM) LoadFilters ¶ added in v2.12.0
LoadFilters returns all filters for this chain
func (*DSORM) SelectBlockByHash ¶ added in v2.12.0
func (*DSORM) SelectBlockByNumber ¶ added in v2.12.0
func (*DSORM) SelectIndexedLogs ¶ added in v2.12.0
func (*DSORM) SelectIndexedLogsByBlockRange ¶ added in v2.12.0
func (o *DSORM) SelectIndexedLogsByBlockRange(ctx context.Context, start, end int64, address common.Address, eventSig common.Hash, topicIndex int, topicValues []common.Hash) ([]Log, error)
SelectIndexedLogsByBlockRange finds the indexed logs in a given block range.
func (*DSORM) SelectIndexedLogsByTxHash ¶ added in v2.12.0
func (*DSORM) SelectIndexedLogsCreatedAfter ¶ added in v2.12.0
func (*DSORM) SelectIndexedLogsTopicGreaterThan ¶ added in v2.12.0
func (*DSORM) SelectIndexedLogsTopicRange ¶ added in v2.12.0
func (*DSORM) SelectIndexedLogsWithSigsExcluding ¶ added in v2.12.0
func (o *DSORM) SelectIndexedLogsWithSigsExcluding(ctx context.Context, sigA, sigB common.Hash, topicIndex int, address common.Address, startBlock, endBlock int64, confs evmtypes.Confirmations) ([]Log, error)
SelectIndexedLogsWithSigsExcluding query's for logs that have signature A and exclude logs that have a corresponding signature B, matching is done based on the topic index both logs should be inside the block range and have the minimum number of evmtypes.Confirmations
func (*DSORM) SelectLatestBlock ¶ added in v2.12.0
func (o *DSORM) SelectLatestBlock(ctx context.Context) (*LogPollerBlock, error)
func (*DSORM) SelectLatestBlockByEventSigsAddrsWithConfs ¶ added in v2.12.0
func (o *DSORM) SelectLatestBlockByEventSigsAddrsWithConfs(ctx context.Context, fromBlock int64, eventSigs []common.Hash, addresses []common.Address, confs evmtypes.Confirmations) (int64, error)
SelectLatestBlockByEventSigsAddrsWithConfs finds the latest block number that matches a list of Addresses and list of events. It returns 0 if there is no matching block
func (*DSORM) SelectLatestLogByEventSigWithConfs ¶ added in v2.12.0
func (*DSORM) SelectLatestLogEventSigsAddrsWithConfs ¶ added in v2.12.0
func (o *DSORM) SelectLatestLogEventSigsAddrsWithConfs(ctx context.Context, fromBlock int64, addresses []common.Address, eventSigs []common.Hash, confs evmtypes.Confirmations) ([]Log, error)
SelectLatestLogEventSigsAddrsWithConfs finds the latest log by (address, event) combination that matches a list of Addresses and list of events
func (*DSORM) SelectLogs ¶ added in v2.12.0
func (o *DSORM) SelectLogs(ctx context.Context, start, end int64, address common.Address, eventSig common.Hash) ([]Log, error)
SelectLogs finds the logs in a given block range.
func (*DSORM) SelectLogsByBlockRange ¶ added in v2.12.0
func (*DSORM) SelectLogsCreatedAfter ¶ added in v2.12.0
func (o *DSORM) SelectLogsCreatedAfter(ctx context.Context, address common.Address, eventSig common.Hash, after time.Time, confs evmtypes.Confirmations) ([]Log, error)
SelectLogsCreatedAfter finds logs created after some timestamp.
func (*DSORM) SelectLogsDataWordBetween ¶ added in v2.12.0
func (*DSORM) SelectLogsDataWordGreaterThan ¶ added in v2.12.0
func (*DSORM) SelectLogsDataWordRange ¶ added in v2.12.0
func (*DSORM) SelectLogsWithSigs ¶ added in v2.12.0
func (o *DSORM) SelectLogsWithSigs(ctx context.Context, start, end int64, address common.Address, eventSigs []common.Hash) (logs []Log, err error)
SelectLogsWithSigs finds the logs in the given block range with the given event signatures emitted from the given address.
func (*DSORM) SelectOldestBlock ¶ added in v2.12.0
type Filter ¶
type Filter struct { Name string // see FilterName(id, args) below Addresses evmtypes.AddressArray EventSigs evmtypes.HashArray // list of possible values for eventsig (aka topic1) Topic2 evmtypes.HashArray // list of possible values for topic2 Topic3 evmtypes.HashArray // list of possible values for topic3 Topic4 evmtypes.HashArray // list of possible values for topic4 Retention time.Duration // maximum amount of time to retain logs MaxLogsKept uint64 // maximum number of logs to retain ( 0 = unlimited ) LogsPerBlock uint64 // rate limit ( maximum # of logs per block, 0 = unlimited ) }
type HeadTracker ¶ added in v2.14.0
type Log ¶
type Log struct { EvmChainId *big.Big LogIndex int64 BlockHash common.Hash BlockNumber int64 BlockTimestamp time.Time Topics pq.ByteaArray EventSig common.Hash Address common.Address TxHash common.Hash Data []byte CreatedAt time.Time }
Log represents an EVM log.
type LogPoller ¶
type LogPoller interface { services.Service Healthy() error Replay(ctx context.Context, fromBlock int64) error ReplayAsync(fromBlock int64) RegisterFilter(ctx context.Context, filter Filter) error UnregisterFilter(ctx context.Context, name string) error HasFilter(name string) bool GetFilters() map[string]Filter LatestBlock(ctx context.Context) (LogPollerBlock, error) GetBlocksRange(ctx context.Context, numbers []uint64) ([]LogPollerBlock, error) FindLCA(ctx context.Context) (*LogPollerBlock, error) DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error // General querying Logs(ctx context.Context, start, end int64, eventSig common.Hash, address common.Address) ([]Log, error) LogsWithSigs(ctx context.Context, start, end int64, eventSigs []common.Hash, address common.Address) ([]Log, error) LogsCreatedAfter(ctx context.Context, eventSig common.Hash, address common.Address, time time.Time, confs evmtypes.Confirmations) ([]Log, error) LatestLogByEventSigWithConfs(ctx context.Context, eventSig common.Hash, address common.Address, confs evmtypes.Confirmations) (*Log, error) LatestLogEventSigsAddrsWithConfs(ctx context.Context, fromBlock int64, eventSigs []common.Hash, addresses []common.Address, confs evmtypes.Confirmations) ([]Log, error) LatestBlockByEventSigsAddrsWithConfs(ctx context.Context, fromBlock int64, eventSigs []common.Hash, addresses []common.Address, confs evmtypes.Confirmations) (int64, error) // Content based querying IndexedLogs(ctx context.Context, eventSig common.Hash, address common.Address, topicIndex int, topicValues []common.Hash, confs evmtypes.Confirmations) ([]Log, error) IndexedLogsByBlockRange(ctx context.Context, start, end int64, eventSig common.Hash, address common.Address, topicIndex int, topicValues []common.Hash) ([]Log, error) IndexedLogsCreatedAfter(ctx context.Context, eventSig common.Hash, address common.Address, topicIndex int, topicValues []common.Hash, after time.Time, confs evmtypes.Confirmations) ([]Log, error) IndexedLogsByTxHash(ctx context.Context, eventSig common.Hash, address common.Address, txHash common.Hash) ([]Log, error) IndexedLogsTopicGreaterThan(ctx context.Context, eventSig common.Hash, address common.Address, topicIndex int, topicValueMin common.Hash, confs evmtypes.Confirmations) ([]Log, error) IndexedLogsTopicRange(ctx context.Context, eventSig common.Hash, address common.Address, topicIndex int, topicValueMin common.Hash, topicValueMax common.Hash, confs evmtypes.Confirmations) ([]Log, error) IndexedLogsWithSigsExcluding(ctx context.Context, address common.Address, eventSigA, eventSigB common.Hash, topicIndex int, fromBlock, toBlock int64, confs evmtypes.Confirmations) ([]Log, error) LogsDataWordRange(ctx context.Context, eventSig common.Hash, address common.Address, wordIndex int, wordValueMin, wordValueMax common.Hash, confs evmtypes.Confirmations) ([]Log, error) LogsDataWordGreaterThan(ctx context.Context, eventSig common.Hash, address common.Address, wordIndex int, wordValueMin common.Hash, confs evmtypes.Confirmations) ([]Log, error) LogsDataWordBetween(ctx context.Context, eventSig common.Hash, address common.Address, wordIndexMin, wordIndexMax int, wordValue common.Hash, confs evmtypes.Confirmations) ([]Log, error) // chainlink-common query filtering FilteredLogs(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort, queryName string) ([]Log, error) }
type LogPollerBlock ¶
type LogPollerBlock struct { EvmChainId *big.Big BlockHash common.Hash // Note geth uses int64 internally https://github.com/ethereum/go-ethereum/blob/f66f1a16b3c480d3a43ac7e8a09ab3e362e96ae4/eth/filters/api.go#L340 BlockNumber int64 BlockTimestamp time.Time FinalizedBlockNumber int64 CreatedAt time.Time }
LogPollerBlock represents an unfinalized block used for reorg detection when polling.
func NewLogPollerBlock ¶ added in v2.8.0
type LogPollerTest ¶
type LogPollerTest interface { LogPoller PollAndSaveLogs(ctx context.Context, currentBlockNumber int64) BackupPollAndSaveLogs(ctx context.Context) Filter(from, to *big.Int, bh *common.Hash) ethereum.FilterQuery GetReplayFromBlock(ctx context.Context, requested int64) (int64, error) PruneOldBlocks(ctx context.Context) (bool, error) }
type ORM ¶
type ORM interface { InsertLogs(ctx context.Context, logs []Log) error InsertLogsWithBlock(ctx context.Context, logs []Log, block LogPollerBlock) error InsertFilter(ctx context.Context, filter Filter) error LoadFilters(ctx context.Context) (map[string]Filter, error) DeleteFilter(ctx context.Context, name string) error InsertBlock(ctx context.Context, blockHash common.Hash, blockNumber int64, blockTimestamp time.Time, finalizedBlock int64) error DeleteBlocksBefore(ctx context.Context, end int64, limit int64) (int64, error) DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error DeleteExpiredLogs(ctx context.Context, limit int64) (int64, error) GetBlocksRange(ctx context.Context, start int64, end int64) ([]LogPollerBlock, error) SelectBlockByNumber(ctx context.Context, blockNumber int64) (*LogPollerBlock, error) SelectBlockByHash(ctx context.Context, hash common.Hash) (*LogPollerBlock, error) SelectLatestBlock(ctx context.Context) (*LogPollerBlock, error) SelectOldestBlock(ctx context.Context, minAllowedBlockNumber int64) (*LogPollerBlock, error) SelectLogs(ctx context.Context, start, end int64, address common.Address, eventSig common.Hash) ([]Log, error) SelectLogsWithSigs(ctx context.Context, start, end int64, address common.Address, eventSigs []common.Hash) ([]Log, error) SelectLogsCreatedAfter(ctx context.Context, address common.Address, eventSig common.Hash, after time.Time, confs evmtypes.Confirmations) ([]Log, error) SelectLatestLogByEventSigWithConfs(ctx context.Context, eventSig common.Hash, address common.Address, confs evmtypes.Confirmations) (*Log, error) SelectLatestLogEventSigsAddrsWithConfs(ctx context.Context, fromBlock int64, addresses []common.Address, eventSigs []common.Hash, confs evmtypes.Confirmations) ([]Log, error) SelectLatestBlockByEventSigsAddrsWithConfs(ctx context.Context, fromBlock int64, eventSigs []common.Hash, addresses []common.Address, confs evmtypes.Confirmations) (int64, error) SelectLogsByBlockRange(ctx context.Context, start, end int64) ([]Log, error) SelectIndexedLogs(ctx context.Context, address common.Address, eventSig common.Hash, topicIndex int, topicValues []common.Hash, confs evmtypes.Confirmations) ([]Log, error) SelectIndexedLogsByBlockRange(ctx context.Context, start, end int64, address common.Address, eventSig common.Hash, topicIndex int, topicValues []common.Hash) ([]Log, error) SelectIndexedLogsCreatedAfter(ctx context.Context, address common.Address, eventSig common.Hash, topicIndex int, topicValues []common.Hash, after time.Time, confs evmtypes.Confirmations) ([]Log, error) SelectIndexedLogsTopicGreaterThan(ctx context.Context, address common.Address, eventSig common.Hash, topicIndex int, topicValueMin common.Hash, confs evmtypes.Confirmations) ([]Log, error) SelectIndexedLogsTopicRange(ctx context.Context, address common.Address, eventSig common.Hash, topicIndex int, topicValueMin, topicValueMax common.Hash, confs evmtypes.Confirmations) ([]Log, error) SelectIndexedLogsWithSigsExcluding(ctx context.Context, sigA, sigB common.Hash, topicIndex int, address common.Address, startBlock, endBlock int64, confs evmtypes.Confirmations) ([]Log, error) SelectIndexedLogsByTxHash(ctx context.Context, address common.Address, eventSig common.Hash, txHash common.Hash) ([]Log, error) SelectLogsDataWordRange(ctx context.Context, address common.Address, eventSig common.Hash, wordIndex int, wordValueMin, wordValueMax common.Hash, confs evmtypes.Confirmations) ([]Log, error) SelectLogsDataWordGreaterThan(ctx context.Context, address common.Address, eventSig common.Hash, wordIndex int, wordValueMin common.Hash, confs evmtypes.Confirmations) ([]Log, error) SelectLogsDataWordBetween(ctx context.Context, address common.Address, eventSig common.Hash, wordIndexMin int, wordIndexMax int, wordValue common.Hash, confs evmtypes.Confirmations) ([]Log, error) // FilteredLogs accepts chainlink-common filtering DSL. FilteredLogs(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort, queryName string) ([]Log, error) }
ORM represents the persistent data access layer used by the log poller. At this moment, it's a bit leaky abstraction, because it exposes some of the database implementation details (e.g. pg.Q). Ideally it should be agnostic and could be applied to any persistence layer. What is more, LogPoller should not be aware of the underlying database implementation and delegate all the queries to the ORM.
type ObservedORM ¶ added in v2.7.0
type ObservedORM struct { ORM // contains filtered or unexported fields }
ObservedORM is a decorator layer for ORM used by LogPoller, responsible for pushing Prometheus metrics reporting duration and size of result set for the queries. It doesn't change internal logic, because all calls are delegated to the origin ORM
func NewObservedORM ¶ added in v2.7.0
func NewObservedORM(chainID *big.Int, ds sqlutil.DataSource, lggr logger.Logger) *ObservedORM
NewObservedORM creates an observed version of log poller's ORM created by NewORM Please see ObservedLogPoller for more details on how latencies are measured
func (*ObservedORM) DeleteBlocksBefore ¶ added in v2.7.0
func (*ObservedORM) DeleteExpiredLogs ¶ added in v2.7.0
func (*ObservedORM) DeleteFilter ¶ added in v2.7.0
func (o *ObservedORM) DeleteFilter(ctx context.Context, name string) error
func (*ObservedORM) DeleteLogsAndBlocksAfter ¶ added in v2.8.0
func (o *ObservedORM) DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error
func (*ObservedORM) FilteredLogs ¶ added in v2.13.0
func (o *ObservedORM) FilteredLogs(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort, queryName string) ([]Log, error)
func (*ObservedORM) GetBlocksRange ¶ added in v2.7.0
func (o *ObservedORM) GetBlocksRange(ctx context.Context, start int64, end int64) ([]LogPollerBlock, error)
func (*ObservedORM) InsertFilter ¶ added in v2.7.0
func (o *ObservedORM) InsertFilter(ctx context.Context, filter Filter) error
func (*ObservedORM) InsertLogs ¶ added in v2.7.0
func (o *ObservedORM) InsertLogs(ctx context.Context, logs []Log) error
func (*ObservedORM) InsertLogsWithBlock ¶ added in v2.8.0
func (o *ObservedORM) InsertLogsWithBlock(ctx context.Context, logs []Log, block LogPollerBlock) error
func (*ObservedORM) LoadFilters ¶ added in v2.7.0
func (*ObservedORM) SelectBlockByNumber ¶ added in v2.7.0
func (o *ObservedORM) SelectBlockByNumber(ctx context.Context, n int64) (*LogPollerBlock, error)
func (*ObservedORM) SelectIndexedLogs ¶ added in v2.7.0
func (*ObservedORM) SelectIndexedLogsByBlockRange ¶ added in v2.7.0
func (*ObservedORM) SelectIndexedLogsByTxHash ¶ added in v2.8.0
func (*ObservedORM) SelectIndexedLogsCreatedAfter ¶ added in v2.7.0
func (*ObservedORM) SelectIndexedLogsTopicGreaterThan ¶ added in v2.7.0
func (*ObservedORM) SelectIndexedLogsTopicRange ¶ added in v2.7.0
func (*ObservedORM) SelectIndexedLogsWithSigsExcluding ¶ added in v2.7.0
func (*ObservedORM) SelectLatestBlock ¶ added in v2.7.0
func (o *ObservedORM) SelectLatestBlock(ctx context.Context) (*LogPollerBlock, error)
func (*ObservedORM) SelectLatestBlockByEventSigsAddrsWithConfs ¶ added in v2.7.0
func (*ObservedORM) SelectLatestLogByEventSigWithConfs ¶ added in v2.7.0
func (*ObservedORM) SelectLatestLogEventSigsAddrsWithConfs ¶ added in v2.7.0
func (*ObservedORM) SelectLogs ¶ added in v2.7.0
func (*ObservedORM) SelectLogsCreatedAfter ¶ added in v2.7.0
func (*ObservedORM) SelectLogsDataWordBetween ¶ added in v2.8.0
func (*ObservedORM) SelectLogsDataWordGreaterThan ¶ added in v2.7.0
func (*ObservedORM) SelectLogsDataWordRange ¶ added in v2.7.0
func (*ObservedORM) SelectLogsWithSigs ¶ added in v2.7.0
func (*ObservedORM) SelectOldestBlock ¶ added in v2.12.0
func (o *ObservedORM) SelectOldestBlock(ctx context.Context, minAllowedBlockNumber int64) (*LogPollerBlock, error)