diff --git a/src/main/java/eu/dissco/core/translator/repository/BatchInserter.java b/src/main/java/eu/dissco/core/translator/repository/BatchInserter.java index 09db4d6..d7c2eae 100644 --- a/src/main/java/eu/dissco/core/translator/repository/BatchInserter.java +++ b/src/main/java/eu/dissco/core/translator/repository/BatchInserter.java @@ -2,9 +2,10 @@ import com.fasterxml.jackson.databind.JsonNode; import eu.dissco.core.translator.exception.DisscoRepositoryException; -import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; import java.nio.charset.StandardCharsets; import java.sql.SQLException; import java.util.List; @@ -21,22 +22,6 @@ public class BatchInserter { private final CopyManager copyManager; - public void batchCopy(String tableName, List> dbRecords) - throws DisscoRepositoryException { - try (var outputStream = new ByteArrayOutputStream()) { - for (var dbRecord : dbRecords) { - outputStream.write(getCsvRow(dbRecord)); - } - var inputStream = new ByteArrayInputStream(outputStream.toByteArray()); - copyManager.copyIn("COPY " + tableName - + " FROM stdin DELIMITER ','", inputStream); - } catch (IOException | SQLException e) { - throw new DisscoRepositoryException( - String.format("An error has occurred inserting %d records into temp table %s", - dbRecords.size(), tableName), e); - } - } - private static byte[] getCsvRow(Pair dbRecord) { return (dbRecord.getLeft() + "," + cleanString(dbRecord.getRight()) @@ -54,4 +39,31 @@ private static String cleanString(JsonNode jsonNode) { return node; } + public void batchCopy(String tableName, List> dbRecords) + throws DisscoRepositoryException { + try (var outputStream = new ByteArrayOutputStream(); + var in = new PipedInputStream(); + var out = new PipedOutputStream(in)) { + for (var dbRecord : dbRecords) { + outputStream.write(getCsvRow(dbRecord)); + } + try (in) { + new Thread(() -> { + try (out) { + outputStream.writeTo(out); + } catch (IOException e) { + log.error("Error writing to pipe", e); + } + }).start(); + + copyManager.copyIn("COPY " + tableName + + " FROM stdin DELIMITER ','", in); + } + } catch (IOException | SQLException e) { + throw new DisscoRepositoryException( + String.format("An error has occurred inserting %d records into temp table %s", + dbRecords.size(), tableName), e); + } + } + }