scheduler

package
v0.11.126 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 24, 2024 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AssetInstance

type AssetInstance struct {
	ID       string
	HumanID  string
	Pipeline *pipeline.Pipeline
	Asset    *pipeline.Asset
	// contains filtered or unexported fields
}

func (*AssetInstance) AddDownstream

func (t *AssetInstance) AddDownstream(task TaskInstance)

func (*AssetInstance) AddUpstream

func (t *AssetInstance) AddUpstream(task TaskInstance)

func (*AssetInstance) Blocking added in v0.9.0

func (t *AssetInstance) Blocking() bool

func (*AssetInstance) Completed

func (t *AssetInstance) Completed() bool

func (*AssetInstance) GetAsset

func (t *AssetInstance) GetAsset() *pipeline.Asset

func (*AssetInstance) GetDownstream

func (t *AssetInstance) GetDownstream() []TaskInstance

func (*AssetInstance) GetHumanID

func (t *AssetInstance) GetHumanID() string

func (*AssetInstance) GetHumanReadableDescription added in v0.11.29

func (t *AssetInstance) GetHumanReadableDescription() string

func (*AssetInstance) GetPipeline

func (t *AssetInstance) GetPipeline() *pipeline.Pipeline

func (*AssetInstance) GetStatus

func (t *AssetInstance) GetStatus() TaskInstanceStatus

func (*AssetInstance) GetType

func (t *AssetInstance) GetType() TaskInstanceType

func (*AssetInstance) GetUpstream

func (t *AssetInstance) GetUpstream() []TaskInstance

func (*AssetInstance) MarkAs

func (t *AssetInstance) MarkAs(status TaskInstanceStatus)

type ColumnCheckInstance

type ColumnCheckInstance struct {
	*AssetInstance

	Column *pipeline.Column
	Check  *pipeline.ColumnCheck
	// contains filtered or unexported fields
}

func (*ColumnCheckInstance) Blocking added in v0.9.0

func (t *ColumnCheckInstance) Blocking() bool

func (*ColumnCheckInstance) GetHumanReadableDescription added in v0.11.29

func (t *ColumnCheckInstance) GetHumanReadableDescription() string

func (*ColumnCheckInstance) GetType

type CustomCheckInstance

type CustomCheckInstance struct {
	*AssetInstance

	Check *pipeline.CustomCheck
}

func (*CustomCheckInstance) Blocking added in v0.9.0

func (t *CustomCheckInstance) Blocking() bool

func (*CustomCheckInstance) GetHumanReadableDescription added in v0.11.29

func (t *CustomCheckInstance) GetHumanReadableDescription() string

func (*CustomCheckInstance) GetType

type InstancesByType

type InstancesByType map[TaskInstanceType][]TaskInstance

func (InstancesByType) AddDownstreamByType

func (i InstancesByType) AddDownstreamByType(instanceType TaskInstanceType, downstream TaskInstance)

func (InstancesByType) AddUpstreamByType

func (i InstancesByType) AddUpstreamByType(instanceType TaskInstanceType, upstream TaskInstance)

type Metadata added in v0.11.122

type Metadata struct {
	Version string `json:"version"`
	OS      string `json:"os"`
}

type MetadataPushInstance added in v0.9.0

type MetadataPushInstance struct {
	*AssetInstance
}

func (*MetadataPushInstance) Blocking added in v0.9.0

func (t *MetadataPushInstance) Blocking() bool

func (*MetadataPushInstance) GetHumanReadableDescription added in v0.11.29

func (t *MetadataPushInstance) GetHumanReadableDescription() string

func (*MetadataPushInstance) GetType added in v0.9.0

type PipelineAssetState added in v0.11.122

type PipelineAssetState struct {
	Name   string `json:"name"`
	Status string `json:"status"`
}

type PipelineState added in v0.11.122

type PipelineState struct {
	Parameters        RunConfig             `json:"parameters"`
	Metadata          Metadata              `json:"metadata"`
	State             []*PipelineAssetState `json:"state"`
	Version           string                `json:"version"`
	TimeStamp         time.Time             `json:"timestamp"`
	RunID             string                `json:"run_id"`
	CompatibilityHash string                `json:"compatibility_hash"`
}

func ReadState added in v0.11.123

func ReadState(fs afero.Fs, statePath string) (*PipelineState, error)

type RunConfig added in v0.11.123

type RunConfig struct {
	Downstream        bool     `json:"downstream"`
	StartDate         string   `json:"startDate"`
	EndDate           string   `json:"endDate"`
	Workers           int      `json:"workers"`
	Environment       string   `json:"environment"`
	Force             bool     `json:"force"`
	PushMetadata      bool     `json:"pushMetadata"`
	NoLogFile         bool     `json:"noLogFile"`
	FullRefresh       bool     `json:"fullRefresh"`
	UsePip            bool     `json:"useUV"`
	Tag               string   `json:"tag"`
	ExcludeTag        string   `json:"excludeTag"`
	Only              []string `json:"only"`
	Output            string   `json:"output"`
	ExpUseWingetForUv bool     `json:"expUseWingetForUv"`
}

type Scheduler

type Scheduler struct {
	WorkQueue chan TaskInstance
	Results   chan *TaskExecutionResult
	// contains filtered or unexported fields
}

func NewScheduler

func NewScheduler(logger *zap.SugaredLogger, p *pipeline.Pipeline, runID string) *Scheduler

func (*Scheduler) FindMajorityOfTypes added in v0.4.3

func (s *Scheduler) FindMajorityOfTypes(defaultIfNone pipeline.AssetType) pipeline.AssetType

func (*Scheduler) GetTaskInstancesByStatus

func (s *Scheduler) GetTaskInstancesByStatus(status TaskInstanceStatus) []TaskInstance

func (*Scheduler) InstanceCount

func (s *Scheduler) InstanceCount() int

func (*Scheduler) InstanceCountByStatus

func (s *Scheduler) InstanceCountByStatus(status TaskInstanceStatus) int

func (*Scheduler) Kickstart

func (s *Scheduler) Kickstart()

Kickstart initiates the scheduler process by sending a "start" task for the processing.

func (*Scheduler) MarkAll

func (s *Scheduler) MarkAll(status TaskInstanceStatus)

func (*Scheduler) MarkAsset added in v0.11.29

func (s *Scheduler) MarkAsset(task *pipeline.Asset, status TaskInstanceStatus, downstream bool)

func (*Scheduler) MarkByTag added in v0.11.0

func (s *Scheduler) MarkByTag(tag string, status TaskInstanceStatus, downstream bool)

func (*Scheduler) MarkPendingInstancesByType added in v0.11.29

func (s *Scheduler) MarkPendingInstancesByType(instanceType TaskInstanceType, status TaskInstanceStatus)

func (*Scheduler) MarkTaskInstance

func (s *Scheduler) MarkTaskInstance(instance TaskInstance, status TaskInstanceStatus, downstream bool)

func (*Scheduler) MarkTaskInstanceIfNotSkipped added in v0.11.114

func (s *Scheduler) MarkTaskInstanceIfNotSkipped(instance TaskInstance, status TaskInstanceStatus, markDownstream bool)

func (*Scheduler) RestoreState added in v0.11.123

func (s *Scheduler) RestoreState(state *PipelineState) error

func (*Scheduler) Run

func (*Scheduler) SavePipelineState added in v0.11.122

func (s *Scheduler) SavePipelineState(fs afero.Fs, param *RunConfig, runID, statePath string) error

func (*Scheduler) Tick

func (s *Scheduler) Tick(result *TaskExecutionResult) bool

Tick marks an iteration of the scheduler loop. It is called when a result is received. The results are mainly fed from a channel, but Tick allows implementing additional methods of passing Asset results and simulating scheduler loops, e.g. time travel. It is also useful for testing purposes.

func (*Scheduler) WillRunTaskOfType

func (s *Scheduler) WillRunTaskOfType(taskType pipeline.AssetType) bool

type TaskExecutionResult

type TaskExecutionResult struct {
	Instance TaskInstance
	Error    error
}

type TaskInstance

type TaskInstance interface {
	GetPipeline() *pipeline.Pipeline
	GetAsset() *pipeline.Asset
	GetType() TaskInstanceType
	GetHumanID() string
	GetHumanReadableDescription() string

	GetStatus() TaskInstanceStatus
	MarkAs(status TaskInstanceStatus)
	Completed() bool
	Blocking() bool

	GetUpstream() []TaskInstance
	GetDownstream() []TaskInstance
	AddUpstream(t TaskInstance)
	AddDownstream(t TaskInstance)
}

type TaskInstanceStatus

type TaskInstanceStatus int
const (
	Pending TaskInstanceStatus = iota
	Queued
	Running
	Failed
	UpstreamFailed
	Succeeded
	Skipped
)

func GetStatusForTask added in v0.11.122

func GetStatusForTask(tasks []TaskInstanceStatus) TaskInstanceStatus

func StatusFromString added in v0.11.123

func StatusFromString(status string) TaskInstanceStatus

func (TaskInstanceStatus) String

func (s TaskInstanceStatus) String() string

type TaskInstanceType

type TaskInstanceType int
const (
	TaskInstanceTypeMain TaskInstanceType = iota
	TaskInstanceTypeColumnCheck
	TaskInstanceTypeCustomCheck
	TaskInstanceTypeMetadataPush
)

func (TaskInstanceType) String

func (s TaskInstanceType) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL