maintenance

package
v0.0.17 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 23, 2024 License: MPL-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
// Default settings for the job cleaner: cancelled and completed jobs are
// retained for 24 hours, discarded jobs for 7 days, and the default cleaner
// interval is 30 seconds.
//
// NOTE(review): presumably each value is applied when the matching
// JobCleanerConfig field is left unset — confirm against NewJobCleaner.
const (
	CancelledJobRetentionPeriodDefault = 24 * time.Hour
	CompletedJobRetentionPeriodDefault = 24 * time.Hour
	DiscardedJobRetentionPeriodDefault = 7 * 24 * time.Hour
	JobCleanerIntervalDefault          = 30 * time.Second
)
View Source
const (
	// Maintainers will sleep a brief period of time between batches to give the
	// database some breathing room. NOTE(review): these look like the upper
	// (1 second) and lower (50 milliseconds) bounds of that sleep — confirm.
	BatchBackoffMax = 1 * time.Second
	BatchBackoffMin = 50 * time.Millisecond

	// Bulk maintenance tasks like job removal operate in batches so that even
	// in the event of an enormous backlog of work to do, transactions stay
	// relatively short and aren't at risk of cancellation. This number is the
	// batch size, or the number of rows that are handled at a time.
	//
	// The specific value is somewhat arbitrary: large enough to make good
	// progress, but not so large as to make the operation overstay its welcome.
	// For now it's not configurable because we can likely pick a number that's
	// suitable for almost everyone.
	BatchSizeDefault = 1_000

	// JitterMin and JitterMax span a range of zero to one second.
	// NOTE(review): presumably the bounds of a random jitter added to
	// maintenance timing; where it is applied is not visible here — confirm.
	JitterMin = 0 * time.Second
	JitterMax = 1 * time.Second
)
View Source
// Default settings for the reindexer: a 24 hour interval and a 15 second
// timeout.
//
// NOTE(review): presumably ReindexerTimeoutDefault backs
// ReindexerConfig.Timeout when it is left unset — confirm against NewReindexer.
const (
	ReindexerIntervalDefault = 24 * time.Hour
	ReindexerTimeoutDefault  = 15 * time.Second
)
View Source
// Default settings for the rescuer: a rescue-after threshold of one hour and
// a 30 second interval.
//
// NOTE(review): presumably these back RescuerConfig.RescueAfter and
// RescuerConfig.Interval when left unset — confirm against NewRescuer.
const (
	RescueAfterDefault     = time.Hour
	RescuerIntervalDefault = 30 * time.Second
)
View Source
// Default settings for the scheduler: a 5 second interval and a limit of
// 10,000.
//
// NOTE(review): presumably these back SchedulerConfig.Interval and
// SchedulerConfig.Limit when left unset — confirm against NewScheduler.
const (
	SchedulerIntervalDefault = 5 * time.Second
	SchedulerLimitDefault    = 10_000
)

Variables

View Source
var ErrNoJobToInsert = errors.New("a nil job was returned, nothing to insert")

ErrNoJobToInsert can be returned by a PeriodicJob's ConstructorFunc to signal that there's no job to insert at this time.

Functions

func GetService

func GetService[T Service](maintainer *QueueMaintainer) T

GetService is a convenience method for getting a service by name and casting it to the desired type. It should only be used in tests due to its use of reflection and potential for panics.

Types

type ClientRetryPolicy added in v0.0.6

// ClientRetryPolicy is a retry policy that maps a job row to a point in time.
type ClientRetryPolicy interface {
	// NextRetry takes a job row and returns a time.
	// NOTE(review): presumably the time at which the job should next be
	// retried — confirm against implementations.
	NextRetry(job *rivertype.JobRow) time.Time
}

type JobCleaner

// JobCleaner periodically removes finalized jobs that are cancelled,
// completed, or discarded. Each state's retention time can be configured
// individually.
type JobCleaner struct {
	baseservice.BaseService
	startstop.BaseStartStop

	// exported for test purposes
	Config      *JobCleanerConfig
	TestSignals JobCleanerTestSignals
	// contains filtered or unexported fields
}

JobCleaner periodically removes finalized jobs that are cancelled, completed, or discarded. Each state's retention time can be configured individually.

func NewJobCleaner

func NewJobCleaner(archetype *baseservice.Archetype, config *JobCleanerConfig, executor dbutil.Executor) *JobCleaner

func (*JobCleaner) Start

func (s *JobCleaner) Start(ctx context.Context) error

type JobCleanerConfig

// JobCleanerConfig is the configuration passed to NewJobCleaner.
type JobCleanerConfig struct {
	// CancelledJobRetentionPeriod is the amount of time to keep cancelled jobs
	// around before they're removed permanently.
	CancelledJobRetentionPeriod time.Duration

	// CompletedJobRetentionPeriod is the amount of time to keep completed jobs
	// around before they're removed permanently.
	CompletedJobRetentionPeriod time.Duration

	// DiscardedJobRetentionPeriod is the amount of time to keep discarded jobs
	// around before they're removed permanently.
	DiscardedJobRetentionPeriod time.Duration

	// Interval is the amount of time to wait between runs of the cleaner.
	Interval time.Duration
}

type JobCleanerTestSignals

// JobCleanerTestSignals holds the test-only signals exposed through
// JobCleaner.TestSignals.
type JobCleanerTestSignals struct {
	DeletedBatch rivercommon.TestSignal[struct{}] // notifies when runOnce finishes a pass
}

Test-only properties.

func (*JobCleanerTestSignals) Init

func (ts *JobCleanerTestSignals) Init()

type PeriodicJob

// PeriodicJob is a periodic job to be run. It's similar to the top-level
// river.PeriodicJobArgs, but needs a separate type because the enqueuer is in
// a subpackage.
//
// ConstructorFunc returns insert parameters for a job, or an error.
// NOTE(review): presumably it may return ErrNoJobToInsert to skip an
// insertion — confirm against the enqueuer.
//
// RunOnStart is a boolean flag. NOTE(review): presumably it requests an
// insertion when the enqueuer starts rather than waiting for the first
// scheduled time — confirm.
//
// ScheduleFunc maps one time to another. NOTE(review): presumably it returns
// the next run time given the current time — confirm.
type PeriodicJob struct {
	ConstructorFunc func() (*dbadapter.JobInsertParams, error)
	RunOnStart      bool
	ScheduleFunc    func(time.Time) time.Time
	// contains filtered or unexported fields
}

PeriodicJob is a periodic job to be run. It's similar to the top-level river.PeriodicJobArgs, but needs a separate type because the enqueuer is in a subpackage.

type PeriodicJobEnqueuer

// PeriodicJobEnqueuer inserts jobs configured to run periodically as unique
// jobs to make sure they'll run as frequently as their period dictates.
type PeriodicJobEnqueuer struct {
	baseservice.BaseService
	startstop.BaseStartStop

	// exported for test purposes
	Config      *PeriodicJobEnqueuerConfig
	TestSignals PeriodicJobEnqueuerTestSignals
	// contains filtered or unexported fields
}

PeriodicJobEnqueuer inserts jobs configured to run periodically as unique jobs to make sure they'll run as frequently as their period dictates.

func NewPeriodicJobEnqueuer

func NewPeriodicJobEnqueuer(archetype *baseservice.Archetype, config *PeriodicJobEnqueuerConfig, dbAdapter dbadapter.Adapter) *PeriodicJobEnqueuer

func (*PeriodicJobEnqueuer) Start

func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error

type PeriodicJobEnqueuerConfig

// PeriodicJobEnqueuerConfig is the configuration passed to
// NewPeriodicJobEnqueuer.
type PeriodicJobEnqueuerConfig struct {
	// PeriodicJobs are the periodic jobs with which to configure the enqueuer.
	PeriodicJobs []*PeriodicJob
}

type PeriodicJobEnqueuerTestSignals

// PeriodicJobEnqueuerTestSignals holds the test-only signals exposed through
// PeriodicJobEnqueuer.TestSignals.
type PeriodicJobEnqueuerTestSignals struct {
	EnteredLoop  rivercommon.TestSignal[struct{}] // notifies when the enqueuer finishes start up and enters its initial run loop
	InsertedJobs rivercommon.TestSignal[struct{}] // notifies when a batch of jobs is inserted
	SkippedJob   rivercommon.TestSignal[struct{}] // notifies when a job is skipped because of nil JobInsertParams
}

Test-only properties.

func (*PeriodicJobEnqueuerTestSignals) Init

func (ts *PeriodicJobEnqueuerTestSignals) Init()

type QueueMaintainer

// QueueMaintainer runs regular maintenance operations against job queues,
// like pruning completed jobs. It runs only on the client which has been
// elected leader at any given time.
//
// Its methods are not safe for concurrent usage.
type QueueMaintainer struct {
	baseservice.BaseService
	startstop.BaseStartStop
	// contains filtered or unexported fields
}

QueueMaintainer runs regular maintenance operations against job queues, like pruning completed jobs. It runs only on the client which has been elected leader at any given time.

Its methods are not safe for concurrent usage.

func NewQueueMaintainer

func NewQueueMaintainer(archetype *baseservice.Archetype, services []Service) *QueueMaintainer

func (*QueueMaintainer) Start

func (m *QueueMaintainer) Start(ctx context.Context) error

type Reindexer

// Reindexer periodically executes a REINDEX command on the important job
// indexes to rebuild them and fix bloat issues.
type Reindexer struct {
	baseservice.BaseService
	startstop.BaseStartStop

	// exported for test purposes
	Config      *ReindexerConfig
	TestSignals ReindexerTestSignals
	// contains filtered or unexported fields
}

Reindexer periodically executes a REINDEX command on the important job indexes to rebuild them and fix bloat issues.

func NewReindexer

func NewReindexer(archetype *baseservice.Archetype, config *ReindexerConfig, dbExecutor dbutil.Executor) *Reindexer

func (*Reindexer) Start

func (s *Reindexer) Start(ctx context.Context) error

type ReindexerConfig

// ReindexerConfig is the configuration passed to NewReindexer.
type ReindexerConfig struct {
	// IndexNames is a list of indexes to reindex on each run.
	IndexNames []string

	// ScheduleFunc returns the next scheduled run time for the reindexer given the
	// current time.
	ScheduleFunc func(time.Time) time.Time

	// Timeout is the amount of time to wait for a single reindex query to return.
	Timeout time.Duration
}

type ReindexerTestSignals

// ReindexerTestSignals holds the test-only signals exposed through
// Reindexer.TestSignals.
type ReindexerTestSignals struct {
	Reindexed rivercommon.TestSignal[struct{}] // notifies when a run finishes executing reindexes for all indexes
}

Test-only properties.

func (*ReindexerTestSignals) Init

func (ts *ReindexerTestSignals) Init()

type Rescuer added in v0.0.6

// Rescuer periodically rescues jobs that have been executing for too long and
// are considered to be "stuck".
type Rescuer struct {
	baseservice.BaseService
	startstop.BaseStartStop

	// exported for test purposes
	Config      *RescuerConfig
	TestSignals RescuerTestSignals
	// contains filtered or unexported fields
}

Rescuer periodically rescues jobs that have been executing for too long and are considered to be "stuck".

func NewRescuer added in v0.0.6

func NewRescuer(archetype *baseservice.Archetype, config *RescuerConfig, executor dbutil.Executor) *Rescuer

func (*Rescuer) Start added in v0.0.6

func (s *Rescuer) Start(ctx context.Context) error

type RescuerConfig added in v0.0.6

// RescuerConfig is the configuration passed to NewRescuer.
type RescuerConfig struct {
	// ClientRetryPolicy is the default retry policy to use for workers that don't
	// override NextRetry.
	ClientRetryPolicy ClientRetryPolicy

	// Interval is the amount of time to wait between runs of the rescuer.
	Interval time.Duration

	// RescueAfter is the amount of time for a job to be active before it is
	// considered stuck and should be rescued.
	RescueAfter time.Duration

	// WorkUnitFactoryFunc returns a work unit factory for the given kind.
	// NOTE(review): presumably kind is the job kind of the job being
	// rescued — confirm against the rescuer implementation.
	WorkUnitFactoryFunc func(kind string) workunit.WorkUnitFactory
}

type RescuerTestSignals added in v0.0.6

// RescuerTestSignals holds the test-only signals exposed through
// Rescuer.TestSignals.
type RescuerTestSignals struct {
	FetchedBatch rivercommon.TestSignal[struct{}] // notifies when runOnce has fetched a batch of jobs
	UpdatedBatch rivercommon.TestSignal[struct{}] // notifies when runOnce has updated rescued jobs from a batch
}

Test-only properties.

func (*RescuerTestSignals) Init added in v0.0.6

func (ts *RescuerTestSignals) Init()

type Scheduler

// Scheduler periodically moves jobs that are in the `scheduled` or `retryable`
// state and ready to run over to `available` so that they're eligible to be
// worked.
type Scheduler struct {
	baseservice.BaseService
	startstop.BaseStartStop

	// exported for test purposes
	TestSignals SchedulerTestSignals
	// contains filtered or unexported fields
}

Scheduler periodically moves jobs that are in the `scheduled` or `retryable` state and ready to run over to `available` so that they're eligible to be worked.

func NewScheduler

func NewScheduler(archetype *baseservice.Archetype, config *SchedulerConfig, executor dbutil.Executor) *Scheduler

func (*Scheduler) Start

func (s *Scheduler) Start(ctx context.Context) error

type SchedulerConfig

// SchedulerConfig is the configuration passed to NewScheduler.
type SchedulerConfig struct {
	// Interval is the amount of time between periodic checks for jobs to
	// be moved from "scheduled" or "retryable" to "available".
	Interval time.Duration

	// Limit is the maximum number of jobs to transition at once from
	// "scheduled" or "retryable" to "available" during periodic scheduling
	// checks.
	Limit int
}

type SchedulerTestSignals

// SchedulerTestSignals holds the test-only signals exposed through
// Scheduler.TestSignals.
type SchedulerTestSignals struct {
	ScheduledBatch rivercommon.TestSignal[struct{}] // notifies when runOnce finishes a pass
}

Test-only properties.

func (*SchedulerTestSignals) Init

func (ts *SchedulerTestSignals) Init()

type Service

// Service is the interface for a maintenance service that can be started and
// stopped; NewQueueMaintainer accepts a slice of them.
type Service interface {
	// Start starts a service. Services are responsible for backgrounding
	// themselves, so this function should be invoked synchronously. Services
	// may return an error if they have trouble starting up, so the caller
	// should wait and respond to the error if necessary.
	Start(ctx context.Context) error

	// Stop stops a service. Services are responsible for making sure their stop
	// is complete before returning so a caller can wait on this invocation
	// synchronously and be guaranteed the service is fully stopped. Services
	// are expected to be able to tolerate (1) being stopped without having been
	// started, and (2) being double-stopped.
	Stop()
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL