Now that you know what Apache Airflow and Cloud Composer are, let's explore the core concept of using Apache Airflow to orchestrate your workflows. What is a directed acyclic graph, or DAG? A DAG is a collection of the tasks you want to run, represented by the nodes of the graph, organized in a way that reflects their relationships and dependencies, which are represented by the edges of the graph. In Airflow, we use a Python SDK to define the DAGs, the tasks, and the dependencies as code. Every DAG has a definition, operators, and definitions of the operator relationships. In the image at the bottom of the slide, we have the first part of a DAG from a continuous training pipeline. We can see the tasks, with their names, as the nodes of the DAG, and the arrows between the nodes representing the dependencies between the tasks.

If the DAG is a representation of the tasks and dependencies, then what is a DAG run? A DAG run is a physical instance of that DAG, containing task instances that run for a specific execution date. The execution date is the logical date and time that the DAG run and its task instances are running for. The Airflow scheduler, which is managed by Composer within a Kubernetes pod, will usually create the DAG runs, but they can also be created by external triggers. Note once again the distinction between a DAG and a DAG run: a DAG can have multiple runs, even concurrently, each with a different execution date.

Finally, let's talk about the tasks and operators that define the work being done in your workflows. A task specifies a unit of work in your workflow. What are these tasks? They are parameterized implementations of operators, which we will discuss in the next section. Operators are usually, but not always, atomic. This means that they can work alone and don't need to share resources with any other operators. The DAG will make sure that operators run in the correct order; other than those dependencies, operators generally run independently. In Airflow, you can leverage XComs if you do need to pass information between different operators.

Airflow supports three main types of operators: operators that perform an action or tell another system to perform an action; transfer operators that move data from one system to another, for example from BigQuery to Cloud Storage; and finally sensors, which keep running until a specific criterion is met, such as a file sensor that waits until a file is present at a certain location before triggering any downstream tasks.

With the right terminology explained, we're ready to build our DAGs. As a reminder, an Airflow DAG is defined in a Python script. These scripts have five main sections: imports, arguments, instantiation of the DAG, task definitions, and dependencies. We highly recommend that you don't include code outside of the DAG definition in these scripts. Airflow checks these scripts about once a second to look for updates, so any code outside of the DAG definition will be run just as often. Keep that in mind.

First are the imports. Here we import what we need to define our DAG: the DAG class and the Variable class, which we'll see more about later. Next, we import the various operators we will use in our DAG. We also import the TriggerRule class to give us more control over how dependencies are managed. As a quick note, we see a new operator here, the DummyOperator. Tasks created from this operator do nothing, but they are useful when you logically need a node in your graph at a certain location.
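As a sketch of what that imports section might look like (the import paths below follow the Airflow 1.x layout used by Cloud Composer at the time, and the specific operators chosen are illustrative assumptions rather than the exact ones on the slide):

```python
# Imports section of a DAG file: the DAG and Variable classes,
# the operators the workflow will use, and TriggerRule for
# finer control over how dependencies are evaluated.
from airflow import DAG
from airflow.models import Variable
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
from airflow.utils.trigger_rule import TriggerRule
```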
Next, we define our default and DAG-specific arguments. Often you will see the default arguments defined as a dictionary. These arguments can be used when creating any task, and they include data like the DAG owner, start date, email address, options to email on failure or on retry, retry limits, and retry delay. The depends_on_past argument here is important. From the official documentation: depends_on_past, when set to true, keeps a task from being triggered if the previous schedule for the task did not succeed. Unless a previous run of your DAG has failed, depends_on_past should not be a factor; it will not affect the current run at all if the previous run executed the task successfully.

Next, we instantiate our DAG. All the code for our DAG will be written within the context of a DAG object. Here we set our DAG ID, which will serve as the unique identifier we will see in the Airflow webserver. We will also see other options here, such as catchup and the schedule interval. In this case, we're using a special syntax that says to run the DAG monthly, at midnight on the first day of the month. You can also pass in cron syntax as a string or use a datetime timedelta object instead. But what does catchup mean? Remember that we had a start date as one of our default arguments. If the start date was a year ago, what happens to the runs we missed? If catchup is false, we simply forget them. If catchup is true, however, we will backfill the runs with the appropriate execution date for each one.

For the most common tasks, Airflow provides built-in operators, such as a BashOperator for executing a Bash command and a PythonOperator for executing an arbitrary Python callable. Refocusing on Google Cloud, there is a long list of contributed operators for Google Cloud products. We list a few here, such as BigQuery, Cloud Storage, Dataproc, Dataflow, Cloud Build, and AI Platform. In the example on this slide, we have a BigQuery operator. We give the instance of the operator, or task, a task ID. This task ID will be the name of the task within the DAG; it will be used for reference and in the visual representation of the DAG in the Airflow webserver.

Now that we know how to create a task, how do we define the dependencies? We do this using the set_upstream and set_downstream methods. Note that we can also do this using bitshift operators. For example, the four expressions here are functionally the same: op1 >> op2 is the same as op1.set_downstream(op2), and likewise op2 << op1 is the same as op2.set_upstream(op1). If a task has multiple upstream or downstream dependencies, we can use a list to make our notation more concise. In the example here, we can compress two separate statements into a single statement, as you can see at the bottom of the slide.

Now that we have a DAG ready to go, how do we create and access our environment to run it? We can access the Composer environment via the Google Cloud Console, the Google Cloud SDK CLI, or REST APIs. In the Google Cloud Console, it is easy to set up our Cloud Composer environment after choosing a few options and pressing "Submit". After the environment is set up, we will find a link to the folder in Cloud Storage where our DAGs will be stored and a link to the Airflow webserver. Using the Google Cloud SDK CLI, we can create, modify, and delete our Composer environments. We can also run commands via the Airflow CLI using gcloud composer environments run, with the environment name and the Airflow CLI command we want to run.
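To pull together the arguments, DAG instantiation, task definition, and dependency sections described above, here is a minimal sketch of a complete DAG file. It is not the DAG from the slides: the DAG ID, task IDs, query, and dates are hypothetical, and it assumes Airflow 1.x-style import paths and the contributed BigQueryOperator.

```python
import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

# Default arguments applied to every task unless overridden on the task itself.
default_args = {
    'owner': 'data-team',                            # hypothetical owner
    'depends_on_past': False,
    'start_date': datetime.datetime(2020, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
}

# Instantiate the DAG; all tasks are defined inside this context.
with DAG(
    dag_id='example_training_pipeline',   # unique ID shown in the Airflow webserver
    default_args=default_args,
    schedule_interval='@monthly',         # midnight on the first day of each month
    catchup=False,                        # don't backfill runs between start_date and now
) as dag:

    start = DummyOperator(task_id='start')

    # Hypothetical BigQuery task; the project, dataset, and query are placeholders.
    extract = BigQueryOperator(
        task_id='bq_extract_training_data',
        sql='SELECT * FROM `my_project.my_dataset.my_table`',
        use_legacy_sql=False,
    )

    end = DummyOperator(task_id='end')

    # Dependencies in bitshift notation, equivalent to
    # start.set_downstream(extract) and extract.set_downstream(end).
    start >> extract >> end
```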
Finally, we can create our environment by using the REST API. We construct an environments.create API request and provide the configuration information required. For information about this and all the other operations you can perform with the REST API, please see the Cloud Composer REST API documentation.

Last of all, how do we get our files into our Airflow environment for DAGs, dependencies, and custom plug-ins? Cloud Composer performs a periodic one-way rsync between the Cloud Storage locations listed here and the corresponding local directories. All you need to do is make sure that your files make it to the right location in Cloud Storage, and the rest will be managed for you. You can keep workflow-related data and task logs up to date in the corresponding Cloud Storage locations by using Cloud Storage FUSE, an open-source FUSE adapter that allows you to mount Cloud Storage buckets as file systems on Linux or macOS systems.
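Because deploying a DAG simply means copying the file into the environment's dags/ folder in Cloud Storage, here is one possible sketch of doing that from Python with the google-cloud-storage client library. The bucket name and file name are placeholders; you would find the real DAGs bucket on your environment's details page, and you could just as well upload the file with gsutil or through the Console.

```python
from google.cloud import storage

# Hypothetical values: substitute the DAGs bucket shown on your
# Composer environment's details page and your own DAG file.
DAGS_BUCKET = 'us-central1-my-environment-12345678-bucket'
LOCAL_DAG_FILE = 'example_training_pipeline.py'

client = storage.Client()
bucket = client.bucket(DAGS_BUCKET)

# Copy the DAG file into the bucket's dags/ folder; Composer's periodic
# one-way rsync then distributes it to the Airflow workers and webserver.
blob = bucket.blob(f'dags/{LOCAL_DAG_FILE}')
blob.upload_from_filename(LOCAL_DAG_FILE)
print(f'Uploaded {LOCAL_DAG_FILE} to gs://{DAGS_BUCKET}/dags/')
```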