dbadapter

package
v0.0.14 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 13, 2023 License: MPL-2.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Adapter

// Adapter is the set of database-level operations River needs in order to
// operate. The job insert and job fetch operations come in pairs: a plain
// variant and a `Tx` variant that additionally accepts a pgx.Tx.
type Adapter interface {
	// JobInsert takes the parameters for a single job and returns a
	// JobInsertResult describing the outcome (see JobInsertResult for how
	// skipped duplicates are reported).
	JobInsert(ctx context.Context, params *JobInsertParams) (*JobInsertResult, error)
	// JobInsertTx is the JobInsert variant that accepts a pgx.Tx.
	// NOTE(review): presumably the insert is performed on tx — confirm
	// against the implementation.
	JobInsertTx(ctx context.Context, tx pgx.Tx, params *JobInsertParams) (*JobInsertResult, error)

	// TODO: JobInsertMany functions don't support unique jobs.
	//
	// JobInsertMany takes a batch of job parameters.
	// NOTE(review): the int64 result is presumably the number of jobs
	// inserted — confirm against the implementation.
	JobInsertMany(ctx context.Context, params []*JobInsertParams) (int64, error)
	// JobInsertManyTx is the JobInsertMany variant that accepts a pgx.Tx.
	JobInsertManyTx(ctx context.Context, tx pgx.Tx, params []*JobInsertParams) (int64, error)

	// JobGetAvailable takes a queue name and a limit and returns a slice of
	// jobs.
	// NOTE(review): presumably returns at most limit available jobs from the
	// named queue — confirm against the implementation.
	JobGetAvailable(ctx context.Context, queueName string, limit int32) ([]*dbsqlc.RiverJob, error)
	// JobGetAvailableTx is the JobGetAvailable variant that accepts a pgx.Tx.
	JobGetAvailableTx(ctx context.Context, tx pgx.Tx, queueName string, limit int32) ([]*dbsqlc.RiverJob, error)

	// JobSetStateIfRunning sets the state of a currently running job. Jobs
	// which are not running (i.e. which have already had their state set to
	// something new through an explicit snooze or cancellation) are ignored.
	JobSetStateIfRunning(ctx context.Context, params *JobSetStateIfRunningParams) (*dbsqlc.RiverJob, error)

	// LeadershipAttemptElect attempts to elect a leader for the given name. The
	// bool alreadyElected indicates whether this is a potential reelection of
	// an already-elected leader. If the election is successful because there is
	// no leader or the previous leader expired, the provided leaderID will be
	// set as the new leader with a TTL of ttl.
	//
	// Returns whether this leader was successfully elected or an error if one
	// occurred.
	LeadershipAttemptElect(ctx context.Context, alreadyElected bool, name, leaderID string, ttl time.Duration) (bool, error)

	// LeadershipResign resigns any currently held leaderships for the given name
	// and leader ID.
	LeadershipResign(ctx context.Context, name, leaderID string) error
}

Adapter is an interface to the various database-level operations which River needs to operate. It's quite non-generic for the moment, but the idea is that it'd give us a way to implement access to non-Postgres databases, and may be reimplemented for pro features or exposed to users for customization.

TODO: If exposing publicly, we must first make sure to add an intermediary layer between Adapter types and dbsqlc types. We return `dbsqlc.RiverJob` for expedience, but this should be converted to a more stable API if Adapter would be exported.

type JobInsertParams

// JobInsertParams are parameters for Adapter's `JobInsert*` functions. They
// roughly reflect the properties of an inserted job, but only ones that are
// allowed to be used on input.
type JobInsertParams struct {
	EncodedArgs    []byte
	Kind           string
	MaxAttempts    int
	Metadata       []byte
	Priority       int
	Queue          string
	ScheduledAt    time.Time
	State          dbsqlc.JobState
	Tags           []string
	// Unique and the UniqueBy* fields below relate to unique job insertion
	// (see JobInsertResult.UniqueSkippedAsDuplicate).
	// NOTE(review): presumably Unique enables uniqueness checking and the
	// UniqueBy* fields select what the check is scoped to — the exact
	// semantics are not visible here; confirm against the implementation.
	Unique         bool
	UniqueByArgs   bool
	UniqueByPeriod time.Duration
	UniqueByQueue  bool
	UniqueByState  []dbsqlc.JobState
}

JobInsertParams are parameters for Adapter's `JobInsert*` functions. They roughly reflect the properties of an inserted job, but only ones that are allowed to be used on input.

type JobInsertResult

// JobInsertResult is the result type returned by Adapter's JobInsert and
// JobInsertTx functions.
type JobInsertResult struct {
	// Job is information about an inserted job.
	//
	// For an insertion that was skipped due to a duplicate, contains the job
	// that already existed.
	Job *dbsqlc.RiverJob

	// UniqueSkippedAsDuplicate indicates that the insert didn't occur because
	// it was a unique job, and another unique job within the unique parameters
	// was already in the database.
	UniqueSkippedAsDuplicate bool
}

type JobSetStateIfRunningParams added in v0.0.13

// JobSetStateIfRunningParams are parameters to update the state of a
// currently running job. Build values with one of the JobSetState*
// constructor functions to ensure a correct combination of parameters.
type JobSetStateIfRunningParams struct {
	// NOTE(review): presumably the ID of the job whose state is being set,
	// matching the `id` argument of the JobSetState* constructors — confirm.
	ID int64
	// contains filtered or unexported fields
}

JobSetStateIfRunningParams are parameters to update the state of a currently running job. Use one of the constructors below to ensure a correct combination of parameters.

func JobSetStateCancelled added in v0.0.13

func JobSetStateCancelled(id int64, finalizedAt time.Time, errData []byte) *JobSetStateIfRunningParams

func JobSetStateCompleted added in v0.0.13

func JobSetStateCompleted(id int64, finalizedAt time.Time) *JobSetStateIfRunningParams

func JobSetStateDiscarded added in v0.0.13

func JobSetStateDiscarded(id int64, finalizedAt time.Time, errData []byte) *JobSetStateIfRunningParams

func JobSetStateErrorAvailable added in v0.0.13

func JobSetStateErrorAvailable(id int64, scheduledAt time.Time, errData []byte) *JobSetStateIfRunningParams

func JobSetStateErrorRetryable added in v0.0.13

func JobSetStateErrorRetryable(id int64, scheduledAt time.Time, errData []byte) *JobSetStateIfRunningParams

func JobSetStateSnoozed added in v0.0.13

func JobSetStateSnoozed(id int64, scheduledAt time.Time, maxAttempts int) *JobSetStateIfRunningParams

type JobToComplete

// JobToComplete pairs a job ID with a finalization timestamp.
// NOTE(review): no usage of this type is visible in this listing; presumably
// it identifies a job to be marked completed and the time to record for it —
// confirm against callers.
type JobToComplete struct {
	ID          int64
	FinalizedAt time.Time
}

type StandardAdapter

// StandardAdapter embeds baseservice.BaseService and carries a
// StandardAdapterConfig. It is constructed with NewStandardAdapter.
// NOTE(review): its listed method set mirrors the Adapter interface, so it is
// presumably the standard implementation of Adapter — confirm.
type StandardAdapter struct {
	baseservice.BaseService

	Config *StandardAdapterConfig // exported so top-level package can test against it; unexport if dbadapter is ever made public
	// contains filtered or unexported fields
}

func NewStandardAdapter

func NewStandardAdapter(archetype *baseservice.Archetype, config *StandardAdapterConfig) *StandardAdapter

TODO: If `StandardAdapter` is ever exposed publicly, we should find a way to internalize archetype. Some options might be for `NewStandardAdapter` to return the `Adapter` interface instead of a concrete struct so that its properties aren't visible, and we could move base service initialization out to the client that accepts it so the user is never aware of its existence.

func (*StandardAdapter) JobGetAvailable

func (a *StandardAdapter) JobGetAvailable(ctx context.Context, queueName string, limit int32) ([]*dbsqlc.RiverJob, error)

func (*StandardAdapter) JobGetAvailableTx

func (a *StandardAdapter) JobGetAvailableTx(ctx context.Context, tx pgx.Tx, queueName string, limit int32) ([]*dbsqlc.RiverJob, error)

func (*StandardAdapter) JobInsert

func (a *StandardAdapter) JobInsert(ctx context.Context, params *JobInsertParams) (*JobInsertResult, error)

func (*StandardAdapter) JobInsertMany

func (a *StandardAdapter) JobInsertMany(ctx context.Context, params []*JobInsertParams) (int64, error)

func (*StandardAdapter) JobInsertManyTx

func (a *StandardAdapter) JobInsertManyTx(ctx context.Context, tx pgx.Tx, params []*JobInsertParams) (int64, error)

func (*StandardAdapter) JobInsertTx

func (a *StandardAdapter) JobInsertTx(ctx context.Context, tx pgx.Tx, params *JobInsertParams) (*JobInsertResult, error)

func (*StandardAdapter) JobSetStateIfRunning added in v0.0.13

func (a *StandardAdapter) JobSetStateIfRunning(ctx context.Context, params *JobSetStateIfRunningParams) (*dbsqlc.RiverJob, error)

func (*StandardAdapter) LeadershipAttemptElect

func (a *StandardAdapter) LeadershipAttemptElect(ctx context.Context, alreadyElected bool, name, leaderID string, ttl time.Duration) (bool, error)

func (*StandardAdapter) LeadershipResign

func (a *StandardAdapter) LeadershipResign(ctx context.Context, name, leaderID string) error

type StandardAdapterConfig

// StandardAdapterConfig is the configuration accepted by NewStandardAdapter
// and exposed on StandardAdapter as its Config field.
type StandardAdapterConfig struct {
	// AdvisoryLockPrefix is a configurable 32-bit prefix that River will use
	// when generating any key to acquire a Postgres advisory lock.
	AdvisoryLockPrefix int32

	// Executor is a database executor to perform database operations with. In
	// non-test environments it's a database pool.
	Executor dbutil.Executor

	// DeadlineTimeout is a timeout used to set a context deadline for every
	// adapter operation.
	DeadlineTimeout time.Duration

	// WorkerName is a name to assign this worker.
	WorkerName string
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL