Documentation ¶
Index ¶
- Constants
- Variables
- func Init()
- func Init2()
- func IsMasterBackupEnabled() bool
- type GetAllJobsResult
- type GetQueryParamsT
- type HandleInspector
- type HandleT
- func (jd *HandleT) Close()
- func (jd *HandleT) DeleteExecuting()
- func (jd *HandleT) FailExecuting()
- func (jd *HandleT) GetActiveWorkspaces(ctx context.Context) ([]string, error)
- func (jd *HandleT) GetExecuting(ctx context.Context, params GetQueryParamsT) (JobsResult, error)
- func (jd *HandleT) GetImporting(ctx context.Context, params GetQueryParamsT) (JobsResult, error)
- func (jd *HandleT) GetJournalEntries(opType string) (entries []JournalEntryT)
- func (jd *HandleT) GetLastJob() *JobT
- func (jd *HandleT) GetMaxDSIndex() (maxDSIndex int64)
- func (jd *HandleT) GetPileUpCounts(ctx context.Context) (map[string]map[string]int, error)
- func (jd *HandleT) GetProcessed(ctx context.Context, params GetQueryParamsT) (JobsResult, error)
- func (jd *HandleT) GetToRetry(ctx context.Context, params GetQueryParamsT) (JobsResult, error)
- func (jd *HandleT) GetUnprocessed(ctx context.Context, params GetQueryParamsT) (JobsResult, error)
- func (jd *HandleT) GetWaiting(ctx context.Context, params GetQueryParamsT) (JobsResult, error)
- func (jd *HandleT) Identifier() string
- func (jd *HandleT) JournalDeleteEntry(opID int64)
- func (jd *HandleT) JournalMarkDone(opID int64)
- func (jd *HandleT) JournalMarkStart(opType string, opPayload json.RawMessage) int64
- func (jd *HandleT) JournalMarkStartInTx(tx *Tx, opType string, opPayload json.RawMessage) (int64, error)
- func (jd *HandleT) Ping() error
- func (jd *HandleT) SchemaMigrationTable() string
- func (jd *HandleT) Setup(ownerType OwnerType, clearAll bool, tablePrefix string, ...) error
- func (jd *HandleT) Start() error
- func (jd *HandleT) Stop()
- func (jd *HandleT) Store(ctx context.Context, jobList []*JobT) error
- func (jd *HandleT) StoreInTx(ctx context.Context, tx StoreSafeTx, jobList []*JobT) error
- func (jd *HandleT) StoreWithRetryEach(ctx context.Context, jobList []*JobT) map[uuid.UUID]string
- func (jd *HandleT) StoreWithRetryEachInTx(ctx context.Context, tx StoreSafeTx, jobList []*JobT) (map[uuid.UUID]string, error)
- func (jd *HandleT) TearDown()
- func (jd *HandleT) UpdateJobStatus(ctx context.Context, statusList []*JobStatusT, customValFilters []string, ...) error
- func (jd *HandleT) UpdateJobStatusInTx(ctx context.Context, tx UpdateSafeTx, statusList []*JobStatusT, ...) error
- func (jd *HandleT) WithStoreSafeTx(ctx context.Context, f func(tx StoreSafeTx) error) error
- func (jd *HandleT) WithTx(f func(tx *Tx) error) error
- func (jd *HandleT) WithUpdateSafeTx(ctx context.Context, f func(tx UpdateSafeTx) error) error
- type JobStatusT
- type JobT
- type JobsDB
- type JobsResult
- type JournalEntryT
- type MoreToken
- type MultiTenantHandleT
- type MultiTenantJobsDB
- type MultiTenantLegacy
- type OptsFunc
- type OwnerType
- type ParameterFilterT
- type QueryConditions
- type StoreSafeTx
- type Tx
- type UpdateSafeTx
Constants ¶
const (
RawDataDestUploadOperation = "S3_DEST_UPLOAD"
)
We keep a journal of all the operations.
Variables ¶
var ( // Not valid, Not terminal NotProcessed = jobStateT{State: "not_picked_yet", /* contains filtered or unexported fields */} // Valid, Not terminal Failed = jobStateT{State: "failed", /* contains filtered or unexported fields */} Executing = jobStateT{State: "executing", /* contains filtered or unexported fields */} Waiting = jobStateT{State: "waiting", /* contains filtered or unexported fields */} WaitingRetry = jobStateT{State: "waiting_retry", /* contains filtered or unexported fields */} Migrating = jobStateT{State: "migrating", /* contains filtered or unexported fields */} Importing = jobStateT{State: "importing", /* contains filtered or unexported fields */} // Valid, Terminal Succeeded = jobStateT{State: "succeeded", /* contains filtered or unexported fields */} Aborted = jobStateT{State: "aborted", /* contains filtered or unexported fields */} Migrated = jobStateT{State: "migrated", /* contains filtered or unexported fields */} WontMigrate = jobStateT{State: "wont_migrate", /* contains filtered or unexported fields */} )
State definitions
var CacheKeyParameterFilters = []string{"destination_id"}
Functions ¶
func IsMasterBackupEnabled ¶ added in v0.1.10
func IsMasterBackupEnabled() bool
Types ¶
type GetAllJobsResult ¶ added in v1.3.0
type GetQueryParamsT ¶ added in v0.1.10
type GetQueryParamsT struct { // if IgnoreCustomValFiltersInQuery is true, CustomValFilters is not going to be used IgnoreCustomValFiltersInQuery bool WorkspaceID string CustomValFilters []string ParameterFilters []ParameterFilterT StateFilters []string AfterJobID *int64 // Limit the total number of jobs. // A value less than or equal to zero will return no results JobsLimit int // Limit the total number of events, 1 job contains 1+ event(s). // A value less than or equal to zero will disable this limit (no limit), // only values greater than zero are considered as valid limits. EventsLimit int // Limit the total job payload size // A value less than or equal to zero will disable this limit (no limit), // only values greater than zero are considered as valid limits. PayloadSizeLimit int64 }
GetQueryParamsT is a struct to hold jobsdb query params.
type HandleInspector ¶ added in v1.0.2
type HandleInspector struct {
*HandleT
}
HandleInspector is only intended to be used by tests for verifying the handle's internal state
func (*HandleInspector) DSIndicesList ¶ added in v1.2.0
func (h *HandleInspector) DSIndicesList() []string
DSIndicesList returns the slice of current ds indices
type HandleT ¶
type HandleT struct { MinDSRetentionPeriod time.Duration MaxDSRetentionPeriod time.Duration BackupSettings *backupSettings MaxDSSize *int // TriggerAddNewDS, TriggerMigrateDS is useful for triggering addNewDS to run from tests. // TODO: Ideally we should refactor the code to not use this override. TriggerAddNewDS func() <-chan time.Time TriggerMigrateDS func() <-chan time.Time TriggerRefreshDS func() <-chan time.Time // contains filtered or unexported fields }
HandleT is the main type implementing the database for jobs. The caller must call the Setup function on a HandleT object.
func NewForRead ¶ added in v0.1.10
func NewForReadWrite ¶ added in v0.1.10
func NewForWrite ¶ added in v0.1.10
func (*HandleT) Close ¶ added in v0.1.10
func (jd *HandleT) Close()
Close closes the database connection.
Stop should be called before Close.
func (*HandleT) DeleteExecuting ¶ added in v0.1.10
func (jd *HandleT) DeleteExecuting()
DeleteExecuting deletes events whose latest job state is executing. This is only done during recovery, which happens during the server start.
func (*HandleT) FailExecuting ¶ added in v1.5.0
func (jd *HandleT) FailExecuting()
FailExecuting fails events whose latest job state is executing.
This is only done during recovery, which happens during the server start.
func (*HandleT) GetActiveWorkspaces ¶ added in v1.6.0
func (*HandleT) GetExecuting ¶
func (jd *HandleT) GetExecuting(ctx context.Context, params GetQueryParamsT) (JobsResult, error)
func (*HandleT) GetImporting ¶ added in v0.1.10
func (jd *HandleT) GetImporting(ctx context.Context, params GetQueryParamsT) (JobsResult, error)
func (*HandleT) GetJournalEntries ¶
func (jd *HandleT) GetJournalEntries(opType string) (entries []JournalEntryT)
func (*HandleT) GetLastJob ¶ added in v0.1.10
func (*HandleT) GetMaxDSIndex ¶
GetMaxDSIndex returns max dataset index in the DB
func (*HandleT) GetPileUpCounts ¶ added in v0.1.10
func (*HandleT) GetProcessed ¶
func (jd *HandleT) GetProcessed(ctx context.Context, params GetQueryParamsT) (JobsResult, error)
GetProcessed returns events of a given state. This does not update any state itself and relies on the caller to update it. That means that successive calls to GetProcessed("failed") can return the same set of events. It is the responsibility of the caller to call it from one thread, update the state (to "waiting") in the same thread and pass the events on to the processors.
func (*HandleT) GetToRetry ¶
func (jd *HandleT) GetToRetry(ctx context.Context, params GetQueryParamsT) (JobsResult, error)
GetToRetry returns events which need to be retried. If enableReaderQueue is true, this goes through worker pool, else calls getUnprocessed directly.
func (*HandleT) GetUnprocessed ¶
func (jd *HandleT) GetUnprocessed(ctx context.Context, params GetQueryParamsT) (JobsResult, error)
GetUnprocessed returns the unprocessed events. Unprocessed events are those whose state hasn't been marked in the DB. If enableReaderQueue is true, this goes through worker pool, else calls getUnprocessed directly.
func (*HandleT) GetWaiting ¶
func (jd *HandleT) GetWaiting(ctx context.Context, params GetQueryParamsT) (JobsResult, error)
GetWaiting returns events which are under processing. If enableReaderQueue is true, this goes through worker pool, else calls getUnprocessed directly.
func (*HandleT) Identifier ¶ added in v0.1.10
Identifier returns the identifier of the jobsdb. Here it is tablePrefix.
func (*HandleT) JournalDeleteEntry ¶
func (*HandleT) JournalMarkDone ¶
JournalMarkDone marks the end of a journal action
func (*HandleT) JournalMarkStart ¶
func (jd *HandleT) JournalMarkStart(opType string, opPayload json.RawMessage) int64
func (*HandleT) JournalMarkStartInTx ¶ added in v0.1.10
func (*HandleT) SchemaMigrationTable ¶ added in v0.1.10
SchemaMigrationTable returns the table name used for storing current schema version.
func (*HandleT) Setup ¶
func (jd *HandleT) Setup( ownerType OwnerType, clearAll bool, tablePrefix string, registerStatusHandler bool, preBackupHandlers []prebackup.Handler, fileUploaderProvider fileuploader.Provider, ) error
Setup is used to initialize the HandleT structure. clearAll = true means it will remove all existing tables. tablePrefix must be unique and is used to separate multiple users of JobsDB.
func (*HandleT) Start ¶ added in v0.1.10
Start starts the jobsdb worker and housekeeping (migration, archive) threads. Start should be called before any other jobsdb methods are called.
func (*HandleT) Stop ¶ added in v0.1.10
func (jd *HandleT) Stop()
Stop stops the background goroutines and waits until they finish. Stop should be called once only after Start. Only Start and Close can be called after Stop.
func (*HandleT) Store ¶
Store stores new jobs to the jobsdb. If enableWriterQueue is true, this goes through writer worker pool.
func (*HandleT) StoreInTx ¶ added in v0.1.10
StoreInTx stores new jobs to the jobsdb. If enableWriterQueue is true, this goes through writer worker pool.
func (*HandleT) StoreWithRetryEach ¶ added in v0.1.10
func (*HandleT) StoreWithRetryEachInTx ¶ added in v0.1.10
func (*HandleT) TearDown ¶
func (jd *HandleT) TearDown()
TearDown stops the background goroutines,
waits until they finish and closes the database.
func (*HandleT) UpdateJobStatus ¶
func (jd *HandleT) UpdateJobStatus(ctx context.Context, statusList []*JobStatusT, customValFilters []string, parameterFilters []ParameterFilterT) error
func (*HandleT) UpdateJobStatusInTx ¶ added in v0.1.10
func (jd *HandleT) UpdateJobStatusInTx(ctx context.Context, tx UpdateSafeTx, statusList []*JobStatusT, customValFilters []string, parameterFilters []ParameterFilterT) error
UpdateJobStatusInTx updates the status of a batch of jobs in the passed transaction. customValFilters[] is passed, so we can efficiently mark empty cache. Later we can move this to query. IMP NOTE: AcquireUpdateJobStatusLocks should be called before calling this function.
func (*HandleT) WithStoreSafeTx ¶ added in v0.1.10
func (*HandleT) WithUpdateSafeTx ¶ added in v0.1.10
type JobStatusT ¶
type JobStatusT struct { JobID int64 `json:"JobID"` JobState string `json:"JobState"` // ENUM waiting, executing, succeeded, waiting_retry, failed, aborted, migrating, migrated, wont_migrate AttemptNum int `json:"AttemptNum"` ExecTime time.Time `json:"ExecTime"` RetryTime time.Time `json:"RetryTime"` ErrorCode string `json:"ErrorCode"` ErrorResponse json.RawMessage `json:"ErrorResponse"` Parameters json.RawMessage `json:"Parameters"` WorkspaceId string `json:"WorkspaceId"` }
JobStatusT is used for storing status of the job. It is the responsibility of the user of this module to set appropriate job status. State can be one of ENUM waiting, executing, succeeded, waiting_retry, failed, aborted, migrating, migrated, wont_migrate.
type JobT ¶
type JobT struct { UUID uuid.UUID `json:"UUID"` JobID int64 `json:"JobID"` UserID string `json:"UserID"` CreatedAt time.Time `json:"CreatedAt"` ExpireAt time.Time `json:"ExpireAt"` CustomVal string `json:"CustomVal"` EventCount int `json:"EventCount"` EventPayload json.RawMessage `json:"EventPayload"` PayloadSize int64 `json:"PayloadSize"` LastJobStatus JobStatusT `json:"LastJobStatus"` Parameters json.RawMessage `json:"Parameters"` WorkspaceId string `json:"WorkspaceId"` }
JobT is the basic type for creating jobs. The JobID is generated by the system and LastJobStatus is populated when reading a processed job, while the rest should be set by the user.
type JobsDB ¶ added in v0.1.10
type JobsDB interface { // Identifier returns the jobsdb's identifier, a.k.a. table prefix Identifier() string // WithTx begins a new transaction that can be used by the provided function. // If the function returns an error, the transaction will be rollbacked and return the error, // otherwise the transaction will be committed and a nil error will be returned. WithTx(func(tx *Tx) error) error // WithStoreSafeTx prepares a store-safe environment and then starts a transaction // that can be used by the provided function. WithStoreSafeTx(context.Context, func(tx StoreSafeTx) error) error // Store stores the provided jobs to the database Store(ctx context.Context, jobList []*JobT) error // StoreInTx stores the provided jobs to the database using an existing transaction. // Please ensure that you are using an StoreSafeTx, e.g. // jobsdb.WithStoreSafeTx(ctx, func(tx StoreSafeTx) error { // jobsdb.StoreInTx(ctx, tx, jobList) // }) StoreInTx(ctx context.Context, tx StoreSafeTx, jobList []*JobT) error // StoreWithRetryEach tries to store all the provided jobs to the database and returns the job uuids which failed StoreWithRetryEach(ctx context.Context, jobList []*JobT) map[uuid.UUID]string // StoreWithRetryEachInTx tries to store all the provided jobs to the database and returns the job uuids which failed, using an existing transaction. // Please ensure that you are using an StoreSafeTx, e.g. // jobsdb.WithStoreSafeTx(func(tx StoreSafeTx) error { // jobsdb.StoreWithRetryEachInTx(ctx, tx, jobList) // }) StoreWithRetryEachInTx(ctx context.Context, tx StoreSafeTx, jobList []*JobT) (map[uuid.UUID]string, error) // WithUpdateSafeTx prepares an update-safe environment and then starts a transaction // that can be used by the provided function. An update-safe transaction shall be used if the provided function // needs to call UpdateJobStatusInTx. 
WithUpdateSafeTx(context.Context, func(tx UpdateSafeTx) error) error // UpdateJobStatus updates the provided job statuses UpdateJobStatus(ctx context.Context, statusList []*JobStatusT, customValFilters []string, parameterFilters []ParameterFilterT) error // UpdateJobStatusInTx updates the provided job statuses in an existing transaction. // Please ensure that you are using an UpdateSafeTx, e.g. // jobsdb.WithUpdateSafeTx(ctx, func(tx UpdateSafeTx) error { // jobsdb.UpdateJobStatusInTx(ctx, tx, statusList, customValFilters, parameterFilters) // }) UpdateJobStatusInTx(ctx context.Context, tx UpdateSafeTx, statusList []*JobStatusT, customValFilters []string, parameterFilters []ParameterFilterT) error // GetUnprocessed finds unprocessed jobs. Unprocessed are new // jobs whose state hasn't been marked in the database yet GetUnprocessed(ctx context.Context, params GetQueryParamsT) (JobsResult, error) // GetProcessed finds jobs in some state, i.e. not unprocessed GetProcessed(ctx context.Context, params GetQueryParamsT) (JobsResult, error) // GetToRetry finds jobs in failed state GetToRetry(ctx context.Context, params GetQueryParamsT) (JobsResult, error) // GetWaiting finds jobs in waiting state GetWaiting(ctx context.Context, params GetQueryParamsT) (JobsResult, error) // GetExecuting finds jobs in executing state GetExecuting(ctx context.Context, params GetQueryParamsT) (JobsResult, error) // GetImporting finds jobs in importing state GetImporting(ctx context.Context, params GetQueryParamsT) (JobsResult, error) // GetPileUpCounts returns statistics (counters) of incomplete jobs // grouped by workspaceId and destination type GetPileUpCounts(ctx context.Context) (statMap map[string]map[string]int, err error) // GetActiveWorkspaces returns a list of active workspace ids GetActiveWorkspaces(ctx context.Context) (workspaces []string, err error) Ping() error DeleteExecuting() FailExecuting() GetJournalEntries(opType string) (entries []JournalEntryT) JournalDeleteEntry(opID 
int64) JournalMarkStart(opType string, opPayload json.RawMessage) int64 }
JobsDB interface contains public methods to access JobsDB data
type JobsResult ¶ added in v0.1.10
type JournalEntryT ¶
type JournalEntryT struct { OpID int64 OpType string OpDone bool OpPayload json.RawMessage }
type MultiTenantHandleT ¶ added in v0.1.10
type MultiTenantHandleT struct {
*HandleT
}
func (*MultiTenantHandleT) GetAllJobs ¶ added in v0.1.10
func (mj *MultiTenantHandleT) GetAllJobs(ctx context.Context, pickup map[string]int, params GetQueryParamsT, maxDSQuerySize int, more MoreToken) (*GetAllJobsResult, error)
GetAllJobs gets jobs from all workspaces according to the pickup map
type MultiTenantJobsDB ¶ added in v0.1.10
type MultiTenantJobsDB interface { GetAllJobs(context.Context, map[string]int, GetQueryParamsT, int, MoreToken) (*GetAllJobsResult, error) WithUpdateSafeTx(context.Context, func(tx UpdateSafeTx) error) error UpdateJobStatusInTx(ctx context.Context, tx UpdateSafeTx, statusList []*JobStatusT, customValFilters []string, parameterFilters []ParameterFilterT) error UpdateJobStatus(ctx context.Context, statusList []*JobStatusT, customValFilters []string, parameterFilters []ParameterFilterT) error DeleteExecuting() FailExecuting() GetJournalEntries(opType string) (entries []JournalEntryT) JournalMarkStart(opType string, opPayload json.RawMessage) int64 JournalDeleteEntry(opID int64) GetPileUpCounts(context.Context) (map[string]map[string]int, error) }
type MultiTenantLegacy ¶ added in v0.1.10
type MultiTenantLegacy struct {
*HandleT
}
func (*MultiTenantLegacy) GetAllJobs ¶ added in v0.1.10
func (mj *MultiTenantLegacy) GetAllJobs(ctx context.Context, pickup map[string]int, params GetQueryParamsT, _ int, more MoreToken) (*GetAllJobsResult, error)
type OptsFunc ¶ added in v0.1.10
type OptsFunc func(jd *HandleT)
func WithClearDB ¶ added in v0.1.10
WithClearDB, if set to true, will remove all existing tables.
func WithDSLimit ¶ added in v1.2.0
func WithFileUploaderProvider ¶ added in v1.3.0
func WithFileUploaderProvider(fileUploaderProvider fileuploader.Provider) OptsFunc
func WithPreBackupHandlers ¶ added in v0.1.10
WithPreBackupHandlers sets pre-backup handlers.
func WithStatusHandler ¶ added in v0.1.10
func WithStatusHandler() OptsFunc
type ParameterFilterT ¶
type QueryConditions ¶ added in v0.1.10
type QueryConditions struct { // if IgnoreCustomValFiltersInQuery is true, CustomValFilters is not going to be used IgnoreCustomValFiltersInQuery bool CustomValFilters []string ParameterFilters []ParameterFilterT StateFilters []string AfterJobID *int64 }
QueryConditions holds jobsdb query conditions
type StoreSafeTx ¶ added in v0.1.10
StoreSafeTx sealed interface
func EmptyStoreSafeTx ¶ added in v0.1.10
func EmptyStoreSafeTx() StoreSafeTx
EmptyStoreSafeTx returns an empty interface usable only for tests
type Tx ¶ added in v1.2.1
Tx is a wrapper around sql.Tx that supports registering and executing post-commit actions, a.k.a. success listeners.
func (*Tx) AddSuccessListener ¶ added in v1.2.1
func (tx *Tx) AddSuccessListener(listener func())
AddSuccessListener registers a listener to be executed after the transaction has been committed successfully.
type UpdateSafeTx ¶ added in v0.1.10
UpdateSafeTx sealed interface
func EmptyUpdateSafeTx ¶ added in v0.1.10
func EmptyUpdateSafeTx() UpdateSafeTx
EmptyUpdateSafeTx returns an empty interface usable only for tests