From 1f9433a50cfe0c509b842f45fd71754b2bfa5d6c Mon Sep 17 00:00:00 2001 From: parisa-zahedi Date: Tue, 25 Jun 2024 14:51:28 +0200 Subject: [PATCH 1/4] change repo_name --- .github/workflows/python-package.yml | 2 +- dataQuest/__init__.py | 0 dataQuest/article_final_selection/__init__.py | 0 .../article_selector.py | 56 +++++ .../process_article.py | 94 ++++++++ .../process_articles.py | 102 +++++++++ dataQuest/filter/__init__.py | 7 + dataQuest/filter/delpher_kranten.py | 118 ++++++++++ dataQuest/filter/document.py | 146 ++++++++++++ dataQuest/filter/document_filter.py | 208 ++++++++++++++++++ dataQuest/filter/input_file.py | 119 ++++++++++ dataQuest/models/base.py | 20 ++ dataQuest/models/tfidf.py | 101 +++++++++ dataQuest/output_generator/text_formater.py | 117 ++++++++++ dataQuest/preprocessor/__init__.py | 1 + dataQuest/preprocessor/parser.py | 207 +++++++++++++++++ dataQuest/preprocessor/text_cleaner.py | 129 +++++++++++ dataQuest/settings.py | 10 + dataQuest/temporal_categorization/__init__.py | 8 + .../timestamped_data.py | 123 +++++++++++ dataQuest/utils.py | 174 +++++++++++++++ scripts/convert_input_files.py | 2 +- scripts/step1_filter_articles.py | 8 +- scripts/step2_categorize_by_timestamp.py | 4 +- scripts/step3_select_final_articles.py | 6 +- scripts/step4_generate_output.py | 10 +- 26 files changed, 1756 insertions(+), 16 deletions(-) create mode 100644 dataQuest/__init__.py create mode 100644 dataQuest/article_final_selection/__init__.py create mode 100644 dataQuest/article_final_selection/article_selector.py create mode 100644 dataQuest/article_final_selection/process_article.py create mode 100644 dataQuest/article_final_selection/process_articles.py create mode 100644 dataQuest/filter/__init__.py create mode 100644 dataQuest/filter/delpher_kranten.py create mode 100644 dataQuest/filter/document.py create mode 100644 dataQuest/filter/document_filter.py create mode 100644 dataQuest/filter/input_file.py create mode 100644 dataQuest/models/base.py create mode 100644 dataQuest/models/tfidf.py create mode 100644 dataQuest/output_generator/text_formater.py create mode 100644 dataQuest/preprocessor/__init__.py create mode 100644 dataQuest/preprocessor/parser.py create mode 100644 dataQuest/preprocessor/text_cleaner.py create mode 100644 dataQuest/settings.py create mode 100644 dataQuest/temporal_categorization/__init__.py create mode 100644 dataQuest/temporal_categorization/timestamped_data.py create mode 100644 dataQuest/utils.py diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 4a5cb18..4fc8315 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -18,7 +18,7 @@ on: # Replace package-name with your package name env: - PACKAGE_NAME: interest + PACKAGE_NAME: dataQuest jobs: build: diff --git a/dataQuest/__init__.py b/dataQuest/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/dataQuest/article_final_selection/__init__.py b/dataQuest/article_final_selection/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/dataQuest/article_final_selection/article_selector.py b/dataQuest/article_final_selection/article_selector.py new file mode 100644 index 0000000..c94ab9b --- /dev/null +++ b/dataQuest/article_final_selection/article_selector.py @@ -0,0 +1,56 @@ +"""Module containing the ArticleSelector class for selecting articles based on +similarity scores.""" + +from typing import List, Dict, Union + + +class ArticleSelector: + """Class for selecting articles based on 
similarity scores and + configuration parameters.""" + # pylint: disable=too-few-public-methods + + def __init__(self, similarity_scores: List[float], + config: Dict[str, Union[str, float, int]]): + """Initializes the ArticleSelector object. + + Args: + similarity_scores (List[float]): A list of similarity scores + between keywords and articles. + config (Dict[str, Union[str, float, int]]): A dictionary containing + configuration parameters for selecting articles. + """ + self.similarity_scores = similarity_scores + self.config = config + + def select_articles(self) -> List[int]: + """Selects articles based on the configured selection method and value. + + Returns: + List[int]: A list of indices of selected articles. + """ + sorted_indices = sorted( + range(len(self.similarity_scores)), + key=lambda i: self.similarity_scores[i], + reverse=True + ) + + selected_indices: List[int] = [] + if self.config["type"] == "threshold": + threshold = float(self.config["value"]) + selected_indices.extend( + i for i, score in enumerate(self.similarity_scores) + if score >= threshold + ) + elif self.config["type"] == "num_articles": + num_articles = int(self.config["value"]) + selected_indices.extend(sorted_indices[:num_articles]) + + elif self.config["type"] == "percentage": + percentage = float(self.config["value"]) + num_articles = int(len(self.similarity_scores) * + (percentage / 100.0)) + num_articles = len(self.similarity_scores) if num_articles == 0\ + else num_articles + selected_indices.extend(sorted_indices[:num_articles]) + + return selected_indices diff --git a/dataQuest/article_final_selection/process_article.py b/dataQuest/article_final_selection/process_article.py new file mode 100644 index 0000000..f78036a --- /dev/null +++ b/dataQuest/article_final_selection/process_article.py @@ -0,0 +1,94 @@ +""" Module for processing articles from gzip files.""" +import gzip +import json +import logging +from typing import List, Union, Tuple +from dataQuest.preprocessor.text_cleaner import TextCleaner + +text_cleaner = TextCleaner() + + +def clean(text: Union[str, List[str]]) -> str: + """ + Clean the input text using TextCleaner. + + Args: + text (str): The input text to clean. + + Returns: + str: The cleaned text. + """ + return text_cleaner.preprocess(text) + +# pylint: disable=too-few-public-methods + + +class ArticleProcessor: + """ + Process individual articles from gzip files. + + This class handles the processing of individual articles from + gzip files. + It reads the content of the article, cleans it using TextCleaner, and + determines whether the article contains any keywords of interests in + the title. + """ + def __init__(self, gzip_file_path: str, article_id: int): + """ + Initialize ArticleProcessor with the gzip file path and article ID. + + Args: + gzip_file_path (str): The path to the gzip file. + article_id (int): The ID of the article. + """ + self._file_path = gzip_file_path + self._article_id = article_id + self._title: Union[str, None] = '' + self._body: Union[str, list, None] = '' + self.selected: bool = False + + def read_article_from_gzip(self) -> ( + Tuple)[Union[str, None], Union[List[str], None], Union[str, None]]: + """ + Read article content from a gzip file. + + Returns: + Tuple[Union[str, None], Union[list, None], Union[str, None]]: + A tuple containing the title, body, and date of the article. 
+ """ + try: + with gzip.open(self._file_path, 'rt') as f: + data = json.load(f) + metadata = data.get('newsletter_metadata', {}) + date = metadata.get('date', {}) + articles = data.get('articles', {}) + article = articles.get(str(self._article_id), {}) + title = article.get('title', {}) + body = article.get('body', {}) + return title, body, date + except Exception as e: # pylint: disable=broad-except + logging.error("Error reading article %s from %s: %s", + str(self._article_id), self._file_path, e) + return None, None, None + + def process_article(self, clean_keywords: List[str]) -> str: + """ + Process the article content. + + Args: + clean_keywords (List[str]): A list of clean keywords. + + Returns: + str: The processed article body. + """ + self._title, self._body, _ = self.read_article_from_gzip() + if (self._title is None) or (self._body is None): + return "" + clean_title = clean(self._title) + title_with_keyword = any(keyword in clean_title + for keyword in clean_keywords) + if title_with_keyword: + self.selected = True + return "" + + return clean(self._body) diff --git a/dataQuest/article_final_selection/process_articles.py b/dataQuest/article_final_selection/process_articles.py new file mode 100644 index 0000000..d1f1419 --- /dev/null +++ b/dataQuest/article_final_selection/process_articles.py @@ -0,0 +1,102 @@ +""" +This module contains functions for selecting articles based on keywords +and similarity scores. +""" +from typing import List, Tuple, Dict, Union +import pandas as pd +from sklearn.metrics.pairwise import cosine_similarity +from dataQuest.models.tfidf import TfidfEmbedder +from dataQuest.article_final_selection.process_article import ArticleProcessor +from dataQuest.article_final_selection.process_article import clean +from dataQuest.article_final_selection.article_selector import ArticleSelector + + +def process_articles(articles_filepath: str, clean_keywords: List[str]) -> ( + Tuple)[List[str], List[int]]: + """ + Process articles from a CSV file. + + Args: + articles_filepath (str): The path to the CSV file containing articles. + clean_keywords (List[str]): A list of clean keywords. + + Returns: + Tuple[List[str], List[int]]: A tuple containing the processed article + bodies and selected indices. + """ + articles_df = pd.read_csv(articles_filepath) + article_bodies: List[str] = [] + selected_indices: List[int] = [] + for index, row in articles_df.iterrows(): + article_processor = ArticleProcessor(row['file_path'], + row['article_id']) + processed_article_body = article_processor.process_article( + clean_keywords) + if article_processor.selected: + selected_indices.append(int(str(index))) + elif processed_article_body != "": + article_bodies.append(processed_article_body) + return article_bodies, selected_indices + + +def apply_tfidf_similarity(documents: List[str], keywords: List[str]) -> ( + List)[float]: + """ + Apply TF-IDF similarity between documents and keywords. + + Args: + documents (List[str]): A list of document bodies. + keywords (List[str]): A list of keywords. + + Returns: + List[float]: A list of similarity scores. 
+ """ + model = TfidfEmbedder(ngram_max=1, norm="l1", sublinear_tf=False, min_df=1, + max_df=1.0) + keywords_list = [" ".join(keywords)] + model.fit(documents) + embeddings_documents = model.transform(documents).tocsr() + embeddings_keywords = model.transform(keywords_list).tocsr() + similarity_scores = cosine_similarity(embeddings_keywords, + embeddings_documents) + return similarity_scores[0] + + +def select_top_articles(similarity_scores: List[float], + config: Dict[str, Union[str, float, int]]) \ + -> List[int]: + """ + Select top articles based on similarity scores and configuration. + + Args: + similarity_scores (List[float]): A list of similarity scores. + config (Dict[str, str]): Configuration for selecting articles. + + Returns: + List[int]: A list of selected article indices. + """ + selector = ArticleSelector(similarity_scores, config) + selected_indices = selector.select_articles() + return selected_indices + + +def select_articles(articles_filepath: str, keywords: List[str], + config: Dict[str, Union[str, float, int]]) -> List[int]: + """ + Select articles based on keywords, similarity scores, and configuration. + + Args: + articles_filepath (str): The path to the CSV file containing articles. + keywords (List[str]): A list of keywords. + config (Dict[str, str]): Configuration for selecting articles. + + Returns: + List[int]: A list of selected article indices. + """ + clean_keywords = [clean(keyword) for keyword in keywords] + article_bodies, selected_indices = process_articles(articles_filepath, + clean_keywords) + similarity_scores = apply_tfidf_similarity(article_bodies, clean_keywords) + indices = select_top_articles(similarity_scores, config) + selected_indices.extend(indices) + return selected_indices diff --git a/dataQuest/filter/__init__.py b/dataQuest/filter/__init__.py new file mode 100644 index 0000000..1351c2a --- /dev/null +++ b/dataQuest/filter/__init__.py @@ -0,0 +1,7 @@ +"""define input-file type""" +from dataQuest.filter.delpher_kranten import KrantenFile + +INPUT_FILE_TYPES = { + "delpher_kranten": KrantenFile + +} diff --git a/dataQuest/filter/delpher_kranten.py b/dataQuest/filter/delpher_kranten.py new file mode 100644 index 0000000..f3544ff --- /dev/null +++ b/dataQuest/filter/delpher_kranten.py @@ -0,0 +1,118 @@ +""" +Delpher Kranten Module + +This module provides classes and functions for handling Delpher Kranten files. +""" + +import json +import logging +import os +from typing import Optional +from dataQuest.filter.document import Document, Article +from dataQuest.filter.input_file import InputFile + + +class KrantenFile(InputFile): + """ + An InputFile implementation for Delpher Kranten. + + Input is a zip file which includes one JSON file. The JSON file contains + metadata and articles from one issue of a newspaper. + + Attributes: + METADATA_FIELD (str): The key for metadata field in JSON data. + TITLE_FIELD (str): The key for title field in metadata. + DATE_FIELD (str): The key for date field in metadata. + LANGUAGE_FIELD (str): The key for language field in metadata. + ARTICLES_FIELD (str): The key for articles field in JSON data. + ARTICLE_TITLE_FIELD (str): The key for title field in an article. + ARTICLE_BODY_FIELD (str): The key for body field in an article. + ENCODING (str): The encoding format for reading the file. + + Methods: + read_json(json_file): Read JSON data from a file and parse it into + a Document object. + base_file_name(): Extract the base file name without extension from + the filepath. 
+ doc(): Read the directory and parse the JSON file into a Document + object. + """ + + METADATA_FIELD = "newsletter_metadata" + TITLE_FIELD = "title" + DATE_FIELD = "date" + LANGUAGE_FIELD = "language" + ARTICLES_FIELD = "articles" + ARTICLE_TITLE_FIELD = "title" + ARTICLE_BODY_FIELD = "body" + ENCODING = "utf-8" + + def read_json(self, json_file) -> Optional[Document]: + """ + Read JSON data from a file and parse it into a Document object. + + Args: + json_file: A file object containing JSON data. + + Returns: + Optional[Document]: A Document object parsed from + the JSON data, or None if parsing fails. + """ + try: + json_data = json.load(json_file) + metadata = json_data[self.METADATA_FIELD] + document_title = metadata[self.TITLE_FIELD] + publish_date = metadata[self.DATE_FIELD] + language = metadata[self.LANGUAGE_FIELD] + + articles_data = json_data[self.ARTICLES_FIELD] + + articles = [] + for article_id, article in articles_data.items(): + article_title = article[self.ARTICLE_TITLE_FIELD] + article_body = article[self.ARTICLE_BODY_FIELD] + article = Article(article_id=article_id, title=article_title, + body=article_body) + articles.append(article) + + document = Document(title=document_title, + publish_date=publish_date, + language=language, + articles=articles) + return document + + except (json.JSONDecodeError, KeyError) as e: + logging.error("Error parsing JSON data: %s", e) + return None + + def base_file_name(self) -> str: + """ + Extract the base file name without extension from the filepath. + + Returns: + str: The base file name without extension. + """ + file_name_json = os.path.splitext(os.path.basename(self.filepath))[0] + base_file_name = os.path.splitext(file_name_json)[0] + return base_file_name + + def doc(self) -> Optional[Document]: + """ + Read the directory and parse the JSON file into a Document + object. + + Returns: + Optional[Document]: A Document object parsed from the + JSON data, or None if parsing fails. + """ + try: + logging.info("Reading directory '%s'...", self._filepath) + fh = self.open(encoding=self.ENCODING) + document = self.read_json(fh) + fh.close() + return document + + except OSError as e: + logging.error("Error processing gzip file '%s': %s", + self._filepath, e) + return None diff --git a/dataQuest/filter/document.py b/dataQuest/filter/document.py new file mode 100644 index 0000000..eb3b1d3 --- /dev/null +++ b/dataQuest/filter/document.py @@ -0,0 +1,146 @@ +# pylint: disable=too-few-public-methods +""" +This module defines the Document class, which represents a document +containing articles. +""" +import logging +from typing import Optional, List, Union +from datetime import datetime + + +class Article: + """A class representing an article. + + This class represents an article with an ID, title, and body text. + The body text can be provided as a list + of paragraphs, which will be joined into a single string. + + Attributes: + id (str): The unique identifier of the article. + title (str): The title of the article. + body (str): The body text of the article, represented as + a single string. + """ + def __init__(self, article_id: str, title: str, + body: Union[str, List[str]]) -> None: + """Initialize an Article object with the given ID, title, and body. + + Args: + id (str): The unique identifier of the article. + title (str): The title of the article. + body (Union[str, List[str]): The body text of the article, + provided as a list of paragraphs. 
+ """ + self.id = article_id + self.title = title + if isinstance(body, list): + if any(item is None for item in body): + logging.warning("There is a None value in body") + self.text = "" + else: + article_body = '\n'.join(body) + self.text = article_body + else: + self.text = body + + +class Document: + """ + Represents a document containing articles. + + Args: + title (str): The title of the document. + publish_date (str): The publication date of the document in + the format 'YYYY-MM-DD'. + language (str): The language of the document. + articles (List[Article]): A list of articles included in + the document. + + Attributes: + _title (str): The title of the document. + _publish_date (str): The publication date of the document in + the format 'YYYY-MM-DD'. + _year (Optional[int]): The year of publication, extracted from + publish_date. + _language (str): The language of the document. + _articles (List[Article]): A list of articles included in the + document. + + Properties: + title (str): Getter for the title of the document. + publish_date (str): Getter for the publication date of the + document. + year (Optional[int]): Getter for the year of publication. + decade (Optional[int]): Getter for the decade of publication. + language (str): Getter for the language of the document. + articles (List[Article]): Getter for the list of articles + included in the document. + """ + def __init__(self, title: str, publish_date: str, language: str, + articles: List[Article]) -> None: + self._year: Optional[int] = None + self._articles = articles + self._title = title + self._publish_date = publish_date + self._language = language + + @property + def title(self) -> str: + """ + Getter for the title of the document. + + Returns: + str: The title of the document. + """ + return self._title + + @property + def publish_date(self) -> str: + """ + Getter for the publish_date of the document. + + Returns: + str: The publish_date of the document. + """ + return self._publish_date + + @property + def year(self) -> Optional[int]: + """ + Getter for the year of publication. + + Returns: + Optional[int]: The year of publication extracted + from publish_date, or None if it cannot be determined. + """ + if self._year is not None: + return self._year + try: + date_obj = datetime.strptime(self._publish_date, '%Y-%m-%d') + self._year = date_obj.year + return self._year + except ValueError: + return None + + @property + def decade(self) -> Optional[int]: + """ + Getter for the decade of publication. + + Returns: + Optional[int]: The decade of publication extracted from + publish_date, + or None if it cannot be determined. + """ + _ = self.year + return int(self._year / 10) * 10 if self._year is not None else None + + @property + def articles(self) -> List[Article]: + """ + Getter for the list of articles included in the document. + + Returns: + List[Article]: The list of articles included in the document. + """ + return self._articles diff --git a/dataQuest/filter/document_filter.py b/dataQuest/filter/document_filter.py new file mode 100644 index 0000000..ee5bb8c --- /dev/null +++ b/dataQuest/filter/document_filter.py @@ -0,0 +1,208 @@ +""" +Document Filter Module +This module provides classes for filtering documents and articles. +""" +from abc import ABC, abstractmethod +from typing import List +from dataQuest.filter.document import Document, Article + + +class DocumentFilter(ABC): + """ + Abstract base class for document filters. 
+ + Methods: + filter_document(document: Document) -> bool: Abstract method + to filter documents. + filter_article(article: Article) -> bool: Method to filter + articles. + """ + @abstractmethod + def filter_document(self, document: Document) -> bool: + """ + Abstract method to filter documents. + + Args: + document (Document): The document to be filtered. + + Returns: + bool: True if the document passes the filter, + False otherwise. + """ + return NotImplemented + + def filter_article(self, _article: Article) -> bool: + """ + Method to filter articles. + + By default, returns True, allowing all articles to + pass through. + + Args: + _article (Article): The article to be filtered. + + Returns: + bool: True if the article passes the filter, + False otherwise. + """ + return True + + +class TitleFilter(DocumentFilter): + """ + Filter documents by title. + + Attributes: + title (str): The title to filter by. + """ + def __init__(self, title: str): + self.title = title + + def filter_document(self, document: Document) -> bool: + """ + Filter documents by title. + + Args: + document (Document): The document to be filtered. + + Returns: + bool: True if the document's title contains the specified + title, False otherwise. + """ + return self.title in document.title + + +class YearFilter(DocumentFilter): + """ + Filter documents by year. + + Attributes: + year (int): The year to filter by. + """ + def __init__(self, year: int): + self.year = year + + def filter_document(self, document: Document) -> bool: + """ + Filter documents by year. + + Args: + document (Document): The document to be filtered. + + Returns: + bool: True if the document's year matches the specified + year, False otherwise. + """ + return document.year == self.year + + +class DecadeFilter(DocumentFilter): + """ + Filter documents by decade. + + Attributes: + decade (int): The decade to filter by. + """ + def __init__(self, decade: int): + self.decade = decade + + def filter_document(self, document: Document) -> bool: + """ + Filter documents by decade. + + Args: + document (Document): The document to be filtered. + + Returns: + bool: True if the document's decade matches the + specified decade, False otherwise. + """ + return document.decade == self.decade + + +class KeywordsFilter(DocumentFilter): + """ + Filter documents and articles by keywords. + + Attributes: + keywords (List[str]): The list of keywords to filter by. + """ + def __init__(self, keywords: List[str]): + self.keywords = keywords + + def filter_document(self, document: Document) -> bool: + """ + Filter documents by keywords. + + Args: + document (Document): The document to be filtered. + + Returns: + bool: Always returns True. + """ + return True + + def filter_article(self, article: Article) -> bool: + """ + Filter articles by keywords. + + Args: + article (Article): The article to be filtered. + + Returns: + bool: True if the article's title or text contains any + of the specified keywords, False otherwise. + """ + return any(keyword in article.title or keyword in article.text for + keyword in self.keywords) + + +class CompoundFilter(DocumentFilter): + """ + Compound filter combining multiple filters. + + Attributes: + filters (List[DocumentFilter]): The list of filters to apply. + """ + def __init__(self, filters: List[DocumentFilter]): + self.filters = filters + + def filter_document(self, document: Document) -> bool: + """ + Filter documents by applying all filters. + + Args: + document (Document): The document to be filtered. 
+ + Returns: + bool: True if the document passes all filters, + False otherwise. + """ + return all(filter_.filter_document(document) + for filter_ in self.filters) + + def filter_article(self, article: Article) -> bool: + """ + Filter articles by applying all filters. + + Args: + article (Article): The article to be filtered. + + Returns: + bool: True if the article passes all filters, + False otherwise. + """ + return all(filter_.filter_article(article) for filter_ in self.filters) + + def include_keyword_filter(self) -> bool: + """ + Check if the compound filter includes a KeywordsFilter. + + Returns: + bool: True if the compound filter includes a + KeywordsFilter, False otherwise. + """ + for filter_ in self.filters: + if isinstance(filter_, KeywordsFilter): + return True + return False diff --git a/dataQuest/filter/input_file.py b/dataQuest/filter/input_file.py new file mode 100644 index 0000000..fc799fb --- /dev/null +++ b/dataQuest/filter/input_file.py @@ -0,0 +1,119 @@ +""" +Input File Module +This module provides an abstract class for representing various input files. +""" + +import abc +import gzip +import logging +from pathlib import Path +from typing import Iterable, TextIO, cast, Optional +from dataQuest.filter.document import Document, Article +from dataQuest.filter.document_filter import DocumentFilter + + +class InputFile(abc.ABC): + """ + Abstract class for representing various input files. + + Attributes: + _filepath (Path): The file path of the input file. + + Methods: + __init__(filepath): Initialize the InputFile with a file path. + filepath(): Get the file path of the input file. + base_file_name(): Output a list of documents in the input file. + open(mode, encoding): Open the input file for reading. + articles(): Return all articles for the document found in the + input file. + doc(): Output a list of documents in the input file. + """ + + def __init__(self, filepath: Path) -> None: + """ + Initialize the InputFile with a file path. + + Args: + filepath (Path): The file path of the input file. + """ + self._filepath = filepath + + @property + def filepath(self) -> Path: + """ + Get the file path of the input file. + + Returns: + Path: The file path of the input file. + """ + return self._filepath + + @abc.abstractmethod + def base_file_name(self) -> str: + """ + Output a list of documents in the input file. + + This can be a singleton list if an input file contains only + one document. + + Returns: + str: The base file name without extension. + """ + return NotImplemented + + def open(self, mode: str = "rt", encoding=None) -> TextIO: + """ + Open the input file for reading. + + Args: + mode (str): The file open mode. + encoding: The encoding format. + + Returns: + TextIO: A file object for reading the input file. + """ + if self._filepath.suffix.startswith(".gz"): + return cast(TextIO, gzip.open(self._filepath, mode=mode, + encoding=encoding)) + + # Default to text file + return cast(TextIO, open(self._filepath, + mode=mode, encoding=encoding)) + + # pylint: disable=no-member + def articles(self) -> Iterable[Article]: + """ + Return all articles for the document found in the input file. + + Yields: + Article: An article object. + """ + doc = self.doc() + if doc is not None: + yield from doc.articles + else: + logging.error("Document not found or is None for filepath: %s", + self.filepath) + return + + @abc.abstractmethod + def doc(self) -> Optional[Document]: + """ + Output a list of documents in the input file. 
+ + This can be a singleton list if an input file contains only + one document. + + Returns: + Document: A document object. + """ + return NotImplemented + + def selected_articles(self, filter: DocumentFilter) -> Iterable[Article]: + document = self.doc() + if document is not None: + if filter.filter_document(document): + if document.articles is not None: + for article in document.articles: + if filter.filter_article(article): + yield article diff --git a/dataQuest/models/base.py b/dataQuest/models/base.py new file mode 100644 index 0000000..64832a1 --- /dev/null +++ b/dataQuest/models/base.py @@ -0,0 +1,20 @@ +"""Base class for document embeddings.""" + +from abc import ABC, abstractmethod +from typing import Union, Sequence +import scipy +from numpy import typing as npt +import numpy as np + + +class BaseEmbedder(ABC): + """Base class for creating document embeddings.""" + + @abstractmethod + def fit(self, documents: Sequence[str]) -> None: + """Train the model on documents.""" + + @abstractmethod + def transform(self, documents: Union[str, Sequence[str]]) -> ( + Union)[scipy.sparse.spmatrix, npt.NDArray[np.float_]]: + """Get the embedding for a document.""" diff --git a/dataQuest/models/tfidf.py b/dataQuest/models/tfidf.py new file mode 100644 index 0000000..8583f07 --- /dev/null +++ b/dataQuest/models/tfidf.py @@ -0,0 +1,101 @@ +"""Sklearn TF-IDF class.""" + +from typing import Sequence, Union, Optional +import warnings + +import scipy +from sklearn.feature_extraction.text import TfidfVectorizer + +from dataQuest.models.base import BaseEmbedder +from dataQuest.utils import load_spacy_model +from dataQuest.settings import SPACY_MODEL + + +class TfidfEmbedder(BaseEmbedder): + # pylint: disable=too-many-instance-attributes + """ + Sklearn TF-IDF class. + + Arguments + --------- + ngram_max: + Maximum n-gram, higher numbers mean bigger embeddings. + norm: + Which kind of normalization is used: "l1", "l2" or None. + sublinear_tf: + Apply sublinear term-frequency scaling. + min_df: + Minimum document frequency of word to be included in the embedding. + max_df: + Maximum document frequency of word to be included in the embedding. + """ + + # pylint: disable=too-many-arguments + + def __init__( + self, ngram_max: int = 1, norm: Optional[str] = "l1", + sublinear_tf: bool = False, min_df: int = 1, + max_df: float = 1.0, spacy_model=SPACY_MODEL): + """Initialize the TF-IDF embedder.""" + self.nlp = ( + load_spacy_model(spacy_model) + if isinstance(spacy_model, str) + else spacy_model + ) + self.stopword_list = self.nlp.Defaults.stop_words + self.stop_words = list(self.stopword_list) + self.ngram_max = ngram_max + + self.norm = norm + self.sublinear_tf = sublinear_tf + self.min_df = min_df + self.max_df = max_df + if self.norm == "None": + self.norm = None + + self._model: Optional[TfidfVectorizer] = None + + def fit(self, documents: Sequence[str]) -> None: + """ + Fit the TF-IDF model on the given documents. + + Args: + documents (Sequence[str]): A sequence of document strings. 
+ """ + min_df = min(self.min_df, len(documents)) + max_df = max(min_df/len(documents), self.max_df) + + def _tokenizer(text): + doc = self.nlp(text) + tokens = [token.lemma_.lower() for token in doc + if not token.is_stop and not token.is_punct] + return tokens + + with warnings.catch_warnings(): + warnings.filterwarnings("ignore") + self._model = TfidfVectorizer( + ngram_range=(1, self.ngram_max), + stop_words=self.stop_words, + tokenizer=_tokenizer, # self.stem_tokenizer, + min_df=min_df, + norm=self.norm, + sublinear_tf=self.sublinear_tf, + max_df=max_df) + self._model.fit(documents) + + def transform(self, documents: Union[str, Sequence[str]]) -> Union[ + scipy.sparse.spmatrix]: + """ + Transform the input documents into TF-IDF embeddings. + + Args: + documents (Union[str, Sequence[str]]): A single document string or + a sequence of document strings. + + Returns: + Union[scipy.sparse.spmatrix]: The TF-IDF embeddings of the input + documents. + """ + if self._model is None: + raise ValueError("Fit TF-IDF model before transforming data.") + return self._model.transform(documents).tocsr() diff --git a/dataQuest/output_generator/text_formater.py b/dataQuest/output_generator/text_formater.py new file mode 100644 index 0000000..1bad6a0 --- /dev/null +++ b/dataQuest/output_generator/text_formater.py @@ -0,0 +1,117 @@ +""" This module defines a TextFormatter class for formatting text based on +specified output units. """ +from typing import List, Union +import logging +from dataQuest.settings import SPACY_MODEL +from dataQuest.utils import load_spacy_model + +PARAGRAPH_FORMATTER = 'paragraph' +FULLTEXT_FORMATTER = 'full_text' +SEGMENTED_TEXT_FORMATTER = 'segmented_text' + + +class TextFormatter: + # pylint: disable=R0903 + """Class for formatting text based on specified output units. """ + + def __init__(self, output_unit: str, sentences_per_segment: int, + spacy_model=SPACY_MODEL): # : Union[str, Language] + """ + Initializes the TextFormatter object. + + Args: + output_unit (str): The type of output unit ('paragraph', + 'full_text', 'segmented_text'). + sentences_per_segment (int): Number of sentences per + segment when output_unit is 'segmented_text'. + spacy_model (Union[str, Language], optional): Spacy model + or model name used for text processing. Defaults to the global + SPACY_MODEL value. + """ + self.nlp = ( + load_spacy_model(spacy_model) + if isinstance(spacy_model, str) + else spacy_model + ) + self.sentences_per_segment = sentences_per_segment + self.formatter = output_unit + self.is_fulltext = self._is_fulltext() + self.texts: List[str] = [] + + def format_output(self, texts: Union[None, List[str]]) -> ( + Union)[str, List[str], List[List[str]], None]: + """ + Formats input texts based on the specified output unit. + + Args: + texts (List[str]): List of input texts to be formatted. + + Returns: + Union[str, List[str], List[List[str]]]: Formatted output text + based on the selected output_unit. For 'full_text', returns a + single string. For 'paragraph' and 'segmented_text', returns a + list of segmented text lists. + + Raises: + ValueError: If input 'texts' is not a list of strings. + ValueError: If an unsupported formatter type is specified. 
+ """ + try: + if (not isinstance(texts, list) or (texts is None) or + not all(isinstance(text, str) for text in texts)): + raise ValueError("Input 'texts' must be a list of strings.") + + self.texts = texts + + if self.formatter == PARAGRAPH_FORMATTER: + return self._format_paragraph() + if self.formatter == FULLTEXT_FORMATTER: + return self._format_fulltext() + if self.formatter == SEGMENTED_TEXT_FORMATTER: + return self._format_segmented_text() + + except ValueError as e: + logging.error("Unsupported formatter %s: %s", self.formatter, e) + return None + return None + + def _format_paragraph(self) -> List[str]: + """Formats texts as a single paragraph. + + Returns: + List[List[str]]: List of input texts, segmented in paragraphs. + """ + return self.texts + + def _format_fulltext(self) -> str: + """Formats texts as full text with newline separators. + + Returns: + str: Newline-separated string of input texts. + """ + return '\n'.join(self.texts) + + def _format_segmented_text(self) -> List[List[str]]: + """Formats texts as segmented text based on sentences_per_segment. + + Returns: + List[List[str]]: Flattened list of segmented text strings. + """ + segmented_texts = [] + for text in self.texts: + doc = self.nlp(text) + sentences = [sent.text for sent in doc.sents] + + for i in range(0, len(sentences), self.sentences_per_segment): + segment = sentences[i:i + self.sentences_per_segment] + segmented_texts.append(segment) + + return segmented_texts + + def _is_fulltext(self) -> bool: + """Checks if the formatter type is 'full_text'. + + Returns: + bool: True if formatter is 'full_text', False otherwise. + """ + return self.formatter == FULLTEXT_FORMATTER diff --git a/dataQuest/preprocessor/__init__.py b/dataQuest/preprocessor/__init__.py new file mode 100644 index 0000000..f6b7579 --- /dev/null +++ b/dataQuest/preprocessor/__init__.py @@ -0,0 +1 @@ +# from dataQuest.preprocessor.parser import XMLExtractor diff --git a/dataQuest/preprocessor/parser.py b/dataQuest/preprocessor/parser.py new file mode 100644 index 0000000..efb7dd3 --- /dev/null +++ b/dataQuest/preprocessor/parser.py @@ -0,0 +1,207 @@ + +import os +import tarfile +import gzip +import json +import xml.etree.ElementTree as ET +from typing import Dict, Union, Any, Optional, List +import logging + + +class XMLExtractor: + """Class for extracting XML content and metadata from nested .tgz files.""" # noqa: E501 + def __init__(self, root_dir: str, output_dir: str): + """ + Initializes the XMLExtractor object. + + Parameters: + root_dir (str): The root directory containing .tgz files. + output_dir (str): The output directory for saving extracted JSON files. # noqa: E501 + """ + self.root_dir = root_dir + self.output_dir = output_dir + self.fields = [ + "title", "language", "issuenumber", "date", "identifier", + "temporal", "recordRights", "publisher", "spatial", "source", + "recordIdentifier", "type", "isPartOf" + ] + + def extract_xml_string(self) -> None: + """ + Extracts XML content and metadata from .tgz files in the root directory. # noqa: E501 + """ + for folder_name in os.listdir(self.root_dir): + folder_path = os.path.join(self.root_dir, folder_name) + if not os.path.isdir(folder_path): + continue + if not folder_name.isdigit(): # Exclude in_progress, manifests, and ocr_complete folders and log files. # noqa: E501 + continue + self.process_folder(folder_name, folder_path) + + def process_folder(self, folder_name: str, folder_path: str) -> None: + """ + Processes .tgz files within a folder. 
+ + Parameters: + folder_name (str): Name of the folder being processed. + folder_path (str): Path to the folder being processed. + """ + for tgz_filename in os.listdir(folder_path): + if not tgz_filename.endswith('.tgz'): + continue + tgz_file_path = os.path.join(folder_path, tgz_filename) + base_name = os.path.splitext(tgz_filename)[0] + output_folder = os.path.join(self.output_dir, folder_name) + os.makedirs(output_folder, exist_ok=True) + try: + with tarfile.open(tgz_file_path, "r:gz") as outer_tar: + news_dict = self.process_tar(outer_tar) + except tarfile.TarError as e: + logging.error(f"Error extracting {tgz_filename}: {e}") + continue + output_file = os.path.join(output_folder, f"{base_name}.json.gz") + self.save_as_json_compressed(news_dict, output_file) + # self.save_as_json(news_dict, output_file) + + def process_tar(self, outer_tar: tarfile.TarFile) -> Dict[str, Union[Dict[str, str], Dict[int, Dict[str, str]]]]: # noqa: E501 + """ + Processes a .tgz file and extracts XML content and metadata. + + Parameters: + outer_tar (tarfile.TarFile): The .tgz file being processed. + + Returns: + Dict[str, Union[Dict[str, str], Dict[int, Dict[str, str]]]]: A dictionary containing extracted content and metadata. # noqa: E501 + """ + news_dict: Dict[str, Any] = {"newsletter_metadata": {}, "articles": {}} + id = 0 + for entry in outer_tar: + try: + if entry.name.endswith(".xml"): + file = outer_tar.extractfile(entry) + if file is not None: + content = file.read() + xml_content = content.decode('utf-8', 'ignore') + article = self.extract_article(xml_content, entry.name) + id += 1 + news_dict["articles"][id] = article + + elif entry.name.endswith(".gz"): + gz_member = next(member for member in outer_tar.getmembers() if member.name.endswith('.gz')) # noqa: E501 + with outer_tar.extractfile(gz_member) as gz_file: # type: ignore # noqa: E501 + with gzip.open(gz_file, 'rt') as xml_file: + xml_string = xml_file.read() + if isinstance(xml_string, bytes): + xml_string = xml_string.decode('utf-8') + newsletter_metadata = self.extract_meta(xml_string) + news_dict["newsletter_metadata"] = newsletter_metadata # noqa: E501 + else: + continue + except Exception as e: + logging.error(f"Error processing file {entry.name}: {e}") + return news_dict + + @staticmethod + def save_as_json_compressed(data: Dict[str, Union[Dict[str, str], Dict[int, Dict[str, str]]]], output_file: str) -> None: # noqa: E501 + """ + Saves data as compressed JSON using gzip. + + Parameters: + data (Dict[str, Union[Dict[str, str], Dict[int, Dict[str, str]]]]): Data to be saved as JSON. # noqa: E501 + output_file (str): Path to the output JSON file. + """ + try: + with gzip.open(output_file, 'wt') as json_file: + json.dump(data, json_file, indent=4) + except Exception as e: + logging.error(f"Error saving compressed JSON to {output_file}: {e}") # noqa: E501 + + # @staticmethod + # def save_as_json(data: Dict[str, Union[Dict[str, str], Dict[int, Dict[str, str]]]], output_file: str) -> None: # noqa: E501 + # """ + # Saves data as JSON to a specified file. + + # Parameters: + # data (Dict[str, Union[Dict[str, str], Dict[int, Dict[str, str]]]]): Data to be saved as JSON. # noqa: E501 + # output_file (str): Path to the output JSON file. 
+ # """ + # try: + # with open(output_file, 'w') as json_file: + # json.dump(data, json_file, indent=4) + # except Exception as e: + # logging.error(f"Error saving JSON to {output_file}: {e}") + + @staticmethod + def extract_article(xml_content: str, file_name: str) -> Dict[str, Union[str, List[Optional[str]]]]: # noqa: E501 + """ + Extracts article title and body from XML content. + + Parameters: + xml_content (str): XML content of the article. + file_name (str): Name of the XML file. + + Returns: + Dict[Optional[str], list[str]]: A dictionary containing the extracted title and body of the article. + body contains a list of paragraphs. # noqa: E501 + """ + try: + root = ET.fromstring(xml_content) + except ET.ParseError: + logging.error(f"Failed to parse XML from file: {file_name}") + return {} + + title_values = [element.text for element in root.iter() if element.tag.endswith('title')] # noqa: E501 + if len(title_values) > 1: + logging.warning("More than one titles are extracted for the article.") # noqa: E501 + if not title_values: + logging.warning("No title is extracted for the article.") + title = "" + else: + title = title_values[0] if title_values[0] is not None else "" + # title = title_values[0] + + body_values = [element.text for element in root.iter() if element.tag.endswith('p')] # noqa: E501 + if not body_values: + logging.warning("No body is extracted.") + body = [] + # elif len(body_values) > 1: + # logging.warning("There are more than one paragraphs in the article.") # noqa: E501 + # body = ' '.join(body_values) + else: + # body = body_values[0] + body = body_values + + return {"title": title, "body": body} + + def extract_meta(self, xml_string: str) -> Dict[str, Union[str, None]]: + """ + Extracts metadata from XML string. + + Parameters: + xml_string (str): XML string containing metadata. + + Returns: + Dict[str, Union[str, None]]: A dictionary containing the extracted metadata. # noqa: E501 + """ + newsletter_metadata: Dict[str, Union[str, None]] = {} + + try: + root = ET.fromstring(xml_string) + except ET.ParseError: + logging.error("Failed to parse XML from file") + return newsletter_metadata + + for field in self.fields: + field_values = [element.text for element in root.iter() if element.tag.endswith(field)] # noqa: E501 + if len(field_values) > 1: + logging.warning(f"More than one {field}s are extracted from metadata.") # noqa: E501 + if not field_values: + logging.warning(f"No {field} is extracted.") + newsletter_metadata[field] = None + else: + filtered_field_values = [value for value in field_values if value is not None] # noqa: E501 + newsletter_metadata[field] = filtered_field_values[0] if field != "spatial" else ", ".join(filtered_field_values) # noqa: E501 + + # newsletter_metadata[field] = field_values[0] if field != "spatial" else ", ".join(field_values) # noqa: E501 + + return newsletter_metadata diff --git a/dataQuest/preprocessor/text_cleaner.py b/dataQuest/preprocessor/text_cleaner.py new file mode 100644 index 0000000..33cd27b --- /dev/null +++ b/dataQuest/preprocessor/text_cleaner.py @@ -0,0 +1,129 @@ +""" +This module provides a TextCleaner class for preprocessing text +data using various cleaning techniques. +""" +import re +from typing import Union, List +from dataQuest.settings import SPACY_MODEL +from dataQuest.utils import load_spacy_model + + +def merge_texts_list(text: Union[str, List[str]]) -> str: + """ + Merge a list of texts into a single string by joining them with spaces. 
+ + Args: + text (Union[str, List[str]]): The input text or list of texts to merge. + + Returns: + str: The merged text if input is a list of strings, otherwise returns + the input text unchanged. + """ + if isinstance(text, list): + merged_text = ' '.join(text) + return merged_text + return text + + +class TextCleaner: + """A class for cleaning text data using various preprocessing + techniques.""" + + def __init__(self, spacy_model=SPACY_MODEL) -> None: + """Initialize the TextCleaner instance. + + Args: + spacy_model (str or spacy.Language, optional): The SpaCy + model to use for text processing. + Defaults to the model specified in the settings. + """ + + self.nlp = ( + load_spacy_model(spacy_model) + if isinstance(spacy_model, str) + else spacy_model + ) + self.stopword_list = self.nlp.Defaults.stop_words + self.stopwords = set(self.stopword_list) + self.text = "" + + def get_lower_lemma_tokens(self) -> None: + """ + Get lowercased lemmatized tokens from the text. + + This method processes the text stored in the instance variable + `self.text`,tokenizes it using the SpaCy pipeline `self.nlp`, + and then lemmatizes each token, converting it to lowercase. + Stop words and punctuation tokens are filtered out. + """ + doc = self.nlp(self.text) + self.text = " ".join([token.lemma_.lower() for token in doc + if not token.is_stop and not token.is_punct]) + + def get_words(self): + """Tokenize words in the text.""" + doc = self.nlp(self.text) + self.text = " ".join([token.text for token in doc]) + + def lower(self): + """Transform the text to lower case.""" + self.text = self.text.lower() + + def remove_stopwords(self): + """Remove the stopwords from the text.""" + doc = self.nlp(self.text) + self.text = " ".join([token.text for token in doc if token.text + not in self.stopwords]) + + def remove_numeric(self): + """Remove numbers from the text.""" + self.text = re.sub(r'\d+', '', self.text) + + def remove_non_ascii(self): + """Remove non ASCII characters from the text.""" + self.text = re.sub(r'[^\x00-\x7f]', '', self.text) + + def remove_extra_whitespace_tabs(self): + """Remove extra whitespaces and tabs from the text.""" + self.text = re.sub(r'\s+', ' ', self.text) + + def remove_one_char(self): + """Remove single characters from the text.""" + self.text = " ".join([w for w in self.text.split() if len(w) > 1]) + + def keep_standard_chars(self): + """Keep only standard characters in the text.""" + self.text = re.sub(r'[^-0-9\w,. ?!()%/]', '', self.text) + + def preprocess(self, text): + """Preprocess the given text using a series of cleaning steps. + + Args: + text ( List[str]): The text to preprocess. + + Returns: + str: The preprocessed text. + """ + self.text = merge_texts_list(text) + self.get_lower_lemma_tokens() + self.remove_numeric() + self.remove_extra_whitespace_tabs() + self.remove_one_char() + return self.text + + def clean(self, text): + """Clean the given text by removing non-standard characters and + extra whitespace. + + Args: + text (str): The text to clean. + + Returns: + str: The cleaned text. + """ + self.text = merge_texts_list(text) + self.text = text + self.get_words() + self.keep_standard_chars() + self.remove_extra_whitespace_tabs() + return self.text diff --git a/dataQuest/settings.py b/dataQuest/settings.py new file mode 100644 index 0000000..391f9b4 --- /dev/null +++ b/dataQuest/settings.py @@ -0,0 +1,10 @@ +""" +Module containing configuration settings for the project. 
+""" +import os + +SPACY_MODEL = os.getenv("SPACY_MODEL", "nl_core_news_sm") +"""Spacy model to use for sentence splitting.""" + +ENCODING = os.getenv("ENCODING", "utf-8") +"""Encoding used for reading and writing files.""" diff --git a/dataQuest/temporal_categorization/__init__.py b/dataQuest/temporal_categorization/__init__.py new file mode 100644 index 0000000..ca3bb25 --- /dev/null +++ b/dataQuest/temporal_categorization/__init__.py @@ -0,0 +1,8 @@ +"""Mapping from string format descriptions to corresponding classes.""" +from dataQuest.temporal_categorization.timestamped_data \ + import (YearPeriodData, DecadePeriodData) + +PERIOD_TYPES = { + "decade": DecadePeriodData, + "year": YearPeriodData +} diff --git a/dataQuest/temporal_categorization/timestamped_data.py b/dataQuest/temporal_categorization/timestamped_data.py new file mode 100644 index 0000000..7a75488 --- /dev/null +++ b/dataQuest/temporal_categorization/timestamped_data.py @@ -0,0 +1,123 @@ +""" +This module provides classes and utilities for working with data +that includes timestamps. +""" +import json +from datetime import datetime +from pathlib import Path + + +class TimestampedData: + """ + Represents data with a timestamp. + + Attributes: + DATE_FIELD (str): The field name for the timestamp in the data. + _filename (Path): The path to the file containing the data. + _data (dict): The loaded JSON data. + _timestamp (datetime): The timestamp extracted from the data. + + Methods: + __init__(self, filename): Initializes the TimestampedData object. + filename(self) -> Path: Returns the filename path. + _load_data(self): Loads data from the file. + _get_timestamp(self): Extracts the timestamp from the data. + categorize(self): Abstract method for categorizing data by timestamp. + """ + + DATE_FIELD = "Date" + + def __init__(self, filename: Path): + """ + Initializes the TimestampedData object. + + Args: + filename (Path): The path to the file containing the data. + """ + self._filename = filename + self._data = self._load_data() + self._timestamp = self._get_timestamp() + + @property + def filename(self) -> Path: + """ + Returns the filename path. + + Returns: + Path: The filename path. + """ + return self._filename + + def _load_data(self): + """ + Loads data from the file. + + Returns: + dict: The loaded JSON data. + """ + with open(self._filename, 'r', encoding='utf-8') as file: + return json.load(file) + + def data(self): + """ + Returns the json data + + Returns: + dict: The loaded JSON data. + """ + return self._data + + def _get_timestamp(self): + """ + Extracts the timestamp from the data. + + Returns: + datetime: The extracted timestamp. + """ + return datetime.strptime(self._data[self.DATE_FIELD], '%Y-%m-%d') + + def categorize(self): + """ + Abstract method for categorizing data by timestamp. + + Raises: + NotImplementedError: Subclasses must implement categorize method. + """ + raise NotImplementedError("Subclass must implement categorize method") + + +class YearPeriodData(TimestampedData): + """ + Represents data categorized by year. + + Methods: + categorize(self): Categorizes data by year. + """ + + def categorize(self): + """ + Categorizes data by year. + + Returns: + int: The year of the timestamp. + """ + return self._timestamp.year + + +class DecadePeriodData(TimestampedData): + """ + Represents data categorized by decade. + + Methods: + categorize(self): Categorizes data by decade. + """ + + def categorize(self): + """ + Categorizes data by decade. + + Returns: + int: The decade of the timestamp. 
+ """ + year = self._timestamp.year + return (year // 10) * 10 diff --git a/dataQuest/utils.py b/dataQuest/utils.py new file mode 100644 index 0000000..7d8fd65 --- /dev/null +++ b/dataQuest/utils.py @@ -0,0 +1,174 @@ +""" +Module containing utility functions for the project. +""" +import os +from pathlib import Path +from typing import List, Dict, Any, Optional +from functools import cache +import json +import spacy +import spacy.cli +from dataQuest.filter.document_filter import (YearFilter, + TitleFilter, + DocumentFilter) +from dataQuest.filter.document_filter import (CompoundFilter, + DecadeFilter, + KeywordsFilter) +from dataQuest.settings import ENCODING + + +@cache +def load_spacy_model(model_name: str, retry: bool = True) \ + -> Optional[spacy.Language]: + """Load and store a sentencize-only SpaCy model + + Downloads the model if necessary. + + Args: + model_name (str): The name of the SpaCy model to load. + retry (bool, optional): Whether to retry downloading the model + if loading fails initially. Defaults to True. + + Returns: + spacy.Language: The SpaCy model object for the given name. + """ + + try: + nlp = spacy.load(model_name, disable=["tagger", "parser", "ner"]) + except OSError as exc: + if retry: + spacy.cli.download(model_name) + return load_spacy_model(model_name, False) + raise exc + nlp.add_pipe("sentencizer") + return nlp + + +def load_filters_from_config(config_file: Path) -> CompoundFilter: + """Load document filters from a configuration file. + + Args: + config_file (Path): Path to the configuration file containing + filter settings. + + Returns: + CompoundFilter: A compound filter containing individual document + filters loaded from the configuration. + """ + with open(config_file, 'r', encoding=ENCODING) as f: + config: Dict[str, List[Dict[str, Any]]] = json.load(f) + + filters: List[DocumentFilter] = [] + for filter_config in config['filters']: + filter_type = filter_config['type'] + if filter_type == 'TitleFilter': + filters.append(TitleFilter(filter_config['title'])) + elif filter_type == 'YearFilter': + filters.append(YearFilter(filter_config['year'])) + elif filter_type == 'DecadeFilter': + filters.append(DecadeFilter(filter_config['decade'])) + elif filter_type == 'KeywordsFilter': + filters.append(KeywordsFilter(filter_config['keywords'])) + + return CompoundFilter(filters) + + +def get_keywords_from_config(config_file: Path) -> List[str]: + """ + Extract keywords from a JSON configuration file. + + Args: + config_file (Path): The path to the JSON configuration file. + + Returns: + List[str]: The list of keywords extracted from the configuration + file. + + Raises: + FileNotFoundError: If the config file is not found or cannot be + opened. + KeyError: If the required keys are not found in the configuration + file. + TypeError: If the data in the configuration file is not in the + expected format. + """ + try: + with open(config_file, 'r', encoding=ENCODING) as f: + config: Dict[str, List[Dict[str, Any]]] = json.load(f) + + for filter_config in config['filters']: + filter_type = filter_config['type'] + if filter_type == 'KeywordsFilter': + return filter_config['keywords'] + return [] + except FileNotFoundError as exc: + raise FileNotFoundError("Config file not found") from exc + except KeyError as exc: + raise KeyError("Keywords not found in config file") from exc + + +def read_config(config_file: Path, item_key: str) -> Dict[str, str]: + """ + Get the value of the given key item from a JSON file. 
+ + Args: + config_file (Path): The path to the JSON config file. + item_key (str): Key item defined in config file. + Returns: + Dict[str, str]: The article selector configuration. + + Raises: + KeyError: If the key item is not found in the config file. + FileNotFoundError: If the config file is not found. + """ + try: + with open(config_file, 'r', encoding=ENCODING) as f: + config: Dict[str, str] = json.load(f)[item_key] + if not config: + raise ValueError("Config is empty") + return config + except FileNotFoundError as exc: + raise FileNotFoundError("Config file not found") from exc + except KeyError as exc: + raise KeyError("Key item %s not found in config file") from exc + + +def save_filtered_articles(input_file: Any, article_id: str, + output_dir: str) -> None: + """Save filtered articles data to a JSON file. + + Args: + input_file: The input file object. + article_id (str): The ID of the article. + output_dir (str): The directory where the JSON file will be saved. + + Returns: + None + """ + data = { + "file_path": str(input_file.filepath), + "article_id": str(article_id), + "Date": str(input_file.doc().publish_date), + "Title": input_file.doc().title, + } + + output_fp = os.path.join(output_dir, input_file.base_file_name() + '.json') + print('output_fp', output_fp) + with open(output_fp, "w", encoding=ENCODING) as json_file: + json.dump(data, json_file, indent=4) + + +def get_file_name_without_extension(full_path: str) -> str: + """ + Extracts the file name without extension from a full path. + + Args: + full_path (str): The full path of the file. + + Returns: + str: The file name without extension. + + """ + base_name = os.path.basename(full_path) + file_name_without_ext = os.path.splitext(base_name)[0] + return file_name_without_ext diff --git a/scripts/convert_input_files.py b/scripts/convert_input_files.py index b6d2dea..023d152 100644 --- a/scripts/convert_input_files.py +++ b/scripts/convert_input_files.py @@ -1,4 +1,4 @@ -from interest.preprocessor.parser import XMLExtractor +from dataQuest.preprocessor.parser import XMLExtractor from argparse import ArgumentParser from pathlib import Path import logging diff --git a/scripts/step1_filter_articles.py b/scripts/step1_filter_articles.py index 99d59a0..fa638f2 100644 --- a/scripts/step1_filter_articles.py +++ b/scripts/step1_filter_articles.py @@ -9,10 +9,10 @@ from tqdm import tqdm -from interest.filter import INPUT_FILE_TYPES -from interest.filter.input_file import InputFile -from interest.utils import load_filters_from_config -from interest.utils import save_filtered_articles +from dataQuest.filter import INPUT_FILE_TYPES +from dataQuest.filter.input_file import InputFile +from dataQuest.utils import load_filters_from_config +from dataQuest.utils import save_filtered_articles if __name__ == "__main__": parser = argparse.ArgumentParser("Filter articles from input files.") diff --git a/scripts/step2_categorize_by_timestamp.py b/scripts/step2_categorize_by_timestamp.py index 0979642..586eb3d 100644 --- a/scripts/step2_categorize_by_timestamp.py +++ b/scripts/step2_categorize_by_timestamp.py @@ -9,8 +9,8 @@ from pathlib import Path import pandas as pd from tqdm import tqdm # type: ignore -from interest.temporal_categorization import PERIOD_TYPES -from interest.temporal_categorization.timestamped_data import TimestampedData +from dataQuest.temporal_categorization import PERIOD_TYPES +from dataQuest.temporal_categorization.timestamped_data import TimestampedData OUTPUT_FILE_NAME = 'articles' FILENAME_COLUMN = 'file_path' diff 
--git a/scripts/step3_select_final_articles.py b/scripts/step3_select_final_articles.py index 37f723c..82a71f9 100644 --- a/scripts/step3_select_final_articles.py +++ b/scripts/step3_select_final_articles.py @@ -4,9 +4,9 @@ from typing import List from pathlib import Path import pandas as pd -from interest.utils import get_keywords_from_config -from interest.utils import read_config -from interest.article_final_selection.process_articles import select_articles +from dataQuest.utils import get_keywords_from_config +from dataQuest.utils import read_config +from dataQuest.article_final_selection.process_articles import select_articles ARTICLE_SELECTOR_FIELD = "article_selector" diff --git a/scripts/step4_generate_output.py b/scripts/step4_generate_output.py index 161140c..5a62e5a 100644 --- a/scripts/step4_generate_output.py +++ b/scripts/step4_generate_output.py @@ -7,11 +7,11 @@ from typing import Union import pandas as pd from pandas import DataFrame -from interest.settings import SPACY_MODEL -from interest.article_final_selection.process_article import ArticleProcessor -from interest.utils import read_config, get_file_name_without_extension -from interest.output_generator.text_formater import (TextFormatter, - SEGMENTED_TEXT_FORMATTER) +from dataQuest.settings import SPACY_MODEL +from dataQuest.article_final_selection.process_article import ArticleProcessor +from dataQuest.utils import read_config, get_file_name_without_extension +from dataQuest.output_generator.text_formater import (TextFormatter, + SEGMENTED_TEXT_FORMATTER) FILE_PATH_FIELD = "file_path" From bb52e561d73f940703808f6a46f2a021d533adc9 Mon Sep 17 00:00:00 2001 From: parisa-zahedi Date: Tue, 25 Jun 2024 14:53:47 +0200 Subject: [PATCH 2/4] remove files in old directory --- interest/__init__.py | 0 interest/article_final_selection/__init__.py | 0 .../article_selector.py | 56 ----- .../process_article.py | 94 -------- .../process_articles.py | 102 --------- interest/filter/__init__.py | 7 - interest/filter/delpher_kranten.py | 118 ---------- interest/filter/document.py | 146 ------------ interest/filter/document_filter.py | 208 ------------------ interest/filter/input_file.py | 119 ---------- interest/models/base.py | 20 -- interest/models/tfidf.py | 101 --------- interest/output_generator/text_formater.py | 117 ---------- interest/preprocessor/__init__.py | 1 - interest/preprocessor/parser.py | 207 ----------------- interest/preprocessor/text_cleaner.py | 129 ----------- interest/settings.py | 10 - interest/temporal_categorization/__init__.py | 8 - .../timestamped_data.py | 123 ----------- interest/utils.py | 174 --------------- 20 files changed, 1740 deletions(-) delete mode 100644 interest/__init__.py delete mode 100644 interest/article_final_selection/__init__.py delete mode 100644 interest/article_final_selection/article_selector.py delete mode 100644 interest/article_final_selection/process_article.py delete mode 100644 interest/article_final_selection/process_articles.py delete mode 100644 interest/filter/__init__.py delete mode 100644 interest/filter/delpher_kranten.py delete mode 100644 interest/filter/document.py delete mode 100644 interest/filter/document_filter.py delete mode 100644 interest/filter/input_file.py delete mode 100644 interest/models/base.py delete mode 100644 interest/models/tfidf.py delete mode 100644 interest/output_generator/text_formater.py delete mode 100644 interest/preprocessor/__init__.py delete mode 100644 interest/preprocessor/parser.py delete mode 100644 interest/preprocessor/text_cleaner.py 
delete mode 100644 interest/settings.py delete mode 100644 interest/temporal_categorization/__init__.py delete mode 100644 interest/temporal_categorization/timestamped_data.py delete mode 100644 interest/utils.py diff --git a/interest/__init__.py b/interest/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/interest/article_final_selection/__init__.py b/interest/article_final_selection/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/interest/article_final_selection/article_selector.py b/interest/article_final_selection/article_selector.py deleted file mode 100644 index c94ab9b..0000000 --- a/interest/article_final_selection/article_selector.py +++ /dev/null @@ -1,56 +0,0 @@ -"""Module containing the ArticleSelector class for selecting articles based on -similarity scores.""" - -from typing import List, Dict, Union - - -class ArticleSelector: - """Class for selecting articles based on similarity scores and - configuration parameters.""" - # pylint: disable=too-few-public-methods - - def __init__(self, similarity_scores: List[float], - config: Dict[str, Union[str, float, int]]): - """Initializes the ArticleSelector object. - - Args: - similarity_scores (List[float]): A list of similarity scores - between keywords and articles. - config (Dict[str, Union[str, float, int]]): A dictionary containing - configuration parameters for selecting articles. - """ - self.similarity_scores = similarity_scores - self.config = config - - def select_articles(self) -> List[int]: - """Selects articles based on the configured selection method and value. - - Returns: - List[int]: A list of indices of selected articles. - """ - sorted_indices = sorted( - range(len(self.similarity_scores)), - key=lambda i: self.similarity_scores[i], - reverse=True - ) - - selected_indices: List[int] = [] - if self.config["type"] == "threshold": - threshold = float(self.config["value"]) - selected_indices.extend( - i for i, score in enumerate(self.similarity_scores) - if score >= threshold - ) - elif self.config["type"] == "num_articles": - num_articles = int(self.config["value"]) - selected_indices.extend(sorted_indices[:num_articles]) - - elif self.config["type"] == "percentage": - percentage = float(self.config["value"]) - num_articles = int(len(self.similarity_scores) * - (percentage / 100.0)) - num_articles = len(self.similarity_scores) if num_articles == 0\ - else num_articles - selected_indices.extend(sorted_indices[:num_articles]) - - return selected_indices diff --git a/interest/article_final_selection/process_article.py b/interest/article_final_selection/process_article.py deleted file mode 100644 index 763e166..0000000 --- a/interest/article_final_selection/process_article.py +++ /dev/null @@ -1,94 +0,0 @@ -""" Module for processing articles from gzip files.""" -import gzip -import json -import logging -from typing import List, Union, Tuple -from interest.preprocessor.text_cleaner import TextCleaner - -text_cleaner = TextCleaner() - - -def clean(text: Union[str, List[str]]) -> str: - """ - Clean the input text using TextCleaner. - - Args: - text (str): The input text to clean. - - Returns: - str: The cleaned text. - """ - return text_cleaner.preprocess(text) - -# pylint: disable=too-few-public-methods - - -class ArticleProcessor: - """ - Process individual articles from gzip files. - - This class handles the processing of individual articles from - gzip files. 
- It reads the content of the article, cleans it using TextCleaner, and - determines whether the article contains any keywords of interests in - the title. - """ - def __init__(self, gzip_file_path: str, article_id: int): - """ - Initialize ArticleProcessor with the gzip file path and article ID. - - Args: - gzip_file_path (str): The path to the gzip file. - article_id (int): The ID of the article. - """ - self._file_path = gzip_file_path - self._article_id = article_id - self._title: Union[str, None] = '' - self._body: Union[str, list, None] = '' - self.selected: bool = False - - def read_article_from_gzip(self) -> ( - Tuple)[Union[str, None], Union[List[str], None], Union[str, None]]: - """ - Read article content from a gzip file. - - Returns: - Tuple[Union[str, None], Union[list, None], Union[str, None]]: - A tuple containing the title, body, and date of the article. - """ - try: - with gzip.open(self._file_path, 'rt') as f: - data = json.load(f) - metadata = data.get('newsletter_metadata', {}) - date = metadata.get('date', {}) - articles = data.get('articles', {}) - article = articles.get(str(self._article_id), {}) - title = article.get('title', {}) - body = article.get('body', {}) - return title, body, date - except Exception as e: # pylint: disable=broad-except - logging.error("Error reading article %s from %s: %s", - str(self._article_id), self._file_path, e) - return None, None, None - - def process_article(self, clean_keywords: List[str]) -> str: - """ - Process the article content. - - Args: - clean_keywords (List[str]): A list of clean keywords. - - Returns: - str: The processed article body. - """ - self._title, self._body, _ = self.read_article_from_gzip() - if (self._title is None) or (self._body is None): - return "" - clean_title = clean(self._title) - title_with_keyword = any(keyword in clean_title - for keyword in clean_keywords) - if title_with_keyword: - self.selected = True - return "" - - return clean(self._body) diff --git a/interest/article_final_selection/process_articles.py b/interest/article_final_selection/process_articles.py deleted file mode 100644 index c768271..0000000 --- a/interest/article_final_selection/process_articles.py +++ /dev/null @@ -1,102 +0,0 @@ -""" -This module contains functions for selecting articles based on keywords -and similarity scores. -""" -from typing import List, Tuple, Dict, Union -import pandas as pd -from sklearn.metrics.pairwise import cosine_similarity -from interest.models.tfidf import TfidfEmbedder -from interest.article_final_selection.process_article import ArticleProcessor -from interest.article_final_selection.process_article import clean -from interest.article_final_selection.article_selector import ArticleSelector - - -def process_articles(articles_filepath: str, clean_keywords: List[str]) -> ( - Tuple)[List[str], List[int]]: - """ - Process articles from a CSV file. - - Args: - articles_filepath (str): The path to the CSV file containing articles. - clean_keywords (List[str]): A list of clean keywords. - - Returns: - Tuple[List[str], List[int]]: A tuple containing the processed article - bodies and selected indices. 
- """ - articles_df = pd.read_csv(articles_filepath) - article_bodies: List[str] = [] - selected_indices: List[int] = [] - for index, row in articles_df.iterrows(): - article_processor = ArticleProcessor(row['file_path'], - row['article_id']) - processed_article_body = article_processor.process_article( - clean_keywords) - if article_processor.selected: - selected_indices.append(int(str(index))) - elif processed_article_body != "": - article_bodies.append(processed_article_body) - return article_bodies, selected_indices - - -def apply_tfidf_similarity(documents: List[str], keywords: List[str]) -> ( - List)[float]: - """ - Apply TF-IDF similarity between documents and keywords. - - Args: - documents (List[str]): A list of document bodies. - keywords (List[str]): A list of keywords. - - Returns: - List[float]: A list of similarity scores. - """ - model = TfidfEmbedder(ngram_max=1, norm="l1", sublinear_tf=False, min_df=1, - max_df=1.0) - keywords_list = [" ".join(keywords)] - model.fit(documents) - embeddings_documents = model.transform(documents).tocsr() - embeddings_keywords = model.transform(keywords_list).tocsr() - similarity_scores = cosine_similarity(embeddings_keywords, - embeddings_documents) - return similarity_scores[0] - - -def select_top_articles(similarity_scores: List[float], - config: Dict[str, Union[str, float, int]]) \ - -> List[int]: - """ - Select top articles based on similarity scores and configuration. - - Args: - similarity_scores (List[float]): A list of similarity scores. - config (Dict[str, str]): Configuration for selecting articles. - - Returns: - List[int]: A list of selected article indices. - """ - selector = ArticleSelector(similarity_scores, config) - selected_indices = selector.select_articles() - return selected_indices - - -def select_articles(articles_filepath: str, keywords: List[str], - config: Dict[str, Union[str, float, int]]) -> List[int]: - """ - Select articles based on keywords, similarity scores, and configuration. - - Args: - articles_filepath (str): The path to the CSV file containing articles. - keywords (List[str]): A list of keywords. - config (Dict[str, str]): Configuration for selecting articles. - - Returns: - List[int]: A list of selected article indices. - """ - clean_keywords = [clean(keyword) for keyword in keywords] - article_bodies, selected_indices = process_articles(articles_filepath, - clean_keywords) - similarity_scores = apply_tfidf_similarity(article_bodies, clean_keywords) - indices = select_top_articles(similarity_scores, config) - selected_indices.extend(indices) - return selected_indices diff --git a/interest/filter/__init__.py b/interest/filter/__init__.py deleted file mode 100644 index 5618aa7..0000000 --- a/interest/filter/__init__.py +++ /dev/null @@ -1,7 +0,0 @@ -"""define input-file type""" -from interest.filter.delpher_kranten import KrantenFile - -INPUT_FILE_TYPES = { - "delpher_kranten": KrantenFile - -} diff --git a/interest/filter/delpher_kranten.py b/interest/filter/delpher_kranten.py deleted file mode 100644 index ec2dc85..0000000 --- a/interest/filter/delpher_kranten.py +++ /dev/null @@ -1,118 +0,0 @@ -""" -Delpher Kranten Module - -This module provides classes and functions for handling Delpher Kranten files. -""" - -import json -import logging -import os -from typing import Optional -from interest.filter.document import Document, Article -from interest.filter.input_file import InputFile - - -class KrantenFile(InputFile): - """ - An InputFile implementation for Delpher Kranten. 
- - Input is a zip file which includes one JSON file. The JSON file contains - metadata and articles from one issue of a newspaper. - - Attributes: - METADATA_FIELD (str): The key for metadata field in JSON data. - TITLE_FIELD (str): The key for title field in metadata. - DATE_FIELD (str): The key for date field in metadata. - LANGUAGE_FIELD (str): The key for language field in metadata. - ARTICLES_FIELD (str): The key for articles field in JSON data. - ARTICLE_TITLE_FIELD (str): The key for title field in an article. - ARTICLE_BODY_FIELD (str): The key for body field in an article. - ENCODING (str): The encoding format for reading the file. - - Methods: - read_json(json_file): Read JSON data from a file and parse it into - a Document object. - base_file_name(): Extract the base file name without extension from - the filepath. - doc(): Read the directory and parse the JSON file into a Document - object. - """ - - METADATA_FIELD = "newsletter_metadata" - TITLE_FIELD = "title" - DATE_FIELD = "date" - LANGUAGE_FIELD = "language" - ARTICLES_FIELD = "articles" - ARTICLE_TITLE_FIELD = "title" - ARTICLE_BODY_FIELD = "body" - ENCODING = "utf-8" - - def read_json(self, json_file) -> Optional[Document]: - """ - Read JSON data from a file and parse it into a Document object. - - Args: - json_file: A file object containing JSON data. - - Returns: - Optional[Document]: A Document object parsed from - the JSON data, or None if parsing fails. - """ - try: - json_data = json.load(json_file) - metadata = json_data[self.METADATA_FIELD] - document_title = metadata[self.TITLE_FIELD] - publish_date = metadata[self.DATE_FIELD] - language = metadata[self.LANGUAGE_FIELD] - - articles_data = json_data[self.ARTICLES_FIELD] - - articles = [] - for article_id, article in articles_data.items(): - article_title = article[self.ARTICLE_TITLE_FIELD] - article_body = article[self.ARTICLE_BODY_FIELD] - article = Article(article_id=article_id, title=article_title, - body=article_body) - articles.append(article) - - document = Document(title=document_title, - publish_date=publish_date, - language=language, - articles=articles) - return document - - except (json.JSONDecodeError, KeyError) as e: - logging.error("Error parsing JSON data: %s", e) - return None - - def base_file_name(self) -> str: - """ - Extract the base file name without extension from the filepath. - - Returns: - str: The base file name without extension. - """ - file_name_json = os.path.splitext(os.path.basename(self.filepath))[0] - base_file_name = os.path.splitext(file_name_json)[0] - return base_file_name - - def doc(self) -> Optional[Document]: - """ - Read the directory and parse the JSON file into a Document - object. - - Returns: - Optional[Document]: A Document object parsed from the - JSON data, or None if parsing fails. - """ - try: - logging.info("Reading directory '%s'...", self._filepath) - fh = self.open(encoding=self.ENCODING) - document = self.read_json(fh) - fh.close() - return document - - except OSError as e: - logging.error("Error processing gzip file '%s': %s", - self._filepath, e) - return None diff --git a/interest/filter/document.py b/interest/filter/document.py deleted file mode 100644 index eb3b1d3..0000000 --- a/interest/filter/document.py +++ /dev/null @@ -1,146 +0,0 @@ -# pylint: disable=too-few-public-methods -""" -This module defines the Document class, which represents a document -containing articles. 
-""" -import logging -from typing import Optional, List, Union -from datetime import datetime - - -class Article: - """A class representing an article. - - This class represents an article with an ID, title, and body text. - The body text can be provided as a list - of paragraphs, which will be joined into a single string. - - Attributes: - id (str): The unique identifier of the article. - title (str): The title of the article. - body (str): The body text of the article, represented as - a single string. - """ - def __init__(self, article_id: str, title: str, - body: Union[str, List[str]]) -> None: - """Initialize an Article object with the given ID, title, and body. - - Args: - id (str): The unique identifier of the article. - title (str): The title of the article. - body (Union[str, List[str]): The body text of the article, - provided as a list of paragraphs. - """ - self.id = article_id - self.title = title - if isinstance(body, list): - if any(item is None for item in body): - logging.warning("There is a None value in body") - self.text = "" - else: - article_body = '\n'.join(body) - self.text = article_body - else: - self.text = body - - -class Document: - """ - Represents a document containing articles. - - Args: - title (str): The title of the document. - publish_date (str): The publication date of the document in - the format 'YYYY-MM-DD'. - language (str): The language of the document. - articles (List[Article]): A list of articles included in - the document. - - Attributes: - _title (str): The title of the document. - _publish_date (str): The publication date of the document in - the format 'YYYY-MM-DD'. - _year (Optional[int]): The year of publication, extracted from - publish_date. - _language (str): The language of the document. - _articles (List[Article]): A list of articles included in the - document. - - Properties: - title (str): Getter for the title of the document. - publish_date (str): Getter for the publication date of the - document. - year (Optional[int]): Getter for the year of publication. - decade (Optional[int]): Getter for the decade of publication. - language (str): Getter for the language of the document. - articles (List[Article]): Getter for the list of articles - included in the document. - """ - def __init__(self, title: str, publish_date: str, language: str, - articles: List[Article]) -> None: - self._year: Optional[int] = None - self._articles = articles - self._title = title - self._publish_date = publish_date - self._language = language - - @property - def title(self) -> str: - """ - Getter for the title of the document. - - Returns: - str: The title of the document. - """ - return self._title - - @property - def publish_date(self) -> str: - """ - Getter for the publish_date of the document. - - Returns: - str: The publish_date of the document. - """ - return self._publish_date - - @property - def year(self) -> Optional[int]: - """ - Getter for the year of publication. - - Returns: - Optional[int]: The year of publication extracted - from publish_date, or None if it cannot be determined. - """ - if self._year is not None: - return self._year - try: - date_obj = datetime.strptime(self._publish_date, '%Y-%m-%d') - self._year = date_obj.year - return self._year - except ValueError: - return None - - @property - def decade(self) -> Optional[int]: - """ - Getter for the decade of publication. - - Returns: - Optional[int]: The decade of publication extracted from - publish_date, - or None if it cannot be determined. 
- """ - _ = self.year - return int(self._year / 10) * 10 if self._year is not None else None - - @property - def articles(self) -> List[Article]: - """ - Getter for the list of articles included in the document. - - Returns: - List[Article]: The list of articles included in the document. - """ - return self._articles diff --git a/interest/filter/document_filter.py b/interest/filter/document_filter.py deleted file mode 100644 index 19f5412..0000000 --- a/interest/filter/document_filter.py +++ /dev/null @@ -1,208 +0,0 @@ -""" -Document Filter Module -This module provides classes for filtering documents and articles. -""" -from abc import ABC, abstractmethod -from typing import List -from interest.filter.document import Document, Article - - -class DocumentFilter(ABC): - """ - Abstract base class for document filters. - - Methods: - filter_document(document: Document) -> bool: Abstract method - to filter documents. - filter_article(article: Article) -> bool: Method to filter - articles. - """ - @abstractmethod - def filter_document(self, document: Document) -> bool: - """ - Abstract method to filter documents. - - Args: - document (Document): The document to be filtered. - - Returns: - bool: True if the document passes the filter, - False otherwise. - """ - return NotImplemented - - def filter_article(self, _article: Article) -> bool: - """ - Method to filter articles. - - By default, returns True, allowing all articles to - pass through. - - Args: - _article (Article): The article to be filtered. - - Returns: - bool: True if the article passes the filter, - False otherwise. - """ - return True - - -class TitleFilter(DocumentFilter): - """ - Filter documents by title. - - Attributes: - title (str): The title to filter by. - """ - def __init__(self, title: str): - self.title = title - - def filter_document(self, document: Document) -> bool: - """ - Filter documents by title. - - Args: - document (Document): The document to be filtered. - - Returns: - bool: True if the document's title contains the specified - title, False otherwise. - """ - return self.title in document.title - - -class YearFilter(DocumentFilter): - """ - Filter documents by year. - - Attributes: - year (int): The year to filter by. - """ - def __init__(self, year: int): - self.year = year - - def filter_document(self, document: Document) -> bool: - """ - Filter documents by year. - - Args: - document (Document): The document to be filtered. - - Returns: - bool: True if the document's year matches the specified - year, False otherwise. - """ - return document.year == self.year - - -class DecadeFilter(DocumentFilter): - """ - Filter documents by decade. - - Attributes: - decade (int): The decade to filter by. - """ - def __init__(self, decade: int): - self.decade = decade - - def filter_document(self, document: Document) -> bool: - """ - Filter documents by decade. - - Args: - document (Document): The document to be filtered. - - Returns: - bool: True if the document's decade matches the - specified decade, False otherwise. - """ - return document.decade == self.decade - - -class KeywordsFilter(DocumentFilter): - """ - Filter documents and articles by keywords. - - Attributes: - keywords (List[str]): The list of keywords to filter by. - """ - def __init__(self, keywords: List[str]): - self.keywords = keywords - - def filter_document(self, document: Document) -> bool: - """ - Filter documents by keywords. - - Args: - document (Document): The document to be filtered. - - Returns: - bool: Always returns True. 
- """ - return True - - def filter_article(self, article: Article) -> bool: - """ - Filter articles by keywords. - - Args: - article (Article): The article to be filtered. - - Returns: - bool: True if the article's title or text contains any - of the specified keywords, False otherwise. - """ - return any(keyword in article.title or keyword in article.text for - keyword in self.keywords) - - -class CompoundFilter(DocumentFilter): - """ - Compound filter combining multiple filters. - - Attributes: - filters (List[DocumentFilter]): The list of filters to apply. - """ - def __init__(self, filters: List[DocumentFilter]): - self.filters = filters - - def filter_document(self, document: Document) -> bool: - """ - Filter documents by applying all filters. - - Args: - document (Document): The document to be filtered. - - Returns: - bool: True if the document passes all filters, - False otherwise. - """ - return all(filter_.filter_document(document) - for filter_ in self.filters) - - def filter_article(self, article: Article) -> bool: - """ - Filter articles by applying all filters. - - Args: - article (Article): The article to be filtered. - - Returns: - bool: True if the article passes all filters, - False otherwise. - """ - return all(filter_.filter_article(article) for filter_ in self.filters) - - def include_keyword_filter(self) -> bool: - """ - Check if the compound filter includes a KeywordsFilter. - - Returns: - bool: True if the compound filter includes a - KeywordsFilter, False otherwise. - """ - for filter_ in self.filters: - if isinstance(filter_, KeywordsFilter): - return True - return False diff --git a/interest/filter/input_file.py b/interest/filter/input_file.py deleted file mode 100644 index dcb7504..0000000 --- a/interest/filter/input_file.py +++ /dev/null @@ -1,119 +0,0 @@ -""" -Input File Module -This module provides an abstract class for representing various input files. -""" - -import abc -import gzip -import logging -from pathlib import Path -from typing import Iterable, TextIO, cast, Optional -from interest.filter.document import Document, Article -from interest.filter.document_filter import DocumentFilter - - -class InputFile(abc.ABC): - """ - Abstract class for representing various input files. - - Attributes: - _filepath (Path): The file path of the input file. - - Methods: - __init__(filepath): Initialize the InputFile with a file path. - filepath(): Get the file path of the input file. - base_file_name(): Output a list of documents in the input file. - open(mode, encoding): Open the input file for reading. - articles(): Return all articles for the document found in the - input file. - doc(): Output a list of documents in the input file. - """ - - def __init__(self, filepath: Path) -> None: - """ - Initialize the InputFile with a file path. - - Args: - filepath (Path): The file path of the input file. - """ - self._filepath = filepath - - @property - def filepath(self) -> Path: - """ - Get the file path of the input file. - - Returns: - Path: The file path of the input file. - """ - return self._filepath - - @abc.abstractmethod - def base_file_name(self) -> str: - """ - Output a list of documents in the input file. - - This can be a singleton list if an input file contains only - one document. - - Returns: - str: The base file name without extension. - """ - return NotImplemented - - def open(self, mode: str = "rt", encoding=None) -> TextIO: - """ - Open the input file for reading. - - Args: - mode (str): The file open mode. - encoding: The encoding format. 
- - Returns: - TextIO: A file object for reading the input file. - """ - if self._filepath.suffix.startswith(".gz"): - return cast(TextIO, gzip.open(self._filepath, mode=mode, - encoding=encoding)) - - # Default to text file - return cast(TextIO, open(self._filepath, - mode=mode, encoding=encoding)) - - # pylint: disable=no-member - def articles(self) -> Iterable[Article]: - """ - Return all articles for the document found in the input file. - - Yields: - Article: An article object. - """ - doc = self.doc() - if doc is not None: - yield from doc.articles - else: - logging.error("Document not found or is None for filepath: %s", - self.filepath) - return - - @abc.abstractmethod - def doc(self) -> Optional[Document]: - """ - Output a list of documents in the input file. - - This can be a singleton list if an input file contains only - one document. - - Returns: - Document: A document object. - """ - return NotImplemented - - def selected_articles(self, filter: DocumentFilter) -> Iterable[Article]: - document = self.doc() - if document is not None: - if filter.filter_document(document): - if document.articles is not None: - for article in document.articles: - if filter.filter_article(article): - yield article diff --git a/interest/models/base.py b/interest/models/base.py deleted file mode 100644 index 64832a1..0000000 --- a/interest/models/base.py +++ /dev/null @@ -1,20 +0,0 @@ -"""Base class for document embeddings.""" - -from abc import ABC, abstractmethod -from typing import Union, Sequence -import scipy -from numpy import typing as npt -import numpy as np - - -class BaseEmbedder(ABC): - """Base class for creating document embeddings.""" - - @abstractmethod - def fit(self, documents: Sequence[str]) -> None: - """Train the model on documents.""" - - @abstractmethod - def transform(self, documents: Union[str, Sequence[str]]) -> ( - Union)[scipy.sparse.spmatrix, npt.NDArray[np.float_]]: - """Get the embedding for a document.""" diff --git a/interest/models/tfidf.py b/interest/models/tfidf.py deleted file mode 100644 index c443843..0000000 --- a/interest/models/tfidf.py +++ /dev/null @@ -1,101 +0,0 @@ -"""Sklearn TF-IDF class.""" - -from typing import Sequence, Union, Optional -import warnings - -import scipy -from sklearn.feature_extraction.text import TfidfVectorizer - -from interest.models.base import BaseEmbedder -from interest.utils import load_spacy_model -from interest.settings import SPACY_MODEL - - -class TfidfEmbedder(BaseEmbedder): - # pylint: disable=too-many-instance-attributes - """ - Sklearn TF-IDF class. - - Arguments - --------- - ngram_max: - Maximum n-gram, higher numbers mean bigger embeddings. - norm: - Which kind of normalization is used: "l1", "l2" or None. - sublinear_tf: - Apply sublinear term-frequency scaling. - min_df: - Minimum document frequency of word to be included in the embedding. - max_df: - Maximum document frequency of word to be included in the embedding. 
- """ - - # pylint: disable=too-many-arguments - - def __init__( - self, ngram_max: int = 1, norm: Optional[str] = "l1", - sublinear_tf: bool = False, min_df: int = 1, - max_df: float = 1.0, spacy_model=SPACY_MODEL): - """Initialize the TF-IDF embedder.""" - self.nlp = ( - load_spacy_model(spacy_model) - if isinstance(spacy_model, str) - else spacy_model - ) - self.stopword_list = self.nlp.Defaults.stop_words - self.stop_words = list(self.stopword_list) - self.ngram_max = ngram_max - - self.norm = norm - self.sublinear_tf = sublinear_tf - self.min_df = min_df - self.max_df = max_df - if self.norm == "None": - self.norm = None - - self._model: Optional[TfidfVectorizer] = None - - def fit(self, documents: Sequence[str]) -> None: - """ - Fit the TF-IDF model on the given documents. - - Args: - documents (Sequence[str]): A sequence of document strings. - """ - min_df = min(self.min_df, len(documents)) - max_df = max(min_df/len(documents), self.max_df) - - def _tokenizer(text): - doc = self.nlp(text) - tokens = [token.lemma_.lower() for token in doc - if not token.is_stop and not token.is_punct] - return tokens - - with warnings.catch_warnings(): - warnings.filterwarnings("ignore") - self._model = TfidfVectorizer( - ngram_range=(1, self.ngram_max), - stop_words=self.stop_words, - tokenizer=_tokenizer, # self.stem_tokenizer, - min_df=min_df, - norm=self.norm, - sublinear_tf=self.sublinear_tf, - max_df=max_df) - self._model.fit(documents) - - def transform(self, documents: Union[str, Sequence[str]]) -> Union[ - scipy.sparse.spmatrix]: - """ - Transform the input documents into TF-IDF embeddings. - - Args: - documents (Union[str, Sequence[str]]): A single document string or - a sequence of document strings. - - Returns: - Union[scipy.sparse.spmatrix]: The TF-IDF embeddings of the input - documents. - """ - if self._model is None: - raise ValueError("Fit TF-IDF model before transforming data.") - return self._model.transform(documents).tocsr() diff --git a/interest/output_generator/text_formater.py b/interest/output_generator/text_formater.py deleted file mode 100644 index 93bb85b..0000000 --- a/interest/output_generator/text_formater.py +++ /dev/null @@ -1,117 +0,0 @@ -""" This module defines a TextFormatter class for formatting text based on -specified output units. """ -from typing import List, Union -import logging -from interest.settings import SPACY_MODEL -from interest.utils import load_spacy_model - -PARAGRAPH_FORMATTER = 'paragraph' -FULLTEXT_FORMATTER = 'full_text' -SEGMENTED_TEXT_FORMATTER = 'segmented_text' - - -class TextFormatter: - # pylint: disable=R0903 - """Class for formatting text based on specified output units. """ - - def __init__(self, output_unit: str, sentences_per_segment: int, - spacy_model=SPACY_MODEL): # : Union[str, Language] - """ - Initializes the TextFormatter object. - - Args: - output_unit (str): The type of output unit ('paragraph', - 'full_text', 'segmented_text'). - sentences_per_segment (int): Number of sentences per - segment when output_unit is 'segmented_text'. - spacy_model (Union[str, Language], optional): Spacy model - or model name used for text processing. Defaults to the global - SPACY_MODEL value. 
- """ - self.nlp = ( - load_spacy_model(spacy_model) - if isinstance(spacy_model, str) - else spacy_model - ) - self.sentences_per_segment = sentences_per_segment - self.formatter = output_unit - self.is_fulltext = self._is_fulltext() - self.texts: List[str] = [] - - def format_output(self, texts: Union[None, List[str]]) -> ( - Union)[str, List[str], List[List[str]], None]: - """ - Formats input texts based on the specified output unit. - - Args: - texts (List[str]): List of input texts to be formatted. - - Returns: - Union[str, List[str], List[List[str]]]: Formatted output text - based on the selected output_unit. For 'full_text', returns a - single string. For 'paragraph' and 'segmented_text', returns a - list of segmented text lists. - - Raises: - ValueError: If input 'texts' is not a list of strings. - ValueError: If an unsupported formatter type is specified. - """ - try: - if (not isinstance(texts, list) or (texts is None) or - not all(isinstance(text, str) for text in texts)): - raise ValueError("Input 'texts' must be a list of strings.") - - self.texts = texts - - if self.formatter == PARAGRAPH_FORMATTER: - return self._format_paragraph() - if self.formatter == FULLTEXT_FORMATTER: - return self._format_fulltext() - if self.formatter == SEGMENTED_TEXT_FORMATTER: - return self._format_segmented_text() - - except ValueError as e: - logging.error("Unsupported formatter %s: %s", self.formatter, e) - return None - return None - - def _format_paragraph(self) -> List[str]: - """Formats texts as a single paragraph. - - Returns: - List[List[str]]: List of input texts, segmented in paragraphs. - """ - return self.texts - - def _format_fulltext(self) -> str: - """Formats texts as full text with newline separators. - - Returns: - str: Newline-separated string of input texts. - """ - return '\n'.join(self.texts) - - def _format_segmented_text(self) -> List[List[str]]: - """Formats texts as segmented text based on sentences_per_segment. - - Returns: - List[List[str]]: Flattened list of segmented text strings. - """ - segmented_texts = [] - for text in self.texts: - doc = self.nlp(text) - sentences = [sent.text for sent in doc.sents] - - for i in range(0, len(sentences), self.sentences_per_segment): - segment = sentences[i:i + self.sentences_per_segment] - segmented_texts.append(segment) - - return segmented_texts - - def _is_fulltext(self) -> bool: - """Checks if the formatter type is 'full_text'. - - Returns: - bool: True if formatter is 'full_text', False otherwise. - """ - return self.formatter == FULLTEXT_FORMATTER diff --git a/interest/preprocessor/__init__.py b/interest/preprocessor/__init__.py deleted file mode 100644 index 3cec932..0000000 --- a/interest/preprocessor/__init__.py +++ /dev/null @@ -1 +0,0 @@ -# from interest.preprocessor.parser import XMLExtractor diff --git a/interest/preprocessor/parser.py b/interest/preprocessor/parser.py deleted file mode 100644 index efb7dd3..0000000 --- a/interest/preprocessor/parser.py +++ /dev/null @@ -1,207 +0,0 @@ - -import os -import tarfile -import gzip -import json -import xml.etree.ElementTree as ET -from typing import Dict, Union, Any, Optional, List -import logging - - -class XMLExtractor: - """Class for extracting XML content and metadata from nested .tgz files.""" # noqa: E501 - def __init__(self, root_dir: str, output_dir: str): - """ - Initializes the XMLExtractor object. - - Parameters: - root_dir (str): The root directory containing .tgz files. - output_dir (str): The output directory for saving extracted JSON files. 
# noqa: E501 - """ - self.root_dir = root_dir - self.output_dir = output_dir - self.fields = [ - "title", "language", "issuenumber", "date", "identifier", - "temporal", "recordRights", "publisher", "spatial", "source", - "recordIdentifier", "type", "isPartOf" - ] - - def extract_xml_string(self) -> None: - """ - Extracts XML content and metadata from .tgz files in the root directory. # noqa: E501 - """ - for folder_name in os.listdir(self.root_dir): - folder_path = os.path.join(self.root_dir, folder_name) - if not os.path.isdir(folder_path): - continue - if not folder_name.isdigit(): # Exclude in_progress, manifests, and ocr_complete folders and log files. # noqa: E501 - continue - self.process_folder(folder_name, folder_path) - - def process_folder(self, folder_name: str, folder_path: str) -> None: - """ - Processes .tgz files within a folder. - - Parameters: - folder_name (str): Name of the folder being processed. - folder_path (str): Path to the folder being processed. - """ - for tgz_filename in os.listdir(folder_path): - if not tgz_filename.endswith('.tgz'): - continue - tgz_file_path = os.path.join(folder_path, tgz_filename) - base_name = os.path.splitext(tgz_filename)[0] - output_folder = os.path.join(self.output_dir, folder_name) - os.makedirs(output_folder, exist_ok=True) - try: - with tarfile.open(tgz_file_path, "r:gz") as outer_tar: - news_dict = self.process_tar(outer_tar) - except tarfile.TarError as e: - logging.error(f"Error extracting {tgz_filename}: {e}") - continue - output_file = os.path.join(output_folder, f"{base_name}.json.gz") - self.save_as_json_compressed(news_dict, output_file) - # self.save_as_json(news_dict, output_file) - - def process_tar(self, outer_tar: tarfile.TarFile) -> Dict[str, Union[Dict[str, str], Dict[int, Dict[str, str]]]]: # noqa: E501 - """ - Processes a .tgz file and extracts XML content and metadata. - - Parameters: - outer_tar (tarfile.TarFile): The .tgz file being processed. - - Returns: - Dict[str, Union[Dict[str, str], Dict[int, Dict[str, str]]]]: A dictionary containing extracted content and metadata. # noqa: E501 - """ - news_dict: Dict[str, Any] = {"newsletter_metadata": {}, "articles": {}} - id = 0 - for entry in outer_tar: - try: - if entry.name.endswith(".xml"): - file = outer_tar.extractfile(entry) - if file is not None: - content = file.read() - xml_content = content.decode('utf-8', 'ignore') - article = self.extract_article(xml_content, entry.name) - id += 1 - news_dict["articles"][id] = article - - elif entry.name.endswith(".gz"): - gz_member = next(member for member in outer_tar.getmembers() if member.name.endswith('.gz')) # noqa: E501 - with outer_tar.extractfile(gz_member) as gz_file: # type: ignore # noqa: E501 - with gzip.open(gz_file, 'rt') as xml_file: - xml_string = xml_file.read() - if isinstance(xml_string, bytes): - xml_string = xml_string.decode('utf-8') - newsletter_metadata = self.extract_meta(xml_string) - news_dict["newsletter_metadata"] = newsletter_metadata # noqa: E501 - else: - continue - except Exception as e: - logging.error(f"Error processing file {entry.name}: {e}") - return news_dict - - @staticmethod - def save_as_json_compressed(data: Dict[str, Union[Dict[str, str], Dict[int, Dict[str, str]]]], output_file: str) -> None: # noqa: E501 - """ - Saves data as compressed JSON using gzip. - - Parameters: - data (Dict[str, Union[Dict[str, str], Dict[int, Dict[str, str]]]]): Data to be saved as JSON. # noqa: E501 - output_file (str): Path to the output JSON file. 
- """ - try: - with gzip.open(output_file, 'wt') as json_file: - json.dump(data, json_file, indent=4) - except Exception as e: - logging.error(f"Error saving compressed JSON to {output_file}: {e}") # noqa: E501 - - # @staticmethod - # def save_as_json(data: Dict[str, Union[Dict[str, str], Dict[int, Dict[str, str]]]], output_file: str) -> None: # noqa: E501 - # """ - # Saves data as JSON to a specified file. - - # Parameters: - # data (Dict[str, Union[Dict[str, str], Dict[int, Dict[str, str]]]]): Data to be saved as JSON. # noqa: E501 - # output_file (str): Path to the output JSON file. - # """ - # try: - # with open(output_file, 'w') as json_file: - # json.dump(data, json_file, indent=4) - # except Exception as e: - # logging.error(f"Error saving JSON to {output_file}: {e}") - - @staticmethod - def extract_article(xml_content: str, file_name: str) -> Dict[str, Union[str, List[Optional[str]]]]: # noqa: E501 - """ - Extracts article title and body from XML content. - - Parameters: - xml_content (str): XML content of the article. - file_name (str): Name of the XML file. - - Returns: - Dict[Optional[str], list[str]]: A dictionary containing the extracted title and body of the article. - body contains a list of paragraphs. # noqa: E501 - """ - try: - root = ET.fromstring(xml_content) - except ET.ParseError: - logging.error(f"Failed to parse XML from file: {file_name}") - return {} - - title_values = [element.text for element in root.iter() if element.tag.endswith('title')] # noqa: E501 - if len(title_values) > 1: - logging.warning("More than one titles are extracted for the article.") # noqa: E501 - if not title_values: - logging.warning("No title is extracted for the article.") - title = "" - else: - title = title_values[0] if title_values[0] is not None else "" - # title = title_values[0] - - body_values = [element.text for element in root.iter() if element.tag.endswith('p')] # noqa: E501 - if not body_values: - logging.warning("No body is extracted.") - body = [] - # elif len(body_values) > 1: - # logging.warning("There are more than one paragraphs in the article.") # noqa: E501 - # body = ' '.join(body_values) - else: - # body = body_values[0] - body = body_values - - return {"title": title, "body": body} - - def extract_meta(self, xml_string: str) -> Dict[str, Union[str, None]]: - """ - Extracts metadata from XML string. - - Parameters: - xml_string (str): XML string containing metadata. - - Returns: - Dict[str, Union[str, None]]: A dictionary containing the extracted metadata. 
# noqa: E501 - """ - newsletter_metadata: Dict[str, Union[str, None]] = {} - - try: - root = ET.fromstring(xml_string) - except ET.ParseError: - logging.error("Failed to parse XML from file") - return newsletter_metadata - - for field in self.fields: - field_values = [element.text for element in root.iter() if element.tag.endswith(field)] # noqa: E501 - if len(field_values) > 1: - logging.warning(f"More than one {field}s are extracted from metadata.") # noqa: E501 - if not field_values: - logging.warning(f"No {field} is extracted.") - newsletter_metadata[field] = None - else: - filtered_field_values = [value for value in field_values if value is not None] # noqa: E501 - newsletter_metadata[field] = filtered_field_values[0] if field != "spatial" else ", ".join(filtered_field_values) # noqa: E501 - - # newsletter_metadata[field] = field_values[0] if field != "spatial" else ", ".join(field_values) # noqa: E501 - - return newsletter_metadata diff --git a/interest/preprocessor/text_cleaner.py b/interest/preprocessor/text_cleaner.py deleted file mode 100644 index ca96945..0000000 --- a/interest/preprocessor/text_cleaner.py +++ /dev/null @@ -1,129 +0,0 @@ -""" -This module provides a TextCleaner class for preprocessing text -data using various cleaning techniques. -""" -import re -from typing import Union, List -from interest.settings import SPACY_MODEL -from interest.utils import load_spacy_model - - -def merge_texts_list(text: Union[str, List[str]]) -> str: - """ - Merge a list of texts into a single string by joining them with spaces. - - Args: - text (Union[str, List[str]]): The input text or list of texts to merge. - - Returns: - str: The merged text if input is a list of strings, otherwise returns - the input text unchanged. - """ - if isinstance(text, list): - merged_text = ' '.join(text) - return merged_text - return text - - -class TextCleaner: - """A class for cleaning text data using various preprocessing - techniques.""" - - def __init__(self, spacy_model=SPACY_MODEL) -> None: - """Initialize the TextCleaner instance. - - Args: - spacy_model (str or spacy.Language, optional): The SpaCy - model to use for text processing. - Defaults to the model specified in the settings. - """ - - self.nlp = ( - load_spacy_model(spacy_model) - if isinstance(spacy_model, str) - else spacy_model - ) - self.stopword_list = self.nlp.Defaults.stop_words - self.stopwords = set(self.stopword_list) - self.text = "" - - def get_lower_lemma_tokens(self) -> None: - """ - Get lowercased lemmatized tokens from the text. - - This method processes the text stored in the instance variable - `self.text`,tokenizes it using the SpaCy pipeline `self.nlp`, - and then lemmatizes each token, converting it to lowercase. - Stop words and punctuation tokens are filtered out. 
- """ - doc = self.nlp(self.text) - self.text = " ".join([token.lemma_.lower() for token in doc - if not token.is_stop and not token.is_punct]) - - def get_words(self): - """Tokenize words in the text.""" - doc = self.nlp(self.text) - self.text = " ".join([token.text for token in doc]) - - def lower(self): - """Transform the text to lower case.""" - self.text = self.text.lower() - - def remove_stopwords(self): - """Remove the stopwords from the text.""" - doc = self.nlp(self.text) - self.text = " ".join([token.text for token in doc if token.text - not in self.stopwords]) - - def remove_numeric(self): - """Remove numbers from the text.""" - self.text = re.sub(r'\d+', '', self.text) - - def remove_non_ascii(self): - """Remove non ASCII characters from the text.""" - self.text = re.sub(r'[^\x00-\x7f]', '', self.text) - - def remove_extra_whitespace_tabs(self): - """Remove extra whitespaces and tabs from the text.""" - self.text = re.sub(r'\s+', ' ', self.text) - - def remove_one_char(self): - """Remove single characters from the text.""" - self.text = " ".join([w for w in self.text.split() if len(w) > 1]) - - def keep_standard_chars(self): - """Keep only standard characters in the text.""" - self.text = re.sub(r'[^-0-9\w,. ?!()%/]', '', self.text) - - def preprocess(self, text): - """Preprocess the given text using a series of cleaning steps. - - Args: - text ( List[str]): The text to preprocess. - - Returns: - str: The preprocessed text. - """ - self.text = merge_texts_list(text) - self.get_lower_lemma_tokens() - self.remove_numeric() - self.remove_extra_whitespace_tabs() - self.remove_one_char() - return self.text - - def clean(self, text): - """Clean the given text by removing non-standard characters and - extra whitespace. - - Args: - text (str): The text to clean. - - Returns: - str: The cleaned text. - """ - self.text = merge_texts_list(text) - self.text = text - self.get_words() - self.keep_standard_chars() - self.remove_extra_whitespace_tabs() - return self.text diff --git a/interest/settings.py b/interest/settings.py deleted file mode 100644 index 391f9b4..0000000 --- a/interest/settings.py +++ /dev/null @@ -1,10 +0,0 @@ -""" -Module containing configuration settings for the project. -""" -import os - -SPACY_MODEL = os.getenv("SPACY_MODEL", "nl_core_news_sm") -"""Spacy model to use for sentence splitting.""" - -ENCODING = os.getenv("ENCODING", "utf-8") -"""Encoding used for reading and writing files.""" diff --git a/interest/temporal_categorization/__init__.py b/interest/temporal_categorization/__init__.py deleted file mode 100644 index 60ec91d..0000000 --- a/interest/temporal_categorization/__init__.py +++ /dev/null @@ -1,8 +0,0 @@ -"""Mapping from string format descriptions to corresponding classes.""" -from interest.temporal_categorization.timestamped_data \ - import (YearPeriodData, DecadePeriodData) - -PERIOD_TYPES = { - "decade": DecadePeriodData, - "year": YearPeriodData -} diff --git a/interest/temporal_categorization/timestamped_data.py b/interest/temporal_categorization/timestamped_data.py deleted file mode 100644 index 7a75488..0000000 --- a/interest/temporal_categorization/timestamped_data.py +++ /dev/null @@ -1,123 +0,0 @@ -""" -This module provides classes and utilities for working with data -that includes timestamps. -""" -import json -from datetime import datetime -from pathlib import Path - - -class TimestampedData: - """ - Represents data with a timestamp. - - Attributes: - DATE_FIELD (str): The field name for the timestamp in the data. 
- _filename (Path): The path to the file containing the data. - _data (dict): The loaded JSON data. - _timestamp (datetime): The timestamp extracted from the data. - - Methods: - __init__(self, filename): Initializes the TimestampedData object. - filename(self) -> Path: Returns the filename path. - _load_data(self): Loads data from the file. - _get_timestamp(self): Extracts the timestamp from the data. - categorize(self): Abstract method for categorizing data by timestamp. - """ - - DATE_FIELD = "Date" - - def __init__(self, filename: Path): - """ - Initializes the TimestampedData object. - - Args: - filename (Path): The path to the file containing the data. - """ - self._filename = filename - self._data = self._load_data() - self._timestamp = self._get_timestamp() - - @property - def filename(self) -> Path: - """ - Returns the filename path. - - Returns: - Path: The filename path. - """ - return self._filename - - def _load_data(self): - """ - Loads data from the file. - - Returns: - dict: The loaded JSON data. - """ - with open(self._filename, 'r', encoding='utf-8') as file: - return json.load(file) - - def data(self): - """ - Returns the json data - - Returns: - dict: The loaded JSON data. - """ - return self._data - - def _get_timestamp(self): - """ - Extracts the timestamp from the data. - - Returns: - datetime: The extracted timestamp. - """ - return datetime.strptime(self._data[self.DATE_FIELD], '%Y-%m-%d') - - def categorize(self): - """ - Abstract method for categorizing data by timestamp. - - Raises: - NotImplementedError: Subclasses must implement categorize method. - """ - raise NotImplementedError("Subclass must implement categorize method") - - -class YearPeriodData(TimestampedData): - """ - Represents data categorized by year. - - Methods: - categorize(self): Categorizes data by year. - """ - - def categorize(self): - """ - Categorizes data by year. - - Returns: - int: The year of the timestamp. - """ - return self._timestamp.year - - -class DecadePeriodData(TimestampedData): - """ - Represents data categorized by decade. - - Methods: - categorize(self): Categorizes data by decade. - """ - - def categorize(self): - """ - Categorizes data by decade. - - Returns: - int: The decade of the timestamp. - """ - year = self._timestamp.year - return (year // 10) * 10 diff --git a/interest/utils.py b/interest/utils.py deleted file mode 100644 index c601162..0000000 --- a/interest/utils.py +++ /dev/null @@ -1,174 +0,0 @@ -""" -Module containing utility functions for the project. -""" -import os -from pathlib import Path -from typing import List, Dict, Any, Optional -from functools import cache -import json -import spacy -import spacy.cli -from interest.filter.document_filter import (YearFilter, - TitleFilter, - DocumentFilter) -from interest.filter.document_filter import (CompoundFilter, - DecadeFilter, - KeywordsFilter) -from interest.settings import ENCODING - - -@cache -def load_spacy_model(model_name: str, retry: bool = True) \ - -> Optional[spacy.Language]: - """Load and store a sentencize-only SpaCy model - - Downloads the model if necessary. - - Args: - model_name (str): The name of the SpaCy model to load. - retry (bool, optional): Whether to retry downloading the model - if loading fails initially. Defaults to True. - - Returns: - spacy.Language: The SpaCy model object for the given name. 
- """ - - try: - nlp = spacy.load(model_name, disable=["tagger", "parser", "ner"]) - except OSError as exc: - if retry: - spacy.cli.download(model_name) - return load_spacy_model(model_name, False) - raise exc - nlp.add_pipe("sentencizer") - return nlp - - -def load_filters_from_config(config_file: Path) -> CompoundFilter: - """Load document filters from a configuration file. - - Args: - config_file (Path): Path to the configuration file containing - filter settings. - - Returns: - CompoundFilter: A compound filter containing individual document - filters loaded from the configuration. - """ - with open(config_file, 'r', encoding=ENCODING) as f: - config: Dict[str, List[Dict[str, Any]]] = json.load(f) - - filters: List[DocumentFilter] = [] - for filter_config in config['filters']: - filter_type = filter_config['type'] - if filter_type == 'TitleFilter': - filters.append(TitleFilter(filter_config['title'])) - elif filter_type == 'YearFilter': - filters.append(YearFilter(filter_config['year'])) - elif filter_type == 'DecadeFilter': - filters.append(DecadeFilter(filter_config['decade'])) - elif filter_type == 'KeywordsFilter': - filters.append(KeywordsFilter(filter_config['keywords'])) - - return CompoundFilter(filters) - - -def get_keywords_from_config(config_file: Path) -> List[str]: - """ - Extract keywords from a JSON configuration file. - - Args: - config_file (Path): The path to the JSON configuration file. - - Returns: - List[str]: The list of keywords extracted from the configuration - file. - - Raises: - FileNotFoundError: If the config file is not found or cannot be - opened. - KeyError: If the required keys are not found in the configuration - file. - TypeError: If the data in the configuration file is not in the - expected format. - """ - try: - with open(config_file, 'r', encoding=ENCODING) as f: - config: Dict[str, List[Dict[str, Any]]] = json.load(f) - - for filter_config in config['filters']: - filter_type = filter_config['type'] - if filter_type == 'KeywordsFilter': - return filter_config['keywords'] - return [] - except FileNotFoundError as exc: - raise FileNotFoundError("Config file not found") from exc - except KeyError as exc: - raise KeyError("Keywords not found in config file") from exc - - -def read_config(config_file: Path, item_key: str) -> Dict[str, str]: - """ - Get the value of the given key item from a JSON file. - - Args: - config_file (Path): The path to the JSON config file. - item_key (str): Key item defined in config file. - Returns: - Dict[str, str]: The article selector configuration. - - Raises: - KeyError: If the key item is not found in the config file. - FileNotFoundError: If the config file is not found. - """ - try: - with open(config_file, 'r', encoding=ENCODING) as f: - config: Dict[str, str] = json.load(f)[item_key] - if not config: - raise ValueError("Config is empty") - return config - except FileNotFoundError as exc: - raise FileNotFoundError("Config file not found") from exc - except KeyError as exc: - raise KeyError("Key item %s not found in config file") from exc - - -def save_filtered_articles(input_file: Any, article_id: str, - output_dir: str) -> None: - """Save filtered articles data to a JSON file. - - Args: - input_file: The input file object. - article_id (str): The ID of the article. - output_dir (str): The directory where the JSON file will be saved. 
- - Returns: - None - """ - data = { - "file_path": str(input_file.filepath), - "article_id": str(article_id), - "Date": str(input_file.doc().publish_date), - "Title": input_file.doc().title, - } - - output_fp = os.path.join(output_dir, input_file.base_file_name() + '.json') - print('output_fp', output_fp) - with open(output_fp, "w", encoding=ENCODING) as json_file: - json.dump(data, json_file, indent=4) - - -def get_file_name_without_extension(full_path: str) -> str: - """ - Extracts the file name without extension from a full path. - - Args: - full_path (str): The full path of the file. - - Returns: - str: The file name without extension. - - """ - base_name = os.path.basename(full_path) - file_name_without_ext = os.path.splitext(base_name)[0] - return file_name_without_ext From 76b70b7e30fae4c3f586960da5bc302d13581ed5 Mon Sep 17 00:00:00 2001 From: parisa-zahedi Date: Tue, 25 Jun 2024 16:15:29 +0200 Subject: [PATCH 3/4] modify project name --- README.md | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/README.md b/README.md index 0116b7c..a949991 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# INTEREST +# dataQuest The code in this repository implements a pipeline to extract specific articles from a large corpus. @@ -10,7 +10,7 @@ Articles can be filtered based on individual or multiple features such as title, ## Getting Started Clone this repository to your working station to obtain examples and python scripts: ``` -git clone https://github.com/UtrechtUniversity/historical-news-sentiment.git +git clone https://github.com/UtrechtUniversity/dataQuest.git ``` ### Prerequisites @@ -20,10 +20,10 @@ To install and run this project you need to have the following prerequisites ins ``` ### Installation -#### Option 1 - Install interest package -To run the project, ensure to install the interest package that is part of this project. +#### Option 1 - Install dataQuest package +To run the project, ensure to install the dataQuest package that is part of this project. ``` -pip install interest +pip install dataQuest ``` #### Option 2 - Run from source code If you want to run the scripts without installation you need to: @@ -42,7 +42,7 @@ pip install . On Linux and Mac OS, you might have to set the PYTHONPATH environment variable to point to this directory. ```commandline -export PYTHONPATH="current working directory/historical-news-sentiment:${PYTHONPATH}" +export PYTHONPATH="current working directory/dataQuest:${PYTHONPATH}" ``` ### Built with These packages are automatically installed in the step above: @@ -85,7 +85,7 @@ Below is a snapshot of the JSON file format: In our use case, the harvested KB data is in XML format. We have provided the following script to transform the original data into the expected format. ``` -from interest.preprocessor.parser import XMLExtractor +from dataQuest.preprocessor.parser import XMLExtractor extractor = XMLExtractor(Path(input_dir), Path(output_dir)) extractor.extract_xml_string() @@ -99,9 +99,9 @@ python3 convert_input_files.py --input_dir path/to/raw/xml/data --output_dir pat In order to define a corpus with a new data format you should: -- add a new input_file_type to [INPUT_FILE_TYPES](https://github.com/UtrechtUniversity/historical-news-sentiment/blob/main/interest/filter/__init__.py) -- implement a class that inherits from [input_file.py](https://github.com/UtrechtUniversity/historical-news-sentiment/blob/main/interest/filter/input_file.py). -This class is customized to read a new data format. 
In our case-study we defined [delpher_kranten.py](https://github.com/UtrechtUniversity/historical-news-sentiment/blob/main/interest/filter/delpher_kranten.py). +- add a new input_file_type to [INPUT_FILE_TYPES](https://github.com/UtrechtUniversity/dataQuest/blob/main/dataQuest/filter/__init__.py) +- implement a class that inherits from [input_file.py](https://github.com/UtrechtUniversity/dataQuest/blob/main/dataQuest/filter/input_file.py). +This class is customized to read a new data format. In our case-study we defined [delpher_kranten.py](https://github.com/UtrechtUniversity/dataQuest/blob/main/dataQuest/filter/delpher_kranten.py). ### 2. Filtering @@ -144,7 +144,7 @@ The output of this script is a JSON file for each selected article in the follow } ``` ### 3. Categorization by timestamp -The output files generated in the previous step are categorized based on a specified [period-type](https://github.com/UtrechtUniversity/historical-news-sentiment/blob/main/interest/temporal_categorization/__init__.py), +The output files generated in the previous step are categorized based on a specified [period-type](https://github.com/UtrechtUniversity/dataQuest/blob/main/dataQuest/temporal_categorization/__init__.py), such as ```year``` or ```decade```. This categorization is essential for subsequent steps, especially if you intend to apply tf-idf or other models to specific periods. In our case, we applied tf-idf per decade. ```commandline @@ -159,7 +159,7 @@ By utilizing tf-idf, the most relevant articles related to the specified topic ( Before applying tf-idf, articles containing any of the specified keywords in their title are selected. -From the rest of articles, to choose the most relevant ones, you can specify one of the following criteria in [config.py](https://github.com/UtrechtUniversity/historical-news-sentiment/blob/main/config.json): +From the rest of articles, to choose the most relevant ones, you can specify one of the following criteria in [config.py](https://github.com/UtrechtUniversity/dataQuest/blob/main/config.json): - Percentage of selected articles with the top scores - Maximum number of selected articles with the top scores @@ -192,12 +192,12 @@ From the rest of articles, to choose the most relevant ones, you can specify one The following script, add a new column, ```selected``` to the .csv files from the previous step. ```commandline -python3 scripts/3_select_final_articles.py --input_dir "output/output_timestamped/" +python3 scripts/step3_select_final_articles.py --input-dir "output/output_timestamped/" ``` ### 5. Generate output As the final step of the pipeline, the text of the selected articles is saved in a .csv file, which can be used for manual labeling. The user has the option to choose whether the text should be divided into paragraphs or a segmentation of the text. -This feature can be set in [config.py](https://github.com/UtrechtUniversity/historical-news-sentiment/blob/main/config.json). +This feature can be set in [config.py](https://github.com/UtrechtUniversity/dataQuest/blob/main/config.json). 
```commandline
 "output_unit": "paragraph"
@@ -211,7 +211,7 @@ OR
 ```
 
 ```commandline
-python3 scripts/step4_generate_output.py --input_dir "output/output_timestamped/" --output-dir "output/output_results/" --glob "*.csv"
+python3 scripts/step4_generate_output.py --input-dir "output/output_timestamped/" --output-dir "output/output_results/" --glob "*.csv"
 ```
 ## About the Project
 **Date**: February 2024
@@ -248,5 +248,5 @@ To contribute:
 
 Pim Huijnen - p.huijnen@uu.nl
 
-Project Link: [https://github.com/UtrechtUniversity/historical-news-sentiment](https://github.com/UtrechtUniversity/historical-news-sentiment)
+Project Link: [https://github.com/UtrechtUniversity/dataQuest](https://github.com/UtrechtUniversity/dataQuest)

From 01d74591750213915b0a348547b3e5805b5e6b7d Mon Sep 17 00:00:00 2001
From: parisa-zahedi
Date: Tue, 25 Jun 2024 16:19:22 +0200
Subject: [PATCH 4/4] change project name

---
 pyproject.toml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pyproject.toml b/pyproject.toml
index 6878f35..23e1c3d 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -3,7 +3,7 @@ requires = ["setuptools", "setuptools-scm"]
 build-backend = "setuptools.build_meta"
 
 [project]
-name = "interest"
+name = "dataQuest"
 description = "A package to extract historical news sentiments"
 authors = [
     {name = "Shiva Nadi", email = "s.nadi@uu.nl"},
@@ -31,7 +31,7 @@ lint = ["flake8"]
 test = ["pytest", "mypy"]
 
 [tool.setuptools]
-packages = ["interest"]
+packages = ["dataQuest"]
 
 [tool.flake8]
 max-line-length = 99
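With the package renamed in this series, downstream imports change from `interest.*` to `dataQuest.*`. Below is a minimal sketch of post-rename usage, assuming the package has been installed from source (`pip install .`) and using hypothetical input/output directories; the `XMLExtractor` call mirrors the README example patched above.

```python
from pathlib import Path

# The old import path was interest.preprocessor.parser; after this patch
# series the same class lives under the dataQuest package.
from dataQuest.preprocessor.parser import XMLExtractor

# Hypothetical directories -- replace with your own corpus locations.
input_dir = Path("data/raw_xml")
output_dir = Path("data/converted_json")

# Convert the harvested XML files into the compressed JSON format that the
# filtering step of the pipeline expects.
extractor = XMLExtractor(input_dir, output_dir)
extractor.extract_xml_string()
```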
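The utility helpers shown as removed lines earlier in this patch (`read_config`, `get_keywords_from_config`) are re-created under the new package name. A minimal sketch of loading pipeline settings with them follows, assuming they are importable as `dataQuest.utils` after the rename; the `"article_selector"` key is a hypothetical example and should match whatever section your `config.json` actually defines.

```python
from pathlib import Path

# Assumes the helpers removed from the old package above are available under
# the renamed package (dataQuest/utils.py).
from dataQuest.utils import get_keywords_from_config, read_config

config_file = Path("config.json")

# Keywords are taken from the KeywordsFilter entry in the "filters" list.
keywords = get_keywords_from_config(config_file)

# "article_selector" is a hypothetical key name used for illustration;
# read_config simply returns the value stored under the given key.
selector_config = read_config(config_file, "article_selector")

print(keywords, selector_config)
```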