ingester

package
v0.0.0-...-98adc55 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 22, 2024 License: MIT Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrFinishedFetchBlockLoop = errors.New("finished FetchBlockLoop")

Functions

func AddBlockGaps

func AddBlockGaps(dlq *dlq.DLQ[int64], gaps []models.BlockGap)

Types

type Config

type Config struct {
	MaxConcurrentBlocks    int
	DLQMaxConcurrentBlocks int
	PollInterval           time.Duration
	PollDLQInterval        time.Duration
	ReportProgressInterval time.Duration
	Stack                  models.EVMStack
	BlockchainName         string
	BlockSubmitInterval    time.Duration
	SkipFailedBlocks       bool
	DLQOnly                bool
	MaxBatchSize           int
}

type ErrorInfo

type ErrorInfo struct {
	Timestamp    time.Time
	BlockNumbers string
	Error        error
}

type ErrorState

type ErrorState struct {
	RPCErrors      []ErrorInfo
	DuneErrors     []ErrorInfo
	RPCErrorCount  int
	DuneErrorCount int
}

func (*ErrorState) ObserveDuneError

func (es *ErrorState) ObserveDuneError(err ErrorInfo)

func (*ErrorState) ObserveRPCError

func (es *ErrorState) ObserveRPCError(err ErrorInfo)

func (*ErrorState) Reset

func (es *ErrorState) Reset()

type Info

type Info struct {
	BlockchainName      string
	Stack               string
	LatestBlockNumber   int64
	IngestedBlockNumber int64
	ConsumedBlockNumber int64
	Errors              ErrorState
	Since               time.Time
}

func NewInfo

func NewInfo(blockchain string, stack string) Info

func (Info) ProgressReportErrors

func (info Info) ProgressReportErrors() []models.BlockchainIndexError

ProgressReportErrors returns a combined list of errors from RPC requests and Dune requests

func (*Info) ResetErrors

func (info *Info) ResetErrors()

func (*Info) ToProgressReport

func (info *Info) ToProgressReport() models.BlockchainIndexProgress

type Ingester

type Ingester interface {
	// Run starts the ingester and blocks until the context is cancelled or maxCount blocks are ingested
	Run(ctx context.Context, startBlockNumber int64, maxCount int64) error

	// ProduceBlockNumbers sends block numbers from startBlockNumber to endBlockNumber to outChan, inclusive.
	// If endBlockNumber is -1, it sends blocks from startBlockNumber to the tip of the chain
	// it will run continuously until the context is cancelled
	ProduceBlockNumbers(ctx context.Context, outChan chan int64, startBlockNumber int64, endBlockNumber int64) error

	// FetchBlockLoop fetches blocks sent on the channel and sends them on the other channel.
	// It will run continuously until the context is cancelled, or the channel is closed.
	// It can safely be run concurrently.
	FetchBlockLoop(context.Context, chan int64, chan models.RPCBlock) error

	// SendBlocks consumes RPCBlocks from the channel, reorders them, and sends batches to DuneAPI in an endless loop
	// it will block until:
	//	- the context is cancelled
	//  - channel is closed
	//  - a fatal error occurs
	SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock, startFrom int64) error

	// ProduceBlockNumbersDLQ sends block numbers from the DLQ to outChan.
	// It will run continuously until the context is cancelled.
	// When the DLQ does not return an eligible next block, it waits for PollDLQInterval before trying again
	ProduceBlockNumbersDLQ(ctx context.Context, outChan chan dlq.Item[int64]) error

	// FetchBlockLoopDLQ fetches blocks sent on the channel and sends them on the other channel.
	// It will run continuously until the context is cancelled, or the channel is closed.
	// It can safely be run concurrently.
	FetchBlockLoopDLQ(ctx context.Context,
		blockNumbers <-chan dlq.Item[int64],
		blocks chan<- dlq.Item[models.RPCBlock],
	) error

	// SendBlocksDLQ pushes one RPCBlock at a time to DuneAPI in the order they are received in
	SendBlocksDLQ(ctx context.Context, blocks <-chan dlq.Item[models.RPCBlock]) error

	Close() error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL