indexer

package
v0.2.0
Published: May 8, 2024 | License: GPL-3.0, Apache-2.0 | Imports: 20 | Imported by: 0

Documentation

Constants

const (
	EventZKCertificateAddition   = "zkCertificateAddition"
	EventZKCertificateRevocation = "zkCertificateRevocation"
)
const (
	MaxBlocksDistance = 10000
	SinkSize          = 100
	SinkProgressTick  = 5 * time.Second

	BlockTypeLength = 8
)
const (
	ContractZkCertificateRegistry       Contract = 1
	ContractZkCertificateRegistryString string   = "ZkCertificateRegistry"

	ContractTypeLength = 1
)

Variables

This section is empty.

Functions

func OperationsBufferFromContext added in v0.2.0

func OperationsBufferFromContext(ctx context.Context) (*zkregistry.OperationsBuffer, bool)

OperationsBufferFromContext returns the operations (leaves) buffer stored in the context.

func WithOperationsBuffer added in v0.2.0

func WithOperationsBuffer(ctx context.Context, buffer *zkregistry.OperationsBuffer) context.Context

WithOperationsBuffer returns a context that carries the given operations (leaves) buffer.

Types

type CommitFunc

type CommitFunc func(ctx context.Context, batch db.Batch, block uint64) error

CommitFunc is called each time the indexer has queried and handled the historical events in a block range. It receives the block number at the end of that range, that is, the highest block number indexed so far.
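To illustrate when a commit callback of this kind fires, here is a minimal sketch of walking a block range in bounded chunks and committing the last block of each chunk. It assumes that `MaxBlocksDistance` caps the size of a single historical query, which the page does not state. `indexRange` and `commitFunc` are hypothetical names, and the `db.Batch` argument is omitted to keep the example self-contained.

```go
package main

import "fmt"

// commitFunc mirrors the shape of CommitFunc without the db.Batch argument.
type commitFunc func(block uint64) error

// indexRange walks the inclusive range [from, to] in chunks of at most
// maxDistance blocks and calls commit with the last block of each chunk.
func indexRange(from, to, maxDistance uint64, commit commitFunc) error {
	if maxDistance == 0 {
		return fmt.Errorf("maxDistance must be positive")
	}
	for start := from; start <= to; {
		end := to
		if to-start >= maxDistance {
			end = start + maxDistance - 1
		}
		// A real indexer would query and handle the logs in [start, end] here.
		if err := commit(end); err != nil {
			return fmt.Errorf("commit block %d: %w", end, err)
		}
		if end == to {
			break // also avoids overflow when to is the maximum uint64
		}
		start = end + 1
	}
	return nil
}

func main() {
	err := indexRange(0, 25000, 10000, func(block uint64) error {
		fmt.Println("committed up to block", block)
		return nil
	})
	if err != nil {
		fmt.Println("indexing failed:", err)
	}
}
```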

type Config added in v0.2.0

type Config struct {
	MaxBlocksDistance uint64        `yaml:"max_blocks_distance"`
	SinkChannelSize   uint          `yaml:"sink_channel_size"`
	SinkProgressTick  time.Duration `yaml:"sink_progress_tick"`
}

type Configurator

type Configurator struct {
	// contains filtered or unexported fields
}

Configurator manages the set of currently active EVMLogsIndexer jobs. It allows jobs to be started and stopped dynamically.

func InitConfiguratorFromStorage

func InitConfiguratorFromStorage(
	ctx context.Context,
	jobStorage *JobStorage,
	jobHandlersProducer JobHandlersProducer,
	evmIndexer EVMIndexer,
	activeSet []JobDescriptor,
	logger log.Logger,
) (*Configurator, error)

InitConfiguratorFromStorage creates a new Configurator and resumes each job from its last known block. JobStorage provides the list of jobs and their corresponding last known blocks.

func NewConfigurator

func NewConfigurator(
	jobStorage *JobStorage,
	jobHandlersProducer JobHandlersProducer,
	indexer EVMIndexer,
	logger log.Logger,
) *Configurator

func (*Configurator) ReloadConfig

func (c *Configurator) ReloadConfig(ctx context.Context, config []JobDescriptor) error

ReloadConfig starts new jobs and stops active ones based on the provided config. Every job that was started but is not in the provided config will be stopped. This method uses shallow comparison (a == b) of two JobDescriptor values to determine whether they are equal.
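The reconciliation described above can be sketched as a set difference over comparable structs. This is an illustration under assumptions, not the Configurator's code: `jobDescriptor` is a simplified stand-in (a `[20]byte` array replaces `common.Address`) and `diffJobs` is a hypothetical helper.

```go
package main

import "fmt"

// jobDescriptor is a simplified stand-in for JobDescriptor. Every field is
// comparable, so values can be compared with == and used as map keys.
type jobDescriptor struct {
	Address    [20]byte
	Contract   uint8
	StartBlock uint64
}

// diffJobs returns the descriptors that must be started (present in desired
// but not in active) and those that must be stopped (active but not desired).
func diffJobs(active, desired []jobDescriptor) (toStart, toStop []jobDescriptor) {
	wanted := make(map[jobDescriptor]struct{}, len(desired))
	for _, d := range desired {
		wanted[d] = struct{}{}
	}
	running := make(map[jobDescriptor]struct{}, len(active))
	for _, a := range active {
		running[a] = struct{}{}
		if _, ok := wanted[a]; !ok {
			toStop = append(toStop, a)
		}
	}
	for _, d := range desired {
		if _, ok := running[d]; !ok {
			toStart = append(toStart, d)
		}
	}
	return toStart, toStop
}

func main() {
	old := jobDescriptor{Contract: 1, StartBlock: 100}
	changed := jobDescriptor{Contract: 1, StartBlock: 200}

	// Changing any field yields a different descriptor, so the old job is
	// stopped and a new one is started.
	toStart, toStop := diffJobs([]jobDescriptor{old}, []jobDescriptor{changed})
	fmt.Println("start:", len(toStart), "stop:", len(toStop))
}
```

One consequence of comparing by value is that editing a single field of a descriptor in the config, such as the start block, replaces the job rather than updating it.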

func (*Configurator) StartJob

func (c *Configurator) StartJob(ctx context.Context, jobDescriptor JobDescriptor, startBlock uint64) error

StartJob starts a new job for the indexer. It is also responsible for saving the progress (last known block) of this job, so that a paused job can be resumed later.

func (*Configurator) StopJob

func (c *Configurator) StopJob(ctx context.Context, store db.Batch, job JobDescriptor) error

StopJob cancels execution of the specified job. It also removes the job's progress (last known block) from the storage, so the job will not resume after a restart.

func (*Configurator) Wait

func (c *Configurator) Wait() chan error

type Contract

type Contract uint8

func ContractFromString

func ContractFromString(s string) (Contract, error)

func (Contract) MarshalJSON

func (s Contract) MarshalJSON() ([]byte, error)

func (Contract) MarshalText

func (s Contract) MarshalText() ([]byte, error)

func (Contract) MarshalYAML

func (s Contract) MarshalYAML() (interface{}, error)

func (Contract) String

func (s Contract) String() string

func (*Contract) UnmarshalJSON

func (s *Contract) UnmarshalJSON(data []byte) error

func (*Contract) UnmarshalText

func (s *Contract) UnmarshalText(text []byte) error

func (*Contract) UnmarshalYAML

func (s *Contract) UnmarshalYAML(unmarshal func(interface{}) error) error

type DBBatchCreator

type DBBatchCreator interface {
	NewBatch() db.Batch
}

type EVMIndexer

type EVMIndexer interface {
	IndexEVMLogs(ctx context.Context, query ethereum.FilterQuery, startBlock uint64, handler JobHandler) error
}

type EVMLogHandler

type EVMLogHandler func(ctx context.Context, log types.Log) error

EVMLogHandler is called for every log that was retrieved from the blockchain.

  1. The handler is called for consecutive logs in chronological order, with the single exception described below.
  2. The handler is called for every historical log first, and only then for logs from the live subscription.

EVMLogsIndexer may call this handler with duplicate logs if the live subscription returns some of the same logs as the last historical query. This is the only case in which rule 1 is violated.
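Since duplicates can be delivered at the boundary between the historical query and the live subscription, a handler that is not idempotent may want to filter them out. The following is one possible sketch, relying only on the ordering rules stated above. `evmLog` is a stand-in for `types.Log` reduced to the block number and the log's index within the block, `skipSeen` and `after` are hypothetical helpers, and chain reorganizations are not handled.

```go
package main

import "fmt"

// evmLog is a stand-in for types.Log with only the fields used here.
type evmLog struct {
	BlockNumber uint64
	Index       uint // position of the log within its block
}

type logHandler func(l evmLog) error

// after reports whether a comes strictly after b in chain order.
func after(a, b evmLog) bool {
	if a.BlockNumber != b.BlockNumber {
		return a.BlockNumber > b.BlockNumber
	}
	return a.Index > b.Index
}

// skipSeen wraps next and drops every log that is not strictly after the
// last log that next handled successfully. It is not safe for concurrent use.
func skipSeen(next logHandler) logHandler {
	var last evmLog
	seen := false
	return func(l evmLog) error {
		if seen && !after(l, last) {
			return nil // already handled, for example replayed by the live subscription
		}
		if err := next(l); err != nil {
			return err
		}
		last, seen = l, true
		return nil
	}
}

func main() {
	handled := 0
	handler := skipSeen(func(l evmLog) error {
		handled++
		return nil
	})

	// The historical query ends at block 2; the subscription replays its tail.
	stream := []evmLog{{1, 0}, {1, 1}, {2, 0}, {1, 1}, {2, 0}, {2, 1}}
	for _, l := range stream {
		if err := handler(l); err != nil {
			fmt.Println("handler failed:", err)
			return
		}
	}
	fmt.Println("handled logs:", handled)
}
```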

type EthereumClient

type EthereumClient interface {
	ethereum.BlockNumberReader
	ethereum.LogFilterer
}

type EvmHandlers

type EvmHandlers struct {
	// contains filtered or unexported fields
}

func NewEVMJob

func NewEVMJob(
	prepCtxHandler PrepareContextFunc,
	evmLogHandler EVMLogHandler,
	committer CommitFunc,
	filterQuery FilterQueryFunc,
) *EvmHandlers

func (*EvmHandlers) Commit

func (h *EvmHandlers) Commit(ctx context.Context, batch db.Batch, block uint64) error

func (*EvmHandlers) FilterQuery

func (h *EvmHandlers) FilterQuery() (ethereum.FilterQuery, error)

func (*EvmHandlers) HandleEVMLog

func (h *EvmHandlers) HandleEVMLog(ctx context.Context, log types.Log) error

func (*EvmHandlers) PrepareContext

func (h *EvmHandlers) PrepareContext(ctx context.Context) (context.Context, error)

type FilterQueryFunc

type FilterQueryFunc func() (ethereum.FilterQuery, error)

FilterQueryFunc returns a filter query that should be used for subscribing to new logs.

type Indexer

type Indexer struct {
	// contains filtered or unexported fields
}

Indexer indexes events by subscribing to new events and filtering historical ones through an EthereumClient.

func NewEVMIndexer

func NewEVMIndexer(
	client EthereumClient,
	config Config,
	logger log.Logger,
) *Indexer

func (*Indexer) IndexEVMLogs

func (ixr *Indexer) IndexEVMLogs(ctx context.Context, query ethereum.FilterQuery, startBlock uint64, handler JobHandler) error

IndexEVMLogs indexes event logs using the provided query, starting from the provided start block. Because it subscribes to new logs, it blocks until the context is cancelled or an error occurs.

type Job

type Job struct {
	JobDescriptor
	CurrentBlock uint64
}

func (*Job) String

func (j *Job) String() string

String returns a string representation of the Job.

type JobContext

type JobContext struct {
	CancelFunc context.CancelFunc // Function that cancels execution of the corresponding job.
	WG         *errgroup.Group    // WG stores error of the job and provides a method to wait until the job finishes.
}

JobContext holds an execution context of a single job.

type JobDescriptor

type JobDescriptor struct {
	Address  common.Address `json:"address" yaml:"address"`   // Address of smart contract that emits events.
	Contract Contract       `json:"contract" yaml:"contract"` // Contract identifies the contract whose events the Indexer subscribes to.

	// First block to query for events.
	// Usually it's a block number when the smart contract was deployed or the first event was emitted.
	StartBlock uint64 `json:"start_block" yaml:"start_block"`
}

JobDescriptor uniquely identifies a job. In RDBMS terms, each field is part of a composite primary key.

func (*JobDescriptor) String

func (j *JobDescriptor) String() string

String returns a string representation of the JobDescriptor.

type JobFactory

type JobFactory struct {
	// contains filtered or unexported fields
}

JobFactory is a factory that produces event handlers based on the provided contract.

func NewJobFactory

func NewJobFactory(
	client bind.ContractFilterer,
	registryService *zkregistry.Service,
	jobUpdater JobUpdater,
	batchCreator DBBatchCreator,
	logger log.Logger,
) *JobFactory

func (*JobFactory) Produce

func (f *JobFactory) Produce(ctx context.Context, jobDescriptor JobDescriptor) (JobHandler, error)

Produce creates a new event handler based on the provided job descriptor.

type JobHandler

type JobHandler interface {
	PrepareContext(ctx context.Context) (context.Context, error)
	HandleEVMLog(ctx context.Context, log types.Log) error
	Commit(ctx context.Context, block uint64) error
	FilterQuery() (ethereum.FilterQuery, error)
}

type JobHandlersProducer

type JobHandlersProducer interface {
	Produce(ctx context.Context, jobDescriptor JobDescriptor) (JobHandler, error)
}

type JobStorage

type JobStorage struct {
	db.DB
}

func NewJobStorage

func NewJobStorage(db db.DB) *JobStorage

func (*JobStorage) DeleteJob

func (q *JobStorage) DeleteJob(_ context.Context, store StoreDeleter, jobDescriptor JobDescriptor) error

DeleteJob deletes a job from the storage.

func (*JobStorage) SelectAllJobs

func (q *JobStorage) SelectAllJobs(ctx context.Context) ([]Job, error)

SelectAllJobs returns all jobs stored in the storage.

func (*JobStorage) SelectJob added in v0.2.0

func (q *JobStorage) SelectJob(_ context.Context, jobDescriptor JobDescriptor) (*Job, error)

SelectJob returns a job by its descriptor.

func (*JobStorage) UpsertJob

func (q *JobStorage) UpsertJob(_ context.Context, store StoreSetter, job Job) error

UpsertJob inserts or updates a job in the storage.

type JobUpdater

type JobUpdater interface {
	UpsertJob(_ context.Context, store StoreSetter, job Job) error
}

type PrepareContextFunc

type PrepareContextFunc func(ctx context.Context) (context.Context, error)

PrepareContextFunc prepares storage.Context for handling events. It should be called before handling events. It's called once for every block range that is queried.

type StoreDeleter

type StoreDeleter interface {
	Delete(key []byte) error
}

type StoreSetter

type StoreSetter interface {
	Set(key, value []byte) error
}

type ZkCertificateRegistryJob

type ZkCertificateRegistryJob struct {
	// contains filtered or unexported fields
}

func NewZkCertificateRegistryJob added in v0.2.0

func NewZkCertificateRegistryJob(
	jobDescriptor JobDescriptor,
	jobUpdater JobUpdater,
	dbBatchCreator DBBatchCreator,
	registry *zkregistry.ZKCertificateRegistry,
	logger log.Logger,
) *ZkCertificateRegistryJob

func (*ZkCertificateRegistryJob) Commit

func (job *ZkCertificateRegistryJob) Commit(ctx context.Context, block uint64) error

Commit commits the leaves buffer to the database.

func (*ZkCertificateRegistryJob) FilterQuery

func (job *ZkCertificateRegistryJob) FilterQuery() (ethereum.FilterQuery, error)

FilterQuery returns the filter query for the job to listen to the contract events.

func (*ZkCertificateRegistryJob) HandleEVMLog

func (job *ZkCertificateRegistryJob) HandleEVMLog(ctx context.Context, log types.Log) error

HandleEVMLog handles the EVM log and updates the leaves buffer.

func (*ZkCertificateRegistryJob) PrepareContext

func (job *ZkCertificateRegistryJob) PrepareContext(ctx context.Context) (context.Context, error)

PrepareContext prepares the context for the job.
