Skip to content

Commit

Permalink
D3v (#1398)
Browse files Browse the repository at this point in the history
* Feature/add zerox parser (#1396)

* Add KG tests (#1351)

* cli tests

* add sdk tests

* typo fix

* change workflow ordering

* add collection integration tests (#1352)

* bump pkg

* remove workflows

* fix sdk test port

* fix delete collection return check

* Fix document info serialization (#1353)

* Update integration-test-workflow-debian.yml

* pre-commit

* slightly modify

* up

* up

* smaller file

* up

* typo, change order

* up

* up

* change order

---------

Co-authored-by: emrgnt-cmplxty <68796651+emrgnt-cmplxty@users.noreply.github.com>
Co-authored-by: emrgnt-cmplxty <owen@algofi.org>
Co-authored-by: Nolan Tremelling <34580718+NolanTrem@users.noreply.github.com>

* add graphrag docs (#1362)

* add documentation

* up

* Update js/sdk/src/models.tsx

Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com>

* pre-commit

---------

Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com>

* Concurrent index creation, allow -1 for paginated entries (#1363)

* update webdev-template for current next.js and r2r-js sdk (#1218)

Co-authored-by: Simeon <simeon@theobald.nz>

* Feature/extend integration tests rebased (#1361)

* cleanups

* add back overzealous edits

* extend workflows

* fix full setup

* simplify cli

* add ymls

* rename to light

* try again

* start light

* add cli tests

* fix

* fix

* testing..

* trying complete matrix testflow

* cleanup matrix logic

* cleanup matrix logic

* cleanup matrix logic

* cleanup matrix logic

* cleanup matrix logic

* cleanup matrix logic

* cleanup matrix logic

* up

* up

* up

* All actions

* rename to runner

* rename to runner

* rename to runner

* rename to runner

* rename to runner

* rename to runner

* rename to runner

* rename to runner

* rename to runner

* try offic pgvec formula

* sudo make

* sudo make

* push and pray

* push and pray

* add new actions

* add new actions

* docker push & pray

* inspect manifests during launch

* inspect manifests during launch

* inspect manifests during launch

* inspect manifests during launch

* setup docker

* setup docker

* fix default

* fix default

* Feature/rebase to r2r vars (#1364)

* cleanups

* add back overzealous edits

* extend workflows

* fix full setup

* simplify cli

* add ymls

* rename to light

* try again

* start light

* add cli tests

* fix

* fix

* testing..

* trying complete matrix testflow

* cleanup matrix logic

* cleanup matrix logic

* cleanup matrix logic

* cleanup matrix logic

* cleanup matrix logic

* cleanup matrix logic

* cleanup matrix logic

* up

* up

* up

* All actions

* rename to runner

* rename to runner

* rename to runner

* rename to runner

* rename to runner

* rename to runner

* rename to runner

* rename to runner

* rename to runner

* try offic pgvec formula

* sudo make

* sudo make

* push and pray

* push and pray

* add new actions

* add new actions

* docker push & pray

* inspect manifests during launch

* inspect manifests during launch

* inspect manifests during launch

* inspect manifests during launch

* setup docker

* setup docker

* fix default

* fix default

* make changes

* update the windows workflow

* update the windows workflow

* remove extra workflows for now

* bump pkg

* push and pray

* revive full workflow

* revive full workflow

* revive full workflow

* revive full workflow

* revive full workflow

* revive full workflow

* revive full workflow

* revive full workflow

* revive tests

* revive tests

* revive tests

* revive tests

* update tests

* fix typos (#1366)

* update tests

* up

* up

* up

* bump max connections

* bump max connections

* bump max connections

* bump max connections

* bump max connections

* bump max connections

* bump max connections

* bump max connections

* bump max connections

* bump max connections

* bump max connections

* bump max connections

* bump max connections

* Add ingestion concurrency limit (#1367)

* up

* up

* up

---------

Co-authored-by: --global=Shreyas Pimpalgaonkar <--global=shreyas.gp.7@gmail.com>

* tweaks and fixes

* Fix Ollama Tool Calling (#1372)

* Update graphrag.mdx

* Fix Ollama tool calling

---------

Co-authored-by: Shreyas Pimpalgaonkar <shreyas.gp.7@gmail.com>

* Clean up Docker Compose (#1368)

* Fix hatchet, dockerfile

* Update compose

* point to correct docker image

* Fix bug in deletion, better validation error handling (#1374)

* Update graphrag.mdx

* Fix bug in deletion, better validation error handling

---------

Co-authored-by: Shreyas Pimpalgaonkar <shreyas.gp.7@gmail.com>

* vec index creation endpoint (#1373)

* Update graphrag.mdx

* upload files

* create vector index endpoint

* add to fastapi background task

* pre-commit

* move logging

* add api spec, support for all vecs

* pre-commit

* add workflow

* Modify KG Endpoints and update API spec (#1369)

* Update graphrag.mdx

* modify API endpoints and update documentation

* Update ingestion_router.py

* try different docker setup (#1371)

* try different docker setup

* action

* add login

* add full

* update action

* cleanup upload script

* cleanup upload script

* tweak action

* tweak action

* tweak action

* tweak action

* tweak action

* tweak action

* Nolan/ingest chunks js (#1375)

* Update graphrag.mdx

* Clean up ingest chunks, add to JS SDK

* Update JS docs

---------

Co-authored-by: Shreyas Pimpalgaonkar <shreyas.gp.7@gmail.com>

* up (#1376)

* Bump JS package (#1378)

* add conversation

* checkin progress

* checkin progress

* Fix Create Graph (#1379)

* up

* up

* modify assertion

* up

* up

* increase entity limit

* changing aristotle back to v2

* pre-commit

* typos

* add test_ingest_sample_file_2_sdk

* Update server.py

* checkin progress

* up

* update

* Graphrag docs (#1382)

* add docs and refine code

* add python SDK documentation

* up

* update

* checkin

* up

* cleanup

* working sync logging

* test conversation history

* fix runner tests, rename `CHUNKS` to `chunks`

* adding zerox parser

---------

Co-authored-by: Shreyas Pimpalgaonkar <shreyas.gp.7@gmail.com>
Co-authored-by: Nolan Tremelling <34580718+NolanTrem@users.noreply.github.com>
Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com>
Co-authored-by: FutureProofTechOps <operations@theobald.nz>
Co-authored-by: Simeon <simeon@theobald.nz>
Co-authored-by: --global=Shreyas Pimpalgaonkar <--global=shreyas.gp.7@gmail.com>

* Nolan/update hatchet (#1397)

* Move Hatchet to latest

* Update js package-lock

---------

Co-authored-by: NolanTrem <34580718+NolanTrem@users.noreply.github.com>

* Add hatchet logging (#1391)

* Add KG tests (#1351)

* cli tests

* add sdk tests

* typo fix

* change workflow ordering

* add collection integration tests (#1352)

* bump pkg

* remove workflows

* fix sdk test port

* fix delete collection return check

* Fix document info serialization (#1353)

* Update integration-test-workflow-debian.yml

* pre-commit

* slightly modify

* up

* up

* smaller file

* up

* typo, change order

* up

* up

* change order

---------

Co-authored-by: emrgnt-cmplxty <68796651+emrgnt-cmplxty@users.noreply.github.com>
Co-authored-by: emrgnt-cmplxty <owen@algofi.org>
Co-authored-by: Nolan Tremelling <34580718+NolanTrem@users.noreply.github.com>

* add graphrag docs (#1362)

* add documentation

* up

* Update js/sdk/src/models.tsx

Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com>

* pre-commit

---------

Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com>

* Concurrent index creation, allow -1 for paginated entries (#1363)

* update webdev-template for current next.js and r2r-js sdk (#1218)

Co-authored-by: Simeon <simeon@theobald.nz>

* Feature/extend integration tests rebased (#1361)

* cleanups

* add back overzealous edits

* extend workflows

* fix full setup

* simplify cli

* add ymls

* rename to light

* try again

* start light

* add cli tests

* fix

* fix

* testing..

* trying complete matrix testflow

* cleanup matrix logic

* cleanup matrix logic

* cleanup matrix logic

* cleanup matrix logic

* cleanup matrix logic

* cleanup matrix logic

* cleanup matrix logic

* up

* up

* up

* All actions

* rename to runner

* rename to runner

* rename to runner

* rename to runner

* rename to runner

* rename to runner

* rename to runner

* rename to runner

* rename to runner

* try offic pgvec formula

* sudo make

* sudo make

* push and pray

* push and pray

* add new actions

* add new actions

* docker push & pray

* inspect manifests during launch

* inspect manifests during launch

* inspect manifests during launch

* inspect manifests during launch

* setup docker

* setup docker

* fix default

* fix default

* Feature/rebase to r2r vars (#1364)

* cleanups

* add back overzealous edits

* extend workflows

* fix full setup

* simplify cli

* add ymls

* rename to light

* try again

* start light

* add cli tests

* fix

* fix

* testing..

* trying complete matrix testflow

* cleanup matrix logic

* cleanup matrix logic

* cleanup matrix logic

* cleanup matrix logic

* cleanup matrix logic

* cleanup matrix logic

* cleanup matrix logic

* up

* up

* up

* All actions

* rename to runner

* rename to runner

* rename to runner

* rename to runner

* rename to runner

* rename to runner

* rename to runner

* rename to runner

* rename to runner

* try offic pgvec formula

* sudo make

* sudo make

* push and pray

* push and pray

* add new actions

* add new actions

* docker push & pray

* inspect manifests during launch

* inspect manifests during launch

* inspect manifests during launch

* inspect manifests during launch

* setup docker

* setup docker

* fix default

* fix default

* make changes

* update the windows workflow

* update the windows workflow

* remove extra workflows for now

* bump pkg

* push and pray

* revive full workflow

* revive full workflow

* revive full workflow

* revive full workflow

* revive full workflow

* revive full workflow

* revive full workflow

* revive full workflow

* revive tests

* revive tests

* revive tests

* revive tests

* update tests

* fix typos (#1366)

* update tests

* up

* up

* up

* bump max connections

* bump max connections

* bump max connections

* bump max connections

* bump max connections

* bump max connections

* bump max connections

* bump max connections

* bump max connections

* bump max connections

* bump max connections

* bump max connections

* bump max connections

* Add ingestion concurrency limit (#1367)

* up

* up

* up

---------

Co-authored-by: --global=Shreyas Pimpalgaonkar <--global=shreyas.gp.7@gmail.com>

* tweaks and fixes

* Fix Ollama Tool Calling (#1372)

* Update graphrag.mdx

* Fix Ollama tool calling

---------

Co-authored-by: Shreyas Pimpalgaonkar <shreyas.gp.7@gmail.com>

* Clean up Docker Compose (#1368)

* Fix hatchet, dockerfile

* Update compose

* point to correct docker image

* Fix bug in deletion, better validation error handling (#1374)

* Update graphrag.mdx

* Fix bug in deletion, better validation error handling

---------

Co-authored-by: Shreyas Pimpalgaonkar <shreyas.gp.7@gmail.com>

* vec index creation endpoint (#1373)

* Update graphrag.mdx

* upload files

* create vector index endpoint

* add to fastapi background task

* pre-commit

* move logging

* add api spec, support for all vecs

* pre-commit

* add workflow

* Modify KG Endpoints and update API spec (#1369)

* Update graphrag.mdx

* modify API endpoints and update documentation

* Update ingestion_router.py

* try different docker setup (#1371)

* try different docker setup

* action

* add login

* add full

* update action

* cleanup upload script

* cleanup upload script

* tweak action

* tweak action

* tweak action

* tweak action

* tweak action

* tweak action

* Nolan/ingest chunks js (#1375)

* Update graphrag.mdx

* Clean up ingest chunks, add to JS SDK

* Update JS docs

---------

Co-authored-by: Shreyas Pimpalgaonkar <shreyas.gp.7@gmail.com>

* up (#1376)

* Bump JS package (#1378)

* Fix Create Graph (#1379)

* up

* up

* modify assertion

* up

* up

* increase entity limit

* changing aristotle back to v2

* pre-commit

* typos

* add test_ingest_sample_file_2_sdk

* Update server.py

* add docs and refine code

* add python SDK documentation

* up

* add logs

* clean

* rm vq

* rm conflicts

* pre-commit

* up

* add logging

* update logs

* up

* up

* Update kg_service.py

---------

Co-authored-by: emrgnt-cmplxty <68796651+emrgnt-cmplxty@users.noreply.github.com>
Co-authored-by: emrgnt-cmplxty <owen@algofi.org>
Co-authored-by: Nolan Tremelling <34580718+NolanTrem@users.noreply.github.com>
Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com>
Co-authored-by: FutureProofTechOps <operations@theobald.nz>
Co-authored-by: Simeon <simeon@theobald.nz>
Co-authored-by: --global=Shreyas Pimpalgaonkar <--global=shreyas.gp.7@gmail.com>

* feat: Add delete_node_via_document_id method to KGProvider (#1387)

Co-authored-by: shou.hsu <shou.hsu@utonia.com>

---------

Co-authored-by: Shreyas Pimpalgaonkar <shreyas.gp.7@gmail.com>
Co-authored-by: Nolan Tremelling <34580718+NolanTrem@users.noreply.github.com>
Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com>
Co-authored-by: FutureProofTechOps <operations@theobald.nz>
Co-authored-by: Simeon <simeon@theobald.nz>
Co-authored-by: --global=Shreyas Pimpalgaonkar <--global=shreyas.gp.7@gmail.com>
Co-authored-by: Shou-Hsu <148736054+Shou-Hsu@users.noreply.github.com>
Co-authored-by: shou.hsu <shou.hsu@utonia.com>
  • Loading branch information
9 people authored Oct 14, 2024
1 parent 975a405 commit 7331d3a
Show file tree
Hide file tree
Showing 33 changed files with 1,278 additions and 727 deletions.
10 changes: 5 additions & 5 deletions js/sdk/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 2 additions & 4 deletions py/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ FROM python:3.10-slim AS builder
# Install system dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
gcc g++ musl-dev curl libffi-dev gfortran libopenblas-dev \
poppler-utils \
&& apt-get clean && rm -rf /var/lib/apt/lists/* \
&& curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y

RUN pip install --no-cache-dir poetry


# Add Rust to PATH
ENV PATH="/root/.cargo/bin:${PATH}"

Expand Down Expand Up @@ -47,8 +49,4 @@ COPY r2r.toml /app/r2r.toml
COPY pyproject.toml /app/pyproject.toml

# Run the application
<<<<<<< HEAD
CMD ["sh", "-c", "uvicorn core.main.app_entry:app --host $HOST --port $R2R_PORT"]
=======
CMD ["sh", "-c", "uvicorn core.main.app_entry:app --host $R2R_HOST --port $R2R_PORT"]
>>>>>>> 8ae04c5bfdbeab77073b6ae1169c5bff1b32489b
8 changes: 4 additions & 4 deletions py/compose.full.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ services:
- r2r-network

hatchet-migration:
image: ghcr.io/hatchet-dev/hatchet/hatchet-migrate:v0.49.0-alpha.4
image: ghcr.io/hatchet-dev/hatchet/hatchet-migrate:latest
environment:
DATABASE_URL: "postgres://${HATCHET_POSTGRES_USER:-hatchet_user}:${HATCHET_POSTGRES_PASSWORD:-hatchet_password}@hatchet-postgres:5432/${HATCHET_POSTGRES_DBNAME:-hatchet}?sslmode=disable"
depends_on:
Expand All @@ -117,7 +117,7 @@ services:
- r2r-network

hatchet-setup-config:
image: ghcr.io/hatchet-dev/hatchet/hatchet-admin:v0.49.0-alpha.4
image: ghcr.io/hatchet-dev/hatchet/hatchet-admin:latest
command: /hatchet/hatchet-admin quickstart --skip certs --generated-config-dir /hatchet/config --overwrite=false
environment:
DATABASE_URL: "postgres://${HATCHET_POSTGRES_USER:-hatchet_user}:${HATCHET_POSTGRES_PASSWORD:-hatchet_password}@hatchet-postgres:5432/${HATCHET_POSTGRES_DBNAME:-hatchet}?sslmode=disable"
Expand Down Expand Up @@ -151,7 +151,7 @@ services:
- r2r-network

hatchet-engine:
image: ghcr.io/hatchet-dev/hatchet/hatchet-engine:v0.49.0-alpha.4
image: ghcr.io/hatchet-dev/hatchet/hatchet-engine:latest
command: /hatchet/hatchet-engine --config /hatchet/config
restart: on-failure
depends_on:
Expand All @@ -178,7 +178,7 @@ services:
retries: 5

hatchet-dashboard:
image: ghcr.io/hatchet-dev/hatchet/hatchet-dashboard:v0.49.0-alpha.4
image: ghcr.io/hatchet-dev/hatchet/hatchet-dashboard:latest
command: sh ./entrypoint.sh --config /hatchet/config
restart: on-failure
depends_on:
Expand Down
4 changes: 3 additions & 1 deletion py/core/base/parsers/base_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,7 @@

class AsyncParser(ABC, Generic[T]):
@abstractmethod
async def ingest(self, data: T) -> AsyncGenerator[DataType, None]:
async def ingest(
self, data: T, **kwargs
) -> AsyncGenerator[DataType, None]:
pass
1 change: 1 addition & 0 deletions py/core/base/providers/embedding.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
VectorSearchResult,
default_embedding_prefixes,
)

from .base import Provider, ProviderConfig

logger = logging.getLogger(__name__)
Expand Down
1 change: 1 addition & 0 deletions py/core/base/providers/ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
class IngestionConfig(ProviderConfig):
provider: str = "r2r"
excluded_parsers: list[str] = ["mp4"]
extra_parsers: dict[str, str] = {}

@property
def supported_providers(self) -> list[str]:
Expand Down
8 changes: 8 additions & 0 deletions py/core/base/providers/kg.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ async def get_entity_count(
self,
collection_id: Optional[UUID] = None,
document_id: Optional[UUID] = None,
distinct: bool = False,
entity_table_name: str = "entity_embedding",
) -> int:
"""Abstract method to get the entity count."""
Expand All @@ -208,6 +209,13 @@ async def delete_graph_for_collection(
"""Abstract method to delete the graph for a collection."""
pass

@abstractmethod
async def delete_node_via_document_id(
self, document_id: UUID, collection_id: UUID
) -> None:
"""Abstract method to delete the node via document id."""
pass

@abstractmethod
async def get_creation_estimate(self, *args: Any, **kwargs: Any) -> Any:
"""Abstract method to get the creation estimate."""
Expand Down
1 change: 1 addition & 0 deletions py/core/main/api/ingestion_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from pydantic import Json

from core.base import R2RException, RawChunk, generate_document_id

from core.base.api.models import (
CreateVectorIndexResponse,
WrappedCreateVectorIndexResponse,
Expand Down
1 change: 0 additions & 1 deletion py/core/main/api/kg_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from core.utils import generate_default_user_collection_id
from shared.abstractions.kg import KGRunType
from shared.utils.base_utils import update_settings_from_dict

from ..services.kg_service import KgService
from .base_router import BaseRouter

Expand Down
39 changes: 38 additions & 1 deletion py/core/main/api/management_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
)
from core.base.logging import AnalysisTypes, LogFilterCriteria
from core.base.providers import OrchestrationProvider
from shared.abstractions.kg import KGRunType

from ..services.management_service import ManagementService
from .base_router import BaseRouter, RunType
Expand Down Expand Up @@ -683,6 +684,15 @@ async def remove_document_from_collection_app(
document_id: str = Body(..., description="Document ID"),
collection_id: str = Body(..., description="Collection ID"),
auth_user=Depends(self.service.providers.auth.auth_wrapper),
run_type: Optional[KGRunType] = Body(
default=KGRunType.ESTIMATE,
description="Run type for the graph enrichment process.",
),
kg_enrichment_settings: Optional[dict] = Body(
default=None,
description="Settings for the graph enrichment process.",
),

) -> WrappedDeleteResponse:
collection_uuid = UUID(collection_id)
document_uuid = UUID(document_id)
Expand All @@ -695,9 +705,36 @@ async def remove_document_from_collection_app(
403,
)

await self.service.remove_document_from_collection(
enrichment = await self.service.remove_document_from_collection(
document_uuid, collection_uuid
)
if enrichment:
if not run_type:
run_type = KGRunType.ESTIMATE

server_kg_enrichment_settings = (
self.service.providers.kg.config.kg_enrichment_settings
)
if run_type is KGRunType.ESTIMATE:

return await self.service.get_enrichment_estimate(
collection_id, server_kg_enrichment_settings
)

if kg_enrichment_settings:
for key, value in kg_enrichment_settings.items():
if value is not None:
setattr(server_kg_enrichment_settings, key, value)

workflow_input = {
"collection_id": str(collection_id),
"kg_enrichment_settings": server_kg_enrichment_settings.model_dump_json(),
"user": auth_user.json(),
}
self.orchestration_provider.run_workflow(
"enrich-graph", {"request": workflow_input}, {}
)

return None # type: ignore

@self.router.get("/document_collections/{document_id}")
Expand Down
67 changes: 57 additions & 10 deletions py/core/main/orchestration/hatchet/kg_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@
import logging
import math
import uuid
import time

from hatchet_sdk import ConcurrencyLimitStrategy, Context

from core import GenerationConfig
from core.base import OrchestrationProvider

from shared.abstractions.document import KGExtractionStatus
from ...services import KgService

from shared.utils import create_hatchet_logger

logger = logging.getLogger(__name__)
from typing import TYPE_CHECKING

Expand Down Expand Up @@ -57,9 +60,7 @@ def concurrency(self, context: Context) -> str:
@orchestration_provider.step(retries=1, timeout="360m")
async def kg_extract(self, context: Context) -> dict:

context.log(
f"Running KG Extraction for input: {context.workflow_input()['request']}"
)
start_time = time.time()

input_data = get_input_data_dict(
context.workflow_input()["request"]
Expand All @@ -70,12 +71,16 @@ async def kg_extract(self, context: Context) -> dict:

await self.kg_service.kg_triples_extraction(
document_id=uuid.UUID(document_id),
logger=context.log,
logger=create_hatchet_logger(context.log),
**input_data["kg_creation_settings"],
)

context.log(
f"Successfully ran kg triples extraction for document {document_id}"
)

return {
"result": f"successfully ran kg triples extraction for document {document_id}"
"result": f"successfully ran kg triples extraction for document {document_id} in {time.time() - start_time:.2f} seconds",
}

@orchestration_provider.step(
Expand All @@ -90,13 +95,44 @@ async def kg_entity_description(self, context: Context) -> dict:

await self.kg_service.kg_entity_description(
document_id=uuid.UUID(document_id),
logger=create_hatchet_logger(context.log),
**input_data["kg_creation_settings"],
)

context.log(
f"Successfully ran kg node description for document {document_id}"
)

return {
"result": f"successfully ran kg node description for document {document_id}"
}

@orchestration_provider.failure()
async def on_failure(self, context: Context) -> None:
request = context.workflow_input().get("request", {})
document_id = request.get("document_id")

if not document_id:
context.log(
"No document id was found in workflow input to mark a failure."
)
return

try:
await self.kg_service.providers.database.relational.set_workflow_status(
id=uuid.UUID(document_id),
status_type="kg_extraction_status",
status=KGExtractionStatus.FAILED,
)
context.log(
f"Updated KG extraction status for {document_id} to FAILED"
)

except Exception as e:
context.log(
f"Failed to update document status for {document_id}: {e}"
)

@orchestration_provider.workflow(name="create-graph", timeout="360m")
class CreateGraphWorkflow:
def __init__(self, kg_service: KgService):
Expand Down Expand Up @@ -187,6 +223,8 @@ def __init__(self, kg_service: KgService):
@orchestration_provider.step(retries=1, parents=[], timeout="360m")
async def kg_clustering(self, context: Context) -> dict:

start_time = time.time()

logger.info("Running KG Clustering")
input_data = get_input_data_dict(
context.workflow_input()["request"]
Expand All @@ -195,11 +233,12 @@ async def kg_clustering(self, context: Context) -> dict:

kg_clustering_results = await self.kg_service.kg_clustering(
collection_id=collection_id,
logger=create_hatchet_logger(context.log),
**input_data["kg_enrichment_settings"],
)

context.log(
f"Successfully ran kg clustering for collection {collection_id}: {json.dumps(kg_clustering_results)}"
f"Successfully ran kg clustering for collection {collection_id}: {json.dumps(kg_clustering_results)} in {time.time() - start_time:.2f} seconds"
)
logger.info(
f"Successfully ran kg clustering for collection {collection_id}: {json.dumps(kg_clustering_results)}"
Expand All @@ -220,10 +259,14 @@ async def kg_community_summary(self, context: Context) -> dict:
num_communities = context.step_output("kg_clustering")[
"kg_clustering"
][0]["num_communities"]

parallel_communities = min(100, num_communities)
total_workflows = math.ceil(num_communities / parallel_communities)
workflows = []

context.log(
f"Running KG Community Summary for {num_communities} communities, spawning {total_workflows} workflows"
)

for i in range(total_workflows):
offset = i * parallel_communities
workflows.append(
Expand Down Expand Up @@ -257,15 +300,19 @@ def __init__(self, kg_service: KgService):

@orchestration_provider.step(retries=1, timeout="360m")
async def kg_community_summary(self, context: Context) -> dict:

start_time = time.time()

input_data = get_input_data_dict(
context.workflow_input()["request"]
)

community_summary = await self.kg_service.kg_community_summary(
**input_data
logger=create_hatchet_logger(context.log),
**input_data,
)
context.log(
f"Successfully ran kg community summary for communities {input_data['offset']} to {input_data['offset'] + len(community_summary)}"
f"Successfully ran kg community summary for communities {input_data['offset']} to {input_data['offset'] + len(community_summary)} in {time.time() - start_time:.2f} seconds "
)
return {
"result": f"successfully ran kg community summary for communities {input_data['offset']} to {input_data['offset'] + len(community_summary)}"
Expand Down
25 changes: 16 additions & 9 deletions py/core/main/orchestration/simple/kg_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,22 @@ async def create_graph(input_data):

for _, document_id in enumerate(document_ids):
# Extract triples from the document
await service.kg_triples_extraction(
document_id=document_id,
**input_data["kg_creation_settings"],
)
# Describe the entities in the graph
await service.kg_entity_description(
document_id=document_id,
**input_data["kg_creation_settings"],
)

try:
await service.kg_triples_extraction(
document_id=document_id,
**input_data["kg_creation_settings"],
)
# Describe the entities in the graph
await service.kg_entity_description(
document_id=document_id,
**input_data["kg_creation_settings"],
)

except Exception as e:
logger.error(
f"Error in creating graph for document {document_id}: {e}"
)

async def enrich_graph(input_data):

Expand Down
Loading

0 comments on commit 7331d3a

Please sign in to comment.