Skip to content

Commit

Permalink
feat: screen recorder
Browse files Browse the repository at this point in the history
  • Loading branch information
codematrixer committed Sep 9, 2024
1 parent 2e23e41 commit 4d7bcea
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 6 deletions.
3 changes: 2 additions & 1 deletion hmdriver2/_gesture.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import List, Union
from . import logger
from .utils import delay
from .driver import Driver
from .proto import HypiumResponse, Point
from .exception import InjectGestureError

Expand All @@ -13,7 +14,7 @@ class _Gesture:
SAMPLE_TIME_NORMAL = 50
SAMPLE_TIME_MAX = 100

def __init__(self, driver: "Driver", sampling_ms=50): # type: ignore
def __init__(self, driver: Driver, sampling_ms=50):
"""
Initialize a gesture object.
Expand Down
118 changes: 118 additions & 0 deletions hmdriver2/_screenrecord.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
# -*- coding: utf-8 -*-

import socket
import json
import time
import os
import typing
import subprocess
import atexit
import hashlib
import threading
import numpy as np
from queue import Queue
from datetime import datetime

import cv2

from . import logger
from .driver import Driver
from .exception import ScreenRecordError


# Developing


class _ScreenRecord:
def __init__(self, driver: Driver):
self._client = driver._client
self.video_path = None

self.jpeg_queue = Queue()
self.threads: typing.List[threading.Thread] = []
self.stop_event = threading.Event()

def _send_message(self, api: str, args: list):
_msg = {
"module": "com.ohos.devicetest.hypiumApiHelper",
"method": "Captures",
"params": {
"api": api,
"args": args
},
"request_id": datetime.now().strftime("%Y%m%d%H%M%S%f")
}
self._client._send_msg(_msg)

def start(self, video_path: str):

self.video_path = video_path

self._send_message("startCaptureScreen", [])

reply: str = self._client._recv_msg(1024, decode=True)
if "true" in reply:
record_th = threading.Thread(target=self._record_worker)
writer_th = threading.Thread(target=self._video_writer)
record_th.start()
writer_th.start()
self.threads.extend([record_th, writer_th])
else:
raise ScreenRecordError("Failed to start device screen capture.")

def _record_worker(self):
"""Capture screen frames and save current frames."""

# JPEG start and end markers.
start_flag = b'\xff\xd8'
end_flag = b'\xff\xd9'
buffer = bytearray()
while not self.stop_event.is_set():
try:
buffer += self._client._recv_msg(4096 * 1024)
except Exception as e:
print(f"Error receiving data: {e}")
break

start_idx = buffer.find(start_flag)
end_idx = buffer.find(end_flag)
while start_idx != -1 and end_idx != -1 and end_idx > start_idx:
# Extract one JPEG image
jpeg_image: bytearray = buffer[start_idx:end_idx + 2]
self.jpeg_queue.put(jpeg_image)

buffer = buffer[end_idx + 2:]

# Search for the next JPEG image in the buffer
start_idx = buffer.find(start_flag)
end_idx = buffer.find(end_flag)

def _video_writer(self):
"""Write frames to video file."""
cv2_instance = None
while not self.stop_event.is_set():
if not self.jpeg_queue.empty():
jpeg_image = self.jpeg_queue.get(timeout=0.1)
img = cv2.imdecode(np.frombuffer(jpeg_image, np.uint8), cv2.IMREAD_COLOR)
if cv2_instance is None:
height, width = img.shape[:2]
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
cv2_instance = cv2.VideoWriter(self.video_path, fourcc, 10, (width, height))

cv2_instance.write(img)

if cv2_instance:
cv2_instance.release()

def stop(self) -> str:
try:
self.stop_event.set()
for t in self.threads:
t.join()

self._send_message("stopCaptureScreen", [])
self._client._recv_msg(1024, decode=True)
except Exception as e:
logger.error(f"An error occurred: {e}")

return self.video_path
16 changes: 11 additions & 5 deletions hmdriver2/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from ._uiobject import UiObject
from .hdc import list_devices
from .exception import DeviceNotFoundError
from ._gesture import _Gesture
from .proto import HypiumResponse, KeyCode, Point, DisplayRotation, DeviceInfo


Expand Down Expand Up @@ -270,10 +269,6 @@ def input_text(self, x, y, text: str):
point = self._to_abs_pos(x, y)
self.hdc.input_text(point.x, point.y, text)

@cached_property
def gesture(self):
return _Gesture(self)

def dump_hierarchy(self) -> Dict:
"""
Dump the UI hierarchy of the device screen.
Expand All @@ -282,3 +277,14 @@ def dump_hierarchy(self) -> Dict:
Dict: The dumped UI hierarchy as a dictionary.
"""
return self.hdc.dump_hierarchy()

@cached_property
def gesture(self):
from ._gesture import _Gesture
return _Gesture(self)

# @cached_property
# def screenrecord(self):
# # FIXME
# from ._screenrecord import _ScreenRecord
# return _ScreenRecord(self)
4 changes: 4 additions & 0 deletions hmdriver2/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,7 @@ class InvokeHypiumError(Exception):

class InjectGestureError(Exception):
pass


class ScreenRecordError(Exception):
pass

0 comments on commit 4d7bcea

Please sign in to comment.