Skip to content

Commit

Permalink
Added Avro file format to AirPollutionAnalysis
Browse files Browse the repository at this point in the history
  • Loading branch information
frievoe97 committed Jun 3, 2024
1 parent 798f5d3 commit 8b87a9e
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

import it.unimi.dsi.fastutil.objects.Object2DoubleLinkedOpenHashMap;
import it.unimi.dsi.fastutil.objects.Object2DoubleMap;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;
import org.apache.logging.log4j.LogManager;
Expand All @@ -14,6 +18,8 @@
import org.matsim.application.ApplicationUtils;
import org.matsim.application.CommandSpec;
import org.matsim.application.MATSimAppCommand;
import org.matsim.application.avro.AvroNetwork;
import org.matsim.application.avro.XYTData;
import org.matsim.application.options.InputOptions;
import org.matsim.application.options.OutputOptions;
import org.matsim.application.options.SampleOptions;
Expand Down Expand Up @@ -43,20 +49,19 @@
import java.text.DecimalFormat;
import java.text.DecimalFormatSymbols;
import java.text.NumberFormat;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.*;

@CommandLine.Command(
name = "air-pollution", description = "General air pollution analysis.",
mixinStandardHelpOptions = true, showDefaultValues = true
)
@CommandSpec(requireRunDirectory = true,
produces = {
"emissions_total.csv", "emissions_grid_per_day.csv", "emissions_per_link.csv",
"emissions_per_link_per_m.csv",
"emissions_grid_per_hour.csv",
"emissions_total.csv", "emissions_per_link.%s",
"emissions_per_link_per_m.%s",
"emissions_grid_per_hour.%s",
"emissions_vehicle_info.csv",
"emissions_grid_per_day.%s"
}
)
public class AirPollutionAnalysis implements MATSimAppCommand {
Expand Down Expand Up @@ -127,9 +132,11 @@ public void install() {

writeTotal(filteredNetwork, emissionsEventHandler);

writeRaster(filteredNetwork, config, emissionsEventHandler);
// writeRaster(filteredNetwork, config, emissionsEventHandler);
writeAvroRaster(filteredNetwork, config, emissionsEventHandler);

writeTimeDependentRaster(filteredNetwork, config, emissionsEventHandler);
writeTimeDependentAvroRaster(filteredNetwork, config, emissionsEventHandler);
// writeTimeDependentRaster(filteredNetwork, config, emissionsEventHandler);

return 0;
}
Expand Down Expand Up @@ -161,8 +168,8 @@ private void writeOutput(Network network, EmissionsOnLinkEventHandler emissionsE
nf.setMaximumFractionDigits(4);
nf.setGroupingUsed(false);

CSVPrinter absolute = new CSVPrinter(Files.newBufferedWriter(output.getPath("emissions_per_link.csv")), CSVFormat.DEFAULT);
CSVPrinter perMeter = new CSVPrinter(Files.newBufferedWriter(output.getPath("emissions_per_link_per_m.csv")), CSVFormat.DEFAULT);
CSVPrinter absolute = new CSVPrinter(Files.newBufferedWriter(output.getPath("emissions_per_link.%s", "csv")), CSVFormat.DEFAULT);
CSVPrinter perMeter = new CSVPrinter(Files.newBufferedWriter(output.getPath("emissions_per_link_per_m.%s", "csv")), CSVFormat.DEFAULT);

absolute.print("linkId");
perMeter.print("linkId");
Expand Down Expand Up @@ -241,6 +248,62 @@ private void writeTotal(Network network, EmissionsOnLinkEventHandler emissionsEv
}
}

/**
* Creates the data for the XY-Time plot. The time is fixed and the data is summarized over the run.
* Currently only the CO2_Total Values is printed because Simwrapper can handle only one value.
*/
private void writeAvroRaster(Network network, Config config, EmissionsOnLinkEventHandler emissionsEventHandler) {

String crs = ProjectionUtils.getCRS(network);
if (crs == null)
crs = config.network().getInputCRS();
if (crs == null)
crs = config.global().getCoordinateSystem();

XYTData avroData = new XYTData();
avroData.setCrs(crs);

Map<Pollutant, Raster> rasterMap = FastEmissionGridAnalyzer.processHandlerEmissions(emissionsEventHandler.getLink2pollutants(), network, gridSize, 20);
List<Integer> xLength = rasterMap.values().stream().map(Raster::getXLength).distinct().toList();
List<Integer> yLength = rasterMap.values().stream().map(Raster::getYLength).distinct().toList();
Raster raster = rasterMap.values().stream().findFirst().orElseThrow();

List<Float> xCoords = new ArrayList<>();
List<Float> yCoords = new ArrayList<>();
Map<CharSequence, List<Float>> values = new HashMap<>();
List<Float> valuesList = new ArrayList<>();
List<Integer> times = new ArrayList<>();

times.add(0);

for (int xi = 0; xi < xLength.get(0); xi++) {
for (int yi = 0; yi < yLength.get(0); yi++) {
Coord coord = raster.getCoordForIndex(xi, yi);
double value = rasterMap.get(Pollutant.CO2_TOTAL).getValueByIndex(xi, yi);
if (xi == 0) yCoords.add((float) coord.getY());
if (yi == 0) xCoords.add((float) coord.getX());
valuesList.add((float) value);
}
}


values.put(String.valueOf(Pollutant.CO2_TOTAL), valuesList);

avroData.setYCoords(yCoords);
avroData.setXCoords(xCoords);
avroData.setData(values);
avroData.setTimestamps(times);

DatumWriter<XYTData> datumWriter = new SpecificDatumWriter<>(XYTData.class);
try (DataFileWriter<XYTData> dataFileWriter = new DataFileWriter<>(datumWriter)) {
dataFileWriter.setCodec(CodecFactory.deflateCodec(9));
dataFileWriter.create(avroData.getSchema(), IOUtils.getOutputStream(IOUtils.getFileUrl(output.getPath("emissions_grid_per_day.%s", "avro").toString()), false));
dataFileWriter.append(avroData);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

/**
* Creates the data for the XY-Time plot. The time is fixed and the data is summarized over the run.
* Currently only the CO2_Total Values is printed because Simwrapper can handle only one value.
Expand All @@ -254,7 +317,7 @@ private void writeRaster(Network network, Config config, EmissionsOnLinkEventHan

Raster raster = rasterMap.values().stream().findFirst().orElseThrow();

try (CSVPrinter printer = new CSVPrinter(Files.newBufferedWriter(output.getPath("emissions_grid_per_day.csv")),
try (CSVPrinter printer = new CSVPrinter(Files.newBufferedWriter(output.getPath("emissions_grid_per_day.%s", "csv")),
CSVFormat.DEFAULT.builder().setCommentMarker('#').build())) {

String crs = ProjectionUtils.getCRS(network);
Expand Down Expand Up @@ -310,7 +373,7 @@ private void writeTimeDependentRaster(Network network, Config config, EmissionsO

Raster raster = firstBin.values().stream().findFirst().orElseThrow();

try (CSVPrinter printer = new CSVPrinter(IOUtils.getBufferedWriter(output.getPath("emissions_grid_per_hour.csv").toString()),
try (CSVPrinter printer = new CSVPrinter(IOUtils.getBufferedWriter(output.getPath("emissions_grid_per_hour.%s", "csv").toString()),
CSVFormat.DEFAULT.builder().setCommentMarker('#').build())) {

String crs = ProjectionUtils.getCRS(network);
Expand Down Expand Up @@ -358,4 +421,61 @@ private void writeTimeDependentRaster(Network network, Config config, EmissionsO

}

private void writeTimeDependentAvroRaster(Network network, Config config, EmissionsOnLinkEventHandler emissionsEventHandler) {

TimeBinMap<Map<Pollutant, Raster>> timeBinMap = FastEmissionGridAnalyzer.processHandlerEmissionsPerTimeBin(emissionsEventHandler.getTimeBins(), network, gridSize, 20);

String crs = ProjectionUtils.getCRS(network);
if (crs == null)
crs = config.network().getInputCRS();
if (crs == null)
crs = config.global().getCoordinateSystem();

XYTData avroData = new XYTData();
avroData.setCrs(crs);

Map<Pollutant, Raster> rasterMap = FastEmissionGridAnalyzer.processHandlerEmissions(emissionsEventHandler.getLink2pollutants(), network, gridSize, 20);
List<Integer> xLength = rasterMap.values().stream().map(Raster::getXLength).distinct().toList();
List<Integer> yLength = rasterMap.values().stream().map(Raster::getYLength).distinct().toList();
Raster raster = rasterMap.values().stream().findFirst().orElseThrow();

List<Float> xCoords = new ArrayList<>();
List<Float> yCoords = new ArrayList<>();
Map<CharSequence, List<Float>> values = new HashMap<>();
List<Float> valuesList = new ArrayList<>();
List<Integer> times = new ArrayList<>();

timeBinMap.getTimeBins().forEach(timeBin -> {
times.add((int) timeBin.getStartTime());
});

for (int xi = 0; xi < xLength.get(0); xi++) {
for (int yi = 0; yi < yLength.get(0); yi++) {
Coord coord = raster.getCoordForIndex(xi, yi);
if (xi == 0) yCoords.add((float) coord.getY());
if (yi == 0) xCoords.add((float) coord.getX());
for (TimeBinMap.TimeBin<Map<Pollutant, Raster>> timeBin : timeBinMap.getTimeBins()) {
valuesList.add((float) timeBin.getValue().get(Pollutant.CO2_TOTAL).getValueByIndex(xi, yi));
}
}
}

values.put(String.valueOf(Pollutant.CO2_TOTAL), valuesList);

avroData.setYCoords(yCoords);
avroData.setXCoords(xCoords);
avroData.setData(values);
avroData.setTimestamps(times);

DatumWriter<XYTData> datumWriter = new SpecificDatumWriter<>(XYTData.class);
try (DataFileWriter<XYTData> dataFileWriter = new DataFileWriter<>(datumWriter)) {
dataFileWriter.setCodec(CodecFactory.deflateCodec(9));
dataFileWriter.create(avroData.getSchema(), IOUtils.getOutputStream(IOUtils.getFileUrl(output.getPath("emissions_grid_per_hour.%s", "avro").toString()), false));
dataFileWriter.append(avroData);
} catch (IOException e) {
throw new UncheckedIOException(e);
}

}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.matsim.simwrapper.dashboard;

import org.matsim.application.analysis.emissions.AirPollutionAnalysis;
import org.matsim.application.analysis.noise.NoiseAnalysis;
import org.matsim.application.prepare.network.CreateGeoJsonNetwork;
import org.matsim.simwrapper.Dashboard;
import org.matsim.simwrapper.Header;
Expand Down Expand Up @@ -36,7 +37,8 @@ public void configure(Header header, Layout layout) {
viz.title = "Emissions per Link per Meter";
viz.description = "Displays the emissions for each link per meter.";
viz.height = 12.;
viz.datasets.csvFile = data.compute(AirPollutionAnalysis.class, "emissions_per_link_per_m.csv");
// viz.datasets.csvFile = data.compute(AirPollutionAnalysis.class, "emissions_per_link_per_m.csv");
viz.datasets.csvFile = data.computeWithPlaceholder(AirPollutionAnalysis.class, "emissions_per_link_per_m.%s", "csv");
viz.network = data.compute(CreateGeoJsonNetwork.class, "network.geojson");
viz.display.color.columnName = "CO2_TOTAL [g/m]";
viz.display.color.dataset = "csvFile";
Expand All @@ -61,7 +63,8 @@ public void configure(Header header, Layout layout) {
viz.center = data.context().getCenter();

viz.setColorRamp("greenRed", 10, false);
viz.file = data.compute(AirPollutionAnalysis.class, "emissions_grid_per_day.csv");
// viz.file = data.compute(AirPollutionAnalysis.class, "emissions_grid_per_day.csv");
viz.file = data.computeWithPlaceholder(AirPollutionAnalysis.class, "emissions_grid_per_day.%s", "avro");
});

layout.row("third")
Expand All @@ -77,7 +80,7 @@ public void configure(Header header, Layout layout) {
viz.center = data.context().getCenter();

viz.setColorRamp("greenRed", 10, false);
viz.file = data.compute(AirPollutionAnalysis.class, "emissions_grid_per_hour.csv");
viz.file = data.computeWithPlaceholder(AirPollutionAnalysis.class, "emissions_grid_per_hour.%s", "avro");
});


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ void generate() {

Assertions.assertThat(out)
.isDirectoryContaining("glob:**emissions_total.csv")
.isDirectoryContaining("glob:**emissions_grid_per_day.csv");
.isDirectoryContaining("glob:**emissions_grid_per_day.avro");

}

Expand Down

0 comments on commit 8b87a9e

Please sign in to comment.