In this session, we're going to dig deeper into the DataFrame API. So far we've built up a little intuition about what DataFrames are, how they fit into Spark SQL, and how to create them. We also saw that once we have a DataFrame, we can run SQL queries on it, which is pretty cool. But it doesn't end there: DataFrames have their own API in addition to accepting arbitrary SQL syntax. In this session we'll focus on that DataFrame API. In particular, we'll dig into the available DataFrame data types, we'll look at some of the basic operations on DataFrames that you should know about, because they're not quite like RDD operations, and finally we'll look at how to do aggregations on DataFrames, because that works a little differently as well.

Okay, DataFrames in a nutshell. Let's recap the key points to remember when using DataFrames. The first point is that a DataFrame can be thought of as a relational API over Spark's RDDs. You're ultimately running on top of RDDs, the same RDDs we learned about in previous sessions, but now with a nice relational API over them. The reason for this is that it's sometimes simply more convenient to use a declarative, relational API than a functional one for analysis jobs. We saw this in the last session, with a nice little query that selected some information about employees, then filtered and sorted it, in almost no code at all, and it was extremely clear to read. It would have been a bit more involved and a bit less clear if we had done it with a functional API. The next key point is that DataFrames can be aggressively and automatically optimized. Spark SQL builds on many years of research from the database community, and because computations are now built up in a relational way, Spark can apply relational optimizations and give us the same optimizations people in the database community have been enjoying for years. And finally, one thing you're really going to have to remember, maybe it won't bite us right at the beginning of this session, but it definitely will later: DataFrames are untyped. The elements within a DataFrame are Rows, and they're not parameterized by any type, so the Scala compiler can't type check Spark SQL schemas in DataFrames; they're opaque to it. Spark SQL does all kinds of analysis of its own, but the Scala compiler does no type checking, so DataFrames are effectively untyped.

As I've mentioned a few times now, Spark SQL's DataFrames operate on a restricted set of data types. This is what enables the optimization opportunities: we always know exactly what the data looks like. Here is a list of the basic data types in Spark SQL. In this column we have the Scala types, all types we're already used to seeing in this course, next to the types that SQL sees, along with a description of each. These are all pretty standard primitive types: bytes, shorts, ints, longs, strings down at the bottom, and, interestingly enough, timestamps and dates from Java. We also have floats, doubles, and BigDecimal. These are all pretty uncontroversial data types.
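To make the mapping a bit more concrete, here's a minimal sketch; the Spark SQL type names are the real ones from org.apache.spark.sql.types, while the value names on the left are just placeholders for illustration.

```scala
import org.apache.spark.sql.types._

// A few of the basic Spark SQL types and the Scala/Java types they correspond to
val idType: DataType      = IntegerType          // Scala Int
val countType: DataType   = LongType             // Scala Long
val priceType: DataType   = DoubleType           // Scala Double
val amountType: DataType  = DecimalType(38, 18)  // java.math.BigDecimal
val nameType: DataType    = StringType           // Scala String
val bornType: DataType    = DateType             // java.sql.Date
val updatedType: DataType = TimestampType        // java.sql.Timestamp
```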
These are the core data types that Spark SQL is built around. In addition to these basic data types, Spark SQL has a handful of complex data types available as well: arrays, maps, and, in the Scala world, case classes, which map to structs. These complex data types can contain basic data types or other complex data types, so you can build up richer data types out of arrays, maps, and structs. Since each of them is represented a little differently in Spark SQL, because of their history in regular SQL, I'm going to run through each one to show you the main differences. And really, the main difference has to do with this containsNull flag.

An array, for example an array of strings, is in SQL an ArrayType with a StringType as its element type, plus a Boolean. That Boolean is containsNull, and it's set to true if the elements of the ArrayType can be null. So if it's possible for this array to contain nulls, the flag is true; otherwise it's false. There's always this additional piece of data hanging around called containsNull: we know not just the element type and the fact that it's an array at all, but also whether or not the array can contain nulls. The next complex data type is the map. Just like maps in Scala, maps have keys and values, and the SQL version has a type for the keys and a type for the values. A MapType contains a keyType and a valueType, as well as a Boolean that says whether the values can be null, okay? It's similar to what we just saw with arrays, but here the flag refers to the values: if the value, in this case a String, can be null, then it's set to true. So we know the types, and we know whether the value is nullable. And finally, structs. Structs are a little less rigid than arrays and maps. A StructType contains a list of fields of possibly different types. Unlike arrays and maps, which have one or two type parameters, structs can combine any number of types. Each field is a StructField, which contains a name for the field, the type of the field, and then, again, a nullable flag. So you can build up more interesting data types this way, and this is how you can take something like a case class and map it to something SQL can understand.

In Scala we'll be using case classes, but in SQL the same case class would look like this. If we have a case class Person with a field name of type String and a field age of type Int, it can be realized as a StructType with a list of fields: one StructField with the name "name", the type String, and nullable set to true, and another field with the name "age", the type Integer, which is also nullable. That's how we can represent, for example, a Person case class object. These are the data types Spark SQL can operate on, which is pretty important, because if for some reason your data doesn't fit into one of these shapes, it can't be worked on in Spark SQL.
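Here's a minimal sketch of how these complex types look when you spell them out by hand; the value names are placeholders of mine, but the type constructors and their nullability flags are the real ones.

```scala
import org.apache.spark.sql.types._

// ArrayType: an element type plus a containsNull flag
val tagsType = ArrayType(StringType, containsNull = true)

// MapType: a key type, a value type, plus a valueContainsNull flag
val settingsType = MapType(StringType, StringType, valueContainsNull = true)

// StructType: a list of named, typed, nullable fields,
// mirroring case class Person(name: String, age: Int)
val personType = StructType(List(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)
))
```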
So just to repeat, Spark SQL has a restricted set of possible types. Nothing fancy is happening here: you have basic structs, basic arrays, basic maps, and then the list of effectively primitive types I showed you on the previous slide. Every computation we do in Spark SQL has to be representable with these types.

Just to show you that it's possible to arbitrarily nest these complex data types, here's an example where I've taken some Scala types that are rather complex and nested. I have a type called Account, which contains a field called balance and a field called employees, which is an array of Employees. Employee is defined as another case class with primitive fields: Int, String, String. And finally there's another case class called Project, with a String title, another array of these Employee objects, and a value of type Account. So this Project thing is a rather complex data type. If I wanted to represent this Project type in SQL, I could do it like this. It's super involved and difficult to read, but it's really just a realization of the Project type, which contains the Account and Employee types. You can basically write the Scala code and then expand it into this. So this here is Project, and sorry about the wonky "True" rendering; something happened in the PDF reader. These are all StructFields, and the true values are the nullable Booleans. You have arrays, you have StructTypes within StructTypes, and then of course primitive types, so here, for example, is a StructField of type StringType with the name "name". So again, these things can be nested and built up to represent reasonably complex data types.

One important note, and this is going to come up again and again throughout Spark SQL: Spark SQL requires a lot of imports. Throughout this and the next few sessions, I've tried to point out everywhere we might need a special import to use one of the things I'm showing you, though if you're ever in doubt, have a look at the Spark docs. In this case, in order to use any of the data types, you need to import all of Spark SQL's types, which is done with import org.apache.spark.sql.types._. So if you're ever going to use a Spark SQL data type, you need to explicitly import the types that Spark SQL operates on.

Okay, so what sorts of operations are defined on DataFrames? So far all I've told you is that the DataFrame API is a relational API, and indeed it looks quite similar to SQL; example methods include select, where, limit, orderBy, groupBy, and join. You might notice this looks quite different from the RDDs you're used to, where we've been passing function literals to higher-order functions like map, filter, and reduce. The main difference between the RDD API and the DataFrame API is that the DataFrame API was designed to accept Spark SQL expressions instead of the arbitrary user-defined function literals we used all over the place when we were learning about RDDs.
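Just to make that contrast concrete, here's a small sketch. The names and sample values are hypothetical, and I'm assuming a SparkSession named spark (with import spark.implicits._ for the $ column syntax) and a SparkContext named sc.

```scala
import spark.implicits._

case class Person(id: Int, name: String, age: Int)
val people = Seq(Person(1, "Ann", 31), Person(2, "Bob", 22))

// RDD style: arbitrary Scala function literals that Spark cannot look inside
val adultsRDD = sc.parallelize(people)
  .filter(p => p.age > 25)
  .map(p => (p.id, p.name))

// DataFrame style: restricted Spark SQL expressions the optimizer can inspect
val adultsDF = people.toDF
  .filter($"age" > 25)
  .select($"id", $"name")
```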
The reason for this is to give the optimizer a restricted set of operations that it already knows how to optimize: relational optimizations need relational operations. So here's a selection of the more often-used methods; we'll go into more detail on them in subsequent slides. But first I've got to show you one important trick so you can actually see what you're doing with these relational operations, that is, how to see what your data looks like. There are a couple of methods in Spark for this. The first, which I'll be using all the time, is called show. It pretty-prints the DataFrame in a tabular form, and it shows only the first 20 elements. It looks something like this: if I take this employee DataFrame again, the one we created with toDF, and call show on it, it prints out this nice table with all of the elements, in this case there are only four, otherwise it would show up to 20, and it prints them nicely so you can visually parse them more easily. Another operation we can use to get a look at our data is printSchema, which prints the schema of the DataFrame in a tree format. Here's an example of what that looks like on the same employee DataFrame we saw earlier. It's actually quite a simple schema: there's the root, and just one level of fields under it, five fields of types integer, string, string, integer, string, and all of them are nullable. Remember the nullable Boolean we saw on the previous slides? It always hangs around to say whether one of these data types can be null for some reason.

Okay, so let's actually look at some of these transformations. Two things to remember: we're talking about transformations right now, which means they always return a DataFrame as a result, and don't forget that they're lazily evaluated, just like with regular RDDs. One of the most common transformations is select, and you probably already have an idea of what it does: it selects a set of named columns, so potentially more than one, and returns a new DataFrame with those columns as the result. We saw this earlier when we selected two of the five or so columns in the SQL query from the previous session, taking the employees' last names and ID numbers out of our DataFrame and returning a new DataFrame with just those columns. Visually, if you have a table with four columns and you say you want to select columns A and D, you get back a new DataFrame, or new table I guess, with just those two columns. You select by passing the names of the columns to the select method. There are other overloaded versions of it; have a look at the docs to see the different choices for selecting columns, but this is ultimately the idea.
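If you'd like something concrete to poke at, here's a rough sketch of that employee DataFrame; the field names match the five-column schema described above, but the sample values are placeholders of mine, not from the slides.

```scala
import spark.implicits._   // for toDF on a Seq of case class instances

case class Employee(id: Int, fname: String, lname: String, age: Int, city: String)

val employeeDF = Seq(
  Employee(12,  "Joe",   "Smith",   38, "New York"),
  Employee(563, "Sally", "Owens",   48, "New York"),
  Employee(645, "Slate", "Markham", 28, "Sydney"),
  Employee(221, "David", "Walker",  21, "Sydney")
).toDF

employeeDF.show()          // pretty-prints up to the first 20 rows in tabular form
employeeDF.printSchema()   // prints the schema as a tree, one field per line with its type and nullability

// select returns a new DataFrame containing only the named columns
val lastNamesDF = employeeDF.select("id", "lname")
```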
The next transformation that's going to come up is agg, short for aggregate. This one is a little peculiar, and we'll go into more detail on it later in this session, but the short story is that agg performs aggregations on a series of columns, so on one or more columns, and returns a new DataFrame with the aggregated output. We'll get more concrete about what that means a few slides from now. Moving on to groupBy, you can guess what that might do: it groups the DataFrame using the specified columns. It's important to note that this is a slightly weird method. I say it returns a DataFrame, but it doesn't exactly return a DataFrame, because groupBy is intended to be used right before an aggregation. Typically you do a groupBy and then an aggregation right after it. We'll go into more depth on what exactly groupBy does when we talk about aggregations, but for now you can think of it as grouping the DataFrame by the specified columns, okay? And then we do the aggregation on it in a second step. And finally, one of the more commonly used operations is, of course, join. There are many ways to do joins; we'll go into detail on all of them in the next session. Just remember that all of these things are transformations: they either directly return a DataFrame or indirectly lead to a DataFrame as a result. Of course, there are several more transformations, such as filter, limit, orderBy, where, as, sort, union, and drop. These are basically the same operations we know from SQL, and all of them, along with their overloads, can be found in the Spark SQL docs.

I'd like to show you an example of how to use some of these transformations, but before I do, I have to talk a little bit about columns. If we go back to the previous slide, we'll note that sometimes you pass Strings to these operations and sometimes you pass a Column, and usually the Strings are meant to refer to a column. But what's a Column? How do I actually talk about columns in a Spark SQL program? There are a few different ways to specify them, and it can be confusing if you just jump into code without anybody telling you what these things mean. The three main ways you'll see people referring to specific columns in a DataFrame are as follows. Most commonly, you'll see the $-notation, which is just shorthand for naming a specific column. If you're calling, for example, filter on some DataFrame, say df.filter, and you want to filter on the age column, you write $ and then the name of the column in quotation marks, and then whatever SQL operation you want to do. This basically means "the age column of this df DataFrame". Another way to refer to a specific column is to refer to the DataFrame explicitly: just like above, we explicitly name the df DataFrame and pass "age" to it as a string, and that's a signal to Spark SQL to look in this DataFrame for that column. And finally, the last way to do it is with a SQL query string. Just like when we wrote out full SQL queries as strings, Spark SQL parses the string with a SQL parser and figures out which parts are columns and which are operators.
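Here's a tiny sketch of the three forms side by side, assuming a DataFrame called df with an age column, and import spark.implicits._ in scope for the $ syntax.

```scala
// 1. $-notation: shorthand for a named column
df.filter($"age" > 18)

// 2. Referring to the DataFrame explicitly
df.filter(df("age") > 18)

// 3. A SQL expression string, handed to Spark SQL's parser
df.filter("age > 18")
```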
So you can do the same thing here, passing a sort of mini SQL query and hoping that Spark SQL parses it correctly. There can sometimes be errors with this approach, so if I were you I'd probably go with one of the first two forms, because they're a little less error prone.

Okay, now that we know how to refer to columns inside these DataFrame operations, let's look at an example. Recall the example SQL query we ran in the previous session on the employee dataset; we've been looking at this employee DataFrame for a little while now. Rather than using SQL syntax like we did last session, let's convert the example to use the DataFrame API. If you remember, the example asked us to obtain just the IDs and last names of the employees working in a specific city, say Sydney, and then to sort the result in order of increasing employee ID. So once we've grabbed those columns and filtered out the people who don't live in Sydney, we sort them by employee ID. How do we solve this with the DataFrame API, using some of the methods you've seen so far? I'll let you try it first.

The DataFrame solution actually looks a lot like the SQL query from the previous session, where we used a select followed by a where followed by an orderBy. We can do essentially the same thing here: given the employee DataFrame, we select the ID and the last name of the employee, then we filter out everybody who does not live in Sydney, and finally we order the rows by ID. And as always, it's nice to try this out on a real dataset. Here's the input dataset, this is what employeeDF looks like: in this case we have these four records. After running the Spark SQL expression, we get a new DataFrame called sydneyEmployeesDF with only two columns, the ID numbers and last names of the people who live in Sydney, sorted by ID number. So David Walker is now first, because his ID is 221 and he lives in Sydney; next is Slate Markham, second because his ID is 645 and he also lives in Sydney; and the New York people are nowhere to be found. So that's an example of a few transformations on a DataFrame.

It's also important to note that there are two methods available for filtering in Spark SQL: one is called filter and one is called where. They do the same thing; one is just an alias of the other, so you can always think of them as completely equivalent. When filter reads better, write filter; when where reads better, write where. Also note that it's possible to make filters a little more complex: we can have Boolean expressions with and and or, and we can use several different columns in these expressions, so we can compare between columns as well. The only thing to note is that if you do a lot of this, it can be a little more difficult for Catalyst, the Spark SQL query optimizer, to optimize your queries.
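Here's a hedged sketch of one way to write that solution, reusing the hypothetical employeeDF from earlier. Note that I apply the where before the select so that the city column is still in scope for the filter; the result is the same two-column DataFrame sorted by id. The second snippet just shows what a more complex, multi-column filter can look like.

```scala
// IDs and last names of employees living in Sydney, in increasing ID order
val sydneyEmployeesDF = employeeDF
  .where("city == 'Sydney'")
  .select("id", "lname")
  .orderBy("id")

// filter is an alias of where; both also accept richer Boolean expressions
val sydneySeniorsDF = employeeDF
  .filter(($"age" > 25) && ($"city" === "Sydney"))
```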
The last topic I'm going to cover in this session is grouping and aggregating on DataFrames. I mentioned somewhat awkwardly earlier that the agg method is meant to be used after a call to groupBy; let me show you a little more of what I mean. There are special operations for grouping and aggregating because one of the most common things you want to do with a table is to group the data by some column, some attribute, and then do some kind of aggregation on it, like a count. This is a very common pattern. For grouping and aggregating, Spark SQL provides the groupBy function I mentioned earlier, which returns this strange thing called a RelationalGroupedDataset; this is the type I said was simplified, and we'll talk about it in a second. It has several standard aggregation functions defined on it, including count, sum, max, min, and avg, among many others. So there's a two-step thing going on here: you call groupBy, you get back this RelationalGroupedDataset, and then you have these aggregation functions you can use on it. This can get a little hairy with the types, so the API docs are really your best friend here, but the basic pattern to remember is: you have some DataFrame, you call groupBy on it, and then you call some other method like agg, which I'll talk about in more detail in a moment, or count. That count operation is defined on the RelationalGroupedDataset, and it returns another DataFrame, so you're back to DataFrames again. This API basically requires you to go through two steps: first call groupBy, then call some other operation to get back to a DataFrame.

This can look a little complicated, so let's look at an example to clear it up. Let's assume we have a dataset of homes currently for sale in one entire US state, and let's use these grouping and aggregating operations to calculate the most expensive and the least expensive homes for sale per zip code. So assuming we have a DataFrame full of these listing things, how could we compute the most and least expensive homes for sale per zip code? I'm going to let you try it; feel free to go back to the last slide if it helps, but see if you can use groupBy and some kind of aggregation function to do these calculations.

Here's how we can do it. First things first, we need another SQL import, this one called functions, which gives you things like max and count; you're going to have to import it if you want to use any of these grouping and aggregation functions. Otherwise, we just group by zip code, which is pretty straightforward, and then per zip code we ask for the maximum price of the listing. The price is an Int, so we can easily calculate the max, and we're grouping by the zip code, which is also an Int. So now we have the maximum price per zip code, and likewise we can do the same thing with min: group by the zip codes and then take the minimum price per zip code. This is extremely readable, extremely straightforward. If you want to see how this looks on real data, I encourage you to go to the Databricks Community Edition notebooks and try it out on a real dataset; you can almost exactly copy the code here.
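As a minimal sketch, assuming a hypothetical case class Listing and a listingsDF built from it; the field names and sample values are placeholders of mine.

```scala
import org.apache.spark.sql.functions._  // brings in aggregation functions such as max, min, count
import spark.implicits._                 // for toDF and the $ column syntax

case class Listing(street: String, zip: Int, price: Int)
val listingsDF = Seq(
  Listing("100 Main St", 94110, 1250000),
  Listing("22 Oak Ave",  94110,  980000),
  Listing("7 Pine Rd",   95014, 2100000)
).toDF

// Most expensive home for sale per zip code
val mostExpensiveDF = listingsDF.groupBy($"zip").max("price")

// Least expensive home for sale per zip code
val leastExpensiveDF = listingsDF.groupBy($"zip").min("price")
```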
Okay, that was kind of easy, wasn't it? Let's try something a little harder. Let's assume we have the following dataset representing all of the posts in a busy open source community's Discourse forums, so just think forums with subforums, okay? We have this case class Post, which represents a post in one of these forums. Each post has an author ID, a location where the post was made, in this case a specific subforum, a number of likes for that post, and the date it was posted. Now, assuming we have a DataFrame full of these posts, let's say we'd like to tally up each author's posts per subforum, and then rank the authors with the most posts per subforum. Basically, we want to count up all of the posts each person has made in each subforum, and then figure out who made the most posts in each subforum. Maybe we want to give that person a community award or something, so we want to know who was busiest in each of these subforums. How can we do this using these grouping and aggregating functions on DataFrames? I'll give you a few minutes to try it yourself.

Okay, first things first, we've got to import our SQL functions, and after that we just do a groupBy, an aggregation, and an orderBy. Let's step through this. We want to group author IDs and subforums together, because we're trying to figure out who's posted the most in each subforum, so we need both the author and the subforum. Then we do an aggregation on these groups. In this case I use this interesting agg method, which itself takes a call to another function. I didn't actually need to use it here, I could have just used count, but I wanted to show you what it looks like. So I specify that I want a count of the author IDs. This gives me a new DataFrame with the following columns: an authorID column, a subforum column, and then a count column, the number of posts for that authorID in that subforum, okay? And finally, I rearrange the order with orderBy. I pass it two arguments, because I want to order first by the subforums and then by the counts. But notice the desc here, which says I don't want increasing order, I want descending order, so the count column will be in descending order.

So what does this look like on real data? This is maybe not the most beautiful dataset you're ever going to look at, but note that there are two subforums with similar-looking names if you glance really quickly: one is called design, the other is called debate. And let's say I have three authors, with authorIDs 1, 2, and 3. Author 1 posts a lot in the design subforum, for example. This is what the result of the groupBy and aggregation followed by the orderBy looks like. As you can see, it's now ordered by subforum, so the debate subforum comes first, followed by the design subforum, and within each the counts are in decreasing order. So the author with the most posts in the debate forum is author number 2, and the author with the most posts in the design forum is author number 1.
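Here's a hedged sketch of that solution. The exact field types of Post and the sample rows are my assumption, not from the slides, but the groupBy/agg/orderBy pattern is the one described above.

```scala
import org.apache.spark.sql.functions._  // count, desc and friends
import spark.implicits._                 // for toDF and the $ column syntax

case class Post(authorID: Int, subforum: String, likes: Int, date: String)
val postsDF = Seq(
  Post(1, "design", 2, "2018-03-01"),
  Post(1, "design", 5, "2018-03-02"),
  Post(2, "debate", 1, "2018-03-02"),
  Post(3, "debate", 0, "2018-03-03")
).toDF

val rankedDF = postsDF
  .groupBy($"authorID", $"subforum")              // one group per (author, subforum) pair
  .agg(count($"authorID"))                        // adds a count(authorID) column
  .orderBy($"subforum", $"count(authorID)".desc)  // subforum ascending, then count descending
```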
As you might have noticed, there are really two APIs in play here: the one available on this RelationalGroupedDataset thing, and the one available for use inside the agg method. Each of them has its own Scaladoc page you can look at to see what operations are available. The set of operations you can use inside agg is listed on the org.apache.spark.sql.functions page of the API docs, where you can see the full list. And if you want a full list of the operations you can call after a groupBy, have a look at the API docs for RelationalGroupedDataset.