Skip to content
This repository has been archived by the owner on Jan 2, 2024. It is now read-only.

Commit

Permalink
Merge pull request #805 from Avaiga/fix/core#800-raise-error-if-run-c…
Browse files Browse the repository at this point in the history
…ore-service-twice

Fix/#800 - Run core service twice should raise an error
  • Loading branch information
trgiangdo authored Oct 30, 2023
2 parents 2c4a671 + 1903406 commit 9835dbd
Show file tree
Hide file tree
Showing 13 changed files with 769 additions and 518 deletions.
16 changes: 16 additions & 0 deletions src/taipy/core/_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.

from multiprocessing import Lock
from typing import Optional

from taipy.config import Config
Expand All @@ -21,13 +22,17 @@
from ._orchestrator._orchestrator_factory import _OrchestratorFactory
from ._version._version_manager_factory import _VersionManagerFactory
from .config import CoreSection
from .exceptions.exceptions import CoreServiceIsAlreadyRunning


class Core:
"""
Core service
"""

__is_running = False
__lock_is_running = Lock()

__logger = _TaipyLogger._get_logger()

_orchestrator: Optional[_Orchestrator] = None
Expand All @@ -46,10 +51,18 @@ def run(self, force_restart=False):
This function checks the configuration, manages application's version,
and starts a dispatcher and lock the Config.
"""
if self.__class__.__is_running:
raise CoreServiceIsAlreadyRunning

with self.__class__.__lock_is_running:
self.__class__.__is_running = True

self.__update_and_check_config()
self.__manage_version()

if self._orchestrator is None:
self._orchestrator = _OrchestratorFactory._build_orchestrator()

self.__start_dispatcher(force_restart)

def stop(self):
Expand All @@ -64,6 +77,9 @@ def stop(self):
self._dispatcher = _OrchestratorFactory._remove_dispatcher()
self.__logger.info("Core service has been stopped.")

with self.__class__.__lock_is_running:
self.__class__.__is_running = False

@staticmethod
def __update_and_check_config():
_CoreCLI.create_parser()
Expand Down
4 changes: 4 additions & 0 deletions src/taipy/core/exceptions/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ def __init__(self, config_core_version: str, core_version: str) -> None:
)


class CoreServiceIsAlreadyRunning(Exception):
    """Raised if the Core service is already running.

    ``Core.run()`` raises this (from any ``Core`` instance) while a Core
    service has been started and not yet stopped.
    """

    def __init__(self, message: str = "The Core service is already running.") -> None:
        # Default message so a bare ``raise CoreServiceIsAlreadyRunning``
        # (as done in Core.run) still produces an informative error string.
        super().__init__(message)


class CycleAlreadyExists(Exception):
    """Raised when trying to create a Cycle that already exists."""

Expand Down
4 changes: 3 additions & 1 deletion tests/core/_backup/test_backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,11 @@ def init_backup_file():


def test_backup_storage_folder_when_core_run():
    """The storage folder must be listed in the backup file while Core runs."""
    core = Core()
    core.run()
    try:
        backup_files = read_backup_file(backup_file_path)
        assert backup_files == [f"{Config.core.storage_folder}\n"]
    finally:
        # Always stop the service so a failed assertion cannot leak a
        # running Core into the following tests.
        core.stop()


def test_no_new_entry_when_file_is_in_storage_folder():
Expand Down
18 changes: 4 additions & 14 deletions tests/core/_orchestrator/_dispatcher/test_job_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@

import multiprocessing
from concurrent.futures import ProcessPoolExecutor
from datetime import datetime
from functools import partial
from time import sleep
from unittest import mock
from unittest.mock import MagicMock

Expand All @@ -28,6 +26,7 @@
from src.taipy.core.job.job import Job
from src.taipy.core.task.task import Task
from taipy.config.config import Config
from tests.core.utils import assert_true_after_time


def execute(lock):
Expand Down Expand Up @@ -66,10 +65,10 @@ def test_build_standalone_job_dispatcher():
assert isinstance(dispatcher, _StandaloneJobDispatcher)
assert isinstance(dispatcher._executor, ProcessPoolExecutor)
assert dispatcher._nb_available_workers == 2
assert_true_after_120_second_max(dispatcher.is_running)
assert_true_after_time(dispatcher.is_running)
dispatcher.stop()
dispatcher.join()
assert_true_after_120_second_max(lambda: not dispatcher.is_running())
assert_true_after_time(lambda: not dispatcher.is_running())


def test_can_execute_2_workers():
Expand Down Expand Up @@ -103,7 +102,7 @@ def test_can_execute_2_workers():
dispatcher._dispatch(job)
assert not dispatcher._can_execute()

assert_true_after_120_second_max(lambda: dispatcher._can_execute())
assert_true_after_time(lambda: dispatcher._can_execute())


def test_can_execute_synchronous():
Expand Down Expand Up @@ -158,12 +157,3 @@ def test_exception_in_writing_data():
dispatcher._dispatch(job)
assert job.is_failed()
assert "node" in job.stacktrace[0]


def assert_true_after_120_second_max(assertion):
    """Poll *assertion* until it returns a truthy value, failing after ~120 seconds.

    :param assertion: zero-argument callable returning a truthy value on success.
    :raises AssertionError: if *assertion* is still falsy when the timeout expires.
    """
    start = datetime.now()
    while (datetime.now() - start).total_seconds() < 120:
        # Check first so an already-satisfied condition returns without sleeping.
        if assertion():
            return
        sleep(0.1)  # Limit CPU usage between polls.
    assert assertion()
41 changes: 30 additions & 11 deletions tests/core/config/checkers/test_migration_config_checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,19 @@ def mock_func():

def test_check_if_entity_property_key_used_is_predefined(caplog):
with patch("sys.argv", ["prog", "--production", "1.0"]):
Core().run()

core = Core()
core.run()
assert caplog.text == ""
core.stop()

caplog.clear()

Config.unique_sections[MigrationConfig.name]._properties["_entity_owner"] = None
with patch("sys.argv", ["prog", "--production", "1.0"]):
with pytest.raises(SystemExit):
Core().run()
core = Core()
core.run()
core.stop()
assert (
"Properties of MigrationConfig `VERSION_MIGRATION` cannot have `_entity_owner` as its property." in caplog.text
)
Expand All @@ -44,7 +47,9 @@ def test_check_if_entity_property_key_used_is_predefined(caplog):
Config.unique_sections[MigrationConfig.name]._properties["_entity_owner"] = "entity_owner"
with patch("sys.argv", ["prog", "--production", "1.0"]):
with pytest.raises(SystemExit):
Core().run()
core = Core()
core.run()
core.stop()
expected_error_message = (
"Properties of MigrationConfig `VERSION_MIGRATION` cannot have `_entity_owner` as its property."
' Current value of property `_entity_owner` is "entity_owner".'
Expand All @@ -58,23 +63,29 @@ def test_check_valid_version(caplog):
Config.add_migration_function("2.0", data_nodes1, mock_func)
with patch("sys.argv", ["prog", "--production", "1.0"]):
with pytest.raises(SystemExit):
Core().run()
core = Core()
core.run()
core.stop()
assert "The target version for a migration function must be a production version." in caplog.text

caplog.clear()
Config.unblock_update()

with patch("sys.argv", ["prog", "--production", "2.0"]):
Core().run()
core = Core()
core.run()
assert caplog.text == ""
core.stop()


def test_check_callable_function(caplog):
data_nodes1 = Config.configure_data_node("data_nodes1", "pickle")
Config.add_migration_function("1.0", data_nodes1, 1)
with patch("sys.argv", ["prog", "--production", "1.0"]):
with pytest.raises(SystemExit):
Core().run()
core = Core()
core.run()
core.stop()
expected_error_message = (
"The migration function of config `data_nodes1` from version 1.0 must be populated with"
" Callable value. Current value of property `migration_fcts` is 1."
Expand All @@ -87,7 +98,9 @@ def test_check_callable_function(caplog):
Config.add_migration_function("1.0", data_nodes1, "bar")
with patch("sys.argv", ["prog", "--production", "1.0"]):
with pytest.raises(SystemExit):
Core().run()
core = Core()
core.run()
core.stop()
expected_error_message = (
"The migration function of config `data_nodes1` from version 1.0 must be populated with"
' Callable value. Current value of property `migration_fcts` is "bar".'
Expand All @@ -99,7 +112,9 @@ def test_check_callable_function(caplog):

Config.add_migration_function("1.0", data_nodes1, mock_func)
with patch("sys.argv", ["prog", "--production", "1.0"]):
Core().run()
core = Core()
core.run()
core.stop()


def test_check_migration_from_productions_to_productions_exist(caplog):
Expand All @@ -108,7 +123,9 @@ def test_check_migration_from_productions_to_productions_exist(caplog):
_VersionManager._set_production_version("1.2", True)

with patch("sys.argv", ["prog", "--production", "1.0"]):
Core().run()
core = Core()
core.run()
core.stop()
assert 'There is no migration function from production version "1.0" to version "1.1".' in caplog.text
assert 'There is no migration function from production version "1.1" to version "1.2".' in caplog.text

Expand All @@ -117,5 +134,7 @@ def test_check_migration_from_productions_to_productions_exist(caplog):

Config.add_migration_function("1.2", "data_nodes1", mock_func)
with patch("sys.argv", ["prog", "--production", "1.0"]):
Core().run()
core = Core()
core.run()
core.stop()
assert 'There is no migration function from production version "1.0" to version "1.1".' in caplog.text
30 changes: 27 additions & 3 deletions tests/core/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from src.taipy.core._orchestrator._orchestrator import _Orchestrator
from src.taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory
from src.taipy.core.config.job_config import JobConfig
from src.taipy.core.exceptions.exceptions import CoreServiceIsAlreadyRunning
from taipy.config import Config
from taipy.config.exceptions.exceptions import ConfigurationUpdateBlocked

Expand All @@ -24,13 +25,15 @@ class TestCore:
def test_run_core_trigger_config_check(self, caplog):
    """Running Core with an invalid data node config must exit and log the config error."""
    Config.configure_data_node(id="d0", storage_type="toto")
    # Instantiate outside the raises block so `core` is always bound,
    # even if Core() itself were to raise.
    core = Core()
    try:
        with pytest.raises(SystemExit):
            core.run()
        expected_error_message = (
            "`storage_type` field of DataNodeConfig `d0` must be either csv, sql_table,"
            " sql, mongo_collection, pickle, excel, generic, json, parquet, or in_memory."
            ' Current value of property `storage_type` is "toto".'
        )
        assert expected_error_message in caplog.text
    finally:
        # Reset the running state even when an assertion above fails, so
        # later tests can start their own Core service.
        core.stop()

def test_run_core_as_a_service_development_mode(self):
_OrchestratorFactory._dispatcher = None
Expand All @@ -40,7 +43,6 @@ def test_run_core_as_a_service_development_mode(self):
assert core._dispatcher is None
assert _OrchestratorFactory._dispatcher is None

Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
core.run()
assert core._orchestrator is not None
assert core._orchestrator == _Orchestrator
Expand All @@ -49,6 +51,7 @@ def test_run_core_as_a_service_development_mode(self):
assert core._dispatcher is not None
assert isinstance(core._dispatcher, _DevelopmentJobDispatcher)
assert isinstance(_OrchestratorFactory._dispatcher, _DevelopmentJobDispatcher)
core.stop()

def test_run_core_as_a_service_standalone_mode(self):
_OrchestratorFactory._dispatcher = None
Expand All @@ -70,15 +73,35 @@ def test_run_core_as_a_service_standalone_mode(self):
assert isinstance(_OrchestratorFactory._dispatcher, _StandaloneJobDispatcher)
assert core._dispatcher.is_running()
assert _OrchestratorFactory._dispatcher.is_running()
core.stop()

def test_core_service_can_only_be_run_once(self):
    """Starting a second Core service while one is running must raise."""
    first_core = Core()
    second_core = Core()

    first_core.run()

    # While a service is running, run() must fail on every instance,
    # including the instance that started the service.
    with pytest.raises(CoreServiceIsAlreadyRunning):
        first_core.run()
    with pytest.raises(CoreServiceIsAlreadyRunning):
        second_core.run()

    # Once stopped, the service can be started again from any instance.
    first_core.stop()

    first_core.run()
    first_core.stop()
    second_core.run()
    second_core.stop()

def test_block_config_update_when_core_service_is_running_development_mode(self):
    """Configuration updates must be blocked while a development-mode Core service runs."""
    _OrchestratorFactory._dispatcher = None

    service = Core()
    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
    service.run()
    # Any configuration change attempted after run() must be rejected.
    with pytest.raises(ConfigurationUpdateBlocked):
        Config.configure_data_node(id="i1")
    service.stop()

def test_block_config_update_when_core_service_is_running_standalone_mode(self):
_OrchestratorFactory._dispatcher = None
Expand All @@ -88,3 +111,4 @@ def test_block_config_update_when_core_service_is_running_standalone_mode(self):
core.run()
with pytest.raises(ConfigurationUpdateBlocked):
Config.configure_data_node(id="i1")
core.stop()
Loading

0 comments on commit 9835dbd

Please sign in to comment.