scheduler

package
Version: v0.18.1 (Latest)
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 15, 2024 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
// Event-related constants: the event entity name, a timestamp layout, event
// categories, concrete job event types, and event statuses.
const (
	// EntityEvent is the string identifier for the event entity.
	EntityEvent = "event"

	// ISODateFormat is a Go reference-time layout with second precision.
	// The trailing "Z" is a literal character in this layout, not a zone
	// directive, so formatting with it does not convert the time to UTC.
	// NOTE(review): callers presumably pass UTC times — confirm.
	ISODateFormat = "2006-01-02T15:04:05Z"

	// Event categories. The first three share their string values with
	// SLAMissEvent, JobFailureEvent and JobSuccessEvent below;
	// EventCategoryReplay has no same-valued JobEventType in this block.
	EventCategorySLAMiss    EventCategory = "sla_miss"
	EventCategoryJobFailure EventCategory = "failure"
	EventCategoryJobSuccess EventCategory = "job_success"
	EventCategoryReplay     EventCategory = "replay_lifecycle"

	// Job-level event types.
	SLAMissEvent    JobEventType = "sla_miss"
	JobFailureEvent JobEventType = "failure"
	JobSuccessEvent JobEventType = "job_success"

	// Task event types: start, retry, fail, success.
	TaskStartEvent   JobEventType = "task_start"
	TaskRetryEvent   JobEventType = "task_retry"
	TaskFailEvent    JobEventType = "task_fail"
	TaskSuccessEvent JobEventType = "task_success"

	// Hook event types: start, retry, fail, success.
	HookStartEvent   JobEventType = "hook_start"
	HookRetryEvent   JobEventType = "hook_retry"
	HookFailEvent    JobEventType = "hook_fail"
	HookSuccessEvent JobEventType = "hook_success"

	// Sensor event types: start, retry, fail, success.
	SensorStartEvent   JobEventType = "sensor_start"
	SensorRetryEvent   JobEventType = "sensor_retry"
	SensorFailEvent    JobEventType = "sensor_fail"
	SensorSuccessEvent JobEventType = "sensor_success"

	// Event statuses (see AlertAttrs.Status).
	StatusFiring   EventStatus = "firing"
	StatusResolved EventStatus = "resolved"
)
View Source
// Job-run constants: the job-run entity name, operator kinds, and upstream
// kinds.
const (
	// EntityJobRun is the string identifier for the job-run entity.
	EntityJobRun = "jobRun"

	// Operator kinds, typed as OperatorType.
	OperatorTask   OperatorType = "task"
	OperatorSensor OperatorType = "sensor"
	OperatorHook   OperatorType = "hook"

	// Upstream kinds. Unlike the OperatorType values above these are untyped
	// string constants.
	// NOTE(review): presumably the values held in JobUpstream.Type — confirm.
	UpstreamTypeStatic   = "static"
	UpstreamTypeInferred = "inferred"
)
View Source
// Metric names for notifications. All are untyped string constants whose
// values end in "_total".
const (
	MetricNotificationQueue         = "notification_queue_total"
	MetricNotificationWorkerBatch   = "notification_worker_batch_total"
	MetricNotificationWorkerSendErr = "notification_worker_send_err_total"
	// MetricNotificationSend's value contains a "worker_" segment that its
	// identifier does not. NOTE(review): confirm this is intentional before
	// renaming either side; changing the value would rename the emitted metric.
	MetricNotificationSend          = "notification_worker_send_total"
)

Variables

Functions

func GroupJobsByTenant

func GroupJobsByTenant(j []*JobWithDetails) map[tenant.Tenant][]*JobWithDetails

Types

type Alert

// Alert is one entry of JobWithDetails.Alerts. On holds the EventCategory the
// alert is keyed on; the remaining fields are plain strings and string
// collections.
// NOTE(review): how Channels and Config are interpreted is not visible here.
type Alert struct {
	On       EventCategory
	Channels []string
	Config   map[string]string
	Severity string
	Team     string
}

type AlertAttrs added in v0.11.7

// AlertAttrs bundles the attributes of a single alert: owner, job URN, title,
// scheduler host, an EventStatus (StatusFiring or StatusResolved are the
// declared values), the related Event, and the job's details.
type AlertAttrs struct {
	Owner         string
	JobURN        string
	Title         string
	SchedulerHost string
	Status        EventStatus
	// JobEvent is a pointer and may therefore be nil.
	JobEvent      *Event

	// JobWithDetails is a pointer and may therefore be nil.
	JobWithDetails *JobWithDetails
}

type ConfigMap

type ConfigMap map[string]string

type Event

// Event is a scheduler event for a job, carrying the job name, URN, tenant,
// a JobEventType, timestamps, operator name and a State. EventFrom returns an
// *Event built from an event type name, a values map, a job name and a tenant.
type Event struct {
	JobName        JobName
	URN            string
	Tenant         tenant.Tenant
	Type           JobEventType
	EventTime      time.Time
	OperatorName   string
	Status         State
	JobScheduledAt time.Time
	// Values is a loosely typed payload.
	// NOTE(review): presumably the eventValues map passed to EventFrom — confirm.
	Values         map[string]any
	// SLAObjectList holds pointers to SLAObject (job name + scheduled time).
	SLAObjectList  []*SLAObject
}

func EventFrom

func EventFrom(eventTypeName string, eventValues map[string]any, jobName JobName, tenent tenant.Tenant) (*Event, error)

type EventCategory added in v0.18.0

type EventCategory string

func (EventCategory) String added in v0.18.0

func (e EventCategory) String() string

type EventName

type EventName string

type EventStatus added in v0.11.7

type EventStatus string

type Executor

// Executor pairs an executor name with its ExecutorType. ExecutorFrom and
// ExecutorFromEnum return an Executor or an error.
type Executor struct {
	Name string
	Type ExecutorType
}

func ExecutorFrom

func ExecutorFrom(name string, executorType ExecutorType) (Executor, error)

func ExecutorFromEnum

func ExecutorFromEnum(name, enum string) (Executor, error)

type ExecutorInput

// ExecutorInput groups three ConfigMap values (each a map[string]string):
// configs, secrets and files.
// NOTE(review): what the keys and values of Files represent is not visible
// here.
type ExecutorInput struct {
	Configs ConfigMap
	Secrets ConfigMap
	Files   ConfigMap
}

type ExecutorType

type ExecutorType string
// Declared ExecutorType values. Their strings match OperatorTask and
// OperatorHook; there is no executor counterpart to OperatorSensor.
const (
	ExecutorTask ExecutorType = "task"
	ExecutorHook ExecutorType = "hook"
)

func ExecutorTypeFrom

func ExecutorTypeFrom(val string) (ExecutorType, error)

func (ExecutorType) String

func (e ExecutorType) String() string

type HTTPUpstreams

// HTTPUpstreams describes a single HTTP upstream (despite the plural name)
// by name and URL, with string maps for headers and params. Upstreams.HTTP
// holds a slice of pointers to this type.
type HTTPUpstreams struct {
	Name    string
	URL     string
	Headers map[string]string
	Params  map[string]string
}

type Hook

// Hook is a named hook with string key/value configuration; it has the same
// shape as Task. Job.Hooks holds these, and Job.GetHook returns a *Hook for a
// given hook name or an error.
type Hook struct {
	Name   string
	Config map[string]string
}

type Job

// Job holds a job's identity (ID, Name, Tenant), its destination URN, one
// Task, any Hooks, its window configuration and its assets. GetHook and URN
// are defined on *Job.
type Job struct {
	ID     uuid.UUID
	Name   JobName
	Tenant tenant.Tenant

	Destination resource.URN
	// Task is a pointer and may therefore be nil.
	Task        *Task
	Hooks       []*Hook

	WindowConfig window.Config
	// NOTE(review): presumably asset file name -> content; confirm.
	Assets       map[string]string
}

func (*Job) GetHook

func (j *Job) GetHook(hookName string) (*Hook, error)

func (*Job) URN added in v0.11.7

func (j *Job) URN() string

type JobEventType

type JobEventType string

func FromStringToEventType

func FromStringToEventType(name string) (JobEventType, error)

func (JobEventType) IsOfType

func (event JobEventType) IsOfType(category EventCategory) bool

func (JobEventType) String added in v0.7.0

func (event JobEventType) String() string

type JobMetadata

// JobMetadata carries descriptive job information: a version number, owner,
// description and string labels. JobWithDetails references it by pointer.
type JobMetadata struct {
	Version     int
	Owner       string
	Description string
	Labels      map[string]string
}

type JobName

type JobName string

func JobNameFrom

func JobNameFrom(name string) (JobName, error)

func (JobName) GetJobURN added in v0.17.0

func (n JobName) GetJobURN(tnnt tenant.Tenant) string

func (JobName) String

func (n JobName) String() string

type JobRun

// JobRun records one run of a job: its ID, job name, tenant, State, scheduled
// time, start/end times and SLA-related fields. HasSLABreached is defined on
// *JobRun.
type JobRun struct {
	ID uuid.UUID

	JobName       JobName
	Tenant        tenant.Tenant
	State         State
	ScheduledAt   time.Time
	SLAAlert      bool
	StartTime     time.Time
	// EndTime is a pointer, so it can be nil.
	// NOTE(review): presumably nil until the run has finished — confirm.
	EndTime       *time.Time
	// NOTE(review): the unit of SLADefinition is not visible here
	// (JobWithDetails.SLADuration also returns an int64) — confirm.
	SLADefinition int64

	// Monitoring is a loosely typed map; its keys are not visible here.
	Monitoring map[string]any
}

func (*JobRun) HasSLABreached added in v0.9.7

func (j *JobRun) HasSLABreached() bool

type JobRunDetailsList added in v0.17.0

type JobRunDetailsList []*JobRunWithDetails

func (JobRunDetailsList) FilterRunsManagedByReplay added in v0.17.0

func (j JobRunDetailsList) FilterRunsManagedByReplay(runs []*JobRunStatus) JobRunDetailsList

func (JobRunDetailsList) GetSortedRunsByStates added in v0.17.0

func (j JobRunDetailsList) GetSortedRunsByStates(states []State) []*JobRunWithDetails

type JobRunID

type JobRunID uuid.UUID

func JobRunIDFromString

func JobRunIDFromString(runID string) (JobRunID, error)

func (JobRunID) IsEmpty

func (i JobRunID) IsEmpty() bool

func (JobRunID) UUID

func (i JobRunID) UUID() uuid.UUID

type JobRunMeta added in v0.11.4

// JobRunMeta carries string labels and a destination URN. WebhookAttrs.Meta
// references it by pointer.
type JobRunMeta struct {
	Labels         map[string]string
	DestinationURN resource.URN
}

type JobRunStatus

// JobRunStatus pairs a run's scheduled time with its State. JobRunStatusFrom
// builds one from a time and a state string, returning an error as its second
// result; JobRunStatusList is a slice of pointers to this type.
type JobRunStatus struct {
	ScheduledAt time.Time
	State       State
}

func JobRunStatusFrom

func JobRunStatusFrom(scheduledAt time.Time, state string) (JobRunStatus, error)

func (JobRunStatus) GetLogicalTime added in v0.7.0

func (j JobRunStatus) GetLogicalTime(jobCron *cron.ScheduleSpec) time.Time

func (JobRunStatus) GetScheduledAt added in v0.17.0

func (j JobRunStatus) GetScheduledAt() time.Time

func (JobRunStatus) GetState added in v0.17.0

func (j JobRunStatus) GetState() State

type JobRunStatusList added in v0.7.0

type JobRunStatusList []*JobRunStatus

func (JobRunStatusList) GetJobRunStatusSummary added in v0.11.3

func (j JobRunStatusList) GetJobRunStatusSummary() string

func (JobRunStatusList) GetOnlyDifferedRuns added in v0.18.1

func (j JobRunStatusList) GetOnlyDifferedRuns(runsComparator []*JobRunStatus) JobRunStatusList

func (JobRunStatusList) GetSortedRunsByScheduledAt added in v0.8.0

func (j JobRunStatusList) GetSortedRunsByScheduledAt() []*JobRunStatus

func (JobRunStatusList) GetSortedRunsByStates added in v0.7.0

func (j JobRunStatusList) GetSortedRunsByStates(states []State) []*JobRunStatus

func (JobRunStatusList) IsAllTerminated added in v0.11.0

func (j JobRunStatusList) IsAllTerminated() bool

func (JobRunStatusList) IsAnyFailure added in v0.11.0

func (j JobRunStatusList) IsAnyFailure() bool

func (JobRunStatusList) MergeWithUpdatedRuns added in v0.7.0

func (j JobRunStatusList) MergeWithUpdatedRuns(updatedRunMap map[time.Time]State) []*JobRunStatus

func (JobRunStatusList) OverrideWithStatus added in v0.9.0

func (j JobRunStatusList) OverrideWithStatus(status State) []*JobRunStatus

func (JobRunStatusList) ToRunStatusMap added in v0.7.0

func (j JobRunStatusList) ToRunStatusMap() map[time.Time]State

type JobRunWithDetails added in v0.17.0

// JobRunWithDetails is a run's scheduled time and State plus additional run
// details held as plain strings and a bool. JobRunDetailsList is a slice of
// pointers to this type.
// NOTE(review): the Dag* field names suggest these mirror the underlying
// scheduler's DAG-run record — confirm against the scheduler client.
type JobRunWithDetails struct {
	ScheduledAt     time.Time
	State           State
	RunType         string
	ExternalTrigger bool
	DagRunID        string
	DagID           string
}

func (JobRunWithDetails) GetState added in v0.17.0

func (j JobRunWithDetails) GetState() State

type JobRunsCriteria

type JobRunsCriteria struct {
	Name        string
	// NOTE(review): ExecutionStart and ExecutionEndDate take a cron schedule
	// spec and return a time; presumably they derive it from StartDate and
	// EndDate. Whether the bounds are inclusive is not visible here.
	StartDate   time.Time
	EndDate     time.Time
	// NOTE(review): presumably state names to filter by — confirm.
	Filter      []string
	OnlyLastRun bool
}

JobRunsCriteria represents the filter conditions used to get run statuses from the scheduler.

func (*JobRunsCriteria) ExecutionEndDate

func (c *JobRunsCriteria) ExecutionEndDate(jobCron *cron.ScheduleSpec) time.Time

func (*JobRunsCriteria) ExecutionStart

func (c *JobRunsCriteria) ExecutionStart(cron *cron.ScheduleSpec) time.Time

type JobUpstream

// JobUpstream describes one upstream job of a job. Upstreams.UpstreamJobs
// holds a slice of pointers to this type.
type JobUpstream struct {
	JobName        string
	Host           string
	TaskName       string        // TODO: remove after airflow migration
	DestinationURN resource.URN  // e.g. bigquery://pilot.playground.table
	Tenant         tenant.Tenant // Current or external tenant
	// NOTE(review): presumably UpstreamTypeStatic or UpstreamTypeInferred — confirm.
	Type           string
	External       bool
	// State is a plain string here, not the State type.
	State          string
}

type JobWithDetails

type JobWithDetails struct {
	// Name has the same type (JobName) as Job.Name.
	Name JobName

	// Job, JobMetadata and Schedule are pointers and may therefore be nil.
	Job           *Job
	JobMetadata   *JobMetadata
	Schedule      *Schedule
	Retry         Retry
	Alerts        []Alert
	// Webhook is a slice despite its singular name.
	Webhook       []Webhook
	RuntimeConfig RuntimeConfig
	Priority      int
	Upstreams     Upstreams
}

JobWithDetails contains the details for a job

func (*JobWithDetails) GetLabelsAsString

func (j *JobWithDetails) GetLabelsAsString() string

func (*JobWithDetails) GetName

func (j *JobWithDetails) GetName() string

func (*JobWithDetails) GetSafeLabels added in v0.10.2

func (j *JobWithDetails) GetSafeLabels() map[string]string

func (*JobWithDetails) GetUniqueLabelValues added in v0.8.0

func (j *JobWithDetails) GetUniqueLabelValues() []string

func (*JobWithDetails) SLADuration

func (j *JobWithDetails) SLADuration() (int64, error)

type NotifyAttrs

// NotifyAttrs bundles the attributes of a notification: owner, the related
// Event, a route and a secret, all but the event being plain strings.
// NOTE(review): the format of Route and what Secret holds are not visible here.
type NotifyAttrs struct {
	Owner    string
	// JobEvent is a pointer and may therefore be nil.
	JobEvent *Event
	Route    string
	Secret   string
}

type OperatorRun

// OperatorRun records one run of an operator (OperatorTask, OperatorSensor or
// OperatorHook are the declared kinds), linked to a job run by JobRunID.
type OperatorRun struct {
	ID           uuid.UUID
	Name         string
	JobRunID     uuid.UUID
	OperatorType OperatorType
	Status       State
	StartTime    time.Time
	// EndTime is a pointer, so it can be nil.
	// NOTE(review): presumably nil until the operator has finished — confirm.
	EndTime      *time.Time
}

type OperatorType

type OperatorType string

func (OperatorType) String

func (o OperatorType) String() string

type Replay added in v0.7.0

// Replay has no exported fields. Build one with NewReplay or NewReplayRequest
// and read it through its accessor methods: ID, JobName, Tenant, Config,
// State, Message, CreatedAt, UpdatedAt and IsTerminated.
type Replay struct {
	// contains filtered or unexported fields
}

func NewReplay added in v0.7.0

func NewReplay(id uuid.UUID, jobName JobName, tenant tenant.Tenant, config *ReplayConfig, state ReplayState, createdAt, updatedAt time.Time, message string) *Replay

func NewReplayRequest added in v0.7.0

func NewReplayRequest(jobName JobName, tenant tenant.Tenant, config *ReplayConfig, state ReplayState) *Replay

func (*Replay) Config added in v0.7.0

func (r *Replay) Config() *ReplayConfig

func (*Replay) CreatedAt added in v0.7.0

func (r *Replay) CreatedAt() time.Time

func (*Replay) ID added in v0.7.0

func (r *Replay) ID() uuid.UUID

func (*Replay) IsTerminated added in v0.11.3

func (r *Replay) IsTerminated() bool

func (*Replay) JobName added in v0.7.0

func (r *Replay) JobName() JobName

func (*Replay) Message added in v0.7.0

func (r *Replay) Message() string

func (*Replay) State added in v0.7.0

func (r *Replay) State() ReplayState

func (*Replay) Tenant added in v0.7.0

func (r *Replay) Tenant() tenant.Tenant

func (*Replay) UpdatedAt added in v0.17.0

func (r *Replay) UpdatedAt() time.Time

type ReplayConfig added in v0.7.0

// ReplayConfig holds a replay's settings: a start and end time, a parallel
// flag, a string map of job config and a description. NewReplayConfig takes
// these five values in the same order and returns a *ReplayConfig.
// NOTE(review): whether StartTime/EndTime are inclusive is not visible here.
type ReplayConfig struct {
	StartTime   time.Time
	EndTime     time.Time
	Parallel    bool
	JobConfig   map[string]string
	Description string
}

func NewReplayConfig added in v0.7.0

func NewReplayConfig(startTime, endTime time.Time, parallel bool, jobConfig map[string]string, description string) *ReplayConfig

type ReplayNotificationAttrs added in v0.16.2

// ReplayNotificationAttrs bundles the attributes of a replay notification:
// job name, replay ID and job URN as plain strings, the tenant, the
// ReplayState, and the job's details.
type ReplayNotificationAttrs struct {
	JobName  string
	ReplayID string
	Tenant   tenant.Tenant
	JobURN   string
	State    ReplayState

	// JobWithDetails is a pointer and may therefore be nil.
	JobWithDetails *JobWithDetails
}

type ReplayState added in v0.7.0

type ReplayState string // contract status for business layer
// Declared ReplayState values, plus the replay entity name. Note the spelling
// differences from the State constants: ReplayStateInProgress is
// "in progress" (with a space) whereas StateInProgress is "in_progress", and
// ReplayStateCancelled is "cancelled" whereas StateCanceled is "canceled".
const (
	// ReplayStateCreated is the initial state, indicating that the replay has been created but not picked up yet
	ReplayStateCreated ReplayState = "created"

	// ReplayStateInProgress indicates the replay is being executed
	ReplayStateInProgress ReplayState = "in progress"

	// ReplayStateSuccess is a terminal state which occurs when the replay execution finished with successful job runs
	ReplayStateSuccess ReplayState = "success"

	// ReplayStateTimeout carries no description in the original declaration.
	// NOTE(review): the ReplayStateFailed comment below also covers "timed out";
	// confirm whether a timeout is reported through this state, through
	// ReplayStateFailed, or both, and whether this state is terminal.
	ReplayStateTimeout ReplayState = "timeout"

	// ReplayStateFailed is a terminal state which occurs when the replay execution failed, timed out, or finished with at least one failed run
	ReplayStateFailed ReplayState = "failed"

	// ReplayStateCancelled is a terminal state which occurs when the replay is cancelled by the user
	ReplayStateCancelled ReplayState = "cancelled"

	// EntityReplay is the string identifier for the replay entity. It is an
	// untyped string constant, not a ReplayState.
	EntityReplay = "replay"
)

func ReplayStateFromString added in v0.7.0

func ReplayStateFromString(state string) (ReplayState, error)

func (ReplayState) String added in v0.7.0

func (j ReplayState) String() string

type ReplayUserState added in v0.8.1

type ReplayUserState string // contract status for presentation layer

func (ReplayUserState) String added in v0.8.1

func (j ReplayUserState) String() string

type ReplayWithRun added in v0.7.0

// ReplayWithRun pairs a Replay with a slice of run statuses.
// GetFirstExecutableRun and GetLastExecutableRun each return a *JobRunStatus.
type ReplayWithRun struct {
	// Replay is a pointer and may therefore be nil.
	Replay *Replay
	Runs   []*JobRunStatus // TODO: JobRunStatus does not have `message/log`
}

func (*ReplayWithRun) GetFirstExecutableRun added in v0.7.0

func (r *ReplayWithRun) GetFirstExecutableRun() *JobRunStatus

func (*ReplayWithRun) GetLastExecutableRun added in v0.7.0

func (r *ReplayWithRun) GetLastExecutableRun() *JobRunStatus

type Resource

// Resource holds a requested and a limit ResourceConfig. Both are pointers
// and may therefore be nil. RuntimeConfig references Resource by pointer.
type Resource struct {
	Request *ResourceConfig
	Limit   *ResourceConfig
}

type ResourceConfig

// ResourceConfig holds CPU and memory amounts as strings.
// NOTE(review): the expected quantity format (units, suffixes) is not visible
// here — confirm against whatever consumes these values.
type ResourceConfig struct {
	CPU    string
	Memory string
}

type Retry

// Retry holds a job's retry settings: an exponential-backoff flag, a retry
// count and a delay.
// NOTE(review): the unit of Delay (int32) is not visible here — confirm.
type Retry struct {
	ExponentialBackoff bool
	Count              int
	Delay              int32
}

type RunConfig

// RunConfig identifies what to run: an Executor, the scheduled time and a
// JobRunID. RunConfigFrom builds one from an executor, a scheduled time and a
// run ID string, returning an error as its second result.
type RunConfig struct {
	Executor Executor

	ScheduledAt time.Time
	JobRunID    JobRunID
}

func RunConfigFrom

func RunConfigFrom(executor Executor, scheduledAt time.Time, runID string) (RunConfig, error)

type RuntimeConfig

// RuntimeConfig holds a job's runtime settings: an optional Resource and a
// string map of scheduler settings.
// NOTE(review): the recognised Scheduler keys are not visible here.
type RuntimeConfig struct {
	// Resource is a pointer and may therefore be nil.
	Resource  *Resource
	Scheduler map[string]string
}

type SLAObject

// SLAObject identifies a job run by job name and scheduled time.
// Event.SLAObjectList holds pointers to this type, and String is defined on
// *SLAObject.
type SLAObject struct {
	JobName        JobName
	JobScheduledAt time.Time
}

func (*SLAObject) String

func (s *SLAObject) String() string

type Schedule

// Schedule holds a job's scheduling settings. JobWithDetails references it by
// pointer.
type Schedule struct {
	DependsOnPast bool
	CatchUp       bool
	StartDate     time.Time
	// EndDate is a pointer, so it can be nil.
	// NOTE(review): presumably nil means no end date — confirm.
	EndDate       *time.Time
	// NOTE(review): presumably a cron expression, given that other methods in
	// this package take a *cron.ScheduleSpec — confirm.
	Interval      string
}

type State

type State string
// Declared State values. Identifier and string do not always line up:
// StateRetry is "retried" and StateNotScheduled is "waiting_to_schedule".
// StateCanceled uses the one-l spelling "canceled" whereas
// ReplayStateCancelled is "cancelled", and StateInProgress is "in_progress"
// whereas ReplayStateInProgress is "in progress".
// NOTE(review): which of these states count as terminal is not visible here.
const (
	StatePending State = "pending"

	StateAccepted State = "accepted"
	StateRunning  State = "running"
	StateQueued   State = "queued"
	StateCanceled State = "canceled"

	StateRetry State = "retried"

	StateSuccess State = "success"
	StateFailed  State = "failed"

	StateNotScheduled State = "waiting_to_schedule"
	StateWaitUpstream State = "wait_upstream"
	StateInProgress   State = "in_progress"
	StateUpForRetry   State = "up_for_retry"
	StateRestarting   State = "restarting"
	StateMissing      State = "missing"
)

func StateFromString

func StateFromString(state string) (State, error)

func (State) String

func (j State) String() string

type Task

// Task is a named task with string key/value configuration; it has the same
// shape as Hook. Job.Task references it by pointer.
type Task struct {
	Name   string
	Config map[string]string
}

type Upstreams

// Upstreams groups a job's upstreams into HTTP upstreams and upstream jobs.
// Both slices hold pointers, so individual elements may be nil.
type Upstreams struct {
	HTTP         []*HTTPUpstreams
	UpstreamJobs []*JobUpstream
}

type Webhook added in v0.11.4

// Webhook is one entry of JobWithDetails.Webhook. On holds the EventCategory
// the webhook is keyed on, and Endpoints lists the endpoints for it.
type Webhook struct {
	On        EventCategory
	Endpoints []WebhookEndPoint
}

type WebhookAttrs added in v0.11.4

// WebhookAttrs bundles the attributes of a webhook call: owner, the related
// Event, run metadata, a route and string headers. It parallels NotifyAttrs,
// with Meta and Headers in place of Secret.
// NOTE(review): presumably Route and Headers come from a WebhookEndPoint's
// URL and Headers — confirm.
type WebhookAttrs struct {
	Owner    string
	// JobEvent and Meta are pointers and may therefore be nil.
	JobEvent *Event
	Meta     *JobRunMeta
	Route    string
	Headers  map[string]string
}

type WebhookEndPoint added in v0.11.4

// WebhookEndPoint is a single webhook target: a URL plus string headers.
// Webhook.Endpoints holds these by value.
type WebhookEndPoint struct {
	URL     string
	Headers map[string]string
}

Directories

Path Synopsis
handler

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL