.. image:: https://circleci.com/gh/LiveRamp/factable.svg?style=svg
    :target: https://circleci.com/gh/LiveRamp/factable

Overview
========

Factable is a service for reporting over event-sourced application data. It
features real-time updates, unbounded look-back, extremely fast queries, high
availability, and low total cost of ownership.

It's well suited for domains where the structure of queries is known in advance,
and derives its efficiency from continuous pre-aggregation of those structures.
In the nomenclature of traditional data warehousing, Factable maintains online
*summary* tables which are updated with each committed relation row (ie, message).
It is not a general-purpose OLAP system, but works best as a means for
cost-efficient "productionization" of insights discovered by internal analysts.

LiveRamp uses Factable to power structured and limited ad-hoc queries over
systems authoring tens of billions of messages every day, with look-back
windows spanning months and even years.

Background
==========

Event-sourced applications built atop Gazette often manage large numbers
of granular messages, stored across a set of Journals which each serve as a
partition of a conceptual "topic".

Application owners often want to report on messages by characterizing a set of
interesting extractable dimensions and metrics, and then aggregating across all
messages of a topic to compute a summary, or "fact" table.

A challenge with streaming applications is that the universe of messages is
unbounded and continuously growing, at each moment changing the summary.
A few strategies have evolved to cope with this dilemma:

- A common strategy is to window messages (eg, by hour or day) and process
  periodic MapReduce batches which crunch fact table updates within the window.
  Processed summary updates are usually much, much smaller than the space of input
  messages, and from there a variety of warehousing options exist for summary
  storage and fast queries. This strategy is cost effective, but has high latency.
  Grouping across windows and handling out-of-window data can also pose problems.

- Several recent systems (BigQuery, Redshift, MemSQL, ClickHouse) focus on fast
  ingest and management of very large table sizes and highly parallel query
  execution. These systems are powerful and flexible for in-house analytics and
  OLAP, with low ingest latency, but require massive storage and compute resource
  commitments for acceptable performance, and can be expensive to operate. Query
  execution can be fast (seconds), but is fundamentally constrained by the
  underlying table size, which is 1:1 with input messages. Retaining very long
  look-backs can be prohibitively expensive, and typically some sort of
  TTL caching layer must be built on top to mitigate query cost & latency in
  support of dash-boarding and client-facing reports.

- Specialized databases (eg time series databases like InfluxDB) place more
  constraints on the structure of messages and queries, in exchange for more
  cost-effective performance. These trade-offs make them well suited for a
  number of use cases, but limit their general applicability.

- `PipelineDB <http://www.pipelinedb.com/>`_ and its "continuous view"
  abstraction share commonalities with Factable. As a PostgreSQL extension,
  PipelineDB also offers rich query capabilities and full compatibility with its
  ecosystem. However, PipelineDB is by default limited to a single machine (though
  a paid offering does include clustering support). There is also no capability to
  efficiently back-fill a new continuous view over historical messages.

Factable is a semi-specialized solution that focuses on the problem of building,
storing, and quickly querying summary tables. Summaries are expected to be smaller
than the underlying relation, but may still be quite large, with storage scaled
across many machines. It provides a limited set of query-time features such as
grouping and filtering (including predicate push-down) but does not implement
generalized SQL. Its design intent is to provide only those query capabilities
which have very efficient and low-latency implementations, in keeping with
Factable's intended use cases of powering fast client reports.

The Gazette consumers framework is leveraged heavily to power Factable's
capabilities for efficient view back-fill, in-memory aggregation, horizontal
scaling, and high availability. Like all Gazette consumers, Factable instances
are ephemeral, require no pre-provisioning or storage management, and deploy
to commodity hardware via an orchestrator such as Kubernetes.

QuickStart
==========

.. code-block:: console

    # Install git submodules so docker can build
    $ git submodule update --init

    # Install required cli tools for quickstart
    $ go install -tags rocksdb ./cmd/...

    # Build and test Factable.
    $ docker build -t liveramp/factable .

    # Bootstrap local Kubernetes, here using the microk8s context,
    # and deploy a Gazette broker cluster with Etcd.
    $ ~/path/to/github.com/LiveRamp/gazette/v2/test/bootstrap_local_kubernetes.sh microk8s
    $ ~/path/to/github.com/LiveRamp/gazette/v2/test/deploy_brokers.sh microk8s default

    # Deploy Factable with the example "quotes-extractor", and using the "Quotes" schema.
    $ ./test/deploy_factable.sh microk8s default

    # Fetch the current schema from the service.
    $ ~/go/bin/factctl schema get
    INFO[0000] fetched at ModRevision                        revision=13
    mappings:
    - name: MapQuoteWords
      desc: Maps quotes to constituent words, counts, and totals.
      tag: 17
    dimensions:
    - name: DimQuoteWordCount
      type: VARINT
      desc: Returns 1 for each quote word.
      tag: 22
    ... etc ...

    # View quotes input journal, and DeltaEvent partitions. Note the fragment store
    # endpoint (we'll need this a bit later).
    $ ~/go/bin/gazctl journals list --stores
    +------------------------------------------+-------------------------------------------------------------------------------------------------+
    |                   NAME                   |                                             STORES                                              |
    +------------------------------------------+-------------------------------------------------------------------------------------------------+
    | examples/factable/quotes/input           | s3://examples/fragments/?profile=minio&endpoint=http%3A%2F%2Fvigilant-crab-minio.default%3A9000 |
    | examples/factable/vtable/deltas/part-000 | s3://examples/fragments/?profile=minio&endpoint=http%3A%2F%2Fvigilant-crab-minio.default%3A9000 |
    | examples/factable/vtable/deltas/part-001 | s3://examples/fragments/?profile=minio&endpoint=http%3A%2F%2Fvigilant-crab-minio.default%3A9000 |
    | examples/factable/vtable/deltas/part-002 | s3://examples/fragments/?profile=minio&endpoint=http%3A%2F%2Fvigilant-crab-minio.default%3A9000 |
    +------------------------------------------+-------------------------------------------------------------------------------------------------+

    # Publish a collection of example quotes to the input journal. Optionally wait
    # a minute to ensure fragments are persisted, in order to test backfill.
    $ ~/go/bin/quotes-publisher publish --broker.address=http://${BROKER_ADDRESS}:80 --quotes=pkg
    INFO[0000] done

    # "Sync" the set of Extractor & VTable shards with the current Schema and
    # DeltaEvent partitioning. Sync will drop you into an editor to review and tweak
    # ShardSpecs and JournalSpecs before application. When editing recoverylogs, we'll
    # need to fill in our Minio fragment store endpoint listed above. 
    $ ~/go/bin/factctl sync
    INFO[0000] listed input journals                         numInputs=1 relation=RelQuoteWords
    INFO[0000] shard created                                 backfill= id=33f40b6c5936b918c98fb7bc journal=examples/factable/quotes/input view=MVWordStats
    INFO[0000] shard created                                 backfill= id=6a57b4e07c9316aa1b98adda journal=examples/factable/quotes/input view=MVQuoteStats
    INFO[0000] shard created                                 backfill= id=1fd39b77d017766553ac97e6 journal=examples/factable/quotes/input view=MVRecentQuotes

    # Query the "MVQuoteStats" view. Note that views caught up with our Quotes,
    # despite being created after they were published.
    $ ~/go/bin/factctl query --path /dev/stdin <<EOF
    materializedview: MVQuoteStats
    view:
        dimensions:
        - DimQuoteAuthor
        - DimQuoteID
        metrics:
        - MetricSumQuoteCount
        - MetricSumWordQuoteCount
        - MetricSumWordTotalCount
        - MetricUniqueWords
    EOF
    e. e. cummings  9473    1       5       5       5
    e. e. cummings  9474    1       9       9       9
    e. e. cummings  9475    1       9       9       9
    e. e. cummings  9476    1       30      35      30
    e. e. cummings  9477    1       4       4       4
    e. e. cummings  9478    1       15      17      15
    e. e. cummings  9479    1       7       7       7

    # Publish the examples again. Expect queries now reflect the new messages.
    $ ~/go/bin/quotes-publisher publish --broker.address=http://${BROKER_ADDRESS}:80 --quotes=pkg
    INFO[0000] done

    # Let's try running a back-fill. First, fetch the schema for editing. Note the returned revision.
    # Edit to add an exact copy of MVQuoteStats (eg, MVQuoteStats2) with a new tag.
    $ ~/go/bin/factctl schema get > schema.yaml
    # Now apply the updated schema. Use your release instance name, and previously fetched revision.
    $ ~/go/bin/factctl schema update --path schema.yaml --instance opulent-wombat --revision 13
     
    # We want to be sure that input journal fragments have been persisted to cloud storage
    # already (eg, Minio). We can either wait 10 minutes (its configured flush interval),
    # or restart broker pods.
     
    # Also, we want to tweak the fragment store used by this journal to use the
    # raw Minio IP rather than the named service. This just lets us read signed
    # URLs returned by Minio directly from our Host, outside of the local
    # Kubernetes environment. Eg, update:
    #   s3://examples/fragments/?profile=minio&endpoint=http%3A%2F%2Fgoodly-echidna-minio.default%3A9000
    # To:
    #   s3://examples/fragments/?profile=minio&endpoint=http%3A%2F%2F10.152.183.198%3A9000
    $ ~/go/bin/gazctl journals edit -l app.gazette.dev/message-type=Quote
    
    # Run sync again, this time asking it to create a back-fill job.
    # Note that this time, we don't have to fill out the recovery log fragment store.
    # The tool infers values for new journals & shards from those that already exist.
    $ ~/go/bin/factctl sync --create-backfill
    INFO[0000] listed input journals                         numInputs=1 relation=RelQuoteWords
    INFO[0000] shard created                                 backfill=sure-pony id=6e740c5e0777300ac155508e journal=examples/factable/quotes/input view=MVQuoteStats2

    # Try running a query against MVQuoteStats2. It returns no results.

    # Extractor shards in need of back-fill are annotated with a label to
    # that effect. List all extractor shards with current back-fill labels.
    $ ~/go/bin/gazctl shards list -l app.factable.dev/backfill -L app.factable.dev/backfill
    +--------------------------+---------+---------------------------+
    |            ID            | STATUS  | APP FACTABLE DEV/BACKFILL |
    +--------------------------+---------+---------------------------+
    | 6e740c5e0777300ac155508e | PRIMARY | sure-pony                 |
    +--------------------------+---------+---------------------------+

    # Create specifications for our backfill job. Require that only fragments
    # 6 hours old or newer should be filled over. The job will read each input
    # fragment just once, and compute all extracted views simultaneously. It is
    # a good idea to bundle related view updates into a single sync & backfill.
    $ ~/go/bin/factctl backfill specify --name sure-pony --max-age 6h
    INFO[0000] generated backfill specification              spec=sure-pony.spec tasks=sure-pony.tasks
    Test your backfill job specification with:

    head --lines=1 sure-pony.tasks \
            | my-backfill-binary map --spec sure-pony.spec \
            | sort --stable --key=1,1 \
            | my-backfill-binary combine --spec sure-pony.spec

    # Locally run the back-fill as a map/reduce.
    $ cat sure-pony.tasks \
      | ~/go/bin/quotes-backfill map --spec sure-pony.spec \
      | sort --stable --key=1,1 \
      | ~/go/bin/quotes-backfill combine --spec sure-pony.spec > sure-pony.results

    # Back-fill jobs produce row keys and aggregates using hex-encoded key/value
    # format intended for compatibility with Hadoop Streaming.
    $ head -n 5 sure-pony.results
    9f12652e20652e2063756d6d696e67730001f72503      899191b548594c4c01000000000000000000000048628441a6844ca28440be804d74804d818440e780416e80416c8449cf
    9f12652e20652e2063756d6d696e67730001f72504      89a6abf048594c4c01000000000000000000000042849443f980439e80415084434c901e8442658044838c40e18441a784404c84413
    9f12652e20652e2063756d6d696e67730001f72505      898c8ca648594c4c010000000000000000000000515e8841eb8062e18441508c487d
    9f12652e20652e2063756d6d696e67730001f72506      899799c548594c4c010000000000000000000000428494468190425684405e804209800c8845f28441b280438d80
    9f12652e20652e2063756d6d696e67730001f72507      898f8faf48594c4c01000000000000000000000042ae845ea48443148442ef8446888043b6804b35804329

    # Load the backfill results into DeltaEvent partitions.
    $ ~/go/bin/factctl backfill load --name sure-pony --id 0 --path sure-pony.results

    # Now query to compare MVQuoteStats and MVQuoteStats2. They return identical results!
    # Try "accidentally" loading the backfill results a second time. The second load
    # is ignored (de-duplicated), and the views continue to return the same results.
    # 
    # Now try publishing Quotes again. Note that both views update with new counts, as expected.
    
    # Clear the backfill, by simply removing the label from its extractor shards.
    $ ~/go/bin/gazctl shards edit -l app.factable.dev/backfill=sure-pony
    INFO[0005] successfully applied                          rev=122


Architecture
============

Schema Model
------------

Factable requires a "Schema" which describes the shape of user relations,
and the specific views Factable is expected to derive. A schema is defined by:

Dimensions
~~~~~~~~~~
Dimensions are fields which may be extracted from journal messages.

:Name: Short, unique name of the Dimension.
:DimensionType: Type of Dimension fields (string, integer, time, etc).
:DimTag: Unique, immutable integer tag identifying the Dimension.

For each Dimension, the user must provide an "extractor" function of the
appropriate type and registered under the corresponding DimTag. Extractor
functions accept mapped messages and return concrete dimension fields.
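
For illustration, a minimal Go sketch of dimension extractors for the
quickstart's quotes example follows. The row type and function signatures here
are hypothetical; the actual ``ExtractFns`` registry types are defined by the
Factable packages.

.. code-block:: go

    package quotes

    // QuoteWordRow is a hypothetical mapped row at the (quote, word) grain. The
    // real row and ExtractFns registry types are defined by the factable and
    // extractor packages; this only sketches per-dimension extractor functions.
    type QuoteWordRow struct {
        Author string
        ID     int64
        Word   string
        Count  int64 // occurrences of Word within the quote
    }

    // DimQuoteAuthor: a string-typed dimension extracted from the row.
    func extractQuoteAuthor(r QuoteWordRow) string { return r.Author }

    // DimQuoteWordCount: a VARINT dimension which is 1 for every mapped row, so
    // that a sum metric over it counts (quote, word) pairs.
    func extractQuoteWordCount(r QuoteWordRow) int64 { return 1 }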

Metrics
~~~~~~~
Metrics are measures which may be calculated from a dimension (eg, given a
"cost" dimension, a metric might sum over it). Factable metrics include simple
aggregates like sums and gauges, as well as complex sketches like HyperLogLogs
and HyperMinHashes.

:Name: Short, unique name of the Metric.
:Dimension: Name of the Dimension from which this Metric is derived.
:MetricType:
  Type of the Metric (integer-sum, HLL(string), float-sum, int-gauge, etc).
  Must be consistent with the Dimension type.
:MetTag: Unique, immutable integer tag identifying the Metric.
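
As a rough sketch only (not Factable's actual aggregate implementation), a sum
and a gauge metric reduce partial aggregates along these lines:

.. code-block:: go

    package quotes

    // Illustrative partial aggregates for two metric types; Factable's real
    // aggregate encodings (including its HLL sketches) are defined by its own
    // packages.
    type intSum struct{ Total int64 }
    type intGauge struct{ Last int64 }

    // Reducing a sum metric folds the update into the running total.
    func (a *intSum) Reduce(other intSum) { a.Total += other.Total }

    // Reducing a gauge metric keeps only the most recent value.
    func (a *intGauge) Reduce(other intGauge) { a.Last = other.Last }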

Mappings
~~~~~~~~
For some use cases, messages may be de-normalized with respect to how the
message might be modeled in a traditional data warehouse. For example, a
single PurchaseEvent might capture multiple product SKUs. To build a relation
at the product SKU grain, one first *maps* the PurchaseEvent into a distinct row
for each (PurchaseEvent, product SKU) tuple.

Factable Mappings similarly define the means of deriving rows from messages.

:Name: Short, unique name of the Mapping.
:MapTag: Unique, immutable integer tag identifying the Mapping.

As with Dimensions, the user must provide a function registered under the MapTag
which converts an input message into zero or more relation rows, returned as a
``[]factable.RelationRow``.
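
A hypothetical sketch of such a mapping, using the PurchaseEvent example above
(the ``relationRow`` stand-in and field names are illustrative; the real
registry returns ``[]factable.RelationRow``):

.. code-block:: go

    package purchases

    // PurchaseEvent is a hypothetical denormalized message capturing several SKUs.
    type PurchaseEvent struct {
        OrderID string
        SKUs    []string
    }

    // relationRow stands in for factable.RelationRow in this sketch.
    type relationRow []interface{}

    // mapPurchaseToSKURows emits one relation row per (PurchaseEvent, SKU) tuple,
    // building a relation at the product-SKU grain.
    func mapPurchaseToSKURows(msg PurchaseEvent) []relationRow {
        var rows []relationRow
        for _, sku := range msg.SKUs {
            rows = append(rows, relationRow{msg.OrderID, sku})
        }
        return rows
    }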

Relations
~~~~~~~~~
Factable leverages the insight that a traditional warehouse "relation" can also
be defined in terms of application messages *already stored* across a set of
Gazette Journals. Journals are immutable, which means relation rows can be
reliably and repeatedly enumerated directly from the source journals.

:Name: Short, unique name of the Relation.
:Selector: Gazette label selector of Journals capturing Relation messages.
:Mapping: Name of the Mapping applied to messages to obtain relation rows.
:Dimensions: List of Dimensions which may be extracted from relation rows.
:RelTag: Unique, immutable integer tag identifying the Relation.

Materialized Views
~~~~~~~~~~~~~~~~~~
Views summarize a Relation, which may have extremely high cardinality, into a
tractable reduced set of dimensions and measures. When a view is created, it
first fills over all historical messages of relation journals until reaching the
present. Thereafter the view is updated continuously by reading messages as they
commit to relation journals, and updating the metric aggregates of affected view
rows. Unless directed otherwise, views in Factable always reflect all messages of
the relation, regardless of when the view was created.

:Name: Short, unique name of the MaterializedView.
:Relation: Name of the Relation materialized by this view.
:Dimensions:
  Ordered Dimensions summarized by the view. Views are indexed by the natural
  sort order of extracted Dimension fields. The chosen order of view fields is
  therefore essential to performance when filtering and grouping over view rows.
  Eg, filtering over a leading dimension allows for skipping over large chunks
  of view rows. Filtering a trailing dimension will likely require examining
  each row.

  Similarly, grouping over a strict prefix of a view's dimensions means the
  natural order of the query matches that of the view, and that queries can be
  executed via a linear scan of the view. Grouping over non-prefix dimensions
  is still possible, but requires buffering, sorting, and re-aggregation at
  query-time.

:Metrics: Metrics aggregated by the view.
:Retention:
  Optional retention which describes the policy for view row expiration.
  Eg, rows should be kept for 6 months with respect to a time Dimension
  included in the view.
:MVTag: Unique, immutable integer tag identifying the MaterializedView.

Discussion
~~~~~~~~~~~~
A Schema must be referentially consistent with itself--for example, a Metric's
named Dimension must exist, with a type matching that of the Metric--but may
change over time as entities are added, removed, or renamed. An entity's *tag*,
however, is immutable, and plays a role identical to that of Protocol Buffer tags.
Schema transitions are likewise constrained on tags: it is an error, for example,
to specify a Dimension with a new name and new type but using a previously defined
DimTag.

Processing
----------
Factable separates its execution into an *Extractor* service and a *VTable*
service, communicating over a set of *DeltaEvent* journal partitions.

Extractor Consumer
~~~~~~~~~~~~~~~~~~
The *Extractor* consumer maps messages into relation rows, and from there to
extracted Dimension fields and Metric aggregates. Each extractor ShardSpec
composes a source journal to read with a MaterializedView to process. As
each consumer shard manages its own read offsets, this allows multiple views
to read the same relation journal at different byte offsets--including from byte
offset zero, if the journal must be filled over for a newly-created view.

Extractor must be initialized with a "registry" of domain-specific
``ExtractFns``: user-defined functions indexed on tag which implement
mappings over user message type(s), and extraction of user dimensions. Extractor
consumer binaries are compiled by the user for their application. After
instantiating ``ExtractFns``, the binary calls into an ``extractor.Main``
provided by Factable.

Each processed source message is mapped to zero or more ``[]RelationRow``,
and fed into the defined dimension extractors to produce a "row key". The row key
consists of extracted dimension fields of the view, encoded with an
order-preserving byte encoding and prefixed with the view's MVTag. Importantly, the
natural ordering of these ``[]byte`` keys matches that of the unpacked view
fields tuple. Factable relies on this property to index, scan, and filter/seek
over views represented within the flat, ordered key/value space provided by RocksDB.
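
The sketch below demonstrates this ordering property with a deliberately
simplified big-endian encoding. Factable's actual codec differs, but depends on
the same invariant: byte-wise comparison of encoded keys agrees with comparison
of the unpacked field tuples.

.. code-block:: go

    package main

    import (
        "bytes"
        "encoding/binary"
        "fmt"
    )

    // encodeKey builds a simplified order-preserving key: an MVTag prefix, a
    // terminated string field, and a big-endian integer field.
    func encodeKey(mvTag uint32, author string, id uint64) []byte {
        var b []byte
        var tmp [8]byte

        binary.BigEndian.PutUint32(tmp[:4], mvTag)
        b = append(b, tmp[:4]...)

        b = append(b, author...)
        b = append(b, 0x00) // terminator, so that prefix strings order first

        binary.BigEndian.PutUint64(tmp[:], id)
        return append(b, tmp[:]...)
    }

    func main() {
        k1 := encodeKey(42, "e. e. cummings", 9473)
        k2 := encodeKey(42, "e. e. cummings", 9474)
        k3 := encodeKey(42, "emily dickinson", 1)

        fmt.Println(bytes.Compare(k1, k2) < 0) // true: same author, 9473 < 9474
        fmt.Println(bytes.Compare(k2, k3) < 0) // true: "e. e. cummings" < "emily dickinson"
    }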

Within the scope of a single consumer transaction, the extractor combines messages
producing the same row key by updating the key's aggregates in place and in memory.
Many practical applications exhibit a strong "hot key" effect, where messages
mapping to a particular row key arrive closely in sequence. In these cases especially,
in-memory combining can provide a *significant* (1 to 2 orders of magnitude)
reduction in downstream Gazette writes.
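
A minimal sketch of this per-transaction combining (the real extractor combines
typed metric aggregates rather than the bare counts used here):

.. code-block:: go

    package main

    import "fmt"

    // combiner accumulates partial aggregates by row key for the duration of a
    // single consumer transaction. Row keys are the order-preserving []byte keys,
    // used as map keys here via string conversion; the aggregate is simplified
    // to a single sum.
    type combiner struct {
        partials map[string]int64
    }

    func (c *combiner) add(rowKey []byte, delta int64) {
        if c.partials == nil {
            c.partials = make(map[string]int64)
        }
        c.partials[string(rowKey)] += delta
    }

    // flush emits one DeltaEvent per distinct row key at the close of the
    // transaction, and then discards the in-memory state.
    func (c *combiner) flush(emit func(rowKey []byte, partial int64)) {
        for k, v := range c.partials {
            emit([]byte(k), v)
        }
        c.partials = nil
    }

    func main() {
        var c combiner
        // "Hot key" effect: many messages in sequence map to the same row key and
        // are combined in memory, rather than each becoming a downstream write.
        for i := 0; i != 1000; i++ {
            c.add([]byte("MVQuoteStats/e. e. cummings/9473"), 1)
        }
        c.flush(func(k []byte, v int64) { fmt.Printf("%s => %d\n", k, v) })
    }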

At the close of each consumer transaction, the extractor emits each row key
and set of partial aggregates as a DeltaEvent message. DeltaEvents are mapped to
partitions based on row key, meaning that all DeltaEvents of a given row key
produced by any extractor will arrive at the same partition (and no other), and
that partitions (and the VTable databases which index them) generally hold
non-overlapping portions of the overall view key-space.
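
The partitioning invariant can be pictured as a stable function of the row key
alone, for example a hash modulo the partition count. The actual routing is
performed by Gazette's message mapping; this is illustrative only.

.. code-block:: go

    package main

    import (
        "fmt"
        "hash/fnv"
    )

    // deltaPartition maps a row key to one of N DeltaEvent partitions. Because the
    // mapping depends only on the row key, every extractor routes DeltaEvents of a
    // given key to the same partition, and each partition (and the VTable shard
    // indexing it) holds a disjoint subset of row keys.
    func deltaPartition(rowKey []byte, numPartitions uint32) uint32 {
        h := fnv.New32a()
        h.Write(rowKey)
        return h.Sum32() % numPartitions
    }

    func main() {
        fmt.Println(deltaPartition([]byte("MVQuoteStats/e. e. cummings/9473"), 3))
    }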

A two-phase commit write protocol is used to guarantee exactly-once processing
of each DeltaEvent message by the VTable service.

No significant state is managed by the extractor consumer--just a small amount
of metadata in support of the 2PC protocol. At the completion of each consumer
transaction, the previously combined and emitted row keys and aggregates are
discarded. As a result, even after combining, repeated DeltaEvents for a given
row key are likely to co-occur closely, both within a single extractor shard
and across different extractor shards. Further
aggregation is done by the VTable consumer.

VTable Consumer
~~~~~~~~~~~~~~~
The *VTable* consumer provides long-term storage and query capabilities of
materialized views, powered by its consumer Shard stores. A single VTable
deployment services *all* views processed by Factable: as each view row-key is
prefixed by its MVTag, view rows are naturally grouped on disk, and at query
time the MVTag is treated as just another dimension.

Each VTable consumer Shard reads row-keys and aggregates from its assigned
DeltaEvents partition, and aggregates updates to row-keys into its local store.
Shard stores are configured to run regular RocksDB compactions, and a compaction
filter is used to enforce view retention policies and the removal of dropped views.
As DeltaEvents are mapped to partitions on row-key when written, each shard store
will generally hold a non-overlapping and uniformly-sized subset of view rows
(with the caveat that changing DeltaEvent partitioning can result in a small
amount of overlap, which is not a concern).

VTable Queries
~~~~~~~~~~~~~~
VTable consumer shards surface a gRPC Query API. Queries are defined by:

:MaterializedView:
  Name of the view to query. In the future, Factable may infer an appropriate
  view from a named relation to query ("aggregate navigation").
:Dimensions:
  Dimensions to query from the view. Query results are always returned in
  sorted order by query dimensions, and only unique rows are returned.
  View dimensions not included as query dimensions are grouped over.

  Where possible, prefer to query dimensions which are a prefix of the view's,
  as this allows the query executor to best leverage the natural index order.
  Queries having this property are extremely efficient for the executor to
  evaluate, up to and including full queries of very large views.

  Queries over non-prefix dimensions are allowed, but require that the executor
  buffer, sort, and re-aggregate query rows before responding. The executor
  will limit the size of result sets it will return.
:Metrics: Metrics to return with each query row.
:Filters:
  Dimensions to filter over, and allowed ranges of field values for each
  dimension (also known as a "predicate").

  The query executor scans the view using a RocksDB iterator, and filters
  are used to "seek" the iterator forward to the next admissible view key
  given filter constraints.

  Where possible, use filters and prefer to filter over leading dimensions
  of the view, as in the sketch following this list. This allows the query
  executor to efficiently skip large portions of the view, reducing disk IO
  and compute requirements.
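
As a sketch of the filter-driven seek described above (and not the actual
executor), a filter over the leading dimension lets a scan jump straight to the
first admissible key rather than visiting every row:

.. code-block:: go

    package main

    import (
        "fmt"
        "sort"
        "strings"
    )

    func main() {
        // A toy "view": keys sorted by (author, id), as a RocksDB store would hold them.
        keys := []string{
            "dickinson/0001",
            "dickinson/0002",
            "e. e. cummings/9473",
            "e. e. cummings/9474",
            "frost/0007",
        }

        // Filter on the leading dimension: author == "e. e. cummings". Seek
        // directly to the first admissible key, then scan until the key leaves
        // the admissible range; all other rows are skipped entirely.
        const prefix = "e. e. cummings/"
        i := sort.SearchStrings(keys, prefix)
        for ; i < len(keys) && strings.HasPrefix(keys[i], prefix); i++ {
            fmt.Println(keys[i])
        }
    }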


As each shard store holds disjoint subsets of the view, VTables serve queries
using a distributed scatter/gather strategy. The VTable instance which receives
the query coordinates its execution, by first scattering it to all VTable shards.
Each shard independently executes the query, applying filter predicates and
grouping to the requested query dimensions and metrics, and streams ordered
query rows back to the coordinator. The coordinator then merges the results of
each shard into final query results. Coordinator merges need only preserve the
sorted ordering produced by each shard, and it can efficiently stream the
result set to the client as the query is evaluated by shards.
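
The coordinator's merge step amounts to an ordered merge of already-sorted
shard streams, sketched here over in-memory slices:

.. code-block:: go

    package main

    import "fmt"

    // mergeSorted merges two already-sorted shard result streams while preserving
    // their order: the property the coordinator relies on to stream results to
    // the client as shards evaluate the query. (Shards hold disjoint row keys, so
    // no cross-shard re-aggregation is shown here.)
    func mergeSorted(a, b []string) []string {
        out := make([]string, 0, len(a)+len(b))
        for len(a) > 0 && len(b) > 0 {
            if a[0] <= b[0] {
                out, a = append(out, a[0]), a[1:]
            } else {
                out, b = append(out, b[0]), b[1:]
            }
        }
        out = append(out, a...)
        return append(out, b...)
    }

    func main() {
        shard1 := []string{"cummings/9473", "frost/0007"}
        shard2 := []string{"dickinson/0001", "whitman/0003"}
        fmt.Println(mergeSorted(shard1, shard2))
    }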

Syncing Shards & Backfills
~~~~~~~~~~~~~~~~~~~~~~~~~~
The Extractor maintains a shard for each tuple of (MaterializedView, InputJournal).
Likewise, the VTable service maintains a shard for each DeltaEvent partition. As
new MaterializedViews are added or dropped, or the partitioning of relation
journals or of DeltaEvents changes, the set of Extractor and/or VTable shards
can become out of sync.

Factable provides a CLI tool (``factctl sync``) which examines the current
Schema, the set of journals matching Relation label selectors, and DeltaEvent
partitions. It determines the sets of ShardSpecs and JournalSpecs to be added or
removed, gives the user an opportunity to inspect or tweak those specifications,
and sets them live against the running service deployments. ``factctl sync``
should be run after each partitioning or schema change.

By default, Extractor ShardSpecs created by ``factctl sync`` begin reading
from input journals at byte zero. In most cases this is ideal, as the Extractor
will quickly crunch through the journal backlog. For very, very large journals,
it may be more efficient to start the Extractor shard at a recent journal offset,
and then separately crunch historical portions of the journal as a large-scale
map/reduce job. For these cases, ``factctl sync --create-backfill`` will define
a backfill job, and ``factctl backfill specify`` will create job specifications
for use with eg Hadoop Streaming.
