maintenance

package
v0.8.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 26, 2024 License: MPL-2.0 Imports: 21 Imported by: 0

Documentation

Index

Constants

View Source
const (
	CancelledJobRetentionPeriodDefault = 24 * time.Hour
	CompletedJobRetentionPeriodDefault = 24 * time.Hour
	DiscardedJobRetentionPeriodDefault = 7 * 24 * time.Hour
	JobCleanerIntervalDefault          = 30 * time.Second
)
View Source
const (
	JobRescuerRescueAfterDefault = time.Hour
	JobRescuerIntervalDefault    = 30 * time.Second
)
View Source
const (
	JobSchedulerIntervalDefault = 5 * time.Second
	JobSchedulerLimitDefault    = 10_000
)
View Source
const (
	// Maintainers will sleep a brief period of time between batches to give the
	// database some breathing room.
	BatchBackoffMax = 1 * time.Second
	BatchBackoffMin = 50 * time.Millisecond

	// Bulk maintenance tasks like job removal operate in batches so that even
	// in the event of an enormous backlog of work to do, transactions stay
	// relatively short and aren't at risk of cancellation. This number is the
	// batch size, or the number of rows that are handled at a time.
	//
	// The specific value is somewhat arbitrary as large enough to make good
	// progress, but not so large as to make the operation overstay its welcome.
	// For now it's not configurable because we can likely pick a number that's
	// suitable for almost everyone.
	BatchSizeDefault = 1_000
)
View Source
const (
	ReindexerIntervalDefault = 24 * time.Hour
	ReindexerTimeoutDefault  = 15 * time.Second
)
View Source
const (
	QueueRetentionPeriodDefault = 24 * time.Hour
)

Variables

View Source
var ErrNoJobToInsert = errors.New("a nil job was returned, nothing to insert")

ErrNoJobToInsert can be returned by a PeriodicJob's JobToInsertFunc to signal that there's no job to insert at this time.

Functions

func GetService

func GetService[T startstop.Service](maintainer *QueueMaintainer) T

GetService is a convenience method for getting a service by name and casting it to the desired type. It should only be used in tests due to its use of reflection and potential for panics.

Types

type ClientRetryPolicy added in v0.0.6

type ClientRetryPolicy interface {
	NextRetry(job *rivertype.JobRow) time.Time
}

type JobCleaner

type JobCleaner struct {
	startstop.BaseStartStop

	// exported for test purposes
	Config      *JobCleanerConfig
	TestSignals JobCleanerTestSignals
	// contains filtered or unexported fields
}

JobCleaner periodically removes finalized jobs that are cancelled, completed, or discarded. Each state's retention time can be configured individually.

func NewJobCleaner

func NewJobCleaner(archetype *baseservice.Archetype, config *JobCleanerConfig, exec riverdriver.Executor) *JobCleaner

func (*JobCleaner) StaggerStart added in v0.2.0

func (s *JobCleaner) StaggerStart(ctx context.Context)

StaggerStart is called when queue maintainer services start. It jitters by sleeping for a short random period so services don't all perform their first run at exactly the same time.

func (*JobCleaner) StaggerStartupDisable added in v0.2.0

func (s *JobCleaner) StaggerStartupDisable(disabled bool)

StaggerStartupDisable sets whether the short staggered sleep on start up is disabled. This is useful in tests where the extra sleep involved in a staggered start up is not helpful for test run time.

func (*JobCleaner) Start

func (s *JobCleaner) Start(ctx context.Context) error

type JobCleanerConfig

type JobCleanerConfig struct {
	// CancelledJobRetentionPeriod is the amount of time to keep cancelled jobs
	// around before they're removed permanently.
	CancelledJobRetentionPeriod time.Duration

	// CompletedJobRetentionPeriod is the amount of time to keep completed jobs
	// around before they're removed permanently.
	CompletedJobRetentionPeriod time.Duration

	// DiscardedJobRetentionPeriod is the amount of time to keep cancelled jobs
	// around before they're removed permanently.
	DiscardedJobRetentionPeriod time.Duration

	// Interval is the amount of time to wait between runs of the cleaner.
	Interval time.Duration
}

type JobCleanerTestSignals

type JobCleanerTestSignals struct {
	DeletedBatch rivercommon.TestSignal[struct{}] // notifies when runOnce finishes a pass
}

Test-only properties.

func (*JobCleanerTestSignals) Init

func (ts *JobCleanerTestSignals) Init()

type JobRescuer added in v0.0.23

type JobRescuer struct {
	startstop.BaseStartStop

	// exported for test purposes
	Config      *JobRescuerConfig
	TestSignals JobRescuerTestSignals
	// contains filtered or unexported fields
}

JobRescuer periodically rescues jobs that have been executing for too long and are considered to be "stuck".

func NewRescuer added in v0.0.6

func NewRescuer(archetype *baseservice.Archetype, config *JobRescuerConfig, exec riverdriver.Executor) *JobRescuer

func (*JobRescuer) StaggerStart added in v0.2.0

func (s *JobRescuer) StaggerStart(ctx context.Context)

StaggerStart is called when queue maintainer services start. It jitters by sleeping for a short random period so services don't all perform their first run at exactly the same time.

func (*JobRescuer) StaggerStartupDisable added in v0.2.0

func (s *JobRescuer) StaggerStartupDisable(disabled bool)

StaggerStartupDisable sets whether the short staggered sleep on start up is disabled. This is useful in tests where the extra sleep involved in a staggered start up is not helpful for test run time.

func (*JobRescuer) Start added in v0.0.23

func (s *JobRescuer) Start(ctx context.Context) error

type JobRescuerConfig added in v0.0.23

type JobRescuerConfig struct {
	// ClientRetryPolicy is the default retry policy to use for workers that don't
	// overide NextRetry.
	ClientRetryPolicy ClientRetryPolicy

	// Interval is the amount of time to wait between runs of the rescuer.
	Interval time.Duration

	// RescueAfter is the amount of time for a job to be active before it is
	// considered stuck and should be rescued.
	RescueAfter time.Duration

	WorkUnitFactoryFunc func(kind string) workunit.WorkUnitFactory
}

type JobRescuerTestSignals added in v0.0.23

type JobRescuerTestSignals struct {
	FetchedBatch rivercommon.TestSignal[struct{}] // notifies when runOnce has fetched a batch of jobs
	UpdatedBatch rivercommon.TestSignal[struct{}] // notifies when runOnce has updated rescued jobs from a batch
}

Test-only properties.

func (*JobRescuerTestSignals) Init added in v0.0.23

func (ts *JobRescuerTestSignals) Init()

type JobScheduler added in v0.0.23

type JobScheduler struct {
	startstop.BaseStartStop

	// exported for test purposes
	TestSignals JobSchedulerTestSignals
	// contains filtered or unexported fields
}

JobScheduler periodically moves jobs in `scheduled` or `retryable` state and which are ready to run over to `available` so that they're eligible to be worked.

func NewJobScheduler added in v0.5.0

func NewJobScheduler(archetype *baseservice.Archetype, config *JobSchedulerConfig, exec riverdriver.Executor) *JobScheduler

func (*JobScheduler) StaggerStart added in v0.2.0

func (s *JobScheduler) StaggerStart(ctx context.Context)

StaggerStart is called when queue maintainer services start. It jitters by sleeping for a short random period so services don't all perform their first run at exactly the same time.

func (*JobScheduler) StaggerStartupDisable added in v0.2.0

func (s *JobScheduler) StaggerStartupDisable(disabled bool)

StaggerStartupDisable sets whether the short staggered sleep on start up is disabled. This is useful in tests where the extra sleep involved in a staggered start up is not helpful for test run time.

func (*JobScheduler) Start added in v0.0.23

func (s *JobScheduler) Start(ctx context.Context) error

type JobSchedulerConfig added in v0.0.23

type JobSchedulerConfig struct {
	// Interval is the amount of time between periodic checks for jobs to
	// be moved from "scheduled" to "available".
	Interval time.Duration

	// Limit is the maximum number of jobs to transition at once from
	// "scheduled" to "available" during periodic scheduling checks.
	Limit int

	// NotifyInsert is a function to call to emit notifications for queues
	// where jobs were scheduled.
	NotifyInsert NotifyInsertFunc
}

type JobSchedulerTestSignals added in v0.0.23

type JobSchedulerTestSignals struct {
	NotifiedQueues rivercommon.TestSignal[[]string] // notifies when queues are sent an insert notification
	ScheduledBatch rivercommon.TestSignal[struct{}] // notifies when runOnce finishes a pass
}

Test-only properties.

func (*JobSchedulerTestSignals) Init added in v0.0.23

func (ts *JobSchedulerTestSignals) Init()

type NotifyInsertFunc added in v0.5.0

type NotifyInsertFunc func(ctx context.Context, tx riverdriver.ExecutorTx, queues []string) error

NotifyInsert is a function to call to emit notifications for queues where jobs were scheduled.

type PeriodicJob

type PeriodicJob struct {
	ConstructorFunc func() (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error)
	RunOnStart      bool
	ScheduleFunc    func(time.Time) time.Time
	// contains filtered or unexported fields
}

PeriodicJob is a periodic job to be run. It's similar to the top-level river.PeriodicJobArgs, but needs a separate type because the enqueuer is in a subpackage.

type PeriodicJobEnqueuer

type PeriodicJobEnqueuer struct {
	startstop.BaseStartStop

	// exported for test purposes
	Config      *PeriodicJobEnqueuerConfig
	TestSignals PeriodicJobEnqueuerTestSignals
	// contains filtered or unexported fields
}

PeriodicJobEnqueuer inserts jobs configured to run periodically as unique jobs to make sure they'll run as frequently as their period dictates.

func (*PeriodicJobEnqueuer) Add added in v0.2.0

Add adds a new periodic job to the enqueuer. The service's run loop is woken immediately so that the job is scheduled appropriately, and inserted if its RunOnStart flag is set to true.

func (*PeriodicJobEnqueuer) AddMany added in v0.2.0

func (s *PeriodicJobEnqueuer) AddMany(periodicJobs []*PeriodicJob) []rivertype.PeriodicJobHandle

AddMany adds many new periodic job to the enqueuer. The service's run loop is woken immediately so that the job is scheduled appropriately, and inserted if any RunOnStart flags are set to true.

func (*PeriodicJobEnqueuer) Clear added in v0.2.0

func (s *PeriodicJobEnqueuer) Clear()

Clear clears all periodic jobs from the enqueuer.

func (*PeriodicJobEnqueuer) Remove added in v0.2.0

func (s *PeriodicJobEnqueuer) Remove(periodicJobHandle rivertype.PeriodicJobHandle)

Remove removes a periodic job from the enqueuer. Its current target run time and all future runs are cancelled.

func (*PeriodicJobEnqueuer) RemoveMany added in v0.2.0

func (s *PeriodicJobEnqueuer) RemoveMany(periodicJobHandles []rivertype.PeriodicJobHandle)

RemoveMany removes many periodic jobs from the enqueuer. Their current target run time and all future runs are cancelled.

func (*PeriodicJobEnqueuer) StaggerStart added in v0.2.0

func (s *PeriodicJobEnqueuer) StaggerStart(ctx context.Context)

StaggerStart is called when queue maintainer services start. It jitters by sleeping for a short random period so services don't all perform their first run at exactly the same time.

func (*PeriodicJobEnqueuer) StaggerStartupDisable added in v0.2.0

func (s *PeriodicJobEnqueuer) StaggerStartupDisable(disabled bool)

StaggerStartupDisable sets whether the short staggered sleep on start up is disabled. This is useful in tests where the extra sleep involved in a staggered start up is not helpful for test run time.

func (*PeriodicJobEnqueuer) Start

func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error

type PeriodicJobEnqueuerConfig

type PeriodicJobEnqueuerConfig struct {
	AdvisoryLockPrefix int32

	// NotifyInsert is a function to call to emit notifications for queues
	// where jobs were scheduled.
	NotifyInsert NotifyInsertFunc

	// PeriodicJobs are the periodic jobs with which to configure the enqueuer.
	PeriodicJobs []*PeriodicJob
}

type PeriodicJobEnqueuerTestSignals

type PeriodicJobEnqueuerTestSignals struct {
	EnteredLoop    rivercommon.TestSignal[struct{}] // notifies when the enqueuer finishes start up and enters its initial run loop
	InsertedJobs   rivercommon.TestSignal[struct{}] // notifies when a batch of jobs is inserted
	NotifiedQueues rivercommon.TestSignal[[]string] // notifies when queues are sent an insert notification
	SkippedJob     rivercommon.TestSignal[struct{}] // notifies when a job is skipped because of nil JobInsertParams
}

Test-only properties.

func (*PeriodicJobEnqueuerTestSignals) Init

func (ts *PeriodicJobEnqueuerTestSignals) Init()

type QueueCleaner added in v0.5.0

type QueueCleaner struct {
	startstop.BaseStartStop

	// exported for test purposes
	Config      *QueueCleanerConfig
	TestSignals QueueCleanerTestSignals
	// contains filtered or unexported fields
}

QueueCleaner periodically removes queues from the river_queue table that have not been updated in a while, indicating that they are no longer active.

func NewQueueCleaner added in v0.5.0

func NewQueueCleaner(archetype *baseservice.Archetype, config *QueueCleanerConfig, exec riverdriver.Executor) *QueueCleaner

func (*QueueCleaner) StaggerStart added in v0.5.0

func (s *QueueCleaner) StaggerStart(ctx context.Context)

StaggerStart is called when queue maintainer services start. It jitters by sleeping for a short random period so services don't all perform their first run at exactly the same time.

func (*QueueCleaner) StaggerStartupDisable added in v0.5.0

func (s *QueueCleaner) StaggerStartupDisable(disabled bool)

StaggerStartupDisable sets whether the short staggered sleep on start up is disabled. This is useful in tests where the extra sleep involved in a staggered start up is not helpful for test run time.

func (*QueueCleaner) Start added in v0.5.0

func (s *QueueCleaner) Start(ctx context.Context) error

type QueueCleanerConfig added in v0.5.0

type QueueCleanerConfig struct {
	// Interval is the amount of time to wait between runs of the cleaner.
	Interval time.Duration
	// RetentionPeriod is the amount of time to keep queues around before they're
	// removed.
	RetentionPeriod time.Duration
}

type QueueCleanerTestSignals added in v0.5.0

type QueueCleanerTestSignals struct {
	DeletedBatch rivercommon.TestSignal[struct{}] // notifies when runOnce finishes a pass
}

Test-only properties.

func (*QueueCleanerTestSignals) Init added in v0.5.0

func (ts *QueueCleanerTestSignals) Init()

type QueueMaintainer

type QueueMaintainer struct {
	baseservice.BaseService
	startstop.BaseStartStop
	// contains filtered or unexported fields
}

QueueMaintainer runs regular maintenance operations against job queues, like pruning completed jobs. It runs only on the client which has been elected leader at any given time.

Its methods are not safe for concurrent usage.

func NewQueueMaintainer

func NewQueueMaintainer(archetype *baseservice.Archetype, services []startstop.Service) *QueueMaintainer

func (*QueueMaintainer) StaggerStartupDisable added in v0.2.0

func (m *QueueMaintainer) StaggerStartupDisable(disabled bool)

StaggerStartupDisable sets whether the short staggered sleep on start up is disabled. This is useful in tests where the extra sleep involved in a staggered start up is not helpful for test run time.

func (*QueueMaintainer) Start

func (m *QueueMaintainer) Start(ctx context.Context) error

type Reindexer

type Reindexer struct {
	startstop.BaseStartStop

	// exported for test purposes
	Config      *ReindexerConfig
	TestSignals ReindexerTestSignals
	// contains filtered or unexported fields
}

Reindexer periodically executes a REINDEX command on the important job indexes to rebuild them and fix bloat issues.

func NewReindexer

func NewReindexer(archetype *baseservice.Archetype, config *ReindexerConfig, exec riverdriver.Executor) *Reindexer

func (*Reindexer) StaggerStart added in v0.2.0

func (s *Reindexer) StaggerStart(ctx context.Context)

StaggerStart is called when queue maintainer services start. It jitters by sleeping for a short random period so services don't all perform their first run at exactly the same time.

func (*Reindexer) StaggerStartupDisable added in v0.2.0

func (s *Reindexer) StaggerStartupDisable(disabled bool)

StaggerStartupDisable sets whether the short staggered sleep on start up is disabled. This is useful in tests where the extra sleep involved in a staggered start up is not helpful for test run time.

func (*Reindexer) Start

func (s *Reindexer) Start(ctx context.Context) error

type ReindexerConfig

type ReindexerConfig struct {
	// IndexNames is a list of indexes to reindex on each run.
	IndexNames []string

	// ScheduleFunc returns the next scheduled run time for the reindexer given the
	// current time.
	ScheduleFunc func(time.Time) time.Time

	// Timeout is the amount of time to wait for a single reindex query to return.
	Timeout time.Duration
}

type ReindexerTestSignals

type ReindexerTestSignals struct {
	Reindexed rivercommon.TestSignal[struct{}] // notifies when a run finishes executing reindexes for all indexes
}

Test-only properties.

func (*ReindexerTestSignals) Init

func (ts *ReindexerTestSignals) Init()

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL