Skip to content

Commit

Permalink
complete logging port (#1499)
Browse files Browse the repository at this point in the history
  • Loading branch information
emrgnt-cmplxty authored Oct 25, 2024
1 parent fef16fe commit d4a8047
Show file tree
Hide file tree
Showing 52 changed files with 1,016 additions and 166 deletions.
2 changes: 1 addition & 1 deletion py/core/base/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from .abstractions import *
from .agent import *
from .api.models import *
from .logging import *
from .logger import *
from .parsers import *
from .pipeline import *
from .pipes import *
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
LogFilterCriteria,
LogProcessor,
)
from .r2r_logger import PersistentLoggingConfig
from .base import PersistentLoggingConfig, RunInfoLog
from .run_manager import RunManager, manage_run

__all__ = [
Expand All @@ -19,6 +19,7 @@
"LogProcessor",
# Logging Providers
"PersistentLoggingConfig",
"RunInfoLog",
# Run Manager
"RunManager",
"manage_run",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from enum import Enum
import logging
from abc import abstractmethod
from datetime import datetime
Expand All @@ -9,7 +10,6 @@
from core.base import Message

from ..providers.base import Provider, ProviderConfig
from .base import RunType

logger = logging.getLogger()

Expand All @@ -35,6 +35,17 @@ def supported_providers(self) -> list[str]:
return ["local", "postgres"]


class RunType(str, Enum):
"""Enumeration of the different types of runs."""

RETRIEVAL = "RETRIEVAL"
MANAGEMENT = "MANAGEMENT"
INGESTION = "INGESTION"
AUTH = "AUTH"
UNSPECIFIED = "UNSPECIFIED"
KG = "KG"


class PersistentLoggingProvider(Provider):
@abstractmethod
async def close(self):
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
from uuid import UUID

from core.base.api.models import UserResponse
from core.base.logging.base import RunType
from core.base.logger.base import RunType
from core.base.utils import generate_run_id

from .r2r_logger import PersistentLoggingProvider
from .base import PersistentLoggingProvider

run_id_var = contextvars.ContextVar("run_id", default=generate_run_id())

Expand Down
12 changes: 0 additions & 12 deletions py/core/base/logging/base.py

This file was deleted.

4 changes: 2 additions & 2 deletions py/core/base/pipeline/base_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
import traceback
from typing import Any, AsyncGenerator, Optional

from ..logging.r2r_logger import PersistentLoggingProvider
from ..logging.run_manager import RunManager, manage_run
from ..logger.base import PersistentLoggingProvider
from ..logger.run_manager import RunManager, manage_run
from ..pipes.base_pipe import AsyncPipe, AsyncState

logger = logging.getLogger()
Expand Down
5 changes: 2 additions & 3 deletions py/core/base/pipes/base_pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@

from pydantic import BaseModel

from core.base.logging import RunType
from core.base.logging.r2r_logger import PersistentLoggingProvider
from core.base.logging.run_manager import RunManager, manage_run
from core.base.logger.base import PersistentLoggingProvider, RunType
from core.base.logger.run_manager import RunManager, manage_run

logger = logging.getLogger()

Expand Down
2 changes: 2 additions & 0 deletions py/core/base/providers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
DocumentHandler,
FileHandler,
KGHandler,
LoggingHandler,
PostgresConfigurationSettings,
PromptHandler,
TokenHandler,
Expand Down Expand Up @@ -41,6 +42,7 @@
"CollectionHandler",
"TokenHandler",
"UserHandler",
"LoggingHandler",
"VectorHandler",
"KGHandler",
"PromptHandler",
Expand Down
155 changes: 155 additions & 0 deletions py/core/base/providers/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
CommunityReport,
Entity,
KGExtraction,
Message,
Triple,
VectorEntry,
)
Expand Down Expand Up @@ -50,6 +51,8 @@
)
from core.base.utils import _decorate_vector_type

from ..logger import RunInfoLog
from ..logger.base import RunType
from .base import Provider, ProviderConfig

"""Base classes for knowledge graph providers."""
Expand Down Expand Up @@ -960,6 +963,119 @@ async def get_files_overview(
pass


class LoggingHandler(Handler):
"""Abstract base class defining the interface for logging handlers."""

@abstractmethod
async def initialize(self) -> None:
"""Initialize the logging handler and create necessary tables."""
pass

@abstractmethod
async def close(self) -> None:
"""Close any open connections."""
pass

# Basic logging methods
@abstractmethod
async def log(self, run_id: UUID, key: str, value: str) -> None:
"""Log a key-value pair for a specific run."""
pass

@abstractmethod
async def info_log(
self, run_id: UUID, run_type: RunType, user_id: UUID
) -> None:
"""Log run information."""
pass

@abstractmethod
async def get_logs(
self, run_ids: List[UUID], limit_per_run: int = 10
) -> List[Dict]:
"""Retrieve logs for specified run IDs."""
pass

@abstractmethod
async def get_info_logs(
self,
offset: int = 0,
limit: int = 100,
run_type_filter: Optional[RunType] = None,
user_ids: Optional[List[UUID]] = None,
) -> List[RunInfoLog]:
"""Retrieve run information logs with filtering options."""
pass

# Conversation management methods
@abstractmethod
async def create_conversation(self) -> str:
"""Create a new conversation and return its ID."""
pass

@abstractmethod
async def delete_conversation(self, conversation_id: str) -> None:
"""Delete a conversation and all associated data."""
pass

@abstractmethod
async def get_conversations_overview(
self,
conversation_ids: Optional[List[UUID]] = None,
offset: int = 0,
limit: int = -1,
) -> Dict[str, Union[List[Dict], int]]:
"""Get an overview of conversations with pagination."""
pass

# Message management methods
@abstractmethod
async def add_message(
self,
conversation_id: str,
content: Message,
parent_id: Optional[str] = None,
metadata: Optional[Dict] = None,
) -> str:
"""Add a message to a conversation."""
pass

@abstractmethod
async def edit_message(
self, message_id: str, new_content: str
) -> Tuple[str, str]:
"""Edit an existing message and return new message ID and branch ID."""
pass

@abstractmethod
async def get_conversation(
self, conversation_id: str, branch_id: Optional[str] = None
) -> List[Tuple[str, Message]]:
"""Retrieve all messages in a conversation branch."""
pass

# Branch management methods
@abstractmethod
async def get_branches_overview(self, conversation_id: str) -> List[Dict]:
"""Get an overview of all branches in a conversation."""
pass

@abstractmethod
async def get_next_branch(self, current_branch_id: str) -> Optional[str]:
"""Get the ID of the next branch in chronological order."""
pass

@abstractmethod
async def get_prev_branch(self, current_branch_id: str) -> Optional[str]:
"""Get the ID of the previous branch in chronological order."""
pass

@abstractmethod
async def branch_at_message(self, message_id: str) -> str:
"""Create a new branch starting at a specific message."""
pass


class DatabaseProvider(Provider):
connection_manager: DatabaseConnectionManager
document_handler: DocumentHandler
Expand All @@ -970,6 +1086,7 @@ class DatabaseProvider(Provider):
kg_handler: KGHandler
prompt_handler: PromptHandler
file_handler: FileHandler
logging_handler: LoggingHandler
config: DatabaseConfig
project_name: str

Expand Down Expand Up @@ -1697,3 +1814,41 @@ async def get_files_overview(
return await self.file_handler.get_files_overview(
filter_document_ids, filter_file_names, offset, limit
)

async def log(
self,
run_id: UUID,
key: str,
value: str,
) -> None:
"""Add a new log entry."""
return await self.logging_handler.log(run_id, key, value)

async def info_log(
self,
run_id: UUID,
run_type: RunType,
user_id: UUID,
) -> None:
"""Add or update a log info entry."""
return await self.logging_handler.info_log(run_id, run_type, user_id)

async def get_info_logs(
self,
offset: int = 0,
limit: int = 100,
run_type_filter: Optional[RunType] = None,
user_ids: Optional[List[UUID]] = None,
) -> List[RunInfoLog]:
"""Retrieve log info entries with filtering and pagination."""
return await self.logging_handler.get_info_logs(
offset, limit, run_type_filter, user_ids
)

async def get_logs(
self,
run_ids: List[UUID],
limit_per_run: int = 10,
) -> List[Dict[str, Any]]:
"""Retrieve logs for specified run IDs with a per-run limit."""
return await self.logging_handler.get_logs(run_ids, limit_per_run)
3 changes: 2 additions & 1 deletion py/core/main/api/auth_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
SimpleOrchestrationProvider,
)

from ...base.logger.base import RunType
from ..services.auth_service import AuthService
from .base_router import BaseRouter, RunType
from .base_router import BaseRouter

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

Expand Down
2 changes: 1 addition & 1 deletion py/core/main/api/base_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from fastapi.responses import StreamingResponse

from core.base import R2RException, manage_run
from core.base.logging.base import RunType
from core.base.logger.base import RunType
from core.providers import (
HatchetOrchestrationProvider,
SimpleOrchestrationProvider,
Expand Down
3 changes: 2 additions & 1 deletion py/core/main/api/ingestion_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@
SimpleOrchestrationProvider,
)

from ...base.logger.base import RunType
from ..services.ingestion_service import IngestionService
from .base_router import BaseRouter, RunType
from .base_router import BaseRouter

logger = logging.getLogger()

Expand Down
3 changes: 2 additions & 1 deletion py/core/main/api/kg_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import yaml
from fastapi import Body, Depends, Query

from core.base import RunType, Workflow
from core.base import Workflow
from core.base.abstractions import EntityLevel, KGRunType
from core.base.api.models import (
WrappedKGCommunitiesResponse,
Expand All @@ -17,6 +17,7 @@
WrappedKGTriplesResponse,
WrappedKGTunePromptResponse,
)
from core.base.logger.base import RunType
from core.providers import (
HatchetOrchestrationProvider,
SimpleOrchestrationProvider,
Expand Down
5 changes: 3 additions & 2 deletions py/core/main/api/management_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,15 @@
WrappedUserOverviewResponse,
WrappedUsersInCollectionResponse,
)
from core.base.logging import AnalysisTypes, LogFilterCriteria
from core.base.logger import AnalysisTypes, LogFilterCriteria
from core.providers import (
HatchetOrchestrationProvider,
SimpleOrchestrationProvider,
)

from ...base.logger.base import RunType
from ..services.management_service import ManagementService
from .base_router import BaseRouter, RunType
from .base_router import BaseRouter


class ManagementRouter(BaseRouter):
Expand Down
2 changes: 1 addition & 1 deletion py/core/main/api/retrieval_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
KGSearchSettings,
Message,
R2RException,
RunType,
VectorSearchSettings,
)
from core.base.api.models import (
Expand All @@ -21,6 +20,7 @@
WrappedRAGResponse,
WrappedSearchResponse,
)
from core.base.logger.base import RunType
from core.providers import (
HatchetOrchestrationProvider,
SimpleOrchestrationProvider,
Expand Down
2 changes: 1 addition & 1 deletion py/core/main/assembly/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
RunManager,
)
from core.pipelines import KGEnrichmentPipeline, RAGPipeline, SearchPipeline
from core.providers.logging.r2r_logging import SqlitePersistentLoggingProvider
from core.providers.logger.r2r_logger import SqlitePersistentLoggingProvider

from ..abstractions import R2RProviders
from ..api.auth_router import AuthRouter
Expand Down
Loading

0 comments on commit d4a8047

Please sign in to comment.