runtime

package
v0.32.1
Warning

This package is not in the latest version of its module.

Published: Aug 21, 2023 License: Apache-2.0 Imports: 25 Imported by: 0

README

runtime

The runtime is a data infrastructure proxy and orchestrator, our data plane. It connects to data infrastructure and is or will be responsible for transpiling queries, parsing code artifacts, reconciling infra state, implementing connectors, enforcing (row-based) access policies, scheduling tasks, triggering alerts, and much more.

It's designed as a modular component that can be embedded in local applications (as it is into Rill Developer) or deployed stand-alone in a cloud environment.

Code structure

The base directory contains a Runtime type that represents the lifecycle of the runtime. It ties together the sub-directories:

  • client contains a Go client library for connecting to a runtime server.
  • compilers contains logic for parsing Rill projects (incomplete, currently mostly implemented in services instead).
  • connectors contains connector implementations.
  • drivers contains interfaces and drivers for external data infrastructure that the runtime interfaces with (like DuckDB and Druid).
  • pkg contains utility libraries.
  • queries contains pre-defined analytical queries that the runtime can serve (used for profiling and dashboards).
  • server contains a server that implements the runtime's APIs.
  • services contains logic for building and reconciling Rill projects.
  • testruntime contains helper functions for initializing a test runtime with test data.

How to test and run

You can start a stand-alone runtime server using the CLI:

go run ./cli runtime start

Ping the server to verify it's running:

go run ./cli runtime ping --base-url http://localhost:9090

You can run all tests using:

go test ./runtime/...

See web-local/README.md for details on how to run the runtime in development together with the local frontend.

Configuration

The runtime server is configured using environment variables parsed in cli/cmd/runtime/start.go. All environment variables have reasonable defaults suitable for local development. The current defaults are:

RILL_RUNTIME_HTTP_PORT="8080"
RILL_RUNTIME_GRPC_PORT="9090"
RILL_RUNTIME_LOG_LEVEL="info"
RILL_RUNTIME_METRICS_EXPORTER="prometheus"
RILL_RUNTIME_TRACES_EXPORTER=""
RILL_RUNTIME_ALLOWED_ORIGINS="*"
RILL_RUNTIME_METASTORE_DRIVER="sqlite"
RILL_RUNTIME_METASTORE_URL=":memory:"
RILL_RUNTIME_AUTH_ENABLE="false"
RILL_RUNTIME_AUTH_ISSUER_URL=""
RILL_RUNTIME_AUTH_AUDIENCE_URL=""
RILL_RUNTIME_CONNECTION_CACHE_SIZE="100"
RILL_RUNTIME_QUERY_CACHE_SIZE="10000"
RILL_RUNTIME_ALLOW_HOST_ACCESS="false"
RILL_RUNTIME_SAFE_SOURCE_REFRESH="true"
RILL_RUNTIME_GITHUB_APP_ID=""
RILL_RUNTIME_GITHUB_APP_PRIVATE_KEY=""

Adding a new endpoint

We define our APIs using gRPC and use gRPC-Gateway to map the RPCs to a RESTful API. See proto/README.md for details.

To add a new endpoint:

  1. Describe the endpoint in proto/rill/runtime/v1/api.proto
  2. Re-generate gRPC and OpenAPI interfaces by running make proto.generate
  3. Copy the new handler signature from the RuntimeServiceServer interface in proto/gen/rill/runtime/v1/api_grpc_pb.go
  4. Paste the handler signature and implement it in a relevant file in runtime/server/

Adding a new analytical query endpoint

  1. Add a new endpoint for the query by following the steps in the section above ("Adding a new endpoint")
  2. Implement the query in runtime/queries by following the instructions in runtime/queries/README.md

Example: Creating an instance and rehydrating from code artifacts

# Start runtime
go run ./runtime/cmd/main.go

# Create instance
curl --request POST --url http://localhost:8080/v1/instances --header 'Content-Type: application/json' \
  --data '{
    "instance_id": "default",
    "olap_driver": "duckdb",
    "olap_dsn": "test.db",
    "repo_driver": "file",
    "repo_dsn": "./examples/ad_bids",
    "embed_catalog": true
}'

# Apply code artifacts
curl --request POST --url http://localhost:8080/v1/instances/default/reconcile --header 'Content-Type: application/json'

# Query data
curl --request POST --url http://localhost:8080/v1/instances/default/query --header 'Content-Type: application/json' \
  --data '{ "sql": "select * from ad_bids limit 10" }'

# Query explore API
curl --request POST --url http://localhost:8080/v1/instances/default/queries/metrics-views/ad_bids_metrics/toplist/domain --header 'Content-Type: application/json' \
  --data '{
    "measure_names": ["measure_0"],
    "limit": 10,
    "sort": [{ "name": "measure_0", "ascending": false }]
}'

# Query profiling API
curl --request GET --url http://localhost:8080/v1/instances/default/null-count/ad_bids/publisher

# Get catalog info
curl --request GET --url http://localhost:8080/v1/instances/default/catalog

# Refresh source named "ad_bids_source"
curl --request POST --url http://localhost:8080/v1/instances/default/catalog/ad_bids_source/refresh

# Get available connectors
curl --request GET   --url http://localhost:8080/v1/connectors/meta

# List files in project
curl --request GET --url http://localhost:8080/v1/instances/default/files

# Fetch file in project
curl --request GET --url http://localhost:8080/v1/instances/default/files/-/models/ad_bids.sql

# Update file in project
curl --request POST --url http://localhost:8080/v1/instances/default/files/-/models/ad_bids.sql --header 'Content-Type: application/json' \
  --data '{ "blob": "select id, timestamp, publisher, domain, bid_price from ad_bids_source" }'

Using a DuckDB nightly build

The following steps apply for macOS, but a similar approach should work for Linux.

  1. Download the latest DuckDB nightly from GitHub (from the "Artifacts" section of the newest workflow run)
  2. Unzip the downloaded archive and copy the libduckdb.dylib file in the libduckdb-osx-universal folder to /usr/local/lib
     • You must use the command line to copy the file. If you touch it using the Finder, macOS will quarantine it. To remove a quarantine, run: xattr -d com.apple.quarantine libduckdb.dylib
  3. DuckDB usually does not support older file formats, so delete the stage.db and stage.db.wal files in your dev-project
  4. Add the flag -tags=duckdb_use_lib when running go run or go build to use the nightly build of DuckDB
     • If testing the local frontend, you need to temporarily set it in the dev-runtime script in package.json
     • For details, see Linking DuckDB

Note: DuckDB often makes breaking changes to its APIs, so you may encounter other errors when using a dev version of DuckDB.

Documentation

Constants

const (
	ResourceKindProjectParser  string = "rill.runtime.v1.ProjectParser"
	ResourceKindSource         string = "rill.runtime.v1.SourceV2"
	ResourceKindModel          string = "rill.runtime.v1.ModelV2"
	ResourceKindMetricsView    string = "rill.runtime.v1.MetricsViewV2"
	ResourceKindMigration      string = "rill.runtime.v1.Migration"
	ResourceKindPullTrigger    string = "rill.runtime.v1.PullTrigger"
	ResourceKindRefreshTrigger string = "rill.runtime.v1.RefreshTrigger"
	ResourceKindBucketPlanner  string = "rill.runtime.v1.BucketPlanner"
)

Built-in resource kinds

Variables

var ErrInconsistentControllerVersion = errors.New("controller: inconsistent version")

ErrInconsistentControllerVersion is returned from Controller when an unexpected controller version is observed in the DB. An unexpected controller version will only be observed if multiple controllers are running simultaneously (split brain).

var ErrInconsistentResourceVersion = errors.New("controller: inconsistent version")

ErrInconsistentResourceVersion is returned from catalog update functions when an unexpected resource version is observed in the DB. An unexpected resource version will only be observed if multiple controllers are running simultaneously (split brain).

var ErrResourceNotFound = errors.New("controller: resource not found")

ErrResourceNotFound is returned from catalog functions when a referenced resource does not exist.

var ReconcilerInitializers = make(map[string]ReconcilerInitializer)

ReconcilerInitializers is a registry of reconciler initializers for different resource kinds. There can be only one reconciler per resource kind.

Functions

func RegisterReconcilerInitializer added in v0.32.0

func RegisterReconcilerInitializer(resourceKind string, initializer ReconcilerInitializer)

RegisterReconcilerInitializer registers a reconciler initializer for a specific resource kind.

Types

type Controller added in v0.32.0

type Controller struct {
	Runtime    *Runtime
	InstanceID string
	Logger     *slog.Logger
	// contains filtered or unexported fields
}

Controller manages the catalog for a single instance and runs reconcilers to migrate the catalog (and related resources in external databases) into the desired state.

func NewController added in v0.32.0

func NewController(ctx context.Context, rt *Runtime, instanceID string, logger *zap.Logger, opts *ControllerOptions) *Controller

NewController creates a new Controller

func (*Controller) AcquireConn added in v0.32.0

func (c *Controller) AcquireConn(ctx context.Context, connector string) (drivers.Handle, func(), error)

func (*Controller) AcquireOLAP added in v0.32.0

func (c *Controller) AcquireOLAP(ctx context.Context, connector string) (drivers.OLAPStore, func(), error)

func (*Controller) Create added in v0.32.0

func (*Controller) Delete added in v0.32.0

func (c *Controller) Delete(ctx context.Context, names ...*runtimev1.ResourceName) error

func (*Controller) Flush added in v0.32.0

func (c *Controller) Flush(ctx context.Context) error

func (*Controller) Get added in v0.32.0

func (*Controller) List added in v0.32.0

func (c *Controller) List(ctx context.Context) ([]*runtimev1.Resource, error)

func (*Controller) Lock added in v0.32.0

func (c *Controller) Lock()

func (*Controller) Retrigger added in v0.32.0

func (c *Controller) Retrigger(ctx context.Context, name *runtimev1.ResourceName, t time.Time) error

func (*Controller) Run added in v0.32.0

func (c *Controller) Run(ctx context.Context) error

Run starts and runs the controller's event loop. It returns when ctx is cancelled or an unrecoverable error occurs.

func (*Controller) Unlock added in v0.32.0

func (c *Controller) Unlock()

func (*Controller) UpdateError added in v0.32.0

func (c *Controller) UpdateError(ctx context.Context, name *runtimev1.ResourceName, err error) error

func (*Controller) UpdateMeta added in v0.32.0

func (c *Controller) UpdateMeta(ctx context.Context, name *runtimev1.ResourceName, opts *UpdateMetaOptions) error

func (*Controller) UpdateSpec added in v0.32.0

func (c *Controller) UpdateSpec(ctx context.Context, name *runtimev1.ResourceName, refs []*runtimev1.ResourceName, owner *runtimev1.ResourceName, paths []string, r *runtimev1.Resource) error

func (*Controller) UpdateState added in v0.32.0

func (c *Controller) UpdateState(ctx context.Context, name *runtimev1.ResourceName, r *runtimev1.Resource) error

type ControllerOptions added in v0.32.0

type ControllerOptions struct {
	EmbedCatalogInConnector         string
	PersistDeletedResourcesDuration time.Duration
}

ControllerOptions provides additional configuration of a controller

type ExportOptions added in v0.29.1

type ExportOptions struct {
	Format       runtimev1.ExportFormat
	Priority     int
	PreWriteHook func(filename string) error
}

type Options added in v0.16.0

type Options struct {
	ConnectionCacheSize int
	MetastoreConnector  string
	QueryCacheSizeBytes int64
	AllowHostAccess     bool
	SafeSourceRefresh   bool
	// SystemConnectors are drivers whose handles are shared with all instances
	SystemConnectors []*runtimev1.Connector
}

type Query added in v0.16.0

type Query interface {
	// Key should return a cache key that uniquely identifies the query
	Key() string
	// Deps should return the source and model names that the query targets.
	// It's used to invalidate cached queries when the underlying data changes.
	Deps() []string
	// MarshalResult should return the query result and estimated cost in bytes for caching
	MarshalResult() *QueryResult
	// UnmarshalResult should populate a query with a cached result
	UnmarshalResult(v any) error
	// Resolve should execute the query against the instance's infra.
	// The error can be nil even when the result is nil, e.g. when a model contains no rows, aggregation results can be nil.
	Resolve(ctx context.Context, rt *Runtime, instanceID string, priority int) error
	// Export resolves the query and serializes the result to the writer.
	Export(ctx context.Context, rt *Runtime, instanceID string, w io.Writer, opts *ExportOptions) error
}
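
To make the caching half of this interface concrete, here is a minimal sketch of a query type that implements Key, Deps, MarshalResult and UnmarshalResult. It deliberately omits Resolve and Export, which need a *Runtime. The names cacheable and rowCount are hypothetical, QueryResult is redeclared locally as a stand-in, and the map-based cache is only an illustration, not the runtime's query cache.

```go
package main

import "fmt"

// QueryResult is a local stand-in mirroring the type documented in this package.
type QueryResult struct {
	Value any
	Bytes int64
}

// cacheable is the caching-related subset of the Query interface (hypothetical name).
type cacheable interface {
	Key() string
	Deps() []string
	MarshalResult() *QueryResult
	UnmarshalResult(v any) error
}

// rowCount is a toy query that counts the rows of one table (hypothetical type).
type rowCount struct {
	TableName string
	Result    int64
}

// Compile-time check that rowCount satisfies the subset interface.
var _ cacheable = (*rowCount)(nil)

// Key identifies the query by its type and target table.
func (q *rowCount) Key() string { return "RowCount:" + q.TableName }

// Deps lists the table whose changes should invalidate the cached result.
func (q *rowCount) Deps() []string { return []string{q.TableName} }

// MarshalResult returns the result with its size; an int64 occupies 8 bytes.
func (q *rowCount) MarshalResult() *QueryResult {
	return &QueryResult{Value: q.Result, Bytes: 8}
}

// UnmarshalResult restores the result from a cached value.
func (q *rowCount) UnmarshalResult(v any) error {
	n, ok := v.(int64)
	if !ok {
		return fmt.Errorf("rowCount: unexpected cached type %T", v)
	}
	q.Result = n
	return nil
}

func main() {
	cache := map[string]*QueryResult{}

	// Store a resolved query, then populate a fresh query from the cache.
	resolved := &rowCount{TableName: "ad_bids", Result: 42}
	cache[resolved.Key()] = resolved.MarshalResult()

	fresh := &rowCount{TableName: "ad_bids"}
	if hit, ok := cache[fresh.Key()]; ok {
		if err := fresh.UnmarshalResult(hit.Value); err != nil {
			fmt.Println("cache error:", err)
			return
		}
	}
	fmt.Println(fresh.Result)
}
```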

type QueryResult added in v0.26.0

type QueryResult struct {
	Value any
	Bytes int64
}

type ReconcileResult added in v0.32.0

type ReconcileResult struct {
	Err       error
	Retrigger time.Time
}

ReconcileResult propagates results from a reconciler invocation

type Reconciler added in v0.32.0

type Reconciler interface {
	Reconcile(ctx context.Context, n *runtimev1.ResourceName) ReconcileResult
	Close(ctx context.Context) error
}

Reconciler implements reconciliation logic for all resources of a specific kind. Reconcilers are managed and invoked by a Controller.

type ReconcilerInitializer added in v0.32.0

type ReconcilerInitializer func(*Controller) Reconciler

ReconcilerInitializer is a function that initializes a new reconciler for a specific controller

type Runtime

type Runtime struct {
	// contains filtered or unexported fields
}

func New

func New(opts *Options, logger *zap.Logger, client activity.Client) (*Runtime, error)

func (*Runtime) AcquireHandle added in v0.32.0

func (r *Runtime) AcquireHandle(ctx context.Context, instanceID, connector string) (drivers.Handle, func(), error)

AcquireHandle returns an instance-specific handle for the given connector.

func (*Runtime) AcquireSystemHandle added in v0.32.0

func (r *Runtime) AcquireSystemHandle(ctx context.Context, connector string) (drivers.Handle, func(), error)
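
Several methods in this package return a value together with a func() and an error. A common reading of that shape is that the func() releases the acquired resource and should be deferred right after a successful call; that reading is an assumption here, since the signatures above are undocumented. The sketch below demonstrates the pattern with a stand-in pool; the types pool and handle and the reference counting are hypothetical and do not describe the runtime's connection cache.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// handle is a stand-in for a driver handle.
type handle struct{ connector string }

// pool tracks how many callers currently hold each connector (hypothetical).
type pool struct {
	mu    sync.Mutex
	inUse map[string]int
}

// acquire returns a handle plus a release function that the caller must invoke when done.
// The release function is safe to call more than once.
func (p *pool) acquire(connector string) (*handle, func(), error) {
	if connector == "" {
		return nil, nil, errors.New("connector name is required")
	}
	p.mu.Lock()
	p.inUse[connector]++
	p.mu.Unlock()

	var once sync.Once
	release := func() {
		once.Do(func() {
			p.mu.Lock()
			p.inUse[connector]--
			p.mu.Unlock()
		})
	}
	return &handle{connector: connector}, release, nil
}

func main() {
	p := &pool{inUse: map[string]int{}}

	h, release, err := p.acquire("duckdb")
	if err != nil {
		fmt.Println("acquire:", err)
		return
	}
	defer release() // release only after a successful acquire

	fmt.Println(h.connector, p.inUse["duckdb"])
}
```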

func (*Runtime) AllowHostAccess added in v0.27.0

func (r *Runtime) AllowHostAccess() bool

func (*Runtime) Catalog added in v0.16.0

func (r *Runtime) Catalog(ctx context.Context, instanceID string) (drivers.CatalogStore, func(), error)

func (*Runtime) Close added in v0.18.0

func (r *Runtime) Close() error

func (*Runtime) CreateInstance

func (r *Runtime) CreateInstance(ctx context.Context, inst *drivers.Instance) error

func (*Runtime) DeleteFile added in v0.16.0

func (r *Runtime) DeleteFile(ctx context.Context, instanceID, path string) error

func (*Runtime) DeleteInstance added in v0.16.0

func (r *Runtime) DeleteInstance(ctx context.Context, instanceID string, dropDB bool) error

func (*Runtime) EditInstance added in v0.23.0

func (r *Runtime) EditInstance(ctx context.Context, inst *drivers.Instance) error

EditInstance edits an existing instance. It compares the old and new configuration and only evicts caches if a driver or DSN has changed. This ensures that DB handles are not closed unnecessarily.

func (*Runtime) EvictHandle added in v0.32.0

func (r *Runtime) EvictHandle(ctx context.Context, instanceID, connector string, drop bool) error

EvictHandle flushes the db handle for the specific connector from the cache

func (*Runtime) FindInstance added in v0.16.0

func (r *Runtime) FindInstance(ctx context.Context, instanceID string) (*drivers.Instance, error)

func (*Runtime) FindInstances added in v0.16.0

func (r *Runtime) FindInstances(ctx context.Context) ([]*drivers.Instance, error)

func (*Runtime) GetCatalogEntry added in v0.16.0

func (r *Runtime) GetCatalogEntry(ctx context.Context, instanceID, name string) (*drivers.CatalogEntry, error)

func (*Runtime) GetFile added in v0.16.0

func (r *Runtime) GetFile(ctx context.Context, instanceID, path string) (string, time.Time, error)

func (*Runtime) GetInstanceAttributes added in v0.32.0

func (r *Runtime) GetInstanceAttributes(ctx context.Context, instanceID string) []attribute.KeyValue

GetInstanceAttributes fetches an instance and converts its annotations to attributes. It returns nil if an error occurred or the instance was not found.

func (*Runtime) ListCatalogEntries added in v0.16.0

func (r *Runtime) ListCatalogEntries(ctx context.Context, instanceID string, t drivers.ObjectType) ([]*drivers.CatalogEntry, error)

func (*Runtime) ListFiles added in v0.16.0

func (r *Runtime) ListFiles(ctx context.Context, instanceID, glob string) ([]string, error)

func (*Runtime) NewCatalogService added in v0.24.0

func (r *Runtime) NewCatalogService(ctx context.Context, instanceID string) (*catalog.Service, error)

func (*Runtime) OLAP added in v0.16.0

func (r *Runtime) OLAP(ctx context.Context, instanceID string) (drivers.OLAPStore, func(), error)

func (*Runtime) PutFile added in v0.16.0

func (r *Runtime) PutFile(ctx context.Context, instanceID, path string, blob io.Reader, create, createOnly bool) error

func (*Runtime) Query added in v0.16.0

func (r *Runtime) Query(ctx context.Context, instanceID string, query Query, priority int) error

func (*Runtime) Reconcile added in v0.16.0

func (r *Runtime) Reconcile(ctx context.Context, instanceID string, changedPaths, forcedPaths []string, dry, strict bool) (*catalog.ReconcileResult, error)

func (*Runtime) RefreshSource added in v0.16.0

func (r *Runtime) RefreshSource(ctx context.Context, instanceID, name string) error

func (*Runtime) Registry added in v0.16.0

func (r *Runtime) Registry() drivers.RegistryStore

func (*Runtime) RenameFile added in v0.16.0

func (r *Runtime) RenameFile(ctx context.Context, instanceID, fromPath, toPath string) error

func (*Runtime) Repo added in v0.16.0

func (r *Runtime) Repo(ctx context.Context, instanceID string) (drivers.RepoStore, func(), error)

func (*Runtime) SyncExistingTables added in v0.16.0

func (r *Runtime) SyncExistingTables(ctx context.Context, instanceID string) error

type UpdateMetaOptions added in v0.32.0

type UpdateMetaOptions struct {
	NewName *runtimev1.ResourceName
	Refs    []*runtimev1.ResourceName
	Owner   *runtimev1.ResourceName
	Paths   []string
}

Directories

Path Synopsis
compilers
gcs
s3
pkg
dag
duration
Package duration copied as it is from github.com/senseyeio/duration
singleflight
Package singleflight provides a duplicate function call suppression mechanism.
services
catalog/artifacts/yaml
Package yaml reads and writes artifacts that exactly mirror the internal representation
