reindex

package
v0.0.0-...-89602ce Latest
Warning

This package is not in the latest version of its module.

Published: Aug 18, 2020 License: BSD-3-Clause Imports: 20 Imported by: 0

Documentation

Constants

const (
	// StatusAvailable indicates the job is not in progress and has not been successfully completed.
	// Available jobs will be considered "errored" when attempts >= max attempts.
	StatusAvailable Status = "available"
	// StatusInProgress indicates a job is being processed.
	// In-progress jobs can be claimed by a new caller if last_status_change is more than defaultTimeout ago.
	StatusInProgress Status = "in_progress"
	// StatusComplete indicates a job has been completed successfully.
	StatusComplete Status = "complete"

	// DefaultMaxAttempts is the default max number of attempts at a reindex job before it's considered failed.
	DefaultMaxAttempts uint = 3
)

Variables

var (
	// TestHookGet is an empty hook function for test coordination.
	// This should only be set by test code.
	TestHookGet = func() {}
)
var (
	// TestHookReindexComplete is a function called after each reindex Run() completes.
	// This should only be set by test code.
	TestHookReindexComplete = func() {}
)

Functions

func EqualVersions

func EqualVersions(a []*indexer.Versions, b []*indexer.Versions) bool

EqualVersions returns true iff the slices are equal. Assumes the slices are already sorted. Any nil element results in false.
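
The comparison described above could look roughly like the following sketch. The `Versions` struct here is a hypothetical stand-in, since the fields of indexer.Versions are not shown in this documentation, and `equalVersions` is an illustration rather than the package's implementation.

```go
package main

import "fmt"

// Versions is a hypothetical stand-in for indexer.Versions.
// The field names are assumptions made for this example.
type Versions struct {
	IndexerID string
	Actual    uint
	Desired   uint
}

// equalVersions reports whether a and b hold equal elements in the same
// order. It relies on both slices being sorted the same way, and any nil
// element makes the result false.
func equalVersions(a, b []*Versions) bool {
	if len(a) != len(b) {
		return false
	}
	for i := range a {
		if a[i] == nil || b[i] == nil {
			return false
		}
		if *a[i] != *b[i] {
			return false
		}
	}
	return true
}

func main() {
	a := []*Versions{{IndexerID: "idx1", Actual: 1, Desired: 2}}
	b := []*Versions{{IndexerID: "idx1", Actual: 1, Desired: 2}}
	fmt.Println(equalVersions(a, b))
	fmt.Println(equalVersions(a, []*Versions{nil}))
}
```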

func GetError

func GetError(queue JobQueue, indexerID string) (string, error)

GetError returns the job error for a particular reindex job. A job only returns an error when it has been attempted at least the max number of attempts.

func GetErrors

func GetErrors(queue JobQueue) (map[string]string, error)

GetErrors returns all job errors, keyed by indexer ID. A job only returns an error when it has been attempted at least the max number of attempts.

func GetIndexerVersion

func GetIndexerVersion(queue JobQueue, indexerID string) (*indexer.Versions, error)

GetIndexerVersion gets the tracked indexer versions for an indexer ID. Returns nil if not found.

func GetStatuses

func GetStatuses(queue JobQueue) (map[string]Status, error)

GetStatuses returns all job statuses, keyed by indexer ID.
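
The helper functions above all return views keyed by indexer ID. One plausible way to build such views is to project the map returned by JobQueue.GetJobInfos, as sketched below. This is an assumption about the approach, not the package's code. The helpers `statusesOf` and `errorsOf` are hypothetical, and skipping empty errors is a choice made for this example.

```go
package main

import "fmt"

type Status string

// JobInfo mirrors the JobInfo type documented on this page.
type JobInfo struct {
	IndexerID string
	Status    Status
	Error     string
	Attempts  uint
}

// statusesOf maps each indexer ID to its job status.
// Hypothetical helper, not part of the package.
func statusesOf(infos map[string]JobInfo) map[string]Status {
	out := make(map[string]Status, len(infos))
	for id, info := range infos {
		out[id] = info.Status
	}
	return out
}

// errorsOf maps each indexer ID to its job error, skipping empty errors.
// Hypothetical helper, not part of the package.
func errorsOf(infos map[string]JobInfo) map[string]string {
	out := map[string]string{}
	for id, info := range infos {
		if info.Error != "" {
			out[id] = info.Error
		}
	}
	return out
}

func main() {
	infos := map[string]JobInfo{
		"idx1": {IndexerID: "idx1", Status: "complete"},
		"idx2": {IndexerID: "idx2", Status: "available", Error: "state reindex error", Attempts: 3},
	}
	fmt.Println(statusesOf(infos))
	fmt.Println(errorsOf(infos))
}
```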

Types

type Error

type Error string
const (
	// ErrDefault is the default error reported for reindex failure.
	ErrDefault Error = "state reindex error"
	// ErrReindexPerState indicates a Reindex error occurred for specific keys. Not included in any job errors.
	ErrReindexPerState Error = "reindex error: per-state errors"

	// ErrPrepare is included in job error when error source is indexer PrepareReindex call.
	ErrPrepare Error = "state reindex error: error from PrepareReindex"
	// ErrReindex is included in job error when error source is indexer Reindex call.
	ErrReindex Error = "state reindex error: error from Reindex"
	// ErrComplete is included in job error when error source is indexer CompleteReindex call.
	ErrComplete Error = "state reindex error: error from CompleteReindex"
)
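
Because Error is a string type and JobInfo reports job errors as plain strings, a caller can check which indexer call failed by looking for one of these constants inside the job error text. The sketch below assumes that Error implements the error interface through an `Error()` method, which this page does not show. The helper `sourceIs` is hypothetical, and only two of the constants are repeated to keep the example short.

```go
package main

import (
	"fmt"
	"strings"
)

type Error string

// Error makes the string type usable as an error value.
// Assumption: the real package defines an equivalent method.
func (e Error) Error() string { return string(e) }

const (
	ErrPrepare Error = "state reindex error: error from PrepareReindex"
	ErrReindex Error = "state reindex error: error from Reindex"
)

// sourceIs reports whether a job error string includes the given source error.
// Hypothetical helper, not part of the package.
func sourceIs(jobErr string, source Error) bool {
	return strings.Contains(jobErr, string(source))
}

func main() {
	jobErr := "indexer some_indexer: " + string(ErrReindex)
	fmt.Println(sourceIs(jobErr, ErrReindex))
	fmt.Println(sourceIs(jobErr, ErrPrepare))
}
```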

type Job

type Job struct {
	Idx  indexer.Indexer
	From indexer.Version
	To   indexer.Version
}

Job holds the information required to carry out a reindex job.

func (*Job) String

func (j *Job) String() string

type JobInfo

type JobInfo struct {
	IndexerID string
	Status    Status
	Error     string
	Attempts  uint
}

JobInfo provides information about a job's progress.

type JobQueue

type JobQueue interface {
	// Initialize the queue.
	// Call before other methods.
	Initialize() error

	// PopulateJobs populates the queue with the necessary jobs read from the indexer registry.
	// Returns true if job queue was updated with new jobs.
	PopulateJobs() (bool, error)

	// ClaimAvailableJob claims the returned indexer, to safely perform reindexing operations.
	// Returns nil if no job available.
	ClaimAvailableJob() (*Job, error)

	// CompleteJob indicates completion of the reindexing operation and returns ownership of the job to the queue.
	CompleteJob(job *Job, withErr error) error

	// GetJobInfos provides full information about job progress, keyed by indexer ID.
	// A job info only includes an error when its job has been attempted at least the max number of attempts.
	GetJobInfos() (map[string]JobInfo, error)

	// GetIndexerVersions returns version info for all tracked indexers.
	// Intended for use when automatic reindexing is disabled.
	GetIndexerVersions() ([]*indexer.Versions, error)

	// SetIndexerActualVersion sets the actual version of an indexer, post-reindex.
	// Intended for use when automatic reindexing is disabled.
	SetIndexerActualVersion(indexerID string, actual indexer.Version) error
}

JobQueue is a static, unordered job queue containing state indexers. State indexers are added to the queue for reindexing at initialization, and removed from the queue after the reindex is complete. ClaimAvailableJob should be polled periodically, as jobs may become available at any time.

func NewSQLJobQueue

func NewSQLJobQueue(maxAttempts uint, db *sql.DB, builder sqorc.StatementBuilder) JobQueue

NewSQLJobQueue returns a new SQL-backed implementation of an unordered job queue. The job queue is safe for use across goroutines and processes.

maxAttempts is the max number of times to attempt reindexing the indexer.

Populating the job queue is an exactly-once operation. We handle this in two parts:

  • Populate <= 1 time
      • The job queue jobs are written as part of a tx that checks the "stored" indexer versions, and these stored versions are updated to the "desired" versions during the same tx, ensuring no more than one controller instance will write to the job queue per code push.
      • There is a small race condition where multiple callers may both log that they successfully updated the job queue, but this is inconsequential since the condition (a) requires near-simultaneous calls and (b) actually results in the exact same jobs being written.
  • Populate >= 1 time
      • This work is best suited for a future where we have a message broker in the orc8r, so for now each controller warning-logs either success or failure to write to the job queue, and manual inspection of the logs would be required (thankfully, we also have tests to ensure this doesn't happen in the expected case).

Only provides Postgres support due to use of the non-standard "FOR UPDATE SKIP LOCKED" clause.

type Reindexer

type Reindexer interface {
	// Run to progressively complete required reindex jobs.
	// Periodically polls the reindex job queue for reindex jobs, attempts to
	// complete the job, and writes back any encountered errors.
	// Returns only upon context cancellation. The context can optionally be nil.
	Run(ctx context.Context)

	// RunUnsafe tries to complete all required reindex jobs.
	// If ID is non-empty, only tries to reindex specified indexer.
	// This function is intended for use only when automatic reindexing (via the
	// reindex queue) is disabled.
	// Arguments:
	//	- Loggable updates sent synchronously via sendUpdate
	// DO NOT use in parallel with Run().
	RunUnsafe(ctx context.Context, indexerID string, sendUpdate func(string)) error

	// GetIndexerVersions returns version info for all tracked indexers.
	GetIndexerVersions() ([]*indexer.Versions, error)
}

func NewReindexer

func NewReindexer(queue JobQueue, store Store) Reindexer

type Status

type Status string

Status of a reindex job.

func GetStatus

func GetStatus(queue JobQueue, indexerID string) (Status, error)

GetStatus returns the status of a particular reindex job.

type Store

type Store interface {
	// GetAllIDs returns all IDs known to the state service, keyed by network ID.
	GetAllIDs() (state_types.IDsByNetwork, error)
}

Store provides a cross-network DAO for local usage by the state service.

func NewStore

func NewStore(factory blobstore.BlobStorageFactory) Store
