pipeline

package
Version: v0.0.21
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 31, 2022 | License: Apache-2.0 | Imports: 29 | Imported by: 2

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DeltaGetter added in v0.0.20

type DeltaGetter interface {
	GetDeltas() []*pbsubstreams.StoreDelta
}

type ForkHandler added in v0.0.20

type ForkHandler struct {
	// contains filtered or unexported fields
}

func NewForkHandler added in v0.0.21

func NewForkHandler() *ForkHandler

type Option

type Option func(p *Pipeline)

func WithPostBlockHook

func WithPostBlockHook(f substreams.BlockHook) Option

func WithPostJobHook

func WithPostJobHook(f substreams.PostJobHook) Option

func WithPreBlockHook

func WithPreBlockHook(f substreams.BlockHook) Option

func WithSyncBlockRangeRestriction

func WithSyncBlockRangeRestriction(maxRangeSize uint64) Option

type Pipeline

type Pipeline struct {
	StoreMap store.Map
	// contains filtered or unexported fields
}

func New

func New(
	ctx context.Context,
	graph *manifest.ModuleGraph,
	blockType string,
	wasmExtensions []wasm.WASMExtensioner,
	execOutputCache execout.CacheEngine,
	runtimeConfig config.RuntimeConfig,
	bounder *StoreBoundary,
	respFunc func(resp *pbsubstreams.Response) error,
	opts ...Option,
) *Pipeline

func (*Pipeline) Init added in v0.0.14

func (p *Pipeline) Init(ctx context.Context) (err error)

func (*Pipeline) Launch added in v0.0.21

func (p *Pipeline) Launch(ctx context.Context, stream Streamable, streamSrv Trailable) error

func (*Pipeline) ProcessBlock added in v0.0.14

func (p *Pipeline) ProcessBlock(block *bstream.Block, obj interface{}) (err error)

type PipelineOptioner

type PipelineOptioner interface {
	PipelineOptions(ctx context.Context, request *pbsubstreams.Request) []Option
}

type StoreBoundary added in v0.0.21

type StoreBoundary struct {
	// contains filtered or unexported fields
}

func NewStoreBoundary added in v0.0.21

func NewStoreBoundary(
	interval uint64,
	requestStopBlock uint64,
) *StoreBoundary

func (*StoreBoundary) BumpBoundary added in v0.0.21

func (r *StoreBoundary) BumpBoundary()

func (*StoreBoundary) GetStoreFlushRanges added in v0.0.21

func (r *StoreBoundary) GetStoreFlushRanges(isSubrequest bool, reqStopBlockNum uint64, blockNum uint64) []uint64

func (*StoreBoundary) InitBoundary added in v0.0.21

func (r *StoreBoundary) InitBoundary(blockNum uint64)

func (*StoreBoundary) OverBoundary added in v0.0.21

func (r *StoreBoundary) OverBoundary(blockNUm uint64) bool

type Streamable added in v0.0.21

type Streamable interface {
	Run(ctx context.Context) error
}

type Trailable added in v0.0.21

type Trailable interface {
	SetTrailer(metadata.MD)
}

type UndoHandler added in v0.0.21

type UndoHandler func(clock *pbsubstreams.Clock, moduleOutput *pbsubstreams.ModuleOutput)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL