In this session, we're going to continue diving into the DataFrames API. So far we've developed a bit of an intuition about what DataFrames are and how we can create them, and we learned how to do transformations and, importantly, aggregations on DataFrames. So we have a handful of operations that we know how to do on DataFrames, in particular relational operations. In this session we'll continue to focus on the DataFrames API. We'll cover how to work with missing values, how to deal with things like null. We'll look at common actions on DataFrames; so far we saw transformations and aggregations, which were both transformations of some sort, but we didn't look at any actions yet, or so we think. Then we'll dive into joins on DataFrames, and finally we'll talk a little bit about optimizations on DataFrames.

So let's get started with cleaning data with DataFrames. Anybody who works with real datasets knows that data is never pretty when you start with it. Often it's full of unwanted values like null or NaN, which stands for "not a number". In these cases it's often desirable to do something about these unwanted values. In particular, we often like to drop rows or records with unwanted values like null or NaN, or sometimes we'd rather replace those values with something less problematic than null. For example, if null shows up in a numeric column, we might like to replace it with zero.

Spark SQL offers a handful of methods to help you clean your data. On the dropping front, there are a number of overloads of the drop method that we can use to clean up these unwanted values. The most commonly used is drop without any parameters. If you call this method on a DataFrame, it drops rows that contain either null or NaN in any column and returns a new DataFrame without those problematic rows. Which means that if you have a row with many values in it, most of them good but one of them null, then that entire row gets dropped. That behavior might not always be desirable; perhaps we know we can deal with that null value and replace it with something else in some other way. In that case you might want to use the drop method while passing "all" to it. In this case we only drop rows that contain null or NaN in all columns. In other words, we drop an entire row only if every one of its columns is null or NaN. And finally, there's another variant of drop that we can pass an array of column names to, which means that we only drop records that contain null or NaN values in those columns. So we only drop a row if, for example, the id or the name column has a null value, and we don't care about null or NaN values in any other column.

The other class of operations that we can use to clean up our datasets has to do with replacing unwanted values with something else. There are a few different variants of the fill method. The variant I've chosen to show here takes a single parameter, in this case the number zero, and replaces all occurrences of null or NaN in numeric columns with that value. So if a null or a NaN is found in any column that has a numeric type, we replace that value with 0 instead, and of course we return a new DataFrame.
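To make this concrete, here's a minimal sketch of what those calls look like, assuming a SparkSession named spark and a small made-up DataFrame; note that in the Spark API these null-handling methods live on the DataFrame's na member (DataFrameNaFunctions).

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("cleaning-sketch").getOrCreate()
import spark.implicits._

// Small illustrative DataFrame with some missing values.
val df = Seq(
  (Some(1), Some("Alice"), Some(100.0)),
  (Some(2), Some("Bob"),   None),
  (None,    None,          None)
).toDF("id", "name", "balance")

df.na.drop()                     // drop rows with null/NaN in *any* column
df.na.drop("all")                // drop rows that are null/NaN in *all* columns
df.na.drop(Array("id", "name"))  // drop rows with null/NaN in just these columns
df.na.fill(0)                    // replace null/NaN in numeric columns with 0
```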
The next variant of fill gets a little bit more specific. In this variant we can pass a Map to fill, where the keys are column names and the values are what we'll replace null or NaN with. So perhaps we want to handle different columns differently. Say we've got one column called minBalance and another column called name. If there's a null in minBalance, we want to replace it with a 0, but if there's a null in the name column, we'd maybe instead replace it with some other string that could be more useful.

And finally there's the replace method, which is good for replacing specific values. Let's say I messed up my dataset; perhaps there's a row where something isn't what it should be, and I can make corrections with this replace method. In this case we pass an array and a map to replace: the array contains column names and the map contains the values that should be replaced. So here, in the column called id, we replace the specified value, 1234, with 8923 whenever we come across 1234 in the id column. The idea is that I can do one-off replacements of erroneous values using this replace method.

So that's the gist of some of the methods you can use to clean your data. These are the main variants of the three methods we can use on DataFrames to clean up datasets: drop, fill, and replace. They each have a few variants, but we've covered most of them so far.

You might have noticed that we covered transformations in the last session but we didn't say anything about actions. Just like RDDs, DataFrames have their own set of actions, and we've actually already used one of them several times. In fact, the actions found on DataFrames aren't all that interesting; they're very similar to the actions found on RDDs. So I'm not going to show you examples of each one of these being used, since we've already seen them many times in the RDD sections, but there is one that is particularly interesting, and that's the one called show. This is the one we've actually already seen several times, and it's rather special to DataFrames. As we saw, show displays the top 20 rows of the DataFrame in tabular form; it pretty-prints it in a very nice way so it's easier to look at and visually parse the table. The other common actions are ones we already know, such as collect(), which returns an array containing all of the rows of the DataFrame back to the master node. There's also count, which returns the number of rows. Then there are the methods first and head; these are the same method, and they return the first row, the entire first row of the DataFrame. So if you ever want to just take a peek at what's in your DataFrame, you can grab the row on top using first or head. And finally there's the take method, which we're also already very familiar with; the idea is to return the first N rows back to the master node as an array of rows, which we can then introspect. And that's actually about it for actions. There aren't many special actions on DataFrames, and in fact we're often using aggregations more than we're using these actions.
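Continuing the sketch from above with the same illustrative df (the lecture's minBalance/name example maps onto the balance and name columns here), the Map-based fill, the targeted replace, and the common actions might look like this:

```scala
// Replace nulls per column: 0 for the numeric column, a placeholder for name.
df.na.fill(Map(
  "balance" -> 0,
  "name"    -> "unknown"
))

// One-off correction: in the id column, replace 1234 with 8923 wherever it occurs.
df.na.replace(Seq("id"), Map(1234 -> 8923))

// Common actions -- these actually kick off computation:
df.show()                 // pretty-prints the top 20 rows in tabular form
val all   = df.collect()  // Array[Row] with every row, back on the master node
val n     = df.count()    // number of rows
val top   = df.first()    // the first Row (head() is the same thing)
val someN = df.take(5)    // Array[Row] with the first 5 rows
```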
Okay, so let's get into joins. What's pretty neat is that joins on DataFrames are actually pretty similar to joins on Pair RDDs, with one major difference: because we don't have key-value pairs anymore, we have to specify which columns we should join on. With Pair RDDs we were always implicitly joining on keys. Now that we're using a table, we have to say which of the columns in the table should behave like the key that we focus on in our join. So just like with Pair RDDs, there are several joins to choose from, and we have to tell Spark which one we want to use; the syntax is just a little bit different.

This, for example, is how we do an inner join. Assuming we have two DataFrames, one called df1 and the other called df2, we can do an inner join by saying df1.join and passing df2 as a parameter. Then this second part is how we tell Spark SQL which columns should behave as keys. In this case, each of these DataFrames has a column called id, and we want to join on these id columns. So we select the columns via this dollar-sign notation and say that both of these ids must be equal when rows are joined, and that's it. We just pass the DataFrame that we want on the right side of our join, as well as which columns should essentially act as the keys.

Unlike joins on RDDs, joins on DataFrames always use the same method, named join. With RDDs we were using differently named methods, like leftOuterJoin and rightOuterJoin, to specify the different kinds of joins that we wanted to do. To specify that we want a different kind of join than an inner join, all you have to do is pass another parameter. In this case we pass "right_outer" for a right outer join. So we're saying that we want to do a right outer join with DataFrame one on the left and DataFrame two on the right, with the ids again essentially as our keys in the join, and the fact that it's a right outer join is this third parameter that we pass to join. That's all there is to it. In order to change this join type to anything else, we can pass one of these other strings instead of "right_outer". So if we wanted to change this to a left outer join, we'd pass "left_outer", or for a full outer join we'd just say "outer", for example.
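Here's a minimal sketch of that join syntax, with two tiny made-up DataFrames standing in for df1 and df2; selecting the key columns with df1("id") is equivalent to the dollar-sign column notation shown on the slide.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("join-sketch").getOrCreate()
import spark.implicits._

// Two tiny illustrative DataFrames that share an id column.
val df1 = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "v1")
val df2 = Seq((1, "x"), (3, "y"), (4, "z")).toDF("id", "v2")

// Inner join (the default) on the two id columns:
val inner = df1.join(df2, df1("id") === df2("id"))

// Any other join type is selected with a third string parameter:
val rightOuter = df1.join(df2, df1("id") === df2("id"), "right_outer")
val leftOuter  = df1.join(df2, df1("id") === df2("id"), "left_outer")
val fullOuter  = df1.join(df2, df1("id") === df2("id"), "outer")
```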
So let's look at an example. Recall the CFF dataset that we saw earlier in the course; this was the dataset with the train passengers and the train tickets. Let's try to adapt it to the DataFrames API. This is the code for the data that we used in our example, adapted a little bit to put it into DataFrame form. I've made everything case classes. So now we have a case class representing the subscriptions, called Abo. The id is the key, which is of type Int, and the value is a pair of strings representing the customer's last name and the type of subscription they have. That's the case class Abo. In the case class Loc, which stands for location, the key is an integer and the value is a string, and this string represents a city that the customer often travels to. So assuming that I have a sample dataset here, I can create two different DataFrames by calling toDF on the RDDs that I created in the earlier RDD example, using the toDF method that we saw earlier. And now I have a couple of DataFrames: one for the subscriptions called abosDF, and one for the locations called locationsDF.

If I call the show method on each of these two DataFrames, this is what it looks like. In the case of abosDF, I have four records, for customers 101, 102, 103, and 104, and here's their information: this is the last name and this is the type of subscription they have. So this person has what's called an AG, a Generalabonnement, which is a free train pass all over Switzerland. And these people have what's called a DemiTarif, which is a half-price fare card, okay? So they get a 50% discount on their train tickets. And in the locations dataset I have customer numbers and the cities that these customers most frequently travel to. So for example, customer 101 frequently travels to Bern and Thun, and customer 103 travels quite a bit: they go to Chur, St. Gallen and Zurich. So these are our two datasets, and this is what they look like.

Now say we'd like to combine these two datasets, such that we have a new dataset that contains only customers that both have a subscription and for which location info exists. So how would you do it? I just showed you how to perform joins on DataFrames, so all you have to do is figure out the right join to use and then put it into the shape that I showed you on the previous slide. So what would this program look like? Well, we perform an inner join, of course, because, as the problem statement says, we want to combine these two datasets such that the resulting dataset contains only customers that have both a subscription and location info. So we want an inner join. That said, assuming that we have abosDF and locationsDF, all we have to do is call abosDF.join with locationsDF, say that our key is going to be the id columns, and that's it. This is by default an inner join if we don't pass a parameter specifying the join type; join is an inner join unless we say otherwise. And this is what the resulting dataset looks like. You'll wind up with both id columns in the resulting dataset, but this is correct: notice that we have customers 101, 102, and 103 in here, and not customer 104, because customer 104, if you recall, didn't exist in the locations dataset. So it's correct that customer 104 is missing in this case, the inner join case.

So let's do another example. Let's assume the CFF wants to know, for which subscribers, so this is the keyword here, subscribers, people who have subscriptions like the half-fare card, whether or not we've collected location information. It's possible to have the scenario where somebody has some kind of subscription, like a DemiTarif, but they don't use the CFF app, so it's impossible for us to collect location information about them. It would be good to have a dataset where we could see situations like these, in case we'd like to suggest, for example, that this user should download the mobile app. So, if we wanted to join these two datasets together into one that shows for which subscribers location information is available, which join do we use? What program would we write with DataFrames to accomplish this task? I'll let you try it for yourself. Well, the answer is that we would choose a left_outer join. We choose a left_outer join because we'd like to keep all of the keys in the abosDF dataset; this is the join where we make sure we have an entry for each one of its elements.
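Here's a sketch of both versions in code. The Abo and Loc case classes follow the description above, but the sample rows themselves are just illustrative stand-ins for the lecture's dataset.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("cff-sketch").getOrCreate()
import spark.implicits._

case class Abo(id: Int, v: (String, String))
case class Loc(id: Int, v: String)

// Illustrative sample data in the spirit of the lecture's dataset.
val abosDF = Seq(
  Abo(101, ("Ruetli", "AG")),
  Abo(102, ("Brelaz", "DemiTarif")),
  Abo(103, ("Gress", "DemiTarifVisa")),
  Abo(104, ("Schatten", "DemiTarif"))
).toDF

val locationsDF = Seq(
  Loc(101, "Bern"), Loc(101, "Thun"),
  Loc(102, "Lausanne"), Loc(102, "Geneve"),
  Loc(103, "Zurich"), Loc(103, "St-Gallen"), Loc(103, "Chur")
).toDF

// Inner join: only customers with both a subscription and location info.
val trackedCustomersDF =
  abosDF.join(locationsDF, abosDF("id") === locationsDF("id"))

// Left outer join: keep every subscriber, with nulls where no location
// info was collected (customer 104 in this sample).
val subscribersDF =
  abosDF.join(locationsDF, abosDF("id") === locationsDF("id"), "left_outer")

// The follow-up filter discussed next: subscribers with no location info at all.
val noLocationInfoDF = subscribersDF.filter(locationsDF("id").isNull)
```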
The resulting dataset then looks like this. Notice that there exists a record on the left side for every single person in the subscriptions dataset, the abos dataset, and everybody who has a subscription seems to use the mobile app except for customer number 104. That row has null values on the right, so this is a customer for which we have no location data, but we do have this person in our dataset, which is what we wanted. So now we could do a filter on this dataset and then know to suggest to user number 104: hey, would you like to download the mobile app? As expected, customer number 104 is back because we chose a left outer join.

Okay, so now we're familiar with the DataFrames API and all of the major operations. We've seen all of the untyped transformations that we care about using, we've seen aggregation and grouping operations, we've seen joins, and we've seen actions. Let's revisit the example that we looked at in the first session on Spark SQL, the one about selecting scholarship recipients from a certain dataset. In this example, we imagine that we're an organization called CodeAward that offers scholarships to programmers who have overcome some kind of adversity. And we have two datasets in this example. We have one full of records represented by this Demographic case class, which contains demographic fields about a certain person. So each person has an ID number, they have an age, and they have other information associated with them like the country that they're from, or whether or not they served in the military, for example. The other dataset was one containing financial information. So we had many instances of this Finance data type, which also contained an ID and then financial information, such as whether or not this person has debt, whether or not this person has financial dependents or student loans, and also income information.

So let's rearrange those datasets into DataFrames. Imagine that we have two DataFrames full of these two pieces of information: one representing the demographic information and another representing the financial information. One is called demographicsDF and the other is called financesDF. Now let's imagine that our goal is to tally up and select students for a specific scholarship based on some kind of criteria. If you recall, the criteria that we chose was looking for students who are from Switzerland and who have debt and financial dependents. This is the criteria that we want to search for in order to figure out how many people are eligible for this scholarship; again, we want to count them up. How might we implement this program with the DataFrame API? If you recall, I showed you three different possible solutions to this problem, and I showed you that the performance was vastly different in each case. So think about what you can do to achieve good performance when solving this problem. Well, that was sort of a trick question, because if you use DataFrames, the selling point is that all the optimization should happen automatically for you, thanks to the Catalyst query optimizer under the hood. That means you can just write this query however is most comfortable, and Spark can usually optimize away the unnecessary operations and make it as fast as possible.
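For instance, written the most natural way, a sketch might look like this. The tiny stand-in DataFrames and the column names (id, country, hasDebt, hasFinancialDependents) are assumptions based on the description above, not the course's exact schema.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("codeaward-sketch").getOrCreate()
import spark.implicits._

// Tiny illustrative stand-ins for the two DataFrames.
val demographicsDF = Seq(
  (1, 25, "Switzerland"),
  (2, 31, "France"),
  (3, 28, "Switzerland")
).toDF("id", "age", "country")

val financesDF = Seq(
  (1, true,  true),
  (2, true,  false),
  (3, false, true)
).toDF("id", "hasDebt", "hasFinancialDependents")

// Join first, filter second, count last -- Catalyst is free to reorder this.
val eligible =
  demographicsDF
    .join(financesDF, demographicsDF("id") === financesDF("id"), "inner")
    .filter($"hasDebt" && $"hasFinancialDependents")
    .filter($"country" === "Switzerland")
    .count()
```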
So in this case, I would just do the most natural thing, like what we did in solution number one when I first showed this example to you. If you recall, in solution number one the most natural thing to do was to first do a join, then filter out the people who didn't meet the criteria we established on the previous slide, and finally count it all up. So our steps were: join first, then filter the dataset down to the criteria as a second step, and finally count. Looking a little more carefully at our code, we take the demographics DataFrame, we call join, we pass it the finances DataFrame, and we say that we want to choose the ids from both of these DataFrames as keys. We're also explicit here about the fact that we want an inner join: we want to keep only the keys that exist in both datasets. Then we filter, letting through the people who have debt and have financial dependents, and we also let through the people who live in Switzerland. And finally we count the rows in the resulting DataFrame.

If you recall the performance numbers from the first session in this sequence of sessions about Spark SQL, you'll remember that the first solution, the one that did the join first and the filters later, took about 5 seconds to complete, whereas the second possibility, which did the filters first and the join second, took only about 1.3 seconds. So the second possibility, where we reordered things and put the filters first so that we were joining less data, had a 3.6x performance boost. There was also a cartesian-product version we could do that was 177x slower. So how does the DataFrame solution we just looked at compare to these other possibilities? Well, this is what it looks like. The DataFrame example's performance numbers are here, and it's actually even faster than the other two possibilities: the cartesian-product version is 193x slower than this DataFrame version. The DataFrame version took only 1.24 seconds to complete, compared to 1.35 seconds for the filter-filter-join version and 4.97 seconds for the join-then-filter version. And notice that even though I wrote the join first and the filters after, it doesn't matter, because what Spark does under the hood is reorder these operations to try to get the best performance. In this case that works out to about 4x faster than the first, naive solution that did the join first on a regular RDD. That's a pretty big difference, isn't it? Imagine if those were hours instead of seconds.

Okay, so let's briefly cover the sort of optimizations that Spark does. I'm not going to dive deep into the query optimizer or the data encoding framework; I'm just going to give you a general gist of what these things do. Recall that Spark SQL comes with two specialized backend components. On the one hand it comes with Catalyst, which is the query optimizer that I mentioned, and on the other hand it comes with this thing called Tungsten, which is an off-heap data encoder, or serializer. So, focusing on Catalyst, remember this picture that we saw earlier about how Spark SQL fits into the whole Spark ecosystem. Spark SQL sits on top of regular Spark and RDDs. It takes in user programs written with this relational DataFrame API, so users are writing relational code, and ultimately it spits out highly optimized RDDs that are run on regular Spark. And these highly optimized RDDs are arrived at by this Catalyst optimizer here.
So on the one hand you put in relational operations, Catalyst runs, and out the other end come RDDs. What's important to remember is that Catalyst compiles Spark SQL programs down to RDDs. If you recall this picture from an earlier session, on the Spark RDD side of things there's not a lot of structure. Things are just blobs of objects, Account objects for example, or some other kind of objects; we don't have much visibility into the structure of this data. And then we have this opaque operation, this opaque function, that does some kind of computation we know nothing about because we can't see inside of it. So there's a lot of ambiguity here, not a lot of structure that we can easily see, which makes it really difficult to aggressively optimize. On the other side we have these tables and DataFrames that are very, very structured, very rigid, and we have a set of operations that is also very structured and very restricted. So we know more about what a user intends to do when they write in this relational syntax than when they write in the RDD style.

So assuming that Catalyst has this information, assuming that it has full knowledge and understanding of all the data types in the program, assuming it knows the exact schema of our data, so it knows which fields we have in our dataset, and assuming that it has detailed knowledge of the computations that we would like to do, then there are a number of optimizations that it can do. One example is reordering operations. The fact that all of our operations are still lazy, and that we now have this structure, gives Catalyst the ability to analyze and rearrange the DAG of computations, the logical operations that the user would like to do, before they're actually executed on the cluster. So before this DAG of operations runs as an RDD, we can rearrange things. That makes it possible for Catalyst to rearrange and fuse together filter operations, push all filters as early as possible, or push operations down to the data layer. Pushing filters down, so they run early, lets us filter data out and then do the expensive operations on less data, like we saw with the join examples.

Another sort of optimization Catalyst can do is reduce the amount of data that we must read. If we have full knowledge of all the data types used in the program, we know the exact schema of our data, and we know the exact computation that will be done, then Catalyst knows exactly which fields are not used in a computation, and that means we can move less data over the network. Recall from a few slides back: if there are fields that we don't need in our computation inside one of those blobs of objects in an RDD, we can't take advantage of that, because these are blobs of objects that we don't know much about. There's no way for us to focus on only a small piece of these objects, because, like I said earlier, we have no idea what's inside of them. That means if we have to do some kind of operation that involves shuffling data over the network, or moving some of these objects around, we have to serialize the entire thing and send it to another machine, even though we know we don't need most of it. Not knowing the structure of the data can be expensive later, and it also wastes memory, since most of what's sitting in memory isn't needed. Catalyst, on the other hand, can narrow down, select, serialize, and send around only the relevant columns of our dataset, so we don't have to ship the full dataset over the network if we don't need to. Yet another kind of optimization that Catalyst does is pruning unneeded partitions: it analyzes the DataFrame and its filter operations to figure out which partitions are unneeded in a computation and skips them, knowing in advance that we don't have to run a certain computation on certain partitions, which of course also saves time. There are many more optimizations that Catalyst does, but I hope this gives you a bit of an intuition about what some of them actually do.
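If you want to see some of this for yourself, you can ask Spark to print its query plans. Here's a quick sketch reusing the hypothetical demographicsDF and financesDF from the earlier scholarship sketch; in the optimized logical plan you would typically see the filter conditions pushed below the join.

```scala
// explain(true) prints the parsed, analyzed, optimized, and physical plans.
demographicsDF
  .join(financesDF, demographicsDF("id") === financesDF("id"), "inner")
  .filter($"hasDebt" && $"hasFinancialDependents")
  .filter($"country" === "Switzerland")
  .explain(true)
```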
The other backend component that I mentioned earlier is called Tungsten, which is Spark's off-heap data encoder. Since we have a very restricted set of data types that we know literally everything about, Tungsten is able to provide highly specialized encoders for that data. We know how to represent it and how to serialize it super efficiently, in a format that is column-based, and I'll talk about why that's great in a moment. It's also off-heap, which means it's free from garbage collection overhead.

What I mean by highly specialized data encoders is that Tungsten can take the schema information, and, knowing all of the possible data types that a schema can be built out of, it can optimally and tightly pack the serialized data into memory. This means more data can fit into memory, and we get faster serialization and deserialization, which is a CPU-bound task. So this is a highly optimized, highly specialized, very performant data encoder. The idea is that we keep data serialized in a special format in memory, so that we can keep more data in memory and access it more quickly.

The next feature of Tungsten is that it stores everything in a column-based format. Storing tabular data in a columnar format comes from the observation that most operations done on tables tend to focus on specific columns or attributes of the dataset. So when storing data, it's more efficient to store it grouped by column rather than grouped by row. The idea is that it would take more time to look up each individual element, something specific inside each row, than it would to just grab an entire column of data, knowing that the whole thing is already logically grouped together. This is well known to be more efficient for these kinds of workloads across all sorts of database systems. So these highly specialized data encoders and decoders store everything in a column-based format.

And finally, everything is off-heap. Tungsten knows how to tightly pack all of this data, in this columnar format, off-heap in memory that it manages itself, which means we can avoid garbage collection overhead, garbage collection pauses, and things like that. So to summarize, Tungsten lets us put more data in memory and gives us much faster access to the data that's stored there.
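As a purely illustrative sketch of the row-versus-column idea in plain Scala (this is not how Tungsten is actually implemented): when the values of one attribute are stored contiguously, an aggregation over that attribute never has to touch the other fields.

```scala
// Row-oriented: each record keeps all of its fields together.
case class Person(id: Int, name: String, income: Double)
val rows = Seq(Person(1, "a", 10.0), Person(2, "b", 20.0), Person(3, "c", 30.0))
val totalRowWise = rows.map(_.income).sum   // walks every whole record

// Column-oriented: each attribute is its own contiguous array.
val ids     = Array(1, 2, 3)
val names   = Array("a", "b", "c")
val incomes = Array(10.0, 20.0, 30.0)
val totalColumnWise = incomes.sum           // reads only the income column
```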
So taken together, these two systems, Catalyst, the query optimizer, on one hand, and Tungsten, the highly performant data encoder, on the other, offer ways to significantly speed up your jobs, even if you initially write them somewhat inefficiently. Spark knows how to rearrange your computations, it can store your data in an efficient format, and it can reduce the amount of data it needs to send over the network, because it knows what sort of computation you want to do. Catalyst and Tungsten together are what provide these enormous speedups.

Well, that said, you might be wondering: these DataFrames seem too good to be true. Of course they come with some limitations, so let's talk about some of those now. The first major limitation that you're going to come across, which maybe we haven't been bitten by in these slides, is that they're untyped. I mentioned that they're untyped, but we weren't really hurt by it yet. What happens a lot of the time with these untyped APIs is that code like this will compile even though there is no column called state. So this code happily compiles and runs, but an exception is thrown at some point saying: oops, sorry, I can't find state, I only have the columns street, zip, and price. This can be pretty annoying, because errors aren't caught at compile time; instead they surface as exceptions later on. It would be really nice if this error were caught at compile time, like we're used to. We're used to the compiler catching us when we make mistakes like this, so this is a little uncomfortable if you come from Scala. It also means that you'll find yourself having to cast elements in rows to types that you know exist in the schema, because rows aren't typed. So this isn't just annoying because of the occasional exception; it can also be verbose and hard to read, because you'll find yourself writing things like row(0).asInstanceOf[Int], which nobody really wants to do. We'd rather just talk about specific fields directly. So things being untyped in DataFrames can be a bit annoying, especially when you come from Scala and you're used to a nice compiler that helps you out a lot of the time.

Another limitation is that we can only use a limited set of data types. If your data can't be expressed by case classes, products, or the standard Spark SQL data types, it might be difficult to ensure that a Tungsten encoder exists for your data type. One example of a situation where using DataFrames might be problematic is if your application already has some regular Scala class that's very complicated, and you want to create a DataFrame or Dataset of that complicated thing. In that case it might be difficult to break it up and represent it as case classes, products, and the SQL data types. So having this limited set of data types can sometimes be difficult.

Finally, a big reason that DataFrames can be hard to use is that perhaps your dataset isn't structured or semi-structured at all; perhaps you have an unstructured dataset. In that case, DataFrames wouldn't be appropriate, because you don't know the structure of your data; there's no real structure to it, no schema. It's not self-describing like JSON, and it's not some existing table that already has a schema. There's just no structure to it, and in those cases it might be better to use regular RDDs.
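Coming back to the first limitation above, here's a sketch of the kind of failure being described; listingsDF and its columns are made up purely for illustration.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("untyped-sketch").getOrCreate()
import spark.implicits._

val listingsDF = Seq(
  ("Main St", 94110, 800000),
  ("Oak Ave", 94301, 950000)
).toDF("street", "zip", "price")

// This compiles happily, but there is no column called "state",
// so it throws an AnalysisException at runtime:
// listingsDF.filter($"state" === "CA").show()

// Rows are untyped, so pulling values back out means casting:
val firstRow = listingsDF.first()
val zip   = firstRow(1).asInstanceOf[Int]   // by position, with an explicit cast
val price = firstRow.getAs[Int]("price")    // by name, still unchecked at compile time
```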