So let's introduce pipelines using a little sample dataset. This dataset is called the HMP dataset and it's basically accelerometer recordings, with an accelerometer sensor attached to the human body while the humans conducted different tasks like brushing teeth, climbing stairs, combing hair, descending stairs, and so on. So you can see all those recordings in those different folders here and each folder contains files which are actually data files. They're not quite CSV because the values are separated by a space, but it doesn't matter, we will take care of that later, and you see here the date of the recording and again here the activity; f1 stands for female one and m1 stands for male one. So those individuals have been anonymized, but we know the gender. So the idea is, we will clone this dataset and then basically work with it. So the first thing is, we have to make sure that we have an Apache Spark service associated with the project, and then we go to Assets and we create a new notebook. So we click on "New Notebook" and we give it a name, so we give it the name test, because then we definitely don't know what it was one day later, and we make sure that we are using the Spark service here, otherwise there's no Spark service attached to the notebook. So we create the notebook and first of all, we execute a command line command, so that's a git clone and then this git URL. Now, we should have a folder called HMP_Dataset, which is true, and let's have a look what's inside. Okay, that's fine. So let's actually recursively traverse through those folders and create Apache Spark data frames from those files, and then we just union all the data frames, so we end up with one overall data frame containing all the data. So let's actually do that. So first of all, we need to run some imports and now we define the types of the data. As you remember, those files basically contain three columns: x, y and z, for the accelerometer data. So we just define this here and now we traverse through all those folders. So we import os for the operating system and then we say os.listdir, and the folder is HMP_Dataset. So you see here, we have all those folders, and we notice that we have some entries which we don't like, so let's get rid of those not containing an underscore and that should be fine. So let's call this file_list and then we say file_list_filtered equals, then we say s for s in file_list if underscore in s. Okay. We have to rerun this one and then this one, let's see what's inside. Okay. Looks nice, so we have all the folders containing data in one array. That's pretty nice, now we can iterate over this array. First of all, we define an empty data frame, which we later append data to, and let's call those entries categories. So for category in file_list_filtered, we traverse all categories, and then we traverse through the files in each category or in each folder. So data_files equals os.listdir, so that's HMP_Dataset but now with the category appended, and then we iterate over those files. So for data_file in data_files, we print it first so that we know where we are. Then we create a temporary data frame using spark.read and then we have a couple of options. So first of all, we set header to false because there's no header in the files. Then we set the delimiter to one space and then we read it using the schema which is defined above. As you remember, we have defined a schema here of three fields, x, y, z, of integer type. Okay?
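For reference, here is a minimal sketch of this first part of the notebook cell. The folder name HMP_Dataset and the column names come from the recording; the git URL is not shown here, so the clone step is omitted, and treat the exact layout as an assumption:

```python
from pyspark.sql.types import StructType, StructField, IntegerType
import os

# Schema for the space-separated accelerometer files: three integer columns x, y, z
schema = StructType([
    StructField("x", IntegerType(), True),
    StructField("y", IntegerType(), True),
    StructField("z", IntegerType(), True)])

# List everything inside the cloned HMP_Dataset folder ...
file_list = os.listdir('HMP_Dataset')
# ... and keep only the activity folders, which all contain an underscore
file_list_filtered = [s for s in file_list if '_' in s]
print(file_list_filtered)
```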
So this means we are here passing the schema as a parameter, and here the path built from the category, which is the folder containing the data files, and the data file itself. For each file in each folder we are creating a temporary data frame. So now we want to add the source file and the category to the data frame as well, because at the moment we only have accelerometer values and we have no clue where they are coming from. So in order to add a string, a literal, to an Apache Spark data frame, we have to import the lit function here; you'll see later how we are actually using it. So that means temp_df equals temp_df and then we use the withColumn function of Apache Spark to append one column, and we append class. So that's the folder name where this data came from, and it's basically a literal, and the literal is our category. We do the same for the source file. So we call it source, and of course we have to use the data file, so we actually have a clue where this data came from. Now, we have to do one more thing. If the data frame is None, that means it's the first iteration, we set the data frame to temp_df, and otherwise we just say df equals df.union(temp_df). So union basically appends the data vertically. That's all. Let's run this and see if it works. So we see here it's iterating through all the files, Climb_stairs and afterwards Getup_bed, and so on, and so on. At the end of the day, once this process is finished, which we see when the star disappears, we should see the data frame already created. So let's wait a bit. Usually, I create such a notebook and I call it ETL, ETL for Extract, Transform, Load. So that's a notebook only doing this, and usually I store this data frame into IBM Cloud Object Storage, but we skip that here. So let's have a look at this data frame. A Spark job gets created, and we see here the first 20 rows of the data frame we created, and it's exactly as expected: x, y, z are the values, Climb_stairs is the class and this is the source file. Okay. So the next step we want to do is we actually want to transform our data. This means we now want to create an integer representation of the class, because a machine learning algorithm cannot really cope with a string, so we will transform the class into an integer, and this is done with a StringIndexer. So indexer equals StringIndexer and we specify inputCol class and the outputCol will be classIndex, we'll see later how it looks, so we say indexed equals indexer.fit(df).transform(df) and indexed.show(). Let's see how this looks. Again, an Apache Spark job has been created and it has two stages, one for fit and one for transform most likely, and you see how it is progressing. The cool thing now is that this stuff can run in parallel. So I don't care how much data I have to transform. If I need more power, I just assign additional worker nodes to the Apache Spark cluster in the Cloud and I'm done. So, you see I have created a class index, it's of type double, that doesn't matter at the moment. But, here you see Climb_stairs equals class number three, so that's the first step. So, now we will do one-hot encoding. One-hot encoding is the following, let's have a look at the Wikipedia page: one-hot encoding actually means that you encode a number, here zero, one, two, three, four, five, six, seven, into a vector where only one element is one and the rest are zero. So, zero is encoded as a one at the first position, one as a one at the second position, two as a one at the third position, and seven as a one at the last position.
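Putting the loop together with lit/withColumn, the union, and the StringIndexer step, a sketch of the code discussed so far could look like this. The variable names df and temp_df and the columns class, source and classIndex follow the recording; spark is the SparkSession provided by the notebook environment:

```python
from pyspark.sql.functions import lit
from pyspark.ml.feature import StringIndexer

df = None
for category in file_list_filtered:
    data_files = os.listdir('HMP_Dataset/' + category)
    for data_file in data_files:
        print(data_file)
        # Read one space-separated file using the schema defined above
        temp_df = spark.read \
            .option("header", "false") \
            .option("delimiter", " ") \
            .csv('HMP_Dataset/' + category + '/' + data_file, schema=schema)
        # Remember where the rows came from: the activity (folder) and the source file
        temp_df = temp_df.withColumn("class", lit(category))
        temp_df = temp_df.withColumn("source", lit(data_file))
        if df is None:
            df = temp_df
        else:
            # union appends the data vertically
            df = df.union(temp_df)

df.show()

# Turn the class string into an integer index
indexer = StringIndexer(inputCol="class", outputCol="classIndex")
indexed = indexer.fit(df).transform(df)
indexed.show()
```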
So, that's a very special encoding we often use in machine learning, it's called one-hot encoding, and we can now transform a single column, in this case containing 12 values, into a one-hot encoded vector spanning 12 columns where only one column is one and the rest are zero. Okay, that's what we are doing now. So, we import this OneHotEncoder and we create a new OneHotEncoder. We use classIndex here as input and categoryVec as output, and again encoded, a new data frame, is encoder.fit(indexed), so that's the data frame from before, .transform(indexed). Let's see what happens. Oh, you see it doesn't have a function fit. So, interestingly, the StringIndexer had a function fit. That's a so-called estimator in Spark ML, whereas a pure transformer like the OneHotEncoder only has a transform function. And the reason is that for creating string indexes, you have to pass through all the data. This is done in the fit function, which basically checks what strings you have and remembers which number it assigns to which category string. So the indexer is an estimator, which means it can remember state, while this is a pure transformer, which means we only need the transform function. And encoded.show should then basically trigger the Spark job. Interestingly, data frames in Apache Spark are always lazy. That means if you don't read the data, nothing gets executed. And here with show, we want to see the first 20 rows, therefore an Apache Spark job is executed on the top 20 rows. So, you see here we have created categoryVec and this is the one-hot encoded vector. It might look a bit strange to you, but actually that's the Apache Spark representation of a sparse vector. It's a compressed vector, which means this 12 here says it has 12 elements, and it says that at position three there is a one. So, if you had a sparse vector with a second one at position five, then it would look like the following: 12, then 3, 5, and then 1, 1, or something like that, I would have to double check. Anyway, let's continue. The next thing we are doing is transforming our values x, y, z into vectors, because Spark ML can only work on vector objects, so let's actually do that. So, let's import Vectors and the VectorAssembler. The VectorAssembler creates vectors from ordinary data type columns. So, let's create a VectorAssembler. Its constructor expects the input columns x, y, z, which we have here, and the output column, which we call features, because those are the columns we want to send to the machine learning algorithm. And again we call the result features_vectorized. So, this is the new data frame and we say vectorAssembler.transform and we use the previous data frame, which is called encoded. And then let's show the data. And you can see here we now have the features, so those correspond to x, y and z, but this here is now an Apache Spark vector object, which is the correct one to use for machine learning. So, the first machine learning algorithm, which is actually also a transformer, is the Normalizer. It's always a good idea to normalize your data; normalizing basically means squashing your data to a value range between, for example, zero and one, or minus one to plus one. In this case it's not strictly necessary because all three features have the same value range, but anyway, it doesn't hurt and you will easily get the point of how to do it. So, we import a Normalizer.
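Here is a sketch of the one-hot encoding and vector assembly steps, continuing from the indexed data frame above. The column names classIndex, categoryVec and features follow the recording; the note on Spark versions is an addition and an assumption about which Spark release you are on:

```python
from pyspark.ml.feature import OneHotEncoder, VectorAssembler

# One-hot encode the integer class index into a sparse vector.
# Note: in Spark 2.x OneHotEncoder is a pure transformer (no fit), as in the recording;
# in Spark 3.x it became an estimator, so there you would call
# encoder.fit(indexed).transform(indexed) instead.
encoder = OneHotEncoder(inputCol="classIndex", outputCol="categoryVec")
encoded = encoder.transform(indexed)
encoded.show()

# Assemble the three accelerometer columns into a single Spark ML vector column
vectorAssembler = VectorAssembler(inputCols=["x", "y", "z"], outputCol="features")
features_vectorized = vectorAssembler.transform(encoded)
features_vectorized.show()
```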
So, we say normalizer equals Normalizer, and in the constructor, again, the input column is features and the output column is the normalized features. And then normalized_data, for example, equals normalizer.transform of the data frame from before, and again we show it afterwards. Let's see what happens. So, again this Spark job runs and we see the normalized features here, and this is a good example: all those values have been squashed to a value range between zero and one. So, that's what we wanted. So, the whole point of this exercise is now to create a pipeline. There's a Pipeline object and we can basically say pipeline equals Pipeline, which takes in the constructor an array of pipeline stages. So, let's remember what we've done. The first stage was the indexer, so let's put it here. The second stage was one-hot encoding, so this goes here, and the third stage was feature vectorization, so that's the vector assembler. The cool thing is that those stages know which fields they have to read and which to create. The last stage is the normalizer. And now we say model equals pipeline.fit, and we can run it on the initial data frame. I'm calling this model because later it will be a machine learning model, so you get used to the name already, and then a prediction: also later, when we do machine learning, we call it prediction. Then we say model.transform, again on the data frame, and this prediction is again a data frame which should give us the same result as above. Oh, what happened here? If we don't name this parameter stages, it's not happy. So, stages equals. Let's see what happens now. So, you see exactly the same data frame as created before in, let's say, individual stages. And now we've created one pipeline containing all of the stages and we can basically fit and transform in one go. That's really handy, we will learn later why. And maybe now, if we want to get rid of all the columns we don't need, we do the following: let's say df_train equals prediction and we simply drop all the columns we don't need. So, x and y and z, and let's drop also class for now, and source. We only need the normalized features and the category vector, so we can also get rid of features, and as Apache Spark is lazy, this was instantly done. If you now call show on df_train it gets executed and it should return only the columns we need: one is the feature vector, and one is the target or label. So, a Spark job gets created. That's basically it. We have here the category vector, which is the target, and our normalized input features, both in the Spark ML vector representation. Later we will see what we can actually do with this data, but that's exactly what we wanted to achieve, and that's basically how Apache Spark ML pipelines work.
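And here is a sketch of the normalization, pipeline and column-dropping steps, continuing from the data frames above. The output column name features_norm and the p=1.0 norm are assumptions, since the recording does not spell them out explicitly:

```python
from pyspark.ml.feature import Normalizer
from pyspark.ml import Pipeline

# Normalize the feature vectors; with p=1.0 each row's components sum to one,
# so the values end up between zero and one (the exact norm is an assumption;
# the Normalizer default would be the L2 norm).
normalizer = Normalizer(inputCol="features", outputCol="features_norm", p=1.0)
normalized_data = normalizer.transform(features_vectorized)
normalized_data.show()

# The same steps expressed as a single pipeline; note that the
# Pipeline constructor wants the list passed as the named parameter stages
pipeline = Pipeline(stages=[indexer, encoder, vectorAssembler, normalizer])
model = pipeline.fit(df)
prediction = model.transform(df)
prediction.show()

# Drop the raw columns we no longer need, keeping the target (categoryVec)
# and the normalized feature vectors
df_train = prediction \
    .drop("x").drop("y").drop("z") \
    .drop("class").drop("source").drop("features")
df_train.show()
```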