Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RSL: Journey fixes #419

Merged
merged 18 commits into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/api/schemas/motis/import.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ ScheduleEvent:
description: TODO
NigiriEvent:
description: TODO
fields: {}
fields: { hash: { description: TODO } }
StationsEvent:
description: TODO
fields: {}
6 changes: 6 additions & 0 deletions docs/api/schemas/motis/ris.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,12 @@ RISStatusResponse:
description: TODO
init_status:
description: TODO
delayed_init:
description: TODO
init_forward_started:
description: TODO
init_forward_done:
description: TODO
FullTripMessageType:
description: TODO
TripFormationMessageType:
Expand Down
19 changes: 13 additions & 6 deletions modules/paxforecast/src/monitoring_update.cc
Original file line number Diff line number Diff line change
Expand Up @@ -283,12 +283,19 @@ void handle_major_delays(paxforecast& mod, universe& uv, schedule const& sched,
}
last_pgi = pgwrap.pgwr_.pg_;

auto& ar = affected_routes.emplace_back(affected_route_info{
.pgwrap_ = pgwrap,
.destination_station_id_ = get_destination_station_id(
sched, event->group_route()->route()->journey()),
.loc_now_ = from_fbs(sched, event->localization_type(),
event->localization())});
auto loc_now =
from_fbs(sched, event->localization_type(), event->localization());
auto const destination_station_id = get_destination_station_id(
sched, event->group_route()->route()->journey());

if (loc_now.at_station_->index_ == destination_station_id) {
continue;
}

auto& ar = affected_routes.emplace_back(
affected_route_info{.pgwrap_ = pgwrap,
.destination_station_id_ = destination_station_id,
.loc_now_ = std::move(loc_now)});

ar.alts_now_ =
alts_set.add_request(ar.loc_now_, ar.destination_station_id_);
Expand Down
19 changes: 19 additions & 0 deletions modules/paxmon/include/motis/paxmon/delayed_init.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#pragma once

#include <string>

#include "motis/core/schedule/schedule.h"

#include "motis/paxmon/paxmon_data.h"

namespace motis::paxmon {

struct delay_init_options {
bool reroute_unmatched_{};
std::string initial_reroute_router_;
};

void delayed_init(paxmon_data& data, universe& uv, schedule const& sched,
delay_init_options const& opt);

} // namespace motis::paxmon
22 changes: 13 additions & 9 deletions modules/paxmon/include/motis/paxmon/loaded_files.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,32 @@

#include <cstddef>
#include <filesystem>
#include <vector>

#include "motis/core/common/unixtime.h"

#include "motis/paxmon/loader/capacities/load_capacities.h"
#include "motis/paxmon/loader/unmatched_journey.h"

