Documentation ¶
Index ¶
- Variables
- func ErrorToDelayType(errString string) string
- func ResetMapValues(inputMap sync.Map) sync.Map
- type BaseDelay
- type ConnectionAttemptStatus
- type DelayObject
- type Minus1Delay
- type NegativeDelay
- type NegativeWithHopeDelay
- type NegativeWithNoHopeDelay
- type PeerQueue
- func (c *PeerQueue) AddPeer(pPeer *PrunedPeer)
- func (c *PeerQueue) DelayDistribution() sync.Map
- func (c *PeerQueue) GetPeer(peerID string) (*PrunedPeer, bool)
- func (c *PeerQueue) IsPeerAlready(peerID string) bool
- func (c PeerQueue) Len() int
- func (c PeerQueue) Less(i, j int) bool
- func (c *PeerQueue) SortPeerList()
- func (c *PeerQueue) Swap(i, j int)
- func (c *PeerQueue) UpdatePeerListFromPeerStore(peerstore *db.PeerStore) error
- type PeeringOption
- type PeeringService
- type PeeringStrategy
- type PositiveDelay
- type PrunedPeer
- type PruningStrategy
- func (c *PruningStrategy) AttemptedPeersSinceLastIter() int64
- func (c *PruningStrategy) ControlDistribution() sync.Map
- func (c *PruningStrategy) GetErrorAttemptDistribution() sync.Map
- func (c *PruningStrategy) IterForcingNextConnTime() string
- func (c *PruningStrategy) LastIterTime() float64
- func (c *PruningStrategy) NewConnectionAttempt(connAttStat ConnectionAttemptStatus)
- func (c *PruningStrategy) NewConnectionEvent(connEvent hosts.ConnectionEvent)
- func (c *PruningStrategy) NewIdentificationEvent(newIdent hosts.IdentificationEvent)
- func (c *PruningStrategy) NextPeer()
- func (c *PruningStrategy) Run() chan models.Peer
- func (c PruningStrategy) Type() string
- type TimeoutDelay
- type ZeroDelay
Constants ¶
This section is empty.
Variables ¶
var ( ModuleName = "PEERING" ConnectionRefuseTimeout = 10 * time.Second MaxRetries = 1 DefaultWorkers = 50 )
var ( PrunedErrorDistribution = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "peering", Name: "pruned_error_distribution", Help: "Filter peers in Peer Queue by errors that were tracked", }, []string{"controldist"}, ) ErrorAttemptDistribution = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "peering", Name: "iteration_attempts_by_category_distribution", Help: "Filter attempts in Peer Queue by errors that were tracked", }, []string{"controlAttemptdist"}, ) PeersAttemptedInLastIteration = prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: "peering", Name: "peers_attempted_last_iteration", Help: "The number of discovered peers with the crawler", }) PeerstoreIterTime = prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: "peering", Name: "peerstore_iteration_time_secs", Help: "The time that the crawler takes to connect the entire peerstore in secs", }) IterForcingNextConnTime = prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: "peering", Name: "iteration_forcing_next_conn_time", Help: "The time reported by the peer that forced the new peerstore iteration", }) )
List of metrics that we are going to export
var ( PeerStratModuleName = "PRUNING" // Default Delays DeprecationTime = 1024 * time.Minute // mMinutes after first negative connection that has to pass to deprecate a peer. DefaultNegDelay = 12 * time.Hour // Default delay that will be applied for those deprecated peers. DefaultPossitiveDelay = 6 * time.Hour // Default delay after each positive severe negative attempts. StartExpD = 2 * time.Minute // Starting delay that will serve for the Exponential Delay. // Control variables MinIterTime = 15 * time.Second // Minimum time that has to pass before iterating again. )
var ( // define the types of delays in string PositiveDelayType string = "Positive" NegativeWithHopeDelayType string = "NegativeWithHope" NegativeWithNoHopeDelayType string = "NegativeWithNoHope" ZeroDelayType string = "Zero" Minus1DelayType string = "Minus1" TimeoutDelayType string = "Timeout" MaxDelayTime time.Duration = time.Duration(math.Pow(2, 11) * float64(time.Minute)) // define the initial delay we apply in each of the types InitialDelayTime = map[string]time.Duration{ PositiveDelayType: 128 * time.Minute, NegativeWithHopeDelayType: 2 * time.Minute, NegativeWithNoHopeDelayType: 256 * time.Minute, ZeroDelayType: 0 * time.Hour, Minus1DelayType: -1000 * time.Hour, TimeoutDelayType: 32 * time.Minute, } )
Functions ¶
func ErrorToDelayType ¶
ErrorToDelayType: Transforms an error into a DelayType. @param errString: the string to analyze. @return the categroy type in string format.
Types ¶
type BaseDelay ¶
type BaseDelay struct { DelayDegree int // number of times we have delayed Type string // type of delay we apply (positive, negativewithhope...) }
All of our delay types will include this base, as they all have the same data just the delay calculation is different
func NewBaseDelay ¶
NewBaseDelay Constructor. We use pointers so the methods are directly added to inherited structs. @param inputType: the type of delay we want to set (just string).
func (*BaseDelay) AddDegree ¶
func (bd *BaseDelay) AddDegree()
AddDegree: This method will add 1 to the delaydegree.
type ConnectionAttemptStatus ¶
type ConnectionAttemptStatus struct { Peer models.Peer // TODO: right now just sending the entire info about the peer, (recheck after Peer struct subdivision) Attempts int32 // attemps tried on the given peer Timestamp time.Time // Timestamp of when was the attempt done Successful bool // Whether the connection attempt was successfully done or not RecError error // if the connection attempt reported any error, nil otherwise }
ConnectionAttemptStatus * It is the struct that compiles the data of an active connection attempt done by the host * The struct will be shared between peering and strategy.
type DelayObject ¶
type DelayObject interface { CalculateDelay() time.Duration AddDegree() GetType() string SetDegree(int) GetDegree() int }
Basic Structs
the interface to use and defines which methods should be implemented
func ReturnAccordingDelayObject ¶
func ReturnAccordingDelayObject(delayType string) DelayObject
ReturnAccordingDelayObject @param delayType: string representing a type of delay. @return the according delayobject
type Minus1Delay ¶
type Minus1Delay struct {
*BaseDelay
}
Minus1Delay: Delay type applied to new peers coming from the Discovery5 service. These are always set to be connected the first ones.
func NewMinus1Delay ¶
func NewMinus1Delay() Minus1Delay
func (Minus1Delay) CalculateDelay ¶
func (d Minus1Delay) CalculateDelay() time.Duration
CalculateDelay: This method will calculate the delay to be applied based on degree. @return the delay in Time.Duration format.
type NegativeDelay ¶
type NegativeDelay struct {
*BaseDelay
}
NegativeDelay: Delay type applied to peers that had any sort of error. The delays are exponentially increased. The child clases will apply a different type which only varies the baseDelay time.
func NewNegativeDelay ¶
func NewNegativeDelay(inputType string) *NegativeDelay
func (NegativeDelay) CalculateDelay ¶
func (d NegativeDelay) CalculateDelay() time.Duration
CalculateDelay: This method will calculate the delay to be applied based on degree. @return the delay in Time.Duration format.
type NegativeWithHopeDelay ¶
type NegativeWithHopeDelay struct {
*NegativeDelay
}
NegativeWithHopeDelay: In case of "connection reset by peer", "connection refused", "context deadline exceeded", "dial backoff", "metadata error" and default. Usually peers that have returned and error but could possibly be identified. baseDelay = 2 minutes.
func NewNegativeWithHopeDelay ¶
func NewNegativeWithHopeDelay() NegativeWithHopeDelay
type NegativeWithNoHopeDelay ¶
type NegativeWithNoHopeDelay struct {
*NegativeDelay
}
NegativeWithNoHopeDelay: In case of "no route to host", "unreachable network", "peer id mismatch", "dial to self attempted". Usually peers that have returned and error and are not probably running anymore. baseDelay = 256 minutes.
func NewNegativeWithNoHopeDelay ¶
func NewNegativeWithNoHopeDelay() NegativeWithNoHopeDelay
func NewTimeoutDelay ¶
func NewTimeoutDelay() NegativeWithNoHopeDelay
type PeerQueue ¶
type PeerQueue struct { PeerList []*PrunedPeer PeerMap sync.Map // contains filtered or unexported fields }
PeerQueue: Auxiliar peer array and map list to keep the list of peers sorted by connection time, and still able to modify in a short time the values of each peer.
func NewPeerQueue ¶
func NewPeerQueue() PeerQueue
NewPeerQueue: Constructor of a NewPeerQueue. @return new PeerQueue.
func (*PeerQueue) AddPeer ¶
func (c *PeerQueue) AddPeer(pPeer *PrunedPeer)
AddPeer Add a peer to the peerqueue. @params pPeer: the pruned peer to add
func (*PeerQueue) DelayDistribution ¶
DelayDistribution: @return the distribution of the delays in a map.
func (*PeerQueue) GetPeer ¶
func (c *PeerQueue) GetPeer(peerID string) (*PrunedPeer, bool)
GetPeer: Retrieves the info of the peer requested from args. @params peerID: string of the peerID that we want to find. @return pointer to pruned peer. @return bool, true if exists, false if doesn't.
func (*PeerQueue) IsPeerAlready ¶
IsPeerAlready: Check whether a peer is already in the Queue. @params peerID: string of the peerID that we want to find. @return true is peer is already, false if not.
func (PeerQueue) Len ¶
Len is part of sort.Interface. We use the peer list to get the length of the array.
func (PeerQueue) Less ¶
Less is part of sort.Interface. We use c.PeerList.NextConnection as the value to sort by.
func (*PeerQueue) SortPeerList ¶
func (c *PeerQueue) SortPeerList()
SortPeerList: Sort the PeerQueue array leaving at the beginning the peers with the shorter next peer connection.
func (*PeerQueue) UpdatePeerListFromPeerStore ¶
UpdatePeerListFromPeerStore This method will refresh the peerqueue with the peerstore. Basically we add those peers that did not exist before in the peerqueue. @param peerstore: db where to read from.
type PeeringOption ¶
type PeeringOption func(*PeeringService) error
func WithPeeringStrategy ¶
func WithPeeringStrategy(strategy PeeringStrategy) PeeringOption
type PeeringService ¶
type PeeringService struct { PeerStore *db.PeerStore // Control Flags Timeout time.Duration MaxRetries int // contains filtered or unexported fields }
PeeringService is the main service that will connect peers from the given peerstore and using the given Host. It will use the specified peering strategy, which might difer/change from the testing or desired purposes of the run.
func NewPeeringService ¶
func NewPeeringService( ctx context.Context, h *hosts.BasicLibp2pHost, peerstore *db.PeerStore, opts ...PeeringOption) (PeeringService, error)
Constructor
func (*PeeringService) Run ¶
func (c *PeeringService) Run()
Run: Main peering event selector. For every next peer received from the strategy, attempt the connection and record the status of this one. Notify the strategy of any conn/disconn recorded.
func (*PeeringService) ServeMetrics ¶
func (c *PeeringService) ServeMetrics()
ServeMetrics: This method will serve the global peerstore values to the local prometheus instance.
type PeeringStrategy ¶
type PeeringStrategy interface { // one channel to give the next peer, one to request the second one Run() chan models.Peer Type() string // Peering Strategy interaction NextPeer() NewConnectionAttempt(ConnectionAttemptStatus) NewConnectionEvent(hosts.ConnectionEvent) NewIdentificationEvent(hosts.IdentificationEvent) // Prometheus Export Calls LastIterTime() float64 IterForcingNextConnTime() string AttemptedPeersSinceLastIter() int64 ControlDistribution() sync.Map GetErrorAttemptDistribution() sync.Map }
Strategy is the common interface the any desired Peering Strategy should follow TODO: -Still waiting to be defined to make it official
type PositiveDelay ¶
type PositiveDelay struct {
*BaseDelay // include it as pointer to have the methods added directly
}
func NewPositiveDelay ¶
func NewPositiveDelay() PositiveDelay
NewPositiveDelay: Constructor. @return a PositiveDelay object.
func (PositiveDelay) CalculateDelay ¶
func (d PositiveDelay) CalculateDelay() time.Duration
CalculateDelay: This method will calculate the delay to be applied based on degree. @return the delay in Time.Duration format.
type PrunedPeer ¶
type PrunedPeer struct { PeerID string DelayObj DelayObject // define the delay to connect based on error BaseConnectionTimestamp time.Time // define the first event. To calculate the next connection we sum this with delay. BaseDeprecationTimestamp time.Time // this + DeprecationTime defines when we are ready to deprecate }
TODO: think about includint a sync.RWMutex in case we upgrade to workers
func NewPrunedPeer ¶
func NewPrunedPeer(peerID string, inputType string) *PrunedPeer
func (*PrunedPeer) ConnEventHandler ¶
func (c *PrunedPeer) ConnEventHandler(recErr string) string
RecErrorHandler: Function that selects actuation method for each of the possible errors while actively dialing peers. @params peerID in string format, recorded error in string format.
func (*PrunedPeer) Deprecable ¶
func (c *PrunedPeer) Deprecable() bool
Deprecable: This method evaluates if the peer is in time to be deprecated. @return true (in time to be deprecated) / false (not ready to be deprecated).
func (*PrunedPeer) IsReadyForConnection ¶
func (c *PrunedPeer) IsReadyForConnection() bool
IsReadyForConnection: This method evaluates if the given peer is ready to be connected. @return True of False if we are in time to connect or not.
func (*PrunedPeer) NextConnection ¶
func (c *PrunedPeer) NextConnection() time.Time
func (*PrunedPeer) UpdateDelay ¶
func (c *PrunedPeer) UpdateDelay(newDelayType string)
NewEvent: This method will reevaluate the delay in case of a new Positive or NegativeDelay happenned
type PruningStrategy ¶
type PruningStrategy struct { PeerStore *db.PeerStore // List of peers sorted by the amount of time thatwe have to wait PeerQueue PeerQueue PeerQueueIterations int ErrorAttemptDistribution sync.Map // contains filtered or unexported fields }
Pruning Strategy is a Peering Strategy that applies penalties to peers that haven't shown activity when attempting to connect them. Combined with the Deprecated flag in the models.Peer struct, it produces more accurate metrics when exporting pruning peers that are no longer active.
func NewPruningStrategy ¶
func NewPruningStrategy(ctx context.Context, network string, peerstore *db.PeerStore) (PruningStrategy, error)
NewPruningStrategy: Pruning strategy constructor, that will offer a models.Peer stream for the peering service. The provided models.Peer stream are ready to connect. @param ctx: parent context. @param peerstore: db.PeerStore. @param opts: base and logging option. @return peering strategy interface with the prunning service. @return error.
func (*PruningStrategy) AttemptedPeersSinceLastIter ¶
func (c *PruningStrategy) AttemptedPeersSinceLastIter() int64
func (*PruningStrategy) ControlDistribution ¶
func (c *PruningStrategy) ControlDistribution() sync.Map
func (*PruningStrategy) GetErrorAttemptDistribution ¶
func (c *PruningStrategy) GetErrorAttemptDistribution() sync.Map
func (*PruningStrategy) IterForcingNextConnTime ¶
func (c *PruningStrategy) IterForcingNextConnTime() string
func (*PruningStrategy) LastIterTime ¶
func (c *PruningStrategy) LastIterTime() float64
LastIterTime @return the lastiteration time of the peerqueue
func (*PruningStrategy) NewConnectionAttempt ¶
func (c *PruningStrategy) NewConnectionAttempt(connAttStat ConnectionAttemptStatus)
NewConnectionAttempt: Notifies the peerstore iterator that a new ConnStatus has been received. After it, the peerstore iterator will aggregate the extra info. @param connAttStat: the object containing the data from the attempt
func (*PruningStrategy) NewConnectionEvent ¶
func (c *PruningStrategy) NewConnectionEvent(connEvent hosts.ConnectionEvent)
NewConnectionEvent: Notifies the peerstore iterator that a new Connection has been received. It puts the connection metadata in the connNot channel to let the select loop all the metadata of the received connection.
func (*PruningStrategy) NewIdentificationEvent ¶
func (c *PruningStrategy) NewIdentificationEvent(newIdent hosts.IdentificationEvent)
NewIdentificationEvent: This method will insert a new identification item in the identificationeventnorifier channel. @param newIdent: the object containing data about the event.
func (*PruningStrategy) NextPeer ¶
func (c *PruningStrategy) NextPeer()
NextPeer: Notifies the peerstore iterator that a new peer has been requested. After it, the peerstore iterator will put the new peer in the PeerStreamChan.
func (*PruningStrategy) Run ¶
func (c *PruningStrategy) Run() chan models.Peer
Run: Initializes the models.Peer stream on the returning models.Peer chan stores locally an auxiliary map wuth an array that will keep track of the next connection time. @return models.Peer channel with the next peer to connect.
func (PruningStrategy) Type ¶
func (c PruningStrategy) Type() string
Type: Returns the strategy type that has been set. @return string with the name of the pruning strategy.
type TimeoutDelay ¶
type TimeoutDelay struct {
*NegativeDelay
}
TimeoutDelay In case of "i/o timeout" Only peers that have returned a timeout error. baseDelay = 16 minutes.
type ZeroDelay ¶
type ZeroDelay struct {
*BaseDelay
}
ZeroDelay: It could be applied to specific error cases where we apply a delay of 0 minutes.
func NewZeroDelay ¶
func NewZeroDelay() ZeroDelay
func (ZeroDelay) CalculateDelay ¶
CalculateDelay: This method will calculate the delay to be applied based on degree. @return the delay in Time.Duration format.