Skip to content

Commit

Permalink
Add file registry
Browse files Browse the repository at this point in the history
  • Loading branch information
barseghyanartur committed Dec 1, 2023
1 parent dcd28fe commit d487d32
Showing 1 changed file with 139 additions and 10 deletions.
149 changes: 139 additions & 10 deletions fake.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import asyncio
import contextlib
import io
import logging
import os
import random
import re
Expand All @@ -17,6 +18,7 @@
from datetime import date, datetime, timedelta
from pathlib import Path
from tempfile import NamedTemporaryFile, gettempdir
from threading import Lock
from typing import (
Any,
Dict,
Expand All @@ -42,10 +44,13 @@
"DocxGenerator",
"FACTORY",
"FAKER",
"FILE_REGISTRY",
"Factory",
"Faker",
"FileRegistry",
"FileSystemStorage",
"GraphicPdfGenerator",
"MetaData",
"ModelFactory",
"StringValue",
"SubFactory",
Expand All @@ -55,6 +60,7 @@
"pre_save",
)

LOGGER = logging.getLogger(__name__)

PDF_TEXT_TPL_PAGE_OBJECT = """{page_num} 0 obj
<</Type /Page
Expand Down Expand Up @@ -171,8 +177,104 @@
)


class MetaData:
__slots__ = ("content",)

def __init__(self):
self.content: Optional[str] = None

def add_content(self, content: Union[str, List[str]]) -> None:
if isinstance(content, list):
self.content = "\n".join(content)
else:
self.content = content


class StringValue(str):
data: Dict[str, Any] = {}
__slots__ = ("data",)

data: Dict[str, Any]

def __new__(cls, value, *args, **kwargs):
obj = str.__new__(cls, value)
obj.data = {}
return obj


class FileRegistry:
"""Stores list `StringValue` instances.
.. code-block:: python
from fake import FAKER, FILE_REGISTRY
txt_file_1 = FAKER.txt_file()
txt_file_2 = FAKER.txt_file()
...
txt_file_n = FAKER.txt_file()
# The FileRegistry._registry would then contain this:
{
txt_file_1,
txt_file_2,
...,
txt_file_n,
}
# Clean up created files as follows:
FILE_REGISTRY.clean_up()
"""

def __init__(self):
self._registry: Set[StringValue] = set()
self._lock = Lock()

def add(self, string_value: StringValue) -> None:
with self._lock:
self._registry.add(string_value)

def remove(self, string_value: Union[StringValue, str]) -> bool:
if not isinstance(string_value, StringValue):
string_value = self.search(string_value)

if not string_value:
return False

with self._lock:
# No error if the element doesn't exist
self._registry.discard(string_value)
try:
string_value.data["storage"].unlink(
string_value.data["filename"]
)
return True
except Exception as e:
LOGGER.error(
f"Failed to unlink file "
f"{string_value.data['filename']}: {e}"
)
return False

def search(self, value: str) -> Optional[StringValue]:
with self._lock:
for string_value in self._registry:
if string_value == value:
return string_value
return None

def clean_up(self) -> None:
with self._lock:
while self._registry:
file = self._registry.pop()
try:
file.data["storage"].unlink(file.data["filename"])
except Exception as err:
LOGGER.error(
f"Failed to unlink file {file.data['filename']}: {err}"
)


FILE_REGISTRY = FileRegistry()


class BaseStorage:
Expand Down Expand Up @@ -378,6 +480,8 @@ def create(
self,
nb_pages: Optional[int] = None,
texts: Optional[List[str]] = None,
metadata: Optional[MetaData] = None,
**kwargs,
) -> bytes:
# Initialization
if not nb_pages and not texts:
Expand All @@ -391,6 +495,9 @@ def create(
self.nb_pages: int = nb_pages or 1
self.texts = self.faker.sentences(nb=self.nb_pages)

if metadata:
metadata.add_content(self.texts)

# Construction
pdf_bytes = io.BytesIO()

Expand Down Expand Up @@ -493,6 +600,7 @@ def create(
nb_pages: int = 1,
image_size: Tuple[int, int] = (100, 100),
image_color: Tuple[int, int, int] = (255, 0, 0),
**kwargs,
) -> bytes:
# Initialization
self.nb_pages = nb_pages
Expand Down Expand Up @@ -674,7 +782,10 @@ def _create_page(self, text: str, is_last_page: bool) -> str:
return page_content

def create(
self, nb_pages: Optional[int] = None, texts: Optional[List[str]] = None
self,
nb_pages: Optional[int] = None,
texts: Optional[List[str]] = None,
metadata: Optional[MetaData] = None,
) -> bytes:
if not nb_pages and not texts:
raise ValueError(
Expand All @@ -685,6 +796,9 @@ def create(
else:
texts = self.faker.sentences(nb=nb_pages) # type: ignore

if metadata:
metadata.add_content(texts)

# Construct the main document content
document_content = DOCX_TPL_DOC_HEADER
for i, page_text in enumerate(texts):
Expand Down Expand Up @@ -1001,11 +1115,12 @@ def pdf(
generator: Union[
Type[TextPdfGenerator], Type[GraphicPdfGenerator]
] = GraphicPdfGenerator,
metadata: Optional[MetaData] = None,
**kwargs,
) -> bytes:
"""Create a PDF document of a given size."""
_pdf = generator(faker=self)
return _pdf.create(nb_pages=nb_pages, **kwargs)
return _pdf.create(nb_pages=nb_pages, metadata=metadata, **kwargs)

def png(
self,
Expand Down Expand Up @@ -1218,9 +1333,10 @@ def docx(
self,
nb_pages: Optional[int] = 1,
texts: Optional[List[str]] = None,
metadata: Optional[MetaData] = None,
) -> bytes:
_docx = DocxGenerator(faker=self)
return _docx.create(nb_pages=nb_pages, texts=texts)
return _docx.create(nb_pages=nb_pages, texts=texts, metadata=metadata)

def pdf_file(
self,
Expand All @@ -1240,10 +1356,18 @@ def pdf_file(
prefix=prefix,
basename=basename,
)
data = self.pdf(nb_pages=nb_pages, generator=generator, **kwargs)
metadata = MetaData()
data = self.pdf(
nb_pages=nb_pages, generator=generator, metadata=metadata, **kwargs
)
storage.write_bytes(filename=filename, data=data)
file = StringValue(storage.relpath(filename))
file.data = {"storage": storage, "filename": filename}
file.data = {
"storage": storage,
"filename": filename,
"content": metadata.content,
}
FILE_REGISTRY.add(file)
return file

def _image_file(
Expand All @@ -1266,6 +1390,7 @@ def _image_file(
storage.write_bytes(filename=filename, data=data)
file = StringValue(storage.relpath(filename))
file.data = {"storage": storage, "filename": filename}
FILE_REGISTRY.add(file)
return file

def png_file(
Expand Down Expand Up @@ -1351,16 +1476,16 @@ def docx_file(
prefix=prefix,
basename=basename,
)
if not texts:
texts = self.sentences(nb=nb_pages) # type: ignore
data = self.docx(texts=texts)
metadata = MetaData()
data = self.docx(texts=texts, metadata=metadata)
storage.write_bytes(filename=filename, data=data)
file = StringValue(storage.relpath(filename))
file.data = {
"storage": storage,
"filename": filename,
"content": "\n".join(texts),
"content": metadata.content,
}
FILE_REGISTRY.add(file)
return file

def txt_file(
Expand Down Expand Up @@ -1389,6 +1514,7 @@ def txt_file(
"filename": filename,
"content": text,
}
FILE_REGISTRY.add(file)
return file


Expand Down Expand Up @@ -1568,6 +1694,9 @@ class TestFaker(unittest.TestCase):
def setUp(self) -> None:
self.faker = FAKER

def tearDown(self):
FILE_REGISTRY.clean_up()

def test_first_name(self) -> None:
first_name: str = self.faker.first_name()
self.assertIsInstance(first_name, str)
Expand Down

0 comments on commit d487d32

Please sign in to comment.