digraph

package
v1.16.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 25, 2025 License: GPL-2.0, GPL-3.0 Imports: 33 Imported by: 0

Documentation

Index

Constants

View Source
// Names of the special environment variables.
// NOTE(review): where these variables are set and read is not visible in this
// declaration — confirm against the code that uses them.
const (
	EnvKeyLogPath          = "DAG_EXECUTION_LOG_PATH"
	EnvKeySchedulerLogPath = "DAG_SCHEDULER_LOG_PATH" // Deprecated: use EnvKeyDAGStepLogPath instead.
	EnvKeyRequestID        = "DAG_REQUEST_ID"
	EnvKeyDAGName          = "DAG_NAME"
	EnvKeyDAGStepName      = "DAG_STEP_NAME"
	EnvKeyDAGStepLogPath   = "DAG_STEP_LOG_PATH"
)

Special environment variables.

View Source
// ExecutorTypeSubWorkflow is the executor type name "subworkflow".
// It is defined here in order to parse the `run` field in the DAG file.
const ExecutorTypeSubWorkflow = "subworkflow"

ExecutorTypeSubWorkflow is defined here in order to parse the `run` field in the DAG file.

View Source
// SystemVariablePrefix is the prefix string "DAGU_".
// NOTE(review): presumably the prefix reserved for system-defined variable
// names — confirm at the usage sites.
const SystemVariablePrefix = "DAGU_"

Variables

View Source
// Sentinel errors that can occur while building a DAG.
// Match them with errors.Is rather than by comparing message text; the
// messages are meant for humans and may be reworded.
var (
	ErrInvalidSchedule                     = errors.New("invalid schedule")
	ErrScheduleMustBeStringOrArray         = errors.New("schedule must be a string or an array of strings")
	ErrInvalidScheduleType                 = errors.New("invalid schedule type")
	ErrInvalidKeyType                      = errors.New("invalid key type")
	ErrExecutorConfigMustBeString          = errors.New("executor config key must be string")
	ErrDuplicateFunction                   = errors.New("duplicate function")
	ErrFuncParamsMismatch                  = errors.New("func params and args given to func command do not match")
	ErrStepNameRequired                    = errors.New("step name must be specified")
	ErrStepCommandIsRequired               = errors.New("step command is required")
	ErrStepCommandIsEmpty                  = errors.New("step command is empty")
	ErrStepCommandMustBeArrayOrString      = errors.New("step command must be an array of strings or a string")
	ErrInvalidParamValue                   = errors.New("invalid parameter value")
	ErrCallFunctionNotFound                = errors.New("call must specify a function that exists")
	ErrNumberOfParamsMismatch              = errors.New("the number of parameters defined in the function does not match the number of parameters given")
	ErrRequiredParameterNotFound           = errors.New("required parameter not found")
	ErrScheduleKeyMustBeString             = errors.New("schedule key must be a string")
	ErrInvalidSignal                       = errors.New("invalid signal")
	ErrInvalidEnvValue                     = errors.New("invalid value for env")
	ErrArgsMustBeConvertibleToIntOrString  = errors.New("args must be convertible to either int or string")
	ErrExecutorTypeMustBeString            = errors.New("executor.type value must be string")
	ErrExecutorConfigValueMustBeMap        = errors.New("executor.config value must be a map")
	ErrExecutorHasInvalidKey               = errors.New("executor has invalid key")
	ErrExecutorConfigMustBeStringOrMap     = errors.New("executor config must be string or map")
	ErrDotenvMustBeStringOrArray           = errors.New("dotenv must be a string or an array of strings")
	ErrPreconditionMustBeArrayOrString     = errors.New("precondition must be a string or an array of strings")
	ErrPreconditionKeyMustBeString         = errors.New("precondition key must be a string")
	ErrPreconditionValueMustBeString       = errors.New("precondition value must be a string")
	ErrPreconditionHasInvalidKey           = errors.New("precondition has invalid key")
	ErrContinueOnOutputMustBeStringOrArray = errors.New("continueOn.Output must be a string or an array of strings")
	ErrContinueOnExitCodeMustBeIntOrArray  = errors.New("continueOn.ExitCode must be an int or an array of ints")
	ErrDependsMustBeStringOrArray          = errors.New("depends must be a string or an array of strings")
	ErrStepsMustBeArrayOrMap               = errors.New("steps must be an array or a map")
)

