In this session, we're diving into a very important component of Spark called Spark SQL. Despite Spark's rising popularity, SQL has been and still is the lingua franca for doing analytics. But despite how widely SQL is used, it's still a pain in the neck to connect big data processing pipelines like Spark or Hadoop to a SQL database. Usually, for an application, one has to choose either SQL or something like Spark or Hadoop. Wouldn't it be nice if it were possible to seamlessly intermix SQL queries with regular Scala code? And even better than that, wouldn't it be nice to get all of the optimizations that we're used to in the databases community on Spark jobs as well? It's pretty common in databases to do all kinds of very advanced optimizations, but so far we've had to optimize our compute pipelines in Spark by hand. Spark SQL is a component that delivers both of these nice things. It makes it possible to seamlessly intermix SQL and Scala, and it also optimizes Spark SQL code very aggressively, using many of the same techniques from the databases world.

So Spark SQL has three main goals, the first of which is to support relational processing in Spark. This is that goal of intermixing relational code, declarative SQL-like syntax, with functional APIs like Spark's RDD API. And this is because it's sometimes more desirable to express a computation in SQL syntax than with functional APIs, and vice versa. You could imagine, for example, that selecting an entire column of a table is easier to do with a single line of SQL than it is with a map or a filter operation. The other two main goals of Spark SQL are, of course, high performance, as previously mentioned, and support for getting data more easily into Spark. Wouldn't it be nice if we could just read in semi-structured data like JSON, for example? So Spark SQL seeks to add relational processing to Spark, to bring high performance from optimizations developed in the databases world, and to support reading in data from semi-structured and structured datasets.

That said, what is Spark SQL? Is it another system that we have to learn how to use? No, it's just a component of the Spark stack, so you can think of it as a library. It's a Spark module for structured data processing, for doing relational queries, and it's implemented as a library on top of Spark. So you can think of it as just adding new APIs to the APIs that you already know; you don't have to learn a new system or anything. The three main APIs that it adds are SQL literal syntax, a thing called DataFrames, and another thing called Datasets. These are the things that you see and have access to. On the backend, it adds two new components. One is called Catalyst, which is a query optimizer; this is the component that will optimize your code. The other is called Tungsten, which is an off-heap serializer, or, said another way, a system that encodes Scala objects in a very, very efficient representation off-heap, away from the garbage collector. So these are the main things that Spark SQL adds: new APIs, and some machinery on the backend that should speed up your code quite a bit.
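Just to give a quick taste of that intermixing before we go on, here's a minimal sketch. The sample data, the column names, and the local master setting are all made up for illustration; the calls it uses (toDF, createOrReplaceTempView, spark.sql) are the ones we'll cover later in this session.

```scala
import org.apache.spark.sql.SparkSession

object IntermixSketch extends App {
  // Local master and sample data are assumptions, just so this sketch runs standalone.
  val spark = SparkSession.builder().appName("Intermix Sketch").master("local[*]").getOrCreate()
  import spark.implicits._

  val peopleRdd = spark.sparkContext.parallelize(Seq(("Ada", 34), ("Grace", 29)))

  // RDD style: picking out one "column" means manually mapping over tuples.
  val namesRdd = peopleRdd.map { case (name, _) => name }

  // Spark SQL style: the same selection is a one-line relational query, freely mixed with Scala.
  val peopleDf = peopleRdd.toDF("name", "age")
  peopleDf.createOrReplaceTempView("people")
  val namesDf = spark.sql("SELECT name FROM people")

  namesDf.show()
  spark.stop()
}
```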
Just to give you a visual depiction of how Spark SQL relates to the rest of the Spark ecosystem: we have this system called Spark, and inside of it we have the RDD API. We've been operating this entire course on these things called RDDs, doing functional transformations on them; they're sort of the core data structure of Spark. Spark SQL sits on top of Spark as a library. It has its own APIs, and it has this thing called the Catalyst optimizer, and while it has its own DataFrame API, it ultimately runs on RDDs under the covers. And on top of Spark SQL, you have everything else, like the console and regular user programs, so users can write programs using both the DataFrame API and RDDs, and freely intermix them. But the bottom line here is that we have this API that then does some optimizations and ultimately runs on RDDs under the covers.

So Spark SQL is a Spark component that provides a SQL-like API on top of Spark. This is neat because everything about SQL is structured. In fact, SQL stands for Structured Query Language. It has a fixed set of data types, things like longs, integers, strings, even arrays, and it has a fixed set of operations: SELECT, WHERE, GROUP BY, etc. And for decades, both research and industry surrounding relational databases have focused on exploiting this rigidness, these known data types and known operations, to get all kinds of performance speedups.

Since knowledge of SQL databases wasn't a prerequisite for this course, I'm just going to quickly spend a slide running through the vocabulary. When thinking in SQL, data is organized into one or more tables. Here's an example of what a table might look like: there's a header, and tables have columns which are named, for example Customer_Name, or Destination, or Ticket_Price. And then there are rows, that is, records. A table typically represents some kind of collection or dataset. So you might have a table and think of it as a table of customers; we can think of this one as the CFF or SBB customer dataset, like the train dataset that we were looking at in an earlier session. That's how you think about a table: it's kind of like a collection of data. Other words that you may hear used: a relation is just a table, and attributes are columns. So this Customer_Name here is an attribute, which is just a column. Rows are also called either records or tuples, so this row here represents a record or a tuple. This record represents, for example, a ticket purchase: this is the last name of the customer, the destination of the customer, and the price that the customer paid for their ticket. It's one record in our dataset representing that purchase. So this is the terminology that we're going to use: we have tables, also known as relations; we have columns in our tables, also known as attributes; and we have rows, also known as records or tuples.

Now back to Spark SQL. Spark SQL's core abstraction is known as a DataFrame, and every time we use the word DataFrame, you can imagine it as being conceptually equivalent to a table in a relational database. So DataFrames are conceptually distributed collections of records, and these records have a known schema, okay? And super important is this point here: unlike RDDs, which are also a kind of distributed collection, DataFrames require some kind of schema information about the data they contain. DataFrames are also untyped, okay? This is an important point that's going to come back. What do I mean by untyped? Well, I mean that a DataFrame doesn't have a type parameter that the Scala compiler statically checks. Compare that for a moment with RDDs: RDDs have a type parameter, so the compiler can always make sure that the type of what's inside an RDD makes sense. DataFrames, however, have no type parameter. They contain Rows, which can conform to any schema, and the Scala compiler has very little ability to type check them. And finally, one last terminological point: transformations on DataFrames are called untyped transformations. This terminology is also going to come back later.
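To make that typed-versus-untyped contrast concrete, here's a small sketch. The Person case class and the sample data are hypothetical, and the local master is only there so the sketch runs standalone.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

// Hypothetical schema, defined at the top level so Spark's reflection can see it.
case class Person(name: String, age: Int)

object TypedVsUntypedSketch extends App {
  val spark = SparkSession.builder().appName("Typed vs Untyped").master("local[*]").getOrCreate()
  import spark.implicits._

  // RDD[Person]: the element type is a compile-time type parameter.
  val personsRdd: RDD[Person] = spark.sparkContext.parallelize(Seq(Person("Ada", 34)))
  val agesRdd: RDD[Int] = personsRdd.map(_.age) // the compiler checks that 'age' exists and is an Int

  // DataFrame: a distributed collection of Rows with a known schema, but no type parameter.
  val personsDf: DataFrame = personsRdd.toDF()
  val firstRow: Row = personsDf.head()
  val age = firstRow.getAs[Int]("age") // checked only at runtime; a typo like "agee" fails when the job runs

  println(age)
  spark.stop()
}
```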
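Before stepping through the explicit-schema example, here's a compact sketch of the reflective path we just covered: the minimal SparkSession builder shape, toDF on a tuple RDD, and toDF on an RDD of a case class. The case class, the sample data, and the local master are assumptions made just so the sketch is self-contained.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical case class; its field names become the column names.
case class Person(name: String, age: Int)

object ReflectiveSchemaSketch extends App {
  // The general shape: builder, app name, getOrCreate (master is set here only for a local run).
  val spark = SparkSession
    .builder()
    .appName("My App")
    // .config("spark.some.option", "some-value")  // any number of options can go in the middle
    .master("local[*]")
    .getOrCreate()
  import spark.implicits._ // brings toDF into scope

  // From an RDD of tuples: pass the column names, or get _1, _2, ... by default.
  val tupleRDD: RDD[(String, Int)] = spark.sparkContext.parallelize(Seq(("Ada", 34), ("Grace", 29)))
  val tupleDF: DataFrame = tupleRDD.toDF("name", "age")

  // From an RDD of a case class: column names are inferred reflectively from the field names.
  val personRDD: RDD[Person] = spark.sparkContext.parallelize(Seq(Person("Ada", 34)))
  val personDF: DataFrame = personRDD.toDF()

  tupleDF.printSchema()
  personDF.printSchema()
  spark.stop()
}
```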
So, let's step through a quick example of the explicit-schema approach. Like I said, this is a little bit involved, so please bear with me. For the sake of this example, I'm just going to stick with a case class, even though I know it would be possible to use the toDF method; just stick with me here. So assuming I have an RDD full of these Person objects, I first have to encode the schema in a string, so I have to do this manually. Then I have to build up a schema using these StructType and StructField things so that it matches the shape of the data that I'm trying to build a schema for. This is done by breaking up the string, then mapping over it and creating a StructField for each part of the schema, which takes the name from the string here as a parameter, a type called StringType which we'll see later, and nullable set to true. Like I said, we'll see some of these things later. And then all of that goes inside of something called a StructType, and now we have a schema. So this is what a schema looks like: we build it up from these things called StructTypes and StructFields. Then we convert our peopleRDD into a rowRDD by breaking it up and mapping everything to Row objects. And finally, we can use the createDataFrame method on the SparkSession object and pass it the rowRDD that we just created, as well as the schema that we just created. So that's how you manually, explicitly build up a schema and then create a DataFrame from that schema. It's a bit involved, but sometimes necessary.

And finally, likely the most convenient way to create a DataFrame is by reading in data from a source file. Using the same SparkSession object, you can read in a number of semi-structured or structured data formats using the read method. So there's this read method on the SparkSession object, and then you can specify what you'd like to read in, for example JSON, and then specify a path to the JSON file. It's kind of like reading in a text file, but better: it'll parse semi-structured and structured data and build up a DataFrame with the schema of that semi-structured or structured data. And of course, it's important to know that there are only a handful of formats that Spark SQL can automatically read in for you; it can't read in arbitrary formats, but the most common are probably going to be JSON, CSV, and Parquet. There are more methods available for directly reading in semi-structured and structured data; to see the entire list, you can visit the docs, which are linked here in the slides.

Now that you know how to make DataFrames, once you have a DataFrame, you can freely write familiar SQL syntax to operate on it. So how do we use plain old SQL syntax on a DataFrame? Assuming that we have a DataFrame called peopleDF, all we have to do is register our DataFrame as a temporary SQL view. There's a method called createOrReplaceTempView which you can call on your DataFrame. This gives you a name that you can refer to in your SQL FROM statement. So in order to use SQL syntax, you need to register your DataFrame as a table, and then you can operate on it with regular SQL syntax. So, now that I have this temporary SQL view registered, called people, I can freely use it inside of a SQL query. To do a SQL query in Spark, once you have a DataFrame registered, you call the sql method on the SparkSession object and pass it a SQL query which refers to a registered temporary SQL view, in this case people. So in this query, we select all rows from this people table where the age field is greater than seventeen, for example. It's quite convenient, isn't it?
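Pulling those pieces together, here's a sketch of the explicit-schema path followed by the temp-view-and-query step. The raw data, the field names, and the commented-out people.json path are hypothetical, and this schema gives age an IntegerType (rather than making every field a StringType as on the slide) so the query can compare it to a number directly.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object ExplicitSchemaSketch extends App {
  val spark = SparkSession.builder().appName("Explicit Schema").master("local[*]").getOrCreate()

  // Step 1: create an RDD of Rows from some hypothetical raw "name,age" lines.
  val rowRDD = spark.sparkContext
    .parallelize(Seq("Ada,34", "Grace,29", "Tim,15"))
    .map(_.split(","))
    .map(attrs => Row(attrs(0), attrs(1).trim.toInt))

  // Step 2: build a schema out of StructType and StructField that matches the shape of those Rows.
  val schema = StructType(Seq(
    StructField("name", StringType, nullable = true),
    StructField("age", IntegerType, nullable = true)
  ))

  // Step 3: apply the schema to the RDD of Rows.
  val peopleDF = spark.createDataFrame(rowRDD, schema)

  // Reading a DataFrame straight from a (hypothetical) file would instead look like:
  // val peopleFromJson = spark.read.json("people.json")

  // Register a temporary view, then query it with plain SQL.
  peopleDF.createOrReplaceTempView("people")
  val adultsDF = spark.sql("SELECT * FROM people WHERE age > 17")
  adultsDF.show()

  spark.stop()
}
```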
Now, since you know you're writing SQL queries on top of Spark, which is not a SQL database, you might ask what SQL statements are available to you. Well, the SQL statements that are available to you are largely what's available in the Hive Query Language. This includes standard SQL statements such as SELECT, FROM, WHERE, COUNT, HAVING, GROUP BY, ORDER BY, SORT BY, and of course all of the JOINs. And importantly, you can do subqueries as well in this syntax. Most everything from HiveQL is supported, so you can usually just try something out and see if it works. But if you want more information about what syntax may or may not be supported, I've included a number of links for you here in the slides: there's a cheat sheet for HiveQL, a list of supported Spark SQL syntax, and an updated list of supported Hive features in Spark SQL in Spark's official documentation.

So let's do a more interesting SQL query, because so far we've just created DataFrames and done a really simple query. Let's now assume we have a DataFrame representing a dataset of employees. How do we create that dataset of employees? Well, we have a case class Employee here with a number of fields: id, which has type Int; first name and last name, both of type String; age, of type Int; and then the city that this person lives in, of type String. And to create a DataFrame, we simply create an RDD and then call toDF on it. So now we have a DataFrame of these employee records.

Now it's time for a little quiz. Let's query this dataset to obtain just the IDs and the last names of the employees working in a specific city, let's say Sydney, Australia. And then, once we've obtained that, let's sort our result in order of increasing employee ID. That means we want the id and last-name columns of this employee DataFrame, for only the employees that work in a specific city, and once we're done with that, we want to sort our employees in order of increasing employee ID. So I leave it to you: how would you write this SQL query?

Well, it's not too difficult. Let's assume that we have a table registered with the name employees; I left that out on the last slide, but assuming we have this table registered, all you have to do is say SELECT id and the last name, so we select these two columns from the employees table, then we keep only the people who work in Sydney, and then we just ORDER BY the id. To make sure we understand this query, let's visualize it. So assuming this is our dataset here, this is our employee DataFrame, and remember, we registered it as a table called employees. This is what the result of the query looks like: we've obtained our resulting dataset by selecting only these two columns and then keeping only the records where the city equals Sydney, so we get rid of these two people who live in New York here. And then we sort the elements in order of increasing id. In the original dataset the ids were out of order, and here they're now in the right order. So, bottom line: now we know how to create DataFrames and how to do arbitrary SQL queries on top of them. Pretty cool.
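Here's a runnable sketch of that quiz, end to end. The field names fname and lname and the sample records are made up for illustration; the lecture only tells us the fields are an id, a first name, a last name, an age, and a city.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical field names for the Employee record described in the lecture.
case class Employee(id: Int, fname: String, lname: String, age: Int, city: String)

object EmployeeQuerySketch extends App {
  val spark = SparkSession.builder().appName("Employees").master("local[*]").getOrCreate()
  import spark.implicits._

  // Made-up sample records, just so there is something to query.
  val employeeDF = spark.sparkContext.parallelize(Seq(
    Employee(4, "Ada", "Lovelace", 34, "Sydney"),
    Employee(2, "Grace", "Hopper", 29, "New York"),
    Employee(1, "Alan", "Turing", 41, "Sydney"),
    Employee(3, "Edsger", "Dijkstra", 38, "New York")
  )).toDF()

  employeeDF.createOrReplaceTempView("employees")

  // Just the id and last-name columns, only employees in Sydney, sorted by increasing id.
  val sydneyEmployeesDF = spark.sql(
    """SELECT id, lname
      |FROM employees
      |WHERE city = 'Sydney'
      |ORDER BY id""".stripMargin)

  sydneyEmployeesDF.show()
  spark.stop()
}
```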