Documentation ¶
Index ¶
- Variables
- func LastGoodBlock(result *PollerResult) uint64
- func WrapErrRecordNotFound(err error, keyNotFound string) error
- type Artifact
- type BaseServiceEngine
- type Block
- type BlockHeader
- type BlockHeaderWrapper
- type Config
- type Controller
- type Emitter
- type EmitterClient
- type EmitterPoller
- type Engine
- type EthClient
- type EthClientRPC
- type FuncGetLastRecordedBlock
- type FuncSetLastRecordedBlock
- type GetStateDataGateway
- type Policy
- type PollerResult
- type ServiceEngine
- type SetStateDataGateway
- type StateDataGateway
- type SuperWatcher
- type ThinServiceEngine
Constants ¶
This section is empty.
Variables ¶
var ( // Ethereum node fetch error ErrFetchError = errors.New("fetch from ethclient failed") // Chain is reorging - will not cause a return from emitter // e.g. when logs filtered from the same block has different hashes ErrChainIsReorging = errors.New("chain is reorging and data is not usable for now") ErrFromBlockReorged = errors.Wrap(ErrChainIsReorging, "fromBlock reorged") // Bug from my own part ErrSuperwatcherBug = errors.New("superwatcher bug") ErrProcessReorg = errors.Wrap(ErrSuperwatcherBug, "error in emitter reorg detection logic") // Bug in reorg detection logic // User violates some rules/policies, e.g. downgrading poller Policy ErrUserError = errors.New("user error") ErrBadPolicy = errors.Wrap(ErrUserError, "invalid policy") )
var ErrRecordNotFound = errors.New("record not found")
ErrRecordNotFound is checked for in emitter.loopEmit. If the error is ErrRecordNotFound, the emitter assumes the service has never run on this host (hence no data in the database), and will not attempt to go back.
Functions ¶
func LastGoodBlock ¶
func LastGoodBlock( result *PollerResult, ) uint64
LastGoodBlock computes `PollerResult.LastGoodBlock` based on |result|.
func WrapErrRecordNotFound ¶
Types ¶
type BaseServiceEngine ¶
BaseServiceEngine is shared by ServiceEngine and ServiceThinEngine
type Block ¶
type Block struct { // LogsMigrated indicates whether all interesting logs were moved/migrated // _from_ this block after a chain reorg or not. The field is primarily used // by EmitterPoller to trigger the poller to get new, fresh block hash for a block. // The field should always be false if the Block is in PollerResult.GoodBlocks. LogsMigrated bool `json:"logsMigrated"` Number uint64 `json:"number"` Hash common.Hash `json:"hash"` Header BlockHeader `json:"-"` Logs []*types.Log `json:"logs"` }
Block represents the minimum block info needed for superwatcher. Block data can be retrieved from Block itself or its Header field.
type BlockHeader ¶
type BlockHeader interface { Number() uint64 Hash() common.Hash Nonce() types.BlockNonce Time() uint64 GasLimit() uint64 GasUsed() uint64 }
BlockHeader is implemented by `blockHeaderWrapper` and `*reorgsim.Block`. It is used in place of *types.Header to make writing tests with reorgsim easier. More methods may be added as our needs for data from the headers grow, or we (i.e. you) can mock the actual *types.Header in reorgsim instead :)
type BlockHeaderWrapper ¶
BlockHeaderWrappers wrap *types.Header to implenent BlockHeader
func (BlockHeaderWrapper) GasLimit ¶
func (h BlockHeaderWrapper) GasLimit() uint64
func (BlockHeaderWrapper) GasUsed ¶
func (h BlockHeaderWrapper) GasUsed() uint64
func (BlockHeaderWrapper) Hash ¶
func (h BlockHeaderWrapper) Hash() common.Hash
func (BlockHeaderWrapper) Nonce ¶
func (h BlockHeaderWrapper) Nonce() types.BlockNonce
func (BlockHeaderWrapper) Number ¶
func (h BlockHeaderWrapper) Number() uint64
func (BlockHeaderWrapper) Time ¶
func (h BlockHeaderWrapper) Time() uint64
type Config ¶
type Config struct { // External dependencies NodeURL string `mapstructure:"node_url" yaml:"node_url" json:"nodeURL"` // StartBlock is the shortest block height the emitter will consider as base, usually a contract's genesis block StartBlock uint64 `mapstructure:"start_block" yaml:"start_block" json:"startBlock"` // FilterRange is the forward range (number of new blocks) each call to emitter.poller.poll will perform FilterRange uint64 `mapstructure:"filter_range" yaml:"filter_range" json:"filterRange"` // DoReorg specifies whether superwatcher superwatcher.EmitterPoller will process chain reorg for PollerResult DoReorg bool `mapstructure:"do_reorg" yaml:"do_reorg" json:"doReorg"` // DoHeader specifies whether superwatcher.EmitterPoller should fetch block headers too DoHeader bool `mapstructure:"do_header" yaml:"do_header" json:"doHeader"` // MaxGoBackRetries is the maximum number of blocks the emitter will go back for. Once this is reached, // the emitter exits on error ErrMaxRetriesReached MaxGoBackRetries uint64 `mapstructure:"max_go_back_retries" yaml:"max_go_back_retries" json:"maxGoBackRetries"` // LoopInterval is the number of seconds the emitter sleeps after each call to emitter.poller.poll LoopInterval uint64 `mapstructure:"loop_interval" yaml:"loop_interval" json:"loopInterval"` // LogLevel for debugger.Debugger, the higher the more verbose LogLevel uint8 `mapstructure:"log_level" yaml:"log_level" json:"logLevel"` // Policy is for configuring EmitterPoller behavior Policy Policy `mapstructure:"policy" yaml:"policy" json:"policy"` }
Config is superwatcher-wide configuration
type Controller ¶
type Controller interface { // SetDoReorg makes the EmitterPoller engage chain reorg detection logic SetDoReorg(bool) // DoReorg returns if EmitterPoller is currently processing chain reorg inside EmitterPoller.Poll DoReorg() bool // SetDoHeader makes the EmitterPoller fetch block header for every block with interesting logs SetDoHeader(bool) // DoHeader returns if the EmitterPoller will fetch block headers for blocks with interesting logs DoHeader() bool // Addresses reads EmitterPoller's current event log addresses for filter query Addresses() []common.Address // Topics reads EmitterPoller's current event log topics for filter query Topics() [][]common.Hash // AddAddresses adds (appends) addresses to EmitterPoller's filter query AddAddresses(...common.Address) // AddTopics adds (appends) topics to EmitterPoller's filter query AddTopics(...[]common.Hash) // SetAddresses changes EmitterPoller's event log addresses on-the-fly SetAddresses([]common.Address) // SetTopics changes EmitterPoller's event log topics on-the-fly SetTopics([][]common.Hash) }
Controller gives users the means and methods to change some of EmitterPoller parameters
type Emitter ¶
type Emitter interface { // Loop is the entry point for Emitter. // Users will call Loop in a different loop than Engine.Loop // to make both components run concurrently. Loop(context.Context) error // SyncsEngine waits until engine is done processing the last batch SyncsEngine() // Shutdown closes emitter channels Shutdown() // Poller returns the current Poller in use by Emitter Poller() EmitterPoller // SetPoller overwrites emitter's Poller with a new one SetPoller(EmitterPoller) }
Emitter receives results from Poller and emits them to Engine. Emitter is aware of the current service states (via StateDataGateway), and uses that information to determine fromBlock and toBlock for Poller.Poll.
type EmitterClient ¶
type EmitterClient interface { // WatcherResult returns result from Emitter to caller WatcherResult() *PollerResult // WatcherError returns error sent by Emitter WatcherError() error // WatcherConfig returns config used to create its Emitter WatcherConfig() *Config // SyncsEmitter sends sync signal to Emitter so it can continue SyncsEmitter() // Shutdown closes Emitter comms channels Shutdown() }
EmitterClient interfaces with Emitter. It can help abstract the complexity of receiving of channel data away from Engine. It can be ignored by superwatcher users if they are not implementing their own Engine.
type EmitterPoller ¶
type EmitterPoller interface { // Poll polls event logs from fromBlock to toBlock, and process the logs into *PollerResult for Emitter Poll(ctx context.Context, fromBlock, toBlock uint64) (*PollerResult, error) // Policy gets current Policy Policy() Policy // SetPolicy sets new Policy (NOTE: changing Policy mid-run not tested) SetPolicy(Policy) error // EmitterPoller also implements Controller Controller }
EmitterPoller fetches event logs and other blockchain data and maps it into *PollerResult. The result of EmitterPoller.poll is later used by Emitter to emit to Engine.
type Engine ¶
type Engine interface { // Loop is the entry point for Engine. // Call it in a different Goroutine than Emitter.Loop to make both run concurrently. Loop(context.Context) error }
Engine receives PollerResult emitted from Emitter and executes business service logic on PollerResult with ServiceEngine.
type EthClient ¶
type EthClient interface { BlockNumber(context.Context) (uint64, error) FilterLogs(context.Context, ethereum.FilterQuery) ([]types.Log, error) HeaderByNumber(context.Context, *big.Int) (BlockHeader, error) EthClientRPC // EthClient will need to be able to do batch RPC calls }
EthClient defines all Ethereum client methods used in superwatcher. HeaderByNumber returns BlockHeader because if it uses the actual *types.Header then the mock client in `reorgsim` would have to mock types.Header too, which is an overkill for now.
type EthClientRPC ¶
EthClientRPC is used by poller to get data from client in batch, e.g. when getting blocks or block headers in batch
type GetStateDataGateway ¶
GetStateDataGateway is used by the emitter to get last recorded block.
func GetStateDataGatewayFunc ¶
func GetStateDataGatewayFunc(f FuncGetLastRecordedBlock) GetStateDataGateway
type Policy ¶
type Policy uint8
Policy (enum) specifies how EmitterPoller considers which blocks to include in its _tracking list_. For every block in this _tracking list_, the poller compares the saved block hash with newly polled one.
const ( // PolicyFast makes poller only process and track blocks with interesting logs. // Hashes from blocks without logs are not processed, unless they were reorged // and had their logs removed, in which case the poller gets their headers _once_ // to check their newer hashes, and remove the empty block from tracking list. PolicyFast Policy = iota // PolicyNormal makes poller only process and track blocks with interesting logs, // but if the poller detects that a block has its logs removed, it will process // and start tracking that block until the block goes out of poller scope. // The difference between PolicyFast and PolicyNormal // that PolicyNormal will keep tracking the reorged empty blocks. PolicyNormal // PolicyExpensive makes poller process and track all blocks' headers, // regardless of whether the blocks have interesting logs or not, or Config.DoHeader value. PolicyExpensive // PolicyFastBlock behaves like PolicyExpensive, but instead of fetching // event logs and headers, the poller fetches event logs and blocks. PolicyExpensiveBlock )
type PollerResult ¶
type PollerResult struct { FromBlock uint64 // The poller's `fromBlock` ToBlock uint64 // The poller's `toBlock` LastGoodBlock uint64 // This number should be saved to StateDataGateway with SetLastRecordedBlock for the emitter GoodBlocks []*Block // Can be either (1) fresh, new blocks, or (2) blocks whose hashes had not changed yet. ReorgedBlocks []*Block // Blocks that poller marked as removed. A service should undo/revert its actions done on the blocks. }
PollerResult is created in Poller.Poll, and emitted by Emitter to Engine.
type ServiceEngine ¶
type ServiceEngine interface { BaseServiceEngine // HandleGoodLogs handles new, canonical `Block`s. The return type is map of blockHash to []Artifact HandleGoodBlocks([]*Block, []Artifact) (map[common.Hash][]Artifact, error) // HandleReorgedLogs handles reorged (removed) `Block`s. The return type is map of blockHash to []Artifact HandleReorgedBlocks([]*Block, []Artifact) (map[common.Hash][]Artifact, error) }
ServiceEngine is embedded and injected into Engine to perform business logic. It is the preferred way to use superwatcher
type SetStateDataGateway ¶
SetStateDataGateway is used by the engine to set last recorded block.
func SetStateDataGatewayFunc ¶
func SetStateDataGatewayFunc(f FuncSetLastRecordedBlock) SetStateDataGateway
type StateDataGateway ¶
type StateDataGateway interface { GetStateDataGateway SetStateDataGateway }
StateDataGateway is an interface that could both set and get lastRecordedBlock for superwatcher. Note: Graceful shutdowns for the StateDataGateway should be performed by service code.
type SuperWatcher ¶
type SuperWatcher interface { // Run is the entry point for SuperWatcher Run(context.Context, context.CancelFunc) error Emitter() Emitter Engine() Engine Shutdown() Controller }
type ThinServiceEngine ¶
type ThinServiceEngine interface { BaseServiceEngine HandleFilterResult(*PollerResult) error }
ThinServiceEngine is embedded and injected into thinEngine, a thin implementation of Engine without managed states. It is recommended for niche use cases and advanced users