aux-db
Overview
A generic database service for ephemeral, resource-associated data.
Rationale
We have many miscellaneous pieces of data that are:
- associated with a resource: the resource identifier is the primary key
- ephemeral: data is re-computed periodically
- expensive to compute
- displayed frequently (in user-facing dashboards, for instance)
A typical example is the web application scanner: a daily cron job
scans the filesystem (a very expensive operation) to build the list of
web applications associated with each website; this data is then
displayed as part of the user account dashboard.
What we need is some kind of short-term cache for this data so as to
make queries very lightweight.
We don't want to use the primary user database because the data is
potentially large, and rewritten periodically in its entirety, which
would cause a large write load on the user database itself. Also there
is no need for the real-time capability of the primary user database:
this data is written once and then read many times, by completely
different actors.
Finally, a pure key/value store is not sufficient: we would like to
be able to run analytical queries on the entire dataset, to compute
aggregates and other informational views.
Since many types of data share these same characteristics, it makes
sense to have a single "auxiliary database" to handle all of them in a
generic fashion.
Implementation
Every data item is identified by a primary key (globally unique) with
the following components:
- shard_id
- data type
- resource id
- (optional) application-specific key
Every data type has its own separate schema, which the clients must
agree on. The server is agnostic with respect to the data structure,
as it only sees opaque JSON blobs. These blobs can however be
inspected by pre-defined aggregations (queries) that are shipped with
the server configuration and invoked via the RPC interface; such
queries can take advantage of the SQLite JSON extensions.
The application-specific key is a separate and independent key space
nested under the primary (shard / type / resource_id) key, which
allows one to store multiple entries for the same type and resource.
Q: Isn't this just a way to achieve SQL sharding?
A: In a sense, yes, but with RPC APIs tuned to a very specific use
case, which allows for lightweight clients.
Distributed architecture
We can make the reasonable assumption that it is acceptable to lose
access to resource-associated data whenever we lose access to the
primary resource itself.
Under this assumption, we can place an "aux database" instance on
every service shard, and store on it only data that is associated
with resources on that same shard.
We need smart clients (see the sketch below) that:
- can query the shard-specific aux database for each data item
(i.e. the client must know the resource shard);
- can run analytical queries on every shard and know how to aggregate
the results.
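A minimal sketch of such a client, assuming a plain HTTP+JSON
transport and a shard lookup function provided by the caller; the
endpoint paths, payload fields and shard_for_resource helper are
illustrative, not the actual API:

import json
import urllib.request


class AuxDbClient:
    def __init__(self, shard_urls, shard_for_resource):
        self.shard_urls = shard_urls              # {shard_id: base_url}
        self.shard_for_resource = shard_for_resource

    def _rpc(self, shard_id, method, payload):
        # Hypothetical transport: POST a JSON body to <base_url>/<method>.
        req = urllib.request.Request(
            f"{self.shard_urls[shard_id]}/{method}",
            data=json.dumps(payload).encode(),
            headers={"Content-Type": "application/json"},
        )
        with urllib.request.urlopen(req) as resp:
            return json.load(resp)

    def get(self, data_type, resource_id):
        # Point get: route the request to the shard owning the resource.
        shard_id = self.shard_for_resource(resource_id)
        key = f"{shard_id}/{data_type}/{resource_id}"
        return self._rpc(shard_id, "Get", {"keys": [key]})

    def query_all_shards(self, query_name, params=None):
        # Analytical query: fan out to every shard and collect the rows;
        # the final aggregation step would be query-specific.
        rows = []
        for shard_id in self.shard_urls:
            result = self._rpc(shard_id, "Query",
                               {"query_name": query_name,
                                "params": params or []})
            rows.extend(result.get("rows", []))
        return rows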
For analytical queries, we can offload the most complex work from the
client by having the server itself query all the other shards, and
then run a reduce stage on the aggregate. This is not as efficient
as a real Map/Reduce architecture due to all the (in-memory) data
serialization involved, but it works on a similar principle.
The time axis
Data entries are inherently timestamped. We need to support two
separate use cases, which are practically orthogonal:
- aggregate queries across multiple keys but at the latest timestamp
- historical queries for a key across time
To do this we maintain two separate sets of tables for each data type,
a latest one and a historical one. Pre-defined queries then specify
which table they are expected to run over. This keeps latest queries
fast, while still allowing historical ones (which may be slower).
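As a rough illustration, assuming a SQLite backend, the two sets of
tables for a given data type could look something like the following;
the table and column names are guesses based on the examples later in
this document, not the actual schema:

-- Illustrative only: one "latest" and one "historical" table per data type.
CREATE TABLE latest (
    key         TEXT PRIMARY KEY,  -- full shard/type/resource_id[/app] key
    resource_id TEXT NOT NULL,
    timestamp   TEXT NOT NULL,
    expires     TEXT NOT NULL,     -- timestamp + TTL (see "Data expiration")
    value_json  TEXT NOT NULL
);

CREATE TABLE historical (
    key         TEXT NOT NULL,
    resource_id TEXT NOT NULL,
    timestamp   TEXT NOT NULL,
    value_json  TEXT NOT NULL,
    PRIMARY KEY (key, timestamp)
);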
Data expiration
Since we offer aggregate queries, it is important that the database
only contains fresh data. Since data here is loaded by bulk loaders,
we need to expire old entries.
The simplest approach would be to expect that each bulk load is a
full load that completely replaces the previous contents of the data
type's table. However, this does not fit well with the incremental
approach adopted by our scanner scripts, so we do something different
instead:
The responsibility for managing "freshness" is moved to the client
side by letting the client set a TTL for each item (presumably set to
a few multiples of the cron interval). Aggregate queries will then
only scan non-expired data items.
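Concretely, using the hypothetical expires column from the table
sketch above, an aggregate query would only need a filter along these
lines (whether the server injects it or the query author writes it is
an implementation detail):

-- Sketch: aggregate queries only consider rows whose TTL has not elapsed.
SELECT count(*) FROM latest
WHERE expires > strftime('%Y-%m-%dT%H:%M:%S', 'now');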
API
Insert
Set(key, value)
Point set, sets a single key / value pair at the current timestamp.
Load(items...)
Bulk load a large number of entries at the current timestamp.
Query
Get(key...)
Point get, retrieve the value associated with a key, at the latest
available timestamp. We support retrieving multiple keys at once for
latency reasons.
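For illustration, a Get request for two keys might be shaped as
follows; the keys are made up and the exact wire format is whatever
the RPC layer defines:

{
    "keys": [
        "1/usage/user@example.com",
        "1/usage/other@example.com"
    ]
}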
Query(query_name, [param, ...])
Run a pre-defined query, with optional parameters.
Example
Let's take an example scenario where we want to store disk usage for
certain types of resources. The data type will be usage, with a
trivial value schema reporting the usage in bytes under the usage
attribute:
{"usage": 1234}
This would result in the following sample Set request:
{
    "type": "usage",
    "resource_id": "user@example.com",
    "value_json": "{\"usage\": 1234}",
    "timestamp": "2021-01-23T10:10:05+00:00",
    "ttl": 86400
}
The shard ID will automatically be set to that of the receiving
server; if shard_id is 1, the resulting final key for the entry is:
1/usage/user@example.com
The bulk data loader is just a daily cron job that runs "du" and
resolves paths to the resource ID they're associated with.
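As a sketch, such a loader could look roughly like this; the
directory layout, the path_to_resource_id helper, the aux-db address
and the HTTP+JSON transport are all assumptions made for the example:

import glob
import json
import subprocess
import urllib.request

AUX_DB_URL = "http://localhost:1234"   # placeholder aux-db address
TTL_SECONDS = 3 * 86400                # a few multiples of the cron interval


def path_to_resource_id(path):
    # Placeholder: the real mapping depends on the hosting layout.
    return path.rstrip("/").split("/")[-1]


def main():
    items = []
    for path in glob.glob("/srv/websites/*"):   # hypothetical layout
        # "du -s -B1" reports the total usage of the path in bytes.
        out = subprocess.run(["du", "-s", "-B1", path],
                             capture_output=True, text=True, check=True)
        usage = int(out.stdout.split()[0])
        items.append({
            "type": "usage",
            "resource_id": path_to_resource_id(path),
            "value_json": json.dumps({"usage": usage}),
            "ttl": TTL_SECONDS,
        })
    # Bulk-load everything in a single Load request.
    req = urllib.request.Request(f"{AUX_DB_URL}/Load",
                                 data=json.dumps({"items": items}).encode(),
                                 headers={"Content-Type": "application/json"})
    urllib.request.urlopen(req)


if __name__ == "__main__":
    main()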
A "top N users" query can then look like this (this would be the
query definition in the configuration file; note how we have to
specify the types of the result columns):
queries:
  top_usage:
    sql: |
      SELECT resource_id, json_extract(value_json, '$.usage') AS usage
      FROM latest
      ORDER BY usage DESC LIMIT 10
    results:
      - { name: resource_id, type: string }
      - { name: usage, type: int }
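Since this query takes no parameters, invoking it on a single shard
is just:

{
    "query_name": "top_usage"
}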
A historical view of a resource's usage would instead be:
queries:
  historical_usage:
    sql: |
      SELECT timestamp, json_extract(value_json, '$.usage') AS usage
      FROM historical
      WHERE key = :key
      ORDER BY timestamp DESC
    results:
      - { name: timestamp, type: timestamp }
      - { name: usage, type: int }
which can then be invoked with the following Query request specifying
the value for the key parameter:
{
    "query_name": "historical_usage",
    "params": [
        {
            "name": "key",
            "value": "1/usage/user@example.com"
        }
    ]
}
Suppose now we would like to aggregate the top N data over all the
service shards. We can then define a reduce stage for the query,
which in this case will simply compute once again the same top N
calculation over the aggregated results from the various shards:
queries:
  top_usage:
    sql: |
      SELECT resource_id, json_extract(value_json, '$.usage') AS usage
      FROM latest
      ORDER BY usage DESC LIMIT 10
    results:
      - { name: resource_id, type: string }
      - { name: usage, type: int }
    reduce_sql: |
      SELECT resource_id, usage
      FROM :table
      ORDER BY usage DESC LIMIT 10
    reduce_results:
      - { name: resource_id, type: string }
      - { name: usage, type: int }
Note that we could have omitted the reduce_results attribute since it
is the same as results (which is the assumed default).
The only difference from the primary SQL query is the :table
parameter: the reduce phase runs on a temporary data table, and
:table is replaced by that table's name at execution time.
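For example, if the merged per-shard rows were loaded into a
temporary table named, say, reduce_input (the actual name is internal
to the server), the statement executed during the reduce phase would
effectively be:

SELECT resource_id, usage
FROM reduce_input
ORDER BY usage DESC LIMIT 10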
We can then ask the server to query all shards (here called 1, 2
and 3) and reduce the results:
{
    "query_name": "top_usage",
    "shards": ["1", "2", "3"]
}