Next, let's look at Dataflow's windowing capabilities. This is really Dataflow's strength when it comes to streaming. Dataflow gives us three different types of windows: fixed, sliding, and sessions. Fixed windows are those that divide time into slices, for example hourly, daily, or monthly; fixed time windows consist of consistent, non-overlapping intervals. Sliding windows are those you use for computations like "give me five minutes' worth of data, and compute that every 30 minutes." Sliding time windows can overlap, for example in a running average. Session windows are defined by a minimum gap duration, and the timing is triggered by another element. Session windows are for situations where the communication is bursty; a session might correspond to a web session, for example a user who comes in, visits four or five pages, and leaves. You can capture that as a session window. Any key in your data can be used as a session key. The window has a timeout period, and it flushes at the end of that timeout.

Here's how we can set these different types of windows in Python. In the fixed time window example, we use beam.WindowInto and window.FixedWindows with an argument of 60 to get fixed windows starting every 60 seconds. In the second example, with a sliding time window, we use window.SlidingWindows with arguments 30 and 5. The first argument is the length of the window, 30 seconds; the second argument is how often new windows open, every five seconds. Finally, we have the session window example: we use window.Sessions with an argument of 10 multiplied by 60 to define a session window with a timeout of 10 minutes, that is, 600 seconds.

How does windowing work? All things being equal, in an ideal world with no latency, where everything is instantaneous, these fixed time windows would simply flush at the close of the window: at the very microsecond it becomes 8:05, a five-minute window terminates and flushes all of its data. But that only holds if there is no latency, and in the real world latency happens: network delays, system backlogs, processing delays, Pub/Sub latency, and so on. So when do we want to close the window? Should we wait a little longer than 8:05, maybe a few more seconds?

This is what we call the watermark, and Dataflow keeps track of it automatically. Basically, it keeps track of the lag time. It can do this, for example, when you are using the Pub/Sub connector, because it knows the timestamp of the oldest unprocessed message in Pub/Sub and the timestamp of the latest message it has processed through Dataflow; the difference between the two is the lag time. Dataflow continuously computes the watermark, which is how far behind we are, and it ordinarily waits until that watermark has elapsed. If it is running a system lag of three or four seconds, it waits those few seconds before it flushes the window, because that is when it believes all of the data for that time period should have arrived.

What then happens to late data? Say Dataflow gets an event with a timestamp of 8:04, but it is now 8:06: the event is two minutes late, one minute after the close of the window. What does it do with that data? The answer is, you get to choose. The default is simply to discard it, but you can also tell Dataflow to reprocess the window based on those late arrivals.
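To make the window definitions above concrete, here is a minimal sketch using the Apache Beam Python SDK; the pipeline, the input events, and their timestamps are placeholders, but the window arguments mirror the values described earlier.

import apache_beam as beam
from apache_beam.transforms import window

with beam.Pipeline() as pipeline:
    # Placeholder input: two keyed events, both stamped at t = 0 seconds.
    events = (
        pipeline
        | beam.Create([('user1', 1), ('user2', 1)])
        | beam.Map(lambda kv: window.TimestampedValue(kv, 0)))

    # Fixed windows starting every 60 seconds.
    fixed = events | 'fixed' >> beam.WindowInto(window.FixedWindows(60))

    # Sliding windows 30 seconds long, with a new window opening every 5 seconds.
    sliding = events | 'sliding' >> beam.WindowInto(window.SlidingWindows(30, 5))

    # Session windows per key, flushed after a 10-minute (600-second) gap.
    sessions = events | 'sessions' >> beam.WindowInto(window.Sessions(10 * 60))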
Beam's default windowing configuration tries to determine when all data has arrived based on the type of data source, and then advances the watermark past the end of the window. This default configuration does not allow late data; the default behavior is to trigger at the watermark. If you don't specify a trigger, you are actually using the AfterWatermark trigger. AfterWatermark is an event time trigger, meaning the messages' timestamps are used to measure time. We could also apply any other trigger based on event time, or add custom triggers. If a trigger is based on processing time, the actual wall-clock time is used to decide when to emit results. For instance, you could decide to emit results exactly every 30 seconds, regardless of the timestamps of the messages that have arrived in the window. AfterCount is an example of a data-driven trigger: rather than emitting results based on time, it triggers based on the amount of data that has arrived within the window.

The combination of several types of triggers opens up a world of possibilities with streaming pipelines. We may emit some results early using AfterProcessingTime, then again at the watermark when the data is complete, and then again for the next five messages that arrive late, after the watermark. So now we know different techniques for handling the accumulation of late-arriving data. We also know that triggers are used to initiate the accumulation, and that watermarks help in determining the lag time and the related corrective actions for computing accumulations.

The code in this example creates a sample trigger. As you can see in the code, we create a sliding window of 60 seconds that slides every five seconds. The AfterWatermark trigger specifies when to trigger the accumulation, and the code uses two options: first, an early or speculative firing, which is set to 30 seconds; and second, a late firing for each late-arriving item. The second code segment demonstrates a composite trigger. The composite trigger activates either after 100 elements are available for accumulation or every 60 seconds, irrespective of the watermark; this code segment uses a fixed window of one minute's duration. This is how windows process late-arriving data, and this late processing works in both Java and Python. When you set a trigger, you need to choose either accumulating mode or discarding mode. This example shows the different behaviors caused by the intersection of windowing, triggers, and accumulation mode.
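The triggers described above could be written roughly as follows in the Beam Python SDK; this is a minimal sketch, and the allowed-lateness value is a placeholder rather than something specified in the lesson.

import apache_beam as beam
from apache_beam.transforms import window
from apache_beam.transforms.trigger import (
    AccumulationMode, AfterAny, AfterCount, AfterProcessingTime,
    AfterWatermark, Repeatedly)

# Sliding window of 60 seconds that slides every 5 seconds, with an early
# (speculative) firing 30 seconds after the first element arrives in a pane
# and a late firing for each element that arrives after the watermark.
early_and_late = beam.WindowInto(
    window.SlidingWindows(60, 5),
    trigger=AfterWatermark(
        early=AfterProcessingTime(30),
        late=AfterCount(1)),
    accumulation_mode=AccumulationMode.ACCUMULATING,
    allowed_lateness=10 * 60)  # placeholder: late firings need a lateness > 0

# Composite trigger on a one-minute fixed window: fire after 100 elements
# or after 60 seconds of processing time, whichever comes first,
# irrespective of the watermark.
composite = beam.WindowInto(
    window.FixedWindows(60),
    trigger=Repeatedly(AfterAny(AfterCount(100), AfterProcessingTime(60))),
    accumulation_mode=AccumulationMode.DISCARDING)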