From d487d323b4bc12cb041a20db6ef2615b179297ab Mon Sep 17 00:00:00 2001 From: Artur Barseghyan Date: Fri, 1 Dec 2023 21:53:27 +0100 Subject: [PATCH] Add file registry --- fake.py | 149 ++++++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 139 insertions(+), 10 deletions(-) diff --git a/fake.py b/fake.py index b4edb2c..2deef92 100644 --- a/fake.py +++ b/fake.py @@ -5,6 +5,7 @@ import asyncio import contextlib import io +import logging import os import random import re @@ -17,6 +18,7 @@ from datetime import date, datetime, timedelta from pathlib import Path from tempfile import NamedTemporaryFile, gettempdir +from threading import Lock from typing import ( Any, Dict, @@ -42,10 +44,13 @@ "DocxGenerator", "FACTORY", "FAKER", + "FILE_REGISTRY", "Factory", "Faker", + "FileRegistry", "FileSystemStorage", "GraphicPdfGenerator", + "MetaData", "ModelFactory", "StringValue", "SubFactory", @@ -55,6 +60,7 @@ "pre_save", ) +LOGGER = logging.getLogger(__name__) PDF_TEXT_TPL_PAGE_OBJECT = """{page_num} 0 obj < None: + if isinstance(content, list): + self.content = "\n".join(content) + else: + self.content = content + + class StringValue(str): - data: Dict[str, Any] = {} + __slots__ = ("data",) + + data: Dict[str, Any] + + def __new__(cls, value, *args, **kwargs): + obj = str.__new__(cls, value) + obj.data = {} + return obj + + +class FileRegistry: + """Stores list `StringValue` instances. + + .. code-block:: python + + from fake import FAKER, FILE_REGISTRY + + txt_file_1 = FAKER.txt_file() + txt_file_2 = FAKER.txt_file() + ... + txt_file_n = FAKER.txt_file() + + # The FileRegistry._registry would then contain this: + { + txt_file_1, + txt_file_2, + ..., + txt_file_n, + } + + # Clean up created files as follows: + FILE_REGISTRY.clean_up() + """ + + def __init__(self): + self._registry: Set[StringValue] = set() + self._lock = Lock() + + def add(self, string_value: StringValue) -> None: + with self._lock: + self._registry.add(string_value) + + def remove(self, string_value: Union[StringValue, str]) -> bool: + if not isinstance(string_value, StringValue): + string_value = self.search(string_value) + + if not string_value: + return False + + with self._lock: + # No error if the element doesn't exist + self._registry.discard(string_value) + try: + string_value.data["storage"].unlink( + string_value.data["filename"] + ) + return True + except Exception as e: + LOGGER.error( + f"Failed to unlink file " + f"{string_value.data['filename']}: {e}" + ) + return False + + def search(self, value: str) -> Optional[StringValue]: + with self._lock: + for string_value in self._registry: + if string_value == value: + return string_value + return None + + def clean_up(self) -> None: + with self._lock: + while self._registry: + file = self._registry.pop() + try: + file.data["storage"].unlink(file.data["filename"]) + except Exception as err: + LOGGER.error( + f"Failed to unlink file {file.data['filename']}: {err}" + ) + + +FILE_REGISTRY = FileRegistry() class BaseStorage: @@ -378,6 +480,8 @@ def create( self, nb_pages: Optional[int] = None, texts: Optional[List[str]] = None, + metadata: Optional[MetaData] = None, + **kwargs, ) -> bytes: # Initialization if not nb_pages and not texts: @@ -391,6 +495,9 @@ def create( self.nb_pages: int = nb_pages or 1 self.texts = self.faker.sentences(nb=self.nb_pages) + if metadata: + metadata.add_content(self.texts) + # Construction pdf_bytes = io.BytesIO() @@ -493,6 +600,7 @@ def create( nb_pages: int = 1, image_size: Tuple[int, int] = (100, 100), image_color: Tuple[int, int, int] = (255, 0, 0), + **kwargs, ) -> bytes: # Initialization self.nb_pages = nb_pages @@ -674,7 +782,10 @@ def _create_page(self, text: str, is_last_page: bool) -> str: return page_content def create( - self, nb_pages: Optional[int] = None, texts: Optional[List[str]] = None + self, + nb_pages: Optional[int] = None, + texts: Optional[List[str]] = None, + metadata: Optional[MetaData] = None, ) -> bytes: if not nb_pages and not texts: raise ValueError( @@ -685,6 +796,9 @@ def create( else: texts = self.faker.sentences(nb=nb_pages) # type: ignore + if metadata: + metadata.add_content(texts) + # Construct the main document content document_content = DOCX_TPL_DOC_HEADER for i, page_text in enumerate(texts): @@ -1001,11 +1115,12 @@ def pdf( generator: Union[ Type[TextPdfGenerator], Type[GraphicPdfGenerator] ] = GraphicPdfGenerator, + metadata: Optional[MetaData] = None, **kwargs, ) -> bytes: """Create a PDF document of a given size.""" _pdf = generator(faker=self) - return _pdf.create(nb_pages=nb_pages, **kwargs) + return _pdf.create(nb_pages=nb_pages, metadata=metadata, **kwargs) def png( self, @@ -1218,9 +1333,10 @@ def docx( self, nb_pages: Optional[int] = 1, texts: Optional[List[str]] = None, + metadata: Optional[MetaData] = None, ) -> bytes: _docx = DocxGenerator(faker=self) - return _docx.create(nb_pages=nb_pages, texts=texts) + return _docx.create(nb_pages=nb_pages, texts=texts, metadata=metadata) def pdf_file( self, @@ -1240,10 +1356,18 @@ def pdf_file( prefix=prefix, basename=basename, ) - data = self.pdf(nb_pages=nb_pages, generator=generator, **kwargs) + metadata = MetaData() + data = self.pdf( + nb_pages=nb_pages, generator=generator, metadata=metadata, **kwargs + ) storage.write_bytes(filename=filename, data=data) file = StringValue(storage.relpath(filename)) - file.data = {"storage": storage, "filename": filename} + file.data = { + "storage": storage, + "filename": filename, + "content": metadata.content, + } + FILE_REGISTRY.add(file) return file def _image_file( @@ -1266,6 +1390,7 @@ def _image_file( storage.write_bytes(filename=filename, data=data) file = StringValue(storage.relpath(filename)) file.data = {"storage": storage, "filename": filename} + FILE_REGISTRY.add(file) return file def png_file( @@ -1351,16 +1476,16 @@ def docx_file( prefix=prefix, basename=basename, ) - if not texts: - texts = self.sentences(nb=nb_pages) # type: ignore - data = self.docx(texts=texts) + metadata = MetaData() + data = self.docx(texts=texts, metadata=metadata) storage.write_bytes(filename=filename, data=data) file = StringValue(storage.relpath(filename)) file.data = { "storage": storage, "filename": filename, - "content": "\n".join(texts), + "content": metadata.content, } + FILE_REGISTRY.add(file) return file def txt_file( @@ -1389,6 +1514,7 @@ def txt_file( "filename": filename, "content": text, } + FILE_REGISTRY.add(file) return file @@ -1568,6 +1694,9 @@ class TestFaker(unittest.TestCase): def setUp(self) -> None: self.faker = FAKER + def tearDown(self): + FILE_REGISTRY.clean_up() + def test_first_name(self) -> None: first_name: str = self.faker.first_name() self.assertIsInstance(first_name, str)