So far, so good. Our first attempt at distributed clustering was with k-means, and we were in luck: we discovered that it can easily be rewritten as a distributed algorithm, because its steps can be performed independently for each point (classification) or each cluster (re-centering). We applied MapReduce to canopy clustering as well, even though its first step, drawing the canopy centroids, was not immediately parallelizable and required deeper reasoning to obtain the best clustering.

To close the circle and complete the topic, in this section we are going to discuss how to apply the MapReduce paradigm to a clustering algorithm that is intrinsically non-parallelizable, at least at first glance: DBSCAN.

As we saw in section 12.3, in DBSCAN the clustering is computed by exploring and leveraging the relations between points, which are therefore interconnected. Sharding a dataset would change the ϵ-neighborhood of most points, and some of the core points might not be recognized as such because their ϵ-neighborhoods are scattered across several shards.

Although it is possible to think of a MapReduce job computing the size of the ϵ-neighborhood of each point with a distributed computing model, the core of DBSCAN relies on sequentially going through points and their neighbors, and thus it makes more sense to explore different options.

We already discovered in section 13.1.4 that we could use canopy clustering as a first step, apply DBSCAN to each canopy separately, and then iteratively merge clusters in neighboring canopies when points close to their borders or in the overlapping regions are within each other’s ϵ-neighborhood.

This approach becomes problematic for a few reasons:

- It’s hard to keep track of the canopies that need to be checked, and of the points inside the canopies to compare. All pairs of canopies need to be compared to check whether they overlap or their distance is smaller than ϵ, and then, for each such pair, all points in one canopy need to be compared to the other canopy.
- Given the shape of the envelopes (hyperspheres), it is complicated to compute the distance between points in a canopy’s external rings and the other canopy (see section 10.4.3 and figure 10.23 to get an idea of the complicated geometric implication for the 2-D case, which gets even more complicated for hyperspheres in higher dimensions).
- Getting the thresholds passed to canopy clustering right is complicated; because of the spherical shape of canopies, we would have to use a larger-than-needed T1 radius and a significant overlap between canopies to capture the relations between clusters in different canopies.
- When the overlap between canopies is large, points get assigned to several canopies, and thus they need to be processed several times, once for each pair of canopies they belong to. This easily becomes a computational nightmare, nullifying all the effort we put into parallelizing.

This solution can therefore work in certain specific cases and configurations, but it doesn’t work for high-dimensional datasets.

Nevertheless, the basic idea is valid, and we can make it work by simply changing the way we shard the dataset. Instead of relying on random sharding or spherical canopies, we can break the dataset into a regular grid, where each cell is of the same size. Cells are hyper-rectangles instead of hyper-spheres, and for each coordinate the domain can be split differently, causing the cells’ (rectangles’) sides to each be of a different length.

This is an improvement over dealing with canopies, because identifying points close to the borders becomes easy. We can just check that the absolute value of the difference between the point’s coordinate and the border along that coordinate is smaller than a threshold, instead of computing the distance between a point and a hyper-sphere; moreover, it’s also easier and cheaper to shard the dataset into grid cells.
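The per-coordinate border check described above can be sketched in a few lines. This is a minimal illustration (the function name and tuple-based representation are assumptions, not the book’s pseudocode):

```python
def near_cell_border(point, cell_min, cell_max, eps):
    """Return True if `point` lies within `eps` of any face of the
    hyper-rectangular cell [cell_min, cell_max].

    All three arguments are equal-length coordinate tuples; only
    per-coordinate absolute differences are needed, with no distance
    computation to a hyper-sphere.
    """
    return any(abs(x - lo) <= eps or abs(hi - x) <= eps
               for x, lo, hi in zip(point, cell_min, cell_max))

# A point 0.3 units from the left edge of a 10x10 cell is near the
# border for eps = 0.5; a point 1.0 unit away is not.
print(near_cell_border((0.3, 5.0), (0.0, 0.0), (10.0, 10.0), 0.5))  # True
print(near_cell_border((1.0, 5.0), (0.0, 0.0), (10.0, 10.0), 0.5))  # False
```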

But it gets even better! Instead of having to compare points close to the borders of adjacent cells, we can define the cells to be slightly overlapping, and precisely to be overlapping, for each coordinate, over a rectangular area of side ϵ, as shown in figure 13.12.

This way each adjacent cell overlaps the next one over a length of 2ϵ, and the ϵ-neighborhood of each point in the overlapping section will be part of at least one of the two adjacent cells (for instance, point p in the figure has its ϵ-neighborhood completely contained in S_{1}). The trick, therefore, is that if p is a core point in one of the shards, it will also be a core point in the union of the shards, and hence its neighbors on both sides of the cells’ border should be directly reachable from p, and in turn end up in the same cluster. Therefore, if p is assigned to cluster C_{1} for shard S_{1}, and to cluster C_{2} for shard S_{2}, it follows that clusters C_{1} and C_{2} should be merged.

Vice versa, if we consider any point that is outside the shard’s border, like r in figure 13.12, which is to the left of S_{1}’s inner margin, then we know for sure that

- Its distance from the shard’s border is larger than ϵ, and therefore its ϵ-neighborhood doesn’t intersect S_{1}’s outer margin.
- If there is a point z in S_{2} that is reachable from r, then there must be a chain of core points that are directly reachable from z and r (for the definition of reachability, see section 12.3.1), and at least one of these points (call it w) must be in either S_{1}’s inner or outer margin, because these areas extend exactly for a length equal to 2ϵ, which is exactly the diameter of the core points’ ϵ-neighborhood.
- Therefore, we can ignore r, because we’ll join its cluster to z’s when we examine point w.

The consequence of what we informally proved here is that, instead of comparing each point in a cell to all the points close to the border of the adjacent cells, we can just keep track of the core points within a distance ϵ from a cell’s border (or 2ϵ from a shard’s border), and merge those clusters that have a point in either that inner or outer margin that is a core point in either of the adjacent cells.

Figure 13.13 shows an example of how a MapReduce job would perform distributed clustering of a 2-D dataset using DBSCAN and the reduction described in this section, and listing 13.5 uses pseudocode to describe the steps needed.

The first step is sharding the dataset according to a regular grid. Each grid cell is then expanded in all directions with a further area of side ϵ, and shards are formed by filtering all points within these expanded rectangles (which, it’s important to remember, overlap along their common edges with adjacent cells).

Each mapper performs DBSCAN clustering on its shard, and at the next step, a reducer is spun up for each pair of adjacent shards (in this example, we used a 2×2 grid, so there are four pairs of adjacent grid cells).

Reducers need to receive, from each mapper, the list of clusters found, the list of noise points (if any), and the list of points in the margin region between the two shards that the reducer will process.

The reducer then checks to see if there is a core point in the shared margin region, and if so, it merges the two clusters to which the points belong. In the example in figure 13.13, we deliberately used a global incremental indexing for clusters at this stage, but in reality, this global indexing is not easily achievable with a single pass! That’s because we don’t know in advance how many clusters a mapper will find. Merging clusters can also be handled locally, but at some point, an extra step will be needed to re-index all clusters globally.

If we suppose, however, that clusters have global indexing, we can handle merging clusters using a data structure with which you should already be familiar: the disjoint set, which we described in chapter 5.
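To make this concrete, here is a minimal disjoint set (union-find) with path compression, in the spirit of the structure from chapter 5; the class and method names are illustrative, not the book’s API. Cluster IDs (assuming global indexing) are the elements, and each reducer that finds a shared core point simply merges the two cluster IDs:

```python
class DisjointSet:
    """Minimal union-find over arbitrary hashable keys."""

    def __init__(self):
        self.parent = {}

    def find(self, x):
        # Lazily register unseen elements as singletons.
        self.parent.setdefault(x, x)
        while self.parent[x] != x:
            # Path halving: point x to its grandparent as we walk up.
            self.parent[x] = self.parent[self.parent[x]]
            x = self.parent[x]
        return x

    def merge(self, a, b):
        root_a, root_b = self.find(a), self.find(b)
        if root_a != root_b:
            self.parent[root_b] = root_a

# A core point on the border of shards S1/S2 links C1 and C2; another
# one links C2 and C5. All three end up in the same global cluster.
ds = DisjointSet()
ds.merge("C1", "C2")
ds.merge("C2", "C5")
print(ds.find("C5") == ds.find("C1"))  # True
```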

You might have noticed that there are some edge cases we should keep in mind. Clusters in one shard, for instance, can be subsets of a bigger cluster in another shard. In the example, reducer 2 gets clusters C_{1} and C_{6}, with the latter completely included in the former; in this case, even if no point in the margin regions is a core point, obviously we need to merge the two clusters (or, equivalently, get rid of C_{6}).

Likewise, if a point p is classified as a core point in one shard and noise in the other, there won’t be clusters to merge: p will already be in the right cluster, but we also need to be careful, because it should be removed from the list of outliers.

The output of each reducer will be a list of local clusters to merge, or, alternatively, the disjoint set keeping track of the merges. A further brief composition step can take care of producing the list with the point assignments to the final clusters and the list of outliers, based on reducers’ output.

The pseudo-code for this MapReduce job is simpler than any other job in this chapter, but don’t be fooled—most of the complexity is hidden in the methods gridShard, dbscan, and mergeClusters.

Method dbscan is exactly the same method that we described in section 12.3. In most languages, we will be able to reuse it without any modification. gridShard, in its most naive version, just iterates over points and computes the index of each point’s cell by integer division (plus a few checks to see if the point is in the margin of adjacent cells). We’ll address some problems connected to this method later in this section, but we won’t get into the details of its implementation.
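A naive gridShard along these lines can be sketched as follows. This is an assumption-laden illustration (the function name, the uniform per-dimension split into k intervals, and the tuple-based cell index are all choices made here for clarity): the cell index comes from integer division of each coordinate by the cell width, and a point is also assigned to a neighboring cell’s expanded shard when it falls within ϵ of a shared border.

```python
def grid_shard(point, lows, widths, k, eps):
    """Return the set of grid-cell indices (expanded shards) that `point`
    belongs to.

    `lows[d]` is the lower bound of the domain along dimension d,
    `widths[d]` the cell width along d, `k` the number of cells per
    dimension, and `eps` the overlap margin.
    """
    # Base cell: integer division per coordinate, clamped to the grid.
    cell = tuple(min(k - 1, max(0, int((x - lo) // w)))
                 for x, lo, w in zip(point, lows, widths))
    shards = {cell}
    for d, (x, lo, w) in enumerate(zip(point, lows, widths)):
        offset = (x - lo) - cell[d] * w      # position inside the cell
        if offset <= eps and cell[d] > 0:    # within eps of the lower face
            shards.add(cell[:d] + (cell[d] - 1,) + cell[d + 1:])
        if w - offset <= eps and cell[d] < k - 1:  # within eps of the upper face
            shards.add(cell[:d] + (cell[d] + 1,) + cell[d + 1:])
    return shards

# On a 5x5 grid of 2x2 cells, a point 0.1 from a vertical border lands
# in two expanded shards; an interior point lands in exactly one.
print(grid_shard((1.9, 5.0), (0.0, 0.0), (2.0, 2.0), 5, 0.2))
print(grid_shard((1.0, 5.0), (0.0, 0.0), (2.0, 2.0), 5, 0.2))
```

Note that this version ignores corner regions’ diagonal neighbors; a production version would add those too, and in the worst case a point near a corner belongs to 2^d expanded cells.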

Finally, method mergeClusters is a nice application of the disjoint set, the data structure we described in chapter 5. Listing 13.6 shows a possible implementation of this method that treats the argument clustersSet as an instance of a disjoint set shared among all reducers. While this is not practically possible, it is conceptually equivalent to having the reducers emit a list of clusters to merge and performing the operations on the disjoint set in the combiner stage, after the reducers finish their job. For this reason, we can consider clustersSet a facade that simplifies the process of emitting a pair of clusters to merge and sending the pair to the combiner, where the actual disjoint set is created and the merge operations are performed.

The implementation goes through all points in the margin region between the two shards (use figure 13.12 as a reference) and checks whether any of these points is a core point in at least one of the shards. Then we just need to make sure it’s not a noise point in the other shard (handled as an edge case) and merge the two clusters (c_{1} will be in shard1 and c_{2} will be in shard2). Optionally, we can check that c_{1} and c_{2} are not one a subset of the other and handle that case differently.

There is one final detail we need to address before wrapping up the discussion on MapReduce DBSCAN: the sharding step. Before continuing reading, stop for a minute and think about how this case is different from what we have seen before, and what issues we could face in this step.

Do you see the problem? Deterministically sharding points according to a rectangular grid will not be as cheap as the random sharding we have seen so far! Before running, or even configuring, the MapReduce job to perform this sharding, we would need to run a single-threaded process that creates the grid and assigns each point to a shard based on the point’s position. If the grid has m cells, and the dataset holds n points with d coordinates each, then this step will require, in the worst case, O(n*d*m) comparisons.

To speed things up, we can use an R-tree. As mentioned in chapter 10, R-trees can hold non-zero-measure objects, in particular shapes like rectangles (see figure 10.5). We can therefore create an R-tree whose items are the grid cells, and for each point find the closest cell. Since R-trees have linear worst-case running time, however, we don’t improve our asymptotic result (though in practice, R-trees will in most cases be faster than the naive search).

To seriously speed things up, however, what we really need is to distribute even the sharding step with a new MapReduce job. We know that each point can be compared to cells independently; therefore, if we split the dataset into random shards, each shard can be processed by a battery of mappers. Reducers, at the same time, will just group points by extended cell(s),^{[6]} and finally produce (not randomly, this time) new shards that can then be used in the first step of the MR-DBSCAN job (the one illustrated in figure 13.13). As a further optimization, since reducers for this job (sharding) will already have all the data for a cell, we can repurpose the same machines to be the mappers in the MR-DBSCAN job.
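The shape of this sharding job can be illustrated with a toy single-process simulation (all names here are illustrative; in a real framework the grouping would be done by the shuffle phase): mappers emit a `(cell, point)` pair for each expanded cell a point falls into, and reducers collect the points per cell to form the new shards fed to MR-DBSCAN.

```python
from collections import defaultdict


def map_points(points, cells_for_point):
    """Mapper: emit one (cell, point) pair per expanded cell containing
    the point. `cells_for_point` plays the role of gridShard."""
    for p in points:
        for cell in cells_for_point(p):
            yield cell, p


def reduce_by_cell(mapped_pairs):
    """Reducer: group points by cell index, producing the new shards."""
    shards = defaultdict(list)
    for cell, p in mapped_pairs:
        shards[cell].append(p)
    return dict(shards)


# 1-D toy run: cells of width 2, no overlap margin for brevity.
pairs = map_points([0.5, 1.5, 2.5], lambda p: [int(p // 2)])
print(reduce_by_cell(pairs))  # {0: [0.5, 1.5], 1: [2.5]}
```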

It’s also worth mentioning that the number of adjacent cells grows linearly with the dimension of the space, since the number of faces of a d-dimensional hypercube is equal to 2*d. This means that the sharding algorithm can scale out to higher dimensions.

Finally, it’s worth mentioning that because we are performing this sharding step as an independent MapReduce job, we are forced to use a regular grid. On the contrary, the article by He et al. presenting MR-DBSCAN uses a different, more sophisticated approach, where statistics on the dataset are collected in a first pass through the dataset, splitting the domain into an irregular collection of axis-parallel rectangles (see figure 13.14), each with ideally uniform density. Then, leveraging the collected statistics, we can also tune DBSCAN’s parameters to adapt to the different densities found in different cells.

Source: Marcello La Rocca (2021), *Advanced Algorithms and Data Structures*, Manning Publications.