dbadapter

package
v0.0.11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 2, 2023 License: MPL-2.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Adapter

type Adapter interface {
	JobCompleteMany(ctx context.Context, jobs ...JobToComplete) error
	// TODO: should all dbsqlc need to implement this? Or is it a pro feature?
	JobCompleteManyTx(ctx context.Context, tx pgx.Tx, jobs ...JobToComplete) error
	JobInsert(ctx context.Context, params *JobInsertParams) (*JobInsertResult, error)
	JobInsertTx(ctx context.Context, tx pgx.Tx, params *JobInsertParams) (*JobInsertResult, error)

	// TODO: JobInsertMany functions don't support unique jobs.
	JobInsertMany(ctx context.Context, params []*JobInsertParams) (int64, error)
	JobInsertManyTx(ctx context.Context, tx pgx.Tx, params []*JobInsertParams) (int64, error)

	JobGetAvailable(ctx context.Context, queueName string, limit int32) ([]*dbsqlc.RiverJob, error)
	JobGetAvailableTx(ctx context.Context, tx pgx.Tx, queueName string, limit int32) ([]*dbsqlc.RiverJob, error)

	JobSetCancelledIfRunning(ctx context.Context, id int64, cancelledAt time.Time, err []byte) (*dbsqlc.RiverJob, error)
	JobSetCompletedIfRunning(ctx context.Context, job JobToComplete) (*dbsqlc.RiverJob, error)
	JobSetCompletedTx(ctx context.Context, tx pgx.Tx, id int64, completedAt time.Time) (*dbsqlc.RiverJob, error)
	JobSetDiscardedIfRunning(ctx context.Context, id int64, discardedAt time.Time, err []byte) (*dbsqlc.RiverJob, error)
	JobSetErroredIfRunning(ctx context.Context, id int64, scheduledAt time.Time, err []byte) (*dbsqlc.RiverJob, error)
	JobSetSnoozedIfRunning(ctx context.Context, id int64, scheduledAt time.Time) (*dbsqlc.RiverJob, error)

	// LeadershipAttemptElect attempts to elect a leader for the given name. The
	// bool alreadyElected indicates whether this is a potential reelection of
	// an already-elected leader. If the election is successful because there is
	// no leader or the previous leader expired, the provided leaderID will be
	// set as the new leader with a TTL of ttl.
	//
	// Returns whether this leader was successfully elected or an error if one
	// occurred.
	LeadershipAttemptElect(ctx context.Context, alreadyElected bool, name, leaderID string, ttl time.Duration) (bool, error)

	// LeadershipResign resigns any currently held leaderships for the given name
	// and leader ID.
	LeadershipResign(ctx context.Context, name, leaderID string) error
}

Adapter is an interface to the various database-level operations which River needs to operate. It's quite non-generic for the moment, but the idea is that it'd give us a way to implement access to non-Postgres databases, and may be reimplemented for pro features or exposed to users for customization.

TODO: If exposing publicly, we must first make sure to add an intermediary layer between Adapter types and dbsqlc types. We return `dbsqlc.RiverJob` for expedience, but this should be converted to a more stable API if Adapter would be exported.

type JobInsertParams

type JobInsertParams struct {
	EncodedArgs    []byte
	Kind           string
	MaxAttempts    int
	Metadata       []byte
	Priority       int
	Queue          string
	ScheduledAt    time.Time
	State          dbsqlc.JobState
	Tags           []string
	Unique         bool
	UniqueByArgs   bool
	UniqueByPeriod time.Duration
	UniqueByQueue  bool
	UniqueByState  []dbsqlc.JobState
}

JobInsertParams are parameters for Adapter's `JobInsert*“ functions. They roughly reflect the properties of an inserted job, but only ones that are allowed to be used on input.

type JobInsertResult

type JobInsertResult struct {
	// Job is information about an inserted job.
	//
	// For an insertion that was skipped due to a duplicate, contains the job
	// that already existed.
	Job *dbsqlc.RiverJob

	// UniqueSkippedAsDuplicate indicates that the insert didn't occur because
	// it was a unique job, and another unique job within the unique parameters
	// was already in the database.
	UniqueSkippedAsDuplicate bool
}

type JobToComplete

type JobToComplete struct {
	ID          int64
	FinalizedAt time.Time
}

type StandardAdapter

type StandardAdapter struct {
	baseservice.BaseService

	Config *StandardAdapterConfig // exported so top-level package can test against it; unexport if adapterdb is ever made public
	// contains filtered or unexported fields
}

func NewStandardAdapter

func NewStandardAdapter(archetype *baseservice.Archetype, config *StandardAdapterConfig) *StandardAdapter

TODO: If `StandardAdapter` is ever exposed publicly, we should find a way to internalize archetype. Some options might be for `NewStandardAdapter` to return the `Adapter` interface instead of a concrete struct so that its properties aren't visible, and we could move base service initialization out to the client that accepts it so the user is never aware of its existence.

func (*StandardAdapter) JobCompleteMany

func (a *StandardAdapter) JobCompleteMany(ctx context.Context, jobs ...JobToComplete) error

func (*StandardAdapter) JobCompleteManyTx

func (a *StandardAdapter) JobCompleteManyTx(ctx context.Context, tx pgx.Tx, jobs ...JobToComplete) error

func (*StandardAdapter) JobGetAvailable

func (a *StandardAdapter) JobGetAvailable(ctx context.Context, queueName string, limit int32) ([]*dbsqlc.RiverJob, error)

func (*StandardAdapter) JobGetAvailableTx

func (a *StandardAdapter) JobGetAvailableTx(ctx context.Context, tx pgx.Tx, queueName string, limit int32) ([]*dbsqlc.RiverJob, error)

func (*StandardAdapter) JobInsert

func (a *StandardAdapter) JobInsert(ctx context.Context, params *JobInsertParams) (*JobInsertResult, error)

func (*StandardAdapter) JobInsertMany

func (a *StandardAdapter) JobInsertMany(ctx context.Context, params []*JobInsertParams) (int64, error)

func (*StandardAdapter) JobInsertManyTx

func (a *StandardAdapter) JobInsertManyTx(ctx context.Context, tx pgx.Tx, params []*JobInsertParams) (int64, error)

func (*StandardAdapter) JobInsertTx

func (a *StandardAdapter) JobInsertTx(ctx context.Context, tx pgx.Tx, params *JobInsertParams) (*JobInsertResult, error)

func (*StandardAdapter) JobSetCancelledIfRunning

func (a *StandardAdapter) JobSetCancelledIfRunning(ctx context.Context, id int64, finalizedAt time.Time, err []byte) (*dbsqlc.RiverJob, error)

func (*StandardAdapter) JobSetCompletedIfRunning

func (a *StandardAdapter) JobSetCompletedIfRunning(ctx context.Context, job JobToComplete) (*dbsqlc.RiverJob, error)

func (*StandardAdapter) JobSetCompletedTx

func (a *StandardAdapter) JobSetCompletedTx(ctx context.Context, tx pgx.Tx, id int64, completedAt time.Time) (*dbsqlc.RiverJob, error)

func (*StandardAdapter) JobSetDiscardedIfRunning

func (a *StandardAdapter) JobSetDiscardedIfRunning(ctx context.Context, id int64, finalizedAt time.Time, err []byte) (*dbsqlc.RiverJob, error)

func (*StandardAdapter) JobSetErroredIfRunning

func (a *StandardAdapter) JobSetErroredIfRunning(ctx context.Context, id int64, scheduledAt time.Time, err []byte) (*dbsqlc.RiverJob, error)

func (*StandardAdapter) JobSetSnoozedIfRunning

func (a *StandardAdapter) JobSetSnoozedIfRunning(ctx context.Context, id int64, scheduledAt time.Time) (*dbsqlc.RiverJob, error)

func (*StandardAdapter) LeadershipAttemptElect

func (a *StandardAdapter) LeadershipAttemptElect(ctx context.Context, alreadyElected bool, name, leaderID string, ttl time.Duration) (bool, error)

func (*StandardAdapter) LeadershipResign

func (a *StandardAdapter) LeadershipResign(ctx context.Context, name, leaderID string) error

type StandardAdapterConfig

type StandardAdapterConfig struct {
	// AdvisoryLockPrefix is a configurable 32-bit prefix that River will use
	// when generating any key to acquire a Postgres advisory lock.
	AdvisoryLockPrefix int32

	// Executor is a database executor to perform database operations with. In
	// non-test environments it's a database pool.
	Executor dbutil.Executor

	// DeadlineTimeout is a timeout used to set a context deadline for every
	// adapter operation.
	DeadlineTimeout time.Duration

	// WorkerName is a name to assign this worker.
	WorkerName string
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL