Documentation ¶
Index ¶
- func StatScaleToTime(r float64) time.Duration
- func TimeToStatScale(d time.Duration) float64
- type FillSet
- type NodeValueTracker
- type PrivateClientAPI
- func (api *PrivateClientAPI) Distribution(nodeStr string, normalized bool) (RtDistribution, error)
- func (api *PrivateClientAPI) RequestStats() []RequestStatsItem
- func (api *PrivateClientAPI) Timeout(nodeStr string, failRate float64) (float64, error)
- func (api *PrivateClientAPI) Value(nodeStr string, timeout float64) (float64, error)
- type QueryFunc
- type QueueIterator
- type RequestInfo
- type RequestStatsItem
- type ResponseTimeStats
- func (rt *ResponseTimeStats) Add(respTime time.Duration, weight float64, expFactor utils.ExpirationFactor)
- func (rt *ResponseTimeStats) AddStats(s *ResponseTimeStats)
- func (rt *ResponseTimeStats) DecodeRLP(s *rlp.Stream) error
- func (rt ResponseTimeStats) Distribution(normalized bool, expFactor utils.ExpirationFactor) (res RtDistribution)
- func (rt *ResponseTimeStats) EncodeRLP(w io.Writer) error
- func (rt *ResponseTimeStats) SubStats(s *ResponseTimeStats)
- func (rt ResponseTimeStats) Timeout(failRatio float64) time.Duration
- func (rt ResponseTimeStats) Value(weights ResponseTimeWeights, expFactor utils.ExpirationFactor) float64
- type ResponseTimeWeights
- type RtDistribution
- type ServedRequest
- type ServerPool
- func (s *ServerPool) API() *PrivateClientAPI
- func (s *ServerPool) AddMetrics(...)
- func (s *ServerPool) AddSource(source enode.Iterator)
- func (s *ServerPool) DialNode(n *enode.Node) *enode.Node
- func (s *ServerPool) GetTimeout() time.Duration
- func (s *ServerPool) Persist(n *enode.Node)
- func (s *ServerPool) RegisterNode(node *enode.Node) (*NodeValueTracker, error)
- func (s *ServerPool) Start()
- func (s *ServerPool) Stop()
- func (s *ServerPool) UnregisterNode(node *enode.Node)
- type ValueTracker
- func (vt *ValueTracker) GetNode(id enode.ID) *NodeValueTracker
- func (vt *ValueTracker) Register(id enode.ID) *NodeValueTracker
- func (vt *ValueTracker) RequestStats() []RequestStatsItem
- func (vt *ValueTracker) RtStats() ResponseTimeStats
- func (vt *ValueTracker) StatsExpFactor() utils.ExpirationFactor
- func (vt *ValueTracker) StatsExpirer() *utils.Expirer
- func (vt *ValueTracker) Stop()
- func (vt *ValueTracker) Unregister(id enode.ID)
- type WrsIterator
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func StatScaleToTime ¶
StatScaleToTime converts a distribution vector index to a response time. The index is represented by a float64 so that linear interpolation can be applied.
func TimeToStatScale ¶
TimeToStatScale converts a response time to a distribution vector index. The index is represented by a float64 so that linear interpolation can be applied.
Types ¶
type FillSet ¶
type FillSet struct {
// contains filtered or unexported fields
}
FillSet tries to read nodes from an input iterator and add them to a node set by setting the specified node state flag(s) until the size of the set reaches the target. Note that other mechanisms (like other FillSet instances reading from different inputs) can also set the same flag(s) and FillSet will always care about the total number of nodes having those flags.
func NewFillSet ¶
func NewFillSet(ns *nodestate.NodeStateMachine, input enode.Iterator, flags nodestate.Flags) *FillSet
NewFillSet creates a new FillSet
func (*FillSet) Close ¶
func (fs *FillSet) Close()
Close shuts FillSet down and closes the input iterator
func (*FillSet) SetTarget ¶
SetTarget sets the current target for node set size. If the previous target was not reached and FillSet was still waiting for the next node from the input then the next incoming node will be added to the set regardless of the target. This ensures that all nodes coming from the input are eventually added to the set.
type NodeValueTracker ¶
type NodeValueTracker struct {
// contains filtered or unexported fields
}
NodeValueTracker collects service value statistics for a specific server node
func (*NodeValueTracker) RtStats ¶
func (nv *NodeValueTracker) RtStats() ResponseTimeStats
RtStats returns the node's own response time distribution statistics
func (*NodeValueTracker) Served ¶
func (nv *NodeValueTracker) Served(reqs []ServedRequest, respTime time.Duration)
Served adds a served request to the node's statistics. An actual request may be composed of one or more request types (service vector indices).
func (*NodeValueTracker) UpdateCosts ¶
func (nv *NodeValueTracker) UpdateCosts(reqCosts []uint64)
UpdateCosts updates the node value tracker's request cost table
type PrivateClientAPI ¶
type PrivateClientAPI struct {
// contains filtered or unexported fields
}
PrivateClientAPI implements the vflux client side API
func NewPrivateClientAPI ¶
func NewPrivateClientAPI(vt *ValueTracker) *PrivateClientAPI
NewPrivateClientAPI creates a PrivateClientAPI
func (*PrivateClientAPI) Distribution ¶
func (api *PrivateClientAPI) Distribution(nodeStr string, normalized bool) (RtDistribution, error)
Distribution returns a distribution as a series of (X, Y) chart coordinates, where the X axis is the response time in seconds while the Y axis is the amount of service value received with a response time close to the X coordinate. The distribution is optionally normalized to a sum of 1. If nodeStr == "" then the global distribution is returned, otherwise the individual distribution of the specified server node.
func (*PrivateClientAPI) RequestStats ¶
func (api *PrivateClientAPI) RequestStats() []RequestStatsItem
RequestStats returns the current contents of the reference request basket, with request values meaning average per request rather than total.
func (*PrivateClientAPI) Timeout ¶
func (api *PrivateClientAPI) Timeout(nodeStr string, failRate float64) (float64, error)
Timeout suggests a timeout value based on either the global distribution or the distribution of the specified node. The parameter is the desired rate of timeouts assuming a similar distribution in the future. Note that the actual timeout should have a sensible minimum bound so that operating under ideal working conditions for a long time (for example, using a local server with very low response times) will not make it very hard for the system to accommodate longer response times in the future.
type QueryFunc ¶
queryFunc sends a pre-negotiation query and blocks until a response arrives or timeout occurs. It returns 1 if the remote node has confirmed that connection is possible, 0 if not possible and -1 if no response arrived (timeout).
type QueueIterator ¶
type QueueIterator struct {
// contains filtered or unexported fields
}
QueueIterator returns nodes from the specified selectable set in the same order as they entered the set.
func NewQueueIterator ¶
func NewQueueIterator(ns *nodestate.NodeStateMachine, requireFlags, disableFlags nodestate.Flags, fifo bool, waitCallback func(bool)) *QueueIterator
NewQueueIterator creates a new QueueIterator. Nodes are selectable if they have all the required and none of the disabled flags set. When a node is selected the selectedFlag is set which also disables further selectability until it is removed or times out.
func (*QueueIterator) Next ¶
func (qi *QueueIterator) Next() bool
Next moves to the next selectable node.
func (*QueueIterator) Node ¶
func (qi *QueueIterator) Node() *enode.Node
Node returns the current node.
type RequestInfo ¶
type RequestInfo struct { // Name identifies the request type and is used for re-mapping the service vector if necessary Name string // InitAmount and InitValue are used to initialize the reference basket InitAmount, InitValue float64 }
RequestInfo is an initializer structure for the service vector.
type RequestStatsItem ¶
type ResponseTimeStats ¶
type ResponseTimeStats struct {
// contains filtered or unexported fields
}
ResponseTimeStats is the response time distribution of a set of answered requests, weighted with request value, either served by a single server or aggregated for multiple servers. It it a fixed length (timeStatLength) distribution vector with linear interpolation. The X axis (the time values) are not linear, they should be transformed with TimeToStatScale and StatScaleToTime.
func (*ResponseTimeStats) Add ¶
func (rt *ResponseTimeStats) Add(respTime time.Duration, weight float64, expFactor utils.ExpirationFactor)
Add adds a new response time with the given weight to the distribution.
func (*ResponseTimeStats) AddStats ¶
func (rt *ResponseTimeStats) AddStats(s *ResponseTimeStats)
AddStats adds the given ResponseTimeStats to the current one.
func (*ResponseTimeStats) DecodeRLP ¶
func (rt *ResponseTimeStats) DecodeRLP(s *rlp.Stream) error
DecodeRLP implements rlp.Decoder
func (ResponseTimeStats) Distribution ¶
func (rt ResponseTimeStats) Distribution(normalized bool, expFactor utils.ExpirationFactor) (res RtDistribution)
Distribution returns a RtDistribution, optionally normalized to a sum of 1.
func (*ResponseTimeStats) EncodeRLP ¶
func (rt *ResponseTimeStats) EncodeRLP(w io.Writer) error
EncodeRLP implements rlp.Encoder
func (*ResponseTimeStats) SubStats ¶
func (rt *ResponseTimeStats) SubStats(s *ResponseTimeStats)
SubStats subtracts the given ResponseTimeStats from the current one.
func (ResponseTimeStats) Timeout ¶
func (rt ResponseTimeStats) Timeout(failRatio float64) time.Duration
Timeout suggests a timeout value based on the previous distribution. The parameter is the desired rate of timeouts assuming a similar distribution in the future. Note that the actual timeout should have a sensible minimum bound so that operating under ideal working conditions for a long time (for example, using a local server with very low response times) will not make it very hard for the system to accommodate longer response times in the future.
func (ResponseTimeStats) Value ¶
func (rt ResponseTimeStats) Value(weights ResponseTimeWeights, expFactor utils.ExpirationFactor) float64
Value calculates the total service value based on the given distribution, using the specified weight function.
type ResponseTimeWeights ¶
type ResponseTimeWeights [timeStatLength]float64
ResponseTimeStats is the response time distribution of a set of answered requests, weighted with request value, either served by a single server or aggregated for multiple servers. It it a fixed length (timeStatLength) distribution vector with linear interpolation. The X axis (the time values) are not linear, they should be transformed with TimeToStatScale and StatScaleToTime.
func TimeoutWeights ¶
func TimeoutWeights(timeout time.Duration) (res ResponseTimeWeights)
TimeoutWeights calculates the weight function used for calculating service value based on the response time distribution of the received service. It is based on the request timeout value of the system. It consists of a half cosine function starting with 1, crossing zero at timeout and reaching -1 at 2*timeout. After 2*timeout the weight is constant -1.
type RtDistribution ¶
type RtDistribution [timeStatLength][2]float64
RtDistribution represents a distribution as a series of (X, Y) chart coordinates, where the X axis is the response time in seconds while the Y axis is the amount of service value received with a response time close to the X coordinate.
type ServedRequest ¶
type ServedRequest struct {
ReqType, Amount uint32
}
type ServerPool ¶
type ServerPool struct {
// contains filtered or unexported fields
}
ServerPool provides a node iterator for dial candidates. The output is a mix of newly discovered nodes, a weighted random selection of known (previously valuable) nodes and trusted/paid nodes.
func NewServerPool ¶
func NewServerPool(db czzdb.KeyValueStore, dbKey []byte, mixTimeout time.Duration, query QueryFunc, clock mclock.Clock, trustedURLs []string, requestList []RequestInfo) (*ServerPool, enode.Iterator)
NewServerPool creates a new server pool
func (*ServerPool) API ¶
func (s *ServerPool) API() *PrivateClientAPI
API returns the vflux client API
func (*ServerPool) AddMetrics ¶
func (s *ServerPool) AddMetrics( suggestedTimeoutGauge, totalValueGauge, serverSelectableGauge, serverConnectedGauge metrics.Gauge, sessionValueMeter, serverDialedMeter metrics.Meter)
AddMetrics adds metrics to the server pool. Should be called before Start().
func (*ServerPool) AddSource ¶
func (s *ServerPool) AddSource(source enode.Iterator)
AddSource adds a node discovery source to the server pool (should be called before start)
func (*ServerPool) DialNode ¶
func (s *ServerPool) DialNode(n *enode.Node) *enode.Node
DialNode replaces the given enode with a locally generated one containing the ENR stored in the sfiLocalAddress field if present. This workaround ensures that nodes on the local network can be dialed at the local address if a connection has been successfully established previously. Note that NodeStateMachine always remembers the enode with the latest version of the remote signed ENR. ENR filtering should be performed on that version while dialNode should be used for dialing the node over TCP or UDP.
func (*ServerPool) GetTimeout ¶
func (s *ServerPool) GetTimeout() time.Duration
GetTimeout returns the recommended request timeout.
func (*ServerPool) Persist ¶
func (s *ServerPool) Persist(n *enode.Node)
Persist immediately stores the state of a node in the node database
func (*ServerPool) RegisterNode ¶
func (s *ServerPool) RegisterNode(node *enode.Node) (*NodeValueTracker, error)
RegisterNode implements serverPeerSubscriber
func (*ServerPool) Start ¶
func (s *ServerPool) Start()
start starts the server pool. Note that NodeStateMachine should be started first.
func (*ServerPool) UnregisterNode ¶
func (s *ServerPool) UnregisterNode(node *enode.Node)
UnregisterNode implements serverPeerSubscriber
type ValueTracker ¶
type ValueTracker struct {
// contains filtered or unexported fields
}
ValueTracker coordinates service value calculation for individual servers and updates global statistics
func NewValueTracker ¶
func NewValueTracker(db czzdb.KeyValueStore, clock mclock.Clock, reqInfo []RequestInfo, updatePeriod time.Duration, transferRate, statsExpRate, offlineExpRate float64) *ValueTracker
NewValueTracker creates a new ValueTracker and loads its previously saved state from the database if possible.
func (*ValueTracker) GetNode ¶
func (vt *ValueTracker) GetNode(id enode.ID) *NodeValueTracker
GetNode returns an individual server node's value tracker. If it did not exist before then a new node is created.
func (*ValueTracker) Register ¶
func (vt *ValueTracker) Register(id enode.ID) *NodeValueTracker
Register adds a server node to the value tracker
func (*ValueTracker) RequestStats ¶
func (vt *ValueTracker) RequestStats() []RequestStatsItem
RequestStats returns the current contents of the reference request basket, with request values meaning average per request rather than total.
func (*ValueTracker) RtStats ¶
func (vt *ValueTracker) RtStats() ResponseTimeStats
RtStats returns the global response time distribution statistics
func (*ValueTracker) StatsExpFactor ¶
func (vt *ValueTracker) StatsExpFactor() utils.ExpirationFactor
StatsExpirer returns the current expiration factor so that other values can be expired with the same rate as the service value statistics.
func (*ValueTracker) StatsExpirer ¶
func (vt *ValueTracker) StatsExpirer() *utils.Expirer
StatsExpirer returns the statistics expirer so that other values can be expired with the same rate as the service value statistics.
func (*ValueTracker) Stop ¶
func (vt *ValueTracker) Stop()
Stop saves the value tracker's state and each loaded node's individual state and returns after shutting the internal goroutines down.
func (*ValueTracker) Unregister ¶
func (vt *ValueTracker) Unregister(id enode.ID)
Unregister removes a server node from the value tracker
type WrsIterator ¶
type WrsIterator struct {
// contains filtered or unexported fields
}
WrsIterator returns nodes from the specified selectable set with a weighted random selection. Selection weights are provided by a callback function.
func NewWrsIterator ¶
func NewWrsIterator(ns *nodestate.NodeStateMachine, requireFlags, disableFlags nodestate.Flags, weightField nodestate.Field) *WrsIterator
NewWrsIterator creates a new WrsIterator. Nodes are selectable if they have all the required and none of the disabled flags set. When a node is selected the selectedFlag is set which also disables further selectability until it is removed or times out.