jobsdb

package
v1.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 25, 2022 License: AGPL-3.0 Imports: 33 Imported by: 0

Documentation

Index

Constants

View Source
// Supported values for MigrationOp, as used in MigrationCheckpointT.MigrationType
// (export, import, acceptNewEvents).
const (
	ExportOp          = "export"
	ImportOp          = "import"
	AcceptNewEventsOp = "acceptNewEvents"
)

ENUM Values for MigrationOp

View Source
// Values for the Status field of MigrationCheckpointT.
// NOTE(review): the two groups below look like export-side and import-side
// statuses respectively — confirm against the migrator before relying on it.
const (
	SetupForExport = "setup_for_export"
	Exported       = "exported"
	Notified       = "notified"
	Completed      = "completed"

	SetupToAcceptNewEvents = "setup_to_accept_new_events"
	SetupForImport         = "setup_for_import"
	PreparedForImport      = "prepared_for_import"
	Imported               = "imported"
)

ENUM Values for Status

View Source
const (
	// MigrationCheckpointSuffix is the suffix for the checkpoints table.
	MigrationCheckpointSuffix = "migration_checkpoints"
	// UniqueConstraintSuffix is presumably the suffix of the unique constraint
	// defined on the checkpoints table — TODO confirm against SetupCheckpointTable.
	UniqueConstraintSuffix    = "unique_checkpoint"
)

MigrationCheckpointSuffix : Suffix for checkpoints table

View Source
const (
	RawDataDestUploadOperation = "S3_DEST_UPLOAD"
)

We keep a journal of all the operations. The journal helps

Variables

View Source
// Job state definitions. The states are grouped by two properties noted on
// each group below: whether the state is valid and whether it is terminal.
var (
	// Not valid, Not terminal
	NotProcessed = jobStateT{State: "not_picked_yet", /* contains filtered or unexported fields */}

	// Valid, Not terminal
	Failed       = jobStateT{State: "failed", /* contains filtered or unexported fields */}
	Executing    = jobStateT{State: "executing", /* contains filtered or unexported fields */}
	Waiting      = jobStateT{State: "waiting", /* contains filtered or unexported fields */}
	WaitingRetry = jobStateT{State: "waiting_retry", /* contains filtered or unexported fields */}
	Migrating    = jobStateT{State: "migrating", /* contains filtered or unexported fields */}
	Importing    = jobStateT{State: "importing", /* contains filtered or unexported fields */}

	// Valid, Terminal
	Succeeded   = jobStateT{State: "succeeded", /* contains filtered or unexported fields */}
	Aborted     = jobStateT{State: "aborted", /* contains filtered or unexported fields */}
	Migrated    = jobStateT{State: "migrated", /* contains filtered or unexported fields */}
	WontMigrate = jobStateT{State: "wont_migrate", /* contains filtered or unexported fields */}
)

State definitions

Functions

func GetConnectionString

func GetConnectionString() string

GetConnectionString Returns Jobs DB connection configuration

func Init added in v0.1.10

func Init()

func Init2 added in v0.1.10

func Init2()

func Init3 added in v0.1.10

func Init3()

func IsMasterBackupEnabled added in v0.1.10

func IsMasterBackupEnabled() bool

Types

type DSPair added in v0.1.10

type DSPair struct {
	JobTableName       string
	JobStatusTableName string
}

type ErrorCodeCountStats added in v0.1.10

type ErrorCodeCountStats struct {
	ErrorCodeCounts []ErrorCodeCountsByDestination
}

type ErrorCodeCountsByDestination added in v0.1.10

type ErrorCodeCountsByDestination struct {
	Count         int
	ErrorCode     string
	Destination   string
	DestinationID string
}

type EventStatusDetailed added in v0.1.10

type EventStatusDetailed struct {
	Status        string
	SourceID      string
	DestinationID string
	CustomVal     string
	Count         int
}

type EventStatusStats added in v0.1.10

type EventStatusStats struct {
	StatsNums []EventStatusDetailed
	DSList    string
}

type FailedJobs added in v0.1.10

type FailedJobs struct {
	JobID         int
	UserID        string
	CustomVal     string
	ExecTime      time.Time
	ErrorCode     string
	ErrorResponse string
}

type FailedJobsStats added in v0.1.10

type FailedJobsStats struct {
	FailedNums []FailedJobs
}

type FailedStatusStats added in v0.1.10

type FailedStatusStats struct {
	FailedStatusStats []JobStatusT
}

type GetQueryParamsT added in v0.1.10

type GetQueryParamsT struct {

	// if IgnoreCustomValFiltersInQuery is true, CustomValFilters is not going to be used
	IgnoreCustomValFiltersInQuery bool
	CustomValFilters              []string
	ParameterFilters              []ParameterFilterT
	StateFilters                  []string

	// Limit the total number of jobs.
	// A value less than or equal to zero will return no results
	JobsLimit int
	// Limit the total number of events, 1 job contains 1+ event(s).
	// A value less than or equal to zero will disable this limit (no limit),
	// only values greater than zero are considered as valid limits.
	EventsLimit int
	// Limit the total job payload size
	// A value less than or equal to zero will disable this limit (no limit),
	// only values greater than zero are considered as valid limits.
	PayloadSizeLimit int64
}

GetQueryParamsT is a struct to hold jobsdb query params.

type HandleInspector added in v1.0.2

type HandleInspector struct {
	*HandleT
}

HandleInspector is only intended to be used by tests for verifying the handle's internal state

func (*HandleInspector) DSListSize added in v1.0.2

func (h *HandleInspector) DSListSize() int

DSListSize returns the current size of the handle's dsList

type HandleT

type HandleT struct {
	MinDSRetentionPeriod time.Duration
	MaxDSRetentionPeriod time.Duration

	BackupSettings *backupSettings

	MaxDSSize *int

	// TriggerAddNewDS, TriggerMigrateDS is useful for triggering addNewDS to run from tests.
	// TODO: Ideally we should refactor the code to not use this override.
	TriggerAddNewDS  func() <-chan time.Time
	TriggerMigrateDS func() <-chan time.Time
	// contains filtered or unexported fields
}

HandleT is the main type implementing the database for implementing jobs. The caller must call the Setup function on a HandleT object

func NewForRead added in v0.1.10

func NewForRead(tablePrefix string, opts ...OptsFunc) *HandleT

func NewForReadWrite added in v0.1.10

func NewForReadWrite(tablePrefix string, opts ...OptsFunc) *HandleT

func NewForWrite added in v0.1.10

func NewForWrite(tablePrefix string, opts ...OptsFunc) *HandleT

func (*HandleT) Checkpoint added in v0.1.10

func (jd *HandleT) Checkpoint(migrationCheckpoint MigrationCheckpointT) int64

Checkpoint writes a migration event if id is passed as 0. Else it will update status and start_sequence

func (*HandleT) CheckpointInTxn added in v0.1.10

func (jd *HandleT) CheckpointInTxn(txHandler transactionHandler, migrationCheckpoint MigrationCheckpointT) (int64, error)

CheckpointInTxn writes a migration event if id is passed as 0. Else it will update status and start_sequence. If txn is passed, it will run the statement in that txn, otherwise it will execute without a transaction.

func (*HandleT) Close added in v0.1.10

func (jd *HandleT) Close()

Close closes the database connection.

Stop should be called before Close.

func (*HandleT) DeleteExecuting added in v0.1.10

func (jd *HandleT) DeleteExecuting()

DeleteExecuting deletes events whose latest job state is executing. This is only done during recovery, which happens during the server start.

func (*HandleT) GetCheckpoints added in v0.1.10

func (jd *HandleT) GetCheckpoints(migrationType MigrationOp, status string) []MigrationCheckpointT

GetCheckpoints gets all checkpoints and

func (*HandleT) GetExecuting

func (jd *HandleT) GetExecuting(params GetQueryParamsT) JobsResult

func (*HandleT) GetImporting added in v0.1.10

func (jd *HandleT) GetImporting(params GetQueryParamsT) JobsResult

func (*HandleT) GetJournalEntries

func (jd *HandleT) GetJournalEntries(opType string) (entries []JournalEntryT)

func (*HandleT) GetLastJob added in v0.1.10

func (jd *HandleT) GetLastJob() *JobT

func (*HandleT) GetLastJobID added in v0.1.10

func (jd *HandleT) GetLastJobID() int64

func (*HandleT) GetLastJobIDBeforeImport added in v0.1.10

func (jd *HandleT) GetLastJobIDBeforeImport() int64

GetLastJobIDBeforeImport should return the largest job id stored so far

func (*HandleT) GetMaxDSIndex

func (jd *HandleT) GetMaxDSIndex() (maxDSIndex int64)

