In this session we'll cover something called partitioning, which comes into play when shuffling data around your cluster. Partitioning your data intelligently can often save you a lot of time when running computations. It's important to understand in general when working with distributed systems, and in particular when dealing with Spark RDDs.

In the last session we focused on a specific example involving the groupByKey operation. We saw that using this operation causes data to be shuffled over the network, because to group all of the values with their corresponding key, we've got to move them all to the same machine. But we ended that session asking ourselves: okay, I understand we have to move data around the network so that related data ends up on the same machine, but how do we know which machine to put which key on? So before we try to optimize that example any further, let's take a detour to learn more about what partitioning is and how it works in Spark.

Let's first start with partitions. What are they? Simply put, the data within an RDD is split into many partitions, and partitions are fairly rigid things. Most importantly, they never span multiple machines; data in the same partition is always on the same machine. Another point is that each machine in the cluster contains at least one partition, sometimes more, sometimes exactly one. The number of partitions is configurable, by the way; we'll learn about when and how the number of partitions can be changed by the user. By default, when a job starts, the number of partitions is equal to the total number of cores on all executor nodes. So, for example, if all of the machines in your cluster have four cores and you have six worker nodes, then the default number of partitions you start with would be 24.

Importantly, Spark comes with two out-of-the-box kinds of partitioning, which make sense for different sorts of applications: hash partitioning and range partitioning. We'll talk about these in more detail in the next few slides. However, it's important to note that all of this talk about customizing partitioning is only possible when working with pair RDDs, since, as you'll see, partitioning is done based on keys, and we need a pair RDD to have keys at all.

So let's start with hash partitioning. To illustrate this first kind of partitioning, let's return to the groupByKey example that we saw in the previous session. As we saw in the last session, this example uses hash partitioning by default, but what is it? How does it work? What does it look like? The way it works is like this: since groupByKey knows it has to move data around the cluster, its goal is to do that as fairly as possible. So the first thing it does is compute the partition p for every tuple in the pair RDD: we take the key's hash code and modulo it with the default number of partitions, which we said was 24 in the last slide. Whatever the result of that computation is, that's the partition the key goes into. Then, when the hash partitioning is actually carried out, the tuples in the same partition are sent to the machine hosting that partition. So the key intuition here is that hash partitioning tries to spread the data as evenly as possible over all of the partitions, based on the keys.
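To make that rule concrete, here's a minimal sketch in Scala of the computation just described. The function name hashPartition and its parameters are my own illustrative names, not Spark API; it simply mirrors how a key's hashCode and the number of partitions determine the target partition.

```scala
// A minimal sketch of the hash-partitioning rule described above.
// numPartitions would default to the total number of executor cores
// (24 in the running example).
def hashPartition(key: Any, numPartitions: Int): Int = {
  val rawMod = key.hashCode % numPartitions
  // hashCode can be negative, so wrap negative results back into range.
  if (rawMod < 0) rawMod + numPartitions else rawMod
}

// Example: with 24 partitions, the key 42 lands on partition
// hashPartition(42, 24)  // == 18
```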
The other kind of partitioning is called range partitioning. This is relevant when some kind of order is defined on the keys; examples would include integers, chars, or strings. If we think back to our previous examples, where we were working with pair RDDs whose keys were integers, we could hypothetically range partition those keys. For these kinds of RDDs, whose keys have an ordering, range partitioning can be an efficient choice for partitioning your data. Intuitively, that means when you're using a range partitioner, keys are partitioned according to two things: some kind of ordering on the keys, like the numerical or lexicographic orderings I just mentioned, and a set of ranges for the keys. So you have a group of possible ranges that the keys can fall within, and tuples with keys in the same range end up on the same node.

Let's look at a more concrete example of each kind of partitioning, starting with hash partitioning. Consider a pair RDD with a handful of keys; we don't care about the values for now. Let's say our desired number of partitions is 4, and, just to keep things simple, let's assume that the hashCode function is the identity function for this example. Remember that to compute the partition, we call hashCode on the key, which here is just the identity, and modulo it with the number of partitions. So in this example the computation simplifies to just the key modulo 4. Let's compute the partitions that these keys belong in.

If we compute the partitions for each of these keys using the formula from the previous slide, something wonky happens: hash partitioning distributes our keys very unevenly amongst the four partitions. Remember, the goal of hash partitioning is to try and evenly spread out the keys, but it's possible to run into situations like this, where an unlucky group of keys causes the data in your cluster to be heavily skewed, so some nodes have a lot more data and some nodes have much less, or none at all. Of course this means potentially bad performance, because a job that could be evenly spread out over four nodes is, in this case, basically running on just one node, so it's not really very parallel.

In this case, since we know the hash partitioning is skewed, and since our keys have an ordering and are non-negative, we can use range partitioning to even out the partitioning significantly. Assume we have a set of ranges for four nodes: for example, customer IDs 1 to 200 on the first node, 201 to 400 on the second node, 401 to 600 on the third node, and 601 to 800 on the fourth node. This enables us to distribute the keys more evenly across our partitions: we just put each key in its corresponding range. So all keys between 1 and 200 go on the first node, all keys between 201 and 400 on the second node, and so on. This is clearly much more balanced, and we can imagine it being much more performant than before, because the work is more evenly spread out.
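To make the skew concrete, here's a small sketch with a hypothetical set of keys. The transcript doesn't list the slide's exact keys, so these values are my own; the point is only to show how identity-hashCode hash partitioning with 4 partitions can pile keys onto one partition, while the ranges above spread them out.

```scala
// Hypothetical customer-ID keys; assume hashCode is the identity function.
val keys = Seq(8, 96, 240, 400, 401, 800)
val numPartitions = 4

// Hash partitioning: partition = key % 4.
val hashAssignment = keys.groupBy(k => k % numPartitions)
// Map(0 -> List(8, 96, 240, 400, 800), 1 -> List(401))
// -> almost everything lands on partition 0: heavily skewed.

// Range partitioning with the ranges from the example:
// partition 0: 1-200, partition 1: 201-400, partition 2: 401-600, partition 3: 601-800.
val rangeAssignment = keys.groupBy {
  case k if k <= 200 => 0
  case k if k <= 400 => 1
  case k if k <= 600 => 2
  case _             => 3
}
// Map(0 -> List(8, 96), 1 -> List(240, 400), 2 -> List(401), 3 -> List(800))
// -> the keys are spread much more evenly across the four partitions.
```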
Okay, but what if I want to customize how my data is partitioned? We've illustrated the concepts behind hash and range partitioning, but we haven't yet seen how to customize and set partitioners for our Spark jobs. Well, there are two ways to create RDDs with specific partitionings. First, you can explicitly call the partitionBy method on an RDD, passing in an argument which is an instance of some kind of partitioner. Or second, you can keep track of the transformations that you use on your RDDs, because certain transformations use certain kinds of partitioners. So if you know the partitioner associated with the transformation you're using, you can also customize the partitioning of your data that way.

Let's first look at the partitionBy method. If you customize your data's partitioning with partitionBy, the result is a new RDD with the specified partitioner. Let's imagine for a second that we have the RDD of purchases that we've been working with in the previous sessions, and that we now want to use a RangePartitioner on it. To do this, we first need to create an instance of a RangePartitioner. This is done by calling new RangePartitioner, passing in the number of partitions that we want, in this case 8, along with a reference to the pair RDD that we'd like to have partitioned. Spark needs that reference because, in order to come up with the ranges, it has to sample the RDD: it will take the pairs RDD and sample it to find out what the best ranges are. Then we simply invoke partitionBy on the pairs RDD, passing in the new range partitioner that we just created.

Do you see anything else interesting on this slide? I'll give you a moment to think about it. Yes, it's this call to persist here. Why would it be desirable to call persist here? Well, the answer lies in Spark's evaluation semantics and its tendency to re-evaluate chains of transformations again and again. If we don't tell Spark otherwise, this data would be shuffled over the network and repartitioned again and again. Basically, what we're doing here is saying: hey, once you've moved the data around the network and repartitioned it, just keep it where it is, persist it in memory. Otherwise, you could easily find yourself accidentally repartitioning your data in every iteration of a machine learning algorithm, completely unknowingly.

So, just to repeat: to create a range partitioner, you simply specify the desired number of partitions and provide a pair RDD with keys that are ordered, so that Spark is able to sample the RDD and create a suitable set of ranges to use for partitioning based on the actual data set. And again, I can't remind you enough, the result of partitionBy should always be persisted. Otherwise, the partitioning is repeatedly applied, which involves a shuffle each time the RDD is used, and of course you don't want that to happen.
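Here's a sketch of the kind of code being described on the slide. The Purchase case class, its fields, and the sample data are my own illustrative assumptions based on how the earlier sessions describe the purchases data set; the RangePartitioner and partitionBy calls are the real Spark API.

```scala
import org.apache.spark.{RangePartitioner, SparkContext}

// Illustrative purchase record; the field names are assumptions.
case class Purchase(customerId: Int, price: Double)

def partitionPurchases(sc: SparkContext) = {
  val purchasesRdd = sc.parallelize(Seq(
    Purchase(100, 2.50), Purchase(250, 8.95), Purchase(100, 19.99)))

  // Key the purchases by customer ID to get a pair RDD.
  val pairs = purchasesRdd.map(p => (p.customerId, p.price))

  // 8 partitions; `pairs` is passed in so Spark can sample it to choose ranges.
  val tunedPartitioner = new RangePartitioner(8, pairs)

  // Repartition by range, and persist so the shuffle isn't redone
  // every time the partitioned RDD is reused.
  pairs.partitionBy(tunedPartitioner).persist()
}
```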
Earlier in the session, I mentioned that it's also possible to set partitioners via transformations. There are two ways partitioners can be passed around with transformations. In the first, and probably most common, case, partitioners tend to be passed around via parent RDDs: a pair RDD that is the result of a transformation on an already-partitioned pair RDD is typically configured to use the same partitioner that was used to construct its parent.

The other case is when some transformations automatically result in an RDD with a different partitioner. This usually happens when it contextually makes sense. For example, if you use the sortByKey operation, a RangePartitioner is used, because you need ordered keys in order to sort them, and it makes sense that once the keys are sorted, they're partitioned such that similar keys end up in the same partition. Conversely, the default partitioner when using groupByKey is a hash partitioner, like we saw in the earlier example.

Here is a list of transformations that either hold on to or propagate some kind of partitioner. This is important to remember, because it could bite you for performance reasons later: these are operations that can result in a completely different partitioning of your data, and just by using some kind of transformation, something like this can happen transparently under the hood.

One big thing you should notice, though, is that a certain function you use all the time is not on this slide. That's right, map is missing, and so is flatMap. Why aren't they on this list? Interestingly, flatMapValues and mapValues are on this slide, and filter as well; these three operations hold on to their partitioner if their parent has one defined. Any other transformation that's not on this slide produces a result that doesn't have a partitioner, and that includes map and flatMap. If you use map or flatMap on a partitioned RDD, the resulting RDD loses its partitioner. That could really mess things up; just think about it for a second. If you've gone to great lengths to organize your data in a certain way across your cluster, and then you use an operation like flatMap, you lose all of that partitioning and your data might get moved all around again.

But let's focus on this point for a second: why is it that all other operations produce a result without a partitioner? Why does that make sense? Think about the map transformation for a moment. Assuming we have a hash-partitioned pair RDD that we intend to map over, why would it make sense for the resulting RDD to lose its partitioner? Think about all of the things you could possibly do in a map operation, and remember that partitioning only makes sense for pair RDDs, which are key-value pairs. I'll give you a moment to think about it.

Well, it's because map, or flatMap, can change the keys in a pair RDD. Here's an example: we can take a very intelligently crafted data set, called rdd in this case, and accidentally replace all of the keys in the entire data set with the same key, which is just the string "doh!". We could do this intentionally or accidentally; it doesn't matter. So if the map transformation were able to preserve the partitioner in the resulting RDD, that partitioner might no longer make sense, because the keys may now be different and no longer correspond to the correct partitions. That should make sense with this "doh!" example, right? If I had keys that made sense before and then replaced them all with "doh!", then what?
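Here's a minimal sketch of that key-clobbering map, assuming rdd is the carefully partitioned pair RDD of (customerId, price) pairs from the running example; the variable names are my own.

```scala
// `rdd` is assumed to be a pair RDD of (customerId, price) that was
// carefully partitioned. This map replaces every key with the same string,
// so the old partitioner no longer describes where each record should live;
// that's why Spark drops the partitioner on the result of map (and flatMap).
val clobbered = rdd.map { case (customerId, price) => ("doh!", price) }
// clobbered.partitioner == None
```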
Now the partitioner no longer corresponds to the keys that I have. This is why the mapValues operation is so important, and why, whenever you're working with a pair RDD, you should generally reach for mapValues first if you can. This operation makes it possible for us to still do map-like transformations while making it impossible to change the keys of the pair RDD, which in turn makes it possible to keep the same partitioner around. That's clever, right? So if we do all of this work to intelligently partition our data across our cluster, we can still do a map operation, and we can still keep the partitioner that corresponds to how our data is organized, by using the mapValues function; there's a short sketch of this below. That's great. In the next session we'll continue learning about how partitioning can significantly reduce data shuffling, and how it can be a really huge boon for performance if tuned carefully.
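As a quick reference, here's a minimal sketch contrasting map and mapValues with respect to the partitioner; the sample keys, values, and names are my own illustrative choices.

```scala
import org.apache.spark.{HashPartitioner, SparkContext}

def demo(sc: SparkContext): Unit = {
  // A small hash-partitioned pair RDD.
  val pairs = sc.parallelize(Seq((1, 2.0), (2, 3.5), (3, 1.25)))
    .partitionBy(new HashPartitioner(4))
    .persist()

  // mapValues can only touch the values, so the keys and the partitioner survive.
  val doubled = pairs.mapValues(_ * 2)
  println(doubled.partitioner)   // Some(org.apache.spark.HashPartitioner@...)

  // map could change the keys, so Spark drops the partitioner on the result,
  // even when this particular map happens to leave the keys alone.
  val mapped = pairs.map { case (k, v) => (k, v * 2) }
  println(mapped.partitioner)    // None
}
```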