The second challenge that we talked about, when we talked about the challenges of stream processing, was that we want to be able to do continuous processing of the data as they come in. In other words, we want to build a data pipeline that works on streaming data. In the reference architecture on GCP, what we are now looking at is the second part, the Cloud Dataflow part, where we are getting some data, for example from a streaming source or maybe even from a batch source, and we are creating a data pipeline to process that data. In this chapter we'll look at what Dataflow is. This is a quick recap of the material that's covered in the course on serverless data analysis. If you haven't taken that course, I encourage you to take it, because I'm not going to be talking too much about the basics of Dataflow or, in the next chapter, the basics of BigQuery. That course covers the basics and the concepts, but in the context of doing batch processing. So in this chapter I'll be focused on the streaming concepts that Dataflow provides. We'll talk about the challenges involved in stream processing and how Dataflow helps us address them. We'll then build a stream processing pipeline for traffic data, the hands-on activity that we are doing in this course, and then we'll look at how to handle late data. We'll look at the concepts of watermarks, triggers, and accumulation, and how those are affected by different choices in terms of how you do your triggers and how you do your windows. And finally, we'll do a lab where we run a Dataflow pipeline on the simulated traffic data stream that's coming in to Pub/Sub.

So when you look at processing data in a stream, if all you needed to do was to process the data element by element, that's easy, right? Some element comes in, you carry out some processing, and then you send it out. That's very easy; if every element is independent, that's not a problem. However, if you need to combine elements, if you need to do aggregations, if you need to say, take two elements that both have the same key, aggregate them, compute their sum, compute their mean, compute a list of them, now it becomes a little bit harder, and we'll talk about why that is. Of course, the hardest case is if you need to do compositing. If you have data coming in with different keys, from two different sources, and you need to composite them, adding information to the data from one source using data or computations based on another source, that's even harder. But we need to be able to do aggregations and compositing on stream data.
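To make that distinction concrete, here is a minimal sketch in the Beam Java SDK. The "sensorId,speed" record format and the field names are assumptions made purely for illustration; they are not the lab's actual data. The ParDo step is pure element-by-element processing, while the Mean.perKey step is the kind of keyed aggregation that becomes hard on an unbounded stream, because you have to decide when a key's data is complete.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.Mean;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;

    public class ElementVsAggregate {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // Hypothetical "sensorId,speed" records standing in for the stream.
        PCollection<String> records =
            p.apply(Create.of("s1,55.0", "s2,48.5", "s1,61.2"));

        // Element-by-element processing: each record is handled on its own.
        PCollection<KV<String, Double>> keyed =
            records.apply("ParseRecord", ParDo.of(new DoFn<String, KV<String, Double>>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                String[] fields = c.element().split(",");
                c.output(KV.of(fields[0], Double.parseDouble(fields[1])));
              }
            }));

        // Aggregation: elements sharing a key must be brought together,
        // which is what gets hard when the input never ends.
        PCollection<KV<String, Double>> meanPerSensor =
            keyed.apply(Mean.<String, Double>perKey());

        p.run();
      }
    }

On a bounded dataset the aggregation simply waits for all of the input; on a stream, deciding when to emit it is exactly where windows come in.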
So the state of the art, if you wanted to do that until very recently, was that you would write two sets of code. You would write one set of code that handled batch data, and the idea was that you were very focused on making this as accurate as possible. And then you'd build another set of code that processed streaming data, the data that came in. The assumption was that because the data are coming in as a stream, you cannot guarantee accuracy; there is going to be latency, there are going to be data that don't show up. So in essence, you would compute something, a speculative result, and you would populate your dashboard. But then, once a day, you would go back, maybe six hours later, maybe eight hours later, maybe two days later, and recompute everything. That would get called your golden source of data, and you would save that. So you would build two pipelines: one pipeline that is low latency but not very accurate, very speculative, and a second pipeline that is very high latency, so the data aren't available until three days later, but it is accurate. And you also balance throughput and fault tolerance in building these two data pipelines. So that was the state of the art: you would do it twice, sometimes many times. You might do your batch processing after 6 hours, after 12 hours, after 24 hours, after a week, etc. But as you can see, that's pretty wasteful.

The reason that you would do this was that if you have streaming data, continuously arriving data, they're going to arrive out of order. So all of these points that are circled are data values that were produced at 8 o'clock. Some data comes in almost immediately at 8 o'clock, some other data comes in at 8:30, but there is some piece of data that comes in a full seven hours later. Then, if you're going to be computing an aggregate, what is the total of all of the things that were produced at 8 o'clock? Any result that you compute at 9 o'clock is not going to be correct, because you're going to have some data that comes in that late. And because of this, because continuously arriving data can come out of order and can be delayed, because latency is a fact of life, that's why people built two pipelines.

You could take your processing and split it into processing-time windows. You could say, I'm going to compute, at 9 o'clock, all the data received between 8 and 9; I'm going to compute, at 10 o'clock, all the data received between 9 and 10; and so on. But of course, if you do that, any result that you compute at 10 o'clock is not going to contain the late event that arrived at 1 PM, and so the result that you computed at 10 o'clock is going to be incorrect. And if you use fixed time windows, you're going to lose information about related events, and not just about related events that arrive late. If you recall, in our diagram we had some data that arrived almost immediately. If, for example, we split our data set into processing-time windows and we compute, at 10 o'clock, all the data that arrived between 9:00 and 10:00, what happens to data that were produced at 8 o'clock but happened to arrive at 8:45? They're going to be part of the 8-to-9 fixed time window; they're not going to be included in the 9-to-10 window, and that's wrong, because the related 8 o'clock events have now been split across different windows. So now, when you say that at 8 o'clock the total number of trades was 300,000 and odd, any trade information that actually arrived less than an hour late is a problem, or less than a minute late. You can treat this as hours or minutes or seconds or however you want to think about it, depending on the problem you're working on; the concept is the same. So what we want is one pipeline that allows you to carry out these trade-offs: get accuracy, deal with low latency, deal with high latency. What we want is a unified model for processing batch and stream, something that can run on both batch data and stream data, and that's what Apache Beam gives us.
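As a rough illustration of that unified model, here is a hedged sketch in the Beam Java SDK in which the same windowed aggregation is applied whether the input is a file or a Pub/Sub topic; only the source changes. The bucket path, topic name, window size, and the streaming flag are placeholders, not values from the course.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Count;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;

    public class UnifiedBatchAndStream {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        boolean streaming = true;  // flip to false to run the same logic over a file

        PCollection<String> events = streaming
            ? p.apply(PubsubIO.readStrings().fromTopic("projects/my-project/topics/traffic"))
            : p.apply(TextIO.read().from("gs://my-bucket/traffic/*.csv"));
        // (For the file case you would also attach event timestamps to each
        // element, as discussed later with outputWithTimestamp.)

        // Identical processing for both cases: event-time windows plus an aggregate.
        events
            .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(60))))
            .apply(Count.<String>perElement());

        p.run();
      }
    }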
So Apache Beam is an open source product based on internal Google technologies, but it's been open sourced, it's managed by the Apache Foundation, and it can run on multiple runtimes, not just on Cloud Dataflow. Cloud Dataflow is an execution framework that runs Apache Beam pipelines, but there are others as well: there are runners for Spark, there are runners for Flink, and so on. What we will talk about, though, is the runtime that is Dataflow.

So with Apache Beam, the key concept is something called a Window, and the idea behind a Window is that it is a time-based shuffle. In other words, you will basically compute a Window. Here's your input data that's streaming in; it's unbounded. And then you say, I want a window, say from the 8th minute to the 9th minute, and this window is going to contain the data whose timestamps fall between 8 and 9. So the key difference here is that it's not that we received the data between 8 and 9, but that the timestamp of the message says it was published between 8 and 9. All of the data that were published between 8 and 9 are going to get shuffled into this window, wherever they occur in the input data stream. So we are basically making a distinction between the event time and the processing time: the time at which the event occurred and the time at which we get to process that event. And the processing time, the time at which we get to process the event, could be later, because of latency. But we will know the event time, we will window all of our events based on that event time, and then we'll carry out the processing. So Beam supports this idea of a time-based shuffle, where you are essentially taking events out of the input stream and putting them into the right window to calculate your results.

Dataflow is a fully managed, autoscaling environment for Beam pipelines. The idea is that you write Beam code, you submit it to Dataflow, and Dataflow runs this Beam code on your behalf, keeping up with your data stream by adding new workers as necessary and removing workers as necessary. So it gives you a fully managed, autoscaling execution environment, and what we're going to be concerned about is primarily the code that we need to write, which is open source Apache Beam code.

>> When a publisher publishes a message, the message is associated with a timestamp. When a message is read from Pub/Sub by a subscriber, the data includes that timestamp. The timestamp is the basis of all windowing primitives, including watermarks, triggers, and lag monitoring of delayed messages. When performing stream processing, the timestamp helps identify and account for natural delays in the flow of data, such as those due to networking delays. Dataflow is able to account for and handle late-arriving messages and out-of-order messages by using the Pub/Sub timestamp. By default, the timestamp is set during the publishing process and represents the real time at which the message is published to Pub/Sub, which is a system time. There are cases where you'll want to override system time with your own timestamp. You can use a custom timestamp by publishing it as a Pub/Sub attribute; then you inform Dataflow using the timestamp label setter.
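For that custom-timestamp case, a sketch along these lines would apply, assuming the publisher attached the event time as a message attribute (arbitrarily named "timestamp" here). Note that timestampLabel was the name of this setter in the older Dataflow SDK; in current Apache Beam Java SDKs the equivalent setter is withTimestampAttribute.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.values.PCollection;

    public class ReadWithEventTimestamps {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // By default, each element's event time is the Pub/Sub publish time.
        // If the publisher also put the original event time into a message
        // attribute (assumed here to be called "timestamp"), this setter tells
        // the pipeline to use that attribute for windowing instead.
        PCollection<String> messages = p.apply(
            PubsubIO.readStrings()
                .fromTopic("projects/my-project/topics/traffic")
                .withTimestampAttribute("timestamp"));

        p.run();
      }
    }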
For example, when writing batch pipelines, the data is basically being read from a file and there's no timestamp inherently associated with it. Methods like TextIO do not support output with a timestamp, so to associate the messages with timestamps, you have to create that association in the PCollection. In this case, rather than using ProcessContext.output, you can use ProcessContext.outputWithTimestamp. In the example shown, the ProcessContext instance is called c, so it shows c.outputWithTimestamp. This simulates the behavior that would occur in the pipeline if there were a timestamp associated with every message. Another example is log processing. In this case, the timestamp you want to use has already been generated when the log was written; it's the timestamp already embedded in the log record. Here you parse the contents of the log record, extract the timestamp, and use it in ProcessContext.outputWithTimestamp to associate the message with the log timestamp. The rest of the pipeline will then treat the data as if the logs were being generated in real time. Later in the code, there will be a GroupByKey, and each time GroupByKey is executed, it performs sums, counts, groupings, and aggregations, all carried out in the context of a single window. So the important thing to understand is that this code changes a cumulative action into a windowed action. If you were to calculate one mean per key without this code, it would be one mean per key over the entire dataset. But once this apply method is inserted into the code, the mean per key will be calculated repeatedly, each time within a two-minute window.
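Putting those pieces together, here is a hedged sketch of the pattern just described: parse a log record, extract its embedded timestamp, emit the element with c.outputWithTimestamp, and then window before the aggregation so that the mean per key is computed per two-minute window rather than over the whole dataset. The file path, the record layout, and the choice of Mean.perKey are illustrative assumptions, not the course's actual lab code.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.Mean;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.KV;
    import org.joda.time.Duration;
    import org.joda.time.Instant;

    public class WindowedMeanFromLogs {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        p.apply(TextIO.read().from("gs://my-bucket/logs/*.log"))
            // Assumed record layout: "ISO8601_TIMESTAMP,sensorId,speed".
            .apply("AttachEventTime", ParDo.of(new DoFn<String, KV<String, Double>>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                String[] f = c.element().split(",");
                Instant eventTime = Instant.parse(f[0]);
                // outputWithTimestamp replays the log as if it arrived in real time.
                c.outputWithTimestamp(KV.of(f[1], Double.parseDouble(f[2])), eventTime);
              }
            }))
            // Without this line, the mean would be one value per key over the
            // entire dataset; with it, the mean is recomputed per two-minute window.
            .apply(Window.<KV<String, Double>>into(FixedWindows.of(Duration.standardMinutes(2))))
            .apply(Mean.<String, Double>perKey());

        p.run();
      }
    }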