In this lesson, you will make your pipeline more robust. You will add automatic task retries to protect your tasks against transient failures, and learn how to add notifications in case a DAG or task fails. Let's get to it.

First, let's cause a task to fail. In the create_collection_if_not_exists task, add a line that tries to print ten divided by zero. This line will cause a ZeroDivisionError in Python that fails the task. Any error or exception raised in an Airflow task will fail the task, and the lines below the line causing the error will not be executed. This is the behavior you know from Python scripts and cells in a notebook.

After running the cell to overwrite the DAG file in the dags folder, you can create a manual run of the DAG in the Airflow UI. The failure of the create_collection_if_not_exists task causes the DAG run to fail. Clicking on the DAG name, you can see that the DAG overview page surfaces the error log lines of recently failed tasks and provides a quick link to the full logs. You can also see, both in the grid view and the graph view, how the failed task caused its downstream task, load_embeddings_to_vector_db, to be in the state upstream_failed. This means that this task did not run because not all of its upstream tasks completed successfully.

How can you recover from a failure? Let's fix the code error: remove the print statement dividing by zero, run the cell to save the DAG, and then go back to the Airflow UI. You can clear a task instance with the Clear Task Instance button, which is displayed as just a forward curved arrow if the sidebar is collapsed. Clicking this button, you get several options. A common option to select is Downstream, which means that you not only re-attempt the failed task, but also all tasks downstream of it. If you have many failed tasks in a complex DAG, it is usually quicker to clear the whole DAG run: click on the red bar of the failed DAG run in the grid view and then on Clear Run, shown as the forward curved arrow. The menu that opens re-attempts all tasks in the DAG run by default, but you can set it to clear only failed tasks as well, to only clear the tasks which are in the state failed or upstream_failed.

Great, you can now recover from a task failure. But what if an issue is just transient? If an API is down on a Saturday night for a few minutes, it would be great if Airflow could just try the task again after a while, instead of having to wait for you to clear the task manually on Monday. This is where automatic task retries come in. retries is a task parameter that determines for every task how many chances it gets before it fails. You can set the wait time between retries using the retry_delay parameter. In some cases it makes sense to wait hours before retrying a task again; sometimes all that is needed are a few seconds, to get past a rate limit, for example. In practice, this is one of the most powerful Airflow features, and also one of the easiest to implement. A common strategy is to add a default of three retries to all tasks in a DAG. You can use the default_args DAG parameter to define default task parameters for all tasks in your DAG. Setting default_args to a dictionary containing retries set to 1 and retry_delay set to a duration of 10 seconds gives each task in the DAG a second chance at succeeding if it fails at first, and it will wait 10 seconds before making its second attempt.
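As a rough sketch of what that could look like (assuming Airflow 3's airflow.sdk import path and pendulum's duration helper; the DAG name is a placeholder, not the course's exact code):

```python
from pendulum import duration

# Airflow 3 TaskFlow import; in Airflow 2 you would use `from airflow.decorators import dag`.
from airflow.sdk import dag


@dag(
    default_args={
        "retries": 1,                         # one extra attempt after the first failure
        "retry_delay": duration(seconds=10),  # wait 10 seconds before retrying
    },
)
def my_resilient_dag():
    ...  # your task definitions go here; every task inherits these retry settings


my_resilient_dag()
```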
After saving the changes and re-adding the print ten divided by zero line to cause a task failure, you can see in the Airflow UI what happens when retries are set. Running the DAG manually, the create_collection_if_not_exists task does not fail immediately like before. Instead, its status changes to up_for_retry, which shows as a yellow square with a slow forward curved arrow. After 10 seconds, the task tries again, in this case hitting the same failing line, and then fails for good. If the issue had been transient, like an API being unavailable or a rate limit having been hit, the second attempt might have been successful.

Pause the video and add the default_args dictionary to your DAG, containing defaults for retries and retry_delay. Feel free to experiment and give your tasks more than one extra chance at success. Add a line that will fail a task and then watch it retry in the Airflow UI.

Awesome. Now, what if you have one task that should retry more or fewer times than the other tasks in the DAG? No problem: you can override any default defined in the default_args dictionary at the task level by providing the same argument to the individual @task decorator. For example, adding retries=5 and retry_delay=duration(seconds=2) gives this task five extra chances after an initial failure, six attempts in total, with two seconds in between them.

Another common situation is that some tasks regularly fail, but you still want the tasks downstream of them to run. In this example, if the collection already exists in the vector database, then loading to it will still work even if the create_collection_if_not_exists task fails. By default, Airflow tasks require all their upstream tasks to be successful in order to run. This is governed by an Airflow trigger rule, and the default one is called all_success. There are many other trigger rules. A handy one to make a task run even if some of its upstream tasks failed is called all_done. If you set the trigger rule of the load_embeddings_to_vector_db task to all_done, it will run as soon as all its upstream tasks have finished, no matter if their end state was success, failed, upstream_failed, or skipped.

Pause the video and add the all_done trigger rule to the load_embeddings_to_vector_db task. Run the cell to save the DAG and then create a new DAG run in the Airflow UI. You can see that the load_embeddings_to_vector_db task waits until all its upstream tasks finish, but then runs despite the create_collection_if_not_exists task having failed. The DAG run itself is also marked as successful despite containing a failed task: the DAG run end state is determined only by the leaf tasks of the DAG run. If they are all in a state of either success or skipped, the DAG run counts as a success.

Nice, you now have full control over what happens in case of task failures. There is just one piece missing: for some crucial tasks, you'd like to be alerted if they fail, for example via an email or a Slack message. You can add alerts to your DAG using callback functions. Callbacks exist at the DAG and at the task level for different situations; the most commonly used one is the on_failure_callback. Let's add an on_failure_callback to all tasks of your DAG. Do you remember the quickest way to define a task parameter for all tasks in a DAG? Correct, adding it to the default_args dictionary. The callback parameter can be set to any function, and that code runs as soon as any task fails. For this example, you can just print a message to the task logs.
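To tie these pieces together, here is a minimal sketch combining a task-level override, the all_done trigger rule, and a simple on_failure_callback. It assumes Airflow 3's airflow.sdk import path and pendulum's duration helper; the DAG name, callback function, and task bodies are placeholders rather than the course's exact code.

```python
from pendulum import duration

# Airflow 3 TaskFlow imports; in Airflow 2 these live in airflow.decorators.
from airflow.sdk import dag, task


# Hypothetical callback: the function name and message are placeholders.
def notify_on_failure(context):
    # The context dictionary gives access to details such as the failed task's ID.
    print(f"Alert: task {context['task_instance'].task_id} failed!")


@dag(
    default_args={
        "retries": 1,
        "retry_delay": duration(seconds=10),
        "on_failure_callback": notify_on_failure,  # runs whenever any task in the DAG fails
    },
)
def my_resilient_dag():

    # Task-level parameters override the DAG-level default_args:
    # six attempts in total, two seconds apart.
    @task(retries=5, retry_delay=duration(seconds=2))
    def create_collection_if_not_exists():
        print(10 / 0)  # deliberately fails to demonstrate retries and the callback

    # all_done: run once every upstream task has finished, regardless of its end state.
    @task(trigger_rule="all_done")
    def load_embeddings_to_vector_db():
        print("Loading embeddings...")

    create_collection_if_not_exists() >> load_embeddings_to_vector_db()


my_resilient_dag()
```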
In a real-life situation, you'd write code calling out to the messaging tool of your choice, or use a pre-built notifier class, about which you can learn more at the links in the resources section. Going back to the Airflow UI and creating another manual DAG run, you can see how, after the task failure, the callback function is executed, which prints a line to the task logs. If you want a callback function to only run if the whole DAG run fails, you can provide it to the on_failure_callback parameter of the DAG itself.

Pause the video here to add an on_failure_callback function to your tasks. Feel free to experiment with this, for example by overriding the default function given to all the tasks in the DAG in the default_args dictionary with a second callback function for a specific task. Remember that in order for the on_failure_callback function to run, you need to cause that task to fail.

Awesome, now you know how to ensure you will be notified if a crucial task or DAG fails. Time for vacation, right? Well, what happens if you come back after two well-deserved weeks traveling in Portugal, and your DAG run history contains several failures, and some DAG runs might have been missed entirely? Or sometimes you make changes to a DAG, for example to add another feature to the training set of an ML model, and want to rerun past DAG runs to see how the improved model performs against historic data. No worries: in Airflow 3, you can backfill any DAG that runs on a time-based schedule for any dates in the past, directly in the Airflow UI. After clicking the trigger button, select Backfill and define your date range as well as the behavior of the backfill: should only missing runs be filled in, should missing runs be filled in and errored DAG runs be rerun, or do you want to rerun all runs for that time period? In any case, after clicking Run Backfill, the Airflow UI will show a banner as long as the backfill is in progress, on which you can pause or stop the backfill as well.

Pause the video now, make sure to remove all error-inducing test lines from your tasks, and after saving the DAG, go to the Airflow UI to create a backfill job for the fetch_data DAG. Note that this DAG runs hourly, so we recommend you only backfill for one or two days in the past to avoid creating a lot of DAG runs in the sandbox environment.

Wow, that was a lot. You now know all the basics of how to turn your gen AI prototype notebooks into production-ready Airflow pipelines, and can even use advanced features like dynamic task mapping. In the next lesson, you will learn more about gen AI workflows in real life, including real example pipelines.