data

package
v0.0.0-...-663aea3 Latest
Warning

This package is not in the latest version of its module.

Published: Jan 24, 2024 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Aggregator

type Aggregator interface {
	// AggregateDataForSlotFromRelays collects data from multiple relays for a specific slot.
	AggregateDataForSlotFromRelays(
		ctx context.Context,
		slot phase0.Slot,
	) (*common.AggregatedRelayData, error)
}

Aggregator is used to aggregate data from multiple relays.

type DefaultAggregator

type DefaultAggregator struct {
	// contains filtered or unexported fields
}

DefaultAggregator is an implementation of Aggregator that uses multiple RelayAPIClient instances (one per relay) to aggregate relay data.

func NewDefaultAggregator

func NewDefaultAggregator() *DefaultAggregator

NewDefaultAggregator creates an empty and non-initialized DefaultAggregator.

func (*DefaultAggregator) AggregateDataForSlotFromRelays

func (aggregator *DefaultAggregator) AggregateDataForSlotFromRelays(
	ctx context.Context,
	slot phase0.Slot,
) (*common.AggregatedRelayData, error)

AggregateDataForSlotFromRelays implements Aggregator.AggregateDataForSlotFromRelays.

It uses a map/reduce procedure: each request made to a relay is dispatched to a distinct worker (the map phase), then the aggregator waits for the results to come back and merges them into a single aggregated value (the reduce phase).

If part of the data collection fails, the aggregator still returns the data that was collected successfully.

func (*DefaultAggregator) Init

func (aggregator *DefaultAggregator) Init(relayAPIClients ...client.RelayAPI)

Init initializes a DefaultAggregator service given multiple clients able to collect data from relays.
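The two-step construction (an empty constructor followed by a variadic Init) might be used as in the sketch below. The types here are stand-ins so that the example compiles on its own: relayAPI, staticRelay, aggregator, and newAggregator are hypothetical, and the URL method is an assumption because the methods of client.RelayAPI are not shown in this documentation.

```go
package main

import "fmt"

// relayAPI is a hypothetical stand-in for client.RelayAPI.
type relayAPI interface {
	URL() string
}

// staticRelay is a trivial relayAPI implementation used for illustration.
type staticRelay string

func (r staticRelay) URL() string { return string(r) }

// aggregator is a stand-in for DefaultAggregator with one unexported field.
type aggregator struct {
	clients []relayAPI
}

// newAggregator mirrors NewDefaultAggregator: it returns an empty value
// that holds no clients until Init is called.
func newAggregator() *aggregator { return &aggregator{} }

// Init mirrors the documented variadic signature and stores the clients.
func (a *aggregator) Init(relayAPIClients ...relayAPI) {
	a.clients = relayAPIClients
}

func main() {
	agg := newAggregator()
	agg.Init(staticRelay("https://relay-a.example"), staticRelay("https://relay-b.example"))
	fmt.Println(len(agg.clients)) // prints 2

	// A slice of clients can be expanded with the ... suffix.
	clients := []relayAPI{staticRelay("https://relay-c.example")}
	agg.Init(clients...)
	fmt.Println(len(agg.clients)) // prints 1
}
```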

type DefaultDataAggregationError

type DefaultDataAggregationError struct {
	// The slot used as a parameter in the request made to the relay.
	Slot phase0.Slot
	// A 1:1 mapping between a relay API URL and the error that has been encountered when making
	// the request to get the data from the relay.
	RelayErrors map[string]error
}

DefaultDataAggregationError is returned if at least one of the requests made to collect data from a relay has encountered an error.

func NewDefaultDataAggregationError

func NewDefaultDataAggregationError() *DefaultDataAggregationError

NewDefaultDataAggregationError creates an empty and non-initialized DefaultDataAggregationError.

func (*DefaultDataAggregationError) Error

func (err *DefaultDataAggregationError) Error() string

func (*DefaultDataAggregationError) RecordFailure

func (err *DefaultDataAggregationError) RecordFailure(relayAPIURL string, relayError error)

RecordFailure is used during the reduce phase of the aggregation process of the DefaultAggregator service. It saves a record of an error that has been encountered when collecting data from a relay.

type Preprocessor

type Preprocessor struct{}

The Preprocessor transforms the raw data acquired by the data aggregator service into a form that the storage manager can save in a database.

func NewPreprocessor

func NewPreprocessor() *Preprocessor

NewPreprocessor creates an empty and non-initialized Preprocessor.

func (*Preprocessor) Preprocess

func (preprocessor *Preprocessor) Preprocess(
	data *common.AggregatedRelayData,
) *common.DataPreprocessorOutput

Preprocess transforms the Aggregator's aggregation output into a data structure that the storage.Manager service can store in the database.
