execout

package
v1.7.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 30, 2024 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrNotFound = errors.New("inputs module value not found")

Functions

This section is empty.

Types

type Buffer

type Buffer struct {
	// contains filtered or unexported fields
}

Buffer holds the values produced by modules and exchanged between them as a sort of buffer. Here are the types of exec outputs per module type:

               values         valuesForFileOutput
---------------------------------------------------
store:         deltas               kvops
mapper:         data              same data
index:          keys                 --

func NewBuffer

func NewBuffer(blockType string, block *pbbstream.Block, clock *pbsubstreams.Clock) (*Buffer, error)

func (*Buffer) Clock

func (i *Buffer) Clock() *pbsubstreams.Clock

func (*Buffer) Get

func (i *Buffer) Get(moduleName string) (value []byte, cached bool, err error)

func (*Buffer) Len added in v1.4.0

func (i *Buffer) Len() (out int)

func (*Buffer) Set

func (i *Buffer) Set(moduleName string, value []byte) (err error)

func (*Buffer) SetFileOutput added in v1.6.0

func (i *Buffer) SetFileOutput(moduleName string, value []byte) (err error)

type Config

type Config struct {
	// contains filtered or unexported fields
}

func NewConfig

func NewConfig(name string, moduleInitialBlock uint64, modKind pbsubstreams.ModuleKind, moduleHash string, baseStore dstore.Store, logger *zap.Logger) (*Config, error)

func (*Config) ListSnapshotFiles

func (c *Config) ListSnapshotFiles(ctx context.Context, inRange *bstream.Range) (files FileInfos, err error)

func (*Config) ModuleInitialBlock

func (c *Config) ModuleInitialBlock() uint64

func (*Config) ModuleKind

func (c *Config) ModuleKind() pbsubstreams.ModuleKind

func (*Config) Name

func (c *Config) Name() string

func (*Config) NewFile

func (c *Config) NewFile(targetRange *block.Range) *File

func (*Config) ReadFile added in v1.4.0

func (c *Config) ReadFile(ctx context.Context, inrange *block.Range) (*File, error)

type Configs

type Configs struct {
	ConfigMap map[string]*Config
	// contains filtered or unexported fields
}

func NewConfigs

func NewConfigs(baseObjectStore dstore.Store, allRequestedModules []*pbsubstreams.Module, moduleHashes *manifest.ModuleHashes, execOutputSaveInterval uint64, logger *zap.Logger) (*Configs, error)

func (*Configs) NewFile

func (c *Configs) NewFile(moduleName string, targetRange *block.Range) *File

func (*Configs) NewFileWalker added in v1.1.9

func (c *Configs) NewFileWalker(moduleName string, segmenter *block.Segmenter) *FileWalker

type ExecutionOutput

type ExecutionOutput interface {
	ExecutionOutputGetter
	ExecutionOutputSetter
}

ExecutionOutput gets/sets execution output for a given graph at a given block

type ExecutionOutputGetter

type ExecutionOutputGetter interface {
	Len() int
	Clock() *pbsubstreams.Clock
	Get(name string) (value []byte, cached bool, err error)
}

type ExecutionOutputSetter

type ExecutionOutputSetter interface {
	Set(name string, value []byte) (err error)
	SetFileOutput(name string, value []byte) (err error)
}

type File

type File struct {
	sync.RWMutex
	*block.Range

	ModuleName string
	Kv         map[string]*pboutput.Item
	// contains filtered or unexported fields
}

A File in `execout` stores, for a given module (with a given hash), the outputs of module execution for _multiple blocks_, based on their block ID.

func (*File) ExtractClocks added in v1.5.3

func (c *File) ExtractClocks(clocksMap map[uint64]*pbsubstreams.Clock)

func (*File) Filename

func (c *File) Filename() string

func (*File) Get

func (c *File) Get(clock *pbsubstreams.Clock) ([]byte, bool)

func (*File) GetAtBlock

func (c *File) GetAtBlock(blockNumber uint64) ([]byte, bool)

func (*File) Load

func (c *File) Load(ctx context.Context) error

func (*File) MarshalLogObject

func (c *File) MarshalLogObject(enc zapcore.ObjectEncoder) error

func (*File) Save

func (c *File) Save(ctx context.Context) error

func (*File) SetItem

func (c *File) SetItem(clock *pbsubstreams.Clock, data []byte)

func (*File) SortedItems

func (c *File) SortedItems() (out []*pboutput.Item)

func (*File) String

func (c *File) String() string

type FileInfo

type FileInfo struct {
	Filename   string
	BlockRange *block.Range
}

type FileInfos

type FileInfos = []*FileInfo

type FileWalker added in v1.1.9

type FileWalker struct {
	IsLocal bool
	// contains filtered or unexported fields
}

FileWalker allows you to jump from file to file, from segment to segment

func NewFileWalker added in v1.6.2

func NewFileWalker(c *Config, segmenter *block.Segmenter, logger *zap.Logger) *FileWalker

func (*FileWalker) File added in v1.1.9

func (fw *FileWalker) File() *File

File returns the current segment's file. If the current segment is out of ranges, returns nil.

func (*FileWalker) IsDone added in v1.1.9

func (fw *FileWalker) IsDone() bool

func (*FileWalker) Next added in v1.1.9

func (fw *FileWalker) Next()

Move to the next

func (*FileWalker) PreloadNext added in v1.6.0

func (fw *FileWalker) PreloadNext(ctx context.Context)

PreloadNext loads the next file in the background so the consumer doesn't wait between each file. This affects maximum throughput

func (*FileWalker) Progress added in v1.3.2

func (fw *FileWalker) Progress() (first, current, last int)

type Writer

type Writer struct {
	CurrentFile *File
	// contains filtered or unexported fields
}

The Writer writes a single file with executionOutputs that will be read by the LinearExecOutReader. `initialBlockBoundary` is expected to be on a boundary, or to be the module's initial block.

func NewWriter

func NewWriter(initialBlockBoundary, exclusiveEndBlock uint64, outputModule string, configs *Configs, isWriterForIndex bool) *Writer

func (*Writer) Close

func (w *Writer) Close(ctx context.Context) error

func (*Writer) Write

func (w *Writer) Write(clock *pbsubstreams.Clock, buffer *Buffer)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL