tork

package module
v0.1.12 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 14, 2023 License: MIT Imports: 2 Imported by: 1

README

Tork

Go Report Card Build Status

A Go-based, high-performance, scalable, and distributed workflow engine.

Features:

Documentation

See https://www.tork.run for the full documentation.

Quick Start

  1. Ensure you have Docker with API version >= 1.42 (run `docker version | grep API` to check).

  2. Download the binary for your system from the releases page.

Hello World

Start in standalone mode:

./tork run standalone

Submit a job in another terminal:

# hello.yaml
---
name: hello job
tasks:
  - name: say hello
    image: ubuntu:mantic #docker image
    run: |
      echo -n hello world
  - name: say goodbye
    image: ubuntu:mantic
    run: |
      echo -n bye world
JOB_ID=$(curl \
  -s \
  -X POST \
  --data-binary @hello.yaml \
  -H "Content-type: text/yaml" \
  http://localhost:8000/jobs | jq -r .id)

Query for the status of the job:

curl -s http://localhost:8000/jobs/$JOB_ID | jq .

{
  "id": "ed0dba93d262492b8cf26e6c1c4f1c98",
  "state": "COMPLETED",
  ...
  "execution": [
    {
      ...
      "state": "COMPLETED",
    }
  ],
}

A slightly more interesting example

The following job:

  1. Downloads a remote video file using a pre task to a shared /tmp volume.
  2. Converts the first 5 seconds of the downloaded video using ffmpeg.
  3. Uploads the converted video to a destination using a post task.
# convert.yaml
---
name: convert a video
inputs:
  source: https://upload.wikimedia.org/wikipedia/commons/1/18/Big_Buck_Bunny_Trailer_1080p.ogv
tasks:
  - name: convert the first 5 seconds of a video
    image: jrottenberg/ffmpeg:3.4-alpine
    run: |
      ffmpeg -i /tmp/input.ogv -t 5 /tmp/output.mp4
    volumes:
      - /tmp
    pre:
      - name: download the remote file
        image: alpine:3.18.3
        env:
          SOURCE_URL: "{{ inputs.source }}"
        run: |
          wget \
          $SOURCE_URL \
          -O /tmp/input.ogv
    post:
      - name: upload the converted file
        image: alpine:3.18.3
        run: |
          wget \
          --post-file=/tmp/output.mp4 \
          https://devnull-as-a-service.com/dev/null

Submit the job in another terminal:

JOB_ID=$(curl \
  -s \
  -X POST \
  --data-binary @convert.yaml \
  -H "Content-type: text/yaml" \
  http://localhost:8000/jobs | jq -r .id)

More examples

Check out the examples folder.

REST API

See the REST API documentation.

Swagger Docs

Make sure you have CORS configured in your config file:

[middleware.web.cors]
enabled = true

Start Tork in standalone or coordinator mode.

go run cmd/main.go run standalone

Serve the Swagger Docs

docker compose up -d swagger

Visit http://localhost:9000

Web UI

Tork Web is a web-based tool for interacting with Tork.

License

Copyright (c) 2023-present Arik Cohen. Tork is free and open-source software licensed under the MIT License.

Documentation

Index

Constants

View Source
const (
	// Version is the version string of this release of the package.
	Version = "0.1.12"
)

Variables

View Source
var (
	// GitCommit identifies the source revision; its default value is "develop".
	// NOTE(review): it is a var rather than a const, presumably so the build
	// can overwrite it (e.g. with -ldflags "-X ...") — confirm against the
	// build scripts.
	GitCommit string = "develop"
)
View Source
// HEARTBEAT_RATE is a 30-second duration.
// NOTE(review): presumably the interval at which a node reports a heartbeat
// (see Node.LastHeartbeatAt) — confirm against the code that reads it.
// NOTE(review): the ALL_CAPS name departs from Go's MixedCaps convention;
// it is exported, so renaming it would break importers.
var HEARTBEAT_RATE = time.Second * 30
View Source
// LAST_HEARTBEAT_TIMEOUT is a 5-minute duration.
// NOTE(review): presumably the maximum age of Node.LastHeartbeatAt before a
// node is no longer considered alive — confirm against the code that reads it.
// NOTE(review): the ALL_CAPS name departs from Go's MixedCaps convention;
// it is exported, so renaming it would break importers.
var LAST_HEARTBEAT_TIMEOUT = time.Minute * 5

Functions

This section is empty.

Types

type EachTask

// EachTask is the payload stored in Task.Each and serialized under the
// "each" key of a task. Every field is omitted from JSON when it holds its
// zero value.
// NOTE(review): the field meanings below are inferred from their names —
// confirm against the code that schedules "each" tasks.
type EachTask struct {
	// List is presumably the list (or an expression yielding it) to iterate
	// over — TODO confirm.
	List        string `json:"list,omitempty"`
	// Task is presumably the template task executed once per list item —
	// TODO confirm.
	Task        *Task  `json:"task,omitempty"`
	// Size and Completions are presumably bookkeeping counters (number of
	// items / number of finished items) — TODO confirm.
	Size        int    `json:"size,omitempty"`
	Completions int    `json:"completions,omitempty"`
}

func (*EachTask) Clone

func (e *EachTask) Clone() *EachTask

type Job

// Job describes a named workflow made of tasks, together with its state and
// timestamps, as exchanged in JSON (see the struct tags for the key names).
//
// NOTE(review): encoding/json's omitempty never omits struct values, so the
// option has no effect on CreatedAt (time.Time) and Context (JobContext):
// both are always emitted, even when zero. The pointer-typed timestamps are
// omitted when nil.
type Job struct {
	ID          string            `json:"id,omitempty"`
	// ParentID is presumably set when the job was spawned by a sub-job task
	// (see SubJobTask) — TODO confirm.
	ParentID    string            `json:"parentId,omitempty"`
	Name        string            `json:"name,omitempty"`
	Description string            `json:"description,omitempty"`
	// State holds one of the JobState constants.
	State       JobState          `json:"state,omitempty"`
	CreatedAt   time.Time         `json:"createdAt,omitempty"`
	StartedAt   *time.Time        `json:"startedAt,omitempty"`
	CompletedAt *time.Time        `json:"completedAt,omitempty"`
	FailedAt    *time.Time        `json:"failedAt,omitempty"`
	// Tasks, Execution and Position carry no omitempty option, so they are
	// always present in the JSON output (nil slices encode as null).
	// NOTE(review): Tasks is presumably the submitted task definitions and
	// Execution the tasks actually run so far — confirm.
	Tasks       []*Task           `json:"tasks"`
	Execution   []*Task           `json:"execution"`
	Position    int               `json:"position"`
	// Inputs is the string-to-string map serialized under "inputs".
	Inputs      map[string]string `json:"inputs,omitempty"`
	Context     JobContext        `json:"context,omitempty"`
	TaskCount   int               `json:"taskCount,omitempty"`
	Output      string            `json:"output,omitempty"`
	Result      string            `json:"result,omitempty"`
	Error       string            `json:"error,omitempty"`
}

func (*Job) Clone

func (j *Job) Clone() *Job

type JobContext

// JobContext is the type of Job.Context. It holds two string-to-string maps,
// each omitted from JSON when empty.
// NOTE(review): presumably these are the values made available to template
// expressions such as "{{ inputs.source }}" — confirm against the evaluator.
type JobContext struct {
	Inputs map[string]string `json:"inputs,omitempty"`
	// Tasks is presumably keyed by Task.Var with the task result as value —
	// TODO confirm.
	Tasks  map[string]string `json:"tasks,omitempty"`
}

func (JobContext) AsMap

func (c JobContext) AsMap() map[string]any

func (JobContext) Clone

func (c JobContext) Clone() JobContext

type JobMetrics added in v0.1.5

// JobMetrics is the job section of Metrics (serialized under "jobs").
type JobMetrics struct {
	// Running is always emitted, including when it is 0 (no omitempty).
	Running int `json:"running"`
}

type JobState

// JobState is the string-typed state of a Job (see Job.State and
// JobSummary.State).
type JobState string

// The JobState values declared by this package. The string on the right is
// the value that appears in JSON.
const (
	JobStatePending   JobState = "PENDING"
	JobStateRunning   JobState = "RUNNING"
	JobStateCancelled JobState = "CANCELLED"
	JobStateCompleted JobState = "COMPLETED"
	JobStateFailed    JobState = "FAILED"
	JobStateRestart   JobState = "RESTART"
)

type JobSummary added in v0.1.6

// JobSummary mirrors Job without the Tasks, Execution, Inputs and Context
// fields; the remaining fields keep the same names, types and JSON tags.
//
// NOTE(review): as on Job, omitempty has no effect on CreatedAt because
// encoding/json never treats a struct value (time.Time) as empty.
type JobSummary struct {
	ID          string     `json:"id,omitempty"`
	ParentID    string     `json:"parentId,omitempty"`
	Name        string     `json:"name,omitempty"`
	Description string     `json:"description,omitempty"`
	State       JobState   `json:"state,omitempty"`
	CreatedAt   time.Time  `json:"createdAt,omitempty"`
	StartedAt   *time.Time `json:"startedAt,omitempty"`
	CompletedAt *time.Time `json:"completedAt,omitempty"`
	FailedAt    *time.Time `json:"failedAt,omitempty"`
	// Position is always emitted, including when it is 0 (no omitempty).
	Position    int        `json:"position"`
	TaskCount   int        `json:"taskCount,omitempty"`
	Output      string     `json:"output,omitempty"`
	Result      string     `json:"result,omitempty"`
	Error       string     `json:"error,omitempty"`
}

func NewJobSummary added in v0.1.6

func NewJobSummary(j *Job) *JobSummary

type Metrics added in v0.1.5

// Metrics groups the job, task and node metrics into one value. All three
// sections are always emitted in JSON (no omitempty).
type Metrics struct {
	Jobs  JobMetrics  `json:"jobs"`
	Tasks TaskMetrics `json:"tasks"`
	Nodes NodeMetrics `json:"nodes"`
}

type Node

// Node describes a node as exchanged in JSON: its identity, host, status
// and load figures.
//
// NOTE(review): omitempty has no effect on StartedAt and LastHeartbeatAt,
// because encoding/json never treats a struct value (time.Time) as empty;
// both are always emitted.
type Node struct {
	ID              string     `json:"id,omitempty"`
	StartedAt       time.Time  `json:"startedAt,omitempty"`
	CPUPercent      float64    `json:"cpuPercent,omitempty"`
	LastHeartbeatAt time.Time  `json:"lastHeartbeatAt,omitempty"`
	Queue           string     `json:"queue,omitempty"`
	// Status holds one of the NodeStatus constants.
	Status          NodeStatus `json:"status,omitempty"`
	Hostname        string     `json:"hostname,omitempty"`
	// TaskCount and Version are always emitted (no omitempty).
	TaskCount       int        `json:"taskCount"`
	Version         string     `json:"version"`
}

func (*Node) Clone added in v0.1.4

func (n *Node) Clone() *Node

type NodeMetrics added in v0.1.5

// NodeMetrics is the node section of Metrics (serialized under "nodes").
// Both fields are always emitted (no omitempty).
type NodeMetrics struct {
	// Running is serialized under the key "online", not "running".
	// NOTE(review): the Go name and the JSON key disagree; changing either
	// would break callers, so the mismatch is only documented here.
	Running    int     `json:"online"`
	CPUPercent float64 `json:"cpuPercent"`
}

type NodeStatus

// NodeStatus is the string-typed status of a Node (see Node.Status).
type NodeStatus string

// The NodeStatus values declared by this package. The string on the right is
// the value that appears in JSON.
// NOTE(review): NodeStatusUP is capitalised differently from NodeStatusDown
// and NodeStatusOffline; the names are exported, so they are left as is.
const (
	NodeStatusUP      NodeStatus = "UP"
	NodeStatusDown    NodeStatus = "DOWN"
	NodeStatusOffline NodeStatus = "OFFLINE"
)

type ParallelTask

// ParallelTask is the payload stored in Task.Parallel and serialized under
// the "parallel" key of a task. Both fields are omitted from JSON when empty.
type ParallelTask struct {
	// Tasks is the list of child tasks.
	// NOTE(review): presumably these are run concurrently — confirm.
	Tasks       []*Task `json:"tasks,omitempty"`
	// Completions is presumably the number of child tasks finished so far —
	// TODO confirm.
	Completions int     `json:"completions,omitempty"`
}

func (*ParallelTask) Clone

func (p *ParallelTask) Clone() *ParallelTask

type Registry added in v0.1.11

// Registry is the payload stored in Task.Registry and serialized under the
// "registry" key of a task.
// NOTE(review): presumably the credentials used to pull Task.Image from a
// private image registry — confirm.
type Registry struct {
	Username string `json:"username,omitempty"`
	// Password is serialized in clear text under "password" whenever it is
	// non-empty.
	// NOTE(review): verify that code returning tasks to API clients redacts
	// it first.
	Password string `json:"password,omitempty"`
}

func (*Registry) Clone added in v0.1.11

func (r *Registry) Clone() *Registry

type SubJobTask

// SubJobTask is the payload stored in Task.SubJob and serialized under the
// "subjob" key of a task. Its Name, Description, Tasks, Inputs and Output
// fields share names and types with the corresponding Job fields. Every
// field is omitted from JSON when empty.
type SubJobTask struct {
	// ID is presumably the ID of the job created for this sub-job —
	// TODO confirm.
	ID          string            `json:"id,omitempty"`
	Name        string            `json:"name,omitempty"`
	Description string            `json:"description,omitempty"`
	Tasks       []*Task           `json:"tasks,omitempty"`
	Inputs      map[string]string `json:"inputs,omitempty"`
	Output      string            `json:"output,omitempty"`
}

func (*SubJobTask) Clone

func (s *SubJobTask) Clone() *SubJobTask

type Task

// Task is the basic unit of work that a Worker can handle.
//
// Every field carries omitempty, so zero values (including Position 0 and
// nil timestamps) are left out of the JSON output.
type Task struct {
	ID          string            `json:"id,omitempty"`
	JobID       string            `json:"jobId,omitempty"`
	ParentID    string            `json:"parentId,omitempty"`
	Position    int               `json:"position,omitempty"`
	Name        string            `json:"name,omitempty"`
	Description string            `json:"description,omitempty"`
	// State holds one of the TaskState constants.
	State       TaskState         `json:"state,omitempty"`
	CreatedAt   *time.Time        `json:"createdAt,omitempty"`
	ScheduledAt *time.Time        `json:"scheduledAt,omitempty"`
	StartedAt   *time.Time        `json:"startedAt,omitempty"`
	CompletedAt *time.Time        `json:"completedAt,omitempty"`
	FailedAt    *time.Time        `json:"failedAt,omitempty"`
	CMD         []string          `json:"cmd,omitempty"`
	Entrypoint  []string          `json:"entrypoint,omitempty"`
	// Run is the script text of the task and Image the Docker image it runs
	// in (e.g. "ubuntu:mantic").
	Run         string            `json:"run,omitempty"`
	Image       string            `json:"image,omitempty"`
	Registry    *Registry         `json:"registry,omitempty"`
	// Env maps environment variable names to values; values may contain
	// template expressions such as "{{ inputs.source }}".
	Env         map[string]string `json:"env,omitempty"`
	// Files is presumably a map of file name to file content made available
	// to the task — TODO confirm.
	Files       map[string]string `json:"files,omitempty"`
	Queue       string            `json:"queue,omitempty"`
	Error       string            `json:"error,omitempty"`
	// Pre and Post are the tasks listed under a task's "pre" and "post" keys.
	// NOTE(review): presumably run before/after the main task and sharing
	// its Volumes — confirm against the worker.
	Pre         []*Task           `json:"pre,omitempty"`
	Post        []*Task           `json:"post,omitempty"`
	// Volumes lists volume paths such as "/tmp".
	Volumes     []string          `json:"volumes,omitempty"`
	Networks    []string          `json:"networks,omitempty"`
	NodeID      string            `json:"nodeId,omitempty"`
	Retry       *TaskRetry        `json:"retry,omitempty"`
	Limits      *TaskLimits       `json:"limits,omitempty"`
	// Timeout is kept as a string.
	// NOTE(review): presumably a duration in time.ParseDuration syntax —
	// confirm where it is parsed.
	Timeout     string            `json:"timeout,omitempty"`
	Result      string            `json:"result,omitempty"`
	// Var is presumably the key under which Result is stored in
	// JobContext.Tasks — TODO confirm.
	Var         string            `json:"var,omitempty"`
	// If is presumably a condition expression controlling whether the task
	// runs — TODO confirm.
	If          string            `json:"if,omitempty"`
	// Parallel, Each and SubJob hold the payloads for the "parallel", "each"
	// and "subjob" keys.
	// NOTE(review): presumably at most one of them is set on a given task —
	// confirm against input validation.
	Parallel    *ParallelTask     `json:"parallel,omitempty"`
	Each        *EachTask         `json:"each,omitempty"`
	SubJob      *SubJobTask       `json:"subjob,omitempty"`
}

Task is the basic unit of work that a Worker can handle.

func CloneTasks

func CloneTasks(tasks []*Task) []*Task

func (*Task) Clone

func (t *Task) Clone() *Task

type TaskLimits

// TaskLimits is the payload stored in Task.Limits and serialized under the
// "limits" key of a task. Both limits are kept as strings and omitted from
// JSON when empty.
// NOTE(review): the accepted formats (e.g. fractional CPU counts, memory
// size suffixes) are not visible here — confirm where the values are parsed.
type TaskLimits struct {
	CPUs   string `json:"cpus,omitempty"`
	Memory string `json:"memory,omitempty"`
}

func (*TaskLimits) Clone

func (l *TaskLimits) Clone() *TaskLimits

type TaskMetrics added in v0.1.5

// TaskMetrics is the task section of Metrics (serialized under "tasks").
type TaskMetrics struct {
	// Running is always emitted, including when it is 0 (no omitempty).
	Running int `json:"running"`
}

type TaskRetry

// TaskRetry is the payload stored in Task.Retry and serialized under the
// "retry" key of a task. Both fields are omitted from JSON when 0.
type TaskRetry struct {
	// Limit is presumably the maximum number of retries allowed —
	// TODO confirm.
	Limit    int `json:"limit,omitempty"`
	// Attempts is presumably the number of retries performed so far —
	// TODO confirm.
	Attempts int `json:"attempts,omitempty"`
}

func (*TaskRetry) Clone

func (r *TaskRetry) Clone() *TaskRetry

type TaskState

// TaskState defines the list of states that a task can be in at any given
// moment. It is a string type; see Task.State.
type TaskState string

TaskState defines the list of states that a task can be in at any given moment.

// The TaskState values declared by this package. The string on the right is
// the value that appears in JSON.
// NOTE(review): which of these TaskState.IsActive reports as active is not
// visible here — check its implementation.
const (
	TaskStatePending   TaskState = "PENDING"
	TaskStateScheduled TaskState = "SCHEDULED"
	TaskStateRunning   TaskState = "RUNNING"
	TaskStateCancelled TaskState = "CANCELLED"
	TaskStateStopped   TaskState = "STOPPED"
	TaskStateCompleted TaskState = "COMPLETED"
	TaskStateFailed    TaskState = "FAILED"
)

func (TaskState) IsActive

func (s TaskState) IsActive() bool

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL