Identify hot data to cache using Multiple Bloom Filters

Recently I was thinking about how to support caching of hot data in a highly loaded application with huge throughput. This note describes the idea I ended up with, which is based on Bloom filters. It is far from ideal, and I haven't tested it yet, but at the moment it seems optimal in terms of CPU and memory usage. After some research on the internet, I also found that the idea is not new (see the link at the end of the post).

A Bloom filter (BF) gives a nice way to check whether some value has been seen before. Of course, this is a probabilistic data structure, so the result always comes with some level of error: it can report a value as seen when it was not (a false positive), but never the reverse.

Using multiple hash functions within a single BF makes it better at avoiding collisions, but it also requires the Bloom filter to be larger. As an alternative to one large BF with multiple hash functions, it is possible to use multiple BFs, each relying on a single hash function and each having its own size.
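A minimal Python sketch of that alternative, under my own assumptions: the class names, the salted SHA-256 hash and the filter sizes are all hypothetical choices, not something taken from a particular library. Each filter is a bit array addressed by one hash function, and a value counts as seen only if every filter agrees.

```python
import hashlib


class SingleHashFilter:
    """A bit array addressed by one hash function (salted with `seed`)."""

    def __init__(self, size: int, seed: int):
        self.size = size
        self.seed = seed
        self.bits = bytearray(size)  # one byte per bit, for simplicity

    def _index(self, value: str) -> int:
        digest = hashlib.sha256(f"{self.seed}:{value}".encode()).digest()
        return int.from_bytes(digest[:8], "big") % self.size

    def add(self, value: str) -> None:
        self.bits[self._index(value)] = 1

    def contains(self, value: str) -> bool:
        return self.bits[self._index(value)] == 1


class MultiFilter:
    """Several single-hash filters of different sizes."""

    def __init__(self, sizes):
        # Assumption: the position in `sizes` doubles as the hash seed.
        self.filters = [SingleHashFilter(size, seed) for seed, size in enumerate(sizes)]

    def add(self, value: str) -> None:
        for f in self.filters:
            f.add(value)

    def contains(self, value: str) -> bool:
        # A value is reported as seen only if all filters have its bit set.
        return all(f.contains(value) for f in self.filters)


mf = MultiFilter(sizes=[1009, 2003, 4001])  # hypothetical sizes
mf.add("user:42")
print(mf.contains("user:42"))  # True: an added value is never reported as missing
```

Because the filters have different sizes and seeds, two values that collide in one filter are unlikely to collide in all of them.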

Multiple BFs can also be used to figure out whether a given value has been seen some number of times. As an example, imagine an application that serves reads of data from storage. Some of the data should be kept in an LRU cache, but because the number of different values is so large, it is impossible to just add every value to the cache. With a bounded cache and an LRU policy, even hot data can easily be evicted. For example, assume we have a cache with N elements, and we are reading M elements from storage, such that M is much larger than N. If we added every read element to the cache, we would quickly fill it with values that are read only once, and the hot data would be evicted.
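One way this could look in Python, as an untested sketch under my own assumptions (`HotKeyDetector`, `record`, the threshold and the filter size are hypothetical): keep one single-hash filter per level, where level i remembers "seen at least i + 1 times". Each read promotes the key by one level, and the key is admitted to the cache only once it reaches the last level.

```python
import hashlib


class HotKeyDetector:
    """Layered single-hash filters; level i means 'seen at least i + 1 times'."""

    def __init__(self, threshold: int, size: int = 1 << 16):
        self.size = size
        self.levels = [bytearray(size) for _ in range(threshold)]

    def _index(self, level: int, key: str) -> int:
        # Salt the hash with the level so each filter uses a different function.
        digest = hashlib.sha256(f"{level}:{key}".encode()).digest()
        return int.from_bytes(digest[:8], "big") % self.size

    def record(self, key: str) -> bool:
        """Record one read; return True if the key now looks hot."""
        for level, bits in enumerate(self.levels):
            i = self._index(level, key)
            if not bits[i]:
                bits[i] = 1  # promote the key by exactly one level
                return level + 1 >= len(self.levels)
        return True  # already present in every level


detector = HotKeyDetector(threshold=3)
results = [detector.record("item:7") for _ in range(4)]
print(results)  # [False, False, True, True]
```

A caller would put a value into the LRU cache only when `record` returns True. Collisions can make a cold key look hot a bit early, but a key that was really read `threshold` times is never missed.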

Leader Election: LeLann, Chang and Roberts algorithm (LCR)

This algorithm is for ring networks. Each message travels through the network from one process to the next, i.e. there is no broadcasting. This means that each process knows about exactly one other process: its neighbor. The network can be pictured as a circular linked list.

The algorithm's message complexity is better than the bully algorithm's, because its worst case of O(N^2) messages occurs only when the hosts are arranged around the ring in increasing (or decreasing, depending on which extreme is elected) order of their UIDs.
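To see where the worst case comes from, here is a small round-based simulation in Python. It is a sketch under my own assumptions: `lcr_elect` is a hypothetical name, UIDs are unique, the largest UID wins, process i sends only to process (i + 1) % N, and the final "I am the leader" announcement is not counted.

```python
def lcr_elect(uids):
    """Simulate LCR on a one-directional ring; return (leader_uid, messages_sent)."""
    n = len(uids)
    outgoing = list(uids)  # every process starts by sending its own UID
    messages = 0
    while True:
        # Deliver each pending message to the next process on the ring.
        incoming = [None] * n
        for i, uid in enumerate(outgoing):
            if uid is not None:
                incoming[(i + 1) % n] = uid
                messages += 1
        # Each process decides what to do with the UID it received.
        outgoing = [None] * n
        for i, uid in enumerate(incoming):
            if uid is None:
                continue
            if uid == uids[i]:
                return uid, messages  # own UID came back: this process is the leader
            if uid > uids[i]:
                outgoing[i] = uid     # forward larger UIDs
            # smaller UIDs are dropped


print(lcr_elect([4, 3, 2, 1]))  # (4, 10): every UID travels as far as it can
print(lcr_elect([1, 2, 3, 4]))  # (4, 7): all UIDs but the largest die after one hop
```

In the first ordering the UIDs decrease in the direction the messages travel, which gives N(N + 1)/2 messages; in the second only the winning UID goes all the way around, which gives 2N - 1.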

Leader Election: Bully Algorithm

Only the process with the largest (or smallest) UID becomes the leader. If some process sees that its UID is more extreme than the current leader's UID, it initiates a leader election.
There are 3 types of messages:
  • election message - sent to announce election
  • answer message - response to the election message
  • coordinator message - sent to announce the identity of the elected leader
The algorithm is as follows:
  1. Some process P finds that the leader is not available (or the coordinator is down).
  2. P initiates a new leader election by broadcasting an election message.
  3. Each process replies with its own UID.
  4. If P finds a process with a higher UID, it waits a bit; if no reply with an even higher UID arrives, it promotes that host to leader.
  5. However, if P receives a coordinator message announcing a new leader whose UID is lower than P's UID, it initiates another election.
The algorithm requires that there is a coordinator. It also requires on the order of N^2 + N messages, since at any moment any process can try to start an election, and thus send an election message, handle N answer messages and send a coordinator message. As an optimization, a process can send messages only to the processes with a larger UID; this gives N^2 / 2 + N messages, which is better but still not ideal.
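To get a feel for the message counts, here is a small Python sketch of the optimized variant. It is a simplification under my own assumptions: `bully_elect` and its arguments are hypothetical names, only live processes are listed, every contacted process answers and then runs its own election, and messages to failed hosts are not counted. The exact total depends on which message types are counted, so the sketch reports them separately.

```python
def bully_elect(alive, initiator):
    """Return (leader_uid, message_counts) when only higher UIDs are contacted."""
    alive = sorted(alive)
    counts = {"election": 0, "answer": 0, "coordinator": 0}
    started = set()        # processes that already ran their own election
    pending = [initiator]  # processes that still have to run one
    while pending:
        p = pending.pop()
        if p in started:
            continue
        started.add(p)
        higher = [q for q in alive if q > p]
        counts["election"] += len(higher)  # p asks every process with a larger UID
        counts["answer"] += len(higher)    # each of them answers p ...
        pending.extend(higher)             # ... and starts its own election
    leader = max(alive)
    counts["coordinator"] = len(alive) - 1  # the winner announces itself to the rest
    return leader, counts


leader, counts = bully_elect([1, 2, 3, 4], initiator=1)
print(leader, counts)  # 4 {'election': 6, 'answer': 6, 'coordinator': 3}
```

When the lowest process starts the election, the number of election messages is N(N - 1)/2, which is where the roughly N^2 / 2 term comes from.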

Leader Election in Distributed System

One of the important problems of distributed computing is electing a leader process. A leader process is needed for multiple reasons: it can be used to make a final decision, it can be used to run any task that has to be run by only a single process at a time (like finding the next task to execute and spreading execution between other hosts, acquiring a lock etc.), it may hold authoritative data and provide it to other nodes, etc.

There are multiple leader election algorithms. However, for most tasks it's possible to find a solution that does not need a single leader:
  • for task scheduling, the list of tasks can be partitioned and, with locking, spread over multiple hosts
  • for making a final decision, it's possible to reach consensus (e.g. with the Paxos algorithm)
  • distributed locking could help other systems avoid having a leader (again, it might be based on a consensus algorithm like Paxos or Raft)

Amazon Dynamo paper notes

These notes are based on the paper about Amazon Dynamo. Note that Dynamo and AWS DynamoDB are not the same thing (at least judging by the name), but the difference is not discussed here.

Dynamo is designed to be an eventually consistent storage system. At the same time, it allows clients to specify the number of hosts in the read (R) and write (W) quorums. The number of hosts N that store a replica of each key is also configurable. A good rule of thumb is to keep N >= 3 and R + W > N. In this case every read quorum overlaps every write quorum, so a read contacts at least one host that holds the latest written data.
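The R + W > N rule can be checked by brute force. The Python sketch below (the function name `always_overlap` is my own) enumerates every possible read set of size R and write set of size W over N replicas and tests whether they always share at least one replica.

```python
from itertools import combinations


def always_overlap(n: int, r: int, w: int) -> bool:
    """True if every read quorum of size r intersects every write quorum of size w."""
    replicas = range(n)
    return all(
        set(reads) & set(writes)  # a non-empty intersection is truthy
        for reads in combinations(replicas, r)
        for writes in combinations(replicas, w)
    )


print(always_overlap(3, 2, 2))  # True:  R + W = 4 > 3
print(always_overlap(3, 1, 1))  # False: a read can miss the only written replica
```

The overlap only guarantees that the newest version is among the replies; picking it out of them is a separate step.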

Many systems use synchronous data replication to keep consistency strong (e.g. MySQL). This causes availability issues, because if a replica fails, the data may become unavailable to clients. Only a single master host can accept writes in such systems. In contrast, Dynamo uses asynchronous data replication. This helps to build partition-tolerant systems, as Dynamo accepts reads and writes even during a network partition and resolves update conflicts using different algorithms.

Dynamo is a P2P system that uses a gossip-based protocol to share membership and failure detection information with peers. The membership information contains the list of key ranges that each node is responsible for. Nodes also use the gossip protocol to announce themselves to other hosts. Basically, when a new Dynamo host starts, it already knows a list of seed hosts, which it connects to in order to tell them about its own availability. Those seed hosts help to deliver the information to other peers via gossiping.

Dynamo uses consistent hashing for partitioning. This allows adding new nodes to the hash space without the need to rehash existing values. Instead, only some of the values are moved to the new node, after which it becomes available for reads and writes.
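A minimal consistent-hashing ring in Python illustrates this. It is only a sketch: `HashRing`, `ring_position` and the node names are hypothetical, and it leaves out replication and the other refinements a real system needs. Each key belongs to the first node found clockwise from the key's position on the ring.

```python
import bisect
import hashlib


def ring_position(name: str) -> int:
    """Map a node name or a key to a position on the ring."""
    return int.from_bytes(hashlib.sha256(name.encode()).digest()[:8], "big")


class HashRing:
    def __init__(self, nodes=()):
        self._positions = []  # sorted ring positions of the nodes
        self._owners = {}     # ring position -> node name
        for node in nodes:
            self.add_node(node)

    def add_node(self, node: str) -> None:
        pos = ring_position(node)
        bisect.insort(self._positions, pos)
        self._owners[pos] = node

    def node_for(self, key: str) -> str:
        # First node clockwise from the key, wrapping around at the end.
        i = bisect.bisect_right(self._positions, ring_position(key))
        return self._owners[self._positions[i % len(self._positions)]]


ring = HashRing(["node-a", "node-b", "node-c"])
keys = [f"key-{i}" for i in range(1000)]
before = {k: ring.node_for(k) for k in keys}
ring.add_node("node-d")
after = {k: ring.node_for(k) for k in keys}
moved = [k for k in keys if before[k] != after[k]]
print(len(moved), "keys moved; all of them to node-d:",
      all(after[k] == "node-d" for k in moved))
```

Keys that change owner always end up on the new node; the mapping of every other key stays untouched.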

Merkle trees

A Merkle tree is a hash tree whose leaves are hashes of data blocks. Each intermediate node is a hash of its children, and so is the root node. Thus, to know whether two hash trees differ, it's enough to compare their root nodes. It's also easy to find the actual difference between two trees: just traverse them top to bottom and follow the nodes with different hashes. This is how a Merkle tree helps to reduce the amount of data passed between two sources.
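Here is a small Python sketch of both steps, building the tree and walking it top to bottom. The helper names (`h`, `build`, `diff`) and the choice of SHA-256 are my own assumptions, and both trees are assumed to cover the same, non-zero number of blocks.

```python
import hashlib


def h(data: bytes) -> bytes:
    return hashlib.sha256(data).digest()


def build(blocks):
    """Return the tree as a list of levels: leaf hashes first, root hash last."""
    level = [h(block) for block in blocks]
    levels = [level]
    while len(level) > 1:
        pairs = [level[i:i + 2] for i in range(0, len(level), 2)]
        level = [h(b"".join(pair)) for pair in pairs]  # a lone last node is hashed alone
        levels.append(level)
    return levels


def diff(a, b):
    """Indices of the leaves that differ between two trees of the same shape."""
    if a[-1][0] == b[-1][0]:
        return []      # equal roots: nothing to synchronize
    suspects = [0]     # differing node indices on the current level
    for depth in range(len(a) - 1, 0, -1):
        below = []
        for i in suspects:
            for child in (2 * i, 2 * i + 1):
                if child < len(a[depth - 1]) and a[depth - 1][child] != b[depth - 1][child]:
                    below.append(child)
        suspects = below
    return suspects


local = build([b"a", b"b", b"c", b"d"])
remote = build([b"a", b"b", b"X", b"d"])
print(diff(local, remote))  # [2]
print(diff(local, local))   # []
```

Only the hashes along the differing path are compared, so the number of comparisons grows with the depth of the tree rather than with the number of blocks.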

Merkle trees are used as an anti-entropy mechanism that helps to detect divergence between data stored in multiple locations. Merkle trees, for example, are used by Dynamo and Cassandra to find the cases when replicas have different versions of data. Merkle trees are also an awesome way to detect changes before syncing up data, which is one of the primary tasks for Dropbox, Chrome Backup, Google Drive, BitTorrent Sync etc.

Merkle trees do not rely on timestamps, but on actual data differences. This makes hash trees a wonderful structure for synchronizing data between a source and a target with minimal effort spent on detecting the actual changes.

For example, assume that there is a backend B and N clients, where each client maintains a copy of the data (like Dropbox, BitTorrent Sync, Google Drive etc.). Some of the clients change the data locally and upload it to backend B. Then each client can determine the actual change and exchange only the necessary data with the backend.

Hash trees are used by Git and Bitcoin. As mentioned before, Dynamo, Cassandra and Riak use Merkle trees for anti-entropy as well. Merkle trees are also used in cryptography for message signatures (search for Lamport signature).

Links: