In this session, we're going to talk about the last and the newest major Spark API called Datasets. To motivate Datasets, let's start with an example. Let's say we've just done the following computation on a DataFrame. And let's just say that our DataFrame is full of these things called listings, and each one of these listings represents a home for sale on the market. What we've done with this data is we've calculated the average price of the homes for sale per zip code. So we've gone through all these listings and for each zip code we calculated the average price of the homes for sale in that zip code, okay? So that's what the program looks like here. All we do is we take our listings DataFrame, we say groupBy the zip code, so now we have everything collected in groups per zip code. And then finally, for each group what we do is we call the average function and we pass to it the name of the price column, because we want to calculate the average of that column. So that's the computation that we've done. Great, wonderful, seems just fine. Now let's do something completely normal, let's call collect() to kick off this computation, and let's try to bring averagePricesDF back to the master node in the form of an array. Oops, or what was this thing again? I don't remember, I thought I was getting back an array of doubles representing the average prices. What is this Row thing here? We've seen them in the last few sessions, but we didn't really look at them very closely. What's in this Row thing again? Okay, that's right, because a Row is untyped, we have to cast the stuff in the Row, but that means that I have to remember the order of the columns in that Row so I know which things to cast. So you might ask yourself, okay, well, how many columns were in my result again? And what were the types of those things? Because now I've gotta count them, and then index into them, and then cast them to the correct types. So here's one attempt.
So I have a value called averagePricesAgain. And what I do is I take the array of Row objects, and for each Row I say, okay, the first thing in my Row I'll just cast to a String because that's something, I don't remember what that was, but I think the second one was the price, so let's just cast that to an Int and see how it works. Nope, got a ClassCastException, so that was wrong. Let's try again. Instead of randomly trying different casts, let's look up the API docs for Row. Okay, so I can call head on my array of Rows. And then once I have a Row, apparently there's a method called schema, which gives me the schema, and then I can pretty print it nicely in the form of a tree. Okay, wonderful, so this is my schema. The first element in the Row is an integer, and the second element in the Row is a double, that's great. Okay, so my average price is this one here. Good to know. Okay, so let's try that again. Hooray, so now I've cast this correctly. So this is an integer, it matches with the first element in the Row, and then the second element is a double. Wonderful, it all works. It's just really ugly, it's really hard to read, I have no idea what's happening here. And as you could imagine, I could be randomly trying different casts until something works, and I could still be wrong, so this is pretty error prone. But hey, at least it worked. Though wouldn't it be nice if we could have both these wonderful Spark SQL optimizations and actual type safety, so I didn't have to do this? I would much rather select the price in a way like this, where I just call .price on something and it turns out to be the right type, and everything works. This is really inconvenient. So wouldn't it be cool if I could have these Spark SQL optimizations as well as that type safety? Because on DataFrames we don't have that type safety, everything is untyped, remember? Well, that's where Datasets come in.
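To recap that casting dance in one place, here's a hedged sketch. The DataFrame name listingsDF and its columns zip and price are assumptions for illustration, and the code presumes a SparkSession named spark with its implicits in scope.

```scala
import org.apache.spark.sql.Row

// Assumed: a SparkSession `spark` and a listings DataFrame `listingsDF`
// with (at least) columns "zip" (integer) and "price" (double).
import spark.implicits._

val averagePricesDF = listingsDF.groupBy($"zip").avg("price")

// collect() returns untyped Rows, so first we discover the schema...
val averagePrices: Array[Row] = averagePricesDF.collect()
averagePrices.head.schema.printTreeString()

// ...and then cast by position to match it.
val averagePricesAgain = averagePrices.map { row =>
  (row(0).asInstanceOf[Int], row(1).asInstanceOf[Double])
}
```

The casts only work because we looked up the column order first; get the order wrong and we're back to the ClassCastException.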
They're kind of the Holy Grail, because they combine these nice optimizations with type safety. Okay, maybe that's too extreme of a metaphor, but you get what I mean. Now before I go any further, I have to admit that I've been withholding some rather important information from you, and that is that DataFrames are actually Datasets. So this whole time we've been talking about this thing called the DataFrame, it's actually really a Dataset of type Row. So DataFrames are actually Datasets, they're actually the same thing. But okay, what the heck is a Dataset then? Well, a Dataset can be thought of as a typed distributed collection of data. So kind of in the same way that we think about an RDD. It's a typed distributed collection of data. However, Datasets have a little bit more than RDDs. The Dataset API actually unifies the DataFrame API with the RDD API. So let's think back to the last few sessions where we were looking at these very relational operations that looked very different from the operations that we learned on RDDs. Now, we can call both sets of these operations on the same Spark distributed collection. What's even cooler is that we can mix and match these things, so we can have both functional operators and relational operators all intermingling in the same code. And finally, one other major difference between Datasets and RDDs is that, like DataFrames, Datasets require structured or semi-structured data, so your data has to have some kind of schema associated with it in order for it to be a Dataset. Another way to think of Datasets is to think of them as a compromise between RDDs and DataFrames. So, on the one hand, you get more type information with Datasets than you did on DataFrames, but on the other hand you get more optimizations on Datasets than you got on RDDs.
So, Spark was able to do a few simple optimizations on RDDs, like some pipelining optimizations, but it couldn't do all of the cool stuff that the Catalyst optimizer could do. So, Datasets get some of those optimizations, while having that nice flexible functional API as well. Let me tell you what I mean by mixing and matching these two APIs. So here's an example of that same computation that we were looking at a few slides back, when we were trying to calculate the average price of all of the homes for sale in different zip codes. So let's assume that we have a Dataset called listingsDS. It's the same shape as it was before, it's a case class with some fields in it, but the most important fields are, of course, the price and the zip code fields. And the first thing we can do is call groupByKey on it. Remember groupByKey from RDDs? And then look at this, this is super cool, we're passing a lambda to the groupByKey function. In this case, what we're doing is we're telling our Dataset which one of our columns is going to be our key, so we pass a function to do that. So here we are, passing functions, it looks a lot like RDDs again. And the next operation is one of these aggregation operations that we became familiar with on DataFrames. So here we are, calculating the average per key grouping. And then we're telling Spark that the type of that calculation is a double. The point here is to show you that in the same block of code I can have both of these different APIs, or both of these different ways of thinking about writing code. I can have this nice functional thing here, and then on the next line I can have a very relational expression. So, I can mix and match these things however I want. And the types match up because everything is a Dataset, so it all works out. So just a few more observations to give you a little bit of an intuition about what Datasets are all about. So Datasets are kind of something in the middle between DataFrames and RDDs.
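The mix-and-match computation described above might look like this sketch. The Listing case class fields and the Dataset name listingsDS are assumptions carried over from the narration, and a SparkSession named spark is presumed in scope.

```scala
import org.apache.spark.sql.functions.avg

// Hypothetical schema for the listings; only zip and price matter here.
case class Listing(street: String, zip: Int, price: Int)

// Assumed in scope: a SparkSession `spark` and listingsDS: Dataset[Listing].
import spark.implicits._

val averagePricesDS = listingsDS
  .groupByKey(l => l.zip)            // functional: a lambda picks the key
  .agg(avg($"price").as[Double])     // relational: a typed aggregation column
// averagePricesDS: Dataset[(Int, Double)]
```

One line functional, the next line relational, and the result is still a typed Dataset.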
Importantly, a DataFrame is just a Dataset, so they're the same type, which means I have the same operations on both of them. So you can still use all the relational DataFrame operations that we learned in the previous two sessions now on Datasets, so we can just effortlessly carry over our knowledge of that API to this session and use everything like we've used it before. In addition, Datasets add more typed operations that can also be used. So these typed operations actually include relational operations, so we can have both typed and untyped relational operations. And then of course we have these functional transformations, which are all typed as well. So now we have these higher-order functions like map, flatMap, and filter again, which we didn't really have on plain DataFrames before. Datasets are a good choice of abstraction when you want to mix and match functional and relational transformations while still benefitting from some of the optimizations that we saw on DataFrames. And of course, we can't forget that it's got an almost type safe API as well, so it's not completely, perfectly type safe, but it's pretty type safe, let's just say. Okay, so let's get right into how to create these things. Since Spark 2.0, creating Datasets is super easy. You can create a Dataset from a DataFrame, all you've gotta do is call toDS on your DataFrame, which is a convenience method that creates a new Dataset from a DataFrame. Though note that in order to use this toDS convenience method you need to import this thing called spark.implicits. Another way to create a Dataset is to do it using these operations for reading in structured or semi-structured data from file. So if you remember from the last session when we were using these to create DataFrames, now you just provide some type information. So in this case we say, okay, this json file here is full of Person instances.
And if we define a case class whose field names and types line up with the structure and schema of this people.json file, then this Dataset will be read into memory from file and will be perfectly typed. Until now we've always had to do some kind of parsing or some kind of casting in order to get Datasets into memory with the right types, so this suddenly makes things a lot easier. We can also create a Dataset from an RDD by using the same convenience method. And the same even goes for common Scala types, we can just call toDS on this list, for example. And there you go, we have a new Dataset of type String. Before I can dig into some of the operations on Datasets, I've gotta remind you about these Column things here. So recall Columns from the DataFrame sessions, we were always using these Column things, and there was this syntax for using them. In Datasets, because everything is now typed, we now have the notion of TypedColumns. So if you try to use the same old syntax that we've been using in the previous sessions, you'll get a type error saying nope, this is a Column and I was expecting a TypedColumn. And you might ask, well okay, how do I create a TypedColumn then? Well, it's pretty easy, you just have to use this as method with the type that your column contains, and then this creates a new TypedColumn. This is going to be important throughout this session because many methods will expect TypedColumns. Okay, so let's get right into the transformations on Datasets. So remember how I kept on reminding you that transformations on DataFrames are called untyped transformations? Well, that's because the Dataset API introduces a whole host of typed transformations as well. So you also have typed variants of some of the untyped transformations. So, on the one hand we have these untyped transformations, which are the same ones that we learned in the DataFrame session, and on the other hand we have typed transformations.
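Before moving on, the creation methods and the TypedColumn syntax from a moment ago can be collected into one sketch. The file path, the RDD name myRDD, and the sample strings are invented for illustration; a SparkSession named spark is assumed in scope.

```scala
// Assumed in scope: a SparkSession `spark` and an RDD `myRDD`.
import spark.implicits._  // needed for toDS and for the $-column syntax

// From structured data on disk, given a case class matching the schema:
case class Person(id: Int, name: String)
val peopleDS = spark.read.json("people.json").as[Person]  // Dataset[Person]

// From an RDD, or from an ordinary Scala collection:
val fromRDD  = myRDD.toDS()
val fromList = List("yay", "ohnoes", "hooray!").toDS()    // Dataset[String]

// Columns vs TypedColumns:
val untyped = $"price"             // Column
val typed   = $"price".as[Double]  // TypedColumn[Any, Double]
```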
So like I said, these are typed variants of many of the DataFrame transformations, and then we also have additional transformations like those higher-order functions from RDDs that we like so much. So when you look at the Dataset API you're going to find transformations grouped into these two categories, and it's going to be very important to keep them straight. Because if you accidentally use an untyped transformation, then your nice Dataset can lose all of its type information. You're going to have to keep these two things straight in your mind. Because everything is a Dataset, because both DataFrames and Datasets are Datasets, these APIs are seamlessly integrated. That means you can actually call map on a DataFrame, even though I didn't show that to you in a previous session. But when you do call map on a DataFrame, you end up getting back a Dataset, for example. And of course this can be problematic, because if you have no type information, how do you suddenly create type information? So here's an example down here below. Let's say we have a DataFrame of pairs here, and now I want to do a map on it, and I want to increment all of the digits in the keys here in these key value pairs. But again, I have this ugly thing going on here where I have Rows, and I have to cast each of their elements if I want to do something with them. So in this case, in order to add 1 to the first digit here in the pairs, I've got to do this ugly cast and then add 1. It's really not clear what I'm doing. So while these two APIs are seamlessly integrated, you can accidentally do things like this and end up with no type information at all, when at one point you did have type information. So again, you've gotta be careful about that. And also, it's important to note that not every operation that you know from RDDs is available on Datasets. And even operations that are available on Datasets that you've seen on RDDs don't always look 100% the same on Datasets as they did on RDDs.
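The Row-casting example from that discussion, sketched out. The pairs, column names, and variable names are made up for illustration, and a SparkSession named spark is assumed in scope.

```scala
import spark.implicits._  // assumes a SparkSession `spark` in scope

// A DataFrame of pairs: calling map on it hands us untyped Rows.
val pairsDF = List((1, "one"), (2, "two"), (3, "three")).toDF("id", "word")

// To increment the key we're forced back into casting by position,
// and what we get back is a Dataset, not a DataFrame.
val bumped = pairsDF.map { row =>
  (row(0).asInstanceOf[Int] + 1, row(1).asInstanceOf[String])
}  // Dataset[(Int, String)]
```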
So you can't just start programming RDD operations on top of Datasets without looking at the API docs. You're going to get lots of compile errors, so just keep that in mind. So let's go through some of the common typed transformations on Datasets. All of these look pretty familiar, don't they? So we have a map operation which takes a function from T to U and returns a new Dataset, just like all the other maps that we're used to. The same is true for flatMap, and the same is also true for filter. A filter takes a predicate and it returns a Dataset. distinct is also the same, distinct returns a new Dataset with all the duplicates removed. So these are all transformations because they return Datasets, and they're all typed because they keep the type information around. So I hope these familiar operations make you comfortable, because already with groupByKey, things start getting a little weird again. So coalesce and repartition look the same as what we're used to, but groupByKey gets a little weird, because it has this return type called KeyValueGroupedDataset, which is a little strange. How is this a transformation if it doesn't return a Dataset, right? Well, we're going to see in a moment that ultimately groupByKey does return a new Dataset, it just does so with an extra step in the middle. So let's look at grouping operations on Datasets now. If you remember, in the last session on DataFrames we also had this weird two-step process. We had this special set of aggregation operations that were meant to be used after we called, in the case of DataFrames, just regular groupBy. But on Datasets we call groupByKey. Again, we have a special set of aggregation operations that can be used on the result of the groupByKey that was called on a Dataset.
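The familiar typed transformations just listed, in a small sketch. The sample strings are invented, and a SparkSession named spark is assumed in scope.

```scala
import spark.implicits._  // assumes a SparkSession `spark` in scope

val lines = List("the quick brown fox", "jumped over", "the quick dog").toDS()

val lengths = lines.map(l => l.length)           // Dataset[Int]
val words   = lines.flatMap(l => l.split(" "))   // Dataset[String]
val quick   = lines.filter(l => l.contains("quick"))
val unique  = words.distinct()                   // duplicates removed, still typed
```

Each call returns another Dataset with the element type tracked by the compiler, so there are no Rows or casts anywhere.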
So it's the same two-step thing that we saw in the last session. In the case of groupByKey, we get back a different type called KeyValueGroupedDataset. You're going to have to remember the name of this if you want to look up some of the aggregation operations that you can use. And on this KeyValueGroupedDataset thing there are a number of aggregation operations defined which return Datasets. Okay, so how do we actually group and aggregate on Datasets? Well, you call groupByKey on the Dataset, you get back one of these things, and then you just call one of these aggregation methods on the result of groupByKey, this KeyValueGroupedDataset thing here, and it returns a Dataset. And that's how you do a group and then aggregate on a Dataset. So we'll come back to reduceByKey a little bit later, but let's first talk some more about this KeyValueGroupedDataset thing here. Let's talk about the aggregation operations that are available to us on that thing after we do a groupByKey. There are several operations, I'm going to list just a couple. One is called reduceGroups. So what this thing does here, it's basically a reduce function, but it does a reduce on the elements of each group. So after the groupByKey is done, we can look at the individual elements that represent the collection of values that have been grouped, and then we do a reduce on that. So that's reduceGroups, and that returns a Dataset when it's done. The next thing is called agg, which is this kind of mysterious general aggregation function that we saw in the previous session on DataFrames. And this aggregation can be pretty general. Also note that it takes as a parameter a TypedColumn rather than a regular Column. So this is basically a typed variant of the general aggregation method that we saw in the previous session on DataFrames. But if you remember, there was also a trick to using this thing. Let's have a look at that more closely.
So just like on DataFrames, there's this general aggregation function. And it has this mysterious TypedColumn argument, which I don't know what to do with. If I pass a TypedColumn to it, what does it aggregate? Usually, we want to calculate an average, or a mean, or a standard deviation of something. Well, typically we go to this functions object that's defined in the Spark SQL package, and we select one of dozens of different aggregation operations. There are many numerical and statistical aggregation operations, there are things like max and min. There are really dozens of different choices of aggregation operations that you can choose out of this functions object here. And then all you do, in this case, let's say we've chosen one called avg. So we pass a column name to avg, and then we pass this average of the column to the agg function. And that's how we get this aggregate where we compute the average of a certain column. Oops, the same thing happened that happened earlier in this session. We forgot that this was supposed to be a TypedColumn, so we can't just pass the column name to the avg function here. We have to use this as method to convert our untyped regular Column into a TypedColumn. So okay, we just say that this is a Double. Oops, the parentheses are in the wrong location here. Okay, we fix that, and everything is all better now. This avg function here, which returns a regular untyped Column, has now been converted into a TypedColumn. And so the column that we're looking at is now a TypedColumn, and we can compute the typed average on that column. So again, we keep the type information around. So let's go back to this KeyValueGroupedDataset thing again. Normally, the operations on these grouped Datasets are aggregations. But there are a couple of not-really-aggregation methods that are also very useful. One is called mapGroups, which is very similar to the reduceGroups that we saw earlier.
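The avg fix just described might look like this, assuming the same hypothetical listings Dataset with zip and price fields from earlier, and a SparkSession named spark in scope.

```scala
import org.apache.spark.sql.functions.avg
import spark.implicits._  // assumes a SparkSession `spark` in scope

// listingsDS.groupByKey(_.zip).agg(avg($"price"))  // won't compile:
//                                                  // agg wants a TypedColumn

// Converting the Column with .as[Double] makes it a TypedColumn:
listingsDS.groupByKey(_.zip).agg(avg($"price").as[Double])
```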
And what this method does is it applies the map function that you pass to it to each group of data. So you could imagine going to the collection of the values, and what mapGroups does is it basically does a map on that collection of values, and it returns a new Dataset as a result. And we have the same thing for flatMap, so flatMapGroups is exactly the same idea, just a flattened version. But okay, I just went through a list of transformations, and there was one super ridiculously important transformation that we learned about on RDDs, and it was nowhere to be found. So what happened to reduceByKey? Well, the short answer is that Datasets don't have a reduceByKey method. But we can do a little challenge right now. I just went through a number of operations on Datasets, and I also showed you these grouping operations, where you first do a groupByKey and then you have these aggregation operations that you do on the result of the groupByKey. So we went through all these things, and I argue that with some of that information, it should be pretty easy to emulate the semantics of a reduceByKey transformation on a specific Dataset. Maybe this isn't a completely general reduceByKey, but let's say, given this Dataset here, which is a number of pairs, so we have pairs of type (Int, String), we have these weird little string snippets and a bunch of integers as the keys. And your challenge is to try to emulate this call here. So, if you pretended that these pairs were in a pair RDD, you could call reduceByKey on them and pass this function here. My challenge to you is to compute the same result using some of the operations that I've just showed you, but to do it instead on a Dataset. I really want you to try this on your own, so I'm going to give you a moment to try it yourself. Of course there are several possible solutions, but this is the one I've chosen. So remember that this keyValues thing has type Dataset[(Int, String)].
And now we call groupByKey on it. And this function that we pass to groupByKey basically tells groupByKey which column or attribute in the Dataset should be viewed as the key. So since the elements of this Dataset are pairs, and we know that we want to use the number here, which appears to be some kind of ID, as the key, what we do is we select the first element of the pair here. And so what this does is a groupByKey on the integers here. And remember, now we get back one of these strange types, these KeyValueGroupedDataset things, right? So what I got back is not a Dataset, and I've gotta do something else to it to make it a Dataset. So I'm going to use the mapGroups function here, and if you recall, what this does is it takes a function from a key and a value to some other type. So this k is an integer, and this vs here is a collection of strings that represents the group of values. Now, on an RDD, reduceByKey, when given elements like this, first groups by the keys and then reduces on the values. And if you remember, our reduce function in reduceByKey was a simple concatenation, so we were concatenating all these pieces of strings together. So I'd like to do the same thing in this mapGroups function. So I again return a key value pair where I keep the key as it is, and then I just take this collection of values and I do a fold on it to concatenate all the strings together. So that's how, conceptually, with this specific Dataset, I can basically emulate what a reduceByKey operation does. And just like with DataFrames, if I call show on the result, I now get a visualized table here. So here it is, these are my keys and these are my values, and look, there's something interesting happening here. Suddenly, this jumbled stuff makes a little bit of sense. Let's actually sort that to put it into a better order.
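Conceptually, the emulation might be sketched like this. The sample pairs are invented stand-ins for the string snippets on the slide, and a SparkSession named spark is assumed in scope.

```scala
import spark.implicits._  // assumes a SparkSession `spark` in scope

// Invented sample data in the spirit of the slide's snippets.
val keyValues = List((3, "Me"), (1, "Thi"), (2, "Se"), (3, "ssa"),
                     (1, "sIsA"), (3, "ge:"), (2, "cre"), (2, "t")).toDS()

val concatenated = keyValues
  .groupByKey(p => p._1)  // KeyValueGroupedDataset[Int, (Int, String)]
  .mapGroups { (k, vs) => (k, vs.foldLeft("")(_ + _._2)) }
// concatenated: Dataset[(Int, String)]

concatenated.show()
```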
So if we sort the records now by ID number, where we put 1, 2, and 3 in the right order, then we have a cute little secret message from all of these pieces. So that's how we know we did it right. That's the intended result. So while that might have worked, there's actually a problem with this approach. The issue is that the API docs give us a big hint of the danger of using this mapGroups function. So mapGroups does not support partial aggregation, and as a result requires shuffling all of the data in the Dataset, which is of course extremely expensive, and we don't want to do that, right? So if an application intends to perform an aggregation over each key, which is exactly what we're doing, it's actually best to use something else. It's best to use the reduce function or to use this thing called an Aggregator. Okay, so mental note: don't use these mapGroups things unless you really have to. But okay, let's follow the advice of the docs. Let's try doing it with the reduce function. How would you do that? Remember, there was a reduce function that we saw in the KeyValueGroupedDataset class. In fact, it's called reduceGroups. So conceptually, it's a little bit similar to mapGroups because it's also focusing on the groups of values, but instead it does a reduce on the group of values. So all we gotta do with the result of our groupByKey is call mapValues, because reduceGroups has to work on individual strings and not pairs. So mapValues basically goes and takes out all of the strings here and passes them down the line to reduceGroups, which then concatenates all of those strings. And voila, that works. I could print out the Dataset and it would be exactly the same thing. But the docs also suggested this weird Aggregator thing here, what is that thing? So, an Aggregator is a class that helps you generically aggregate data. So, you'll notice that some of the aggregation functions that you're able to choose from are very specialized.
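Backing up a step, the mapValues plus reduceGroups version suggested by the docs could look like this sketch, assuming the same hypothetical keyValues Dataset of (Int, String) pairs and a SparkSession named spark in scope.

```scala
import spark.implicits._  // assumes a SparkSession `spark` in scope

// Same emulation as before, but friendly to partial aggregation:
val concatenated = keyValues
  .groupByKey(p => p._1)
  .mapValues(p => p._2)                  // groups of String rather than pairs
  .reduceGroups((acc, str) => acc + str) // concatenate within each group
// concatenated: Dataset[(Int, String)]
```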
And you don't have the ability to create your own aggregation function. So this Aggregator class here is supposed to let you do that. And actually, if you think about it, it's kind of like the aggregate method that we saw on RDDs, because it will have you give it a couple of functions that look just like the two functions and the zero that we would pass to the aggregate method on RDDs. First things first, you've gotta know where this thing is. It's inside of spark.sql.expressions, so you're going to have to import this Aggregator in order to use it. Okay, but what is this Aggregator class? The first thing you might notice is that it has three types: a type IN, a type BUF, and a type OUT. So if you think about this aggregate method that we saw before on RDDs, it allowed us to change the types and to do things in parallel. So we broke up our computation into a couple of functions, so it was possible to compute some pieces in parallel and to combine them all later. And this is exactly what this Aggregator class is going to do. In this case, the IN type is the input type of the aggregator. So this would be the type that we got out of the groupByKey function that we saw earlier. BUF is the intermediate type during aggregation. So if we have to change the type to something else in the middle of this computation, we have this BUF type that we can change it to. And finally, OUT is the type that comes out of the aggregation, if we want to change the type. And all of these types can be different. And this is what an Aggregator looks like. So what we gotta do is create a new Aggregator here. We've gotta define what these types are, and then we have to implement these peculiar methods here. So this one should be pretty clear: zero is the initial value of the aggregation. So this is like the zero that we pass to foldLeft, for example. So this is the initial value.
Now reduce and merge, these are like the two functions that we passed to the aggregate method on RDDs, where the reduce method adds an element to the running total, and the merge method is what merges these independently computed aggregations together. And finally, this finish method basically gives us a chance to change the type one more time before we return the value of the aggregation. So despite how scary it looks, it's actually not so hard once you get the hang of it. Once you get the hang of this structure, you look at things in terms of input, buffer, and output types, and you just keep this schema in your mind. So let's try to emulate that reduceByKey from an earlier slide with one of these Aggregators. Let's create our own Aggregator. So again, we assume the same keyValues Dataset here, and we start by looking at the types in these things. We can break this into steps. The first step is determining what these three type parameters to the Aggregator should be. Remember: input, buffer, and output. So, what's the input type? Well, remember, the input type is what came out of the groupByKey method. So, in this case, it would be a pair of (Int, String). So we know that this is going to be a pair of (Int, String). And we know that the output type is going to be a String. Since these are pretty simple data types, the buffer type can also be a String, because we're just going to be concatenating strings together inside of this Aggregator. Okay, so the first step is complete. Our input type is a pair of (Int, String), our buffer type is String, and our output type is String, great. Now we have to fill in the rest of the types. That's completely easy to do if you go back two slides, where we have a template into which we can plug these different types. So, actually, everything is going to be a String, because BUF and OUT are both Strings. And the only thing that's not a String is this input type, which is a pair. So we just mechanically plug these types in.
Now we have to figure out what each of these methods should actually do. The zero, or the initial value, is pretty straightforward. We're going to be concatenating and building up strings, so this is definitely going to be an empty string. Okay, cool. So the next method that we have to figure out is this reduce method here. And actually, the types that we've already plugged in give it away. So this is a function where we have to somehow take the string out of this pair and concatenate it with this other string here. That's pretty straightforward: we take b, we pull the string out of a, and we concatenate them together. Now this is a String. And this merge function, since we already have strings, we just want to merge two strings together, so we just do another string concatenation. Finally, in finish we need to return a String, so we just return r without doing anything to it, and that's it, there's our Aggregator. The last thing that we do is we convert it to a column, so there's this toColumn method here. Because remember, if we're going to pass it to an aggregation method, it needs to be of type Column. So here we go, we pass our Aggregator, which we named strConcat, to this agg function. And of course we make it a TypedColumn, because this is just a regular Column, so we do as[String], because the result is of type String. And that's it, voila, we have our Aggregator. Oops, except we don't, we got a compile error. So there was something missing. It might be hard to read, but what it says here is that there are two unimplemented members, one called bufferEncoder and one called outputEncoder, and they're of type Encoder. Okay, what's an Encoder? Well, believe it or not, we heard about these in passing during the DataFrame session. Encoders are the things that convert your data between the JVM objects that you're used to operating on and this highly specialized internal tabular representation that Spark SQL operates on.
And I mentioned this at the very beginning of the session, but these Encoder things are required by Datasets. So every Dataset has Encoders to go along with it. And these Encoder things are extremely specialized, optimized code generators that generate custom bytecode for serialization and deserialization of your data. And they do it all in the special internal tabular representation that we heard about. This special representation is Spark's internal Tungsten binary format, which allows operations to happen on already-serialized data, and that greatly improves memory utilization. So we can put more stuff in memory, and we don't have to unpack so much of it, basically. Well, that's the context for these things. But since these Encoder things are all about serialization, you might ask, well, why isn't Java or Kryo serialization good enough? Well, for a couple of reasons. It starts with the fact that there's a limited set of data types for which Encoders are defined: all the primitives and case classes, all of those Spark SQL core data types. Since these are things that are already understood super, super well, Spark can make sure that there's always a super optimal Encoder available for these core data types. So that gives us an optimization opportunity. But perhaps most important is the fact that these Encoder things actually contain the schema information. So they're not just serializers and deserializers, they're serializers and deserializers that make all of their decisions based on this schema information. So they know in advance the structure of the data that they're serializing and deserializing. This is basically what enables Tungsten to do all the optimization that it does. Since Spark understands the structure of the data that's stored in its Datasets, it can actually create a more optimal layout in memory when caching these Datasets. So this is kind of the magic that Tungsten does.
So this is where we get this great memory utilization from. And it goes without saying that this uses significantly less memory than Kryo or Java serialization, because we're keeping the data in this extremely compact special format in memory. And it's actually up to ten times faster than Kryo, which is one of the fastest industrial serializers around. So these encoders all tie back to this Tungsten component that does all of these optimizations, both in memory and in the representation of the data that we're operating on. Now, I told you that there's an encoder for every Dataset. So every Dataset has encoders for its types, but we've been creating Datasets all over the place, and we've never seen these encoder things before. Well, that's because the most often used way to introduce an encoder is to do it automatically via implicits from the SparkSession object. But as we saw, sometimes we run into a situation where we're required to explicitly pass an encoder to something. And for that, Spark SQL actually has an object called Encoders in the sql package, which contains a big selection of methods for creating encoders from Scala primitive types, products, and things like that. So here are three kinds of encoders. There are encoders like INT, LONG, STRING, etc., for primitives that are nullable, which isn't the case for Scala's primitives. Then there are methods that create encoders for the Scala primitives, so scalaInt, scalaLong, scalaByte, the list goes on. And finally, there are methods that create product or tuple encoders, because sometimes we're building up complex types out of case classes. So here are some examples of how one might go about creating an encoder. You just use the name of the Encoders object, invoke one of these methods, for example scalaInt, and you get back an encoder for Scala's Int type.
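The three kinds of factory methods just described might be used like this sketch; the Person case class is a hypothetical example type:

```scala
import org.apache.spark.sql.{Encoder, Encoders}

// Encoders for nullable (boxed) primitives:
val boxedIntEnc: Encoder[java.lang.Integer] = Encoders.INT
val stringEnc: Encoder[String]              = Encoders.STRING

// Encoders for Scala's own (non-nullable) primitives:
val scalaIntEnc: Encoder[Int]   = Encoders.scalaInt
val scalaLongEnc: Encoder[Long] = Encoders.scalaLong

// Product encoders for case classes (hypothetical Person type):
case class Person(name: String, age: Int)
val personEnc: Encoder[Person] = Encoders.product[Person]
```

Note that each encoder carries a schema, not just serialization logic, which is what Tungsten exploits.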
And the same goes for String, or for a product, for example, if we have a case class type like Person here. So that was a quick introduction to encoders. Let's finish up the implementation of this aggregator. All we've got to do is choose the types for these encoders and then look up the right methods to use in the Encoders object. Well, that's pretty simple, because BUF and OUT are both strings, as we saw before. So all we need is to get the string encoder out of this Encoders object, and voila, that's it, I've implemented my encoders. Now all I've got to do is take this custom strConcat aggregator column thing here, make it a typed column, and then pass it to this agg method that we saw before. And that's it, here's the result, just as we expected. So that was a whirlwind tour of some of the more advanced ways one could go about grouping and aggregating data. Now you know how to make your own custom aggregators, in case you don't find the aggregator that you need in the Spark API. So I hope this slide is refreshing, because we're changing directions a little bit. We saw transformations and then grouping and aggregating on Datasets. Now let's look at actions on Datasets. And you'll be very happy to notice that these actions are all exactly the same as the actions that we've seen on RDDs and on DataFrames. There's nothing different about them, in fact. Like DataFrames, we have a show method, and all of the other methods like foreach or count or collect, they're all the same. So I hope this is reassuring, something super familiar after digging into those aggregator things a few slides back. Everything is as expected with actions. So that's an introduction to Datasets. The general gist is that they have the same operations available to them that DataFrames have, in addition to more of the higher-order functions like map and flatMap that we liked from RDDs.
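To illustrate how familiar these actions are, here's a small sketch of show, count, collect, and foreach on a typed Dataset. The Listing case class, the data, and local mode are all assumptions for illustration:

```scala
import org.apache.spark.sql.SparkSession

case class Listing(zip: Long, price: Double)  // hypothetical record type

val spark = SparkSession.builder
  .appName("actions-demo")
  .master("local[*]")   // assumption: running in local mode
  .getOrCreate()
import spark.implicits._

val listingsDS = Seq(Listing(10001, 250000.0), Listing(10002, 90000.0)).toDS()

listingsDS.show()                               // pretty-prints the first rows as a table
val n: Long = listingsDS.count()                // action: number of records
val all: Array[Listing] = listingsDS.collect()  // action: bring the results back to the driver
listingsDS.foreach(l => println(l.zip))         // action: run a side effect per record
```

Unlike collect on a DataFrame, collect here gives you back real Listing objects, not untyped Rows.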
And, of course, they're also mostly typesafe, which is really nice if you've ever had to wrangle with a bunch of exceptions thrown by the analyzer because DataFrames are untyped. So at this point I've given you three big, main, different APIs. One is called Datasets, another one is called DataFrames, and then we spent a bunch of time on these things called RDDs. And so your head must be spinning right now, and you must be thinking, well, when on Earth would I use one or the other, or the other one of these things? So I've prepared some simple criteria for when you might be interested in choosing one abstraction over another. The first thing to remember is that RDDs are really the core abstraction that everything else is built on top of. They've become almost like bytecode to Spark, because all of these richer abstractions are built on top of RDDs, and they generate RDDs. So Datasets and DataFrames go through these optimizers, and in the end, we have RDDs that we're actually running. We write code with Datasets, but again, what Spark runs is an RDD, right? So you can think of RDDs as a little more low-level and totally free-form. It's up to you to do your own optimizations on them, and we learned the hard way, in several sessions earlier in the course, that it's sometimes super hard to know exactly what these things are doing under the covers. So RDDs are a good choice in these situations: you have data that for some reason won't fit into a Dataset or a DataFrame; or you have to fine-tune and manage low-level details of your RDD computations yourself, and you can't just trust the optimizers to do it for you; or, finally, you have some kind of complex data type that cannot be represented or serialized with these encoders, which is a real possibility, because encoders can't yet nicely deal with user-defined data types. So these are situations when an RDD might be necessary.
Otherwise, you may often want to reach for a Dataset or a DataFrame, especially if you're going to read in some kind of structured data format like JSON or XML. You can just use that wonderful read operation to read in a JSON file and map it to a case class; Datasets do that really well for you. So this is a situation where getting data in suddenly becomes really easy with things like Datasets and even DataFrames. When you have structured or semi-structured data, it's often really nice to go with a DataFrame or a Dataset. DataFrames are generally a good choice when you want to rely as much as possible on the optimization components that Spark provides. If you really want to just write code, not think about what it does, have something else optimize it for you, and generally trust that it's going to be okay, then DataFrames are a good choice, if you can deal with the fact that they're not typed. But, like I said at the very beginning of this session, Datasets are a really nice compromise. Maybe they're not quite as performant as DataFrames, but they still have good performance, and they still have some optimization done on them. So Datasets are a good choice when you want type safety, when you want to work with functional APIs, when you have structured or semi-structured data, and when good performance is good enough; it doesn't have to be the best. So it's up to you to decide which one you prefer, based on the data that you have and the sort of computation that you want to do. Perhaps it's easier to do it with functional APIs and types, or maybe it's better to do it entirely with relational operations; it really depends on the problem that you're trying to solve. But this is a good set of criteria to help you make that decision. So, before I wrap up the session on Datasets, I wanted to run through some of the limitations of Datasets that might not have been completely obvious.
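Getting structured data in really is about one line: read the file, then convert the resulting DataFrame to a typed Dataset with as[...]. In this sketch the listings.json file, its contents, and the Listing case class are all assumptions; the file is written out first just to keep the example self-contained:

```scala
import java.nio.file.{Files, Paths}
import org.apache.spark.sql.SparkSession

case class Listing(zip: Long, price: Double)  // hypothetical schema for the JSON records

val spark = SparkSession.builder
  .appName("json-demo")
  .master("local[*]")   // assumption: running in local mode
  .getOrCreate()
import spark.implicits._

// A tiny JSON-lines file, written here only so the sketch is self-contained:
Files.write(Paths.get("listings.json"),
  """{"zip": 10001, "price": 250000.0}
    |{"zip": 10002, "price": 90000.0}""".stripMargin.getBytes)

// Read the JSON as a DataFrame, infer its schema, and map it to the case class:
val listingsDS = spark.read.json("listings.json").as[Listing]
```

Note that JSON schema inference produces long and double column types, which is why the case class uses Long and Double fields.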
So, one thing that I alluded to but didn't quite explain is that Datasets don't get all of the optimizations that DataFrames get. And I gave you some hints throughout the course about why that's the case; I talked many times about structure, and how giving Spark more structure helps optimization. To be a little more concrete, relational operations tend to be more performant and more easily optimized by the Catalyst query optimizer, because with information about the structure of the data and the structure of the computation, Catalyst knows it can access only the fields that are involved in, say, a filter. So if we focus on a filter that looks like this: because we have the structure of the data and the structure of the computation, we can see into everything. We can see the data; we can see the computation. So Spark's optimizer knows that it only has to access the specific fields involved in that filter, and it can skip over lots of other ones without actually having to instantiate the entire data type. That's why Catalyst is good at optimizing in this case. Catalyst actually cannot optimize functional filter operations, or any functional operations that involve passing around function literals, or lambdas. And the reason is that when you pass a function to Spark, all Spark can know about this function blob is that it needs an entire record deserialized, so an entire object, and the function applied to it. Spark doesn't know what's inside the function, and it doesn't know which parts of the object the function touches, so it has to create the whole object and then apply the function to it. That potentially requires Spark to do more work than the baseline minimum it should have to do, because Spark just can't see into lambda functions. It has no idea what they're doing. So Catalyst can't do very much for operations that involve passing around these function literals.
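To make that contrast concrete, here are the two styles of filter side by side, on a hypothetical Dataset of Listing records; Catalyst can see into the column expression in the first one, but the lambda in the second is an opaque blob to it:

```scala
import org.apache.spark.sql.SparkSession

case class Listing(zip: Long, price: Double)  // hypothetical record type

val spark = SparkSession.builder
  .appName("filter-demo")
  .master("local[*]")   // assumption: running in local mode
  .getOrCreate()
import spark.implicits._

val listingsDS = Seq(Listing(10001, 250000.0), Listing(10002, 90000.0)).toDS()

// Relational filter: Catalyst sees the column and the predicate, so it can
// read only the price field, push the filter down, etc.
val cheapRelational = listingsDS.filter($"price" < 100000)

// Functional filter: the lambda is opaque to Catalyst, so Spark has to
// deserialize each whole Listing object and apply the function to it.
val cheapFunctional = listingsDS.filter(listing => listing.price < 100000)
```

Both produce the same answer; the difference is purely in what the optimizer is allowed to see and therefore rearrange.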
And of course, you don't miss out on every possible optimization; you just miss out on some important ones. So the key takeaway here is that when you use Datasets with higher-order functions like map, you end up missing out on many of the optimizations that Catalyst does for DataFrames. That's something to keep in mind; it's why Datasets are sometimes slower than DataFrames. However, it doesn't mean that every operation on a Dataset misses out on these wonderful optimizations from Catalyst. If you choose relational operations like select, or the relational filters, or any of these operations, then you get all of Catalyst's optimizations on Datasets too. So whether or not you get the full suite of available optimizations really depends on the operations that you're doing on your Datasets. Finally, that's not to say that Datasets that use things like map operations don't get any optimizations. They still have Tungsten always running under the hood, storing and organizing data in highly efficient and optimized ways, which can still result in large speedups over regular RDDs, so that's not nothing. You basically just lose out a little bit on Catalyst if you use higher-order functions, but you still get all the benefits of Tungsten. So I hope that gives you a little bit of intuition about the capabilities that Spark has to optimize operations on Datasets. And the two last limitations, which are also shared by DataFrames: first, the set of data types is limited, so you end up having to express all of your data with case classes and the standard Spark SQL primitive data types. If you can't do that, then it might be difficult to ensure that a Tungsten encoder exists for your data. So this could also be a problem; you can't just put any data type ever into Spark. And finally, there's that point about semi-structured and structured data.
Sometimes there are just datasets that are completely unstructured: they have no schema, they're not self-describing, and there's no way to programmatically guess their structure. In those cases a Dataset might not be a good fit, because it might be really difficult to shuffle that data around and put it into some format that a Dataset is comfortable dealing with. It might be easier to just parse the bits and pieces that you need with regular Spark RDDs.