diff --git a/vespa/repository/api/ingestion/streamer.py b/vespa/repository/api/ingestion/streamer.py index a094be36..64680b69 100644 --- a/vespa/repository/api/ingestion/streamer.py +++ b/vespa/repository/api/ingestion/streamer.py @@ -7,10 +7,10 @@ import logging import os import urllib.parse -import xml.etree.ElementTree import zipfile import ijson +import lxml import requests from lxml.etree import iterparse @@ -185,22 +185,20 @@ async def _parse_xml_stream(self, stream): """ Asynchronous parser for XML streams. """ - # Ensure the stream is read synchronously - with io.TextIOWrapper(stream, encoding="utf-8", errors="replace") as wrapper: - for event, elem in iterparse(wrapper, events=("end",)): - if elem.tag == "node": - # Create a dictionary from element attributes - elem_data = dict(elem.attrib) - - # Add child tag attributes (k, v) to the dictionary - for tag in elem.findall("tag"): - key = tag.attrib.get("k") - value = tag.attrib.get("v") - if key and value: - elem_data[key] = value - - yield elem_data - elem.clear() # Free memory + for event, elem in lxml.etree.iterparse(stream, events=("end",)): + if elem.tag == "node": + # Create a dictionary from element attributes + elem_data = dict(elem.attrib) + + # Add child tag attributes (k, v) to the dictionary + for tag in elem.findall("tag"): + key = tag.attrib.get("k") + value = tag.attrib.get("v") + if key and value: + elem_data[key] = value + + yield elem_data + elem.clear() # Free memory def _split_triple(self, line): parts = line.rstrip(' .').split(' ', 2)