namespace motis::paxmon {

struct loaded_journey_file {
std::filesystem::path path_;
unixtime last_modified_{};

std::size_t matched_journeys_{};
std::size_t unmatched_journeys_{};
std::size_t unmatched_journeys_rerouted_{};
std::size_t matched_journey_count_{};
std::size_t unmatched_journey_count_{};
std::size_t unmatched_journey_rerouted_count_{};

std::size_t matched_groups_{};
std::size_t unmatched_groups_{};
std::size_t unmatched_groups_rerouted_{};
std::size_t matched_group_count_{};
std::size_t unmatched_group_count_{};
std::size_t unmatched_group_rerouted_count_{};

std::size_t matched_pax_{};
std::size_t unmatched_pax_{};
std::size_t unmatched_pax_rerouted_{};
std::size_t matched_pax_count_{};
std::size_t unmatched_pax_count_{};
std::size_t unmatched_pax_rerouted_count_{};

std::vector<loader::unmatched_journey> unmatched_journeys_;
};

struct loaded_capacity_file {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <cstdint>
#include <vector>

#include "motis/core/schedule/time.h"

Expand All @@ -11,9 +12,11 @@ namespace motis::paxmon::loader {
struct unmatched_journey {
std::uint32_t start_station_idx_{};
std::uint32_t destination_station_idx_{};
time departure_time_{};
time departure_time_{INVALID_TIME};
time arrival_time_{INVALID_TIME};
motis::paxmon::data_source source_{};
std::uint16_t passengers_{};
std::vector<std::uint16_t> group_sizes_;
};

} // namespace motis::paxmon::loader
2 changes: 1 addition & 1 deletion modules/paxmon/include/motis/paxmon/paxmon.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ struct paxmon : public motis::module::module {
void reg_subc(motis::module::subc_reg&) override;
void import(motis::module::import_dispatcher& reg) override;
void init(motis::module::registry&) override;
void init_op();

bool import_successful() const override { return import_successful_; }

Expand All @@ -56,7 +57,6 @@ struct paxmon : public motis::module::module {
std::string capacity_match_log_file_{};
std::string initial_over_capacity_report_file_{};
std::string initial_broken_report_file_{};
std::string initial_reroute_query_file_{};
std::string initial_reroute_router_{"/tripbased"};
conf::time start_time_{};
conf::time end_time_{};
Expand Down
12 changes: 7 additions & 5 deletions modules/paxmon/src/api/dataset_info.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ msg_ptr dataset_info(paxmon_data& data, schedule const& sched) {
[&](auto const& ljf) {
return CreatePaxMonJourneyFileInfo(
mc, mc.CreateString(ljf.path_.filename().string()),
ljf.last_modified_, ljf.matched_journeys_,
ljf.unmatched_journeys_, ljf.unmatched_journeys_rerouted_,
ljf.matched_groups_, ljf.unmatched_groups_,
ljf.unmatched_groups_rerouted_, ljf.matched_pax_,
ljf.unmatched_pax_, ljf.unmatched_pax_rerouted_);
ljf.last_modified_, ljf.matched_journey_count_,
ljf.unmatched_journey_count_,
ljf.unmatched_journey_rerouted_count_,
ljf.matched_group_count_, ljf.unmatched_group_count_,
ljf.unmatched_group_rerouted_count_, ljf.matched_pax_count_,
ljf.unmatched_pax_count_,
ljf.unmatched_pax_rerouted_count_);
})),
mc.CreateVector(utl::to_vec(
data.loaded_capacity_files_,
Expand Down
175 changes: 175 additions & 0 deletions modules/paxmon/src/delayed_init.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
#include "motis/paxmon/delayed_init.h"

#include <cstdint>
#include <algorithm>
#include <limits>
#include <memory>

#include "utl/verify.h"

#include "motis/core/common/logging.h"
#include "motis/core/access/time_access.h"
#include "motis/core/journey/message_to_journeys.h"

#include "motis/module/context/motis_call.h"
#include "motis/module/message.h"

#include "motis/paxmon/loader/motis_journeys/motis_journeys.h"

using namespace motis::module;
using namespace motis::logging;

namespace motis::paxmon {

msg_ptr initial_reroute_query(schedule const& sched,
loader::unmatched_journey const& uj,
std::string const& router,
unixtime const min_time,
unixtime const max_time) {
using namespace motis::routing;

message_creator fbb;

auto const planned_departure = static_cast<unixtime>(
motis_to_unixtime(sched.schedule_begin_, uj.departure_time_));

// should be ensured while loading the journey
utl::verify(planned_departure >= min_time && planned_departure <= max_time,
"initial_reroute_query: departure time out of range");

auto const interval =
Interval{std::max(min_time, planned_departure - 2 * 60 * 60),
std::min(max_time, planned_departure + 2 * 60 * 60)};
auto const& start_station = sched.stations_.at(uj.start_station_idx_);
auto const& destination_station =
sched.stations_.at(uj.destination_station_idx_);
fbb.create_and_finish(
MsgContent_RoutingRequest,
CreateRoutingRequest(
fbb, Start_PretripStart,
CreatePretripStart(
fbb,
CreateInputStation(fbb, fbb.CreateString(start_station->eva_nr_),
fbb.CreateString(start_station->name_)),
&interval)
.Union(),
CreateInputStation(fbb,
fbb.CreateString(destination_station->eva_nr_),
fbb.CreateString(destination_station->name_)),
SearchType_Default, SearchDir_Forward,
fbb.CreateVector(std::vector<flatbuffers::Offset<Via>>{}),
fbb.CreateVector(
std::vector<flatbuffers::Offset<AdditionalEdgeWrapper>>{}))
.Union(),
router);
return make_msg(fbb);
}

journey const* get_closest_journey(schedule const& sched,
loader::unmatched_journey const& uj,
std::vector<journey> const& journeys) {
journey const* best_journey = nullptr;
auto best_score = std::numeric_limits<std::int64_t>::max();
auto const planned_dep = static_cast<std::int64_t>(
motis_to_unixtime(sched.schedule_begin_, uj.departure_time_));
auto const planned_arr = static_cast<std::int64_t>(
motis_to_unixtime(sched.schedule_begin_, uj.arrival_time_));

for (auto const& j : journeys) {
if (j.stops_.empty()) {
continue;
}
auto const dep_diff = static_cast<std::int64_t>(
j.stops_.front().departure_.schedule_timestamp_) -
planned_dep;
auto const arr_diff = static_cast<std::int64_t>(
j.stops_.back().arrival_.schedule_timestamp_) -
planned_arr;
auto const score = dep_diff * dep_diff + arr_diff * arr_diff;
if (score < best_score) {
best_score = score;
best_journey = &j;
}
}

return best_journey;
}

void delayed_init(paxmon_data& data, universe& uv, schedule const& sched,
delay_init_options const& opt) {
using namespace motis::ris;
using namespace motis::routing;

auto const ris_status_msg = motis_call(make_no_msg("/ris/status"))->val();
auto const ris_status = motis_content(RISStatusResponse, ris_status_msg);

if (!ris_status->delayed_init()) {
LOG(warn) << "required option ris.delayed_init=1 not set, rerouting "
"unmatched journeys not possible";
return;
}

if (opt.reroute_unmatched_) {
auto const min_time = external_schedule_begin(sched);
auto const max_time = external_schedule_end(sched);

for (auto& ljf : data.loaded_journey_files_) {
if (ljf.unmatched_journeys_.empty()) {
continue;
}

scoped_timer const timer{"reroute unmatched journeys"};
LOG(info) << "routing " << ljf.unmatched_journeys_.size()
<< " unmatched journeys from " << ljf.path_.filename()
<< " using " << opt.initial_reroute_router_ << "...";
auto const futures =
utl::to_vec(ljf.unmatched_journeys_, [&](auto const& uj) {
return motis_call(initial_reroute_query(
sched, uj, opt.initial_reroute_router_, min_time, max_time));
});
ctx::await_all(futures);
LOG(info) << "adding replacement journeys...";
for (auto const& [uj, fut] : utl::zip(ljf.unmatched_journeys_, futures)) {
auto const rr_msg = fut->val();
auto const rr = motis_content(RoutingResponse, rr_msg);
auto const journeys = message_to_journeys(rr);
auto const* journey = get_closest_journey(sched, uj, journeys);
if (journey == nullptr) {
continue;
}
++ljf.unmatched_journey_rerouted_count_;
ljf.unmatched_pax_rerouted_count_ += uj.passengers_;

if (uj.group_sizes_.empty()) {
loader::motis_journeys::load_journey(
sched, uv, *journey, uj.source_, uj.passengers_,
route_source_flags::MATCH_REROUTED);
++ljf.unmatched_group_rerouted_count_;
} else {
auto source = uj.source_;
for (auto const& group_size : uj.group_sizes_) {
loader::motis_journeys::load_journey(
sched, uv, *journey, source, group_size,
route_source_flags::MATCH_REROUTED);
++source.secondary_ref_;
++ljf.unmatched_group_rerouted_count_;
}
}
}
ljf.unmatched_journeys_.clear();
}
} else {
for (auto& ljf : data.loaded_journey_files_) {
if (!ljf.unmatched_journeys_.empty()) {
LOG(warn) << "ignoring " << ljf.unmatched_journeys_.size()
<< " unmatched journeys from " << ljf.path_.filename()
<< ", set paxmon.reroute_unmatched=1 to enable rerouting";
ljf.unmatched_journeys_.clear();
}
}
}

motis_call(make_no_msg("/ris/delayed_init"))->val();
}

} // namespace motis::paxmon
Loading