Skip to content

Commit

Permalink
Make it possible to revoke pending tasks (#2343)
Browse files Browse the repository at this point in the history
  • Loading branch information
henrikek authored Dec 11, 2024
1 parent 9de4378 commit bf88d01
Show file tree
Hide file tree
Showing 14 changed files with 64 additions and 38 deletions.
26 changes: 11 additions & 15 deletions ESSArch_Core/WorkflowEngine/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,8 @@ def retry(self, direct=True):
none
"""

logger = logging.getLogger('essarch.WorkflowEngine')
logger.info('Retrying step {} ({})'.format(self.name, self.pk))
child_steps = self.child_steps.all()

tasks = self.tasks(manager='by_step_pos').filter(
Expand Down Expand Up @@ -338,19 +340,12 @@ def resume(self, direct=True):
"""

logger = logging.getLogger('essarch.WorkflowEngine')
logger.debug('Resuming step {} ({})'.format(self.name, self.pk))
ProcessTask.objects.filter(
logger.info('Resuming step {} ({})'.format(self.name, self.pk))
for t in ProcessTask.objects.filter(
processstep__in=self.get_descendants(include_self=True),
status__in=[celery_states.PENDING, celery_states.FAILURE, celery_states.REVOKED],
).update(
status=celery_states.PENDING,
time_started=None,
time_done=None,
traceback='',
exception='',
progress=0,
result=None,
)
):
t.reset()
child_steps = self.get_children()

step_descendants = self.get_descendants(include_self=True)
Expand Down Expand Up @@ -720,7 +715,10 @@ def get_remote_copy(self, session, host):
return r

def reset(self):
logger = logging.getLogger('essarch.WorkflowEngine')
logger.info('Reset task ({})'.format(self.pk))
self.status = celery_states.PENDING
self.celery_id = uuid.uuid4()
self.time_started = None
self.time_done = None
self.traceback = ''
Expand Down Expand Up @@ -778,19 +776,17 @@ def run(self):

def revoke(self):
logger = logging.getLogger('essarch.WorkflowEngine')
logger.debug('Revoking task ({})'.format(self.pk))
logger.info('Revoke task ({})'.format(self.pk))
current_app.control.revoke(str(self.celery_id), terminate=True)
self.status = celery_states.REVOKED
self.celery_id = uuid.uuid4()
self.save()
logger.info('Revoked task ({})'.format(self.pk))

def retry(self):
"""
Retries the task
"""
logger = logging.getLogger('essarch.WorkflowEngine')
logger.debug('Retrying task ({})'.format(self.pk))
logger.info('Retrying task ({})'.format(self.pk))
self.reset()
return self.run()

Expand Down
2 changes: 1 addition & 1 deletion ESSArch_Core/WorkflowEngine/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ class Meta:
'information_package_str', 'eager',
)
read_only_fields = (
'id', 'progress', 'time_created', 'time_started', 'time_done', 'retried',
'id', 'time_created', 'time_started', 'time_done', 'retried',
)
extra_kwargs = {
'id': {
Expand Down
22 changes: 22 additions & 0 deletions ESSArch_Core/WorkflowEngine/signals.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import logging

from celery.signals import task_received, task_revoked
from django.db.models.signals import post_save, pre_save
from django.dispatch import receiver

Expand All @@ -24,3 +27,22 @@ def step_post_save(sender, instance, created, **kwargs):
instance.parent.clear_cache()
except AttributeError:
pass


@task_received.connect
def task_received_handler(request=None, **kwargs):
logger = logging.getLogger('essarch')
try:
t = ProcessTask.objects.get(celery_id=request.task_id)
logger.debug('{} signal task_received status is {}'.format(request.task_id, repr(t.status)))
if t.status == 'REVOKED':
t.revoke()
except ProcessTask.DoesNotExist:
logger.debug('{} signal task_received without ProcessTask'.format(request.task_id))
pass


@task_revoked.connect
def task_revoked_handler(request=None, **kwargs):
logger = logging.getLogger('essarch')
logger.debug('{} signal task_revoked'.format(request.id))
4 changes: 2 additions & 2 deletions ESSArch_Core/WorkflowEngine/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,8 @@ def run(self, request, pk=None):
@action(detail=True, methods=['post'], permission_classes=[CanRevoke])
def revoke(self, request, pk=None):
obj = self.get_object()
if obj.status != celery_states.STARTED:
raise exceptions.ParseError('Only running tasks can be revoked')
if obj.status not in [celery_states.STARTED, celery_states.PENDING]:
raise exceptions.ParseError('Only running or pending tasks can be revoked')

obj.revoke()
return Response({'status': 'revoked task'})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@
<table class="table" style="margin: 5px">
<tbody>
<tr
ng-if="currentStepTask.status == 'FAILURE' && currentStepTask.exception !== null && currentStepTask.exception !== ''"
ng-if="(currentStepTask.status == 'FAILURE' || currentStepTask.status == 'REVOKED') && currentStepTask.exception !== null && currentStepTask.exception !== ''"
>
<td>
<b>{{ ("ERROR.ERROR" | translate) + ":" }}</b>
Expand All @@ -90,7 +90,7 @@
</tbody>
</table>
<button
ng-if="currentStepTask.status == 'FAILURE' && currentStepTask.traceback !== null && currentStepTask.traceback !== ''"
ng-if="(currentStepTask.status == 'FAILURE' || currentStepTask.status == 'REVOKED') && currentStepTask.traceback !== null && currentStepTask.traceback !== ''"
class="btn btn-default"
ng-click="tracebackModal(currentStepTask)"
>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@
</td>
</tr>
<tr
ng-if="currentStepTask.status == 'FAILURE' && currentStepTask.exception !== null && currentStepTask.exception !== ''"
ng-if="(currentStepTask.status == 'FAILURE' || currentStepTask.status == 'REVOKED') && currentStepTask.exception !== null && currentStepTask.exception !== ''"
>
<td>
<b>{{ ("ERROR.ERROR" | translate) + ":" }}</b>
Expand All @@ -143,7 +143,7 @@
</tbody>
</table>
<button
ng-if="currentStepTask.status == 'FAILURE' && currentStepTask.traceback !== null && currentStepTask.traceback !== ''"
ng-if="(currentStepTask.status == 'FAILURE' || currentStepTask.status == 'REVOKED') && currentStepTask.traceback !== null && currentStepTask.traceback !== ''"
class="btn btn-default"
ng-click="tracebackModal(currentStepTask)"
>
Expand Down Expand Up @@ -264,14 +264,14 @@ <h4>{{ "STATE_TREE.VALIDATIONS" | translate }}</h4>
<button
class="btn btn-success"
ng-click="myTreeControl.scope.taskStepRedo(currentStepTask)"
ng-if="checkPermission('WorkflowEngine.can_retry') && (currentStepTask.status === 'FAILURE' || currentStepTask.status === 'REVOKED')"
ng-if="checkPermission('WorkflowEngine.can_retry') && (currentStepTask.status == 'FAILURE' || currentStepTask.status == 'REVOKED')"
>
{{ "REDO" | translate }}
</button>
<button
class="btn btn-danger"
ng-click="$ctrl.revokeTask(currentStepTask)"
ng-if="checkPermission('WorkflowEngine.can_revoke') && currentStepTask.status === 'STARTED'"
ng-if="checkPermission('WorkflowEngine.can_revoke') && (currentStepTask.status == 'STARTED' || currentStepTask.status == 'PENDING')"
>
{{ "REVOKE" | translate }}
</button>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<a
ng-click="treeControl.scope.taskStepRedo(row.branch)"
ng-if="row.branch.flow_type == 'task' && row.branch.status == 'FAILURE'"
ng-if="row.branch.flow_type == 'task' && (row.branch.status == 'FAILURE' || row.branch.status == 'REVOKED')"
style="color: #0a0"
>{{ "REDO" | translate }}</a
>
6 changes: 3 additions & 3 deletions ESSArch_Core/ip/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1426,7 +1426,7 @@ def create_preservation_workflow(self):
return workflow

def create_access_workflow(self, user, tar=False, extracted=False, new=False, object_identifier_value=None,
package_xml=False, aic_xml=False, diff_check=False, edit=False):
package_xml=False, aic_xml=False, diff_check=False, edit=False, responsible=None):
logger = logging.getLogger('essarch.ip')
if new:
dst_object_identifier_value = object_identifier_value or str(uuid.uuid4())
Expand Down Expand Up @@ -1511,7 +1511,7 @@ def create_access_workflow(self, user, tar=False, extracted=False, new=False, ob
}
)

return create_workflow(workflow, self, name='Access Information Package')
return create_workflow(workflow, self, name='Access Information Package', responsible=responsible)

if tar:
try:
Expand Down Expand Up @@ -1981,7 +1981,7 @@ def create_access_workflow(self, user, tar=False, extracted=False, new=False, ob
"queue": worker_queue,
"args": [str(new_aip.pk), str(user.pk), Workarea.ACCESS, tar]
})
return create_workflow(workflow, self, name='Access Information Package')
return create_workflow(workflow, self, name='Access Information Package', responsible=responsible)

def create_migration_workflow(self, temp_path, storage_methods, export_path='', tar=False, extracted=False,
package_xml=False, aic_xml=False, diff_check=True, responsible=None):
Expand Down
4 changes: 4 additions & 0 deletions ESSArch_Core/ip/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -1820,6 +1820,9 @@ def access(self, request, pk=None):
raise exceptions.ParseError('IP must either have state "Received" or be archived to be accessed')

data = request.data
user = None
if request and hasattr(request, "user"):
user = request.user

options = ['tar', 'extracted', 'edit']
if ip.package_type == InformationPackage.AIP:
Expand Down Expand Up @@ -1878,6 +1881,7 @@ def access(self, request, pk=None):
aic_xml=data.get('aic_xml', False),
diff_check=data.get('diff_check', False),
edit=data.get('edit', False),
responsible=user,
)
workflow.run()
return Response({'detail': gettext('Accessing {ip}...').format(ip=ip), 'step': workflow.pk})
Expand Down
10 changes: 6 additions & 4 deletions ESSArch_Core/storage/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -744,8 +744,10 @@ def fastest(self):
output_field=IntegerField(),
)
remote = Case(
When(storage_medium__storage_target__remote_server__isnull=True, then=Value(1)),
When(storage_medium__storage_target__remote_server__isnull=False, then=Value(2)),
When((Q(storage_medium__storage_target__remote_server=None) |
Q(storage_medium__storage_target__remote_server='')), then=Value(1)),
When(~(Q(storage_medium__storage_target__remote_server=None) |
Q(storage_medium__storage_target__remote_server='')), then=Value(2)),
output_field=IntegerField(),
)
storage_type = Case(
Expand All @@ -763,8 +765,8 @@ def fastest(self):
remote=remote,
storage_type=storage_type,
content_location_value_int=content_location_value_int,
).order_by('remote', 'container_order', 'storage_type').natural_sort('storage_medium__medium_id'
).order_by('content_location_value_int')
).order_by('remote', 'container_order', 'storage_type',
'storage_medium__medium_id', 'content_location_value_int')


class StorageObject(models.Model):
Expand Down
2 changes: 1 addition & 1 deletion ESSArch_Core/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ def get_value_from_path(root, path):
try:
el = get_elements_without_namespace(root, path)[0]
except IndexError:
logger.warning('{path} not found in {root}'.format(path=path, root=root.getroottree().getpath(root)))
logger.debug('{path} not found in {root}'.format(path=path, root=root.getroottree().getpath(root)))
return None

if "@" in path:
Expand Down
4 changes: 3 additions & 1 deletion ESSArch_Core/workflow/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,9 @@ def AccessAIP(self, aip, storage_object=None, tar=True, extracted=False, new=Fal

aip.access(storage_object, self.get_processtask(), dst=dst)

self.create_success_event("Retrieved information package from storage to workspace")
msg = "Retrieved information package from storage {} to workspace".format(storage_object.storage_medium.medium_id)
self.create_success_event(msg)
return msg


@app.task(bind=True, queue='robot', track=False)
Expand Down
6 changes: 3 additions & 3 deletions requirements/base.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
asgiref==3.8.1
boto3==1.35.72
boto3==1.35.78
celery[tblib]==5.4.0
cffi==1.17.1
channels==4.2.0
Expand All @@ -9,7 +9,7 @@ click==8.1.3
cryptography==44.0.0
daphne==4.1.2
dj-rest-auth[with-social]==7.0.0
django==5.0.9
django==5.0.10
django-allauth==0.61.1
django-cors-headers==4.6.0
django-countries-plus==2.2.0
Expand Down Expand Up @@ -46,7 +46,7 @@ opf-fido==1.6.1
pyfakefs==5.7.2
python-dateutil==2.8.2
pywin32==308 ; platform_system=='Windows'
redis==5.2.0
redis==5.2.1
regex==2024.11.6
requests==2.32.3
requests-toolbelt==1.0.0
Expand Down
2 changes: 1 addition & 1 deletion requirements/tests.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
coverage==7.6.8
coverage==7.6.9
django-test-without-migrations==0.6
selenium==4.25.0

0 comments on commit bf88d01

Please sign in to comment.