Merge pull request #5 from KeplerC/cloud
Cloud Support
KeplerC authored Apr 18, 2024
2 parents daac973 + e27cd54 commit 54858d9
Showing 12 changed files with 854 additions and 114 deletions.
15 changes: 13 additions & 2 deletions Containerfile
@@ -3,11 +3,22 @@ FROM ubuntu:latest
 RUN apt-get update && \
     apt-get install -y python3.9 \
     python3-pip \
-    libgmp-dev
+    libgmp-dev \
+    ffmpeg
+
+RUN pip3 install pandas \
+    polars \
+    numpy \
+    tensorflow \
+    torch \
+    tensorflow_datasets \
+    envlogger \
+    datasets \
+    pyarrow

 COPY . /app
 WORKDIR /app
-RUN pip install .
+RUN pip install .[full]
 RUN pip3 install jupyter

 COPY . /
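
A hypothetical smoke test, not part of the commit: inside the built image, every package the Containerfile installs (plus fog_x itself) should import cleanly.

import importlib

# Hypothetical check that the dependencies installed above are importable.
for module_name in [
    "pandas", "polars", "numpy", "tensorflow", "torch",
    "tensorflow_datasets", "envlogger", "datasets", "pyarrow", "fog_x",
]:
    importlib.import_module(module_name)
    print(f"imported {module_name}")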
8 changes: 7 additions & 1 deletion design_doc/planning_doc.md
@@ -17,4 +17,10 @@
 7. ROS / ROS2
 8. recompaction to save space
 9. CLI
 10. hugging face
+
+### For loader refactor
+1. able to load from the saved dataset and recover semantics
+2. able to generate gif and extract text message
+3. generate dataset efficiently with {}
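
A hypothetical sketch of what items 1 and 2 of the loader-refactor list above could look like with the Dataset API used elsewhere in this PR; the gif step is omitted and the dataset name and path are illustrative.

import fog_x

# Reopen a previously saved dataset and recover its episode-level semantics.
dataset = fog_x.dataset.Dataset(
    name="berkeley_autolab_ur5",
    path="~/rtx_datasets",
)

episode_info = dataset.get_episode_info()  # per-episode metadata
step_data = dataset.get_step_data()        # lazy polars frame of all steps

for episode in episode_info.iter_rows(named=True):
    # the text message that item 2 wants to extract
    print(episode.get("natural_language_instruction"))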


425 changes: 425 additions & 0 deletions examples/Fog_RTX_Cloud_Demo.ipynb

Large diffs are not rendered by default.

130 changes: 130 additions & 0 deletions examples/analytics/dataset_organizer.py
@@ -0,0 +1,130 @@
import fog_x

DATASETS = [
"fractal20220817_data",
"kuka",
"bridge",
"taco_play",
"jaco_play",
"berkeley_cable_routing",
"roboturk",
"nyu_door_opening_surprising_effectiveness",
"viola",
"berkeley_autolab_ur5",
"toto",
"columbia_cairlab_pusht_real",
"stanford_kuka_multimodal_dataset_converted_externally_to_rlds",
"nyu_rot_dataset_converted_externally_to_rlds",
"stanford_hydra_dataset_converted_externally_to_rlds",
"austin_buds_dataset_converted_externally_to_rlds",
"nyu_franka_play_dataset_converted_externally_to_rlds",
"maniskill_dataset_converted_externally_to_rlds",
"cmu_franka_exploration_dataset_converted_externally_to_rlds",
"ucsd_kitchen_dataset_converted_externally_to_rlds",
"ucsd_pick_and_place_dataset_converted_externally_to_rlds",
"austin_sailor_dataset_converted_externally_to_rlds",
"austin_sirius_dataset_converted_externally_to_rlds",
"bc_z",
"usc_cloth_sim_converted_externally_to_rlds",
"utokyo_pr2_opening_fridge_converted_externally_to_rlds",
"utokyo_pr2_tabletop_manipulation_converted_externally_to_rlds",
"utokyo_saytap_converted_externally_to_rlds",
"utokyo_xarm_pick_and_place_converted_externally_to_rlds",
"utokyo_xarm_bimanual_converted_externally_to_rlds",
"robo_net",
"berkeley_mvp_converted_externally_to_rlds",
"berkeley_rpt_converted_externally_to_rlds",
"kaist_nonprehensile_converted_externally_to_rlds",
"stanford_mask_vit_converted_externally_to_rlds",
"tokyo_u_lsmo_converted_externally_to_rlds",
"dlr_sara_pour_converted_externally_to_rlds",
"dlr_sara_grid_clamp_converted_externally_to_rlds",
"dlr_edan_shared_control_converted_externally_to_rlds",
"asu_table_top_converted_externally_to_rlds",
"stanford_robocook_converted_externally_to_rlds",
"eth_agent_affordances",
"imperialcollege_sawyer_wrist_cam",
"iamlab_cmu_pickup_insert_converted_externally_to_rlds",
"uiuc_d3field",
"utaustin_mutex",
"berkeley_fanuc_manipulation",
"cmu_play_fusion",
"cmu_stretch",
"berkeley_gnm_recon",
"berkeley_gnm_cory_hall",
# "berkeley_gnm_sac_son",
]


objects = ["NOTEXIST", "marker", "cloth", "cup", "object", "bottle", "block", "drawer", "lid", "mug"]
tasks = ["NOTEXIST", "put", "move", "pick", "remove", "take", "open", "close", "place", "turn", "push",
"insert", "stack", "lift", "pour"] # things not in DROID
views = ["NOTEXIST", "wrist", "top", "other"]

dataset_id = 0
for dataset_name in DATASETS:
dataset = fog_x.dataset.Dataset(
name=dataset_name,
path="~/rtx_datasets",
)

dataset._prepare_rtx_metadata(
name=dataset_name,
sample_size = 10,
shuffle=True,
)

for dataset_name in DATASETS:
dataset = fog_x.dataset.Dataset(
name=dataset_name,
path="~/rtx_datasets",
)
info = dataset.get_episode_info()

for episode_metadata in info.iter_rows(named = True):
instruction = episode_metadata["natural_language_instruction"]

d = dict()
instruction = instruction.lower().replace(",", "").replace("\n", "").replace("\"", "").replace("\'", "")
d["dataset_id"] = f"dataset-{dataset_id}"
d["info"] = instruction
task_id = -1
for task in tasks:
if task in instruction:
task_id = tasks.index(task)
if task_id == -1:
task_id = len(tasks) - 1

obj_id = -1
for obj in objects:
if obj in instruction:
obj_id = objects.index(obj)
if obj_id == -1:
obj_id = len(objects) - 1

d["task_id"] = f"task-{task_id}"
d["object_id"] = f"object-{obj_id}"

images_features = [col for col in info.columns if col.startswith("video_path_")]
for i, image_feature in enumerate(images_features):
path = episode_metadata[image_feature]
d["poster"] = f"videos/{dataset_name}_viz/{path}.jpg"
d["src"] = f"videos/{dataset_name}_viz/{path}.mp4"
view_id = -1
for view in views:
if view in path:
view_id = views.index(view)
if view_id == -1:
view_id = len(views) - 1

d["view_id"] = f"view-{view_id}"

# print d in JSON format
with open("/tmp/dataset_info.txt", "a") as file:
printable = str(d).replace("\'", "\"")
file.write(f'JSON.parse(\'{printable}\'),\n')


# write as a line of JSON.parse('{"info": "Unfold the tea towel", "poster": "videos/bridge_viz/bridge_0_image.jpg", "src": "videos/bridge_viz/bridge_0_image.mp4"}'),
# print (f'JSON.parse(\'{{"info": "{instruction}", "poster": "videos/{dataset_name}_viz/{dataset_name}_{episode_id}_image.jpg", "src": "videos/{dataset_name}_viz/{dataset_name}_{dataset_id}_image.mp4"}}\'),')
dataset_id += 1
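
The script appends one JSON.parse('…'), line per camera view to /tmp/dataset_info.txt. A hypothetical validation pass (Python 3.9+ for removeprefix/removesuffix) checking that each payload parses as JSON:

import json

# Strip the JavaScript wrapper written by the script and parse the payload.
with open("/tmp/dataset_info.txt") as f:
    for line in f:
        payload = line.strip().removeprefix("JSON.parse('").removesuffix("'),")
        entry = json.loads(payload)
        assert {"dataset_id", "info", "task_id", "object_id"} <= entry.keys()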
2 changes: 1 addition & 1 deletion examples/analytics/extract_column.py
@@ -7,7 +7,7 @@

 dataset.load_rtx_episodes(
     name="berkeley_autolab_ur5",
-    split="train[:1]",
+    split="train[:5]",
 )

 all_step_data = dataset.get_step_data() # get lazy polars frame of the entire dataset
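
Since get_step_data() returns a lazy polars frame, extracting a column is a select followed by collect. A minimal sketch building on the snippet above; episode_id is the column written by the database manager in this PR, other column names depend on the loaded features.

# Assumes `all_step_data` from the example above.
episode_ids = all_step_data.select("episode_id").collect()
print(episode_ids["episode_id"].unique())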
109 changes: 64 additions & 45 deletions fog_x/database/db_manager.py
@@ -206,6 +206,7 @@ def add(
         value: Any,
         timestamp: int,
         feature_type: Optional[FeatureType] = None,
+        metadata_only = False,
     ):
         if feature_name not in self.features.keys():
             logger.warning(
@@ -219,15 +220,16 @@
             )
             self._initialize_feature(feature_name, feature_type)

-        # insert data into the table
-        self.step_data_connector.insert_data(
-            self._get_feature_table_name(feature_name),
-            {
-                "episode_id": self.current_episode_id,
-                "Timestamp": timestamp,
-                feature_name: value,
-            },
-        )
+        if not metadata_only:
+            # insert data into the table
+            self.step_data_connector.insert_data(
+                self._get_feature_table_name(feature_name),
+                {
+                    "episode_id": self.current_episode_id,
+                    "Timestamp": timestamp,
+                    feature_name: value,
+                },
+            )

     def compact(self):
         # create a table for the compacted data
@@ -244,6 +246,8 @@ def compact(self):
             clear_feature_tables = True,
         )

+        self.step_data_connector.remove_tables(table_names)
+
     # def get_table(
     #     self, table_name: Optional[str] = None, format: str = "pandas"
     # ):
@@ -276,45 +280,60 @@ def _get_feature_table_name(self, feature_name):
         )
         return table_name

-    def close(self):
-        self.compact()
-
-        update_dict = {"Finished": True}
-        compacted_table = self.step_data_connector.select_table(
-            f"{self.dataset_name}_{self.current_episode_id}"
-        )
-
-        for feature_name in self.features.keys():
-            if "count" in self.required_stats:
-                update_dict[f"{feature_name}_count"] = compacted_table[
-                    feature_name
-                ].count()
-            if "mean" in self.required_stats:
-                update_dict[f"{feature_name}_mean"] = compacted_table[
-                    feature_name
-                ].mean()
-            if "max" in self.required_stats:
-                update_dict[f"{feature_name}_max"] = compacted_table[
-                    feature_name
-                ].max()
-            if "min" in self.required_stats:
-                update_dict[f"{feature_name}_min"] = compacted_table[
-                    feature_name
-                ].min()
+    def close(self, save_data=True, save_metadata=True, additional_metadata = None):
+
+        if additional_metadata is None:
+            additional_metadata = {}
+        logger.info(f"Closing the episode with metadata {additional_metadata}")
+        additional_metadata["Finished"] = True
+        if save_data:
+            self.compact()
+            compacted_table = self.step_data_connector.select_table(
+                f"{self.dataset_name}_{self.current_episode_id}"
+            )
+            for feature_name in self.features.keys():
+                if "count" in self.required_stats:
+                    additional_metadata[f"{feature_name}_count"] = compacted_table[
+                        feature_name
+                    ].count()
+                if "mean" in self.required_stats:
+                    additional_metadata[f"{feature_name}_mean"] = compacted_table[
+                        feature_name
+                    ].mean()
+                if "max" in self.required_stats:
+                    additional_metadata[f"{feature_name}_max"] = compacted_table[
+                        feature_name
+                    ].max()
+                if "min" in self.required_stats:
+                    additional_metadata[f"{feature_name}_min"] = compacted_table[
+                        feature_name
+                    ].min()
+            self.step_data_connector.save_table(
+                f"{self.dataset_name}_{self.current_episode_id}",
+            )
+            # TODO: this is a hack clear the old dataframe and load as a lazy frame
+            # TODO: future iteration: serve as cache
+            self.step_data_connector.load_tables(
+                [self.current_episode_id],
+                [f"{self.dataset_name}_{self.current_episode_id}"],
+            )
+        else:
+            table_names = [
+                self._get_feature_table_name(feature_name)
+                for feature_name in self.features.keys()
+            ]
+            self.step_data_connector.remove_tables(table_names)
+
+        for metadata_key in additional_metadata.keys():
+            logger.debug(f"Adding metadata key {metadata_key} to the database")
+            self.episode_info_connector.add_column(
+                self.dataset_name,
+                metadata_key,
+                "str", # TODO: support more types
+            )
         # update the metadata field marking the episode as compacted
         self.episode_info_connector.update_data(
-            self.dataset_name, self.current_episode_id, update_dict
-        )
-
-        self.step_data_connector.save_table(
-            f"{self.dataset_name}_{self.current_episode_id}",
-        )
-
-        # TODO: this is a hack clear the old dataframe and load as a lazy frame
-        self.step_data_connector.load_tables(
-            [self.current_episode_id],
-            [f"{self.dataset_name}_{self.current_episode_id}"],
+            self.dataset_name, self.current_episode_id, additional_metadata
         )

         self.episode_info_connector.save_table(
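
A hedged sketch of how a caller might use the reworked close(): with save_data=False the buffered step tables are dropped and only episode-level metadata is recorded. The db_manager handle and the metadata key are placeholders; only the keyword arguments come from this diff.

def finish_episode(db_manager, success: bool) -> None:
    # Keep step data only for successful episodes; either way the episode's
    # metadata row is updated (close() itself sets Finished=True).
    db_manager.close(
        save_data=success,
        additional_metadata={"collector_note": "cloud demo run"},  # placeholder key
    )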
30 changes: 17 additions & 13 deletions fog_x/database/polars_connector.py
@@ -105,10 +105,10 @@ def merge_tables_with_timestamp(
             logger.debug("Only one table to merge.")
         self.tables[output_table] = merged_df

-        if clear_feature_tables:
-            for table_name in tables:
-                self.tables.pop(table_name)
-                logger.debug(f"Table {table_name} removed.")
+    def remove_tables(self, tables: List[str]):
+        for table_name in tables:
+            self.tables.pop(table_name)
+            logger.debug(f"Table {table_name} removed.")

     def select_table(self, table_name: str):
         # Return the DataFrame
@@ -136,19 +136,23 @@ def create_table(self, table_name: str):


     def load_tables(self, table_names: List[str]):

         # load tables from the path
         for table_name in table_names:
             path = f"{self.path}/{table_name}.parquet"
-            if os.path.exists(os.path.expanduser(path)):
-                self.tables[table_name] = pl.read_parquet(path)
-                self.table_len[table_name] = len(self.tables[table_name])
-            else:
-                logger.debug(f"Table {table_name} does not exist in {path}.")
+            logger.info(f"Prepare to load table {table_name} loaded from {path}.")
+            # if os.path.exists(os.path.expanduser(path)):
+            #     self.tables[table_name] = pl.read_parquet(path)
+            #     self.table_len[table_name] = len(self.tables[table_name])
+            # else:
+            #     logger.debug(f"Table {table_name} does not exist in {path}.")
+            path = os.path.expanduser(path)
+            self.tables[table_name] = pl.from_arrow(pq.read_table(path))
+            self.table_len[table_name] = len(self.tables[table_name])
+            logger.info(f"Table {table_name} loaded from {path}.")

     def save_table(self, table_name: str):
-        self.tables[table_name].write_parquet(
-            f"{self.path}/{table_name}.parquet"
-        )
+        pq.write_table(self.tables[table_name].to_arrow(), f"{self.path}/{table_name}.parquet")


 class LazyFrameConnector(PolarsConnector):
@@ -170,7 +174,7 @@ def load_tables(self, episode_ids: List[str], table_names: List[str]):
             self.tables[table_name] = self.dataset.filter(
                 pl.col("episode_id") == episode_id
             )
-        logger.debug(f"Tables loaded from {self.tables}")
+        # logger.debug(f"Tables loaded from {self.tables}")

     def create_table(self, table_name: str):
         # Create a new DataFrame with specified columns
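
Parquet I/O now goes through pyarrow (pq.read_table / pq.write_table) with pl.from_arrow and to_arrow on the polars side. A standalone sketch of that round-trip; the file path is illustrative.

import polars as pl
import pyarrow.parquet as pq

# Write a polars table via pyarrow, then read it back the way the connector now does.
df = pl.DataFrame({"episode_id": [0, 0, 1], "Timestamp": [1, 2, 1]})
pq.write_table(df.to_arrow(), "/tmp/example_table.parquet")
loaded = pl.from_arrow(pq.read_table("/tmp/example_table.parquet"))
assert loaded.shape == df.shape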