Dividing Infinity - Distributed Partitioning Schemes

2016-08-15 16:00:00 UTC by Paul Asmuth

This is the second post in a series discussing the architecture and implementation of massively parallel databases, such as Vertica [0], BigQuery [1] or EventQL [2]. The target audience is software and systems engineers with an interest in databases and distributed systems.

In the last post we saw that in order to execute interactive queries on a large data set, we have to split the data up into smaller partitions and put each partition on its own server. This way we can utilize the combined processing power of all servers to answer the query rapidly.

The problem we'll discuss today is how exactly we're going to split up a given dataset into partitions and distribute them among servers.

Say we have a table containing a large number of rows, a couple billion or so. The total size of the table is around 100TB. Our task is to distribute the rows uniformly among 20 servers, i.e. put roughly 5TB of the table on each server.

Of course, solving that task is trivial: We read in our 100TB source table, write the first 5TB of rows to the first server, the next 5TB to the second server and so on.

While this simplistic scheme works well for a static dataset, we'll have to be more clever if we are to implement an entire database that supports adding and modifying rows.

Why? Consider this: if we want to modify a row in our naively partitioned table, we first have to figure out on which server we put the row when splitting the table into pieces. The naive scheme gives us no way to compute that location, so we have to search through the rows until we hit the correct one. In the worst case we would have to examine all rows on all servers to find any single row - the whole 100TB of data.

In more technical terms: Locating a row has linear complexity [3]: Finding the row in a table containing one thousand rows would take one thousand times longer than finding the row in a table containing just one row. It gets slower and slower as we add more rows.

Clearly, our simplistic solution will not scale: We need a more efficient way to figure out on which server a given row is stored.

To quickly tell the location of a specific row, we could store an index file somewhere that records the location of each row. We could then do a quick lookup into our index to find the correct server instead of searching through all the data.

Sadly, this doesn't solve the problem. The index file would still have linear complexity. Although this time it wouldn't be linear in time, but in space: if we were storing a bazillion rows in our table, our index file would also have a bazillion rows.

Essentially, we would just be rephrasing the problem statement from "How do we partition a large table?" to "How do we partition a large index file?".

We'll have to come up with a partitioning scheme that allows us to find rows with less than linear complexity. That is, we have to find an algorithm that can correctly compute the location of any single row, but doesn't get slower and slower as we add more rows to the table.

Modulo Hashing

One such algorithm is called modulo hashing. The good thing about modulo hashing is that it's not only very efficient but also extremely simple to implement.

If we want to partition our input table using modulo hashing, we first have to assign an identifier ID to every row in the table. This ID is usually derived from the row itself, for example by designating one of the table's columns as the primary key. For illustration purposes, we will use numeric identifiers, but the same works with strings. [4]

The only piece of information that modulo hashing keeps is a single variable N, which contains the number of servers among which the table should be partitioned.

Now, to figure out on which server a given row belongs, we simply compute the remainder of the division of the row's ID by N. This use of the modulo operation is also where the algorithm gets its name.

find_row(row_id) {
  server_id := row_id % N;
  return server_id;
}

If, for example, we wanted to locate the row with ID=123 in a table partitioned among 8 servers (N=8), the row would be stored on server number 3 (123 % 8 = 3).

Of course, this is assuming we have also used the algorithm to decide on which server to put each row while loading the input table in the first place.
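The whole scheme fits in a few lines of Python. This sketch uses the N=8 cluster from the example above and a few made-up row IDs:

```python
N = 8  # number of servers in the cluster

def find_row(row_id):
    """Map a row ID to a server using modulo hashing."""
    return row_id % N

# The same function decides where to put a row at load time and where
# to look for it at query time, so both sides always agree.
for row_id in (123, 856, 923):
    print(f"row {row_id} -> server {find_row(row_id)}")
# row 123 -> server 3
# row 856 -> server 0
# row 923 -> server 3
```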

Modulo hashing works out so that every possible ID is consistently mapped to a single server. The distribution of the rows among servers will be approximately uniform, i.e. every server will get roughly the same number of rows. [5]

The modulo hashing algorithm is a huge improvement over our naive approach as it is constant in space and time: Locating a row takes the same amount of time regardless of the total number of rows in the table. And the only pieces of information we need to tell where a row goes are the row's ID and the total number of servers N, which will only take a few bytes to store. Not bad.

So are we done? I'm afraid we're not.

One thing modulo hashing can't handle well is a growing dataset. If we're continually adding more rows to the table, we will eventually have to add more capacity and increase the number of servers N. However, once we do that, the locations of almost all of the rows change, since changing N also changes the result of almost every modulo operation.

This means that in order to increase the number of servers N and still keep the rows where they belong, we would have to copy every single row in the table to its new location every time we add or remove a server.

That's not exactly ideal: Even if we did not care about the massive overhead of copying every single row, we would eventually reach a point where our table grows faster than we can rebalance it.
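A quick simulation makes the churn concrete. The sketch below (using a million made-up, sequential row IDs) counts how many rows would have to move when N grows from 8 to 9:

```python
N_OLD, N_NEW = 8, 9
rows = range(1_000_000)  # synthetic row IDs, purely for illustration

# A row stays put only if its ID leaves the same remainder for both
# divisors, which happens for just a small sliver of all IDs.
moved = sum(1 for row_id in rows if row_id % N_OLD != row_id % N_NEW)
print(f"{moved / len(rows):.1%} of rows change servers")  # 88.9%
```

Almost nine out of ten rows relocate for a single server addition.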

Consistent Hashing

Consistent hashing is a more involved version of modulo hashing. The main improvement of consistent hashing is that it allows us to add and remove servers without affecting the locations of most rows.

At the heart of the consistent hashing algorithm is a so-called circular keyspace. Before we discuss what exactly that means, let's define the term keyspace:

As with modulo hashing, we need to assign an identifier ID to each row. Now, the keyspace is the range of all valid ID values. If the ID is numeric, the keyspace is the range from negative infinity to positive infinity.

A keyspace and the positions of three row identifiers within the keyspace.

Within the keyspace, identifiers are well-ordered [6]. That means each ID has a successor and a predecessor. The successor of an ID is the ID that goes immediately after it and the predecessor is the one that goes immediately before it.

In the illustration above, the successor of green (ID=123) is red (ID=856), the successor of red (ID=856) is yellow (ID=923) and so on.

But what is the successor of yellow (ID=923)? Does it have one? The answer is not entirely clear. However, we will later have to come up with a successor for each possible position in the keyspace, so we will have to define what the successor of the last position in our keyspace is.

Imagine we glued one end of our keyspace to the other end:

A circular keyspace and the positions of three row identifiers within the keyspace.

Now we can clearly say that, in clockwise order, yellow's (ID=923) successor is green (ID=123). Also, it finally looks like a circular keyspace!

Back to consistent hashing. Like we did with modulo hashing, we will choose an initial number of servers N. For each of the N servers, we will put a marker at a random position in the circular keyspace.

To decide on which server a given row ID belongs, we first locate the position of the ID in the circular keyspace and then search clockwise for the next server marker. In other words: each row goes onto the server whose marker immediately succeeds the row ID's position in the keyspace.

The illustration below shows a circular keyspace with eight server markers. In the illustration, the succeeding server marker for row 123 is server 1, while the succeeding server marker for rows 856 and 923 is server 3.

So row 123 gets stored on server 1 while rows 856 and 923 get stored on server 3.
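Here is a minimal sketch of the lookup in Python, with made-up marker positions in a 0..999 keyspace chosen to reproduce the mapping from the illustration (Python's bisect module performs the clockwise search over a sorted marker list):

```python
import bisect

# Hypothetical marker positions in a 0..999 keyspace, kept sorted.
# Real systems use random positions in a much larger keyspace.
markers = sorted([(130, "server 1"), (200, "server 7"), (310, "server 4"),
                  (460, "server 2"), (590, "server 6"), (700, "server 8"),
                  (940, "server 3"), (990, "server 5")])
positions = [pos for pos, _ in markers]

def find_server(row_id):
    """Walk clockwise from the row's position to the next server marker."""
    i = bisect.bisect_left(positions, row_id)
    if i == len(positions):  # past the last marker: wrap around the circle
        i = 0
    return markers[i][1]

print(find_server(123))  # server 1 (next marker clockwise is at 130)
print(find_server(856))  # server 3 (next marker clockwise is at 940)
print(find_server(923))  # server 3
```

Keeping the markers sorted means each lookup is a single binary search, i.e. logarithmic in the number of servers rather than the number of rows.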

A circular keyspace with three row identifiers and eight server markers.

As with the modulo hashing scheme, we still end up with a uniform distribution of rows among servers [7] and can quickly locate each row. The only information we have to store is the position of each server's marker in the keyspace.

Additionally, any server marker that we add or remove will only affect the rows immediately between it and the previous marker: The location of all other rows remain consistent, hence the name consistent hashing.

This means we can add or remove servers and only have to move a small subset of the rows to their new locations. The fraction of rows that needs to be moved is roughly 1/N, where N is the number of servers. So as we add more servers, the percentage of rows that needs to be moved actually gets smaller.

Can we still do better? It depends on your use case. Consistent hashing is successfully employed in a number of popular key/value databases [8] such as DynamoDB [9], Cassandra [10] or memcached [11]. Nevertheless, here are two things that we could improve about consistent hashing:

Firstly, consistent hashing only supports an exact lookup operation. That is, we can only find a row quickly if we already know its ID. If we want to find the locations of all rows in a range of IDs, for example all rows with an ID between 100 and 200, we're back to scanning the full table.

Because of this, consistent hashing is particularly well suited for key/value databases where range scans are not required but less so for OLAP [12] systems like EventQL.

The other possible improvement is that we still have to copy roughly 1/N of the table's rows after changing the number of servers. If we had 100TB on 20 servers, that would mean we're - realistically - still copying at least 15TB for every server addition [13]. Not too bad, but still a lot of network overhead.

The BigTable Algorithm

The last algorithm we will look at today is best known for its publication in Google's BigTable paper [14].

The bigtable algorithm takes a completely different approach to the problem, but it also starts by defining a keyspace. Except this time it's not a circular keyspace, but a linear one.

The illustration below shows a bigtable keyspace and the position of three rows with the identifiers 123, 856 and 923 within the keyspace.

The next thing bigtable does is to split up the keyspace into a number of partitions that are defined by their start and end positions, i.e. by the lowest and highest row identifiers that will still be contained in the partition.

The illustration below shows a keyspace that is split into five partitions A-E. In the illustration, the row with ID=123 goes into partition B and the rows with ID=856 and ID=923 both go into partition D.

Three record identifiers are mapped onto five partitions

Now, the clever bit about the bigtable algorithm is how it comes up with the partition boundaries. To see why it's clever we have to understand why we can't simply divide the keyspace into equal parts without knowing the exact distribution of the input data:

One reason for that is that if you split up the range from negative to positive infinity into equally sized partitions, you end up with an infinite number of partitions.

The other reason is that it's highly likely that the row identifiers will all be piled up in a small area of the keyspace. After all, the identifiers might be user-supplied, so we can't necessarily guarantee anything about their distribution.

Realistic distribution of record identifiers in the keyspace

So it could be that, even though we have split up the keyspace into a large number of partitions, all rows actually end up in the same partition. And we can't solve the problem by making the partitions infinitesimally small either - that would be like going back to keeping track of every row's location individually, just a bit worse.

Here's how bigtable solves the problem: Initially the table starts out with a single partition that covers the whole keyspace - from negative infinity to positive infinity.

As soon as this first partition becomes too large, it is split in two. The split point is chosen so that it divides the data in the partition into two roughly equal parts. This continues recursively as partitions become too large. At a basic level, it's an application of the classic divide and conquer principle [15].

Partition D is splitting into partitions D1 and D2

This way, you always end up with a number of partitions that are roughly equal in size, even though the distribution of row identifiers in the keyspace is initially unknown.

And since each partition is defined in terms of its lowest and highest contained row identifier, we can easily implement efficient range scans: To find all rows in a given range of identifiers, we only have to scan the partitions with overlapping ranges.

Lastly, the bigtable scheme does require a second allocation layer, which we didn't discuss here, to assign partitions to servers. Suffice it to say that this second allocation layer allows us to add new servers to a cluster without physically moving a single row. Of course, we still need to move around some rows every time we split a partition.


So is the bigtable algorithm really "better" than consistent hashing? Again, it depends on the use case.

The upsides of the bigtable scheme are that it supports range scans and that we can add capacity to a cluster without copying rows. The major downside is that implementing the algorithm in a masterless system requires a fair amount of code and synchronization.

For EventQL, we chose the bigtable algorithm as the clear winner. After reading this post, go have a look at the debug interface of EventQL, where you can see the partition map for a given table. Hopefully it will make a lot more sense now:

Partition map for an EventQL table with a DATETIME primary key

That's all for today. In the next post we will discuss how to handle streaming updates on columnar files. You can subscribe to email updates for upcoming posts or the RSS feed in the sidebar.

[0] Andrew Lamb et al. (2012) The Vertica Analytic Database: C-Store 7 Years Later (The 38th International Conference on Very Large Data Bases) — http://vldb.org/pvldb/vol5/p1790_andrewlamb_vldb2012.pdf

[1] Sergey Melnik et al. (2010) Dremel: Interactive Analysis of Web-Scale Datasets (The 36th International Conference on Very Large Data Bases) — http://static.googleusercontent.com/media/research.google.com/en//pubs/archive/36632.pdf

[2] EventQL (2016) An open-source SQL database for large-scale event analytics — http://eventql.io

[3] Complexity Theory on Wikipedia — https://en.wikipedia.org/wiki/Big_O_notation

[4] The usual way to do this is to first run the string through a hash function and then use (a subset of) the output of the hash function as a numeric identifier.

[5] To ensure uniform distribution, pre-process the identifiers with a suitable hash function, i.e. one that distributes its outputs uniformly, such as MurmurHash.

[6] Well-order on Wikipedia — https://en.wikipedia.org/wiki/Well-order

[7] Practical systems will apply a hash function to the row ID and assign more than one marker per server. Marker values are chosen randomly, usually in a 160-bit keyspace. This works out so that the data is almost guaranteed to be uniformly distributed among servers.

[8] Key-value database on Wikipedia — https://en.wikipedia.org/wiki/Key-value_database

[9] Giuseppe DeCandia et al. (2007) Dynamo: Amazon’s Highly Available Key-value Store (21st ACM Symposium on Operating Systems Principles) — http://www.allthingsdistributed.com/files/amazon-dynamo-sosp2007.pdf

[10] Avinash Lakshman, Prashant Malik (2009) Cassandra - A Decentralized Structured Storage System (ACM SIGOPS Operating Systems Review archive Volume 44 Issue 2) — https://www.cs.cornell.edu/projects/ladis2009/papers/lakshman-ladis2009.pdf

[11] memcached - a distributed memory object caching system — https://memcached.org/

[12] OLAP on Wikipedia — https://en.wikipedia.org/wiki/Online_analytical_processing

[13] Assuming a replication factor of 3. In practice, replication factor and overhead combined may result in the data being copied 6-9 times.

[14] Fay Chang et al. (2006) Bigtable: A Distributed Storage System for Structured Data (7th USENIX Symposium on Operating Systems Design and Implementation) — http://static.googleusercontent.com/media/research.google.com/en//archive/bigtable-osdi06.pdf

[15] Divide and conquer on Wikipedia — https://en.wikipedia.org/wiki/Divide_and_conquer
