Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port Covid Recovery Dash generation code to ingestor #112

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
finish up, add cron jobs and policy files
  • Loading branch information
idreyn committed Sep 2, 2024
commit 635b5ed0a3dfd08bdcd6c20337bbf38bc4ef4550
7 changes: 6 additions & 1 deletion ingestor/.chalice/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,13 @@
"update_time_predictions": {
"iam_policy_file": "policy-time-predictions.json",
"lambda_timeout": 300
},
"update_service_ridership_dashboard": {
"iam_policy_file": "policy-service-ridership-dashboard.json",
"lambda_timeout": 900,
"lambda_memory_size": 1024
}
}
}
}
}
}
34 changes: 34 additions & 0 deletions ingestor/.chalice/policy-service-ridership-dashboard.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
{
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Effect": "Allow",
"Resource": "arn:*:logs:*:*:*"
},
{
"Effect": "Allow",
"Action": "s3:*",
"Resource": [
"arn:aws:s3:::tm-gtfs",
"arn:aws:s3:::tm-gtfs/*",
"arn:aws:s3:::tm-service-ridership-dashboard",
"arn:aws:s3:::tm-service-ridership-dashboard/*"
]
},
{
"Effect": "Allow",
"Action": [
"dynamodb:Query"
],
"Resource": [
"arn:aws:dynamodb:us-east-1:473352343756:table/Ridership",
"arn:aws:dynamodb:us-east-1:473352343756:table/ScheduledServiceDaily"
]
}
]
}
7 changes: 7 additions & 0 deletions ingestor/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
predictions,
landing,
trip_metrics,
service_ridership_dashboard,
)

app = Chalice(app_name="ingestor")
Expand Down Expand Up @@ -148,3 +149,9 @@ def store_landing_data(event):
ridership_data = landing.get_ridership_data()
landing.upload_to_s3(json.dumps(trip_metrics_data), json.dumps(ridership_data))
landing.clear_cache()


# 9:30 UTC -> 4:30/5:30am ET every day (after GTFS and ridership have been ingested)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# 9:00 UTC -> 4:30/5:30am ET every day (after GTFS and ridership have bene ingested)
# 9:00 UTC -> 4:30/5:30am ET every day (after GTFS and ridership have been ingested)

@app.schedule(Cron(30, 9, "*", "*", "?", "*"))
def update_service_ridership_dashboard():
    """Scheduled Lambda entry point that rebuilds the service ridership dashboard JSON.

    Cron fields are (minute, hour, day-of-month, month, day-of-week, year),
    so this fires daily at 09:30 UTC.
    """
    service_ridership_dashboard.create_service_ridership_dash_json()
3 changes: 3 additions & 0 deletions ingestor/chalicelib/service_ridership_dashboard/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
"""Public interface of the service_ridership_dashboard package.

Only the top-level ingestion entry point is exported; everything else in
this package is an implementation detail.
"""
from .ingest import create_service_ridership_dash_json

__all__ = ["create_service_ridership_dash_json"]
17 changes: 13 additions & 4 deletions ingestor/chalicelib/service_ridership_dashboard/gtfs.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import boto3
from typing import Dict
from typing import Optional
from tempfile import TemporaryDirectory
from mbta_gtfs_sqlite import MbtaGtfsArchive
from mbta_gtfs_sqlite.models import Route, Line
Expand All @@ -8,8 +8,10 @@

from .config import IGNORE_LINE_IDS

RoutesByLine = dict[Line, Route]

def get_routes_by_line() -> Dict[Line, Route]:

def get_routes_by_line(include_only_line_ids: Optional[list[str]]) -> dict[Line, Route]:
s3 = boto3.resource("s3")
archive = MbtaGtfsArchive(
local_archive_path=TemporaryDirectory().name,
Expand All @@ -19,8 +21,15 @@ def get_routes_by_line() -> Dict[Line, Route]:
feed.use_compact_only()
feed.download_or_build()
session = feed.create_sqlite_session(compact=True)
lines_by_id = index_by(session.query(Line).all(), lambda line: line.line_id)
lines_by_id = index_by(
session.query(Line).all(),
lambda line: line.line_id,
)
all_routes_with_line_ids = [
route for route in session.query(Route).all() if route.line_id and route.line_id not in IGNORE_LINE_IDS
route
for route in session.query(Route).all()
if route.line_id
and route.line_id not in IGNORE_LINE_IDS
and (not include_only_line_ids or route.line_id in include_only_line_ids)
]
return bucket_by(all_routes_with_line_ids, lambda route: lines_by_id[route.line_id])
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
import click
from datetime import date, datetime, timedelta
from typing import cast
from typing import cast, Optional
from pathlib import PurePath

from .config import (
Expand All @@ -10,11 +11,13 @@
)
from .service_levels import get_service_level_entries_by_line_id, ServiceLevelsByDate, ServiceLevelsEntry
from .ridership import ridership_by_line_id, RidershipEntry
from .gtfs import get_routes_by_line
from .service_summaries import summarize_weekly_service_around_date
from .util import date_to_string
from .time_series import get_time_series
from .util import date_to_string, date_from_string
from .time_series import get_weekly_median_time_series
from .summary import get_summary_data
from .types import ServiceRegimes, LineData, DashJSON, LineKind
from .s3 import put_dashboard_json_to_s3

parent_dir = PurePath(__file__).parent
debug_file_name = parent_dir / "dash.json"
Expand Down Expand Up @@ -70,13 +73,13 @@ def create_line_data(
route_ids=service_level_entry.route_ids,
line_id=service_level_entry.line_id,
),
"ridershipHistory": get_time_series(
"ridershipHistory": get_weekly_median_time_series(
entries=ridership,
entry_value_getter=lambda entry: entry.ridership,
start_date=start_date,
max_end_date=end_date,
),
"serviceHistory": get_time_series(
"serviceHistory": get_weekly_median_time_series(
entries=service_levels,
entry_value_getter=lambda entry: round(sum(entry.service_levels)),
start_date=start_date,
Expand All @@ -92,9 +95,17 @@ def create_line_data(
def create_service_ridership_dash_json(
start_date: date = START_DATE,
end_date: date = datetime.now(TIME_ZONE).date(),
write_debug_file: bool = False,
write_debug_files: bool = False,
write_to_s3: bool = True,
include_only_line_ids: Optional[list[str]] = None,
):
print(
f"Creating service ridership dashboard JSON for {start_date} to {end_date} "
+ f"{'for lines ' + ', '.join(include_only_line_ids) if include_only_line_ids else ''}"
)
routes_by_line = get_routes_by_line(include_only_line_ids=include_only_line_ids)
service_level_entries = get_service_level_entries_by_line_id(
routes_by_line=routes_by_line,
start_date=start_date,
end_date=end_date,
)
Expand Down Expand Up @@ -125,10 +136,34 @@ def create_service_ridership_dash_json(
"summaryData": summary_data,
"lineData": line_data_by_line_id,
}
if write_debug_file:
if write_debug_files:
with open(debug_file_name, "w") as f:
json.dump(dash_json, f)
if write_to_s3:
put_dashboard_json_to_s3(today=end_date, dash_json=dash_json)


@click.command()
@click.option("--start", default=START_DATE, help="Start date for the dashboard")
@click.option("--end", default=datetime.now(TIME_ZONE).date(), help="End date for the dashboard")
@click.option("--debug", default=False, help="Write debug file", is_flag=True)
@click.option("--s3", default=False, help="Write to S3", is_flag=True)
@click.option("--lines", default=None, help="Include only these line IDs")
def create_service_ridership_dash_json_command(
    start: str,
    end: str,
    debug: bool = False,
    s3: bool = False,
    lines: Optional[str] = None,
):
    """CLI wrapper around create_service_ridership_dash_json.

    Dates arrive as strings and are parsed with date_from_string; --lines is
    a comma-separated list of bare line names, each prefixed with "line-" to
    form line IDs (e.g. "Red" -> "line-Red").
    """
    # NOTE(review): the --start/--end defaults are date objects while the
    # parameters are typed str and parsed via date_from_string — confirm
    # click's string coercion of those defaults matches the format
    # date_from_string expects.
    create_service_ridership_dash_json(
        start_date=date_from_string(start),
        end_date=date_from_string(end),
        write_debug_files=debug,
        write_to_s3=s3,
        include_only_line_ids=[f"line-{line}" for line in lines.split(",")] if lines else None,
    )


if __name__ == "__main__":
create_service_ridership_dash_json(write_debug_file=True)
create_service_ridership_dash_json_command()
15 changes: 15 additions & 0 deletions ingestor/chalicelib/service_ridership_dashboard/s3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import boto3
import json
from datetime import date

from .util import date_to_string
from .types import DashJSON

bucket = boto3.resource("s3").Bucket("tm-service-ridership-dashboard")


def put_dashboard_json_to_s3(today: date, dash_json: DashJSON) -> None:
    """Serialize the dashboard JSON once and upload it to S3 twice:
    under a dated key (for history) and under "latest.json" (for clients).
    """
    print("Uploading dashboard JSON to S3")
    payload = json.dumps(dash_json)
    # Dated snapshot first, then the rolling "latest" pointer.
    for key in (f"{date_to_string(today)}.json", "latest.json"):
        bucket.put_object(Key=key, Body=payload)
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from tqdm import tqdm

from .queries import query_scheduled_service, ScheduledServiceRow
from .gtfs import get_routes_by_line
from .gtfs import RoutesByLine
from .util import bucket_by, index_by, date_range, date_to_string


Expand Down Expand Up @@ -33,12 +33,16 @@ def _get_trip_count_by_hour_totals_for_day(rows_for_day: list[ScheduledServiceRo


def _get_has_service_exception(rows_for_day: list[ScheduledServiceRow]) -> bool:
return any(item["hasServiceExceptions"] for item in rows_for_day)
return any(item.get("hasServiceExceptions") for item in rows_for_day)


def get_service_level_entries_by_line_id(start_date: date, end_date: date) -> ServiceLevelsByLineId:
def get_service_level_entries_by_line_id(
routes_by_line: RoutesByLine,
start_date: date,
end_date: date,
) -> ServiceLevelsByLineId:
entries: dict[str, list[ServiceLevelsEntry]] = {}
for line, routes in (progress := tqdm(get_routes_by_line().items())):
for line, routes in (progress := tqdm(routes_by_line.items())):
entries.setdefault(line.line_id, [])
progress.set_description(f"Loading service levels for {line.line_id}")
results_by_date_str: dict[str, list[ScheduledServiceRow]] = bucket_by(
Expand Down
58 changes: 12 additions & 46 deletions ingestor/chalicelib/service_ridership_dashboard/summary.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

from .util import date_to_string
from .types import LineData, SummaryData
from .config import PRE_COVID_DATE
from .time_series import (
merge_weekly_median_time_series,
get_latest_weekly_median_time_series_entry,
)


def _line_is_cancelled(line: LineData) -> bool:
Expand All @@ -27,58 +30,21 @@ def _line_has_increased_service(line: LineData) -> bool:
return False


def _merge_time_series(many_series: list[list[float]]) -> list[float]:
min_length = min(len(series) for series in many_series)
entries = [0.0] * min_length
for series in many_series:
for idx, value in enumerate(series):
if idx >= min_length:
break
entries[idx] += value
return [round(e) for e in entries]


def _get_fraction_of_timeseries_value(
time_series: list[float],
start_date: date,
present_date: date,
denominator_date: date,
) -> float:
present_idx = min(
(present_date - start_date).days,
len(time_series) - 1,
)
denominator_idx = (denominator_date - start_date).days
numerator = time_series[present_idx]
denominator = time_series[denominator_idx]
return numerator / denominator if denominator != 0 else 0.0


def get_summary_data(line_data: list[LineData], start_date: date, end_date: date) -> SummaryData:
total_ridership_history = _merge_time_series([line["ridershipHistory"] for line in line_data])
total_service_history = _merge_time_series([line["serviceHistory"] for line in line_data])
total_passengers = total_ridership_history[-1]
total_trips = total_service_history[-1]
total_ridership_history = merge_weekly_median_time_series([line["ridershipHistory"] for line in line_data])
total_service_history = merge_weekly_median_time_series([line["serviceHistory"] for line in line_data])
total_passengers = get_latest_weekly_median_time_series_entry(total_ridership_history)
total_trips = get_latest_weekly_median_time_series_entry(total_service_history)
total_routes_cancelled = sum(_line_is_cancelled(line) for line in line_data)
total_reduced_service = sum(_line_has_reduced_service(line) for line in line_data)
total_increased_service = sum(_line_has_increased_service(line) for line in line_data)
return {
"totalRidershipHistory": total_ridership_history,
"totalServiceHistory": total_service_history,
"totalRidershipPercentage": _get_fraction_of_timeseries_value(
time_series=total_ridership_history,
start_date=start_date,
present_date=end_date,
denominator_date=PRE_COVID_DATE,
),
"totalServicePercentage": _get_fraction_of_timeseries_value(
time_series=total_service_history,
start_date=start_date,
present_date=end_date,
denominator_date=PRE_COVID_DATE,
),
"totalPassengers": total_passengers,
"totalTrips": total_trips,
"totalRidershipPercentage": 0, # From CRD, remove
"totalServicePercentage": 0, # From CRD, remove
"totalPassengers": total_passengers or 0,
"totalTrips": total_trips or 0,
"totalRoutesCancelled": total_routes_cancelled,
"totalReducedService": total_reduced_service,
"totalIncreasedService": total_increased_service,
Expand Down
Loading