Documentation
¶
Index ¶
- Constants
- Variables
- func GetAutocompleteComposerEnvironmentNamesTaskVariable(v *task.VariableSet) ([]string, error)
- func GetInputComposerEnvironmentVariable(tv *task.VariableSet) (string, error)
- type AirflowDagProcessorParser
- func (*AirflowDagProcessorParser) Dependencies() []string
- func (*AirflowDagProcessorParser) Description() string
- func (*AirflowDagProcessorParser) GetParserName() string
- func (*AirflowDagProcessorParser) Grouper() grouper.LogGrouper
- func (*AirflowDagProcessorParser) LogTask() string
- func (a *AirflowDagProcessorParser) Parse(ctx context.Context, l *log.LogEntity, cs *history.ChangeSet, ...) error
- type AirflowSchedulerParser
- func (*AirflowSchedulerParser) Dependencies() []string
- func (*AirflowSchedulerParser) Description() string
- func (*AirflowSchedulerParser) GetParserName() string
- func (*AirflowSchedulerParser) Grouper() grouper.LogGrouper
- func (*AirflowSchedulerParser) LogTask() string
- func (t *AirflowSchedulerParser) Parse(ctx context.Context, l *log.LogEntity, cs *history.ChangeSet, ...) error
- type AirflowWorkerParser
- func (*AirflowWorkerParser) Dependencies() []string
- func (*AirflowWorkerParser) Description() string
- func (*AirflowWorkerParser) GetParserName() string
- func (*AirflowWorkerParser) Grouper() grouper.LogGrouper
- func (*AirflowWorkerParser) LogTask() string
- func (*AirflowWorkerParser) Parse(ctx context.Context, l *log.LogEntity, cs *history.ChangeSet, ...) error
Constants ¶
View Source
const ComposerDagProcessorManagerLogQueryTaskName = ComposerQueryPrefix + "dag-processor-manager"
View Source
const ComposerMonitoringLogQueryTaskName = ComposerQueryPrefix + "monitoring"
View Source
const ComposerQueryPrefix = gcp_task.GCPPrefix + "query/composer/"
View Source
const ComposerSchedulerLogQueryTaskName = ComposerQueryPrefix + "scheduler"
View Source
const ComposerWorkerLogQueryTaskName = ComposerQueryPrefix + "worker"
View Source
const InputComposerEnvironmentTaskID = gcp_task.GCPPrefix + "input/composer/environment_name"
Variables ¶
View Source
var AirflowDagProcessorLogParseJob = parser.NewParserTaskFromParser(gcp_task.GCPPrefix+"composer/dagprocessor", &AirflowDagProcessorParser{"/home/airflow/gcs/dags/"}, false, inspection_task.InspectionTypeLabel(InspectionTypeId))
View Source
var AirflowSchedulerLogParseJob = parser.NewParserTaskFromParser(gcp_task.GCPPrefix+"composer/scheduler", &AirflowSchedulerParser{}, false, inspection_task.InspectionTypeLabel(InspectionTypeId))
View Source
var AirflowWorkerLogParseJob = parser.NewParserTaskFromParser(gcp_task.GCPPrefix+"composer/worker", &AirflowWorkerParser{}, false, inspection_task.InspectionTypeLabel(InspectionTypeId))
View Source
var AutocompleteClusterNames = task.NewCachedProcessor(gcp_task.AutocompleteClusterNamesTaskID+"#composer", []string{ gcp_task.InputProjectIdTaskID, }, func(ctx context.Context, taskMode int, v *task.VariableSet) (any, error) { client, err := api.DefaultGCPClientFactory.NewClient() if err != nil { return nil, err } projectId, err := gcp_task.GetInputProjectIdFromTaskVariable(v) if err != nil { return nil, err } if projectId != "" { clusterNames, err := client.GetClusterNames(ctx, projectId) if err != nil { slog.WarnContext(ctx, fmt.Sprintf("Failed to read the cluster names in the project %s\n%s", projectId, err)) return &gcp_task.AutocompleteClusterNameList{ ClusterNames: []string{}, Error: "Failed to get the list from API", }, nil } return &gcp_task.AutocompleteClusterNameList{ ClusterNames: clusterNames, Error: "", }, nil } return &gcp_task.AutocompleteClusterNameList{ ClusterNames: []string{}, Error: "Project ID is empty", }, nil }, inspection_task.InspectionTypeLabel(InspectionTypeId))
View Source
var AutocompleteComposerEnvironmentNames = task.NewCachedProcessor(AutocompleteComposerEnvironmentNamesTaskID, []string{ gcp_task.InputLocationsTaskID, gcp_task.InputProjectIdTaskID, }, func(ctx context.Context, taskMode int, v *task.VariableSet) (any, error) { client, err := api.DefaultGCPClientFactory.NewClient() if err != nil { return nil, err } projectId, err := gcp_task.GetInputProjectIdFromTaskVariable(v) if err != nil { return nil, err } location, err := gcp_task.GetInputLocationsFromTaskVariable(v) if err != nil { return nil, err } if projectId != "" && location != "" { clusterNames, err := client.GetComposerEnvironmentNames(ctx, projectId, location) if err != nil { slog.WarnContext(ctx, fmt.Sprintf("Failed to read the composer environments in the (project,location) (%s, %s) \n%s", projectId, location, err)) return []string{}, nil } return clusterNames, nil } return []string{}, nil })
View Source
var AutocompleteComposerEnvironmentNamesTaskID = gcp_task.GCPPrefix + "autocomplete/composer-environment-names"
View Source
var ComposerClusterNamePrefixTask = inspection_task.NewInspectionProducer(task.ClusterNamePrefixTaskID+"#composer", func(ctx context.Context, taskMode int, progress *progress.TaskProgress) (any, error) { return "", nil }, inspection_task.InspectionTypeLabel(InspectionTypeId))
View Source
var ComposerDagProcessorManagerLogQueryTask = query.NewQueryGeneratorTask( ComposerDagProcessorManagerLogQueryTaskName, "Composer Environment/DAG Processor Manager", enum.LogTypeComposerEnvironment, []string{ gcp_task.InputProjectIdTaskID, InputComposerEnvironmentTaskID, }, createGenerator("dag-processor-manager"), )
View Source
var ComposerInspectionType = inspection.InspectionType{ Id: InspectionTypeId, Name: "Cloud Composer", Description: `Visualize logs related to Cloud Composer environment. Supports all GKE related logs(Cloud Composer v2 or v1) and Airflow logs(Airflow 2.0.0 or higher in any Cloud Composer version(v1-v3))`, Icon: "assets/icons/composer.webp", Priority: math.MaxInt - 1, }
View Source
var ComposerMonitoringLogQueryTask = query.NewQueryGeneratorTask( ComposerMonitoringLogQueryTaskName, "Composer Environment/Airflow Monitoring", enum.LogTypeComposerEnvironment, []string{ gcp_task.InputProjectIdTaskID, InputComposerEnvironmentTaskID, }, createGenerator("airflow-monitoring"), )
View Source
var ComposerSchedulerLogQueryTask = query.NewQueryGeneratorTask( ComposerSchedulerLogQueryTaskName, "Composer Environment/Airflow Scheduler", enum.LogTypeComposerEnvironment, []string{ gcp_task.InputProjectIdTaskID, InputComposerEnvironmentTaskID, }, createGenerator("airflow-scheduler"), )
View Source
var ComposerWorkerLogQueryTask = query.NewQueryGeneratorTask( ComposerWorkerLogQueryTaskName, "Composer Environment/Airflow Worker", enum.LogTypeComposerEnvironment, []string{ gcp_task.InputProjectIdTaskID, InputComposerEnvironmentTaskID, }, createGenerator("airflow-worker"), )
View Source
var InputComposerEnvironmentNameTask = form.NewInputFormDefinitionBuilder(InputComposerEnvironmentTaskID, gcp_task.PriorityForResourceIdentifierGroup+5000, "Composer Environment Name").WithDependencies( []string{AutocompleteComposerEnvironmentNamesTaskID}, ).WithSuggestionsFunc(func(ctx context.Context, value string, variables *task.VariableSet, previousValues []string) ([]string, error) { environments, err := GetAutocompleteComposerEnvironmentNamesTaskVariable(variables) if err != nil { return nil, err } return common.SortForAutocomplete(value, environments), nil }).Build()
View Source
var InspectionTypeId = "gcp-composer"
Functions ¶
func GetAutocompleteComposerEnvironmentNamesTaskVariable ¶
func GetAutocompleteComposerEnvironmentNamesTaskVariable(v *task.VariableSet) ([]string, error)
func GetInputComposerEnvironmentVariable ¶
func GetInputComposerEnvironmentVariable(tv *task.VariableSet) (string, error)
Types ¶
type AirflowDagProcessorParser ¶
type AirflowDagProcessorParser struct {
// contains filtered or unexported fields
}
func (*AirflowDagProcessorParser) Dependencies ¶
func (*AirflowDagProcessorParser) Dependencies() []string
func (*AirflowDagProcessorParser) Description ¶
func (*AirflowDagProcessorParser) Description() string
func (*AirflowDagProcessorParser) GetParserName ¶
func (*AirflowDagProcessorParser) GetParserName() string
func (*AirflowDagProcessorParser) Grouper ¶
func (*AirflowDagProcessorParser) Grouper() grouper.LogGrouper
Grouper implements parser.Parser.
func (*AirflowDagProcessorParser) LogTask ¶
func (*AirflowDagProcessorParser) LogTask() string
type AirflowSchedulerParser ¶
type AirflowSchedulerParser struct { }
Parse airflow-scheduler logs and make them into TaskInstances. This parser will detect these lifecycles; - scheduled - queued - success - failed
func (*AirflowSchedulerParser) Dependencies ¶
func (*AirflowSchedulerParser) Dependencies() []string
func (*AirflowSchedulerParser) Description ¶
func (*AirflowSchedulerParser) Description() string
func (*AirflowSchedulerParser) GetParserName ¶
func (*AirflowSchedulerParser) GetParserName() string
func (*AirflowSchedulerParser) Grouper ¶
func (*AirflowSchedulerParser) Grouper() grouper.LogGrouper
func (*AirflowSchedulerParser) LogTask ¶
func (*AirflowSchedulerParser) LogTask() string
type AirflowWorkerParser ¶
type AirflowWorkerParser struct { }
func (*AirflowWorkerParser) Dependencies ¶
func (*AirflowWorkerParser) Dependencies() []string
Dependencies implements parser.Parser.
func (*AirflowWorkerParser) Description ¶
func (*AirflowWorkerParser) Description() string
Description implements parser.Parser.
func (*AirflowWorkerParser) GetParserName ¶
func (*AirflowWorkerParser) GetParserName() string
GetParserName implements parser.Parser.
func (*AirflowWorkerParser) Grouper ¶
func (*AirflowWorkerParser) Grouper() grouper.LogGrouper
DependsOnPast implements parser.Parser.
func (*AirflowWorkerParser) LogTask ¶
func (*AirflowWorkerParser) LogTask() string
LogTask implements parser.Parser.
Click to show internal directories.
Click to hide internal directories.