jobsdb

package
Version: v1.13.0-rc.1
Warning

This package is not in the latest version of its module.

Published: Aug 29, 2023 License: AGPL-3.0 Imports: 35 Imported by: 0

Documentation

Index

Constants

const (
	RawDataDestUploadOperation = "S3_DEST_UPLOAD"
)

We keep a journal of all the operations. The journal helps

Variables

var (
	// Not valid, Not terminal
	Unprocessed = jobStateT{State: "not_picked_yet", /* contains filtered or unexported fields */}

	// Valid, Not terminal
	Failed    = jobStateT{State: "failed", /* contains filtered or unexported fields */}
	Executing = jobStateT{State: "executing", /* contains filtered or unexported fields */}
	Waiting   = jobStateT{State: "waiting", /* contains filtered or unexported fields */}
	Importing = jobStateT{State: "importing", /* contains filtered or unexported fields */}

	// Valid, Terminal
	Succeeded = jobStateT{State: "succeeded", /* contains filtered or unexported fields */}
	Aborted   = jobStateT{State: "aborted", /* contains filtered or unexported fields */}
	Migrated  = jobStateT{State: "migrated", /* contains filtered or unexported fields */}
)

State definitions
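The grouping in the comments above (valid or not, terminal or not) can be sketched with a small local type. This is only an illustration: `jobState` is a hypothetical stand-in for the unexported `jobStateT`, and the field names `isValid` and `isTerminal` are assumptions, since the real fields are filtered out of this page.

```go
package main

import "fmt"

// jobState is a hypothetical stand-in for the package's unexported jobStateT.
// The isValid and isTerminal field names are assumptions for illustration.
type jobState struct {
	State      string
	isValid    bool
	isTerminal bool
}

// states mirrors the grouping of the exported state variables.
var states = []jobState{
	{"not_picked_yet", false, false}, // Unprocessed
	{"failed", true, false},
	{"executing", true, false},
	{"waiting", true, false},
	{"importing", true, false},
	{"succeeded", true, true},
	{"aborted", true, true},
	{"migrated", true, true},
}

// terminalStates returns the names of the states marked as terminal.
func terminalStates() []string {
	var out []string
	for _, s := range states {
		if s.isTerminal {
			out = append(out, s.State)
		}
	}
	return out
}

func main() {
	fmt.Println(terminalStates())
}
```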

Functions

This section is empty.

Types

type GetQueryParams

type GetQueryParams struct {

	// if IgnoreCustomValFiltersInQuery is true, CustomValFilters is not going to be used
	IgnoreCustomValFiltersInQuery bool
	WorkspaceID                   string
	CustomValFilters              []string
	ParameterFilters              []ParameterFilterT

	// Limit the total number of jobs.
	// A value less than or equal to zero will return no results
	JobsLimit int
	// Limit the total number of events, 1 job contains 1+ event(s).
	// A value less than or equal to zero will disable this limit (no limit),
	// only values greater than zero are considered as valid limits.
	EventsLimit int
	// Limit the total job payload size
	// A value less than or equal to zero will disable this limit (no limit),
	// only values greater than zero are considered as valid limits.
	PayloadSizeLimit int64
	// contains filtered or unexported fields
}

GetQueryParams is a struct to hold jobsdb query params.
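The three limits behave differently at zero: JobsLimit returns nothing, while EventsLimit and PayloadSizeLimit are switched off. A minimal sketch of that rule follows, using hypothetical local types (`job`, `limits`, `applyLimits`) rather than the package's own. Stopping before the job that would exceed a limit is an assumption of the sketch; the page does not say how the boundary job is treated.

```go
package main

import "fmt"

// job is a hypothetical, trimmed-down stand-in for JobT.
type job struct {
	ID          int64
	EventCount  int
	PayloadSize int64
}

// limits mirrors the three limit fields of GetQueryParams.
type limits struct {
	JobsLimit        int
	EventsLimit      int
	PayloadSizeLimit int64
}

// applyLimits returns the prefix of jobs that fits within l.
// JobsLimit <= 0 yields no results; the other two limits are ignored when <= 0.
// Excluding the job that would cross a limit is an assumption of this sketch.
func applyLimits(jobs []job, l limits) []job {
	var out []job
	var events int
	var size int64
	for _, j := range jobs {
		if len(out) >= l.JobsLimit {
			break
		}
		if l.EventsLimit > 0 && events+j.EventCount > l.EventsLimit {
			break
		}
		if l.PayloadSizeLimit > 0 && size+j.PayloadSize > l.PayloadSizeLimit {
			break
		}
		events += j.EventCount
		size += j.PayloadSize
		out = append(out, j)
	}
	return out
}

func main() {
	jobs := []job{{1, 2, 100}, {2, 2, 100}, {3, 2, 100}}
	fmt.Println(len(applyLimits(jobs, limits{JobsLimit: 0})))
	fmt.Println(len(applyLimits(jobs, limits{JobsLimit: 10})))
	fmt.Println(len(applyLimits(jobs, limits{JobsLimit: 10, EventsLimit: 4})))
}
```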

type Handle

type Handle struct {

	// TriggerAddNewDS, TriggerMigrateDS is useful for triggering addNewDS to run from tests.
	// TODO: Ideally we should refactor the code to not use this override.
	TriggerAddNewDS  func() <-chan time.Time
	TriggerMigrateDS func() <-chan time.Time
	TriggerRefreshDS func() <-chan time.Time

	TriggerJobCleanUp func() <-chan time.Time
	// contains filtered or unexported fields
}

Handle is the main type implementing the jobs database. The caller must call the Setup function on a Handle object.

func NewForRead added in v0.1.10

func NewForRead(tablePrefix string, opts ...OptsFunc) *Handle

func NewForReadWrite added in v0.1.10

func NewForReadWrite(tablePrefix string, opts ...OptsFunc) *Handle

func NewForWrite added in v0.1.10

func NewForWrite(tablePrefix string, opts ...OptsFunc) *Handle

func (*Handle) Close

func (jd *Handle) Close()

Close closes the database connection.

Stop should be called before Close.

func (*Handle) DeleteExecuting

func (jd *Handle) DeleteExecuting()

DeleteExecuting deletes events whose latest job state is executing. This is only done during recovery, which happens during the server start.

func (*Handle) FailExecuting

func (jd *Handle) FailExecuting()

FailExecuting fails events whose latest job state is executing.

This is only done during recovery, which happens during the server start.

func (*Handle) GetAborted

func (jd *Handle) GetAborted(ctx context.Context, params GetQueryParams) (JobsResult, error)

GetAborted finds jobs in aborted state

func (*Handle) GetActiveWorkspaces

func (jd *Handle) GetActiveWorkspaces(ctx context.Context, customVal string) ([]string, error)

func (*Handle) GetDistinctParameterValues

func (jd *Handle) GetDistinctParameterValues(ctx context.Context, parameterName string) ([]string, error)

func (*Handle) GetFailed

func (jd *Handle) GetFailed(ctx context.Context, params GetQueryParams) (JobsResult, error)

GetFailed finds jobs in failed state

func (*Handle) GetImporting

func (jd *Handle) GetImporting(ctx context.Context, params GetQueryParams) (JobsResult, error)

GetImporting finds jobs in importing state

func (*Handle) GetJobs

func (jd *Handle) GetJobs(ctx context.Context, states []string, params GetQueryParams) (JobsResult, error)

GetJobs returns events of a given state. It does not update any state itself and relies on the caller to update it. That means that successive calls to GetJobs("failed") can return the same set of events. It is the responsibility of the caller to call it from one thread, update the state (to "waiting") in the same thread, and pass the events on to the processors.
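The read-then-mark contract described for GetJobs can be illustrated with an in-memory model. Everything here is hypothetical (`memStore`, `getJobs`, `updateStatus`); it only shows why a second read returns the same jobs until the caller updates their state.

```go
package main

import "fmt"

// memStore is a hypothetical in-memory stand-in for a jobsdb handle.
// ids keeps insertion order; state maps a job ID to its latest state.
type memStore struct {
	ids   []int64
	state map[int64]string
}

// getJobs returns the IDs of jobs in the given state without changing anything.
func (s *memStore) getJobs(state string) []int64 {
	var out []int64
	for _, id := range s.ids {
		if s.state[id] == state {
			out = append(out, id)
		}
	}
	return out
}

// updateStatus sets the latest state of the given jobs.
func (s *memStore) updateStatus(ids []int64, state string) {
	for _, id := range ids {
		s.state[id] = state
	}
}

func main() {
	s := &memStore{
		ids:   []int64{1, 2, 3},
		state: map[int64]string{1: "failed", 2: "succeeded", 3: "failed"},
	}
	first := s.getJobs("failed")
	again := s.getJobs("failed") // same set: reading does not change state
	s.updateStatus(first, "waiting")
	after := s.getJobs("failed") // empty once the caller has marked the jobs
	fmt.Println(len(first), len(again), len(after))
}
```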

func (*Handle) GetJournalEntries

func (jd *Handle) GetJournalEntries(opType string) (entries []JournalEntryT)

func (*Handle) GetLastJob

func (jd *Handle) GetLastJob() *JobT

func (*Handle) GetMaxDSIndex

func (jd *Handle) GetMaxDSIndex() (maxDSIndex int64)

GetMaxDSIndex returns max dataset index in the DB

func (*Handle) GetPileUpCounts

func (jd *Handle) GetPileUpCounts(ctx context.Context) (map[string]map[string]int, error)

func (*Handle) GetSucceeded

func (jd *Handle) GetSucceeded(ctx context.Context, params GetQueryParams) (JobsResult, error)

GetSucceeded finds jobs in succeeded state

func (*Handle) GetToProcess

func (jd *Handle) GetToProcess(ctx context.Context, params GetQueryParams, more MoreToken) (*MoreJobsResult, error)

func (*Handle) GetUnprocessed

func (jd *Handle) GetUnprocessed(ctx context.Context, params GetQueryParams) (JobsResult, error)

GetUnprocessed finds unprocessed jobs, i.e. new jobs whose state hasn't been marked in the database yet

func (*Handle) GetWaiting

func (jd *Handle) GetWaiting(ctx context.Context, params GetQueryParams) (JobsResult, error)

GetWaiting finds jobs in waiting state

func (*Handle) Identifier

func (jd *Handle) Identifier() string

Identifier returns the identifier of the jobsdb. Here it is tablePrefix.

func (*Handle) IsMasterBackupEnabled

func (jd *Handle) IsMasterBackupEnabled() bool

func (*Handle) JournalDeleteEntry

func (jd *Handle) JournalDeleteEntry(opID int64)

func (*Handle) JournalMarkDone

func (jd *Handle) JournalMarkDone(opID int64) error

JournalMarkDone marks the end of a journal action

func (*Handle) JournalMarkStart

func (jd *Handle) JournalMarkStart(opType string, opPayload json.RawMessage) (int64, error)

func (*Handle) JournalMarkStartInTx

func (jd *Handle) JournalMarkStartInTx(tx *Tx, opType string, opPayload json.RawMessage) (int64, error)

func (*Handle) Ping

func (jd *Handle) Ping() error

Ping returns health check for pg database

func (*Handle) SchemaMigrationTable

func (jd *Handle) SchemaMigrationTable() string

SchemaMigrationTable returns the table name used for storing current schema version.

func (*Handle) Setup

func (jd *Handle) Setup(
	ownerType OwnerType, clearAll bool, tablePrefix string,
	preBackupHandlers []prebackup.Handler, fileUploaderProvider fileuploader.Provider,
) error

Setup is used to initialize the Handle structure. clearAll = true means it will remove all existing tables. tablePrefix must be unique and is used to separate multiple users of JobsDB.

func (*Handle) Start

func (jd *Handle) Start() error

Start starts the jobsdb worker and housekeeping (migration, archive) threads. Start should be called before any other jobsdb methods are called.

func (*Handle) Stop

func (jd *Handle) Stop()

Stop stops the background goroutines and waits until they finish. Stop should be called once only after Start. Only Start and Close can be called after Stop.
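The ordering rules for Start, Stop and Close can be sketched against a small interface. The `lifecycle` interface below copies the three method signatures shown on this page; `recorder` and `run` are hypothetical helpers used only to make the call order visible. What to do after a failed Start is not stated here, so the sketch simply returns the error.

```go
package main

import "fmt"

// lifecycle lists the Handle methods involved in start-up and shutdown,
// with the signatures documented above.
type lifecycle interface {
	Start() error
	Stop()
	Close()
}

// recorder is a hypothetical fake that records the order of calls.
type recorder struct{ calls []string }

func (r *recorder) Start() error { r.calls = append(r.calls, "Start"); return nil }
func (r *recorder) Stop()        { r.calls = append(r.calls, "Stop") }
func (r *recorder) Close()       { r.calls = append(r.calls, "Close") }

// run starts db, runs work, then calls Stop followed by Close.
func run(db lifecycle, work func() error) error {
	if err := db.Start(); err != nil {
		return err
	}
	defer db.Close() // deferred first, so it runs last
	defer db.Stop()  // deferred second, so it runs before Close
	return work()
}

func main() {
	r := &recorder{}
	err := run(r, func() error {
		r.calls = append(r.calls, "work")
		return nil
	})
	fmt.Println(r.calls, err)
}
```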

func (*Handle) Store

func (jd *Handle) Store(ctx context.Context, jobList []*JobT) error

Store stores new jobs to the jobsdb. If enableWriterQueue is true, this goes through writer worker pool.

func (*Handle) StoreEachBatchRetry

func (jd *Handle) StoreEachBatchRetry(
	ctx context.Context,
	jobBatches [][]*JobT,
) map[uuid.UUID]string

func (*Handle) StoreEachBatchRetryInTx

func (jd *Handle) StoreEachBatchRetryInTx(
	ctx context.Context,
	tx StoreSafeTx,
	jobBatches [][]*JobT,
) (map[uuid.UUID]string, error)

func (*Handle) StoreInTx

func (jd *Handle) StoreInTx(ctx context.Context, tx StoreSafeTx, jobList []*JobT) error

StoreInTx stores new jobs to the jobsdb. If enableWriterQueue is true, this goes through writer worker pool.

func (*Handle) TearDown

func (jd *Handle) TearDown()

TearDown stops the background goroutines, waits until they finish and closes the database.

func (*Handle) UpdateJobStatus

func (jd *Handle) UpdateJobStatus(ctx context.Context, statusList []*JobStatusT, customValFilters []string, parameterFilters []ParameterFilterT) error

func (*Handle) UpdateJobStatusInTx

func (jd *Handle) UpdateJobStatusInTx(ctx context.Context, tx UpdateSafeTx, statusList []*JobStatusT, customValFilters []string, parameterFilters []ParameterFilterT) error

UpdateJobStatusInTx updates the status of a batch of jobs in the passed transaction. customValFilters[] is passed so that we can efficiently mark the empty cache; later we can move this to the query. IMPORTANT: AcquireUpdateJobStatusLocks should be called before calling this function.

func (*Handle) WithStoreSafeTx

func (jd *Handle) WithStoreSafeTx(ctx context.Context, f func(tx StoreSafeTx) error) error

func (*Handle) WithTx

func (jd *Handle) WithTx(f func(tx *Tx) error) error

func (*Handle) WithUpdateSafeTx

func (jd *Handle) WithUpdateSafeTx(ctx context.Context, f func(tx UpdateSafeTx) error) error

type HandleInspector added in v1.0.2

type HandleInspector struct {
	*Handle
}

HandleInspector is only intended to be used by tests for verifying the handle's internal state

func (*HandleInspector) DSIndicesList added in v1.2.0

func (h *HandleInspector) DSIndicesList() []string

DSIndicesList returns the slice of current ds indices

type JobStatusT

type JobStatusT struct {
	JobID         int64           `json:"JobID"`
	JobState      string          `json:"JobState"` // ENUM waiting, executing, succeeded, waiting_retry,  failed, aborted, migrating, migrated, wont_migrate
	AttemptNum    int             `json:"AttemptNum"`
	ExecTime      time.Time       `json:"ExecTime"`
	RetryTime     time.Time       `json:"RetryTime"`
	ErrorCode     string          `json:"ErrorCode"`
	ErrorResponse json.RawMessage `json:"ErrorResponse"`
	Parameters    json.RawMessage `json:"Parameters"`
	JobParameters json.RawMessage `json:"-"`
	WorkspaceId   string          `json:"WorkspaceId"`
}

JobStatusT is used for storing status of the job. It is the responsibility of the user of this module to set appropriate job status. State can be one of ENUM waiting, executing, succeeded, waiting_retry, failed, aborted

type JobT

type JobT struct {
	UUID          uuid.UUID       `json:"UUID"`
	JobID         int64           `json:"JobID"`
	UserID        string          `json:"UserID"`
	CreatedAt     time.Time       `json:"CreatedAt"`
	ExpireAt      time.Time       `json:"ExpireAt"`
	CustomVal     string          `json:"CustomVal"`
	EventCount    int             `json:"EventCount"`
	EventPayload  json.RawMessage `json:"EventPayload"`
	PayloadSize   int64           `json:"PayloadSize"`
	LastJobStatus JobStatusT      `json:"LastJobStatus"`
	Parameters    json.RawMessage `json:"Parameters"`
	WorkspaceId   string          `json:"WorkspaceId"`
}

JobT is the basic type for creating jobs. The JobID is generated by the system and LastJobStatus is populated when reading a processed job, while the rest should be set by the user.

func (*JobT) String added in v0.1.10

func (job *JobT) String() string

type JobsDB added in v0.1.10

type JobsDB interface {
	// Identifier returns the jobsdb's identifier, a.k.a. table prefix
	Identifier() string

	// WithTx begins a new transaction that can be used by the provided function.
	// If the function returns an error, the transaction will be rolled back and the error will be returned,
	// otherwise the transaction will be committed and a nil error will be returned.
	WithTx(func(tx *Tx) error) error

	// WithStoreSafeTx prepares a store-safe environment and then starts a transaction
	// that can be used by the provided function.
	WithStoreSafeTx(context.Context, func(tx StoreSafeTx) error) error

	// Store stores the provided jobs to the database
	Store(ctx context.Context, jobList []*JobT) error

	// StoreInTx stores the provided jobs to the database using an existing transaction.
	// Please ensure that you are using a StoreSafeTx, e.g.
	//    jobsdb.WithStoreSafeTx(ctx, func(tx StoreSafeTx) error {
	//	      return jobsdb.StoreInTx(ctx, tx, jobList)
	//    })
	StoreInTx(ctx context.Context, tx StoreSafeTx, jobList []*JobT) error

	// StoreEachBatchRetry tries to store all the provided job batches to the database
	//
	// returns the uuids of first job of each failed batch
	StoreEachBatchRetry(ctx context.Context, jobBatches [][]*JobT) map[uuid.UUID]string

	// StoreEachBatchRetryInTx tries to store all the provided job batches to the database, using an existing transaction.
	//
	// returns the uuids of first job of each failed batch
	//
	// Please ensure that you are using a StoreSafeTx, e.g.
	//    jobsdb.WithStoreSafeTx(ctx, func(tx StoreSafeTx) error {
	//	      jobsdb.StoreEachBatchRetryInTx(ctx, tx, jobBatches)
	//    })
	StoreEachBatchRetryInTx(ctx context.Context, tx StoreSafeTx, jobBatches [][]*JobT) (map[uuid.UUID]string, error)

	// WithUpdateSafeTx prepares an update-safe environment and then starts a transaction
	// that can be used by the provided function. An update-safe transaction shall be used if the provided function
	// needs to call UpdateJobStatusInTx.
	WithUpdateSafeTx(context.Context, func(tx UpdateSafeTx) error) error

	// UpdateJobStatus updates the provided job statuses
	UpdateJobStatus(ctx context.Context, statusList []*JobStatusT, customValFilters []string, parameterFilters []ParameterFilterT) error

	// UpdateJobStatusInTx updates the provided job statuses in an existing transaction.
	// Please ensure that you are using an UpdateSafeTx, e.g.
	//    jobsdb.WithUpdateSafeTx(ctx, func(tx UpdateSafeTx) error {
	//	      return jobsdb.UpdateJobStatusInTx(ctx, tx, statusList, customValFilters, parameterFilters)
	//    })
	UpdateJobStatusInTx(ctx context.Context, tx UpdateSafeTx, statusList []*JobStatusT, customValFilters []string, parameterFilters []ParameterFilterT) error

	// GetJobs finds jobs in any of the provided state(s)
	GetJobs(ctx context.Context, states []string, params GetQueryParams) (JobsResult, error)

	// GetUnprocessed finds unprocessed jobs, i.e. new jobs whose state hasn't been marked in the database yet
	GetUnprocessed(ctx context.Context, params GetQueryParams) (JobsResult, error)

	// GetImporting finds jobs in importing state
	GetImporting(ctx context.Context, params GetQueryParams) (JobsResult, error)

	// GetAborted finds jobs in aborted state
	GetAborted(ctx context.Context, params GetQueryParams) (JobsResult, error)

	// GetWaiting finds jobs in waiting state
	GetWaiting(ctx context.Context, params GetQueryParams) (JobsResult, error)

	// GetSucceeded finds jobs in succeeded state
	GetSucceeded(ctx context.Context, params GetQueryParams) (JobsResult, error)

	// GetFailed finds jobs in failed state
	GetFailed(ctx context.Context, params GetQueryParams) (JobsResult, error)

	// GetToProcess finds jobs in any of the following states: failed, waiting, unprocessed.
	// It also returns a MoreToken that can be used to fetch more jobs, if available, with a subsequent call.
	GetToProcess(ctx context.Context, params GetQueryParams, more MoreToken) (*MoreJobsResult, error)

	// GetPileUpCounts returns statistics (counters) of incomplete jobs
	// grouped by workspaceId and destination type
	GetPileUpCounts(ctx context.Context) (statMap map[string]map[string]int, err error)

	// GetActiveWorkspaces returns a list of active workspace ids. If customVal is not empty, it will be used as a filter
	GetActiveWorkspaces(ctx context.Context, customVal string) (workspaces []string, err error)

	// GetDistinctParameterValues returns the list of distinct parameter values inside the jobs tables
	GetDistinctParameterValues(ctx context.Context, parameterName string) (values []string, err error)

	Ping() error
	DeleteExecuting()
	FailExecuting()

	GetJournalEntries(opType string) (entries []JournalEntryT)
	JournalDeleteEntry(opID int64)
	JournalMarkStart(opType string, opPayload json.RawMessage) (int64, error)
	JournalMarkDone(opID int64) error

	IsMasterBackupEnabled() bool
}

JobsDB interface contains public methods to access JobsDB data

type JobsResult added in v0.1.10

type JobsResult struct {
	Jobs          []*JobT
	LimitsReached bool
	EventsCount   int
	PayloadSize   int64
}

type JournalEntryT

type JournalEntryT struct {
	OpID      int64
	OpType    string
	OpDone    bool
	OpPayload json.RawMessage
}

type MoreJobsResult added in v1.11.0

type MoreJobsResult struct {
	JobsResult
	More MoreToken
}

MoreJobsResult is a JobsResult with a MoreToken

type MoreToken added in v1.3.0

type MoreToken interface{}

MoreToken is a token that can be used to fetch more jobs
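A paging loop over a MoreToken might look like the sketch below. It is built on hypothetical local types (`moreToken`, `page`, `memSource`, `drain`). Using the last returned job ID as the token is purely an assumption made so the example runs; callers should treat the real token as opaque and pass back whatever the previous call returned.

```go
package main

import "fmt"

// moreToken stands in for MoreToken, which is declared as interface{}.
type moreToken interface{}

// page is a hypothetical stand-in for MoreJobsResult.
type page struct {
	Jobs []int64
	More moreToken
}

// memSource is a hypothetical in-memory job source with ascending positive IDs.
type memSource struct{ ids []int64 }

// getToProcess returns up to limit job IDs after the position encoded in more.
// Encoding the position as the last job ID is an assumption of this sketch.
func (s memSource) getToProcess(limit int, more moreToken) page {
	var after int64
	if v, ok := more.(int64); ok {
		after = v
	}
	p := page{More: more}
	for _, id := range s.ids {
		if id > after && len(p.Jobs) < limit {
			p.Jobs = append(p.Jobs, id)
		}
	}
	if n := len(p.Jobs); n > 0 {
		p.More = p.Jobs[n-1]
	}
	return p
}

// drain fetches pages until an empty one is returned.
func drain(s memSource, limit int) [][]int64 {
	var pages [][]int64
	var more moreToken // nil on the first call
	for {
		p := s.getToProcess(limit, more)
		if len(p.Jobs) == 0 {
			return pages
		}
		pages = append(pages, p.Jobs)
		more = p.More
	}
}

func main() {
	fmt.Println(drain(memSource{ids: []int64{1, 2, 3, 4, 5}}, 2))
}
```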

type OptsFunc added in v0.1.10

type OptsFunc func(jd *Handle)

func WithClearDB added in v0.1.10

func WithClearDB(clearDB bool) OptsFunc

WithClearDB, if set to true, removes all existing tables.

func WithConfig

func WithConfig(c *config.Config) OptsFunc

func WithDBHandle

func WithDBHandle(dbHandle *sql.DB) OptsFunc

func WithDSLimit added in v1.2.0

func WithDSLimit(limit *int) OptsFunc

func WithFileUploaderProvider added in v1.3.0

func WithFileUploaderProvider(fileUploaderProvider fileuploader.Provider) OptsFunc

func WithJobMaxAge

func WithJobMaxAge(maxAgeFunc func() time.Duration) OptsFunc

func WithPreBackupHandlers added in v0.1.10

func WithPreBackupHandlers(preBackupHandlers []prebackup.Handler) OptsFunc

WithPreBackupHandlers sets pre-backup handlers.

func WithSkipMaintenanceErr added in v1.10.0

func WithSkipMaintenanceErr(ignore bool) OptsFunc
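OptsFunc and the With... constructors follow the functional options pattern: each option is a function that mutates the handle, and the constructor applies them in order. The sketch below reproduces the pattern with hypothetical local names (`handle`, `optsFunc`, `withClearDB`, `withDSLimit`, `newHandle`); the struct fields are assumptions, since the real fields of Handle are unexported.

```go
package main

import "fmt"

// handle is a hypothetical stand-in for Handle; its fields are assumptions.
type handle struct {
	tablePrefix string
	clearAll    bool
	dsLimit     *int
}

// optsFunc mirrors the shape of OptsFunc.
type optsFunc func(h *handle)

// withClearDB and withDSLimit mimic the shape of WithClearDB and WithDSLimit.
func withClearDB(clearDB bool) optsFunc {
	return func(h *handle) { h.clearAll = clearDB }
}

func withDSLimit(limit *int) optsFunc {
	return func(h *handle) { h.dsLimit = limit }
}

// newHandle builds a handle and applies the options in the order given.
func newHandle(tablePrefix string, opts ...optsFunc) *handle {
	h := &handle{tablePrefix: tablePrefix}
	for _, opt := range opts {
		opt(h)
	}
	return h
}

func main() {
	limit := 5
	h := newHandle("gw", withClearDB(true), withDSLimit(&limit))
	fmt.Println(h.tablePrefix, h.clearAll, *h.dsLimit)
}
```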

type OwnerType added in v0.1.10

type OwnerType string

OwnerType for this jobsdb instance

const (
	// Read : Only Reader of this jobsdb instance
	Read OwnerType = "READ"
	// Write : Only Writer of this jobsdb instance
	Write OwnerType = "WRITE"
	// ReadWrite : Reader and Writer of this jobsdb instance
	ReadWrite OwnerType = ""
)

type ParameterFilterT

type ParameterFilterT struct {
	Name  string
	Value string
}

func (ParameterFilterT) GetName added in v1.8.0

func (p ParameterFilterT) GetName() string

func (ParameterFilterT) GetValue added in v1.8.0

func (p ParameterFilterT) GetValue() string

type QueryConditions added in v0.1.10

type QueryConditions struct {
	// if IgnoreCustomValFiltersInQuery is true, CustomValFilters is not going to be used
	IgnoreCustomValFiltersInQuery bool
	CustomValFilters              []string
	ParameterFilters              []ParameterFilterT
	StateFilters                  []string
	AfterJobID                    *int64
}

QueryConditions holds jobsdb query conditions

type StoreSafeTx added in v0.1.10

type StoreSafeTx interface {
	Tx() *Tx
	SqlTx() *sql.Tx
	// contains filtered or unexported methods
}

StoreSafeTx sealed interface

func EmptyStoreSafeTx added in v0.1.10

func EmptyStoreSafeTx() StoreSafeTx

EmptyStoreSafeTx returns an empty interface usable only for tests

type Tx added in v1.2.1

type Tx struct {
	*sql.Tx
	// contains filtered or unexported fields
}

Tx is a wrapper around sql.Tx that supports registering and executing post-commit actions, a.k.a. success listeners.

func (*Tx) AddSuccessListener added in v1.2.1

func (tx *Tx) AddSuccessListener(listener func())

AddSuccessListener registers a listener to be executed after the transaction has been committed successfully.

func (*Tx) Commit added in v1.2.1

func (tx *Tx) Commit() error

Commit commits the transaction and executes all listeners.

type UpdateSafeTx added in v0.1.10

type UpdateSafeTx interface {
	Tx() *Tx
	SqlTx() *sql.Tx
	// contains filtered or unexported methods
}

UpdateSafeTx sealed interface

func EmptyUpdateSafeTx added in v0.1.10

func EmptyUpdateSafeTx() UpdateSafeTx

EmptyUpdateSafeTx returns an empty interface usable only for tests

Directories

Path Synopsis
internal
