Documentation ¶
Index ¶
- Constants
- Variables
- func CreateRPCConsumerCobraCommand() *cobra.Command
- func CreateTestRPCConsumerCobraCommand() *cobra.Command
- func ParseEndpoints(viper_endpoints *viper.Viper, geolocation uint64) (endpoints []*lavasession.RPCEndpoint, err error)
- type AnalyticsServerAddressess
- type ConsumerConsistency
- type ConsumerStateTrackerInf
- type ConsumerTxSender
- type RPCConsumer
- type RPCConsumerServer
- func (rpccs *RPCConsumerServer) HandleDirectiveHeadersForMessage(chainMessage chainlib.ChainMessage, directiveHeaders map[string]string)
- func (rpccs *RPCConsumerServer) IsHealthy() bool
- func (rpccs *RPCConsumerServer) LavaDirectiveHeaders(metadata []pairingtypes.Metadata) ([]pairingtypes.Metadata, map[string]string)
- func (rpccs *RPCConsumerServer) ProcessRelaySend(ctx context.Context, directiveHeaders map[string]string, ...) (*RelayProcessor, error)
- func (rpccs *RPCConsumerServer) SendRelay(ctx context.Context, url string, req string, connectionType string, ...) (relayResult *common.RelayResult, errRet error)
- func (rpccs *RPCConsumerServer) ServeRPCRequests(ctx context.Context, listenEndpoint *lavasession.RPCEndpoint, ...) (err error)
- type RelayError
- type RelayErrors
- type RelayProcessor
- func (rp *RelayProcessor) GetUsedProviders() *lavasession.UsedProviders
- func (rp *RelayProcessor) HasRequiredNodeResults() bool
- func (rp *RelayProcessor) HasResults() bool
- func (rp *RelayProcessor) NodeResults() []common.RelayResult
- func (rp *RelayProcessor) ProcessingResult() (returnedResult *common.RelayResult, processingError error)
- func (rp *RelayProcessor) ProtocolErrors() uint64
- func (rp *RelayProcessor) SetResponse(response *relayResponse)
- func (rp *RelayProcessor) String() string
- func (rp *RelayProcessor) WaitForResults(ctx context.Context) error
- type Selection
Constants ¶
const ( CacheMaxCost = 2000 // each item cost would be 1 CacheNumCounters = 20000 // expect 2000 items EntryTTL = 5 * time.Minute )
this class handles seen block values in requests
const ( DefaultRPCConsumerFileName = "rpcconsumer.yml" DebugRelaysFlagName = "debug-relays" DebugProbesFlagName = "debug-probes" )
const ( MaxRelayRetries = 6 SendRelayAttempts = 3 )
const (
MaxCallsPerRelay = 50
)
Variables ¶
var ( Yaml_config_properties = []string{"network-address", "chain-id", "api-interface"} RelaysHealthEnableFlagDefault = true RelayHealthIntervalFlagDefault = 5 * time.Minute )
var NoResponseTimeout = sdkerrors.New("NoResponseTimeout Error", 685, "timeout occurred while waiting for providers responses")
Functions ¶
func ParseEndpoints ¶
func ParseEndpoints(viper_endpoints *viper.Viper, geolocation uint64) (endpoints []*lavasession.RPCEndpoint, err error)
Types ¶
type ConsumerConsistency ¶
type ConsumerConsistency struct {
// contains filtered or unexported fields
}
func NewConsumerConsistency ¶
func NewConsumerConsistency(specId string) *ConsumerConsistency
func (*ConsumerConsistency) GetSeenBlock ¶
func (cc *ConsumerConsistency) GetSeenBlock(dappId string, ip string) (int64, bool)
func (*ConsumerConsistency) Key ¶
func (cc *ConsumerConsistency) Key(dappId string, ip string) string
func (*ConsumerConsistency) SetSeenBlock ¶
func (cc *ConsumerConsistency) SetSeenBlock(blockSeen int64, dappId string, ip string)
type ConsumerStateTrackerInf ¶
type ConsumerStateTrackerInf interface { RegisterForVersionUpdates(ctx context.Context, version *protocoltypes.Version, versionValidator updaters.VersionValidationInf) RegisterConsumerSessionManagerForPairingUpdates(ctx context.Context, consumerSessionManager *lavasession.ConsumerSessionManager) RegisterForSpecUpdates(ctx context.Context, specUpdatable updaters.SpecUpdatable, endpoint lavasession.RPCEndpoint) error RegisterFinalizationConsensusForUpdates(context.Context, *lavaprotocol.FinalizationConsensus) RegisterForDowntimeParamsUpdates(ctx context.Context, downtimeParamsUpdatable updaters.DowntimeParamsUpdatable) error TxConflictDetection(ctx context.Context, finalizationConflict *conflicttypes.FinalizationConflict, responseConflict *conflicttypes.ResponseConflict, sameProviderConflict *conflicttypes.FinalizationConflict, conflictHandler common.ConflictHandlerInterface) error GetConsumerPolicy(ctx context.Context, consumerAddress, chainID string) (*plantypes.Policy, error) GetProtocolVersion(ctx context.Context) (*updaters.ProtocolVersionResponse, error) GetLatestVirtualEpoch() uint64 }
type ConsumerTxSender ¶
type ConsumerTxSender interface { TxConflictDetection(ctx context.Context, finalizationConflict *conflicttypes.FinalizationConflict, responseConflict *conflicttypes.ResponseConflict, sameProviderConflict *conflicttypes.FinalizationConflict, conflictHandler common.ConflictHandlerInterface) error GetConsumerPolicy(ctx context.Context, consumerAddress, chainID string) (*plantypes.Policy, error) GetLatestVirtualEpoch() uint64 }
type RPCConsumer ¶
type RPCConsumer struct {
// contains filtered or unexported fields
}
type RPCConsumerServer ¶
type RPCConsumerServer struct { ConsumerAddress sdk.AccAddress // contains filtered or unexported fields }
implements Relay Sender interfaced and uses an ChainListener to get it called
func (*RPCConsumerServer) HandleDirectiveHeadersForMessage ¶
func (rpccs *RPCConsumerServer) HandleDirectiveHeadersForMessage(chainMessage chainlib.ChainMessage, directiveHeaders map[string]string)
func (*RPCConsumerServer) IsHealthy ¶
func (rpccs *RPCConsumerServer) IsHealthy() bool
func (*RPCConsumerServer) LavaDirectiveHeaders ¶
func (rpccs *RPCConsumerServer) LavaDirectiveHeaders(metadata []pairingtypes.Metadata) ([]pairingtypes.Metadata, map[string]string)
func (*RPCConsumerServer) ProcessRelaySend ¶
func (rpccs *RPCConsumerServer) ProcessRelaySend(ctx context.Context, directiveHeaders map[string]string, chainMessage chainlib.ChainMessage, relayRequestData *pairingtypes.RelayPrivateData, dappID string, consumerIp string) (*RelayProcessor, error)
func (*RPCConsumerServer) SendRelay ¶
func (rpccs *RPCConsumerServer) SendRelay( ctx context.Context, url string, req string, connectionType string, dappID string, consumerIp string, analytics *metrics.RelayMetrics, metadata []pairingtypes.Metadata, ) (relayResult *common.RelayResult, errRet error)
func (*RPCConsumerServer) ServeRPCRequests ¶
func (rpccs *RPCConsumerServer) ServeRPCRequests(ctx context.Context, listenEndpoint *lavasession.RPCEndpoint, consumerStateTracker ConsumerStateTrackerInf, chainParser chainlib.ChainParser, finalizationConsensus *lavaprotocol.FinalizationConsensus, consumerSessionManager *lavasession.ConsumerSessionManager, requiredResponses int, privKey *btcec.PrivateKey, lavaChainID string, cache *performance.Cache, rpcConsumerLogs *metrics.RPCConsumerLogs, consumerAddress sdk.AccAddress, consumerConsistency *ConsumerConsistency, relaysMonitor *metrics.RelaysMonitor, cmdFlags common.ConsumerCmdFlags, sharedState bool, refererData *chainlib.RefererData, reporter metrics.Reporter, ) (err error)
type RelayError ¶
type RelayError struct { ProviderInfo common.ProviderInfo // contains filtered or unexported fields }
TODO: there's no need to save error twice and provider info twice, this can just be a relayResponse
func (RelayError) String ¶
func (re RelayError) String() string
type RelayErrors ¶
type RelayErrors struct {
// contains filtered or unexported fields
}
func (*RelayErrors) GetBestErrorMessageForUser ¶
func (r *RelayErrors) GetBestErrorMessageForUser() RelayError
type RelayProcessor ¶
type RelayProcessor struct {
// contains filtered or unexported fields
}
func NewRelayProcessor ¶
func NewRelayProcessor(ctx context.Context, usedProviders *lavasession.UsedProviders, requiredSuccesses int, chainMessage chainlib.ChainMessage, consumerConsistency *ConsumerConsistency, dappID string, consumerIp string) *RelayProcessor
func (*RelayProcessor) GetUsedProviders ¶
func (rp *RelayProcessor) GetUsedProviders() *lavasession.UsedProviders
func (*RelayProcessor) HasRequiredNodeResults ¶
func (rp *RelayProcessor) HasRequiredNodeResults() bool
func (*RelayProcessor) HasResults ¶
func (rp *RelayProcessor) HasResults() bool
this function defines if we should use the processor to return the result (meaning it has some insight and responses) or just return to the user
func (*RelayProcessor) NodeResults ¶
func (rp *RelayProcessor) NodeResults() []common.RelayResult
this function returns all results that came from a node, meaning success, and node errors
func (*RelayProcessor) ProcessingResult ¶
func (rp *RelayProcessor) ProcessingResult() (returnedResult *common.RelayResult, processingError error)
this function returns the results according to the defined strategy results were stored in WaitForResults and now there's logic to select which results are returned to the user will return an error if we did not meet quota of replies, if we did we follow the strategies: if return strategy == get_first: return the first success, if none: get best node error if strategy == quorum get majority of node responses on error: we will return a placeholder relayResult, with a provider address and a status code
func (*RelayProcessor) ProtocolErrors ¶
func (rp *RelayProcessor) ProtocolErrors() uint64
func (*RelayProcessor) SetResponse ¶
func (rp *RelayProcessor) SetResponse(response *relayResponse)
func (*RelayProcessor) String ¶
func (rp *RelayProcessor) String() string
func (*RelayProcessor) WaitForResults ¶
func (rp *RelayProcessor) WaitForResults(ctx context.Context) error
this function waits for the processing results, they are written by multiple go routines and read by this go routine it then updates the responses in their respective place, node errors, protocol errors or success results