Skip to content

Commit

Permalink
OSM
Browse files (browse the repository at this point in the history)
  • Loading branch information
docuracy committed Jan 25, 2025
1 parent 8e65db6 commit a4c5c6b
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 32 deletions.
2 changes: 1 addition & 1 deletion docs-source/content/500-System.md
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ source ~/.bashrc
```bash
cd ~ # Change to the home directory
sudo apt update && sudo apt upgrade -y
sudo apt install -y curl git unzip htop ufw
sudo apt install -y curl git unzip htop ufw aria2
git clone https://github.com/WorldHistoricalGazetteer/place.git
```

Expand Down
3 changes: 2 additions & 1 deletion vespa/docker/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ httpx>=0.24.0,<0.25.0
requests>=2.31.0,<3.0.0
Rtree~=1.3.0
shapely~=2.0.6
pyvespa~=0.51.0
pyvespa~=0.51.0
xmltodict~=0.14.2
49 changes: 20 additions & 29 deletions vespa/repository/api/ingestion/streamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
import io
import logging
import os
import subprocess
import urllib.parse
import zipfile

import ijson
import lxml
import requests
import xmltodict
from lxml.etree import iterparse


Expand Down Expand Up @@ -95,12 +97,14 @@ def _download_file(self):
file_path = self._get_file_path()
if not os.path.exists(file_path):
self.logger.info(f"Downloading file from {self.file_url} to {file_path}")
with requests.get(self.file_url, stream=True) as response:
response.raise_for_status()
with open(file_path, "wb") as file:
for chunk in response.iter_content(chunk_size=8192):
file.write(chunk)
self.logger.info(f"File downloaded successfully to {file_path}")
result = subprocess.run([
"aria2c", "--dir", os.path.dirname(file_path), "--out", os.path.basename(file_path), self.file_url
], check=True)
if result.returncode == 0:
self.logger.info(f"File downloaded successfully to {file_path}")
else:
self.logger.error(f"Failed to download file from {self.file_url}")
raise Exception("File download failed")
else:
self.logger.info(f"File already exists at {file_path}, skipping download.")
return file_path
Expand Down Expand Up @@ -185,31 +189,18 @@ async def _parse_xml_stream(self, stream):
"""
Asynchronous parser for XML streams.
"""
# Ensure stream is wrapped in a thread-safe async operation
def parse():
for line in stream:
try:
# Parse XML into a dictionary
doc = xmltodict.parse(line)
yield doc
except Exception as e:
self.logger.error(f"Failed to parse XML line: {line}. Error: {e}")
continue

count = 0

for event, elem in lxml.etree.iterparse(stream, events=("end",)):

count += 1
if count > 100:
break


if elem.tag == "node":
# Create a dictionary from element attributes
elem_data = dict(elem.attrib)

# Add child tag attributes (k, v) to the dictionary
for tag in elem.findall("tag"):
key = tag.attrib.get("k")
value = tag.attrib.get("v")
if key and value:
elem_data[key] = value

yield elem_data
elem.clear() # Free memory
for item in await asyncio.to_thread(parse):
yield item

# Run synchronous parsing in a thread and asynchronously yield results
for elem_data in await asyncio.to_thread(parse):
Expand Down
2 changes: 1 addition & 1 deletion vespa/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ api:
image:
repository: worldhistoricalgazetteer/vespa-api
pullPolicy: IfNotPresent
tag: "0.0.8"
tag: "0.0.9"
securityContext:
container:
runAsUser: 0
Expand Down

0 comments on commit a4c5c6b

Please sign in to comment.