Riverboat
Riverboat is the job queue used in openlane, based on the riverqueue project.
Usage
Jobs can be inserted into the job queue either from this server directly or
from any codebase with an Insert Only river client. All jobs are processed by
the riverboat server. Because jobs are committed to the database within a
transaction, we do not have to worry about dropped events.
Getting Started
This repo includes several Taskfiles to assist with
getting things running.
Dependencies
- Go 1.23+
- Docker (used for running Postgres and the river-ui)
- task
Starting the Server
The following will start up postgres, the river-ui, and the riverboat server:
task run-dev
Test Jobs
The test/ directory includes test jobs corresponding to the job types in
pkg/jobs.
- Start the riverboat server using task run-dev
- Run the test main, for example the email job:
  go run test/email/main.go
- The job should be inserted successfully and processed by river, and the
  email should be added to fixtures/email
Adding New Jobs
- New jobs should be added to the pkg/jobs directory in a new file; refer to
  the upstream docs for implementation details. The following is a stub job
  that can be copied to get you started.
package jobs

import (
	"context"

	"github.com/riverqueue/river"
	"github.com/rs/zerolog/log"
)

// ExampleArgs for the example worker to process the job
type ExampleArgs struct {
	// ExampleArg is an example argument
	ExampleArg string `json:"example_arg"`
}

// Kind satisfies the river.JobArgs interface
func (ExampleArgs) Kind() string { return "example" }

// ExampleWorker does all sorts of neat stuff
type ExampleWorker struct {
	river.WorkerDefaults[ExampleArgs]
	ExampleConfig
}

// ExampleConfig contains the configuration for the example worker
type ExampleConfig struct {
	// DevMode is a flag to enable dev mode so we don't actually send millions of carrier pigeons
	DevMode bool `koanf:"devMode" json:"devMode" jsonschema:"description=enable dev mode" default:"true"`
}

// Work satisfies the river.Worker interface for the example worker
func (w *ExampleWorker) Work(ctx context.Context, job *river.Job[ExampleArgs]) error {
	// do some work; the log call is a placeholder showing access to the job args
	log.Debug().Str("example_arg", job.Args.ExampleArg).Msg("processing example job")

	return nil
}
- Add a test for the new job; see email_test.go for an example. Additional
  helper functions are available; see the river test helpers for details.
- If there are configuration settings, add the worker to the Workers struct in
  pkg/river/config.go. This allows the config variables to be set via the
  koanf config setup. Once added, regenerate the config:
  task config:generate
- Register the worker by adding a river.AddWorkerSafely call to the
  createWorkers function in pkg/river/workers.go.
- Add a test job to the test/ directory by creating a new directory with a
  main.go file that inserts the job into the queue.
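When writing the args struct for a new job, it can help to see what is stored for it. River serializes job args to JSON, so the json tags on the struct determine the payload, and Kind determines the name the job is stored under. The following is a minimal standalone sketch of that round trip using only the standard library. The encodeArgs and decodeArgs helpers are hypothetical names used for illustration and are not part of riverboat or river.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ExampleArgs mirrors the args struct from the example job in this README.
type ExampleArgs struct {
	// ExampleArg is an example argument
	ExampleArg string `json:"example_arg"`
}

// Kind returns the name used to identify this job type.
func (ExampleArgs) Kind() string { return "example" }

// encodeArgs is a hypothetical helper that returns the JSON payload for the args.
func encodeArgs(args ExampleArgs) (string, error) {
	b, err := json.Marshal(args)
	if err != nil {
		return "", err
	}

	return string(b), nil
}

// decodeArgs is a hypothetical helper that maps a JSON payload back onto the struct.
func decodeArgs(payload string) (ExampleArgs, error) {
	var args ExampleArgs

	err := json.Unmarshal([]byte(payload), &args)

	return args, err
}

func main() {
	payload, err := encodeArgs(ExampleArgs{ExampleArg: "hello"})
	if err != nil {
		panic(err)
	}

	// prints: example {"example_arg":"hello"}
	fmt.Println(ExampleArgs{}.Kind(), payload)

	args, err := decodeArgs(payload)
	if err != nil {
		panic(err)
	}

	// prints: hello
	fmt.Println(args.ExampleArg)
}
```

Renaming a json tag or changing the Kind string after jobs have been enqueued changes how existing rows are interpreted, so it is worth treating both as part of the job's contract.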
Contributing
See the contributing guide for more information.