Errors that can occur while building a DAG.

View Source
// ErrConditionNotMet is the error value with the message "condition was not met".
// NOTE(review): presumably what EvalConditions reports when a condition
// fails — confirm against its implementation.
var ErrConditionNotMet = fmt.Errorf("condition was not met")

Functions

func EvalConditions

func EvalConditions(ctx context.Context, cond []Condition) error

EvalConditions evaluates a list of conditions and checks the results. It returns an error if any of the conditions were not met.

func EvalStringFields

func EvalStringFields[T any](stepContext StepContext, obj T) (T, error)

func IsContext

func IsContext(ctx context.Context) bool

func IsStepContext

func IsStepContext(ctx context.Context) bool

func NewContext

func NewContext(ctx context.Context, dag *DAG, client DBClient, requestID, logFile string, params []string) context.Context

func RenderTemplate added in v1.16.1

func RenderTemplate(tmpl string, data map[string]any) (string, error)

RenderTemplate replaces the template variables in the given template string using the values in data.

func WithContext

func WithContext(ctx context.Context, dagContext Context) context.Context

func WithStepContext

func WithStepContext(ctx context.Context, stepContext StepContext) context.Context

Types

type BuildContext

// BuildContext is the context for building a DAG.
type BuildContext struct {
	// contains filtered or unexported fields
}

BuildContext is the context for building a DAG.

func (BuildContext) WithFile

func (c BuildContext) WithFile(file string) BuildContext

func (BuildContext) WithOpts

func (c BuildContext) WithOpts(opts BuildOpts) BuildContext

type BuildOpts added in v1.16.1

// BuildOpts is used to control the behavior of the builder.
type BuildOpts struct {
	// Base specifies the base configuration file for the DAG.
	Base string
	// OnlyMetadata specifies whether to build only the metadata.
	OnlyMetadata bool
	// Parameters specifies the parameters to the DAG as a single string.
	// They are used to override the default parameters in the DAG.
	Parameters string
	// ParametersList specifies the parameters to the DAG as a list of strings.
	ParametersList []string
	// NoEval specifies whether to skip the evaluation of dynamic fields
	// (going by the name, true means no evaluation — TODO confirm).
	NoEval bool
}

BuildOpts is used to control the behavior of the builder.

type BuilderFn

// BuilderFn is a function that builds a part of the DAG.
// It takes the build context, the definition (spec), and a DAG, and reports
// failure through the returned error.
type BuilderFn func(ctx BuildContext, spec *definition, dag *DAG) error

BuilderFn is a function that builds a part of the DAG.

type Condition

// Condition contains a condition and the expected value.
// Conditions are evaluated and compared to the expected value. The condition
// can be a command substitution or an environment variable. The expected
// value must be a string without any substitutions.
type Condition struct {
	Command   string `json:"Command,omitempty"`   // Command to evaluate
	Condition string `json:"Condition,omitempty"` // Condition to evaluate
	Expected  string `json:"Expected,omitempty"`  // Expected value
}

Condition contains a condition and the expected value. Conditions are evaluated and compared to the expected value. The condition can be a command substitution or an environment variable. The expected value must be a string without any substitutions.

func (Condition) String

func (c Condition) String() string

func (Condition) Validate

func (c Condition) Validate() error

type Context

// Context is the DAG-level context value of this package.
// NOTE(review): this type has no doc comment upstream and its fields are
// unexported; it appears to be stored in and read back from a context.Context
// via WithContext and GetContext — confirm against the implementation.
type Context struct {
	// contains filtered or unexported fields
}

func GetContext

func GetContext(ctx context.Context) Context

func (Context) AllEnvs

func (c Context) AllEnvs() []string

func (Context) ApplyEnvs

func (c Context) ApplyEnvs()

func (Context) EvalString

func (c Context) EvalString(s string, opts ...cmdutil.EvalOption) (string, error)

func (Context) GetDAGByName

func (c Context) GetDAGByName(name string) (*DAG, error)

func (Context) GetResult

func (c Context) GetResult(name, requestID string) (*Status, error)

func (Context) WithEnv

func (c Context) WithEnv(key, value string) Context

type ContinueOn

// ContinueOn contains the conditions to continue on failure or skipped.
// A step can be skipped when the preconditions are not met; if Skipped is
// set, execution then continues to the next step.
type ContinueOn struct {
	Failure     bool     `json:"Failure,omitempty"`     // Failure is the flag to continue to the next step on failure.
	Skipped     bool     `json:"Skipped,omitempty"`     // Skipped is the flag to continue to the next step on skipped.
	ExitCode    []int    `json:"ExitCode,omitempty"`    // ExitCode is the list of exit codes to continue to the next step.
	Output      []string `json:"Output,omitempty"`      // Output is the list of output (stdout/stderr) to continue to the next step.
	MarkSuccess bool     `json:"MarkSuccess,omitempty"` // MarkSuccess is the flag to mark the step as success when the condition is met.
}

ContinueOn contains the conditions to continue on failure or skipped. Failure is the flag to continue to the next step on failure. Skipped is the flag to continue to the next step when the step is skipped. A step can be skipped when its preconditions are not met; in that case, if ContinueOn.Skipped is set, execution continues to the next step.

type DAG

// DAG contains all information about a workflow.
type DAG struct {
	// Location is the absolute path to the DAG file.
	Location string `json:"Location"`
	// Group is the group name of the DAG. This is optional.
	Group string `json:"Group"`
	// Name is the name of the DAG. The default is the filename without the extension.
	Name string `json:"Name"`
	// Dotenv is the list of paths to dotenv files. This is optional.
	Dotenv []string `json:"Dotenv"`
	// Tags contains the list of tags for the DAG. This is optional.
	Tags []string `json:"Tags"`
	// Description is the description of the DAG. This is optional.
	Description string `json:"Description"`
	// Schedule, StopSchedule, and RestartSchedule hold the schedule
	// configuration for starting, stopping, and restarting the DAG, respectively.
	Schedule        []Schedule `json:"Schedule"`
	StopSchedule    []Schedule `json:"StopSchedule"`
	RestartSchedule []Schedule `json:"RestartSchedule"`
	// SkipIfSuccessful indicates whether to skip the DAG if it was successful previously.
	// E.g., when the DAG has already been executed manually before the scheduled time.
	SkipIfSuccessful bool `json:"SkipIfSuccessful"`
	// Env contains a list of environment variables to be set before running the DAG.
	Env []string `json:"Env"`
	// LogDir is the directory where the logs are stored.
	LogDir string `json:"LogDir"`
	// DefaultParams contains the default parameters to be passed to the DAG.
	DefaultParams string `json:"DefaultParams"`
	// Params contains the list of parameters to be passed to the DAG.
	Params []string `json:"Params"`
	// Steps contains the list of steps in the DAG.
	Steps []Step `json:"Steps"`
	// HandlerOn contains the steps to be executed on different events.
	HandlerOn HandlerOn `json:"HandlerOn"`
	// Preconditions contains the conditions to be met before running the DAG.
	Preconditions []Condition `json:"Preconditions"`
	// SMTP contains the SMTP configuration.
	// Note that its JSON key is "Smtp", not "SMTP".
	SMTP *SMTPConfig `json:"Smtp"`
	// ErrorMail contains the mail configuration for errors.
	ErrorMail *MailConfig `json:"ErrorMail"`
	// InfoMail contains the mail configuration for informational messages.
	InfoMail *MailConfig `json:"InfoMail"`
	// MailOn contains the conditions to send mail.
	MailOn *MailOn `json:"MailOn"`
	// Timeout specifies the maximum execution time of the DAG task.
	Timeout time.Duration `json:"Timeout"`
	// Delay is the delay before starting the DAG.
	Delay time.Duration `json:"Delay"`
	// RestartWait is the time to wait before restarting the DAG.
	RestartWait time.Duration `json:"RestartWait"`
	// MaxActiveRuns specifies the maximum concurrent steps to run in an execution.
	MaxActiveRuns int `json:"MaxActiveRuns"`
	// MaxCleanUpTime is the maximum time to wait for cleanup when the DAG is stopped.
	MaxCleanUpTime time.Duration `json:"MaxCleanUpTime"`
	// HistRetentionDays is the number of days to keep the history.
	HistRetentionDays int `json:"HistRetentionDays"`
}

DAG contains all information about a workflow.

func Load

func Load(ctx context.Context, dag string, opts ...LoadOption) (*DAG, error)

Load loads the DAG from the given file with the specified options.

func LoadBaseConfig added in v1.16.1

func LoadBaseConfig(ctx BuildContext, file string) (*DAG, error)

LoadBaseConfig loads the global configuration from the given file. The global configuration can be overridden by the DAG configuration.

func LoadYAML

func LoadYAML(ctx context.Context, data []byte, opts ...LoadOption) (*DAG, error)

LoadYAML loads the DAG from the given YAML data with the specified options.

func LoadYAMLWithOpts added in v1.16.1

func LoadYAMLWithOpts(ctx context.Context, data []byte, opts BuildOpts) (*DAG, error)

LoadYAMLWithOpts loads the DAG configuration from YAML data.

func (*DAG) HasTag

func (d *DAG) HasTag(tag string) bool

HasTag checks if the DAG has the given tag.

func (*DAG) SockAddr

func (d *DAG) SockAddr() string

SockAddr returns the unix socket address for the DAG. The address is used to communicate with the agent process.

func (*DAG) String

func (d *DAG) String() string

String implements the Stringer interface. It returns a formatted string representation of the DAG.

type DBClient

// DBClient looks up DAGs by name and gets the results of DAG executions.
type DBClient interface {
	// GetDAG returns the DAG for the given name, or an error.
	GetDAG(ctx context.Context, name string) (*DAG, error)
	// GetStatus returns the Status for the given DAG name and request ID, or an error.
	GetStatus(ctx context.Context, name string, requestID string) (*Status, error)
}

DBClient looks up DAGs by name and gets the results of DAG executions.

type ErrorList added in v1.16.1

// ErrorList is just a list of errors.
// It is used to collect multiple errors in building a DAG.
type ErrorList []error

ErrorList is just a list of errors. It is used to collect multiple errors in building a DAG.

func (*ErrorList) Add added in v1.16.1

func (e *ErrorList) Add(err error)

Add adds an error to the list.

func (*ErrorList) Error added in v1.16.1

func (e *ErrorList) Error() string

Error implements the error interface. It returns a string with all the errors separated by a semicolon.

type ExecutorConfig

// ExecutorConfig contains the configuration for the executor.
type ExecutorConfig struct {
	// Type represents one of the registered executors.
	// See `executor.Register` in `internal/executor/executor.go`.
	Type   string         `json:"Type,omitempty"`
	Config map[string]any `json:"Config,omitempty"` // Config contains executor-specific configuration.
}

ExecutorConfig contains the configuration for the executor.

func (ExecutorConfig) IsCommand

func (e ExecutorConfig) IsCommand() bool

IsCommand returns true if the executor is a command.

type HandlerOn

// HandlerOn contains the steps to be executed on different events in the DAG.
// Each field holds the step for the event it is named after.
type HandlerOn struct {
	Failure *Step `json:"Failure"`
	Success *Step `json:"Success"`
	Cancel  *Step `json:"Cancel"`
	Exit    *Step `json:"Exit"`
}

HandlerOn contains the steps to be executed on different events in the DAG.

type HandlerType

// HandlerType is the type of the handler.
type HandlerType string

HandlerType is the type of the handler.

// The defined HandlerType values.
const (
	HandlerOnSuccess HandlerType = "onSuccess"
	HandlerOnFailure HandlerType = "onFailure"
	HandlerOnCancel  HandlerType = "onCancel"
	HandlerOnExit    HandlerType = "onExit"
)

func ParseHandlerType

func ParseHandlerType(s string) HandlerType

ParseHandlerType converts a string to a HandlerType.

func (HandlerType) String

func (h HandlerType) String() string

type LoadError

// LoadError represents an error in a specific field of the configuration.
type LoadError struct {
	Field string
	Value any
	Err   error
}

LoadError represents an error in a specific field of the configuration.

func (*LoadError) Error

func (e *LoadError) Error() string

func (*LoadError) Unwrap

func (e *LoadError) Unwrap() error

type LoadOption

// LoadOption is a function type for setting LoadOptions.
type LoadOption func(*LoadOptions)

LoadOption is a function type for setting LoadOptions.

func OnlyMetadata

func OnlyMetadata() LoadOption

OnlyMetadata sets the flag to load only metadata.

func WithBaseConfig

func WithBaseConfig(baseDAG string) LoadOption

WithBaseConfig sets the base DAG configuration file.

func WithParams

func WithParams(params any) LoadOption

WithParams sets the parameters for the DAG.

func WithoutEval

func WithoutEval() LoadOption

WithoutEval disables the evaluation of dynamic fields.

type LoadOptions

// LoadOptions contains options for loading a DAG.
type LoadOptions struct {
	// contains filtered or unexported fields
}

LoadOptions contains options for loading a DAG.

type MailConfig

// MailConfig contains the mail configuration.
type MailConfig struct {
	From       string `json:"From"`
	To         string `json:"To"`
	Prefix     string `json:"Prefix"`
	AttachLogs bool   `json:"AttachLogs"`
}

MailConfig contains the mail configuration.

type MailOn

// MailOn contains the conditions to send mail.
type MailOn struct {
	Failure bool `json:"Failure"`
	Success bool `json:"Success"`
}

MailOn contains the conditions to send mail.

type RepeatPolicy

// RepeatPolicy contains the repeat policy for a step.
type RepeatPolicy struct {
	// Repeat determines if the step should be repeated.
	Repeat bool `json:"Repeat,omitempty"`
	// Interval is the time to wait between repeats.
	Interval time.Duration `json:"Interval,omitempty"`
}

RepeatPolicy contains the repeat policy for a step.

type RetryPolicy

// RetryPolicy contains the retry policy for a step.
type RetryPolicy struct {
	// Limit is the number of retries allowed.
	Limit int `json:"Limit,omitempty"`
	// Interval is the time to wait between retries.
	Interval time.Duration `json:"Interval,omitempty"`
	// LimitStr is the string representation of the limit.
	LimitStr string `json:"LimitStr,omitempty"`
	// IntervalSecStr is the string representation of the interval.
	IntervalSecStr string `json:"IntervalSecStr,omitempty"`
}

RetryPolicy contains the retry policy for a step.

type SMTPConfig

// SMTPConfig contains the SMTP configuration.
// Note that Port is a string, not a number.
type SMTPConfig struct {
	Host     string `json:"Host"`
	Port     string `json:"Port"`
	Username string `json:"Username"`
	Password string `json:"Password"`
}

SMTPConfig contains the SMTP configuration.

type Schedule

// Schedule contains the cron expression and the parsed cron schedule.
type Schedule struct {
	// Expression is the cron expression.
	Expression string `json:"Expression"`
	// Parsed is the parsed cron schedule. It is excluded from JSON (tag "-").
	Parsed cron.Schedule `json:"-"`
}

Schedule contains the cron expression and the parsed cron schedule.

type Status

// Status is the result of a DAG execution.
type Status struct {
	// Name represents the name of the executed DAG.
	Name string `json:"name,omitempty"`
	// Params is the parameters of the DAG execution.
	Params string `json:"params,omitempty"`
	// Outputs is the outputs of the DAG execution.
	Outputs map[string]string `json:"outputs,omitempty"`
}

Status is the result of a DAG execution.

type Step

// Step contains the runtime information for a step in a DAG.
// A step is created from parsing a DAG file written in YAML. It
// marshals/unmarshals to/from JSON when it is saved in the execution history.
//
// NOTE(review): with the standard encoding/json package, `omitempty` never
// omits struct-typed fields (ExecutorConfig, ContinueOn, RetryPolicy,
// RepeatPolicy) — confirm whether that is intended.
type Step struct {
	// Name is the name of the step.
	Name string `json:"Name"`
	// Description is the description of the step. This is optional.
	Description string `json:"Description,omitempty"`
	// Shell is the shell program to execute the command. This is optional.
	Shell string `json:"Shell,omitempty"`
	// OutputVariables stores the output variables for the following steps.
	// It only contains the local output variables.
	OutputVariables *SyncMap `json:"OutputVariables,omitempty"`
	// Dir is the working directory for the step.
	Dir string `json:"Dir,omitempty"`
	// ExecutorConfig contains the configuration for the executor.
	ExecutorConfig ExecutorConfig `json:"ExecutorConfig,omitempty"`
	// CmdWithArgs is the command with arguments (for display purposes only).
	CmdWithArgs string `json:"CmdWithArgs,omitempty"`
	// CmdArgsSys is the command with arguments for the system.
	CmdArgsSys string `json:"CmdArgsSys,omitempty"`
	// Command specifies only the command without arguments.
	Command string `json:"Command,omitempty"`
	// ShellCmdArgs is the shell command with arguments.
	ShellCmdArgs string `json:"ShellCmdArgs,omitempty"`
	// Script is the script to be executed.
	Script string `json:"Script,omitempty"`
	// Args contains the arguments for the command.
	Args []string `json:"Args,omitempty"`
	// Stdout is the file to store the standard output.
	Stdout string `json:"Stdout,omitempty"`
	// Stderr is the file to store the standard error.
	Stderr string `json:"Stderr,omitempty"`
	// Output is the variable name to store the output.
	Output string `json:"Output,omitempty"`
	// Depends contains the list of step names to depend on.
	Depends []string `json:"Depends,omitempty"`
	// ContinueOn contains the conditions to continue on failure or skipped.
	ContinueOn ContinueOn `json:"ContinueOn,omitempty"`
	// RetryPolicy contains the retry policy for the step.
	RetryPolicy RetryPolicy `json:"RetryPolicy,omitempty"`
	// RepeatPolicy contains the repeat policy for the step.
	RepeatPolicy RepeatPolicy `json:"RepeatPolicy,omitempty"`
	// MailOnError is the flag to send mail on error.
	MailOnError bool `json:"MailOnError,omitempty"`
	// Preconditions contains the conditions to be met before running the step.
	Preconditions []Condition `json:"Preconditions,omitempty"`
	// SignalOnStop is the signal to send on stop.
	SignalOnStop string `json:"SignalOnStop,omitempty"`
	// SubWorkflow contains the information about a sub DAG to be executed.
	SubWorkflow *SubWorkflow `json:"SubWorkflow,omitempty"`
}

Step contains the runtime information for a step in a DAG. A step is created from parsing a DAG file written in YAML. It marshals/unmarshals to/from JSON when it is saved in the execution history.

func (*Step) String

func (s *Step) String() string

String returns a formatted string representation of the step.

type StepBuilderFn

// StepBuilderFn is a function that builds a part of the step.
// It takes the build context, the step definition (def), and a Step, and
// reports failure through the returned error.
type StepBuilderFn func(ctx BuildContext, def stepDef, step *Step) error

StepBuilderFn is a function that builds a part of the step.

type StepContext

// StepContext embeds Context and adds unexported step-level fields.
// NOTE(review): this type has no doc comment upstream; it appears to be
// stored in and read back from a context.Context via WithStepContext and
// GetStepContext — confirm against the implementation.
type StepContext struct {
	Context
	// contains filtered or unexported fields
}

func GetStepContext

func GetStepContext(ctx context.Context) StepContext

func NewStepContext

func NewStepContext(ctx context.Context, step Step) StepContext

func (StepContext) AllEnvs

func (c StepContext) AllEnvs() []string

func (StepContext) EvalBool

func (c StepContext) EvalBool(value any) (bool, error)

func (StepContext) EvalString

func (c StepContext) EvalString(s string, opts ...cmdutil.EvalOption) (string, error)

func (StepContext) LoadOutputVariables

func (c StepContext) LoadOutputVariables(vars *SyncMap)

func (StepContext) MailerConfig

func (c StepContext) MailerConfig() (mailer.Config, error)

func (StepContext) WithEnv

func (c StepContext) WithEnv(key, value string) StepContext

type SubWorkflow

// SubWorkflow contains information about a sub DAG to be executed.
type SubWorkflow struct {
	Name   string `json:"Name,omitempty"`
	Params string `json:"Params,omitempty"`
}

SubWorkflow contains information about a sub DAG to be executed.

type SyncMap

// SyncMap wraps a sync.Map to make it JSON serializable.
type SyncMap struct {
	sync.Map
	// contains filtered or unexported fields
}

SyncMap wraps a sync.Map to make it JSON serializable.

func (*SyncMap) MarshalJSON

func (m *SyncMap) MarshalJSON() ([]byte, error)

func (*SyncMap) MarshalJSONIndent

func (m *SyncMap) MarshalJSONIndent(prefix, indent string) ([]byte, error)

func (*SyncMap) Store

func (m *SyncMap) Store(key, value any)

func (*SyncMap) UnmarshalJSON

func (m *SyncMap) UnmarshalJSON(data []byte) error

func (*SyncMap) Variables

func (m *SyncMap) Variables() map[string]string

Variables returns the map of variables. A variable is a string in the form of "key=value".

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL