Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Aggregator ¶
type Aggregator interface { // AggregateDataForSlotFromRelays collects data from multiple relays for a specific slot. AggregateDataForSlotFromRelays( ctx context.Context, slot phase0.Slot, ) (*common.AggregatedRelayData, error) }
Aggregator is used to aggregate data from multiple relays.
type DefaultAggregator ¶
type DefaultAggregator struct {
// contains filtered or unexported fields
}
The DefaultAggregator is an implementation of the Aggregator which uses multiple RelayAPIClient (one per relay) to aggregate relay data.
func NewDefaultAggregator ¶
func NewDefaultAggregator() *DefaultAggregator
NewDefaultAggregator creates an empty and non-initialized DefaultAggregator.
func (*DefaultAggregator) AggregateDataForSlotFromRelays ¶
func (aggregator *DefaultAggregator) AggregateDataForSlotFromRelays( ctx context.Context, slot phase0.Slot, ) (*common.AggregatedRelayData, error)
AggregateDataForSlotFromRelays implements Aggregator.AggregateDataForSlotFromRelays.
It uses a map/reduce procedure to distribute each request made to the relays to distinct workers (the map phase). Then, it waits for the results to come back and aggregates those in a single placeholder (the reduce phase).
In the scenario where part of the data collection has failed, the aggregator still returns the successfully collected data.
func (*DefaultAggregator) Init ¶
func (aggregator *DefaultAggregator) Init(relayAPIClients ...client.RelayAPI)
Init initializes an DefaultAggregator service given multiple clients able to collect data from relays.
type DefaultDataAggregationError ¶
type DefaultDataAggregationError struct { // The slot used as a parameter in the request made to the relay. Slot phase0.Slot // A 1:1 mapping between a relay API URL and the error that has been encountered when making // the request to get the data from the relay. RelayErrors map[string]error }
DefaultDataAggregationError is raised if at least one of the request made to collect data from a relay has encountered an error.
func NewDefaultDataAggregationError ¶
func NewDefaultDataAggregationError() *DefaultDataAggregationError
NewDefaultDataAggregationError creates an empty and non-initialized DefaultDataAggregationError.
func (*DefaultDataAggregationError) Error ¶
func (err *DefaultDataAggregationError) Error() string
func (*DefaultDataAggregationError) RecordFailure ¶
func (err *DefaultDataAggregationError) RecordFailure(relayAPIURL string, relayError error)
RecordFailure is used during the reduce phase of the aggregation process of the AgateDataAggregator service. It saves a record of an error that has been encountered when collecting data from a relay.
type Preprocessor ¶
type Preprocessor struct{}
The Preprocessor is used to transform the raw data acquired by the data aggregator service, which is then used by the storage manager to save it in a database.
func NewPreprocessor ¶
func NewPreprocessor() *Preprocessor
NewPreprocessor creates an empty and non-initialized Preprocessor.
func (*Preprocessor) Preprocess ¶
func (preprocessor *Preprocessor) Preprocess( data *common.AggregatedRelayData, ) *common.DataPreprocessorOutput
Preprocess transforms the Aggregator's aggregation output into a data structure that the storage.Manager service can store in the database.