Skip to content

Commit

Permalink
Change Fastapi to Starlette
Browse files Browse the repository at this point in the history
  • Loading branch information
talsabagport committed Nov 4, 2024
1 parent 751f26e commit d88c644
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 79 deletions.
17 changes: 14 additions & 3 deletions integrations/jira/main.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import json
from enum import StrEnum
from typing import Any

from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route

from jira.client import JiraClient
from loguru import logger
from port_ocean.context.ocean import ocean
Expand Down Expand Up @@ -59,8 +64,9 @@ async def on_resync_issues(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
yield issues


@ocean.router.post("/webhook")
async def handle_webhook_request(data: dict[str, Any]) -> dict[str, Any]:
async def handle_webhook_request(request: Request) -> JSONResponse:
raw_data = await request.body()
data = json.loads(raw_data)
client = JiraClient(
ocean.integration_config["jira_host"],
ocean.integration_config["atlassian_user_email"],
Expand All @@ -76,7 +82,12 @@ async def handle_webhook_request(data: dict[str, Any]) -> dict[str, Any]:
issue = await client.get_single_issue(data["issue"]["key"])
await ocean.register_raw(ObjectKind.ISSUE, [issue])
logger.info("Webhook event processed")
return {"ok": True}
return JSONResponse({"ok": True})


ocean.router.routes.append(
Route("/webhook", methods=["post"], endpoint=handle_webhook_request)
)


# Called once when the integration starts.
Expand Down
4 changes: 2 additions & 2 deletions port_ocean/context/ocean.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import Callable, TYPE_CHECKING, Any, Literal, Union

from fastapi import APIRouter
from pydantic.main import BaseModel
from starlette.routing import Router
from werkzeug.local import LocalProxy

from port_ocean.clients.port.types import UserAgentType
Expand Down Expand Up @@ -45,7 +45,7 @@ def config(self) -> "IntegrationConfiguration":
return self.app.config

@property
def router(self) -> APIRouter:
def router(self) -> Router:
return self.app.integration_router

@property
Expand Down
9 changes: 4 additions & 5 deletions port_ocean/core/event_listener/http.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from typing import Literal, Any

from fastapi import APIRouter
from loguru import logger
from pydantic import AnyHttpUrl
from pydantic.fields import Field
from starlette.requests import Request
from starlette.routing import Router, Route

from port_ocean.context.ocean import ocean
from port_ocean.core.event_listener.base import (
Expand Down Expand Up @@ -60,10 +61,8 @@ async def _start(self) -> None:
It sets up an APIRouter to handle the `/resync` endpoint and registers the "on_resync" event handler.
"""
logger.info("Setting up HTTP Event Listener")
target_channel_router = APIRouter()

@target_channel_router.post("/resync")
async def resync() -> None:
async def resync(request: Request) -> None:
await self._resync({})

ocean.app.fast_api_app.include_router(target_channel_router)
ocean.router.routes.append(Route("/resync", methods=["post"], endpoint=resync))
2 changes: 1 addition & 1 deletion port_ocean/exceptions/api.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import abc

from fastapi.responses import Response, PlainTextResponse
from port_ocean.exceptions.base import BaseOceanException
from starlette.responses import Response, PlainTextResponse


class BaseAPIException(BaseOceanException, abc.ABC):
Expand Down
107 changes: 52 additions & 55 deletions port_ocean/middlewares.py
Original file line number Diff line number Diff line change
@@ -1,73 +1,70 @@
from typing import Callable, Awaitable

from fastapi import Request, Response
from loguru import logger
from starlette.requests import Request
from starlette.responses import Response

from port_ocean.exceptions.api import BaseAPIException, InternalServerException
from .context.event import event_context, EventType
from .context.ocean import ocean
from .utils.misc import get_time, generate_uuid
from starlette.middleware.base import (
BaseHTTPMiddleware,
RequestResponseEndpoint,
)


async def _handle_silently(
call_next: Callable[[Request], Awaitable[Response]], request: Request
) -> Response:
response: Response
try:
if request.url.path.startswith("/integration"):
async with event_context(EventType.HTTP_REQUEST, trigger_type="request"):
await ocean.integration.port_app_config_handler.get_port_app_config()
response = await call_next(request)
else:
response = await call_next(request)
class RequestHandlerMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next: RequestResponseEndpoint):
start_time = get_time(seconds_precision=False)
request_id = generate_uuid()

except BaseAPIException as ex:
response = ex.response()
if response.status_code < 500:
logger.bind(exception=str(ex)).info(
"Request did not succeed due to client-side error"
with logger.contextualize(request_id=request_id):
log_level = (
"DEBUG"
if request.url.path == "/docs" or request.url.path == "/openapi.json"
else "INFO"
)
else:
logger.opt(exception=True).warning(
"Request did not succeed due to server-side error"
logger.bind(url=str(request.url), method=request.method).log(
log_level, f"Request to {request.url.path} started"
)
response = await self._handle_silently(request, call_next)

except Exception:
logger.opt(exception=True).error("Request failed due to unexpected error")
response = InternalServerException().response()

return response
end_time = get_time(seconds_precision=False)
time_elapsed = round(end_time - start_time, 5)
response.headers["X-Request-ID"] = request_id
response.headers["X-Process-Time"] = str(time_elapsed)
logger.bind(
time_elapsed=time_elapsed, response_status=response.status_code
).log(log_level, f"Request to {request.url.path} ended")

return response

async def request_handler(
request: Request, call_next: Callable[[Request], Awaitable[Response]]
) -> Response:
"""Middleware used by FastAPI to process each request, featuring:
- Contextualize request logs with a unique Request ID (UUID4) for each unique request.
- Catch exceptions during the request handling. Translate custom API exceptions into responses,
or treat (and log) unexpected exceptions.
"""
start_time = get_time(seconds_precision=False)
request_id = generate_uuid()
async def _handle_silently(
self, request: Request, call_next: RequestResponseEndpoint
) -> Response:
response: Response
try:
if request.url.path.startswith("/integration"):
async with event_context(
EventType.HTTP_REQUEST, trigger_type="request"
):
await ocean.integration.port_app_config_handler.get_port_app_config()
response = await call_next(request)
else:
response = await call_next(request)

with logger.contextualize(request_id=request_id):
log_level = (
"DEBUG"
if request.url.path == "/docs" or request.url.path == "/openapi.json"
else "INFO"
)
logger.bind(url=str(request.url), method=request.method).log(
log_level, f"Request to {request.url.path} started"
)
response = await _handle_silently(call_next, request)
except BaseAPIException as ex:
response = ex.response()
if response.status_code < 500:
logger.bind(exception=str(ex)).info(
"Request did not succeed due to client-side error"
)
else:
logger.opt(exception=True).warning(
"Request did not succeed due to server-side error"
)

end_time = get_time(seconds_precision=False)
time_elapsed = round(end_time - start_time, 5)
response.headers["X-Request-ID"] = request_id
response.headers["X-Process-Time"] = str(time_elapsed)
logger.bind(
time_elapsed=time_elapsed, response_status=response.status_code
).log(log_level, f"Request to {request.url.path} ended")
except Exception:
logger.opt(exception=True).error("Request failed due to unexpected error")
response = InternalServerException().response()

return response
37 changes: 25 additions & 12 deletions port_ocean/ocean.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@
from contextlib import asynccontextmanager
from typing import Callable, Any, Dict, AsyncIterator, Type

from fastapi import FastAPI, APIRouter
from loguru import logger
from pydantic import BaseModel
from starlette.applications import Starlette
from starlette.middleware import Middleware
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route, Mount, Router
from starlette.types import Scope, Receive, Send

from port_ocean.core.handlers.resync_state_updater import ResyncStateUpdater
Expand All @@ -22,7 +26,7 @@
)
from port_ocean.core.integrations.base import BaseIntegration
from port_ocean.log.sensetive import sensitive_log_filter
from port_ocean.middlewares import request_handler
from port_ocean.middlewares import RequestHandlerMiddleware
from port_ocean.utils.repeat import repeat_every
from port_ocean.utils.signal import signal_handler
from port_ocean.version import __integration_version__
Expand All @@ -32,15 +36,16 @@
class Ocean:
def __init__(
self,
app: FastAPI | None = None,
app: Starlette | None = None,
integration_class: Callable[[PortOceanContext], BaseIntegration] | None = None,
integration_router: APIRouter | None = None,
integration_router: Router | None = None,
config_factory: Type[BaseModel] | None = None,
config_override: Dict[str, Any] | None = None,
):
initialize_port_ocean_context(self)
self.fast_api_app = app or FastAPI()
self.fast_api_app.middleware("http")(request_handler)

self.starlette_app = app or Starlette()
self.integration_router = integration_router or Router()

self.config = IntegrationConfiguration(
# type: ignore
Expand All @@ -52,7 +57,6 @@ def __init__(
sensitive_log_filter.hide_sensitive_strings(
*self.config.get_sensitive_fields_data()
)
self.integration_router = integration_router or APIRouter()

self.port_client = PortClient(
base_url=self.config.port.base_url,
Expand Down Expand Up @@ -113,10 +117,8 @@ async def execute_resync_all() -> None:
await repeated_function()

async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
self.fast_api_app.include_router(self.integration_router, prefix="/integration")

@asynccontextmanager
async def lifecycle(_: FastAPI) -> AsyncIterator[None]:
async def lifespan(_: Starlette) -> AsyncIterator[None]:
try:
await self.integration.start()
await self._setup_scheduled_resync()
Expand All @@ -127,5 +129,16 @@ async def lifecycle(_: FastAPI) -> AsyncIterator[None]:
finally:
signal_handler.exit()

self.fast_api_app.router.lifespan_context = lifecycle
await self.fast_api_app(scope, receive, send)
async def health(request: Request) -> JSONResponse:
return JSONResponse({"ok": True})

self.starlette_app = Starlette(
routes=[
Route("/docs", endpoint=health),
Mount("/integration", routes=self.integration_router.routes),
],
middleware=[Middleware(RequestHandlerMiddleware)],
lifespan=lifespan,
)

await self.starlette_app(scope, receive, send)
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "port-ocean"
version = "0.12.7"
version = "0.12.7-dev01"
description = "Port Ocean is a CLI tool for managing your Port projects."
readme = "README.md"
homepage = "https://app.getport.io"
Expand Down

0 comments on commit d88c644

Please sign in to comment.