lavasession

package
v0.9.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 10, 2023 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Index

Constants

View Source
// Session-management limits, timeouts and sentinel values exported by the package.
// NOTE(review): the per-constant notes below that are marked "presumably" are
// inferred from names only — confirm against the call sites.
const (
	MaxConsecutiveConnectionAttempts                 = 10
	TimeoutForEstablishingAConnection                = 1 * time.Second
	MaxSessionsAllowedPerProvider                    = 1000 // Max number of sessions allowed per provider
	MaxAllowedBlockListedSessionPerProvider          = 3
	MaximumNumberOfFailuresAllowedPerConsumerSession = 3
	RelayNumberIncrement                             = 1
	DataReliabilitySessionId                         = 0 // data reliability session id is 0. we can change to more sessions later if needed.
	DataReliabilityRelayNumber                       = 1
	DataReliabilityCuSum                             = 0
	GeolocationFlag                                  = "geolocation"
	TendermintUnsubscribeAll                         = "unsubscribe_all"
	IndexNotFound                                    = -15 // negative sentinel; presumably returned when an index lookup fails — confirm
	AverageWorldLatency                              = 300 * time.Millisecond
	MinValidAddressesForBlockingProbing              = 2
	BACKOFF_TIME_ON_FAILURE                          = 3 * time.Second // NOTE(review): ALL_CAPS name differs from the MixedCaps used by the other constants in this block
)
View Source
// Latency/sync scoring parameters and the stale-epoch distance.
const (
	PercentileToCalculateLatency = 0.9 // untyped float constant; presumably a percentile expressed as a fraction — confirm
	MinProvidersForSync          = 0.6 // untyped float constant; presumably a fraction of providers, not a count — confirm
	LatencyThresholdStatic       = 1 * time.Second
	LatencyThresholdSlope        = 1 * time.Millisecond
	StaleEpochDistance           = 3 // relays done 3 epochs back are ready to be rewarded

)
View Source
// TRY_LOCK_ATTEMPTS is an untyped integer constant equal to 30.
// NOTE(review): presumably the number of try-lock attempts made before giving
// up — confirm at the call site. The ALL_CAPS name is not idiomatic Go, but it
// is exported and therefore left as is.
const (
	TRY_LOCK_ATTEMPTS = 30
)

Variables

View Source
// Registered errors with codes 665-685. Each sdkerrors.New call takes
// (codespace, code, description); the codespace and code are kept unchanged
// because they identify the error, only a typo in one description is fixed.
// NOTE(review): judging by the descriptions these look like the errors raised
// on the consumer side of a session — confirm.
var (
	PairingListEmptyError                                = sdkerrors.New("pairingListEmpty Error", 665, "No pairings available.") // client could not connect to any provider.
	UnreachableCodeError                                 = sdkerrors.New("UnreachableCode Error", 666, "Should not get here.")
	AllProviderEndpointsDisabledError                    = sdkerrors.New("AllProviderEndpointsDisabled Error", 667, "All endpoints are not available.") // a provider is completely unresponsive all endpoints are not available
	MaximumNumberOfSessionsExceededError                 = sdkerrors.New("MaximumNumberOfSessionsExceeded Error", 668, "Provider reached maximum number of active sessions.")
	MaxComputeUnitsExceededError                         = sdkerrors.New("MaxComputeUnitsExceeded Error", 669, "Consumer is trying to exceed the maximum number of compute units available.") // description typo "uints" corrected
	EpochMismatchError                                   = sdkerrors.New("ReportingAnOldEpoch Error", 670, "Tried to Report to an older epoch") // NOTE(review): codespace string does not match the variable name
	AddressIndexWasNotFoundError                         = sdkerrors.New("AddressIndexWasNotFound Error", 671, "address index was not found in list")
	LockMisUseDetectedError                              = sdkerrors.New("LockMisUseDetected Error", 672, "Faulty use of locks detected")
	SessionIsAlreadyBlockListedError                     = sdkerrors.New("SessionIsAlreadyBlockListed Error", 673, "Session is already in block list")
	NegativeComputeUnitsAmountError                      = sdkerrors.New("NegativeComputeUnitsAmount", 674, "Tried to subtract to negative compute units amount")
	ReportAndBlockProviderError                          = sdkerrors.New("ReportAndBlockProvider Error", 675, "Report and block the provider")
	BlockProviderError                                   = sdkerrors.New("BlockProvider Error", 676, "Block the provider")
	SessionOutOfSyncError                                = sdkerrors.New("SessionOutOfSync Error", 677, "Session went out of sync with the provider")
	MaximumNumberOfBlockListedSessionsError              = sdkerrors.New("MaximumNumberOfBlockListedSessions Error", 678, "Provider reached maximum number of block listed sessions.")
	SendRelayError                                       = sdkerrors.New("SendRelay Error", 679, "Failed To Send Relay")
	DataReliabilityIndexRequestedIsOriginalProviderError = sdkerrors.New("DataReliabilityIndexRequestedIsOriginalProvider Error", 680, "Data reliability session index belongs to the original provider")
	DataReliabilityIndexOutOfRangeError                  = sdkerrors.New("DataReliabilityIndexOutOfRange Error", 681, "Trying to get provider index out of range")
	DataReliabilityAlreadySentThisEpochError             = sdkerrors.New("DataReliabilityAlreadySentThisEpoch Error", 682, "Trying to send data reliability more than once per provider per epoch")
	FailedToConnectToEndPointForDataReliabilityError     = sdkerrors.New("FailedToConnectToEndPointForDataReliability Error", 683, "Failed to connect to a providers endpoints")
	DataReliabilityEpochMismatchError                    = sdkerrors.New("DataReliabilityEpochMismatch Error", 684, "Data reliability epoch mismatch original session epoch.")
	NoDataReliabilitySessionWasCreatedError              = sdkerrors.New("NoDataReliabilitySessionWasCreated Error", 685, "No Data reliability session was created")
)
View Source
// Registered errors with codes 881-899. Each sdkerrors.New call takes
// (codespace, code, description).
// NOTE(review): judging by the descriptions these look like the errors raised
// by the provider session manager — confirm.
var (
	InvalidEpochError                                = sdkerrors.New("InvalidEpoch Error", 881, "Requested Epoch Is Too Old")
	NewSessionWithRelayNumError                      = sdkerrors.New("NewSessionWithRelayNum Error", 882, "Requested Session With Relay Number Is Invalid")
	ConsumerIsBlockListed                            = sdkerrors.New("ConsumerIsBlockListed Error", 883, "This Consumer Is Blocked.")
	ConsumerNotRegisteredYet                         = sdkerrors.New("ConsumerNotActive Error", 884, "This Consumer Is Not Currently In The Pool.") // NOTE(review): codespace string ("ConsumerNotActive") does not match the variable name
	SessionDoesNotExist                              = sdkerrors.New("SessionDoesNotExist Error", 885, "This Session Id Does Not Exist.")
	MaximumCULimitReachedByConsumer                  = sdkerrors.New("MaximumCULimitReachedByConsumer Error", 886, "Consumer reached maximum cu limit")
	ProviderConsumerCuMisMatch                       = sdkerrors.New("ProviderConsumerCuMisMatch Error", 887, "Provider and Consumer disagree on total cu for session")
	RelayNumberMismatch                              = sdkerrors.New("RelayNumberMismatch Error", 888, "Provider and Consumer disagree on relay number for session")
	SubscriptionInitiationError                      = sdkerrors.New("SubscriptionInitiationError Error", 889, "Provider failed initiating subscription")
	EpochIsNotRegisteredError                        = sdkerrors.New("EpochIsNotRegisteredError Error", 890, "Epoch is not registered in provider session manager")
	ConsumerIsNotRegisteredError                     = sdkerrors.New("ConsumerIsNotRegisteredError Error", 891, "Consumer is not registered in provider session manager")
	SubscriptionAlreadyExistsError                   = sdkerrors.New("SubscriptionAlreadyExists Error", 892, "Subscription already exists in single provider session")
	DataReliabilitySessionAlreadyUsedError           = sdkerrors.New("DataReliabilitySessionAlreadyUsed Error", 893, "Data Reliability Session already used by this consumer in this epoch")
	DataReliabilityCuSumMisMatchError                = sdkerrors.New("DataReliabilityCuSumMisMatch Error", 894, "Data Reliability Cu sum mismatch error")
	DataReliabilityRelayNumberMisMatchError          = sdkerrors.New("DataReliabilityRelayNumberMisMatch Error", 895, "Data Reliability RelayNumber mismatch error")
	SubscriptionPointerIsNilError                    = sdkerrors.New("SubscriptionPointerIsNil Error", 896, "Trying to unsubscribe a nil pointer.")
	CouldNotFindIndexAsConsumerNotYetRegisteredError = sdkerrors.New("CouldNotFindIndexAsConsumerNotYetRegistered Error", 897, "fetching provider index from psm failed")
	ProviderIndexMisMatchError                       = sdkerrors.New("ProviderIndexMisMatch Error", 898, "provider index mismatch")
	SessionIdNotFoundError                           = sdkerrors.New("SessionIdNotFound Error", 899, "Session Id not found")
)
View Source
// AvailabilityPercentage is built with sdk.NewDecWithPrec(5, 2), i.e. 5 * 10^-2 = 0.05.
var AvailabilityPercentage sdk.Dec = sdk.NewDecWithPrec(5, 2) // TODO move to params pairing

Functions

func IsEpochValidForUse

func IsEpochValidForUse(targetEpoch uint64, blockedEpochHeight uint64) bool

Types

type ConsumerSessionManager

// ConsumerSessionManager is created with NewConsumerSessionManager. It exposes
// no exported fields; all interaction goes through its methods.
type ConsumerSessionManager struct {
	// contains filtered or unexported fields
}

created with NewConsumerSessionManager

func NewConsumerSessionManager

func NewConsumerSessionManager(rpcEndpoint *RPCEndpoint, providerOptimizer ProviderOptimizer) *ConsumerSessionManager

func (*ConsumerSessionManager) GetAtomicPairingAddressesLength

func (csm *ConsumerSessionManager) GetAtomicPairingAddressesLength() uint64

Atomically read csm.pairingAddressesLength for data reliability.

func (*ConsumerSessionManager) GetDataReliabilitySession added in v0.5.1

func (csm *ConsumerSessionManager) GetDataReliabilitySession(ctx context.Context, originalProviderAddress string, index int64, sessionEpoch uint64) (singleConsumerSession *SingleConsumerSession, providerAddress string, epoch uint64, err error)

Get a Data Reliability Session

func (*ConsumerSessionManager) GetReportedProviders

func (csm *ConsumerSessionManager) GetReportedProviders(epoch uint64) ([]byte, error)

Get the reported providers currently stored in the session manager.

func (*ConsumerSessionManager) GetSession added in v0.5.1

func (csm *ConsumerSessionManager) GetSession(ctx context.Context, cuNeededForSession uint64, initUnwantedProviders map[string]struct{}) (
	consumerSession *SingleConsumerSession, epoch uint64, providerPublicAddress string, reportedProviders []byte, errRet error,
)

GetSession will return a ConsumerSession, given cu needed for that session. The user can also request specific providers to not be included in the search for a session.

func (*ConsumerSessionManager) GetSessionFromAllExcept added in v0.5.1

func (csm *ConsumerSessionManager) GetSessionFromAllExcept(ctx context.Context, bannedAddresses map[string]struct{}, cuNeeded uint64, bannedAddressesEpoch uint64) (consumerSession *SingleConsumerSession, epoch uint64, providerPublicAddress string, reportedProviders []byte, err error)

Gets a session from the pool, excluding the specified providers; it also validates the epoch.

func (*ConsumerSessionManager) OnDataReliabilitySessionDone added in v0.5.1

func (csm *ConsumerSessionManager) OnDataReliabilitySessionDone(consumerSession *SingleConsumerSession,
	latestServicedBlock int64,
	specComputeUnits uint64,
	currentLatency time.Duration,
	expectedLatency time.Duration,
	expectedBH int64,
	numOfProviders int,
	providersCount uint64,
) error

On a successful DataReliability session we don't need to increase and update any field, we just need to unlock the session.

func (*ConsumerSessionManager) OnDataReliabilitySessionFailure added in v0.5.1

func (csm *ConsumerSessionManager) OnDataReliabilitySessionFailure(consumerSession *SingleConsumerSession, errorReceived error) error

On a failed DataReliability session we don't decrease the cu unlike a normal session, we just unlock and verify if we need to block this session or provider.

func (*ConsumerSessionManager) OnSessionDone

func (csm *ConsumerSessionManager) OnSessionDone(
	consumerSession *SingleConsumerSession,
	epoch uint64,
	latestServicedBlock int64,
	specComputeUnits uint64,
	currentLatency time.Duration,
	expectedLatency time.Duration,
	expectedBH int64,
	numOfProviders int,
	providersCount uint64,
) error

On a successful session this function will update all necessary fields in the consumerSession and unlock it when it finishes.

func (*ConsumerSessionManager) OnSessionDoneIncreaseCUOnly

func (csm *ConsumerSessionManager) OnSessionDoneIncreaseCUOnly(consumerSession *SingleConsumerSession) error

On a successful Subscribe relay

func (*ConsumerSessionManager) OnSessionFailure

func (csm *ConsumerSessionManager) OnSessionFailure(consumerSession *SingleConsumerSession, errorReceived error) error

Report session failure, mark it as blocked from future usages, report if timeout happened.

func (*ConsumerSessionManager) OnSessionUnUsed added in v0.5.1

func (csm *ConsumerSessionManager) OnSessionUnUsed(consumerSession *SingleConsumerSession) error

A session can be created but left unused if the consumer found the response in the cache, so we need to unlock the session and decrease the CU that was applied.

func (*ConsumerSessionManager) RPCEndpoint

func (csm *ConsumerSessionManager) RPCEndpoint() RPCEndpoint

func (*ConsumerSessionManager) UpdateAllProviders

func (csm *ConsumerSessionManager) UpdateAllProviders(epoch uint64, pairingList map[uint64]*ConsumerSessionsWithProvider) error

Update the provider pairing list for the ConsumerSessionManager

type ConsumerSessionsWithProvider

// ConsumerSessionsWithProvider bundles a provider's public address, its
// endpoints, the sessions opened with it, and compute-unit counters.
// NOTE(review): field notes marked "presumably" are inferred from names and
// types only — confirm against the implementation.
type ConsumerSessionsWithProvider struct {
	Lock              utils.LavaMutex // presumably guards the fields below — confirm
	PublicLavaAddress string
	Endpoints         []*Endpoint
	Sessions          map[int64]*SingleConsumerSession // key type matches SingleConsumerSession.SessionId (int64)
	MaxComputeUnits   uint64
	UsedComputeUnits  uint64
	ReliabilitySent   bool
	PairingEpoch      uint64 // presumably the value returned by GetPairingEpoch — confirm
}

func (*ConsumerSessionsWithProvider) GetPairingEpoch

func (cswp *ConsumerSessionsWithProvider) GetPairingEpoch() uint64

type DataReliabilitySession

// DataReliabilitySession pairs a SingleConsumerSession with an epoch and a
// provider public address.
// NOTE(review): the meaning of UniqueIdentifier (a bool) is not evident from
// the declaration — confirm at its use sites.
type DataReliabilitySession struct {
	SingleConsumerSession *SingleConsumerSession
	Epoch                 uint64
	ProviderPublicAddress string
	UniqueIdentifier      bool
}

type Endpoint

// Endpoint holds a network address, an enabled flag, a relayer client pointer
// and a connection-refusal counter.
type Endpoint struct {
	NetworkAddress     string // the rename to NetworkAddress that the previous comment asked for is already in place
	Enabled            bool
	Client             *pairingtypes.RelayerClient
	ConnectionRefusals uint64 // NOTE(review): presumably compared against MaxConsecutiveConnectionAttempts — confirm
}

type ProviderOptimizer

// ProviderOptimizer is the interface NewConsumerSessionManager accepts as its
// providerOptimizer argument.
type ProviderOptimizer interface {
	// AppendRelayData takes a provider address, a latency and a failure flag.
	// NOTE(review): presumably called once per relay to record its outcome — confirm.
	AppendRelayData(providerAddress string, latency time.Duration, failure bool)
}

type ProviderSessionManager

// ProviderSessionManager is created with NewProviderSessionManager. It exposes
// no exported fields; all interaction goes through its methods.
type ProviderSessionManager struct {
	// contains filtered or unexported fields
}

func NewProviderSessionManager

func NewProviderSessionManager(rpcProviderEndpoint *RPCProviderEndpoint, numberOfBlocksKeptInMemory uint64) *ProviderSessionManager

Returning a new provider session manager

func (*ProviderSessionManager) GetBlockedEpochHeight

func (psm *ProviderSessionManager) GetBlockedEpochHeight() uint64

func (*ProviderSessionManager) GetDataReliabilitySession added in v0.5.1

func (psm *ProviderSessionManager) GetDataReliabilitySession(address string, epoch uint64, sessionId uint64, relayNumber uint64, selfProviderIndex int64) (*SingleProviderSession, error)

GetDataReliabilitySession fetches a data reliability session

func (*ProviderSessionManager) GetProviderIndexWithConsumer added in v0.9.0

func (psm *ProviderSessionManager) GetProviderIndexWithConsumer(epoch uint64, consumerAddress string) (int64, error)

func (*ProviderSessionManager) GetSession

func (psm *ProviderSessionManager) GetSession(ctx context.Context, address string, epoch uint64, sessionId uint64, relayNumber uint64) (*SingleProviderSession, error)

func (*ProviderSessionManager) IsActiveConsumer added in v0.5.1

func (psm *ProviderSessionManager) IsActiveConsumer(epoch uint64, address string) (providerSessionWithConsumer *ProviderSessionsWithConsumer, err error)

Check if consumer exists and is not blocked, if all is valid return the ProviderSessionsWithConsumer pointer

func (*ProviderSessionManager) IsValidEpoch

func (psm *ProviderSessionManager) IsValidEpoch(epoch uint64) (valid bool)

func (*ProviderSessionManager) OnSessionDone

func (psm *ProviderSessionManager) OnSessionDone(singleProviderSession *SingleProviderSession, relayNumber uint64) (err error)

OnSessionDone unlocks the session gracefully; this happens when the session finished successfully.

func (*ProviderSessionManager) OnSessionFailure

func (psm *ProviderSessionManager) OnSessionFailure(singleProviderSession *SingleProviderSession, relayNumber uint64) (err error)

OnSessionFailure unlocks the session gracefully; this happens when the session finished with an error.

func (*ProviderSessionManager) ProcessUnsubscribe

func (psm *ProviderSessionManager) ProcessUnsubscribe(apiName string, subscriptionID string, consumerAddress string, epoch uint64) error

func (*ProviderSessionManager) RPCProviderEndpoint

func (psm *ProviderSessionManager) RPCProviderEndpoint() *RPCProviderEndpoint

func (*ProviderSessionManager) RegisterProviderSessionWithConsumer

func (psm *ProviderSessionManager) RegisterProviderSessionWithConsumer(ctx context.Context, consumerAddress string, epoch uint64, sessionId uint64, relayNumber uint64, maxCuForConsumer uint64, selfProviderIndex int64) (*SingleProviderSession, error)

func (*ProviderSessionManager) ReleaseSessionAndCreateSubscription

func (psm *ProviderSessionManager) ReleaseSessionAndCreateSubscription(session *SingleProviderSession, subscription *RPCSubscription, consumerAddress string, epoch uint64, relayNumber uint64) error

func (*ProviderSessionManager) ReportConsumer

func (psm *ProviderSessionManager) ReportConsumer() (address string, epoch uint64, err error)

func (*ProviderSessionManager) SubscriptionEnded

func (psm *ProviderSessionManager) SubscriptionEnded(consumerAddress string, epoch uint64, subscriptionID string)

Try to disconnect the subscription in case we got an error. If it fails to find the subscription, it assumes it was unsubscribed normally.

func (*ProviderSessionManager) UpdateEpoch

func (psm *ProviderSessionManager) UpdateEpoch(epoch uint64)

On a new epoch we clean stale provider data, and we also make sure consumers who try to use past data are unable to do so.

func (*ProviderSessionManager) UpdateSessionCU

func (psm *ProviderSessionManager) UpdateSessionCU(consumerAddress string, epoch uint64, sessionID uint64, newCU uint64) error

Called when the reward server has information on a higher cu proof and usage and this providerSessionsManager needs to sync up on it

type ProviderSessionsEpochData

// ProviderSessionsEpochData carries three compute-unit counters. A pointer to
// it is passed to NewProviderSessionsWithConsumer as epochData.
type ProviderSessionsEpochData struct {
	UsedComputeUnits    uint64
	MaxComputeUnits     uint64
	MissingComputeUnits uint64 // NOTE(review): presumably updated via SafeAddMissingComputeUnits — confirm
}

type ProviderSessionsWithConsumer added in v0.5.1

// ProviderSessionsWithConsumer holds all of the data for a consumer for a
// certain epoch. It is created with NewProviderSessionsWithConsumer.
type ProviderSessionsWithConsumer struct {
	Sessions map[uint64]*SingleProviderSession // key type matches SingleProviderSession.SessionID (uint64)

	Lock sync.RWMutex // NOTE(review): presumably guards Sessions — confirm
	// contains filtered or unexported fields
}

holds all of the data for a consumer for a certain epoch

func NewProviderSessionsWithConsumer

func NewProviderSessionsWithConsumer(consumerAddr string, epochData *ProviderSessionsEpochData, isDataReliability uint32, selfProviderIndex int64) *ProviderSessionsWithConsumer

func (*ProviderSessionsWithConsumer) GetExistingSession added in v0.5.1

func (pswc *ProviderSessionsWithConsumer) GetExistingSession(ctx context.Context, sessionId uint64) (session *SingleProviderSession, err error)

this function returns the session locked to be used

func (*ProviderSessionsWithConsumer) SafeAddMissingComputeUnits added in v0.9.3

func (pswc *ProviderSessionsWithConsumer) SafeAddMissingComputeUnits(currentMissingCU uint64, allowedThreshold float64) (legitimate bool)

type QoSReport

// QoSReport groups quality-of-service bookkeeping: the last report, a list of
// latency scores, sync-score totals and relay counters. It is embedded by value
// in SingleConsumerSession as QoSInfo.
// NOTE(review): how the individual counters are combined is not visible in the
// declaration — see CalculateQoS.
type QoSReport struct {
	LastQoSReport    *pairingtypes.QualityOfServiceReport
	LatencyScoreList []sdk.Dec
	SyncScoreSum     int64
	TotalSyncScore   int64
	TotalRelays      uint64
	AnsweredRelays   uint64
}

type RPCEndpoint

// RPCEndpoint describes an endpoint by network address, chain ID, API
// interface and geolocation. Every field carries yaml, json and mapstructure
// tags using the same kebab-case key; the yaml and json tags are omitempty.
type RPCEndpoint struct {
	NetworkAddress string `yaml:"network-address,omitempty" json:"network-address,omitempty" mapstructure:"network-address"` // HOST:PORT
	ChainID        string `yaml:"chain-id,omitempty" json:"chain-id,omitempty" mapstructure:"chain-id"`                      // spec chain identifier
	ApiInterface   string `yaml:"api-interface,omitempty" json:"api-interface,omitempty" mapstructure:"api-interface"`
	Geolocation    uint64 `yaml:"geolocation,omitempty" json:"geolocation,omitempty" mapstructure:"geolocation"`
}

func (*RPCEndpoint) Key

func (rpce *RPCEndpoint) Key() string

func (*RPCEndpoint) New

func (rpce *RPCEndpoint) New(address string, chainID string, apiInterface string, geolocation uint64) *RPCEndpoint

func (*RPCEndpoint) String

func (endpoint *RPCEndpoint) String() (retStr string)

type RPCProviderEndpoint

// RPCProviderEndpoint has the same four fields and tag keys as RPCEndpoint
// plus a list of node URLs.
// NOTE(review): NetworkAddress is the only field whose mapstructure tag also
// carries ",omitempty"; the other fields (and RPCEndpoint) omit it — confirm
// whether that difference is intentional.
type RPCProviderEndpoint struct {
	NetworkAddress string           `yaml:"network-address,omitempty" json:"network-address,omitempty" mapstructure:"network-address,omitempty"` // HOST:PORT
	ChainID        string           `yaml:"chain-id,omitempty" json:"chain-id,omitempty" mapstructure:"chain-id"`                                // spec chain identifier
	ApiInterface   string           `yaml:"api-interface,omitempty" json:"api-interface,omitempty" mapstructure:"api-interface"`
	Geolocation    uint64           `yaml:"geolocation,omitempty" json:"geolocation,omitempty" mapstructure:"geolocation"`
	NodeUrls       []common.NodeUrl `yaml:"node-urls,omitempty" json:"node-urls,omitempty" mapstructure:"node-urls"`
}

func (*RPCProviderEndpoint) Key

func (rpcpe *RPCProviderEndpoint) Key() string

func (*RPCProviderEndpoint) String

func (endpoint *RPCProviderEndpoint) String() (retStr string)

func (*RPCProviderEndpoint) UrlsString

func (endpoint *RPCProviderEndpoint) UrlsString() string

func (*RPCProviderEndpoint) Validate

func (endpoint *RPCProviderEndpoint) Validate() error

type RPCSubscription

// RPCSubscription groups a subscription ID, the rpcclient subscription handle
// and a channel of untyped values.
type RPCSubscription struct {
	Id                   string
	Sub                  *rpcclient.ClientSubscription
	SubscribeRepliesChan chan interface{} // NOTE(review): presumably carries the replies delivered for this subscription — confirm
}

type SingleConsumerSession

// SingleConsumerSession is the per-session state kept on the consumer side: it
// is the value type stored in ConsumerSessionsWithProvider.Sessions and the
// session type returned by ConsumerSessionManager.GetSession.
type SingleConsumerSession struct {
	CuSum         uint64
	LatestRelayCu uint64 // set by GetSession cuNeededForSession
	QoSInfo       QoSReport
	SessionId     int64
	Client        *ConsumerSessionsWithProvider // the provider-level record this session points to

	RelayNum                    uint64
	LatestBlock                 int64
	Endpoint                    *Endpoint
	BlockListed                 bool   // if session lost sync we blacklist it.
	ConsecutiveNumberOfFailures uint64 // number of times this session has failed
	// contains filtered or unexported fields
}

func (*SingleConsumerSession) CalculateExpectedLatency

func (cs *SingleConsumerSession) CalculateExpectedLatency(timeoutGivenToRelay time.Duration) time.Duration

returns the expected latency to a threshold.

func (*SingleConsumerSession) CalculateQoS

func (cs *SingleConsumerSession) CalculateQoS(cu uint64, latency time.Duration, expectedLatency time.Duration, blockHeightDiff int64, numOfProviders int, servicersToCount int64)

func (*SingleConsumerSession) IsDataReliabilitySession added in v0.5.1

func (scs *SingleConsumerSession) IsDataReliabilitySession() bool

validate if this is a data reliability session

type SingleProviderSession

// SingleProviderSession is the per-session state kept on the provider side: it
// is the value type stored in ProviderSessionsWithConsumer.Sessions and the
// session type returned by ProviderSessionManager.GetSession.
type SingleProviderSession struct {
	CuSum         uint64
	LatestRelayCu uint64
	SessionID     uint64

	RelayNum     uint64
	PairingEpoch uint64 // NOTE(review): presumably the field accessed by GetPairingEpoch / SetPairingEpoch — confirm
	// contains filtered or unexported fields
}

func (*SingleProviderSession) GetOccupyingGuid

func (sps *SingleProviderSession) GetOccupyingGuid() uint64

func (*SingleProviderSession) GetPairingEpoch

func (sps *SingleProviderSession) GetPairingEpoch() uint64

func (*SingleProviderSession) IsPayingRelay

func (sps *SingleProviderSession) IsPayingRelay() bool

To be used only when locked; otherwise it can return wrong values. It is used to determine if the proof is beneficial and needs to be sent to the rewardServer.

func (*SingleProviderSession) PrepareDataReliabilitySessionForUsage

func (sps *SingleProviderSession) PrepareDataReliabilitySessionForUsage(relayRequestTotalCU uint64) error

In case the user session is a data reliability we just need to verify that the cusum is the amount agreed between the consumer and the provider

func (*SingleProviderSession) PrepareSessionForUsage

func (sps *SingleProviderSession) PrepareSessionForUsage(ctx context.Context, cuFromSpec uint64, relayRequestTotalCU uint64, allowedThreshold float64) error

func (*SingleProviderSession) SetPairingEpoch

func (sps *SingleProviderSession) SetPairingEpoch(epoch uint64)

func (*SingleProviderSession) VerifyLock

func (sps *SingleProviderSession) VerifyLock() error

Verify the SingleProviderSession is locked when getting to this function; if it's not locked, return an error.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL