From c6b5afe36729df840b771b3853d022dab2dec285 Mon Sep 17 00:00:00 2001 From: hyejiyu Date: Mon, 23 Sep 2024 23:51:33 +0900 Subject: [PATCH] Increase the number of consumers and queues --- airflow/dag.py | 61 +++++++++++++++++++++++++++++--------------------- 1 file changed, 36 insertions(+), 25 deletions(-) diff --git a/airflow/dag.py b/airflow/dag.py index fbe91c2..ae491c0 100644 --- a/airflow/dag.py +++ b/airflow/dag.py @@ -28,14 +28,15 @@ def send_post_request(categoryId): print(f"Failed: {response.status_code}, {response.text}") response.raise_for_status() + # DAG의 기본 설정 default_args = { 'owner': 'khuda', # DAG 소유자 'depends_on_past': False, # 이전 DAG 실패 여부에 의존하지 않음 - 'email': ['dbgpwl34@gmail.com'], # 수신자 이메일 - "email_on_success": True, # 성공 시 이메일 전송 - 'email_on_failure': True, # 실패 시 이메일 전송 - 'email_on_retry': True, # 재시도 시 이메일 전송 +# 'email': ['dbgpwl34@gmail.com'], # 수신자 이메일 +# "email_on_success": True, # 성공 시 이메일 전송 +# 'email_on_failure': True, # 실패 시 이메일 전송 +# 'email_on_retry': True, # 재시도 시 이메일 전송 'retries': 1, # 실패 시 재시도 횟수 'retry_delay': timedelta(minutes=5) # 재시도 간격 } @@ -45,36 +46,46 @@ def send_post_request(categoryId): 'HomePlus_Crawling_DAG', # DAG의 이름 default_args=default_args, # 기본 인자 설정 description='HomePlus Crawling', # DAG 설명 - schedule_interval=timedelta(days=1), # 실행 주기 (매일 1회) + schedule_interval='0 9,16,22 * * *', # 실행 주기 (매일 09:00, 16:00, 22:00 정각) start_date=datetime(2024, 9, 20), # 시작 날짜 catchup=False # 시작 날짜부터 현재까지의 미실행 작업 실행 여부 ) as dag: - run_consumer_task1 = BashOperator( - task_id='run-consumer-1', # Task 이름 - bash_command="python3 /home/patturning1/mq_consumer1.py &", - do_xcom_push=False - ) + # run_consumer_task1 = BashOperator( + # task_id='run-consumer-1', # Task 이름 + # bash_command="python3 /home/patturning1/mq_consumer1.py &", + # do_xcom_push=False + # ) - run_consumer_task2 = BashOperator( - task_id='run-consumer-2', # Task 이름 - bash_command="python3 /home/patturning1/mq_consumer2.py &", - do_xcom_push=False - ) + # run_consumer_task2 = BashOperator( + # task_id='run-consumer-2', # Task 이름 + # bash_command="python3 /home/patturning1/mq_consumer2.py &", + # do_xcom_push=False + # ) - run_consumer_task3 = BashOperator( - task_id='run-consumer-3', # Task 이름 - bash_command="python3 /home/patturning1/mq_consumer3.py &", - do_xcom_push=False - ) + # run_consumer_task3 = BashOperator( + # task_id='run-consumer-3', # Task 이름 + # bash_command="python3 /home/patturning1/mq_consumer3.py &", + # do_xcom_push=False + # ) @task def send_post_request_HOMEPLUS_task(category_id): - return send_post_request(category_id) - - category_ids = list(range(100001, 100078)) - - [run_consumer_task1, run_consumer_task2, run_consumer_task3, send_post_request_HOMEPLUS_task.expand(category_id=category_ids)] + return send_post_request(category_id) + + # TaskFlow API로 task 정의 + @task + def generate_queue_values(): + return [{"consumer": "HomePlus.product.queue.1"}, {"consumer": "HomePlus.product.queue.1"}, + {"consumer": "HomePlus.product.queue.2"}, {"consumer": "HomePlus.product.queue.2"}, + {"consumer": "HomePlus.product.queue.3"}, {"consumer": "HomePlus.product.queue.3"}] + # BashOperator에서 expand로 받은 값을 사용 + run_consumer_task = BashOperator.partial( + task_id="run-consumer-task", + bash_command="python3 /home/patturning1/consumer_test.py {{ params.param1 }}", # 템플릿을 사용하여 매핑된 값 사용 + ).expand(params=generate_queue_values()) + category_ids = list(range(100001, 100078)) + [run_consumer_task, send_post_request_HOMEPLUS_task.expand(category_id=category_ids)] \ No newline at end of file