Documentation ¶
Index ¶
- Constants
- Variables
- func CmdWrapper(RunAs string, UseSHELL bool, CMD string) (shell string, args []string)
- func ContainsIntInIntSlice(s []int, e int) bool
- func DefaultPath() string
- func DoApi(ctx context.Context, params map[string]interface{}, stage string) error
- func DoApiCall(ctx context.Context, params map[string]string, stage string) (error, []map[string]interface{})
- func GetAPIParamsFromSection(stage string) map[string]string
- func GetParamsFromSection(stage string, param string) map[string]string
- func GetSliceParamsFromSection(stage string, param string) []string
- func IsTerminalStatus(status string) bool
- func MergeEnvVars(CmdENVs []string) (uniqueMergedENV []string)
- func NewRemoteApiRequest(ctx context.Context, section string, method string, url string) (error, []map[string]interface{})
- func ReinitializeConfig() error
- func StoreKey(Id string, RunUID string, ExtraRunUID string) string
- type ContextKey
- type Job
- func (j *Job) AddToContext(key interface{}, value interface{})
- func (j *Job) AppendLogStream(logStream []string) (err error)
- func (j *Job) Cancel() error
- func (j *Job) Failed() error
- func (j *Job) Finish() error
- func (j *Job) FlushSteamsBuffer() error
- func (j *Job) GetAPIParams(stage string) map[string]string
- func (j *Job) GetContext() *context.Context
- func (j *Job) GetLogger() *logrus.Entry
- func (j *Job) GetParams() map[string]interface{}
- func (j *Job) GetParamsWithResend(stage string) map[string]interface{}
- func (j *Job) GetPreviousStatus() string
- func (j *Job) GetRawParams() []map[string]interface{}
- func (j *Job) GetStatus() string
- func (j *Job) GetTTR() uint64
- func (j *Job) GetTTRDuration() time.Duration
- func (j *Job) HitTimeout() bool
- func (j *Job) IsStuck() bool
- func (j *Job) IsTerminal() bool
- func (j *Job) PutInTerminal()
- func (j *Job) Run() error
- func (j *Job) SetContext(ctx context.Context)
- func (j *Job) StoreKey() string
- func (j *Job) Timeout() error
- func (j *Job) TimeoutWithCancel(duration time.Duration) error
- type Jobber
- type Process
- type Registry
- type Tree
Constants ¶
const ( JOB_STATUS_PENDING = "PENDING" JOB_STATUS_IN_PROGRESS = "RUNNING" JOB_STATUS_SUCCESS = "SUCCESS" JOB_STATUS_RUN_OK = "RUN_OK" JOB_STATUS_RUN_FAILED = "RUN_FAILED" JOB_STATUS_ERROR = "ERROR" JOB_STATUS_CANCELED = "CANCELED" JOB_STATUS_TIMEOUT = "TIMEOUT" )
Variables ¶
var ( ErrFailedSendRequest = errors.New("Failed to send request") ErrInvalidNegativeExitCode = errors.New("Invalid negative exit code") ErrJobCancelled = errors.New("Job cancelled") ErrJobTimeout = errors.New("Job timeout") ErrJobGotSignal = errors.New("Job got sgnal") ErrorJobNotInWaitStatus = errors.New("Process state is not in WaitStatus") // FetchNewJobAPIURL is URL for pulling new jobs FetchNewJobAPIURL string // FetchNewJobAPIMethod is Http METHOD for fetch Jobs API FetchNewJobAPIMethod = "POST" // FetchNewJobAPIParams is used in each request for a new job FetchNewJobAPIParams = make(map[string]string) // StreamingAPIURL is URL for uploading log streams StreamingAPIURL string // StreamingAPIMethod is Http METHOD for streaming log API StreamingAPIMethod = "POST" )
Functions ¶
func CmdWrapper ¶ added in v0.1.2
CmdWrapper wraps command.
func ContainsIntInIntSlice ¶ added in v0.3.3
ContainsIntInIntSlice checks whether the int is in the slice.
func DoApiCall ¶
func DoApiCall(ctx context.Context, params map[string]string, stage string) (error, []map[string]interface{})
DoApiCall for the jobs stages TODO: add custom headers
func GetAPIParamsFromSection ¶
GetAPIParamsFromSection of the configuration
func GetParamsFromSection ¶
GetParamsFromSection from stage & with sub-param Example stage = `jobs.logstream` and param `params` in the following config: with `GetParamsFromSection("jobs.logstream", "params")` var yamlExample = []byte(` jobs:
logstream: &update url: "localhost" method: post params: "job_uid": "job_uid" "run_uid": "1"
func GetSliceParamsFromSection ¶
GetSliceParamsFromSection from stage & with sub-param Example stage = `jobs.logstream` and param `resend-params` in the following config: with `GetSliceParamsFromSection("jobs.logstream", "resend-params")` var yamlExample = []byte(` jobs:
logstream: &update url: "localhost" method: post resend-params: - "job_uid" - "run_uid" - "extra_run_id"
func IsTerminalStatus ¶
IsTerminalStatus returns true if status is terminal: - Failed - Canceled - Successful
func MergeEnvVars ¶ added in v0.1.2
MergeEnvVars merges environment variables.
func NewRemoteApiRequest ¶
func NewRemoteApiRequest(ctx context.Context, section string, method string, url string) (error, []map[string]interface{})
NewRemoteApiRequest performs a request to the API.
func ReinitializeConfig ¶
func ReinitializeConfig() error
ReinitializeConfig on load or config file changes
Types ¶
type ContextKey ¶ added in v0.4.0
type ContextKey string
const ( CtxKeyRequestTimeout ContextKey = "ctx_req_timeout" CtxKeyRequestWorker ContextKey = "ctx_req_worker_id" )
type Job ¶
type Job struct { Id string // Identification for Job RunUID string // Running identification ExtraRunUID string // Extra identification ExtraSendParams map[string]string Priority int64 // Priority for a Job CreateAt time.Time // When Job was created StartAt time.Time // When command started LastActivityAt time.Time // When job metadata last changed StoppedAt time.Time // When job stopped CMDExecutionStoppedAt time.Time // When execution stopped PreviousStatus string // Previous Status Status string // Current status MaxAttempts int // Absolute max num of attempts. MaxFails int // Absolute max number of failures. TTR uint64 // Time-to-run in milliseconds CMD string // Command CmdENV []string // Command RunAs string // RunAs defines user ResetBackPressureTimer time.Duration // how often we will dump the logs StreamInterval time.Duration ExitCode int // Exit code // params got from your API RawParams []map[string]interface{} // If we should use shell and wrap the command UseSHELL bool // contains filtered or unexported fields }
Job public structure
func NewTestJob ¶
NewTestJob return Job with defaults for test
func (*Job) AddToContext ¶ added in v0.4.0
func (j *Job) AddToContext(key interface{}, value interface{})
AddToContext for job in case there is time limit for example
func (*Job) AppendLogStream ¶
Appends log stream to the buffer. The content of the buffer will be uploaded to API after:
- high volume log producers - after j.elements
- after buffer is full
- after slow log interval
func (*Job) FlushSteamsBuffer ¶
FlushSteamsBuffer - empty current job's streams lines
func (*Job) GetAPIParams ¶
GetAPIParams for stage from all previous calls
func (*Job) GetContext ¶ added in v0.4.0
GetContext of the job
func (*Job) GetParamsWithResend ¶ added in v0.4.0
func (*Job) GetPreviousStatus ¶ added in v0.4.0
GetPreviousStatus returns Previous Status without JOB_STATUS_RUN_OK and JOB_STATUS_RUN_FAILED
func (*Job) GetRawParams ¶
GetRawParams from all previous calls
func (*Job) GetTTRDuration ¶ added in v0.4.4
func (*Job) HitTimeout ¶ added in v0.4.0
HitTimeout returns true if the job hit its timeout; it is always false if TTR is 0.
func (*Job) IsStuck ¶ added in v0.4.4
IsStuck returns true if the job has been in a terminal state for more than TimeoutJobsAfter5MinInTerminalState
func (*Job) IsTerminal ¶ added in v0.4.4
IsTerminal returns true in case job entered final state
func (*Job) PutInTerminal ¶ added in v0.4.4
func (j *Job) PutInTerminal()
PutInTerminal marks job as in terminal status
func (*Job) SetContext ¶
SetContext for job in case there is time limit for example
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry holds all Job Records.
func (*Registry) Cleanup ¶
Cleanup by job TTR. Returns the number of cleaned jobs. TODO: Consider new timeout status & flow
- Add batch
func (*Registry) GracefullyShutdown ¶ added in v0.3.3
GracefullyShutdown is used when we stop the Registry. It cancels all running & pending jobs and returns false if we can't cancel any job.