From 7f929ad1036a2e63879af4f8165e936dc4761196 Mon Sep 17 00:00:00 2001 From: KOLANICH Date: Fri, 7 Jan 2022 04:32:25 +0300 Subject: [PATCH] Initial commit --- .ci/aptPackagesToInstall.txt | 1 + .ci/pythonPackagesToInstallFromGit.txt | 0 .editorconfig | 12 + .github/.templateMarker | 1 + .github/dependabot.yml | 8 + .github/workflows/CI.yml | 15 + .gitignore | 11 + .gitlab-ci.yml | 51 ++ Code_Of_Conduct.md | 1 + MANIFEST.in | 4 + ReadMe.md | 13 + UNLICENSE | 24 + libzip/Archive/__init__.py | 78 ++ libzip/Archive/ctors.py | 39 + libzip/Archive/middleLevel.py | 70 ++ libzip/Error.py | 156 ++++ libzip/File/__init__.py | 243 ++++++ libzip/File/ctors.py | 26 + libzip/File/extra.py | 131 ++++ libzip/File/io.py | 44 ++ libzip/File/middleLevel.py | 195 +++++ libzip/Source/Cursor.py | 96 +++ libzip/Source/__init__.py | 156 ++++ libzip/Source/abstractIO.py | 16 + libzip/Source/ctors.py | 106 +++ libzip/Source/middleLevel.py | 82 ++ libzip/Stat.py | 166 ++++ libzip/__init__.py | 11 + libzip/ctypes/__init__.py | 0 libzip/ctypes/_ctypesgen_preamble.py | 25 + .../ctypes/_funcToCtypesSignatureConvertor.py | 36 + libzip/ctypes/_inttypes.py | 7 + libzip/ctypes/callbacks.py | 9 + libzip/ctypes/functions.py | 737 ++++++++++++++++++ libzip/ctypes/library.py | 16 + libzip/ctypes/opaque.py | 6 + libzip/ctypes/structs.py | 90 +++ libzip/ctypes/utils.py | 15 + libzip/enums/CompressionMethod.py | 33 + libzip/enums/Flags.py | 23 + libzip/enums/OS.py | 30 + libzip/enums/ZipError.py | 54 ++ libzip/enums/ZipSource.py | 56 ++ libzip/enums/__init__.py | 76 ++ libzip/middleLevel.py | 31 + libzip/py.typed | 0 libzip/utils/__init__.py | 38 + libzip/utils/dosTime.py | 16 + libzip/version.py | 12 + pyproject.toml | 37 + tests/tests.py | 25 + tutorial.ipynb | 492 ++++++++++++ 52 files changed, 3620 insertions(+) create mode 100644 .ci/aptPackagesToInstall.txt create mode 100644 .ci/pythonPackagesToInstallFromGit.txt create mode 100644 .editorconfig create mode 100644 .github/.templateMarker create mode 100644 .github/dependabot.yml create mode 100644 .github/workflows/CI.yml create mode 100644 .gitignore create mode 100644 .gitlab-ci.yml create mode 100644 Code_Of_Conduct.md create mode 100644 MANIFEST.in create mode 100644 ReadMe.md create mode 100644 UNLICENSE create mode 100644 libzip/Archive/__init__.py create mode 100644 libzip/Archive/ctors.py create mode 100644 libzip/Archive/middleLevel.py create mode 100644 libzip/Error.py create mode 100644 libzip/File/__init__.py create mode 100644 libzip/File/ctors.py create mode 100644 libzip/File/extra.py create mode 100644 libzip/File/io.py create mode 100644 libzip/File/middleLevel.py create mode 100644 libzip/Source/Cursor.py create mode 100644 libzip/Source/__init__.py create mode 100644 libzip/Source/abstractIO.py create mode 100644 libzip/Source/ctors.py create mode 100644 libzip/Source/middleLevel.py create mode 100644 libzip/Stat.py create mode 100644 libzip/__init__.py create mode 100644 libzip/ctypes/__init__.py create mode 100644 libzip/ctypes/_ctypesgen_preamble.py create mode 100644 libzip/ctypes/_funcToCtypesSignatureConvertor.py create mode 100644 libzip/ctypes/_inttypes.py create mode 100644 libzip/ctypes/callbacks.py create mode 100644 libzip/ctypes/functions.py create mode 100644 libzip/ctypes/library.py create mode 100644 libzip/ctypes/opaque.py create mode 100644 libzip/ctypes/structs.py create mode 100644 libzip/ctypes/utils.py create mode 100644 libzip/enums/CompressionMethod.py create mode 100644 libzip/enums/Flags.py create mode 100644 libzip/enums/OS.py create mode 100644 libzip/enums/ZipError.py create mode 100644 libzip/enums/ZipSource.py create mode 100644 libzip/enums/__init__.py create mode 100644 libzip/middleLevel.py create mode 100644 libzip/py.typed create mode 100644 libzip/utils/__init__.py create mode 100644 libzip/utils/dosTime.py create mode 100644 libzip/version.py create mode 100644 pyproject.toml create mode 100755 tests/tests.py create mode 100644 tutorial.ipynb diff --git a/.ci/aptPackagesToInstall.txt b/.ci/aptPackagesToInstall.txt new file mode 100644 index 0000000..b9ca3b6 --- /dev/null +++ b/.ci/aptPackagesToInstall.txt @@ -0,0 +1 @@ +libzip4 diff --git a/.ci/pythonPackagesToInstallFromGit.txt b/.ci/pythonPackagesToInstallFromGit.txt new file mode 100644 index 0000000..e69de29 diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..c9162b9 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,12 @@ +root = true + +[*] +charset = utf-8 +indent_style = tab +indent_size = 4 +insert_final_newline = true +end_of_line = lf + +[*.{yml,yaml}] +indent_style = space +indent_size = 2 diff --git a/.github/.templateMarker b/.github/.templateMarker new file mode 100644 index 0000000..5e3a3e0 --- /dev/null +++ b/.github/.templateMarker @@ -0,0 +1 @@ +KOLANICH/python_project_boilerplate.py diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 0000000..89ff339 --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,8 @@ +version: 2 +updates: + - package-ecosystem: "pip" + directory: "/" + schedule: + interval: "daily" + allow: + - dependency-type: "all" diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml new file mode 100644 index 0000000..7fe33b3 --- /dev/null +++ b/.github/workflows/CI.yml @@ -0,0 +1,15 @@ +name: CI +on: + push: + branches: [master] + pull_request: + branches: [master] + +jobs: + build: + runs-on: ubuntu-22.04 + steps: + - name: typical python workflow + uses: KOLANICH-GHActions/typical-python-workflow@master + with: + github_token: ${{ secrets.GITHUB_TOKEN }} diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..12a391f --- /dev/null +++ b/.gitignore @@ -0,0 +1,11 @@ +__pycache__ +*.py[co] +/*.egg-info +*.srctrlbm +*.srctrldb +build +dist +.eggs +monkeytype.sqlite3 +/.ipynb_checkpoints +*.kate-swp diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml new file mode 100644 index 0000000..a1d9c96 --- /dev/null +++ b/.gitlab-ci.yml @@ -0,0 +1,51 @@ +image: registry.gitlab.com/kolanich-subgroups/docker-images/fixed_python:latest + +variables: + DOCKER_DRIVER: overlay2 + SAST_ANALYZER_IMAGE_TAG: latest + SAST_DISABLE_DIND: "true" + SAST_CONFIDENCE_LEVEL: 5 + CODECLIMATE_VERSION: latest + +include: + - template: SAST.gitlab-ci.yml + - template: Code-Quality.gitlab-ci.yml + - template: License-Management.gitlab-ci.yml + +build: + tags: + - shared + - linux + stage: build + variables: + GIT_DEPTH: "1" + PYTHONUSERBASE: ${CI_PROJECT_DIR}/python_user_packages + + before_script: + - export PATH="$PATH:$PYTHONUSERBASE/bin" # don't move into `variables` + - apt-get update + # todo: + #- apt-get -y install + #- pip3 install --upgrade + #- python3 ./fix_python_modules_paths.py + + script: + - python3 -m build -nw bdist_wheel + - mv ./dist/*.whl ./dist/libzip-0.CI-py3-none-any.whl + - pip3 install --upgrade ./dist/*.whl + - coverage run --source=libzip -m --branch pytest --junitxml=./rspec.xml ./tests/test.py + - coverage report -m + - coverage xml + + coverage: "/^TOTAL(?:\\s+\\d+){4}\\s+(\\d+%).+/" + + cache: + paths: + - $PYTHONUSERBASE + + artifacts: + paths: + - dist + reports: + junit: ./rspec.xml + cobertura: ./coverage.xml diff --git a/Code_Of_Conduct.md b/Code_Of_Conduct.md new file mode 100644 index 0000000..bcaa2bf --- /dev/null +++ b/Code_Of_Conduct.md @@ -0,0 +1 @@ +No codes of conduct! \ No newline at end of file diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 0000000..20f0fa8 --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1,4 @@ +include UNLICENSE +include *.md +include tests +include .editorconfig diff --git a/ReadMe.md b/ReadMe.md new file mode 100644 index 0000000..4a33f72 --- /dev/null +++ b/ReadMe.md @@ -0,0 +1,13 @@ +libzip.py [![Unlicensed work](https://raw.githubusercontent.com/unlicense/unlicense.org/master/static/favicon.png)](https://unlicense.org/) +========= +~~[wheel (GHA via `nightly.link`)](https://nightly.link/KOLANICH-libs/libzip.py/workflows/CI/master/libzip-0.CI-py3-none-any.whl)~~ +[![Libraries.io Status](https://img.shields.io/librariesio/github/KOLANICH-libs/libzip.py.svg)](https://libraries.io/github/KOLANICH-libs/libzip.py) +[![Code style: antiflash](https://img.shields.io/badge/code%20style-antiflash-FFF.svg)](https://codeberg.org/KOLANICH-tools/antiflash.py) + +Python ctypes-based bindings to [`libzip`](https://github.com/nih-at/libzip). You need a `libzip` installed: +* `libzip4` in Debian-based distros. +* `libzip` in RPM-based distros, Arch, and Gentoo. + +**Warning: Currently `libzip` is used for updating files witin the archive. It doesn't allow rewriting files in archives without creating a copy of the archive. [It is considered contradicting `libzip` goals by its authors.](https://github.com/nih-at/libzip/issues/304)**. We need a lib allowing to do that. + +For the docs see [the tutorial](./tutorial.ipynb)[![NBViewer](https://nbviewer.org/static/ico/ipynb_icon_16x16.png)](https://nbviewer.org/urls/codeberg.org/KOLANICH-libs/libzip.py/raw/branch/master/tutorial.ipynb). diff --git a/UNLICENSE b/UNLICENSE new file mode 100644 index 0000000..efb9808 --- /dev/null +++ b/UNLICENSE @@ -0,0 +1,24 @@ +This is free and unencumbered software released into the public domain. + +Anyone is free to copy, modify, publish, use, compile, sell, or +distribute this software, either in source code form or as a compiled +binary, for any purpose, commercial or non-commercial, and by any +means. + +In jurisdictions that recognize copyright laws, the author or authors +of this software dedicate any and all copyright interest in the +software to the public domain. We make this dedication for the benefit +of the public at large and to the detriment of our heirs and +successors. We intend this dedication to be an overt act of +relinquishment in perpetuity of all present and future rights to this +software under copyright law. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR +OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, +ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +OTHER DEALINGS IN THE SOFTWARE. + +For more information, please refer to diff --git a/libzip/Archive/__init__.py b/libzip/Archive/__init__.py new file mode 100644 index 0000000..c1b1961 --- /dev/null +++ b/libzip/Archive/__init__.py @@ -0,0 +1,78 @@ +import typing +from pathlib import Path, PurePath + +from .. import Source +from ..ctypes.opaque import zip_source_ptr +from ..enums import EncryptionMethod, OpenFlags, ZipFlags, ZipStat +from ..File import ADD_REPLACE_ARGS, COMMENT_FLAGS, ExistingFile, file_add +from ..Stat import stat +from .ctors import open # pylint:disable=redefined-builtin +from .middleLevel import close, get_archive_comment, get_num_entries, set_archive_comment, set_default_password, zip_ptr + +__all__ = ("Archive",) + + +class Archive: + __slots__ = ("ptr", "path", "mode") + + FILE_FLAGS = ZipFlags(0) + + def __init__(self, path: Path, mode: OpenFlags) -> None: + self.path = path + self.mode = mode + self.ptr = None + + def __enter__(self) -> "Archive": + self.ptr = open(str(self.path), self.mode) + return self + + def __exit__(self, ex_type, ex_value, traceback) -> None: + self.close() + + def __len__(self) -> int: + return get_num_entries(self.ptr, 0) + + @property + def range(self) -> range: + return range(len(self)) + + def __getitem__(self, k: typing.Union[str, int]) -> ExistingFile: + if isinstance(k, int): + if k not in self.range: + raise KeyError("File with this index doesn't exist in the archive", k) + + return ExistingFile(self, idx=k, path=None, flags=self.__class__.FILE_FLAGS, password=None) + + if isinstance(k, (str, bytes, PurePath)): + return ExistingFile(self, idx=None, path=k, flags=self.__class__.FILE_FLAGS, password=None) + + raise ValueError("Incorrect type of key", k) + + def __iter__(self): + for idx in self.range: + yield ExistingFile(self, idx=idx, path=None, flags=self.__class__.FILE_FLAGS, password=None) + + @property + def password(self): + raise NotImplementedError + + @password.setter + def password(self, pwd): + set_default_password(self.ptr, pwd) + + @property + def comment(self) -> typing.Optional[bytes]: + return get_archive_comment(self.ptr, COMMENT_FLAGS) + + @comment.setter + def comment(self, v): + set_archive_comment(self.ptr, v) + + def add(self, path: PurePath, source: typing.Union[zip_source_ptr, Source.Source], flags: ZipFlags = FILE_FLAGS, addFlags: ZipFlags = ADD_REPLACE_ARGS) -> ExistingFile: + idx = file_add(self.ptr, name=path, source=source, flags=addFlags) + return ExistingFile(self, idx=idx, path=path, flags=flags, password=None) + + def close(self) -> None: + if self.ptr is not None: + close(self.ptr) + self.ptr = None diff --git a/libzip/Archive/ctors.py b/libzip/Archive/ctors.py new file mode 100644 index 0000000..51fb92f --- /dev/null +++ b/libzip/Archive/ctors.py @@ -0,0 +1,39 @@ +import typing +from ctypes import byref, c_char_p, c_int + +from ..ctypes import functions as f +from ..Source import OpenFlags, Source, ZipError, zip_ptr, zip_source_ptr +from ..utils import acceptPathOrStrOrBytes + +__all__ = ("open", "open_from_source", "fdopen") + + +def open(src: str, flags: OpenFlags) -> zip_ptr: # pylint:disable=redefined-builtin + err = c_int(0) + res = f.open(c_char_p(acceptPathOrStrOrBytes(src)), c_int(flags), byref(err)) + err = ZipError(err.value) + if err: + raise Exception(err) + return res + + +def open_from_source(src: typing.Union[zip_source_ptr, Source], flags: OpenFlags) -> zip_ptr: + if isinstance(src, Source): + src.recordLeak() + src = src.ptr + + err = c_int(0) + res = f.open_from_source(src, c_int(flags), byref(err)) + err = ZipError(err.value) + if err: + raise Exception(err) + return res + + +def fdopen(fd_orig: c_int, _flags: OpenFlags) -> zip_ptr: + err = c_int(0) + res = f.fdopen(fd_orig, c_int(_flags), byref(err)) + err = ZipError(err.value) + if err: + raise Exception(err) + return res diff --git a/libzip/Archive/middleLevel.py b/libzip/Archive/middleLevel.py new file mode 100644 index 0000000..4c56bdc --- /dev/null +++ b/libzip/Archive/middleLevel.py @@ -0,0 +1,70 @@ +import typing +from ctypes import POINTER, byref, c_byte, c_char_p, c_int, c_int32, c_uint16, cast + +from ..ctypes import functions as f +from ..ctypes._inttypes import zip_flags +from ..ctypes.opaque import zip_ptr +from ..enums import ZipFlags +from ..Error import _checkArchiveErrorCode +from ..utils import acceptStrOrBytes + +__all__ = ("archive_set_tempdir", "close", "discard", "get_archive_comment", "set_archive_comment", "get_archive_flag", "set_archive_flag", "get_num_entries", "set_default_password", "unchange_all", "unchange_archive") + + +def archive_set_tempdir(za: zip_ptr, tempdir: c_char_p) -> None: + _checkArchiveErrorCode(za, f.archive_set_tempdir(za, tempdir)) + + +def close(za: zip_ptr) -> None: + _checkArchiveErrorCode(za, f.close(za)) + + +def discard(za: zip_ptr) -> None: + return f.discard(za) + + +def get_archive_comment(za: zip_ptr, flags: ZipFlags) -> typing.Optional[bytes]: + lenp = c_int32(0) + resPtr = f.get_archive_comment(za, byref(lenp), zip_flags(flags)) + + lenp = lenp.value + + if lenp < 0: + raise ValueError("lenp returned is < 0", lenp) + + if resPtr: + bufT = c_byte * lenp + bufPtrT = POINTER(bufT) + return bytes(cast(resPtr, bufPtrT)[0]) + + return None + + +def get_archive_flag(za: zip_ptr, flag: ZipFlags, flags: ZipFlags) -> bool: + return bool(_checkArchiveErrorCode(za, f.get_archive_flag(za, zip_flags(flag), zip_flags(flags)))) + + +def get_num_entries(za: zip_ptr, flags: ZipFlags) -> int: + return int(f.get_num_entries(za, zip_flags(flags))) + + +def set_archive_comment(za: zip_ptr, comment: typing.AnyStr) -> None: + comment = acceptStrOrBytes(comment) + + _checkArchiveErrorCode(za, f.set_archive_comment(za, c_char_p(comment), c_uint16(len(comment)))) + + +def set_archive_flag(za: zip_ptr, flag: ZipFlags, value: c_int) -> None: + _checkArchiveErrorCode(za, f.set_archive_flag(za, zip_flags(flag), value)) + + +def set_default_password(za: zip_ptr, passwd: typing.AnyStr) -> None: + _checkArchiveErrorCode(za, f.set_default_password(za, c_char_p(acceptStrOrBytes(passwd)))) + + +def unchange_all(za: zip_ptr) -> None: + _checkArchiveErrorCode(za, f.unchange_all(za)) + + +def unchange_archive(za: zip_ptr) -> None: + _checkArchiveErrorCode(za, f.unchange_archive(za)) diff --git a/libzip/Error.py b/libzip/Error.py new file mode 100644 index 0000000..9b32ecf --- /dev/null +++ b/libzip/Error.py @@ -0,0 +1,156 @@ +import typing +from ctypes import POINTER, byref, c_uint64, c_void_p + +from .ctypes import functions as f +from .ctypes._inttypes import c_int, c_int64 +from .ctypes.opaque import zip_file_ptr, zip_ptr, zip_source_ptr +from .ctypes.structs import zip_error +from .enums.ZipError import SystemErrorType, ZipError + +zip_error_ptr = POINTER(zip_error) + + +def error_to_data(error: zip_error_ptr, data: c_void_p, length: c_uint64) -> c_int64: + return f.error_to_data(error, data, length) + + +def error_fini(err: zip_error_ptr) -> None: + return f.error_fini(err) + + +def error_init(err: zip_error_ptr) -> None: + return f.error_init(err) + + +def error_init_with_code(error: zip_error_ptr, ze: c_int) -> None: + return f.error_init_with_code(error, ze) + + +def error_set(err: zip_error_ptr, ze: c_int, se: c_int) -> None: + return f.error_set(err, ze, se) + + +class IDecodedError: + __slots__ = ("_zip", "_sysType", "_sysCode", "_descr") + # pylint:disable=attribute-defined-outside-init + + def __init__(self) -> None: + self.resetCache() + + def resetCache(self) -> None: + self._zip = None + self._sysType = None + self._sysCode = None + self._descr = None + + @property + def zip(self) -> ZipError: + if self._zip is None: + self._zip = ZipError(self._getZipError()) + return self._zip + + def _getZipError(self) -> int: + raise NotImplementedError + + @property + def sysType(self) -> SystemErrorType: + if self._sysType is None: + self._sysType = SystemErrorType(self._getSystemErrorType()) + return self._sysType + + def _getSystemErrorType(self) -> int: + raise NotImplementedError + + @property + def sysCode(self) -> int: + if self._sysCode is None: + self._sysCode = self._getSystemCode() + return self._sysCode + + def _getSystemCode(self) -> int: + raise NotImplementedError + + @property + def descr(self) -> str: + if self._descr is None: + self._descr = self._getDescr().decode("utf-8") + return self._descr + + def _getDescr(self) -> bytes: + raise NotImplementedError + + def __repr__(self): + return self.__class__.__name__ + repr(tuple(self)) + + def __iter__(self): + for k in __class__.__slots__: + yield getattr(self, k[1:]) + + def __bool__(self) -> bool: + return bool(self.zip) or bool(self.sysType) or bool(self.sysCode) + + +class DecodedErrorPtr(IDecodedError): + __slots__ = ("ptr",) + + def __init__(self, sPtr): + super().__init__() + self.ptr = sPtr + + def _getZipError(self) -> int: + return f.error_code_zip(self.ptr) + + def _getSystemErrorType(self) -> int: + return f.error_system_type(self.ptr) + + def _getSystemCode(self) -> int: + return f.error_code_system(self.ptr) + + def _getDescr(self) -> bytes: + return f.error_strerror(self.ptr) + + +class DecodedErrorStruct(DecodedErrorPtr): + __slots__ = ("err",) + + def __init__(self) -> None: + self.err = zip_error(0) + super().__init__(byref(self.err)) + + def _getZipError(self) -> int: + return self.err.zip_err + + def _getSystemErrorType(self) -> int: + return self.err.sys_err + + +class LibZipError(Exception): + pass + + +def _checkErrorCodeGeneric(handle: typing.Union[zip_ptr, zip_source_ptr, zip_file_ptr], errorStructGetter: typing.Callable, resourceErrorCleaner: typing.Callable, isFailure: typing.Callable, res: typing.Union[int, typing.Any], *args) -> typing.Any: + if res is None or (isinstance(res, int) and isFailure(res)): + error = errorStructGetter(handle) # type: zip_error_ptr + if error: + dec = DecodedErrorPtr(error) + errorObj = LibZipError(*dec, *args) + error_fini(error) + if resourceErrorCleaner is not None: + resourceErrorCleaner(handle) + raise errorObj + + raise Exception("Operation haven't suceed but no error code have been set", *args) + + return res + + +def _checkArchiveErrorCodeGeneric(za: zip_ptr, isFailure: typing.Callable, res: typing.Union[int, typing.Any], *args) -> typing.Any: + return _checkErrorCodeGeneric(za, f.get_error, f.error_clear, isFailure, res, *args) + + +def _checkArchiveErrorCode(za: zip_ptr, res: typing.Union[int, typing.Any], *args) -> typing.Any: + return _checkArchiveErrorCodeGeneric(za, lambda res: res != 0, res, *args) + + +def _checkArchiveErrorCodeResult(za: zip_ptr, res: typing.Union[int, typing.Any], *args) -> typing.Any: + return _checkArchiveErrorCodeGeneric(za, lambda res: res == 0, res, *args) diff --git a/libzip/File/__init__.py b/libzip/File/__init__.py new file mode 100644 index 0000000..05deedd --- /dev/null +++ b/libzip/File/__init__.py @@ -0,0 +1,243 @@ +import typing +from collections.abc import ByteString +from datetime import date, datetime, time +from pathlib import PurePath + +from ..enums import OS, CompressionMethod, EncryptionMethod +from ..enums.Flags import ZipFlags +from ..File.middleLevel import file_idx +from ..Source import Source +from ..Source.Cursor import IReadCursor, Openable +from ..Stat import Stat, ZipStat, stat +from ..utils import PathT, toPurePathOrStrOrBytes +from .ctors import fopen_index, fopen_index_encrypted +from .extra import ExtraFields +from .io import fclose, file_is_seekable, fread, fseek, ftell +from .middleLevel import ADD_REPLACE_ARGS, COMMENT_FLAGS, EXTERNAL_ATTRS_FLAGS, LOCATE_FLAGS, MOD_TIME_FLAGS, NAME_FLAGS, ExternalAttrs, delete, file_add, file_get_comment, file_get_external_attributes, file_idx, file_rename, file_replace, file_set_comment, file_set_dostime, file_set_encryption, file_set_external_attributes, file_set_mtime, get_name, name_locate, set_file_compression, unchange + +__all__ = ("MutableExternalAttrs", "ExistingFile") + +# pylint:disable=too-many-instance-attributes,too-many-arguments,too-many-public-methods + + +class ReadCursor(IReadCursor): + __slots__ = () + + TELL_FUNC = ftell + SEEK_FUNC = fseek + READ_FUNC = fread + + @property + def isSeekable(self) -> bool: + file_is_seekable(self.parent.ptr) + + +class MutableExternalAttrs(ExternalAttrs): + __slots__ = ("parent",) + + def __init__(self, parent, s: OS, attrs: int): + self.parent = parent + super().__init__(s, attrs) + + @ExternalAttrs.os.setter + def os(self, v): + self._os = v + self.parent.externalAttributes = self + + @ExternalAttrs.attrs.setter + def attrs(self, v): + self._attrs = v + self.parent.externalAttributes = self + + +class ExistingFile(Openable): + __slots__ = ("parent", "idx", "_path", "_stat", "_dirty", "flags", "password", "extras") + + NAME_FLAGS = NAME_FLAGS + LOCATE_FLAGS = LOCATE_FLAGS + READ_CURSOR_TYPE = ReadCursor + + def get_name(self, flags: ZipFlags = NAME_FLAGS) -> PathT: + return get_name(self.parent.ptr, self.idx, flags=flags) + + def rename(self, name: PathT, flags: ZipFlags = NAME_FLAGS) -> None: + return file_rename(self.parent.ptr, self.idx, name=name, flags=flags) + + def __repr__(self): + return self.__class__.__name__ + "(" + ", ".join((repr(self.parent), repr(self.idx), repr(self.pathName), repr(self.flags), repr(self.password))) + ")" + + @property + def pathName(self) -> PathT: + if self._path is None: + self._path = self.get_name(self.__class__.NAME_FLAGS) + return self._path + + @pathName.setter + def pathName(self, v: PathT): + self.rename(v, flags=self.__class__.NAME_FLAGS) + self._path = PurePath(self.get_name(self.__class__.NAME_FLAGS)) + return self._path + + def __init__(self, parent: "Archive", idx: file_idx, path: PathT, flags: ZipFlags, password: typing.Optional[typing.Union[ByteString, str]] = None) -> None: + super().__init__(None) + self.parent = parent + self.flags = flags + self.password = password + self._stat = None + self._path = path + self._dirty = False + + if idx is None and path is None: + raise ValueError("You must provide either an index or a path") + + if idx is None: + idx = name_locate(parent.ptr, path, self.__class__.LOCATE_FLAGS) + if idx is None: + raise KeyError("File not found in the archive", path) + + self.idx = idx + + if path is None: + self._path = self.pathName + else: + self._path = toPurePathOrStrOrBytes(self._path) + + self.extras = ExtraFields(self) + + def _open(self) -> None: + self._dirtyGuard() + if self.password is not None: + self.ptr = fopen_index_encrypted(self.parent.ptr, self.idx, self.flags, password=self.password) + else: + self.ptr = fopen_index(self.parent.ptr, self.idx, self.flags) + + def _close(self) -> None: + fclose(self.ptr) + self.ptr = None + + COMMENT_FLAGS = COMMENT_FLAGS + + def getComment(self, flags: ZipFlags = COMMENT_FLAGS) -> typing.Optional[bytes]: + return file_get_comment(self.parent.ptr, self.idx, flags) + + def setComment(self, comment: typing.Union[str, ByteString], flags: ZipFlags = COMMENT_FLAGS) -> None: + return file_set_comment(self.parent.ptr, self.idx, comment=comment, flags=flags) + + @property + def comment(self) -> typing.Optional[bytes]: + return self.getComment() + + @comment.setter + def comment(self, v: typing.Union[str, ByteString]): + return self.setComment(v) + + EXTERNAL_ATTRS_FLAGS = EXTERNAL_ATTRS_FLAGS + + def getExternalAttributes(self, flags: ZipFlags = EXTERNAL_ATTRS_FLAGS) -> ExternalAttrs: + return MutableExternalAttrs(self, *file_get_external_attributes(self.parent.ptr, self.idx, flags=flags)) + + def setExternalAttributes(self, attrs: ExternalAttrs, flags: ZipFlags = EXTERNAL_ATTRS_FLAGS) -> None: + return file_set_external_attributes(self.parent.ptr, self.idx, flags=flags, attrs=attrs) + + @property + def externalAttributes(self, flags: ZipFlags = EXTERNAL_ATTRS_FLAGS) -> ExternalAttrs: + return self.getExternalAttributes(self.__class__.EXTERNAL_ATTRS_FLAGS) + + @externalAttributes.setter + def externalAttributes(self, v: ExternalAttrs): + return self.setExternalAttributes(v, self.__class__.EXTERNAL_ATTRS_FLAGS) + + # `stat`-related + @property + def stat(self) -> Stat: + if self._stat is None: + self._stat = stat(self.parent.ptr, self.idx, what=ZipStat.everything) + return self._stat + + @property + def compressionMethod(self) -> CompressionMethod: + return self.stat.compressionMethod + + @compressionMethod.setter + def compressionMethod(self, method: CompressionMethod): + return self.setCompression(method) + + def setCompression(self, method: CompressionMethod = CompressionMethod.deflate, level: int = 9) -> None: + set_file_compression(self.parent.ptr, self.idx, method=method, level=level) + self.stat.compressionMethod = method + + @property + def encryptionMethod(self) -> EncryptionMethod: + return self.stat.encryptionMethod + + @encryptionMethod.setter + def encryptionMethod(self, method: EncryptionMethod): + return self.setEncryption(method, self.password) + + def setEncryption(self, method: EncryptionMethod, password: str) -> None: + file_set_encryption(self.parent.ptr, self.idx, method=method, password=password) + self.stat.encryptionMethod = method + + @property + def modTime(self) -> datetime: + return self.stat.modificationTime + + @modTime.setter + def modTime(self, modTime: typing.Union[int, datetime]): + return self.setModTime(modTime) + + MOD_TIME_FLAGS = MOD_TIME_FLAGS + + def setModTime(self, modTime: typing.Union[int, datetime], flags: ZipFlags = MOD_TIME_FLAGS) -> None: + return file_set_mtime(self.parent.ptr, self.idx, mtime=modTime, flags=flags) + + def setDosTime(self, dosTime: typing.Optional[typing.Union[time, int]] = None, dosDate: typing.Optional[typing.Union[date, int]] = None, flags: ZipFlags = 0) -> None: + return file_set_dostime(self.parent.ptr, self.idx, dosTime=dosTime, dosDate=dosDate, flags=flags) + + @property + def externaAttributes(self) -> ExternalAttrs: + return self.getExternalAttributes(self.__class__.EXTERNAL_ATTRS_FLAGS) + + @externaAttributes.setter + def externaAttributes(self, attrs: ExternalAttrs): + return self.setExternalAttributes(flags=self.__class__.EXTERNAL_ATTRS_FLAGS, attrs=attrs) + + def delete(self) -> None: + self._dirty = True + return delete(self.parent.ptr, self.idx) + + def _dirtyGuard(self): + if self.dirty: + raise RuntimeError("Doing this op to a dirty file will result in a crash") + + def _openGuard(self): + if self.ptr: + raise RuntimeError("Doing this op to a opened file will result in a crash") + + @property + def dirty(self) -> bool: + return self._dirty + + @dirty.setter + def dirty(self, v: bool): + self._openGuard() + v = bool(v) + if self._dirty: + if not v: + self.unchange() + else: + pass + else: + if v: + self._openGuard() + self._dirty = True + else: + pass + + def unchange(self) -> None: + unchange(self.parent.ptr, self.idx) + self._dirty = False + + def replace(self, source: Source, flags: ZipFlags = ADD_REPLACE_ARGS) -> None: + self.dirty = True + file_replace(self.parent.ptr, self.idx, source=source, flags=flags) diff --git a/libzip/File/ctors.py b/libzip/File/ctors.py new file mode 100644 index 0000000..65fa90e --- /dev/null +++ b/libzip/File/ctors.py @@ -0,0 +1,26 @@ +import typing +from collections.abc import ByteString +from ctypes import c_uint64 + +from ..ctypes import functions as f +from ..ctypes.opaque import zip_file_ptr, zip_ptr +from ..enums import ZipFlags +from ..utils import AnyStr, PathT, acceptPathOrStrOrBytes, acceptStrOrBytes + +__all__ = ("fopen", "fopen_encrypted", "fopen_index", "fopen_index_encrypted") + + +def fopen(za: zip_ptr, fname: PathT, flags: ZipFlags) -> zip_file_ptr: + return f.fopen(za, acceptPathOrStrOrBytes(fname), flags) + + +def fopen_encrypted(za: zip_ptr, fname: PathT, flags: ZipFlags, password: AnyStr) -> zip_file_ptr: + return f.fopen_encrypted(za, acceptPathOrStrOrBytes(fname), flags, acceptStrOrBytes(password)) + + +def fopen_index(za: zip_ptr, index: int, flags: ZipFlags) -> zip_file_ptr: + return f.fopen_index(za, c_uint64(index), flags) + + +def fopen_index_encrypted(za: zip_ptr, index: int, flags: ZipFlags, password: typing.Union[ByteString, str]) -> zip_file_ptr: + return f.fopen_index_encrypted(za, c_uint64(index), flags, acceptStrOrBytes(password)) diff --git a/libzip/File/extra.py b/libzip/File/extra.py new file mode 100644 index 0000000..943d84c --- /dev/null +++ b/libzip/File/extra.py @@ -0,0 +1,131 @@ +import typing +from collections.abc import ByteString +from ctypes import POINTER, byref, c_byte, c_int16, c_uint16, cast + +from ..ctypes import functions as f +from ..ctypes._inttypes import zip_flags, zip_index +from ..ctypes.opaque import zip_ptr +from ..enums.Flags import ZipFlags +from ..Error import _checkArchiveErrorCode + +__all__ = ("file_extra_field_delete", "file_extra_field_delete_by_id", "file_extra_field_set", "file_extra_fields_count", "file_extra_fields_count_by_id", "file_extra_field_get", "file_extra_field_get_by_id", "ExtraField") + +# pylint:disable=too-many-arguments + + +def file_extra_field_delete(za: zip_ptr, idx: int, ef_idx: c_uint16, flags: ZipFlags) -> None: + _checkArchiveErrorCode(za, f.file_extra_field_delete(za, zip_index(idx), ef_idx, zip_flags(flags))) + + +def file_extra_field_delete_by_id(za: zip_ptr, idx: int, ef_id: c_uint16, ef_idx: c_uint16, flags: ZipFlags) -> None: + _checkArchiveErrorCode(za, f.file_extra_field_delete_by_id(za, zip_index(idx), ef_id, ef_idx, zip_flags(flags))) + + +def file_extra_field_set(za: zip_ptr, idx: int, ef_id: int, ef_idx: int, data: ByteString, flags: ZipFlags) -> None: + _checkArchiveErrorCode(za, f.file_extra_field_set(za, zip_index(idx), c_uint16(ef_id), c_uint16(ef_idx), data, c_uint16(len(data)), zip_flags(flags))) + + +def file_extra_fields_count(za: zip_ptr, idx: int, flags: ZipFlags) -> c_int16: + return _checkArchiveErrorCode(za, f.file_extra_fields_count(za, zip_index(idx), zip_flags(flags))) + + +def file_extra_fields_count_by_id(za: zip_ptr, idx: int, ef_id: int, flags: ZipFlags) -> c_int16: + return _checkArchiveErrorCode(za, f.file_extra_fields_count_by_id(za, zip_index(idx), c_uint16(ef_id), zip_flags(flags))) + + +def file_extra_field_get(za: zip_ptr, idx: int, ef_idx: int, flags: ZipFlags) -> (bytes, int): + size = c_uint16(0) + ef_id = c_uint16(0) + resPtr = _checkArchiveErrorCode(za, f.file_extra_field_get(za, zip_index(idx), c_uint16(ef_idx), byref(ef_id), byref(size), zip_flags(flags))) + + bufT = c_byte * size.value + bufPtrT = POINTER(bufT) + return (bytes(cast(resPtr, bufPtrT)[0]), ef_id.value) + + +def file_extra_field_get_by_id(za: zip_ptr, idx: int, ef_id: int, ef_idx: int, flags: ZipFlags) -> bytes: + size = c_uint16(0) + resPtr = _checkArchiveErrorCode(za, f.file_extra_field_get_by_id(za, zip_index(idx), c_uint16(ef_id), c_uint16(ef_idx), byref(size), zip_flags(flags))) + + bufT = c_byte * size.value + bufPtrT = POINTER(bufT) + return bytes(cast(resPtr, bufPtrT)[0]) + + +EXTRA_FIELDS_FLAGS = ZipFlags.central | ZipFlags.local # | ZipFlags.unchanged + + +class ExtraField: + __slots__ = ("idx", "parent", "ef_idx") + + def __init__(self, parent: "ExtraFields", ef_idx: int): + self.parent = parent + self.ef_idx = ef_idx + + def flags(self): + return self.parent.flags + + def ef_id(self): + return self.parent.ef_id + + def __bytes__(self): + if self.ef_id is not None: + return file_extra_field_get_by_id(self.parent.archive.ptr, self.parent.parent.idx, self.ef_id, self.ef_idx, self.flags) + + return file_extra_field_get(self.parent.archive.ptr, self.parent.parent.idx, self.ef_idx, self.flags) + + def set(self, data: ByteString): + if self.ef_id is not None: + return file_extra_field_set(self.parent.archive.ptr, self.parent.parent.idx, self.ef_id, self.ef_idx, data, self.flags) + + raise ValueError("`ef_id` must be set") + + def delete(self): + if self.ef_id is not None: + return file_extra_field_delete_by_id(self.parent.archive.ptr, self.parent.parent.idx, self.ef_id, self.ef_idx, self.flags) + + return file_extra_field_delete(self.parent.archive.ptr, self.parent.parent.idx, self.ef_idx, self.flags) + + +class ExtraFields: + __slots__ = ("parent", "ef_id", "flags") + + def __init__(self, parent: "ExistingFile", ef_id: int = None, flags: ZipFlags = EXTRA_FIELDS_FLAGS) -> None: + self.parent = parent + self.ef_id = ef_id + self.flags = flags + + @property + def archive(self): + return self.parent.parent + + def __len__(self): + if self.ef_id is not None: + return file_extra_fields_count_by_id(self.archive.ptr, self.parent.idx, self.ef_id, self.flags) + + return file_extra_fields_count(self.archive.ptr, self.parent.idx, self.flags) + + def range(self) -> range: + return range(len(self)) + + def __iter__(self) -> typing.Iterator[ExtraField]: + for i in self.range: # pylint:disable=not-an-iterable + yield ExtraField(self, i) + + def __getitem__(self, k: int): + if k not in self.range: # pylint:disable=unsupported-membership-test + raise KeyError(k) + + return bytes(ExtraField(self, k)) + + def __setitem__(self, k: int, v: ByteString): + if k not in self.range: # pylint:disable=unsupported-membership-test + raise KeyError(k) + + return ExtraField(self, k).set(v) + + def __delitem__(self, k: int): + if k not in self.range: # pylint:disable=unsupported-membership-test + raise KeyError(k) + + return ExtraField(self, k).delete() diff --git a/libzip/File/io.py b/libzip/File/io.py new file mode 100644 index 0000000..a1a6f9e --- /dev/null +++ b/libzip/File/io.py @@ -0,0 +1,44 @@ +import typing +from ctypes import c_char_p, c_int, c_int8, c_int64 + +from ..ctypes import functions as f +from ..ctypes.opaque import zip_file_ptr +from ..Error import _checkErrorCodeGeneric +from ..Source.abstractIO import ioWrapper + +__all__ = ("file_strerror", "fclose", "fread", "fseek", "ftell") + + +def _checkFileErrorCode(zf: zip_file_ptr, isFailure: typing.Callable, res: typing.Union[int, typing.Any], *args) -> typing.Any: + return _checkErrorCodeGeneric(zf, f.file_get_error, f.file_error_clear, isFailure, res, *args) + + +def _checkFileErrorCodeResult(zf: zip_file_ptr, res: typing.Union[int, typing.Any], *args) -> typing.Any: + return _checkFileErrorCode(zf, lambda res: res != 0, res, *args) + + +def _checkFileErrorCodeIO(zf: zip_file_ptr, res: typing.Union[int, typing.Any], *args) -> typing.Any: + return _checkFileErrorCode(zf, lambda res: res < 0, res, *args) + + +def file_strerror(zf: zip_file_ptr) -> c_char_p: + return f.file_strerror(zf) + + +def fclose(zf: zip_file_ptr) -> None: + return _checkFileErrorCodeResult(zf, f.fclose(zf)) + + +fread = ioWrapper(f.fread, _checkFileErrorCodeIO, zip_file_ptr) + + +def fseek(zf: zip_file_ptr, offset: c_int64, whence: c_int) -> c_int8: + return _checkFileErrorCodeIO(zf, f.fseek(zf, offset, whence)) + + +def file_is_seekable(zf: zip_file_ptr) -> bool: + return bool(int(f.file_is_seekable(zf))) + + +def ftell(zf: zip_file_ptr) -> c_int64: + return _checkFileErrorCodeIO(zf, f.ftell(zf)) diff --git a/libzip/File/middleLevel.py b/libzip/File/middleLevel.py new file mode 100644 index 0000000..a2be6f0 --- /dev/null +++ b/libzip/File/middleLevel.py @@ -0,0 +1,195 @@ +import typing +from ctypes import POINTER, byref, c_byte, c_char_p, c_int32, c_uint8, c_uint16, c_uint32, cast +from datetime import date, datetime, time + +from ..ctypes import functions as f +from ..ctypes._inttypes import zip_index +from ..ctypes.opaque import zip_ptr, zip_source_ptr +from ..ctypes.structs import time_t +from ..enums import OS, CompressionMethod, EncryptionMethod, ZipFlags +from ..Error import _checkArchiveErrorCode, _checkArchiveErrorCodeResult +from ..Source import Source, _Source +from ..Stat import zip_flags +from ..utils import PathT, acceptPathOrStrOrBytes, acceptStrOrBytes, toPurePathOrStrOrBytes +from ..utils.dosTime import dateToDosDateInt, timeToDosTimeInt + +__all__ = ("file_get_comment", "ExternalAttrs", "file_get_external_attributes", "file_set_external_attributes", "file_rename", "file_add", "file_replace", "file_set_comment", "file_set_dostime", "file_set_encryption", "file_set_mtime", "dir_add", "name_locate", "get_name", "set_file_compression", "unchange", "file_idx") + +# pylint:disable=too-many-function-args + + +class file_idx(int): + pass + + +class ExternalAttrs: + __slots__ = ("_os", "_attrs") + + def __init__(self, os: OS, attrs: int): + self._os = OS(os) + self._attrs = attrs + + @property + def os(self): + return self._os + + @property + def attrs(self): + return self._attrs + + def toTuple(self): + return tuple(self) + + def __iter__(self): + yield self.os + yield self.attrs + + def __eq__(self, other): + return self.toTuple() == other.toTuple() + + def __hash__(self): + return hash(self.toTuple()) + + def __repr__(self): + return self.__class__.__name__ + repr(self.toTuple()) + + +EXTERNAL_ATTRS_FLAGS = ZipFlags(0) + + +def file_get_external_attributes(za: zip_ptr, idx: file_idx, flags: ZipFlags) -> ExternalAttrs: + os = c_uint8(0) + attrs = c_uint32(0) + + _checkArchiveErrorCode(za, f.file_get_external_attributes(za, zip_index(idx), zip_flags(flags), byref(os), byref(attrs))) + + return ExternalAttrs(os.value, attrs.value) + + +def file_set_external_attributes(za: zip_ptr, idx: file_idx, flags: ZipFlags, attrs: ExternalAttrs) -> None: + _checkArchiveErrorCode(za, f.file_set_external_attributes(za, zip_index(idx), flags, c_uint8(attrs.os), attributes=c_uint32(attrs.attrs))) + + +def file_rename(za: zip_ptr, idx: file_idx, name: typing.AnyStr, flags: ZipFlags) -> None: + _checkArchiveErrorCode(za, f.file_rename(za, zip_index(idx), acceptPathOrStrOrBytes(name), zip_flags(flags))) + + +ADD_REPLACE_ARGS = ZipFlags.enc_utf_8 + + +def file_add(za: zip_ptr, name: PathT, source: typing.Union[Source, zip_source_ptr], flags: ZipFlags = ADD_REPLACE_ARGS) -> zip_index: + if isinstance(source, Source): + if source.rs is None: + with source as enteredSource: + return file_add(za, name, enteredSource, flags) + else: + source = source.rs + + if isinstance(source, _Source): + source.recordLeak() + source = source.ptr + + return _checkArchiveErrorCodeResult(za, f.file_add(za, acceptPathOrStrOrBytes(name), source, zip_flags(flags))) + + +def file_replace(za: zip_ptr, idx: file_idx, source: typing.Union[Source, zip_source_ptr], flags: ZipFlags = ADD_REPLACE_ARGS) -> None: + if isinstance(source, Source): + if source.rs is None: + with source as enteredSource: + return file_replace(za, idx, enteredSource, flags) + else: + source = source.rs + + if isinstance(source, _Source): + source.recordLeak() + source = source.ptr + + _checkArchiveErrorCode(za, f.file_replace(za, zip_index(idx), source, zip_flags(flags))) + return None + + +COMMENT_FLAGS = ZipFlags.enc_raw + + +def file_get_comment(za: zip_ptr, idx: file_idx, flags: ZipFlags = COMMENT_FLAGS) -> typing.Optional[bytes]: + size = c_uint32(0) + resPtr = f.file_get_comment(za, zip_index(idx), byref(size), zip_flags(int(flags))) + + if resPtr: + bufT = c_byte * size.value + bufPtrT = POINTER(bufT) + return bytes(cast(resPtr, bufPtrT)[0]) + + return None + + +def file_set_comment(za: zip_ptr, idx: file_idx, comment: typing.AnyStr, flags: ZipFlags = COMMENT_FLAGS) -> None: + comment = acceptStrOrBytes(comment) + _checkArchiveErrorCode(za, f.file_set_comment(za, zip_index(idx), c_char_p(comment), c_uint16(len(comment)), zip_flags(int(flags)))) + + +def file_set_dostime(za: zip_ptr, idx: file_idx, dosTime: typing.Optional[typing.Union[time, int]] = None, dosDate: typing.Optional[typing.Union[date, int]] = None, flags: ZipFlags = 0) -> None: + if bool(int(flags)): + raise NotImplementedError("Flags are not implemented yet in the underlying lib. If they got implemented, notify us.") + + if not isinstance(dosTime, int): + dosTime = timeToDosTimeInt(dosTime) + + if not isinstance(dosDate, int): + dosDate = dateToDosDateInt(dosDate) + + _checkArchiveErrorCode(za, f.file_set_dostime(za, zip_index(idx), c_uint16(dosTime), c_uint16(dosDate), zip_flags(flags))) + + +def file_set_encryption(za: zip_ptr, idx: file_idx, method: EncryptionMethod, password: typing.AnyStr) -> None: + _checkArchiveErrorCode(za, f.file_set_encryption(za, zip_index(idx), c_uint16(method), c_char_p(acceptStrOrBytes(password)))) + + +MOD_TIME_FLAGS = ZipFlags(0) + + +def file_set_mtime(za: zip_ptr, idx: file_idx, mtime: typing.Union[int, datetime], flags: ZipFlags = MOD_TIME_FLAGS) -> None: + if flags: + raise NotImplementedError("Flags are not implemented.") + + if not isinstance(mtime, int): + mtime = int(mtime.timestamp()) + + _checkArchiveErrorCode(za, f.file_set_mtime(za, zip_index(idx), time_t(mtime), flags)) + + +def dir_add(za: zip_ptr, name: PathT, flags: ZipFlags) -> zip_index: + return _checkArchiveErrorCode(za, f.dir_add(za, acceptPathOrStrOrBytes(name), zip_flags(flags))) + + +def delete(za: zip_ptr, idx: file_idx) -> None: + _checkArchiveErrorCode(za, f.delete(za, zip_index(idx))) + + +NAME_FLAGS = ZipFlags(0) +LOCATE_FLAGS = ZipFlags(0) + + +def name_locate(za: zip_ptr, fname: PathT, flags: ZipFlags = LOCATE_FLAGS) -> typing.Optional[zip_index]: + res = f.name_locate(za, acceptPathOrStrOrBytes(fname), zip_flags(flags)) + if res < 0: + res = None + return res + + +def get_name(za: zip_ptr, idx: file_idx, flags: ZipFlags) -> PathT: + return toPurePathOrStrOrBytes(_checkArchiveErrorCode(za, f.get_name(za, zip_index(idx), zip_flags(flags)))) + + +_ALLOWED_LEVEL_RANGE = range(0, 10) # 10 is excluded + + +def set_file_compression(za: zip_ptr, idx: file_idx, method: CompressionMethod = CompressionMethod.lzma2, level: int = 9) -> None: + if level not in _ALLOWED_LEVEL_RANGE: + raise ValueError("Incorrect compression level", level, _ALLOWED_LEVEL_RANGE) + + _checkArchiveErrorCode(za, f.set_file_compression(za, zip_index(idx), c_int32(method), c_uint32(level))) + + +def unchange(za: zip_ptr, idx: file_idx) -> None: + _checkArchiveErrorCode(za, f.unchange(za, zip_index(idx))) diff --git a/libzip/Source/Cursor.py b/libzip/Source/Cursor.py new file mode 100644 index 0000000..0b9c4ce --- /dev/null +++ b/libzip/Source/Cursor.py @@ -0,0 +1,96 @@ +import typing +from collections.abc import ByteString +from ctypes import c_int64, c_void_p + +__all__ = ("Cursor", "IReadCursor", "IWriteCursor") + + +TELL_FUNC_T = typing.Callable[[c_void_p], int] +SEEK_FUNC_T = typing.Callable[[c_void_p, int, int], None] + + +class Cursor: + __slots__ = ("parent",) + + def __init__(self, parent: "Openable") -> None: + self.parent = parent + + TELL_FUNC = None # type: TELL_FUNC_T + SEEK_FUNC = None # type: SEEK_FUNC_T + + def tell(self): + return self.__class__.TELL_FUNC(self.parent.ptr) + + def seek(self, offset: int, whence: int): + # todo: somehow `source_seek_compute_offset` must be used + return self.__class__.SEEK_FUNC(self.parent.ptr, offset, whence) + + +READ_FUNC_T = typing.Callable[[c_void_p, int], int] + + +class IReadCursor(Cursor): + __slots__ = () + + READ_FUNC = None # type: READ_FUNC_T + + def read(self, buff: typing.Union[bytearray, int]) -> ByteString: + if isinstance(buff, int): + buff = bytearray(buff) + + read = self.__class__.READ_FUNC(self.parent.ptr, buff) + return read + + +class IWriteCursor(Cursor): + __slots__ = () + + WRITE_FUNC = None + + def write(self, data: ByteString) -> c_int64: + return self.__class__.WRITE_FUNC(self.parent.ptr, data) + + +class Openable: + __slots__ = ("ptr", "reader", "writer") + + READ_CURSOR_TYPE = None # type: IReadCursor + WRITE_CURSOR_TYPE = None # type: IWriteCursor + + def __repr__(self): + return self.__class__.__name__ + "(" + hex(self.ptr) + ") " + + def _open(self): + raise NotImplementedError + + def _close(self): + raise NotImplementedError + + def __init__(self, ptr: c_void_p) -> None: + self.ptr = ptr + self.reader = None + self.writer = None + + def __enter__(self) -> "Openable": + if not self.reader: + self._open() + self.reader = self.__class__.READ_CURSOR_TYPE(self) + wct = self.__class__.WRITE_CURSOR_TYPE + if wct: + self.writer = wct(self) + else: + raise RuntimeError("Multiple entering of the same Openable", self) + + return self + + def __exit__(self, ex_type, ex_value, traceback) -> None: + if self.reader is not None: + self.reader = None + self.writer = None + self._close() + + def write(self, data: ByteString) -> c_int64: + return self.writer.write(data) + + def read(self, buff: typing.Union[bytearray, int]) -> bytearray: + return self.reader.read(buff) diff --git a/libzip/Source/__init__.py b/libzip/Source/__init__.py new file mode 100644 index 0000000..79b2fe3 --- /dev/null +++ b/libzip/Source/__init__.py @@ -0,0 +1,156 @@ +import platform +import traceback +import typing +from collections.abc import ByteString +from ctypes import * +from datetime import date, datetime, time +from pathlib import Path + +from ..ctypes import functions as f +from ..ctypes._inttypes import * +from ..ctypes.callbacks import * +from ..ctypes.functions import get_error +from ..ctypes.opaque import * +from ..ctypes.structs import * +from ..enums import * +from ..Error import * +from ..Error import _checkArchiveErrorCode +from ..Stat import * +from ..utils.dosTime import * +from .ctors import * +from .Cursor import IReadCursor, IWriteCursor, Openable +from .middleLevel import source_begin_write, source_begin_write_cloning, source_close, source_commit_write, source_get_file_attributes, source_is_deleted, source_open, source_read, source_stat, source_write + +__all__ = ("Cursor", "ReadCursor", "WriteCursor", "Source") + + +class ReadCursor(IReadCursor): + __slots__ = () + + TELL_FUNC = f.source_tell + SEEK_FUNC = f.source_seek + READ_FUNC = source_read + + +class WriteCursor(IWriteCursor): + __slots__ = () + + TELL_FUNC = f.source_tell_write + SEEK_FUNC = f.source_seek_write + WRITE_FUNC = source_write + + def beginWrite(self): + return source_begin_write(self.parent.ptr) + + def beginWriteCloning(self, offset: c_uint64): + return source_begin_write_cloning(self.parent.ptr, offset) + + def rollback(self) -> None: + return f.source_rollback_write(self.parent.ptr) + + def commit(self): + source_commit_write(self.parent.ptr) + + +# todo: RawIOBase + + +class _Source(Openable): + __slots__ = ("leakSources",) + + READ_CURSOR_TYPE = ReadCursor + WRITE_CURSOR_TYPE = WriteCursor + + def __init__(self, ptr: zip_source_ptr) -> None: + super().__init__(ptr) + self.leakSources = [] + + def recordLeak(self) -> None: + """Docs prohibits calling `zip_source_free` in certain cases.""" + self.leakSources.append(traceback.extract_stack(f=None, limit=None)) + + def incRef(self): + f.source_keep(self.ptr) + + def decRef(self): + if self.leakSources: + raise Exception("Docs prohibits calling `zip_source_free` on the sources used in `zip_open_from_source`, `zip_file_add` and `zip_file_replace` calls.", self.leakSources) + + f.source_free(self.ptr) + + def _open(self): + source_open(self.ptr) + + def _close(self): + source_close(self.ptr) + + def stat(self) -> Stat: + return source_stat(self.ptr) + + def getAttributes(self) -> int: + return source_get_file_attributes(self.ptr) + + @property + def attributes(self): + return source_get_file_attributes(self.ptr) + + @property + def isDeleted(self) -> bool: + return source_is_deleted(self.ptr) + + +class Source: + __slots__ = ("rs", "parent") + + def __init__(self, parent: typing.Optional["Archive"] = None) -> None: + self.__class__.rs.__set__(self, None) # pylint:disable=no-member + self.__class__.parent.__set__(self, parent) # pylint:disable=no-member + + def makeRawSource(self, za: typing.Optional[int]) -> _Source: + raise NotImplementedError + + def __enter__(self) -> _Source: + # pylint:disable=no-member,attribute-defined-outside-init + p = self.parent + self.rs = _Source(self.makeRawSource(p.ptr if p else None)) + return self.rs + + def __exit__(self, ex_type, ex_value, tb) -> None: + # pylint:disable=attribute-defined-outside-init + self.rs = None + + @classmethod + def make(cls, source: typing.Union[Path, bytes, bytearray], slc: slice = slice(0, None), parent: typing.Optional["Archive"] = None) -> "Source": + if isinstance(source, Path): + return PathSource(path=source, slc=slc, parent=parent) + + if isinstance(source, ByteString): + if slc.start != 0 and slc.stop not in (None, len(source) - 1): + source = source[slc] + + return BufferSource(buff=source, parent=parent) + + raise ValueError("Not yet supported type") + + +class PathSource(Source): + __slots__ = ("path", "slc") + + def __init__(self, path: Path, slc: slice = slice(0, None), parent=None): + super().__init__(parent=parent) + self.__class__.path.__set__(self, path) # pylint:disable=no-member + self.__class__.slc.__set__(self, slc) # pylint:disable=no-member + + def makeRawSource(self, za: typing.Optional[int]) -> _Source: + return source_file(fname=self.path, slc=self.slc, za=za) # pylint:disable=no-member + + +class BufferSource(Source): + __slots__ = ("buff",) + + def __init__(self, buff: ByteString, parent: typing.Optional["Archive"] = None) -> None: + super().__init__(parent=parent) + self.__class__.buff.__set__(self, buff) # pylint:disable=no-member + + def makeRawSource(self, za: typing.Optional[int]) -> _Source: + return source_buffer(data=self.buff, za=za) # pylint:disable=no-member diff --git a/libzip/Source/abstractIO.py b/libzip/Source/abstractIO.py new file mode 100644 index 0000000..20a8c7d --- /dev/null +++ b/libzip/Source/abstractIO.py @@ -0,0 +1,16 @@ +import typing +from ctypes import c_int64, c_uint64, c_void_p + +from ..ctypes.utils import byteStringToPointer + +__all__ = ("ioWrapper",) + + +def ioWrapper(readWriteFunc: typing.Callable, errorCodeFunc: typing.Callable, descriptorType: typing.Type[c_void_p]) -> typing.Callable: + def wrapper(src: descriptorType, data: bytearray) -> c_int64: + buf, size = byteStringToPointer(data) + return errorCodeFunc(src, readWriteFunc(src, buf, c_uint64(size))) + + wrapper.__name__ = readWriteFunc.__name__ + + return wrapper diff --git a/libzip/Source/ctors.py b/libzip/Source/ctors.py new file mode 100644 index 0000000..734b24a --- /dev/null +++ b/libzip/Source/ctors.py @@ -0,0 +1,106 @@ +import typing +from collections.abc import ByteString +from ctypes import POINTER, byref, c_char_p, c_int, c_int64, c_uint64, c_void_p + +from ..ctypes import functions as f +from ..ctypes.callbacks import zip_source_callback +from ..ctypes.opaque import FILE_P, zip_ptr, zip_source_ptr +from ..ctypes.structs import zip_buffer_fragment +from ..ctypes.utils import byteStringToPointer +from ..Error import DecodedErrorStruct, LibZipError, _checkArchiveErrorCodeResult +from ..utils import PathT, acceptPathOrStrOrBytes + +__all__ = ("source_file", "source_buffer_fragment", "source_buffer", "source_filep", "source_function", "source_zip") + + +def _source_file(za: zip_ptr, fname: PathT, start: int, length: int) -> zip_source_ptr: + return _checkArchiveErrorCodeResult(za, f.source_file(za, c_char_p(acceptPathOrStrOrBytes(fname)), c_uint64(start), c_int64(length))) + + +def _source_file_create(fname: PathT, start: int, length: int) -> zip_source_ptr: + err = DecodedErrorStruct() + res = f.source_file_create(c_char_p(acceptPathOrStrOrBytes(fname)), c_uint64(start), c_int64(length), byref(err.err)) + if err: + raise LibZipError(*err) + return res + + +def source_file(fname: PathT, slc: slice = slice(0, None), za: typing.Optional[zip_ptr] = None) -> zip_source_ptr: + start = slc.start + if slc.stop is not None: + length = slc.stop - slc.start + else: + length = -1 + + return _source_file(za, fname, start, length) if za is not None else _source_file_create(fname, start, length) + + +def _source_buffer_fragment(za: zip_ptr, fragments: POINTER(zip_buffer_fragment), nfragments: c_uint64, freep: c_int) -> zip_source_ptr: + return _checkArchiveErrorCodeResult(za, f.source_buffer_fragment(za, fragments, nfragments, freep)) + + +def _source_buffer_fragment_create(fragments: POINTER(zip_buffer_fragment), nfragments: c_uint64, freep: c_int) -> zip_source_ptr: + err = DecodedErrorStruct() + res = f.source_buffer_fragment_create(fragments, nfragments, freep, byref(err.err)) + if err: + raise LibZipError(*err) + return res + + +def source_buffer_fragment(fragments: POINTER(zip_buffer_fragment), nfragments: c_uint64, freep: c_int, za: typing.Optional[zip_ptr] = None) -> zip_source_ptr: + return _source_buffer_fragment(za, fragments, nfragments, freep) if za is not None else _source_buffer_fragment_create(fragments, nfragments, freep) + + +def _source_buffer(za: zip_ptr, data: ByteString, freep: bool = False) -> zip_source_ptr: + buf, size = byteStringToPointer(data) + return _checkArchiveErrorCodeResult(za, f.source_buffer(za, buf, c_uint64(size), c_int(int(bool(freep))))) + + +def _source_buffer_create(data: ByteString, freep: bool = False) -> zip_source_ptr: + err = DecodedErrorStruct() + buf, size = byteStringToPointer(data) + res = f.source_buffer_create(buf, c_uint64(size), c_int(int(bool(freep))), byref(err.err)) + if err: + raise LibZipError(*err) + return res + + +def source_buffer(data: ByteString, za: typing.Optional[zip_ptr] = None) -> zip_source_ptr: + """`freep` parameter was removed intentionally, the lib frees the buffer when it is True, but in our case it is Python that manages the memory. Use the specialized lower-level functions yourself if you need to use that parameter, for example, to free memory allocated using malloc""" + + return _source_buffer(za, data) if za is not None else _source_buffer_create(data) + + +def _source_filep(za: zip_ptr, file: FILE_P, start: c_uint64, length: c_int64) -> zip_source_ptr: + return _checkArchiveErrorCodeResult(za, f.source_filep(za, file, start, length)) + + +def _source_filep_create(file: FILE_P, start: c_uint64, length: c_int64) -> zip_source_ptr: + err = DecodedErrorStruct() + res = f.source_filep_create(file, start, length, byref(err.err)) + if err: + raise LibZipError(*err) + return res + + +def source_filep(file: FILE_P, start: c_uint64, length: c_int64, za: typing.Optional[zip_ptr] = None) -> zip_source_ptr: + return _source_filep(za, file, start, length) if za is not None else _source_filep_create(file, start, length) + + +def _source_function(za: zip_ptr, zcb: zip_source_callback, ud: c_void_p) -> zip_source_ptr: + return _checkArchiveErrorCodeResult(za, f.source_function(za, zcb, ud)) + + +def _source_function_create(zcb: zip_source_callback, ud: c_void_p) -> zip_source_ptr: + err = DecodedErrorStruct() + res = f.source_function_create(zcb, ud, byref(err.err)) + if err: + raise LibZipError(*err) + return res + + +def source_function(zcb: zip_source_callback, ud: c_void_p, za: typing.Optional[zip_ptr] = None) -> zip_source_ptr: + return _source_function(za, zcb, ud) if za is not None else _source_function_create(zcb, ud) + + +source_zip = f.source_zip diff --git a/libzip/Source/middleLevel.py b/libzip/Source/middleLevel.py new file mode 100644 index 0000000..f405242 --- /dev/null +++ b/libzip/Source/middleLevel.py @@ -0,0 +1,82 @@ +import typing +from ctypes import byref, c_int, c_int64, c_uint64, c_void_p + +from ..ctypes import functions as f +from ..ctypes.opaque import zip_source_ptr +from ..ctypes.structs import zip_file_attributes +from ..Error import LibZipError, _checkErrorCodeGeneric, zip_error +from ..Stat import Stat, zip_stat +from .abstractIO import ioWrapper + +__all__ = ("source_begin_write", "source_begin_write_cloning", "source_close", "source_commit_write", "file_attributes_init", "source_get_file_attributes", "source_is_deleted", "source_open", "source_seek_compute_offset", "source_stat", "source_read", "source_write") + + +def _checkSourceErrorCode(src: zip_source_ptr, isFailure, res: typing.Union[int, typing.Any], *args) -> typing.Any: + return _checkErrorCodeGeneric(src, f.source_error, None, isFailure, res=res, *args) + + +def _checkFileErrorCodeResult(src: zip_source_ptr, res: typing.Union[int, typing.Any], *args) -> typing.Any: + return _checkSourceErrorCode(src, lambda res: res != 0, res=res, *args) + + +def _checkFileErrorCodeIO(src: zip_source_ptr, res: typing.Union[int, typing.Any], *args) -> typing.Any: + return _checkSourceErrorCode(src, lambda res: res < 0, res=res, *args) + + +source_read = ioWrapper(f.source_read, _checkFileErrorCodeIO, zip_source_ptr) +source_write = ioWrapper(f.source_write, _checkFileErrorCodeIO, zip_source_ptr) + + +def source_begin_write(src: zip_source_ptr) -> None: + _checkFileErrorCodeResult(src, f.source_begin_write(src)) + + +def source_begin_write_cloning(src: zip_source_ptr, offset: c_uint64) -> None: + _checkFileErrorCodeResult(src, f.source_begin_write_cloning(src, offset)) + + +def source_close(src: zip_source_ptr) -> None: + _checkFileErrorCodeResult(src, f.source_close(src)) + + +def source_commit_write(src: zip_source_ptr) -> None: + _checkFileErrorCodeResult(src, f.source_commit_write(src)) + + +def file_attributes_init() -> zip_file_attributes: + res = zip_file_attributes() + f.file_attributes_init(byref(res)) + return res + + +def source_get_file_attributes(src: zip_source_ptr) -> c_int: + attrs = zip_file_attributes() + _checkFileErrorCodeResult(src, f.source_get_file_attributes(src, byref(attrs))) + return attrs + + +def source_is_deleted(src: zip_source_ptr) -> bool: + return bool(f.source_is_deleted(src)) + + +# def source_make_command_bitmap(): +# l.zip_source_make_command_bitmap() +# = _variadic_function(, c_int64, [zip_source_cmd], None) + + +def source_open(src: zip_source_ptr) -> None: + _checkFileErrorCodeResult(src, f.source_open(src)) + + +def source_seek_compute_offset(offset: c_uint64, length: c_uint64, data: c_void_p, data_length: c_uint64) -> c_int64: + err = zip_error(0) + res = f.source_seek_compute_offset(offset, length, data, data_length, byref(err)) + if err.value: + raise LibZipError(err.value) + return res + + +def source_stat(src: zip_source_ptr) -> Stat: + res = zip_stat() + _checkFileErrorCodeResult(src, f.source_stat(src, byref(res))) + return Stat(res) diff --git a/libzip/Stat.py b/libzip/Stat.py new file mode 100644 index 0000000..85887e1 --- /dev/null +++ b/libzip/Stat.py @@ -0,0 +1,166 @@ +import typing +from collections.abc import ByteString +from ctypes import POINTER, byref, c_int, c_uint64 +from datetime import datetime +from pathlib import PurePath + +try: + from enum import _decompose +except ImportError: + + def _decompose(cls, enum): + return tuple(self.what) + + +from .ctypes import functions as f +from .ctypes._inttypes import zip_flags +from .ctypes.opaque import zip_ptr +from .ctypes.structs import zip_stat +from .enums import CompressionMethod, EncryptionMethod, ZipStat +from .Error import _checkArchiveErrorCode +from .utils import PathT, acceptPathOrStrOrBytes + + +def stat(za: zip_ptr, fileNameOrIndex: typing.Union[PathT, int], what: ZipStat = ZipStat.everything) -> c_int: + res = zip_stat() + + if isinstance(fileNameOrIndex, int): + err = f.stat_index(za, c_uint64(fileNameOrIndex), zip_flags(int(what)), byref(res)) + elif isinstance(fileNameOrIndex, (PurePath, ByteString, str)): + err = f.stat(za, acceptPathOrStrOrBytes(fileNameOrIndex), zip_flags(int(what)), byref(res)) + else: + raise TypeError("fileNameOrIndex", "Must be either a file name or its index") + + _checkArchiveErrorCode(za, err, fileNameOrIndex) + + return Stat(res) + + +def stat_init(st: POINTER(zip_stat)) -> None: + return f.stat_init(st) + + +class SubscriptableDateTime(datetime): + __slots__ = () + + INT2ATTRMAP = ("year", "month", "day", "hour", "minute", "second") + + def __getitem__(self, k) -> typing.Union[int, typing.Tuple[int, ...]]: + k = self.__class__.INT2ATTRMAP[k] + if isinstance(k, int): + return getattr(self, k) + + return tuple(getattr(self, el) for el in k) + + +class Stat: + __slots__ = ("res",) + + @property + def name(self): + if self.what & ZipStat.name: + return self.res.name + return None + + @property + def index(self): + if self.what & ZipStat.index: + return self.res.index + return None + + @index.setter + def index(self, v): + self.res.index = v + self.what |= ZipStat.index + + @property + def originalSize(self): + if self.what & ZipStat.originalSize: + return self.res.size + return None + + @originalSize.setter + def originalSize(self, v): + self.res.size = v + self.what |= ZipStat.originalSize + + @property + def compressedSize(self): + if self.what & ZipStat.compressedSize: + return self.res.comp_size + return None + + @compressedSize.setter + def compressedSize(self, v): + self.res.comp_size = v + self.what |= ZipStat.compressedSize + + @property + def modificationTime(self): + if self.what & ZipStat.modificationTime: + return SubscriptableDateTime.fromtimestamp(self.res.mtime) + return None + + @modificationTime.setter + def modificationTime(self, v): + self.res.mtime = v + self.what |= ZipStat.modificationTime + + @property + def crc(self): + if self.what & ZipStat.crc: + return self.res.crc + return None + + @crc.setter + def crc(self, v): + self.res.crc = v + self.what |= ZipStat.crc + + @property + def compressionMethod(self): + if self.what & ZipStat.compressionMethod: + return CompressionMethod(self.res.comp_method) + return None + + @compressionMethod.setter + def compressionMethod(self, v): + self.res.comp_method = v + self.what |= ZipStat.compressionMethod + + @property + def encryptionMethod(self): + if self.what & ZipStat.encryptionMethod: + return EncryptionMethod(self.res.encryption_method) + return None + + @encryptionMethod.setter + def encryptionMethod(self, v): + self.res.encryption_method = v + self.what |= ZipStat.encryptionMethod + + @property + def flags(self): + if self.what & ZipStat.flags: + return self.res.flags + return None + + @flags.setter + def flags(self, v): + self.res.flags = v + self.what |= ZipStat.flags + + @property + def what(self) -> ZipStat: + return ZipStat(self.res.valid) + + @what.setter + def what(self, v: ZipStat): + self.res.valid |= int(v) + + def __init__(self, res: zip_stat) -> None: + self.res = res + + def __repr__(self): + members, _uncovered = _decompose(self.what.__class__, self.what) + return self.__class__.__name__ + "<" + ", ".join((el.name + "=" + repr(getattr(self, el.name))) for el in sorted(members)) + ">" diff --git a/libzip/__init__.py b/libzip/__init__.py new file mode 100644 index 0000000..3531ed5 --- /dev/null +++ b/libzip/__init__.py @@ -0,0 +1,11 @@ +import ctypes.util +import glob +import os.path +import platform +import re + +from .enums import * + +ZIP_UINT16_MAX = 0xFFFF +ZIP_EXTRA_FIELD_ALL = ZIP_UINT16_MAX +ZIP_EXTRA_FIELD_NEW = ZIP_UINT16_MAX diff --git a/libzip/ctypes/__init__.py b/libzip/ctypes/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/libzip/ctypes/_ctypesgen_preamble.py b/libzip/ctypes/_ctypesgen_preamble.py new file mode 100644 index 0000000..7bf545a --- /dev/null +++ b/libzip/ctypes/_ctypesgen_preamble.py @@ -0,0 +1,25 @@ +# pylint:disable=too-few-public-methods + +__all__ = ("_variadic_function",) + + +class _variadic_function: + def __init__(self, func, restype, argtypes, errcheck): + self.func = func + self.func.restype = restype + self.argtypes = argtypes + if errcheck: + self.func.errcheck = errcheck + + def _as_parameter_(self): + # So we can pass this variadic function as a function pointer + return self.func + + def __call__(self, *args): + fixed_args = [] + i = 0 + for argtype in self.argtypes: + # Type check what we can + fixed_args.append(argtype.from_param(args[i])) + i += 1 + return self.func(*fixed_args + list(args[i:])) diff --git a/libzip/ctypes/_funcToCtypesSignatureConvertor.py b/libzip/ctypes/_funcToCtypesSignatureConvertor.py new file mode 100644 index 0000000..faa3bc5 --- /dev/null +++ b/libzip/ctypes/_funcToCtypesSignatureConvertor.py @@ -0,0 +1,36 @@ +TRACE_NATIVE_CALLS = False + +if TRACE_NATIVE_CALLS: + import sys + + def _traceArgs(name, argNames, args, kwargs, rawFunc): + identifiedArgNames = argNames[: len(args)] + positionalKwargs = dict(zip(identifiedArgNames, args)) + fullKwargs = {} + fullKwargs.update(positionalKwargs) + fullKwargs.update(kwargs) + argsStr = "(" + ", ".join(k + "=" + repr(v) for k, v in fullKwargs.items()) + ")" + callString = name + argsStr + sys.stderr.flush() + sys.stderr.write(callString) + sys.stderr.flush() + res = rawFunc(*args, **kwargs) + sys.stderr.write("\33[2K\r") + sys.stderr.write(callString + " -> " + repr(res) + "\n") + sys.stderr.flush() + return res + + +def assignTypesFromFunctionSignature(func, lib): + rawFunc = getattr(lib, "zip_" + func.__name__) + argNames = func.__code__.co_varnames[: func.__code__.co_argcount] + rawFunc.argtypes = [func.__annotations__[argName] for argName in argNames] + rawFunc.restype = func.__annotations__["return"] + + if TRACE_NATIVE_CALLS: + + def tracerFunc(*args, **kwargs): + return _traceArgs(func.__name__, argNames, args, kwargs, rawFunc) + + return tracerFunc + return rawFunc diff --git a/libzip/ctypes/_inttypes.py b/libzip/ctypes/_inttypes.py new file mode 100644 index 0000000..d7b1418 --- /dev/null +++ b/libzip/ctypes/_inttypes.py @@ -0,0 +1,7 @@ +from ctypes import c_int, c_int64, c_long, c_uint32 + +time_t = c_long + +zip_source_cmd = c_int +zip_flags = c_uint32 +zip_index = c_int64 diff --git a/libzip/ctypes/callbacks.py b/libzip/ctypes/callbacks.py new file mode 100644 index 0000000..530e422 --- /dev/null +++ b/libzip/ctypes/callbacks.py @@ -0,0 +1,9 @@ +from ctypes import CFUNCTYPE, c_double, c_int, c_int64, c_uint64, c_void_p + +from ._inttypes import zip_source_cmd +from .opaque import zip_ptr + +zip_source_callback = CFUNCTYPE(c_int64, c_void_p, c_void_p, c_uint64, zip_source_cmd) +zip_progress_callback = CFUNCTYPE(None, zip_ptr, c_double, c_void_p) +zip_cancel_callback = CFUNCTYPE(c_int, zip_ptr, c_void_p) +zip_progress_callback_t = CFUNCTYPE(None, c_double) diff --git a/libzip/ctypes/functions.py b/libzip/ctypes/functions.py new file mode 100644 index 0000000..4d368c0 --- /dev/null +++ b/libzip/ctypes/functions.py @@ -0,0 +1,737 @@ +from ctypes import CFUNCTYPE, POINTER, c_char_p, c_double, c_int, c_int8, c_int16, c_int32, c_int64, c_uint8, c_uint16, c_uint32, c_uint64, c_void_p + +from deprecation import deprecated + +from ..version import LIB_VERSION +from ._ctypesgen_preamble import _variadic_function +from ._funcToCtypesSignatureConvertor import assignTypesFromFunctionSignature as atffs +from ._inttypes import zip_flags, zip_index, zip_source_cmd +from .callbacks import zip_cancel_callback, zip_progress_callback, zip_source_callback +from .library import lib +from .opaque import FILE_P, zip_file_ptr, zip_ptr, zip_source_ptr +from .structs import time_t, zip_buffer_fragment, zip_error, zip_file_attributes, zip_stat + +# pylint:disable=too-many-arguments + + +def archive_set_tempdir(za: zip_ptr, tempdir: c_char_p) -> c_int: + return _archive_set_tempdir(za, tempdir) + + +_archive_set_tempdir = atffs(archive_set_tempdir, lib) + + +def close(za: zip_ptr) -> c_int: + return _close(za) + + +_close = atffs(close, lib) + + +def delete(za: zip_ptr, idx: zip_index) -> c_int: + return _delete(za, idx) + + +_delete = atffs(delete, lib) + + +def dir_add(za: zip_ptr, name: c_char_p, flags: zip_flags) -> c_int64: + return _dir_add(za, name, flags) + + +_dir_add = atffs(dir_add, lib) + + +def discard(za: zip_ptr) -> None: + return _discard(za) + + +_discard = atffs(discard, lib) + + +def get_error(za: zip_ptr) -> POINTER(zip_error): + return _get_error(za) + + +_get_error = atffs(get_error, lib) + + +def error_clear(za: zip_ptr) -> None: + return _error_clear(za) + + +_error_clear = atffs(error_clear, lib) + + +def error_code_zip(error: POINTER(zip_error)) -> c_int: + return _error_code_zip(error) + + +_error_code_zip = atffs(error_code_zip, lib) + + +def error_code_system(error: POINTER(zip_error)) -> c_int: + return _error_code_system(error) + + +_error_code_system = atffs(error_code_system, lib) + + +def error_fini(err: POINTER(zip_error)) -> None: + return _error_fini(err) + + +_error_fini = atffs(error_fini, lib) + + +def error_init(err: POINTER(zip_error)) -> None: + return _error_init(err) + + +_error_init = atffs(error_init, lib) + + +def error_init_with_code(error: POINTER(zip_error), ze: c_int) -> None: + return _error_init_with_code(error, ze) + + +_error_init_with_code = atffs(error_init_with_code, lib) + + +def error_set(err: POINTER(zip_error), ze: c_int, se: c_int) -> None: + return _error_set(err, ze, se) + + +_error_set = atffs(error_set, lib) + + +def error_strerror(err: POINTER(zip_error)) -> c_char_p: + return _error_strerror(err) + + +_error_strerror = atffs(error_strerror, lib) + + +def error_system_type(error: POINTER(zip_error)) -> c_int: + return _error_system_type(error) + + +_error_system_type = atffs(error_system_type, lib) + + +def error_to_data(error: POINTER(zip_error), data: c_void_p, length: c_uint64) -> c_int64: + return _error_to_data(error, data, length) + + +_error_to_data = atffs(error_to_data, lib) + + +def fclose(zf: zip_file_ptr) -> c_int: + return _fclose(zf) + + +_fclose = atffs(fclose, lib) + + +def fdopen(fd_orig: c_int, _flags: c_int, errorp: POINTER(c_int)) -> zip_ptr: + return _fdopen(fd_orig, _flags, errorp) + + +_fdopen = atffs(fdopen, lib) + + +def file_add(za: zip_ptr, name: c_char_p, source: zip_source_ptr, flags: zip_flags) -> c_int64: + return _file_add(za, name, source, flags) + + +_file_add = atffs(file_add, lib) + + +def file_attributes_init(attributes: POINTER(zip_file_attributes)) -> None: + return _file_attributes_init(attributes) + + +_file_attributes_init = atffs(file_attributes_init, lib) + + +def file_error_clear(zf: zip_file_ptr) -> None: + return _file_error_clear(zf) + + +_file_error_clear = atffs(file_error_clear, lib) + + +def file_extra_field_delete(za: zip_ptr, idx: zip_index, ef_idx: c_uint16, flags: zip_flags) -> c_int: + return _file_extra_field_delete(za, idx, ef_idx, flags) + + +_file_extra_field_delete = atffs(file_extra_field_delete, lib) + + +def file_extra_field_delete_by_id(za: zip_ptr, idx: zip_index, ef_id: c_uint16, ef_idx: c_uint16, flags: zip_flags) -> c_int: + return _file_extra_field_delete_by_id(za, idx, ef_id, ef_idx, flags) + + +_file_extra_field_delete_by_id = atffs(file_extra_field_delete_by_id, lib) + + +def file_extra_field_set(za: zip_ptr, idx: zip_index, ef_id: c_uint16, ef_idx: c_uint16, data: POINTER(c_uint8), length: c_uint16, flags: zip_flags) -> c_int: + return _file_extra_field_set(za, idx, ef_id, ef_idx, data, length, flags) + + +_file_extra_field_set = atffs(file_extra_field_set, lib) + + +def file_extra_fields_count(za: zip_ptr, idx: zip_index, flags: zip_flags) -> c_int16: + return _file_extra_fields_count(za, idx, flags) + + +_file_extra_fields_count = atffs(file_extra_fields_count, lib) + + +def file_extra_fields_count_by_id(za: zip_ptr, idx: zip_index, ef_id: c_uint16, flags: zip_flags) -> c_int16: + return _file_extra_fields_count_by_id(za, idx, ef_id, flags) + + +_file_extra_fields_count_by_id = atffs(file_extra_fields_count_by_id, lib) + + +def file_extra_field_get(za: zip_ptr, idx: zip_index, ef_idx: c_uint16, idp: POINTER(c_uint16), lenp: POINTER(c_uint16), flags: zip_flags) -> POINTER(c_uint8): + return _file_extra_field_get(za, idx, ef_idx, idp, lenp, flags) + + +_file_extra_field_get = atffs(file_extra_field_get, lib) + + +def file_extra_field_get_by_id(za: zip_ptr, idx: zip_index, ef_id: c_uint16, ef_idx: c_uint16, lenp: POINTER(c_uint16), flags: zip_flags) -> POINTER(c_uint8): + return _file_extra_field_get_by_id(za, idx, ef_id, ef_idx, lenp, flags) + + +_file_extra_field_get_by_id = atffs(file_extra_field_get_by_id, lib) + + +def file_get_comment(za: zip_ptr, idx: zip_index, lenp: POINTER(c_uint32), flags: zip_flags) -> c_char_p: + return _file_get_comment(za, idx, lenp, flags) + + +_file_get_comment = atffs(file_get_comment, lib) + + +def file_get_error(f: zip_file_ptr) -> POINTER(zip_error): + return _file_get_error(f) + + +_file_get_error = atffs(file_get_error, lib) + + +def file_get_external_attributes(za: zip_ptr, idx: zip_index, flags: zip_flags, opsys: POINTER(c_uint8), attributes: POINTER(c_uint32)) -> c_int: + return _file_get_external_attributes(za, idx, flags, opsys, attributes) + + +_file_get_external_attributes = atffs(file_get_external_attributes, lib) + + +def file_rename(za: zip_ptr, idx: zip_index, name: c_char_p, flags: zip_flags) -> c_int: + return _file_rename(za, idx, name, flags) + + +_file_rename = atffs(file_rename, lib) + + +def file_replace(za: zip_ptr, idx: zip_index, source: zip_source_ptr, flags: zip_flags) -> c_int: + return _file_replace(za, idx, source, flags) + + +_file_replace = atffs(file_replace, lib) + + +def file_set_comment(za: zip_ptr, idx: zip_index, comment: c_char_p, length: c_uint16, flags: zip_flags) -> c_int: + return _file_set_comment(za, idx, comment, length, flags) + + +_file_set_comment = atffs(file_set_comment, lib) + + +def file_set_dostime(za: zip_ptr, idx: zip_index, dtime: c_uint16, ddate: c_uint16, flags: zip_flags) -> c_int: + return _file_set_dostime(za, idx, dtime, ddate, flags) + + +_file_set_dostime = atffs(file_set_dostime, lib) + + +def file_set_encryption(za: zip_ptr, idx: zip_index, method: c_uint16, password: c_char_p) -> c_int: + return _file_set_encryption(za, idx, method, password) + + +_file_set_encryption = atffs(file_set_encryption, lib) + + +def file_set_external_attributes(za: zip_ptr, idx: zip_index, flags: zip_flags, opsys: c_uint8, attributes: c_uint32) -> c_int: + return _file_set_external_attributes(za, idx, flags, opsys, attributes) + + +_file_set_external_attributes = atffs(file_set_external_attributes, lib) + + +def file_set_mtime(za: zip_ptr, idx: zip_index, mtime: time_t, flags: zip_flags) -> c_int: + return _file_set_mtime(za, idx, mtime, flags) + + +_file_set_mtime = atffs(file_set_mtime, lib) + + +def file_strerror(zf: zip_file_ptr) -> c_char_p: + return _file_strerror(zf) + + +_file_strerror = atffs(file_strerror, lib) + + +def fopen(za: zip_ptr, fname: c_char_p, flags: zip_flags) -> zip_file_ptr: + return _fopen(za, fname, flags) + + +_fopen = atffs(fopen, lib) +fopen = deprecated(deprecated_in=None, removed_in=None, current_version=None, details="fopen_index")(fopen) + + +def fopen_encrypted(za: zip_ptr, fname: c_char_p, flags: zip_flags, password: c_char_p) -> zip_file_ptr: + return _fopen_encrypted(za, fname, flags, password) + + +_fopen_encrypted = atffs(fopen_encrypted, lib) +fopen_encrypted = deprecated(deprecated_in=None, removed_in=None, current_version=None, details="fopen_index_encrypted")(fopen_encrypted) + + +def fopen_index(za: zip_ptr, index: c_uint64, flags: zip_flags) -> zip_file_ptr: + return _fopen_index(za, index, flags) + + +_fopen_index = atffs(fopen_index, lib) + + +def fopen_index_encrypted(za: zip_ptr, index: c_uint64, flags: zip_flags, password: c_char_p) -> zip_file_ptr: + return _fopen_index_encrypted(za, index, flags, password) + + +_fopen_index_encrypted = atffs(fopen_index_encrypted, lib) + + +def fread(zf: zip_file_ptr, outbuf: c_void_p, toread: c_uint64) -> c_int64: + return _fread(zf, outbuf, toread) + + +_fread = atffs(fread, lib) + + +def fseek(zf: zip_file_ptr, offset: c_int64, whence: c_int) -> c_int8: + return _fseek(zf, offset, whence) + + +_fseek = atffs(fseek, lib) + + +if LIB_VERSION >= (1, 9, 0): + + def file_is_seekable(zf: zip_file_ptr) -> c_int: + return _file_is_seekable(zf) + + _file_is_seekable = atffs(file_is_seekable, lib) + + +def ftell(zf: zip_file_ptr) -> c_int64: + return _ftell(zf) + + +_ftell = atffs(ftell, lib) + + +def get_archive_comment(za: zip_ptr, lenp: POINTER(c_int), flags: zip_flags) -> c_char_p: + return _get_archive_comment(za, lenp, flags) + + +_get_archive_comment = atffs(get_archive_comment, lib) + + +def get_archive_flag(za: zip_ptr, flag: zip_flags, flags: zip_flags) -> c_int: + return _get_archive_flag(za, flag, flags) + + +_get_archive_flag = atffs(get_archive_flag, lib) + + +def get_name(za: zip_ptr, idx: zip_index, flags: zip_flags) -> c_char_p: + return _get_name(za, idx, flags) + + +_get_name = atffs(get_name, lib) + + +def get_num_entries(za: zip_ptr, flags: zip_flags) -> c_int64: + return _get_num_entries(za, flags) + + +_get_num_entries = atffs(get_num_entries, lib) + + +def name_locate(za: zip_ptr, fname: c_char_p, flags: zip_flags) -> c_int64: + return _name_locate(za, fname, flags) + + +_name_locate = atffs(name_locate, lib) + + +def open(src: c_char_p, flags: c_int, error: POINTER(c_int)) -> zip_ptr: # pylint:disable=redefined-builtin + return _open(src, flags, error) + + +_open = atffs(open, lib) + + +def open_from_source(src: zip_source_ptr, _flags: c_int, error: POINTER(zip_error)) -> zip_ptr: + return _open_from_source(src, _flags, error) + + +_open_from_source = atffs(open_from_source, lib) +userDataFreer = CFUNCTYPE(None, c_void_p) +userDataFreer_p = POINTER(userDataFreer) + + +def register_progress_callback_with_state(za: zip_ptr, precision: c_double, callback: zip_progress_callback, ud_free: userDataFreer_p, ud: c_void_p) -> None: + return _register_progress_callback_with_state(za, precision, callback, ud_free, ud) + + +_register_progress_callback_with_state = atffs(register_progress_callback_with_state, lib) + + +def register_cancel_callback_with_state(za: zip_ptr, callback: zip_cancel_callback, ud_free: userDataFreer_p, ud: c_void_p) -> None: + return _register_cancel_callback_with_state(za, callback, ud_free, ud) + + +_register_cancel_callback_with_state = atffs(register_cancel_callback_with_state, lib) + + +def set_archive_comment(za: zip_ptr, comment: c_char_p, length: c_uint16) -> c_int: + return _set_archive_comment(za, comment, length) + + +_set_archive_comment = atffs(set_archive_comment, lib) + + +def set_archive_flag(za: zip_ptr, flag: zip_flags, value: c_int) -> c_int: + return _set_archive_flag(za, flag, value) + + +_set_archive_flag = atffs(set_archive_flag, lib) + + +def set_default_password(za: zip_ptr, passwd: c_char_p) -> c_int: + return _set_default_password(za, passwd) + + +_set_default_password = atffs(set_default_password, lib) + + +def set_file_compression(za: zip_ptr, idx: zip_index, method: c_int32, flags: c_uint32) -> c_int: + return _set_file_compression(za, idx, method, flags) + + +_set_file_compression = atffs(set_file_compression, lib) + + +def source_begin_write(src: zip_source_ptr) -> c_int: + return _source_begin_write(src) + + +_source_begin_write = atffs(source_begin_write, lib) + + +def source_begin_write_cloning(src: zip_source_ptr, offset: c_uint64) -> c_int: + return _source_begin_write_cloning(src, offset) + + +_source_begin_write_cloning = atffs(source_begin_write_cloning, lib) + + +def source_buffer(za: zip_ptr, data: c_void_p, length: c_uint64, freep: c_int) -> zip_source_ptr: + return _source_buffer(za, data, length, freep) + + +_source_buffer = atffs(source_buffer, lib) +source_buffer = deprecated(deprecated_in=None, removed_in=None, current_version=None, details="source_buffer_create")(source_buffer) + + +def source_buffer_create(data: c_void_p, length: c_uint64, freep: c_int, error: POINTER(zip_error)) -> zip_source_ptr: + return _source_buffer_create(data, length, freep, error) + + +_source_buffer_create = atffs(source_buffer_create, lib) + + +def source_buffer_fragment(za: zip_ptr, fragments: POINTER(zip_buffer_fragment), nfragments: c_uint64, freep: c_int) -> zip_source_ptr: + return _source_buffer_fragment(za, fragments, nfragments, freep) + + +_source_buffer_fragment = atffs(source_buffer_fragment, lib) +source_buffer_fragment = deprecated(deprecated_in=None, removed_in=None, current_version=None, details="source_buffer_fragment_create")(source_buffer_fragment) + + +def source_buffer_fragment_create(fragments: POINTER(zip_buffer_fragment), nfragments: c_uint64, freep: c_int, error: POINTER(zip_error)) -> zip_source_ptr: + return _source_buffer_fragment_create(fragments, nfragments, freep, error) + + +_source_buffer_fragment_create = atffs(source_buffer_fragment_create, lib) + + +def source_close(src: zip_source_ptr) -> c_int: + return _source_close(src) + + +_source_close = atffs(source_close, lib) + + +def source_commit_write(src: zip_source_ptr) -> c_int: + return _source_commit_write(src) + + +_source_commit_write = atffs(source_commit_write, lib) + + +def source_error(src: zip_source_ptr) -> POINTER(zip_error): + return _source_error(src) + + +_source_error = atffs(source_error, lib) + + +def source_file(za: zip_ptr, fname: c_char_p, start: c_uint64, length: c_int64) -> zip_source_ptr: + return _source_file(za, fname, start, length) + + +_source_file = atffs(source_file, lib) +source_file = deprecated(deprecated_in=None, removed_in=None, current_version=None, details="source_file_create")(source_file) + + +def source_file_create(fname: c_char_p, start: c_uint64, length: c_int64, error: POINTER(zip_error)) -> zip_source_ptr: + return _source_file_create(fname, start, length, error) + + +_source_file_create = atffs(source_file_create, lib) + + +def source_filep(za: zip_ptr, file: FILE_P, start: c_uint64, length: c_int64) -> zip_source_ptr: + return _source_filep(za, file, start, length) + + +_source_filep = atffs(source_filep, lib) +source_filep = deprecated(deprecated_in=None, removed_in=None, current_version=None, details="source_filep_create")(source_filep) + + +def source_filep_create(file: FILE_P, start: c_uint64, length: c_int64, error: POINTER(zip_error)) -> zip_source_ptr: + return _source_filep_create(file, start, length, error) + + +_source_filep_create = atffs(source_filep_create, lib) + + +def source_free(src: zip_source_ptr) -> None: + return _source_free(src) + + +_source_free = atffs(source_free, lib) + + +def source_function(za: zip_ptr, zcb: zip_source_callback, ud: c_void_p) -> zip_source_ptr: + return _source_function(za, zcb, ud) + + +_source_function = atffs(source_function, lib) +source_function = deprecated(deprecated_in=None, removed_in=None, current_version=None, details="source_function_create")(source_function) + + +def source_function_create(zcb: zip_source_callback, ud: c_void_p, error: POINTER(zip_error)) -> zip_source_ptr: + return _source_function_create(zcb, ud, error) + + +_source_function_create = atffs(source_function_create, lib) + + +def source_get_file_attributes(src: zip_source_ptr, attributes: POINTER(zip_file_attributes)) -> c_int: + return _source_get_file_attributes(src, attributes) + + +_source_get_file_attributes = atffs(source_get_file_attributes, lib) + + +def source_is_deleted(src: zip_source_ptr) -> c_int: + return _source_is_deleted(src) + + +_source_is_deleted = atffs(source_is_deleted, lib) + + +def source_keep(src: zip_source_ptr) -> None: + return _source_keep(src) + + +_source_keep = atffs(source_keep, lib) + + +_source_make_command_bitmap = _variadic_function(lib.zip_source_make_command_bitmap, c_int64, [zip_source_cmd], None) + + +def source_open(src: zip_source_ptr) -> c_int: + return _source_open(src) + + +_source_open = atffs(source_open, lib) + + +def source_read(src: zip_source_ptr, data: c_void_p, length: c_uint64) -> c_int64: + return _source_read(src, data, length) + + +_source_read = atffs(source_read, lib) + + +def source_rollback_write(src: zip_source_ptr) -> None: + return _source_rollback_write(src) + + +_source_rollback_write = atffs(source_rollback_write, lib) + + +def source_seek(src: zip_source_ptr, offset: c_int64, whence: c_int) -> c_int: + return _source_seek(src, offset, whence) + + +_source_seek = atffs(source_seek, lib) + + +def source_seek_compute_offset(offset: c_uint64, length: c_uint64, data: c_void_p, data_length: c_uint64, error: POINTER(zip_error)) -> c_int64: + return _source_seek_compute_offset(offset, length, data, data_length, error) + + +_source_seek_compute_offset = atffs(source_seek_compute_offset, lib) + + +def source_seek_write(src: zip_source_ptr, offset: c_int64, whence: c_int) -> c_int: + return _source_seek_write(src, offset, whence) + + +_source_seek_write = atffs(source_seek_write, lib) + + +def source_stat(src: zip_source_ptr, st: POINTER(zip_stat)) -> c_int: + return _source_stat(src, st) + + +_source_stat = atffs(source_stat, lib) + + +def source_tell(src: zip_source_ptr) -> c_int64: + return _source_tell(src) + + +_source_tell = atffs(source_tell, lib) + + +def source_tell_write(src: zip_source_ptr) -> c_int64: + return _source_tell_write(src) + + +_source_tell_write = atffs(source_tell_write, lib) + + +def source_write(src: zip_source_ptr, data: c_void_p, length: c_uint64) -> c_int64: + return _source_write(src, data, length) + + +_source_write = atffs(source_write, lib) + + +def source_zip(za: zip_ptr, srcza: zip_ptr, srcidx: c_uint64, flags: zip_flags, start: c_uint64, length: c_int64) -> zip_source_ptr: + return _source_zip(za, srcza, srcidx, flags, start, length) + + +_source_zip = atffs(source_zip, lib) + + +def stat(za: zip_ptr, fname: c_char_p, flags: zip_flags, st: POINTER(zip_stat)) -> c_int: + return _stat(za, fname, flags, st) + + +_stat = atffs(stat, lib) + + +def stat_index(za: zip_ptr, index: c_uint64, flags: zip_flags, st: POINTER(zip_stat)) -> c_int: + return _stat_index(za, index, flags, st) + + +_stat_index = atffs(stat_index, lib) + + +def stat_init(st: POINTER(zip_stat)) -> None: + return _stat_init(st) + + +_stat_init = atffs(stat_init, lib) + + +def strerror(za: zip_ptr) -> c_char_p: + return _strerror(za) + + +_strerror = atffs(strerror, lib) + + +def unchange(za: zip_ptr, idx: zip_index) -> c_int: + return _unchange(za, idx) + + +_unchange = atffs(unchange, lib) + + +def unchange_all(za: zip_ptr) -> c_int: + return _unchange_all(za) + + +_unchange_all = atffs(unchange_all, lib) + + +def unchange_archive(za: zip_ptr) -> c_int: + return _unchange_archive(za) + + +_unchange_archive = atffs(unchange_archive, lib) + + +def compression_method_supported(method: c_int32, compress: c_int) -> c_int: + return _compression_method_supported(method, compress) + + +_compression_method_supported = atffs(compression_method_supported, lib) + + +def encryption_method_supported(method: c_uint16, encode: c_int) -> c_int: + return _encryption_method_supported(method, encode) + + +_encryption_method_supported = atffs(encryption_method_supported, lib) + + +# Just to ignore them +_DEPRECATED_API = { + "register_progress_callback": (register_progress_callback_with_state,), + "add": (file_add, file_replace), + "add_dir": (dir_add,), + "get_file_comment": (file_get_comment,), + "set_file_comment": (file_set_comment,), + "get_num_files": (get_num_entries,), + "rename": (file_rename,), + "replace": (file_replace,), + "error_get_sys_type": (error_system_type,), + "error_get": (get_error, error_code_zip, error_code_system), + "error_to_str": (error_init_with_code, error_strerror), + "file_error_get": (file_get_error, error_code_zip, error_code_system), +} diff --git a/libzip/ctypes/library.py b/libzip/ctypes/library.py new file mode 100644 index 0000000..0c5e14e --- /dev/null +++ b/libzip/ctypes/library.py @@ -0,0 +1,16 @@ +import platform +from ctypes import CDLL, c_char_p + +from ._funcToCtypesSignatureConvertor import assignTypesFromFunctionSignature as atffs + +if platform.system() == "Windows": + lib = CDLL("libzip.dll") +else: + lib = CDLL("libzip.so") + + +def libzip_version() -> c_char_p: + return _libzip_version() + + +_libzip_version = atffs(libzip_version, lib) diff --git a/libzip/ctypes/opaque.py b/libzip/ctypes/opaque.py new file mode 100644 index 0000000..2fffcc6 --- /dev/null +++ b/libzip/ctypes/opaque.py @@ -0,0 +1,6 @@ +from ctypes import c_void_p + +zip_ptr = c_void_p +zip_file_ptr = c_void_p +zip_source_ptr = c_void_p +FILE_P = c_void_p diff --git a/libzip/ctypes/structs.py b/libzip/ctypes/structs.py new file mode 100644 index 0000000..8d686db --- /dev/null +++ b/libzip/ctypes/structs.py @@ -0,0 +1,90 @@ +from ctypes import POINTER, Structure, c_char_p, c_int, c_int64, c_uint8, c_uint16, c_uint32, c_uint64 + +from ._inttypes import time_t + +# pylint:disable=too-few-public-methods + + +class zip_source_args_seek(Structure): + __slots__ = ( + "offset", + "whence", + ) + _fields_ = ( + ("offset", c_int64), + ("whence", c_int), + ) + + +class zip_error(Structure): + __slots__ = ( + "zip_err", + "sys_err", + "str", + ) + _fields_ = ( + ("zip_err", c_int), + ("sys_err", c_int), + ("str", c_char_p), + ) + + +class zip_stat(Structure): + __slots__ = ( + "valid", + "name", + "index", + "size", + "comp_size", + "mtime", + "crc", + "comp_method", + "encryption_method", + "flags", + ) + _fields_ = ( + ("valid", c_uint64), + ("name", c_char_p), + ("index", c_uint64), + ("size", c_uint64), + ("comp_size", c_uint64), + ("mtime", time_t), + ("crc", c_uint32), + ("comp_method", c_uint16), + ("encryption_method", c_uint16), + ("flags", c_uint32), + ) + + +class zip_buffer_fragment(Structure): + __slots__ = ( + "data", + "length", + ) + _fields_ = ( + ("data", POINTER(c_uint8)), + ("length", c_uint64), + ) + + +class zip_file_attributes(Structure): + __slots__ = ( + "valid", + "version", + "host_system", + "ascii", + "version_needed", + "external_file_attributes", + "general_purpose_bit_flags", + "general_purpose_bit_mask", + ) + _fields_ = ( + ("valid", c_uint64), + ("version", c_uint8), + ("host_system", c_uint8), + ("ascii", c_uint8), + ("version_needed", c_uint8), + ("external_file_attributes", c_uint32), + ("general_purpose_bit_flags", c_uint16), + ("general_purpose_bit_mask", c_uint16), + ) diff --git a/libzip/ctypes/utils.py b/libzip/ctypes/utils.py new file mode 100644 index 0000000..808914b --- /dev/null +++ b/libzip/ctypes/utils.py @@ -0,0 +1,15 @@ +import typing +from collections.abc import ByteString +from ctypes import c_byte, c_void_p, cast + + +def byteStringToPointer(data: ByteString) -> typing.Tuple[c_void_p, int]: + size = len(data) + if isinstance(data, bytes): + buf = cast(data, c_void_p) + else: + bufT = c_byte * size + buf = bufT.from_buffer(data) + buf = cast(buf, c_void_p) + + return buf, size diff --git a/libzip/enums/CompressionMethod.py b/libzip/enums/CompressionMethod.py new file mode 100644 index 0000000..0a6de9c --- /dev/null +++ b/libzip/enums/CompressionMethod.py @@ -0,0 +1,33 @@ +import zipfile +from enum import IntEnum + +__all__ = ("CompressionMethod", "ZIP_CM") + + +class CompressionMethod(IntEnum): + """https://libzip.org/documentation/zip_set_file_compression.html""" + + default = -1 + store = ZIP_STORED = zipfile.ZIP_STORED + shrink = 1 + reduce_1 = 2 + reduce_2 = 3 + reduce_3 = 4 + reduce_4 = 5 + implode = 6 + deflate = ZIP_DEFLATED = zipfile.ZIP_DEFLATED + deflate64 = 9 + pkware_implode = 10 + bzip2 = ZIP_BZIP2 = zipfile.ZIP_BZIP2 + LZMA = ZIP_LZMA = zipfile.ZIP_LZMA + terse = 18 + lz77 = 19 + lzma2 = 33 + zstd = 93 + xz = 95 + jpeg = 96 + wavpack = 97 + ppmd = 98 + + +ZIP_CM = CompressionMethod diff --git a/libzip/enums/Flags.py b/libzip/enums/Flags.py new file mode 100644 index 0000000..e105600 --- /dev/null +++ b/libzip/enums/Flags.py @@ -0,0 +1,23 @@ +from enum import IntFlag + +__all__ = ("ZipFlags", "ZIP_FL") + + +class ZipFlags(IntFlag): + nocase = 1 + nodir = 1 << 1 + compressed = 1 << 2 + unchanged = 1 << 3 + recompress = 1 << 4 + encrypted = 1 << 5 + enc_guess = 0 + enc_raw = 1 << 6 + enc_strict = 1 << 7 + local = 1 << 8 + central = 1 << 9 + enc_utf_8 = 1 << 11 + enc_cp437 = 1 << 12 + overwrite = 1 << 13 + + +ZIP_FL = ZipFlags diff --git a/libzip/enums/OS.py b/libzip/enums/OS.py new file mode 100644 index 0000000..e20a998 --- /dev/null +++ b/libzip/enums/OS.py @@ -0,0 +1,30 @@ +from enum import IntEnum + +__all__ = ("OS", "ZIP_OPSYS") + + +class OS(IntEnum): + dos = 0 + amiga = 1 + openvms = 2 + unix = 3 + vm_cms = 4 + atari_st = 5 + os_2 = 6 + macintosh = 7 + z_system = 8 + cpm = 9 + windows_ntfs = 10 + mvs = 11 + vse = 12 + acorn_risc = 13 + vfat = 14 + alternate_mvs = 15 + beos = 16 + tandem = 17 + os_400 = 18 + os_x = 19 + default = unix + + +ZIP_OPSYS = OS diff --git a/libzip/enums/ZipError.py b/libzip/enums/ZipError.py new file mode 100644 index 0000000..f221050 --- /dev/null +++ b/libzip/enums/ZipError.py @@ -0,0 +1,54 @@ +from enum import IntEnum + +__all__ = ("ZipError", "ZIP_ER", "SystemErrorType", "ZIP_ET") + + +class ZipError(IntEnum): + ok = no_error = 0 + multidisk_not_supported = multidisk = 1 + renaming_tempfile_failed = rename = 2 + close = 3 + seek = 4 + read = 5 + write = 6 + crc = 7 + zip_closed = zipclosed = 8 + no_such_file = ENOENT = noent = 9 + already_exists = exists = 10 + open = 11 + tmp_open = tmpopen = 12 + zlib = 13 + bad_alloc = memory = 14 + entry_changed = changed = 15 + compression_not_supported = compnotsupp = 16 + eof = 17 + invalid_argument = inval = 18 + not_zip = no_zip = nozip = 19 + internal_error = internal = 20 + inconsistent = incons = 21 + remove = 22 + deleted = 23 + encryption_not_supported = encrnotsupp = 24 + read_only = rdonly = 25 + password_required = no_passwd = nopasswd = 26 + wrong_password = wrong_passwd = wrongpasswd = 27 + operation_not_supported = opnotsupp = 28 + resource_in_use = in_use = inuse = 29 + tell = 30 + compressed_data_invalid = compressed_data = 31 + cancelled = 32 + + +ZIP_ER = ZipError + + +class SystemErrorType(IntEnum): + """https://libzip.org/documentation/zip_error_system_type.html""" + + none = 0 + sys = 1 + zlib = 2 + libzip = 3 + + +ZIP_ET = SystemErrorType diff --git a/libzip/enums/ZipSource.py b/libzip/enums/ZipSource.py new file mode 100644 index 0000000..45618b9 --- /dev/null +++ b/libzip/enums/ZipSource.py @@ -0,0 +1,56 @@ +from enum import IntEnum, IntFlag + +__all__ = ("ZipSource", "ZIP_SOURCE", "ZipSourceB") + + +class ZipSource(IntEnum): + open = 0 + read = open + 1 + close = read + 1 + stat = close + 1 + error = stat + 1 + free = error + 1 + seek = free + 1 + tell = seek + 1 + begin_write = tell + 1 + commit_write = begin_write + 1 + rollback_write = commit_write + 1 + write = rollback_write + 1 + seek_write = write + 1 + tell_write = seek_write + 1 + supports = tell_write + 1 + remove = supports + 1 + reserved_1 = remove + 1 + begin_write_cloning = reserved_1 + 1 + accept_empty = begin_write_cloning + 1 + get_file_attributes = accept_empty + 1 + + +ZIP_SOURCE = ZipSource + + +class ZipSourceB(IntFlag): + open = 1 << ZipSource.open + read = 1 << ZipSource.read + close = 1 << ZipSource.close + stat = 1 << ZipSource.stat + error = 1 << ZipSource.error + free = 1 << ZipSource.free + seek = 1 << ZipSource.seek + tell = 1 << ZipSource.tell + begin_write = 1 << ZipSource.begin_write + commit_write = 1 << ZipSource.commit_write + rollback_write = 1 << ZipSource.rollback_write + write = 1 << ZipSource.write + seek_write = 1 << ZipSource.seek_write + tell_write = 1 << ZipSource.tell_write + supports = 1 << ZipSource.supports + remove = 1 << ZipSource.remove + reserved_1 = 1 << ZipSource.reserved_1 + begin_write_cloning = 1 << ZipSource.begin_write_cloning + accept_empty = 1 << ZipSource.accept_empty + get_file_attributes = 1 << ZipSource.get_file_attributes + + supports_readable = open | read | close | stat | error | free + supports_seekable = supports_readable | seek | tell | supports + supports_writable = supports_seekable | begin_write | commit_write | rollback_write | write | seek_write | tell_write | remove diff --git a/libzip/enums/__init__.py b/libzip/enums/__init__.py new file mode 100644 index 0000000..5ab4df6 --- /dev/null +++ b/libzip/enums/__init__.py @@ -0,0 +1,76 @@ +from enum import IntEnum, IntFlag + +from .CompressionMethod import * +from .Flags import * +from .OS import * +from .ZipError import * +from .ZipSource import * + + +class OpenFlags(IntFlag): + """https://libzip.org/documentation/zip_open.html""" + + read_write = create = ZIP_CREATE = 1 + dont_create = excl = ZIP_EXCL = 2 + check = check_consistency = checkcons = ZIP_CHECKCONS = 4 + overwrite = truncate = ZIP_TRUNCATE = 8 + read_only = rdonly = ZIP_RDONLY = 16 + + +modesRemap = { + "r": OpenFlags.read_only | OpenFlags.check, + "w": OpenFlags.overwrite | OpenFlags.check, + "x": OpenFlags.read_write | OpenFlags.dont_create | OpenFlags.check, + "a": OpenFlags.read_write | OpenFlags.check, +} + + +class ArchiveFlags(IntFlag): + read_only = rdonly = 2 + + +ZIP_AFL = ArchiveFlags + + +class EncryptionMethod(IntEnum): + """https://libzip.org/documentation/zip_file_set_encryption.html""" + + none = 0 + trad_pkware = 1 + aes_128 = 257 + aes_192 = 258 + aes_256 = 259 + unknown = 65535 + + +ZIP_EM = EncryptionMethod + + +class ZipStat(IntFlag): + """https://libzip.org/documentation/zip_stat.html""" + + name = 1 + index = 1 << 1 + originalSize = size = 1 << 2 + compressedSize = comp_size = 1 << 3 + modificationTime = mtime = 1 << 4 + crc = 1 << 5 + compressionMethod = comp_method = 1 << 6 + encryptionMethod = 1 << 7 + flags = 1 << 8 + + everything = 0xFFFFFFFF + + +ZIP_STAT = ZipStat + + +class FileAttrs(IntFlag): + host_system = 1 + ascii = 1 << 1 + version_needed = 1 << 2 + external_file_attributes = 1 << 3 + general_purpose_bit_flags = 1 << 4 + + +ZIP_FILE_ATTRIBUTES = FileAttrs diff --git a/libzip/middleLevel.py b/libzip/middleLevel.py new file mode 100644 index 0000000..1df3fd6 --- /dev/null +++ b/libzip/middleLevel.py @@ -0,0 +1,31 @@ +from ctypes import c_char_p, c_double, c_void_p + +from .ctypes import functions as f +from .ctypes.callbacks import zip_cancel_callback, zip_progress_callback +from .ctypes.opaque import zip_ptr +from .enums import CompressionMethod, EncryptionMethod +from .version import libzip_version # pylint:disable=unused-import + + +def discard(za: zip_ptr) -> None: + return f.discard(za) + + +def register_progress_callback_with_state(za: zip_ptr, precision: c_double, callback: zip_progress_callback, ud_free: f.userDataFreer_p, ud: c_void_p) -> None: + return f.register_progress_callback_with_state(za, precision, callback, ud_free, ud) + + +def register_cancel_callback_with_state(za: zip_ptr, callback: zip_cancel_callback, ud_free: f.userDataFreer_p, ud: c_void_p) -> None: + return f.register_cancel_callback_with_state(za, callback, ud_free, ud) + + +def strerror(za: zip_ptr) -> c_char_p: + return f.strerror(za) + + +def compression_method_supported(method: CompressionMethod, compress: bool) -> bool: + return bool(f.compression_method_supported(int(method), int(compress))) + + +def encryption_method_supported(method: EncryptionMethod, encrypt: bool) -> bool: + return bool(f.encryption_method_supported(int(method), int(encrypt))) diff --git a/libzip/py.typed b/libzip/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/libzip/utils/__init__.py b/libzip/utils/__init__.py new file mode 100644 index 0000000..f01b77b --- /dev/null +++ b/libzip/utils/__init__.py @@ -0,0 +1,38 @@ +import typing +from collections.abc import ByteString +from pathlib import PurePath + +AnyStr = typing.Union[ByteString, str] +PathT = typing.Union[AnyStr, PurePath] + + +def acceptStrOrBytes(s: AnyStr) -> ByteString: + if not isinstance(s, ByteString): + s = s.encode("utf-8") + assert isinstance(s, ByteString), "Must be bytes, a string or convertible to them, but " + repr(s) + " was the result of the possible conversions" + return s + + +def acceptPathOrStrOrBytes(p: PathT) -> ByteString: + if not isinstance(p, (PurePath, str, ByteString)): + p = p.__path__() + + if isinstance(p, PurePath): + p = str(p) + + assert isinstance(p, (PurePath, str, ByteString)), "Must be bytes, a string or a [Pure]Path or convertible to them, but " + repr(p) + " was the result of the possible conversions" + + return acceptStrOrBytes(p) + + +def toPurePathOrStrOrBytes(v: ByteString) -> PathT: + if isinstance(v, ByteString): + try: + v = str(v, "utf-8") + except UnicodeDecodeError: + pass + + if isinstance(v, str): + v = PurePath(v) + + return v diff --git a/libzip/utils/dosTime.py b/libzip/utils/dosTime.py new file mode 100644 index 0000000..35c0c5d --- /dev/null +++ b/libzip/utils/dosTime.py @@ -0,0 +1,16 @@ +import typing +from datetime import date, time + + +def timeToDosTimeInt(dosTime: typing.Optional[time]) -> int: + if dosTime: + return dosTime.hour << 11 | dosTime.minute << 5 | (dosTime.second // 2) + + return 0 + + +def dateToDosDateInt(dosDate: typing.Optional[date]) -> int: + if dosDate: + return (dosDate.year - 1980) << 9 | dosDate.month << 5 | dosDate.day + + return 0 diff --git a/libzip/version.py b/libzip/version.py new file mode 100644 index 0000000..ef3d09c --- /dev/null +++ b/libzip/version.py @@ -0,0 +1,12 @@ +import typing + +from .ctypes import library as lib + +__all__ = ("libzip_version", "LIB_VERSION") + + +def libzip_version() -> typing.Tuple[int]: + return tuple(int(el) for el in lib.libzip_version().split(b".")) + + +LIB_VERSION = libzip_version() diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..89bc2e1 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,37 @@ +[build-system] +build-backend = "setuptools.build_meta" +requires = ["setuptools>=61.2", "setuptools_scm[toml]>=3.4.3", "wheel"] + +[project] +name = "libzip" +description = "ctypes-based bindings to libzip" +readme = "ReadMe.md" +keywords = ["libzip", "zip",] +license = {text = "Unlicense"} +authors = [{name = "KOLANICH"}] +requires-python = ">=3.4" +dependencies = [] +dynamic = ["version"] +classifiers = [ + "Development Status :: 4 - Beta", + "Environment :: Other Environment", + "Intended Audience :: Developers", + "License :: Public Domain", + "Operating System :: OS Independent", + "Programming Language :: Python", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3 :: Only", + "Topic :: Software Development :: Libraries :: Python Modules", +] +[project.urls] +Homepage = "https://codeberg.org/KOLANICH-libs/libzip.py" + + +[tool.setuptools] +zip-safe = true +include-package-data = false + +[tool.setuptools.packages] +find = {namespaces = false} + +[tool.setuptools_scm] diff --git a/tests/tests.py b/tests/tests.py new file mode 100755 index 0000000..4521a30 --- /dev/null +++ b/tests/tests.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python3 +import sys +from pathlib import Path +import unittest +import itertools, re +import colorama + +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from collections import OrderedDict + +dict = OrderedDict + +import libzip +from libzip import * + + +class Tests(unittest.TestCase): + + def testSimple(self): + raise NotImplementedError + + +if __name__ == "__main__": + unittest.main() diff --git a/tutorial.ipynb b/tutorial.ipynb new file mode 100644 index 0000000..a075a2d --- /dev/null +++ b/tutorial.ipynb @@ -0,0 +1,492 @@ +{ + "cells": [{ + "cell_type": "markdown", + "id": "first-folder", + "metadata": {}, + "source": [ + "# `libzip` python bindings tutorial" + ] + }, + { + "cell_type": "markdown", + "id": "former-reform", + "metadata": {}, + "source": [ + "First let's choose a file we will test on." + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "failing-junior", + "metadata": {}, + "outputs": [{ + "data": { + "text/plain": [ + "PosixPath('dist/libzip-0.1.dev1+g9f16643-py3-none-any.whl')" + ] + }, + "execution_count": 1, + "metadata": {}, + "output_type": "execute_result" + }], + "source": [ + "from pathlib import Path, PurePath\n", + "testFile = next(iter(Path(\"./dist/\").glob(\"*.whl\")))\n", + "testFile" + ] + }, + { + "cell_type": "markdown", + "id": "fluid-blair", + "metadata": {}, + "source": [ + "You need to import `libzip` in order to use it. We don't put everything into the root, so import from the submodules. In order to open an archive you need `libzip.Archive.Archive` and `libzip.enums.OpenFlags`." + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "involved-partnership", + "metadata": {}, + "outputs": [], + "source": [ + "import libzip\n", + "from libzip.Archive import Archive\n", + "from libzip.enums import OpenFlags, CompressionMethod, EncryptionMethod" + ] + }, + { + "cell_type": "markdown", + "id": "universal-period", + "metadata": {}, + "source": [ + "Since we are dealing with C resources that must be opened and closed, almost everything is a context manager.\n", + "\n", + "In order to open an archive construct `Archive` class with the needed arguments.\n", + "In order to spread our stuff among different cells we'll `__enter__` the object manually." + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "mobile-sleep", + "metadata": {}, + "outputs": [{ + "data": { + "text/plain": [ + "" + ] + }, + "execution_count": 3, + "metadata": {}, + "output_type": "execute_result" + }], + "source": [ + "a = Archive(testFile, OpenFlags.read_write | OpenFlags.check).__enter__()\n", + "a" + ] + }, + { + "cell_type": "markdown", + "id": "prescribed-juice", + "metadata": {}, + "source": [ + "Let's see the which files the archive contains. The archive is an iterator, so let's pass it to a constructor for a collection. Since there are pretty a lot of them, let's limit the shown by some small amount." + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "assured-dispatch", + "metadata": {}, + "outputs": [{ + "data": { + "text/plain": [ + "[ExistingFile(, 0, PurePosixPath('libzip/Error.py'), , None),\n", + " ExistingFile(, 1, PurePosixPath('libzip/Stat.py'), , None),\n", + " ExistingFile(, 2, PurePosixPath('libzip/__init__.py'), , None)]" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + }], + "source": [ + "fs = list(a)\n", + "fs[:3]" + ] + }, + { + "cell_type": "markdown", + "id": "labeled-cartoon", + "metadata": {}, + "source": [ + "We can get a file by its index ..." + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "distinguished-custody", + "metadata": {}, + "outputs": [{ + "data": { + "text/plain": [ + "ExistingFile(, 1, PurePosixPath('libzip/Stat.py'), , None)" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + }], + "source": [ + "a[1]" + ] + }, + { + "cell_type": "markdown", + "id": "identical-building", + "metadata": {}, + "source": [ + "... or by name:" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "exposed-parent", + "metadata": {}, + "outputs": [{ + "data": { + "text/plain": [ + "ExistingFile(, 1, PurePosixPath('libzip/Stat.py'), , None)" + ] + }, + "execution_count": 6, + "metadata": {}, + "output_type": "execute_result" + }], + "source": [ + "a[PurePath('libzip/Stat.py')]" + ] + }, + { + "cell_type": "markdown", + "id": "preceding-marketplace", + "metadata": {}, + "source": [ + "If we ask for a nonexistent file, we get an error:" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "noble-outdoors", + "metadata": {}, + "outputs": [{ + "name": "stdout", + "output_type": "stream", + "text": [ + "(\"File with this index doesn't exist in the archive\", 100500)\n", + "('File not found in the archive', 'blah')\n" + ] + }], + "source": [ + "try:\n", + " a[100500]\n", + "except KeyError as ex:\n", + " print(ex)\n", + "\n", + "try:\n", + " a[\"blah\"]\n", + "except KeyError as ex:\n", + " print(ex)" + ] + }, + { + "cell_type": "markdown", + "id": "accepted-christopher", + "metadata": {}, + "source": [ + "Let's examine the first file" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "leading-database", + "metadata": {}, + "outputs": [{ + "data": { + "text/plain": [ + "Stat, encryptionMethod=>" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "f = fs[0]\n", + "display(f.stat, f.compressionMethod, f.encryptionMethod)" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "piano-device", + "metadata": {}, + "outputs": [{ + "data": { + "text/plain": [ + "SubscriptableDateTime(2022, 6, 20, 18, 34, 52)" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "(libzip.Stat.SubscriptableDateTime, datetime.datetime, datetime.date, object)" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "display(f.modTime, f.modTime.__class__.__mro__)" + ] + }, + { + "cell_type": "markdown", + "id": "wooden-preference", + "metadata": {}, + "source": [ + "By setting some properties we can edit the archive. Let's change the date." + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "sealed-hampshire", + "metadata": {}, + "outputs": [{ + "data": { + "text/plain": [ + "b'blah blah blah'" + ] + }, + "metadata": {}, + "output_type": "display_data" + }], + "source": [ + "from datetime import datetime\n", + "f.modTime = datetime(1970, 1, 1, 0, 0, 0)\n", + "f.comment = \"blah blah blah\"\n", + "display(f.comment)" + ] + }, + { + "cell_type": "markdown", + "id": "fluid-silver", + "metadata": {}, + "source": [ + "Let's recompress. When just setting a property, maximum compression is used." + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "following-universe", + "metadata": {}, + "outputs": [], + "source": [ + "f.compressionMethod = CompressionMethod.bzip2" + ] + }, + { + "cell_type": "markdown", + "id": "discrete-knight", + "metadata": {}, + "source": [ + "To use another preset, use a method" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "colored-prisoner", + "metadata": {}, + "outputs": [], + "source": [ + "f.setCompression(CompressionMethod.bzip2, 1)" + ] + }, + { + "cell_type": "markdown", + "id": "casual-repeat", + "metadata": {}, + "source": [ + "For encryption you must specify the password, so we cannot encrypt by using just a prop." + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "id": "challenging-signal", + "metadata": {}, + "outputs": [{ + "data": { + "text/plain": [ + "" + ] + }, + "execution_count": 13, + "metadata": {}, + "output_type": "execute_result" + }], + "source": [ + "f.setEncryption(EncryptionMethod.aes_128, \"correct horse battery staple\")\n", + "f.encryptionMethod" + ] + }, + { + "cell_type": "markdown", + "id": "bound-priest", + "metadata": {}, + "source": [ + "To add or replace a file you need to create a new one. It is created from `Source`." + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "id": "wrapped-bangladesh", + "metadata": {}, + "outputs": [], + "source": [ + "from libzip.Source import Source\n", + "s = Source.make(b\"our new bullshit file\")\n", + "f.replace(s)" + ] + }, + { + "cell_type": "markdown", + "id": "declared-coffee", + "metadata": {}, + "source": [ + "To read a file enter it and use `read` method. But we cannot, we have replaced it!" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "id": "defensive-challenge", + "metadata": {}, + "outputs": [{ + "name": "stdout", + "output_type": "stream", + "text": [ + "Doing this op to a dirty file will result in a crash\n" + ] + }], + "source": [ + "try:\n", + " ff = f.__enter__()\n", + "except RuntimeError as ex:\n", + " print(ex)" + ] + }, + { + "cell_type": "markdown", + "id": "mineral-birth", + "metadata": {}, + "source": [ + "OK, let's reset it back." + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "id": "ruled-group", + "metadata": {}, + "outputs": [], + "source": [ + "f.dirty = False" + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "id": "mathematical-morgan", + "metadata": {}, + "outputs": [{ + "data": { + "text/plain": [ + "bytearray(b'import typing\\n')" + ] + }, + "metadata": {}, + "output_type": "display_data" + }], + "source": [ + "with f as ff:\n", + " res = bytearray(f.stat.originalSize)\n", + " ff.read(res)\n", + " display(res[:14])" + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "id": "liberal-shape", + "metadata": {}, + "outputs": [], + "source": [ + "a.__exit__(None, None, None)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.7" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +}