Now that you know how to write Airflow DAGs, this lesson is all about how to turn your Jenny AI prototype in a Jupyter notebook into an air flow pipeline. You'll write two. That's the first one to ingest, embed, and load embeddings into a vector database, and the second one to query that database for a book you like. With that, let's get coding. Like in the. Last lesson, you can run the first cell to. Get the link to your airflow environment's UI, and log in with. Airflow as the username and password. Using the same magic. Command right file, you can add your two. Dax at first one. Which will ingest, embed, and load the. Book descriptions to. The VBA vector database. Is called. Fetch Data. The second one which queries the vector. Database is. Called query data. Like in the last lesson. You are using the Dag decorator to create the Dag runs. Running these. Two cells creates two new files in the DAGs folder. One for each Dag in the airflow UI. You can see the two. Dax in the Dax list. Both do not contain any tasks yet. Pause the video here to run the. Two cells containing. The empty. DAGs. And confirm in the airflow UI that they have been. Successfully. Created. Remember that it can take up to 30. Seconds in this. Environment for a new Dag. To show up. The next step. When writing a pipeline is to define. Your Dag structure. That is, decide what actions the individual tasks. Will perform in what order, and create them. As empty tasks. The important principle to remember. From lesson. One here is to try to make. Tasks as atomic as possible, to more easily recover. From the point of failure. If something goes wrong. The ingesting. Embedding, and loading part of the Rag prototype. Can be expressed as five tasks. The first. One creates the collection in the vector. Database in case it does not exist yet. Another task lists the book description files available. To the pipeline. Again, another. Task transforms those files into a list of dictionaries. One task creates the. Vector. Embeddings. And the final. Task loads those embeddings. Into the vector database. Most of those tasks use the. Output of their. Previous task. As an input. For example, the task. That creates the vector. Embeddings gets the transformed. Book data from the upstream. Transform book description files task. The only exception is the create collection. If not exists a task. It is not embedded in the. Dependency. Structure yet. You can use. The chain function to decide how this task relates. To the others in the deck. One logical location for it. Since it prepares the vector database, is right. Before embeddings are loaded into that. Database. After running the cell. The Dag in the airflow UI is updated and now. Contains five tasks. The query data that is simpler. It has only one task called search vector. Db for. A book for this first iteration of the doc. You can hardcode the input. To the query string. Argument. Pause the video and run. The two cells containing the structured text to. Overwrite the files in the DAGs folder. Check out the docs in the airflow UI and run them manually. Awesome. You saw the tasks complete successfully. But they aren't actually doing anything yet. Let's change that. Next, we'll fill in the code for each task. This code. Will look very familiar from. Lesson. Two, and most of the time you. Can use the same. Code from your prototype. Notebooks in your airflow tasks. With some minor modifications here. I'll call out the modifications for each task. Variables used in several tasks. Can be declared at the top level of the Dag to ensure. Consistency for the stack. You'll define three. Variables the. Collection name. The book description folder. And the name of the embedding model. Note that the code at the top level of Dag. Files gets passed every time the DAGs folder is parsed for changes. By default. That is, every 30s. This means you should avoid code that takes a long time to run. Or makes connections. To external systems. At the top level. Of the Dag. Imagine running a costly query. Against your database every 30s. Let's avoid that for the create collection. If not exists task. What you need to change is the. Kind of VBA database. You use. The notebook. Uses an. Embedded VBA. Database, which. Is great for prototyping, but in production you'll want to connect to a hosted database either on premise or in the cloud. You can make this connection by using an airflow hook. In airflow. Hooks are classes that can connect to external services, for example. AWS, Google Cloud, or like here. We've. Yet they are. Available as part. Of airflow provider packages. Here we import the VBA talk from the V8 provider package, which was. Installed. In the airflow environment already. The connection is created. By using connection credentials stored safely inside airflow as a connection object. Referenced using a. Connection ID string. In our. Example, the connection. ID is my. V8 con and we've already created this connection for you as an environment variable. The rest of this task uses the exact same code as in. The notebook, to check whether the collection we want to. Use already exists and creates it. If it does not exist yet. The next two tasks list book description Files and transform book description files. Use the exact same code as in the related notebook cells as well. There are only two modifications needed. First. You need to import the. Packages used to task at the start of the task function. And secondly. You need to return the value that you want to use in the next task. For example, the list book description files task returns the list of book descriptions which the Transform Book Description Files task then uses as an input. Creating the vector. Embeddings also uses almost the same code as in the notebook cell with one small change. By default. Only some data types can be passed between airflow tasks for example. Data that is. Json serializable, or Pandas data frames. For this reason. The. Embeddings are cast to. Floats before being. Passed to the next task. To ensure Json serialize ability. In a production environment. When passing larger amounts of data between tasks. You'd use a cloud storage solution. For example, Amazon S3. This can be accomplished either by explicitly. Writing. The file to cloud storage. From within the. Task code. Or by changing the airflow configuration to automatic store data. Passed between tasks in a food location. The last task. Of the fetch data Dag loads the embeddings to V8. Other than adding the imports. For all packages used in the task. I'm using the V8 talk to connect to V8. The code is the. Same as in the notebook. After running. The cell to save. It, the airflow UI shows the updated code in the code tab. Triggering a dark run now. Causes all book descriptions in the files in the included data. Folder to be ingested, embedded, and loaded into a V8. Instance. Running in a Docker container alongside the local airflow environment. Nice. Now you just need to finish the query data deck to be able to run a new. Vector search on this data. Using an airflow deck. This stack is simple. It just has one task that runs the same code as the corresponding cell did. In the notebook. The only. Change is. That the V8. Hook is used to establish. The connection to the vector. Database. With a manual run of the DAC. You can get the book recommendation printed to the. Task logs. Feel free to change the query string to the type of book you are looking for. This is fun. But in reality, a pipeline like. This. Would need to run automatically. If you imagine you work at an online. Bookstore, the list of available books would change. Frequently, and the deck would need to run regularly to create and add new book embeddings. Scheduling pipelines is one of the core features of airflow and what we'll cover. In the next lesson.