Skip to content

Commit

Permalink
Merge pull request #883 from roflcoopter/feature/websocket-api-async
Browse files Browse the repository at this point in the history
Async Websocket API
  • Loading branch information
roflcoopter authored Jan 10, 2025
2 parents 9ffcb3a + 5ea744f commit 4736501
Show file tree
Hide file tree
Showing 19 changed files with 365 additions and 314 deletions.
4 changes: 2 additions & 2 deletions tests/components/webserver/api/test_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,9 @@ def test_get(self):
"""Handle get request."""
self.write({"test": "test"})

def test_camera_identifier(self, camera_identifier):
async def test_camera_identifier(self, camera_identifier):
"""Handle request with camera_identifier."""
self.response_success(response={"camera_identifier": camera_identifier})
await self.response_success(response={"camera_identifier": camera_identifier})

def test_error(self):
"""Handle error."""
Expand Down
47 changes: 42 additions & 5 deletions viseron/components/data_stream/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from __future__ import annotations

import fnmatch
import inspect
import logging
import multiprocessing as mp
import subprocess
Expand Down Expand Up @@ -151,6 +152,37 @@ def remove_all_subscriptions() -> None:
DataStream._subscribers.clear()
DataStream._wildcard_subscribers.clear()

async def run_callback_in_ioloop(
self, callback: Callable, data: Any, ioloop: IOLoop
) -> None:
"""Run callback in IOLoop."""

def _wrapper():
IOLoop.current()
if data:
callback(data)
return
callback()

if inspect.iscoroutinefunction(callback):
if data:
await callback(data)
return
await callback()
else:
await ioloop.run_in_executor(None, _wrapper)

async def put_tornado_queue(
self, queue: tornado_queue, data: Any, ioloop: IOLoop
) -> None:
"""Put data in tornado queue."""

def _wrapper():
IOLoop.current()
helpers.pop_if_full(queue, data)

await ioloop.run_in_executor(None, _wrapper)

def run_callbacks(
self,
callbacks: dict[uuid.UUID, DataSubscriber],
Expand Down Expand Up @@ -202,10 +234,12 @@ def run_callbacks(
continue

if callable(callback["callback"]) and callback["ioloop"] is not None:
if data:
callback["ioloop"].add_callback(callback["callback"], data)
else:
callback["ioloop"].add_callback(callback["callback"])
callback["ioloop"].add_callback(
self.run_callback_in_ioloop,
callback["callback"],
data,
callback["ioloop"],
)
continue

if isinstance(callback["callback"], Queue):
Expand All @@ -216,7 +250,10 @@ def run_callbacks(
callback["callback"], tornado_queue
):
callback["ioloop"].add_callback(
helpers.pop_if_full, callback["callback"], data
self.put_tornado_queue,
callback["callback"],
data,
callback["ioloop"],
)
continue

Expand Down
50 changes: 29 additions & 21 deletions viseron/components/webserver/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from __future__ import annotations

import asyncio
import concurrent
import logging
import secrets
import threading
Expand Down Expand Up @@ -260,6 +259,9 @@ def __init__(self, vis: Viseron, config) -> None:

self._asyncio_ioloop = asyncio.new_event_loop()
asyncio.set_event_loop(self._asyncio_ioloop)
if config[CONFIG_DEBUG]:
self._asyncio_ioloop.set_debug(True)

self._application = create_application(vis, config, self._store.cookie_secret)
self._httpserver = None
try:
Expand Down Expand Up @@ -305,27 +307,33 @@ def run(self) -> None:
def stop(self) -> None:
"""Stop ioloop."""
LOGGER.debug("Stopping webserver")
futures = []
connection: WebSocketHandler
for connection in self._vis.data[WEBSOCKET_CONNECTIONS]:
LOGGER.debug("Closing websocket connection, %s", connection)
futures.append(
asyncio.run_coroutine_threadsafe(
connection.force_close(), self._asyncio_ioloop
)
)

for future in concurrent.futures.as_completed(futures):
# Await results
future.result()

asyncio.set_event_loop(self._asyncio_ioloop)
for task in asyncio.all_tasks(self._asyncio_ioloop):
task.cancel()

if self._httpserver:
LOGGER.debug("Stopping HTTPServer")
self._httpserver.stop()

LOGGER.debug("Stopping IOloop")
self._ioloop.add_callback(self._ioloop.stop)
shutdown_event = threading.Event()

async def shutdown():
connection: WebSocketHandler
for connection in self._vis.data[WEBSOCKET_CONNECTIONS]:
LOGGER.debug("Closing websocket connection, %s", connection)
await connection.force_close()

tasks = [
t
for t in asyncio.all_tasks(self._asyncio_ioloop)
if t is not asyncio.current_task()
]
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
LOGGER.debug("Stopping IOloop")
self._asyncio_ioloop.stop()
self._ioloop.stop()
LOGGER.debug("IOloop stopped")
shutdown_event.set()

self._ioloop.add_callback(shutdown)
self.join()
shutdown_event.wait()
LOGGER.debug("Webserver stopped")
10 changes: 7 additions & 3 deletions viseron/components/webserver/api/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,14 @@ def json_body(self, value) -> None:
"""Set JSON body."""
self._json_body = value

def response_success(
async def response_success(
self, *, status: HTTPStatus = HTTPStatus.OK, response=None, headers=None
) -> None:
"""Send successful response."""

def _json_dumps() -> str:
return partial(json.dumps, cls=JSONEncoder, allow_nan=False)(response)

if response is None:
response = {"success": True}
self.set_status(status)
Expand All @@ -84,10 +88,10 @@ def response_success(
self.set_header(header, value)

if isinstance(response, dict):
self.finish(partial(json.dumps, cls=JSONEncoder, allow_nan=False)(response))
await self.finish(await self.run_in_executor(_json_dumps))
return

self.finish(response)
await self.finish(response)

def response_error(self, status_code: HTTPStatus, reason: str) -> None:
"""Send error response."""
Expand Down
12 changes: 6 additions & 6 deletions viseron/components/webserver/api/v1/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ async def auth_enabled(self) -> None:
if self._webserver.auth
else False,
}
self.response_success(response=response)
await self.response_success(response=response)

async def auth_create(self) -> None:
"""Create a new user."""
Expand All @@ -105,7 +105,7 @@ async def auth_create(self) -> None:
except UserExistsError as error:
self.response_error(HTTPStatus.BAD_REQUEST, reason=str(error))
return
self.response_success()
await self.response_success()

async def auth_user(self, user_id: str) -> None:
"""Get a user.
Expand All @@ -116,7 +116,7 @@ async def auth_user(self, user_id: str) -> None:
if user is None:
self.response_error(HTTPStatus.NOT_FOUND, reason="User not found")
return
self.response_success(
await self.response_success(
response={
"name": user.name,
"username": user.username,
Expand Down Expand Up @@ -152,7 +152,7 @@ async def auth_login(self) -> None:

self.set_cookies(refresh_token, access_token, user, new_session=True)

self.response_success(
await self.response_success(
response=token_response(
refresh_token,
access_token,
Expand All @@ -173,7 +173,7 @@ async def auth_logout(self) -> None:
)

self.clear_all_cookies()
self.response_success()
await self.response_success()

def _handle_refresh_token(
self,
Expand Down Expand Up @@ -218,7 +218,7 @@ async def auth_token(self) -> None:
if self.json_body["grant_type"] == "refresh_token":
status, response = await self.run_in_executor(self._handle_refresh_token)
if status == HTTPStatus.OK:
self.response_success(response=response)
await self.response_success(response=response)
return
self.clear_all_cookies()
# Mypy doesn't understand that status is HTTPStatus.BAD_REQUEST here
Expand Down
6 changes: 4 additions & 2 deletions viseron/components/webserver/api/v1/camera.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,9 @@ async def get_snapshot(self, camera_identifier: str) -> None:
)
return

self.response_success(response=jpg, headers={"Content-Type": "image/jpeg"})
await self.response_success(
response=jpg, headers={"Content-Type": "image/jpeg"}
)
return

async def get_camera_endpoint(self, camera_identifier: str) -> None:
Expand All @@ -164,5 +166,5 @@ async def get_camera_endpoint(self, camera_identifier: str) -> None:
)
return

self.response_success(response=camera.as_dict())
await self.response_success(response=camera.as_dict())
return
4 changes: 2 additions & 2 deletions viseron/components/webserver/api/v1/cameras.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class CamerasAPIHandler(BaseAPIHandler):

async def get_cameras(self) -> None:
"""Return cameras."""
self.response_success(
await self.response_success(
response=self._vis.data[REGISTERED_DOMAINS].get(CAMERA_DOMAIN, {})
)

Expand All @@ -39,4 +39,4 @@ async def get_failed_cameras(self) -> None:
self._vis.data[DOMAIN_FAILED].get(CAMERA_DOMAIN, {}).values()
):
failed_cameras[failed_camera.identifier] = failed_camera.error_instance
self.response_success(response=failed_cameras)
await self.response_success(response=failed_cameras)
2 changes: 1 addition & 1 deletion viseron/components/webserver/api/v1/compreface.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,4 @@ async def update_subjects(self) -> None:
)
response = {}
response["added_subjects"] = added_subjects
self.response_success(response=response)
await self.response_success(response=response)
2 changes: 1 addition & 1 deletion viseron/components/webserver/api/v1/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,4 @@ def read_config() -> str:
return config_file.read()

config = await self.run_in_executor(read_config)
self.response_success(response=config)
await self.response_success(response=config)
21 changes: 13 additions & 8 deletions viseron/components/webserver/api/v1/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,13 +310,18 @@ async def get_events(
time_to,
)

sorted_events = sorted(
motion_events + recording_events + object_events + post_processor_events,
key=lambda k: k["created_at"],
reverse=True,
)
def sort_events():
return sorted(
motion_events
+ recording_events
+ object_events
+ post_processor_events,
key=lambda k: k["created_at"],
reverse=True,
)

self.response_success(response={"events": sorted_events})
sorted_events = await self.run_in_executor(sort_events)
await self.response_success(response={"events": sorted_events})

def _events_amount(
self,
Expand Down Expand Up @@ -389,7 +394,7 @@ async def get_events_amount(
self._get_session,
[camera.identifier],
)
self.response_success(response={"events_amount": events_amount})
await self.response_success(response={"events_amount": events_amount})

async def post_events_amount_multiple(self):
"""Get amount of events per day for multiple cameras.
Expand All @@ -401,4 +406,4 @@ async def post_events_amount_multiple(self):
self._get_session,
self.json_body["camera_identifiers"],
)
self.response_success(response={"events_amount": events_amount})
await self.response_success(response={"events_amount": events_amount})
6 changes: 3 additions & 3 deletions viseron/components/webserver/api/v1/hls.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ async def get_recording_hls_playlist(
self.set_header("Content-Type", "application/x-mpegURL")
self.set_header("Cache-Control", "no-cache")
self.set_header("Access-Control-Allow-Origin", "*")
self.response_success(response=playlist)
await self.response_success(response=playlist)

async def get_hls_playlist_time_period(
self,
Expand Down Expand Up @@ -169,7 +169,7 @@ async def get_hls_playlist_time_period(
self.set_header("Content-Type", "application/x-mpegURL")
self.set_header("Cache-control", "no-cache, must-revalidate, max-age=0")
self.set_header("Access-Control-Allow-Origin", "*")
self.response_success(response=playlist)
await self.response_success(response=playlist)

async def get_available_timespans(
self,
Expand Down Expand Up @@ -203,7 +203,7 @@ async def get_available_timespans(
time_from,
time_to,
)
self.response_success(response={"timespans": timespans})
await self.response_success(response={"timespans": timespans})


def _get_init_file(
Expand Down
2 changes: 1 addition & 1 deletion viseron/components/webserver/api/v1/onboarding.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ async def onboarding(self) -> None:

self.set_cookies(refresh_token, access_token, user, new_session=True)

self.response_success(
await self.response_success(
response=token_response(
refresh_token,
access_token,
Expand Down
Loading

0 comments on commit 4736501

Please sign in to comment.