shardscanner

package
v1.2.15-prerelease11
Published: Jan 8, 2025 License: MIT Imports: 25 Imported by: 0

Documentation

Constants

const (
	// ActivityScannerEmitMetrics is the activity name for scannerEmitMetricsActivity
	ActivityScannerEmitMetrics = "cadence-sys-shardscanner-emit-metrics-activity"
	// ActivityScannerConfig is the activity name for scannerConfigActivity
	ActivityScannerConfig = "cadence-sys-shardscanner-config-activity"
	// ActivityFixerConfig is the activity name for fixerConfigActivity
	ActivityFixerConfig = "cadence-sys-shardscanner-fixer-config-activity"
	// ActivityScanShard is the activity name for scanShardActivity
	ActivityScanShard = "cadence-sys-shardscanner-scanshard-activity"
	// ActivityFixerCorruptedKeys is the activity name for fixerCorruptedKeysActivity
	ActivityFixerCorruptedKeys = "cadence-sys-shardscanner-corruptedkeys-activity"
	// ActivityFixShard is the activity name for fixShardActivity
	ActivityFixShard = "cadence-sys-shardscanner-fixshard-activity"
	// ShardCorruptKeysQuery is the query name for the query used to get all completed shards with at least one corruption
	ShardCorruptKeysQuery = "shard_corrupt_keys"
)
const (
	// ShardReportQuery is the query name for the query used to get a single shard's report
	ShardReportQuery = "shard_report"
	// ShardStatusQuery is the query name for the query used to get the status of all shards
	ShardStatusQuery = "shard_status"
	// ShardStatusSummaryQuery is the query name for the query used to get the shard status -> counts map
	ShardStatusSummaryQuery = "shard_status_summary"
	// AggregateReportQuery is the query name for the query used to get the aggregate result of all finished shards
	AggregateReportQuery = "aggregate_report"
	// ShardSizeQuery is the query name for the query used to get the number of executions per shard in sorted order
	ShardSizeQuery = "shard_size"
	// DomainReportQuery is the query name for the query used to get the reports per domains for all finished shards
	DomainReportQuery = "domain_report"
	// AllResultsQuery returns filenames / paginating data for corruptions and failures in this workflow,
	// for shards which have finished processing.  This works for both scanner and fixer, and the return structures
	// are very similar.
	//
	// This data is also available for a single shard under ShardReportQuery, but using that requires
	// re-querying repeatedly if more than that single shard's data is desired, e.g. for manual
	// troubleshooting purposes.
	AllResultsQuery = "all_results"
)
const (
	// ErrScanWorkflowNotClosed indicates fix was attempted on scan workflow which was not finished
	ErrScanWorkflowNotClosed = "scan workflow is not closed, only can run fix on output of finished scan workflow"
	// ErrSerialization indicates a serialization or deserialization error occurred
	ErrSerialization = "encountered serialization error"
	// ErrMissingHooks indicates scanner is not providing hooks to Invariant manager or Iterator
	ErrMissingHooks = "hooks are not provided for this scanner"
)

Variables

This section is empty.

Functions

func NewFixerContext added in v0.18.1

func NewFixerContext(
	ctx context.Context,
	workflowName string,
	fixerContext FixerContext,
) context.Context

NewFixerContext provides a context to be used as the background activity context for the fixer.

func NewScannerContext added in v0.18.1

func NewScannerContext(
	ctx context.Context,
	workflowName string,
	scannerContext ScannerContext,
) context.Context

NewScannerContext provides a context to be used as the background activity context for the scanner.

Types

type AggregateFixReportResult

type AggregateFixReportResult FixStats

AggregateFixReportResult indicates the result of summing together all shard reports that have finished for fix.

type AggregateScanReportResult

type AggregateScanReportResult ScanStats

AggregateScanReportResult indicates the result of summing together all shard reports which have finished scan.

type ControlFlowFailure

type ControlFlowFailure struct {
	Info        string
	InfoDetails string
}

ControlFlowFailure indicates an error occurred which makes it impossible to even attempt to check or fix one or more execution(s). Note that it is not a ControlFlowFailure if a check or fix fails, it is only a ControlFlowFailure if an error is encountered which makes even attempting to check or fix impossible.

type CorruptedKeysEntry

type CorruptedKeysEntry struct {
	ShardID       int
	CorruptedKeys store.Keys
}

CorruptedKeysEntry is a pair of shardID and corrupted keys

type CustomScannerConfig

type CustomScannerConfig map[string]string

CustomScannerConfig is used to pass key/value parameters between the shardscanner activity and scanner/fixer implementations. This gives activities better consistency, as the workflow records one config and uses it for all shards, even after config / code changes.

It is currently only used to pass invariant names to control whether they are enabled or not. Please do not use this for other purposes.
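As a rough illustration of how such a map might be read, the sketch below checks whether an invariant is enabled. It assumes the "true" / "false" string values described for EnabledInvariants in FixShardActivityParams. The helper invariantEnabled and the invariant names are hypothetical and are not part of this package.

```go
package main

import (
	"fmt"
	"strconv"
)

// CustomScannerConfig mirrors the package type: a plain string-to-string map.
type CustomScannerConfig map[string]string

// invariantEnabled is a hypothetical helper, not part of shardscanner.
// It reports whether the named invariant is present and parses as true.
func invariantEnabled(cfg CustomScannerConfig, name string) bool {
	raw, ok := cfg[name]
	if !ok {
		return false
	}
	enabled, err := strconv.ParseBool(raw)
	return err == nil && enabled
}

func main() {
	// The invariant names below are placeholders for illustration only.
	cfg := CustomScannerConfig{
		"exampleInvariantA": "true",
		"exampleInvariantB": "false",
	}
	for _, name := range []string{"exampleInvariantA", "exampleInvariantB", "exampleInvariantC"} {
		fmt.Printf("%s enabled: %v\n", name, invariantEnabled(cfg, name))
	}
}
```

Treating a missing or unparsable entry as disabled is a choice made for this sketch; the package may handle those cases differently.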

type DomainFixReportQueryResult added in v0.21.0

type DomainFixReportQueryResult struct {
	Reports []DomainFixStats
}

DomainFixReportQueryResult is the query result for DomainReportQuery in the fixer workflow

type DomainFixStats added in v0.21.0

type DomainFixStats struct {
	DomainID string
	Stats    FixStats
}

DomainFixStats is the report of fix stats for one domain

type DomainReportQueryRequest added in v0.21.0

type DomainReportQueryRequest struct {
	// DomainID specifies a single domain for which the stats are requested
	// Setting to nil indicates stats for all domains are requested
	DomainID *string
}

DomainReportQueryRequest is the request used for queries which return stats broken down by domain

type DomainScanReportQueryResult added in v0.21.0

type DomainScanReportQueryResult struct {
	Reports []DomainScanStats
}

DomainScanReportQueryResult is the query result for DomainReportQuery in the scanner workflow

type DomainScanStats added in v0.21.0

type DomainScanStats struct {
	DomainID string
	Stats    ScanStats
}

DomainScanStats is the report of scan stats for one domain

type DynamicParams

type DynamicParams struct {
	ScannerEnabled          dynamicconfig.BoolPropertyFn
	FixerEnabled            dynamicconfig.BoolPropertyFn
	Concurrency             dynamicconfig.IntPropertyFn
	PageSize                dynamicconfig.IntPropertyFn
	BlobstoreFlushThreshold dynamicconfig.IntPropertyFn
	ActivityBatchSize       dynamicconfig.IntPropertyFn
	AllowDomain             dynamicconfig.BoolPropertyFnWithDomainFilter
}

DynamicParams is the dynamic config for scanner workflow.

type FixKeys

type FixKeys struct {
	Skipped *store.Keys
	Failed  *store.Keys
	Fixed   *store.Keys
}

FixKeys are the keys to the blobs that were uploaded during fix. Keys can be nil if there were no uploads.

type FixReport

type FixReport struct {
	ShardID     int
	Stats       FixStats
	Result      FixResult
	DomainStats map[string]*FixStats
}

FixReport is the report of running Fix on a single shard

type FixReportError

type FixReportError struct {
	Reports  []FixReport
	ErrorStr *string
}

FixReportError is a type that is used to send either an error or reports on a channel. Exactly one of Reports and ErrorStr should be non-nil.

type FixResult

type FixResult struct {
	ShardFixKeys       *FixKeys
	ControlFlowFailure *ControlFlowFailure
}

FixResult indicates the result of running fix on a shard. Exactly one of ControlFlowFailure or ShardFixKeys will be non-nil.

func (*FixResult) Empty added in v1.2.7

func (f *FixResult) Empty() bool

Empty returns true if this FixResult has no "real" data, e.g. only nils or empty values.

type FixShardActivityParams

type FixShardActivityParams struct {
	CorruptedKeysEntries        []CorruptedKeysEntry
	ResolvedFixerWorkflowConfig ResolvedFixerWorkflowConfig

	// EnabledInvariants contains all known invariants for fixer, with "true" or "false" values.
	// In current code it should never be empty.
	//
	// If empty, EnabledInvariants came from old serialized data prior to this field existing,
	// and the historical list of invariants should be used.  This should be a one-time event
	// after upgrading.
	EnabledInvariants CustomScannerConfig
}

FixShardActivityParams is the parameter for fixShardActivity

type FixShardConfigParams added in v1.2.5

type FixShardConfigParams struct {
}

type FixShardConfigResults added in v1.2.5

type FixShardConfigResults struct {
	EnabledInvariants CustomScannerConfig
}

type FixShardHeartbeatDetails

type FixShardHeartbeatDetails struct {
	LastShardIndexHandled int
	Reports               []FixReport
}

FixShardHeartbeatDetails is the heartbeat details for the fix shard

type FixStats

type FixStats struct {
	EntitiesCount int64
	FixedCount    int64
	SkippedCount  int64
	FailedCount   int64
}

FixStats indicates the stats of executions that were handled by shard Fix.

type Fixer

type Fixer interface {
	Fix() FixReport
}

Fixer is used to fix entities in a shard. It is responsible for four things:

 1. Confirming that each entity it scans is corrupted.
 2. Attempting to fix any confirmed corrupted executions.
 3. Recording skipped entities, entities that failed to be fixed, and successfully fixed entities to durable store.
 4. Producing a FixReport.

type FixerContext

type FixerContext struct {
	Resource resource.Resource
	Hooks    *FixerHooks
	Scope    metrics.Scope
	Config   *ScannerConfig
	Logger   log.Logger
}

FixerContext is the resource that is available to activities under ShardFixer key

func GetFixerContext added in v0.18.1

func GetFixerContext(
	ctx context.Context,
) (FixerContext, error)

GetFixerContext extracts the fixer context from the activity context. It uses a typed, private key to reduce access scope.
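The typed, private key idea can be sketched with the standard context package. This is a general Go pattern shown under simplifying assumptions, not this package's actual code: the key type, the trimmed FixerContext with its WorkflowName field, and all three function names are hypothetical.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// fixerContextKey is an unexported type, so code outside this package cannot
// construct a matching key and read or overwrite the stored value.
type fixerContextKey struct{}

// FixerContext is a trimmed-down stand-in for the package type.
type FixerContext struct {
	WorkflowName string // hypothetical field, for illustration only
}

// withFixerContext stores fc in a child context under the private key.
func withFixerContext(ctx context.Context, fc FixerContext) context.Context {
	return context.WithValue(ctx, fixerContextKey{}, fc)
}

// fixerContextFrom reads the value back, failing if it was never stored.
func fixerContextFrom(ctx context.Context) (FixerContext, error) {
	fc, ok := ctx.Value(fixerContextKey{}).(FixerContext)
	if !ok {
		return FixerContext{}, errors.New("fixer context is not set")
	}
	return fc, nil
}

// demoRoundTrip stores a FixerContext and returns the name read back from it.
func demoRoundTrip(name string) (string, error) {
	ctx := withFixerContext(context.Background(), FixerContext{WorkflowName: name})
	fc, err := fixerContextFrom(ctx)
	return fc.WorkflowName, err
}

func main() {
	name, err := demoRoundTrip("example-fixer")
	fmt.Println(name, err)

	// A context that never had the value attached yields an error.
	_, err = fixerContextFrom(context.Background())
	fmt.Println(err)
}
```

Because the key type is unexported, the only way to reach the value is through the accessor functions, which is what narrows the access scope.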

func NewShardFixerContext added in v0.18.1

func NewShardFixerContext(
	res resource.Resource,
	config *ScannerConfig,
) FixerContext

NewShardFixerContext sets fixer context up

type FixerCorruptedKeysActivityParams

type FixerCorruptedKeysActivityParams struct {
	ScannerWorkflowWorkflowID string
	ScannerWorkflowRunID      string
	StartingShardID           *int
}

FixerCorruptedKeysActivityParams is the parameter for fixerCorruptedKeysActivity

type FixerCorruptedKeysActivityResult

type FixerCorruptedKeysActivityResult struct {
	CorruptedKeys             []CorruptedKeysEntry
	MinShard                  *int
	MaxShard                  *int
	ShardQueryPaginationToken ShardQueryPaginationToken
}

FixerCorruptedKeysActivityResult is the result of fixerCorruptedKeysActivity

func GetCorruptedKeys

func GetCorruptedKeys(
	ctx workflow.Context,
	params FixerWorkflowParams,
) (*FixerCorruptedKeysActivityResult, error)

GetCorruptedKeys is a workflow which is used to retrieve keys for fixer workflow.

type FixerHooks

type FixerHooks struct {
	InvariantManager FixerManagerCB
	Iterator         FixerIteratorCB
	GetFixerConfig   func(fixer FixerContext) CustomScannerConfig
}

FixerHooks holds callback functions for shard scanner workflow implementation.

func NewFixerHooks

func NewFixerHooks(
	manager FixerManagerCB,
	iterator FixerIteratorCB,
	config func(fixer FixerContext) CustomScannerConfig,
) (*FixerHooks, error)

NewFixerHooks returns initialized callbacks for shard scanner workflow implementation.

type FixerIteratorCB

FixerIteratorCB is a function which returns ScanOutputIterator for fixer.

type FixerManagerCB

FixerManagerCB is a function which returns invariant manager for fixer.

type FixerWorkflow

type FixerWorkflow struct {
	Aggregator *ShardFixResultAggregator
	Params     FixerWorkflowParams
	Keys       *FixerCorruptedKeysActivityResult
}

FixerWorkflow is the workflow that fixes all entities from a scan output.

func NewFixerWorkflow

func NewFixerWorkflow(
	ctx workflow.Context,
	name string,
	params FixerWorkflowParams,
) (*FixerWorkflow, error)

NewFixerWorkflow returns a new instance of fixer workflow

func (*FixerWorkflow) Start

func (fx *FixerWorkflow) Start(ctx workflow.Context) error

Start starts a shard fixer workflow.

type FixerWorkflowConfigOverwrites

type FixerWorkflowConfigOverwrites struct {
	Concurrency             *int
	BlobstoreFlushThreshold *int
	ActivityBatchSize       *int
}

FixerWorkflowConfigOverwrites enables overwriting the default values. If provided, the workflow favors overwrites over defaults. Any overwrites that are left as nil fall back to the defaults.
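The nil-means-default rule can be sketched as follows. The two struct definitions are copied from this page so the example compiles on its own; the resolve helper and the default values are assumptions made for illustration and are not taken from the package.

```go
package main

import "fmt"

// Local copies of the two package types.
type FixerWorkflowConfigOverwrites struct {
	Concurrency             *int
	BlobstoreFlushThreshold *int
	ActivityBatchSize       *int
}

type ResolvedFixerWorkflowConfig struct {
	Concurrency             int
	BlobstoreFlushThreshold int
	ActivityBatchSize       int
}

// resolve is a hypothetical helper: every non-nil overwrite replaces the
// corresponding default, and nil overwrites leave the default untouched.
func resolve(defaults ResolvedFixerWorkflowConfig, ow FixerWorkflowConfigOverwrites) ResolvedFixerWorkflowConfig {
	resolved := defaults
	if ow.Concurrency != nil {
		resolved.Concurrency = *ow.Concurrency
	}
	if ow.BlobstoreFlushThreshold != nil {
		resolved.BlobstoreFlushThreshold = *ow.BlobstoreFlushThreshold
	}
	if ow.ActivityBatchSize != nil {
		resolved.ActivityBatchSize = *ow.ActivityBatchSize
	}
	return resolved
}

func main() {
	// These default values are made up for the example.
	defaults := ResolvedFixerWorkflowConfig{Concurrency: 25, BlobstoreFlushThreshold: 1000, ActivityBatchSize: 200}

	concurrency := 5
	got := resolve(defaults, FixerWorkflowConfigOverwrites{Concurrency: &concurrency})
	fmt.Printf("%+v\n", got)
}
```

Pointer fields make "not set" distinguishable from an explicit zero, which is presumably why the overwrite struct uses *int rather than int.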

type FixerWorkflowParams

type FixerWorkflowParams struct {
	ScannerWorkflowWorkflowID     string
	ScannerWorkflowRunID          string
	FixerWorkflowConfigOverwrites FixerWorkflowConfigOverwrites
}

FixerWorkflowParams are the parameters to the fix workflow

type GenericScannerConfig

type GenericScannerConfig struct {
	Enabled                 bool
	Concurrency             int
	PageSize                int
	BlobstoreFlushThreshold int
	ActivityBatchSize       int
}

GenericScannerConfig holds the generic params shared by all shard scanners

type GenericScannerConfigOverwrites

type GenericScannerConfigOverwrites struct {
	Enabled                 *bool
	Concurrency             *int
	PageSize                *int
	BlobstoreFlushThreshold *int
	ActivityBatchSize       *int
}

GenericScannerConfigOverwrites allows overriding the generic params

type IteratorCB

IteratorCB is a function which returns iterator for scanner.

type ManagerCB

ManagerCB is a function which returns invariant manager for scanner.

type PaginatedShardQueryRequest

type PaginatedShardQueryRequest struct {
	// StartingShardID is the first shard to start iteration from.
	// Setting to nil will start iteration from the beginning of the shards.
	StartingShardID *int
	// LimitShards indicates the maximum number of results that can be returned.
	// If nil or larger than allowed maximum, will default to maximum allowed.
	LimitShards *int
}

PaginatedShardQueryRequest is the request used for queries which return results over all shards

type ResolvedFixerWorkflowConfig

type ResolvedFixerWorkflowConfig struct {
	Concurrency             int
	BlobstoreFlushThreshold int
	ActivityBatchSize       int
}

ResolvedFixerWorkflowConfig is the resolved config after reading defaults and applying overwrites.

type ResolvedScannerWorkflowConfig

type ResolvedScannerWorkflowConfig struct {
	GenericScannerConfig GenericScannerConfig
	CustomScannerConfig  CustomScannerConfig
}

ResolvedScannerWorkflowConfig is the resolved config after reading dynamic config and applying overwrites.

type ScanKeys

type ScanKeys struct {
	Corrupt *store.Keys
	Failed  *store.Keys
}

ScanKeys are the keys to the blobs that were uploaded during scan. Keys can be nil if there were no uploads.

type ScanReport

type ScanReport struct {
	ShardID     int
	Stats       ScanStats
	Result      ScanResult
	DomainStats map[string]*ScanStats
}

ScanReport is the report of running Scan on a single shard.

type ScanReportError

type ScanReportError struct {
	Reports  []ScanReport
	ErrorStr *string
}

ScanReportError is a type that is used to send either an error or reports on a channel. Exactly one of Reports and ErrorStr should be non-nil.

type ScanResult

type ScanResult struct {
	ShardScanKeys      *ScanKeys
	ControlFlowFailure *ControlFlowFailure
}

ScanResult indicates the result of running scan on a shard. Exactly one of ControlFlowFailure or ShardScanKeys will be non-nil.

func (*ScanResult) Empty added in v1.2.7

func (s *ScanResult) Empty() bool

Empty returns true if this ScanResult has no "real" data, e.g. only nils or empty values.

type ScanShardActivityParams

type ScanShardActivityParams struct {
	Shards                  []int
	PageSize                int
	BlobstoreFlushThreshold int
	ScannerConfig           CustomScannerConfig
}

ScanShardActivityParams is the parameter for scanShardActivity

type ScanShardHeartbeatDetails

type ScanShardHeartbeatDetails struct {
	LastShardIndexHandled int
	Reports               []ScanReport
}

ScanShardHeartbeatDetails is the heartbeat details for scan shard

type ScanStats

type ScanStats struct {
	EntitiesCount    int64
	CorruptedCount   int64
	CheckFailedCount int64
	CorruptionByType map[invariant.Name]int64
}

ScanStats indicates the stats of entities which were handled by shard Scan.
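AggregateScanReportResult is described as the sum of all finished shard reports. A minimal sketch of such a field-by-field sum is shown below. The sumScanStats helper is hypothetical, and the local Name type assumes that invariant.Name is string-based.

```go
package main

import "fmt"

// Name stands in for invariant.Name; a string-based type is assumed here.
type Name string

// ScanStats is a local copy of the package type.
type ScanStats struct {
	EntitiesCount    int64
	CorruptedCount   int64
	CheckFailedCount int64
	CorruptionByType map[Name]int64
}

// sumScanStats is a hypothetical helper that adds per-shard stats together,
// merging the per-invariant corruption counts key by key.
func sumScanStats(perShard []ScanStats) ScanStats {
	total := ScanStats{CorruptionByType: make(map[Name]int64)}
	for _, s := range perShard {
		total.EntitiesCount += s.EntitiesCount
		total.CorruptedCount += s.CorruptedCount
		total.CheckFailedCount += s.CheckFailedCount
		for name, count := range s.CorruptionByType {
			total.CorruptionByType[name] += count
		}
	}
	return total
}

func main() {
	// Invariant names and counts are invented for the example.
	total := sumScanStats([]ScanStats{
		{EntitiesCount: 100, CorruptedCount: 2, CorruptionByType: map[Name]int64{"exampleInvariant": 2}},
		{EntitiesCount: 50, CorruptedCount: 1, CheckFailedCount: 1, CorruptionByType: map[Name]int64{"exampleInvariant": 1}},
	})
	fmt.Println(total.EntitiesCount, total.CorruptedCount, total.CheckFailedCount, total.CorruptionByType)
}
```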

type Scanner

type Scanner interface {
	Scan() ScanReport
}

Scanner is used to scan over a given iterator. It is responsible for three things:

 1. Checking invariants for each entity.
 2. Recording corruption and failures to durable store.
 3. Producing a ScanReport.

type ScannerConfig

type ScannerConfig struct {
	ScannerWFTypeName    string
	FixerWFTypeName      string
	ScannerHooks         func() *ScannerHooks
	FixerHooks           func() *FixerHooks
	DynamicParams        DynamicParams
	DynamicCollection    *dynamicconfig.Collection
	StartWorkflowOptions client.StartWorkflowOptions
	StartFixerOptions    client.StartWorkflowOptions
}

ScannerConfig is the config for ShardScanner workflow

type ScannerConfigActivityParams

type ScannerConfigActivityParams struct {
	Overwrites ScannerWorkflowConfigOverwrites
}

ScannerConfigActivityParams is the parameter for scannerConfigActivity

type ScannerContext added in v1.2.5

type ScannerContext struct {
	Resource resource.Resource
	Hooks    *ScannerHooks
	Scope    metrics.Scope
	Config   *ScannerConfig
	Logger   log.Logger
}

ScannerContext is the resource that is available in activities under ShardScanner context key

func GetScannerContext added in v0.18.1

func GetScannerContext(
	ctx context.Context,
) (ScannerContext, error)

GetScannerContext extracts scanner context from activity context

func NewShardScannerContext added in v0.18.1

func NewShardScannerContext(
	res resource.Resource,
	config *ScannerConfig,
) ScannerContext

NewShardScannerContext sets scanner context up

type ScannerEmitMetricsActivityParams

type ScannerEmitMetricsActivityParams struct {
	ShardSuccessCount            int
	ShardControlFlowFailureCount int
	AggregateReportResult        AggregateScanReportResult
	ShardDistributionStats       ShardDistributionStats
}

ScannerEmitMetricsActivityParams is the parameter for scannerEmitMetricsActivity

type ScannerHooks

type ScannerHooks struct {
	Manager          ManagerCB
	Iterator         IteratorCB
	GetScannerConfig func(scanner ScannerContext) CustomScannerConfig
}

ScannerHooks allows providing a manager and an iterator for different types of scanners.

func NewScannerHooks

func NewScannerHooks(manager ManagerCB, iterator IteratorCB, config func(scanner ScannerContext) CustomScannerConfig) (*ScannerHooks, error)

NewScannerHooks returns hooks that supply a per-scanner iterator and invariant manager

type ScannerWorkflow

type ScannerWorkflow struct {
	Aggregator *ShardScanResultAggregator
	Params     ScannerWorkflowParams
	Name       string
	Shards     []int
}

ScannerWorkflow is a workflow which scans and checks entities in a shard.

func NewScannerWorkflow

func NewScannerWorkflow(
	ctx workflow.Context,
	name string,
	params ScannerWorkflowParams,
) (*ScannerWorkflow, error)

NewScannerWorkflow creates instance of shard scanner

func (*ScannerWorkflow) Start

func (wf *ScannerWorkflow) Start(ctx workflow.Context) error

Start starts a shard scanner workflow.

type ScannerWorkflowConfigOverwrites

type ScannerWorkflowConfigOverwrites struct {
	GenericScannerConfig GenericScannerConfigOverwrites
	CustomScannerConfig  *CustomScannerConfig
}

ScannerWorkflowConfigOverwrites enables overwriting the values in dynamic config. If provided, the workflow favors overwrites over dynamic config. Any overwrites that are left as nil fall back to using dynamic config.

type ScannerWorkflowParams

type ScannerWorkflowParams struct {
	Shards                          Shards
	ScannerWorkflowConfigOverwrites ScannerWorkflowConfigOverwrites
}

ScannerWorkflowParams are the parameters to the scan workflow

type ShardCorruptKeysQueryResult

type ShardCorruptKeysQueryResult struct {
	Result                    ShardCorruptKeysResult
	ShardQueryPaginationToken ShardQueryPaginationToken
}

ShardCorruptKeysQueryResult is the query result for ShardCorruptKeysQuery

type ShardCorruptKeysResult

type ShardCorruptKeysResult map[int]store.Keys

ShardCorruptKeysResult is a map of all shards which have finished scan successfully and have at least one corruption

type ShardDistributionStats

type ShardDistributionStats struct {
	Max    int64
	Median int64
	Min    int64
	P90    int64
	P75    int64
	P25    int64
	P10    int64
}

ShardDistributionStats contains stats on the distribution of executions in shards. It is used by the ScannerEmitMetricsActivityParams.

type ShardFixResultAggregator

type ShardFixResultAggregator struct {
	// contains filtered or unexported fields
}

ShardFixResultAggregator is used to keep aggregated fix metrics

func NewShardFixResultAggregator

func NewShardFixResultAggregator(
	corruptKeys []CorruptedKeysEntry,
	minShard int,
	maxShard int,
) *ShardFixResultAggregator

NewShardFixResultAggregator returns an instance of ShardFixResultAggregator

func (*ShardFixResultAggregator) AddReport

func (a *ShardFixResultAggregator) AddReport(report FixReport)

AddReport adds fix report for a shard.

func (*ShardFixResultAggregator) GetAggregation

GetAggregation returns fix aggregation.

func (*ShardFixResultAggregator) GetAllFixResults added in v1.2.7

func (a *ShardFixResultAggregator) GetAllFixResults() (map[int]FixResult, error)

func (*ShardFixResultAggregator) GetDomainStatus added in v0.21.0

GetDomainStatus returns stats broken down by domain ID

func (*ShardFixResultAggregator) GetReport

func (a *ShardFixResultAggregator) GetReport(shardID int) (*FixReport, error)

GetReport returns fix report for a shard.

func (*ShardFixResultAggregator) GetStatusResult

GetStatusResult returns paginated results for a range of shards

func (*ShardFixResultAggregator) GetStatusSummary

func (a *ShardFixResultAggregator) GetStatusSummary() ShardStatusSummaryResult

GetStatusSummary returns fix status summary.

type ShardFixer

type ShardFixer struct {
	// contains filtered or unexported fields
}

ShardFixer is a generic fixer which iterates over entities provided by an iterator. Implementations of this fixer have to provide an invariant manager and an iterator.

func NewFixer

func NewFixer(
	ctx context.Context,
	shardID int,
	manager invariant.Manager,
	iterator store.ScanOutputIterator,
	blobstoreClient blobstore.Client,
	blobstoreFlushThreshold int,
	progressReportFn func(),
	domainCache cache.DomainCache,
	allowDomain dynamicconfig.BoolPropertyFnWithDomainFilter,
	scope metrics.Scope,
) *ShardFixer

NewFixer constructs a new shard fixer.

func (*ShardFixer) Fix

func (f *ShardFixer) Fix() FixReport

Fix scans over all executions in shard and runs invariant fixes per execution.

type ShardQueryPaginationToken

type ShardQueryPaginationToken struct {
	// NextShardID is one greater than the highest shard returned in the current query.
	// NextShardID is nil if IsDone is true.
	// It is possible to get NextShardID != nil and on the next call to get an empty result with IsDone = true.
	NextShardID *int
	IsDone      bool
}

ShardQueryPaginationToken is used to return information used to make the next query
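A caller might drive the pagination token with a loop like the one sketched here. The struct definitions are simplified local copies. The queryFn type, collectAll, and fakeQuery are hypothetical stand-ins for issuing ShardStatusQuery against a running workflow, so no real client call is shown.

```go
package main

import "fmt"

type ShardStatus string

type ShardQueryPaginationToken struct {
	NextShardID *int
	IsDone      bool
}

type PaginatedShardQueryRequest struct {
	StartingShardID *int
	LimitShards     *int
}

type ShardStatusQueryResult struct {
	Result                    map[int]ShardStatus
	ShardQueryPaginationToken ShardQueryPaginationToken
}

// queryFn is a hypothetical stand-in for running a paginated shard query.
type queryFn func(PaginatedShardQueryRequest) (ShardStatusQueryResult, error)

// collectAll pages through results until the token reports IsDone.
func collectAll(query queryFn) (map[int]ShardStatus, error) {
	all := make(map[int]ShardStatus)
	req := PaginatedShardQueryRequest{} // nil StartingShardID: start from the first shard
	for {
		page, err := query(req)
		if err != nil {
			return nil, err
		}
		for id, status := range page.Result {
			all[id] = status
		}
		token := page.ShardQueryPaginationToken
		if token.IsDone || token.NextShardID == nil {
			return all, nil
		}
		req.StartingShardID = token.NextShardID
	}
}

// fakeQuery serves shards 0..total-1 in pages of pageSize, for demonstration.
func fakeQuery(total, pageSize int) queryFn {
	return func(req PaginatedShardQueryRequest) (ShardStatusQueryResult, error) {
		start := 0
		if req.StartingShardID != nil {
			start = *req.StartingShardID
		}
		end := start + pageSize
		if end > total {
			end = total
		}
		res := ShardStatusQueryResult{Result: make(map[int]ShardStatus)}
		for id := start; id < end; id++ {
			res.Result[id] = "success"
		}
		if end >= total {
			res.ShardQueryPaginationToken.IsDone = true
		} else {
			next := end
			res.ShardQueryPaginationToken.NextShardID = &next
		}
		return res, nil
	}
}

func main() {
	all, err := collectAll(fakeQuery(7, 3))
	fmt.Println(len(all), err)
}
```

The loop tolerates a final empty page with IsDone set, which the field comments say can happen after a non-nil NextShardID.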

type ShardRange

type ShardRange struct {
	Min int
	Max int
}

ShardRange identifies a set of shards based on min (inclusive) and max (exclusive)

type ShardScanResultAggregator

type ShardScanResultAggregator struct {
	// contains filtered or unexported fields
}

ShardScanResultAggregator is used to keep aggregated scan metrics

func NewShardScanResultAggregator

func NewShardScanResultAggregator(
	shards []int,
	minShard int,
	maxShard int,
) *ShardScanResultAggregator

NewShardScanResultAggregator returns aggregator for a scan result.

func (*ShardScanResultAggregator) AddReport

func (a *ShardScanResultAggregator) AddReport(report ScanReport)

AddReport adds scan report for a shard.

func (*ShardScanResultAggregator) GetAggregateReport

func (a *ShardScanResultAggregator) GetAggregateReport() AggregateScanReportResult

GetAggregateReport returns aggregated scan report.

func (*ShardScanResultAggregator) GetAllScanResults added in v1.2.7

func (a *ShardScanResultAggregator) GetAllScanResults() (map[int]ScanResult, error)

func (*ShardScanResultAggregator) GetCorruptionKeys

GetCorruptionKeys returns a list of corrupt keys

func (*ShardScanResultAggregator) GetDomainStatus added in v0.21.0

GetDomainStatus returns stats broken down by domain ID

func (*ShardScanResultAggregator) GetReport

func (a *ShardScanResultAggregator) GetReport(shardID int) (*ScanReport, error)

GetReport returns a report for a single shard.

func (*ShardScanResultAggregator) GetShardDistributionStats

func (a *ShardScanResultAggregator) GetShardDistributionStats() ShardDistributionStats

GetShardDistributionStats returns aggregated size statistics

func (*ShardScanResultAggregator) GetShardSizeQueryResult

GetShardSizeQueryResult returns shard size statistics.

func (*ShardScanResultAggregator) GetStatusResult

GetStatusResult returns scan status for a range of shards.

func (*ShardScanResultAggregator) GetStatusSummary

GetStatusSummary returns scan status summary.

type ShardScanner

type ShardScanner struct {
	// contains filtered or unexported fields
}

ShardScanner is a generic scanner which iterates over entities provided by an iterator. Implementations of this scanner have to provide an invariant manager and an iterator.

func NewScanner

func NewScanner(
	shardID int,
	iterator pagination.Iterator,
	blobstoreClient blobstore.Client,
	blobstoreFlushThreshold int,
	manager invariant.Manager,
	progressReportFn func(),
	scope metrics.Scope,
	domainCache cache.DomainCache,
) *ShardScanner

NewScanner constructs a new ShardScanner

func (*ShardScanner) Scan

func (s *ShardScanner) Scan(ctx context.Context) ScanReport

Scan scans over all executions in shard and runs invariant checks per execution.

type ShardSizeQueryRequest

type ShardSizeQueryRequest struct {
	StartIndex int
	EndIndex   int
}

ShardSizeQueryRequest is the request used for ShardSizeQuery. StartIndex is inclusive, EndIndex is exclusive. Both of the following must be true: 0 <= StartIndex < EndIndex <= len(shards successfully finished), and EndIndex - StartIndex <= maxShardQueryResult.
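The documented constraints translate into a small check, sketched below. The validate helper is hypothetical. Its finished parameter is the number of successfully finished shards, and maxResults stands in for the unexported maxShardQueryResult limit, whose actual value is not shown on this page.

```go
package main

import (
	"errors"
	"fmt"
)

// ShardSizeQueryRequest is a local copy of the package type.
type ShardSizeQueryRequest struct {
	StartIndex int
	EndIndex   int
}

// validate is a hypothetical check of the two documented constraints.
func validate(req ShardSizeQueryRequest, finished, maxResults int) error {
	// 0 <= StartIndex < EndIndex <= finished
	if req.StartIndex < 0 || req.StartIndex >= req.EndIndex || req.EndIndex > finished {
		return errors.New("index range must satisfy 0 <= StartIndex < EndIndex <= finished shards")
	}
	// EndIndex - StartIndex <= maxResults
	if req.EndIndex-req.StartIndex > maxResults {
		return errors.New("requested range exceeds the maximum result size")
	}
	return nil
}

func main() {
	// The limits used here are arbitrary example values.
	fmt.Println(validate(ShardSizeQueryRequest{StartIndex: 0, EndIndex: 10}, 100, 1000))
	fmt.Println(validate(ShardSizeQueryRequest{StartIndex: 5, EndIndex: 5}, 100, 1000))
}
```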

type ShardSizeQueryResult

type ShardSizeQueryResult []ShardSizeTuple

ShardSizeQueryResult is the result from ShardSizeQuery. Contains sorted list of shards, sorted by the number of executions per shard.

type ShardSizeTuple

type ShardSizeTuple struct {
	ShardID       int
	EntitiesCount int64
}

ShardSizeTuple indicates the size and sorted index of a single shard

type ShardStatus

type ShardStatus string

ShardStatus is the type which indicates the status of a shard scan.

const (

	// ShardStatusSuccess indicates the scan on the shard ran successfully
	ShardStatusSuccess ShardStatus = "success"
	// ShardStatusControlFlowFailure indicates the scan on the shard failed
	ShardStatusControlFlowFailure ShardStatus = "control_flow_failure"
	// ShardStatusRunning indicates the shard has not completed yet
	ShardStatusRunning ShardStatus = "running"
)

type ShardStatusQueryResult

type ShardStatusQueryResult struct {
	Result                    ShardStatusResult
	ShardQueryPaginationToken ShardQueryPaginationToken
}

ShardStatusQueryResult is the query result for ShardStatusQuery

type ShardStatusResult

type ShardStatusResult map[int]ShardStatus

ShardStatusResult indicates the status for all shards

type ShardStatusSummaryResult

type ShardStatusSummaryResult map[ShardStatus]int

ShardStatusSummaryResult indicates the counts of shards in each status
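The relationship between ShardStatusResult and ShardStatusSummaryResult can be sketched as a simple count per status. The types and constants are local copies from this page; the summarize helper is hypothetical and is not the aggregator's own method.

```go
package main

import "fmt"

type ShardStatus string

const (
	ShardStatusSuccess            ShardStatus = "success"
	ShardStatusControlFlowFailure ShardStatus = "control_flow_failure"
	ShardStatusRunning            ShardStatus = "running"
)

type ShardStatusResult map[int]ShardStatus

type ShardStatusSummaryResult map[ShardStatus]int

// summarize is a hypothetical helper that counts how many shards are in each status.
func summarize(statuses ShardStatusResult) ShardStatusSummaryResult {
	summary := make(ShardStatusSummaryResult)
	for _, status := range statuses {
		summary[status]++
	}
	return summary
}

func main() {
	summary := summarize(ShardStatusResult{
		0: ShardStatusSuccess,
		1: ShardStatusSuccess,
		2: ShardStatusRunning,
		3: ShardStatusControlFlowFailure,
	})
	fmt.Println(summary[ShardStatusSuccess], summary[ShardStatusRunning], summary[ShardStatusControlFlowFailure])
}
```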

type Shards

type Shards struct {
	List  []int
	Range *ShardRange
}

Shards identifies the shards that should be scanned. Exactly one of List or Range should be non-nil.

func (Shards) Flatten

func (s Shards) Flatten() ([]int, int, int)

Flatten flattens Shards to a list of shard IDs and finds the min/max shardID

func (Shards) Validate

func (s Shards) Validate() error

Validate validates shard list or range
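To make the List / Range rules concrete, the sketch below re-implements the documented behaviour as free functions. It is an approximation written from the descriptions on this page, not the package's code. In particular, the handling of empty input and the exact error conditions are assumptions.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the package types.
type ShardRange struct {
	Min int // inclusive
	Max int // exclusive
}

type Shards struct {
	List  []int
	Range *ShardRange
}

// flatten expands s into a list of shard IDs and reports the smallest and
// largest ID in that list. Returning zeros for empty input is an assumption.
func flatten(s Shards) (ids []int, minID int, maxID int) {
	if s.Range != nil {
		for id := s.Range.Min; id < s.Range.Max; id++ {
			ids = append(ids, id)
		}
	} else {
		ids = append(ids, s.List...)
	}
	if len(ids) == 0 {
		return nil, 0, 0
	}
	minID, maxID = ids[0], ids[0]
	for _, id := range ids[1:] {
		if id < minID {
			minID = id
		}
		if id > maxID {
			maxID = id
		}
	}
	return ids, minID, maxID
}

// validate enforces "exactly one of List or Range" and rejects empty ranges.
func validate(s Shards) error {
	hasList := len(s.List) > 0
	hasRange := s.Range != nil
	if hasList == hasRange {
		return errors.New("exactly one of List or Range must be provided")
	}
	if hasRange && s.Range.Min >= s.Range.Max {
		return errors.New("range Min must be less than Max")
	}
	return nil
}

func main() {
	s := Shards{Range: &ShardRange{Min: 2, Max: 5}}
	fmt.Println(validate(s))
	fmt.Println(flatten(s))
}
```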
