From 29102714f45f81a93bf1d4759a51df5b63dbec73 Mon Sep 17 00:00:00 2001 From: Sergey Motornyuk Date: Fri, 6 Sep 2024 23:42:57 +0300 Subject: [PATCH] feat: SQLAlchemy v2 support --- ckanext/harvest/harvesters/base.py | 10 +++-- ckanext/harvest/logic/action/update.py | 58 +++++++++++++++----------- ckanext/harvest/model/__init__.py | 1 - 3 files changed, 40 insertions(+), 29 deletions(-) diff --git a/ckanext/harvest/harvesters/base.py b/ckanext/harvest/harvesters/base.py index 3a51bbad..e31f8f7e 100644 --- a/ckanext/harvest/harvesters/base.py +++ b/ckanext/harvest/harvesters/base.py @@ -4,7 +4,7 @@ import re import uuid -from sqlalchemy import exists, and_ +import sqlalchemy as sa from sqlalchemy.orm import contains_eager from ckantoolkit import config @@ -344,7 +344,9 @@ def _create_or_update_package(self, package_dict, harvest_object, # plugin) harvest_object.add() - model.Session.execute('SET CONSTRAINTS harvest_object_package_id_fkey DEFERRED') + model.Session.execute( + sa.text('SET CONSTRAINTS harvest_object_package_id_fkey DEFERRED') + ) model.Session.flush() new_package = p.toolkit.get_action( @@ -400,10 +402,10 @@ def last_error_free_job(cls, harvest_job): .filter(HarvestJob.status == 'Finished') .filter(HarvestJob.id != harvest_job.id) .filter( - ~exists().where( + ~sa.exists().where( HarvestGatherError.harvest_job_id == HarvestJob.id)) .outerjoin(HarvestObject, - and_(HarvestObject.harvest_job_id == HarvestJob.id, + sa.and_(HarvestObject.harvest_job_id == HarvestJob.id, HarvestObject.current == False, # noqa: E712 HarvestObject.report_status != 'not modified')) .options(contains_eager(HarvestJob.objects)) diff --git a/ckanext/harvest/logic/action/update.py b/ckanext/harvest/logic/action/update.py index 7dce1f57..4006a3bd 100644 --- a/ckanext/harvest/logic/action/update.py +++ b/ckanext/harvest/logic/action/update.py @@ -7,8 +7,8 @@ import logging import datetime +import sqlalchemy as sa from ckantoolkit import config -from sqlalchemy import and_, or_ from urllib.parse import urljoin from ckan.lib.search.index import PackageSearchIndex @@ -125,7 +125,7 @@ def harvest_source_clear(context, data_dict): select package_id from harvest_object where harvest_source_id = '{harvest_source_id}'));'''.format( harvest_source_id=harvest_source_id) - result = model.Session.execute(sql) + result = model.Session.execute(sa.text(sql)) ids = [] for row in result: ids.append(row[0]) @@ -137,14 +137,18 @@ def harvest_source_clear(context, data_dict): where harvest_source_id = '{harvest_source_id}');'''.format( harvest_source_id=harvest_source_id) + if toolkit.check_ckan_version(max_version='2.10.99'): + sql += ''' + delete from resource_revision where package_id in ( + select id from package where state = 'to_delete'); + ''' + # CKAN-2.3 or above: delete resource views, resource revisions & resources if toolkit.check_ckan_version(min_version='2.3'): sql += ''' delete from resource_view where resource_id in ( select id from resource where package_id in ( select id from package where state = 'to_delete')); - delete from resource_revision where package_id in ( - select id from package where state = 'to_delete'); delete from resource where package_id in ( select id from package where state = 'to_delete'); ''' @@ -172,6 +176,24 @@ def harvest_source_clear(context, data_dict): and context = 'Package'; ''' + if toolkit.check_ckan_version(max_version='2.10.99'): + sql += ''' + delete from package_tag_revision where package_id in ( + select id from package where state = 'to_delete'); + delete from member_revision where table_id in ( + select id from package where state = 'to_delete'); + delete from package_extra_revision where package_id in ( + select id from package where state = 'to_delete'); + delete from package_revision where id in ( + select id from package where state = 'to_delete'); + delete from package_relationship_revision where subject_package_id in ( + select id from package where state = 'to_delete'); + delete from package_relationship_revision where object_package_id in ( + select id from package where state = 'to_delete'); + delete from package_extra where package_id in ( + select id from package where state = 'to_delete'); + ''' + sql += ''' delete from harvest_object_error where harvest_object_id in ( select id from harvest_object @@ -183,22 +205,8 @@ def harvest_source_clear(context, data_dict): delete from harvest_gather_error where harvest_job_id in ( select id from harvest_job where source_id = '{harvest_source_id}'); delete from harvest_job where source_id = '{harvest_source_id}'; - delete from package_tag_revision where package_id in ( - select id from package where state = 'to_delete'); - delete from member_revision where table_id in ( - select id from package where state = 'to_delete'); - delete from package_extra_revision where package_id in ( - select id from package where state = 'to_delete'); - delete from package_revision where id in ( - select id from package where state = 'to_delete'); delete from package_tag where package_id in ( select id from package where state = 'to_delete'); - delete from package_extra where package_id in ( - select id from package where state = 'to_delete'); - delete from package_relationship_revision where subject_package_id in ( - select id from package where state = 'to_delete'); - delete from package_relationship_revision where object_package_id in ( - select id from package where state = 'to_delete'); delete from package_relationship where subject_package_id in ( select id from package where state = 'to_delete'); delete from package_relationship where object_package_id in ( @@ -226,7 +234,7 @@ def harvest_source_clear(context, data_dict): sql += ''' commit; ''' - model.Session.execute(sql) + model.Session.execute(sa.text(sql)) # Refresh the index for this source to update the status object get_action('harvest_source_reindex')(context, {'id': harvest_source_id}) @@ -408,7 +416,7 @@ def harvest_source_job_history_clear(context, data_dict): COMMIT; '''.format(harvest_source_id=harvest_source_id) - model.Session.execute(sql) + model.Session.execute(sa.text(sql)) # Refresh the index for this source to update the status object get_action('harvest_source_reindex')(context, {'id': harvest_source_id}) @@ -529,8 +537,8 @@ def harvest_objects_import(context, data_dict): .join(Package) .filter(HarvestObject.current == True) # noqa: E712 .filter(Package.state == u'active') - .filter(or_(Package.id == package_id_or_name, - Package.name == package_id_or_name))) + .filter(sa.or_(Package.id == package_id_or_name, + Package.name == package_id_or_name))) join_datasets = False else: last_objects_ids = \ @@ -671,7 +679,7 @@ def harvest_jobs_run(context, data_dict): num_objects_in_progress = \ session.query(HarvestObject.id) \ .filter(HarvestObject.harvest_job_id == job['id']) \ - .filter(and_((HarvestObject.state != u'COMPLETE'), + .filter(sa.and_((HarvestObject.state != u'COMPLETE'), (HarvestObject.state != u'ERROR'))) \ .count() @@ -976,7 +984,9 @@ def harvest_source_reindex(context, data_dict): del context['extras_as_string'] context.update({'ignore_auth': True}) package_dict = logic.get_action('harvest_source_show')( - context, {'id': harvest_source_id}) + dict(context, validate=False, use_cache=False), + {'id': harvest_source_id}, + ) log.debug('Updating search index for harvest source: %s', package_dict.get('name') or harvest_source_id) diff --git a/ckanext/harvest/model/__init__.py b/ckanext/harvest/model/__init__.py index c347cb48..aa582275 100644 --- a/ckanext/harvest/model/__init__.py +++ b/ckanext/harvest/model/__init__.py @@ -360,7 +360,6 @@ def harvest_object_before_insert_listener(mapper, connection, target): if not target.harvest_source_id or not target.source: if not target.job: raise Exception("You must define a Harvest Job for each Harvest Object") - target.source = target.job.source target.harvest_source_id = target.job.source.id