Skip to content

Commit

Permalink
Merge pull request openedx#1733 from openedx/asheehan-edx/cleaning-up…
Browse files Browse the repository at this point in the history
…-lost-transmission-audits

feat: new mark orphaned transmisisions command and logic to remove du…
  • Loading branch information
alex-sheehan-edx authored Apr 10, 2023
2 parents db9ab87 + c9189df commit b5a8dc7
Show file tree
Hide file tree
Showing 12 changed files with 458 additions and 5 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ Unreleased
----------
* Nothing

[3.61.10]
---------
feat: new tagging orphaned content task for integrated channels

[3.61.9]
--------
feat: Serialize and create a viewset for enterpriseCatalogQuery as readonly
Expand Down
2 changes: 1 addition & 1 deletion enterprise/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
Your project description goes here.
"""

__version__ = "3.61.9"
__version__ = "3.61.10"

default_app_config = "enterprise.apps.EnterpriseConfig"
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from logging import getLogger

from django.apps import apps
from django.conf import settings
from django.db.models import Q

from enterprise.api_client.enterprise_catalog import EnterpriseCatalogApiClient
Expand Down Expand Up @@ -299,13 +300,49 @@ def _get_catalog_diff(
# We need to remove any potential create transmissions if the content already exists on the customer's instance
# under a different catalog
for item in items_to_create:
if item.get('content_key') not in existing_content_keys:
# If the catalog system has indicated that the content is new and needs creating, we need to check if the
# content already exists on the customer's instance under a different catalog. If it does, we need to
# check if the content key exists as an orphaned transmission record for this customer and config,
# indicating that the content was previously created but then the config under which it was created was
# deleted.
content_key = item.get('content_key')
orphaned_content = self._get_customer_config_orphaned_content(
max_set_count=1,
content_key=content_key
).first()

# if it does exist as an orphaned content record: 1) don't add the item to the list of items to create,
# 2) swap the catalog uuid of the transmission audit associated with the orphaned record, and 3) mark the
# orphaned record resolved
if orphaned_content:
ContentMetadataTransmissionAudit = apps.get_model(
'integrated_channel',
'ContentMetadataTransmissionAudit'
)
ContentMetadataTransmissionAudit.objects.filter(
integrated_channel_code=self.enterprise_configuration.channel_code(),
plugin_configuration_id=self.enterprise_configuration.id,
content_id=content_key
).update(
enterprise_customer_catalog_uuid=enterprise_catalog.uuid
)

self._log_info(
'Found an orphaned content record while creating. '
'Swapping catalog uuid and marking record as resolved.',
course_or_course_run_key=content_key
)
orphaned_content.resolved = True
orphaned_content.save()

# if the item to create doesn't exist as an orphaned piece of content, do all the normal checks
elif content_key not in existing_content_keys:
unique_new_items_to_create.append(item)
else:
self._log_info(
'Found an previous content record in another catalog while creating. '
'Skipping record.',
course_or_course_run_key=item.get('content_key')
course_or_course_run_key=content_key
)

content_to_create = self._check_matched_content_to_create(
Expand Down Expand Up @@ -380,6 +417,24 @@ def _check_matched_content_to_delete(self, enterprise_customer_catalog, items):
)
return items_to_delete

def _get_customer_config_orphaned_content(self, max_set_count, content_key=None):
    """
    Look up unresolved orphaned content records for this exporter's configuration.

    Records are matched on the configuration's channel code and id with ``resolved=False``. When a truthy
    ``content_key`` is supplied, the lookup is additionally narrowed to that ``content_id``. The result is
    ordered by ``created`` ascending (oldest first) and sliced to at most ``max_set_count`` rows.
    """
    orphaned_model = apps.get_model('integrated_channel', 'OrphanedContentTransmissions')

    lookup = {
        'integrated_channel_code': self.enterprise_configuration.channel_code(),
        'plugin_configuration_id': self.enterprise_configuration.id,
        'resolved': False,
    }
    # A falsy content key (``None`` or an empty string) means "do not narrow by content".
    if content_key:
        lookup['content_id'] = content_key

    oldest_first = orphaned_model.objects.filter(**lookup).order_by('created')
    return oldest_first[:max_set_count]

def export(self, **kwargs):
"""
Export transformed content metadata if there has been an update to the consumer's catalogs
Expand Down Expand Up @@ -470,6 +525,24 @@ def export(self, **kwargs):
for key, item in items_to_delete.items():
delete_payload[key] = item

# If we're not at the max payload count, we can check for orphaned content and shove it in the delete payload
current_payload_count = len(items_to_create) + len(items_to_update) + len(items_to_delete)
if current_payload_count < max_payload_count:
space_left_in_payload = max_payload_count - current_payload_count
orphaned_content_to_delete = self._get_customer_config_orphaned_content(
max_set_count=space_left_in_payload,
)

for orphaned_item in orphaned_content_to_delete:
# log the content that would have been deleted because it's orphaned
self._log_info(
f'Exporter intends to delete orphaned content for customer: {self.enterprise_customer.uuid}, '
f'config {self.enterprise_configuration.channel_code}-{self.enterprise_configuration} with '
f'content_id: {orphaned_item.content_id}'
)
if getattr(settings, "ALLOW_ORPHANED_CONTENT_REMOVAL", False):
delete_payload[orphaned_item.content_id] = orphaned_item.transmission

self._log_info(
f'Exporter finished for customer: {self.enterprise_customer.uuid} with payloads- create_payload: '
f'{create_payload}, update_payload: {update_payload}, delete_payload: {delete_payload}'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
"""
Mark all content metadata audit records not directly connected to a customer's catalogs as orphaned.
"""
import logging

from django.core.management.base import BaseCommand

from integrated_channels.integrated_channel.management.commands import IntegratedChannelCommandMixin
from integrated_channels.integrated_channel.tasks import mark_orphaned_content_metadata_audit

LOGGER = logging.getLogger(__name__)


class Command(IntegratedChannelCommandMixin, BaseCommand):
    """
    Management command that kicks off the ``mark_orphaned_content_metadata_audit`` task.

    Usage:
        ./manage.py lms mark_orphaned_content_metadata_audits
    """

    def handle(self, *args, **options):
        """
        Dispatch the task via ``.delay()``; any exception raised while doing so is logged, not re-raised.
        """
        try:
            mark_orphaned_content_metadata_audit.delay()
        except Exception as error:  # pylint: disable=broad-except
            LOGGER.exception(f'Failed to mark orphaned content metadata audits. Task failed with exception: {error}')
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Generated by Django 3.2.18 on 2023-03-31 18:14

from django.db import migrations, models
import django.db.models.deletion
import django.utils.timezone
import model_utils.fields


class Migration(migrations.Migration):
# Schema migration that creates the table for the ``OrphanedContentTransmissions`` model.

# Must be applied after migration 0026 of the ``integrated_channel`` app.
dependencies = [
('integrated_channel', '0026_genericenterprisecustomerpluginconfiguration_last_modified_at'),
]

operations = [
migrations.CreateModel(
name='OrphanedContentTransmissions',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
# ``created`` / ``modified`` timestamp columns, both defaulting to ``django.utils.timezone.now``.
('created', model_utils.fields.AutoCreatedField(default=django.utils.timezone.now, editable=False, verbose_name='created')),
('modified', model_utils.fields.AutoLastModifiedField(default=django.utils.timezone.now, editable=False, verbose_name='modified')),
('integrated_channel_code', models.CharField(max_length=30)),
('plugin_configuration_id', models.PositiveIntegerField()),
('content_id', models.CharField(max_length=255)),
('resolved', models.BooleanField(default=False)),
# Deleting the referenced ContentMetadataItemTransmission row cascades to this row.
('transmission', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='orphaned_record', to='integrated_channel.contentmetadataitemtransmission')),
],
options={
# Composite index over channel code, configuration id and the resolved flag.
'index_together': {('integrated_channel_code', 'plugin_configuration_id', 'resolved')},
},
),
]
37 changes: 37 additions & 0 deletions integrated_channels/integrated_channel/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class SoftDeletionQuerySet(QuerySet):
"""
Soft deletion query set.
"""

def delete(self):
return super().update(deleted_at=localized_utcnow())

Expand Down Expand Up @@ -241,6 +242,22 @@ def clean(self):
}
)

def fetch_orphaned_content_audits(self):
    """
    Return the transmission records for this config's channel and customer that point at a catalog the
    config no longer covers.

    The covered catalogs are ``customer_catalogs_to_transmit`` when that is truthy, otherwise every catalog
    attached to the enterprise customer. Only records that have a ``remote_created_at`` timestamp and no
    ``remote_deleted_at`` timestamp are considered.
    """
    catalogs = self.customer_catalogs_to_transmit
    if not catalogs:
        # Nothing explicitly selected for transmission: fall back to all of the customer's catalogs.
        catalogs = self.enterprise_customer.enterprise_customer_catalogs.all()
    covered_catalog_uuids = catalogs.values_list('uuid', flat=True)

    # NOTE(review): the lookup is scoped by channel code and customer only, not by this configuration's id,
    # so records written by a sibling configuration of the same channel are also candidates -- confirm intended.
    transmitted_and_not_deleted = ContentMetadataItemTransmission.objects.filter(
        integrated_channel_code=self.channel_code(),
        enterprise_customer=self.enterprise_customer,
        remote_deleted_at__isnull=True,
        remote_created_at__isnull=False,
    )
    return transmitted_and_not_deleted.exclude(enterprise_customer_catalog_uuid__in=covered_catalog_uuids)

def update_content_synced_at(self, action_happened_at, was_successful):
"""
Given the last time a Content record sync was attempted and status update the appropriate timestamps.
Expand Down Expand Up @@ -412,6 +429,7 @@ class GenericEnterpriseCustomerPluginConfiguration(EnterpriseCustomerPluginConfi
"""
A generic implementation of EnterpriseCustomerPluginConfiguration which can be instantiated
"""

def __str__(self):
"""
Return human-readable string representation.
Expand Down Expand Up @@ -814,3 +832,22 @@ def __repr__(self):
Return uniquely identifying string representation.
"""
return self.__str__()


class OrphanedContentTransmissions(TimeStampedModel):
"""
A model to track content metadata transmissions that were successfully sent to the integrated channel but then
subsequently were orphaned by a removal of their associated catalog from the customer.
"""
class Meta:
# Composite index over the channel code, configuration id and resolved flag declared below.
index_together = [('integrated_channel_code', 'plugin_configuration_id', 'resolved')]

# Code of the integrated channel the orphaned transmission belongs to (at most 30 characters).
integrated_channel_code = models.CharField(max_length=30)
# Id of the channel configuration, stored as a plain positive integer rather than a foreign key.
plugin_configuration_id = models.PositiveIntegerField(blank=False, null=False)
# Identifier of the orphaned content (at most 255 characters).
content_id = models.CharField(max_length=255, blank=False, null=False)
# The transmission record that was orphaned; deleting that record cascades to this row.
transmission = models.ForeignKey(
ContentMetadataItemTransmission,
related_name='orphaned_record',
on_delete=models.CASCADE,
)
# Defaults to False; presumably flipped to True once the orphan has been handled -- confirm against callers.
resolved = models.BooleanField(default=False)
43 changes: 43 additions & 0 deletions integrated_channels/integrated_channel/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
INTEGRATED_CHANNEL_CHOICES,
IntegratedChannelCommandUtils,
)
from integrated_channels.integrated_channel.models import ContentMetadataItemTransmission, OrphanedContentTransmissions
from integrated_channels.utils import generate_formatted_log

LOGGER = get_task_logger(__name__)
Expand Down Expand Up @@ -89,6 +90,48 @@ def _log_batch_task_finish(task_name, channel_code, job_user_id,
))


@shared_task
@set_code_owner_attribute
def mark_orphaned_content_metadata_audit():
    """
    Record an ``OrphanedContentTransmissions`` row for every orphaned content metadata audit.

    Each value in ``INTEGRATED_CHANNEL_CHOICES`` is queried for all of its configurations, and the result of
    every configuration's ``fetch_orphaned_content_audits()`` is OR-ed into a single queryset. An exception
    raised while walking one channel's configurations is logged and the next channel is processed. Rows are
    written with ``get_or_create``, so an audit that already has a matching row does not get a second one.
    """
    started_at = time.time()
    _log_batch_task_start('mark_orphaned_content_metadata_audit', None, None, None)

    # Start from an empty queryset so each configuration's result can be unioned in with ``|``.
    orphaned_audits = ContentMetadataItemTransmission.objects.none()
    for channel_model in INTEGRATED_CHANNEL_CHOICES.values():
        try:
            for channel_config in channel_model.objects.all():
                orphaned_audits = orphaned_audits | channel_config.fetch_orphaned_content_audits()
        except Exception as error:  # pylint: disable=broad-except
            LOGGER.exception(
                f'[Integrated Channel] mark_orphaned_content_metadata_audit failed with exception {error}.',
                exc_info=True
            )

    # One orphaned-content record per fetched audit, keyed on channel, configuration, transmission and content.
    for audit in orphaned_audits:
        OrphanedContentTransmissions.objects.get_or_create(
            integrated_channel_code=audit.integrated_channel_code,
            plugin_configuration_id=audit.plugin_configuration_id,
            transmission=audit,
            content_id=audit.content_id,
        )

    elapsed_seconds = time.time() - started_at
    _log_batch_task_finish(
        'mark_orphaned_content_metadata_audit',
        channel_code=None,
        job_user_id=None,
        integrated_channel_full_config=None,
        duration_seconds=elapsed_seconds,
        extra_message=f'Orphaned content metadata audits marked: {orphaned_audits.count()}'
    )


@shared_task
@set_code_owner_attribute
@locked(expiry_seconds=TASK_LOCK_EXPIRY_SECONDS, lock_name_kwargs=['channel_code', 'channel_pk'])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,21 @@ def _transmit_action(self, content_metadata_item_map, client_method, action_name
transmission_limit = settings.INTEGRATED_CHANNELS_API_CHUNK_TRANSMISSION_LIMIT.get(
self.enterprise_configuration.channel_code()
)

# If we're deleting, fetch all orphaned, unresolved content transmissions
is_delete_action = action_name == 'delete'
if is_delete_action:
OrphanedContentTransmissions = apps.get_model(
'integrated_channel',
'OrphanedContentTransmissions'
)
orphaned_items = OrphanedContentTransmissions.objects.filter(
integrated_channel_code=self.enterprise_configuration.channel_code(),
plugin_configuration_id=self.enterprise_configuration.id,
resolved=False,
)
successfully_removed_content_keys = []

for chunk in islice(chunk_items, transmission_limit):
json_payloads = [item.channel_metadata for item in list(chunk.values())]
serialized_chunk = self._serialize_items(json_payloads)
Expand Down Expand Up @@ -203,11 +218,18 @@ def _transmit_action(self, content_metadata_item_map, client_method, action_name
transmission.remote_created_at = action_happened_at
elif action_name == 'update':
transmission.remote_updated_at = action_happened_at
elif action_name == 'delete':
elif is_delete_action:
transmission.remote_deleted_at = action_happened_at
if was_successful:
successfully_removed_content_keys.append(transmission.content_id)
transmission.save()
if was_successful:
transmission.remove_marked_for()
self.enterprise_configuration.update_content_synced_at(action_happened_at, was_successful)
results.append(transmission)

if is_delete_action and successfully_removed_content_keys:
# Mark any successfully deleted, orphaned content transmissions as resolved
orphaned_items.filter(content_id__in=successfully_removed_content_keys).update(resolved=True)

return results
20 changes: 20 additions & 0 deletions test_utils/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
GenericEnterpriseCustomerPluginConfiguration,
GenericLearnerDataTransmissionAudit,
LearnerDataTransmissionAudit,
OrphanedContentTransmissions,
)
from integrated_channels.moodle.models import MoodleEnterpriseCustomerConfiguration
from integrated_channels.sap_success_factors.models import (
Expand Down Expand Up @@ -943,6 +944,25 @@ class Meta:
}


class OrphanedContentTransmissionsFactory(factory.django.DjangoModelFactory):
"""
``OrphanedContentTransmissions`` factory.
"""

class Meta:
"""
Meta for ``OrphanedContentTransmissions``.
"""

model = OrphanedContentTransmissions

# Fixed channel code used for every generated record.
integrated_channel_code = 'GENERIC'
# Evaluated per instance: a slug produced by FAKER.
content_id = factory.LazyAttribute(lambda x: FAKER.slug())
# Evaluated per instance: a random integer of at least 1.
plugin_configuration_id = factory.LazyAttribute(lambda x: FAKER.random_int(min=1))
resolved = False
# Draws each transmission from the ContentMetadataItemTransmission rows in the database.
# NOTE(review): presumably at least one such row must exist before this factory is used -- confirm.
transmission = factory.Iterator(ContentMetadataItemTransmission.objects.all())


class EnterpriseCustomerInviteKeyFactory(factory.django.DjangoModelFactory):
"""
EnterpriseCustomerInviteKey factory.
Expand Down
Loading

0 comments on commit b5a8dc7

Please sign in to comment.