diff --git a/course_discovery/apps/course_metadata/models.py b/course_discovery/apps/course_metadata/models.py
index b46a04da17..63d4e202d2 100644
--- a/course_discovery/apps/course_metadata/models.py
+++ b/course_discovery/apps/course_metadata/models.py
@@ -1149,6 +1149,70 @@ def search(cls, query, queryset=None):
         return filtered_queryset
 
 
+class SearchAfterMixin:
+    """
+    Mixin that queries Elasticsearch with `search_after` pagination and loads the matching objects by primary key.
+    """
+
+    @classmethod
+    def search(cls, query, queryset=None, page_size=settings.ELASTICSEARCH_DSL_QUERYSET_PAGINATION):
+        """
+        Queries the Elasticsearch index, paginating through all results with `search_after`.
+
+        Args:
+            query (str) -- Elasticsearch querystring (e.g. `title:intro*`)
+            queryset (models.QuerySet) -- base queryset to search, defaults to objects.all()
+            page_size (int) -- Number of results per page.
+
+        Returns:
+            QuerySet
+        """
+        query = clean_query(query)
+        queryset = queryset or cls.objects.all()
+
+        if query == '(*)':
+            # Early-exit optimization. Wildcard searching is very expensive in elasticsearch. And since we just
+            # want everything, we don't need to actually query elasticsearch at all.
+            return queryset
+
+        logger.info(f"Attempting Elasticsearch document search against query: {query}")
+        es_document, *_ = registry.get_documents(models=(cls,))
+        dsl_query = ESDSLQ('query_string', query=query, analyze_wildcard=True)
+
+        all_ids = set()
+        search_after = None
+
+        while True:
+            search = (
+                es_document.search()
+                .query(dsl_query)
+                .sort('id')
+                .extra(size=page_size)
+            )
+            if search_after:
+                # Only include `search_after` once we have a cursor from the previous page.
+                search = search.extra(search_after=search_after)
+
+            try:
+                results = search.execute()
+            except RequestError as e:
+                logger.warning(f"Elasticsearch request failed: {e}")
+                break
+
+            ids = {result.pk for result in results}
+            if not ids:
+                logger.info("No more results found.")
+                break
+
+            all_ids.update(ids)
+            search_after = results[-1].meta.sort
+            logger.info(f"Fetched {len(ids)} records; total so far: {len(all_ids)}")
+
+        filtered_queryset = queryset.filter(pk__in=all_ids)
+        logger.info(f"Filtered queryset of size {len(filtered_queryset)} for query: {query}")
+        return filtered_queryset
+
+
 class Collaborator(TimeStampedModel):
     """
     Collaborator model, defining any collaborators who helped write course content.
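For reviewers, a minimal usage sketch of the mixin above. Everything here is illustrative rather than part of this diff: the `Course` bases and the query string are assumptions chosen only to show the call pattern.

```python
# Hypothetical usage sketch, not part of this diff. Any model whose Elasticsearch
# document is registered with django-elasticsearch-dsl can adopt the mixin:
class Course(SearchAfterMixin, TimeStampedModel):  # bases illustrative
    ...


# Pages through the index in `page_size` batches via `search_after`, gathers every
# matching pk, then returns an ordinary Django QuerySet filtered to those pks.
matching = Course.search('title:intro*', page_size=500)
```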
diff --git a/course_discovery/apps/course_metadata/tests/test_models.py b/course_discovery/apps/course_metadata/tests/test_models.py
index 53de73d2a6..a43de9d19d 100644
--- a/course_discovery/apps/course_metadata/tests/test_models.py
+++ b/course_discovery/apps/course_metadata/tests/test_models.py
@@ -5,7 +5,7 @@
 from decimal import Decimal
 from functools import partial
 from unittest import mock
-from unittest.mock import patch
+from unittest.mock import MagicMock, patch
 
 import ddt
 import pytest
@@ -17,6 +17,7 @@
 from django.core.exceptions import ValidationError
 from django.core.management import call_command
 from django.db import IntegrityError, transaction
+from django.db.models import QuerySet
 from django.test import TestCase, override_settings
 from edx_django_utils.cache import RequestCache
 from edx_toggles.toggles.testutils import override_waffle_switch
@@ -39,7 +40,8 @@
     FAQ, AbstractHeadingBlurbModel, AbstractMediaModel, AbstractNamedModel, AbstractTitleDescriptionModel,
     AbstractValueModel, CorporateEndorsement, Course, CourseEditor, CourseRun, CourseRunType, CourseType, Curriculum,
     CurriculumCourseMembership, CurriculumCourseRunExclusion, CurriculumProgramMembership, DegreeCost, DegreeDeadline,
-    Endorsement, Organization, OrganizationMapping, Program, ProgramType, Ranking, Seat, SeatType, Subject, Topic
+    Endorsement, Organization, OrganizationMapping, Program, ProgramType, Ranking, SearchAfterMixin, Seat, SeatType,
+    Subject, Topic
 )
 from course_discovery.apps.course_metadata.publishers import (
     CourseRunMarketingSitePublisher, ProgramMarketingSitePublisher
 )
@@ -4192,3 +4194,86 @@ def test_basic(self):
         self.assertEqual(course_run.restricted_run, restricted_course_run)
         self.assertEqual(restricted_course_run.restriction_type, 'custom-b2b-enterprise')
         self.assertEqual(str(restricted_course_run), "course-v1:SC+BreadX+3T2015: ")
+
+
+class MockQuerySet(QuerySet):
+    def __init__(self, model=None, items=None):
+        # Call super() first: QuerySet.__init__() would otherwise reset `model` to None.
+        super().__init__()
+        self.model = model
+        self.items = items or []
+
+    def filter(self, *args, **kwargs):
+        """
+        Override the filter method to mimic Django's QuerySet behavior.
+ """ + pk_in = kwargs.get("pk__in", []) + pk_in = set(map(int, pk_in)) + print(f"Filtering with pk_in: {pk_in}, self.items: {[item.pk for item in self.items]}") + return MockQuerySet( + model=self.model, + items=[item for item in self.items if item.pk in pk_in], + ) + + def __iter__(self): + return iter(self.items) + + def __len__(self): + return len(self.items) + + def all(self): + return self + + def _chain(self): + # Mimic Django's queryset chaining + return self.__class__(model=self.model, items=self.items) + + +class MockModel: + def __init__(self, pk): + self.pk = pk + + +class SearchAfterMixinTest(SearchAfterMixin, MockModel): + objects = MockQuerySet(model=MockModel) + + +class TestSearchAfterMixin(TestCase): + @patch("course_discovery.apps.course_metadata.models.registry.get_documents") + @patch("course_discovery.apps.course_metadata.models.logger") + @patch("course_discovery.apps.course_metadata.models.clean_query") + def test_search_with_mock_data(self, mock_clean_query, mock_logger, mock_registry): + mock_document = MagicMock() + mock_search = MagicMock() + mock_document.search.return_value = mock_search + + mock_search.query.return_value = mock_search + mock_search.sort.return_value = mock_search + mock_search.extra.return_value = mock_search + + mock_result1 = MagicMock() + mock_result1.pk = 1 + mock_result1.meta.sort = ["sort1"] + + mock_result2 = MagicMock() + mock_result2.pk = 2 + mock_result2.meta.sort = ["sort2"] + + mock_search.execute.side_effect = [ + [mock_result1, mock_result2], + [], + ] + + mock_registry.return_value = (mock_document,) + mock_clean_query.return_value = "cleaned_query" + + SearchAfterMixinTest.objects = MockQuerySet( + model=MockModel, + items=[MockModel(1), MockModel(2), MockModel(3)], + ) + + result_queryset = SearchAfterMixinTest.search("query") + + self.assertEqual(len(result_queryset), 2) + self.assertTrue(all(item.pk in {1, 2} for item in result_queryset)) + mock_logger.info.assert_called() + mock_registry.assert_called_once_with(models=(SearchAfterMixinTest,)) diff --git a/docs/decisions/0030-use-elasticsearch-search-after.rst b/docs/decisions/0030-use-elasticsearch-search-after.rst index 6cbc0d2eda..e0c00143bb 100644 --- a/docs/decisions/0030-use-elasticsearch-search-after.rst +++ b/docs/decisions/0030-use-elasticsearch-search-after.rst @@ -7,49 +7,52 @@ Accepted (December 2024) Context --------- -Elasticsearch enforces a strict limit on the number of records that can be retrieved in a single query, -controlled by the `MAX_RESULT_WINDOW` setting, which defaults to 10,000. -When a query attempts to retrieve more results than this limit, Elasticsearch does not simply truncate the results—instead, +Elasticsearch enforces a strict limit on the number of records that can be retrieved in a single query, +controlled by the `MAX_RESULT_WINDOW` setting, which defaults to 10,000. +When a query attempts to retrieve more results than this limit, Elasticsearch does not simply truncate the results—instead, it can lead to partial or incomplete data retrieval across search endpoints, potentially causing significant data loss or incomplete query responses. -Increasing this limit is not a viable solution, as it can lead to significant performance issues, +Increasing this limit is not a viable solution, as it can lead to significant performance issues, including increased memory usage and query latency, which can degrade the cluster's overall stability. -To address this issue, we need a more efficient way to paginate large query results. 
-The solution must allow for seamless and reliable pagination without imposing excessive resource demands on the system. 
+To address this issue, we need a more efficient way to paginate large query results.
+The solution must allow for seamless and reliable pagination without imposing excessive resource demands on the system.
 Furthermore, it should ensure that the existing search functionality and search responses remain unaffected in the current version of the endpoint.
 
 Decision
 ----------
-A new version (v2) of the `search/all/` endpoint will be introduced to enhance functionality while ensuring that the existing v1 functionality remains unaffected. 
+A new version (v2) of the `search/all/` endpoint will be introduced to enhance functionality while ensuring that the existing v1 functionality remains unaffected.
 This version will utilize ElasticSearch's search_after pagination mechanism, specifically designed to handle large query results efficiently.
 
-*How search_after Works:**
-`search_after` is a pagination mechanism that allows retrieving results beyond the standard window limit by using the sort values of the last document from the previous page. 
+**How search_after Works:**
+`search_after` is a pagination mechanism that allows retrieving results beyond the standard window limit by using the sort values of the last document from the previous page.
 Instead of using traditional offset-based pagination, it uses the actual sort values of the last retrieved document to fetch the next set of results, ensuring efficient and accurate pagination for large datasets.
 
-In the v2 implementation, response documents will include a `sort` field that can be used as the `search_after` query parameter in subsequent queries. 
-This approach enables scalable retrieval of large datasets by bypassing the `MAX_RESULT_WINDOW` limitations. 
+In the v2 implementation, response documents will include a `sort` field that can be used as the `search_after` query parameter in subsequent queries.
+This approach enables scalable retrieval of large datasets by bypassing the `MAX_RESULT_WINDOW` limitations.
 To support this, a new `SearchAfterPagination` class will be introduced, which will parse the `search_after` query parameter to facilitate efficient pagination of ElasticSearch results.
 
-Additionally, new serializers will be integrated for the v2 implementation. 
+Additionally, new serializers will be integrated for the v2 implementation.
 Specifically, the AggregateSearchListSerializerV2 will extend the existing AggregateSearchListSerializer,
 supporting the `search_after` pagination mechanism and incorporating newer serializer versions for the same document types.
 
-New versions of the serializers 
+New versions of the serializers
 
 - CourseRunSearchDocumentSerializer
 - CourseSearchDocumentSerializer
 - ProgramSearchDocumentSerializer
 - LearnerPathwaySearchDocumentSerializer
 - PersonSearchDocumentSerializer
 
-will be introduced to include additional search index fields, specifically `sort`` and `aggregate_uuid` in their responses.
+will be introduced to include additional search index fields, specifically `sort` and `aggregation_uuid`, in their responses.
 
-Consumers will interact with the new v2 search endpoint by making an initial request to `/api/v2/search/all/`, 
-which returns search results along with a `next` field representing the `sort` value of the last document. 
+Consumers will interact with the new v2 search endpoint by making an initial request to `/api/v2/search/all/`,
+which returns search results along with a `next` field representing the `sort` value of the last document.
 For subsequent pages, they simply include this `next` value as the `search_after` parameter in their next request.
 
+A new mixin, SearchAfterMixin, will be introduced to enable the search_after functionality in catalog query endpoints, such as `/api/v1/catalog/query_contains`.
+
 Example Usage:
 
+```
 # First request
 response1 = requests.get('/api/v2/search/all/')
@@ -58,15 +61,15 @@ next_page_search_after = results1['next']  # This is the sort value of the last
 
 # Next request using the 'next' value
 response2 = requests.get(f'/api/v2/search/all/?search_after={json.dumps(next_page_search_after)}')
+```
 
 Consequences
 --------------
-- The v2 search endpoint will introduce two new fields, `aggregate_uuid` and `sort`, in response to support the search_after pagination mechanism.
+- The v2 search endpoint will introduce two new fields, `aggregation_uuid` and `sort`, in the response to support the search_after pagination mechanism.
 
 Next Steps
 -------------------------
-- Create SearchAfter Mixin: Develop a mixin to enable search_after functionality in the Django shell.
 - Extend SearchAfter Functionality: Implement `search_after` for other Elasticsearch search endpoints.
 - Notify Users: Inform consumers about the changes and provide support during the transition.
 - Monitor Performance: Track the performance of the new endpoint post-deployment.
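The ADR's two-request example generalizes to a loop on the consumer side. A minimal sketch under stated assumptions: the host name is a placeholder, and the DRF-style `results` key is an assumption rather than something this ADR specifies; only the `next`/`search_after` contract comes from the ADR itself.

```python
import json

import requests

BASE_URL = 'https://discovery.example.com'  # placeholder: your discovery host

search_after = None
documents = []
while True:
    # After the first page, pass the previous page's `next` value as `search_after`.
    params = {'search_after': json.dumps(search_after)} if search_after else {}
    page = requests.get(f'{BASE_URL}/api/v2/search/all/', params=params).json()
    documents.extend(page['results'])  # assumption: DRF-style 'results' list
    search_after = page.get('next')  # the `sort` value of this page's last document
    if not search_after:  # no `next` value means the final page was reached
        break
```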