9.2.2 Segment-Based Replication

--------------------------------------------------------------------------------
EVENTQL - SEGMENT BASED REPLICATION
--------------------------------------------------------------------------------
v0.1 - September, 2016                             Paul Asmuth <paul@eventql.io>

Table of Contents

  1. Design Overview
  2. Durability Considerations
  3. Implementation Details
    3.1 Compaction
    3.2 Partition Leader
    3.3 Segment Merge and Fast-Forward
    3.4 Replication Order
    3.5 Replication Procedure
    3.6 ABA Considerations
    3.7 Changes to the Server Join Operation
    3.8 Changes to the Partition Split Operation
  4. Affected Subsystems
    4.1 Binary Protocol Additions
    4.2 Changes to the Query Subsystem
  5. Rollout and Backwards Compatibility
  6. Alternatives Considered
    6.1 Row-based replication
    6.2 Operation-log-based replication
  7. Code Locations


1. Design Overview

  EventQL stores data as rows in tables. Each table is split into a list of
  partitions. Each partition contains a subset of the rows in the table. The
  partition itself is stored on disk as a list of segment files. A segment file
  in turn is an immutable data file containing one or more rows.

  These segment files form an abstract log structured merge tree: when rows are
  inserted or updated in a partition, the updates are first buffered in memory
  and after a threshold is reached, written out to disk as a new segment file.

  These segment files are the smallest unit of data that is replicated in
  EventQL.

  Replication in EventQL is push-based, which means that a server that stores
  a segment file is responsible for making sure that all other servers which
  should store the segment file also have it.

  On a very high level the replication algorithm is thus very simple: every
  time a server writes out a new segment file for a partition, it adds the
  segment file to a queue to be pushed out to every other server that should
  have it. On receiving a segment file, a server adds it to its local log
  structured merge tree.
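
  As a rough illustration of this write path, the sketch below shows an
  in-memory buffer that is flushed to an immutable segment file and then
  enqueued for replication. All names (Row, SegmentFile, Partition, the flush
  threshold) are hypothetical stand-ins, not the actual EventQL classes.

    // Minimal sketch of the insert -> flush -> enqueue path described above.
    #include <cstddef>
    #include <deque>
    #include <string>
    #include <utility>
    #include <vector>

    struct Row { std::string key; std::string value; };

    // An immutable data file containing one or more rows.
    struct SegmentFile {
      std::string segment_id;
      std::vector<Row> rows;
    };

    class Partition {
     public:
      void insert(Row row) {
        buffer_.push_back(std::move(row));      // buffer the write in memory...
        if (buffer_.size() >= kFlushThreshold)  // ...until a threshold is reached
          flush();
      }

     private:
      void flush() {
        // write the buffered rows out to disk as a new, immutable segment file
        SegmentFile seg{next_segment_id(), std::move(buffer_)};
        buffer_.clear();
        write_to_disk(seg);
        // enqueue the segment to be pushed to every other server that should
        // also store this partition
        replication_queue_.push_back(seg.segment_id);
      }

      std::string next_segment_id() { return "seg_" + std::to_string(++seq_); }
      void write_to_disk(const SegmentFile&) { /* disk I/O omitted */ }

      static constexpr std::size_t kFlushThreshold = 8192;  // arbitrary
      std::size_t seq_ = 0;
      std::vector<Row> buffer_;
      std::deque<std::string> replication_queue_;
    };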

  If everything runs smoothly, all the EventQL replication does is copy files
  in the 1-500MB range between servers. This is pretty much as cheap and
  efficient as it gets: we can easily saturate the available IO and network
  bandwidth while using only a minimal amount of CPU resources. Also, the
  algorithm transfers only the minimal required amount of data (neglecting
  parity-based schemes) in the steady operation case, i.e. each row is copied
  exactly N-1 times between servers, where N is the replication factor.


2. Durability Considerations

  The one thing we have to trade off for the performance gains that segment based
  replication gives us is a bit of durability. Previously, with row-based
  replication any insert would be written out to N different nodes as soon as
  it was received by the cluster.

  With segment based replication, each write is initially only durably stored
  on one server in the cluster and only pushed out after a compaction timeout
  of usually around 1 minute.

  The implication of this is that with row based replication, as soon as a write
  is confirmed we can be sure that it will never be rolled back unless three
  machines simultaneously fail with unrecoverable hard drive errors. With
  segment based replication, however, when a single machine dies with an
  unrecoverable hard drive error, up to 1 minute (or whatever the compaction
  timeout plus replication delay is) of recent writes may be lost.

  Given that most transactional databases (like MySQL or Postgres) do not give
  any stronger guarantees than this, and that EventQL is specifically intended
  for analytical use cases, we consider this a fair tradeoff.


3. Implementation Details

  Each server keeps a list of local segments for each partition. The list contains,
  for each segment file:

    - segment_id -- the segment file id
    - base_segment_id -- the id of the segment file on which this file is based
    - included_segment_id -- a list of segment ids included in this segment
    - acked_servers -- the list of servers that have acknowledged the segment file
    - is_major -- is this a major segment (false by default)

  Additionally, a server keeps for each partition that it stores a "root segment
  id". Every time a server creates a new segment file, it writes it out to disk,
  adds a new entry to the segment list and sets the base_segment_id of the new
  entry to the current root segment id. Then it sets the current root segment id
  to the new segment id and enqueues the new segment for replication.

  The initial root segment id is "NULL". The first segment created thus has
  a base_segment_id of NULL. Also, the first segment (based on NULL) must have
  the is_major flag set.
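
  To make this bookkeeping concrete, here is a minimal sketch of the segment
  list entry and the root segment update, using hypothetical names (the actual
  on-disk encoding of this metadata is not specified here; an empty string
  encodes a NULL id):

    #include <string>
    #include <vector>

    // One entry in the per-partition segment list; field names follow the
    // list above.
    struct SegmentListEntry {
      std::string segment_id;
      std::string base_segment_id;
      std::vector<std::string> included_segment_id;  // ids compacted into this one
      std::vector<std::string> acked_servers;        // servers that acknowledged it
      bool is_major = false;
    };

    struct PartitionSegmentList {
      std::vector<SegmentListEntry> segments;  // ordered from old to new
      std::string root_segment_id;             // initially "" (NULL)

      // Called after a new segment file has been written out to disk.
      void add_new_segment(const std::string& segment_id) {
        SegmentListEntry entry;
        entry.segment_id = segment_id;
        entry.base_segment_id = root_segment_id;   // based on the current root
        entry.is_major = root_segment_id.empty();  // the first segment is major
        segments.push_back(entry);
        root_segment_id = segment_id;              // the new segment becomes root
        // ...the new segment is then enqueued for replication...
      }
    };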


3.1 Compaction

  When compacting a partition (i.e. folding the list of minor segments into
  a new major segment) the server writes out a new segment file. The new segment
  file has its base_segment_id set to the id of the most recent minor segment
  included in the new segment. Additionally, when compacting a partition, a list
  of up to MAX_SEGMENT_HISTORY included segment ids (i.e. the ids of previous
  segments that have been compacted into the new segment) may be added to the
  new segment entry.

  Note that the old segments do not get deleted immediately after compaction, but
  are later deleted in the replication procedure.

  Only the leader of a partition performs compaction.
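
  As a sketch of the compaction bookkeeping (reusing the hypothetical
  SegmentListEntry from above; the actual merging of rows into the new major
  segment file is omitted and MAX_SEGMENT_HISTORY is an assumed constant):

    #include <algorithm>
    #include <cstddef>
    #include <string>
    #include <vector>

    static constexpr std::size_t MAX_SEGMENT_HISTORY = 64;  // assumed value

    // 'minor_ids' are the ids of the minor segments being folded, ordered old
    // to new; the list is assumed to be non-empty.
    SegmentListEntry make_compacted_entry(const std::string& new_segment_id,
                                          const std::vector<std::string>& minor_ids) {
      SegmentListEntry entry;
      entry.segment_id = new_segment_id;
      entry.base_segment_id = minor_ids.back();  // most recent included minor
      entry.is_major = true;
      // remember up to MAX_SEGMENT_HISTORY of the compacted-away segment ids
      const std::size_t n = std::min(minor_ids.size(), MAX_SEGMENT_HISTORY);
      entry.included_segment_id.assign(minor_ids.end() - n, minor_ids.end());
      // note: the old entries are not removed here; they are deleted later by
      // the replication procedure (section 3.5)
      return entry;
    }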


3.2 Partition Leader

  To ensure that replication will always be fast-forward in the steady case we
  elect a "leader" for each partition using the coordination service. The exact
  leader election algorithm should be pluggable and is not discussed here.

  Suffice it to say that it should always name exactly one of the replicas as
  the leader. Still, even if two nodes consider themselves the leader for a
  given partition for a short period of time, no corruption will occur. It will
  merely result in a lot of unnecessary merges.

  At any time, only the leader of a given partition accepts writes for that
  partition. This implies that the leader is also the only server that produces
  new segments for the partition. However, any other replica of the partition
  can become the leader at any time.

  JOINING servers are not eligible to be elected as leaders.


3.3 Segment Merge and Fast-Forward

  If we take a step back and look at the problem a different way, we are trying
  to create a single, consistent chain of segments. Each segment is immutable
  but multiple servers are involved and we do not rely on the leader lock to be
  perfect, so it is possible that the chain "splits". I.e. two hosts compute
  independent, conflicting segments based on the same base segment.

  The main idea behind the whole segment based replication design is this: we
  can assign a unique id to each segment and then store, with each segment, the
  id of the segment it was based on. This way, when receiving a segment from
  another node, we can easily tell whether the chain split (i.e. whether we have
  a conflict) or not.

  Based on that, we have two different procedures to handle incoming data.

  In the steady case, where there is only one leader creating new segments and
  we have a perfect, unforked chain, each new segment that any node sees will
  always be based on the last one it has seen. In this case, we can simply store
  the new segment on disk, add it to our local segment list and change the root
  segment id to the id of the new segment without ever looking at the data. This
  is extremely cheap: we can potentially add millions of rows with a metadata
  operation (adding the segment entry) that takes milliseconds. We will call
  this operation "fast forward" from now on.

  However, when we receive a segment that is not based on the latest segment we
  have seen, we cannot fast-forward it. Note that this case happens not only
  when there was a fork in the chain, but also when a node has been offline for
  a sufficiently long time. In this case, we will perform another operation that
  we will call "merge".

  The merge operation simply reads in the segment and inserts every row into
  the local partition as if it were a new write. N.B. that due to the way
  primary keys in EventQL work, repeated writes with the exact same values are
  idempotent, i.e. regardless of how often you write a given (exact same) row
  the result is always the same.

  Another side note is that the "merge" operation is pretty much exactly what
  we previously did with row based replication for _every_ row, and what a lot
  of other products still do. So another way to look at segment based
  replication is that it allows us to "fast-forward" pretty much all of the
  replication work in the steady case.
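
  Reusing the hypothetical types from the earlier sketches, the two operations
  could look roughly like this (Partition::insert stands in for the normal,
  comparatively expensive write path):

    // Fast-forward: attach the incoming segment entry to the local chain
    // without ever looking at the row data.
    void fast_forward(PartitionSegmentList& list, const SegmentListEntry& incoming) {
      // precondition: incoming.base_segment_id == list.root_segment_id
      list.segments.push_back(incoming);
      list.root_segment_id = incoming.segment_id;
    }

    // Merge: re-insert every row of the incoming segment through the normal
    // write path. Because repeated writes of the exact same row are idempotent,
    // it is safe to apply a segment this way more than once.
    void merge(Partition& partition, const SegmentFile& incoming) {
      for (const Row& row : incoming.rows) {
        partition.insert(row);
      }
    }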


3.4 Replication Order

  With respect to a given source and target server combination it is guaranteed
  that segments will be sent in sequential order. I.e. they will be sent in
  the order in which they were added to the segment list, and no segment will be
  sent until its predecessor segment has been acknowledged.


3.5 Replication Procedure

  Note that this replication procedure only applies to partitions in the
  LOADING or LIVE stages. Partitions in the UNLOAD stage have a special
  replication procedure (see section 3.8 below).

  We define the following helper procedures:

    is_referenced(partition P, segment S):
      - set the next_id variable to the root segment id of P
      - iterate over all segments in the list from new to old as T:
        - Check if the segment id of T equals next_id
          - If yes
            - Check if the included_segment_id list of T contains the id of S
              - If yes, return true
            - Check if the segment id of T equals the id of S:
              - If yes, return true
              - If no, set next_id to the base_segment_id of T and continue
          - If no, return false
      - if we completed the loop without a match, return false

    is_reference_weak(partition P, segment S):
      - set the next_id variable to the root segment id of P
      - iterate over all segments in the list from new to old as T:
        - Check if the segment id of T equals next_id
          - If yes
            - Check if the included_segment_id list of T contains the id of S
              - If yes, return true
            - Check if the segment id of T equals the id of S:
              - If yes, return false
              - If no
                - Check if the segment T is major
                  - If yes, return true
                  - If no, set next_id to the base_segment_id of T and continue
          - If no, return true
      - if we completed the loop without a match, return true
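
  The two helpers translate fairly directly into code. The following is a
  sketch over the hypothetical PartitionSegmentList from section 3, with the
  segment list ordered from old to new so that the reverse iteration walks it
  from new to old:

    #include <algorithm>
    #include <string>

    static bool includes_id(const SegmentListEntry& t, const std::string& id) {
      return std::find(t.included_segment_id.begin(),
                       t.included_segment_id.end(), id)
          != t.included_segment_id.end();
    }

    bool is_referenced(const PartitionSegmentList& p, const SegmentListEntry& s) {
      std::string next_id = p.root_segment_id;
      for (auto it = p.segments.rbegin(); it != p.segments.rend(); ++it) {
        const SegmentListEntry& t = *it;                // walk from new to old
        if (t.segment_id != next_id) return false;      // chain is broken
        if (includes_id(t, s.segment_id)) return true;  // compacted into t
        if (t.segment_id == s.segment_id) return true;  // part of the chain
        next_id = t.base_segment_id;                    // follow the chain
      }
      return false;
    }

    bool is_reference_weak(const PartitionSegmentList& p, const SegmentListEntry& s) {
      std::string next_id = p.root_segment_id;
      for (auto it = p.segments.rbegin(); it != p.segments.rend(); ++it) {
        const SegmentListEntry& t = *it;
        if (t.segment_id != next_id) return true;
        if (includes_id(t, s.segment_id)) return true;   // superseded by compaction
        if (t.segment_id == s.segment_id) return false;  // still an active chain link
        if (t.is_major) return true;   // a newer major segment supersedes it
        next_id = t.base_segment_id;
      }
      return true;
    }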

  If we are the leader for a given partition, execute this replication procedure:

    replicate_partition(P):
    - For each segment file S in the partition P, from oldest to newest
      - Check if is_referenced(P, S);
        - If yes, check if is_reference_weak(P, S)
          - If yes, delete the segment
        - If no, merge the segment into the current partition and then delete it

    replicate_partition_to(P):
    - For each replica R of the partition P that is not ourselves:
      - Iterate the segments list as S from old to new, starting at the most
        recent major segment referenced by the root segment id
        - If the replica R is not included in the acked_servers list for
          segment S
          - Offer the segment S to replica R
            - If the offer was declined with SEGMENT_EXISTS, add R to the
              acked_servers list for S
            - If the offer was declined with OVERLOADED, INVALID or INFLIGHT,
              abort and retry later
            - If the offer was declined with OUT_OF_ORDER, remove the replica R
              from the acked_servers list and restart replication
            - If the offer was accepted, add R to the acked_servers list
              for S
            - If an error occurs, abort and retry later
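
  A sketch of the per-replica offer loop follows. The offer_segment callback
  stands in for the SEGMENT_OFFER round trip, and starting the iteration at the
  most recent major segment referenced by the root is omitted for brevity. The
  OUT_OF_ORDER branch reflects one possible reading of the "remove the replica
  from the acked_servers list and restart replication" step:

    #include <algorithm>
    #include <functional>
    #include <string>

    enum class OfferResult {
      ACCEPTED, SEGMENT_EXISTS, OVERLOADED, INVALID, INFLIGHT, OUT_OF_ORDER,
      RPC_ERROR
    };

    using OfferFn = std::function<OfferResult(const std::string& replica,
                                              const SegmentListEntry& segment)>;

    // Returns true if the replica is fully caught up, false to retry later.
    bool replicate_partition_to(PartitionSegmentList& p, const std::string& replica,
                                const OfferFn& offer_segment) {
      for (SegmentListEntry& s : p.segments) {  // old to new
        auto& acked = s.acked_servers;
        if (std::find(acked.begin(), acked.end(), replica) != acked.end())
          continue;  // the replica already acknowledged this segment

        switch (offer_segment(replica, s)) {
          case OfferResult::ACCEPTED:
          case OfferResult::SEGMENT_EXISTS:
            acked.push_back(replica);
            break;
          case OfferResult::OUT_OF_ORDER:
            // forget what we believed the replica had so that earlier segments
            // are offered again on the next run
            for (SegmentListEntry& other : p.segments) {
              auto& a = other.acked_servers;
              a.erase(std::remove(a.begin(), a.end(), replica), a.end());
            }
            return false;
          default:
            return false;  // OVERLOADED, INVALID, INFLIGHT or error: retry later
        }
      }
      return true;
    }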

  If we are a follower for a given partition, execute this replication procedure:

    - For each segment file S in the partition
      - Check if partition leader is in the acked_servers list for S
        - If yes, check if is_referenced(P, S):
          - If yes, check if is_reference_weak(P, S)
            - If yes, delete the segment
        - If no, push the segment to the partition leader
          - If the offer was declined with SEGMENT_EXISTS, add the leader to
            the acked_servers list for S
          - If the offer was declined with OVERLOADED, INVALID or INFLIGHT,
            abort and retry later
          - If the offer was accepted, add the leader to the acked_servers
            list for S
          - If an error occurs, abort and retry later


  Upon receiving a segment from another node, execute this procedure:

    - Check if we are already receiving more than MAX_REPL_INCOMING_SEGMENTS
      segments simultaneously
      - If yes, decline with OVERLOADED and abort

    - Check if we are simultaneously receiving a segment with the same ID
      - If yes, decline with INFLIGHT and abort

    - Check if we are the leader for this partition
      - If yes, check if the partition state is LOADING and the root segment
        id is NULL
        - If yes, accept and fast-forward-add the segment
        - If no, check if is_referenced(P, S)
          - If yes, decline with EXISTS
          - If no, accept and merge the segment into the partition

      - If no, check if the sender is the leader for this partition
        - If yes, check if the segment's base_segment_id equals the local root
          segment id
          - If yes
            - Accept and fast-forward-add the segment to the current partition
            - Add the leader to the acked_servers for the new segment entry
          - If no, check if is_referenced(P, S)
            - If yes, decline with EXISTS
            - If no, check if the segment is a major segment
              - If yes
                - Accept and fast-forward-add the segment
                - Add the leader to the acked_servers for the new segment entry
              - If no, decline with OUT_OF_ORDER
        - If no, decline with INVALID


  FIXME: how to handle in-memory data in the follower? (we don't accept writes
  as the follower, but when we lose our leader status this could happen)
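
  The receive-side decision tree can be sketched as a function that maps the
  local state and the offered segment entry to a reply. The duplicate-id
  INFLIGHT check and the bookkeeping on accept (storing the segment, adding
  the leader to acked_servers) are omitted, and MAX_REPL_INCOMING_SEGMENTS is
  an assumed constant:

    enum class OfferReply {
      ACCEPT_FAST_FORWARD,  // store the segment and fast-forward-add it
      ACCEPT_MERGE,         // store the segment and merge it row by row
      EXISTS, OVERLOADED, INVALID, OUT_OF_ORDER
    };

    static constexpr int MAX_REPL_INCOMING_SEGMENTS = 4;  // assumed value

    OfferReply handle_segment_offer(const PartitionSegmentList& p,
                                    const SegmentListEntry& s,
                                    bool we_are_leader,
                                    bool sender_is_leader,
                                    bool partition_is_loading,
                                    int incoming_segments) {
      if (incoming_segments > MAX_REPL_INCOMING_SEGMENTS)
        return OfferReply::OVERLOADED;

      if (we_are_leader) {
        if (partition_is_loading && p.root_segment_id.empty())
          return OfferReply::ACCEPT_FAST_FORWARD;
        if (is_referenced(p, s)) return OfferReply::EXISTS;
        return OfferReply::ACCEPT_MERGE;
      }

      if (!sender_is_leader) return OfferReply::INVALID;

      if (s.base_segment_id == p.root_segment_id)
        return OfferReply::ACCEPT_FAST_FORWARD;  // the steady, cheap case
      if (is_referenced(p, s)) return OfferReply::EXISTS;
      if (s.is_major) return OfferReply::ACCEPT_FAST_FORWARD;
      return OfferReply::OUT_OF_ORDER;
    }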

3.6 ABA Considerations

  One ABA scenario occurs when a follower receives a major segment followed by
  a minor segment, then another major segment and then another minor segment
  that is based on the previous minor segment. From the sender's point of view,
  the previous minor segment has been acknowledged. From the receiver's point of
  view the minor segment is not based on the latest segment. This is handled by
  responding with an OUT_OF_ORDER error code.

  Another ABA scenario occurs in this case: say replica R is the leader for
  partition P and all other replicas have pushed all their data to R. Now, R is
  unassigned from the partition by the master and another replica takes over as
  leader. Later, R is re-assigned to the partition and becomes leader again,
  starting out with an empty partition. All other replicas would accept the
  empty partition from R and delete their local data because they think it was
  already acknowledged by R. This is prevented by assigning a placement id to
  each replica+partition combination that is unique for each new assignment.
  The placement id is what we actually store in the acked_servers list.
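
  One way to picture the placement id (a sketch, not the actual encoding) is a
  token combining the server id with an assignment counter that the master
  bumps on every new assignment, so acknowledgements recorded for an earlier
  assignment of the same server can never be mistaken for the current one:

    #include <cstdint>
    #include <string>

    struct PlacementId {
      std::string server_id;
      uint64_t assignment_seq;  // incremented on every new assignment

      bool operator==(const PlacementId& other) const {
        return server_id == other.server_id &&
               assignment_seq == other.assignment_seq;
      }
    };
    // The acked_servers list would store PlacementId values rather than bare
    // server ids, so a re-assigned (and now empty) replica starts out with no
    // acknowledgements attributed to it.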


3.7 Changes to the Server Join Operation

  The server join operation remains largely unchanged. Joining nodes do not
  participate in leader election but are otherwise considered normal followers.
  Once a partition leader has sent the latest segment to a joining node, it will
  initiate a JOIN_FINALIZE metadata operation.


3.8 Changes to the Partition Split Operation

  The partition split operation complicates things a bit. The problem is that
  in the event of a leader failover of the splitting partition, we do not want
  to re-start the partition split from scratch. Also, we have to ensure that a
  replica that is offline during the partition split and comes online later does
  not overwrite the new (split) partitions with its old data.

  Forgetting about these issues for a second, the split procedure remains
  conceptually unchanged. Splits are only initiated by the leader. Once a
  partition starts splitting, the partition leaders for all target partitions
  are added to the follower list with a keyrange restriction.

  Due to the general replication procedure, only the leader of a given partition
  will actually push data to the new partitions. It will rewrite each segment
  file to exclude all records that do not match the target keyrange, but
  otherwise leave the segment unchanged, i.e. it does not change its id or
  base_segment_id.

  If, for example, our partition P has segments A, B and C and is splitting into
  partitions P1 and P2, both new partitions P1 and P2 will also have three
  segment files A', B' and C' with the same segment ids and base_segment_ids as
  the original A, B and C segment files.
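
  A sketch of the split-time rewrite, using the hypothetical Row and
  SegmentFile types from earlier (the real record filtering would use the
  table's primary key encoding rather than plain string comparison):

    #include <string>

    struct KeyRange { std::string begin; std::string end; };  // [begin, end)

    // Drop all rows outside the target keyrange but keep the segment identity;
    // the corresponding segment list entry (segment_id, base_segment_id) is
    // copied to the target partition unchanged.
    SegmentFile rewrite_for_split(const SegmentFile& in, const KeyRange& keyrange) {
      SegmentFile out;
      out.segment_id = in.segment_id;
      for (const Row& row : in.rows) {
        if (row.key >= keyrange.begin && row.key < keyrange.end)
          out.rows.push_back(row);
      }
      return out;
    }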

  Once all new partition leaders have confirmed the segment, the leader performs
  the SPLIT_FINALIZE operation.

  Since the partition leader will still accept writes while the partition is
  splitting, with a large enough arrival rate it is impossible to make sure all
  records are pushed out to the new leaders before marking the partition split
  as done.

  Also, there is the special case of async splits: we allow asynchronous splits
  to be enabled for a table. With async splits, the partition split immediately
  finalizes once it was initiated (before the old data is copied out to the new
  partitions).

  This complicates the end of the partition lifecycle. We can't just drop all
  partition replicas once the split is complete for the reasons above. Instead,
  once a partition is in the "UNLOAD" stage of its lifecycle (i.e. it has split
  into another partition), we don't accept any writes anymore so there is also
  no need to elect a leader anymore. In this stage, all replicas are equal and
  perform this special replication procedure:

    - For each segment file S in the partition
      - For each split target replica R
        - Check if R is in the acked_servers list for S
          - If no, push the segment to R
              - If the offer was declined with SEGMENT_EXISTS, add R to the
                acked_servers list for S
              - If the offer was declined with OVERLOADED or INVALID, abort and
                retry later
              - If the offer was accepted, add R to the acked_servers list for S
              - If an error occurs, abort and retry later
    - At this point we have ensured that all segments have been acked by all
      new targets and the partition may be deleted (it moves to the
      PARTITION_UNLOAD_FINAL stage)


4. Affected Subsystems

  Besides the replication subsystem, which needs to be rewritten in large parts,
  this change will also affect the query, insert and compaction subsystems.


4.1 Binary Protocol Additions

  We add the following new opcodes to the binary protocol:

    SEGMENT_OFFER
      Offer a segment from one server to another; the response is SEGMENT_ACCEPT
      or SEGMENT_DECLINE.

    SEGMENT_ACCEPT
      Tell the offering node to start transmitting the segment.

    SEGMENT_DECLINE
      Tell the offering node that the segment should not be transmitted at this
      time. Valid reasons are SEGMENT_EXISTS, OVERLOADED, INVALID, INFLIGHT and
      OUT_OF_ORDER

    SEGMENT_TRANSIT
      In response to a SEGMENT_ACCEPT operation, the offering node will send
      one or more SEGMENT_TRANSIT frames containing the segment data.

    SEGMENT_ACK
      The receiving server confirms each SEGMENT_TRANSIT op with a SEGMENT_ACK.
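
  For illustration, the additions might be modeled as follows; the numeric
  values are placeholders, not the actual wire-level constants:

    #include <cstdint>

    enum class ReplicationOpcode : uint16_t {
      SEGMENT_OFFER   = 0x0001,  // placeholder values
      SEGMENT_ACCEPT  = 0x0002,
      SEGMENT_DECLINE = 0x0003,
      SEGMENT_TRANSIT = 0x0004,
      SEGMENT_ACK     = 0x0005,
    };

    enum class DeclineReason : uint8_t {
      SEGMENT_EXISTS, OVERLOADED, INVALID, INFLIGHT, OUT_OF_ORDER
    };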


4.2 Changes to the Query Subsystem

  FIXME


5. Rollout and Backwards Compatibility

  FIXME


6. Alternatives Considered

6.1 Row-based replication

  The problem with the old row-based replication scheme was that it had to run
  through the regular partition insert code for each row individually at least
  6 times (assuming a replication level of 3). This was a considerable overhead:
  The row only gets stored on disk 3 out of those 6 times (and rejected on
  the other 3 inserts), but just checking if the row should be accepted or
  rejected is a fairly expensive operation and, depending on cache contents,
  might require multiple disk roundtrips.

6.2 Operation-log-based replication

  The major issue with an operation-log-based approach is that we would have to
  duplicate data (we would need to store the actual columnar data files plus an
  operation log). In the worst case this would increase our disk footprint by
  more than 100% because the operation log is not columnar.

  Of course, we could delete the operation log once we have pushed it out to all
  servers. However, what do we do once a new server joins or a partition splits?
  We would have to create a synthetic operation log from the cstable files which
  would be expensive.

  Additionally, any operation-log-based approach would incur the same overheads
  as the row based replication.


7. Code Locations

  FIXME