Documentation ¶
Index ¶
- Constants
- func NewFixerContext(ctx context.Context, workflowName string, fixerContext FixerContext) context.Context
- func NewScannerContext(ctx context.Context, workflowName string, scannerContext ScannerContext) context.Context
- type AggregateFixReportResult
- type AggregateScanReportResult
- type ControlFlowFailure
- type CorruptedKeysEntry
- type CustomScannerConfig
- type DomainFixReportQueryResult
- type DomainFixStats
- type DomainReportQueryRequest
- type DomainScanReportQueryResult
- type DomainScanStats
- type DynamicParams
- type FixKeys
- type FixReport
- type FixReportError
- type FixResult
- type FixShardActivityParams
- type FixShardConfigParams
- type FixShardConfigResults
- type FixShardHeartbeatDetails
- type FixStats
- type Fixer
- type FixerContext
- type FixerCorruptedKeysActivityParams
- type FixerCorruptedKeysActivityResult
- type FixerHooks
- type FixerIteratorCB
- type FixerManagerCB
- type FixerWorkflow
- type FixerWorkflowConfigOverwrites
- type FixerWorkflowParams
- type GenericScannerConfig
- type GenericScannerConfigOverwrites
- type IteratorCB
- type ManagerCB
- type PaginatedShardQueryRequest
- type ResolvedFixerWorkflowConfig
- type ResolvedScannerWorkflowConfig
- type ScanKeys
- type ScanReport
- type ScanReportError
- type ScanResult
- type ScanShardActivityParams
- type ScanShardHeartbeatDetails
- type ScanStats
- type Scanner
- type ScannerConfig
- type ScannerConfigActivityParams
- type ScannerContext
- type ScannerEmitMetricsActivityParams
- type ScannerHooks
- type ScannerWorkflow
- type ScannerWorkflowConfigOverwrites
- type ScannerWorkflowParams
- type ShardCorruptKeysQueryResult
- type ShardCorruptKeysResult
- type ShardDistributionStats
- type ShardFixResultAggregator
- func (a *ShardFixResultAggregator) AddReport(report FixReport)
- func (a *ShardFixResultAggregator) GetAggregation() AggregateFixReportResult
- func (a *ShardFixResultAggregator) GetAllFixResults() (map[int]FixResult, error)
- func (a *ShardFixResultAggregator) GetDomainStatus(req DomainReportQueryRequest) (*DomainFixReportQueryResult, error)
- func (a *ShardFixResultAggregator) GetReport(shardID int) (*FixReport, error)
- func (a *ShardFixResultAggregator) GetStatusResult(req PaginatedShardQueryRequest) (*ShardStatusQueryResult, error)
- func (a *ShardFixResultAggregator) GetStatusSummary() ShardStatusSummaryResult
- type ShardFixer
- type ShardQueryPaginationToken
- type ShardRange
- type ShardScanResultAggregator
- func (a *ShardScanResultAggregator) AddReport(report ScanReport)
- func (a *ShardScanResultAggregator) GetAggregateReport() AggregateScanReportResult
- func (a *ShardScanResultAggregator) GetAllScanResults() (map[int]ScanResult, error)
- func (a *ShardScanResultAggregator) GetCorruptionKeys(req PaginatedShardQueryRequest) (*ShardCorruptKeysQueryResult, error)
- func (a *ShardScanResultAggregator) GetDomainStatus(req DomainReportQueryRequest) (*DomainScanReportQueryResult, error)
- func (a *ShardScanResultAggregator) GetReport(shardID int) (*ScanReport, error)
- func (a *ShardScanResultAggregator) GetShardDistributionStats() ShardDistributionStats
- func (a *ShardScanResultAggregator) GetShardSizeQueryResult(req ShardSizeQueryRequest) (ShardSizeQueryResult, error)
- func (a *ShardScanResultAggregator) GetStatusResult(req PaginatedShardQueryRequest) (*ShardStatusQueryResult, error)
- func (a *ShardScanResultAggregator) GetStatusSummary() ShardStatusSummaryResult
- type ShardScanner
- type ShardSizeQueryRequest
- type ShardSizeQueryResult
- type ShardSizeTuple
- type ShardStatus
- type ShardStatusQueryResult
- type ShardStatusResult
- type ShardStatusSummaryResult
- type Shards
Constants ¶
const ( // ActivityScannerEmitMetrics is the activity name for scannerEmitMetricsActivity ActivityScannerEmitMetrics = "cadence-sys-shardscanner-emit-metrics-activity" // ActivityScannerConfig is the activity name for scannerConfigActivity ActivityScannerConfig = "cadence-sys-shardscanner-config-activity" // ActivityFixerConfig is the activity name for fixerConfigActivity ActivityFixerConfig = "cadence-sys-shardscanner-fixer-config-activity" // ActivityScanShard is the activity name for scanShardActivity ActivityScanShard = "cadence-sys-shardscanner-scanshard-activity" // ActivityFixerCorruptedKeys is the activity name for fixerCorruptedKeysActivity ActivityFixerCorruptedKeys = "cadence-sys-shardscanner-corruptedkeys-activity" // ActivityFixShard is the activity name for fixShardActivity ActivityFixShard = "cadence-sys-shardscanner-fixshard-activity" // ShardCorruptKeysQuery is the query name for the query used to get all completed shards with at least one corruption ShardCorruptKeysQuery = "shard_corrupt_keys" )
const ( // ShardReportQuery is the query name for the query used to get a single shard's report ShardReportQuery = "shard_report" // ShardStatusQuery is the query name for the query used to get the status of all shards ShardStatusQuery = "shard_status" // ShardStatusSummaryQuery is the query name for the query used to get the shard status -> counts map ShardStatusSummaryQuery = "shard_status_summary" // AggregateReportQuery is the query name for the query used to get the aggregate result of all finished shards AggregateReportQuery = "aggregate_report" // ShardSizeQuery is the query name for the query used to get the number of executions per shard in sorted order ShardSizeQuery = "shard_size" // DomainReportQuery is the query name for the query used to get the reports per domains for all finished shards DomainReportQuery = "domain_report" // AllResultsQuery returns filenames / paginating data for corruptions and failures in this workflow, // for shards which have finished processing. This works for both scanner and fixer, and the return structures // are very similar. // // This data is also available for a single shard under ShardReportQuery, but using that requires // re-querying repeatedly if more than that single shard's data is desired, e.g. for manual // troubleshooting purposes. AllResultsQuery = "all_results" )
const ( // ErrScanWorkflowNotClosed indicates fix was attempted on scan workflow which was not finished ErrScanWorkflowNotClosed = "scan workflow is not closed, only can run fix on output of finished scan workflow" // ErrSerialization indicates a serialization or deserialization error occurred ErrSerialization = "encountered serialization error" // ErrMissingHooks indicates scanner is not providing hooks to Invariant manager or Iterator ErrMissingHooks = "hooks are not provided for this scanner" )
Variables ¶
This section is empty.
Functions ¶
func NewFixerContext ¶ added in v0.18.1
func NewFixerContext( ctx context.Context, workflowName string, fixerContext FixerContext, ) context.Context
NewFixerContext provides context to be used as background activity context
func NewScannerContext ¶ added in v0.18.1
func NewScannerContext( ctx context.Context, workflowName string, scannerContext ScannerContext, ) context.Context
NewScannerContext provides context to be used as background activity context
Types ¶
type AggregateFixReportResult ¶
type AggregateFixReportResult FixStats
AggregateFixReportResult indicates the result of summing together all shard reports that have finished for fix.
type AggregateScanReportResult ¶
type AggregateScanReportResult ScanStats
AggregateScanReportResult indicates the result of summing together all shard reports which have finished scan.
type ControlFlowFailure ¶
ControlFlowFailure indicates an error occurred which makes it impossible to even attempt to check or fix one or more execution(s). Note that it is not a ControlFlowFailure if a check or fix fails, it is only a ControlFlowFailure if an error is encountered which makes even attempting to check or fix impossible.
type CorruptedKeysEntry ¶
CorruptedKeysEntry is a pair of shardID and corrupted keys
type CustomScannerConfig ¶
CustomScannerConfig is used to pass key/value parameters between shardscanner activity and scanner/fixer implementations. this is used to have activities with better consistency, as the workflow records one config and uses it for all shards, even after config / code changes.
It is currently only used to pass invariant names to control whether they are enabled or not. Please do not use this for other purposes.
type DomainFixReportQueryResult ¶ added in v0.21.0
type DomainFixReportQueryResult struct {
Reports []DomainFixStats
}
DomainFixReportQueryResult is the query result for DomainReportQuery in the fixer workflow
type DomainFixStats ¶ added in v0.21.0
DomainFixStats is the report of fix stats for one domain
type DomainReportQueryRequest ¶ added in v0.21.0
type DomainReportQueryRequest struct { // DomainID specifies a single domain for which the stats are requested // Setting to nil indicates stats for all domains are requested DomainID *string }
DomainReportQueryRequest is the request used for queries which return stats broken by domains
type DomainScanReportQueryResult ¶ added in v0.21.0
type DomainScanReportQueryResult struct {
Reports []DomainScanStats
}
DomainScanReportQueryResult is the query result for DomainReportQuery in the scanner workflow
type DomainScanStats ¶ added in v0.21.0
DomainScanStats is the report of scan stats for one domain
type DynamicParams ¶
type DynamicParams struct { ScannerEnabled dynamicconfig.BoolPropertyFn FixerEnabled dynamicconfig.BoolPropertyFn Concurrency dynamicconfig.IntPropertyFn PageSize dynamicconfig.IntPropertyFn BlobstoreFlushThreshold dynamicconfig.IntPropertyFn ActivityBatchSize dynamicconfig.IntPropertyFn AllowDomain dynamicconfig.BoolPropertyFnWithDomainFilter }
DynamicParams is the dynamic config for scanner workflow.
type FixKeys ¶
FixKeys are the keys to the blobs that were uploaded during fix. Keys can be nil if there were no uploads.
type FixReport ¶
type FixReport struct { ShardID int Stats FixStats Result FixResult DomainStats map[string]*FixStats }
FixReport is the report of running Fix on a single shard
type FixReportError ¶
FixReportError is a type that is used to send either error or report on a channel. Exactly one of Report and ErrorStr should be non-nil.
type FixResult ¶
type FixResult struct { ShardFixKeys *FixKeys ControlFlowFailure *ControlFlowFailure }
FixResult indicates the result of running fix on a shard. Exactly one of ControlFlowFailure or ShardFixKeys will be non-nil.
type FixShardActivityParams ¶
type FixShardActivityParams struct { CorruptedKeysEntries []CorruptedKeysEntry ResolvedFixerWorkflowConfig ResolvedFixerWorkflowConfig // EnabledInvariants contains all known invariants for fixer, with "true" or "false" values. // In current code it should never be empty. // // If empty, EnabledInvariants came from old serialized data prior to this field existing, // and the historical list of invariants should be used. This should be a one-time event // after upgrading. EnabledInvariants CustomScannerConfig }
FixShardActivityParams is the parameter for fixShardActivity
type FixShardConfigParams ¶ added in v1.2.5
type FixShardConfigParams struct { }
type FixShardConfigResults ¶ added in v1.2.5
type FixShardConfigResults struct {
EnabledInvariants CustomScannerConfig
}
type FixShardHeartbeatDetails ¶
FixShardHeartbeatDetails is the heartbeat details for the fix shard
type Fixer ¶
type Fixer interface {
Fix() FixReport
}
Fixer is used to fix entities in a shard. It is responsible for four things: 1. Confirming that each entity it scans is corrupted. 2. Attempting to fix any confirmed corrupted executions. 3. Recording skipped entities, failed to fix entities and successfully fixed entities to durable store. 4. Producing a FixReport
type FixerContext ¶
type FixerContext struct { Resource resource.Resource Hooks *FixerHooks Scope metrics.Scope Config *ScannerConfig Logger log.Logger }
FixerContext is the resource that is available to activities under ShardFixer key
func GetFixerContext ¶ added in v0.18.1
func GetFixerContext( ctx context.Context, ) (FixerContext, error)
GetFixerContext extracts fixer context from activity context. It uses a typed, private key to reduce access scope.
func NewShardFixerContext ¶ added in v0.18.1
func NewShardFixerContext( res resource.Resource, config *ScannerConfig, ) FixerContext
NewShardFixerContext sets fixer context up
type FixerCorruptedKeysActivityParams ¶
type FixerCorruptedKeysActivityParams struct { ScannerWorkflowWorkflowID string ScannerWorkflowRunID string StartingShardID *int }
FixerCorruptedKeysActivityParams is the parameter for fixerCorruptedKeysActivity
type FixerCorruptedKeysActivityResult ¶
type FixerCorruptedKeysActivityResult struct { CorruptedKeys []CorruptedKeysEntry MinShard *int MaxShard *int ShardQueryPaginationToken ShardQueryPaginationToken }
FixerCorruptedKeysActivityResult is the result of fixerCorruptedKeysActivity
func GetCorruptedKeys ¶
func GetCorruptedKeys( ctx workflow.Context, params FixerWorkflowParams, ) (*FixerCorruptedKeysActivityResult, error)
GetCorruptedKeys is a workflow which is used to retrieve keys for fixer workflow.
type FixerHooks ¶
type FixerHooks struct { InvariantManager FixerManagerCB Iterator FixerIteratorCB GetFixerConfig func(fixer FixerContext) CustomScannerConfig }
FixerHooks holds callback functions for shard scanner workflow implementation.
func NewFixerHooks ¶
func NewFixerHooks( manager FixerManagerCB, iterator FixerIteratorCB, config func(fixer FixerContext) CustomScannerConfig, ) (*FixerHooks, error)
NewFixerHooks returns initialized callbacks for shard scanner workflow implementation.
type FixerIteratorCB ¶
type FixerIteratorCB func( context.Context, blobstore.Client, store.Keys, FixShardActivityParams, ) store.ScanOutputIterator
FixerIteratorCB is a function which returns ScanOutputIterator for fixer.
type FixerManagerCB ¶
type FixerManagerCB func( context.Context, persistence.Retryer, FixShardActivityParams, cache.DomainCache, ) invariant.Manager
FixerManagerCB is a function which returns invariant manager for fixer.
type FixerWorkflow ¶
type FixerWorkflow struct { Aggregator *ShardFixResultAggregator Params FixerWorkflowParams Keys *FixerCorruptedKeysActivityResult }
FixerWorkflow is the workflow that fixes all entities from a scan output.
func NewFixerWorkflow ¶
func NewFixerWorkflow( ctx workflow.Context, name string, params FixerWorkflowParams, ) (*FixerWorkflow, error)
NewFixerWorkflow returns a new instance of fixer workflow
type FixerWorkflowConfigOverwrites ¶
type FixerWorkflowConfigOverwrites struct { Concurrency *int BlobstoreFlushThreshold *int ActivityBatchSize *int }
FixerWorkflowConfigOverwrites enables overwriting the default values. If provided workflow will favor overwrites over defaults. Any overwrites that are left as nil will fall back to defaults.
type FixerWorkflowParams ¶
type FixerWorkflowParams struct { ScannerWorkflowWorkflowID string ScannerWorkflowRunID string FixerWorkflowConfigOverwrites FixerWorkflowConfigOverwrites }
FixerWorkflowParams are the parameters to the fix workflow
type GenericScannerConfig ¶
type GenericScannerConfig struct { Enabled bool Concurrency int PageSize int BlobstoreFlushThreshold int ActivityBatchSize int }
GenericScannerConfig is a generic params for all shard scanners
type GenericScannerConfigOverwrites ¶
type GenericScannerConfigOverwrites struct { Enabled *bool Concurrency *int PageSize *int BlobstoreFlushThreshold *int ActivityBatchSize *int }
GenericScannerConfigOverwrites allows to override generic params
type IteratorCB ¶
type IteratorCB func( context.Context, persistence.Retryer, ScanShardActivityParams, ) pagination.Iterator
IteratorCB is a function which returns iterator for scanner.
type ManagerCB ¶
type ManagerCB func( context.Context, persistence.Retryer, ScanShardActivityParams, cache.DomainCache, ) invariant.Manager
ManagerCB is a function which returns invariant manager for scanner.
type PaginatedShardQueryRequest ¶
type PaginatedShardQueryRequest struct { // StartingShardID is the first shard to start iteration from. // Setting to nil will start iteration from the beginning of the shards. StartingShardID *int // LimitShards indicates the maximum number of results that can be returned. // If nil or larger than allowed maximum, will default to maximum allowed. LimitShards *int }
PaginatedShardQueryRequest is the request used for queries which return results over all shards
type ResolvedFixerWorkflowConfig ¶
type ResolvedFixerWorkflowConfig struct { Concurrency int BlobstoreFlushThreshold int ActivityBatchSize int }
ResolvedFixerWorkflowConfig is the resolved config after reading defaults and applying overwrites.
type ResolvedScannerWorkflowConfig ¶
type ResolvedScannerWorkflowConfig struct { GenericScannerConfig GenericScannerConfig CustomScannerConfig CustomScannerConfig }
ResolvedScannerWorkflowConfig is the resolved config after reading dynamic config and applying overwrites.
type ScanKeys ¶
ScanKeys are the keys to the blobs that were uploaded during scan. Keys can be nil if there were no uploads.
type ScanReport ¶
type ScanReport struct { ShardID int Stats ScanStats Result ScanResult DomainStats map[string]*ScanStats }
ScanReport is the report of running Scan on a single shard.
type ScanReportError ¶
type ScanReportError struct { Reports []ScanReport ErrorStr *string }
ScanReportError is a type that is used to send either error or reports on a channel. Exactly one of Reports and ErrorStr should be non-nil.
type ScanResult ¶
type ScanResult struct { ShardScanKeys *ScanKeys ControlFlowFailure *ControlFlowFailure }
ScanResult indicates the result of running scan on a shard. Exactly one of ControlFlowFailure or ShardScanKeys will be non-nil.
func (*ScanResult) Empty ¶ added in v1.2.7
func (s *ScanResult) Empty() bool
Empty returns true if this ScanResult has no "real" data, e.g. only nils or empty values.
type ScanShardActivityParams ¶
type ScanShardActivityParams struct { Shards []int PageSize int BlobstoreFlushThreshold int ScannerConfig CustomScannerConfig }
ScanShardActivityParams is the parameter for scanShardActivity
type ScanShardHeartbeatDetails ¶
type ScanShardHeartbeatDetails struct { LastShardIndexHandled int Reports []ScanReport }
ScanShardHeartbeatDetails is the heartbeat details for scan shard
type ScanStats ¶
type ScanStats struct { EntitiesCount int64 CorruptedCount int64 CheckFailedCount int64 CorruptionByType map[invariant.Name]int64 }
ScanStats indicates the stats of entities which were handled by shard Scan.
type Scanner ¶
type Scanner interface {
Scan() ScanReport
}
Scanner is used to scan over given iterator. It is responsible for three things: 1. Checking invariants for each entity. 2. Recording corruption and failures to durable store. 3. Producing a ScanReport
type ScannerConfig ¶
type ScannerConfig struct { ScannerWFTypeName string FixerWFTypeName string ScannerHooks func() *ScannerHooks FixerHooks func() *FixerHooks DynamicParams DynamicParams DynamicCollection *dynamicconfig.Collection StartWorkflowOptions client.StartWorkflowOptions StartFixerOptions client.StartWorkflowOptions }
ScannerConfig is the config for ShardScanner workflow
type ScannerConfigActivityParams ¶
type ScannerConfigActivityParams struct {
Overwrites ScannerWorkflowConfigOverwrites
}
ScannerConfigActivityParams is the parameter for scannerConfigActivity
type ScannerContext ¶ added in v1.2.5
type ScannerContext struct { Resource resource.Resource Hooks *ScannerHooks Scope metrics.Scope Config *ScannerConfig Logger log.Logger }
ScannerContext is the resource that is available in activities under ShardScanner context key
func GetScannerContext ¶ added in v0.18.1
func GetScannerContext( ctx context.Context, ) (ScannerContext, error)
GetScannerContext extracts scanner context from activity context
func NewShardScannerContext ¶ added in v0.18.1
func NewShardScannerContext( res resource.Resource, config *ScannerConfig, ) ScannerContext
NewShardScannerContext sets scanner context up
type ScannerEmitMetricsActivityParams ¶
type ScannerEmitMetricsActivityParams struct { ShardSuccessCount int ShardControlFlowFailureCount int AggregateReportResult AggregateScanReportResult ShardDistributionStats ShardDistributionStats }
ScannerEmitMetricsActivityParams is the parameter for scannerEmitMetricsActivity
type ScannerHooks ¶
type ScannerHooks struct { Manager ManagerCB Iterator IteratorCB GetScannerConfig func(scanner ScannerContext) CustomScannerConfig }
ScannerHooks allows providing a manager and iterator for different types of scanners.
func NewScannerHooks ¶
func NewScannerHooks(manager ManagerCB, iterator IteratorCB, config func(scanner ScannerContext) CustomScannerConfig) (*ScannerHooks, error)
NewScannerHooks is used to have per scanner iterator and invariant manager
type ScannerWorkflow ¶
type ScannerWorkflow struct { Aggregator *ShardScanResultAggregator Params ScannerWorkflowParams Name string Shards []int }
ScannerWorkflow is a workflow which scans and checks entities in a shard.
func NewScannerWorkflow ¶
func NewScannerWorkflow( ctx workflow.Context, name string, params ScannerWorkflowParams, ) (*ScannerWorkflow, error)
NewScannerWorkflow creates instance of shard scanner
type ScannerWorkflowConfigOverwrites ¶
type ScannerWorkflowConfigOverwrites struct { GenericScannerConfig GenericScannerConfigOverwrites CustomScannerConfig *CustomScannerConfig }
ScannerWorkflowConfigOverwrites enables overwriting the values in dynamic config. If provided workflow will favor overwrites over dynamic config. Any overwrites that are left as nil will fall back to using dynamic config.
type ScannerWorkflowParams ¶
type ScannerWorkflowParams struct { Shards Shards ScannerWorkflowConfigOverwrites ScannerWorkflowConfigOverwrites }
ScannerWorkflowParams are the parameters to the scan workflow
type ShardCorruptKeysQueryResult ¶
type ShardCorruptKeysQueryResult struct { Result ShardCorruptKeysResult ShardQueryPaginationToken ShardQueryPaginationToken }
ShardCorruptKeysQueryResult is the query result for ShardCorruptKeysQuery
type ShardCorruptKeysResult ¶
ShardCorruptKeysResult is a map of all shards which have finished scan successfully and have at least one corruption
type ShardDistributionStats ¶
type ShardDistributionStats struct { Max int64 Median int64 Min int64 P90 int64 P75 int64 P25 int64 P10 int64 }
ShardDistributionStats contains stats on the distribution of executions in shards. It is used by the ScannerEmitMetricsActivityParams.
type ShardFixResultAggregator ¶
type ShardFixResultAggregator struct {
// contains filtered or unexported fields
}
ShardFixResultAggregator is used to keep aggregated fix metrics
func NewShardFixResultAggregator ¶
func NewShardFixResultAggregator( corruptKeys []CorruptedKeysEntry, minShard int, maxShard int, ) *ShardFixResultAggregator
NewShardFixResultAggregator returns an instance of ShardFixResultAggregator
func (*ShardFixResultAggregator) AddReport ¶
func (a *ShardFixResultAggregator) AddReport(report FixReport)
AddReport adds fix report for a shard.
func (*ShardFixResultAggregator) GetAggregation ¶
func (a *ShardFixResultAggregator) GetAggregation() AggregateFixReportResult
GetAggregation returns fix aggregation.
func (*ShardFixResultAggregator) GetAllFixResults ¶ added in v1.2.7
func (a *ShardFixResultAggregator) GetAllFixResults() (map[int]FixResult, error)
func (*ShardFixResultAggregator) GetDomainStatus ¶ added in v0.21.0
func (a *ShardFixResultAggregator) GetDomainStatus(req DomainReportQueryRequest) (*DomainFixReportQueryResult, error)
GetDomainStatus returns stats broken down by domain IDs
func (*ShardFixResultAggregator) GetReport ¶
func (a *ShardFixResultAggregator) GetReport(shardID int) (*FixReport, error)
GetReport returns fix report for a shard.
func (*ShardFixResultAggregator) GetStatusResult ¶
func (a *ShardFixResultAggregator) GetStatusResult(req PaginatedShardQueryRequest) (*ShardStatusQueryResult, error)
GetStatusResult returns paginated results for a range of shards
func (*ShardFixResultAggregator) GetStatusSummary ¶
func (a *ShardFixResultAggregator) GetStatusSummary() ShardStatusSummaryResult
GetStatusSummary returns fix status summary.
type ShardFixer ¶
type ShardFixer struct {
// contains filtered or unexported fields
}
ShardFixer is a generic fixer which iterates over entities provided by an iterator. Implementations of this fixer have to provide an invariant manager and iterator.
func NewFixer ¶
func NewFixer( ctx context.Context, shardID int, manager invariant.Manager, iterator store.ScanOutputIterator, blobstoreClient blobstore.Client, blobstoreFlushThreshold int, progressReportFn func(), domainCache cache.DomainCache, allowDomain dynamicconfig.BoolPropertyFnWithDomainFilter, scope metrics.Scope, ) *ShardFixer
NewFixer constructs a new shard fixer.
func (*ShardFixer) Fix ¶
func (f *ShardFixer) Fix() FixReport
Fix scans over all executions in shard and runs invariant fixes per execution.
type ShardQueryPaginationToken ¶
type ShardQueryPaginationToken struct { // NextShardID is one greater than the highest shard returned in the current query. // NextShardID is nil if IsDone is true. // It is possible to get NextShardID != nil and on the next call to get an empty result with IsDone = true. NextShardID *int IsDone bool }
ShardQueryPaginationToken is used to return information used to make the next query
type ShardRange ¶
ShardRange identifies a set of shards based on min (inclusive) and max (exclusive)
type ShardScanResultAggregator ¶
type ShardScanResultAggregator struct {
// contains filtered or unexported fields
}
ShardScanResultAggregator is used to keep aggregated scan metrics
func NewShardScanResultAggregator ¶
func NewShardScanResultAggregator( shards []int, minShard int, maxShard int, ) *ShardScanResultAggregator
NewShardScanResultAggregator returns aggregator for a scan result.
func (*ShardScanResultAggregator) AddReport ¶
func (a *ShardScanResultAggregator) AddReport(report ScanReport)
AddReport adds scan report for a shard.
func (*ShardScanResultAggregator) GetAggregateReport ¶
func (a *ShardScanResultAggregator) GetAggregateReport() AggregateScanReportResult
GetAggregateReport returns aggregated scan report.
func (*ShardScanResultAggregator) GetAllScanResults ¶ added in v1.2.7
func (a *ShardScanResultAggregator) GetAllScanResults() (map[int]ScanResult, error)
func (*ShardScanResultAggregator) GetCorruptionKeys ¶
func (a *ShardScanResultAggregator) GetCorruptionKeys(req PaginatedShardQueryRequest) (*ShardCorruptKeysQueryResult, error)
GetCorruptionKeys returns a list of corrupt keys
func (*ShardScanResultAggregator) GetDomainStatus ¶ added in v0.21.0
func (a *ShardScanResultAggregator) GetDomainStatus(req DomainReportQueryRequest) (*DomainScanReportQueryResult, error)
GetDomainStatus returns stats broken by domain IDs
func (*ShardScanResultAggregator) GetReport ¶
func (a *ShardScanResultAggregator) GetReport(shardID int) (*ScanReport, error)
GetReport returns a report for a single shard.
func (*ShardScanResultAggregator) GetShardDistributionStats ¶
func (a *ShardScanResultAggregator) GetShardDistributionStats() ShardDistributionStats
GetShardDistributionStats returns aggregated size statistics
func (*ShardScanResultAggregator) GetShardSizeQueryResult ¶
func (a *ShardScanResultAggregator) GetShardSizeQueryResult(req ShardSizeQueryRequest) (ShardSizeQueryResult, error)
GetShardSizeQueryResult returns shard size statistics.
func (*ShardScanResultAggregator) GetStatusResult ¶
func (a *ShardScanResultAggregator) GetStatusResult(req PaginatedShardQueryRequest) (*ShardStatusQueryResult, error)
GetStatusResult returns scan status for a range of shards.
func (*ShardScanResultAggregator) GetStatusSummary ¶
func (a *ShardScanResultAggregator) GetStatusSummary() ShardStatusSummaryResult
GetStatusSummary returns scan status summary.
type ShardScanner ¶
type ShardScanner struct {
// contains filtered or unexported fields
}
ShardScanner is a generic scanner which iterates over entities provided by an iterator. Implementations of this scanner have to provide an invariant manager and iterator.
func NewScanner ¶
func NewScanner( shardID int, iterator pagination.Iterator, blobstoreClient blobstore.Client, blobstoreFlushThreshold int, manager invariant.Manager, progressReportFn func(), scope metrics.Scope, domainCache cache.DomainCache, ) *ShardScanner
NewScanner constructs a new ShardScanner
func (*ShardScanner) Scan ¶
func (s *ShardScanner) Scan(ctx context.Context) ScanReport
Scan scans over all executions in shard and runs invariant checks per execution.
type ShardSizeQueryRequest ¶
ShardSizeQueryRequest is the request used for ShardSizeQuery. The following must be true: 0 <= StartIndex < EndIndex <= len(shards successfully finished) The following must be true: EndIndex - StartIndex <= maxShardQueryResult. StartIndex is inclusive, EndIndex is exclusive.
type ShardSizeQueryResult ¶
type ShardSizeQueryResult []ShardSizeTuple
ShardSizeQueryResult is the result from ShardSizeQuery. Contains sorted list of shards, sorted by the number of executions per shard.
type ShardSizeTuple ¶
ShardSizeTuple indicates the size and sorted index of a single shard
type ShardStatus ¶
type ShardStatus string
ShardStatus is the type which indicates the status of a shard scan.
const ( // ShardStatusSuccess indicates the scan on the shard ran successfully ShardStatusSuccess ShardStatus = "success" // ShardStatusControlFlowFailure indicates the scan on the shard failed ShardStatusControlFlowFailure ShardStatus = "control_flow_failure" // ShardStatusRunning indicates the shard has not completed yet ShardStatusRunning ShardStatus = "running" )
type ShardStatusQueryResult ¶
type ShardStatusQueryResult struct { Result ShardStatusResult ShardQueryPaginationToken ShardQueryPaginationToken }
ShardStatusQueryResult is the query result for ShardStatusQuery
type ShardStatusResult ¶
type ShardStatusResult map[int]ShardStatus
ShardStatusResult indicates the status for all shards
type ShardStatusSummaryResult ¶
type ShardStatusSummaryResult map[ShardStatus]int
ShardStatusSummaryResult indicates the counts of shards in each status