Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add alert delays to daily runs #113

Merged
merged 7 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ingestor/.chalice/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@
"lambda_memory_size": 1024,
"lambda_timeout": 900
},
"update_alert_delays": {
"iam_policy_file": "policy-alert-delays.json",
"lambda_timeout": 600
},
"update_time_predictions": {
"iam_policy_file": "policy-time-predictions.json",
"lambda_timeout": 300
Expand Down
21 changes: 21 additions & 0 deletions ingestor/.chalice/policy-alert-delays.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Effect": "Allow",
"Resource": "arn:*:logs:*:*:*"
},
{
"Effect": "Allow",
"Action": ["dynamodb:BatchWriteItem"],
"Resource": [
"arn:aws:dynamodb:us-east-1:473352343756:table/AlertDelaysWeekly"
]
}
]
}
21 changes: 21 additions & 0 deletions ingestor/.chalice/resources.json
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,27 @@
}
}
},
"UpdateAlertDelays": {
"Type": "AWS::Serverless::Function",
"Properties": {
"Environment": {
"Variables": {
"DD_API_KEY": {
"Ref": "DDApiKey"
},
"DD_VERSION": {
"Ref": "GitVersion"
},
"DD_TAGS": {
"Ref": "DDTags"
},
"DD_GIT_REPOSITORY_URL": {
"Ref": "DDGitRepositoryUrl"
}
}
}
}
},
"UpdateDeliveredTripMetricsYesterday": {
"Type": "AWS::Serverless::Function",
"Properties": {
Expand Down
20 changes: 16 additions & 4 deletions ingestor/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
agg_speed_tables,
gtfs,
ridership,
delays,
speed_restrictions,
predictions,
landing,
Expand Down Expand Up @@ -94,8 +95,8 @@ def update_ridership(event):
ridership.ingest_ridership_data()


# 7:20am UTC -> 2:20/3:20am ET every day
@app.schedule(Cron(20, 7, "*", "*", "?", "*"))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reducing runs to reduce AWS cost slightly since it shouldn't serve any benefit

# 7:20am UTC -> 2:20/3:20am ET every weekday
@app.schedule(Cron(20, 7, "?", "*", "MON-FRI", "*"))
def update_speed_restrictions(event):
speed_restrictions.update_speed_restrictions(max_lookback_months=2)

Expand All @@ -106,6 +107,15 @@ def update_time_predictions(event):
predictions.update_predictions()


# 7:45am UTC -> 2:45/3:45am ET every Monday
# There's no benefit to running it more frequently than once a week.
@app.schedule(Cron(45, 7, "?", "*", "MON", "*"))
def update_alert_delays(event):
today = datetime.now()
one_week_ago = (today - timedelta(days=8)).date()
delays.update_table(one_week_ago, today.date())


# 8:00am UTC -> 3:00/4:00am ET every day
@app.schedule(Cron(0, 8, "*", "*", "?", "*"))
def update_gtfs(event):
Expand Down Expand Up @@ -138,8 +148,10 @@ def populate_agg_delivered_trip_metrics(params, context):
agg_speed_tables.populate_table(line, "weekly")


# 9:00 UTC -> 4:00/5:00am ET every day.
@app.schedule(Cron(0, 9, "*", "*", "?", "*"))
# 9:00 UTC -> 4:00/5:00am ET every weekday.
# This is the last job that runs for the day.
# No need to run on weekends
@app.schedule(Cron(0, 9, "?", "*", "MON-FRI", "*"))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reducing runs to reduce AWS cost slightly since it shouldn't serve any benefit

def store_landing_data(event):
print(
f"Uploading ridership and trip metric data for landing page from {constants.NINETY_DAYS_AGO_STRING} to {constants.ONE_WEEK_AGO_STRING}"
Expand Down
10 changes: 0 additions & 10 deletions ingestor/chalicelib/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,6 @@
ALL_LINES: list[str] = ["Red", "Orange", "Blue", "Green-B", "Green-C", "Green-D", "Green-E"]
STATIONS = stations.STATIONS

ROUTE_TO_LINE_MAP = {
"Green-B": "line-green",
"Green-C": "line-green",
"Green-D": "line-green",
"Green-E": "line-green",
"Red": "line-red",
"Orange": "line-orange",
"Blue": "line-blue",
}

TERMINI_NEW = {
"line-red": {
"a": {
Expand Down
3 changes: 3 additions & 0 deletions ingestor/chalicelib/delays/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
__all__ = ["update_table"]

from .process import update_table
56 changes: 38 additions & 18 deletions ingestor/chalicelib/delays/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
import pandas as pd
import requests

from ingestor.chalicelib import constants, dynamo
from ingestor.chalicelib.delays.aggregate import group_weekly_data
from ingestor.chalicelib.delays.types import Alert, AlertsRequest
from chalicelib import constants, dynamo
from chalicelib.delays.aggregate import group_weekly_data
from chalicelib.delays.types import Alert, AlertsRequest

TABLE_NAME = "AlertDelaysWeekly"

Expand Down Expand Up @@ -59,13 +59,31 @@ def alert_type(alert: Alert):
or "disabled trolley" in alert["text"].lower()
or "train that was disabled" in alert["text"].lower()
or "disabled bus" in alert["text"].lower()
or "train being taken out of service" in alert["text"].lower()
):
return "disabled_vehicle"
elif "signal problem" in alert["text"].lower() or "signal issue" in alert["text"].lower():
elif (
"signal problem" in alert["text"].lower()
or "signal issue" in alert["text"].lower()
or "signal repairs" in alert["text"].lower()
or "signal maintenance" in alert["text"].lower()
or "signal repair" in alert["text"].lower()
):
return "signal_problem"
elif "switch problem" in alert["text"].lower():
elif (
"switch problem" in alert["text"].lower()
or "switch issue" in alert["text"].lower()
or "witch problem" in alert["text"].lower()
or "switching issue" in alert["text"].lower()
):
return "switch_problem"
elif "brake issue" in alert["text"].lower() or "brake problem" in alert["text"].lower():
elif (
"brake issue" in alert["text"].lower()
or "brake problem" in alert["text"].lower()
or "brakes activated" in alert["text"].lower()
or "brakes holding" in alert["text"].lower()
or "brakes applied" in alert["text"].lower()
):
return "brake_problem"
elif (
"power problem" in alert["text"].lower()
Expand All @@ -76,6 +94,8 @@ def alert_type(alert: Alert):
or "overheard wires" in alert["text"].lower() # typo in the alert
or "catenary wires" in alert["text"].lower()
or "the overhead" in alert["text"].lower()
or "wire repair" in alert["text"].lower()
or "wire maintenance" in alert["text"].lower()
or "wire problem" in alert["text"].lower()
or "electrical problem" in alert["text"].lower()
or "overhead catenary" in alert["text"].lower()
Expand All @@ -88,6 +108,7 @@ def alert_type(alert: Alert):
"track issue" in alert["text"].lower()
or "track problem" in alert["text"].lower()
or "cracked rail" in alert["text"].lower()
or "broken rail" in alert["text"].lower()
):
return "track_issue"
elif (
Expand All @@ -103,7 +124,11 @@ def alert_type(alert: Alert):
return "police_activity"
elif "fire" in alert["text"].lower() or "smoke" in alert["text"].lower() or "burning" in alert["text"].lower():
return "fire"
elif "mechanical problem" in alert["text"].lower() or "mechanical issue" in alert["text"].lower():
elif (
"mechanical problem" in alert["text"].lower()
or "mechanical issue" in alert["text"].lower()
or "motor problem" in alert["text"].lower()
):
return "mechanical_problem"

print(alert["valid_from"], alert["text"].lower())
Expand Down Expand Up @@ -154,27 +179,22 @@ def process_delay_time(alerts: List[Alert]):

def process_requests(requests: List[AlertsRequest]):
# process all requests
all_data = {
"line-red": [],
"line-orange": [],
"line-blue": [],
"line-green": [],
}
all_data = {"Red": [], "Orange": [], "Blue": [], "Green-B": [], "Green-C": [], "Green-D": [], "Green-E": []}
for request in requests:
data = process_single_day(request)
if data is not None and len(data) != 0:
total_delay, delay_by_type = process_delay_time(data)
all_data[constants.ROUTE_TO_LINE_MAP[request.route]].append(
all_data[request.route].append(
{
"date": request.date.isoformat(),
"line": constants.ROUTE_TO_LINE_MAP[request.route],
"line": request.route,
"total_delay_time": total_delay,
"delay_by_type": delay_by_type,
}
)

df_data = {}
for line in constants.LINES:
for line in constants.ALL_LINES:
df = pd.DataFrame(all_data[line])
df = df.join(pd.json_normalize(df["delay_by_type"]))
df.drop(columns=["delay_by_type"], inplace=True)
Expand All @@ -199,6 +219,6 @@ def update_table(start_date: date, end_date: date):


if __name__ == "__main__":
start_date = date(2024, 5, 15)
end_date = date(2024, 8, 31)
start_date = date(2018, 1, 1)
end_date = date(2018, 6, 15)
update_table(start_date, end_date)
Loading