indexer

package
v1.0.1
Warning

This package is not in the latest version of its module.
Published: May 31, 2024 | License: MIT | Imports: 27 | Imported by: 0

Documentation

Overview

Package indexer provides an indexing service that combines several interceptors (block, transaction, and account) to handle data synchronization and state management across blockchain networks. It uses goroutines to process high volumes of data concurrently, and relies on batch processing and error handling to preserve data integrity and performance.

Constants

const DefaultBulkSize = 30

DefaultBulkSize specifies the default number of account events processed in a single batch.

const DefaultBulkTimeout = 2 * time.Second

DefaultBulkTimeout specifies the duration to wait before processing a batch of account events.
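The interplay between a bulk size and a bulk timeout can be sketched as follows. This is a minimal, self-contained illustration and not the package's implementation: the names bulkSize, bulkTimeout, batch, and collectBatchSizes are hypothetical stand-ins, and the use of a ticker to measure the timeout is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// Hypothetical stand-ins mirroring DefaultBulkSize and DefaultBulkTimeout.
const (
	bulkSize    = 30
	bulkTimeout = 2 * time.Second
)

// batch reads from in and calls flush when size items have been collected
// or when the ticker fires, whichever comes first. Leftover items are
// flushed once in is closed.
func batch(in <-chan int, size int, timeout time.Duration, flush func([]int)) {
	buf := make([]int, 0, size)
	ticker := time.NewTicker(timeout)
	defer ticker.Stop()

	emit := func() {
		if len(buf) == 0 {
			return
		}
		flush(buf)
		buf = make([]int, 0, size)
	}

	for {
		select {
		case v, ok := <-in:
			if !ok {
				emit()
				return
			}
			buf = append(buf, v)
			if len(buf) >= size {
				emit()
				ticker.Reset(timeout) // restart the wait after a size-triggered flush
			}
		case <-ticker.C:
			emit()
		}
	}
}

// collectBatchSizes feeds n items through batch and records each batch length.
func collectBatchSizes(n, size int, timeout time.Duration) []int {
	in := make(chan int, n)
	for i := 0; i < n; i++ {
		in <- i
	}
	close(in)

	var sizes []int
	batch(in, size, timeout, func(b []int) { sizes = append(sizes, len(b)) })
	return sizes
}

func main() {
	fmt.Println(collectBatchSizes(70, bulkSize, bulkTimeout))
}
```

With 70 queued items and a bulk size of 30, the sketch produces two full batches and one partial batch of 10 that is flushed when the input closes.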

Variables

var TransactionsSubscriber subscribers.SubscriberType = "transactions"

TransactionsSubscriber is the SubscriberType used to subscribe to new block transaction events. Note that it is declared as a variable, not a constant.

Functions

func InjectSubscribers

func InjectSubscribers(service *Service, network utils.Network, networkId utils.NetworkID) error

InjectSubscribers iterates over the map of subscribers and attempts to register each one with the subscriber manager. Registration is sequential, so any error is returned as soon as it occurs. Failing in sequence is deliberate; the alternative would be spawning goroutines and collecting their errors.
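The fail-in-sequence behaviour described above might look like the sketch below. The types subscriberType and registerFn, and the function injectAll, are hypothetical; the real function works against the package's own subscriber manager. Sorting the keys is an addition made here only to keep the example deterministic, since Go map iteration order is unspecified.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// subscriberType and registerFn are hypothetical stand-ins for the
// package's subscriber types.
type subscriberType string

type registerFn func() error

// injectAll registers subscribers one at a time and returns on the first
// error, along with the names that were registered before the failure.
func injectAll(subs map[subscriberType]registerFn) ([]subscriberType, error) {
	keys := make([]string, 0, len(subs))
	for k := range subs {
		keys = append(keys, string(k))
	}
	sort.Strings(keys) // deterministic order for the example

	registered := make([]subscriberType, 0, len(keys))
	for _, k := range keys {
		name := subscriberType(k)
		if err := subs[name](); err != nil {
			return registered, fmt.Errorf("register %q: %w", name, err)
		}
		registered = append(registered, name)
	}
	return registered, nil
}

// demo registers three subscribers, the second of which fails.
func demo() ([]subscriberType, error) {
	return injectAll(map[subscriberType]registerFn{
		"blocks":       func() error { return nil },
		"logs":         func() error { return errors.New("connection refused") },
		"transactions": func() error { return nil },
	})
}

func main() {
	done, err := demo()
	fmt.Println(done, err)
}
```

A sequential loop keeps error reporting simple: the caller learns exactly which subscriber failed and that nothing after it was attempted.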

func RegisterService

func RegisterService(ctx *cli.Context, baseService service.Service) (service.Service, error)

RegisterService initializes and registers a new indexer service using the provided CLI context and an existing base service. It is typically used to set up the service from command-line options, adapting the configuration to the operational needs of the indexer.

func UnpackerInterceptor

func UnpackerInterceptor(srv *Service) func(event *events.Unpack) error

UnpackerInterceptor returns a closure that processes Unpack events by initializing and managing contract instances. This function sets up the necessary environment to interpret blockchain events, ensuring that each event is unpacked and analyzed accurately. It retrieves or discovers necessary blockchain information (like block, transaction, and receipt details), unpacks the contract data using the provided service's unpacker, and finally, handles errors and panics gracefully.
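The shape of such an interceptor, a closure that wraps event handling in panic recovery, can be sketched as below. The unpackEvent type and newInterceptor function are hypothetical stand-ins for events.Unpack and UnpackerInterceptor, and the handler body is a placeholder rather than real unpacking logic.

```go
package main

import (
	"errors"
	"fmt"
)

// unpackEvent is a hypothetical stand-in for events.Unpack.
type unpackEvent struct{ Address string }

// newInterceptor returns a closure bound to process. A panic raised inside
// process is recovered and returned as an error, so a single bad event
// cannot crash the caller.
func newInterceptor(process func(*unpackEvent) error) func(*unpackEvent) error {
	return func(ev *unpackEvent) (err error) {
		defer func() {
			if r := recover(); r != nil {
				err = fmt.Errorf("recovered from panic: %v", r)
			}
		}()
		if ev == nil {
			return errors.New("nil event")
		}
		return process(ev)
	}
}

func main() {
	handle := newInterceptor(func(ev *unpackEvent) error {
		if ev.Address == "" {
			panic("empty address") // simulated failure deep in unpacking
		}
		return nil
	})

	fmt.Println(handle(&unpackEvent{Address: "0xabc"}))
	fmt.Println(handle(&unpackEvent{}))
}
```

The named return value err is what lets the deferred function replace the result after a panic has been recovered.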

Types

type AccountInterceptor

type AccountInterceptor struct {
	*service.BaseService
	// contains filtered or unexported fields
}

AccountInterceptor handles the collection and processing of account events, utilizing channels and batch processing to optimize database interactions.

func NewAccountInterceptor

func NewAccountInterceptor(ctx context.Context, svc *service.BaseService, opts *options.Subscriber) (*AccountInterceptor, error)

NewAccountInterceptor constructs a new AccountInterceptor using a BaseService and Subscriber options. It initializes channels based on the provided buffer size in options.

func (*AccountInterceptor) Publish

func (ai *AccountInterceptor) Publish(event *events.Account) error

Publish sends an account event to a NATS subject. It serializes the event into a binary format and publishes the result to the configured subject, returning any serialization or network error.
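A serialize-then-publish step of this kind can be sketched without a NATS server by hiding the transport behind a small interface. Everything here is an assumption made for illustration: the accountEvent fields, the publisher interface, the in-memory memoryPublisher, and the choice of encoding/gob as the binary format (the source does not say which encoding the package uses).

```go
package main

import (
	"bytes"
	"encoding/gob"
	"errors"
	"fmt"
)

// accountEvent is a hypothetical stand-in for events.Account.
type accountEvent struct {
	Network     string
	BlockNumber uint64
	Address     string
}

// publisher abstracts the transport; a NATS connection could sit behind it.
type publisher interface {
	Publish(subject string, data []byte) error
}

// memoryPublisher records messages in memory so the example needs no server.
type memoryPublisher struct{ sent map[string][][]byte }

func (m *memoryPublisher) Publish(subject string, data []byte) error {
	if subject == "" {
		return errors.New("empty subject")
	}
	m.sent[subject] = append(m.sent[subject], data)
	return nil
}

// publish encodes the event to bytes, then hands it to the publisher.
func publish(p publisher, subject string, ev *accountEvent) error {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(ev); err != nil {
		return fmt.Errorf("encode account event: %w", err)
	}
	if err := p.Publish(subject, buf.Bytes()); err != nil {
		return fmt.Errorf("publish to %q: %w", subject, err)
	}
	return nil
}

// decode reverses the encoding, as a consumer of the subject would.
func decode(data []byte) (*accountEvent, error) {
	var ev accountEvent
	err := gob.NewDecoder(bytes.NewReader(data)).Decode(&ev)
	return &ev, err
}

func main() {
	p := &memoryPublisher{sent: map[string][][]byte{}}
	ev := &accountEvent{Network: "ethereum", BlockNumber: 1, Address: "0xabc"}
	fmt.Println(publish(p, "accounts", ev), len(p.sent["accounts"]))
}
```

Wrapping each failure with its stage (encode or publish) keeps serialization errors distinguishable from transport errors.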

func (*AccountInterceptor) PublishAddress

func (ai *AccountInterceptor) PublishAddress(network utils.Network, networkId utils.NetworkID, blockNumber *big.Int, txHash common.Hash, address common.Address) error

PublishAddress creates an account event from the provided details and publishes it using the Publish method. It is a convenience method for cases where the account data is already at hand: it assembles the event and delegates to Publish, which sends it to the configured NATS subject.

func (*AccountInterceptor) QueueAddress

func (ai *AccountInterceptor) QueueAddress(network utils.Network, networkId utils.NetworkID, blockNumber *big.Int, txHash common.Hash, address common.Address) error

QueueAddress enqueues a new account event from its components. This is a convenience method that wraps QueueEvent for specific account details.

func (*AccountInterceptor) QueueEvent

func (ai *AccountInterceptor) QueueEvent(event *events.Account) error

QueueEvent enqueues a new account event for processing. This method places the event into the accountsCh channel unless the context has been canceled.
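Enqueueing that respects context cancellation is commonly written with a select statement. The sketch below assumes that pattern; the queue type, newQueue, and enqueue are hypothetical names, and only the idea of a channel guarded by a context comes from the description above.

```go
package main

import (
	"context"
	"fmt"
)

// accountEvent is a hypothetical stand-in for events.Account.
type accountEvent struct{ Address string }

// queue pairs a buffered channel with the context that governs it.
type queue struct {
	ctx context.Context
	ch  chan *accountEvent
}

// newQueue returns a queue and the function that cancels its context.
func newQueue(size int) (*queue, context.CancelFunc) {
	ctx, cancel := context.WithCancel(context.Background())
	return &queue{ctx: ctx, ch: make(chan *accountEvent, size)}, cancel
}

// enqueue places ev on the channel unless the context has been canceled.
// The early check makes the already-canceled case deterministic; the select
// covers cancellation while waiting for buffer space.
func (q *queue) enqueue(ev *accountEvent) error {
	if err := q.ctx.Err(); err != nil {
		return err
	}
	select {
	case q.ch <- ev:
		return nil
	case <-q.ctx.Done():
		return q.ctx.Err()
	}
}

func main() {
	q, cancel := newQueue(1)
	fmt.Println(q.enqueue(&accountEvent{Address: "0xabc"}))
	cancel()
	fmt.Println(q.enqueue(&accountEvent{Address: "0xdef"}))
}
```

Without the select, a producer could block forever on a full channel after the workers have shut down.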

func (*AccountInterceptor) Start

func (ai *AccountInterceptor) Start() error

Start initializes the AccountInterceptor's processing routines. It starts a specified number of workers to process incoming events concurrently.

func (*AccountInterceptor) Unpack

func (ai *AccountInterceptor) Unpack(event *events.Account) (*models.Account, error)

Unpack converts an events.Account into a models.Account using the account unpacker. Each unpack operation runs under its own context with a timeout, so the process cannot hang indefinitely.
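Giving each operation its own deadline is a standard use of context.WithTimeout. The following sketch shows the idea under assumed names: unpackWithTimeout, slowLookup, and timedOut are hypothetical helpers, and the durations are arbitrary.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// unpackWithTimeout runs fn under its own deadline so that a slow call
// cannot block the caller forever.
func unpackWithTimeout(parent context.Context, d time.Duration, fn func(context.Context) (string, error)) (string, error) {
	ctx, cancel := context.WithTimeout(parent, d)
	defer cancel() // release the timer even when fn returns early
	return fn(ctx)
}

// slowLookup is a hypothetical unpack step that honors cancellation.
func slowLookup(delay time.Duration) func(context.Context) (string, error) {
	return func(ctx context.Context) (string, error) {
		select {
		case <-time.After(delay):
			return "account", nil
		case <-ctx.Done():
			return "", ctx.Err()
		}
	}
}

// timedOut reports whether a lookup taking delayMs exceeds a limit of limitMs.
func timedOut(delayMs, limitMs int) bool {
	delay := time.Duration(delayMs) * time.Millisecond
	limit := time.Duration(limitMs) * time.Millisecond
	_, err := unpackWithTimeout(context.Background(), limit, slowLookup(delay))
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println(timedOut(1, 1000)) // fast call finishes within the limit
	fmt.Println(timedOut(1000, 10)) // slow call hits the deadline
}
```

The timeout only helps if the wrapped operation checks the context, which is why slowLookup selects on ctx.Done().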

type BlockInterceptor

type BlockInterceptor struct {
	*Service
	// contains filtered or unexported fields
}

BlockInterceptor manages the interception and processing of blockchain block events. It utilizes a worker pool to handle events concurrently and forwards blocks to the appropriate transaction interceptors for further processing.

func NewBlockInterceptor

func NewBlockInterceptor(ctx context.Context, svc *Service, txInterceptor *TransactionInterceptor, opts *options.Subscriber) (*BlockInterceptor, error)

NewBlockInterceptor creates a new BlockInterceptor. This constructor requires a Service, TransactionInterceptor, and Subscriber options to set up the interceptor. It returns an error if any input is nil or the options are invalid.

func (*BlockInterceptor) Ch

func (i *BlockInterceptor) Ch() chan *events.Block

Ch returns the channel that receives block events for processing.

func (*BlockInterceptor) QueueBlockFn

func (i *BlockInterceptor) QueueBlockFn(network utils.Network, networkId utils.NetworkID, direction helpers.SyncDirection) subscribers.BlockHookFn

QueueBlockFn returns a function that queues a block for processing based on the given network and sync direction. This function is typically used as a callback in blockchain event listeners to forward blocks to the interceptor.
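A hook factory of this kind binds the network and direction once and returns a callback that only needs the block itself. The sketch below is illustrative: syncDirection, blockEvent, blockHookFn, and queueBlockFn are hypothetical stand-ins, the hook signature is simplified to take a block number, and returning an error when the queue is full is an assumption rather than documented behaviour.

```go
package main

import "fmt"

// Hypothetical stand-ins for helpers.SyncDirection, events.Block,
// and subscribers.BlockHookFn.
type syncDirection string

type blockEvent struct {
	Network   string
	Direction syncDirection
	Number    uint64
}

type blockHookFn func(number uint64) error

// queueBlockFn returns a hook bound to network and dir. Each call wraps the
// block number in an event and tries to place it on ch without blocking.
func queueBlockFn(ch chan<- *blockEvent, network string, dir syncDirection) blockHookFn {
	return func(number uint64) error {
		ev := &blockEvent{Network: network, Direction: dir, Number: number}
		select {
		case ch <- ev:
			return nil
		default:
			return fmt.Errorf("queue full, block %d not queued", number)
		}
	}
}

func main() {
	ch := make(chan *blockEvent, 1)
	hook := queueBlockFn(ch, "ethereum", "head")

	fmt.Println(hook(100)) // fits in the buffer
	fmt.Println(hook(101)) // buffer is full, so the hook reports an error
}
```

Because the network and direction are captured by the closure, a listener can invoke the hook without knowing anything about the interceptor that created it.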

func (*BlockInterceptor) Start

func (i *BlockInterceptor) Start() error

Start initializes the block processing workers. This method starts the worker pool which listens for new blocks and processes them.

func (*BlockInterceptor) Stop

func (i *BlockInterceptor) Stop() error

Stop halts the processing of block events and closes the event channel. It ensures a graceful shutdown of the block interceptor's workers.
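The Start and Stop pair above follows a familiar worker-pool lifecycle: launch workers that range over a channel, then close the channel and wait for them to drain it. A minimal sketch, assuming that lifecycle, is shown below; pool, newPool, start, stop, and run are hypothetical names, and the worker body only counts events.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// blockEvent is a hypothetical stand-in for events.Block.
type blockEvent struct{ Number uint64 }

type pool struct {
	ch      chan *blockEvent
	wg      sync.WaitGroup
	handled int64 // updated atomically by workers
}

func newPool(buffer int) *pool {
	return &pool{ch: make(chan *blockEvent, buffer)}
}

// start launches n workers that read from the channel until it is closed.
func (p *pool) start(n int) {
	for i := 0; i < n; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for range p.ch {
				atomic.AddInt64(&p.handled, 1) // placeholder for real processing
			}
		}()
	}
}

// stop closes the channel, waits for workers to drain the remaining events,
// and returns how many events were handled.
func (p *pool) stop() int64 {
	close(p.ch)
	p.wg.Wait()
	return atomic.LoadInt64(&p.handled)
}

// run pushes the given number of events through a pool of workers.
func run(workers, events int) int64 {
	p := newPool(events)
	p.start(workers)
	for i := 0; i < events; i++ {
		p.ch <- &blockEvent{Number: uint64(i)}
	}
	return p.stop()
}

func main() {
	fmt.Println(run(4, 100))
}
```

Closing the channel is what makes the shutdown graceful: workers finish the events already queued before their range loops end. Producers must stop sending before stop is called, since sending on a closed channel panics.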

func (*BlockInterceptor) Unpack

func (i *BlockInterceptor) Unpack(event *events.Block) error

Unpack processes an individual block event, delegating transaction events to the transaction interceptor. This method is responsible for unpacking block data and initiating transaction processing through the transaction interceptor.

type Service

type Service struct {
	*service.BaseService
	// contains filtered or unexported fields
}

Service extends the BaseService with interceptors that index blockchain data such as accounts, transactions, and blocks.

func NewService

func NewService(ctx *cli.Context, baseService *service.BaseService) (*Service, error)

NewService creates and returns a new Service instance configured with necessary interceptors. This setup involves detailed configuration fetching and initialization of interceptors.

func ToService

func ToService(s service.Service) *Service

ToService casts a generic service.Service to a *Service.

func (*Service) Dependencies

func (s *Service) Dependencies() map[service.DependencyName]service.Option

Dependencies returns a map of service dependencies. It currently returns an empty map as a placeholder for potential future dependencies.

func (*Service) GetAccountInterceptor

func (s *Service) GetAccountInterceptor() *AccountInterceptor

GetAccountInterceptor returns the AccountInterceptor associated with the service.

func (*Service) Start

func (s *Service) Start(network utils.Network, networkId utils.NetworkID) error

Start initializes and starts all components of the Service. It sets up interceptors and begins processing blockchain data based on the provided network settings.

func (*Service) Stop

func (s *Service) Stop() error

Stop shuts down the service. It currently performs no specific cleanup.

func (*Service) Unpack

func (s *Service) Unpack(ctx *cli.Context) error

Unpack processes a smart contract based on the Ethereum address and network provided via the CLI context. It initializes a new contract instance, discovers essential chain information, and handles the unpacking process. It also includes error handling and panic recovery mechanisms to ensure stability.

func (*Service) UnpackFromEntry

func (s *Service) UnpackFromEntry(ctx context.Context, entry *entries.Entry, state machine.State) (*unpacker.Descriptor, error)

UnpackFromEntry handles the unpacking of a smart contract from a provided Entry instance and specified machine state. It manages contract instance re-initialization if necessary and extracts contract data using the provided context and state machine settings. Includes panic recovery to ensure stability during the unpacking process.

type TransactionInterceptor

type TransactionInterceptor struct {
	*Service
	// contains filtered or unexported fields
}

TransactionInterceptor encapsulates logic to process blockchain transactions. It intercepts transactions from the blockchain network, managing their initial processing, and forwards relevant data to an AccountInterceptor for further action.

func NewTransactionInterceptor

func NewTransactionInterceptor(ctx context.Context, svc *Service, accountInterceptor *AccountInterceptor, opts *options.Subscriber) (*TransactionInterceptor, error)

NewTransactionInterceptor constructs a new TransactionInterceptor with the necessary dependencies. It requires a Service instance, an AccountInterceptor, and Subscriber options to configure its operational parameters, including error handling and worker management. It returns an error if any required parameter is invalid or missing.

func (*TransactionInterceptor) Ch

Ch returns the channel used to receive transaction events for processing.

func (*TransactionInterceptor) QueueTx

func (i *TransactionInterceptor) QueueTx(event *events.Transaction) error

QueueTx enqueues a transaction event into the descriptor channel for processing.

func (*TransactionInterceptor) QueueTxFn

func (i *TransactionInterceptor) QueueTxFn(network utils.Network, networkId utils.NetworkID, direction helpers.SyncDirection) subscribers.TxHookFn

QueueTxFn generates a function bound to a specific network and direction to enqueue transaction data for processing. This function is typically used as a hook in blockchain listeners to automatically forward new transactions to the interceptor.

func (*TransactionInterceptor) Start

func (i *TransactionInterceptor) Start() error

Start initializes the workers to begin processing transactions. It sets up the worker environment and begins listening for incoming transaction events.

func (*TransactionInterceptor) Stop

func (i *TransactionInterceptor) Stop() error

Stop terminates the transaction processing, ensuring a graceful shutdown of workers and closing the event channel.

func (*TransactionInterceptor) Unpack

func (i *TransactionInterceptor) Unpack(event *events.Transaction) error

Unpack processes each transaction, extracting critical information and handling errors. It decodes transaction details, determines transaction sender, and forwards necessary information to the AccountInterceptor. It handles various error conditions and ensures state consistency through logging and state management.
