Documentation ¶
Index ¶
- Constants
- Variables
- func PrintRPCEndpoint(endpoint *RPCEndpoint) (retStr string)
- func PrintRPCProviderEndpoint(endpoint *RPCProviderEndpoint) (retStr string)
- type ConsumerSessionManager
- func (csm *ConsumerSessionManager) GetAtomicPairingAddressesLength() uint64
- func (csm *ConsumerSessionManager) GetDataReliabilitySession(ctx context.Context, originalProviderAddress string, index int64, ...) (singleConsumerSession *SingleConsumerSession, providerAddress string, ...)
- func (csm *ConsumerSessionManager) GetReportedProviders(epoch uint64) ([]byte, error)
- func (csm *ConsumerSessionManager) GetSession(ctx context.Context, cuNeededForSession uint64, ...) (consumerSession *SingleConsumerSession, epoch uint64, ...)
- func (csm *ConsumerSessionManager) GetSessionFromAllExcept(ctx context.Context, bannedAddresses map[string]struct{}, cuNeeded uint64, ...) (consumerSession *SingleConsumerSession, epoch uint64, ...)
- func (csm *ConsumerSessionManager) OnDataReliabilitySessionDone(consumerSession *SingleConsumerSession, latestServicedBlock int64, ...) error
- func (csm *ConsumerSessionManager) OnDataReliabilitySessionFailure(consumerSession *SingleConsumerSession, errorReceived error) error
- func (csm *ConsumerSessionManager) OnSessionDone(consumerSession *SingleConsumerSession, epoch uint64, ...) error
- func (csm *ConsumerSessionManager) OnSessionDoneIncreaseRelayAndCu(consumerSession *SingleConsumerSession) error
- func (csm *ConsumerSessionManager) OnSessionFailure(consumerSession *SingleConsumerSession, errorReceived error) error
- func (csm *ConsumerSessionManager) OnSessionUnUsed(consumerSession *SingleConsumerSession) error
- func (csm *ConsumerSessionManager) RPCEndpoint() RPCEndpoint
- func (csm *ConsumerSessionManager) UpdateAllProviders(epoch uint64, pairingList []*ConsumerSessionsWithProvider) error
- type ConsumerSessionsWithProvider
- type DataReliabilitySession
- type Endpoint
- type ProviderSessionManager
- func (psm *ProviderSessionManager) GetDataReliabilitySession(address string, epoch uint64) (err error)
- func (psm *ProviderSessionManager) GetSession(address string, id uint64, epoch uint64, relayNum uint64, sessionId uint64) (*SingleProviderSession, error)
- func (psm *ProviderSessionManager) IsActiveConsumer(epoch uint64, address string) (active bool, err error)
- func (psm *ProviderSessionManager) IsValidEpoch(epoch uint64) bool
- func (psm *ProviderSessionManager) OnSessionDone(proof string) (epoch uint64, err error)
- func (psm *ProviderSessionManager) OnSessionFailure() (epoch uint64, err error)
- func (psm *ProviderSessionManager) RPCProviderEndpoint() *RPCProviderEndpoint
- func (psm *ProviderSessionManager) ReportConsumer() (address string, epoch uint64, err error)
- func (psm *ProviderSessionManager) UpdateEpoch(epoch uint64)
- type ProviderSessionsEpochData
- type ProviderSessionsWithConsumer
- type RPCEndpoint
- type RPCProviderEndpoint
- type SingleConsumerSession
- type SingleProviderSession
- type StateQuery
Constants ¶
const ( MaxConsecutiveConnectionAttempts = 10 TimeoutForEstablishingAConnection = 1 * time.Second MaxSessionsAllowedPerProvider = 1000 // Max number of sessions allowed per provider MaxAllowedBlockListedSessionPerProvider = 3 MaximumNumberOfFailuresAllowedPerConsumerSession = 3 RelayNumberIncrement = 1 DataReliabilitySessionId = 0 // data reliability session id is 0. we can change to more sessions later if needed. DataReliabilityCuSum = 0 GeolocationFlag = "geolocation" )
const ( PercentileToCalculateLatency = 0.9 MinProvidersForSync = 0.6 LatencyThresholdStatic = 1 * time.Second LatencyThresholdSlope = 1 * time.Millisecond StaleEpochDistance = 3 // relays done 3 epochs back are ready to be rewarded )
Variables ¶
var ( PairingListEmptyError = sdkerrors.New("pairingListEmpty Error", 665, "No pairings available.") // client could not connect to any provider. UnreachableCodeError = sdkerrors.New("UnreachableCode Error", 666, "Should not get here.") AllProviderEndpointsDisabledError = sdkerrors.New("AllProviderEndpointsDisabled Error", 667, "All endpoints are not available.") // a provider is completely unresponsive all endpoints are not available MaximumNumberOfSessionsExceededError = sdkerrors.New("MaximumNumberOfSessionsExceeded Error", 668, "Provider reached maximum number of active sessions.") MaxComputeUnitsExceededError = sdkerrors.New("MaxComputeUnitsExceeded Error", 669, "Consumer is trying to exceed the maximum number of compute uints available.") EpochMismatchError = sdkerrors.New("ReportingAnOldEpoch Error", 670, "Tried to Report to an older epoch") AddressIndexWasNotFoundError = sdkerrors.New("AddressIndexWasNotFound Error", 671, "address index was not found in list") LockMisUseDetectedError = sdkerrors.New("LockMisUseDetected Error", 672, "Faulty use of locks detected") SessionIsAlreadyBlockListedError = sdkerrors.New("SessionIsAlreadyBlockListed Error", 673, "Session is already in block list") NegativeComputeUnitsAmountError = sdkerrors.New("NegativeComputeUnitsAmount", 674, "Tried to subtract to negative compute units amount") ReportAndBlockProviderError = sdkerrors.New("ReportAndBlockProvider Error", 675, "Report and block the provider") BlockProviderError = sdkerrors.New("BlockProvider Error", 676, "Block the provider") SessionOutOfSyncError = sdkerrors.New("SessionOutOfSync Error", 677, "Session went out of sync with the provider") MaximumNumberOfBlockListedSessionsError = sdkerrors.New("MaximumNumberOfBlockListedSessions Error", 678, "Provider reached maximum number of block listed sessions.") SendRelayError = sdkerrors.New("SendRelay Error", 679, "Failed To Send Relay") DataReliabilityIndexRequestedIsOriginalProviderError = 
sdkerrors.New("DataReliabilityIndexRequestedIsOriginalProvider Error", 680, "Data reliability session index belongs to the original provider") DataReliabilityIndexOutOfRangeError = sdkerrors.New("DataReliabilityIndexOutOfRange Error", 681, "Trying to get provider index out of range") DataReliabilityAlreadySentThisEpochError = sdkerrors.New("DataReliabilityAlreadySentThisEpoch Error", 682, "Trying to send data reliability more than once per provider per epoch") FailedToConnectToEndPointForDataReliabilityError = sdkerrors.New("FailedToConnectToEndPointForDataReliability Error", 683, "Failed to connect to a providers endpoints") DataReliabilityEpochMismatchError = sdkerrors.New("DataReliabilityEpochMismatch Error", 684, "Data reliability epoch mismatch original session epoch.") )
var ( InvalidEpochError = sdkerrors.New("InvalidEpoch Error", 881, "Requested Epoch Is Too Old") NewSessionWithRelayNumError = sdkerrors.New("NewSessionWithRelayNum Error", 882, "Requested Session With Relay Number Is Invalid") ConsumerIsBlockListed = sdkerrors.New("ConsumerIsBlockListed Error", 883, "This Consumer Is Blocked.") ConsumerNotActive = sdkerrors.New("ConsumerNotActive Error", 884, "This Consumer Is Not Active.") )
var AvailabilityPercentage sdk.Dec = sdk.NewDecWithPrec(5, 2) // TODO move to params pairing
Functions ¶
func PrintRPCEndpoint ¶ added in v0.5.1
func PrintRPCEndpoint(endpoint *RPCEndpoint) (retStr string)
func PrintRPCProviderEndpoint ¶ added in v0.5.1
func PrintRPCProviderEndpoint(endpoint *RPCProviderEndpoint) (retStr string)
Types ¶
type ConsumerSessionManager ¶
type ConsumerSessionManager struct {
// contains filtered or unexported fields
}
ConsumerSessionManager is created with NewConsumerSessionManager.
func NewConsumerSessionManager ¶
func NewConsumerSessionManager(rpcEndpoint *RPCEndpoint) *ConsumerSessionManager
func (*ConsumerSessionManager) GetAtomicPairingAddressesLength ¶
func (csm *ConsumerSessionManager) GetAtomicPairingAddressesLength() uint64
Atomically read csm.pairingAddressesLength for data reliability.
func (*ConsumerSessionManager) GetDataReliabilitySession ¶ added in v0.5.1
func (csm *ConsumerSessionManager) GetDataReliabilitySession(ctx context.Context, originalProviderAddress string, index int64, sessionEpoch uint64) (singleConsumerSession *SingleConsumerSession, providerAddress string, epoch uint64, err error)
Get a Data Reliability Session
func (*ConsumerSessionManager) GetReportedProviders ¶
func (csm *ConsumerSessionManager) GetReportedProviders(epoch uint64) ([]byte, error)
Get the reported providers currently stored in the session manager.
func (*ConsumerSessionManager) GetSession ¶ added in v0.5.1
func (csm *ConsumerSessionManager) GetSession(ctx context.Context, cuNeededForSession uint64, initUnwantedProviders map[string]struct{}) ( consumerSession *SingleConsumerSession, epoch uint64, providerPublicAddress string, reportedProviders []byte, errRet error, )
GetSession will return a ConsumerSession, given cu needed for that session. The user can also request specific providers to not be included in the search for a session.
func (*ConsumerSessionManager) GetSessionFromAllExcept ¶ added in v0.5.1
func (csm *ConsumerSessionManager) GetSessionFromAllExcept(ctx context.Context, bannedAddresses map[string]struct{}, cuNeeded uint64, bannedAddressesEpoch uint64) (consumerSession *SingleConsumerSession, epoch uint64, providerPublicAddress string, reportedProviders []byte, err error)
Get a session from the pool, excluding the specified providers; the epoch is also validated.
func (*ConsumerSessionManager) OnDataReliabilitySessionDone ¶ added in v0.5.1
func (csm *ConsumerSessionManager) OnDataReliabilitySessionDone(consumerSession *SingleConsumerSession, latestServicedBlock int64, specComputeUnits uint64, currentLatency time.Duration, expectedBH int64, numOfProviders int, providersCount uint64, ) error
On a successful DataReliability session we don't need to increase or update any field; we just need to unlock the session.
func (*ConsumerSessionManager) OnDataReliabilitySessionFailure ¶ added in v0.5.1
func (csm *ConsumerSessionManager) OnDataReliabilitySessionFailure(consumerSession *SingleConsumerSession, errorReceived error) error
On a failed DataReliability session we don't decrease the CU, unlike a normal session; we just unlock the session and check whether we need to block this session or provider.
func (*ConsumerSessionManager) OnSessionDone ¶
func (csm *ConsumerSessionManager) OnSessionDone( consumerSession *SingleConsumerSession, epoch uint64, latestServicedBlock int64, specComputeUnits uint64, currentLatency time.Duration, expectedBH int64, numOfProviders int, providersCount uint64, ) error
On a successful session this function will update all necessary fields in the consumerSession and unlock it when it finishes.
func (*ConsumerSessionManager) OnSessionDoneIncreaseRelayAndCu ¶ added in v0.5.1
func (csm *ConsumerSessionManager) OnSessionDoneIncreaseRelayAndCu(consumerSession *SingleConsumerSession) error
Called on a successful Subscribe relay.
func (*ConsumerSessionManager) OnSessionFailure ¶
func (csm *ConsumerSessionManager) OnSessionFailure(consumerSession *SingleConsumerSession, errorReceived error) error
Report a session failure, mark the session as blocked from future use, and report if a timeout happened.
func (*ConsumerSessionManager) OnSessionUnUsed ¶ added in v0.5.1
func (csm *ConsumerSessionManager) OnSessionUnUsed(consumerSession *SingleConsumerSession) error
A session can be created but left unused if the consumer found the response in the cache, so we need to unlock the session and decrease the CU that was applied.
func (*ConsumerSessionManager) RPCEndpoint ¶
func (csm *ConsumerSessionManager) RPCEndpoint() RPCEndpoint
func (*ConsumerSessionManager) UpdateAllProviders ¶
func (csm *ConsumerSessionManager) UpdateAllProviders(epoch uint64, pairingList []*ConsumerSessionsWithProvider) error
Update the provider pairing list for the ConsumerSessionManager
type ConsumerSessionsWithProvider ¶
type ConsumerSessionsWithProvider struct { Lock utils.LavaMutex PublicLavaAddress string Endpoints []*Endpoint Sessions map[int64]*SingleConsumerSession MaxComputeUnits uint64 UsedComputeUnits uint64 ReliabilitySent bool PairingEpoch uint64 }
func (*ConsumerSessionsWithProvider) GetPairingEpoch ¶
func (cswp *ConsumerSessionsWithProvider) GetPairingEpoch() uint64
type DataReliabilitySession ¶
type DataReliabilitySession struct { SingleConsumerSession *SingleConsumerSession Epoch uint64 ProviderPublicAddress string UniqueIdentifier bool }
type Endpoint ¶
type Endpoint struct { NetworkAddress string // change at the end to NetworkAddress Enabled bool Client *pairingtypes.RelayerClient ConnectionRefusals uint64 }
type ProviderSessionManager ¶
type ProviderSessionManager struct {
// contains filtered or unexported fields
}
func NewProviderSessionManager ¶
func NewProviderSessionManager(rpcProviderEndpoint *RPCProviderEndpoint, stateQuery StateQuery) *ProviderSessionManager
Returns a new provider session manager.
func (*ProviderSessionManager) GetDataReliabilitySession ¶ added in v0.5.1
func (psm *ProviderSessionManager) GetDataReliabilitySession(address string, epoch uint64) (err error)
func (*ProviderSessionManager) GetSession ¶
func (psm *ProviderSessionManager) GetSession(address string, id uint64, epoch uint64, relayNum uint64, sessionId uint64) (*SingleProviderSession, error)
func (*ProviderSessionManager) IsActiveConsumer ¶ added in v0.5.1
func (psm *ProviderSessionManager) IsActiveConsumer(epoch uint64, address string) (active bool, err error)
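Check if the consumer exists and is not blocked; if all is valid, return the ProviderSessionsWithConsumer pointer. (Note: the signature shown for this method returns (active bool, err error), so this description appears to be out of date.)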
func (*ProviderSessionManager) IsValidEpoch ¶
func (psm *ProviderSessionManager) IsValidEpoch(epoch uint64) bool
func (*ProviderSessionManager) OnSessionDone ¶
func (psm *ProviderSessionManager) OnSessionDone(proof string) (epoch uint64, err error)
func (*ProviderSessionManager) OnSessionFailure ¶
func (psm *ProviderSessionManager) OnSessionFailure() (epoch uint64, err error)
func (*ProviderSessionManager) RPCProviderEndpoint ¶
func (psm *ProviderSessionManager) RPCProviderEndpoint() *RPCProviderEndpoint
func (*ProviderSessionManager) ReportConsumer ¶
func (psm *ProviderSessionManager) ReportConsumer() (address string, epoch uint64, err error)
func (*ProviderSessionManager) UpdateEpoch ¶
func (psm *ProviderSessionManager) UpdateEpoch(epoch uint64)
type ProviderSessionsWithConsumer ¶ added in v0.5.1
type ProviderSessionsWithConsumer struct { Sessions map[uint64]*SingleProviderSession Lock sync.RWMutex // contains filtered or unexported fields }
Holds all of the data for a consumer for a certain epoch.
func (*ProviderSessionsWithConsumer) GetExistingSession ¶ added in v0.5.1
func (pswc *ProviderSessionsWithConsumer) GetExistingSession(sessionId uint64) (session *SingleProviderSession, err error)
type RPCEndpoint ¶
type RPCEndpoint struct { NetworkAddress string `yaml:"network-address,omitempty" json:"network-address,omitempty" mapstructure:"network-address"` // IP:PORT ChainID string `yaml:"chain-id,omitempty" json:"chain-id,omitempty" mapstructure:"chain-id"` // spec chain identifier ApiInterface string `yaml:"api-interface,omitempty" json:"api-interface,omitempty" mapstructure:"api-interface"` Geolocation uint64 `yaml:"geolocation,omitempty" json:"geolocation,omitempty" mapstructure:"geolocation"` }
func (*RPCEndpoint) Key ¶
func (rpce *RPCEndpoint) Key() string
func (*RPCEndpoint) New ¶
func (rpce *RPCEndpoint) New(address string, chainID string, apiInterface string, geolocation uint64) *RPCEndpoint
type RPCProviderEndpoint ¶
type RPCProviderEndpoint struct { NetworkAddress string `yaml:"network-address,omitempty" json:"network-address,omitempty" mapstructure:"network-address"` // IP:PORT ChainID string `yaml:"chain-id,omitempty" json:"chain-id,omitempty" mapstructure:"chain-id"` // spec chain identifier ApiInterface string `yaml:"api-interface,omitempty" json:"api-interface,omitempty" mapstructure:"api-interface"` Geolocation uint64 `yaml:"geolocation,omitempty" json:"geolocation,omitempty" mapstructure:"geolocation"` NodeUrl string `yaml:"node-url,omitempty" json:"node-url,omitempty" mapstructure:"node-url"` }
func (*RPCProviderEndpoint) Key ¶
func (rpcpe *RPCProviderEndpoint) Key() string
type SingleConsumerSession ¶
type SingleConsumerSession struct { CuSum uint64 LatestRelayCu uint64 // set by GetSession cuNeededForSession QoSInfo qoSInfo SessionId int64 Client *ConsumerSessionsWithProvider RelayNum uint64 LatestBlock int64 Endpoint *Endpoint BlockListed bool // if session lost sync we blacklist it. ConsecutiveNumberOfFailures uint64 // number of times this session has failed // contains filtered or unexported fields }
func (*SingleConsumerSession) CalculateQoS ¶
func (*SingleConsumerSession) IsDataReliabilitySession ¶ added in v0.5.1
func (scs *SingleConsumerSession) IsDataReliabilitySession() bool
Validates whether this is a data reliability session.
type SingleProviderSession ¶
type SingleProviderSession struct { CuSum uint64 UniqueIdentifier uint64 Lock sync.RWMutex Proof *pairingtypes.RelayRequest // saves last relay request of a session as proof RelayNum uint64 PairingEpoch uint64 // contains filtered or unexported fields }
func (*SingleProviderSession) GetPairingEpoch ¶
func (r *SingleProviderSession) GetPairingEpoch() uint64
func (*SingleProviderSession) SetPairingEpoch ¶
func (r *SingleProviderSession) SetPairingEpoch(epoch uint64)