ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Airflow 의 Celery executor 의 custom configuration
    개발/Airflow 2023. 1. 9. 17:47

    Airflow의 executor 를 celery 로 사용중에 configuration 을 수정해야할 경우가 있다. 

    예를 들어, database engine options 를 추가한다던지.. 

     

    이번 case는 celery의 result_backend 로 사용하는 mysql 과의 연결 관리를 위해 custom option을 추가한 case로 추가 설정에 대한 내용과, custom_celery_config를 적용하는 방법을 기술하도록 하겠다. 

     

    celery config를 설정하는 방법은 세가지 정도로 파악을 했는데,

    1. celery config 파일을 수정
    2. python dag 파일에서 직접 celery config를 import
    3. Airflow configuration을 통해, celery config 파일을 import

    1번의 방식은, Airflow docker image를 추가 커스텀해야하는 부분이므로 고려하지 않았고, 3번은 Airflow 사용하는 모든 모듈들의 dag 파일에 적용을 요청해야하는 일이므로 영향도가 커 사용하지 않았습니다. 

    3번에 대한 적용방법을 사용하여 config를 변경해주었습니다. 

     

    Airflow configuration 을 통하여 Celery의 Celery_config_option 을 변경해줄 수 있습니다. 

    AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS: airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG

    celery_config_options 의 기본값은 airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG 으로 되어있는데 파일 내용은 아래 파일을 참고. 

    https://github.com/apache/airflow/blob/main/airflow/config_templates/default_celery.py

     

    중요부분만 발췌 하자면, 

    DEFAULT_CELERY_CONFIG = {
        "accept_content": ["json"],
        "event_serializer": "json",
        "worker_prefetch_multiplier": conf.getint("celery", "worker_prefetch_multiplier"),
        "task_acks_late": True,
        "task_default_queue": conf.get("operators", "DEFAULT_QUEUE"),
        "task_default_exchange": conf.get("operators", "DEFAULT_QUEUE"),
        "task_track_started": conf.getboolean("celery", "task_track_started"),
        "broker_url": broker_url,
        "broker_transport_options": broker_transport_options,
        "result_backend": result_backend,
        "worker_concurrency": conf.getint("celery", "WORKER_CONCURRENCY"),
        "worker_enable_remote_control": conf.getboolean("celery", "worker_enable_remote_control"),
    }

    으로 기본설정이 선언되어있고, 이곳에 추가가 필요한 config값을 넣어주면 되는데, 이는 Airflow에서 제공하는 기본 파일을 편집하는것으로 추후 업그레이드시 문제가 될 소지가 있습니다. 

     

    그래서 추가로 custom_config 가 포함된 python 파일을 추가하여 해당 파일을 연결해주도록 했습니다. 

    #custom_celery_config.py
    
    from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG
    
    CELERY_CONFIG = {
        **DEFAULT_CELERY_CONFIG,
        'database_engine_options': {'pool_size': 5, 'pool_recycle': 1800, 'pool_pre_ping': True},
        'broker_transport_options': {
             'master_name': "mymaster",
             'sentinel_kwargs': { 'password': "password" }
         }
    }

    Airflow에서 설정한 기본 Celery 설정을 해치지 않도록, 기본설정(DEFAULT_CELERY_CONFIG)을 포함하여 추가합니다. 

    위 파일은 result_backend 로 사용중인 mysql 에 대한 연결 option (database_engine_options)과 broker로 사용중인 redis sentinel mode에서 사용하는 option(broker_transport_options) 를 추가한 celery_config 파일입니다. 

     

    위 파일을 Docker image 에 python path 위치에 삽입을 해줘도 되고, 저는 k8s 를 사용하기 때문에 배포 용이성을 위하여 helm chart안에 넣어두어 scheduler, webserver, worker의 container 안에 dags 경로에 mount 해주었습니다. 

     

    위의 파일을 mount 한뒤 최정적으로 Airflow config 설정을 아래와 같이 하면 custom config가 적용이 됩니다.

    AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS: custom_celery_config.CELERY_CONFIG

     

    '개발 > Airflow' 카테고리의 다른 글

    Use Airflow broker with redis sentinel  (0) 2023.01.09
    How to use Airflow celery executor with redis sentinel  (0) 2021.05.21

    댓글

Designed by Tistory.