composer_task

package
v0.0.0-...-9f7285a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 31, 2025 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

View Source
// ComposerDagProcessorManagerLogQueryTaskName is ComposerQueryPrefix followed by
// "dag-processor-manager"; it is the name given to ComposerDagProcessorManagerLogQueryTask.
const ComposerDagProcessorManagerLogQueryTaskName = ComposerQueryPrefix + "dag-processor-manager"
View Source
// ComposerMonitoringLogQueryTaskName is ComposerQueryPrefix followed by
// "monitoring"; it is the name given to ComposerMonitoringLogQueryTask.
const ComposerMonitoringLogQueryTaskName = ComposerQueryPrefix + "monitoring"
View Source
// ComposerQueryPrefix is the common prefix (gcp_task.GCPPrefix + "query/composer/")
// shared by the Composer log query task names declared in this package.
const ComposerQueryPrefix = gcp_task.GCPPrefix + "query/composer/"
View Source
// ComposerSchedulerLogQueryTaskName is ComposerQueryPrefix followed by
// "scheduler"; it is the name given to ComposerSchedulerLogQueryTask.
const ComposerSchedulerLogQueryTaskName = ComposerQueryPrefix + "scheduler"
View Source
// ComposerWorkerLogQueryTaskName is ComposerQueryPrefix followed by
// "worker"; it is the name given to ComposerWorkerLogQueryTask.
const ComposerWorkerLogQueryTaskName = ComposerQueryPrefix + "worker"
View Source
// InputComposerEnvironmentTaskID is the ID of the "Composer Environment Name"
// input form task (InputComposerEnvironmentNameTask). The Composer log query
// tasks in this package list it alongside the project ID input task.
const InputComposerEnvironmentTaskID = gcp_task.GCPPrefix + "input/composer/environment_name"

Variables

View Source
// AirflowDagProcessorLogParseJob is the parser task built from an
// AirflowDagProcessorParser, registered under gcp_task.GCPPrefix+"composer/dagprocessor"
// and labeled with this package's inspection type.
// NOTE(review): "/home/airflow/gcs/dags/" is presumably the DAG folder path the
// parser matches against, and the meaning of the `false` argument is not visible
// here — confirm against parser.NewParserTaskFromParser.
var AirflowDagProcessorLogParseJob = parser.NewParserTaskFromParser(gcp_task.GCPPrefix+"composer/dagprocessor", &AirflowDagProcessorParser{"/home/airflow/gcs/dags/"}, false, inspection_task.InspectionTypeLabel(InspectionTypeId))
View Source
// AutocompleteClusterNames is the task created with task.NewCachedProcessor under
// gcp_task.AutocompleteClusterNamesTaskID+"#composer" and labeled with this
// package's inspection type. It reads the project ID task variable and asks the
// GCP client for the cluster names of that project.
//
// The result is always a *gcp_task.AutocompleteClusterNameList: an empty project
// ID or a failing list call yields an empty ClusterNames slice with Error set,
// rather than a task error. Only client creation or reading the project ID
// variable makes the task itself fail.
var AutocompleteClusterNames = task.NewCachedProcessor(gcp_task.AutocompleteClusterNamesTaskID+"#composer", []string{gcp_task.InputProjectIdTaskID}, func(ctx context.Context, taskMode int, v *task.VariableSet) (any, error) {
	gcpClient, err := api.DefaultGCPClientFactory.NewClient()
	if err != nil {
		return nil, err
	}
	project, err := gcp_task.GetInputProjectIdFromTaskVariable(v)
	if err != nil {
		return nil, err
	}
	// Nothing can be listed until a project has been entered.
	if project == "" {
		return &gcp_task.AutocompleteClusterNameList{
			ClusterNames: []string{},
			Error:        "Project ID is empty",
		}, nil
	}
	names, listErr := gcpClient.GetClusterNames(ctx, project)
	if listErr != nil {
		// Best effort: log the failure and surface it through the Error field.
		slog.WarnContext(ctx, fmt.Sprintf("Failed to read the cluster names in the project %s\n%s", project, listErr))
		return &gcp_task.AutocompleteClusterNameList{
			ClusterNames: []string{},
			Error:        "Failed to get the list from API",
		}, nil
	}
	return &gcp_task.AutocompleteClusterNameList{
		ClusterNames: names,
		Error:        "",
	}, nil
}, inspection_task.InspectionTypeLabel(InspectionTypeId))
View Source
// AutocompleteComposerEnvironmentNames is the task created with
// task.NewCachedProcessor under AutocompleteComposerEnvironmentNamesTaskID. It
// reads the project ID and location task variables and asks the GCP client for
// the Composer environment names in that (project, location) pair.
//
// The result is a []string. It is empty when either input is empty or when the
// list call fails (the failure is only logged). Client creation or reading an
// input variable are the only paths that return an error.
var AutocompleteComposerEnvironmentNames = task.NewCachedProcessor(AutocompleteComposerEnvironmentNamesTaskID, []string{
	gcp_task.InputLocationsTaskID,
	gcp_task.InputProjectIdTaskID,
}, func(ctx context.Context, taskMode int, v *task.VariableSet) (any, error) {
	gcpClient, err := api.DefaultGCPClientFactory.NewClient()
	if err != nil {
		return nil, err
	}
	project, err := gcp_task.GetInputProjectIdFromTaskVariable(v)
	if err != nil {
		return nil, err
	}
	region, err := gcp_task.GetInputLocationsFromTaskVariable(v)
	if err != nil {
		return nil, err
	}

	// Both inputs are required before the API can be queried.
	if project == "" || region == "" {
		return []string{}, nil
	}
	environmentNames, listErr := gcpClient.GetComposerEnvironmentNames(ctx, project, region)
	if listErr != nil {
		// Best effort: log and fall back to no suggestions.
		slog.WarnContext(ctx, fmt.Sprintf("Failed to read the composer environments in the (project,location) (%s, %s) \n%s", project, region, listErr))
		return []string{}, nil
	}
	return environmentNames, nil
})
View Source
// AutocompleteComposerEnvironmentNamesTaskID is the ID under which
// AutocompleteComposerEnvironmentNames is created; InputComposerEnvironmentNameTask
// passes it to WithDependencies.
var AutocompleteComposerEnvironmentNamesTaskID = gcp_task.GCPPrefix + "autocomplete/composer-environment-names"
View Source
// ComposerClusterNamePrefixTask is the inspection producer created under
// task.ClusterNamePrefixTaskID+"#composer" and labeled with this package's
// inspection type. It ignores all of its inputs and always produces the empty
// string.
var ComposerClusterNamePrefixTask = inspection_task.NewInspectionProducer(
	task.ClusterNamePrefixTaskID+"#composer",
	func(_ context.Context, _ int, _ *progress.TaskProgress) (any, error) {
		return "", nil
	},
	inspection_task.InspectionTypeLabel(InspectionTypeId),
)
View Source
// ComposerDagProcessorManagerLogQueryTask is the query generator task named
// ComposerDagProcessorManagerLogQueryTaskName. It is created for
// enum.LogTypeComposerEnvironment with the label
// "Composer Environment/DAG Processor Manager", lists the project ID and Composer
// environment input task IDs, and uses createGenerator("dag-processor-manager").
var ComposerDagProcessorManagerLogQueryTask = query.NewQueryGeneratorTask(
	ComposerDagProcessorManagerLogQueryTaskName,
	"Composer Environment/DAG Processor Manager",
	enum.LogTypeComposerEnvironment,
	[]string{gcp_task.InputProjectIdTaskID, InputComposerEnvironmentTaskID},
	createGenerator("dag-processor-manager"),
)
View Source
// ComposerInspectionType is the inspection.InspectionType value for Cloud
// Composer: it carries the InspectionTypeId identifier, the display name,
// description and icon path, and a priority of math.MaxInt - 1.
var ComposerInspectionType = inspection.InspectionType{
	Priority: math.MaxInt - 1,
	Icon:     "assets/icons/composer.webp",
	Id:       InspectionTypeId,
	Name:     "Cloud Composer",
	Description: `Visualize logs related to Cloud Composer environment.
Supports all GKE related logs(Cloud Composer v2 or v1) and Airflow logs(Airflow 2.0.0 or higher in any Cloud Composer version(v1-v3))`,
}
View Source
// ComposerMonitoringLogQueryTask is the query generator task named
// ComposerMonitoringLogQueryTaskName. It is created for
// enum.LogTypeComposerEnvironment with the label
// "Composer Environment/Airflow Monitoring", lists the project ID and Composer
// environment input task IDs, and uses createGenerator("airflow-monitoring").
var ComposerMonitoringLogQueryTask = query.NewQueryGeneratorTask(
	ComposerMonitoringLogQueryTaskName,
	"Composer Environment/Airflow Monitoring",
	enum.LogTypeComposerEnvironment,
	[]string{gcp_task.InputProjectIdTaskID, InputComposerEnvironmentTaskID},
	createGenerator("airflow-monitoring"),
)
View Source
// ComposerSchedulerLogQueryTask is the query generator task named
// ComposerSchedulerLogQueryTaskName. It is created for
// enum.LogTypeComposerEnvironment with the label
// "Composer Environment/Airflow Scheduler", lists the project ID and Composer
// environment input task IDs, and uses createGenerator("airflow-scheduler").
var ComposerSchedulerLogQueryTask = query.NewQueryGeneratorTask(
	ComposerSchedulerLogQueryTaskName,
	"Composer Environment/Airflow Scheduler",
	enum.LogTypeComposerEnvironment,
	[]string{gcp_task.InputProjectIdTaskID, InputComposerEnvironmentTaskID},
	createGenerator("airflow-scheduler"),
)
View Source
// ComposerWorkerLogQueryTask is the query generator task named
// ComposerWorkerLogQueryTaskName. It is created for
// enum.LogTypeComposerEnvironment with the label
// "Composer Environment/Airflow Worker", lists the project ID and Composer
// environment input task IDs, and uses createGenerator("airflow-worker").
var ComposerWorkerLogQueryTask = query.NewQueryGeneratorTask(
	ComposerWorkerLogQueryTaskName,
	"Composer Environment/Airflow Worker",
	enum.LogTypeComposerEnvironment,
	[]string{gcp_task.InputProjectIdTaskID, InputComposerEnvironmentTaskID},
	createGenerator("airflow-worker"),
)
View Source
// InputComposerEnvironmentNameTask is the "Composer Environment Name" input form
// task, registered under InputComposerEnvironmentTaskID with priority
// gcp_task.PriorityForResourceIdentifierGroup+5000. It depends on
// AutocompleteComposerEnvironmentNamesTaskID; its suggestions are the
// environment names read from that task's variable, ordered by
// common.SortForAutocomplete against the current input value.
var InputComposerEnvironmentNameTask = form.NewInputFormDefinitionBuilder(
	InputComposerEnvironmentTaskID,
	gcp_task.PriorityForResourceIdentifierGroup+5000,
	"Composer Environment Name",
).WithDependencies([]string{AutocompleteComposerEnvironmentNamesTaskID}).
	WithSuggestionsFunc(func(_ context.Context, typed string, vars *task.VariableSet, _ []string) ([]string, error) {
		candidates, err := GetAutocompleteComposerEnvironmentNamesTaskVariable(vars)
		if err != nil {
			return nil, err
		}
		return common.SortForAutocomplete(typed, candidates), nil
	}).
	Build()
View Source
// InspectionTypeId is the identifier of the Cloud Composer inspection type. It is
// the Id of ComposerInspectionType and the value passed to
// inspection_task.InspectionTypeLabel by the tasks in this package.
var InspectionTypeId = "gcp-composer"

Functions

func GetAutocompleteComposerEnvironmentNamesTaskVariable

func GetAutocompleteComposerEnvironmentNamesTaskVariable(v *task.VariableSet) ([]string, error)

func GetInputComposerEnvironmentVariable

func GetInputComposerEnvironmentVariable(tv *task.VariableSet) (string, error)

Types

type AirflowDagProcessorParser

// AirflowDagProcessorParser is the parser wrapped by AirflowDagProcessorLogParseJob,
// where it is constructed with the single string value "/home/airflow/gcs/dags/".
// NOTE(review): presumably that value is the DAG file path prefix — confirm
// against the unexported field.
type AirflowDagProcessorParser struct {
	// contains filtered or unexported fields
}

func (*AirflowDagProcessorParser) Dependencies

func (*AirflowDagProcessorParser) Dependencies() []string

func (*AirflowDagProcessorParser) Description

func (*AirflowDagProcessorParser) Description() string

func (*AirflowDagProcessorParser) GetParserName

func (*AirflowDagProcessorParser) GetParserName() string

func (*AirflowDagProcessorParser) Grouper

Grouper implements parser.Parser.

func (*AirflowDagProcessorParser) LogTask

func (*AirflowDagProcessorParser) LogTask() string

func (*AirflowDagProcessorParser) Parse

type AirflowSchedulerParser

// AirflowSchedulerParser parses airflow-scheduler logs into TaskInstances.
// It has no fields.
type AirflowSchedulerParser struct {
}

Parses airflow-scheduler logs and converts them into TaskInstances. This parser detects the following lifecycle states: scheduled, queued, success, and failed.

func (*AirflowSchedulerParser) Dependencies

func (*AirflowSchedulerParser) Dependencies() []string

func (*AirflowSchedulerParser) Description

func (*AirflowSchedulerParser) Description() string

func (*AirflowSchedulerParser) GetParserName

func (*AirflowSchedulerParser) GetParserName() string

func (*AirflowSchedulerParser) Grouper

func (*AirflowSchedulerParser) LogTask

func (*AirflowSchedulerParser) LogTask() string

func (*AirflowSchedulerParser) Parse

type AirflowWorkerParser

// AirflowWorkerParser is a parser.Parser implementation; it has no fields.
type AirflowWorkerParser struct {
}

func (*AirflowWorkerParser) Dependencies

func (*AirflowWorkerParser) Dependencies() []string

Dependencies implements parser.Parser.

func (*AirflowWorkerParser) Description

func (*AirflowWorkerParser) Description() string

Description implements parser.Parser.

func (*AirflowWorkerParser) GetParserName

func (*AirflowWorkerParser) GetParserName() string

GetParserName implements parser.Parser.

func (*AirflowWorkerParser) Grouper

Grouper implements parser.Parser.

func (*AirflowWorkerParser) LogTask

func (*AirflowWorkerParser) LogTask() string

LogTask implements parser.Parser.

func (*AirflowWorkerParser) Parse

Parse implements parser.Parser.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL