From 95ac89c8046306f7ed552ad113c7c5b4513c214f Mon Sep 17 00:00:00 2001 From: Eric Chen Date: Tue, 16 Apr 2024 06:06:06 +0000 Subject: [PATCH 01/21] making load episode to be a separate function --- fog_x/dataset.py | 110 ++++++++++++++++++++++++++--------------------- 1 file changed, 62 insertions(+), 48 deletions(-) diff --git a/fog_x/dataset.py b/fog_x/dataset.py index 1c8cd83..e2c88c2 100644 --- a/fog_x/dataset.py +++ b/fog_x/dataset.py @@ -290,6 +290,64 @@ def export_rtx( else: writer._record_step(stepdata, is_new_episode=True) + + def load_rtx_step_data_from_episode( + self, + tf_episode: Dict[str, Any], + additional_metadata: Optional[Dict[str, Any]] = None, + data_type: Dict[str, Any] = {} + ): + from tensorflow_datasets.core.features import ( + FeaturesDict, + Image, + Scalar, + Tensor, + Text, + ) + + fog_episode = self.new_episode( + metadata=additional_metadata, + ) + for step in tf_episode["steps"]: + for k, v in step.items(): + if k == "observation" or k == "action": + for k2, v2 in v.items(): + # TODO: abstract this to feature.py + + if ( + isinstance(data_type[k][k2], Tensor) + and data_type[k][k2].shape != () + ): + memfile = io.BytesIO() + np.save(memfile, v2.numpy()) + value = memfile.getvalue() + elif isinstance(data_type[k][k2], Image): + memfile = io.BytesIO() + np.save(memfile, v2.numpy()) + value = memfile.getvalue() + else: + value = v2.numpy() + + fog_episode.add( + feature=str(k2), + value=value, + feature_type=FeatureType( + tf_feature_spec=data_type[k][k2] + ), + ) + if k == "observation": + self.obs_keys.append(k2) + elif k == "action": + self.act_keys.append(k2) + else: + fog_episode.add( + feature=str(k), + value=v.numpy(), + feature_type=FeatureType(tf_feature_spec=data_type[k]), + ) + self.step_keys.append(k) + fog_episode.close() + def load_rtx_episodes( self, name: str, @@ -313,13 +371,6 @@ def load_rtx_episodes( # this is only required if rtx format is used import tensorflow_datasets as tfds - from tensorflow_datasets.core.features import ( - FeaturesDict, - Image, - Scalar, - Tensor, - Text, - ) from fog_x.rlds.utils import dataset2path @@ -330,48 +381,11 @@ def load_rtx_episodes( for tf_episode in ds: logger.info(tf_episode) - fog_episode = self.new_episode( - metadata=additional_metadata, + self.load_rtx_step_data_from_episode( + tf_episode, additional_metadata, data_type ) - for step in tf_episode["steps"]: - for k, v in step.items(): - if k == "observation" or k == "action": - for k2, v2 in v.items(): - # TODO: abstract this to feature.py - - if ( - isinstance(data_type[k][k2], Tensor) - and data_type[k][k2].shape != () - ): - memfile = io.BytesIO() - np.save(memfile, v2.numpy()) - value = memfile.getvalue() - elif isinstance(data_type[k][k2], Image): - memfile = io.BytesIO() - np.save(memfile, v2.numpy()) - value = memfile.getvalue() - else: - value = v2.numpy() - - fog_episode.add( - feature=str(k2), - value=value, - feature_type=FeatureType( - tf_feature_spec=data_type[k][k2] - ), - ) - if k == "observation": - self.obs_keys.append(k2) - elif k == "action": - self.act_keys.append(k2) - else: - fog_episode.add( - feature=str(k), - value=v.numpy(), - feature_type=FeatureType(tf_feature_spec=data_type[k]), - ) - self.step_keys.append(k) - fog_episode.close() + + def get_episode_info(self) -> pandas.DataFrame: """ From d92c0c3898396f220404453c23252b89dfbfc66e Mon Sep 17 00:00:00 2001 From: Eric Chen Date: Tue, 16 Apr 2024 07:49:00 +0000 Subject: [PATCH 02/21] Refactor dataset split in extract_column.py, close method in Episode class, remove_tables 
method in PolarsConnector class, and close method in DatabaseManager class --- examples/analytics/dataset_organizer.py | 12 ++ examples/analytics/extract_column.py | 2 +- fog_x/database/db_manager.py | 78 +++++++----- fog_x/database/polars_connector.py | 8 +- fog_x/dataset.py | 158 ++++++++++++++---------- fog_x/episode.py | 4 +- 6 files changed, 161 insertions(+), 101 deletions(-) create mode 100644 examples/analytics/dataset_organizer.py diff --git a/examples/analytics/dataset_organizer.py b/examples/analytics/dataset_organizer.py new file mode 100644 index 0000000..003cf2c --- /dev/null +++ b/examples/analytics/dataset_organizer.py @@ -0,0 +1,12 @@ +import fog_x + +dataset = fog_x.dataset.Dataset( + name="demo_ds", + path="~/test_dataset", +) + +dataset._prepare_rtx_metadata( + name="berkeley_autolab_ur5", +) + + diff --git a/examples/analytics/extract_column.py b/examples/analytics/extract_column.py index 482e376..ef321d4 100644 --- a/examples/analytics/extract_column.py +++ b/examples/analytics/extract_column.py @@ -7,7 +7,7 @@ dataset.load_rtx_episodes( name="berkeley_autolab_ur5", - split="train[:1]", + split="train[:5]", ) all_step_data = dataset.get_step_data() # get lazy polars frame of the entire dataset diff --git a/fog_x/database/db_manager.py b/fog_x/database/db_manager.py index 12b3bf1..35ae4fc 100644 --- a/fog_x/database/db_manager.py +++ b/fog_x/database/db_manager.py @@ -244,6 +244,8 @@ def compact(self): clear_feature_tables = True, ) + self.step_data_connector.remove_tables(table_names) + # def get_table( # self, table_name: Optional[str] = None, format: str = "pandas" # ): @@ -276,47 +278,59 @@ def _get_feature_table_name(self, feature_name): ) return table_name - def close(self): - self.compact() + def close(self, save_data=True, save_metadata=True): update_dict = {"Finished": True} - compacted_table = self.step_data_connector.select_table( - f"{self.dataset_name}_{self.current_episode_id}" - ) + if save_data: + self.compact() + compacted_table = self.step_data_connector.select_table( + f"{self.dataset_name}_{self.current_episode_id}" + ) + for feature_name in self.features.keys(): + if "count" in self.required_stats: + update_dict[f"{feature_name}_count"] = compacted_table[ + feature_name + ].count() + if "mean" in self.required_stats: + update_dict[f"{feature_name}_mean"] = compacted_table[ + feature_name + ].mean() + if "max" in self.required_stats: + update_dict[f"{feature_name}_max"] = compacted_table[ + feature_name + ].max() + if "min" in self.required_stats: + update_dict[f"{feature_name}_min"] = compacted_table[ + feature_name + ].min() + self.step_data_connector.save_table( + f"{self.dataset_name}_{self.current_episode_id}", + ) + # TODO: this is a hack clear the old dataframe and load as a lazy frame + # TODO: future iteration: serve as cache + self.step_data_connector.load_tables( + [self.current_episode_id], + [f"{self.dataset_name}_{self.current_episode_id}"], + ) + else: + for feature_name in self.features.keys(): + feature_table = self.step_data_connector.select_table( + self._get_feature_table_name(feature_name) + ) + update_dict[f"{feature_name}_count"] = feature_table[feature_name].count() + + table_names = [ + self._get_feature_table_name(feature_name) + for feature_name in self.features.keys() + ] + self.step_data_connector.remove_tables(table_names) - for feature_name in self.features.keys(): - if "count" in self.required_stats: - update_dict[f"{feature_name}_count"] = compacted_table[ - feature_name - ].count() - if "mean" in 
self.required_stats: - update_dict[f"{feature_name}_mean"] = compacted_table[ - feature_name - ].mean() - if "max" in self.required_stats: - update_dict[f"{feature_name}_max"] = compacted_table[ - feature_name - ].max() - if "min" in self.required_stats: - update_dict[f"{feature_name}_min"] = compacted_table[ - feature_name - ].min() # update the metadata field marking the episode as compacted self.episode_info_connector.update_data( self.dataset_name, self.current_episode_id, update_dict ) - self.step_data_connector.save_table( - f"{self.dataset_name}_{self.current_episode_id}", - ) - - # TODO: this is a hack clear the old dataframe and load as a lazy frame - self.step_data_connector.load_tables( - [self.current_episode_id], - [f"{self.dataset_name}_{self.current_episode_id}"], - ) - self.episode_info_connector.save_table( f"{self.dataset_name}" ) diff --git a/fog_x/database/polars_connector.py b/fog_x/database/polars_connector.py index b312c74..c8c63d2 100644 --- a/fog_x/database/polars_connector.py +++ b/fog_x/database/polars_connector.py @@ -105,10 +105,10 @@ def merge_tables_with_timestamp( logger.debug("Only one table to merge.") self.tables[output_table] = merged_df - if clear_feature_tables: - for table_name in tables: - self.tables.pop(table_name) - logger.debug(f"Table {table_name} removed.") + def remove_tables(self, tables: List[str]): + for table_name in tables: + self.tables.pop(table_name) + logger.debug(f"Table {table_name} removed.") def select_table(self, table_name: str): # Return the DataFrame diff --git a/fog_x/dataset.py b/fog_x/dataset.py index e2c88c2..d456929 100644 --- a/fog_x/dataset.py +++ b/fog_x/dataset.py @@ -291,68 +291,11 @@ def export_rtx( writer._record_step(stepdata, is_new_episode=True) - def load_rtx_step_data_from_episode( - self, - tf_episode: Dict[str, Any], - additional_metadata: Optional[Dict[str, Any]] = None, - data_type: Dict[str, Any] = {} - ): - from tensorflow_datasets.core.features import ( - FeaturesDict, - Image, - Scalar, - Tensor, - Text, - ) - - fog_episode = self.new_episode( - metadata=additional_metadata, - ) - for step in tf_episode["steps"]: - for k, v in step.items(): - if k == "observation" or k == "action": - for k2, v2 in v.items(): - # TODO: abstract this to feature.py - - if ( - isinstance(data_type[k][k2], Tensor) - and data_type[k][k2].shape != () - ): - memfile = io.BytesIO() - np.save(memfile, v2.numpy()) - value = memfile.getvalue() - elif isinstance(data_type[k][k2], Image): - memfile = io.BytesIO() - np.save(memfile, v2.numpy()) - value = memfile.getvalue() - else: - value = v2.numpy() - - fog_episode.add( - feature=str(k2), - value=value, - feature_type=FeatureType( - tf_feature_spec=data_type[k][k2] - ), - ) - if k == "observation": - self.obs_keys.append(k2) - elif k == "action": - self.act_keys.append(k2) - else: - fog_episode.add( - feature=str(k), - value=v.numpy(), - feature_type=FeatureType(tf_feature_spec=data_type[k]), - ) - self.step_keys.append(k) - fog_episode.close() - def load_rtx_episodes( self, name: str, - split: Optional[str], - additional_metadata: Optional[Dict[str, Any]] = None, + split: Optional[str] = None, + additional_metadata: Optional[Dict[str, Any]] = dict(), ): """ Load robot data from Tensorflow Datasets. 
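        Example (a minimal sketch, assuming the RT-X mirror of
        "berkeley_autolab_ur5" is reachable and that ~/test_dataset is
        writable; the "collector" metadata key is purely illustrative)::

            import fog_x

            dataset = fog_x.dataset.Dataset(name="demo_ds", path="~/test_dataset")
            dataset.load_rtx_episodes(
                name="berkeley_autolab_ur5",
                split="train[:1]",
                additional_metadata={"collector": "demo"},
            )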
@@ -375,17 +318,108 @@ def load_rtx_episodes( from fog_x.rlds.utils import dataset2path b = tfds.builder_from_directory(builder_dir=dataset2path(name)) + ds = b.as_dataset(split=split) data_type = b.info.features["steps"] for tf_episode in ds: logger.info(tf_episode) - self.load_rtx_step_data_from_episode( - tf_episode, additional_metadata, data_type - ) + fog_episode = self.new_episode( + metadata=additional_metadata, + ) + for step in tf_episode["steps"]: + self._load_rtx_step_data_from_tf_step( + step, fog_episode, additional_metadata, data_type, + ) + fog_episode.close() + + def _prepare_rtx_metadata( + self, + name: str, + ): + + # this is only required if rtx format is used + import tensorflow_datasets as tfds + + from fog_x.rlds.utils import dataset2path + + b = tfds.builder_from_directory(builder_dir=dataset2path(name)) + + ds = b.as_dataset(split="all") + + data_type = b.info.features["steps"] + + counter = 0 + + for tf_episode in ds: + additional_metadata = { + "loading_method": f"{name}, {all}, {counter}", + } + logger.info(tf_episode) + fog_episode = self.new_episode( + metadata=additional_metadata, + ) + for step in tf_episode["steps"]: + self._load_rtx_step_data_from_tf_step( + step, fog_episode, additional_metadata, data_type, + ) + fog_episode.close(save_data = False) + counter += 1 + def _load_rtx_step_data_from_tf_step( + self, + step: Dict[str, Any], + fog_episode : Episode, + additional_metadata: Optional[Dict[str, Any]] = None, + data_type: Dict[str, Any] = {}, + ): + from tensorflow_datasets.core.features import ( + FeaturesDict, + Image, + Scalar, + Tensor, + Text, + ) + + for k, v in step.items(): + if k == "observation" or k == "action": + for k2, v2 in v.items(): + # TODO: abstract this to feature.py + + if ( + isinstance(data_type[k][k2], Tensor) + and data_type[k][k2].shape != () + ): + memfile = io.BytesIO() + np.save(memfile, v2.numpy()) + value = memfile.getvalue() + elif isinstance(data_type[k][k2], Image): + memfile = io.BytesIO() + np.save(memfile, v2.numpy()) + value = memfile.getvalue() + else: + value = v2.numpy() + fog_episode.add( + feature=str(k2), + value=value, + feature_type=FeatureType( + tf_feature_spec=data_type[k][k2] + ), + ) + if k == "observation": + self.obs_keys.append(k2) + elif k == "action": + self.act_keys.append(k2) + else: + fog_episode.add( + feature=str(k), + value=v.numpy(), + feature_type=FeatureType(tf_feature_spec=data_type[k]), + ) + self.step_keys.append(k) + def get_episode_info(self) -> pandas.DataFrame: """ diff --git a/fog_x/episode.py b/fog_x/episode.py index 662bd76..3a50a14 100644 --- a/fog_x/episode.py +++ b/fog_x/episode.py @@ -99,8 +99,8 @@ def get_steps(self) -> List[Dict[str, Any]]: """ return self.db_manager.get_steps() - def close(self) -> None: + def close(self, save_data = True, save_metadata = True) -> None: """ Saves the episode object. 
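        Args:
            save_data: when True (default), compact and persist the step data
                before closing; when False, the per-feature step tables are
                dropped and only episode-level metadata is recorded.
            save_metadata: whether episode-level metadata should be saved
                (forwarded to the database manager).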
""" - self.db_manager.close() + self.db_manager.close(save_data, save_metadata) From 85186b6078676ebc4f1b7a04c90944d16f5dd8b2 Mon Sep 17 00:00:00 2001 From: Eric Chen Date: Tue, 16 Apr 2024 07:50:47 +0000 Subject: [PATCH 03/21] Fix loading_method typo in fog_x/dataset.py --- fog_x/dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fog_x/dataset.py b/fog_x/dataset.py index d456929..ca19db1 100644 --- a/fog_x/dataset.py +++ b/fog_x/dataset.py @@ -354,7 +354,7 @@ def _prepare_rtx_metadata( for tf_episode in ds: additional_metadata = { - "loading_method": f"{name}, {all}, {counter}", + "loading_method": f"{name}, all, {counter}", } logger.info(tf_episode) From 913b0c629050bb13010ef7fd8b13ad94f6b69f66 Mon Sep 17 00:00:00 2001 From: Eric Chen Date: Tue, 16 Apr 2024 08:07:26 +0000 Subject: [PATCH 04/21] Refactor loading method in fog_x/dataset.py --- fog_x/dataset.py | 55 +++++++++++++++++++++++++++++++++++------------- 1 file changed, 40 insertions(+), 15 deletions(-) diff --git a/fog_x/dataset.py b/fog_x/dataset.py index ca19db1..e90664c 100644 --- a/fog_x/dataset.py +++ b/fog_x/dataset.py @@ -329,9 +329,12 @@ def load_rtx_episodes( metadata=additional_metadata, ) for step in tf_episode["steps"]: - self._load_rtx_step_data_from_tf_step( - step, fog_episode, additional_metadata, data_type, + ret = self._load_rtx_step_data_from_tf_step( + step, additional_metadata, data_type, ) + for r in ret: + fog_episode.add(**r) + fog_episode.close() def _prepare_rtx_metadata( @@ -362,16 +365,17 @@ def _prepare_rtx_metadata( metadata=additional_metadata, ) for step in tf_episode["steps"]: - self._load_rtx_step_data_from_tf_step( - step, fog_episode, additional_metadata, data_type, + ret = self._load_rtx_step_data_from_tf_step( + step, additional_metadata, data_type, ) + for r in ret: + fog_episode.add(**r) fog_episode.close(save_data = False) counter += 1 def _load_rtx_step_data_from_tf_step( self, step: Dict[str, Any], - fog_episode : Episode, additional_metadata: Optional[Dict[str, Any]] = None, data_type: Dict[str, Any] = {}, ): @@ -382,6 +386,7 @@ def _load_rtx_step_data_from_tf_step( Tensor, Text, ) + ret = [] for k, v in step.items(): if k == "observation" or k == "action": @@ -401,24 +406,44 @@ def _load_rtx_step_data_from_tf_step( value = memfile.getvalue() else: value = v2.numpy() - fog_episode.add( - feature=str(k2), - value=value, - feature_type=FeatureType( - tf_feature_spec=data_type[k][k2] - ), + + ret.append( + { + "feature": str(k2), + "value": value, + "feature_type": FeatureType( + tf_feature_spec=data_type[k][k2] + ), + } ) + # fog_episode.add( + # feature=str(k2), + # value=value, + # feature_type=FeatureType( + # tf_feature_spec=data_type[k][k2] + # ), + # ) if k == "observation": self.obs_keys.append(k2) elif k == "action": self.act_keys.append(k2) else: - fog_episode.add( - feature=str(k), - value=v.numpy(), - feature_type=FeatureType(tf_feature_spec=data_type[k]), + # fog_episode.add( + # feature=str(k), + # value=v.numpy(), + # feature_type=FeatureType(tf_feature_spec=data_type[k]), + # ) + ret.append( + { + "feature": str(k), + "value": v.numpy(), + "feature_type": FeatureType( + tf_feature_spec=data_type[k] + ), + } ) self.step_keys.append(k) + return ret def get_episode_info(self) -> pandas.DataFrame: From f6123f07049d75204a2bce65111865e8d678267c Mon Sep 17 00:00:00 2001 From: Eric Chen Date: Tue, 16 Apr 2024 08:10:07 +0000 Subject: [PATCH 05/21] update planning doc --- design_doc/planning_doc.md | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) 
diff --git a/design_doc/planning_doc.md b/design_doc/planning_doc.md index f47db31..839ded3 100644 --- a/design_doc/planning_doc.md +++ b/design_doc/planning_doc.md @@ -17,4 +17,10 @@ 7. ROS / ROS2 8. recompaction to save space 9. CLI -10. hugging face \ No newline at end of file + +### For loader refactor +1. able to load from the saved dataset and recover semantics +2. able to generate gif and extract text message +3. generate dataset efficiently with {} + + From c4233110ad9e118988f9cee17b89f61779dc02c0 Mon Sep 17 00:00:00 2001 From: Eric Chen Date: Tue, 16 Apr 2024 17:13:45 +0000 Subject: [PATCH 06/21] Refactor database manager and dataset loading method --- fog_x/database/db_manager.py | 43 ++++++++++++++++++------------------ fog_x/dataset.py | 4 +++- fog_x/episode.py | 5 +++-- 3 files changed, 27 insertions(+), 25 deletions(-) diff --git a/fog_x/database/db_manager.py b/fog_x/database/db_manager.py index 35ae4fc..21e0dbf 100644 --- a/fog_x/database/db_manager.py +++ b/fog_x/database/db_manager.py @@ -206,6 +206,7 @@ def add( value: Any, timestamp: int, feature_type: Optional[FeatureType] = None, + metadata_only = False, ): if feature_name not in self.features.keys(): logger.warning( @@ -219,15 +220,16 @@ def add( ) self._initialize_feature(feature_name, feature_type) - # insert data into the table - self.step_data_connector.insert_data( - self._get_feature_table_name(feature_name), - { - "episode_id": self.current_episode_id, - "Timestamp": timestamp, - feature_name: value, - }, - ) + if not metadata_only: + # insert data into the table + self.step_data_connector.insert_data( + self._get_feature_table_name(feature_name), + { + "episode_id": self.current_episode_id, + "Timestamp": timestamp, + feature_name: value, + }, + ) def compact(self): # create a table for the compacted data @@ -278,9 +280,12 @@ def _get_feature_table_name(self, feature_name): ) return table_name - def close(self, save_data=True, save_metadata=True): + def close(self, save_data=True, save_metadata=True, additional_metadata = None): - update_dict = {"Finished": True} + if additional_metadata is None: + additional_metadata = {} + logger.info(f"Closing the episode with metadata {additional_metadata}") + additional_metadata["Finished"] = True if save_data: self.compact() compacted_table = self.step_data_connector.select_table( @@ -288,19 +293,19 @@ def close(self, save_data=True, save_metadata=True): ) for feature_name in self.features.keys(): if "count" in self.required_stats: - update_dict[f"{feature_name}_count"] = compacted_table[ + additional_metadata[f"{feature_name}_count"] = compacted_table[ feature_name ].count() if "mean" in self.required_stats: - update_dict[f"{feature_name}_mean"] = compacted_table[ + additional_metadata[f"{feature_name}_mean"] = compacted_table[ feature_name ].mean() if "max" in self.required_stats: - update_dict[f"{feature_name}_max"] = compacted_table[ + additional_metadata[f"{feature_name}_max"] = compacted_table[ feature_name ].max() if "min" in self.required_stats: - update_dict[f"{feature_name}_min"] = compacted_table[ + additional_metadata[f"{feature_name}_min"] = compacted_table[ feature_name ].min() self.step_data_connector.save_table( @@ -313,12 +318,6 @@ def close(self, save_data=True, save_metadata=True): [f"{self.dataset_name}_{self.current_episode_id}"], ) else: - for feature_name in self.features.keys(): - feature_table = self.step_data_connector.select_table( - self._get_feature_table_name(feature_name) - ) - update_dict[f"{feature_name}_count"] = 
feature_table[feature_name].count() - table_names = [ self._get_feature_table_name(feature_name) for feature_name in self.features.keys() @@ -328,7 +327,7 @@ def close(self, save_data=True, save_metadata=True): # update the metadata field marking the episode as compacted self.episode_info_connector.update_data( - self.dataset_name, self.current_episode_id, update_dict + self.dataset_name, self.current_episode_id, additional_metadata ) self.episode_info_connector.save_table( diff --git a/fog_x/dataset.py b/fog_x/dataset.py index e90664c..5e74880 100644 --- a/fog_x/dataset.py +++ b/fog_x/dataset.py @@ -357,7 +357,8 @@ def _prepare_rtx_metadata( for tf_episode in ds: additional_metadata = { - "loading_method": f"{name}, all, {counter}", + "load_from": name, + "load_index": f"all, {counter}", } logger.info(tf_episode) @@ -369,6 +370,7 @@ def _prepare_rtx_metadata( step, additional_metadata, data_type, ) for r in ret: + r["metadata_only"] = True fog_episode.add(**r) fog_episode.close(save_data = False) counter += 1 diff --git a/fog_x/episode.py b/fog_x/episode.py index 3a50a14..a57f69b 100644 --- a/fog_x/episode.py +++ b/fog_x/episode.py @@ -27,6 +27,7 @@ def add( value: Any, timestamp: Optional[int] = None, feature_type: Optional[FeatureType] = None, + metadata_only = False, ) -> None: """ Add one feature step data. @@ -51,7 +52,7 @@ def add( f"Adding {feature} with value {value} at timestamp {timestamp}" ) if self.db_manager: - self.db_manager.add(feature, value, timestamp, feature_type) + self.db_manager.add(feature, value, timestamp, feature_type, metadata_only) else: logger.warning( "No database manager provided, data will not be saved" @@ -103,4 +104,4 @@ def close(self, save_data = True, save_metadata = True) -> None: """ Saves the episode object. 
""" - self.db_manager.close(save_data, save_metadata) + self.db_manager.close(save_data=save_data, save_metadata=save_metadata) From 6bff2c5d5e55c0ff7391fc675892f5aa3dc45331 Mon Sep 17 00:00:00 2001 From: Eric Chen Date: Tue, 16 Apr 2024 19:48:28 +0000 Subject: [PATCH 07/21] Refactor database manager and dataset loading methods, add metadata key to db_manager.py, and fix close method in Episode class --- fog_x/database/db_manager.py | 8 +++++- fog_x/dataset.py | 56 +++++++++++++++++++++++++++++------- fog_x/episode.py | 4 +-- 3 files changed, 54 insertions(+), 14 deletions(-) diff --git a/fog_x/database/db_manager.py b/fog_x/database/db_manager.py index 21e0dbf..a5d35ce 100644 --- a/fog_x/database/db_manager.py +++ b/fog_x/database/db_manager.py @@ -324,7 +324,13 @@ def close(self, save_data=True, save_metadata=True, additional_metadata = None) ] self.step_data_connector.remove_tables(table_names) - + for metadata_key in additional_metadata.keys(): + logger.debug(f"Adding metadata key {metadata_key} to the database") + self.episode_info_connector.add_column( + self.dataset_name, + metadata_key, + "str", # TODO: support more types + ) # update the metadata field marking the episode as compacted self.episode_info_connector.update_data( self.dataset_name, self.current_episode_id, additional_metadata diff --git a/fog_x/dataset.py b/fog_x/dataset.py index 5e74880..cc517a7 100644 --- a/fog_x/dataset.py +++ b/fog_x/dataset.py @@ -330,7 +330,7 @@ def load_rtx_episodes( ) for step in tf_episode["steps"]: ret = self._load_rtx_step_data_from_tf_step( - step, additional_metadata, data_type, + step, data_type, ) for r in ret: fog_episode.add(**r) @@ -340,45 +340,79 @@ def load_rtx_episodes( def _prepare_rtx_metadata( self, name: str, + export_path: Optional[str] = None, ): # this is only required if rtx format is used import tensorflow_datasets as tfds - from fog_x.rlds.utils import dataset2path + import cv2 b = tfds.builder_from_directory(builder_dir=dataset2path(name)) - ds = b.as_dataset(split="all") - data_type = b.info.features["steps"] - counter = 0 + if export_path == None: + export_path = self.path + "/viz" + if not os.path.exists(export_path): + os.makedirs(export_path) + + for tf_episode in ds: + video_writers = {} + additional_metadata = { "load_from": name, "load_index": f"all, {counter}", } logger.info(tf_episode) - fog_episode = self.new_episode( - metadata=additional_metadata, - ) + fog_episode = self.new_episode() + for step in tf_episode["steps"]: ret = self._load_rtx_step_data_from_tf_step( - step, additional_metadata, data_type, + step, data_type, ) for r in ret: + feature_name = r["feature"] + if "image" in feature_name and "depth" not in feature_name: + if feature_name not in video_writers: + + output_filename = f"{self.name}_{counter}_{feature_name}" + output_path = f"{export_path}/{output_filename}" + + image = np.load(io.BytesIO(r["value"])) + logger.info(f"feature: {feature_name}, image: {image}") + + # save the initial image + cv2.imwrite(f"{output_path}.jpg", image) + # save the video + video_writers[feature_name] = cv2.VideoWriter( + f"{output_path}.mp4", + cv2.VideoWriter_fourcc(*"mp4v"), + 30, + (640, 480), + ) + additional_metadata[f"video_path_{feature_name}"] = output_filename + video_writers[r["feature"]].write(image) + + if "instruction" in r["feature"]: + natural_language_instruction = r["value"].decode("utf-8") + additional_metadata["natural_language_instruction"] = natural_language_instruction + r["metadata_only"] = True fog_episode.add(**r) - 
fog_episode.close(save_data = False) + + for _, video_writer in video_writers.items(): + video_writer.release() + video_writers = {} + fog_episode.close(save_data = False, additional_metadata = additional_metadata) counter += 1 def _load_rtx_step_data_from_tf_step( self, step: Dict[str, Any], - additional_metadata: Optional[Dict[str, Any]] = None, data_type: Dict[str, Any] = {}, ): from tensorflow_datasets.core.features import ( diff --git a/fog_x/episode.py b/fog_x/episode.py index a57f69b..9e83eb8 100644 --- a/fog_x/episode.py +++ b/fog_x/episode.py @@ -100,8 +100,8 @@ def get_steps(self) -> List[Dict[str, Any]]: """ return self.db_manager.get_steps() - def close(self, save_data = True, save_metadata = True) -> None: + def close(self, save_data = True, save_metadata = True, additional_metadata = {}) -> None: """ Saves the episode object. """ - self.db_manager.close(save_data=save_data, save_metadata=save_metadata) + self.db_manager.close(save_data=save_data, save_metadata=save_metadata, additional_metadata=additional_metadata) From e065c9e34cfe06948883e51a2b8f9bd45390649c Mon Sep 17 00:00:00 2001 From: Eric Chen Date: Tue, 16 Apr 2024 19:54:33 +0000 Subject: [PATCH 08/21] Fix video frame rate in fog_x/dataset.py --- fog_x/dataset.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/fog_x/dataset.py b/fog_x/dataset.py index cc517a7..ab3cec3 100644 --- a/fog_x/dataset.py +++ b/fog_x/dataset.py @@ -374,27 +374,27 @@ def _prepare_rtx_metadata( ret = self._load_rtx_step_data_from_tf_step( step, data_type, ) + for r in ret: feature_name = r["feature"] if "image" in feature_name and "depth" not in feature_name: + image = np.load(io.BytesIO(r["value"])) if feature_name not in video_writers: output_filename = f"{self.name}_{counter}_{feature_name}" output_path = f"{export_path}/{output_filename}" - image = np.load(io.BytesIO(r["value"])) - logger.info(f"feature: {feature_name}, image: {image}") - # save the initial image cv2.imwrite(f"{output_path}.jpg", image) # save the video video_writers[feature_name] = cv2.VideoWriter( f"{output_path}.mp4", cv2.VideoWriter_fourcc(*"mp4v"), - 30, + 10, (640, 480), ) additional_metadata[f"video_path_{feature_name}"] = output_filename + video_writers[r["feature"]].write(image) if "instruction" in r["feature"]: From a29712b5762f7bb07517787eb23df982672cff3b Mon Sep 17 00:00:00 2001 From: Eric Chen Date: Tue, 16 Apr 2024 20:19:57 +0000 Subject: [PATCH 09/21] Refactor dataset loading method and update dataset list in dataset_organizer.py --- examples/analytics/dataset_organizer.py | 78 ++++++++++++++++++++++--- fog_x/dataset.py | 15 ++++- 2 files changed, 82 insertions(+), 11 deletions(-) diff --git a/examples/analytics/dataset_organizer.py b/examples/analytics/dataset_organizer.py index 003cf2c..f99197b 100644 --- a/examples/analytics/dataset_organizer.py +++ b/examples/analytics/dataset_organizer.py @@ -1,12 +1,74 @@ import fog_x -dataset = fog_x.dataset.Dataset( - name="demo_ds", - path="~/test_dataset", -) - -dataset._prepare_rtx_metadata( - name="berkeley_autolab_ur5", -) + + + +DATASETS = [ + "fractal20220817_data", + "kuka", + "bridge", + "taco_play", + "jaco_play", + "berkeley_cable_routing", + "roboturk", + "nyu_door_opening_surprising_effectiveness", + "viola", + "berkeley_autolab_ur5", + "toto", + "language_table", + "columbia_cairlab_pusht_real", + "stanford_kuka_multimodal_dataset_converted_externally_to_rlds", + "nyu_rot_dataset_converted_externally_to_rlds", + "stanford_hydra_dataset_converted_externally_to_rlds", + 
"austin_buds_dataset_converted_externally_to_rlds", + "nyu_franka_play_dataset_converted_externally_to_rlds", + "maniskill_dataset_converted_externally_to_rlds", + "cmu_franka_exploration_dataset_converted_externally_to_rlds", + "ucsd_kitchen_dataset_converted_externally_to_rlds", + "ucsd_pick_and_place_dataset_converted_externally_to_rlds", + "austin_sailor_dataset_converted_externally_to_rlds", + "austin_sirius_dataset_converted_externally_to_rlds", + "bc_z", + "usc_cloth_sim_converted_externally_to_rlds", + "utokyo_pr2_opening_fridge_converted_externally_to_rlds", + "utokyo_pr2_tabletop_manipulation_converted_externally_to_rlds", + "utokyo_saytap_converted_externally_to_rlds", + "utokyo_xarm_pick_and_place_converted_externally_to_rlds", + "utokyo_xarm_bimanual_converted_externally_to_rlds", + "robo_net", + "berkeley_mvp_converted_externally_to_rlds", + "berkeley_rpt_converted_externally_to_rlds", + "kaist_nonprehensile_converted_externally_to_rlds", + "stanford_mask_vit_converted_externally_to_rlds", + "tokyo_u_lsmo_converted_externally_to_rlds", + "dlr_sara_pour_converted_externally_to_rlds", + "dlr_sara_grid_clamp_converted_externally_to_rlds", + "dlr_edan_shared_control_converted_externally_to_rlds", + "asu_table_top_converted_externally_to_rlds", + "stanford_robocook_converted_externally_to_rlds", + "eth_agent_affordances", + "imperialcollege_sawyer_wrist_cam", + "iamlab_cmu_pickup_insert_converted_externally_to_rlds", + "uiuc_d3field", + "utaustin_mutex", + "berkeley_fanuc_manipulation", + "cmu_play_fusion", + "cmu_stretch", + "berkeley_gnm_recon", + "berkeley_gnm_cory_hall", + "berkeley_gnm_sac_son", +] + +for dataset_name in DATASETS: + dataset = fog_x.dataset.Dataset( + name=dataset_name, + path="~/test_dataset", + ) + + dataset._prepare_rtx_metadata( + name=dataset_name, + sample_size = 10, + shuffle=True, + ) diff --git a/fog_x/dataset.py b/fog_x/dataset.py index ab3cec3..7cd5d5f 100644 --- a/fog_x/dataset.py +++ b/fog_x/dataset.py @@ -341,6 +341,9 @@ def _prepare_rtx_metadata( self, name: str, export_path: Optional[str] = None, + sample_size = 20, + shuffle = False, + seed = 42, ): # this is only required if rtx format is used @@ -350,11 +353,13 @@ def _prepare_rtx_metadata( b = tfds.builder_from_directory(builder_dir=dataset2path(name)) ds = b.as_dataset(split="all") + if shuffle: + ds = ds.shuffle(sample_size, seed=seed) data_type = b.info.features["steps"] counter = 0 if export_path == None: - export_path = self.path + "/viz" + export_path = self.path + "/" + self.name + "_viz" if not os.path.exists(export_path): os.makedirs(export_path) @@ -364,7 +369,7 @@ def _prepare_rtx_metadata( additional_metadata = { "load_from": name, - "load_index": f"all, {counter}", + "load_index": f"all, {shuffle}, {seed}, {counter}", } logger.info(tf_episode) @@ -384,6 +389,8 @@ def _prepare_rtx_metadata( output_filename = f"{self.name}_{counter}_{feature_name}" output_path = f"{export_path}/{output_filename}" + frame_size = (image.shape[1], image.shape[0]) + # save the initial image cv2.imwrite(f"{output_path}.jpg", image) # save the video @@ -391,7 +398,7 @@ def _prepare_rtx_metadata( f"{output_path}.mp4", cv2.VideoWriter_fourcc(*"mp4v"), 10, - (640, 480), + frame_size ) additional_metadata[f"video_path_{feature_name}"] = output_filename @@ -409,6 +416,8 @@ def _prepare_rtx_metadata( video_writers = {} fog_episode.close(save_data = False, additional_metadata = additional_metadata) counter += 1 + if counter > sample_size: + break def _load_rtx_step_data_from_tf_step( self, From 
9e2b41c7d39a73027296dd145f75c47a85abed32 Mon Sep 17 00:00:00 2001 From: Eric Chen Date: Tue, 16 Apr 2024 20:40:03 +0000 Subject: [PATCH 10/21] Update video frame rate in fog_x/dataset.py --- fog_x/dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fog_x/dataset.py b/fog_x/dataset.py index 7cd5d5f..15e686c 100644 --- a/fog_x/dataset.py +++ b/fog_x/dataset.py @@ -397,7 +397,7 @@ def _prepare_rtx_metadata( video_writers[feature_name] = cv2.VideoWriter( f"{output_path}.mp4", cv2.VideoWriter_fourcc(*"mp4v"), - 10, + 15, frame_size ) additional_metadata[f"video_path_{feature_name}"] = output_filename From 457a75909929090641825ef46f0c9e2b86c8b1b9 Mon Sep 17 00:00:00 2001 From: Eric Chen Date: Wed, 17 Apr 2024 01:14:31 +0000 Subject: [PATCH 11/21] support all datasets --- examples/analytics/dataset_organizer.py | 7 ++----- fog_x/dataset.py | 3 ++- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/examples/analytics/dataset_organizer.py b/examples/analytics/dataset_organizer.py index f99197b..bc417ef 100644 --- a/examples/analytics/dataset_organizer.py +++ b/examples/analytics/dataset_organizer.py @@ -15,7 +15,6 @@ "viola", "berkeley_autolab_ur5", "toto", - "language_table", "columbia_cairlab_pusht_real", "stanford_kuka_multimodal_dataset_converted_externally_to_rlds", "nyu_rot_dataset_converted_externally_to_rlds", @@ -56,7 +55,7 @@ "cmu_stretch", "berkeley_gnm_recon", "berkeley_gnm_cory_hall", - "berkeley_gnm_sac_son", + # "berkeley_gnm_sac_son", ] for dataset_name in DATASETS: @@ -69,6 +68,4 @@ name=dataset_name, sample_size = 10, shuffle=True, - ) - - + ) \ No newline at end of file diff --git a/fog_x/dataset.py b/fog_x/dataset.py index 15e686c..0eb7943 100644 --- a/fog_x/dataset.py +++ b/fog_x/dataset.py @@ -434,7 +434,8 @@ def _load_rtx_step_data_from_tf_step( ret = [] for k, v in step.items(): - if k == "observation" or k == "action": + # logger.info(f"k {k} , v {v}") + if isinstance(v, dict): #and (k == "observation" or k == "action"): for k2, v2 in v.items(): # TODO: abstract this to feature.py From 2e8046cb1a85fb4be533aa9a6427020917712ba8 Mon Sep 17 00:00:00 2001 From: Eric Chen Date: Wed, 17 Apr 2024 02:37:26 +0000 Subject: [PATCH 12/21] able to print as website --- examples/analytics/dataset_organizer.py | 68 +++++++++++++++++++++++-- fog_x/database/polars_connector.py | 2 +- 2 files changed, 64 insertions(+), 6 deletions(-) diff --git a/examples/analytics/dataset_organizer.py b/examples/analytics/dataset_organizer.py index bc417ef..95ec058 100644 --- a/examples/analytics/dataset_organizer.py +++ b/examples/analytics/dataset_organizer.py @@ -58,14 +58,72 @@ # "berkeley_gnm_sac_son", ] + +objects = ["marker", "cloth", "cup", "object", "bottle", "block", "drawer", "lid", "mug"] +tasks = ["put", "move", "pick", "remove", "take", "open", "close", "place", "turn", "push", + "insert", "stack", "lift", "pour"] # things not in DROID +views = ["wrist", "top", "other"] + +dataset_id = 0 for dataset_name in DATASETS: dataset = fog_x.dataset.Dataset( name=dataset_name, path="~/test_dataset", ) - dataset._prepare_rtx_metadata( - name=dataset_name, - sample_size = 10, - shuffle=True, - ) \ No newline at end of file + # dataset._prepare_rtx_metadata( + # name=dataset_name, + # sample_size = 10, + # shuffle=True, + # ) + + info = dataset.get_episode_info() + + for episode_metadata in info.iter_rows(named = True): + instruction = episode_metadata["natural_language_instruction"] + + d = dict() + instruction = instruction.lower().replace(",", "").replace("\n", "") + 
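        # Bucket each instruction by naive substring matching against the
        # keyword lists above; if several keywords match, the last match wins,
        # and an instruction that matches nothing is assigned the last index.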
d["dataset_id"] = f"dataset-{dataset_id}" + d["info"] = instruction + task_id = -1 + for task in tasks: + if task in instruction: + task_id = tasks.index(task) + if task_id == -1: + task_id = len(tasks) - 1 + + obj_id = -1 + for obj in objects: + if obj in instruction: + obj_id = objects.index(obj) + if obj_id == -1: + obj_id = len(objects) - 1 + + d["task_id"] = f"task-{task_id}" + d["object_id"] = f"object-{obj_id}" + + images_features = [col for col in info.columns if col.startswith("video_path_")] + for i, image_feature in enumerate(images_features): + path = episode_metadata[image_feature] + d["poster"] = f"videos/{dataset_name}_viz/{path}.jpg" + d["src"] = f"videos/{dataset_name}_viz/{path}.mp4" + view_id = -1 + for view in views: + if view in path: + view_id = views.index(view) + break + if view_id == -1: + view_id = len(views) - 1 + + d["view_id"] = f"view-{view_id}" + + # print d in JSON format + with open("/tmp/dataset_info.txt", "a") as file: + printable = str(d).replace("\'", "\"") + file.write(f'JSON.parse(\'{printable}\'),\n') + + + # write as a line of JSON.parse('{"info": "Unfold the tea towel", "poster": "videos/bridge_viz/bridge_0_image.jpg", "src": "videos/bridge_viz/bridge_0_image.mp4"}'), + # print (f'JSON.parse(\'{{"info": "{instruction}", "poster": "videos/{dataset_name}_viz/{dataset_name}_{episode_id}_image.jpg", "src": "videos/{dataset_name}_viz/{dataset_name}_{dataset_id}_image.mp4"}}\'),') + dataset_id += 1 \ No newline at end of file diff --git a/fog_x/database/polars_connector.py b/fog_x/database/polars_connector.py index c8c63d2..e53cd45 100644 --- a/fog_x/database/polars_connector.py +++ b/fog_x/database/polars_connector.py @@ -170,7 +170,7 @@ def load_tables(self, episode_ids: List[str], table_names: List[str]): self.tables[table_name] = self.dataset.filter( pl.col("episode_id") == episode_id ) - logger.debug(f"Tables loaded from {self.tables}") + # logger.debug(f"Tables loaded from {self.tables}") def create_table(self, table_name: str): # Create a new DataFrame with specified columns From 5df308b36b274e5c454d54da52b2b80a5f21196c Mon Sep 17 00:00:00 2001 From: Eric Chen Date: Wed, 17 Apr 2024 03:56:32 +0000 Subject: [PATCH 13/21] encode vic with h264 --- examples/analytics/dataset_organizer.py | 12 ++++----- fog_x/dataset.py | 33 +++++++++++++++++++++---- 2 files changed, 34 insertions(+), 11 deletions(-) diff --git a/examples/analytics/dataset_organizer.py b/examples/analytics/dataset_organizer.py index 95ec058..08742f3 100644 --- a/examples/analytics/dataset_organizer.py +++ b/examples/analytics/dataset_organizer.py @@ -68,14 +68,14 @@ for dataset_name in DATASETS: dataset = fog_x.dataset.Dataset( name=dataset_name, - path="~/test_dataset", + path="~/rtx_datasets", ) - # dataset._prepare_rtx_metadata( - # name=dataset_name, - # sample_size = 10, - # shuffle=True, - # ) + dataset._prepare_rtx_metadata( + name=dataset_name, + sample_size = 20, + shuffle=True, + ) info = dataset.get_episode_info() diff --git a/fog_x/dataset.py b/fog_x/dataset.py index 0eb7943..387ac49 100644 --- a/fog_x/dataset.py +++ b/fog_x/dataset.py @@ -2,7 +2,7 @@ import logging import os from typing import Any, Dict, List, Optional, Tuple - +import subprocess import numpy as np import polars import pandas @@ -20,6 +20,19 @@ logger = logging.getLogger(__name__) + +def convert_to_h264(input_file, output_file): + + # FFmpeg command to convert video to H.264 + command = [ + 'ffmpeg', + '-i', input_file, # Input file + '-loglevel', 'error', # Suppress the logs + '-vcodec', 'h264', # Specify 
the codec + output_file # Output file + ] + subprocess.run(command) + class Dataset: """ Create or load from a new dataset. @@ -337,6 +350,7 @@ def load_rtx_episodes( fog_episode.close() + def _prepare_rtx_metadata( self, name: str, @@ -387,6 +401,7 @@ def _prepare_rtx_metadata( if feature_name not in video_writers: output_filename = f"{self.name}_{counter}_{feature_name}" + tmp_vid_output_path = f"/tmp/{output_filename}.mp4" output_path = f"{export_path}/{output_filename}" frame_size = (image.shape[1], image.shape[0]) @@ -395,12 +410,12 @@ def _prepare_rtx_metadata( cv2.imwrite(f"{output_path}.jpg", image) # save the video video_writers[feature_name] = cv2.VideoWriter( - f"{output_path}.mp4", + tmp_vid_output_path, cv2.VideoWriter_fourcc(*"mp4v"), - 15, + 10, frame_size ) - additional_metadata[f"video_path_{feature_name}"] = output_filename + video_writers[r["feature"]].write(image) @@ -411,8 +426,16 @@ def _prepare_rtx_metadata( r["metadata_only"] = True fog_episode.add(**r) - for _, video_writer in video_writers.items(): + for feature_name, video_writer in video_writers.items(): video_writer.release() + # need to convert to h264 to properly display over chrome / vscode + output_filename = f"{self.name}_{counter}_{feature_name}" + tmp_vid_output_path = f"/tmp/{output_filename}.mp4" + vid_output_path = f"{export_path}/{output_filename}.mp4" + convert_to_h264(tmp_vid_output_path, vid_output_path) + if os.path.isfile(tmp_vid_output_path): + os.remove(tmp_vid_output_path) + video_writers = {} fog_episode.close(save_data = False, additional_metadata = additional_metadata) counter += 1 From 8401ef6cdb02d9dc6f7f5152e3a565a22bc0ba03 Mon Sep 17 00:00:00 2001 From: Eric Chen Date: Wed, 17 Apr 2024 06:24:48 +0000 Subject: [PATCH 14/21] Refactor dataset loading method and update dataset list in dataset_organizer.py, and fix video frame rate in fog_x/dataset.py --- examples/analytics/dataset_organizer.py | 14 +++++--------- fog_x/dataset.py | 1 + 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/examples/analytics/dataset_organizer.py b/examples/analytics/dataset_organizer.py index 08742f3..f461f1a 100644 --- a/examples/analytics/dataset_organizer.py +++ b/examples/analytics/dataset_organizer.py @@ -1,8 +1,5 @@ import fog_x - - - DATASETS = [ "fractal20220817_data", "kuka", @@ -59,10 +56,10 @@ ] -objects = ["marker", "cloth", "cup", "object", "bottle", "block", "drawer", "lid", "mug"] -tasks = ["put", "move", "pick", "remove", "take", "open", "close", "place", "turn", "push", +objects = ["NOTEXIST", "marker", "cloth", "cup", "object", "bottle", "block", "drawer", "lid", "mug"] +tasks = ["NOTEXIST", "put", "move", "pick", "remove", "take", "open", "close", "place", "turn", "push", "insert", "stack", "lift", "pour"] # things not in DROID -views = ["wrist", "top", "other"] +views = ["NOTEXIST", "wrist", "top", "other"] dataset_id = 0 for dataset_name in DATASETS: @@ -73,7 +70,7 @@ dataset._prepare_rtx_metadata( name=dataset_name, - sample_size = 20, + sample_size = 10, shuffle=True, ) @@ -83,7 +80,7 @@ instruction = episode_metadata["natural_language_instruction"] d = dict() - instruction = instruction.lower().replace(",", "").replace("\n", "") + instruction = instruction.lower().replace(",", "").replace("\n", "").replace("\"", "").replace("\'", "") d["dataset_id"] = f"dataset-{dataset_id}" d["info"] = instruction task_id = -1 @@ -112,7 +109,6 @@ for view in views: if view in path: view_id = views.index(view) - break if view_id == -1: view_id = len(views) - 1 diff --git a/fog_x/dataset.py 
b/fog_x/dataset.py index 387ac49..3783f54 100644 --- a/fog_x/dataset.py +++ b/fog_x/dataset.py @@ -433,6 +433,7 @@ def _prepare_rtx_metadata( tmp_vid_output_path = f"/tmp/{output_filename}.mp4" vid_output_path = f"{export_path}/{output_filename}.mp4" convert_to_h264(tmp_vid_output_path, vid_output_path) + additional_metadata[f"video_path_{feature_name}"] = output_filename if os.path.isfile(tmp_vid_output_path): os.remove(tmp_vid_output_path) From c6a4768099f744963d5be46074878a794e577ad7 Mon Sep 17 00:00:00 2001 From: Eric Chen Date: Wed, 17 Apr 2024 07:43:01 +0000 Subject: [PATCH 15/21] Update dependencies and fix dataset loading in dataset_organizer.py --- Containerfile | 2 +- examples/analytics/dataset_organizer.py | 13 +++++++++---- requirements-test.txt => requirements-full.txt | 0 setup.py | 2 +- 4 files changed, 11 insertions(+), 6 deletions(-) rename requirements-test.txt => requirements-full.txt (100%) diff --git a/Containerfile b/Containerfile index 90aaf34..571449c 100644 --- a/Containerfile +++ b/Containerfile @@ -7,7 +7,7 @@ RUN apt-get update && \ COPY . /app WORKDIR /app -RUN pip install . +RUN pip install .[full] RUN pip3 install jupyter COPY . / diff --git a/examples/analytics/dataset_organizer.py b/examples/analytics/dataset_organizer.py index f461f1a..9ebd454 100644 --- a/examples/analytics/dataset_organizer.py +++ b/examples/analytics/dataset_organizer.py @@ -68,12 +68,17 @@ path="~/rtx_datasets", ) - dataset._prepare_rtx_metadata( + # dataset._prepare_rtx_metadata( + # name=dataset_name, + # sample_size = 10, + # shuffle=True, + # ) + +for dataset_name in DATASETS: + dataset = fog_x.dataset.Dataset( name=dataset_name, - sample_size = 10, - shuffle=True, + path="~/rtx_datasets", ) - info = dataset.get_episode_info() for episode_metadata in info.iter_rows(named = True): diff --git a/requirements-test.txt b/requirements-full.txt similarity index 100% rename from requirements-test.txt rename to requirements-full.txt diff --git a/setup.py b/setup.py index 722ddc9..307521b 100644 --- a/setup.py +++ b/setup.py @@ -42,5 +42,5 @@ def read_requirements(path): entry_points={ "console_scripts": ["fog_x = fog_x.__main__:main"] }, - extras_require={"test": read_requirements("requirements-test.txt")}, + extras_require={"full": read_requirements("requirements-full.txt")}, ) From 1abe058d35b4d586fc520752bc538095f03d995e Mon Sep 17 00:00:00 2001 From: Eric Chen Date: Wed, 17 Apr 2024 08:05:47 +0000 Subject: [PATCH 16/21] Update dependencies, add new libraries, and fix dataset loading and metadata preparation --- Containerfile | 13 ++++++++++++- examples/analytics/dataset_organizer.py | 10 +++++----- requirements.txt | 1 + 3 files changed, 18 insertions(+), 6 deletions(-) diff --git a/Containerfile b/Containerfile index 571449c..83c8e89 100644 --- a/Containerfile +++ b/Containerfile @@ -3,8 +3,19 @@ FROM ubuntu:latest RUN apt-get update && \ apt-get install -y python3.9 \ python3-pip \ - libgmp-dev + libgmp-dev \ + ffmpeg +RUN pip3 install pandas \ + polars \ + numpy \ + tensorflow \ + torch \ + tensorflow_datasets \ + envlogger \ + datasets \ + pyarrow + COPY . 
/app WORKDIR /app RUN pip install .[full] diff --git a/examples/analytics/dataset_organizer.py b/examples/analytics/dataset_organizer.py index 9ebd454..a36bf8f 100644 --- a/examples/analytics/dataset_organizer.py +++ b/examples/analytics/dataset_organizer.py @@ -68,11 +68,11 @@ path="~/rtx_datasets", ) - # dataset._prepare_rtx_metadata( - # name=dataset_name, - # sample_size = 10, - # shuffle=True, - # ) + dataset._prepare_rtx_metadata( + name=dataset_name, + sample_size = 10, + shuffle=True, + ) for dataset_name in DATASETS: dataset = fog_x.dataset.Dataset( diff --git a/requirements.txt b/requirements.txt index 5d00c94..c65802b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,3 +11,4 @@ torch tensorflow tensorflow_datasets envlogger +opencv-python \ No newline at end of file From 44cef09f3f0393ce7021939ff518774accab93b4 Mon Sep 17 00:00:00 2001 From: Eric Chen Date: Thu, 18 Apr 2024 22:15:05 +0000 Subject: [PATCH 17/21] Refactor save_table method in polars_connector.py to use Arrow format for writing tables --- fog_x/database/polars_connector.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/fog_x/database/polars_connector.py b/fog_x/database/polars_connector.py index e53cd45..4bece21 100644 --- a/fog_x/database/polars_connector.py +++ b/fog_x/database/polars_connector.py @@ -146,9 +146,7 @@ def load_tables(self, table_names: List[str]): logger.debug(f"Table {table_name} does not exist in {path}.") def save_table(self, table_name: str): - self.tables[table_name].write_parquet( - f"{self.path}/{table_name}.parquet" - ) + pq.write_table(self.tables[table_name].to_arrow(), f"{self.path}/{table_name}.parquet") class LazyFrameConnector(PolarsConnector): From 512218bc098d2fa7d5887b450da796d40f6db333 Mon Sep 17 00:00:00 2001 From: Eric Chen Date: Thu, 18 Apr 2024 22:32:31 +0000 Subject: [PATCH 18/21] use pyarrow to read --- fog_x/database/polars_connector.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/fog_x/database/polars_connector.py b/fog_x/database/polars_connector.py index 4bece21..8c8e79b 100644 --- a/fog_x/database/polars_connector.py +++ b/fog_x/database/polars_connector.py @@ -139,11 +139,14 @@ def load_tables(self, table_names: List[str]): # load tables from the path for table_name in table_names: path = f"{self.path}/{table_name}.parquet" - if os.path.exists(os.path.expanduser(path)): - self.tables[table_name] = pl.read_parquet(path) - self.table_len[table_name] = len(self.tables[table_name]) - else: - logger.debug(f"Table {table_name} does not exist in {path}.") + # if os.path.exists(os.path.expanduser(path)): + # self.tables[table_name] = pl.read_parquet(path) + # self.table_len[table_name] = len(self.tables[table_name]) + # else: + # logger.debug(f"Table {table_name} does not exist in {path}.") + path = os.path.expanduser(path) + self.tables[table_name] = pl.from_arrow(pq.read_table(path)) + self.table_len[table_name] = len(self.tables[table_name]) def save_table(self, table_name: str): pq.write_table(self.tables[table_name].to_arrow(), f"{self.path}/{table_name}.parquet") From 4647695a903ae9da9e8ad69fe62e8aa18b4e348d Mon Sep 17 00:00:00 2001 From: Eric Chen Date: Thu, 18 Apr 2024 22:36:14 +0000 Subject: [PATCH 19/21] Add logging statement to DataFrameConnector's load_table method --- fog_x/database/polars_connector.py | 1 + 1 file changed, 1 insertion(+) diff --git a/fog_x/database/polars_connector.py b/fog_x/database/polars_connector.py index 8c8e79b..8dbbccf 100644 --- a/fog_x/database/polars_connector.py +++ 
b/fog_x/database/polars_connector.py @@ -147,6 +147,7 @@ def load_tables(self, table_names: List[str]): path = os.path.expanduser(path) self.tables[table_name] = pl.from_arrow(pq.read_table(path)) self.table_len[table_name] = len(self.tables[table_name]) + logger.info(f"Table {table_name} loaded from {path}.") def save_table(self, table_name: str): pq.write_table(self.tables[table_name].to_arrow(), f"{self.path}/{table_name}.parquet") From 12803d50094d37b79d883cb1f69d4f785f30eb86 Mon Sep 17 00:00:00 2001 From: Eric Chen Date: Thu, 18 Apr 2024 22:38:55 +0000 Subject: [PATCH 20/21] Add logging statement to DataFrameConnector's load_table method and update dependencies --- fog_x/database/polars_connector.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/fog_x/database/polars_connector.py b/fog_x/database/polars_connector.py index 8dbbccf..42aac30 100644 --- a/fog_x/database/polars_connector.py +++ b/fog_x/database/polars_connector.py @@ -136,9 +136,11 @@ def create_table(self, table_name: str): def load_tables(self, table_names: List[str]): + # load tables from the path for table_name in table_names: path = f"{self.path}/{table_name}.parquet" + logger.info(f"Prepare to load table {table_name} loaded from {path}.") # if os.path.exists(os.path.expanduser(path)): # self.tables[table_name] = pl.read_parquet(path) # self.table_len[table_name] = len(self.tables[table_name]) From e27cd54941b414306f9fcb6a7fbddab55144eed2 Mon Sep 17 00:00:00 2001 From: Eric Chen Date: Thu, 18 Apr 2024 23:40:30 +0000 Subject: [PATCH 21/21] add cloud demo --- examples/Fog_RTX_Cloud_Demo.ipynb | 425 ++++++++++++++++++++++++++++++ 1 file changed, 425 insertions(+) create mode 100644 examples/Fog_RTX_Cloud_Demo.ipynb diff --git a/examples/Fog_RTX_Cloud_Demo.ipynb b/examples/Fog_RTX_Cloud_Demo.ipynb new file mode 100644 index 0000000..9469984 --- /dev/null +++ b/examples/Fog_RTX_Cloud_Demo.ipynb @@ -0,0 +1,425 @@ +{ + "nbformat": 4, + "nbformat_minor": 0, + "metadata": { + "colab": { + "provenance": [] + }, + "kernelspec": { + "name": "python3", + "display_name": "Python 3" + }, + "language_info": { + "name": "python" + } + }, + "cells": [ + { + "cell_type": "markdown", + "source": [ + "# Fog-RTX Work With Cloud\n", + "FogRTX supports a wide range of cloud service providers.\n", + "\n", + "### AWS" + ], + "metadata": { + "id": "upgVMgpfdSCk" + } + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "ex7qQ3klf2_y" + }, + "outputs": [], + "source": [ + "! git clone https://github.com/KeplerC/fog_x.git\n", + "! cd fog_x && git checkout cloud && pip install ." + ] + }, + { + "cell_type": "markdown", + "source": [ + "Install required AWS dependency and configure with your aws credential" + ], + "metadata": { + "id": "TbCXBT0rdgR7" + } + }, + { + "cell_type": "code", + "source": [ + "! curl \"https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip\" -o \"awscliv2.zip\"\n", + "! unzip awscliv2.zip\n", + "! sudo ./aws/install\n", + "! aws configure" + ], + "metadata": { + "id": "k4Q0PXALS0Ol" + }, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "code", + "source": [ + "# create an AWS bucket named fog-rtx-test-east-1 (anything you want)\n", + "!aws s3api create-bucket --bucket fog-rtx-test-east-1 --region us-east-1" + ], + "metadata": { + "id": "r-VhhYFBdqit" + }, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "Fog-RTX can load from the existing bucket, and add more to it!" 
+ ], + "metadata": { + "id": "ZesCYo2udsei" + } + }, + { + "cell_type": "code", + "source": [ + "import fog_x\n", + "\n", + "dataset = fog_x.dataset.Dataset(\n", + " name=\"demo_ds\",\n", + " path='s3://fog-rtx-test-east-1',\n", + ")" + ], + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "Il5tj-i1f8bG", + "outputId": "401bee3b-0807-4a01-e6e8-30a8f9756b48" + }, + "execution_count": 8, + "outputs": [ + { + "output_type": "stream", + "name": "stderr", + "text": [ + "INFO:fog_x.database.polars_connector:Prepare to load table demo_ds loaded from s3://fog-rtx-test-east-1//demo_ds.parquet.\n", + "INFO:fog_x.database.polars_connector:Table demo_ds loaded from s3://fog-rtx-test-east-1//demo_ds.parquet.\n" + ] + } + ] + }, + { + "cell_type": "markdown", + "source": [ + "read from exisitng AWS stored dataset" + ], + "metadata": { + "id": "TArD9Frhd_U8" + } + }, + { + "cell_type": "code", + "source": [ + "dataset.get_episode_info()" + ], + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/", + "height": 244 + }, + "id": "oLMT8ioUd6Km", + "outputId": "c821cf86-2b3e-4d6a-9b3b-71887e9a8b6f" + }, + "execution_count": 9, + "outputs": [ + { + "output_type": "execute_result", + "data": { + "text/plain": [ + "shape: (4, 44)\n", + "┌───────────┬──────────┬───────────┬───────────┬───┬───────────┬───────────┬───────────┬───────────┐\n", + "│ episode_i ┆ Finished ┆ feature_g ┆ feature_g ┆ … ┆ robot_sta ┆ feature_r ┆ feature_r ┆ reward_co │\n", + "│ d ┆ --- ┆ ripper_cl ┆ ripper_cl ┆ ┆ te_count ┆ eward_typ ┆ eward_sha ┆ unt │\n", + "│ --- ┆ bool ┆ osedness_ ┆ osedness_ ┆ ┆ --- ┆ e ┆ pe ┆ --- │\n", + "│ i64 ┆ ┆ actio… ┆ actio… ┆ ┆ f64 ┆ --- ┆ --- ┆ f64 │\n", + "│ ┆ ┆ --- ┆ --- ┆ ┆ ┆ str ┆ str ┆ │\n", + "│ ┆ ┆ str ┆ str ┆ ┆ ┆ ┆ ┆ │\n", + "╞═══════════╪══════════╪═══════════╪═══════════╪═══╪═══════════╪═══════════╪═══════════╪═══════════╡\n", + "│ 0 ┆ true ┆ float32 ┆ () ┆ … ┆ 71.0 ┆ float32 ┆ () ┆ 71.0 │\n", + "│ 1 ┆ true ┆ float32 ┆ () ┆ … ┆ 76.0 ┆ float32 ┆ () ┆ 76.0 │\n", + "│ 2 ┆ true ┆ float32 ┆ () ┆ … ┆ 71.0 ┆ float32 ┆ () ┆ 71.0 │\n", + "│ 3 ┆ true ┆ float32 ┆ () ┆ … ┆ 76.0 ┆ float32 ┆ () ┆ 76.0 │\n", + "└───────────┴──────────┴───────────┴───────────┴───┴───────────┴───────────┴───────────┴───────────┘" + ], + "text/html": [ + "
\n", + "shape: (4, 44)
[HTML table render collapsed: the same 4 x 44 episode-info DataFrame as the text/plain output above, with episode_id 0-3, Finished=true, and *_type / *_shape / *_count columns for gripper_closedness_action, rotation_delta, terminate_episode, world_vector, is_first, is_last, is_terminal, hand_image, image, image_with_depth, natural_language_embedding, natural_language_instruction, robot_state, and reward; per-episode step counts are 71, 76, 71, and 76.]
" + ] + }, + "metadata": {}, + "execution_count": 9 + } + ] + }, + { + "cell_type": "markdown", + "source": [ + "add more to the stored dataset" + ], + "metadata": { + "id": "mul7xRzFeDRt" + } + }, + { + "cell_type": "code", + "source": [ + "dataset.load_rtx_episodes(\n", + " name=\"berkeley_autolab_ur5\",\n", + " split=\"train[4:10]\",\n", + ")" + ], + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "ivIyifc4f9ih", + "outputId": "daef546e-c378-45a1-87eb-562d0f92c8f0" + }, + "execution_count": 10, + "outputs": [ + { + "output_type": "stream", + "name": "stderr", + "text": [ + "INFO:absl:Load dataset info from gs://gresearch/robotics/berkeley_autolab_ur5/0.1.0\n", + "INFO:absl:Creating a tf.data.Dataset reading 4 files located in folders: gs://gresearch/robotics/berkeley_autolab_ur5/0.1.0.\n", + "INFO:absl:Constructing tf.data.Dataset berkeley_autolab_ur5 for split train[4:10], from gs://gresearch/robotics/berkeley_autolab_ur5/0.1.0\n", + "INFO:fog_x.dataset:{'steps': <_VariantDataset element_spec={'action': {'gripper_closedness_action': TensorSpec(shape=(), dtype=tf.float32, name=None), 'rotation_delta': TensorSpec(shape=(3,), dtype=tf.float32, name=None), 'terminate_episode': TensorSpec(shape=(), dtype=tf.float32, name=None), 'world_vector': TensorSpec(shape=(3,), dtype=tf.float32, name=None)}, 'is_first': TensorSpec(shape=(), dtype=tf.bool, name=None), 'is_last': TensorSpec(shape=(), dtype=tf.bool, name=None), 'is_terminal': TensorSpec(shape=(), dtype=tf.bool, name=None), 'observation': {'hand_image': TensorSpec(shape=(480, 640, 3), dtype=tf.uint8, name=None), 'image': TensorSpec(shape=(480, 640, 3), dtype=tf.uint8, name=None), 'image_with_depth': TensorSpec(shape=(480, 640, 1), dtype=tf.float32, name=None), 'natural_language_embedding': TensorSpec(shape=(512,), dtype=tf.float32, name=None), 'natural_language_instruction': TensorSpec(shape=(), dtype=tf.string, name=None), 'robot_state': TensorSpec(shape=(15,), dtype=tf.float32, name=None)}, 'reward': TensorSpec(shape=(), dtype=tf.float32, name=None)}>}\n", + "INFO:fog_x.database.db_manager:Closing the episode with metadata {'Finished': True, 'gripper_closedness_action_count': 76, 'rotation_delta_count': 76, 'terminate_episode_count': 76, 'world_vector_count': 76, 'is_first_count': 76, 'is_last_count': 76, 'is_terminal_count': 76, 'hand_image_count': 76, 'image_count': 76, 'image_with_depth_count': 76, 'natural_language_embedding_count': 76, 'natural_language_instruction_count': 76, 'robot_state_count': 76, 'reward_count': 76}\n", + "INFO:fog_x.dataset:{'steps': <_VariantDataset element_spec={'action': {'gripper_closedness_action': TensorSpec(shape=(), dtype=tf.float32, name=None), 'rotation_delta': TensorSpec(shape=(3,), dtype=tf.float32, name=None), 'terminate_episode': TensorSpec(shape=(), dtype=tf.float32, name=None), 'world_vector': TensorSpec(shape=(3,), dtype=tf.float32, name=None)}, 'is_first': TensorSpec(shape=(), dtype=tf.bool, name=None), 'is_last': TensorSpec(shape=(), dtype=tf.bool, name=None), 'is_terminal': TensorSpec(shape=(), dtype=tf.bool, name=None), 'observation': {'hand_image': TensorSpec(shape=(480, 640, 3), dtype=tf.uint8, name=None), 'image': TensorSpec(shape=(480, 640, 3), dtype=tf.uint8, name=None), 'image_with_depth': TensorSpec(shape=(480, 640, 1), dtype=tf.float32, name=None), 'natural_language_embedding': TensorSpec(shape=(512,), dtype=tf.float32, name=None), 'natural_language_instruction': TensorSpec(shape=(), dtype=tf.string, name=None), 'robot_state': TensorSpec(shape=(15,), 
dtype=tf.float32, name=None)}, 'reward': TensorSpec(shape=(), dtype=tf.float32, name=None)}>}\n", + "INFO:fog_x.database.db_manager:Closing the episode with metadata {'Finished': True, 'gripper_closedness_action_count': 123, 'rotation_delta_count': 123, 'terminate_episode_count': 123, 'world_vector_count': 123, 'is_first_count': 123, 'is_last_count': 123, 'is_terminal_count': 123, 'hand_image_count': 123, 'image_count': 123, 'image_with_depth_count': 123, 'natural_language_embedding_count': 123, 'natural_language_instruction_count': 123, 'robot_state_count': 123, 'reward_count': 123}\n", + "INFO:fog_x.dataset:{'steps': <_VariantDataset element_spec={'action': {'gripper_closedness_action': TensorSpec(shape=(), dtype=tf.float32, name=None), 'rotation_delta': TensorSpec(shape=(3,), dtype=tf.float32, name=None), 'terminate_episode': TensorSpec(shape=(), dtype=tf.float32, name=None), 'world_vector': TensorSpec(shape=(3,), dtype=tf.float32, name=None)}, 'is_first': TensorSpec(shape=(), dtype=tf.bool, name=None), 'is_last': TensorSpec(shape=(), dtype=tf.bool, name=None), 'is_terminal': TensorSpec(shape=(), dtype=tf.bool, name=None), 'observation': {'hand_image': TensorSpec(shape=(480, 640, 3), dtype=tf.uint8, name=None), 'image': TensorSpec(shape=(480, 640, 3), dtype=tf.uint8, name=None), 'image_with_depth': TensorSpec(shape=(480, 640, 1), dtype=tf.float32, name=None), 'natural_language_embedding': TensorSpec(shape=(512,), dtype=tf.float32, name=None), 'natural_language_instruction': TensorSpec(shape=(), dtype=tf.string, name=None), 'robot_state': TensorSpec(shape=(15,), dtype=tf.float32, name=None)}, 'reward': TensorSpec(shape=(), dtype=tf.float32, name=None)}>}\n", + "INFO:fog_x.database.db_manager:Closing the episode with metadata {'Finished': True, 'gripper_closedness_action_count': 103, 'rotation_delta_count': 103, 'terminate_episode_count': 103, 'world_vector_count': 103, 'is_first_count': 103, 'is_last_count': 103, 'is_terminal_count': 103, 'hand_image_count': 103, 'image_count': 103, 'image_with_depth_count': 103, 'natural_language_embedding_count': 103, 'natural_language_instruction_count': 103, 'robot_state_count': 103, 'reward_count': 103}\n", + "INFO:fog_x.dataset:{'steps': <_VariantDataset element_spec={'action': {'gripper_closedness_action': TensorSpec(shape=(), dtype=tf.float32, name=None), 'rotation_delta': TensorSpec(shape=(3,), dtype=tf.float32, name=None), 'terminate_episode': TensorSpec(shape=(), dtype=tf.float32, name=None), 'world_vector': TensorSpec(shape=(3,), dtype=tf.float32, name=None)}, 'is_first': TensorSpec(shape=(), dtype=tf.bool, name=None), 'is_last': TensorSpec(shape=(), dtype=tf.bool, name=None), 'is_terminal': TensorSpec(shape=(), dtype=tf.bool, name=None), 'observation': {'hand_image': TensorSpec(shape=(480, 640, 3), dtype=tf.uint8, name=None), 'image': TensorSpec(shape=(480, 640, 3), dtype=tf.uint8, name=None), 'image_with_depth': TensorSpec(shape=(480, 640, 1), dtype=tf.float32, name=None), 'natural_language_embedding': TensorSpec(shape=(512,), dtype=tf.float32, name=None), 'natural_language_instruction': TensorSpec(shape=(), dtype=tf.string, name=None), 'robot_state': TensorSpec(shape=(15,), dtype=tf.float32, name=None)}, 'reward': TensorSpec(shape=(), dtype=tf.float32, name=None)}>}\n", + "INFO:fog_x.database.db_manager:Closing the episode with metadata {'Finished': True, 'gripper_closedness_action_count': 110, 'rotation_delta_count': 110, 'terminate_episode_count': 110, 'world_vector_count': 110, 'is_first_count': 110, 'is_last_count': 110, 
'is_terminal_count': 110, 'hand_image_count': 110, 'image_count': 110, 'image_with_depth_count': 110, 'natural_language_embedding_count': 110, 'natural_language_instruction_count': 110, 'robot_state_count': 110, 'reward_count': 110}\n", + "INFO:fog_x.dataset:{'steps': <_VariantDataset element_spec={'action': {'gripper_closedness_action': TensorSpec(shape=(), dtype=tf.float32, name=None), 'rotation_delta': TensorSpec(shape=(3,), dtype=tf.float32, name=None), 'terminate_episode': TensorSpec(shape=(), dtype=tf.float32, name=None), 'world_vector': TensorSpec(shape=(3,), dtype=tf.float32, name=None)}, 'is_first': TensorSpec(shape=(), dtype=tf.bool, name=None), 'is_last': TensorSpec(shape=(), dtype=tf.bool, name=None), 'is_terminal': TensorSpec(shape=(), dtype=tf.bool, name=None), 'observation': {'hand_image': TensorSpec(shape=(480, 640, 3), dtype=tf.uint8, name=None), 'image': TensorSpec(shape=(480, 640, 3), dtype=tf.uint8, name=None), 'image_with_depth': TensorSpec(shape=(480, 640, 1), dtype=tf.float32, name=None), 'natural_language_embedding': TensorSpec(shape=(512,), dtype=tf.float32, name=None), 'natural_language_instruction': TensorSpec(shape=(), dtype=tf.string, name=None), 'robot_state': TensorSpec(shape=(15,), dtype=tf.float32, name=None)}, 'reward': TensorSpec(shape=(), dtype=tf.float32, name=None)}>}\n", + "INFO:fog_x.database.db_manager:Closing the episode with metadata {'Finished': True, 'gripper_closedness_action_count': 118, 'rotation_delta_count': 118, 'terminate_episode_count': 118, 'world_vector_count': 118, 'is_first_count': 118, 'is_last_count': 118, 'is_terminal_count': 118, 'hand_image_count': 118, 'image_count': 118, 'image_with_depth_count': 118, 'natural_language_embedding_count': 118, 'natural_language_instruction_count': 118, 'robot_state_count': 118, 'reward_count': 118}\n", + "INFO:fog_x.dataset:{'steps': <_VariantDataset element_spec={'action': {'gripper_closedness_action': TensorSpec(shape=(), dtype=tf.float32, name=None), 'rotation_delta': TensorSpec(shape=(3,), dtype=tf.float32, name=None), 'terminate_episode': TensorSpec(shape=(), dtype=tf.float32, name=None), 'world_vector': TensorSpec(shape=(3,), dtype=tf.float32, name=None)}, 'is_first': TensorSpec(shape=(), dtype=tf.bool, name=None), 'is_last': TensorSpec(shape=(), dtype=tf.bool, name=None), 'is_terminal': TensorSpec(shape=(), dtype=tf.bool, name=None), 'observation': {'hand_image': TensorSpec(shape=(480, 640, 3), dtype=tf.uint8, name=None), 'image': TensorSpec(shape=(480, 640, 3), dtype=tf.uint8, name=None), 'image_with_depth': TensorSpec(shape=(480, 640, 1), dtype=tf.float32, name=None), 'natural_language_embedding': TensorSpec(shape=(512,), dtype=tf.float32, name=None), 'natural_language_instruction': TensorSpec(shape=(), dtype=tf.string, name=None), 'robot_state': TensorSpec(shape=(15,), dtype=tf.float32, name=None)}, 'reward': TensorSpec(shape=(), dtype=tf.float32, name=None)}>}\n", + "INFO:fog_x.database.db_manager:Closing the episode with metadata {'Finished': True, 'gripper_closedness_action_count': 84, 'rotation_delta_count': 84, 'terminate_episode_count': 84, 'world_vector_count': 84, 'is_first_count': 84, 'is_last_count': 84, 'is_terminal_count': 84, 'hand_image_count': 84, 'image_count': 84, 'image_with_depth_count': 84, 'natural_language_embedding_count': 84, 'natural_language_instruction_count': 84, 'robot_state_count': 84, 'reward_count': 84}\n" + ] + } + ] + }, + { + "cell_type": "markdown", + "source": [ + "The data is automatically uploaded to the cloud!\n", + "We can create a separate 
reader by" + ], + "metadata": { + "id": "mUneci9XeHsE" + } + }, + { + "cell_type": "code", + "source": [ + "dataset2 = fog_x.dataset.Dataset(\n", + " name=\"demo_ds\",\n", + " path='s3://fog-rtx-test-east-1',\n", + ")" + ], + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "cQHIKeNAeSrY", + "outputId": "27379fd0-3a51-401c-e2b3-099bb7356ba6" + }, + "execution_count": 11, + "outputs": [ + { + "output_type": "stream", + "name": "stderr", + "text": [ + "INFO:fog_x.database.polars_connector:Prepare to load table demo_ds loaded from s3://fog-rtx-test-east-1//demo_ds.parquet.\n", + "INFO:fog_x.database.polars_connector:Table demo_ds loaded from s3://fog-rtx-test-east-1//demo_ds.parquet.\n" + ] + } + ] + }, + { + "cell_type": "code", + "source": [ + "# metadata\n", + "trajectory_metadata = dataset2.get_episode_info()\n", + "trajectory_metadata" + ], + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/", + "height": 432 + }, + "id": "E4slMiSzf-se", + "outputId": "9053cc4e-c8e2-4a16-c799-84a08712edd9" + }, + "execution_count": 12, + "outputs": [ + { + "output_type": "execute_result", + "data": { + "text/plain": [ + "shape: (10, 44)\n", + "┌───────────┬──────────┬───────────┬───────────┬───┬───────────┬───────────┬───────────┬───────────┐\n", + "│ episode_i ┆ Finished ┆ feature_g ┆ feature_g ┆ … ┆ robot_sta ┆ feature_r ┆ feature_r ┆ reward_co │\n", + "│ d ┆ --- ┆ ripper_cl ┆ ripper_cl ┆ ┆ te_count ┆ eward_typ ┆ eward_sha ┆ unt │\n", + "│ --- ┆ bool ┆ osedness_ ┆ osedness_ ┆ ┆ --- ┆ e ┆ pe ┆ --- │\n", + "│ i64 ┆ ┆ actio… ┆ actio… ┆ ┆ f64 ┆ --- ┆ --- ┆ f64 │\n", + "│ ┆ ┆ --- ┆ --- ┆ ┆ ┆ str ┆ str ┆ │\n", + "│ ┆ ┆ str ┆ str ┆ ┆ ┆ ┆ ┆ │\n", + "╞═══════════╪══════════╪═══════════╪═══════════╪═══╪═══════════╪═══════════╪═══════════╪═══════════╡\n", + "│ 0 ┆ true ┆ float32 ┆ () ┆ … ┆ 71.0 ┆ float32 ┆ () ┆ 71.0 │\n", + "│ 1 ┆ true ┆ float32 ┆ () ┆ … ┆ 76.0 ┆ float32 ┆ () ┆ 76.0 │\n", + "│ 2 ┆ true ┆ float32 ┆ () ┆ … ┆ 71.0 ┆ float32 ┆ () ┆ 71.0 │\n", + "│ 3 ┆ true ┆ float32 ┆ () ┆ … ┆ 76.0 ┆ float32 ┆ () ┆ 76.0 │\n", + "│ … ┆ … ┆ … ┆ … ┆ … ┆ … ┆ … ┆ … ┆ … │\n", + "│ 6 ┆ true ┆ float32 ┆ () ┆ … ┆ 110.0 ┆ float32 ┆ () ┆ 110.0 │\n", + "│ 7 ┆ true ┆ float32 ┆ () ┆ … ┆ 118.0 ┆ float32 ┆ () ┆ 118.0 │\n", + "│ 8 ┆ true ┆ float32 ┆ () ┆ … ┆ 84.0 ┆ float32 ┆ () ┆ 84.0 │\n", + "│ 9 ┆ true ┆ float32 ┆ () ┆ … ┆ 97.0 ┆ float32 ┆ () ┆ 97.0 │\n", + "└───────────┴──────────┴───────────┴───────────┴───┴───────────┴───────────┴───────────┴───────────┘" + ], + "text/html": [ + "
" + ] + }, + "metadata": {}, + "execution_count": 12 + } + ] + }, + { + "cell_type": "markdown", + "source": [ + "# Google Cloud Platform" + ], + "metadata": { + "id": "cB7QVbp6i-Mx" + } + }, + { + "cell_type": "markdown", + "source": [ + "Register google cloud credentials\n", + "\n", + "Alternative in non-colab environment, run following command instead:\n", + "```\n", + "gcloud auth application-default login --quiet --no-launch-browser\n", + "```\n" + ], + "metadata": { + "id": "8MIV3MZUjNta" + } + }, + { + "cell_type": "code", + "source": [ + "from google.colab import auth\n", + "PROJECT_ID = \"canvas-rampart-342500\"\n", + "auth.authenticate_user(project_id=PROJECT_ID)" + ], + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "ryd_To6LL3nX", + "outputId": "d571dfd9-6ffc-4847-e5d2-a455359eefd6" + }, + "execution_count": 13, + "outputs": [ + { + "output_type": "stream", + "name": "stderr", + "text": [ + "INFO:google.colab.auth:Failure refreshing credentials: (\"Failed to retrieve http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/?recursive=true from the Google Compute Engine metadata service. Status: 404 Response:\\nb''\", )\n", + "INFO:google.colab.auth:Failure refreshing credentials: (\"Failed to retrieve http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/?recursive=true from the Google Compute Engine metadata service. Status: 404 Response:\\nb''\", )\n" + ] + } + ] + }, + { + "cell_type": "code", + "source": [ + "! gcloud storage buckets create gs://fog_rtx_test --location=us-east1" + ], + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "fYM3ExvGL3z7", + "outputId": "721e97ca-0a90-4e85-af14-3503eb62dc06" + }, + "execution_count": 14, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": [ + "Creating gs://fog_rtx_test/...\n", + "\u001b[1;31mERROR:\u001b[0m (gcloud.storage.buckets.create) HTTPError 409: Your previous request to create the named bucket succeeded and you already own it.\n" + ] + } + ] + }, + { + "cell_type": "code", + "source": [ + "dataset = fog_x.dataset.Dataset(\n", + " name=\"demo_ds\",\n", + " path='gs://fog_rtx_test/',\n", + ")" + ], + "metadata": { + "id": "pd94S4VlL32u" + }, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "### Known issues\n", + "\n", + "1. `export` as rlds format to the cloud directly does not work yet for S3 (known issue for tensorflow Gfile)\n", + "2. (will fix) automatically check the existence" + ], + "metadata": { + "id": "P2RCUMs6knNc" + } + }, + { + "cell_type": "code", + "source": [], + "metadata": { + "id": "QKS5jK-Qk9fN" + }, + "execution_count": null, + "outputs": [] + } + ] +} \ No newline at end of file