How to Manage Airflow DAGs in Production | DAG Versioning & Deployment

#apacheairflow #airflowatjagex #jagex #apacheairflowforbeginners #airflowinproduction
In this video, I'll discuss how we (at Jagex) manage and operate more than 100 ETL pipelines in Airflow efficiently.
00:00 Intro
00:27 Issues we faced
02:08 Airflow & DAGs versioning
05:47 Airflow Deployment Process
06:51 Automated CICD deployment
08:53 Airflow DAG versioning proposal (AIP-36)
09:11 Outro
Learn Something New Below:
maxcotec.com/blog/chip-chick-...
maxcotec.com/blog/what-is-tra...
👍 If this video was helpful to you, please don't forget to thumbs-up,
⌨️ leave a comment and
🔗 share with your friends.
⭐ Support our work by hitting that subscribe button.
🔔 hit bell icon to stay alert on more upcoming interesting stuff.
DAG versioning proposal
cwiki.apache.org/confluence/d...

Comments: 17

  • @BalzaelRisingDark · 1 year ago

    Very nice engineering of the life cycle. I'm wondering if the release/modification of a DAG and the subsequent generation of a new Docker image for Airflow means rolling out the installation again every time. In other words, how do you manage cases where the current Airflow installation is already running some DAGs and you want to update your DAG repository?

  • @dchandrateja · 1 year ago

    Hi Mattia, based on my knowledge, if you are using the Kubernetes executor, the ReplicaSets of the pods are updated to make sure the deployment is fault tolerant. Kubernetes will keep your present image running in a separate replica while it updates the other replicas, and will complete the whole rollout accordingly.

  • @mithunmanoharmithun · 1 year ago

    How does Airflow pick up the DAG from each of the submodules, since on checkout each submodule will have the folder structure of its source repo?

  • @maxcoteclearning · 1 year ago

    Airflow automatically scans for Python scripts containing an airflow.DAG() definition, and if it finds one it treats that file as a DAG. Normally we keep a single main.py (with the DAG() code) at the root of every submodule repository.
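
    A minimal sketch of what such a main.py could look like (the DAG id, schedule and tasks below are illustrative, not the actual Jagex pipelines):

    # main.py at the root of a submodule repository
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.bash import BashOperator

    # The scheduler picks this file up because it defines an airflow DAG object.
    with DAG(
        dag_id="example_submodule_dag",   # hypothetical DAG id
        start_date=datetime(2023, 1, 1),
        schedule_interval="@daily",
        catchup=False,
    ) as dag:
        extract = BashOperator(task_id="extract", bash_command="echo extract")
        load = BashOperator(task_id="load", bash_command="echo load")
        extract >> load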

  • @vaib5917 · 1 year ago

    Hi, I really need to know: if we put the Python script into a container and run it using DockerOperator, how can we pass the values of Variables from the Airflow Admin UI to the container? Please help.

  • @maxcoteclearning · 1 year ago

    If you want to use DockerOperator for one or all of the DAG tasks, then yes: keep the business-logic script (Python, Bash, Java, Go or any other language) inside a Docker container image, e.g. my_task_1:latest. Then write DAG code in Python which initializes this DockerOperator task (or tasks). That DAG code will be containerized inside the Airflow image. While initializing the DockerOperator in your DAG code, you can pass Airflow Variables, secrets, connections and XComs to my_task_1:latest in two ways: 1. via environment variables, or 2. via command args. See DockerOperator details here: airflow.apache.org/docs/apache-airflow-providers-docker/stable/_api/airflow/providers/docker/operators/docker/index.html
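
    A sketch of both approaches (the image name, Variable key and DAG id are made-up examples; this assumes the apache-airflow-providers-docker package is installed):

    # Sketch only: image name, Variable key and DAG id below are hypothetical.
    from datetime import datetime

    from airflow import DAG
    from airflow.models import Variable
    from airflow.providers.docker.operators.docker import DockerOperator

    with DAG(
        dag_id="docker_operator_example",
        start_date=datetime(2023, 1, 1),
        schedule_interval=None,
        catchup=False,
    ) as dag:
        run_task = DockerOperator(
            task_id="run_my_task",
            image="my_task_1:latest",  # hypothetical business-logic image
            # 1. pass an Airflow Variable into the container as an environment variable
            environment={"MY_SETTING": Variable.get("my_setting", default_var="")},
            # 2. or pass it as a command argument (command is a templated field)
            command=["--setting", "{{ var.value.my_setting }}"],
        )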

  • @oluwatobitobias · 2 years ago

    If Airflow is running via a virtualenv, shouldn't it automatically be using the lib/python/site-packages of that venv? And if not, how do I point Airflow to the lib directory to look for packages?

  • @maxcoteclearning · 2 years ago

    Yes, Airflow will use the venv's lib/python/site-packages (if using a venv). But here I am explaining NOT to use that same Airflow venv for the packages required by individual DAGs. In doing so, we would end up with a gigantic Python venv with loads of packages (used by several different DAGs). That becomes a mess and hard to manage, especially when cleaning up the venv on DAG removal or upgrade.
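
    One way to keep per-task dependencies out of the shared Airflow environment (a sketch of an alternative, not necessarily the setup shown in the video; the package pin and task id are illustrative) is PythonVirtualenvOperator, which builds a throwaway venv for just that task:

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.python import PythonVirtualenvOperator

    def transform():
        # Runs inside a temporary venv with its own requirements,
        # so the shared Airflow environment stays clean.
        import pandas as pd
        print(pd.__version__)

    with DAG(
        dag_id="isolated_deps_example",  # hypothetical DAG id
        start_date=datetime(2023, 1, 1),
        schedule_interval=None,
        catchup=False,
    ) as dag:
        PythonVirtualenvOperator(
            task_id="transform",
            python_callable=transform,
            requirements=["pandas==2.0.3"],  # per-task dependency pin (example)
            system_site_packages=False,
        )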

  • @samsid5223 · 10 months ago

    Did you face any issues with parallelism? I have set Airflow core parallelism to 512, max DAG runs is 1 and max task runs for one DAG is 128. I have a default pool with 128 slots, but my Airflow still won't run more than 32 tasks at a time. I'm using the Helm chart to deploy Airflow.

  • @maxcoteclearning · 10 months ago

    No, we haven't. Since you have provided very few hints, I'd assume there could be multiple factors causing your issue. 1. The machine where you are running Airflow may not have enough resources (CPU and memory)? Check the scheduler logs for more hints. 2. Different executors (e.g. SequentialExecutor, LocalExecutor, CeleryExecutor, etc.) have different behavior regarding task parallelism; make sure you are using an executor that supports the desired level of parallelism, e.g. in the case of Celery, check that your task queue is not throttled. 3. Could it be that your tasks depend on each other, which is why you are not seeing the desired number of tasks running in parallel? 4. What value is set for AIRFLOW__CORE__DAG_CONCURRENCY? It is the max number of task instances allowed to run concurrently FOR A SINGLE SPECIFIC DAG.
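
    As a first check (a sketch, assuming Airflow 2.x; run it in the scheduler's environment), you can print the concurrency-related settings Airflow actually loaded, to confirm your overrides made it into the running config:

    # Sketch: print the concurrency settings Airflow actually loaded.
    from airflow.configuration import conf

    for section, key in [
        ("core", "parallelism"),               # global cap on running task instances
        ("core", "max_active_tasks_per_dag"),  # per-DAG cap (formerly dag_concurrency)
        ("core", "max_active_runs_per_dag"),
        ("celery", "worker_concurrency"),      # tasks each Celery worker can run
    ]:
        print(section, key, "=", conf.get(section, key, fallback="<unset>"))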

  • @samsid5223 · 10 months ago

    Thanks for your prompt response. We have enough resources on Kubernetes. I triggered individual DAGs to check the concurrency. I'm using CeleryExecutor, and for task concurrency I have set AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG to 128 (AIRFLOW__CORE__DAG_CONCURRENCY has been renamed to max_active_tasks_per_dag in newer versions). I thought that, because I only have 128 slots, setting parallelism higher than the available slots might be causing the issue, but I have now decreased it to 56 and it still won't go beyond 32.

  • @samsid5223 · 10 months ago

    What about the scheduler log? I don't see any issue; do you want me to check for specific errors/warnings? I listed the configuration in the Airflow UI and in the pod as well, and in both places I can see parallelism is set to 512. As for the Celery queue throttling, how can I view that? Also, do you know the general memory usage for running Airflow with 50-100 DAGs? How much memory does it usually consume across all the pods?

  • @maxcoteclearning · 9 months ago

    @samsid5223 I haven't got much experience with CeleryExecutor, to be honest. But have you tried looking at the Celery concurrency settings? Specifically the max-tasks-per-child setting in celeryconfig.py (docs.celeryq.dev/en/stable/userguide/workers.html#max-tasks-per-child-setting); it can also be passed as `--max-tasks-per-child`. I believe this controls how many tasks Celery workers can execute concurrently. Have you tried increasing the Celery worker concurrency? (docs.celeryq.dev/en/stable/userguide/workers.html#concurrency)
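
    One way to see the concurrency the workers are actually running with (a sketch, assuming Airflow 2.5.x with CeleryExecutor and a reachable broker): if each worker reports a max-concurrency of 16 (the default worker_concurrency) and you run two workers, that would line up with a 32-task ceiling.

    # Sketch: ask the live Celery workers what concurrency they are running with.
    from airflow.executors.celery_executor import app  # Celery app used by Airflow 2.5.x

    stats = app.control.inspect().stats() or {}  # empty dict if no worker replies
    for worker, info in stats.items():
        print(worker, "max-concurrency:", info.get("pool", {}).get("max-concurrency"))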

  • @samsid5223 · 9 months ago

    Thank you for all your responses. I will take a look at celeryconfig, but IMHO max-tasks-per-child is the same as max tasks per DAG and the other parallelism settings. This could also be a bug in Airflow version 2.5.3. I will try different versions.

  • @rahulbhandari1 · 1 year ago

    So if you have 2000+ DAGs, are you saying to have 2000+ repos? That sounds like a nightmare to manage, especially during Airflow version upgrades when some variables/operators get deprecated and need to be updated in the repo for each DAG. Imagine creating 2000 separate PRs, one for each DAG repo. Also, what about common operators/libs/utils which are used across a subset of DAGs and may have to be updated, which would mean updating code for each DAG repo? The same goes for a common repo, whether added as a separate repo or as part of the base image, when some updates need to be made.

  • @maxcoteclearning · 1 year ago

    Not necessarily. Usually a single repo generates multiple DAGs. We have around 120 DAGs from 50 repos (dynamic DAGs generated from configs). Making code compatible with newer versions is always part of lifecycle maintenance.
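
    A sketch of the dynamic-DAGs-from-config pattern mentioned here (the config shape, names and tasks are illustrative, not the exact setup used at Jagex):

    # Sketch: one repo/file producing several DAGs from a config list.
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.bash import BashOperator

    # Hypothetical config; in practice this could be loaded from YAML/JSON files.
    PIPELINES = [
        {"name": "orders", "schedule": "@daily"},
        {"name": "players", "schedule": "@hourly"},
    ]

    for cfg in PIPELINES:
        with DAG(
            dag_id=f"etl_{cfg['name']}",
            start_date=datetime(2023, 1, 1),
            schedule_interval=cfg["schedule"],
            catchup=False,
        ) as dag:
            BashOperator(task_id="run", bash_command=f"echo processing {cfg['name']}")
        # Register each generated DAG in the module's globals so the scheduler finds it.
        globals()[dag.dag_id] = dag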

  • @rahulbhandari1 · 11 months ago

    @maxcoteclearning We have about 2500 DAGs running on our Airflow cluster. Not all DAGs can be generated from a simple config; if that works for all of yours, you have a very narrow use case. We do have common factories that we use for some of our similar DAGs. Having 50-plus repos to manage just 120 DAGs is bonkers. "Making code compatible with newer versions is always part of life cycle maintenance" -> yes, and the idea is to make that lifecycle management scalable. If you have 1000 repos to manage your ETL, good luck with that. There is a reason why, at scale, a monorepo is the preferred approach, at least per application: it democratizes the use of common libs, code quality and tests, and scales your CI processes. Check the implementations at big tech companies like Airbnb, Meta and Google. Proper tooling and custom CI can achieve everything you mentioned even with a single repo.