retain

package
v1.118.6
Published: Nov 25, 2024 License: AGPL-3.0 Imports: 24 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (
	// Error is the default error class for retain errors.
	Error = errs.Class("retain")
)

Functions

func DeleteFile added in v1.101.1

func DeleteFile(path, filename string) error

DeleteFile removes a file from the filesystem.

func SaveRequest added in v1.101.1

func SaveRequest(path, filename string, request *pb.RetainRequest) error

SaveRequest stores the request to the filesystem.

Types

type BloomFilterManager added in v1.118.4

type BloomFilterManager struct {
	// contains filtered or unexported fields
}

BloomFilterManager manages a directory that holds the most recent bloom filter for satellites and allows getting a callback to query them.

func NewBloomFilterManager added in v1.118.4

func NewBloomFilterManager(dir string) (*BloomFilterManager, error)

NewBloomFilterManager constructs a BloomFilterManager with the given directory. Unlike the usual Go convention, a non-nil error does not mean the returned value is invalid: the result is always valid, and any errors are purely informational.
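
Because this contract differs from the usual "check err, then bail out" habit, a caller-side sketch may help. The `manager` type and `newManager` function below are hypothetical stand-ins, not the real retain API, and the empty-directory error is invented purely to trigger the informational path.

```go
package main

import (
	"errors"
	"fmt"
)

// manager is a hypothetical stand-in for retain.BloomFilterManager.
type manager struct {
	dir string
}

// newManager mimics the documented contract: the returned manager is always
// usable, and the error only reports problems noticed during construction.
// The empty-directory condition is an assumption made for this sketch.
func newManager(dir string) (*manager, error) {
	m := &manager{dir: dir}
	if dir == "" {
		return m, errors.New("no directory configured; starting without stored filters")
	}
	return m, nil
}

func main() {
	m, err := newManager("")
	if err != nil {
		// Log and continue instead of returning: m is still valid.
		fmt.Println("warning:", err)
	}
	fmt.Println("manager ready:", m != nil)
}
```

The point of the pattern is that the caller logs the error and keeps using the result, rather than discarding it.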

func (*BloomFilterManager) GetBloomFilter added in v1.118.4

func (bfm *BloomFilterManager) GetBloomFilter(satellite storj.NodeID) ShouldTrashFunc

GetBloomFilter returns a ShouldTrashFunc for the given satellite that always queries whatever the latest bloom filter is for the given satellite.

func (*BloomFilterManager) Queue added in v1.118.4

func (bfm *BloomFilterManager) Queue(ctx context.Context, satellite storj.NodeID, req *pb.RetainRequest) (err error)

Queue stores the RetainRequest for the satellite and sets the current bloom filter to be based on it. It will not update the bloom filter unless the created time increases.
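
The "only update when the created time increases" rule can be sketched as follows. This is a minimal in-memory illustration under stated assumptions: `latestFilters`, `filterEntry`, and the string satellite key are hypothetical stand-ins for the real types, and nothing is written to disk.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// filterEntry is a hypothetical stand-in for a stored retain request.
type filterEntry struct {
	created time.Time
	name    string
}

// latestFilters keeps one entry per satellite. A plain string key is used
// here instead of storj.NodeID.
type latestFilters struct {
	mu      sync.Mutex
	entries map[string]filterEntry
}

// queue replaces the current entry only when the new one is strictly newer.
// It reports whether the entry was replaced.
func (l *latestFilters) queue(satellite string, e filterEntry) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.entries == nil {
		l.entries = make(map[string]filterEntry)
	}
	if cur, ok := l.entries[satellite]; ok && !e.created.After(cur.created) {
		return false
	}
	l.entries[satellite] = e
	return true
}

// runExample queues a first, a stale, and a newer entry for one satellite.
func runExample() []bool {
	var l latestFilters
	t0 := time.Date(2024, 11, 1, 0, 0, 0, 0, time.UTC)
	return []bool{
		l.queue("sat-1", filterEntry{created: t0, name: "first"}),
		l.queue("sat-1", filterEntry{created: t0.Add(-time.Hour), name: "stale"}),
		l.queue("sat-1", filterEntry{created: t0.Add(time.Hour), name: "newer"}),
	}
}

func main() {
	fmt.Println(runExample()) // [true false true]
}
```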

func (*BloomFilterManager) Status added in v1.118.4

func (bfm *BloomFilterManager) Status() Status

Status implements the piecestore.QueueRetain interface.

type Config

type Config struct {
	MaxTimeSkew time.Duration `help:"allows for small differences in the satellite and storagenode clocks" default:"72h0m0s"`
	Status      Status        `` /* 143-byte string literal not displayed */
	Concurrency int           `help:"how many concurrent retain requests can be processed at the same time." default:"1"`
	CachePath   string        `help:"path to the cache directory for retain requests." default:"$CONFDIR/retain"`
}

Config defines parameters for the retain service.

type Queue added in v1.101.1

type Queue interface {
	// Add adds a request to the queue.
	Add(satelliteID storj.NodeID, request *pb.RetainRequest) (bool, error)
	// Remove removes a request from the queue.
	// Returns true if there was a request to remove.
	Remove(request Request) bool
	// Next returns the next request from the queue.
	Next() (Request, bool)
	// Len returns the number of requests in the queue.
	Len() int
	// DeleteCache removes the request from the queue and deletes the cache file.
	DeleteCache(request Request) error
}

Queue manages the retain requests queue.
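
One plausible way to consume such a queue is a drain loop: take the next request, process it, then delete its cache entry. The sketch below uses a trimmed-down, hypothetical `queue` interface and an in-memory `memQueue`; it assumes that `Next` peeks without removing, which the documentation above does not state.

```go
package main

import "fmt"

// request is a hypothetical stand-in for retain.Request.
type request struct {
	Filename string
}

// queue is a trimmed-down stand-in for the Queue interface.
type queue interface {
	Next() (request, bool)
	Len() int
	DeleteCache(req request) error
}

// memQueue is an in-memory queue; its "cache files" are just map entries.
type memQueue struct {
	order []request
	files map[string]bool
}

// Next returns the head of the queue without removing it (an assumption).
func (q *memQueue) Next() (request, bool) {
	if len(q.order) == 0 {
		return request{}, false
	}
	return q.order[0], true
}

func (q *memQueue) Len() int { return len(q.order) }

// DeleteCache removes the request from the queue and drops its "file".
func (q *memQueue) DeleteCache(req request) error {
	for i, r := range q.order {
		if r.Filename == req.Filename {
			q.order = append(q.order[:i], q.order[i+1:]...)
			break
		}
	}
	delete(q.files, req.Filename)
	return nil
}

// drain processes every queued request and returns the filenames handled.
func drain(q queue, process func(request) error) ([]string, error) {
	var done []string
	for {
		req, ok := q.Next()
		if !ok {
			return done, nil
		}
		if err := process(req); err != nil {
			return done, err
		}
		if err := q.DeleteCache(req); err != nil {
			return done, err
		}
		done = append(done, req.Filename)
	}
}

func main() {
	q := &memQueue{
		order: []request{{Filename: "a"}, {Filename: "b"}},
		files: map[string]bool{"a": true, "b": true},
	}
	done, err := drain(q, func(request) error { return nil })
	fmt.Println(done, err, q.Len()) // [a b] <nil> 0
}
```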

type Request

type Request struct {
	Filename      string
	SatelliteID   storj.NodeID
	CreatedBefore time.Time
	Filter        *bloomfilter.Filter
}

Request contains all the info necessary to process a retain request.

func (*Request) GetFilename added in v1.106.1

func (req *Request) GetFilename() string

GetFilename returns the filename used to store the request in the cache directory.

type RequestStore added in v1.101.1

type RequestStore struct {
	// contains filtered or unexported fields
}

RequestStore is a cache of requests to retain pieces.

func NewRequestStore added in v1.101.1

func NewRequestStore(path string) (RequestStore, error)

NewRequestStore loads the request caches from disk.

func (*RequestStore) Add added in v1.101.1

func (store *RequestStore) Add(satelliteID storj.NodeID, pbReq *pb.RetainRequest) (bool, error)

Add adds a request to the store. It returns true if the request was added, and an error if the file could not be saved.

func (*RequestStore) Data added in v1.101.1

func (store *RequestStore) Data() map[storj.NodeID]Request

Data returns the data in the store.

func (*RequestStore) DeleteCache added in v1.101.1

func (store *RequestStore) DeleteCache(req Request) error

DeleteCache removes the request from the store and deletes the cache file.

func (*RequestStore) Len added in v1.101.1

func (store *RequestStore) Len() int

Len returns the number of requests in the store.

func (*RequestStore) Next added in v1.101.1

func (store *RequestStore) Next() (Request, bool)

Next returns the next request from the store.

func (*RequestStore) Remove added in v1.101.1

func (store *RequestStore) Remove(req Request) bool

Remove removes a request from the queue. It returns true if the request was found in the queue. It does not remove the cache file from the filesystem.

type RestoreTimeManager added in v1.118.4

type RestoreTimeManager struct {
	// contains filtered or unexported fields
}

RestoreTimeManager keeps track of the latest timestamp that a restore was called per satellite.

func NewRestoreTimeManager added in v1.118.4

func NewRestoreTimeManager(dir string) *RestoreTimeManager

NewRestoreTimeManager constructs a RestoreTimeManager using the given directory.

func (*RestoreTimeManager) GetRestoreTime added in v1.118.4

func (r *RestoreTimeManager) GetRestoreTime(ctx context.Context, satellite storj.NodeID, now time.Time) (_ time.Time)

GetRestoreTime returns the latest restore timestamp for the given satellite. If no value is stored, it returns now and attempts to store it.
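
The "return the stored value, otherwise store and return now" behaviour can be sketched with an in-memory map. The `restoreTimes` type and string satellite key are hypothetical; the real manager works against a directory, which this sketch does not model.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// restoreTimes is a hypothetical in-memory stand-in for RestoreTimeManager.
type restoreTimes struct {
	mu    sync.Mutex
	times map[string]time.Time
}

// get returns the stored timestamp for the satellite. When none exists, it
// records now as the timestamp and returns it.
func (r *restoreTimes) get(satellite string, now time.Time) time.Time {
	r.mu.Lock()
	defer r.mu.Unlock()
	if t, ok := r.times[satellite]; ok {
		return t
	}
	if r.times == nil {
		r.times = make(map[string]time.Time)
	}
	r.times[satellite] = now
	return now
}

// example calls get twice with different "now" values for the same satellite.
func example() (first, second time.Time) {
	var r restoreTimes
	start := time.Date(2024, 11, 25, 12, 0, 0, 0, time.UTC)
	first = r.get("sat-1", start)
	second = r.get("sat-1", start.Add(time.Hour))
	return first, second
}

func main() {
	first, second := example()
	// The second call sees the value stored by the first one.
	fmt.Println(first.Equal(second)) // true
}
```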

func (*RestoreTimeManager) SetRestoreTime added in v1.118.4

func (r *RestoreTimeManager) SetRestoreTime(ctx context.Context, satellite storj.NodeID, now time.Time) (err error)

SetRestoreTime sets the restore timestamp for the given satellite.

type RunOnce added in v1.117.3

type RunOnce struct {
	// contains filtered or unexported fields
}

RunOnce is a helper to run the retain cleaner only once.

func NewRunOnce added in v1.117.3

func NewRunOnce(service *Service, stop *modular.StopTrigger) *RunOnce

NewRunOnce creates a new RunOnce.

func (*RunOnce) Run added in v1.117.3

func (r *RunOnce) Run(ctx context.Context) error

Run picks the next saved bloom filter (BF) and executes retainPieces.

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service queues and processes retain requests from satellites.

architecture: Worker

func NewService

func NewService(log *zap.Logger, store *pieces.Store, config Config) *Service

NewService creates a new retain service.

func (*Service) Close added in v0.20.0

func (s *Service) Close() error

Close causes any pending Run to exit and waits for any retain requests to clean up.

func (*Service) Queue

func (s *Service) Queue(ctx context.Context, satelliteID storj.NodeID, req *pb.RetainRequest) error

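Queue adds a retain request to the queue. The method returns only an error, so the original comment's true/false result (true if the request is added to the queue, false if the queue is closed) does not match the current signature.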

func (*Service) Run

func (s *Service) Run(ctx context.Context) (err error)

Run listens for queued retain requests and processes them as they come in.

func (*Service) Status

func (s *Service) Status() Status

Status returns the retain status.

func (*Service) TestWaitUntilEmpty added in v0.20.0

func (s *Service) TestWaitUntilEmpty()

TestWaitUntilEmpty blocks until both the queue and the set of in-progress requests are empty. When Run exits, it empties the queue.

func (*Service) TestingHowManyQueued added in v1.100.2

func (s *Service) TestingHowManyQueued() int

TestingHowManyQueued peeks at the number of bloom filters queued.

type ShouldTrashFunc added in v1.118.4

type ShouldTrashFunc = func(ctx context.Context, pieceID storj.PieceID, created time.Time) bool

ShouldTrashFunc is an alias for the callback that is used to determine if a piece should be trashed.
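
To illustrate the callback shape, here is a minimal sketch. It assumes the usual retain semantics (a piece is trashed only if it was created before a cutoff and is absent from the filter), and it uses a plain map in place of a bloom filter. `pieceID`, `shouldTrashFunc`, and `newShouldTrash` are hypothetical local names, not the storj types.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// pieceID is a hypothetical stand-in for storj.PieceID.
type pieceID [32]byte

// shouldTrashFunc mirrors the shape of the documented alias.
type shouldTrashFunc = func(ctx context.Context, id pieceID, created time.Time) bool

// newShouldTrash builds a callback from an exact "keep" set and a cutoff.
// A real bloom filter may report false positives; this map does not.
func newShouldTrash(keep map[pieceID]struct{}, createdBefore time.Time) shouldTrashFunc {
	return func(_ context.Context, id pieceID, created time.Time) bool {
		if !created.Before(createdBefore) {
			return false // too recent to be judged by this filter
		}
		_, kept := keep[id]
		return !kept
	}
}

// example checks an orphaned old piece, a kept old piece, and a recent piece.
func example() (orphan, kept, recent bool) {
	cutoff := time.Date(2024, 11, 25, 0, 0, 0, 0, time.UTC)
	keptID, orphanID := pieceID{1}, pieceID{2}
	shouldTrash := newShouldTrash(map[pieceID]struct{}{keptID: {}}, cutoff)

	ctx := context.Background()
	old := cutoff.Add(-24 * time.Hour)
	orphan = shouldTrash(ctx, orphanID, old)
	kept = shouldTrash(ctx, keptID, old)
	recent = shouldTrash(ctx, orphanID, cutoff.Add(time.Hour))
	return orphan, kept, recent
}

func main() {
	orphan, kept, recent := example()
	fmt.Println(orphan, kept, recent) // true false false
}
```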

type Status

type Status uint32

Status is a type defining the enabled/disabled status of retain requests.

const (
	// Disabled means we do not do anything with retain requests.
	Disabled Status = iota + 1
	// Enabled means we fully enable retain requests and delete data not defined by bloom filter.
	Enabled
	// Debug means we partially enable retain requests, and print out pieces we should delete, without actually deleting them.
	Debug
	// Store means the retain messages will be saved, but not processed.
	Store
)

func (*Status) Set

func (v *Status) Set(s string) error

Set implements pflag.Value.

func (*Status) String

func (v *Status) String() string

String implements pflag.Value.

func (*Status) Type

func (*Status) Type() string

Type implements pflag.Value.
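
The three methods above match the shape of the pflag.Value interface (String, Set, Type). A self-contained sketch of how such a type might be implemented follows. The string names in `statusNames` and the `Type` return value are assumptions for illustration; the documentation above does not show the real flag strings.

```go
package main

import (
	"fmt"
	"strings"
)

// Status mirrors the documented type. Values start at iota + 1, so the zero
// value is not a valid status.
type Status uint32

const (
	Disabled Status = iota + 1 // 1
	Enabled                    // 2
	Debug                      // 3
	Store                      // 4
)

// statusNames is an assumed mapping between statuses and flag strings.
var statusNames = map[Status]string{
	Disabled: "disabled",
	Enabled:  "enabled",
	Debug:    "debug",
	Store:    "store",
}

// Set parses s case-insensitively and stores the matching status.
func (v *Status) Set(s string) error {
	for status, name := range statusNames {
		if strings.EqualFold(s, name) {
			*v = status
			return nil
		}
	}
	return fmt.Errorf("invalid status %q", s)
}

// String returns the flag string for the status, or "invalid".
func (v *Status) String() string {
	if name, ok := statusNames[*v]; ok {
		return name
	}
	return "invalid"
}

// Type returns a label for help output; the value here is an assumption.
func (*Status) Type() string { return "status" }

func main() {
	var s Status
	if err := s.Set("debug"); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(s.String(), uint32(s)) // debug 3
}
```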
