Documentation ¶
Index ¶
- Constants
- Variables
- func GetService[T Service](maintainer *QueueMaintainer) T
- type ClientRetryPolicy
- type JobCleaner
- type JobCleanerConfig
- type JobCleanerTestSignals
- type PeriodicJob
- type PeriodicJobEnqueuer
- type PeriodicJobEnqueuerConfig
- type PeriodicJobEnqueuerTestSignals
- type QueueMaintainer
- type Reindexer
- type ReindexerConfig
- type ReindexerTestSignals
- type Rescuer
- type RescuerConfig
- type RescuerTestSignals
- type Scheduler
- type SchedulerConfig
- type SchedulerTestSignals
- type Service
Constants ¶
const (
	CancelledJobRetentionPeriodDefault = 24 * time.Hour
	CompletedJobRetentionPeriodDefault = 24 * time.Hour
	DiscardedJobRetentionPeriodDefault = 7 * 24 * time.Hour
	JobCleanerIntervalDefault          = 30 * time.Second
)
const (
	// Maintainers will sleep a brief period of time between batches to give the
	// database some breathing room.
	BatchBackoffMax = 1 * time.Second
	BatchBackoffMin = 50 * time.Millisecond

	// Bulk maintenance tasks like job removal operate in batches so that even
	// in the event of an enormous backlog of work to do, transactions stay
	// relatively short and aren't at risk of cancellation. This number is the
	// batch size, or the number of rows that are handled at a time.
	//
	// The specific value is somewhat arbitrary as large enough to make good
	// progress, but not so large as to make the operation overstay its welcome.
	// For now it's not configurable because we can likely pick a number that's
	// suitable for almost everyone.
	BatchSizeDefault = 1_000

	JitterMin = 0 * time.Second
	JitterMax = 1 * time.Second
)
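The batching described in the comments above can be sketched as a loop that handles a bounded number of rows per pass and pauses between passes. This is only an illustration under assumptions: `runBatches`, `randomBackoff`, and `noSleep` are hypothetical names, and picking a uniformly random pause between the two backoff bounds is a guess at how the constants are used, not something this page confirms.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// Local copies of the documented constants, so the sketch is self-contained.
const (
	batchBackoffMax  = 1 * time.Second
	batchBackoffMin  = 50 * time.Millisecond
	batchSizeDefault = 1_000
)

// randomBackoff returns a duration in [lo, hi). Hypothetical helper; the
// package's real backoff strategy is an assumption here.
func randomBackoff(lo, hi time.Duration) time.Duration {
	if hi <= lo {
		return lo
	}
	return lo + time.Duration(rand.Int63n(int64(hi-lo)))
}

// runBatches works through a backlog in chunks of at most batchSize rows,
// sleeping between chunks. It returns the number of batches processed.
func runBatches(backlog, batchSize int, sleep func(time.Duration)) int {
	batches := 0
	for backlog > 0 {
		n := batchSize
		if backlog < n {
			n = backlog
		}
		backlog -= n // a real maintainer would delete or update n rows here
		batches++
		if backlog > 0 {
			sleep(randomBackoff(batchBackoffMin, batchBackoffMax))
		}
	}
	return batches
}

// noSleep stands in for time.Sleep so the example finishes immediately.
func noSleep(time.Duration) {}

func main() {
	fmt.Println(runBatches(2_500, batchSizeDefault, noSleep)) // prints 3
}
```

Passing `time.Sleep` instead of `noSleep` would give the database the pause between batches that the comment describes.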
const (
	ReindexerIntervalDefault = 24 * time.Hour
	ReindexerTimeoutDefault  = 15 * time.Second
)
const (
	RescueAfterDefault     = time.Hour
	RescuerIntervalDefault = 30 * time.Second
)
const (
	SchedulerIntervalDefault = 5 * time.Second
	SchedulerLimitDefault    = 10_000
)
Variables ¶
var ErrNoJobToInsert = errors.New("a nil job was returned, nothing to insert")
ErrNoJobToInsert can be returned by a PeriodicJob's JobToInsertFunc to signal that there's no job to insert at this time.
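A sentinel error like this is typically checked with `errors.Is` so the caller can skip the job rather than fail. The sketch below is self-contained and uses stand-in names (`errNoJobToInsert`, `insertParams`, `enqueueAll`, `okJob`, `noJob`) that are assumptions for illustration; it does not reproduce the enqueuer's actual logic.

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-in for the package's ErrNoJobToInsert sentinel.
var errNoJobToInsert = errors.New("a nil job was returned, nothing to insert")

// insertParams is a hypothetical placeholder for job insert parameters.
type insertParams struct{ Kind string }

// enqueueAll calls each constructor, skipping those that signal there is
// nothing to insert and aborting on any other error.
func enqueueAll(constructors []func() (*insertParams, error)) ([]*insertParams, int, error) {
	var inserted []*insertParams
	skipped := 0
	for _, construct := range constructors {
		params, err := construct()
		if errors.Is(err, errNoJobToInsert) {
			skipped++
			continue
		}
		if err != nil {
			return nil, skipped, err
		}
		inserted = append(inserted, params)
	}
	return inserted, skipped, nil
}

// okJob always produces a job; noJob always declines to produce one.
func okJob() (*insertParams, error) { return &insertParams{Kind: "report"}, nil }
func noJob() (*insertParams, error) { return nil, errNoJobToInsert }

func main() {
	inserted, skipped, err := enqueueAll([]func() (*insertParams, error){okJob, noJob, okJob})
	fmt.Println(len(inserted), skipped, err) // prints 2 1 <nil>
}
```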
Functions ¶
func GetService ¶
func GetService[T Service](maintainer *QueueMaintainer) T
GetService is a convenience function for getting a service by name and casting it to the desired type. It should only be used in tests due to its use of reflection and potential for panics.
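To illustrate the shape of a generic accessor like this, here is a simplified sketch. It deliberately swaps the name-based reflection lookup described above for a plain type assertion, and every type in it (`service`, `queueMaintainer`, `cleaner`, `scheduler`) is a hypothetical stand-in rather than the package's own.

```go
package main

import (
	"context"
	"fmt"
)

// service mirrors the shape of the documented Service interface.
type service interface {
	Start(ctx context.Context) error
	Stop()
}

// queueMaintainer is a hypothetical stand-in holding a list of services.
type queueMaintainer struct{ services []service }

type cleaner struct{}

func (*cleaner) Start(context.Context) error { return nil }
func (*cleaner) Stop()                       {}

type scheduler struct{ limit int }

func (*scheduler) Start(context.Context) error { return nil }
func (*scheduler) Stop()                       {}

// getService returns the first service of type T and panics if none is
// registered, which is why a helper like this suits tests only.
func getService[T service](m *queueMaintainer) T {
	for _, svc := range m.services {
		if typed, ok := svc.(T); ok {
			return typed
		}
	}
	var zero T
	panic(fmt.Sprintf("service of type %T not found", zero))
}

func main() {
	m := &queueMaintainer{services: []service{&cleaner{}, &scheduler{limit: 10_000}}}
	fmt.Println(getService[*scheduler](m).limit) // prints 10000
}
```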
Types ¶
type ClientRetryPolicy ¶ added in v0.0.6
type JobCleaner ¶
type JobCleaner struct {
	baseservice.BaseService
	startstop.BaseStartStop

	// exported for test purposes
	Config      *JobCleanerConfig
	TestSignals JobCleanerTestSignals
	// contains filtered or unexported fields
}
JobCleaner periodically removes finalized jobs that are cancelled, completed, or discarded. Each state's retention time can be configured individually.
func NewJobCleaner ¶
func NewJobCleaner(archetype *baseservice.Archetype, config *JobCleanerConfig, executor dbutil.Executor) *JobCleaner
type JobCleanerConfig ¶
type JobCleanerConfig struct {
	// CancelledJobRetentionPeriod is the amount of time to keep cancelled jobs
	// around before they're removed permanently.
	CancelledJobRetentionPeriod time.Duration

	// CompletedJobRetentionPeriod is the amount of time to keep completed jobs
	// around before they're removed permanently.
	CompletedJobRetentionPeriod time.Duration

	// DiscardedJobRetentionPeriod is the amount of time to keep discarded jobs
	// around before they're removed permanently.
	DiscardedJobRetentionPeriod time.Duration

	// Interval is the amount of time to wait between runs of the cleaner.
	Interval time.Duration
}
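The `...Default` constants listed earlier suggest that zero-valued config fields fall back to defaults. The sketch below shows one way such a fallback could work. It is an assumption, not the constructor's confirmed behavior, and `valOrDefault`, `cleanerConfig`, and `withDefaults` are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

// Local copies of the documented default constants.
const (
	cancelledJobRetentionPeriodDefault = 24 * time.Hour
	completedJobRetentionPeriodDefault = 24 * time.Hour
	discardedJobRetentionPeriodDefault = 7 * 24 * time.Hour
	jobCleanerIntervalDefault          = 30 * time.Second
)

// cleanerConfig is a hypothetical stand-in for JobCleanerConfig.
type cleanerConfig struct {
	CancelledJobRetentionPeriod time.Duration
	CompletedJobRetentionPeriod time.Duration
	DiscardedJobRetentionPeriod time.Duration
	Interval                    time.Duration
}

// valOrDefault returns val unless it is the zero value, in which case it
// returns def.
func valOrDefault[T comparable](val, def T) T {
	var zero T
	if val == zero {
		return def
	}
	return val
}

// withDefaults returns a copy of the config with unset fields filled in.
func (c cleanerConfig) withDefaults() cleanerConfig {
	return cleanerConfig{
		CancelledJobRetentionPeriod: valOrDefault(c.CancelledJobRetentionPeriod, cancelledJobRetentionPeriodDefault),
		CompletedJobRetentionPeriod: valOrDefault(c.CompletedJobRetentionPeriod, completedJobRetentionPeriodDefault),
		DiscardedJobRetentionPeriod: valOrDefault(c.DiscardedJobRetentionPeriod, discardedJobRetentionPeriodDefault),
		Interval:                    valOrDefault(c.Interval, jobCleanerIntervalDefault),
	}
}

func main() {
	cfg := cleanerConfig{CompletedJobRetentionPeriod: 2 * time.Hour}.withDefaults()
	fmt.Println(cfg.CompletedJobRetentionPeriod, cfg.DiscardedJobRetentionPeriod, cfg.Interval)
	// prints 2h0m0s 168h0m0s 30s
}
```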
type JobCleanerTestSignals ¶
type JobCleanerTestSignals struct {
	DeletedBatch rivercommon.TestSignal[struct{}] // notifies when runOnce finishes a pass
}
Test-only properties.
func (*JobCleanerTestSignals) Init ¶
func (ts *JobCleanerTestSignals) Init()
type PeriodicJob ¶
type PeriodicJob struct {
	ConstructorFunc func() (*dbadapter.JobInsertParams, error)
	RunOnStart      bool
	ScheduleFunc    func(time.Time) time.Time
	// contains filtered or unexported fields
}
PeriodicJob is a periodic job to be run. It's similar to the top-level river.PeriodicJobArgs, but needs a separate type because the enqueuer is in a subpackage.
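A `ScheduleFunc` has the signature `func(time.Time) time.Time`: given a time, it returns the next run time. As a minimal sketch of a function with that shape, the example below builds a fixed-interval schedule and applies it repeatedly. The names `fixedInterval`, `upcoming`, `everyHour`, and `base` are assumptions for illustration only.

```go
package main

import (
	"fmt"
	"time"
)

// fixedInterval returns a schedule function that always runs again one
// interval after the time it is given.
func fixedInterval(interval time.Duration) func(time.Time) time.Time {
	return func(t time.Time) time.Time {
		return t.Add(interval)
	}
}

// upcoming applies a schedule function n times, starting from from, and
// collects the resulting run times.
func upcoming(schedule func(time.Time) time.Time, from time.Time, n int) []time.Time {
	runs := make([]time.Time, 0, n)
	next := from
	for i := 0; i < n; i++ {
		next = schedule(next)
		runs = append(runs, next)
	}
	return runs
}

// Fixed inputs so the example is deterministic.
var (
	everyHour = fixedInterval(time.Hour)
	base      = time.Date(2024, time.January, 1, 0, 0, 0, 0, time.UTC)
)

func main() {
	for _, run := range upcoming(everyHour, base, 3) {
		fmt.Println(run.Format(time.RFC3339))
	}
	// prints:
	// 2024-01-01T01:00:00Z
	// 2024-01-01T02:00:00Z
	// 2024-01-01T03:00:00Z
}
```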
type PeriodicJobEnqueuer ¶
type PeriodicJobEnqueuer struct {
	baseservice.BaseService
	startstop.BaseStartStop

	// exported for test purposes
	Config      *PeriodicJobEnqueuerConfig
	TestSignals PeriodicJobEnqueuerTestSignals
	// contains filtered or unexported fields
}
PeriodicJobEnqueuer inserts jobs configured to run periodically as unique jobs to make sure they'll run as frequently as their period dictates.
func NewPeriodicJobEnqueuer ¶
func NewPeriodicJobEnqueuer(archetype *baseservice.Archetype, config *PeriodicJobEnqueuerConfig, dbAdapter dbadapter.Adapter) *PeriodicJobEnqueuer
type PeriodicJobEnqueuerConfig ¶
type PeriodicJobEnqueuerConfig struct {
	// PeriodicJobs are the periodic jobs with which to configure the enqueuer.
	PeriodicJobs []*PeriodicJob
}
type PeriodicJobEnqueuerTestSignals ¶
type PeriodicJobEnqueuerTestSignals struct {
	EnteredLoop  rivercommon.TestSignal[struct{}] // notifies when the enqueuer finishes start up and enters its initial run loop
	InsertedJobs rivercommon.TestSignal[struct{}] // notifies when a batch of jobs is inserted
	SkippedJob   rivercommon.TestSignal[struct{}] // notifies when a job is skipped because of nil JobInsertParams
}
Test-only properties.
func (*PeriodicJobEnqueuerTestSignals) Init ¶
func (ts *PeriodicJobEnqueuerTestSignals) Init()
type QueueMaintainer ¶
type QueueMaintainer struct {
	baseservice.BaseService
	startstop.BaseStartStop
	// contains filtered or unexported fields
}
QueueMaintainer runs regular maintenance operations against job queues, like pruning completed jobs. It runs only on the client which has been elected leader at any given time.
Its methods are not safe for concurrent usage.
func NewQueueMaintainer ¶
func NewQueueMaintainer(archetype *baseservice.Archetype, services []Service) *QueueMaintainer
type Reindexer ¶
type Reindexer struct {
	baseservice.BaseService
	startstop.BaseStartStop

	// exported for test purposes
	Config      *ReindexerConfig
	TestSignals ReindexerTestSignals
	// contains filtered or unexported fields
}
Reindexer periodically executes a REINDEX command on the important job indexes to rebuild them and fix bloat issues.
func NewReindexer ¶
func NewReindexer(archetype *baseservice.Archetype, config *ReindexerConfig, dbExecutor dbutil.Executor) *Reindexer
type ReindexerConfig ¶
type ReindexerConfig struct {
	// IndexNames is a list of indexes to reindex on each run.
	IndexNames []string

	// ScheduleFunc returns the next scheduled run time for the reindexer given the
	// current time.
	ScheduleFunc func(time.Time) time.Time

	// Timeout is the amount of time to wait for a single reindex query to return.
	Timeout time.Duration
}
type ReindexerTestSignals ¶
type ReindexerTestSignals struct {
	Reindexed rivercommon.TestSignal[struct{}] // notifies when a run finishes executing reindexes for all indexes
}
Test-only properties.
func (*ReindexerTestSignals) Init ¶
func (ts *ReindexerTestSignals) Init()
type Rescuer ¶ added in v0.0.6
type Rescuer struct {
	baseservice.BaseService
	startstop.BaseStartStop

	// exported for test purposes
	Config      *RescuerConfig
	TestSignals RescuerTestSignals
	// contains filtered or unexported fields
}
Rescuer periodically rescues jobs that have been executing for too long and are considered to be "stuck".
func NewRescuer ¶ added in v0.0.6
func NewRescuer(archetype *baseservice.Archetype, config *RescuerConfig, executor dbutil.Executor) *Rescuer
type RescuerConfig ¶ added in v0.0.6
type RescuerConfig struct {
	// ClientRetryPolicy is the default retry policy to use for workers that don't
	// override NextRetry.
	ClientRetryPolicy ClientRetryPolicy

	// Interval is the amount of time to wait between runs of the rescuer.
	Interval time.Duration

	// RescueAfter is the amount of time for a job to be active before it is
	// considered stuck and should be rescued.
	RescueAfter time.Duration

	WorkUnitFactoryFunc func(kind string) workunit.WorkUnitFactory
}
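The `RescueAfter` setting amounts to a cutoff in time: a job that has been active since before "now minus RescueAfter" counts as stuck. The sketch below expresses that comparison in isolation. `isStuck`, `attemptedAt`, and `now` are hypothetical names, and which timestamp the rescuer actually compares is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// Local copy of the documented default.
const rescueAfterDefault = time.Hour

// isStuck reports whether a job that began executing at attemptedAt has
// been active for longer than rescueAfter as of now.
func isStuck(attemptedAt, now time.Time, rescueAfter time.Duration) bool {
	horizon := now.Add(-rescueAfter)
	return attemptedAt.Before(horizon)
}

// A fixed "current time" keeps the example deterministic.
var now = time.Date(2024, time.January, 1, 12, 0, 0, 0, time.UTC)

func main() {
	fmt.Println(isStuck(now.Add(-2*time.Hour), now, rescueAfterDefault))    // prints true
	fmt.Println(isStuck(now.Add(-10*time.Minute), now, rescueAfterDefault)) // prints false
}
```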
type RescuerTestSignals ¶ added in v0.0.6
type RescuerTestSignals struct {
	FetchedBatch rivercommon.TestSignal[struct{}] // notifies when runOnce has fetched a batch of jobs
	UpdatedBatch rivercommon.TestSignal[struct{}] // notifies when runOnce has updated rescued jobs from a batch
}
Test-only properties.
func (*RescuerTestSignals) Init ¶ added in v0.0.6
func (ts *RescuerTestSignals) Init()
type Scheduler ¶
type Scheduler struct {
	baseservice.BaseService
	startstop.BaseStartStop

	// exported for test purposes
	TestSignals SchedulerTestSignals
	// contains filtered or unexported fields
}
Scheduler periodically moves jobs in `scheduled` or `retryable` state and which are ready to run over to `available` so that they're eligible to be worked.
func NewScheduler ¶
func NewScheduler(archetype *baseservice.Archetype, config *SchedulerConfig, executor dbutil.Executor) *Scheduler
type SchedulerConfig ¶
type SchedulerConfig struct {
	// Interval is the amount of time between periodic checks for jobs to
	// be moved from "scheduled" to "available".
	Interval time.Duration

	// Limit is the maximum number of jobs to transition at once from
	// "scheduled" to "available" during periodic scheduling checks.
	Limit int
}
type SchedulerTestSignals ¶
type SchedulerTestSignals struct {
	ScheduledBatch rivercommon.TestSignal[struct{}] // notifies when runOnce finishes a pass
}
Test-only properties.
func (*SchedulerTestSignals) Init ¶
func (ts *SchedulerTestSignals) Init()
type Service ¶
type Service interface {
	// Start starts a service. Services are responsible for backgrounding
	// themselves, so this function should be invoked synchronously. Services
	// may return an error if they have trouble starting up, so the caller
	// should wait and respond to the error if necessary.
	Start(ctx context.Context) error

	// Stop stops a service. Services are responsible for making sure their stop
	// is complete before returning so a caller can wait on this invocation
	// synchronously and be guaranteed the service is fully stopped. Services
	// are expected to be able to tolerate (1) being stopped without having been
	// started, and (2) being double-stopped.
	Stop()
}
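The contract above (background yourself in Start, block in Stop until fully stopped, tolerate stop-without-start and double-stop) can be satisfied in several ways. The following is one minimal sketch using a mutex, a cancel function, and a done channel. `tickerService` and `runLifecycle` are hypothetical, and this is not how the package's own `startstop.BaseStartStop` is necessarily implemented.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// tickerService is a hypothetical service that wakes up on a ticker until
// it is stopped.
type tickerService struct {
	mu     sync.Mutex
	cancel context.CancelFunc
	done   chan struct{}
}

// Start launches the background loop and returns immediately.
func (s *tickerService) Start(ctx context.Context) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.cancel != nil {
		return nil // already started; treat as a no-op in this sketch
	}
	ctx, s.cancel = context.WithCancel(ctx)
	done := make(chan struct{})
	s.done = done
	go func() {
		defer close(done)
		ticker := time.NewTicker(10 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				// periodic maintenance work would go here
			}
		}
	}()
	return nil
}

// Stop cancels the loop and waits for it to exit. It is safe to call
// before Start and safe to call more than once.
func (s *tickerService) Stop() {
	s.mu.Lock()
	cancel, done := s.cancel, s.done
	s.cancel, s.done = nil, nil
	s.mu.Unlock()
	if cancel == nil {
		return // never started, or already stopped
	}
	cancel()
	<-done
}

// runLifecycle exercises stop-before-start, a normal start and stop, and a
// double stop.
func runLifecycle() error {
	svc := &tickerService{}
	svc.Stop()
	if err := svc.Start(context.Background()); err != nil {
		return err
	}
	svc.Stop()
	svc.Stop()
	return nil
}

func main() {
	if err := runLifecycle(); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("stopped cleanly") // prints "stopped cleanly"
}
```

Clearing `cancel` and `done` under the lock before waiting is what makes the second `Stop` call return immediately instead of blocking or panicking.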