rpcconsumer

package
v0.0.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 24, 2024 License: Apache-2.0 Imports: 49 Imported by: 0

README

Lava Server Kit (RPCConsumer)

This README covers technical aspects of running the Lava Server Kit. For more details visit https://docs.lavanet.xyz/access-server-kit or our Discord

Usage

  1. Clone the repository
  2. cd into the repository folder
  3. Run make install-all
  4. Create a configuration file with the following format:
endpoints:
  - network-address: <network-address>
    chain-id: <chain-id>
    api-interface: <api-interface>
  - network-address: <network-address>
    chain-id: <chain-id>
    api-interface: <api-interface>

The network-address specifies the IP address and port number of the node, chain-id specifies the unique identifier of the blockchain, and api-interface specifies the API interface used by the node.

  1. Start the consumer using the command rpcconsumer --config <path/to/config/file>

Documentation

Index

Constants

View Source
const (
	CacheMaxCost     = 2000  // each item cost would be 1
	CacheNumCounters = 20000 // expect 2000 items
	EntryTTL         = 5 * time.Minute
)

this class handles seen block values in requests

View Source
const (
	DefaultRPCConsumerFileName = "rpcconsumer.yml"
	DebugRelaysFlagName        = "debug-relays"
	DebugProbesFlagName        = "debug-probes"
)
View Source
const (
	MaxCallsPerRelay = 50
)
View Source
const (
	MaxRelayRetries = 6
)

Variables

View Source
var (
	Yaml_config_properties         = []string{"network-address", "chain-id", "api-interface"}
	RelaysHealthEnableFlagDefault  = true
	RelayHealthIntervalFlagDefault = 5 * time.Minute
)
View Source
var NoResponseTimeout = sdkerrors.New("NoResponseTimeout Error", 685, "timeout occurred while waiting for providers responses")

Functions

func CreateRPCConsumerCobraCommand

func CreateRPCConsumerCobraCommand() *cobra.Command

func CreateTestRPCConsumerCobraCommand

func CreateTestRPCConsumerCobraCommand() *cobra.Command

func ParseEndpoints

func ParseEndpoints(viper_endpoints *viper.Viper, geolocation uint64) (endpoints []*lavasession.RPCEndpoint, err error)

Types

type AnalyticsServerAddressess

type AnalyticsServerAddressess struct {
	MetricsListenAddress string
	RelayServerAddress   string
	ReportsAddressFlag   string
}

type ConsumerConsistency

type ConsumerConsistency struct {
	// contains filtered or unexported fields
}

func NewConsumerConsistency

func NewConsumerConsistency(specId string) *ConsumerConsistency

func (*ConsumerConsistency) GetSeenBlock

func (cc *ConsumerConsistency) GetSeenBlock(dappId string, ip string) (int64, bool)

func (*ConsumerConsistency) Key

func (cc *ConsumerConsistency) Key(dappId string, ip string) string

func (*ConsumerConsistency) SetSeenBlock

func (cc *ConsumerConsistency) SetSeenBlock(blockSeen int64, dappId string, ip string)

type ConsumerStateTrackerInf

type ConsumerStateTrackerInf interface {
	RegisterForVersionUpdates(ctx context.Context, version *protocoltypes.Version, versionValidator updaters.VersionValidationInf)
	RegisterConsumerSessionManagerForPairingUpdates(ctx context.Context, consumerSessionManager *lavasession.ConsumerSessionManager)
	RegisterForSpecUpdates(ctx context.Context, specUpdatable updaters.SpecUpdatable, endpoint lavasession.RPCEndpoint) error
	RegisterFinalizationConsensusForUpdates(context.Context, *lavaprotocol.FinalizationConsensus)
	RegisterForDowntimeParamsUpdates(ctx context.Context, downtimeParamsUpdatable updaters.DowntimeParamsUpdatable) error
	TxConflictDetection(ctx context.Context, finalizationConflict *conflicttypes.FinalizationConflict, responseConflict *conflicttypes.ResponseConflict, sameProviderConflict *conflicttypes.FinalizationConflict, conflictHandler common.ConflictHandlerInterface) error
	GetConsumerPolicy(ctx context.Context, consumerAddress, chainID string) (*plantypes.Policy, error)
	GetProtocolVersion(ctx context.Context) (*updaters.ProtocolVersionResponse, error)
	GetLatestVirtualEpoch() uint64
}

type ConsumerTxSender

type ConsumerTxSender interface {
	TxConflictDetection(ctx context.Context, finalizationConflict *conflicttypes.FinalizationConflict, responseConflict *conflicttypes.ResponseConflict, sameProviderConflict *conflicttypes.FinalizationConflict, conflictHandler common.ConflictHandlerInterface) error
	GetConsumerPolicy(ctx context.Context, consumerAddress, chainID string) (*plantypes.Policy, error)
	GetLatestVirtualEpoch() uint64
}

type RPCConsumer

type RPCConsumer struct {
	// contains filtered or unexported fields
}

func (*RPCConsumer) Start

func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOptions) (err error)

spawns a new RPCConsumer server with all it's processes and internals ready for communications

type RPCConsumerServer

type RPCConsumerServer struct {
	ConsumerAddress sdk.AccAddress
	// contains filtered or unexported fields
}

implements Relay Sender interfaced and uses an ChainListener to get it called

func (*RPCConsumerServer) HandleDirectiveHeadersForMessage

func (rpccs *RPCConsumerServer) HandleDirectiveHeadersForMessage(chainMessage chainlib.ChainMessage, directiveHeaders map[string]string)

func (*RPCConsumerServer) IsHealthy

func (rpccs *RPCConsumerServer) IsHealthy() bool

func (*RPCConsumerServer) LavaDirectiveHeaders

func (rpccs *RPCConsumerServer) LavaDirectiveHeaders(metadata []pairingtypes.Metadata) ([]pairingtypes.Metadata, map[string]string)

func (*RPCConsumerServer) ProcessRelaySend

func (rpccs *RPCConsumerServer) ProcessRelaySend(ctx context.Context, directiveHeaders map[string]string, chainMessage chainlib.ChainMessage, relayRequestData *pairingtypes.RelayPrivateData, dappID string, consumerIp string) (*RelayProcessor, error)

func (*RPCConsumerServer) SendRelay

func (rpccs *RPCConsumerServer) SendRelay(
	ctx context.Context,
	url string,
	req string,
	connectionType string,
	dappID string,
	consumerIp string,
	analytics *metrics.RelayMetrics,
	metadata []pairingtypes.Metadata,
) (relayResult *common.RelayResult, errRet error)

func (*RPCConsumerServer) ServeRPCRequests

func (rpccs *RPCConsumerServer) ServeRPCRequests(ctx context.Context, listenEndpoint *lavasession.RPCEndpoint,
	consumerStateTracker ConsumerStateTrackerInf,
	chainParser chainlib.ChainParser,
	finalizationConsensus *lavaprotocol.FinalizationConsensus,
	consumerSessionManager *lavasession.ConsumerSessionManager,
	requiredResponses int,
	privKey *btcec.PrivateKey,
	lavaChainID string,
	cache *performance.Cache,
	rpcConsumerLogs *metrics.RPCConsumerLogs,
	consumerAddress sdk.AccAddress,
	consumerConsistency *ConsumerConsistency,
	relaysMonitor *metrics.RelaysMonitor,
	cmdFlags common.ConsumerCmdFlags,
	sharedState bool,
	refererData *chainlib.RefererData,
	reporter metrics.Reporter,
) (err error)

type RelayError

type RelayError struct {
	ProviderInfo common.ProviderInfo
	// contains filtered or unexported fields
}

TODO: there's no need to save error twice and provider info twice, this can just be a relayResponse

func (RelayError) String

func (re RelayError) String() string

type RelayErrors

type RelayErrors struct {
	// contains filtered or unexported fields
}

func (*RelayErrors) GetBestErrorMessageForUser

func (r *RelayErrors) GetBestErrorMessageForUser() RelayError

type RelayProcessor

type RelayProcessor struct {
	// contains filtered or unexported fields
}

func NewRelayProcessor

func NewRelayProcessor(ctx context.Context, usedProviders *lavasession.UsedProviders, requiredSuccesses int, chainMessage chainlib.ChainMessage, consumerConsistency *ConsumerConsistency, dappID string, consumerIp string) *RelayProcessor

func (*RelayProcessor) GetUsedProviders

func (rp *RelayProcessor) GetUsedProviders() *lavasession.UsedProviders

func (*RelayProcessor) HasRequiredNodeResults

func (rp *RelayProcessor) HasRequiredNodeResults() bool

func (*RelayProcessor) HasResults

func (rp *RelayProcessor) HasResults() bool

this function defines if we should use the processor to return the result (meaning it has some insight and responses) or just return to the user

func (*RelayProcessor) NodeResults

func (rp *RelayProcessor) NodeResults() []common.RelayResult

this function returns all results that came from a node, meaning success, and node errors

func (*RelayProcessor) ProcessingResult

func (rp *RelayProcessor) ProcessingResult() (returnedResult *common.RelayResult, processingError error)

this function returns the results according to the defined strategy results were stored in WaitForResults and now there's logic to select which results are returned to the user will return an error if we did not meet quota of replies, if we did we follow the strategies: if return strategy == get_first: return the first success, if none: get best node error if strategy == quorum get majority of node responses on error: we will return a placeholder relayResult, with a provider address and a status code

func (*RelayProcessor) ProtocolErrors

func (rp *RelayProcessor) ProtocolErrors() uint64

func (*RelayProcessor) SetResponse

func (rp *RelayProcessor) SetResponse(response *relayResponse)

func (*RelayProcessor) String

func (rp *RelayProcessor) String() string

func (*RelayProcessor) WaitForResults

func (rp *RelayProcessor) WaitForResults(ctx context.Context) error

this function waits for the processing results, they are written by multiple go routines and read by this go routine it then updates the responses in their respective place, node errors, protocol errors or success results

type Selection

type Selection int
const (
	Quorum     Selection = iota // get the majority out of requiredSuccesses
	BestResult                  // get the best result, even if it means waiting
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL