drivers

package
v0.42.2
Published: Mar 25, 2024 License: Apache-2.0 Imports: 16 Imported by: 0

README

Each driver in the package can implement one or more of the following interfaces:

The following interfaces are system interfaces and are present for all instances. Depending on whether the deployment is cloud or local, they can be shared across all instances or private to a single instance. Global connectors are passed as runtime configs. Instance-specific connectors are set on the instance when it is created.

  • OLAPStore for storing data and running analytical queries
  • CatalogStore for storing sources, models and metrics views, including metadata like last refresh time
  • RepoStore for storing code artifacts (this is essentially a file system shim)
  • RegistryStore for tracking instances (DSNs for OLAPs and repos, instance variables, etc.)

The following interfaces are only available as source connectors. These are always instance-specific connectors.

  • ObjectStore for downloading files from remote object stores such as S3, GCS, etc.
  • SQLStore for running arbitrary SQL queries against data warehouses such as BigQuery. Caution: not to be confused with Postgres, DuckDB, etc.
  • FileStore for storing paths to local files.

Special interfaces. These are also instance-specific.

  • Transporter for transferring data from one infrastructure to another.

Documentation

Constants

This section is empty.

Variables

var Connectors = make(map[string]Driver)

Connectors tracks all registered connector drivers.

var Drivers = make(map[string]Driver)

Drivers is a registry of drivers.

var ErrDropNotSupported = errors.New("driver: drop not supported")

ErrDropNotSupported indicates the driver doesn't support dropping its state.

var ErrFileAlreadyExists = errors.New("file already exists")
var ErrInconsistentControllerVersion = errors.New("controller: inconsistent version")

ErrInconsistentControllerVersion is returned from Controller when an unexpected controller version is observed in the DB. An unexpected controller version will only be observed if multiple controllers are running simultaneously (split brain).

var ErrIteratorDone = errors.New("empty iterator")
var ErrNotFound = errors.New("driver: not found")

ErrNotFound indicates the resource wasn't found.

var ErrNotImplemented = errors.New("driver: not implemented")

ErrNotImplemented indicates the driver doesn't support the requested operation.

var ErrResourceAlreadyExists = errors.New("controller: resource already exists")

ErrResourceAlreadyExists is returned from catalog functions when attempting to create a resource that already exists.

var ErrResourceNotFound = errors.New("controller: resource not found")

ErrResourceNotFound is returned from catalog functions when a referenced resource does not exist.

var ErrStorageLimitExceeded = fmt.Errorf("connectors: exceeds storage limit")

ErrStorageLimitExceeded indicates the driver's storage limit was exceeded.

var ErrUnsupportedConnector = errors.New("drivers: connector not supported")

ErrUnsupportedConnector is returned from Ingest for unsupported connectors.

Functions

func Drop added in v0.27.0

func Drop(driver string, config map[string]any, logger *zap.Logger) error

Drop tears down a store. Drivers that do not support it return ErrDropNotSupported.
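
A caller will usually want to distinguish "this driver cannot drop its state" from a genuine failure. The sketch below shows one way to do that with errors.Is. It does not import the real package: errDropNotSupported, dropStore, tearDown and the driver names "store_a" and "store_b" are hypothetical stand-ins made up for this example.

```go
package main

import (
	"errors"
	"fmt"
)

// errDropNotSupported is a local stand-in for the package's ErrDropNotSupported.
var errDropNotSupported = errors.New("driver: drop not supported")

// dropStore is a hypothetical stand-in for Drop. For illustration it pretends
// that only the made-up driver "store_a" supports dropping its state.
func dropStore(driver string) error {
	if driver != "store_a" {
		return fmt.Errorf("drop %q: %w", driver, errDropNotSupported)
	}
	return nil
}

// tearDown treats "drop not supported" as a non-fatal outcome and reports
// whether any state was actually dropped.
func tearDown(driver string) (bool, error) {
	err := dropStore(driver)
	switch {
	case err == nil:
		return true, nil
	case errors.Is(err, errDropNotSupported):
		return false, nil
	default:
		return false, err
	}
}

func main() {
	for _, name := range []string{"store_a", "store_b"} {
		dropped, err := tearDown(name)
		fmt.Printf("%s: dropped=%v err=%v\n", name, dropped, err)
	}
}
```

Using errors.Is rather than == keeps the check working even when a driver wraps the sentinel with extra context.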

func NewPermissionDeniedError added in v0.30.0

func NewPermissionDeniedError(msg string) error

func RecordDownloadMetrics added in v0.30.0

func RecordDownloadMetrics(ctx context.Context, m *DownloadMetrics)

func Register

func Register(name string, driver Driver)

Register registers a new driver.
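
The registry pattern behind Register and the Drivers map can be sketched as follows. This is an illustration with local stand-in types, not the package's code: the one-method Driver interface, register, lookup and fakeDriver are hypothetical, and panicking on a duplicate name is an assumption about how such a registry might guard against clashes.

```go
package main

import "fmt"

// Driver is a local stand-in for the package's Driver interface,
// reduced to one hypothetical method to keep the sketch small.
type Driver interface {
	Name() string
}

// registry mirrors the shape of the Drivers variable: name -> Driver.
var registry = make(map[string]Driver)

// register adds a driver under a name. Rejecting duplicates with a panic
// is an assumption made for this sketch.
func register(name string, d Driver) {
	if _, exists := registry[name]; exists {
		panic(fmt.Sprintf("driver %q already registered", name))
	}
	registry[name] = d
}

// lookup returns the driver registered under name, if any.
func lookup(name string) (Driver, bool) {
	d, ok := registry[name]
	return d, ok
}

// fakeDriver is a hypothetical driver used only for illustration.
type fakeDriver struct{ name string }

func (f fakeDriver) Name() string { return f.name }

func main() {
	register("fake", fakeDriver{name: "fake"})
	if d, ok := lookup("fake"); ok {
		fmt.Println("found driver:", d.Name())
	}
	_, ok := lookup("missing")
	fmt.Println("missing registered:", ok)
}
```

In a real driver package, the call to the registration function would typically live in an init function so that importing the driver is enough to make it available.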

func RegisterAsConnector added in v0.30.0

func RegisterAsConnector(name string, driver Driver)

RegisterAsConnector tracks a connector driver.

Types

type AIService added in v0.41.0

type AIService interface {
	Complete(ctx context.Context, msgs []*CompletionMessage) (*CompletionMessage, error)
}

type AdminService added in v0.37.0

type AdminService interface {
	GetReportMetadata(ctx context.Context, reportName string, annotations map[string]string, executionTime time.Time) (*ReportMetadata, error)
	GetAlertMetadata(ctx context.Context, alertName string, annotations map[string]string, queryForUserID, queryForUserEmail string) (*AlertMetadata, error)
}

type AlertMetadata added in v0.41.0

type AlertMetadata struct {
	OpenURL            string
	EditURL            string
	QueryForAttributes map[string]any
}

type CatalogStore

type CatalogStore interface {
	NextControllerVersion(ctx context.Context) (int64, error)
	CheckControllerVersion(ctx context.Context, v int64) error
	FindResources(ctx context.Context) ([]Resource, error)
	CreateResource(ctx context.Context, v int64, r Resource) error
	UpdateResource(ctx context.Context, v int64, r Resource) error
	DeleteResource(ctx context.Context, v int64, k, n string) error
	DeleteResources(ctx context.Context) error
}

CatalogStore is implemented by drivers capable of storing catalog info for a specific instance. Implementations should treat resource kinds as case sensitive, but resource names as case insensitive.
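
The case rules above can be enforced by normalizing the lookup key. The following is a minimal in-memory sketch under stated assumptions: memCatalog, resourceKey, create and remove are hypothetical names, the context and controller-version arguments of the real interface are omitted, and lowercasing is just one way to make names case insensitive.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Local stand-ins for the package's sentinel errors and Resource type.
var (
	errResourceAlreadyExists = errors.New("controller: resource already exists")
	errResourceNotFound      = errors.New("controller: resource not found")
)

type resource struct {
	Kind string
	Name string
	Data []byte
}

// resourceKey keeps the kind verbatim (case sensitive) and stores the name
// lowercased (case insensitive).
type resourceKey struct {
	kind string
	name string
}

func keyFor(kind, name string) resourceKey {
	return resourceKey{kind: kind, name: strings.ToLower(name)}
}

// memCatalog is a hypothetical in-memory catalog used only for illustration.
type memCatalog struct {
	resources map[resourceKey]resource
}

func newMemCatalog() *memCatalog {
	return &memCatalog{resources: make(map[resourceKey]resource)}
}

// create fails if a resource with the same kind and (case-folded) name exists.
func (c *memCatalog) create(r resource) error {
	k := keyFor(r.Kind, r.Name)
	if _, ok := c.resources[k]; ok {
		return errResourceAlreadyExists
	}
	c.resources[k] = r
	return nil
}

// remove fails if no matching resource exists.
func (c *memCatalog) remove(kind, name string) error {
	k := keyFor(kind, name)
	if _, ok := c.resources[k]; !ok {
		return errResourceNotFound
	}
	delete(c.resources, k)
	return nil
}

func main() {
	c := newMemCatalog()
	fmt.Println(c.create(resource{Kind: "Model", Name: "Orders"}))
	fmt.Println(c.create(resource{Kind: "Model", Name: "orders"}))
	fmt.Println(c.create(resource{Kind: "model", Name: "orders"}))
	fmt.Println(c.remove("Model", "ORDERS"))
}
```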

type CompletionMessage added in v0.41.0

type CompletionMessage struct {
	Role string
	Data string
}

type Dialect added in v0.16.0

type Dialect int

Dialect enumerates OLAP query languages.

const (
	DialectUnspecified Dialect = iota
	DialectDuckDB
	DialectDruid
	DialectClickHouse
)

func (Dialect) EscapeIdentifier added in v0.42.0

func (d Dialect) EscapeIdentifier(ident string) string

EscapeIdentifier returns an escaped SQL identifier in the dialect.
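
The exact escaping rules applied per dialect are not shown on this page. As a rough illustration of what identifier escaping usually involves, the sketch below applies the common SQL convention of wrapping the identifier in double quotes and doubling any embedded double quotes. quoteIdent is a hypothetical helper, not the package's implementation.

```go
package main

import (
	"fmt"
	"strings"
)

// quoteIdent wraps ident in double quotes and doubles embedded double quotes.
// This is the standard SQL convention; whether each Dialect uses exactly this
// rule is an assumption.
func quoteIdent(ident string) string {
	return `"` + strings.ReplaceAll(ident, `"`, `""`) + `"`
}

func main() {
	fmt.Println(quoteIdent("my table"))
	fmt.Println(quoteIdent(`we"ird`))
}
```

Escaping identifiers this way matters whenever table or column names come from user input and are interpolated into a query string.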

func (Dialect) String added in v0.16.0

func (d Dialect) String() string

type DownloadMetrics added in v0.30.0

type DownloadMetrics struct {
	Connector string
	Ext       string
	Partial   bool
	Duration  time.Duration
	Size      int64
}

type Driver

type Driver interface {
	// Spec returns metadata about the driver, such as which configuration properties it supports.
	Spec() Spec

	// Open opens a new handle.
	Open(config map[string]any, shared bool, client *activity.Client, logger *zap.Logger) (Handle, error)

	// Drop removes all state in a handle. Drivers that do not support it should return ErrDropNotSupported.
	Drop(config map[string]any, logger *zap.Logger) error

	// HasAnonymousSourceAccess returns true if the driver can access the data identified by srcProps without any additional configuration.
	HasAnonymousSourceAccess(ctx context.Context, srcProps map[string]any, logger *zap.Logger) (bool, error)

	// TertiarySourceConnectors returns a list of drivers required to access the data identified by srcProps, excluding the driver itself.
	TertiarySourceConnectors(ctx context.Context, srcProps map[string]any, logger *zap.Logger) ([]string, error)
}

Driver represents an external service that Rill can connect to.

type FileIterator added in v0.30.0

type FileIterator interface {
	// Close cleans up and releases resources.
	Close() error
	// Next returns a list of files downloaded from external sources
	// and cleans up the files created by the previous batch.
	Next() ([]string, error)
	// Size returns the size of the data downloaded, in the given unit.
	// Returns 0, false if the size cannot be computed in the given unit.
	Size(unit ProgressUnit) (int64, bool)
	// Format returns general file format (json, csv, parquet, etc)
	// Returns an empty string if there is no general format
	Format() string
}

FileIterator provides a way to iteratively download files from external sources. Clients should call Close once they are done with the iterator to release any resources.
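
A typical consumption loop might look like the sketch below. It uses a reduced local interface (only Next and Close) and a hypothetical batchIterator. Signalling exhaustion with io.EOF is an assumption of this sketch; this page does not state which error Next returns when no files remain. Because Next cleans up the files of the previous batch, each batch is processed before the next call.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// fileIterator is a reduced local stand-in for FileIterator.
type fileIterator interface {
	Next() ([]string, error)
	Close() error
}

// batchIterator is a hypothetical iterator over pre-made batches of paths.
type batchIterator struct {
	batches [][]string
	closed  bool
}

// Next returns the next batch, or io.EOF when exhausted (an assumption).
func (b *batchIterator) Next() ([]string, error) {
	if len(b.batches) == 0 {
		return nil, io.EOF
	}
	batch := b.batches[0]
	b.batches = b.batches[1:]
	return batch, nil
}

func (b *batchIterator) Close() error {
	b.closed = true
	return nil
}

// drain processes every batch, always closes the iterator, and returns the
// number of files handed to process.
func drain(it fileIterator, process func(paths []string) error) (n int, err error) {
	defer func() {
		if cerr := it.Close(); err == nil {
			err = cerr
		}
	}()
	for {
		batch, nextErr := it.Next()
		if errors.Is(nextErr, io.EOF) {
			return n, nil
		}
		if nextErr != nil {
			return n, nextErr
		}
		if procErr := process(batch); procErr != nil {
			return n, procErr
		}
		n += len(batch)
	}
}

func main() {
	it := &batchIterator{batches: [][]string{{"a.csv", "b.csv"}, {"c.csv"}}}
	n, err := drain(it, func(paths []string) error {
		fmt.Println("processing", paths)
		return nil
	})
	fmt.Println(n, err, it.closed)
}
```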

type FileStore added in v0.30.0

type FileStore interface {
	// FilePaths returns local absolute paths where files are stored
	FilePaths(ctx context.Context, src map[string]any) ([]string, error)
}

type Handle added in v0.32.0

type Handle interface {
	// Driver name used to open the handle.
	Driver() string

	// Config used to open the handle.
	Config() map[string]any

	// Migrate prepares the handle for use. It will always be called before any of the As...() functions.
	Migrate(ctx context.Context) error

	// MigrationStatus returns the handle's current and desired migration version (if applicable).
	MigrationStatus(ctx context.Context) (current int, desired int, err error)

	// Close closes the handle.
	Close() error

	// AsRegistry returns a RegistryStore if the handle can serve as such, otherwise returns false.
	// A registry is responsible for tracking runtime metadata, namely instances and their configuration.
	AsRegistry() (RegistryStore, bool)

	// AsCatalogStore returns a CatalogStore if the handle can serve as such, otherwise returns false.
	// A catalog stores the state of an instance's resources (such as sources, models, metrics views, alerts, etc.)
	AsCatalogStore(instanceID string) (CatalogStore, bool)

	// AsRepoStore returns a RepoStore if the handle can serve as such, otherwise returns false.
	// A repo stores an instance's file artifacts (mostly YAML and SQL files).
	AsRepoStore(instanceID string) (RepoStore, bool)

	// AsAdmin returns an AdminService if the handle can serve as such, otherwise returns false.
	// An admin service enables the runtime to interact with the control plane that deployed it.
	AsAdmin(instanceID string) (AdminService, bool)

	// AsAI returns an AIService if the driver can serve as such, otherwise returns false.
	// An AI service enables an instance to request prompt-based text inference.
	AsAI(instanceID string) (AIService, bool)

	// AsSQLStore returns a SQLStore if the driver can serve as such, otherwise returns false.
	// A SQL store represents a service that can execute SQL statements and return the resulting rows.
	AsSQLStore() (SQLStore, bool)

	// AsOLAP returns an OLAPStore if the driver can serve as such, otherwise returns false.
	// An OLAP store is used to serve interactive, low-latency, analytical queries.
	// NOTE: We should consider merging the OLAPStore and SQLStore interfaces.
	AsOLAP(instanceID string) (OLAPStore, bool)

	// AsObjectStore returns an ObjectStore if the driver can serve as such, otherwise returns false.
	// An object store can store, list and retrieve files on a remote server.
	AsObjectStore() (ObjectStore, bool)

	// AsFileStore returns a FileStore if the driver can serve as such, otherwise returns false.
	// A file store can store, list and retrieve local files.
	// NOTE: The file store can probably be merged with the ObjectStore interface.
	AsFileStore() (FileStore, bool)

	// AsTransporter returns a Transporter for moving data between two other handles. One of the input handles may be the Handle itself.
	// Examples: duckdb.AsTransporter(gcs, duckdb), beam.AsTransporter(gcs, s3).
	AsTransporter(from Handle, to Handle) (Transporter, bool)
}

Handle represents a connection to an external service, such as a database, object store, etc. It should implement one or more of the As...() functions.
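
The As...() functions follow the comma-ok idiom: a caller asks the handle for a capability and checks the boolean before using it. The sketch below illustrates this with a heavily reduced local interface. olapStore, objectStore, olapOnlyHandle and capabilities are hypothetical names, and the string-returning Dialect and Bucket methods are simplifications invented for the example.

```go
package main

import "fmt"

// Reduced, hypothetical stand-ins for OLAPStore and ObjectStore.
type olapStore interface{ Dialect() string }
type objectStore interface{ Bucket() string }

// handle is a reduced stand-in for Handle with two As...() functions.
type handle interface {
	AsOLAP(instanceID string) (olapStore, bool)
	AsObjectStore() (objectStore, bool)
}

type fakeOLAP struct{}

func (fakeOLAP) Dialect() string { return "example-dialect" }

// olapOnlyHandle can serve as an OLAP store but not as an object store.
type olapOnlyHandle struct{}

func (olapOnlyHandle) AsOLAP(string) (olapStore, bool)    { return fakeOLAP{}, true }
func (olapOnlyHandle) AsObjectStore() (objectStore, bool) { return nil, false }

// capabilities lists which of the two roles the handle can serve.
func capabilities(h handle, instanceID string) []string {
	var caps []string
	if _, ok := h.AsOLAP(instanceID); ok {
		caps = append(caps, "olap")
	}
	if _, ok := h.AsObjectStore(); ok {
		caps = append(caps, "object_store")
	}
	return caps
}

func main() {
	fmt.Println(capabilities(olapOnlyHandle{}, "default"))
}
```

Returning a boolean instead of an error keeps "this handle does not play that role" distinct from an actual failure.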

func Open

func Open(driver string, config map[string]any, shared bool, client *activity.Client, logger *zap.Logger) (Handle, error)

Open opens a new connection.

type InformationSchema

type InformationSchema interface {
	All(ctx context.Context) ([]*Table, error)
	Lookup(ctx context.Context, name string) (*Table, error)
}

InformationSchema contains information about existing tables in an OLAP driver. Table lookups should be case insensitive.

type IngestionSummary added in v0.24.0

type IngestionSummary struct {
	BytesIngested int64
}

IngestionSummary holds details about an ingestion.

type Instance

type Instance struct {
	// Identifier
	ID string
	// Environment is the environment that the instance represents
	Environment string
	// Driver name to connect to for OLAP
	OLAPConnector string
	// ProjectOLAPConnector is an override of OLAPConnector that may be set in rill.yaml.
	// NOTE: Hopefully we can merge this with OLAPConnector if/when we remove the ability to set OLAPConnector using flags.
	ProjectOLAPConnector string
	// Driver name for reading/editing code artifacts
	RepoConnector string
	// Driver name for the admin service managing the deployment (optional)
	AdminConnector string
	// Driver name for the AI service (optional)
	AIConnector string
	// Driver name for catalog
	CatalogConnector string
	// CreatedOn is when the instance was created
	CreatedOn time.Time `db:"created_on"`
	// UpdatedOn is when the instance was last updated in the registry
	UpdatedOn time.Time `db:"updated_on"`
	// Instance specific connectors
	Connectors []*runtimev1.Connector `db:"connectors"`
	// ProjectConnectors contains default connectors from rill.yaml
	ProjectConnectors []*runtimev1.Connector `db:"project_connectors"`
	// Variables contains user-provided variables
	Variables map[string]string `db:"variables"`
	// ProjectVariables contains default variables from rill.yaml
	// (NOTE: This can always be reproduced from rill.yaml, so it's really just a handy cache of the values.)
	ProjectVariables map[string]string `db:"project_variables"`
	// Annotations to enrich activity events (like usage tracking)
	Annotations map[string]string
	// EmbedCatalog tells the runtime to store the instance's catalog in its OLAP store instead
	// of in the runtime's metadata store. Currently only supported for the duckdb driver.
	EmbedCatalog bool `db:"embed_catalog"`
	// WatchRepo indicates whether to watch the repo for file changes and reconcile them automatically.
	WatchRepo bool `db:"watch_repo"`
	// StageChanges indicates whether to stage source and model changes before applying them.
	StageChanges bool `db:"stage_changes"`
	// ModelDefaultMaterialize indicates whether to materialize models by default.
	ModelDefaultMaterialize bool `db:"model_default_materialize"`
	// ModelMaterializeDelaySeconds adds a delay before materializing models.
	ModelMaterializeDelaySeconds uint32 `db:"model_materialize_delay_seconds"`
	// IgnoreInitialInvalidProjectError indicates whether to ignore an invalid project error when the instance is initially created.
	IgnoreInitialInvalidProjectError bool `db:"-"`
}

Instance represents a single data project, meaning one OLAP connection, one repo connection, and one catalog connection.

func (*Instance) ResolveOLAPConnector added in v0.42.0

func (i *Instance) ResolveOLAPConnector() string

ResolveOLAPConnector resolves the OLAP connector to default to for the instance.

func (*Instance) ResolveVariables added in v0.23.0

func (i *Instance) ResolveVariables() map[string]string

ResolveVariables returns the final resolved variables.
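
Based on the field comments on Instance, both resolvers plausibly combine a project-level value from rill.yaml with an instance-level one. The sketch below is a guess at that logic using a reduced local struct, not the package's implementation. In particular, letting user-provided Variables take precedence over ProjectVariables, and treating an empty ProjectOLAPConnector as "no override", are assumptions.

```go
package main

import "fmt"

// instance is a reduced local stand-in for Instance.
type instance struct {
	OLAPConnector        string
	ProjectOLAPConnector string
	Variables            map[string]string
	ProjectVariables     map[string]string
}

// resolveOLAPConnector prefers the project-level override when it is set.
func (i *instance) resolveOLAPConnector() string {
	if i.ProjectOLAPConnector != "" {
		return i.ProjectOLAPConnector
	}
	return i.OLAPConnector
}

// resolveVariables starts from the project defaults and lets user-provided
// variables overwrite them (assumed precedence).
func (i *instance) resolveVariables() map[string]string {
	out := make(map[string]string, len(i.ProjectVariables)+len(i.Variables))
	for k, v := range i.ProjectVariables {
		out[k] = v
	}
	for k, v := range i.Variables {
		out[k] = v
	}
	return out
}

func main() {
	inst := &instance{
		OLAPConnector:    "olap_a",
		ProjectVariables: map[string]string{"region": "eu", "limit": "10"},
		Variables:        map[string]string{"limit": "50"},
	}
	fmt.Println(inst.resolveOLAPConnector())
	fmt.Println(inst.resolveVariables())
}
```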

type NoOpProgress added in v0.30.0

type NoOpProgress struct{}

func (NoOpProgress) Observe added in v0.30.0

func (n NoOpProgress) Observe(val int64, unit ProgressUnit)

func (NoOpProgress) Target added in v0.30.0

func (n NoOpProgress) Target(val int64, unit ProgressUnit)

type OLAPStore

type OLAPStore interface {
	Dialect() Dialect
	WithConnection(ctx context.Context, priority int, longRunning, tx bool, fn WithConnectionFunc) error
	Exec(ctx context.Context, stmt *Statement) error
	Execute(ctx context.Context, stmt *Statement) (*Result, error)
	InformationSchema() InformationSchema
	EstimateSize() (int64, bool)

	CreateTableAsSelect(ctx context.Context, name string, view bool, sql string) error
	InsertTableAsSelect(ctx context.Context, name string, byName bool, sql string) error
	DropTable(ctx context.Context, name string, view bool) error
	// RenameTable is force rename
	RenameTable(ctx context.Context, name, newName string, view bool) error
	AddTableColumn(ctx context.Context, tableName, columnName string, typ string) error
	AlterTableColumn(ctx context.Context, tableName, columnName string, newType string) error
}

OLAPStore is implemented by drivers that are capable of storing, transforming and serving analytical queries. NOTE: The CRUD APIs are not safe to call from within WithConnection.

type ObjectStore added in v0.30.0

type ObjectStore interface {
	// DownloadFiles provides an iterator for downloading and consuming files
	DownloadFiles(ctx context.Context, src map[string]any) (FileIterator, error)
}

type ObjectType added in v0.16.0

type ObjectType int

Constants representing the kinds of catalog objects.

const (
	ObjectTypeUnspecified ObjectType = 0
	ObjectTypeTable       ObjectType = 1
	ObjectTypeSource      ObjectType = 2
	ObjectTypeModel       ObjectType = 3
	ObjectTypeMetricsView ObjectType = 4
)

type PermissionDeniedError added in v0.30.0

type PermissionDeniedError struct {
	// contains filtered or unexported fields
}

PermissionDeniedError is returned when a driver cannot access some data due to insufficient permissions.

func (*PermissionDeniedError) Error added in v0.30.0

func (e *PermissionDeniedError) Error() string
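
Since PermissionDeniedError is a struct type rather than a sentinel value, callers would normally detect it with errors.As. The sketch below shows the idea with a local stand-in type. permissionDeniedError, newPermissionDeniedError, fetch and isPermissionDenied are hypothetical names mirroring the shapes documented here.

```go
package main

import (
	"errors"
	"fmt"
)

// permissionDeniedError is a local stand-in for PermissionDeniedError.
type permissionDeniedError struct {
	msg string
}

func (e *permissionDeniedError) Error() string { return e.msg }

// newPermissionDeniedError mirrors the shape of NewPermissionDeniedError.
func newPermissionDeniedError(msg string) error {
	return &permissionDeniedError{msg: msg}
}

// fetch is a hypothetical operation that fails with a wrapped
// permission error when access is not allowed.
func fetch(allowed bool) error {
	if !allowed {
		return fmt.Errorf("fetch failed: %w", newPermissionDeniedError("access denied"))
	}
	return nil
}

// isPermissionDenied reports whether err is, or wraps, a permission error.
func isPermissionDenied(err error) bool {
	var pd *permissionDeniedError
	return errors.As(err, &pd)
}

func main() {
	fmt.Println(isPermissionDenied(fetch(false)))
	fmt.Println(isPermissionDenied(fetch(true)))
}
```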

type Progress added in v0.30.0

type Progress interface {
	Target(val int64, unit ProgressUnit)
	// Observe is used by caller to provide incremental updates
	Observe(val int64, unit ProgressUnit)
}

Progress is an interface for communicating progress information.
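
A simple implementation might accumulate observations per unit and compare them with the announced target. The following sketch uses local stand-in types. countingProgress and fraction are hypothetical, and it assumes that Target announces the expected total while Observe reports increments.

```go
package main

import "fmt"

// progressUnit is a local stand-in for ProgressUnit.
type progressUnit int

const (
	unitByte progressUnit = iota
	unitFile
	unitRecord
)

// countingProgress is a hypothetical Progress implementation that keeps
// per-unit targets and running totals.
type countingProgress struct {
	targets  map[progressUnit]int64
	observed map[progressUnit]int64
}

func newCountingProgress() *countingProgress {
	return &countingProgress{
		targets:  make(map[progressUnit]int64),
		observed: make(map[progressUnit]int64),
	}
}

// Target records the expected total for a unit (assumed meaning).
func (c *countingProgress) Target(val int64, unit progressUnit) {
	c.targets[unit] = val
}

// Observe adds an incremental update for a unit.
func (c *countingProgress) Observe(val int64, unit progressUnit) {
	c.observed[unit] += val
}

// fraction returns observed/target for a unit, or false if no positive
// target was announced for it.
func (c *countingProgress) fraction(unit progressUnit) (float64, bool) {
	target, ok := c.targets[unit]
	if !ok || target <= 0 {
		return 0, false
	}
	return float64(c.observed[unit]) / float64(target), true
}

func main() {
	p := newCountingProgress()
	p.Target(100, unitByte)
	p.Observe(25, unitByte)
	p.Observe(25, unitByte)
	fmt.Println(p.fraction(unitByte))
	fmt.Println(p.fraction(unitFile))
}
```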

type ProgressUnit added in v0.30.0

type ProgressUnit int

const (
	ProgressUnitByte ProgressUnit = iota
	ProgressUnitFile
	ProgressUnitRecord
)

type PropertySchema added in v0.30.0

type PropertySchema struct {
	Key         string
	Type        PropertySchemaType
	Required    bool
	DisplayName string
	Description string
	Placeholder string
	// Default can be different from placeholder in the sense that placeholder should not be used as default value.
	// If a default is set then it should also be used as a placeholder.
	Default       string
	Hint          string
	Href          string
	Secret        bool
	ValidateFunc  func(any interface{}) error
	TransformFunc func(any interface{}) interface{}
}

PropertySchema provides the schema for a property supported by a connector.

func (PropertySchema) ValidateType added in v0.30.0

func (ps PropertySchema) ValidateType(val any) bool

ValidateType checks that val has the correct type.
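
This page does not show which Go types each PropertySchemaType accepts. As an illustration only, a type check of this kind could be written with type switches as below. propertyType, its constants and validateType are local stand-ins, and the accepted Go types for each case are assumptions.

```go
package main

import "fmt"

// propertyType is a local stand-in for PropertySchemaType.
type propertyType int

const (
	unspecifiedProperty propertyType = iota
	stringProperty
	numberProperty
	booleanProperty
	informationalProperty
)

// validateType reports whether val has a Go type matching t.
// The mapping from property type to Go types is assumed for this sketch.
func validateType(t propertyType, val any) bool {
	switch t {
	case stringProperty:
		_, ok := val.(string)
		return ok
	case numberProperty:
		switch val.(type) {
		case int, int32, int64, float32, float64:
			return true
		}
		return false
	case booleanProperty:
		_, ok := val.(bool)
		return ok
	case informationalProperty:
		// Informational properties carry no value to validate (assumption).
		return true
	default:
		return false
	}
}

func main() {
	fmt.Println(validateType(stringProperty, "bucket-name"))
	fmt.Println(validateType(numberProperty, "42"))
	fmt.Println(validateType(booleanProperty, true))
}
```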

type PropertySchemaType added in v0.30.0

type PropertySchemaType int

PropertySchemaType is an enum of types supported for connector properties.

const (
	UnspecifiedPropertyType PropertySchemaType = iota
	StringPropertyType
	NumberPropertyType
	BooleanPropertyType
	InformationalPropertyType
)

type QueryOption added in v0.33.2

type QueryOption struct {
	// TotalLimitInBytes represents the max limit on the bytes that should be downloaded in a file
	TotalLimitInBytes int64
}

type RegistryStore

type RegistryStore interface {
	FindInstances(ctx context.Context) ([]*Instance, error)
	FindInstance(ctx context.Context, id string) (*Instance, error)
	CreateInstance(ctx context.Context, instance *Instance) error
	DeleteInstance(ctx context.Context, id string) error
	EditInstance(ctx context.Context, instance *Instance) error
}

RegistryStore is implemented by drivers capable of storing and looking up instances and repos.

type RepoObjectStat added in v0.15.0

type RepoObjectStat struct {
	LastUpdated time.Time
}

type RepoStore

type RepoStore interface {
	Driver() string
	// Root returns directory where artifacts are stored.
	Root() string
	CommitHash(ctx context.Context) (string, error)
	ListRecursive(ctx context.Context, glob string) ([]string, error)
	Get(ctx context.Context, path string) (string, error)
	Stat(ctx context.Context, path string) (*RepoObjectStat, error)
	Put(ctx context.Context, path string, reader io.Reader) error
	Rename(ctx context.Context, fromPath string, toPath string) error
	Delete(ctx context.Context, path string) error
	Sync(ctx context.Context) error
	Watch(ctx context.Context, cb WatchCallback) error
}

RepoStore is implemented by drivers capable of storing code artifacts. It mirrors a file system, but may be virtualized by a database for non-local deployments.

type ReportMetadata added in v0.37.0

type ReportMetadata struct {
	OpenURL   string
	ExportURL string
	EditURL   string
}

type Resource added in v0.33.0

type Resource struct {
	Kind string
	Name string
	Data []byte
}

Resource is an entry in a catalog store.

type Result added in v0.15.0

type Result struct {
	*sqlx.Rows
	Schema *runtimev1.StructType
	// contains filtered or unexported fields
}

Result wraps the results of a query.

func (*Result) Close added in v0.18.0

func (r *Result) Close() error

Close wraps rows.Close and calls the Result's cleanup function (if it is set). Close should be idempotent.

func (*Result) SetCleanupFunc added in v0.18.0

func (r *Result) SetCleanupFunc(fn func() error)

SetCleanupFunc sets a function, which will be called when the Result is closed.
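
The interplay between Close and the cleanup function can be sketched as follows. This is an illustration with local stand-ins, not the package's code: result wraps an io.Closer instead of *sqlx.Rows, fakeRows is hypothetical, and guarding idempotency with a closed flag is one possible way to satisfy "Close should be idempotent".

```go
package main

import (
	"fmt"
	"io"
)

// result is a reduced local stand-in for Result.
type result struct {
	rows      io.Closer
	cleanupFn func() error
	closed    bool
}

// SetCleanupFunc registers a function to run when the result is closed.
func (r *result) SetCleanupFunc(fn func() error) {
	r.cleanupFn = fn
}

// Close closes the rows and then runs the cleanup function, if set.
// Repeated calls are no-ops thanks to the closed flag (an assumption).
func (r *result) Close() error {
	if r.closed {
		return nil
	}
	r.closed = true
	err := r.rows.Close()
	if r.cleanupFn != nil {
		if cerr := r.cleanupFn(); err == nil {
			err = cerr
		}
	}
	return err
}

// fakeRows is a hypothetical row set that counts how often it is closed.
type fakeRows struct{ closes int }

func (f *fakeRows) Close() error {
	f.closes++
	return nil
}

func main() {
	rows := &fakeRows{}
	cleanups := 0
	res := &result{rows: rows}
	res.SetCleanupFunc(func() error {
		cleanups++ // for example, release a connection or semaphore here
		return nil
	})
	fmt.Println(res.Close(), res.Close())
	fmt.Println(rows.closes, cleanups)
}
```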

type RowIterator added in v0.31.0

type RowIterator interface {
	// Schema of the underlying data
	Schema(ctx context.Context) (*runtimev1.StructType, error)
	// Next fetches next row
	Next(ctx context.Context) ([]driver.Value, error)
	// Close closes the iterator and frees resources
	Close() error
	// Size returns total size of data downloaded in unit.
	// Returns 0,false if not able to compute size in given unit
	Size(unit ProgressUnit) (uint64, bool)
}

RowIterator iterates over the result of a SQL query.

type SQLStore added in v0.31.0

type SQLStore interface {
	// Query returns driver.RowIterator to iterate over results row by row
	Query(ctx context.Context, props map[string]any) (RowIterator, error)
	// QueryAsFiles downloads results into files and returns an iterator to iterate over them
	QueryAsFiles(ctx context.Context, props map[string]any, opt *QueryOption, p Progress) (FileIterator, error)
}

SQLStore is implemented by drivers capable of running SQL queries and returning an iterator to consume the results. In the future, results may also be produced in other formats such as Arrow. It may be worth renaming it to DataWarehouse to differentiate it from OLAP stores or Postgres.

type Spec added in v0.30.0

type Spec struct {
	DisplayName        string
	Description        string
	ServiceAccountDocs string
	SourceProperties   []PropertySchema
	ConfigProperties   []PropertySchema
	Help               string
}

Spec provides metadata about a connector and the properties it supports.

type Statement

type Statement struct {
	Query            string
	Args             []any
	DryRun           bool
	Priority         int
	LongRunning      bool
	ExecutionTimeout time.Duration
}

Statement wraps a query to execute against an OLAP driver.

type Table

type Table struct {
	Database       string
	DatabaseSchema string
	Name           string
	View           bool
	Schema         *runtimev1.StructType
}

Table represents a table in an information schema.

type TransferOptions added in v0.34.1

type TransferOptions struct {
	AllowHostAccess  bool
	RepoRoot         string
	Progress         Progress
	AcquireConnector func(string) (Handle, func(), error)
}

TransferOptions provides execution context for Transporter.Transfer.

type Transporter added in v0.30.0

type Transporter interface {
	Transfer(ctx context.Context, srcProps, sinkProps map[string]any, opts *TransferOptions) error
}

Transporter implements logic for moving data between two connectors (the actual connector objects are provided in AsTransporter).

type WatchCallback added in v0.30.0

type WatchCallback func(event []WatchEvent)

type WatchEvent added in v0.30.0

type WatchEvent struct {
	Type runtimev1.FileEvent
	Path string
	Dir  bool
}

type WithConnectionFunc added in v0.18.0

type WithConnectionFunc func(wrappedCtx context.Context, ensuredCtx context.Context, conn *sql.Conn) error

WithConnectionFunc is a callback function that provides a context to be used in further OLAP store calls to enforce affinity to a single connection. It also provides pointers to the actual database/sql and database/sql/driver connections. It's called with two contexts: wrappedCtx wraps the input context (including cancellation), and ensuredCtx wraps a background context (ensuring it can never be cancelled).
