engine

package
v0.0.0-...-141b21d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 19, 2024 License: Apache-2.0 Imports: 50 Imported by: 12

Documentation

Overview

Package engine implements the core logic of the scheduler service.

Index

Constants

View Source
const FinishedInvocationsHorizon = 10 * time.Minute

FinishedInvocationsHorizon defines how many invocations to keep in the Job's FinishedInvocations list.

All entries there that are older than FinishedInvocationsHorizon will be evicted next time the list is updated.

Variables

View Source
var (
	// ErrNoPermission indicates the caller doesn't not have permission to perform
	// desired action.
	ErrNoPermission = errors.New("insufficient rights on a job")
	// ErrNoSuchJob indicates the job doesn't exist or not visible.
	ErrNoSuchJob = errors.New("no such job")
	// ErrNoSuchInvocation indicates the invocation doesn't exist or not visible.
	ErrNoSuchInvocation = errors.New("the invocation doesn't exist")
)
View Source
var (
	PermJobsGet     = realms.RegisterPermission("scheduler.jobs.get")
	PermJobsPause   = realms.RegisterPermission("scheduler.jobs.pause")
	PermJobsResume  = realms.RegisterPermission("scheduler.jobs.resume")
	PermJobsAbort   = realms.RegisterPermission("scheduler.jobs.abort")
	PermJobsTrigger = realms.RegisterPermission("scheduler.jobs.trigger")
)

Functions

func CheckPermission

func CheckPermission(ctx context.Context, job *Job, perm realms.Permission) error

CheckPermission returns nil if the caller has the given permission for the job or ErrNoPermission otherwise.

May also return transient errors.

Types

type Config

type Config struct {
	Catalog        catalog.Catalog // provides task.Manager's to run tasks
	Dispatcher     *tq.Dispatcher  // dispatcher for task queue tasks
	PubSubPushPath string          // URL to use in PubSub push config
}

Config contains parameters for the engine.

type DebugJobState

type DebugJobState struct {
	Job                 *Job
	FinishedInvocations []*internal.FinishedInvocation // unmarshalled Job.FinishedInvocationsRaw
	RecentlyFinishedSet []int64                        // in-flight notifications from recentlyFinishedSet()
	PendingTriggersSet  []*internal.Trigger            // triggers from pendingTriggersSet()
	ManagerState        *internal.DebugManagerState    // whatever task.Manager wants to report
}

DebugJobState contains detailed information about a job.

The state is not a transactional snapshot. Shouldn't be used for anything other than just displaying it to humans.

type Engine

type Engine interface {
	// GetVisibleJobs returns all enabled visible jobs.
	//
	// Returns them in no particular order.
	GetVisibleJobs(c context.Context) ([]*Job, error)

	// GetVisibleProjectJobs returns enabled visible jobs belonging to a project.
	//
	// Returns them in no particular order.
	GetVisibleProjectJobs(c context.Context, projectID string) ([]*Job, error)

	// GetVisibleJob returns a single visible job given its full ID.
	//
	// ErrNoSuchJob error is returned if either:
	//   * job doesn't exist,
	//   * job is disabled (i.e. was removed from its project config),
	//   * job isn't visible due to lack of "scheduler.jobs.get" permission.
	GetVisibleJob(c context.Context, jobID string) (*Job, error)

	// GetVisibleJobBatch is like GetVisibleJob, except it operates on a batch of
	// jobs at once.
	//
	// Returns a mapping (jobID => *Job) with only visible jobs. If the check
	// fails returns a transient error.
	GetVisibleJobBatch(c context.Context, jobIDs []string) (map[string]*Job, error)

	// ListInvocations returns invocations of a given job, sorted by their
	// creation time (most recent first).
	//
	// Can optionally return only active invocations (i.e. ones that are pending,
	// starting or running) or only finished ones. See ListInvocationsOpts.
	//
	// Returns invocations and a cursor string if there's more. Returns only
	// transient errors.
	ListInvocations(c context.Context, job *Job, opts ListInvocationsOpts) ([]*Invocation, string, error)

	// GetInvocation returns an invocation of a given job.
	//
	// ErrNoSuchInvocation is returned if the invocation doesn't exist.
	GetInvocation(c context.Context, job *Job, invID int64) (*Invocation, error)

	// PauseJob prevents new automatic invocations of a job.
	//
	// It clears the pending triggers queue, and makes the job ignore all incoming
	// triggers until it is resumed.
	//
	// For cron jobs it also replaces job's schedule with "triggered", effectively
	// preventing them from running automatically (until unpaused).
	//
	// Does nothing if the job is already paused. Any pending or running
	// invocations are still executed.
	PauseJob(c context.Context, job *Job, reason string) error

	// ResumeJob resumes paused job. Does nothing if the job is not paused.
	ResumeJob(c context.Context, job *Job, reason string) error

	// AbortJob aborts all currently pending or running invocations (if any).
	AbortJob(c context.Context, job *Job) error

	// AbortInvocation forcefully moves the invocation to a failed state.
	//
	// It opportunistically tries to send "abort" signal to a job runner if it
	// supports cancellation, but it doesn't wait for reply (proceeds to
	// modifying the local state in the scheduler service datastore immediately).
	//
	// AbortInvocation can be used to manually "unstuck" jobs that got stuck due
	// to missing PubSub notifications or other kinds of unexpected conditions.
	//
	// Does nothing if the invocation is already in some final state.
	AbortInvocation(c context.Context, job *Job, invID int64) error

	// EmitTriggers puts one or more triggers into pending trigger queues of the
	// specified jobs.
	//
	// If the caller has no permission to trigger at least one job, the entire
	// call is aborted. Otherwise, the call is NOT transactional, but can be
	// safely retried (triggers are deduplicated based on their IDs).
	EmitTriggers(c context.Context, perJob map[*Job][]*internal.Trigger) error

	// ListTriggers returns list of job's pending triggers sorted by time, most
	// recent last.
	ListTriggers(c context.Context, job *Job) ([]*internal.Trigger, error)

	// GetJobTriageLog returns a log from the latest job triage procedure.
	//
	// Returns nil if it is not available (for example, the job was just created).
	GetJobTriageLog(c context.Context, job *Job) (*JobTriageLog, error)
}

Engine manages all scheduler jobs: keeps track of their state, runs state machine transactions, starts new invocations, etc.

A method returns errors.Transient if the error is non-fatal and the call should be retried later. Any other error means that retry won't help.

The general pattern for doing something to a job is to get a reference to it via GetVisibleJob() (this call checks "scheduler.jobs.get" permission), and then pass *Job to desired methods (which may additionally check for more permissions).

ACLs are enforced with the following implication:

  • if a caller lacks "scheduler.jobs.get" permission for a job, methods behave as if the job doesn't exist.
  • if a caller has "scheduler.jobs.get" permission but lacks some other permission required to execute an action (e.g. "scheduler.jobs.abort"), ErrNoPermission will be returned.

Use EngineInternal if you need to skip ACL checks.

type EngineInternal

type EngineInternal interface {
	// PublicAPI returns ACL-enforcing API.
	PublicAPI() Engine

	// GetAllProjects returns projects that have at least one enabled job.
	GetAllProjects(c context.Context) ([]string, error)

	// UpdateProjectJobs adds new, removes old and updates existing jobs.
	UpdateProjectJobs(c context.Context, projectID string, defs []catalog.Definition) error

	// ResetAllJobsOnDevServer forcefully resets state of all enabled jobs.
	//
	// Supposed to be used only on devserver, where task queue stub state is not
	// preserved between appserver restarts and it messes everything.
	ResetAllJobsOnDevServer(c context.Context) error

	// ProcessPubSubPush is called whenever incoming PubSub message is received.
	//
	// May return an error tagged with tq.Retry or transient.Tag. They indicate
	// the message should be redelivered later.
	ProcessPubSubPush(c context.Context, body []byte, urlValues url.Values) error

	// PullPubSubOnDevServer is called on dev server to pull messages from PubSub
	// subscription associated with given publisher.
	//
	// It is needed to be able to manually tests PubSub related workflows on dev
	// server, since dev server can't accept PubSub push messages.
	PullPubSubOnDevServer(c context.Context, taskManagerName, publisher string) error

	// GetDebugJobState is used by Admin RPC interface for debugging jobs.
	//
	// It fetches Job entity, pending triggers and pending completion
	// notifications.
	GetDebugJobState(c context.Context, jobID string) (*DebugJobState, error)
}

EngineInternal is a variant of engine API that skips ACL checks.

func NewEngine

func NewEngine(cfg Config) EngineInternal

NewEngine returns default implementation of EngineInternal.

type Invocation

type Invocation struct {

	// ID is identifier of this particular attempt to run a job.
	ID int64 `gae:"$id"`

	// JobID is '<ProjectID>/<JobName>' string of a parent job.
	//
	// Set when the invocation is created and never changes.
	JobID string `gae:",noindex"`

	// IndexedJobID is '<ProjectID>/<JobName>' string of a parent job, but it is
	// set only for finished invocations.
	//
	// It is used to make the invocations appear in the listings of finished
	// invocations.
	//
	// We can't use JobID field for this since the invocation launch procedure can
	// potentially generate orphaned "garbage" invocations in some edge cases (if
	// Invocation transaction lands, but separate Job transaction doesn't). They
	// are harmless, but we don't want them to show up in listings.
	IndexedJobID string

	// RealmID is a global realm name (i.e. "<ProjectID>:...") the invocation
	// belongs to.
	//
	// It is copied from the Job entity when the invocation is created. May be
	// empty for old invocations.
	RealmID string `gae:",noindex"`

	// Started is time when this invocation was created.
	Started time.Time `gae:",noindex"`

	// Finished is time when this invocation transitioned to a terminal state.
	Finished time.Time `gae:",noindex"`

	// TriggeredBy is identity of whoever triggered the invocation, if it was
	// triggered via a single trigger submitted by some external user (not by the
	// service itself).
	//
	// Empty identity string if it was triggered by the service itself.
	TriggeredBy identity.Identity

	// PropertiesRaw is a blob with serialized task.Request.Properties supplied
	// when the invocation was created.
	//
	// Task managers use it to prepare the parameters for tasks.
	PropertiesRaw []byte `gae:",noindex"`

	// Tags is a sorted list of indexed "key:value" pairs supplied via
	// task.Request.Tags when the invocation was created.
	//
	// May be passed down the stack by task managers.
	Tags []string

	// IncomingTriggersRaw is a serialized list of triggers that the invocation
	// consumed.
	//
	// They are popped from job's pending triggers set when the invocation
	// starts.
	//
	// Use IncomingTriggers() function to grab them in deserialized form.
	IncomingTriggersRaw []byte `gae:",noindex"`

	// OutgoingTriggersRaw is a serialized list of triggers that the invocation
	// produced.
	//
	// They are fanned out into pending trigger sets of corresponding triggered
	// jobs (specified by TriggeredJobIDs).
	//
	// Use OutgoingTriggers() function to grab them in deserialized form.
	OutgoingTriggersRaw []byte `gae:",noindex"`

	// PendingTimersRaw is a serialized list of pending invocation timers.
	//
	// Timers are emitted by Controller's AddTimer call.
	//
	// Use PendingTimers() function to grab them in deserialized form.
	PendingTimersRaw []byte `gae:",noindex"`

	// Revision is revision number of config.cfg when this invocation was created.
	// For informational purpose.
	Revision string `gae:",noindex"`

	// RevisionURL is URL to human readable page with config file at
	// an appropriate revision. For informational purpose.
	RevisionURL string `gae:",noindex"`

	// Task is the job payload for this invocation in binary serialized form.
	// For informational purpose. See Catalog.UnmarshalTask().
	Task []byte `gae:",noindex"`

	// TriggeredJobIDs is a list of jobIDs of jobs which this job triggers.
	// The list is sorted and without duplicates.
	TriggeredJobIDs []string `gae:",noindex"`

	// DebugLog is short free form text log with debug messages.
	DebugLog string `gae:",noindex"`

	// RetryCount is 0 on a first attempt to launch the task. Increased with each
	// retry. For informational purposes.
	RetryCount int64 `gae:",noindex"`

	// Status is current status of the invocation (e.g. "RUNNING"), see the enum.
	Status task.Status

	// ViewURL is optional URL to a human readable page with task status, e.g.
	// Swarming task page. Populated by corresponding TaskManager.
	ViewURL string `gae:",noindex"`

	// TaskData is a storage where TaskManager can keep task-specific state
	// between calls.
	TaskData []byte `gae:",noindex"`

	// MutationsCount is used for simple compare-and-swap transaction control.
	//
	// It is incremented on each change to the entity.
	MutationsCount int64 `gae:",noindex"`
	// contains filtered or unexported fields
}

Invocation entity stores single invocation of a job (with perhaps multiple attempts due retries if the invocation fails to start).

Root entity. ID is generated based on time by generateInvocationID() function.

func (*Invocation) GetProjectID

func (e *Invocation) GetProjectID() string

GetProjectID parses the ProjectID from the JobID and returns it.

func (*Invocation) IncomingTriggers

func (e *Invocation) IncomingTriggers() ([]*internal.Trigger, error)

IncomingTriggers is a list of triggers that the invocation consumed.

It is deserialized on the fly from IncomingTriggersRaw.

func (*Invocation) OutgoingTriggers

func (e *Invocation) OutgoingTriggers() ([]*internal.Trigger, error)

OutgoingTriggers is a list of triggers that the invocation produced.

It is deserialized on the fly from OutgoingTriggersRaw.

func (*Invocation) PendingTimers

func (e *Invocation) PendingTimers() ([]*internal.Timer, error)

PendingTimers is a list of not-yet-consumed invocation timers.

It is deserialized on the fly from PendingTimersRaw.

type Job

type Job struct {

	// JobID is '<ProjectID>/<JobName>' string. JobName is unique with a project,
	// but not globally. JobID is unique globally.
	JobID string `gae:"$id"`

	// ProjectID exists for indexing. It matches <projectID> portion of JobID.
	ProjectID string

	// RealmID is a global realm name (i.e. "<ProjectID>:...") the job belongs to.
	RealmID string

	// Flavor describes what category of jobs this is, see the enum.
	Flavor catalog.JobFlavor `gae:",noindex"`

	// Enabled is false if the job was disabled or removed from config.
	//
	// Disabled jobs do not show up in UI at all (they are still kept in the
	// datastore though, for audit purposes).
	Enabled bool

	// Paused is true if no new invocations of the job should be started.
	//
	// Paused jobs ignore the cron scheduler and incoming triggers. Triggers are
	// completely skipped (not even enqueued). Pausing a job clears the pending
	// triggers set.
	Paused bool `gae:",noindex"`

	// PausedOrResumedWhen is when the job was paused or resumed.
	PausedOrResumedWhen time.Time `gae:",noindex"`

	// PausedOrResumedBy is who paused or resumed the job the last.
	PausedOrResumedBy identity.Identity `gae:",noindex"`

	// PausedOrResumedReason is the reason the job was paused or resumed.
	PausedOrResumedReason string `gae:",noindex"`

	// Revision is last seen job definition revision.
	Revision string `gae:",noindex"`

	// RevisionURL is URL to human readable page with config file at
	// an appropriate revision.
	RevisionURL string `gae:",noindex"`

	// Schedule is the job's schedule in regular cron expression format.
	Schedule string `gae:",noindex"`

	// Task is the job's payload in serialized form. Opaque from the point of view
	// of the engine. See Catalog.UnmarshalTask().
	Task []byte `gae:",noindex"`

	// TriggeredJobIDs is a list of jobIDs of jobs which this job triggers.
	// The list is sorted and without duplicates.
	TriggeredJobIDs []string `gae:",noindex"`

	// Cron holds the state of the cron state machine.
	Cron cron.State `gae:",noindex"`

	// TriggeringPolicyRaw is job's TriggeringPolicy proto in serialized form.
	//
	// It is taken from the job definition stored in the catalog. Used during
	// the triage.
	TriggeringPolicyRaw []byte `gae:",noindex"`

	// ActiveInvocations is ordered set of active invocation IDs.
	//
	// It contains IDs of pending, running or recently finished invocations,
	// the most recent at the end.
	ActiveInvocations []int64 `gae:",noindex"`

	// FinishedInvocationsRaw is a list of recently finished invocations, along
	// with the time they finished.
	//
	// It is serialized internal.FinishedInvocationList proto, see db.proto. We
	// store it this way to simplify adding more fields if necessary and to avoid
	// paying the cost of the deserialization if the caller is not interested.
	//
	// This list is used to achieve a perfectly consistent listing of all recent
	// invocations of a job.
	//
	// Entries older than FinishedInvocationsHorizon are evicted from this list
	// during triages. We assume that FinishedInvocationsHorizon is enough for
	// datastore indexes to catch up, so all recent invocations older than the
	// horizon can be fetched using a regular datastore query.
	FinishedInvocationsRaw []byte `gae:",noindex"`

	// LastTriage is a time when the last triage transaction was committed.
	LastTriage time.Time `gae:",noindex"`
	// contains filtered or unexported fields
}

Job stores the last known definition of a scheduler job, as well as its current state. Root entity, its kind is "Job".

func (*Job) CronTickTime

func (e *Job) CronTickTime() time.Time

CronTickTime returns time when the cron job is expected to start again.

May return:

Zero time if the job is using relative schedule, or not a cron job at all.
schedule.DistantFuture if the job is paused.

func (*Job) EffectiveSchedule

func (e *Job) EffectiveSchedule() string

EffectiveSchedule returns schedule string to use for the job, considering its Paused field.

Paused jobs always use "triggered" schedule.

func (*Job) IsEqual

func (e *Job) IsEqual(other *Job) bool

IsEqual returns true iff 'e' is equal to 'other'.

func (*Job) JobName

func (e *Job) JobName() string

JobName returns name of this Job as defined its project's config.

This is "<name>" part extracted from "<project>/<name>" job ID.

func (*Job) MatchesDefinition

func (e *Job) MatchesDefinition(def catalog.Definition) bool

MatchesDefinition returns true if job definition in the entity matches the one specified by catalog.Definition struct.

func (*Job) ParseSchedule

func (e *Job) ParseSchedule() (*schedule.Schedule, error)

ParseSchedule returns *Schedule object, parsing e.Schedule field.

If job is paused e.Schedule field is ignored and "triggered" schedule is returned instead.

type JobTriageLog

type JobTriageLog struct {

	// JobID is '<ProjectID>/<JobName>' string, matches corresponding Job.JobID.
	JobID string `gae:"$id"`
	// LastTriage is set to exact same value as corresponding Job.LastTriage.
	LastTriage time.Time `gae:",noindex"`
	// DebugLog is short free form text log with debug messages.
	DebugLog string `gae:",noindex"`
	// contains filtered or unexported fields
}

JobTriageLog contains information about the most recent triage.

To avoid increasing the triage transaction size, and to allow logging triage transaction collisions, this entity is saved non-transactionally in a separate entity group on a best effort basis.

It means it may occasionally be stale. To detect staleness we duplicate LastTriage timestamp here. If Job.LastTriage indicates the triage happened sufficiently log ago (by wall clock), but JobTriageLog.LastTriage is still old, then the log is stale (since JobTriageLog commit should have landed already). When this happens consistently we'll have to use real GAE logs to figure out what's wrong.

func (*JobTriageLog) Stale

func (j *JobTriageLog) Stale() bool

Stale is true if the engine thinks the log is stale.

It does it by comparing LastTriage to the job's LastTriage.

type ListInvocationsOpts

type ListInvocationsOpts struct {
	PageSize     int
	Cursor       string
	FinishedOnly bool
	ActiveOnly   bool
}

ListInvocationsOpts are passed to ListInvocations method.

Directories

Path Synopsis
Package dsset implements a particular flavor of datastore-backed set.
Package dsset implements a particular flavor of datastore-backed set.
Package policy contains implementation of triggering policy functions.
Package policy contains implementation of triggering policy functions.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL