From 5218e0fd8846a03aa68c218592d1acfd7da03dc5 Mon Sep 17 00:00:00 2001 From: Allen Kim Date: Fri, 22 Nov 2024 20:00:28 -0800 Subject: [PATCH] Add 'change_feed_mode' to 'query_items_change_feed' API (#38105) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Add 'change_feed_mode' to 'query_items_change_feed' API * remove unnecessary import * Fix lint * Updated CHANGELOG.md * Removed _feed_range.py * Addressed comments * Fixed lint * Add kwargs back to 'call'QueryItemsChangeFeed' * Fixed syntax error with f-string * Removed StrEnum to support earlier Python versions * Fixed f-string error * addressed comments * Addressed comments Also, enabled AllVersionsAndDelete mode for emulators * Removed unnecessary tests * Fix tests for emulator * Generating SDK with model renames (#38108) * Generating SDK with model name updates * Fixing the missed api name change for SDT * Updated the TypeSpec and re-generated the SDK with TranslationStatus id as string * Fixing TranslationIds and DocumentIds renames in patched methods * Updating changelog * [Identity][Monitor] Update live test setup (#37943) Signed-off-by: Paul Van Eck * Clean-up cosmos test pipeline (#38126) Update cosmos test pipeline as it no longer needs that sub-config as the data comes from the service connection. Also update the config name to use default name of public. * Multi modal eval fix (#38134) * Initial-Commit-multimodal * Fix * Sync eng/common directory with azure-sdk-tools for PR 9092 (#37713) * Export the subscription data from the service connection * Update deploy-test-resources.yml --------- Co-authored-by: Wes Haggard Co-authored-by: Wes Haggard * Removing private parameter from __call__ of AdversarialSimulator (#37709) * Update task_query_response.prompty remove required keys * Update task_simulate.prompty * Update task_query_response.prompty * Update task_simulate.prompty * Remove private variable and use kwargs * Add experimental tag to adv sim --------- Co-authored-by: Nagkumar Arkalgud * Enabling option to disable response payload on writes (#37365) * Initial draft * Adding tests * Renaming parameter * Update container.py * Renaming test file * Fixing LINT issues * Update container.py * Update _base.py * Update _base.py * Fixing tests * Fixing tests * Adding support to disable response payload on write for AIO * Update CHANGELOG.md * Update _cosmos_client.py * Reacting to code review comments * Addressing code review feedback * Addressed CR feedback * Fixing pyLint errors * Fixing pylint errors * Update test_crud.py * Fixing svc regression * Update sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py Co-authored-by: Anna Tisch * Reacting to code review feedback. * Update container.py * Update test_query_vector_similarity.py --------- Co-authored-by: Anna Tisch * deprecate azure_germany (#37654) * deprecate azure_germany * update * update * Update sdk/identity/azure-identity/azure/identity/_constants.py Co-authored-by: Paul Van Eck * update --------- Co-authored-by: Paul Van Eck * Add default impl to handle token challenges (#37652) * Add default impl to handle token challenges * update version * update * update * update * update * Update sdk/core/azure-core/azure/core/pipeline/policies/_utils.py Co-authored-by: Paul Van Eck * Update sdk/core/azure-core/azure/core/pipeline/policies/_utils.py Co-authored-by: Paul Van Eck * update * Update sdk/core/azure-core/tests/test_utils.py Co-authored-by: Paul Van Eck * Update sdk/core/azure-core/azure/core/pipeline/policies/_utils.py Co-authored-by: Paul Van Eck * update --------- Co-authored-by: Paul Van Eck * Make Credentials Required for Content Safety and Protected Materials Evaluators (#37707) * Make Credentials Required for Content Safety Evaluators * fix a typo * lint, fix content safety evaluator * revert test change * remove credential from rai_service * addFeedRangesAndUseFeedRangeInQueryChangeFeed (#37687) * Add getFeedRanges API * Add feedRange support in query changeFeed Co-authored-by: annie-mac * Update release date for core (#37723) * Improvements to mindependency dev_requirement conflict resolution (#37669) * during mindependency runs, dev_requirements on local relative paths are now checked for conflict with the targeted set of minimum dependencies * multiple type clarifications within azure-sdk-tools * added tests for new conflict resolution logic --------- Co-authored-by: McCoy Patiño <39780829+mccoyp@users.noreply.github.com> * Need to add environment to subscription configuration (#37726) Co-authored-by: Wes Haggard * Enable samples for formrecognizer (#37676) * multi-modal-changes * fixes * Fix with latest * dict-fix * adding-protected-material * adding-protected-material * adding-protected-material * bumping-version * adding assets * Added image in simulator * Added image in simulator * bumping-version * push-asset * assets * pushing asset * remove-containt-on-key * asset * asset2 * asset3 * asset4 * adding conftest * conftest * cred fix * asset-new * fix * asset * adding multi-modal-without-tests * asset-from-main * asset-from-main * fix * adding one test only * new asset * tests,fix: Sanitizer should replace with enum value not enum name * test-asset * [AutoRelease] t2-containerservicefleet-2024-09-24-42036(can only be merged by SDK owner) (#37538) * code and test * Update CHANGELOG.md * update-testcase --------- Co-authored-by: azure-sdk Co-authored-by: ChenxiJiang333 <119990644+ChenxiJiang333@users.noreply.github.com> Co-authored-by: ChenxiJiang333 * [AutoRelease] t2-dns-2024-09-25-81486(can only be merged by SDK owner) (#37560) * code and test * update-testcase * Update CHANGELOG.md * Update test_mgmt_dns_test.py --------- Co-authored-by: azure-sdk Co-authored-by: ChenxiJiang333 Co-authored-by: ChenxiJiang333 <119990644+ChenxiJiang333@users.noreply.github.com> * [AutoRelease] t2-appconfiguration-2024-10-09-68726(can only be merged by SDK owner) (#37800) * code and test * update-testcase * Update pyproject.toml --------- Co-authored-by: azure-sdk Co-authored-by: ChenxiJiang333 Co-authored-by: Yuchao Yan * code and test (#37855) Co-authored-by: azure-sdk * [AutoRelease] t2-servicefabricmanagedclusters-2024-10-08-57405(can only be merged by SDK owner) (#37768) * code and test * update-testcase * update-testcases --------- Co-authored-by: azure-sdk Co-authored-by: ChenxiJiang333 * [AutoRelease] t2-containerinstance-2024-10-21-66631(can only be merged by SDK owner) (#38005) * code and test * update-testcase * Update CHANGELOG.md * Update CHANGELOG.md * Update CHANGELOG.md --------- Co-authored-by: azure-sdk Co-authored-by: ChenxiJiang333 Co-authored-by: ChenxiJiang333 <119990644+ChenxiJiang333@users.noreply.github.com> * [sdk generation pipeline] bump typespec-python 0.36.1 (#38008) * update version * update package.json * [AutoRelease] t2-dnsresolver-2024-10-12-16936(can only be merged by SDK owner) (#37864) * code and test * update-testcase * Update CHANGELOG.md * Update CHANGELOG.md --------- Co-authored-by: azure-sdk Co-authored-by: ChenxiJiang333 Co-authored-by: ChenxiJiang333 <119990644+ChenxiJiang333@users.noreply.github.com> Co-authored-by: Yuchao Yan * new asset after fix in conftest * asset * chore: Update assets.json * Move perf pipelines to TME subscription (#38020) Co-authored-by: Wes Haggard * fix * after-comments * fix * asset * new asset with 1 test recording only * chore: Update assets.json * conftest fix * assets change * new test * few changes * removing proxy start * added all tests * asset * fixes * fixes with asset * asset-after-tax * enabling 2 more tests * unit test fix * asset * new asset * fixes per comments * changes by black * merge fix * pylint fix * pylint fix * ground test fix * fixes - pylint, black, mypy * more tests * docstring fixes * doc string fix * asset * few updates after Nagkumar review * fix * fix mypy --------- Co-authored-by: Azure SDK Bot <53356347+azure-sdk@users.noreply.github.com> Co-authored-by: Wes Haggard Co-authored-by: Wes Haggard Co-authored-by: Nagkumar Arkalgud Co-authored-by: Nagkumar Arkalgud Co-authored-by: Fabian Meiswinkel Co-authored-by: Anna Tisch Co-authored-by: Xiang Yan Co-authored-by: Paul Van Eck Co-authored-by: Neehar Duvvuri <40341266+needuv@users.noreply.github.com> Co-authored-by: Annie Liang <64233642+xinlian12@users.noreply.github.com> Co-authored-by: annie-mac Co-authored-by: Scott Beddall <45376673+scbedd@users.noreply.github.com> Co-authored-by: McCoy Patiño <39780829+mccoyp@users.noreply.github.com> Co-authored-by: kdestin <101366538+kdestin@users.noreply.github.com> Co-authored-by: ChenxiJiang333 <119990644+ChenxiJiang333@users.noreply.github.com> Co-authored-by: ChenxiJiang333 Co-authored-by: Yuchao Yan * azure-ai-evaluation release 1.0.0b5 2024-10-28 (#38138) * Update task_query_response.prompty remove required keys * Update task_simulate.prompty * Update task_query_response.prompty * Update task_simulate.prompty * Fix the api_key needed * Update for release * Black fix for file * Add original text in global context * Update test * Update the indirect attack simulator * Black suggested fixes * Update simulator prompty * Update adversarial scenario enum to exclude XPIA * Update changelog * Black fixes * Remove duplicate import * Fix the mypy error * Mypy please be happy * Updates to non adv simulator * accept context from assistant messages, exclude them when using them for conversation * update changelog * pylint fixes * pylint fixes * remove redundant quotes * Fix typo * pylint fix * Update broken tests * Include the grounding json in the manifest * Fix typo * Come on package * Release 1.0.0b5 * Notice from Chang * Remove adv_conv template parameters from the outputs --------- Co-authored-by: Nagkumar Arkalgud Co-authored-by: Nagkumar Arkalgud * open apiview for mgmt sdk (#38143) * [AutoRelease] t2-appplatform-2024-10-25-72111(can only be merged by SDK owner) (#38092) * code and test * changelog * changelog * changelog * Update CHANGELOG.md * Update _version.py * update-testcase --------- Co-authored-by: azure-sdk Co-authored-by: Yuchao Yan Co-authored-by: ChenxiJiang333 * AzurePipelinesCredential | adding mlflow uri func (#36580) * Bug 3323988: Regex fix and indices correction for model download * fixing test case * adding mlflow tracking uri func * passing service context to azureml mlflow * final flow APC complete * modify host_url * fixing unit test cases * changing mock for urlparse * fixing the log msg * Update changelog (#38133) * Increment package version after release of azure-ai-evaluation (#38142) * Remove psycopg2-binary from dev_requirements.txt (#38103) Remove psycopg2-binary from dev_requirements.txt * [Evaluation] Error improve for service-based evaluator/simulator (#38106) * Error improve for service-based evaluator/simulator * update * update print summary * update * fix failed tests * fix black * update changelog * update * update version * AzMon exporter: Serialize complex log bodies to json and set dependency type to `gen_ai.system` instead of N/A (#37694) * Serialize complex log bodies to json and support gen_ai.system * Experimental tags on ADV scenarios (#38166) * Update task_query_response.prompty remove required keys * Update task_simulate.prompty * Update task_query_response.prompty * Update task_simulate.prompty * Fix the api_key needed * Update for release * Black fix for file * Add original text in global context * Update test * Update the indirect attack simulator * Black suggested fixes * Update simulator prompty * Update adversarial scenario enum to exclude XPIA * Update changelog * Black fixes * Remove duplicate import * Fix the mypy error * Mypy please be happy * Updates to non adv simulator * accept context from assistant messages, exclude them when using them for conversation * update changelog * pylint fixes * pylint fixes * remove redundant quotes * Fix typo * pylint fix * Update broken tests * Include the grounding json in the manifest * Fix typo * Come on package * Release 1.0.0b5 * Notice from Chang * Remove adv_conv template parameters from the outputs * Update chanagelog * Experimental tags on adv scenarios --------- Co-authored-by: Nagkumar Arkalgud Co-authored-by: Nagkumar Arkalgud Co-authored-by: Nagkumar Arkalgud * Sync eng/common directory with azure-sdk-tools for PR 9259 (#38160) * Run perf tests under federated auth Run the tests under the same federated auth used to deploy the tests and setup the variables needed to configure AzurePipelineCredential. * Fix indention for working directory --------- Co-authored-by: Wes Haggard * Re-generated REST client after re-copying Swagger folder for `2024-10-01-preview`. (#38003) - Re-copied Swagger folder from https://github.com/Azure/azure-rest-api-specs/tree/release-machinelearningservices-Microsoft.MachineLearningServices-2024-10-01-preview/specification/machinelearningservices/resource-manager/Microsoft.MachineLearningServices/preview/2024-10-01-preview - Up to https://github.com/Azure/azure-rest-api-specs/commit/4a10268338c5edaacf2134a85a8e7d8a485451b2 - Re-generated REST client for `v2024-10-01-preview` - Unit and e2e tests (in Live mode) passed for feature `workspaces` (Details in PR) Co-authored-by: Jing Li * [Evaluation] Change RougeType to Enum (#38131) * Change RougeType to Enum * update test recording * update assets.json * try to update recordings to fix the failed tests * update * revert assets.json * update * Auto-enable Azure AI Inference instrumentation in Azure Monitor, update docs (#38071) * Auto-enable Azure AI Inference instrumentation in Azure Monitor, update docs * [AutoRelease] t2-redhatopenshift-2024-10-30-81004(can only be merged by SDK owner) (#38181) * code and test * update-testcase * Update CHANGELOG.md * Update _meta.json --------- Co-authored-by: azure-sdk Co-authored-by: ChenxiJiang333 Co-authored-by: ChenxiJiang333 <119990644+ChenxiJiang333@users.noreply.github.com> * [AutoRelease] t2-resourcehealth-2024-10-30-72592(can only be merged by SDK owner) (#38186) * code and test * update-testcase * update-testcases * update-testcases --------- Co-authored-by: azure-sdk Co-authored-by: ChenxiJiang333 * [AutoRelease] t2-appconfiguration-2024-10-30-38914(can only be merged by SDK owner) (#38177) * code and test * update-testcase --------- Co-authored-by: azure-sdk Co-authored-by: ChenxiJiang333 * [AutoRelease] t2-databox-2024-10-30-61405(can only be merged by SDK owner) (#38182) * code and test * update-testcase --------- Co-authored-by: azure-sdk Co-authored-by: ChenxiJiang333 * [AutoRelease] t2-edgeorder-2024-10-30-57522(can only be merged by SDK owner) (#38178) * code and test * update-testcase * update-testcases --------- Co-authored-by: azure-sdk Co-authored-by: ChenxiJiang333 * [AutoRelease] t2-extendedlocation-2024-10-30-79235(can only be merged by SDK owner) (#38176) * code and test * update-testcase --------- Co-authored-by: azure-sdk Co-authored-by: ChenxiJiang333 * [AutoRelease] t2-digitaltwins-2024-10-30-74766(can only be merged by SDK owner) (#38179) * code and test * update-testcase * Rename test_azure_mgmt_digitaltwins_test.py to disable_test_azure_mgmt_digitaltwins_test.py * update-testcase --------- Co-authored-by: azure-sdk Co-authored-by: ChenxiJiang333 Co-authored-by: ChenxiJiang333 <119990644+ChenxiJiang333@users.noreply.github.com> * Added get_arm_info (#38018) * Added get_arm_info * update * update * Update * Update release date * update * Update CHANGELOG.md (#38170) One column name updated to "New `max_token` for Generation" in CHANGLOG.md * Minor Readme fix (#38191) * Update task_query_response.prompty remove required keys * Update task_simulate.prompty * Update task_query_response.prompty * Update task_simulate.prompty * Fix the api_key needed * Update for release * Black fix for file * Add original text in global context * Update test * Update the indirect attack simulator * Black suggested fixes * Update simulator prompty * Update adversarial scenario enum to exclude XPIA * Update changelog * Black fixes * Remove duplicate import * Fix the mypy error * Mypy please be happy * Updates to non adv simulator * accept context from assistant messages, exclude them when using them for conversation * update changelog * pylint fixes * pylint fixes * remove redundant quotes * Fix typo * pylint fix * Update broken tests * Include the grounding json in the manifest * Fix typo * Come on package * Release 1.0.0b5 * Notice from Chang * Remove adv_conv template parameters from the outputs * Update chanagelog * Experimental tags on adv scenarios * Readme fix onbreaking change --------- Co-authored-by: Nagkumar Arkalgud Co-authored-by: Nagkumar Arkalgud Co-authored-by: Nagkumar Arkalgud * Minor fixes in vanilla OTel tracing sample (#38194) * Minor fixes in OTel tracing samples * Add test for get_arm_endpoints (#38196) * Add overloads for __call__ methods that accept query/response and conversation (#38097) * Add overloads for __call__ methods that take query/response and conversation * remove callable type hint * add docstrings/type hints * fix a typo * remove file * remove a bad param * add docs for relevance * fix some missing type hints * lint and run black * merge with main * fix some mypy errors, not all pylint * fix black errors * attempt to fix tests * fix retrieval * fix up tests and lint * fix some docstrings to mark some things as optional * [Monitor] Apply black formatting (#38129) Signed-off-by: Paul Van Eck * [CI] Update autorest CI to use Python 3.9 (#38175) Signed-off-by: Paul Van Eck * Eval qr json lines now has context from both turns and category if it exists (#38199) * Update task_query_response.prompty remove required keys * Update task_simulate.prompty * Update task_query_response.prompty * Update task_simulate.prompty * Fix the api_key needed * Update for release * Black fix for file * Add original text in global context * Update test * Update the indirect attack simulator * Black suggested fixes * Update simulator prompty * Update adversarial scenario enum to exclude XPIA * Update changelog * Black fixes * Remove duplicate import * Fix the mypy error * Mypy please be happy * Updates to non adv simulator * accept context from assistant messages, exclude them when using them for conversation * update changelog * pylint fixes * pylint fixes * remove redundant quotes * Fix typo * pylint fix * Update broken tests * Include the grounding json in the manifest * Fix typo * Come on package * Release 1.0.0b5 * Notice from Chang * Remove adv_conv template parameters from the outputs * Update chanagelog * Experimental tags on adv scenarios * Readme fix onbreaking change * Add the category and both user and assistant context to the response of qr_json_lines * Update changelog --------- Co-authored-by: Nagkumar Arkalgud Co-authored-by: Nagkumar Arkalgud Co-authored-by: Nagkumar Arkalgud * Fix doc issues (#38204) * Evaluation: Remove `parallel` from composite evaluators (#38168) * Remove `parallel` from composite evaluators * update recording * update * output_dir check * fix the test recording * fix the failed unit-test * update changelog * update * fix black issue * revert output_path related change * Update sdk/evaluation/azure-ai-evaluation/CHANGELOG.md Co-authored-by: Neehar Duvvuri <40341266+needuv@users.noreply.github.com> --------- Co-authored-by: Neehar Duvvuri <40341266+needuv@users.noreply.github.com> * [Core] Allow operation-level tracing attributes (#38164) Our documentation advertises that the `tracing_attributes` keyword argument can be passed in at both the client constructor level and the operation/method level. This makes this actually the case. Per-operation customization of tracing attributes can help users mark/identify spans from specific operations when spans are handled through custom span processors. Signed-off-by: Paul Van Eck * Sync eng/common directory with azure-sdk-tools for PR 9281 (#38213) * package-properties are now populated with matrix configurations from their ci.yml if present * create new code path for generate-job-matrix.yml which combines Create-JobMatrix and the "distribute-packages-to-matrix" action to generate dynamic matrices for PRs --------- Co-authored-by: Scott Beddall * Sync eng/common directory with azure-sdk-tools for PR 9290 (#38223) * identify and resolve missing function GenerateMatrixForConfig * identify and resolve miss-used null-coalesce operator --------- Co-authored-by: Scott Beddall * update (#38220) * [AutoRelease] t2-containerservicefleet-2024-10-31-68497(can only be merged by SDK owner) (#38221) * code and test * update-testcase * format * reformat --------- Co-authored-by: azure-sdk Co-authored-by: ChenxiJiang333 Co-authored-by: Yuchao Yan * [BatchAI] deprecate azure-mgmt-batchai (#38226) * [ModelsRepository] deprecating azure-iot-modelrepository (#38225) * [ModelsRepository] deprecating iot-modelrepository * add changelog section * [ServerManager] deprecating azure-mgmt-servermanager (#38229) * [DocumentDB] deprecate azure-mgmt-documentdb (#38227) * [EH/SB] ran black (#38210) * ran black * fix pylint * black sb * pylint * spacing * Update randomization pattern for Adversarial simulation (#38211) * Update randomization pattern for Adversarial simulation * update changelog * amqp msg (#38122) * Implement live metrics filtering for charts (part 1) (#37998) * Update CODEOWNERS for graphrbac owner (#38236) Updating the owner of graphrbac to Laurent as Yuchao said they are not the owners. * Multi modal docstring improvements (#38193) * docstring-update * doc string updates * doc string updates * Increment package version after release of azure-core (#38240) * kwarg type hints (#38214) * [Evaluation] add environment variable for API token refresh rate (#38162) * use env var for azure token refresh interval * update changelog and set default value for env var * cast refresh rate to int * fix pylint error * fix tox black issue * [Evaluation] Default to non-randomized order of template parameters (#38239) * Default to non-randomized order of template parameters * small changelog update * resolve issue with language-settings handling additional service changes (#38216) * Reduce unnecessary delete calls to ARM for storage accounts (#38246) Co-authored-by: Ben Broderick Phillips * clean up unused python script (#38128) * Sync eng/common directory with azure-sdk-tools for PR 9288 (#38243) * Update Logging * Update eng/common/scripts/logging.ps1 --------- Co-authored-by: Chidozie Ononiwu Co-authored-by: Wes Haggard * [Scheduler] deprecate azure-mgmt-scheduler (#38228) * [ServiceManagement] deprecate azure-servicemanagement-legacy (#38230) * [GraphRBAC] deprecating package (#38224) * [GraphRBAC] updating docs to point to replacement repo * update version file * fix link * Enable py2docfx docs gen tool, remove the dockerimage docs validation (#38085) * Enable py2docfx docs gen tool, remove the dockerimage docs validation * Remove the --no-venv-required flag from the command line which was a leftover copy/paste from the original PR * Change the version to a pre-release for testing as the released version doesn't appear to have these changes * Update py2docfx version since previous version had problems on Linux * Change the python command to add -u so the sdtout and stderr streams are unbuffered and we see the full output * Updates for feedback and to see what's going on with the output * Remove the Get-Content line which should have been removed in the last commit * Print the command output on actual different lines * Finish the sentence in the comment * Sync eng/common directory with azure-sdk-tools for PR 9294 (#38251) Co-authored-by: Scott Beddall * [core] add servicemanagement legacy to ci for release (#38253) * Session Token Management APIs (#36971) * merge from main and resolve conflicts * remove async keyword from changeFeed query in aio package * refactor * refactor * fix pylint * added public surface methods * pylint fix * fix * added functionality for merging session tokens from logical pk * fix mypy * added tests for basic merge and split * resolve comments * resolve comments * resolve comments * resolve comments * fix pylint * fix mypy * fix tests * add tests * fix pylint * fix and resolve comments * fix and resolve comments * Added isSubsetFeedRange logic * Added request context to crud operations, session token helpers * revert unnecessary change * Added more tests * Added more tests * Changed tests to use new public feed range and more test coverage for request context * Added more tests * Fix tests and add changelog * fix spell checks * Added tests and pushed request context to client level * Added async methods and removed feed range from request context * fix tests * fix tests and pylint * Reacting to comments * Reacting to comments * pylint and added hpk tests * reacting to comments * fix tests and mypy * fix mypy * fix mypy * reacting to comments * reacting to comments * reacting to comments * fix cspell * rename method to get_latest_session_token * reacting to reverted feed range * change based on the api review * Reacting to API review and adding samples. * Fixed pylint * Reacting to comments * Reacting to comments * Reacting to comments * Reacting to comments * Fix pydoc * Fix pydoc * reacting to comments * reacting to comments --------- Co-authored-by: annie-mac * [AutoRelease] t2-network-2024-10-31-29845(can only be merged by SDK owner) (#38235) * code and test * update-testcase --------- Co-authored-by: azure-sdk Co-authored-by: ChenxiJiang333 * [ServiceBus/EventHub] add service specific message annotations to receiver logs (#38090) * [ServiceBus] add service specific message annotations to receiver logs * add to eventhub + update readme logging to include thread formatting * only log service annotations header props * move logging to sdk layer * move msg logging into eh consumer * remove sb pyamqp recvr logs * black * Updating CODEOWNERS for Synapse (#38255) Wan Yang no longer uses the @wonner alias and uses @wanyang7. * Evaluation: Fix the `output_path` parameter of `evaluate` API doesn't support relative path (#38241) * Fix output_path parameter doesn't support relative path * add comments * fix the test * update * minor update * update * [synapse] deprecate azure-synapse (#38262) * [Synapse] deprecation azure-mgmt-synapse * update synapse rel date * [DocumentDB] update deprecation release date (#38265) * [CognitiveServices] deprecate vision packages (#38206) * deprecate computervision * deprecate vision face * contentmoderator * update release date * update changelog date * update release date * customvision * RAI service input sanitization (#38247) * escape eval inputs * new recordings and disable accidentally enabled test * save recordings again * save recordings again again * save recordings again again again * update groundedness threshold * Update sdk/evaluation/azure-ai-evaluation/tests/e2etests/test_builtin_evaluators.py Co-authored-by: Billy Hu * correct new check --------- Co-authored-by: Billy Hu * pass params from ci.yml to cosmos-sdk-client appropriately (#38272) * Fix __call__ Overload Types (#38238) * Fix __call__ overload issues * fix typing issue * make query required for groundednesspro * fix a malformatted docstring * fix some type hints and remove eval_last_turn from evaluators * fix optional import * comment out eval last turn section * Update deprecation_process.md (#38270) Adding a section on: - adding the artifact to the ci before release - skipping checks for other failing packages in the pipeline for release if needed * [DocumentDB] add changelog to manifest.ini (#38273) * [evaluation] Add support for using evaluate() with evaluators that have missing inputs (#38276) * Update evaluate to allow optional params + add tests * Record tests * Exclude optional params from 'missing inputs' warning * Add tests * Record tests * Fix linting errors * Merge from main and fix linting error * Add unit test for missing output * Update convo test to assert per turn results * Run linting * Add test data * Re-record test * disabled black in pyproject.toml for all packages (#38271) * [AutoRelease] t2-postgresqlflexibleservers-2024-10-30-49242(can only be merged by SDK owner) (#38188) * code and test * update-testcase * Update CHANGELOG.md * Update CHANGELOG.md --------- Co-authored-by: azure-sdk Co-authored-by: ChenxiJiang333 Co-authored-by: ChenxiJiang333 <119990644+ChenxiJiang333@users.noreply.github.com> * [AutoRelease] t2-devtestlabs-2024-11-04-17468(can only be merged by SDK owner) (#38286) * code and test * update-testcase * update changelog * Update CHANGELOG.md * Update _version.py * Update setup.py --------- Co-authored-by: azure-sdk Co-authored-by: ChenxiJiang333 Co-authored-by: Yuchao Yan Co-authored-by: ChenxiJiang333 <119990644+ChenxiJiang333@users.noreply.github.com> * [AutoRelease] t2-sql-2024-10-03-42323(can only be merged by SDK owner) (#37698) * code and test * Update CHANGELOG.md * Update sdk/sql/azure-mgmt-sql/CHANGELOG.md * Update sdk/sql/azure-mgmt-sql/CHANGELOG.md * update-testcase * fix generated samples * update-testcase --------- Co-authored-by: azure-sdk Co-authored-by: ChenxiJiang333 <119990644+ChenxiJiang333@users.noreply.github.com> Co-authored-by: Yuchao Yan Co-authored-by: ChenxiJiang333 * [EventHub] add ssl_context kwarg to clients (#37702) * remove verify from pyamqp JWTToken * add ssl_context kwarg * add tests * fix merge * fix failing/lint/mypy * lint * separate ssl_context property from conn verify * make sb changes * add tests * changelog * black * Update CHANGELOG.md (#38301) * download_file is fully annotated (#38284) * Release azure-monitor-opentelemetry-exporter (#38310) * Increment package version after release of azure-monitor-opentelemetry (#38308) * Eval/bugfix/content safety parallel (#38307) * fix cs eval * recordings and cl * target newly released proxy version (#38282) Co-authored-by: Scott Beddall * [Storage] Added connection pool note to `max_concurrency` kwarg for upload/download APIs (#38254) * Sync eng/common directory with azure-sdk-tools for PR 9308 (#38311) Remove unnecessary Resolve-Path in Get-PrPkgProperties Co-authored-by: Scott Beddall * Version/location updates for stress script usage (#38281) Co-authored-by: Ben Broderick Phillips * [AutoRelease] t2-loganalytics-2024-11-04-45063(can only be merged by SDK owner) (#38292) * code and test * update-testcase * fix generated samples * update version --------- Co-authored-by: azure-sdk Co-authored-by: ChenxiJiang333 * [AutoRelease] t2-automation-2024-11-04-74277(can only be merged by SDK owner) (#38294) * code and test * update-testcase * Update CHANGELOG.md --------- Co-authored-by: azure-sdk Co-authored-by: ChenxiJiang333 Co-authored-by: ChenxiJiang333 <119990644+ChenxiJiang333@users.noreply.github.com> * Broker on mac support (#38274) * Broker on mac support * update * update * update * update * update * update readme * update * Add firewallsku as ManagedNetwork property (#37885) * Generated REST client for upcoming API version `v2024-10-01-preview`. - Created Swagger folder `2024-10-01-preview` by copying from the same one in GitHub project `azure-rest-api-specs` - https://github.com/Azure/azure-rest-api-specs/tree/release-machinelearningservices-Microsoft.MachineLearningServices-2024-10-01-preview/specification/machinelearningservices/resource-manager/Microsoft.MachineLearningServices/preview/2024-10-01-preview - Up to https://github.com/Azure/azure-rest-api-specs/pull/30776 - Safe to merge in for not bumping up callers' API versions with it - Cannot bump up before registering it for RPs in selected regions - Added tag `v2024-10-01-preview` to `readme.md` - Generated REST client for `v2024-10-01-preview` * Initial commit * Initial commit * add new test for firewallsku * add new test for firewallsku * remove irrelevant changes * add addressprefix changes * address review comments * address review comments * reset the file in rest client * address review comments --------- Co-authored-by: Jing Li Co-authored-by: Nethra Sashikar * [AutoRelease] t2-managementgroups-2024-11-04-45946(can only be merged by SDK owner) (#38291) * code and test * update-testcase --------- Co-authored-by: azure-sdk Co-authored-by: ChenxiJiang333 * [AutoRelease] t2-managedservices-2024-11-04-44075(can only be merged by SDK owner) (#38287) * code and test * update-testcase --------- Co-authored-by: azure-sdk Co-authored-by: ChenxiJiang333 * [AutoRelease] t2-marketplaceordering-2024-11-04-08673(can only be merged by SDK owner) (#38288) * code and test * update-testcase * update-testcase --------- Co-authored-by: azure-sdk Co-authored-by: ChenxiJiang333 * [AutoRelease] t2-servicebus-2024-11-04-58886(can only be merged by SDK owner) (#38293) * code and test * update-testcase * update version --------- Co-authored-by: azure-sdk Co-authored-by: ChenxiJiang333 Co-authored-by: msyyc <70930885+msyyc@users.noreply.github.com> * [Synapse] azure-synapse post deprecation (#38315) * [Cognitive Services] vision post-deprecation (#38304) * [Cognitive Services] vision post-deprecation * black on active cs packages * [Cosmos] documentdb post deprecation (#38314) * [sdk generation pipeline] fix logic to extract swagger file (#38334) * fix logic to extract swagger file * fix logic to extract swagger file * Update deprecation_process.md for Verify Readmes failure (#38333) Adding a section for Verify Readmes failure following #37975 . * Increment package version after release of azure-identity-broker (#38337) * Edit pass on Azure Identity Broker README (#38339) * Edit pass on Azure Identity Broker README * Change version * [Core] Deprecate OpenCensus tracing plugin (#37975) Signed-off-by: Paul Van Eck * [Core] servicemanagement-legacy post deprecation (#38319) * Prompt support for Inference SDK (#37917) * Prompty support within Azure AI Inference SDK * Fix unit test * Address PR feedback with copyright, merge PromptConfig to PromptTemplate * Add comment and set model_name as optional * Bug fixes * Updated parameter names from PM feedbacks * Improve sample code and unit tests * Update readme and comments * Rename files * Address PR comment * add Pydantic as dependency * Fix type errors * Fix spelling issues * Address PR comments and fix linter issues * Fix type import for "Self" * Change to keyword-only constructor and fix linter issues * Rename function `from_message` to `from_str`; `render` to `create_messages` * Change from `from_str` to `from_string` * Merge latest code from `microsoft/prompty` and resolve linter issues * Fix PR comment * Fix PR comments * Remove a defunct variable from docindex.yml (#38342) * Fix errors from sphinx and mypy * Changed parameter to `mode` Since `change_feed_mode` was only used for `query_items_change_feed`, it doesn't have to be specified. `mode` should be good enough. * Fixed typo * Changed 'mode' to be string type * Reverted necessary type def * Addressed comments * Added samples for change_feed_mode * Addressed comments * Removed unnecessary docstring * Remove mode if 'continuation' was in override definition * add test samples tracking (#38502) * add test samples tracking * add samples tracking to csv/html/md reports * fix for catastrophic pipeline failure * Add OpenTelemetry LoggingHandler conditionally (#38549) * Add helpers to log a GitHub "notice" (#38574) Co-authored-by: Mike Harder * [AutoRelease] t2-cosmosdb-2024-11-14-60943(can only be merged by SDK owner) (#38531) * code and test * update-testcase --------- Co-authored-by: azure-sdk Co-authored-by: ChenxiJiang333 * [AutoRelease] t2-mysqlflexibleservers-2024-11-05-47456(can only be merged by SDK owner) (#38329) * code and test * update-testcase * update-format --------- Co-authored-by: azure-sdk Co-authored-by: ChenxiJiang333 * [AutoRelease] t2-netapp-2024-11-08-58381(can only be merged by SDK owner) (#38411) * code and test * Update CHANGELOG.md * Update CHANGELOG.md * assets * update-format --------- Co-authored-by: azure-sdk Co-authored-by: ChenxiJiang333 <119990644+ChenxiJiang333@users.noreply.github.com> Co-authored-by: Audunn Baldvinsson Co-authored-by: ChenxiJiang333 * Shrike (#38560) * Datastore auth bug (#38586) * Increment package version after release of azure-search-documents (#38593) * Merge App Config Provider Beta to Main (#38579) * Revert "Remove Telemetry from main (#37783)" (#37812) This reverts commit a65dfb2c24a90c0b8218bbb968b92ea0abf2a5ff. * Allocation Id (#37840) * Adding Telemetry * Telemetry Support * fixing formatting * Update _azureappconfigurationprovider.py * Update _azureappconfigurationproviderasync.py * formatting * changing doc style due to pylint-next * fixing kwargs docs * Formatting * Review comments * Changed label checking. * black format changes * pylint * Update sdk/appconfiguration/azure-appconfiguration-provider/azure/appconfiguration/provider/_azureappconfigurationprovider.py Co-authored-by: Avani Gupta * added space checks * Update conftest.py * moved telemetry to client wrapper * fixing format * updating after merge * fixing black issue * removing unused imports * AllocationId * Update CODEOWNERS * Update CODEOWNERS * fixing issues * Update _client_manager_base.py * Fixing configuration value empty in calc * fixing pylint * Update _constants.py * review comments * fixing allocation check * format fix --------- Co-authored-by: Avani Gupta * Python Provider 2.0.0b2 Changelog update (#37860) * Update CHANGELOG.md * Update CHANGELOG.md * Increment package version after release of azure-appconfiguration-provider (#37877) * Update _client_manager_base.py (#38019) * App Config Allocation Id Update (#38065) * updated calc to sort keys * Update CHANGELOG.md * Allocation id update (#38242) * updated calc to sort keys * Update CHANGELOG.md * Update _client_manager_base.py * Update CHANGELOG.md (#38521) * Increment package version after release of azure-appconfiguration-provider (#38553) * Removing Allocation ID (#38555) * Removing Allocation ID * Remove constants * Update _client_manager_base.py --------- Co-authored-by: Avani Gupta Co-authored-by: Azure SDK Bot <53356347+azure-sdk@users.noreply.github.com> * batching adjustments for create-prjobmatrix (#38597) Co-authored-by: Scott Beddall * [EG] resource notification event (#38100) * rn * update * update * bump * black * Updated doc strings * Merged main * Addressed comments * Revert "Merged main" This reverts commit 217dabc769d52cfd7432af8ce92029e1bfb2616f. * Added comment why it is safe to raise exception if mode was missing * Added comment why it is safe to raise exception if mode was missing * Moved the feature update log under unreleased features * Add missing period in changelog --------- Signed-off-by: Paul Van Eck Co-authored-by: hamshavathimunibyraiah <125092972+hamshavathimunibyraiah@users.noreply.github.com> Co-authored-by: Paul Van Eck Co-authored-by: Wes Haggard Co-authored-by: Waqas Javed <7674577+w-javed@users.noreply.github.com> Co-authored-by: Azure SDK Bot <53356347+azure-sdk@users.noreply.github.com> Co-authored-by: Wes Haggard Co-authored-by: Nagkumar Arkalgud Co-authored-by: Nagkumar Arkalgud Co-authored-by: Fabian Meiswinkel Co-authored-by: Anna Tisch Co-authored-by: Xiang Yan Co-authored-by: Neehar Duvvuri <40341266+needuv@users.noreply.github.com> Co-authored-by: Annie Liang <64233642+xinlian12@users.noreply.github.com> Co-authored-by: annie-mac Co-authored-by: Scott Beddall <45376673+scbedd@users.noreply.github.com> Co-authored-by: McCoy Patiño <39780829+mccoyp@users.noreply.github.com> Co-authored-by: kdestin <101366538+kdestin@users.noreply.github.com> Co-authored-by: ChenxiJiang333 <119990644+ChenxiJiang333@users.noreply.github.com> Co-authored-by: ChenxiJiang333 Co-authored-by: Yuchao Yan Co-authored-by: Nagkumar Arkalgud Co-authored-by: Kshitij Chawla <166698309+kshitij-microsoft@users.noreply.github.com> Co-authored-by: sanchez-alex <141684261+sanchez-alex@users.noreply.github.com> Co-authored-by: Leighton Chen Co-authored-by: Billy Hu Co-authored-by: Liudmila Molkova Co-authored-by: Nagkumar Arkalgud Co-authored-by: Jing Li Co-authored-by: Jing Li Co-authored-by: changliu2 <99364750+changliu2@users.noreply.github.com> Co-authored-by: Yalin Li Co-authored-by: Scott Beddall Co-authored-by: swathipil <76007337+swathipil@users.noreply.github.com> Co-authored-by: Libba Lawrence Co-authored-by: slister1001 <103153180+slister1001@users.noreply.github.com> Co-authored-by: Matthew Metcalf Co-authored-by: Ben Broderick Phillips Co-authored-by: Chidozie Ononiwu Co-authored-by: James Suplizio Co-authored-by: MilesHolland <108901744+MilesHolland@users.noreply.github.com> Co-authored-by: Diondra <16376603+diondrapeck@users.noreply.github.com> Co-authored-by: Peter Wu <162184229+weirongw23-msft@users.noreply.github.com> Co-authored-by: Nethra C Sashikar Co-authored-by: Nethra Sashikar Co-authored-by: msyyc <70930885+msyyc@users.noreply.github.com> Co-authored-by: Scott Addie <10702007+scottaddie@users.noreply.github.com> Co-authored-by: David Wu Co-authored-by: Krista Pratico Co-authored-by: Mike Harder Co-authored-by: Audunn Baldvinsson Co-authored-by: Amit Chauhan <70937115+achauhan-scc@users.noreply.github.com> Co-authored-by: Avani Gupta --- sdk/cosmos/azure-cosmos/CHANGELOG.md | 1 + sdk/cosmos/azure-cosmos/azure/cosmos/_base.py | 4 +- .../cosmos/_change_feed/change_feed_state.py | 122 +++++----- .../cosmos/_change_feed/change_feed_utils.py | 146 ++++++++++++ .../azure/cosmos/_cosmos_client_connection.py | 2 + .../azure/cosmos/aio/_container.py | 134 +++++------ .../azure/cosmos/aio/_database.py | 9 + .../azure-cosmos/azure/cosmos/container.py | 142 ++++++------ .../azure-cosmos/azure/cosmos/database.py | 9 + .../azure/cosmos/http_constants.py | 5 + .../samples/change_feed_management.py | 56 +++++ .../samples/change_feed_management_async.py | 55 +++++ .../azure-cosmos/test/test_change_feed.py | 208 ++++++++++++++++-- .../test/test_change_feed_async.py | 173 +++++++++++++-- 14 files changed, 840 insertions(+), 226 deletions(-) create mode 100644 sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/change_feed_utils.py diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index 66274aa624ad..fcec879fc406 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -3,6 +3,7 @@ ### 4.9.1 (Unreleased) #### Features Added +* Added change feed mode support in `query_items_change_feed`. See [PR 38105](https://github.com/Azure/azure-sdk-for-python/pull/38105) #### Breaking Changes diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_base.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_base.py index 2362491898b8..5330089bdcba 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_base.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_base.py @@ -61,7 +61,8 @@ 'supported_query_features': 'supportedQueryFeatures', 'query_version': 'queryVersion', 'priority': 'priorityLevel', - 'no_response': 'responsePayloadOnWriteDisabled' + 'no_response': 'responsePayloadOnWriteDisabled', + 'max_item_count': 'maxItemCount', } # Cosmos resource ID validation regex breakdown: @@ -170,6 +171,7 @@ def GetHeaders( # pylint: disable=too-many-statements,too-many-branches # set consistency level. check if set via options, this will override the default if options.get("consistencyLevel"): consistency_level = options["consistencyLevel"] + # TODO: move this line outside of if-else cause to remove the code duplication headers[http_constants.HttpHeaders.ConsistencyLevel] = consistency_level elif default_client_consistency_level is not None: consistency_level = default_client_consistency_level diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/change_feed_state.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/change_feed_state.py index ac6f00017af4..4f513a2d8662 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/change_feed_state.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/change_feed_state.py @@ -29,6 +29,8 @@ from abc import ABC, abstractmethod from enum import Enum from typing import Optional, Union, List, Any, Dict, Deque +import logging +from typing_extensions import Literal from azure.cosmos import http_constants from azure.cosmos._change_feed.change_feed_start_from import ChangeFeedStartFromInternal, \ @@ -176,18 +178,18 @@ def apply_server_response_continuation(self, continuation: str, has_modified_res class ChangeFeedStateV2(ChangeFeedState): container_rid_property_name = "containerRid" - change_feed_mode_property_name = "mode" + mode_property_name = "mode" change_feed_start_from_property_name = "startFrom" continuation_property_name = "continuation" - # TODO: adding change feed mode def __init__( self, container_link: str, container_rid: str, feed_range: FeedRangeInternal, change_feed_start_from: ChangeFeedStartFromInternal, - continuation: Optional[FeedRangeCompositeContinuation] + continuation: Optional[FeedRangeCompositeContinuation], + mode: Optional[Literal["LatestVersion", "AllVersionsAndDeletes"]] ) -> None: self._container_link = container_link @@ -208,6 +210,8 @@ def __init__( else: self._continuation = continuation + self._mode = "LatestVersion" if mode is None else mode + super(ChangeFeedStateV2, self).__init__(ChangeFeedStateVersion.V2) @property @@ -218,17 +222,14 @@ def to_dict(self) -> Dict[str, Any]: return { self.version_property_name: ChangeFeedStateVersion.V2.value, self.container_rid_property_name: self._container_rid, - self.change_feed_mode_property_name: "LatestVersion", + self.mode_property_name: self._mode, self.change_feed_start_from_property_name: self._change_feed_start_from.to_dict(), self.continuation_property_name: self._continuation.to_dict() if self._continuation is not None else None } - def populate_request_headers( + def set_start_from_request_headers( self, - routing_provider: SmartRoutingMapProvider, request_headers: Dict[str, Any]) -> None: - request_headers[http_constants.HttpHeaders.AIM] = http_constants.HttpHeaders.IncrementalFeedHeaderValue - # When a merge happens, the child partition will contain documents ordered by LSN but the _ts/creation time # of the documents may not be sequential. # So when reading the changeFeed by LSN, it is possible to encounter documents with lower _ts. @@ -243,11 +244,10 @@ def populate_request_headers( self._continuation.current_token.feed_range) change_feed_start_from_feed_range_and_etag.populate_request_headers(request_headers) - # based on the feed range to find the overlapping partition key range id - over_lapping_ranges =\ - routing_provider.get_overlapping_ranges( - self._container_link, - [self._continuation.current_token.feed_range]) + def set_pk_range_id_request_headers( + self, + over_lapping_ranges, + request_headers: Dict[str, Any]) -> None: if len(over_lapping_ranges) > 1: raise self.get_feed_range_gone_error(over_lapping_ranges) @@ -260,30 +260,41 @@ def populate_request_headers( # the current token feed range spans less than single physical partition # for this case, need to set both the partition key range id and epk filter headers request_headers[http_constants.HttpHeaders.PartitionKeyRangeID] = over_lapping_ranges[0]["id"] - request_headers[ - http_constants.HttpHeaders.StartEpkString] = self._continuation.current_token.feed_range.min - request_headers[ - http_constants.HttpHeaders.EndEpkString] = self._continuation.current_token.feed_range.max + request_headers[http_constants.HttpHeaders.StartEpkString] = self._continuation.current_token.feed_range.min + request_headers[http_constants.HttpHeaders.EndEpkString] = self._continuation.current_token.feed_range.max - async def populate_request_headers_async( + def set_mode_request_headers( self, - async_routing_provider: AsyncSmartRoutingMapProvider, request_headers: Dict[str, Any]) -> None: - request_headers[http_constants.HttpHeaders.AIM] = http_constants.HttpHeaders.IncrementalFeedHeaderValue + if self._mode == "AllVersionsAndDeletes": + request_headers[http_constants.HttpHeaders.AIM] = http_constants.HttpHeaders.FullFidelityFeedHeaderValue + request_headers[http_constants.HttpHeaders.ChangeFeedWireFormatVersion] = \ + http_constants.HttpHeaders.SeparateMetaWithCrts + else: + request_headers[http_constants.HttpHeaders.AIM] = http_constants.HttpHeaders.IncrementalFeedHeaderValue - # When a merge happens, the child partition will contain documents ordered by LSN but the _ts/creation time - # of the documents may not be sequential. - # So when reading the changeFeed by LSN, it is possible to encounter documents with lower _ts. - # In order to guarantee we always get the documents after customer's point start time, - # we will need to always pass the start time in the header. - self._change_feed_start_from.populate_request_headers(request_headers) + def populate_request_headers( + self, + routing_provider: SmartRoutingMapProvider, + request_headers: Dict[str, Any]) -> None: + self.set_start_from_request_headers(request_headers) + + # based on the feed range to find the overlapping partition key range id + over_lapping_ranges = \ + routing_provider.get_overlapping_ranges( + self._container_link, + [self._continuation.current_token.feed_range]) + + self.set_pk_range_id_request_headers(over_lapping_ranges, request_headers) + + self.set_mode_request_headers(request_headers) - if self._continuation.current_token is not None and self._continuation.current_token.token is not None: - change_feed_start_from_feed_range_and_etag = \ - ChangeFeedStartFromETagAndFeedRange( - self._continuation.current_token.token, - self._continuation.current_token.feed_range) - change_feed_start_from_feed_range_and_etag.populate_request_headers(request_headers) + + async def populate_request_headers_async( + self, + async_routing_provider: AsyncSmartRoutingMapProvider, + request_headers: Dict[str, Any]) -> None: + self.set_start_from_request_headers(request_headers) # based on the feed range to find the overlapping partition key range id over_lapping_ranges = \ @@ -291,22 +302,9 @@ async def populate_request_headers_async( self._container_link, [self._continuation.current_token.feed_range]) - if len(over_lapping_ranges) > 1: - raise self.get_feed_range_gone_error(over_lapping_ranges) + self.set_pk_range_id_request_headers(over_lapping_ranges, request_headers) - overlapping_feed_range = Range.PartitionKeyRangeToRange(over_lapping_ranges[0]) - if overlapping_feed_range == self._continuation.current_token.feed_range: - # exactly mapping to one physical partition, only need to set the partitionKeyRangeId - request_headers[http_constants.HttpHeaders.PartitionKeyRangeID] = over_lapping_ranges[0]["id"] - else: - # the current token feed range spans less than single physical partition - # for this case, need to set both the partition key range id and epk filter headers - request_headers[http_constants.HttpHeaders.PartitionKeyRangeID] = \ - over_lapping_ranges[0]["id"] - request_headers[http_constants.HttpHeaders.StartEpkString] = \ - self._continuation.current_token.feed_range.min - request_headers[http_constants.HttpHeaders.EndEpkString] = \ - self._continuation.current_token.feed_range.max + self.set_mode_request_headers(request_headers) def populate_feed_options(self, feed_options: Dict[str, Any]) -> None: pass @@ -367,12 +365,20 @@ def from_continuation( if continuation_data is None: raise ValueError(f"Invalid continuation: [Missing {ChangeFeedStateV2.continuation_property_name}]") continuation = FeedRangeCompositeContinuation.from_json(continuation_data) - return ChangeFeedStateV2( + + mode = continuation_json.get(ChangeFeedStateV2.mode_property_name) + # All 'continuation_json' from ChangeFeedStateV2 must contain 'mode' property. For the 'continuation_json' + # from older ChangeFeedState versions won't even hit this point, since their version is not 'v2'. + if mode is None: + raise ValueError(f"Invalid continuation: [Missing {ChangeFeedStateV2.mode_property_name}]") + + return cls( container_link=container_link, container_rid=container_rid, feed_range=continuation.feed_range, change_feed_start_from=change_feed_start_from, - continuation=continuation) + continuation=continuation, + mode=mode) @classmethod def from_initial_state( @@ -394,6 +400,7 @@ def from_initial_state( raise ValueError("partitionKey is in the changeFeedStateContext, but missing partitionKeyFeedRange") else: # default to full range + logging.info("'feed_range' empty. Using full range by default.") feed_range = FeedRangeInternalEpk( Range( "", @@ -405,11 +412,12 @@ def from_initial_state( change_feed_start_from = ( ChangeFeedStartFromInternal.from_start_time(change_feed_state_context.get("startTime"))) - if feed_range is not None: - return cls( - container_link=container_link, - container_rid=collection_rid, - feed_range=feed_range, - change_feed_start_from=change_feed_start_from, - continuation=None) - raise ValueError("feed_range is empty") + mode = change_feed_state_context.get("mode") + + return cls( + container_link=container_link, + container_rid=collection_rid, + feed_range=feed_range, + change_feed_start_from=change_feed_start_from, + continuation=None, + mode=mode) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/change_feed_utils.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/change_feed_utils.py new file mode 100644 index 000000000000..15854405549e --- /dev/null +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/change_feed_utils.py @@ -0,0 +1,146 @@ +# The MIT License (MIT) +# Copyright (c) 2014 Microsoft Corporation + +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: + +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. + +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +"""Internal Helper functions in the Azure Cosmos database change_feed service. +""" + +import warnings +from datetime import datetime +from typing import Any, Dict, Tuple + +CHANGE_FEED_MODES = ["LatestVersion", "AllVersionsAndDeletes"] + +def add_args_to_kwargs( + args: Tuple[Any, ...], + kwargs: Dict[str, Any] + ) -> None: + """Add positional arguments(args) to keyword argument dictionary(kwargs). + Since 'query_items_change_feed' method only allows the following 4 positional arguments in the exact order + and types, if the order and types don't match, errors will be raised. + If the positional arguments are in the correct orders and types, the arguments will be added to keyword arguments. + + 4 positional arguments: + - str 'partition_key_range_id': [Deprecated] ChangeFeed requests can be executed against specific partition + key ranges. This is used to process the change feed in parallel across multiple consumers. + - bool 'is_start_from_beginning': [Deprecated] Get whether change feed should start from + beginning (true) or from current (false). By default, it's start from current (false). + - str 'continuation': e_tag value to be used as continuation for reading change feed. + - int 'max_item_count': Max number of items to be returned in the enumeration operation. + + :param args: Positional arguments. Arguments must be in the following order: + 1. partition_key_range_id + 2. is_start_from_beginning + 3. continuation + 4. max_item_count + :type args: Tuple[Any, ...] + :param kwargs: Keyword arguments + :type kwargs: dict[str, Any] + """ + if len(args) > 4: + raise TypeError(f"'query_items_change_feed()' takes 4 positional arguments but {len(args)} were given.") + + if len(args) > 0: + keys = [ + 'partition_key_range_id', + 'is_start_from_beginning', + 'continuation', + 'max_item_count', + ] + for i, value in enumerate(args): + key = keys[i] + + if key in kwargs: + raise TypeError(f"'query_items_change_feed()' got multiple values for argument '{key}'.") + + kwargs[key] = value + +def validate_kwargs( + kwargs: Dict[str, Any] + ) -> None: + """Validate keyword arguments(kwargs). + The values of keyword arguments must match the expect type and conditions. If the conditions do not match, + errors will be raised with the error messages and possible ways to correct the errors. + + :param kwargs: Keyword arguments to verify for query_items_change_feed API + :keyword mode: Must be one of the values in the Enum, 'ChangeFeedMode'. + If the value is 'ALL_VERSIONS_AND_DELETES', the following keywords must be in the right condition: + - 'partition_key_range_id': Cannot be used at any time + - 'is_start_from_beginning': Must be 'False' + - 'start_time': Must be "Now" + :paramtype mode: Optional[Literal["LatestVersion", "AllVersionsAndDeletes"]] + :keyword partition_key_range_id: Deprecated Warning. + :paramtype partition_key_range_id: str + :keyword is_start_from_beginning: Deprecated Warning. Cannot be used with 'start_time'. + :paramtype is_start_from_beginning: bool + :keyword start_time: Must be in supported types. + :paramtype start_time: Union[~datetime.datetime, Literal["Now", "Beginning"]] + :type kwargs: dict[str, Any] + """ + # Filter items with value None + kwargs = {key: value for key, value in kwargs.items() if value is not None} + + # Validate the keyword arguments + if "mode" in kwargs: + mode = kwargs["mode"] + if mode not in CHANGE_FEED_MODES: + raise ValueError( + f"Invalid mode was used: '{kwargs['mode']}'." + f" Supported modes are {CHANGE_FEED_MODES}.") + + if mode == 'AllVersionsAndDeletes': + if "partition_key_range_id" in kwargs: + raise ValueError( + "'AllVersionsAndDeletes' mode is not supported if 'partition_key_range_id'" + " was used. Please use 'feed_range' instead.") + if "is_start_from_beginning" in kwargs and kwargs["is_start_from_beginning"] is not False: + raise ValueError( + "'AllVersionsAndDeletes' mode is only supported if 'is_start_from_beginning'" + " is 'False'. Please use 'is_start_from_beginning=False' or 'continuation' instead.") + if "start_time" in kwargs and kwargs["start_time"] != "Now": + raise ValueError( + "'AllVersionsAndDeletes' mode is only supported if 'start_time' is 'Now'." + " Please use 'start_time=\"Now\"' or 'continuation' instead.") + + if "partition_key_range_id" in kwargs: + warnings.warn( + "'partition_key_range_id' is deprecated. Please pass in 'feed_range' instead.", + DeprecationWarning + ) + + if "is_start_from_beginning" in kwargs: + warnings.warn( + "'is_start_from_beginning' is deprecated. Please pass in 'start_time' instead.", + DeprecationWarning + ) + + if not isinstance(kwargs["is_start_from_beginning"], bool): + raise TypeError( + f"'is_start_from_beginning' must be 'bool' type," + f" but given '{type(kwargs['is_start_from_beginning']).__name__}'.") + + if kwargs["is_start_from_beginning"] is True and "start_time" in kwargs: + raise ValueError("'is_start_from_beginning' and 'start_time' are exclusive, please only set one of them.") + + if "start_time" in kwargs: + if not isinstance(kwargs['start_time'], datetime): + if kwargs['start_time'].lower() not in ["now", "beginning"]: + raise ValueError( + f"'start_time' must be either 'Now' or 'Beginning', but given '{kwargs['start_time']}'.") diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py index 03cdcbb8e214..e3aab396b338 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py @@ -1177,6 +1177,7 @@ def _QueryChangeFeed( def fetch_fn(options: Mapping[str, Any]) -> Tuple[List[Dict[str, Any]], CaseInsensitiveDict]: if collection_link in self.__container_properties_cache: + # TODO: This will make deep copy. Check if this has any performance impact new_options = dict(options) new_options["containerRID"] = self.__container_properties_cache[collection_link]["_rid"] options = new_options @@ -3007,6 +3008,7 @@ def __GetBodiesFromQueryResult(result: Dict[str, Any]) -> List[Dict[str, Any]]: # This case should be interpreted as an empty array. return [] + # TODO: copy is not needed if query was none, since the header was copied inside of "base.GetHeaders" initial_headers = self.default_headers.copy() # Copy to make sure that default_headers won't be changed. if query is None: diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py index 5e7c808aded4..7584d1dea1a1 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py @@ -21,7 +21,6 @@ """Create, read, update and delete items in the Azure Cosmos DB SQL API service. """ -import warnings from datetime import datetime from typing import Any, Dict, Mapping, Optional, Sequence, Type, Union, List, Tuple, cast, overload, AsyncIterable from typing_extensions import Literal @@ -30,6 +29,7 @@ from azure.core.async_paging import AsyncItemPaged, AsyncList from azure.core.tracing.decorator import distributed_trace from azure.core.tracing.decorator_async import distributed_trace_async # type: ignore +from azure.cosmos._change_feed.change_feed_utils import validate_kwargs from ._cosmos_client_connection_async import CosmosClientConnection from ._scripts import ScriptsProxy @@ -508,6 +508,7 @@ def query_items_change_feed( start_time: Optional[Union[datetime, Literal["Now", "Beginning"]]] = None, partition_key: PartitionKeyType, priority: Optional[Literal["High", "Low"]] = None, + mode: Optional[Literal["LatestVersion", "AllVersionsAndDeletes"]] = None, **kwargs: Any ) -> AsyncItemPaged[Dict[str, Any]]: """Get a sorted list of items that were changed, in the order in which they were modified. @@ -518,13 +519,19 @@ def query_items_change_feed( Now: Processing change feed from the current time, so only events for all future changes will be retrieved. ~datetime.datetime: processing change feed from a point of time. Provided value will be converted to UTC. By default, it is start from current ("Now") - :type start_time: Union[~datetime.datetime, Literal["Now", "Beginning"]] + :paramtype start_time: Union[~datetime.datetime, Literal["Now", "Beginning"]] :keyword partition_key: The partition key that is used to define the scope (logical partition or a subset of a container) - :type partition_key: Union[str, int, float, bool, Sequence[Union[str, int, float, bool, None]]] + :paramtype partition_key: Union[str, int, float, bool, Sequence[Union[str, int, float, bool, None]]] :keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each request. Once the user has reached their provisioned throughput, low priority requests are throttled before high priority requests start getting throttled. Feature must first be enabled at the account level. + :paramtype priority: Literal["High", "Low"] + :keyword mode: The modes to query change feed. If `continuation` was passed, 'mode' argument will be ignored. + LATEST_VERSION: Query latest items from 'start_time' or 'continuation' token. + ALL_VERSIONS_AND_DELETES: Query all versions and deleted items from either `start_time='Now'` + or 'continuation' token. + :paramtype mode: Literal["LatestVersion", "AllVersionsAndDeletes"] :keyword Callable response_hook: A callable invoked with the response metadata. :returns: An AsyncItemPaged of items (dicts). :rtype: AsyncItemPaged[Dict[str, Any]] @@ -539,6 +546,7 @@ def query_items_change_feed( max_item_count: Optional[int] = None, start_time: Optional[Union[datetime, Literal["Now", "Beginning"]]] = None, priority: Optional[Literal["High", "Low"]] = None, + mode: Optional[Literal["LatestVersion", "AllVersionsAndDeletes"]] = None, **kwargs: Any ) -> AsyncItemPaged[Dict[str, Any]]: """Get a sorted list of items that were changed, in the order in which they were modified. @@ -550,11 +558,18 @@ def query_items_change_feed( Now: Processing change feed from the current time, so only events for all future changes will be retrieved. ~datetime.datetime: processing change feed from a point of time. Provided value will be converted to UTC. By default, it is start from current ("Now") - :type start_time: Union[~datetime.datetime, Literal["Now", "Beginning"]] - :keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each + :paramtype start_time: Union[~datetime.datetime, Literal["Now", "Beginning"]] + :keyword priority: Priority based execution allows users to set a priority for each request. Once the user has reached their provisioned throughput, low priority requests are throttled before high priority requests start getting throttled. Feature must first be enabled at the account level. - :keyword Callable response_hook: A callable invoked with the response metadata. + :paramtype priority: Literal["High", "Low"] + :keyword mode: The modes to query change feed. If `continuation` was passed, 'mode' argument will be ignored. + LATEST_VERSION: Query latest items from 'start_time' or 'continuation' token. + ALL_VERSIONS_AND_DELETES: Query all versions and deleted items from either `start_time='Now'` + or 'continuation' token. + :paramtype mode: Literal["LatestVersion", "AllVersionsAndDeletes"] + :keyword response_hook: A callable invoked with the response metadata. + :paramtype response_hook: Callable[[Mapping[str, Any], Mapping[str, Any]], None] :returns: An AsyncItemPaged of items (dicts). :rtype: AsyncItemPaged[Dict[str, Any]] """ @@ -571,12 +586,15 @@ def query_items_change_feed( ) -> AsyncItemPaged[Dict[str, Any]]: """Get a sorted list of items that were changed, in the order in which they were modified. - :keyword str continuation: The continuation token retrieved from previous response. + :keyword str continuation: The continuation token retrieved from previous response. It contains chang feed mode. + :type continuation: str :keyword int max_item_count: Max number of items to be returned in the enumeration operation. - :keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each + :keyword priority: Priority based execution allows users to set a priority for each request. Once the user has reached their provisioned throughput, low priority requests are throttled before high priority requests start getting throttled. Feature must first be enabled at the account level. - :keyword Callable response_hook: A callable invoked with the response metadata. + :paramtype priority: Literal["High", "Low"] + :keyword response_hook: A callable invoked with the response metadata. + :paramtype response_hook: Callable[[Mapping[str, Any], Mapping[str, Any]], None] :returns: An AsyncItemPaged of items (dicts). :rtype: AsyncItemPaged[Dict[str, Any]] """ @@ -590,6 +608,7 @@ def query_items_change_feed( max_item_count: Optional[int] = None, start_time: Optional[Union[datetime, Literal["Now", "Beginning"]]] = None, priority: Optional[Literal["High", "Low"]] = None, + mode: Optional[Literal["LatestVersion", "AllVersionsAndDeletes"]] = None, **kwargs: Any ) -> AsyncItemPaged[Dict[str, Any]]: """Get a sorted list of items that were changed in the entire container, @@ -601,11 +620,18 @@ def query_items_change_feed( Now: Processing change feed from the current time, so only events for all future changes will be retrieved. ~datetime.datetime: processing change feed from a point of time. Provided value will be converted to UTC. By default, it is start from current ("Now") - :type start_time: Union[~datetime.datetime, Literal["Now", "Beginning"]] - :keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each + :paramtype start_time: Union[~datetime.datetime, Literal["Now", "Beginning"]] + :keyword priority: Priority based execution allows users to set a priority for each request. Once the user has reached their provisioned throughput, low priority requests are throttled before high priority requests start getting throttled. Feature must first be enabled at the account level. - :keyword Callable response_hook: A callable invoked with the response metadata. + :paramtype priority: Literal["High", "Low"] + :keyword mode: The modes to query change feed. If `continuation` was passed, 'mode' argument will be ignored. + LATEST_VERSION: Query latest items from 'start_time' or 'continuation' token. + ALL_VERSIONS_AND_DELETES: Query all versions and deleted items from either `start_time='Now'` + or 'continuation' token. + :paramtype mode: Literal["LatestVersion", "AllVersionsAndDeletes"] + :keyword response_hook: A callable invoked with the response metadata. + :paramtype response_hook: Callable[[Mapping[str, Any], Mapping[str, Any]], None] :returns: An AsyncItemPaged of items (dicts). :rtype: AsyncItemPaged[Dict[str, Any]] """ @@ -619,82 +645,58 @@ def query_items_change_feed( # pylint: disable=unused-argument """Get a sorted list of items that were changed, in the order in which they were modified. - :keyword str continuation: The continuation token retrieved from previous response. + :keyword str continuation: The continuation token retrieved from previous response. It contains chang feed mode. :keyword Dict[str, Any] feed_range: The feed range that is used to define the scope. :keyword partition_key: The partition key that is used to define the scope (logical partition or a subset of a container) - :type partition_key: Union[str, int, float, bool, Sequence[Union[str, int, float, bool, None]]] + :paramtype partition_key: Union[str, int, float, bool, Sequence[Union[str, int, float, bool, None]]] :keyword int max_item_count: Max number of items to be returned in the enumeration operation. :keyword start_time: The start time to start processing chang feed items. Beginning: Processing the change feed items from the beginning of the change feed. Now: Processing change feed from the current time, so only events for all future changes will be retrieved. ~datetime.datetime: processing change feed from a point of time. Provided value will be converted to UTC. By default, it is start from current ("Now") - :type start_time: Union[~datetime.datetime, Literal["Now", "Beginning"]] - :keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each + :paramtype start_time: Union[~datetime.datetime, Literal["Now", "Beginning"]] + :keyword priority: Priority based execution allows users to set a priority for each request. Once the user has reached their provisioned throughput, low priority requests are throttled before high priority requests start getting throttled. Feature must first be enabled at the account level. - :keyword Callable response_hook: A callable invoked with the response metadata. + :paramtype priority: Literal["High", "Low"] + :keyword mode: The modes to query change feed. If `continuation` was passed, 'mode' argument will be ignored. + LATEST_VERSION: Query latest items from 'start_time' or 'continuation' token. + ALL_VERSIONS_AND_DELETES: Query all versions and deleted items from either `start_time='Now'` + or 'continuation' token. + :paramtype mode: Literal["LatestVersion", "AllVersionsAndDeletes"] + :keyword response_hook: A callable invoked with the response metadata. + :paramtype response_hook: Callable[[Mapping[str, Any], Mapping[str, Any]], None] :returns: An AsyncItemPaged of items (dicts). :rtype: AsyncItemPaged[Dict[str, Any]] """ # pylint: disable=too-many-statements - if kwargs.get("priority") is not None: - kwargs['priority'] = kwargs['priority'] + validate_kwargs(kwargs) feed_options = _build_options(kwargs) change_feed_state_context = {} - # Back compatibility with deprecation warnings for partition_key_range_id - if kwargs.get("partition_key_range_id") is not None: - warnings.warn( - "partition_key_range_id is deprecated. Please pass in feed_range instead.", - DeprecationWarning - ) - - change_feed_state_context["partitionKeyRangeId"] = kwargs.pop('partition_key_range_id') - - # Back compatibility with deprecation warnings for is_start_from_beginning - if kwargs.get("is_start_from_beginning") is not None: - warnings.warn( - "is_start_from_beginning is deprecated. Please pass in start_time instead.", - DeprecationWarning - ) - - if kwargs.get("start_time") is not None: - raise ValueError("is_start_from_beginning and start_time are exclusive, please only set one of them") - - is_start_from_beginning = kwargs.pop('is_start_from_beginning') - if is_start_from_beginning is True: - change_feed_state_context["startTime"] = "Beginning" - - # parse start_time - if kwargs.get("start_time") is not None: - start_time = kwargs.pop('start_time') - if not isinstance(start_time, (datetime, str)): - raise TypeError( - "'start_time' must be either a datetime object, or either the values 'Now' or 'Beginning'.") - change_feed_state_context["startTime"] = start_time - - # parse continuation token - if feed_options.get("continuation") is not None: - change_feed_state_context["continuation"] = feed_options.pop('continuation') - - if kwargs.get("max_item_count") is not None: - feed_options["maxItemCount"] = kwargs.pop('max_item_count') - - if kwargs.get("partition_key") is not None: - change_feed_state_context["partitionKey"] =\ - self._set_partition_key(cast(PartitionKeyType, kwargs.get("partition_key"))) - change_feed_state_context["partitionKeyFeedRange"] = \ - self._get_epk_range_for_partition_key(kwargs.pop('partition_key')) - - if kwargs.get("feed_range") is not None: + if "mode" in kwargs: + change_feed_state_context["mode"] = kwargs.pop("mode") + if "partition_key_range_id" in kwargs: + change_feed_state_context["partitionKeyRangeId"] = kwargs.pop("partition_key_range_id") + if "is_start_from_beginning" in kwargs and kwargs.pop('is_start_from_beginning') is True: + change_feed_state_context["startTime"] = "Beginning" + elif "start_time" in kwargs: + change_feed_state_context["startTime"] = kwargs.pop("start_time") + if "partition_key" in kwargs: + partition_key = kwargs.pop("partition_key") + change_feed_state_context["partitionKey"] = self._set_partition_key(cast(PartitionKeyType, partition_key)) + change_feed_state_context["partitionKeyFeedRange"] = self._get_epk_range_for_partition_key(partition_key) + if "feed_range" in kwargs: change_feed_state_context["feedRange"] = kwargs.pop('feed_range') + if "continuation" in feed_options: + change_feed_state_context["continuation"] = feed_options.pop("continuation") - feed_options["containerProperties"] = self._get_properties() feed_options["changeFeedStateContext"] = change_feed_state_context + feed_options["containerProperties"] = self._get_properties() - response_hook = kwargs.pop('response_hook', None) + response_hook = kwargs.pop("response_hook", None) if hasattr(response_hook, "clear"): response_hook.clear() diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_database.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_database.py index 21b77de57ed8..49253b223f88 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_database.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_database.py @@ -173,6 +173,7 @@ async def create_container( match_condition: Optional[MatchConditions] = None, analytical_storage_ttl: Optional[int] = None, vector_embedding_policy: Optional[Dict[str, Any]] = None, + change_feed_policy: Optional[Dict[str, Any]] = None, full_text_policy: Optional[Dict[str, Any]] = None, **kwargs: Any ) -> ContainerProxy: @@ -207,6 +208,8 @@ async def create_container( :keyword Dict[str, Any] vector_embedding_policy: The vector embedding policy for the container. Each vector embedding possesses a predetermined number of dimensions, is associated with an underlying data type, and is generated for a particular distance function. + :keyword Dict[str, Any] change_feed_policy: The change feed policy to apply 'retentionDuration' to + the container. :keyword Dict[str, Any] full_text_policy: **provisional** The full text policy for the container. Used to denote the default language to be used for all full text indexes, or to individually assign a language to each full text index path. @@ -255,6 +258,8 @@ async def create_container( definition["computedProperties"] = computed_properties if vector_embedding_policy is not None: definition["vectorEmbeddingPolicy"] = vector_embedding_policy + if change_feed_policy is not None: + definition["changeFeedPolicy"] = change_feed_policy if full_text_policy is not None: definition["fullTextPolicy"] = full_text_policy @@ -291,6 +296,7 @@ async def create_container_if_not_exists( match_condition: Optional[MatchConditions] = None, analytical_storage_ttl: Optional[int] = None, vector_embedding_policy: Optional[Dict[str, Any]] = None, + change_feed_policy: Optional[Dict[str, Any]] = None, full_text_policy: Optional[Dict[str, Any]] = None, **kwargs: Any ) -> ContainerProxy: @@ -327,6 +333,8 @@ async def create_container_if_not_exists( :keyword Dict[str, Any] vector_embedding_policy: **provisional** The vector embedding policy for the container. Each vector embedding possesses a predetermined number of dimensions, is associated with an underlying data type, and is generated for a particular distance function. + :keyword Dict[str, Any] change_feed_policy: The change feed policy to apply 'retentionDuration' to + the container. :keyword Dict[str, Any] full_text_policy: **provisional** The full text policy for the container. Used to denote the default language to be used for all full text indexes, or to individually assign a language to each full text index path. @@ -359,6 +367,7 @@ async def create_container_if_not_exists( session_token=session_token, initial_headers=initial_headers, vector_embedding_policy=vector_embedding_policy, + change_feed_policy=change_feed_policy, full_text_policy=full_text_policy, **kwargs ) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/container.py b/sdk/cosmos/azure-cosmos/azure/cosmos/container.py index 22baa8933a0b..b60d1dbddc4d 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/container.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/container.py @@ -29,6 +29,7 @@ from azure.core import MatchConditions from azure.core.paging import ItemPaged from azure.core.tracing.decorator import distributed_trace +from azure.cosmos._change_feed.change_feed_utils import add_args_to_kwargs, validate_kwargs from ._base import ( build_options, @@ -328,6 +329,7 @@ def query_items_change_feed( start_time: Optional[Union[datetime, Literal["Now", "Beginning"]]] = None, partition_key: PartitionKeyType, priority: Optional[Literal["High", "Low"]] = None, + mode: Optional[Literal["LatestVersion", "AllVersionsAndDeletes"]] = None, **kwargs: Any ) -> ItemPaged[Dict[str, Any]]: """Get a sorted list of items that were changed, in the order in which they were modified. @@ -338,14 +340,21 @@ def query_items_change_feed( Now: Processing change feed from the current time, so only events for all future changes will be retrieved. ~datetime.datetime: processing change feed from a point of time. Provided value will be converted to UTC. By default, it is start from current ("Now") - :type start_time: Union[~datetime.datetime, Literal["Now", "Beginning"]] + :paramtype start_time: Union[~datetime.datetime, Literal["Now", "Beginning"]] :keyword partition_key: The partition key that is used to define the scope (logical partition or a subset of a container) - :type partition_key: Union[str, int, float, bool, Sequence[Union[str, int, float, bool, None]]] + :paramtype partition_key: Union[str, int, float, bool, Sequence[Union[str, int, float, bool, None]]] :keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each request. Once the user has reached their provisioned throughput, low priority requests are throttled before high priority requests start getting throttled. Feature must first be enabled at the account level. - :keyword Callable response_hook: A callable invoked with the response metadata. + :paramtype priority: Literal["High", "Low"] + :keyword mode: The modes to query change feed. If `continuation` was passed, 'mode' argument will be ignored. + LATEST_VERSION: Query latest items from 'start_time' or 'continuation' token. + ALL_VERSIONS_AND_DELETES: Query all versions and deleted items from either `start_time='Now'` + or 'continuation' token. + :paramtype mode: Literal["LatestVersion", "AllVersionsAndDeletes"] + :keyword response_hook: A callable invoked with the response metadata. + :type response_hook: Callable[[Mapping[str, Any], Mapping[str, Any]], None] :returns: An Iterable of items (dicts). :rtype: Iterable[Dict[str, Any]] """ @@ -359,6 +368,7 @@ def query_items_change_feed( max_item_count: Optional[int] = None, start_time: Optional[Union[datetime, Literal["Now", "Beginning"]]] = None, priority: Optional[Literal["High", "Low"]] = None, + mode: Optional[Literal["LatestVersion", "AllVersionsAndDeletes"]] = None, **kwargs: Any ) -> ItemPaged[Dict[str, Any]]: @@ -371,11 +381,18 @@ def query_items_change_feed( Now: Processing change feed from the current time, so only events for all future changes will be retrieved. ~datetime.datetime: processing change feed from a point of time. Provided value will be converted to UTC. By default, it is start from current ("Now") - :type start_time: Union[~datetime.datetime, Literal["Now", "Beginning"]] + :paramtype start_time: Union[~datetime.datetime, Literal["Now", "Beginning"]] :keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each request. Once the user has reached their provisioned throughput, low priority requests are throttled before high priority requests start getting throttled. Feature must first be enabled at the account level. - :keyword Callable response_hook: A callable invoked with the response metadata. + :paramtype priority: Literal["High", "Low"] + :keyword mode: The modes to query change feed. If `continuation` was passed, 'mode' argument will be ignored. + LATEST_VERSION: Query latest items from 'start_time' or 'continuation' token. + ALL_VERSIONS_AND_DELETES: Query all versions and deleted items from either `start_time='Now'` + or 'continuation' token. + :paramtype mode: Literal["LatestVersion", "AllVersionsAndDeletes"] + :keyword response_hook: A callable invoked with the response metadata. + :paramtype response_hook: Callable[[Mapping[str, Any], Mapping[str, Any]], None] :returns: An Iterable of items (dicts). :rtype: Iterable[Dict[str, Any]] """ @@ -392,12 +409,15 @@ def query_items_change_feed( ) -> ItemPaged[Dict[str, Any]]: """Get a sorted list of items that were changed, in the order in which they were modified. - :keyword str continuation: The continuation token retrieved from previous response. + :keyword str continuation: The continuation token retrieved from previous response. It contains chang feed mode. + :paramtype continuation: str :keyword int max_item_count: Max number of items to be returned in the enumeration operation. :keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each request. Once the user has reached their provisioned throughput, low priority requests are throttled before high priority requests start getting throttled. Feature must first be enabled at the account level. - :keyword Callable response_hook: A callable invoked with the response metadata. + :paramtype priority: Literal["High", "Low"] + :keyword response_hook: A callable invoked with the response metadata. + :paramtype response_hook: Callable[[Mapping[str, Any], Mapping[str, Any]], None] :returns: An Iterable of items (dicts). :rtype: Iterable[Dict[str, Any]] """ @@ -410,6 +430,7 @@ def query_items_change_feed( max_item_count: Optional[int] = None, start_time: Optional[Union[datetime, Literal["Now", "Beginning"]]] = None, priority: Optional[Literal["High", "Low"]] = None, + mode: Optional[Literal["LatestVersion", "AllVersionsAndDeletes"]] = None, **kwargs: Any ) -> ItemPaged[Dict[str, Any]]: """Get a sorted list of items that were changed in the entire container, @@ -421,11 +442,18 @@ def query_items_change_feed( Now: Processing change feed from the current time, so only events for all future changes will be retrieved. ~datetime.datetime: processing change feed from a point of time. Provided value will be converted to UTC. By default, it is start from current ("Now") - :type start_time: Union[~datetime.datetime, Literal["Now", "Beginning"]] + :paramtype start_time: Union[~datetime.datetime, Literal["Now", "Beginning"]] :keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each request. Once the user has reached their provisioned throughput, low priority requests are throttled before high priority requests start getting throttled. Feature must first be enabled at the account level. - :keyword Callable response_hook: A callable invoked with the response metadata. + :paramtype priority: Literal["High", "Low"] + :keyword mode: The modes to query change feed. If `continuation` was passed, 'mode' argument will be ignored. + LATEST_VERSION: Query latest items from 'start_time' or 'continuation' token. + ALL_VERSIONS_AND_DELETES: Query all versions and deleted items from either `start_time='Now'` + or 'continuation' token. + :paramtype mode: Literal["LatestVersion", "AllVersionsAndDeletes"] + :keyword response_hook: A callable invoked with the response metadata. + :paramtype response_hook: Callable[[Mapping[str, Any], Mapping[str, Any]], None] :returns: An Iterable of items (dicts). :rtype: Iterable[Dict[str, Any]] """ @@ -440,106 +468,68 @@ def query_items_change_feed( """Get a sorted list of items that were changed, in the order in which they were modified. - :keyword str continuation: The continuation token retrieved from previous response. + :keyword str continuation: The continuation token retrieved from previous response. It contains chang feed mode. :keyword Dict[str, Any] feed_range: The feed range that is used to define the scope. :keyword partition_key: The partition key that is used to define the scope (logical partition or a subset of a container) - :type partition_key: Union[str, int, float, bool, Sequence[Union[str, int, float, bool, None]]] + :paramtype partition_key: Union[str, int, float, bool, Sequence[Union[str, int, float, bool, None]]] :keyword int max_item_count: Max number of items to be returned in the enumeration operation. :keyword start_time: The start time to start processing chang feed items. Beginning: Processing the change feed items from the beginning of the change feed. Now: Processing change feed from the current time, so only events for all future changes will be retrieved. ~datetime.datetime: processing change feed from a point of time. Provided value will be converted to UTC. By default, it is start from current ("Now") - :type start_time: Union[~datetime.datetime, Literal["Now", "Beginning"]] + :paramtype start_time: Union[~datetime.datetime, Literal["Now", "Beginning"]] :keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each request. Once the user has reached their provisioned throughput, low priority requests are throttled before high priority requests start getting throttled. Feature must first be enabled at the account level. - :keyword Callable response_hook: A callable invoked with the response metadata. + :keyword mode: The modes to query change feed. If `continuation` was passed, 'mode' argument will be ignored. + LATEST_VERSION: Query latest items from 'start_time' or 'continuation' token. + ALL_VERSIONS_AND_DELETES: Query all versions and deleted items from either `start_time='Now'` + or 'continuation' token. + :paramtype mode: Literal["LatestVersion", "AllVersionsAndDeletes"] + :keyword response_hook: A callable invoked with the response metadata. + :paramtype response_hook: Callable[[Mapping[str, Any], Mapping[str, Any]], None] :param Any args: args :returns: An Iterable of items (dicts). :rtype: Iterable[Dict[str, Any]] """ # pylint: disable=too-many-statements - if kwargs.get("priority") is not None: - kwargs['priority'] = kwargs['priority'] + add_args_to_kwargs(args, kwargs) + validate_kwargs(kwargs) feed_options = build_options(kwargs) change_feed_state_context = {} - # Back compatibility with deprecation warnings for partition_key_range_id - if (args and args[0] is not None) or kwargs.get("partition_key_range_id") is not None: - warnings.warn( - "partition_key_range_id is deprecated. Please pass in feed_range instead.", - DeprecationWarning - ) - - try: - change_feed_state_context["partitionKeyRangeId"] = kwargs.pop('partition_key_range_id') - except KeyError: - change_feed_state_context['partitionKeyRangeId'] = args[0] - - # Back compatibility with deprecation warnings for is_start_from_beginning - if (len(args) >= 2 and args[1] is not None) or kwargs.get("is_start_from_beginning") is not None: - warnings.warn( - "is_start_from_beginning is deprecated. Please pass in start_time instead.", - DeprecationWarning - ) - - if kwargs.get("start_time") is not None: - raise ValueError("is_start_from_beginning and start_time are exclusive, please only set one of them") - - try: - is_start_from_beginning = kwargs.pop('is_start_from_beginning') - except KeyError: - is_start_from_beginning = args[1] - - if is_start_from_beginning is True: - change_feed_state_context["startTime"] = "Beginning" - - # parse start_time - if kwargs.get("start_time") is not None: - - start_time = kwargs.pop('start_time') - if not isinstance(start_time, (datetime, str)): - raise TypeError( - "'start_time' must be either a datetime object, or either the values 'Now' or 'Beginning'.") - change_feed_state_context["startTime"] = start_time - - # parse continuation token - if len(args) >= 3 and args[2] is not None or feed_options.get("continuation") is not None: - try: - continuation = feed_options.pop('continuation') - except KeyError: - continuation = args[2] - change_feed_state_context["continuation"] = continuation - - if len(args) >= 4 and args[3] is not None or kwargs.get("max_item_count") is not None: - try: - feed_options["maxItemCount"] = kwargs.pop('max_item_count') - except KeyError: - feed_options["maxItemCount"] = args[3] - - if kwargs.get("partition_key") is not None: - change_feed_state_context["partitionKey"] =\ - self._set_partition_key(cast(PartitionKeyType, kwargs.get('partition_key'))) - change_feed_state_context["partitionKeyFeedRange"] =\ - self._get_epk_range_for_partition_key(kwargs.pop('partition_key')) - - if kwargs.get("feed_range") is not None: + if "mode" in kwargs: + change_feed_state_context["mode"] = kwargs.pop("mode") + if "partition_key_range_id" in kwargs: + change_feed_state_context["partitionKeyRangeId"] = kwargs.pop("partition_key_range_id") + if "is_start_from_beginning" in kwargs and kwargs.pop('is_start_from_beginning') is True: + change_feed_state_context["startTime"] = "Beginning" + elif "start_time" in kwargs: + change_feed_state_context["startTime"] = kwargs.pop("start_time") + if "partition_key" in kwargs: + partition_key = kwargs.pop("partition_key") + change_feed_state_context["partitionKey"] = self._set_partition_key(cast(PartitionKeyType, partition_key)) + change_feed_state_context["partitionKeyFeedRange"] = self._get_epk_range_for_partition_key(partition_key) + if "feed_range" in kwargs: change_feed_state_context["feedRange"] = kwargs.pop('feed_range') + if "continuation" in feed_options: + change_feed_state_context["continuation"] = feed_options.pop("continuation") container_properties = self._get_properties() feed_options["changeFeedStateContext"] = change_feed_state_context feed_options["containerRID"] = container_properties["_rid"] - response_hook = kwargs.pop('response_hook', None) + response_hook = kwargs.pop("response_hook", None) if hasattr(response_hook, "clear"): response_hook.clear() result = self.client_connection.QueryItemsChangeFeed( self.container_link, options=feed_options, response_hook=response_hook, **kwargs ) + if response_hook: response_hook(self.client_connection.last_response_headers, result) return result diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/database.py b/sdk/cosmos/azure-cosmos/azure/cosmos/database.py index 9940d1a932f5..f7e5a7f9d715 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/database.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/database.py @@ -174,6 +174,7 @@ def create_container( # pylint:disable=docstring-missing-param match_condition: Optional[MatchConditions] = None, analytical_storage_ttl: Optional[int] = None, vector_embedding_policy: Optional[Dict[str, Any]] = None, + change_feed_policy: Optional[Dict[str, Any]] = None, full_text_policy: Optional[Dict[str, Any]] = None, **kwargs: Any ) -> ContainerProxy: @@ -204,6 +205,8 @@ def create_container( # pylint:disable=docstring-missing-param :keyword Dict[str, Any] vector_embedding_policy: **provisional** The vector embedding policy for the container. Each vector embedding possesses a predetermined number of dimensions, is associated with an underlying data type, and is generated for a particular distance function. + :keyword Dict[str, Any] change_feed_policy: The change feed policy to apply 'retentionDuration' to + the container. :keyword Dict[str, Any] full_text_policy: **provisional** The full text policy for the container. Used to denote the default language to be used for all full text indexes, or to individually assign a language to each full text index path. @@ -250,6 +253,8 @@ def create_container( # pylint:disable=docstring-missing-param definition["computedProperties"] = computed_properties if vector_embedding_policy is not None: definition["vectorEmbeddingPolicy"] = vector_embedding_policy + if change_feed_policy is not None: + definition["changeFeedPolicy"] = change_feed_policy if full_text_policy is not None: definition["fullTextPolicy"] = full_text_policy @@ -293,6 +298,7 @@ def create_container_if_not_exists( # pylint:disable=docstring-missing-param match_condition: Optional[MatchConditions] = None, analytical_storage_ttl: Optional[int] = None, vector_embedding_policy: Optional[Dict[str, Any]] = None, + change_feed_policy: Optional[Dict[str, Any]] = None, full_text_policy: Optional[Dict[str, Any]] = None, **kwargs: Any ) -> ContainerProxy: @@ -325,6 +331,8 @@ def create_container_if_not_exists( # pylint:disable=docstring-missing-param :keyword Dict[str, Any] vector_embedding_policy: The vector embedding policy for the container. Each vector embedding possesses a predetermined number of dimensions, is associated with an underlying data type, and is generated for a particular distance function. + :keyword Dict[str, Any] change_feed_policy: The change feed policy to apply 'retentionDuration' to + the container. :keyword Dict[str, Any] full_text_policy: **provisional** The full text policy for the container. Used to denote the default language to be used for all full text indexes, or to individually assign a language to each full text index path. @@ -359,6 +367,7 @@ def create_container_if_not_exists( # pylint:disable=docstring-missing-param session_token=session_token, initial_headers=initial_headers, vector_embedding_policy=vector_embedding_policy, + change_feed_policy=change_feed_policy, full_text_policy=full_text_policy, **kwargs ) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/http_constants.py b/sdk/cosmos/azure-cosmos/azure/cosmos/http_constants.py index 4c42cc55b0e2..b0b6a87e7ca2 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/http_constants.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/http_constants.py @@ -219,6 +219,11 @@ class HttpHeaders: # Change feed AIM = "A-IM" IncrementalFeedHeaderValue = "Incremental feed" + FullFidelityFeedHeaderValue = "Full-Fidelity Feed" + ChangeFeedWireFormatVersion = "x-ms-cosmos-changefeed-wire-format-version" + + # Change feed wire format version + SeparateMetaWithCrts = "2021-09-15" # For Using Multiple Write Locations AllowTentativeWrites = "x-ms-cosmos-allow-tentative-writes" diff --git a/sdk/cosmos/azure-cosmos/samples/change_feed_management.py b/sdk/cosmos/azure-cosmos/samples/change_feed_management.py index f5e854406764..e790fe636be6 100644 --- a/sdk/cosmos/azure-cosmos/samples/change_feed_management.py +++ b/sdk/cosmos/azure-cosmos/samples/change_feed_management.py @@ -73,6 +73,51 @@ def read_change_feed_with_start_time(container, start_time): print('\nFinished reading all the change feed from start time of {}\n'.format(time)) +def read_change_feed_with_continuation(container, continuation): + print('\nReading change feed from continuation\n') + + # You can read change feed from a specific continuation token. + # You must pass in a valid continuation token. + response = container.query_items_change_feed(continuation=continuation) + for doc in response: + print(doc) + + print('\nFinished reading all the change feed from continuation\n') + +def delete_all_items(container): + print('\nDeleting all item\n') + + for item in container.query_items(query='SELECT * FROM c', enable_cross_partition_query=True): + # Deleting the current item + container.delete_item(item, partition_key=item['address']['state']) + + print('Deleted all items') + +def read_change_feed_with_all_versions_and_delete_mode(container): + change_feed_mode = "AllVersionsAndDeletes" + print("\nReading change feed with 'AllVersionsAndDeletes' mode.\n") + + # You can read change feed with a specific change feed mode. + # You must pass in a valid change feed mode: ["LatestVersion", "AllVersionsAndDeletes"]. + response = container.query_items_change_feed(mode=change_feed_mode) + for doc in response: + print(doc) + + print("\nFinished reading all the change feed with 'AllVersionsAndDeletes' mode.\n") + +def read_change_feed_with_all_versions_and_delete_mode_from_continuation(container, continuation): + change_feed_mode = "AllVersionsAndDeletes" + print("\nReading change feed with 'AllVersionsAndDeletes' mode.\n") + + # You can read change feed with a specific change feed mode from a specific continuation token. + # You must pass in a valid change feed mode: ["LatestVersion", "AllVersionsAndDeletes"]. + # You must pass in a valid continuation token. + response = container.query_items_change_feed(mode=change_feed_mode, continuation=continuation) + for doc in response: + print(doc) + + print("\nFinished reading all the change feed with 'AllVersionsAndDeletes' mode.\n") + def run_sample(): client = cosmos_client.CosmosClient(HOST, {'masterKey': MASTER_KEY}) try: @@ -103,6 +148,17 @@ def run_sample(): read_change_feed(container) # Read Change Feed from timestamp read_change_feed_with_start_time(container, timestamp) + # Delete all items from container + delete_all_items(container) + # Read change feed with 'AllVersionsAndDeletes' mode + read_change_feed_with_all_versions_and_delete_mode(container) + continuation_token = container.client_connection.last_response_headers['etag'] + # Read change feed with 'AllVersionsAndDeletes' mode after create item + create_items(container, 10) + read_change_feed_with_all_versions_and_delete_mode_from_continuation(container,continuation_token) + # Read change feed with 'AllVersionsAndDeletes' mode after create/delete item + delete_all_items(container) + read_change_feed_with_all_versions_and_delete_mode_from_continuation(container,continuation_token) # cleanup database after sample try: diff --git a/sdk/cosmos/azure-cosmos/samples/change_feed_management_async.py b/sdk/cosmos/azure-cosmos/samples/change_feed_management_async.py index 9ea66d4ebcda..c8191d3e55f6 100644 --- a/sdk/cosmos/azure-cosmos/samples/change_feed_management_async.py +++ b/sdk/cosmos/azure-cosmos/samples/change_feed_management_async.py @@ -77,6 +77,50 @@ async def read_change_feed_with_start_time(container, start_time): print('\nFinished reading all the change feed from start time of {}\n'.format(time)) +async def read_change_feed_with_continuation(container, continuation): + print('\nReading change feed from continuation\n') + + # You can read change feed from a specific continuation token. + # You must pass in a valid continuation token. + response = container.query_items_change_feed(continuation=continuation) + async for doc in response: + print(doc) + + print('\nFinished reading all the change feed from continuation\n') + +async def delete_all_items(container): + print('\nDeleting all item\n') + + async for item in container.query_items(query='SELECT * FROM c'): + # Deleting the current item + await container.delete_item(item, partition_key=item['address']['state']) + + print('Deleted all items') + +async def read_change_feed_with_all_versions_and_delete_mode(container): + change_feed_mode = "AllVersionsAndDeletes" + print("\nReading change feed with 'AllVersionsAndDeletes' mode.\n") + + # You can read change feed with a specific change feed mode. + # You must pass in a valid change feed mode: ["LatestVersion", "AllVersionsAndDeletes"]. + response = container.query_items_change_feed(mode=change_feed_mode) + async for doc in response: + print(doc) + + print("\nFinished reading all the change feed with 'AllVersionsAndDeletes' mode.\n") + +async def read_change_feed_with_all_versions_and_delete_mode_from_continuation(container, continuation): + change_feed_mode = "AllVersionsAndDeletes" + print("\nReading change feed with 'AllVersionsAndDeletes' mode.\n") + + # You can read change feed with a specific change feed mode from a specific continuation token. + # You must pass in a valid change feed mode: ["LatestVersion", "AllVersionsAndDeletes"]. + # You must pass in a valid continuation token. + response = container.query_items_change_feed(mode=change_feed_mode, continuation=continuation) + async for doc in response: + print(doc) + + print("\nFinished reading all the change feed with 'AllVersionsAndDeletes' mode.\n") async def run_sample(): async with CosmosClient(HOST, MASTER_KEY) as client: @@ -108,6 +152,17 @@ async def run_sample(): await read_change_feed(container) # Read Change Feed from timestamp await read_change_feed_with_start_time(container, timestamp) + # Delete all items from container + await delete_all_items(container) + # Read change feed with 'AllVersionsAndDeletes' mode + await read_change_feed_with_all_versions_and_delete_mode(container) + continuation_token = container.client_connection.last_response_headers['etag'] + # Read change feed with 'AllVersionsAndDeletes' mode after create item + await create_items(container, 10) + await read_change_feed_with_all_versions_and_delete_mode_from_continuation(container, continuation_token) + # Read change feed with 'AllVersionsAndDeletes' mode after create/delete item + await delete_all_items(container) + await read_change_feed_with_all_versions_and_delete_mode_from_continuation(container, continuation_token) # cleanup database after sample try: diff --git a/sdk/cosmos/azure-cosmos/test/test_change_feed.py b/sdk/cosmos/azure-cosmos/test/test_change_feed.py index 456f8a7dbd5a..7c886e6dda5f 100644 --- a/sdk/cosmos/azure-cosmos/test/test_change_feed.py +++ b/sdk/cosmos/azure-cosmos/test/test_change_feed.py @@ -13,7 +13,16 @@ import azure.cosmos.exceptions as exceptions import test_config from azure.cosmos.partition_key import PartitionKey +from azure.cosmos._change_feed.change_feed_state import ChangeFeedStateV2 +ID = 'id' +CURRENT = 'current' +PREVIOUS = 'previous' +METADATA = 'metadata' +OPERATION_TYPE = 'operationType' +CREATE = 'create' +DELETE = 'delete' +E_TAG = 'etag' @pytest.fixture(scope="class") def setup(): @@ -26,9 +35,36 @@ def setup(): "tests.") test_client = cosmos_client.CosmosClient(config.host, config.masterKey), return { - "created_db": test_client[0].get_database_client(config.TEST_DATABASE_ID) + "created_db": test_client[0].get_database_client(config.TEST_DATABASE_ID), + "is_emulator": config.is_emulator } +def round_time(): + utc_now = datetime.now(timezone.utc) + return utc_now - timedelta(microseconds=utc_now.microsecond) + +def assert_change_feed(expected, actual): + if len(actual) == 0: + assert len(expected) == len(actual) + return + + #TODO: remove this if we can add flag to get 'previous' always + for item in actual: + if METADATA in item and item[METADATA][OPERATION_TYPE] == DELETE: + if ID in item[METADATA]: + item[PREVIOUS] = {ID: item[METADATA][ID]} + + # Sort actual by operation_type and id + actual = sorted(actual, key=lambda k: (k[METADATA][OPERATION_TYPE], k[CURRENT][ID]) if k[METADATA][OPERATION_TYPE] == CREATE else (k[METADATA][OPERATION_TYPE], k[PREVIOUS][ID])) + + for expected_change_feed, actual_change_feed in zip(expected, actual): + for expected_type, expected_data in expected_change_feed.items(): + assert expected_type in actual_change_feed + actual_data = actual_change_feed[expected_type] + for key, value in expected_data.items(): + assert key in actual_data + assert expected_data[key] == actual_data[key] + @pytest.mark.cosmosEmulator @pytest.mark.unittest @pytest.mark.usefixtures("setup") @@ -63,7 +99,7 @@ def test_query_change_feed_with_different_filter(self, change_feed_filter_param, filter_param = None # Read change feed from current should return an empty list - query_iterable = created_collection.query_items_change_feed(filter_param) + query_iterable = created_collection.query_items_change_feed(**filter_param) iter_list = list(query_iterable) assert len(iter_list) == 0 assert 'etag' in created_collection.client_connection.last_response_headers @@ -165,9 +201,6 @@ def test_query_change_feed_with_start_time(self, setup): PartitionKey(path="/pk")) batchSize = 50 - def round_time(): - utc_now = datetime.now(timezone.utc) - return utc_now - timedelta(microseconds=utc_now.microsecond) def create_random_items(container, batch_size): for _ in range(batch_size): # Generate a Random partition key @@ -218,14 +251,6 @@ def create_random_items(container, batch_size): # Should equal batch size assert totalCount == batchSize - # test an invalid value, Attribute error will be raised for passing non datetime object - invalid_time = "Invalid value" - try: - list(created_collection.query_items_change_feed(start_time=invalid_time)) - fail("Cannot format date on a non datetime object.") - except ValueError as e: #TODO: previously it is throwing AttributeError, now has changed into ValueError, is it breaking change? - assert "Invalid start_time 'Invalid value'" == e.args[0] - setup["created_db"].delete_container(created_collection.id) def test_query_change_feed_with_multi_partition(self, setup): @@ -252,5 +277,162 @@ def test_query_change_feed_with_multi_partition(self, setup): assert actual_ids == expected_ids + def test_query_change_feed_with_all_versions_and_deletes(self, setup): + partition_key = 'pk' + # 'retentionDuration' was required to enable `ALL_VERSIONS_AND_DELETES` for Emulator testing + change_feed_policy = {"retentionDuration": 10} if setup["is_emulator"] else None + created_collection = setup["created_db"].create_container("change_feed_test_" + str(uuid.uuid4()), + PartitionKey(path=f"/{partition_key}"), + change_feed_policy=change_feed_policy) + mode = 'AllVersionsAndDeletes' + + ## Test Change Feed with empty collection(Save the continuation token) + query_iterable = created_collection.query_items_change_feed( + mode=mode, + ) + expected_change_feeds = [] + actual_change_feeds = list(query_iterable) + cont_token1 = created_collection.client_connection.last_response_headers[E_TAG] + assert_change_feed(expected_change_feeds, actual_change_feeds) + + ## Test change_feed for created items from cont_token1 (Save the new continuation token) + new_documents = [{partition_key: f'pk{i}', ID: f'doc{i}'} for i in range(4)] + created_items = [] + for document in new_documents: + created_item = created_collection.create_item(body=document) + created_items.append(created_item) + query_iterable = created_collection.query_items_change_feed( + continuation=cont_token1, + mode=mode, + ) + + expected_change_feeds = [{CURRENT: {ID: f'doc{i}'}, METADATA: {OPERATION_TYPE: CREATE}} for i in range(4)] + actual_change_feeds = list(query_iterable) + cont_token2 = created_collection.client_connection.last_response_headers['etag'] + assert_change_feed(expected_change_feeds, actual_change_feeds) + + ## Test change_feed for deleted items + for item in created_items: + created_collection.delete_item(item=item, partition_key=item['pk']) + query_iterable = created_collection.query_items_change_feed( + continuation=cont_token2, + mode=mode, + ) + + expected_change_feeds = [{CURRENT: {}, PREVIOUS: {ID: f'doc{i}'}, METADATA: {OPERATION_TYPE: DELETE}} for i in range(4)] + actual_change_feeds = list(query_iterable) + assert_change_feed(expected_change_feeds, actual_change_feeds) + + ## Test change_feed for created/deleted items + query_iterable = created_collection.query_items_change_feed( + continuation=cont_token1, + mode = mode + ) + + expected_change_feeds = [{CURRENT: {ID: f'doc{i}'}, METADATA: {OPERATION_TYPE: CREATE}} for i in range(4)]\ + + [{CURRENT: {}, PREVIOUS: {ID: f'doc{i}'}, METADATA: {OPERATION_TYPE: DELETE}} for i in range(4)] + actual_change_feeds = list(query_iterable) + assert_change_feed(expected_change_feeds, actual_change_feeds) + + def test_query_change_feed_with_errors(self, setup): + created_collection = setup["created_db"].create_container("change_feed_test_" + str(uuid.uuid4()), + PartitionKey(path="/pk")) + mode = 'AllVersionsAndDeletes' + + # Error if invalid mode was used + with pytest.raises(ValueError) as e: + created_collection.query_items_change_feed( + mode="test_invalid_mode", + ) + assert str(e.value) == "Invalid mode was used: 'test_invalid_mode'. Supported modes are ['LatestVersion', 'AllVersionsAndDeletes']." + + # Error if partition_key_range_id was used with FULL_FIDELITY_FEED + with pytest.raises(ValueError) as e: + created_collection.query_items_change_feed( + partition_key_range_id="TestPartitionKeyRangeId", + mode=mode, + ) + assert str(e.value) == "'AllVersionsAndDeletes' mode is not supported if 'partition_key_range_id' was used. Please use 'feed_range' instead." + + # Error if is_start_from_beginning was in invalid type + with pytest.raises(TypeError) as e: + created_collection.query_items_change_feed( + is_start_from_beginning="Now", + ) + assert str(e.value) == "'is_start_from_beginning' must be 'bool' type, but given 'str'." + + # Error if is_start_from_beginning was used with FULL_FIDELITY_FEED + with pytest.raises(ValueError) as e: + created_collection.query_items_change_feed( + is_start_from_beginning=True, + mode=mode, + ) + assert str(e.value) == "'AllVersionsAndDeletes' mode is only supported if 'is_start_from_beginning' is 'False'. Please use 'is_start_from_beginning=False' or 'continuation' instead." + + # Error if 'is_start_from_beginning' was used with 'start_time' + with pytest.raises(ValueError) as e: + created_collection.query_items_change_feed( + is_start_from_beginning=True, + start_time="Now", + ) + assert str(e.value) == "'is_start_from_beginning' and 'start_time' are exclusive, please only set one of them." + + # Error if 'start_time' was invalid value + invalid_time = "Invalid value" + # TODO: previously it is throwing AttributeError, now has changed into ValueError, is it breaking change? + with pytest.raises(ValueError) as e: + created_collection.query_items_change_feed(start_time=invalid_time) + assert str(e.value) == "'start_time' must be either 'Now' or 'Beginning', but given 'Invalid value'." + + # Error if 'start_time' was invalid type + invalid_time = 1.2 + with pytest.raises(AttributeError) as e: + created_collection.query_items_change_feed(start_time=invalid_time) + assert str(e.value) == "'float' object has no attribute 'lower'" + + # Error if start_time was used with FULL_FIDELITY_FEED + with pytest.raises(ValueError) as e: + created_collection.query_items_change_feed( + start_time=round_time(), + mode=mode, + ) + assert str(e.value) == "'AllVersionsAndDeletes' mode is only supported if 'start_time' is 'Now'. Please use 'start_time=\"Now\"' or 'continuation' instead." + + # Error if too many positional arguments + with pytest.raises(TypeError) as e: + created_collection.query_items_change_feed( + "partition_key_range_id", + False, + "continuation", + 10, + "extra_argument", + ) + assert str(e.value) == "'query_items_change_feed()' takes 4 positional arguments but 5 were given." + + # Error if arguments are in both positional and keyword arguments list + with pytest.raises(TypeError) as e: + created_collection.query_items_change_feed( + "partition_key_range_id", + False, + "continuation", + 10, + continuation="123", + ) + assert str(e.value) == "'query_items_change_feed()' got multiple values for argument 'continuation'." + + # Error if continuation is missing 'mode' + with pytest.raises(ValueError) as e: + continuation_json = { + "containerRid": "", + "startFrom": {'Type': 'Now'}, + "continuation": {'Range': {'isMaxInclusive': False, 'isMinInclusive': True, 'max': 'FF', 'min': ''}, 'continuation': [{'range': {'isMaxInclusive': False, 'isMinInclusive': True, 'max': '1FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF', 'min': ''}, 'token': '"1"'}, {'range': {'isMaxInclusive': False, 'isMinInclusive': True, 'max': 'FF', 'min': '1FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF'}, 'token': '"1"'}], 'rid': 'ksMfAMrReEg=', 'v': 'v2'}, + } + ChangeFeedStateV2.from_continuation( + container_link="", + container_rid="", + continuation_json=continuation_json, + ) + assert str(e.value) == "Invalid continuation: [Missing mode]" + if __name__ == "__main__": unittest.main() diff --git a/sdk/cosmos/azure-cosmos/test/test_change_feed_async.py b/sdk/cosmos/azure-cosmos/test/test_change_feed_async.py index 3e138f64e230..f690ca36aa1e 100644 --- a/sdk/cosmos/azure-cosmos/test/test_change_feed_async.py +++ b/sdk/cosmos/azure-cosmos/test/test_change_feed_async.py @@ -15,6 +15,14 @@ from azure.cosmos.aio import CosmosClient from azure.cosmos.partition_key import PartitionKey +ID = 'id' +CURRENT = 'current' +PREVIOUS = 'previous' +METADATA = 'metadata' +OPERATION_TYPE = 'operationType' +CREATE = 'create' +DELETE = 'delete' +E_TAG = 'etag' @pytest_asyncio.fixture() async def setup(): @@ -27,12 +35,39 @@ async def setup(): test_client = CosmosClient(config.host, config.masterKey) created_db = await test_client.create_database_if_not_exists(config.TEST_DATABASE_ID) created_db_data = { - "created_db": created_db + "created_db": created_db, + "is_emulator": config.is_emulator } yield created_db_data await test_client.close() +def round_time(): + utc_now = datetime.now(timezone.utc) + return utc_now - timedelta(microseconds=utc_now.microsecond) + +async def assert_change_feed(expected, actual): + if len(actual) == 0: + assert len(expected) == len(actual) + return + + #TODO: remove this if we can add flag to get 'previous' always + for item in actual: + if METADATA in item and item[METADATA][OPERATION_TYPE] == DELETE: + if ID in item[METADATA]: + item[PREVIOUS] = {ID: item[METADATA][ID]} + + # Sort actual by operation_type and id + actual = sorted(actual, key=lambda k: (k[METADATA][OPERATION_TYPE], k[CURRENT][ID]) if k[METADATA][OPERATION_TYPE] == CREATE else (k[METADATA][OPERATION_TYPE], k[PREVIOUS][ID])) + + for expected_change_feed, actual_change_feed in zip(expected, actual): + for expected_type, expected_data in expected_change_feed.items(): + assert expected_type in actual_change_feed + actual_data = actual_change_feed[expected_type] + for key, value in expected_data.items(): + assert key in actual_data + assert expected_data[key] == actual_data[key] + @pytest.mark.cosmosEmulator @pytest.mark.asyncio @pytest.mark.usefixtures("setup") @@ -188,10 +223,6 @@ async def test_query_change_feed_with_start_time(self, setup): PartitionKey(path="/pk")) batchSize = 50 - def round_time(): - utc_now = datetime.now(timezone.utc) - return utc_now - timedelta(microseconds=utc_now.microsecond) - async def create_random_items(container, batch_size): for _ in range(batch_size): # Generate a Random partition key @@ -242,14 +273,6 @@ async def create_random_items(container, batch_size): # Should equal batch size assert totalCount == batchSize - # test an invalid value, Attribute error will be raised for passing non datetime object - invalid_time = "Invalid value" - try: - change_feed_iter = [i async for i in created_collection.query_items_change_feed(start_time=invalid_time)] - fail("Cannot format date on a non datetime object.") - except ValueError as e: - assert ("Invalid start_time 'Invalid value'" == e.args[0]) - await setup["created_db"].delete_container(created_collection.id) async def test_query_change_feed_with_multi_partition_async(self, setup): @@ -276,5 +299,129 @@ async def test_query_change_feed_with_multi_partition_async(self, setup): assert actual_ids == expected_ids + async def test_query_change_feed_with_all_versions_and_deletes(self, setup): + partition_key = 'pk' + # 'retentionDuration' was required to enable `ALL_VERSIONS_AND_DELETES` for Emulator testing + change_feed_policy = {"retentionDuration": 10} if setup["is_emulator"] else None + created_collection = await setup["created_db"].create_container("change_feed_test_" + str(uuid.uuid4()), + PartitionKey(path=f"/{partition_key}"), + change_feed_policy=change_feed_policy) + + mode = 'AllVersionsAndDeletes' + + ## Test Change Feed with empty collection(Save the continuation token) + query_iterable = created_collection.query_items_change_feed( + mode=mode, + ) + expected_change_feeds = [] + actual_change_feeds = [item async for item in query_iterable] + cont_token1 = created_collection.client_connection.last_response_headers[E_TAG] + await assert_change_feed(expected_change_feeds, actual_change_feeds) + + ## Test change_feed for created items from cont_token1 (Save the new continuation token) + new_documents = [{partition_key: f'pk{i}', ID: f'doc{i}'} for i in range(4)] + created_items = [] + for document in new_documents: + created_item = await created_collection.create_item(body=document) + created_items.append(created_item) + query_iterable = created_collection.query_items_change_feed( + continuation=cont_token1, + mode=mode, + ) + + expected_change_feeds = [{CURRENT: {ID: f'doc{i}'}, METADATA: {OPERATION_TYPE: CREATE}} for i in range(4)] + actual_change_feeds = [item async for item in query_iterable] + cont_token2 = created_collection.client_connection.last_response_headers['etag'] + await assert_change_feed(expected_change_feeds, actual_change_feeds) + + ## Test change_feed for deleted items + for item in created_items: + await created_collection.delete_item(item=item, partition_key=item['pk']) + query_iterable = created_collection.query_items_change_feed( + continuation=cont_token2, + mode=mode, + ) + + expected_change_feeds = [{CURRENT: {}, PREVIOUS: {ID: f'doc{i}'}, METADATA: {OPERATION_TYPE: DELETE}} for i in + range(4)] + actual_change_feeds = [item async for item in query_iterable] + await assert_change_feed(expected_change_feeds, actual_change_feeds) + + ## Test change_feed for created/deleted items + query_iterable = created_collection.query_items_change_feed( + continuation=cont_token1, + mode=mode + ) + + expected_change_feeds = [{CURRENT: {ID: f'doc{i}'}, METADATA: {OPERATION_TYPE: CREATE}} for i in range(4)] \ + + [{CURRENT: {}, PREVIOUS: {ID: f'doc{i}'}, METADATA: {OPERATION_TYPE: DELETE}} for i in + range(4)] + actual_change_feeds = [item async for item in query_iterable] + await assert_change_feed(expected_change_feeds, actual_change_feeds) + + async def test_query_change_feed_with_errors(self, setup): + created_collection = await setup["created_db"].create_container("change_feed_test_" + str(uuid.uuid4()), + PartitionKey(path="/pk")) + mode = 'AllVersionsAndDeletes' + + # Error if invalid mode was used + with pytest.raises(ValueError) as e: + created_collection.query_items_change_feed( + mode="test_invalid_mode", + ) + assert str(e.value) == "Invalid mode was used: 'test_invalid_mode'. Supported modes are ['LatestVersion', 'AllVersionsAndDeletes']." + + # Error if partition_key_range_id was used with FULL_FIDELITY_FEED + with pytest.raises(ValueError) as e: + created_collection.query_items_change_feed( + partition_key_range_id="TestPartitionKeyRangeId", + mode=mode, + ) + assert str(e.value) == "'AllVersionsAndDeletes' mode is not supported if 'partition_key_range_id' was used. Please use 'feed_range' instead." + + # Error if is_start_from_beginning was in invalid type + with pytest.raises(TypeError) as e: + created_collection.query_items_change_feed( + is_start_from_beginning="Now", + ) + assert str(e.value) == "'is_start_from_beginning' must be 'bool' type, but given 'str'." + + # Error if is_start_from_beginning was used with FULL_FIDELITY_FEED + with pytest.raises(ValueError) as e: + created_collection.query_items_change_feed( + is_start_from_beginning="Now", + mode=mode, + ) + assert str(e.value) == "'AllVersionsAndDeletes' mode is only supported if 'is_start_from_beginning' is 'False'. Please use 'is_start_from_beginning=False' or 'continuation' instead." + + # Error if 'is_start_from_beginning' was used with 'start_time' + with pytest.raises(ValueError) as e: + created_collection.query_items_change_feed( + is_start_from_beginning=True, + start_time="Now", + ) + assert str(e.value) == "'is_start_from_beginning' and 'start_time' are exclusive, please only set one of them." + + # Error if 'start_time' was invalid value + invalid_time = "Invalid value" + # TODO: previously it is throwing AttributeError, now has changed into ValueError, is it breaking change? + with pytest.raises(ValueError) as e: + created_collection.query_items_change_feed(start_time=invalid_time) + assert str(e.value) == "'start_time' must be either 'Now' or 'Beginning', but given 'Invalid value'." + + # Error if 'start_time' was invalid type + invalid_time = 1.2 + with pytest.raises(AttributeError) as e: + created_collection.query_items_change_feed(start_time=invalid_time) + assert str(e.value) == "'float' object has no attribute 'lower'" + + # Error if start_time was used with FULL_FIDELITY_FEED + with pytest.raises(ValueError) as e: + created_collection.query_items_change_feed( + start_time=round_time(), + mode=mode, + ) + assert str(e.value) == "'AllVersionsAndDeletes' mode is only supported if 'start_time' is 'Now'. Please use 'start_time=\"Now\"' or 'continuation' instead." + if __name__ == '__main__': unittest.main()