From 29e8979f9b619ac76aa7083462b7c4e89e62aa1a Mon Sep 17 00:00:00 2001 From: Simon Thompson Date: Fri, 5 Jul 2024 14:54:11 +0100 Subject: [PATCH] change portal tables --- dags/modules/utils/minioevent.py | 3 +- dags/register_minio_objects.py | 52 ++++++++++++++++++++++++++++++-- 2 files changed, 51 insertions(+), 4 deletions(-) diff --git a/dags/modules/utils/minioevent.py b/dags/modules/utils/minioevent.py index b455df9a..afc64dc1 100644 --- a/dags/modules/utils/minioevent.py +++ b/dags/modules/utils/minioevent.py @@ -8,8 +8,9 @@ def unpack_minio_event(message): s3_object = records["s3"]["object"] etag = s3_object["eTag"] key = s3_object["key"] + size = s3_object["size"] - return bucket, key, etag + return bucket, key, etag, size def decode_minio_event(bucket, key, etag): diff --git a/dags/register_minio_objects.py b/dags/register_minio_objects.py index e79c36c0..9fa583fc 100644 --- a/dags/register_minio_objects.py +++ b/dags/register_minio_objects.py @@ -10,7 +10,7 @@ from airflow.hooks.base import BaseHook from modules.providers.operators.rabbitmq import RabbitMQPythonOperator -from modules.utils.minioevent import unpack_minio_event +from modules.utils.minioevent import unpack_minio_event, decode_minio_event with DAG( dag_id="DLM_register_minio_objects", @@ -29,7 +29,7 @@ def process_event(message): logging.info("Processing message!") logging.info(f"message={message}") - bucket, key, etag = unpack_minio_event(message) + bucket, key, etag, size = unpack_minio_event(message) # Register eTag in postgres if not already there @@ -51,8 +51,34 @@ def process_event(message): cur.execute(sql) conn.commit() cur.close() + + # Register the file + obj = decode_minio_event(bucket, key, etag) + folder = obj['head_path'] + filename = obj['filename'] + extension = obj['extension'] + + logging.info(f"register object = {etag}") + + sql = ''' + INSERT INTO loadingbay (key,etag,objsize,deleted,folder,filename,extension) + SELECT '{key}','{etag}','{size}',false,'{folder}','{filename}','{extension}' + WHERE NOT EXISTS (SELECT 1 FROM loadingbay WHERE etag = '{etag}' ); + ''' + cur = conn.cursor() + cur.execute(sql) + conn.commit() + cur.close() + + sql2 = '''UPDATE loadingbay set updated=NOW() WHERE etag = '{etag}' ''' + cur = conn.cursor() + cur.execute(sql2) + conn.commit() + cur.close() + conn.close() + consume_events = RabbitMQPythonOperator( func=process_event, task_id="consume_events", @@ -75,6 +101,26 @@ def process_event(message): dag=dag, ) + create_main_table_task = PostgresOperator( + task_id='create_register_table', + postgres_conn_id='pg_conn', + sql=''' + CREATE TABLE IF NOT EXISTS loadingbay ( + id bigserial, + updated timestamp, + key VARCHAR(350), + etag VARCHAR(100), + objsize NUMERIC, + deleted boolean, + deletedDate timestamp, + folder VARCHAR(600), + filename VARCHAR(150), + extension VARCHAR(50) + ); + ''', + dag=dag, + ) + # If the consumer task fails and isn't restarted, restart the whole DAG restart_dag = TriggerDagRunOperator( task_id="restart_dag", @@ -82,4 +128,4 @@ def process_event(message): trigger_rule=TriggerRule.ALL_DONE ) - create_register_table_task >> consume_events >> restart_dag + create_register_table_task >> create_main_table_task >> consume_events >> restart_dag