Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var ErrFinishedFetchBlockLoop = errors.New("finished FetchBlockLoop")
Functions ¶
Types ¶
type ErrorState ¶
type ErrorState struct { RPCErrors []ErrorInfo DuneErrors []ErrorInfo RPCErrorCount int DuneErrorCount int }
func (*ErrorState) ObserveDuneError ¶
func (es *ErrorState) ObserveDuneError(err ErrorInfo)
func (*ErrorState) ObserveRPCError ¶
func (es *ErrorState) ObserveRPCError(err ErrorInfo)
func (*ErrorState) Reset ¶
func (es *ErrorState) Reset()
type Info ¶
type Info struct { BlockchainName string Stack string LatestBlockNumber int64 IngestedBlockNumber int64 ConsumedBlockNumber int64 Errors ErrorState Since time.Time }
func (Info) ProgressReportErrors ¶
func (info Info) ProgressReportErrors() []models.BlockchainIndexError
ProgressReportErrors returns a combined list of errors from RPC requests and Dune requests.
func (*Info) ResetErrors ¶
func (info *Info) ResetErrors()
func (*Info) ToProgressReport ¶
func (info *Info) ToProgressReport() models.BlockchainIndexProgress
type Ingester ¶
type Ingester interface {
	// Run starts the ingester and blocks until the context is cancelled or maxCount blocks are ingested.
	Run(ctx context.Context, startBlockNumber int64, maxCount int64) error

	// ProduceBlockNumbers sends block numbers from startBlockNumber to endBlockNumber to outChan, inclusive.
	// If endBlockNumber is -1, it sends blocks from startBlockNumber to the tip of the chain.
	// It will run continuously until the context is cancelled.
	ProduceBlockNumbers(ctx context.Context, outChan chan int64, startBlockNumber int64, endBlockNumber int64) error

	// FetchBlockLoop fetches blocks sent on the channel and sends them on the other channel.
	// It will run continuously until the context is cancelled, or the channel is closed.
	// It can safely be run concurrently.
	FetchBlockLoop(context.Context, chan int64, chan models.RPCBlock) error

	// SendBlocks consumes RPCBlocks from the channel, reorders them, and sends batches to DuneAPI in an endless loop.
	// It will block until:
	//   - the context is cancelled
	//   - channel is closed
	//   - a fatal error occurs
	SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock, startFrom int64) error

	// ProduceBlockNumbersDLQ sends block numbers from the DLQ to outChan.
	// It will run continuously until the context is cancelled.
	// When the DLQ does not return an eligible next block, it waits for PollDLQInterval before trying again.
	ProduceBlockNumbersDLQ(ctx context.Context, outChan chan dlq.Item[int64]) error

	// FetchBlockLoopDLQ fetches blocks sent on the channel and sends them on the other channel.
	// It will run continuously until the context is cancelled, or the channel is closed.
	// It can safely be run concurrently.
	FetchBlockLoopDLQ(ctx context.Context,
		blockNumbers <-chan dlq.Item[int64],
		blocks chan<- dlq.Item[models.RPCBlock],
	) error

	// SendBlocksDLQ pushes one RPCBlock at a time to DuneAPI in the order they are received.
	SendBlocksDLQ(ctx context.Context, blocks <-chan dlq.Item[models.RPCBlock]) error

	Close() error
}
func New ¶
func New( log *slog.Logger, node jsonrpc.BlockchainClient, dune duneapi.BlockchainIngester, duneDLQ duneapi.BlockchainIngester, cfg Config, progress *models.BlockchainIndexProgress, dlq *dlq.DLQ[int64], ) Ingester
Click to show internal directories.
Click to hide internal directories.