pipeline

package
Version: v0.11.17
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 25, 2024 | License: Apache-2.0 | Imports: 15 | Imported by: 0

Documentation

Index

Constants

View Source
// Package-level string constants, in three groups: task definition kinds
// (TaskDefinitionType), asset type identifiers (AssetType), and run
// configuration keys (RunConfig).
const (
	// Task definition kinds.
	CommentTask TaskDefinitionType = "comment"
	YamlTask    TaskDefinitionType = "yaml"

	// Asset type identifiers. Each value is the literal string form of the
	// asset type.
	AssetTypePython               = AssetType("python")
	AssetTypeSnowflakeQuery       = AssetType("sf.sql")
	AssetTypeSnowflakeQuerySensor = AssetType("sf.sensor.query")
	AssetTypeBigqueryQuery        = AssetType("bq.sql")
	AssetTypeBigqueryTableSensor  = AssetType("bq.sensor.table")
	AssetTypeBigqueryQuerySensor  = AssetType("bq.sensor.query")
	AssetTypeEmpty                = AssetType("empty")
	AssetTypePostgresQuery        = AssetType("pg.sql")
	AssetTypeRedshiftQuery        = AssetType("rs.sql")
	AssetTypeMsSQLQuery           = AssetType("ms.sql")
	AssetTypeSynapseQuery         = AssetType("synapse.sql")
	AssetTypeIngestr              = AssetType("ingestr")

	// Run configuration keys.
	RunConfigFullRefresh = RunConfig("full-refresh")
	RunConfigStartDate   = RunConfig("start-date")
	RunConfigEndDate     = RunConfig("end-date")
)

Variables

View Source
// AssetTypeConnectionMapping maps an AssetType to a connection name string.
// Asset types without an entry yield the empty string on lookup.
//
// NOTE(review): AssetTypeBigqueryQuerySensor has no entry here, although
// AssetTypeBigqueryTableSensor and AssetTypeSnowflakeQuerySensor do — confirm
// whether the omission is intentional.
var AssetTypeConnectionMapping = map[AssetType]string{
	AssetTypeBigqueryQuery:        "google_cloud_platform",
	AssetTypeBigqueryTableSensor:  "google_cloud_platform",
	AssetTypeSnowflakeQuery:       "snowflake",
	AssetTypeSnowflakeQuerySensor: "snowflake",
	AssetTypePostgresQuery:        "postgres",
	AssetTypeRedshiftQuery:        "redshift",
	AssetTypeMsSQLQuery:           "mssql",
	AssetTypeSynapseQuery:         "synapse",
}
View Source
// IngestrTypeConnectionMapping maps a platform name string to the SQL
// AssetType constant for that platform.
// NOTE(review): presumably the keys are ingestr destination names — confirm
// against the callers.
var IngestrTypeConnectionMapping = map[string]AssetType{
	"bigquery":  AssetTypeBigqueryQuery,
	"snowflake": AssetTypeSnowflakeQuery,
	"postgres":  AssetTypePostgresQuery,
	"redshift":  AssetTypeRedshiftQuery,
	"mssql":     AssetTypeMsSQLQuery,
	"synapse":   AssetTypeSynapseQuery,
}
View Source
// ValidQualityChecks is a set of quality check names, stored as a map whose
// listed keys all map to true. Looking up a name that is not listed yields
// false.
var ValidQualityChecks = map[string]bool{
	"not_null":        true,
	"unique":          true,
	"positive":        true,
	"min":             true,
	"max":             true,
	"accepted_values": true,
	"negative":        true,
	"non_negative":    true,
	"pattern":         true,
}

Functions

This section is empty.

Types

type Asset

// Asset describes a single asset of a pipeline. Every exported field carries a
// JSON tag naming its key; Pipeline is tagged `json:"-"`. The type also
// declares its own MarshalJSON method, so the tags alone may not describe the
// final JSON output.
type Asset struct {
	ID              string             `json:"id"`
	URI             string             `json:"uri"`
	Name            string             `json:"name"`
	Description     string             `json:"description"`
	Type            AssetType          `json:"type"`
	ExecutableFile  ExecutableFile     `json:"executable_file"`
	DefinitionFile  TaskDefinitionFile `json:"definition_file"`
	Parameters      EmptyStringMap     `json:"parameters"`
	Connection      string             `json:"connection"`
	Secrets         []SecretMapping    `json:"secrets"`
	Materialization Materialization    `json:"materialization"`
	Columns         []Column           `json:"columns"`
	CustomChecks    []CustomCheck      `json:"custom_checks"`
	Image           string             `json:"image"`
	Instance        string             `json:"instance"`
	Owner           string             `json:"owner"`
	Metadata        EmptyStringMap     `json:"metadata"`
	Tags            EmptyStringArray   `json:"tags"`
	Snowflake       SnowflakeConfig    `json:"snowflake"`

	// Pipeline points at a Pipeline value and is tagged `json:"-"`.
	// NOTE(review): presumably the pipeline that owns this asset — confirm.
	Pipeline *Pipeline `json:"-"`

	// Upstreams lists the Upstream entries declared for this asset.
	Upstreams []Upstream `json:"upstreams"`
	// contains filtered or unexported fields
}

func ConvertYamlToTask

func ConvertYamlToTask(content []byte) (*Asset, error)

func (*Asset) AddDownstream

func (a *Asset) AddDownstream(asset *Asset)

func (*Asset) AddUpstream

func (a *Asset) AddUpstream(asset *Asset)

func (*Asset) ColumnNames added in v0.5.0

func (a *Asset) ColumnNames() []string

func (*Asset) ColumnNamesWithPrimaryKey added in v0.5.0

func (a *Asset) ColumnNamesWithPrimaryKey() []string

func (*Asset) ColumnNamesWithUpdateOnMerge added in v0.5.0

func (a *Asset) ColumnNamesWithUpdateOnMerge() []string

func (*Asset) EnrichFromEntityAttributes added in v0.11.9

func (a *Asset) EnrichFromEntityAttributes(entities []*glossary.Entity) error

func (*Asset) GetColumnWithName added in v0.11.11

func (a *Asset) GetColumnWithName(name string) *Column

func (*Asset) GetDownstream

func (a *Asset) GetDownstream() []*Asset

func (*Asset) GetFullDownstream

func (a *Asset) GetFullDownstream() []*Asset

func (*Asset) GetFullUpstream

func (a *Asset) GetFullUpstream() []*Asset

func (*Asset) GetUpstream

func (a *Asset) GetUpstream() []*Asset

func (*Asset) MarshalJSON added in v0.11.9

func (a *Asset) MarshalJSON() ([]byte, error)

type AssetCollection added in v0.6.6

// AssetCollection is a slice of Asset pointers with its own MarshalJSON method.
type AssetCollection []*Asset

func (AssetCollection) MarshalJSON added in v0.6.6

func (ac AssetCollection) MarshalJSON() ([]byte, error)

type AssetMaterializationMap added in v0.5.0

// AssetMaterializationMap is a two-level lookup table: the outer key is a
// MaterializationType, the inner key is a MaterializationStrategy, and the
// value is the MaterializerFunc registered for that pair.
type AssetMaterializationMap map[MaterializationType]map[MaterializationStrategy]MaterializerFunc

type AssetType

// AssetType is the string type of the AssetType* constants declared by this
// package (for example AssetTypePython).
type AssetType string

type Builder added in v0.11.9

// Builder is created with NewBuilder and exposes CreateAssetFromFile and
// CreatePipelineFromPath. GlossaryReader is its only exported field; the
// remaining fields are unexported.
type Builder struct {
	GlossaryReader glossaryReader
	// contains filtered or unexported fields
}

func NewBuilder

func NewBuilder(config BuilderConfig, yamlTaskCreator TaskCreator, commentTaskCreator TaskCreator, fs afero.Fs, gr glossaryReader) *Builder

func (*Builder) CreateAssetFromFile added in v0.11.9

func (b *Builder) CreateAssetFromFile(path string) (*Asset, error)

func (*Builder) CreatePipelineFromPath added in v0.11.9

func (b *Builder) CreatePipelineFromPath(pathToPipeline string) (*Pipeline, error)

func (*Builder) SetGlossaryReader added in v0.11.9

func (b *Builder) SetGlossaryReader(reader glossaryReader)

type BuilderConfig

// BuilderConfig holds the file and directory naming settings passed to
// NewBuilder.
// NOTE(review): both TasksDirectoryName and TasksDirectoryNames are declared;
// how they interact is not visible here — confirm before relying on either.
type BuilderConfig struct {
	PipelineFileName    string
	TasksDirectoryName  string
	TasksDirectoryNames []string
	TasksFileSuffixes   []string
}

type Column

// Column describes one column of an asset: its name, type, description, the
// checks attached to it, and two boolean flags (PrimaryKey, UpdateOnMerge).
// EntityAttribute is a pointer and may therefore be nil.
type Column struct {
	EntityAttribute *EntityAttribute `json:"entity_attribute"`
	Name            string           `json:"name"`
	Type            string           `json:"type"`
	Description     string           `json:"description"`
	Checks          []ColumnCheck    `json:"checks"`
	PrimaryKey      bool             `json:"primary_key"`
	UpdateOnMerge   bool             `json:"update_on_merge"`
}

func (*Column) HasCheck added in v0.9.0

func (c *Column) HasCheck(check string) bool

type ColumnCheck

// ColumnCheck is a single check attached to a column: an ID, the check name,
// a ColumnCheckValue, and a Blocking flag. Values can be built with
// NewColumnCheck.
type ColumnCheck struct {
	ID       string           `json:"id"`
	Name     string           `json:"name"`
	Value    ColumnCheckValue `json:"value"`
	Blocking bool             `json:"blocking"`
}

func NewColumnCheck added in v0.3.0

func NewColumnCheck(assetName, columnName, name string, value ColumnCheckValue, blocking bool) ColumnCheck

type ColumnCheckValue

// ColumnCheckValue carries the value of a column check. Each field is a
// pointer to one possible value type, so an unset alternative is nil. The type
// declares its own MarshalJSON and UnmarshalJSON methods.
// NOTE(review): presumably at most one field is set at a time — confirm.
type ColumnCheckValue struct {
	IntArray    *[]int    `json:"int_array"`
	Int         *int      `json:"int"`
	Float       *float64  `json:"float"`
	StringArray *[]string `json:"string_array"`
	String      *string   `json:"string"`
	Bool        *bool     `json:"bool"`
}

func (*ColumnCheckValue) MarshalJSON

func (ccv *ColumnCheckValue) MarshalJSON() ([]byte, error)

func (*ColumnCheckValue) ToString added in v0.10.1

func (ccv *ColumnCheckValue) ToString() string

func (*ColumnCheckValue) UnmarshalJSON added in v0.5.0

func (ccv *ColumnCheckValue) UnmarshalJSON(data []byte) error

type CustomCheck

// CustomCheck is an asset-level check defined by a query string, with an
// int64 Value and a Blocking flag.
// NOTE(review): presumably Value is the result the query is expected to
// return — confirm.
type CustomCheck struct {
	ID          string `json:"id"`
	Name        string `json:"name"`
	Description string `json:"description"`
	Query       string `json:"query"`
	Value       int64  `json:"value"`
	Blocking    bool   `json:"blocking"`
}

type DefinitionFile

// DefinitionFile holds the name and path of a definition file. It is the type
// of the Pipeline.DefinitionFile field.
type DefinitionFile struct {
	Name string `json:"name"`
	Path string `json:"path"`
}

type EmptyStringArray added in v0.11.0

// EmptyStringArray is a string slice with its own MarshalJSON and
// UnmarshalJSON methods.
type EmptyStringArray []string

func (EmptyStringArray) MarshalJSON added in v0.11.0

func (a EmptyStringArray) MarshalJSON() ([]byte, error)

func (*EmptyStringArray) UnmarshalJSON added in v0.11.0

func (a *EmptyStringArray) UnmarshalJSON(data []byte) error

type EmptyStringMap

// EmptyStringMap is a string-to-string map with its own MarshalJSON and
// UnmarshalJSON methods.
type EmptyStringMap map[string]string

func (EmptyStringMap) MarshalJSON

func (m EmptyStringMap) MarshalJSON() ([]byte, error)

func (*EmptyStringMap) UnmarshalJSON added in v0.5.0

func (b *EmptyStringMap) UnmarshalJSON(data []byte) error

type EntityAttribute added in v0.11.9

// EntityAttribute names an entity and one of its attributes. It is the type
// referenced by the Column.EntityAttribute field.
type EntityAttribute struct {
	Entity    string `json:"entity"`
	Attribute string `json:"attribute"`
}

type ExecutableFile

// ExecutableFile holds the name, path and content of an asset's executable
// file. Pipeline.WipeContentOfAssets removes the content of these files for
// every asset in a pipeline.
type ExecutableFile struct {
	Name    string `json:"name"`
	Path    string `json:"path"`
	Content string `json:"content"`
}

type Materialization

// Materialization holds an asset's materialization settings: a type and a
// strategy, plus partitioning, clustering and incremental-key options. The
// type declares its own MarshalJSON method.
type Materialization struct {
	Type           MaterializationType     `json:"type"`
	Strategy       MaterializationStrategy `json:"strategy"`
	PartitionBy    string                  `json:"partition_by"`
	ClusterBy      []string                `json:"cluster_by"`
	IncrementalKey string                  `json:"incremental_key"`
}

func (Materialization) MarshalJSON

func (m Materialization) MarshalJSON() ([]byte, error)

type MaterializationStrategy

// MaterializationStrategy is the string type of the strategy constants below.
type MaterializationStrategy string

// Declared materialization strategies. MaterializationStrategyNone is the
// empty string, which is also the zero value of the type.
const (
	MaterializationStrategyNone          MaterializationStrategy = ""
	MaterializationStrategyCreateReplace MaterializationStrategy = "create+replace"
	MaterializationStrategyDeleteInsert  MaterializationStrategy = "delete+insert"
	MaterializationStrategyAppend        MaterializationStrategy = "append"
	MaterializationStrategyMerge         MaterializationStrategy = "merge"
)

type MaterializationType

// MaterializationType is the string type of the materialization type
// constants below.
type MaterializationType string

// Declared materialization types. MaterializationTypeNone is the empty
// string, which is also the zero value of the type.
const (
	MaterializationTypeNone  MaterializationType = ""
	MaterializationTypeView  MaterializationType = "view"
	MaterializationTypeTable MaterializationType = "table"
)

type Materializer added in v0.5.0

// Materializer pairs an AssetMaterializationMap with a FullRefresh flag. Its
// Render method takes an asset and a query string and returns a string and an
// error.
type Materializer struct {
	MaterializationMap AssetMaterializationMap
	FullRefresh        bool
}

func (*Materializer) Render added in v0.5.0

func (m *Materializer) Render(asset *Asset, query string) (string, error)

type MaterializerFunc added in v0.5.0

// MaterializerFunc is the function type stored in an AssetMaterializationMap:
// it takes an asset and a query string and returns a string or an error.
type MaterializerFunc func(task *Asset, query string) (string, error)

type Notifications

// Notifications holds a pipeline's notification settings. Slack is the only
// field declared. The type declares its own MarshalJSON method.
type Notifications struct {
	Slack []SlackNotification `json:"slack"`
}

func (Notifications) MarshalJSON

func (n Notifications) MarshalJSON() ([]byte, error)

type ParseError added in v0.10.1

// ParseError is an error type carrying a message string. Its Error method has
// a pointer receiver, so *ParseError (not ParseError) satisfies the error
// interface.
type ParseError struct {
	Msg string
}

func (*ParseError) Error added in v0.10.1

func (e *ParseError) Error() string

type Pipeline

// Pipeline describes a pipeline and the assets it contains. Most fields carry
// both a YAML tag and a JSON tag. DefinitionFile and Assets carry only a JSON
// tag, and TasksByType is tagged `json:"-"`.
type Pipeline struct {
	// LegacyID uses the YAML key "id" but the JSON key "legacy_id".
	LegacyID           string          `yaml:"id" json:"legacy_id"`
	Name               string          `yaml:"name" json:"name"`
	Schedule           schedule        `yaml:"schedule" json:"schedule"`
	StartDate          string          `yaml:"start_date" json:"start_date"`
	DefinitionFile     DefinitionFile  `json:"definition_file"`
	DefaultParameters  EmptyStringMap  `yaml:"default_parameters" json:"default_parameters"`
	DefaultConnections EmptyStringMap  `yaml:"default_connections" json:"default_connections"`
	Assets             AssetCollection `json:"assets"`
	Notifications      Notifications   `yaml:"notifications" json:"notifications"`
	Catchup            bool            `yaml:"catchup" json:"catchup"`
	Retries            int             `yaml:"retries" json:"retries"`

	// TasksByType groups asset pointers by their AssetType.
	TasksByType map[AssetType][]*Asset `json:"-"`
	// contains filtered or unexported fields
}

func PipelineFromPath added in v0.11.6

func PipelineFromPath(filePath string, fs afero.Fs) (*Pipeline, error)

func (*Pipeline) GetAssetByName added in v0.5.6

func (p *Pipeline) GetAssetByName(assetName string) *Asset

func (*Pipeline) GetAssetByPath

func (p *Pipeline) GetAssetByPath(assetPath string) *Asset

func (*Pipeline) GetAssetsByTag added in v0.11.0

func (p *Pipeline) GetAssetsByTag(tag string) []*Asset

func (*Pipeline) GetConnectionNameForAsset

func (p *Pipeline) GetConnectionNameForAsset(asset *Asset) (string, error)

func (*Pipeline) GetMajorityAssetTypesFromSQLAssets added in v0.9.1

func (p *Pipeline) GetMajorityAssetTypesFromSQLAssets(defaultIfNone AssetType) AssetType

func (*Pipeline) HasAssetType

func (p *Pipeline) HasAssetType(taskType AssetType) bool

func (*Pipeline) RelativeAssetPath

func (p *Pipeline) RelativeAssetPath(t *Asset) string

func (*Pipeline) WipeContentOfAssets added in v0.11.16

func (p *Pipeline) WipeContentOfAssets()

WipeContentOfAssets removes the content of the executable files of all assets in the pipeline. This is useful when we want to serialize the pipeline to JSON and we don't want to include the content of the assets.

type RunConfig added in v0.10.3

// RunConfig is the string type of the RunConfig* constants declared by this
// package (for example RunConfigFullRefresh).
type RunConfig string

type SecretMapping

// SecretMapping pairs a secret key with an injected key. It is the element
// type of Asset.Secrets.
// NOTE(review): presumably SecretKey is the name of the stored secret and
// InjectedKey the name it is exposed under — confirm.
type SecretMapping struct {
	SecretKey   string `json:"secret_key"`
	InjectedKey string `json:"injected_key"`
}

type SlackNotification

// SlackNotification names a Slack channel. It is the element type of
// Notifications.Slack.
type SlackNotification struct {
	Channel string `json:"channel"`
}

type SnowflakeConfig added in v0.11.14

// SnowflakeConfig holds the Snowflake settings of an asset. Warehouse is the
// only field declared. The type declares its own MarshalJSON method.
type SnowflakeConfig struct {
	Warehouse string `json:"warehouse"`
}

func (SnowflakeConfig) MarshalJSON added in v0.11.14

func (s SnowflakeConfig) MarshalJSON() ([]byte, error)

type TaskCreator

// TaskCreator is a function that takes a path and returns an Asset or an
// error. CreateTaskFromFileComments and CreateTaskFromYamlDefinition return
// values of this type.
type TaskCreator func(path string) (*Asset, error)

func CreateTaskFromFileComments

func CreateTaskFromFileComments(fs afero.Fs) TaskCreator

func CreateTaskFromYamlDefinition

func CreateTaskFromYamlDefinition(fs afero.Fs) TaskCreator

type TaskDefinitionFile

// TaskDefinitionFile holds the name, path and TaskDefinitionType of a
// definition file. It is the type of the Asset.DefinitionFile field.
type TaskDefinitionFile struct {
	Name string             `json:"name"`
	Path string             `json:"path"`
	Type TaskDefinitionType `json:"type"`
}

type TaskDefinitionType

// TaskDefinitionType is the string type of the CommentTask and YamlTask
// constants.
type TaskDefinitionType string

type TaskSchedule

// TaskSchedule holds a list of day strings.
// NOTE(review): no other declaration visible here refers to this type —
// confirm where it is used.
type TaskSchedule struct {
	Days []string `json:"days"`
}

type Upstream added in v0.11.9

// Upstream is one entry of Asset.Upstreams: a type string, a value string and
// optional metadata. Metadata is the only field tagged omitempty.
type Upstream struct {
	Type     string         `json:"type"`
	Value    string         `json:"value"`
	Metadata EmptyStringMap `json:"metadata,omitempty"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL