Skip to content

Commit

Permalink
Merge pull request #3499 from matsim-org/csv-to-avro-converter
Browse files Browse the repository at this point in the history
Added a converter for xyt csv files to avro
  • Loading branch information
frievoe97 authored Oct 9, 2024
2 parents 989a5e6 + 73d70fa commit c7df7e9
Show file tree
Hide file tree
Showing 3 changed files with 1,224 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package org.matsim.application.avro;

import it.unimi.dsi.fastutil.objects.Object2FloatAVLTreeMap;
import it.unimi.dsi.fastutil.objects.Object2FloatSortedMap;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.matsim.core.utils.io.IOUtils;

import java.io.IOException;
import java.nio.file.Path;
import java.util.*;

public class CSVToAvroConverter {

public static void main(String[] args) throws IOException {
String projection = args.length > 2 ? args[2] : null;
String name = args.length > 3 ? args[3] : "Emissions";

XYTData avroData = readCSV(args[0], projection, name);
writeAvroFile(avroData, Path.of(args[1]));
}

/**
* Reads a CSV file, processes its data, and returns the corresponding Avro object.
*
* @param csvFilePath the path to the CSV file
* @param projection the projection (CRS)
* @param name the name for the data series (defaults is "Emissions")
* @throws IOException if an error occurs during reading the file
*/
public static XYTData readCSV(String csvFilePath, String projection, String name) throws IOException {
List<CSVEntries> entries = new ArrayList<>();
List<Float> xCoords = new ArrayList<>();
List<Float> yCoords = new ArrayList<>();
List<Integer> timestamps = new ArrayList<>();
Object2FloatSortedMap<XYT> valuesMap = new Object2FloatAVLTreeMap<>(Comparator.comparing((XYT e) -> e.t)
.thenComparing(e -> e.x)
.thenComparing(e -> e.y));

try (CSVParser csvReader = new CSVParser(IOUtils.getBufferedReader(csvFilePath), CSVFormat.DEFAULT.builder()
.setCommentMarker('#').setSkipHeaderRecord(true).setHeader().build())) {

String comment = csvReader.getHeaderComment();

if (comment != null && (projection == null || projection.isEmpty())) {
projection = comment;
} else if (projection == null) {
projection = "";
}

for (CSVRecord record : csvReader) {
try {
int time = (int) Double.parseDouble(record.get(0));
float x = Float.parseFloat(record.get(1));
float y = Float.parseFloat(record.get(2));
float value = Float.parseFloat(record.get(3));

entries.add(new CSVEntries(time, x, y, value));

} catch (NumberFormatException e) {
System.out.println("Skipping invalid line: " + String.join(",", record));
}
}
}

// Sort entries by time -> x -> y
entries.sort(Comparator.comparing((CSVEntries e) -> e.time)
.thenComparing(e -> e.x)
.thenComparing(e -> e.y));

for (CSVEntries entry : entries) {
if (!xCoords.contains(entry.x)) {
xCoords.add(entry.x);
}
if (!yCoords.contains(entry.y)) {
yCoords.add(entry.y);
}
if (!timestamps.contains(entry.time)) {
timestamps.add(entry.time);
}

valuesMap.put(new XYT(entry.x, entry.y, entry.time), entry.value);
}

// Check if all combinations of x, y, and time exist
for (int time : timestamps) {
for (float x : xCoords) {
for (float y : yCoords) {
XYT key = new XYT(x, y, time);
if (!valuesMap.containsKey(key)) {
valuesMap.put(key, 0f);
}
}
}
}

// Create Avro data object
XYTData avroData = new XYTData();
avroData.setCrs(projection);
avroData.setXCoords(xCoords);
avroData.setYCoords(yCoords);
avroData.setTimestamps(timestamps);

List<Float> valuesList = new ArrayList<>(valuesMap.values());
Map<CharSequence, List<Float>> result = new HashMap<>();
result.put(name != null && !name.isEmpty() ? name : "Emissions", valuesList);

avroData.setData(result);

return avroData;
}

/**
* Writes the Avro data
*
* @param avroData the Avro data
* @param avroFile the path to the output Avro file
* @throws IOException if an error occurs during writing the file
*/
public static void writeAvroFile(XYTData avroData, Path avroFile) throws IOException {
DatumWriter<XYTData> datumWriter = new SpecificDatumWriter<>(XYTData.class);
try (DataFileWriter<XYTData> dataFileWriter = new DataFileWriter<>(datumWriter)) {
dataFileWriter.setCodec(CodecFactory.deflateCodec(9));
dataFileWriter.create(XYTData.getClassSchema(), avroFile.toFile());
dataFileWriter.append(avroData);
}
}

private record CSVEntries(int time, float x, float y, float value) {
}

private record XYT(float x, float y, float t) {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package org.matsim.application.avro;

import org.apache.avro.file.DataFileReader;
import org.apache.avro.specific.SpecificDatumReader;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.matsim.testcases.MatsimTestUtils;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.matsim.core.utils.io.IOUtils;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.junit.jupiter.api.Assertions.*;

class CSVToAvroConverterTest {

@RegisterExtension
public final MatsimTestUtils utils = new MatsimTestUtils();

@Test
void conversion() throws IOException {
String input = utils.getInputDirectory() + "exampleCSV.csv";
String output = utils.getOutputDirectory() + "exampleAvro.avro";

CSVToAvroConverter.main(new String[]{input, output});

// Verify the output Avro file exists
Path outputPath = Path.of(output);
assertTrue(Files.exists(outputPath), "The Avro output file should exist.");

Set<Double> uniqueTimes = new HashSet<>();
Set<Double> uniqueX = new HashSet<>();
Set<Double> uniqueY = new HashSet<>();

try (CSVParser csvParser = new CSVParser(IOUtils.getBufferedReader(input), CSVFormat.DEFAULT.builder()
.setCommentMarker('#')
.setSkipHeaderRecord(true)
.setHeader("time", "x", "y", "value")
.build())) {
for (CSVRecord record : csvParser) {
uniqueTimes.add(Double.parseDouble(record.get("time")));
uniqueX.add(Double.parseDouble(record.get("x")));
uniqueY.add(Double.parseDouble(record.get("y")));
}
}

// Check if the avro file has the expected number of unique entries
int expectedTimeCount = uniqueTimes.size();
int expectedXCount = uniqueX.size();
int expectedYCount = uniqueY.size();
int expectedEmissionsSize = expectedTimeCount * expectedXCount * expectedYCount;

// Verify the avro data
SpecificDatumReader<XYTData> datumReader = new SpecificDatumReader<>(XYTData.class);
try (DataFileReader<XYTData> dataFileReader = new DataFileReader<>(new File(output), datumReader)) {
assertTrue(dataFileReader.hasNext(), "There should be at least one record in the Avro file.");

XYTData avroData = dataFileReader.next();

// Verify the number of unique entries in the Avro file matches the CSV data
assertEquals(expectedTimeCount, avroData.getTimestamps().size(), "The number of unique time entries should match.");
assertEquals(expectedXCount, avroData.getXCoords().size(), "The number of unique x-coordinates should match.");
assertEquals(expectedYCount, avroData.getYCoords().size(), "The number of unique y-coordinates should match.");

// Check if the data map has the expected number of entries
Map<CharSequence, List<Float>> emissionsData = avroData.getData();

for (Map.Entry<CharSequence, List<Float>> entry : emissionsData.entrySet()) {
assertNotNull(entry.getValue(), "The Emissions data should not be null.");
assertEquals(expectedEmissionsSize, entry.getValue().size(), "The size of the Emissions data should be timeCount * xCount * yCount.");
}
}

Files.delete(outputPath);
}
}
Loading

0 comments on commit c7df7e9

Please sign in to comment.