diff --git a/contribs/freight/src/main/java/org/matsim/freight/carriers/CarriersUtils.java b/contribs/freight/src/main/java/org/matsim/freight/carriers/CarriersUtils.java index d6d182c9a7c..9b594c05f6b 100644 --- a/contribs/freight/src/main/java/org/matsim/freight/carriers/CarriersUtils.java +++ b/contribs/freight/src/main/java/org/matsim/freight/carriers/CarriersUtils.java @@ -195,97 +195,29 @@ public static void runJsprit(Scenario scenario) throws ExecutionException, Inter carrierActivityCounterMap.put(carrier.getId(), carrierActivityCounterMap.getOrDefault(carrier.getId(), 0) + 2*carrier.getShipments().size()); } - HashMap, Integer> sortedMap = carrierActivityCounterMap.entrySet().stream() - .sorted(Collections.reverseOrder(Map.Entry.comparingByValue())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (e1, e2) -> e2, LinkedHashMap::new)); - AtomicInteger startedVRPCounter = new AtomicInteger(0); - AtomicInteger solvedVRPCounter = new AtomicInteger(0); - - ArrayList> tempList = new ArrayList<>(sortedMap.keySet()); - int nThreads = Runtime.getRuntime().availableProcessors(); - ExecutorService executorService = Executors.newFixedThreadPool(nThreads); + int nThreads = Runtime.getRuntime().availableProcessors(); log.info("Starting VRP solving for {} carriers in parallel with {} threads.", carriers.getCarriers().size(), nThreads); - List>> splitList = splitListAlternating(nThreads, tempList); - log.info("Distribution of carriers on threads: {}", splitList.stream().map(List::size).toList()); - List> futureList = new ArrayList<>(); - for (List> subList : splitList) { - futureList.add(executorService.submit(() -> subList.forEach(carrierId -> { - Carrier carrier = carriers.getCarriers().get(carrierId); - - double start = System.currentTimeMillis(); - if (!carrier.getServices().isEmpty()) - log.info("Start tour planning for {} which has {} services", carrier.getId(), carrier.getServices().size()); - else if (!carrier.getShipments().isEmpty()) - log.info("Start tour planning for {} which has {} shipments", carrier.getId(), carrier.getShipments().size()); - - startedVRPCounter.incrementAndGet(); - log.info("started VRP solving for carrier number {} out of {} carriers. Current thread id: {}", startedVRPCounter.get(), carriers.getCarriers() - .size(), Thread.currentThread() - .getId()); - - VehicleRoutingProblem problem = MatsimJspritFactory.createRoutingProblemBuilder(carrier, scenario.getNetwork()) - .setRoutingCost(netBasedCosts).build(); - VehicleRoutingAlgorithm algorithm = MatsimJspritFactory.loadOrCreateVehicleRoutingAlgorithm(scenario, freightCarriersConfigGroup, netBasedCosts, problem); - - algorithm.getAlgorithmListeners().addListener(new StopWatch(), VehicleRoutingAlgorithmListeners.Priority.HIGH); - int jspritIterations = getJspritIterations(carrier); - try { - if (jspritIterations > 0) { - algorithm.setMaxIterations(jspritIterations); - } else { - throw new InvalidAttributeValueException( - "Carrier has invalid number of jsprit iterations. They must be positive! Carrier id: " - + carrier.getId().toString()); - } - } catch (Exception e) { - throw new RuntimeException(e); - // e.printStackTrace(); - } - - VehicleRoutingProblemSolution solution = Solutions.bestOf(algorithm.searchSolutions()); - - log.info("tour planning for carrier {} took {} seconds.", carrier.getId(), (System.currentTimeMillis() - start) / 1000); + ThreadPoolExecutor executor = new JspritTreadPoolExecutor(new PriorityBlockingQueue<>(), nThreads); - CarrierPlan newPlan = MatsimJspritFactory.createPlan(carrier, solution); - // yy In principle, the carrier should know the vehicle types that it can deploy. + List> futures = new ArrayList<>(); + List, Integer>> sorted = carrierActivityCounterMap.entrySet().stream() + .sorted(Map.Entry.comparingByValue((o1, o2) -> o2 - o1)) + .toList(); - log.info("routing plan for carrier {}", carrier.getId()); - NetworkRouter.routePlan(newPlan, netBasedCosts); - solvedVRPCounter.incrementAndGet(); - double timeForPlanningAndRouting = (System.currentTimeMillis() - start) / 1000; - log.info("routing for carrier {} finished. Tour planning plus routing took {} seconds.", carrier.getId(), - timeForPlanningAndRouting); - log.info("solved {} out of {} carriers.", solvedVRPCounter.get(), carriers.getCarriers().size()); - - carrier.setSelectedPlan(newPlan); - setJspritComputationTime(carrier, timeForPlanningAndRouting); - }))); + for (Map.Entry, Integer> entry : sorted){ + JspritCarrierTask task = new JspritCarrierTask(entry.getValue(), carriers.getCarriers().get(entry.getKey()), scenario, netBasedCosts, startedVRPCounter, carriers.getCarriers().size()); + log.info("Adding task for carrier {} with priority {}", entry.getKey(), entry.getValue()); + futures.add(executor.submit(task)); } - for (Future future : futureList) { + for (Future future : futures) { future.get(); } } - // split tempList in nThreads parts such that it is split to 1,2,...,n,1,2,.. - private static List>> splitListAlternating(int nThreads, ArrayList> tempList) { - List>> splitList = new ArrayList<>(); - for (int i = 0; i < nThreads; i++) { - List> subList = new ArrayList<>(); - for (int j = i; j < tempList.size(); j += nThreads) { - subList.add(tempList.get(j)); - } - splitList.add(subList); - } - - assert splitList.size() == nThreads; - assert splitList.stream().mapToInt(List::size).sum() == tempList.size(); - return splitList; - } - /** * Creates a new {@link Carriers} container only with {@link CarrierShipment}s * for creating a new VRP. As consequence of the transformation of @@ -667,4 +599,100 @@ public static void writeCarriers(Carriers carriers, String filename ) { new CarrierPlanWriter( carriers ).write( filename ); } + static class JspritCarrierTask implements Runnable{ + private final int priority; + private final Carrier carrier; + private final Scenario scenario; + private final NetworkBasedTransportCosts netBasedCosts; + private final AtomicInteger startedVRPCounter; + private final int taskCount; + + public JspritCarrierTask(int priority, Carrier carrier, Scenario scenario, NetworkBasedTransportCosts netBasedCosts, AtomicInteger startedVRPCounter, int taskCount) { + this.priority = priority; + this.carrier = carrier; + this.scenario = scenario; + this.netBasedCosts = netBasedCosts; + this.startedVRPCounter = startedVRPCounter; + this.taskCount = taskCount; + } + + public int getPriority() { + return priority; + } + + @Override + public void run() { + FreightCarriersConfigGroup freightCarriersConfigGroup = ConfigUtils.addOrGetModule( scenario.getConfig(), FreightCarriersConfigGroup.class ); + + double start = System.currentTimeMillis(); + if (!carrier.getServices().isEmpty()) + log.info("Start tour planning for {} which has {} services", carrier.getId(), carrier.getServices().size()); + else if (!carrier.getShipments().isEmpty()) + log.info("Start tour planning for {} which has {} shipments", carrier.getId(), carrier.getShipments().size()); + + startedVRPCounter.incrementAndGet(); + log.info("started VRP solving for carrier number {} out of {} carriers. Thread id: {}. Priority: {}", startedVRPCounter.get(), taskCount, Thread.currentThread().getId(), this.priority); + + VehicleRoutingProblem problem = MatsimJspritFactory.createRoutingProblemBuilder(carrier, scenario.getNetwork()) + .setRoutingCost(netBasedCosts).build(); + VehicleRoutingAlgorithm algorithm = MatsimJspritFactory.loadOrCreateVehicleRoutingAlgorithm(scenario, freightCarriersConfigGroup, netBasedCosts, problem); + + algorithm.getAlgorithmListeners().addListener(new StopWatch(), VehicleRoutingAlgorithmListeners.Priority.HIGH); + int jspritIterations = getJspritIterations(carrier); + try { + if (jspritIterations > 0) { + algorithm.setMaxIterations(jspritIterations); + } else { + throw new InvalidAttributeValueException( + "Carrier has invalid number of jsprit iterations. They must be positive! Carrier id: " + + carrier.getId().toString()); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + + VehicleRoutingProblemSolution solution = Solutions.bestOf(algorithm.searchSolutions()); + + log.info("tour planning for carrier {} took {} seconds.", carrier.getId(), (System.currentTimeMillis() - start) / 1000); + + CarrierPlan newPlan = MatsimJspritFactory.createPlan(carrier, solution); + // yy In principle, the carrier should know the vehicle types that it can deploy. + + log.info("routing plan for carrier {}", carrier.getId()); + NetworkRouter.routePlan(newPlan, netBasedCosts); + double timeForPlanningAndRouting = (System.currentTimeMillis() - start) / 1000; + log.info("routing for carrier {} finished. Tour planning plus routing took {} seconds. Thread id: {}", carrier.getId(), + timeForPlanningAndRouting, Thread.currentThread().getId()); + + carrier.setSelectedPlan(newPlan); + setJspritComputationTime(carrier, timeForPlanningAndRouting); + } + } + + // we need this class because otherwise there is a runtime error in the PriorityBlockingQueue + // https://jvmaware.com/priority-queue-and-threadpool/ + private static class JspritTreadPoolExecutor extends ThreadPoolExecutor{ + public JspritTreadPoolExecutor(BlockingQueue workQueue, int nThreads) { + super(nThreads, nThreads, 0, TimeUnit.SECONDS, workQueue); + } + + @Override + protected RunnableFuture newTaskFor(Runnable runnable, T value) { + return new CustomFutureTask<>(runnable); + } + } + + private static class CustomFutureTask extends FutureTask implements Comparable>{ + private final JspritCarrierTask task; + + public CustomFutureTask(Runnable task) { + super(task, null); + this.task = (JspritCarrierTask) task; + } + + @Override + public int compareTo(CustomFutureTask that) { + return that.task.getPriority() - this.task.getPriority(); + } + } }