try different docker setup (#1371)
* try different docker setup

* action

* add login

* add full

* update action

* cleanup upload script

* cleanup upload script

* tweak action

* tweak action

* tweak action

* tweak action

* tweak action

* tweak action
emrgnt-cmplxty authored Oct 10, 2024
1 parent 8bd0960 commit 272b358
Showing 10 changed files with 86 additions and 93 deletions.
17 changes: 17 additions & 0 deletions .github/actions/login-docker/action.yml
@@ -0,0 +1,17 @@
+name: 'Login Docker'
+description: 'Sets up Docker for running R2R'
+inputs:
+  docker_username:
+    description: 'Docker Hub username'
+    required: true
+  docker_password:
+    description: 'Docker Hub password or token'
+    required: true
+runs:
+  using: "composite"
+  steps:
+    - name: Login to Docker Hub
+      uses: docker/login-action@v2
+      with:
+        username: ${{ inputs.docker_username }}
+        password: ${{ inputs.docker_password }}
34 changes: 3 additions & 31 deletions .github/actions/setup-docker/action.yml
@@ -1,41 +1,13 @@
 name: 'Setup Docker'
 description: 'Sets up Docker for running R2R'
-inputs:
-  os:
-    description: 'Operating system'
-    required: true
-  docker_username:
-    description: 'Docker Hub username'
-    required: true
-  docker_password:
-    description: 'Docker Hub password or token'
-    required: true
 runs:
   using: "composite"
   steps:
-    - name: Set up Docker (Ubuntu)
-      if: inputs.os == 'ubuntu-latest'
+    - name: Set up Docker
       uses: docker-practice/actions-setup-docker@master
       with:
         docker_version: 20.10
         docker_buildx: true
 
-    - name: Set up Docker (Windows)
-      if: inputs.os == 'macos-latest'
-      uses: docker-practice/actions-setup-docker@master
-      with:
-        docker_version: 20.10
-        docker_buildx: true
-
-    - name: Set up Docker (Windows)
-      if: inputs.os == 'windows-latest'
-      uses: docker-practice/actions-setup-docker@master
-      with:
-        docker_version: 20.10
-        docker_buildx: true
-
-    - name: Login to Docker Hub
-      uses: docker/login-action@v2
-      with:
-        username: ${{ inputs.docker_username }}
-        password: ${{ inputs.docker_password }}
+    - name: Set up Docker Buildx
+      uses: docker/setup-buildx-action@v2
7 changes: 5 additions & 2 deletions .github/workflows/r2r-full-py-integration-tests.yml
@@ -17,7 +17,8 @@ jobs:

     strategy:
       matrix:
-        os: [ubuntu-latest, windows-latest, macos-latest]
+        # os: [ubuntu-latest, windows-latest, macos-latest]
+        os: [ubuntu-latest, macos-latest]
         # os: [ubuntu-latest]
         test_category:
           - cli-ingestion
@@ -44,8 +45,10 @@ jobs:

       - name: Setup and start Docker
         uses: ./.github/actions/setup-docker
+
+      - name: Login Docker
+        uses: ./.github/actions/login-docker
         with:
-          os: ${{ matrix.os }}
           docker_username: ${{ secrets.RAGTORICHES_DOCKER_UNAME }}
           docker_password: ${{ secrets.RAGTORICHES_DOCKER_TOKEN }}

4 changes: 2 additions & 2 deletions py/core/base/providers/orchestration.py
@@ -12,7 +12,7 @@ class Workflow(Enum):

 class OrchestrationConfig(ProviderConfig):
     provider: str
-    max_threads: int = 256
+    max_runs: int = 2_048
     kg_creation_concurrency_limit: int = 32
     ingestion_concurrency_limit: int = 64

@@ -36,7 +36,7 @@ async def start_worker(self):
         pass
 
     @abstractmethod
-    def get_worker(self, name: str, max_threads: int) -> Any:
+    def get_worker(self, name: str, max_runs: int) -> Any:
         pass
 
     @abstractmethod
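Note: the rename from max_threads to max_runs matches the quantity Hatchet's worker(name, max_runs) call actually limits, namely concurrent runs on the worker rather than OS threads. A minimal standalone sketch of the fallback pattern get_worker now uses (Config and the returned dict are illustrative stand-ins, not R2R's real types):

from typing import Any, Optional


class Config:
    """Stand-in for OrchestrationConfig; the real default above is 2_048."""

    def __init__(self, max_runs: int = 2_048):
        self.max_runs = max_runs


def get_worker(config: Config, name: str, max_runs: Optional[int] = None) -> Any:
    # Fall back to the configured cap when the caller passes none,
    # mirroring HatchetOrchestrationProvider.get_worker in this commit.
    if not max_runs:
        max_runs = config.max_runs
    return {"name": name, "max_runs": max_runs}


print(get_worker(Config(), "r2r-worker"))  # {'name': 'r2r-worker', 'max_runs': 2048}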
70 changes: 33 additions & 37 deletions py/core/examples/scripts/upload_hf_textbooks_ex.py
@@ -1,82 +1,78 @@
 import asyncio
 import os
 import uuid
-from concurrent.futures import ThreadPoolExecutor
 
-import aiofiles
 from datasets import load_dataset
 
-from r2r import R2RClient
+from r2r import R2RAsyncClient
 
-batch_size = 128
-rest_time_in_s = 5
+batch_size = 64
+total_batches = 8
+rest_time_in_s = 1
 
 
 def generate_id(label: str) -> uuid.UUID:
     return uuid.uuid5(uuid.NAMESPACE_DNS, label)
 
 
-async def upload_batch(client, file_paths, executor):
-    print(f"Uploading batch of {len(file_paths)} files")
-    loop = asyncio.get_event_loop()
-    try:
-        response = await loop.run_in_executor(
-            executor, client.ingest_files, file_paths
-        )
-        print(response)
-    except Exception as e:
-        print(f"Error uploading batch: {e}")
-    finally:
-        await asyncio.gather(*(remove_file(fname) for fname in file_paths))
-
-
-async def remove_file(file_path):
+def remove_file(file_path):
     try:
         os.remove(file_path)
     except Exception as e:
         print(f"Error removing file {file_path}: {e}")
 
 
-async def process_dataset(client, dataset, batch_size, executor):
+async def process_batch(client, batch):
+    results = await client.ingest_files(batch)
+    print(f"Submitted {len(results['results'])} files for processing")
+    print("results = ", results["results"])
+    # Remove the processed files
+    for file_path in batch:
+        remove_file(file_path)
+
+
+async def process_dataset(client, dataset, batch_size):
     current_batch = []
     count = 0
+    tasks = []
 
     for example in dataset:
         count += 1
         fname = f"example_{generate_id(example['completion'])}.txt"
         print(f"Streaming {fname} w/ completion {count} ...")
 
-        async with aiofiles.open(fname, "w") as f:
-            await f.write(example["completion"])
+        with open(fname, "w") as f:
+            f.write(example["completion"])
 
         current_batch.append(fname)
 
-        if len(current_batch) >= batch_size:
-            asyncio.create_task(
-                upload_batch(client, current_batch.copy(), executor)
-            )
-            current_batch.clear()
-            await asyncio.sleep(rest_time_in_s)
+        if len(current_batch) == batch_size:
+            task = asyncio.create_task(process_batch(client, current_batch))
+            tasks.append(task)
+            current_batch = []
+
+            if len(tasks) == total_batches:
+                await asyncio.gather(*tasks)
+                tasks = []  # Reset the tasks list
+            # await asyncio.sleep(rest_time_in_s)
 
     # Process any remaining files in the last batch
     if current_batch:
-        asyncio.create_task(
-            upload_batch(client, current_batch.copy(), executor)
-        )
+        await process_batch(client, current_batch)
 
 
 async def main():
     r2r_url = os.getenv("R2R_API_URL", "http://localhost:7272")
     print(f"Using R2R API at: {r2r_url}")
-    client = R2RClient(r2r_url)
+    client = R2RAsyncClient(r2r_url)
 
     dataset = load_dataset(
         "SciPhi/textbooks-are-all-you-need-lite", streaming=True
     )["train"]
 
-    with ThreadPoolExecutor() as executor:
-        print("Submitting batches for processing ...")
-        await process_dataset(client, dataset, batch_size, executor)
-        print("All batches submitted for processing")
+    print("Submitting batches for processing ...")
+    await process_dataset(client, dataset, batch_size)
+    print("All batches submitted for processing")
 
 
 if __name__ == "__main__":
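Note: the rewritten script drops the fire-and-forget asyncio.create_task calls and sleep-based throttling in favor of grouped gathering: tasks accumulate until total_batches are in flight, then all are awaited before more work is scheduled, which bounds concurrency and the number of temporary files on disk. A standalone sketch of that pattern, with process_batch standing in for the client call; unlike the committed script, the sketch also drains a partial trailing group of tasks, which would otherwise be left pending when the loop ends:

import asyncio


async def process_batch(batch):
    # Stand-in for R2RAsyncClient.ingest_files; simulates network I/O.
    await asyncio.sleep(0.1)
    print(f"ingested {len(batch)} items")


async def process_stream(items, batch_size=64, total_batches=8):
    batch, tasks = [], []
    for item in items:
        batch.append(item)
        if len(batch) == batch_size:
            tasks.append(asyncio.create_task(process_batch(batch)))
            batch = []
            if len(tasks) == total_batches:
                # Await the in-flight group before scheduling more, so at most
                # batch_size * total_batches items are pending at once.
                await asyncio.gather(*tasks)
                tasks = []
    if tasks:
        await asyncio.gather(*tasks)  # drain any partial trailing group
    if batch:
        await process_batch(batch)  # flush the final short batch


asyncio.run(process_stream(range(1000)))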
1 change: 1 addition & 0 deletions py/core/main/api/data/retrieval_router_openapi.yml
@@ -218,5 +218,6 @@ agent:
   kg_search_settings: "Knowledge graph search settings"
   rag_generation_config: "RAG generation configuration"
   task_prompt_override: "Task prompt override"
+  conversation_id: "The ID of the conversation, a new ID is assigned if not provided"
   include_title_if_available: "Includes document title in chunk response, if available."
   rag_strategy: "The RAG strategy to use (default | hyde | rag_fusion)"
4 changes: 4 additions & 0 deletions py/core/main/api/retrieval_router.py
@@ -1,6 +1,7 @@
 import asyncio
 from pathlib import Path
 from typing import Any, Optional, Union
+from uuid import UUID
 
 import yaml
 from fastapi import Body, Depends
@@ -239,6 +240,9 @@ async def agent_app(
"include_title_if_available"
),
),
conversation_id: Optional[UUID] = Body(
None, description=agent_descriptions.get("conversation_id")
),
auth_user=Depends(self.service.providers.auth.auth_wrapper),
) -> WrappedRAGAgentResponse: # type: ignore
"""
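Note: the new conversation_id body parameter threads successive agent calls into one conversation; per the OpenAPI description above, the server assigns a fresh ID when it is omitted. A hedged sketch of a raw HTTP call follows; the /v2/agent path and the message payload shape are assumptions for illustration, and only the conversation_id field itself is established by this diff:

import httpx

# Assumed endpoint path and payload shape; conversation_id is the field
# added in this commit. Reusing the same UUID on a later call would
# continue the same conversation.
response = httpx.post(
    "http://localhost:7272/v2/agent",
    json={
        "message": {"role": "user", "content": "Summarize my documents."},
        "conversation_id": "123e4567-e89b-12d3-a456-426614174000",
    },
    timeout=60.0,
)
print(response.json())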
32 changes: 16 additions & 16 deletions py/core/main/orchestration/hatchet/ingestion_workflow.py
@@ -76,30 +76,30 @@ async def parse(self, context: Context) -> dict:
         async for extraction in extractions_generator:
             extractions.append(extraction)
 
-        serializable_extractions = [
-            extraction.to_dict() for extraction in extractions
-        ]
-
-        return {
-            "status": "Successfully extracted data",
-            "extractions": serializable_extractions,
-            "document_info": document_info.to_dict(),
-        }
-
-    @orchestration_provider.step(parents=["parse"], timeout="60m")
-    async def embed(self, context: Context) -> dict:
-        document_info_dict = context.step_output("parse")["document_info"]
-        document_info = DocumentInfo(**document_info_dict)
+        # serializable_extractions = [
+        #     extraction.to_dict() for extraction in extractions
+        # ]
+
+        # return {
+        #     "status": "Successfully extracted data",
+        #     "extractions": serializable_extractions,
+        #     "document_info": document_info.to_dict(),
+        # }
+
+    # @orchestration_provider.step(parents=["parse"], timeout="60m")
+    # async def embed(self, context: Context) -> dict:
+    #     document_info_dict = context.step_output("parse")["document_info"]
+    #     document_info = DocumentInfo(**document_info_dict)
 
         await self.ingestion_service.update_document_status(
             document_info,
             status=IngestionStatus.EMBEDDING,
         )
 
-        extractions = context.step_output("parse")["extractions"]
+        # extractions = context.step_output("parse")["extractions"]
 
         embedding_generator = await self.ingestion_service.embed_document(
-            extractions
+            [extraction.to_dict() for extraction in extractions]
         )
 
         embeddings = []
8 changes: 4 additions & 4 deletions py/core/providers/orchestration/hatchet.py
@@ -29,10 +29,10 @@ def step(self, *args, **kwargs) -> Callable:
     def failure(self, *args, **kwargs) -> Callable:
         return self.orchestrator.on_failure_step(*args, **kwargs)
 
-    def get_worker(self, name: str, max_threads: Optional[int] = None) -> Any:
-        if not max_threads:
-            max_threads = self.config.max_threads
-        self.worker = self.orchestrator.worker(name, max_threads)
+    def get_worker(self, name: str, max_runs: Optional[int] = None) -> Any:
+        if not max_runs:
+            max_runs = self.config.max_runs
+        self.worker = self.orchestrator.worker(name, max_runs)
         return self.worker
 
     def concurrency(self, *args, **kwargs) -> Callable:
2 changes: 1 addition & 1 deletion py/core/providers/orchestration/simple.py
@@ -12,7 +12,7 @@ def __init__(self, config: OrchestrationConfig):
     async def start_worker(self):
         pass
 
-    def get_worker(self, name: str, max_threads: int) -> Any:
+    def get_worker(self, name: str, max_runs: int) -> Any:
         pass
 
     def step(self, *args, **kwargs) -> Any:
