scheduler

package
v0.3.2 (Latest)
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 22, 2024 | License: Apache-2.0 | Imports: 6 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AssetInstance

type AssetInstance struct {
	ID       string
	HumanID  string
	Pipeline *pipeline.Pipeline
	Asset    *pipeline.Asset
	// contains filtered or unexported fields
}

func (*AssetInstance) AddDownstream

func (t *AssetInstance) AddDownstream(task TaskInstance)

func (*AssetInstance) AddUpstream

func (t *AssetInstance) AddUpstream(task TaskInstance)

func (*AssetInstance) Completed

func (t *AssetInstance) Completed() bool

func (*AssetInstance) GetAsset

func (t *AssetInstance) GetAsset() *pipeline.Asset

func (*AssetInstance) GetDownstream

func (t *AssetInstance) GetDownstream() []TaskInstance

func (*AssetInstance) GetHumanID

func (t *AssetInstance) GetHumanID() string

func (*AssetInstance) GetPipeline

func (t *AssetInstance) GetPipeline() *pipeline.Pipeline

func (*AssetInstance) GetStatus

func (t *AssetInstance) GetStatus() TaskInstanceStatus

func (*AssetInstance) GetType

func (t *AssetInstance) GetType() TaskInstanceType

func (*AssetInstance) GetUpstream

func (t *AssetInstance) GetUpstream() []TaskInstance

func (*AssetInstance) MarkAs

func (t *AssetInstance) MarkAs(status TaskInstanceStatus)

type ColumnCheckInstance

type ColumnCheckInstance struct {
	*AssetInstance

	Column *pipeline.Column
	Check  *pipeline.ColumnCheck
	// contains filtered or unexported fields
}

func (*ColumnCheckInstance) GetType

type CustomCheckInstance

type CustomCheckInstance struct {
	*AssetInstance

	Check *pipeline.CustomCheck
}

func (*CustomCheckInstance) GetType

type InstancesByType

type InstancesByType map[TaskInstanceType][]TaskInstance

func (InstancesByType) AddDownstreamByType

func (i InstancesByType) AddDownstreamByType(instanceType TaskInstanceType, downstream TaskInstance)

func (InstancesByType) AddUpstreamByType

func (i InstancesByType) AddUpstreamByType(instanceType TaskInstanceType, upstream TaskInstance)

type Scheduler

type Scheduler struct {
	WorkQueue chan TaskInstance
	Results   chan *TaskExecutionResult
	// contains filtered or unexported fields
}

func NewScheduler

func NewScheduler(logger *zap.SugaredLogger, p *pipeline.Pipeline) *Scheduler

func (*Scheduler) GetTaskInstancesByStatus

func (s *Scheduler) GetTaskInstancesByStatus(status TaskInstanceStatus) []TaskInstance

func (*Scheduler) InstanceCount

func (s *Scheduler) InstanceCount() int

func (*Scheduler) InstanceCountByStatus

func (s *Scheduler) InstanceCountByStatus(status TaskInstanceStatus) int

func (*Scheduler) Kickstart

func (s *Scheduler) Kickstart()

Kickstart initiates the scheduler process by sending a "start" task for processing.

func (*Scheduler) MarkAll

func (s *Scheduler) MarkAll(status TaskInstanceStatus)

func (*Scheduler) MarkTask

func (s *Scheduler) MarkTask(task *pipeline.Asset, status TaskInstanceStatus, downstream bool)

func (*Scheduler) MarkTaskInstance

func (s *Scheduler) MarkTaskInstance(instance TaskInstance, status TaskInstanceStatus, downstream bool)

func (*Scheduler) Run

func (*Scheduler) Tick

func (s *Scheduler) Tick(result *TaskExecutionResult) bool

Tick marks an iteration of the scheduler loop. It is called when a result is received. Results are mainly fed from a channel, but Tick also allows implementing additional ways of passing Asset results and simulating scheduler loops, e.g. time travel. It is also useful for testing purposes.

func (*Scheduler) WillRunTaskOfType

func (s *Scheduler) WillRunTaskOfType(taskType pipeline.AssetType) bool

type TaskExecutionResult

type TaskExecutionResult struct {
	Instance TaskInstance
	Error    error
}

type TaskInstance

type TaskInstance interface {
	GetPipeline() *pipeline.Pipeline
	GetAsset() *pipeline.Asset
	GetType() TaskInstanceType
	GetHumanID() string

	GetStatus() TaskInstanceStatus
	MarkAs(status TaskInstanceStatus)
	Completed() bool

	GetUpstream() []TaskInstance
	GetDownstream() []TaskInstance
	AddUpstream(t TaskInstance)
	AddDownstream(t TaskInstance)
}

type TaskInstanceStatus

type TaskInstanceStatus int
const (
	Pending TaskInstanceStatus = iota
	Queued
	Running
	Failed
	UpstreamFailed
	Succeeded
)

func (TaskInstanceStatus) String

func (s TaskInstanceStatus) String() string

type TaskInstanceType

type TaskInstanceType int
const (
	TaskInstanceTypeMain TaskInstanceType = iota
	TaskInstanceTypeColumnCheck
	TaskInstanceTypeCustomCheck
)

func (TaskInstanceType) String

func (s TaskInstanceType) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL