Documentation ¶
Index ¶
- Constants
- Variables
- func EvalConditions(cond []Condition) error
- func NewContext(ctx context.Context, dag *DAG, finder Finder, requestID, logFile string) context.Context
- func WithDagContext(ctx context.Context, dagContext Context) context.Context
- type Condition
- type Context
- type ContinueOn
- type DAG
- type Env
- type Envs
- type ExecutorConfig
- type Finder
- type HandlerOn
- type HandlerType
- type MailConfig
- type MailOn
- type RepeatPolicy
- type RetryPolicy
- type SMTPConfig
- type Schedule
- type Step
- type SubWorkflow
- type SyncMap
Constants ¶
const ( EnvKeyLogPath = "DAG_EXECUTION_LOG_PATH" EnvKeySchedulerLogPath = "DAG_SCHEDULER_LOG_PATH" EnvKeyRequestID = "DAG_REQUEST_ID" )
Special environment variables.
const ExecutorTypeSubWorkflow = "subworkflow"
ExecutorTypeSubWorkflow is defined here in order to parse the `run` field in the DAG file.
Variables ¶
var Exts = []string{".yaml", ".yml"}
Exts is a list of supported file extensions for DAG files.
Functions ¶
func EvalConditions ¶
EvalConditions evaluates a list of conditions and checks the results. It returns an error if any of the conditions were not met.
Types ¶
type Condition ¶
type Condition struct { Condition string // Condition to evaluate Expected string // Expected value }
Condition contains a condition and the expected value. Conditions are evaluated and compared to the expected value. The condition can be a command substitution or an environment variable. The expected value must be a string without any substitutions.
type Context ¶
Context contains the current DAG and Finder.
func GetContext ¶
GetContext returns the DAG Context from the context. It returns an error if the context does not contain a DAG Context.
type ContinueOn ¶
type ContinueOn struct { Failure bool // Failure is the flag to continue to the next step on failure. Skipped bool // Skipped is the flag to continue to the next step on skipped. }
ContinueOn contains the conditions to continue on failure or skipped. Failure is the flag to continue to the next step on failure. Skipped is the flag to continue to the next step on skipped. A step can be skipped when the preconditions are not met. Then if the ContinueOn.Skip is set, the step will continue to the next step.
type DAG ¶
type DAG struct { // Location is the absolute path to the DAG file. Location string `json:"Location"` // Group is the group name of the DAG. This is optional. Group string `json:"Group"` // Name is the name of the DAG. The default is the filename without the // extension. Name string `json:"Name"` // Tags contains the list of tags for the DAG. optional. Tags []string `json:"Tags"` // Description is the description of the DAG. optional. Description string `json:"Description"` // Schedule configuration. // This is used by the scheduler to start / stop / restart the DAG. Schedule []Schedule `json:"Schedule"` StopSchedule []Schedule `json:"StopSchedule"` RestartSchedule []Schedule `json:"RestartSchedule"` // Env contains a list of environment variables to be set before running // the DAG. Env []string `json:"Env"` // LogDir is the directory where the logs are stored. // The actual log directory is LogDir + Name (with invalid characters // replaced with '_'). LogDir string `json:"LogDir"` // Parameters configuration. // The DAG definition contains only DefaultParams. Params are automatically // set by the DAG loader. // DefaultParams contains the default parameters to be passed to the DAG. DefaultParams string `json:"DefaultParams"` // Params contains the list of parameters to be passed to the DAG. Params []string `json:"Params"` // Commands configuration to be executed in the DAG. // Steps contains the list of steps in the DAG. Steps []Step `json:"Steps"` // HandlerOn contains the steps to be executed on different events. HandlerOn HandlerOn `json:"HandlerOn"` // Preconditions contains the conditions to be met before running the DAG. // If the conditions are not met, the whole DAG is skipped. Preconditions []Condition `json:"Preconditions"` // Mail notification configuration. // MailOn contains the conditions to send mail. // SMTP contains the SMTP configuration. // If you don't want to repeat the SMTP configuration for each DAG, you can // set it in the base configuration. SMTP *SMTPConfig `json:"Smtp"` // ErrorMail contains the mail configuration for error. ErrorMail *MailConfig `json:"ErrorMail"` // InfoMail contains the mail configuration for info. InfoMail *MailConfig `json:"InfoMail"` // MailOn contains the conditions to send mail. MailOn *MailOn `json:"MailOn"` // Timeout is a field to specify the maximum execution time of the DAG task Timeout time.Duration `json:"Timeout"` // Misc configuration for DAG execution. // Delay is the delay before starting the DAG. Delay time.Duration `json:"Delay"` // RestartWait is the time to wait before restarting the DAG. RestartWait time.Duration `json:"RestartWait"` // MaxActiveRuns specifies the maximum concurrent steps to run in an // execution. MaxActiveRuns int `json:"MaxActiveRuns"` // MaxCleanUpTime is the maximum time to wait for cleanup when the DAG is // stopped. MaxCleanUpTime time.Duration `json:"MaxCleanUpTime"` // HistRetentionDays is the number of days to keep the history. HistRetentionDays int `json:"HistRetentionDays"` }
DAG contains all information about a workflow.
func LoadMetadata ¶
LoadMetadata loads config from file and returns only the headline data.
func LoadWithoutEval ¶
LoadWithoutEval loads config from file without evaluating env variables.
func LoadYAML ¶
LoadYAML loads config from YAML data. It does not evaluate the environment variables. This is used to validate the YAML data.
type ExecutorConfig ¶
type ExecutorConfig struct { // Type represents one of the registered executor. // See `executor.Register` in `internal/executor/executor.go`. Type string Config map[string]any // Config contains executor specific configuration. }
ExecutorConfig contains the configuration for the executor.
type Finder ¶
Finder finds a DAG by name. This is used to find the DAG when a node references another DAG.
type HandlerOn ¶
type HandlerOn struct { Failure *Step `json:"Failure"` Success *Step `json:"Success"` Cancel *Step `json:"Cancel"` Exit *Step `json:"Exit"` }
HandlerOn contains the steps to be executed on different events in the DAG.
type HandlerType ¶
type HandlerType string
HandlerType is the type of the handler.
const ( HandlerOnSuccess HandlerType = "onSuccess" HandlerOnFailure HandlerType = "onFailure" HandlerOnCancel HandlerType = "onCancel" HandlerOnExit HandlerType = "onExit" )
func ParseHandlerType ¶
func ParseHandlerType(s string) HandlerType
ParseHandlerType converts a string to a HandlerType.
func (HandlerType) String ¶
func (e HandlerType) String() string
type MailConfig ¶
type MailConfig struct { From string `json:"From"` To string `json:"To"` // Prefix is the prefix for the subject of the mail. Prefix string `json:"Prefix"` // AttachLogs is the flag to attach the logs in the mail. AttachLogs bool `json:"AttachLogs"` }
MailConfig contains the mail configuration.
type RepeatPolicy ¶
type RepeatPolicy struct { Repeat bool // Repeat determines if the step should be repeated. Interval time.Duration // Interval is the time to wait between repeats. }
RepeatPolicy contains the repeat policy for a step.
type RetryPolicy ¶
type RetryPolicy struct { Limit int // Limit is the number of retries allowed. Interval time.Duration // Interval is the time to wait between retries. }
RetryPolicy contains the retry policy for a step.
type SMTPConfig ¶
type SMTPConfig struct { Host string `json:"Host"` Port string `json:"Port"` Username string `json:"Username"` Password string `json:"Password"` }
SMTPConfig contains the SMTP configuration.
type Schedule ¶
type Schedule struct { // Expression is the cron expression. Expression string `json:"Expression"` // Parsed is the parsed cron schedule. Parsed cron.Schedule `json:"-"` }
Schedule contains the cron expression and the parsed cron schedule.
type Step ¶
type Step struct { // Name is the name of the step. Name string `json:"Name"` // Description is the description of the step. Description string `json:"Description,omitempty"` // Variables contains the list of variables to be set. Variables []string `json:"Variables,omitempty"` // OutputVariables is a structure to store the output variables for the // following steps. OutputVariables *SyncMap `json:"OutputVariables,omitempty"` // Dir is the working directory for the step. Dir string `json:"Dir,omitempty"` // ExecutorConfig contains the configuration for the executor. ExecutorConfig ExecutorConfig `json:"ExecutorConfig,omitempty"` // CmdWithArgs is the command with arguments. CmdWithArgs string `json:"CmdWithArgs,omitempty"` // Command specifies only the command without arguments. Command string `json:"Command,omitempty"` // Script is the script to be executed. Script string `json:"Script,omitempty"` // Args contains the arguments for the command. Args []string `json:"Args,omitempty"` // Stdout is the file to store the standard output. Stdout string `json:"Stdout,omitempty"` // Stderr is the file to store the standard error. Stderr string `json:"Stderr,omitempty"` // Output is the variable name to store the output. Output string `json:"Output,omitempty"` // Depends contains the list of step names to depend on. Depends []string `json:"Depends,omitempty"` // ContinueOn contains the conditions to continue on failure or skipped. ContinueOn ContinueOn `json:"ContinueOn,omitempty"` // RetryPolicy contains the retry policy for the step. RetryPolicy *RetryPolicy `json:"RetryPolicy,omitempty"` // RepeatPolicy contains the repeat policy for the step. RepeatPolicy RepeatPolicy `json:"RepeatPolicy,omitempty"` // MailOnError is the flag to send mail on error. MailOnError bool `json:"MailOnError,omitempty"` // Preconditions contains the conditions to be met before running the step. Preconditions []Condition `json:"Preconditions,omitempty"` // SignalOnStop is the signal to send on stop. SignalOnStop string `json:"SignalOnStop,omitempty"` // SubWorkflow contains the information about a sub DAG to be executed. SubWorkflow *SubWorkflow `json:"SubWorkflow,omitempty"` }
Step contains the runtime information for a step in a DAG. A step is created from parsing a DAG file written in YAML. It marshal/unmarshal to/from JSON when it is saved in the execution history.
type SubWorkflow ¶
SubWorkflow contains information about a sub DAG to be executed.