To efficiently parallelize any algorithm, we first need to answer two questions: How do datapoints influence computation, and what data do we really need at any time to perform a certain step?

In the case of both k-means and canopy clustering, the way we compute the various steps dictates a MapReduce implementation. Let’s examine each step of k-means separately:^{[1]}

- In the *classification* step, when we assign points to canopies/clusters, the operation is computed independently for each point, and the only thing that matters is the list of centroids. So, we can shard the data however we like, as long as we pass all the centroids to each mapper.
- In the *re-centering* step, to update k-means’ centroids, for each centroid we only need the data assigned to it, and each point is assigned to a single centroid, so we can partition the dataset and process each group separately.
- *Initialization* is trickier for k-means; we would need to draw points randomly from the full dataset, and this would seem to hinder parallelization. We can, however, employ a few possible strategies to distribute the computational load:
  - Randomly shard the dataset and then independently draw centroids at random from each shard (although it can be tricky to obtain an overall uniform distribution of samples).
  - Run canopy clustering first and feed those canopy centroids to k-means as the initial choice of centroids. Then the question becomes, can we also distribute canopy clustering? Although a little trickier, it turns out that we can.

It follows that parallelizing canopy clustering is the key step here, and also the trickiest part. We’ll worry about it later in this chapter. Before delving into this step, let’s give a high-level description of the full algorithm for distributed k-means:

- Initialize the centroids using canopy clustering.
- Iterate (at most m times):
  - Points classification:
    - Shard the dataset and send the shards, together with the list of centroids, to the mappers.
    - Each mapper assigns points to one of the centroids and sends the data to the reducers, aggregated by the chosen centroid (ideally there will be one reducer per centroid).
  - Centroids update:
    - Each reducer will compute the center of mass of its cluster and return the new centroid.

This is also summarized in listing 13.2 and shown in the example in figure 13.7.

The first thing you should know is that we are not talking about good old plain MapReduce in this case. The base model of computation is MapReduce, but since the k-means heuristic consists of repeating some steps m times, we will need to start the computation several times. This is illustrated by the workflow in figure 13.7: the output of each MapReduce job, a list of centroids, is also the input of the next job, and moreover sharding the dataset and distributing it to the mappers is a step that doesn’t need to be repeated for each job, because there is no reason why the shard assigned to a mapper should change.

For these reasons, rather than running MapReduce m separate times, we can use a more efficient (in this context) evolution of this programming model, *Iterative MapReduce*.

The idea is that we spin up mappers and reducers once, sharding the data during the configuration of the mappers and assigning point shards to each mapper only once. Then we iterate the classic MapReduce cycle as long as needed, passing only the current list of centroids as input to all the mappers. Thus, the amount of data to be passed to each mapper per job is going to be several orders of magnitude smaller than the dataset size, and ideally it will also be significantly smaller than the size of each shard. We can pass the number of mappers to create as an argument to our enhanced k-means method, and tune this parameter based on the size of the dataset and the capacity of each mapper.
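As a rough, single-process sketch of this scheme (all names here, such as `map_classify` and `reduce_recenter`, are ours and not taken from the book’s listings), the data is sharded once and only the centroid list travels between iterations:

```python
from collections import defaultdict
import math

def euclidean(p, q):
    return math.sqrt(sum((a - b) ** 2 for a, b in zip(p, q)))

def map_classify(shard, centroids):
    # Mapper: emit (index of closest centroid, point) for each point in the shard.
    for point in shard:
        best = min(range(len(centroids)), key=lambda i: euclidean(point, centroids[i]))
        yield best, point

def reduce_recenter(points):
    # Reducer: center of mass of the points assigned to one centroid.
    return tuple(sum(coords) / len(points) for coords in zip(*points))

def iterative_mapreduce_kmeans(shards, centroids, max_iterations=10):
    # Shards are assigned once; only the centroid list is passed between iterations.
    for _ in range(max_iterations):
        groups = defaultdict(list)
        for shard in shards:  # mappers (run in parallel on a real cluster)
            for idx, point in map_classify(shard, centroids):
                groups[idx].append(point)
        new_centroids = [reduce_recenter(groups[i]) if groups[i] else centroids[i]
                         for i in range(len(centroids))]
        if new_centroids == centroids:  # convergence: centroids stopped moving
            break
        centroids = new_centroids
    return centroids
```

The inner two steps correspond to the classification and re-centering phases of one MapReduce cycle; on a real cluster the mappers run concurrently, and the driver only gathers and rebroadcasts the k centroids.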

Figure 13.8 illustrates well how the computation proceeds from this point. Each mapper performs the classification step on the fraction of points assigned to it. Then reducers (ideally one per cluster) will read data from each mapper: the i-th reducer will only get the points assigned to the i-th centroid (or to the centroids assigned to the reducer, if more than one).

Notice how reducers don’t get any information about current centroids (in the illustration, in fact, in the reducer steps old centroids are shown as semi-transparent polygons) because each reducer only needs all the points that belong to a cluster, in order to compute its center of mass.

Each reducer eventually outputs the centroid computed (and just that; no points are returned, to save bandwidth and ultimately time), and the MapReduce primary will combine the k results (where k is the number of centroids/reducers) into a single list that will be fed again to the mappers in the next iteration of the cycle!

Once the cycle is over, we only get centroids as result. We can run the mappers one last time outside of the cycle to get the points assigned to each cluster (in this step, at line #12 of listing 13.2, we can imagine a new set of reducers will be used: dummy passthrough nodes, just returning their input).

This implementation of k-means only uses canopy clustering to bootstrap convergence with a better-than-random initial choice of centroids. Distributing the classification and re-centering steps is already a great improvement, and there is a good chance that the improvement you get is already enough to satisfy your requirements.

Still, even if canopy clustering is faster than an iteration of k-means, and can be made even faster by using a cheap approximate metric instead of Euclidean distance, for huge datasets the risk is that running canopy clustering on a single machine wastes most of the gain obtained by distributing k-means through MapReduce. Moreover, sometimes this option isn’t even available, because a huge dataset might not fit on any single machine.

Luckily for us, we can apply MapReduce to canopy clustering as well!

### 1. Parallelizing canopy clustering

Canopy clustering is a bit trickier to redesign as a distributed algorithm because it has only one step: drawing canopy centroids from the dataset and filtering out points within a certain distance from them so that they won’t later be selected as centroids. The issue is that in this step, for each centroid drawn from the dataset, we need to go through the whole dataset for the filtering part. In theory, for each centroid we would just need to process those points in its canopy, but we can’t identify them in advance!

To get to a good solution, it can be useful to think about the real goal of canopy clustering: we want to get a set of canopy centroids that are no closer to each other than some distance T_{2}. The key point is that between any two of these canopies’ centers, the distance must be above a minimum value, so that the canopies won’t overlap too much. As we mentioned, this distance is similar to the “core distance” in DBSCAN, and points within a radius T_{2} can be assumed to belong to the same cluster with high probability.
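As a minimal sketch of the selection step just described (sequential and simplified; a real implementation draws candidates at random and may use a cheap approximate metric, and the names here are ours):

```python
import math

def euclidean(p, q):
    return math.sqrt(sum((a - b) ** 2 for a, b in zip(p, q)))

def select_canopy_centroids(points, t2, distance=euclidean):
    # Draw candidate centroids; every point within the inner radius t2 of a chosen
    # centroid is removed from the candidate list, so no two selected centroids
    # can end up closer than t2 to each other.
    candidates = list(points)
    centroids = []
    while candidates:
        c = candidates.pop(0)  # a real implementation draws at random
        centroids.append(c)
        candidates = [p for p in candidates if distance(p, c) > t2]
    return centroids
```

Only the selection of centroids is shown here; the full algorithm also uses the outer radius T_{1} to assign points to (possibly overlapping) canopies.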

Suppose we shard our initial dataset, as shown in the top step of figure 13.9. If we then apply canopy clustering to each shard independently, we’ll get a certain number of centroids, probably different from mapper to mapper. If we recombine these centroids together, however, we have no guarantee that they will respect the requirement of being not closer than T_{2} from each other, because centroids from different shards haven’t been compared to each other.

It’s not time to give up yet, though! Luckily, there is an easy solution to this issue, and the solution is still . . . canopy clustering!

In fact, if we gather all the centroids from each mapper together, we can apply the canopy clustering algorithm again to this new (smaller) dataset, refining the selection, and this time guaranteeing that no two points in the output from this second pass will be at a closer distance than T_{2}. Check the last row in figure 13.9 to get an idea of how this second pass works. The solution is also efficient because the size of the new dataset (containing only the centroids produced by the mappers) is orders of magnitude smaller than the original dataset (assuming the distances T_{1} and T_{2} have been chosen properly), and therefore in step 2 we can spin up a single reducer and run canopy clustering on a single machine, on all the centroids from step 1.
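The two-pass scheme of figure 13.9 might then look like the following single-process sketch (the mappers are simulated by a simple loop, and the selection routine is defined inline; all names are ours):

```python
import math

def euclidean(p, q):
    return math.sqrt(sum((a - b) ** 2 for a, b in zip(p, q)))

def select_canopy_centroids(points, t2):
    # Simplified canopy-centroid selection: points within t2 of a chosen
    # centroid are removed from the candidate list.
    candidates = list(points)
    centroids = []
    while candidates:
        c = candidates.pop(0)  # a real implementation draws at random
        centroids.append(c)
        candidates = [p for p in candidates if euclidean(p, c) > t2]
    return centroids

def distributed_canopy_centroids(shards, t2):
    # Pass 1 (mappers): canopy clustering on each shard independently.
    per_shard = [select_canopy_centroids(shard, t2) for shard in shards]
    # Pass 2 (single reducer): run canopy clustering again on the union of the
    # per-shard centroids, so that no two surviving centroids are closer than t2.
    merged = [c for shard_centroids in per_shard for c in shard_centroids]
    return select_canopy_centroids(merged, t2)
```

The second pass is cheap precisely because its input is only the per-shard centroids, not the original points.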

At this point, we need to make an important observation. When used as a preliminary step for k-means, the algorithm has a slightly different goal (and different output) than when used as a standalone coarse-grained clustering algorithm:

- For k-means, we only have to return a list of centroids.
- The standalone algorithm will also need to return, for each canopy, the points belonging to it.

Therefore, we need to treat these two cases separately.

### 2. Centroid initialization with canopy clustering

Let’s start with canopy clustering as the initialization step for k-means. Listing 13.3 summarizes a possible implementation for the MR job performing this task. At first glance, we don’t need to do anything other than what we have shown in the previous section: we just return the output of the reducer, the list of centroids.

But there is a catch (there always is!): How do we decide how many centroids should be returned by canopy clustering?

The answer is that we can’t control it directly, but only through the values of the two distance thresholds passed to the algorithm, and only to some extent. In the end, the algorithm is a randomized heuristic and the number of canopies created can vary at each run, even with the same values for the hyper-parameters.

So, we need to think outside the box to handle this. Because there is a strong random component that influences the result of each run, we can run canopy clustering several times and take the result that is closer to our expectation. This might not be enough, though, because the variance between different runs is limited, and if we start with the “wrong” values for the thresholds, the algorithm could always output too many (or too few) centroids.

To solve this issue, we have two options: manually tuning these thresholds after each run or, alternatively, performing some kind of search in the domain of the thresholds. Such a search can try different values either by adding, at each run, a random value to our initial choice for T_{1} and T_{2}, or by tuning the thresholds depending on the number of canopies returned (lowering T_{2} when too few centroids were selected in the last run, and raising it when we get too many). If we’d like to get really fancy, we can even use ML to find the best values.

While the former idea is a brute-force, fully randomized search, the latter, better-targeted solution seems more promising, because we can direct the search toward values that should work better for our goal. We can use the same idea behind gradient descent, although with a simpler algorithm that just decides the direction of the update, without worrying about slopes and gradients. For instance, we could run a cycle where we adjust the value of the inner threshold (and, when needed, also the outer threshold) depending on the difference between the number of canopies returned by canopy clustering and the number of centroids we need.

Considering the random factor in this algorithm, however, this is clearly still a naive search over the possible values of T_{2}, and it could possibly lead to an infinite loop. To correct this situation, we could add a stop condition (and an argument) checking that a maximum number of iterations is not exceeded. At the same time, we could also store the result that is closest to our request in a temporary variable that we update at each run, and return this result whenever the maximum number of iterations is reached without finding a set of canopies with exactly numCentroids entries. Most of the time it can be acceptable if we return, for instance, 101 centroids instead of 100 (the point being, the caller will have the chance to check the result and decide).
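Such a bounded search, remembering the best result so far, might be sketched as follows (all names are hypothetical; `run_canopy` stands in for one full distributed run of centroid selection, and `step` controls how aggressively T_{2} is adjusted):

```python
def tune_canopy_thresholds(run_canopy, num_centroids, t2, max_iterations=20, step=0.8):
    # run_canopy(t2) -> list of centroids; step in (0, 1).
    # Keep the run whose centroid count is closest to the target, in case we
    # never hit num_centroids exactly within max_iterations.
    best = None
    for _ in range(max_iterations):
        centroids = run_canopy(t2)
        if best is None or abs(len(centroids) - num_centroids) < abs(len(best) - num_centroids):
            best = centroids
        if len(centroids) == num_centroids:
            break
        if len(centroids) < num_centroids:
            t2 *= step   # too few canopies: shrink the inner radius
        else:
            t2 /= step   # too many canopies: grow the inner radius
    return best
```

With the stop condition and the best-so-far variable, the search always terminates and always returns something usable, even if the count is only close to the target.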

We leave it to the reader, as an exercise, to extend listing 13.3 to handle the choice of thresholds automatically, and we’ll instead move on to describe the full distributed canopy clustering algorithm, the one returning not only the canopy centroids, but also the (overlapping) sets of points associated with each canopy.

### 3. MapReduce canopy clustering

The *classification* step, assigning each point to one or more canopies, can be implemented in a few different ways:

- As a follow-up of the method described in listing 13.3. However, because classification will involve all points, if we don’t distribute this step, we will lose most of the advantage of running the algorithm to choose centroids in parallel.
- In the same method as centroids initialization, but with a different MapReduce job.
- In the same MapReduce job we described in the previous section, with the reducer in charge of also performing classification at the same time as it chooses which centroids should be kept. In particular, the reducer would get from the mappers the lists of points assigned to each centroid, so rather than cycling through all dataset points again, it could reuse these lists (we’ll see later in this section how it will need to combine them).

The first option is the easiest, but naive. Implementing the classification step at least in the same method as the choice of centroids gives us two main advantages:

- We can, in theory, reuse the same mappers, which already hold onto their shards of data, by just passing them the list of canopy centroids.
- When we perform classification, there is an issue with the centroids filtering we perform in the reducers. So far, we have been able to ignore it, but running centroid filtering and classification together allows us to solve this issue efficiently.

Figure 13.10 shows what might happen when we filter out one or more centroids during the reduce step in our previous algorithm. We haven’t mentioned this issue until now because it only affects the algorithm when we assign points to canopies, while it is irrelevant when we only care about centroids, as in the k-means initialization steps.

The problem is that when we filter out any one of the centroids selected during the map step, a fraction of the points in its canopy might not be covered anymore, not even by the centroid that was chosen in its place. See the left side of figure 13.10, where the centroid marked with a star is drawn from the list of centroids, and as a result the other centroid, within the inner radius T_{2} of the selected one, is filtered out; the shaded area, however, is not covered by the canopy centered at the surviving centroid (the star).

While sometimes the lost coverage is made up for by other canopies, this isn’t always the case: the right side of the same figure shows an example where a few points remain uncovered after a specific choice of centroids to keep.

There are several options for solving this issue:

- Consider the “lost” points as outliers (this is not really reliable, though, because they might lie in the middle of some big cluster that can’t be covered by a single canopy).
- Enlarge canopies: during a run of canopy clustering, every time one of the centroids is chosen, the discarded centroids can be appropriately marked. During this phase, it is possible to keep track of the outer radius associated with each canopy, and make it large enough to cover all points in the canopy of the removed point. The cheapest way to do so is to set the radius of all canopies to T_{2}+T_{2}, but this would obviously increase the overlap between canopies.
- Go through unassigned points (those that are not within a distance T_{2} from any of the surviving centroids) at the end of the classification step and assign them to the closest centroid. This solution will keep the overlap of canopies to a minimum (each canopy’s radius will be at most T_{2}+T_{2}, but as small as the distance to the furthest of these new points), but it will be more costly.
- If we perform classification in the same MapReduce job as the choice of centroids, we can find an efficient solution. Mappers will also have to produce a list with the sets of points associated with each centroid and pass these sets to the reducer along with the list of centroids for each shard. In the reducer, an ad hoc variant of canopy clustering is run, and when a centroid c is drawn, the sets assigned to all centroids within a radius T_{2} from c are merged and assigned to c’s canopy. This is the best solution in terms of performance, because it saves one iteration over all points in all shards and only requires a single reducer. The downside is that many lists of canopies need to be transferred to the reducer, and it requires a custom version of the canopy clustering algorithm that handles merging centroids.
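The reducer of this last option might be sketched as follows (our own rendering, not the book’s listing): it receives centroid/point-set pairs from the mappers and, whenever it draws a centroid, absorbs the point sets of all candidate centroids within T_{2} of it.

```python
import math

def euclidean(p, q):
    return math.sqrt(sum((a - b) ** 2 for a, b in zip(p, q)))

def reduce_merge_canopies(candidates, t2):
    # candidates: list of (centroid, points_assigned_by_some_mapper) pairs.
    # Returns the surviving centroids, each paired with the union of the point
    # sets of every candidate centroid that fell within t2 of it.
    remaining = list(candidates)
    canopies = []
    while remaining:
        centroid, points = remaining.pop(0)  # a real reducer draws at random
        merged = set(points)
        survivors = []
        for other_centroid, other_points in remaining:
            if euclidean(centroid, other_centroid) <= t2:
                merged |= set(other_points)  # absorb the filtered centroid's set
            else:
                survivors.append((other_centroid, other_points))
        remaining = survivors
        canopies.append((centroid, merged))
    return canopies
```

Because the point sets travel with their centroids, no second pass over the shards is needed to recover the coverage lost when a centroid is filtered out.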

Listing 13.4 summarizes a high-level implementation of canopy clustering leveraging MapReduce, with classification performed in the same method, but in a second MapReduce job with respect to the choice of centroids. Lines #2-5 run the same algorithm that, in listing 13.3, performs the distributed computation of canopy centroids. Lines #6-9, on the other hand, run the code specific to this version, performing the assignment of each point p to all the canopies for which p is within a distance T_{2}+T_{2} from the canopy’s center. As we have mentioned, choosing this larger radius ensures that no points will remain uncovered.

The method performing these assignments, classifyPoints,^{15} is run in the mappers for each shard separately. As for k-means, this step can be performed independently on each point, as long as the mapper has the full list of centroids.

The output of each mapper will be a list of lists with one entry per centroid: the list of points associated with that centroid. Notice that each point can be associated with at least one, but potentially many centroids. Each mapper will have entries for several centroids, and each centroid will have points assigned to it across several mappers: that’s why in this MR job we also need one reducer per canopy (that is, per centroid).

Each reducer will then work on a single canopy, merging the lists for that canopy produced by each mapper; the final result will be the list of canopies produced by each reducer.
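This mapper/reducer pair might be sketched as follows (hypothetical names; `classify_points` plays the role of classifyPoints in listing 13.4, here with a generic radius parameter):

```python
import math

def euclidean(p, q):
    return math.sqrt(sum((a - b) ** 2 for a, b in zip(p, q)))

def classify_points(shard, centroids, radius):
    # Mapper: one list of points per centroid; a point can land in several
    # lists, since canopies are allowed to overlap.
    assignments = [[] for _ in centroids]
    for point in shard:
        for i, c in enumerate(centroids):
            if euclidean(point, c) <= radius:
                assignments[i].append(point)
    return assignments

def reduce_canopy(per_mapper_lists):
    # Reducer for one canopy: merge the lists produced for it by every mapper.
    return [p for lst in per_mapper_lists for p in lst]

def canopy_classification(shards, centroids, radius):
    mapper_outputs = [classify_points(shard, centroids, radius) for shard in shards]
    # One reducer per centroid/canopy: the i-th reducer collects the i-th list
    # produced by each mapper.
    return [reduce_canopy([out[i] for out in mapper_outputs])
            for i in range(len(centroids))]
```

On a real cluster, the i-th reducer would receive its per-mapper lists over the shuffle phase rather than through an in-memory list of outputs.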

This subroutine described in lines #6-8 is also illustrated in figure 13.11.

This concludes our discussion of canopy clustering. We encourage the reader to try writing a job for this algorithm with one of the open source implementations of MapReduce, for instance, Hadoop, or to check out Mahout, a distributed linear algebra framework by the Apache Software Foundation, which does implement a distributed version of canopy clustering.

Source: La Rocca, Marcello (2021), *Advanced Algorithms and Data Structures*, Manning Publications.