Documentation ¶
Index ¶
- Constants
- type AssetCompiler
- type EventHandler
- type FilesCompiler
- type InputCompiler
- type JobInputCompiler
- type JobReplayRepository
- type JobReplayRunService
- type JobRepository
- type JobRunAssetsCompiler
- type JobRunRepository
- type JobRunService
- func (s *JobRunService) GetJobRuns(ctx context.Context, projectName tenant.ProjectName, jobName scheduler.JobName, ...) ([]*scheduler.JobRunStatus, error)
- func (s *JobRunService) JobRunInput(ctx context.Context, projectName tenant.ProjectName, jobName scheduler.JobName, ...) (*scheduler.ExecutorInput, error)
- func (s *JobRunService) UpdateJobState(ctx context.Context, event *scheduler.Event) error
- func (s *JobRunService) UploadJobs(ctx context.Context, tnnt tenant.Tenant, toUpdate, toDelete []string) (err error)
- func (s *JobRunService) UploadToScheduler(ctx context.Context, projectName tenant.ProjectName) error
- type Notifier
- type NotifyService
- type OperatorRunRepository
- type PluginRepo
- type PriorityResolver
- type ReplayManager
- type ReplayRepository
- type ReplayScheduler
- type ReplayService
- func (r *ReplayService) CreateReplay(ctx context.Context, tenant tenant.Tenant, jobName scheduler.JobName, ...) (replayID uuid.UUID, err error)
- func (r *ReplayService) GetReplayByID(ctx context.Context, replayID uuid.UUID) (*scheduler.ReplayWithRun, error)
- func (r *ReplayService) GetReplayList(ctx context.Context, projectName tenant.ProjectName) (replays []*scheduler.Replay, err error)
- func (r *ReplayService) GetRunsStatus(ctx context.Context, tenant tenant.Tenant, jobName scheduler.JobName, ...) ([]*scheduler.JobRunStatus, error)
- type ReplayValidator
- type ReplayWorker
- type Scheduler
- type SchedulerRunGetter
- type TemplateCompiler
- type TenantService
- type Validator
- type Worker
Constants ¶
View Source
const ( SecretsStringToMatch = ".secret." TimeISOFormat = time.RFC3339 JobAttributionLabelsKey = "JOB_LABELS" )
View Source
const ( NotificationSchemeSlack = "slack" NotificationSchemePagerDuty = "pagerduty" )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AssetCompiler ¶
type EventHandler ¶ added in v0.7.0
type FilesCompiler ¶
type InputCompiler ¶
type InputCompiler struct {
// contains filtered or unexported fields
}
func NewJobInputCompiler ¶
func NewJobInputCompiler(tenantService TenantService, compiler TemplateCompiler, assetCompiler AssetCompiler, logger log.Logger) *InputCompiler
type JobInputCompiler ¶
type JobReplayRepository ¶ added in v0.7.0
type JobReplayRunService ¶ added in v0.7.0
type JobReplayRunService interface {
GetJobRuns(ctx context.Context, projectName tenant.ProjectName, jobName scheduler.JobName, criteria *scheduler.JobRunsCriteria) ([]*scheduler.JobRunStatus, error)
}
type JobRepository ¶
type JobRepository interface { GetJob(ctx context.Context, name tenant.ProjectName, jobName scheduler.JobName) (*scheduler.Job, error) GetJobDetails(ctx context.Context, projectName tenant.ProjectName, jobName scheduler.JobName) (*scheduler.JobWithDetails, error) GetAll(ctx context.Context, projectName tenant.ProjectName) ([]*scheduler.JobWithDetails, error) GetJobs(ctx context.Context, projectName tenant.ProjectName, jobs []string) ([]*scheduler.JobWithDetails, error) }
type JobRunAssetsCompiler ¶
type JobRunAssetsCompiler struct {
// contains filtered or unexported fields
}
func NewJobAssetsCompiler ¶
func NewJobAssetsCompiler(engine FilesCompiler, pluginRepo PluginRepo, logger log.Logger) *JobRunAssetsCompiler
type JobRunRepository ¶
type JobRunRepository interface { GetByID(ctx context.Context, id scheduler.JobRunID) (*scheduler.JobRun, error) GetByScheduledAt(ctx context.Context, tenant tenant.Tenant, name scheduler.JobName, scheduledAt time.Time) (*scheduler.JobRun, error) GetByScheduledTimes(ctx context.Context, tenant tenant.Tenant, jobName scheduler.JobName, scheduledTimes []time.Time) ([]*scheduler.JobRun, error) Create(ctx context.Context, tenant tenant.Tenant, name scheduler.JobName, scheduledAt time.Time, slaDefinitionInSec int64) error Update(ctx context.Context, jobRunID uuid.UUID, endTime time.Time, jobRunStatus scheduler.State) error UpdateState(ctx context.Context, jobRunID uuid.UUID, jobRunStatus scheduler.State) error UpdateSLA(ctx context.Context, jobName scheduler.JobName, project tenant.ProjectName, scheduledTimes []time.Time) error UpdateMonitoring(ctx context.Context, jobRunID uuid.UUID, monitoring map[string]any) error }
type JobRunService ¶
type JobRunService struct {
// contains filtered or unexported fields
}
func NewJobRunService ¶
func NewJobRunService(logger log.Logger, jobRepo JobRepository, jobRunRepo JobRunRepository, replayRepo JobReplayRepository, operatorRunRepo OperatorRunRepository, scheduler Scheduler, resolver PriorityResolver, compiler JobInputCompiler, eventHandler EventHandler, ) *JobRunService
func (*JobRunService) GetJobRuns ¶
func (s *JobRunService) GetJobRuns(ctx context.Context, projectName tenant.ProjectName, jobName scheduler.JobName, criteria *scheduler.JobRunsCriteria) ([]*scheduler.JobRunStatus, error)
func (*JobRunService) JobRunInput ¶
func (s *JobRunService) JobRunInput(ctx context.Context, projectName tenant.ProjectName, jobName scheduler.JobName, config scheduler.RunConfig) (*scheduler.ExecutorInput, error)
func (*JobRunService) UpdateJobState ¶
func (*JobRunService) UploadJobs ¶ added in v0.8.0
func (*JobRunService) UploadToScheduler ¶
func (s *JobRunService) UploadToScheduler(ctx context.Context, projectName tenant.ProjectName) error
type NotifyService ¶
type NotifyService struct {
// contains filtered or unexported fields
}
func NewNotifyService ¶
func NewNotifyService(l log.Logger, jobRepo JobRepository, tenantService TenantService, notifyChan map[string]Notifier) *NotifyService
func (*NotifyService) Close ¶
func (n *NotifyService) Close() error
type OperatorRunRepository ¶
type OperatorRunRepository interface { GetOperatorRun(ctx context.Context, operatorName string, operator scheduler.OperatorType, jobRunID uuid.UUID) (*scheduler.OperatorRun, error) CreateOperatorRun(ctx context.Context, operatorName string, operator scheduler.OperatorType, jobRunID uuid.UUID, startTime time.Time) error UpdateOperatorRun(ctx context.Context, operator scheduler.OperatorType, jobRunID uuid.UUID, eventTime time.Time, state scheduler.State) error }
type PriorityResolver ¶
type PriorityResolver interface {
Resolve(context.Context, []*scheduler.JobWithDetails) error
}
type ReplayManager ¶ added in v0.7.0
func NewReplayManager ¶ added in v0.7.0
func NewReplayManager(l log.Logger, replayRepository ReplayRepository, replayWorker Worker, now func() time.Time, config config.ReplayConfig) *ReplayManager
func (ReplayManager) Initialize ¶ added in v0.7.0
func (m ReplayManager) Initialize()
func (ReplayManager) StartReplayLoop ¶ added in v0.7.0
func (m ReplayManager) StartReplayLoop()
type ReplayRepository ¶ added in v0.7.0
type ReplayRepository interface { RegisterReplay(ctx context.Context, replay *scheduler.Replay, runs []*scheduler.JobRunStatus) (uuid.UUID, error) UpdateReplay(ctx context.Context, replayID uuid.UUID, state scheduler.ReplayState, runs []*scheduler.JobRunStatus, message string) error UpdateReplayStatus(ctx context.Context, replayID uuid.UUID, state scheduler.ReplayState, message string) error GetReplayToExecute(context.Context) (*scheduler.ReplayWithRun, error) GetReplayRequestsByStatus(ctx context.Context, statusList []scheduler.ReplayState) ([]*scheduler.Replay, error) GetReplaysByProject(ctx context.Context, projectName tenant.ProjectName, dayLimits int) ([]*scheduler.Replay, error) GetReplayByID(ctx context.Context, replayID uuid.UUID) (*scheduler.ReplayWithRun, error) }
type ReplayScheduler ¶ added in v0.7.0
type ReplayScheduler interface { Clear(ctx context.Context, t tenant.Tenant, jobName scheduler.JobName, scheduledAt time.Time) error ClearBatch(ctx context.Context, t tenant.Tenant, jobName scheduler.JobName, startTime, endTime time.Time) error CreateRun(ctx context.Context, tnnt tenant.Tenant, jobName scheduler.JobName, executionTime time.Time, dagRunIDPrefix string) error GetJobRuns(ctx context.Context, t tenant.Tenant, criteria *scheduler.JobRunsCriteria, jobCron *cron.ScheduleSpec) ([]*scheduler.JobRunStatus, error) }
type ReplayService ¶ added in v0.7.0
type ReplayService struct {
// contains filtered or unexported fields
}
func NewReplayService ¶ added in v0.7.0
func NewReplayService(replayRepo ReplayRepository, jobRepo JobRepository, validator ReplayValidator, runGetter SchedulerRunGetter, logger log.Logger) *ReplayService
func (*ReplayService) CreateReplay ¶ added in v0.7.0
func (*ReplayService) GetReplayByID ¶ added in v0.7.0
func (r *ReplayService) GetReplayByID(ctx context.Context, replayID uuid.UUID) (*scheduler.ReplayWithRun, error)
func (*ReplayService) GetReplayList ¶ added in v0.7.0
func (r *ReplayService) GetReplayList(ctx context.Context, projectName tenant.ProjectName) (replays []*scheduler.Replay, err error)
func (*ReplayService) GetRunsStatus ¶ added in v0.9.0
func (r *ReplayService) GetRunsStatus(ctx context.Context, tenant tenant.Tenant, jobName scheduler.JobName, config *scheduler.ReplayConfig) ([]*scheduler.JobRunStatus, error)
type ReplayValidator ¶ added in v0.7.0
type ReplayWorker ¶ added in v0.7.0
type ReplayWorker struct {
// contains filtered or unexported fields
}
func NewReplayWorker ¶ added in v0.7.0
func NewReplayWorker(l log.Logger, replayRepo ReplayRepository, scheduler ReplayScheduler, jobRepo JobRepository, config config.ReplayConfig) *ReplayWorker
func (ReplayWorker) Process ¶ added in v0.7.0
func (w ReplayWorker) Process(replayReq *scheduler.ReplayWithRun)
type Scheduler ¶
type Scheduler interface { GetJobRuns(ctx context.Context, t tenant.Tenant, criteria *scheduler.JobRunsCriteria, jobCron *cron.ScheduleSpec) ([]*scheduler.JobRunStatus, error) DeployJobs(ctx context.Context, t tenant.Tenant, jobs []*scheduler.JobWithDetails) error ListJobs(ctx context.Context, t tenant.Tenant) ([]string, error) DeleteJobs(ctx context.Context, t tenant.Tenant, jobsToDelete []string) error }
type SchedulerRunGetter ¶ added in v0.9.0
type SchedulerRunGetter interface {
GetJobRuns(ctx context.Context, t tenant.Tenant, criteria *scheduler.JobRunsCriteria, jobCron *cron.ScheduleSpec) ([]*scheduler.JobRunStatus, error)
}
type TemplateCompiler ¶
type TenantService ¶
type Validator ¶ added in v0.7.0
type Validator struct {
// contains filtered or unexported fields
}
func NewValidator ¶ added in v0.7.0
func NewValidator(replayRepository ReplayRepository, scheduler ReplayScheduler, jobRepo JobRepository) *Validator
type Worker ¶ added in v0.7.0
type Worker interface {
Process(*scheduler.ReplayWithRun)
}
Click to show internal directories.
Click to hide internal directories.