Data with Marc

This channel helps data engineers start out or improve their skills with the most exciting data engineering tools.
I'm obsessed with providing high-value content that is highly engaging. Engineering courses don't have to be boring! Let's break the curse! Expect to see a lot of hands-on tutorials, mainly about Apache Airflow, but not only. I also love training and public speaking.
Oh, and subscribe: all excellent engineers subscribe to a fascinating YouTube channel like this one 🥹.

Feel free to follow me on LinkedIn, where I post daily content on Airflow: www.linkedin.com/in/marclamberti/

Comments

  • @alfahatasi (1 day ago)

    Which Udemy course covers this video in more detail? I want to create a dataset by importing data from PostgreSQL, so that when there is an update in one of the interconnected tables, I can check whether the other tables need updating.

  • @SfZq (1 day ago)

    How can I use an ontology with a graph to extract dependencies 😖???

  • @djalan84 (3 days ago)

    What about sharing data between DAGs?

  • @MarcLamberti (1 day ago)

    You can. Just specify the dag_id in the xcom_pull method.
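
    For illustration, a minimal sketch of pulling an XCom from another DAG (the producer DAG and task ids here are hypothetical):

        from airflow.decorators import dag, task
        from pendulum import datetime

        @dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
        def consumer():
            @task
            def read_value(ti=None):
                # dag_id targets the other DAG; include_prior_dates lets the pull
                # look beyond the current logical date.
                return ti.xcom_pull(
                    dag_id="producer_dag",   # hypothetical producer DAG
                    task_ids="push_task",    # hypothetical pushing task
                    include_prior_dates=True,
                )

            read_value()

        consumer()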

  • @kribashannaidoo2449 (3 days ago)

    Hi Marc, I was wondering if you would be able to shed some light on GCSToGoogleDriveOperator. I'm trying to copy a file from GCS to a personal Google Drive with Airflow. I'm only allowed to use an impersonation chain, so I have a service account that has access to the GCS bucket, and I have whitelisted the service account on my Google Drive folder. The file downloads successfully from the bucket to Airflow, and the upload API call to Google Drive succeeds, but the file doesn't seem to show up on my drive. What could be the problem?

  • @yuvalinselberg5570 (3 days ago)

    Excellent video, thanks!

  • @vjvan9338 (6 days ago)

    Here is the solution for the following error. After I refreshed Airflow, I got this one:

        Broken DAG: [/usr/local/airflow/dags/retail.py] Traceback (most recent call last):
          File "/usr/local/lib/python3.10/selectors.py", line 416, in select
            fd_event_list = self._selector.poll(timeout)
          File "/usr/local/lib/python3.10/site-packages/airflow/utils/timeout.py", line 69, in handle_timeout
            raise AirflowTaskTimeout(self.error_message)
        airflow.exceptions.AirflowTaskTimeout: DagBag import timeout for /usr/local/airflow/dags/retail.py after 30.0s

    You need to add the following lines to your .env file:

        AIRFLOW__CORE__DAG_FILE_PROCESSOR_TIMEOUT=180
        AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT=160

    Astro also offers documentation: docs.astronomer.io/astro/environment-variables

  • @PancoCheong (7 days ago)

    Is it possible to ensure a repeat-scheduled pipeline runs only once at a time? Assuming the pipeline is scheduled to run every minute and normally takes 10 seconds to complete, but sometimes takes more than 3 minutes because of a busy database: how do I prevent the next cycle from running in parallel? And, if possible, how do I skip the next 2 cycles?
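
    A minimal sketch of one common approach, assuming capping concurrency is acceptable: max_active_runs=1 stops a new cycle from starting while the previous one is still running:

        from airflow.decorators import dag, task
        from pendulum import datetime

        @dag(
            schedule="* * * * *",        # every minute
            start_date=datetime(2024, 1, 1),
            catchup=False,               # don't backfill a pile of missed runs
            max_active_runs=1,           # never two runs of this DAG in parallel
        )
        def every_minute_pipeline():
            @task
            def do_work():
                ...

            do_work()

        every_minute_pipeline()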

  • @user-bl7dy8fg7t (7 days ago)

    Still works 😄. Really cool pipeline!

  • @MarcLamberti (6 days ago)

    Good to know 🥹

  • @briankelly6561 (7 days ago)

    Thanks Marc. Thumbs up from Canada!

  • @MarcLamberti (7 days ago)

    ❤❤

  • @suebrickston (7 days ago)

    Thanks, Marc! I would like to know why two Python environments are needed. I installed Airflow on my machine and pip-installed Airbyte in the same environment where I installed Airflow. Do I need to specify the same environment twice, or not at all (working locally)?

  • @MarcLamberti (7 days ago)

    Great question! I think there are some PyAirbyte dependencies that conflict with Airflow's dependencies. That's why we create one Python virtual environment with PyAirbyte installed. The second one is optional but recommended: it has the source installed, which avoids get_source installing the S3 source each time the task runs. That saves a lot of runtime.
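
    As a sketch of that setup, a task can run inside a pre-built virtual environment via @task.external_python (the venv path and source config below are assumptions):

        from airflow.decorators import task

        # Hypothetical venv built with PyAirbyte (and optionally the source-s3
        # connector) preinstalled, separate from Airflow's own environment.
        @task.external_python(python="/usr/local/airflow/pyairbyte_venv/bin/python")
        def extract():
            import airbyte as ab  # resolved inside the external venv

            # config omitted; install_if_missing=False relies on the preinstalled source
            source = ab.get_source("source-s3", config={}, install_if_missing=False)
            ...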

  • @GCPollaa (7 days ago)

    Great video. I saw that the schema of the resulting table in BigQuery is mostly strings... What if we are working with Parquet files with a specified schema? How do we avoid data type conversion issues between AWS and GCP? And if I have Airflow running locally, do I still need the Astro CLI? Thanks Marc...

  • @slords (8 days ago)

    Another new feature of 2.9 is the ability to give a name to an expanded task using map_index_template.
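
    A sketch of what that can look like (the template key is based on the mapped kwargs and may vary by setup):

        from airflow.decorators import dag, task
        from pendulum import datetime

        @dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
        def named_mapping():
            # map_index_template is rendered per mapped task instance, so each
            # expansion shows up in the UI under a readable name instead of an index.
            @task(map_index_template="{{ task.op_kwargs['filename'] }}")
            def process(filename: str):
                print(filename)

            process.expand(filename=["a.csv", "b.csv", "c.csv"])

        named_mapping()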

  • @donnillorussia (8 days ago)

    This is getting closer to "no code", and I don't like this tendency at all. You will end up with an overcomplicated, barely supportable product and lots of dumb users having no idea how it actually works. The messier Airflow gets, the more proprietary the solutions become.

  • @MarcLamberti (8 days ago)

    I'm on your side, actually. I like coding myself and I just don't like no-code solutions. However, I still feel it's a great way to speed up building the common, boring things we create every day, especially if you have access to the code. It's not yet possible, but maybe at some point we will be able to export DAGs from the Cloud IDE. Let's see :)

  • @candyskullxoxo4660 (10 days ago)

    Hi, I love your vids. Can you show how to integrate a MinIO XCom backend in Airflow running on Kubernetes? Do I need to modify the pod executor?
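
    A minimal sketch of a custom object-storage XCom backend, assuming a MinIO endpoint and bucket (all names are illustrative); it is typically enabled by pointing AIRFLOW__CORE__XCOM_BACKEND at the class rather than by modifying the executor:

        import json
        import uuid

        import boto3
        from airflow.models.xcom import BaseXCom

        class MinioXComBackend(BaseXCom):
            BUCKET = "airflow-xcom"            # hypothetical bucket
            ENDPOINT = "http://minio:9000"     # hypothetical MinIO endpoint

            @staticmethod
            def serialize_value(value, **kwargs):
                # Upload the real value to MinIO; keep only the key in the metadata DB.
                s3 = boto3.client("s3", endpoint_url=MinioXComBackend.ENDPOINT)
                key = f"xcom/{uuid.uuid4()}.json"
                s3.put_object(Bucket=MinioXComBackend.BUCKET, Key=key, Body=json.dumps(value))
                return BaseXCom.serialize_value(key)

            @staticmethod
            def deserialize_value(result):
                key = BaseXCom.deserialize_value(result)
                s3 = boto3.client("s3", endpoint_url=MinioXComBackend.ENDPOINT)
                return json.loads(s3.get_object(Bucket=MinioXComBackend.BUCKET, Key=key)["Body"].read())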

  • @sirinebouksila9631 (10 days ago)

    This is so helpful! Thank you so much for your efforts.

  • @ccc_ccc789 (11 days ago)

    Fantastic! Thanks Marc!

  • @MarcLamberti (11 days ago)

    Thank you!

  • @alfahatasi (14 days ago)

    How do I use a table from a Postgres database as a Dataset instead of a txt file? Is there an example video for this?
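
    A sketch of the idea, since a Dataset is just a URI (the URI below is hypothetical): the task that updates the table declares it as an outlet, and the consumer DAG schedules on it:

        from airflow.datasets import Dataset
        from airflow.decorators import dag, task
        from pendulum import datetime

        # Airflow does not read the table itself; the URI is only an identifier
        # used for scheduling.
        my_table = Dataset("postgres://postgres_conn/public/my_table")

        @dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
        def producer():
            @task(outlets=[my_table])
            def update_table():
                ...  # e.g. run the INSERT/UPDATE against Postgres here

            update_table()

        @dag(start_date=datetime(2024, 1, 1), schedule=[my_table], catchup=False)
        def consumer():
            @task
            def read_table():
                ...

            read_table()

        producer()
        consumer()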

  • @alexeykruglov8185 (15 days ago)

    Thank you very much :) I am working on my homework with your video.

  • @parashualamel6104 (15 days ago)

    I am facing an issue while mapping the SQLAlchemy connection to Postgres: when I tried what you said, it's not reflected in the corresponding variable.

  • @UsmanAli-cx9bd (16 days ago)

    Great job, keep it up. ❤

  • @ronakshah6336 (16 days ago)

    Can you make a video on how to run Airflow on Windows rather than Ubuntu?

  • @DamienBellamy-rl9sf (17 days ago)

    Super helpful video. Just in case you need to adapt this tutorial to an existing Docker Compose environment for Airflow, make sure to add the line "- ./config/webserver_config.py:/opt/airflow/webserver_config.py" to your volumes list, as specified in the example docker-compose.yaml of this tutorial. Without it, Airflow won't load those settings, and it's not there by default.

  • @datalearningsihan (18 days ago)

    I have an ETL process in place in ADF. In our team, we wanted to implement the table and view transformations with dbt Core. We were wondering if we could orchestrate dbt with Azure. If so, how? One approach I could think of was to use an Azure Managed Airflow instance, but will it allow us to install Astronomer Cosmos? I have never implemented dbt this way before, so I need to know if this would be the right approach or if there is anything else you would suggest.

  • @samsonleul7667 (20 days ago)

    Cosmos has very poor documentation. I don't recommend it to anyone.

  • @MarcLamberti (20 days ago)

    Anything you were looking for specifically?

  • @samsonleul7667 (19 days ago)

    @MarcLamberti These imports do not work on the latest version of Cosmos, and I couldn't find their alternatives:

        from cosmos.providers.dbt.core.operators import (
            DbtDepsOperator,
            DbtRunOperationOperator,
            DbtSeedOperator,
        )
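
    For reference, in Cosmos 1.x the top-level entry points changed; a sketch of the newer style (the project path and profile values are assumptions):

        from pendulum import datetime
        from cosmos import DbtDag, ProjectConfig, ProfileConfig

        # In Cosmos 1.x, DbtDag/DbtTaskGroup replace the old per-operator imports
        # from cosmos.providers.dbt.core.operators.
        my_dbt_dag = DbtDag(
            dag_id="dbt_dag",
            project_config=ProjectConfig("/usr/local/airflow/dags/dbt/my_project"),
            profile_config=ProfileConfig(
                profile_name="my_profile",   # hypothetical profiles.yml entries
                target_name="dev",
                profiles_yml_filepath="/usr/local/airflow/dags/dbt/profiles.yml",
            ),
            schedule=None,
            start_date=datetime(2024, 1, 1),
        )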

  • @essak3219 (21 days ago)

    Hello Marc, QQ: Does the "Clear only failed tasks" option rerun the failed task and all of its upstream tasks as well?

  • @MarcLamberti (10 days ago)

    Nope, only failed and downstream tasks

  • @davekennedy6856 (22 days ago)

    Hi Marc, I love your Airflow videos. Do you have a solution for watching for files matching a pattern like filename_YYYYMMdd.csv (e.g., filename_20240501.csv) in S3 and also on-prem? Thanks
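
    A sketch of one way to watch for a date-stamped key in S3, using the Amazon provider's S3KeySensor with a templated key (the bucket and connection id are assumptions):

        from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

        # {{ ds_nodash }} renders as YYYYMMDD for the run's logical date, matching
        # names like filename_20240501.csv.
        wait_for_file = S3KeySensor(
            task_id="wait_for_file",
            aws_conn_id="aws_default",
            bucket_name="my-bucket",          # hypothetical bucket
            bucket_key="incoming/filename_{{ ds_nodash }}.csv",
            poke_interval=60,
            timeout=60 * 60,
        )

    For on-prem paths, the core FileSensor accepts a similarly templated filepath.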

  • @munyaradzimagodo3983 (23 days ago)

    My DAGs are not appearing in the Airflow UI; any help here?

  • @ChinmaySathe-bx4ck (23 days ago)

    You are a life saver <3

  • @archanareddy651 (24 days ago)

    Is the ECS Operator supported in 2.7.1? We are planning to upgrade from 2.1 to 2.7.1.

  • @harshavardhanravipudi5225 (27 days ago)

    Thank you

  • @user-pj1vc4sc2m (27 days ago)

    Always love Marc's videos!

  • @MarcLamberti (27 days ago)

    ❤️

  • @lukevandam7610 (28 days ago)

    Finding your videos really helpful. Thank you.

  • @swamynaidulenka (28 days ago)

    Great

  • @ashwathtk8424 (1 month ago)

    The Streamlit app is not opening at localhost:8501: "This page isn't working. localhost didn't send any data. ERR_EMPTY_RESPONSE". In the log I could see something like:

        [2024-04-23, 13:24:46 UTC] {subprocess.py:93} INFO - ModuleNotFoundError: No module named 'altair.vegalite.v4'

    Can this be executed alone in the Streamlit app, without having Docker...

  • @KarlaCumaco (1 month ago)

    The best explanation I found about it.

  • @shrikantkoli-eq7hc (1 month ago)

    Very Fast

  • @oshebase (1 month ago)

    Hi Marc, I love the way you make Airflow interesting to jump into for a beginner like myself. I'm completely new to the Airflow world; as a matter of fact, your video on online retail is my first bite. But unfortunately, I got stuck 😔 along the way 😭 and tried to figure things out but couldn't. The error came from the task that uploads from GCS to BigQuery, which failed with a UTF-8 decode/encode error. And when I check BigQuery, the raw_invoices table was created but no dataset 😢. I wish I could fix this or find a way around it. Could you also kindly make a video on dynamic task mapping, as mentioned in this video? Maybe from API to data warehouse. Thanks for the great and concise videos. 🎉

  • @danielrerko9033 (1 month ago)

    For anyone coming here from 2024 and beyond: on Linux, specifically Ubuntu, remember to use `docker compose up init-airflow` <-- NOTE THE SPACE. The old `docker-compose up -d` command actually runs the docker-compose.yml file using OLD Docker ~1.25 binaries and throws errors due to new syntax changes. TIL. PS - Thanks for the video Marc <3

  • @SaltineCracker610 (1 month ago)

    Does anyone else only have 4 jobs instead of 8 within the DbtTaskGroup task in Airflow? I only have "*_run" jobs but not "*_test" jobs. Any help is appreciated. Thanks!

  • @xxkall08xx (18 days ago)

    Same here, I believe he did not show that.

  • @quoridorstrategy (1 month ago)

    I love this format; I wish you had more like this. It's so useful to have end-to-end projects like this: it gives the big picture.

  • @richardmonteiro8694 (1 month ago)

    How can we dynamically pass tasks instead of the static list parameters?
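
    A sketch of dynamic task mapping, where the list comes from an upstream task at runtime instead of a static parameter:

        from airflow.decorators import dag, task
        from pendulum import datetime

        @dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
        def dynamic_mapping():
            @task
            def get_items():
                # In practice this could query an API or a database.
                return ["a", "b", "c"]

            @task
            def process(item: str):
                print(item)

            # expand() creates one mapped task instance per element, resolved at runtime.
            process.expand(item=get_items())

        dynamic_mapping()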

  • @brunocampos5010 (1 month ago)

    Hi Marc and guys, how can I increase the timeout of Airflow in this project?

  • @MarcLamberti (1 month ago)

    What do you mean by the timeout of Airflow?

  • @Aman-lv2ee (1 month ago)

    Thanks Marc. I am facing this error when connecting to Snowflake from Airflow, which is running in Docker Compose (the file you provided in the Udemy course):

        ERROR - 250001: Could not connect to Snowflake backend after 2 attempt(s). Aborting

    I checked all the parameters but am still facing this issue (Airflow version v2.8.1).

  • @marcosgmn (1 month ago)

    I have many cases where I depend on many datasets, but I must ensure that the DAG run is attempted only once per day. Can you do that with the DatasetOrTimeSchedule? Something like a DatasetAndTimeSchedule?

  • @MarcLamberti (24 days ago)

    Yes, you can do that with DatasetOrTimeSchedule :)
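
    A sketch of what that can look like in 2.9 (the cron expression and dataset URIs are illustrative); the DAG runs on the timetable or whenever the datasets are updated:

        from airflow.datasets import Dataset
        from airflow.decorators import dag, task
        from airflow.timetables.datasets import DatasetOrTimeSchedule
        from airflow.timetables.trigger import CronTriggerTimetable
        from pendulum import datetime

        @dag(
            start_date=datetime(2024, 1, 1),
            catchup=False,
            schedule=DatasetOrTimeSchedule(
                timetable=CronTriggerTimetable("0 0 * * *", timezone="UTC"),
                datasets=[Dataset("s3://bucket/file_a"), Dataset("s3://bucket/file_b")],
            ),
        )
        def dataset_or_time():
            @task
            def run():
                ...

            run()

        dataset_or_time()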

  • @karinaserrano2956 (1 month ago)

    Where do you need to store the dataset that's going to be used in the DAG?

  • @MarcLamberti (1 month ago)

    I tend to have a datasets.py file in include/ where I define the datasets I use across DAGs.
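
    For illustration, that pattern can look like this (the URI is hypothetical):

        # include/datasets.py -- one shared definition, imported by every DAG
        from airflow.datasets import Dataset

        RAW_INVOICES = Dataset("s3://retail/raw_invoices.csv")

        # In a DAG file:
        #   from include.datasets import RAW_INVOICES
        #   @task(outlets=[RAW_INVOICES]) ...        # producer side
        #   @dag(schedule=[RAW_INVOICES], ...) ...   # consumer side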

  • @dnbnero (1 month ago)

    There is a typo in your video description: version 2.8, not 2.9.

  • @MarcLamberti (1 month ago)

    Thank you 🙏

  • @0kazaki (1 month ago)

    Pretty good! I would love to find a way to access the current context in a @task.virtualenv operator :D Access stuff like params.

  • @dsdu81 (1 month ago)

    Thank you so much for this amazing project. Learned a lot!!

  • @MarcLamberti (1 month ago)

    You're very welcome!

  • @kasunkp (1 month ago)

    How do I set the base_url as a ConfigMap and enable it in the values.yaml file?

  • @zheyan4704 (1 month ago)

    Love it, keep it going man, MEGA HELPFUL, especially the part about docker_url.