This channel is to help data engineers start or improve their skills around the most exciting data engineering tools.
I'm obsessed with providing high-value content that is highly engaging. Engineering courses don't have to be boring! Let's break the curse! Expect to see a lot of hands-on tutorials, mainly about Apache Airflow, though not exclusively. I also love training and public speaking.
Oh, and subscribe; all excellent engineers subscribe to a fascinating YouTube channel like this one 🥹.
Feel free to follow me on LinkedIn, where I post daily content on Airflow: www.linkedin.com/in/marclamberti/
Comments
Which Udemy course covers this video in more detail? I want to create a dataset by importing data from PostgreSQL, so that when there is an update in the interconnected tables, I can check whether the other tables need updating as well.
How can I use an ontology with a graph to extract dependencies 😖???
What about sharing data between DAGs?
You can. Just specify the dag_id in the xcom_pull method.
Hi Marc, I was wondering if you would be able to shed some light on GCSToGoogleDriveOperator. I'm trying to copy a file from GCS to a personal Google Drive with Airflow. I'm only allowed to use an impersonation chain, so I have a service account that has access to the GCS bucket, and I have whitelisted the service account on my Google Drive folder. The file downloads successfully from the bucket to Airflow and the upload API call to Google Drive succeeds, but the file doesn't seem to show up on my Drive. What could be the problem?
Excellent video, thanks!
Here is the solution for the following error. After refreshing Airflow, I got:
Broken DAG: [/usr/local/airflow/dags/retail.py] Traceback (most recent call last): File "/usr/local/lib/python3.10/selectors.py", line 416, in select fd_event_list = self._selector.poll(timeout) File "/usr/local/lib/python3.10/site-packages/airflow/utils/timeout.py", line 69, in handle_timeout raise AirflowTaskTimeout(self.error_message) airflow.exceptions.AirflowTaskTimeout: DagBag import timeout for /usr/local/airflow/dags/retail.py after 30.0s
You need to change your .env file by adding the following lines:
AIRFLOW__CORE__DAG_FILE_PROCESSOR_TIMEOUT=180
AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT=160
Astro also offers documentation: docs.astronomer.io/astro/environment-variables
Is it possible to ensure a repeatedly scheduled pipeline runs only once at a time? Suppose the pipeline is scheduled every minute and normally takes 10 seconds to complete, but sometimes takes more than 3 minutes because the database is busy. How do I avoid the next cycle running in parallel? And if possible, how do I skip the next 2 cycles?
Still works 😄. Really cool pipeline!
Good to know 🥹
Thanks Marc. Thumbs up from Canada!
❤❤
Merci Marc! I would like to know why two Python environments are needed. I installed Airflow on my machine and pip-installed Airbyte in the same environment where I installed Airflow. Do I need to specify the same environment twice, or not at all (working locally)?
Great question! I think there are some PyAirbyte dependencies that conflict with Airflow's dependencies. That's why we create one Python virtual environment with PyAirbyte installed. The second one is optional but recommended: it has the source pre-installed, to avoid get_source installing the S3 source each time the task runs. That saves a lot of runtime.
Great video. I saw that the schema of the resulting table in BigQuery is mostly strings... What if we are working with Parquet files with a specified schema? How do we avoid data type conversion issues between AWS and GCP? If I have Airflow running locally, do I still need the Astro CLI? Thanks Marc...
Another new feature of 2.9 is the ability to give a name to an expanded task using map_index_template
This is getting closer to "no code", and I don't like this tendency at all. You end up with an overcomplicated, barely maintainable product and lots of users who have no idea how it actually works. The messier Airflow gets, the more proprietary the solutions become.
I'm on your side, actually. I like coding myself, and I just don't like no-code solutions. However, I still feel it's a great way to speed up building the common, boring things we create every day, especially if you have access to the code. It's not yet possible, but maybe at some point we will be able to export DAGs from the cloud IDE. Let's see :)
Hi, I love your vids. Can you show how to integrate a MinIO XCom backend in Airflow running on Kubernetes? Do I need to modify the pod executor?
This is so helpful! Thank you so much for your efforts.
Fantastic! Thanks Marc!
Thank you!
How do I use a table from a Postgres database as a dataset instead of a txt file? Is there an example video for this?
Thank you very much :) I am working on my homework with your video.
I am facing an issue while mapping the SQLAlchemy connection with Postgres; when I tried what you said, it's not reflected in the corresponding variable.
Great job, keep it up. ❤
Can you make a video on how to run Airflow on Windows rather than Ubuntu?
Super helpful video. Just in case you need to adapt this tutorial to an existing Docker Compose environment for Airflow, make sure to add the line "- ./config/webserver_config.py:/opt/airflow/webserver_config.py" to your volumes list, as specified in the example docker-compose.yaml of this tutorial. Without it, Airflow won't load those settings, and it's not there by default.
I have an ETL process in place in ADF. In our team, we wanted to implement the table and view transformations with dbt Core, and we were wondering whether we could orchestrate dbt with Azure. If so, then how? One approach I could think of was to use an Azure Managed Airflow instance, but will it allow us to install Astronomer Cosmos? I have never implemented dbt this way before, so I needed to know whether this would be the right approach, or is there anything else you would suggest?
Cosmos has very poor documentation. I don't recommend it to anyone.
Anything you were looking for specifically?
@@MarcLamberti These imports don't work on the latest version of Cosmos, and I couldn't find their alternatives: from cosmos.providers.dbt.core.operators import (DbtDepsOperator, DbtRunOperationOperator, DbtSeedOperator)
Hello Marc, QQ: Does the "Clear only failed tasks" option rerun the failed task and all of its upstream tasks as well?
Nope, only failed and downstream tasks
Hi Marc, I love your Airflow videos. Do you have a solution for watching files matching a pattern like filename_YYYYMMdd.csv (e.g., filename_20240501.csv) in S3, and also on-prem? Thanks
My DAGs are not appearing in the Airflow UI, any help here?
You are a lifesaver <3
Is the ECS operator supported in 2.7.1? We are planning to upgrade from 2.1 to 2.7.1.
thank you
Always love Marc's videos!
❤️
finding your videos really helpful. Thank you.
Great
The Streamlit app is not opening at localhost:8501 ("This page isn't working: localhost didn't send any data. ERR_EMPTY_RESPONSE"). In the log I could see something like: [2024-04-23, 13:24:46 UTC] {subprocess.py:93} INFO - ModuleNotFoundError: No module named 'altair.vegalite.v4'. Can this be executed alone in the Streamlit app, without Docker...?
The best explanation I found about it.
Very Fast
Hi Marc, I love the way you make Airflow interesting to jump into for a beginner like myself. I'm completely new to the Airflow world; as a matter of fact, your video on online retail is my first bite. But unfortunately, I got stuck 😔 along the line 😭 and tried to figure things out but couldn't. The error came from the task that uploads from GCS to BigQuery; it failed with a UTF-8 decode/encode error. But when I check BigQuery, the raw_invoices table was created but no dataset 😢. I wish I could fix this or find a way around it. Could you also kindly make a video on dynamic task mapping, as mentioned in this video? Maybe from API to data warehouse. Thanks for the great and concise videos. 🎉
For anyone coming here from 2024 and beyond: on Linux, specifically Ubuntu, remember to use `docker compose up init-airflow` <-- NOTE THE SPACE. The old `docker-compose up -d` command runs the docker-compose.yml file using old Docker ~1.25 binaries and throws errors due to new syntax changes. TIL. PS - Thanks for the video Marc <3
Does anyone only have 4 jobs instead of 8 within the dbtTaskgroup task in airflow? I only have "*_run" jobs but not "*_test" jobs. Any help is appreciated. Thanks!
Same here, I believe he did not show that.
I love this format; I wish you had more like it. I think it's so useful to have end-to-end projects like that: it gives you the big picture.
How can we pass tasks dynamically instead of static list parameters?
Hi Marc and guys, how can I increase the time out of airflow in this project?
What do you mean by the time out of airflow?
Thanks Marc. I am facing this error when connecting to Snowflake from Airflow; Airflow is running in Docker Compose (the file you provided in the Udemy course): ERROR - 250001: Could not connect to Snowflake backend after 2 attempt(s). Aborting. I checked all the parameters but am still facing this issue (Airflow version v2.8.1).
I have many cases where I depend on many datasets, but I must ensure that the DAG run is attempted only once per day. Can you do that with the DatasetOrTimeSchedule schedule? Something like DatasetAndTimeSchedule?
Yes, you can do that with DatasetOrTimeSchedule :)
Where do you need to store the dataset that's going to be used in the DAG?
I tend to have a datasets.py file in include/ where I define the datasets I use across DAGs
There is a typo in your video description. Version - 2.8, not 2.9
Thank you 🙏
Pretty good! I would love to find a way to access the current context in a @task.virtualenv operator :D Access stuff like params.
Thank you so much for this amazing project. Learn a lot!!
You're very welcome!
How do I set the base_url as a ConfigMap and enable it in the values.yaml file?
Love it, keep it going man, MEGA HELPFUL, especially the part about docker_url.