Documentation ¶
Index ¶
- Constants
- Variables
- func GetService[T startstop.Service](maintainer *QueueMaintainer) T
- type ClientRetryPolicy
- type JobCleaner
- type JobCleanerConfig
- type JobCleanerTestSignals
- type JobRescuer
- type JobRescuerConfig
- type JobRescuerTestSignals
- type JobScheduler
- type JobSchedulerConfig
- type JobSchedulerTestSignals
- type NotifyInsertFunc
- type PeriodicJob
- type PeriodicJobEnqueuer
- func (s *PeriodicJobEnqueuer) Add(periodicJob *PeriodicJob) rivertype.PeriodicJobHandle
- func (s *PeriodicJobEnqueuer) AddMany(periodicJobs []*PeriodicJob) []rivertype.PeriodicJobHandle
- func (s *PeriodicJobEnqueuer) Clear()
- func (s *PeriodicJobEnqueuer) Remove(periodicJobHandle rivertype.PeriodicJobHandle)
- func (s *PeriodicJobEnqueuer) RemoveMany(periodicJobHandles []rivertype.PeriodicJobHandle)
- func (s *PeriodicJobEnqueuer) StaggerStart(ctx context.Context)
- func (s *PeriodicJobEnqueuer) StaggerStartupDisable(disabled bool)
- func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error
- type PeriodicJobEnqueuerConfig
- type PeriodicJobEnqueuerTestSignals
- type QueueCleaner
- type QueueCleanerConfig
- type QueueCleanerTestSignals
- type QueueMaintainer
- type Reindexer
- type ReindexerConfig
- type ReindexerTestSignals
Constants ¶
const ( CancelledJobRetentionPeriodDefault = 24 * time.Hour CompletedJobRetentionPeriodDefault = 24 * time.Hour DiscardedJobRetentionPeriodDefault = 7 * 24 * time.Hour JobCleanerIntervalDefault = 30 * time.Second )
const ( JobRescuerRescueAfterDefault = time.Hour JobRescuerIntervalDefault = 30 * time.Second )
const ( JobSchedulerIntervalDefault = 5 * time.Second JobSchedulerLimitDefault = 10_000 )
const ( // Maintainers will sleep a brief period of time between batches to give the // database some breathing room. BatchBackoffMax = 1 * time.Second BatchBackoffMin = 50 * time.Millisecond // Bulk maintenance tasks like job removal operate in batches so that even // in the event of an enormous backlog of work to do, transactions stay // relatively short and aren't at risk of cancellation. This number is the // batch size, or the number of rows that are handled at a time. // // The specific value is somewhat arbitrary as large enough to make good // progress, but not so large as to make the operation overstay its welcome. // For now it's not configurable because we can likely pick a number that's // suitable for almost everyone. BatchSizeDefault = 1_000 )
const ( ReindexerIntervalDefault = 24 * time.Hour ReindexerTimeoutDefault = 15 * time.Second )
const (
QueueRetentionPeriodDefault = 24 * time.Hour
)
Variables ¶
var ErrNoJobToInsert = errors.New("a nil job was returned, nothing to insert")
ErrNoJobToInsert can be returned by a PeriodicJob's JobToInsertFunc to signal that there's no job to insert at this time.
Functions ¶
func GetService ¶
func GetService[T startstop.Service](maintainer *QueueMaintainer) T
GetService is a convenience method for getting a service by name and casting it to the desired type. It should only be used in tests due to its use of reflection and potential for panics.
Types ¶
type ClientRetryPolicy ¶ added in v0.0.6
type JobCleaner ¶
type JobCleaner struct { startstop.BaseStartStop // exported for test purposes Config *JobCleanerConfig TestSignals JobCleanerTestSignals // contains filtered or unexported fields }
JobCleaner periodically removes finalized jobs that are cancelled, completed, or discarded. Each state's retention time can be configured individually.
func NewJobCleaner ¶
func NewJobCleaner(archetype *baseservice.Archetype, config *JobCleanerConfig, exec riverdriver.Executor) *JobCleaner
func (*JobCleaner) StaggerStart ¶ added in v0.2.0
StaggerStart is called when queue maintainer services start. It jitters by sleeping for a short random period so services don't all perform their first run at exactly the same time.
func (*JobCleaner) StaggerStartupDisable ¶ added in v0.2.0
func (s *JobCleaner) StaggerStartupDisable(disabled bool)
StaggerStartupDisable sets whether the short staggered sleep on start up is disabled. This is useful in tests where the extra sleep involved in a staggered start up is not helpful for test run time.
type JobCleanerConfig ¶
type JobCleanerConfig struct { // CancelledJobRetentionPeriod is the amount of time to keep cancelled jobs // around before they're removed permanently. CancelledJobRetentionPeriod time.Duration // CompletedJobRetentionPeriod is the amount of time to keep completed jobs // around before they're removed permanently. CompletedJobRetentionPeriod time.Duration // DiscardedJobRetentionPeriod is the amount of time to keep cancelled jobs // around before they're removed permanently. DiscardedJobRetentionPeriod time.Duration // Interval is the amount of time to wait between runs of the cleaner. Interval time.Duration }
type JobCleanerTestSignals ¶
type JobCleanerTestSignals struct {
DeletedBatch rivercommon.TestSignal[struct{}] // notifies when runOnce finishes a pass
}
Test-only properties.
func (*JobCleanerTestSignals) Init ¶
func (ts *JobCleanerTestSignals) Init()
type JobRescuer ¶ added in v0.0.23
type JobRescuer struct { startstop.BaseStartStop // exported for test purposes Config *JobRescuerConfig TestSignals JobRescuerTestSignals // contains filtered or unexported fields }
JobRescuer periodically rescues jobs that have been executing for too long and are considered to be "stuck".
func NewRescuer ¶ added in v0.0.6
func NewRescuer(archetype *baseservice.Archetype, config *JobRescuerConfig, exec riverdriver.Executor) *JobRescuer
func (*JobRescuer) StaggerStart ¶ added in v0.2.0
StaggerStart is called when queue maintainer services start. It jitters by sleeping for a short random period so services don't all perform their first run at exactly the same time.
func (*JobRescuer) StaggerStartupDisable ¶ added in v0.2.0
func (s *JobRescuer) StaggerStartupDisable(disabled bool)
StaggerStartupDisable sets whether the short staggered sleep on start up is disabled. This is useful in tests where the extra sleep involved in a staggered start up is not helpful for test run time.
type JobRescuerConfig ¶ added in v0.0.23
type JobRescuerConfig struct { // ClientRetryPolicy is the default retry policy to use for workers that don't // overide NextRetry. ClientRetryPolicy ClientRetryPolicy // Interval is the amount of time to wait between runs of the rescuer. Interval time.Duration // RescueAfter is the amount of time for a job to be active before it is // considered stuck and should be rescued. RescueAfter time.Duration WorkUnitFactoryFunc func(kind string) workunit.WorkUnitFactory }
type JobRescuerTestSignals ¶ added in v0.0.23
type JobRescuerTestSignals struct { FetchedBatch rivercommon.TestSignal[struct{}] // notifies when runOnce has fetched a batch of jobs UpdatedBatch rivercommon.TestSignal[struct{}] // notifies when runOnce has updated rescued jobs from a batch }
Test-only properties.
func (*JobRescuerTestSignals) Init ¶ added in v0.0.23
func (ts *JobRescuerTestSignals) Init()
type JobScheduler ¶ added in v0.0.23
type JobScheduler struct { startstop.BaseStartStop // exported for test purposes TestSignals JobSchedulerTestSignals // contains filtered or unexported fields }
JobScheduler periodically moves jobs in `scheduled` or `retryable` state and which are ready to run over to `available` so that they're eligible to be worked.
func NewJobScheduler ¶ added in v0.5.0
func NewJobScheduler(archetype *baseservice.Archetype, config *JobSchedulerConfig, exec riverdriver.Executor) *JobScheduler
func (*JobScheduler) StaggerStart ¶ added in v0.2.0
StaggerStart is called when queue maintainer services start. It jitters by sleeping for a short random period so services don't all perform their first run at exactly the same time.
func (*JobScheduler) StaggerStartupDisable ¶ added in v0.2.0
func (s *JobScheduler) StaggerStartupDisable(disabled bool)
StaggerStartupDisable sets whether the short staggered sleep on start up is disabled. This is useful in tests where the extra sleep involved in a staggered start up is not helpful for test run time.
type JobSchedulerConfig ¶ added in v0.0.23
type JobSchedulerConfig struct { // Interval is the amount of time between periodic checks for jobs to // be moved from "scheduled" to "available". Interval time.Duration // Limit is the maximum number of jobs to transition at once from // "scheduled" to "available" during periodic scheduling checks. Limit int // NotifyInsert is a function to call to emit notifications for queues // where jobs were scheduled. NotifyInsert NotifyInsertFunc }
type JobSchedulerTestSignals ¶ added in v0.0.23
type JobSchedulerTestSignals struct { NotifiedQueues rivercommon.TestSignal[[]string] // notifies when queues are sent an insert notification ScheduledBatch rivercommon.TestSignal[struct{}] // notifies when runOnce finishes a pass }
Test-only properties.
func (*JobSchedulerTestSignals) Init ¶ added in v0.0.23
func (ts *JobSchedulerTestSignals) Init()
type NotifyInsertFunc ¶ added in v0.5.0
type NotifyInsertFunc func(ctx context.Context, tx riverdriver.ExecutorTx, queues []string) error
NotifyInsert is a function to call to emit notifications for queues where jobs were scheduled.
type PeriodicJob ¶
type PeriodicJob struct { ConstructorFunc func() (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) RunOnStart bool ScheduleFunc func(time.Time) time.Time // contains filtered or unexported fields }
PeriodicJob is a periodic job to be run. It's similar to the top-level river.PeriodicJobArgs, but needs a separate type because the enqueuer is in a subpackage.
type PeriodicJobEnqueuer ¶
type PeriodicJobEnqueuer struct { startstop.BaseStartStop // exported for test purposes Config *PeriodicJobEnqueuerConfig TestSignals PeriodicJobEnqueuerTestSignals // contains filtered or unexported fields }
PeriodicJobEnqueuer inserts jobs configured to run periodically as unique jobs to make sure they'll run as frequently as their period dictates.
func NewPeriodicJobEnqueuer ¶
func NewPeriodicJobEnqueuer(archetype *baseservice.Archetype, config *PeriodicJobEnqueuerConfig, exec riverdriver.Executor) *PeriodicJobEnqueuer
func (*PeriodicJobEnqueuer) Add ¶ added in v0.2.0
func (s *PeriodicJobEnqueuer) Add(periodicJob *PeriodicJob) rivertype.PeriodicJobHandle
Add adds a new periodic job to the enqueuer. The service's run loop is woken immediately so that the job is scheduled appropriately, and inserted if its RunOnStart flag is set to true.
func (*PeriodicJobEnqueuer) AddMany ¶ added in v0.2.0
func (s *PeriodicJobEnqueuer) AddMany(periodicJobs []*PeriodicJob) []rivertype.PeriodicJobHandle
AddMany adds many new periodic job to the enqueuer. The service's run loop is woken immediately so that the job is scheduled appropriately, and inserted if any RunOnStart flags are set to true.
func (*PeriodicJobEnqueuer) Clear ¶ added in v0.2.0
func (s *PeriodicJobEnqueuer) Clear()
Clear clears all periodic jobs from the enqueuer.
func (*PeriodicJobEnqueuer) Remove ¶ added in v0.2.0
func (s *PeriodicJobEnqueuer) Remove(periodicJobHandle rivertype.PeriodicJobHandle)
Remove removes a periodic job from the enqueuer. Its current target run time and all future runs are cancelled.
func (*PeriodicJobEnqueuer) RemoveMany ¶ added in v0.2.0
func (s *PeriodicJobEnqueuer) RemoveMany(periodicJobHandles []rivertype.PeriodicJobHandle)
RemoveMany removes many periodic jobs from the enqueuer. Their current target run time and all future runs are cancelled.
func (*PeriodicJobEnqueuer) StaggerStart ¶ added in v0.2.0
StaggerStart is called when queue maintainer services start. It jitters by sleeping for a short random period so services don't all perform their first run at exactly the same time.
func (*PeriodicJobEnqueuer) StaggerStartupDisable ¶ added in v0.2.0
func (s *PeriodicJobEnqueuer) StaggerStartupDisable(disabled bool)
StaggerStartupDisable sets whether the short staggered sleep on start up is disabled. This is useful in tests where the extra sleep involved in a staggered start up is not helpful for test run time.
type PeriodicJobEnqueuerConfig ¶
type PeriodicJobEnqueuerConfig struct { AdvisoryLockPrefix int32 // NotifyInsert is a function to call to emit notifications for queues // where jobs were scheduled. NotifyInsert NotifyInsertFunc // PeriodicJobs are the periodic jobs with which to configure the enqueuer. PeriodicJobs []*PeriodicJob }
type PeriodicJobEnqueuerTestSignals ¶
type PeriodicJobEnqueuerTestSignals struct { EnteredLoop rivercommon.TestSignal[struct{}] // notifies when the enqueuer finishes start up and enters its initial run loop InsertedJobs rivercommon.TestSignal[struct{}] // notifies when a batch of jobs is inserted NotifiedQueues rivercommon.TestSignal[[]string] // notifies when queues are sent an insert notification SkippedJob rivercommon.TestSignal[struct{}] // notifies when a job is skipped because of nil JobInsertParams }
Test-only properties.
func (*PeriodicJobEnqueuerTestSignals) Init ¶
func (ts *PeriodicJobEnqueuerTestSignals) Init()
type QueueCleaner ¶ added in v0.5.0
type QueueCleaner struct { startstop.BaseStartStop // exported for test purposes Config *QueueCleanerConfig TestSignals QueueCleanerTestSignals // contains filtered or unexported fields }
QueueCleaner periodically removes queues from the river_queue table that have not been updated in a while, indicating that they are no longer active.
func NewQueueCleaner ¶ added in v0.5.0
func NewQueueCleaner(archetype *baseservice.Archetype, config *QueueCleanerConfig, exec riverdriver.Executor) *QueueCleaner
func (*QueueCleaner) StaggerStart ¶ added in v0.5.0
StaggerStart is called when queue maintainer services start. It jitters by sleeping for a short random period so services don't all perform their first run at exactly the same time.
func (*QueueCleaner) StaggerStartupDisable ¶ added in v0.5.0
func (s *QueueCleaner) StaggerStartupDisable(disabled bool)
StaggerStartupDisable sets whether the short staggered sleep on start up is disabled. This is useful in tests where the extra sleep involved in a staggered start up is not helpful for test run time.
type QueueCleanerConfig ¶ added in v0.5.0
type QueueCleanerTestSignals ¶ added in v0.5.0
type QueueCleanerTestSignals struct {
DeletedBatch rivercommon.TestSignal[struct{}] // notifies when runOnce finishes a pass
}
Test-only properties.
func (*QueueCleanerTestSignals) Init ¶ added in v0.5.0
func (ts *QueueCleanerTestSignals) Init()
type QueueMaintainer ¶
type QueueMaintainer struct { baseservice.BaseService startstop.BaseStartStop // contains filtered or unexported fields }
QueueMaintainer runs regular maintenance operations against job queues, like pruning completed jobs. It runs only on the client which has been elected leader at any given time.
Its methods are not safe for concurrent usage.
func NewQueueMaintainer ¶
func NewQueueMaintainer(archetype *baseservice.Archetype, services []startstop.Service) *QueueMaintainer
func (*QueueMaintainer) StaggerStartupDisable ¶ added in v0.2.0
func (m *QueueMaintainer) StaggerStartupDisable(disabled bool)
StaggerStartupDisable sets whether the short staggered sleep on start up is disabled. This is useful in tests where the extra sleep involved in a staggered start up is not helpful for test run time.
type Reindexer ¶
type Reindexer struct { startstop.BaseStartStop // exported for test purposes Config *ReindexerConfig TestSignals ReindexerTestSignals // contains filtered or unexported fields }
Reindexer periodically executes a REINDEX command on the important job indexes to rebuild them and fix bloat issues.
func NewReindexer ¶
func NewReindexer(archetype *baseservice.Archetype, config *ReindexerConfig, exec riverdriver.Executor) *Reindexer
func (*Reindexer) StaggerStart ¶ added in v0.2.0
StaggerStart is called when queue maintainer services start. It jitters by sleeping for a short random period so services don't all perform their first run at exactly the same time.
func (*Reindexer) StaggerStartupDisable ¶ added in v0.2.0
func (s *Reindexer) StaggerStartupDisable(disabled bool)
StaggerStartupDisable sets whether the short staggered sleep on start up is disabled. This is useful in tests where the extra sleep involved in a staggered start up is not helpful for test run time.
type ReindexerConfig ¶
type ReindexerConfig struct { // IndexNames is a list of indexes to reindex on each run. IndexNames []string // ScheduleFunc returns the next scheduled run time for the reindexer given the // current time. ScheduleFunc func(time.Time) time.Time // Timeout is the amount of time to wait for a single reindex query to return. Timeout time.Duration }
type ReindexerTestSignals ¶
type ReindexerTestSignals struct {
Reindexed rivercommon.TestSignal[struct{}] // notifies when a run finishes executing reindexes for all indexes
}
Test-only properties.
func (*ReindexerTestSignals) Init ¶
func (ts *ReindexerTestSignals) Init()