pipeline

package
v0.11.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 2, 2024 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

View Source
const (
	CommentTask TaskDefinitionType = "comment"
	YamlTask    TaskDefinitionType = "yaml"

	AssetTypePython               = AssetType("python")
	AssetTypeSnowflakeQuery       = AssetType("sf.sql")
	AssetTypeSnowflakeQuerySensor = AssetType("sf.sensor.query")
	AssetTypeBigqueryQuery        = AssetType("bq.sql")
	AssetTypeBigqueryTableSensor  = AssetType("bq.sensor.table")
	AssetTypeBigqueryQuerySensor  = AssetType("bq.sensor.query")
	AssetTypeEmpty                = AssetType("empty")
	AssetTypePostgresQuery        = AssetType("pg.sql")
	AssetTypeRedshiftQuery        = AssetType("rs.sql")
	AssetTypeMsSQLQuery           = AssetType("ms.sql")
	AssetTypeSynapseQuery         = AssetType("synapse.sql")
	AssetTypeIngestr              = AssetType("ingestr")

	RunConfigFullRefresh = RunConfig("full-refresh")
	RunConfigStartDate   = RunConfig("start-date")
	RunConfigEndDate     = RunConfig("end-date")
)

Variables

View Source
var AssetTypeConnectionMapping = map[AssetType]string{
	AssetTypeBigqueryQuery:        "google_cloud_platform",
	AssetTypeBigqueryTableSensor:  "google_cloud_platform",
	AssetTypeSnowflakeQuery:       "snowflake",
	AssetTypeSnowflakeQuerySensor: "snowflake",
	AssetTypePostgresQuery:        "postgres",
	AssetTypeRedshiftQuery:        "redshift",
	AssetTypeMsSQLQuery:           "mssql",
	AssetTypeSynapseQuery:         "synapse",
}
View Source
var IngestrTypeConnectionMapping = map[string]AssetType{
	"bigquery":  AssetTypeBigqueryQuery,
	"snowflake": AssetTypeSnowflakeQuery,
	"postgres":  AssetTypePostgresQuery,
	"redshift":  AssetTypeRedshiftQuery,
	"mssql":     AssetTypeMsSQLQuery,
	"synapse":   AssetTypeSynapseQuery,
}
View Source
var ValidQualityChecks = map[string]bool{
	"not_null":        true,
	"unique":          true,
	"positive":        true,
	"min":             true,
	"max":             true,
	"accepted_values": true,
	"negative":        true,
	"non_negative":    true,
	"pattern":         true,
}

Functions

This section is empty.

Types

type Asset

type Asset struct {
	ID              string             `json:"id"`
	Name            string             `json:"name"`
	Description     string             `json:"description"`
	Type            AssetType          `json:"type"`
	ExecutableFile  ExecutableFile     `json:"executable_file"`
	DefinitionFile  TaskDefinitionFile `json:"definition_file"`
	Parameters      EmptyStringMap     `json:"parameters"`
	Connection      string             `json:"connection"`
	Secrets         []SecretMapping    `json:"secrets"`
	DependsOn       []string           `json:"upstream"`
	Materialization Materialization    `json:"materialization"`
	Columns         []Column           `json:"columns"`
	CustomChecks    []CustomCheck      `json:"custom_checks"`
	Image           string             `json:"image"`
	Instance        string             `json:"instance"`
	Owner           string             `json:"owner"`
	Metadata        EmptyStringMap     `json:"metadata"`
	Tags            EmptyStringArray   `json:"tags"`

	Pipeline *Pipeline `json:"-"`

	Upstreams []Upstream `json:"upstreams"`
	// contains filtered or unexported fields
}

func ConvertYamlToTask

func ConvertYamlToTask(content []byte) (*Asset, error)

func (*Asset) AddDownstream

func (a *Asset) AddDownstream(asset *Asset)

func (*Asset) AddUpstream

func (a *Asset) AddUpstream(asset *Asset)

func (*Asset) ColumnNames added in v0.5.0

func (a *Asset) ColumnNames() []string

func (*Asset) ColumnNamesWithPrimaryKey added in v0.5.0

func (a *Asset) ColumnNamesWithPrimaryKey() []string

func (*Asset) ColumnNamesWithUpdateOnMerge added in v0.5.0

func (a *Asset) ColumnNamesWithUpdateOnMerge() []string

func (*Asset) EnrichFromEntityAttributes added in v0.11.9

func (a *Asset) EnrichFromEntityAttributes(entities []*glossary.Entity) error

func (*Asset) GetDownstream

func (a *Asset) GetDownstream() []*Asset

func (*Asset) GetFullDownstream

func (a *Asset) GetFullDownstream() []*Asset

func (*Asset) GetFullUpstream

func (a *Asset) GetFullUpstream() []*Asset

func (*Asset) GetUpstream

func (a *Asset) GetUpstream() []*Asset

func (Asset) MarshalJSON added in v0.11.9

func (a Asset) MarshalJSON() ([]byte, error)

type AssetCollection added in v0.6.6

type AssetCollection []*Asset

func (AssetCollection) MarshalJSON added in v0.6.6

func (ac AssetCollection) MarshalJSON() ([]byte, error)

type AssetMaterializationMap added in v0.5.0

type AssetMaterializationMap map[MaterializationType]map[MaterializationStrategy]MaterializerFunc

type AssetType

type AssetType string

type Builder added in v0.11.9

type Builder struct {
	GlossaryReader glossaryReader
	// contains filtered or unexported fields
}

func NewBuilder

func NewBuilder(config BuilderConfig, yamlTaskCreator TaskCreator, commentTaskCreator TaskCreator, fs afero.Fs, gr glossaryReader) *Builder

func (*Builder) CreateAssetFromFile added in v0.11.9

func (b *Builder) CreateAssetFromFile(path string) (*Asset, error)

func (*Builder) CreatePipelineFromPath added in v0.11.9

func (b *Builder) CreatePipelineFromPath(pathToPipeline string) (*Pipeline, error)

func (*Builder) SetGlossaryReader added in v0.11.9

func (b *Builder) SetGlossaryReader(reader glossaryReader)

type BuilderConfig

type BuilderConfig struct {
	PipelineFileName    string
	TasksDirectoryName  string
	TasksDirectoryNames []string
	TasksFileSuffixes   []string
}

type Column

type Column struct {
	EntityAttribute *EntityAttribute `json:"entity_attribute"`
	Name            string           `json:"name"`
	Type            string           `json:"type"`
	Description     string           `json:"description"`
	Checks          []ColumnCheck    `json:"checks"`
	PrimaryKey      bool             `json:"primary_key"`
	UpdateOnMerge   bool             `json:"update_on_merge"`
}

func (*Column) HasCheck added in v0.9.0

func (c *Column) HasCheck(check string) bool

type ColumnCheck

type ColumnCheck struct {
	ID       string           `json:"id"`
	Name     string           `json:"name"`
	Value    ColumnCheckValue `json:"value"`
	Blocking bool             `json:"blocking"`
}

func NewColumnCheck added in v0.3.0

func NewColumnCheck(assetName, columnName, name string, value ColumnCheckValue, blocking bool) ColumnCheck

type ColumnCheckValue

type ColumnCheckValue struct {
	IntArray    *[]int    `json:"int_array"`
	Int         *int      `json:"int"`
	Float       *float64  `json:"float"`
	StringArray *[]string `json:"string_array"`
	String      *string   `json:"string"`
	Bool        *bool     `json:"bool"`
}

func (*ColumnCheckValue) MarshalJSON

func (ccv *ColumnCheckValue) MarshalJSON() ([]byte, error)

func (*ColumnCheckValue) ToString added in v0.10.1

func (ccv *ColumnCheckValue) ToString() string

func (*ColumnCheckValue) UnmarshalJSON added in v0.5.0

func (ccv *ColumnCheckValue) UnmarshalJSON(data []byte) error

type CustomCheck

type CustomCheck struct {
	ID          string `json:"id"`
	Name        string `json:"name"`
	Description string `json:"description"`
	Query       string `json:"query"`
	Value       int64  `json:"value"`
	Blocking    bool   `json:"blocking"`
}

type DefinitionFile

type DefinitionFile struct {
	Name string `json:"name"`
	Path string `json:"path"`
}

type EmptyStringArray added in v0.11.0

type EmptyStringArray []string

func (EmptyStringArray) MarshalJSON added in v0.11.0

func (a EmptyStringArray) MarshalJSON() ([]byte, error)

func (*EmptyStringArray) UnmarshalJSON added in v0.11.0

func (a *EmptyStringArray) UnmarshalJSON(data []byte) error

type EmptyStringMap

type EmptyStringMap map[string]string

func (EmptyStringMap) MarshalJSON

func (m EmptyStringMap) MarshalJSON() ([]byte, error)

func (*EmptyStringMap) UnmarshalJSON added in v0.5.0

func (b *EmptyStringMap) UnmarshalJSON(data []byte) error

type EntityAttribute added in v0.11.9

type EntityAttribute struct {
	Entity    string `json:"entity"`
	Attribute string `json:"attribute"`
}

type ExecutableFile

type ExecutableFile struct {
	Name    string `json:"name"`
	Path    string `json:"path"`
	Content string `json:"content"`
}

type Materialization

type Materialization struct {
	Type           MaterializationType     `json:"type"`
	Strategy       MaterializationStrategy `json:"strategy"`
	PartitionBy    string                  `json:"partition_by"`
	ClusterBy      []string                `json:"cluster_by"`
	IncrementalKey string                  `json:"incremental_key"`
}

