Documentation ¶
Index ¶
- Constants
- Variables
- func NewAccountActivitySeqCreatorTask(cfg *config.Config) *accountActivitySeqCreatorTask
- func NewAccountActivitySeqPersistorTask(accountActivitySeqDb store.AccountActivitySeq) pipeline.Task
- func NewBackfillSource(cfg *config.Config, client client.Client, syncableDb store.Syncables, ...) (*backfillSource, error)
- func NewBlockDownloaderTask() pipeline.Task
- func NewBlockFetcherTask(client figmentclient.Client) pipeline.Task
- func NewBlockSeqCreatorTask(blockSeqDb store.BlockSeq) *blockSeqCreatorTask
- func NewBlockSeqPersistorTask(blockSeqDb store.BlockSeq) pipeline.Task
- func NewBlockUploaderTask() pipeline.Task
- func NewConfigParser(file string) (*configParser, error)
- func NewGovernanceActivitySeqCreatorTask(cfg *config.Config) *governanceActivitySeqCreatorTask
- func NewGovernanceActivitySeqPersistorTask(governanceActivitySeqDb store.GovernanceActivitySeq) pipeline.Task
- func NewGovernanceLogsParserTask() *governanceLogsParserTask
- func NewHeightMetaDownloaderTask() pipeline.Task
- func NewHeightMetaFetcherTask(client figmentclient.Client) pipeline.Task
- func NewHeightMetaUploaderTask() pipeline.Task
- func NewIndexSource(cfg *config.Config, client figmentclient.Client, syncableDb store.Syncables, ...) (*indexSource, error)
- func NewLogger() pipeline.Logger
- func NewMainSyncerTask(syncableDb store.Syncables) pipeline.Task
- func NewPayloadFactory(dl *datalake.DataLake) *payloadFactory
- func NewPipeline(cfg *config.Config, client figmentclient.Client, dl *datalake.DataLake, ...) (*indexingPipeline, error)
- func NewProposalAggCreatorTask(proposalAggDb store.ProposalAgg) *proposalAggCreatorTask
- func NewProposalAggPersistorTask(proposalAggDb store.ProposalAgg) pipeline.Task
- func NewSetupTask(c figmentclient.Client) *setupTask
- func NewSink(syncableDb store.Syncables, databaseDb store.Database, c figmentclient.Client, ...) *sink
- func NewSyncerPersistorTask(syncableDb store.Syncables) pipeline.Task
- func NewSystemEventCreatorTask(cfg *config.Config, validatorSeqDb store.ValidatorSeq, ...) *systemEventCreatorTask
- func NewSystemEventPersistorTask(systemEventDb store.SystemEvents) pipeline.Task
- func NewTransactionDownloaderTask() pipeline.Task
- func NewTransactionFetcherTask(client figmentclient.Client) pipeline.Task
- func NewTransactionUploaderTask() pipeline.Task
- func NewValidatorAggCreatorTask(validatorAggDb store.ValidatorAgg) *validatorAggCreatorTask
- func NewValidatorAggPersistorTask(validatorAggDb store.ValidatorAgg) pipeline.Task
- func NewValidatorDownloaderTask() pipeline.Task
- func NewValidatorFetcherTask(client figmentclient.Client) pipeline.Task
- func NewValidatorGroupAggCreatorTask(validatorGroupAggDb store.ValidatorGroupAgg) *validatorGroupAggCreatorTask
- func NewValidatorGroupAggPersistorTask(validatorGroupAggDb store.ValidatorGroupAgg) pipeline.Task
- func NewValidatorGroupDownloaderTask() pipeline.Task
- func NewValidatorGroupFetcherTask(client figmentclient.Client) pipeline.Task
- func NewValidatorGroupSeqCreatorTask(cfg *config.Config) *validatorGroupSeqCreatorTask
- func NewValidatorGroupSeqPersistorTask(validatorGroupSeqDb store.ValidatorGroupSeq) pipeline.Task
- func NewValidatorGroupUploaderTask() pipeline.Task
- func NewValidatorSeqCreatorTask(cfg *config.Config) *validatorSeqCreatorTask
- func NewValidatorSeqPersistorTask(validatorSeqDb store.ValidatorSeq) pipeline.Task
- func NewValidatorUploaderTask() pipeline.Task
- func RunFetcherPipeline(height int64, client figmentclient.Client, dl *datalake.DataLake) error
- func ToAccountActivitySequence(syncable *model.Syncable, rawTransactions []*figmentclient.Transaction) ([]model.AccountActivitySeq, error)
- func ToBlockSequence(syncable *model.Syncable, rawBlock *figmentclient.Block) (*model.BlockSeq, error)
- func ToGovernanceActivitySequence(syncable *model.Syncable, parsedGovernanceLogs []*ParsedGovernanceLogs) ([]model.GovernanceActivitySeq, error)
- func ToValidatorGroupSequence(syncable *model.Syncable, rawValidatorGroups []*figmentclient.ValidatorGroup, ...) ([]model.ValidatorGroupSeq, error)
- func ToValidatorSequence(syncable *model.Syncable, rawValidators []*figmentclient.Validator) ([]model.ValidatorSeq, error)
- type BackfillConfig
- type BlockDownloaderTask
- type BlockFetcherTask
- type BlockUploaderTask
- type ConfigParser
- type HeightMeta
- type HeightMetaDownloaderTask
- type HeightMetaFetcherTask
- type HeightMetaUploaderTask
- type IndexConfig
- type IndexSourceConfig
- type ParsedGovernanceLogs
- type PipelineSyncableStore
- type RunConfig
- type TransactionsDownloaderTask
- type TransactionsFetcherTask
- type TransactionsUploaderTask
- type ValidatorGroupsDownloaderTask
- type ValidatorGroupsFetcherTask
- type ValidatorGroupsUploaderTask
- type ValidatorsDownloaderTask
- type ValidatorsFetcherTask
- type ValidatorsUploaderTask
Constants ¶
const ( ValidatorAggCreatorTaskName = "ValidatorAggCreator" ValidatorGroupAggCreatorTaskName = "ValidatorGroupAggCreator" ProposalAggCreatorTaskName = "ProposalAggCreator" )
const ( TargetIndexBlockSequences = iota + 1 TargetIndexValidatorSequences TargetIndexValidatorGroupSequences TargetIndexValidatorAggregates TargetIndexValidatorGroupAggregates )
const ( TaskNameHeightMetaDownloader = "HeightMetaDownloader" TaskNameBlockDownloader = "BlockDownloader" TaskNameValidatorsDownloader = "ValidatorsDownloader" TaskNameValidatorGroupsDownloader = "ValidatorGroupsDownloader" TaskNameTransactionsDownloader = "TransactionsDownloader" )
const ( TaskNameHeightMetaFetcher = "HeightMetaFetcher" TaskNameBlockFetcher = "BlockFetcher" TaskNameValidatorsFetcher = "ValidatorsFetcher" TaskNameValidatorGroupsFetcher = "ValidatorGroupsFetcher" TaskNameTransactionsFetcher = "TransactionsFetcher" )
const ( SyncerPersistorTaskName = "SyncerPersistor" BlockSeqPersistorTaskName = "BlockSeqPersistor" ValidatorSeqPersistorTaskName = "ValidatorSeqPersistor" ValidatorGroupSeqPersistorTaskName = "ValidatorGroupSeqPersistor" AccountActivitySeqPersistorTaskName = "AccountActivitySeqPersistor" ValidatorAggPersistorTaskName = "ValidatorAggPersistor" ValidatorGroupAggPersistorTaskName = "ValidatorGroupAggPersistor" ProposalAggPersistorTaskName = "ProposalAggPersistor" TaskNameSystemEventPersistor = "SystemEventPersistor" GovernanceActivitySeqPersistorTaskName = "GovernanceActivitySeqPersistor" )
const ( BlockSeqCreatorTaskName = "BlockSeqCreator" ValidatorSeqCreatorTaskName = "ValidatorSeqCreator" ValidatorGroupSeqCreatorTaskName = "ValidatorGroupSeqCreator" AccountActivitySeqCreatorTaskName = "AccountActivitySeqCreator" GovernanceActivitySeqCreatorTaskName = "GovernanceActivitySeqCreator" )
const ( TaskNameHeightMetaUploader = "HeightMetaUploader" TaskNameBlockUploader = "BlockUploader" TaskNameValidatorsUploader = "ValidatorsUploader" TaskNameValidatorGroupsUploader = "ValidatorGroupsUploader" TaskNameTransactionsUploader = "TransactionsUploader" )
const (
CtxReport = "context_report"
)
const (
MainSyncerTaskName = "MainSyncer"
)
const (
TaskNameGovernanceLogsParser = "GovernanceLogsParser"
)
const (
TaskNameSetup = "Setup"
)
const (
TaskNameSystemEventCreator = "SystemEventCreator"
)
Variables ¶
var ( ErrGroupRewardOutsideOfRange = errors.New("group reward is outside of specified buckets") MaxValidatorSequences int64 = 1000 MissedForMaxThreshold int64 = 50 MissedInRowThreshold int64 = 50 )
var ( OperationTypeInternalTransferReceived = fmt.Sprintf("%sReceived", figmentclient.OperationTypeInternalTransfer) OperationTypeInternalTransferSent = fmt.Sprintf("%sSent", figmentclient.OperationTypeInternalTransfer) OperationTypeValidatorGroupVoteCastReceived = fmt.Sprintf("%sReceived", figmentclient.OperationTypeValidatorGroupVoteCast) OperationTypeValidatorGroupVoteCastSent = fmt.Sprintf("%sSent", figmentclient.OperationTypeValidatorGroupVoteCast) OperationTypeValidatorGroupVoteActivatedReceived = fmt.Sprintf("%sReceived", figmentclient.OperationTypeValidatorGroupVoteActivated) OperationTypeValidatorGroupVoteActivatedSent = fmt.Sprintf("%sSent", figmentclient.OperationTypeValidatorGroupVoteActivated) OperationTypeValidatorGroupPendingVoteRevokedReceived = fmt.Sprintf("%sReceived", figmentclient.OperationTypeValidatorGroupPendingVoteRevoked) OperationTypeValidatorGroupPendingVoteRevokedSent = fmt.Sprintf("%sSent", figmentclient.OperationTypeValidatorGroupPendingVoteRevoked) OperationTypeValidatorGroupActiveVoteRevokedReceived = fmt.Sprintf("%sReceived", figmentclient.OperationTypeValidatorGroupActiveVoteRevoked) OperationTypeValidatorGroupActiveVoteRevokedSent = fmt.Sprintf("%sSent", figmentclient.OperationTypeValidatorGroupActiveVoteRevoked) OperationTypeValidatorEpochPaymentDistributedForGroup = fmt.Sprintf("%sForGroup", figmentclient.OperationTypeValidatorEpochPaymentDistributed) OperationTypeValidatorEpochPaymentDistributedForValidator = fmt.Sprintf("%sForValidator", figmentclient.OperationTypeValidatorEpochPaymentDistributed) ErrBlockSequenceNotValid = errors.New("block sequence not valid") ErrValidatorSequenceNotValid = errors.New("validator sequence not valid") ErrValidatorGroupSequenceNotValid = errors.New("validator group sequence not valid") )
var ( StageAnalyzer pipeline.StageName = "stage_analyzer" ErrIsPristine = errors.New("cannot run because database is empty") ErrIndexCannotBeRun = errors.New("cannot run index process") ErrBackfillCannotBeRun = errors.New("cannot run backfill process") )
var (
ErrNothingToProcess = errors.New("nothing to process")
)
Functions ¶
func NewAccountActivitySeqCreatorTask ¶ added in v0.1.0
NewAccountActivitySeqCreatorTask creates account activity sequences
func NewAccountActivitySeqPersistorTask ¶ added in v0.1.0
func NewAccountActivitySeqPersistorTask(accountActivitySeqDb store.AccountActivitySeq) pipeline.Task
NewAccountActivitySeqPersistorTask is responsible for storing validator info to persistence layer
func NewBackfillSource ¶
func NewBlockDownloaderTask ¶ added in v0.2.0
func NewBlockFetcherTask ¶
func NewBlockFetcherTask(client figmentclient.Client) pipeline.Task
func NewBlockSeqCreatorTask ¶
NewBlockSeqCreatorTask creates block sequences
func NewBlockSeqPersistorTask ¶
NewBlockSeqPersistorTask is responsible for storing block to persistence layer
func NewBlockUploaderTask ¶ added in v0.2.0
func NewConfigParser ¶
func NewGovernanceActivitySeqCreatorTask ¶ added in v0.1.0
NewGovernanceActivitySeqCreatorTask creates account activity sequences
func NewGovernanceActivitySeqPersistorTask ¶ added in v0.1.0
func NewGovernanceActivitySeqPersistorTask(governanceActivitySeqDb store.GovernanceActivitySeq) pipeline.Task
NewGovernanceActivitySeqPersistorTask is responsible for storing validator info to persistence layer
func NewGovernanceLogsParserTask ¶ added in v0.1.0
func NewGovernanceLogsParserTask() *governanceLogsParserTask
NewGovernanceLogsParserTask parses transaction logs to data about governance
func NewHeightMetaDownloaderTask ¶ added in v0.2.0
func NewHeightMetaFetcherTask ¶ added in v0.2.0
func NewHeightMetaFetcherTask(client figmentclient.Client) pipeline.Task
func NewHeightMetaUploaderTask ¶ added in v0.2.0
func NewIndexSource ¶
func NewIndexSource(cfg *config.Config, client figmentclient.Client, syncableDb store.Syncables, jobsDb store.Jobs, sourceCfg *IndexSourceConfig) (*indexSource, error)
func NewPayloadFactory ¶
func NewPipeline ¶
func NewPipeline( cfg *config.Config, client figmentclient.Client, dl *datalake.DataLake, syncableDb store.Syncables, jobsDb store.Jobs, databaseDb store.Database, reportsDb store.Reports, blockSeqDb store.BlockSeq, validatorSeqDb store.ValidatorSeq, accountActivitySeqDb store.AccountActivitySeq, validatorGroupSeqDb store.ValidatorGroupSeq, validatorAggDb store.ValidatorAgg, validatorGroupAggDb store.ValidatorGroupAgg, proposalAggDb store.ProposalAgg, systemEventDb store.SystemEvents, governanceActivitySeqDb store.GovernanceActivitySeq, ) (*indexingPipeline, error)
func NewProposalAggCreatorTask ¶ added in v0.1.0
func NewProposalAggCreatorTask(proposalAggDb store.ProposalAgg) *proposalAggCreatorTask
func NewProposalAggPersistorTask ¶ added in v0.1.0
func NewProposalAggPersistorTask(proposalAggDb store.ProposalAgg) pipeline.Task
NewProposalAggPersistorTask psql validator aggregate to persistence layer
func NewSetupTask ¶ added in v0.2.0
func NewSetupTask(c figmentclient.Client) *setupTask
func NewSyncerPersistorTask ¶
NewSyncerPersistorTask is responsible for storing syncable to persistence layer
func NewSystemEventCreatorTask ¶ added in v0.1.0
func NewSystemEventCreatorTask(cfg *config.Config, validatorSeqDb store.ValidatorSeq, accountActivitySeqDb store.AccountActivitySeq, validatorGroupSeqDb store.ValidatorGroupSeq) *systemEventCreatorTask
NewSystemEventCreatorTask creates system events
func NewSystemEventPersistorTask ¶ added in v0.1.0
func NewSystemEventPersistorTask(systemEventDb store.SystemEvents) pipeline.Task
NewSystemEventPersistorTask psql system events to persistance layer
func NewTransactionDownloaderTask ¶ added in v0.2.0
func NewTransactionFetcherTask ¶ added in v0.1.0
func NewTransactionFetcherTask(client figmentclient.Client) pipeline.Task
func NewTransactionUploaderTask ¶ added in v0.2.0
func NewValidatorAggCreatorTask ¶
func NewValidatorAggCreatorTask(validatorAggDb store.ValidatorAgg) *validatorAggCreatorTask
func NewValidatorAggPersistorTask ¶
func NewValidatorAggPersistorTask(validatorAggDb store.ValidatorAgg) pipeline.Task
NewValidatorAggPersistorTask store validator aggregate to persistence layer
func NewValidatorDownloaderTask ¶ added in v0.2.0
func NewValidatorFetcherTask ¶
func NewValidatorFetcherTask(client figmentclient.Client) pipeline.Task
func NewValidatorGroupAggCreatorTask ¶
func NewValidatorGroupAggCreatorTask(validatorGroupAggDb store.ValidatorGroupAgg) *validatorGroupAggCreatorTask
func NewValidatorGroupAggPersistorTask ¶
func NewValidatorGroupAggPersistorTask(validatorGroupAggDb store.ValidatorGroupAgg) pipeline.Task
NewValidatorGroupAggPersistorTask psql validator group aggregate to persistence layer
func NewValidatorGroupDownloaderTask ¶ added in v0.2.0
func NewValidatorGroupFetcherTask ¶
func NewValidatorGroupFetcherTask(client figmentclient.Client) pipeline.Task
func NewValidatorGroupSeqCreatorTask ¶
NewValidatorGroupSeqCreatorTask creates validator era sequences
func NewValidatorGroupSeqPersistorTask ¶
func NewValidatorGroupSeqPersistorTask(validatorGroupSeqDb store.ValidatorGroupSeq) pipeline.Task
NewValidatorGroupSeqPersistorTask is responsible for storing validator era info to persistence layer
func NewValidatorGroupUploaderTask ¶ added in v0.2.0
func NewValidatorSeqCreatorTask ¶
NewValidatorSeqCreatorTask creates validator sequences
func NewValidatorSeqPersistorTask ¶
func NewValidatorSeqPersistorTask(validatorSeqDb store.ValidatorSeq) pipeline.Task
NewValidatorSeqPersistorTask is responsible for storing validator info to persistence layer
func NewValidatorUploaderTask ¶ added in v0.2.0
func RunFetcherPipeline ¶ added in v0.2.0
func ToAccountActivitySequence ¶ added in v0.1.0
func ToAccountActivitySequence(syncable *model.Syncable, rawTransactions []*figmentclient.Transaction) ([]model.AccountActivitySeq, error)
func ToBlockSequence ¶
func ToGovernanceActivitySequence ¶ added in v0.1.0
func ToGovernanceActivitySequence(syncable *model.Syncable, parsedGovernanceLogs []*ParsedGovernanceLogs) ([]model.GovernanceActivitySeq, error)
func ToValidatorGroupSequence ¶
func ToValidatorGroupSequence(syncable *model.Syncable, rawValidatorGroups []*figmentclient.ValidatorGroup, rawValidators []*figmentclient.Validator) ([]model.ValidatorGroupSeq, error)
func ToValidatorSequence ¶
func ToValidatorSequence(syncable *model.Syncable, rawValidators []*figmentclient.Validator) ([]model.ValidatorSeq, error)
Types ¶
type BackfillConfig ¶
type BlockDownloaderTask ¶ added in v0.2.0
type BlockDownloaderTask struct{}
func (*BlockDownloaderTask) GetName ¶ added in v0.2.0
func (t *BlockDownloaderTask) GetName() string
type BlockFetcherTask ¶
type BlockFetcherTask struct {
// contains filtered or unexported fields
}
func (*BlockFetcherTask) GetName ¶
func (t *BlockFetcherTask) GetName() string
type BlockUploaderTask ¶ added in v0.2.0
type BlockUploaderTask struct{}
func (*BlockUploaderTask) GetName ¶ added in v0.2.0
func (t *BlockUploaderTask) GetName() string
type ConfigParser ¶
type ConfigParser interface { GetCurrentVersionId() int64 GetAllVersionedVersionIds() []int64 IsAnyVersionSequential(versionIds []int64) bool GetAllAvailableTasks() []pipeline.TaskName GetAllVersionedTasks() ([]pipeline.TaskName, error) GetTasksByVersionIds([]int64) ([]pipeline.TaskName, error) GetTasksByTargetIds([]int64) ([]pipeline.TaskName, error) }
type HeightMeta ¶
type HeightMetaDownloaderTask ¶ added in v0.2.0
type HeightMetaDownloaderTask struct{}
func (*HeightMetaDownloaderTask) GetName ¶ added in v0.2.0
func (t *HeightMetaDownloaderTask) GetName() string
type HeightMetaFetcherTask ¶ added in v0.2.0
type HeightMetaFetcherTask struct {
// contains filtered or unexported fields
}
func (*HeightMetaFetcherTask) GetName ¶ added in v0.2.0
func (t *HeightMetaFetcherTask) GetName() string
type HeightMetaUploaderTask ¶ added in v0.2.0
type HeightMetaUploaderTask struct{}
func (*HeightMetaUploaderTask) GetName ¶ added in v0.2.0
func (t *HeightMetaUploaderTask) GetName() string
type IndexConfig ¶
type IndexSourceConfig ¶
type ParsedGovernanceLogs ¶ added in v0.1.0
type PipelineSyncableStore ¶
type TransactionsDownloaderTask ¶ added in v0.2.0
type TransactionsDownloaderTask struct{}
func (*TransactionsDownloaderTask) GetName ¶ added in v0.2.0
func (t *TransactionsDownloaderTask) GetName() string
type TransactionsFetcherTask ¶ added in v0.1.0
type TransactionsFetcherTask struct {
// contains filtered or unexported fields
}
func (*TransactionsFetcherTask) GetName ¶ added in v0.1.0
func (t *TransactionsFetcherTask) GetName() string
type TransactionsUploaderTask ¶ added in v0.2.0
type TransactionsUploaderTask struct{}
func (*TransactionsUploaderTask) GetName ¶ added in v0.2.0
func (t *TransactionsUploaderTask) GetName() string
type ValidatorGroupsDownloaderTask ¶ added in v0.2.0
type ValidatorGroupsDownloaderTask struct{}
func (*ValidatorGroupsDownloaderTask) GetName ¶ added in v0.2.0
func (t *ValidatorGroupsDownloaderTask) GetName() string
type ValidatorGroupsFetcherTask ¶
type ValidatorGroupsFetcherTask struct {
// contains filtered or unexported fields
}
func (*ValidatorGroupsFetcherTask) GetName ¶
func (t *ValidatorGroupsFetcherTask) GetName() string
type ValidatorGroupsUploaderTask ¶ added in v0.2.0
type ValidatorGroupsUploaderTask struct{}
func (*ValidatorGroupsUploaderTask) GetName ¶ added in v0.2.0
func (t *ValidatorGroupsUploaderTask) GetName() string
type ValidatorsDownloaderTask ¶ added in v0.2.0
type ValidatorsDownloaderTask struct{}
func (*ValidatorsDownloaderTask) GetName ¶ added in v0.2.0
func (t *ValidatorsDownloaderTask) GetName() string
type ValidatorsFetcherTask ¶
type ValidatorsFetcherTask struct {
// contains filtered or unexported fields
}
func (*ValidatorsFetcherTask) GetName ¶
func (t *ValidatorsFetcherTask) GetName() string
type ValidatorsUploaderTask ¶ added in v0.2.0
type ValidatorsUploaderTask struct{}
func (*ValidatorsUploaderTask) GetName ¶ added in v0.2.0
func (t *ValidatorsUploaderTask) GetName() string
Source Files ¶
- aggregator_tasks.go
- analyzer_tasks.go
- config_parser.go
- downloader_tasks.go
- fetcher_tasks.go
- logger.go
- mappers.go
- parser_tasks.go
- payload.go
- persistor_tasks.go
- pipeline.go
- pipeline_options_creator.go
- pipeline_status_checker.go
- report_creator.go
- sequencer_tasks.go
- setup_tasks.go
- sink.go
- source_backfill.go
- source_index.go
- syncer_tasks.go
- uploader_tasks.go