9.2.4 Preaggregation

--------------------------------------------------------------------------------
EVENTQL - PRE-AGGREGATION
--------------------------------------------------------------------------------
v0.8 - September, 2016                             Paul Asmuth 

Table of Contents

  1. Introduction
  2. Definitions
  3. Design Overview
  4. Implementation Details
    4.1 Table Eligibility
    4.2 Configuration
    4.3 Aggregation Functions
    4.4 Pre-Aggregation Procedure
    4.5 Leader Election
    4.6 Load Balancing
    4.7 Changes to the ALTER statement
    4.8 Changes to the INSERT statement
    4.9 Changes to the SELECT statement
  5. Reliability Considerations
  6. Alternatives Considered
    6.1 Implement as standalone service
    6.2 Implement in the compaction routine
  7. Code Locations


1. Introduction

  One of the primary use cases of EventQL is multidimensional analysis of
  timeseries data. These types of analyses usually involve a table that is
  indexed by a DATETIME primary key and contains a number of other columns
  that can be classified into "dimensions" and "measurements".

  Dimensions are columns that may be referred to in the GROUP BY clause of
  queries on the data. Measurements are (usually numeric) columns that will
  only be used in aggregations (i.e. via aggregate functions in the SELECT
  list).

  For this use case it's usually highly beneficial to pre-aggregate the
  measurements for each input dimension over a recurring time interval before
  writing them to disk.

  To see why this may result in huge performance increases, consider this
  somewhat realistic use case: We're running an ad network and are displaying
  100,000 ad impressions per second. We want to measure the rate at which these
  ads are clicked with 1-minute granularity. To make things a bit more
  interesting, we also want to be able to get an individual click rate for each
  of the websites on which our ads are displayed.

  If our ads are displayed on 4,000 individual websites at any given time, the
  example works out so that the size of the data after pre-aggregation would
  be roughly 0.07% of the size of the input data. A 1500x speedup.
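
  To spell out the arithmetic (assuming one input row per ad impression): at
  100,000 impressions per second, a one-minute aggregation interval receives
  100,000 * 60 = 6,000,000 input rows. With one output row per website per
  interval, pre-aggregation emits only 4,000 rows per interval, and
  4,000 / 6,000,000 is roughly 0.067%, i.e. a 1500x reduction.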


2. Definitions

  We define an abstract pre-aggregation routine that accepts a rowset N of
  rows for a given input time interval and produces another rowset M, so that
  any query that computes aggregations on the measurements (and optionally
  groups by time or one of the dimensions) will return the same result for
  both input rowsets N and M. The number of rows in M will always be less
  than or equal to the number of rows in N.

  The length of the recurring aggregation time interval represents a lower bound
  on the time granularity over which the data can be grouped in queries. Hence
  we'll refer to the length of an individual aggregation time interval as the
  "granularity" of the aggregation from now on.

  We also define the "cardinality of an aggregation" for a given time interval
  as the number of rows in the output rowset M. Given an arrival rate of events
  R and a granularity G, the pre-aggregation function transforms R * G input
  rows into C output rows where C is the cardinality of the aggregation.
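
  To illustrate this equivalence, the following is a minimal Python sketch
  (all names hypothetical, with a single dimension and a single SUM measure)
  that pre-aggregates a rowset N into a rowset M and checks that a grouped
  aggregate query returns the same result on both:

    from collections import defaultdict

    GRANULARITY = 60  # aggregation interval length in seconds

    def preaggregate(rows):
        # rows: list of (time, dimension, measure) tuples
        out = defaultdict(float)
        for time, dim, value in rows:
            out[(time // GRANULARITY, dim)] += value
        return [(bucket * GRANULARITY, dim, value)
                for (bucket, dim), value in out.items()]

    def sum_by_dimension(rows):
        # stand-in for: SELECT dim, SUM(value) ... GROUP BY dim
        result = defaultdict(float)
        for _, dim, value in rows:
            result[dim] += value
        return dict(result)

    n = [(0, "a.com", 1.0), (30, "a.com", 2.0), (61, "b.com", 5.0)]
    m = preaggregate(n)  # two output rows instead of three input rows
    assert sum_by_dimension(m) == sum_by_dimension(n)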


3. Design Overview

  The proposed approach is to implement pre-aggregation as a separate internal
  subsystem, touching as few of the core subsystems as possible.

  Pre-aggregation is optionally enabled for a table by setting a pre-aggregation
  config for the table (further specified in section 4).

  Once enabled, we elect N hosts as "aggregation nodes" for the table. Each
  insert into the pre-aggregation-enabled table will initially bypass the normal
  insert code and will instead be sent to one of the N aggregation hosts. Each
  aggregation node then performs the pre-aggregation routine locally on all
  received input rows and, at the end of the aggregation interval, forwards the
  output rows to the regular table insert procedure.

  Of course, this scheme will result in up to N * C instead of C output rows.
  While this would not impact the correctness of any aggregate queries over the
  data, it would still be visible to the user when doing a simple select on the
  table. For UX reasons, and because we consider it an implementation detail
  that should not have to be explained in user-facing documentation, we
  automatically insert a default aggregation clause into every SELECT
  statement that refers to pre-aggregation-enabled tables. This default
  aggregation clause is statically derived from the pre-aggregation config.


4. Implementation Details

4.1 Table Eligibility

  Pre-aggregation may only be enabled for tables that meet the following
  requirements:

    - Must have a non-unique primary key of type DATETIME

4.2 Configuration

  We add the following fields ("the pre-aggregation config") to the table
  configuration (which in turn is stored in the coordination service).

    - granularity            -- The aggregation granularity
    - num_servers            -- The number of servers that should run the
                                aggregation
    - dimensions             -- A list of dimensions; for each dimension:
      - dimension.column     -- The column name for the dimension
    - measures               -- A list of measures; for each measure:
      - measure.column       -- The column name for the measure
      - measure.aggr_fn      -- The aggregation function to be used
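
  For illustration, the pre-aggregation config for the ad network example
  from section 1 might look as follows, written here as a Python dict (the
  concrete encoding in the coordination service, the time unit of the
  granularity field and all values are assumptions made for this example):

    preaggregation_config = {
        "granularity": 60,              # one minute, assuming seconds
        "num_servers": 4,
        "dimensions": [
            {"column": "website_id"},
        ],
        "measures": [
            {"column": "impressions", "aggr_fn": "SUM"},
            {"column": "clicks",      "aggr_fn": "SUM"},
        ],
    }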

  The pre-aggregation config may be changed by the user at any time. On any
  change to either the pre-aggregation config or the table schema, the server
  must check and update the pre-aggregation config in such a fashion that it
  never refers to nonexistent columns.

4.3 Aggregation Functions

  An aggregation function is responsible for converting a number of scalar
  input values for a given measure, dimension and time interval into a single
  scalar aggregated value that will be stored into one column of the output row.

  Aggregation functions may optionally take a number of configuration
  parameters. The parameters are encoded as key=value pairs where key and
  value are both strings.

  Additionally, not every aggregation function is required to support every
  (output) column type.

  The following is not meant as a definitive list of supported aggregation
  functions but merely as the list of initially considered and supported
  methods:

    - COUNT
    - SUM
    - MEAN
    - MIN
    - MAX
    - DELTA
    - CARDINALITY_APPROX
    - QUANTILES
    - QUANTILES_APPROX
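
  To make the contract concrete, here is a minimal Python sketch of what an
  aggregation function implementation might look like (names and shapes are
  hypothetical; the merge operation is what the SELECT rewrite described in
  section 4.9 relies on):

    class SumAggregation:
        # minimal sketch of the SUM aggregation function
        def __init__(self, params=None):
            # params: optional dict of string key=value configuration pairs
            self.params = params or {}
            self.state = 0.0

        def add(self, value):
            # fold one scalar input value into the running state
            self.state += value

        def merge(self, other_state):
            # combine a partial aggregation state; used at query time to
            # merge the partial rows produced by different servers
            self.state += other_state

        def result(self):
            # the scalar value stored into one column of the output row
            return self.state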

4.4 Pre-Aggregation Procedure

  The pre-aggregation procedure is run for a specific table and on a specific
  host. All the work it performs is done on the local machine and all data is
  kept in memory.

  Conceptually, the procedure stores a map of running aggregations. Each
  entry in the map contains a vector of aggregation states, one for each
  measure.

  The key for the map is another vector of strings. The key vector has one
  element for each dimension in the aggregation config plus one element for the
  time. The elements for the dimensions are set to the values of the respective
  columns in the input row (or NULL if the input row is missing the column). The
  time element is set to the quotient of the input row's primary key column
  value and the granularity (using truncating division). Input rows without
  a primary key column value are rejected.

  For each incoming row, the matching entry is created or retrieved from the
  map of running aggregations. Then, the aggregation function for each measure
  is called with the corresponding column's value from the input row and the
  last aggregation state from the map entry. Finally, the result of the
  invocation is written back to the aggregation state in the map entry.

  Once a new entry is added to the aggregation map, a timer is started that
  fires after G (granularity) seconds have passed on the wall clock. Once the
  timer fires, the entry is removed from the map and inserted into the table
  using the regular table insert mechanism (using the nominal time retrieved
  from the map's key as the row's primary key value).
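
  The following Python sketch condenses the procedure (all names are
  hypothetical; a real implementation would start a wall clock timer per map
  entry instead of relying on the explicit flush() shown here, and would
  need to be thread-safe):

    GRANULARITY = 60  # seconds, from the pre-aggregation config

    class PreAggregator:
        # dimensions: list of dimension column names
        # measures:   list of (column_name, aggregation_class) pairs,
        #             e.g. [("clicks", SumAggregation)]
        # insert_fn:  stand-in for the regular table insert mechanism
        def __init__(self, dimensions, measures, insert_fn):
            self.dimensions = dimensions
            self.measures = measures
            self.insert_fn = insert_fn
            self.running = {}  # key vector -> list of aggregation states

        def add_row(self, row):
            if "time" not in row:
                raise ValueError("input row without primary key value")
            # key vector: one element per dimension (None if the input
            # row is missing the column) plus the truncated time bucket
            key = tuple(row.get(d) for d in self.dimensions)
            key += (row["time"] // GRANULARITY,)
            if key not in self.running:
                # the real implementation would start a timer here that
                # calls flush(key) after GRANULARITY wall clock seconds
                self.running[key] = [cls() for _, cls in self.measures]
            for state, (column, _) in zip(self.running[key], self.measures):
                if column in row:
                    state.add(row[column])

        def flush(self, key):
            # emit one output row via the regular insert mechanism, with
            # the nominal bucket time as the primary key value
            states = self.running.pop(key)
            out = dict(zip(self.dimensions, key[:-1]))
            out["time"] = key[-1] * GRANULARITY
            for state, (column, _) in zip(states, self.measures):
                out[column] = state.result()
            self.insert_fn(out)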

4.5 Leader Election

  We use the coordination service to elect the N aggregating servers from the
  list of available servers for every table. The aggregating servers may, in
  principle, be randomly chosen from all live servers in the cluster.

  The implementation of the election routine is pluggable (as is the
  coordination service) and not in the scope of this document.

4.6 Load Balancing

  Each input row should be sent to one of the aggregating servers for the
  table in such a fashion that the distribution of rows over servers is
  approximately uniform. This could be implemented as a random pick for every
  row or a simple round-robin scheme.

  However, we're explicitly _not_ using a consistent assignment scheme based
  on the row's dimensions, as that would open the system up to hotspotting
  issues if the distribution of the input data is not uniform (which is
  likely).
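
  A minimal sketch of the row routing (hypothetical names; a uniform random
  pick yields an approximately uniform distribution without any shared
  round-robin state between independent writers):

    import random

    def pick_aggregation_server(servers):
        # servers: the N elected aggregation nodes for the table
        return random.choice(servers)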

4.7 Changes to the ALTER statement

  The ALTER TABLE statement must be changed to atomically remove columns from
  the pre-aggregation config when columns are removed from the table schema.

  Furthermore, the new ALTER TABLE SET PREAGGREGATION statement must be
  implemented.

4.8 Changes to the INSERT statement

  The INSERT statement must be modified to check if the table is
  pre-aggregation-enabled and, if so, divert the insert into the
  pre-aggregation subsystem. A new flag INSERT_NOPREAGGR is added that
  optionally disables this behaviour.

4.9 Changes to the SELECT statement

  The SELECT statement execution must be altered to insert an aggregation node
  for every query on a pre-aggregation-enabled table that does not already
  contain an eligible aggregation clause.

  The synthetic aggregation node is a group over the quotient of the primary
  key column value and the granularity (using truncating division), in which
  all columns referred to from measure definitions are aggregated using the
  respective aggregation function's output merge function.
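
  Conceptually, the synthetic aggregation node computes something like the
  following Python sketch (hypothetical names; merge_fns maps each measure
  column to its aggregation function's output merge operation, e.g. a plain
  sum for SUM measures):

    from collections import defaultdict

    GRANULARITY = 60  # seconds

    def default_aggregation(rows, dimensions, merge_fns):
        # group by (truncated time, all dimensions), then merge each
        # measure column's partial per-server values
        groups = defaultdict(list)
        for row in rows:
            key = (row["time"] // GRANULARITY,)
            key += tuple(row[d] for d in dimensions)
            groups[key].append(row)
        for key, group in groups.items():
            out = dict(zip(["time"] + list(dimensions), key))
            out["time"] = key[0] * GRANULARITY
            for column, merge_fn in merge_fns.items():
                out[column] = merge_fn(r[column] for r in group)
            yield out

    # e.g.: list(default_aggregation(rows, ["website_id"], {"clicks": sum}))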


5. Reliability Considerations

  The proposed implementation has no provisions to handle pending data on server
  shutdown or server crashes, so in these events some input rows will be lost.
  As part of the separate write-ahead-logging effort, we will add (optional)
  durable write-ahead storage for accepted input rows.

  A separate issue that is not addressed is that the API and implementation
  do not allow for exactly-once inserts in the case of failures: If the
  client receives an error code after sending an insert to one of the
  aggregating servers, it can't be sure whether the value was stored, so it
  can't correctly decide whether it should retry.

  However, this is really more of a philosophical concern (solving the problem
  would defeat a large part of the benefits of pre-aggregation). Users that need
  strict exactly-once semantics cannot use this optimization and should insert
  the rows individually, without pre-aggregation (which _is_ an idempotent
  operation and safe to retry).


6. Alternatives Considered

6.1 Implement as standalone service

  At first glance, pre-aggregation might seem like a feature that does not
  belong in the core database, but could instead be provided as a separate
  service that performs the pre-aggregation and then simply stores the
  aggregated rows into the database.

  However, with this scheme it would be painful to support more complex
  aggregation modes, like estimating the number of unique strings or estimating
  quantiles.

  Both of these problems would be easily solvable using probabilistic data
  structures, but if the pre-aggregation was performed in a standalone service
  we would have to precompute and serialize these data structures before sending
  them to the database. And we can't treat the data as binary blobs in the
  database if we want to run useful (and fast) queries on it, so this
  would leave us with one of two choices:

  Either duplicate all data structure code from the database into the
  standalone service or a fat client, and then make sure that both copies
  always match 100% so that the serialization works (which would be a PITA).
  Or, alternatively, keep the code only in the database and send all the raw
  values to the database as part of the aggregated row (which would defeat
  the purpose of streaming aggregation).

6.2 Implement in the compaction routine

  One downside of the proposed approach is that with constant arrival rate R and
  granularity G, the number of stored rows for a given input stream still scales
  linearly in the number of hosts we assign to the stream (and therefore,
  strictly speaking also in the arrival rate R).

  One alternative solution, implementing the pre-aggregation in the partition
  compaction routine, avoids this and ends up with a constant number of stored
  rows regardless of the number of servers assigned to the aggregation.

  However, in the interest of keeping the design simple and minimally invasive
  to the other subsystems, modifying the compaction routine is left open for
  when/if the issue turns out to be a practical limitation.


7. Code Locations

  FIXME