pipeline

package
v0.11.114 (Latest)
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 10, 2024 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Index

Constants

View Source
// Typed string constants of the package, grouped by the type they belong to.
const (
	// Values of TaskDefinitionType.
	CommentTask TaskDefinitionType = "comment"
	YamlTask    TaskDefinitionType = "yaml"

	// Values of AssetType.
	AssetTypePython               AssetType = "python"
	AssetTypeSnowflakeQuery       AssetType = "sf.sql"
	AssetTypeSnowflakeQuerySensor AssetType = "sf.sensor.query"
	AssetTypeBigqueryQuery        AssetType = "bq.sql"
	AssetTypeBigqueryTableSensor  AssetType = "bq.sensor.table"
	AssetTypeBigqueryQuerySensor  AssetType = "bq.sensor.query"
	AssetTypeDuckDBQuery          AssetType = "duckdb.sql"
	AssetTypeEmpty                AssetType = "empty"
	AssetTypePostgresQuery        AssetType = "pg.sql"
	AssetTypeRedshiftQuery        AssetType = "rs.sql"
	AssetTypeAthenaQuery          AssetType = "athena.sql"
	AssetTypeAthenaSQLSensor      AssetType = "athena.sensor.query"
	AssetTypeMsSQLQuery           AssetType = "ms.sql"
	AssetTypeDatabricksQuery      AssetType = "databricks.sql"
	AssetTypeSynapseQuery         AssetType = "synapse.sql"
	AssetTypeIngestr              AssetType = "ingestr"
	AssetTypeTableau              AssetType = "tableau"

	// Values of RunConfig.
	RunConfigFullRefresh RunConfig = "full-refresh"
	RunConfigStartDate   RunConfig = "start-date"
	RunConfigEndDate     RunConfig = "end-date"
)

Variables

View Source
// AssetTypeConnectionMapping associates an AssetType with a connection
// identifier string (for example AssetTypeBigqueryQuery maps to
// "google_cloud_platform").
//
// NOTE(review): AssetTypeBigqueryQuerySensor and AssetTypeAthenaSQLSensor have
// no entry even though their sibling types (AssetTypeBigqueryTableSensor,
// AssetTypeSnowflakeQuerySensor) do; AssetTypePython, AssetTypeEmpty,
// AssetTypeIngestr and AssetTypeTableau are absent as well. Indexing the map
// with any of those yields "". Confirm the omissions are intentional.
var AssetTypeConnectionMapping = map[AssetType]string{
	AssetTypeBigqueryQuery:        "google_cloud_platform",
	AssetTypeBigqueryTableSensor:  "google_cloud_platform",
	AssetTypeSnowflakeQuery:       "snowflake",
	AssetTypeSnowflakeQuerySensor: "snowflake",
	AssetTypePostgresQuery:        "postgres",
	AssetTypeRedshiftQuery:        "redshift",
	AssetTypeMsSQLQuery:           "mssql",
	AssetTypeDatabricksQuery:      "databricks",
	AssetTypeSynapseQuery:         "synapse",
	AssetTypeAthenaQuery:          "athena",
	AssetTypeDuckDBQuery:          "duckdb",
}
View Source
// IngestrTypeConnectionMapping maps a lowercase platform name to the SQL query
// AssetType of that platform.
//
// NOTE(review): presumably the keys are ingestr destination names; verify
// against the caller. There is no "athena" key although AssetTypeAthenaQuery
// exists; confirm this is intended.
var IngestrTypeConnectionMapping = map[string]AssetType{
	"bigquery":   AssetTypeBigqueryQuery,
	"snowflake":  AssetTypeSnowflakeQuery,
	"postgres":   AssetTypePostgresQuery,
	"redshift":   AssetTypeRedshiftQuery,
	"mssql":      AssetTypeMsSQLQuery,
	"databricks": AssetTypeDatabricksQuery,
	"synapse":    AssetTypeSynapseQuery,
	"duckdb":     AssetTypeDuckDBQuery,
}
View Source
// SupportedFileSuffixes lists the file suffixes (including the leading dot)
// for YAML, SQL and Python files.
// NOTE(review): presumably the suffixes accepted as asset files; verify
// against the Builder.
var SupportedFileSuffixes = []string{".yml", ".yaml", ".sql", ".py"}
View Source
// ValidQualityChecks is a set of quality check names, stored as a map whose
// present keys all map to true; a name that is not listed yields false.
// NOTE(review): presumably matched against ColumnCheck.Name; verify against
// the caller.
var ValidQualityChecks = map[string]bool{
	"not_null":        true,
	"unique":          true,
	"positive":        true,
	"min":             true,
	"max":             true,
	"accepted_values": true,
	"negative":        true,
	"non_negative":    true,
	"pattern":         true,
}

Functions

func ClearSpacesAtLineEndings added in v0.11.27

func ClearSpacesAtLineEndings(content string) string

Types

type Asset

// Asset describes one asset of a Pipeline (see Pipeline.Assets) together with
// its serialization rules for JSON, YAML and mapstructure.
//
// Tag details worth knowing:
//   - ID, ExecutableFile and DefinitionFile are emitted in JSON but excluded
//     from YAML and mapstructure (tag "-").
//   - Upstreams is named "upstreams" in JSON but "depends" in YAML and
//     mapstructure.
//   - Every YAML-visible field carries omitempty.
//
// The struct also has unexported fields that are not shown here.
type Asset struct {
	ID              string             `json:"id" yaml:"-" mapstructure:"-"`
	URI             string             `json:"uri" yaml:"uri,omitempty" mapstructure:"uri"`
	Name            string             `json:"name" yaml:"name,omitempty" mapstructure:"name"`
	Type            AssetType          `json:"type" yaml:"type,omitempty" mapstructure:"type"`
	Description     string             `json:"description" yaml:"description,omitempty" mapstructure:"description"`
	Connection      string             `json:"connection" yaml:"connection,omitempty" mapstructure:"connection"`
	Tags            EmptyStringArray   `json:"tags" yaml:"tags,omitempty" mapstructure:"tags"`
	Materialization Materialization    `json:"materialization" yaml:"materialization,omitempty" mapstructure:"materialization"`
	Upstreams       []Upstream         `json:"upstreams" yaml:"depends,omitempty" mapstructure:"depends"`
	Image           string             `json:"image" yaml:"image,omitempty" mapstructure:"image"`
	Instance        string             `json:"instance" yaml:"instance,omitempty" mapstructure:"instance"`
	Owner           string             `json:"owner" yaml:"owner,omitempty" mapstructure:"owner"`
	ExecutableFile  ExecutableFile     `json:"executable_file" yaml:"-" mapstructure:"-"`
	DefinitionFile  TaskDefinitionFile `json:"definition_file" yaml:"-" mapstructure:"-"`
	Parameters      EmptyStringMap     `json:"parameters" yaml:"parameters,omitempty" mapstructure:"parameters"`
	Secrets         []SecretMapping    `json:"secrets" yaml:"secrets,omitempty" mapstructure:"secrets"`
	Columns         []Column           `json:"columns" yaml:"columns,omitempty" mapstructure:"columns"`
	CustomChecks    []CustomCheck      `json:"custom_checks" yaml:"custom_checks,omitempty" mapstructure:"custom_checks"`
	Metadata        EmptyStringMap     `json:"metadata" yaml:"metadata,omitempty" mapstructure:"metadata"`
	Snowflake       SnowflakeConfig    `json:"snowflake" yaml:"snowflake,omitempty" mapstructure:"snowflake"`
	Athena          AthenaConfig       `json:"athena" yaml:"athena,omitempty" mapstructure:"athena"`
	// contains filtered or unexported fields
}

func ConvertYamlToTask

func ConvertYamlToTask(content []byte) (*Asset, error)

func (*Asset) AddDownstream

func (a *Asset) AddDownstream(asset *Asset)

func (*Asset) AddUpstream

func (a *Asset) AddUpstream(asset *Asset)

func (*Asset) CheckCount added in v0.11.87

func (a *Asset) CheckCount() int

func (*Asset) ColumnNames added in v0.5.0

func (a *Asset) ColumnNames() []string

func (*Asset) ColumnNamesWithPrimaryKey added in v0.5.0

func (a *Asset) ColumnNamesWithPrimaryKey() []string

func (*Asset) ColumnNamesWithUpdateOnMerge added in v0.5.0

func (a *Asset) ColumnNamesWithUpdateOnMerge() []string

func (*Asset) EnrichFromEntityAttributes added in v0.11.9

func (a *Asset) EnrichFromEntityAttributes(entities []*glossary.Entity) error

func (*Asset) GetColumnWithName added in v0.11.11

func (a *Asset) GetColumnWithName(name string) *Column

func (*Asset) GetDownstream

func (a *Asset) GetDownstream() []*Asset

func (*Asset) GetFullDownstream

func (a *Asset) GetFullDownstream() []*Asset

func (*Asset) GetFullUpstream

func (a *Asset) GetFullUpstream() []*Asset

func (*Asset) GetUpstream

func (a *Asset) GetUpstream() []*Asset

func (*Asset) Persist added in v0.11.27

func (a *Asset) Persist(fs afero.Fs) error

type AssetMaterializationMap added in v0.5.0

// AssetMaterializationMap is a two-level lookup table: the outer key is a
// MaterializationType, the inner key a MaterializationStrategy, and the value
// the MaterializerFunc registered for that combination.
type AssetMaterializationMap map[MaterializationType]map[MaterializationStrategy]MaterializerFunc

type AssetType

// AssetType is the string identifier of an asset's kind; the AssetType*
// constants of this package enumerate the declared values (e.g. "bq.sql").
type AssetType string

type AthenaConfig added in v0.11.20

// AthenaConfig holds the Athena-specific settings of an Asset (see
// Asset.Athena). Only a JSON tag is declared on Location; the type also
// provides its own MarshalJSON.
type AthenaConfig struct {
	Location string `json:"location"`
}

func (AthenaConfig) MarshalJSON added in v0.11.20

func (s AthenaConfig) MarshalJSON() ([]byte, error)

type Builder added in v0.11.9

// Builder creates Asset and Pipeline values from paths (see
// CreateAssetFromFile and CreatePipelineFromPath). Instances are obtained via
// NewBuilder; GlossaryReader can also be replaced with SetGlossaryReader.
// The struct has further unexported fields that are not shown here.
type Builder struct {
	GlossaryReader glossaryReader
	// contains filtered or unexported fields
}

func NewBuilder

func NewBuilder(config BuilderConfig, yamlTaskCreator TaskCreator, commentTaskCreator TaskCreator, fs afero.Fs, gr glossaryReader) *Builder

func (*Builder) CreateAssetFromFile added in v0.11.9

func (b *Builder) CreateAssetFromFile(path string) (*Asset, error)

func (*Builder) CreatePipelineFromPath added in v0.11.9

func (b *Builder) CreatePipelineFromPath(pathToPipeline string) (*Pipeline, error)

func (*Builder) SetGlossaryReader added in v0.11.9

func (b *Builder) SetGlossaryReader(reader glossaryReader)

type BuilderConfig

// BuilderConfig is the configuration passed to NewBuilder.
//
// NOTE(review): PipelineFileName is a slice despite its singular name, and
// both TasksDirectoryName and TasksDirectoryNames exist; confirm which
// of the two directory fields is authoritative.
type BuilderConfig struct {
	PipelineFileName    []string
	TasksDirectoryName  string
	TasksDirectoryNames []string
	TasksFileSuffixes   []string
}

type Column

// Column describes a single column of an Asset (see Asset.Columns).
//
// Tag details worth knowing:
//   - EntityAttribute and Upstreams appear only in JSON; they are excluded
//     from YAML and mapstructure (tag "-").
//   - Extends is the opposite: readable from YAML/mapstructure as "extends"
//     but excluded from JSON.
//   - Upstreams is the only field with omitempty on its JSON tag.
type Column struct {
	EntityAttribute *EntityAttribute  `json:"entity_attribute" yaml:"-" mapstructure:"-"`
	Name            string            `json:"name" yaml:"name,omitempty" mapstructure:"name"`
	Type            string            `json:"type" yaml:"type,omitempty" mapstructure:"type"`
	Description     string            `json:"description" yaml:"description,omitempty" mapstructure:"description"`
	PrimaryKey      bool              `json:"primary_key" yaml:"primary_key,omitempty" mapstructure:"primary_key"`
	UpdateOnMerge   bool              `json:"update_on_merge" yaml:"update_on_merge,omitempty" mapstructure:"update_on_merge"`
	Extends         string            `json:"-" yaml:"extends,omitempty" mapstructure:"extends"`
	Checks          []ColumnCheck     `json:"checks" yaml:"checks,omitempty" mapstructure:"checks"`
	Upstreams       []*UpstreamColumn `json:"upstreams,omitempty" yaml:"-" mapstructure:"-"`
}

func (*Column) HasCheck added in v0.9.0

func (c *Column) HasCheck(check string) bool

type ColumnCheck

// ColumnCheck is a named check attached to a Column (see Column.Checks).
// ID is emitted in JSON only; it is excluded from YAML and mapstructure.
// Blocking is a DefaultTrueBool rather than a plain bool.
type ColumnCheck struct {
	ID       string           `json:"id" yaml:"-" mapstructure:"-"`
	Name     string           `json:"name" yaml:"name,omitempty" mapstructure:"name"`
	Value    ColumnCheckValue `json:"value" yaml:"value,omitempty" mapstructure:"value"`
	Blocking DefaultTrueBool  `json:"blocking" yaml:"blocking,omitempty" mapstructure:"blocking"`
}

func NewColumnCheck added in v0.3.0

func NewColumnCheck(assetName, columnName, name string, value ColumnCheckValue, blocking *bool) ColumnCheck

type ColumnCheckValue

// ColumnCheckValue carries the value of a ColumnCheck in one of several
// possible shapes; every field is a pointer, so an unset shape is nil.
// The type defines its own JSON and YAML marshaling methods.
// NOTE(review): presumably at most one field is non-nil at a time; confirm
// in UnmarshalJSON.
type ColumnCheckValue struct {
	IntArray    *[]int    `json:"int_array"`
	Int         *int      `json:"int"`
	Float       *float64  `json:"float"`
	StringArray *[]string `json:"string_array"`
	String      *string   `json:"string"`
	Bool        *bool     `json:"bool"`
}

func (*ColumnCheckValue) MarshalJSON

func (ccv *ColumnCheckValue) MarshalJSON() ([]byte, error)

func (ColumnCheckValue) MarshalYAML added in v0.11.27

func (ccv ColumnCheckValue) MarshalYAML() (interface{}, error)

func (*ColumnCheckValue) ToString added in v0.10.1

func (ccv *ColumnCheckValue) ToString() string

func (*ColumnCheckValue) UnmarshalJSON added in v0.5.0

func (ccv *ColumnCheckValue) UnmarshalJSON(data []byte) error

type CustomCheck

// CustomCheck is a query-based check attached to an Asset (see
// Asset.CustomChecks). ID is emitted in JSON only. Name and Query have no
// omitempty on their YAML tags, unlike the other YAML-visible fields.
// NOTE(review): Value is presumably the expected result of Query; confirm
// against the check runner.
type CustomCheck struct {
	ID          string          `json:"id" yaml:"-" mapstructure:"-"`
	Name        string          `json:"name" yaml:"name" mapstructure:"name"`
	Description string          `json:"description" yaml:"description,omitempty" mapstructure:"description"`
	Value       int64           `json:"value" yaml:"value,omitempty" mapstructure:"value"`
	Blocking    DefaultTrueBool `json:"blocking" yaml:"blocking,omitempty" mapstructure:"blocking"`
	Query       string          `json:"query" yaml:"query" mapstructure:"query"`
}

type DefaultTrueBool added in v0.11.21

// DefaultTrueBool wraps an optional boolean: Value is nil when nothing was
// set. The type defines its own JSON/YAML (un)marshaling and an IsZero method.
// NOTE(review): the name suggests a nil Value is treated as true; confirm
// in Bool().
type DefaultTrueBool struct {
	Value *bool
}

func (*DefaultTrueBool) Bool added in v0.11.27

func (b *DefaultTrueBool) Bool() bool

func (DefaultTrueBool) IsZero added in v0.11.110

func (b DefaultTrueBool) IsZero() bool

func (DefaultTrueBool) MarshalJSON added in v0.11.21

func (b DefaultTrueBool) MarshalJSON() ([]byte, error)

func (DefaultTrueBool) MarshalYAML added in v0.11.27

func (b DefaultTrueBool) MarshalYAML() (interface{}, error)

func (*DefaultTrueBool) UnmarshalJSON added in v0.11.21

func (b *DefaultTrueBool) UnmarshalJSON(data []byte) error

func (*DefaultTrueBool) UnmarshalYAML added in v0.11.21

func (b *DefaultTrueBool) UnmarshalYAML(value *yaml.Node) error

type DefinitionFile

// DefinitionFile identifies the file a Pipeline was defined in (see
// Pipeline.DefinitionFile) by name and path. Only JSON tags are declared.
type DefinitionFile struct {
	Name string `json:"name"`
	Path string `json:"path"`
}

type DependsColumn added in v0.11.108

// DependsColumn is one column entry of an Upstream (see Upstream.Columns),
// consisting of the column name and a usage string.
type DependsColumn struct {
	Name  string `json:"name" yaml:"name" mapstructure:"name"`
	Usage string `json:"usage" yaml:"usage" mapstructure:"usage"`
}

type DiscordNotification added in v0.11.29

// DiscordNotification is one Discord entry of Notifications: a connection
// name plus the embedded NotificationCommon success/failure switches.
// NOTE(review): encoding/json has no ",inline" option (embedded struct fields
// are promoted by default, so the JSON output is still flat); confirm the
// mapstructure ",inline" tag has the intended effect.
type DiscordNotification struct {
	Connection         string `yaml:"connection" json:"connection" mapstructure:"connection"`
	NotificationCommon `yaml:",inline" json:",inline" mapstructure:",inline"`
}

type EmptyStringArray added in v0.11.0

// EmptyStringArray is a []string with custom JSON handling: MarshalJSON is
// declared on the value receiver and UnmarshalJSON on the pointer receiver,
// which is why the receiver-consistency lint is silenced below.
type EmptyStringArray []string //nolint:recvcheck

func (EmptyStringArray) MarshalJSON added in v0.11.0

func (a EmptyStringArray) MarshalJSON() ([]byte, error)

func (*EmptyStringArray) UnmarshalJSON added in v0.11.0

func (a *EmptyStringArray) UnmarshalJSON(data []byte) error

type EmptyStringMap

// EmptyStringMap is a map[string]string with custom JSON handling:
// MarshalJSON is declared on the value receiver and UnmarshalJSON on the
// pointer receiver, which is why the receiver-consistency lint is silenced
// below.
type EmptyStringMap map[string]string //nolint:recvcheck

func (EmptyStringMap) MarshalJSON

func (m EmptyStringMap) MarshalJSON() ([]byte, error)

func (*EmptyStringMap) UnmarshalJSON added in v0.5.0

func (b *EmptyStringMap) UnmarshalJSON(data []byte) error

type EntityAttribute added in v0.11.9

// EntityAttribute names an entity and one of its attributes; a Column may
// point to one via Column.EntityAttribute. Only JSON tags are declared.
type EntityAttribute struct {
	Entity    string `json:"entity"`
	Attribute string `json:"attribute"`
}

type ExecutableFile

// ExecutableFile holds the name, path and content of the file attached to an
// Asset (see Asset.ExecutableFile). Only JSON tags are declared.
type ExecutableFile struct {
	Name    string `json:"name"`
	Path    string `json:"path"`
	Content string `json:"content"`
}

type LineageExtractor added in v0.11.30

// LineageExtractor provides the ColumnLineage and TableSchema operations on a
// Pipeline. It has no exported fields; create one with NewLineageExtractor.
type LineageExtractor struct {
	// contains filtered or unexported fields
}

func NewLineageExtractor added in v0.11.105

func NewLineageExtractor(parser sqlParser) *LineageExtractor

NewLineageExtractor creates a new LineageExtractor instance.

func (*LineageExtractor) ColumnLineage added in v0.11.105

func (p *LineageExtractor) ColumnLineage(foundPipeline *Pipeline, asset *Asset, metadata sqlparser.Schema) error

ColumnLineage processes the lineage of an asset and its upstream dependencies recursively.

func (*LineageExtractor) TableSchema added in v0.11.105

func (p *LineageExtractor) TableSchema(foundPipeline *Pipeline) sqlparser.Schema

TableSchema extracts the table schema from the assets and stores it in the columnMetadata map.

type MSTeamsNotification added in v0.11.20

// MSTeamsNotification is one MS Teams entry of Notifications: a connection
// name plus the embedded NotificationCommon success/failure switches.
// NOTE(review): encoding/json has no ",inline" option (embedded struct fields
// are promoted by default, so the JSON output is still flat); confirm the
// mapstructure ",inline" tag has the intended effect.
type MSTeamsNotification struct {
	Connection         string `yaml:"connection" json:"connection" mapstructure:"connection"`
	NotificationCommon `yaml:",inline" json:",inline" mapstructure:",inline"`
}

type Materialization

// Materialization holds the materialization settings of an Asset (see
// Asset.Materialization): the type and strategy plus partitioning, clustering
// and incremental-key options. All YAML tags carry omitempty, and the type
// defines its own MarshalJSON.
type Materialization struct {
	Type           MaterializationType     `json:"type" yaml:"type,omitempty" mapstructure:"type"`
	Strategy       MaterializationStrategy `json:"strategy" yaml:"strategy,omitempty" mapstructure:"strategy"`
	PartitionBy    string                  `json:"partition_by" yaml:"partition_by,omitempty" mapstructure:"partition_by"`
	ClusterBy      []string                `json:"cluster_by" yaml:"cluster_by,omitempty" mapstructure:"cluster_by"`
	IncrementalKey string                  `json:"incremental_key" yaml:"incremental_key,omitempty" mapstructure:"incremental_key"`
}

func (Materialization) MarshalJSON

func (m Materialization) MarshalJSON() ([]byte, error)

type MaterializationStrategy

// MaterializationStrategy is the string identifier of a materialization
// strategy (see Materialization.Strategy).
type MaterializationStrategy string
// Declared MaterializationStrategy values. MaterializationStrategyNone is the
// empty string, i.e. the zero value of the type.
const (
	MaterializationStrategyNone          MaterializationStrategy = ""
	MaterializationStrategyCreateReplace MaterializationStrategy = "create+replace"
	MaterializationStrategyDeleteInsert  MaterializationStrategy = "delete+insert"
	MaterializationStrategyAppend        MaterializationStrategy = "append"
	MaterializationStrategyMerge         MaterializationStrategy = "merge"
)

type MaterializationType

// MaterializationType is the string identifier of what an asset is
// materialized as (see Materialization.Type).
type MaterializationType string
// Declared MaterializationType values. MaterializationTypeNone is the empty
// string, i.e. the zero value of the type.
const (
	MaterializationTypeNone  MaterializationType = ""
	MaterializationTypeView  MaterializationType = "view"
	MaterializationTypeTable MaterializationType = "table"
)

type Materializer added in v0.5.0

// Materializer bundles an AssetMaterializationMap with a FullRefresh flag; its
// Render method takes an asset and a query string and returns a string.
// NOTE(review): presumably Render dispatches through MaterializationMap based
// on the asset's materialization settings; confirm in the implementation.
type Materializer struct {
	MaterializationMap AssetMaterializationMap
	FullRefresh        bool
}

func (*Materializer) Render added in v0.5.0

func (m *Materializer) Render(asset *Asset, query string) (string, error)

type MaterializerFunc added in v0.5.0

// MaterializerFunc is the function type stored in AssetMaterializationMap: it
// receives an asset and a query string and returns a string or an error.
// NOTE(review): the returned string is presumably the rewritten query; confirm.
type MaterializerFunc func(task *Asset, query string) (string, error)

type MetadataPush added in v0.11.29

// MetadataPush holds the metadata-push switches of a Pipeline (see
// Pipeline.MetadataPush). Global is excluded from JSON and has no YAML or
// mapstructure tag; BigQuery is exposed as "bigquery" in all three formats.
type MetadataPush struct {
	Global   bool `json:"-"`
	BigQuery bool `json:"bigquery" yaml:"bigquery" mapstructure:"bigquery"`
}

func (*MetadataPush) HasAnyEnabled added in v0.11.29

func (mp *MetadataPush) HasAnyEnabled() bool

type NotificationCommon added in v0.11.21

// NotificationCommon holds the success/failure switches shared by the
// notification types that embed it (SlackNotification, MSTeamsNotification,
// DiscordNotification). Both switches are DefaultTrueBool values.
type NotificationCommon struct {
	Success DefaultTrueBool `yaml:"success" json:"success" mapstructure:"success"`
	Failure DefaultTrueBool `yaml:"failure" json:"failure" mapstructure:"failure"`
}

type Notifications

// Notifications groups the notification entries of a Pipeline (see
// Pipeline.Notifications) by channel kind: Slack, MS Teams ("ms_teams") and
// Discord. The type defines its own MarshalJSON.
type Notifications struct {
	Slack   []SlackNotification   `yaml:"slack" json:"slack" mapstructure:"slack"`
	MSTeams []MSTeamsNotification `yaml:"ms_teams" json:"ms_teams" mapstructure:"ms_teams"`
	Discord []DiscordNotification `yaml:"discord" json:"discord" mapstructure:"discord"`
}

func (Notifications) MarshalJSON

func (n Notifications) MarshalJSON() ([]byte, error)

type ParseError added in v0.10.1

// ParseError is an error type carrying a message; it satisfies the error
// interface through its value-receiver Error method.
type ParseError struct {
	Msg string
}

func (ParseError) Error added in v0.10.1

func (e ParseError) Error() string

type Pipeline

// Pipeline is the top-level definition holding pipeline-wide settings and the
// list of its assets.
//
// Tag details worth knowing:
//   - LegacyID is named "legacy_id" in JSON but "id" in YAML and mapstructure.
//   - DefinitionFile and Assets declare JSON tags only.
//   - TasksByType is excluded from JSON and has no YAML/mapstructure tag.
//   - Schedule uses the unexported type schedule.
//
// The struct also has unexported fields that are not shown here.
type Pipeline struct {
	LegacyID           string         `json:"legacy_id" yaml:"id" mapstructure:"id"`
	Name               string         `json:"name" yaml:"name" mapstructure:"name"`
	Schedule           schedule       `json:"schedule" yaml:"schedule" mapstructure:"schedule"`
	StartDate          string         `json:"start_date" yaml:"start_date" mapstructure:"start_date"`
	DefinitionFile     DefinitionFile `json:"definition_file"`
	DefaultParameters  EmptyStringMap `json:"default_parameters" yaml:"default_parameters" mapstructure:"default_parameters"`
	DefaultConnections EmptyStringMap `json:"default_connections" yaml:"default_connections" mapstructure:"default_connections"`
	Assets             []*Asset       `json:"assets"`
	Notifications      Notifications  `json:"notifications" yaml:"notifications" mapstructure:"notifications"`
	Catchup            bool           `json:"catchup" yaml:"catchup" mapstructure:"catchup"`
	MetadataPush       MetadataPush   `json:"metadata_push" yaml:"metadata_push" mapstructure:"metadata_push"`
	Retries            int            `json:"retries" yaml:"retries" mapstructure:"retries"`

	TasksByType map[AssetType][]*Asset `json:"-"`
	// contains filtered or unexported fields
}

func PipelineFromPath added in v0.11.6

func PipelineFromPath(filePath string, fs afero.Fs) (*Pipeline, error)

func (*Pipeline) GetAllConnectionNamesForAsset added in v0.11.102

func (p *Pipeline) GetAllConnectionNamesForAsset(asset *Asset) ([]string, error)

func (*Pipeline) GetAssetByName added in v0.5.6

func (p *Pipeline) GetAssetByName(assetName string) *Asset

func (*Pipeline) GetAssetByPath

func (p *Pipeline) GetAssetByPath(assetPath string) *Asset

func (*Pipeline) GetAssetsByTag added in v0.11.0

func (p *Pipeline) GetAssetsByTag(tag string) []*Asset

func (*Pipeline) GetConnectionNameForAsset

func (p *Pipeline) GetConnectionNameForAsset(asset *Asset) (string, error)

func (*Pipeline) GetMajorityAssetTypesFromSQLAssets added in v0.9.1

func (p *Pipeline) GetMajorityAssetTypesFromSQLAssets(defaultIfNone AssetType) AssetType

func (*Pipeline) HasAssetType

func (p *Pipeline) HasAssetType(taskType AssetType) bool

func (*Pipeline) RelativeAssetPath

func (p *Pipeline) RelativeAssetPath(t *Asset) string

func (*Pipeline) WipeContentOfAssets added in v0.11.16

func (p *Pipeline) WipeContentOfAssets()

WipeContentOfAssets removes the content of the executable files of all assets in the pipeline. This is useful when we want to serialize the pipeline to JSON and we don't want to include the content of the assets.

type RunConfig added in v0.10.3

// RunConfig is the string identifier of a run configuration key; the
// RunConfig* constants of this package ("full-refresh", "start-date",
// "end-date") are its declared values.
type RunConfig string

type SecretMapping

// SecretMapping pairs a secret key with the key it is injected under (see
// Asset.Secrets). Only JSON tags are declared on the fields; YAML output is
// produced by the type's own MarshalYAML.
type SecretMapping struct {
	SecretKey   string `json:"secret_key"`
	InjectedKey string `json:"injected_key"`
}

func (SecretMapping) MarshalYAML added in v0.11.33

func (s SecretMapping) MarshalYAML() (interface{}, error)

type SlackNotification

// SlackNotification is one Slack entry of Notifications: a channel plus the
// embedded NotificationCommon success/failure switches. Unlike the Discord
// and MS Teams variants, Channel declares a JSON tag only.
// NOTE(review): encoding/json has no ",inline" option (embedded struct fields
// are promoted by default, so the JSON output is still flat); confirm the
// mapstructure ",inline" tag has the intended effect.
type SlackNotification struct {
	Channel            string `json:"channel"`
	NotificationCommon `yaml:",inline" json:",inline" mapstructure:",inline"`
}

type SnowflakeConfig added in v0.11.14

// SnowflakeConfig holds the Snowflake-specific settings of an Asset (see
// Asset.Snowflake). Only a JSON tag is declared on Warehouse; the type also
// provides its own MarshalJSON.
type SnowflakeConfig struct {
	Warehouse string `json:"warehouse"`
}

func (SnowflakeConfig) MarshalJSON added in v0.11.14

func (s SnowflakeConfig) MarshalJSON() ([]byte, error)

type TaskCreator

// TaskCreator is a function that builds an Asset from a file path.
// CreateTaskFromFileComments and CreateTaskFromYamlDefinition return values of
// this type, and NewBuilder accepts two of them.
type TaskCreator func(path string) (*Asset, error)

func CreateTaskFromFileComments

func CreateTaskFromFileComments(fs afero.Fs) TaskCreator

func CreateTaskFromYamlDefinition

func CreateTaskFromYamlDefinition(fs afero.Fs) TaskCreator

type TaskDefinitionFile

// TaskDefinitionFile identifies the file an Asset was defined in (see
// Asset.DefinitionFile) by name, path and definition type. Only JSON tags are
// declared.
type TaskDefinitionFile struct {
	Name string             `json:"name"`
	Path string             `json:"path"`
	Type TaskDefinitionType `json:"type"`
}

type TaskDefinitionType

// TaskDefinitionType is the string identifier of how a task was defined; the
// declared values are CommentTask ("comment") and YamlTask ("yaml").
type TaskDefinitionType string

type TaskSchedule

// TaskSchedule holds a list of days, serialized as "days" in JSON.
// NOTE(review): the expected format of each entry (e.g. weekday names) is not
// visible here; confirm against the consumer.
type TaskSchedule struct {
	Days []string `json:"days"`
}

type Upstream added in v0.11.9

// Upstream is one dependency entry of an Asset (see Asset.Upstreams),
// described by a type string and a value string. Metadata and Columns are
// optional and omitted from JSON and YAML when empty. The type defines its own
// MarshalYAML.
type Upstream struct {
	Type     string          `json:"type" yaml:"type" mapstructure:"type"`
	Value    string          `json:"value" yaml:"value" mapstructure:"value"`
	Metadata EmptyStringMap  `json:"metadata,omitempty" yaml:"metadata,omitempty" mapstructure:"metadata"`
	Columns  []DependsColumn `json:"columns,omitempty" yaml:"columns,omitempty" mapstructure:"columns"`
}

func (Upstream) MarshalYAML added in v0.11.27

func (u Upstream) MarshalYAML() (interface{}, error)

type UpstreamColumn added in v0.11.105

// UpstreamColumn identifies an upstream column of a Column (see
// Column.Upstreams) by asset, column and table name. All YAML tags carry
// omitempty.
type UpstreamColumn struct {
	Asset  string `json:"asset" yaml:"asset,omitempty" mapstructure:"asset"`
	Column string `json:"column" yaml:"column,omitempty" mapstructure:"column"`
	Table  string `json:"table" yaml:"table,omitempty" mapstructure:"table"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL