Documentation ¶
Index ¶
- Constants
- Variables
- func GetConnectionString() string
- type AssertInterface
- type BackupSettingsT
- type DSPair
- type ErrorCodeCountStats
- type ErrorCodeCountsByDestination
- type EventStatusDetailed
- type EventStatusStats
- type FailedJobs
- type FailedJobsStats
- type FailedStatusStats
- type GetQueryParamsT
- type HandleT
- func (jd *HandleT) AcquireStoreLock()
- func (jd *HandleT) AcquireUpdateJobStatusLocks()
- func (jd *HandleT) BeginGlobalTransaction() *sql.Tx
- func (jd *HandleT) CheckPGHealth() bool
- func (jd *HandleT) Checkpoint(migrationCheckpoint MigrationCheckpointT) int64
- func (jd *HandleT) CheckpointInTxn(txHandler transactionHandler, migrationCheckpoint MigrationCheckpointT) (int64, error)
- func (jd *HandleT) CommitTransaction(txn *sql.Tx)
- func (jd *HandleT) DeleteExecuting(params GetQueryParamsT)
- func (jd *HandleT) GetCheckpoints(migrationType MigrationOp, status string) []MigrationCheckpointT
- func (jd *HandleT) GetExecuting(params GetQueryParamsT) []*JobT
- func (jd *HandleT) GetIdentifier() string
- func (jd *HandleT) GetImportingList(params GetQueryParamsT) []*JobT
- func (jd *HandleT) GetJournalEntries(opType string) (entries []JournalEntryT)
- func (jd *HandleT) GetLastJob() *JobT
- func (jd *HandleT) GetLastJobID() int64
- func (jd *HandleT) GetLastJobIDBeforeImport() int64
- func (jd *HandleT) GetMaxDSIndex() (maxDSIndex int64)
- func (jd *HandleT) GetMaxIDForDs(ds dataSetT) int64
- func (jd *HandleT) GetNonMigratedAndMarkMigrating(count int) []*JobT
- func (jd *HandleT) GetProcessed(params GetQueryParamsT) []*JobT
- func (jd *HandleT) GetSetupCheckpoint(migrationType MigrationOp) (MigrationCheckpointT, bool)
- func (jd *HandleT) GetTablePrefix() string
- func (jd *HandleT) GetThrottled(params GetQueryParamsT) []*JobT
- func (jd *HandleT) GetToRetry(params GetQueryParamsT) []*JobT
- func (jd *HandleT) GetUnprocessed(params GetQueryParamsT) []*JobT
- func (jd *HandleT) GetUserID(job *JobT) string
- func (jd *HandleT) GetWaiting(params GetQueryParamsT) []*JobT
- func (jd *HandleT) IsMigrating() bool
- func (jd *HandleT) JournalDeleteEntry(opID int64)
- func (jd *HandleT) JournalMarkDone(opID int64)
- func (jd *HandleT) JournalMarkStart(opType string, opPayload json.RawMessage) int64
- func (jd *HandleT) PostExportCleanup()
- func (jd *HandleT) PreExportCleanup()
- func (jd *HandleT) RecoverFromMigrationJournal()
- func (jd *HandleT) ReleaseStoreLock()
- func (jd *HandleT) ReleaseUpdateJobStatusLocks()
- func (jd *HandleT) SchemaMigrationTable() string
- func (jd *HandleT) Setup(ownerType OwnerType, clearAll bool, tablePrefix string, ...)
- func (jd *HandleT) SetupCheckpointTable()
- func (jd *HandleT) SetupForExport()
- func (jd *HandleT) SetupForImport()
- func (jd *HandleT) SetupForMigration(fromVersion int, toVersion int)
- func (jd *HandleT) Status() interface{}
- func (jd *HandleT) Store(jobList []*JobT) error
- func (jd *HandleT) StoreJobsAndCheckpoint(jobList []*JobT, migrationCheckpoint MigrationCheckpointT)
- func (jd *HandleT) StoreWithRetryEach(jobList []*JobT) map[uuid.UUID]string
- func (jd *HandleT) TearDown()
- func (jd *HandleT) UpdateJobStatus(statusList []*JobStatusT, customValFilters []string, ...) error
- func (jd *HandleT) UpdateJobStatusAndCheckpoint(statusList []*JobStatusT, fromNodeID string, toNodeID string, jobsCount int64, ...)
- func (jd *HandleT) UpdateJobStatusInTxn(txn *sql.Tx, statusList []*JobStatusT, customValFilters []string, ...) error
- func (jd *HandleT) UpdateSequenceNumberOfLatestDS(seqNoForNewDS int64)
- type JobStatusT
- type JobT
- type JobsDB
- type JobsdbUtilsHandler
- type JournalEntryT
- type MigrationCheckpointT
- type MigrationOp
- type MigrationState
- type OwnerType
- type ParameterFilterT
- type QueryFiltersT
- type ReadonlyHandleT
- func (jd *ReadonlyHandleT) GetDSListString() (string, error)
- func (jd *ReadonlyHandleT) GetFailedStatusErrorCodeCountsByDestination(args []string) (string, error)
- func (jd *ReadonlyHandleT) GetJobByID(job_id string, prefix string) (string, error)
- func (jd *ReadonlyHandleT) GetJobIDStatus(job_id string, prefix string) (string, error)
- func (jd *ReadonlyHandleT) GetJobIDsForUser(args []string) (string, error)
- func (jd *ReadonlyHandleT) GetJobSummaryCount(arg string, prefix string) (string, error)
- func (jd *ReadonlyHandleT) GetLatestFailedJobs(arg string, prefix string) (string, error)
- func (jd *ReadonlyHandleT) GetPendingJobsCount(customValFilters []string, count int, parameterFilters []ParameterFilterT) int64
- func (jd *ReadonlyHandleT) GetUnprocessedCount(customValFilters []string, parameterFilters []ParameterFilterT) int64
- func (jd *ReadonlyHandleT) Setup(tablePrefix string)
- func (jd *ReadonlyHandleT) TearDown()
- type ReadonlyJobsDB
- type SQLJobStatusT
- type StatTagsT
- type StoreJobRespT
Constants ¶
const ( ExportOp = "export" ImportOp = "import" AcceptNewEventsOp = "acceptNewEvents" )
ENUM Values for MigrationOp
const ( SetupForExport = "setup_for_export" Exported = "exported" Notified = "notified" Completed = "completed" SetupToAcceptNewEvents = "setup_to_accept_new_events" SetupForImport = "setup_for_import" PreparedForImport = "prepared_for_import" Imported = "imported" )
ENUM Values for Status
const ( MigrationCheckpointSuffix = "migration_checkpoints" UniqueConstraintSuffix = "unique_checkpoint" )
MigrationCheckpointSuffix : Suffix for checkpoints table
const (
RawDataDestUploadOperation = "S3_DEST_UPLOAD"
)
We keep a journal of all the operations. The journal helps
Variables ¶
var ( //Not valid, Not terminal NotProcessed = jobStateT{State: "not_picked_yet", /* contains filtered or unexported fields */} //Valid, Not terminal Failed = jobStateT{State: "failed", /* contains filtered or unexported fields */} Executing = jobStateT{State: "executing", /* contains filtered or unexported fields */} Waiting = jobStateT{State: "waiting", /* contains filtered or unexported fields */} WaitingRetry = jobStateT{State: "waiting_retry", /* contains filtered or unexported fields */} Migrating = jobStateT{State: "migrating", /* contains filtered or unexported fields */} Throttled = jobStateT{State: "throttled", /* contains filtered or unexported fields */} Importing = jobStateT{State: "importing", /* contains filtered or unexported fields */} //Valid, Terminal Succeeded = jobStateT{State: "succeeded", /* contains filtered or unexported fields */} Aborted = jobStateT{State: "aborted", /* contains filtered or unexported fields */} Migrated = jobStateT{State: "migrated", /* contains filtered or unexported fields */} WontMigrate = jobStateT{State: "wont_migrate", /* contains filtered or unexported fields */} )
State definitions
Functions ¶
func GetConnectionString ¶
func GetConnectionString() string
GetConnectionString Returns Jobs DB connection configuration
Types ¶
type AssertInterface ¶ added in v1.0.0
type AssertInterface interface {
// contains filtered or unexported methods
}
AssertInterface contains public assert methods
type BackupSettingsT ¶
BackupSettingsT is for capturing the backup configuration from the config/env files to instantiate jobdb correctly
type ErrorCodeCountStats ¶ added in v0.1.10
type ErrorCodeCountStats struct {
ErrorCodeCounts []ErrorCodeCountsByDestination
}
type ErrorCodeCountsByDestination ¶ added in v0.1.10
type EventStatusDetailed ¶ added in v0.1.10
type EventStatusStats ¶ added in v0.1.10
type EventStatusStats struct { StatsNums []EventStatusDetailed DSList string }
type FailedJobs ¶ added in v0.1.10
type FailedJobsStats ¶ added in v0.1.10
type FailedJobsStats struct {
FailedNums []FailedJobs
}
type FailedStatusStats ¶ added in v0.1.10
type FailedStatusStats struct {
FailedStatusStats []JobStatusT
}
type GetQueryParamsT ¶ added in v0.1.10
type GetQueryParamsT struct { CustomValFilters []string ParameterFilters []ParameterFilterT StateFilters []string Count int IgnoreCustomValFiltersInQuery bool UseTimeFilter bool Before time.Time }
GetQueryParamsT is a struct to hold jobsdb query params
type HandleT ¶
type HandleT struct { BackupSettings *BackupSettingsT // contains filtered or unexported fields }
HandleT is the main type implementing the database for implementing jobs. The caller must call the Setup function on a HandleT object.
func (*HandleT) AcquireStoreLock ¶ added in v1.0.0
func (jd *HandleT) AcquireStoreLock()
AcquireStoreLock acquires locks necessary for storing jobs in transaction
func (*HandleT) AcquireUpdateJobStatusLocks ¶ added in v1.0.0
func (jd *HandleT) AcquireUpdateJobStatusLocks()
AcquireUpdateJobStatusLocks acquires locks necessary for updating job statuses in transaction
func (*HandleT) BeginGlobalTransaction ¶ added in v1.0.0
BeginGlobalTransaction starts a transaction on the globalDBHandle to be used across jobsdb instances
func (*HandleT) CheckPGHealth ¶
CheckPGHealth returns health check for pg database
func (*HandleT) Checkpoint ¶ added in v0.1.10
func (jd *HandleT) Checkpoint(migrationCheckpoint MigrationCheckpointT) int64
Checkpoint writes a migration event if id is passed as 0. Else it will update status and start_sequence
func (*HandleT) CheckpointInTxn ¶ added in v0.1.10
func (jd *HandleT) CheckpointInTxn(txHandler transactionHandler, migrationCheckpoint MigrationCheckpointT) (int64, error)
CheckpointInTxn writes a migration event if id is passed as 0. Else it will update status and start_sequence. If txn is passed, it will run the statement in that txn, otherwise it will execute without a transaction.
func (*HandleT) CommitTransaction ¶ added in v1.0.0
CommitTransaction commits the passed transaction
func (*HandleT) DeleteExecuting ¶ added in v0.1.10
func (jd *HandleT) DeleteExecuting(params GetQueryParamsT)
DeleteExecuting deletes events whose latest job state is executing. This is only done during recovery, which happens during the server start.
func (*HandleT) GetCheckpoints ¶ added in v0.1.10
func (jd *HandleT) GetCheckpoints(migrationType MigrationOp, status string) []MigrationCheckpointT
GetCheckpoints gets all checkpoints for the given migration type and status.
func (*HandleT) GetExecuting ¶
func (jd *HandleT) GetExecuting(params GetQueryParamsT) []*JobT
func (*HandleT) GetIdentifier ¶ added in v1.0.0
GetIdentifier returns the identifier of the jobsdb. Here it is tablePrefix.
func (*HandleT) GetImportingList ¶ added in v1.0.0
func (jd *HandleT) GetImportingList(params GetQueryParamsT) []*JobT
func (*HandleT) GetJournalEntries ¶
func (jd *HandleT) GetJournalEntries(opType string) (entries []JournalEntryT)
func (*HandleT) GetLastJob ¶ added in v0.1.10
func (*HandleT) GetLastJobID ¶ added in v0.1.10
func (*HandleT) GetLastJobIDBeforeImport ¶ added in v0.1.10
GetLastJobIDBeforeImport should return the largest job id stored so far
func (*HandleT) GetMaxDSIndex ¶
GetMaxDSIndex returns the max dataset index in the DB.
func (*HandleT) GetMaxIDForDs ¶ added in v0.1.10
func (*HandleT) GetNonMigratedAndMarkMigrating ¶ added in v0.1.10
GetNonMigratedAndMarkMigrating gets all non-migrated jobs with no filters and marks them as migrating.
func (*HandleT) GetProcessed ¶
func (jd *HandleT) GetProcessed(params GetQueryParamsT) []*JobT
GetProcessed returns events of a given state. This does not update any state itself and relies on the caller to update it. That means that successive calls to GetProcessed("failed") can return the same set of events. It is the responsibility of the caller to call it from one thread, update the state (to "waiting") in the same thread and pass on to the processors.
func (*HandleT) GetSetupCheckpoint ¶ added in v0.1.10
func (jd *HandleT) GetSetupCheckpoint(migrationType MigrationOp) (MigrationCheckpointT, bool)
GetSetupCheckpoint gets all checkpoints and picks out the setup event for that type
func (*HandleT) GetTablePrefix ¶ added in v0.1.10
GetTablePrefix returns the table prefix of the jobsdb.
func (*HandleT) GetThrottled ¶ added in v1.0.0
func (jd *HandleT) GetThrottled(params GetQueryParamsT) []*JobT
GetThrottled returns events which were throttled before. If enableReaderQueue is true, this goes through worker pool, else calls getUnprocessed directly.
func (*HandleT) GetToRetry ¶
func (jd *HandleT) GetToRetry(params GetQueryParamsT) []*JobT
GetToRetry returns events which need to be retried. If enableReaderQueue is true, this goes through worker pool, else calls getUnprocessed directly.
func (*HandleT) GetUnprocessed ¶
func (jd *HandleT) GetUnprocessed(params GetQueryParamsT) []*JobT
GetUnprocessed returns the unprocessed events. Unprocessed events are those whose state hasn't been marked in the DB. If enableReaderQueue is true, this goes through worker pool, else calls getUnprocessed directly.
func (*HandleT) GetWaiting ¶
func (jd *HandleT) GetWaiting(params GetQueryParamsT) []*JobT
GetWaiting returns events which are under processing. If enableReaderQueue is true, this goes through worker pool, else calls getUnprocessed directly.
func (*HandleT) IsMigrating ¶ added in v0.1.10
IsMigrating returns true if there are non zero jobs with status = 'migrating'
func (*HandleT) JournalDeleteEntry ¶
func (*HandleT) JournalMarkDone ¶
JournalMarkDone marks the end of a journal action
func (*HandleT) JournalMarkStart ¶
func (jd *HandleT) JournalMarkStart(opType string, opPayload json.RawMessage) int64
func (*HandleT) PostExportCleanup ¶ added in v0.1.10
func (jd *HandleT) PostExportCleanup()
PostExportCleanup removes all the entries from job_status_tables that are of state 'wont_migrate' or 'migrating'
func (*HandleT) PreExportCleanup ¶ added in v0.1.10
func (jd *HandleT) PreExportCleanup()
PreExportCleanup removes all the entries from job_status_tables that are of state 'migrating'
func (*HandleT) RecoverFromMigrationJournal ¶ added in v0.1.10
func (jd *HandleT) RecoverFromMigrationJournal()
RecoverFromMigrationJournal is an exposed function for migrator package to handle journal crashes during migration
func (*HandleT) ReleaseStoreLock ¶ added in v1.0.0
func (jd *HandleT) ReleaseStoreLock()
ReleaseStoreLock releases locks held to store jobs in transaction
func (*HandleT) ReleaseUpdateJobStatusLocks ¶ added in v1.0.0
func (jd *HandleT) ReleaseUpdateJobStatusLocks()
ReleaseUpdateJobStatusLocks releases locks held to update job statuses in transaction
func (*HandleT) SchemaMigrationTable ¶ added in v0.1.10
SchemaMigrationTable returns the table name used for storing current schema version.
func (*HandleT) Setup ¶
func (jd *HandleT) Setup(ownerType OwnerType, clearAll bool, tablePrefix string, retentionPeriod time.Duration, migrationMode string, registerStatusHandler bool, queryFilterKeys QueryFiltersT)
Setup is used to initialize the HandleT structure. clearAll = True means it will remove all existing tables. tablePrefix must be unique and is used to separate multiple users of JobsDB. retentionPeriod = A DS is not deleted if it has some activity in the retention time.
func (*HandleT) SetupCheckpointTable ¶ added in v0.1.10
func (jd *HandleT) SetupCheckpointTable()
SetupCheckpointTable creates a table
func (*HandleT) SetupForExport ¶ added in v0.1.10
func (jd *HandleT) SetupForExport()
SetupForExport is used to setup jobsdb for export or for import or for both
func (*HandleT) SetupForImport ¶ added in v0.1.10
func (jd *HandleT) SetupForImport()
SetupForImport is used to setup jobsdb for export or for import or for both
func (*HandleT) SetupForMigration ¶ added in v0.1.10
SetupForMigration prepares jobsdb to start migrations
func (*HandleT) Store ¶
Store call is used to create new Jobs. If enableWriterQueue is true, this goes through writer worker pool.
func (*HandleT) StoreJobsAndCheckpoint ¶ added in v0.1.10
func (jd *HandleT) StoreJobsAndCheckpoint(jobList []*JobT, migrationCheckpoint MigrationCheckpointT)
StoreJobsAndCheckpoint is used to write the jobs to _tables
func (*HandleT) StoreWithRetryEach ¶ added in v0.1.10
func (*HandleT) UpdateJobStatus ¶
func (jd *HandleT) UpdateJobStatus(statusList []*JobStatusT, customValFilters []string, parameterFilters []ParameterFilterT) error
func (*HandleT) UpdateJobStatusAndCheckpoint ¶ added in v0.1.10
func (jd *HandleT) UpdateJobStatusAndCheckpoint(statusList []*JobStatusT, fromNodeID string, toNodeID string, jobsCount int64, uploadLocation string)
UpdateJobStatusAndCheckpoint does update job status and checkpoint in a single transaction
func (*HandleT) UpdateJobStatusInTxn ¶ added in v1.0.0
func (jd *HandleT) UpdateJobStatusInTxn(txn *sql.Tx, statusList []*JobStatusT, customValFilters []string, parameterFilters []ParameterFilterT) error
UpdateJobStatusInTxn updates the status of a batch of jobs in the passed transaction. customValFilters[] is passed so we can efficiently mark empty cache. Later we can move this to query. IMP NOTE: AcquireUpdateJobStatusLocks should be called before calling this function.
func (*HandleT) UpdateSequenceNumberOfLatestDS ¶ added in v0.1.10
UpdateSequenceNumberOfLatestDS updates (if not already updated) the sequence number of the right most dataset to the seq no provided.
type JobStatusT ¶
type JobStatusT struct { JobID int64 `json:"JobID"` JobState string `json:"JobState"` //ENUM waiting, executing, succeeded, waiting_retry, failed, aborted, migrating, migrated, wont_migrate AttemptNum int `json:"AttemptNum"` ExecTime time.Time `json:"ExecTime"` RetryTime time.Time `json:"RetryTime"` ErrorCode string `json:"ErrorCode"` ErrorResponse json.RawMessage `json:"ErrorResponse"` Parameters json.RawMessage `json:"Parameters"` }
JobStatusT is used for storing status of the job. It is the responsibility of the user of this module to set appropriate job status. State can be one of ENUM waiting, executing, succeeded, waiting_retry, failed, aborted
func BuildStatus ¶ added in v0.1.10
func BuildStatus(job *JobT, jobState string) *JobStatusT
BuildStatus generates a struct of type JobStatusT for a given job and jobState
type JobT ¶
type JobT struct { UUID uuid.UUID `json:"UUID"` JobID int64 `json:"JobID"` UserID string `json:"UserID"` CreatedAt time.Time `json:"CreatedAt"` ExpireAt time.Time `json:"ExpireAt"` CustomVal string `json:"CustomVal"` EventPayload json.RawMessage `json:"EventPayload"` LastJobStatus JobStatusT `json:"LastJobStatus"` Parameters json.RawMessage `json:"Parameters"` }
JobT is the basic type for creating jobs. The JobID is generated by the system and LastJobStatus is populated when reading a processed job while rest should be set by the user.
type JobsDB ¶ added in v0.1.10
type JobsDB interface { Store(jobList []*JobT) error BeginGlobalTransaction() *sql.Tx CommitTransaction(txn *sql.Tx) AcquireStoreLock() ReleaseStoreLock() StoreWithRetryEach(jobList []*JobT) map[uuid.UUID]string CheckPGHealth() bool UpdateJobStatus(statusList []*JobStatusT, customValFilters []string, parameterFilters []ParameterFilterT) error UpdateJobStatusInTxn(txHandler *sql.Tx, statusList []*JobStatusT, customValFilters []string, parameterFilters []ParameterFilterT) error AcquireUpdateJobStatusLocks() ReleaseUpdateJobStatusLocks() GetToRetry(params GetQueryParamsT) []*JobT GetWaiting(params GetQueryParamsT) []*JobT GetThrottled(params GetQueryParamsT) []*JobT GetProcessed(params GetQueryParamsT) []*JobT GetUnprocessed(params GetQueryParamsT) []*JobT GetExecuting(params GetQueryParamsT) []*JobT GetImportingList(params GetQueryParamsT) []*JobT Status() interface{} GetIdentifier() string DeleteExecuting(params GetQueryParamsT) GetJournalEntries(opType string) (entries []JournalEntryT) JournalDeleteEntry(opID int64) JournalMarkStart(opType string, opPayload json.RawMessage) int64 }
JobsDB interface contains public methods to access JobsDB data
type JobsdbUtilsHandler ¶ added in v0.1.10
type JobsdbUtilsHandler struct { }
Admin Handlers
func (*JobsdbUtilsHandler) RunSQLQuery ¶ added in v0.1.10
func (handler *JobsdbUtilsHandler) RunSQLQuery(argString string, reply *string) (err error)
type JournalEntryT ¶
type JournalEntryT struct { OpID int64 OpType string OpDone bool OpPayload json.RawMessage }
type MigrationCheckpointT ¶ added in v0.1.10
type MigrationCheckpointT struct { ID int64 `json:"ID"` MigrationType MigrationOp `json:"MigrationType"` //ENUM : export, import, acceptNewEvents FromNode string `json:"FromNode"` ToNode string `json:"ToNode"` JobsCount int64 `json:"JobsCount"` FileLocation string `json:"FileLocation"` Status string `json:"Status"` //ENUM : Look up 'Values for Status' StartSeq int64 `json:"StartSeq"` Payload json.RawMessage `json:"Payload"` TimeStamp time.Time `json:"TimeStamp"` }
MigrationCheckpointT captures an event of export/import to recover from in case of a crash during migration
func NewMigrationCheckpoint ¶ added in v0.1.10
func NewMigrationCheckpoint(migrationType MigrationOp, fromNode string, toNode string, jobsCount int64, fileLocation string, status string, startSeq int64) MigrationCheckpointT
NewMigrationCheckpoint is a constructor for MigrationCheckpoint struct
func NewSetupCheckpointEvent ¶ added in v0.1.10
func NewSetupCheckpointEvent(migrationType MigrationOp, node string) MigrationCheckpointT
NewSetupCheckpointEvent returns a new migration event that captures setup for export, import or new event acceptance
type MigrationOp ¶ added in v0.1.10
type MigrationOp string
MigrationOp is a custom type for supported types in migrationCheckpoint
type MigrationState ¶ added in v1.0.0
type MigrationState struct {
// contains filtered or unexported fields
}
MigrationState maintains the state required during the migration process
type ParameterFilterT ¶
type QueryFiltersT ¶ added in v0.1.10
type ReadonlyHandleT ¶ added in v0.1.10
func (*ReadonlyHandleT) GetDSListString ¶ added in v0.1.10
func (jd *ReadonlyHandleT) GetDSListString() (string, error)
func (*ReadonlyHandleT) GetFailedStatusErrorCodeCountsByDestination ¶ added in v0.1.10
func (jd *ReadonlyHandleT) GetFailedStatusErrorCodeCountsByDestination(args []string) (string, error)
func (*ReadonlyHandleT) GetJobByID ¶ added in v0.1.10
func (jd *ReadonlyHandleT) GetJobByID(job_id string, prefix string) (string, error)
func (*ReadonlyHandleT) GetJobIDStatus ¶ added in v0.1.10
func (jd *ReadonlyHandleT) GetJobIDStatus(job_id string, prefix string) (string, error)
func (*ReadonlyHandleT) GetJobIDsForUser ¶ added in v0.1.10
func (jd *ReadonlyHandleT) GetJobIDsForUser(args []string) (string, error)
func (*ReadonlyHandleT) GetJobSummaryCount ¶ added in v0.1.10
func (jd *ReadonlyHandleT) GetJobSummaryCount(arg string, prefix string) (string, error)
func (*ReadonlyHandleT) GetLatestFailedJobs ¶ added in v0.1.10
func (jd *ReadonlyHandleT) GetLatestFailedJobs(arg string, prefix string) (string, error)
func (*ReadonlyHandleT) GetPendingJobsCount ¶ added in v1.0.0
func (jd *ReadonlyHandleT) GetPendingJobsCount(customValFilters []string, count int, parameterFilters []ParameterFilterT) int64
Count queries
GetPendingJobsCount returns the count of pending events. Pending events are those whose jobs don't have a state or whose jobs status is neither succeeded nor aborted
func (*ReadonlyHandleT) GetUnprocessedCount ¶ added in v1.0.0
func (jd *ReadonlyHandleT) GetUnprocessedCount(customValFilters []string, parameterFilters []ParameterFilterT) int64
GetUnprocessedCount returns the number of unprocessed events. Unprocessed events are those whose state hasn't been marked in the DB
func (*ReadonlyHandleT) Setup ¶ added in v0.1.10
func (jd *ReadonlyHandleT) Setup(tablePrefix string)
Setup is used to initialize the ReadonlyHandleT structure.
func (*ReadonlyHandleT) TearDown ¶ added in v0.1.10
func (jd *ReadonlyHandleT) TearDown()
TearDown releases all the resources
type ReadonlyJobsDB ¶ added in v0.1.10
type ReadonlyJobsDB interface { GetPendingJobsCount(customValFilters []string, count int, parameterFilters []ParameterFilterT) int64 GetUnprocessedCount(customValFilters []string, parameterFilters []ParameterFilterT) int64 GetJobSummaryCount(arg string, prefix string) (string, error) GetLatestFailedJobs(arg string, prefix string) (string, error) GetJobIDsForUser(args []string) (string, error) GetFailedStatusErrorCodeCountsByDestination(args []string) (string, error) GetDSListString() (string, error) GetJobIDStatus(job_id string, prefix string) (string, error) GetJobByID(job_id string, prefix string) (string, error) }
ReadonlyJobsDB interface contains public methods to access JobsDB data
type SQLJobStatusT ¶ added in v0.1.10
type SQLJobStatusT struct { JobID sql.NullInt64 JobState sql.NullString //ENUM waiting, executing, succeeded, waiting_retry, failed, aborted, migrated AttemptNum sql.NullInt64 ExecTime sql.NullTime RetryTime sql.NullTime ErrorCode sql.NullString ErrorResponse sql.NullString }
SQLJobStatusT is a temporary struct to handle nulls from postgres query
type StatTagsT ¶ added in v1.0.0
type StatTagsT struct { CustomValFilters []string ParameterFilters []ParameterFilterT StateFilters []string }
StatTagsT is a struct to hold tags for stats