Documentation ¶
Overview ¶
Package river is a robust high-performance job processing system for Go.
Because it is built using Postgres, River enables you to use the same database for both your application data and your job queue. This simplifies operations, but perhaps more importantly it makes it possible to enqueue jobs transactionally with other database changes. This avoids a whole class of distributed systems issues like jobs that execute before the database transaction that enqueued them has even committed, or jobs that attempt to utilize database changes which were rolled back. It also makes it possible for your job to make database changes atomically with the job being marked as complete.
Job args ¶
Jobs need to be able to serialize their state to JSON so that they can round tripped from the database and back. Each job has an args struct with JSON tags on its properties to allow for this:
// SortArgs are arguments for SortWorker. type SortArgs struct { // Strings is a slice of strings to sort. Strings []string `json:"strings"` } func (SortArgs) Kind() string { return "sort_job" }
Args are created to enqueue a new job and are what a worker receives to work one. Each one implements JobArgs.Kind, which returns a unique string that's used to recognize the job as it round trips from the database.
Job workers ¶
Each job kind also has a corresponding worker struct where its core work function is defined:
// SortWorker is a job worker for sorting strings. type SortWorker struct { river.WorkerDefaults[SortArgs] } func (w *SortWorker) Work(ctx context.Context, job *river.Job[SortArgs]) error { sort.Strings(job.Args.Strings) fmt.Printf("Sorted strings: %+v\n", job.Args.Strings) return nil }
A few details to notice:
Although not strictly necessary, workers embed WorkerDefaults with a reference to their args type. This allows them to inherit defaults for the Worker interface, and helps futureproof in case its ever expanded.
Each worker implements Worker.Work, which is where the async heavy-lifting for a background job is done. Work implementations receive a generic like river.Job[SortArgs] for easy access to job arguments.
Registering workers ¶
As a program is initially starting up, worker structs are registered so that River can know how to work them:
workers := river.NewWorkers() river.AddWorker(workers, &SortWorker{})
River client ¶
The main River client takes a pgx connection pool wrapped with River's Pgx v5 driver using riverpgxv5.New and a set of registered workers (see above). Each queue can receive configuration like the maximum number of goroutines that'll be used to work it:
dbConfig, err := pgxpool.ParseConfig("postgres://localhost/river") if err != nil { return err } dbPool, err := pgxpool.NewWithConfig(ctx, dbConfig) if err != nil { return err } defer dbPool.Close() riverClient, err := river.NewClient(&river.Config{ Driver: riverpgxv5.New(dbPool), Queues: map[string]river.QueueConfig{ river.DefaultQueue: {MaxWorkers: 100}, }, Workers: workers, }) if err := riverClient.Start(ctx); err != nil { ... } ... // Before program exit, try to shut down cleanly. if err := riverClient.Shutdown(ctx); err != nil { return err }
For programs that'll be inserting jobs only, the Queues and Workers configuration keys can be omitted for brevity:
riverClient, err := river.NewClient(&river.Config{ DBPool: dbPool, })
However, if Workers is specified, the client can validate that an inserted job has a worker that's registered with the workers bundle, so it's recommended that Workers is configured anyway if your project is set up to easily allow it.
See Config for details on all configuration options.
Inserting jobs ¶
Insert jobs by opening a transaction and calling Client.InsertTx with a job args instance (a non-transactional Client.Insert is also available) and the transaction wrapped with [riverpgxv5Tx]:
tx, err := dbPool.Begin(ctx) if err != nil { return err } defer tx.Rollback(ctx) _, err = riverClient.InsertTx(ctx, tx, SortArgs{ Strings: []string{ "whale", "tiger", "bear", }, }, nil) if err != nil { return err } if err := tx.Commit(ctx); err != nil { return err }
Due to rules around transaction visibility, inserted jobs aren't visible to workers until the transaction that inserted them is committed. This prevents a whole host of problems like workers trying to work a job before its viable to do so because not all its requisite data has been persisted yet.
See the InsertAndWork example for all this code in one place.
Other features ¶
- Periodic jobs that run on a predefined interval. See the PeriodicJob example below.
Verifying inserted jobs ¶
See the rivertest package for test helpers that can be used to easily verified inserted jobs in a test suite. For example:
job := rivertest.RequireInserted(ctx, t, dbPool, &RequiredArgs{}, nil) fmt.Printf("Test passed with message: %s\n", job.Args.Message)
Example (BatchInsert) ¶
Example_batchInsert demonstrates how many jobs can be inserted for work as part of a single operation.
package main import ( "context" "fmt" "log/slog" "github.com/jackc/pgx/v5/pgxpool" "github.com/riverqueue/river" "github.com/riverqueue/river/internal/riverinternaltest" "github.com/riverqueue/river/internal/util/slogutil" "github.com/riverqueue/river/riverdriver/riverpgxv5" ) type BatchInsertArgs struct{} func (BatchInsertArgs) Kind() string { return "batch_insert" } // BatchInsertWorker is a job worker demonstrating use of custom // job-specific insertion options. type BatchInsertWorker struct { river.WorkerDefaults[BatchInsertArgs] } func (w *BatchInsertWorker) Work(ctx context.Context, job *river.Job[BatchInsertArgs]) error { fmt.Printf("Worked a job\n") return nil } // Example_batchInsert demonstrates how many jobs can be inserted for work as // part of a single operation. func main() { ctx := context.Background() dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_testdb_example")) if err != nil { panic(err) } defer dbPool.Close() // Required for the purpose of this test, but not necessary in real usage. if err := riverinternaltest.TruncateRiverTables(ctx, dbPool); err != nil { panic(err) } workers := river.NewWorkers() river.AddWorker(workers, &BatchInsertWorker{}) riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}), Queues: map[string]river.QueueConfig{ river.DefaultQueue: {MaxWorkers: 100}, }, Workers: workers, }) if err != nil { panic(err) } // Out of example scope, but used to make wait until a job is worked. subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted) defer subscribeCancel() if err := riverClient.Start(ctx); err != nil { panic(err) } count, err := riverClient.InsertMany(ctx, []river.InsertManyParams{ {Args: BatchInsertArgs{}}, {Args: BatchInsertArgs{}}, {Args: BatchInsertArgs{}}, {Args: BatchInsertArgs{}, InsertOpts: &river.InsertOpts{Priority: 3}}, {Args: BatchInsertArgs{}, InsertOpts: &river.InsertOpts{Priority: 4}}, }) if err != nil { panic(err) } fmt.Printf("Inserted %d jobs\n", count) waitForNJobs(subscribeChan, 5) if err := riverClient.Stop(ctx); err != nil { panic(err) } }
Output: Inserted 5 jobs Worked a job Worked a job Worked a job Worked a job Worked a job
Example (CompleteJobWithinTx) ¶
Example_completeJobWithinTx demonstrates how to transactionally complete a job alongside other database changes being made.
package main import ( "context" "fmt" "log/slog" "github.com/jackc/pgx/v5/pgxpool" "github.com/riverqueue/river" "github.com/riverqueue/river/internal/riverinternaltest" "github.com/riverqueue/river/internal/util/slogutil" "github.com/riverqueue/river/riverdriver/riverpgxv5" ) type TransactionalArgs struct{} func (TransactionalArgs) Kind() string { return "transactional_worker" } // TransactionalWorker is a job worker which runs an operation on the database // and transactionally completes the current job. // // While this example is simplified, any operations could be performed within // the transaction such as inserting additional jobs or manipulating other data. type TransactionalWorker struct { river.WorkerDefaults[TransactionalArgs] dbPool *pgxpool.Pool } func (w *TransactionalWorker) Work(ctx context.Context, job *river.Job[TransactionalArgs]) error { tx, err := w.dbPool.Begin(ctx) if err != nil { return err } defer tx.Rollback(ctx) var result int if err := tx.QueryRow(ctx, "SELECT 1").Scan(&result); err != nil { return err } // The function needs to know the type of the database driver in use by the // Client, but the other generic parameters can be inferred. jobAfter, err := river.JobCompleteTx[*riverpgxv5.Driver](ctx, tx, job) if err != nil { return err } fmt.Printf("Transitioned TransactionalWorker job from %q to %q\n", job.State, jobAfter.State) if err = tx.Commit(ctx); err != nil { return err } return nil } // Example_completeJobWithinTx demonstrates how to transactionally complete // a job alongside other database changes being made. func main() { ctx := context.Background() dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_testdb_example")) if err != nil { panic(err) } defer dbPool.Close() // Required for the purpose of this test, but not necessary in real usage. if err := riverinternaltest.TruncateRiverTables(ctx, dbPool); err != nil { panic(err) } workers := river.NewWorkers() river.AddWorker(workers, &TransactionalWorker{dbPool: dbPool}) river.AddWorker(workers, &SortWorker{}) riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}), Queues: map[string]river.QueueConfig{ river.DefaultQueue: {MaxWorkers: 100}, }, Workers: workers, }) if err != nil { panic(err) } // Not strictly needed, but used to help this test wait until job is worked. subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted) defer subscribeCancel() if err := riverClient.Start(ctx); err != nil { panic(err) } if _, err = riverClient.Insert(ctx, TransactionalArgs{}, nil); err != nil { panic(err) } waitForNJobs(subscribeChan, 1) if err := riverClient.Stop(ctx); err != nil { panic(err) } }
Output: Transitioned TransactionalWorker job from "running" to "completed"
Example (CronJob) ¶
Example_cronJob demonstrates how to create a cron job with a more complex schedule using a third party cron package to parse more elaborate crontab syntax.
package main import ( "context" "fmt" "log/slog" "github.com/jackc/pgx/v5/pgxpool" "github.com/robfig/cron/v3" "github.com/riverqueue/river" "github.com/riverqueue/river/internal/riverinternaltest" "github.com/riverqueue/river/internal/util/slogutil" "github.com/riverqueue/river/riverdriver/riverpgxv5" ) type CronJobArgs struct{} // Kind is the unique string name for this job. func (CronJobArgs) Kind() string { return "cron" } // CronJobWorker is a job worker for sorting strings. type CronJobWorker struct { river.WorkerDefaults[CronJobArgs] } func (w *CronJobWorker) Work(ctx context.Context, job *river.Job[CronJobArgs]) error { fmt.Printf("This job will run once immediately then every hour on the half hour\n") return nil } // Example_cronJob demonstrates how to create a cron job with a more complex // schedule using a third party cron package to parse more elaborate crontab // syntax. func main() { ctx := context.Background() dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_testdb_example")) if err != nil { panic(err) } defer dbPool.Close() // Required for the purpose of this test, but not necessary in real usage. if err := riverinternaltest.TruncateRiverTables(ctx, dbPool); err != nil { panic(err) } workers := river.NewWorkers() river.AddWorker(workers, &CronJobWorker{}) schedule, err := cron.ParseStandard("30 * * * *") // every hour on the half hour if err != nil { panic(err) } riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}), PeriodicJobs: []*river.PeriodicJob{ river.NewPeriodicJob( schedule, func() (river.JobArgs, *river.InsertOpts) { return CronJobArgs{}, nil }, &river.PeriodicJobOpts{RunOnStart: true}, ), }, Queues: map[string]river.QueueConfig{ river.DefaultQueue: {MaxWorkers: 100}, }, Workers: workers, }) if err != nil { panic(err) } // Out of example scope, but used to make wait until a job is worked. subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted) defer subscribeCancel() // There's no need to explicitly insert a periodic job. One will be inserted // (and worked soon after) as the client starts up. if err := riverClient.Start(ctx); err != nil { panic(err) } waitForNJobs(subscribeChan, 1) if err := riverClient.Stop(ctx); err != nil { panic(err) } }
Output: This job will run once immediately then every hour on the half hour
Example (CustomInsertOpts) ¶
Example_customInsertOpts demonstrates the use of a job with custom job-specific insertion options.
package main import ( "context" "fmt" "log/slog" "github.com/jackc/pgx/v5/pgxpool" "github.com/riverqueue/river" "github.com/riverqueue/river/internal/riverinternaltest" "github.com/riverqueue/river/internal/util/slogutil" "github.com/riverqueue/river/riverdriver/riverpgxv5" ) type AlwaysHighPriorityArgs struct{} func (AlwaysHighPriorityArgs) Kind() string { return "always_high_priority" } // InsertOpts returns custom insert options that every job of this type will // inherit by default. func (AlwaysHighPriorityArgs) InsertOpts() river.InsertOpts { return river.InsertOpts{ Queue: "high_priority", } } // AlwaysHighPriorityWorker is a job worker demonstrating use of custom // job-specific insertion options. type AlwaysHighPriorityWorker struct { river.WorkerDefaults[AlwaysHighPriorityArgs] } func (w *AlwaysHighPriorityWorker) Work(ctx context.Context, job *river.Job[AlwaysHighPriorityArgs]) error { fmt.Printf("Ran in queue: %s\n", job.Queue) return nil } type SometimesHighPriorityArgs struct{} func (SometimesHighPriorityArgs) Kind() string { return "sometimes_high_priority" } // SometimesHighPriorityWorker is a job worker that's made high-priority // sometimes through the use of options at insertion time. type SometimesHighPriorityWorker struct { river.WorkerDefaults[SometimesHighPriorityArgs] } func (w *SometimesHighPriorityWorker) Work(ctx context.Context, job *river.Job[SometimesHighPriorityArgs]) error { fmt.Printf("Ran in queue: %s\n", job.Queue) return nil } // Example_customInsertOpts demonstrates the use of a job with custom // job-specific insertion options. func main() { ctx := context.Background() dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_testdb_example")) if err != nil { panic(err) } defer dbPool.Close() // Required for the purpose of this test, but not necessary in real usage. if err := riverinternaltest.TruncateRiverTables(ctx, dbPool); err != nil { panic(err) } workers := river.NewWorkers() river.AddWorker(workers, &AlwaysHighPriorityWorker{}) river.AddWorker(workers, &SometimesHighPriorityWorker{}) riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}), Queues: map[string]river.QueueConfig{ river.DefaultQueue: {MaxWorkers: 100}, "high_priority": {MaxWorkers: 100}, }, Workers: workers, }) if err != nil { panic(err) } // Out of example scope, but used to make wait until a job is worked. subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted) defer subscribeCancel() if err := riverClient.Start(ctx); err != nil { panic(err) } // This job always runs in the high-priority queue because its job-specific // options on the struct above dictate that it will. _, err = riverClient.Insert(ctx, AlwaysHighPriorityArgs{}, nil) if err != nil { panic(err) } // This job will run in the high-priority queue because of the options given // at insertion time. _, err = riverClient.Insert(ctx, SometimesHighPriorityArgs{}, &river.InsertOpts{ Queue: "high_priority", }) if err != nil { panic(err) } waitForNJobs(subscribeChan, 2) if err := riverClient.Stop(ctx); err != nil { panic(err) } }
Output: Ran in queue: high_priority Ran in queue: high_priority
Example (ErrorHandler) ¶
Example_errorHandler demonstrates how to use the ErrorHandler interface for custom application telemetry.
package main import ( "context" "fmt" "log/slog" "github.com/jackc/pgx/v5/pgxpool" "github.com/riverqueue/river" "github.com/riverqueue/river/internal/riverinternaltest" "github.com/riverqueue/river/internal/util/slogutil" "github.com/riverqueue/river/riverdriver/riverpgxv5" ) type CustomErrorHandler struct{} func (*CustomErrorHandler) HandleError(ctx context.Context, job *river.JobRow, err error) *river.ErrorHandlerResult { fmt.Printf("Job errored with: %s\n", err) return nil } func (*CustomErrorHandler) HandlePanic(ctx context.Context, job *river.JobRow, panicVal any) *river.ErrorHandlerResult { fmt.Printf("Job panicked with: %v\n", panicVal) // Either function can also set the job to be immediately cancelled. return &river.ErrorHandlerResult{SetCancelled: true} } type ErroringArgs struct { ShouldError bool ShouldPanic bool } func (ErroringArgs) Kind() string { return "erroring" } // Here to make sure our jobs are never accidentally retried which would add // additional output and fail the example. func (ErroringArgs) InsertOpts() river.InsertOpts { return river.InsertOpts{MaxAttempts: 1} } type ErroringWorker struct { river.WorkerDefaults[ErroringArgs] } func (w *ErroringWorker) Work(ctx context.Context, j *river.Job[ErroringArgs]) error { switch { case j.Args.ShouldError: return fmt.Errorf("this job errored") case j.Args.ShouldPanic: panic("this job panicked") } return nil } // Example_errorHandler demonstrates how to use the ErrorHandler interface for // custom application telemetry. func main() { ctx := context.Background() dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_testdb_example")) if err != nil { panic(err) } defer dbPool.Close() // Required for the purpose of this test, but not necessary in real usage. if err := riverinternaltest.TruncateRiverTables(ctx, dbPool); err != nil { panic(err) } workers := river.NewWorkers() river.AddWorker(workers, &ErroringWorker{}) riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ ErrorHandler: &CustomErrorHandler{}, Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: 9}), // Suppress logging so example output is cleaner (9 > slog.LevelError). Queues: map[string]river.QueueConfig{ river.DefaultQueue: {MaxWorkers: 10}, }, Workers: workers, }) if err != nil { panic(err) } // Not strictly needed, but used to help this test wait until job is worked. subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCancelled, river.EventKindJobFailed) defer subscribeCancel() if err := riverClient.Start(ctx); err != nil { panic(err) } if _, err = riverClient.Insert(ctx, ErroringArgs{ShouldError: true}, nil); err != nil { panic(err) } // Wait for the first job before inserting another to guarantee test output // is ordered correctly. waitForNJobs(subscribeChan, 1) if _, err = riverClient.Insert(ctx, ErroringArgs{ShouldPanic: true}, nil); err != nil { panic(err) } waitForNJobs(subscribeChan, 1) if err := riverClient.Stop(ctx); err != nil { panic(err) } }
Output: Job errored with: this job errored Job panicked with: this job panicked
Example (GracefulShutdown) ¶
Example_gracefulShutdown demonstrates a realistic-looking stop loop for River. It listens for SIGINT/SIGTERM (like might be received by a Ctrl+C locally or on a platform like Heroku to stop a process) and when received, tries a soft stop that waits for work to finish. If it doesn't finish in time, a second SIGINT/SIGTERM will initiate a hard stop that cancels all jobs using context cancellation. A third will give up on the stop procedure and exit uncleanly.
package main import ( "context" "errors" "fmt" "log/slog" "os" "os/signal" "syscall" "time" "github.com/jackc/pgx/v5/pgxpool" "github.com/riverqueue/river" "github.com/riverqueue/river/internal/riverinternaltest" "github.com/riverqueue/river/internal/util/slogutil" "github.com/riverqueue/river/riverdriver/riverpgxv5" ) type WaitsForCancelOnlyArgs struct{} func (WaitsForCancelOnlyArgs) Kind() string { return "waits_for_cancel_only" } // WaitsForCancelOnlyWorker is a worker that will never finish jobs until its // context is cancelled. type WaitsForCancelOnlyWorker struct { river.WorkerDefaults[WaitsForCancelOnlyArgs] jobStarted chan struct{} } func (w *WaitsForCancelOnlyWorker) Work(ctx context.Context, job *river.Job[WaitsForCancelOnlyArgs]) error { fmt.Printf("Working job that doesn't finish until cancelled\n") close(w.jobStarted) <-ctx.Done() fmt.Printf("Job cancelled\n") // In the event of cancellation, an error should be returned so that the job // goes back in the retry queue. return ctx.Err() } // Example_gracefulShutdown demonstrates a realistic-looking stop loop for // River. It listens for SIGINT/SIGTERM (like might be received by a Ctrl+C // locally or on a platform like Heroku to stop a process) and when received, // tries a soft stop that waits for work to finish. If it doesn't finish in // time, a second SIGINT/SIGTERM will initiate a hard stop that cancels all jobs // using context cancellation. A third will give up on the stop procedure and // exit uncleanly. func main() { ctx := context.Background() dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_testdb_example")) if err != nil { panic(err) } defer dbPool.Close() // Required for the purpose of this test, but not necessary in real usage. if err := riverinternaltest.TruncateRiverTables(ctx, dbPool); err != nil { panic(err) } jobStarted := make(chan struct{}) workers := river.NewWorkers() river.AddWorker(workers, &WaitsForCancelOnlyWorker{jobStarted: jobStarted}) riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}), Queues: map[string]river.QueueConfig{ river.DefaultQueue: {MaxWorkers: 100}, }, Workers: workers, }) if err != nil { panic(err) } _, err = riverClient.Insert(ctx, WaitsForCancelOnlyArgs{}, nil) if err != nil { panic(err) } if err := riverClient.Start(ctx); err != nil { panic(err) } riverClientStopped := make(chan struct{}) sigintOrTerm := make(chan os.Signal, 1) signal.Notify(sigintOrTerm, syscall.SIGINT, syscall.SIGTERM) // This is meant to be a realistic-looking stop goroutine that might go in a // real program. It waits for SIGINT/SIGTERM and when received, tries to stop // gracefully by allowing a chance for jobs to finish. But if that isn't // working, a second SIGINT/SIGTERM will tell it to terminate with prejudice and // it'll issue a hard stop that cancels the context of all active jobs. In // case that doesn't work, a third SIGINT/SIGTERM ignores River's stop procedure // completely and exits uncleanly. go func() { defer close(riverClientStopped) <-sigintOrTerm fmt.Printf("Received SIGINT/SIGTERM; initiating soft stop (try to wait for jobs to finish)\n") softStopSucceeded := make(chan struct{}) go func() { if err := riverClient.Stop(ctx); err != nil { if !errors.Is(err, context.Canceled) { panic(err) } } close(softStopSucceeded) }() // Wait for soft stop to succeed, or another SIGINT/SIGTERM. select { case <-sigintOrTerm: fmt.Printf("Received SIGINT/SIGTERM again; initiating hard stop (cancel everything)\n") case <-time.After(10 * time.Second): fmt.Printf("Soft stop timeout; initiating hard stop (cancel everything)\n") case <-softStopSucceeded: // Will never be reached in this example. return } hardStopSucceeded := make(chan struct{}) go func() { if err := riverClient.StopAndCancel(ctx); err != nil { if !errors.Is(err, context.Canceled) { panic(err) } } close(hardStopSucceeded) }() // As long as all jobs respect context cancellation, StopAndCancel will // always work. However, in the case of a bug where a job blocks despite // being cancelled, it may be necessary to either ignore River's stop // result (what's shown here) or have a supervisor kill the process. select { case <-sigintOrTerm: fmt.Printf("Received SIGINT/SIGTERM again; ignoring stop procedure and exiting unsafely\n") case <-time.After(10 * time.Second): fmt.Printf("Hard stop timeout; ignoring stop procedure and exiting unsafely\n") case <-hardStopSucceeded: } }() // Make sure our job starts being worked before doing anything else. <-jobStarted // Cheat a little by sending a SIGTERM manually for the purpose of this // example (normally this will be sent by user or supervisory process). The // first SIGTERM tries a soft stop in which jobs are given a chance to // finish up. sigintOrTerm <- syscall.SIGTERM // The soft stop will never work in this example because our job only // respects context cancellation, but wait a short amount of time to give it // a chance. After it elapses, send another SIGTERM to initiate a hard stop. select { case <-riverClientStopped: // Will never be reached in this example because our job will only ever // finish on context cancellation. fmt.Printf("Soft stop succeeded\n") case <-time.After(100 * time.Millisecond): sigintOrTerm <- syscall.SIGTERM <-riverClientStopped } }
Output: Working job that doesn't finish until cancelled Received SIGINT/SIGTERM; initiating soft stop (try to wait for jobs to finish) Received SIGINT/SIGTERM again; initiating hard stop (cancel everything) Job cancelled jobExecutor: Job failed
Example (InsertAndWork) ¶
Example_insertAndWork demonstrates how to register job workers, start a client, and insert a job on it to be worked.
package main import ( "context" "fmt" "log/slog" "sort" "github.com/jackc/pgx/v5/pgxpool" "github.com/riverqueue/river" "github.com/riverqueue/river/internal/riverinternaltest" "github.com/riverqueue/river/internal/util/slogutil" "github.com/riverqueue/river/riverdriver/riverpgxv5" ) type SortArgs struct { // Strings is a slice of strings to sort. Strings []string `json:"strings"` } func (SortArgs) Kind() string { return "sort" } type SortWorker struct { river.WorkerDefaults[SortArgs] } func (w *SortWorker) Work(ctx context.Context, job *river.Job[SortArgs]) error { sort.Strings(job.Args.Strings) fmt.Printf("Sorted strings: %+v\n", job.Args.Strings) return nil } // Example_insertAndWork demonstrates how to register job workers, start a // client, and insert a job on it to be worked. func main() { ctx := context.Background() dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_testdb_example")) if err != nil { panic(err) } defer dbPool.Close() // Required for the purpose of this test, but not necessary in real usage. if err := riverinternaltest.TruncateRiverTables(ctx, dbPool); err != nil { panic(err) } workers := river.NewWorkers() river.AddWorker(workers, &SortWorker{}) riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}), Queues: map[string]river.QueueConfig{ river.DefaultQueue: {MaxWorkers: 100}, }, Workers: workers, }) if err != nil { panic(err) } // Out of example scope, but used to make wait until a job is worked. subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted) defer subscribeCancel() if err := riverClient.Start(ctx); err != nil { panic(err) } // Start a transaction to insert a job. It's also possible to insert a job // outside a transaction, but this usage is recommended to ensure that all // data a job needs to run is available by the time it starts. Because of // snapshot visibility guarantees across transactions, the job will not be // worked until the transaction has committed. tx, err := dbPool.Begin(ctx) if err != nil { panic(err) } defer tx.Rollback(ctx) _, err = riverClient.InsertTx(ctx, tx, SortArgs{ Strings: []string{ "whale", "tiger", "bear", }, }, nil) if err != nil { panic(err) } if err := tx.Commit(ctx); err != nil { panic(err) } waitForNJobs(subscribeChan, 1) if err := riverClient.Stop(ctx); err != nil { panic(err) } }
Output: Sorted strings: [bear tiger whale]
Example (JobCancel) ¶
Example_jobCancel demonstrates how to permanently cancel a job from within Work using JobCancel.
package main import ( "context" "fmt" "log/slog" "github.com/jackc/pgx/v5/pgxpool" "github.com/riverqueue/river" "github.com/riverqueue/river/internal/riverinternaltest" "github.com/riverqueue/river/internal/util/slogutil" "github.com/riverqueue/river/riverdriver/riverpgxv5" ) type CancellingArgs struct { ShouldCancel bool } func (args CancellingArgs) Kind() string { return "Cancelling" } type CancellingWorker struct { river.WorkerDefaults[CancellingArgs] } func (w *CancellingWorker) Work(ctx context.Context, j *river.Job[CancellingArgs]) error { if j.Args.ShouldCancel { fmt.Println("cancelling job") return river.JobCancel(fmt.Errorf("this wrapped error message will be persisted to DB")) } return nil } // Example_jobCancel demonstrates how to permanently cancel a job from within // Work using JobCancel. func main() { //nolint:dupl ctx := context.Background() dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_testdb_example")) if err != nil { panic(err) } defer dbPool.Close() // Required for the purpose of this test, but not necessary in real usage. if err := riverinternaltest.TruncateRiverTables(ctx, dbPool); err != nil { panic(err) } workers := river.NewWorkers() river.AddWorker(workers, &CancellingWorker{}) riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}), Queues: map[string]river.QueueConfig{ river.DefaultQueue: {MaxWorkers: 10}, }, Workers: workers, }) if err != nil { panic(err) } // Not strictly needed, but used to help this test wait until job is worked. subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCancelled) defer subscribeCancel() if err := riverClient.Start(ctx); err != nil { panic(err) } if _, err = riverClient.Insert(ctx, CancellingArgs{ShouldCancel: true}, nil); err != nil { panic(err) } waitForNJobs(subscribeChan, 1) if err := riverClient.Stop(ctx); err != nil { panic(err) } }
Output: cancelling job
Example (JobSnooze) ¶
Example_jobSnooze demonstrates how to snooze a job from within Work using JobSnooze. The job will be run again after 5 minutes and the snooze attempt will increment the job's max attempts, ensuring that one can snooze as many times as desired.
package main import ( "context" "fmt" "log/slog" "time" "github.com/jackc/pgx/v5/pgxpool" "github.com/riverqueue/river" "github.com/riverqueue/river/internal/riverinternaltest" "github.com/riverqueue/river/internal/util/slogutil" "github.com/riverqueue/river/riverdriver/riverpgxv5" ) type SnoozingArgs struct { ShouldSnooze bool } func (args SnoozingArgs) Kind() string { return "Snoozing" } type SnoozingWorker struct { river.WorkerDefaults[SnoozingArgs] } func (w *SnoozingWorker) Work(ctx context.Context, j *river.Job[SnoozingArgs]) error { if j.Args.ShouldSnooze { fmt.Println("snoozing job for 5 minutes") return river.JobSnooze(5 * time.Minute) } return nil } // Example_jobSnooze demonstrates how to snooze a job from within Work using // JobSnooze. The job will be run again after 5 minutes and the snooze attempt // will increment the job's max attempts, ensuring that one can snooze as many // times as desired. func main() { //nolint:dupl ctx := context.Background() dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_testdb_example")) if err != nil { panic(err) } defer dbPool.Close() // Required for the purpose of this test, but not necessary in real usage. if err := riverinternaltest.TruncateRiverTables(ctx, dbPool); err != nil { panic(err) } workers := river.NewWorkers() river.AddWorker(workers, &SnoozingWorker{}) riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}), Queues: map[string]river.QueueConfig{ river.DefaultQueue: {MaxWorkers: 10}, }, Workers: workers, }) if err != nil { panic(err) } // The subscription bits are not needed in real usage, but are used to make // sure the test waits until the job is worked. subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobSnoozed) defer subscribeCancel() if err := riverClient.Start(ctx); err != nil { panic(err) } if _, err = riverClient.Insert(ctx, SnoozingArgs{ShouldSnooze: true}, nil); err != nil { panic(err) } waitForNJobs(subscribeChan, 1) if err := riverClient.Stop(ctx); err != nil { panic(err) } }
Output: snoozing job for 5 minutes
Example (PeriodicJob) ¶
Example_periodicJob demonstrates the use of a periodic job.
package main import ( "context" "fmt" "log/slog" "time" "github.com/jackc/pgx/v5/pgxpool" "github.com/riverqueue/river" "github.com/riverqueue/river/internal/riverinternaltest" "github.com/riverqueue/river/internal/util/slogutil" "github.com/riverqueue/river/riverdriver/riverpgxv5" ) type PeriodicJobArgs struct{} // Kind is the unique string name for this job. func (PeriodicJobArgs) Kind() string { return "periodic" } // PeriodicJobWorker is a job worker for sorting strings. type PeriodicJobWorker struct { river.WorkerDefaults[PeriodicJobArgs] } func (w *PeriodicJobWorker) Work(ctx context.Context, job *river.Job[PeriodicJobArgs]) error { fmt.Printf("This job will run once immediately then approximately once every 15 minutes\n") return nil } // Example_periodicJob demonstrates the use of a periodic job. func main() { ctx := context.Background() dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_testdb_example")) if err != nil { panic(err) } defer dbPool.Close() // Required for the purpose of this test, but not necessary in real usage. if err := riverinternaltest.TruncateRiverTables(ctx, dbPool); err != nil { panic(err) } workers := river.NewWorkers() river.AddWorker(workers, &PeriodicJobWorker{}) riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}), PeriodicJobs: []*river.PeriodicJob{ river.NewPeriodicJob( river.PeriodicInterval(15*time.Minute), func() (river.JobArgs, *river.InsertOpts) { return PeriodicJobArgs{}, nil }, &river.PeriodicJobOpts{RunOnStart: true}, ), }, Queues: map[string]river.QueueConfig{ river.DefaultQueue: {MaxWorkers: 100}, }, Workers: workers, }) if err != nil { panic(err) } // Out of example scope, but used to make wait until a job is worked. subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted) defer subscribeCancel() // There's no need to explicitly insert a periodic job. One will be inserted // (and worked soon after) as the client starts up. if err := riverClient.Start(ctx); err != nil { panic(err) } waitForNJobs(subscribeChan, 1) if err := riverClient.Stop(ctx); err != nil { panic(err) } }
Output: This job will run once immediately then approximately once every 15 minutes
Example (Subscription) ¶
Example_subscription demonstrates the use of client subscriptions to receive events containing information about worked jobs.
package main import ( "context" "fmt" "log/slog" "time" "github.com/jackc/pgx/v5/pgxpool" "github.com/riverqueue/river" "github.com/riverqueue/river/internal/rivercommon" "github.com/riverqueue/river/internal/riverinternaltest" "github.com/riverqueue/river/internal/util/slogutil" "github.com/riverqueue/river/riverdriver/riverpgxv5" ) type SubscriptionArgs struct { Cancel bool `json:"cancel"` Fail bool `json:"fail"` } func (SubscriptionArgs) Kind() string { return "subscription" } type SubscriptionWorker struct { river.WorkerDefaults[SubscriptionArgs] } func (w *SubscriptionWorker) Work(ctx context.Context, job *river.Job[SubscriptionArgs]) error { switch { case job.Args.Cancel: return river.JobCancel(fmt.Errorf("cancelling job")) case job.Args.Fail: return fmt.Errorf("failing job") } return nil } // Example_subscription demonstrates the use of client subscriptions to receive // events containing information about worked jobs. func main() { ctx := context.Background() dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_testdb_example")) if err != nil { panic(err) } defer dbPool.Close() // Required for the purpose of this test, but not necessary in real usage. if err := riverinternaltest.TruncateRiverTables(ctx, dbPool); err != nil { panic(err) } workers := river.NewWorkers() river.AddWorker(workers, &SubscriptionWorker{}) riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: 9}), // Suppress logging so example output is cleaner (9 > slog.LevelError). Queues: map[string]river.QueueConfig{ river.DefaultQueue: {MaxWorkers: 100}, }, Workers: workers, }) if err != nil { panic(err) } // Subscribers tell the River client the kinds of events they'd like to receive. completedChan, completedSubscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted) defer completedSubscribeCancel() // Multiple simultaneous subscriptions are allowed. failedChan, failedSubscribeCancel := riverClient.Subscribe(river.EventKindJobFailed) defer failedSubscribeCancel() otherChan, otherSubscribeCancel := riverClient.Subscribe(river.EventKindJobCancelled, river.EventKindJobSnoozed) defer otherSubscribeCancel() if err := riverClient.Start(ctx); err != nil { panic(err) } // Insert one job for each subscription above: one to succeed, one to fail, // and one that's cancelled that'll arrive on the "other" channel. _, err = riverClient.Insert(ctx, SubscriptionArgs{}, nil) if err != nil { panic(err) } _, err = riverClient.Insert(ctx, SubscriptionArgs{Fail: true}, nil) if err != nil { panic(err) } _, err = riverClient.Insert(ctx, SubscriptionArgs{Cancel: true}, nil) if err != nil { panic(err) } waitForJob := func(subscribeChan <-chan *river.Event) { select { case event := <-subscribeChan: if event == nil { fmt.Printf("Channel is closed\n") return } fmt.Printf("Got job with state: %s\n", event.Job.State) case <-time.After(rivercommon.WaitTimeout()): panic("timed out waiting for job") } } waitForJob(completedChan) waitForJob(failedChan) waitForJob(otherChan) if err := riverClient.Stop(ctx); err != nil { panic(err) } fmt.Printf("Client stopped\n") // Try waiting again, but none of these work because stopping the client // closed all subscription channels automatically. waitForJob(completedChan) waitForJob(failedChan) waitForJob(otherChan) }
Output: Got job with state: completed Got job with state: retryable Got job with state: cancelled Client stopped Channel is closed Channel is closed Channel is closed
Example (UniqueJob) ¶
Example_uniqueJob demonstrates the use of a job with custom job-specific insertion options.
package main import ( "context" "fmt" "log/slog" "time" "github.com/jackc/pgx/v5/pgxpool" "github.com/riverqueue/river" "github.com/riverqueue/river/internal/riverinternaltest" "github.com/riverqueue/river/internal/util/slogutil" "github.com/riverqueue/river/riverdriver/riverpgxv5" ) // Account represents a minimal account including recent expenditures and a // remaining total. type Account struct { RecentExpenditures int AccountTotal int } // Map of account ID -> account. var allAccounts = map[int]Account{ //nolint:gochecknoglobals 1: {RecentExpenditures: 100, AccountTotal: 1_000}, 2: {RecentExpenditures: 999, AccountTotal: 1_000}, } type ReconcileAccountArgs struct { AccountID int `json:"account_id"` } func (ReconcileAccountArgs) Kind() string { return "reconcile_account" } // InsertOpts returns custom insert options that every job of this type will // inherit, including unique options. func (ReconcileAccountArgs) InsertOpts() river.InsertOpts { return river.InsertOpts{ UniqueOpts: river.UniqueOpts{ ByArgs: true, ByPeriod: 24 * time.Hour, }, } } type ReconcileAccountWorker struct { river.WorkerDefaults[ReconcileAccountArgs] } func (w *ReconcileAccountWorker) Work(ctx context.Context, job *river.Job[ReconcileAccountArgs]) error { account := allAccounts[job.Args.AccountID] account.AccountTotal -= account.RecentExpenditures account.RecentExpenditures = 0 fmt.Printf("Reconciled account %d; new total: %d\n", job.Args.AccountID, account.AccountTotal) return nil } // Example_uniqueJob demonstrates the use of a job with custom // job-specific insertion options. func main() { ctx := context.Background() dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_testdb_example")) if err != nil { panic(err) } defer dbPool.Close() // Required for the purpose of this test, but not necessary in real usage. if err := riverinternaltest.TruncateRiverTables(ctx, dbPool); err != nil { panic(err) } workers := river.NewWorkers() river.AddWorker(workers, &ReconcileAccountWorker{}) riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}), Queues: map[string]river.QueueConfig{ river.DefaultQueue: {MaxWorkers: 100}, }, Workers: workers, }) if err != nil { panic(err) } // Out of example scope, but used to make wait until a job is worked. subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted) defer subscribeCancel() if err := riverClient.Start(ctx); err != nil { panic(err) } // First job insertion for account 1. _, err = riverClient.Insert(ctx, ReconcileAccountArgs{AccountID: 1}, nil) if err != nil { panic(err) } // Job is inserted a second time, but it doesn't matter because its unique // args cause the insertion to be skipped because it's meant to only run // once per account per 24 hour period. _, err = riverClient.Insert(ctx, ReconcileAccountArgs{AccountID: 1}, nil) if err != nil { panic(err) } // Cheat a little by waiting for the first job to come back so we can // guarantee that this example's output comes out in order. waitForNJobs(subscribeChan, 1) // Because the job is unique ByArgs, another job for account 2 is allowed. _, err = riverClient.Insert(ctx, ReconcileAccountArgs{AccountID: 2}, nil) if err != nil { panic(err) } waitForNJobs(subscribeChan, 1) if err := riverClient.Stop(ctx); err != nil { panic(err) } }
Output: Reconciled account 1; new total: 900 Reconciled account 2; new total: 1
Example (WorkFunc) ¶
Example_workFunc demonstrates the use of river.WorkFunc, which can be used to easily add a worker with only a function instead of having to implement a full worker struct.
package main import ( "context" "fmt" "log/slog" "github.com/jackc/pgx/v5/pgxpool" "github.com/riverqueue/river" "github.com/riverqueue/river/internal/riverinternaltest" "github.com/riverqueue/river/internal/util/slogutil" "github.com/riverqueue/river/riverdriver/riverpgxv5" ) type WorkFuncArgs struct { Message string `json:"message"` } func (WorkFuncArgs) Kind() string { return "work_func" } // Example_workFunc demonstrates the use of river.WorkFunc, which can be used to // easily add a worker with only a function instead of having to implement a // full worker struct. func main() { ctx := context.Background() dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_testdb_example")) if err != nil { panic(err) } defer dbPool.Close() // Required for the purpose of this test, but not necessary in real usage. if err := riverinternaltest.TruncateRiverTables(ctx, dbPool); err != nil { panic(err) } workers := river.NewWorkers() river.AddWorker(workers, river.WorkFunc(func(ctx context.Context, j *river.Job[WorkFuncArgs]) error { fmt.Printf("Message: %s", j.Args.Message) return nil })) riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}), Queues: map[string]river.QueueConfig{ river.DefaultQueue: {MaxWorkers: 100}, }, Workers: workers, }) if err != nil { panic(err) } // Out of example scope, but used to make wait until a job is worked. subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted) defer subscribeCancel() if err := riverClient.Start(ctx); err != nil { panic(err) } _, err = riverClient.Insert(ctx, WorkFuncArgs{ Message: "hello from a function!", }, nil) if err != nil { panic(err) } waitForNJobs(subscribeChan, 1) if err := riverClient.Stop(ctx); err != nil { panic(err) } }
Output: Message: hello from a function!
Index ¶
- Constants
- func AddWorker[T JobArgs](workers *Workers, worker Worker[T])
- func AddWorkerSafely[T JobArgs](workers *Workers, worker Worker[T]) error
- func JobCancel(err error) error
- func JobSnooze(duration time.Duration) error
- type AttemptError
- type Client
- func (c *Client[TTx]) Insert(ctx context.Context, args JobArgs, opts *InsertOpts) (*JobRow, error)
- func (c *Client[TTx]) InsertMany(ctx context.Context, params []InsertManyParams) (int64, error)
- func (c *Client[TTx]) InsertManyTx(ctx context.Context, tx TTx, params []InsertManyParams) (int64, error)
- func (c *Client[TTx]) InsertTx(ctx context.Context, tx TTx, args JobArgs, opts *InsertOpts) (*JobRow, error)
- func (c *Client[TTx]) Start(ctx context.Context) error
- func (c *Client[TTx]) Stop(ctx context.Context) error
- func (c *Client[TTx]) StopAndCancel(ctx context.Context) error
- func (c *Client[TTx]) Subscribe(kinds ...EventKind) (<-chan *Event, func())
- type ClientRetryPolicy
- type Config
- type DefaultClientRetryPolicy
- type ErrorHandler
- type ErrorHandlerResult
- type Event
- type EventKind
- type InsertManyParams
- type InsertOpts
- type Job
- type JobArgs
- type JobArgsWithInsertOpts
- type JobRow
- type JobState
- type JobStatistics
- type PeriodicJob
- type PeriodicJobConstructor
- type PeriodicJobOpts
- type PeriodicSchedule
- type QueueConfig
- type UniqueOpts
- type UnknownJobKindError
- type Worker
- type WorkerDefaults
- type Workers
Examples ¶
Constants ¶
const ( MaxQueueNumWorkers = 10_000 DefaultFetchCooldown = 100 * time.Millisecond MinFetchCooldown = 1 * time.Millisecond DefaultFetchPollInterval = 1 * time.Second MinFetchPollInterval = 1 * time.Millisecond DefaultJobTimeout = time.Minute DefaultMaxAttempts = rivercommon.DefaultMaxAttempts DefaultQueue = rivercommon.DefaultQueue DefaultPriority = rivercommon.DefaultPriority )
Variables ¶
This section is empty.
Functions ¶
func AddWorker ¶
AddWorker registers a Worker on the provided Workers bundle. Each Worker must be registered so that the Client knows it should handle a specific kind of job (as returned by its `Kind()` method).
Use by explicitly specifying a JobArgs type and then passing an instance of a worker for the same type:
river.AddWorker(workers, &SortWorker{})
Note that AddWorker can panic in some situations, such as if the worker is already registered or if its configuration is otherwise invalid. This default probably makes sense for most applications because you wouldn't want to start an application with invalid hardcoded runtime configuration. If you want to avoid panics, use AddWorkerSafely instead.
func AddWorkerSafely ¶
AddWorkerSafely registers a worker on the provided Workers bundle. Unlike AddWorker, AddWorkerSafely does not panic and instead returns an error if the worker is already registered or if its configuration is invalid.
Use by explicitly specifying a JobArgs type and then passing an instance of a worker for the same type:
river.AddWorkerSafely[SortArgs](workers, &SortWorker{}).
func JobCancel ¶
JobCancel wraps err and can be returned from a Worker's Work method to cancel the job at the end of execution. Regardless of whether or not the job has any remaining attempts, this will ensure the job does not execute again.
func JobSnooze ¶
JobSnooze can be returned from a Worker's Work method to cause the job to be tried again after the specified duration. This also has the effect of incrementing the job's MaxAttempts by 1, meaning that jobs can be repeatedly snoozed without ever being discarded.
Panics if duration is < 0.
Types ¶
type AttemptError ¶
type Client ¶
type Client[TTx any] struct { // contains filtered or unexported fields }
Client is a single isolated instance of River. Your application may use multiple instances operating on different databases or Postgres schemas within a single database.
func NewClient ¶
NewClient creates a new Client with the given database driver and configuration.
Currently only one driver is supported, which is Pgx v5. See package riverpgxv5.
The function takes a generic parameter TTx representing a transaction type, but it can be omitted because it'll generally always be inferred from the driver. For example:
import "github.com/riverqueue/river" import "github.com/riverqueue/river/riverdriver/riverpgxv5" ... dbPool, err := pgxpool.New(ctx, os.Getenv("DATABASE_URL")) if err != nil { // handle error } defer dbPool.Close() riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ ... }) if err != nil { // handle error }
func (*Client[TTx]) Insert ¶
Insert inserts a new job with the provided args. Job opts can be used to override any defaults that may have been provided by an implementation of JobArgsWithInsertOpts.InsertOpts, as well as any global defaults. The provided context is used for the underlying Postgres insert and can be used to cancel the operation or apply a timeout.
jobRow, err := client.Insert(insertCtx, MyArgs{}, nil) if err != nil { // handle error }
func (*Client[TTx]) InsertMany ¶
InsertMany inserts many jobs at once using Postgres' `COPY FROM` mechanism, making the operation quite fast and memory efficient. Each job is inserted as an InsertManyParams tuple, which takes job args along with an optional set of insert options, which override insert options provided by an JobArgsWithInsertOpts.InsertOpts implementation or any client-level defaults. The provided context is used for the underlying Postgres inserts and can be used to cancel the operation or apply a timeout.
count, err := client.InsertMany(ctx, []river.InsertManyParams{ {Args: BatchInsertArgs{}}, {Args: BatchInsertArgs{}, InsertOpts: &river.InsertOpts{Priority: 3}}, }) if err != nil { // handle error }
func (*Client[TTx]) InsertManyTx ¶
func (c *Client[TTx]) InsertManyTx(ctx context.Context, tx TTx, params []InsertManyParams) (int64, error)
InsertManyTx inserts many jobs at once using Postgres' `COPY FROM` mechanism, making the operation quite fast and memory efficient. Each job is inserted as an InsertManyParams tuple, which takes job args along with an optional set of insert options, which override insert options provided by an JobArgsWithInsertOpts.InsertOpts implementation or any client-level defaults. The provided context is used for the underlying Postgres inserts and can be used to cancel the operation or apply a timeout.
count, err := client.InsertManyTx(ctx, tx, []river.InsertManyParams{ {Args: BatchInsertArgs{}}, {Args: BatchInsertArgs{}, InsertOpts: &river.InsertOpts{Priority: 3}}, }) if err != nil { // handle error }
This variant lets a caller insert jobs atomically alongside other database changes. An inserted job isn't visible to be worked until the transaction commits, and if the transaction rolls back, so too is the inserted job.
func (*Client[TTx]) InsertTx ¶
func (c *Client[TTx]) InsertTx(ctx context.Context, tx TTx, args JobArgs, opts *InsertOpts) (*JobRow, error)
InsertTx inserts a new job with the provided args on the given transaction. Job opts can be used to override any defaults that may have been provided by an implementation of JobArgsWithInsertOpts.InsertOpts, as well as any global defaults. The provided context is used for the underlying Postgres insert and can be used to cancel the operation or apply a timeout.
jobRow, err := client.InsertTx(insertCtx, tx, MyArgs{}, nil) if err != nil { // handle error }
This variant lets a caller insert jobs atomically alongside other database changes. An inserted job isn't visible to be worked until the transaction commits, and if the transaction rolls back, so too is the inserted job.
func (*Client[TTx]) Start ¶
Start starts the client's job fetching and working loops. Once this is called, the client will run in a background goroutine until stopped. All jobs are run with a context inheriting from the provided context, but with a timeout deadline applied based on the job's settings.
A graceful shutdown stops fetching new jobs but allows any previously fetched jobs to complete. This can be initiated with the Stop method.
A more abrupt shutdown can be achieved by either cancelling the provided context or by calling StopAndCancel. This will not only stop fetching new jobs, but will also cancel the context for any currently-running jobs. If using StopAndCancel, there's no need to also call Stop.
func (*Client[TTx]) Stop ¶
Stop performs a graceful shutdown of the Client. It signals all producers to stop fetching new jobs and waits for any fetched or in-progress jobs to complete before exiting.
There's no need to call this method if a hard stop has already been initiated by cancelling the context passed to Start or by calling StopAndCancel.
func (*Client[TTx]) StopAndCancel ¶
StopAndCancel shuts down the client and cancels all work in progress. It is a more aggressive stop than Stop because the contexts for any in-progress jobs are cancelled. However, it still waits for jobs to complete before returning, even though their contexts are cancelled.
This can also be initiated by cancelling the context passed to Run. There is no need to call this method if the context passed to Run is cancelled instead.
func (*Client[TTx]) Subscribe ¶
Subscribe subscribes to the provided kinds of events that occur within the client, like EventKindJobCompleted for when a job completes.
Returns a channel over which to receive events along with a cancel function that can be used to cancel and tear down resources associated with the subscription. It's recommended but not necessary to invoke the cancel function. Resources will be freed when the client stops in case it's not.
The event channel is buffered and sends on it are non-blocking. Consumers must process events in a timely manner or it's possible for events to be dropped. Any slow operations performed in a response to a receipt (e.g. persisting to a database) should be made asynchronous to avoid event loss.
Callers must specify the kinds of events they're interested in. This allows for forward compatibility in case new kinds of events are added in future versions. If new event kinds are added, callers will have to explicitly add them to their requested list and ensure they can be handled correctly.
type ClientRetryPolicy ¶
type ClientRetryPolicy interface { // NextRetry calculates when the next retry for a failed job should take place // given when it was last attempted and its number of attempts, or any other // of the job's properties a user-configured retry policy might want to // consider. NextRetry(job *JobRow) time.Time }
ClientRetryPolicy is an interface that can be implemented to provide a retry policy for how River deals with failed jobs at the client level (when a worker does not define an override for `NextRetry`). Jobs are scheduled to be retried in the future up until they've reached the job's max attempts, at which pointed they're set as discarded.
The ClientRetryPolicy does not have access to generics and operates on the raw JobRow struct with encoded args.
type Config ¶
type Config struct { // AdvisoryLockPrefix is a configurable 32-bit prefix that River will use // when generating any key to acquire a Postgres advisory lock. All advisory // locks share the same 64-bit number space, so this allows a calling // application to guarantee that a River advisory lock will never conflict // with one of its own by cordoning each type to its own prefix. // // If this value isn't set, River defaults to generating key hashes across // the entire 64-bit advisory lock number space, which is large enough that // conflicts are exceedingly unlikely. If callers don't strictly need this // option then it's recommended to leave it unset because the prefix leaves // only 32 bits of number space for advisory lock hashes, so it makes // internally conflicting River-generated keys more likely. AdvisoryLockPrefix int32 // CancelledJobRetentionPeriod is the amount of time to keep cancelled jobs // around before they're removed permanently. // // Defaults to 24 hours. CancelledJobRetentionPeriod time.Duration // CompletedJobRetentionPeriod is the amount of time to keep completed jobs // around before they're removed permanently. // // Defaults to 24 hours. CompletedJobRetentionPeriod time.Duration // DiscardedJobRetentionPeriod is the amount of time to keep cancelled jobs // around before they're removed permanently. // // Defaults to 7 days. DiscardedJobRetentionPeriod time.Duration // ErrorHandler can be configured to be invoked in case of an error or panic // occurring in a job. This is often useful for logging and exception // tracking, but can also be used to customize retry behavior. ErrorHandler ErrorHandler // FetchCooldown is the minimum amount of time to wait between fetches of new // jobs. Jobs will only be fetched *at most* this often, but if no new jobs // are coming in via LISTEN/NOTIFY then feches may be delayed as long as // FetchPollInterval. // // Throughput is limited by this value. // // Defaults to 100 ms. FetchCooldown time.Duration // FetchPollInterval is the amount of time between periodic fetches for new // jobs. Typically new jobs will be picked up ~immediately after insert via // LISTEN/NOTIFY, but this provides a fallback. // // Defaults to 1 second. FetchPollInterval time.Duration // JobTimeout is the maximum amount of time a job is allowed to run before its // context is cancelled. A timeout of zero means DefaultJobTimeout will be // used, whereas a value of -1 means the job's context will not be cancelled // unless the Client is shutting down. JobTimeout time.Duration // Logger is the structured logger to use for logging purposes. If none is // specified, logs will be emitted to STDOUT with messages at warn level // or higher. Logger *slog.Logger // PeriodicJobs are a set of periodic jobs to run at the specified intervals // in the client. PeriodicJobs []*PeriodicJob // Queues is a list of queue names for this client to operate on along with // configuration for the queue like the maximum number of workers to run for // each queue. // // This field may be omitted for a program that's only queueing jobs rather // than working them. If it's specified, then Workers must also be given. Queues map[string]QueueConfig // ReindexerSchedule is the schedule for running the reindexer. If nil, the // reindexer will run at midnight UTC every day. ReindexerSchedule PeriodicSchedule // RescueStuckJobsAfter is the amount of time a job can be running before it // is considered stuck. A stuck job which has not yet reached its max attempts // will be scheduled for a retry, while one which has exhausted its attempts // will be discarded. This prevents jobs from being stuck forever if a worker // crashes or is killed. // // Note that this can result in repeat or duplicate execution of a job that is // not actually stuck but is still working. The value should be set higher // than the maximum duration you expect your jobs to run. Setting a value too // low will result in more duplicate executions, whereas too high of a value // will result in jobs being stuck for longer than necessary before they are // retried. // // RescueStuckJobsAfter must be greater than JobTimeout. Otherwise, jobs // would become eligible for rescue while they're still running. // // Defaults to 1 hour, or in cases where JobTimeout has been configured and // is greater than 1 hour, JobTimeout + 1 hour. RescueStuckJobsAfter time.Duration // RetryPolicy is a configurable retry policy for the client. // // Defaults to DefaultRetryPolicy. RetryPolicy ClientRetryPolicy // Workers is a bundle of registered job workers. // // This field may be omitted for a program that's only enqueueing jobs // rather than working them, but if it is configured the client can validate // ahead of time that a worker is properly registered for an inserted job. // (i.e. That it wasn't forgotten by accident.) Workers *Workers // contains filtered or unexported fields }
Config is the configuration for a Client.
type DefaultClientRetryPolicy ¶
type DefaultClientRetryPolicy struct {
// contains filtered or unexported fields
}
River's default retry policy.
func (*DefaultClientRetryPolicy) NextRetry ¶
func (p *DefaultClientRetryPolicy) NextRetry(job *JobRow) time.Time
NextRetry gets the next retry given for the given job, accounting for when it was last attempted and what attempt number that was. Reschedules using a basic exponential backoff of `ATTEMPT^4`, so after the first failure a new try will be scheduled in 1 seconds, 16 seconds after the second, 1 minute and 21 seconds after the third, etc.
In order to avoid penalizing jobs that are snoozed, the number of errors is used instead of the attempt count. This means that snoozing a job (even repeatedly) will not lead to a future error having a longer than expected retry delay.
type ErrorHandler ¶
type ErrorHandler interface { // HandleError is invoked in case of an error occurring in a job. // // Context is descended from the one used to start the River client that // worked the job. HandleError(ctx context.Context, job *JobRow, err error) *ErrorHandlerResult // HandlePanic is invoked in case of a panic occurring in a job. // // Context is descended from the one used to start the River client that // worked the job. HandlePanic(ctx context.Context, job *JobRow, panicVal any) *ErrorHandlerResult }
ErrorHandler provides an interface that will be invoked in case of an error or panic occurring in the job. This is often useful for logging and exception tracking, but can also be used to customize retry behavior.
type ErrorHandlerResult ¶
type ErrorHandlerResult struct { // SetCancelled can be set to true to fail the job immediately and // permanently. By default it'll continue to follow the configured retry // schedule. SetCancelled bool }
type Event ¶
type Event struct { // Kind is the kind of event. Receivers should read this field and respond // accordingly. Subscriptions will only receive event kinds that they // requested when creating a subscription with Subscribe. Kind EventKind // Job contains job-related information. Job *JobRow // JobStats are statistics about the run of a job. JobStats *JobStatistics }
Event wraps an event that occurred within a River client, like a job being completed.
type EventKind ¶
type EventKind string
EventKind is a kind of event to subscribe to from a client.
const ( // EventKindJobCancelled occurs when a job is cancelled. EventKindJobCancelled EventKind = "job_cancelled" // EventKindJobCompleted occurs when a job is completed. EventKindJobCompleted EventKind = "job_completed" // EventKindJobFailed occurs when a job fails. Occurs both when a job fails // and will be retried and when a job fails for the last time and will be // discarded. Callers can use job fields like `Attempt` and `State` to // differentiate each type of occurrence. EventKindJobFailed EventKind = "job_failed" // EventKindJobSnoozed occurs when a job is snoozed. EventKindJobSnoozed EventKind = "job_snoozed" )
type InsertManyParams ¶
type InsertManyParams struct { // Args are the arguments of the job to insert. Args JobArgs // InsertOpts are insertion options for this job. InsertOpts *InsertOpts }
InsertManyParams encapsulates a single job combined with insert options for use with batch insertion.
type InsertOpts ¶
type InsertOpts struct { // MaxAttempts is the maximum number of total attempts (including both the // original run and all retries) before a job is abandoned and set as // discarded. MaxAttempts int // Priority is the priority of the job, with 1 being the highest priority and // 4 being the lowest. When fetching available jobs to work, the highest // priority jobs will always be fetched before any lower priority jobs are // fetched. Note that if your workers are swamped with more high-priority jobs // then they can handle, lower priority jobs may not be fetched. // // Defaults to DefaultPriority. Priority int // Queue is the name of the job queue in which to insert the job. // // Defaults to DefaultQueue. Queue string // ScheduledAt is a time in future at which to schedule the job (i.e. in // cases where it shouldn't be run immediately). The job is guaranteed not // to run before this time, but may run slightly after depending on the // number of other scheduled jobs and how busy the queue is. // // Use of this option generally only makes sense when passing options into // Insert rather than when a job args struct is implementing // JobArgsWithInsertOpts, however, it will work in both cases. ScheduledAt time.Time // Tags are an arbitrary list of keywords to add to the job. They have no // functional behavior and are meant entirely as a user-specified construct // to help group and categorize jobs. // // If tags are specified from both a job args override and from options on // Insert, the latter takes precedence. Tags are not merged. Tags []string // UniqueOpts returns options relating to job uniqueness. An empty struct // avoids setting any worker-level unique options. UniqueOpts UniqueOpts }
InsertOpts are optional settings for a new job which can be provided at job insertion time. These will override any default InsertOpts settings provided by JobArgsWithInsertOpts, as well as any global defaults.
type Job ¶
Job represents a single unit of work, holding both the arguments and information for a job with args of type T.
func JobCompleteTx ¶
func JobCompleteTx[TDriver riverdriver.Driver[TTx], TTx any, TArgs JobArgs](ctx context.Context, tx TTx, job *Job[TArgs]) (*Job[TArgs], error)
JobCompleteTx marks the job as completed as part of transaction tx. If tx is rolled back, the completion will be as well.
The function needs to know the type of the River database driver, which is the same as the one in use by Client, but the other generic parameters can be inferred. An invocation should generally look like:
_, err := river.JobCompleteTx[*riverpgxv5.Driver](ctx, tx, job) if err != nil { // handle error }
Returns the updated, completed job.
type JobArgs ¶
type JobArgs interface { // Kind is a string that uniquely identifies the type of job. This must be // provided on your job arguments struct. Kind() string }
JobArgs is an interface that represents the arguments for a job of type T. These arguments are serialized into JSON and stored in the database.
type JobArgsWithInsertOpts ¶
type JobArgsWithInsertOpts interface { // InsertOpts returns options for all jobs of this job type, overriding any // system defaults. These can also be overridden at insertion time. InsertOpts() InsertOpts }
JobArgsWithInsertOpts is an extra interface that a job may implement on top of JobArgs to provide insertion-time options for all jobs of this type.
type JobRow ¶
type JobRow struct { // ID of the job. Generated as part of a Postgres sequence and generally // ascending in nature, but there may be gaps in it as transactions roll // back. ID int64 // Attempt is the attempt number of the job. Jobs are inserted at 0, the // number is incremented to 1 the first time work its worked, and may // increment further if it's either snoozed or errors. Attempt int // AttemptedAt is the time that the job was last worked. Starts out as `nil` // on a new insert. AttemptedAt *time.Time // AttemptedBy is the set of worker IDs that have worked this job. A worker // ID differs between different programs, but is shared by all executors // within any given one. (i.e. Different Go processes have different IDs, // but IDs are shared within any given process.) A process generates a new // ULID (an ordered UUID) worker ID when it starts up. AttemptedBy []string // CreatedAt is when the job record was created. CreatedAt time.Time // EncodedArgs is the job's JobArgs encoded as JSON. EncodedArgs []byte // Errors is a set of errors that occurred when the job was worked, one for // each attempt. Ordered from earliest error to the latest error. Errors []AttemptError // FinalizedAt is the time at which the job was "finalized", meaning it was // either completed successfully or errored for the last time such that // it'll no longer be retried. FinalizedAt *time.Time // Kind uniquely identifies the type of job and instructs which worker // should work it. It is set at insertion time via `Kind()` on the // `JobArgs`. Kind string // MaxAttempts is the maximum number of attempts that the job will be tried // before it errors for the last time and will no longer be worked. // // Extracted (in order of precedence) from job-specific InsertOpts // on Insert, from the worker level InsertOpts from JobArgsWithInsertOpts, // or from a client's default value. MaxAttempts int // Priority is the priority of the job, with 1 being the highest priority and // 4 being the lowest. When fetching available jobs to work, the highest // priority jobs will always be fetched before any lower priority jobs are // fetched. Note that if your workers are swamped with more high-priority jobs // then they can handle, lower priority jobs may not be fetched. Priority int // Queue is the name of the queue where the job will be worked. Queues can // be configured independently and be used to isolate jobs. // // Extracted from either specific InsertOpts on Insert, or InsertOpts from // JobArgsWithInsertOpts, or a client's default value. Queue string // ScheduledAt is when the job is scheduled to become available to be // worked. Jobs default to running immediately, but may be scheduled // for the future when they're inserted. They may also be scheduled for // later because they were snoozed or because they errored and have // additional retry attempts remaining. ScheduledAt time.Time // State is the state of job like `available` or `completed`. Jobs are // `available` when they're first inserted. State JobState // Tags are an arbitrary list of keywords to add to the job. They have no // functional behavior and are meant entirely as a user-specified construct // to help group and categorize jobs. Tags []string // contains filtered or unexported fields }
JobRow contains the properties of a job that are persisted to the database. Use of `Job[T]` will generally be preferred in user-facing code like worker interfaces.
type JobState ¶
type JobState string
const ( JobStateAvailable JobState = JobState(dbsqlc.JobStateAvailable) JobStateCancelled JobState = JobState(dbsqlc.JobStateCancelled) JobStateCompleted JobState = JobState(dbsqlc.JobStateCompleted) JobStateDiscarded JobState = JobState(dbsqlc.JobStateDiscarded) JobStateRetryable JobState = JobState(dbsqlc.JobStateRetryable) JobStateRunning JobState = JobState(dbsqlc.JobStateRunning) JobStateScheduled JobState = JobState(dbsqlc.JobStateScheduled) )
type JobStatistics ¶
type JobStatistics struct { CompleteDuration time.Duration // Time it took to set the job completed, discarded, or errored. QueueWaitDuration time.Duration // Time the job spent waiting in available state before starting execution. RunDuration time.Duration // Time job spent running (measured around job worker.) }
JobStatistics contains information about a single execution of a job.
type PeriodicJob ¶
type PeriodicJob struct {
// contains filtered or unexported fields
}
PeriodicJob is a configuration for a periodic job.
func NewPeriodicJob ¶
func NewPeriodicJob(scheduleFunc PeriodicSchedule, constructorFunc PeriodicJobConstructor, opts *PeriodicJobOpts) *PeriodicJob
NewPeriodicJob returns a new PeriodicJob given a schedule and a constructor function.
The schedule returns a time until the next time the periodic job should run. The helper PeriodicInterval is available for jobs that should run on simple, fixed intervals (e.g. every 15 minutes), and a custom schedule or third party cron package can be used for more complex scheduling (see the cron example). The constructor function is invoked each time a periodic job's schedule elapses, returning job arguments to insert along with optional insertion options.
The periodic job scheduler is approximate and doesn't guarantee strong durability. It's started by the elected leader in a River cluster, and each periodic job is assigned an initial run time when that occurs. New run times are scheduled each time a job's target run time is reached and a new job inserted. However, each scheduler only retains in-memory state, so anytime a process quits or a new leader is elected, the whole process starts over without regard for the state of the last scheduler. The RunOnStart option can be used as a hedge to make sure that jobs with long run durations are guaranteed to occasionally run.
type PeriodicJobConstructor ¶
type PeriodicJobConstructor func() (JobArgs, *InsertOpts)
PeriodicJobConstructor is a function that gets called each time the paired PeriodicSchedule is triggered.
A constructor must never block. It may return nil to indicate that no job should be inserted.
type PeriodicJobOpts ¶
type PeriodicJobOpts struct { // RunOnStart can be used to indicate that a periodic job should insert an // initial job as a new scheduler is started. This can be used as a hedge // for jobs with longer scheduled durations that may not get to expiry // before a new scheduler is elected. RunOnStart bool }
PeriodicJobOpts are options for a periodic job.
type PeriodicSchedule ¶
type PeriodicSchedule interface { // Next returns the next time at which the job should be run given the // current time. Next(time.Time) time.Time }
PeriodicSchedule is a schedule for a periodic job. Periodic jobs should generally have an interval of at least 1 minute, and never less than one second.
func PeriodicInterval ¶
func PeriodicInterval(interval time.Duration) PeriodicSchedule
PeriodicInterval returns a simple PeriodicSchedule that runs at the given interval.
type QueueConfig ¶
type QueueConfig struct { // MaxWorkers is the maximum number of workers to run for the queue, or put // otherwise, the maximum parallelism to run. // // This is the maximum number of workers within this particular client // instance, but note that it doesn't control the total number of workers // across parallel processes. Installations will want to calculate their // total number by multiplying this number by the number of parallel nodes // running River clients configured to the same database and queue. // // Requires a minimum of 1, and a maximum of 10,000. MaxWorkers int }
QueueConfig contains queue-specific configuration.
type UniqueOpts ¶
type UniqueOpts struct { // ByArgs indicates that uniqueness should be enforced for any specific // instance of encoded args for a job. // // Default is false, meaning that as long as any other unique property is // enabled, uniqueness will be enforced for a kind regardless of input args. ByArgs bool // ByPeriod defines uniqueness within a given period. On an insert time is // rounded down to the nearest multiple of the given period, and a job is // only inserted if there isn't an existing job that will run between then // and the next multiple of the period. // // Default is no unique period, meaning that as long as any other unique // property is enabled, uniqueness will be enforced across all jobs of the // kind in the database, regardless of when they were scheduled. ByPeriod time.Duration // ByQueue indicates that uniqueness should be enforced within each queue. // // Default is false, meaning that as long as any other unique property is // enabled, uniqueness will be enforced for a kind across all queues. ByQueue bool // ByState indicates that uniqueness should be enforced across any of the // states in the given set. For example, if the given states were // `(scheduled, running)` then a new job inserted as `scheduled` would be // not be inserted by virtue of it being not unique, but a new job marked as // `available` could be inserted. // // Unlike other unique options, ByState gets a default when it's not set for // user convenience. The default is equivalent to: // // ByState: []river.JobState{river.JobStateAvailable, river.JobStateCompleted, river.JobStateRunning, river.JobStateRetryable, river.JobStateScheduled} // // With this setting, any jobs of the same kind that have been completed or // discarded, but not yet cleaned out by the system, won't count towards the // uniqueness of a new insert. ByState []JobState }
UniqueOpts contains parameters for uniqueness for a job.
When the options struct is uninitialized (its zero value) no uniqueness at is enforced. As each property is initialized, it's added as a dimension on the uniqueness matrix, and with any property on, the job's kind always counts toward uniqueness.
So for example, if only ByQueue is on, then for the given job kind, only a single instance is allowed in any given queue, regardless of other properties on the job. If both ByArgs and ByQueue are on, then for the given job kind, a single instance is allowed for each combination of args and queues. If either args or queue is changed on a new job, it's allowed to be inserted as a new job.
type UnknownJobKindError ¶
type UnknownJobKindError struct { // Kind is the string that was returned by the JobArgs Kind method. Kind string }
UnknownJobKindError is returned when a Client fetches and attempts to work a job that has not been registered on the Client's Workers bundle (using AddWorker).
func (*UnknownJobKindError) Error ¶
func (e *UnknownJobKindError) Error() string
Error returns the error string.
func (*UnknownJobKindError) Is ¶
func (e *UnknownJobKindError) Is(target error) bool
Is implements the interface used by errors.Is to determine if errors are equivalent. It returns true for any other UnknownJobKindError without regard to the Kind string so it is possible to detect this type of error with:
errors.Is(err, &UnknownJobKindError{})
type Worker ¶
type Worker[T JobArgs] interface { // NextRetry calculates when the next retry for a failed job should take // place given when it was last attempted and its number of attempts, or any // other of the job's properties a user-configured retry policy might want // to consider. // // Note that this method on a worker overrides any client-level retry policy. // To use the client-level retry policy, return an empty `time.Time{}` or // include WorkerDefaults to do this for you. NextRetry(job *Job[T]) time.Time // Timeout is the maximum amount of time the job is allowed to run before // its context is cancelled. A timeout of zero (the default) means the job // will inherit the Client-level timeout. A timeout of -1 means the job's // context will never time out. Timeout(job *Job[T]) time.Duration // Work performs the job and returns an error if the job failed. The context // will be configured with a timeout according to the worker settings and may // be cancelled for other reasons. // // If no error is returned, the job is assumed to have succeeded and will be // marked completed. // // It is important for any worker to respect context cancellation to enable // the client to respond to shutdown requests; there is no way to cancel a // running job that does not respect context cancellation, other than // terminating the process. Work(ctx context.Context, job *Job[T]) error }
Worker is an interface that can perform a job with args of type T. A typical Worker implementation will be a struct that embeds WorkerDefaults, implements `Kind()` and `Work()`, and optionally overrides other methods to provide job-specific configuration for all jobs of that type:
type SleepArgs struct { Duration time.Duration `json:"duration"` } func (SleepArgs) Kind() string { return "sleep" } type SleepWorker struct { WorkerDefaults[SleepArgs] } func (w *SleepWorker) Work(ctx context.Context, job *Job[SleepArgs]) error { select { case <-ctx.Done(): return ctx.Err() case <-time.After(job.Args.Duration): return nil } }
In addition to fulfilling the Worker interface, workers must be registered with the client using the AddWorker function.
func WorkFunc ¶
WorkFunc wraps a function to implement the Worker interface. A job args struct implementing JobArgs will still be required to specify a Kind.
For example:
river.AddWorker(workers, river.WorkFunc(func(ctx context.Context, j *river.Job[WorkFuncArgs]) error { fmt.Printf("Message: %s", j.Args.Message) return nil }))
type WorkerDefaults ¶
type WorkerDefaults[T JobArgs] struct{}
WorkerDefaults is an empty struct that can be embedded in your worker struct to make it fulfill the Worker interface with default values.
type Workers ¶
type Workers struct {
// contains filtered or unexported fields
}
Workers is a list of available job workers. A Worker must be registered for each type of Job to be handled.
Use the top-level AddWorker function combined with a Workers to register a worker.
func NewWorkers ¶
func NewWorkers() *Workers
NewWorkers initializes a new registry of available job workers.
Use the top-level AddWorker function combined with a Workers registry to register each available worker.
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
cmd
|
|
internal
|
|
baseservice
Package baseservice contains structs and initialization functions for "service-like" objects that provide commonly needed facilities so that they don't have to be redefined on every struct.
|
Package baseservice contains structs and initialization functions for "service-like" objects that provide commonly needed facilities so that they don't have to be redefined on every struct. |
riverinternaltest
Package riverinternaltest contains shared testing utilities for tests throughout the rest of the project.
|
Package riverinternaltest contains shared testing utilities for tests throughout the rest of the project. |
rivertest
Package rivertest contains shared testing utilities for tests throughout the rest of the project.
|
Package rivertest contains shared testing utilities for tests throughout the rest of the project. |
util/maputil
Package maputil contains helpers related to maps, usually ones that are generic-related.
|
Package maputil contains helpers related to maps, usually ones that are generic-related. |
util/sliceutil
Package sliceutil contains helpers related to slices, usually ones that are generic-related, and are broadly useful, but which the Go core team, in its infinite wisdom, has decided are too much power for the unwashed mashes, and therefore omitted from the utilities in `slices`.
|
Package sliceutil contains helpers related to slices, usually ones that are generic-related, and are broadly useful, but which the Go core team, in its infinite wisdom, has decided are too much power for the unwashed mashes, and therefore omitted from the utilities in `slices`. |
Package riverdriver exposes generic constructs to be implemented by specific drivers that wrap third party database packages, with the aim being to keep the main River interface decoupled from a specific database package so that other packages or other major versions of packages can be supported in future River versions.
|
Package riverdriver exposes generic constructs to be implemented by specific drivers that wrap third party database packages, with the aim being to keep the main River interface decoupled from a specific database package so that other packages or other major versions of packages can be supported in future River versions. |
riverdatabasesql
Module
|
|
riverpgxv5
Module
|
|
rivershared
module
|
|
Package rivertest contains test assertions that can be used in a project's tests to verify that certain actions occurred from the main river package.
|
Package rivertest contains test assertions that can be used in a project's tests to verify that certain actions occurred from the main river package. |
rivertype
module
|