Skip to content

Commit

Permalink
update-search
Browse files Browse the repository at this point in the history
  • Loading branch information
raphaelsty committed Sep 10, 2024
1 parent 8d8cef4 commit af848fc
Show file tree
Hide file tree
Showing 18 changed files with 360 additions and 59 deletions.
3 changes: 3 additions & 0 deletions ducksearch/delete/delete/documents_queries.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- Drop every document/query association whose document id appears in the
-- parquet file of ids scheduled for deletion.
DELETE FROM {schema}.documents_queries
USING parquet_scan('{parquet_file}') AS _ids
WHERE {schema}.documents_queries.document_id = _ids.id;
3 changes: 0 additions & 3 deletions ducksearch/delete/delete/index.sql

This file was deleted.

76 changes: 76 additions & 0 deletions ducksearch/delete/delete/scores.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
-- Delete, from bm25_documents.scores, every term whose posting list would be
-- empty after removing the documents listed in the parquet file, i.e. terms
-- that only ever appeared in the deleted documents.
WITH _docs_to_delete AS (
    -- Internal docids of the documents being removed.
    SELECT DISTINCT
        bm25.docid
    FROM parquet_scan('{parquet_file}') p
    INNER JOIN bm25_documents.docs bm25
        ON p.id = bm25.name
),

_terms_to_recompute AS (
    -- Terms that occur in at least one deleted document: their score lists
    -- are affected by the deletion.
    SELECT DISTINCT
        term
    FROM bm25_documents.terms
    INNER JOIN _docs_to_delete
        ON bm25_documents.terms.docid = _docs_to_delete.docid
    INNER JOIN bm25_documents.dict
        ON bm25_documents.terms.termid = bm25_documents.dict.termid
),

_scores_to_update AS (
    -- Current packed (docids, scores) lists for the affected terms.
    SELECT
        _bm25.term,
        _bm25.list_scores,
        _bm25.list_docids
    FROM bm25_documents.scores _bm25
    INNER JOIN _terms_to_recompute _terms
        ON _bm25.term = _terms.term
),

_unnested_scores AS (
    -- One row per (term, docid, score); the two UNNESTs are zipped
    -- positionally.
    SELECT
        term,
        UNNEST(list_scores) AS score,
        UNNEST(list_docids) AS docid
    FROM _scores_to_update
),

_unnested_unfiltered_scores AS (
    -- Tag each pair with the docid when it belongs to a deleted document
    -- (NULL otherwise).
    SELECT
        _scores.term,
        _scores.docid,
        _scores.score,
        _docs.docid AS to_delete
    FROM _unnested_scores _scores
    LEFT JOIN _docs_to_delete _docs
        ON _scores.docid = _docs.docid
),

_unnested_filtered_scores AS (
    -- Pairs that survive the deletion.
    SELECT
        term,
        docid,
        score
    FROM _unnested_unfiltered_scores
    WHERE to_delete IS NULL
),

_terms_to_delete AS (
    -- For each affected term, `missing` is NULL only when no (docid, score)
    -- pair survives.
    SELECT DISTINCT
        ttr.term,
        ufs.term AS missing
    FROM _terms_to_recompute ttr
    LEFT JOIN _unnested_filtered_scores ufs
        ON ttr.term = ufs.term
),

_scores_to_delete_completely AS (
    -- Terms with an empty surviving posting list: drop their rows entirely.
    SELECT DISTINCT
        term
    FROM _terms_to_delete
    WHERE missing IS NULL
)

DELETE FROM bm25_documents.scores AS _scores
USING _scores_to_delete_completely AS _scores_to_delete
WHERE _scores.term = _scores_to_delete.term;
71 changes: 68 additions & 3 deletions ducksearch/delete/documents.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,41 @@ def _drop_documents() -> None:
"""Delete documents from the documents table in DuckDB."""


@execute_with_duckdb(
    relative_path="delete/update/scores.sql",
)
def _update_score() -> None:
    """Recompute the packed (docids, scores) lists of terms touched by a deletion.

    Runs ``delete/update/scores.sql``, which rewrites each affected term's
    score lists in ``bm25_documents.scores`` without the deleted documents.
    """


@execute_with_duckdb(
    relative_path="delete/delete/scores.sql",
)
def _delete_score() -> None:
    """Drop score rows for terms that no longer match any remaining document.

    Runs ``delete/delete/scores.sql``: terms whose posting lists would be
    empty after the deletion are removed from ``bm25_documents.scores``.
    """


@execute_with_duckdb(
    relative_path="delete/update/docs.sql",
)
def _update_docs() -> None:
    """Remove the deleted documents from the ``bm25_documents.docs`` table."""


@execute_with_duckdb(
    relative_path="delete/update/terms.sql",
)
def _update_terms() -> None:
    """Remove rows of ``bm25_documents.terms`` that reference deleted documents."""


@execute_with_duckdb(
    relative_path="delete/update/stats.sql",
)
def _update_stats() -> None:
    """Refresh the corpus statistics (``num_docs``, ``avgdl``) after a deletion.

    Runs ``delete/update/stats.sql`` against ``bm25_documents.stats``.
    """


def documents(
database: str,
ids: list[str],
Expand Down Expand Up @@ -61,12 +96,13 @@ def documents(
>>> delete.documents(
... database="test.duckdb",
... ids=[1, 2],
... ids=[1, 2, 3],
... )
| Table | Size |
|----------------|------|
| documents | 1 |
| bm25_documents | 3 |
| bm25_documents | 1 |
"""
# Convert the list of document keys into a pyarrow Table for deletion
Expand All @@ -79,7 +115,36 @@ def documents(
compression="snappy",
)

# Call the SQL function to delete the documents
_delete_score(
database=database,
parquet_file="_documents_ids.parquet",
config=config,
)

_update_score(
database=database,
parquet_file="_documents_ids.parquet",
config=config,
)

_update_terms(
database=database,
parquet_file="_documents_ids.parquet",
config=config,
)

_update_docs(
database=database,
parquet_file="_documents_ids.parquet",
config=config,
)

_update_stats(
database=database,
parquet_file="_documents_ids.parquet",
config=config,
)

_drop_documents(
database=database,
schema=schema,
Expand Down
3 changes: 3 additions & 0 deletions ducksearch/delete/update/docs.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- Drop the rows of bm25_documents.docs whose name matches a deleted
-- document id from the parquet file.
DELETE FROM bm25_documents.docs AS _documents
USING parquet_scan('{parquet_file}') AS _ids
WHERE _documents.name = _ids.id;
72 changes: 72 additions & 0 deletions ducksearch/delete/update/scores.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
-- Rebuild the packed (docids, scores) lists of every term that appeared in
-- at least one deleted document, keeping only the surviving documents.
WITH _docs_to_delete AS (
    -- Internal docids of the documents being removed.
    SELECT DISTINCT
        bm25.docid
    FROM parquet_scan('{parquet_file}') p
    INNER JOIN bm25_documents.docs bm25
        ON p.id = bm25.name
),

_terms_to_recompute AS (
    -- Terms whose posting lists reference at least one deleted document.
    SELECT DISTINCT
        term
    FROM bm25_documents.terms
    INNER JOIN _docs_to_delete
        ON bm25_documents.terms.docid = _docs_to_delete.docid
    INNER JOIN bm25_documents.dict
        ON bm25_documents.terms.termid = bm25_documents.dict.termid
),

_scores_to_update AS (
    -- Current packed lists for the affected terms.
    SELECT
        _bm25.term,
        _bm25.list_scores,
        _bm25.list_docids
    FROM bm25_documents.scores _bm25
    INNER JOIN _terms_to_recompute _terms
        ON _bm25.term = _terms.term
),

_unnested_scores AS (
    -- One row per (term, docid, score); the two UNNESTs are zipped
    -- positionally.
    SELECT
        term,
        UNNEST(list_scores) AS score,
        UNNEST(list_docids) AS docid
    FROM _scores_to_update
),

_flagged_scores AS (
    -- Mark each pair whose docid belongs to a deleted document.
    SELECT
        _scores.term,
        _scores.docid,
        _scores.score,
        _docs.docid AS to_delete
    FROM _unnested_scores _scores
    LEFT JOIN _docs_to_delete _docs
        ON _scores.docid = _docs.docid
),

_surviving_scores AS (
    -- Keep only pairs that reference documents not being deleted.
    SELECT
        term,
        docid,
        score
    FROM _flagged_scores
    WHERE to_delete IS NULL
),

_list_scores AS (
    -- Re-pack the surviving pairs, ordered by descending score.
    SELECT
        term,
        LIST(docid ORDER BY score DESC, docid ASC) AS list_docids,
        LIST(score ORDER BY score DESC, docid ASC) AS list_scores
    FROM _surviving_scores
    GROUP BY 1
)

UPDATE bm25_documents.scores s
SET
    list_docids = u.list_docids,
    list_scores = u.list_scores
FROM _list_scores u
WHERE s.term = u.term;
11 changes: 11 additions & 0 deletions ducksearch/delete/update/stats.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- Recompute the corpus-level statistics used by BM25 (document count and
-- average document length) from the remaining documents. The stats table
-- holds a single row, so no WHERE clause is needed.
UPDATE bm25_documents.stats
SET num_docs = _stats.num_docs,
    avgdl = _stats.avgdl
FROM (
    SELECT
        COUNT(*) AS num_docs,
        AVG(len) AS avgdl
    FROM bm25_documents.docs
) AS _stats;
11 changes: 11 additions & 0 deletions ducksearch/delete/update/terms.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- Remove every row of the terms table that belongs to a deleted document.
WITH _deleted_docids AS (
    -- Internal docids of the documents listed in the parquet file.
    SELECT
        bm25.docid
    FROM parquet_scan('{parquet_file}') p
    INNER JOIN bm25_documents.docs bm25
        ON p.id = bm25.name
)

DELETE FROM bm25_documents.terms AS _terms
USING _deleted_docids AS _docs
WHERE _terms.docid = _docs.docid;
21 changes: 10 additions & 11 deletions ducksearch/search/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ def update_index(
)[0]["table_exists"]

if not settings_exists:
if isinstance(stopwords, list):
if not isinstance(stopwords, str):
stopwords_table = pa.Table.from_pydict({"sw": stopwords})
pq.write_table(stopwords_table, "_stopwords.parquet", compression="snappy")

Expand Down Expand Up @@ -290,7 +290,6 @@ def update_index(
settings["k1"] != k1
or settings["b"] != b
or settings["stemmer"] != stemmer
or settings["stopwords"] != stopwords
or settings["ignore"] != ignore
or settings["strip_accents"] != int(strip_accents)
or settings["lower"] != int(lower)
Expand Down Expand Up @@ -326,17 +325,10 @@ def update_index(
config=config,
)

_update_terms(
database=database,
schema=bm25_schema,
config=config,
)

termids_to_score = _termids_to_score(
database=database,
schema=bm25_schema,
config=config,
max_df=100_000,
)

_drop_scores_to_recompute(
Expand All @@ -362,6 +354,13 @@ def update_index(
termids = pa.Table.from_pydict({"termid": [term["termid"] for term in batch]})
pq.write_table(termids, "_termids.parquet", compression="snappy")

_update_terms(
database=database,
schema=bm25_schema,
parquet_file="_termids.parquet",
config=config,
)

_update_scores(
database=database,
schema=bm25_schema,
Expand Down Expand Up @@ -402,7 +401,7 @@ def update_index_documents(
k1: float = 1.5,
b: float = 0.75,
stemmer: str = "porter",
stopwords: str | list[str] = "english",
stopwords: str | list[str] = None,
ignore: str = "(\\.|[^a-z])+",
strip_accents: bool = True,
lower: bool = True,
Expand Down Expand Up @@ -483,7 +482,7 @@ def update_index_queries(
k1: float = 1.5,
b: float = 0.75,
stemmer: str = "porter",
stopwords: str | list[str] = "english",
stopwords: str | list[str] = None,
ignore: str = "(\\.|[^a-z])+",
strip_accents: bool = True,
lower: bool = True,
Expand Down
2 changes: 1 addition & 1 deletion ducksearch/search/graphs.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def graphs(
queries: str | list[str],
batch_size: int = 30,
top_k: int = 1000,
top_k_token: int = 10_000,
top_k_token: int = 30_000,
n_jobs: int = -1,
config: dict | None = None,
filters: str | None = None,
Expand Down
Loading

0 comments on commit af848fc

Please sign in to comment.