riverqueue

package
v0.0.7 Latest
Warning

This package is not in the latest version of its module.

Published: Oct 3, 2024 | License: Apache-2.0 | Imports: 12 | Imported by: 2

Documentation

Overview

Package riverqueue is an insert-only wrapper for the river client.

Constants

This section is empty.

Variables

var ErrConnectionURIRequired = errors.New("connection URI is required to initialize the client")

Functions

func Healthcheck added in v0.0.4

func Healthcheck(client *Client) func(ctx context.Context) error

Healthcheck returns a function that pings the DB to check whether the connection is working.
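
The signature above returns a function rather than an error, so the check can be handed to something that invokes it later with its own context. The following is a minimal sketch of that shape, not the package's implementation: `pinger`, `fakePinger`, `checkOnce`, and `errDown` are hypothetical stand-ins introduced so the example runs without a database (`*pgxpool.Pool` does expose a `Ping(ctx)` method of this form).

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// pinger is a hypothetical stand-in for the database handle held by the
// client; *pgxpool.Pool has a Ping method with this shape.
type pinger interface {
	Ping(ctx context.Context) error
}

// healthcheck mirrors the shape of Healthcheck: it captures the dependency
// and returns a function that can be called later with a fresh context.
func healthcheck(p pinger) func(ctx context.Context) error {
	return func(ctx context.Context) error {
		return p.Ping(ctx)
	}
}

// fakePinger lets the sketch run without a database.
type fakePinger struct{ err error }

// Ping reports the context error if the context is done, else the canned error.
func (f fakePinger) Ping(ctx context.Context) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	return f.err
}

// errDown is an illustrative failure returned by the fake.
var errDown = errors.New("database unreachable")

// checkOnce runs the check with a short timeout, as a readiness probe might.
func checkOnce(p pinger) error {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	return healthcheck(p)(ctx)
}

func main() {
	fmt.Println("healthy:", checkOnce(fakePinger{}) == nil)
	fmt.Println("unhealthy:", checkOnce(fakePinger{err: errDown}))
}
```

Passing the context at call time, rather than when the check is built, lets each probe apply its own deadline.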

func RunMigrations

func RunMigrations(ctx context.Context, dbPool *pgxpool.Pool) error

RunMigrations runs the migrations for the river server. See https://riverqueue.com/docs/migrations for more information.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is a river Client that implements the JobClient interface

func New

func New(ctx context.Context, opts ...Option) (c *Client, err error)

New creates a new river client with the options provided

func (*Client) Close added in v0.0.4

func (c *Client) Close() error

Close satisfies the JobClient interface

func (*Client) GetPool

func (c *Client) GetPool() *pgxpool.Pool

GetPool returns the underlying pgx pool

func (*Client) GetRiverClient

func (c *Client) GetRiverClient() *river.Client[pgx.Tx]

GetRiverClient returns the underlying river client

func (*Client) Insert

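func (c *Client) Insert(ctx context.Context, args river.JobArgs, opts *river.InsertOpts) (*rivertype.JobInsertResult, error)

Insert satisfies the JobClient interface and inserts a new job with the provided args and opts. It starts a new transaction and inserts the job.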

func (*Client) InsertMany

func (c *Client) InsertMany(ctx context.Context, params []river.InsertManyParams) ([]*rivertype.JobInsertResult, error)

InsertMany satisfies the JobClient interface and inserts many jobs at once. It starts a new transaction and inserts the jobs.

func (*Client) InsertManyFast

func (c *Client) InsertManyFast(ctx context.Context, params []river.InsertManyParams) (int, error)

InsertManyFast satisfies the JobClient interface and inserts many jobs at once using Postgres' `COPY FROM` mechanism. It starts a new transaction, inserts the jobs, and commits the transaction after the insert.

func (*Client) InsertManyFastTx

func (c *Client) InsertManyFastTx(ctx context.Context, tx pgx.Tx, params []river.InsertManyParams) (int, error)

InsertManyFastTx satisfies the JobClient interface

func (*Client) InsertManyTx

func (c *Client) InsertManyTx(ctx context.Context, tx pgx.Tx, params []river.InsertManyParams) ([]*rivertype.JobInsertResult, error)

InsertManyTx satisfies the JobClient interface

func (*Client) InsertTx

func (c *Client) InsertTx(ctx context.Context, tx pgx.Tx, args river.JobArgs, opts *river.InsertOpts) (*rivertype.JobInsertResult, error)

InsertTx satisfies the JobClient interface

func (*Client) JobCancel

func (c *Client) JobCancel(ctx context.Context, jobID int64) (*rivertype.JobRow, error)

JobCancel satisfies the JobClient interface and cancels the job with the given ID. It starts a new transaction, cancels the job, and commits the transaction.

func (*Client) JobCancelTx

func (c *Client) JobCancelTx(ctx context.Context, tx pgx.Tx, jobID int64) (*rivertype.JobRow, error)

JobCancelTx satisfies the JobClient interface

func (*Client) TruncateRiverTables

func (c *Client) TruncateRiverTables(ctx context.Context) error

TruncateRiverTables truncates River tables in the target database. This is for test cleanup and should obviously only be used in tests.

type Config

type Config struct {
	// ConnectionURI is the connection URI for the database
	ConnectionURI string `koanf:"connectionURI" json:"connectionURI" default:"postgres://postgres:password@0.0.0.0:5432/jobs?sslmode=disable"`
	// RunMigrations is a flag to determine if migrations should be run
	RunMigrations bool `koanf:"runMigrations" json:"runMigrations" default:"false"`
	// RiverConf is the river configuration
	RiverConf river.Config `koanf:"riverConf" json:"riverConf"`
}

Config settings for the river client
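
As a rough illustration of how the `json` tags on Config map to a configuration document, the sketch below decodes JSON into a stand-in struct. `config` and `parseConfig` are hypothetical names; the stand-in copies only the two scalar fields and their `json` tags, and omits `RiverConf` to stay dependency-free. Note that `encoding/json` does not interpret the `default` tag, so the defaults shown in the struct definition must be applied by whatever loader the application uses (which one is not stated here).

```go
package main

import (
	"encoding/json"
	"fmt"
)

// config is a stand-in mirroring the two scalar fields of Config and their
// json tags; RiverConf is omitted to keep the sketch dependency-free.
type config struct {
	ConnectionURI string `json:"connectionURI"`
	RunMigrations bool   `json:"runMigrations"`
}

// parseConfig decodes a JSON document into the stand-in config.
func parseConfig(raw string) (config, error) {
	var c config
	err := json.Unmarshal([]byte(raw), &c)
	return c, err
}

func main() {
	c, err := parseConfig(`{"connectionURI":"postgres://user:pass@localhost:5432/jobs","runMigrations":true}`)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(c.ConnectionURI, c.RunMigrations)

	// An empty document yields zero values, because encoding/json ignores
	// the `default` struct tag.
	empty, err := parseConfig(`{}`)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Printf("%q %v\n", empty.ConnectionURI, empty.RunMigrations)
}
```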

type JobClient

type JobClient interface {
	// InsertMany inserts many jobs at once. Each job is inserted as an InsertManyParams tuple, which takes job args along with an optional set of insert options, which override insert options provided
	// by a JobArgsWithInsertOpts.InsertOpts implementation or any client-level defaults. The provided context is used for the underlying Postgres inserts and can be used to cancel the operation or apply a timeout.
	InsertMany(ctx context.Context, params []river.InsertManyParams) ([]*rivertype.JobInsertResult, error)
	// InsertManyTx inserts many jobs at once. Each job is inserted as an InsertManyParams tuple, which takes job args along with an optional set of insert options, which override insert options provided
	// by a JobArgsWithInsertOpts.InsertOpts implementation or any client-level defaults. The provided context is used for the underlying Postgres inserts and can be used to cancel the operation or apply a timeout.
	InsertManyTx(ctx context.Context, tx pgx.Tx, params []river.InsertManyParams) ([]*rivertype.JobInsertResult, error)
	// Insert inserts a new job with the provided args. Job opts can be used to override any defaults that may have been provided by an implementation of JobArgsWithInsertOpts.InsertOpts,
	// as well as any global defaults. The provided context is used for the underlying Postgres insert and can be used to cancel the operation or apply a timeout.
	Insert(ctx context.Context, args river.JobArgs, opts *river.InsertOpts) (*rivertype.JobInsertResult, error)
	// InsertTx inserts a new job with the provided args on the given transaction. Job opts can be used to override any defaults that may have been provided by an implementation of JobArgsWithInsertOpts.InsertOpts,
	// as well as any global defaults. The provided context is used for the underlying Postgres insert and can be used to cancel the operation or apply a timeout.
	InsertTx(ctx context.Context, tx pgx.Tx, args river.JobArgs, opts *river.InsertOpts) (*rivertype.JobInsertResult, error)
	// InsertManyFast inserts many jobs at once using Postgres' `COPY FROM` mechanism, making the operation quite fast and memory efficient. Each job is inserted as an InsertManyParams tuple,
	// which takes job args along with an optional set of insert options, which override insert options provided by a JobArgsWithInsertOpts.InsertOpts implementation or any client-level defaults.
	// The provided context is used for the underlying Postgres inserts and can be used to cancel the operation or apply a timeout.
	InsertManyFast(ctx context.Context, params []river.InsertManyParams) (int, error)
	// InsertManyFastTx inserts many jobs at once using Postgres' `COPY FROM` mechanism, making the operation quite fast and memory efficient. Each job is inserted as an InsertManyParams tuple,
	// which takes job args along with an optional set of insert options, which override insert options provided by a JobArgsWithInsertOpts.InsertOpts implementation or any client-level defaults.
	// The provided context is used for the underlying Postgres inserts and can be used to cancel the operation or apply a timeout.
	InsertManyFastTx(ctx context.Context, tx pgx.Tx, params []river.InsertManyParams) (int, error)
	// JobCancel cancels the job with the given ID. If possible, the job is cancelled immediately and will not be retried.
	// The provided context is used for the underlying Postgres update and can be used to cancel the operation or apply a timeout.
	JobCancel(ctx context.Context, jobID int64) (*rivertype.JobRow, error)
	// JobCancelTx cancels the job with the given ID within the specified transaction. This variant lets a caller cancel a job atomically alongside other database changes.
	// The cancellation doesn't take effect until the transaction commits, and if the transaction rolls back, the cancellation is rolled back with it.
	JobCancelTx(ctx context.Context, tx pgx.Tx, jobID int64) (*rivertype.JobRow, error)

	// GetPool returns the underlying pgx pool
	GetPool() *pgxpool.Pool
	// TruncateRiverTables truncates River tables in the target database. This is for test cleanup and should obviously only be used in tests.
	TruncateRiverTables(ctx context.Context) error
	// GetRiverClient returns the underlying river client
	// this can be used to interact directly with the river client for more advanced use cases (e.g. starting the river server)
	// which are outside the scope of the insert-only client interface
	GetRiverClient() *river.Client[pgx.Tx]
	// Close closes the underlying pgx pool
	Close() error
}

JobClient is the interface the river client implements for inserting jobs. It is intended for insert-only use: beyond the insert and cancel methods, it exposes only helpers for the pool, the underlying river client, test cleanup, and closing.
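
One practical consequence of exposing an interface is that calling code can depend on it and swap in a fake during tests. The sketch below illustrates the idea with stand-ins only: `jobArgs` imitates `river.JobArgs` (which requires a `Kind() string` method), while `inserter`, `emailArgs`, `signup`, `recorder`, and `enqueueWelcome` are hypothetical names, and the insert options and result type are dropped for brevity.

```go
package main

import (
	"context"
	"fmt"
)

// jobArgs is a stand-in for river.JobArgs, which requires a Kind method.
type jobArgs interface{ Kind() string }

// inserter is a hypothetical one-method subset of JobClient, with the
// insert options and result type dropped for brevity.
type inserter interface {
	Insert(ctx context.Context, args jobArgs) error
}

// emailArgs is an illustrative job payload.
type emailArgs struct{ To string }

// Kind names the job type, as river.JobArgs implementations do.
func (emailArgs) Kind() string { return "email" }

// signup depends only on the narrow interface, so a real client or a fake
// can be supplied by the caller.
func signup(ctx context.Context, jobs inserter, addr string) error {
	return jobs.Insert(ctx, emailArgs{To: addr})
}

// recorder is an in-memory fake that records the kinds it was given.
type recorder struct{ kinds []string }

// Insert appends the job kind instead of writing to a database.
func (r *recorder) Insert(_ context.Context, args jobArgs) error {
	r.kinds = append(r.kinds, args.Kind())
	return nil
}

// enqueueWelcome runs signup against a fresh recorder and returns what the
// fake captured.
func enqueueWelcome(addr string) ([]string, error) {
	r := &recorder{}
	err := signup(context.Background(), r, addr)
	return r.kinds, err
}

func main() {
	kinds, err := enqueueWelcome("a@example.com")
	fmt.Println(kinds, err)
}
```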

type Option

type Option func(*Client)

Option is a function that configures a client

func WithConnectionURI

func WithConnectionURI(uri string) Option

WithConnectionURI sets the connection URI for the client

func WithJobTimeout

func WithJobTimeout(jobTimeout time.Duration) Option

WithJobTimeout sets the job timeout for the client

func WithLogger

func WithLogger(l *slog.Logger) Option

WithLogger sets the logger for the client

func WithMaxRetries

func WithMaxRetries(maxRetries int) Option

WithMaxRetries sets the maximum number of retries for the client

func WithQueues

func WithQueues(q map[string]river.QueueConfig) Option

WithQueues sets the queues for the client. This should be omitted when creating an insert-only client.

func WithRiverConfig

func WithRiverConfig(conf river.Config) Option

WithRiverConfig sets the entire river configuration for the client. Prefer using the other options when possible.

func WithRunMigrations

func WithRunMigrations(runMigrations bool) Option

WithRunMigrations sets the run migrations flag for the client

func WithWorkers

func WithWorkers(workers *river.Workers) Option

WithWorkers sets the workers for the client. This should be omitted when creating an insert-only client.
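
The `Option` type and the `With...` constructors above follow the functional options pattern. The following is a minimal, dependency-free sketch of that pattern, not the package's code: `client`, `option`, `withConnectionURI`, `withRunMigrations`, and `newClient` are hypothetical lowercase stand-ins. The check that returns the error for an empty URI is an assumption inferred from the message of `ErrConnectionURIRequired`; the real `New` may apply defaults before validating.

```go
package main

import (
	"errors"
	"fmt"
)

// errConnectionURIRequired mirrors the message of ErrConnectionURIRequired.
var errConnectionURIRequired = errors.New("connection URI is required to initialize the client")

// client is a stand-in holding only the settings this sketch configures.
type client struct {
	connectionURI string
	runMigrations bool
}

// option has the same shape as Option: a function that mutates the client.
type option func(*client)

// withConnectionURI returns an option that sets the connection URI.
func withConnectionURI(uri string) option {
	return func(c *client) { c.connectionURI = uri }
}

// withRunMigrations returns an option that sets the migrations flag.
func withRunMigrations(run bool) option {
	return func(c *client) { c.runMigrations = run }
}

// newClient applies the options in order, then validates the result.
// Assumption: an empty URI is rejected, as the error message suggests.
func newClient(opts ...option) (*client, error) {
	c := &client{}
	for _, opt := range opts {
		opt(c)
	}
	if c.connectionURI == "" {
		return nil, errConnectionURIRequired
	}
	return c, nil
}

func main() {
	if _, err := newClient(); err != nil {
		fmt.Println("error:", err)
	}

	c, err := newClient(
		withConnectionURI("postgres://localhost:5432/jobs"),
		withRunMigrations(true),
	)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(c.connectionURI, c.runMigrations)
}
```

Because options are applied in order, a later option overrides an earlier one that touches the same field, which is one reason to prefer the narrow options over replacing the whole configuration.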
