jobsdb

package
v0.1.9-patch-02
Warning

This package is not in the latest version of its module.

Published: Jun 20, 2020 License: AGPL-3.0 Imports: 21 Imported by: 0

Documentation

Constants

const (
	SucceededState    = "succeeded"
	FailedState       = "failed"
	ExecutingState    = "executing"
	AbortedState      = "aborted"
	WaitingState      = "waiting"
	WaitingRetryState = "waiting_retry"
	InternalState     = "NP"
)

Constants for the JobState field of JobStatusT.
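As a small illustration of how these constants might be used, the sketch below checks whether a string is one of the six states listed in the JobStatusT enum comment. The helper `isKnownJobState` is hypothetical and not part of jobsdb; the constants are redeclared locally so the example compiles on its own, and `InternalState` is deliberately left out because it does not appear in that enum.

```go
package main

import "fmt"

// Local copies of the job-state constants listed above.
const (
	SucceededState    = "succeeded"
	FailedState       = "failed"
	ExecutingState    = "executing"
	AbortedState      = "aborted"
	WaitingState      = "waiting"
	WaitingRetryState = "waiting_retry"
)

// isKnownJobState is a hypothetical helper (not part of jobsdb) that reports
// whether s is one of the six documented JobState values.
func isKnownJobState(s string) bool {
	switch s {
	case SucceededState, FailedState, ExecutingState,
		AbortedState, WaitingState, WaitingRetryState:
		return true
	}
	return false
}

func main() {
	fmt.Println(isKnownJobState(WaitingRetryState)) // true
	fmt.Println(isKnownJobState("done"))            // false
}
```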

const (
	RawDataDestUploadOperation = "S3_DEST_UPLOAD"
)

We keep a journal of all the operations. The journal helps

Variables

This section is empty.

Functions

func GetConnectionString

func GetConnectionString() string

Types

type BackupSettingsT

type BackupSettingsT struct {
	BackupEnabled bool
	FailedOnly    bool
	PathPrefix    string
}

BackupSettingsT is for capturing the backup configuration from the config/env files to instantiate jobdb correctly

type HandleT

type HandleT struct {
	BackupSettings *BackupSettingsT
	// contains filtered or unexported fields
}

HandleT is the main type implementing the jobs database. The caller must call the Setup function on a HandleT object

func (*HandleT) CheckPGHealth

func (jd *HandleT) CheckPGHealth() bool

CheckPGHealth reports whether the Postgres database passes a health check

func (*HandleT) GetDBHandle

func (jd *HandleT) GetDBHandle() *sql.DB

func (*HandleT) GetExecuting

func (jd *HandleT) GetExecuting(customValFilters []string, count int, parameterFilters []ParameterFilterT) []*JobT

GetExecuting returns events which are in the executing state

func (*HandleT) GetJournalEntries

func (jd *HandleT) GetJournalEntries(opType string) (entries []JournalEntryT)

func (*HandleT) GetMaxDSIndex

func (jd *HandleT) GetMaxDSIndex() (maxDSIndex int64)

GetMaxDSIndex returns the maximum dataset index in the DB

func (*HandleT) GetProcessed

func (jd *HandleT) GetProcessed(stateFilter []string, customValFilters []string, count int, parameterFilters []ParameterFilterT) []*JobT

GetProcessed returns events of a given state. It does not update any state itself and relies on the caller to update it. That means that successive calls to GetProcessed("failed") can return the same set of events. It is the responsibility of the caller to call it from one thread, update the state (to "waiting") in the same thread, and then pass the events on to the processors
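The fetch-then-mark pattern described above can be sketched as follows. This is a minimal illustration, not the package's own code: the `jobsDB` interface, the `claimFailed` helper, and the in-memory `fakeDB` are all hypothetical, and the structs are local mirrors trimmed to the fields used here. Only the two method signatures are taken from this page.

```go
package main

import (
	"fmt"
	"time"
)

// Minimal local mirrors of the jobsdb types, trimmed to the fields used here.
type JobStatusT struct {
	JobID      int64
	JobState   string
	AttemptNum int
	ExecTime   time.Time
}

type JobT struct {
	JobID         int64
	LastJobStatus JobStatusT
}

type ParameterFilterT struct {
	Name, Value string
	Optional    bool
}

// jobsDB is a hypothetical interface covering the two HandleT methods used below.
type jobsDB interface {
	GetProcessed(stateFilter []string, customValFilters []string, count int, parameterFilters []ParameterFilterT) []*JobT
	UpdateJobStatus(statusList []*JobStatusT, customValFilters []string, parameterFilters []ParameterFilterT)
}

// claimFailed fetches failed jobs and marks them "waiting" before returning,
// so that a later call from the same goroutine does not return them again.
func claimFailed(db jobsDB, customVal string, count int) []*JobT {
	filters := []string{customVal}
	jobs := db.GetProcessed([]string{"failed"}, filters, count, nil)
	statuses := make([]*JobStatusT, 0, len(jobs))
	for _, j := range jobs {
		statuses = append(statuses, &JobStatusT{
			JobID:      j.JobID,
			JobState:   "waiting",
			AttemptNum: j.LastJobStatus.AttemptNum,
			ExecTime:   time.Now(),
		})
	}
	if len(statuses) > 0 {
		db.UpdateJobStatus(statuses, filters, nil)
	}
	return jobs
}

// fakeDB is an in-memory stand-in used only to make the sketch runnable.
// It assumes job IDs run from 1 to len(state).
type fakeDB struct{ state map[int64]string }

func (f *fakeDB) GetProcessed(stateFilter, _ []string, count int, _ []ParameterFilterT) []*JobT {
	var out []*JobT
	for id := int64(1); id <= int64(len(f.state)) && len(out) < count; id++ {
		for _, s := range stateFilter {
			if f.state[id] == s {
				out = append(out, &JobT{JobID: id, LastJobStatus: JobStatusT{JobID: id, JobState: s}})
				break
			}
		}
	}
	return out
}

func (f *fakeDB) UpdateJobStatus(statusList []*JobStatusT, _ []string, _ []ParameterFilterT) {
	for _, s := range statusList {
		f.state[s.JobID] = s.JobState
	}
}

func main() {
	db := &fakeDB{state: map[int64]string{1: "failed", 2: "succeeded", 3: "failed"}}
	fmt.Println(len(claimFailed(db, "S3", 10))) // 2
	fmt.Println(len(claimFailed(db, "S3", 10))) // 0
}
```

Doing the fetch and the status update in one function, called from a single goroutine, is one way to honor the single-thread requirement; the "S3" custom value is an arbitrary placeholder.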

func (*HandleT) GetToRetry

func (jd *HandleT) GetToRetry(customValFilters []string, count int, parameterFilters []ParameterFilterT) []*JobT

GetToRetry returns events which need to be retried. This is a wrapper over the GetProcessed call above

func (*HandleT) GetUnprocessed

func (jd *HandleT) GetUnprocessed(customValFilters []string, count int, parameterFilters []ParameterFilterT) []*JobT

GetUnprocessed returns the unprocessed events. Unprocessed events are those whose state hasn't been marked in the DB

func (*HandleT) GetWaiting

func (jd *HandleT) GetWaiting(customValFilters []string, count int, parameterFilters []ParameterFilterT) []*JobT

GetWaiting returns events which are under processing. This is a wrapper over the GetProcessed call above

func (*HandleT) JournalDeleteEntry

func (jd *HandleT) JournalDeleteEntry(opID int64)

func (*HandleT) JournalMarkDone

func (jd *HandleT) JournalMarkDone(opID int64)

func (*HandleT) JournalMarkStart

func (jd *HandleT) JournalMarkStart(opType string, opPayload json.RawMessage) int64

func (*HandleT) RunTest

func (jd *HandleT) RunTest()

RunTest runs some internal tests

func (*HandleT) Setup

func (jd *HandleT) Setup(clearAll bool, tablePrefix string, retentionPeriod time.Duration)

Setup is used to initialize the HandleT structure. clearAll = true means it will remove all existing tables. tablePrefix must be unique and is used to separate multiple users of JobsDB. retentionPeriod: a dataset (DS) is not deleted if it has had some activity within the retention period

func (*HandleT) Store

func (jd *HandleT) Store(jobList []*JobT) map[uuid.UUID]string

Store call is used to create new Jobs

func (*HandleT) TearDown

func (jd *HandleT) TearDown()

TearDown releases all the resources

func (*HandleT) UpdateJobStatus

func (jd *HandleT) UpdateJobStatus(statusList []*JobStatusT, customValFilters []string, parameterFilters []ParameterFilterT)

UpdateJobStatus updates the status of a batch of jobs. customValFilters[] is passed so we can efficiently mark the empty cache. Later we can move this to the query

type JobStatusT

type JobStatusT struct {
	JobID         int64
	JobState      string // ENUM: waiting, executing, succeeded, waiting_retry, failed, aborted
	AttemptNum    int
	ExecTime      time.Time
	RetryTime     time.Time
	ErrorCode     string
	ErrorResponse json.RawMessage
}

JobStatusT is used for storing the status of a job. It is the responsibility of the user of this module to set the appropriate job status. JobState can be one of: waiting, executing, succeeded, waiting_retry, failed, aborted

type JobT

type JobT struct {
	UUID          uuid.UUID
	JobID         int64
	CreatedAt     time.Time
	ExpireAt      time.Time
	CustomVal     string
	EventPayload  json.RawMessage
	LastJobStatus JobStatusT
	Parameters    json.RawMessage
}

JobT is the basic type for creating jobs. The JobID is generated by the system and LastJobStatus is populated when reading a processed job, while the rest should be set by the user.
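To make the split between caller-set and system-set fields concrete, here is a rough sketch of building a job before passing it to Store. The `newJob` helper is hypothetical. The struct is a local mirror: the real UUID field is a `uuid.UUID` from an external package, so a random `[16]byte` stands in for it here, and the `ExpireAt` value is an arbitrary choice for the example.

```go
package main

import (
	"crypto/rand"
	"encoding/json"
	"fmt"
	"time"
)

// JobT is a local mirror of the caller-facing fields. The real UUID field is a
// uuid.UUID from an external package; a raw [16]byte stands in for it here.
type JobT struct {
	UUID         [16]byte
	JobID        int64 // left at zero: generated by the system
	CreatedAt    time.Time
	ExpireAt     time.Time
	CustomVal    string
	EventPayload json.RawMessage
	Parameters   json.RawMessage
}

// newJob is a hypothetical helper that fills in the fields the caller owns.
func newJob(customVal string, payload, params interface{}) (*JobT, error) {
	var id [16]byte
	if _, err := rand.Read(id[:]); err != nil {
		return nil, err
	}
	p, err := json.Marshal(payload)
	if err != nil {
		return nil, err
	}
	q, err := json.Marshal(params)
	if err != nil {
		return nil, err
	}
	now := time.Now()
	return &JobT{
		UUID:         id,
		CreatedAt:    now,
		ExpireAt:     now, // arbitrary for this sketch
		CustomVal:    customVal,
		EventPayload: p,
		Parameters:   q,
	}, nil
}

func main() {
	job, err := newJob("S3",
		map[string]string{"event": "page_view"}, // invented payload
		map[string]string{"source_id": "abc"})   // invented parameter name
	if err != nil {
		panic(err)
	}
	fmt.Println(job.CustomVal, job.JobID, string(job.EventPayload))
}
```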

type JournalEntryT

type JournalEntryT struct {
	OpID      int64
	OpType    string
	OpDone    bool
	OpPayload json.RawMessage
}

type ParameterFilterT

type ParameterFilterT struct {
	Name     string
	Value    string
	Optional bool
}

type StoreJobRespT

type StoreJobRespT struct {
	JobID        int64
	ErrorMessage string
}
