task

package
v0.0.0-...-9ab9249 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 8, 2025 License: Apache-2.0 Imports: 10 Imported by: 33

Documentation

Overview

Package task defines interface between Scheduler engine and implementations of particular tasks (such as URL fetch tasks, Swarming tasks, DM tasks, etc).

Its subpackages contain concrete realizations of Manager interface.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Controller

type Controller interface {
	ControllerReadOnly

	// State returns a mutable portion of task invocation state.
	//
	// TaskManager can modify it in-place and then call Controller.Save to persist
	// the changes. The state will also be saved by the engine automatically if
	// Manager doesn't call Save.
	State() *State

	// AddTimer sets up a new delayed call to Manager.HandleTimer.
	//
	// Timers are active as long as the invocation is not in one of the final
	// states. There is no way to cancel a timer (ignore HandleTimer call
	// instead).
	//
	// 'title' will be visible in logs, it should convey a purpose for this timer.
	// It doesn't have to be unique.
	//
	// 'payload' is any byte blob carried verbatim to Manager.HandleTimer.
	//
	// All timers are actually enabled in Save(), in the same transaction that
	// updates the job state.
	AddTimer(c context.Context, delay time.Duration, title string, payload []byte)

	// PrepareTopic create PubSub topic for notifications related to the task and
	// adds given publisher to its ACL.
	//
	// It returns full name of the topic and a token that will be used to
	// authenticate the PubSub message and bind it to the task the Controller is
	// operating on now. Topic name and its configuration are controlled by the
	// Engine. The publisher to the topic must put the token somewhere inside
	// the message. The engine will ask the task manager to extract the token
	// from the message via ExamineNotification, then it will validate the token
	// and eventually call HandleNotification.
	//
	// 'publisher' can be a service account email, or an URL to some luci service.
	// If URL is given, its /auth/api/v1/server/info endpoint will be used to
	// grab a corresponding service account name. All service that use luci auth
	// component expose this endpoint.
	PrepareTopic(c context.Context, publisher string) (topic string, token string, err error)

	// EmitTrigger delivers a given trigger to all jobs which are triggered by
	// current one.
	EmitTrigger(ctx context.Context, trigger *internal.Trigger)

	// Save updates the state of the task in the persistent store.
	//
	// It also schedules all pending timer ticks added via AddTimer.
	//
	// Will be called by the engine after it launches the task. May also be called
	// by the Manager itself, even multiple times (e.g. once to notify that the
	// task has started, a second time to notify it has finished).
	//
	// Returns error if it couldn't save the invocation state. It is fine to
	// ignore it. The engine will attempt to Save the invocation at the end anyway
	// and it will properly handle the error if it happens again.
	Save(c context.Context) error
}

Controller is passed to LaunchTask by the scheduler engine. It gives Manager control over one job invocation. Manager must not use it outside of LaunchTask. Controller implementation is generally not thread safe (but it's fine to use it from multiple goroutines if access is protected by a lock).

All methods that accept context.Context expect contexts derived from ones passed to 'Manager' methods. A derived context can be used to set custom deadlines for some potentially expensive methods like 'PrepareTopic'.

type ControllerReadOnly

type ControllerReadOnly interface {
	// JobID returns full job ID the controller is operating on.
	JobID() string

	// InvocationID returns unique identifier of this particular invocation.
	InvocationID() int64

	// RealmID returns the full realm ID of the job ("<project>:<realm>").
	RealmID() string

	// Request contains parameters of the invocation supplied when it was created.
	Request() Request

	// Task is proto message with task definition.
	//
	// It is guaranteed to have same underlying type as manager.ProtoMessageType()
	// return value.
	Task() proto.Message

	// DebugLog appends a line to the free form text log of the task.
	DebugLog(format string, args ...any)

	// GetClient returns http.Client that is configured to use job's service
	// account credentials to talk to other services.
	GetClient(c context.Context, opts ...auth.RPCOption) (*http.Client, error)
}

ControllerReadOnly is a subset of Controller interface with methods that do not mutate the job's state.

type Manager

type Manager interface {
	// Name returns task manager name. It identifies the corresponding kind
	// of tasks and used in various resource names (e.g. PubSub topic names).
	Name() string

	// ProtoMessageType returns a pointer to protobuf message struct that
	// describes config for the task kind, e.g. &UrlFetchTask{}. Will be used
	// only for its type signature.
	ProtoMessageType() proto.Message

	// Traits returns properties that influence how the scheduler engine manages
	// tasks handled by this Manager.
	//
	// See Traits struct for more details.
	Traits() Traits

	// ValidateProtoMessage verifies task definition proto message makes sense.
	// msg must have same underlying type as ProtoMessageType() return value.
	//
	// realmID is a full realm name (as "<project>:<realm>") of the job whose
	// definition is being validated. It is never empty, but may be a @legacy
	// realm.
	//
	// Errors are returned via validation.Context.
	ValidateProtoMessage(c *validation.Context, msg proto.Message, realmID string)

	// LaunchTask starts (or starts and finishes in one go) the task.
	//
	// Manager's responsibilities:
	//  * To move the task to some state other than StatusStarting
	//    (by changing ctl.State().Status). If at some point the task has moved
	//    to StatusRunning, the manager MUST setup some way to track the task's
	//    progress to eventually move it to some final state. It can be a status
	//    check via a timer (see `AddTimer` below), or a PubSub callback (see
	//    `PrepareTopic` below).
	//  * Be idempotent, if possible, using ctl.InvocationID() as an operation
	//    key.
	//  * Not to use supplied controller outside of LaunchTask call.
	//  * Not to use supplied controller concurrently without synchronization.
	//
	// If `LaunchTask` crashes before returning or returns a transient error, it
	// will be called again later, receiving exact same ctl.InvocationID().
	//
	// TaskManager may optionally use ctl.Save() to checkpoint progress and save
	// debug log. ctl.Save() is also implicitly called by the engine when
	// `LaunchTask` returns.
	LaunchTask(c context.Context, ctl Controller) error

	// AbortTask is called to opportunistically abort launched task.
	//
	// It is called right before the job is forcefully switched to a failed state.
	// The engine does not wait for the task runner to acknowledge this action.
	//
	// AbortTask must be idempotent since it may be called multiple times in case
	// of errors.
	AbortTask(c context.Context, ctl Controller) error

	// ExamineNotification is called to extract the auth token from the incoming
	// PubSub message.
	//
	// It should return an empty string if the message is unrecognized/malformed
	// or there's no auth token in it. Note that the PubSub message here is not
	// yet validated and can be a total garbage (or even be malicious).
	//
	// See PrepareTopic for more info.
	ExamineNotification(c context.Context, msg *pubsub.PubsubMessage) string

	// HandleNotification is called whenever engine receives a PubSub message sent
	// to a topic created with Controller.PrepareTopic. Expect duplicated and
	// out-of-order messages here. HandleNotification must be idempotent.
	//
	// Returns transient error to trigger a redeliver of the message, no error to
	// to acknowledge the message and fatal error to move the invocation to failed
	// state.
	//
	// Any modifications made to the invocation state will be saved regardless of
	// the return value (to save the debug log).
	HandleNotification(c context.Context, ctl Controller, msg *pubsub.PubsubMessage) error

	// HandleTimer is called to process timers set up by Controller.AddTimer.
	//
	// Expect duplicated or delayed events here. HandleTimer must be idempotent.
	//
	// Returns a transient error to trigger a redelivery of the event (the
	// invocation state won't be saved in this case), no error to acknowledge the
	// event and a fatal error to move the invocation to failed state.
	HandleTimer(c context.Context, ctl Controller, name string, payload []byte) error

	// GetDebugState returns debug info about the state persisted by the manager.
	GetDebugState(c context.Context, ctl ControllerReadOnly) (*internal.DebugManagerState, error)
}

Manager knows how to work with a particular kind of tasks (e.g. URL fetch tasks, Swarming tasks, etc): how to deserialize, validate and execute them.

Manager uses Controller to talk back to the scheduler engine.

type Request

type Request struct {
	// TriggeredBy contains ID of an end user that triggered this invocation (e.g
	// through UI or API) or an empty string if it was triggered by the engine or
	// it is a result of a multiple different triggers.
	//
	// Mostly FYI.
	TriggeredBy identity.Identity

	// IncomingTriggers is a list of all triggers consumed by this invocation.
	//
	// Already sorted by time they were emitted (oldest first).
	IncomingTriggers []*internal.Trigger

	// Properties are arbitrary key-value pairs derived from the triggers by the
	// triggering policy function and interpreted by the triggered task manager.
	Properties *structpb.Struct

	// Tags are arbitrary "<key>:<value>" pairs derived from the triggers by the
	// triggering policy function.
	//
	// Primarily used for indexing and correlation of jobs/invocations with
	// each other (including across different services). Task managers can pass
	// them down the stack.
	Tags []string

	// DebugLog is optional multi-line string to put in the invocation debug log
	// when it starts.
	//
	// It is used to report debug information (produced by the engine triggering
	// guts) to the invocation debug log (visible via UI).
	//
	// This field is used internally by the engine. Task managers will never see
	// it set.
	DebugLog string
}

Request contains parameters of the invocation supplied when it was created.

They are calculated from the pending triggers when the invocation is initiated.

func (*Request) LastTrigger

func (r *Request) LastTrigger() *internal.Trigger

LastTrigger is the most recent trigger from IncomingTriggers list or nil.

func (*Request) StringProperty

func (r *Request) StringProperty(k string) string

StringProperty returns a value of string property or "" if no such property or it has a different type.

This is useful in tests for asserts.

func (*Request) TriggerIDs

func (r *Request) TriggerIDs() []string

TriggerIDs extracts list of IDs from IncomingTriggers.

This is useful in tests for asserts.

type State

type State struct {
	Status   Status // overall status of the invocation, see the enum
	TaskData []byte // storage for TaskManager-specific task data
	ViewURL  string // URL to human readable task page, shows in UI
}

State is mutable portion of the task invocation state.

It can be mutated by TaskManager directly.

type Status

type Status string

Status is status of a single job invocation.

const (
	// StatusStarting means the task is about to start.
	StatusStarting Status = "STARTING"
	// StatusRetrying means the task was starting, but the launch failed in some
	// transient way. The start attempt is retried in this case a bunch of times,
	// until eventually the task moves into either StatusRunning or one of the
	// final states. The only possible transition into StatusRetrying is from
	// StatusStarting. A running task can only succeed or fail.
	StatusRetrying Status = "RETRYING"
	// StatusRunning means the task has started and is running now.
	StatusRunning Status = "RUNNING"
	// StatusSucceeded means the task finished with success.
	StatusSucceeded Status = "SUCCEEDED"
	// StatusFailed means the task finished with error or failed to start.
	StatusFailed Status = "FAILED"
	// StatusOverrun means the task should have been started, but previous one is
	// still running.
	StatusOverrun Status = "OVERRUN"
	// StatusAborted means the task was forcefully aborted (manually or due to
	// hard deadline).
	StatusAborted Status = "ABORTED"
)

func (Status) Final

func (s Status) Final() bool

Final returns true if Status represents some final status.

func (Status) Initial

func (s Status) Initial() bool

Initial returns true if Status is Starting or Retrying.

These statuses indicate an invocation before LaunchTask (perhaps, a retry of it) is finished with the invocation.

type Traits

type Traits struct {
	// Multistage is true if Manager uses Starting -> Running -> Finished
	// state chain for all invocations (instead of just Starting -> Finished).
	//
	// This is the case for "heavy" tasks that can run for undetermined amount
	// of time (e.g. Swarming and Buildbucket tasks). By switching invocation
	// state to Running, the Manager acknowledges that it takes responsibility for
	// eventually moving the invocation to Finished state (perhaps in response to
	// a PubSub notification or a timer tick). In other words, once an invocation
	// is in Running state, the schedule engine will not automatically keep track
	// of it's healthiness (it's the responsibility of the Manager now).
	//
	// For smaller tasks (that finish in seconds, e.g. gitiles poller tasks) it is
	// simpler and more efficient just to do everything in LaunchTask and then
	// move the invocation to Finished state. By doing so, the Manager avoids
	// implementing healthiness checks, piggybacking on LaunchTask retries
	// automatically performed by the scheduler engine.
	//
	// Currently this trait only influences the UI. Invocations with
	// Multistage == false don't show up as "Starting" in the UI (they are
	// displayed as "Running" instead, since it makes more sense from end-user
	// perspective).
	Multistage bool
}

Traits describes properties that influence how the scheduler engine manages tasks handled by this Manager.

Directories

Path Synopsis
Package buildbucket implements tasks that run Buildbucket jobs.
Package buildbucket implements tasks that run Buildbucket jobs.
pb
Package noop implements tasks that do nothing at all.
Package noop implements tasks that do nothing at all.
Package urlfetch implements tasks that just make HTTP calls.
Package urlfetch implements tasks that just make HTTP calls.
Package utils contains a bunch of small functions used by task/ subpackages.
Package utils contains a bunch of small functions used by task/ subpackages.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL