Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implemented jsprit parallelization as queue #3236

Merged
merged 3 commits into from
Apr 26, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -195,97 +195,29 @@ public static void runJsprit(Scenario scenario) throws ExecutionException, Inter
carrierActivityCounterMap.put(carrier.getId(), carrierActivityCounterMap.getOrDefault(carrier.getId(), 0) + 2*carrier.getShipments().size());
}

HashMap<Id<Carrier>, Integer> sortedMap = carrierActivityCounterMap.entrySet().stream()
.sorted(Collections.reverseOrder(Map.Entry.comparingByValue()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (e1, e2) -> e2, LinkedHashMap::new));

AtomicInteger startedVRPCounter = new AtomicInteger(0);
AtomicInteger solvedVRPCounter = new AtomicInteger(0);

ArrayList<Id<Carrier>> tempList = new ArrayList<>(sortedMap.keySet());

int nThreads = Runtime.getRuntime().availableProcessors();
ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
int nThreads = Runtime.getRuntime().availableProcessors();
log.info("Starting VRP solving for {} carriers in parallel with {} threads.", carriers.getCarriers().size(), nThreads);

List<List<Id<Carrier>>> splitList = splitListAlternating(nThreads, tempList);
log.info("Distribution of carriers on threads: {}", splitList.stream().map(List::size).toList());
List<Future<?>> futureList = new ArrayList<>();
for (List<Id<Carrier>> subList : splitList) {
futureList.add(executorService.submit(() -> subList.forEach(carrierId -> {
Carrier carrier = carriers.getCarriers().get(carrierId);

double start = System.currentTimeMillis();
if (!carrier.getServices().isEmpty())
log.info("Start tour planning for {} which has {} services", carrier.getId(), carrier.getServices().size());
else if (!carrier.getShipments().isEmpty())
log.info("Start tour planning for {} which has {} shipments", carrier.getId(), carrier.getShipments().size());

startedVRPCounter.incrementAndGet();
log.info("started VRP solving for carrier number {} out of {} carriers. Current thread id: {}", startedVRPCounter.get(), carriers.getCarriers()
.size(), Thread.currentThread()
.getId());

VehicleRoutingProblem problem = MatsimJspritFactory.createRoutingProblemBuilder(carrier, scenario.getNetwork())
.setRoutingCost(netBasedCosts).build();
VehicleRoutingAlgorithm algorithm = MatsimJspritFactory.loadOrCreateVehicleRoutingAlgorithm(scenario, freightCarriersConfigGroup, netBasedCosts, problem);

algorithm.getAlgorithmListeners().addListener(new StopWatch(), VehicleRoutingAlgorithmListeners.Priority.HIGH);
int jspritIterations = getJspritIterations(carrier);
try {
if (jspritIterations > 0) {
algorithm.setMaxIterations(jspritIterations);
} else {
throw new InvalidAttributeValueException(
"Carrier has invalid number of jsprit iterations. They must be positive! Carrier id: "
+ carrier.getId().toString());
}
} catch (Exception e) {
throw new RuntimeException(e);
// e.printStackTrace();
}

VehicleRoutingProblemSolution solution = Solutions.bestOf(algorithm.searchSolutions());

log.info("tour planning for carrier {} took {} seconds.", carrier.getId(), (System.currentTimeMillis() - start) / 1000);
ThreadPoolExecutor executor = new JspritTreadPoolExecutor(new PriorityBlockingQueue<>(), nThreads);

CarrierPlan newPlan = MatsimJspritFactory.createPlan(carrier, solution);
// yy In principle, the carrier should know the vehicle types that it can deploy.
List<Future<?>> futures = new ArrayList<>();
List<Map.Entry<Id<Carrier>, Integer>> sorted = carrierActivityCounterMap.entrySet().stream()
.sorted(Map.Entry.comparingByValue((o1, o2) -> o2 - o1))
.toList();

log.info("routing plan for carrier {}", carrier.getId());
NetworkRouter.routePlan(newPlan, netBasedCosts);
solvedVRPCounter.incrementAndGet();
double timeForPlanningAndRouting = (System.currentTimeMillis() - start) / 1000;
log.info("routing for carrier {} finished. Tour planning plus routing took {} seconds.", carrier.getId(),
timeForPlanningAndRouting);
log.info("solved {} out of {} carriers.", solvedVRPCounter.get(), carriers.getCarriers().size());

carrier.setSelectedPlan(newPlan);
setJspritComputationTime(carrier, timeForPlanningAndRouting);
})));
for (Map.Entry<Id<Carrier>, Integer> entry : sorted){
JspritCarrierTask task = new JspritCarrierTask(entry.getValue(), carriers.getCarriers().get(entry.getKey()), scenario, netBasedCosts, startedVRPCounter, carriers.getCarriers().size());
log.info("Adding task for carrier {} with priority {}", entry.getKey(), entry.getValue());
futures.add(executor.submit(task));
}

for (Future<?> future : futureList) {
for (Future<?> future : futures) {
future.get();
}
}

// split tempList in nThreads parts such that it is split to 1,2,...,n,1,2,..
private static List<List<Id<Carrier>>> splitListAlternating(int nThreads, ArrayList<Id<Carrier>> tempList) {
List<List<Id<Carrier>>> splitList = new ArrayList<>();
for (int i = 0; i < nThreads; i++) {
List<Id<Carrier>> subList = new ArrayList<>();
for (int j = i; j < tempList.size(); j += nThreads) {
subList.add(tempList.get(j));
}
splitList.add(subList);
}

assert splitList.size() == nThreads;
assert splitList.stream().mapToInt(List::size).sum() == tempList.size();
return splitList;
}

/**
* Creates a new {@link Carriers} container only with {@link CarrierShipment}s
* for creating a new VRP. As consequence of the transformation of
Expand Down Expand Up @@ -667,4 +599,100 @@ public static void writeCarriers(Carriers carriers, String filename ) {
new CarrierPlanWriter( carriers ).write( filename );
}

static class JspritCarrierTask implements Runnable{
private final int priority;
private final Carrier carrier;
private final Scenario scenario;
private final NetworkBasedTransportCosts netBasedCosts;
private final AtomicInteger startedVRPCounter;
private final int taskCount;

public JspritCarrierTask(int priority, Carrier carrier, Scenario scenario, NetworkBasedTransportCosts netBasedCosts, AtomicInteger startedVRPCounter, int taskCount) {
this.priority = priority;
this.carrier = carrier;
this.scenario = scenario;
this.netBasedCosts = netBasedCosts;
this.startedVRPCounter = startedVRPCounter;
this.taskCount = taskCount;
}

public int getPriority() {
return priority;
}

@Override
public void run() {
FreightCarriersConfigGroup freightCarriersConfigGroup = ConfigUtils.addOrGetModule( scenario.getConfig(), FreightCarriersConfigGroup.class );

double start = System.currentTimeMillis();
if (!carrier.getServices().isEmpty())
log.info("Start tour planning for {} which has {} services", carrier.getId(), carrier.getServices().size());
else if (!carrier.getShipments().isEmpty())
log.info("Start tour planning for {} which has {} shipments", carrier.getId(), carrier.getShipments().size());

startedVRPCounter.incrementAndGet();
log.info("started VRP solving for carrier number {} out of {} carriers. Thread id: {}. Priority: {}", startedVRPCounter.get(), taskCount, Thread.currentThread().getId(), this.priority);

VehicleRoutingProblem problem = MatsimJspritFactory.createRoutingProblemBuilder(carrier, scenario.getNetwork())
.setRoutingCost(netBasedCosts).build();
VehicleRoutingAlgorithm algorithm = MatsimJspritFactory.loadOrCreateVehicleRoutingAlgorithm(scenario, freightCarriersConfigGroup, netBasedCosts, problem);

algorithm.getAlgorithmListeners().addListener(new StopWatch(), VehicleRoutingAlgorithmListeners.Priority.HIGH);
int jspritIterations = getJspritIterations(carrier);
try {
if (jspritIterations > 0) {
algorithm.setMaxIterations(jspritIterations);
} else {
throw new InvalidAttributeValueException(
"Carrier has invalid number of jsprit iterations. They must be positive! Carrier id: "
+ carrier.getId().toString());
}
} catch (Exception e) {
throw new RuntimeException(e);
}

VehicleRoutingProblemSolution solution = Solutions.bestOf(algorithm.searchSolutions());

log.info("tour planning for carrier {} took {} seconds.", carrier.getId(), (System.currentTimeMillis() - start) / 1000);

CarrierPlan newPlan = MatsimJspritFactory.createPlan(carrier, solution);
// yy In principle, the carrier should know the vehicle types that it can deploy.

log.info("routing plan for carrier {}", carrier.getId());
NetworkRouter.routePlan(newPlan, netBasedCosts);
double timeForPlanningAndRouting = (System.currentTimeMillis() - start) / 1000;
log.info("routing for carrier {} finished. Tour planning plus routing took {} seconds. Thread id: {}", carrier.getId(),
timeForPlanningAndRouting, Thread.currentThread().getId());

carrier.setSelectedPlan(newPlan);
setJspritComputationTime(carrier, timeForPlanningAndRouting);
}
}

// we need this class because otherwise there is a runtime error in the PriorityBlockingQueue
// https://jvmaware.com/priority-queue-and-threadpool/
private static class JspritTreadPoolExecutor extends ThreadPoolExecutor{
public JspritTreadPoolExecutor(BlockingQueue<Runnable> workQueue, int nThreads) {
super(nThreads, nThreads, 0, TimeUnit.SECONDS, workQueue);
}

@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new CustomFutureTask<>(runnable);
}
}

private static class CustomFutureTask<T> extends FutureTask<T> implements Comparable<CustomFutureTask<T>>{
private final JspritCarrierTask task;

public CustomFutureTask(Runnable task) {
super(task, null);
this.task = (JspritCarrierTask) task;
}

@Override
public int compareTo(CustomFutureTask that) {
return that.task.getPriority() - this.task.getPriority();
}
}
}
Loading