Documentation ¶
Index ¶
- Constants
- Variables
- type GetQueryParams
- type Handle
- func (jd *Handle) Close()
- func (jd *Handle) DeleteExecuting()
- func (jd *Handle) FailExecuting()
- func (jd *Handle) GetAborted(ctx context.Context, params GetQueryParams) (JobsResult, error)
- func (jd *Handle) GetActiveWorkspaces(ctx context.Context, customVal string) ([]string, error)
- func (jd *Handle) GetDistinctParameterValues(ctx context.Context, parameterName string) ([]string, error)
- func (jd *Handle) GetFailed(ctx context.Context, params GetQueryParams) (JobsResult, error)
- func (jd *Handle) GetImporting(ctx context.Context, params GetQueryParams) (JobsResult, error)
- func (jd *Handle) GetJobs(ctx context.Context, states []string, params GetQueryParams) (JobsResult, error)
- func (jd *Handle) GetJournalEntries(opType string) (entries []JournalEntryT)
- func (jd *Handle) GetLastJob() *JobT
- func (jd *Handle) GetMaxDSIndex() (maxDSIndex int64)
- func (jd *Handle) GetPileUpCounts(ctx context.Context) (map[string]map[string]int, error)
- func (jd *Handle) GetSucceeded(ctx context.Context, params GetQueryParams) (JobsResult, error)
- func (jd *Handle) GetToProcess(ctx context.Context, params GetQueryParams, more MoreToken) (*MoreJobsResult, error)
- func (jd *Handle) GetUnprocessed(ctx context.Context, params GetQueryParams) (JobsResult, error)
- func (jd *Handle) GetWaiting(ctx context.Context, params GetQueryParams) (JobsResult, error)
- func (jd *Handle) Identifier() string
- func (jd *Handle) IsMasterBackupEnabled() bool
- func (jd *Handle) JournalDeleteEntry(opID int64)
- func (jd *Handle) JournalMarkDone(opID int64) error
- func (jd *Handle) JournalMarkStart(opType string, opPayload json.RawMessage) (int64, error)
- func (jd *Handle) JournalMarkStartInTx(tx *Tx, opType string, opPayload json.RawMessage) (int64, error)
- func (jd *Handle) Ping() error
- func (jd *Handle) SchemaMigrationTable() string
- func (jd *Handle) Setup(ownerType OwnerType, clearAll bool, tablePrefix string, ...) error
- func (jd *Handle) Start() error
- func (jd *Handle) Stop()
- func (jd *Handle) Store(ctx context.Context, jobList []*JobT) error
- func (jd *Handle) StoreEachBatchRetry(ctx context.Context, jobBatches [][]*JobT) map[uuid.UUID]string
- func (jd *Handle) StoreEachBatchRetryInTx(ctx context.Context, tx StoreSafeTx, jobBatches [][]*JobT) (map[uuid.UUID]string, error)
- func (jd *Handle) StoreInTx(ctx context.Context, tx StoreSafeTx, jobList []*JobT) error
- func (jd *Handle) TearDown()
- func (jd *Handle) UpdateJobStatus(ctx context.Context, statusList []*JobStatusT, customValFilters []string, ...) error
- func (jd *Handle) UpdateJobStatusInTx(ctx context.Context, tx UpdateSafeTx, statusList []*JobStatusT, ...) error
- func (jd *Handle) WithStoreSafeTx(ctx context.Context, f func(tx StoreSafeTx) error) error
- func (jd *Handle) WithTx(f func(tx *Tx) error) error
- func (jd *Handle) WithUpdateSafeTx(ctx context.Context, f func(tx UpdateSafeTx) error) error
- type HandleInspector
- type JobStatusT
- type JobT
- type JobsDB
- type JobsResult
- type JournalEntryT
- type MoreJobsResult
- type MoreToken
- type OptsFunc
- func WithClearDB(clearDB bool) OptsFunc
- func WithConfig(c *config.Config) OptsFunc
- func WithDBHandle(dbHandle *sql.DB) OptsFunc
- func WithDSLimit(limit *int) OptsFunc
- func WithFileUploaderProvider(fileUploaderProvider fileuploader.Provider) OptsFunc
- func WithJobMaxAge(maxAgeFunc func() time.Duration) OptsFunc
- func WithPreBackupHandlers(preBackupHandlers []prebackup.Handler) OptsFunc
- func WithSkipMaintenanceErr(ignore bool) OptsFunc
- type OwnerType
- type ParameterFilterT
- type QueryConditions
- type StoreSafeTx
- type Tx
- type UpdateSafeTx
Constants ¶
const (
RawDataDestUploadOperation = "S3_DEST_UPLOAD"
)
We keep a journal of all the operations. The journal helps
Variables ¶
var ( // Not valid, Not terminal Unprocessed = jobStateT{State: "not_picked_yet", /* contains filtered or unexported fields */} // Valid, Not terminal Failed = jobStateT{State: "failed", /* contains filtered or unexported fields */} Executing = jobStateT{State: "executing", /* contains filtered or unexported fields */} Waiting = jobStateT{State: "waiting", /* contains filtered or unexported fields */} Importing = jobStateT{State: "importing", /* contains filtered or unexported fields */} // Valid, Terminal Succeeded = jobStateT{State: "succeeded", /* contains filtered or unexported fields */} Aborted = jobStateT{State: "aborted", /* contains filtered or unexported fields */} Migrated = jobStateT{State: "migrated", /* contains filtered or unexported fields */} )
State definitions
Functions ¶
This section is empty.
Types ¶
type GetQueryParams ¶
type GetQueryParams struct { // if IgnoreCustomValFiltersInQuery is true, CustomValFilters is not going to be used IgnoreCustomValFiltersInQuery bool WorkspaceID string CustomValFilters []string ParameterFilters []ParameterFilterT // Limit the total number of jobs. // A value less than or equal to zero will return no results JobsLimit int // Limit the total number of events, 1 job contains 1+ event(s). // A value less than or equal to zero will disable this limit (no limit), // only values greater than zero are considered as valid limits. EventsLimit int // Limit the total job payload size // A value less than or equal to zero will disable this limit (no limit), // only values greater than zero are considered as valid limits. PayloadSizeLimit int64 // contains filtered or unexported fields }
GetQueryParams is a struct to hold jobsdb query params.
type Handle ¶
type Handle struct { // TriggerAddNewDS, TriggerMigrateDS is useful for triggering addNewDS to run from tests. // TODO: Ideally we should refactor the code to not use this override. TriggerAddNewDS func() <-chan time.Time TriggerMigrateDS func() <-chan time.Time TriggerRefreshDS func() <-chan time.Time TriggerJobCleanUp func() <-chan time.Time // contains filtered or unexported fields }
Handle is the main type implementing the database for implementing jobs. The caller must call the SetUp function on a Handle object
func NewForRead ¶ added in v0.1.10
func NewForReadWrite ¶ added in v0.1.10
func NewForWrite ¶ added in v0.1.10
func (*Handle) Close ¶
func (jd *Handle) Close()
Close closes the database connection.
Stop should be called before Close.
func (*Handle) DeleteExecuting ¶
func (jd *Handle) DeleteExecuting()
DeleteExecuting deletes events whose latest job state is executing. This is only done during recovery, which happens during the server start.
func (*Handle) FailExecuting ¶
func (jd *Handle) FailExecuting()
FailExecuting fails events whose latest job state is executing.
This is only done during recovery, which happens during the server start.
func (*Handle) GetAborted ¶
func (jd *Handle) GetAborted(ctx context.Context, params GetQueryParams) (JobsResult, error)
GetAborted finds jobs in aborted state
func (*Handle) GetActiveWorkspaces ¶
func (*Handle) GetDistinctParameterValues ¶
func (*Handle) GetFailed ¶
func (jd *Handle) GetFailed(ctx context.Context, params GetQueryParams) (JobsResult, error)
GetFailed finds jobs in failed state
func (*Handle) GetImporting ¶
func (jd *Handle) GetImporting(ctx context.Context, params GetQueryParams) (JobsResult, error)
GetImporting finds jobs in importing state
func (*Handle) GetJobs ¶
func (jd *Handle) GetJobs(ctx context.Context, states []string, params GetQueryParams) (JobsResult, error)
GetJobs returns events of a given state. This does not update any state itself and realises on the caller to update it. That means that successive calls to GetJobs("failed") can return the same set of events. It is the responsibility of the caller to call it from one thread, update the state (to "waiting") in the same thread and pass on the processors
func (*Handle) GetJournalEntries ¶
func (jd *Handle) GetJournalEntries(opType string) (entries []JournalEntryT)
func (*Handle) GetLastJob ¶
func (*Handle) GetMaxDSIndex ¶
GetMaxDSIndex returns max dataset index in the DB
func (*Handle) GetPileUpCounts ¶
func (*Handle) GetSucceeded ¶
func (jd *Handle) GetSucceeded(ctx context.Context, params GetQueryParams) (JobsResult, error)
GetSucceeded finds jobs in succeeded state
func (*Handle) GetToProcess ¶
func (jd *Handle) GetToProcess(ctx context.Context, params GetQueryParams, more MoreToken) (*MoreJobsResult, error)
func (*Handle) GetUnprocessed ¶
func (jd *Handle) GetUnprocessed(ctx context.Context, params GetQueryParams) (JobsResult, error)
GetUnprocessed finds unprocessed jobs, i.e. new jobs whose state hasn't been marked in the database yet
func (*Handle) GetWaiting ¶
func (jd *Handle) GetWaiting(ctx context.Context, params GetQueryParams) (JobsResult, error)
GetWaiting finds jobs in waiting state
func (*Handle) Identifier ¶
Identifier returns the identifier of the jobsdb. Here it is tablePrefix.
func (*Handle) IsMasterBackupEnabled ¶
func (*Handle) JournalDeleteEntry ¶
func (*Handle) JournalMarkDone ¶
JournalMarkDone marks the end of a journal action
func (*Handle) JournalMarkStart ¶
func (*Handle) JournalMarkStartInTx ¶
func (*Handle) SchemaMigrationTable ¶
SchemaMigrationTable returns the table name used for storing current schema version.
func (*Handle) Setup ¶
func (jd *Handle) Setup( ownerType OwnerType, clearAll bool, tablePrefix string, preBackupHandlers []prebackup.Handler, fileUploaderProvider fileuploader.Provider, ) error
Setup is used to initialize the HandleT structure. clearAll = True means it will remove all existing tables tablePrefix must be unique and is used to separate multiple users of JobsDB
func (*Handle) Start ¶
Start starts the jobsdb worker and housekeeping (migration, archive) threads. Start should be called before any other jobsdb methods are called.
func (*Handle) Stop ¶
func (jd *Handle) Stop()
Stop stops the background goroutines and waits until they finish. Stop should be called once only after Start. Only Start and Close can be called after Stop.
func (*Handle) Store ¶
Store stores new jobs to the jobsdb. If enableWriterQueue is true, this goes through writer worker pool.
func (*Handle) StoreEachBatchRetry ¶
func (*Handle) StoreEachBatchRetryInTx ¶
func (*Handle) StoreInTx ¶
StoreInTx stores new jobs to the jobsdb. If enableWriterQueue is true, this goes through writer worker pool.
func (*Handle) TearDown ¶
func (jd *Handle) TearDown()
TearDown stops the background goroutines,
waits until they finish and closes the database.
func (*Handle) UpdateJobStatus ¶
func (jd *Handle) UpdateJobStatus(ctx context.Context, statusList []*JobStatusT, customValFilters []string, parameterFilters []ParameterFilterT) error
func (*Handle) UpdateJobStatusInTx ¶
func (jd *Handle) UpdateJobStatusInTx(ctx context.Context, tx UpdateSafeTx, statusList []*JobStatusT, customValFilters []string, parameterFilters []ParameterFilterT) error
UpdateJobStatusInTx updates the status of a batch of jobs in the past transaction customValFilters[] is passed, so we can efficiently mark empty cache Later we can move this to query IMP NOTE: AcquireUpdateJobStatusLocks Should be called before calling this function
func (*Handle) WithStoreSafeTx ¶
func (*Handle) WithUpdateSafeTx ¶
type HandleInspector ¶ added in v1.0.2
type HandleInspector struct {
*Handle
}
HandleInspector is only intended to be used by tests for verifying the handle's internal state
func (*HandleInspector) DSIndicesList ¶ added in v1.2.0
func (h *HandleInspector) DSIndicesList() []string
DSIndicesList returns the slice of current ds indices
type JobStatusT ¶
type JobStatusT struct { JobID int64 `json:"JobID"` JobState string `json:"JobState"` // ENUM waiting, executing, succeeded, waiting_retry, failed, aborted, migrating, migrated, wont_migrate AttemptNum int `json:"AttemptNum"` ExecTime time.Time `json:"ExecTime"` RetryTime time.Time `json:"RetryTime"` ErrorCode string `json:"ErrorCode"` ErrorResponse json.RawMessage `json:"ErrorResponse"` Parameters json.RawMessage `json:"Parameters"` JobParameters json.RawMessage `json:"-"` WorkspaceId string `json:"WorkspaceId"` }
JobStatusT is used for storing status of the job. It is the responsibility of the user of this module to set appropriate job status. State can be one of ENUM waiting, executing, succeeded, waiting_retry, failed, aborted
type JobT ¶
type JobT struct { UUID uuid.UUID `json:"UUID"` JobID int64 `json:"JobID"` UserID string `json:"UserID"` CreatedAt time.Time `json:"CreatedAt"` ExpireAt time.Time `json:"ExpireAt"` CustomVal string `json:"CustomVal"` EventCount int `json:"EventCount"` EventPayload json.RawMessage `json:"EventPayload"` PayloadSize int64 `json:"PayloadSize"` LastJobStatus JobStatusT `json:"LastJobStatus"` Parameters json.RawMessage `json:"Parameters"` WorkspaceId string `json:"WorkspaceId"` }
JobT is the basic type for creating jobs. The JobID is generated by the system and LastJobStatus is populated when reading a processed job while rest should be set by the user.
type JobsDB ¶ added in v0.1.10
type JobsDB interface { // Identifier returns the jobsdb's identifier, a.k.a. table prefix Identifier() string // WithTx begins a new transaction that can be used by the provided function. // If the function returns an error, the transaction will be rollbacked and return the error, // otherwise the transaction will be committed and a nil error will be returned. WithTx(func(tx *Tx) error) error // WithStoreSafeTx prepares a store-safe environment and then starts a transaction // that can be used by the provided function. WithStoreSafeTx(context.Context, func(tx StoreSafeTx) error) error // Store stores the provided jobs to the database Store(ctx context.Context, jobList []*JobT) error // StoreInTx stores the provided jobs to the database using an existing transaction. // Please ensure that you are using an StoreSafeTx, e.g. // jobsdb.WithStoreSafeTx(ctx, func(tx StoreSafeTx) error { // jobsdb.StoreInTx(ctx, tx, jobList) // }) StoreInTx(ctx context.Context, tx StoreSafeTx, jobList []*JobT) error // StoreEachBatchRetry tries to store all the provided job batches to the database // // returns the uuids of first job of each failed batch StoreEachBatchRetry(ctx context.Context, jobBatches [][]*JobT) map[uuid.UUID]string // StoreEachBatchRetryInTx tries to store all the provided job batches to the database, using an existing transaction. // // returns the uuids of first job of each failed batch // // Please ensure that you are using an StoreSafeTx, e.g. // jobsdb.WithStoreSafeTx(func(tx StoreSafeTx) error { // jobsdb.StoreEachBatchRetryInTx(ctx, tx, jobBatches) // }) StoreEachBatchRetryInTx(ctx context.Context, tx StoreSafeTx, jobBatches [][]*JobT) (map[uuid.UUID]string, error) // WithUpdateSafeTx prepares an update-safe environment and then starts a transaction // that can be used by the provided function. An update-safe transaction shall be used if the provided function // needs to call UpdateJobStatusInTx. WithUpdateSafeTx(context.Context, func(tx UpdateSafeTx) error) error // UpdateJobStatus updates the provided job statuses UpdateJobStatus(ctx context.Context, statusList []*JobStatusT, customValFilters []string, parameterFilters []ParameterFilterT) error // UpdateJobStatusInTx updates the provided job statuses in an existing transaction. // Please ensure that you are using an UpdateSafeTx, e.g. // jobsdb.WithUpdateSafeTx(ctx, func(tx UpdateSafeTx) error { // jobsdb.UpdateJobStatusInTx(ctx, tx, statusList, customValFilters, parameterFilters) // }) UpdateJobStatusInTx(ctx context.Context, tx UpdateSafeTx, statusList []*JobStatusT, customValFilters []string, parameterFilters []ParameterFilterT) error // GetJobs finds jobs in any of the provided state(s) GetJobs(ctx context.Context, states []string, params GetQueryParams) (JobsResult, error) // GetUnprocessed finds unprocessed jobs, i.e. new jobs whose state hasn't been marked in the database yet GetUnprocessed(ctx context.Context, params GetQueryParams) (JobsResult, error) // GetImporting finds jobs in importing state GetImporting(ctx context.Context, params GetQueryParams) (JobsResult, error) // GetAborted finds jobs in aborted state GetAborted(ctx context.Context, params GetQueryParams) (JobsResult, error) // GetWaiting finds jobs in waiting state GetWaiting(ctx context.Context, params GetQueryParams) (JobsResult, error) // GetSucceeded finds jobs in succeeded state GetSucceeded(ctx context.Context, params GetQueryParams) (JobsResult, error) // GetFailed finds jobs in failed state GetFailed(ctx context.Context, params GetQueryParams) (JobsResult, error) // GetToProcess finds jobs in any of the following states: failed, waiting, unprocessed. // It also returns a MoreToken that can be used to fetch more jobs, if available, with a subsequent call. GetToProcess(ctx context.Context, params GetQueryParams, more MoreToken) (*MoreJobsResult, error) // GetPileUpCounts returns statistics (counters) of incomplete jobs // grouped by workspaceId and destination type GetPileUpCounts(ctx context.Context) (statMap map[string]map[string]int, err error) // GetActiveWorkspaces returns a list of active workspace ids. If customVal is not empty, it will be used as a filter GetActiveWorkspaces(ctx context.Context, customVal string) (workspaces []string, err error) // GetDistinctParameterValues returns the list of distinct parameter values inside the jobs tables GetDistinctParameterValues(ctx context.Context, parameterName string) (values []string, err error) Ping() error DeleteExecuting() FailExecuting() GetJournalEntries(opType string) (entries []JournalEntryT) JournalDeleteEntry(opID int64) JournalMarkStart(opType string, opPayload json.RawMessage) (int64, error) JournalMarkDone(opID int64) error IsMasterBackupEnabled() bool }
JobsDB interface contains public methods to access JobsDB data
type JobsResult ¶ added in v0.1.10
type JournalEntryT ¶
type JournalEntryT struct { OpID int64 OpType string OpDone bool OpPayload json.RawMessage }
type MoreJobsResult ¶ added in v1.11.0
type MoreJobsResult struct { JobsResult More MoreToken }
MoreJobsResult is a JobsResult with a MoreToken
type MoreToken ¶ added in v1.3.0
type MoreToken interface{}
MoreToken is a token that can be used to fetch more jobs
type OptsFunc ¶ added in v0.1.10
type OptsFunc func(jd *Handle)
func WithClearDB ¶ added in v0.1.10
WithClearDB, if set to true it will remove all existing tables
func WithConfig ¶
func WithDBHandle ¶
func WithDSLimit ¶ added in v1.2.0
func WithFileUploaderProvider ¶ added in v1.3.0
func WithFileUploaderProvider(fileUploaderProvider fileuploader.Provider) OptsFunc
func WithJobMaxAge ¶
func WithPreBackupHandlers ¶ added in v0.1.10
WithPreBackupHandlers, sets pre-backup handlers
func WithSkipMaintenanceErr ¶ added in v1.10.0
type ParameterFilterT ¶
func (ParameterFilterT) GetName ¶ added in v1.8.0
func (p ParameterFilterT) GetName() string
func (ParameterFilterT) GetValue ¶ added in v1.8.0
func (p ParameterFilterT) GetValue() string
type QueryConditions ¶ added in v0.1.10
type QueryConditions struct { // if IgnoreCustomValFiltersInQuery is true, CustomValFilters is not going to be used IgnoreCustomValFiltersInQuery bool CustomValFilters []string ParameterFilters []ParameterFilterT StateFilters []string AfterJobID *int64 }
QueryConditions holds jobsdb query conditions
type StoreSafeTx ¶ added in v0.1.10
StoreSafeTx sealed interface
func EmptyStoreSafeTx ¶ added in v0.1.10
func EmptyStoreSafeTx() StoreSafeTx
EmptyStoreSafeTx returns an empty interface usable only for tests
type Tx ¶ added in v1.2.1
Tx is a wrapper around sql.Tx that supports registering and executing post-commit actions, a.k.a. success listeners.
func (*Tx) AddSuccessListener ¶ added in v1.2.1
func (tx *Tx) AddSuccessListener(listener func())
AddSuccessListener registers a listener to be executed after the transaction has been committed successfully.
type UpdateSafeTx ¶ added in v0.1.10
UpdateSafeTx sealed interface
func EmptyUpdateSafeTx ¶ added in v0.1.10
func EmptyUpdateSafeTx() UpdateSafeTx
EmptyUpdateSafeTx returns an empty interface usable only for tests