Documentation ¶
Index ¶
- Variables
- func AddAuditToHistory(a *pb.AuditHistory, online bool, auditTime time.Time, ...) error
- func DuplicateAuditHistory(auditHistory *pb.AuditHistory) *pb.AuditHistory
- func MergeAuditHistories(history *pb.AuditHistory, addHistory []*pb.AuditWindow, ...) (trackingPeriodFull bool)
- func RecalculateScore(history *pb.AuditHistory)
- func UpdateReputation(isSuccess bool, alpha, beta, lambda, w float64) (newAlpha, newBeta float64)
- func UpdateReputationMultiple(count int, alpha, beta, lambda, w float64) (newAlpha, newBeta float64)
- type AuditHistoryConfig
- type AuditType
- type CachingDB
- func (cdb *CachingDB) ApplyUpdates(ctx context.Context, nodeID storj.NodeID, updates Mutations, config Config, ...) (info *Info, err error)
- func (cdb *CachingDB) DisqualifyNode(ctx context.Context, nodeID storj.NodeID, disqualifiedAt time.Time, ...) (err error)
- func (cdb *CachingDB) FlushAll(ctx context.Context) (err error)
- func (cdb *CachingDB) Get(ctx context.Context, nodeID storj.NodeID) (info *Info, err error)
- func (cdb *CachingDB) Manage(ctx context.Context) error
- func (cdb *CachingDB) RequestSync(ctx context.Context, nodeID storj.NodeID) (err error)
- func (cdb *CachingDB) SetNowFunc(timeFunc func() time.Time)
- func (cdb *CachingDB) SuspendNodeUnknownAudit(ctx context.Context, nodeID storj.NodeID, suspendedAt time.Time) (err error)
- func (cdb *CachingDB) UnsuspendNodeUnknownAudit(ctx context.Context, nodeID storj.NodeID) (err error)
- func (cdb *CachingDB) Update(ctx context.Context, request UpdateRequest, auditTime time.Time) (info *Info, err error)
- type Config
- type DB
- type Info
- type Mutations
- type Service
- func (service *Service) ApplyAudit(ctx context.Context, nodeID storj.NodeID, reputation overlay.ReputationStatus, ...) (err error)
- func (service *Service) Close() error
- func (service *Service) FlushNodeInfo(ctx context.Context, nodeID storj.NodeID) (err error)
- func (service *Service) Get(ctx context.Context, nodeID storj.NodeID) (info *Info, err error)
- func (service *Service) TestDisqualifyNode(ctx context.Context, nodeID storj.NodeID, ...) (err error)
- func (service *Service) TestFlushAllNodeInfo(ctx context.Context) (err error)
- func (service *Service) TestSuspendNodeUnknownAudit(ctx context.Context, nodeID storj.NodeID, suspendedAt time.Time) (err error)
- func (service *Service) TestUnsuspendNodeUnknownAudit(ctx context.Context, nodeID storj.NodeID) (err error)
- type UpdateAuditHistoryResponse
- type UpdateRequest
Constants ¶
This section is empty.
Variables ¶
var (
	// Error is the default reputation errs class.
	Error = errs.Class("reputation")
	// ErrNodeNotFound is returned if a node does not exist in database.
	ErrNodeNotFound = errs.Class("node not found")
)
Functions ¶
func AddAuditToHistory ¶ added in v1.56.1
func AddAuditToHistory(a *pb.AuditHistory, online bool, auditTime time.Time, config AuditHistoryConfig) error
AddAuditToHistory adds a single online/not-online event to an AuditHistory. If the AuditHistory contains windows that are now outside the tracking period, those windows will be trimmed.
func DuplicateAuditHistory ¶ added in v1.56.1
func DuplicateAuditHistory(auditHistory *pb.AuditHistory) *pb.AuditHistory
DuplicateAuditHistory creates a duplicate (deep copy) of an AuditHistory object.
func MergeAuditHistories ¶ added in v1.57.1
func MergeAuditHistories(history *pb.AuditHistory, addHistory []*pb.AuditWindow, config AuditHistoryConfig) (trackingPeriodFull bool)
MergeAuditHistories merges two audit histories into one, including all windows that are present in either input and summing counts for any windows that appear in _both_ inputs. Any windows that are now outside the tracking period will be trimmed.
The history parameter will be mutated to include the windows passed as addHistory.
Returns true if the number of windows in the new history is the maximum possible for the tracking config.
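The merge rule described above (union of windows, summed counts for shared windows, trimming of old windows) can be sketched as follows. This is a minimal illustration, not the package's implementation: the `window` type is a hypothetical stand-in for pb.AuditWindow, the explicit `cutoff` parameter is an assumption standing in for the tracking-period logic, and the sketch returns a new slice instead of mutating its input.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// window is a hypothetical stand-in for pb.AuditWindow.
type window struct {
	Start       time.Time
	OnlineCount int
	TotalCount  int
}

// mergeWindows returns every window present in either input, summing the
// counts of windows that share a start time, dropping windows that start
// before cutoff, and sorting the result by start time.
func mergeWindows(a, b []window, cutoff time.Time) []window {
	byStart := make(map[int64]window)
	for _, list := range [][]window{a, b} {
		for _, w := range list {
			if w.Start.Before(cutoff) {
				continue // outside the tracking period: trimmed
			}
			key := w.Start.UnixNano()
			merged := byStart[key]
			merged.Start = w.Start
			merged.OnlineCount += w.OnlineCount
			merged.TotalCount += w.TotalCount
			byStart[key] = merged
		}
	}
	out := make([]window, 0, len(byStart))
	for _, w := range byStart {
		out = append(out, w)
	}
	sort.Slice(out, func(i, j int) bool { return out[i].Start.Before(out[j].Start) })
	return out
}

// exampleMerge merges two short histories that overlap in one window and
// trims the oldest window.
func exampleMerge() []window {
	t0 := time.Date(2022, 1, 1, 0, 0, 0, 0, time.UTC)
	step := 12 * time.Hour
	a := []window{{t0, 1, 2}, {t0.Add(step), 3, 3}}
	b := []window{{t0.Add(step), 1, 2}, {t0.Add(2 * step), 2, 2}}
	return mergeWindows(a, b, t0.Add(step))
}

func main() {
	for _, w := range exampleMerge() {
		fmt.Println(w.Start.Format(time.RFC3339), w.OnlineCount, w.TotalCount)
	}
}
```

In this example the window starting at t0 is trimmed, the shared window ends up with counts 4 of 5, and the newest window is carried over unchanged.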
func RecalculateScore ¶ added in v1.57.1
func RecalculateScore(history *pb.AuditHistory)
RecalculateScore calculates and assigns the Score field in a pb.AuditHistory object. The score is calculated by averaging the online percentage in each window (not including the last).
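A minimal sketch of that averaging rule is shown below. The `window` type is a hypothetical stand-in for pb.AuditWindow, and two behaviors are assumptions made for the sketch rather than facts from this page: a history with no completed window scores 1, and windows with no audits are skipped to avoid dividing by zero.

```go
package main

import "fmt"

// window is a hypothetical stand-in for pb.AuditWindow.
type window struct {
	OnlineCount int32
	TotalCount  int32
}

// onlineScore averages the per-window online ratio over every window except
// the last one, which is treated as still in progress.
func onlineScore(windows []window) float64 {
	if len(windows) <= 1 {
		return 1 // assumption: no completed window means a perfect score
	}
	sum := 0.0
	counted := 0
	for _, w := range windows[:len(windows)-1] {
		if w.TotalCount == 0 {
			continue // assumption: skip windows without audits
		}
		sum += float64(w.OnlineCount) / float64(w.TotalCount)
		counted++
	}
	if counted == 0 {
		return 1
	}
	return sum / float64(counted)
}

func main() {
	// Two completed windows (100% and 50% online) plus one in progress.
	history := []window{{4, 4}, {1, 2}, {0, 3}}
	fmt.Println(onlineScore(history)) // prints 0.75
}
```

The last window contributes nothing to the score even though it recorded three offline audits, which matches the "not including the last" rule.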
func UpdateReputation ¶ added in v1.57.1
func UpdateReputation(isSuccess bool, alpha, beta, lambda, w float64) (newAlpha, newBeta float64)
UpdateReputation uses the Beta distribution model to determine a node's reputation. lambda is the "forgetting factor" which determines how much past info is kept when determining current reputation score. w is the normalization weight that affects how severely new updates affect the current reputation distribution.
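A single update can be sketched from the formulas given for UpdateReputationMultiple with n = 1: both measures are scaled by lambda, and w is added to alpha on success or to beta on failure. The function name `updateOnce` is hypothetical, and this is an illustration of the model rather than the package's code.

```go
package main

import "fmt"

// updateOnce applies one audit result to the alpha and beta measures.
// Both measures decay by the forgetting factor lambda; the weight w is then
// credited to alpha on success or to beta on failure.
func updateOnce(isSuccess bool, alpha, beta, lambda, w float64) (newAlpha, newBeta float64) {
	newAlpha = lambda * alpha
	newBeta = lambda * beta
	if isSuccess {
		newAlpha += w
	} else {
		newBeta += w
	}
	return newAlpha, newBeta
}

func main() {
	a, b := updateOnce(true, 8, 4, 0.5, 1)
	fmt.Println(a, b) // prints 5 2

	a, b = updateOnce(false, 8, 4, 0.5, 1)
	fmt.Println(a, b) // prints 4 3
}
```

A lambda close to 1 keeps most of the past, so each new result moves the measures only slightly; a larger w makes each new result count for more.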
func UpdateReputationMultiple ¶ added in v1.57.1
func UpdateReputationMultiple(count int, alpha, beta, lambda, w float64) (newAlpha, newBeta float64)
UpdateReputationMultiple works like UpdateReputation, but applies multiple successive counts of an event type to the alpha and beta measures.
With the arguments as named, applies 'count' successful audits. To apply negative audits, swap the alpha and beta parameters and return values.
WARNING: GREEK LETTER MATH AHEAD
Applying n successful audit results to an initial alpha value of α₀ gives a new α₁ value of:
α₁ = λⁿα₀ + λⁿ⁻¹w + λⁿ⁻²w + ... + λ²w + λw + w
The terms with w are the first n terms of a geometric series with coefficient w and common ratio λ. For λ ≠ 1, the closed-form formula for the sum of those first n terms is w(1-λⁿ) / (1-λ) (https://en.wikipedia.org/wiki/Geometric_series#Closed-form_formula). Adding the initial λⁿα₀ term, we get
α₁ = λⁿα₀ + w(1-λⁿ) / (1-λ)
The formula for beta after n _failures_ has the same structure:
β₁ = λⁿβ₀ + w(1-λⁿ) / (1-λ)
The measure that receives no new events only decays. For n _failures_,
α₁ = λⁿα₀
For n _successes_,
β₁ = λⁿβ₀
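The closed form can be checked against n repeated single updates. The sketch below uses hypothetical function names and is not the package's code. It treats λ = 1 separately because the closed form divides by (1-λ); in that case the series is simply n·w.

```go
package main

import (
	"fmt"
	"math"
)

// applySuccesses applies n successes in one step using the closed form.
func applySuccesses(n int, alpha, beta, lambda, w float64) (newAlpha, newBeta float64) {
	if lambda == 1 {
		// The closed form divides by zero here; the sum is n*w.
		return alpha + w*float64(n), beta
	}
	pow := math.Pow(lambda, float64(n))
	return pow*alpha + w*(1-pow)/(1-lambda), pow * beta
}

// applySuccessesLoop applies n successes one at a time, for comparison.
func applySuccessesLoop(n int, alpha, beta, lambda, w float64) (newAlpha, newBeta float64) {
	for i := 0; i < n; i++ {
		alpha = lambda*alpha + w
		beta = lambda * beta
	}
	return alpha, beta
}

func main() {
	a1, b1 := applySuccesses(3, 8, 4, 0.5, 1)
	a2, b2 := applySuccessesLoop(3, 8, 4, 0.5, 1)
	fmt.Println(a1, b1) // prints 2.75 0.5
	fmt.Println(a2, b2) // prints 2.75 0.5

	// Two failures: swap alpha and beta in both arguments and results.
	newBeta, newAlpha := applySuccesses(2, 4, 8, 0.5, 1)
	fmt.Println(newAlpha, newBeta) // prints 2 2.5
}
```

The values here are chosen to be exactly representable in binary floating point; with a realistic lambda such as 0.999 the two approaches agree only up to rounding error.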
Types ¶
type AuditHistoryConfig ¶
type AuditHistoryConfig struct {
	WindowSize time.Duration `help:"The length of time spanning a single audit window" releaseDefault:"12h" devDefault:"5m" testDefault:"10m"`
	TrackingPeriod time.Duration `` /* 127-byte string literal not displayed */
	GracePeriod time.Duration `` /* 236-byte string literal not displayed */
	OfflineThreshold float64 `` /* 226-byte string literal not displayed */
	OfflineDQEnabled bool `` /* 134-byte string literal not displayed */
	OfflineSuspensionEnabled bool `help:"whether nodes will be suspended if they have low online score" releaseDefault:"true" devDefault:"true"`
}
AuditHistoryConfig is a configuration struct defining time periods and thresholds for penalizing nodes for being offline. It is used for downtime suspension and disqualification.
type AuditType ¶
type AuditType int
AuditType is an enum representing the outcome of a particular audit.
const (
	// AuditSuccess represents a successful audit.
	AuditSuccess AuditType = iota
	// AuditFailure represents a failed audit.
	AuditFailure
	// AuditUnknown represents an audit that resulted in an unknown error from the node.
	AuditUnknown
	// AuditOffline represents an audit where a node was offline.
	AuditOffline
)
type CachingDB ¶ added in v1.60.1
type CachingDB struct {
// contains filtered or unexported fields
}
CachingDB acts like a reputation.DB but caches reads and writes, to minimize load on the backing store.
func NewCachingDB ¶ added in v1.60.1
NewCachingDB creates a new CachingDB instance.
func (*CachingDB) ApplyUpdates ¶ added in v1.60.1
func (cdb *CachingDB) ApplyUpdates(ctx context.Context, nodeID storj.NodeID, updates Mutations, config Config, now time.Time) (info *Info, err error)
ApplyUpdates applies multiple updates (defined by the updates parameter) to a node's reputations record.
If the node (as represented in the returned info) becomes newly vetted, disqualified, or suspended as a result of these updates, the caller is responsible for updating the records in the overlay to match.
func (*CachingDB) DisqualifyNode ¶ added in v1.60.1
func (cdb *CachingDB) DisqualifyNode(ctx context.Context, nodeID storj.NodeID, disqualifiedAt time.Time, reason overlay.DisqualificationReason) (err error)
DisqualifyNode disqualifies a storage node.
func (*CachingDB) FlushAll ¶ added in v1.60.1
func (cdb *CachingDB) FlushAll(ctx context.Context) (err error)
FlushAll syncs all pending reputation mutations to the backing store.
func (*CachingDB) Get ¶ added in v1.60.1
func (cdb *CachingDB) Get(ctx context.Context, nodeID storj.NodeID) (info *Info, err error)
Get retrieves the cached *Info record for the given node ID. If the record is not already in the cache, it is fetched from the backing store.
If an error occurred syncing the entry with the backing store, it will be returned. In this case, the returned value for 'info' might be nil, or it might contain data cached longer than FlushInterval.
func (*CachingDB) Manage ¶ added in v1.60.1
func (cdb *CachingDB) Manage(ctx context.Context) error
Manage should be run in its own goroutine while a CachingDB is in use. This will schedule database flushes, trying to avoid too much load all at once.
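One way to run Manage in its own goroutine is sketched below. The `manager` interface, the `fakeDB` type, and the `runWithManage` helper are all hypothetical names introduced for this example. The sketch also assumes that Manage blocks until its context is canceled and then returns the context's error, which this page does not state.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// manager is a hypothetical interface covering only the Manage method
// documented above.
type manager interface {
	Manage(ctx context.Context) error
}

// fakeDB stands in for a CachingDB. Its Manage blocks until the context is
// canceled, which is an assumption about how the real method behaves.
type fakeDB struct{}

func (fakeDB) Manage(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

// runWithManage starts Manage in its own goroutine, runs work, then cancels
// the context and waits for Manage to return.
func runWithManage(m manager, work func()) error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	done := make(chan error, 1)
	go func() { done <- m.Manage(ctx) }()

	work()

	cancel()
	err := <-done
	if errors.Is(err, context.Canceled) {
		return nil // cancellation is the expected way to stop
	}
	return err
}

func main() {
	err := runWithManage(fakeDB{}, func() { fmt.Println("using the database") })
	fmt.Println("manage stopped, err:", err)
}
```

The buffered channel lets the goroutine deliver its result and exit even if the caller returns early.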
func (*CachingDB) RequestSync ¶ added in v1.60.1
func (cdb *CachingDB) RequestSync(ctx context.Context, nodeID storj.NodeID) (err error)
RequestSync requests the managing goroutine to perform a sync of cached info about the specified node to the backing store. This involves applying the cached mutations and resetting the info attribute to match a snapshot of what is in the backing store after the mutations.
func (*CachingDB) SetNowFunc ¶ added in v1.60.1
func (cdb *CachingDB) SetNowFunc(timeFunc func() time.Time)
SetNowFunc supplies a new function to use for determining the current time, for synchronization timing and scheduling purposes. This is frequently useful in test scenarios.
func (*CachingDB) SuspendNodeUnknownAudit ¶ added in v1.60.1
func (cdb *CachingDB) SuspendNodeUnknownAudit(ctx context.Context, nodeID storj.NodeID, suspendedAt time.Time) (err error)
SuspendNodeUnknownAudit suspends a storage node for unknown audits.
func (*CachingDB) UnsuspendNodeUnknownAudit ¶ added in v1.60.1
func (cdb *CachingDB) UnsuspendNodeUnknownAudit(ctx context.Context, nodeID storj.NodeID) (err error)
UnsuspendNodeUnknownAudit unsuspends a storage node for unknown audits.
func (*CachingDB) Update ¶ added in v1.60.1
func (cdb *CachingDB) Update(ctx context.Context, request UpdateRequest, auditTime time.Time) (info *Info, err error)
Update applies a single update (one audit outcome) to a node's reputations record.
If the node (as represented in the returned info) becomes newly vetted, disqualified, or suspended as a result of this update, the caller is responsible for updating the records in the overlay to match.
type Config ¶
type Config struct {
	AuditRepairWeight float64 `help:"weight to apply to audit reputation for total repair reputation calculation" default:"1.0"`
	AuditUplinkWeight float64 `help:"weight to apply to audit reputation for total uplink reputation calculation" default:"1.0"`
	AuditLambda float64 `help:"the forgetting factor used to update storage node reputation due to audits" default:"0.999"`
	AuditWeight float64 `help:"the normalization weight used to calculate the audit SNs reputation" default:"1.0"`
	AuditDQ float64 `help:"the reputation cut-off for disqualifying SNs based on audit history" default:"0.96"`
	UnknownAuditLambda float64 `` /* 130-byte string literal not displayed */
	UnknownAuditDQ float64 `help:"the reputation cut-off for disqualifying SNs based on returning 'unknown' errors during audit" default:"0.6"`
	SuspensionGracePeriod time.Duration `help:"the time period that must pass before suspended nodes will be disqualified" releaseDefault:"168h" devDefault:"1h"`
	SuspensionDQEnabled bool `` /* 153-byte string literal not displayed */
	AuditCount int64 `help:"the number of times a node has been audited to not be considered a New Node" releaseDefault:"100" devDefault:"0"`
	AuditHistory AuditHistoryConfig
	FlushInterval time.Duration `` /* 184-byte string literal not displayed */
	ErrorRetryInterval time.Duration `` /* 132-byte string literal not displayed */
	InitialAlpha float64 `help:"the value to which an alpha reputation value should be initialized" default:"1000"`
	InitialBeta float64 `help:"the value to which a beta reputation value should be initialized" default:"0"`
}
Config contains all config values for the reputation service.
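To get a feel for how the defaults interact, the sketch below counts how many consecutive failed audits it takes for a fresh node to fall below the AuditDQ cut-off. It assumes the reputation score is the mean of the Beta distribution, alpha / (alpha + beta), and that disqualification happens once the score is below the cut-off; neither detail is stated on this page, and the function names are hypothetical.

```go
package main

import "fmt"

// score is the mean of a Beta(alpha, beta) distribution. Using it as the
// reputation score is an assumption made for this sketch.
func score(alpha, beta float64) float64 {
	return alpha / (alpha + beta)
}

// failuresUntilBelow counts consecutive failed audits until the score drops
// below cutoff. It gives up after maxSteps to guarantee termination.
func failuresUntilBelow(alpha, beta, lambda, w, cutoff float64) int {
	const maxSteps = 1000000
	n := 0
	for n < maxSteps && score(alpha, beta) >= cutoff {
		alpha = lambda * alpha  // alpha only decays on failure
		beta = lambda*beta + w // beta decays and receives the weight
		n++
	}
	return n
}

func main() {
	// Defaults from Config: InitialAlpha=1000, InitialBeta=0,
	// AuditLambda=0.999, AuditWeight=1.0, AuditDQ=0.96.
	fmt.Println(failuresUntilBelow(1000, 0, 0.999, 1.0, 0.96)) // prints 41
}
```

With these defaults alpha + beta stays at about 1000, so the score after n straight failures is roughly 0.999ⁿ, which first drops below 0.96 at n = 41.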
type DB ¶
type DB interface {
	Update(ctx context.Context, request UpdateRequest, now time.Time) (_ *Info, err error)
	Get(ctx context.Context, nodeID storj.NodeID) (*Info, error)
	// ApplyUpdates applies multiple updates (defined by the updates
	// parameter) to a node's reputations record.
	ApplyUpdates(ctx context.Context, nodeID storj.NodeID, updates Mutations, reputationConfig Config, now time.Time) (_ *Info, err error)

	// UnsuspendNodeUnknownAudit unsuspends a storage node for unknown audits.
	UnsuspendNodeUnknownAudit(ctx context.Context, nodeID storj.NodeID) (err error)
	// DisqualifyNode disqualifies a storage node.
	DisqualifyNode(ctx context.Context, nodeID storj.NodeID, disqualifiedAt time.Time, reason overlay.DisqualificationReason) (err error)
	// SuspendNodeUnknownAudit suspends a storage node for unknown audits.
	SuspendNodeUnknownAudit(ctx context.Context, nodeID storj.NodeID, suspendedAt time.Time) (err error)
}
DB is an interface for storing reputation data.
type Info ¶
type Info struct {
	AuditSuccessCount int64
	TotalAuditCount int64
	VettedAt *time.Time
	UnknownAuditSuspended *time.Time
	OfflineSuspended *time.Time
	UnderReview *time.Time
	Disqualified *time.Time
	DisqualificationReason overlay.DisqualificationReason
	OnlineScore float64
	AuditHistory *pb.AuditHistory
	AuditReputationAlpha float64
	AuditReputationBeta float64
	UnknownAuditReputationAlpha float64
	UnknownAuditReputationBeta float64
}
Info contains all reputation data to be stored in DB.
type Mutations ¶ added in v1.57.1
type Mutations struct {
	PositiveResults int
	FailureResults int
	UnknownResults int
	OfflineResults int
	OnlineHistory *pb.AuditHistory
}
Mutations represents changes which should be made to a particular node's reputation, in terms of counts and/or timestamps of events which have occurred. A Mutations record can be applied to a reputations row without prior knowledge of that row's contents.
func UpdateRequestToMutations ¶ added in v1.57.1
func UpdateRequestToMutations(updateReq UpdateRequest, now time.Time) (Mutations, error)
UpdateRequestToMutations transforms an UpdateRequest into the equivalent Mutations structure, which can be used with ApplyUpdates.
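The count part of that transformation can be sketched as a mapping from one audit outcome to a record with a single counter set to 1. The `mutations` type below mirrors only the count fields of Mutations and omits OnlineHistory; `toMutations` is a hypothetical name, and returning an error for an unrecognized outcome is an assumption of the sketch.

```go
package main

import "fmt"

// AuditType and its values mirror the enum documented above.
type AuditType int

const (
	AuditSuccess AuditType = iota
	AuditFailure
	AuditUnknown
	AuditOffline
)

// mutations mirrors the count fields of Mutations. The OnlineHistory field
// is left out to keep the sketch self-contained.
type mutations struct {
	PositiveResults int
	FailureResults  int
	UnknownResults  int
	OfflineResults  int
}

// toMutations converts a single audit outcome into the equivalent counts.
func toMutations(outcome AuditType) (mutations, error) {
	var m mutations
	switch outcome {
	case AuditSuccess:
		m.PositiveResults = 1
	case AuditFailure:
		m.FailureResults = 1
	case AuditUnknown:
		m.UnknownResults = 1
	case AuditOffline:
		m.OfflineResults = 1
	default:
		// Assumption: outcomes outside the enum are rejected.
		return m, fmt.Errorf("unrecognized audit outcome %d", outcome)
	}
	return m, nil
}

func main() {
	m, err := toMutations(AuditUnknown)
	fmt.Println(m, err) // prints {0 0 1 0} <nil>
}
```

Because the record holds only counts, several of them can be added together before being applied, which fits the description of Mutations as applicable without prior knowledge of the row's contents.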
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service handles storing node reputation data and updating the overlay cache when a node's status changes.
func NewService ¶
NewService creates a new reputation service.
func (*Service) ApplyAudit ¶
func (service *Service) ApplyAudit(ctx context.Context, nodeID storj.NodeID, reputation overlay.ReputationStatus, result AuditType) (err error)
ApplyAudit receives an audit result and applies it to the relevant node in DB.
func (*Service) FlushNodeInfo ¶ added in v1.60.1
func (service *Service) FlushNodeInfo(ctx context.Context, nodeID storj.NodeID) (err error)
FlushNodeInfo flushes any cached information about the specified node to the backing store, if the attached reputationDB does any caching at all.
func (*Service) Get ¶
func (service *Service) Get(ctx context.Context, nodeID storj.NodeID) (info *Info, err error)
Get returns a node's reputation info from DB. If a node is not found in the DB, default reputation information is returned.
func (*Service) TestDisqualifyNode ¶ added in v1.36.1
func (service *Service) TestDisqualifyNode(ctx context.Context, nodeID storj.NodeID, reason overlay.DisqualificationReason) (err error)
TestDisqualifyNode disqualifies a storage node.
func (*Service) TestFlushAllNodeInfo ¶ added in v1.60.1
func (service *Service) TestFlushAllNodeInfo(ctx context.Context) (err error)
TestFlushAllNodeInfo flushes any and all cached information about all nodes to the backing store, if the attached reputationDB does any caching at all.
type UpdateAuditHistoryResponse ¶ added in v1.36.1
UpdateAuditHistoryResponse contains information returned by UpdateAuditHistory.
type UpdateRequest ¶
type UpdateRequest struct {
	NodeID storj.NodeID
	AuditOutcome AuditType
	// Config is a copy of the Config struct from the satellite.
	// It is part of the UpdateRequest struct in order to be more easily
	// accessible from satellitedb code.
	Config
}
UpdateRequest is used to update a node's reputation status.