Documentation ¶
Index ¶
- type AssetInstance
- func (t *AssetInstance) AddDownstream(task TaskInstance)
- func (t *AssetInstance) AddUpstream(task TaskInstance)
- func (t *AssetInstance) Blocking() bool
- func (t *AssetInstance) Completed() bool
- func (t *AssetInstance) GetAsset() *pipeline.Asset
- func (t *AssetInstance) GetDownstream() []TaskInstance
- func (t *AssetInstance) GetHumanID() string
- func (t *AssetInstance) GetHumanReadableDescription() string
- func (t *AssetInstance) GetPipeline() *pipeline.Pipeline
- func (t *AssetInstance) GetStatus() TaskInstanceStatus
- func (t *AssetInstance) GetType() TaskInstanceType
- func (t *AssetInstance) GetUpstream() []TaskInstance
- func (t *AssetInstance) MarkAs(status TaskInstanceStatus)
- type ColumnCheckInstance
- type CustomCheckInstance
- type InstancesByType
- type Metadata
- type MetadataPushInstance
- type PipelineAssetState
- type PipelineState
- type Scheduler
- func (s *Scheduler) FindMajorityOfTypes(defaultIfNone pipeline.AssetType) pipeline.AssetType
- func (s *Scheduler) GetTaskInstancesByStatus(status TaskInstanceStatus) []TaskInstance
- func (s *Scheduler) InstanceCount() int
- func (s *Scheduler) InstanceCountByStatus(status TaskInstanceStatus) int
- func (s *Scheduler) Kickstart()
- func (s *Scheduler) MarkAll(status TaskInstanceStatus)
- func (s *Scheduler) MarkAsset(task *pipeline.Asset, status TaskInstanceStatus, downstream bool)
- func (s *Scheduler) MarkByTag(tag string, status TaskInstanceStatus, downstream bool)
- func (s *Scheduler) MarkPendingInstancesByType(instanceType TaskInstanceType, status TaskInstanceStatus)
- func (s *Scheduler) MarkTaskInstance(instance TaskInstance, status TaskInstanceStatus, downstream bool)
- func (s *Scheduler) MarkTaskInstanceIfNotSkipped(instance TaskInstance, status TaskInstanceStatus, markDownstream bool)
- func (s *Scheduler) Run(ctx context.Context) []*TaskExecutionResult
- func (s *Scheduler) SavePipelineState(param map[string]string, runID, statePath string) error
- func (s *Scheduler) Tick(result *TaskExecutionResult) bool
- func (s *Scheduler) WillRunTaskOfType(taskType pipeline.AssetType) bool
- type TaskExecutionResult
- type TaskInstance
- type TaskInstanceStatus
- type TaskInstanceType
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AssetInstance ¶
type AssetInstance struct { ID string HumanID string Pipeline *pipeline.Pipeline Asset *pipeline.Asset // contains filtered or unexported fields }
func (*AssetInstance) AddDownstream ¶
func (t *AssetInstance) AddDownstream(task TaskInstance)
func (*AssetInstance) AddUpstream ¶
func (t *AssetInstance) AddUpstream(task TaskInstance)
func (*AssetInstance) Blocking ¶ added in v0.9.0
func (t *AssetInstance) Blocking() bool
func (*AssetInstance) Completed ¶
func (t *AssetInstance) Completed() bool
func (*AssetInstance) GetAsset ¶
func (t *AssetInstance) GetAsset() *pipeline.Asset
func (*AssetInstance) GetDownstream ¶
func (t *AssetInstance) GetDownstream() []TaskInstance
func (*AssetInstance) GetHumanID ¶
func (t *AssetInstance) GetHumanID() string
func (*AssetInstance) GetHumanReadableDescription ¶ added in v0.11.29
func (t *AssetInstance) GetHumanReadableDescription() string
func (*AssetInstance) GetPipeline ¶
func (t *AssetInstance) GetPipeline() *pipeline.Pipeline
func (*AssetInstance) GetStatus ¶
func (t *AssetInstance) GetStatus() TaskInstanceStatus
func (*AssetInstance) GetType ¶
func (t *AssetInstance) GetType() TaskInstanceType
func (*AssetInstance) GetUpstream ¶
func (t *AssetInstance) GetUpstream() []TaskInstance
func (*AssetInstance) MarkAs ¶
func (t *AssetInstance) MarkAs(status TaskInstanceStatus)
type ColumnCheckInstance ¶
type ColumnCheckInstance struct { *AssetInstance Column *pipeline.Column Check *pipeline.ColumnCheck // contains filtered or unexported fields }
func (*ColumnCheckInstance) Blocking ¶ added in v0.9.0
func (t *ColumnCheckInstance) Blocking() bool
func (*ColumnCheckInstance) GetHumanReadableDescription ¶ added in v0.11.29
func (t *ColumnCheckInstance) GetHumanReadableDescription() string
func (*ColumnCheckInstance) GetType ¶
func (t *ColumnCheckInstance) GetType() TaskInstanceType
type CustomCheckInstance ¶
type CustomCheckInstance struct { *AssetInstance Check *pipeline.CustomCheck }
func (*CustomCheckInstance) Blocking ¶ added in v0.9.0
func (t *CustomCheckInstance) Blocking() bool
func (*CustomCheckInstance) GetHumanReadableDescription ¶ added in v0.11.29
func (t *CustomCheckInstance) GetHumanReadableDescription() string
func (*CustomCheckInstance) GetType ¶
func (t *CustomCheckInstance) GetType() TaskInstanceType
type InstancesByType ¶
type InstancesByType map[TaskInstanceType][]TaskInstance
func (InstancesByType) AddDownstreamByType ¶
func (i InstancesByType) AddDownstreamByType(instanceType TaskInstanceType, downstream TaskInstance)
func (InstancesByType) AddUpstreamByType ¶
func (i InstancesByType) AddUpstreamByType(instanceType TaskInstanceType, upstream TaskInstance)
type MetadataPushInstance ¶ added in v0.9.0
type MetadataPushInstance struct {
*AssetInstance
}
func (*MetadataPushInstance) Blocking ¶ added in v0.9.0
func (t *MetadataPushInstance) Blocking() bool
func (*MetadataPushInstance) GetHumanReadableDescription ¶ added in v0.11.29
func (t *MetadataPushInstance) GetHumanReadableDescription() string
func (*MetadataPushInstance) GetType ¶ added in v0.9.0
func (t *MetadataPushInstance) GetType() TaskInstanceType
type PipelineAssetState ¶ added in v0.11.122
type PipelineState ¶ added in v0.11.122
type PipelineState struct { Parameters map[string]string `json:"parameters"` Metadata Metadata `json:"metadata"` State []*PipelineAssetState `json:"state"` Version string `json:"version"` TimeStamp time.Time `json:"timestamp"` RunID string `json:"run_id"` CompatibilityHash string `json:"compatibility_hash"` }
type Scheduler ¶
type Scheduler struct { WorkQueue chan TaskInstance Results chan *TaskExecutionResult // contains filtered or unexported fields }
func NewScheduler ¶
func (*Scheduler) FindMajorityOfTypes ¶ added in v0.4.3
func (s *Scheduler) FindMajorityOfTypes(defaultIfNone pipeline.AssetType) pipeline.AssetType
func (*Scheduler) GetTaskInstancesByStatus ¶
func (s *Scheduler) GetTaskInstancesByStatus(status TaskInstanceStatus) []TaskInstance
func (*Scheduler) InstanceCount ¶
func (s *Scheduler) InstanceCount() int
func (*Scheduler) InstanceCountByStatus ¶
func (s *Scheduler) InstanceCountByStatus(status TaskInstanceStatus) int
func (*Scheduler) Kickstart ¶
func (s *Scheduler) Kickstart()
Kickstart starts the scheduler by sending a "start" task for processing.
func (*Scheduler) MarkAll ¶
func (s *Scheduler) MarkAll(status TaskInstanceStatus)
func (*Scheduler) MarkAsset ¶ added in v0.11.29
func (s *Scheduler) MarkAsset(task *pipeline.Asset, status TaskInstanceStatus, downstream bool)
func (*Scheduler) MarkByTag ¶ added in v0.11.0
func (s *Scheduler) MarkByTag(tag string, status TaskInstanceStatus, downstream bool)
func (*Scheduler) MarkPendingInstancesByType ¶ added in v0.11.29
func (s *Scheduler) MarkPendingInstancesByType(instanceType TaskInstanceType, status TaskInstanceStatus)
func (*Scheduler) MarkTaskInstance ¶
func (s *Scheduler) MarkTaskInstance(instance TaskInstance, status TaskInstanceStatus, downstream bool)
func (*Scheduler) MarkTaskInstanceIfNotSkipped ¶ added in v0.11.114
func (s *Scheduler) MarkTaskInstanceIfNotSkipped(instance TaskInstance, status TaskInstanceStatus, markDownstream bool)
func (*Scheduler) SavePipelineState ¶ added in v0.11.122
func (s *Scheduler) SavePipelineState(param map[string]string, runID, statePath string) error
func (*Scheduler) Tick ¶
func (s *Scheduler) Tick(result *TaskExecutionResult) bool
Tick marks an iteration of the scheduler loop. It is called when a result is received. Results are mainly fed from a channel, but Tick also allows asset results to be passed in by other means and scheduler loops to be simulated, e.g. for time travel. It is also useful for testing.
type TaskExecutionResult ¶
type TaskExecutionResult struct { Instance TaskInstance Error error }
type TaskInstance ¶
type TaskInstance interface { GetPipeline() *pipeline.Pipeline GetAsset() *pipeline.Asset GetType() TaskInstanceType GetHumanID() string GetHumanReadableDescription() string GetStatus() TaskInstanceStatus MarkAs(status TaskInstanceStatus) Completed() bool Blocking() bool GetUpstream() []TaskInstance GetDownstream() []TaskInstance AddUpstream(t TaskInstance) AddDownstream(t TaskInstance) }
type TaskInstanceStatus ¶
type TaskInstanceStatus int
const ( Pending TaskInstanceStatus = iota Queued Running Failed UpstreamFailed Succeeded Skipped )
func GetStatusForTask ¶ added in v0.11.122
func GetStatusForTask(tasks []TaskInstanceStatus) TaskInstanceStatus
func (TaskInstanceStatus) String ¶
func (s TaskInstanceStatus) String() string
type TaskInstanceType ¶
type TaskInstanceType int
const ( TaskInstanceTypeMain TaskInstanceType = iota TaskInstanceTypeColumnCheck TaskInstanceTypeCustomCheck TaskInstanceTypeMetadataPush )
func (TaskInstanceType) String ¶
func (s TaskInstanceType) String() string
Click to show internal directories.
Click to hide internal directories.