pipeline

package
Version: v0.5.0 (Latest)
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 7, 2024 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

View Source
// Task definition kinds and asset type identifiers used by this package.
const (
	// TaskDefinitionType values.
	// NOTE(review): presumably these correspond to the two task creators,
	// CreateTaskFromFileComments and CreateTaskFromYamlDefinition — confirm.
	CommentTask TaskDefinitionType = "comment"
	YamlTask    TaskDefinitionType = "yaml"

	// AssetType values: the string identifiers of the known asset types.
	AssetTypePython               AssetType = "python"
	AssetTypeSnowflakeQuery       AssetType = "sf.sql"
	AssetTypeSnowflakeQuerySensor AssetType = "sf.sensor.query"
	AssetTypeBigqueryQuery        AssetType = "bq.sql"
	AssetTypeEmpty                AssetType = "empty"
	AssetTypePostgresQuery        AssetType = "pg.sql"
	AssetTypeRedshiftQuery        AssetType = "rs.sql"
)

Variables

View Source
// ValidQualityChecks is a set of quality check names: every listed name maps
// to true, and a lookup of any other name yields false.
var ValidQualityChecks = map[string]bool{
	"accepted_values": true,
	"max":             true,
	"min":             true,
	"non_negative":    true,
	"not_null":        true,
	"positive":        true,
	"unique":          true,
}

Functions

func NewBuilder

func NewBuilder(config BuilderConfig, yamlTaskCreator TaskCreator, commentTaskCreator TaskCreator, fs afero.Fs) *builder

Types

type Asset

// Asset describes one asset of a Pipeline, together with the files that define
// it, its parameters, its dependencies and its checks. The JSON tags fix the
// serialized field names.
type Asset struct {
	ID              string             `json:"id"`
	Name            string             `json:"name"`
	Description     string             `json:"description"`
	Type            AssetType          `json:"type"`
	ExecutableFile  ExecutableFile     `json:"executable_file"`
	DefinitionFile  TaskDefinitionFile `json:"definition_file"`
	Parameters      EmptyStringMap     `json:"parameters"`
	Connection      string             `json:"connection"`
	Secrets         []SecretMapping    `json:"secrets"`
	// DependsOn is serialized under the JSON key "upstream".
	// NOTE(review): presumably holds the names of upstream assets — confirm.
	DependsOn       []string           `json:"upstream"`
	Materialization Materialization    `json:"materialization"`
	Columns         []Column           `json:"columns"`
	CustomChecks    []CustomCheck      `json:"custom_checks"`
	Image           string             `json:"image"`
	Metadata        EmptyStringMap     `json:"metadata"`

	// Pipeline is excluded from JSON output (tag "-").
	// NOTE(review): presumably a back-reference to the owning pipeline — confirm.
	Pipeline *Pipeline `json:"-"`
	// contains filtered or unexported fields
}

func ConvertYamlToTask

func ConvertYamlToTask(content []byte) (*Asset, error)

func (*Asset) AddDownstream

func (a *Asset) AddDownstream(asset *Asset)

func (*Asset) AddUpstream

func (a *Asset) AddUpstream(asset *Asset)

func (*Asset) ColumnNames added in v0.5.0

func (a *Asset) ColumnNames() []string

func (*Asset) ColumnNamesWithPrimaryKey added in v0.5.0

func (a *Asset) ColumnNamesWithPrimaryKey() []string

func (*Asset) ColumnNamesWithUpdateOnMerge added in v0.5.0

func (a *Asset) ColumnNamesWithUpdateOnMerge() []string

func (*Asset) GetDownstream

func (a *Asset) GetDownstream() []*Asset

func (*Asset) GetFullDownstream

func (a *Asset) GetFullDownstream() []*Asset

func (*Asset) GetFullUpstream

func (a *Asset) GetFullUpstream() []*Asset

func (*Asset) GetUpstream

func (a *Asset) GetUpstream() []*Asset

type AssetMaterializationMap added in v0.5.0

// AssetMaterializationMap maps a MaterializationType, and then a
// MaterializationStrategy, to the MaterializerFunc registered for that pair.
type AssetMaterializationMap map[MaterializationType]map[MaterializationStrategy]MaterializerFunc

type AssetType

// AssetType is the string identifier of an asset's type; the known values are
// the AssetType* constants.
type AssetType string

type BuilderConfig

// BuilderConfig holds the file and directory naming settings passed to
// NewBuilder.
// NOTE(review): both TasksDirectoryName and TasksDirectoryNames exist; how the
// builder combines them is not visible here — confirm before relying on either.
type BuilderConfig struct {
	PipelineFileName    string
	TasksDirectoryName  string
	TasksDirectoryNames []string
	TasksFileSuffixes   []string
}

type Column

// Column describes one column of an Asset (see Asset.Columns), including the
// checks attached to it.
type Column struct {
	Name          string        `json:"name"`
	Type          string        `json:"type"`
	Description   string        `json:"description"`
	Checks        []ColumnCheck `json:"checks"`
	// NOTE(review): PrimaryKey and UpdateOnMerge are presumably what
	// Asset.ColumnNamesWithPrimaryKey and Asset.ColumnNamesWithUpdateOnMerge
	// filter on — confirm.
	PrimaryKey    bool          `json:"primary_key"`
	UpdateOnMerge bool          `json:"update_on_merge"`
}

type ColumnCheck

// ColumnCheck is a named check attached to a Column, with an associated
// ColumnCheckValue. NewColumnCheck constructs one.
type ColumnCheck struct {
	ID    string           `json:"id"`
	Name  string           `json:"name"`
	Value ColumnCheckValue `json:"value"`
}

func NewColumnCheck added in v0.3.0

func NewColumnCheck(assetName, columnName, name string, value ColumnCheckValue) ColumnCheck

type ColumnCheckValue

// ColumnCheckValue holds the value of a ColumnCheck as one of several typed
// alternatives. Every field is a pointer, so an alternative that is not set
// is nil.
// NOTE(review): presumably at most one field is set at a time — confirm
// against MarshalJSON and UnmarshalJSON.
type ColumnCheckValue struct {
	IntArray    *[]int    `json:"int_array"`
	Int         *int      `json:"int"`
	Float       *float64  `json:"float"`
	StringArray *[]string `json:"string_array"`
	String      *string   `json:"string"`
	Bool        *bool     `json:"bool"`
}

func (*ColumnCheckValue) MarshalJSON

func (ccv *ColumnCheckValue) MarshalJSON() ([]byte, error)

func (*ColumnCheckValue) UnmarshalJSON added in v0.5.0

func (ccv *ColumnCheckValue) UnmarshalJSON(data []byte) error

type CustomCheck

// CustomCheck is a named, query-based check listed in Asset.CustomChecks.
// NOTE(review): Value is presumably the result Query is expected to
// produce — confirm.
type CustomCheck struct {
	ID    string `json:"id"`
	Name  string `json:"name"`
	Query string `json:"query"`
	Value int64  `json:"value"`
}

type DefinitionFile

// DefinitionFile identifies a definition file by name and path; it is the
// type of Pipeline.DefinitionFile.
type DefinitionFile struct {
	Name string `json:"name"`
	Path string `json:"path"`
}

type EmptyStringMap

// EmptyStringMap is a map[string]string with its own MarshalJSON and
// UnmarshalJSON methods.
// NOTE(review): the name suggests a nil map is encoded as an empty object
// rather than null — confirm in MarshalJSON.
type EmptyStringMap map[string]string

func (EmptyStringMap) MarshalJSON

func (m EmptyStringMap) MarshalJSON() ([]byte, error)

func (*EmptyStringMap) UnmarshalJSON added in v0.5.0

func (b *EmptyStringMap) UnmarshalJSON(data []byte) error

type ExecutableFile

// ExecutableFile holds the name, path and content of a file; it is the type
// of Asset.ExecutableFile.
type ExecutableFile struct {
	Name    string `json:"name"`
	Path    string `json:"path"`
	Content string `json:"content"`
}

type Materialization

// Materialization holds the materialization settings of an Asset: its type
// and strategy plus partitioning, clustering and incremental-key options.
// It has its own MarshalJSON method.
type Materialization struct {
	Type           MaterializationType     `json:"type"`
	Strategy       MaterializationStrategy `json:"strategy"`
	PartitionBy    string                  `json:"partition_by"`
	ClusterBy      []string                `json:"cluster_by"`
	IncrementalKey string                  `json:"incremental_key"`
}

func (Materialization) MarshalJSON

func (m Materialization) MarshalJSON() ([]byte, error)

type MaterializationStrategy

// MaterializationStrategy is the string identifier of a materialization
// strategy; it is the type of Materialization.Strategy.
type MaterializationStrategy string
// Known MaterializationStrategy values. MaterializationStrategyNone is the
// empty string, i.e. the zero value of the type.
const (
	MaterializationStrategyNone          MaterializationStrategy = ""
	MaterializationStrategyCreateReplace MaterializationStrategy = "create+replace"
	MaterializationStrategyDeleteInsert  MaterializationStrategy = "delete+insert"
	MaterializationStrategyAppend        MaterializationStrategy = "append"
	MaterializationStrategyMerge         MaterializationStrategy = "merge"
)

type MaterializationType

// MaterializationType is the string identifier of a materialization type; it
// is the type of Materialization.Type.
type MaterializationType string
// Known MaterializationType values. MaterializationTypeNone is the empty
// string, i.e. the zero value of the type.
const (
	MaterializationTypeNone  MaterializationType = ""
	MaterializationTypeView  MaterializationType = "view"
	MaterializationTypeTable MaterializationType = "table"
)

type Materializer added in v0.5.0

// Materializer wraps an AssetMaterializationMap and exposes Render.
// NOTE(review): Render presumably selects the MaterializerFunc matching the
// asset's materialization type and strategy — confirm in its implementation.
type Materializer struct {
	MaterializationMap AssetMaterializationMap
}

func (*Materializer) Render added in v0.5.0

func (m *Materializer) Render(asset *Asset, query string) (string, error)

type MaterializerFunc added in v0.5.0

// MaterializerFunc takes an asset and a query string and returns a string or
// an error. It is the value type stored in AssetMaterializationMap.
type MaterializerFunc func(task *Asset, query string) (string, error)

type Notifications

// Notifications holds the notification settings of a Pipeline; only Slack
// notifications are declared here. It has its own MarshalJSON method.
type Notifications struct {
	Slack []SlackNotification `json:"slack"`
}

func (Notifications) MarshalJSON

func (n Notifications) MarshalJSON() ([]byte, error)

type Pipeline

// Pipeline describes a pipeline: its identity, schedule, defaults,
// notifications and the assets it contains. Most fields carry both yaml and
// json tags.
type Pipeline struct {
	// LegacyID is read from the yaml key "id" but serialized to JSON as
	// "legacy_id".
	LegacyID           string         `yaml:"id" json:"legacy_id"`
	Name               string         `yaml:"name" json:"name"`
	Schedule           schedule       `yaml:"schedule" json:"schedule"`
	StartDate          string         `yaml:"start_date" json:"start_date"`
	// DefinitionFile and Assets carry only json tags.
	// NOTE(review): presumably they are filled in by the builder rather than
	// read from the pipeline yaml — confirm.
	DefinitionFile     DefinitionFile `json:"definition_file"`
	DefaultParameters  EmptyStringMap `yaml:"default_parameters" json:"default_parameters"`
	DefaultConnections EmptyStringMap `yaml:"default_connections" json:"default_connections"`
	Assets             []*Asset       `json:"assets"`
	Notifications      Notifications  `yaml:"notifications" json:"notifications"`
	Catchup            bool           `yaml:"catchup" json:"catchup"`
	Retries            int            `yaml:"retries" json:"retries"`

	// TasksByType groups assets by AssetType; it is excluded from JSON
	// output (tag "-").
	TasksByType map[AssetType][]*Asset `json:"-"`
	// contains filtered or unexported fields
}

func (*Pipeline) GetAssetByPath

func (p *Pipeline) GetAssetByPath(assetPath string) *Asset

func (*Pipeline) GetConnectionNameForAsset

func (p *Pipeline) GetConnectionNameForAsset(asset *Asset) string

func (*Pipeline) HasAssetType

func (p *Pipeline) HasAssetType(taskType AssetType) bool

func (*Pipeline) RelativeAssetPath

func (p *Pipeline) RelativeAssetPath(t *Asset) string

type SecretMapping

// SecretMapping pairs a secret key with an injected key; it is the element
// type of Asset.Secrets.
// NOTE(review): presumably SecretKey names the stored secret and InjectedKey
// the name it is exposed under — confirm.
type SecretMapping struct {
	SecretKey   string `json:"secret_key"`
	InjectedKey string `json:"injected_key"`
}

type SlackNotification

// SlackNotification names a Slack channel; it is the element type of
// Notifications.Slack.
type SlackNotification struct {
	Channel string `json:"channel"`
}

type TaskCreator

// TaskCreator takes a path and returns an Asset or an error.
// CreateTaskFromFileComments and CreateTaskFromYamlDefinition each return a
// TaskCreator, and NewBuilder accepts one for yaml tasks and one for comment
// tasks.
type TaskCreator func(path string) (*Asset, error)

func CreateTaskFromFileComments

func CreateTaskFromFileComments(fs afero.Fs) TaskCreator

func CreateTaskFromYamlDefinition

func CreateTaskFromYamlDefinition(fs afero.Fs) TaskCreator

type TaskDefinitionFile

// TaskDefinitionFile identifies the definition file of an Asset by name and
// path, and records its TaskDefinitionType. It is the type of
// Asset.DefinitionFile.
type TaskDefinitionFile struct {
	Name string             `json:"name"`
	Path string             `json:"path"`
	Type TaskDefinitionType `json:"type"`
}

type TaskDefinitionType

// TaskDefinitionType is the string identifier of a task definition kind; the
// declared values are CommentTask ("comment") and YamlTask ("yaml").
type TaskDefinitionType string

type TaskSchedule

// TaskSchedule holds a list of days, serialized under the JSON key "days".
// NOTE(review): no other declaration visible here refers to this type, and
// the format of the day strings is not shown — confirm both before use.
type TaskSchedule struct {
	Days []string `json:"days"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL