scheduler

package
Version: v0.11.114
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 10, 2024 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AssetInstance

type AssetInstance struct {
	ID       string
	HumanID  string
	Pipeline *pipeline.Pipeline
	Asset    *pipeline.Asset
	// contains filtered or unexported fields
}

func (*AssetInstance) AddDownstream

func (t *AssetInstance) AddDownstream(task TaskInstance)

func (*AssetInstance) AddUpstream

func (t *AssetInstance) AddUpstream(task TaskInstance)

func (*AssetInstance) Blocking added in v0.9.0

func (t *AssetInstance) Blocking() bool

func (*AssetInstance) Completed

func (t *AssetInstance) Completed() bool

func (*AssetInstance) GetAsset

func (t *AssetInstance) GetAsset() *pipeline.Asset

func (*AssetInstance) GetDownstream

func (t *AssetInstance) GetDownstream() []TaskInstance

func (*AssetInstance) GetHumanID

func (t *AssetInstance) GetHumanID() string

func (*AssetInstance) GetHumanReadableDescription added in v0.11.29

func (t *AssetInstance) GetHumanReadableDescription() string

func (*AssetInstance) GetPipeline

func (t *AssetInstance) GetPipeline() *pipeline.Pipeline

func (*AssetInstance) GetStatus

func (t *AssetInstance) GetStatus() TaskInstanceStatus

func (*AssetInstance) GetType

func (t *AssetInstance) GetType() TaskInstanceType

func (*AssetInstance) GetUpstream

func (t *AssetInstance) GetUpstream() []TaskInstance

func (*AssetInstance) MarkAs

func (t *AssetInstance) MarkAs(status TaskInstanceStatus)

type ColumnCheckInstance

type ColumnCheckInstance struct {
	*AssetInstance

	Column *pipeline.Column
	Check  *pipeline.ColumnCheck
	// contains filtered or unexported fields
}

func (*ColumnCheckInstance) Blocking added in v0.9.0

func (t *ColumnCheckInstance) Blocking() bool

func (*ColumnCheckInstance) GetHumanReadableDescription added in v0.11.29

func (t *ColumnCheckInstance) GetHumanReadableDescription() string

func (*ColumnCheckInstance) GetType

type CustomCheckInstance

type CustomCheckInstance struct {
	*AssetInstance

	Check *pipeline.CustomCheck
}

func (*CustomCheckInstance) Blocking added in v0.9.0

func (t *CustomCheckInstance) Blocking() bool

func (*CustomCheckInstance) GetHumanReadableDescription added in v0.11.29

func (t *CustomCheckInstance) GetHumanReadableDescription() string

func (*CustomCheckInstance) GetType

type InstancesByType

type InstancesByType map[TaskInstanceType][]TaskInstance

func (InstancesByType) AddDownstreamByType

func (i InstancesByType) AddDownstreamByType(instanceType TaskInstanceType, downstream TaskInstance)

func (InstancesByType) AddUpstreamByType

func (i InstancesByType) AddUpstreamByType(instanceType TaskInstanceType, upstream TaskInstance)

type MetadataPushInstance added in v0.9.0

type MetadataPushInstance struct {
	*AssetInstance
}

func (*MetadataPushInstance) Blocking added in v0.9.0

func (t *MetadataPushInstance) Blocking() bool

func (*MetadataPushInstance) GetHumanReadableDescription added in v0.11.29

func (t *MetadataPushInstance) GetHumanReadableDescription() string

func (*MetadataPushInstance) GetType added in v0.9.0

type Scheduler

type Scheduler struct {
	WorkQueue chan TaskInstance
	Results   chan *TaskExecutionResult
	// contains filtered or unexported fields
}

func NewScheduler

func NewScheduler(logger *zap.SugaredLogger, p *pipeline.Pipeline) *Scheduler

func (*Scheduler) FindMajorityOfTypes added in v0.4.3

func (s *Scheduler) FindMajorityOfTypes(defaultIfNone pipeline.AssetType) pipeline.AssetType

func (*Scheduler) GetTaskInstancesByStatus

func (s *Scheduler) GetTaskInstancesByStatus(status TaskInstanceStatus) []TaskInstance

func (*Scheduler) InstanceCount

func (s *Scheduler) InstanceCount() int

func (*Scheduler) InstanceCountByStatus

func (s *Scheduler) InstanceCountByStatus(status TaskInstanceStatus) int

func (*Scheduler) Kickstart

func (s *Scheduler) Kickstart()

Kickstart initiates the scheduler process by sending a "start" task for processing.

func (*Scheduler) MarkAll

func (s *Scheduler) MarkAll(status TaskInstanceStatus)

func (*Scheduler) MarkAsset added in v0.11.29

func (s *Scheduler) MarkAsset(task *pipeline.Asset, status TaskInstanceStatus, downstream bool)

func (*Scheduler) MarkByTag added in v0.11.0

func (s *Scheduler) MarkByTag(tag string, status TaskInstanceStatus, downstream bool)

func (*Scheduler) MarkPendingInstancesByType added in v0.11.29

func (s *Scheduler) MarkPendingInstancesByType(instanceType TaskInstanceType, status TaskInstanceStatus)

func (*Scheduler) MarkTaskInstance

func (s *Scheduler) MarkTaskInstance(instance TaskInstance, status TaskInstanceStatus, downstream bool)

func (*Scheduler) MarkTaskInstanceIfNotSkipped added in v0.11.114

func (s *Scheduler) MarkTaskInstanceIfNotSkipped(instance TaskInstance, status TaskInstanceStatus, markDownstream bool)

func (*Scheduler) Run

func (*Scheduler) Tick

func (s *Scheduler) Tick(result *TaskExecutionResult) bool

Tick marks an iteration of the scheduler loop. It is called when a result is received. Results are mainly fed from a channel, but Tick also allows other ways of passing Asset results to the scheduler and of simulating scheduler loops, e.g. time travel. It is also useful for testing purposes.

func (*Scheduler) WillRunTaskOfType

func (s *Scheduler) WillRunTaskOfType(taskType pipeline.AssetType) bool

type TaskExecutionResult

type TaskExecutionResult struct {
	Instance TaskInstance
	Error    error
}

type TaskInstance

type TaskInstance interface {
	GetPipeline() *pipeline.Pipeline
	GetAsset() *pipeline.Asset
	GetType() TaskInstanceType
	GetHumanID() string
	GetHumanReadableDescription() string

	GetStatus() TaskInstanceStatus
	MarkAs(status TaskInstanceStatus)
	Completed() bool
	Blocking() bool

	GetUpstream() []TaskInstance
	GetDownstream() []TaskInstance
	AddUpstream(t TaskInstance)
	AddDownstream(t TaskInstance)
}

type TaskInstanceStatus

type TaskInstanceStatus int
const (
	Pending TaskInstanceStatus = iota
	Queued
	Running
	Failed
	UpstreamFailed
	Succeeded
	Skipped
)

func (TaskInstanceStatus) String

func (s TaskInstanceStatus) String() string

type TaskInstanceType

type TaskInstanceType int
const (
	TaskInstanceTypeMain TaskInstanceType = iota
	TaskInstanceTypeColumnCheck
	TaskInstanceTypeCustomCheck
	TaskInstanceTypeMetadataPush
)

func (TaskInstanceType) String

func (s TaskInstanceType) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL