From c2e7e222a6b9789bdd515e3f9941cb33948f48dd Mon Sep 17 00:00:00 2001
From: "houhan@gmail.com" <houhan@gmail.com>
Date: Fri, 20 Dec 2024 21:51:31 +0000
Subject: [PATCH] add querying "dynamic-foraging-task" from stimulus_epochs

---
 code/util/fetch_data_docDB.py | 32 ++++++++++++++++++--------------
 1 file changed, 18 insertions(+), 14 deletions(-)

diff --git a/code/util/fetch_data_docDB.py b/code/util/fetch_data_docDB.py
index 811f64e..d063ed4 100644
--- a/code/util/fetch_data_docDB.py
+++ b/code/util/fetch_data_docDB.py
@@ -114,15 +114,19 @@ def fetch_dynamic_foraging_data(client):
     # let's directly query the software name
     logger.warning("fetching 'dynamic foraging' in software name...")
     software_name_results = client.retrieve_docdb_records(
-        filter_query={"session.data_streams.software.name": "dynamic-foraging-task",
-                      "name": {"$not": {"$regex": ".*processed.*"}},  # only raw data
-                      },
-        paginate_batch_size=500
-    )
+        filter_query={
+            "$or": [
+                {"session.data_streams.software.name": "dynamic-foraging-task"},
+                {"session.stimulus_epochs.software.name": "dynamic-foraging-task"},
+            ],
+            "name": {"$not": {"$regex": ".*processed.*"}},  # only raw data
+        },
+        paginate_batch_size=500,
+    )
     logger.warning(f"found {len(software_name_results)} results")
 
-    # there are more from the past that didn't specify modality correctly.
-    # until this is fixed, need to guess by asset name
+    # there are more from the past that didn't specify modality correctly.
+    # until this is fixed, need to guess by asset name
     logger.warning("fetching FIP records by name...")
     name_FIP_results = client.retrieve_docdb_records(
         filter_query={"name": {"$regex": "^FIP.*"}},
@@ -134,32 +138,32 @@
     unique_results_by_id = {**{ r['_id']: r for r in software_name_results }, **{ r['_id']: r for r in name_FIP_results }}
     results = list(unique_results_by_id.values())
     logger.warning(f"found {len(results)} unique results")
-    
+
     # make a dataframe
     records_df = pd.DataFrame.from_records([map_record_to_dict(d) for d in results ])
-    
+
     # PREVIOUSLY, there are some sessions uploaded twice in two different locations.
     # let's filter out the ones in aind-ophys-data, a deprecated location
-    # this is no longer a problem-- so I'm taking off the drop but keeping the dupe check on.
+    # this is no longer a problem-- so I'm taking off the drop but keeping the dupe check on.
     dup_df = records_df[records_df.duplicated('session_name',keep=False)]
     dup_df = dup_df[dup_df.session_loc.str.contains("aind-ophys-data")]
     if len(dup_df):
         logger.warning('duplicated entries found, please fix')
         # records_df = records_df.drop(dup_df.index.values)
-    
+
     # let's get processed results too
     logger.warning("fetching processed results...")
     processed_results = client.retrieve_docdb_records(filter_query={
         "name": {"$regex": "^behavior_.*processed_.*"}
     })
-    
+
     # converting to a dictionary
     processed_results_by_name = { r['name']: r for r in processed_results }
-    
+
    # adding two columns to our master dataframe - result name and result s3 location
     records_df['processed_session_name'] = records_df.session_name.apply(lambda x: find_result(x, processed_results_by_name).get('name'))
     records_df['processed_session_loc'] = records_df.session_name.apply(lambda x: find_result(x, processed_results_by_name).get('location'))
-    # get external_links, strip it down to the string
+    # get external_links, strip it down to the string
     co_data_asset_id_processed = records_df.session_name.apply(lambda x: find_result(x, processed_results_by_name).get('external_links'))
     records_df['processed_CO_dataID'] = strip_dict_for_id(co_data_asset_id_processed)
     records_df['CO_dataID'] = strip_dict_for_id(records_df['CO_dataID'])
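
Reviewer note: below is a minimal standalone sketch of the new $or query, for
anyone who wants to reproduce the lookup outside fetch_dynamic_foraging_data.
The MetadataDbClient import and the host/database/collection values are
assumptions based on aind-data-access-api's documented usage, not part of this
patch; only the retrieve_docdb_records call mirrors the code above.

    # sketch: query docDB for raw dynamic-foraging assets, matching the
    # software name in either data_streams or stimulus_epochs
    from aind_data_access_api.document_db import MetadataDbClient  # assumed client

    client = MetadataDbClient(
        host="api.allenneuraldynamics.org",  # assumed host
        database="metadata_index",           # assumed database
        collection="data_assets",            # assumed collection
    )

    records = client.retrieve_docdb_records(
        filter_query={
            # match the task software name wherever older vs. newer
            # metadata recorded it
            "$or": [
                {"session.data_streams.software.name": "dynamic-foraging-task"},
                {"session.stimulus_epochs.software.name": "dynamic-foraging-task"},
            ],
            "name": {"$not": {"$regex": ".*processed.*"}},  # only raw data
        },
        paginate_batch_size=500,
    )
    print(f"found {len(records)} raw dynamic-foraging assets")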
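The dupe check retained in the second hunk reduces to plain pandas; a toy
reproduction follows (the session names and bucket locations are made up)
showing why keep=False matters: it flags every row of a duplicated
session_name, so the deprecated aind-ophys-data copy can be inspected rather
than silently missed.

    import pandas as pd

    records_df = pd.DataFrame({
        "session_name": ["behavior_123", "behavior_123", "behavior_456"],
        "session_loc": [
            "s3://some-bucket/behavior_123",        # made-up location
            "s3://aind-ophys-data/behavior_123",    # deprecated copy
            "s3://some-bucket/behavior_456",        # made-up location
        ],
    })

    # keep=False marks all members of each duplicate group, not just the extras
    dup_df = records_df[records_df.duplicated("session_name", keep=False)]
    dup_df = dup_df[dup_df.session_loc.str.contains("aind-ophys-data")]
    if len(dup_df):
        print("duplicated entries found, please fix")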