execout

package
v1.1.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 26, 2023 License: Apache-2.0 Imports: 28 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var NotFound = errors.New("inputs module value not found")

Functions

func ComputeStartBlock

func ComputeStartBlock(startBlock uint64, saveBlockInterval uint64) uint64

Types

type Buffer

type Buffer struct {
	// contains filtered or unexported fields

} // TODO(abourget): rename to `Buffer`

Buffer holds the values produced by modules and exchanged between them as a sort of buffer.

func NewBuffer

func NewBuffer(blockType string, block *bstream.Block, clock *pbsubstreams.Clock) (*Buffer, error)

func (*Buffer) Clock

func (i *Buffer) Clock() *pbsubstreams.Clock

func (*Buffer) Get

func (i *Buffer) Get(moduleName string) (value []byte, cached bool, err error)

func (*Buffer) Set

func (i *Buffer) Set(moduleName string, value []byte) (err error)

type Config

type Config struct {
	// contains filtered or unexported fields
}

func NewConfig

func NewConfig(name string, moduleInitialBlock uint64, modKind pbsubstreams.ModuleKind, moduleHash string, baseStore dstore.Store, logger *zap.Logger) (*Config, error)

func (*Config) ListSnapshotFiles

func (c *Config) ListSnapshotFiles(ctx context.Context) (files FileInfos, err error)

func (*Config) ModuleInitialBlock

func (c *Config) ModuleInitialBlock() uint64

func (*Config) ModuleKind

func (c *Config) ModuleKind() pbsubstreams.ModuleKind

func (*Config) Name

func (c *Config) Name() string

func (*Config) NewFile

func (c *Config) NewFile(targetRange *block.BoundedRange) *File

type Configs

type Configs struct {
	ConfigMap map[string]*Config
	// contains filtered or unexported fields
}

func NewConfigs

func NewConfigs(baseObjectStore dstore.Store, allRequestedModules []*pbsubstreams.Module, moduleHashes *manifest.ModuleHashes, execOutputSaveInterval uint64, logger *zap.Logger) (*Configs, error)

func (*Configs) NewFile

func (c *Configs) NewFile(moduleName string, targetRange *block.BoundedRange) *File

type ExecutionOutput

type ExecutionOutput interface {
	ExecutionOutputGetter
	ExecutionOutputSetter
}

ExecutionOutput gets/sets execution output for a given graph at a given block

type ExecutionOutputGetter

type ExecutionOutputGetter interface {
	Clock() *pbsubstreams.Clock
	Get(name string) (value []byte, cached bool, err error)
}

type ExecutionOutputSetter

type ExecutionOutputSetter interface {
	Set(name string, value []byte) (err error)
}

type File

type File struct {
	sync.RWMutex
	*block.BoundedRange

	ModuleName string
	// contains filtered or unexported fields
}

A File in `execout` stores, for a given module (with a given hash), the outputs of module execution for _multiple blocks_, based on their block ID.

func (*File) Filename

func (c *File) Filename() string

func (*File) Get

func (c *File) Get(clock *pbsubstreams.Clock) ([]byte, bool)

func (*File) GetAtBlock

func (c *File) GetAtBlock(blockNumber uint64) ([]byte, bool)

func (*File) Load

func (c *File) Load(ctx context.Context) error

func (*File) MarshalLogObject

func (c *File) MarshalLogObject(enc zapcore.ObjectEncoder) error

func (*File) NextFile

func (c *File) NextFile() *File

NextFile initializes a new *File pointing to the next boundary, according to `targetRange`.

func (*File) Save

func (c *File) Save(ctx context.Context) (func(), error)

func (*File) SetItem

func (c *File) SetItem(clock *pbsubstreams.Clock, data []byte)

func (*File) SortedItems

func (c *File) SortedItems() (out []*pboutput.Item)

func (*File) String

func (c *File) String() string

type FileInfo

type FileInfo struct {
	Filename   string
	BlockRange *block.Range
}

type FileInfos

type FileInfos = []*FileInfo

type LinearReader

type LinearReader struct {
	*shutter.Shutter
	// contains filtered or unexported fields
}

func NewLinearReader

func NewLinearReader(
	startBlock uint64,
	exclusiveEndBlock uint64,
	module *pbsubstreams.Module,
	firstFile *File,
	responseFunc substreams.ResponseFunc,
	execOutputSaveInterval uint64,
	pendingUndoMessage *pbsubstreamsrpc.Response,
) *LinearReader

func (*LinearReader) Launch

func (r *LinearReader) Launch(ctx context.Context)

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

The Writer is responsible for writing an rotating files containing execution outputs. Those files will then be read by the LinearExecOutReader. `initialBlockBoundary` is expected to be on a boundary, or to be modules' initial blocks.

func NewWriter

func NewWriter(initialBlockBoundary, exclusiveEndBlock uint64, outputModule string, configs *Configs, isSubRequest bool) *Writer

func (*Writer) Close

func (w *Writer) Close()

func (*Writer) MaybeRotate

func (w *Writer) MaybeRotate(ctx context.Context, clockNumber uint64) error

func (*Writer) Write

func (w *Writer) Write(clock *pbsubstreams.Clock, buffer *Buffer)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL