Documentation ¶
Overview ¶
Package jobscheduler provides simple functionality for the reliable scheduling of Faktory jobs.
It is a common pattern to schedule one or more new jobs when a certain system entity (a record) is created or updated in the database. Used properly, this package can help achieve consistency between the data persisted in the database and the jobs that rely upon it.
Since Faktory and the database are two separate external systems, failures and latencies can be expected at any level and at any point of execution.
Let's consider the following illustrative scenario: the program should create a new "Thing" object (database table "things") with auto-generated ID (primary key) "1" and should also schedule the execution of a Faktory job "ProcessThing", specifying the new model's ID. That job will read the record from the DB and do something with it.
It could be trivially implemented with two steps, like this:
- create the new "Thing" record on the DB (and get the ID "1")
- create and push the new job "ProcessThing(1)"
If something goes wrong in between those two steps, you would end up with the new record on the DB, but the job would never be scheduled.
If losing a job is not critical, or detecting a missed job can be done by simply looking at the status of the data from the database, then this might be the simplest desirable implementation.
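The fragile two-step flow can be sketched with in-memory stand-ins for the database and the Faktory queue (all names below are hypothetical and for illustration only; real code would use gorm and a faktory.Client):

```go
package main

import (
	"errors"
	"fmt"
)

// system holds in-memory stand-ins for the "things" table and the
// Faktory queue (hypothetical types, not part of this package).
type system struct {
	things map[int]string // the "things" table
	queue  []string       // jobs pushed to Faktory
}

// createThenPush implements the trivial two-step approach:
// step 1 creates the record, step 2 pushes the job. If pushFails is
// true, the push step fails after the record is already persisted.
func (s *system) createThenPush(name string, pushFails bool) error {
	id := len(s.things) + 1 // auto-generated primary key
	s.things[id] = name     // step 1: record committed

	if pushFails {
		// Step 2 never happens: the record exists, the job is lost.
		return errors.New("faktory unreachable")
	}
	s.queue = append(s.queue, fmt.Sprintf("ProcessThing(%d)", id))
	return nil
}

func main() {
	s := &system{things: map[int]string{}}
	_ = s.createThenPush("first", true)
	fmt.Println(len(s.things), len(s.queue)) // 1 0
}
```

The record survives while the job is silently lost, which is exactly the failure mode described above.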
Otherwise, the job loss could be naively addressed by inverting the two steps and wrapping them in a transaction, so that the new ID is known before the data is committed to the database:
- begin transaction
- create the new "Thing" record (and get the ID "1")
- create and push the new job "ProcessThing(1)"
- end transaction, actually committing data to the DB
Even if we minimize the time (and the number of instructions) between steps 3 and 4, it's still possible that the job will run before the data is committed to the DB. This might be annoying but not critical: the job simply fails and is retried later, possibly with success. But things can get worse: the transaction might fail entirely, or be abruptly interrupted for any reason. In that case, the job will always fail to fetch the new record, and its subsequent retries become a weak point.
The models.PendingJob model can be used as an additional supportive record to increase reliability and reduce the downsides of both scenarios depicted above.
A new pattern can be established following these steps:
- begin transaction
- create the new "Thing" record (and get the ID "1")
- create the new job "ProcessThing(1)", but DO NOT push it yet
- create a new PendingJob object (which contains the serialization of the job created above)
- end transaction, actually committing data to the DB
- if the transaction failed, stop here, otherwise go on
- push the job ("ProcessThing(1)", that was created on step 3)
- if the push failed, stop here, otherwise proceed with the last step
- delete the PendingJob (that was created on step 4 and persisted on step 5) from the database
This approach is somewhat closer to the initial two-step scenario, in that the new job is pushed to the Faktory server after the new "Thing" is persisted to the DB (step 7). However, here we also keep track, within the database, of the intention to push a new job.
Creating the two records (the "Thing" and the PendingJob) from the same transaction guarantees that either both are created, or both are discarded. In case of transaction failure, the guard condition at step 6 prevents the job from actually being pushed.
If pushing the job fails, the PendingJob is still there and can be used later for recovery. For example, a separate process could periodically check for old PendingJob records; each of them can be deserialized and pushed again, and the record finally removed upon success.
The price to pay for this kind of reliability is the chance for the same job to be scheduled (pushed) more than once. This can happen if something goes wrong between steps 7 and 9: the job is pushed, but the PendingJob is not removed, so the recovery process will possibly try to push it again. The recovery process implementation itself might be susceptible to the same issue as well.
Job idempotency, or any other sort of tolerance to the scheduling of identical jobs, is left to the job implementations, or to additional features provided by Faktory.
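The nine steps above, together with the recovery process, can be sketched with in-memory stand-ins (hypothetical names, for illustration only; the real package persists PendingJob records through gorm):

```go
package main

import (
	"errors"
	"fmt"
)

// store holds in-memory stand-ins for the DB tables and the Faktory queue.
type store struct {
	things      map[int]string
	pendingJobs map[int]string // serialized jobs, keyed by thing ID
	queue       []string       // jobs pushed to Faktory
}

// createWithPendingJob runs steps 1-5: the record and its PendingJob
// are created together, in what would be a single transaction.
func (s *store) createWithPendingJob(name string) int {
	id := len(s.things) + 1
	s.things[id] = name
	s.pendingJobs[id] = fmt.Sprintf("ProcessThing(%d)", id)
	return id
}

// pushAndClear runs steps 7-9: push the job, then delete the PendingJob.
func (s *store) pushAndClear(id int, pushFails bool) error {
	if pushFails {
		// The PendingJob record stays in place for later recovery.
		return errors.New("push failed")
	}
	s.queue = append(s.queue, s.pendingJobs[id])
	delete(s.pendingJobs, id)
	return nil
}

// recoverPending re-pushes every leftover PendingJob, as a separate
// periodic recovery process would.
func (s *store) recoverPending() {
	for id, job := range s.pendingJobs {
		s.queue = append(s.queue, job)
		delete(s.pendingJobs, id)
	}
}

func main() {
	s := &store{things: map[int]string{}, pendingJobs: map[int]string{}}
	id := s.createWithPendingJob("first")
	_ = s.pushAndClear(id, true) // push fails: the PendingJob survives
	s.recoverPending()           // later, recovery pushes it again
	fmt.Println(len(s.queue), len(s.pendingJobs)) // 1 0
}
```

Note that if the push had succeeded and only the deletion failed, recoverPending would push the same job a second time: this is the duplicate-scheduling trade-off discussed above.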
The previous examples involve single records for the sake of simplicity, but in many real cases it is possible to handle many records in a batch, while keeping the same order of operations.
This package provides a JobScheduler. It can be used as a supportive structure for the implementation of some of the described steps, offering simple facilities to reduce repetitive boilerplate code and minimize the chance of programming errors.
Index ¶
- type JobScheduler
- func (js *JobScheduler) AddJob(fj config.FaktoryJob, args ...interface{}) error
- func (js *JobScheduler) AddJobs(fjs []config.FaktoryJob, args ...interface{}) error
- func (js *JobScheduler) CreatePendingJobs(tx *gorm.DB) error
- func (js *JobScheduler) DeletePendingJobs(tx *gorm.DB) error
- func (js *JobScheduler) PushJobs(ctx context.Context) error
- func (js *JobScheduler) PushJobsAndDeletePendingJobs(ctx context.Context, tx *gorm.DB) error
- func (js *JobScheduler) PushJobsWithClient(c *faktory.Client) error
- func (js *JobScheduler) PushJobsWithClientAndDeletePendingJobs(c *faktory.Client, tx *gorm.DB) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type JobScheduler ¶
type JobScheduler struct {
// contains filtered or unexported fields
}
A JobScheduler allows the reliable scheduling of Faktory jobs.
The jobscheduler package provides a general explanation of the problem this object might help to solve.
In practice, a JobScheduler is suitable for scenarios where one or more entities are created/updated on the database and one or more jobs must be scheduled in relation to each new/modified entity.
Using a JobScheduler and calling its functions in the right order and from the right context makes it easy to schedule new jobs reliably.
Here is an outline of the intended usage:
	js := jobscheduler.New()

	trErr := gormDB.Transaction(func(tx *gorm.DB) error {
		// Create or update one or more new entities on the database,
		// e.g. "tx.Create(...)" or "tx.Save(...)".
		// ...

		// Add new jobs to the scheduler, with "js.AddJob(...)" or
		// "js.AddJobs(...)".
		//
		// The arguments for these jobs will probably include the IDs of the
		// models created/updated above (or other similar references).
		//
		// This operation creates and collects new Jobs and PendingJobs in
		// the JobScheduler (js), but does not perform any action against
		// the database or the Faktory server.
		//
		// It's important to check for errors returned by these functions.
		// In case of errors, the transaction should be aborted by returning
		// an error.
		// ...

		// Once all jobs are added, we can create the pending jobs.
		//
		// Note that we are still inside the transaction; indeed, "tx"
		// is passed to the function (and not the initial "gormDB").
		err := js.CreatePendingJobs(tx)

		// It's important to abort the transaction in case of errors.
		if err != nil {
			return err
		}

		// ...
		return nil
	})

	// Here we are outside the transaction. If it failed, we cannot proceed
	// with the remaining operations.
	if trErr != nil {
		panic(trErr) // return / os.Exit / ...
	}

	// Otherwise, we can assume data persisted successfully, so we can push
	// the jobs to the Faktory server.
	//
	// If we are inside the processing function of a Faktory job, we can call
	// "PushJobs", passing to it the Context provided by faktory_worker_go:
	pushErr := js.PushJobs(ctx)
	// ALTERNATIVELY, you can use a Faktory Client and call this function:
	//   pushErr := js.PushJobsWithClient(faktoryClient)

	// In any case, if the push failed, do not proceed further.
	//
	// By doing so, the database will preserve the PendingJob records,
	// which can be found later, on a separate process, to attempt recovery
	// and schedule them again.
	if pushErr != nil {
		panic(pushErr) // return / os.Exit / ...
	}

	// If the jobs were pushed successfully, we can finally remove the
	// PendingJobs from the database.
	delErr := js.DeletePendingJobs(gormDB)

	// You can do what you want with this error: there's nothing more to do
	// with the JobScheduler in any case.
	//
	// An error in jobs deletion will probably mean that the PendingJob
	// records will still be present, despite the jobs being successfully
	// pushed above. This implies that a separate recovery job will still
	// find those records and attempt rescheduling. It's up to the
	// implementation of the jobs and the recovery process to tolerate
	// and handle duplicated job scheduling, according to the specific needs
	// and requirements.
	_ = delErr

	// It's desirable to reduce the time between jobs pushing and PendingJobs
	// deletion as much as possible. Since the two operations should
	// always happen in close succession, and in this exact order,
	// you can use the helper functions "PushJobsAndDeletePendingJobs"
	// or "PushJobsWithClientAndDeletePendingJobs".
	// A valid reason for not using them might be special error handling.
func (*JobScheduler) AddJob ¶
func (js *JobScheduler) AddJob(fj config.FaktoryJob, args ...interface{}) error
AddJob adds to the JobScheduler a new Faktory Job, paired with a related PendingJob.
This function does not push the job to the server and does not create a new record in the database.
func (*JobScheduler) AddJobs ¶
func (js *JobScheduler) AddJobs(fjs []config.FaktoryJob, args ...interface{}) error
AddJobs simply calls AddJob for each of the given jobs, using the same arguments for all of them.
func (*JobScheduler) CreatePendingJobs ¶
func (js *JobScheduler) CreatePendingJobs(tx *gorm.DB) error
CreatePendingJobs creates all the collected PendingJobs in the database.
This function does not push any job to the Faktory server.
All new records are created with a single query.
If the JobScheduler does not contain any pending job, the function simply does nothing.
To guarantee reliability, if the jobs to be scheduled are somehow related to other database entities that are being created or updated, this function should be invoked from the same transaction that creates or updates those other entities.
In the same scenario, to prevent the new jobs from failing because the data is not yet persisted in the database, this function should always be called before PushJobs.
func (*JobScheduler) DeletePendingJobs ¶
func (js *JobScheduler) DeletePendingJobs(tx *gorm.DB) error
DeletePendingJobs deletes the collected PendingJobs from the database.
This function does not perform any operation against the Faktory server.
All records are deleted with a single query.
If the JobScheduler does not contain any pending job, the function simply does nothing.
This function should be invoked only after the jobs were successfully pushed to the Faktory server (with PushJobs or PushJobsWithClient).
After a successful deletion, the JobScheduler has accomplished its goals and can be simply discarded.
func (*JobScheduler) PushJobs ¶
func (js *JobScheduler) PushJobs(ctx context.Context) error
PushJobs pushes all collected Jobs to the Faktory server.
This method must only be called from within an executing Faktory job, passing the job's own context (as provided by faktory_worker_go). If this condition is not met, the method panics.
Each job is pushed individually. If one push fails, the error is returned immediately and the remaining jobs are not pushed.
This function does not perform any operation on the database.
If the JobScheduler does not contain any job, the function simply does nothing.
To guarantee reliability, if the jobs to be scheduled are somehow related to other database entities that are being created or updated, this function should be invoked after the database transaction that created or updated those other entities together with the pending job records (see CreatePendingJobs).
func (*JobScheduler) PushJobsAndDeletePendingJobs ¶
func (js *JobScheduler) PushJobsAndDeletePendingJobs(ctx context.Context, tx *gorm.DB) error
PushJobsAndDeletePendingJobs sequentially calls PushJobs and DeletePendingJobs.
If PushJobs fails, the error is returned immediately and the second operation is not performed.
func (*JobScheduler) PushJobsWithClient ¶
func (js *JobScheduler) PushJobsWithClient(c *faktory.Client) error
PushJobsWithClient performs the same operations as PushJobs, but accepts a Faktory Client (instead of a job context).
func (*JobScheduler) PushJobsWithClientAndDeletePendingJobs ¶
func (js *JobScheduler) PushJobsWithClientAndDeletePendingJobs(c *faktory.Client, tx *gorm.DB) error
PushJobsWithClientAndDeletePendingJobs sequentially calls PushJobsWithClient and DeletePendingJobs.
If PushJobsWithClient fails, the error is returned immediately and the second operation is not performed.