diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py index ec0fd5cce..399454c54 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py @@ -63,6 +63,24 @@ bilhetagem_transacao_captura.schedule = every_minute +# BILHETAGEM FISCALIZAÇÃO - CAPTURA A CADA 5 MINUTOS # + +bilhetagem_fiscalizacao_captura = deepcopy(default_capture_flow) +bilhetagem_fiscalizacao_captura.name = "SMTR: Bilhetagem Fiscalização - Captura" +bilhetagem_fiscalizacao_captura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value) +bilhetagem_fiscalizacao_captura.run_config = KubernetesRun( + image=emd_constants.DOCKER_IMAGE.value, + labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value], +) + +bilhetagem_fiscalizacao_captura = set_default_parameters( + flow=bilhetagem_fiscalizacao_captura, + default_parameters=constants.BILHETAGEM_GENERAL_CAPTURE_DEFAULT_PARAMS.value + | constants.BILHETAGEM_FISCALIZACAO_CAPTURE_PARAMS.value, +) + +bilhetagem_fiscalizacao_captura.schedule = every_5_minutes + # BILHETAGEM INTEGRAÇÃO - CAPTURA A CADA MINUTO # bilhetagem_integracao_captura = deepcopy(default_capture_flow) @@ -255,6 +273,7 @@ | {"recapture": True}, ) + # TRATAMENTO - RODA DE HORA EM HORA, RECAPTURAS + CAPTURA AUXILIAR + MATERIALIZAÇÃO # with Flow( @@ -294,6 +313,22 @@ raise_final_state=True, ) + # Recaptura Fiscalização + + run_recaptura_fiscalizacao = create_flow_run( + flow_name=bilhetagem_recaptura.name, + project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value, + labels=LABELS, + parameters=constants.BILHETAGEM_FISCALIZACAO_CAPTURE_PARAMS.value, + ) + + wait_recaptura_fiscalizacao_true = wait_for_flow_run( + run_recaptura_fiscalizacao, + stream_states=True, + stream_logs=True, + raise_final_state=True, + ) + # Captura Auxiliar runs_captura = create_flow_run.map( diff --git a/pipelines/rj_smtr/constants.py b/pipelines/rj_smtr/constants.py index 9a0f54328..e6ba37cae 100644 --- a/pipelines/rj_smtr/constants.py +++ b/pipelines/rj_smtr/constants.py @@ -747,10 +747,16 @@ class constants(Enum): # pylint: disable=c0103 "engine": "postgresql", "host": "10.5.12.107", }, + "fiscalizacao_db": { + "engine": "postgresql", + "host": "10.5.115.29", + }, }, "source_type": "db", } + BILHETAGEM_PRIVATE_BUCKET = "rj-smtr-jae-private" + BILHETAGEM_TRANSACAO_CAPTURE_PARAMS = { "table_id": "transacao", "partition_date_only": False, @@ -770,6 +776,26 @@ class constants(Enum): # pylint: disable=c0103 "interval_minutes": 1, } + BILHETAGEM_FISCALIZACAO_CAPTURE_PARAMS = { + "table_id": "fiscalizacao", + "partition_date_only": False, + "extract_params": { + "database": "fiscalizacao_db", + "query": """ + SELECT + * + FROM + fiscalizacao + WHERE + dt_inclusao >= '{start}' + dt_inclusao < '{end}' + """, + }, + "primary_key": ["id"], + "interval_minutes": 5, + "save_bucket_name": BILHETAGEM_PRIVATE_BUCKET, + } + BILHETAGEM_INTEGRACAO_CAPTURE_PARAMS = { "table_id": "integracao_transacao", "partition_date_only": False, @@ -888,8 +914,6 @@ class constants(Enum): # pylint: disable=c0103 BILHETAGEM_TRATAMENTO_INTERVAL = 60 - BILHETAGEM_PRIVATE_BUCKET = "rj-smtr-jae-private" - BILHETAGEM_CAPTURE_PARAMS = [ { "table_id": "linha", @@ -1117,6 +1141,8 @@ class constants(Enum): # pylint: disable=c0103 }, ] + BILHETAGEM_EXCLUDE = "+operadoras +consorcios" + BILHETAGEM_MATERIALIZACAO_INTEGRACAO_PARAMS = { "dataset_id": BILHETAGEM_DATASET_ID, "table_id": "integracao", @@ -1128,12 +1154,11 @@ class constants(Enum): # pylint: disable=c0103 }, "version": {}, }, - "exclude": "+operadoras +consorcios", + "exclude": BILHETAGEM_EXCLUDE, } BILHETAGEM_MATERIALIZACAO_TRANSACAO_PARAMS = { - "dataset_id": BILHETAGEM_DATASET_ID, - "table_id": "passageiros_hora", + "dataset_id": "dashboard_bilhetagem_jae", "upstream": True, "dbt_vars": { "date_range": { @@ -1142,14 +1167,14 @@ class constants(Enum): # pylint: disable=c0103 }, "version": {}, }, - "exclude": "integracao matriz_integracao", + "exclude": "integracao matriz_integracao stops_gtfs2 routes_gtfs2 feed_info_gtfs2", } BILHETAGEM_MATERIALIZACAO_ORDEM_PAGAMENTO_PARAMS = { "dataset_id": BILHETAGEM_DATASET_ID, "table_id": "ordem_pagamento", "upstream": True, - "exclude": f"+{BILHETAGEM_MATERIALIZACAO_TRANSACAO_PARAMS['table_id']}", + "exclude": BILHETAGEM_EXCLUDE, "dbt_vars": { "date_range": { "table_run_datetime_column_name": "data_ordem", @@ -1163,7 +1188,7 @@ class constants(Enum): # pylint: disable=c0103 "dataset_id": BILHETAGEM_DATASET_ID, "upstream": True, "downstream": True, - "exclude": "+operadoras +consorcios", + "exclude": BILHETAGEM_EXCLUDE, "dbt_vars": { "date_range": { "table_run_datetime_column_name": "datetime_captura",