
Commit 4ba6453
fix tf record's benchmark to read the data
KeplerC committed Aug 26, 2024
1 parent e83e6da commit 4ba6453
Showing 2 changed files with 36 additions and 30 deletions.
.gitignore (3 changes: 2 additions & 1 deletion)
@@ -135,4 +135,5 @@ dmypy.json
 temp.gif
 
 *.vla
-*.mkv
+*.mkv
+*.csv
benchmarks/openx.py (63 changes: 34 additions & 29 deletions)
@@ -158,7 +158,17 @@ def measure_loading_time_per_trajectory(self):
         for data in loader:
             start_time = time.time()
             l = list(data)
-            print("length of loaded data", len(l))
+            for i in l:
+                # recursively load all data
+                def _recursively_load_data(data):
+                    for key in data.keys():
+                        if isinstance(data[key], dict):
+                            _recursively_load_data(data[key])
+                        else:
+                            (key, np.array(data[key]))
+                            (key, np.array(data[key]).shape)
+                _recursively_load_data(i)
+            # print("length of loaded data", len(l))
             end_time = time.time()
             loading_time = end_time - start_time
             times.append(loading_time)
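This hunk is the point of the commit: the old benchmark only built a list of records and printed its length, so TensorFlow never actually decoded the TFRecord contents. The new inner walk visits every leaf of each nested example dict and calls np.array() on it, which forces the read; the bare tuples (key, np.array(data[key])) are built and immediately discarded, so the conversion is their only effect. A minimal standalone sketch of the same materialization pattern (materialize is a hypothetical name, not from the repo):

import numpy as np

def materialize(tree):
    # Recursively convert every leaf of a nested dict to a NumPy array.
    # The conversion is what triggers the actual decode/read, which is
    # exactly the cost the benchmark wants inside its timed region.
    return {
        key: materialize(value) if isinstance(value, dict) else np.asarray(value)
        for key, value in tree.items()
    }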
@@ -174,44 +184,21 @@ def __init__(self, exp_dir, dataset_name, num_trajectories):
         super().__init__(exp_dir, dataset_name, num_trajectories, dataset_type="vla")
         self.trajectories_objects = []
 
-    def measure_loading_time(self, is_add_to_trajectories=False):
-        """Measures the time taken to load data into memory using VLALoader."""
-
-        def _recursively_load_h5_data(data):
-            for key in data.keys():
-                if isinstance(data[key], dict):
-                    _recursively_load_h5_data(data[key])
-                else:
-                    (key, np.array(data[key]))
-                    (key, np.array(data[key]).shape)
-
-        start_time = time.time()
-        loader = VLALoader(self.dataset_dir, cache_dir=CACHE_DIR)
-        for data in loader:
-            _recursively_load_h5_data(data.load())
-            if is_add_to_trajectories:
-                self.trajectories_objects.append(data)
-
-        end_time = time.time()
-        loading_time = end_time - start_time
-        print(
-            f"Loaded {len(loader)} trajectories in {loading_time:.2f} seconds start time {start_time} end time {end_time}"
-        )
-        return loading_time, len(loader)
-
     def convert_data_to_vla_format(self, loader):
         """Converts data to VLA format and saves it to the same directory."""
         for index, data_traj in enumerate(loader):
             output_path = os.path.join(self.dataset_dir, f"output_{index}.vla")
             fog_x.Trajectory.from_list_of_dicts(data_traj, path=output_path)
 
-    def measure_loading_time_per_trajectory(self):
+    def measure_loading_time_per_trajectory(self, save_trajectorie_objects=False):
         """Measures the time taken to load each trajectory separately using VLALoader."""
         times = []
         loader = VLALoader(self.dataset_dir, cache_dir=CACHE_DIR)
         for data in loader:
             start_time = time.time()
             self._recursively_load_h5_data(data.load())
+            if save_trajectorie_objects:
+                self.trajectories_objects.append(data)
             end_time = time.time()
             loading_time = end_time - start_time
             times.append(loading_time)
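The per-trajectory VLA benchmark keeps the same bracketed-timing shape: start the clock, force the trajectory's data to load, stop the clock, and optionally retain the trajectory object for the HDF5 conversion later in the script. One caveat worth knowing: time.time() is wall-clock time and can jump if the system clock adjusts, while time.perf_counter() is the usual monotonic choice for short benchmark intervals. A sketch of the pattern under that assumption (time_per_item is a hypothetical helper, not the repo's API):

import time

def time_per_item(loader, load_fn):
    # Bracket only the load itself, recording one measurement per item,
    # using a monotonic high-resolution clock.
    times = []
    for item in loader:
        start = time.perf_counter()
        load_fn(item)
        times.append(time.perf_counter() - start)
    return times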
@@ -378,6 +365,7 @@ def evaluation():
     # Process RLDS data
     rlds_handler = RLDSHandler(args.exp_dir, dataset_name, args.num_trajectories)
     rlds_sizes = rlds_handler.measure_file_size_per_trajectory()
+    rlds_handler.clear_os_cache()
     rlds_loading_times = rlds_handler.measure_loading_time_per_trajectory()
 
     for i, (size, time) in enumerate(zip(rlds_sizes, rlds_loading_times)):
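clear_os_cache() is now called between measuring file sizes and timing the loads, so the cold-cache numbers are not inflated by pages the size scan just pulled into the OS page cache. Its implementation is not part of this diff; on Linux, such a helper typically looks like the sketch below (an assumed implementation, and it requires root):

import subprocess

def clear_os_cache():
    # Flush dirty pages to disk, then drop the page cache, dentries,
    # and inodes so the next read goes to the storage device.
    subprocess.run(["sync"], check=True)
    with open("/proc/sys/vm/drop_caches", "w") as f:
        f.write("3\n")  # 3 = page cache + dentries + inodes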
@@ -393,22 +381,39 @@ def evaluation():
     # Process VLA data
     vla_handler = VLAHandler(args.exp_dir, dataset_name, args.num_trajectories)
     vla_sizes = vla_handler.measure_file_size_per_trajectory()
-    vla_loading_times = vla_handler.measure_loading_time_per_trajectory()
+    vla_handler.clear_os_cache()
+    vla_loading_times = vla_handler.measure_loading_time_per_trajectory(save_trajectorie_objects=True)
 
+    for i, (size, time) in enumerate(zip(vla_sizes, vla_loading_times)):
+        results.append({
+            'Dataset': dataset_name,
+            'Format': 'VLA-ColdCache',
+            'Trajectory': i,
+            'LoadingTime(s)': time,
+            'FileSize(MB)': size / (1024 * 1024),
+            'Throughput(traj/s)': 1 / time if time > 0 else 0
+        })
+
+    vla_handler.clear_os_cache()
+    # hot cache test
+    vla_loading_times = vla_handler.measure_loading_time_per_trajectory(save_trajectorie_objects=False)
+
     for i, (size, time) in enumerate(zip(vla_sizes, vla_loading_times)):
         results.append({
             'Dataset': dataset_name,
-            'Format': 'VLA',
+            'Format': 'VLA-HotCache',
             'Trajectory': i,
             'LoadingTime(s)': time,
             'FileSize(MB)': size / (1024 * 1024),
             'Throughput(traj/s)': 1 / time if time > 0 else 0
         })
 
+
     # Convert VLA to HDF5 and benchmark
     hdf5_handler = HDF5Handler(args.exp_dir, dataset_name, args.num_trajectories)
     hdf5_handler.convert_data_to_hdf5(vla_handler.trajectories_objects)
     hdf5_sizes = hdf5_handler.measure_file_size_per_trajectory()
+    hdf5_handler.clear_os_cache()
     hdf5_loading_times = hdf5_handler.measure_loading_time_per_trajectory()
 
     for i, (size, time) in enumerate(zip(hdf5_sizes, hdf5_loading_times)):
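Two details in this hunk are easy to miss. First, the cold-cache VLA pass runs with save_trajectorie_objects=True so the loaded trajectory objects survive to feed convert_data_to_hdf5 below; the hot-cache pass clears the OS page cache again before running, presumably so its speedup reflects the loader's own cache in CACHE_DIR rather than page-cache hits. Second, the loop variable time in for i, (size, time) in enumerate(...) shadows the time module inside evaluation(), which is harmless here but worth renaming. convert_data_to_hdf5 itself is outside this diff; mirroring a nested dict of arrays into an HDF5 file with h5py commonly looks like this sketch (write_nested is a hypothetical helper, assuming each trajectory loads to a nested dict of array-like leaves):

import h5py
import numpy as np

def write_nested(group, tree):
    # Mirror a nested dict into an HDF5 group/dataset hierarchy.
    for key, value in tree.items():
        if isinstance(value, dict):
            write_nested(group.create_group(key), value)
        else:
            group.create_dataset(key, data=np.asarray(value))

# usage sketch:
# with h5py.File("output_0.h5", "w") as f:
#     write_nested(f, trajectory_dict)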
