Documentation ¶
Index ¶
- type AssetInstance
- func (t *AssetInstance) AddDownstream(task TaskInstance)
- func (t *AssetInstance) AddUpstream(task TaskInstance)
- func (t *AssetInstance) Completed() bool
- func (t *AssetInstance) GetAsset() *pipeline.Asset
- func (t *AssetInstance) GetDownstream() []TaskInstance
- func (t *AssetInstance) GetHumanID() string
- func (t *AssetInstance) GetPipeline() *pipeline.Pipeline
- func (t *AssetInstance) GetStatus() TaskInstanceStatus
- func (t *AssetInstance) GetType() TaskInstanceType
- func (t *AssetInstance) GetUpstream() []TaskInstance
- func (t *AssetInstance) MarkAs(status TaskInstanceStatus)
- type ColumnCheckInstance
- type CustomCheckInstance
- type InstancesByType
- type Scheduler
- func (s *Scheduler) GetTaskInstancesByStatus(status TaskInstanceStatus) []TaskInstance
- func (s *Scheduler) InstanceCount() int
- func (s *Scheduler) InstanceCountByStatus(status TaskInstanceStatus) int
- func (s *Scheduler) Kickstart()
- func (s *Scheduler) MarkAll(status TaskInstanceStatus)
- func (s *Scheduler) MarkTask(task *pipeline.Asset, status TaskInstanceStatus, downstream bool)
- func (s *Scheduler) MarkTaskInstance(instance TaskInstance, status TaskInstanceStatus, downstream bool)
- func (s *Scheduler) Run(ctx context.Context) []*TaskExecutionResult
- func (s *Scheduler) Tick(result *TaskExecutionResult) bool
- func (s *Scheduler) WillRunTaskOfType(taskType pipeline.AssetType) bool
- type TaskExecutionResult
- type TaskInstance
- type TaskInstanceStatus
- type TaskInstanceType
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AssetInstance ¶
type AssetInstance struct {
	ID       string
	HumanID  string
	Pipeline *pipeline.Pipeline
	Asset    *pipeline.Asset
	// contains filtered or unexported fields
}
func (*AssetInstance) AddDownstream ¶
func (t *AssetInstance) AddDownstream(task TaskInstance)
func (*AssetInstance) AddUpstream ¶
func (t *AssetInstance) AddUpstream(task TaskInstance)
func (*AssetInstance) Completed ¶
func (t *AssetInstance) Completed() bool
func (*AssetInstance) GetAsset ¶
func (t *AssetInstance) GetAsset() *pipeline.Asset
func (*AssetInstance) GetDownstream ¶
func (t *AssetInstance) GetDownstream() []TaskInstance
func (*AssetInstance) GetHumanID ¶
func (t *AssetInstance) GetHumanID() string
func (*AssetInstance) GetPipeline ¶
func (t *AssetInstance) GetPipeline() *pipeline.Pipeline
func (*AssetInstance) GetStatus ¶
func (t *AssetInstance) GetStatus() TaskInstanceStatus
func (*AssetInstance) GetType ¶
func (t *AssetInstance) GetType() TaskInstanceType
func (*AssetInstance) GetUpstream ¶
func (t *AssetInstance) GetUpstream() []TaskInstance
func (*AssetInstance) MarkAs ¶
func (t *AssetInstance) MarkAs(status TaskInstanceStatus)
type ColumnCheckInstance ¶
type ColumnCheckInstance struct {
	*AssetInstance
	Column *pipeline.Column
	Check  *pipeline.ColumnCheck
	// contains filtered or unexported fields
}
func (*ColumnCheckInstance) GetType ¶
func (t *ColumnCheckInstance) GetType() TaskInstanceType
type CustomCheckInstance ¶
type CustomCheckInstance struct {
	*AssetInstance
	Check *pipeline.CustomCheck
}
func (*CustomCheckInstance) GetType ¶
func (t *CustomCheckInstance) GetType() TaskInstanceType
type InstancesByType ¶
type InstancesByType map[TaskInstanceType][]TaskInstance
func (InstancesByType) AddDownstreamByType ¶
func (i InstancesByType) AddDownstreamByType(instanceType TaskInstanceType, downstream TaskInstance)
func (InstancesByType) AddUpstreamByType ¶
func (i InstancesByType) AddUpstreamByType(instanceType TaskInstanceType, upstream TaskInstance)
type Scheduler ¶
type Scheduler struct {
	WorkQueue chan TaskInstance
	Results   chan *TaskExecutionResult
	// contains filtered or unexported fields
}
func NewScheduler ¶
func NewScheduler(logger *zap.SugaredLogger, p *pipeline.Pipeline) *Scheduler
func (*Scheduler) GetTaskInstancesByStatus ¶
func (s *Scheduler) GetTaskInstancesByStatus(status TaskInstanceStatus) []TaskInstance
func (*Scheduler) InstanceCount ¶
func (s *Scheduler) InstanceCount() int
func (*Scheduler) InstanceCountByStatus ¶
func (s *Scheduler) InstanceCountByStatus(status TaskInstanceStatus) int
func (*Scheduler) Kickstart ¶
func (s *Scheduler) Kickstart()
Kickstart initiates the scheduler process by sending a "start" task for processing.
func (*Scheduler) MarkAll ¶
func (s *Scheduler) MarkAll(status TaskInstanceStatus)
func (*Scheduler) MarkTask ¶
func (s *Scheduler) MarkTask(task *pipeline.Asset, status TaskInstanceStatus, downstream bool)
func (*Scheduler) MarkTaskInstance ¶
func (s *Scheduler) MarkTaskInstance(instance TaskInstance, status TaskInstanceStatus, downstream bool)
func (*Scheduler) Tick ¶
func (s *Scheduler) Tick(result *TaskExecutionResult) bool
Tick marks an iteration of the scheduler loop. It is called when a result is received. Results are mainly fed from a channel, but Tick also makes it possible to implement other ways of passing Asset results and of simulating scheduler loops, e.g. time travel. It is also useful for testing purposes.
type TaskExecutionResult ¶
type TaskExecutionResult struct {
	Instance TaskInstance
	Error    error
}
type TaskInstance ¶
type TaskInstance interface {
	GetPipeline() *pipeline.Pipeline
	GetAsset() *pipeline.Asset
	GetType() TaskInstanceType
	GetHumanID() string
	GetStatus() TaskInstanceStatus
	MarkAs(status TaskInstanceStatus)
	Completed() bool
	GetUpstream() []TaskInstance
	GetDownstream() []TaskInstance
	AddUpstream(t TaskInstance)
	AddDownstream(t TaskInstance)
}
type TaskInstanceStatus ¶
type TaskInstanceStatus int
const (
	Pending TaskInstanceStatus = iota
	Queued
	Running
	Failed
	UpstreamFailed
	Succeeded
)
func (TaskInstanceStatus) String ¶
func (s TaskInstanceStatus) String() string
type TaskInstanceType ¶
type TaskInstanceType int
const (
	TaskInstanceTypeMain TaskInstanceType = iota
	TaskInstanceTypeColumnCheck
	TaskInstanceTypeCustomCheck
)
func (TaskInstanceType) String ¶
func (s TaskInstanceType) String() string
Click to show internal directories.
Click to hide internal directories.