runtime

package
v0.33.0
Published: Sep 1, 2023 | License: Apache-2.0 | Imports: 34 | Imported by: 0

README

runtime

The runtime is a data infrastructure proxy and orchestrator – our data plane. It connects to data infrastructure and is or will be responsible for transpiling queries, parsing code artifacts, reconciling infra state, implementing connectors, enforcing (row-based) access policies, scheduling tasks, triggering alerts, and much more.

It's designed as a modular component that can be embedded in local applications (as it is into Rill Developer) or deployed stand-alone in a cloud environment.

Code structure

The base directory contains a Runtime type that represents the lifecycle of the runtime. It ties together the sub-directories:

  • client contains a Go client library for connecting to a runtime server.
  • compilers contains logic for parsing Rill projects (incomplete, currently mostly implemented in services instead).
  • connectors contains connector implementations.
  • drivers contains interfaces and drivers for external data infrastructure that the runtime interfaces with (like DuckDB and Druid).
  • pkg contains utility libraries.
  • queries contains pre-defined analytical queries that the runtime can serve (used for profiling and dashboards).
  • server contains a server that implements the runtime's APIs.
  • services contains logic for building and reconciling Rill projects.
  • testruntime contains helper functions for initializing a test runtime with test data.

How to test and run

You can start a stand-alone runtime server using the CLI:

go run ./cli runtime start

Ping the server to verify it's running:

go run ./cli runtime ping --base-url http://localhost:9090

You can run all tests using:

go test ./runtime/...

See web-local/README.md for details on how to run the runtime in development together with the local frontend.

Configuration

The runtime server is configured using environment variables parsed in cli/cmd/runtime/start.go. All environment variables have reasonable defaults suitable for local development. The current defaults are:

RILL_RUNTIME_HTTP_PORT="8080"
RILL_RUNTIME_GRPC_PORT="9090"
RILL_RUNTIME_LOG_LEVEL="info"
RILL_RUNTIME_METRICS_EXPORTER="prometheus"
RILL_RUNTIME_TRACES_EXPORTER=""
RILL_RUNTIME_ALLOWED_ORIGINS="*"
RILL_RUNTIME_METASTORE_DRIVER="sqlite"
RILL_RUNTIME_METASTORE_URL=":memory:"
RILL_RUNTIME_AUTH_ENABLE="false"
RILL_RUNTIME_AUTH_ISSUER_URL=""
RILL_RUNTIME_AUTH_AUDIENCE_URL=""
RILL_RUNTIME_CONNECTION_CACHE_SIZE="100"
RILL_RUNTIME_QUERY_CACHE_SIZE="10000"
RILL_RUNTIME_ALLOW_HOST_ACCESS="false"
RILL_RUNTIME_SAFE_SOURCE_REFRESH="true"
RILL_RUNTIME_GITHUB_APP_ID=""
RILL_RUNTIME_GITHUB_APP_PRIVATE_KEY=""

Adding a new endpoint

We define our APIs using gRPC and use gRPC-Gateway to map the RPCs to a RESTful API. See proto/README.md for details.

To add a new endpoint:

  1. Describe the endpoint in proto/rill/runtime/v1/api.proto
  2. Re-generate gRPC and OpenAPI interfaces by running make proto.generate
  3. Copy the new handler signature from the RuntimeServiceServer interface in proto/gen/rill/runtime/v1/api_grpc_pb.go
  4. Paste the handler signature and implement it in a relevant file in runtime/server/

Adding a new analytical query endpoint

  1. Add a new endpoint for the query by following the steps in the section above ("Adding a new endpoint")
  2. Implement the query in runtime/queries by following the instructions in runtime/queries/README.md

Example: Creating an instance and rehydrating from code artifacts

# Start runtime
go run ./runtime/cmd/main.go

# Create instance
curl --request POST --url http://localhost:8080/v1/instances --header 'Content-Type: application/json' \
  --data '{
    "instance_id": "default",
    "olap_driver": "duckdb",
    "olap_dsn": "test.db",
    "repo_driver": "file",
    "repo_dsn": "./examples/ad_bids",
    "embed_catalog": true
}'

# Apply code artifacts
curl --request POST --url http://localhost:8080/v1/instances/default/reconcile --header 'Content-Type: application/json'

# Query data
curl --request POST --url http://localhost:8080/v1/instances/default/query --header 'Content-Type: application/json' \
  --data '{ "sql": "select * from ad_bids limit 10" }'

# Query explore API
curl --request POST --url http://localhost:8080/v1/instances/default/queries/metrics-views/ad_bids_metrics/toplist/domain --header 'Content-Type: application/json' \
  --data '{
    "measure_names": ["measure_0"],
    "limit": 10,
    "sort": [{ "name": "measure_0", "ascending": false }]
}'

# Query profiling API
curl --request GET --url http://localhost:8080/v1/instances/default/null-count/ad_bids/publisher

# Get catalog info
curl --request GET --url http://localhost:8080/v1/instances/default/catalog

# Refresh source named "ad_bids_source"
curl --request POST --url http://localhost:8080/v1/instances/default/catalog/ad_bids_source/refresh

# Get available connectors
curl --request GET --url http://localhost:8080/v1/connectors/meta

# List files in project
curl --request GET --url http://localhost:8080/v1/instances/default/files

# Fetch file in project
curl --request GET --url http://localhost:8080/v1/instances/default/files/-/models/ad_bids.sql

# Update file in project
curl --request POST --url http://localhost:8080/v1/instances/default/files/-/models/ad_bids.sql --header 'Content-Type: application/json' \
  --data '{ "blob": "select id, timestamp, publisher, domain, bid_price from ad_bids_source" }'

Using a DuckDB nightly build

The following steps apply to macOS, but a similar approach should work for Linux.

  1. Download the latest DuckDB nightly from GitHub (from the "Artifacts" section of the newest workflow run)
  2. Unzip the downloaded archive and copy the libduckdb.dylib file in the libduckdb-osx-universal folder to /usr/local/lib
     • You must use the command line to copy the file. If you touch it using the Finder, macOS will quarantine it. To remove a quarantine, run: xattr -d com.apple.quarantine libduckdb.dylib
  3. DuckDB usually does not support older file formats, so delete the stage.db and stage.db.wal files in your dev-project
  4. Add the flag -tags=duckdb_use_lib when running go run or go build to use the nightly build of DuckDB
     • If testing the local frontend, you need to temporarily set it in the dev-runtime script in package.json
     • For details, see Linking DuckDB

Note: DuckDB often makes breaking changes to its APIs, so you may encounter other errors when using a dev version of DuckDB.

Documentation

Constants

const (
	ResourceKindProjectParser  string = "rill.runtime.v1.ProjectParser"
	ResourceKindSource         string = "rill.runtime.v1.SourceV2"
	ResourceKindModel          string = "rill.runtime.v1.ModelV2"
	ResourceKindMetricsView    string = "rill.runtime.v1.MetricsViewV2"
	ResourceKindMigration      string = "rill.runtime.v1.Migration"
	ResourceKindPullTrigger    string = "rill.runtime.v1.PullTrigger"
	ResourceKindRefreshTrigger string = "rill.runtime.v1.RefreshTrigger"
	ResourceKindBucketPlanner  string = "rill.runtime.v1.BucketPlanner"
)

Built-in resource kinds

Variables

var ReconcilerInitializers = make(map[string]ReconcilerInitializer)

ReconcilerInitializers is a registry of reconciler initializers for different resource kinds. There can be only one reconciler per resource kind.

Functions

func RegisterReconcilerInitializer added in v0.32.0

func RegisterReconcilerInitializer(resourceKind string, initializer ReconcilerInitializer)

RegisterReconcilerInitializer registers a reconciler initializer for a specific resource kind.
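
The registry pattern described here can be sketched as follows. This is a stand-alone illustration, not the package's implementation: Controller and Reconciler are empty local stand-ins, register is a hypothetical name, and panicking on a duplicate kind is an assumption made to express the "only one reconciler per resource kind" rule.

```go
package main

import "fmt"

// Controller and Reconciler are empty stand-ins for the package's real types.
type Controller struct{}
type Reconciler interface{}

// ReconcilerInitializer has the same shape as the documented type.
type ReconcilerInitializer func(*Controller) Reconciler

// initializers maps a resource kind to its single initializer.
var initializers = make(map[string]ReconcilerInitializer)

// register adds an initializer for a kind. Panicking on a duplicate
// is an assumption of this sketch, not documented behaviour.
func register(kind string, init ReconcilerInitializer) {
	if _, ok := initializers[kind]; ok {
		panic(fmt.Sprintf("reconciler already registered for kind %q", kind))
	}
	initializers[kind] = init
}

func main() {
	register("rill.runtime.v1.SourceV2", func(*Controller) Reconciler { return struct{}{} })
	register("rill.runtime.v1.ModelV2", func(*Controller) Reconciler { return struct{}{} })
	fmt.Println(len(initializers), "kinds registered")
}
```

Registrations of this kind are typically done from init functions in the packages that implement each reconciler, so that importing the package is enough to make the kind available.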

Types

type Controller added in v0.32.0

type Controller struct {
	Runtime    *Runtime
	InstanceID string
	Logger     *slog.Logger
	// contains filtered or unexported fields
}

Controller manages the catalog for a single instance and runs reconcilers to migrate the catalog (and related resources in external databases) into the desired state. For information about how the controller schedules reconcilers, see `runtime/reconcilers/README.md`.

func NewController added in v0.32.0

func NewController(ctx context.Context, rt *Runtime, instanceID string, logger *zap.Logger) (*Controller, error)

NewController creates a new Controller

func (*Controller) AcquireConn added in v0.32.0

func (c *Controller) AcquireConn(ctx context.Context, connector string) (drivers.Handle, func(), error)

AcquireConn gets a handle for a connector in the controller's instance.

func (*Controller) AcquireOLAP added in v0.32.0

func (c *Controller) AcquireOLAP(ctx context.Context, connector string) (drivers.OLAPStore, func(), error)

AcquireOLAP gets an OLAP handle for a connector in the controller's instance.

func (*Controller) Cancel added in v0.33.0

func (c *Controller) Cancel(ctx context.Context, name *runtimev1.ResourceName) error

Cancel cancels the current invocation of a resource's reconciler (if it's running). It does not re-enqueue the resource for reconciliation.

func (*Controller) Create added in v0.32.0

Create creates a resource and enqueues it for reconciliation. If a resource with the same name is currently being deleted, the deletion will be cancelled.

func (*Controller) Delete added in v0.32.0

func (c *Controller) Delete(ctx context.Context, name *runtimev1.ResourceName) error

Delete soft-deletes a resource and enqueues it for reconciliation (with DeletedOn != nil). Once the deleting reconciliation has been completed, the resource will be hard deleted. If Delete is called from the resource's own reconciler, the resource will be hard deleted immediately (and the calling reconciler's ctx will be canceled immediately).

func (*Controller) Flush added in v0.32.0

func (c *Controller) Flush(ctx context.Context) error

Flush forces a flush of the controller's catalog changes to persistent storage.

func (*Controller) Get added in v0.32.0

Get returns a resource by name. Soft-deleted resources (i.e. resources where DeletedOn != nil) are not returned.

func (*Controller) List added in v0.32.0

func (c *Controller) List(ctx context.Context, kind string) ([]*runtimev1.Resource, error)

List returns a list of resources of the specified kind. If kind is empty, all resources are returned. Soft-deleted resources (i.e. resources where DeletedOn != nil) are not returned.

func (*Controller) Lock added in v0.32.0

func (c *Controller) Lock(ctx context.Context)

Lock locks the controller's catalog and delays scheduling of new reconciliations until the lock is released. It can only be called from within a reconciler invocation. While the lock is held, resources can only be edited by a caller using the ctx passed to Lock.

func (*Controller) Reconcile added in v0.33.0

func (c *Controller) Reconcile(ctx context.Context, name *runtimev1.ResourceName) error

Reconcile enqueues a resource for reconciliation. If the resource is currently reconciling, the current reconciler will be cancelled first.

func (*Controller) Run added in v0.32.0

func (c *Controller) Run(ctx context.Context) error

Run starts and runs the controller's event loop. It returns when ctx is cancelled or an unrecoverable error occurs. The event loop schedules/invokes resource reconciliation and periodically flushes catalog changes to persistent storage. The implementation centers around these internal functions: enqueue, processQueue, schedule, and processCompletedInvocation. See their docstrings for further details.

func (*Controller) Unlock added in v0.32.0

func (c *Controller) Unlock(ctx context.Context)

Unlock releases the lock acquired by Lock.

func (*Controller) UpdateError added in v0.32.0

func (c *Controller) UpdateError(ctx context.Context, name *runtimev1.ResourceName, err error) error

UpdateError updates a resource's error. Unlike UpdateMeta and UpdateSpec, it does not cancel or enqueue reconciliation for the resource.

func (*Controller) UpdateMeta added in v0.32.0

func (c *Controller) UpdateMeta(ctx context.Context, name *runtimev1.ResourceName, refs []*runtimev1.ResourceName, owner *runtimev1.ResourceName, paths []string) error

UpdateMeta updates a resource's meta fields and enqueues it for reconciliation. If called from outside the resource's reconciler and the resource is currently reconciling, the current reconciler will be cancelled first.

func (*Controller) UpdateName added in v0.33.0

func (c *Controller) UpdateName(ctx context.Context, name, newName, owner *runtimev1.ResourceName, paths []string) error

UpdateName renames a resource and updates annotations, and enqueues it for reconciliation. If called from outside the resource's reconciler and the resource is currently reconciling, the current reconciler will be cancelled first.

func (*Controller) UpdateSpec added in v0.32.0

func (c *Controller) UpdateSpec(ctx context.Context, name *runtimev1.ResourceName, r *runtimev1.Resource) error

UpdateSpec updates a resource's spec and enqueues it for reconciliation. If called from outside the resource's reconciler and the resource is currently reconciling, the current reconciler will be cancelled first.

func (*Controller) UpdateState added in v0.32.0

func (c *Controller) UpdateState(ctx context.Context, name *runtimev1.ResourceName, r *runtimev1.Resource) error

UpdateState updates a resource's state. It can only be called from within the resource's reconciler. NOTE: Calls to UpdateState succeed even if ctx is cancelled. This enables cancelled reconcilers to update state before finishing.

type ExportOptions added in v0.29.1

type ExportOptions struct {
	Format       runtimev1.ExportFormat
	Priority     int
	PreWriteHook func(filename string) error
}

type Options added in v0.16.0

type Options struct {
	ConnectionCacheSize     int
	MetastoreConnector      string
	QueryCacheSizeBytes     int64
	SecurityEngineCacheSize int
	AllowHostAccess         bool
	SafeSourceRefresh       bool
	// SystemConnectors are drivers whose handles are shared with all instances
	SystemConnectors []*runtimev1.Connector
}

type Query added in v0.16.0

type Query interface {
	// Key should return a cache key that uniquely identifies the query
	Key() string
	// Deps should return the source and model names that the query targets.
	// It's used to invalidate cached queries when the underlying data changes.
	Deps() []string
	// MarshalResult should return the query result and estimated cost in bytes for caching
	MarshalResult() *QueryResult
	// UnmarshalResult should populate a query with a cached result
	UnmarshalResult(v any) error
	// Resolve should execute the query against the instance's infra.
	// The error can be nil together with a nil result, e.g. when a model contains no rows, aggregation results can be nil.
	Resolve(ctx context.Context, rt *Runtime, instanceID string, priority int) error
	// Export resolves the query and serializes the result to the writer.
	Export(ctx context.Context, rt *Runtime, instanceID string, w io.Writer, opts *ExportOptions) error
}
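
To show how the caching-related methods fit together, here is a sketch of a type that implements Key, Deps, MarshalResult and UnmarshalResult. Resolve and Export are left out because they need a *Runtime. The rowCountQuery type, its key format, and the 8-byte cost estimate are hypothetical; QueryResult is redeclared locally so the example is self-contained.

```go
package main

import (
	"errors"
	"fmt"
)

// QueryResult mirrors the package's QueryResult type.
type QueryResult struct {
	Value any
	Bytes int64
}

// rowCountQuery is a hypothetical query that counts the rows of one table.
type rowCountQuery struct {
	TableName string
	Result    *int64
}

// Key identifies the query for caching: same table, same key.
func (q *rowCountQuery) Key() string { return fmt.Sprintf("RowCount:%s", q.TableName) }

// Deps lists the names whose changes should invalidate the cached result.
func (q *rowCountQuery) Deps() []string { return []string{q.TableName} }

// MarshalResult returns the result with a rough size estimate (one int64).
func (q *rowCountQuery) MarshalResult() *QueryResult {
	return &QueryResult{Value: q.Result, Bytes: 8}
}

// UnmarshalResult populates the query from a previously cached value.
func (q *rowCountQuery) UnmarshalResult(v any) error {
	res, ok := v.(*int64)
	if !ok {
		return errors.New("rowCountQuery: mismatched unmarshal input")
	}
	q.Result = res
	return nil
}

func main() {
	n := int64(42)
	first := &rowCountQuery{TableName: "ad_bids", Result: &n}
	cached := first.MarshalResult()

	// A later query with the same key can be served from the cached value.
	second := &rowCountQuery{TableName: "ad_bids"}
	if err := second.UnmarshalResult(cached.Value); err != nil {
		panic(err)
	}
	fmt.Println(second.Key(), *second.Result)
}
```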

type QueryResult added in v0.26.0

type QueryResult struct {
	Value any
	Bytes int64
}

type ReconcileResult added in v0.32.0

type ReconcileResult struct {
	Err       error
	Retrigger time.Time
}

ReconcileResult propagates results from a reconciler invocation

type Reconciler added in v0.32.0

type Reconciler interface {
	Close(ctx context.Context) error
	AssignSpec(from, to *runtimev1.Resource) error
	AssignState(from, to *runtimev1.Resource) error
	Reconcile(ctx context.Context, n *runtimev1.ResourceName) ReconcileResult
}

Reconciler implements reconciliation logic for all resources of a specific kind. Reconcilers are managed and invoked by a Controller.

type ReconcilerInitializer added in v0.32.0

type ReconcilerInitializer func(*Controller) Reconciler

ReconcilerInitializer is a function that initializes a new reconciler for a specific controller

type ResolvedMetricsViewSecurity added in v0.33.0

type ResolvedMetricsViewSecurity struct {
	Access    bool
	RowFilter string
	Include   []string
	Exclude   []string
}

type Runtime

type Runtime struct {
	// contains filtered or unexported fields
}

func New

func New(opts *Options, logger *zap.Logger, client activity.Client) (*Runtime, error)

func (*Runtime) AcquireHandle added in v0.32.0

func (r *Runtime) AcquireHandle(ctx context.Context, instanceID, connector string) (drivers.Handle, func(), error)

AcquireHandle returns an instance-specific handle.

func (*Runtime) AcquireSystemHandle added in v0.32.0

func (r *Runtime) AcquireSystemHandle(ctx context.Context, connector string) (drivers.Handle, func(), error)

func (*Runtime) AllowHostAccess added in v0.27.0

func (r *Runtime) AllowHostAccess() bool

func (*Runtime) Catalog added in v0.16.0

func (r *Runtime) Catalog(ctx context.Context, instanceID string) (drivers.CatalogStore, func(), error)

func (*Runtime) Close added in v0.18.0

func (r *Runtime) Close() error

func (*Runtime) CreateInstance

func (r *Runtime) CreateInstance(ctx context.Context, inst *drivers.Instance) error

func (*Runtime) DeleteFile added in v0.16.0

func (r *Runtime) DeleteFile(ctx context.Context, instanceID, path string) error

func (*Runtime) DeleteInstance added in v0.16.0

func (r *Runtime) DeleteInstance(ctx context.Context, instanceID string, dropDB bool) error

func (*Runtime) EditInstance added in v0.23.0

func (r *Runtime) EditInstance(ctx context.Context, inst *drivers.Instance) error

EditInstance edits an existing instance. It compares the old and new configuration and only evicts caches if the drivers or DSNs have changed. This ensures that DB handles are not closed unnecessarily.

func (*Runtime) EvictHandle added in v0.32.0

func (r *Runtime) EvictHandle(ctx context.Context, instanceID, connector string, drop bool) error

EvictHandle flushes the db handle for the specific connector from the cache

func (*Runtime) FindInstance added in v0.16.0

func (r *Runtime) FindInstance(ctx context.Context, instanceID string) (*drivers.Instance, error)

func (*Runtime) FindInstances added in v0.16.0

func (r *Runtime) FindInstances(ctx context.Context) ([]*drivers.Instance, error)

func (*Runtime) GetCatalogEntry added in v0.16.0

func (r *Runtime) GetCatalogEntry(ctx context.Context, instanceID, name string) (*drivers.CatalogEntry, error)

func (*Runtime) GetFile added in v0.16.0

func (r *Runtime) GetFile(ctx context.Context, instanceID, path string) (string, time.Time, error)

func (*Runtime) GetInstanceAttributes added in v0.32.0

func (r *Runtime) GetInstanceAttributes(ctx context.Context, instanceID string) []attribute.KeyValue

GetInstanceAttributes fetches an instance and converts its annotations to attributes. It returns nil if an error occurred or the instance was not found.

func (*Runtime) ListCatalogEntries added in v0.16.0

func (r *Runtime) ListCatalogEntries(ctx context.Context, instanceID string, t drivers.ObjectType) ([]*drivers.CatalogEntry, error)

func (*Runtime) ListFiles added in v0.16.0

func (r *Runtime) ListFiles(ctx context.Context, instanceID, glob string) ([]string, error)

func (*Runtime) NewCatalogService added in v0.24.0

func (r *Runtime) NewCatalogService(ctx context.Context, instanceID string) (*catalog.Service, error)

func (*Runtime) OLAP added in v0.16.0

func (r *Runtime) OLAP(ctx context.Context, instanceID string) (drivers.OLAPStore, func(), error)

func (*Runtime) PutFile added in v0.16.0

func (r *Runtime) PutFile(ctx context.Context, instanceID, path string, blob io.Reader, create, createOnly bool) error

func (*Runtime) Query added in v0.16.0

func (r *Runtime) Query(ctx context.Context, instanceID string, query Query, priority int) error

func (*Runtime) Reconcile added in v0.16.0

func (r *Runtime) Reconcile(ctx context.Context, instanceID string, changedPaths, forcedPaths []string, dry, strict bool) (*catalog.ReconcileResult, error)

func (*Runtime) RefreshSource added in v0.16.0

func (r *Runtime) RefreshSource(ctx context.Context, instanceID, name string) error

func (*Runtime) Registry added in v0.16.0

func (r *Runtime) Registry() drivers.RegistryStore

func (*Runtime) RenameFile added in v0.16.0

func (r *Runtime) RenameFile(ctx context.Context, instanceID, fromPath, toPath string) error

func (*Runtime) Repo added in v0.16.0

func (r *Runtime) Repo(ctx context.Context, instanceID string) (drivers.RepoStore, func(), error)

func (*Runtime) ResolveMetricsViewSecurity added in v0.33.0

func (r *Runtime) ResolveMetricsViewSecurity(attributes map[string]any, instanceID string, mv *runtimev1.MetricsView, lastUpdatedOn time.Time) (*ResolvedMetricsViewSecurity, error)

func (*Runtime) SyncExistingTables added in v0.16.0

func (r *Runtime) SyncExistingTables(ctx context.Context, instanceID string) error

Directories

Path Synopsis
compilers
gcs
s3
pkg
dag
duration
Package duration copied as it is from github.com/senseyeio/duration
singleflight
Package singleflight provides a duplicate function call suppression mechanism.
services
catalog/artifacts/yaml
Package yaml reads and writes artifacts that exactly mirror the internal representation
