airflow

package
v0.11.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 6, 2023 License: Apache-2.0 Imports: 25 Imported by: 0

README

Airflow v2.x

Currently, DAGs can be configured to be loaded from:

  • GCS bucket
  • Local filesystem
  • In-memory

The implementation already supports a variety of other storage systems; they just need to be configured.

To use a filesystem that requires authentication, create a project secret with STORAGE as the key and the base64-encoded service account/token as the value.

Optimus also provides an API to get the status of currently running jobs using the Airflow APIs. For this to work, register a secret with SCHEDULER_AUTH as the key and the base64-encoded username:password as the value. This assumes Airflow is configured to use basic auth on its API by default.

Documentation

Index

Constants

View Source
const (
	EntityAirflow = "Airflow"
)

Variables

View Source
var SharedLib []byte

Functions

This section is empty.

Types

type Bucket

type Bucket interface {
	WriteAll(ctx context.Context, key string, p []byte, opts *blob.WriterOptions) error
	List(opts *blob.ListOptions) *blob.ListIterator
	Delete(ctx context.Context, key string) error
	Close() error
}

type BucketFactory

type BucketFactory interface {
	New(ctx context.Context, tenant tenant.Tenant) (Bucket, error)
}

type Client

type Client interface {
	Invoke(ctx context.Context, r airflowRequest, auth SchedulerAuth) ([]byte, error)
}

type ClientAirflow

type ClientAirflow struct {
	// contains filtered or unexported fields
}

func NewAirflowClient

func NewAirflowClient() *ClientAirflow

func (ClientAirflow) Invoke

func (ac ClientAirflow) Invoke(ctx context.Context, r airflowRequest, auth SchedulerAuth) ([]byte, error)

type DagCompiler

type DagCompiler interface {
	Compile(project *tenant.Project, job *scheduler.JobWithDetails) ([]byte, error)
}

type DagRun

type DagRun struct {
	ExecutionDate   time.Time `json:"execution_date"`
	State           string    `json:"state"`
	ExternalTrigger bool      `json:"external_trigger"`
}

type DagRunListResponse

type DagRunListResponse struct {
	DagRuns      []DagRun `json:"dag_runs"`
	TotalEntries int      `json:"total_entries"`
}

type DagRunRequest

type DagRunRequest struct {
	OrderBy          string   `json:"order_by"`
	PageOffset       int      `json:"page_offset"`
	PageLimit        int      `json:"page_limit"`
	DagIds           []string `json:"dag_ids"`
	ExecutionDateGte string   `json:"execution_date_gte,omitempty"`
	ExecutionDateLte string   `json:"execution_date_lte,omitempty"`
}

type ProjectGetter

type ProjectGetter interface {
	Get(context.Context, tenant.ProjectName) (*tenant.Project, error)
}

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

func NewScheduler

func NewScheduler(l log.Logger, bucketFac BucketFactory, client Client, compiler DagCompiler, projectGetter ProjectGetter, secretGetter SecretGetter) *Scheduler

func (*Scheduler) Clear added in v0.7.0

func (s *Scheduler) Clear(ctx context.Context, t tenant.Tenant, jobName scheduler.JobName, executionTime time.Time) error

func (*Scheduler) ClearBatch added in v0.7.0

func (s *Scheduler) ClearBatch(ctx context.Context, tnnt tenant.Tenant, jobName scheduler.JobName, startExecutionTime, endExecutionTime time.Time) error

func (*Scheduler) CreateRun added in v0.8.0

func (s *Scheduler) CreateRun(ctx context.Context, tnnt tenant.Tenant, jobName scheduler.JobName, executionTime time.Time, dagRunIDPrefix string) error

func (*Scheduler) DeleteJobs

func (s *Scheduler) DeleteJobs(ctx context.Context, t tenant.Tenant, jobNames []string) error

func (*Scheduler) DeployJobs

func (s *Scheduler) DeployJobs(ctx context.Context, tenant tenant.Tenant, jobs []*scheduler.JobWithDetails) error

func (*Scheduler) GetJobRuns

func (s *Scheduler) GetJobRuns(ctx context.Context, tnnt tenant.Tenant, jobQuery *scheduler.JobRunsCriteria, jobCron *cron.ScheduleSpec) ([]*scheduler.JobRunStatus, error)

func (*Scheduler) ListJobs

func (s *Scheduler) ListJobs(ctx context.Context, t tenant.Tenant) ([]string, error)

TODO: ListJobs should not read from the scheduler; it should list jobs from the database instead, since listing jobs has nothing to do with the scheduler.

func (*Scheduler) UpdateJobState added in v0.9.0

func (s *Scheduler) UpdateJobState(ctx context.Context, tnnt tenant.Tenant, jobNames []job.Name, state string) error

UpdateJobState sets the state of jobs to enabled or disabled on the scheduler.

type SchedulerAuth

type SchedulerAuth struct {
	// contains filtered or unexported fields
}

type SecretGetter

type SecretGetter interface {
	Get(ctx context.Context, projName tenant.ProjectName, namespaceName, name string) (*tenant.PlainTextSecret, error)
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL