Documentation ¶
Index ¶
- Constants
- func GetConnectionString() string
- type BackupSettingsT
- type HandleT
- func (jd *HandleT) CheckPGHealth() bool
- func (jd *HandleT) GetDBHandle() *sql.DB
- func (jd *HandleT) GetExecuting(customValFilters []string, count int, parameterFilters []ParameterFilterT) []*JobT
- func (jd *HandleT) GetJournalEntries(opType string) (entries []JournalEntryT)
- func (jd *HandleT) GetMaxDSIndex() (maxDSIndex int64)
- func (jd *HandleT) GetProcessed(stateFilter []string, customValFilters []string, count int, ...) []*JobT
- func (jd *HandleT) GetToRetry(customValFilters []string, count int, parameterFilters []ParameterFilterT) []*JobT
- func (jd *HandleT) GetUnprocessed(customValFilters []string, count int, parameterFilters []ParameterFilterT) []*JobT
- func (jd *HandleT) GetWaiting(customValFilters []string, count int, parameterFilters []ParameterFilterT) []*JobT
- func (jd *HandleT) JournalDeleteEntry(opID int64)
- func (jd *HandleT) JournalMarkDone(opID int64)
- func (jd *HandleT) JournalMarkStart(opType string, opPayload json.RawMessage) int64
- func (jd *HandleT) RunTest()
- func (jd *HandleT) Setup(clearAll bool, tablePrefix string, retentionPeriod time.Duration)
- func (jd *HandleT) Store(jobList []*JobT) map[uuid.UUID]string
- func (jd *HandleT) TearDown()
- func (jd *HandleT) UpdateJobStatus(statusList []*JobStatusT, customValFilters []string, ...)
- type JobStatusT
- type JobT
- type JournalEntryT
- type ParameterFilterT
- type StoreJobRespT
Constants ¶
const (
	SucceededState    = "succeeded"
	FailedState       = "failed"
	ExecutingState    = "executing"
	AbortedState      = "aborted"
	WaitingState      = "waiting"
	WaitingRetryState = "waiting_retry"
	InternalState     = "NP"
)
Constants for the JobState field of JobStatusT.
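A minimal sketch of how a caller might guard against misspelled state strings before building a status. The constants are local copies so the example compiles on its own. The helper isSettableState is hypothetical and not part of this package. Excluding InternalState is an assumption, based on its absence from the ENUM comment on JobStatusT.JobState.

```go
package main

import "fmt"

// Local copies of the job state constants, so the sketch does not need
// to import the package.
const (
	SucceededState    = "succeeded"
	FailedState       = "failed"
	ExecutingState    = "executing"
	AbortedState      = "aborted"
	WaitingState      = "waiting"
	WaitingRetryState = "waiting_retry"
	InternalState     = "NP"
)

// isSettableState is a hypothetical helper (not part of the package).
// It reports whether state is one of the six values listed in the ENUM
// comment on JobStatusT.JobState. InternalState is left out on the
// assumption that callers are not meant to set it.
func isSettableState(state string) bool {
	switch state {
	case SucceededState, FailedState, ExecutingState,
		AbortedState, WaitingState, WaitingRetryState:
		return true
	}
	return false
}

func main() {
	for _, s := range []string{FailedState, InternalState, "done"} {
		fmt.Printf("%q settable: %v\n", s, isSettableState(s))
	}
}
```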
const (
RawDataDestUploadOperation = "S3_DEST_UPLOAD"
)
We keep a journal of all the operations. The journal helps
Variables ¶
This section is empty.
Functions ¶
func GetConnectionString ¶
func GetConnectionString() string
Types ¶
type BackupSettingsT ¶
BackupSettingsT captures the backup configuration from the config/env files so that JobsDB can be instantiated correctly
type HandleT ¶
type HandleT struct {
	BackupSettings *BackupSettingsT
	// contains filtered or unexported fields
}
HandleT is the main type implementing the database for jobs. The caller must call the Setup function on a HandleT object
func (*HandleT) CheckPGHealth ¶
CheckPGHealth reports whether the pg database passes a health check
func (*HandleT) GetDBHandle ¶
func (*HandleT) GetExecuting ¶
func (jd *HandleT) GetExecuting(customValFilters []string, count int, parameterFilters []ParameterFilterT) []*JobT
GetExecuting returns events which are in the executing state
func (*HandleT) GetJournalEntries ¶
func (jd *HandleT) GetJournalEntries(opType string) (entries []JournalEntryT)
func (*HandleT) GetMaxDSIndex ¶
GetMaxDSIndex returns the max dataset index in the DB
func (*HandleT) GetProcessed ¶
func (jd *HandleT) GetProcessed(stateFilter []string, customValFilters []string, count int, parameterFilters []ParameterFilterT) []*JobT
GetProcessed returns events of a given state. It does not update any state itself and relies on the caller to update it. That means that successive calls to GetProcessed("failed") can return the same set of events. It is the responsibility of the caller to call it from one thread, update the state (to "waiting") in the same thread, and pass the events on to the processors.
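The fetch, mark, then dispatch discipline described above can be sketched with an in-memory stand-in. Everything here is hypothetical: fakeStore, its lower-case methods, and claimFailed are illustrative names, the types are trimmed-down versions of JobT and JobStatusT, and the real methods take additional filter arguments. The only behaviour borrowed from the text is that reading by state never changes state.

```go
package main

import "fmt"

// job and jobStatus are trimmed-down stand-ins for JobT and JobStatusT.
type job struct{ JobID int64 }

type jobStatus struct {
	JobID    int64
	JobState string
}

// fakeStore is a hypothetical in-memory store. Like the documented
// GetProcessed, its read method leaves job state untouched.
type fakeStore struct {
	order  []int64          // job IDs in insertion order
	states map[int64]string // current state per job ID
}

func newFakeStore() *fakeStore {
	return &fakeStore{
		order:  []int64{1, 2, 3},
		states: map[int64]string{1: "failed", 2: "succeeded", 3: "failed"},
	}
}

// getProcessed returns up to count jobs whose state is in stateFilter.
func (s *fakeStore) getProcessed(stateFilter []string, count int) []*job {
	var out []*job
	for _, id := range s.order {
		if len(out) == count {
			break
		}
		for _, st := range stateFilter {
			if s.states[id] == st {
				out = append(out, &job{JobID: id})
				break
			}
		}
	}
	return out
}

// updateJobStatus records the new state of each job in statusList.
func (s *fakeStore) updateJobStatus(statusList []*jobStatus) {
	for _, st := range statusList {
		s.states[st.JobID] = st.JobState
	}
}

// claimFailed fetches failed jobs and marks them "waiting" before
// returning, so a later call cannot hand out the same jobs again.
// It is meant to be called from a single goroutine.
func claimFailed(s *fakeStore, count int) []*job {
	jobs := s.getProcessed([]string{"failed"}, count)
	statuses := make([]*jobStatus, 0, len(jobs))
	for _, j := range jobs {
		statuses = append(statuses, &jobStatus{JobID: j.JobID, JobState: "waiting"})
	}
	s.updateJobStatus(statuses)
	return jobs
}

func main() {
	s := newFakeStore()
	first := claimFailed(s, 10)
	second := claimFailed(s, 10)
	fmt.Println(len(first), len(second)) // prints "2 0"
}
```

Without the update step inside claimFailed, the second call would return the same two jobs, which is the duplicate-delivery risk the description warns about.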
func (*HandleT) GetToRetry ¶
func (jd *HandleT) GetToRetry(customValFilters []string, count int, parameterFilters []ParameterFilterT) []*JobT
GetToRetry returns events which need to be retried. This is a wrapper over the GetProcessed call above.
func (*HandleT) GetUnprocessed ¶
func (jd *HandleT) GetUnprocessed(customValFilters []string, count int, parameterFilters []ParameterFilterT) []*JobT
GetUnprocessed returns the unprocessed events. Unprocessed events are those whose state hasn't been marked in the DB
func (*HandleT) GetWaiting ¶
func (jd *HandleT) GetWaiting(customValFilters []string, count int, parameterFilters []ParameterFilterT) []*JobT
GetWaiting returns events which are under processing. This is a wrapper over the GetProcessed call above.
func (*HandleT) JournalDeleteEntry ¶
func (*HandleT) JournalMarkDone ¶
func (*HandleT) JournalMarkStart ¶
func (jd *HandleT) JournalMarkStart(opType string, opPayload json.RawMessage) int64
func (*HandleT) Setup ¶
Setup is used to initialize the HandleT structure.
clearAll = true means it will remove all existing tables.
tablePrefix must be unique and is used to separate multiple users of JobsDB.
retentionPeriod = a DS (dataset) is not deleted if it has had some activity within the retention period.
func (*HandleT) UpdateJobStatus ¶
func (jd *HandleT) UpdateJobStatus(statusList []*JobStatusT, customValFilters []string, parameterFilters []ParameterFilterT)
UpdateJobStatus updates the status of a batch of jobs. customValFilters[] is passed so we can efficiently mark the cache as empty. Later we can move this to a query.
type JobStatusT ¶
type JobStatusT struct {
	JobID         int64
	JobState      string // ENUM waiting, executing, succeeded, waiting_retry, failed, aborted
	AttemptNum    int
	ExecTime      time.Time
	RetryTime     time.Time
	ErrorCode     string
	ErrorResponse json.RawMessage
}
JobStatusT is used for storing status of the job. It is the responsibility of the user of this module to set appropriate job status. State can be one of ENUM waiting, executing, succeeded, waiting_retry, failed, aborted
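Since setting the status is the caller's job, a small constructor can keep the fields consistent. The sketch below uses a local copy of the struct. The helper failedStatus is hypothetical, and the reading of RetryTime as "earliest time the job may be retried" and of AttemptNum as a running attempt counter are assumptions, not something this page states.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// JobStatusT is a local copy of the struct shown above.
type JobStatusT struct {
	JobID         int64
	JobState      string
	AttemptNum    int
	ExecTime      time.Time
	RetryTime     time.Time
	ErrorCode     string
	ErrorResponse json.RawMessage
}

// failedStatus is a hypothetical helper that builds the status for a
// failed attempt. It assumes RetryTime means "do not retry before this
// time" and that AttemptNum counts attempts made so far.
func failedStatus(jobID int64, prevAttempts int, execTime time.Time,
	retryAfter time.Duration, errorCode string, errorResponse json.RawMessage) *JobStatusT {
	return &JobStatusT{
		JobID:         jobID,
		JobState:      "failed",
		AttemptNum:    prevAttempts + 1,
		ExecTime:      execTime,
		RetryTime:     execTime.Add(retryAfter),
		ErrorCode:     errorCode,
		ErrorResponse: errorResponse,
	}
}

// exampleStatus builds a status with fixed, made-up values.
func exampleStatus() *JobStatusT {
	execTime := time.Date(2020, time.January, 1, 12, 0, 0, 0, time.UTC)
	return failedStatus(7, 2, execTime, 30*time.Second, "500",
		json.RawMessage(`{"error":"timeout"}`))
}

func main() {
	st := exampleStatus()
	fmt.Println(st.JobState, st.AttemptNum, st.RetryTime.Sub(st.ExecTime))
}
```

A slice of such values is what UpdateJobStatus takes as its statusList argument.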
type JobT ¶
type JobT struct {
	UUID          uuid.UUID
	JobID         int64
	CreatedAt     time.Time
	ExpireAt      time.Time
	CustomVal     string
	EventPayload  json.RawMessage
	LastJobStatus JobStatusT
	Parameters    json.RawMessage
}
JobT is the basic type for creating jobs. The JobID is generated by the system and LastJobStatus is populated when reading a processed job, while the rest should be set by the user.
type JournalEntryT ¶
type JournalEntryT struct {
	OpID      int64
	OpType    string
	OpDone    bool
	OpPayload json.RawMessage
}
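One plausible way to use the journal is to mark an operation as started, perform it, and then mark it done, so that entries with OpDone == false identify work that was interrupted. The sketch below illustrates that flow with an in-memory stand-in. memJournal, pending, and demo are hypothetical. The two method names are borrowed from the index above, but their behaviour here is an assumption rather than the package's implementation.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// JournalEntryT is a local copy of the struct shown above.
type JournalEntryT struct {
	OpID      int64
	OpType    string
	OpDone    bool
	OpPayload json.RawMessage
}

// memJournal is a hypothetical in-memory journal used only for
// illustration; it is not how the package stores entries.
type memJournal struct {
	nextID  int64
	entries []JournalEntryT
}

// JournalMarkStart records a new, not-yet-done entry and returns its ID.
func (j *memJournal) JournalMarkStart(opType string, opPayload json.RawMessage) int64 {
	j.nextID++
	j.entries = append(j.entries, JournalEntryT{
		OpID: j.nextID, OpType: opType, OpPayload: opPayload,
	})
	return j.nextID
}

// JournalMarkDone flags the entry with the given ID as done.
func (j *memJournal) JournalMarkDone(opID int64) {
	for i := range j.entries {
		if j.entries[i].OpID == opID {
			j.entries[i].OpDone = true
		}
	}
}

// pending returns the entries of opType that are not marked done.
func pending(entries []JournalEntryT, opType string) []JournalEntryT {
	var out []JournalEntryT
	for _, e := range entries {
		if e.OpType == opType && !e.OpDone {
			out = append(out, e)
		}
	}
	return out
}

// demo runs one start/done cycle and reports how many entries were
// pending before and after the operation was marked done.
func demo() (before, after int) {
	const opType = "S3_DEST_UPLOAD" // value of RawDataDestUploadOperation
	j := &memJournal{}
	opID := j.JournalMarkStart(opType, json.RawMessage(`{"file":"example"}`))
	before = len(pending(j.entries, opType))
	// ... the actual operation would run here ...
	j.JournalMarkDone(opID)
	after = len(pending(j.entries, opType))
	return before, after
}

func main() {
	before, after := demo()
	fmt.Println(before, after)
}
```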