From 7a8f3b738cff99a2310e48f74f225f3648ae995b Mon Sep 17 00:00:00 2001 From: Ali Akbar Date: Wed, 8 Jan 2025 14:55:32 +0500 Subject: [PATCH] feat: add SearchAfterMixin for ES search_after capability --- .../tests/test_views/test_catalog_queries.py | 94 +++++++++++++++++++ course_discovery/apps/api/v2/urls.py | 8 +- .../apps/api/v2/views/catalog_queries.py | 72 ++++++++++++++ .../apps/course_metadata/models.py | 73 ++++++++++++++ .../apps/course_metadata/tests/test_models.py | 37 +++++++- .../0030-use-elasticsearch-search-after.rst | 39 ++++---- 6 files changed, 303 insertions(+), 20 deletions(-) create mode 100644 course_discovery/apps/api/v2/tests/test_views/test_catalog_queries.py create mode 100644 course_discovery/apps/api/v2/views/catalog_queries.py diff --git a/course_discovery/apps/api/v2/tests/test_views/test_catalog_queries.py b/course_discovery/apps/api/v2/tests/test_views/test_catalog_queries.py new file mode 100644 index 0000000000..74df5ba0a0 --- /dev/null +++ b/course_discovery/apps/api/v2/tests/test_views/test_catalog_queries.py @@ -0,0 +1,94 @@ +import urllib + +from rest_framework.reverse import reverse + +from course_discovery.apps.api.v1.tests.test_views.mixins import APITestCase +from course_discovery.apps.core.tests.factories import UserFactory +from course_discovery.apps.core.tests.mixins import ElasticsearchTestMixin +from course_discovery.apps.course_metadata.tests.factories import CourseFactory, CourseRunFactory + + +class CatalogQueryViewSetTests(ElasticsearchTestMixin, APITestCase): + """ + Unit tests for CatalogQueryViewSet. 
+ """ + def setUp(self): + super().setUp() + self.user = UserFactory(is_staff=True, is_superuser=True) + self.client.force_authenticate(self.user) + self.course = CourseFactory(partner=self.partner, key='simple_key') + self.course_run = CourseRunFactory(course=self.course, key='simple/key/run') + self.url_base = reverse('api:v2:catalog-query_contains') + self.error_message = 'CatalogQueryContains endpoint requires query and identifiers list(s)' + self.refresh_index() + + def test_contains_single_course_run(self): + """ Verify that a single course_run is contained in a query. """ + qs = urllib.parse.urlencode({ + 'query': 'id:' + self.course_run.key, + 'course_run_ids': self.course_run.key, + 'course_uuids': self.course.uuid, + }) + url = f'{self.url_base}/?{qs}' + response = self.client.get(url) + assert response.status_code == 200 + assert response.data == {self.course_run.key: True, str(self.course.uuid): False} + + def test_contains_single_course(self): + """ Verify that a single course is contained in a query. """ + qs = urllib.parse.urlencode({ + 'query': 'key:' + self.course.key, + 'course_run_ids': self.course_run.key, + 'course_uuids': self.course.uuid, + }) + url = f'{self.url_base}/?{qs}' + response = self.client.get(url) + assert response.status_code == 200 + assert response.data == {self.course_run.key: False, str(self.course.uuid): True} + + def test_contains_course_and_run(self): + """ Verify that both the course and the run are contained in the broadest query. 
""" + self.course.course_runs.add(self.course_run) + self.course.save() + qs = urllib.parse.urlencode({ + 'query': 'org:*', + 'course_run_ids': self.course_run.key, + 'course_uuids': self.course.uuid, + }) + url = f'{self.url_base}/?{qs}' + response = self.client.get(url) + assert response.status_code == 200 + assert response.data == {self.course_run.key: True, str(self.course.uuid): True} + + def test_no_identifiers(self): + """ Verify that a 400 status is returned if request does not contain any identifier lists. """ + qs = urllib.parse.urlencode({ + 'query': 'id:*' + }) + url = f'{self.url_base}/?{qs}' + response = self.client.get(url) + assert response.status_code == 400 + assert response.data == self.error_message + + def test_no_query(self): + """ Verify that a 400 status is returned if request does not contain a querystring. """ + qs = urllib.parse.urlencode({ + 'course_run_ids': self.course_run.key, + 'course_uuids': self.course.uuid, + }) + url = f'{self.url_base}/?{qs}' + response = self.client.get(url) + assert response.status_code == 400 + assert response.data == self.error_message + + def test_incorrect_queries(self): + """ Verify that a 400 status is returned if request contains incorrect query string. 
""" + qs = urllib.parse.urlencode({ + 'query': 'title:', + 'course_run_ids': self.course_run.key, + 'course_uuids': self.course.uuid, + }) + url = f'{self.url_base}/?{qs}' + + response = self.client.get(url) + assert response.status_code == 400 diff --git a/course_discovery/apps/api/v2/urls.py b/course_discovery/apps/api/v2/urls.py index d8c0c2ec6d..502a6e9173 100644 --- a/course_discovery/apps/api/v2/urls.py +++ b/course_discovery/apps/api/v2/urls.py @@ -1,11 +1,17 @@ """API v2 URLs.""" +from django.urls import include, path, re_path from rest_framework import routers from course_discovery.apps.api.v2.views import search as search_views +from course_discovery.apps.api.v2.views.catalog_queries import CatalogQueryContainsViewSet app_name = 'v2' +urlpatterns = [ + re_path(r'^catalog/query_contains/?', CatalogQueryContainsViewSet.as_view(), name='catalog-query_contains'), +] + router = routers.SimpleRouter() router.register(r'search/all', search_views.AggregateSearchViewSet, basename='search-all') -urlpatterns = router.urls +urlpatterns += router.urls diff --git a/course_discovery/apps/api/v2/views/catalog_queries.py b/course_discovery/apps/api/v2/views/catalog_queries.py new file mode 100644 index 0000000000..b1a04eca7c --- /dev/null +++ b/course_discovery/apps/api/v2/views/catalog_queries.py @@ -0,0 +1,72 @@ +import logging +from uuid import UUID + +from elasticsearch_dsl.query import Q as ESDSLQ +from rest_framework import status +from rest_framework.generics import GenericAPIView +from rest_framework.permissions import DjangoModelPermissions, IsAuthenticated +from rest_framework.response import Response + +from course_discovery.apps.api.mixins import ValidElasticSearchQueryRequiredMixin +from course_discovery.apps.course_metadata.models import Course, CourseRun + +log = logging.getLogger(__name__) + + +class CatalogQueryContainsViewSet(ValidElasticSearchQueryRequiredMixin, GenericAPIView): + permission_classes = (IsAuthenticated, DjangoModelPermissions) + queryset 
= Course.objects.all()
+
+    def get(self, request):
+        """
+        Determine if a set of courses and/or course runs is found in the query results.
+
+        Returns
+            dict: mapping of course and run identifiers included in the request to boolean values
+                indicating whether the associated course or run is contained in the queryset
+                described by the query found in the request.
+        """
+        query = request.GET.get('query')
+        course_run_ids = request.GET.get('course_run_ids', None)
+        course_uuids = request.GET.get('course_uuids', None)
+        partner = self.request.site.partner
+
+        if query and (course_run_ids or course_uuids):
+            log.info(
+                f"Attempting search against query {query} with course UUIDs {course_uuids} "
+                f"and course run IDs {course_run_ids}"
+            )
+            identified_course_ids = set()
+            specified_course_ids = []
+            if course_run_ids:
+                course_run_ids = course_run_ids.split(',')
+                specified_course_ids = course_run_ids
+                identified_course_ids.update(
+                    i.key
+                    for i in CourseRun.search(
+                        query,
+                        partner=ESDSLQ('term', partner=partner.short_code),
+                        identifiers=ESDSLQ('terms', **{'key.raw': course_run_ids})
+                    ).source(['key'])
+                )
+            if course_uuids:
+                course_uuids = [UUID(course_uuid) for course_uuid in course_uuids.split(',')]
+                specified_course_ids += course_uuids
+
+                log.info(f"Specified course ids: {specified_course_ids}")
+                identified_course_ids.update(
+                    Course.search(
+                        query,
+                        partner=ESDSLQ('term', partner=partner.short_code),
+                        identifiers=ESDSLQ('terms', **{'uuid': course_uuids})
+                    ).values_list('uuid', flat=True)
+                )
+            log.info(f"Identified {len(identified_course_ids)} course ids: {identified_course_ids}")
+
+            # Map each requested identifier (course run key or course UUID) to
+            # whether it appeared in the Elasticsearch results for the query.
+            contains = {str(identifier): identifier in identified_course_ids for identifier in specified_course_ids}
+            return Response(contains)
+        return Response(
+            'CatalogQueryContains endpoint requires query and identifiers list(s)', status=status.HTTP_400_BAD_REQUEST
+        )
diff --git a/course_discovery/apps/course_metadata/models.py 
b/course_discovery/apps/course_metadata/models.py index b46a04da17..4d493c99ff 100644 --- a/course_discovery/apps/course_metadata/models.py +++ b/course_discovery/apps/course_metadata/models.py @@ -1149,6 +1149,79 @@ def search(cls, query, queryset=None): return filtered_queryset +class SearchAfterMixin: + """ + Represents objects to query Elasticsearch with `search_after` pagination and load by primary key. + """ + + @classmethod + def search(cls, query, queryset=None, page_size=settings.ELASTICSEARCH_DSL_QUERYSET_PAGINATION, partner=None, + identifiers=None): + """ + Queries the Elasticsearch index with optional pagination using `search_after`. + + Args: + query (str) -- Elasticsearch querystring (e.g. `title:intro*`) + queryset (models.QuerySet) -- base queryset to search, defaults to objects.all() + page_size (int) -- Number of results per page. + partner (object) -- To be included in the ES query. + identifiers (object) -- UUID or key of a product. + + Returns: + QuerySet + """ + query = clean_query(query) + queryset = queryset or cls.objects.all() + + if query == '(*)': + # Early-exit optimization. Wildcard searching is very expensive in elasticsearch. And since we just + # want everything, we don't need to actually query elasticsearch at all. 
+ return queryset + + logger.info(f"Attempting Elasticsearch document search against query: {query}") + es_document, *_ = registry.get_documents(models=(cls,)) + + must_queries = [ESDSLQ('query_string', query=query, analyze_wildcard=True)] + if partner: + must_queries.append(partner) + if identifiers: + must_queries.append(identifiers) + + dsl_query = ESDSLQ('bool', must=must_queries) + + all_ids = set() + search_after = None + + while True: + search = ( + es_document.search() + .query(dsl_query) + .sort('id') + .extra(size=page_size) + ) + + search = search.extra(search_after=search_after) if search_after else search + + try: + results = search.execute() + except RequestError as e: + logger.warning(f"Elasticsearch request failed: {e}") + break + + ids = {result.pk for result in results} + if not ids: + logger.info("No more results found.") + break + + all_ids.update(ids) + search_after = results[-1].meta.sort if results[-1] else None + logger.info(f"Fetched {len(ids)} records; total so far: {len(all_ids)}") + + filtered_queryset = queryset.filter(pk__in=all_ids) + logger.info(f"Filtered queryset of size {len(filtered_queryset)} for query: {query}") + return filtered_queryset + + class Collaborator(TimeStampedModel): """ Collaborator model, defining any collaborators who helped write course content. 
diff --git a/course_discovery/apps/course_metadata/tests/test_models.py b/course_discovery/apps/course_metadata/tests/test_models.py index 53de73d2a6..b2fdf007dd 100644 --- a/course_discovery/apps/course_metadata/tests/test_models.py +++ b/course_discovery/apps/course_metadata/tests/test_models.py @@ -17,9 +17,11 @@ from django.core.exceptions import ValidationError from django.core.management import call_command from django.db import IntegrityError, transaction +from django.db.models import QuerySet from django.test import TestCase, override_settings from edx_django_utils.cache import RequestCache from edx_toggles.toggles.testutils import override_waffle_switch +from elasticsearch_dsl.connections import connections from freezegun import freeze_time from slugify import slugify from taggit.models import Tag @@ -30,6 +32,7 @@ from course_discovery.apps.api.v1.tests.test_views.mixins import OAuth2Mixin from course_discovery.apps.core.models import Currency from course_discovery.apps.core.tests.helpers import make_image_file +from course_discovery.apps.core.tests.mixins import ElasticsearchTestMixin from course_discovery.apps.core.utils import SearchQuerySetWrapper from course_discovery.apps.course_metadata.choices import ( CourseRunRestrictionType, CourseRunStatus, ExternalProductStatus, ProgramStatus @@ -39,11 +42,13 @@ FAQ, AbstractHeadingBlurbModel, AbstractMediaModel, AbstractNamedModel, AbstractTitleDescriptionModel, AbstractValueModel, CorporateEndorsement, Course, CourseEditor, CourseRun, CourseRunType, CourseType, Curriculum, CurriculumCourseMembership, CurriculumCourseRunExclusion, CurriculumProgramMembership, DegreeCost, DegreeDeadline, - Endorsement, Organization, OrganizationMapping, Program, ProgramType, Ranking, Seat, SeatType, Subject, Topic + Endorsement, Organization, OrganizationMapping, Program, ProgramType, Ranking, SearchAfterMixin, Seat, SeatType, + Subject, Topic ) from course_discovery.apps.course_metadata.publishers import ( 
CourseRunMarketingSitePublisher, ProgramMarketingSitePublisher ) +from course_discovery.apps.course_metadata.search_indexes.documents import CourseDocument from course_discovery.apps.course_metadata.signals import ( connect_course_data_modified_timestamp_related_models, disconnect_course_data_modified_timestamp_related_models ) @@ -4192,3 +4197,33 @@ def test_basic(self): self.assertEqual(course_run.restricted_run, restricted_course_run) self.assertEqual(restricted_course_run.restriction_type, 'custom-b2b-enterprise') self.assertEqual(str(restricted_course_run), "course-v1:SC+BreadX+3T2015: ") + + +class CourseProxy(SearchAfterMixin, Course): + """Proxy model for testing SearchAfterMixin with Course.""" + class Meta: + proxy = True + + +class CourseProxyFactory(CourseFactory): + """Factory for the CourseProxy proxy model.""" + class Meta: + model = CourseProxy + + +class TestSearchAfterMixin(ElasticsearchTestMixin, TestCase): + def setUp(self): + super().setUp() + + self.total_courses = 5 + for _ in range(self.total_courses): + CourseFactory() + + @patch("course_discovery.apps.course_metadata.models.registry.get_documents") + def test_fetch_all_courses(self, mock_get_documents): + query = "Course*" + mock_get_documents.return_value = [CourseDocument] + + queryset = CourseProxy.search(query=query) + + self.assertEqual(len(queryset), self.total_courses) diff --git a/docs/decisions/0030-use-elasticsearch-search-after.rst b/docs/decisions/0030-use-elasticsearch-search-after.rst index 6cbc0d2eda..e0c00143bb 100644 --- a/docs/decisions/0030-use-elasticsearch-search-after.rst +++ b/docs/decisions/0030-use-elasticsearch-search-after.rst @@ -7,49 +7,52 @@ Accepted (December 2024) Context --------- -Elasticsearch enforces a strict limit on the number of records that can be retrieved in a single query, -controlled by the `MAX_RESULT_WINDOW` setting, which defaults to 10,000. 
-When a query attempts to retrieve more results than this limit, Elasticsearch does not simply truncate the results—instead, +Elasticsearch enforces a strict limit on the number of records that can be retrieved in a single query, +controlled by the `MAX_RESULT_WINDOW` setting, which defaults to 10,000. +When a query attempts to retrieve more results than this limit, Elasticsearch does not simply truncate the results—instead, it can lead to partial or incomplete data retrieval across search endpoints, potentially causing significant data loss or incomplete query responses. -Increasing this limit is not a viable solution, as it can lead to significant performance issues, +Increasing this limit is not a viable solution, as it can lead to significant performance issues, including increased memory usage and query latency, which can degrade the cluster's overall stability. -To address this issue, we need a more efficient way to paginate large query results. -The solution must allow for seamless and reliable pagination without imposing excessive resource demands on the system. +To address this issue, we need a more efficient way to paginate large query results. +The solution must allow for seamless and reliable pagination without imposing excessive resource demands on the system. Furthermore, it should ensure that the existing search functionality and search responses remain unaffected in the current version of the endpoint. Decision ---------- -A new version (v2) of the `search/all/` endpoint will be introduced to enhance functionality while ensuring that the existing v1 functionality remains unaffected. +A new version (v2) of the `search/all/` endpoint will be introduced to enhance functionality while ensuring that the existing v1 functionality remains unaffected. This version will utilize ElasticSearch's search_after pagination mechanism, specifically designed to handle large query results efficiently. 
-*How search_after Works:** -`search_after` is a pagination mechanism that allows retrieving results beyond the standard window limit by using the sort values of the last document from the previous page. +**How search_after Works:** +`search_after` is a pagination mechanism that allows retrieving results beyond the standard window limit by using the sort values of the last document from the previous page. Instead of using traditional offset-based pagination, it uses the actual sort values of the last retrieved document to fetch the next set of results, ensuring efficient and accurate pagination for large datasets. -In the v2 implementation, response documents will include a `sort` field that can be used as the `search_after` query parameter in subsequent queries. -This approach enables scalable retrieval of large datasets by bypassing the `MAX_RESULT_WINDOW` limitations. +In the v2 implementation, response documents will include a `sort` field that can be used as the `search_after` query parameter in subsequent queries. +This approach enables scalable retrieval of large datasets by bypassing the `MAX_RESULT_WINDOW` limitations. To support this, a new `SearchAfterPagination` class will be introduced, which will parse the `search_after` query parameter to facilitate efficient pagination of ElasticSearch results. -Additionally, new serializers will be integrated for the v2 implementation. +Additionally, new serializers will be integrated for the v2 implementation. Specifically, the AggregateSearchListSerializerV2 will extend the existing AggregateSearchListSerializer, supporting the `search_after` pagination mechanism and incorporating newer serializer versions for the same document types. 
-New versions of the serializers +New versions of the serializers - CourseRunSearchDocumentSerializer - CourseSearchDocumentSerializer - ProgramSearchDocumentSerializer - LearnerPathwaySearchDocumentSerializer - PersonSearchDocumentSerializer -will be introduced to include additional search index fields, specifically `sort`` and `aggregate_uuid` in their responses. +will be introduced to include additional search index fields, specifically `sort`` and `aggregation_uuid` in their responses. -Consumers will interact with the new v2 search endpoint by making an initial request to `/api/v2/search/all/`, -which returns search results along with a `next` field representing the `sort` value of the last document. +Consumers will interact with the new v2 search endpoint by making an initial request to `/api/v2/search/all/`, +which returns search results along with a `next` field representing the `sort` value of the last document. For subsequent pages, they simply include this `next` value as the `search_after` parameter in their next request. +A new mixin, SearchAfterMixin, will be created and added to enable the search_after functionality in catalog query endpoints, such as `/api/v1/catalog/query_contains`. + Example Usage: + ``` # First request response1 = requests.get('/api/v2/search/all/') @@ -58,15 +61,15 @@ next_page_search_after = results1['next'] # This is the sort value of the last # Next request using the 'next' value response2 = requests.get(f'/api/v2/search/all/?search_after={json.dumps(next_page_search_after)}') + ``` Consequences -------------- -- The v2 search endpoint will introduce two new fields, `aggregate_uuid` and `sort`, in response to support the search_after pagination mechanism. +- The v2 search endpoint will introduce two new fields, `aggregation_uuid` and `sort`, in response to support the search_after pagination mechanism. 
Next Steps ------------------------- -- Create SearchAfter Mixin: Develop a mixin to enable search_after functionality in the Django shell. - Extend SearchAfter Functionality: Implement `search_after` for other Elasticsearch search endpoints. - Notify Users: Inform consumers about the changes and provide support during the transition. - Monitor Performance: Track the performance of the new endpoint post-deployment.