diff --git a/pleskdistup/actions/distupgrade.py b/pleskdistup/actions/distupgrade.py index b7b01bc..7ef8ea7 100644 --- a/pleskdistup/actions/distupgrade.py +++ b/pleskdistup/actions/distupgrade.py @@ -1,5 +1,7 @@ # Copyright 2023-2024. WebPros International GmbH. All rights reserved. import os +import re +import uuid import subprocess import typing import urllib.request @@ -145,6 +147,118 @@ def estimate_revert_time(self) -> int: return 20 +class SetupAptReposRegexp(action.ActiveAction): + from_regexp: str + to_regexp: str + sources_list_path: str + sources_list_d_path: str + backup_suffix: str + _name: str + + def __init__( + self, + from_regexp: str, + to_regexp: str, + sources_list_path: str = "/etc/apt/sources.list", + sources_list_d_path: str = "/etc/apt/sources.list.d/", + name: str = "set up APT repositories to change from {self.from_regexp!r} to {self.to_regexp!r}", + ): + self.from_regexp = from_regexp + self.to_regexp = to_regexp + self.sources_list_path = sources_list_path + self.sources_list_d_path = sources_list_d_path + + self._name = name + self.backup_suffix = "_" + str(uuid.uuid4()) + + @property + def name(self): + return self._name.format(self=self) + + def _apply_replace_on_file(self, fpath: str, ptrn: re.Pattern, to_regexp: str) -> None: + old_lines = [] + new_lines = [] + with open(fpath) as f: + for line in f: + old_lines.append(line) + new_lines.append(ptrn.sub(to_regexp, line)) + if not [idx for idx, (e1, e2) in enumerate(zip(old_lines, new_lines)) if e1 != e2]: + return + with open(fpath + self.backup_suffix, 'w') as f: + f.truncate(0) + f.writelines(old_lines) + with open(fpath, 'w') as f: + f.truncate(0) + f.writelines(new_lines) + + def _revert_file(self, fpath: str): + backup_path = fpath + self.backup_suffix + if not os.path.exists(backup_path): + return + with open(backup_path) as b: + with open(fpath, 'w') as f: + f.truncate(0) + f.writelines(b.readlines()) + os.remove(backup_path) + + def _rm_backups(self): + backup_path = self.sources_list_path + self.backup_suffix + if os.file.exists(backup_path): + try: + os.remove(backup_path) + except Exception as ex: + log.info(f"Skip failed remove backup ({backup_path}): {ex}") + for root, _, filenames in os.walk(self.sources_list_d_path): + for f in filenames: + if not f.endswith(self.backup_suffix): + continue + backup_path = os.path.join(root, f) + try: + os.remove(backup_path) + except Exception as ex: + log.info(f"Skip failed remove backup ({backup_path}): {ex}") + + def _change_by_regexp(self, from_regexp: str, to_regexp: str) -> None: + p = re.compile(from_regexp) + self._apply_replace_on_file(self.sources_list_path, p, to_regexp) + + for root, _, filenames in os.walk(self.sources_list_d_path): + for f in filenames: + if f.endswith(".list"): + self._apply_replace_on_file(os.path.join(root, f), p, to_regexp) + + def _revert_all(self): + self._revert_file(self.sources_list_path) + for root, _, filenames in os.walk(self.sources_list_d_path): + for f in filenames: + if f.endswith(".list"): + self._revert_file(os.path.join(root, f)) + + def _prepare_action(self) -> action.ActionResult: + self._change_by_regexp(self.from_regexp, self.to_regexp) + packages.update_package_list() + return action.ActionResult() + + def _post_action(self) -> action.ActionResult: + self._rm_backups() + return action.ActionResult() + + def _revert_action(self) -> action.ActionResult: + self._revert_all() + packages.update_package_list() + return action.ActionResult() + + def estimate_prepare_time(self) -> int: + return 22 + + def estimate_revert_time(self) -> int: + return 22 + + +SetupDebianReposRegexp = SetupAptReposRegexp +SetupUbuntuReposRegexp = SetupAptReposRegexp + + class SetupAptRepositories(action.ActiveAction): from_codename: str to_codename: str