dag

package
v1.14.3
Published: Aug 14, 2024 License: GPL-3.0 Imports: 22 Imported by: 0

Documentation

Constants

const ExecutorTypeSubWorkflow = "subworkflow"

ExecutorTypeSubWorkflow is defined here in order to parse the `run` field in the DAG file.

Variables

var Exts = []string{".yaml", ".yml"}

Exts is a list of supported file extensions for DAG files.
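As a minimal sketch of how a caller might use this list, the helper below filters file names by extension. The names `exts` and `hasDAGExt` are hypothetical stand-ins defined locally, and the exact, case-sensitive match is an assumption rather than documented behavior.

```go
package main

import (
	"fmt"
	"path/filepath"
)

// exts mirrors the documented Exts variable (local stand-in).
var exts = []string{".yaml", ".yml"}

// hasDAGExt is a hypothetical helper that reports whether path ends in one
// of the supported extensions. The comparison is exact (an assumption).
func hasDAGExt(path string) bool {
	ext := filepath.Ext(path)
	for _, e := range exts {
		if ext == e {
			return true
		}
	}
	return false
}

func main() {
	for _, p := range []string{"etl.yaml", "etl.yml", "etl.json"} {
		fmt.Println(p, hasDAGExt(p))
	}
}
```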

Functions

func EvalConditions

func EvalConditions(cond []Condition) error

EvalConditions evaluates a list of conditions and checks the results. It returns an error if any of the conditions were not met.

func NewContext

func NewContext(ctx context.Context, dag *DAG, finder Finder) context.Context

NewContext creates a new context with the DAG and Finder.

Types

type Condition

type Condition struct {
	Condition string // Condition to evaluate
	Expected  string // Expected value
}

Condition contains a condition and the expected value. Conditions are evaluated and compared to the expected value. The condition can be a command substitution or an environment variable. The expected value must be a string without any substitutions.
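The evaluate-and-compare behavior described above can be illustrated with a simplified sketch. It handles only environment variable expansion (via `os.ExpandEnv`) and does not attempt command substitution. The lowercase `condition`, `evalCondition`, and `evalConditions` are local, hypothetical stand-ins, not the package's implementation.

```go
package main

import (
	"fmt"
	"os"
)

// condition mirrors the documented Condition struct (local stand-in).
type condition struct {
	Condition string // expression to evaluate, e.g. "$DEPLOY_ENV"
	Expected  string // literal value the result must equal
}

// evalCondition expands environment variables in c.Condition and compares
// the result with c.Expected. Command substitution is NOT handled here.
func evalCondition(c condition) error {
	actual := os.ExpandEnv(c.Condition)
	if actual != c.Expected {
		return fmt.Errorf("condition %q not met: expected %q, got %q",
			c.Condition, c.Expected, actual)
	}
	return nil
}

// evalConditions returns the first failure, or nil if every condition holds.
func evalConditions(conds []condition) error {
	for _, c := range conds {
		if err := evalCondition(c); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	os.Setenv("DEPLOY_ENV", "prod")
	conds := []condition{{Condition: "$DEPLOY_ENV", Expected: "prod"}}
	fmt.Println(evalConditions(conds))
}
```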

type Context

type Context struct {
	DAG    *DAG
	Finder Finder
}

Context contains the current DAG and Finder.

func GetContext

func GetContext(ctx context.Context) (Context, error)

GetContext returns the DAG Context from the context. It returns an error if the context does not contain a DAG Context.
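The NewContext and GetContext pair follows the usual `context.WithValue` pattern. The sketch below shows one way such a pair could be written, using local stand-in types (`dag`, `finder`, `dagContext`) and a hypothetical unexported key type; the package's actual key and error message are not shown in this documentation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Local stand-ins for the documented DAG, Finder, and Context types.
type dag struct{ Name string }

type finder interface {
	Find(name string) (*dag, error)
}

type dagContext struct {
	DAG    *dag
	Finder finder
}

// ctxKey is an unexported key type, so no other package can collide with it.
type ctxKey struct{}

// root is the parent context used by this example.
var root = context.Background()

// newContext stores the DAG and finder in a derived context.
func newContext(ctx context.Context, d *dag, f finder) context.Context {
	return context.WithValue(ctx, ctxKey{}, dagContext{DAG: d, Finder: f})
}

// getContext retrieves the stored value, or an error if it is absent.
func getContext(ctx context.Context) (dagContext, error) {
	v, ok := ctx.Value(ctxKey{}).(dagContext)
	if !ok {
		return dagContext{}, errors.New("context does not contain a DAG context")
	}
	return v, nil
}

func main() {
	ctx := newContext(root, &dag{Name: "etl"}, nil)
	c, err := getContext(ctx)
	fmt.Println(c.DAG.Name, err)

	_, err = getContext(root)
	fmt.Println(err)
}
```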

type ContinueOn

type ContinueOn struct {
	Failure bool // Failure is the flag to continue to the next step on failure.
	Skipped bool // Skipped is the flag to continue to the next step on skipped.
}

ContinueOn contains the flags that control whether execution continues to the next step after a step fails or is skipped. A step can be skipped when its preconditions are not met; if ContinueOn.Skipped is set, execution then continues to the next step.
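A small sketch of how these two flags might feed a scheduling decision follows. The `stepStatus` type and `shouldContinue` function are hypothetical and exist only for illustration; the real scheduler lives outside this package and may differ.

```go
package main

import "fmt"

// continueOn mirrors the documented ContinueOn struct (local stand-in).
type continueOn struct {
	Failure bool
	Skipped bool
}

// stepStatus is a hypothetical outcome type used only in this sketch.
type stepStatus int

const (
	statusSuccess stepStatus = iota
	statusFailure
	statusSkipped
)

// shouldContinue reports whether dependent steps may run after a step
// finished with status s, given the step's continueOn flags.
func shouldContinue(c continueOn, s stepStatus) bool {
	switch s {
	case statusSuccess:
		return true
	case statusFailure:
		return c.Failure
	case statusSkipped:
		return c.Skipped
	default:
		return false
	}
}

func main() {
	c := continueOn{Skipped: true}
	fmt.Println(shouldContinue(c, statusSkipped)) // true
	fmt.Println(shouldContinue(c, statusFailure)) // false
}
```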

type DAG

type DAG struct {
	// Location is the absolute path to the DAG file.
	Location string `json:"Location"`
	// Group is the group name of the DAG. This is optional.
	Group string `json:"Group"`
	// Name is the name of the DAG. The default is the filename without the
	// extension.
	Name string `json:"Name"`
	// Tags contains the list of tags for the DAG. This is optional.
	Tags []string `json:"Tags"`
	// Description is the description of the DAG. This is optional.
	Description string `json:"Description"`

	// Schedule configuration.
	// This is used by the scheduler to start / stop / restart the DAG.
	Schedule        []Schedule `json:"Schedule"`
	StopSchedule    []Schedule `json:"StopSchedule"`
	RestartSchedule []Schedule `json:"RestartSchedule"`

	// Env contains a list of environment variables to be set before running
	// the DAG.
	Env []string `json:"Env"`

	// LogDir is the directory where the logs are stored.
	// The actual log directory is LogDir + Name (with invalid characters
	// replaced with '_').
	LogDir string `json:"LogDir"`

	// Parameters configuration.
	// The DAG definition contains only DefaultParams. Params are automatically
	// set by the DAG loader.
	// DefaultParams contains the default parameters to be passed to the DAG.
	DefaultParams string `json:"DefaultParams"`
	// Params contains the list of parameters to be passed to the DAG.
	Params []string `json:"Params"`

	// Commands configuration to be executed in the DAG.
	// Steps contains the list of steps in the DAG.
	Steps []Step `json:"Steps"`
	// HandlerOn contains the steps to be executed on different events.
	HandlerOn HandlerOn `json:"HandlerOn"`

	// Preconditions contains the conditions to be met before running the DAG.
	// If the conditions are not met, the whole DAG is skipped.
	Preconditions []Condition `json:"Preconditions"`

	// Mail notification configuration.
	// SMTP contains the SMTP configuration.
	// If you don't want to repeat the SMTP configuration for each DAG, you can
	// set it in the base configuration.
	SMTP *SMTPConfig `json:"Smtp"`
	// ErrorMail contains the mail configuration for error.
	ErrorMail *MailConfig `json:"ErrorMail"`
	// InfoMail contains the mail configuration for info.
	InfoMail *MailConfig `json:"InfoMail"`
	// MailOn contains the conditions to send mail.
	MailOn *MailOn `json:"MailOn"`

	// Timeout specifies the maximum execution time of the DAG task.
	Timeout time.Duration `json:"Timeout"`
	// Misc configuration for DAG execution.
	// Delay is the delay before starting the DAG.
	Delay time.Duration `json:"Delay"`
	// RestartWait is the time to wait before restarting the DAG.
	RestartWait time.Duration `json:"RestartWait"`
	// MaxActiveRuns specifies the maximum number of steps to run
	// concurrently in an execution.
	MaxActiveRuns int `json:"MaxActiveRuns"`
	// MaxCleanUpTime is the maximum time to wait for cleanup when the DAG is
	// stopped.
	MaxCleanUpTime time.Duration `json:"MaxCleanUpTime"`
	// HistRetentionDays is the number of days to keep the history.
	HistRetentionDays int `json:"HistRetentionDays"`
}

DAG contains all information about a workflow.

func Load

func Load(base, dag, params string) (*DAG, error)

Load loads config from file.

func LoadMetadata

func LoadMetadata(dag string) (*DAG, error)

LoadMetadata loads config from file and returns only the headline data.

func LoadWithoutEval

func LoadWithoutEval(dag string) (*DAG, error)

LoadWithoutEval loads config from file without evaluating env variables.

func LoadYAML

func LoadYAML(data []byte) (*DAG, error)

LoadYAML loads config from YAML data. It does not evaluate the environment variables. This is used to validate the YAML data.

func (*DAG) HasTag

func (d *DAG) HasTag(tag string) bool

HasTag checks if the DAG has the given tag.
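A tag lookup of this kind is typically a linear scan over the Tags slice. The sketch below uses a local stand-in type and assumes an exact, case-sensitive comparison, which the documentation does not confirm.

```go
package main

import "fmt"

// dag is a local stand-in holding only the field this sketch needs.
type dag struct {
	Tags []string
}

// hasTag reports whether tag appears in d.Tags (exact match assumed).
func (d *dag) hasTag(tag string) bool {
	for _, t := range d.Tags {
		if t == tag {
			return true
		}
	}
	return false
}

func main() {
	d := &dag{Tags: []string{"daily", "etl"}}
	fmt.Println(d.hasTag("etl"), d.hasTag("weekly"))
}
```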

func (*DAG) SockAddr

func (d *DAG) SockAddr() string

SockAddr returns the unix socket address for the DAG. The address is used to communicate with the agent process. TODO: It needs to be unique for each process so that multiple processes can run in parallel.

func (*DAG) String

func (d *DAG) String() string

String implements the Stringer interface. It returns the string representation of the DAG. TODO: Remove if not needed.

type ExecutorConfig

type ExecutorConfig struct {
	// Type represents one of the registered executors.
	// See `executor.Register` in `internal/executor/executor.go`.
	Type   string
	Config map[string]any // Config contains executor specific configuration.
}

ExecutorConfig contains the configuration for the executor.

type Finder

type Finder interface {
	Find(name string) (*DAG, error)
}

Finder finds a DAG by name. This is used to find the DAG when a node references another DAG.
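Because Finder is a single-method interface, it is easy to satisfy in tests. The following sketch shows a hypothetical in-memory implementation, `mapFinder`, built on local stand-in types; the error text is an assumption.

```go
package main

import "fmt"

// Local stand-ins for the documented DAG type and Finder interface.
type dag struct{ Name string }

type finder interface {
	Find(name string) (*dag, error)
}

// mapFinder is a hypothetical in-memory finder keyed by DAG name.
type mapFinder map[string]*dag

// Find returns the DAG registered under name, or an error if none exists.
func (m mapFinder) Find(name string) (*dag, error) {
	d, ok := m[name]
	if !ok {
		return nil, fmt.Errorf("dag %q not found", name)
	}
	return d, nil
}

// Compile-time check that mapFinder satisfies finder.
var _ finder = mapFinder{}

func main() {
	f := mapFinder{"child": &dag{Name: "child"}}
	d, err := f.Find("child")
	fmt.Println(d.Name, err)

	_, err = f.Find("missing")
	fmt.Println(err)
}
```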

type HandlerOn

type HandlerOn struct {
	Failure *Step `json:"Failure"`
	Success *Step `json:"Success"`
	Cancel  *Step `json:"Cancel"`
	Exit    *Step `json:"Exit"`
}

HandlerOn contains the steps to be executed on different events in the DAG.

type HandlerType

type HandlerType string

HandlerType is the type of the handler.

const (
	HandlerOnSuccess HandlerType = "onSuccess"
	HandlerOnFailure HandlerType = "onFailure"
	HandlerOnCancel  HandlerType = "onCancel"
	HandlerOnExit    HandlerType = "onExit"
)

func ParseHandlerType

func ParseHandlerType(s string) HandlerType

ParseHandlerType converts a string to a HandlerType.
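One plausible shape for such a conversion is a switch over the known constants. In the sketch below, returning the empty value for unrecognized input is an assumption; the documented signature has no error result, and the actual fallback is not stated here.

```go
package main

import "fmt"

// handlerType and its constants mirror the documented declarations.
type handlerType string

const (
	handlerOnSuccess handlerType = "onSuccess"
	handlerOnFailure handlerType = "onFailure"
	handlerOnCancel  handlerType = "onCancel"
	handlerOnExit    handlerType = "onExit"
)

// parseHandlerType maps s to a known handler type. Unknown input yields
// the empty handlerType (an assumption made for this sketch).
func parseHandlerType(s string) handlerType {
	switch t := handlerType(s); t {
	case handlerOnSuccess, handlerOnFailure, handlerOnCancel, handlerOnExit:
		return t
	default:
		return ""
	}
}

func main() {
	fmt.Printf("%q\n", parseHandlerType("onExit"))
	fmt.Printf("%q\n", parseHandlerType("bogus"))
}
```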

func (HandlerType) String

func (e HandlerType) String() string

type MailConfig

type MailConfig struct {
	From string `json:"From"`
	To   string `json:"To"`
	// Prefix is the prefix for the subject of the mail.
	Prefix string `json:"Prefix"`
	// AttachLogs is the flag to attach the logs in the mail.
	AttachLogs bool `json:"AttachLogs"`
}

MailConfig contains the mail configuration.

type MailOn

type MailOn struct {
	Failure bool `json:"Failure"`
	Success bool `json:"Success"`
}

MailOn contains the conditions to send mail.

type RepeatPolicy

type RepeatPolicy struct {
	Repeat   bool          // Repeat determines if the step should be repeated.
	Interval time.Duration // Interval is the time to wait between repeats.
}

RepeatPolicy contains the repeat policy for a step.

type RetryPolicy

type RetryPolicy struct {
	Limit    int           // Limit is the number of retries allowed.
	Interval time.Duration // Interval is the time to wait between retries.
}

RetryPolicy contains the retry policy for a step.
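To make the Limit and Interval semantics concrete, here is a hedged sketch of a retry loop. It assumes that Limit counts retries after the first attempt, so a step runs at most Limit+1 times. `runWithRetry` and `errTransient` are hypothetical names, not part of the package.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// retryPolicy mirrors the documented RetryPolicy struct (local stand-in).
type retryPolicy struct {
	Limit    int
	Interval time.Duration
}

// errTransient is a placeholder error used by the example.
var errTransient = errors.New("transient failure")

// runWithRetry calls fn, retrying up to p.Limit times after the first
// failure and sleeping p.Interval between attempts. It returns the number
// of attempts made and the last error.
func runWithRetry(p retryPolicy, fn func() error) (int, error) {
	attempts := 0
	for {
		attempts++
		err := fn()
		if err == nil {
			return attempts, nil
		}
		if attempts > p.Limit {
			return attempts, err
		}
		time.Sleep(p.Interval)
	}
}

func main() {
	calls := 0
	n, err := runWithRetry(retryPolicy{Limit: 2, Interval: 10 * time.Millisecond}, func() error {
		calls++
		if calls < 3 {
			return errTransient
		}
		return nil
	})
	fmt.Println(n, err)
}
```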

type SMTPConfig

type SMTPConfig struct {
	Host     string `json:"Host"`
	Port     string `json:"Port"`
	Username string `json:"Username"`
	Password string `json:"Password"`
}

SMTPConfig contains the SMTP configuration.

type Schedule

type Schedule struct {
	// Expression is the cron expression.
	Expression string `json:"Expression"`
	// Parsed is the parsed cron schedule.
	Parsed cron.Schedule `json:"-"`
}

Schedule contains the cron expression and the parsed cron schedule.

type Step

type Step struct {
	// Name is the name of the step.
	Name string `json:"Name"`
	// Description is the description of the step.
	Description string `json:"Description,omitempty"`
	// Variables contains the list of variables to be set.
	Variables []string `json:"Variables,omitempty"`
	// OutputVariables is a structure to store the output variables for the
	// following steps.
	OutputVariables *SyncMap `json:"OutputVariables,omitempty"`
	// Dir is the working directory for the step.
	Dir string `json:"Dir,omitempty"`
	// ExecutorConfig contains the configuration for the executor.
	ExecutorConfig ExecutorConfig `json:"ExecutorConfig,omitempty"`
	// CmdWithArgs is the command with arguments.
	CmdWithArgs string `json:"CmdWithArgs,omitempty"`
	// Command specifies only the command without arguments.
	Command string `json:"Command,omitempty"`
	// Script is the script to be executed.
	Script string `json:"Script,omitempty"`
	// Args contains the arguments for the command.
	Args []string `json:"Args,omitempty"`
	// Stdout is the file to store the standard output.
	Stdout string `json:"Stdout,omitempty"`
	// Stderr is the file to store the standard error.
	Stderr string `json:"Stderr,omitempty"`
	// Output is the variable name to store the output.
	Output string `json:"Output,omitempty"`
	// Depends contains the list of step names to depend on.
	Depends []string `json:"Depends,omitempty"`
	// ContinueOn contains the conditions to continue on failure or skipped.
	ContinueOn ContinueOn `json:"ContinueOn,omitempty"`
	// RetryPolicy contains the retry policy for the step.
	RetryPolicy *RetryPolicy `json:"RetryPolicy,omitempty"`
	// RepeatPolicy contains the repeat policy for the step.
	RepeatPolicy RepeatPolicy `json:"RepeatPolicy,omitempty"`
	// MailOnError is the flag to send mail on error.
	MailOnError bool `json:"MailOnError,omitempty"`
	// Preconditions contains the conditions to be met before running the step.
	Preconditions []Condition `json:"Preconditions,omitempty"`
	// SignalOnStop is the signal to send on stop.
	SignalOnStop string `json:"SignalOnStop,omitempty"`
	// SubWorkflow contains the information about a sub DAG to be executed.
	SubWorkflow *SubWorkflow `json:"SubWorkflow,omitempty"`
}

Step contains the runtime information for a step in a DAG. A step is created by parsing a DAG file written in YAML. It is marshaled to and unmarshaled from JSON when it is saved in the execution history.
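The `omitempty` tags on most Step fields keep the saved JSON compact. The sketch below uses a trimmed local copy of the struct with four of the documented fields to show the effect; `encodeStep` is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// step is a trimmed local copy of the documented Step struct.
type step struct {
	Name    string   `json:"Name"`
	Command string   `json:"Command,omitempty"`
	Args    []string `json:"Args,omitempty"`
	Depends []string `json:"Depends,omitempty"`
}

// encodeStep marshals s to a JSON string. Empty optional fields are omitted.
func encodeStep(s step) (string, error) {
	b, err := json.Marshal(s)
	return string(b), err
}

func main() {
	out, err := encodeStep(step{Name: "build", Command: "make"})
	fmt.Println(out, err) // {"Name":"build","Command":"make"} <nil>
}
```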

func (*Step) String

func (s *Step) String() string

String implements the Stringer interface. TODO: Remove if not needed.

type SubWorkflow

type SubWorkflow struct {
	Name   string
	Params string
}

SubWorkflow contains information about a sub DAG to be executed.

type SyncMap

type SyncMap struct{ sync.Map }

SyncMap wraps a sync.Map to make it JSON serializable.

func (*SyncMap) MarshalJSON

func (m *SyncMap) MarshalJSON() ([]byte, error)

func (*SyncMap) UnmarshalJSON

func (m *SyncMap) UnmarshalJSON(data []byte) error
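A `sync.Map` has no exported fields, so `encoding/json` cannot serialize it directly. One common approach, sketched here with a local `syncMap` stand-in, copies the entries into a plain map before marshaling and stores them back after unmarshaling. Converting keys with `fmt.Sprint` is an assumption of this sketch.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync"
)

// syncMap mirrors the documented SyncMap wrapper (local stand-in).
type syncMap struct{ sync.Map }

// MarshalJSON copies all entries into a plain map and marshals that.
func (m *syncMap) MarshalJSON() ([]byte, error) {
	plain := make(map[string]any)
	m.Range(func(k, v any) bool {
		plain[fmt.Sprint(k)] = v
		return true
	})
	return json.Marshal(plain)
}

// UnmarshalJSON decodes a JSON object and stores each entry in the map.
func (m *syncMap) UnmarshalJSON(data []byte) error {
	var plain map[string]any
	if err := json.Unmarshal(data, &plain); err != nil {
		return err
	}
	for k, v := range plain {
		m.Store(k, v)
	}
	return nil
}

func main() {
	var m syncMap
	m.Store("RESULT", "ok")
	b, err := json.Marshal(&m)
	fmt.Println(string(b), err) // {"RESULT":"ok"} <nil>
}
```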