func (Materialization) MarshalJSON

func (m Materialization) MarshalJSON() ([]byte, error)

type MaterializationStrategy

type MaterializationStrategy string
const (
	MaterializationStrategyNone          MaterializationStrategy = ""
	MaterializationStrategyCreateReplace MaterializationStrategy = "create+replace"
	MaterializationStrategyDeleteInsert  MaterializationStrategy = "delete+insert"
	MaterializationStrategyAppend        MaterializationStrategy = "append"
	MaterializationStrategyMerge         MaterializationStrategy = "merge"
)

type MaterializationType

type MaterializationType string
const (
	MaterializationTypeNone  MaterializationType = ""
	MaterializationTypeView  MaterializationType = "view"
	MaterializationTypeTable MaterializationType = "table"
)

type Materializer added in v0.5.0

type Materializer struct {
	MaterializationMap AssetMaterializationMap
	FullRefresh        bool
}

func (*Materializer) Render added in v0.5.0

func (m *Materializer) Render(asset *Asset, query string) (string, error)

type MaterializerFunc added in v0.5.0

type MaterializerFunc func(task *Asset, query string) (string, error)

type Notifications

type Notifications struct {
	Slack []SlackNotification `json:"slack"`
}

func (Notifications) MarshalJSON

func (n Notifications) MarshalJSON() ([]byte, error)

type ParseError added in v0.10.1

type ParseError struct {
	Msg string
}

func (*ParseError) Error added in v0.10.1

func (e *ParseError) Error() string

type Pipeline

type Pipeline struct {
	LegacyID           string          `yaml:"id" json:"legacy_id"`
	Name               string          `yaml:"name" json:"name"`
	Schedule           schedule        `yaml:"schedule" json:"schedule"`
	StartDate          string          `yaml:"start_date" json:"start_date"`
	DefinitionFile     DefinitionFile  `json:"definition_file"`
	DefaultParameters  EmptyStringMap  `yaml:"default_parameters" json:"default_parameters"`
	DefaultConnections EmptyStringMap  `yaml:"default_connections" json:"default_connections"`
	Assets             AssetCollection `json:"assets"`
	Notifications      Notifications   `yaml:"notifications" json:"notifications"`
	Catchup            bool            `yaml:"catchup" json:"catchup"`
	Retries            int             `yaml:"retries" json:"retries"`

	TasksByType map[AssetType][]*Asset `json:"-"`
	// contains filtered or unexported fields
}

func PipelineFromPath added in v0.11.6

func PipelineFromPath(filePath string, fs afero.Fs) (*Pipeline, error)

func (*Pipeline) GetAssetByName added in v0.5.6

func (p *Pipeline) GetAssetByName(assetName string) *Asset

func (*Pipeline) GetAssetByPath

func (p *Pipeline) GetAssetByPath(assetPath string) *Asset

func (*Pipeline) GetAssetsByTag added in v0.11.0

func (p *Pipeline) GetAssetsByTag(tag string) []*Asset

func (*Pipeline) GetConnectionNameForAsset

func (p *Pipeline) GetConnectionNameForAsset(asset *Asset) (string, error)

func (*Pipeline) GetMajorityAssetTypesFromSQLAssets added in v0.9.1

func (p *Pipeline) GetMajorityAssetTypesFromSQLAssets(defaultIfNone AssetType) AssetType

func (*Pipeline) HasAssetType

func (p *Pipeline) HasAssetType(taskType AssetType) bool

func (*Pipeline) RelativeAssetPath

func (p *Pipeline) RelativeAssetPath(t *Asset) string

type RunConfig added in v0.10.3

type RunConfig string

type SecretMapping

type SecretMapping struct {
	SecretKey   string `json:"secret_key"`
	InjectedKey string `json:"injected_key"`
}

type SlackNotification

type SlackNotification struct {
	Channel string `json:"channel"`
}

type TaskCreator

type TaskCreator func(path string) (*Asset, error)

func CreateTaskFromFileComments

func CreateTaskFromFileComments(fs afero.Fs) TaskCreator

func CreateTaskFromYamlDefinition

func CreateTaskFromYamlDefinition(fs afero.Fs) TaskCreator

type TaskDefinitionFile

type TaskDefinitionFile struct {
	Name string             `json:"name"`
	Path string             `json:"path"`
	Type TaskDefinitionType `json:"type"`
}

type TaskDefinitionType

type TaskDefinitionType string

type TaskSchedule

type TaskSchedule struct {
	Days []string `json:"days"`
}

type Upstream added in v0.11.9

type Upstream struct {
	Type  string `json:"type"`
	Value string `json:"value"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL