model

package
v0.4.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 8, 2021 License: Apache-2.0 Imports: 25 Imported by: 1

Documentation

Index

Constants

View Source
// Job status values. Each constant holds the literal status string shown.
// NOTE(review): which of these count as terminal is decided by
// IsTerminalStatus, whose body is not visible here — confirm there.
const (
	JOB_STATUS_PENDING     = "PENDING"
	JOB_STATUS_IN_PROGRESS = "RUNNING" // the value is "RUNNING", not "IN_PROGRESS"
	JOB_STATUS_SUCCESS     = "SUCCESS"
	JOB_STATUS_RUN_OK      = "RUN_OK"
	JOB_STATUS_RUN_FAILED  = "RUN_FAILED"
	JOB_STATUS_ERROR       = "ERROR"
	JOB_STATUS_CANCELED    = "CANCELED"
	JOB_STATUS_TIMEOUT     = "TIMEOUT"
)

Variables

View Source
// Package-level sentinel errors and API endpoint settings.
var (
	// Sentinel errors. Each message is the literal text passed to errors.New;
	// compare against these values with errors.Is rather than by message.
	// NOTE(review): ErrorJobNotInWaitStatus does not follow the ErrXxx naming
	// of its siblings; the name is kept because it is part of the public API.
	ErrFailedSendRequest       = errors.New("Failed to send request")
	ErrInvalidNegativeExitCode = errors.New("Invalid negative exit code")
	ErrJobCancelled            = errors.New("Job cancelled")
	ErrJobTimeout              = errors.New("Job timeout")
	ErrJobGotSignal            = errors.New("Job got signal")
	ErrorJobNotInWaitStatus    = errors.New("Process state is not in WaitStatus")

	// FetchNewJobAPIURL is the URL for pulling new jobs.
	FetchNewJobAPIURL string
	// FetchNewJobAPIMethod is the HTTP method for the fetch-jobs API.
	FetchNewJobAPIMethod = "POST"
	// FetchNewJobAPIParams is used in each request for a new job.
	// It starts as an empty, non-nil map.
	FetchNewJobAPIParams = make(map[string]string)

	// StreamingAPIURL is the URL for uploading log streams.
	StreamingAPIURL string
	// StreamingAPIMethod is the HTTP method for the log-streaming API.
	StreamingAPIMethod = "POST"
)

Functions

func CmdWrapper added in v0.1.2

func CmdWrapper(RunAs string, UseSHELL bool, CMD string) (shell string, args []string)

CmdWrapper wraps command.

func ContainsIntInIntSlice added in v0.3.3

func ContainsIntInIntSlice(s []int, e int) bool

ContainsIntInIntSlice checks whether the int is in the slice.

func DefaultPath added in v0.1.2

func DefaultPath() string

DefaultPath returns PATH variable

func DoApi added in v0.4.0

func DoApi(ctx context.Context, params map[string]interface{}, stage string) error

DoApi performs REST calls via communicators.

func DoApiCall

func DoApiCall(ctx context.Context, params map[string]string, stage string) (error, []map[string]interface{})

DoApiCall for the jobs stages TODO: add custom headers

func GetAPIParamsFromSection

func GetAPIParamsFromSection(stage string) map[string]string

GetAPIParamsFromSection of the configuration

func GetParamsFromSection

func GetParamsFromSection(stage string, param string) map[string]string

GetParamsFromSection returns the parameters found under the given stage and sub-param. Example: stage = `jobs.logstream` and param = `params`, i.e. `GetParamsFromSection("jobs.logstream", "params")`, with the following config:

var yamlExample = []byte(`
jobs:
  logstream: &update
    url: "localhost"
    method: post
    params:
      "job_uid": "job_uid"
      "run_uid": "1"
`)

func GetSliceParamsFromSection

func GetSliceParamsFromSection(stage string, param string) []string

GetSliceParamsFromSection returns the list of parameters found under the given stage and sub-param. Example: stage = `jobs.logstream` and param = `resend-params`, i.e. `GetSliceParamsFromSection("jobs.logstream", "resend-params")`, with the following config:

var yamlExample = []byte(`
jobs:
  logstream: &update
    url: "localhost"
    method: post
    resend-params:
      - "job_uid"
      - "run_uid"
      - "extra_run_id"
`)

func IsTerminalStatus

func IsTerminalStatus(status string) bool

IsTerminalStatus returns true if status is terminal: - Failed - Canceled - Successful

func MergeEnvVars added in v0.1.2

func MergeEnvVars(CmdENVs []string) (uniqueMergedENV []string)

MergeEnvVars merges environment variables.

func NewRemoteApiRequest

func NewRemoteApiRequest(ctx context.Context, section string, method string, url string) (error, []map[string]interface{})

NewRemoteApiRequest performs a request to the API.

func ReinitializeConfig

func ReinitializeConfig() error

ReinitializeConfig on load or config file changes

func StoreKey

func StoreKey(Id string, RunUID string, ExtraRunUID string) string

StoreKey returns Job unique store key

Types

type ContextKey added in v0.4.0

// ContextKey is a string-based type used for the key constants declared below.
// NOTE(review): presumably these are keys for values stored in a job's
// context (see Job.AddToContext) — confirm against the implementation.
type ContextKey string
// Keys of type ContextKey; each value is the literal string shown.
const (
	CtxKeyRequestTimeout ContextKey = "ctx_req_timeout"
	CtxKeyRequestWorker  ContextKey = "ctx_req_worker_id"
)

type Job

// Job is the public job structure. It carries the job's identifiers,
// timestamps, current and previous status, retry limits, the command to run
// with its settings, and the resulting exit code.
// NOTE(review): the meaning of fields without a comment (ExtraSendParams,
// StreamInterval) is not visible here — confirm against the implementation.
type Job struct {
	Id                     string // Job identifier
	RunUID                 string // Run identifier
	ExtraRunUID            string // Extra run identifier
	ExtraSendParams        map[string]string
	Priority               int64         // Priority of the job
	CreateAt               time.Time     // When the job was created
	StartAt                time.Time     // When the command started
	LastActivityAt         time.Time     // When the job metadata last changed
	StoppedAt              time.Time     // When the job stopped
	CMDExecutionStoppedAt  time.Time     // When command execution stopped
	PreviousStatus         string        // Previous status
	Status                 string        // Current status
	MaxAttempts            int           // Absolute max number of attempts.
	MaxFails               int           // Absolute max number of failures.
	TTR                    uint64        // Time-to-run in milliseconds
	CMD                    string        // Command
	CmdENV                 []string      // NOTE(review): was documented as "Command"; presumably env vars for the command — confirm
	RunAs                  string        // RunAs defines the user
	ResetBackPressureTimer time.Duration // How often we will dump the logs
	StreamInterval         time.Duration

	ExitCode int // Exit code

	// Params received from your API.
	RawParams []map[string]interface{}

	// Whether we should use a shell and wrap the command.
	UseSHELL bool
	// contains filtered or unexported fields
}

Job public structure

func NewJob

func NewJob(id string, cmd string) *Job

NewJob returns a Job with defaults.

func NewTestJob

func NewTestJob(id string, cmd string) *Job

NewTestJob returns a Job with defaults for tests.

func (*Job) AddToContext added in v0.4.0

func (j *Job) AddToContext(key interface{}, value interface{})

AddToContext for job in case there is time limit for example

func (*Job) AppendLogStream

func (j *Job) AppendLogStream(logStream []string) (err error)

Appends log stream to the buffer. The content of the buffer will be uploaded to API after:

  • high volume log producers - after j.elements
  • after buffer is full
  • after slow log interval

func (*Job) Cancel

func (j *Job) Cancel() error

Cancel cancels the job. It triggers an update to your API if it's configured.

func (*Job) Failed

func (j *Job) Failed() error

Failed job flow update your API

func (*Job) Finish

func (j *Job) Finish() error

Finish is triggered when execution is successful.

func (*Job) FlushSteamsBuffer

func (j *Job) FlushSteamsBuffer() error

FlushSteamsBuffer - empty current job's streams lines

func (*Job) GetAPIParams

func (j *Job) GetAPIParams(stage string) map[string]string

GetAPIParams for stage from all previous calls

func (*Job) GetContext added in v0.4.0

func (j *Job) GetContext() *context.Context

GetContext of the job

func (*Job) GetLogger added in v0.4.0

func (j *Job) GetLogger() *logrus.Entry

GetLogger from job context

func (*Job) GetParams added in v0.4.0

func (j *Job) GetParams() map[string]interface{}

func (*Job) GetParamsWithResend added in v0.4.0

func (j *Job) GetParamsWithResend(stage string) map[string]interface{}

func (*Job) GetPreviousStatus added in v0.4.0

func (j *Job) GetPreviousStatus() string

GetPreviousStatus returns Previous Status without JOB_STATUS_RUN_OK and JOB_STATUS_RUN_FAILED

func (*Job) GetRawParams

func (j *Job) GetRawParams() []map[string]interface{}

GetRawParams from all previous calls

func (*Job) GetStatus added in v0.1.2

func (j *Job) GetStatus() string

GetStatus get job status.

func (*Job) GetTTR added in v0.4.4

func (j *Job) GetTTR() uint64

func (*Job) GetTTRDuration added in v0.4.4

func (j *Job) GetTTRDuration() time.Duration

func (*Job) HitTimeout added in v0.4.0

func (j *Job) HitTimeout() bool

HitTimeout returns true if the job hit its timeout; it is always false if TTR is 0.

func (*Job) IsStuck added in v0.4.4

func (j *Job) IsStuck() bool

IsStuck returns true if the job has been in a terminal state for more than TimeoutJobsAfter5MinInTerminalState.

func (*Job) IsTerminal added in v0.4.4

func (j *Job) IsTerminal() bool

IsTerminal returns true in case job entered final state

func (*Job) PutInTerminal added in v0.4.4

func (j *Job) PutInTerminal()

PutInTerminal marks job as in terminal status

func (*Job) Run

func (j *Job) Run() error

Run runs the job. It returns an error if the exit code is greater than 0.

func (*Job) SetContext

func (j *Job) SetContext(ctx context.Context)

SetContext for job in case there is time limit for example

func (*Job) StoreKey

func (j *Job) StoreKey() string

StoreKey returns StoreKey

func (*Job) Timeout added in v0.4.0

func (j *Job) Timeout() error

Timeout job flow update your API

func (*Job) TimeoutWithCancel added in v0.4.4

func (j *Job) TimeoutWithCancel(duration time.Duration) error

type Jobber

// Jobber defines a job interface: four methods (Run, Cancel, Finish and
// Timeout), each taking no arguments and returning an error.
// *Job declares methods with these same signatures.
type Jobber interface {
	Run() error
	Cancel() error
	Finish() error
	Timeout() error
}

Jobber defines a job interface.

type Process added in v0.3.3

// Process stores information about a UNIX process.
type Process struct {
	Stat     ps.Process // process information as a ps.Process value
	Children []int      // NOTE(review): presumably PIDs of child processes — confirm
}

Process stores information about a UNIX process.

type Registry

// Registry holds all Job records. Its fields are unexported; create one with
// NewRegistry.
type Registry struct {
	// contains filtered or unexported fields
}

Registry holds all Job Records.

func NewRegistry

func NewRegistry() *Registry

NewRegistry returns a new Registry.

func (*Registry) Add

func (r *Registry) Add(rec *Job) bool

Add a job. Returns false on duplicate or invalid job id.

func (*Registry) Cleanup

func (r *Registry) Cleanup() (num int)

Cleanup by job TTR. Return number of cleaned jobs. TODO: Consider new timeout status & flow

  • Add batch

func (*Registry) Delete

func (r *Registry) Delete(id string) bool

Delete a job by job ID. Return false if record does not exist.

func (*Registry) GracefullyShutdown added in v0.3.3

func (r *Registry) GracefullyShutdown() bool

GracefullyShutdown is used when we stop the Registry. It cancels all running and pending jobs and returns false if we can't cancel any job.

func (*Registry) Len

func (r *Registry) Len() int

Len returns length of registry.

func (*Registry) Map added in v0.4.0

func (r *Registry) Map(f func(string, *Job))

Map function

func (*Registry) Record

func (r *Registry) Record(jid string) (*Job, bool)

Record fetches a job by Job ID. It follows the comma-ok idiom.

type Tree added in v0.3.3

// Tree is a tree of processes.
type Tree struct {
	// Procs maps an int key to its Process entry.
	// NOTE(review): the key is presumably the PID (Tree.Get takes a pid) — confirm.
	Procs map[int]Process
}

Tree is a tree of processes.

func NewProcessTree added in v0.3.3

func NewProcessTree() (*Tree, error)

NewProcessTree returns a new process tree.

func (*Tree) Get added in v0.3.3

func (t *Tree) Get(pid int) []int

Get all children by pid

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL