Documentation
¶
Index ¶
- Constants
- Variables
- func EvalConditions(ctx context.Context, cond []Condition) error
- func EvalStringFields[T any](stepContext StepContext, obj T) (T, error)
- func IsContext(ctx context.Context) bool
- func IsStepContext(ctx context.Context) bool
- func NewContext(ctx context.Context, dag *DAG, client DBClient, requestID, logFile string, ...) context.Context
- func RenderTemplate(tmpl string, data map[string]any) (string, error)
- func WithContext(ctx context.Context, dagContext Context) context.Context
- func WithStepContext(ctx context.Context, stepContext StepContext) context.Context
- type BuildContext
- type BuildOpts
- type BuilderFn
- type Condition
- type Context
- func (c Context) AllEnvs() []string
- func (c Context) ApplyEnvs()
- func (c Context) EvalString(s string, opts ...cmdutil.EvalOption) (string, error)
- func (c Context) GetDAGByName(name string) (*DAG, error)
- func (c Context) GetResult(name, requestID string) (*Status, error)
- func (c Context) WithEnv(key, value string) Context
- type ContinueOn
- type DAG
- func Load(ctx context.Context, dag string, opts ...LoadOption) (*DAG, error)
- func LoadBaseConfig(ctx BuildContext, file string) (*DAG, error)
- func LoadYAML(ctx context.Context, data []byte, opts ...LoadOption) (*DAG, error)
- func LoadYAMLWithOpts(ctx context.Context, data []byte, opts BuildOpts) (*DAG, error)
- type DBClient
- type ErrorList
- type ExecutorConfig
- type HandlerOn
- type HandlerType
- type LoadError
- type LoadOption
- type LoadOptions
- type MailConfig
- type MailOn
- type RepeatPolicy
- type RetryPolicy
- type SMTPConfig
- type Schedule
- type Status
- type Step
- type StepBuilderFn
- type StepContext
- func (c StepContext) AllEnvs() []string
- func (c StepContext) EvalBool(value any) (bool, error)
- func (c StepContext) EvalString(s string, opts ...cmdutil.EvalOption) (string, error)
- func (c StepContext) LoadOutputVariables(vars *SyncMap)
- func (c StepContext) MailerConfig() (mailer.Config, error)
- func (c StepContext) WithEnv(key, value string) StepContext
- type SubWorkflow
- type SyncMap
Constants ¶
const ( EnvKeyLogPath = "DAG_EXECUTION_LOG_PATH" EnvKeySchedulerLogPath = "DAG_SCHEDULER_LOG_PATH" // Deprecated in favor of EnvKeyDAGStepLogPath EnvKeyRequestID = "DAG_REQUEST_ID" EnvKeyDAGName = "DAG_NAME" EnvKeyDAGStepName = "DAG_STEP_NAME" EnvKeyDAGStepLogPath = "DAG_STEP_LOG_PATH" )
Special environment variables.
const ExecutorTypeSubWorkflow = "subworkflow"
ExecutorTypeSubWorkflow is defined here in order to parse the `run` field in the DAG file.
const SystemVariablePrefix = "DAGU_"
Variables ¶
var ( ErrInvalidSchedule = errors.New("invalid schedule") ErrScheduleMustBeStringOrArray = errors.New("schedule must be a string or an array of strings") ErrInvalidScheduleType = errors.New("invalid schedule type") ErrInvalidKeyType = errors.New("invalid key type") ErrExecutorConfigMustBeString = errors.New("executor config key must be string") ErrDuplicateFunction = errors.New("duplicate function") ErrFuncParamsMismatch = errors.New("func params and args given to func command do not match") ErrStepNameRequired = errors.New("step name must be specified") ErrStepCommandIsRequired = errors.New("step command is required") ErrStepCommandIsEmpty = errors.New("step command is empty") ErrStepCommandMustBeArrayOrString = errors.New("step command must be an array of strings or a string") ErrInvalidParamValue = errors.New("invalid parameter value") ErrCallFunctionNotFound = errors.New("call must specify a functions that exists") ErrNumberOfParamsMismatch = errors.New("the number of parameters defined in the function does not match the number of parameters given") ErrRequiredParameterNotFound = errors.New("required parameter not found") ErrScheduleKeyMustBeString = errors.New("schedule key must be a string") ErrInvalidSignal = errors.New("invalid signal") ErrInvalidEnvValue = errors.New("invalid value for env") ErrArgsMustBeConvertibleToIntOrString = errors.New("args must be convertible to either int or string") ErrExecutorTypeMustBeString = errors.New("executor.type value must be string") ErrExecutorConfigValueMustBeMap = errors.New("executor.config value must be a map") ErrExecutorHasInvalidKey = errors.New("executor has invalid key") ErrExecutorConfigMustBeStringOrMap = errors.New("executor config must be string or map") ErrDotenvMustBeStringOrArray = errors.New("dotenv must be a string or an array of strings") ErrPreconditionMustBeArrayOrString = errors.New("precondition must be a string or an array of strings") ErrPreconditionKeyMustBeString = errors.New("precondition 
key must be a string") ErrPreconditionValueMustBeString = errors.New("precondition value must be a string") ErrPreconditionHasInvalidKey = errors.New("precondition has invalid key") ErrContinueOnOutputMustBeStringOrArray = errors.New("continueOn.Output must be a string or an array of strings") ErrContinueOnExitCodeMustBeIntOrArray = errors.New("continueOn.ExitCode must be an int or an array of ints") ErrDependsMustBeStringOrArray = errors.New("depends must be a string or an array of strings") ErrStepsMustBeArrayOrMap = errors.New("steps must be an array or a map") )
Errors that can occur while building a DAG.
var ErrConditionNotMet = fmt.Errorf("condition was not met")
Functions ¶
func EvalConditions ¶
EvalConditions evaluates a list of conditions and checks the results. It returns an error if any of the conditions were not met.
func EvalStringFields ¶
func EvalStringFields[T any](stepContext StepContext, obj T) (T, error)
func IsStepContext ¶
func NewContext ¶
func RenderTemplate ¶ added in v1.16.1
RenderTemplate replaces the template variables in the given template string.
func WithStepContext ¶
func WithStepContext(ctx context.Context, stepContext StepContext) context.Context
Types ¶
type BuildContext ¶
type BuildContext struct {
// contains filtered or unexported fields
}
BuildContext is the context for building a DAG.
func (BuildContext) WithFile ¶
func (c BuildContext) WithFile(file string) BuildContext
func (BuildContext) WithOpts ¶
func (c BuildContext) WithOpts(opts BuildOpts) BuildContext
type BuildOpts ¶ added in v1.16.1
type BuildOpts struct { // Base specifies the Base configuration file for the DAG. Base string // OnlyMetadata specifies whether to build only the metadata. OnlyMetadata bool // Parameters specifies the Parameters to the DAG. // Parameters are used to override the default Parameters in the DAG. Parameters string // ParametersList specifies the parameters to the DAG. ParametersList []string // NoEval specifies whether to evaluate dynamic fields. NoEval bool }
BuildOpts is used to control the behavior of the builder.
type BuilderFn ¶
type BuilderFn func(ctx BuildContext, spec *definition, dag *DAG) error
BuilderFn is a function that builds a part of the DAG.
type Condition ¶
type Condition struct { Command string `json:"Command,omitempty"` // Command to evaluate Condition string `json:"Condition,omitempty"` // Condition to evaluate Expected string `json:"Expected,omitempty"` // Expected value }
Condition contains a condition and the expected value. Conditions are evaluated and compared to the expected value. The condition can be a command substitution or an environment variable. The expected value must be a string without any substitutions.
type Context ¶
type Context struct {
// contains filtered or unexported fields
}
func GetContext ¶
func (Context) EvalString ¶
type ContinueOn ¶
type ContinueOn struct { Failure bool `json:"Failure,omitempty"` // Failure is the flag to continue to the next step on failure. Skipped bool `json:"Skipped,omitempty"` // Skipped is the flag to continue to the next step on skipped. ExitCode []int `json:"ExitCode,omitempty"` // ExitCode is the list of exit codes to continue to the next step. Output []string `json:"Output,omitempty"` // Output is the list of output (stdout/stderr) to continue to the next step. MarkSuccess bool `json:"MarkSuccess,omitempty"` // MarkSuccess is the flag to mark the step as success when the condition is met. }
ContinueOn contains the conditions under which execution continues after a step fails or is skipped. Failure is the flag to continue to the next step when the step fails. Skipped is the flag to continue to the next step when the step is skipped. A step can be skipped when its preconditions are not met; in that case, if ContinueOn.Skipped is set, execution continues to the next step.
type DAG ¶
type DAG struct { // Location is the absolute path to the DAG file. Location string `json:"Location"` // Group is the group name of the DAG. This is optional. Group string `json:"Group"` // Name is the name of the DAG. The default is the filename without the extension. Name string `json:"Name"` // Dotenv is the path to the dotenv file. This is optional. Dotenv []string `json:"Dotenv"` // Tags contains the list of tags for the DAG. This is optional. Tags []string `json:"Tags"` // Description is the description of the DAG. This is optional. Description string `json:"Description"` // Schedule configuration for starting, stopping, and restarting the DAG. Schedule []Schedule `json:"Schedule"` StopSchedule []Schedule `json:"StopSchedule"` RestartSchedule []Schedule `json:"RestartSchedule"` // SkipIfSuccessful indicates whether to skip the DAG if it was successful previously. // E.g., when the DAG has already been executed manually before the scheduled time. SkipIfSuccessful bool `json:"SkipIfSuccessful"` // Env contains a list of environment variables to be set before running the DAG. Env []string `json:"Env"` // LogDir is the directory where the logs are stored. LogDir string `json:"LogDir"` // DefaultParams contains the default parameters to be passed to the DAG. DefaultParams string `json:"DefaultParams"` // Params contains the list of parameters to be passed to the DAG. Params []string `json:"Params"` // Steps contains the list of steps in the DAG. Steps []Step `json:"Steps"` // HandlerOn contains the steps to be executed on different events. HandlerOn HandlerOn `json:"HandlerOn"` // Preconditions contains the conditions to be met before running the DAG. Preconditions []Condition `json:"Preconditions"` // SMTP contains the SMTP configuration. SMTP *SMTPConfig `json:"Smtp"` // ErrorMail contains the mail configuration for errors. ErrorMail *MailConfig `json:"ErrorMail"` // InfoMail contains the mail configuration for informational messages. 
InfoMail *MailConfig `json:"InfoMail"` // MailOn contains the conditions to send mail. MailOn *MailOn `json:"MailOn"` // Timeout specifies the maximum execution time of the DAG task. Timeout time.Duration `json:"Timeout"` // Delay is the delay before starting the DAG. Delay time.Duration `json:"Delay"` // RestartWait is the time to wait before restarting the DAG. RestartWait time.Duration `json:"RestartWait"` // MaxActiveRuns specifies the maximum concurrent steps to run in an execution. MaxActiveRuns int `json:"MaxActiveRuns"` // MaxCleanUpTime is the maximum time to wait for cleanup when the DAG is stopped. MaxCleanUpTime time.Duration `json:"MaxCleanUpTime"` // HistRetentionDays is the number of days to keep the history. HistRetentionDays int `json:"HistRetentionDays"` }
DAG contains all information about a workflow.
func LoadBaseConfig ¶ added in v1.16.1
func LoadBaseConfig(ctx BuildContext, file string) (*DAG, error)
LoadBaseConfig loads the global configuration from the given file. The global configuration can be overridden by the DAG configuration.
func LoadYAMLWithOpts ¶ added in v1.16.1
LoadYAMLWithOpts loads the DAG configuration from YAML data.
type DBClient ¶
type DBClient interface { GetDAG(ctx context.Context, name string) (*DAG, error) GetStatus(ctx context.Context, name string, requestID string) (*Status, error) }
DBClient retrieves a DAG by name and the result (Status) of a DAG execution.
type ErrorList ¶ added in v1.16.1
type ErrorList []error
ErrorList is a list of errors. It is used to collect multiple errors encountered while building a DAG.
type ExecutorConfig ¶
type ExecutorConfig struct { // Type represents one of the registered executors. // See `executor.Register` in `internal/executor/executor.go`. Type string `json:"Type,omitempty"` Config map[string]any `json:"Config,omitempty"` // Config contains executor-specific configuration. }
ExecutorConfig contains the configuration for the executor.
func (ExecutorConfig) IsCommand ¶
func (e ExecutorConfig) IsCommand() bool
IsCommand returns true if the executor is a command.
type HandlerOn ¶
type HandlerOn struct { Failure *Step `json:"Failure"` Success *Step `json:"Success"` Cancel *Step `json:"Cancel"` Exit *Step `json:"Exit"` }
HandlerOn contains the steps to be executed on different events in the DAG.
type HandlerType ¶
type HandlerType string
HandlerType is the type of the handler.
const ( HandlerOnSuccess HandlerType = "onSuccess" HandlerOnFailure HandlerType = "onFailure" HandlerOnCancel HandlerType = "onCancel" HandlerOnExit HandlerType = "onExit" )
func ParseHandlerType ¶
func ParseHandlerType(s string) HandlerType
ParseHandlerType converts a string to a HandlerType.
func (HandlerType) String ¶
func (h HandlerType) String() string
type LoadOption ¶
type LoadOption func(*LoadOptions)
LoadOption is a function type for setting LoadOptions.
func OnlyMetadata ¶
func OnlyMetadata() LoadOption
OnlyMetadata sets the flag to load only metadata.
func WithBaseConfig ¶
func WithBaseConfig(baseDAG string) LoadOption
WithBaseConfig sets the base DAG configuration file.
func WithParams ¶
func WithParams(params any) LoadOption
WithParams sets the parameters for the DAG.
func WithoutEval ¶
func WithoutEval() LoadOption
WithoutEval disables the evaluation of dynamic fields.
type LoadOptions ¶
type LoadOptions struct {
// contains filtered or unexported fields
}
LoadOptions contains options for loading a DAG.
type MailConfig ¶
type MailConfig struct { From string `json:"From"` To string `json:"To"` Prefix string `json:"Prefix"` AttachLogs bool `json:"AttachLogs"` }
MailConfig contains the mail configuration.
type RepeatPolicy ¶
type RepeatPolicy struct { // Repeat determines if the step should be repeated. Repeat bool `json:"Repeat,omitempty"` // Interval is the time to wait between repeats. Interval time.Duration `json:"Interval,omitempty"` }
RepeatPolicy contains the repeat policy for a step.
type RetryPolicy ¶
type RetryPolicy struct { // Limit is the number of retries allowed. Limit int `json:"Limit,omitempty"` // Interval is the time to wait between retries. Interval time.Duration `json:"Interval,omitempty"` // LimitStr is the string representation of the limit. LimitStr string `json:"LimitStr,omitempty"` // IntervalSecStr is the string representation of the interval. IntervalSecStr string `json:"IntervalSecStr,omitempty"` }
RetryPolicy contains the retry policy for a step.
type SMTPConfig ¶
type SMTPConfig struct { Host string `json:"Host"` Port string `json:"Port"` Username string `json:"Username"` Password string `json:"Password"` }
SMTPConfig contains the SMTP configuration.
type Schedule ¶
type Schedule struct { // Expression is the cron expression. Expression string `json:"Expression"` // Parsed is the parsed cron schedule. Parsed cron.Schedule `json:"-"` }
Schedule contains the cron expression and the parsed cron schedule.
type Status ¶
type Status struct { // Name represents the name of the executed DAG. Name string `json:"name,omitempty"` // Params is the parameters of the DAG execution Params string `json:"params,omitempty"` // Outputs is the outputs of the DAG execution. Outputs map[string]string `json:"outputs,omitempty"` }
Status is the result of a DAG execution.
type Step ¶
type Step struct { // Name is the name of the step. Name string `json:"Name"` // Description is the description of the step. This is optional. Description string `json:"Description,omitempty"` // Shell is the shell program to execute the command. This is optional. Shell string `json:"Shell,omitempty"` // OutputVariables stores the output variables for the following steps. // It only contains the local output variables. OutputVariables *SyncMap `json:"OutputVariables,omitempty"` // Dir is the working directory for the step. Dir string `json:"Dir,omitempty"` // ExecutorConfig contains the configuration for the executor. ExecutorConfig ExecutorConfig `json:"ExecutorConfig,omitempty"` // CmdWithArgs is the command with arguments (only display purpose). CmdWithArgs string `json:"CmdWithArgs,omitempty"` // CmdArgsSys is the command with arguments for the system. CmdArgsSys string `json:"CmdArgsSys,omitempty"` // Command specifies only the command without arguments. Command string `json:"Command,omitempty"` // ShellCmdArgs is the shell command with arguments. ShellCmdArgs string `json:"ShellCmdArgs,omitempty"` // Script is the script to be executed. Script string `json:"Script,omitempty"` // Args contains the arguments for the command. Args []string `json:"Args,omitempty"` // Stdout is the file to store the standard output. Stdout string `json:"Stdout,omitempty"` // Stderr is the file to store the standard error. Stderr string `json:"Stderr,omitempty"` // Output is the variable name to store the output. Output string `json:"Output,omitempty"` // Depends contains the list of step names to depend on. Depends []string `json:"Depends,omitempty"` // ContinueOn contains the conditions to continue on failure or skipped. ContinueOn ContinueOn `json:"ContinueOn,omitempty"` // RetryPolicy contains the retry policy for the step. RetryPolicy RetryPolicy `json:"RetryPolicy,omitempty"` // RepeatPolicy contains the repeat policy for the step. 
RepeatPolicy RepeatPolicy `json:"RepeatPolicy,omitempty"` // MailOnError is the flag to send mail on error. MailOnError bool `json:"MailOnError,omitempty"` // Preconditions contains the conditions to be met before running the step. Preconditions []Condition `json:"Preconditions,omitempty"` // SignalOnStop is the signal to send on stop. SignalOnStop string `json:"SignalOnStop,omitempty"` // SubWorkflow contains the information about a sub DAG to be executed. SubWorkflow *SubWorkflow `json:"SubWorkflow,omitempty"` }
Step contains the runtime information for a step in a DAG. A step is created by parsing a DAG file written in YAML. It is marshaled to and unmarshaled from JSON when it is saved in the execution history.
type StepBuilderFn ¶
type StepBuilderFn func(ctx BuildContext, def stepDef, step *Step) error
StepBuilderFn is a function that builds a part of the step.
type StepContext ¶
type StepContext struct { Context // contains filtered or unexported fields }
func GetStepContext ¶
func GetStepContext(ctx context.Context) StepContext
func NewStepContext ¶
func NewStepContext(ctx context.Context, step Step) StepContext
func (StepContext) AllEnvs ¶
func (c StepContext) AllEnvs() []string
func (StepContext) EvalString ¶
func (c StepContext) EvalString(s string, opts ...cmdutil.EvalOption) (string, error)
func (StepContext) LoadOutputVariables ¶
func (c StepContext) LoadOutputVariables(vars *SyncMap)
func (StepContext) MailerConfig ¶
func (c StepContext) MailerConfig() (mailer.Config, error)
func (StepContext) WithEnv ¶
func (c StepContext) WithEnv(key, value string) StepContext
type SubWorkflow ¶
type SubWorkflow struct { Name string `json:"Name,omitempty"` Params string `json:"Params,omitempty"` }
SubWorkflow contains information about a sub DAG to be executed.
type SyncMap ¶
SyncMap wraps a sync.Map to make it JSON serializable.