diff --git a/js/sdk/__tests__/GraphsIntegrationSuperUser.test.ts b/js/sdk/__tests__/GraphsIntegrationSuperUser.test.ts index 7603440a9..53463e2e0 100644 --- a/js/sdk/__tests__/GraphsIntegrationSuperUser.test.ts +++ b/js/sdk/__tests__/GraphsIntegrationSuperUser.test.ts @@ -394,7 +394,7 @@ describe("r2rClient V3 Graphs Integration Tests", () => { await client.graphs.exportEntities({ collectionId: collectionId, outputPath: outputPath, - filters: { document_type: { $eq: "non_existent_type" } }, + filters: { name: { $eq: "non_existent_name" } }, }); expect(fs.existsSync(outputPath)).toBe(true); @@ -482,7 +482,7 @@ describe("r2rClient V3 Graphs Integration Tests", () => { await client.graphs.exportEntities({ collectionId: collectionId, outputPath: outputPath, - filters: { document_type: { $eq: "non_existent_type" } }, + filters: { name: { $eq: "non_existent_name" } }, }); expect(fs.existsSync(outputPath)).toBe(true); @@ -518,6 +518,91 @@ describe("r2rClient V3 Graphs Integration Tests", () => { expect(response.totalEntries).toBeGreaterThanOrEqual(1); }); + test("Export graph communities to CSV with default options", async () => { + const outputPath = path.join( + TEST_OUTPUT_DIR, + "graph_communities_default.csv", + ); + await client.graphs.exportCommunities({ + collectionId: documentId, + outputPath: outputPath, + }); + + expect(fs.existsSync(outputPath)).toBe(true); + const content = fs.readFileSync(outputPath, "utf-8"); + expect(content).toBeTruthy(); + expect(content.split("\n").length).toBeGreaterThan(1); + }); + + test("Export graph communities to CSV with custom columns", async () => { + const outputPath = path.join(TEST_OUTPUT_DIR, "graph_entities_custom.csv"); + await client.graphs.exportCommunities({ + collectionId: collectionId, + outputPath: outputPath, + columns: ["id", "name", "created_at"], + includeHeader: true, + }); + + expect(fs.existsSync(outputPath)).toBe(true); + const content = fs.readFileSync(outputPath, "utf-8"); + const headers = content + .split("\n")[0] + .split(",") + .map((h) => h.trim()); + + expect(headers).toContain('"id"'); + expect(headers).toContain('"name"'); + expect(headers).toContain('"created_at"'); + }); + + test("Export filtered graph communities to CSV", async () => { + const outputPath = path.join( + TEST_OUTPUT_DIR, + "graph_communities_filtered.csv", + ); + await client.graphs.exportCommunities({ + collectionId: collectionId, + outputPath: outputPath, + filters: { name: { $eq: "txt" } }, + includeHeader: true, + }); + + expect(fs.existsSync(outputPath)).toBe(true); + const content = fs.readFileSync(outputPath, "utf-8"); + expect(content).toBeTruthy(); + }); + + test("Export graph communities without headers", async () => { + const outputPath = path.join( + TEST_OUTPUT_DIR, + "graph_communities_no_header.csv", + ); + await client.graphs.exportCommunities({ + collectionId: collectionId, + outputPath: outputPath, + includeHeader: false, + }); + + expect(fs.existsSync(outputPath)).toBe(true); + const content = fs.readFileSync(outputPath, "utf-8"); + }); + + test("Handle empty graph communities export result", async () => { + const outputPath = path.join( + TEST_OUTPUT_DIR, + "graph_communities_empty.csv", + ); + await client.graphs.exportCommunities({ + collectionId: collectionId, + outputPath: outputPath, + filters: { name: { $eq: "non_existent_name" } }, + }); + + expect(fs.existsSync(outputPath)).toBe(true); + const content = fs.readFileSync(outputPath, "utf-8"); + expect(content.split("\n").filter((line) => line.trim()).length).toBe(1); + }); + test("Create a new entity", async () => { const response = await client.graphs.createEntity({ collectionId: collectionId, diff --git a/js/sdk/src/v3/clients/graphs.ts b/js/sdk/src/v3/clients/graphs.ts index e420af9d1..a77c0de98 100644 --- a/js/sdk/src/v3/clients/graphs.ts +++ b/js/sdk/src/v3/clients/graphs.ts @@ -367,7 +367,7 @@ export class GraphsClient { } /** - * Export document entities as a CSV file with support for filtering and column selection. + * Export graph entities as a CSV file with support for filtering and column selection. * * @param options Export configuration options * @param options.outputPath Path where the CSV file should be saved (Node.js only) @@ -376,7 +376,7 @@ export class GraphsClient { * @param options.includeHeader Whether to include column headers (default: true) * @returns Promise in browser environments, Promise in Node.js */ - @feature("documents.exportEntities") + @feature("graphs.exportEntities") async exportEntities(options: { collectionId: string; outputPath?: string; @@ -385,7 +385,6 @@ export class GraphsClient { includeHeader?: boolean; }): Promise { const data: Record = { - id: options.collectionId, include_header: options.includeHeader ?? true, }; @@ -398,7 +397,7 @@ export class GraphsClient { const response = await this.client.makeRequest( "POST", - `documents/${options.collectionId}/entities/export`, + `graphs/${options.collectionId}/entities/export`, { data, responseType: "arraybuffer", @@ -417,11 +416,11 @@ export class GraphsClient { } /** - * Export documents as a CSV file and save it to the user's device. + * Export graph entities as a CSV file and save it to the user's device. * @param filename * @param options */ - @feature("documents.exportEntitiesToFile") + @feature("graphs.exportEntitiesToFile") async exportEntitiesToFile(options: { filename: string; collectionId: string; @@ -436,7 +435,7 @@ export class GraphsClient { } /** - * Export document relationships as a CSV file with support for filtering and column selection. + * Export graph relationships as a CSV file with support for filtering and column selection. * * @param options Export configuration options * @param options.outputPath Path where the CSV file should be saved (Node.js only) @@ -445,7 +444,7 @@ export class GraphsClient { * @param options.includeHeader Whether to include column headers (default: true) * @returns Promise in browser environments, Promise in Node.js */ - @feature("documents.exportRelationships") + @feature("graphs.exportRelationships") async exportRelationships(options: { collectionId: string; outputPath?: string; @@ -466,7 +465,7 @@ export class GraphsClient { const response = await this.client.makeRequest( "POST", - `documents/${options.collectionId}/relationships/export`, + `graphs/${options.collectionId}/relationships/export`, { data, responseType: "arraybuffer", @@ -485,11 +484,11 @@ export class GraphsClient { } /** - * Export document relationships as a CSV file and save it to the user's device. + * Export graph relationships as a CSV file and save it to the user's device. * @param filename * @param options */ - @feature("documents.exportRelationshipsToFile") + @feature("graphs.exportRelationshipsToFile") async exportRelationshipsToFile(options: { filename: string; collectionId: string; @@ -503,6 +502,74 @@ export class GraphsClient { } } + /** + * Export graph communities as a CSV file with support for filtering and column selection. + * + * @param options Export configuration options + * @param options.outputPath Path where the CSV file should be saved (Node.js only) + * @param options.columns Optional list of specific columns to include + * @param options.filters Optional filters to limit which documents are exported + * @param options.includeHeader Whether to include column headers (default: true) + * @returns Promise in browser environments, Promise in Node.js + */ + @feature("graphs.exportCommunities") + async exportCommunities(options: { + collectionId: string; + outputPath?: string; + columns?: string[]; + filters?: Record; + includeHeader?: boolean; + }): Promise { + const data: Record = { + include_header: options.includeHeader ?? true, + }; + + if (options.columns) { + data.columns = options.columns; + } + if (options.filters) { + data.filters = options.filters; + } + + const response = await this.client.makeRequest( + "POST", + `graphs/${options.collectionId}/communities/export`, + { + data, + responseType: "arraybuffer", + headers: { Accept: "text/csv" }, + }, + ); + + // Node environment + if (options.outputPath && typeof process !== "undefined") { + await fs.promises.writeFile(options.outputPath, Buffer.from(response)); + return; + } + + // Browser + return new Blob([response], { type: "text/csv" }); + } + + /** + * Export graph communities as a CSV file and save it to the user's device. + * @param filename + * @param options + */ + @feature("graphs.exportCommunitiesToFile") + async exportCommunitiesToFile(options: { + filename: string; + collectionId: string; + columns?: string[]; + filters?: Record; + includeHeader?: boolean; + }): Promise { + const blob = await this.exportRelationships(options); + if (blob instanceof Blob) { + downloadBlob(blob, options.filename); + } + } + /** * Creates a new community in the graph. * diff --git a/py/core/database/graphs.py b/py/core/database/graphs.py index 45eaf2af3..72de4e2d3 100644 --- a/py/core/database/graphs.py +++ b/py/core/database/graphs.py @@ -1391,6 +1391,125 @@ async def get( return communities, count + async def export_to_csv( + self, + parent_id: UUID, + store_type: StoreType, + columns: Optional[list[str]] = None, + filters: Optional[dict] = None, + include_header: bool = True, + ) -> tuple[str, IO]: + """ + Creates a CSV file from the PostgreSQL data and returns the path to the temp file. + """ + valid_columns = { + "id", + "collection_id", + "community_id", + "level", + "name", + "summary", + "findings", + "rating", + "rating_explanation", + "created_at", + "updated_at", + "metadata", + } + + if not columns: + columns = list(valid_columns) + else: + invalid_cols = set(columns) - valid_columns + if invalid_cols: + raise ValueError(f"Invalid columns: {invalid_cols}") + + table_name = "graphs_communities" + + select_stmt = f""" + SELECT + id::text, + collection_id::text, + community_id::text, + level, + name, + summary, + findings::text, + rating, + rating_explanation, + to_char(created_at, 'YYYY-MM-DD HH24:MI:SS') AS created_at, + to_char(updated_at, 'YYYY-MM-DD HH24:MI:SS') AS updated_at + metadata::text, + FROM {self._get_table_name(self._get_table_name(table_name))} + """ + + conditions = ["collection_id = $1"] + params: list[Any] = [parent_id] + param_index = 2 + + if filters: + for field, value in filters.items(): + if field not in valid_columns: + continue + + if isinstance(value, dict): + for op, val in value.items(): + if op == "$eq": + conditions.append(f"{field} = ${param_index}") + params.append(val) + param_index += 1 + elif op == "$gt": + conditions.append(f"{field} > ${param_index}") + params.append(val) + param_index += 1 + elif op == "$lt": + conditions.append(f"{field} < ${param_index}") + params.append(val) + param_index += 1 + else: + # Direct equality + conditions.append(f"{field} = ${param_index}") + params.append(value) + param_index += 1 + + if conditions: + select_stmt = f"{select_stmt} WHERE {' AND '.join(conditions)}" + + select_stmt = f"{select_stmt} ORDER BY created_at DESC" + + temp_file = None + try: + temp_file = tempfile.NamedTemporaryFile( + mode="w", delete=True, suffix=".csv" + ) + writer = csv.writer(temp_file, quoting=csv.QUOTE_ALL) + + async with self.connection_manager.pool.get_connection() as conn: # type: ignore + async with conn.transaction(): + cursor = await conn.cursor(select_stmt, *params) + + if include_header: + writer.writerow(columns) + + chunk_size = 1000 + while True: + rows = await cursor.fetch(chunk_size) + if not rows: + break + for row in rows: + writer.writerow(row) + + temp_file.flush() + return temp_file.name, temp_file + + except Exception as e: + if temp_file: + temp_file.close() + raise HTTPException( + status_code=500, + detail=f"Failed to export data: {str(e)}", + ) + class PostgresGraphsHandler(Handler): """Handler for Knowledge Graph METHODS in PostgreSQL.""" diff --git a/py/core/main/api/v3/graph_router.py b/py/core/main/api/v3/graph_router.py index edb64a609..f7da9b2d6 100644 --- a/py/core/main/api/v3/graph_router.py +++ b/py/core/main/api/v3/graph_router.py @@ -718,7 +718,7 @@ async def get_entities( @self.base_endpoint async def export_entities( background_tasks: BackgroundTasks, - id: UUID = Path( + collection_id: UUID = Path( ..., description="The ID of the collection to export entities from.", ), @@ -745,7 +745,7 @@ async def export_entities( csv_file_path, temp_file = ( await self.services.management.export_graph_entities( - id=id, + id=collection_id, columns=columns, filters=filters, include_header=include_header, @@ -871,7 +871,7 @@ async def create_relationship( @self.router.post( "/graphs/{collection_id}/relationships/export", - summary="Export document relationships to CSV", + summary="Export graph relationships to CSV", dependencies=[Depends(self.rate_limit_dependency)], openapi_extra={ "x-codeSamples": [ @@ -940,7 +940,7 @@ async def create_relationship( @self.base_endpoint async def export_relationships( background_tasks: BackgroundTasks, - id: UUID = Path( + collection_id: UUID = Path( ..., description="The ID of the document to export entities from.", ), @@ -967,7 +967,7 @@ async def export_relationships( csv_file_path, temp_file = ( await self.services.management.export_graph_relationships( - id=id, + id=collection_id, columns=columns, filters=filters, include_header=include_header, @@ -1843,6 +1843,119 @@ async def delete_community( ) return GenericBooleanResponse(success=True) # type: ignore + @self.router.post( + "/graphs/{collection_id}/communities/export", + summary="Export document communities to CSV", + dependencies=[Depends(self.rate_limit_dependency)], + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent( + """ + from r2r import R2RClient + + client = R2RClient("http://localhost:7272") + # when using auth, do client.login(...) + + response = client.graphs.export_communities( + collection_id="b4ac4dd6-5f27-596e-a55b-7cf242ca30aa", + output_path="export.csv", + columns=["id", "title", "created_at"], + include_header=True, + ) + """ + ), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent( + """ + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient("http://localhost:7272"); + + function main() { + await client.graphs.exportCommunities({ + collectionId: "b4ac4dd6-5f27-596e-a55b-7cf242ca30aa", + outputPath: "export.csv", + columns: ["id", "title", "created_at"], + includeHeader: true, + }); + } + + main(); + """ + ), + }, + { + "lang": "CLI", + "source": textwrap.dedent( + """ + """ + ), + }, + { + "lang": "cURL", + "source": textwrap.dedent( + """ + curl -X POST "http://127.0.0.1:7272/v3/graphs/export_communities" \ + -H "Authorization: Bearer YOUR_API_KEY" \ + -H "Content-Type: application/json" \ + -H "Accept: text/csv" \ + -d '{ "columns": ["id", "title", "created_at"], "include_header": true }' \ + --output export.csv + """ + ), + }, + ] + }, + ) + @self.base_endpoint + async def export_relationships( + background_tasks: BackgroundTasks, + collection_id: UUID = Path( + ..., + description="The ID of the document to export entities from.", + ), + columns: Optional[list[str]] = Body( + None, description="Specific columns to export" + ), + filters: Optional[dict] = Body( + None, description="Filters to apply to the export" + ), + include_header: Optional[bool] = Body( + True, description="Whether to include column headers" + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> FileResponse: + """ + Export documents as a downloadable CSV file. + """ + + if not auth_user.is_superuser: + raise R2RException( + "Only a superuser can export data.", + 403, + ) + + csv_file_path, temp_file = ( + await self.services.management.export_graph_communities( + id=collection_id, + columns=columns, + filters=filters, + include_header=include_header, + ) + ) + + background_tasks.add_task(temp_file.close) + + return FileResponse( + path=csv_file_path, + media_type="text/csv", + filename="documents_export.csv", + ) + @self.router.post( "/graphs/{collection_id}/communities/{community_id}", dependencies=[Depends(self.rate_limit_dependency)], diff --git a/py/core/main/services/management_service.py b/py/core/main/services/management_service.py index 4debd7d59..e3122ac2b 100644 --- a/py/core/main/services/management_service.py +++ b/py/core/main/services/management_service.py @@ -324,6 +324,22 @@ async def export_graph_relationships( include_header=include_header, ) + @telemetry_event("ExportGraphCommunities") + async def export_graph_communities( + self, + id: UUID, + columns: Optional[list[str]] = None, + filters: Optional[dict] = None, + include_header: bool = True, + ) -> tuple[str, IO]: + return await self.providers.database.graphs_handler.communities.export_to_csv( + parent_id=id, + store_type="graphs", # type: ignore + columns=columns, + filters=filters, + include_header=include_header, + ) + @telemetry_event("ExportUsers") async def export_users( self,