Skip to content

Commit

Permalink
criar novos arquivos utils
Browse files Browse the repository at this point in the history
  • Loading branch information
pixuimpou committed Jan 5, 2024
1 parent e7e3cb6 commit 7ec6ab1
Show file tree
Hide file tree
Showing 14 changed files with 1,006 additions and 1,079 deletions.
18 changes: 8 additions & 10 deletions pipelines/br_rj_riodejaneiro_brt_gps/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,21 @@
Tasks for br_rj_riodejaneiro_brt_gps
"""

from datetime import timedelta
import traceback
from datetime import timedelta

import pandas as pd
from prefect import task
from prefeitura_rio.pipelines_utils.logging import log

from pipelines.constants import constants
from pipelines.utils.backup.utils import log_critical, map_dict_keys

# EMD Imports #

from prefeitura_rio.pipelines_utils.logging import log

# SMTR Imports #

from pipelines.constants import constants
from pipelines.utils.utils import log_critical, map_dict_keys

# Tasks #

Expand Down Expand Up @@ -53,9 +55,7 @@ def pre_treatment_br_rj_riodejaneiro_brt_gps(status: dict, timestamp):
df = pd.DataFrame(columns=columns) # pylint: disable=c0103

# map_dict_keys change data keys to match project data structure
df["content"] = [
map_dict_keys(piece, constants.GPS_BRT_MAPPING_KEYS.value) for piece in data
]
df["content"] = [map_dict_keys(piece, constants.GPS_BRT_MAPPING_KEYS.value) for piece in data]
df[key_column] = [piece[key_column] for piece in data]
df["timestamp_gps"] = [piece["timestamp_gps"] for piece in data]
df["timestamp_captura"] = timestamp
Expand All @@ -64,9 +64,7 @@ def pre_treatment_br_rj_riodejaneiro_brt_gps(status: dict, timestamp):
# Remove timezone and force it to be config timezone
log(f"Before converting, timestamp_gps was: \n{df['timestamp_gps']}")
df["timestamp_gps"] = (
pd.to_datetime(df["timestamp_gps"], unit="ms")
.dt.tz_localize("UTC")
.dt.tz_convert(timezone)
pd.to_datetime(df["timestamp_gps"], unit="ms").dt.tz_localize("UTC").dt.tz_convert(timezone)
)
log(f"After converting the timezone, timestamp_gps is: \n{df['timestamp_gps']}")

Expand Down
52 changes: 17 additions & 35 deletions pipelines/schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,21 @@
Schedules for rj_smtr
"""

from datetime import timedelta, datetime
from pytz import timezone
from datetime import datetime, timedelta

from prefect.schedules import Schedule
from prefect.schedules.clocks import IntervalClock, CronClock
from pipelines.constants import constants as emd_constants
from pipelines.utils.utils import generate_ftp_schedules
from prefect.schedules.clocks import CronClock, IntervalClock
from pytz import timezone

from pipelines.constants import constants
from pipelines.constants import constants as emd_constants
from pipelines.utils.backup.utils import generate_ftp_schedules

every_minute = Schedule(
clocks=[
IntervalClock(
interval=timedelta(minutes=1),
start_date=datetime(
2021, 1, 1, 0, 0, 0, tzinfo=timezone(constants.TIMEZONE.value)
),
start_date=datetime(2021, 1, 1, 0, 0, 0, tzinfo=timezone(constants.TIMEZONE.value)),
labels=[
emd_constants.RJ_SMTR_AGENT_LABEL.value,
],
Expand All @@ -28,9 +28,7 @@
clocks=[
IntervalClock(
interval=timedelta(minutes=1),
start_date=datetime(
2021, 1, 1, 0, 0, 0, tzinfo=timezone(constants.TIMEZONE.value)
),
start_date=datetime(2021, 1, 1, 0, 0, 0, tzinfo=timezone(constants.TIMEZONE.value)),
labels=[
emd_constants.RJ_SMTR_DEV_AGENT_LABEL.value,
],
Expand All @@ -42,9 +40,7 @@
clocks=[
IntervalClock(
interval=timedelta(minutes=10),
start_date=datetime(
2021, 1, 1, 0, 0, 0, tzinfo=timezone(constants.TIMEZONE.value)
),
start_date=datetime(2021, 1, 1, 0, 0, 0, tzinfo=timezone(constants.TIMEZONE.value)),
labels=[
emd_constants.RJ_SMTR_AGENT_LABEL.value,
],
Expand All @@ -57,9 +53,7 @@
clocks=[
IntervalClock(
interval=timedelta(hours=1),
start_date=datetime(
2021, 1, 1, 0, 0, 0, tzinfo=timezone(constants.TIMEZONE.value)
),
start_date=datetime(2021, 1, 1, 0, 0, 0, tzinfo=timezone(constants.TIMEZONE.value)),
labels=[
emd_constants.RJ_SMTR_AGENT_LABEL.value,
],
Expand All @@ -71,9 +65,7 @@
clocks=[
IntervalClock(
interval=timedelta(hours=1),
start_date=datetime(
2021, 1, 1, 0, 6, 0, tzinfo=timezone(constants.TIMEZONE.value)
),
start_date=datetime(2021, 1, 1, 0, 6, 0, tzinfo=timezone(constants.TIMEZONE.value)),
labels=[
emd_constants.RJ_SMTR_AGENT_LABEL.value,
],
Expand All @@ -85,9 +77,7 @@
clocks=[
IntervalClock(
interval=timedelta(days=1),
start_date=datetime(
2021, 1, 1, 0, 0, tzinfo=timezone(constants.TIMEZONE.value)
),
start_date=datetime(2021, 1, 1, 0, 0, tzinfo=timezone(constants.TIMEZONE.value)),
labels=[
emd_constants.RJ_SMTR_AGENT_LABEL.value,
],
Expand All @@ -103,9 +93,7 @@
clocks=[
IntervalClock(
interval=timedelta(days=1),
start_date=datetime(
2022, 11, 30, 5, 0, tzinfo=timezone(constants.TIMEZONE.value)
),
start_date=datetime(2022, 11, 30, 5, 0, tzinfo=timezone(constants.TIMEZONE.value)),
labels=[
emd_constants.RJ_SMTR_AGENT_LABEL.value,
],
Expand All @@ -117,9 +105,7 @@
clocks=[
IntervalClock(
interval=timedelta(days=1),
start_date=datetime(
2022, 11, 30, 7, 0, tzinfo=timezone(constants.TIMEZONE.value)
),
start_date=datetime(2022, 11, 30, 7, 0, tzinfo=timezone(constants.TIMEZONE.value)),
labels=[
emd_constants.RJ_SMTR_AGENT_LABEL.value,
],
Expand All @@ -131,18 +117,14 @@
clocks=[
CronClock(
cron="0 12 16 * *",
start_date=datetime(
2022, 12, 16, 12, 0, tzinfo=timezone(constants.TIMEZONE.value)
),
start_date=datetime(2022, 12, 16, 12, 0, tzinfo=timezone(constants.TIMEZONE.value)),
labels=[
emd_constants.RJ_SMTR_AGENT_LABEL.value,
],
),
CronClock(
cron="0 12 1 * *",
start_date=datetime(
2023, 1, 1, 12, 0, tzinfo=timezone(constants.TIMEZONE.value)
),
start_date=datetime(2023, 1, 1, 12, 0, tzinfo=timezone(constants.TIMEZONE.value)),
labels=[
emd_constants.RJ_SMTR_AGENT_LABEL.value,
],
Expand Down
Loading

0 comments on commit 7ec6ab1

Please sign in to comment.