Documentation ¶
Overview ¶
Package riverqueue is an insert-only wrapper for the river client.
Index ¶
- Variables
- func Healthcheck(client *Client) func(ctx context.Context) error
- func RunMigrations(ctx context.Context, dbPool *pgxpool.Pool) error
- type Client
- func (c *Client) Close() error
- func (c *Client) GetPool() *pgxpool.Pool
- func (c *Client) GetRiverClient() *river.Client[pgx.Tx]
- func (c *Client) Insert(ctx context.Context, args river.JobArgs, opts *river.InsertOpts) (*rivertype.JobInsertResult, error)
- func (c *Client) InsertMany(ctx context.Context, params []river.InsertManyParams) ([]*rivertype.JobInsertResult, error)
- func (c *Client) InsertManyFast(ctx context.Context, params []river.InsertManyParams) (int, error)
- func (c *Client) InsertManyFastTx(ctx context.Context, tx pgx.Tx, params []river.InsertManyParams) (int, error)
- func (c *Client) InsertManyTx(ctx context.Context, tx pgx.Tx, params []river.InsertManyParams) ([]*rivertype.JobInsertResult, error)
- func (c *Client) InsertTx(ctx context.Context, tx pgx.Tx, args river.JobArgs, opts *river.InsertOpts) (*rivertype.JobInsertResult, error)
- func (c *Client) JobCancel(ctx context.Context, jobID int64) (*rivertype.JobRow, error)
- func (c *Client) JobCancelTx(ctx context.Context, tx pgx.Tx, jobID int64) (*rivertype.JobRow, error)
- func (c *Client) TruncateRiverTables(ctx context.Context) error
- type Config
- type JobClient
- type Option
- func WithConnectionURI(uri string) Option
- func WithJobTimeout(jobTimeout time.Duration) Option
- func WithLogger(l *slog.Logger) Option
- func WithMaxRetries(maxRetries int) Option
- func WithQueues(q map[string]river.QueueConfig) Option
- func WithRiverConfig(conf river.Config) Option
- func WithRunMigrations(runMigrations bool) Option
- func WithWorkers(workers *river.Workers) Option
Constants ¶
This section is empty.
Variables ¶
var ErrConnectionURIRequired = errors.New("connection URI is required to initialize the client")
Functions ¶
func Healthcheck ¶ added in v0.0.4
Healthcheck pings the DB to check if the connection is working
func RunMigrations ¶
RunMigrations runs the migrations for the river server. See https://riverqueue.com/docs/migrations for more information.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is a river Client that implements the JobClient interface
func (*Client) GetRiverClient ¶
GetRiverClient returns the underlying river client
func (*Client) Insert ¶
func (c *Client) Insert(ctx context.Context, args river.JobArgs, opts *river.InsertOpts) (*rivertype.JobInsertResult, error)
Insert satisfies the JobClient interface and inserts a new job with the provided args and opts. It starts a new transaction and inserts the job.
func (*Client) InsertMany ¶
func (c *Client) InsertMany(ctx context.Context, params []river.InsertManyParams) ([]*rivertype.JobInsertResult, error)
InsertMany satisfies the JobClient interface and inserts many jobs at once. It starts a new transaction and inserts the jobs.
func (*Client) InsertManyFast ¶
InsertManyFast satisfies the JobClient interface and inserts many jobs at once using Postgres' `COPY FROM` mechanism. It starts a new transaction, inserts the jobs, and commits the transaction after the insert.
func (*Client) InsertManyFastTx ¶
func (c *Client) InsertManyFastTx(ctx context.Context, tx pgx.Tx, params []river.InsertManyParams) (int, error)
InsertManyFastTx satisfies the JobClient interface
func (*Client) InsertManyTx ¶
func (c *Client) InsertManyTx(ctx context.Context, tx pgx.Tx, params []river.InsertManyParams) ([]*rivertype.JobInsertResult, error)
InsertManyTx satisfies the JobClient interface
func (*Client) InsertTx ¶
func (c *Client) InsertTx(ctx context.Context, tx pgx.Tx, args river.JobArgs, opts *river.InsertOpts) (*rivertype.JobInsertResult, error)
InsertTx satisfies the JobClient interface
func (*Client) JobCancel ¶
JobCancel satisfies the JobClient interface and cancels the job with the given ID. It starts a new transaction, cancels the job, and commits the transaction.
type Config ¶
type Config struct { // ConnectionURI is the connection URI for the database ConnectionURI string `koanf:"connectionURI" json:"connectionURI" default:"postgres://postgres:password@0.0.0.0:5432/jobs?sslmode=disable"` // RunMigrations is a flag to determine if migrations should be run RunMigrations bool `koanf:"runMigrations" json:"runMigrations" default:"false"` // RiverConf is the river configuration RiverConf river.Config `koanf:"riverConf" json:"riverConf"` }
Config settings for the river client
type JobClient ¶
type JobClient interface { // InsertMany inserts many jobs at once. Each job is inserted as an InsertManyParams tuple, which takes job args along with an optional set of insert options, which override insert options provided // by an JobArgsWithInsertOpts.InsertOpts implementation or any client-level defaults. The provided context is used for the underlying Postgres inserts and can be used to cancel the operation or apply a timeout. InsertMany(ctx context.Context, params []river.InsertManyParams) ([]*rivertype.JobInsertResult, error) // InsertManyTx inserts many jobs at once. Each job is inserted as an InsertManyParams tuple, which takes job args along with an optional set of insert options, which override insert options provided // by an JobArgsWithInsertOpts.InsertOpts implementation or any client-level defaults. The provided context is used for the underlying Postgres inserts and can be used to cancel the operation or apply a timeout. InsertManyTx(ctx context.Context, tx pgx.Tx, params []river.InsertManyParams) ([]*rivertype.JobInsertResult, error) // Insert inserts a new job with the provided args. Job opts can be used to override any defaults that may have been provided by an implementation of JobArgsWithInsertOpts.InsertOpts, // as well as any global defaults. The provided context is used for the underlying Postgres insert and can be used to cancel the operation or apply a timeout. Insert(ctx context.Context, args river.JobArgs, opts *river.InsertOpts) (*rivertype.JobInsertResult, error) // InsertTx inserts a new job with the provided args on the given transaction. Job opts can be used to override any defaults that may have been provided by an implementation of JobArgsWithInsertOpts.InsertOpts, // as well as any global defaults. The provided context is used for the underlying Postgres insert and can be used to cancel the operation or apply a timeout. 
InsertTx(ctx context.Context, tx pgx.Tx, args river.JobArgs, opts *river.InsertOpts) (*rivertype.JobInsertResult, error) // InsertManyFast inserts many jobs at once using Postgres' `COPY FROM` mechanism, making the operation quite fast and memory efficient. Each job is inserted as an InsertManyParams tuple, // which takes job args along with an optional set of insert options, which override insert options provided by an JobArgsWithInsertOpts.InsertOpts implementation or any client-level defaults. // The provided context is used for the underlying Postgres inserts and can be used to cancel the operation or apply a timeout. InsertManyFast(ctx context.Context, params []river.InsertManyParams) (int, error) // InsertManyFastTx inserts many jobs at once using Postgres' `COPY FROM` mechanism, making the operation quite fast and memory efficient. Each job is inserted as an InsertManyParams tuple, // which takes job args along with an optional set of insert options, which override insert options provided by an JobArgsWithInsertOpts.InsertOpts implementation or any client-level defaults. // The provided context is used for the underlying Postgres inserts and can be used to cancel the operation or apply a timeout. InsertManyFastTx(ctx context.Context, tx pgx.Tx, params []river.InsertManyParams) (int, error) // JobCancel cancels the job with the given ID. If possible, the job is cancelled immediately and will not be retried. // The provided context is used for the underlying Postgres update and can be used to cancel the operation or apply a timeout. JobCancel(ctx context.Context, jobID int64) (*rivertype.JobRow, error) // JobCancelTx cancels the job with the given ID within the specified transaction. This variant lets a caller cancel a job atomically alongside other database changes. // A cancelled job doesn't take effect until the transaction commits, and if the transaction rolls back, so too is the cancelled job. 
JobCancelTx(ctx context.Context, tx pgx.Tx, jobID int64) (*rivertype.JobRow, error) // GetPool returns the underlying pgx pool GetPool() *pgxpool.Pool // TruncateRiverTables truncates River tables in the target database. This is for test cleanup and should obviously only be used in tests. TruncateRiverTables(ctx context.Context) error // GetRiverClient returns the underlying river client // this can be used to interact directly with the river client for more advanced use cases (e.g. starting the river server) // which are outside the scope of the insert-only client interface GetRiverClient() *river.Client[pgx.Tx] // Close closes the underlying pgx pool Close() error }
JobClient is an interface for the river client to insert jobs. This interface is only used for inserting new jobs and will not contain any other methods.
type Option ¶
type Option func(*Client)
Option is a function that configures a client
func WithConnectionURI ¶
WithConnectionURI sets the connection URI for the client
func WithJobTimeout ¶
WithJobTimeout sets the job timeout for the client
func WithMaxRetries ¶
WithMaxRetries sets the maximum number of retries for the client
func WithQueues ¶
func WithQueues(q map[string]river.QueueConfig) Option
WithQueues sets the queues for the client. This should be omitted when creating an insert-only client.
func WithRiverConfig ¶
WithRiverConfig sets the entire river configuration for the client. Prefer using the other options when possible.
func WithRunMigrations ¶
WithRunMigrations sets the run migrations flag for the client
func WithWorkers ¶
WithWorkers sets the workers for the client. This should be omitted when creating an insert-only client.