Now that we know how to create our DAGs using the Airflow Python SDK, create our Composer environment, and access the Airflow web server, let's focus on what we need to build continuous training pipelines in Airflow and run them on Cloud Composer.

There are six stages of the continuous training process we want to address within the context of Airflow and Cloud Composer. First is data ingestion: how do you get your data into your pipeline? Then data validation: does the data you plan to use for training still satisfy your business rules? Has there been any drift in the data statistics over time? Next, data preparation: how are you going to preprocess your data and perform feature engineering? Then, model training: how do you train your model as part of your pipeline? Next we have model evaluation: should this new model be deployed? How does it compare to previous models? Finally, we have model deployment: how can you deploy your model as part of your pipeline? Let's now go through each of these steps one by one.

We can move data around by using various built-in operators. For example, we can use a BigQuery operator together with a BigQuery to Cloud Storage operator to select the data we want to export with a SQL query, save it to a BigQuery table, and then export it to a Cloud Storage bucket. We can use other operators, such as the BigQuery Data Transfer Service start transfer runs operator and similar operators, to move our data from other clouds or from on-premises systems, for example over a JDBC driver for relational databases, into Google Cloud for the later steps of our pipeline.

After our data is in place, we can proceed to data validation. We want to ensure that the data still meets our business rules and that we haven't seen drift over time. Another reason we may want to validate data is that we may not want to rerun the training pipeline if we do not have any new data; in that case, we're just repeating what we've already done. We can use BigQueryCheckOperators, BigQueryIntervalCheckOperators, and BigQueryValueCheckOperators to ensure that our data meets the expected rules. In the case of a BigQueryCheckOperator, the task generated by the operator will fail if the query returns a certain result, such as zero, null, or false.

Here's a concrete example of such a task, sketched in code below. We define our SQL statement, which is named check_sql, and then we run it as part of the BigQueryCheckOperator. If the query returns a non-zero value, the task succeeds, but if the query returns a value of zero, it fails. However, you may have noticed something unusual in the SQL query: between double curly braces is macros.ds_add. This is an example of using the Jinja templating capabilities of Airflow. ds_add will add a specified number of days to the date stamp represented by ds. When we work with Airflow, ds represents the date part of the execution timestamp. Now we can be more specific about what this query is doing: we are counting the number of rows where the trip_start_timestamp is no more than 30 days old. If we do not have any fresh data within the last 30 days, we don't want to rerun our monthly pipeline. This is an easy way to build this logic into your workflow.

We mentioned a couple of other check operators before; let us also explain those. BigQueryIntervalCheckOperators check that the values of metrics given as SQL expressions are within a certain tolerance of the values from a chosen number of days before. If you're worried about data drift, these operators are especially useful.
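Here is a minimal sketch of the freshness check just described, assuming a monthly DAG and a hypothetical my-project.taxi_dataset.taxi_trips table (the table name, project ID, and DAG settings are placeholders, not the course's exact code). Exact import paths and operator names vary across Airflow and Google provider releases, so treat this as illustrative.

# Minimal sketch of the data-freshness check; names and settings are hypothetical.
from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryCheckOperator

# Count trips that started within 30 days of the run's date stamp (ds).
# {{ macros.ds_add(ds, -30) }} is rendered by Airflow's Jinja templating at runtime.
check_sql = """
SELECT COUNT(*)
FROM `my-project.taxi_dataset.taxi_trips`
WHERE trip_start_timestamp > TIMESTAMP('{{ macros.ds_add(ds, -30) }}')
"""

with DAG(
    dag_id="continuous_training_pipeline",
    schedule_interval="@monthly",
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    # The task fails if the query returns 0, NULL, or False, halting downstream training tasks.
    check_for_fresh_data = BigQueryCheckOperator(
        task_id="check_for_fresh_data",
        sql=check_sql,
        use_legacy_sql=False,
    )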
BigQueryValueCheckOperators check that the result of a query is within a certain tolerance of an expected pass value. Unlike the interval check operator, this value is set in advance and does not change automatically over time. Note that if you can query your data using SQL, there are generic SQL versions of all three of these check operators; you don't need to load your data into BigQuery in order to use them.

Finally, what do we do when a check operator fails? We don't necessarily want the entire workflow to fail, but we might want to do something with that information. Here we have a PubSubPublishOperator that will publish a message to a preset Pub/Sub topic. This message can let other systems or downstream processes know that the DAG run was canceled due to a reason like a lack of fresh data. Note that the trigger rule here is TriggerRule.ALL_FAILED: this task will only trigger if all upstream dependencies fail, instead of the default behavior of triggering when all upstream dependencies succeed. We place this operator directly downstream of the BigQueryCheckOperator we defined before.

Next, we're ready to prepare our data for training. This stage will often involve preprocessing if it hasn't been done already, engineering new features, and creating a repeatable split of your data into training and test data sets. Let us talk about a few of the options available on Google Cloud. Any data preparation that can be done using SQL can be done using a BigQuery operator, as before. If your code is written for Hadoop or Spark, you can use Dataproc. A DataprocCreateClusterOperator will create your cluster; you can then use a DataprocSubmitJobOperator to submit your Hadoop or Spark job, and finally use the DataprocDeleteClusterOperator to delete your cluster when the job has finished running. If your data preprocessing job is written using Apache Beam, use the DataFlowPythonOperator or the DataFlowJavaOperator, depending on which of the SDKs you used.
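Tying these pieces together, here is a hedged sketch of how the failure notification and a Dataproc-based preparation step might be wired into the same DAG as the check task sketched earlier. The project ID, topic, region, cluster, bucket, and job settings are hypothetical placeholders, and the operator names shown (PubSubPublishMessageOperator, DataprocCreateClusterOperator, and so on) come from recent Google provider packages; older releases use slightly different names, such as the PubSubPublishOperator mentioned above.

# (Continuing inside the same `with DAG(...)` block as the check task above.)
from airflow.providers.google.cloud.operators.pubsub import PubSubPublishMessageOperator
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateClusterOperator,
    DataprocSubmitJobOperator,
    DataprocDeleteClusterOperator,
)
from airflow.utils.trigger_rule import TriggerRule

# Runs only if the upstream check fails, telling downstream systems the run was canceled.
notify_no_fresh_data = PubSubPublishMessageOperator(
    task_id="notify_no_fresh_data",
    project_id="my-project",                       # hypothetical project ID
    topic="ct-pipeline-events",                    # hypothetical Pub/Sub topic
    messages=[{"data": b"DAG run canceled: no fresh data"}],
    trigger_rule=TriggerRule.ALL_FAILED,
)

# Ephemeral Dataproc cluster for a PySpark preprocessing job: create, submit, delete.
create_cluster = DataprocCreateClusterOperator(
    task_id="create_cluster",
    project_id="my-project",
    region="us-central1",
    cluster_name="ct-prep-cluster",
    cluster_config={
        "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-4"},
    },
)

preprocess_data = DataprocSubmitJobOperator(
    task_id="preprocess_data",
    project_id="my-project",
    region="us-central1",
    job={
        "placement": {"cluster_name": "ct-prep-cluster"},
        "pyspark_job": {"main_python_file_uri": "gs://my-bucket/preprocess.py"},  # hypothetical script
    },
)

delete_cluster = DataprocDeleteClusterOperator(
    task_id="delete_cluster",
    project_id="my-project",
    region="us-central1",
    cluster_name="ct-prep-cluster",
    trigger_rule=TriggerRule.ALL_DONE,             # clean up the cluster even if the job fails
)

# Wire the tasks: the check gates both the notification and the preparation stage.
check_for_fresh_data >> notify_no_fresh_data
check_for_fresh_data >> create_cluster >> preprocess_data >> delete_cluster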