reclustering

package
v0.0.0-...-72ee14c Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 2, 2025 License: Apache-2.0 Imports: 25 Imported by: 0

Documentation

Index

Constants

View Source
const (

	// TargetTaskDuration is the desired duration of a re-clustering task.
	// If a task completes before the reclustering run has completed, a
	// continuation task will be scheduled.
	//
	// Longer durations will incur lower task queuing/re-queueing overhead,
	// but limit the ability of autoscaling to move tasks between instances
	// in response to load.
	TargetTaskDuration = 2 * time.Second

	// ProgressInterval is the amount of time between progress updates.
	//
	// Note that this is the frequency at which updates should
	// be reported for a shard of work; individual tasks are usually
	// much shorter lived and consequently most will not report any progress
	// (unless it is time for the shard to report progress again).
	ProgressInterval = 5 * time.Second
)

Variables

View Source
var UpdateRaceErr = errors.New("concurrent modification to cluster")

UpdateRaceErr is the error returned by UpdateClustering if a concurrent modification (or deletion) of a chunk is detected.

Functions

func Ruleset

func Ruleset(ctx context.Context, project string, minimumPredicatesVersion time.Time) (*cache.Ruleset, error)

Ruleset returns the cached ruleset for the given project. If a minimum version of rule predicates is required, pass it as minimumPredicatesVersion. If a strong read is required, pass cache.StrongRead. Otherwise, pass rules.StartingEpoch.

Types

type Analysis

type Analysis interface {
	// HandleUpdatedClusters handles (re-)clustered test results. It is called
	// after the spanner transaction effecting the (re-)clustering has
	// committed. commitTime is the Spanner time the transaction committed.
	HandleUpdatedClusters(ctx context.Context, updates *clustering.Update, commitTime time.Time) error
}

Analysis is the interface for cluster analysis.

type ChunkStore

type ChunkStore interface {
	// Get retrieves the chunk with the specified object ID and returns it.
	Get(ctx context.Context, project, objectID string) (*cpb.Chunk, error)
}

ChunkStore is the interface for the blob store archiving chunks of test results for later re-clustering.

type PendingUpdate

type PendingUpdate struct {
	// Chunk is the identity of the chunk which will be updated.
	Chunk state.ChunkKey
	// contains filtered or unexported fields
}

PendingUpdate is a (re-)clustering of a chunk of test results that has not been applied to Spanner and/or sent for re-analysis yet.

func PrepareUpdate

func PrepareUpdate(ctx context.Context, ruleset *cache.Ruleset, config *compiledcfg.ProjectConfig, chunk *cpb.Chunk, existingState *state.Entry) (upd *PendingUpdate, err error)

PrepareUpdate will (re-)cluster the specific chunk of test results, preparing an updated state for Spanner and updates to be exported to analysis. The caller can determine how to batch these updates/ exports together, with help of the Size() method on the returned pending update.

If the chunk does not exist in Spanner, pass a *state.Entry with project, chunkID, objectID and partitionTime set but with LastUpdated set to its zero value. The chunk will be clustered for the first time and saved to Spanner.

If the chunk does exist in Spanner, pass the state.Entry read from Spanner, along with the test results. The chunk will be re-clustered and updated.

func (*PendingUpdate) ApplyToAnalysis

func (p *PendingUpdate) ApplyToAnalysis(ctx context.Context, analysis Analysis, commitTime time.Time) error

ApplyToAnalysis exports changed failures for re-analysis. The Spanner commit time must be provided so that analysis has the correct update chronology.

func (*PendingUpdate) ApplyToSpanner

func (p *PendingUpdate) ApplyToSpanner(ctx context.Context) error

Attempts to apply the update to Spanner.

Important: Before calling this method, the caller should verify the chunks in Spanner still have the same LastUpdatedTime as passed to PrepareUpdate, in the same transaction as attempting this update. This will prevent clobbering a concurrently applied update or create.

In case of an update race, PrepareUpdate should be retried with a more recent version of the chunk.

func (*PendingUpdate) EstimatedTransactionSize

func (p *PendingUpdate) EstimatedTransactionSize() int

EstimatedTransactionSize returns the estimated size of the Spanner transaction, in bytes.

func (*PendingUpdate) FailuresUpdated

func (p *PendingUpdate) FailuresUpdated() int

FailuresUpdated returns the number of failures that will exported for re-analysis as a result of the update.

type PendingUpdates

type PendingUpdates struct {
	// contains filtered or unexported fields
}

PendingUpdates represents a pending set of chunk updates. It facilitates batching updates together for efficiency.

func NewPendingUpdates

func NewPendingUpdates(ctx context.Context) *PendingUpdates

NewPendingUpdates initialises a new PendingUpdates.

func (*PendingUpdates) Add

func (p *PendingUpdates) Add(update *PendingUpdate)

Add adds the specified update to the set of pending updates.

func (*PendingUpdates) Apply

func (p *PendingUpdates) Apply(ctx context.Context, analysis Analysis) (err error)

Apply applies the chunk updates to Spanner and exports them for re-analysis. If some applications failed because of a concurrent modification, the method returns UpdateRaceErr. In this case, the caller should construct the updates again from a fresh read of the Clustering State and retry. Note that some of the updates may have successfully applied.

func (*PendingUpdates) ShouldApply

func (p *PendingUpdates) ShouldApply(ctx context.Context) bool

ShouldApply returns whether the updates should be applied now because they have reached a maximum size or time limit.

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker provides methods to process re-clustering tasks. It is safe to be used by multiple threads concurrently.

func NewWorker

func NewWorker(chunkStore ChunkStore, analysis Analysis) *Worker

NewWorker initialises a new Worker.

func (*Worker) Do

Do works on a re-clustering task for approximately duration, returning a continuation task (if the run end time has not been reached).

Continuation tasks are used to better integrate with GAE autoscaling, autoscaling work best when tasks are relatively small (so that work can be moved between instances in real time).

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL