-
Notifications
You must be signed in to change notification settings - Fork 30
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(anta): Limit concurrency #680
base: main
Are you sure you want to change the base?
feat(anta): Limit concurrency #680
Conversation
bf88158
to
d7de654
Compare
Quality Gate passedIssues Measures |
This pull request has conflicts, please resolve those before we can evaluate the pull request. |
b623e90
to
4879120
Compare
Conflicts have been resolved. A maintainer will review the pull request shortly. |
Quality Gate passedIssues Measures |
This pull request has conflicts, please resolve those before we can evaluate the pull request. |
68e259e
to
83206e8
Compare
Conflicts have been resolved. A maintainer will review the pull request shortly. |
CodSpeed Performance ReportMerging #680 will not alter performanceComparing Summary
Benchmarks breakdown
|
This pull request has conflicts, please resolve those before we can evaluate the pull request. |
Conflicts have been resolved. A maintainer will review the pull request shortly. |
|
||
Parameters | ||
---------- | ||
default_timeout : float | None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Default values are defined twice here.
Once in this file, the other definition is in the timeout CLI option.
I would suggest to keep the default values in this file, remove the default value in Click and point to the documentation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed with removing the default value in the CLI.
@@ -163,6 +190,7 @@ def core_options(f: Callable[..., Any]) -> Callable[..., Any]: | |||
show_envvar=True, | |||
envvar="ANTA_TIMEOUT", | |||
show_default=True, | |||
type=FLOAT_OR_NONE, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would suggest to remove the default value here since it is defined in the anta.settings module already.
Then you can also get rid of the FLOAT_OR_NONE type :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above, agreed with removing the default value here but we do need to keep FLOAT_OR_NONE to support None (no timeout) at the CLI. We do not support None today:
(.venv) ~/git_projects/anta (main ✔) anta nrfu --timeout None table
Usage: anta nrfu [OPTIONS] COMMAND [ARGS]...
Try 'anta nrfu --help' for help.
Error: Invalid value for '--timeout': 'None' is not a valid float.
- Developing ANTA tests: advanced_usages/custom-tests.md | ||
- ANTA as a Python Library: advanced_usages/as-python-lib.md | ||
- Caching in ANTA: advanced_usages/caching.md | ||
- Scaling ANTA: advanced_usages/scaling.md |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add a Settings section that render the BaseSettings models in anta.settings?
|
||
On POSIX systems, this value is used to set the soft limit for the current process. | ||
The value cannot exceed the system's hard limit. | ||
""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing ANTA_NOFILE attribute in the doctsring.
|
||
|
||
class MaxConcurrencySettings(BaseSettings): | ||
"""Environment variable for configuring the maximum number of concurrent tests in the event loop.""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing ANTA_MAX_CONCURRENCY attribute in the docstring
class HttpxResourceLimitsSettings(BaseSettings): | ||
"""Environment variables for configuring the underlying HTTPX client resource limits. | ||
|
||
The limits are set using the following environment variables: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use attribute section to have a table in mkdocs?
if self.max_concurrency <= 0: | ||
msg = "Concurrency limit must be greater than 0." | ||
raise RuntimeError(msg) | ||
|
||
# NOTE: The `aiter` built-in function is not available in Python 3.9 | ||
tests = generator.__aiter__() # pylint: disable=unnecessary-dunder-call | ||
tests_ended = False | ||
tests_pending: set[Task[TestResult]] = set() | ||
|
||
while tests_pending or not tests_ended: | ||
# Add tests to the pending set until the limit is reached or no more tests are available | ||
while len(tests_pending) < self.max_concurrency and not tests_ended: | ||
try: | ||
# NOTE: The `anext` built-in function is not available in Python 3.9 | ||
test = await tests.__anext__() # pylint: disable=unnecessary-dunder-call | ||
except StopAsyncIteration: # noqa: PERF203 | ||
tests_ended = True | ||
logger.debug("All tests have been added to the pending set.") | ||
else: | ||
# Ensure the coroutine is scheduled to run and add it to the pending set | ||
tests_pending.add(asyncio.create_task(test)) | ||
logger.debug("Added a test to the pending set: %s", test) | ||
|
||
if len(tests_pending) >= self.max_concurrency: | ||
logger.debug("Concurrency limit reached: %s tests running. Waiting for tests to complete.", self.max_concurrency) | ||
|
||
if not tests_pending: | ||
logger.debug("No pending tests and all tests have been processed. Exiting.") | ||
return |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if self.max_concurrency <= 0: | |
msg = "Concurrency limit must be greater than 0." | |
raise RuntimeError(msg) | |
# NOTE: The `aiter` built-in function is not available in Python 3.9 | |
tests = generator.__aiter__() # pylint: disable=unnecessary-dunder-call | |
tests_ended = False | |
tests_pending: set[Task[TestResult]] = set() | |
while tests_pending or not tests_ended: | |
# Add tests to the pending set until the limit is reached or no more tests are available | |
while len(tests_pending) < self.max_concurrency and not tests_ended: | |
try: | |
# NOTE: The `anext` built-in function is not available in Python 3.9 | |
test = await tests.__anext__() # pylint: disable=unnecessary-dunder-call | |
except StopAsyncIteration: # noqa: PERF203 | |
tests_ended = True | |
logger.debug("All tests have been added to the pending set.") | |
else: | |
# Ensure the coroutine is scheduled to run and add it to the pending set | |
tests_pending.add(asyncio.create_task(test)) | |
logger.debug("Added a test to the pending set: %s", test) | |
if len(tests_pending) >= self.max_concurrency: | |
logger.debug("Concurrency limit reached: %s tests running. Waiting for tests to complete.", self.max_concurrency) | |
if not tests_pending: | |
logger.debug("No pending tests and all tests have been processed. Exiting.") | |
return | |
if self.max_concurrency <= 0: | |
msg = "Concurrency limit must be greater than 0." | |
raise RuntimeError(msg) | |
tests_ended = False | |
tests_pending: set[Task[TestResult]] = set() | |
while tests_pending or not tests_ended: | |
async for test in generator: | |
# Add tests to the pending set until the limit is reached or no more tests are available | |
if len(tests_pending) < self.max_concurrency: | |
# Ensure the coroutine is scheduled to run and add it to the pending set | |
tests_pending.add(asyncio.create_task(test)) | |
logger.debug("Added a test to the pending set: %s", test) | |
else: | |
logger.debug("Concurrency limit reached: %s tests running. Waiting for tests to complete.", self.max_concurrency) | |
break | |
if len(tests_pending) < self.max_concurrency and not tests_ended: | |
tests_ended = True | |
logger.debug("All tests have been added to the pending set.") | |
if not tests_pending: | |
logger.debug("No pending tests and all tests have been processed. Exiting.") | |
return |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks equivalent right?
session_settings["limits"] = httpx_limits | ||
self.max_connections = httpx_limits.max_connections | ||
else: | ||
self.max_connections = DEFAULT_HTTPX_MAX_CONNECTIONS |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
More robust logic to instantiate the asynceapi.Device
object then retrieve self._session.limits.max_connections
.
Because of https://github.com/encode/httpx/blob/10b7295922741b91a15751029e6ad3e8e5efb9f3/httpx/_client.py#L655, httpx default values will be used.
Use a read property?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe you are suggesting to not store max_connections
as a separate attribute, but rather create a read-only property that retrieves it directly from HTTPX? Limits
is injected down the line to create the httpcore connection pool. We could potentially retrieve it by doing self._session._transport._pool._max_connections
but since these are all privates it's too risky IMO.
timeout | ||
Timeout value in seconds for outgoing API calls. | ||
insecure | ||
Disable SSH Host Key validation. | ||
Global timeout value in seconds for outgoing eAPI calls. | ||
httpx_timeout | ||
Optional HTTPX Timeout object for fine-grained timeout configuration. | ||
If provided, it will override the global timeout. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
timeout defined twice.
Use timeout: httpx.Timeout
object instead and remove timeout: float
.
Use a read property to get the Timeout
instance of this AyncEOSDevice
instance.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah yes I didn't know HTTPX supported different types. Type would be timeout: Timeout | float | None
to avoid breaking change. Should we also support the tuple type (below) like HTTPX?
TimeoutTypes = Union[
Optional[float],
Tuple[Optional[float], Optional[float], Optional[float], Optional[float]],
"Timeout",
]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should also replace httpx_limits
with just limits
to be consistent with timeout
then.
host=host, port=ssh_port, username=username, password=password, client_keys=CLIENT_KEYS, **ssh_params | ||
) | ||
ssh_settings["known_hosts"] = None | ||
self._ssh_opts: SSHClientConnectionOptions = SSHClientConnectionOptions(**ssh_settings) | ||
|
||
def __rich_repr__(self) -> Iterator[tuple[str, Any]]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
implement timeout
and limits
?
# Default values for HTTPX resource limits | ||
DEFAULT_HTTPX_MAX_CONNECTIONS = 100 | ||
DEFAULT_HTTPX_MAX_KEEPALIVE_CONNECTIONS = 20 | ||
DEFAULT_HTTPX_KEEPALIVE_EXPIRY = 5.0 | ||
|
||
# Default values for HTTPX timeouts | ||
DEFAULT_HTTPX_CONNECT_TIMEOUT = 5.0 | ||
DEFAULT_HTTPX_READ_TIMEOUT = 5.0 | ||
DEFAULT_HTTPX_WRITE_TIMEOUT = 5.0 | ||
DEFAULT_HTTPX_POOL_TIMEOUT = 5.0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use https://github.com/encode/httpx/blob/10b7295922741b91a15751029e6ad3e8e5efb9f3/httpx/_config.py#L246 instead of copying the default value?
Not sure if the value will render in mkdocs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if it changes upstream it may make us sad one day
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could put our own default values here instead with the current 30s default we have. From our testing we have a pretty good idea what works best:
# Default values for HTTPX resource limits
DEFAULT_HTTPX_MAX_CONNECTIONS = 10
DEFAULT_HTTPX_MAX_KEEPALIVE_CONNECTIONS = 10
DEFAULT_HTTPX_KEEPALIVE_EXPIRY = 5.0
# Default values for HTTPX timeouts
DEFAULT_HTTPX_CONNECT_TIMEOUT = 30.0
DEFAULT_HTTPX_READ_TIMEOUT = 30.0
DEFAULT_HTTPX_WRITE_TIMEOUT = 30.0
DEFAULT_HTTPX_POOL_TIMEOUT = None
@@ -37,6 +37,7 @@ def test_anta_dry_run( | |||
|
|||
results = session_results[request.node.callspec.id] | |||
|
|||
# TODO: Use AntaRunner in ANTA v2.0.0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this needed?
I think the anta.runner.main
function uses the AntaRunner
class already.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can consider removing main()
altogether and use AntaRunner
everywhere in v2.0.0.
@@ -69,6 +70,7 @@ def test_anta( | |||
|
|||
results = session_results[request.node.callspec.id] | |||
|
|||
# TODO: Use AntaRunner in ANTA v2.0.0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I put this TODO everywhere we are using main()
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Default value code could be simplified/cleaned.
Good idea to have this anta.settings
module, we should use it for ANTA_DEBUG
, in another PR ;)
Quality Gate passedIssues Measures |
Description
This PR improves the test runner by introducing a generator-based approach for managing test coroutines and setting a configurable limit on the number of concurrent tests.
Instead of loading all test coroutines into a list, the runner now uses a generator to yield tests. This approach prevents memory overload and improves performance when dealing with a large number of tests.
A limit on the number of concurrent tests is introduced to avoid overwhelming the runner. This limit is configurable with an environement variable (hidden).
Implementation:
The generator yields test coroutines, ensuring that only a limited number of tests are scheduled and run concurrently.
Upon reaching the concurrency limit, the runner waits for some tests to complete before scheduling new ones from the generator.
Fixes: #713, #832
Checklist: