Commit
Refactor RLDSLoader class to improve code readability and add lazy loading for data
KeplerC committed Aug 26, 2024
1 parent 975c4e5 commit e83e6da
Showing 2 changed files with 132 additions and 72 deletions.
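At the core of the change, RLDSLoader now keeps a persistent iterator over the TFDS dataset instead of re-materializing the dataset on every __next__ call. A minimal standalone sketch of the pattern (illustrative only; the actual class is in the fog_x/loader/rlds.py diff below):

class LazyLoader:
    def __init__(self, ds):
        self.ds = ds                       # any iterable dataset
        self.iterator = iter(self.ds)      # created once, reused across calls

    def __iter__(self):
        return self

    def __next__(self):
        try:
            return next(self.iterator)     # pull one item on demand
        except StopIteration:
            self.iterator = iter(self.ds)  # reset so the loader is reusable
            raise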
176 changes: 122 additions & 54 deletions benchmarks/openx.py
@@ -8,13 +8,13 @@
import numpy as np
from fog_x.loader import RLDSLoader, VLALoader, HDF5Loader
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"  # must be set before importing tensorflow to silence its C++ logs

import tensorflow as tf
import pandas as pd

# Constants
DEFAULT_EXP_DIR = "/tmp/fog_x"
DEFAULT_NUMBER_OF_TRAJECTORIES = 1
DEFAULT_DATASET_NAMES = ["berkeley_autolab_ur5"]
DEFAULT_EXP_DIR = "/home/kych/datasets/fog_x/"
DEFAULT_NUMBER_OF_TRAJECTORIES = 64
DEFAULT_DATASET_NAMES = ["berkeley_autolab_ur5", "bridge", "berkeley_cable_routing", "nyu_door_opening_surprising_effectiveness"]
CACHE_DIR = "/tmp/fog_x/cache/"


@@ -121,6 +121,15 @@ def measure_file_size(self):
        )
        return total_size

    def measure_file_size_per_trajectory(self):
        """Calculates the size of each trajectory file in the dataset directory."""
        trajectory_sizes = []
        for dirpath, dirnames, filenames in os.walk(self.dataset_dir):
            for f in filenames:
                file_path = os.path.join(dirpath, f)
                file_size = os.path.getsize(file_path)
                trajectory_sizes.append(file_size)
        return trajectory_sizes
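Sizes are returned in bytes, one entry per file under the dataset directory, so the per-trajectory pairing in evaluation() below assumes one file per trajectory. A quick hedged usage sketch (placeholder paths; the handler is constructed the same way as in evaluation()):

handler = RLDSHandler("/tmp/fog_x", "berkeley_autolab_ur5", 1)
for i, size in enumerate(handler.measure_file_size_per_trajectory()):
    print(f"trajectory {i}: {size / (1024 * 1024):.2f} MB")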

class RLDSHandler(DatasetHandler):
    """Handles RLDS dataset operations, including loading and measuring loading times."""
@@ -142,6 +151,21 @@ def measure_loading_time(self):
        )
        return loading_time, len(loader)

    def measure_loading_time_per_trajectory(self):
        """Measures the time taken to load each trajectory separately."""
        times = []
        loader = RLDSLoader(self.dataset_dir, split=f"train[:{self.num_trajectories}]")
        start_time = time.time()
        for data in loader:
            # Iterating the loader is what materializes a trajectory, so the
            # timer wraps the fetch and restarts after each one is consumed.
            steps = list(data)
            end_time = time.time()
            loading_time = end_time - start_time
            times.append(loading_time)
            print(
                f"Loaded 1 trajectory ({len(steps)} steps) in {loading_time:.2f} seconds"
            )
            start_time = time.time()
        return times

class VLAHandler(DatasetHandler):
    """Handles VLA dataset operations, including loading, converting, and measuring loading times."""
@@ -181,11 +205,33 @@ def convert_data_to_vla_format(self, loader):
            output_path = os.path.join(self.dataset_dir, f"output_{index}.vla")
            fog_x.Trajectory.from_list_of_dicts(data_traj, path=output_path)

class HDF5Handler:
    def measure_loading_time_per_trajectory(self):
        """Measures the time taken to load each trajectory separately using VLALoader."""
        times = []
        loader = VLALoader(self.dataset_dir, cache_dir=CACHE_DIR)
        for data in loader:
            start_time = time.time()
            self._recursively_load_h5_data(data.load())
            end_time = time.time()
            loading_time = end_time - start_time
            times.append(loading_time)
            print(f"Loaded 1 trajectory in {loading_time:.2f} seconds")
        return times

    def _recursively_load_h5_data(self, data):
        # Touch every leaf so the data is actually decoded; the resulting
        # arrays are discarded, since only the load time matters here.
        for key in data.keys():
            if isinstance(data[key], dict):
                self._recursively_load_h5_data(data[key])
            else:
                _ = np.array(data[key]).shape

class HDF5Handler(DatasetHandler):
    """Handles HDF5 dataset operations, including conversion and measuring file sizes."""

    def __init__(self, exp_dir, dataset_name):
    def __init__(self, exp_dir, dataset_name, num_trajectories):
        super().__init__(exp_dir, dataset_name, num_trajectories, dataset_type="hdf5")
        self.hdf5_dir = os.path.join(exp_dir, "hdf5", dataset_name)
        if not os.path.exists(self.hdf5_dir):
            os.makedirs(self.hdf5_dir)
@@ -232,6 +278,29 @@ def _recursively_load_h5_data(data):
        return loading_time, count


    def measure_loading_time_per_trajectory(self):
        """Measures the time taken to load each trajectory separately using HDF5Loader."""
        times = []
        loader = HDF5Loader(path=os.path.join(self.hdf5_dir, "*.h5"))
        for data in loader:
            start_time = time.time()
            self._recursively_load_h5_data(data)
            end_time = time.time()
            loading_time = end_time - start_time
            times.append(loading_time)
            print(f"Loaded 1 trajectory in {loading_time:.2f} seconds")
        return times

    def _recursively_load_h5_data(self, data):
        # Same leaf-touching walk as in VLAHandler: materialize each array,
        # then discard it.
        for key in data.keys():
            if isinstance(data[key], dict):
                self._recursively_load_h5_data(data[key])
            else:
                _ = np.array(data[key]).shape

def prepare():
    # Parse command-line arguments
    parser = argparse.ArgumentParser(
@@ -296,6 +365,8 @@ def evaluation():
    )
    args = parser.parse_args()

    results = []

    for dataset_name in args.dataset_names:
        print(f"Processing dataset: {dataset_name}")
@@ -306,58 +377,55 @@

        # Process RLDS data
        rlds_handler = RLDSHandler(args.exp_dir, dataset_name, args.num_trajectories)
        rlds_file_size = rlds_handler.measure_file_size()
        rlds_loading_time, num_loaded_rlds = rlds_handler.measure_loading_time()

        print(f"Total RLDS file size: {rlds_file_size / (1024 * 1024):.2f} MB")
        print(
            f"RLDS format loading time for {num_loaded_rlds} trajectories: {rlds_loading_time:.2f} seconds"
        )
        print(
            f"RLDS format throughput: {num_loaded_rlds / rlds_loading_time:.2f} trajectories per second"
        )

        # # Process VLA data
        rlds_sizes = rlds_handler.measure_file_size_per_trajectory()
        rlds_loading_times = rlds_handler.measure_loading_time_per_trajectory()

        for i, (size, load_time) in enumerate(zip(rlds_sizes, rlds_loading_times)):
            results.append({
                'Dataset': dataset_name,
                'Format': 'RLDS',
                'Trajectory': i,
                'LoadingTime(s)': load_time,
                'FileSize(MB)': size / (1024 * 1024),
                'Throughput(traj/s)': 1 / load_time if load_time > 0 else 0
            })

        # Process VLA data
        vla_handler = VLAHandler(args.exp_dir, dataset_name, args.num_trajectories)
        vla_loading_time, num_loaded_vla = vla_handler.measure_loading_time(
            is_add_to_trajectories=True
        )
        vla_file_size = vla_handler.measure_file_size()
        print(f"Total VLA file size: {vla_file_size / (1024 * 1024):.2f} MB")
        print(
            f"VLA format loading time for {num_loaded_vla} trajectories: {vla_loading_time:.2f} seconds"
        )
        print(
            f"VLA format throughput: {num_loaded_vla / vla_loading_time:.2f} trajectories per second\n"
        )

        vla_handler.clear_os_cache()
        # hot cache VLA loading time
        vla_loading_time, num_loaded_vla = vla_handler.measure_loading_time(
            is_add_to_trajectories=False
        )
        print(
            f"VLA format loading time for {num_loaded_vla} trajectories: {vla_loading_time:.2f} seconds"
        )
        print(
            f"VLA format throughput: {num_loaded_vla / vla_loading_time:.2f} trajectories per second\n"
        )

        vla_sizes = vla_handler.measure_file_size_per_trajectory()
        vla_loading_times = vla_handler.measure_loading_time_per_trajectory()

        for i, (size, load_time) in enumerate(zip(vla_sizes, vla_loading_times)):
            results.append({
                'Dataset': dataset_name,
                'Format': 'VLA',
                'Trajectory': i,
                'LoadingTime(s)': load_time,
                'FileSize(MB)': size / (1024 * 1024),
                'Throughput(traj/s)': 1 / load_time if load_time > 0 else 0
            })

        # Convert VLA to HDF5 and benchmark
        hdf5_handler = HDF5Handler(args.exp_dir, dataset_name)
        hdf5_handler = HDF5Handler(args.exp_dir, dataset_name, args.num_trajectories)
        hdf5_handler.convert_data_to_hdf5(vla_handler.trajectories_objects)
        hdf5_file_size = hdf5_handler.measure_file_size()
        print(f"Total HDF5 file size: {hdf5_file_size / (1024 * 1024):.2f} MB")
        hdf5_sizes = hdf5_handler.measure_file_size_per_trajectory()
        hdf5_loading_times = hdf5_handler.measure_loading_time_per_trajectory()

        for i, (size, load_time) in enumerate(zip(hdf5_sizes, hdf5_loading_times)):
            results.append({
                'Dataset': dataset_name,
                'Format': 'HDF5',
                'Trajectory': i,
                'LoadingTime(s)': load_time,
                'FileSize(MB)': size / (1024 * 1024),
                'Throughput(traj/s)': 1 / load_time if load_time > 0 else 0
            })

        # Save results to CSV
        results_df = pd.DataFrame(results)
        results_df.to_csv('trajectory_results.csv', index=False)
        print("Results written to trajectory_results.csv")

        vla_handler.clear_os_cache()
        # Measure HDF5 loading time
        hdf5_loading_time, num_loaded_hdf5 = hdf5_handler.measure_loading_time()
        print(
            f"HDF5 format loading time for {num_loaded_hdf5} trajectories: {hdf5_loading_time:.2f} seconds"
        )
        print(
            f"HDF5 format throughput: {num_loaded_hdf5 / hdf5_loading_time:.2f} trajectories per second\n"
        )


if __name__ == "__main__":
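With per-trajectory rows now written to trajectory_results.csv, summarizing by dataset and format is straightforward. A hedged pandas sketch (column names taken from the result dicts above):

import pandas as pd

df = pd.read_csv("trajectory_results.csv")
summary = df.groupby(["Dataset", "Format"]).agg(
    mean_load_s=("LoadingTime(s)", "mean"),
    total_mb=("FileSize(MB)", "sum"),
    mean_throughput=("Throughput(traj/s)", "mean"),
)
print(summary)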
28 changes: 10 additions & 18 deletions fog_x/loader/rlds.py
@@ -17,43 +17,35 @@ def __init__(self, path, split):

        builder = tfds.builder_from_directory(path)
        self.ds = builder.as_dataset(split)
        self.iterator = iter(self.ds)

        self.split = split
        self.index = 0

    def __len__(self):
        return len(self.ds)
        return tf.data.experimental.cardinality(self.ds).numpy()

    def __iter__(self):
        return self

    def __next__(self):

        if self.index < len(self):
            self.index += 1
            nest_ds = self.ds.__iter__()
            traj = list(nest_ds)[0]["steps"]
        try:
            nest_ds = next(self.iterator)
            traj = nest_ds["steps"]
            data = []

            for step_data in traj:
                step = {}
                for key, val in step_data.items():

                    if key == "observation":
                        step["observation"] = {}
                        for obs_key, obs_val in val.items():
                            step["observation"][obs_key] = np.array(obs_val)

                        step["observation"] = {obs_key: np.array(obs_val) for obs_key, obs_val in val.items()}
                    elif key == "action":
                        step["action"] = {}
                        for act_key, act_val in val.items():
                            step["action"][act_key] = np.array(act_val)
                        step["action"] = {act_key: np.array(act_val) for act_key, act_val in val.items()}
                    else:
                        step[key] = np.array(val)

                data.append(step)
            return data
        else:
        except StopIteration:
            self.index = 0
            raise StopIteration

            self.iterator = iter(self.ds)
            raise StopIteration
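Taken together, the refactored loader behaves as a restartable iterator: each __next__ converts one trajectory's steps to numpy, and the iterator is rebuilt after exhaustion. A hedged usage sketch (the dataset path is a placeholder; the constructor signature matches the diff above):

from fog_x.loader import RLDSLoader

loader = RLDSLoader("/path/to/berkeley_autolab_ur5", split="train[:2]")
for epoch in range(2):             # re-iteration works: the iterator
    for traj in loader:            # resets after StopIteration
        print(len(traj), "steps")  # each trajectory is a list of step dicts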
