Documentation ¶
Overview ¶
Package engine implements the core logic of the scheduler service.
Index ¶
- Variables
- type Action
- type Config
- type Engine
- type EngineInternal
- type Invocation
- type Job
- type JobState
- type RecordOverrunAction
- type StartInvocationAction
- type StateKind
- type StateMachine
- func (m *StateMachine) OnInvocationDone(invocationID int64)
- func (m *StateMachine) OnInvocationStarted(invocationID int64)
- func (m *StateMachine) OnInvocationStarting(invocationNonce, invocationID int64, retryCount int)
- func (m *StateMachine) OnJobDisabled()
- func (m *StateMachine) OnJobEnabled()
- func (m *StateMachine) OnManualAbort()
- func (m *StateMachine) OnManualInvocation(triggeredBy identity.Identity) error
- func (m *StateMachine) OnScheduleChange()
- func (m *StateMachine) OnTimerTick(tickNonce int64) error
- type TickLaterAction
Constants ¶
This section is empty.
Variables ¶
Functions ¶
This section is empty.
Types ¶
type Action ¶
type Action interface {
IsAction() bool
}
Action is a particular action to perform when switching the state. Can be type cast to some concrete *Action struct.
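As a rough illustration of casting an Action to a concrete type, the sketch below uses a type switch. The struct fields (When, TickNonce, InvocationNonce, Overruns) and the describe helper are assumptions made for this example; the actual field sets are not shown on this page.

```go
package main

import (
	"fmt"
	"time"
)

// Action mirrors the interface documented above.
type Action interface {
	IsAction() bool
}

// The fields of these stand-in structs are assumptions for illustration only.
type TickLaterAction struct {
	When      time.Time
	TickNonce int64
}

type StartInvocationAction struct {
	InvocationNonce int64
}

type RecordOverrunAction struct {
	Overruns int
}

func (TickLaterAction) IsAction() bool       { return true }
func (StartInvocationAction) IsAction() bool { return true }
func (RecordOverrunAction) IsAction() bool   { return true }

// describe (hypothetical helper) dispatches on the concrete type behind an Action.
func describe(a Action) string {
	switch a := a.(type) {
	case TickLaterAction:
		return fmt.Sprintf("tick later, nonce %d", a.TickNonce)
	case StartInvocationAction:
		return fmt.Sprintf("start invocation, nonce %d", a.InvocationNonce)
	case RecordOverrunAction:
		return fmt.Sprintf("record overrun #%d", a.Overruns)
	default:
		return "unknown action"
	}
}

func main() {
	actions := []Action{
		TickLaterAction{When: time.Unix(0, 0), TickNonce: 1},
		StartInvocationAction{InvocationNonce: 2},
		RecordOverrunAction{Overruns: 3},
	}
	for _, a := range actions {
		fmt.Println(describe(a))
	}
}
```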
type Config ¶
type Config struct {
	Catalog              catalog.Catalog // provides task.Manager's to run tasks
	TimersQueuePath      string          // URL of a task queue handler for timer ticks
	TimersQueueName      string          // queue name for timer ticks
	InvocationsQueuePath string          // URL of a task queue handler that starts jobs
	InvocationsQueueName string          // queue name for job starts
	PubSubPushPath       string          // URL to use in PubSub push config
}
Config contains parameters for the engine.
type Engine ¶
type Engine interface {
	// GetVisibleJobs returns a list of all enabled scheduler jobs in no
	// particular order.
	GetVisibleJobs(c context.Context) ([]*Job, error)

	// GetVisibleProjectJobs returns a list of enabled scheduler jobs of some
	// project in no particular order.
	GetVisibleProjectJobs(c context.Context, projectID string) ([]*Job, error)

	// GetVisibleJob returns single scheduler job given its full ID.
	// ErrNoSuchJob error is returned if job doesn't exist OR isn't visible.
	GetVisibleJob(c context.Context, jobID string) (*Job, error)

	// ListVisibleInvocations returns invocations of a visible job, most recent first.
	// Returns fetched invocations and cursor string if there's more.
	// error is ErrNoSuchJob if job doesn't exist or isn't visible.
	ListVisibleInvocations(c context.Context, jobID string, pageSize int, cursor string) ([]*Invocation, string, error)

	// GetVisibleInvocation returns single invocation of some job given its ID.
	// ErrNoSuchInvocation is returned if either job or invocation doesn't exist
	// or job and hence invocation isn't visible.
	GetVisibleInvocation(c context.Context, jobID string, invID int64) (*Invocation, error)

	// GetVisibleInvocationsByNonce returns a list of Invocations with given nonce.
	//
	// Invocation nonce is a random number that identifies an intent to start
	// an invocation. Normally one nonce corresponds to one Invocation entity,
	// but there can be more if job fails to start with a transient error.
	GetVisibleInvocationsByNonce(c context.Context, invNonce int64) ([]*Invocation, error)

	// PauseJob replaces job's schedule with "triggered", effectively preventing
	// it from running automatically (until it is resumed). Manual invocations are
	// still allowed. Does nothing if job is already paused. Any pending or
	// running invocations are still executed.
	PauseJob(c context.Context, jobID string) error

	// ResumeJob resumes paused job. Does nothing if the job is not paused.
	ResumeJob(c context.Context, jobID string) error

	// AbortJob resets the job to scheduled state, aborting a currently pending or
	// running invocation (if any).
	//
	// Returns nil if the job is not currently running.
	AbortJob(c context.Context, jobID string) error

	// AbortInvocation forcefully moves the invocation to failed state.
	//
	// It opportunistically tries to send "abort" signal to a job runner if it
	// supports cancellation, but it doesn't wait for reply. It proceeds to
	// modifying local state in the scheduler service datastore immediately.
	//
	// AbortInvocation can be used to manually "unstuck" jobs that got stuck due
	// to missing PubSub notifications or other kinds of unexpected conditions.
	//
	// Does nothing if invocation is already in some final state.
	AbortInvocation(c context.Context, jobID string, invID int64) error

	// TriggerInvocation launches job invocation right now if job isn't running
	// now. Used by "Run now" UI button.
	//
	// Returns new invocation nonce (a random number that identifies an intent to
	// start an invocation). Normally one nonce corresponds to one Invocation
	// entity, but there can be more if job fails to start with a transient error.
	TriggerInvocation(c context.Context, jobID string) (int64, error)
}
Engine manages all scheduler jobs: keeps track of their state, runs state machine transactions, starts new invocations, etc.
A method returns errors.Transient if the error is non-fatal and the call should be retried later. Any other error means that retry won't help.
ACLs are enforced, unlike in EngineInternal, with the following implications:
- if caller lacks READER access to Jobs, methods behave as if Jobs do not exist.
- if caller lacks OWNER access, calling mutating methods will result in ErrNoOwnerPermission (assuming caller has READER access, else see above).
type EngineInternal ¶
type EngineInternal interface {
	// GetAllProjects returns a list of all projects that have at least one
	// enabled scheduler job.
	GetAllProjects(c context.Context) ([]string, error)

	// UpdateProjectJobs adds new, removes old and updates existing jobs.
	UpdateProjectJobs(c context.Context, projectID string, defs []catalog.Definition) error

	// ResetAllJobsOnDevServer forcefully resets state of all enabled jobs.
	// Supposed to be used only on devserver, where task queue stub state is not
	// preserved between appserver restarts and it messes everything up.
	ResetAllJobsOnDevServer(c context.Context) error

	// ExecuteSerializedAction is called via a task queue to execute an action
	// produced by job state machine transition. These actions are POSTed
	// to TimersQueue and InvocationsQueue defined in Config by Engine.
	// 'retryCount' is 0 on first attempt, 1 if task queue service retries
	// request once, 2 - if twice, and so on. Returning transient errors here
	// causes the task queue to retry the task.
	ExecuteSerializedAction(c context.Context, body []byte, retryCount int) error

	// ProcessPubSubPush is called whenever incoming PubSub message is received.
	ProcessPubSubPush(c context.Context, body []byte) error

	// PullPubSubOnDevServer is called on dev server to pull messages from PubSub
	// subscription associated with given publisher.
	//
	// It is needed to be able to manually test PubSub related workflows on dev
	// server, since dev server can't accept PubSub push messages.
	PullPubSubOnDevServer(c context.Context, taskManagerName, publisher string) error

	// PublicAPI returns ACL-enforced API.
	PublicAPI() Engine
}
EngineInternal is to be used by frontend initialization code only.
func NewEngine ¶
func NewEngine(conf Config) EngineInternal
NewEngine returns default implementation of EngineInternal.
type Invocation ¶
type Invocation struct {
	// ID is identifier of this particular attempt to run a job. Multiple attempts
	// to start an invocation result in multiple entities with different IDs, but
	// with same InvocationNonce.
	ID int64 `gae:"$id"`

	// JobKey is the key of parent Job entity.
	JobKey *ds.Key `gae:"$parent"`

	// Started is time when this invocation was created.
	Started time.Time `gae:",noindex"`

	// Finished is time when this invocation transitioned to a terminal state.
	Finished time.Time `gae:",noindex"`

	// InvocationNonce identifies a request to start a job, produced by
	// StateMachine.
	InvocationNonce int64

	// TriggeredBy is identity of whoever triggered the invocation, if it was
	// triggered via TriggerInvocation ("Run now" button).
	//
	// Empty identity string if it was triggered by the service itself.
	TriggeredBy identity.Identity

	// Revision is revision number of config.cfg when this invocation was created.
	// For informational purpose.
	Revision string `gae:",noindex"`

	// RevisionURL is URL to human readable page with config file at
	// an appropriate revision. For informational purpose.
	RevisionURL string `gae:",noindex"`

	// Task is the job payload for this invocation in binary serialized form.
	// For informational purpose. See Catalog.UnmarshalTask().
	Task []byte `gae:",noindex"`

	// DebugLog is short free form text log with debug messages.
	DebugLog string `gae:",noindex"`

	// RetryCount is 0 on a first attempt to launch the task. Increased with each
	// retry. For informational purposes.
	RetryCount int64 `gae:",noindex"`

	// Status is current status of the invocation (e.g. "RUNNING"), see the enum.
	Status task.Status

	// ViewURL is optional URL to a human readable page with task status, e.g.
	// Swarming task page. Populated by corresponding TaskManager.
	ViewURL string `gae:",noindex"`

	// TaskData is a storage where TaskManager can keep task-specific state
	// between calls.
	TaskData []byte `gae:",noindex"`

	// MutationsCount is used for simple compare-and-swap transaction control.
	// It is incremented on each change to the entity. See 'saveImpl' below.
	MutationsCount int64 `gae:",noindex"`
	// contains filtered or unexported fields
}
Invocation entity stores single attempt to run a job. Its parent entity is corresponding Job, its ID is generated based on time.
type Job ¶
type Job struct {
	// JobID is '<ProjectID>/<JobName>' string. JobName is unique within a project,
	// but not globally. JobID is unique globally.
	JobID string `gae:"$id"`

	// ProjectID exists for indexing. It matches <projectID> portion of JobID.
	ProjectID string

	// Flavor describes what category of jobs this is, see the enum.
	Flavor catalog.JobFlavor `gae:",noindex"`

	// Enabled is false if the job was disabled or removed from config.
	//
	// Disabled jobs do not show up in UI at all (they are still kept in the
	// datastore though, for audit purposes).
	Enabled bool

	// Paused is true if job's schedule is ignored and job can only be started
	// manually via "Run now" button.
	Paused bool `gae:",noindex"`

	// Revision is last seen job definition revision.
	Revision string `gae:",noindex"`

	// RevisionURL is URL to human readable page with config file at
	// an appropriate revision.
	RevisionURL string `gae:",noindex"`

	// Schedule is the job's schedule in regular cron expression format.
	Schedule string `gae:",noindex"`

	// Task is the job's payload in serialized form. Opaque from the point of view
	// of the engine. See Catalog.UnmarshalTask().
	Task []byte `gae:",noindex"`

	// ACLs are the latest ACLs applied to Job and all its invocations.
	Acls acl.GrantsByRole `gae:",noindex"`

	// State is the job's state machine state, see StateMachine.
	State JobState
	// contains filtered or unexported fields
}
Job stores the last known definition of a scheduler job, as well as its current state. Root entity, its kind is "Job".
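The '<ProjectID>/<JobName>' layout of JobID can be taken apart as in the sketch below. splitJobID is a hypothetical helper, not part of the package, and splitting at the first slash is an assumption, since the documentation does not say whether a job name may itself contain slashes.

```go
package main

import (
	"fmt"
	"strings"
)

// splitJobID (hypothetical helper) splits "<ProjectID>/<JobName>" at the first
// slash. Everything after the first slash is treated as the job name, which is
// an assumption of this sketch.
func splitJobID(jobID string) (projectID, jobName string, ok bool) {
	projectID, jobName, ok = strings.Cut(jobID, "/")
	if !ok || projectID == "" || jobName == "" {
		return "", "", false
	}
	return projectID, jobName, true
}

func main() {
	project, name, ok := splitJobID("infra/nightly-build")
	fmt.Println(project, name, ok)
}
```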
type JobState ¶
type JobState struct {
	// State is the overall state of the job (like "RUNNING" or "DISABLED", etc).
	State StateKind

	// Overruns is how many times current invocation overran.
	Overruns int

	// TickNonce is id of the next expected OnTimerTick event.
	TickNonce int64 `gae:",noindex"`

	// TickTime is when the OnTimerTick event is expected.
	TickTime time.Time `gae:",noindex"`

	// PrevTime is when last invocation has finished (successfully or not).
	PrevTime time.Time `gae:",noindex"`

	// InvocationNonce is id of the currently queued or running invocation
	// request. Once it is processed, it produces some new invocation identified
	// by InvocationID. Single InvocationNonce can spawn many InvocationIDs due to
	// retries on transient errors when starting an invocation. Only one of them
	// will end up in Running state though.
	InvocationNonce int64 `gae:",noindex"`

	// InvocationRetryCount is how many times an invocation request (identified by
	// InvocationNonce) was attempted to start and failed.
	InvocationRetryCount int `gae:",noindex"`

	// InvocationTime is when the current invocation request was queued.
	InvocationTime time.Time `gae:",noindex"`

	// InvocationID is ID of currently running invocation or 0 if none is running.
	InvocationID int64 `gae:",noindex"`
}
JobState contains the current state of a job state machine.
func (*JobState) IsExpectingInvocation ¶
IsExpectingInvocation returns true if the state machine accepts OnInvocationStarting event with given nonce.
func (*JobState) IsRetrying ¶
IsRetrying is true if there is some invocation queued that is a retry of a previously attempted invocation.
An invocation is retried when it fails to transition out from STARTING state (e.g. crashes before switching to RUNNING or any of the final states).
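The two predicates can be read as simple checks over JobState fields. The sketch below is a guess at their logic based only on the descriptions above, written with lowercase method names to make clear it is not the package's implementation; the trimmed JobState stand-in keeps only the fields the checks read.

```go
package main

import "fmt"

type StateKind string

const (
	JobStateScheduled StateKind = "SCHEDULED"
	JobStateQueued    StateKind = "QUEUED"
	JobStateSlowQueue StateKind = "SLOW_QUEUE"
	JobStateRunning   StateKind = "RUNNING"
)

// JobState is a stand-in that keeps only the fields these predicates read.
type JobState struct {
	State                StateKind
	InvocationNonce      int64
	InvocationRetryCount int
}

// isExpectingInvocation is an assumed reading of the documented predicate: an
// invocation request with this nonce is queued and has not started running.
func (s *JobState) isExpectingInvocation(nonce int64) bool {
	queued := s.State == JobStateQueued || s.State == JobStateSlowQueue
	return queued && s.InvocationNonce == nonce
}

// isRetrying is an assumed reading: a queued request that has already failed
// to start at least once.
func (s *JobState) isRetrying() bool {
	return s.isExpectingInvocation(s.InvocationNonce) && s.InvocationRetryCount > 0
}

func main() {
	s := &JobState{State: JobStateQueued, InvocationNonce: 42}
	fmt.Println(s.isExpectingInvocation(42), s.isRetrying()) // first launch attempt
	s.InvocationRetryCount = 1
	fmt.Println(s.isRetrying()) // a previous launch attempt has failed
}
```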
type RecordOverrunAction ¶
RecordOverrunAction instructs Engine to record overrun event.
An overrun happens when job's schedule indicates that a new job invocation should start now, but previous one is still running.
func (RecordOverrunAction) IsAction ¶
func (a RecordOverrunAction) IsAction() bool
IsAction makes RecordOverrunAction implement Action interface.
type StartInvocationAction ¶
StartInvocationAction enqueues invocation of the actual job. OnInvocationDone(invocationID) will be called sometime later when the job is done.
func (StartInvocationAction) IsAction ¶
func (a StartInvocationAction) IsAction() bool
IsAction makes StartInvocationAction implement Action interface.
type StateKind ¶
type StateKind string
StateKind defines high-level state of the job. See JobState for full state description: it's a StateKind plus additional parameters.
Note that this is low-level state that is not directly visible in the UI. For UI we derive more user-friendly state labels by examining JobState and taking into account job traits. See makeJob in ui/presentation.go.
const (
	// JobStateDisabled means the job is disabled or has never been seen before.
	// This is the initial state.
	JobStateDisabled StateKind = "DISABLED"

	// JobStateScheduled means the job is scheduled to start sometime in
	// the future and previous invocation is NOT running currently.
	JobStateScheduled StateKind = "SCHEDULED"

	// JobStateSuspended means the job is not running now, and no ticks are
	// scheduled. It's used for jobs on "triggered" schedule or for paused jobs.
	//
	// Technically SUSPENDED is like SCHEDULED with the tick in the distant
	// future.
	JobStateSuspended StateKind = "SUSPENDED"

	// JobStateQueued means the job's invocation has been added to the task queue
	// and the job should start (switch to "RUNNING" state inside TaskManager's
	// LaunchTask call) really soon now.
	//
	// The engine will keep trying to launch the queued invocation as long as
	// the job is in "QUEUED" state (e.g. it makes another launch attempt if
	// LaunchTask crashes). But once the job is in "RUNNING" state, it's the
	// responsibility of the particular TaskManager implementation to ensure the
	// invocation state gets updated in a timely manner.
	//
	// Some TaskManagers may opt to skip "RUNNING" state completely and do
	// everything in one go in LaunchTask (by switching invocation to some final
	// state in the end). This is allowed. If such task crashes before completion,
	// it will be retried by the engine (because it is in "QUEUED" state).
	JobStateQueued StateKind = "QUEUED"

	// JobStateRunning means the job's invocation is running currently and the job
	// is scheduled to start again sometime in the future.
	JobStateRunning StateKind = "RUNNING"

	// JobStateOverrun is same as "RUNNING", except the engine has also detected
	// an overrun: the job's new invocation should have been started by now, but
	// the previous one is still running.
	JobStateOverrun StateKind = "OVERRUN"

	// JobStateSlowQueue is same as "QUEUED", except the engine has also detected
	// an overrun: the job's new invocation should have been started by now, but
	// the previous one is still sitting in the queue.
	JobStateSlowQueue StateKind = "SLOW_QUEUE"
)
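The relationship between RUNNING/OVERRUN and QUEUED/SLOW_QUEUE can be sketched as a small transition function. onScheduleDue is a hypothetical helper written for this example and covers only what the constant descriptions state; the engine's actual handling may involve more bookkeeping.

```go
package main

import "fmt"

type StateKind string

const (
	JobStateScheduled StateKind = "SCHEDULED"
	JobStateQueued    StateKind = "QUEUED"
	JobStateRunning   StateKind = "RUNNING"
	JobStateOverrun   StateKind = "OVERRUN"
	JobStateSlowQueue StateKind = "SLOW_QUEUE"
)

// onScheduleDue (hypothetical) takes the state at the moment the schedule says
// "start now" and returns the next state plus whether an overrun is recorded.
func onScheduleDue(s StateKind) (next StateKind, overrun bool) {
	switch s {
	case JobStateScheduled:
		return JobStateQueued, false // nothing in flight: enqueue a new invocation
	case JobStateRunning, JobStateOverrun:
		return JobStateOverrun, true // previous invocation is still running
	case JobStateQueued, JobStateSlowQueue:
		return JobStateSlowQueue, true // previous invocation is still in the queue
	default:
		return s, false // other states are left untouched in this sketch
	}
}

func main() {
	for _, s := range []StateKind{JobStateScheduled, JobStateRunning, JobStateQueued} {
		next, overrun := onScheduleDue(s)
		fmt.Printf("%s -> %s (overrun: %v)\n", s, next, overrun)
	}
}
```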
type StateMachine ¶
type StateMachine struct {
	// Inputs.
	Now      time.Time          // current time
	Schedule *schedule.Schedule // knows when to run the job next time
	Nonce    func() int64       // produces a series of nonces on demand

	// Mutated.
	State   JobState // state of the job, mutated in On* methods
	Actions []Action // emitted actions

	// For adhoc logging when debugging locally.
	Context context.Context
}
StateMachine advances state of some single scheduler job. It performs a single step only (one On* call). As input it takes the state of the job and state of the world (the schedule is considered to be a part of the world state). As output it produces a new state and a set of actions. Handler of the state machine must guarantee that if state change was committed, all actions will be executed at least once.
On* methods mutate State or Actions and return errors only on transient conditions fixable by a retry. State and Actions are mutated in place just to simplify APIs. Otherwise all On* transitions can be considered as pure functions (State, World) -> (State, Actions).
The lifecycle of a healthy job: DISABLED -> SCHEDULED -> QUEUED -> QUEUED (starting) -> RUNNING -> SCHEDULED
func (*StateMachine) OnInvocationDone ¶
func (m *StateMachine) OnInvocationDone(invocationID int64)
OnInvocationDone happens when invocation completes.
func (*StateMachine) OnInvocationStarted ¶
func (m *StateMachine) OnInvocationStarted(invocationID int64)
OnInvocationStarted happens when enqueued invocation finally starts to run.
func (*StateMachine) OnInvocationStarting ¶
func (m *StateMachine) OnInvocationStarting(invocationNonce, invocationID int64, retryCount int)
OnInvocationStarting happens when the engine picks up enqueued invocation and attempts to start it. Engine calls OnInvocationStarted when it succeeds at launching the invocation. Engine calls OnInvocationStarting again with another invocationID if previous launch attempt failed.
func (*StateMachine) OnJobDisabled ¶
func (m *StateMachine) OnJobDisabled()
OnJobDisabled happens when job was disabled or removed. It clears the state and cancels any pending invocations (running ones continue to run though).
func (*StateMachine) OnJobEnabled ¶
func (m *StateMachine) OnJobEnabled()
OnJobEnabled happens when a new job (never seen before) was discovered or a previously disabled job was enabled.
func (*StateMachine) OnManualAbort ¶
func (m *StateMachine) OnManualAbort()
OnManualAbort happens when a user aborts the queued or running invocation.
func (*StateMachine) OnManualInvocation ¶
func (m *StateMachine) OnManualInvocation(triggeredBy identity.Identity) error
OnManualInvocation happens when a user starts an invocation via the "Run now" button. Manual invocation only works if the job is currently neither running nor queued for a run (i.e. it is in Scheduled state waiting for a timer tick).
func (*StateMachine) OnScheduleChange ¶
func (m *StateMachine) OnScheduleChange()
OnScheduleChange happens when job's schedule changes (and the job potentially needs to be rescheduled).
func (*StateMachine) OnTimerTick ¶
func (m *StateMachine) OnTimerTick(tickNonce int64) error
OnTimerTick happens when scheduled timer (added with TickLaterAction) ticks.
type TickLaterAction ¶
TickLaterAction schedules an OnTimerTick(tickNonce) call at a given moment in time (or close to it). TickNonce is used to skip canceled or repeated ticks.
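One way a nonce can filter out canceled or repeated ticks is sketched below. tickState and its methods are hypothetical, and the sketch assumes that 0 is never issued as a real nonce, so it can mark "no tick expected".

```go
package main

import "fmt"

// tickState holds the nonce of the tick the job is waiting for, in the spirit
// of JobState.TickNonce. The type itself is hypothetical.
type tickState struct {
	TickNonce int64
}

// scheduleTick records a fresh nonce. Any tick scheduled earlier carries an
// older nonce and is thereby canceled.
func (s *tickState) scheduleTick(nonce int64) {
	s.TickNonce = nonce
}

// onTimerTick reports whether the tick should be handled.
func (s *tickState) onTimerTick(nonce int64) bool {
	if nonce != s.TickNonce {
		return false // canceled or repeated tick: skip it
	}
	s.TickNonce = 0 // consumed, so a repeated delivery is skipped (assumes 0 is never a real nonce)
	return true
}

func main() {
	s := &tickState{}
	s.scheduleTick(1)
	s.scheduleTick(2) // rescheduled, tick 1 is now stale
	fmt.Println(s.onTimerTick(1), s.onTimerTick(2), s.onTimerTick(2))
}
```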
func (TickLaterAction) IsAction ¶
func (a TickLaterAction) IsAction() bool
IsAction makes TickLaterAction implement Action interface.
Directories ¶
Path | Synopsis |
---|---|
demo | Package demo shows how cron.Machines can be hosted with Datastore and TQ. |
internal | Package internal contains internal structs used by the engine. |