flamenco

package
v2.1.1+incompatible
Published: Feb 22, 2018 License: GPL-2.0 Imports: 31 Imported by: 2

Documentation

Overview

Package flamenco receives task updates from workers, queues them, and forwards them to the Flamenco Server.

Package flamenco periodically fetches new tasks from the Flamenco Server, and sends updates back.

Constants

const IsoFormat = "2006-01-02T15:04:05-0700"

IsoFormat is used for timestamp parsing
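
As an illustration of how a layout like IsoFormat is used with the standard time package, here is a minimal sketch. The constant is copied locally so the example is self-contained; the helper names parseISO and roundTripUTC are hypothetical and not part of this package.

```go
package main

import (
	"fmt"
	"time"
)

// isoFormat mirrors the IsoFormat constant documented above.
const isoFormat = "2006-01-02T15:04:05-0700"

// parseISO parses a timestamp in isoFormat and converts it to UTC.
func parseISO(s string) (time.Time, error) {
	t, err := time.Parse(isoFormat, s)
	if err != nil {
		return time.Time{}, err
	}
	return t.UTC(), nil
}

// roundTripUTC parses s, converts it to UTC and formats it again
// with the same layout. Hypothetical helper for demonstration only.
func roundTripUTC(s string) string {
	t, err := parseISO(s)
	if err != nil {
		return "error: " + err.Error()
	}
	return t.Format(isoFormat)
}

func main() {
	// The UTC offset is written without a colon, as the layout requires.
	fmt.Println(roundTripUTC("2018-02-22T10:30:00+0100"))
}
```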

Variables

var (
	// ErrDuplicateVariables is returned when the same name is used as regular and path-replacement variable.
	ErrDuplicateVariables = errors.New("duplicate variables found")
)

Functions

func CleanSlate

func CleanSlate(db *mgo.Database)

CleanSlate erases all tasks in the flamenco_tasks collection.

func ConvertAndForward

func ConvertAndForward(images <-chan string, storagePath string) <-chan string

ConvertAndForward copies each image it reads from 'images', converts it to a browser-friendly file, and forwards the new filename to the returned channel. It always converts to JPEG, even when the file is a browser-supported format (like PNG), so that the HTML can always refer to /static/latest-image.jpg to show the latest render.
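
The channel-in, channel-out shape described above is a common pipeline stage. The sketch below shows that pattern only; it is not the package's implementation. It takes a conversion callback instead of a storagePath, the toJPEGName function merely computes an output name rather than converting pixels, and closing the output channel when the input closes is an assumption.

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// convertAndForward reads paths from images, applies convert to each one,
// and forwards the resulting filename on the returned channel.
// Assumption: the output channel is closed once the input channel is closed.
func convertAndForward(images <-chan string, convert func(string) (string, error)) <-chan string {
	output := make(chan string)
	go func() {
		defer close(output)
		for path := range images {
			converted, err := convert(path)
			if err != nil {
				continue // skip files that cannot be converted
			}
			output <- converted
		}
	}()
	return output
}

// toJPEGName is a hypothetical stand-in for real image conversion:
// it only derives the name of the JPEG file.
func toJPEGName(path string) (string, error) {
	ext := filepath.Ext(path)
	if ext == "" {
		return "", fmt.Errorf("no extension: %s", path)
	}
	return strings.TrimSuffix(path, ext) + ".jpg", nil
}

// run pushes the given paths through the stage and collects the results.
func run(paths ...string) []string {
	images := make(chan string)
	go func() {
		defer close(images)
		for _, p := range paths {
			images <- p
		}
	}()
	var results []string
	for name := range convertAndForward(images, toJPEGName) {
		results = append(results, name)
	}
	return results
}

func main() {
	fmt.Println(run("frame-001.png", "README", "frame-002.exr"))
}
```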

func Count

func Count(coll *mgo.Collection) (int, error)

Count returns the number of documents in the given collection.

func DecodeJSON

func DecodeJSON(w http.ResponseWriter, r io.Reader, document interface{},
	logprefix string) error

DecodeJSON decodes JSON from an io.Reader, and writes a Bad Request status if it fails.
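
A helper with this signature could be written roughly as follows. This is a sketch of the documented behaviour under assumptions, not the package's source: the log message and response body text are invented, and statusFor is a hypothetical helper that records the HTTP status with net/http/httptest.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"net/http/httptest"
	"strings"
)

// decodeJSON decodes JSON from r into document. On failure it logs the
// error and writes a 400 Bad Request response to w.
func decodeJSON(w http.ResponseWriter, r io.Reader, document interface{}, logprefix string) error {
	if err := json.NewDecoder(r).Decode(document); err != nil {
		log.Printf("%s: unable to decode JSON: %v", logprefix, err)
		http.Error(w, "unable to decode JSON", http.StatusBadRequest)
		return err
	}
	return nil
}

// statusFor feeds body through decodeJSON and returns the recorded status.
func statusFor(body string) int {
	rec := httptest.NewRecorder()
	var doc struct {
		Nickname string `json:"nickname"`
	}
	_ = decodeJSON(rec, strings.NewReader(body), &doc, "example")
	return rec.Code
}

func main() {
	fmt.Println(statusFor(`{"nickname": "render-01"}`)) // valid document
	fmt.Println(statusFor(`{not json`))                 // malformed document
}
```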

func ImageWatcherHTTPPush

func ImageWatcherHTTPPush(w http.ResponseWriter, r *http.Request, broadcaster *chantools.OneToManyChan)

ImageWatcherHTTPPush starts a server-side events channel.

func IsRunnableTaskStatus

func IsRunnableTaskStatus(status string) bool

IsRunnableTaskStatus returns whether the given status is considered "runnable".

func LogTaskActivity

func LogTaskActivity(worker *Worker, task *Task, activity, logLine string, db *mgo.Database)

LogTaskActivity creates and queues a TaskUpdate to store activity and a log line.

func MaxInt

func MaxInt(a, b int) int

MaxInt returns the maximum of a and b.

func MongoSession

func MongoSession(config *Conf) *mgo.Session

MongoSession returns a MongoDB session.

The database name should be configured in the database URL. You can then access this default database with session.DB("").

func PurgeOutgoingQueue

func PurgeOutgoingQueue(db *mgo.Database)

PurgeOutgoingQueue erases all queued task updates from the local DB.

func QueueTaskUpdate

func QueueTaskUpdate(tupdate *TaskUpdate, db *mgo.Database) error

QueueTaskUpdate queues the task update, without any extra updates.

func QueueTaskUpdateFromWorker

func QueueTaskUpdateFromWorker(w http.ResponseWriter, r *auth.AuthenticatedRequest,
	db *mgo.Database, taskID bson.ObjectId)

QueueTaskUpdateFromWorker receives a task update from a worker, and queues it for sending to Flamenco Server.

func QueueTaskUpdateWithExtra

func QueueTaskUpdateWithExtra(tupdate *TaskUpdate, db *mgo.Database, extraUpdates bson.M) error

QueueTaskUpdateWithExtra does the same as QueueTaskUpdate(), but with extra updates to the local flamenco_tasks collection.

func RegisterWorker

func RegisterWorker(w http.ResponseWriter, r *http.Request, db *mgo.Database)

RegisterWorker creates a new Worker in the DB, based on the WorkerRegistration document received.

func ReplaceLocal

func ReplaceLocal(strvalue string, config *Conf) string

ReplaceLocal performs variable and path replacement for strings based on the local platform.

func ReplaceVariables

func ReplaceVariables(config *Conf, task *Task, worker *Worker)

ReplaceVariables performs variable and path replacement for tasks.

func SaveSettings

func SaveSettings(db *mgo.Database, settings *SettingsInMongo)

SaveSettings stores the given settings in MongoDB.

func SendJSON

func SendJSON(logprefix, method string, url *url.URL,
	payload interface{},
	tweakrequest func(req *http.Request),
	responsehandler func(resp *http.Response, body []byte) error,
) error

SendJSON sends a JSON document to some URL via HTTP.

tweakrequest can be used to tweak the request before sending it, for example by adding authentication headers. May be nil.

responsehandler is called when a non-error response has been read. May be nil.
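
To show how the two optional callbacks fit together, here is a simplified sketch with the same callback shapes. It is not the package's implementation: the logprefix parameter is dropped, the URL is a plain string, treating status codes of 300 and above as errors is an assumption, and the credentials and test server are made up for the demonstration.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// sendJSON marshals payload to JSON and sends it to url.
// Both tweakrequest and responsehandler may be nil.
func sendJSON(method, url string, payload interface{},
	tweakrequest func(req *http.Request),
	responsehandler func(resp *http.Response, body []byte) error,
) error {
	payloadBytes, err := json.Marshal(payload)
	if err != nil {
		return err
	}
	req, err := http.NewRequest(method, url, bytes.NewReader(payloadBytes))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")
	if tweakrequest != nil {
		tweakrequest(req) // e.g. add authentication headers
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return err
	}
	if resp.StatusCode >= 300 { // assumption: these count as error responses
		return fmt.Errorf("error %d sending JSON to %s", resp.StatusCode, url)
	}
	if responsehandler != nil {
		return responsehandler(resp, body)
	}
	return nil
}

// demo sends a document to a throwaway local server that replies with
// the basic-auth user name it received.
func demo() string {
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		user, _, _ := r.BasicAuth()
		fmt.Fprint(w, "hello "+user)
	}))
	defer server.Close()

	var reply string
	err := sendJSON("POST", server.URL, map[string]int{"nr_of_workers": 3},
		func(req *http.Request) { req.SetBasicAuth("manager-id", "manager-secret") },
		func(resp *http.Response, body []byte) error { reply = string(body); return nil },
	)
	if err != nil {
		return "error: " + err.Error()
	}
	return reply
}

func main() {
	fmt.Println(demo())
}
```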

func SendTestJob

func SendTestJob(worker *Worker, conf *Conf, db *mgo.Database) (string, error)

SendTestJob constructs a test job definition at the Server, which queues it for the worker to pick up.

func StoreNewWorker

func StoreNewWorker(winfo *Worker, db *mgo.Database) error

StoreNewWorker saves the given worker in the database.

func TaskStatusTransitionValid

func TaskStatusTransitionValid(taskColl *mgo.Collection, taskID bson.ObjectId, newStatus string) bool

TaskStatusTransitionValid performs a query on the database to determine the current status, then checks whether the new status is acceptable.

func TemplatePathPrefix

func TemplatePathPrefix(fileToFind string) string

TemplatePathPrefix returns the filename prefix to find template files. Templates are searched for relative to the current working directory as well as relative to the currently running executable.

func TimeoutAfter

func TimeoutAfter(duration time.Duration) chan bool

TimeoutAfter sends a 'true' to the channel after the given timeout.

Send a 'false' to the channel yourself if you want to notify the receiver that a timeout didn't happen.

The channel is buffered with size 2, so both your 'false' and this routine's 'true' write won't block.
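
The buffered-channel behaviour described above can be sketched as follows. This is a reimplementation for illustration, not the package's source; timedOut is a hypothetical caller that races some work against the timeout.

```go
package main

import (
	"fmt"
	"time"
)

// timeoutAfter returns a channel that receives 'true' after duration.
// The buffer of size 2 leaves room for both this goroutine's 'true'
// and a caller's 'false', so neither send blocks.
func timeoutAfter(duration time.Duration) chan bool {
	timeout := make(chan bool, 2)
	go func() {
		time.Sleep(duration)
		timeout <- true
	}()
	return timeout
}

// timedOut reports whether the timeout fired before the simulated work
// finished. Both arguments are in milliseconds.
func timedOut(workMs, limitMs int) bool {
	timeout := timeoutAfter(time.Duration(limitMs) * time.Millisecond)
	go func() {
		time.Sleep(time.Duration(workMs) * time.Millisecond)
		timeout <- false // work finished: signal that no timeout happened
	}()
	return <-timeout
}

func main() {
	fmt.Println(timedOut(10, 500)) // work finishes well before the limit
	fmt.Println(timedOut(500, 10)) // limit expires well before the work
}
```

Only the first value read from the channel is meaningful; the later write stays in the buffer and is discarded with the channel.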

func Timer

func Timer(name string, sleepDuration, initialDelay time.Duration, closable *closable) <-chan struct{}

Timer is a generic timer for periodic signals.

func UtcNow

func UtcNow() *time.Time

UtcNow returns the current time & date in UTC.

func WorkerAckStatusChange

func WorkerAckStatusChange(w http.ResponseWriter, r *auth.AuthenticatedRequest, db *mgo.Database, ackStatus string)

WorkerAckStatusChange allows a Worker to acknowledge a requested status change.

func WorkerCount

func WorkerCount(db *mgo.Database) int

WorkerCount returns the number of registered workers.

func WorkerGetStatusChange

func WorkerGetStatusChange(w http.ResponseWriter, r *auth.AuthenticatedRequest, db *mgo.Database)

WorkerGetStatusChange allows a Worker to fetch any pending status change.

func WorkerMayRunTask

func WorkerMayRunTask(w http.ResponseWriter, r *auth.AuthenticatedRequest,
	db *mgo.Database, taskID bson.ObjectId)

WorkerMayRunTask tells the worker whether it's allowed to keep running the given task.

func WorkerPingedTask

func WorkerPingedTask(workerID bson.ObjectId, taskID bson.ObjectId, taskStatus string, db *mgo.Database)

WorkerPingedTask marks the task as pinged by the worker. If worker_id is not nil, sets the worker_id field of the task. Otherwise doesn't touch that field and only updates last_worker_ping.

func WorkerSecret

func WorkerSecret(user string, db *mgo.Database) string

WorkerSecret returns the hashed secret of the worker.

func WorkerSignOff

func WorkerSignOff(w http.ResponseWriter, r *auth.AuthenticatedRequest, db *mgo.Database)

WorkerSignOff re-queues all active tasks (should be only one) that are assigned to this worker.

func WorkerSignOn

func WorkerSignOn(w http.ResponseWriter, r *auth.AuthenticatedRequest, db *mgo.Database)

WorkerSignOn allows a Worker to register a new list of supported task types. It also clears the worker's "current" task from the dashboard, so that it's clear that the now-active worker is not actually working on that task.

Types

type BlenderRenderConfig

type BlenderRenderConfig struct {
	JobStorage   string `yaml:"job_storage"`
	RenderOutput string `yaml:"render_output"`
}

BlenderRenderConfig represents the configuration required for a test render.

type Command

type Command struct {
	Name     string `bson:"name" json:"name"`
	Settings bson.M `bson:"settings" json:"settings"`
}

Command is an executable part of a Task

type Conf

type Conf struct {
	DatabaseURL   string   `yaml:"database_url"`
	DatabasePath  string   `yaml:"database_path"`
	Listen        string   `yaml:"listen"`
	OwnURL        string   `yaml:"own_url"`
	FlamencoStr   string   `yaml:"flamenco"`
	Flamenco      *url.URL `yaml:"-"`
	ManagerID     string   `yaml:"manager_id"`
	ManagerSecret string   `yaml:"manager_secret"`
	TLSKey        string   `yaml:"tlskey"`
	TLSCert       string   `yaml:"tlscert"`

	DownloadTaskSleep time.Duration `yaml:"download_task_sleep"`

	/* The number of seconds between rechecks when there are no more tasks for workers.
	 * If set to 0, will not throttle at all.
	 * If set to -1, will never check when a worker asks for a task (so only every
	 * download_task_sleep_seconds seconds). */
	DownloadTaskRecheckThrottle time.Duration `yaml:"download_task_recheck_throttle"`

	/* Variables, stored differently in YAML and these settings.
	 * Variables:             variable name -> platform -> value
	 * VariablesPerPlatform:  platform -> variable name -> value
	 */
	VariablesByVarname  map[string]map[string]string `yaml:"variables"`
	VariablesByPlatform map[string]map[string]string `yaml:"-"`

	PathReplacementByVarname  map[string]map[string]string `yaml:"path_replacement"`
	PathReplacementByPlatform map[string]map[string]string `yaml:"-"`

	TaskUpdatePushMaxInterval time.Duration `yaml:"task_update_push_max_interval"`
	TaskUpdatePushMaxCount    int           `yaml:"task_update_push_max_count"`
	CancelTaskFetchInterval   time.Duration `yaml:"cancel_task_fetch_max_interval"`

	ActiveTaskTimeoutInterval   time.Duration `yaml:"active_task_timeout_interval"`
	ActiveWorkerTimeoutInterval time.Duration `yaml:"active_worker_timeout_interval"`

	TaskCleanupMaxAge time.Duration `yaml:"task_cleanup_max_age"`

	WatchForLatestImage string `yaml:"watch_for_latest_image"`

	SSDPDiscovery  bool   `yaml:"ssdp_discovery"`
	SSDPDeviceUUID string `yaml:"ssdp_device_uuid"`

	TestTasks TestTasks `yaml:"test_tasks"`
}

Conf represents the Manager's configuration file.
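
The struct keeps each variable map twice: keyed by variable name (as stored in YAML) and keyed by platform (not stored, marked yaml:"-"). One plausible way to derive the second form from the first is a simple transposition. The sketch below assumes that is what happens after loading; the function name byPlatform and the example paths are hypothetical.

```go
package main

import "fmt"

// byPlatform transposes varname -> platform -> value
// into platform -> varname -> value.
func byPlatform(byVarname map[string]map[string]string) map[string]map[string]string {
	result := make(map[string]map[string]string)
	for varname, perPlatform := range byVarname {
		for platform, value := range perPlatform {
			if result[platform] == nil {
				result[platform] = make(map[string]string)
			}
			result[platform][varname] = value
		}
	}
	return result
}

func main() {
	vars := map[string]map[string]string{
		"blender": {"linux": "/opt/blender/blender", "darwin": "/Applications/Blender.app/blender"},
		"ffmpeg":  {"linux": "/usr/bin/ffmpeg"},
	}
	fmt.Println(byPlatform(vars)["linux"]["blender"])
}
```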

func GetConf

func GetConf() (Conf, error)

GetConf parses flamenco-manager.yaml and returns its contents as a Conf object.

func GetTestConfig

func GetTestConfig() Conf

GetTestConfig returns the configuration for unit tests.

func LoadConf

func LoadConf(filename string) (Conf, error)

LoadConf parses the given file and returns its contents as a Conf object.

func (*Conf) HasTLS

func (c *Conf) HasTLS() bool

HasTLS returns true if both the TLS certificate and key files are configured.
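
A check like this is typically a one-liner. The sketch below uses a reduced local struct and assumes that "configured" means "non-empty string"; the file names are made up.

```go
package main

import "fmt"

// conf is a reduced stand-in for Conf with only the TLS fields.
type conf struct {
	TLSKey  string `yaml:"tlskey"`
	TLSCert string `yaml:"tlscert"`
}

// hasTLS returns true only when both the certificate and the key are set.
func (c *conf) hasTLS() bool {
	return c.TLSCert != "" && c.TLSKey != ""
}

func main() {
	onlyCert := &conf{TLSCert: "manager.crt"}
	both := &conf{TLSCert: "manager.crt", TLSKey: "manager.key"}
	fmt.Println(onlyCert.hasTLS(), both.hasTLS())
}
```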

func (*Conf) Overwrite

func (c *Conf) Overwrite() error

Overwrite stores this configuration object as flamenco-manager.yaml.

func (*Conf) Write

func (c *Conf) Write(filename string) error

Write saves the current in-memory configuration to a YAML file.

type FileProduced

type FileProduced struct {
	Paths []string `json:"paths"`
}

FileProduced is sent by the worker whenever it produces (e.g. renders) some file. This hooks into the fswatcher system.

type ImageWatcher

type ImageWatcher struct {

	// The public channel, from which can only be read.
	ImageCreated <-chan string
	// contains filtered or unexported fields
}

ImageWatcher watches a filesystem directory.

func CreateImageWatcher

func CreateImageWatcher(pathToWatch string, bufferSize int) *ImageWatcher

CreateImageWatcher creates a new ImageWatcher for the given directory. bufferSize is the size of the iw.ImageCreated channel.

func (*ImageWatcher) Close

func (iw *ImageWatcher) Close()

Close cleanly shuts down the watcher.

func (*ImageWatcher) Go

func (iw *ImageWatcher) Go()

Go starts the watcher in a separate goroutine.

type LatestImageSystem

type LatestImageSystem struct {
	// contains filtered or unexported fields
}

LatestImageSystem ties an ImageWatcher to the fswatcher_middle and fswatcher_http code, allowing the results to be pushed via HTTP to browsers.

func CreateLatestImageSystem

func CreateLatestImageSystem(watchPath string) *LatestImageSystem

CreateLatestImageSystem sets up a LatestImageSystem

func (*LatestImageSystem) AddRoutes

func (lis *LatestImageSystem) AddRoutes(router *mux.Router, workerAuth *auth.BasicAuth)

AddRoutes adds the HTTP Server-Side Events endpoint to the router.

func (*LatestImageSystem) Close

func (lis *LatestImageSystem) Close()

Close gracefully shuts down the image watcher, if the path to watch isn't empty.

func (*LatestImageSystem) Go

func (lis *LatestImageSystem) Go()

Go starts the image watcher, if the path to watch isn't empty.

type M

type M bson.M

M is a shortcut for bson.M to make longer queries easier to read.

type MayKeepRunningResponse

type MayKeepRunningResponse struct {
	MayKeepRunning bool   `json:"may_keep_running"`
	Reason         string `json:"reason,omitempty"`

	// For controlling sleeping & waking up. For values, see the workerStatusXXX constants.
	StatusRequested string `json:"status_requested,omitempty"`
}

MayKeepRunningResponse is sent to workers to indicate whether they can keep running their task.
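
To make the effect of the omitempty tags concrete, the following sketch encodes two responses with encoding/json. The struct is copied from the definition above; the reason text is an invented example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// MayKeepRunningResponse is copied from the type definition above.
type MayKeepRunningResponse struct {
	MayKeepRunning  bool   `json:"may_keep_running"`
	Reason          string `json:"reason,omitempty"`
	StatusRequested string `json:"status_requested,omitempty"`
}

// encode returns the JSON representation of resp as a string.
func encode(resp MayKeepRunningResponse) string {
	b, err := json.Marshal(resp)
	if err != nil {
		return "error: " + err.Error()
	}
	return string(b)
}

func main() {
	// Empty Reason and StatusRequested are left out of the document.
	fmt.Println(encode(MayKeepRunningResponse{MayKeepRunning: true}))
	// may_keep_running has no omitempty, so false is still written.
	fmt.Println(encode(MayKeepRunningResponse{Reason: "task was cancelled"}))
}
```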

type Reporter

type Reporter struct {
	// contains filtered or unexported fields
}

Reporter can show HTML and JSON reports.

func CreateReporter

func CreateReporter(config *Conf, session *mgo.Session, flamencoVersion string) *Reporter

CreateReporter creates a new Reporter object.

func (*Reporter) AddRoutes

func (rep *Reporter) AddRoutes(router *mux.Router)

AddRoutes adds routes to serve reporting status requests.

type ScheduledTasks

type ScheduledTasks struct {
	Depsgraph []Task `json:"depsgraph"`
}

ScheduledTasks contains a dependency graph response from Server.

type SettingsInMongo

type SettingsInMongo struct {
	DepsgraphLastModified *string `bson:"depsgraph_last_modified"`
}

SettingsInMongo contains settings we want to be able to update from within Flamenco Manager itself, so those are stored in MongoDB.

func GetSettings

func GetSettings(db *mgo.Database) *SettingsInMongo

GetSettings returns the settings as saved in our MongoDB.

type StartupNotification

type StartupNotification struct {
	// Settings
	ManagerURL               string                       `json:"manager_url"`
	VariablesByVarname       map[string]map[string]string `json:"variables"`
	PathReplacementByVarname map[string]map[string]string `json:"path_replacement"`

	// From our local database
	NumberOfWorkers int `json:"nr_of_workers"`
}

StartupNotification is sent to the upstream Flamenco Server upon startup. This is a combination of settings (see settings.go) and information from the database.

type StartupNotifier

type StartupNotifier struct {
	// contains filtered or unexported fields
}

StartupNotifier sends a signal to Flamenco Server that we've started.

func CreateStartupNotifier

func CreateStartupNotifier(config *Conf, upstream *UpstreamConnection, session *mgo.Session) *StartupNotifier

CreateStartupNotifier creates a new startup notifier.

func (*StartupNotifier) Close

func (sn *StartupNotifier) Close()

Close performs a clean shutdown.

func (*StartupNotifier) Go

func (sn *StartupNotifier) Go()

Go sends a StartupNotification document to the upstream Flamenco Server. It keeps trying in a goroutine until the notification is successful.

type StatusReport

type StatusReport struct {
	NrOfWorkers int      `json:"nr_of_workers"`
	NrOfTasks   int      `json:"nr_of_tasks"`
	Version     string   `json:"version"`
	Workers     []Worker `json:"workers"`
	Server      string   `json:"server"`
}

StatusReport is sent in response to a query on the / URL.

type Task

type Task struct {
	ID          bson.ObjectId   `bson:"_id,omitempty" json:"_id,omitempty"`
	Etag        string          `bson:"_etag,omitempty" json:"_etag,omitempty"`
	Job         bson.ObjectId   `bson:"job,omitempty" json:"job"`
	Manager     bson.ObjectId   `bson:"manager,omitempty" json:"manager"`
	Project     bson.ObjectId   `bson:"project,omitempty" json:"project"`
	User        bson.ObjectId   `bson:"user,omitempty" json:"user"`
	Name        string          `bson:"name" json:"name"`
	Status      string          `bson:"status" json:"status"`
	Priority    int             `bson:"priority" json:"priority"`
	JobPriority int             `bson:"job_priority" json:"job_priority"`
	JobType     string          `bson:"job_type" json:"job_type"`
	TaskType    string          `bson:"task_type" json:"task_type"`
	Commands    []Command       `bson:"commands" json:"commands"`
	Log         string          `bson:"log,omitempty" json:"log,omitempty"`
	Activity    string          `bson:"activity,omitempty" json:"activity,omitempty"`
	Parents     []bson.ObjectId `bson:"parents,omitempty" json:"parents,omitempty"`
	Worker      string          `bson:"worker,omitempty" json:"worker,omitempty"`

	// Internal bookkeeping
	WorkerID       *bson.ObjectId `bson:"worker_id,omitempty" json:"-"`        // The worker assigned to this task.
	LastWorkerPing *time.Time     `bson:"last_worker_ping,omitempty" json:"-"` // When a worker last said it was working on this. Might not have been a task update.
	LastUpdated    *time.Time     `bson:"last_updated,omitempty" json:"-"`     // when we have last seen an update.
}

Task contains a Flamenco task, with some BSON-only fields for local Manager use.

type TaskCleaner

type TaskCleaner struct {
	// contains filtered or unexported fields
}

TaskCleaner periodically deletes tasks that haven't been touched in a long time.

func CreateTaskCleaner

func CreateTaskCleaner(config *Conf, session *mgo.Session) *TaskCleaner

CreateTaskCleaner creates a new TaskCleaner with default timings.

func (*TaskCleaner) Close

func (tc *TaskCleaner) Close()

Close gracefully shuts down the task cleaner goroutine.

func (*TaskCleaner) Go

func (tc *TaskCleaner) Go()

Go starts a new goroutine to perform the periodic checking.

type TaskScheduler

type TaskScheduler struct {
	// contains filtered or unexported fields
}

TaskScheduler offers tasks to Workers when they ask for them.

func CreateTaskScheduler

func CreateTaskScheduler(config *Conf, upstream *UpstreamConnection, session *mgo.Session) *TaskScheduler

CreateTaskScheduler constructs a new TaskScheduler, including private fields.

func (*TaskScheduler) ScheduleTask

func (ts *TaskScheduler) ScheduleTask(w http.ResponseWriter, r *auth.AuthenticatedRequest)

ScheduleTask assigns a task to a worker.

type TaskUpdate

type TaskUpdate struct {
	ID                        bson.ObjectId `bson:"_id" json:"_id"`
	TaskID                    bson.ObjectId `bson:"task_id" json:"task_id,omitempty"`
	TaskStatus                string        `bson:"task_status,omitempty" json:"task_status,omitempty"`
	ReceivedOnManager         time.Time     `bson:"received_on_manager" json:"received_on_manager"`
	Activity                  string        `bson:"activity,omitempty" json:"activity,omitempty"`
	TaskProgressPercentage    int           `bson:"task_progress_percentage" json:"task_progress_percentage"`
	CurrentCommandIdx         int           `bson:"current_command_idx" json:"current_command_idx"`
	CommandProgressPercentage int           `bson:"command_progress_percentage" json:"command_progress_percentage"`
	Log                       string        `bson:"log,omitempty" json:"log,omitempty"`
	Worker                    string        `bson:"worker" json:"worker"`
	// contains filtered or unexported fields
}

TaskUpdate is both sent from Worker to Manager, as well as from Manager to Server.

type TaskUpdatePusher

type TaskUpdatePusher struct {
	// contains filtered or unexported fields
}

TaskUpdatePusher pushes queued task updates to the Flamenco Server.

func CreateTaskUpdatePusher

func CreateTaskUpdatePusher(config *Conf, upstream *UpstreamConnection, session *mgo.Session) *TaskUpdatePusher

CreateTaskUpdatePusher creates a new task update pusher that runs in a separate goroutine.

func (*TaskUpdatePusher) Close

func (pusher *TaskUpdatePusher) Close()

Close closes the task update pusher by stopping all timers & goroutines.

func (*TaskUpdatePusher) Go

func (pusher *TaskUpdatePusher) Go()

Go starts the goroutine.

type TaskUpdateResponse

type TaskUpdateResponse struct {
	ModifiedCount    int             `json:"modified_count"`
	HandledUpdateIds []bson.ObjectId `json:"handled_update_ids,omitempty"`
	CancelTasksIds   []bson.ObjectId `json:"cancel_task_ids,omitempty"`
}

TaskUpdateResponse is received from Server.

type TestTasks

type TestTasks struct {
	BlenderRender BlenderRenderConfig `yaml:"test_blender_render"`
}

TestTasks represents the 'test_tasks' key in the Manager's configuration file.

type TimeoutChecker

type TimeoutChecker struct {
	// contains filtered or unexported fields
}

TimeoutChecker periodically times out tasks and workers if the worker hasn't sent any update recently.

func CreateTimeoutChecker

func CreateTimeoutChecker(config *Conf, session *mgo.Session) *TimeoutChecker

CreateTimeoutChecker creates a new TimeoutChecker.

func (*TimeoutChecker) Close

func (ttc *TimeoutChecker) Close()

Close gracefully shuts down the task timeout checker goroutine.

func (*TimeoutChecker) Go

func (ttc *TimeoutChecker) Go()

Go starts a new goroutine to perform the periodic checking.

type UpstreamConnection

type UpstreamConnection struct {
	// contains filtered or unexported fields
}

UpstreamConnection represents a connection to an upstream Flamenco Server.

func ConnectUpstream

func ConnectUpstream(config *Conf, session *mgo.Session) *UpstreamConnection

ConnectUpstream creates a new UpstreamConnection object and starts the task download loop.

func (*UpstreamConnection) Close

func (uc *UpstreamConnection) Close()

Close gracefully closes the upstream connection by stopping all upload/download loops.

func (*UpstreamConnection) KickDownloader

func (uc *UpstreamConnection) KickDownloader(synchronous bool)

KickDownloader fetches new tasks from the Flamenco Server.

func (*UpstreamConnection) RefetchTask

func (uc *UpstreamConnection) RefetchTask(task *Task) bool

RefetchTask re-fetches a task from the Server, but only if its etag changed.

  • If the etag changed, the differences between the old and new status are handled.
  • If the Server cannot be reached, this error is ignored and the task is untouched.
  • If the Server returns an error code other than 500 Internal Server Error (Forbidden, Not Found, etc.), the task is removed from the local task queue.

If the task was untouched, this function returns false. If it was changed or removed, this function returns true.
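
The rules above amount to a small decision table. The sketch below encodes it as a pure function for illustration only. Two points are assumptions rather than documented behaviour: that an unchanged etag shows up as 304 Not Modified, and that a 500 response leaves the task untouched. All names are hypothetical.

```go
package main

import (
	"fmt"
	"net/http"
)

// outcome describes what happened to the locally queued task.
type outcome string

const (
	untouched outcome = "untouched"
	updated   outcome = "updated"
	removed   outcome = "removed"
)

// classify maps the result of a re-fetch attempt onto an outcome.
// reachable is false when the Server could not be contacted at all.
func classify(reachable bool, statusCode int) outcome {
	switch {
	case !reachable:
		return untouched // connection errors are ignored
	case statusCode == http.StatusNotModified:
		return untouched // assumption: unchanged etag is reported as 304
	case statusCode == http.StatusInternalServerError:
		return untouched // assumption: 500 is the one error that keeps the task
	case statusCode >= 400:
		return removed // Forbidden, Not Found, etc.
	default:
		return updated // etag changed; status differences are handled
	}
}

// changed mirrors the documented boolean result: false when the task
// was untouched, true when it was changed or removed.
func changed(reachable bool, statusCode int) bool {
	return classify(reachable, statusCode) != untouched
}

func main() {
	fmt.Println(changed(false, 0), changed(true, 200), changed(true, 404), changed(true, 500))
}
```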

func (*UpstreamConnection) ResolveURL

func (uc *UpstreamConnection) ResolveURL(relativeURL string, a ...interface{}) (*url.URL, error)

ResolveURL returns the given URL relative to the base URL of the upstream server, as absolute URL.
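
The variadic parameter suggests that relativeURL is treated as a format string before it is resolved against the base URL. Under that assumption, the core of such a method could look like the sketch below, built on url.Parse and URL.ResolveReference from net/url. The host name and manager ID are invented, and the base URL is passed explicitly instead of being read from an UpstreamConnection.

```go
package main

import (
	"fmt"
	"net/url"
)

// resolveURL formats relativeURL with the given arguments and resolves
// the result against base, returning an absolute URL.
func resolveURL(base *url.URL, relativeURL string, a ...interface{}) (*url.URL, error) {
	relURL, err := url.Parse(fmt.Sprintf(relativeURL, a...))
	if err != nil {
		return nil, err
	}
	return base.ResolveReference(relURL), nil
}

// resolveString is a convenience wrapper that works on plain strings.
func resolveString(base, relativeURL string, a ...interface{}) string {
	baseURL, err := url.Parse(base)
	if err != nil {
		return "error: " + err.Error()
	}
	resolved, err := resolveURL(baseURL, relativeURL, a...)
	if err != nil {
		return "error: " + err.Error()
	}
	return resolved.String()
}

func main() {
	fmt.Println(resolveString("https://flamenco.example.com/",
		"/api/flamenco/managers/%s/task-update-batch", "abc123"))
}
```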

func (*UpstreamConnection) SendJSON

func (uc *UpstreamConnection) SendJSON(logprefix, method string, url *url.URL,
	payload interface{},
	responsehandler func(resp *http.Response, body []byte) error,
) error

SendJSON sends a JSON document to the given URL.

func (*UpstreamConnection) SendTaskUpdates

func (uc *UpstreamConnection) SendTaskUpdates(updates *[]TaskUpdate) (*TaskUpdateResponse, error)

SendTaskUpdates performs a POST to /api/flamenco/managers/{manager-id}/task-update-batch to send a batch of task updates to the Server.

type Worker

type Worker struct {
	ID                 bson.ObjectId  `bson:"_id,omitempty" json:"_id,omitempty"`
	Secret             string         `bson:"-" json:"-"`
	HashedSecret       []byte         `bson:"hashed_secret" json:"-"`
	Nickname           string         `bson:"nickname" json:"nickname"`
	Address            string         `bson:"address" json:"address"`
	Status             string         `bson:"status" json:"status"`
	Platform           string         `bson:"platform" json:"platform"`
	CurrentTask        *bson.ObjectId `bson:"current_task,omitempty" json:"current_task,omitempty"`
	TimeCost           int            `bson:"time_cost" json:"time_cost"`
	LastActivity       *time.Time     `bson:"last_activity,omitempty" json:"last_activity,omitempty"`
	SupportedTaskTypes []string       `bson:"supported_task_types" json:"supported_task_types"`
	Software           string         `bson:"software" json:"software"`

	// For dashboard
	CurrentTaskStatus  string     `bson:"current_task_status,omitempty" json:"current_task_status,omitempty"`
	CurrentTaskUpdated *time.Time `bson:"current_task_updated,omitempty" json:"current_task_updated,omitempty"`

	// For controlling sleeping & waking up. For values, see the workerStatusXXX constants.
	StatusRequested string `bson:"status_requested" json:"status_requested"`
}

Worker contains all information about a specific Worker. Some fields come from the WorkerRegistration, whereas others are filled by us.

func FindWorker

func FindWorker(workerID string, projection interface{}, db *mgo.Database) (*Worker, error)

FindWorker returns the worker given its ID in string form.

func FindWorkerByID

func FindWorkerByID(workerID bson.ObjectId, db *mgo.Database) (*Worker, error)

FindWorkerByID returns the entire worker, no projections.

func (*Worker) AckStatusChange

func (worker *Worker) AckStatusChange(newStatus string, db *mgo.Database) error

AckStatusChange acknowledges the requested status change by moving it to the actual status. The "shutdown" status is the exception: it should not be acknowledged, but should result in a sign-off and thus go directly to the "offline" state.

func (*Worker) AckTimeout

func (worker *Worker) AckTimeout(db *mgo.Database) error

AckTimeout acknowledges the timeout and just sets the worker to "offline".

func (*Worker) Identifier

func (worker *Worker) Identifier() string

Identifier returns the worker's address, with the nickname in parentheses (if set).

Make sure that you include the nickname in the projection when you fetch the worker from MongoDB.
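
The sketch below illustrates why the projection matters: when the nickname field is not fetched it stays empty, and only the address is shown. The exact output format "address (nickname)" is an assumption based on the description, and the reduced worker struct is a stand-in for Worker.

```go
package main

import "fmt"

// worker is a reduced stand-in for Worker with only the fields used here.
type worker struct {
	Address  string
	Nickname string
}

// identifier returns the address, followed by the nickname in parentheses
// when one is set.
func (w *worker) identifier() string {
	if w.Nickname == "" {
		return w.Address
	}
	return fmt.Sprintf("%s (%s)", w.Address, w.Nickname)
}

func main() {
	full := &worker{Address: "10.0.0.5", Nickname: "render-01"}
	projectedWithoutNickname := &worker{Address: "10.0.0.5"}
	fmt.Println(full.identifier())
	fmt.Println(projectedWithoutNickname.identifier())
}
```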

func (*Worker) RequestStatusChange

func (worker *Worker) RequestStatusChange(newStatus string, db *mgo.Database) error

RequestStatusChange stores the new requested status in MongoDB, so that it gets picked up by the worker the next time it asks for it.

func (*Worker) Seen

func (worker *Worker) Seen(r *http.Request, db *mgo.Database)

Seen registers that we have seen this worker at a certain address and with certain software.

func (*Worker) SeenEx

func (worker *Worker) SeenEx(r *http.Request, db *mgo.Database, set bson.M, unset bson.M) error

SeenEx is the same as Seen(), but allows for extra updates on the worker in the database, and returns an error.

func (*Worker) SetAwake

func (worker *Worker) SetAwake(db *mgo.Database) error

SetAwake sets the worker status to Awake, but only if it's not already awake or testing.

func (*Worker) SetCurrentTask

func (worker *Worker) SetCurrentTask(taskID bson.ObjectId, db *mgo.Database) error

SetCurrentTask sets the worker's current task, and updates the database too.

func (*Worker) SetStatus

func (worker *Worker) SetStatus(status string, db *mgo.Database) error

SetStatus sets the worker's status, and updates the database too. Use SetAwake() instead of calling this function with status="awake".

func (*Worker) Timeout

func (worker *Worker) Timeout(db *mgo.Database)

Timeout marks the worker as timed out.

func (*Worker) TimeoutOnTask

func (worker *Worker) TimeoutOnTask(task *Task, db *mgo.Database)

TimeoutOnTask marks the worker as timed out on a given task. The task is just used for logging.

type WorkerRegistration

type WorkerRegistration struct {
	Secret             string   `json:"secret"`
	Platform           string   `json:"platform"`
	SupportedTaskTypes []string `json:"supported_task_types"`
	Nickname           string   `json:"nickname"`
}

WorkerRegistration is sent by the Worker to register itself at this Manager.
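
As an example of what such a registration document looks like on the wire, the sketch below decodes one with encoding/json. The struct is copied from the definition above; the field values, including the task type names, are invented for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// WorkerRegistration is copied from the type definition above.
type WorkerRegistration struct {
	Secret             string   `json:"secret"`
	Platform           string   `json:"platform"`
	SupportedTaskTypes []string `json:"supported_task_types"`
	Nickname           string   `json:"nickname"`
}

// parseRegistration decodes a JSON registration document.
func parseRegistration(doc string) (WorkerRegistration, error) {
	var reg WorkerRegistration
	err := json.Unmarshal([]byte(doc), &reg)
	return reg, err
}

func main() {
	reg, err := parseRegistration(`{
		"secret": "s3cr3t",
		"platform": "linux",
		"supported_task_types": ["blender-render", "sleep"],
		"nickname": "render-01"
	}`)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(reg.Nickname, reg.Platform, reg.SupportedTaskTypes)
}
```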

type WorkerSignonDoc

type WorkerSignonDoc struct {
	SupportedTaskTypes []string `json:"supported_task_types,omitempty"`
	Nickname           string   `json:"nickname,omitempty"`
}

WorkerSignonDoc is sent by the Worker upon sign-on.

type WorkerStatus

type WorkerStatus struct {
	// For controlling sleeping & waking up. For values, see the workerStatusXXX constants.
	StatusRequested string `bson:"status_requested" json:"status_requested"`
}

WorkerStatus indicates that a status change was requested on the worker. It is sent as response by the scheduler when a worker is not allowed to receive a new task.

Directories

Path Synopsis
Package chantools was obtained from https://github.com/theepicsnail/goChanTools and subsequently altered for our needs.
Package slugify provides a function that turns an accented string into an unaccented, minus-separated string.
