jobs

package
v1.1.97 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 11, 2024 License: Apache-2.0 Imports: 8 Imported by: 10

README

SDK Jobs

The jobs library is used to coordinate tasks that run within an agent. There are 4 job types, explained below, that may be used. Single run, Retry, Interval, and Scheduled jobs.

The jobs library keeps track of all jobs that are registered and executes them appropriately. Scheduled and Interval jobs are continuous jobs that execute more than once. These jobs are continuously executed according to their settings. If any of these continuous jobs begin to fail the library will pause execution of all jobs until a running status is
achieved again.

When using the jobs library, remember that the main process of the agent can not exit, otherwise all jobs will exit

Job status

The following are the possible job status values that can be returned by the GetJobStatus method

Status Definition
Initializing Returned when the has been created but not yet started
Running Returned when a job is being executed, or between executions in a working state
Retrying Returned only for retry job types that return an error in a call to Execute
Stopped Returned when a continuous job is in a non-working state and is waiting to be restarted
Failed Returned when a single run or retry job does not Execute without error
Finished Returned when a single run or retry job Executes properly

Implementing the job interface

Before registering a job, the Job interface has to be implemented for your job

package main

import (
  github.com/Axway/agent-sdk/pkg/jobs
)

type MyJob struct {
  jobs.Job // implements interface
}

func (j *MyJob) Status() error {
  // continually called determining the status of any dependencies for the job
  // returning an error means the job should not be executed
}

func (j *MyJob) Ready() bool {
  // called prior to executing the job the first time
  // return true when the job can begin execution, false otherwise
}

func (j *MyJob) Execute() error {
  // called each time the job should be executed
  // returning an error stops continuous jobs from executing
}

Job types

The section covers the following job types

Single run jobs

Single run jobs are used to run a specific task exactly once, regardless of pass or fail.

Register Single run job

Register the job and get its status

package main

import (
  "fmt"

  "github.com/Axway/agent-sdk/pkg/jobs"
)

func main() {
  myJob := MyJob{}
  jobID, err := jobs.RegisterSingleRunJobWithName(myJob, "My Job")
  if err != nil {
    panic(err) // error registering the job
  }
  fmt.Println(GetJobStatus(jobID))
}
Retry jobs

Retry jobs are like single run jobs, but are allowed to retry a specified number of times before failing

Register Retry job

Register the job and get its status

package main

import (
  "fmt"

  "github.com/Axway/agent-sdk/pkg/jobs"
)

func main() {
  myJob := MyJob{}
  retries := 3
  jobID, err := jobs.RegisterRetryJobWithName(myJob, retries, "My Job")
  if err != nil {
    panic(err) // error registering the job
  }
  fmt.Println(GetJobStatus(jobID))
}
Interval jobs

Interval jobs are executed with a certain time duration between the end of one execution to the beginning of the next

Register Interval job

Register the job and get its status

package main

import (
  "fmt"

  "github.com/Axway/agent-sdk/pkg/jobs"
)

func main() {
  myJob := MyJob{}
  interval := 30 * time.Second
  jobID, err := jobs.RegisterIntervalJobWithName(myJob, interval, "My Job")
  if err != nil {
    panic(err) // error registering the job
  }
  fmt.Println(GetJobStatus(jobID))
}
Detached interval jobs

Detached interval jobs are just like Interval jobs except they are not stopped with other jobs fail, they run independent of the job pool

Scheduled jobs

Scheduled jobs are executed on a certain time frame, the previous execution has to end prior to the next execution starting.

Defining a schedule

Scheduled jobs use a cronjob expressions to set up their schedule. The fields are Seconds, Minutes, Hours, Day of month, Month, Day of week, and Year. There are also predefined expressions that may be used.

Cron expressions are defined with the above fields in a single string each field value separated by a space.

Allowed values

Seconds Minutes Hour Day of month Month Day of week Year
0 - 59 0 - 59 0 - 23 1 - 31 1 - 12 0 - 6 1970 - 2099

All of the fields can also utilize the following characters within their schedule

  • Asterick (*) - matches all values for this field
  • Slash (/) - describes increments steps within the field
  • Comma (,) - separates values within the field to match
  • Hyphen (-) - defines a range of values within the field to match

Examples

Expression Description
* * * * * * * Run every second
30 5-59/15 * * * * * Run 30 seconds past the minute, starting at minute 5 and every 15 minutes there after
0 0 1,5,9,15,21 * * * * Run at hour 1, 5, 9, 15, and 21 of each day
0 0 0 * * 6 * Run at midnight every Saturday

Predefined expressions

Expression Description Equivalent
@hourly Run at the top of the hour 0 0 * * * * *
@daily Run at midnight every day 0 0 0 * * * *
@weekly Run at midnight on Sundays 0 0 0 * * 0 *
@monthly Run at midnight on the first of every month 0 0 0 1 * * *
Register Scheduled job

Register the job and get its status

package main

import (
  "fmt"

  "github.com/Axway/agent-sdk/pkg/jobs"
)

func main() {
  myJob := MyJob{}
  runHalfPastHour := "0 30 * * * * *"
  jobID, err := jobs.RegisterScheduledJobWithName(myJob, runHalfPastHour, "My Job")
  if err != nil {
    panic(err) // error registering the job
  }
  fmt.Println(GetJobStatus(jobID))
}
Channel jobs

Channel jobs are executed a single time via a go routine. It is expected that the channel job runs forever.

To allow this job to be stopped and started the implementer will provide a channel that the jobs pool can use to tell the job to stop.

Channel job implementation example
package myjob

import "github.com/Axway/agent-sdk/pkg/jobs"

type MyJob struct {
  jobs.Job
  workChannel chan interface{}
  stopChannel chan interface{}
}

func NewMyJob(stopChannel chan interface{}) *MyJob {
  return &MyJob{
    stopChannel: stopChannel,
    workChannel: make(chan interface{}),
  }
}

func (m *MyJob) Ready() bool {
  // validate all dependencies are ready
  return true
}

func (m *MyJob) Status() error {
  // check that the job is still running without error
  return nil
}

func (m *MyJob) Execute() error {
  for {
    select {
    case <-m.stopChannel:
      //stop the job
      return nil
    case msg := <-workChannel:
      //do work on msg
  }
  }
}

Register Channel job

Register the job and get its status

package main

import (
  "fmt"

  "github.com/Axway/agent-sdk/pkg/jobs"
)

func main() {
  stopChannel := make(chan interface{})
  myJob := NewMyJob(stopChannel)
  jobID, err := jobs.RegisterChannelJobWithName(myJob, stopChannel, "My Job")
  if err != nil {
    panic(err) // error registering the job
  }
  fmt.Println(GetJobStatus(jobID))
}

Job locks

All continuous jobs (Interval and Scheduled) create locks that the agent can use to prevent the job from running at the same time as another process or job. The job will lock itself prior to calling its Execute function and unlock itself after Execute has finished

Here is an example of how to create 2 jobs that can not execute at the same time.

package main

import (
  github.com/Axway/agent-sdk/pkg/jobs
)

type FirstJob struct {
  jobs.Job // implements interface
}

func (j *FirstJob) Status() error {
  ...
}

func (j *FirstJob) Ready() bool {
  ...
}

func (j *FirstJob) Execute() error {
  ...
}

type SecondJob struct {
  jobs.Job // implements interface
  firstJobID string
}

func (j *SecondJob) Status() error {
  ...
}

func (j *SecondJob) Ready() bool {
  ...
}

func (j *SecondJob) Execute() error {
  jobs.JobLock(j.firstJobID)
  defer jobs.JobUnlock(j.firstJobID)
  ...
}

func main() {
  myFirstJob := FirstJob{}
  jobID, err := jobs.RegisterIntervalJob(myFirstJob, 30 * time.Second)
  if err != nil {
    panic(err) // error registering the job
  }

  mySecondJob := jobID{
    firstJobID: jobID,
  }
  _, err := jobs.RegisterIntervalJob(mySecondJob, 30 * time.Second)
  if err != nil {
    panic(err) // error registering the job
  }
}

Documentation

Index

Constants

View Source
const (
	JobTypeSingleRun        = "single run"
	JobTypeRetry            = "retry"
	JobTypeInterval         = "interval"
	JobTypeChannel          = "channel"
	JobTypeDetachedChannel  = "detached channel"
	JobTypeDetachedInterval = "detached interval"
	JobTypeScheduled        = "scheduled"
)

Job type strings

Variables

View Source
var (
	ErrRegisteringJob    = errors.Newf(1600, "%v job registration failed")
	ErrExecutingJob      = errors.Newf(1601, "Error in %v job %v execution")
	ErrExecutingRetryJob = errors.Newf(1602, "Error in %v job %v execution, %v more retries")
)

Errors hit when validating Amplify Central connectivity

Functions

func GetJobStatus

func GetJobStatus(id string) string

GetJobStatus - Returns the Status of the Job based on the id in the globalPool

func GetStatus

func GetStatus() string

GetStatus - Returns the status from the globalPool

func JobLock

func JobLock(id string)

JobLock - Locks the job, returns when the lock is granted

func JobUnlock

func JobUnlock(id string)

JobUnlock - Unlocks the job

func RegisterChannelJob

func RegisterChannelJob(newJob Job, stopChan chan interface{}) (string, error)

RegisterChannelJob - Runs a job with a specific interval between each run in the globalPool

func RegisterChannelJobWithName

func RegisterChannelJobWithName(newJob Job, stopChan chan interface{}, name string) (string, error)

RegisterChannelJobWithName - Runs a job with a specific interval between each run in the globalPool

func RegisterDetachedChannelJob

func RegisterDetachedChannelJob(newJob Job, stopChan chan interface{}) (string, error)

RegisterDetachedChannelJob - Runs a job with a stop channel, detached from other jobs in the globalPool

func RegisterDetachedChannelJobWithName

func RegisterDetachedChannelJobWithName(newJob Job, stopChan chan interface{}, name string) (string, error)

RegisterDetachedChannelJobWithName - Runs a named job with a stop channel, detached from other jobs in the globalPool

func RegisterDetachedIntervalJob

func RegisterDetachedIntervalJob(newJob Job, interval time.Duration) (string, error)

RegisterDetachedIntervalJob - Runs a job with a specific interval between each run in the globalPool, detached from other jobs to always run

func RegisterDetachedIntervalJobWithName

func RegisterDetachedIntervalJobWithName(newJob Job, interval time.Duration, name string) (string, error)

RegisterDetachedIntervalJobWithName - Runs a job with a specific interval between each run in the globalPool, detached from other jobs to always run

func RegisterIntervalJob

func RegisterIntervalJob(newJob Job, interval time.Duration, opts ...jobOpt) (string, error)

RegisterIntervalJob - Runs a job with a specific interval between each run in the globalPool

func RegisterIntervalJobWithName

func RegisterIntervalJobWithName(newJob Job, interval time.Duration, name string, opts ...jobOpt) (string, error)

RegisterIntervalJobWithName - Runs a job with a specific interval between each run in the globalPool

func RegisterRetryJob

func RegisterRetryJob(newJob Job, retries int) (string, error)

RegisterRetryJob - Runs a job with a WithName

func RegisterRetryJobWithName

func RegisterRetryJobWithName(newJob Job, retries int, name string) (string, error)

RegisterRetryJobWithName - Runs a job with a limited number of retries in the globalPool

func RegisterScheduledJob

func RegisterScheduledJob(newJob Job, schedule string, opts ...jobOpt) (string, error)

RegisterScheduledJob - Runs a job on a specific schedule in the globalPool

func RegisterScheduledJobWithName

func RegisterScheduledJobWithName(newJob Job, schedule, name string, opts ...jobOpt) (string, error)

RegisterScheduledJobWithName - Runs a job on a specific schedule in the globalPool

func RegisterSingleRunJob

func RegisterSingleRunJob(newJob Job) (string, error)

RegisterSingleRunJob - Runs a single run job in the globalPool

func RegisterSingleRunJobWithName

func RegisterSingleRunJobWithName(newJob Job, name string) (string, error)

RegisterSingleRunJobWithName - Runs a single run job in the globalPool

func UnregisterJob

func UnregisterJob(jobID string)

UnregisterJob - Removes the specified job in the globalPool

func UpdateDurations

func UpdateDurations(retryInterval time.Duration, executionTimeout time.Duration)

UpdateDurations - updates settings int he jobs library

func WithJobTimeout added in v1.1.43

func WithJobTimeout(timeout time.Duration) jobOpt

Types

type Job

type Job interface {
	Execute() error
	Status() error
	Ready() bool
}

Job - the job interface, users of this library need to implement these

type JobExecution

type JobExecution interface {
	GetStatus() JobStatus
	GetID() string
	GetName() string
	Ready() bool
	GetJob() JobExecution
	Lock()
	Unlock()
	// contains filtered or unexported methods
}

JobExecution - the wrapper interface for every type of job

controls the calling the methods defined in the Job interface

func GetJob

func GetJob(id string) JobExecution

GetJob - Returns the Job based on the id from the globalPool

type JobStatus

type JobStatus int

JobStatus - integer to represent the status of the job

const (
	//JobStatusInitializing - Initializing
	JobStatusInitializing JobStatus = iota
	//JobStatusRunning - Running
	JobStatusRunning
	//JobStatusRetrying - Retrying
	JobStatusRetrying
	//JobStatusStopped - Stopped
	JobStatusStopped
	//JobStatusFailed - Failed
	JobStatusFailed
	//JobStatusFinished - Finished
	JobStatusFinished
)

func (JobStatus) String

func (s JobStatus) String() string

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool - represents a pool of jobs that are related in such a way that when one is not running none of them should be

func (*Pool) GetJob

func (p *Pool) GetJob(id string) JobExecution

GetJob - Returns the Job based on the id

func (*Pool) GetJobStatus

func (p *Pool) GetJobStatus(id string) string

GetJobStatus - Returns the Status of the Job based on the id

func (*Pool) GetStatus

func (p *Pool) GetStatus() string

GetStatus - returns the status of the pool of jobs

func (*Pool) JobLock

func (p *Pool) JobLock(id string)

JobLock - Locks the job, returns when the lock is granted

func (*Pool) JobUnlock

func (p *Pool) JobUnlock(id string)

JobUnlock - Unlocks the job

func (*Pool) RegisterChannelJob

func (p *Pool) RegisterChannelJob(newJob Job, stopChan chan interface{}) (string, error)

RegisterChannelJob - Runs a job with a specific interval between each run

func (*Pool) RegisterChannelJobWithName

func (p *Pool) RegisterChannelJobWithName(newJob Job, stopChan chan interface{}, name string) (string, error)

RegisterChannelJobWithName - Runs a job with a specific interval between each run

func (*Pool) RegisterDetachedChannelJob

func (p *Pool) RegisterDetachedChannelJob(newJob Job, stopChan chan interface{}) (string, error)

RegisterDetachedChannelJob - Runs a job with a stop channel, detached from other jobs

func (*Pool) RegisterDetachedChannelJobWithName

func (p *Pool) RegisterDetachedChannelJobWithName(newJob Job, stopChan chan interface{}, name string) (string, error)

RegisterDetachedChannelJobWithName - Runs a named job with a stop channel, detached from other jobs

func (*Pool) RegisterDetachedIntervalJob

func (p *Pool) RegisterDetachedIntervalJob(newJob Job, interval time.Duration, opts ...jobOpt) (string, error)

RegisterDetachedIntervalJob - Runs a job with a specific interval between each run, detached from other jobs

func (*Pool) RegisterDetachedIntervalJobWithName

func (p *Pool) RegisterDetachedIntervalJobWithName(newJob Job, interval time.Duration, name string, opts ...jobOpt) (string, error)

RegisterDetachedIntervalJobWithName - Runs a job with a specific interval between each run, detached from other jobs

func (*Pool) RegisterIntervalJob

func (p *Pool) RegisterIntervalJob(newJob Job, interval time.Duration, opts ...jobOpt) (string, error)

RegisterIntervalJob - Runs a job with a specific interval between each run

func (*Pool) RegisterIntervalJobWithName

func (p *Pool) RegisterIntervalJobWithName(newJob Job, interval time.Duration, name string, opts ...jobOpt) (string, error)

RegisterIntervalJobWithName - Runs a job with a specific interval between each run

func (*Pool) RegisterRetryJob

func (p *Pool) RegisterRetryJob(newJob Job, retries int) (string, error)

RegisterRetryJob - Runs a job with a limited number of retries

func (*Pool) RegisterRetryJobWithName

func (p *Pool) RegisterRetryJobWithName(newJob Job, retries int, name string) (string, error)

RegisterRetryJobWithName - Runs a job with a limited number of retries

func (*Pool) RegisterScheduledJob

func (p *Pool) RegisterScheduledJob(newJob Job, schedule string, opts ...jobOpt) (string, error)

RegisterScheduledJob - Runs a job on a specific schedule

func (*Pool) RegisterScheduledJobWithName

func (p *Pool) RegisterScheduledJobWithName(newJob Job, schedule, name string, opts ...jobOpt) (string, error)

RegisterScheduledJobWithName - Runs a job on a specific schedule

func (*Pool) RegisterSingleRunJob

func (p *Pool) RegisterSingleRunJob(newJob Job) (string, error)

RegisterSingleRunJob - Runs a single run job

func (*Pool) RegisterSingleRunJobWithName

func (p *Pool) RegisterSingleRunJobWithName(newJob Job, name string) (string, error)

RegisterSingleRunJobWithName - Runs a single run job

func (*Pool) SetStatus

func (p *Pool) SetStatus(status PoolStatus)

SetStatus - Sets the status of the pool of jobs

func (*Pool) UnregisterJob

func (p *Pool) UnregisterJob(jobID string)

UnregisterJob - Removes the specified job

type PoolStatus

type PoolStatus int

PoolStatus - integer to represent the status of the jobs in the pool

const (
	//PoolStatusInitializing - Initializing
	PoolStatusInitializing PoolStatus = iota
	//PoolStatusRunning - Running
	PoolStatusRunning
	//PoolStatusStopped - Stopped
	PoolStatusStopped
)

func (PoolStatus) String

func (s PoolStatus) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL