service

package
v0.11.3 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 5, 2024 License: Apache-2.0 Imports: 26 Imported by: 0

Documentation

Index

Constants

View Source
// Package-level constants shared by the service package.
const (
	// SecretsStringToMatch is the literal substring ".secret.".
	// NOTE(review): presumably used to detect references to secrets inside
	// templates or config values — confirm against its usage.
	SecretsStringToMatch = ".secret."

	// TimeISOFormat is the time layout named "ISO" by this package; it is
	// exactly time.RFC3339.
	TimeISOFormat = time.RFC3339

	// JobAttributionLabelsKey is the key string "JOB_LABELS".
	// NOTE(review): looks like an environment-variable or config key that
	// carries job labels — confirm against its usage.
	JobAttributionLabelsKey = "JOB_LABELS"
)
View Source
// Notification scheme identifiers.
// NOTE(review): presumably these are the keys of the map[string]Notifier
// passed to NewNotifyService — confirm against the caller.
const (
	// NotificationSchemeSlack is the scheme string "slack".
	NotificationSchemeSlack     = "slack"
	// NotificationSchemePagerDuty is the scheme string "pagerduty".
	NotificationSchemePagerDuty = "pagerduty"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type AssetCompiler

// AssetCompiler abstracts compilation of the assets for a job run.
// NewJobInputCompiler accepts a value of this type, and
// (*JobRunAssetsCompiler).CompileJobRunAssets in this package has the same
// parameter and result types.
type AssetCompiler interface {
	// CompileJobRunAssets takes the job, a map of system environment
	// variables, an interval and a context map for the task. It returns a
	// string-to-string map or an error.
	// NOTE(review): the returned map is presumably asset name to compiled
	// content — confirm against the implementation.
	CompileJobRunAssets(ctx context.Context, job *scheduler.Job, systemEnvVars map[string]string, interval interval.Interval, contextForTask map[string]interface{}) (map[string]string, error)
}

type EventHandler added in v0.7.0

// EventHandler receives moderator events. NewJobRunService accepts a value
// of this type.
type EventHandler interface {
	// HandleEvent is given a single moderator.Event. It has no return value,
	// so a failure cannot be reported to the caller through this interface.
	HandleEvent(moderator.Event)
}

type FilesCompiler

// FilesCompiler compiles a set of files against a context map.
// NewJobAssetsCompiler accepts a value of this type as its engine.
// Its single method has the same parameter and result types as
// TemplateCompiler.Compile.
type FilesCompiler interface {
	// Compile takes a string-to-string file map and a context map and returns
	// a string-to-string map or an error.
	// NOTE(review): fileMap is presumably file name to file content, with the
	// result keyed the same way — confirm against the implementation.
	Compile(fileMap map[string]string, context map[string]any) (map[string]string, error)
}

type InputCompiler

type InputCompiler struct {
	// contains filtered or unexported fields
}

func NewJobInputCompiler

func NewJobInputCompiler(tenantService TenantService, compiler TemplateCompiler, assetCompiler AssetCompiler, logger log.Logger) *InputCompiler

func (InputCompiler) Compile

type JobInputCompiler

// JobInputCompiler produces the executor input for a job. NewJobRunService
// accepts a value of this type.
type JobInputCompiler interface {
	// Compile takes a job with its details, a run configuration and an
	// executedAt time, and returns a *scheduler.ExecutorInput or an error.
	Compile(ctx context.Context, job *scheduler.JobWithDetails, config scheduler.RunConfig, executedAt time.Time) (*scheduler.ExecutorInput, error)
}

type JobReplayRepository added in v0.7.0

// JobReplayRepository provides read access to replay-specific job
// configuration. NewJobRunService accepts a value of this type.
type JobReplayRepository interface {
	// GetReplayJobConfig looks up configuration by tenant, job name and
	// scheduled time, and returns it as a string-to-string map or an error.
	// NOTE(review): behavior when no replay exists for the given arguments is
	// not visible here — confirm against the implementation.
	GetReplayJobConfig(ctx context.Context, jobTenant tenant.Tenant, jobName scheduler.JobName, scheduledAt time.Time) (map[string]string, error)
}

type JobRepository

// JobRepository provides read access to jobs within a project. It is
// accepted by NewJobRunService, NewNotifyService, NewReplayService,
// NewReplayWorker and NewValidator.
type JobRepository interface {
	// GetJob returns the job identified by project name and job name.
	GetJob(ctx context.Context, name tenant.ProjectName, jobName scheduler.JobName) (*scheduler.Job, error)
	// GetJobDetails takes the same identifiers as GetJob but returns a
	// *scheduler.JobWithDetails.
	GetJobDetails(ctx context.Context, projectName tenant.ProjectName, jobName scheduler.JobName) (*scheduler.JobWithDetails, error)
	// GetAll returns jobs with details for the given project.
	GetAll(ctx context.Context, projectName tenant.ProjectName) ([]*scheduler.JobWithDetails, error)
	// GetJobs returns jobs with details for the given project, selected by the
	// jobs string slice.
	// NOTE(review): jobs presumably holds job names — confirm against callers.
	GetJobs(ctx context.Context, projectName tenant.ProjectName, jobs []string) ([]*scheduler.JobWithDetails, error)
}

type JobRunAssetsCompiler

type JobRunAssetsCompiler struct {
	// contains filtered or unexported fields
}

func NewJobAssetsCompiler

func NewJobAssetsCompiler(engine FilesCompiler, logger log.Logger) *JobRunAssetsCompiler

func (*JobRunAssetsCompiler) CompileJobRunAssets

func (c *JobRunAssetsCompiler) CompileJobRunAssets(_ context.Context, job *scheduler.Job, systemEnvVars map[string]string, interval interval.Interval, contextForTask map[string]interface{}) (map[string]string, error)

func (*JobRunAssetsCompiler) CompileQuery added in v0.10.1

func (c *JobRunAssetsCompiler) CompileQuery(startTime, endTime time.Time, query string, envs map[string]string) (string, error)

type JobRunRepository

// JobRunRepository is the persistence interface for job runs.
// NewJobRunService accepts a value of this type.
//
// Note that GetByID identifies a run with scheduler.JobRunID, while the
// Update* methods that take a run ID use uuid.UUID.
type JobRunRepository interface {
	// GetByID returns the job run with the given ID.
	GetByID(ctx context.Context, id scheduler.JobRunID) (*scheduler.JobRun, error)
	// GetByScheduledAt returns the job run identified by tenant, job name and
	// scheduled time.
	GetByScheduledAt(ctx context.Context, tenant tenant.Tenant, name scheduler.JobName, scheduledAt time.Time) (*scheduler.JobRun, error)
	// GetByScheduledTimes is the multi-time counterpart of GetByScheduledAt:
	// it takes a slice of scheduled times and returns a slice of job runs.
	GetByScheduledTimes(ctx context.Context, tenant tenant.Tenant, jobName scheduler.JobName, scheduledTimes []time.Time) ([]*scheduler.JobRun, error)
	// Create records a job run for the tenant, job name and scheduled time.
	// slaDefinitionInSec is, per its name, an SLA definition in seconds.
	// Only an error is returned; the new run's ID is not.
	Create(ctx context.Context, tenant tenant.Tenant, name scheduler.JobName, scheduledAt time.Time, slaDefinitionInSec int64) error
	// Update takes a job run ID together with an end time and a state.
	Update(ctx context.Context, jobRunID uuid.UUID, endTime time.Time, jobRunStatus scheduler.State) error
	// UpdateState takes a job run ID and a state, without an end time.
	UpdateState(ctx context.Context, jobRunID uuid.UUID, jobRunStatus scheduler.State) error
	// UpdateSLA addresses runs by job name, project name and scheduled times
	// rather than by job run ID.
	// NOTE(review): what exactly is written for the matched runs is not
	// visible here — confirm against the implementation.
	UpdateSLA(ctx context.Context, jobName scheduler.JobName, project tenant.ProjectName, scheduledTimes []time.Time) error
	// UpdateMonitoring takes a job run ID and a monitoring map.
	UpdateMonitoring(ctx context.Context, jobRunID uuid.UUID, monitoring map[string]any) error
}

type JobRunService

type JobRunService struct {
	// contains filtered or unexported fields
}

func NewJobRunService

func NewJobRunService(logger log.Logger, jobRepo JobRepository, jobRunRepo JobRunRepository, replayRepo JobReplayRepository,
	operatorRunRepo OperatorRunRepository, scheduler Scheduler, resolver PriorityResolver, compiler JobInputCompiler, eventHandler EventHandler,
	projectGetter ProjectGetter,
) *JobRunService

func (*JobRunService) GetInterval added in v0.10.0

func (s *JobRunService) GetInterval(ctx context.Context, projectName tenant.ProjectName, jobName scheduler.JobName, referenceTime time.Time) (interval.Interval, error)

func (*JobRunService) GetJobRuns

func (s *JobRunService) GetJobRuns(ctx context.Context, projectName tenant.ProjectName, jobName scheduler.JobName, criteria *scheduler.JobRunsCriteria) ([]*scheduler.JobRunStatus, error)

func (*JobRunService) JobRunInput

func (s *JobRunService) JobRunInput(ctx context.Context, projectName tenant.ProjectName, jobName scheduler.JobName, config scheduler.RunConfig) (*scheduler.ExecutorInput, error)

func (*JobRunService) UpdateJobScheduleState added in v0.9.6

func (s *JobRunService) UpdateJobScheduleState(ctx context.Context, tnnt tenant.Tenant, jobName []job.Name, state string) error

func (*JobRunService) UpdateJobState

func (s *JobRunService) UpdateJobState(ctx context.Context, event *scheduler.Event) error

func (*JobRunService) UploadJobs added in v0.8.0

func (s *JobRunService) UploadJobs(ctx context.Context, tnnt tenant.Tenant, toUpdate, toDelete []string) (err error)

func (*JobRunService) UploadToScheduler

func (s *JobRunService) UploadToScheduler(ctx context.Context, projectName tenant.ProjectName) error

type Notifier

// Notifier sends notifications and can be closed. NewNotifyService accepts a
// map[string]Notifier.
// NOTE(review): the map keys are presumably notification scheme names such as
// NotificationSchemeSlack — confirm against the caller.
type Notifier interface {
	// Close is inherited from the embedded io.Closer.
	io.Closer
	// Notify takes the notification attributes and returns an error on
	// failure.
	Notify(ctx context.Context, attr scheduler.NotifyAttrs) error
}

type NotifyService

type NotifyService struct {
	// contains filtered or unexported fields
}

func NewNotifyService

func NewNotifyService(l log.Logger, jobRepo JobRepository, tenantService TenantService, notifyChan map[string]Notifier) *NotifyService

func (*NotifyService) Close

func (n *NotifyService) Close() error

func (*NotifyService) Push

func (n *NotifyService) Push(ctx context.Context, event *scheduler.Event) error

type OperatorRunRepository

// OperatorRunRepository is the persistence interface for operator runs that
// belong to a job run. NewJobRunService accepts a value of this type.
type OperatorRunRepository interface {
	// GetOperatorRun returns the operator run identified by operator name,
	// operator type and job run ID.
	GetOperatorRun(ctx context.Context, operatorName string, operator scheduler.OperatorType, jobRunID uuid.UUID) (*scheduler.OperatorRun, error)
	// CreateOperatorRun takes the same identifiers as GetOperatorRun plus a
	// start time.
	CreateOperatorRun(ctx context.Context, operatorName string, operator scheduler.OperatorType, jobRunID uuid.UUID, startTime time.Time) error
	// UpdateOperatorRun takes an operator type, job run ID, event time and
	// state. Unlike the other two methods it takes no operatorName.
	UpdateOperatorRun(ctx context.Context, operator scheduler.OperatorType, jobRunID uuid.UUID, eventTime time.Time, state scheduler.State) error
}

type PluginRepo

// PluginRepo looks up plugins by name.
type PluginRepo interface {
	// GetByName returns the *plugin.Plugin for the given name, or an error.
	// It takes no context.Context.
	GetByName(name string) (*plugin.Plugin, error)
}

type PriorityResolver

// PriorityResolver resolves priorities for a set of jobs. NewJobRunService
// accepts a value of this type.
type PriorityResolver interface {
	// Resolve takes a slice of job pointers and returns only an error.
	// NOTE(review): since nothing else is returned, results are presumably
	// written onto the passed *scheduler.JobWithDetails values — confirm
	// against the implementation.
	Resolve(context.Context, []*scheduler.JobWithDetails) error
}

type ProjectGetter added in v0.10.0

// ProjectGetter fetches a project by name. NewJobRunService accepts a value
// of this type.
type ProjectGetter interface {
	// Get returns the *tenant.Project for the given project name, or an error.
	Get(context.Context, tenant.ProjectName) (*tenant.Project, error)
}

type ReplayExecutor added in v0.11.0

// ReplayExecutor executes a registered replay. NewReplayService accepts a
// value of this type, and (*ReplayWorker).Execute in this package has the
// same signature.
type ReplayExecutor interface {
	// Execute is given the replay ID, the job's tenant and the job name. It
	// takes no context.Context and returns nothing, so cancellation and
	// failure cannot be communicated through this interface.
	Execute(replayID uuid.UUID, jobTenant tenant.Tenant, jobName scheduler.JobName)
}

type ReplayRepository added in v0.7.0

// ReplayRepository is the persistence interface for replays. It is accepted
// by NewReplayService, NewReplayWorker and NewValidator.
type ReplayRepository interface {
	// RegisterReplay takes a replay and its runs, and returns a uuid.UUID.
	// NOTE(review): the returned value is presumably the new replay's ID —
	// confirm against the implementation.
	RegisterReplay(ctx context.Context, replay *scheduler.Replay, runs []*scheduler.JobRunStatus) (uuid.UUID, error)
	// UpdateReplay takes a replay ID, a state, a list of runs and a message.
	UpdateReplay(ctx context.Context, replayID uuid.UUID, state scheduler.ReplayState, runs []*scheduler.JobRunStatus, message string) error
	// UpdateReplayStatus takes a replay ID, a state and a message, without
	// runs.
	UpdateReplayStatus(ctx context.Context, replayID uuid.UUID, state scheduler.ReplayState, message string) error

	// GetReplayRequestsByStatus returns replays selected by a list of states.
	GetReplayRequestsByStatus(ctx context.Context, statusList []scheduler.ReplayState) ([]*scheduler.Replay, error)
	// GetReplaysByProject returns replays for a project.
	// NOTE(review): dayLimits presumably bounds how many days back to look —
	// confirm against the implementation.
	GetReplaysByProject(ctx context.Context, projectName tenant.ProjectName, dayLimits int) ([]*scheduler.Replay, error)
	// GetReplayByID returns the replay with the given ID as a
	// *scheduler.ReplayWithRun.
	GetReplayByID(ctx context.Context, replayID uuid.UUID) (*scheduler.ReplayWithRun, error)
}

type ReplayScheduler added in v0.7.0

// ReplayScheduler is the set of scheduler operations used for replays. It is
// accepted by NewReplayWorker and NewValidator.
type ReplayScheduler interface {
	// Clear addresses a single run by tenant, job name and scheduled time.
	// NOTE(review): what "clear" does on the scheduler side is not visible
	// here — confirm against the implementation.
	Clear(ctx context.Context, t tenant.Tenant, jobName scheduler.JobName, scheduledAt time.Time) error
	// ClearBatch is the range form of Clear: it takes a start time and an end
	// time instead of a single scheduled time.
	// NOTE(review): whether the bounds are inclusive is not visible here.
	ClearBatch(ctx context.Context, t tenant.Tenant, jobName scheduler.JobName, startTime, endTime time.Time) error

	// CreateRun takes a tenant, job name, execution time and a prefix string
	// named dagRunIDPrefix.
	CreateRun(ctx context.Context, tnnt tenant.Tenant, jobName scheduler.JobName, executionTime time.Time, dagRunIDPrefix string) error
	// GetJobRuns returns run statuses selected by the criteria and cron
	// schedule spec. It has the same signature as Scheduler.GetJobRuns and
	// SchedulerRunGetter.GetJobRuns.
	GetJobRuns(ctx context.Context, t tenant.Tenant, criteria *scheduler.JobRunsCriteria, jobCron *cron.ScheduleSpec) ([]*scheduler.JobRunStatus, error)
}

type ReplayService added in v0.7.0

type ReplayService struct {
	// contains filtered or unexported fields
}

func NewReplayService added in v0.7.0

func NewReplayService(replayRepo ReplayRepository, jobRepo JobRepository, validator ReplayValidator, worker ReplayExecutor, runGetter SchedulerRunGetter, logger log.Logger) *ReplayService

func (*ReplayService) CancelReplay added in v0.11.3

func (r *ReplayService) CancelReplay(ctx context.Context, replayWithRun *scheduler.ReplayWithRun) error

func (*ReplayService) CreateReplay added in v0.7.0

func (r *ReplayService) CreateReplay(ctx context.Context, tenant tenant.Tenant, jobName scheduler.JobName, config *scheduler.ReplayConfig) (replayID uuid.UUID, err error)

func (*ReplayService) GetReplayByID added in v0.7.0

func (r *ReplayService) GetReplayByID(ctx context.Context, replayID uuid.UUID) (*scheduler.ReplayWithRun, error)

func (*ReplayService) GetReplayList added in v0.7.0

func (r *ReplayService) GetReplayList(ctx context.Context, projectName tenant.ProjectName) (replays []*scheduler.Replay, err error)

func (*ReplayService) GetRunsStatus added in v0.9.0

func (r *ReplayService) GetRunsStatus(ctx context.Context, tenant tenant.Tenant, jobName scheduler.JobName, config *scheduler.ReplayConfig) ([]*scheduler.JobRunStatus, error)

type ReplayValidator added in v0.7.0

// ReplayValidator validates a replay request. NewReplayService accepts a
// value of this type, and Validator.Validate in this package has the same
// signature.
type ReplayValidator interface {
	// Validate takes the replay request and the job's cron schedule spec. A
	// non-nil error is the only way it can signal that validation did not
	// pass.
	Validate(ctx context.Context, replayRequest *scheduler.Replay, jobCron *cron.ScheduleSpec) error
}

type ReplayWorker added in v0.7.0

type ReplayWorker struct {
	// contains filtered or unexported fields
}

func NewReplayWorker added in v0.7.0

func NewReplayWorker(logger log.Logger, replayRepository ReplayRepository, jobRepo JobRepository, scheduler ReplayScheduler, cfg config.ReplayConfig) *ReplayWorker

func (*ReplayWorker) Execute added in v0.11.0

func (w *ReplayWorker) Execute(replayID uuid.UUID, jobTenant tenant.Tenant, jobName scheduler.JobName)

type Scheduler

// Scheduler is the set of scheduler operations used by JobRunService.
// NewJobRunService accepts a value of this type.
type Scheduler interface {
	// GetJobRuns returns run statuses selected by the criteria and cron
	// schedule spec.
	GetJobRuns(ctx context.Context, t tenant.Tenant, criteria *scheduler.JobRunsCriteria, jobCron *cron.ScheduleSpec) ([]*scheduler.JobRunStatus, error)
	// DeployJobs takes a tenant and the jobs (with details) to deploy.
	DeployJobs(ctx context.Context, t tenant.Tenant, jobs []*scheduler.JobWithDetails) error
	// ListJobs returns a string slice for the tenant.
	// NOTE(review): the strings are presumably job names — confirm.
	ListJobs(ctx context.Context, t tenant.Tenant) ([]string, error)
	// DeleteJobs takes a tenant and the jobs to delete as strings.
	DeleteJobs(ctx context.Context, t tenant.Tenant, jobsToDelete []string) error
	// UpdateJobState takes a tenant, a slice of job names and a state string.
	// Note that the jobName parameter is a slice despite its singular name.
	// NOTE(review): the accepted values of state are not visible here.
	UpdateJobState(ctx context.Context, tnnt tenant.Tenant, jobName []job.Name, state string) error
}

type SchedulerRunGetter added in v0.9.0

// SchedulerRunGetter is the read-only, single-method interface for fetching
// job runs. NewReplayService accepts a value of this type. Its method has the
// same signature as GetJobRuns on Scheduler and ReplayScheduler.
type SchedulerRunGetter interface {
	// GetJobRuns returns run statuses selected by the criteria and cron
	// schedule spec.
	GetJobRuns(ctx context.Context, t tenant.Tenant, criteria *scheduler.JobRunsCriteria, jobCron *cron.ScheduleSpec) ([]*scheduler.JobRunStatus, error)
}

type TemplateCompiler

// TemplateCompiler compiles a set of templates against a context map.
// NewJobInputCompiler accepts a value of this type. Its single method has the
// same parameter and result types as FilesCompiler.Compile.
type TemplateCompiler interface {
	// Compile takes a string-to-string template map and a context map and
	// returns a string-to-string map or an error.
	// NOTE(review): the result is presumably keyed like templateMap, with
	// rendered values — confirm against the implementation.
	Compile(templateMap map[string]string, context map[string]any) (map[string]string, error)
}

type TenantService

// TenantService provides tenant details and secrets. It is accepted by
// NewJobInputCompiler and NewNotifyService.
type TenantService interface {
	// GetDetails returns the *tenant.WithDetails for the tenant, or an error.
	GetDetails(ctx context.Context, tnnt tenant.Tenant) (*tenant.WithDetails, error)
	// GetSecrets returns the tenant's secrets as plain-text values, so the
	// result needs careful handling (for example, keep it out of logs).
	GetSecrets(ctx context.Context, tnnt tenant.Tenant) ([]*tenant.PlainTextSecret, error)
}

type Validator added in v0.7.0

type Validator struct {
	// contains filtered or unexported fields
}

func NewValidator added in v0.7.0

func NewValidator(replayRepository ReplayRepository, scheduler ReplayScheduler, jobRepo JobRepository) *Validator

func (Validator) Validate added in v0.7.0

func (v Validator) Validate(ctx context.Context, replayRequest *scheduler.Replay, jobCron *cron.ScheduleSpec) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL