Skip to content

Commit

Permalink
change portal tables
Browse files Browse the repository at this point in the history
  • Loading branch information
tekkisse committed Jul 5, 2024
1 parent aacedc2 commit 29e8979
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 4 deletions.
3 changes: 2 additions & 1 deletion dags/modules/utils/minioevent.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ def unpack_minio_event(message):
s3_object = records["s3"]["object"]
etag = s3_object["eTag"]
key = s3_object["key"]
size = s3_object["size"]

return bucket, key, etag
return bucket, key, etag, size


def decode_minio_event(bucket, key, etag):
Expand Down
52 changes: 49 additions & 3 deletions dags/register_minio_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from airflow.hooks.base import BaseHook

from modules.providers.operators.rabbitmq import RabbitMQPythonOperator
from modules.utils.minioevent import unpack_minio_event
from modules.utils.minioevent import unpack_minio_event, decode_minio_event

with DAG(
dag_id="DLM_register_minio_objects",
Expand All @@ -29,7 +29,7 @@ def process_event(message):
logging.info("Processing message!")
logging.info(f"message={message}")

bucket, key, etag = unpack_minio_event(message)
bucket, key, etag, size = unpack_minio_event(message)

# Register eTag in postgres if not already there

Expand All @@ -51,8 +51,34 @@ def process_event(message):
cur.execute(sql)
conn.commit()
cur.close()

# Register the file's metadata in the loadingbay table.
obj = decode_minio_event(bucket, key, etag)
folder = obj['head_path']
filename = obj['filename']
extension = obj['extension']

logging.info(f"register object = {etag}")

# BUGFIX: the previous SQL strings contained '{key}', '{etag}', ... but were
# plain (non-f) strings, so the literal placeholder text was written to the
# table, and the filename column was hard-coded to '(unknown)' even though
# `filename` was computed above. Parameterized queries fix both problems and
# also prevent SQL injection via attacker-controlled object keys/paths.
insert_sql = '''
    INSERT INTO loadingbay (key, etag, objsize, deleted, folder, filename, extension)
    SELECT %(key)s, %(etag)s, %(size)s, false, %(folder)s, %(filename)s, %(extension)s
    WHERE NOT EXISTS (SELECT 1 FROM loadingbay WHERE etag = %(etag)s);
'''
params = {
    'key': key,
    'etag': etag,
    'size': size,
    'folder': folder,
    'filename': filename,
    'extension': extension,
}

cur = conn.cursor()
try:
    cur.execute(insert_sql, params)
    # Stamp the update time for this etag, whether freshly inserted or
    # already present (matches the original two-statement behavior).
    cur.execute(
        "UPDATE loadingbay SET updated = NOW() WHERE etag = %(etag)s",
        params,
    )
    conn.commit()
finally:
    # Always release the cursor/connection, even if execute() raises —
    # the original leaked both on error.
    cur.close()
    conn.close()


consume_events = RabbitMQPythonOperator(
func=process_event,
task_id="consume_events",
Expand All @@ -75,11 +101,31 @@ def process_event(message):
dag=dag,
)

# Idempotently create the `loadingbay` registry table that process_event
# writes MinIO object metadata into (IF NOT EXISTS makes reruns safe).
create_main_table_task = PostgresOperator(
    # NOTE(review): was task_id='create_register_table', which appears to
    # duplicate the task_id of create_register_table_task — Airflow rejects
    # DAGs with duplicate task_ids. Renamed; confirm against the other task.
    task_id='create_loadingbay_table',
    postgres_conn_id='pg_conn',
    sql='''
        CREATE TABLE IF NOT EXISTS loadingbay (
            id bigserial,
            updated timestamp,
            key VARCHAR(350),
            etag VARCHAR(100),
            objsize NUMERIC,
            deleted boolean,
            deletedDate timestamp,
            folder VARCHAR(600),
            filename VARCHAR(150),
            extension VARCHAR(50)
        );
    ''',
    dag=dag,
)

# If the consumer task fails and isn't restarted, restart the whole DAG.
# ALL_DONE fires regardless of upstream success/failure, so the DAG
# re-triggers itself and the RabbitMQ consumer keeps running indefinitely.
restart_dag = TriggerDagRunOperator(
    task_id="restart_dag",
    trigger_dag_id=dag.dag_id,  # self-trigger: restart this same DAG
    trigger_rule=TriggerRule.ALL_DONE
)

create_register_table_task >> consume_events >> restart_dag
create_register_table_task >> create_main_table_task >> consume_events >> restart_dag

0 comments on commit 29e8979

Please sign in to comment.