pipeline

package
v0.3.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 17, 2024 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// Values of TaskDefinitionType.
	CommentTask TaskDefinitionType = "comment"
	YamlTask    TaskDefinitionType = "yaml"

	// Values of AssetType.
	AssetTypePython         AssetType = "python"
	AssetTypeSnowflakeQuery AssetType = "sf.sql"
	AssetTypeBigqueryQuery  AssetType = "bq.sql"
	AssetTypeEmpty          AssetType = "empty"
)

Variables

View Source
// ValidQualityChecks is a set of quality-check names: every listed name maps
// to true, so indexing with an unlisted name yields false (the zero value).
// It is an exported, mutable map, so callers are able to add or remove entries.
var ValidQualityChecks = map[string]bool{
	"not_null":        true,
	"unique":          true,
	"positive":        true,
	"min":             true,
	"max":             true,
	"accepted_values": true,
}

Functions

func NewBuilder

func NewBuilder(config BuilderConfig, yamlTaskCreator TaskCreator, commentTaskCreator TaskCreator, fs afero.Fs) *builder

Types

type Asset

// Asset is the element type of Pipeline.Assets. Every exported field except
// Pipeline is tagged for JSON with a snake_case key. The struct also carries
// unexported fields that this view does not show.
type Asset struct {
	ID              string             `json:"id"`
	Name            string             `json:"name"`
	Description     string             `json:"description"`
	Type            AssetType          `json:"type"`
	ExecutableFile  ExecutableFile     `json:"executable_file"`
	DefinitionFile  TaskDefinitionFile `json:"definition_file"`
	Parameters      EmptyStringMap     `json:"parameters"`
	Connection      string             `json:"connection"`
	Secrets         []SecretMapping    `json:"secrets"`
	// DependsOn is encoded under the JSON key "upstream", not "depends_on".
	DependsOn       []string           `json:"upstream"`
	Materialization Materialization    `json:"materialization"`
	Columns         []Column           `json:"columns"`
	CustomChecks    []CustomCheck      `json:"custom_checks"`
	Image           string             `json:"image"`
	Metadata        EmptyStringMap     `json:"metadata"`

	// Pipeline is tagged json:"-", so encoding/json leaves it out of the output.
	// NOTE(review): presumably a back-reference to the owning pipeline — confirm.
	Pipeline *Pipeline `json:"-"`
	// contains filtered or unexported fields
}

func ConvertYamlToTask

func ConvertYamlToTask(content []byte) (*Asset, error)

func (*Asset) AddDownstream

func (a *Asset) AddDownstream(asset *Asset)

func (*Asset) AddUpstream

func (a *Asset) AddUpstream(asset *Asset)

func (*Asset) GetDownstream

func (a *Asset) GetDownstream() []*Asset

func (*Asset) GetFullDownstream

func (a *Asset) GetFullDownstream() []*Asset

func (*Asset) GetFullUpstream

func (a *Asset) GetFullUpstream() []*Asset

func (*Asset) GetUpstream

func (a *Asset) GetUpstream() []*Asset

type AssetType

// AssetType is the string-backed type of Asset.Type. The package declares the
// values AssetTypePython, AssetTypeSnowflakeQuery, AssetTypeBigqueryQuery and
// AssetTypeEmpty.
type AssetType string

type BuilderConfig

// BuilderConfig is the configuration value passed to NewBuilder.
//
// NOTE(review): both TasksDirectoryName (singular) and TasksDirectoryNames
// (plural) exist; which one takes precedence is not visible here — confirm
// in the builder implementation.
type BuilderConfig struct {
	PipelineFileName    string
	TasksDirectoryName  string
	TasksDirectoryNames []string
	TasksFileSuffixes   []string
}

type Column

// Column is the element type of Asset.Columns: a named, typed column with a
// description and a list of checks attached to it.
type Column struct {
	Name        string        `json:"name"`
	Type        string        `json:"type"`
	Description string        `json:"description"`
	Checks      []ColumnCheck `json:"checks"`
}

type ColumnCheck

// ColumnCheck is the element type of Column.Checks.
//
// NOTE(review): NewColumnCheck takes an asset name and a column name in
// addition to name and value, so ID is presumably derived from those —
// confirm in the constructor.
type ColumnCheck struct {
	ID    string           `json:"id"`
	Name  string           `json:"name"`
	Value ColumnCheckValue `json:"value"`
}

func NewColumnCheck (added in v0.3.0)

func NewColumnCheck(assetName, columnName, name string, value ColumnCheckValue) ColumnCheck

type ColumnCheckValue

// ColumnCheckValue carries a check's value in one of several typed fields.
// Every field is a pointer, so an unset field is nil rather than a zero value.
//
// The type has a custom MarshalJSON, so the struct tags alone may not describe
// the encoded form. That method is declared on the pointer receiver
// (*ColumnCheckValue), unlike EmptyStringMap, Materialization and
// Notifications, which use value receivers. encoding/json only invokes a
// pointer-receiver marshaler when the value is addressable, so marshaling a
// ColumnCheck or CustomCheck by value falls back to the default struct encoding.
type ColumnCheckValue struct {
	IntArray    *[]int    `json:"int_array"`
	Int         *int      `json:"int"`
	Float       *float64  `json:"float"`
	StringArray *[]string `json:"string_array"`
	String      *string   `json:"string"`
	Bool        *bool     `json:"bool"`
}

func (*ColumnCheckValue) MarshalJSON

func (v *ColumnCheckValue) MarshalJSON() ([]byte, error)

type CustomCheck

// CustomCheck is the element type of Asset.CustomChecks. It has the same
// ID/Name/Value fields as ColumnCheck plus a Query string.
type CustomCheck struct {
	ID    string           `json:"id"`
	Name  string           `json:"name"`
	Query string           `json:"query"`
	Value ColumnCheckValue `json:"value"`
}

type DefinitionFile

// DefinitionFile is the type of Pipeline.DefinitionFile: a file name and path.
// Assets use TaskDefinitionFile instead, which adds a Type field.
type DefinitionFile struct {
	Name string `json:"name"`
	Path string `json:"path"`
}

type EmptyStringMap

// EmptyStringMap is a map[string]string with a custom MarshalJSON (value
// receiver). It is used for Asset.Parameters, Asset.Metadata and the
// Pipeline default maps.
// NOTE(review): the name suggests a nil map encodes as an empty object rather
// than null — confirm in MarshalJSON.
type EmptyStringMap map[string]string

func (EmptyStringMap) MarshalJSON

func (m EmptyStringMap) MarshalJSON() ([]byte, error)

type ExecutableFile

// ExecutableFile is the type of Asset.ExecutableFile: a file name and path
// together with a Content string.
type ExecutableFile struct {
	Name    string `json:"name"`
	Path    string `json:"path"`
	Content string `json:"content"`
}

type Materialization

// Materialization is the type of Asset.Materialization. Type and Strategy take
// the MaterializationType* and MaterializationStrategy* constants.
//
// The type has a custom MarshalJSON (value receiver), so the struct tags alone
// may not describe the encoded form.
type Materialization struct {
	Type           MaterializationType     `json:"type"`
	Strategy       MaterializationStrategy `json:"strategy"`
	PartitionBy    string                  `json:"partition_by"`
	ClusterBy      []string                `json:"cluster_by"`
	IncrementalKey string                  `json:"incremental_key"`
}

func (Materialization) MarshalJSON

func (m Materialization) MarshalJSON() ([]byte, error)

type MaterializationStrategy

// MaterializationStrategy is the string-backed type of Materialization.Strategy.
type MaterializationStrategy string
// Values of MaterializationStrategy. MaterializationStrategyNone is the empty
// string, i.e. the zero value of the type.
const (
	MaterializationStrategyNone          MaterializationStrategy = ""
	MaterializationStrategyCreateReplace MaterializationStrategy = "create+replace"
	MaterializationStrategyDeleteInsert  MaterializationStrategy = "delete+insert"
	MaterializationStrategyAppend        MaterializationStrategy = "append"
)

type MaterializationType

// MaterializationType is the string-backed type of Materialization.Type.
type MaterializationType string
// Values of MaterializationType. MaterializationTypeNone is the empty string,
// i.e. the zero value of the type.
const (
	MaterializationTypeNone  MaterializationType = ""
	MaterializationTypeView  MaterializationType = "view"
	MaterializationTypeTable MaterializationType = "table"
)

type Notifications

// Notifications is the type of Pipeline.Notifications and currently holds only
// a list of Slack targets. It has a custom MarshalJSON (value receiver), so
// the struct tag alone may not describe the encoded form.
type Notifications struct {
	Slack []SlackNotification `json:"slack"`
}

func (Notifications) MarshalJSON

func (n Notifications) MarshalJSON() ([]byte, error)

type Pipeline

// Pipeline is a named pipeline together with its assets. Most fields carry
// both yaml and json tags; DefinitionFile, Assets and TasksByType carry no
// yaml tag. The struct also has unexported fields that this view does not show.
type Pipeline struct {
	// LegacyID uses different keys per format: "id" in YAML, "legacy_id" in JSON.
	LegacyID           string         `yaml:"id" json:"legacy_id"`
	Name               string         `yaml:"name" json:"name"`
	// Schedule's type is unexported, so code outside this package cannot name it.
	Schedule           schedule       `yaml:"schedule" json:"schedule"`
	StartDate          string         `yaml:"start_date" json:"start_date"`
	DefinitionFile     DefinitionFile `json:"definition_file"`
	DefaultParameters  EmptyStringMap `yaml:"default_parameters" json:"default_parameters"`
	DefaultConnections EmptyStringMap `yaml:"default_connections" json:"default_connections"`
	Assets             []*Asset       `json:"assets"`
	Notifications      Notifications  `yaml:"notifications" json:"notifications"`

	// TasksByType is tagged json:"-", so encoding/json leaves it out of the output.
	// NOTE(review): presumably an index of Assets grouped by Asset.Type — confirm.
	TasksByType map[AssetType][]*Asset `json:"-"`
	// contains filtered or unexported fields
}

func (*Pipeline) GetAssetByPath

func (p *Pipeline) GetAssetByPath(assetPath string) *Asset

func (*Pipeline) GetConnectionNameForAsset

func (p *Pipeline) GetConnectionNameForAsset(asset *Asset) string

func (*Pipeline) HasAssetType

func (p *Pipeline) HasAssetType(taskType AssetType) bool

func (*Pipeline) RelativeAssetPath

func (p *Pipeline) RelativeAssetPath(t *Asset) string

type SecretMapping

// SecretMapping is the element type of Asset.Secrets and pairs two key names.
// NOTE(review): presumably SecretKey names the stored secret and InjectedKey
// the name it is exposed under at run time — confirm against the consumer.
type SecretMapping struct {
	SecretKey   string `json:"secret_key"`
	InjectedKey string `json:"injected_key"`
}

type SlackNotification

// SlackNotification is the element type of Notifications.Slack and holds a
// single channel string.
type SlackNotification struct {
	Channel string `json:"channel"`
}

type TaskCreator

// TaskCreator is a function that returns an *Asset, or an error, for a given
// path. NewBuilder accepts two of them (one for YAML, one for comments);
// CreateTaskFromFileComments and CreateTaskFromYamlDefinition each return one.
type TaskCreator func(path string) (*Asset, error)

func CreateTaskFromFileComments

func CreateTaskFromFileComments(fs afero.Fs) TaskCreator

func CreateTaskFromYamlDefinition

func CreateTaskFromYamlDefinition(fs afero.Fs) TaskCreator

type TaskDefinitionFile

// TaskDefinitionFile is the type of Asset.DefinitionFile: a file name and path
// plus a TaskDefinitionType (CommentTask or YamlTask).
type TaskDefinitionFile struct {
	Name string             `json:"name"`
	Path string             `json:"path"`
	Type TaskDefinitionType `json:"type"`
}

type TaskDefinitionType

// TaskDefinitionType is the string-backed type of TaskDefinitionFile.Type.
// The package declares the values CommentTask ("comment") and YamlTask ("yaml").
type TaskDefinitionType string

type TaskSchedule

// TaskSchedule holds a list of day strings.
// NOTE(review): no visible exported field or function uses this type, and the
// expected format of each entry is not shown here — confirm before relying on it.
type TaskSchedule struct {
	Days []string `json:"days"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL