Skip to content

Commit

Permalink
Calculate QR code time based on start time from file name and code po…
Browse files Browse the repository at this point in the history
…sition in video, WiP #96
  • Loading branch information
vmdocua committed Jun 27, 2024
1 parent 0782d8c commit cddb6de
Showing 1 changed file with 60 additions and 24 deletions.
84 changes: 60 additions & 24 deletions Parsing/parse_wQR.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import logging
import os
import re
from datetime import datetime
from datetime import datetime, timedelta
from typing import Optional

from pydantic import BaseModel, Field
Expand All @@ -30,33 +30,50 @@ class VideoTimeInfo(BaseModel):
error: Optional[str] = Field(None, description="Error message if any")
start_time: Optional[datetime] = Field(None, description="Start time of the video")
end_time: Optional[datetime] = Field(None, description="End time of the video")
duration_sec: Optional[float] = Field(None, description="Duration of the video in seconds")
duration_sec: Optional[float] = Field(None, description="Duration of the video "
"in seconds")


# Define the data model for the QR record
class QrRecord(BaseModel):
frame_start: int = Field(..., description="Frame number where QR code starts")
frame_start: int = Field(None, description="Frame number where QR code starts")
frame_end: int = Field(None, description="Frame number where QR code ends")
time_start: str = Field(..., description="Time where QR code starts")
time_end: str = Field(None, description="Time where QR code ends")
data: dict = Field(..., description="QR code data")
start_time: Optional[str] = Field(None, description="Time where QR code starts")
end_time: Optional[str] = Field(None, description="Time where QR code ends")
start_pos_sec: Optional[float] = Field(None, description="Position in seconds "
"where QR code starts")
end_pos_sec: Optional[float] = Field(None, description="Position in seconds "
"where QR code ends")
data: Optional[dict] = Field(None, description="QR code data")

def __str__(self):
return (f"QrRecord(frames=[{self.frame_start}...{self.frame_end}], "
f"time=[{self.time_start}..{self.time_end}], "
return (f"QrRecord(frames=[{self.frame_start}, {self.frame_end}], "
f"pos=[{self.start_pos_sec}, {self.end_pos_sec} sec], "
f"start_time={self.start_time}, end_time={self.end_time}], "
f"data={self.data})"
)


def calc_time(ts: datetime, pos_sec: float) -> datetime:
return ts + timedelta(seconds=pos_sec)


def get_iso_time(ts: str) -> datetime:
dt: datetime = datetime.fromisoformat(ts)
dt = dt.replace(tzinfo=None)
return dt


def get_video_time_info(path_video: str) -> VideoTimeInfo:
res: VideoTimeInfo = VideoTimeInfo(success=False, error=None,
start_time=None, end_time=None)
# Define the regex pattern for the timestamp and file extension (either .mkv or .mp4)
# Define the regex pattern for the timestamp and file extension
# (either .mkv or .mp4)
pattern = (r'^(\d{4}\.\d{2}\.\d{2}\.\d{2}\.\d{2}\.\d{2}\.\d{3})'
r'_(\d{4}\.\d{2}\.\d{2}\.\d{2}\.\d{2}\.\d{2}\.\d{3})\.(mkv|mp4)$')

file_name: str = os.path.basename(path_video)
logger.debug(f"Video file name : {file_name}")
logger.info(f"Video file name : {file_name}")

match = re.match(pattern, file_name)
if not match:
Expand Down Expand Up @@ -89,10 +106,24 @@ def get_video_time_info(path_video: str) -> VideoTimeInfo:
return res


def finalize_record(record: QrRecord, iframe: int) -> QrRecord:
def finalize_record(vti: VideoTimeInfo,
record: QrRecord, iframe: int,
pos_sec: float) -> QrRecord:
record.frame_end = iframe
record.time_end = 'TODO'
# Note: unclear should we also use last frame duration or not
record.end_time = calc_time(vti.start_time, pos_sec)
record.end_pos_sec = pos_sec
logger.info(f"QR: {str(record)}")
# dump times
event_time = get_iso_time(record.data['time_formatted'])
keys_time = get_iso_time(record.data['keys_time_str'])
logger.info(f" - QR code time : {record.start_time}")
logger.info(f" - Event time : "
f"{event_time} / "
f"dt={(event_time - record.start_time).total_seconds()} sec")
logger.info(f" - Keys time : "
f"{keys_time} / "
f"dt={(keys_time - record.start_time).total_seconds()} sec")
return None


Expand All @@ -102,9 +133,9 @@ def do_parse(path_video: str) -> int:
logger.error(f"Failed parse file name time patter, error: {vti.error}")
return 1

logger.debug(f"Video start time : {vti.start_time}")
logger.debug(f"Video end time : {vti.end_time}")
logger.debug(f"Video duration : {vti.duration_sec} sec")
logger.info(f"Video start time : {vti.start_time}")
logger.info(f"Video end time : {vti.end_time}")
logger.info(f"Video duration : {vti.duration_sec} sec")

starttime = time.time()
cap = cv2.VideoCapture(path_video)
Expand Down Expand Up @@ -143,18 +174,21 @@ def do_parse(path_video: str) -> int:

# TODO: just use tqdm for progress indication
iframe: int = 0
pos_sec: float = 0.0
record: QrRecord = None

while True:
iframe += 1
# pos time in ms
pos_sec = round((iframe-1) / fps, 3)
ret, frame = cap.read()
if not ret:
break

f = np.mean(frame, axis=2) # poor man greyscale from RGB

if np.mod(iframe, 50) == 0:
logger.info(f"iframe={iframe} {np.std(f)}")
logger.debug(f"iframe={iframe} {np.std(f)}")

# if np.std(f) > 10:
# cv2.imwrite('grayscale_image.png', f)
Expand All @@ -171,19 +205,21 @@ def do_parse(path_video: str) -> int:
logger.debug(f"Same QR code: continue")
continue
# It is a different QR code! we need to finalize current one
record = finalize_record(record, iframe)
record = finalize_record(vti, record, iframe, pos_sec)
# We just got beginning of the QR code!
logger.debug("New QR code: " + str(data))
record = QrRecord(frame_start=iframe, frame_end=-1,
time_start="TODO-figureout",
time_end="",
data=data)
record = QrRecord()
record.frame_start = iframe
record.start_time = calc_time(vti.start_time, pos_sec)
record.end_time = ""
record.start_pos_sec = pos_sec
record.data = data
else:
if record:
record = finalize_record(record, iframe)
record = finalize_record(vti, record, iframe, pos_sec)

if record:
record = finalize_record(record, iframe)
record = finalize_record(vti, record, iframe, pos_sec)


@click.command(help='Utility to parse video and locate integrated '
Expand All @@ -199,7 +235,7 @@ def main(ctx, path: str, log_level):
logger.setLevel(log_level)
logger.debug("parse_wQR.py tool")
logger.debug(f"Working dir : {os.getcwd()}")
logger.debug(f"Video full path : {path}")
logger.info(f"Video full path : {path}")

if not os.path.exists(path):
logger.error(f"Path does not exist: {path}")
Expand Down

0 comments on commit cddb6de

Please sign in to comment.