Documentation ¶
Index ¶
- Constants
- type AlertManager
- type AssetCompiler
- type EventHandler
- type EventsService
- type FilesCompiler
- type InputCompiler
- type JobInputCompiler
- type JobReplayRepository
- type JobRepository
- type JobRunAssetsCompiler
- type JobRunRepository
- type JobRunService
- func (s *JobRunService) GetInterval(ctx context.Context, projectName tenant.ProjectName, jobName scheduler.JobName, ...) (interval.Interval, error)
- func (s *JobRunService) GetJobRuns(ctx context.Context, projectName tenant.ProjectName, jobName scheduler.JobName, ...) ([]*scheduler.JobRunStatus, error)
- func (s *JobRunService) GetJobRunsByFilter(ctx context.Context, projectName tenant.ProjectName, jobName scheduler.JobName, ...) ([]*scheduler.JobRun, error)
- func (s *JobRunService) JobRunInput(ctx context.Context, projectName tenant.ProjectName, jobName scheduler.JobName, ...) (*scheduler.ExecutorInput, error)
- func (s *JobRunService) UpdateJobScheduleState(ctx context.Context, tnnt tenant.Tenant, jobName []job.Name, state string) error
- func (s *JobRunService) UpdateJobState(ctx context.Context, event *scheduler.Event) error
- func (s *JobRunService) UploadJobs(ctx context.Context, tnnt tenant.Tenant, toUpdate, toDelete []string) (err error)
- func (s *JobRunService) UploadToScheduler(ctx context.Context, projectName tenant.ProjectName) error
- type Notifier
- type OperatorRunRepository
- type PluginRepo
- type PriorityResolver
- type ProjectGetter
- type ReplayExecutor
- type ReplayRepository
- type ReplayScheduler
- type ReplayService
- func (r *ReplayService) CancelReplay(ctx context.Context, replayWithRun *scheduler.ReplayWithRun) error
- func (r *ReplayService) CreateReplay(ctx context.Context, t tenant.Tenant, jobName scheduler.JobName, ...) (replayID uuid.UUID, err error)
- func (r *ReplayService) GetByFilter(ctx context.Context, project tenant.ProjectName, filters ...filter.FilterOpt) ([]*scheduler.ReplayWithRun, error)
- func (r *ReplayService) GetReplayByID(ctx context.Context, replayID uuid.UUID) (*scheduler.ReplayWithRun, error)
- func (r *ReplayService) GetReplayList(ctx context.Context, projectName tenant.ProjectName) (replays []*scheduler.Replay, err error)
- func (r *ReplayService) GetRunsStatus(ctx context.Context, tenant tenant.Tenant, jobName scheduler.JobName, ...) ([]*scheduler.JobRunStatus, error)
- type ReplayValidator
- type ReplayWorker
- func (w *ReplayWorker) CancelReplayRunsOnScheduler(ctx context.Context, replay *scheduler.Replay, jobCron *cron.ScheduleSpec, ...) []*scheduler.JobRunStatus
- func (w *ReplayWorker) Execute(replayID uuid.UUID, jobTenant tenant.Tenant, jobName scheduler.JobName)
- func (w *ReplayWorker) FetchAndSyncStatus(ctx context.Context, replayWithRun *scheduler.ReplayWithRun, ...) (scheduler.JobRunStatusList, error)
- func (w *ReplayWorker) FetchRunsWithDetails(ctx context.Context, replay *scheduler.Replay, jobCron *cron.ScheduleSpec) (scheduler.JobRunDetailsList, error)
- func (w *ReplayWorker) ScanReplayRequest(ctx context.Context)
- type Scheduler
- type SchedulerRunGetter
- type TemplateCompiler
- type TenantGetter
- type TenantService
- type Validator
- type Webhook
Constants ¶
View Source
const (
	NotificationSchemeSlack     = "slack"
	NotificationSchemePagerDuty = "pagerduty"
)
View Source
const (
	SecretsStringToMatch    = ".secret."
	TimeISOFormat           = time.RFC3339
	JobAttributionLabelsKey = "JOB_LABELS"
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AlertManager ¶ added in v0.11.7
type AlertManager interface {
	SendJobRunEvent(attr *scheduler.AlertAttrs)
	SendReplayEvent(attr *scheduler.ReplayNotificationAttrs)
}
type AssetCompiler ¶
type EventHandler ¶ added in v0.7.0
type EventsService ¶ added in v0.11.7
type EventsService struct {
// contains filtered or unexported fields
}
func NewEventsService ¶ added in v0.11.7
func NewEventsService(l log.Logger, jobRepo JobRepository, tenantService TenantService, notifyChan map[string]Notifier, webhookNotifier Webhook, compiler TemplateCompiler, alertsHandler AlertManager) *EventsService
func (*EventsService) Close ¶ added in v0.11.7
func (e *EventsService) Close() error
type FilesCompiler ¶
type InputCompiler ¶
type InputCompiler struct {
// contains filtered or unexported fields
}
func NewJobInputCompiler ¶
func NewJobInputCompiler(tenantService TenantService, compiler TemplateCompiler, assetCompiler AssetCompiler, logger log.Logger) *InputCompiler
func (InputCompiler) Compile ¶
func (i InputCompiler) Compile(ctx context.Context, job *scheduler.JobWithDetails, config scheduler.RunConfig, executedAt time.Time) (*scheduler.ExecutorInput, error)
type JobInputCompiler ¶
type JobReplayRepository ¶ added in v0.7.0
type JobRepository ¶
type JobRepository interface {
	GetJob(ctx context.Context, name tenant.ProjectName, jobName scheduler.JobName) (*scheduler.Job, error)
	GetJobDetails(ctx context.Context, projectName tenant.ProjectName, jobName scheduler.JobName) (*scheduler.JobWithDetails, error)
	GetAll(ctx context.Context, projectName tenant.ProjectName) ([]*scheduler.JobWithDetails, error)
	GetJobs(ctx context.Context, projectName tenant.ProjectName, jobs []string) ([]*scheduler.JobWithDetails, error)
}
type JobRunAssetsCompiler ¶
type JobRunAssetsCompiler struct {
// contains filtered or unexported fields
}
func NewJobAssetsCompiler ¶
func NewJobAssetsCompiler(engine FilesCompiler, logger log.Logger) *JobRunAssetsCompiler
func (*JobRunAssetsCompiler) CompileJobRunAssets ¶
func (*JobRunAssetsCompiler) CompileQuery ¶ added in v0.10.1
type JobRunRepository ¶
type JobRunRepository interface {
	GetByID(ctx context.Context, id scheduler.JobRunID) (*scheduler.JobRun, error)
	GetByScheduledAt(ctx context.Context, tenant tenant.Tenant, name scheduler.JobName, scheduledAt time.Time) (*scheduler.JobRun, error)
	GetLatestRun(ctx context.Context, project tenant.ProjectName, name scheduler.JobName, status *scheduler.State) (*scheduler.JobRun, error)
	GetRunsByTimeRange(ctx context.Context, project tenant.ProjectName, jobName scheduler.JobName, status *scheduler.State, since, until time.Time) ([]*scheduler.JobRun, error)
	GetByScheduledTimes(ctx context.Context, tenant tenant.Tenant, jobName scheduler.JobName, scheduledTimes []time.Time) ([]*scheduler.JobRun, error)
	Create(ctx context.Context, tenant tenant.Tenant, name scheduler.JobName, scheduledAt time.Time, slaDefinitionInSec int64) error
	Update(ctx context.Context, jobRunID uuid.UUID, endTime time.Time, jobRunStatus scheduler.State) error
	UpdateState(ctx context.Context, jobRunID uuid.UUID, jobRunStatus scheduler.State) error
	UpdateSLA(ctx context.Context, jobName scheduler.JobName, project tenant.ProjectName, scheduledTimes []time.Time) error
	UpdateMonitoring(ctx context.Context, jobRunID uuid.UUID, monitoring map[string]any) error
}
type JobRunService ¶
type JobRunService struct {
// contains filtered or unexported fields
}
func NewJobRunService ¶
func NewJobRunService(logger log.Logger, jobRepo JobRepository, jobRunRepo JobRunRepository, replayRepo JobReplayRepository, operatorRunRepo OperatorRunRepository, scheduler Scheduler, resolver PriorityResolver, compiler JobInputCompiler, eventHandler EventHandler, projectGetter ProjectGetter) *JobRunService
func (*JobRunService) GetInterval ¶ added in v0.10.0
func (*JobRunService) GetJobRuns ¶
func (s *JobRunService) GetJobRuns(ctx context.Context, projectName tenant.ProjectName, jobName scheduler.JobName, criteria *scheduler.JobRunsCriteria) ([]*scheduler.JobRunStatus, error)
func (*JobRunService) GetJobRunsByFilter ¶ added in v0.17.0
func (*JobRunService) JobRunInput ¶
func (s *JobRunService) JobRunInput(ctx context.Context, projectName tenant.ProjectName, jobName scheduler.JobName, config scheduler.RunConfig) (*scheduler.ExecutorInput, error)
func (*JobRunService) UpdateJobScheduleState ¶ added in v0.9.6
func (*JobRunService) UpdateJobState ¶
func (*JobRunService) UploadJobs ¶ added in v0.8.0
func (*JobRunService) UploadToScheduler ¶
func (s *JobRunService) UploadToScheduler(ctx context.Context, projectName tenant.ProjectName) error
type OperatorRunRepository ¶
type OperatorRunRepository interface {
	GetOperatorRun(ctx context.Context, operatorName string, operator scheduler.OperatorType, jobRunID uuid.UUID) (*scheduler.OperatorRun, error)
	CreateOperatorRun(ctx context.Context, operatorName string, operator scheduler.OperatorType, jobRunID uuid.UUID, startTime time.Time) error
	UpdateOperatorRun(ctx context.Context, operator scheduler.OperatorType, jobRunID uuid.UUID, eventTime time.Time, state scheduler.State) error
}
type PriorityResolver ¶
type PriorityResolver interface {
Resolve(context.Context, []*scheduler.JobWithDetails) error
}
type ProjectGetter ¶ added in v0.10.0
type ReplayExecutor ¶ added in v0.11.0
type ReplayExecutor interface {
	Execute(replayID uuid.UUID, jobTenant tenant.Tenant, jobName scheduler.JobName)
	FetchAndSyncStatus(ctx context.Context, replayWithRun *scheduler.ReplayWithRun, jobCron *cron.ScheduleSpec) (scheduler.JobRunStatusList, error)
	FetchRunsWithDetails(ctx context.Context, replay *scheduler.Replay, jobCron *cron.ScheduleSpec) (scheduler.JobRunDetailsList, error)
	CancelReplayRunsOnScheduler(ctx context.Context, replay *scheduler.Replay, jobCron *cron.ScheduleSpec, runs []*scheduler.JobRunWithDetails) []*scheduler.JobRunStatus
}
type ReplayRepository ¶ added in v0.7.0
type ReplayRepository interface {
	RegisterReplay(ctx context.Context, replay *scheduler.Replay, runs []*scheduler.JobRunStatus) (uuid.UUID, error)
	UpdateReplay(ctx context.Context, replayID uuid.UUID, state scheduler.ReplayState, runs []*scheduler.JobRunStatus, message string) error
	UpdateReplayHeartbeat(ctx context.Context, replayID uuid.UUID) error
	UpdateReplayRuns(ctx context.Context, replayID uuid.UUID, runs []*scheduler.JobRunStatus) error
	UpdateReplayStatus(ctx context.Context, replayID uuid.UUID, state scheduler.ReplayState, message string) error
	CancelReplayRequest(ctx context.Context, replayID uuid.UUID, message string) error
	ScanAbandonedReplayRequests(ctx context.Context, unhandledClassifierDuration time.Duration) ([]*scheduler.Replay, error)
	AcquireReplayRequest(ctx context.Context, replayID uuid.UUID, unhandledClassifierDuration time.Duration) error
	GetReplayRequestByID(ctx context.Context, replayID uuid.UUID) (*scheduler.Replay, error)
	GetReplayByFilters(ctx context.Context, projectName tenant.ProjectName, filters ...filter.FilterOpt) ([]*scheduler.ReplayWithRun, error)
	GetReplayRequestsByStatus(ctx context.Context, statusList []scheduler.ReplayState) ([]*scheduler.Replay, error)
	GetReplaysByProject(ctx context.Context, projectName tenant.ProjectName, dayLimits int) ([]*scheduler.Replay, error)
	GetReplayByID(ctx context.Context, replayID uuid.UUID) (*scheduler.ReplayWithRun, error)
}
type ReplayScheduler ¶ added in v0.7.0
type ReplayScheduler interface {
	Clear(ctx context.Context, t tenant.Tenant, jobName scheduler.JobName, scheduledAt time.Time) error
	ClearBatch(ctx context.Context, t tenant.Tenant, jobName scheduler.JobName, startTime, endTime time.Time) error
	CancelRun(ctx context.Context, tnnt tenant.Tenant, jobName scheduler.JobName, dagRunID string) error
	CreateRun(ctx context.Context, tnnt tenant.Tenant, jobName scheduler.JobName, executionTime time.Time, dagRunIDPrefix string) error
	GetJobRuns(ctx context.Context, t tenant.Tenant, criteria *scheduler.JobRunsCriteria, jobCron *cron.ScheduleSpec) ([]*scheduler.JobRunStatus, error)
	GetJobRunsWithDetails(ctx context.Context, t tenant.Tenant, criteria *scheduler.JobRunsCriteria, jobCron *cron.ScheduleSpec) ([]*scheduler.JobRunWithDetails, error)
}
type ReplayService ¶ added in v0.7.0
type ReplayService struct {
// contains filtered or unexported fields
}
func NewReplayService ¶ added in v0.7.0
func NewReplayService(
	replayRepo ReplayRepository,
	jobRepo JobRepository,
	tenantGetter TenantGetter,
	validator ReplayValidator,
	worker ReplayExecutor,
	runGetter SchedulerRunGetter,
	logger log.Logger,
	pluginToExecutionProjectKeyMap map[string]string,
	alertManager AlertManager,
) *ReplayService
func (*ReplayService) CancelReplay ¶ added in v0.11.3
func (r *ReplayService) CancelReplay(ctx context.Context, replayWithRun *scheduler.ReplayWithRun) error
func (*ReplayService) CreateReplay ¶ added in v0.7.0
func (*ReplayService) GetByFilter ¶ added in v0.16.1
func (r *ReplayService) GetByFilter(ctx context.Context, project tenant.ProjectName, filters ...filter.FilterOpt) ([]*scheduler.ReplayWithRun, error)
func (*ReplayService) GetReplayByID ¶ added in v0.7.0
func (r *ReplayService) GetReplayByID(ctx context.Context, replayID uuid.UUID) (*scheduler.ReplayWithRun, error)
func (*ReplayService) GetReplayList ¶ added in v0.7.0
func (r *ReplayService) GetReplayList(ctx context.Context, projectName tenant.ProjectName) (replays []*scheduler.Replay, err error)
func (*ReplayService) GetRunsStatus ¶ added in v0.9.0
func (r *ReplayService) GetRunsStatus(ctx context.Context, tenant tenant.Tenant, jobName scheduler.JobName, config *scheduler.ReplayConfig) ([]*scheduler.JobRunStatus, error)
type ReplayValidator ¶ added in v0.7.0
type ReplayWorker ¶ added in v0.7.0
type ReplayWorker struct {
// contains filtered or unexported fields
}
func NewReplayWorker ¶ added in v0.7.0
func NewReplayWorker(logger log.Logger, replayRepository ReplayRepository, jobRepo JobRepository, scheduler ReplayScheduler, cfg config.ReplayConfig, alertManager AlertManager) *ReplayWorker
func (*ReplayWorker) CancelReplayRunsOnScheduler ¶ added in v0.17.0
func (w *ReplayWorker) CancelReplayRunsOnScheduler(ctx context.Context, replay *scheduler.Replay, jobCron *cron.ScheduleSpec, runs []*scheduler.JobRunWithDetails) []*scheduler.JobRunStatus
func (*ReplayWorker) FetchAndSyncStatus ¶ added in v0.17.0
func (w *ReplayWorker) FetchAndSyncStatus(ctx context.Context, replayWithRun *scheduler.ReplayWithRun, jobCron *cron.ScheduleSpec) (scheduler.JobRunStatusList, error)
func (*ReplayWorker) FetchRunsWithDetails ¶ added in v0.17.0
func (w *ReplayWorker) FetchRunsWithDetails(ctx context.Context, replay *scheduler.Replay, jobCron *cron.ScheduleSpec) (scheduler.JobRunDetailsList, error)
func (*ReplayWorker) ScanReplayRequest ¶ added in v0.17.0
func (w *ReplayWorker) ScanReplayRequest(ctx context.Context)
type Scheduler ¶
type Scheduler interface {
	GetJobRuns(ctx context.Context, t tenant.Tenant, criteria *scheduler.JobRunsCriteria, jobCron *cron.ScheduleSpec) ([]*scheduler.JobRunStatus, error)
	DeployJobs(ctx context.Context, t tenant.Tenant, jobs []*scheduler.JobWithDetails) error
	ListJobs(ctx context.Context, t tenant.Tenant) ([]string, error)
	DeleteJobs(ctx context.Context, t tenant.Tenant, jobsToDelete []string) error
	UpdateJobState(ctx context.Context, tnnt tenant.Tenant, jobName []job.Name, state string) error
}
type SchedulerRunGetter ¶ added in v0.9.0
type SchedulerRunGetter interface {
GetJobRuns(ctx context.Context, t tenant.Tenant, criteria *scheduler.JobRunsCriteria, jobCron *cron.ScheduleSpec) ([]*scheduler.JobRunStatus, error)
}
type TemplateCompiler ¶
type TenantGetter ¶ added in v0.13.0
type TenantService ¶
type Validator ¶ added in v0.7.0
type Validator struct {
// contains filtered or unexported fields
}
func NewValidator ¶ added in v0.7.0
func NewValidator(replayRepository ReplayRepository, scheduler ReplayScheduler, jobRepo JobRepository) *Validator
Click to show internal directories.
Click to hide internal directories.