Skip to content

Commit

Permalink
dvrp: fix non-deterministic PersonStuckEvent due to rejection
Browse files Browse the repository at this point in the history
Due to race conditions, DefaultPassengerEngine may get notified about a rejection
before, during or after its doSimStep() finishes.
This means the rejection may be processed in the current or next time step.

To overcome this non-determinism, all rejections from the current time step are processed
in the next time step.
  • Loading branch information
michalmac committed Nov 16, 2023
1 parent 39362b1 commit ef4b649
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,15 @@ public final class DefaultPassengerEngine implements PassengerEngine, PassengerR

//accessed in doSimStep() and handleDeparture() (no need to sync)
private final Map<Id<Request>, MobsimPassengerAgent> activePassengers = new HashMap<>();

// holds vehicle stop activities for requests that have not arrived at departure point yet
private final Map<Id<Request>, PassengerPickupActivity> waitingForPassenger = new HashMap<>();

//accessed in doSimStep() and handleEvent() (potential data races)
private final Queue<PassengerRequestRejectedEvent> rejectedRequestsEvents = new ConcurrentLinkedQueue<>();

DefaultPassengerEngine(String mode, EventsManager eventsManager, MobsimTimer mobsimTimer,
PassengerRequestCreator requestCreator, VrpOptimizer optimizer, Network network,
PassengerRequestValidator requestValidator, AdvanceRequestProvider advanceRequestProvider) {
DefaultPassengerEngine(String mode, EventsManager eventsManager, MobsimTimer mobsimTimer, PassengerRequestCreator requestCreator,
VrpOptimizer optimizer, Network network, PassengerRequestValidator requestValidator, AdvanceRequestProvider advanceRequestProvider) {
this.mode = mode;
this.mobsimTimer = mobsimTimer;
this.requestCreator = requestCreator;
Expand Down Expand Up @@ -106,17 +105,22 @@ public void doSimStep(double time) {
// event) after submission, but before departure, the PassengerEngine does not
// know this agent yet. Hence, we wait with setting the state to abort until the
// agent has arrived here (if ever).

Iterator<PassengerRequestRejectedEvent> iterator = rejectedRequestsEvents.iterator();

while (iterator.hasNext()) {
PassengerRequestRejectedEvent event = iterator.next();
MobsimPassengerAgent passenger = activePassengers.remove(event.getRequestId());
if (event.getTime() == time) {
// There is a potential race condition wrt processing rejection events between doSimStep() and handleEvent().
// To ensure a deterministic behaviour, we only process events from the previous time step.
break;
}

MobsimPassengerAgent passenger = activePassengers.remove(event.getRequestId());
if (passenger != null) {
// not much else can be done for immediate requests
// set the passenger agent to abort - the event will be thrown by the QSim
passenger.setStateToAbort(mobsimTimer.getTimeOfDay());
passenger.setStateToAbort(time);
internalInterface.arrangeNextAgentState(passenger);
iterator.remove();
}
Expand All @@ -137,20 +141,20 @@ public boolean handleDeparture(double now, MobsimAgent agent, Id<Link> fromLinkI
internalInterface.registerAdditionalAgentOnLink(passenger);

Id<Link> toLinkId = passenger.getDestinationLinkId();

// try to find a prebooked requests that is associated to this leg
Leg leg = (Leg) ((PlanAgent) passenger).getCurrentPlanElement();
Leg leg = (Leg)((PlanAgent)passenger).getCurrentPlanElement();
PassengerRequest request = advanceRequestProvider.retrieveRequest(agent, leg);

if (request == null) { // immediate request
Route route = ((Leg)((PlanAgent)passenger).getCurrentPlanElement()).getRoute();
request = requestCreator.createRequest(internalPassengerHandling.createRequestId(), passenger.getId(),
route, getLink(fromLinkId), getLink(toLinkId), now, now);
request = requestCreator.createRequest(internalPassengerHandling.createRequestId(), passenger.getId(), route, getLink(fromLinkId),
getLink(toLinkId), now, now);

// must come before validateAndSubmitRequest (to come before rejection event)
eventsManager.processEvent(new PassengerWaitingEvent(now, mode, request.getId(), request.getPassengerId()));
activePassengers.put(request.getId(), passenger);

validateAndSubmitRequest(passenger, request, now);
} else { // advance request
eventsManager.processEvent(new PassengerWaitingEvent(now, mode, request.getId(), request.getPassengerId()));
Expand All @@ -162,7 +166,7 @@ public boolean handleDeparture(double now, MobsimAgent agent, Id<Link> fromLinkI
pickupActivity.notifyPassengerIsReadyForDeparture(passenger, now);
}
}

return true;
}

Expand All @@ -182,21 +186,20 @@ private void validateAndSubmitRequest(MobsimPassengerAgent passenger, PassengerR

private Link getLink(Id<Link> linkId) {
return Preconditions.checkNotNull(network.getLinks().get(linkId),
"Link id=%s does not exist in network for mode %s. Agent departs from a link that does not belong to that network?",
linkId, mode);
"Link id=%s does not exist in network for mode %s. Agent departs from a link that does not belong to that network?", linkId, mode);
}

/**
* There are two ways of interacting with the PassengerEngine:
*
* <p>
* - (1) The stop activity tries to pick up a passenger and receives whether the
* pickup succeeded or not (see tryPickUpPassenger). In the classic
* implementation, the vehicle only calls tryPickUpPassenger at the time when it
* actually wants to pick up the person (at the end of the activity). It may
* happen that the person is not present yet. In that case, the pickup request
* is saved and notifyPassengerReady is called on the stop activity upen
* departure of the agent.
*
* <p>
* - (2) If pickup and dropoff times are handled more flexibly by the stop
* activity, it might want to detect whether an agent is ready to be picked up,
* then start an "interaction time" and only after perform the actual pickup.
Expand All @@ -210,15 +213,13 @@ public boolean notifyWaitForPassenger(PassengerPickupActivity pickupActivity, Mo
waitingForPassenger.put(requestId, pickupActivity);
return false;
}

return true;
}

@Override
public boolean tryPickUpPassenger(PassengerPickupActivity pickupActivity, MobsimDriverAgent driver,
Id<Request> requestId, double now) {
return internalPassengerHandling.tryPickUpPassenger(driver, activePassengers.get(requestId),
requestId, now);
public boolean tryPickUpPassenger(PassengerPickupActivity pickupActivity, MobsimDriverAgent driver, Id<Request> requestId, double now) {
return internalPassengerHandling.tryPickUpPassenger(driver, activePassengers.get(requestId), requestId, now);
}

@Override
Expand All @@ -243,10 +244,9 @@ public static Provider<PassengerEngine> createProvider(String mode) {

@Override
public DefaultPassengerEngine get() {
return new DefaultPassengerEngine(getMode(), eventsManager, mobsimTimer,
getModalInstance(PassengerRequestCreator.class), getModalInstance(VrpOptimizer.class),
getModalInstance(Network.class), getModalInstance(PassengerRequestValidator.class),
getModalInstance(AdvanceRequestProvider.class));
return new DefaultPassengerEngine(getMode(), eventsManager, mobsimTimer, getModalInstance(PassengerRequestCreator.class),
getModalInstance(VrpOptimizer.class), getModalInstance(Network.class), getModalInstance(PassengerRequestValidator.class),
getModalInstance(AdvanceRequestProvider.class));
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public void test_invalid_rejected() {
new PersonDepartureEvent(0, fixture.PERSON_ID, fixture.linkAB.getId(), MODE, MODE),
new PassengerWaitingEvent(departureTime, MODE, requestId, fixture.PERSON_ID),
new PassengerRequestRejectedEvent(0, MODE, requestId, fixture.PERSON_ID, "invalid"),
new PersonStuckEvent(0, fixture.PERSON_ID, fixture.linkAB.getId(), MODE));
new PersonStuckEvent(1, fixture.PERSON_ID, fixture.linkAB.getId(), MODE));
}

@Test
Expand All @@ -140,7 +140,7 @@ public void test_valid_rejected() {
new PersonDepartureEvent(0, fixture.PERSON_ID, fixture.linkAB.getId(), MODE, MODE),
new PassengerWaitingEvent(departureTime, MODE, requestId, fixture.PERSON_ID),
new PassengerRequestRejectedEvent(0, MODE, requestId, fixture.PERSON_ID, "rejecting_all_requests"),
new PersonStuckEvent(0, fixture.PERSON_ID, fixture.linkAB.getId(), MODE));
new PersonStuckEvent(1, fixture.PERSON_ID, fixture.linkAB.getId(), MODE));
}

private static class RejectingOneTaxiOptimizer implements VrpOptimizer {
Expand Down

0 comments on commit ef4b649

Please sign in to comment.