Skip to content

Commit

Permalink
Merge branch 'master' into staging/sme_frequencia
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Mar 22, 2024
2 parents 8eaceb0 + 99300bc commit d0a42ed
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 10 deletions.
12 changes: 6 additions & 6 deletions pipelines/rj_sme/dump_url_educacao_basica/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@
],
)

sme_gsheets_default_parameters = {
"dataset_id": "educacao_basica_alocacao",
}
sme_gsheets_flow = set_default_parameters(
sme_gsheets_flow, default_parameters=sme_gsheets_default_parameters
)
# sme_gsheets_default_parameters = {
# "dataset_id": "educacao_basica_alocacao",
# }
# sme_gsheets_flow = set_default_parameters(
# sme_gsheets_flow, default_parameters=sme_gsheets_default_parameters
# )

sme_gsheets_flow.schedule = gsheets_year_update_schedule
85 changes: 83 additions & 2 deletions pipelines/rj_sme/dump_url_educacao_basica/schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,93 @@
"url_type": "google_drive",
"materialize_after_dump": True,
"dataset_id": "educacao_basica_alocacao",
}
},
"bimestral_2023": {
"dump_mode": "overwrite",
"url": "https://drive.google.com/file/d/1bC-I6mT9SdRVDDL583WpeK8WOJMuIhfz/view?usp=drive_link",
"url_type": "google_drive",
"materialize_after_dump": True,
"dataset_id": "educacao_basica_avaliacao",
},
"bimestral_2022": {
"dump_mode": "overwrite",
"url": "https://drive.google.com/file/d/19PFXJKvaOrbexnt_jA4otE-LnMfHUH0H/view?usp=drive_link",
"url_type": "google_drive",
"materialize_after_dump": True,
"dataset_id": "educacao_basica_avaliacao",
"encoding": "latin-1",
"on_bad_lines": "skip",
"separator": ";",
},
"bimestral_2021": {
"dump_mode": "overwrite",
"url": "https://drive.google.com/file/d/1k-taU8bMEYJ2U5EHvrNWQZnzN2Ht3uso/view?usp=drive_link",
"url_type": "google_drive",
"materialize_after_dump": True,
"dataset_id": "educacao_basica_avaliacao",
"encoding": "latin-1",
},
"bimestral_2019": {
"dump_mode": "overwrite",
"url": "https://drive.google.com/file/d/1Q_drlgajGOpSsNlqw1cV2pRJ30Oh47MJ/view?usp=drive_link",
"url_type": "google_drive",
"materialize_after_dump": True,
"dataset_id": "educacao_basica_avaliacao",
},
"bimestral_2018": {
"dump_mode": "overwrite",
"url": "https://drive.google.com/file/d/1b7wyFsX6T4W6U_VWIjPmJZ4HI9btaLah/view?usp=drive_link",
"url_type": "google_drive",
"materialize_after_dump": True,
"dataset_id": "educacao_basica_avaliacao",
},
"bimestral_2017": {
"dump_mode": "overwrite",
"url": "https://drive.google.com/file/d/1kclQeNuzDCy0Npny1ZZLPjqiPMScw_1P/view?usp=drive_link",
"url_type": "google_drive",
"materialize_after_dump": True,
"dataset_id": "educacao_basica_avaliacao",
},
"bimestral_2016": {
"dump_mode": "overwrite",
"url": "https://drive.google.com/file/d/1QH9VsphqPvFwUfE7FgQYI6YJ4TJFTptv/view?usp=drive_link",
"url_type": "google_drive",
"materialize_after_dump": True,
"dataset_id": "educacao_basica_avaliacao",
},
"bimestral_2015": {
"dump_mode": "overwrite",
"url": "https://drive.google.com/file/d/1VKDnvgOzrEdT5LkNYBDE_ayVvKsj5jR0/view?usp=drive_link",
"url_type": "google_drive",
"materialize_after_dump": True,
"dataset_id": "educacao_basica_avaliacao",
},
"bimestral_2014": {
"dump_mode": "overwrite",
"url": "https://drive.google.com/file/d/18pJonyKwV210dpXr_B2M0p708jYYGwKz/view?usp=drive_link",
"url_type": "google_drive",
"materialize_after_dump": True,
"dataset_id": "educacao_basica_avaliacao",
},
"bimestral_2013": {
"dump_mode": "overwrite",
"url": "https://drive.google.com/file/d/1rSi-UgB3qZDLh8U3geKRkMgSdmxddO5v/view?usp=drive_link",
"url_type": "google_drive",
"materialize_after_dump": True,
"dataset_id": "educacao_basica_avaliacao",
},
"bimestral_2012": {
"dump_mode": "overwrite",
"url": "https://drive.google.com/file/d/1scfnos9iER86QVMx7Y_qPM1SKVv0MUED/view?usp=drive_link",
"url_type": "google_drive",
"materialize_after_dump": True,
"dataset_id": "educacao_basica_avaliacao",
},
}

gsheets_clocks = generate_dump_url_schedules(
interval=timedelta(days=365),
start_date=datetime(2022, 11, 4, 20, 0, tzinfo=pytz.timezone("America/Sao_Paulo")),
start_date=datetime(2024, 3, 22, 12, 0, tzinfo=pytz.timezone("America/Sao_Paulo")),
labels=[
constants.RJ_SME_AGENT_LABEL.value,
],
Expand Down
6 changes: 6 additions & 0 deletions pipelines/utils/dump_url/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@

# Table parameters
partition_columns = Parameter("partition_columns", required=False, default="")
encoding = Parameter("encoding", required=False, default="utf-8")
on_bad_lines = Parameter("on_bad_lines", required=False, default="error")
separator = Parameter("separator", required=False, default=",")

# Materialization parameters
materialize_after_dump = Parameter(
Expand Down Expand Up @@ -119,6 +122,9 @@
save_path=DUMP_DATA_PATH,
build_json_dataframe=build_json_dataframe,
dataframe_key_column=dataframe_key_column,
encoding=encoding,
on_bad_lines=on_bad_lines,
separator=separator,
)
DUMP_CHUNKS_TASK.set_upstream(DOWNLOAD_URL_TASK)

Expand Down
15 changes: 13 additions & 2 deletions pipelines/utils/dump_url/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,23 @@ def dump_files(
chunksize: int = 10**6,
build_json_dataframe: bool = False,
dataframe_key_column: str = None,
encoding: str = "utf-8",
on_bad_lines: str = "error",
separator: str = ",",
) -> None:
"""
Dump files according to chunk size
Dump files according to chunk size and read mode
"""
event_id = datetime.now().strftime("%Y%m%d-%H%M%S")
for idx, chunk in enumerate(pd.read_csv(Path(file_path), chunksize=chunksize)):
for idx, chunk in enumerate(
pd.read_csv(
Path(file_path),
chunksize=chunksize,
encoding=encoding,
on_bad_lines=on_bad_lines,
sep=separator,
)
):
log(f"Dumping batch {idx} with size {chunksize}")
handle_dataframe_chunk(
dataframe=chunk,
Expand Down
6 changes: 6 additions & 0 deletions pipelines/utils/dump_url/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,12 @@ def generate_dump_url_schedules( # pylint: disable=too-many-arguments,too-many-
parameter_defaults["materialize_to_datario"] = parameters[
"materialize_to_datario"
]
if "encoding" in parameters:
parameter_defaults["encoding"] = parameters["encoding"]
if "on_bad_lines" in parameters:
parameter_defaults["on_bad_lines"] = parameters["on_bad_lines"]
if "separator" in parameters:
parameter_defaults["separator"] = parameters["separator"]
# if "dbt_model_secret_parameters" in parameters:
# parameter_defaults["dbt_model_secret_parameters"] = parameters[
# "dbt_model_secret_parameters"
Expand Down

0 comments on commit d0a42ed

Please sign in to comment.