From 1e0519a60a13e8342de6e4b00558316625ec9730 Mon Sep 17 00:00:00 2001
From: gabriel-milan The prediction in the following format:
+ Returns: The camera with image and classification in the following format:
+{
+"id_camera": "1",
+"url_camera": "rtsp://…",
+"latitude": -22.912,
+"longitude": -43.230,
+"image_base64": "base64…",
+"ai_classification": [
{
"object": "alagamento",
"label": True,
"confidence": 0.7,
+}
+],
} The snapshot in base64 format. The camera with image in the following format:
+{
+"id_camera": "1",
+"url_camera": "rtsp://…",
+"latitude": -22.912,
+"longitude": -43.230,
+"attempt_classification": True,
+"image_base64": "base64…",
+}Module
pipelines.rj_escritorio.flooding_detection.flows<
)
from pipelines.utils.decorators import Flow
-filter_results = FilterTask(
- filter_func=lambda x: not isinstance(x, (BaseException, type(None)))
-)
-
with Flow(
name="EMD: flooding_detection - Atualizar detecção de alagamento (IA) na API",
code_owners=[
@@ -115,7 +111,6 @@
Module
pipelines.rj_escritorio.flooding_detection.flows<
cameras_with_image = get_snapshot.map(
camera=cameras,
)
- cameras_with_image = filter_results(cameras_with_image)
cameras_with_image_and_classification = get_prediction.map(
camera_with_image=cameras_with_image,
flooding_prompt=unmapped(openai_flooding_detection_prompt),
@@ -124,9 +119,6 @@
Module
pipelines.rj_escritorio.flooding_detection.flows<
openai_api_max_tokens=unmapped(openai_api_max_tokens),
openai_api_url=unmapped(openai_api_url),
)
- cameras_with_image_and_classification = filter_results(
- cameras_with_image_and_classification
- )
update_flooding_api_data(
cameras_with_image_and_classification=cameras_with_image_and_classification,
data_key=redis_key_flooding_detection_data,
diff --git a/rj_escritorio/flooding_detection/schedules.html b/rj_escritorio/flooding_detection/schedules.html
index c468de3df..d00e26a0d 100644
--- a/rj_escritorio/flooding_detection/schedules.html
+++ b/rj_escritorio/flooding_detection/schedules.html
@@ -49,7 +49,7 @@
Module
pipelines.rj_escritorio.flooding_detection.schedu
constants.RJ_ESCRITORIO_AGENT_LABEL.value,
],
parameter_defaults={
- "cameras_geodf_url": "https://prefeitura-rio.github.io/storage/cameras_geo_min_bolsao.csv", # noqa
+ "cameras_geodf_url": "https://prefeitura-rio.github.io/storage/cameras_geo_min_bolsao_sample.csv", # noqa
"mocked_cameras_number": 0,
"openai_api_key_secret_path": "openai-api-key-flooding-detection",
"openai_api_max_tokens": 300,
diff --git a/rj_escritorio/flooding_detection/tasks.html b/rj_escritorio/flooding_detection/tasks.html
index a3832b4c5..16a2676ff 100644
--- a/rj_escritorio/flooding_detection/tasks.html
+++ b/rj_escritorio/flooding_detection/tasks.html
@@ -107,6 +107,7 @@
Module
pipelines.rj_escritorio.flooding_detection.tasks<
"latitude": -22.912,
"longitude": -43.230,
"image_base64": "base64...",
+ "attempt_classification": True,
}
flooding_prompt: The flooding prompt.
openai_api_key: The OpenAI API key.
@@ -114,17 +115,43 @@
Module
pipelines.rj_escritorio.flooding_detection.tasks<
openai_api_max_tokens: The OpenAI API max tokens.
openai_api_url: The OpenAI API URL.
- Returns:
- The prediction in the following format:
- {
- "object": "alagamento",
- "label": True,
- "confidence": 0.7,
- }
+ Returns: The camera with image and classification in the following format:
+ {
+ "id_camera": "1",
+ "url_camera": "rtsp://...",
+ "latitude": -22.912,
+ "longitude": -43.230,
+ "image_base64": "base64...",
+ "ai_classification": [
+ {
+ "object": "alagamento",
+ "label": True,
+ "confidence": 0.7,
+ }
+ ],
+ }
"""
# TODO:
# - Add confidence value
# Setup the request
+ if not camera_with_image["attempt_classification"]:
+ camera_with_image["ai_classification"] = [
+ {
+ "object": "alagamento",
+ "label": False,
+ "confidence": 0.7,
+ }
+ ]
+ return camera_with_image
+ if not camera_with_image["image_base64"]:
+ camera_with_image["ai_classification"] = [
+ {
+ "object": "alagamento",
+ "label": None,
+ "confidence": 0.7,
+ }
+ ]
+ return camera_with_image
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {openai_api_key}",
@@ -153,12 +180,14 @@
Module
pipelines.rj_escritorio.flooding_detection.tasks<
response = requests.post(openai_api_url, headers=headers, json=payload)
data: dict = response.json()
if data.get("error"):
- raise RuntimeError(f"Failed to get prediction: {data['error']}")
- content: str = data["choices"][0]["message"]["content"]
- json_string = content.replace("```json\n", "").replace("\n```", "")
- json_object = json.loads(json_string)
- flooding_detected = json_object["flooding_detected"]
- log(f"Successfully got prediction: {flooding_detected}")
+ flooding_detected = None
+ log(f"Failed to get prediction: {data['error']}")
+ else:
+ content: str = data["choices"][0]["message"]["content"]
+ json_string = content.replace("```json\n", "").replace("\n```", "")
+ json_object = json.loads(json_string)
+ flooding_detected = json_object["flooding_detected"]
+ log(f"Successfully got prediction: {flooding_detected}")
camera_with_image["ai_classification"] = [
{
"object": "alagamento",
@@ -186,23 +215,36 @@
Module
pipelines.rj_escritorio.flooding_detection.tasks<
"url_camera": "rtsp://...",
"latitude": -22.912,
"longitude": -43.230,
+ "attempt_classification": True,
}
Returns:
- The snapshot in base64 format.
+ The camera with image in the following format:
+ {
+ "id_camera": "1",
+ "url_camera": "rtsp://...",
+ "latitude": -22.912,
+ "longitude": -43.230,
+ "attempt_classification": True,
+ "image_base64": "base64...",
+ }
"""
- rtsp_url = camera["url_camera"]
- cap = cv2.VideoCapture(rtsp_url)
- ret, frame = cap.read()
- if not ret:
- raise RuntimeError(f"Failed to get snapshot from URL {rtsp_url}.")
- cap.release()
- img = Image.fromarray(frame)
- buffer = io.BytesIO()
- img.save(buffer, format="JPEG")
- img_b64 = base64.b64encode(buffer.getvalue()).decode("utf-8")
- log(f"Successfully got snapshot from URL {rtsp_url}.")
- camera["image_base64"] = img_b64
+ try:
+ rtsp_url = camera["url_camera"]
+ cap = cv2.VideoCapture(rtsp_url)
+ ret, frame = cap.read()
+ if not ret:
+ raise RuntimeError(f"Failed to get snapshot from URL {rtsp_url}.")
+ cap.release()
+ img = Image.fromarray(frame)
+ buffer = io.BytesIO()
+ img.save(buffer, format="JPEG")
+ img_b64 = base64.b64encode(buffer.getvalue()).decode("utf-8")
+ log(f"Successfully got snapshot from URL {rtsp_url}.")
+ camera["image_base64"] = img_b64
+ except Exception:
+ log(f"Failed to get snapshot from URL {rtsp_url}.")
+ camera["image_base64"] = None
return camera
@@ -230,6 +272,7 @@
Module
pipelines.rj_escritorio.flooding_detection.tasks<
"url_camera": "rtsp://...",
"latitude": -22.912,
"longitude": -43.230,
+ "attempt_classification": True,
},
...
]
@@ -266,8 +309,10 @@
Module
pipelines.rj_escritorio.flooding_detection.tasks<
most_common_prediction = max(
set(predictions_buffer), key=predictions_buffer.count
)
+ # Get last prediction
+ last_prediction = predictions_buffer[-1]
# Add classifications
- if most_common_prediction or predictions_buffer[-1]:
+ if most_common_prediction or last_prediction:
row["status"] = "chuva moderada"
# Mock a few cameras when argument is set
@@ -278,12 +323,6 @@
Module
pipelines.rj_escritorio.flooding_detection.tasks<
df_cameras_h3.loc[mocked_index, "status"] = "chuva moderada"
log(f'Mocked camera ID: {df_cameras_h3.loc[mocked_index]["id_camera"]}')
- # Pick cameras
- mask = np.logical_not(df_cameras_h3["status"].isin(["sem chuva", "chuva fraca"]))
- df_cameras_h3 = df_cameras_h3[mask]
- log("Successfully picked cameras.")
- log(f"Picked cameras shape: {df_cameras_h3.shape}")
-
# Set output
output = []
for _, row in df_cameras_h3.iterrows():
@@ -294,6 +333,9 @@
Module
pipelines.rj_escritorio.flooding_detection.tasks<
"url_camera": row["rtsp"],
"latitude": row["geometry"].y,
"longitude": row["geometry"].x,
+ "attempt_classification": (
+ row["status"] not in ["sem chuva", "chuva fraca"]
+ ),
}
)
log(f"Picked cameras: {output}")
@@ -343,6 +385,21 @@
Module
pipelines.rj_escritorio.flooding_detection.tasks<
current_prediction = camera_with_image_and_classification["ai_classification"][
0
]["label"]
+ if current_prediction is None:
+ api_data.append(
+ {
+ "datetime": last_update.to_datetime_string(),
+ "id_camera": camera_with_image_and_classification["id_camera"],
+ "url_camera": camera_with_image_and_classification["url_camera"],
+ "latitude": camera_with_image_and_classification["latitude"],
+ "longitude": camera_with_image_and_classification["longitude"],
+ "image_base64": camera_with_image_and_classification[
+ "image_base64"
+ ],
+ "ai_classification": ai_classification,
+ }
+ )
+ continue
predictions_buffer_camera_key = f"{predictions_buffer_key}_{camera_with_image_and_classification['id_camera']}" # noqa
predictions_buffer = redis_add_to_prediction_buffer(
predictions_buffer_camera_key, current_prediction
@@ -352,27 +409,24 @@
Module
pipelines.rj_escritorio.flooding_detection.tasks<
set(predictions_buffer), key=predictions_buffer.count
)
# Add classifications
- if most_common_prediction:
- ai_classification.append(
- {
- "object": "alagamento",
- "label": True,
- "confidence": 0.7,
- }
- )
- api_data.append(
- {
- "datetime": last_update.to_datetime_string(),
- "id_camera": cameras_with_image_and_classification["id_camera"],
- "url_camera": cameras_with_image_and_classification["url_camera"],
- "latitude": cameras_with_image_and_classification["latitude"],
- "longitude": cameras_with_image_and_classification["longitude"],
- "image_base64": cameras_with_image_and_classification[
- "image_base64"
- ],
- "ai_classification": ai_classification,
- }
- )
+ ai_classification.append(
+ {
+ "object": "alagamento",
+ "label": most_common_prediction,
+ "confidence": 0.7,
+ }
+ )
+ api_data.append(
+ {
+ "datetime": last_update.to_datetime_string(),
+ "id_camera": camera_with_image_and_classification["id_camera"],
+ "url_camera": camera_with_image_and_classification["url_camera"],
+ "latitude": camera_with_image_and_classification["latitude"],
+ "longitude": camera_with_image_and_classification["longitude"],
+ "image_base64": camera_with_image_and_classification["image_base64"],
+ "ai_classification": ai_classification,
+ }
+ )
# Update API data
redis_client = get_redis_client(db=1)
@@ -469,6 +523,7 @@
@@ -655,6 +769,7 @@ Args
"latitude": -22.912,
"longitude": -43.230,
"image_base64": "base64…",
+"attempt_classification": True,
}
flooding_prompt
Args
openai_api_url
Returns
-
@@ -512,6 +575,7 @@
Returns
"latitude": -22.912,
"longitude": -43.230,
"image_base64": "base64...",
+ "attempt_classification": True,
}
flooding_prompt: The flooding prompt.
openai_api_key: The OpenAI API key.
@@ -519,17 +583,43 @@ Returns
openai_api_max_tokens: The OpenAI API max tokens.
openai_api_url: The OpenAI API URL.
- Returns:
- The prediction in the following format:
- {
- "object": "alagamento",
- "label": True,
- "confidence": 0.7,
- }
+ Returns: The camera with image and classification in the following format:
+ {
+ "id_camera": "1",
+ "url_camera": "rtsp://...",
+ "latitude": -22.912,
+ "longitude": -43.230,
+ "image_base64": "base64...",
+ "ai_classification": [
+ {
+ "object": "alagamento",
+ "label": True,
+ "confidence": 0.7,
+ }
+ ],
+ }
"""
# TODO:
# - Add confidence value
# Setup the request
+ if not camera_with_image["attempt_classification"]:
+ camera_with_image["ai_classification"] = [
+ {
+ "object": "alagamento",
+ "label": False,
+ "confidence": 0.7,
+ }
+ ]
+ return camera_with_image
+ if not camera_with_image["image_base64"]:
+ camera_with_image["ai_classification"] = [
+ {
+ "object": "alagamento",
+ "label": None,
+ "confidence": 0.7,
+ }
+ ]
+ return camera_with_image
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {openai_api_key}",
@@ -558,12 +648,14 @@ Returns
response = requests.post(openai_api_url, headers=headers, json=payload)
data: dict = response.json()
if data.get("error"):
- raise RuntimeError(f"Failed to get prediction: {data['error']}")
- content: str = data["choices"][0]["message"]["content"]
- json_string = content.replace("```json\n", "").replace("\n```", "")
- json_object = json.loads(json_string)
- flooding_detected = json_object["flooding_detected"]
- log(f"Successfully got prediction: {flooding_detected}")
+ flooding_detected = None
+ log(f"Failed to get prediction: {data['error']}")
+ else:
+ content: str = data["choices"][0]["message"]["content"]
+ json_string = content.replace("```json\n", "").replace("\n```", "")
+ json_object = json.loads(json_string)
+ flooding_detected = json_object["flooding_detected"]
+ log(f"Successfully got prediction: {flooding_detected}")
camera_with_image["ai_classification"] = [
{
"object": "alagamento",
@@ -588,10 +680,19 @@ Args
"url_camera": "rtsp://…",
"latitude": -22.912,
"longitude": -43.230,
+"attempt_classification": True,
}
Returns
-
Expand source code
@@ -613,23 +714,36 @@
Returns
"url_camera": "rtsp://...",
"latitude": -22.912,
"longitude": -43.230,
+ "attempt_classification": True,
}
Returns:
- The snapshot in base64 format.
+ The camera with image in the following format:
+ {
+ "id_camera": "1",
+ "url_camera": "rtsp://...",
+ "latitude": -22.912,
+ "longitude": -43.230,
+ "attempt_classification": True,
+ "image_base64": "base64...",
+ }
"""
- rtsp_url = camera["url_camera"]
- cap = cv2.VideoCapture(rtsp_url)
- ret, frame = cap.read()
- if not ret:
- raise RuntimeError(f"Failed to get snapshot from URL {rtsp_url}.")
- cap.release()
- img = Image.fromarray(frame)
- buffer = io.BytesIO()
- img.save(buffer, format="JPEG")
- img_b64 = base64.b64encode(buffer.getvalue()).decode("utf-8")
- log(f"Successfully got snapshot from URL {rtsp_url}.")
- camera["image_base64"] = img_b64
+ try:
+ rtsp_url = camera["url_camera"]
+ cap = cv2.VideoCapture(rtsp_url)
+ ret, frame = cap.read()
+ if not ret:
+ raise RuntimeError(f"Failed to get snapshot from URL {rtsp_url}.")
+ cap.release()
+ img = Image.fromarray(frame)
+ buffer = io.BytesIO()
+ img.save(buffer, format="JPEG")
+ img_b64 = base64.b64encode(buffer.getvalue()).decode("utf-8")
+ log(f"Successfully got snapshot from URL {rtsp_url}.")
+ camera["image_base64"] = img_b64
+ except Exception:
+ log(f"Failed to get snapshot from URL {rtsp_url}.")
+ camera["image_base64"] = None
return cameraReturns
"url_camera": "rtsp://…",
"latitude": -22.912,
"longitude": -43.230,
+"attempt_classification": True,
},
…
]