runtime

package
v0.36.1
Warning

This package is not in the latest version of its module.

Published: Oct 19, 2023 License: Apache-2.0 Imports: 35 Imported by: 0

README

runtime

The runtime is a data infrastructure proxy and orchestrator – our data plane. It connects to data infrastructure and is or will be responsible for transpiling queries, parsing code artifacts, reconciling infra state, implementing connectors, enforcing (row-based) access policies, scheduling tasks, triggering alerts, and much more.

It's designed as a modular component that can be embedded in local applications (as it is in Rill Developer) or deployed stand-alone in a cloud environment.

Code structure

The base directory contains a Runtime type that represents the lifecycle of the runtime. It ties together the sub-directories:

  • client contains a Go client library for connecting to a runtime server.
  • compilers contains logic for parsing Rill projects (incomplete, currently mostly implemented in services instead).
  • connectors contains connector implementations.
  • drivers contains interfaces and drivers for external data infrastructure that the runtime interfaces with (like DuckDB and Druid).
  • pkg contains utility libraries.
  • queries contains pre-defined analytical queries that the runtime can serve (used for profiling and dashboards).
  • server contains a server that implements the runtime's APIs.
  • services contains logic for building and reconciling Rill projects.
  • testruntime contains helper functions for initializing a test runtime with test data.

How to test and run

You can start a stand-alone runtime server using the CLI:

go run ./cli runtime start

Ping the server to verify it's running:

go run ./cli runtime ping --base-url http://localhost:9090

You can run all tests using:

go test ./runtime/...

See web-local/README.md for details on how to run the runtime in development together with the local frontend.

Configuration

The runtime server is configured using environment variables parsed in cli/cmd/runtime/start.go. All environment variables have reasonable defaults suitable for local development. The current defaults are:

RILL_RUNTIME_HTTP_PORT="8080"
RILL_RUNTIME_GRPC_PORT="9090"
RILL_RUNTIME_LOG_LEVEL="info"
RILL_RUNTIME_METRICS_EXPORTER="prometheus"
RILL_RUNTIME_TRACES_EXPORTER=""
RILL_RUNTIME_ALLOWED_ORIGINS="*"
RILL_RUNTIME_METASTORE_DRIVER="sqlite"
RILL_RUNTIME_METASTORE_URL=":memory:"
# Hex-encoded comma-separated list of key pairs. To generate, run "go run ./scripts/generate_keypairs/main.go"
# For details: https://pkg.go.dev/github.com/gorilla/sessions#NewCookieStore
RILL_RUNTIME_SESSION_KEY_PAIRS=7938b8c95ac90b3731c353076daeae8a,90c22a5a6c6b442afdb46855f95eb7d6
RILL_RUNTIME_AUTH_ENABLE="false"
RILL_RUNTIME_AUTH_ISSUER_URL=""
RILL_RUNTIME_AUTH_AUDIENCE_URL=""
RILL_RUNTIME_EMAIL_SMTP_HOST=""
RILL_RUNTIME_EMAIL_SMTP_PORT=""
RILL_RUNTIME_EMAIL_SMTP_USERNAME=""
RILL_RUNTIME_EMAIL_SMTP_PASSWORD=""
RILL_RUNTIME_EMAIL_SENDER_EMAIL=""
RILL_RUNTIME_EMAIL_SENDER_NAME=""
RILL_RUNTIME_EMAIL_BCC=""
RILL_RUNTIME_CONNECTION_CACHE_SIZE="100"
RILL_RUNTIME_QUERY_CACHE_SIZE="10000"
RILL_RUNTIME_ALLOW_HOST_ACCESS="false"
RILL_RUNTIME_SAFE_SOURCE_REFRESH="true"
RILL_RUNTIME_GITHUB_APP_ID=""
RILL_RUNTIME_GITHUB_APP_PRIVATE_KEY=""
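
To illustrate the "environment variable with a default" pattern described above, here is a minimal sketch in Go. It is not the code in cli/cmd/runtime/start.go. The helper names envOr and envInt are hypothetical, and treating an empty value the same as an unset variable is an assumption made for this example.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// lookupFunc matches the signature of os.LookupEnv so the source of
// variables can be swapped out (for example in tests).
type lookupFunc func(key string) (string, bool)

// envOr returns the value of key, or def when the variable is unset or empty.
// Treating empty as unset is an assumption of this sketch.
func envOr(lookup lookupFunc, key, def string) string {
	if v, ok := lookup(key); ok && v != "" {
		return v
	}
	return def
}

// envInt works like envOr but parses the value as an integer and
// reports a descriptive error when the value is not a number.
func envInt(lookup lookupFunc, key string, def int) (int, error) {
	v, ok := lookup(key)
	if !ok || v == "" {
		return def, nil
	}
	n, err := strconv.Atoi(v)
	if err != nil {
		return 0, fmt.Errorf("%s: %w", key, err)
	}
	return n, nil
}

func main() {
	// Defaults below are copied from the list above.
	httpPort, err := envInt(os.LookupEnv, "RILL_RUNTIME_HTTP_PORT", 8080)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	logLevel := envOr(os.LookupEnv, "RILL_RUNTIME_LOG_LEVEL", "info")
	fmt.Printf("http port=%d log level=%s\n", httpPort, logLevel)
}
```

Passing the lookup function as a parameter keeps the helpers independent of the process environment, which makes the defaulting logic easy to exercise in isolation.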

Adding a new endpoint

We define our APIs using gRPC and use gRPC-Gateway to map the RPCs to a RESTful API. See proto/README.md for details.

To add a new endpoint:

  1. Describe the endpoint in proto/rill/runtime/v1/api.proto
  2. Re-generate gRPC and OpenAPI interfaces by running make proto.generate
  3. Copy the new handler signature from the RuntimeServiceServer interface in proto/gen/rill/runtime/v1/api_grpc_pb.go
  4. Paste the handler signature and implement it in a relevant file in runtime/server/

Adding a new analytical query endpoint

  1. Add a new endpoint for the query by following the steps in the section above ("Adding a new endpoint")
  2. Implement the query in runtime/queries by following the instructions in runtime/queries/README.md

Example: Creating an instance and rehydrating from code artifacts

# Start runtime
go run ./runtime/cmd/main.go

# Create instance
curl --request POST --url http://localhost:8080/v1/instances --header 'Content-Type: application/json' \
  --data '{
    "instance_id": "default",
    "olap_driver": "duckdb",
    "olap_dsn": "test.db",
    "repo_driver": "file",
    "repo_dsn": "./examples/ad_bids",
    "embed_catalog": true
}'

# Apply code artifacts
curl --request POST --url http://localhost:8080/v1/instances/default/reconcile --header 'Content-Type: application/json'

# Query data
curl --request POST --url http://localhost:8080/v1/instances/default/query --header 'Content-Type: application/json' \
  --data '{ "sql": "select * from ad_bids limit 10" }'

# Query explore API
curl --request POST --url http://localhost:8080/v1/instances/default/queries/metrics-views/ad_bids_metrics/toplist/domain --header 'Content-Type: application/json' \
  --data '{
    "measure_names": ["measure_0"],
    "limit": 10,
    "sort": [{ "name": "measure_0", "ascending": false }]
}'

# Query profiling API
curl --request GET --url http://localhost:8080/v1/instances/default/null-count/ad_bids/publisher

# Get catalog info
curl --request GET --url http://localhost:8080/v1/instances/default/catalog

# Refresh source named "ad_bids_source"
curl --request POST --url http://localhost:8080/v1/instances/default/catalog/ad_bids_source/refresh

# Get available connectors
curl --request GET   --url http://localhost:8080/v1/connectors/meta

# List files in project
curl --request GET --url http://localhost:8080/v1/instances/default/files

# Fetch file in project
curl --request GET --url http://localhost:8080/v1/instances/default/files/-/models/ad_bids.sql

# Update file in project
curl --request POST --url http://localhost:8080/v1/instances/default/files/-/models/ad_bids.sql --header 'Content-Type: application/json' \
  --data '{ "blob": "select id, timestamp, publisher, domain, bid_price from ad_bids_source" }'
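
The "Create instance" call above can also be issued from Go. The following is a rough sketch using only the standard library. The struct createInstanceBody and the function newCreateInstanceRequest are hypothetical names introduced here; the JSON field names and the URL path are taken from the curl example, and nothing else about the API is assumed.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// createInstanceBody mirrors the JSON payload of the "Create instance"
// curl example. It is a hand-written stand-in, not a generated API type.
type createInstanceBody struct {
	InstanceID   string `json:"instance_id"`
	OLAPDriver   string `json:"olap_driver"`
	OLAPDSN      string `json:"olap_dsn"`
	RepoDriver   string `json:"repo_driver"`
	RepoDSN      string `json:"repo_dsn"`
	EmbedCatalog bool   `json:"embed_catalog"`
}

// newCreateInstanceRequest builds (but does not send) the POST request.
func newCreateInstanceRequest(baseURL string, body createInstanceBody) (*http.Request, error) {
	payload, err := json.Marshal(body)
	if err != nil {
		return nil, err
	}
	req, err := http.NewRequest(http.MethodPost, baseURL+"/v1/instances", bytes.NewReader(payload))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")
	return req, nil
}

func main() {
	req, err := newCreateInstanceRequest("http://localhost:8080", createInstanceBody{
		InstanceID:   "default",
		OLAPDriver:   "duckdb",
		OLAPDSN:      "test.db",
		RepoDriver:   "file",
		RepoDSN:      "./examples/ad_bids",
		EmbedCatalog: true,
	})
	if err != nil {
		panic(err)
	}
	// Sending is left out so the sketch runs without a server;
	// with a runtime listening, pass req to http.DefaultClient.Do.
	fmt.Println(req.Method, req.URL.String())
}
```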

Using a DuckDB nightly build

The following steps apply to macOS, but a similar approach should work on Linux.

  1. Download the latest DuckDB nightly from GitHub (from the "Artifacts" section of the newest workflow run)
  2. Unzip the downloaded archive and copy the libduckdb.dylib file in the libduckdb-osx-universal folder to /usr/local/lib
       • You must use the command line to copy the file. If you touch it using the Finder, macOS will quarantine it. To remove a quarantine, run: xattr -d com.apple.quarantine libduckdb.dylib.
  3. DuckDB usually does not support older file formats, so delete the stage.db and stage.db.wal files in your dev-project
  4. Add the flag -tags=duckdb_use_lib when running go run or go build to use the nightly build of DuckDB
       • If testing the local frontend, you need to temporarily set it in the dev-runtime script in package.json
       • For details, see Linking DuckDB

Note: DuckDB often makes breaking changes to its APIs, so you may encounter other errors when using a dev version of DuckDB.

Documentation

Constants

const (
	ResourceKindProjectParser  string = "rill.runtime.v1.ProjectParser"
	ResourceKindSource         string = "rill.runtime.v1.Source"
	ResourceKindModel          string = "rill.runtime.v1.Model"
	ResourceKindMetricsView    string = "rill.runtime.v1.MetricsView"
	ResourceKindMigration      string = "rill.runtime.v1.Migration"
	ResourceKindReport         string = "rill.runtime.v1.Report"
	ResourceKindPullTrigger    string = "rill.runtime.v1.PullTrigger"
	ResourceKindRefreshTrigger string = "rill.runtime.v1.RefreshTrigger"
	ResourceKindBucketPlanner  string = "rill.runtime.v1.BucketPlanner"
)

Built-in resource kinds

Variables

var GlobalProjectParserName = &runtimev1.ResourceName{Kind: ResourceKindProjectParser, Name: "parser"}

GlobalProjectParserName is the name of the instance-global project parser resource that is created for each new instance.

var ReconcilerInitializers = make(map[string]ReconcilerInitializer)

ReconcilerInitializers is a registry of reconciler initializers for different resource kinds. There can be only one reconciler per resource kind.

Functions

func RegisterReconcilerInitializer added in v0.32.0

func RegisterReconcilerInitializer(resourceKind string, initializer ReconcilerInitializer)

RegisterReconcilerInitializer registers a reconciler initializer for a specific resource kind.
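
The one-initializer-per-kind rule can be sketched with a small registry. This is an illustration only: the types reconciler, initializer, registry and noopReconciler are simplified, hypothetical stand-ins for the package's own types, and returning an error on a duplicate registration is an assumption about how the rule could be enforced, not a description of what this package does.

```go
package main

import "fmt"

// reconciler and initializer are simplified stand-ins for the package's
// Reconciler and ReconcilerInitializer types (hypothetical, for illustration).
type reconciler interface {
	Kind() string
}

type initializer func() reconciler

// registry maps each resource kind to exactly one initializer.
type registry struct {
	inits map[string]initializer
}

func newRegistry() *registry {
	return &registry{inits: make(map[string]initializer)}
}

// register stores newFn for kind and rejects a second registration
// for the same kind.
func (r *registry) register(kind string, newFn initializer) error {
	if _, exists := r.inits[kind]; exists {
		return fmt.Errorf("reconciler already registered for resource kind %q", kind)
	}
	r.inits[kind] = newFn
	return nil
}

// initialize builds a reconciler for kind, failing if none was registered.
func (r *registry) initialize(kind string) (reconciler, error) {
	newFn, ok := r.inits[kind]
	if !ok {
		return nil, fmt.Errorf("no reconciler registered for resource kind %q", kind)
	}
	return newFn(), nil
}

// noopReconciler is a placeholder implementation used only by this sketch.
type noopReconciler struct{ kind string }

func (n noopReconciler) Kind() string { return n.kind }

func main() {
	const sourceKind = "rill.runtime.v1.Source"
	reg := newRegistry()
	newSource := func() reconciler { return noopReconciler{kind: sourceKind} }

	fmt.Println(reg.register(sourceKind, newSource)) // first registration
	fmt.Println(reg.register(sourceKind, newSource)) // duplicate registration
}
```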

Types

type Controller added in v0.32.0

type Controller struct {
	Runtime    *Runtime
	InstanceID string
	Logger     *zap.Logger
	Activity   activity.Client
	// contains filtered or unexported fields
}

Controller manages the catalog for a single instance and runs reconcilers to migrate the catalog (and related resources in external databases) into the desired state. For information about how the controller schedules reconcilers, see `runtime/reconcilers/README.md`.

func NewController added in v0.32.0

func NewController(ctx context.Context, rt *Runtime, instanceID string, logger *zap.Logger, ac activity.Client) (*Controller, error)

NewController creates a new Controller

func (*Controller) AcquireConn added in v0.32.0

func (c *Controller) AcquireConn(ctx context.Context, connector string) (drivers.Handle, func(), error)

AcquireConn gets a handle for a connector in the controller's instance.

func (*Controller) AcquireOLAP added in v0.32.0

func (c *Controller) AcquireOLAP(ctx context.Context, connector string) (drivers.OLAPStore, func(), error)

AcquireOLAP gets an OLAP handle for a connector in the controller's instance.

func (*Controller) Cancel added in v0.33.0

func (c *Controller) Cancel(ctx context.Context, name *runtimev1.ResourceName) error

Cancel cancels the current invocation of a resource's reconciler (if it's running). It does not re-enqueue the resource for reconciliation.

func (*Controller) Create added in v0.32.0

func (c *Controller) Create(ctx context.Context, name *runtimev1.ResourceName, refs []*runtimev1.ResourceName, owner *runtimev1.ResourceName, paths []string, hidden bool, r *runtimev1.Resource) error

Create creates a resource and enqueues it for reconciliation. If a resource with the same name is currently being deleted, the deletion will be cancelled.

func (*Controller) Delete added in v0.32.0

func (c *Controller) Delete(ctx context.Context, name *runtimev1.ResourceName) error

Delete soft-deletes a resource and enqueues it for reconciliation (with DeletedOn != nil). Once the deleting reconciliation has been completed, the resource will be hard deleted. If Delete is called from the resource's own reconciler, the resource will be hard deleted immediately (and the calling reconcile's ctx will be canceled immediately).

func (*Controller) Flush added in v0.32.0

func (c *Controller) Flush(ctx context.Context) error

Flush forces a flush of the controller's catalog changes to persistent storage.

func (*Controller) Get added in v0.32.0

Get returns a resource by name. Soft-deleted resources (i.e. resources where DeletedOn != nil) are not returned.

func (*Controller) List added in v0.32.0

func (c *Controller) List(ctx context.Context, kind string, clone bool) ([]*runtimev1.Resource, error)

List returns a list of resources of the specified kind. If kind is empty, all resources are returned. Soft-deleted resources (i.e. resources where DeletedOn != nil) are not returned.

func (*Controller) Lock added in v0.32.0

func (c *Controller) Lock(ctx context.Context)

Lock locks the controller's catalog and delays scheduling of new reconciliations until the lock is released. It can only be called from within a reconciler invocation. While the lock is held, resources can only be edited by a caller using the ctx passed to Lock.

func (*Controller) Reconcile added in v0.33.0

func (c *Controller) Reconcile(ctx context.Context, name *runtimev1.ResourceName) error

Reconcile enqueues a resource for reconciliation. If the resource is currently reconciling, the current reconciler will be cancelled first.

func (*Controller) Run added in v0.32.0

func (c *Controller) Run(ctx context.Context) error

Run starts and runs the controller's event loop. It returns when ctx is cancelled or an unrecoverable error occurs. Before returning, it closes the controller, so it must only be called once. The event loop schedules/invokes resource reconciliation and periodically flushes catalog changes to persistent storage. The implementation centers around these internal functions: enqueue, processQueue (uses markPending, trySchedule, invoke), and processCompletedInvocation. See their docstrings for further details.

func (*Controller) Subscribe added in v0.33.1

func (c *Controller) Subscribe(ctx context.Context, fn SubscribeCallback) error

Subscribe registers a callback that will receive resource update events. The same callback function will not be invoked concurrently. The callback function is invoked under a lock and must not call the controller.
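
Because the callback runs under a lock and must not call the controller, one option is to have it do nothing more than hand the event to a channel and return. The sketch below shows that pattern with hypothetical stand-in types: update and forwardTo are not part of this package, and dropping events when the buffer is full is a design choice of this example, not documented behavior.

```go
package main

import "fmt"

// update is a hypothetical, simplified stand-in for the arguments
// that a SubscribeCallback receives.
type update struct {
	Event string
	Kind  string
	Name  string
}

// forwardTo returns a callback that hands each update to ch without blocking.
// If the buffer is full, the update is dropped and counted in *dropped.
// The counter is not synchronized; this is fine here only because the
// callback is never invoked concurrently and main reads it afterwards.
func forwardTo(ch chan<- update, dropped *int) func(u update) {
	return func(u update) {
		select {
		case ch <- u:
		default:
			*dropped++
		}
	}
}

func main() {
	ch := make(chan update, 2)
	dropped := 0
	callback := forwardTo(ch, &dropped)

	// Simulate three events arriving while nobody is draining the channel.
	callback(update{Event: "write", Kind: "rill.runtime.v1.Source", Name: "ad_bids_source"})
	callback(update{Event: "write", Kind: "rill.runtime.v1.Model", Name: "ad_bids"})
	callback(update{Event: "delete", Kind: "rill.runtime.v1.Model", Name: "old_model"})
	close(ch)

	// Slow work (including calls back into a controller) would happen here,
	// outside the callback.
	for u := range ch {
		fmt.Println(u.Event, u.Kind, u.Name)
	}
	fmt.Println("dropped:", dropped)
}
```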

func (*Controller) Unlock added in v0.32.0

func (c *Controller) Unlock(ctx context.Context)

Unlock releases the lock acquired by Lock.

func (*Controller) UpdateError added in v0.32.0

func (c *Controller) UpdateError(ctx context.Context, name *runtimev1.ResourceName, reconcileErr error) error

UpdateError updates a resource's error. Unlike UpdateMeta and UpdateSpec, it does not cancel or enqueue reconciliation for the resource.

func (*Controller) UpdateMeta added in v0.32.0

func (c *Controller) UpdateMeta(ctx context.Context, name *runtimev1.ResourceName, refs []*runtimev1.ResourceName, owner *runtimev1.ResourceName, paths []string) error

UpdateMeta updates a resource's meta fields and enqueues it for reconciliation. If called from outside the resource's reconciler and the resource is currently reconciling, the current reconciler will be cancelled first.

func (*Controller) UpdateName added in v0.33.0

func (c *Controller) UpdateName(ctx context.Context, name, newName, owner *runtimev1.ResourceName, paths []string) error

UpdateName renames a resource and updates annotations, and enqueues it for reconciliation. If called from outside the resource's reconciler and the resource is currently reconciling, the current reconciler will be cancelled first.

func (*Controller) UpdateSpec added in v0.32.0

func (c *Controller) UpdateSpec(ctx context.Context, name *runtimev1.ResourceName, r *runtimev1.Resource) error

UpdateSpec updates a resource's spec and enqueues it for reconciliation. If called from outside the resource's reconciler and the resource is currently reconciling, the current reconciler will be cancelled first.

func (*Controller) UpdateState added in v0.32.0

func (c *Controller) UpdateState(ctx context.Context, name *runtimev1.ResourceName, r *runtimev1.Resource) error

UpdateState updates a resource's state. It can only be called from within the resource's reconciler. NOTE: Calls to UpdateState succeed even if ctx is cancelled. This enables cancelled reconcilers to update state before finishing.

func (*Controller) WaitUntilIdle added in v0.35.0

func (c *Controller) WaitUntilIdle(ctx context.Context, ignoreHidden bool) error

WaitUntilIdle returns when the controller is idle (i.e. no reconcilers are pending or running).

type ExportOptions added in v0.29.1

type ExportOptions struct {
	Format       runtimev1.ExportFormat
	Priority     int
	PreWriteHook func(filename string) error
}

type Options added in v0.16.0

type Options struct {
	MetastoreConnector      string
	SystemConnectors        []*runtimev1.Connector
	ConnectionCacheSize     int
	QueryCacheSizeBytes     int64
	SecurityEngineCacheSize int
	AllowHostAccess         bool
	SafeSourceRefresh       bool
}

type Query added in v0.16.0

type Query interface {
	// Key should return a cache key that uniquely identifies the query
	Key() string
	// Deps should return the resource names that the query targets.
	// It's used to invalidate cached queries when the underlying data changes.
	// If a dependency doesn't exist, it is ignored. (So if the underlying resource kind is unknown, it can return all possible dependency names.)
	Deps() []*runtimev1.ResourceName
	// MarshalResult should return the query result and estimated cost in bytes for caching
	MarshalResult() *QueryResult
	// UnmarshalResult should populate a query with a cached result
	UnmarshalResult(v any) error
	// Resolve should execute the query against the instance's infra.
	// The error can be nil along with a nil result, e.g. when a model contains no rows, aggregation results can be nil.
	Resolve(ctx context.Context, rt *Runtime, instanceID string, priority int) error
	// Export resolves the query and serializes the result to the writer.
	Export(ctx context.Context, rt *Runtime, instanceID string, w io.Writer, opts *ExportOptions) error
}
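
To show how Key and Deps could work together for caching and invalidation, here is a minimal sketch. It is not the runtime's cache: cachedQuery, queryCache and nullCount are hypothetical types, dependencies are plain strings rather than *runtimev1.ResourceName, and size accounting via QueryResult.Bytes is left out.

```go
package main

import "fmt"

// cachedQuery is a stand-in exposing only the two methods this sketch needs.
// The real interface returns []*runtimev1.ResourceName from Deps.
type cachedQuery interface {
	Key() string
	Deps() []string
}

// queryCache stores results by cache key and remembers which keys
// depend on which resource, so they can be invalidated together.
type queryCache struct {
	results map[string]any
	byDep   map[string]map[string]bool
}

func newQueryCache() *queryCache {
	return &queryCache{
		results: make(map[string]any),
		byDep:   make(map[string]map[string]bool),
	}
}

// put caches result under q.Key() and indexes it by each dependency.
func (c *queryCache) put(q cachedQuery, result any) {
	key := q.Key()
	c.results[key] = result
	for _, dep := range q.Deps() {
		if c.byDep[dep] == nil {
			c.byDep[dep] = make(map[string]bool)
		}
		c.byDep[dep][key] = true
	}
}

// get returns the cached result for q, if any.
func (c *queryCache) get(q cachedQuery) (any, bool) {
	v, ok := c.results[q.Key()]
	return v, ok
}

// invalidate removes every cached result that depends on dep.
// Unknown dependencies are ignored.
func (c *queryCache) invalidate(dep string) {
	for key := range c.byDep[dep] {
		delete(c.results, key)
	}
	delete(c.byDep, dep)
}

// nullCount is a hypothetical query used only to exercise the cache.
type nullCount struct{ table, column string }

func (q nullCount) Key() string    { return "NullCount:" + q.table + ":" + q.column }
func (q nullCount) Deps() []string { return []string{q.table} }

func main() {
	cache := newQueryCache()
	q := nullCount{table: "ad_bids", column: "publisher"}

	cache.put(q, 42)
	_, hit := cache.get(q)
	fmt.Println("cached before invalidation:", hit)

	cache.invalidate("ad_bids") // e.g. after the underlying data changed
	_, hit = cache.get(q)
	fmt.Println("cached after invalidation:", hit)
}
```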

type QueryResult added in v0.26.0

type QueryResult struct {
	Value any
	Bytes int64
}

type ReconcileResult added in v0.32.0

type ReconcileResult struct {
	Err       error
	Retrigger time.Time
}

ReconcileResult propagates results from a reconciler invocation

type Reconciler added in v0.32.0

type Reconciler interface {
	Close(ctx context.Context) error
	AssignSpec(from, to *runtimev1.Resource) error
	AssignState(from, to *runtimev1.Resource) error
	ResetState(r *runtimev1.Resource) error
	Reconcile(ctx context.Context, n *runtimev1.ResourceName) ReconcileResult
}

Reconciler implements reconciliation logic for all resources of a specific kind. Reconcilers are managed and invoked by a Controller.

type ReconcilerInitializer added in v0.32.0

type ReconcilerInitializer func(*Controller) Reconciler

ReconcilerInitializer is a function that initializes a new reconciler for a specific controller

type ResolvedMetricsViewSecurity added in v0.33.0

type ResolvedMetricsViewSecurity struct {
	Access    bool
	RowFilter string
	Include   []string
	Exclude   []string
}

type Runtime

type Runtime struct {
	Email *email.Client
	// contains filtered or unexported fields
}

func New

func New(ctx context.Context, opts *Options, logger *zap.Logger, ac activity.Client, emailClient *email.Client) (*Runtime, error)

func (*Runtime) AcquireHandle added in v0.32.0

func (r *Runtime) AcquireHandle(ctx context.Context, instanceID, connector string) (drivers.Handle, func(), error)

AcquireHandle returns an instance-specific handle.

func (*Runtime) AcquireSystemHandle added in v0.32.0

func (r *Runtime) AcquireSystemHandle(ctx context.Context, connector string) (drivers.Handle, func(), error)

func (*Runtime) AllowHostAccess added in v0.27.0

func (r *Runtime) AllowHostAccess() bool

func (*Runtime) Catalog added in v0.16.0

func (r *Runtime) Catalog(ctx context.Context, instanceID string) (drivers.CatalogStore, func(), error)

func (*Runtime) Close added in v0.18.0

func (r *Runtime) Close() error

func (*Runtime) Controller added in v0.33.1

func (r *Runtime) Controller(ctx context.Context, instanceID string) (*Controller, error)

Controller returns the controller for the given instance. If the controller is currently initializing, the call will wait until the controller is ready. If the controller has closed with a fatal error, that error will be returned here until it's restarted.

func (*Runtime) CreateInstance

func (r *Runtime) CreateInstance(ctx context.Context, inst *drivers.Instance) error

CreateInstance creates a new instance and starts a controller for it.

func (*Runtime) DeleteFile added in v0.16.0

func (r *Runtime) DeleteFile(ctx context.Context, instanceID, path string) error

func (*Runtime) DeleteInstance added in v0.16.0

func (r *Runtime) DeleteInstance(ctx context.Context, instanceID string, dropDB bool) error

DeleteInstance deletes an instance and stops its controller.

func (*Runtime) EditInstance added in v0.23.0

func (r *Runtime) EditInstance(ctx context.Context, inst *drivers.Instance, restartController bool) error

EditInstance edits an existing instance. If restartController is true, the instance's controller will be re-opened and all cached connections for the instance will be evicted. Until the controller and connections have been closed and re-opened, calls related to the instance may return transient errors.

func (*Runtime) GetFile added in v0.16.0

func (r *Runtime) GetFile(ctx context.Context, instanceID, path string) (string, time.Time, error)

func (*Runtime) GetInstanceAttributes added in v0.32.0

func (r *Runtime) GetInstanceAttributes(ctx context.Context, instanceID string) []attribute.KeyValue

GetInstanceAttributes fetches an instance and converts its annotations to attributes. It returns nil if an error occurred or the instance was not found.

func (*Runtime) Instance added in v0.35.0

func (r *Runtime) Instance(ctx context.Context, instanceID string) (*drivers.Instance, error)

Instance looks up an instance by ID. Instances are cached in-memory, so this is a cheap operation.

func (*Runtime) Instances added in v0.35.0

func (r *Runtime) Instances(ctx context.Context) ([]*drivers.Instance, error)

Instances returns all instances managed by the runtime.

func (*Runtime) ListFiles added in v0.16.0

func (r *Runtime) ListFiles(ctx context.Context, instanceID, glob string) ([]string, error)

func (*Runtime) OLAP added in v0.16.0

func (r *Runtime) OLAP(ctx context.Context, instanceID string) (drivers.OLAPStore, func(), error)

func (*Runtime) PutFile added in v0.16.0

func (r *Runtime) PutFile(ctx context.Context, instanceID, path string, blob io.Reader, create, createOnly bool) error

func (*Runtime) Query added in v0.16.0

func (r *Runtime) Query(ctx context.Context, instanceID string, query Query, priority int) error

func (*Runtime) RenameFile added in v0.16.0

func (r *Runtime) RenameFile(ctx context.Context, instanceID, fromPath, toPath string) error

func (*Runtime) Repo added in v0.16.0

func (r *Runtime) Repo(ctx context.Context, instanceID string) (drivers.RepoStore, func(), error)

func (*Runtime) ResolveMetricsViewSecurity added in v0.33.0

func (r *Runtime) ResolveMetricsViewSecurity(attributes map[string]any, instanceID string, mv *runtimev1.MetricsViewSpec, lastUpdatedOn time.Time) (*ResolvedMetricsViewSecurity, error)

func (*Runtime) WaitUntilIdle added in v0.35.0

func (r *Runtime) WaitUntilIdle(ctx context.Context, instanceID string, ignoreHidden bool) error

WaitUntilIdle waits until the instance's controller is idle (not reconciling any resources).

type SubscribeCallback added in v0.33.1

type SubscribeCallback func(e runtimev1.ResourceEvent, n *runtimev1.ResourceName, r *runtimev1.Resource)

SubscribeCallback is the callback type passed to Subscribe.

Directories

Path Synopsis
compilers
gcs
s3
pkg
dag
duration
Package duration copied as it is from github.com/senseyeio/duration
singleflight
Package singleflight provides a duplicate function call suppression mechanism.
