jobsdb

package
v1.9.4
Published: Jun 6, 2023 License: AGPL-3.0 Imports: 36 Imported by: 0

This package is not in the latest version of its module.

Documentation

Constants

const (
	RawDataDestUploadOperation = "S3_DEST_UPLOAD"
)

We keep a journal of all the operations. The journal helps

Variables

var (
	// Not valid, Not terminal
	NotProcessed = jobStateT{State: "not_picked_yet", /* contains filtered or unexported fields */}

	// Valid, Not terminal
	Failed    = jobStateT{State: "failed", /* contains filtered or unexported fields */}
	Executing = jobStateT{State: "executing", /* contains filtered or unexported fields */}
	Waiting   = jobStateT{State: "waiting", /* contains filtered or unexported fields */}
	Importing = jobStateT{State: "importing", /* contains filtered or unexported fields */}

	// Valid, Terminal
	Succeeded = jobStateT{State: "succeeded", /* contains filtered or unexported fields */}
	Aborted   = jobStateT{State: "aborted", /* contains filtered or unexported fields */}
	Migrated  = jobStateT{State: "migrated", /* contains filtered or unexported fields */}
)

State definitions
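
The grouping comments in the variable block classify each state as valid or not, and terminal or not. The following is a minimal sketch of that classification using a stand-in type, since the remaining fields of jobStateT are unexported. The `jobState` type and its `isValid` and `isTerminal` field names are assumptions made for illustration, not the package's actual fields.

```go
package main

import "fmt"

// jobState is a hypothetical stand-in for jobStateT. Only State is visible in
// the documentation; isValid and isTerminal are assumed names.
type jobState struct {
	State      string
	isValid    bool
	isTerminal bool
}

// states mirrors the grouping comments in the variable block above.
var states = []jobState{
	{State: "not_picked_yet", isValid: false, isTerminal: false},
	{State: "failed", isValid: true, isTerminal: false},
	{State: "executing", isValid: true, isTerminal: false},
	{State: "waiting", isValid: true, isTerminal: false},
	{State: "importing", isValid: true, isTerminal: false},
	{State: "succeeded", isValid: true, isTerminal: true},
	{State: "aborted", isValid: true, isTerminal: true},
	{State: "migrated", isValid: true, isTerminal: true},
}

// terminalStates returns the names of the states marked valid and terminal.
func terminalStates() []string {
	var out []string
	for _, s := range states {
		if s.isValid && s.isTerminal {
			out = append(out, s.State)
		}
	}
	return out
}

func main() {
	fmt.Println(terminalStates())
}
```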

Functions

func Init added in v0.1.10

func Init()

func Init2 added in v0.1.10

func Init2()

func IsMasterBackupEnabled added in v0.1.10

func IsMasterBackupEnabled() bool

Types

type GetAllJobsResult added in v1.3.0

type GetAllJobsResult struct {
	Jobs []*JobT
	More MoreToken
}

type GetQueryParamsT added in v0.1.10

type GetQueryParamsT struct {

	// if IgnoreCustomValFiltersInQuery is true, CustomValFilters is not going to be used
	IgnoreCustomValFiltersInQuery bool
	WorkspaceID                   string
	CustomValFilters              []string
	ParameterFilters              []ParameterFilterT
	StateFilters                  []string
	AfterJobID                    *int64

	// Limit the total number of jobs.
	// A value less than or equal to zero will return no results
	JobsLimit int
	// Limit the total number of events, 1 job contains 1+ event(s).
	// A value less than or equal to zero will disable this limit (no limit),
	// only values greater than zero are considered as valid limits.
	EventsLimit int
	// Limit the total job payload size
	// A value less than or equal to zero will disable this limit (no limit),
	// only values greater than zero are considered as valid limits.
	PayloadSizeLimit int64
}

GetQueryParamsT is a struct to hold jobsdb query params.
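
The three limit fields have different semantics for non-positive values: JobsLimit returns nothing, while EventsLimit and PayloadSizeLimit are simply disabled. The sketch below models that rule on stand-in types. The `job`, `limits` and `take` names are hypothetical, and stopping before the first job that would exceed a limit is an assumption; the package's real selection logic may differ.

```go
package main

import "fmt"

// job and limits are hypothetical stand-ins for JobT and GetQueryParamsT,
// reduced to the fields that matter for the limit rules.
type job struct {
	EventCount  int
	PayloadSize int64
}

type limits struct {
	JobsLimit        int
	EventsLimit      int
	PayloadSizeLimit int64
}

// take returns the prefix of jobs allowed by the limits.
// Assumption: selection stops before the first job that would exceed a limit.
func take(jobs []job, l limits) []job {
	if l.JobsLimit <= 0 {
		return nil // documented: a value <= 0 returns no results
	}
	var out []job
	events, size := 0, int64(0)
	for _, j := range jobs {
		if len(out) >= l.JobsLimit {
			break
		}
		// Documented: values <= 0 disable these two limits.
		if l.EventsLimit > 0 && events+j.EventCount > l.EventsLimit {
			break
		}
		if l.PayloadSizeLimit > 0 && size+j.PayloadSize > l.PayloadSizeLimit {
			break
		}
		out = append(out, j)
		events += j.EventCount
		size += j.PayloadSize
	}
	return out
}

func main() {
	jobs := []job{
		{EventCount: 2, PayloadSize: 100},
		{EventCount: 3, PayloadSize: 200},
		{EventCount: 1, PayloadSize: 50},
	}
	fmt.Println(len(take(jobs, limits{JobsLimit: 0})))
	fmt.Println(len(take(jobs, limits{JobsLimit: 10})))
	fmt.Println(len(take(jobs, limits{JobsLimit: 10, EventsLimit: 5})))
}
```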

type HandleInspector added in v1.0.2

type HandleInspector struct {
	*HandleT
}

HandleInspector is only intended to be used by tests for verifying the handle's internal state

func (*HandleInspector) DSIndicesList added in v1.2.0

func (h *HandleInspector) DSIndicesList() []string

DSIndicesList returns the slice of current ds indices

type HandleT

type HandleT struct {
	MinDSRetentionPeriod time.Duration
	MaxDSRetentionPeriod time.Duration

	BackupSettings *backupSettings

	MaxDSSize *int

	// TriggerAddNewDS and TriggerMigrateDS are useful for triggering addNewDS to run from tests.
	// TODO: Ideally we should refactor the code to not use this override.
	TriggerAddNewDS  func() <-chan time.Time
	TriggerMigrateDS func() <-chan time.Time

	TriggerRefreshDS func() <-chan time.Time
	// contains filtered or unexported fields
}

HandleT is the main type implementing the jobs database. The caller must call the Setup method on a HandleT object.

func NewForRead added in v0.1.10

func NewForRead(tablePrefix string, opts ...OptsFunc) *HandleT

func NewForReadWrite added in v0.1.10

func NewForReadWrite(tablePrefix string, opts ...OptsFunc) *HandleT

func NewForWrite added in v0.1.10

func NewForWrite(tablePrefix string, opts ...OptsFunc) *HandleT

func (*HandleT) Close added in v0.1.10

func (jd *HandleT) Close()

Close closes the database connection.

Stop should be called before Close.

func (*HandleT) DeleteExecuting added in v0.1.10

func (jd *HandleT) DeleteExecuting()

DeleteExecuting deletes events whose latest job state is executing. This is only done during recovery, which happens during the server start.

func (*HandleT) FailExecuting added in v1.5.0

func (jd *HandleT) FailExecuting()

FailExecuting fails events whose latest job state is executing.

This is only done during recovery, which happens during the server start.

func (*HandleT) GetActiveWorkspaces added in v1.6.0

func (jd *HandleT) GetActiveWorkspaces(ctx context.Context, customVal string) ([]string, error)

func (*HandleT) GetDistinctParameterValues added in v1.8.0

func (jd *HandleT) GetDistinctParameterValues(ctx context.Context, parameterName string) ([]string, error)

func (*HandleT) GetExecuting

func (jd *HandleT) GetExecuting(ctx context.Context, params GetQueryParamsT) (JobsResult, error)

func (*HandleT) GetImporting added in v0.1.10

func (jd *HandleT) GetImporting(ctx context.Context, params GetQueryParamsT) (JobsResult, error)

func (*HandleT) GetJournalEntries

func (jd *HandleT) GetJournalEntries(opType string) (entries []JournalEntryT)

func (*HandleT) GetLastJob added in v0.1.10

func (jd *HandleT) GetLastJob() *JobT

func (*HandleT) GetMaxDSIndex

func (jd *HandleT) GetMaxDSIndex() (maxDSIndex int64)

GetMaxDSIndex returns max dataset index in the DB

func (*HandleT) GetPileUpCounts added in v0.1.10

func (jd *HandleT) GetPileUpCounts(ctx context.Context) (map[string]map[string]int, error)

func (*HandleT) GetProcessed

func (jd *HandleT) GetProcessed(ctx context.Context, params GetQueryParamsT) (JobsResult, error)

GetProcessed returns events of a given state. It does not update any state itself and relies on the caller to update it. This means that successive calls to GetProcessed("failed") can return the same set of events. It is the responsibility of the caller to call it from one thread, update the state (to "waiting") in the same thread, and then pass the events on to the processors.
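
The read-then-mark contract described above can be illustrated with a small in-memory model. This is only a sketch: `fakeStore`, `getByState` and `updateState` are hypothetical names, not part of jobsdb, and the real methods take a context and GetQueryParamsT.

```go
package main

import (
	"fmt"
	"sort"
)

// fakeStore is a hypothetical in-memory stand-in for a jobs database;
// it maps job IDs to their latest state.
type fakeStore struct {
	state map[int64]string
}

// getByState is a read-only query, like GetProcessed: it never changes state.
func (s *fakeStore) getByState(state string) []int64 {
	var ids []int64
	for id, st := range s.state {
		if st == state {
			ids = append(ids, id)
		}
	}
	sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
	return ids
}

// updateState plays the role of UpdateJobStatus for the given job IDs.
func (s *fakeStore) updateState(ids []int64, state string) {
	for _, id := range ids {
		s.state[id] = state
	}
}

func main() {
	s := &fakeStore{state: map[int64]string{1: "failed", 2: "failed", 3: "succeeded"}}

	// Two reads without an update in between see the same jobs.
	first := s.getByState("failed")
	second := s.getByState("failed")
	fmt.Println(first, second)

	// Marking the jobs in the same goroutine prevents picking them up again.
	s.updateState(first, "waiting")
	fmt.Println(s.getByState("failed"))
}
```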

func (*HandleT) GetToRetry

func (jd *HandleT) GetToRetry(ctx context.Context, params GetQueryParamsT) (JobsResult, error)

GetToRetry returns events which need to be retried. If enableReaderQueue is true, this goes through worker pool, else calls getUnprocessed directly.

func (*HandleT) GetUnprocessed

func (jd *HandleT) GetUnprocessed(ctx context.Context, params GetQueryParamsT) (JobsResult, error)

GetUnprocessed returns the unprocessed events. Unprocessed events are those whose state hasn't been marked in the DB. If enableReaderQueue is true, this goes through worker pool, else calls getUnprocessed directly.

func (*HandleT) GetWaiting

func (jd *HandleT) GetWaiting(ctx context.Context, params GetQueryParamsT) (JobsResult, error)

GetWaiting returns events which are under processing. If enableReaderQueue is true, this goes through worker pool, else calls getUnprocessed directly.

func (*HandleT) Identifier added in v0.1.10

func (jd *HandleT) Identifier() string

Identifier returns the identifier of the jobsdb. Here it is tablePrefix.

func (*HandleT) JournalDeleteEntry

func (jd *HandleT) JournalDeleteEntry(opID int64)

func (*HandleT) JournalMarkDone

func (jd *HandleT) JournalMarkDone(opID int64)

JournalMarkDone marks the end of a journal action

func (*HandleT) JournalMarkStart

func (jd *HandleT) JournalMarkStart(opType string, opPayload json.RawMessage) int64

func (*HandleT) JournalMarkStartInTx added in v0.1.10

func (jd *HandleT) JournalMarkStartInTx(tx *Tx, opType string, opPayload json.RawMessage) (int64, error)

func (*HandleT) Ping added in v0.1.10

func (jd *HandleT) Ping() error

Ping returns health check for pg database

func (*HandleT) SchemaMigrationTable added in v0.1.10

func (jd *HandleT) SchemaMigrationTable() string

SchemaMigrationTable returns the table name used for storing current schema version.

func (*HandleT) Setup

func (jd *HandleT) Setup(
	ownerType OwnerType, clearAll bool, tablePrefix string,
	preBackupHandlers []prebackup.Handler, fileUploaderProvider fileuploader.Provider,
) error

Setup is used to initialize the HandleT structure. clearAll = true means it will remove all existing tables. tablePrefix must be unique and is used to separate multiple users of JobsDB.

func (*HandleT) Start added in v0.1.10

func (jd *HandleT) Start() error

Start starts the jobsdb worker and housekeeping (migration, archive) threads. Start should be called before any other jobsdb methods are called.

func (*HandleT) Stop added in v0.1.10

func (jd *HandleT) Stop()

Stop stops the background goroutines and waits until they finish. Stop should be called once only after Start. Only Start and Close can be called after Stop.
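
The ordering constraints stated for Start, Stop and Close (Start first, Stop once after Start, Stop before Close) can be sketched against a small interface. The `lifecycle` interface, the `recorder` fake and `runWithLifecycle` are hypothetical helpers for illustration; whether Close should also be called when Start fails is not stated in the documentation, so the sketch simply returns in that case.

```go
package main

import "fmt"

// lifecycle lists the three HandleT methods involved, with the signatures
// shown in this documentation.
type lifecycle interface {
	Start() error
	Stop()
	Close()
}

// recorder is a fake that only records the order of calls.
type recorder struct{ calls []string }

func (r *recorder) Start() error { r.calls = append(r.calls, "Start"); return nil }
func (r *recorder) Stop()        { r.calls = append(r.calls, "Stop") }
func (r *recorder) Close()       { r.calls = append(r.calls, "Close") }

// runWithLifecycle starts the handle, runs work, then stops and closes it.
func runWithLifecycle(h lifecycle, work func() error) error {
	if err := h.Start(); err != nil {
		return err
	}
	// Deferred calls run in reverse order: Stop first, then Close.
	defer h.Close()
	defer h.Stop()
	return work()
}

func main() {
	r := &recorder{}
	err := runWithLifecycle(r, func() error {
		r.calls = append(r.calls, "work")
		return nil
	})
	fmt.Println(r.calls, err)
}
```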

func (*HandleT) Store

func (jd *HandleT) Store(ctx context.Context, jobList []*JobT) error

Store stores new jobs to the jobsdb. If enableWriterQueue is true, this goes through writer worker pool.

func (*HandleT) StoreInTx added in v0.1.10

func (jd *HandleT) StoreInTx(ctx context.Context, tx StoreSafeTx, jobList []*JobT) error

StoreInTx stores new jobs to the jobsdb. If enableWriterQueue is true, this goes through writer worker pool.

func (*HandleT) StoreWithRetryEach added in v0.1.10

func (jd *HandleT) StoreWithRetryEach(ctx context.Context, jobList []*JobT) map[uuid.UUID]string

func (*HandleT) StoreWithRetryEachInTx added in v0.1.10

func (jd *HandleT) StoreWithRetryEachInTx(ctx context.Context, tx StoreSafeTx, jobList []*JobT) (map[uuid.UUID]string, error)

func (*HandleT) TearDown

func (jd *HandleT) TearDown()

TearDown stops the background goroutines, waits until they finish and closes the database.

func (*HandleT) UpdateJobStatus

func (jd *HandleT) UpdateJobStatus(ctx context.Context, statusList []*JobStatusT, customValFilters []string, parameterFilters []ParameterFilterT) error

func (*HandleT) UpdateJobStatusInTx added in v0.1.10

func (jd *HandleT) UpdateJobStatusInTx(ctx context.Context, tx UpdateSafeTx, statusList []*JobStatusT, customValFilters []string, parameterFilters []ParameterFilterT) error

UpdateJobStatusInTx updates the status of a batch of jobs in the passed transaction. customValFilters[] is passed so that the empty cache can be marked efficiently; later this can be moved to the query. IMPORTANT: AcquireUpdateJobStatusLocks should be called before calling this function.

func (*HandleT) WithStoreSafeTx added in v0.1.10

func (jd *HandleT) WithStoreSafeTx(ctx context.Context, f func(tx StoreSafeTx) error) error

func (*HandleT) WithTx added in v0.1.10

func (jd *HandleT) WithTx(f func(tx *Tx) error) error

func (*HandleT) WithUpdateSafeTx added in v0.1.10

func (jd *HandleT) WithUpdateSafeTx(ctx context.Context, f func(tx UpdateSafeTx) error) error

type JobStatusT

type JobStatusT struct {
	JobID         int64           `json:"JobID"`
	JobState      string          `json:"JobState"` // ENUM waiting, executing, succeeded, waiting_retry,  failed, aborted, migrating, migrated, wont_migrate
	AttemptNum    int             `json:"AttemptNum"`
	ExecTime      time.Time       `json:"ExecTime"`
	RetryTime     time.Time       `json:"RetryTime"`
	ErrorCode     string          `json:"ErrorCode"`
	ErrorResponse json.RawMessage `json:"ErrorResponse"`
	Parameters    json.RawMessage `json:"Parameters"`
	JobParameters json.RawMessage `json:"-"`
	WorkspaceId   string          `json:"WorkspaceId"`
}

JobStatusT is used for storing status of the job. It is the responsibility of the user of this module to set appropriate job status. State can be one of ENUM waiting, executing, succeeded, waiting_retry, failed, aborted

type JobT

type JobT struct {
	UUID          uuid.UUID       `json:"UUID"`
	JobID         int64           `json:"JobID"`
	UserID        string          `json:"UserID"`
	CreatedAt     time.Time       `json:"CreatedAt"`
	ExpireAt      time.Time       `json:"ExpireAt"`
	CustomVal     string          `json:"CustomVal"`
	EventCount    int             `json:"EventCount"`
	EventPayload  json.RawMessage `json:"EventPayload"`
	PayloadSize   int64           `json:"PayloadSize"`
	LastJobStatus JobStatusT      `json:"LastJobStatus"`
	Parameters    json.RawMessage `json:"Parameters"`
	WorkspaceId   string          `json:"WorkspaceId"`
}

JobT is the basic type for creating jobs. The JobID is generated by the system and LastJobStatus is populated when reading a processed job, while the rest should be set by the user.

func (*JobT) String added in v0.1.10

func (job *JobT) String() string

type JobsDB added in v0.1.10

type JobsDB interface {
	// Identifier returns the jobsdb's identifier, a.k.a. table prefix
	Identifier() string

	// WithTx begins a new transaction that can be used by the provided function.
	// If the function returns an error, the transaction will be rolled back and the error returned,
	// otherwise the transaction will be committed and a nil error will be returned.
	WithTx(func(tx *Tx) error) error

	// WithStoreSafeTx prepares a store-safe environment and then starts a transaction
	// that can be used by the provided function.
	WithStoreSafeTx(context.Context, func(tx StoreSafeTx) error) error

	// Store stores the provided jobs to the database
	Store(ctx context.Context, jobList []*JobT) error

	// StoreInTx stores the provided jobs to the database using an existing transaction.
	// Please ensure that you are using a StoreSafeTx, e.g.
	//    jobsdb.WithStoreSafeTx(ctx, func(tx StoreSafeTx) error {
	//	      jobsdb.StoreInTx(ctx, tx, jobList)
	//    })
	StoreInTx(ctx context.Context, tx StoreSafeTx, jobList []*JobT) error

	// StoreWithRetryEach tries to store all the provided jobs to the database and returns the job uuids which failed
	StoreWithRetryEach(ctx context.Context, jobList []*JobT) map[uuid.UUID]string

	// StoreWithRetryEachInTx tries to store all the provided jobs to the database and returns the job uuids which failed, using an existing transaction.
	// Please ensure that you are using a StoreSafeTx, e.g.
	//    jobsdb.WithStoreSafeTx(ctx, func(tx StoreSafeTx) error {
	//	      jobsdb.StoreWithRetryEachInTx(ctx, tx, jobList)
	//    })
	StoreWithRetryEachInTx(ctx context.Context, tx StoreSafeTx, jobList []*JobT) (map[uuid.UUID]string, error)

	// WithUpdateSafeTx prepares an update-safe environment and then starts a transaction
	// that can be used by the provided function. An update-safe transaction shall be used if the provided function
	// needs to call UpdateJobStatusInTx.
	WithUpdateSafeTx(context.Context, func(tx UpdateSafeTx) error) error

	// UpdateJobStatus updates the provided job statuses
	UpdateJobStatus(ctx context.Context, statusList []*JobStatusT, customValFilters []string, parameterFilters []ParameterFilterT) error

	// UpdateJobStatusInTx updates the provided job statuses in an existing transaction.
	// Please ensure that you are using an UpdateSafeTx, e.g.
	//    jobsdb.WithUpdateSafeTx(ctx, func(tx UpdateSafeTx) error {
	//	      jobsdb.UpdateJobStatusInTx(ctx, tx, statusList, customValFilters, parameterFilters)
	//    })
	UpdateJobStatusInTx(ctx context.Context, tx UpdateSafeTx, statusList []*JobStatusT, customValFilters []string, parameterFilters []ParameterFilterT) error

	// GetUnprocessed finds unprocessed jobs. Unprocessed are new
	// jobs whose state hasn't been marked in the database yet
	GetUnprocessed(ctx context.Context, params GetQueryParamsT) (JobsResult, error)

	// GetProcessed finds jobs in some state, i.e. not unprocessed
	GetProcessed(ctx context.Context, params GetQueryParamsT) (JobsResult, error)

	// GetToRetry finds jobs in failed state
	GetToRetry(ctx context.Context, params GetQueryParamsT) (JobsResult, error)

	// GetWaiting finds jobs in waiting state
	GetWaiting(ctx context.Context, params GetQueryParamsT) (JobsResult, error)

	// GetExecuting finds jobs in executing state
	GetExecuting(ctx context.Context, params GetQueryParamsT) (JobsResult, error)

	// GetImporting finds jobs in importing state
	GetImporting(ctx context.Context, params GetQueryParamsT) (JobsResult, error)

	// GetPileUpCounts returns statistics (counters) of incomplete jobs
	// grouped by workspaceId and destination type
	GetPileUpCounts(ctx context.Context) (statMap map[string]map[string]int, err error)

	// GetActiveWorkspaces returns a list of active workspace ids. If customVal is not empty, it will be used as a filter
	GetActiveWorkspaces(ctx context.Context, customVal string) (workspaces []string, err error)

	// GetDistinctParameterValues returns the list of distinct parameter values inside the jobs tables
	GetDistinctParameterValues(ctx context.Context, parameterName string) (values []string, err error)

	Ping() error
	DeleteExecuting()
	FailExecuting()

	GetJournalEntries(opType string) (entries []JournalEntryT)
	JournalDeleteEntry(opID int64)
	JournalMarkStart(opType string, opPayload json.RawMessage) int64
}

JobsDB interface contains public methods to access JobsDB data
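
The comment on WithTx describes a commit-on-nil, rollback-on-error contract. The following is a minimal sketch of that contract over a small interface. The `transaction` interface, `fakeTx` and `withTx` are hypothetical names; the real method works with the package's own *Tx type and opens the transaction itself.

```go
package main

import (
	"errors"
	"fmt"
)

// transaction is the minimal behaviour the sketch needs;
// *sql.Tx from database/sql has both methods.
type transaction interface {
	Commit() error
	Rollback() error
}

// fakeTx records which of the two outcomes happened.
type fakeTx struct{ committed, rolledBack bool }

func (t *fakeTx) Commit() error   { t.committed = true; return nil }
func (t *fakeTx) Rollback() error { t.rolledBack = true; return nil }

// errStore is a sample error returned by the callback.
var errStore = errors.New("store failed")

// withTx runs f inside a transaction: rollback if f fails, commit otherwise.
func withTx(begin func() (transaction, error), f func(tx transaction) error) error {
	tx, err := begin()
	if err != nil {
		return err
	}
	if err := f(tx); err != nil {
		if rbErr := tx.Rollback(); rbErr != nil {
			return fmt.Errorf("%w; rollback failed: %v", err, rbErr)
		}
		return err
	}
	return tx.Commit()
}

func main() {
	ok := &fakeTx{}
	_ = withTx(
		func() (transaction, error) { return ok, nil },
		func(transaction) error { return nil },
	)

	bad := &fakeTx{}
	err := withTx(
		func() (transaction, error) { return bad, nil },
		func(transaction) error { return errStore },
	)
	fmt.Println(ok.committed, bad.rolledBack, err)
}
```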

type JobsResult added in v0.1.10

type JobsResult struct {
	Jobs          []*JobT
	LimitsReached bool
	EventsCount   int
	PayloadSize   int64
}

type JournalEntryT

type JournalEntryT struct {
	OpID      int64
	OpType    string
	OpDone    bool
	OpPayload json.RawMessage
}

type MoreToken added in v1.3.0

type MoreToken interface{}

type MultiTenantHandleT added in v0.1.10

type MultiTenantHandleT struct {
	*HandleT
}

func (*MultiTenantHandleT) GetAllJobs added in v0.1.10

func (mj *MultiTenantHandleT) GetAllJobs(ctx context.Context, pickup map[string]int, params GetQueryParamsT, maxDSQuerySize int, more MoreToken) (*GetAllJobsResult, error)

GetAllJobs gets jobs from all workspaces according to the pickup map

type MultiTenantJobsDB added in v0.1.10

type MultiTenantJobsDB interface {
	GetAllJobs(context.Context, map[string]int, GetQueryParamsT, int, MoreToken) (*GetAllJobsResult, error)

	WithUpdateSafeTx(context.Context, func(tx UpdateSafeTx) error) error
	UpdateJobStatusInTx(ctx context.Context, tx UpdateSafeTx, statusList []*JobStatusT, customValFilters []string, parameterFilters []ParameterFilterT) error
	UpdateJobStatus(ctx context.Context, statusList []*JobStatusT, customValFilters []string, parameterFilters []ParameterFilterT) error

	DeleteExecuting()
	FailExecuting()

	GetJournalEntries(opType string) (entries []JournalEntryT)
	JournalMarkStart(opType string, opPayload json.RawMessage) int64
	JournalDeleteEntry(opID int64)
	GetPileUpCounts(context.Context) (map[string]map[string]int, error)
}

type MultiTenantLegacy added in v0.1.10

type MultiTenantLegacy struct {
	*HandleT
}

func (*MultiTenantLegacy) GetAllJobs added in v0.1.10

func (mj *MultiTenantLegacy) GetAllJobs(ctx context.Context, pickup map[string]int, params GetQueryParamsT, _ int, more MoreToken) (*GetAllJobsResult, error)

type OptsFunc added in v0.1.10

type OptsFunc func(jd *HandleT)

func WithClearDB added in v0.1.10

func WithClearDB(clearDB bool) OptsFunc

WithClearDB, if set to true, removes all existing tables.

func WithDSLimit added in v1.2.0

func WithDSLimit(limit *int) OptsFunc

func WithFileUploaderProvider added in v1.3.0

func WithFileUploaderProvider(fileUploaderProvider fileuploader.Provider) OptsFunc

func WithPreBackupHandlers added in v0.1.10

func WithPreBackupHandlers(preBackupHandlers []prebackup.Handler) OptsFunc

WithPreBackupHandlers sets the pre-backup handlers.
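
OptsFunc and the With... constructors follow the functional options pattern: each option is a function that mutates the handle, and the constructors accept any number of them. Below is a minimal sketch of the pattern on a stand-in type. The `handle` struct, its fields and the lower-case function names are hypothetical and only mirror the shapes of the signatures listed here.

```go
package main

import "fmt"

// handle is a hypothetical stand-in for HandleT with just enough state
// to show what options do.
type handle struct {
	tablePrefix string
	clearDB     bool
	dsLimit     *int
}

// optsFunc has the same shape as OptsFunc: a function that configures a handle.
type optsFunc func(h *handle)

func withClearDB(clearDB bool) optsFunc {
	return func(h *handle) { h.clearDB = clearDB }
}

func withDSLimit(limit *int) optsFunc {
	return func(h *handle) { h.dsLimit = limit }
}

// newHandle applies the options in the order they were given.
func newHandle(tablePrefix string, opts ...optsFunc) *handle {
	h := &handle{tablePrefix: tablePrefix}
	for _, opt := range opts {
		opt(h)
	}
	return h
}

func main() {
	limit := 5
	h := newHandle("gw", withClearDB(true), withDSLimit(&limit))
	fmt.Println(h.tablePrefix, h.clearDB, *h.dsLimit)
}
```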

type OwnerType added in v0.1.10

type OwnerType string

OwnerType for this jobsdb instance

const (
	// Read : Only Reader of this jobsdb instance
	Read OwnerType = "READ"
	// Write : Only Writer of this jobsdb instance
	Write OwnerType = "WRITE"
	// ReadWrite : Reader and Writer of this jobsdb instance
	ReadWrite OwnerType = ""
)

type ParameterFilterT

type ParameterFilterT struct {
	Name  string
	Value string
}

func (ParameterFilterT) GetName added in v1.8.0

func (p ParameterFilterT) GetName() string

func (ParameterFilterT) GetValue added in v1.8.0

func (p ParameterFilterT) GetValue() string

type QueryConditions added in v0.1.10

type QueryConditions struct {
	// if IgnoreCustomValFiltersInQuery is true, CustomValFilters is not going to be used
	IgnoreCustomValFiltersInQuery bool
	CustomValFilters              []string
	ParameterFilters              []ParameterFilterT
	StateFilters                  []string
	AfterJobID                    *int64
}

QueryConditions holds jobsdb query conditions

type StoreSafeTx added in v0.1.10

type StoreSafeTx interface {
	Tx() *Tx
	SqlTx() *sql.Tx
	// contains filtered or unexported methods
}

StoreSafeTx is a sealed interface.

func EmptyStoreSafeTx added in v0.1.10

func EmptyStoreSafeTx() StoreSafeTx

EmptyStoreSafeTx returns an empty interface usable only for tests

type Tx added in v1.2.1

type Tx struct {
	*sql.Tx
	// contains filtered or unexported fields
}

Tx is a wrapper around sql.Tx that supports registering and executing post-commit actions, a.k.a. success listeners.

func (*Tx) AddSuccessListener added in v1.2.1

func (tx *Tx) AddSuccessListener(listener func())

AddSuccessListener registers a listener to be executed after the transaction has been committed successfully.

func (*Tx) Commit added in v1.2.1

func (tx *Tx) Commit() error

Commit commits the transaction and executes all listeners.
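
The post-commit listener mechanism can be sketched as a thin wrapper around anything that can commit. This is not the package's implementation: `committer`, `fakeCommitter` and `listenerTx` are hypothetical names, and running the listeners in registration order is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// committer is the only behaviour the wrapper needs from the inner transaction.
type committer interface{ Commit() error }

// fakeCommitter returns a preset error (or nil) from Commit.
type fakeCommitter struct{ err error }

func (c fakeCommitter) Commit() error { return c.err }

// listenerTx wraps a committer and keeps a list of success listeners.
type listenerTx struct {
	inner     committer
	listeners []func()
}

// AddSuccessListener registers a function to run after a successful commit.
func (tx *listenerTx) AddSuccessListener(listener func()) {
	tx.listeners = append(tx.listeners, listener)
}

// Commit commits the inner transaction and, only on success, runs the
// listeners in registration order (the order is an assumption).
func (tx *listenerTx) Commit() error {
	if err := tx.inner.Commit(); err != nil {
		return err
	}
	for _, listener := range tx.listeners {
		listener()
	}
	return nil
}

// errCommit is a sample commit failure.
var errCommit = errors.New("commit failed")

func main() {
	fired := 0

	good := &listenerTx{inner: fakeCommitter{}}
	good.AddSuccessListener(func() { fired++ })
	_ = good.Commit()

	bad := &listenerTx{inner: fakeCommitter{err: errCommit}}
	bad.AddSuccessListener(func() { fired++ })
	err := bad.Commit()

	fmt.Println(fired, err)
}
```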

type UpdateSafeTx added in v0.1.10

type UpdateSafeTx interface {
	Tx() *Tx
	SqlTx() *sql.Tx
	// contains filtered or unexported methods
}

UpdateSafeTx is a sealed interface.

func EmptyUpdateSafeTx added in v0.1.10

func EmptyUpdateSafeTx() UpdateSafeTx

EmptyUpdateSafeTx returns an empty interface usable only for tests

Directories

Path Synopsis
internal
