diff --git a/pipelines/rj_sme/dump_url_educacao_basica/flows.py b/pipelines/rj_sme/dump_url_educacao_basica/flows.py
index 3e06f1039..a8dc7b8fe 100644
--- a/pipelines/rj_sme/dump_url_educacao_basica/flows.py
+++ b/pipelines/rj_sme/dump_url_educacao_basica/flows.py
@@ -26,11 +26,11 @@
     ],
 )
 
-sme_gsheets_default_parameters = {
-    "dataset_id": "educacao_basica_alocacao",
-}
-sme_gsheets_flow = set_default_parameters(
-    sme_gsheets_flow, default_parameters=sme_gsheets_default_parameters
-)
+# sme_gsheets_default_parameters = {
+#     "dataset_id": "educacao_basica_alocacao",
+# }
+# sme_gsheets_flow = set_default_parameters(
+#     sme_gsheets_flow, default_parameters=sme_gsheets_default_parameters
+# )
 
 sme_gsheets_flow.schedule = gsheets_year_update_schedule
diff --git a/pipelines/rj_sme/dump_url_educacao_basica/schedules.py b/pipelines/rj_sme/dump_url_educacao_basica/schedules.py
index ab508bbb8..ff43bafa7 100644
--- a/pipelines/rj_sme/dump_url_educacao_basica/schedules.py
+++ b/pipelines/rj_sme/dump_url_educacao_basica/schedules.py
@@ -25,12 +25,93 @@
         "url_type": "google_drive",
         "materialize_after_dump": True,
         "dataset_id": "educacao_basica_alocacao",
-    }
+    },
+    "bimestral_2023": {
+        "dump_mode": "overwrite",
+        "url": "https://drive.google.com/file/d/1bC-I6mT9SdRVDDL583WpeK8WOJMuIhfz/view?usp=drive_link",
+        "url_type": "google_drive",
+        "materialize_after_dump": True,
+        "dataset_id": "educacao_basica_avaliacao",
+    },
+    "bimestral_2022": {
+        "dump_mode": "overwrite",
+        "url": "https://drive.google.com/file/d/19PFXJKvaOrbexnt_jA4otE-LnMfHUH0H/view?usp=drive_link",
+        "url_type": "google_drive",
+        "materialize_after_dump": True,
+        "dataset_id": "educacao_basica_avaliacao",
+        "encoding": "latin-1",
+        "on_bad_lines": "skip",
+        "separator": ";",
+    },
+    "bimestral_2021": {
+        "dump_mode": "overwrite",
+        "url": "https://drive.google.com/file/d/1k-taU8bMEYJ2U5EHvrNWQZnzN2Ht3uso/view?usp=drive_link",
+        "url_type": "google_drive",
+        "materialize_after_dump": True,
+        "dataset_id": "educacao_basica_avaliacao",
+        "encoding": "latin-1",
+    },
+    "bimestral_2019": {
+        "dump_mode": "overwrite",
+        "url": "https://drive.google.com/file/d/1Q_drlgajGOpSsNlqw1cV2pRJ30Oh47MJ/view?usp=drive_link",
+        "url_type": "google_drive",
+        "materialize_after_dump": True,
+        "dataset_id": "educacao_basica_avaliacao",
+    },
+    "bimestral_2018": {
+        "dump_mode": "overwrite",
+        "url": "https://drive.google.com/file/d/1b7wyFsX6T4W6U_VWIjPmJZ4HI9btaLah/view?usp=drive_link",
+        "url_type": "google_drive",
+        "materialize_after_dump": True,
+        "dataset_id": "educacao_basica_avaliacao",
+    },
+    "bimestral_2017": {
+        "dump_mode": "overwrite",
+        "url": "https://drive.google.com/file/d/1kclQeNuzDCy0Npny1ZZLPjqiPMScw_1P/view?usp=drive_link",
+        "url_type": "google_drive",
+        "materialize_after_dump": True,
+        "dataset_id": "educacao_basica_avaliacao",
+    },
+    "bimestral_2016": {
+        "dump_mode": "overwrite",
+        "url": "https://drive.google.com/file/d/1QH9VsphqPvFwUfE7FgQYI6YJ4TJFTptv/view?usp=drive_link",
+        "url_type": "google_drive",
+        "materialize_after_dump": True,
+        "dataset_id": "educacao_basica_avaliacao",
+    },
+    "bimestral_2015": {
+        "dump_mode": "overwrite",
+        "url": "https://drive.google.com/file/d/1VKDnvgOzrEdT5LkNYBDE_ayVvKsj5jR0/view?usp=drive_link",
+        "url_type": "google_drive",
+        "materialize_after_dump": True,
+        "dataset_id": "educacao_basica_avaliacao",
+    },
+    "bimestral_2014": {
+        "dump_mode": "overwrite",
+        "url": "https://drive.google.com/file/d/18pJonyKwV210dpXr_B2M0p708jYYGwKz/view?usp=drive_link",
+        "url_type": "google_drive",
+        "materialize_after_dump": True,
+        "dataset_id": "educacao_basica_avaliacao",
+    },
+    "bimestral_2013": {
+        "dump_mode": "overwrite",
+        "url": "https://drive.google.com/file/d/1rSi-UgB3qZDLh8U3geKRkMgSdmxddO5v/view?usp=drive_link",
+        "url_type": "google_drive",
+        "materialize_after_dump": True,
+        "dataset_id": "educacao_basica_avaliacao",
+    },
+    "bimestral_2012": {
+        "dump_mode": "overwrite",
+        "url": "https://drive.google.com/file/d/1scfnos9iER86QVMx7Y_qPM1SKVv0MUED/view?usp=drive_link",
+        "url_type": "google_drive",
+        "materialize_after_dump": True,
+        "dataset_id": "educacao_basica_avaliacao",
+    },
 }
 
 gsheets_clocks = generate_dump_url_schedules(
     interval=timedelta(days=365),
-    start_date=datetime(2022, 11, 4, 20, 0, tzinfo=pytz.timezone("America/Sao_Paulo")),
+    start_date=datetime(2024, 3, 22, 12, 0, tzinfo=pytz.timezone("America/Sao_Paulo")),
     labels=[
         constants.RJ_SME_AGENT_LABEL.value,
     ],
diff --git a/pipelines/utils/dump_url/flows.py b/pipelines/utils/dump_url/flows.py
index 21fa641e8..66b759a34 100644
--- a/pipelines/utils/dump_url/flows.py
+++ b/pipelines/utils/dump_url/flows.py
@@ -46,6 +46,9 @@
 
     # Table parameters
     partition_columns = Parameter("partition_columns", required=False, default="")
+    encoding = Parameter("encoding", required=False, default="utf-8")
+    on_bad_lines = Parameter("on_bad_lines", required=False, default="error")
+    separator = Parameter("separator", required=False, default=",")
 
     # Materialization parameters
     materialize_after_dump = Parameter(
@@ -119,6 +122,9 @@
         save_path=DUMP_DATA_PATH,
         build_json_dataframe=build_json_dataframe,
         dataframe_key_column=dataframe_key_column,
+        encoding=encoding,
+        on_bad_lines=on_bad_lines,
+        separator=separator,
     )
 
     DUMP_CHUNKS_TASK.set_upstream(DOWNLOAD_URL_TASK)
diff --git a/pipelines/utils/dump_url/tasks.py b/pipelines/utils/dump_url/tasks.py
index 91f9f75ac..d5b6d44e1 100644
--- a/pipelines/utils/dump_url/tasks.py
+++ b/pipelines/utils/dump_url/tasks.py
@@ -151,12 +151,23 @@ def dump_files(
     chunksize: int = 10**6,
     build_json_dataframe: bool = False,
     dataframe_key_column: str = None,
+    encoding: str = "utf-8",
+    on_bad_lines: str = "error",
+    separator: str = ",",
 ) -> None:
     """
-    Dump files according to chunk size
+    Dump files according to chunk size and read mode
     """
     event_id = datetime.now().strftime("%Y%m%d-%H%M%S")
-    for idx, chunk in enumerate(pd.read_csv(Path(file_path), chunksize=chunksize)):
+    for idx, chunk in enumerate(
+        pd.read_csv(
+            Path(file_path),
+            chunksize=chunksize,
+            encoding=encoding,
+            on_bad_lines=on_bad_lines,
+            sep=separator,
+        )
+    ):
         log(f"Dumping batch {idx} with size {chunksize}")
         handle_dataframe_chunk(
             dataframe=chunk,
diff --git a/pipelines/utils/dump_url/utils.py b/pipelines/utils/dump_url/utils.py
index 4bfe0fcc8..6397c65ce 100644
--- a/pipelines/utils/dump_url/utils.py
+++ b/pipelines/utils/dump_url/utils.py
@@ -135,6 +135,12 @@ def generate_dump_url_schedules(  # pylint: disable=too-many-arguments,too-many-
             parameter_defaults["materialize_to_datario"] = parameters[
                 "materialize_to_datario"
             ]
+        if "encoding" in parameters:
+            parameter_defaults["encoding"] = parameters["encoding"]
+        if "on_bad_lines" in parameters:
+            parameter_defaults["on_bad_lines"] = parameters["on_bad_lines"]
+        if "separator" in parameters:
+            parameter_defaults["separator"] = parameters["separator"]
         # if "dbt_model_secret_parameters" in parameters:
         #     parameter_defaults["dbt_model_secret_parameters"] = parameters[
         #         "dbt_model_secret_parameters"