service

package
v0.18.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 15, 2024 License: Apache-2.0 Imports: 29 Imported by: 0

Documentation

Index

Constants

View Source
// Notification scheme identifiers.
// NOTE(review): presumably these are compared against the scheme part of a
// configured notification channel — confirm against the callers.
const (
	// NotificationSchemeSlack is the scheme identifier "slack".
	NotificationSchemeSlack     = "slack"
	// NotificationSchemePagerDuty is the scheme identifier "pagerduty".
	NotificationSchemePagerDuty = "pagerduty"
)
View Source
const (
	// SecretsStringToMatch is the literal substring ".secret.".
	// NOTE(review): presumably used to detect references to secrets inside
	// template text — confirm against the compiler code.
	SecretsStringToMatch = ".secret."

	// TimeISOFormat is the time layout used by this package; it is exactly
	// time.RFC3339.
	TimeISOFormat = time.RFC3339

	// JobAttributionLabelsKey is the key "JOB_LABELS".
	// NOTE(review): looks like the name of an environment variable / config
	// entry that carries job labels — confirm where it is written and read.
	JobAttributionLabelsKey = "JOB_LABELS"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type AlertManager added in v0.11.7

// AlertManager is the alert-sending dependency accepted by NewEventsService,
// NewReplayService and NewReplayWorker.
//
// Neither method takes a context or returns an error, so an implementation
// cannot report a delivery failure to the caller through this interface.
type AlertManager interface {
	// SendJobRunEvent sends the job-run event described by attr.
	SendJobRunEvent(attr *scheduler.AlertAttrs)
	// SendReplayEvent sends the replay event described by attr.
	SendReplayEvent(attr *scheduler.ReplayNotificationAttrs)
}

type AssetCompiler

// AssetCompiler is the asset-compiling dependency accepted by
// NewJobInputCompiler. *JobRunAssetsCompiler declares a CompileJobRunAssets
// method with a matching signature.
type AssetCompiler interface {
	// CompileJobRunAssets compiles the assets of job for one run and returns
	// them as a map[string]string, or an error.
	//
	// contextForTask is spelled map[string]any to match FilesCompiler and
	// TemplateCompiler in this package. any is an alias for interface{}, so
	// existing implementations that still spell it interface{} continue to
	// satisfy this interface unchanged.
	//
	// NOTE(review): the keys of the returned map are presumably asset names
	// and the values their compiled contents — confirm against the
	// implementation.
	CompileJobRunAssets(ctx context.Context, job *scheduler.Job, systemEnvVars map[string]string, interval interval.Interval, contextForTask map[string]any) (map[string]string, error)
}

type EventHandler added in v0.7.0

// EventHandler is the event sink accepted by NewJobRunService.
//
// HandleEvent has no return value, so a handler cannot signal failure to the
// caller through this interface.
type EventHandler interface {
	// HandleEvent receives a single moderator.Event.
	HandleEvent(moderator.Event)
}

type EventsService added in v0.11.7

type EventsService struct {
	// contains filtered or unexported fields
}

func NewEventsService added in v0.11.7

func NewEventsService(l log.Logger, jobRepo JobRepository, tenantService TenantService, notifyChan map[string]Notifier, webhookNotifier Webhook, compiler TemplateCompiler, alertsHandler AlertManager) *EventsService

func (*EventsService) Close added in v0.11.7

func (e *EventsService) Close() error

func (*EventsService) Push added in v0.11.7

func (e *EventsService) Push(ctx context.Context, event *scheduler.Event) error

func (*EventsService) Relay added in v0.11.7

func (e *EventsService) Relay(ctx context.Context, event *scheduler.Event) error

func (*EventsService) Webhook added in v0.11.7

func (e *EventsService) Webhook(ctx context.Context, event *scheduler.Event) error

type FilesCompiler

// FilesCompiler is the compile engine accepted by NewJobAssetsCompiler.
// Its single method has the same parameter and result types as
// TemplateCompiler.Compile.
type FilesCompiler interface {
	// Compile takes fileMap and a context map and returns a map[string]string
	// or an error.
	// NOTE(review): presumably fileMap is file name -> template text and the
	// result is file name -> rendered text — confirm against the engine.
	Compile(fileMap map[string]string, context map[string]any) (map[string]string, error)
}

type InputCompiler

type InputCompiler struct {
	// contains filtered or unexported fields
}

func NewJobInputCompiler

func NewJobInputCompiler(tenantService TenantService, compiler TemplateCompiler, assetCompiler AssetCompiler, logger log.Logger) *InputCompiler

func (InputCompiler) Compile

type JobInputCompiler

// JobInputCompiler is the compiler dependency accepted by NewJobRunService.
type JobInputCompiler interface {
	// Compile builds a *scheduler.ExecutorInput for job from the given run
	// config and executedAt time, or returns an error.
	Compile(ctx context.Context, job *scheduler.JobWithDetails, config scheduler.RunConfig, executedAt time.Time) (*scheduler.ExecutorInput, error)
}

type JobReplayRepository added in v0.7.0

// JobReplayRepository is the replay-config store accepted by NewJobRunService
// (as replayRepo).
type JobReplayRepository interface {
	// GetReplayJobConfig returns a string-to-string config map for the job
	// identified by jobTenant and jobName at scheduledAt, or an error.
	// NOTE(review): the result when no replay covers scheduledAt (nil map,
	// empty map, or error) is not visible here — confirm.
	GetReplayJobConfig(ctx context.Context, jobTenant tenant.Tenant, jobName scheduler.JobName, scheduledAt time.Time) (map[string]string, error)
}

type JobRepository

// JobRepository is the read-only job store used by the services in this
// package (NewEventsService, NewJobRunService, NewReplayService,
// NewReplayWorker and NewValidator all accept it). Every method is a lookup
// scoped by project name.
type JobRepository interface {
	// GetJob returns the *scheduler.Job named jobName in project name.
	GetJob(ctx context.Context, name tenant.ProjectName, jobName scheduler.JobName) (*scheduler.Job, error)
	// GetJobDetails returns the *scheduler.JobWithDetails named jobName in
	// projectName.
	GetJobDetails(ctx context.Context, projectName tenant.ProjectName, jobName scheduler.JobName) (*scheduler.JobWithDetails, error)
	// GetAll returns the jobs, with details, of projectName.
	GetAll(ctx context.Context, projectName tenant.ProjectName) ([]*scheduler.JobWithDetails, error)
	// GetJobs returns the jobs, with details, of projectName selected by the
	// jobs slice.
	// NOTE(review): jobs is presumably a list of job names; behaviour for
	// names that do not exist is not visible here — confirm.
	GetJobs(ctx context.Context, projectName tenant.ProjectName, jobs []string) ([]*scheduler.JobWithDetails, error)
}

type JobRunAssetsCompiler

type JobRunAssetsCompiler struct {
	// contains filtered or unexported fields
}

func NewJobAssetsCompiler

func NewJobAssetsCompiler(engine FilesCompiler, logger log.Logger) *JobRunAssetsCompiler

func (*JobRunAssetsCompiler) CompileJobRunAssets

func (c *JobRunAssetsCompiler) CompileJobRunAssets(_ context.Context, job *scheduler.Job, systemEnvVars map[string]string, interval interval.Interval, contextForTask map[string]interface{}) (map[string]string, error)

func (*JobRunAssetsCompiler) CompileQuery added in v0.10.1

func (c *JobRunAssetsCompiler) CompileQuery(startTime, endTime time.Time, query string, envs map[string]string) (string, error)

type JobRunRepository

// JobRunRepository is the job-run store accepted by NewJobRunService
// (as jobRunRepo). It exposes lookups, creation and several partial updates of
// job-run records.
type JobRunRepository interface {
	// GetByID returns the job run with the given id.
	GetByID(ctx context.Context, id scheduler.JobRunID) (*scheduler.JobRun, error)
	// GetByScheduledAt returns the run of job name in tenant scheduled at
	// scheduledAt.
	GetByScheduledAt(ctx context.Context, tenant tenant.Tenant, name scheduler.JobName, scheduledAt time.Time) (*scheduler.JobRun, error)
	// GetLatestRun returns the latest run of job name in project.
	// NOTE(review): status is a pointer; presumably nil means "any state" —
	// confirm against the implementation.
	GetLatestRun(ctx context.Context, project tenant.ProjectName, name scheduler.JobName, status *scheduler.State) (*scheduler.JobRun, error)
	// GetRunsByTimeRange returns the runs of jobName in project between since
	// and until.
	// NOTE(review): whether the bounds are inclusive, and the meaning of a
	// nil status, are not visible here — confirm.
	GetRunsByTimeRange(ctx context.Context, project tenant.ProjectName, jobName scheduler.JobName, status *scheduler.State, since, until time.Time) ([]*scheduler.JobRun, error)
	// GetByScheduledTimes returns the runs of jobName in tenant for the given
	// scheduledTimes.
	GetByScheduledTimes(ctx context.Context, tenant tenant.Tenant, jobName scheduler.JobName, scheduledTimes []time.Time) ([]*scheduler.JobRun, error)
	// Create records a new run of job name in tenant scheduled at scheduledAt.
	// slaDefinitionInSec is, per its name, the SLA definition in seconds.
	Create(ctx context.Context, tenant tenant.Tenant, name scheduler.JobName, scheduledAt time.Time, slaDefinitionInSec int64) error
	// Update sets the end time and status of the run identified by jobRunID.
	Update(ctx context.Context, jobRunID uuid.UUID, endTime time.Time, jobRunStatus scheduler.State) error
	// UpdateState sets only the status of the run identified by jobRunID.
	UpdateState(ctx context.Context, jobRunID uuid.UUID, jobRunStatus scheduler.State) error
	// UpdateSLA updates SLA information for the runs of jobName in project at
	// the given scheduledTimes.
	// NOTE(review): what exactly is written (e.g. an "SLA missed" flag) is not
	// visible here — confirm.
	UpdateSLA(ctx context.Context, jobName scheduler.JobName, project tenant.ProjectName, scheduledTimes []time.Time) error
	// UpdateMonitoring stores the monitoring map on the run identified by
	// jobRunID.
	UpdateMonitoring(ctx context.Context, jobRunID uuid.UUID, monitoring map[string]any) error
}

type JobRunService

type JobRunService struct {
	// contains filtered or unexported fields
}

func NewJobRunService

func NewJobRunService(logger log.Logger, jobRepo JobRepository, jobRunRepo JobRunRepository, replayRepo JobReplayRepository,
	operatorRunRepo OperatorRunRepository, scheduler Scheduler, resolver PriorityResolver, compiler JobInputCompiler, eventHandler EventHandler,
	projectGetter ProjectGetter,
) *JobRunService

func (*JobRunService) GetInterval added in v0.10.0

func (s *JobRunService) GetInterval(ctx context.Context, projectName tenant.ProjectName, jobName scheduler.JobName, referenceTime time.Time) (interval.Interval, error)

func (*JobRunService) GetJobRuns

func (s *JobRunService) GetJobRuns(ctx context.Context, projectName tenant.ProjectName, jobName scheduler.JobName, criteria *scheduler.JobRunsCriteria) ([]*scheduler.JobRunStatus, error)

func (*JobRunService) GetJobRunsByFilter added in v0.17.0

func (s *JobRunService) GetJobRunsByFilter(ctx context.Context, projectName tenant.ProjectName, jobName scheduler.JobName, filters ...filter.FilterOpt) ([]*scheduler.JobRun, error)

func (*JobRunService) JobRunInput

func (s *JobRunService) JobRunInput(ctx context.Context, projectName tenant.ProjectName, jobName scheduler.JobName, config scheduler.RunConfig) (*scheduler.ExecutorInput, error)

func (*JobRunService) UpdateJobScheduleState added in v0.9.6

func (s *JobRunService) UpdateJobScheduleState(ctx context.Context, tnnt tenant.Tenant, jobName []job.Name, state string) error

func (*JobRunService) UpdateJobState

func (s *JobRunService) UpdateJobState(ctx context.Context, event *scheduler.Event) error

func (*JobRunService) UploadJobs added in v0.8.0

func (s *JobRunService) UploadJobs(ctx context.Context, tnnt tenant.Tenant, toUpdate, toDelete []string) (err error)

func (*JobRunService) UploadToScheduler

func (s *JobRunService) UploadToScheduler(ctx context.Context, projectName tenant.ProjectName) error

type Notifier

// Notifier is a notification channel. NewEventsService accepts a
// map[string]Notifier (notifyChan).
// NOTE(review): the map key is presumably a notification scheme such as
// NotificationSchemeSlack — confirm.
type Notifier interface {
	// io.Closer is embedded, so every Notifier also has Close() error.
	io.Closer
	// Notify sends the notification described by attr and reports failure via
	// the returned error.
	Notify(ctx context.Context, attr scheduler.NotifyAttrs) error
}

type OperatorRunRepository

// OperatorRunRepository is the operator-run store accepted by
// NewJobRunService (as operatorRunRepo). Operator runs are addressed by the
// UUID of the job run they belong to.
type OperatorRunRepository interface {
	// GetOperatorRun returns the operator run for operatorName of type
	// operator within the job run jobRunID.
	GetOperatorRun(ctx context.Context, operatorName string, operator scheduler.OperatorType, jobRunID uuid.UUID) (*scheduler.OperatorRun, error)
	// CreateOperatorRun records a new operator run with the given startTime.
	CreateOperatorRun(ctx context.Context, operatorName string, operator scheduler.OperatorType, jobRunID uuid.UUID, startTime time.Time) error
	// UpdateOperatorRun updates an operator run with eventTime and state.
	// Unlike the other two methods it takes no operatorName, only the operator
	// type and jobRunID.
	UpdateOperatorRun(ctx context.Context, operator scheduler.OperatorType, jobRunID uuid.UUID, eventTime time.Time, state scheduler.State) error
}

type PluginRepo

// PluginRepo looks plugins up by name.
type PluginRepo interface {
	// GetByName returns the *plugin.Plugin registered under name, or an error.
	// It takes no context.
	GetByName(name string) (*plugin.Plugin, error)
}

type PriorityResolver

// PriorityResolver is the resolver dependency accepted by NewJobRunService.
type PriorityResolver interface {
	// Resolve processes the given jobs and returns only an error.
	// NOTE(review): since nothing but an error is returned, the resolved
	// priorities are presumably written into the passed JobWithDetails values
	// in place — confirm against the implementation.
	Resolve(context.Context, []*scheduler.JobWithDetails) error
}

type ProjectGetter added in v0.10.0

// ProjectGetter is the project lookup accepted by NewJobRunService
// (as projectGetter).
type ProjectGetter interface {
	// Get returns the *tenant.Project for the given project name, or an error.
	Get(context.Context, tenant.ProjectName) (*tenant.Project, error)
}

type ReplayExecutor added in v0.11.0

// ReplayExecutor is the worker dependency accepted by NewReplayService.
// *ReplayWorker declares methods with the same four signatures.
type ReplayExecutor interface {
	// Execute runs the replay identified by replayID for jobName in jobTenant.
	// It takes no context and returns nothing, so the outcome is not reported
	// to the caller through this method.
	// NOTE(review): presumably intended to be started asynchronously — confirm.
	Execute(replayID uuid.UUID, jobTenant tenant.Tenant, jobName scheduler.JobName)
	// FetchAndSyncStatus returns the run-status list for replayWithRun,
	// using jobCron.
	// NOTE(review): the name suggests stored run statuses are also updated
	// from the scheduler; that side effect is not visible here — confirm.
	FetchAndSyncStatus(ctx context.Context, replayWithRun *scheduler.ReplayWithRun, jobCron *cron.ScheduleSpec) (scheduler.JobRunStatusList, error)
	// FetchRunsWithDetails returns the detailed run list for replay,
	// using jobCron.
	FetchRunsWithDetails(ctx context.Context, replay *scheduler.Replay, jobCron *cron.ScheduleSpec) (scheduler.JobRunDetailsList, error)
	// CancelReplayRunsOnScheduler cancels the given runs of replay and returns
	// a slice of run statuses. It returns no error value.
	// NOTE(review): which runs appear in the returned slice (cancelled only,
	// or all) is not visible here — confirm.
	CancelReplayRunsOnScheduler(ctx context.Context, replay *scheduler.Replay, jobCron *cron.ScheduleSpec, runs []*scheduler.JobRunWithDetails) []*scheduler.JobRunStatus
}

type ReplayRepository added in v0.7.0

// ReplayRepository is the replay store accepted by NewReplayService,
// NewReplayWorker and NewValidator. The first group of methods mutates replay
// records; the second group reads them.
type ReplayRepository interface {
	// RegisterReplay stores replay together with its runs and returns the
	// UUID of the stored replay.
	RegisterReplay(ctx context.Context, replay *scheduler.Replay, runs []*scheduler.JobRunStatus) (uuid.UUID, error)
	// UpdateReplay sets state, runs and message on the replay replayID.
	UpdateReplay(ctx context.Context, replayID uuid.UUID, state scheduler.ReplayState, runs []*scheduler.JobRunStatus, message string) error
	// UpdateReplayHeartbeat records a heartbeat for the replay replayID.
	// NOTE(review): presumably this is what ScanAbandonedReplayRequests and
	// AcquireReplayRequest compare their duration against — confirm.
	UpdateReplayHeartbeat(ctx context.Context, replayID uuid.UUID) error
	// UpdateReplayRuns sets only the runs of the replay replayID.
	UpdateReplayRuns(ctx context.Context, replayID uuid.UUID, runs []*scheduler.JobRunStatus) error
	// UpdateReplayStatus sets only state and message on the replay replayID.
	UpdateReplayStatus(ctx context.Context, replayID uuid.UUID, state scheduler.ReplayState, message string) error
	// CancelReplayRequest cancels the replay replayID, recording message.
	CancelReplayRequest(ctx context.Context, replayID uuid.UUID, message string) error
	// ScanAbandonedReplayRequests returns replays selected using
	// unhandledClassifierDuration.
	// NOTE(review): presumably replays with no heartbeat for longer than that
	// duration — confirm.
	ScanAbandonedReplayRequests(ctx context.Context, unhandledClassifierDuration time.Duration) ([]*scheduler.Replay, error)
	// AcquireReplayRequest attempts to take the replay replayID for
	// processing; a non-nil error means it was not acquired.
	// NOTE(review): exact acquisition semantics are not visible here — confirm.
	AcquireReplayRequest(ctx context.Context, replayID uuid.UUID, unhandledClassifierDuration time.Duration) error

	// GetReplayRequestByID returns the replay replayID without its runs.
	GetReplayRequestByID(ctx context.Context, replayID uuid.UUID) (*scheduler.Replay, error)
	// GetReplayByFilters returns the replays of projectName, with runs, that
	// match the given filter options.
	GetReplayByFilters(ctx context.Context, projectName tenant.ProjectName, filters ...filter.FilterOpt) ([]*scheduler.ReplayWithRun, error)
	// GetReplayRequestsByStatus returns the replays selected by statusList.
	GetReplayRequestsByStatus(ctx context.Context, statusList []scheduler.ReplayState) ([]*scheduler.Replay, error)
	// GetReplaysByProject returns the replays of projectName, limited by
	// dayLimits.
	// NOTE(review): dayLimits is presumably a look-back window in days —
	// confirm.
	GetReplaysByProject(ctx context.Context, projectName tenant.ProjectName, dayLimits int) ([]*scheduler.Replay, error)
	// GetReplayByID returns the replay replayID together with its runs.
	GetReplayByID(ctx context.Context, replayID uuid.UUID) (*scheduler.ReplayWithRun, error)
}

type ReplayScheduler added in v0.7.0

// ReplayScheduler is the scheduler dependency accepted by NewReplayWorker and
// NewValidator. Its GetJobRuns method has the same signature as
// Scheduler.GetJobRuns and SchedulerRunGetter.GetJobRuns.
type ReplayScheduler interface {
	// Clear clears the run of jobName in tenant t scheduled at scheduledAt.
	Clear(ctx context.Context, t tenant.Tenant, jobName scheduler.JobName, scheduledAt time.Time) error
	// ClearBatch clears the runs of jobName in tenant t between startTime and
	// endTime.
	// NOTE(review): whether the bounds are inclusive is not visible here —
	// confirm.
	ClearBatch(ctx context.Context, t tenant.Tenant, jobName scheduler.JobName, startTime, endTime time.Time) error

	// CancelRun cancels the run of jobName in tnnt identified by dagRunID.
	CancelRun(ctx context.Context, tnnt tenant.Tenant, jobName scheduler.JobName, dagRunID string) error
	// CreateRun creates a run of jobName in tnnt for executionTime;
	// dagRunIDPrefix is, per its name, a prefix for the new run's ID.
	CreateRun(ctx context.Context, tnnt tenant.Tenant, jobName scheduler.JobName, executionTime time.Time, dagRunIDPrefix string) error
	// GetJobRuns returns the run statuses in tenant t that match criteria,
	// using jobCron.
	GetJobRuns(ctx context.Context, t tenant.Tenant, criteria *scheduler.JobRunsCriteria, jobCron *cron.ScheduleSpec) ([]*scheduler.JobRunStatus, error)
	// GetJobRunsWithDetails is like GetJobRuns but returns
	// *scheduler.JobRunWithDetails values.
	GetJobRunsWithDetails(ctx context.Context, t tenant.Tenant, criteria *scheduler.JobRunsCriteria, jobCron *cron.ScheduleSpec) ([]*scheduler.JobRunWithDetails, error)
}

type ReplayService added in v0.7.0

type ReplayService struct {
	// contains filtered or unexported fields
}

func NewReplayService added in v0.7.0

func NewReplayService(
	replayRepo ReplayRepository,
	jobRepo JobRepository,
	tenantGetter TenantGetter,
	validator ReplayValidator,
	worker ReplayExecutor,
	runGetter SchedulerRunGetter,
	logger log.Logger,
	pluginToExecutionProjectKeyMap map[string]string,
	alertManager AlertManager,
) *ReplayService

func (*ReplayService) CancelReplay added in v0.11.3

func (r *ReplayService) CancelReplay(ctx context.Context, replayWithRun *scheduler.ReplayWithRun) error

func (*ReplayService) CreateReplay added in v0.7.0

func (r *ReplayService) CreateReplay(ctx context.Context, t tenant.Tenant, jobName scheduler.JobName, config *scheduler.ReplayConfig) (replayID uuid.UUID, err error)

func (*ReplayService) GetByFilter added in v0.16.1

func (r *ReplayService) GetByFilter(ctx context.Context, project tenant.ProjectName, filters ...filter.FilterOpt) ([]*scheduler.ReplayWithRun, error)

func (*ReplayService) GetReplayByID added in v0.7.0

func (r *ReplayService) GetReplayByID(ctx context.Context, replayID uuid.UUID) (*scheduler.ReplayWithRun, error)

func (*ReplayService) GetReplayList added in v0.7.0

func (r *ReplayService) GetReplayList(ctx context.Context, projectName tenant.ProjectName) (replays []*scheduler.Replay, err error)

func (*ReplayService) GetRunsStatus added in v0.9.0

func (r *ReplayService) GetRunsStatus(ctx context.Context, tenant tenant.Tenant, jobName scheduler.JobName, config *scheduler.ReplayConfig) ([]*scheduler.JobRunStatus, error)

type ReplayValidator added in v0.7.0

// ReplayValidator is the validator dependency accepted by NewReplayService.
// Validator declares a Validate method with the same signature.
type ReplayValidator interface {
	// Validate checks replayRequest against jobCron and returns a non-nil
	// error when the request is rejected.
	Validate(ctx context.Context, replayRequest *scheduler.Replay, jobCron *cron.ScheduleSpec) error
}

type ReplayWorker added in v0.7.0

type ReplayWorker struct {
	// contains filtered or unexported fields
}

func NewReplayWorker added in v0.7.0

func NewReplayWorker(logger log.Logger, replayRepository ReplayRepository, jobRepo JobRepository, scheduler ReplayScheduler, cfg config.ReplayConfig, alertManager AlertManager) *ReplayWorker

func (*ReplayWorker) CancelReplayRunsOnScheduler added in v0.17.0

func (w *ReplayWorker) CancelReplayRunsOnScheduler(ctx context.Context, replay *scheduler.Replay, jobCron *cron.ScheduleSpec, runs []*scheduler.JobRunWithDetails) []*scheduler.JobRunStatus

func (*ReplayWorker) Execute added in v0.11.0

func (w *ReplayWorker) Execute(replayID uuid.UUID, jobTenant tenant.Tenant, jobName scheduler.JobName)

func (*ReplayWorker) FetchAndSyncStatus added in v0.17.0

func (w *ReplayWorker) FetchAndSyncStatus(ctx context.Context, replayWithRun *scheduler.ReplayWithRun, jobCron *cron.ScheduleSpec) (scheduler.JobRunStatusList, error)

func (*ReplayWorker) FetchRunsWithDetails added in v0.17.0

func (w *ReplayWorker) FetchRunsWithDetails(ctx context.Context, replay *scheduler.Replay, jobCron *cron.ScheduleSpec) (scheduler.JobRunDetailsList, error)

func (*ReplayWorker) ScanReplayRequest added in v0.17.0

func (w *ReplayWorker) ScanReplayRequest(ctx context.Context)

type Scheduler

// Scheduler is the scheduler dependency accepted by NewJobRunService.
// All operations are scoped to a tenant.
type Scheduler interface {
	// GetJobRuns returns the run statuses in tenant t that match criteria,
	// using jobCron. SchedulerRunGetter and ReplayScheduler declare the same
	// method.
	GetJobRuns(ctx context.Context, t tenant.Tenant, criteria *scheduler.JobRunsCriteria, jobCron *cron.ScheduleSpec) ([]*scheduler.JobRunStatus, error)
	// DeployJobs deploys the given jobs to tenant t.
	DeployJobs(ctx context.Context, t tenant.Tenant, jobs []*scheduler.JobWithDetails) error
	// ListJobs returns the jobs of tenant t as strings.
	// NOTE(review): presumably job names — confirm.
	ListJobs(ctx context.Context, t tenant.Tenant) ([]string, error)
	// DeleteJobs deletes the jobs listed in jobsToDelete from tenant t.
	DeleteJobs(ctx context.Context, t tenant.Tenant, jobsToDelete []string) error
	// UpdateJobState sets state for the given jobs in tnnt. Note that the
	// parameter is named jobName but is a slice of job.Name.
	// NOTE(review): the accepted values of state are not visible here —
	// confirm.
	UpdateJobState(ctx context.Context, tnnt tenant.Tenant, jobName []job.Name, state string) error
}

type SchedulerRunGetter added in v0.9.0

// SchedulerRunGetter is the run-lookup dependency accepted by
// NewReplayService (as runGetter). Its single method is also declared, with
// the same signature, by Scheduler and ReplayScheduler.
type SchedulerRunGetter interface {
	// GetJobRuns returns the run statuses in tenant t that match criteria,
	// using jobCron.
	GetJobRuns(ctx context.Context, t tenant.Tenant, criteria *scheduler.JobRunsCriteria, jobCron *cron.ScheduleSpec) ([]*scheduler.JobRunStatus, error)
}

type TemplateCompiler

// TemplateCompiler is the compiler dependency accepted by NewEventsService
// and NewJobInputCompiler. Its single method has the same parameter and
// result types as FilesCompiler.Compile.
type TemplateCompiler interface {
	// Compile takes templateMap and a context map and returns a
	// map[string]string or an error.
	// NOTE(review): presumably each value of templateMap is rendered against
	// context and returned under the same key — confirm.
	Compile(templateMap map[string]string, context map[string]any) (map[string]string, error)
}

type TenantGetter added in v0.13.0

// TenantGetter is the tenant lookup accepted by NewReplayService. Its single
// method is also part of TenantService, with the same signature.
type TenantGetter interface {
	// GetDetails returns the *tenant.WithDetails for tnnt, or an error.
	GetDetails(ctx context.Context, tnnt tenant.Tenant) (*tenant.WithDetails, error)
}

type TenantService

// TenantService is the tenant dependency accepted by NewEventsService and
// NewJobInputCompiler. It declares GetDetails with the same signature as
// TenantGetter, plus access to secrets.
type TenantService interface {
	// GetDetails returns the *tenant.WithDetails for tnnt, or an error.
	GetDetails(ctx context.Context, tnnt tenant.Tenant) (*tenant.WithDetails, error)
	// GetSecrets returns the secrets of tnnt as *tenant.PlainTextSecret
	// values, i.e. in plain text per the type name; callers should treat the
	// result as sensitive.
	GetSecrets(ctx context.Context, tnnt tenant.Tenant) ([]*tenant.PlainTextSecret, error)
}

type Validator added in v0.7.0

type Validator struct {
	// contains filtered or unexported fields
}

func NewValidator added in v0.7.0

func NewValidator(replayRepository ReplayRepository, scheduler ReplayScheduler, jobRepo JobRepository) *Validator

func (Validator) Validate added in v0.7.0

func (v Validator) Validate(ctx context.Context, replayRequest *scheduler.Replay, jobCron *cron.ScheduleSpec) error

type Webhook added in v0.11.4

// Webhook is the webhook dependency accepted by NewEventsService
// (as webhookNotifier).
type Webhook interface {
	// io.Closer is embedded, so every Webhook also has Close() error.
	io.Closer
	// Trigger fires the webhook described by attr. It takes no context and
	// returns nothing, so failures are not reported to the caller through
	// this method.
	Trigger(attr scheduler.WebhookAttrs)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL