diff --git a/reproschema/reproschema2redcap.py b/reproschema/reproschema2redcap.py index 05350fc..3d03cf3 100644 --- a/reproschema/reproschema2redcap.py +++ b/reproschema/reproschema2redcap.py @@ -2,6 +2,7 @@ import json import csv from pathlib import Path +import requests def read_json_file(file_path): @@ -13,38 +14,69 @@ def read_json_file(file_path): return None -def find_Ftype_and_colH(item_json, row_data): - """ - Find the field type and column header based on the given item_json. +def fetch_choices_from_url(url): + try: + response = requests.get(url) + response.raise_for_status() + data = response.json() + + if isinstance(data, list): + choices = [ + list(item.values())[0] + for item in data + if isinstance(item, dict) and item + ] + elif isinstance(data, dict): + choices = list(data.values()) + else: + return "" - Args: - item_json (dict): The JSON object containing the item information. - row_data (dict): The row data dictionary. + # Format choices as 'code, description' + formatted_choices = [f"{idx}, {choice}" for idx, choice in enumerate(choices)] + return " | ".join(formatted_choices) + except Exception as e: + print(f"Error fetching choices from {url}: {e}") + return "" - Returns: - dict: The updated row data dictionary with field type and column header. - """ +def find_Ftype_and_colH(item_json, row_data): # Extract the input type from the item_json f_type = item_json.get("ui", {}).get("inputType", "") col_h = "" - # Check the input type and update the field type and column header accordingly - if f_type == "integer": + if f_type in ["text", "textarea", "email"]: + f_type = "text" + elif f_type == "integer": + f_type = "text" + col_h = "integer" + elif f_type in ["number", "float"]: f_type = "text" col_h = "number" - elif f_type == "select": - f_type = "dropdown" elif f_type == "date": f_type = "text" - col_h = "ddate_mdy" + col_h = "date_mdy" + elif f_type == "select": + multiple_choice = item_json.get("responseOptions", {}).get( + "multipleChoice", False + ) + f_type = "checkbox" if multiple_choice else "dropdown" + elif f_type.startswith("select"): + # Adjusting for selectCountry, selectLanguage, selectState types + f_type = "radio" + choices_url = item_json.get("responseOptions", {}).get("choices", "") + if choices_url and isinstance(choices_url, str): + choices_data = fetch_choices_from_url(choices_url) + if choices_data: + row_data["choices"] = choices_data + elif f_type.startswith(("audio", "video", "image", "document")): + f_type = "file" + else: + f_type = "text" - # Update the row_data dictionary with the field type - row_data["field_type"] = f_type + row_data["field_type"] = f_type.lower() - # Update the row_data dictionary with the column header if available if col_h: - row_data["val_type_OR_slider"] = col_h + row_data["val_type_OR_slider"] = col_h.lower() return row_data @@ -60,43 +92,43 @@ def process_item(item_json, activity_name): Returns: dict: A dictionary containing the extracted information. """ - row_data = {} + row_data = { + "val_min": "", + "val_max": "", + "choices": "", + "required": "", + "field_notes": "", + "var_name": "", + "activity": activity_name.lower(), + "field_label": "", + } # Extract min and max values from response options, if available response_options = item_json.get("responseOptions", {}) row_data["val_min"] = response_options.get("schema:minValue", "") row_data["val_max"] = response_options.get("schema:maxValue", "") + # 'choices' processing is now handled in 'find_Ftype_and_colH' if it's a URL choices = response_options.get("choices") - if choices: + if choices and not isinstance(choices, str): if isinstance(choices, list): - # Extract choice values and names, and join them with a '|' item_choices = [ f"{ch.get('schema:value', ch.get('value', ''))}, {ch.get('schema:name', ch.get('name', ''))}" for ch in choices ] row_data["choices"] = " | ".join(item_choices) - elif isinstance(choices, str): - row_data["choices"] = choices - else: - row_data["choices"] = "" row_data["required"] = response_options.get("requiredValue", "") - row_data["field_notes"] = item_json.get("skos:altLabel", "") - row_data["var_name"] = item_json.get("@id", "") - row_data["activity"] = activity_name question = item_json.get("question") if isinstance(question, dict): row_data["field_label"] = question.get("en", "") elif isinstance(question, str): row_data["field_label"] = question - else: - row_data["field_label"] = "" - # Call helper function to find Ftype and colH values and update row_data + # Call helper function to find field type and validation type (if any) and update row_data row_data = find_Ftype_and_colH(item_json, row_data) return row_data @@ -140,28 +172,54 @@ def get_csv_data(dir_path): def write_to_csv(csv_data, output_csv_filename): - # Define the headers for the CSV file as per the JavaScript file + # REDCap-specific headers headers = [ - "var_name", - "activity", - "section", - "field_type", - "field_label", - "choices", - "field_notes", - "val_type_OR_slider", - "val_min", - "val_max", - "identifier", - "visibility", - "required", + "Variable / Field Name", + "Form Name", + "Section Header", + "Field Type", + "Field Label", + "Choices, Calculations, OR Slider Labels", + "Field Note", + "Text Validation Type OR Show Slider Number", + "Text Validation Min", + "Text Validation Max", + "Identifier?", + "Branching Logic (Show field only if...)", + "Required Field?", + "Custom Alignment", + "Question Number (surveys only)", + "Matrix Group Name", + "Matrix Ranking?", + "Field Annotation", ] # Writing to the CSV file with open(output_csv_filename, "w", newline="", encoding="utf-8") as csvfile: writer = csv.DictWriter(csvfile, fieldnames=headers) - writer.writeheader() + + # Map the data from your format to REDCap format + redcap_data = [] for row in csv_data: + redcap_row = { + "Variable / Field Name": row["var_name"], + "Form Name": row["activity"], + "Section Header": "", # Update this if your data includes section headers + "Field Type": row["field_type"], + "Field Label": row["field_label"], + "Choices, Calculations, OR Slider Labels": row["choices"], + "Field Note": row["field_notes"], + "Text Validation Type OR Show Slider Number": row.get( + "val_type_OR_slider", "" + ), + "Text Validation Min": row["val_min"], + "Text Validation Max": row["val_max"], + # Add other fields as necessary based on your data + } + redcap_data.append(redcap_row) + + writer.writeheader() + for row in redcap_data: writer.writerow(row) print("The CSV file was written successfully")