Skip to content

Commit

Permalink
OSM
Browse files Browse the repository at this point in the history
  • Loading branch information
docuracy committed Jan 25, 2025
1 parent 1c618df commit 96c9257
Showing 1 changed file with 5 additions and 8 deletions.
13 changes: 5 additions & 8 deletions vespa/repository/api/ingestion/streamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import ijson
import requests
from lxml.etree import iterparse


class StreamFetcher:
Expand Down Expand Up @@ -180,15 +181,13 @@ async def async_generator():

return async_generator()

def _parse_xml_stream(self, stream):
async def parse_xml_stream(stream):
"""
Asynchronous parser for XML streams.
"""
loop = asyncio.get_event_loop()
wrapper = io.TextIOWrapper(stream, encoding="utf-8", errors="replace")

async def async_generator():
for event, elem in xml.etree.ElementTree.iterparse(wrapper, events=("end",)):
# Ensure the stream is read synchronously
with io.TextIOWrapper(stream, encoding="utf-8", errors="replace") as wrapper:
for event, elem in iterparse(wrapper, events=("end",)):
if elem.tag == "node":
# Create a dictionary from element attributes
elem_data = dict(elem.attrib)
Expand All @@ -203,8 +202,6 @@ async def async_generator():
yield elem_data
elem.clear() # Free memory

return async_generator()

def _split_triple(self, line):
parts = line.rstrip(' .').split(' ', 2)

Expand Down

0 comments on commit 96c9257

Please sign in to comment.