Indexing Engine
Description
Indexing engine helps to build indexers using simple to use DSL. It's goal is to provide a logical structure to
the process of indexing.
Every indexing pipeline has fixed stages available to hook in to:
Setup stage
(Chore): performs setup tasks
Syncer stage
(Syncing): creates syncable
Fetcher stage
(Syncing): fetches data for indexing
Parser stage
(Syncing): parses and normalizes fetched data to a single structure
Validator stage
(Syncing): validates parsed data
Sequencer stage
(Indexing): Creates sequences from fetched or/and parsed data
Aggregator stage
(Indexing): Creates aggregates from fetched or/and parsed data
Cleanup stage
(Chore): Cleans up after execution
Besides that there are 2 additional components: Source and Sink.
Source
is responsible for providing height iterator for the pipeline and Sink
is gathering output data which can be used after the pipeline is done processing.
Below flow-chart depicts all the available stages that come with this package and order in which they are executed.
Please note that all syncing phase stages are executed in sequence, whereas indexing phase stages are executed concurrently
in order to speed up the indexing process.
Installation
To install github.com/figment-networks/indexing-engine use:
go get https://github.com/figment-networks/github.com/figment-networks/indexing-engine
Usage
Setting up stages
In order to set a specific stage you can use:
p.Set[StageName]Stage(
pipeline.SyncRunner(NewTask()),
)
As a parameter to p.Set[StageName]Stage
functions you pass in StageRunner instance.
StageRunner is responsible for running individual tasks inside of the stage.
This package provides 2 types of StageRunners:
SyncRunner
- executes tasks one by one
AsyncRunner
- executes tasks concurrently
If you want to use your own method of running task inside of stage, you can easily
create your own implementation of StageRunner and pass it in to p.Set[StageName]Stage
.
Adding custom stages
If you want to perform some action on but provided stages are not good logic fit for it, you can always add
custom stages BEFORE or AFTER existing ones. In order to do that you can use:
AddStageBefore
- adds stage before provided existing stage
AddStageAfter
- adds stage after provided existing stage
Below is an example showing how you can add custom stage (as a func) after Fetcher stage
const (
CustomStageName = "AfterFetcher"
)
afterFetcherFunc := pipeline.StageRunnerFunc(func(ctx context.Context, p pipeline.Payload, f pipeline.TaskValidator) error {
//...
return nil
})
p.AddStageBefore(pipeline.StageFetcher, CustomStageName, afterFetcherFunc)
Retrying
github.com/figment-networks/indexing-engine provides 2 types of retrying mechanisms:
RetryingStageRunner
- which is responsible for retrying the entire stage if error occurred
RetryingTask
- which is responsible for retrying individual tasks if it return error
In order to implement retrying mechanism you need to wrap stage or task with above functions.
Here is an example of use of RetryingTask
:
p.SetFetcherStage(
pipeline.AsyncRunner(
pipeline.RetryingTask(NewFetcherTask(), func(err error) bool {
// Make error always transient for simplicity
return true
}, 3),
),
)
Selective execution
Indexing engine provides you with options to run stages and individual indexing tasks selectively.
You have 4 options you can use for this purpose:
StagesWhitelist
- list of stages to execute
StagesBlacklist
- list of stages to NOT execute
IndexingTasksWhitelist
- list of indexing tasks to execute
IndexingTasksBlacklist
- list of indexing tasks to NOT execute
In order to use above options you have to use setOptions
method of pipeline like so:
p.SetOptions(&pipeline.Options{
IndexingTasksWhitelist: []string{"SequencerTask"},
})
Above example would run only SequencerTask
during indexing process. It is useful if you want to reindex the data but you only care about specific set of data.
Examples
In /examples
folder you can find an example of a pipeline. To run it use:
go run example/main.go
To-Dos:
- Collects stats for evert stage and every task
- Add context cancellation