scheduler

package
v0.7.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 25, 2023 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

View Source
const (
	EntityEvent = "event"

	ISODateFormat = "2006-01-02T15:04:05Z"

	EventCategorySLAMiss    JobEventCategory = "sla_miss"
	EventCategoryJobFailure JobEventCategory = "failure"

	SLAMissEvent    JobEventType = "sla_miss"
	JobFailureEvent JobEventType = "failure"
	JobSuccessEvent JobEventType = "job_success"

	TaskStartEvent   JobEventType = "task_start"
	TaskRetryEvent   JobEventType = "task_retry"
	TaskFailEvent    JobEventType = "task_fail"
	TaskSuccessEvent JobEventType = "task_success"

	HookStartEvent   JobEventType = "hook_start"
	HookRetryEvent   JobEventType = "hook_retry"
	HookFailEvent    JobEventType = "hook_fail"
	HookSuccessEvent JobEventType = "hook_success"

	SensorStartEvent   JobEventType = "sensor_start"
	SensorRetryEvent   JobEventType = "sensor_retry"
	SensorFailEvent    JobEventType = "sensor_fail"
	SensorSuccessEvent JobEventType = "sensor_success"
)
View Source
const (
	EntityJobRun = "jobRun"

	OperatorTask   OperatorType = "task"
	OperatorSensor OperatorType = "sensor"
	OperatorHook   OperatorType = "hook"

	UpstreamTypeStatic   = "static"
	UpstreamTypeInferred = "inferred"
)
View Source
const (
	MetricNotificationQueue         = "notification_queue_total"
	MetricNotificationWorkerBatch   = "notification_worker_batch_total"
	MetricNotificationWorkerSendErr = "notification_worker_send_err_total"
)
View Source
const (
	// initial state
	ReplayStateCreated ReplayState = "created"

	// running state
	ReplayStateInProgress      ReplayState = "in progress"
	ReplayStatePartialReplayed ReplayState = "partial replayed"
	ReplayStateReplayed        ReplayState = "replayed"

	// terminal state
	ReplayStateInvalid ReplayState = "invalid"
	ReplayStateSuccess ReplayState = "success"
	ReplayStateFailed  ReplayState = "failed"

	// state on presentation layer
	ReplayUserStateCreated    ReplayUserState = "created"
	ReplayUserStateInProgress ReplayUserState = "in progress"
	ReplayUserStateInvalid    ReplayUserState = "invalid"
	ReplayUserStateSuccess    ReplayUserState = "success"
	ReplayUserStateFailed     ReplayUserState = "failed"

	EntityReplay = "replay"
)

Variables

Functions

func GroupJobsByTenant

func GroupJobsByTenant(j []*JobWithDetails) map[tenant.Tenant][]*JobWithDetails

Types

type Alert

type Alert struct {
	On       JobEventCategory
	Channels []string
	Config   map[string]string
}

type ConfigMap

type ConfigMap map[string]string

type Event

type Event struct {
	JobName        JobName
	Tenant         tenant.Tenant
	Type           JobEventType
	EventTime      time.Time
	OperatorName   string
	Status         State
	JobScheduledAt time.Time
	Values         map[string]any
	SLAObjectList  []*SLAObject
}

func EventFrom

func EventFrom(eventTypeName string, eventValues map[string]any, jobName JobName, tenent tenant.Tenant) (*Event, error)

type EventName

type EventName string

type Executor

type Executor struct {
	Name string
	Type ExecutorType
}

func ExecutorFrom

func ExecutorFrom(name string, executorType ExecutorType) (Executor, error)

func ExecutorFromEnum

func ExecutorFromEnum(name, enum string) (Executor, error)

type ExecutorInput

type ExecutorInput struct {
	Configs ConfigMap
	Secrets ConfigMap
	Files   ConfigMap
}

type ExecutorType

type ExecutorType string
const (
	ExecutorTask ExecutorType = "task"
	ExecutorHook ExecutorType = "hook"
)

func ExecutorTypeFrom

func ExecutorTypeFrom(val string) (ExecutorType, error)

func (ExecutorType) String

func (e ExecutorType) String() string

type HTTPUpstreams

type HTTPUpstreams struct {
	Name    string
	URL     string
	Headers map[string]string
	Params  map[string]string
}

type Hook

type Hook struct {
	Name   string
	Config map[string]string
}

type Job

type Job struct {
	Name   JobName
	Tenant tenant.Tenant

	Destination string
	Task        *Task
	Hooks       []*Hook
	Window      models.Window
	Assets      map[string]string
}

func (*Job) GetHook

func (j *Job) GetHook(hookName string) (*Hook, error)

type JobEventCategory

type JobEventCategory string

type JobEventType

type JobEventType string

func FromStringToEventType

func FromStringToEventType(name string) (JobEventType, error)

func (JobEventType) IsOfType

func (event JobEventType) IsOfType(category JobEventCategory) bool

func (JobEventType) String

func (event JobEventType) String() string

type JobMetadata

type JobMetadata struct {
	Version     int
	Owner       string
	Description string
	Labels      map[string]string
}

type JobName

type JobName string

func JobNameFrom

func JobNameFrom(name string) (JobName, error)

func (JobName) String

func (n JobName) String() string

type JobRun

type JobRun struct {
	ID uuid.UUID

	JobName     JobName
	Tenant      tenant.Tenant
	State       State
	ScheduledAt time.Time
	StartTime   time.Time
	SLAAlert    bool
	EndTime     time.Time

	Monitoring map[string]any
}

type JobRunID

type JobRunID uuid.UUID

func JobRunIDFromString

func JobRunIDFromString(runID string) (JobRunID, error)

func (JobRunID) IsEmpty

func (i JobRunID) IsEmpty() bool

func (JobRunID) UUID

func (i JobRunID) UUID() uuid.UUID

type JobRunStatus

type JobRunStatus struct {
	ScheduledAt time.Time
	State       State
}

func JobRunStatusFrom

func JobRunStatusFrom(scheduledAt time.Time, state string) (JobRunStatus, error)

func (JobRunStatus) GetLogicalTime

func (j JobRunStatus) GetLogicalTime(jobCron *cron.ScheduleSpec) time.Time

type JobRunStatusList

type JobRunStatusList []*JobRunStatus

func (JobRunStatusList) GetSortedRunsByScheduledAt

func (j JobRunStatusList) GetSortedRunsByScheduledAt() []*JobRunStatus

func (JobRunStatusList) GetSortedRunsByStates

func (j JobRunStatusList) GetSortedRunsByStates(states []State) []*JobRunStatus

func (JobRunStatusList) MergeWithUpdatedRuns

func (j JobRunStatusList) MergeWithUpdatedRuns(updatedRunMap map[time.Time]State) []*JobRunStatus

func (JobRunStatusList) ToRunStatusMap

func (j JobRunStatusList) ToRunStatusMap() map[time.Time]State

type JobRunsCriteria

type JobRunsCriteria struct {
	Name        string
	StartDate   time.Time
	EndDate     time.Time
	Filter      []string
	OnlyLastRun bool
}

JobRunsCriteria represents the filter condition to get run status from scheduler

func (*JobRunsCriteria) ExecutionEndDate

func (c *JobRunsCriteria) ExecutionEndDate(jobCron *cron.ScheduleSpec) time.Time

func (*JobRunsCriteria) ExecutionStart

func (c *JobRunsCriteria) ExecutionStart(cron *cron.ScheduleSpec) time.Time

type JobUpstream

type JobUpstream struct {
	JobName        string
	Host           string
	TaskName       string        // TODO: remove after airflow migration
	DestinationURN string        //- bigquery://pilot.playground.table
	Tenant         tenant.Tenant // Current or external tenant
	Type           string
	External       bool
	State          string
}

type JobWithDetails

type JobWithDetails struct {
	Name JobName

	Job           *Job
	JobMetadata   *JobMetadata
	Schedule      *Schedule
	Retry         Retry
	Alerts        []Alert
	RuntimeConfig RuntimeConfig
	Priority      int
	Upstreams     Upstreams
}

JobWithDetails contains the details for a job

func (*JobWithDetails) GetLabelsAsString

func (j *JobWithDetails) GetLabelsAsString() string

func (*JobWithDetails) GetName

func (j *JobWithDetails) GetName() string

func (*JobWithDetails) GetUniqueLabelValues

func (j *JobWithDetails) GetUniqueLabelValues() []string

func (*JobWithDetails) SLADuration

func (j *JobWithDetails) SLADuration() (int64, error)

type NotifyAttrs

type NotifyAttrs struct {
	Owner    string
	JobEvent *Event
	Route    string
	Secret   string
}

type OperatorRun

type OperatorRun struct {
	ID           uuid.UUID
	Name         string
	JobRunID     uuid.UUID
	OperatorType OperatorType
	Status       State
	StartTime    time.Time
	EndTime      time.Time
}

type OperatorType

type OperatorType string

func (OperatorType) String

func (o OperatorType) String() string

type Replay

type Replay struct {
	// contains filtered or unexported fields
}

func NewReplay

func NewReplay(id uuid.UUID, jobName JobName, tenant tenant.Tenant, config *ReplayConfig, state ReplayState, createdAt time.Time) *Replay

func NewReplayRequest

func NewReplayRequest(jobName JobName, tenant tenant.Tenant, config *ReplayConfig, state ReplayState) *Replay

func (*Replay) Config

func (r *Replay) Config() *ReplayConfig

func (*Replay) CreatedAt

func (r *Replay) CreatedAt() time.Time

func (*Replay) ID

func (r *Replay) ID() uuid.UUID

func (*Replay) JobName

func (r *Replay) JobName() JobName

func (*Replay) Message

func (r *Replay) Message() string

func (*Replay) State

func (r *Replay) State() ReplayState

func (*Replay) Tenant

func (r *Replay) Tenant() tenant.Tenant

func (*Replay) UserState

func (r *Replay) UserState() ReplayUserState

type ReplayConfig

type ReplayConfig struct {
	StartTime   time.Time
	EndTime     time.Time
	Parallel    bool
	JobConfig   map[string]string
	Description string
}

func NewReplayConfig

func NewReplayConfig(startTime, endTime time.Time, parallel bool, jobConfig map[string]string, description string) *ReplayConfig

type ReplayState

type ReplayState string // contract status for business layer

func ReplayStateFromString

func ReplayStateFromString(state string) (ReplayState, error)

func (ReplayState) String

func (j ReplayState) String() string

type ReplayUserState

type ReplayUserState string // contract status for presentation layer

func (ReplayUserState) String

func (j ReplayUserState) String() string

type ReplayWithRun

type ReplayWithRun struct {
	Replay *Replay
	Runs   []*JobRunStatus // TODO: JobRunStatus does not have `message/log`
}

func (*ReplayWithRun) GetFirstExecutableRun

func (r *ReplayWithRun) GetFirstExecutableRun() *JobRunStatus

func (*ReplayWithRun) GetLastExecutableRun

func (r *ReplayWithRun) GetLastExecutableRun() *JobRunStatus

type Resource

type Resource struct {
	Request *ResourceConfig
	Limit   *ResourceConfig
}

type ResourceConfig

type ResourceConfig struct {
	CPU    string
	Memory string
}

type Retry

type Retry struct {
	ExponentialBackoff bool
	Count              int
	Delay              int32
}

type RunConfig

type RunConfig struct {
	Executor Executor

	ScheduledAt time.Time
	JobRunID    JobRunID
}

func RunConfigFrom

func RunConfigFrom(executor Executor, scheduledAt time.Time, runID string) (RunConfig, error)

type RuntimeConfig

type RuntimeConfig struct {
	Resource  *Resource
	Scheduler map[string]string
}

type SLAObject

type SLAObject struct {
	JobName        JobName
	JobScheduledAt time.Time
}

func (*SLAObject) String

func (s *SLAObject) String() string

type Schedule

type Schedule struct {
	DependsOnPast bool
	StartDate     time.Time
	EndDate       *time.Time
	Interval      string
}

type State

type State string
const (
	StatePending State = "pending"

	StateAccepted State = "accepted"
	StateRunning  State = "running"
	StateQueued   State = "queued"

	StateRetry State = "retried"

	StateSuccess State = "success"
	StateFailed  State = "failed"

	StateWaitUpstream State = "wait_upstream"
	StateInProgress   State = "in_progress"
)

func StateFromString

func StateFromString(state string) (State, error)

func (State) String

func (j State) String() string

type Task

type Task struct {
	Name   string
	Config map[string]string
}

type Upstreams

type Upstreams struct {
	HTTP         []*HTTPUpstreams
	UpstreamJobs []*JobUpstream
}

Directories

Path Synopsis
handler

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL