reputation

package
v1.116.1-rc Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 25, 2024 License: AGPL-3.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (

	// Error is the default reputation errs class.
	Error = errs.Class("reputation")
	// ErrNodeNotFound is returned if a node does not exist in database.
	ErrNodeNotFound = errs.Class("node not found")
)

Functions

func AddAuditToHistory added in v1.56.1

func AddAuditToHistory(a *pb.AuditHistory, online bool, auditTime time.Time, config AuditHistoryConfig) error

AddAuditToHistory adds a single online/not-online event to an AuditHistory. If the AuditHistory contains windows that are now outside the tracking period, those windows will be trimmed.

func DuplicateAuditHistory added in v1.56.1

func DuplicateAuditHistory(auditHistory *pb.AuditHistory) *pb.AuditHistory

DuplicateAuditHistory creates a duplicate (deep copy) of an AuditHistory object.

func MergeAuditHistories added in v1.57.1

func MergeAuditHistories(history *pb.AuditHistory, addHistory []*pb.AuditWindow, config AuditHistoryConfig) (trackingPeriodFull bool)

MergeAuditHistories merges two audit histories into one, including all windows that are present in either input and summing counts for any windows that appear in _both_ inputs. Any windows that are now outside the tracking period will be trimmed.

The history parameter will be mutated to include the windows passed as addHistory.

Returns true if the number of windows in the new history is the maximum possible for the tracking config.

func RecalculateScore added in v1.57.1

func RecalculateScore(history *pb.AuditHistory)

RecalculateScore calculates and assigns the Score field in a pb.AuditHistory object. The score is calculated by averaging the online percentage in each window (not including the last).

func UpdateReputation added in v1.57.1

func UpdateReputation(isSuccess bool, alpha, beta, lambda, w float64) (newAlpha, newBeta float64)

UpdateReputation uses the Beta distribution model to determine a node's reputation. lambda is the "forgetting factor" which determines how much past info is kept when determining current reputation score. w is the normalization weight that affects how severely new updates affect the current reputation distribution.

func UpdateReputationMultiple added in v1.57.1

func UpdateReputationMultiple(count int, alpha, beta, lambda, w float64) (newAlpha, newBeta float64)

UpdateReputationMultiple works like UpdateReputation, but applies multiple successive counts of an event type to the alpha and beta measures.

With the arguments as named, applies 'count' successful audits. To apply negative audits, swap the alpha and beta parameters and return values.

WARNING: GREEK LETTER MATH AHEAD

Applying n successful audit results to an initial alpha value of α₀ gives a new α₁ value of:

α₁ = λⁿα₀ + λⁿ⁻¹w + λⁿ⁻²w + ... + λ²w + λw + w

The terms with w are the first n terms of a geometric series with coefficient w and common ratio λ. The closed form formula for the sum of those first n terms is (w(1-λⁿ) / (1-λ)) (https://en.wikipedia.org/wiki/Geometric_series#Closed-form_formula). Adding the initial λⁿα₀ term, we get

α₁ = λⁿα₀ + w(1-λⁿ) / (1-λ)

The formula has the same structure for beta for n _failures_.

β₁ = λⁿβ₀ + w(1-λⁿ) / (1-λ)

For n _failures_,

α₁ = λⁿα₀

For n _successes_,

β₁ = λⁿβ₀

Types

type AuditHistoryConfig

type AuditHistoryConfig struct {
	WindowSize               time.Duration `help:"The length of time spanning a single audit window" releaseDefault:"12h" devDefault:"5m" testDefault:"10m"`
	TrackingPeriod           time.Duration `` /* 127-byte string literal not displayed */
	GracePeriod              time.Duration `` /* 236-byte string literal not displayed */
	OfflineThreshold         float64       `` /* 226-byte string literal not displayed */
	OfflineDQEnabled         bool          `` /* 134-byte string literal not displayed */
	OfflineSuspensionEnabled bool          `help:"whether nodes will be suspended if they have low online score" releaseDefault:"true" devDefault:"true"`
}

AuditHistoryConfig is a configuration struct defining time periods and thresholds for penalizing nodes for being offline. It is used for downtime suspension and disqualification.

type AuditType

type AuditType int

AuditType is an enum representing the outcome of a particular audit.

const (
	// AuditSuccess represents a successful audit.
	AuditSuccess AuditType = iota
	// AuditFailure represents a failed audit.
	AuditFailure
	// AuditUnknown represents an audit that resulted in an unknown error from the node.
	AuditUnknown
	// AuditOffline represents an audit where a node was offline.
	AuditOffline
)

func (AuditType) String added in v1.55.1

func (auditType AuditType) String() string

type CachingDB added in v1.60.1

type CachingDB struct {
	// contains filtered or unexported fields
}

CachingDB acts like a reputation.DB but caches reads and writes, to minimize load on the backing store.

func NewCachingDB added in v1.60.1

func NewCachingDB(log *zap.Logger, backingStore DB, reputationConfig Config) *CachingDB

NewCachingDB creates a new CachingDB instance.

func (*CachingDB) ApplyUpdates added in v1.60.1

func (cdb *CachingDB) ApplyUpdates(ctx context.Context, nodeID storj.NodeID, updates Mutations, config Config, now time.Time) (info *Info, err error)

ApplyUpdates applies multiple updates (defined by the updates parameter) to a node's reputations record.

If the node (as represented in the returned info) becomes newly vetted, disqualified, or suspended as a result of these updates, the caller is responsible for updating the records in the overlay to match.

func (*CachingDB) DisqualifyNode added in v1.60.1

func (cdb *CachingDB) DisqualifyNode(ctx context.Context, nodeID storj.NodeID, disqualifiedAt time.Time, reason overlay.DisqualificationReason) (err error)

DisqualifyNode disqualifies a storage node.

func (*CachingDB) FlushAll added in v1.60.1

func (cdb *CachingDB) FlushAll(ctx context.Context) (err error)

FlushAll syncs all pending reputation mutations to the backing store.

func (*CachingDB) Get added in v1.60.1

func (cdb *CachingDB) Get(ctx context.Context, nodeID storj.NodeID) (info *Info, err error)

Get retrieves the cached *Info record for the given node ID. If the information is not already in the cache, the information is fetched from the backing store.

If an error occurred syncing the entry with the backing store, it will be returned. In this case, the returned value for 'info' might be nil, or it might contain data cached longer than FlushInterval.

func (*CachingDB) Manage added in v1.60.1

func (cdb *CachingDB) Manage(ctx context.Context) error

Manage should be run in its own goroutine while a CachingDB is in use. This will schedule database flushes, trying to avoid too much load all at once.

func (*CachingDB) RequestSync added in v1.60.1

func (cdb *CachingDB) RequestSync(ctx context.Context, nodeID storj.NodeID) (err error)

RequestSync requests the managing goroutine to perform a sync of cached info about the specified node to the backing store. This involves applying the cached mutations and resetting the info attribute to match a snapshot of what is in the backing store after the mutations.

func (*CachingDB) SetNowFunc added in v1.60.1

func (cdb *CachingDB) SetNowFunc(timeFunc func() time.Time)

SetNowFunc supplies a new function to use for determining the current time, for synchronization timing and scheduling purposes. This is frequently useful in test scenarios.

func (*CachingDB) SuspendNodeUnknownAudit added in v1.60.1

func (cdb *CachingDB) SuspendNodeUnknownAudit(ctx context.Context, nodeID storj.NodeID, suspendedAt time.Time) (err error)

SuspendNodeUnknownAudit suspends a storage node for unknown audits.

func (*CachingDB) UnsuspendNodeUnknownAudit added in v1.60.1

func (cdb *CachingDB) UnsuspendNodeUnknownAudit(ctx context.Context, nodeID storj.NodeID) (err error)

UnsuspendNodeUnknownAudit unsuspends a storage node for unknown audits.

func (*CachingDB) Update added in v1.60.1

func (cdb *CachingDB) Update(ctx context.Context, request UpdateRequest, auditTime time.Time) (info *Info, err error)

Update applies a single update (one audit outcome) to a node's reputations record.

If the node (as represented in the returned info) becomes newly vetted, disqualified, or suspended as a result of this update, the caller is responsible for updating the records in the overlay to match.

type Config

type Config struct {
	AuditRepairWeight     float64       `help:"weight to apply to audit reputation for total repair reputation calculation" default:"1.0"`
	AuditUplinkWeight     float64       `help:"weight to apply to audit reputation for total uplink reputation calculation" default:"1.0"`
	AuditLambda           float64       `help:"the forgetting factor used to update storage node reputation due to audits" default:"0.999"`
	AuditWeight           float64       `help:"the normalization weight used to calculate the audit SNs reputation" default:"1.0"`
	AuditDQ               float64       `help:"the reputation cut-off for disqualifying SNs based on audit history" default:"0.96"`
	UnknownAuditLambda    float64       `` /* 130-byte string literal not displayed */
	UnknownAuditDQ        float64       `help:"the reputation cut-off for disqualifying SNs based on returning 'unknown' errors during audit" default:"0.6"`
	SuspensionGracePeriod time.Duration `help:"the time period that must pass before suspended nodes will be disqualified" releaseDefault:"168h" devDefault:"1h"`
	SuspensionDQEnabled   bool          `` /* 153-byte string literal not displayed */
	AuditCount            int64         `help:"the number of times a node has been audited to not be considered a New Node" releaseDefault:"100" devDefault:"0"`
	AuditHistory          AuditHistoryConfig
	FlushInterval         time.Duration `` /* 184-byte string literal not displayed */
	ErrorRetryInterval    time.Duration `` /* 132-byte string literal not displayed */
	InitialAlpha          float64       `help:"the value to which an alpha reputation value should be initialized" default:"1000"`
	InitialBeta           float64       `help:"the value to which a beta reputation value should be initialized" default:"0"`
}

Config contains all config values for the reputation service.

type DB

type DB interface {
	Update(ctx context.Context, request UpdateRequest, now time.Time) (_ *Info, err error)
	Get(ctx context.Context, nodeID storj.NodeID) (*Info, error)
	// ApplyUpdates applies multiple updates (defined by the updates
	// parameter) to a node's reputations record.
	ApplyUpdates(ctx context.Context, nodeID storj.NodeID, updates Mutations, reputationConfig Config, now time.Time) (_ *Info, err error)

	// UnsuspendNodeUnknownAudit unsuspends a storage node for unknown audits.
	UnsuspendNodeUnknownAudit(ctx context.Context, nodeID storj.NodeID) (err error)
	// DisqualifyNode disqualifies a storage node.
	DisqualifyNode(ctx context.Context, nodeID storj.NodeID, disqualifiedAt time.Time, reason overlay.DisqualificationReason) (err error)
	// SuspendNodeUnknownAudit suspends a storage node for unknown audits.
	SuspendNodeUnknownAudit(ctx context.Context, nodeID storj.NodeID, suspendedAt time.Time) (err error)
}

DB is an interface for storing reputation data.

type Info

type Info struct {
	AuditSuccessCount           int64
	TotalAuditCount             int64
	VettedAt                    *time.Time
	UnknownAuditSuspended       *time.Time
	OfflineSuspended            *time.Time
	UnderReview                 *time.Time
	Disqualified                *time.Time
	DisqualificationReason      overlay.DisqualificationReason
	OnlineScore                 float64
	AuditHistory                *pb.AuditHistory
	AuditReputationAlpha        float64
	AuditReputationBeta         float64
	UnknownAuditReputationAlpha float64
	UnknownAuditReputationBeta  float64
}

Info contains all reputation data to be stored in DB.

func (*Info) Copy added in v1.60.1

func (i *Info) Copy() *Info

Copy creates a deep copy of the Info object.

type Mutations added in v1.57.1

type Mutations struct {
	PositiveResults int
	FailureResults  int
	UnknownResults  int
	OfflineResults  int
	OnlineHistory   *pb.AuditHistory
}

Mutations represents changes which should be made to a particular node's reputation, in terms of counts and/or timestamps of events which have occurred. A Mutations record can be applied to a reputations row without prior knowledge of that row's contents.

func UpdateRequestToMutations added in v1.57.1

func UpdateRequestToMutations(updateReq UpdateRequest, now time.Time) (Mutations, error)

UpdateRequestToMutations transforms an UpdateRequest into the equivalent Mutations structure, which can be used with ApplyUpdates.

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service handles storing node reputation data and updating the overlay cache when a node's status changes.

func NewService

func NewService(log *zap.Logger, overlay *overlay.Service, db DB, config Config) *Service

NewService creates a new reputation service.

func (*Service) ApplyAudit

func (service *Service) ApplyAudit(ctx context.Context, nodeID storj.NodeID, reputation overlay.ReputationStatus, result AuditType) (err error)

ApplyAudit receives an audit result and applies it to the relevant node in DB.

func (*Service) Close added in v1.36.1

func (service *Service) Close() error

Close closes resources.

func (*Service) FlushNodeInfo added in v1.60.1

func (service *Service) FlushNodeInfo(ctx context.Context, nodeID storj.NodeID) (err error)

FlushNodeInfo flushes any cached information about the specified node to the backing store, if the attached reputationDB does any caching at all.

func (*Service) Get

func (service *Service) Get(ctx context.Context, nodeID storj.NodeID) (info *Info, err error)

Get returns a node's reputation info from DB. If a node is not found in the DB, default reputation information is returned.

func (*Service) TestDisqualifyNode added in v1.36.1

func (service *Service) TestDisqualifyNode(ctx context.Context, nodeID storj.NodeID, reason overlay.DisqualificationReason) (err error)

TestDisqualifyNode disqualifies a storage node.

func (*Service) TestFlushAllNodeInfo added in v1.60.1

func (service *Service) TestFlushAllNodeInfo(ctx context.Context) (err error)

TestFlushAllNodeInfo flushes any and all cached information about all nodes to the backing store, if the attached reputationDB does any caching at all.

func (*Service) TestSuspendNodeUnknownAudit added in v1.36.1

func (service *Service) TestSuspendNodeUnknownAudit(ctx context.Context, nodeID storj.NodeID, suspendedAt time.Time) (err error)

TestSuspendNodeUnknownAudit suspends a storage node for unknown audits.

func (*Service) TestUnsuspendNodeUnknownAudit added in v1.36.1

func (service *Service) TestUnsuspendNodeUnknownAudit(ctx context.Context, nodeID storj.NodeID) (err error)

TestUnsuspendNodeUnknownAudit unsuspends a storage node for unknown audits.

type UpdateAuditHistoryResponse added in v1.36.1

type UpdateAuditHistoryResponse struct {
	NewScore           float64
	TrackingPeriodFull bool
	History            []byte
}

UpdateAuditHistoryResponse contains information returned by UpdateAuditHistory.

type UpdateRequest

type UpdateRequest struct {
	NodeID       storj.NodeID
	AuditOutcome AuditType
	// Config is a copy of the Config struct from the satellite.
	// It is part of the UpdateRequest struct in order to be more easily
	// accessible from satellitedb code.
	Config
}

UpdateRequest is used to update a node's reputation status.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL