Airflow: how to add a new DAG
Tutorial
This tutorial walks you through some of the fundamental Airflow concepts, objects, and their usage while writing your first pipeline.
Example Pipeline definition
Here is an example of a basic pipeline definition. Do not worry if this looks complicated; a line-by-line explanation follows below.
It’s a DAG definition file
Importing Modules
An Airflow pipeline is just a Python script that happens to define an Airflow DAG object. Let’s start by importing the libraries we will need.
See Modules Management for details on how Python and Airflow manage modules.
Default Arguments
We’re about to create a DAG and some tasks, and we have the choice to explicitly pass a set of arguments to each task’s constructor (which would become redundant), or (better!) we can define a dictionary of default parameters that we can use when creating tasks.
For more information about the BaseOperator’s parameters and what they do, refer to the airflow.models.BaseOperator documentation.
Also, note that you could easily define different sets of arguments that would serve different purposes. An example of that would be to have different settings between a production and development environment.
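As a minimal sketch of the idea above, the following plain-Python snippet derives two sets of default arguments from a shared base. The keys (owner, depends_on_past, retries, retry_delay, email_on_failure) are BaseOperator parameters; the environment names, the values, and the helper build_default_args are assumptions made for illustration only.

```python
from datetime import timedelta

# Settings shared by every environment.
BASE_ARGS = {
    "owner": "airflow",
    "depends_on_past": False,
}

# Per-environment overrides (illustrative values, not recommendations).
ENV_ARGS = {
    "production": {
        "retries": 3,
        "retry_delay": timedelta(minutes=5),
        "email_on_failure": True,
    },
    "development": {
        "retries": 0,
        "email_on_failure": False,
    },
}


def build_default_args(environment):
    """Merge the shared settings with the overrides for one environment."""
    return {**BASE_ARGS, **ENV_ARGS[environment]}


# The resulting dictionary is what you would pass as default_args.
default_args = build_default_args("development")
print(default_args)
```

Keeping the shared keys in one place means an environment only has to state what differs.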
Instantiate a DAG
Tasks
Tasks are generated when instantiating operator objects. An object instantiated from an operator is called a task. The first argument task_id acts as a unique identifier for the task.
The precedence rules for a task are as follows:
Explicitly passed arguments
Values that exist in the default_args dictionary
The operator’s default value, if one exists
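The precedence rules above can be modelled in a few lines of plain Python. This is only an illustration of the lookup order, not Airflow's implementation; resolve_argument and the _NOT_SET sentinel are hypothetical names introduced for the sketch.

```python
# Sentinel meaning "the caller did not supply this value".
_NOT_SET = object()


def resolve_argument(name, explicit=_NOT_SET, default_args=None, operator_default=_NOT_SET):
    """Return the value a task would end up with for the argument `name`."""
    if explicit is not _NOT_SET:                # 1. explicitly passed argument
        return explicit
    if default_args and name in default_args:   # 2. value from default_args
        return default_args[name]
    if operator_default is not _NOT_SET:        # 3. the operator's own default
        return operator_default
    raise TypeError(f"missing required argument: {name}")


default_args = {"retries": 1}

print(resolve_argument("retries", explicit=3, default_args=default_args, operator_default=0))  # 3
print(resolve_argument("retries", default_args=default_args, operator_default=0))              # 1
print(resolve_argument("retries", operator_default=0))                                         # 0
```

The final branch mirrors the case where a required argument is supplied nowhere, which has to end in an error.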
Templating with Jinja
Airflow leverages the power of Jinja Templating and provides the pipeline author with a set of built-in parameters and macros. Airflow also provides hooks for the pipeline author to define their own parameters, macros and templates.
This tutorial barely scratches the surface of what you can do with templating in Airflow, but the goal of this section is to let you know this feature exists, get you familiar with double curly brackets, and point to the most common template variable: {{ ds }} (today’s “date stamp”).
The params hook in BaseOperator allows you to pass a dictionary of parameters and/or objects to your templates. Please take the time to understand how the parameter my_param makes it through to the template.
Using that same DAG constructor call, it is possible to define user_defined_macros, which allow you to specify your own variables. For example, passing dict(foo='bar') to this argument allows you to use {{ foo }} in your templates. Moreover, specifying user_defined_filters allows you to register your own filters. For example, passing dict(hello=lambda name: 'Hello %s' % name) to this argument allows you to use {{ 'world' | hello }} in your templates. For more information regarding custom filters, have a look at the Jinja Documentation.
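To see how these pieces fit together, here is a small sketch that uses the jinja2 library directly rather than Airflow. The context dictionary stands in for what Airflow would supply at render time; the date value and the my_param text are assumptions chosen for the example.

```python
from jinja2 import Environment

env = Environment()
# Counterpart of user_defined_filters=dict(hello=lambda name: 'Hello %s' % name)
env.filters["hello"] = lambda name: "Hello %s" % name

# Stand-ins for the values Airflow would make available to a template.
context = {
    "ds": "2015-06-01",                                # the date stamp
    "params": {"my_param": "Parameter I passed in"},   # the params dictionary
    "foo": "bar",                                      # user_defined_macros=dict(foo='bar')
}

template = env.from_string(
    "ds={{ ds }} my_param={{ params.my_param }} foo={{ foo }} greeting={{ 'world' | hello }}"
)
rendered = template.render(**context)
print(rendered)
# ds=2015-06-01 my_param=Parameter I passed in foo=bar greeting=Hello world
```

In a real DAG you would not build the environment yourself; Airflow renders the templated fields of each operator for you.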
Adding DAG and Tasks documentation
We can add documentation for a DAG or for each individual task. DAG documentation only supports Markdown so far, while task documentation supports plain text, Markdown, reStructuredText, JSON, and YAML. The DAG documentation can be written as a docstring at the beginning of the DAG file (recommended) or anywhere else in the file. Below you can find some examples on how to implement task and DAG docs, as well as screenshots:
Setting up Dependencies
Note that when executing your script, Airflow will raise exceptions when it finds cycles in your DAG or when a dependency is referenced more than once.
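The cycle check can be illustrated with the standard library alone. The sketch below assumes Python 3.9 or later (for graphlib) and is not how Airflow performs the check internally; find_run_order and the task ids are hypothetical.

```python
from graphlib import CycleError, TopologicalSorter


def find_run_order(upstream):
    """Return a valid run order, or raise CycleError if the graph has a cycle.

    `upstream` maps each task id to the set of task ids it depends on.
    """
    return list(TopologicalSorter(upstream).static_order())


# t2 and t3 both depend on t1: a valid, acyclic graph.
acyclic = {"t2": {"t1"}, "t3": {"t1"}}
print(find_run_order(acyclic))

# t1 -> t2 -> t3 -> t1: a cycle, so no run order exists.
cyclic = {"t1": {"t3"}, "t2": {"t1"}, "t3": {"t2"}}
try:
    find_run_order(cyclic)
except CycleError as err:
    print("cycle detected:", err.args[1])
```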
Recap
Alright, so we have a pretty basic DAG. At this point your code should look something like this:
Testing
Running the Script
Time to run some tests. First, let’s make sure the pipeline is parsed successfully.
If the script does not raise an exception it means that you have not done anything horribly wrong, and that your Airflow environment is somewhat sound.
Command Line Metadata Validation
Let’s run a few commands to validate this script further.
Testing
Now remember what we did with templating earlier? See how this template gets rendered and executed by running this command:
This should result in displaying a verbose log of events and ultimately running your bash command and printing the result.
Note that the airflow tasks test command runs task instances locally, outputs their log to stdout (on screen), does not bother with dependencies, and does not communicate state (running, success, failed, …) to the database. It simply allows testing a single task instance.
Backfill
Everything looks like it’s running fine so let’s run a backfill. backfill will respect your dependencies, emit logs into files and talk to the database to record status. If you do have a webserver up, you will be able to track the progress. airflow webserver will start a web server if you are interested in tracking the progress visually as your backfill progresses.
What’s Next?
That’s it, you have written, tested and backfilled your very first Airflow pipeline. Merging your code into a code repository that has a master scheduler running against it should cause it to be triggered and run every day.
Here are a few things you might want to do next:
Read the Concepts section for detailed explanation of Airflow concepts such as DAGs, Tasks, Operators, and more.
Keep reading the docs!
Testing
Let’s test by running the actual task instances for a specific date. The date specified in this context is called the logical date (also called execution date for historical reasons), which simulates the scheduler running your task or DAG for a specific date and time, even though it physically will run now (or as soon as its dependencies are met).
Pipeline Example
Let’s look at another example: we need to get some data from a file that is hosted online and insert it into our local database. We also need to remove duplicate rows while inserting.
Initial setup
We need to have Docker and Postgres installed. We will be using this docker file. Follow the instructions properly to set up Airflow.
Create an Employee table in Postgres using this:
Let’s break this down into 2 steps: get data & merge data:
Here we send a GET request to fetch the data from the URL and save it in the employees.csv file on our Airflow instance, then dump the file into a temporary table before merging the data into the final employees table.
Here we first look for duplicate values and remove them before we insert new values into our final table.
Let’s look at our DAG:
This DAG runs daily at 00:00. Add this Python file to the airflow/dags folder, then go back to the main folder and run
Go to http://localhost:8080/home in your browser and trigger your DAG Airflow Example.
The DAG ran successfully, as we can see from the green boxes. If there had been an error, the boxes would be red. Before the DAG run my local table had 10 rows; after the DAG run it had approximately 100 rows.
Pipeline Example
Let’s look at another example: we need to get some data from a file that is hosted online and insert it into our local database. We also need to remove duplicate rows while inserting.
Initial setup
We need to have Docker installed as we will be using the quick-start docker-compose installation for this example. The steps below should be sufficient, but see the quick-start documentation for full instructions.
We will also need to create a connection to the postgres db. To create one via the web UI, from the “Admin” menu, select “Connections”, then click the Plus sign to “Add a new record” to the list of connections.
Fill in the fields as shown below. Note the Connection Id value, which we’ll pass as a parameter for the postgres_conn_id kwarg.
Connection Id: tutorial_pg_conn
Connection Type: postgres
Test your connection and if the test is successful, save your connection.
Table Creation Tasks
We can use the PostgresOperator to define tasks that create tables in our postgres db.
We’ll create one table to facilitate data cleaning steps (employees_temp) and another table to store our cleaned data (employees).
Optional Note:
and repeat for the employees_temp table.
Data Retrieval Task
Here we retrieve data, save it to a file on our Airflow instance, and load the data from that file into an intermediate table where we can execute data cleaning steps.
Data Merge Task
Here we select completely unique records from the retrieved data, then we check to see if any employee Serial Numbers are already in the database (if they are, we update those records with the new data).
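The merge logic described above can be tried out without a Postgres instance. The sketch below uses Python's built-in sqlite3 module as a stand-in (it assumes an SQLite build of 3.24 or later for the ON CONFLICT clause); the two-column table layout and the sample rows are assumptions made for illustration, not the tutorial's actual schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE employees ("Serial Number" INTEGER PRIMARY KEY, "Employee Name" TEXT);
    CREATE TABLE employees_temp ("Serial Number" INTEGER, "Employee Name" TEXT);
    INSERT INTO employees VALUES (1, 'Old Name');
    INSERT INTO employees_temp VALUES (1, 'New Name'), (2, 'Second'), (2, 'Second');
    """
)

# Take only DISTINCT rows from the staging table. When a Serial Number is
# already present in the final table, update that row instead of inserting.
# ("WHERE true" avoids a parsing ambiguity SQLite has with INSERT ... SELECT
# followed by ON CONFLICT.)
conn.execute(
    """
    INSERT INTO employees
    SELECT DISTINCT * FROM employees_temp WHERE true
    ON CONFLICT ("Serial Number") DO UPDATE
    SET "Employee Name" = excluded."Employee Name"
    """
)

rows = conn.execute('SELECT * FROM employees ORDER BY "Serial Number"').fetchall()
print(rows)  # [(1, 'New Name'), (2, 'Second')]
```

The existing record for serial number 1 is updated, and the duplicated staging row for serial number 2 is inserted only once.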
Completing our DAG:
We’ve developed our tasks; now we need to wrap them in a DAG, which enables us to define when and how tasks should run, and state any dependencies that tasks have on other tasks. The DAG below is configured to:
run every day at midnight starting on Jan 1, 2021,
only run once in the event that days are missed, and
time out after 60 minutes
And from the last line in the definition of the Etl DAG, we see:
the merge_data() task depends on the get_data() task,
the get_data() depends on both the create_employees_table and create_employees_temp_table tasks, and
the create_employees_table and create_employees_temp_table tasks can run independently.
Putting all of the pieces together, we have our completed DAG.
Save this code to a Python file in the /dags folder (e.g. dags/etl.py) and, after a brief delay, the Etl DAG will be included in the list of available DAGs on the web UI.
You can trigger the Etl DAG by unpausing it (via the slider on the left end) and running it (via the Run button under Actions).
In the Etl DAG’s Tree view, we see that all tasks ran successfully in all executed runs. Success!
DAGs
A DAG (Directed Acyclic Graph) is the core concept of Airflow, collecting Tasks together, organized with dependencies and relationships to say how they should run.
Here’s a basic example DAG:
Declaring a DAG
Or, you can use a standard constructor, passing the dag into any operators you use:
Or, you can use the @dag decorator to turn a function into a DAG generator:
Task Dependencies
A Task/Operator does not usually live alone; it has dependencies on other tasks (those upstream of it), and other tasks depend on it (those downstream of it). Declaring these dependencies between tasks is what makes up the DAG structure (the edges of the directed acyclic graph).
There are two main ways to declare individual task dependencies. The recommended one is to use the >> and << operators:
Or, you can also use the more explicit set_upstream and set_downstream methods:
There are also shortcuts to declaring more complex dependencies. If you want to make two lists of tasks depend on all parts of each other, you can’t use either of the approaches above, so you need to use cross_downstream:
And if you want to chain together dependencies, you can use chain:
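To make the difference between the two shortcuts concrete, here is a plain-Python model of the edges each one produces. It does not call Airflow; cross_downstream_edges and chain_edges are hypothetical helpers that only mirror the behaviour described above, and the chain model covers the simple case of a flat sequence of tasks.

```python
from itertools import product


def cross_downstream_edges(from_tasks, to_tasks):
    """Every task in from_tasks becomes upstream of every task in to_tasks."""
    return set(product(from_tasks, to_tasks))


def chain_edges(*tasks):
    """Each task becomes upstream of the next one in the sequence."""
    return set(zip(tasks, tasks[1:]))


print(sorted(cross_downstream_edges(["op1", "op2"], ["op3", "op4"])))
# [('op1', 'op3'), ('op1', 'op4'), ('op2', 'op3'), ('op2', 'op4')]

print(sorted(chain_edges("op1", "op2", "op3", "op4")))
# [('op1', 'op2'), ('op2', 'op3'), ('op3', 'op4')]
```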
Loading DAGs
This means you can define multiple DAGs per Python file, or even spread one very complex DAG across multiple Python files using imports.
Note, though, that when Airflow comes to load DAGs from a Python file, it will only pull any objects at the top level that are a DAG instance. For example, take this DAG file:
While both DAG constructors get called when the file is accessed, only dag_1 is at the top level (in the globals()), and so only it is added to Airflow. dag_2 is not loaded.
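The top-level rule can be reproduced with a few lines of plain Python. In this sketch the DAG class is a hypothetical stand-in for airflow.DAG, and collect_top_level_dags only imitates the idea of scanning a module's globals; it is not Airflow's loader.

```python
class DAG:
    """Hypothetical stand-in for airflow.DAG, just enough for this sketch."""

    def __init__(self, dag_id):
        self.dag_id = dag_id


# Source of an imaginary DAG file: one DAG at the top level, one inside a function.
DAG_FILE_SOURCE = '''
dag_1 = DAG("this_dag_will_be_discovered")

def my_function():
    dag_2 = DAG("but_this_dag_will_not")

my_function()
'''


def collect_top_level_dags(source):
    """Execute a DAG file and keep only the DAG instances found in its globals."""
    module_globals = {"DAG": DAG}
    exec(source, module_globals)
    return {name: obj for name, obj in module_globals.items() if isinstance(obj, DAG)}


found = collect_top_level_dags(DAG_FILE_SOURCE)
print(sorted(found))  # ['dag_1']
```

dag_2 is constructed when my_function runs, but it only ever exists as a local variable, so a scan of the module's globals never sees it.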
To consider all Python files instead, disable the DAG_DISCOVERY_SAFE_MODE configuration flag.
Running DAGs
DAGs will run in one of two ways:
When they are triggered either manually or via the API
On a defined schedule, which is defined as part of the DAG
DAGs do not require a schedule, but it’s very common to define one. You define it via the schedule_interval argument, like this:
The schedule_interval argument takes any value that is a valid Crontab schedule value, so you could also do:
Those DAG Runs will all have been started on the same actual day, but their execution_date values will cover those last 3 months, and that’s what all the tasks, operators and sensors inside the DAG look at when they run.
In much the same way a DAG instantiates into a DAG Run every time it’s run, Tasks specified inside a DAG also instantiate into Task Instances along with it.
DAG Assignment
Note that every single Operator/Task must be assigned to a DAG in order to run. Airflow has several ways of calculating the DAG without you passing it explicitly:
If you declare your Operator inside a with DAG block
If you declare your Operator inside a @dag decorator
If you put your Operator upstream or downstream of an Operator that has a DAG
Default Arguments
Often, many Operators inside a DAG need the same set of default arguments (such as their start_date). Rather than having to specify this individually for every Operator, you can instead pass default_args to the DAG when you create it, and it will auto-apply them to any operator tied to it:
The DAG decorator
New in version 2.0.
As well as the more traditional ways of declaring a single DAG using a context manager or the DAG() constructor, you can also decorate a function with @dag to turn it into a DAG generator function:
Control Flow
By default, a DAG will only run a Task when all the Tasks it depends on are successful. There are several ways of modifying this, however:
Branching
You can make use of branching in order to tell the DAG not to run all dependent tasks, but instead to pick and choose one or more paths to go down. This is where the branching Operators come in.
The BranchPythonOperator is much like the PythonOperator except that it expects a python_callable that returns a task_id (or list of task_ids). The task_id returned is followed, and all of the other paths are skipped.
The task_id returned by the Python function has to reference a task directly downstream from the BranchPythonOperator task.
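The follow-or-skip behaviour can be modelled without Airflow. In the sketch below, apply_branch is a hypothetical helper that takes the ids of the tasks directly downstream of the branch and a callable returning one task_id or a list of them; it only imitates the rule described above.

```python
def apply_branch(direct_downstream, python_callable):
    """Return {task_id: "followed" | "skipped"} for a branch's direct downstream tasks."""
    result = python_callable()
    chosen = {result} if isinstance(result, str) else set(result)
    unknown = chosen - set(direct_downstream)
    if unknown:
        # The callable must name tasks directly downstream of the branch task.
        raise ValueError(f"not directly downstream: {sorted(unknown)}")
    return {
        task_id: ("followed" if task_id in chosen else "skipped")
        for task_id in direct_downstream
    }


print(apply_branch(["branch_a", "branch_b"], lambda: "branch_a"))
# {'branch_a': 'followed', 'branch_b': 'skipped'}
```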
When a Task is downstream of both the branching operator and one or more of the selected tasks, it will not be skipped:
The BranchPythonOperator can also be used with XComs allowing branching context to dynamically decide what branch to follow based on upstream tasks. For example:
Latest Only
This special Operator skips all tasks downstream of itself if you are not on the “latest” DAG run (if the wall-clock time right now is between its execution_time and the next scheduled execution_time, and it was not an externally-triggered run).
Here’s an example:
In the case of this DAG:
task1 is directly downstream of latest_only and will be skipped for all runs except the latest.
task2 is entirely independent of latest_only and will run in all scheduled periods
Depends On Past
Trigger Rules
By default, Airflow will wait for all upstream tasks for a task to be successful before it runs that task.
However, this is just the default behaviour, and you can control it using the trigger_rule argument to a Task. The options for trigger_rule are:
all_success (default): All upstream tasks have succeeded
all_failed : All upstream tasks are in a failed or upstream_failed state
all_done : All upstream tasks are done with their execution
one_failed : At least one upstream task has failed (does not wait for all upstream tasks to be done)
one_success : At least one upstream task has succeeded (does not wait for all upstream tasks to be done)
dummy : No dependencies at all, run this task at any time
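The options listed above can be read as predicates over the states of the upstream tasks. The following sketch follows the wording of that list literally; should_run is a hypothetical helper, None stands for an upstream task that has not finished yet, and none of this is Airflow's scheduler code.

```python
def should_run(trigger_rule, upstream_states):
    """Decide whether a task may run, given the states of its upstream tasks.

    States used here: "success", "failed", "upstream_failed", "skipped",
    or None for an upstream task that has not finished yet.
    """
    states = list(upstream_states)
    if trigger_rule == "all_success":
        return all(s == "success" for s in states)
    if trigger_rule == "all_failed":
        return all(s in ("failed", "upstream_failed") for s in states)
    if trigger_rule == "all_done":
        return all(s is not None for s in states)
    if trigger_rule == "one_failed":
        # Does not wait for the remaining upstream tasks to finish.
        return any(s == "failed" for s in states)
    if trigger_rule == "one_success":
        # Does not wait for the remaining upstream tasks to finish.
        return any(s == "success" for s in states)
    if trigger_rule == "dummy":
        return True
    raise ValueError(f"unknown trigger rule: {trigger_rule}")


# A skipped upstream task blocks all_success but not all_done.
print(should_run("all_success", ["success", "skipped"]))  # False
print(should_run("all_done", ["success", "skipped"]))     # True
print(should_run("one_success", ["success", None]))       # True
```

The first two calls show why a skipped upstream task matters: it is "done" but not "successful".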
You can also combine this with the Depends On Past functionality if you wish.
It’s important to be aware of the interaction between trigger rules and skipped tasks, especially tasks that are skipped as part of a branching operation. You almost never want to use all_success or all_failed downstream of a branching operation.
By setting trigger_rule to none_failed_or_skipped in the join task, we can instead get the intended behaviour:
Dynamic DAGs
Since a DAG is defined by Python code, there is no need for it to be purely declarative; you are free to use loops, functions, and more to define your DAG.
For example, here is a DAG that uses a for loop to define some Tasks:
In general, we advise you to try and keep the topology (the layout) of your DAG tasks relatively stable; dynamic DAGs are usually better used for dynamically loading configuration options or changing operator options.
DAG Visualization
If you want to see a visual representation of a DAG, you have two options:
You can load up the Airflow UI, navigate to your DAG, and select “Graph View”
We generally recommend you use the Graph View, as it will also show you the state of all the Task Instances within any DAG Run you select.
Of course, as you develop out your DAGs they are going to get increasingly complex, so we provide a few ways to modify these DAG views to make them easier to understand.
TaskGroups
A TaskGroup can be used to organize tasks into hierarchical groups in Graph View. It is useful for creating repeating patterns and cutting down visual clutter.
Dependency relationships can be applied across all tasks in a TaskGroup with the >> and << operators. For example, the following code puts task1 and task2 in TaskGroup group1 and then puts both tasks upstream of task3:
If you want to see a more advanced use of TaskGroup, you can look at the example_task_group.py example DAG that comes with Airflow.
By default, child tasks/TaskGroups have their IDs prefixed with the group_id of their parent TaskGroup. This helps to ensure uniqueness of group_id and task_id throughout the DAG.
To disable the prefixing, pass prefix_group_id=False when creating the TaskGroup, but note that you will now be responsible for ensuring every single task and group has a unique ID of its own.
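The prefixing rule can be sketched as a small function. This is a simplified model, assuming IDs are joined with a dot (group1.task1) and that a single flag controls the whole chain of parent groups; qualified_id is a hypothetical helper, not part of Airflow.

```python
def qualified_id(local_id, parent_group_ids=(), prefix_group_id=True):
    """Build the ID a task or nested group ends up with.

    parent_group_ids lists the enclosing group ids, outermost first.
    """
    if not prefix_group_id:
        return local_id
    return ".".join([*parent_group_ids, local_id])


print(qualified_id("task1", ["group1"]))                         # group1.task1
print(qualified_id("task1", ["section_1", "inner"]))             # section_1.inner.task1
print(qualified_id("task1", ["group1"], prefix_group_id=False))  # task1
```

With prefixing disabled, two groups that each contain a task1 would collide, which is why uniqueness becomes your responsibility.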
Edge Labels
To add labels, you can use them directly inline with the >> and << operators:
Or, you can pass a Label object to set_upstream/set_downstream:
Here’s an example DAG which illustrates labeling different branches:
DAG & Task Documentation
It’s possible to add documentation or notes to your DAGs & task objects that are visible in the web interface (“Graph View” & “Tree View” for DAGs, “Task Instance Details” for tasks).
There are a set of special task attributes that get rendered as rich content if defined:
Please note that for DAGs, doc_md is the only attribute interpreted.
This is especially useful if your tasks are built dynamically from configuration files, as it allows you to expose the configuration that led to the related tasks in Airflow:
SubDAGs
Sometimes, you will find that you are regularly adding exactly the same set of tasks to every DAG, or you want to group a lot of tasks into a single, logical unit. This is what SubDAGs are for.
For example, here’s a DAG that has a lot of parallel tasks in two sections:
We can combine all of the parallel task-* operators into a single SubDAG, so that the resulting DAG resembles the following:
This SubDAG can then be referenced in your main DAG file:
You can zoom into a SubDagOperator from the graph view of the main DAG to show the tasks contained within the SubDAG:
Some other tips when using SubDAGs:
By convention, a SubDAG’s dag_id should be prefixed by the name of its parent DAG and a dot (parent.child)
You should share arguments between the main DAG and the SubDAG by passing arguments to the SubDAG operator (as demonstrated above)
Clearing a SubDagOperator also clears the state of the tasks within it.
Marking success on a SubDagOperator does not affect the state of the tasks within it.
Refrain from using Depends On Past in tasks within the SubDAG as this can be confusing.
You can specify an executor for the SubDAG. It is common to use the SequentialExecutor if you want to run the SubDAG in-process and effectively limit its parallelism to one. Using LocalExecutor can be problematic as it may over-subscribe your worker, running multiple tasks in a single slot.
See airflow/example_dags for a demonstration.
Packaging DAGs
While simpler DAGs are usually only in a single Python file, it is not uncommon that more complex DAGs might be spread across multiple files and have dependencies that should be shipped with them (“vendored”).
Note that packaged DAGs come with some caveats:
They cannot be used if you have pickling enabled for serialization
They cannot contain compiled libraries (e.g. libz.so), only pure Python
They will be inserted into Python’s sys.path and importable by any other code in the Airflow process, so ensure the package names don’t clash with other packages already installed on your system.
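As a rough sketch of what such a package can look like, the snippet below builds a zip archive with a DAG file at the root and a small vendored package next to it, using only the standard library. The file names (my_dag1.py, package1) and their contents are placeholders, and package_dags is a hypothetical helper.

```python
import tempfile
import zipfile
from pathlib import Path


def package_dags(zip_path, files):
    """Write {archive_name: source_text} entries into a zip archive."""
    with zipfile.ZipFile(zip_path, "w") as archive:
        for archive_name, source in files.items():
            archive.writestr(archive_name, source)
    return zip_path


# DAG file at the root of the archive, vendored pure-Python package beside it.
files = {
    "my_dag1.py": "# DAG definition goes here\n",
    "package1/__init__.py": "",
    "package1/functions.py": "def helper():\n    return 42\n",
}

with tempfile.TemporaryDirectory() as tmp:
    zip_path = package_dags(Path(tmp) / "my_dags.zip", files)
    with zipfile.ZipFile(zip_path) as archive:
        names = sorted(archive.namelist())

print(names)
# ['my_dag1.py', 'package1/__init__.py', 'package1/functions.py']
```

Choosing a distinctive name for the vendored package helps avoid the sys.path clashes mentioned in the caveats.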
DAG Dependencies
Added in Airflow 2.1
While dependencies between tasks in a DAG are explicitly defined through upstream and downstream relationships, dependencies between DAGs are a bit more complex. In general, there are two ways in which one DAG can depend on another:
The dependency detector is configurable, so you can implement your own logic that differs from the defaults in DependencyDetector.
Templating with Jinja
For more information on the variables and macros that can be referenced in templates, make sure to read through the Macros reference.