Documentation ¶
Index ¶
- Constants
- Variables
- func GetConnectionString() string
- func Init()
- func Init2()
- func Init3()
- func IsMasterBackupEnabled() bool
- func QueryWorkspacePileupWithRetries(parentContext context.Context, timeout time.Duration, maxAttempts int, ...) (map[string]map[string]int, error)
- type DSPair
- type ErrorCodeCountStats
- type ErrorCodeCountsByDestination
- type EventStatusDetailed
- type EventStatusStats
- type FailedJobs
- type FailedJobsStats
- type FailedStatusStats
- type GetQueryParamsT
- type HandleInspector
- type HandleT
- func (jd *HandleT) Checkpoint(migrationCheckpoint MigrationCheckpointT) int64
- func (jd *HandleT) CheckpointInTxn(txHandler transactionHandler, migrationCheckpoint MigrationCheckpointT) (int64, error)
- func (jd *HandleT) Close()
- func (jd *HandleT) DeleteExecuting()
- func (jd *HandleT) GetCheckpoints(migrationType MigrationOp, status string) []MigrationCheckpointT
- func (jd *HandleT) GetExecuting(ctx context.Context, params GetQueryParamsT) (JobsResult, error)
- func (jd *HandleT) GetImporting(ctx context.Context, params GetQueryParamsT) (JobsResult, error)
- func (jd *HandleT) GetJournalEntries(opType string) (entries []JournalEntryT)
- func (jd *HandleT) GetLastJob() *JobT
- func (jd *HandleT) GetLastJobID() int64
- func (jd *HandleT) GetLastJobIDBeforeImport() int64
- func (jd *HandleT) GetMaxDSIndex() (maxDSIndex int64)
- func (jd *HandleT) GetMaxIDForDs(ds dataSetT) int64
- func (jd *HandleT) GetNonMigratedAndMarkMigrating(count int) []*JobT
- func (jd *HandleT) GetPileUpCounts(ctx context.Context) (map[string]map[string]int, error)
- func (jd *HandleT) GetProcessed(ctx context.Context, params GetQueryParamsT) (JobsResult, error)
- func (jd *HandleT) GetSetupCheckpoint(migrationType MigrationOp) (MigrationCheckpointT, bool)
- func (jd *HandleT) GetTablePrefix() string
- func (jd *HandleT) GetToRetry(ctx context.Context, params GetQueryParamsT) (JobsResult, error)
- func (jd *HandleT) GetUnprocessed(ctx context.Context, params GetQueryParamsT) (JobsResult, error)
- func (*HandleT) GetUserID(job *JobT) string
- func (jd *HandleT) GetWaiting(ctx context.Context, params GetQueryParamsT) (JobsResult, error)
- func (jd *HandleT) Identifier() string
- func (jd *HandleT) IsMigrating() bool
- func (jd *HandleT) JournalDeleteEntry(opID int64)
- func (jd *HandleT) JournalMarkDone(opID int64)
- func (jd *HandleT) JournalMarkStart(opType string, opPayload json.RawMessage) int64
- func (jd *HandleT) JournalMarkStartInTx(tx *sql.Tx, opType string, opPayload json.RawMessage) (int64, error)
- func (jd *HandleT) Ping() error
- func (jd *HandleT) PostExportCleanup()
- func (jd *HandleT) PreExportCleanup()
- func (jd *HandleT) RecoverFromMigrationJournal()
- func (jd *HandleT) SchemaMigrationTable() string
- func (jd *HandleT) Setup(ownerType OwnerType, clearAll bool, tablePrefix, migrationMode string, ...) error
- func (jd *HandleT) SetupCheckpointTable()
- func (jd *HandleT) SetupForExport()
- func (jd *HandleT) SetupForImport()
- func (jd *HandleT) SetupForMigration(fromVersion, toVersion int)
- func (jd *HandleT) Start() error
- func (jd *HandleT) Status() interface{}
- func (jd *HandleT) Stop()
- func (jd *HandleT) Store(ctx context.Context, jobList []*JobT) error
- func (jd *HandleT) StoreInTx(ctx context.Context, tx StoreSafeTx, jobList []*JobT) error
- func (jd *HandleT) StoreJobsAndCheckpoint(jobList []*JobT, migrationCheckpoint MigrationCheckpointT)
- func (jd *HandleT) StoreWithRetryEach(ctx context.Context, jobList []*JobT) map[uuid.UUID]string
- func (jd *HandleT) StoreWithRetryEachInTx(ctx context.Context, tx StoreSafeTx, jobList []*JobT) (map[uuid.UUID]string, error)
- func (jd *HandleT) TearDown()
- func (jd *HandleT) UpdateJobStatus(ctx context.Context, statusList []*JobStatusT, customValFilters []string, ...) error
- func (jd *HandleT) UpdateJobStatusAndCheckpoint(statusList []*JobStatusT, fromNodeID, toNodeID string, jobsCount int64, ...)
- func (jd *HandleT) UpdateJobStatusInTx(ctx context.Context, tx UpdateSafeTx, statusList []*JobStatusT, ...) error
- func (jd *HandleT) UpdateSequenceNumberOfLatestDS(seqNoForNewDS int64)
- func (jd *HandleT) WithStoreSafeTx(f func(tx StoreSafeTx) error) error
- func (jd *HandleT) WithTx(f func(tx *sql.Tx) error) error
- func (jd *HandleT) WithUpdateSafeTx(f func(tx UpdateSafeTx) error) error
- type JobStatusT
- type JobT
- type JobsDB
- type JobsResult
- type JobsdbUtilsHandler
- type JournalEntryT
- type MigrationCheckpointT
- type MigrationOp
- type MultiTenantHandleT
- type MultiTenantJobsDB
- type MultiTenantLegacy
- type OptsFunc
- type OwnerType
- type ParameterFilterT
- type QueryConditions
- type ReadonlyHandleT
- func (jd *ReadonlyHandleT) GetDSListString() (string, error)
- func (jd *ReadonlyHandleT) GetFailedStatusErrorCodeCountsByDestination(args []string) (string, error)
- func (jd *ReadonlyHandleT) GetJobByID(job_id, prefix string) (string, error)
- func (jd *ReadonlyHandleT) GetJobIDStatus(jobID, _ string) (string, error)
- func (jd *ReadonlyHandleT) GetJobIDsForUser(args []string) (string, error)
- func (jd *ReadonlyHandleT) GetJobSummaryCount(arg, prefix string) (string, error)
- func (jd *ReadonlyHandleT) GetLatestFailedJobs(arg, prefix string) (string, error)
- func (jd *ReadonlyHandleT) HavePendingJobs(ctx context.Context, customValFilters []string, count int, ...) (bool, error)
- func (jd *ReadonlyHandleT) Setup(tablePrefix string)
- func (jd *ReadonlyHandleT) TearDown()
- type ReadonlyJobsDB
- type SQLJobStatusT
- type StoreSafeTx
- type UpdateSafeTx
Constants ¶
const ( ExportOp = "export" ImportOp = "import" AcceptNewEventsOp = "acceptNewEvents" )
ENUM Values for MigrationOp
const ( SetupForExport = "setup_for_export" Exported = "exported" Notified = "notified" Completed = "completed" SetupToAcceptNewEvents = "setup_to_accept_new_events" SetupForImport = "setup_for_import" PreparedForImport = "prepared_for_import" Imported = "imported" )
ENUM Values for Status
const ( MigrationCheckpointSuffix = "migration_checkpoints" UniqueConstraintSuffix = "unique_checkpoint" )
MigrationCheckpointSuffix : Suffix for checkpoints table
const (
RawDataDestUploadOperation = "S3_DEST_UPLOAD"
)
We keep a journal of all the operations. The journal helps
Variables ¶
var ( // Not valid, Not terminal NotProcessed = jobStateT{State: "not_picked_yet", /* contains filtered or unexported fields */} // Valid, Not terminal Failed = jobStateT{State: "failed", /* contains filtered or unexported fields */} Executing = jobStateT{State: "executing", /* contains filtered or unexported fields */} Waiting = jobStateT{State: "waiting", /* contains filtered or unexported fields */} WaitingRetry = jobStateT{State: "waiting_retry", /* contains filtered or unexported fields */} Migrating = jobStateT{State: "migrating", /* contains filtered or unexported fields */} Importing = jobStateT{State: "importing", /* contains filtered or unexported fields */} // Valid, Terminal Succeeded = jobStateT{State: "succeeded", /* contains filtered or unexported fields */} Aborted = jobStateT{State: "aborted", /* contains filtered or unexported fields */} Migrated = jobStateT{State: "migrated", /* contains filtered or unexported fields */} WontMigrate = jobStateT{State: "wont_migrate", /* contains filtered or unexported fields */} )
State definitions
var CacheKeyParameterFilters = []string{"destination_id"}
Functions ¶
func GetConnectionString ¶
func GetConnectionString() string
GetConnectionString Returns Jobs DB connection configuration
func IsMasterBackupEnabled ¶ added in v0.1.10
func IsMasterBackupEnabled() bool
Types ¶
type ErrorCodeCountStats ¶ added in v0.1.10
type ErrorCodeCountStats struct {
ErrorCodeCounts []ErrorCodeCountsByDestination
}
type ErrorCodeCountsByDestination ¶ added in v0.1.10
type EventStatusDetailed ¶ added in v0.1.10
type EventStatusStats ¶ added in v0.1.10
type EventStatusStats struct { StatsNums []EventStatusDetailed DSList string }
type FailedJobs ¶ added in v0.1.10
type FailedJobsStats ¶ added in v0.1.10
type FailedJobsStats struct {
FailedNums []FailedJobs
}
type FailedStatusStats ¶ added in v0.1.10
type FailedStatusStats struct {
FailedStatusStats []JobStatusT
}
type GetQueryParamsT ¶ added in v0.1.10
type GetQueryParamsT struct { // if IgnoreCustomValFiltersInQuery is true, CustomValFilters is not going to be used IgnoreCustomValFiltersInQuery bool CustomValFilters []string ParameterFilters []ParameterFilterT StateFilters []string // Limit the total number of jobs. // A value less than or equal to zero will return no results JobsLimit int // Limit the total number of events, 1 job contains 1+ event(s). // A value less than or equal to zero will disable this limit (no limit), // only values greater than zero are considered as valid limits. EventsLimit int // Limit the total job payload size // A value less than or equal to zero will disable this limit (no limit), // only values greater than zero are considered as valid limits. PayloadSizeLimit int64 }
GetQueryParamsT is a struct to hold jobsdb query params.
type HandleInspector ¶ added in v1.0.2
type HandleInspector struct {
*HandleT
}
HandleInspector is only intended to be used by tests for verifying the handle's internal state
func (*HandleInspector) DSListSize ¶ added in v1.0.2
func (h *HandleInspector) DSListSize() int
DSListSize returns the current size of the handle's dsList
type HandleT ¶
type HandleT struct { MinDSRetentionPeriod time.Duration MaxDSRetentionPeriod time.Duration BackupSettings *backupSettings MaxDSSize *int // TriggerAddNewDS, TriggerMigrateDS is useful for triggering addNewDS to run from tests. // TODO: Ideally we should refactor the code to not use this override. TriggerAddNewDS func() <-chan time.Time TriggerMigrateDS func() <-chan time.Time TriggerRefreshDS func() <-chan time.Time // contains filtered or unexported fields }
HandleT is the main type implementing the database for implementing jobs. The caller must call the SetUp function on a HandleT object
func NewForRead ¶ added in v0.1.10
func NewForReadWrite ¶ added in v0.1.10
func NewForWrite ¶ added in v0.1.10
func (*HandleT) Checkpoint ¶ added in v0.1.10
func (jd *HandleT) Checkpoint(migrationCheckpoint MigrationCheckpointT) int64
Checkpoint writes a migration event if id is passed as 0. Else it will update status and start_sequence
func (*HandleT) CheckpointInTxn ¶ added in v0.1.10
func (jd *HandleT) CheckpointInTxn(txHandler transactionHandler, migrationCheckpoint MigrationCheckpointT) (int64, error)
CheckpointInTxn writes a migration event if id is passed as 0. Else it will update status and start_sequence If txn is passed, it will run the statement in that txn, otherwise it will execute without a transaction
func (*HandleT) Close ¶ added in v0.1.10
func (jd *HandleT) Close()
Close closes the database connection.
Stop should be called before Close.
func (*HandleT) DeleteExecuting ¶ added in v0.1.10
func (jd *HandleT) DeleteExecuting()
DeleteExecuting deletes events whose latest job state is executing. This is only done during recovery, which happens during the server start.
func (*HandleT) GetCheckpoints ¶ added in v0.1.10
func (jd *HandleT) GetCheckpoints(migrationType MigrationOp, status string) []MigrationCheckpointT
GetCheckpoints gets all checkpoints and
func (*HandleT) GetExecuting ¶
func (jd *HandleT) GetExecuting(ctx context.Context, params GetQueryParamsT) (JobsResult, error)
func (*HandleT) GetImporting ¶ added in v0.1.10
func (jd *HandleT) GetImporting(ctx context.Context, params GetQueryParamsT) (JobsResult, error)
func (*HandleT) GetJournalEntries ¶
func (jd *HandleT) GetJournalEntries(opType string) (entries []JournalEntryT)
func (*HandleT) GetLastJob ¶ added in v0.1.10
func (*HandleT) GetLastJobID ¶ added in v0.1.10
func (*HandleT) GetLastJobIDBeforeImport ¶ added in v0.1.10
GetLastJobIDBeforeImport should return the largest job id stored so far
func (*HandleT) GetMaxDSIndex ¶
GetMaxDSIndex returns max dataset index in the DB
func (*HandleT) GetMaxIDForDs ¶ added in v0.1.10
func (*HandleT) GetNonMigratedAndMarkMigrating ¶ added in v0.1.10
GetNonMigratedAndMarkMigrating all jobs with no filters
func (*HandleT) GetPileUpCounts ¶ added in v0.1.10
func (*HandleT) GetProcessed ¶
func (jd *HandleT) GetProcessed(ctx context.Context, params GetQueryParamsT) (JobsResult, error)
GetProcessed returns events of a given state. This does not update any state itself and realises on the caller to update it. That means that successive calls to GetProcessed("failed") can return the same set of events. It is the responsibility of the caller to call it from one thread, update the state (to "waiting") in the same thread and pass on the processors
func (*HandleT) GetSetupCheckpoint ¶ added in v0.1.10
func (jd *HandleT) GetSetupCheckpoint(migrationType MigrationOp) (MigrationCheckpointT, bool)
GetSetupCheckpoint gets all checkpoints and picks out the setup event for that type
func (*HandleT) GetTablePrefix ¶ added in v0.1.10
GetTablePrefix returns the table prefix of the jobsdb.
func (*HandleT) GetToRetry ¶
func (jd *HandleT) GetToRetry(ctx context.Context, params GetQueryParamsT) (JobsResult, error)
GetToRetry returns events which need to be retried. If enableReaderQueue is true, this goes through worker pool, else calls getUnprocessed directly.
func (*HandleT) GetUnprocessed ¶
func (jd *HandleT) GetUnprocessed(ctx context.Context, params GetQueryParamsT) (JobsResult, error)
GetUnprocessed returns the unprocessed events. Unprocessed events are those whose state hasn't been marked in the DB. If enableReaderQueue is true, this goes through worker pool, else calls getUnprocessed directly.
func (*HandleT) GetWaiting ¶
func (jd *HandleT) GetWaiting(ctx context.Context, params GetQueryParamsT) (JobsResult, error)
GetWaiting returns events which are under processing If enableReaderQueue is true, this goes through worker pool, else calls getUnprocessed directly.
func (*HandleT) Identifier ¶ added in v0.1.10
Identifier returns the identifier of the jobsdb. Here it is tablePrefix.
func (*HandleT) IsMigrating ¶ added in v0.1.10
IsMigrating returns true if there are non zero jobs with status = 'migrating'
func (*HandleT) JournalDeleteEntry ¶
func (*HandleT) JournalMarkDone ¶
JournalMarkDone marks the end of a journal action
func (*HandleT) JournalMarkStart ¶
func (jd *HandleT) JournalMarkStart(opType string, opPayload json.RawMessage) int64
func (*HandleT) JournalMarkStartInTx ¶ added in v0.1.10
func (*HandleT) PostExportCleanup ¶ added in v0.1.10
func (jd *HandleT) PostExportCleanup()
PostExportCleanup removes all the entries from job_status_tables that are of state 'wont_migrate' or 'migrating'
func (*HandleT) PreExportCleanup ¶ added in v0.1.10
func (jd *HandleT) PreExportCleanup()
PreExportCleanup removes all the entries from job_status_tables that are of state 'migrating'
func (*HandleT) RecoverFromMigrationJournal ¶ added in v0.1.10
func (jd *HandleT) RecoverFromMigrationJournal()
RecoverFromMigrationJournal is an exposed function for migrator package to handle journal crashes during migration
func (*HandleT) SchemaMigrationTable ¶ added in v0.1.10
SchemaMigrationTable returns the table name used for storing current schema version.
func (*HandleT) Setup ¶
func (jd *HandleT) Setup( ownerType OwnerType, clearAll bool, tablePrefix, migrationMode string, registerStatusHandler bool, preBackupHandlers []prebackup.Handler, ) error
Setup is used to initialize the HandleT structure. clearAll = True means it will remove all existing tables tablePrefix must be unique and is used to separate multiple users of JobsDB
func (*HandleT) SetupCheckpointTable ¶ added in v0.1.10
func (jd *HandleT) SetupCheckpointTable()
SetupCheckpointTable creates a table
func (*HandleT) SetupForExport ¶ added in v0.1.10
func (jd *HandleT) SetupForExport()
SetupForExport is used to setup jobsdb for export or for import or for both
func (*HandleT) SetupForImport ¶ added in v0.1.10
func (jd *HandleT) SetupForImport()
SetupForImport is used to setup jobsdb for export or for import or for both
func (*HandleT) SetupForMigration ¶ added in v0.1.10
SetupForMigration prepares jobsdb to start migrations
func (*HandleT) Start ¶ added in v0.1.10
Start starts the jobsdb worker and housekeeping (migration, archive) threads. Start should be called before any other jobsdb methods are called.
func (*HandleT) Stop ¶ added in v0.1.10
func (jd *HandleT) Stop()
Stop stops the background goroutines and waits until they finish. Stop should be called once only after Start. Only Start and Close can be called after Stop.
func (*HandleT) Store ¶
Store stores new jobs to the jobsdb. If enableWriterQueue is true, this goes through writer worker pool.
func (*HandleT) StoreInTx ¶ added in v0.1.10
StoreInTx stores new jobs to the jobsdb. If enableWriterQueue is true, this goes through writer worker pool.
func (*HandleT) StoreJobsAndCheckpoint ¶ added in v0.1.10
func (jd *HandleT) StoreJobsAndCheckpoint(jobList []*JobT, migrationCheckpoint MigrationCheckpointT)
StoreJobsAndCheckpoint is used to write the jobs to _tables
func (*HandleT) StoreWithRetryEach ¶ added in v0.1.10
func (*HandleT) StoreWithRetryEachInTx ¶ added in v0.1.10
func (*HandleT) TearDown ¶
func (jd *HandleT) TearDown()
TearDown stops the background goroutines,
waits until they finish and closes the database.
func (*HandleT) UpdateJobStatus ¶
func (jd *HandleT) UpdateJobStatus(ctx context.Context, statusList []*JobStatusT, customValFilters []string, parameterFilters []ParameterFilterT) error
func (*HandleT) UpdateJobStatusAndCheckpoint ¶ added in v0.1.10
func (jd *HandleT) UpdateJobStatusAndCheckpoint(statusList []*JobStatusT, fromNodeID, toNodeID string, jobsCount int64, uploadLocation string)
UpdateJobStatusAndCheckpoint does update job status and checkpoint in a single transaction
func (*HandleT) UpdateJobStatusInTx ¶ added in v0.1.10
func (jd *HandleT) UpdateJobStatusInTx(ctx context.Context, tx UpdateSafeTx, statusList []*JobStatusT, customValFilters []string, parameterFilters []ParameterFilterT) error
UpdateJobStatusInTx updates the status of a batch of jobs in the past transaction customValFilters[] is passed, so we can efficiently mark empty cache Later we can move this to query IMP NOTE: AcquireUpdateJobStatusLocks Should be called before calling this function
func (*HandleT) UpdateSequenceNumberOfLatestDS ¶ added in v0.1.10
func (*HandleT) WithStoreSafeTx ¶ added in v0.1.10
func (jd *HandleT) WithStoreSafeTx(f func(tx StoreSafeTx) error) error
func (*HandleT) WithUpdateSafeTx ¶ added in v0.1.10
func (jd *HandleT) WithUpdateSafeTx(f func(tx UpdateSafeTx) error) error
type JobStatusT ¶
type JobStatusT struct { JobID int64 `json:"JobID"` JobState string `json:"JobState"` // ENUM waiting, executing, succeeded, waiting_retry, failed, aborted, migrating, migrated, wont_migrate AttemptNum int `json:"AttemptNum"` ExecTime time.Time `json:"ExecTime"` RetryTime time.Time `json:"RetryTime"` ErrorCode string `json:"ErrorCode"` ErrorResponse json.RawMessage `json:"ErrorResponse"` Parameters json.RawMessage `json:"Parameters"` WorkspaceId string `json:"WorkspaceId"` }
JobStatusT is used for storing status of the job. It is the responsibility of the user of this module to set appropriate job status. State can be one of ENUM waiting, executing, succeeded, waiting_retry, failed, aborted
func BuildStatus ¶ added in v0.1.10
func BuildStatus(job *JobT, jobState string) *JobStatusT
BuildStatus generates a struct of type JobStatusT for a given job and jobState
type JobT ¶
type JobT struct { UUID uuid.UUID `json:"UUID"` JobID int64 `json:"JobID"` UserID string `json:"UserID"` CreatedAt time.Time `json:"CreatedAt"` ExpireAt time.Time `json:"ExpireAt"` CustomVal string `json:"CustomVal"` EventCount int `json:"EventCount"` EventPayload json.RawMessage `json:"EventPayload"` PayloadSize int64 `json:"PayloadSize"` LastJobStatus JobStatusT `json:"LastJobStatus"` Parameters json.RawMessage `json:"Parameters"` WorkspaceId string `json:"WorkspaceId"` }
JobT is the basic type for creating jobs. The JobID is generated by the system and LastJobStatus is populated when reading a processed job while rest should be set by the user.
func QueryJobsWithRetries ¶ added in v1.1.0
type JobsDB ¶ added in v0.1.10
type JobsDB interface { // Identifier returns the jobsdb's identifier, a.k.a. table prefix Identifier() string // WithTx begins a new transaction that can be used by the provided function. // If the function returns an error, the transaction will be rollbacked and return the error, // otherwise the transaction will be committed and a nil error will be returned. WithTx(func(tx *sql.Tx) error) error // WithStoreSafeTx prepares a store-safe environment and then starts a transaction // that can be used by the provided function. WithStoreSafeTx(func(tx StoreSafeTx) error) error // Store stores the provided jobs to the database Store(ctx context.Context, jobList []*JobT) error // StoreInTx stores the provided jobs to the database using an existing transaction. // Please ensure that you are using an StoreSafeTx, e.g. // jobsdb.WithStoreSafeTx(func(tx StoreSafeTx) error { // jobsdb.StoreInTx(ctx, tx, jobList) // }) StoreInTx(ctx context.Context, tx StoreSafeTx, jobList []*JobT) error // StoreWithRetryEach tries to store all the provided jobs to the database and returns the job uuids which failed StoreWithRetryEach(ctx context.Context, jobList []*JobT) map[uuid.UUID]string // StoreWithRetryEachInTx tries to store all the provided jobs to the database and returns the job uuids which failed, using an existing transaction. // Please ensure that you are using an StoreSafeTx, e.g. // jobsdb.WithStoreSafeTx(func(tx StoreSafeTx) error { // jobsdb.StoreWithRetryEachInTx(ctx, tx, jobList) // }) StoreWithRetryEachInTx(ctx context.Context, tx StoreSafeTx, jobList []*JobT) (map[uuid.UUID]string, error) // WithUpdateSafeTx prepares an update-safe environment and then starts a transaction // that can be used by the provided function. An update-safe transaction shall be used if the provided function // needs to call UpdateJobStatusInTx. WithUpdateSafeTx(func(tx UpdateSafeTx) error) error // UpdateJobStatus updates the provided job statuses UpdateJobStatus(ctx context.Context, statusList []*JobStatusT, customValFilters []string, parameterFilters []ParameterFilterT) error // UpdateJobStatusInTx updates the provided job statuses in an existing transaction. // Please ensure that you are using an UpdateSafeTx, e.g. // jobsdb.WithUpdateSafeTx(func(tx UpdateSafeTx) error { // jobsdb.UpdateJobStatusInTx(ctx, tx, statusList, customValFilters, parameterFilters) // }) UpdateJobStatusInTx(ctx context.Context, tx UpdateSafeTx, statusList []*JobStatusT, customValFilters []string, parameterFilters []ParameterFilterT) error // GetUnprocessed finds unprocessed jobs. Unprocessed are new // jobs whose state hasn't been marked in the database yet GetUnprocessed(ctx context.Context, params GetQueryParamsT) (JobsResult, error) // GetProcessed finds jobs in some state, i.e. not unprocessed GetProcessed(ctx context.Context, params GetQueryParamsT) (JobsResult, error) // GetToRetry finds jobs in failed state GetToRetry(ctx context.Context, params GetQueryParamsT) (JobsResult, error) // GetWaiting finds jobs in waiting state GetWaiting(ctx context.Context, params GetQueryParamsT) (JobsResult, error) // GetExecuting finds jobs in executing state GetExecuting(ctx context.Context, params GetQueryParamsT) (JobsResult, error) // GetImporting finds jobs in importing state GetImporting(ctx context.Context, params GetQueryParamsT) (JobsResult, error) // GetPileUpCounts returns statistics (counters) of incomplete jobs // grouped by workspaceId and destination type GetPileUpCounts(ctx context.Context) (statMap map[string]map[string]int, err error) Status() interface{} Ping() error DeleteExecuting() GetJournalEntries(opType string) (entries []JournalEntryT) JournalDeleteEntry(opID int64) JournalMarkStart(opType string, opPayload json.RawMessage) int64 }
JobsDB interface contains public methods to access JobsDB data
type JobsResult ¶ added in v0.1.10
func QueryJobsResultWithRetries ¶ added in v1.1.0
type JobsdbUtilsHandler ¶ added in v0.1.10
type JobsdbUtilsHandler struct{}
Admin Handlers
func (*JobsdbUtilsHandler) RunSQLQuery ¶ added in v0.1.10
func (*JobsdbUtilsHandler) RunSQLQuery(argString string, reply *string) (err error)
type JournalEntryT ¶
type JournalEntryT struct { OpID int64 OpType string OpDone bool OpPayload json.RawMessage }
type MigrationCheckpointT ¶ added in v0.1.10
type MigrationCheckpointT struct { ID int64 `json:"ID"` MigrationType MigrationOp `json:"MigrationType"` // ENUM : export, import, acceptNewEvents FromNode string `json:"FromNode"` ToNode string `json:"ToNode"` JobsCount int64 `json:"JobsCount"` FileLocation string `json:"FileLocation"` Status string `json:"Status"` // ENUM : Look up 'Values for Status' StartSeq int64 `json:"StartSeq"` Payload json.RawMessage `json:"Payload"` TimeStamp time.Time `json:"TimeStamp"` }
MigrationCheckpointT captures an event of export/import to recover from incase of a crash during migration
func NewMigrationCheckpoint ¶ added in v0.1.10
func NewMigrationCheckpoint(migrationType MigrationOp, fromNode, toNode string, jobsCount int64, fileLocation, status string, startSeq int64) MigrationCheckpointT
NewMigrationCheckpoint is a constructor for MigrationCheckpoint struct
func NewSetupCheckpointEvent ¶ added in v0.1.10
func NewSetupCheckpointEvent(migrationType MigrationOp, node string) MigrationCheckpointT
NewSetupCheckpointEvent returns a new migration event that captures setup for export, import of new event acceptance
type MigrationOp ¶ added in v0.1.10
type MigrationOp string
MigrationOp is a custom type for supported types in migrationCheckpoint
type MultiTenantHandleT ¶ added in v0.1.10
type MultiTenantHandleT struct {
*HandleT
}
func (*MultiTenantHandleT) GetAllJobs ¶ added in v0.1.10
func (mj *MultiTenantHandleT) GetAllJobs(ctx context.Context, workspaceCount map[string]int, params GetQueryParamsT, maxDSQuerySize int) ([]*JobT, error)
type MultiTenantJobsDB ¶ added in v0.1.10
type MultiTenantJobsDB interface { GetAllJobs(context.Context, map[string]int, GetQueryParamsT, int) ([]*JobT, error) WithUpdateSafeTx(func(tx UpdateSafeTx) error) error UpdateJobStatusInTx(ctx context.Context, tx UpdateSafeTx, statusList []*JobStatusT, customValFilters []string, parameterFilters []ParameterFilterT) error UpdateJobStatus(ctx context.Context, statusList []*JobStatusT, customValFilters []string, parameterFilters []ParameterFilterT) error DeleteExecuting() GetJournalEntries(opType string) (entries []JournalEntryT) JournalMarkStart(opType string, opPayload json.RawMessage) int64 JournalDeleteEntry(opID int64) GetPileUpCounts(context.Context) (map[string]map[string]int, error) }
type MultiTenantLegacy ¶ added in v0.1.10
type MultiTenantLegacy struct {
*HandleT
}
func (*MultiTenantLegacy) GetAllJobs ¶ added in v0.1.10
func (mj *MultiTenantLegacy) GetAllJobs(ctx context.Context, workspaceCount map[string]int, params GetQueryParamsT, _ int) ([]*JobT, error)
type OptsFunc ¶ added in v0.1.10
type OptsFunc func(jd *HandleT)
func WithClearDB ¶ added in v0.1.10
WithClearDB, if set to true it will remove all existing tables
func WithMigrationMode ¶ added in v0.1.10
func WithPreBackupHandlers ¶ added in v0.1.10
WithPreBackupHandlers, sets pre-backup handlers
func WithStatusHandler ¶ added in v0.1.10
func WithStatusHandler() OptsFunc
type ParameterFilterT ¶
type QueryConditions ¶ added in v0.1.10
type QueryConditions struct { // if IgnoreCustomValFiltersInQuery is true, CustomValFilters is not going to be used IgnoreCustomValFiltersInQuery bool CustomValFilters []string ParameterFilters []ParameterFilterT StateFilters []string }
QueryConditions holds jobsdb query conditions
type ReadonlyHandleT ¶ added in v0.1.10
func (*ReadonlyHandleT) GetDSListString ¶ added in v0.1.10
func (jd *ReadonlyHandleT) GetDSListString() (string, error)
func (*ReadonlyHandleT) GetFailedStatusErrorCodeCountsByDestination ¶ added in v0.1.10
func (jd *ReadonlyHandleT) GetFailedStatusErrorCodeCountsByDestination(args []string) (string, error)
func (*ReadonlyHandleT) GetJobByID ¶ added in v0.1.10
func (jd *ReadonlyHandleT) GetJobByID(job_id, prefix string) (string, error)
func (*ReadonlyHandleT) GetJobIDStatus ¶ added in v0.1.10
func (jd *ReadonlyHandleT) GetJobIDStatus(jobID, _ string) (string, error)
func (*ReadonlyHandleT) GetJobIDsForUser ¶ added in v0.1.10
func (jd *ReadonlyHandleT) GetJobIDsForUser(args []string) (string, error)
func (*ReadonlyHandleT) GetJobSummaryCount ¶ added in v0.1.10
func (jd *ReadonlyHandleT) GetJobSummaryCount(arg, prefix string) (string, error)
func (*ReadonlyHandleT) GetLatestFailedJobs ¶ added in v0.1.10
func (jd *ReadonlyHandleT) GetLatestFailedJobs(arg, prefix string) (string, error)
func (*ReadonlyHandleT) HavePendingJobs ¶ added in v0.1.10
func (jd *ReadonlyHandleT) HavePendingJobs(ctx context.Context, customValFilters []string, count int, parameterFilters []ParameterFilterT) (bool, error)
Count queries
HavePendingJobs returns the true if there are pending events, else false. Pending events are those whose jobs don't have a state or whose jobs status is neither succeeded nor aborted
func (*ReadonlyHandleT) Setup ¶ added in v0.1.10
func (jd *ReadonlyHandleT) Setup(tablePrefix string)
Setup is used to initialize the ReadonlyHandleT structure.
func (*ReadonlyHandleT) TearDown ¶ added in v0.1.10
func (jd *ReadonlyHandleT) TearDown()
TearDown releases all the resources
type ReadonlyJobsDB ¶ added in v0.1.10
type ReadonlyJobsDB interface { HavePendingJobs(ctx context.Context, customValFilters []string, count int, parameterFilters []ParameterFilterT) (bool, error) GetJobSummaryCount(arg, prefix string) (string, error) GetLatestFailedJobs(arg, prefix string) (string, error) GetJobIDsForUser(args []string) (string, error) GetFailedStatusErrorCodeCountsByDestination(args []string) (string, error) GetDSListString() (string, error) GetJobIDStatus(job_id, prefix string) (string, error) GetJobByID(job_id, prefix string) (string, error) }
ReadonlyJobsDB interface contains public methods to access JobsDB data
type SQLJobStatusT ¶ added in v0.1.10
type SQLJobStatusT struct { JobID sql.NullInt64 JobState sql.NullString // ENUM waiting, executing, succeeded, waiting_retry, failed, aborted, migrated AttemptNum sql.NullInt64 ExecTime sql.NullTime RetryTime sql.NullTime ErrorCode sql.NullString ErrorResponse sql.NullString }
SQLJobStatusT is a temporary struct to handle nulls from postgres query
type StoreSafeTx ¶ added in v0.1.10
StoreSafeTx sealed interface
func EmptyStoreSafeTx ¶ added in v0.1.10
func EmptyStoreSafeTx() StoreSafeTx
EmptyStoreSafeTx returns an empty interface usable only for tests
type UpdateSafeTx ¶ added in v0.1.10
UpdateSafeTx sealed interface
func EmptyUpdateSafeTx ¶ added in v0.1.10
func EmptyUpdateSafeTx() UpdateSafeTx
EmptyUpdateSafeTx returns an empty interface usable only for tests