drivers

package
v0.53.1
Published: Jan 24, 2025 License: Apache-2.0 Imports: 21 Imported by: 0

README

Each driver in the package can implement one or more of the following interfaces:

The following are system interfaces and are present for all instances. Depending on the deployment (cloud or local), they can be shared across all instances or private to a single instance. Global connectors are passed as runtime configs. Instance-specific connectors are set on the instance when it is created.

  • OLAPStore for storing data and running analytical queries
  • CatalogStore for storing sources, models and metrics views, including metadata like last refresh time
  • RepoStore for storing code artifacts (this is essentially a file system shim)
  • RegistryStore for tracking instances (DSNs for OLAPs and repos, instance variables etc)

The following interfaces are only available as source connectors. These are always instance-specific connectors.

  • ObjectStore for downloading files from remote object stores like S3, GCS, etc.
  • SQLStore for running arbitrary SQL queries against data warehouses like BigQuery. Caution: Not to be confused with Postgres, DuckDB, etc.
  • FileStore for storing paths of local files.

Special interfaces. These are also instance specific.

  • Transporter for transferring data from one infrastructure to another.

Documentation

Index

Constants

const RepoListLimit = 2000

RepoListLimit is the maximum number of files that can be listed in a call to RepoStore.ListRecursive. This limit is effectively a cap on the number of files in a project because `rill start` lists the project directory using a "**" glob.

Variables

var (
	// ErrUnsupportedConnector is returned from Ingest for unsupported connectors.
	ErrUnsupportedConnector = errors.New("drivers: connector not supported")
	// ErrOptimizationFailure is returned when an optimization fails.
	ErrOptimizationFailure = errors.New("drivers: optimization failure")
)
var Connectors = make(map[string]Driver)

Connectors tracks all registered connector drivers.

var Drivers = make(map[string]Driver)

Drivers is a registry of drivers.

var ErrFileAlreadyExists = errors.New("file already exists")
var ErrInconsistentControllerVersion = errors.New("controller: inconsistent version")

ErrInconsistentControllerVersion is returned from Controller when an unexpected controller version is observed in the DB. An unexpected controller version will only be observed if multiple controllers are running simultaneously (split brain).

var ErrNoRows = errors.New("no rows found for the query")
var ErrNotFound = errors.New("driver: not found")

ErrNotFound indicates the resource wasn't found.

var ErrNotImplemented = errors.New("driver: not implemented")

ErrNotImplemented indicates the driver doesn't support the requested operation.

var ErrNotNotifier = errors.New("driver: not a notifier")

ErrNotNotifier indicates the driver cannot be used as a Notifier.

var ErrRepoListLimitExceeded = fmt.Errorf("glob exceeded limit of %d matched files", RepoListLimit)

ErrRepoListLimitExceeded should be returned when RepoListLimit is exceeded.

var ErrResourceAlreadyExists = errors.New("controller: resource already exists")

ErrResourceAlreadyExists is returned from catalog functions when attempting to create a resource that already exists.

var ErrResourceNotFound = errors.New("controller: resource not found")

ErrResourceNotFound is returned from catalog functions when a referenced resource does not exist.

var ErrStorageLimitExceeded = fmt.Errorf("connectors: exceeds storage limit")

ErrStorageLimitExceeded indicates the driver's storage limit was exceeded.

Functions

func IsIgnored added in v0.44.0

func IsIgnored(path string, ignorePathsConfig []string) bool

IsIgnored returns true if the path (and any files in nested directories) should be ignored.

func NewPermissionDeniedError added in v0.30.0

func NewPermissionDeniedError(msg string) error

func RecordDownloadMetrics added in v0.30.0

func RecordDownloadMetrics(ctx context.Context, m *DownloadMetrics)

func Register

func Register(name string, driver Driver)

Register registers a new driver.

func RegisterAsConnector added in v0.30.0

func RegisterAsConnector(name string, driver Driver)

RegisterAsConnector tracks a connector driver.
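The registration functions above follow the common Go pattern of a package-level name-to-implementation map. The sketch below shows that pattern with local stand-ins (`driver`, `fakeDriver`, `registry`, `register`), all invented for illustration. Returning an error on a duplicate name is an assumption of this sketch. The package's actual behavior on duplicates is not stated here.

```go
package main

import "fmt"

// driver is a trimmed local stand-in for the Driver interface.
type driver interface{ Name() string }

// fakeDriver is a hypothetical driver used only for this example.
type fakeDriver struct{ name string }

func (f fakeDriver) Name() string { return f.name }

// registry is a hypothetical name-to-driver map, analogous to Drivers.
var registry = make(map[string]driver)

// register stores a driver under a name and rejects duplicate names.
func register(name string, d driver) error {
	if _, ok := registry[name]; ok {
		return fmt.Errorf("driver %q already registered", name)
	}
	registry[name] = d
	return nil
}

func main() {
	fmt.Println(register("duckdb", fakeDriver{"duckdb"}))
	fmt.Println(register("duckdb", fakeDriver{"duckdb"}))
}
```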

Types

type AIService added in v0.41.0

type AIService interface {
	Complete(ctx context.Context, msgs []*CompletionMessage) (*CompletionMessage, error)
}

type AdminService added in v0.37.0

type AdminService interface {
	GetReportMetadata(ctx context.Context, reportName, ownerID string, emailRecipients []string, executionTime time.Time) (*ReportMetadata, error)
	GetAlertMetadata(ctx context.Context, alertName string, annotations map[string]string, queryForUserID, queryForUserEmail string) (*AlertMetadata, error)
	ProvisionConnector(ctx context.Context, name, driver string, args map[string]any) (map[string]any, error)
}

type AlertMetadata added in v0.41.0

type AlertMetadata struct {
	OpenURL            string
	EditURL            string
	QueryForAttributes map[string]any
}

type AlertStatus added in v0.43.0

type AlertStatus struct {
	// TODO: Remove ToEmail, ToName once email notifier is created
	ToEmail        string
	ToName         string
	DisplayName    string
	ExecutionTime  time.Time
	Status         runtimev1.AssertionStatus
	IsRecover      bool
	FailRow        map[string]any
	ExecutionError string
	OpenLink       string
	EditLink       string
}

type CatalogStore

type CatalogStore interface {
	NextControllerVersion(ctx context.Context) (int64, error)
	CheckControllerVersion(ctx context.Context, v int64) error

	FindResources(ctx context.Context) ([]Resource, error)
	CreateResource(ctx context.Context, v int64, r Resource) error
	UpdateResource(ctx context.Context, v int64, r Resource) error
	DeleteResource(ctx context.Context, v int64, k, n string) error
	DeleteResources(ctx context.Context) error

	FindModelPartitions(ctx context.Context, opts *FindModelPartitionsOptions) ([]ModelPartition, error)
	FindModelPartitionsByKeys(ctx context.Context, modelID string, keys []string) ([]ModelPartition, error)
	CheckModelPartitionsHaveErrors(ctx context.Context, modelID string) (bool, error)
	InsertModelPartition(ctx context.Context, modelID string, partition ModelPartition) error
	UpdateModelPartition(ctx context.Context, modelID string, partition ModelPartition) error
	UpdateModelPartitionPending(ctx context.Context, modelID, partitionKey string) error
	UpdateModelPartitionsPendingIfError(ctx context.Context, modelID string) error
	DeleteModelPartitions(ctx context.Context, modelID string) error

	FindInstanceHealth(ctx context.Context, instanceID string) (*InstanceHealth, error)
	UpsertInstanceHealth(ctx context.Context, h *InstanceHealth) error
}

CatalogStore is implemented by drivers capable of storing catalog info for a specific instance. Implementations should treat resource kinds as case sensitive, but resource names as case insensitive.
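One way an implementation might satisfy the case-sensitivity rule is to normalize only the name when building a lookup key. The helper `resourceKey` below is hypothetical, and the kind strings are illustrative placeholders rather than real resource kinds.

```go
package main

import (
	"fmt"
	"strings"
)

// resourceKey is a hypothetical lookup key: the kind is kept verbatim
// (case sensitive) and the name is lowercased (case insensitive).
func resourceKey(kind, name string) string {
	return kind + "/" + strings.ToLower(name)
}

func main() {
	// Same kind, names differing only in case: treated as the same resource.
	fmt.Println(resourceKey("Model", "Orders") == resourceKey("Model", "orders"))
	// Kinds differing in case: treated as different resources.
	fmt.Println(resourceKey("Model", "orders") == resourceKey("model", "orders"))
}
```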

type CompletionMessage added in v0.41.0

type CompletionMessage struct {
	Role string
	Data string
}

type CreateTableOptions added in v0.53.0

type CreateTableOptions struct {
	View         bool
	BeforeCreate string
	AfterCreate  string
	TableOpts    map[string]any
}

type Dialect added in v0.16.0

type Dialect int

Dialect enumerates OLAP query languages.

const (
	DialectUnspecified Dialect = iota
	DialectDuckDB
	DialectDruid
	DialectClickHouse
	DialectPinot
)

func (Dialect) AutoUnnest added in v0.51.0

func (d Dialect) AutoUnnest(expr string) string

func (Dialect) CanPivot added in v0.47.0

func (d Dialect) CanPivot() bool

func (Dialect) ConvertToDateTruncSpecifier added in v0.43.0

func (d Dialect) ConvertToDateTruncSpecifier(grain runtimev1.TimeGrain) string

func (Dialect) DateDiff added in v0.49.0

func (d Dialect) DateDiff(grain runtimev1.TimeGrain, t1, t2 time.Time) (string, error)

func (Dialect) DateTruncExpr added in v0.47.0

func (d Dialect) DateTruncExpr(dim *runtimev1.MetricsViewSpec_DimensionV2, grain runtimev1.TimeGrain, tz string, firstDayOfWeek, firstMonthOfYear int) (string, error)

func (Dialect) DimensionSelect added in v0.44.0

func (d Dialect) DimensionSelect(db, dbSchema, table string, dim *runtimev1.MetricsViewSpec_DimensionV2) (dimSelect, unnestClause string)

func (Dialect) DimensionSelectPair added in v0.46.0

func (d Dialect) DimensionSelectPair(db, dbSchema, table string, dim *runtimev1.MetricsViewSpec_DimensionV2) (expr, alias, unnestClause string)

func (Dialect) EscapeIdentifier added in v0.42.0

func (d Dialect) EscapeIdentifier(ident string) string

EscapeIdentifier returns an escaped SQL identifier in the dialect.
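For intuition, the sketch below applies the standard SQL convention of wrapping an identifier in double quotes and doubling any embedded double quote. This is an assumption about what escaping might look like for one dialect. The function `escapeIdentifier` is a local example, and the real method may quote differently for each dialect.

```go
package main

import (
	"fmt"
	"strings"
)

// escapeIdentifier double-quotes an identifier and doubles embedded quotes,
// following the standard SQL quoting convention (an assumption here).
func escapeIdentifier(ident string) string {
	return `"` + strings.ReplaceAll(ident, `"`, `""`) + `"`
}

func main() {
	fmt.Println(escapeIdentifier("order date"))
	fmt.Println(escapeIdentifier(`my "table"`))
}
```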

func (Dialect) EscapeStringValue added in v0.47.0

func (d Dialect) EscapeStringValue(s string) string

func (Dialect) EscapeTable added in v0.43.0

func (d Dialect) EscapeTable(db, schema, table string) string

EscapeTable returns an escaped fully qualified table name.

func (Dialect) GetNullExpr added in v0.52.8

func (d Dialect) GetNullExpr(typ runtimev1.Type_Code) (bool, string)

func (Dialect) GetTimeExpr added in v0.52.8

func (d Dialect) GetTimeExpr(t time.Time) (bool, string)

func (Dialect) GetValExpr added in v0.52.8

func (d Dialect) GetValExpr(val any, typ runtimev1.Type_Code) (bool, string, error)

func (Dialect) JoinOnExpression added in v0.47.0

func (d Dialect) JoinOnExpression(lhs, rhs string) string

func (Dialect) LateralUnnest added in v0.46.0

func (d Dialect) LateralUnnest(expr, tableAlias, colName string) (tbl string, auto bool, err error)

func (Dialect) MetricsViewDimensionExpression added in v0.44.0

func (d Dialect) MetricsViewDimensionExpression(dimension *runtimev1.MetricsViewSpec_DimensionV2) string

func (Dialect) OrderByExpression added in v0.47.0

func (d Dialect) OrderByExpression(name string, desc bool) string

func (Dialect) RequiresCastForLike added in v0.49.0

func (d Dialect) RequiresCastForLike() bool

RequiresCastForLike returns true if the dialect requires an expression used in a LIKE or ILIKE condition to explicitly be cast to type TEXT.

func (Dialect) SafeDivideExpression added in v0.47.0

func (d Dialect) SafeDivideExpression(numExpr, denExpr string) string

func (Dialect) SelectInlineResults added in v0.52.8

func (d Dialect) SelectInlineResults(result *Result) (string, []any, []any, error)

SelectInlineResults returns a SQL query that inlines the results from the supplied result set, along with the positional arguments and dimension values.

func (Dialect) String added in v0.16.0

func (d Dialect) String() string

func (Dialect) SupportsILike added in v0.47.0

func (d Dialect) SupportsILike() bool

type DirEntry added in v0.43.0

type DirEntry struct {
	Path  string
	IsDir bool
}

type DownloadMetrics added in v0.30.0

type DownloadMetrics struct {
	Connector string
	Ext       string
	Partial   bool
	Duration  time.Duration
	Size      int64
}

type Driver

type Driver interface {
	// Spec returns metadata about the driver, such as which configuration properties it supports.
	Spec() Spec

	// Open opens a new handle.
	// If instanceID is empty, the connection is considered shared and its As...() functions may be invoked with different instance IDs.
	Open(instanceID string, config map[string]any, st *storage.Client, ac *activity.Client, logger *zap.Logger) (Handle, error)

	// HasAnonymousSourceAccess returns true if the driver can access the data identified by srcProps without any additional configuration.
	HasAnonymousSourceAccess(ctx context.Context, srcProps map[string]any, logger *zap.Logger) (bool, error)

	// TertiarySourceConnectors returns a list of drivers required to access the data identified by srcProps, excluding the driver itself.
	TertiarySourceConnectors(ctx context.Context, srcProps map[string]any, logger *zap.Logger) ([]string, error)
}

Driver represents an external service that Rill can connect to.

type FileFormat added in v0.47.0

type FileFormat string
const (
	FileFormatUnspecified FileFormat = ""
	FileFormatParquet     FileFormat = "parquet"
	FileFormatCSV         FileFormat = "csv"
	FileFormatJSON        FileFormat = "json"
	FileFormatXLSX        FileFormat = "xlsx"
)

func (FileFormat) Filename added in v0.47.0

func (f FileFormat) Filename(stem string) string

func (FileFormat) Valid added in v0.47.0

func (f FileFormat) Valid() bool

type FileIterator added in v0.30.0

type FileIterator interface {
	// Close cleans up and releases resources.
	Close() error
	// Next returns a list of files downloaded from external sources
	// and cleans up files created in the previous batch.
	Next() ([]string, error)
	// Format returns general file format (json, csv, parquet, etc)
	// Returns an empty string if there is no general format
	Format() string
}

FileIterator provides a way to iteratively download files from external sources. Clients should call Close once they are done with the iterator to release any resources.
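A consumer of this interface might look like the sketch below, which processes each batch before asking for the next one and always closes the iterator. It assumes the iterator signals exhaustion by returning `io.EOF` from Next, which is not stated in this documentation. `fileIterator`, `sliceIterator`, and `drain` are local names invented for the example.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// fileIterator mirrors the documented FileIterator interface.
type fileIterator interface {
	Close() error
	Next() ([]string, error)
	Format() string
}

// sliceIterator is a hypothetical in-memory iterator used for illustration.
type sliceIterator struct {
	batches [][]string
	pos     int
}

func (s *sliceIterator) Close() error   { return nil }
func (s *sliceIterator) Format() string { return "csv" }
func (s *sliceIterator) Next() ([]string, error) {
	if s.pos >= len(s.batches) {
		return nil, io.EOF // assumed end-of-iteration signal
	}
	b := s.batches[s.pos]
	s.pos++
	return b, nil
}

// drain consumes every batch and always closes the iterator.
func drain(it fileIterator, handle func(paths []string) error) (err error) {
	defer func() {
		if cerr := it.Close(); err == nil {
			err = cerr
		}
	}()
	for {
		paths, nerr := it.Next()
		if errors.Is(nerr, io.EOF) {
			return nil
		}
		if nerr != nil {
			return nerr
		}
		// Handle the batch before the next call to Next, because Next
		// cleans up the files from the previous batch.
		if herr := handle(paths); herr != nil {
			return herr
		}
	}
}

func main() {
	it := &sliceIterator{batches: [][]string{{"a.csv"}, {"b.csv", "c.csv"}}}
	total := 0
	err := drain(it, func(paths []string) error {
		total += len(paths)
		return nil
	})
	fmt.Println(total, err)
}
```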

type FileStore added in v0.30.0

type FileStore interface {
	// FilePaths returns local absolute paths where files are stored
	FilePaths(ctx context.Context, src map[string]any) ([]string, error)
}

type FindModelPartitionsOptions added in v0.51.0

type FindModelPartitionsOptions struct {
	ModelID      string
	Limit        int
	WherePending bool
	WhereErrored bool
	AfterIndex   int
	AfterKey     string
}

FindModelPartitionsOptions is used to filter model partitions.

type Handle added in v0.32.0

type Handle interface {
	// Ping verifies a connection to an external service is healthy
	Ping(ctx context.Context) error

	// Driver name used to open the handle.
	Driver() string

	// Config used to open the handle.
	Config() map[string]any

	// Migrate prepares the handle for use. It will always be called before any of the As...() functions.
	Migrate(ctx context.Context) error

	// MigrationStatus returns the handle's current and desired migration version (if applicable).
	MigrationStatus(ctx context.Context) (current int, desired int, err error)

	// Close closes the handle.
	Close() error

	// AsRegistry returns a RegistryStore if the handle can serve as such, otherwise returns false.
	// A registry is responsible for tracking runtime metadata, namely instances and their configuration.
	AsRegistry() (RegistryStore, bool)

	// AsCatalogStore returns a CatalogStore if the handle can serve as such, otherwise returns false.
	// A catalog stores the state of an instance's resources (such as sources, models, metrics views, alerts, etc.)
	AsCatalogStore(instanceID string) (CatalogStore, bool)

	// AsRepoStore returns a RepoStore if the handle can serve as such, otherwise returns false.
	// A repo stores an instance's file artifacts (mostly YAML and SQL files).
	AsRepoStore(instanceID string) (RepoStore, bool)

	// AsAdmin returns an AdminService if the handle can serve as such, otherwise returns false.
	// An admin service enables the runtime to interact with the control plane that deployed it.
	AsAdmin(instanceID string) (AdminService, bool)

	// AsAI returns an AIService if the driver can serve as such, otherwise returns false.
	// An AI service enables an instance to request prompt-based text inference.
	AsAI(instanceID string) (AIService, bool)

	// AsOLAP returns an OLAPStore if the driver can serve as such, otherwise returns false.
	// An OLAP store is used to serve interactive, low-latency, analytical queries.
	// NOTE: We should consider merging the OLAPStore and SQLStore interfaces.
	AsOLAP(instanceID string) (OLAPStore, bool)

	// AsObjectStore returns an ObjectStore if the driver can serve as such, otherwise returns false.
	// An object store can store, list and retrieve files on a remote server.
	AsObjectStore() (ObjectStore, bool)

	// AsFileStore returns a FileStore if the driver can serve as such, otherwise returns false.
	// A file store can store, list and retrieve local files.
	// NOTE: The file store can probably be merged with the ObjectStore interface.
	AsFileStore() (FileStore, bool)

	// AsWarehouse returns a Warehouse if the driver can serve as such, otherwise returns false.
	// A Warehouse represents a service that can execute SQL statements on cloud warehouses and return the result rows typically as files.
	AsWarehouse() (Warehouse, bool)

	// AsModelExecutor returns a ModelExecutor capable of building a model.
	// Since models may move data between connectors, the model executor can be seen as a "meta driver" that uses handles on other connectors.
	// The provided options provide both an input connector and an output connector. One or both of these will be the receiver itself.
	// It should return false if the handle is not capable of executing a model between the provided input and output connectors.
	AsModelExecutor(instanceID string, opts *ModelExecutorOptions) (ModelExecutor, bool)

	// AsModelManager returns a ModelManager for managing model results produced by a ModelExecutor.
	// This is different from the ModelExecutor since sometimes, the model's input connector executes and writes the model result to the output connector.
	// But managing the result lifecycle is easier to do directly using the output connector.
	AsModelManager(instanceID string) (ModelManager, bool)

	// AsTransporter returns a Transporter for moving data between two other handles. One of the input handles may be the Handle itself.
	// Examples: duckdb.AsTransporter(gcs, duckdb), beam.AsTransporter(gcs, s3).
	AsTransporter(from Handle, to Handle) (Transporter, bool)

	// AsNotifier returns a Notifier (if the driver can serve as such) to send notifications: alerts, reports, etc.
	// Examples: email notifier, slack notifier.
	AsNotifier(properties map[string]any) (Notifier, error)
}

Handle represents a connection to an external service, such as a database, object store, etc. It should implement one or more of the As...() functions.
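The As...() functions follow a capability-check pattern: the caller asks for a role and inspects the boolean before using the result. The sketch below shows the idea with trimmed local stand-ins (`olapStore`, `objectStore`, `handle`) and a hypothetical `duckHandle`. The real interfaces have many more methods.

```go
package main

import "fmt"

// olapStore and objectStore are trimmed local stand-ins for OLAPStore and ObjectStore.
type olapStore interface{ Dialect() string }
type objectStore interface{ ListObjects() []string }

// handle is a trimmed stand-in for Handle with two capability accessors.
type handle interface {
	AsOLAP(instanceID string) (olapStore, bool)
	AsObjectStore() (objectStore, bool)
}

// duckHandle is a hypothetical handle that can only serve as an OLAP store.
type duckHandle struct{}

func (duckHandle) Dialect() string                                { return "duckdb" }
func (h duckHandle) AsOLAP(instanceID string) (olapStore, bool)   { return h, true }
func (duckHandle) AsObjectStore() (objectStore, bool)             { return nil, false }

// describe reports the first capability a handle offers.
func describe(h handle) string {
	if olap, ok := h.AsOLAP("default"); ok {
		return "olap:" + olap.Dialect()
	}
	if _, ok := h.AsObjectStore(); ok {
		return "object store"
	}
	return "unsupported"
}

func main() {
	fmt.Println(describe(duckHandle{}))
}
```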

func Open

func Open(driver, instanceID string, config map[string]any, st *storage.Client, ac *activity.Client, logger *zap.Logger) (Handle, error)

Open opens a new connection. If instanceID is empty, the connection is considered shared and its As...() functions may be invoked with different instance IDs. If instanceID is not empty, the connection is considered instance-specific and its As...() functions will only be invoked with the same instance ID.

type IncrementalStrategy added in v0.45.0

type IncrementalStrategy string

IncrementalStrategy is a strategy to use for incrementally inserting data into a SQL table.

const (
	IncrementalStrategyUnspecified        IncrementalStrategy = ""
	IncrementalStrategyAppend             IncrementalStrategy = "append"
	IncrementalStrategyMerge              IncrementalStrategy = "merge"
	IncrementalStrategyPartitionOverwrite IncrementalStrategy = "partition_overwrite"
)

type InformationSchema

type InformationSchema interface {
	All(ctx context.Context, like string) ([]*Table, error)
	Lookup(ctx context.Context, db, schema, name string) (*Table, error)
}

InformationSchema contains information about existing tables in an OLAP driver. Table lookups should be case insensitive.

type IngestionSummary added in v0.24.0

type IngestionSummary struct {
	BytesIngested int64
}

IngestionSummary contains details about an ingestion.

type InsertTableOptions added in v0.53.0

type InsertTableOptions struct {
	BeforeInsert string
	AfterInsert  string
	ByName       bool
	InPlace      bool
	Strategy     IncrementalStrategy
	UniqueKey    []string
}

type Instance

type Instance struct {
	// Identifier
	ID string
	// Environment is the environment that the instance represents
	Environment string
	// Driver name to connect to for OLAP
	OLAPConnector string
	// ProjectOLAPConnector is an override of OLAPConnector that may be set in rill.yaml.
	// NOTE: Hopefully we can merge this with OLAPConnector if/when we remove the ability to set OLAPConnector using flags.
	ProjectOLAPConnector string
	// Driver name for reading/editing code artifacts
	RepoConnector string
	// Driver name for the admin service managing the deployment (optional)
	AdminConnector string
	// Driver name for the AI service (optional)
	AIConnector string
	// Driver name for catalog
	CatalogConnector string
	// CreatedOn is when the instance was created
	CreatedOn time.Time `db:"created_on"`
	// UpdatedOn is when the instance was last updated in the registry
	UpdatedOn time.Time `db:"updated_on"`
	// Instance specific connectors
	Connectors []*runtimev1.Connector `db:"connectors"`
	// ProjectConnectors contains default connectors from rill.yaml
	ProjectConnectors []*runtimev1.Connector `db:"project_connectors"`
	// Variables contains user-provided variables
	Variables map[string]string `db:"variables"`
	// ProjectVariables contains default variables from rill.yaml
	// (NOTE: This can always be reproduced from rill.yaml, so it's really just a handy cache of the values.)
	ProjectVariables map[string]string `db:"project_variables"`
	// FeatureFlags contains feature flags configured in rill.yaml
	FeatureFlags map[string]bool `db:"feature_flags"`
	// Annotations to enrich activity events (like usage tracking)
	Annotations map[string]string
	// EmbedCatalog tells the runtime to store the instance's catalog in its OLAP store instead
	// of in the runtime's metadata store. Currently only supported for the duckdb driver.
	EmbedCatalog bool `db:"embed_catalog"`
	// WatchRepo indicates whether to watch the repo for file changes and reconcile them automatically.
	WatchRepo bool `db:"watch_repo"`
	// Paths to expose over HTTP (defaults to ./public)
	PublicPaths []string `db:"public_paths"`
	// IgnoreInitialInvalidProjectError indicates whether to ignore an invalid project error when the instance is initially created.
	IgnoreInitialInvalidProjectError bool `db:"-"`
}

Instance represents a single data project, meaning one OLAP connection, one repo connection, and one catalog connection.

func (*Instance) Config added in v0.43.0

func (i *Instance) Config() (InstanceConfig, error)

Config resolves the current dynamic config properties for the instance. See InstanceConfig for details.

func (*Instance) ResolveOLAPConnector added in v0.42.0

func (i *Instance) ResolveOLAPConnector() string

ResolveOLAPConnector resolves the OLAP connector to default to for the instance.

func (*Instance) ResolveVariables added in v0.23.0

func (i *Instance) ResolveVariables(withLowerKeys bool) map[string]string

ResolveVariables returns the final resolved variables.
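A plausible reading of "final resolved variables" is a merge of the project defaults (ProjectVariables) with the user-provided Variables. The sketch below assumes that user-provided values take precedence and that withLowerKeys lowercases every key. Both points are assumptions, and `resolveVariables` is a local function written for this example.

```go
package main

import (
	"fmt"
	"strings"
)

// resolveVariables merges project defaults with instance variables.
// Instance variables are applied last, so they win on conflicts
// (an assumption of this sketch).
func resolveVariables(project, instance map[string]string, withLowerKeys bool) map[string]string {
	out := make(map[string]string, len(project)+len(instance))
	for _, src := range []map[string]string{project, instance} {
		for k, v := range src {
			if withLowerKeys {
				k = strings.ToLower(k)
			}
			out[k] = v
		}
	}
	return out
}

func main() {
	project := map[string]string{"ENV": "dev", "region": "us"}
	instance := map[string]string{"env": "prod"}
	fmt.Println(resolveVariables(project, instance, true))
}
```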

type InstanceConfig added in v0.43.0

type InstanceConfig struct {
	// DownloadLimitBytes is the limit on size of exported file. If set to 0, there is no limit.
	DownloadLimitBytes int64 `mapstructure:"rill.download_limit_bytes"`
	// InteractiveSQLRowLimit is the row limit for interactive SQL queries. It does not apply to exports of SQL queries. If set to 0, there is no limit.
	InteractiveSQLRowLimit int64 `mapstructure:"rill.interactive_sql_row_limit"`
	// StageChanges indicates whether to keep previously ingested tables for sources/models, and only override them if ingestion of a new table is successful.
	StageChanges bool `mapstructure:"rill.stage_changes"`
	// ModelDefaultMaterialize indicates whether to materialize models by default.
	ModelDefaultMaterialize bool `mapstructure:"rill.models.default_materialize"`
	// ModelMaterializeDelaySeconds adds a delay before materializing models.
	ModelMaterializeDelaySeconds uint32 `mapstructure:"rill.models.materialize_delay_seconds"`
	// MetricsApproximateComparisons indicates whether to rewrite metrics comparison queries to approximately correct queries.
	// Approximated comparison queries are faster but may not return comparison data points for all values.
	MetricsApproximateComparisons bool `mapstructure:"rill.metrics.approximate_comparisons"`
	// MetricsApproximateComparisonsCTE indicates whether to rewrite metrics comparison queries to use a CTE for base query.
	MetricsApproximateComparisonsCTE bool `mapstructure:"rill.metrics.approximate_comparisons_cte"`
	// MetricsApproxComparisonTwoPhaseLimit is a query limit threshold: if a query's limit is less than this value, metrics comparison queries are rewritten to use a two-phase comparison approach, where the first query gets the base values and the second query gets the comparison values.
	MetricsApproxComparisonTwoPhaseLimit int64 `mapstructure:"rill.metrics.approximate_comparisons_two_phase_limit"`
	// MetricsExactifyDruidTopN indicates whether to split Druid TopN queries into two queries to increase the accuracy of the returned measures.
	// Enabling it reduces the performance of Druid toplist queries.
	// See runtime/metricsview/executor_rewrite_druid_exactify.go for more details.
	MetricsExactifyDruidTopN bool `mapstructure:"rill.metrics.exactify_druid_topn"`
	// AlertsDefaultStreamingRefreshCron sets a default cron expression for refreshing alerts with streaming refs.
	// Namely, this is used to check alerts against external tables (e.g. in Druid) where new data may be added at any time (i.e. is considered "streaming").
	AlertsDefaultStreamingRefreshCron string `mapstructure:"rill.alerts.default_streaming_refresh_cron"`
}

InstanceConfig contains dynamic configuration for an instance. It is configured by parsing instance variables prefixed with "rill.". For example, a variable "rill.stage_changes=true" would set the StageChanges field to true. InstanceConfig should only be used for config that the user is allowed to change dynamically at runtime.
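To make the variable-to-field mapping concrete, the sketch below parses two of the "rill."-prefixed keys into a trimmed stand-in struct using only the standard library. The struct tags in the listing suggest the real code decodes through mapstructure, so `instanceConfig` and `parseConfig` here are simplified, hypothetical equivalents.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// instanceConfig is a trimmed stand-in for InstanceConfig.
type instanceConfig struct {
	StageChanges           bool
	InteractiveSQLRowLimit int64
}

// parseConfig reads "rill."-prefixed variables into an instanceConfig
// and ignores every other variable.
func parseConfig(vars map[string]string) (instanceConfig, error) {
	var cfg instanceConfig
	for k, v := range vars {
		if !strings.HasPrefix(k, "rill.") {
			continue
		}
		var err error
		switch k {
		case "rill.stage_changes":
			cfg.StageChanges, err = strconv.ParseBool(v)
		case "rill.interactive_sql_row_limit":
			cfg.InteractiveSQLRowLimit, err = strconv.ParseInt(v, 10, 64)
		}
		if err != nil {
			return instanceConfig{}, fmt.Errorf("invalid value for %q: %w", k, err)
		}
	}
	return cfg, nil
}

func main() {
	cfg, err := parseConfig(map[string]string{
		"rill.stage_changes": "true",
		"my_var":             "ignored",
	})
	fmt.Println(cfg.StageChanges, err)
}
```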

type InstanceHealth added in v0.51.0

type InstanceHealth struct {
	InstanceID string    `db:"instance_id"`
	HealthJSON []byte    `db:"health_json"`
	UpdatedOn  time.Time `db:"updated_on"`
}

InstanceHealth represents the health of an instance.

type ModelEnv added in v0.45.0

type ModelEnv struct {
	AllowHostAccess    bool
	RepoRoot           string
	StageChanges       bool
	DefaultMaterialize bool
	AcquireConnector   func(ctx context.Context, name string) (Handle, func(), error)
}

ModelEnv contains contextual info about the model's instance.

type ModelExecuteOptions added in v0.48.0

type ModelExecuteOptions struct {
	*ModelExecutorOptions
	// InputProperties are the resolved properties of the model's input connector.
	InputProperties map[string]any
	// OutputProperties are the resolved properties of the model's output connector.
	OutputProperties map[string]any
	// Priority is the priority of the model execution.
	Priority int
	// Incremental is true if the model is an incremental model.
	Incremental bool
	// IncrementalRun is true if the execution is an incremental run.
	IncrementalRun bool
	// PartitionRun is true if the execution is a partition run.
	PartitionRun bool
	// PreviousResult is the result of a previous execution.
	// For concurrent partition execution, it may not be the most recent previous result.
	PreviousResult *ModelResult
	// TempDir is a temporary directory for storing intermediate data.
	TempDir string
}

ModelExecuteOptions are options passed to a model executor's Execute function. They embed the ModelExecutorOptions that were used to initialize the ModelExecutor, plus additional options for the current execution step.

type ModelExecutor added in v0.45.0

type ModelExecutor interface {
	// Execute runs the model. The execution may be a full, incremental, or partition run.
	// For partition runs, Execute may be called concurrently by multiple workers.
	Execute(ctx context.Context, opts *ModelExecuteOptions) (*ModelResult, error)

	// Concurrency returns the number of concurrent calls that may be made to Execute given a user-provided desired concurrency.
	// If the desired concurrency is 0, it should return the recommended default concurrency.
	// If the desired concurrency is too high, it should return false.
	Concurrency(desired int) (int, bool)
}

ModelExecutor executes models. A ModelExecutor may be either a model's input or output connector.
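The Concurrency contract described in the interface comments can be sketched as follows. `boundedExecutor` and its two limits are hypothetical. Rejecting negative values is an extra assumption that the comments do not mention.

```go
package main

import "fmt"

// boundedExecutor is a hypothetical executor with a default and a maximum
// number of concurrent Execute calls.
type boundedExecutor struct {
	defaultConcurrency int
	maxConcurrency     int
}

// Concurrency follows the documented contract: 0 yields the recommended
// default, and a value that is too high is rejected with false.
func (e boundedExecutor) Concurrency(desired int) (int, bool) {
	if desired == 0 {
		return e.defaultConcurrency, true
	}
	if desired < 0 || desired > e.maxConcurrency {
		return 0, false
	}
	return desired, true
}

func main() {
	e := boundedExecutor{defaultConcurrency: 2, maxConcurrency: 8}
	fmt.Println(e.Concurrency(0))
	fmt.Println(e.Concurrency(4))
	fmt.Println(e.Concurrency(100))
}
```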

type ModelExecutorOptions added in v0.45.0

type ModelExecutorOptions struct {
	// Env contains contextual info about the model's instance.
	Env *ModelEnv
	// ModelName is the name of the model.
	ModelName string
	// InputHandle is the handle of the model's input connector.
	InputHandle Handle
	// InputConnector is the name of the model's input connector.
	InputConnector string
	// PreliminaryInputProperties are the preliminary properties of the model's input connector.
	// It may not always be available and may contain templating variables that have not yet been resolved.
	PreliminaryInputProperties map[string]any
	// OutputHandle is the handle of the model's output connector.
	OutputHandle Handle
	// OutputConnector is the name of the model's output connector.
	OutputConnector string
	// PreliminaryOutputProperties are the preliminary properties of the model's output connector.
	// It may not always be available and may contain templating variables that have not yet been resolved.
	PreliminaryOutputProperties map[string]any
}

ModelExecutorOptions are options passed when acquiring a ModelExecutor.

type ModelManager added in v0.45.0

type ModelManager interface {
	// Rename is called when a model is renamed, giving the ModelManager a chance to update state derived from the model's name (such as a table name).
	Rename(ctx context.Context, res *ModelResult, newName string, env *ModelEnv) (*ModelResult, error)

	// Exists returns whether the result still exists in the connector (for integrity checks).
	Exists(ctx context.Context, res *ModelResult) (bool, error)

	// Delete removes the result from the connector.
	Delete(ctx context.Context, res *ModelResult) error

	// MergePartitionResults merges two results produced by concurrent incremental partition runs.
	MergePartitionResults(a, b *ModelResult) (*ModelResult, error)
}

ModelManager manages model results returned by ModelExecutor. Unlike ModelExecutor, the result connector will always be used as the ModelManager.

type ModelPartition added in v0.51.0

type ModelPartition struct {
	// Key is a unique identifier for the partition. It should be a hash of DataJSON.
	Key string
	// DataJSON is the serialized parameters of the partition.
	DataJSON []byte
	// Index is used to order the execution of partitions.
	// Since it's just a guide and execution order usually is not critical,
	// it's okay if it's not unique or not always correct (e.g. for incrementally computed partitions).
	Index int
	// Watermark represents the time when the underlying data that the partition references was last updated.
	// If a partition's watermark advances, we automatically schedule it for re-execution.
	Watermark *time.Time
	// ExecutedOn is the time when the partition was last executed. If it is nil, the partition is considered pending.
	ExecutedOn *time.Time
	// Error is the last error that occurred when executing the partition.
	Error string
	// Elapsed is the duration of the last execution of the partition.
	Elapsed time.Duration
}

ModelPartition represents a single executable unit of a model. Partitions are an advanced feature that enables splitting and parallelizing execution of a model.
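The Key field is documented as a hash of DataJSON, so that identical partition parameters map to the same key. The sketch below derives such a key with SHA-256. The choice of hash function and hex encoding is an assumption, and `partitionKey` is a local helper rather than part of the package.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// partitionKey derives a deterministic key from a partition's serialized
// parameters. SHA-256 is an assumption; the documentation only says "a hash".
func partitionKey(dataJSON []byte) string {
	sum := sha256.Sum256(dataJSON)
	return hex.EncodeToString(sum[:])
}

func main() {
	a := partitionKey([]byte(`{"day":"2025-01-01"}`))
	b := partitionKey([]byte(`{"day":"2025-01-02"}`))
	fmt.Println(len(a), a != b)
}
```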

type ModelResult added in v0.45.0

type ModelResult struct {
	Connector  string
	Properties map[string]any
	Table      string
}

ModelResult contains metadata about the result of a model execution.

type Notifier added in v0.43.0

type Notifier interface {
	SendAlertStatus(s *AlertStatus) error
	SendScheduledReport(s *ScheduledReport) error
}

Notifier sends notifications.

type OLAPStore

type OLAPStore interface {
	Dialect() Dialect
	WithConnection(ctx context.Context, priority int, longRunning bool, fn WithConnectionFunc) error
	Exec(ctx context.Context, stmt *Statement) error
	Execute(ctx context.Context, stmt *Statement) (*Result, error)
	InformationSchema() InformationSchema

	CreateTableAsSelect(ctx context.Context, name, sql string, opts *CreateTableOptions) error
	InsertTableAsSelect(ctx context.Context, name, sql string, opts *InsertTableOptions) error
	DropTable(ctx context.Context, name string) error
	RenameTable(ctx context.Context, name, newName string) error
	AddTableColumn(ctx context.Context, tableName, columnName string, typ string) error
	AlterTableColumn(ctx context.Context, tableName, columnName string, newType string) error

	MayBeScaledToZero(ctx context.Context) bool
}

OLAPStore is implemented by drivers that are capable of storing, transforming and serving analytical queries. NOTE: The CRUD APIs are not safe to call within `WithConnection`.

type ObjectStore added in v0.30.0

type ObjectStore interface {
	// ListObjects returns the paths that match the given properties.
	// It resolves globs with support for all patterns supported by the doublestar package (notably "**").
	ListObjects(ctx context.Context, props map[string]any) ([]ObjectStoreEntry, error)
	// DownloadFiles provides an iterator for downloading and consuming files
	DownloadFiles(ctx context.Context, props map[string]any) (FileIterator, error)
}

type ObjectStoreEntry added in v0.48.0

type ObjectStoreEntry struct {
	Path      string
	IsDir     bool
	UpdatedOn time.Time
}

ObjectStoreEntry represents a file listing in an object store.
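As an illustration of how a listing might be consumed, the sketch below filters entries down to files changed after a cutoff. It uses a local copy of the struct. The helpers `filesUpdatedSince` and `sampleListing`, and the sample paths, are made up for this example and do not come from the package.

```go
package main

import (
	"fmt"
	"time"
)

// ObjectStoreEntry is a local copy of the documented struct.
type ObjectStoreEntry struct {
	Path      string
	IsDir     bool
	UpdatedOn time.Time
}

// filesUpdatedSince (hypothetical) returns the paths of non-directory
// entries whose UpdatedOn is strictly after since.
func filesUpdatedSince(entries []ObjectStoreEntry, since time.Time) []string {
	var paths []string
	for _, e := range entries {
		if e.IsDir || !e.UpdatedOn.After(since) {
			continue
		}
		paths = append(paths, e.Path)
	}
	return paths
}

// sampleListing returns fixed, made-up entries and a cutoff time.
func sampleListing() ([]ObjectStoreEntry, time.Time) {
	cutoff := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)
	return []ObjectStoreEntry{
		{Path: "data/", IsDir: true, UpdatedOn: cutoff.Add(48 * time.Hour)},
		{Path: "data/old.parquet", UpdatedOn: cutoff.Add(-24 * time.Hour)},
		{Path: "data/new.parquet", UpdatedOn: cutoff.Add(24 * time.Hour)},
	}, cutoff
}

func main() {
	entries, cutoff := sampleListing()
	fmt.Println(filesUpdatedSince(entries, cutoff))
}
```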

type ObjectStoreModelOutputProperties added in v0.48.0

type ObjectStoreModelOutputProperties struct {
	Path   string     `mapstructure:"path"`
	Format FileFormat `mapstructure:"format"`
}

type ObjectStoreModelResultProperties added in v0.48.0

type ObjectStoreModelResultProperties struct {
	Path   string `mapstructure:"path"`
	Format string `mapstructure:"format"`
}

type ObjectType added in v0.16.0

type ObjectType int

Constants representing the kinds of catalog objects.

const (
	ObjectTypeUnspecified ObjectType = 0
	ObjectTypeTable       ObjectType = 1
	ObjectTypeSource      ObjectType = 2
	ObjectTypeModel       ObjectType = 3
	ObjectTypeMetricsView ObjectType = 4
)

type PermissionDeniedError added in v0.30.0

type PermissionDeniedError struct {
	// contains filtered or unexported fields
}

PermissionDeniedError is returned when a driver cannot access some data due to insufficient permissions.

func (*PermissionDeniedError) Error added in v0.30.0

func (e *PermissionDeniedError) Error() string
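Since the type is used through a pointer receiver, callers would typically detect it with `errors.As`, including when it has been wrapped. The sketch below uses a local stand-in type. The `msg` field and the `listObjects` helper are assumptions for illustration, because the real fields are unexported.

```go
package main

import (
	"errors"
	"fmt"
)

// PermissionDeniedError is a local stand-in; the msg field is an assumption.
type PermissionDeniedError struct{ msg string }

func (e *PermissionDeniedError) Error() string { return e.msg }

// isPermissionDenied reports whether err, or any error it wraps,
// is a *PermissionDeniedError.
func isPermissionDenied(err error) bool {
	var target *PermissionDeniedError
	return errors.As(err, &target)
}

// listObjects (hypothetical) fails with a wrapped permission error
// when allowed is false.
func listObjects(allowed bool) error {
	if !allowed {
		return fmt.Errorf("listing objects: %w", &PermissionDeniedError{msg: "access denied"})
	}
	return nil
}

func main() {
	fmt.Println(isPermissionDenied(listObjects(false)))
	fmt.Println(isPermissionDenied(listObjects(true)))
}
```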

type ProgressUnit added in v0.30.0

type ProgressUnit int

const (
	ProgressUnitByte ProgressUnit = iota
	ProgressUnitFile
	ProgressUnitRecord
)
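Because the constants are declared with iota, they take the values 0, 1 and 2 in declaration order. The sketch below redeclares them locally and adds a hypothetical `unitLabel` helper (not part of this package) to show how a progress reporter might render them.

```go
package main

import "fmt"

// Local redeclaration of the documented type and constants.
type ProgressUnit int

const (
	ProgressUnitByte ProgressUnit = iota
	ProgressUnitFile
	ProgressUnitRecord
)

// unitLabel (hypothetical) maps a unit to a human-readable plural label.
func unitLabel(u ProgressUnit) string {
	switch u {
	case ProgressUnitByte:
		return "bytes"
	case ProgressUnitFile:
		return "files"
	case ProgressUnitRecord:
		return "records"
	default:
		return "unknown"
	}
}

func main() {
	fmt.Printf("downloaded %d %s\n", 3, unitLabel(ProgressUnitFile))
}
```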

type PropertySpec added in v0.43.0

type PropertySpec struct {
	Key         string
	Type        PropertyType
	Required    bool
	DisplayName string
	Description string
	DocsURL     string
	Hint        string
	Default     string
	Placeholder string
	Secret      bool
	NoPrompt    bool
}

PropertySpec provides metadata about a single connector property.
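One plausible use of this metadata is validating user-supplied properties and masking secrets before logging. The sketch below uses a local struct with a subset of the fields. `missingRequired` and `redact` are hypothetical helpers, and the property keys are made up.

```go
package main

import "fmt"

// PropertySpec is a local stand-in with a subset of the documented fields.
type PropertySpec struct {
	Key      string
	Required bool
	Secret   bool
}

// missingRequired (hypothetical) returns the keys of required properties
// that are absent from props.
func missingRequired(specs []*PropertySpec, props map[string]any) []string {
	var missing []string
	for _, s := range specs {
		if _, ok := props[s.Key]; s.Required && !ok {
			missing = append(missing, s.Key)
		}
	}
	return missing
}

// redact (hypothetical) returns a copy of props with Secret values masked.
func redact(specs []*PropertySpec, props map[string]any) map[string]any {
	out := make(map[string]any, len(props))
	for k, v := range props {
		out[k] = v
	}
	for _, s := range specs {
		if _, ok := out[s.Key]; s.Secret && ok {
			out[s.Key] = "***"
		}
	}
	return out
}

func main() {
	specs := []*PropertySpec{
		{Key: "path", Required: true},
		{Key: "secret_key", Secret: true},
	}
	props := map[string]any{"secret_key": "abc123"}
	fmt.Println(missingRequired(specs, props), redact(specs, props))
}
```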

type PropertyType added in v0.43.0

type PropertyType int

PropertyType is an enum of types supported for connector properties.

const (
	UnspecifiedPropertyType PropertyType = iota
	NumberPropertyType
	BooleanPropertyType
	StringPropertyType
	FilePropertyType
	InformationalPropertyType
)

type RegistryStore

type RegistryStore interface {
	FindInstances(ctx context.Context) ([]*Instance, error)
	FindInstance(ctx context.Context, id string) (*Instance, error)
	CreateInstance(ctx context.Context, instance *Instance) error
	DeleteInstance(ctx context.Context, id string) error
	EditInstance(ctx context.Context, instance *Instance) error
}

RegistryStore is implemented by drivers capable of storing and looking up instances and repos.

type RepoObjectStat added in v0.15.0

type RepoObjectStat struct {
	LastUpdated time.Time
	IsDir       bool
}

type RepoStore

type RepoStore interface {
	Driver() string
	// Root returns the directory where artifacts are stored.
	Root(ctx context.Context) (string, error)
	CommitHash(ctx context.Context) (string, error)
	ListRecursive(ctx context.Context, glob string, skipDirs bool) ([]DirEntry, error)
	Get(ctx context.Context, path string) (string, error)
	FileHash(ctx context.Context, paths []string) (string, error)
	Stat(ctx context.Context, path string) (*RepoObjectStat, error)
	Put(ctx context.Context, path string, reader io.Reader) error
	MakeDir(ctx context.Context, path string) error
	Rename(ctx context.Context, fromPath string, toPath string) error
	Delete(ctx context.Context, path string, force bool) error
	Sync(ctx context.Context) error
	Watch(ctx context.Context, cb WatchCallback) error
}

RepoStore is implemented by drivers capable of storing code artifacts. It mirrors a file system, but may be virtualized by a database for non-local deployments.

type ReportMetadata added in v0.37.0

type ReportMetadata struct {
	BaseURLs      ReportURLs
	RecipientURLs map[string]ReportURLs
}

type ReportURLs added in v0.51.0

type ReportURLs struct {
	OpenURL   string
	ExportURL string
	EditURL   string
}

type Resource added in v0.33.0

type Resource struct {
	Kind string
	Name string
	Data []byte
}

Resource is an entry in a catalog store.

type Result added in v0.15.0

type Result struct {
	*sqlx.Rows
	Schema *runtimev1.StructType
	// contains filtered or unexported fields
}

Result wraps the results of a query.

func (*Result) Close added in v0.18.0

func (r *Result) Close() error

Close wraps rows.Close and calls the Result's cleanup function (if it is set). Close should be idempotent.

func (*Result) Err added in v0.47.0

func (r *Result) Err() error

Err returns the error of the underlying rows.

func (*Result) Next added in v0.47.0

func (r *Result) Next() bool

Next wraps rows.Next and enforces the cap set by SetCap.

func (*Result) SetCap added in v0.47.0

func (r *Result) SetCap(n int64)

SetCap caps the number of rows to return. If the number is exceeded, an error is returned.
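The interplay between SetCap, Next and Err can be sketched with an in-memory iterator. This is not the package's implementation: `cappedRows` and `errCapExceeded` are hypothetical names, and treating a cap of zero as "no cap" is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// errCapExceeded is a hypothetical sentinel error for this sketch.
var errCapExceeded = errors.New("result cap exceeded")

// cappedRows is an in-memory stand-in for a row iterator with a cap.
type cappedRows struct {
	rows []string
	pos  int
	cap  int64 // 0 means no cap (an assumption of this sketch)
	seen int64
	err  error
}

// SetCap sets the maximum number of rows that may be returned.
func (r *cappedRows) SetCap(n int64) { r.cap = n }

// Next advances to the next row. It stops and records an error once
// more rows than the cap have been requested.
func (r *cappedRows) Next() bool {
	if r.err != nil || r.pos >= len(r.rows) {
		return false
	}
	r.seen++
	if r.cap > 0 && r.seen > r.cap {
		r.err = errCapExceeded
		return false
	}
	r.pos++
	return true
}

// Err returns the error that stopped iteration, if any.
func (r *cappedRows) Err() error { return r.err }

// drain iterates rows with the given cap and returns the row count and error.
func drain(rows []string, limit int64) (int, error) {
	r := &cappedRows{rows: rows}
	r.SetCap(limit)
	n := 0
	for r.Next() {
		n++
	}
	return n, r.Err()
}

func main() {
	fmt.Println(drain([]string{"a", "b", "c"}, 2))
}
```

As with `database/sql` rows, the caller needs to check Err after the loop, since Next returning false does not distinguish between exhaustion and a cap violation.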

func (*Result) SetCleanupFunc added in v0.18.0

func (r *Result) SetCleanupFunc(fn func() error)

SetCleanupFunc sets a function, which will be called when the Result is closed.
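One way to get the idempotent Close behaviour described above is to guard the cleanup with `sync.Once`. The sketch below is an assumption about how that could be done, not the package's actual code. The `result` type is a local stand-in that omits the underlying rows.

```go
package main

import (
	"fmt"
	"sync"
)

// result is a local stand-in for Result without the embedded rows.
type result struct {
	once    sync.Once
	cleanup func() error
	err     error
}

// SetCleanupFunc registers a function to run when the result is closed.
func (r *result) SetCleanupFunc(fn func() error) { r.cleanup = fn }

// Close runs the cleanup function at most once and returns its error
// on every call, so repeated calls are safe.
func (r *result) Close() error {
	r.once.Do(func() {
		// A real implementation would close the underlying rows here first.
		if r.cleanup != nil {
			r.err = r.cleanup()
		}
	})
	return r.err
}

func main() {
	calls := 0
	r := &result{}
	r.SetCleanupFunc(func() error { calls++; return nil })
	_ = r.Close()
	_ = r.Close()
	fmt.Println("cleanup calls:", calls)
}
```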

type ScheduledReport added in v0.43.0

type ScheduledReport struct {
	DisplayName    string
	ReportTime     time.Time
	DownloadFormat string
	OpenLink       string
	DownloadLink   string
	EditLink       string
}

type Spec added in v0.30.0

type Spec struct {
	DisplayName           string
	Description           string
	DocsURL               string
	ConfigProperties      []*PropertySpec
	SourceProperties      []*PropertySpec
	ImplementsRegistry    bool
	ImplementsCatalog     bool
	ImplementsRepo        bool
	ImplementsAdmin       bool
	ImplementsAI          bool
	ImplementsSQLStore    bool
	ImplementsOLAP        bool
	ImplementsObjectStore bool
	ImplementsFileStore   bool
	ImplementsNotifier    bool
	ImplementsWarehouse   bool
}

Spec provides metadata about a connector and the properties it supports.

type Statement

type Statement struct {
	Query            string
	Args             []any
	DryRun           bool
	Priority         int
	LongRunning      bool
	ExecutionTimeout time.Duration
}

Statement wraps a query to execute against an OLAP driver.

type Table

type Table struct {
	Database                string
	DatabaseSchema          string
	IsDefaultDatabase       bool
	IsDefaultDatabaseSchema bool
	Name                    string
	View                    bool
	Schema                  *runtimev1.StructType
	UnsupportedCols         map[string]string
}

Table represents a table in an information schema.

type TransferOptions added in v0.34.1

type TransferOptions struct {
	AllowHostAccess  bool
	RepoRoot         string
	AcquireConnector func(string) (Handle, func(), error)
}

TransferOptions provides execution context for Transporter.Transfer.

type Transporter added in v0.30.0

type Transporter interface {
	Transfer(ctx context.Context, srcProps, sinkProps map[string]any, opts *TransferOptions) error
}

Transporter implements logic for moving data between two connectors (the actual connector objects are provided in AsTransporter).

type Warehouse added in v0.48.0

type Warehouse interface {
	// QueryAsFiles downloads results into files and returns an iterator to iterate over them
	QueryAsFiles(ctx context.Context, props map[string]any) (FileIterator, error)
}

type WatchCallback added in v0.30.0

type WatchCallback func(event []WatchEvent)

type WatchEvent added in v0.30.0

type WatchEvent struct {
	Type runtimev1.FileEvent
	Path string
	Dir  bool
}
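A WatchCallback receives events in batches. The sketch below shows a callback that keeps a set of known file paths up to date. `FileEvent` and its constants are local stand-ins for `runtimev1.FileEvent` with assumed names, and `newChangeTracker` is a hypothetical helper.

```go
package main

import "fmt"

// FileEvent is a local stand-in for runtimev1.FileEvent (names assumed).
type FileEvent int

const (
	FileEventWrite FileEvent = iota + 1
	FileEventDelete
)

// WatchEvent and WatchCallback mirror the documented shapes.
type WatchEvent struct {
	Type FileEvent
	Path string
	Dir  bool
}

type WatchCallback func(events []WatchEvent)

// newChangeTracker (hypothetical) returns a callback and the set of file
// paths it maintains. Directory events are ignored.
func newChangeTracker() (WatchCallback, map[string]bool) {
	files := map[string]bool{}
	cb := func(events []WatchEvent) {
		for _, e := range events {
			if e.Dir {
				continue
			}
			switch e.Type {
			case FileEventWrite:
				files[e.Path] = true
			case FileEventDelete:
				delete(files, e.Path)
			}
		}
	}
	return cb, files
}

func main() {
	cb, files := newChangeTracker()
	cb([]WatchEvent{{Type: FileEventWrite, Path: "models/a.sql"}})
	cb([]WatchEvent{{Type: FileEventDelete, Path: "models/a.sql"}, {Type: FileEventWrite, Path: "rill.yaml"}})
	fmt.Println(files)
}
```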

type WithConnectionFunc added in v0.18.0

type WithConnectionFunc func(wrappedCtx context.Context, ensuredCtx context.Context, conn *sql.Conn) error

WithConnectionFunc is a callback function that provides a context to be used in further OLAP store calls to enforce affinity to a single connection. It also provides a pointer to the underlying database/sql connection. It is called with two contexts: wrappedCtx wraps the input context (including cancellation), and ensuredCtx wraps a background context (so it can never be cancelled).
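The difference between the two contexts can be illustrated with a simplified sketch. `connFunc` drops the `*sql.Conn` argument, and `withConnection` and `demo` are hypothetical functions that only model the cancellation behaviour described above, not the connection handling of a real driver.

```go
package main

import (
	"context"
	"fmt"
)

// connFunc is a simplified stand-in for WithConnectionFunc (no *sql.Conn).
type connFunc func(wrappedCtx, ensuredCtx context.Context) error

// withConnection (hypothetical) derives wrappedCtx from the caller's context,
// so it inherits cancellation, and ensuredCtx from a background context,
// so it does not.
func withConnection(ctx context.Context, fn connFunc) error {
	wrappedCtx, cancel := context.WithCancel(ctx)
	defer cancel()
	ensuredCtx := context.Background()
	return fn(wrappedCtx, ensuredCtx)
}

// demo cancels the caller's context before invoking the callback and
// reports which of the two contexts observed the cancellation.
func demo() (wrappedCancelled, ensuredCancelled bool) {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	_ = withConnection(ctx, func(wrappedCtx, ensuredCtx context.Context) error {
		wrappedCancelled = wrappedCtx.Err() != nil
		ensuredCancelled = ensuredCtx.Err() != nil
		return nil
	})
	return wrappedCancelled, ensuredCancelled
}

func main() {
	fmt.Println(demo())
}
```

A callback would presumably use wrappedCtx for the main work and reserve ensuredCtx for cleanup that must run even after the caller has given up, such as dropping a temporary table.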
