Chapter Summary
This chapter is the first of three related topics on storage systems: replication, partitioning, and transactions. Replication is important because of the properties it provides:
- 1) keep the data geographically close to the users
- 2) allow the system to continue working even if some of its parts have failed (redundancy)
- 3) allow scaling out to serve increasing throughput
Three common patterns, namely single-leader, multi-leader and leaderless, are discussed. In the single-leader pattern, the challenges are mainly follower set-up and replication lag. For the multi-leader and leaderless patterns, the main challenge is detecting and resolving write conflicts. This chapter discusses the definition of each pattern, the challenges, and various solutions in detail.
Mind Map
Question Summary
Single-leader
💡 What are synchronous replication and asynchronous replication?
- Synchronous replication means that when a write happens, the successful response is returned to the client only after the data has been copied to all the followers.
- Asynchronous replication means that the successful response is returned to the client as soon as the write is saved on the leader; the copying from the leader to the followers happens asynchronously in the background (see the sketch below).
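Below is a minimal sketch of the difference, using plain in-memory dictionaries as stand-in replicas (not any real database API): the synchronous write returns to the client only after every follower has the data, while the asynchronous write returns immediately and replicates in the background.

```python
import threading
import time

# In-memory stand-ins for a leader and two followers.
leader: dict = {}
followers = [dict(), dict()]

def replicate(key, value):
    for follower in followers:
        time.sleep(0.1)          # simulated network delay per follower
        follower[key] = value

def synchronous_write(key, value):
    leader[key] = value
    replicate(key, value)        # block until every follower has the write
    return "ok"                  # client response only after full replication

def asynchronous_write(key, value):
    leader[key] = value
    threading.Thread(target=replicate, args=(key, value), daemon=True).start()
    return "ok"                  # client response before followers catch up

asynchronous_write("x", 1)
print(followers[0].get("x"))     # very likely None: followers still lag behind
time.sleep(0.3)
print(followers[0].get("x"))     # 1, once background replication has finished
```

Semi-synchronous replication, discussed next, sits in between: wait for one follower inside the write path and replicate to the remaining followers in the background.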
💡 What is semi-synchronous replication and why is it good?
In practice, fully synchronous replication is impractical because a single unavailable node would block all writes. Semi-synchronous means that a write to the leader is synchronously copied to one follower and then copied to the other followers asynchronously. This ensures that at any time, at least two nodes have an up-to-date copy of the data.
💡 What is the trade-off of async replication?
It weakens durability. If a leader fails before all of its writes have been copied to the followers, those writes are lost.
💡 What are the 4 steps to set up a new follower? (sketched in code after this list)
- Take a consistent snapshot of the leader’s database at some point in time
- Copy the snapshot to the new follower node
- The follower connects to the leader and requests all the data changes since the snapshot was taken
- The follower catches up, i.e. processes the backlog of data changes made since the snapshot
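Here is a minimal sketch of these four steps, with in-memory structures standing in for the real snapshot and replication log (all names are made up for illustration):

```python
# In-memory stand-ins for the leader's data and its append-only replication log.
leader_data = {"a": 1, "b": 2}
replication_log = [("set", "a", 1), ("set", "b", 2)]

def leader_write(key, value):
    leader_data[key] = value
    replication_log.append(("set", key, value))

# Step 1: take a consistent snapshot and remember the log position it corresponds to
# (the LSN / binlog coordinates mentioned below).
snapshot, snapshot_pos = dict(leader_data), len(replication_log)

# Step 2: copy the snapshot to the new follower node.
follower_data = dict(snapshot)

# Writes keep arriving at the leader while the follower is being set up.
leader_write("c", 3)

# Steps 3 and 4: request all changes since the snapshot position and apply the backlog.
for op, key, value in replication_log[snapshot_pos:]:
    if op == "set":
        follower_data[key] = value

print(follower_data == leader_data)   # True: the follower has caught up
```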
💡 What are the log sequence number in PostgreSQL and the binlog coordinates in MySQL?
They are the same concept: the exact position in the leader’s replication log with which a snapshot of the database is associated.
💡 What are the 3 steps of an automatic failover process? (sketched in code after this list)
- Step 1: determine that the leader has failed, normally using a ping timeout
- Step 2: choose a new leader through an election process
- Step 3: reconfigure the system to use the new leader
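A minimal sketch of the three steps, assuming a hypothetical cluster state with heartbeat timestamps and log positions (real systems run a proper election or use a controller node; here the most up-to-date live replica simply wins):

```python
import time

HEARTBEAT_TIMEOUT = 30.0   # seconds without a heartbeat before declaring the leader dead

nodes = {
    "node1": {"last_heartbeat": time.time() - 60, "log_position": 120, "role": "leader"},
    "node2": {"last_heartbeat": time.time(),      "log_position": 118, "role": "follower"},
    "node3": {"last_heartbeat": time.time(),      "log_position": 119, "role": "follower"},
}

def failover():
    now = time.time()
    leader = next(n for n, s in nodes.items() if s["role"] == "leader")
    # Step 1: determine that the leader has failed (heartbeat / ping timeout).
    if now - nodes[leader]["last_heartbeat"] < HEARTBEAT_TIMEOUT:
        return leader
    # Step 2: "elect" a new leader; here, the live replica with the most up-to-date log.
    alive = [n for n, s in nodes.items()
             if n != leader and now - s["last_heartbeat"] < HEARTBEAT_TIMEOUT]
    new_leader = max(alive, key=lambda n: nodes[n]["log_position"])
    # Step 3: reconfigure the system to use the new leader.
    nodes[leader]["role"] = "follower"
    nodes[new_leader]["role"] = "leader"
    return new_leader

print(failover())   # "node3": alive and closest to the failed leader's log position
```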
💡 Why is manually handling leader failover often preferred?
- Automatically handling the failed leader’s unreplicated writes is tricky; simply discarding them might have serious consequences
- It may result in the split brain problem, i.e. multiple leaders running at the same time, if the failover mechanism is not carefully designed
- If the timeout is not chosen properly, false alarms may occur and could trigger an unnecessary failover
💡 What is split brain?
More than one leader running in a single-leader replication architecture. If there is no process to handle write conflicts, it results in data loss or corruption.
💡 What are the four leader-to-follower replication methods?
- Statement-based
- Write-ahead log shipping
- Logical (row-based) log
- Trigger-based
💡 In a statement-based replication approach, what conditions may break the replication? (sketched in code after this list)
- Statements that use a nondeterministic function (NOW(), RAND())
- Statements that use an auto-incrementing column
- Statements that have side effects (e.g. triggers, stored procedures, user-defined functions)
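The first point can be made concrete with a small sketch (made-up helper functions, no real database involved): re-executing a statement containing RAND()/NOW() on each replica produces divergent data, whereas shipping the evaluated row keeps replicas identical.

```python
import random
import time

def apply_statement(db: dict) -> None:
    """Statement-based: each replica re-executes the statement itself,
    so RAND()/NOW()-style calls evaluate differently on each replica."""
    db["token"] = random.random()   # stands in for RAND()
    db["created_at"] = time.time()  # stands in for NOW()

def apply_row(db: dict, row: dict) -> None:
    """Logical (row-based): the leader evaluates the statement once and
    ships the resulting row values, so every replica stores identical data."""
    db.update(row)

# Statement-based shipping: leader and follower each run the statement.
leader, follower = {}, {}
apply_statement(leader)
apply_statement(follower)
print("statement-based replicas diverge:", leader != follower)   # True (almost surely)

# Logical log shipping: leader runs the statement, follower applies the concrete row.
leader2, follower2 = {}, {}
apply_statement(leader2)
apply_row(follower2, dict(leader2))
print("row-based replicas match:", leader2 == follower2)          # True
```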
💡 What is the disadvantage of the write-ahead log shipping replication approach?
The log describes the data at a very low level, which couples the replication closely to the storage engine. The log normally cannot be consumed by a different version of the engine, which makes zero-downtime version upgrades challenging.
💡 What are the different ways to mitigate the replication lag issue? (a routing sketch follows this list)
- Read-your-own-writes (read-after-write consistency): 1) serve often-modified entries from the leader only; 2) serve from a follower only after some time has passed since the write (e.g. 1 minute after the update); 3) record the timestamp of the client’s most recent write and only read from a replica that has caught up to that timestamp
- Monotonic reads: the same user always reads from the same replica
- Consistent prefix reads: if writes happen in a certain order, then anyone reading those writes will see them appear in the same order
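A minimal sketch of one read-your-own-writes strategy described above (the names and the 60-second lag window are assumptions for illustration): remember when each user last wrote, and route that user's reads to the leader until the replication lag window has safely passed.

```python
import time

REPLICATION_LAG_WINDOW = 60.0  # assume followers catch up within 60 seconds

last_write_at = {}  # user_id -> unix timestamp of that user's last write

def record_write(user_id):
    last_write_at[user_id] = time.time()

def choose_replica(user_id):
    """Route to the leader shortly after the user's own write; otherwise any follower."""
    wrote_at = last_write_at.get(user_id)
    if wrote_at is not None and time.time() - wrote_at < REPLICATION_LAG_WINDOW:
        return "leader"
    return "follower"

record_write("alice")
print(choose_replica("alice"))  # "leader": her write may not have replicated yet
print(choose_replica("bob"))    # "follower"
```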
Multi-leader
💡 What are the ways (name 4) of achieving convergent conflict resolution? (LWW sketched in code after this list)
- Last write wins (LWW)
- Higher-numbered replica wins
- Somehow merge the values (e.g. concatenate strings)
- Record the conflict in an explicit data structure that preserves all the information, and ask the user to resolve it at some later time
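A minimal sketch of last-write-wins resolution (not any particular database's implementation): each replica deterministically keeps the version with the highest (timestamp, replica id) pair, so all replicas converge, at the cost of silently dropping concurrent writes.

```python
from typing import NamedTuple

class Versioned(NamedTuple):
    timestamp: float   # wall-clock time attached to the write
    replica_id: int    # tie-breaker so the ordering is total
    value: str

def resolve_lww(a: Versioned, b: Versioned) -> Versioned:
    """Deterministically keep the 'later' write; the other is silently dropped."""
    return max(a, b, key=lambda v: (v.timestamp, v.replica_id))

w1 = Versioned(timestamp=100.0, replica_id=1, value="title = B")
w2 = Versioned(timestamp=100.0, replica_id=2, value="title = C")
print(resolve_lww(w1, w2).value)   # "title = C" on every replica that applies the rule
```

The second bullet (higher-numbered replica wins) is essentially the tie-breaking half of the same rule.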
💡 What is a replication topology in a multi-leader replication architecture?
A replication topology describes the communication paths along which writes are propagated from one node to another.
💡 What are the commonly seen multi-leader replication topologies?
Circular topology, star topology and all-to-all topology
💡 Why is the all-to-all topology better than the circular and star topologies?
It avoids single points of failure: the failure of one node does not interrupt the flow of replication messages between the other nodes.
💡 What is the causality issue of the all-to-all topology and what is the solution to it?
The causality issue refers to the problem that some operations do not arrive at every node in the expected order because of network latency, so an operation that depends on another might arrive earlier (e.g. an update statement arriving before the insert of the item it updates). Version vectors are the solution to the causality issue.
Leaderless
💡 How does an unavailable node catch up on the writes when it comes back online? (read repair sketched in code after this list)
- Read repair: the client reads from several nodes in parallel, detects stale values, and writes the up-to-date value back to the stale nodes
- Anti-entropy process: a background process constantly looks for differences in the data between replicas and copies any missing data from one replica to another
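A minimal sketch of read repair with hypothetical in-memory replicas: read the key from every reachable replica, take the value with the highest version, and push it back to any replica that returned a stale version.

```python
# Each replica maps key -> (version, value); the third replica missed the latest write.
replicas = [
    {"user:1": (7, "new photo")},
    {"user:1": (7, "new photo")},
    {"user:1": (6, "old photo")},
]

def read_with_repair(key):
    responses = [(replica[key], replica) for replica in replicas if key in replica]
    (latest_version, latest_value), _ = max(responses, key=lambda x: x[0][0])
    # Repair: push the newest value back to any stale replica.
    for (version, _), replica in responses:
        if version < latest_version:
            replica[key] = (latest_version, latest_value)
    return latest_value

print(read_with_repair("user:1"))   # "new photo"
print(replicas[2]["user:1"])        # (7, "new photo") after the repair write-back
```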
💡 What is the benefit of choosing r and w to be a majority (more than n/2) of the nodes?
This choice ensures w + r > n (the quorum condition) while still tolerating up to n/2 node failures.
💡 If the workload pattern is few writes and many reads, what is a better configuration for the quorum and why?
Reduce r, say to 1, and increase w, say to n (so that r + w > n still holds). In this case, we reduce the load on reads. However, increasing w to n has the disadvantage that a single unavailable node causes writes to fail.
💡 Define strict quorum reads and writes
Let w be the number of nodes that must confirm a write, r the number of nodes required for a read, and n the total number of nodes. When w + r > n, it is guaranteed that at least one of the nodes read from contains the latest successful write.
💡 Define sloppy quorum
In a large cluster with significantly more than n nodes, writes and reads still require w and r successful responses, but those may include nodes that are not among the designated n “home” nodes for a value.
Highlighted Topics
Quorum Condition
The leaderless architecture became popular after Amazon used it for its in-house Dynamo system (the paper was published in 2007). Riak, Cassandra and Voldemort are open-source datastores with leaderless replication models inspired by Dynamo, so this kind of database is also known as Dynamo-style.
The quorum is the essential concept for guaranteeing data consistency in Dynamo-style data stores. It is defined as follows:
If there are n replicas, every write must be confirmed by w nodes to be considered successful, and we must query at least r nodes for each read. As long as w + r > n, we expect to get an up-to-date value when reading, because at least one of the r nodes we’re reading from must be up to date. Reads and writes that obey these r and w values are called quorum reads and writes.
We can smartly choose w and r in different use cases:
- For a write-intensive use case, we can choose a smaller w (but a larger r) to reduce the write latencies. Vice versa for a read-intensive use case.
- Generally we can choose w > n/2 and r > n/2 so the system can tolerate up to n/2 node failures
However, even with w + r > n, there are likely to be edge cases where stale values are returned. Dynamo-style databases are generally optimized for use cases that can tolerate eventual consistency (BASE over ACID).
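The following sketch puts the quorum condition into code with hypothetical in-memory replicas: a write succeeds once w replicas acknowledge it, a read requires r responses and returns the newest version, and w + r > n guarantees that the read set overlaps at least one replica that saw the latest successful write.

```python
import random

N, W, R = 3, 2, 2                      # w + r > n
replicas = [dict() for _ in range(N)]  # each replica maps key -> (version, value)

def quorum_write(key, value, version):
    acks = 0
    for replica in replicas:
        if random.random() < 0.9:      # simulate a replica that may be unreachable
            replica[key] = (version, value)
            acks += 1
    return acks >= W                   # success only with at least w acknowledgements

def quorum_read(key):
    responses = [replica[key] for replica in replicas if key in replica]
    if len(responses) < R:
        raise RuntimeError("not enough replicas responded")
    return max(responses)[1]           # the highest version wins

if quorum_write("config", "v2", version=2):
    print(quorum_read("config"))       # "v2" whenever the quorum condition held
```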
Extended Topics
Version Vector
Consider two operations A and B that happen at nearly the same time. There are only three possibilities in terms of which happens first:
- A happens before B and B depends on A’s occurrence
- B happens before A and A depends on B’s occurrence
- They happen at the same time and there’s no causality relationship between them
If we can identify causality between A and B, then A and B are not concurrent and we can safely keep the result of the most recent operation. Otherwise, we want to keep the conflicting results of the concurrent operations A and B and let the client resolve the conflict (in simpler cases, we can take the “later” operation based on timestamp or replica-number order, but that comes with the risk of data loss).
To detect concurrency, we can take the approach of detecting causality: if there is no causality, the operations are concurrent.
A simple algorithm to capture the happens-before relationship (sketched in code after this list):
- The server maintains a version number for every key, increments the version number every time that key is written, and stores the new version number along with the value written
- When a client reads a key, the server returns all values that have not been overwritten, as well as the latest version number. A client must read a key before writing
- When a client writes a key, it must include the version number from the prior read, and it must merge together all values that it received in the prior read
- When the server receives a write with a particular version number, it can overwrite all values with that version number or below, but it must keep all values with a higher version number (because those values are concurrent with the incoming write)
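A minimal single-replica sketch of this algorithm (the names are made up; this is the idea, not any production implementation): the server keeps one version counter per key plus all values that are still concurrent, and a write carries the version number from the client's prior read.

```python
class VersionedStore:
    def __init__(self):
        self.version = {}   # key -> latest version number
        self.values = {}    # key -> {version_number: value} not yet overwritten

    def read(self, key):
        """Return all not-yet-overwritten values and the latest version number."""
        return list(self.values.get(key, {}).values()), self.version.get(key, 0)

    def write(self, key, value, read_version):
        """read_version is the version returned by the client's prior read; the
        client is expected to have merged the values it saw into `value`."""
        new_version = self.version.get(key, 0) + 1
        self.version[key] = new_version
        live = self.values.setdefault(key, {})
        # Overwrite everything the client had seen (version <= read_version)...
        for v in [v for v in live if v <= read_version]:
            del live[v]
        # ...but keep higher-versioned values: they are concurrent with this write.
        live[new_version] = value
        return new_version

store = VersionedStore()
store.write("cart", ["milk"], read_version=0)                 # client 1
store.write("cart", ["eggs"], read_version=0)                 # client 2, concurrent
print(store.read("cart"))   # ([['milk'], ['eggs']], 2): both siblings are kept
store.write("cart", ["milk", "eggs", "ham"], read_version=2)  # merged write
print(store.read("cart"))   # ([['milk', 'eggs', 'ham']], 3)
```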
An important aspect to consider when reasoning about the scalability of these version vectors is that three different orders of magnitude are at play:
- a small number of replica nodes for each key
- a large number of server nodes
- a huge number of clients, keys and issued operations
Thus a scalable solution should avoid mechanisms that are linear in the highest magnitude and, if possible, even try to match the lowest scale.
Here is a summary and comparison of the different version vector approaches:
Version vector with causal histories
Causal histories (e.g. {a1, a2}) are maintained in this solution, but it is not useful in practical systems because the histories scale linearly with the number of updates.
Causally compliant total order (LWW)
Assuming that client clocks are well synchronised, this approach applies real-time clock order: replica nodes never store multiple versions and writes do not need to provide a get context. Cassandra uses this simplest approach.
Version vectors with per-server entry
A concise version vector can be achieved by storing the up-to-date server+version information with each replica. In this case, it is possible to track causality among updates that were received by different servers. However, this approach cannot track causality among updates submitted to the same server. The Dynamo system uses one entry per replica node and thus falls into this category.
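A small sketch of how per-server version vectors are compared (illustrative only): each vector maps a server id to a counter, and the comparison tells us whether one update causally dominates the other or whether the two are concurrent and must be kept as siblings.

```python
def dominates(a: dict, b: dict) -> bool:
    """True if vector a has seen at least everything vector b has seen."""
    return all(a.get(server, 0) >= count for server, count in b.items())

def concurrent(a: dict, b: dict) -> bool:
    return not dominates(a, b) and not dominates(b, a)

v_x = {"server_a": 2, "server_b": 1}
v_y = {"server_a": 1, "server_b": 1}
v_z = {"server_a": 1, "server_b": 2}

print(dominates(v_x, v_y))   # True: x causally follows y, so only x is kept
print(concurrent(v_x, v_z))  # True: x and z are concurrent, both kept as siblings
```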
Version vector with per-client entry
Version vectors with one entry per replica node are not enough to track causality among concurrent clients. One natural approach is to track causality using version vectors with one entry per client. In this implementation, each client needs to maintain an incrementing counter and provide it with each PUT; otherwise we run into the problem illustrated below. This approach is not very practical, mainly because it is a big challenge for the storage system to keep track of a stable set of clients: the application can run concurrent and dynamic threads, which are difficult to identify and track.
Dotted version vectors
Dotted version vectors use a concise and accurate clock representation as a substitute for classic version vectors. They use only server-based ids and only one component per replica node, thus avoiding an explosion in space consumption. The formal definition is:
\[\begin{align*} &C[(r, m)] = \{r_i \mid 1 \leq i \leq m\}, \\ \\ &C[(r,m,n)] = \{r_i \mid 1 \leq i \leq m\} \cup \{r_n\}, \\ \\ &C[X] = \bigcup_{x\in X}C[x] \\ \\ &\text{In a component } (r,m,n) \text{ we will always have } n > m \end{align*}\]
The example shows how the version vectors are updated for each update:
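As a quick illustration of the definition (the values here are chosen arbitrarily and are not taken from the referenced example), the causal histories encoded by a few components work out as follows:
\[\begin{align*} &C[(a, 3)] = \{a_1, a_2, a_3\} \\ \\ &C[(a, 2, 4)] = \{a_1, a_2\} \cup \{a_4\} = \{a_1, a_2, a_4\} \\ \\ &C[\{(a, 2), (b, 1, 3)\}] = \{a_1, a_2\} \cup \{b_1, b_3\} \end{align*}\]
The component (a, 2, 4) is exactly the case a classic version vector cannot express: a contiguous prefix a1, a2 plus the isolated “dot” a4, with a gap at a3.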
As we can see, popular databases like DynamoDB and Cassandra use simpler versions of version vectors in practice. This is a very common engineering design pattern: in reality, production-level systems make compromises for the sake of performance and engineering simplicity. A dotted version vector implementation can be found in Riak.