tracker.py
import cv2
import ast
class Tracking:
    # Default constants
DEFAULT_TRACK_RATE = 30
DEFAULT_MIN_SIZE = 750
DEFAULT_IOU_THRESHOLD = 0.87
def __init__(self, track_rate=DEFAULT_TRACK_RATE, ignore_path=None, min_size=DEFAULT_MIN_SIZE, iou_threshold=DEFAULT_IOU_THRESHOLD):
"""
        Initializes a Tracking object for tracking objects and detecting stationary ones.
        Parameters:
        track_rate (int, optional): Number of frames between tracking updates. Default is 30.
        ignore_path (str, optional): Path to a file listing regions to ignore during tracking.
        min_size (int, optional): Minimum contour area for a detection to be kept. Default is 750.
        iou_threshold (float, optional): Intersection over Union threshold for treating two boxes as overlapping. Default is 0.87.
"""
self.TRACK_RATE = track_rate
self.MIN_SIZE_THRESHOLD = min_size
self.IOU_THRESHOLD = iou_threshold
self.ignores = self.get_ignore_list(ignore_path) if ignore_path else None
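        # Each entry maps an object ID to [is_active, start_frame, end_frame, [x1, y1, x2, y2]],
        # as unpacked in set_label() and update_existing_stationary_objects().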
self.stationary_objects = {}
def get_contours(self, image):
"""
        Finds contours in a binary image and sorts them from largest to smallest area.
Parameters:
image (numpy.ndarray): The input binary image.
Returns:
list: A list of contours found in the image sorted from largest to smallest area.
"""
        # cv2.findContours returns (contours, hierarchy) in OpenCV 4.x but
        # (image, contours, hierarchy) in OpenCV 3.x; handle both layouts.
        contours = cv2.findContours(image, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        contours = contours[0] if len(contours) == 2 else contours[1]
        contours_sorted = sorted(contours, key=cv2.contourArea, reverse=True)
return contours_sorted
def get_ignore_list(self, directory):
"""
        Reads a file of ignored regions and returns them as a list.
        Each line is expected to hold a Python literal describing a bounding box, e.g. [x1, y1, x2, y2].
        Parameters:
        directory (str): Path to the file containing the ignored regions, one per line.
        Returns:
        list: A list of parsed regions.
"""
locs = []
with open(directory, "r") as file:
lines = file.readlines()
for line in lines:
locs.append(ast.literal_eval(line.replace("\n", "")))
return locs
def get_stationary_objects(self):
"""
Get the dictionary of stationary objects.
Returns:
dict: The dictionary containing stationary objects.
"""
return self.stationary_objects
def set_label(self, frame, fps, thickness=2, color=(51, 153, 255), font_size=0.7):
"""
        Annotates each active stationary object's bounding box with its ID and how long it has been stationary.
        Args:
        frame (numpy.ndarray): The input image or frame to annotate.
        fps (float): Frames per second of the video, used to convert frame counts into seconds.
        thickness (int, optional): The thickness of the text. Default is 2.
        color (tuple, optional): The color of the text in BGR format. Default is orange (51, 153, 255).
        font_size (float, optional): The font size of the text. Default is 0.7.
        Returns:
        numpy.ndarray: The frame with ID and duration text drawn for each active stationary object.
        Note:
        This function uses the OpenCV library to draw text on the input frame.
"""
for key, value in self.stationary_objects.items():
            active, start_frame, end_frame, position = value
if active:
duration = (end_frame - start_frame) // fps
x1, y1, x2, y2 = position
# Draw the object identifier (key)
frame = cv2.putText(
frame,
f'ID={key}',
org=(x1 + 2, y2 - 4),
fontFace=cv2.FONT_HERSHEY_SIMPLEX,
fontScale=font_size,
color=color,
thickness=thickness,
lineType=cv2.LINE_AA
)
# Draw the time duration in minutes and seconds
frame = cv2.putText(
frame,
f'{int(duration // 60):d}m {int(duration % 60):d}s',
org=(x1 + 2, y1 - 4),
fontFace=cv2.FONT_HERSHEY_SIMPLEX,
fontScale=font_size,
color=color,
thickness=thickness,
lineType=cv2.LINE_AA
)
return frame
def is_overlapping(self, new_box, existing_boxes, iou_threshold=None, epsilon=1e-5):
"""
Checks if a new bounding box overlaps with any of the existing bounding boxes.
Parameters:
new_box (tuple): Bounding box coordinates (x1, y1, x2, y2) of the new object.
existing_boxes (list): List of existing bounding boxes.
        iou_threshold (float, optional): IoU threshold for considering overlap. Defaults to self.IOU_THRESHOLD.
        epsilon (float, optional): Small value to prevent division by zero. Default is 1e-5.
Returns:
bool: True if the new box overlaps with any existing box, False otherwise.
"""
        # Determine the IoU threshold to use
iou_thres = self.IOU_THRESHOLD if iou_threshold is None else iou_threshold
# Case where nothing to compare to
if not existing_boxes:
return False
# Comparing
for existing_box in existing_boxes:
# Coordinates of the intersection box
x1 = max(new_box[0], existing_box[0])
y1 = max(new_box[1], existing_box[1])
x2 = min(new_box[2], existing_box[2])
y2 = min(new_box[3], existing_box[3])
# AREA OF OVERLAP - Area where the boxes intersect
width = x2 - x1
height = y2 - y1
# Handle case where there is NO overlap
if (width < 0) or (height < 0):
continue
# COMBINED AREA
area_overlap = width * height
area_a = (new_box[2] - new_box[0]) * (new_box[3] - new_box[1])
area_b = (existing_box[2] - existing_box[0]) * (existing_box[3] - existing_box[1])
area_combined = area_a + area_b - area_overlap
# RATIO OF AREA OF OVERLAP OVER COMBINED AREA
iou = area_overlap / (area_combined + epsilon)
# Determine if it is overlapping
if iou > iou_thres:
return True
return False
def filter_ignored_objects(self, detected_objects):
"""
Filters out detected objects that overlap with ignored regions.
Parameters:
detected_objects (list): List of detected object bounding boxes.
Returns:
list: List of filtered detected object bounding boxes.
"""
        filtered_objects = []
        for obj_box in detected_objects:
            # Use a low IoU threshold (0.2) so even a partial overlap with an
            # ignored region is enough to discard the detection.
            if not self.is_overlapping(obj_box, self.ignores, iou_threshold=0.2):
                filtered_objects.append(obj_box)
        return filtered_objects
def reset_stationary_status(self):
"""
Reset the stationary status of objects in the stationary_objects dictionary to False.
"""
        for value in self.stationary_objects.values():
            value[0] = False
def find_objects(self, frame, difference_mask):
"""
        Detects new objects in a frame and draws rectangles around them.
        Parameters:
        frame (numpy.ndarray): The input image.
        difference_mask (numpy.ndarray): Binary mask of the differences between the current frame and the reference image.
        Returns:
        tuple: The frame with rectangles drawn around the detected objects, and the list of their bounding boxes.
"""
detected_objects = []
tracked_frame = frame.copy()
contours = self.get_contours(difference_mask)
        # Keep contours above the size threshold whose boxes do not overlap an already accepted detection
for contour in contours:
area = cv2.contourArea(contour)
if area > self.MIN_SIZE_THRESHOLD:
x, y, w, h = cv2.boundingRect(contour)
object_box = [x, y, x + w, y + h]
if not self.is_overlapping(object_box, detected_objects, iou_threshold=0.02):
detected_objects.append(object_box)
filtered_objects = self.filter_ignored_objects(detected_objects)
for box in filtered_objects:
x1, y1, x2, y2 = box
cv2.rectangle(tracked_frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
return tracked_frame, filtered_objects
def find_potential_stationary(self, prev_objects, new_objects):
"""
        Finds candidate stationary objects by keeping new objects that overlap a previous object.
        If there are no previous objects, or no overlaps are found, all new objects are returned as candidates.
        Parameters:
        prev_objects (list): Bounding boxes detected at the previous tracking step.
        new_objects (list): Bounding boxes detected at the current tracking step.
        Returns:
        list: Candidate stationary object bounding boxes.
"""
stationary_objects = []
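        # With no history to compare against, every new object is treated as a candidate.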
if len(prev_objects) == 0:
return new_objects
else:
for obj in new_objects:
if self.is_overlapping(obj, prev_objects):
stationary_objects.append(obj)
if len(stationary_objects) == 0:
return new_objects
else:
return stationary_objects
def update_existing_stationary_objects(self, stationary_potentials, frame_count):
"""
        Updates the status of existing stationary objects that are still present.
        Matched positions are removed from stationary_potentials in place, so the remaining
        list contains only candidates that have not yet been registered.
        Parameters:
        stationary_potentials (list): Positions of potentially stationary objects.
        frame_count (int): The current frame count.
"""
for stationary_potential in stationary_potentials.copy():
for key, value in self.stationary_objects.items():
prev_stationary = value[3]
if self.is_overlapping(prev_stationary, [stationary_potential]):
value[0] = True
value[2] = frame_count
value[3] = stationary_potential
stationary_potentials.remove(stationary_potential)
break
def add_new_stationary_object(self, stationary_potentials, frame_count, frame, out_path):
"""
Adds new stationary objects to the dictionary.
Parameters:
stationary_potentials (list): List of positions of potentially stationary objects.
frame_count (int): The current frame count.
frame (numpy.ndarray): The current frame.
        out_path (str): The directory where each object's snapshot will be saved.
"""
for position in stationary_potentials:
id = len(self.stationary_objects)
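            # Back-dating the start by 3 * TRACK_RATE approximates when the object first stopped
            # moving, presumably because a candidate persists across several tracking intervals
            # before reaching this point.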
self.stationary_objects[id] = [True, frame_count - 3 * self.TRACK_RATE, frame_count, position]
self.save_stationary_snapshot(frame, position, id, out_path)
def save_stationary_snapshot(self, frame, position, id, out_path):
"""
Saves a cropped image of a stationary object.
Parameters:
frame (numpy.ndarray): The current frame.
position (tuple): Bounding box coordinates (x1, y1, x2, y2) of the stationary object.
id (int): Index of the stationary object.
        out_path (str): The directory where the object's snapshot will be saved.
"""
x1, y1, x2, y2 = position
cropped_image = frame[y1:y2, x1:x2]
image_path = out_path + f"/id_{id}.jpg"
cv2.imwrite(image_path, cropped_image)
def update_stationary_objects(self, frame_count, prev_temp_stationary, temp_stationary, frame, out_path):
"""
        Updates the dictionary of stationary objects based on the candidates from consecutive tracking steps.
        Parameters:
        frame_count (int): The current frame count.
        prev_temp_stationary (list): Candidate stationary positions from the previous tracking step.
        temp_stationary (list): Candidate stationary positions from the current tracking step.
        frame (numpy.ndarray): The current frame.
        out_path (str): The directory where each object's snapshot will be saved.
        Returns:
        dict: The updated dictionary of stationary objects.
"""
stationary_potentials = []
self.reset_stationary_status()
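        # Keep only candidates that also overlapped a candidate from the previous tracking step.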
for new_object in temp_stationary:
if self.is_overlapping(new_object, prev_temp_stationary):
stationary_potentials.append(new_object)
self.update_existing_stationary_objects(stationary_potentials, frame_count)
self.add_new_stationary_object(stationary_potentials, frame_count, frame, out_path)
return self.stationary_objects
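

# --- Example usage (illustrative sketch, not part of the original module) ---
# Shows one way this class might be driven: an OpenCV MOG2 background subtractor
# supplies the difference mask, detection and the stationary checks run every
# TRACK_RATE frames, and set_label() annotates the output. The video path,
# snapshot directory, and subtractor settings below are placeholder assumptions.
if __name__ == "__main__":
    tracker = Tracking(track_rate=30)
    cap = cv2.VideoCapture("input.mp4")  # hypothetical input video
    fps = cap.get(cv2.CAP_PROP_FPS) or 30
    back_sub = cv2.createBackgroundSubtractorMOG2(detectShadows=False)
    prev_objects, prev_temp_stationary = [], []
    frame_count = 0
    while True:
        ok, frame = cap.read()
        if not ok:
            break
        frame_count += 1
        mask = back_sub.apply(frame)
        if frame_count % tracker.TRACK_RATE == 0:
            # Detect objects in the current mask and compare them with the previous step.
            tracked_frame, objects = tracker.find_objects(frame, mask)
            temp_stationary = tracker.find_potential_stationary(prev_objects, objects)
            tracker.update_stationary_objects(
                frame_count, prev_temp_stationary, temp_stationary, frame,
                "snapshots",  # hypothetical output directory; must already exist
            )
            prev_objects, prev_temp_stationary = objects, temp_stationary
        labeled = tracker.set_label(frame, fps)
        cv2.imshow("tracking", labeled)
        if cv2.waitKey(1) & 0xFF == ord("q"):
            break
    cap.release()
    cv2.destroyAllWindows()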