From 06822606e50c0c5b67ff2b4eac182f18b39bc1ef Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Mon, 23 Dec 2024 18:55:26 -0300 Subject: [PATCH 01/11] cria captura movidesk --- pipelines/capture/movidesk/CHANGELOG.md | 7 ++++ pipelines/capture/movidesk/__init__.py | 0 pipelines/capture/movidesk/constants.py | 30 ++++++++++++++++ pipelines/capture/movidesk/flows.py | 15 ++++++++ pipelines/capture/movidesk/tasks.py | 47 +++++++++++++++++++++++++ 5 files changed, 99 insertions(+) create mode 100644 pipelines/capture/movidesk/CHANGELOG.md create mode 100644 pipelines/capture/movidesk/__init__.py create mode 100644 pipelines/capture/movidesk/constants.py create mode 100644 pipelines/capture/movidesk/flows.py create mode 100644 pipelines/capture/movidesk/tasks.py diff --git a/pipelines/capture/movidesk/CHANGELOG.md b/pipelines/capture/movidesk/CHANGELOG.md new file mode 100644 index 000000000..3a90159a4 --- /dev/null +++ b/pipelines/capture/movidesk/CHANGELOG.md @@ -0,0 +1,7 @@ +# Changelog - source_movidesk + +## [1.0.0] - 2024-12-26 + +### Adicionado + +- Cria flow de captura dos tickets do MoviDesk (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/) # adicionar numero PR \ No newline at end of file diff --git a/pipelines/capture/movidesk/__init__.py b/pipelines/capture/movidesk/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/pipelines/capture/movidesk/constants.py b/pipelines/capture/movidesk/constants.py new file mode 100644 index 000000000..cab2622cb --- /dev/null +++ b/pipelines/capture/movidesk/constants.py @@ -0,0 +1,30 @@ +# -*- coding: utf-8 -*- +""" +Valores constantes para captura de dados da Rio Ônibus +""" + +from datetime import datetime +from enum import Enum + +from pipelines.schedules import create_daily_cron +from pipelines.utils.gcp.bigquery import SourceTable + + +class constants(Enum): # pylint: disable=c0103 + """ + Valores constantes para captura de dados do MoviDesk + """ + + MOVIDESK_SOURCE_NAME = "rioonibus" + MOVIDESK_SECRET_PATH = "sppo_subsidio_recursos_api" # movidesk_api + TICKETS_BASE_URL = "https://api.movidesk.com/public/v1/tickets" + TICKETS_TABLE_ID = "tickets" + TICKETS_SOURCE = SourceTable( + source_name=MOVIDESK_SOURCE_NAME, + table_id=TICKETS_TABLE_ID, + first_timestamp=datetime(2024, 12, 1, 0, 0, 0), + schedule_cron=create_daily_cron(hour=0), + partition_date_only=True, + # max_recaptures=5, + primary_keys=["protocol"], + ) diff --git a/pipelines/capture/movidesk/flows.py b/pipelines/capture/movidesk/flows.py new file mode 100644 index 000000000..476e8446f --- /dev/null +++ b/pipelines/capture/movidesk/flows.py @@ -0,0 +1,15 @@ +# -*- coding: utf-8 -*- +"""Flows de captura dos tickets do MoviDesk""" +from pipelines.capture.movidesk.constants import constants +from pipelines.capture.movidesk.tasks import create_tickets_extractor +from pipelines.capture.templates.flows import create_default_capture_flow +from pipelines.constants import constants as smtr_constants +from pipelines.utils.prefect import set_default_parameters + +CAPTURA_TICKETS = create_default_capture_flow( + flow_name="movidesk: tickets - captura", + source=constants.TICKETS_SOURCE.value, + create_extractor_task=create_tickets_extractor, + agent_label=smtr_constants.RJ_SMTR_AGENT_LABEL.value, +) +set_default_parameters(CAPTURA_TICKETS, {"recapture": True}) diff --git a/pipelines/capture/movidesk/tasks.py b/pipelines/capture/movidesk/tasks.py new file mode 100644 index 000000000..1e1b81d54 --- /dev/null +++ b/pipelines/capture/movidesk/tasks.py @@ -0,0 +1,47 @@ +# -*- coding: utf-8 -*- +"""Tasks de captura dos tickets do MoviDesk""" +from datetime import datetime, timedelta +from functools import partial + +from prefect import task + +from pipelines.capture.movidesk.constants import constants +from pipelines.constants import constants as smtr_constants +from pipelines.utils.extractors.api import get_raw_api_top_skip +from pipelines.utils.gcp.bigquery import SourceTable +from pipelines.utils.secret import get_secret + + +@task( + max_retries=smtr_constants.MAX_RETRIES.value, + retry_delay=timedelta(seconds=smtr_constants.RETRY_DELAY.value), +) +def create_tickets_extractor( + source: SourceTable, # pylint: disable=W0613 + timestamp: datetime, +): + """Cria a extração dos tickets do MoviDesk""" + + start = datetime.strftime(timestamp - timedelta(days=1), "%Y-%m-%dT%H:%M:%S.%MZ") + end = datetime.strftime(timestamp, "%Y-%m-%dT%H:%M:%S.%MZ") + token = get_secret(constants.MOVIDESK_SECRET_PATH.value)["token"] + params = [ + { + "token": token, + "$select": "id,protocol,createdDate,lastUpdate", + "$filter": f"serviceFirstLevel eq '{service} - Recurso Viagens Subsídio'and (lastUpdate ge {start} and lastUpdate lt {end} or createdDate ge {start} and createdDate lt {end})", # noqa + "$expand": "customFieldValues,customFieldValues($expand=items)", + "$orderby": "createdDate asc", + } + for service in smtr_constants.SUBSIDIO_SPPO_RECURSO_TABLE_CAPTURE_PARAMS.value + ] + + return partial( + get_raw_api_top_skip, + url=constants.TICKETS_BASE_URL.value, + params_list=params, + top_param_name="$top", + skip_param_name="$skip", + page_size=1000, + max_page=10, + ) From f1b7f703a08d87830262e4d47eb00c5f08f4a05c Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Thu, 26 Dec 2024 10:37:52 -0300 Subject: [PATCH 02/11] import movidesk flows --- pipelines/flows.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pipelines/flows.py b/pipelines/flows.py index 7f9448767..9ba881e81 100644 --- a/pipelines/flows.py +++ b/pipelines/flows.py @@ -3,6 +3,7 @@ Imports all flows for every project so we can register all of them. """ from pipelines.capture.jae.flows import * # noqa +from pipelines.capture.movidesk.flows import * # noqa from pipelines.capture.rioonibus.flows import * # noqa from pipelines.capture.sonda.flows import * # noqa from pipelines.exemplo import * # noqa From 823a81d16bf01e8e26d0678c4f44413932d07b13 Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Thu, 26 Dec 2024 10:48:22 -0300 Subject: [PATCH 03/11] corrige parametro --- pipelines/capture/movidesk/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/capture/movidesk/tasks.py b/pipelines/capture/movidesk/tasks.py index 1e1b81d54..28f59a314 100644 --- a/pipelines/capture/movidesk/tasks.py +++ b/pipelines/capture/movidesk/tasks.py @@ -39,7 +39,7 @@ def create_tickets_extractor( return partial( get_raw_api_top_skip, url=constants.TICKETS_BASE_URL.value, - params_list=params, + params=params, top_param_name="$top", skip_param_name="$skip", page_size=1000, From 636c41cf8e36867468cc413276fc745138992db7 Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Thu, 26 Dec 2024 10:56:36 -0300 Subject: [PATCH 04/11] add headers --- pipelines/capture/movidesk/tasks.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pipelines/capture/movidesk/tasks.py b/pipelines/capture/movidesk/tasks.py index 28f59a314..f35ba26b4 100644 --- a/pipelines/capture/movidesk/tasks.py +++ b/pipelines/capture/movidesk/tasks.py @@ -39,6 +39,7 @@ def create_tickets_extractor( return partial( get_raw_api_top_skip, url=constants.TICKETS_BASE_URL.value, + headers=None, params=params, top_param_name="$top", skip_param_name="$skip", From 021f73bf8e1fec10e80507870c736cc9cfe7d652 Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Thu, 26 Dec 2024 11:05:31 -0300 Subject: [PATCH 05/11] altera source name --- pipelines/capture/movidesk/constants.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/capture/movidesk/constants.py b/pipelines/capture/movidesk/constants.py index cab2622cb..4cd9acb98 100644 --- a/pipelines/capture/movidesk/constants.py +++ b/pipelines/capture/movidesk/constants.py @@ -15,7 +15,7 @@ class constants(Enum): # pylint: disable=c0103 Valores constantes para captura de dados do MoviDesk """ - MOVIDESK_SOURCE_NAME = "rioonibus" + MOVIDESK_SOURCE_NAME = "movidesk" MOVIDESK_SECRET_PATH = "sppo_subsidio_recursos_api" # movidesk_api TICKETS_BASE_URL = "https://api.movidesk.com/public/v1/tickets" TICKETS_TABLE_ID = "tickets" From 20caeac704853dc90feebc9c0b1f046934f9e062 Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Thu, 26 Dec 2024 11:27:42 -0300 Subject: [PATCH 06/11] altera retorno create_tickets_extractor --- pipelines/capture/movidesk/tasks.py | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/pipelines/capture/movidesk/tasks.py b/pipelines/capture/movidesk/tasks.py index f35ba26b4..4cd145026 100644 --- a/pipelines/capture/movidesk/tasks.py +++ b/pipelines/capture/movidesk/tasks.py @@ -36,13 +36,18 @@ def create_tickets_extractor( for service in smtr_constants.SUBSIDIO_SPPO_RECURSO_TABLE_CAPTURE_PARAMS.value ] - return partial( - get_raw_api_top_skip, - url=constants.TICKETS_BASE_URL.value, - headers=None, - params=params, - top_param_name="$top", - skip_param_name="$skip", - page_size=1000, - max_page=10, - ) + partial_list = [ + partial( + get_raw_api_top_skip, + url=constants.TICKETS_BASE_URL.value, + headers=None, + params=param, + top_param_name="$top", + skip_param_name="$skip", + page_size=1000, + max_page=10, + ) + for param in params + ] + + return partial_list From 25a69fd4aa9aedf4914ec30f383c78d8562b2a08 Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Thu, 26 Dec 2024 11:44:06 -0300 Subject: [PATCH 07/11] teste create_tickets_extractor --- pipelines/capture/movidesk/tasks.py | 43 ++++++++++++----------------- 1 file changed, 18 insertions(+), 25 deletions(-) diff --git a/pipelines/capture/movidesk/tasks.py b/pipelines/capture/movidesk/tasks.py index 4cd145026..05f02491b 100644 --- a/pipelines/capture/movidesk/tasks.py +++ b/pipelines/capture/movidesk/tasks.py @@ -25,29 +25,22 @@ def create_tickets_extractor( start = datetime.strftime(timestamp - timedelta(days=1), "%Y-%m-%dT%H:%M:%S.%MZ") end = datetime.strftime(timestamp, "%Y-%m-%dT%H:%M:%S.%MZ") token = get_secret(constants.MOVIDESK_SECRET_PATH.value)["token"] - params = [ - { - "token": token, - "$select": "id,protocol,createdDate,lastUpdate", - "$filter": f"serviceFirstLevel eq '{service} - Recurso Viagens Subsídio'and (lastUpdate ge {start} and lastUpdate lt {end} or createdDate ge {start} and createdDate lt {end})", # noqa - "$expand": "customFieldValues,customFieldValues($expand=items)", - "$orderby": "createdDate asc", - } - for service in smtr_constants.SUBSIDIO_SPPO_RECURSO_TABLE_CAPTURE_PARAMS.value - ] + service = "recursos_sppo_reprocessamento" + params = { + "token": token, + "$select": "id,protocol,createdDate,lastUpdate", + "$filter": f"serviceFirstLevel eq '{service} - Recurso Viagens Subsídio'and (lastUpdate ge {start} and lastUpdate lt {end} or createdDate ge {start} and createdDate lt {end})", # noqa + "$expand": "customFieldValues,customFieldValues($expand=items)", + "$orderby": "createdDate asc", + } - partial_list = [ - partial( - get_raw_api_top_skip, - url=constants.TICKETS_BASE_URL.value, - headers=None, - params=param, - top_param_name="$top", - skip_param_name="$skip", - page_size=1000, - max_page=10, - ) - for param in params - ] - - return partial_list + return partial( + get_raw_api_top_skip, + url=constants.TICKETS_BASE_URL.value, + headers=None, + params=params, + top_param_name="$top", + skip_param_name="$skip", + page_size=1000, + max_page=10, + ) From 08f75a09d3075659d104649b6bbaa1514caf976c Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Thu, 26 Dec 2024 12:11:18 -0300 Subject: [PATCH 08/11] altera service para teste --- pipelines/capture/movidesk/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/capture/movidesk/tasks.py b/pipelines/capture/movidesk/tasks.py index 05f02491b..9abf76792 100644 --- a/pipelines/capture/movidesk/tasks.py +++ b/pipelines/capture/movidesk/tasks.py @@ -25,7 +25,7 @@ def create_tickets_extractor( start = datetime.strftime(timestamp - timedelta(days=1), "%Y-%m-%dT%H:%M:%S.%MZ") end = datetime.strftime(timestamp, "%Y-%m-%dT%H:%M:%S.%MZ") token = get_secret(constants.MOVIDESK_SECRET_PATH.value)["token"] - service = "recursos_sppo_reprocessamento" + service = "recursos_sppo_viagens_individuais" params = { "token": token, "$select": "id,protocol,createdDate,lastUpdate", From 217751d83453cc0714b7bd683c77659b6f230f61 Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Thu, 26 Dec 2024 12:41:56 -0300 Subject: [PATCH 09/11] corrige service para teste --- pipelines/capture/movidesk/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/capture/movidesk/tasks.py b/pipelines/capture/movidesk/tasks.py index 9abf76792..bc8427430 100644 --- a/pipelines/capture/movidesk/tasks.py +++ b/pipelines/capture/movidesk/tasks.py @@ -25,7 +25,7 @@ def create_tickets_extractor( start = datetime.strftime(timestamp - timedelta(days=1), "%Y-%m-%dT%H:%M:%S.%MZ") end = datetime.strftime(timestamp, "%Y-%m-%dT%H:%M:%S.%MZ") token = get_secret(constants.MOVIDESK_SECRET_PATH.value)["token"] - service = "recursos_sppo_viagens_individuais" + service = "Viagem Individual" params = { "token": token, "$select": "id,protocol,createdDate,lastUpdate", From 2a6478f9167d9321ef5ce71c7184e39ce8e93f05 Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Thu, 26 Dec 2024 14:30:18 -0300 Subject: [PATCH 10/11] altera transform_to_nested_structure --- pipelines/utils/pretreatment.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pipelines/utils/pretreatment.py b/pipelines/utils/pretreatment.py index 19f6f1bea..fb5a0f045 100644 --- a/pipelines/utils/pretreatment.py +++ b/pipelines/utils/pretreatment.py @@ -22,8 +22,12 @@ def transform_to_nested_structure(data: pd.DataFrame, primary_keys: list) -> pd. data["content"] = data.apply( lambda row: json.dumps( { - key: value if not pd.isna(value) else None - for key, value in row[content_columns].to_dict().items() + key: ( + row[key] + if isinstance(row[key], (list, dict)) or not pd.isna(row[key]) + else None + ) + for key in content_columns } ), axis=1, From 062421cade975b1eeb4488bfb195cfb4ec6af772 Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Thu, 26 Dec 2024 14:47:41 -0300 Subject: [PATCH 11/11] altera transform_raw_to_nested_structure --- pipelines/migration/tasks.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pipelines/migration/tasks.py b/pipelines/migration/tasks.py index 329f90a25..c36b920c1 100644 --- a/pipelines/migration/tasks.py +++ b/pipelines/migration/tasks.py @@ -1559,8 +1559,12 @@ def transform_raw_to_nested_structure( data["content"] = data.apply( lambda row: json.dumps( { - key: value if not pd.isna(value) else None - for key, value in row[content_columns].to_dict().items() + key: ( + row[key] + if isinstance(row[key], (list, dict)) or not pd.isna(row[key]) + else None + ) + for key in content_columns }, ensure_ascii=( constants.CONTROLE_FINANCEIRO_DATASET_ID.value not in raw_filepath