GetMaxDSIndex returns the max dataset index in the DB.

func (*HandleT) GetMaxIDForDs added in v0.1.10

func (jd *HandleT) GetMaxIDForDs(ds dataSetT) int64

func (*HandleT) GetNonMigratedAndMarkMigrating added in v0.1.10

func (jd *HandleT) GetNonMigratedAndMarkMigrating(count int) []*JobT

GetNonMigratedAndMarkMigrating all jobs with no filters

func (*HandleT) GetPileUpCounts added in v0.1.10

func (jd *HandleT) GetPileUpCounts(statMap map[string]map[string]int)

func (*HandleT) GetProcessed

func (jd *HandleT) GetProcessed(params GetQueryParamsT) JobsResult

GetProcessed returns events of a given state. This does not update any state itself and relies on the caller to update it. That means that successive calls to GetProcessed("failed") can return the same set of events. It is the responsibility of the caller to call it from one thread, update the state (to "waiting") in the same thread and pass the events on to the processors.

func (*HandleT) GetSetupCheckpoint added in v0.1.10

func (jd *HandleT) GetSetupCheckpoint(migrationType MigrationOp) (MigrationCheckpointT, bool)

GetSetupCheckpoint gets all checkpoints and picks out the setup event for that type

func (*HandleT) GetTablePrefix added in v0.1.10

func (jd *HandleT) GetTablePrefix() string

GetTablePrefix returns the table prefix of the jobsdb.

func (*HandleT) GetToRetry

func (jd *HandleT) GetToRetry(params GetQueryParamsT) JobsResult

GetToRetry returns events which need to be retried. If enableReaderQueue is true, this goes through worker pool, else calls getUnprocessed directly.

func (*HandleT) GetUnprocessed

func (jd *HandleT) GetUnprocessed(params GetQueryParamsT) JobsResult

GetUnprocessed returns the unprocessed events. Unprocessed events are those whose state hasn't been marked in the DB. If enableReaderQueue is true, this goes through worker pool, else calls getUnprocessed directly.

func (*HandleT) GetUserID added in v0.1.10

func (*HandleT) GetUserID(job *JobT) string

GetUserID from job

func (*HandleT) GetWaiting

func (jd *HandleT) GetWaiting(params GetQueryParamsT) JobsResult

GetWaiting returns events which are under processing. If enableReaderQueue is true, this goes through worker pool, else calls getUnprocessed directly.

func (*HandleT) Identifier added in v0.1.10

func (jd *HandleT) Identifier() string

Identifier returns the identifier of the jobsdb. Here it is tablePrefix.

func (*HandleT) IsMigrating added in v0.1.10

func (jd *HandleT) IsMigrating() bool

IsMigrating returns true if there are non zero jobs with status = 'migrating'

func (*HandleT) JournalDeleteEntry

func (jd *HandleT) JournalDeleteEntry(opID int64)

func (*HandleT) JournalMarkDone

func (jd *HandleT) JournalMarkDone(opID int64)

JournalMarkDone marks the end of a journal action

func (*HandleT) JournalMarkStart

func (jd *HandleT) JournalMarkStart(opType string, opPayload json.RawMessage) int64

func (*HandleT) JournalMarkStartInTx added in v0.1.10

func (jd *HandleT) JournalMarkStartInTx(tx *sql.Tx, opType string, opPayload json.RawMessage) (int64, error)

func (*HandleT) Ping added in v0.1.10

func (jd *HandleT) Ping() error

Ping returns health check for pg database

func (*HandleT) PostExportCleanup added in v0.1.10

func (jd *HandleT) PostExportCleanup()

PostExportCleanup removes all the entries from job_status_tables that are of state 'wont_migrate' or 'migrating'

func (*HandleT) PreExportCleanup added in v0.1.10

func (jd *HandleT) PreExportCleanup()

PreExportCleanup removes all the entries from job_status_tables that are of state 'migrating'

func (*HandleT) RecoverFromMigrationJournal added in v0.1.10

func (jd *HandleT) RecoverFromMigrationJournal()

RecoverFromMigrationJournal is an exposed function for migrator package to handle journal crashes during migration

func (*HandleT) SchemaMigrationTable added in v0.1.10

func (jd *HandleT) SchemaMigrationTable() string

SchemaMigrationTable returns the table name used for storing current schema version.

func (*HandleT) Setup

func (jd *HandleT) Setup(
	ownerType OwnerType, clearAll bool, tablePrefix, migrationMode string,
	registerStatusHandler bool, queryFilterKeys QueryFiltersT, preBackupHandlers []prebackup.Handler,
) error

Setup is used to initialize the HandleT structure. clearAll = True means it will remove all existing tables. tablePrefix must be unique and is used to separate multiple users of JobsDB.

func (*HandleT) SetupCheckpointTable added in v0.1.10

func (jd *HandleT) SetupCheckpointTable()

SetupCheckpointTable creates a table

func (*HandleT) SetupForExport added in v0.1.10

func (jd *HandleT) SetupForExport()

SetupForExport is used to setup jobsdb for export or for import or for both

func (*HandleT) SetupForImport added in v0.1.10

func (jd *HandleT) SetupForImport()

SetupForImport is used to setup jobsdb for export or for import or for both

func (*HandleT) SetupForMigration added in v0.1.10

func (jd *HandleT) SetupForMigration(fromVersion, toVersion int)

SetupForMigration prepares jobsdb to start migrations

func (*HandleT) Start added in v0.1.10

func (jd *HandleT) Start() error

Start starts the jobsdb worker and housekeeping (migration, archive) threads. Start should be called before any other jobsdb methods are called.

func (*HandleT) Status added in v0.1.10

func (jd *HandleT) Status() interface{}

func (*HandleT) Stop added in v0.1.10

func (jd *HandleT) Stop()

Stop stops the background goroutines and waits until they finish. Stop should be called once only after Start. Only Start and Close can be called after Stop.

func (*HandleT) Store

func (jd *HandleT) Store(ctx context.Context, jobList []*JobT) error

Store stores new jobs to the jobsdb. If enableWriterQueue is true, this goes through writer worker pool.

func (*HandleT) StoreInTx added in v0.1.10

func (jd *HandleT) StoreInTx(ctx context.Context, tx StoreSafeTx, jobList []*JobT) error

StoreInTx stores new jobs to the jobsdb. If enableWriterQueue is true, this goes through writer worker pool.

func (*HandleT) StoreJobsAndCheckpoint added in v0.1.10

func (jd *HandleT) StoreJobsAndCheckpoint(jobList []*JobT, migrationCheckpoint MigrationCheckpointT)

StoreJobsAndCheckpoint is used to write the jobs to _tables

func (*HandleT) StoreWithRetryEach added in v0.1.10

func (jd *HandleT) StoreWithRetryEach(ctx context.Context, jobList []*JobT) map[uuid.UUID]string

func (*HandleT) StoreWithRetryEachInTx added in v0.1.10

func (jd *HandleT) StoreWithRetryEachInTx(ctx context.Context, tx StoreSafeTx, jobList []*JobT) map[uuid.UUID]string

func (*HandleT) TearDown

func (jd *HandleT) TearDown()

TearDown stops the background goroutines, waits until they finish and closes the database.

func (*HandleT) UpdateJobStatus

func (jd *HandleT) UpdateJobStatus(ctx context.Context, statusList []*JobStatusT, customValFilters []string, parameterFilters []ParameterFilterT) error

func (*HandleT) UpdateJobStatusAndCheckpoint added in v0.1.10

func (jd *HandleT) UpdateJobStatusAndCheckpoint(statusList []*JobStatusT, fromNodeID, toNodeID string, jobsCount int64, uploadLocation string)

UpdateJobStatusAndCheckpoint does update job status and checkpoint in a single transaction

func (*HandleT) UpdateJobStatusInTx added in v0.1.10

func (jd *HandleT) UpdateJobStatusInTx(ctx context.Context, tx UpdateSafeTx, statusList []*JobStatusT, customValFilters []string, parameterFilters []ParameterFilterT) error

UpdateJobStatusInTx updates the status of a batch of jobs in the passed transaction. customValFilters[] is passed so we can efficiently mark empty cache. Later we can move this to query. IMP NOTE: AcquireUpdateJobStatusLocks should be called before calling this function.

func (*HandleT) UpdateSequenceNumberOfLatestDS added in v0.1.10

func (jd *HandleT) UpdateSequenceNumberOfLatestDS(seqNoForNewDS int64)

func (*HandleT) WithStoreSafeTx added in v0.1.10

func (jd *HandleT) WithStoreSafeTx(f func(tx StoreSafeTx) error) error

func (*HandleT) WithTx added in v0.1.10

func (jd *HandleT) WithTx(f func(tx *sql.Tx) error) error

func (*HandleT) WithUpdateSafeTx added in v0.1.10

func (jd *HandleT) WithUpdateSafeTx(f func(tx UpdateSafeTx) error) error

type JobStatusT

type JobStatusT struct {
	JobID         int64           `json:"JobID"`
	JobState      string          `json:"JobState"` // ENUM waiting, executing, succeeded, waiting_retry,  failed, aborted, migrating, migrated, wont_migrate
	AttemptNum    int             `json:"AttemptNum"`
	ExecTime      time.Time       `json:"ExecTime"`
	RetryTime     time.Time       `json:"RetryTime"`
	ErrorCode     string          `json:"ErrorCode"`
	ErrorResponse json.RawMessage `json:"ErrorResponse"`
	Parameters    json.RawMessage `json:"Parameters"`
	WorkspaceId   string          `json:"WorkspaceId"`
}

JobStatusT is used for storing status of the job. It is the responsibility of the user of this module to set appropriate job status. State can be one of ENUM waiting, executing, succeeded, waiting_retry, failed, aborted

func BuildStatus added in v0.1.10

func BuildStatus(job *JobT, jobState string) *JobStatusT

BuildStatus generates a struct of type JobStatusT for a given job and jobState

type JobT

type JobT struct {
	UUID          uuid.UUID       `json:"UUID"`
	JobID         int64           `json:"JobID"`
	UserID        string          `json:"UserID"`
	CreatedAt     time.Time       `json:"CreatedAt"`
	ExpireAt      time.Time       `json:"ExpireAt"`
	CustomVal     string          `json:"CustomVal"`
	EventCount    int             `json:"EventCount"`
	EventPayload  json.RawMessage `json:"EventPayload"`
	PayloadSize   int64           `json:"PayloadSize"`
	LastJobStatus JobStatusT      `json:"LastJobStatus"`
	Parameters    json.RawMessage `json:"Parameters"`
	WorkspaceId   string          `json:"WorkspaceId"`
}

JobT is the basic type for creating jobs. The JobID is generated by the system and LastJobStatus is populated when reading a processed job while rest should be set by the user.

func (*JobT) String added in v0.1.10

func (job *JobT) String() string

type JobsDB added in v0.1.10

type JobsDB interface {
	// Identifier returns the jobsdb's identifier, a.k.a. table prefix
	Identifier() string

	// WithTx begins a new transaction that can be used by the provided function.
	// If the function returns an error, the transaction will be rolled back and the error will be returned,
	// otherwise the transaction will be committed and a nil error will be returned.
	WithTx(func(tx *sql.Tx) error) error

	// WithStoreSafeTx prepares a store-safe environment and then starts a transaction
	// that can be used by the provided function.
	WithStoreSafeTx(func(tx StoreSafeTx) error) error

	// Store stores the provided jobs to the database
	Store(ctx context.Context, jobList []*JobT) error

	// StoreInTx stores the provided jobs to the database using an existing transaction.
	// Please ensure that you are using a StoreSafeTx, e.g.
	//    jobsdb.WithStoreSafeTx(func(tx StoreSafeTx) error {
	//	      jobsdb.StoreInTx(ctx, tx, jobList)
	//    })
	StoreInTx(ctx context.Context, tx StoreSafeTx, jobList []*JobT) error

	// StoreWithRetryEach tries to store all the provided jobs to the database and returns the job uuids which failed
	StoreWithRetryEach(ctx context.Context, jobList []*JobT) map[uuid.UUID]string

	// StoreWithRetryEachInTx tries to store all the provided jobs to the database and returns the job uuids which failed, using an existing transaction.
	// Please ensure that you are using a StoreSafeTx, e.g.
	//    jobsdb.WithStoreSafeTx(func(tx StoreSafeTx) error {
	//	      jobsdb.StoreWithRetryEachInTx(ctx, tx, jobList)
	//    })
	StoreWithRetryEachInTx(ctx context.Context, tx StoreSafeTx, jobList []*JobT) map[uuid.UUID]string

	// WithUpdateSafeTx prepares an update-safe environment and then starts a transaction
	// that can be used by the provided function. An update-safe transaction shall be used if the provided function
	// needs to call UpdateJobStatusInTx.
	WithUpdateSafeTx(func(tx UpdateSafeTx) error) error

	// UpdateJobStatus updates the provided job statuses
	UpdateJobStatus(ctx context.Context, statusList []*JobStatusT, customValFilters []string, parameterFilters []ParameterFilterT) error

	// UpdateJobStatusInTx updates the provided job statuses in an existing transaction.
	// Please ensure that you are using an UpdateSafeTx, e.g.
	//    jobsdb.WithUpdateSafeTx(func(tx UpdateSafeTx) error {
	//	      jobsdb.UpdateJobStatusInTx(ctx, tx, statusList, customValFilters, parameterFilters)
	//    })
	UpdateJobStatusInTx(ctx context.Context, tx UpdateSafeTx, statusList []*JobStatusT, customValFilters []string, parameterFilters []ParameterFilterT) error

	// GetUnprocessed finds unprocessed jobs. Unprocessed are new
	// jobs whose state hasn't been marked in the database yet
	GetUnprocessed(params GetQueryParamsT) JobsResult

	// GetProcessed finds jobs in some state, i.e. not unprocessed
	GetProcessed(params GetQueryParamsT) JobsResult

	// GetToRetry finds jobs in failed state
	GetToRetry(params GetQueryParamsT) JobsResult

	// GetWaiting finds jobs in waiting state
	GetWaiting(params GetQueryParamsT) JobsResult

	// GetExecuting finds jobs in executing state
	GetExecuting(params GetQueryParamsT) JobsResult

	// GetImporting finds jobs in importing state
	GetImporting(params GetQueryParamsT) JobsResult

	// GetPileUpCounts returns statistics (counters) of incomplete jobs
	// grouped by workspaceId and destination type
	GetPileUpCounts(statMap map[string]map[string]int)

	// Status returns an untyped status value.
	// NOTE(review): the contents of the returned value are not documented here — confirm with the implementation.
	Status() interface{}
	// Ping is a health check; the HandleT implementation documents it as
	// returning the health check for the pg database.
	Ping() error
	// DeleteExecuting deletes events whose latest job state is executing.
	// The HandleT implementation documents that this is only done during recovery,
	// which happens during the server start.
	DeleteExecuting()

	// Journal access.
	// NOTE(review): presumably these list the journal entries of an operation type,
	// delete a journal entry by operation id, and record the start of a journal action
	// (returning its operation id) — confirm against the HandleT implementation.
	GetJournalEntries(opType string) (entries []JournalEntryT)
	JournalDeleteEntry(opID int64)
	JournalMarkStart(opType string, opPayload json.RawMessage) int64
}

JobsDB interface contains public methods to access JobsDB data

type JobsResult added in v0.1.10

type JobsResult struct {
	Jobs          []*JobT
	LimitsReached bool
	EventsCount   int
	PayloadSize   int64
}

type JobsdbUtilsHandler added in v0.1.10

type JobsdbUtilsHandler struct{}

Admin Handlers

func (*JobsdbUtilsHandler) RunSQLQuery added in v0.1.10

func (*JobsdbUtilsHandler) RunSQLQuery(argString string, reply *string) (err error)

type JournalEntryT

type JournalEntryT struct {
	OpID      int64
	OpType    string
	OpDone    bool
	OpPayload json.RawMessage
}

type MigrationCheckpointT added in v0.1.10

type MigrationCheckpointT struct {
	ID            int64           `json:"ID"`
	MigrationType MigrationOp     `json:"MigrationType"` // ENUM : export, import, acceptNewEvents
	FromNode      string          `json:"FromNode"`
	ToNode        string          `json:"ToNode"`
	JobsCount     int64           `json:"JobsCount"`
	FileLocation  string          `json:"FileLocation"`
	Status        string          `json:"Status"` // ENUM : Look up 'Values for Status'
	StartSeq      int64           `json:"StartSeq"`
	Payload       json.RawMessage `json:"Payload"`
	TimeStamp     time.Time       `json:"TimeStamp"`
}

MigrationCheckpointT captures an event of export/import, to recover from in case of a crash during migration

func NewMigrationCheckpoint added in v0.1.10

func NewMigrationCheckpoint(migrationType MigrationOp, fromNode, toNode string, jobsCount int64, fileLocation, status string, startSeq int64) MigrationCheckpointT

NewMigrationCheckpoint is a constructor for MigrationCheckpoint struct

func NewSetupCheckpointEvent added in v0.1.10

func NewSetupCheckpointEvent(migrationType MigrationOp, node string) MigrationCheckpointT

NewSetupCheckpointEvent returns a new migration event that captures setup for export, import or new event acceptance

type MigrationOp added in v0.1.10

type MigrationOp string

MigrationOp is a custom type for supported types in migrationCheckpoint

type MultiTenantHandleT added in v0.1.10

type MultiTenantHandleT struct {
	*HandleT
}

func (*MultiTenantHandleT) GetAllJobs added in v0.1.10

func (mj *MultiTenantHandleT) GetAllJobs(workspaceCount map[string]int, params GetQueryParamsT, maxDSQuerySize int) []*JobT

type MultiTenantJobsDB added in v0.1.10

// MultiTenantJobsDB is the interface implemented in this package by
// MultiTenantHandleT and MultiTenantLegacy (both embed *HandleT and add GetAllJobs).
type MultiTenantJobsDB interface {
	// GetAllJobs: the implementations in this package name the parameters
	// workspaceCount, params and maxDSQuerySize; MultiTenantLegacy ignores the last one.
	// NOTE(review): presumably workspaceCount maps a workspace id to the number of jobs
	// to pick for it — confirm against the implementation.
	GetAllJobs(map[string]int, GetQueryParamsT, int) []*JobT

	// WithUpdateSafeTx prepares an update-safe environment and then starts a transaction
	// that can be used by the provided function. It shall be used if the provided function
	// needs to call UpdateJobStatusInTx.
	WithUpdateSafeTx(func(tx UpdateSafeTx) error) error
	// UpdateJobStatusInTx updates the provided job statuses in an existing transaction.
	UpdateJobStatusInTx(ctx context.Context, tx UpdateSafeTx, statusList []*JobStatusT, customValFilters []string, parameterFilters []ParameterFilterT) error
	// UpdateJobStatus updates the provided job statuses
	UpdateJobStatus(ctx context.Context, statusList []*JobStatusT, customValFilters []string, parameterFilters []ParameterFilterT) error

	// DeleteExecuting deletes events whose latest job state is executing.
	DeleteExecuting()

	// Journal access and pile-up statistics; same signatures as in the JobsDB interface.
	GetJournalEntries(opType string) (entries []JournalEntryT)
	JournalMarkStart(opType string, opPayload json.RawMessage) int64
	JournalDeleteEntry(opID int64)
	// GetPileUpCounts returns statistics (counters) of incomplete jobs
	// grouped by workspaceId and destination type
	GetPileUpCounts(map[string]map[string]int)
}

type MultiTenantLegacy added in v0.1.10

type MultiTenantLegacy struct {
	*HandleT
}

func (*MultiTenantLegacy) GetAllJobs added in v0.1.10

func (mj *MultiTenantLegacy) GetAllJobs(workspaceCount map[string]int, params GetQueryParamsT, _ int) []*JobT

type OptsFunc added in v0.1.10

type OptsFunc func(jd *HandleT)

func WithClearDB added in v0.1.10

func WithClearDB(clearDB bool) OptsFunc

WithClearDB, if set to true it will remove all existing tables

func WithMigrationMode added in v0.1.10

func WithMigrationMode(mode string) OptsFunc

func WithPreBackupHandlers added in v0.1.10

func WithPreBackupHandlers(preBackupHandlers []prebackup.Handler) OptsFunc

WithPreBackupHandlers, sets pre-backup handlers

func WithQueryFilterKeys added in v0.1.10

func WithQueryFilterKeys(filters QueryFiltersT) OptsFunc

func WithStatusHandler added in v0.1.10

func WithStatusHandler() OptsFunc

type OwnerType added in v0.1.10

type OwnerType string

OwnerType for this jobsdb instance

const (
	// Read : Only Reader of this jobsdb instance
	Read OwnerType = "READ"
	// Write : Only Writer of this jobsdb instance
	Write OwnerType = "WRITE"
	// ReadWrite : Reader and Writer of this jobsdb instance.
	// Its value is the empty string, so the zero value of OwnerType is ReadWrite.
	ReadWrite OwnerType = ""
)

type ParameterFilterT

type ParameterFilterT struct {
	Name     string
	Value    string
	Optional bool
}

type QueryConditions added in v0.1.10

type QueryConditions struct {
	// if IgnoreCustomValFiltersInQuery is true, CustomValFilters is not going to be used
	IgnoreCustomValFiltersInQuery bool
	CustomValFilters              []string
	ParameterFilters              []ParameterFilterT
	StateFilters                  []string
}

QueryConditions holds jobsdb query conditions

type QueryFiltersT added in v0.1.10

type QueryFiltersT struct {
	CustomVal        bool
	ParameterFilters []string
}

type ReadonlyHandleT added in v0.1.10

type ReadonlyHandleT struct {
	DbHandle *sql.DB
	// contains filtered or unexported fields
}

func (*ReadonlyHandleT) GetDSListString added in v0.1.10

func (jd *ReadonlyHandleT) GetDSListString() (string, error)

func (*ReadonlyHandleT) GetFailedStatusErrorCodeCountsByDestination added in v0.1.10

func (jd *ReadonlyHandleT) GetFailedStatusErrorCodeCountsByDestination(args []string) (string, error)

func (*ReadonlyHandleT) GetJobByID added in v0.1.10

func (jd *ReadonlyHandleT) GetJobByID(job_id, prefix string) (string, error)

func (*ReadonlyHandleT) GetJobIDStatus added in v0.1.10

func (jd *ReadonlyHandleT) GetJobIDStatus(job_id, prefix string) (string, error)

func (*ReadonlyHandleT) GetJobIDsForUser added in v0.1.10

func (jd *ReadonlyHandleT) GetJobIDsForUser(args []string) (string, error)

func (*ReadonlyHandleT) GetJobSummaryCount added in v0.1.10

func (jd *ReadonlyHandleT) GetJobSummaryCount(arg, prefix string) (string, error)

func (*ReadonlyHandleT) GetLatestFailedJobs added in v0.1.10

func (jd *ReadonlyHandleT) GetLatestFailedJobs(arg, prefix string) (string, error)

func (*ReadonlyHandleT) HavePendingJobs added in v0.1.10

func (jd *ReadonlyHandleT) HavePendingJobs(ctx context.Context, customValFilters []string, count int, parameterFilters []ParameterFilterT) (bool, error)

Count queries

HavePendingJobs returns true if there are pending events, else false. Pending events are those whose jobs don't have a state or whose job status is neither succeeded nor aborted

func (*ReadonlyHandleT) Setup added in v0.1.10

func (jd *ReadonlyHandleT) Setup(tablePrefix string)

Setup is used to initialize the ReadonlyHandleT structure.

func (*ReadonlyHandleT) TearDown added in v0.1.10

func (jd *ReadonlyHandleT) TearDown()

TearDown releases all the resources

type ReadonlyJobsDB added in v0.1.10

type ReadonlyJobsDB interface {
	HavePendingJobs(ctx context.Context, customValFilters []string, count int, parameterFilters []ParameterFilterT) (bool, error)
	GetJobSummaryCount(arg, prefix string) (string, error)
	GetLatestFailedJobs(arg, prefix string) (string, error)
	GetJobIDsForUser(args []string) (string, error)
	GetFailedStatusErrorCodeCountsByDestination(args []string) (string, error)
	GetDSListString() (string, error)
	GetJobIDStatus(job_id, prefix string) (string, error)
	GetJobByID(job_id, prefix string) (string, error)
}

ReadonlyJobsDB interface contains public methods to access JobsDB data

type SQLJobStatusT added in v0.1.10

type SQLJobStatusT struct {
	JobID         sql.NullInt64
	JobState      sql.NullString // ENUM waiting, executing, succeeded, waiting_retry,  failed, aborted, migrated
	AttemptNum    sql.NullInt64
	ExecTime      sql.NullTime
	RetryTime     sql.NullTime
	ErrorCode     sql.NullString
	ErrorResponse sql.NullString
}

SQLJobStatusT is a temporary struct to handle nulls from postgres query

type StoreSafeTx added in v0.1.10

type StoreSafeTx interface {
	Tx() *sql.Tx
	// contains filtered or unexported methods
}

StoreSafeTx sealed interface

func EmptyStoreSafeTx added in v0.1.10

func EmptyStoreSafeTx() StoreSafeTx

EmptyStoreSafeTx returns an empty interface usable only for tests

type UpdateSafeTx added in v0.1.10

type UpdateSafeTx interface {
	Tx() *sql.Tx
	// contains filtered or unexported methods
}

UpdateSafeTx sealed interface

func EmptyUpdateSafeTx added in v0.1.10

func EmptyUpdateSafeTx() UpdateSafeTx

EmptyUpdateSafeTx returns an empty interface usable only for tests

Directories

Path Synopsis
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL