Port Covid Recovery Dash generation code to ingestor #112

Open · wants to merge 5 commits into main
Changes from 1 commit
wip
idreyn committed Sep 2, 2024
commit 76ec75d59c2609733cd517cdb899e04da7bbfd8a
1 change: 1 addition & 0 deletions dash.json

Large diffs are not rendered by default.

18 changes: 15 additions & 3 deletions ingestor/chalicelib/gtfs/ingest.py
@@ -18,7 +18,7 @@
date_range,
index_by,
is_valid_route_id,
get_services_for_date,
get_service_ids_for_date_to_has_exceptions,
get_total_service_minutes,
)
from .models import SessionModels, RouteDateTotals
@@ -50,29 +50,39 @@ def create_gl_route_date_totals(totals: List[RouteDateTotals]):
total_by_hour[i] += total.by_hour[i]
total_count = sum(t.count for t in gl_totals)
total_service_minutes = sum(t.service_minutes for t in gl_totals)
has_service_exceptions = any((t.has_service_exceptions for t in gl_totals))
return RouteDateTotals(
route_id="Green",
line_id="Green",
date=totals[0].date,
count=total_count,
service_minutes=total_service_minutes,
by_hour=total_by_hour,
has_service_exceptions=has_service_exceptions,
)


def create_route_date_totals(today: date, models: SessionModels):
all_totals = []
services_for_today = get_services_for_date(models, today)
service_ids_and_exception_status_for_today = get_service_ids_for_date_to_has_exceptions(models, today)
for route_id, route in models.routes.items():
if not is_valid_route_id(route_id):
continue
trips = [trip for trip in models.trips_by_route_id.get(route_id, []) if trip.service_id in services_for_today]
trips = [
trip
for trip in models.trips_by_route_id.get(route_id, [])
if trip.service_id in service_ids_and_exception_status_for_today.keys()
]
has_service_exceptions = any(
(service_ids_and_exception_status_for_today.get(trip.service_id, False) for trip in trips)
)
totals = RouteDateTotals(
route_id=route_id,
line_id=route.line_id,
date=today,
count=len(trips),
by_hour=bucket_trips_by_hour(trips),
has_service_exceptions=has_service_exceptions,
service_minutes=get_total_service_minutes(trips),
)
all_totals.append(totals)
@@ -99,6 +109,7 @@ def ingest_feed_to_dynamo(
"lineId": total.line_id,
"count": total.count,
"serviceMinutes": total.service_minutes,
"hasServiceExceptions": total.has_service_exceptions,
Contributor Author

The "recovery dash" code needs to know, in general, whether the service levels on a given day include any service exceptions (usually additions/removals for holidays). It was easiest just to add this as a column to the ScheduledServiceDaily table — the migration has already run.
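
For context, a minimal read-side sketch of how a consumer could check the new flag. The table name comes from the comment above; the routeId/date key schema and the boto3 calls are assumptions for illustration, not part of this diff.

import boto3
from boto3.dynamodb.conditions import Key

# Hypothetical consumer of the new column. The key schema (routeId + date) is an
# assumption; only the hasServiceExceptions attribute name is taken from this PR.
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("ScheduledServiceDaily")

response = table.query(
    KeyConditionExpression=Key("routeId").eq("Green") & Key("date").eq("2024-09-02"),
)
for item in response["Items"]:
    if item.get("hasServiceExceptions"):
        # This date's scheduled service includes holiday-style additions/removals.
        print(item["date"], item["serviceMinutes"])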

"byHour": {"totals": total.by_hour},
}
batch.put_item(Item=item)
@@ -112,6 +123,7 @@ def ingest_feeds(
force_rebuild_feeds: bool = False,
):
for feed in feeds:
feed.use_compact_only()
Member

Is this okay for all the S3 uploads? This is what mbta-performance will use every half hour.

try:
if force_rebuild_feeds:
print(f"[{feed.key}] Forcing rebuild locally")
1 change: 1 addition & 0 deletions ingestor/chalicelib/gtfs/models.py
@@ -27,6 +27,7 @@ class RouteDateTotals:
count: int
service_minutes: int
by_hour: List[int]
has_service_exceptions: bool

@property
def timestamp(self):
19 changes: 11 additions & 8 deletions ingestor/chalicelib/gtfs/utils.py
@@ -4,6 +4,7 @@
Trip,
CalendarServiceExceptionType,
ServiceDayAvailability,
CalendarService,
)

if TYPE_CHECKING:
@@ -64,8 +65,12 @@ def date_range(start_date: date, end_date: date):
now = now + timedelta(days=1)


def get_services_for_date(models: "SessionModels", today: date):
services_for_today = set()
def get_service_ids_for_date_to_has_exceptions(models: "SessionModels", today: date) -> dict[str, bool]:
"""
Returns a dict mapping each service ID that is active on the given date to a boolean indicating
whether any calendar exceptions apply to that service on that date.
"""
services_for_today: dict[str, bool] = {}
for service_id in models.calendar_services.keys():
service = models.calendar_services.get(service_id)
if not service:
@@ -81,15 +86,13 @@ def get_services_for_date(models: "SessionModels", today: date):
service.saturday,
service.sunday,
][today.weekday()] == ServiceDayAvailability.AVAILABLE
service_exceptions_today = [ex for ex in service_exceptions if ex.date == today]
is_removed_by_exception = any(
(
ex.date == today and ex.exception_type == CalendarServiceExceptionType.REMOVED
for ex in service_exceptions
)
(ex.exception_type == CalendarServiceExceptionType.REMOVED for ex in service_exceptions_today)
)
is_added_by_exception = any(
(ex.date == today and ex.exception_type == CalendarServiceExceptionType.ADDED for ex in service_exceptions)
(ex.exception_type == CalendarServiceExceptionType.ADDED for ex in service_exceptions_today)
)
if is_added_by_exception or (in_range and on_sevice_day and not is_removed_by_exception):
services_for_today.add(service_id)
services_for_today[service_id] = len(service_exceptions_today) > 0
return services_for_today
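
For illustration, a minimal sketch of the mapping this function now returns; the service IDs below are made up, not taken from MBTA data.

# Hypothetical return value for one date. Keys are the service IDs active that day;
# values say whether any calendar exception (ADDED or REMOVED) touched that service.
services_for_today = {
    "FallWeekday": False,         # runs per the regular calendar, no exceptions today
    "ThanksgivingSpecial": True,  # only appears because of an ADDED exception
}

# ingest.py keeps trips whose service_id is a key here, then ORs the flags per route:
has_service_exceptions = any(services_for_today.values())
print(has_service_exceptions)  # True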
23 changes: 23 additions & 0 deletions ingestor/chalicelib/service_ridership_dashboard/config.py
@@ -0,0 +1,23 @@
from datetime import date
from pytz import timezone

# Lower bound for time series and GTFS feeds
PRE_COVID_DATE = date(2020, 2, 24)

# Date to use as a baseline
START_DATE = date(2018, 1, 1)

# Boston baby
TIME_ZONE = timezone("US/Eastern")

# Line IDs to ignore when building the dashboard
IGNORE_LINE_IDS = ["line-CapeFlyer", "line-Foxboro"]

# Date ranges with service gaps that we paper over because of major holidays or catastrophes
# rather than doing more complicated special-casing with GTFS services
FILL_DATE_RANGES = [
(date(2021, 11, 19), date(2021, 11, 26)), # Thanksgiving 2021
(date(2021, 12, 18), date(2021, 12, 26)), # Christmas 2021
(date(2022, 12, 18), date(2023, 1, 3)), # Christmas 2022
(date(2022, 3, 28), date(2022, 3, 29)), # Haymarket garage collapse
]
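
To make the intent of FILL_DATE_RANGES concrete, here is a rough sketch of how a gap-fill over those ranges could work. The helper name and the forward-fill strategy are assumptions; the real logic lives in the service-levels code added elsewhere in this PR.

from datetime import date, timedelta


def fill_service_gaps(service_by_date: dict[date, float], fill_ranges: list[tuple[date, date]]) -> dict[date, float]:
    # Carry the last value seen before each configured range forward across the gap,
    # instead of special-casing the underlying GTFS services for holidays/incidents.
    filled = dict(service_by_date)
    for start, end in fill_ranges:
        baseline = filled.get(start - timedelta(days=1))
        if baseline is None:
            continue
        day = start
        while day <= end:
            filled.setdefault(day, baseline)
            day += timedelta(days=1)
    return filled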
125 changes: 125 additions & 0 deletions ingestor/chalicelib/service_ridership_dashboard/generate.py
@@ -0,0 +1,125 @@
import json
from datetime import date, datetime, timedelta
from typing import TypedDict, Literal

from .config import (
START_DATE,
TIME_ZONE,
PRE_COVID_DATE,
)
from .service_levels import get_service_level_entries_by_line_id, ServiceLevelsByDate, ServiceLevelsEntry
from .ridership import ridership_by_line_id, RidershipByDate, RidershipEntry
from .service_summaries import ServiceSummary, summarize_weekly_service_around_date
from .util import date_to_string
from .time_series import get_time_series

LineKind = Literal["bus", "subway", "commuter-rail", "ferry"]


class ServiceRegimes(TypedDict):
present: ServiceSummary
oneYearAgo: ServiceSummary
preCovid: ServiceSummary


class LineData(TypedDict):
id: str
shortName: str
longName: str
routeIds: list[str]
startDate: str
lineKind: LineKind
ridershipHistory: list[float]
serviceHistory: list[float]
serviceRegimes: ServiceRegimes


class DashJSON(TypedDict):
lineData: dict[str, LineData]


def create_service_regimes(
service_levels: ServiceLevelsByDate,
date: date,
) -> ServiceRegimes:
return {
"present": summarize_weekly_service_around_date(
date=date,
service_levels=service_levels,
),
"oneYearAgo": summarize_weekly_service_around_date(
date=date - timedelta(days=365),
service_levels=service_levels,
),
"preCovid": summarize_weekly_service_around_date(
date=PRE_COVID_DATE,
service_levels=service_levels,
),
}


def create_line_data(
start_date: date,
end_date: date,
service_levels: dict[date, ServiceLevelsEntry],
ridership: dict[date, RidershipEntry],
) -> LineData:
[latest_service_levels_date, *_] = sorted(service_levels.keys(), reverse=True)
service_level_entry = service_levels[latest_service_levels_date]
return {
"id": service_level_entry.line_id,
"shortName": service_level_entry.line_short_name,
"longName": service_level_entry.line_long_name,
"routeIds": service_level_entry.route_ids,
"startDate": date_to_string(start_date),
"lineKind": "bus",
"ridershipHistory": get_time_series(
entries=ridership,
entry_value_getter=lambda entry: entry.ridership,
start_date=start_date,
max_end_date=end_date,
),
"serviceHistory": get_time_series(
entries=service_levels,
entry_value_getter=lambda entry: sum(entry.service_levels),
start_date=start_date,
max_end_date=end_date,
),
"serviceRegimes": create_service_regimes(
service_levels=service_levels,
date=latest_service_levels_date,
),
}


def create_service_ridership_dash_json(
start_date: date = START_DATE,
end_date: date = datetime.now(TIME_ZONE).date(),
):
service_level_entries = get_service_level_entries_by_line_id(
start_date=start_date,
end_date=end_date,
)
print("service_level_entries", service_level_entries)
ridership_entries = ridership_by_line_id(
start_date=start_date,
end_date=end_date,
line_ids=list(service_level_entries.keys()),
)
dash_json: DashJSON = {
"lineData": {
line_id: create_line_data(
start_date=start_date,
end_date=end_date,
service_levels=service_level_entries[line_id],
ridership=ridership_entries[line_id],
)
for line_id in service_level_entries.keys()
},
}
with open("dash.json", "w") as f:
json.dump(dash_json, f)


if __name__ == "__main__":
create_service_ridership_dash_json()
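
For a sense of the output, a hedged sketch of one lineData entry in dash.json, following the LineData TypedDict above. All values are made up, and each ServiceSummary comes from service_summaries.py, which is not shown in this commit, so those fields are left empty.

# Hypothetical shape of a single lineData entry written to dash.json.
example_line_data = {
    "id": "line-1",  # made-up line ID
    "shortName": "1",
    "longName": "Example Bus Route",
    "routeIds": ["1"],
    "startDate": "2018-01-01",
    "lineKind": "bus",  # currently hardcoded to "bus" in create_line_data
    "ridershipHistory": [12000.0, 11850.0],  # one value per day starting at startDate
    "serviceHistory": [710.0, 708.0],  # summed service levels per day
    "serviceRegimes": {
        "present": {},     # ServiceSummary; shape defined in service_summaries.py (not shown)
        "oneYearAgo": {},
        "preCovid": {},
    },
}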