auxdb


README

aux-db

Overview

A generic database service for ephemeral, resource-associated data.

Rationale

We have many random pieces of data that are

  • associated with a resource: the resource identifier is the primary key
  • ephemeral: data is re-computed periodically
  • expensive to compute
  • displayed frequently (in user-facing dashboards, for instance)

A typical example is the web application scanner: a daily cron job scans the filesystem (a very expensive operation) to build the list of web applications associated with each website; this data is then displayed as part of the user account dashboard.

What we need is some kind of short-term cache for this data so as to make queries very lightweight.

We don't want to use the primary user database because the data is potentially large and is rewritten periodically in its entirety, which would put a large write load on the user database itself. Also, there is no need for the real-time capabilities of the primary user database: this data is written once and then read many times, by completely different actors.

Finally, a pure key/value store is not sufficient: we would like to be able to run analytical queries on the entire dataset, to compute aggregates and other informational views.

Since many types of data share these same characteristics, it makes sense to have a single "auxiliary database" to handle all of them in a generic fashion.

Implementation

Every data item is identified by a globally unique primary key with the following components (sketched as a Go struct below):

  • shard_id
  • data type
  • resource id
  • (optional) application-specific key
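
For illustration, a client could model this composite key roughly as follows; the field names and the "/"-joined string form are assumptions inferred from the example keys shown later in this README (e.g. 1/usage/user@example.com), not the package's actual wire format:

// ItemKey is a hypothetical client-side view of the composite primary key.
type ItemKey struct {
	ShardID    string // shard hosting the resource
	Type       string // data type, e.g. "usage"
	ResourceID string // resource identifier, e.g. "user@example.com"
	AppKey     string // optional application-specific key
}

// String renders the key in the "/"-separated form used in the examples below.
func (k ItemKey) String() string {
	s := k.ShardID + "/" + k.Type + "/" + k.ResourceID
	if k.AppKey != "" {
		s += "/" + k.AppKey
	}
	return s
}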

Every data type has its own separate schema, which the clients must agree on. The server is agnostic with respect to the data structure, as it only sees opaque JSON blobs. These blobs can, however, be inspected by pre-defined aggregations (queries) that are shipped with the server configuration and invoked via the RPC interface; such queries can take advantage of the SQLite JSON extensions.

The application-specific key is a separate and independent key space hooked off the primary (shard / type / resource_id) key, which allows one to store multiple entries for the same type and resource.

Q: Isn't this just a way to achieve SQL sharding?

A: In a sense, yes, but with RPC APIs tuned to a very specific use case, which allows for lightweight clients.

Distributed architecture

We can make the reasonable assumption that it is acceptable to lose access to resource-associated data whenever we have lost access to the primary resource in the first place.

Under this assumption, we can then place an "aux database" instance on every service shard, and store on it only data that is associated with resources on that same shard.

We need smart clients that:

  • can query the shard-specific aux database for each data item (i.e. the client must know the resource shard);
  • run analytical queries on every shard and know how to aggregate the results.

For analytical queries, we can offload the most complex work from the client by having the server itself query all the other shards, and then run a reduce stage on the aggregate. This is not as efficient as a real Map/Reduce architecture due to all the (in-memory) data serialization involved, but it works on a similar principle.
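
As a sketch of the client-side routing step (everything here, from the shard map to the address scheme, is hypothetical; the point is only that the client, not the server, maps resources to shards):

// shardOf is a stand-in for whatever source of truth maps resources to
// shards (typically the primary user database).
var shardOf = map[string]string{
	"user@example.com": "1",
}

// auxDBAddr returns the base URL of the aux-db instance responsible for a
// given resource. The naming scheme and port are made up for this example.
func auxDBAddr(resourceID string) string {
	return "http://auxdb-shard" + shardOf[resourceID] + ".internal:3199"
}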

The time axis

Data entries are inherently timestamped. We need to support two separate use cases, which are practically orthogonal:

  • aggregate queries across multiple keys but at the latest timestamp
  • historical queries for a key across time

To do this we maintain two separate sets of tables for each data type: a latest one and a historical one. Pre-defined queries then specify which table they are expected to run over. This keeps latest queries fast while still allowing historical ones (albeit possibly slower).
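
One plausible shape for these tables, inferred from the columns referenced by the example queries later in this README (an illustration only, not the schema actually created by the package's migrations):

// schemaSketch shows a possible layout for the two table sets of a single
// data type. The ttl column is an assumption based on the Set request fields.
const schemaSketch = `
CREATE TABLE IF NOT EXISTS latest (
    key         TEXT PRIMARY KEY,   -- shard/type/resource_id[/app_key]
    resource_id TEXT NOT NULL,
    value_json  TEXT NOT NULL,
    timestamp   DATETIME NOT NULL,
    ttl         INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS historical (
    key         TEXT NOT NULL,
    resource_id TEXT NOT NULL,
    value_json  TEXT NOT NULL,
    timestamp   DATETIME NOT NULL,
    ttl         INTEGER NOT NULL
);
`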

Data expiration

Since we offer aggregate queries, it is important that the database contains only fresh data; because data is written by bulk loaders, old entries must be expired explicitly.

The simplest approach would be to require that each bulk load is a full load that completely replaces the previous data for that type. However, this does not fit the incremental approach adopted by our scanner scripts, so we do something different instead:

The responsibility for managing "freshness" is moved to the client side by letting the client set a TTL for each item (presumably set to a few multiples of the cron interval). Aggregate queries will then only scan non-expired data items.
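
The freshness check boils down to a predicate of roughly this form, assuming per-row timestamp and ttl (seconds) columns as in the Set request shown later; the actual mechanism is internal to the server:

// freshOnly sketches the expiration filter an aggregate query effectively
// applies: a row is visible only while timestamp + ttl is in the future.
const freshOnly = `
SELECT resource_id, value_json
FROM latest
WHERE strftime('%s', timestamp) + ttl > CAST(strftime('%s', 'now') AS INTEGER)
`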

API

Insert

Set(key, value)

Point set: sets a single key/value pair at the current timestamp.

Load(items...)

Bulk load a large number of entries at the current timestamp.

Query

Get(key...)

Point get: retrieves the value associated with a key at the latest available timestamp. We support retrieving multiple keys at once for latency reasons.

Query(query_name, [param, ...])

Run a pre-defined query, with optional parameters.
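
A minimal client sketch for these calls, speaking JSON over HTTP. The endpoint paths ("/set", "/query") and the server address are assumptions for illustration; the request fields mirror the examples in the next section.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// server is a hypothetical aux-db instance address.
const server = "http://auxdb-shard1.internal:3199"

// call POSTs a JSON request body to one of the (assumed) RPC endpoints.
func call(path string, req any) (*http.Response, error) {
	body, err := json.Marshal(req)
	if err != nil {
		return nil, err
	}
	return http.Post(server+path, "application/json", bytes.NewReader(body))
}

func main() {
	// Set: store a single value for a resource at the current timestamp.
	resp, err := call("/set", map[string]any{
		"type":        "usage",
		"resource_id": "user@example.com",
		"value_json":  `{"usage": 1234}`,
		"ttl":         86400,
	})
	if err != nil {
		panic(err)
	}
	resp.Body.Close()

	// Query: run a pre-defined query by name (Get and Load follow the same
	// request/response pattern).
	resp, err = call("/query", map[string]any{
		"query_name": "top_usage",
	})
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status)
}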

Example

Let's take an example scenario where we want to store disk usage for certain types of resources. The data type will be usage, with a trivial value schema reporting the usage in bytes under the usage attribute:

{"usage": 1234}

This would result in the following sample Set request:

{
  "type": "usage",
  "resource_id", "user@example.com",
  "value_json": "{\"usage\": 1234}",
  "timestamp": "2021-01-23T10:10:05+00:00",
  "ttl": 86400
}

The shard ID will automatically be set to the server's own, so that (if shard_id is 1) the final key for the entry is:

1/usage/user@example.com

The bulk data loader is just a daily cron job that runs "du" and resolves paths to the resource IDs they are associated with.
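
A sketch of such a loader: it shells out to GNU du, maps each path to a resource ID, and ships everything in one Load request. The pathToResource helper, the "/load" endpoint and the server address are hypothetical.

package main

import (
	"bufio"
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"os/exec"
	"path/filepath"
	"strconv"
	"strings"
	"time"
)

// pathToResource is a hypothetical mapping from a filesystem path to the
// resource identifier it belongs to (e.g. the account owning that home dir).
func pathToResource(path string) string {
	return filepath.Base(path) + "@example.com"
}

func main() {
	// GNU du prints "<bytes>\t<path>" per line with -B1; the shell expands
	// the glob for us.
	out, err := exec.Command("sh", "-c", "du -s -B1 /home/*").Output()
	if err != nil {
		panic(err)
	}

	now := time.Now().UTC().Format(time.RFC3339)
	var items []map[string]any
	sc := bufio.NewScanner(bytes.NewReader(out))
	for sc.Scan() {
		fields := strings.Fields(sc.Text())
		if len(fields) < 2 {
			continue
		}
		usage, err := strconv.ParseInt(fields[0], 10, 64)
		if err != nil {
			continue
		}
		value, _ := json.Marshal(map[string]int64{"usage": usage})
		items = append(items, map[string]any{
			"type":        "usage",
			"resource_id": pathToResource(fields[1]),
			"value_json":  string(value),
			"timestamp":   now,
			"ttl":         3 * 86400, // a few multiples of the cron interval
		})
	}

	// Bulk-load everything in a single request to the local shard.
	body, _ := json.Marshal(map[string]any{"items": items})
	resp, err := http.Post("http://localhost:3199/load", "application/json",
		bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status)
}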

A query like "top N users" can be defined in the configuration file as follows (note how we have to specify the types of the result columns):

queries:
  top_usage:
    sql: |
        SELECT resource_id, json_extract(value_json, '$.usage') AS usage
        FROM latest
        ORDER BY usage DESC LIMIT 10
    results:
      - { name: resource_id, type: string }
      - { name: usage, type: int }

A historical view of a resource's usage, in turn, would be:

queries:
  historical_usage:
    sql: |
        SELECT timestamp, json_extract(value_json, '$.usage') AS usage
        FROM historical
        WHERE key = :key
        ORDER BY timestamp DESC
    results:
      - { name: timestamp, type: timestamp }
      - { name: usage, type: int }

which can then be invoked with the following Query request specifying the value for the key parameter:

{
  "query_name": "historical_usage",
  "params": [
    {
      "name": "key",
      "value": "1/usage/user@example.com"
    }
  ]
}

Suppose now that we would like to aggregate the top N data over all the service shards. We can then define a reduce stage for the query, which in this case simply runs the same top N calculation again over the combined results from the various shards:

queries:
  top_usage:
    sql: |
        SELECT resource_id, json_extract(value_json, '$.usage') AS usage
        FROM latest
        ORDER BY usage DESC LIMIT 10
    results:
      - { name: resource_id, type: string }
      - { name: usage, type: int }
    reduce_sql: |
        SELECT resource_id, json_extract(value_json, '$.usage') AS usage
        FROM :table
        ORDER BY usage DESC LIMIT 10
    reduce_results:
      - { name: resource_id, type: string }
      - { name: usage, type: int }

Note that we could have omitted the reduce_results attribute since it's the same as results (assumed to be the default).

The only difference from the primary SQL query is the :table parameter: the reduce phase runs over a temporary data table, whose name replaces :table at execution time.

We can then ask the server to query all shards (here called 1, 2 and 3) and reduce the results:

{
  "query_name": "top_usage",
  "params": [
    {
      "name": "key",
      "value": "1/usage/user@example.com"
    }
  ],
  "shards": ["1", "2", "3"]
}

Documentation

Index

Constants

const (
	TypeInt       = "int"
	TypeString    = "string"
	TypeTimestamp = "timestamp"
)

Variables

This section is empty.

Functions

func OpenDB

func OpenDB(dburi string) (*sql.DB, error)

OpenDB opens a SQLite database and runs the database migrations.
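
Typical usage (the path is just an example; dburi is whatever the underlying SQLite driver accepts):

db, err := auxdb.OpenDB("/var/lib/auxdb/auxdb.sqlite")
if err != nil {
	log.Fatal(err)
}
defer db.Close()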

Types

type NullString

type NullString string

NullString converts database NULLs to empty strings.

func (*NullString) Scan

func (s *NullString) Scan(value interface{}) error
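
A small sketch of why this is useful when scanning query results: NULL columns become empty strings instead of causing a scan error.

// printRow uses NullString as a scan destination for a possibly-NULL column.
func printRow(rows *sql.Rows) error {
	var resourceID auxdb.NullString
	var usage int64
	if err := rows.Scan(&resourceID, &usage); err != nil {
		return err
	}
	fmt.Println(string(resourceID), usage)
	return nil
}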

type QueryParam

type QueryParam struct {
	Name string `json:"name"`
	Type string `json:"type"`
}

type QuerySpec

type QuerySpec struct {
	SQL           string       `yaml:"sql"`
	Results       []QueryParam `yaml:"results"`
	ReduceSQL     string       `yaml:"reduce_sql"`
	ReduceResults []QueryParam `yaml:"reduce_results"`
}

type Server

type Server struct {
	// contains filtered or unexported fields
}

func NewServer

func NewServer(db *sql.DB, shardID string, queries map[string]*QuerySpec, peers clientutil.Backend, log *log.Logger) *Server

func (*Server) Close

func (s *Server) Close()

func (*Server) ServeHTTP

func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request)
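
A sketch of wiring up a single-shard server with the top_usage query from the README example. The import path is a placeholder for this module's actual path, and passing nil for the peers backend is an assumption that only makes sense when there are no other shards to query.

package main

import (
	"log"
	"net/http"

	auxdb "example.com/auxdb" // placeholder: use this module's real import path
)

func main() {
	db, err := auxdb.OpenDB("/var/lib/auxdb/auxdb.sqlite")
	if err != nil {
		log.Fatal(err)
	}

	queries := map[string]*auxdb.QuerySpec{
		"top_usage": {
			SQL: `SELECT resource_id, json_extract(value_json, '$.usage') AS usage
			      FROM latest ORDER BY usage DESC LIMIT 10`,
			Results: []auxdb.QueryParam{
				{Name: "resource_id", Type: auxdb.TypeString},
				{Name: "usage", Type: auxdb.TypeInt},
			},
		},
	}

	// nil peers: single-shard setup (assumption); shard ID "1" as in the README.
	srv := auxdb.NewServer(db, "1", queries, nil, log.Default())

	log.Fatal(http.ListenAndServe(":3199", srv))
}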

Directories

Path Synopsis
cmd
