Documentation
¶
Overview ¶
Package flamenco receives task updates from workers, queues them, and forwards them to the Flamenco Server.
Package flamenco periodically fetches new tasks from the Flamenco Server, and sends updates back.
Index ¶
- Constants
- Variables
- func CleanSlate(db *mgo.Database)
- func ConvertAndForward(images <-chan string, storagePath string) <-chan string
- func Count(coll *mgo.Collection) (int, error)
- func CreateTestTask(worker *Worker, conf *Conf, db *mgo.Database) (string, error)
- func DecodeJSON(w http.ResponseWriter, r io.Reader, document interface{}, logprefix string) error
- func Equal(a, b []string) bool
- func GetOrCreateMap(document bson.M, key string) bson.M
- func ImageWatcherHTTPPush(w http.ResponseWriter, r *http.Request, broadcaster *chantools.OneToManyChan)
- func IsRunnableTaskStatus(status string) bool
- func MaxInt(a, b int) int
- func MongoSession(config *Conf) *mgo.Session
- func ObjectIDFromRequest(w http.ResponseWriter, r *http.Request, variableName string) (bson.ObjectId, error)
- func PurgeOutgoingQueue(db *mgo.Database)
- func RegisterWorker(w http.ResponseWriter, r *http.Request, db *mgo.Database)
- func ReplaceLocal(strvalue string, config *Conf) string
- func ReplaceVariables(config *Conf, task *Task, worker *Worker)
- func SaveSettings(db *mgo.Database, settings *SettingsInMongo)
- func SendJSON(logprefix, method string, url *url.URL, payload interface{}, ...) error
- func ServeTaskLog(w http.ResponseWriter, r *http.Request, jobID, taskID bson.ObjectId, ...)
- func StoreNewWorker(winfo *Worker, db *mgo.Database) error
- func TemplatePathPrefix(fileToFind string) string
- func Timer(name string, sleepDuration, initialDelay time.Duration, closable *closable) <-chan struct{}
- func UtcNow() *time.Time
- func WorkerAckStatusChange(w http.ResponseWriter, r *auth.AuthenticatedRequest, db *mgo.Database, ...)
- func WorkerCount(db *mgo.Database) int
- func WorkerGetStatusChange(w http.ResponseWriter, r *auth.AuthenticatedRequest, db *mgo.Database)
- func WorkerPingedTask(workerID bson.ObjectId, taskID bson.ObjectId, taskStatus string, ...)
- func WorkerSecret(user string, db *mgo.Database) string
- func WorkerSignOff(w http.ResponseWriter, r *auth.AuthenticatedRequest, db *mgo.Database, ...)
- func WorkerSignOn(w http.ResponseWriter, r *auth.AuthenticatedRequest, db *mgo.Database, ...)
- type BlenderRenderConfig
- type Command
- type Conf
- type Dashboard
- type FileProduced
- type ImageWatcher
- type JobTask
- type LatestImageSystem
- type Lazyness
- type M
- type MayKeepRunningResponse
- type OpenRegAuth
- type PSKRegAuth
- type RegistrationAuth
- type ScheduleInfo
- type ScheduledTasks
- type SettingsInMongo
- type SleepScheduler
- func (ss *SleepScheduler) Close()
- func (ss *SleepScheduler) DeactivateSleepSchedule(worker *Worker, db *mgo.Database) error
- func (ss *SleepScheduler) Go()
- func (ss *SleepScheduler) RefreshAllWorkers()
- func (ss *SleepScheduler) RequestWorkerStatus(worker *Worker, db *mgo.Database)
- func (ss *SleepScheduler) SetSleepSchedule(worker *Worker, schedule ScheduleInfo, db *mgo.Database) error
- type StatusReport
- type Task
- type TaskCleaner
- type TaskLogUploader
- type TaskMetrics
- type TaskScheduler
- func (ts *TaskScheduler) ReturnTask(worker *Worker, logFields log.Fields, db *mgo.Database, task *Task, ...) error
- func (ts *TaskScheduler) ReturnTaskFromWorker(w http.ResponseWriter, r *auth.AuthenticatedRequest, db *mgo.Database, ...)
- func (ts *TaskScheduler) ScheduleTask(w http.ResponseWriter, r *auth.AuthenticatedRequest)
- func (ts *TaskScheduler) WorkerMayRunTask(w http.ResponseWriter, r *auth.AuthenticatedRequest, db *mgo.Database, ...)
- type TaskUpdate
- type TaskUpdatePusher
- type TaskUpdateQueue
- func (tuq *TaskUpdateQueue) LogTaskActivity(worker *Worker, task *Task, activity, logLine string, db *mgo.Database)
- func (tuq *TaskUpdateQueue) QueueTaskUpdate(task *Task, tupdate *TaskUpdate, db *mgo.Database) error
- func (tuq *TaskUpdateQueue) QueueTaskUpdateFromWorker(w http.ResponseWriter, r *auth.AuthenticatedRequest, db *mgo.Database, ...)
- func (tuq *TaskUpdateQueue) QueueTaskUpdateWithExtra(task *Task, tupdate *TaskUpdate, db *mgo.Database, extraUpdates bson.M) error
- type TaskUpdateResponse
- type TestTasks
- type TimeOfDay
- func (ot TimeOfDay) Equals(other TimeOfDay) bool
- func (ot TimeOfDay) GetBSON() (interface{}, error)
- func (ot TimeOfDay) IsAfter(other TimeOfDay) bool
- func (ot TimeOfDay) IsBefore(other TimeOfDay) bool
- func (ot TimeOfDay) MarshalJSON() ([]byte, error)
- func (ot TimeOfDay) OnDate(date time.Time) time.Time
- func (ot *TimeOfDay) SetBSON(raw bson.Raw) error
- func (ot TimeOfDay) String() string
- func (ot *TimeOfDay) UnmarshalJSON(b []byte) error
- type TimeoutChecker
- type UpstreamConnection
- func (uc *UpstreamConnection) Close()
- func (uc *UpstreamConnection) KickDownloader(synchronous bool)
- func (uc *UpstreamConnection) RefetchTask(task *Task) bool
- func (uc *UpstreamConnection) ResolveURL(relativeURL string, a ...interface{}) (*url.URL, error)
- func (uc *UpstreamConnection) SendJSON(logprefix, method string, url *url.URL, payload interface{}, ...) error
- func (uc *UpstreamConnection) SendTaskUpdates(updates []TaskUpdate) (*TaskUpdateResponse, error)
- type UpstreamNotification
- type UpstreamNotifier
- type Worker
- func (worker *Worker) AckStatusChange(newStatus string, db *mgo.Database) error
- func (worker *Worker) AckTimeout(db *mgo.Database) error
- func (worker *Worker) Identifier() string
- func (worker *Worker) RequestStatusChange(newStatus string, lazy Lazyness, db *mgo.Database) error
- func (worker *Worker) Seen(r *http.Request, db *mgo.Database)
- func (worker *Worker) SeenEx(r *http.Request, db *mgo.Database, set bson.M, unset bson.M) error
- func (worker *Worker) SetAwake(db *mgo.Database) error
- func (worker *Worker) SetCurrentTask(taskID bson.ObjectId, db *mgo.Database) error
- func (worker *Worker) SetStatus(status string, db *mgo.Database) error
- func (worker *Worker) Timeout(db *mgo.Database, scheduler *TaskScheduler)
- func (worker *Worker) TimeoutOnTask(task *Task, db *mgo.Database, scheduler *TaskScheduler)
- type WorkerBlacklist
- func (wbl *WorkerBlacklist) Add(workerID bson.ObjectId, task *Task) error
- func (wbl *WorkerBlacklist) BlacklistForWorker(workerID bson.ObjectId) M
- func (wbl *WorkerBlacklist) EnsureDBIndices()
- func (wbl *WorkerBlacklist) RemoveLine(workerID bson.ObjectId, jobID bson.ObjectId, taskType string) error
- func (wbl *WorkerBlacklist) WorkersLeft(jobID bson.ObjectId, taskType string) map[bson.ObjectId]bool
- type WorkerBlacklistEntry
- type WorkerRef
- type WorkerRegistration
- type WorkerRemover
- type WorkerSignonDoc
- type WorkerStatus
Constants ¶
const IsoFormat = "2006-01-02T15:04:05-0700"
IsoFormat is used for timestamp parsing
Variables ¶
var ( // ErrDuplicateVariables is returned when the same name is used as regular and path-replacement variable. ErrDuplicateVariables = errors.New("duplicate variables found") )
Functions ¶
func CleanSlate ¶
CleanSlate erases all tasks in the flamenco_tasks collection.
func ConvertAndForward ¶
ConvertAndForward copies each image it reads from 'images', converts it to a browser- friendly file, and forwards the new filename to the returned channel. It always converts to JPEG, even when the file is a browser-supported format (like PNG), so that the HTML can always refer to /static/latest-image.jpg to show the latest render.
func Count ¶
func Count(coll *mgo.Collection) (int, error)
Count returns the number of documents in the given collection.
func CreateTestTask ¶
CreateTestTask constructs a Manager-local test task and queues it for the worker to pick up.
func DecodeJSON ¶
DecodeJSON decodes JSON from an io.Reader, and writes a Bad Request status if it fails.
func Equal ¶
Equal tells whether a and b contain the same elements. A nil argument is equivalent to an empty slice.
func GetOrCreateMap ¶
GetOrCreateMap returns document[key] as bson.M, creating it if necessary.
func ImageWatcherHTTPPush ¶
func ImageWatcherHTTPPush(w http.ResponseWriter, r *http.Request, broadcaster *chantools.OneToManyChan)
ImageWatcherHTTPPush starts a server-side events channel.
func IsRunnableTaskStatus ¶
IsRunnableTaskStatus returns whether the given status is considered "runnable".
func MongoSession ¶
MongoSession returns a MongoDB session.
The database name should be configured in the database URL. You can use this default database using session.DB("").
func ObjectIDFromRequest ¶
func ObjectIDFromRequest(w http.ResponseWriter, r *http.Request, variableName string) (bson.ObjectId, error)
ObjectIDFromRequest parses the request variable value as Object ID.
func PurgeOutgoingQueue ¶
PurgeOutgoingQueue erases all queued task updates from the local DB
func RegisterWorker ¶
RegisterWorker creates a new Worker in the DB, based on the WorkerRegistration document received.
func ReplaceLocal ¶
ReplaceLocal performs variable and path replacement for strings based on the local platform.
func ReplaceVariables ¶
ReplaceVariables performs variable and path replacement for tasks.
func SaveSettings ¶
func SaveSettings(db *mgo.Database, settings *SettingsInMongo)
SaveSettings stores the given settings in MongoDB.
func SendJSON ¶
func SendJSON(logprefix, method string, url *url.URL, payload interface{}, tweakrequest func(req *http.Request), responsehandler func(resp *http.Response, body []byte) error, ) error
SendJSON sends a JSON document to some URL via HTTP. :param tweakrequest: can be used to tweak the request before sending it, for
example by adding authentication headers. May be nil.
:param responsehandler: is called when a non-error response has been read.
May be nil.
func ServeTaskLog ¶
func ServeTaskLog(w http.ResponseWriter, r *http.Request, jobID, taskID bson.ObjectId, tuq *TaskUpdateQueue)
ServeTaskLog serves the latest task log file for the given job+task. Depending on the User-Agent header it servers head+tail or the entire file.
func StoreNewWorker ¶
StoreNewWorker saves the given worker in the database.
func TemplatePathPrefix ¶
TemplatePathPrefix returns the filename prefix to find template files. Templates are searched for relative to the current working directory as well as relative to the currently running executable.
func Timer ¶
func Timer(name string, sleepDuration, initialDelay time.Duration, closable *closable) <-chan struct{}
Timer is a generic timer for periodic signals.
func WorkerAckStatusChange ¶
func WorkerAckStatusChange(w http.ResponseWriter, r *auth.AuthenticatedRequest, db *mgo.Database, ackStatus string)
WorkerAckStatusChange allows a Worker to acknowledge a requested status change.
func WorkerCount ¶
WorkerCount returns the number of registered workers.
func WorkerGetStatusChange ¶
func WorkerGetStatusChange(w http.ResponseWriter, r *auth.AuthenticatedRequest, db *mgo.Database)
WorkerGetStatusChange allows a Worker to fetch any pending status change.
func WorkerPingedTask ¶
func WorkerPingedTask(workerID bson.ObjectId, taskID bson.ObjectId, taskStatus string, db *mgo.Database)
WorkerPingedTask marks the task as pinged by the worker. If worker_id is not nil, sets the worker_id field of the task. Otherwise doesn't touch that field and only updates last_worker_ping.
func WorkerSecret ¶
WorkerSecret returns the hashed secret of the worker.
func WorkerSignOff ¶
func WorkerSignOff(w http.ResponseWriter, r *auth.AuthenticatedRequest, db *mgo.Database, scheduler *TaskScheduler)
WorkerSignOff re-queues all active tasks (should be only one) that are assigned to this worker.
func WorkerSignOn ¶
func WorkerSignOn(w http.ResponseWriter, r *auth.AuthenticatedRequest, db *mgo.Database, notifier *UpstreamNotifier)
WorkerSignOn allows a Worker to register a new list of supported task types. It also clears the worker's "current" task from the dashboard, so that it's clear that the now-active worker is not actually working on that task.
Types ¶
type BlenderRenderConfig ¶
type BlenderRenderConfig struct { JobStorage string `yaml:"job_storage"` RenderOutput string `yaml:"render_output"` }
BlenderRenderConfig represents the configuration required for a test render.
type Command ¶
type Command struct { Name string `bson:"name" json:"name"` Settings bson.M `bson:"settings" json:"settings"` }
Command is an executable part of a Task
type Conf ¶
type Conf struct { Mode string `yaml:"mode"` // either "develop" or "production" ManagerName string `yaml:"manager_name"` DatabaseURL string `yaml:"database_url"` DatabasePath string `yaml:"database_path"` TaskLogsPath string `yaml:"task_logs_path"` Listen string `yaml:"listen"` OwnURL string `yaml:"own_url"` FlamencoStr string `yaml:"flamenco"` Flamenco *url.URL `yaml:"-"` ManagerID string `yaml:"manager_id"` ManagerSecret string `yaml:"manager_secret"` TLSKey string `yaml:"tlskey"` TLSCert string `yaml:"tlscert"` DownloadTaskSleep time.Duration `yaml:"download_task_sleep"` /* The number of seconds between rechecks when there are no more tasks for workers. * If set to 0, will not throttle at all. * If set to -1, will never check when a worker asks for a task (so only every * download_task_sleep_seconds seconds). */ DownloadTaskRecheckThrottle time.Duration `yaml:"download_task_recheck_throttle"` /* Variables, stored differently in YAML and these settings. * Variables: variable name -> platform -> value * VariablesPerPlatform: platform -> variable name -> value */ VariablesByVarname map[string]map[string]string `yaml:"variables"` VariablesByPlatform map[string]map[string]string `yaml:"-"` PathReplacementByVarname map[string]map[string]string `yaml:"path_replacement"` PathReplacementByPlatform map[string]map[string]string `yaml:"-"` TaskUpdatePushMaxInterval time.Duration `yaml:"task_update_push_max_interval"` TaskUpdatePushMaxCount int `yaml:"task_update_push_max_count"` CancelTaskFetchInterval time.Duration `yaml:"cancel_task_fetch_max_interval"` ActiveTaskTimeoutInterval time.Duration `yaml:"active_task_timeout_interval"` ActiveWorkerTimeoutInterval time.Duration `yaml:"active_worker_timeout_interval"` TaskCleanupMaxAge time.Duration `yaml:"task_cleanup_max_age"` WorkerCleanupMaxAge time.Duration `yaml:"worker_cleanup_max_age"` WorkerCleanupStatus []string `yaml:"worker_cleanup_status"` /* This many failures (on a given job+task type combination) will ban a worker * from that task type on that job. */ BlacklistThreshold int `yaml:"blacklist_threshold"` // When this many workers have tried the task and failed, it will be hard-failed // (even when there are workers left that could technically retry the task). TaskFailAfterSoftFailCount int `yaml:"task_fail_after_softfail_count"` WatchForLatestImage string `yaml:"watch_for_latest_image"` SSDPDiscovery bool `yaml:"ssdp_discovery"` SSDPDeviceUUID string `yaml:"ssdp_device_uuid"` TestTasks TestTasks `yaml:"test_tasks"` // Shaman configuration settings. Shaman shamanconfig.Config `yaml:"shaman"` // Authentication settings. JWT jwtauth.Config `yaml:"user_authentication"` WorkerRegistrationSecret string `yaml:"worker_registration_secret"` }
Conf represents the Manager's configuration file.
func GetTestConfig ¶
func GetTestConfig() Conf
GetTestConfig returns the configuration for unit tests.
func (*Conf) OverrideMode ¶
OverrideMode checks the mode parameter for validity and logs that it's being overridden.
type Dashboard ¶
type Dashboard struct { // Set by main.go RestartFunction func() // contains filtered or unexported fields }
Dashboard can show HTML and JSON reports.
func CreateDashboard ¶
func CreateDashboard(config *Conf, session *mgo.Session, sleeper *SleepScheduler, blacklist *WorkerBlacklist, flamencoVersion string, ) *Dashboard
CreateDashboard creates a new Dashboard object.
type FileProduced ¶
type FileProduced struct {
Paths []string `json:"paths"`
}
FileProduced is sent by the worker whenever it produces (e.g. renders) some file. This hooks into the fswatcher system.
type ImageWatcher ¶
type ImageWatcher struct { // The public channel, from which can only be read. ImageCreated <-chan string // contains filtered or unexported fields }
ImageWatcher watches a filesystem directory.
func CreateImageWatcher ¶
func CreateImageWatcher(pathToWatch string, bufferSize int) *ImageWatcher
CreateImageWatcher creates a new ImageWatcher for the given directory. bufferSize is the size of the iw.ImageCreated channel.
type LatestImageSystem ¶
type LatestImageSystem struct {
// contains filtered or unexported fields
}
LatestImageSystem ties an ImageWatcher to a the fswatcher_middle and fswatcher_http stuff, allowing the results to be pushed via HTTP to browsers.
func CreateLatestImageSystem ¶
func CreateLatestImageSystem(watchPath string) *LatestImageSystem
CreateLatestImageSystem sets up a LatestImageSystem
func (*LatestImageSystem) AddRoutes ¶
func (lis *LatestImageSystem) AddRoutes( router *mux.Router, workerAuth *auth.BasicAuth, userAuth jwtauth.Authenticator, )
AddRoutes adds the HTTP Server-Side Events endpoint to the router.
func (*LatestImageSystem) Close ¶
func (lis *LatestImageSystem) Close()
Close gracefully shuts down the image watcher, if the path to watch isn't empty.
func (*LatestImageSystem) Go ¶
func (lis *LatestImageSystem) Go()
Go starts the image watcher, if the path to watch isn't empty.
type Lazyness ¶
type Lazyness bool
Lazyness indicates whether a worker's requested status change is lazy (true) or immediate (false).
type MayKeepRunningResponse ¶
type MayKeepRunningResponse struct { MayKeepRunning bool `json:"may_keep_running"` Reason string `json:"reason,omitempty"` // For controlling sleeping & waking up. For values, see the workerStatusXXX constants. StatusRequested string `json:"status_requested,omitempty"` }
MayKeepRunningResponse is sent to workers to indicate whether they can keep running their task.
type OpenRegAuth ¶
type OpenRegAuth struct{}
OpenRegAuth allows anybody to register as a Worker.
func (OpenRegAuth) Wrap ¶
func (ora OpenRegAuth) Wrap(handler http.Handler) http.Handler
Wrap does not do anything.
func (OpenRegAuth) WrapFunc ¶
func (ora OpenRegAuth) WrapFunc(handlerFunc func(w http.ResponseWriter, r *http.Request)) http.Handler
WrapFunc does not do anything.
type PSKRegAuth ¶
type PSKRegAuth struct {
// contains filtered or unexported fields
}
PSKRegAuth authorises based on JWT tokens signed with HMAC + a pre-shared key.
func (*PSKRegAuth) Wrap ¶
func (pra *PSKRegAuth) Wrap(handler http.Handler) http.Handler
Wrap requires that the request is authenticated with the proper JWT bearer token.
func (*PSKRegAuth) WrapFunc ¶
func (pra *PSKRegAuth) WrapFunc(handlerFunc func(w http.ResponseWriter, r *http.Request)) http.Handler
WrapFunc requires that the request is authenticated with the proper JWT bearer token.
type RegistrationAuth ¶
type RegistrationAuth interface { Wrap(handler http.Handler) http.Handler WrapFunc(handlerFunc func(w http.ResponseWriter, r *http.Request)) http.Handler }
RegistrationAuth is the interface for all worker registration authorisers.
func NewWorkerRegistrationAuthoriser ¶
func NewWorkerRegistrationAuthoriser(config *Conf) RegistrationAuth
NewWorkerRegistrationAuthoriser creates a new RegistrationAuth. If a pre-shared secret key is configured, creates a PSKRegAuth, otherwise creates an OpenRegAuth.
type ScheduleInfo ¶
type ScheduleInfo struct { ScheduleActive bool `bson:"schedule_active" json:"schedule_active"` // Space-separated two-letter strings indicating days of week the schedule is active. // Empty means "every day". DaysOfWeek string `bson:"days_of_week,omitempty" json:"days_of_week,omitempty"` // Start and end time of the day at which the schedule is active. // Applies only when today is in DaysOfWeek, or when DaysOfWeek is empty. // No 'time_' prefix for BSON as it already serialises {time: "15:04:05"}. TimeStart *TimeOfDay `bson:"start,omitempty" json:"time_start,omitempty"` TimeEnd *TimeOfDay `bson:"end,omitempty" json:"time_end,omitempty"` NextCheck *time.Time `bson:"next_check,omitempty" json:"next_check,omitempty"` }
ScheduleInfo for automatically sending a Worker to sleep & waking up.
type ScheduledTasks ¶
type ScheduledTasks struct {
Depsgraph []Task `json:"depsgraph"`
}
ScheduledTasks contains a dependency graph response from Server.
type SettingsInMongo ¶
type SettingsInMongo struct {
DepsgraphLastModified *string `bson:"depsgraph_last_modified"`
}
SettingsInMongo contains settings we want to be able to update from within Flamenco Manager itself, so those are stored in MongoDB.
func GetSettings ¶
func GetSettings(db *mgo.Database) *SettingsInMongo
GetSettings returns the settings as saved in our MongoDB.
type SleepScheduler ¶
type SleepScheduler struct {
// contains filtered or unexported fields
}
SleepScheduler manages wake/sleep cycles of Workers.
func CreateSleepScheduler ¶
func CreateSleepScheduler(session *mgo.Session) *SleepScheduler
CreateSleepScheduler creates a new SleepScheduler.
func (*SleepScheduler) Close ¶
func (ss *SleepScheduler) Close()
Close gracefully shuts down the sleep scheduler goroutine.
func (*SleepScheduler) DeactivateSleepSchedule ¶
func (ss *SleepScheduler) DeactivateSleepSchedule(worker *Worker, db *mgo.Database) error
DeactivateSleepSchedule deactivates the worker's sleep schedule.
func (*SleepScheduler) Go ¶
func (ss *SleepScheduler) Go()
Go starts a new goroutine to perform the periodic checking of the schedule.
func (*SleepScheduler) RefreshAllWorkers ¶
func (ss *SleepScheduler) RefreshAllWorkers()
RefreshAllWorkers updates the status of all workers for which a schedule is active.
func (*SleepScheduler) RequestWorkerStatus ¶
func (ss *SleepScheduler) RequestWorkerStatus(worker *Worker, db *mgo.Database)
RequestWorkerStatus sets worker.StatusRequested if the scheduler demands a status change.
func (*SleepScheduler) SetSleepSchedule ¶
func (ss *SleepScheduler) SetSleepSchedule(worker *Worker, schedule ScheduleInfo, db *mgo.Database) error
SetSleepSchedule stores the given schedule as the worker's new sleep schedule and applies it. Updates both the Worker object itself and the Mongo database. Instantly requests a new status for the worker according to the schedule.
type StatusReport ¶
type StatusReport struct { NrOfWorkers int `json:"nr_of_workers"` NrOfTasks int `json:"nr_of_tasks"` UpstreamQueueSize int `json:"upstream_queue_size"` Version string `json:"version"` Workers []Worker `json:"workers"` ManagerName string `json:"manager_name"` ManagerMode string `json:"manager_mode"` // either "develop" or "production", see settings.go Conf.Mode. Server struct { Name string `json:"name"` URL string `json:"url"` } `json:"server"` }
StatusReport is sent in response to a query on the / URL.
type Task ¶
type Task struct { ID bson.ObjectId `bson:"_id,omitempty" json:"_id,omitempty"` Etag string `bson:"_etag,omitempty" json:"_etag,omitempty"` Job bson.ObjectId `bson:"job,omitempty" json:"job"` Manager bson.ObjectId `bson:"manager,omitempty" json:"manager"` Project bson.ObjectId `bson:"project,omitempty" json:"project"` User bson.ObjectId `bson:"user,omitempty" json:"user"` Name string `bson:"name" json:"name"` Status string `bson:"status" json:"status"` Priority int `bson:"priority" json:"priority"` JobPriority int `bson:"job_priority" json:"job_priority"` JobType string `bson:"job_type" json:"job_type"` TaskType string `bson:"task_type" json:"task_type"` Commands []Command `bson:"commands" json:"commands"` Log string `bson:"log,omitempty" json:"log,omitempty"` Activity string `bson:"activity,omitempty" json:"activity,omitempty"` Parents []bson.ObjectId `bson:"parents,omitempty" json:"parents,omitempty"` Worker string `bson:"worker,omitempty" json:"worker,omitempty"` FailedByWorkers []WorkerRef `bson:"failed_by_workers,omitempty" json:"failed_by_workers,omitempty"` // Workers who tried this task and failed. Metrics *TaskMetrics `bson:"metrics,omitempty" json:"metrics,omitempty"` // Internal bookkeeping WorkerID *bson.ObjectId `bson:"worker_id,omitempty" json:"-"` // The worker assigned to this task. LastWorkerPing *time.Time `bson:"last_worker_ping,omitempty" json:"-"` // When a worker last said it was working on this. Might not have been a task update. LastUpdated *time.Time `bson:"last_updated,omitempty" json:"-"` // when we have last seen an update. }
Task contains a Flamenco task, with some BSON-only fields for local Manager use.
type TaskCleaner ¶
type TaskCleaner struct {
// contains filtered or unexported fields
}
TaskCleaner periodically deletes tasks that haven't been touched in a long time.
func CreateTaskCleaner ¶
func CreateTaskCleaner(config *Conf, session *mgo.Session) *TaskCleaner
CreateTaskCleaner creates a new TaskCleaner with default timings.
func (*TaskCleaner) Close ¶
func (tc *TaskCleaner) Close()
Close gracefully shuts down the task timeout checker goroutine.
func (*TaskCleaner) Go ¶
func (tc *TaskCleaner) Go()
Go starts a new goroutine to perform the periodic checking.
type TaskLogUploader ¶
TaskLogUploader sends compressed task log files to Flamenco Server.
The task IDs are queued first. If the tuple is already queued, queueing is a no-op, even when uploading is already in progress. This allows the Server to maintain the queue of to-be-uploaded task logs; we don't have to persist anything to disk.
func CreateTaskLogUploader ¶
func CreateTaskLogUploader(config *Conf, upstream *UpstreamConnection) *TaskLogUploader
CreateTaskLogUploader creates a new TaskLogUploader.
func (*TaskLogUploader) Close ¶
func (tlu *TaskLogUploader) Close()
Close gracefully shuts down the task uploader goroutine.
func (*TaskLogUploader) Go ¶
func (tlu *TaskLogUploader) Go()
Go starts a goroutine that monitors the queue and uploads task logs.
func (*TaskLogUploader) QueueAll ¶
func (tlu *TaskLogUploader) QueueAll(jobTasks []JobTask)
QueueAll places all (Job ID, Task ID) tuples on the queue for uploading later. This function will keep getting called with tasks until those tasks have had their logfile uploaded to the Server.
type TaskMetrics ¶
type TaskMetrics struct {
Timing map[string]float64 `bson:"timing,omitempty" json:"timing,omitempty"`
}
TaskMetrics contains metrics on a specific task, such as timing information.
type TaskScheduler ¶
type TaskScheduler struct {
// contains filtered or unexported fields
}
TaskScheduler offers tasks to Workers when they ask for them.
func CreateTaskScheduler ¶
func CreateTaskScheduler(config *Conf, upstream *UpstreamConnection, session *mgo.Session, queue *TaskUpdateQueue, blacklist *WorkerBlacklist, pusher *TaskUpdatePusher, ) *TaskScheduler
CreateTaskScheduler constructs a new TaskScheduler, including private fields.
func (*TaskScheduler) ReturnTask ¶
func (ts *TaskScheduler) ReturnTask(worker *Worker, logFields log.Fields, db *mgo.Database, task *Task, reasonForReturn string) error
ReturnTask lets a Worker return its tasks to the queue, for execution by another worker.
func (*TaskScheduler) ReturnTaskFromWorker ¶
func (ts *TaskScheduler) ReturnTaskFromWorker(w http.ResponseWriter, r *auth.AuthenticatedRequest, db *mgo.Database, taskID bson.ObjectId)
ReturnTaskFromWorker is the HTTP interface for workers to return a specific task to the queue.
func (*TaskScheduler) ScheduleTask ¶
func (ts *TaskScheduler) ScheduleTask(w http.ResponseWriter, r *auth.AuthenticatedRequest)
ScheduleTask assigns a task to a worker.
func (*TaskScheduler) WorkerMayRunTask ¶
func (ts *TaskScheduler) WorkerMayRunTask(w http.ResponseWriter, r *auth.AuthenticatedRequest, db *mgo.Database, taskID bson.ObjectId)
WorkerMayRunTask tells the worker whether it's allowed to keep running the given task.
type TaskUpdate ¶
type TaskUpdate struct { ID bson.ObjectId `bson:"_id" json:"_id"` TaskID bson.ObjectId `bson:"task_id" json:"task_id,omitempty"` TaskStatus string `bson:"task_status,omitempty" json:"task_status,omitempty"` ReceivedOnManager time.Time `bson:"received_on_manager" json:"received_on_manager"` Activity string `bson:"activity,omitempty" json:"activity,omitempty"` TaskProgressPercentage int `bson:"task_progress_percentage" json:"task_progress_percentage"` CurrentCommandIdx int `bson:"current_command_idx" json:"current_command_idx"` CommandProgressPercentage int `bson:"command_progress_percentage" json:"command_progress_percentage"` Log string `bson:"log,omitempty" json:"log,omitempty"` // for appending to Server-side log LogTail string `bson:"log_tail,omitempty" json:"log_tail,omitempty"` // for overwriting on Server-side task Worker string `bson:"worker" json:"worker"` FailedByWorkers []WorkerRef `bson:"failed_by_workers,omitempty" json:"failed_by_workers,omitempty"` // Workers who tried this task and failed. Metrics *TaskMetrics `bson:"metrics,omitempty" json:"metrics,omitempty"` // contains filtered or unexported fields }
TaskUpdate is both sent from Worker to Manager, as well as from Manager to Server.
type TaskUpdatePusher ¶
type TaskUpdatePusher struct {
// contains filtered or unexported fields
}
TaskUpdatePusher pushes queued task updates to the Flamenco Server.
func CreateTaskUpdatePusher ¶
func CreateTaskUpdatePusher( config *Conf, upstream *UpstreamConnection, session *mgo.Session, queue *TaskUpdateQueue, taskLogUploader *TaskLogUploader, ) *TaskUpdatePusher
CreateTaskUpdatePusher creates a new task update pusher that runs in a separate goroutine.
func (*TaskUpdatePusher) Close ¶
func (pusher *TaskUpdatePusher) Close()
Close closes the task update pusher by stopping all timers & goroutines.
func (*TaskUpdatePusher) Kick ¶
func (pusher *TaskUpdatePusher) Kick()
Kick forces a task update push.
type TaskUpdateQueue ¶
type TaskUpdateQueue struct {
// contains filtered or unexported fields
}
TaskUpdateQueue queues task updates for later pushing, and writes log files to disk.
func CreateTaskUpdateQueue ¶
func CreateTaskUpdateQueue(config *Conf, blacklist *WorkerBlacklist) *TaskUpdateQueue
CreateTaskUpdateQueue creates a new TaskUpdateQueue.
func (*TaskUpdateQueue) LogTaskActivity ¶
func (tuq *TaskUpdateQueue) LogTaskActivity(worker *Worker, task *Task, activity, logLine string, db *mgo.Database)
LogTaskActivity creates and queues a TaskUpdate to store activity and a log line.
func (*TaskUpdateQueue) QueueTaskUpdate ¶
func (tuq *TaskUpdateQueue) QueueTaskUpdate(task *Task, tupdate *TaskUpdate, db *mgo.Database) error
QueueTaskUpdate queues the task update, without any extra updates.
func (*TaskUpdateQueue) QueueTaskUpdateFromWorker ¶
func (tuq *TaskUpdateQueue) QueueTaskUpdateFromWorker(w http.ResponseWriter, r *auth.AuthenticatedRequest, db *mgo.Database, taskID bson.ObjectId)
QueueTaskUpdateFromWorker receives a task update from a worker, and queues it for sending to Flamenco Server.
func (*TaskUpdateQueue) QueueTaskUpdateWithExtra ¶
func (tuq *TaskUpdateQueue) QueueTaskUpdateWithExtra(task *Task, tupdate *TaskUpdate, db *mgo.Database, extraUpdates bson.M) error
QueueTaskUpdateWithExtra does the same as QueueTaskUpdate(), but with extra updates to the local flamenco_tasks collection.
type TaskUpdateResponse ¶
type TaskUpdateResponse struct { ModifiedCount int `json:"modified_count"` HandledUpdateIds []bson.ObjectId `json:"handled_update_ids,omitempty"` CancelTasksIds []bson.ObjectId `json:"cancel_task_ids,omitempty"` // Job/Task IDs for which we should send the task log to the Server. UploadTaskFileQueue []JobTask `json:"upload_task_file_queue,omitempty"` }
TaskUpdateResponse is received from Server.
type TestTasks ¶
type TestTasks struct {
BlenderRender BlenderRenderConfig `yaml:"test_blender_render"`
}
TestTasks represents the 'test_tasks' key in the Manager's configuration file.
type TimeOfDay ¶
TimeOfDay is marshalled as 'HH:MM'. Its date and timezone components are ignored, and the time is supposed to be interpreted as local time on any date (f.e. a scheduled sleep time of some Worker on a certain day-of-week & local timezone).
func MakeTimeOfDay ¶
MakeTimeOfDay converts a time.Time into a TimeOfDay.
func (TimeOfDay) IsAfter ¶
IsAfter returns True iff ot is after other. Ignores everything except hour and minute fields.
func (TimeOfDay) IsBefore ¶
IsBefore returns True iff ot is before other. Ignores everything except hour and minute fields.
func (TimeOfDay) MarshalJSON ¶
MarshalJSON turns a time.Time instance into a "HH:MM" string.
func (*TimeOfDay) UnmarshalJSON ¶
UnmarshalJSON turns a "HH:MM" string into a time.Time instance.
type TimeoutChecker ¶
type TimeoutChecker struct {
// contains filtered or unexported fields
}
TimeoutChecker periodically times out tasks and workers if the worker hasn't sent any update recently.
func CreateTimeoutChecker ¶
func CreateTimeoutChecker(config *Conf, session *mgo.Session, queue *TaskUpdateQueue, scheduler *TaskScheduler) *TimeoutChecker
CreateTimeoutChecker creates a new TimeoutChecker.
func (*TimeoutChecker) Close ¶
func (ttc *TimeoutChecker) Close()
Close gracefully shuts down the task timeout checker goroutine.
func (*TimeoutChecker) Go ¶
func (ttc *TimeoutChecker) Go()
Go starts a new goroutine to perform the periodic checking.
type UpstreamConnection ¶
type UpstreamConnection struct {
// contains filtered or unexported fields
}
UpstreamConnection represents a connection to an upstream Flamenco Server.
func ConnectUpstream ¶
func ConnectUpstream(config *Conf, session *mgo.Session) *UpstreamConnection
ConnectUpstream creates a new UpstreamConnection object and starts the task download loop.
func (*UpstreamConnection) Close ¶
func (uc *UpstreamConnection) Close()
Close gracefully closes the upstream connection by stopping all upload/download loops.
func (*UpstreamConnection) KickDownloader ¶
func (uc *UpstreamConnection) KickDownloader(synchronous bool)
KickDownloader fetches new tasks from the Flamenco Server.
func (*UpstreamConnection) RefetchTask ¶
func (uc *UpstreamConnection) RefetchTask(task *Task) bool
RefetchTask re-fetches a task from the Server, but only if its etag changed.
- If the etag changed, the differences between the old and new status are handled.
- If the Server cannot be reached, this error is ignored and the task is untouched.
- If the Server returns an error code other than 500 Internal Server Error, (Forbidden, Not Found, etc.) the task is removed from the local task queue.
If the task was untouched, this function returns false. If it was changed or removed, this function return true.
func (*UpstreamConnection) ResolveURL ¶
func (uc *UpstreamConnection) ResolveURL(relativeURL string, a ...interface{}) (*url.URL, error)
ResolveURL returns the given URL relative to the base URL of the upstream server, as absolute URL.
func (*UpstreamConnection) SendJSON ¶
func (uc *UpstreamConnection) SendJSON(logprefix, method string, url *url.URL, payload interface{}, responsehandler func(resp *http.Response, body []byte) error, ) error
SendJSON sends a JSON document to the given URL.
func (*UpstreamConnection) SendTaskUpdates ¶
func (uc *UpstreamConnection) SendTaskUpdates(updates []TaskUpdate) (*TaskUpdateResponse, error)
SendTaskUpdates performs a POST to /api/flamenco/managers/{manager-id}/task-update-batch to send a batch of task updates to the Server.
type UpstreamNotification ¶
type UpstreamNotification struct { // Settings ManagerURL string `json:"manager_url"` VariablesByVarname map[string]map[string]string `json:"variables"` PathReplacementByVarname map[string]map[string]string `json:"path_replacement"` // From our local database NumberOfWorkers int `json:"nr_of_workers"` WorkerTaskTypes []string `json:"worker_task_types"` }
UpstreamNotification sent to upstream Flamenco Server upon startup and when workers change their task types. This is a combination of settings (see settings.go) and information from the database.
type UpstreamNotifier ¶
type UpstreamNotifier struct {
// contains filtered or unexported fields
}
UpstreamNotifier sends a signal to Flamenco Server that we've started or changed configuration.
func CreateUpstreamNotifier ¶
func CreateUpstreamNotifier(config *Conf, upstream *UpstreamConnection, session *mgo.Session) *UpstreamNotifier
CreateUpstreamNotifier creates a new notifier.
func (*UpstreamNotifier) Close ¶
func (un *UpstreamNotifier) Close()
Close performs a clean shutdown.
func (*UpstreamNotifier) SendStartupNotification ¶
func (un *UpstreamNotifier) SendStartupNotification()
SendStartupNotification sends a StartupNotification document to upstream Flamenco Server. Keeps trying in a goroutine until the notification was succesful.
func (*UpstreamNotifier) SendTaskTypesNotification ¶
func (un *UpstreamNotifier) SendTaskTypesNotification()
SendTaskTypesNotification sends a StartupNotification document to upstream Flamenco Server. Keeps trying in a goroutine until the notification was succesful.
type Worker ¶
type Worker struct { ID bson.ObjectId `bson:"_id,omitempty" json:"_id,omitempty"` Secret string `bson:"-" json:"-"` HashedSecret []byte `bson:"hashed_secret" json:"-"` Nickname string `bson:"nickname" json:"nickname"` Address string `bson:"address" json:"address"` Status string `bson:"status" json:"status"` Platform string `bson:"platform" json:"platform"` CurrentTask *bson.ObjectId `bson:"current_task,omitempty" json:"current_task,omitempty"` TimeCost int `bson:"time_cost" json:"time_cost"` LastActivity *time.Time `bson:"last_activity,omitempty" json:"last_activity,omitempty"` SupportedTaskTypes []string `bson:"supported_task_types" json:"supported_task_types"` Software string `bson:"software" json:"software"` // For dashboard CurrentTaskStatus string `bson:"current_task_status,omitempty" json:"current_task_status,omitempty"` CurrentTaskUpdated *time.Time `bson:"current_task_updated,omitempty" json:"current_task_updated,omitempty"` CurrentJob bson.ObjectId `bson:"current_job,omitempty" json:"current_job,omitempty"` // For controlling sleeping & waking up. For values, see the workerStatusXXX constants. StatusRequested string `bson:"status_requested" json:"status_requested"` LazyStatusRequest Lazyness `bson:"lazy_status_request" json:"lazy_status_request"` // Only apply requested status when current task is finished. SleepSchedule ScheduleInfo `bson:"sleep_schedule,omitempty" json:"sleep_schedule"` // For preventing a failing worker from eating up all tasks of a certain job. Blacklist []WorkerBlacklistEntry `json:"blacklist,omitempty"` }
Worker contains all information about a specific Worker. Some fields come from the WorkerRegistration, whereas others are filled by us.
func FindWorker ¶
FindWorker returns the worker given its ID in string form.
func FindWorkerByID ¶
FindWorkerByID returns the entire worker, no projections.
func (*Worker) AckStatusChange ¶
AckStatusChange acknowledges the requested status change by moving it to the actual status. Only the "shutdown" status should not be acknowledged, but just result in a signoff and thus directly go to "offline" state.
func (*Worker) AckTimeout ¶
AckTimeout acknowledges the timeout and just sets the worker to "offline".
func (*Worker) Identifier ¶
Identifier returns the worker's address, with the nickname in parentheses (if set).
Make sure that you include the nickname in the projection when you fetch the worker from MongoDB.
func (*Worker) RequestStatusChange ¶
RequestStatusChange stores the new requested status in MongoDB, so that it gets picked up by the worker the next time it asks for it. Parameter 'lazy' indicates that the worker can finish the current task first, before applying the status change.
func (*Worker) Seen ¶
Seen registers that we have seen this worker at a certain address and with certain software.
func (*Worker) SeenEx ¶
SeenEx is same as Seen(), but allows for extra updates on the worker in the database, and returns err
func (*Worker) SetAwake ¶
SetAwake sets the worker status to Awake, but only if's not already awake or testing.
func (*Worker) SetCurrentTask ¶
SetCurrentTask sets the worker's current task, and updates the database too.
func (*Worker) SetStatus ¶
SetStatus sets the worker's status, and updates the database too. Use SetAwake() instead of calling this function with status="awake".
func (*Worker) Timeout ¶
func (worker *Worker) Timeout(db *mgo.Database, scheduler *TaskScheduler)
Timeout marks the worker as timed out.
func (*Worker) TimeoutOnTask ¶
func (worker *Worker) TimeoutOnTask(task *Task, db *mgo.Database, scheduler *TaskScheduler)
TimeoutOnTask marks the worker as timed out on a given task. The task is just used for logging.
type WorkerBlacklist ¶
type WorkerBlacklist struct {
// contains filtered or unexported fields
}
WorkerBlacklist stores (worker ID, job ID, task type) tuples for failed tasks.
func CreateWorkerBlackList ¶
func CreateWorkerBlackList(config *Conf, session *mgo.Session) *WorkerBlacklist
CreateWorkerBlackList creates a new WorkerBlackList instance.
func (*WorkerBlacklist) Add ¶
func (wbl *WorkerBlacklist) Add(workerID bson.ObjectId, task *Task) error
Add makes it impossible for the worker to run tasks of the same type on the same job.
func (*WorkerBlacklist) BlacklistForWorker ¶
func (wbl *WorkerBlacklist) BlacklistForWorker(workerID bson.ObjectId) M
BlacklistForWorker returns a partial MongoDB query that can be used to filter out blacklisted tasks.
func (*WorkerBlacklist) EnsureDBIndices ¶
func (wbl *WorkerBlacklist) EnsureDBIndices()
EnsureDBIndices ensures the MongoDB indices are there.
func (*WorkerBlacklist) RemoveLine ¶
func (wbl *WorkerBlacklist) RemoveLine(workerID bson.ObjectId, jobID bson.ObjectId, taskType string) error
RemoveLine removes a single blacklist entry. This is a no-op if the entry doesn't exist.
func (*WorkerBlacklist) WorkersLeft ¶
func (wbl *WorkerBlacklist) WorkersLeft(jobID bson.ObjectId, taskType string) map[bson.ObjectId]bool
WorkersLeft returns the IDs of workers NOT blacklisted for this task type on this job.
type WorkerBlacklistEntry ¶
type WorkerBlacklistEntry struct { Created time.Time `bson:"_created" json:"_created"` WorkerID bson.ObjectId `bson:"worker_id" json:"worker_id,omitempty"` JobID bson.ObjectId `bson:"job_id" json:"job_id"` TaskType string `bson:"task_type" json:"task_type"` }
WorkerBlacklistEntry prevents a certain worker from running certain task types on certain jobs.
type WorkerRef ¶
type WorkerRef struct { // ID is the worker's ID, and is the actual reference. It is not guaranteed to exist because workers can be deleted. ID bson.ObjectId `bson:"id" json:"id"` // Identifier is the human-readable identification of the worker (IP address + nickname). Identifier string `bson:"identifier" json:"identifier"` }
WorkerRef is a reference to a worker.
type WorkerRegistration ¶
type WorkerRegistration struct { Secret string `json:"secret"` Platform string `json:"platform"` SupportedTaskTypes []string `json:"supported_task_types"` Nickname string `json:"nickname"` }
WorkerRegistration is sent by the Worker to register itself at this Manager.
type WorkerRemover ¶
type WorkerRemover struct {
// contains filtered or unexported fields
}
WorkerRemover periodically removes offline workers.
func CreateWorkerRemover ¶
func CreateWorkerRemover(config *Conf, session *mgo.Session, scheduler *TaskScheduler) *WorkerRemover
CreateWorkerRemover creates a WorkerRemover, or returns nil if the configuration disables automatic worker removal.
func (*WorkerRemover) Close ¶
func (wr *WorkerRemover) Close()
Close signals the WorkerRemover goroutine to stop and waits for it to close.
func (*WorkerRemover) Go ¶
func (wr *WorkerRemover) Go()
Go starts a goroutine that periodically checks workers.
type WorkerSignonDoc ¶
type WorkerSignonDoc struct { SupportedTaskTypes []string `json:"supported_task_types,omitempty"` Nickname string `json:"nickname,omitempty"` }
WorkerSignonDoc is sent by the Worker upon sign-on.
type WorkerStatus ¶
type WorkerStatus struct { // For controlling sleeping & waking up. For values, see the workerStatusXXX constants. StatusRequested string `bson:"status_requested" json:"status_requested"` }
WorkerStatus indicates that a status change was requested on the worker. It is sent as response by the scheduler when a worker is not allowed to receive a new task.
Source Files
¶
- blacklist.go
- closable.go
- dashboard.go
- db.go
- documents.go
- fswatcher.go
- fswatcher_http.go
- fswatcher_middle.go
- http.go
- log_rotation.go
- log_server.go
- math.go
- scheduler.go
- settings.go
- sleep_scheduler.go
- task_cleanup.go
- task_log_uploader.go
- task_updates.go
- testtask.go
- time_of_day.go
- timeout_checker.go
- timer.go
- upstream.go
- upstream_notification.go
- varrepl.go
- worker_registration.go
- worker_remover.go
- workers.go
Directories
¶
Path | Synopsis |
---|---|
Package chantools was obtained from https://github.com/theepicsnail/goChanTools and subsequently altered for our needs.
|
Package chantools was obtained from https://github.com/theepicsnail/goChanTools and subsequently altered for our needs. |
Package slugify provide a function that gives a non accentuated and minus separated string from a accentuated string.
|
Package slugify provide a function that gives a non accentuated and minus separated string from a accentuated string. |