job

package
v0.1.0
Published: Aug 23, 2021 License: BSD-3-Clause Imports: 17 Imported by: 0

Documentation

Constants

const (
	WaitForNextElementChanCapacity = 1000

	QueueErrorCodeEmptyQueue            = "empty-queue"
	QueueErrorCodeLockedQueue           = "locked-queue"
	QueueErrorCodeIndexOutOfBounds      = "index-out-of-bounds"
	QueueErrorCodeFullCapacity          = "full-capacity"
	QueueErrorCodeInternalChannelClosed = "internal-channel-closed"
)

Variables

This section is empty.

Functions

func IsDirEmpty

func IsDirEmpty(path string) (bool, error)

IsDirEmpty reports whether the directory at path is empty.
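
The package's implementation is not shown on this page. As a minimal sketch of one common way to write such a check, the hypothetical `isDirEmpty` below asks the directory for at most one entry and treats `io.EOF` as "empty". The helper `checkBeforeAndAfter` and the temporary directory are illustrative assumptions only.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"os"
	"path/filepath"
)

// isDirEmpty is a hypothetical stand-in for IsDirEmpty; the package's real
// implementation may differ.
func isDirEmpty(path string) (bool, error) {
	f, err := os.Open(path)
	if err != nil {
		return false, err
	}
	defer f.Close()

	// Ask for at most one entry; io.EOF means the directory has none.
	_, err = f.Readdirnames(1)
	if errors.Is(err, io.EOF) {
		return true, nil
	}
	return false, err
}

// checkBeforeAndAfter creates a temporary directory, checks it, adds one
// file, and checks it again.
func checkBeforeAndAfter() (before, after bool, err error) {
	dir, err := os.MkdirTemp("", "job-example")
	if err != nil {
		return false, false, err
	}
	defer os.RemoveAll(dir)

	if before, err = isDirEmpty(dir); err != nil {
		return false, false, err
	}
	if err = os.WriteFile(filepath.Join(dir, "result.txt"), []byte("x"), 0o644); err != nil {
		return false, false, err
	}
	after, err = isDirEmpty(dir)
	return before, after, err
}

func main() {
	before, after, err := checkBeforeAndAfter()
	fmt.Println(before, after, err) // true false <nil>
}
```

Reading a single entry avoids listing the whole directory, which matters when a work directory holds many result files.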

func PrepareEnvironment

func PrepareEnvironment(cpus []int, dryRun bool) error

func RevertEnvironment

func RevertEnvironment(dryRun bool) error

RevertEnvironment sets original Procfs entries

Types

type ActiveTaskRun

type ActiveTaskRun struct {
	Task   *Task
	Runner *run.Runner

	CoreIds []int // the exact core numbers this task is using
	// contains filtered or unexported fields
}

ActiveTaskRun contains information about a particular task's run.

func NewActiveTaskRun

func NewActiveTaskRun(task *Task, run run.Run, coreIds []int, bridge *run.Bridge, dryRun bool, maxRetries int) (*ActiveTaskRun, error)

NewActiveTaskRun initializes the current task and the run step for the specified cores.

func (*ActiveTaskRun) Start

func (atr *ActiveTaskRun) Start() (int, time.Duration, error)

Start starts the task's run.

func (*ActiveTaskRun) UUID

func (atr *ActiveTaskRun) UUID() string

UUID returns the Unique ID for the task and run

type CoreMap

type CoreMap struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

CoreMap holds a reference to the task which is currently running on each core, with the core number as the index.

func NewCoreMap

func NewCoreMap(cores []int) *CoreMap

NewCoreMap creates a fixed-length map of cores with their ID as index.

func (*CoreMap) All

func (cm *CoreMap) All() map[int]*ActiveTaskRun

All returns a map of all of the cores and their tasks. Warning: concurrency must be handled by the routine which uses this method.

func (*CoreMap) FreeCores

func (cm *CoreMap) FreeCores() []int

FreeCores retrieves a list of the core numbers which are free.

func (*CoreMap) Get

func (cm *CoreMap) Get(coreId int) *ActiveTaskRun

Get retrieves the ActiveTaskRun at the coreId

func (*CoreMap) Set

func (cm *CoreMap) Set(coreId int, atr *ActiveTaskRun) error

Set updates the core ID with the task which is actively using it

func (*CoreMap) Unset

func (cm *CoreMap) Unset(coreId int)

Unset updates the core ID to be free
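
To illustrate how the Set, Unset and FreeCores operations fit together, here is a minimal sketch of a mutex-guarded core map. The type `coreMap`, the use of a string in place of `*ActiveTaskRun`, and the rule that setting an unknown core returns an error are all assumptions made for the example, not the package's actual behavior.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// coreMap is a hypothetical stand-in for CoreMap. The string value stands in
// for *ActiveTaskRun; an empty string marks a free core.
type coreMap struct {
	sync.RWMutex
	cores map[int]string
}

func newCoreMap(cores []int) *coreMap {
	cm := &coreMap{cores: make(map[int]string, len(cores))}
	for _, id := range cores {
		cm.cores[id] = ""
	}
	return cm
}

// set assigns a task to a core. Rejecting unknown cores is an assumption.
func (cm *coreMap) set(coreID int, task string) error {
	cm.Lock()
	defer cm.Unlock()
	if _, ok := cm.cores[coreID]; !ok {
		return fmt.Errorf("unknown core %d", coreID)
	}
	cm.cores[coreID] = task
	return nil
}

// unset marks a known core as free again.
func (cm *coreMap) unset(coreID int) {
	cm.Lock()
	defer cm.Unlock()
	if _, ok := cm.cores[coreID]; ok {
		cm.cores[coreID] = ""
	}
}

// freeCores returns the IDs of all free cores in ascending order.
func (cm *coreMap) freeCores() []int {
	cm.RLock()
	defer cm.RUnlock()
	var free []int
	for id, task := range cm.cores {
		if task == "" {
			free = append(free, id)
		}
	}
	sort.Ints(free)
	return free
}

func main() {
	cm := newCoreMap([]int{2, 3, 4})
	_ = cm.set(3, "task-a")
	fmt.Println(cm.freeCores()) // [2 4]
	cm.unset(3)
	fmt.Println(cm.freeCores()) // [2 3 4]
}
```

Writers take the full lock while readers share the read lock, so a scheduler can poll for free cores without blocking other readers.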

type Job

type Job struct {
	Params  []JobParam   `yaml:"params"`
	Inputs  []run.Input  `yaml:"inputs"`
	Outputs []run.Output `yaml:"outputs"`
	Runs    []run.Run    `yaml:"runs"`
	// contains filtered or unexported fields
}

func NewJob

func NewJob(filePath string, cfg *RuntimeConfig, dryRun bool) (*Job, error)

NewJob prepares a job from a YAML file.

func (*Job) Cleanup

func (j *Job) Cleanup()

Cleanup provides a way to deschedule all currently active tasks

func (*Job) Start

func (j *Job) Start() error

Start the job and all of its tasks

type JobParam

type JobParam struct {
	Name     string   `yaml:"name"`
	Type     string   `yaml:"type"`
	Default  string   `yaml:"default"`
	Only     []string `yaml:"only"`
	Min      string   `yaml:"min"`
	Max      string   `yaml:"max"`
	Step     string   `yaml:"step"`
	StepMode string   `yaml:"step_mode"`
}

type List

type List struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

List holds onto a generic out-of-order concurrency-safe array.

func NewList

func NewList(capacity int) *List

NewList creates a generic out-of-order concurrency-safe array.

func (*List) Add

func (l *List) Add(item interface{})

Add adds an item to the list.

func (*List) Get

func (l *List) Get(i int) (interface{}, error)

Get returns the item at index i.

func (*List) Len

func (l *List) Len() int

Len returns the length of the list

func (*List) Remove

func (l *List) Remove(i int) interface{}

Remove removes the item at index i from the list.
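
The documentation describes List as "out-of-order" but does not say how removal works. One plausible reading, sketched below under that assumption, is a swap-remove: the last item is moved into the vacated slot, so removal is O(1) but element order is not preserved. The type `list` and its `snapshot` helper are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// list is a hypothetical stand-in for List.
type list struct {
	sync.RWMutex
	items []interface{}
}

func newList(capacity int) *list {
	return &list{items: make([]interface{}, 0, capacity)}
}

func (l *list) add(item interface{}) {
	l.Lock()
	defer l.Unlock()
	l.items = append(l.items, item)
}

// remove deletes index i by moving the last item into its slot. It returns
// the removed item, or nil when i is out of range (an assumption).
func (l *list) remove(i int) interface{} {
	l.Lock()
	defer l.Unlock()
	if i < 0 || i >= len(l.items) {
		return nil
	}
	removed := l.items[i]
	last := len(l.items) - 1
	l.items[i] = l.items[last]
	l.items[last] = nil // drop the reference so it can be collected
	l.items = l.items[:last]
	return removed
}

// snapshot returns a copy of the current contents.
func (l *list) snapshot() []interface{} {
	l.RLock()
	defer l.RUnlock()
	out := make([]interface{}, len(l.items))
	copy(out, l.items)
	return out
}

func main() {
	l := newList(4)
	for _, s := range []string{"a", "b", "c", "d"} {
		l.add(s)
	}
	fmt.Println(l.remove(1))  // b
	fmt.Println(l.snapshot()) // [a d c]
}
```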

type Proc

type Proc struct {
	Items []ProcValue
}

type ProcValue

type ProcValue struct {
	Path     string
	Original string
	Current  string
}
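
The fields of ProcValue suggest a save-and-restore pattern: remember the original content of a procfs entry, write a new value, and later write the original back (which is what RevertEnvironment is documented to do). The sketch below illustrates that pattern against a scratch file rather than a real procfs path. The methods `apply` and `revert` and the helper `roundTrip` are hypothetical and not part of the package.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"strings"
)

// procValue mirrors the fields of ProcValue; apply and revert are
// hypothetical helpers, not methods of the package.
type procValue struct {
	Path     string
	Original string
	Current  string
}

// apply records the file's existing content, then writes the new value.
func (p *procValue) apply(value string) error {
	old, err := os.ReadFile(p.Path)
	if err != nil {
		return err
	}
	p.Original = strings.TrimSpace(string(old))
	p.Current = value
	return os.WriteFile(p.Path, []byte(value), 0o644)
}

// revert writes the recorded original value back.
func (p *procValue) revert() error {
	p.Current = p.Original
	return os.WriteFile(p.Path, []byte(p.Original), 0o644)
}

// roundTrip runs apply and revert against a scratch file and returns the
// file content observed after each step.
func roundTrip() (applied, reverted string, err error) {
	dir, err := os.MkdirTemp("", "proc-example")
	if err != nil {
		return "", "", err
	}
	defer os.RemoveAll(dir)

	path := filepath.Join(dir, "entry")
	if err = os.WriteFile(path, []byte("1\n"), 0o644); err != nil {
		return "", "", err
	}

	pv := &procValue{Path: path}
	if err = pv.apply("0"); err != nil {
		return "", "", err
	}
	afterApply, err := os.ReadFile(path)
	if err != nil {
		return "", "", err
	}
	if err = pv.revert(); err != nil {
		return "", "", err
	}
	afterRevert, err := os.ReadFile(path)
	if err != nil {
		return "", "", err
	}
	return string(afterApply), string(afterRevert), nil
}

func main() {
	applied, reverted, err := roundTrip()
	fmt.Println(applied, reverted, err) // 0 1 <nil>
}
```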

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue is a fixed-capacity, concurrent FIFO (first in, first out) queue.

func NewQueue

func NewQueue(capacity int) *Queue

func (*Queue) Clear

func (st *Queue) Clear() error

Clear empties the queue.

func (*Queue) Dequeue

func (st *Queue) Dequeue() (interface{}, error)

Dequeue dequeues an element. Returns an error if the queue is locked, the queue is empty, or the internal channel is closed.

func (*Queue) DequeueOrWaitForNextElement

func (st *Queue) DequeueOrWaitForNextElement() (interface{}, error)

DequeueOrWaitForNextElement dequeues an element (if one exists) or waits until the next element is enqueued and returns it. Multiple calls to DequeueOrWaitForNextElement() enqueue multiple "listeners" for future enqueued elements.

func (*Queue) DequeueOrWaitForNextElementContext

func (st *Queue) DequeueOrWaitForNextElementContext(ctx context.Context) (interface{}, error)

DequeueOrWaitForNextElementContext dequeues an element (if one exists) or waits until the next element is enqueued and returns it. Multiple calls to DequeueOrWaitForNextElementContext() enqueue multiple "listeners" for future enqueued elements. When the passed context expires, this function exits and returns the context's error.
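
The wait-or-give-up behavior described above can be sketched with a plain channel and a `select` on the context. This is not the queue's internal mechanism (which is not shown here); `dequeueOrWait` and `example` are hypothetical names, and a buffered channel stands in for the queue.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// dequeueOrWait returns the next element from items, or the context's error
// if the context ends first. It is a hypothetical stand-in for
// DequeueOrWaitForNextElementContext.
func dequeueOrWait(ctx context.Context, items <-chan interface{}) (interface{}, error) {
	select {
	case v, ok := <-items:
		if !ok {
			return nil, errors.New("internal channel closed")
		}
		return v, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

// example first waits on an empty channel until the deadline passes, then
// dequeues an element that is already buffered.
func example() (timedOut bool, value interface{}, err error) {
	items := make(chan interface{}, 1)

	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	_, err = dequeueOrWait(ctx, items)
	timedOut = errors.Is(err, context.DeadlineExceeded)

	items <- "task-1"
	value, err = dequeueOrWait(context.Background(), items)
	return timedOut, value, err
}

func main() {
	timedOut, value, err := example()
	fmt.Println(timedOut, value, err) // true task-1 <nil>
}
```

Callers that must shut down cleanly would typically pass a context that is cancelled on shutdown, so a blocked consumer returns instead of waiting forever.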

func (*Queue) Enqueue

func (st *Queue) Enqueue(value interface{}) error

Enqueue enqueues an element. Returns an error if the queue is locked or at full capacity.

func (*Queue) GetCap

func (st *Queue) GetCap() int

GetCap returns the queue's capacity

func (*Queue) IsLocked

func (st *Queue) IsLocked() bool

func (*Queue) Len

func (st *Queue) Len() int

Len returns the queue's length (total enqueued elements)

func (*Queue) Lock

func (st *Queue) Lock()

func (*Queue) Peak

func (st *Queue) Peak() (interface{}, error)

Peak looks at the next element in the Queue without removing it. Returns an error if the queue is empty or the internal channel is closed.

func (*Queue) Unlock

func (st *Queue) Unlock()

type QueueError

type QueueError struct {
	// contains filtered or unexported fields
}

func NewQueueError

func NewQueueError(code string, message string) *QueueError

func (*QueueError) Code

func (st *QueueError) Code() string

func (*QueueError) Error

func (st *QueueError) Error() string
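
Because QueueError exposes a Code method alongside Error, callers can branch on the error code rather than on message text. The sketch below shows that pattern with a self-contained stand-in type. `queueError`, `dequeue` and `isEmptyQueue` are hypothetical; only the code string "empty-queue" comes from the constants listed above.

```go
package main

import (
	"errors"
	"fmt"
)

// Same value as QueueErrorCodeEmptyQueue in the constants above.
const codeEmptyQueue = "empty-queue"

// queueError is a hypothetical stand-in for QueueError.
type queueError struct {
	code    string
	message string
}

func (e *queueError) Error() string { return e.message }
func (e *queueError) Code() string  { return e.code }

// dequeue is a placeholder that always reports an empty queue.
func dequeue() (interface{}, error) {
	return nil, &queueError{code: codeEmptyQueue, message: "empty queue"}
}

// isEmptyQueue reports whether err, or any error it wraps, carries the
// empty-queue code.
func isEmptyQueue(err error) bool {
	var qe *queueError
	return errors.As(err, &qe) && qe.Code() == codeEmptyQueue
}

func main() {
	_, err := dequeue()
	wrapped := fmt.Errorf("scheduler: %w", err)
	fmt.Println(isEmptyQueue(wrapped))             // true
	fmt.Println(isEmptyQueue(errors.New("other"))) // false
}
```

Using `errors.As` keeps the check working even when intermediate layers wrap the error with `%w`.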

type RuntimeConfig

type RuntimeConfig struct {
	Cpus          []int
	BridgeName    string
	BridgeIface   string
	BridgeSubnet  string
	ScheduleGrace int
	WorkDir       string
	AllowOverride bool
	MaxRetries    int
}

RuntimeConfig contains details about the runtime of wayfinder

type Task

type Task struct {
	Params  []TaskParam
	Inputs  *[]run.Input
	Outputs *[]run.Output

	AllowOverride bool
	// contains filtered or unexported fields
}

Task is the specific iterated configuration

func (*Task) Cancel

func (t *Task) Cancel()

Cancel the task by removing everything from the queue

func (*Task) Init

func (t *Task) Init(workDir string, allowOverride bool, runs *[]run.Run, dryRun bool) error

Init prepares the task.

func (*Task) UUID

func (t *Task) UUID() string

type TaskParam

type TaskParam struct {
	Name  string
	Type  string
	Value string
}
