Documentation ¶
Index ¶
Constants ¶
const (
	// TargetTaskDuration is the desired duration of a re-clustering task.
	// If a task completes before the reclustering run has completed, a
	// continuation task will be scheduled.
	//
	// Longer durations will incur lower task queuing/re-queueing overhead,
	// but limit the ability of autoscaling to move tasks between instances
	// in response to load.
	TargetTaskDuration = 2 * time.Second

	// ProgressInterval is the amount of time between progress updates.
	//
	// Note that this is the frequency at which updates should
	// be reported for a shard of work; individual tasks are usually
	// much shorter lived and consequently most will not report any progress
	// (unless it is time for the shard to report progress again).
	ProgressInterval = 5 * time.Second
)
Variables ¶
var UpdateRaceErr = errors.New("concurrent modification to cluster")
UpdateRaceErr is the error returned by UpdateClustering if a concurrent modification (or deletion) of a chunk is detected.
Functions ¶
func Ruleset ¶
func Ruleset(ctx context.Context, project string, minimumPredicatesVersion time.Time) (*cache.Ruleset, error)
Ruleset returns the cached ruleset for the given project. If a minimum version of rule predicates is required, pass it as minimumPredicatesVersion. If a strong read is required, pass cache.StrongRead. Otherwise, pass rules.StartingEpoch.
Types ¶
type Analysis ¶
type Analysis interface {
	// HandleUpdatedClusters handles (re-)clustered test results. It is called
	// after the Spanner transaction effecting the (re-)clustering has
	// committed. commitTime is the Spanner time the transaction committed.
	HandleUpdatedClusters(ctx context.Context, updates *clustering.Update, commitTime time.Time) error
}
Analysis is the interface for cluster analysis.
type ChunkStore ¶
type ChunkStore interface {
	// Get retrieves the chunk with the specified object ID and returns it.
	Get(ctx context.Context, project, objectID string) (*cpb.Chunk, error)
}
ChunkStore is the interface for the blob store archiving chunks of test results for later re-clustering.
type PendingUpdate ¶
type PendingUpdate struct {
	// Chunk is the identity of the chunk which will be updated.
	Chunk state.ChunkKey
	// contains filtered or unexported fields
}
PendingUpdate is a (re-)clustering of a chunk of test results that has not been applied to Spanner and/or sent for re-analysis yet.
func PrepareUpdate ¶
func PrepareUpdate(ctx context.Context, ruleset *cache.Ruleset, config *compiledcfg.ProjectConfig, chunk *cpb.Chunk, existingState *state.Entry) (upd *PendingUpdate, err error)
PrepareUpdate will (re-)cluster the specified chunk of test results, preparing an updated state for Spanner and updates to be exported to analysis. The caller can determine how to batch these updates/exports together, with the help of the EstimatedTransactionSize() and FailuresUpdated() methods on the returned pending update.
If the chunk does not exist in Spanner, pass a *state.Entry with project, chunkID, objectID and partitionTime set but with LastUpdated set to its zero value. The chunk will be clustered for the first time and saved to Spanner.
If the chunk does exist in Spanner, pass the state.Entry read from Spanner, along with the test results. The chunk will be re-clustered and updated.
func (*PendingUpdate) ApplyToAnalysis ¶
func (p *PendingUpdate) ApplyToAnalysis(ctx context.Context, analysis Analysis, commitTime time.Time) error
ApplyToAnalysis exports changed failures for re-analysis. The Spanner commit time must be provided so that analysis has the correct update chronology.
func (*PendingUpdate) ApplyToSpanner ¶
func (p *PendingUpdate) ApplyToSpanner(ctx context.Context) error
ApplyToSpanner attempts to apply the update to Spanner.
Important: Before calling this method, the caller should verify the chunks in Spanner still have the same LastUpdatedTime as passed to PrepareUpdate, in the same transaction as attempting this update. This will prevent clobbering a concurrently applied update or create.
In case of an update race, PrepareUpdate should be retried with a more recent version of the chunk.
func (*PendingUpdate) EstimatedTransactionSize ¶
func (p *PendingUpdate) EstimatedTransactionSize() int
EstimatedTransactionSize returns the estimated size of the Spanner transaction, in bytes.
func (*PendingUpdate) FailuresUpdated ¶
func (p *PendingUpdate) FailuresUpdated() int
FailuresUpdated returns the number of failures that will be exported for re-analysis as a result of the update.
type PendingUpdates ¶
type PendingUpdates struct {
// contains filtered or unexported fields
}
PendingUpdates represents a pending set of chunk updates. It facilitates batching updates together for efficiency.
func NewPendingUpdates ¶
func NewPendingUpdates(ctx context.Context) *PendingUpdates
NewPendingUpdates initialises a new PendingUpdates.
func (*PendingUpdates) Add ¶
func (p *PendingUpdates) Add(update *PendingUpdate)
Add adds the specified update to the set of pending updates.
func (*PendingUpdates) Apply ¶
func (p *PendingUpdates) Apply(ctx context.Context, analysis Analysis) (err error)
Apply applies the chunk updates to Spanner and exports them for re-analysis. If some applications failed because of a concurrent modification, the method returns UpdateRaceErr. In this case, the caller should construct the updates again from a fresh read of the Clustering State and retry. Note that some of the updates may have successfully applied.
func (*PendingUpdates) ShouldApply ¶
func (p *PendingUpdates) ShouldApply(ctx context.Context) bool
ShouldApply returns whether the updates should be applied now because they have reached a maximum size or time limit.
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
Worker provides methods to process re-clustering tasks. It is safe for concurrent use by multiple goroutines.
func NewWorker ¶
func NewWorker(chunkStore ChunkStore, analysis Analysis) *Worker
NewWorker initialises a new Worker.
func (*Worker) Do ¶
func (w *Worker) Do(ctx context.Context, task *taskspb.ReclusterChunks, duration time.Duration) (*taskspb.ReclusterChunks, error)
Do works on a re-clustering task for approximately duration, returning a continuation task (if the run end time has not been reached).
Continuation tasks are used to better integrate with GAE autoscaling; autoscaling works best when tasks are relatively small (so that work can be moved between instances in real time).