Astronomer

Astronomer is the commercial developer of Apache Airflow, a community-driven open-source tool that's leading the market in data orchestration. We're a global, venture-backed team of learners, innovators, and collaborators working to build an enterprise-grade product that makes it easy for data teams at Fortune 500s and startups alike to adopt Apache Airflow.

Intro to Astro Hosted!

What's new in Airflow 2.8

Comments

  • @ap2394
    @ap2394 · 2 days ago

    Hi! Is it possible to schedule a task using a dataset, or is that controlled only at the DAG level? I mean, if I have 2 tasks in a downstream DAG, do I have the option to customize the schedule based on each task's upstream dataset?

  • @spikeydude114
    @spikeydude114 · 3 days ago

    Do you have LinkedIn?

  • @pgrvloik
    @pgrvloik · 3 days ago

    Great!

  • @rkenne1391
    @rkenne1391 · 10 days ago

    Can you provide more context on the batch inference pipeline? Airflow is an orchestrator, so wouldn't you need a different framework to actually perform the batch inference?

  • @snehal4520
    @snehal4520 · 10 days ago

    Very informative, thank you!

  • @amirhosseinsharifinejad7752
    @amirhosseinsharifinejad7752 · 14 days ago

    Really helpful thank you😍

  • @PaulChung-rg6jv
    @PaulChung-rg6jv · 18 days ago

    Tons of information. Any chance this can be thrown into a GitHub repo for us engineers who need more time to digest?

  • @munyaradzimagodo3983
    @munyaradzimagodo3983 · 24 days ago

    Thank you, well explained. I created an Express application to create DAGs programmatically, but the endpoints are not working.

  • @CarbonsHDTuts
    @CarbonsHDTuts · 27 days ago

    This is really awesome. I love the entire video and always love content from you guys and girls, but could I please give some constructive feedback?

  • @mettuvamshidhar1389
    @mettuvamshidhar1389 · 1 month ago

    Is it possible to get the list of variables pushed through xcom_push in the first task (let's say the extract task here)? And can we pull that variables list with xcom_pull and use it as a group dynamically (instead of A, B, C)?

  • @bilalmsd07
    @bilalmsd07 · 1 month ago

    What if any of the subtasks fails? How do I surface that error but still let the remaining parallel tasks run?

  • @yevgenym9204
    @yevgenym9204 · 1 month ago

    @Astronomer Please share a direct link to the CLI library you mention (for the proper file structure) kzread.info/dash/bejne/rIqupLipksrdYaQ.htmlsi=HiJa9Afi-53yLZOG&t=873

  • @Astronomer
    @Astronomer · 1 month ago

    You can find documentation on the Astro CLI, including download instructions, here: docs.astronomer.io/astro/cli/overview

  • @rohitnath5545
    @rohitnath5545 · 1 month ago

    Do we have a video on how to run Airflow using Docker on cloud containers? Running locally is fine to learn and test, but the real work is seeing how it runs in the cloud. I am a consultant, and for my clients an easier setup is the goal; with Airflow I don't see that.

  • @Astronomer
    @Astronomer · 1 month ago

    Astronomer provides a managed service for running Airflow at scale and in the cloud. You can learn more at astronomer.io/try-astro

  • @marehmanmarehman9431
    @marehmanmarehman9431 · 1 month ago

    great work, keep it up.

  • @ryank8463
    @ryank8463 · 1 month ago

    Hi, this video is really beneficial. I have a question about the best practice for handling data transmission between tasks. I am building MLOps using Airflow. My model training DAG contains data preprocessing -> model training, so there would be massive data transmission between these two tasks. I am using XCom to transmit data between them, but there's roughly a 2 GB limitation on XCom. What's the best practice to deal with this problem? Using S3 to send/pull data between tasks? Or should I simply combine these two tasks (data preprocessing -> model training)? Thank you.

  • @Astronomer
    @Astronomer · 1 month ago

    Thank you! For passing larger amounts of data between tasks you have two main options: a custom XCom backend or writing to intermediary storage directly from within the tasks. In general we recommend a custom XCom backend as a best practice in these situations, because you can keep your DAG code the same; the change happens in how the data sent to and retrieved from XCom is processed. You can find a tutorial on how to set up a custom XCom backend here: docs.astronomer.io/learn/xcom-backend-tutorial. Merging the tasks is generally not recommended because it makes it harder to get observability and rerun individual actions.
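
    For illustration, a minimal sketch of what such a custom XCom backend could look like (the bucket name, connection id, and JSON serialization here are assumptions for the example, not part of the tutorial above):

        from uuid import uuid4

        import pandas as pd
        from airflow.models.xcom import BaseXCom
        from airflow.providers.amazon.aws.hooks.s3 import S3Hook

        class S3XComBackend(BaseXCom):
            """Offloads pandas DataFrames to S3 and stores only a reference string in XCom."""

            BUCKET = "my-xcom-bucket"   # assumed bucket name
            PREFIX = "s3_xcom::"

            @staticmethod
            def serialize_value(value, **kwargs):
                if isinstance(value, pd.DataFrame):
                    key = f"xcom/{uuid4()}.json"
                    S3Hook(aws_conn_id="aws_default").load_string(
                        value.to_json(), key=key, bucket_name=S3XComBackend.BUCKET, replace=True
                    )
                    value = S3XComBackend.PREFIX + key   # only the pointer lands in the metadata db
                return BaseXCom.serialize_value(value)

            @staticmethod
            def deserialize_value(result):
                value = BaseXCom.deserialize_value(result)
                if isinstance(value, str) and value.startswith(S3XComBackend.PREFIX):
                    key = value.removeprefix(S3XComBackend.PREFIX)
                    data = S3Hook(aws_conn_id="aws_default").read_key(key=key, bucket_name=S3XComBackend.BUCKET)
                    value = pd.read_json(data)
                return value

    The backend would then be activated through the xcom_backend core setting (for example, an AIRFLOW__CORE__XCOM_BACKEND environment variable pointing at the class path); the tutorial linked above walks through the full setup.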

  • @ryank8463
    @ryank8463 · 1 month ago

    @Astronomer Hi, thanks for your valuable reply. I would also like to ask what level of granularity we should aim for when splitting work into tasks. The more tasks there are, the more pushing/pulling of data from external storage happens, and when the data is large that brings some network overhead.

  • @christianfernandez5717
    @christianfernandez5717 · 1 month ago

    Great video. Would also be interested in a webinar regarding scaling the Airflow database since I'm having some difficulties of my own with that.

  • @Astronomer
    @Astronomer · 1 month ago

    Noted, thanks for the suggestion! If it's helpful, you can check out our guide on the metadata db docs.astronomer.io/learn/airflow-database. Using a managed service like Astro is also one way many companies avoid scaling issues with Airflow.

  • @dan-takacs
    @dan-takacs · 2 months ago

    Great video. I'm trying to make this work with the LivyOperator; do you know if it can be expanded or have partial arguments supplied to it?

  • @Astronomer
    @Astronomer · 2 months ago

    It should work. Generally you can map over any type of operator, but note that some parameters can't be mapped over (e.g. BaseOperator params). More here: docs.astronomer.io/learn/dynamic-tasks
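
    As a rough sketch (the job file and argument lists below are placeholders, not values from this thread), mapping a LivyOperator could look like this:

        from pendulum import datetime

        from airflow.models.dag import DAG
        from airflow.providers.apache.livy.operators.livy import LivyOperator

        with DAG(dag_id="livy_mapping_sketch", start_date=datetime(2024, 1, 1), schedule=None):
            # Unmapped arguments go in .partial(); the mapped argument goes in .expand(),
            # creating one task instance per element of `args`.
            LivyOperator.partial(
                task_id="submit_spark_job",
                livy_conn_id="livy_default",
                file="s3://my-bucket/jobs/transform.py",  # assumed job location
            ).expand(
                args=[["2024-01-01"], ["2024-01-02"], ["2024-01-03"]],
            )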

  • @looklook6075
    @looklook6075 · 2 months ago

    32:29 Why is the "test" connection button disabled? So frustrating. Airflow makes it so hard to connect to anything; not intuitive at all. And your video just skipped over how to enable "test" and asks me to contact my deployment admin. lol, I am the deployment admin. Can you show me how? I checked the website and the documentation is not helpful at all. I have been stuck for over a week on how to connect Airflow to an MSSQL Server.

  • @Astronomer
    @Astronomer · 2 months ago

    The `test` connection button is disabled by default starting in Airflow 2.7 for security reasons. You can enable it by setting the test_connection core config to Enabled. docs.astronomer.io/learn/connections#test-a-connection. We also have some guidance on connecting to an MSSQL server, although the process can vary depending on your exact setup: docs.astronomer.io/learn/connections/ms-sqlserver
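
    For example, in an Astro CLI project one way to set this (an illustration; adjust to how you manage configuration) is an environment variable in the project's .env file or Dockerfile:

        # re-enables the connection test button (Airflow 2.7+)
        AIRFLOW__CORE__TEST_CONNECTION=Enabled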

  • @quintonflorence6492
    @quintonflorence6492 · 1 month ago

    @Astronomer Hi, where can I find the core config to make this update? I'm currently using the Astro CLI and I'm not seeing this setting in the two .yaml files in the project. Thank you.

  • @pichaibravo
    @pichaibravo · 2 months ago

    Is it good to return a dataframe many times in Airflow?

  • @Astronomer
    @Astronomer · 2 months ago

    It's generally fine to pass dataframes in between your Airflow tasks, as long as you make sure your infrastructure can support the size of your data. If you use XCom, it's a good idea to consider a custom XCom backend for managing dataframes as Airflow's metadata db isn't set up for this specifically.
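
    A minimal TaskFlow sketch of handing a dataframe between tasks (the task names and toy data are made up; recent Airflow versions can serialize pandas DataFrames to XCom, and for larger frames a custom XCom backend is the better fit):

        import pandas as pd
        from pendulum import datetime

        from airflow.decorators import dag, task

        @dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
        def dataframe_handoff():
            @task
            def extract() -> pd.DataFrame:
                return pd.DataFrame({"order_id": [1, 2, 3], "amount": [10.0, 20.0, 30.0]})

            @task
            def transform(df: pd.DataFrame) -> pd.DataFrame:
                # the dataframe travels between the tasks via XCom
                return df.assign(amount_with_tax=df["amount"] * 1.2)

            transform(extract())

        dataframe_handoff()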

  • @ziedsalhi4503
    @ziedsalhi4503 · 3 months ago

    Hi, I already have an existing Airflow project; how can I use the Astro CLI to run it?

  • @greatotool
    @greatotool · 3 months ago

    is the git repository public?

  • @Astronomer
    @Astronomer · 3 months ago

    Yes! You can find it here: github.com/astronomer/webinar-demos/tree/best-practices-prod

  • @greatotool
    @greatotool · 3 months ago

    Thanks!! 🙂 @Astronomer

  • @user-by8um8bk1w
    @user-by8um8bk1w · 3 months ago

    Please share the repository.

  • @Astronomer
    @Astronomer · 3 months ago

    The repo is here: github.com/astronomer/webinar-demos/tree/best-practices-prod

  • @mcpiatkowski
    @mcpiatkowski · 3 months ago

    That is a great intro and overview of Airflow for beginners! I very much like the datasets concept and the ability to see data lineage. However, I haven't found a solution for how to make a triggered, dataset-aware pipeline execute with the parent DAG's execution date. Is it even possible at the moment?

  • @Astronomer
    @Astronomer · 3 months ago

    Thanks! And that is a great question. It is not possible to have the downstream dataset-triggered DAG use the same logical_date (the new parameter equivalent to the old execution_date) as the DAG that caused the update to the dataset, but it is possible to pull that date from within the downstream DAG by accessing context["triggering_dataset_events"]:

        @task
        def print_triggering_dataset_events(**context):
            triggering_dataset_events = context["triggering_dataset_events"]
            for dataset, dataset_list in triggering_dataset_events.items():
                print(dataset, dataset_list)
                print(dataset_list[0].source_dag_run.logical_date)

        print_triggering_dataset_events()

    If you use the above in your downstream DAG you can get that logical_date/execution_date to use in your Airflow tasks. For more info and an example with Jinja templating see: airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/datasets.html#fetching-information-from-a-triggering-dataset-event

  • @mcpiatkowski
    @mcpiatkowski · 3 months ago

    @Astronomer That is amazing! You are my hero for life! Thank you!

  • @veereshk6065
    @veereshk6065 · 3 months ago

    Hi, thank you for the detailed demo. I just started exploring dynamic task mapping and I have the requirement below, where I need to get the data from a metadata table and create a list of dictionaries:

        [
            {'colA': 'valueA', 'colB': 'valueB', 'colC': 'valueC', 'colD': 'valueD'},
            {'colA': 'valueA', 'colB': 'valueB', 'colC': 'valueC', 'colD': 'valueD'},
            {'colA': 'valueA', 'colB': 'valueB', 'colC': 'valueC', 'colD': 'valueD'},
        ]

    The above structure can be generated using fetch_metadata_task (a combination of BigQueryHook and PythonOperator). Now the question is: how do I generate the dynamic tasks using the above list of dictionaries? For each dictionary I want to perform a set of tasks, e.g. GCSToBigQueryOperator, BigQueryValueCheckOperator, BigQueryToBigQueryCopyOperator, etc. The sample DAG dependency looks like this:

        start_task >> fetch_metadata_task
        fetch_metadata_task >> [GCSToBigQueryOperator_table1 >> BigQueryValueCheckOperator_table1 >> BigQueryToBigQueryCopyOperator_table1 >> connecting_dummy_task]
        fetch_metadata_task >> [GCSToBigQueryOperator_table2 >> BigQueryValueCheckOperator_table2 >> BigQueryToBigQueryCopyOperator_table2 >> connecting_dummy_task]
        fetch_metadata_task >> [GCSToBigQueryOperator_table3 >> BigQueryValueCheckOperator_table3 >> BigQueryToBigQueryCopyOperator_table3 >> connecting_dummy_task]
        connecting_dummy_task >> BigQueryExecuteTask >> end_task

  • @ayushikhanna1094
    @ayushikhanna1094 · 3 months ago

    Is there any option available in the Airflow UI to auto-trigger?

  • @78salieri78
    @78salieri78 · 3 months ago

    Great video, with many examples, much appreciated!

  • @manCoder
    @manCoder · 4 months ago

    Very nice introductory video. thanks a lot for this.

  • @vladislavzadvornev4548
    @vladislavzadvornev4548 · 4 months ago

    Hi. Thank you for a great video. I have one question. Can I somehow start Astro locally inside my existing project that already follows a different structure? I would very much like to benefit from the convenience of the Astro CLI, but there's no way I want to modify the structure of a project that has been in place for more than 1.5 years :)

  • @averychen4633
    @averychen4633 · 4 months ago

    you are the best

  • @user-ee6hz2zl9s
    @user-ee6hz2zl9s · 4 months ago

    I am able to see DAGs in the sync container and in the scheduler, but not in the web UI. I am using the Kubernetes Executor and the bitnami/airflow image.

  • @cloudlover9186
    @cloudlover9186 · 5 months ago

    I am running into the problem below; can it be achieved with the timetable concepts? I have a single DAG which should satisfy both of these schedule intervals: '30 1,4,7,10,13,16,19,22 * * *' and '00 3,6,12,15,18,21,00 * * *'. Please help and guide.

  • @Astronomer
    @Astronomer · 4 months ago

    Would you mind sharing info on what type of scheduling interval you'd like to achieve? Not sure what it is based on that string unfortunately!

  • @cloudlover9186
    @cloudlover9186 · 4 months ago

    @Astronomer Hi, we are in the process of changing a daily-schedule DAG to a 90-minute-frequency DAG, with the expectation that it runs at 00:00, 01:30, 03:00 and so on, and also adding another new DAG with the same 90-minute frequency that should run at 00:20, 01:50, 03:10, etc. The point is, if I hard-code the start date as a future date (for example, today is 01/10 and I hard-code 2024/01/11 00:00), any future change does not impact the start date schedule. That said, we have been advised to research more and not hard-code the start date. FYI, we are using timedelta(minutes=90) as the schedule interval attribute. If we use current-date logic, then at deployment time (deployment time > start date time) the DAG executes immediately. How can we overcome this? Please help.

  • @bananaboydan3642
    @bananaboydan3642 · 5 months ago

    Airflow wasn't able to locate my existing Python scripts. I receive this error: ImportError: cannot import name 'weeklyExtract' from 'dags' (unknown location)

  • @Astronomer
    @Astronomer · 4 months ago

    Would you mind sharing how you're referencing the script in the code? And where are your Python scripts stored? Typically you'll need to create a sub-folder within the DAGs folder to store them, and then you can reference them from that path.
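
    As a sketch of that layout (the module and function names are made up for illustration):

        # dags/
        # ├── weekly_dag.py
        # └── scripts/
        #     └── weekly_extract.py   # defines run_weekly_extract()
        #
        # Because Airflow adds the DAGs folder to the Python path, weekly_dag.py can then do:
        from scripts.weekly_extract import run_weekly_extract  # hypothetical module and function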

  • @richie.edwards
    @richie.edwards · 5 months ago

    Thank you. Any link to a consumable resource discussing the topic? The video quality is very bad.

  • @Astronomer
    @Astronomer · 4 months ago

    Yes definitely, apologies for that! Check out this link; it covers a few different methods of managing your secrets: docs.astronomer.io/astro/secrets-management

  • @HarshGupta-wi1zn
    @HarshGupta-wi1zn · 5 months ago

    Since I am using Azure Astro, there is no Astro CLI. What should I do in this case?

  • @Astronomer
    @Astronomer · 5 months ago

    The Astro CLI works for Azure Astro as well!

  • @HarshGupta-wi1zn
    @HarshGupta-wi1zn · 5 months ago

    @Astronomer So where is the Astro CLI in Azure?

  • @pushpendudhara7764
    @pushpendudhara7764 · 5 months ago

    After adding the Python file and the HTML file and restarting the web server, the plugin details are visible under the Admin > Plugins path, but the view is not populating in Cloud Composer. Is there anything else that needs to be done?

  • @pushpendudhara7764
    @pushpendudhara7764 · 5 months ago

    You have to be an Airflow admin to be able to view the new menu in the web server.

  • @Astronomer
    @Astronomer · 5 months ago

    Ah, thank you for noting that! We've loved your comment so hopefully others can see it!

  • @maximilianrausch5193
    @maximilianrausch5193 · 5 months ago

    Do you have more resources on how to create plugins?

  • @Astronomer
    @Astronomer · 5 months ago

    Definitely! Check out this guide for more: docs.astronomer.io/learn/using-airflow-plugins
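
    As a tiny sketch of the mechanism (the menu link here is illustrative; the guide above covers views, blueprints, and more), a file in your project's plugins folder could look like:

        from airflow.plugins_manager import AirflowPlugin

        class DocsLinkPlugin(AirflowPlugin):
            name = "docs_link_plugin"
            # adds a link under a "Docs" menu in the Airflow UI
            appbuilder_menu_items = [
                {"name": "Astronomer Docs", "href": "https://docs.astronomer.io", "category": "Docs"},
            ]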

  • @illiakaltovich
    @illiakaltovich · 6 months ago

    Tamara Fingerlin, your 'Live with an Astronomer' approach is really cool and organized. I gained some nice insights about the topic I am struggling with. Thank you so much! ❤

  • @Astronomer
    @Astronomer · 6 months ago

    Thanks so much, Tamara is the best!

  • @RedShipsofSpainAgain
    @RedShipsofSpainAgain · 6 months ago

    13:45 This is great. But one suggestion: show the `Today is {{ execution_date }}` example where templating is not working (slide 12) next to where the templating is working (slide 14) so that the audience can compare the two easily side by side.

  • @Astronomer
    @Astronomer · 6 months ago

    Thanks for the suggestion!
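
    For anyone comparing at home, the difference boils down to where the Jinja expression ends up (a small sketch, not the exact slides; `{{ ds }}` stands in for the slide's `{{ execution_date }}`):

        from pendulum import datetime

        from airflow.models.dag import DAG
        from airflow.operators.bash import BashOperator

        with DAG(dag_id="templating_sketch", start_date=datetime(2024, 1, 1), schedule=None):
            # Renders: bash_command is a templated field, so Jinja fills in the date at runtime.
            BashOperator(task_id="templated", bash_command="echo 'Today is {{ ds }}'")

            # Does not render: this plain Python string is evaluated when the DAG file is parsed
            # and never goes through Jinja, so the braces stay literal.
            print("Today is {{ ds }}")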

  • @Cam-xu1sq
    @Cam-xu1sq · 6 months ago

    You skipped a HUGE amount in the middle lol; that seems to be a very common occurrence with these videos tbh.

  • @Astronomer
    @Astronomer · 6 months ago

    What do you mean by skipped, Cam?

  • @Cam-xu1sq
    @Cam-xu1sq · 6 months ago

    @Astronomer You've missed a few steps. Also, this demo is a little outdated, as the latest version of Astronomer Cosmos has had a complete rework so that the DbtTaskGroup function does everything. However, whenever I try to test it out I get a weird error with Jaffle Shop:

        Database Error in model orders (models/orders.sql)
        improper relation name (too many dotted names): raw_csvs.testing_dbt.raw_csvs.orders__dbt_backup

    This error only occurs for the dbt backups for staging tables (it doesn't impact views or seed tables). It's trying to query using Schema.Db_Name.Schema.Table, which obviously throws an error because Schema should come after Db_Name. I don't get this error if I use only the airflow-dbt Python package to do the seed, snapshot, test and run commands, so I've kept using those for now. If you can explain why I'm getting that error that'd be awesome, because I don't understand... I removed schema from my Airflow conn object called cams_db and pass in the schema and db_name with the profile_config; however, I still get the same error, which is frustrating.

  • @maximilianrausch5193
    @maximilianrausch5193 · 7 months ago

    Are the code examples available?

  • @Astronomer
    @Astronomer · 6 months ago

    Yes, they are coming. Apologies that the blog hasn't been published yet; we will share it when it is!

  • @dani2500d
    @dani2500d · 7 months ago

    Hey, awesome webinar! Thank you! I do have one question about the best practices for structuring a DAG. Say it is better to put the task implementations (Python operators) into a separate file: if my tasks require a lot of imports, is it better to import inside every task (method) or is it fine to import them all at the top level of the tasks file?

  • @Astronomer
    @Astronomer · 7 months ago

    I honestly usually just add them all to the top of the DAG file as TaskFlow tasks, instead of pulling them in from files, but if you're using the same Python functions in multiple DAGs, the import method is probably best for you!

  • @user-ij5cf4vs5c
    @user-ij5cf4vs5c · 7 months ago

    Hello, my test button is disabled. Could you tell me how to fix that issue? I don't find anything on the web for troubleshooting that kind of problem.

  • @Astronomer
    @Astronomer · 7 months ago

    The Test button was disabled in the latest release of Airflow, but you can re-enable it; check out the following docs link: docs.astronomer.io/learn/connections#:~:text=You%20can%20enable%20connection%20testing,Enabled%20in%20your%20Airflow%20environment.

  • @vivekmeka6913
    @vivekmeka6913 · 7 months ago

    This is great.

  • @Astronomer
    @Astronomer · 7 months ago

    Thanks Vivek!

  • @jamesanselm220
    @jamesanselm220 · 7 months ago

    How would you modify the DAG if you want to have a single task that runs after all the descendants of `load_files_to_snowflake[X]` have run, regardless of how many results from `get_s3_files`? For example, say I want a task to run at the end that sends a message to a SQS saying that all the processing has been completed.

  • @Astronomer
    @Astronomer · 7 months ago

    You would just set the relationship between the load_files tasks and that final task like normal, and add the all_success trigger rule to the final task so it only runs after all the load file tasks have completed successfully.
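
    In sketch form (task names follow the question above; the task bodies are placeholders):

        from pendulum import datetime

        from airflow.decorators import dag, task

        @dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
        def fan_in_sketch():
            @task
            def get_s3_files():
                return ["a.csv", "b.csv", "c.csv"]

            @task
            def load_files_to_snowflake(file: str):
                print(f"loading {file}")

            @task(trigger_rule="all_success")  # the default rule, shown explicitly for clarity
            def notify_sqs():
                print("all files loaded")

            # notify_sqs runs once every mapped load_files_to_snowflake instance has succeeded
            load_files_to_snowflake.expand(file=get_s3_files()) >> notify_sqs()

        fan_in_sketch()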

  • @Astronomer
    @Astronomer · 7 months ago

    Learn more about Airflow 2.7 here! www.astronomer.io/blog/introducing-airflow-2-7/

  • @TheDimanoid999
    @TheDimanoid999 · 7 months ago

    BRO, how can you upload a video that includes scripts in 360p!

  • @Astronomer
    @Astronomer · 7 months ago

    Hahahaha sorry man, hopefully our more recent videos are up to your standard!

  • @richie.edwards
    @richie.edwards · 5 months ago

    @Astronomer Is there a recent video with better quality covering the same topic? Or slides? Thanks!

  • @rafaelg8238
    @rafaelg8238 · 7 months ago

    Hi Marc. If possible, can you show how to integrate Airflow, k8s and ArgoCD end-to-end, please? Besides that, how do I change the default port '8080' in 'webserver > startupProbe' for the healthcheck? I am running the command 'k port-forward svc/airflow-webserver 8081:8080 --namespace airflow' and I would like the healthcheck to check this port as well. And how do I configure the DAG folder on localhost without git-sync?