Skip to content

Commit

Permalink
Fix GeoJson export for SQL.
Browse files Browse the repository at this point in the history
  • Loading branch information
Grandro committed Dec 4, 2024
1 parent f30040a commit 449c607
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 36 deletions.
87 changes: 68 additions & 19 deletions src/qlever-petrimaps/SQLCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,15 +138,14 @@ void SQLCache::loadNew() {
std::string rowCountQuery = "SELECT COUNT(*) FROM (" + _finalQuery + ") AS finalQuery;";
pqxx::result rowCountQueryResult = processQuery(rowCountQuery);
const pqxx::field rowCountField = rowCountQueryResult[0][0];
size_t rowCount = rowCountField.as<size_t>();
_rowCount = rowCountField.as<size_t>();
// Count geometries instead of rows
_totalSize = rowCount * _geomColumnIdxs.size();
_totalSize = _rowCount * _geomColumnIdxs.size();

// Process final query and parse
_curRow = 0;
_curUniqueGeom = 0;
_numPointGeoms = 0;
_numLineGeoms = 0;
_numGeoms = 0;
if (_totalSize == 0) {
throw std::runtime_error("No geometries found.");
}
Expand All @@ -155,17 +154,14 @@ void SQLCache::loadNew() {
_lines.clear();
_linePoints.clear();

// Process final query in batches to use less total RAM
size_t batchSize = 100000;

// Use cursor to process data in batches
LOG(INFO) << "[GEOMCACHE] Process Query: " << _finalQuery;
try {
pqxx::work w(*_sqlConn);
pqxx::stateless_cursor<pqxx::cursor_base::read_only, pqxx::cursor_base::owned> cursor(w, _finalQuery, "myCursor", false);
for (size_t pos = 0; pos < rowCount; pos += batchSize) {
for (size_t pos = 0; pos < _rowCount; pos += _batchSize) {
_loadStatusStage = _LoadStatusStages::FinalQuery;
pqxx::result result = cursor.retrieve(pos, pos + batchSize);
pqxx::result result = cursor.retrieve(pos, pos + _batchSize);

_loadStatusStage = _LoadStatusStages::Parse;
parse(result);
Expand Down Expand Up @@ -200,12 +196,14 @@ void SQLCache::loadFromFile(const std::string& fname) {
size_t numLinePoints;
size_t numLines;
size_t numRowIdToResultTableRowId;
size_t numGeomColumnIdxs;
size_t numNonGeomColumnIdxs;
size_t numResultColumnNames;
std::streampos posPoints;
std::streampos posLinePoints;
std::streampos posLines;
std::streampos posRowIdToResultTableRowId;
std::streampos posGeomColumnIdxs;
std::streampos posNonGeomColumnIdxs;
std::streampos posResultColumnNames;

Expand Down Expand Up @@ -233,12 +231,22 @@ void SQLCache::loadFromFile(const std::string& fname) {
posRowIdToResultTableRowId = f.tellg();
f.seekg(sizeof(size_t) * 2 * numRowIdToResultTableRowId, f.cur);

// _geomColumnIdxs
f.read(reinterpret_cast<char*>(&numGeomColumnIdxs), sizeof(size_t));
_geomColumnIdxs.resize(numGeomColumnIdxs);
posGeomColumnIdxs = f.tellg();
f.seekg(sizeof(size_t) * numGeomColumnIdxs, f.cur);

// _nonGeomColumnIdxs
f.read(reinterpret_cast<char*>(&numNonGeomColumnIdxs), sizeof(size_t));
_nonGeomColumnIdxs.resize(numNonGeomColumnIdxs);
posNonGeomColumnIdxs = f.tellg();
f.seekg(sizeof(size_t) * numNonGeomColumnIdxs, f.cur);

// _rowCount
f.read(reinterpret_cast<char*>(&_rowCount), sizeof(size_t));
f.seekg(sizeof(size_t), f.cur);

// _resultColumnNames
f.read(reinterpret_cast<char*>(&numResultColumnNames), sizeof(size_t));
_resultColumnNames.resize(numResultColumnNames);
Expand Down Expand Up @@ -280,6 +288,12 @@ void SQLCache::loadFromFile(const std::string& fname) {
_rowIdToResultTableRowId[rowId] = resultTableRowId;
}

// _geomColumnIdxs
f.seekg(posGeomColumnIdxs);
for (size_t i = 0; i < numGeomColumnIdxs; i++) {
f.read(reinterpret_cast<char*>(&_geomColumnIdxs[i]), sizeof(size_t));
}

// _nonGeomColumnIdxs
f.seekg(posNonGeomColumnIdxs);
for (size_t i = 0; i < numNonGeomColumnIdxs; i++) {
Expand Down Expand Up @@ -341,11 +355,19 @@ void SQLCache::serializeToFile(const std::string& fname) const {
f.write(reinterpret_cast<const char*>(&resultTableRowId), sizeof(size_t));
}

// _geomColumnIdxs
num = _geomColumnIdxs.size();
f.write(reinterpret_cast<const char*>(&num), sizeof(size_t));
f.write(reinterpret_cast<const char*>(&_geomColumnIdxs[0]), sizeof(size_t) * num);

// _nonGeomColumnIdxs
num = _nonGeomColumnIdxs.size();
f.write(reinterpret_cast<const char*>(&num), sizeof(size_t));
f.write(reinterpret_cast<const char*>(&_nonGeomColumnIdxs[0]), sizeof(size_t) * num);

// _rowCount
f.write(reinterpret_cast<const char*>(&_rowCount), sizeof(size_t));

// _resultColumnNames
num = _resultColumnNames.size();
f.write(reinterpret_cast<const char*>(&num), sizeof(size_t));
Expand Down Expand Up @@ -422,13 +444,12 @@ std::vector<std::pair<ID_TYPE, ID_TYPE>> SQLCache::getRelObjects() const {
objects.push_back({i, idx});
}

idx = 0;
for (size_t i = 0; i < _lines.size(); i++) {
bool isFirst = std::get<1>(_lines[i]);
if (isFirst && i > 0) {
idx++;
}
objects.push_back({i + I_OFFSET, idx + I_OFFSET});
objects.push_back({i + I_OFFSET, idx});
}

return objects;
Expand All @@ -446,15 +467,43 @@ std::map<std::string, std::string> SQLCache::getRowAttr(size_t rowId) const {
int y = _nonGeomColumnIdxs[i];
pqxx::field field = row[y + 1];
std::string columnName = _resultColumnNames[y];
LOG(INFO) << "[GEOMCACHE] columnName: " << columnName;
std::string fieldValue = field.c_str();
LOG(INFO) << "[GEOMCACHE] fieldValue: " << fieldValue;
attr[columnName] = fieldValue;
}

return attr;
}

// _____________________________________________________________________________
std::vector<std::map<std::string, std::string>> SQLCache::getAttr() const {
// Use cursor to process data in batches
std::vector<std::map<std::string, std::string>> attr;
attr.reserve(_rowCount);

try {
pqxx::work w(*_sqlConn);
pqxx::stateless_cursor<pqxx::cursor_base::read_only, pqxx::cursor_base::owned> cursor(w, _finalQuery, "myCursor", false);
for (size_t pos = 0; pos < _rowCount; pos += _batchSize) {
pqxx::result result = cursor.retrieve(pos, pos + _batchSize);
for (const auto &row: result) {
std::map<std::string, std::string> rowAttr;
for (size_t i = 0; i < _nonGeomColumnIdxs.size(); i++) {
int y = _nonGeomColumnIdxs[i];
pqxx::field field = row[y + 1];
std::string columnName = _resultColumnNames[y];
std::string fieldValue = field.c_str();
rowAttr[columnName] = fieldValue;
}
attr.push_back(rowAttr);
}
}
} catch (const std::exception &e) {
throw std::runtime_error(e.what());
}

return attr;
}

// _____________________________________________________________________________
pqxx::result SQLCache::processQuery(std::string query, int limit, int offset, bool commit) const {
query = setQueryLimitOffset(query, limit, offset);
Expand Down Expand Up @@ -784,8 +833,8 @@ size_t SQLCache::parsePoint(std::string WKT, size_t rowNumber, size_t startPos,
_curUniqueGeom++;

if (isFirst) {
_rowIdToResultTableRowId[_numPointGeoms] = rowNumber;
_numPointGeoms++;
_rowIdToResultTableRowId[_numGeoms] = rowNumber;
_numGeoms++;
}

return pos;
Expand Down Expand Up @@ -814,8 +863,8 @@ size_t SQLCache::parseLineString(std::string WKT, size_t rowNumber, size_t start
_curUniqueGeom++;

if (isFirst) {
_rowIdToResultTableRowId[_numLineGeoms + I_OFFSET] = rowNumber;
_numLineGeoms++;
_rowIdToResultTableRowId[_numGeoms] = rowNumber;
_numGeoms++;
}

return pos;
Expand Down Expand Up @@ -856,8 +905,8 @@ size_t SQLCache::parsePolygon(std::string WKT, size_t rowNumber, size_t startPos
_curUniqueGeom++;

if (!isMulti || isMultiFirst) {
_rowIdToResultTableRowId[_numLineGeoms + I_OFFSET] = rowNumber;
_numLineGeoms++;
_rowIdToResultTableRowId[_numGeoms] = rowNumber;
_numGeoms++;
}

return pos + 1;
Expand Down
6 changes: 4 additions & 2 deletions src/qlever-petrimaps/SQLCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class SQLCache : public GeomCache {

std::vector<std::pair<ID_TYPE, ID_TYPE>> getRelObjects() const;
std::map<std::string, std::string> getRowAttr(size_t rowId) const;
std::vector<std::map<std::string, std::string>> getAttr() const;
void setQuery(std::string query);
void setQueryHash(std::string queryHash);

Expand All @@ -56,6 +57,7 @@ class SQLCache : public GeomCache {
std::string _queryHash;
std::string _createCacheViewQuery;
std::string _finalQuery;
size_t _batchSize = 100000;

std::vector<std::tuple<util::geo::FPoint, bool>> _points;
std::vector<std::tuple<size_t, bool>> _lines;
Expand All @@ -64,8 +66,8 @@ class SQLCache : public GeomCache {
std::map<size_t, size_t> _rowIdToResultTableRowId;
std::vector<size_t> _geomColumnIdxs;
std::vector<size_t> _nonGeomColumnIdxs;
size_t _numPointGeoms;
size_t _numLineGeoms;
size_t _numGeoms;
size_t _rowCount;

// PostgreSQL
std::string _sqlCredentials = "host=localhost port=5432 dbname=test_database user=test_user password=123456";
Expand Down
37 changes: 22 additions & 15 deletions src/qlever-petrimaps/server/SQLRequestor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,30 +59,37 @@ std::vector<std::pair<std::string, std::string>> SQLRequestor::requestRow(uint64
}

std::map<std::string, std::string> rowAttr = _cache->getRowAttr(row);
std::vector<std::pair<std::string, std::string>> res;
for (auto const& entry : rowAttr) {
std::string key = entry.first;
std::string val = entry.second;
std::pair<std::string, std::string> pair{key, val};
res.push_back(pair);
}
std::vector<std::pair<std::string, std::string>> pairs = getRowAttrPairs(rowAttr);

return res;
return pairs;
}

void SQLRequestor::requestRows(std::function<void(std::vector<std::vector<std::pair<std::string, std::string>>>)> cb) const {
if (!_cache->ready()) {
throw std::runtime_error("Geom cache not ready");
}

std::vector<std::map<std::string, std::string>> attr = _cache->getAttr();
std::vector<std::vector<std::pair<std::string, std::string>>> res;
auto relObjects = _cache->getRelObjects();
for(auto const& object : relObjects) {
// vector<pair<geomID, Row>>
// geomID starts from 0 ascending, Row = geomID
ID_TYPE row = object.second;
auto rowAttr = requestRow(row);
res.push_back(rowAttr);
res.reserve(attr.size());
for (size_t i = 0; i < attr.size(); i++) {
std::map<std::string, std::string> rowAttr = attr[i];
std::vector<std::pair<std::string, std::string>> pairs = getRowAttrPairs(rowAttr);
res.push_back(pairs);
}

cb(res);
}

std::vector<std::pair<std::string, std::string>> SQLRequestor::getRowAttrPairs(std::map<std::string, std::string> rowAttr) const {
std::vector<std::pair<std::string, std::string>> pairs;
pairs.reserve(rowAttr.size());
for (auto const& keyVal : rowAttr) {
std::string key = keyVal.first;
std::string val = keyVal.second;
std::pair<std::string, std::string> pair{key, val};
pairs.push_back(pair);
}

return pairs;
}
1 change: 1 addition & 0 deletions src/qlever-petrimaps/server/SQLRequestor.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class SQLRequestor : public Requestor {
void request();
std::vector<std::pair<std::string, std::string>> requestRow(uint64_t row) const;
void requestRows(std::function<void(std::vector<std::vector<std::pair<std::string, std::string>>>)> cb) const;
std::vector<std::pair<std::string, std::string>> getRowAttrPairs(std::map<std::string, std::string> rowAttr) const;

private:
std::shared_ptr<const SQLCache> _cache;
Expand Down

0 comments on commit 449c607

Please sign in to comment.