Documentation ¶
Overview ¶
package indexer provides an indexing service that integrates various types of interceptors to handle data synchronization and state management across blockchain networks. It utilizes goroutines to handle high volumes of data efficiently and implements mechanisms for batch processing and error handling to ensure data integrity and performance.
Index ¶
- Constants
- Variables
- func InjectSubscribers(service *Service, network utils.Network, networkId utils.NetworkID) error
- func RegisterService(ctx *cli.Context, baseService service.Service) (service.Service, error)
- func UnpackerInterceptor(srv *Service) func(event *events.Unpack) error
- type AccountInterceptor
- func (ai *AccountInterceptor) Publish(event *events.Account) error
- func (ai *AccountInterceptor) PublishAddress(network utils.Network, networkId utils.NetworkID, blockNumber *big.Int, ...) error
- func (ai *AccountInterceptor) QueueAddress(network utils.Network, networkId utils.NetworkID, blockNumber *big.Int, ...) error
- func (ai *AccountInterceptor) QueueEvent(event *events.Account) error
- func (ai *AccountInterceptor) Start() error
- func (ai *AccountInterceptor) Unpack(event *events.Account) (*models.Account, error)
- type BlockInterceptor
- func (i *BlockInterceptor) Ch() chan *events.Block
- func (i *BlockInterceptor) QueueBlockFn(network utils.Network, networkId utils.NetworkID, ...) subscribers.BlockHookFn
- func (i *BlockInterceptor) Start() error
- func (i *BlockInterceptor) Stop() error
- func (i *BlockInterceptor) Unpack(event *events.Block) error
- type Service
- func (s *Service) Dependencies() map[service.DependencyName]service.Option
- func (s *Service) GetAccountInterceptor() *AccountInterceptor
- func (s *Service) Start(network utils.Network, networkId utils.NetworkID) error
- func (s *Service) Stop() error
- func (s *Service) Unpack(ctx *cli.Context) error
- func (s *Service) UnpackFromEntry(ctx context.Context, entry *entries.Entry, state machine.State) (*unpacker.Descriptor, error)
- type TransactionInterceptor
- func (i *TransactionInterceptor) Ch() chan *events.Transaction
- func (i *TransactionInterceptor) QueueTx(event *events.Transaction) error
- func (i *TransactionInterceptor) QueueTxFn(network utils.Network, networkId utils.NetworkID, ...) subscribers.TxHookFn
- func (i *TransactionInterceptor) Start() error
- func (i *TransactionInterceptor) Stop() error
- func (i *TransactionInterceptor) Unpack(event *events.Transaction) error
Constants ¶
const DefaultBulkSize = 30
DefaultBulkSize specifies the default number of account events processed in a single batch.
const DefaultBulkTimeout = 2 * time.Second
DefaultBulkTimeout specifies the duration to wait before processing a batch of account events.
Variables ¶
var TransactionsSubscriber subscribers.SubscriberType = "transactions"
TransactionsSubscriber is a constant SubscriberType used to subscribe to new block transaction events.
Functions ¶
func InjectSubscribers ¶
InjectSubscribers will look into map of the subscribers and attempt to register them against subscriber manager. In case of any issues, it will sequentially fail. It's deliberate to fail on sequence instead of spawning goroutines and returning back errors.
func RegisterService ¶
RegisterService initializes and registers a new indexer service using the provided CLI context and an existing base service. This function is typically used to setup the service from command line interface options, adapting the configuration to the operational needs of the indexer.
func UnpackerInterceptor ¶
UnpackerInterceptor returns a closure that processes Unpack events by initializing and managing contract instances. This function sets up the necessary environment to interpret blockchain events, ensuring that each event is unpacked and analyzed accurately. It retrieves or discovers necessary blockchain information (like block, transaction, and receipt details), unpacks the contract data using the provided service's unpacker, and finally, handles errors and panics gracefully.
Types ¶
type AccountInterceptor ¶
type AccountInterceptor struct { *service.BaseService // contains filtered or unexported fields }
AccountInterceptor handles the collection and processing of account events, utilizing channels and batch processing to optimize database interactions.
func NewAccountInterceptor ¶
func NewAccountInterceptor(ctx context.Context, svc *service.BaseService, opts *options.Subscriber) (*AccountInterceptor, error)
NewAccountInterceptor constructs a new AccountInterceptor using a BaseService and Subscriber options. It initializes channels based on the provided buffer size in options.
func (*AccountInterceptor) Publish ¶
func (ai *AccountInterceptor) Publish(event *events.Account) error
Publish sends an account event to a NATS subject. This method serializes the Account event and publishes it to the configured NATS subject, handling network communication and serialization errors. The event is prepared for transmission by converting it into a binary format, which is then published to a specified NATS subject.
func (*AccountInterceptor) PublishAddress ¶
func (ai *AccountInterceptor) PublishAddress(network utils.Network, networkId utils.NetworkID, blockNumber *big.Int, txHash common.Hash, address common.Address) error
PublishAddress creates an account event from provided details and publishes it using the Publish method. This is a convenience function that assembles account information into an event structure and then delegates the publishing process to the Publish method, ensuring that the event is sent to the configured NATS subject. This method is useful for scenarios where account data is readily available and needs to be quickly converted to an event and published.
func (*AccountInterceptor) QueueAddress ¶
func (ai *AccountInterceptor) QueueAddress(network utils.Network, networkId utils.NetworkID, blockNumber *big.Int, txHash common.Hash, address common.Address) error
QueueAddress enqueues a new account event from its components. This is a convenience method that wraps QueueEvent for specific account details.
func (*AccountInterceptor) QueueEvent ¶
func (ai *AccountInterceptor) QueueEvent(event *events.Account) error
QueueEvent enqueues a new account event for processing. This method places the event into the accountsCh channel unless the context has been canceled.
func (*AccountInterceptor) Start ¶
func (ai *AccountInterceptor) Start() error
Start initializes the AccountInterceptor's processing routines. It starts a specified number of workers to process incoming events concurrently.
type BlockInterceptor ¶
type BlockInterceptor struct { *Service // contains filtered or unexported fields }
BlockInterceptor manages the interception and processing of blockchain block events. It utilizes a worker pool to handle events concurrently and forwards blocks to the appropriate transaction interceptors for further processing.
func NewBlockInterceptor ¶
func NewBlockInterceptor(ctx context.Context, svc *Service, txInterceptor *TransactionInterceptor, opts *options.Subscriber) (*BlockInterceptor, error)
NewBlockInterceptor creates a new BlockInterceptor. This constructor requires a Service, TransactionInterceptor, and Subscriber options to set up the interceptor. It returns an error if any input is nil or the options are invalid.
func (*BlockInterceptor) Ch ¶
func (i *BlockInterceptor) Ch() chan *events.Block
Ch returns the channel that receives block events for processing.
func (*BlockInterceptor) QueueBlockFn ¶
func (i *BlockInterceptor) QueueBlockFn(network utils.Network, networkId utils.NetworkID, direction helpers.SyncDirection) subscribers.BlockHookFn
QueueBlockFn returns a function that queues a block for processing based on the given network and sync direction. This function is typically used as a callback in blockchain event listeners to forward blocks to the interceptor.
func (*BlockInterceptor) Start ¶
func (i *BlockInterceptor) Start() error
Start initializes the block processing workers. This method starts the worker pool which listens for new blocks and processes them.
func (*BlockInterceptor) Stop ¶
func (i *BlockInterceptor) Stop() error
Stop halts the processing of block events and closes the event channel. It ensures a graceful shutdown of the block interceptor's workers.
func (*BlockInterceptor) Unpack ¶
func (i *BlockInterceptor) Unpack(event *events.Block) error
Unpack processes an individual block event, delegating transaction events to the transaction interceptor. This method is responsible for unpacking block data and initiating transaction processing through the transaction interceptor.
type Service ¶
type Service struct { *service.BaseService // contains filtered or unexported fields }
Service extends the BaseService with specific interceptors to manage blockchain data indexing such as accounts, transactions, and blocks.
func NewService ¶
func NewService(ctx *cli.Context, baseService *service.BaseService) (*Service, error)
NewService creates and returns a new Service instance configured with necessary interceptors. This setup involves detailed configuration fetching and initialization of interceptors.
func (*Service) Dependencies ¶
func (s *Service) Dependencies() map[service.DependencyName]service.Option
Dependencies returns a map of service dependencies, currently returns an empty map as placeholders for potential future dependencies.
func (*Service) GetAccountInterceptor ¶
func (s *Service) GetAccountInterceptor() *AccountInterceptor
GetAccountInterceptor returns the AccountInterceptor associated with the service.
func (*Service) Start ¶
Start initializes and starts all components of the Service. It sets up interceptors and begins processing blockchain data based on the provided network settings.
func (*Service) Unpack ¶
Unpack processes a smart contract based on the Ethereum address and network provided via the CLI context. It initializes a new contract instance, discovers essential chain information, and handles the unpacking process. It also includes error handling and panic recovery mechanisms to ensure stability.
func (*Service) UnpackFromEntry ¶
func (s *Service) UnpackFromEntry(ctx context.Context, entry *entries.Entry, state machine.State) (*unpacker.Descriptor, error)
UnpackFromEntry handles the unpacking of a smart contract from a provided Entry instance and specified machine state. It manages contract instance re-initialization if necessary and extracts contract data using the provided context and state machine settings. Includes panic recovery to ensure stability during the unpacking process.
type TransactionInterceptor ¶
type TransactionInterceptor struct { *Service // contains filtered or unexported fields }
TransactionInterceptor encapsulates logic to process blockchain transactions. It intercepts transactions from the blockchain network, managing their initial processing, and forwards relevant data to an AccountInterceptor for further action.
func NewTransactionInterceptor ¶
func NewTransactionInterceptor(ctx context.Context, svc *Service, accountInterceptor *AccountInterceptor, opts *options.Subscriber) (*TransactionInterceptor, error)
NewTransactionInterceptor constructs a new TransactionInterceptor with the necessary dependencies. It requires a Service instance and Subscriber options to configure its operational parameters, including error handling and worker management. Returns an error if any required parameter is invalid or missing.
func (*TransactionInterceptor) Ch ¶
func (i *TransactionInterceptor) Ch() chan *events.Transaction
Ch returns the channel used to receive transaction events for processing.
func (*TransactionInterceptor) QueueTx ¶
func (i *TransactionInterceptor) QueueTx(event *events.Transaction) error
QueueTx enqueues a transaction event into the descriptor channel for processing.
func (*TransactionInterceptor) QueueTxFn ¶
func (i *TransactionInterceptor) QueueTxFn(network utils.Network, networkId utils.NetworkID, direction helpers.SyncDirection) subscribers.TxHookFn
QueueTxFn generates a function bound to a specific network and direction to enqueue transaction data for processing. This function is typically used as a hook in blockchain listeners to automatically forward new transactions to the interceptor.
func (*TransactionInterceptor) Start ¶
func (i *TransactionInterceptor) Start() error
Start initializes the workers to begin processing transactions. It sets up the worker environment and begins listening for incoming transaction events.
func (*TransactionInterceptor) Stop ¶
func (i *TransactionInterceptor) Stop() error
Stop terminates the transaction processing, ensuring a graceful shutdown of workers and closing the event channel.
func (*TransactionInterceptor) Unpack ¶
func (i *TransactionInterceptor) Unpack(event *events.Transaction) error
Unpack processes each transaction, extracting critical information and handling errors. It decodes transaction details, determines transaction sender, and forwards necessary information to the AccountInterceptor. It handles various error conditions and ensures state consistency through logging and state management.