scheduler

package
v0.8.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 19, 2023 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

View Source
const (
	EntityEvent = "event"

	ISODateFormat = "2006-01-02T15:04:05Z"

	EventCategorySLAMiss    JobEventCategory = "sla_miss"
	EventCategoryJobFailure JobEventCategory = "failure"

	SLAMissEvent    JobEventType = "sla_miss"
	JobFailureEvent JobEventType = "failure"
	JobSuccessEvent JobEventType = "job_success"

	TaskStartEvent   JobEventType = "task_start"
	TaskRetryEvent   JobEventType = "task_retry"
	TaskFailEvent    JobEventType = "task_fail"
	TaskSuccessEvent JobEventType = "task_success"

	HookStartEvent   JobEventType = "hook_start"
	HookRetryEvent   JobEventType = "hook_retry"
	HookFailEvent    JobEventType = "hook_fail"
	HookSuccessEvent JobEventType = "hook_success"

	SensorStartEvent   JobEventType = "sensor_start"
	SensorRetryEvent   JobEventType = "sensor_retry"
	SensorFailEvent    JobEventType = "sensor_fail"
	SensorSuccessEvent JobEventType = "sensor_success"
)
View Source
const (
	EntityJobRun = "jobRun"

	OperatorTask   OperatorType = "task"
	OperatorSensor OperatorType = "sensor"
	OperatorHook   OperatorType = "hook"

	UpstreamTypeStatic   = "static"
	UpstreamTypeInferred = "inferred"
)
View Source
const (
	MetricNotificationQueue         = "notification_queue_total"
	MetricNotificationWorkerBatch   = "notification_worker_batch_total"
	MetricNotificationWorkerSendErr = "notification_worker_send_err_total"
)
View Source
const (
	// initial state
	ReplayStateCreated ReplayState = "created"

	// running state
	ReplayStateInProgress      ReplayState = "in progress"
	ReplayStatePartialReplayed ReplayState = "partial replayed"
	ReplayStateReplayed        ReplayState = "replayed"

	// terminal state
	ReplayStateInvalid ReplayState = "invalid"
	ReplayStateSuccess ReplayState = "success"
	ReplayStateFailed  ReplayState = "failed"

	// state on presentation layer
	ReplayUserStateCreated    ReplayUserState = "created"
	ReplayUserStateInProgress ReplayUserState = "in progress"
	ReplayUserStateInvalid    ReplayUserState = "invalid"
	ReplayUserStateSuccess    ReplayUserState = "success"
	ReplayUserStateFailed     ReplayUserState = "failed"

	EntityReplay = "replay"
)

Variables

Functions

func GroupJobsByTenant

func GroupJobsByTenant(j []*JobWithDetails) map[tenant.Tenant][]*JobWithDetails

Types

type Alert

type Alert struct {
	On       JobEventCategory
	Channels []string
	Config   map[string]string
}

type ConfigMap

type ConfigMap map[string]string

type Event

type Event struct {
	JobName        JobName
	Tenant         tenant.Tenant
	Type           JobEventType
	EventTime      time.Time
	OperatorName   string
	Status         State
	JobScheduledAt time.Time
	Values         map[string]any
	SLAObjectList  []*SLAObject
}

func EventFrom

func EventFrom(eventTypeName string, eventValues map[string]any, jobName JobName, tenent tenant.Tenant) (*Event, error)

type EventName

type EventName string

type Executor

type Executor struct {
	Name string
	Type ExecutorType
}

func ExecutorFrom

func ExecutorFrom(name string, executorType ExecutorType) (Executor, error)

func ExecutorFromEnum

func ExecutorFromEnum(name, enum string) (Executor, error)

type ExecutorInput

type ExecutorInput struct {
	Configs ConfigMap
	Secrets ConfigMap
	Files   ConfigMap
}

type ExecutorType

type ExecutorType string
const (
	ExecutorTask ExecutorType = "task"
	ExecutorHook ExecutorType = "hook"
)

func ExecutorTypeFrom

func ExecutorTypeFrom(val string) (ExecutorType, error)

func (ExecutorType) String

func (e ExecutorType) String() string

type HTTPUpstreams

type HTTPUpstreams struct {
	Name    string
	URL     string
	Headers map[string]string
	Params  map[string]string
}

type Hook

type Hook struct {
	Name   string
	Config map[string]string
}

type Job

type Job struct {
	Name   JobName
	Tenant tenant.Tenant

	Destination string
	Task        *Task
	Hooks       []*Hook
	Window      models.Window
	Assets      map[string]string
}

func (*Job) GetHook

func (j *Job) GetHook(hookName string) (*Hook, error)

type JobEventCategory

type JobEventCategory string

type JobEventType

type JobEventType string

func FromStringToEventType

func FromStringToEventType(name string) (JobEventType, error)

func (JobEventType) IsOfType

func (event JobEventType) IsOfType(category JobEventCategory) bool

func (JobEventType) String added in v0.7.0

func (event JobEventType) String() string

type JobMetadata

type JobMetadata struct {
	Version     int
	Owner       string
	Description string
	Labels      map[string]string
}

type JobName

type JobName string

func JobNameFrom

func JobNameFrom(name string) (JobName, error)

func (JobName) String

func (n JobName) String() string

type JobRun

type JobRun struct {
	ID uuid.UUID

	JobName     JobName
	Tenant      tenant.Tenant
	State       State
	ScheduledAt time.Time
	StartTime   time.Time
	SLAAlert    bool
	EndTime     time.Time

	Monitoring map[string]any
}

type JobRunID

type JobRunID uuid.UUID

func JobRunIDFromString

func JobRunIDFromString(runID string) (JobRunID, error)

func (JobRunID) IsEmpty

func (i JobRunID) IsEmpty() bool

func (JobRunID) UUID

func (i JobRunID) UUID() uuid.UUID

type JobRunStatus

type JobRunStatus struct {
	ScheduledAt time.Time
	State       State
}

func JobRunStatusFrom

func JobRunStatusFrom(scheduledAt time.Time, state string) (JobRunStatus, error)

func (JobRunStatus) GetLogicalTime added in v0.7.0

func (j JobRunStatus) GetLogicalTime(jobCron *cron.ScheduleSpec) time.Time

type JobRunStatusList added in v0.7.0

type JobRunStatusList []*JobRunStatus

func (JobRunStatusList) GetSortedRunsByScheduledAt added in v0.8.0

func (j JobRunStatusList) GetSortedRunsByScheduledAt() []*JobRunStatus

func (JobRunStatusList) GetSortedRunsByStates added in v0.7.0

func (j JobRunStatusList) GetSortedRunsByStates(states []State) []*JobRunStatus

func (JobRunStatusList) MergeWithUpdatedRuns added in v0.7.0

func (j JobRunStatusList) MergeWithUpdatedRuns(updatedRunMap map[time.Time]State) []*JobRunStatus

func (JobRunStatusList) ToRunStatusMap added in v0.7.0

func (j JobRunStatusList) ToRunStatusMap() map[time.Time]State

type JobRunsCriteria

type JobRunsCriteria struct {
	Name        string
	StartDate   time.Time
	EndDate     time.Time
	Filter      []string
	OnlyLastRun bool
}

JobRunsCriteria represents the filter condition to get run status from scheduler

func (*JobRunsCriteria) ExecutionEndDate

func (c *JobRunsCriteria) ExecutionEndDate(jobCron *cron.ScheduleSpec) time.Time

func (*JobRunsCriteria) ExecutionStart

func (c *JobRunsCriteria) ExecutionStart(cron *cron.ScheduleSpec) time.Time

type JobUpstream

type JobUpstream struct {
	JobName        string
	Host           string
	TaskName       string        // TODO: remove after airflow migration
	DestinationURN string        //- bigquery://pilot.playground.table
	Tenant         tenant.Tenant // Current or external tenant
	Type           string
	External       bool
	State          string
}

type JobWithDetails

type JobWithDetails struct {
	Name JobName

	Job           *Job
	JobMetadata   *JobMetadata
	Schedule      *Schedule
	Retry         Retry
	Alerts        []Alert
	RuntimeConfig RuntimeConfig
	Priority      int
	Upstreams     Upstreams
}

JobWithDetails contains the details for a job

func (*JobWithDetails) GetLabelsAsString

func (j *JobWithDetails) GetLabelsAsString() string

func (*JobWithDetails) GetName

func (j *JobWithDetails) GetName() string

func (*JobWithDetails) GetUniqueLabelValues added in v0.8.0

func (j *JobWithDetails) GetUniqueLabelValues() []string

func (*JobWithDetails) SLADuration

func (j *JobWithDetails) SLADuration() (int64, error)

type NotifyAttrs

type NotifyAttrs struct {
	Owner    string
	JobEvent *Event
	Route    string
	Secret   string
}

type OperatorRun

type OperatorRun struct {
	ID           uuid.UUID
	Name         string
	JobRunID     uuid.UUID
	OperatorType OperatorType
	Status       State
	StartTime    time.Time
	EndTime      time.Time
}

type OperatorType

type OperatorType string

func (OperatorType) String

func (o OperatorType) String() string

type Replay added in v0.7.0

type Replay struct {
	// contains filtered or unexported fields
}

func NewReplay added in v0.7.0

func NewReplay(id uuid.UUID, jobName JobName, tenant tenant.Tenant, config *ReplayConfig, state ReplayState, createdAt time.Time) *Replay

func NewReplayRequest added in v0.7.0

func NewReplayRequest(jobName JobName, tenant tenant.Tenant, config *ReplayConfig, state ReplayState) *Replay

func (*Replay) Config added in v0.7.0

func (r *Replay) Config() *ReplayConfig

func (*Replay) CreatedAt added in v0.7.0

func (r *Replay) CreatedAt() time.Time

func (*Replay) ID added in v0.7.0

func (r *Replay) ID() uuid.UUID

func (*Replay) JobName added in v0.7.0

func (r *Replay) JobName() JobName

func (*Replay) Message added in v0.7.0

func (r *Replay) Message() string

func (*Replay) State added in v0.7.0

func (r *Replay) State() ReplayState

func (*Replay) Tenant added in v0.7.0

func (r *Replay) Tenant() tenant.Tenant

func (*Replay) UserState added in v0.8.1

func (r *Replay) UserState() ReplayUserState

type ReplayConfig added in v0.7.0

type ReplayConfig struct {
	StartTime   time.Time
	EndTime     time.Time
	Parallel    bool
	JobConfig   map[string]string
	Description string
}

func NewReplayConfig added in v0.7.0

func NewReplayConfig(startTime, endTime time.Time, parallel bool, jobConfig map[string]string, description string) *ReplayConfig

type ReplayState added in v0.7.0

type ReplayState string // contract status for business layer

func ReplayStateFromString added in v0.7.0

func ReplayStateFromString(state string) (ReplayState, error)

func (ReplayState) String added in v0.7.0

func (j ReplayState) String() string

type ReplayUserState added in v0.8.1

type ReplayUserState string // contract status for presentation layer

func (ReplayUserState) String added in v0.8.1

func (j ReplayUserState) String() string

type ReplayWithRun added in v0.7.0

type ReplayWithRun struct {
	Replay *Replay
	Runs   []*JobRunStatus // TODO: JobRunStatus does not have `message/log`
}

func (*ReplayWithRun) GetFirstExecutableRun added in v0.7.0

func (r *ReplayWithRun) GetFirstExecutableRun() *JobRunStatus

func (*ReplayWithRun) GetLastExecutableRun added in v0.7.0

func (r *ReplayWithRun) GetLastExecutableRun() *JobRunStatus

type Resource

type Resource struct {
	Request *ResourceConfig
	Limit   *ResourceConfig
}

type ResourceConfig

type ResourceConfig struct {
	CPU    string
	Memory string
}

type Retry

type Retry struct {
	ExponentialBackoff bool
	Count              int
	Delay              int32
}

type RunConfig

type RunConfig struct {
	Executor Executor

	ScheduledAt time.Time
	JobRunID    JobRunID
}

func RunConfigFrom

func RunConfigFrom(executor Executor, scheduledAt time.Time, runID string) (RunConfig, error)

type RuntimeConfig

type RuntimeConfig struct {
	Resource  *Resource
	Scheduler map[string]string
}

type SLAObject

type SLAObject struct {
	JobName        JobName
	JobScheduledAt time.Time
}

func (*SLAObject) String

func (s *SLAObject) String() string

type Schedule

type Schedule struct {
	DependsOnPast bool
	StartDate     time.Time
	EndDate       *time.Time
	Interval      string
}

type State

type State string
const (
	StatePending State = "pending"

	StateAccepted State = "accepted"
	StateRunning  State = "running"
	StateQueued   State = "queued"

	StateRetry State = "retried"

	StateSuccess State = "success"
	StateFailed  State = "failed"

	StateWaitUpstream State = "wait_upstream"
	StateInProgress   State = "in_progress"
)

func StateFromString

func StateFromString(state string) (State, error)

func (State) String

func (j State) String() string

type Task

type Task struct {
	Name   string
	Config map[string]string
}

type Upstreams

type Upstreams struct {
	HTTP         []*HTTPUpstreams
	UpstreamJobs []*JobUpstream
}

Directories

Path Synopsis
handler

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL