rivertest

package
v0.0.15 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 22, 2023 License: MPL-2.0 Imports: 13 Imported by: 0

Documentation

Overview

Package rivertest contains test assertions that can be used in a project's tests to verify that certain actions occurred from the main river package.

Example (RequireInserted)

Example_requireInserted demonstrates the use of the RequireInserted test assertion, which verifies that a single job was inserted.

package main

import (
	"context"
	"fmt"
	"log/slog"
	"testing"

	"github.com/jackc/pgx/v5/pgxpool"

	"github.com/riverqueue/river"
	"github.com/riverqueue/river/internal/riverinternaltest"
	"github.com/riverqueue/river/internal/util/slogutil"
	"github.com/riverqueue/river/riverdriver/riverpgxv5"
	"github.com/riverqueue/river/rivertest"
)

// RequiredArgs is the argument payload for the example job whose kind is
// "required".
type RequiredArgs struct {
	// Message is encoded under the JSON key "message".
	Message string `json:"message"`
}

// Kind returns the string that identifies RequiredArgs jobs: "required".
func (RequiredArgs) Kind() string {
	return "required"
}

// RequiredWorker is the worker registered for RequiredArgs jobs in this
// example.
type RequiredWorker struct {
	// Embedded defaults from the river package, parameterized on the args type.
	river.WorkerDefaults[RequiredArgs]
}

// Work handles a RequiredArgs job. The example performs no actual work, so it
// always returns nil.
func (w *RequiredWorker) Work(ctx context.Context, job *river.Job[RequiredArgs]) error {
	return nil
}

// main is the runnable form of Example_requireInserted. It shows the
// RequireInsertedTx and RequireInserted test assertions, which verify that a
// single job of a given kind was inserted, first within a transaction and
// then directly on the pool.
func main() {
	ctx := context.Background()

	pool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_testdb_example"))
	if err != nil {
		panic(err)
	}
	defer pool.Close()

	// Needed for this example only; not necessary in real usage.
	if err := riverinternaltest.TruncateRiverTables(ctx, pool); err != nil {
		panic(err)
	}

	workers := river.NewWorkers()
	river.AddWorker(workers, &RequiredWorker{})

	config := &river.Config{
		Logger:  slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}),
		Workers: workers,
	}
	client, err := river.NewClient(riverpgxv5.New(pool), config)
	if err != nil {
		panic(err)
	}

	tx, err := pool.Begin(ctx)
	if err != nil {
		panic(err)
	}
	defer tx.Rollback(ctx)

	if _, err := client.InsertTx(ctx, tx, &RequiredArgs{Message: "Hello."}, nil); err != nil {
		panic(err)
	}

	// Stand-in for the *testing.T that a real test receives as its argument.
	t := &testing.T{}

	inserted := rivertest.RequireInsertedTx[*riverpgxv5.Driver](ctx, t, tx, &RequiredArgs{}, nil)
	fmt.Printf("Test passed with message: %s\n", inserted.Args.Message)

	// Assert on the same job a second time, now also requiring that it was
	// inserted at the default priority and on the default queue.
	defaultOpts := &rivertest.RequireInsertedOpts{
		Priority: 1,
		Queue:    river.QueueDefault,
	}
	rivertest.RequireInsertedTx[*riverpgxv5.Driver](ctx, t, tx, &RequiredArgs{}, defaultOpts)

	// Insert and verify once more, this time on the pool rather than a
	// transaction.
	if _, err := client.Insert(ctx, &RequiredArgs{Message: "Hello from pool."}, nil); err != nil {
		panic(err)
	}
	rivertest.RequireInserted(ctx, t, riverpgxv5.New(pool), &RequiredArgs{}, nil)
}
Output:

Test passed with message: Hello.
Example (RequireManyInserted)

Example_requireManyInserted demonstrates the use of the RequireManyInserted test assertion, which requires that multiple jobs of the specified kinds were inserted.

package main

import (
	"context"
	"fmt"
	"log/slog"
	"testing"

	"github.com/jackc/pgx/v5/pgxpool"

	"github.com/riverqueue/river"
	"github.com/riverqueue/river/internal/riverinternaltest"
	"github.com/riverqueue/river/internal/util/slogutil"
	"github.com/riverqueue/river/riverdriver/riverpgxv5"
	"github.com/riverqueue/river/rivertest"
)

// FirstRequiredArgs is the argument payload for the example job whose kind is
// "first_required".
type FirstRequiredArgs struct {
	// Message is encoded under the JSON key "message".
	Message string `json:"message"`
}

// Kind returns the string that identifies FirstRequiredArgs jobs:
// "first_required".
func (FirstRequiredArgs) Kind() string {
	return "first_required"
}

// FirstRequiredWorker is the worker registered for FirstRequiredArgs jobs in
// this example.
type FirstRequiredWorker struct {
	// Embedded defaults from the river package, parameterized on the args type.
	river.WorkerDefaults[FirstRequiredArgs]
}

// Work handles a FirstRequiredArgs job. The example performs no actual work,
// so it always returns nil.
func (w *FirstRequiredWorker) Work(ctx context.Context, job *river.Job[FirstRequiredArgs]) error {
	return nil
}

// SecondRequiredArgs is the argument payload for the example job whose kind
// is "second_required".
type SecondRequiredArgs struct {
	// Message is encoded under the JSON key "message".
	Message string `json:"message"`
}

// Kind returns the string that identifies SecondRequiredArgs jobs:
// "second_required".
func (SecondRequiredArgs) Kind() string {
	return "second_required"
}

// SecondRequiredWorker is the worker registered for SecondRequiredArgs jobs
// in this example.
type SecondRequiredWorker struct {
	// Embedded defaults from the river package, parameterized on the args type.
	river.WorkerDefaults[SecondRequiredArgs]
}

// Work handles a SecondRequiredArgs job. The example performs no actual work,
// so it always returns nil.
func (w *SecondRequiredWorker) Work(ctx context.Context, job *river.Job[SecondRequiredArgs]) error {
	return nil
}

// main is the runnable form of Example_requireManyInserted. It shows the
// RequireManyInsertedTx and RequireManyInserted test assertions, which require
// that multiple jobs of the specified kinds were inserted.
func main() {
	ctx := context.Background()

	pool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_testdb_example"))
	if err != nil {
		panic(err)
	}
	defer pool.Close()

	// Needed for this example only; not necessary in real usage.
	if err := riverinternaltest.TruncateRiverTables(ctx, pool); err != nil {
		panic(err)
	}

	workers := river.NewWorkers()
	river.AddWorker(workers, &FirstRequiredWorker{})
	river.AddWorker(workers, &SecondRequiredWorker{})

	config := &river.Config{
		Logger:  slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}),
		Workers: workers,
	}
	client, err := river.NewClient(riverpgxv5.New(pool), config)
	if err != nil {
		panic(err)
	}

	tx, err := pool.Begin(ctx)
	if err != nil {
		panic(err)
	}
	defer tx.Rollback(ctx)

	// Insert three jobs in the transaction, in the exact order the assertion
	// below expects to find them.
	toInsert := []river.JobArgs{
		&FirstRequiredArgs{Message: "Hello from first."},
		&SecondRequiredArgs{Message: "Hello from second."},
		&FirstRequiredArgs{Message: "Hello from first (again)."},
	}
	for _, args := range toInsert {
		if _, err := client.InsertTx(ctx, tx, args, nil); err != nil {
			panic(err)
		}
	}

	// Stand-in for the *testing.T that a real test receives as its argument.
	t := &testing.T{}

	expectedInTx := []rivertest.ExpectedJob{
		{Args: &FirstRequiredArgs{}},
		{Args: &SecondRequiredArgs{}},
		{Args: &FirstRequiredArgs{}},
	}
	jobs := rivertest.RequireManyInsertedTx[*riverpgxv5.Driver](ctx, t, tx, expectedInTx)
	for idx := range jobs {
		fmt.Printf("Job %d args: %s\n", idx, string(jobs[idx].EncodedArgs))
	}

	// Verify again, and this time that the second job was inserted at the
	// default priority and default queue.
	secondWithDefaults := []rivertest.ExpectedJob{
		{
			Args: &SecondRequiredArgs{},
			Opts: &rivertest.RequireInsertedOpts{
				Priority: 1,
				Queue:    river.QueueDefault,
			},
		},
	}
	rivertest.RequireManyInsertedTx[*riverpgxv5.Driver](ctx, t, tx, secondWithDefaults)

	// Insert and verify once more, this time on the pool rather than a
	// transaction.
	if _, err := client.Insert(ctx, &FirstRequiredArgs{Message: "Hello from pool."}, nil); err != nil {
		panic(err)
	}
	expectedOnPool := []rivertest.ExpectedJob{
		{Args: &FirstRequiredArgs{}},
	}
	rivertest.RequireManyInserted(ctx, t, riverpgxv5.New(pool), expectedOnPool)
}
Output:

Job 0 args: {"message": "Hello from first."}
Job 1 args: {"message": "Hello from second."}
Job 2 args: {"message": "Hello from first (again)."}

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func RequireInserted

func RequireInserted[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.JobArgs](ctx context.Context, tb testing.TB, driver TDriver, expectedJob TArgs, opts *RequireInsertedOpts) *river.Job[TArgs]

RequireInserted is a test helper that verifies that a job of the given kind was inserted for work, failing the test if it wasn't. If found, the inserted job is returned so that further assertions can be made against it.

job := RequireInserted(ctx, t, riverpgxv5.New(dbPool), &Job1Args{}, nil)

This variant takes a driver that wraps a database pool. See also RequireInsertedTx which takes a transaction.

A RequireInsertedOpts struct can be provided as the last argument, and if it is, its properties (e.g. max attempts, priority, queue name) will act as required assertions in the inserted job row. UniqueOpts is ignored.

The assertion will fail if more than one job of the given kind was found because at that point the job to return is ambiguous. Use RequireManyInserted to cover that case instead.

func RequireInsertedTx added in v0.0.4

func RequireInsertedTx[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.JobArgs](ctx context.Context, tb testing.TB, tx TTx, expectedJob TArgs, opts *RequireInsertedOpts) *river.Job[TArgs]

RequireInsertedTx is a test helper that verifies that a job of the given kind was inserted for work, failing the test if it wasn't. If found, the inserted job is returned so that further assertions can be made against it.

job := RequireInsertedTx[*riverpgxv5.Driver](ctx, t, tx, &Job1Args{}, nil)

This variant takes a transaction. See also RequireInserted which takes a driver that wraps a database pool.

A RequireInsertedOpts struct can be provided as the last argument, and if it is, its properties (e.g. max attempts, priority, queue name) will act as required assertions in the inserted job row. UniqueOpts is ignored.

The assertion will fail if more than one job of the given kind was found because at that point the job to return is ambiguous. Use RequireManyInserted to cover that case instead.

func RequireManyInserted

func RequireManyInserted[TDriver riverdriver.Driver[TTx], TTx any](ctx context.Context, tb testing.TB, driver TDriver, expectedJobs []ExpectedJob) []*rivertype.JobRow

RequireManyInserted is a test helper that verifies that jobs of the given kinds were inserted for work, failing the test if they weren't, or were inserted in the wrong order. If found, the inserted jobs are returned so that further assertions can be made against them.

jobs := RequireManyInserted(ctx, t, riverpgxv5.New(dbPool), []rivertest.ExpectedJob{
	{Args: &Job1Args{}},
})

This variant takes a driver that wraps a database pool. See also RequireManyInsertedTx which takes a transaction.

A RequireInsertedOpts struct can be provided for each expected job, and if it is, its properties (e.g. max attempts, priority, queue name) will act as required assertions for the corresponding inserted job row. UniqueOpts is ignored.

The assertion expects emitted jobs to have occurred exactly in the order and the number specified, and will fail in case this expectation isn't met. So if a job of a certain kind is emitted multiple times, it must be expected multiple times.

func RequireManyInsertedTx added in v0.0.4

func RequireManyInsertedTx[TDriver riverdriver.Driver[TTx], TTx any](ctx context.Context, tb testing.TB, tx TTx, expectedJobs []ExpectedJob) []*rivertype.JobRow

RequireManyInsertedTx is a test helper that verifies that jobs of the given kinds were inserted for work, failing the test if they weren't, or were inserted in the wrong order. If found, the inserted jobs are returned so that further assertions can be made against them.

jobs := RequireManyInsertedTx[*riverpgxv5.Driver](ctx, t, tx, []rivertest.ExpectedJob{
	{Args: &Job1Args{}},
})

This variant takes a transaction. See also RequireManyInserted which takes a driver that wraps a database pool.

A RequireInsertedOpts struct can be provided for each expected job, and if it is, its properties (e.g. max attempts, priority, queue name) will act as required assertions for the corresponding inserted job row. UniqueOpts is ignored.

The assertion expects emitted jobs to have occurred exactly in the order and the number specified, and will fail in case this expectation isn't met. So if a job of a certain kind is emitted multiple times, it must be expected multiple times.

Types

type ExpectedJob

// ExpectedJob is a single job to expect encapsulating job args and possible
// insertion options.
type ExpectedJob struct {
	// Args are job arguments to expect.
	Args river.JobArgs

	// Opts are options for the specific required job including insertion
	// options to assert against.
	Opts *RequireInsertedOpts
}

ExpectedJob is a single job to expect encapsulating job args and possible insertion options.

type RequireInsertedOpts

// RequireInsertedOpts are options for RequireInserted or RequireManyInserted
// including expectations for various queuing properties that stem from
// InsertOpts. A field left at its zero value is not asserted on.
type RequireInsertedOpts struct {
	// MaxAttempts is the expected maximum number of total attempts for the
	// inserted job.
	//
	// No assertion is made if left the zero value.
	MaxAttempts int

	// Priority is the expected priority for the inserted job.
	//
	// No assertion is made if left the zero value.
	Priority int

	// Queue is the expected queue name of the inserted job.
	//
	// No assertion is made if left the zero value.
	Queue string

	// ScheduledAt is the expected scheduled at time of the inserted job. Times
	// are truncated to the microsecond level for comparison to account for the
	// difference between Go storing times to nanoseconds and Postgres storing
	// only to microsecond precision.
	//
	// No assertion is made if left the zero value.
	ScheduledAt time.Time

	// State is the expected state of the inserted job.
	//
	// No assertion is made if left the zero value.
	State rivertype.JobState

	// Tags are the expected tags of the inserted job.
	//
	// No assertion is made if left the zero value.
	Tags []string
}

Options for RequireInserted or RequireManyInserted including expectations for various queuing properties that stem from InsertOpts.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL