Skip to content

Commit

Permalink
fix: resolve comments
Browse files Browse the repository at this point in the history
  • Loading branch information
eric-nguyen-cs committed Jan 12, 2024
1 parent 94bfa7d commit f1e936d
Showing 1 changed file with 64 additions and 54 deletions.
118 changes: 64 additions & 54 deletions parser/openfoodfacts_taxonomy_parser/parser/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,33 @@ def _create_other_node(self, tx: Transaction, node_data: NodeData, project_label
SET n.src_position = $src_position
"""
if node_data.get_node_type() == NodeType.TEXT:
id_query = f" CREATE (n:{project_label}:TEXT) \n "
id_query = f"CREATE (n:{project_label}:TEXT) \n"
elif node_data.get_node_type() == NodeType.SYNONYMS:
id_query = f" CREATE (n:{project_label}:SYNONYMS) \n "
id_query = f"CREATE (n:{project_label}:SYNONYMS) \n"
elif node_data.get_node_type() == NodeType.STOPWORDS:
id_query = f" CREATE (n:{project_label}:STOPWORDS) \n "
id_query = f"CREATE (n:{project_label}:STOPWORDS) \n"
else:
raise ValueError(f"ENTRY node type should be batched")
raise ValueError(f"ENTRY nodes should not be passed to this function")

entry_query = ""
for key in node_data.tags:
entry_query += " SET n." + key + " = $" + key + "\n"
entry_queries = [f"SET n.{key} = ${key}" for key in node_data.tags]
entry_query = "\n".join(entry_queries) + "\n"

query = id_query + entry_query + position_query
tx.run(query, node_data.to_dict())

def _create_other_nodes(self, other_nodes: list[NodeData], project_label: str):
    """Write every TEXT, SYNONYMS and STOPWORDS node to the database.

    Each node is created individually through `_create_other_node`, and all
    of those calls share one transaction opened on `self.session`.
    A log line is emitted before the work starts and another one afterwards
    with the number of nodes handled and the elapsed time.
    """
    self.parser_logger.info("Creating TEXT, SYNONYMS and STOPWORDS nodes")
    # reference point for the duration reported in the final log line
    start_time = timeit.default_timer()

    with self.session.begin_transaction() as transaction:
        for other_node in other_nodes:
            self._create_other_node(transaction, other_node, project_label)

    self.parser_logger.info(
        f"Created {len(other_nodes)} TEXT, SYNONYMS and STOPWORDS nodes in {timeit.default_timer() - start_time} seconds"
    )

def _create_entry_nodes(self, entry_nodes: list[NodeData], project_label: str):
"""Create all ENTRY nodes in a single batch query"""
self.parser_logger.info("Creating ENTRY nodes")
Expand All @@ -66,20 +78,21 @@ def _create_entry_nodes(self, entry_nodes: list[NodeData], project_label: str):
SET n.src_position = entry_node.src_position
SET n.main_language = entry_node.main_language
"""
additional_query = ""
seen_properties = set()
seen_tags = set()

# we don't know in advance which properties and tags
# we will encounter in the batch
# so we accumulate them in this set
seen_properties_and_tags = set()

for entry_node in entry_nodes:
if entry_node.get_node_type() != NodeType.ENTRY:
raise ValueError(f"Only ENTRY nodes can be batched")
for key in entry_node.properties:
if not key in seen_properties:
additional_query += " SET n." + key + " = entry_node." + key + "\n"
seen_properties.add(key)
for key in entry_node.tags:
if not key in seen_tags:
additional_query += " SET n." + key + " = entry_node." + key + "\n"
seen_tags.add(key)
raise ValueError(f"Only ENTRY nodes should be passed to this function")
seen_properties_and_tags.update(entry_node.tags)
seen_properties_and_tags.update(entry_node.properties)

additional_query = "\n" + "\n".join(
[f"SET n.{key} = entry_node.{key}" for key in seen_properties_and_tags]
)

query = base_query + additional_query
self.session.run(query, entry_nodes=[entry_node.to_dict() for entry_node in entry_nodes])
Expand All @@ -88,24 +101,13 @@ def _create_entry_nodes(self, entry_nodes: list[NodeData], project_label: str):
f"Created {len(entry_nodes)} ENTRY nodes in {timeit.default_timer() - start_time} seconds"
)

def _create_other_nodes(self, other_nodes: list[NodeData], project_label: str):
    """Create the non-ENTRY nodes (TEXT, SYNONYMS and STOPWORDS).

    The nodes are passed one at a time to `_create_other_node`, all within
    a single transaction started from `self.session`. The node count and
    the time spent are logged once the loop is finished.
    """
    logger = self.parser_logger
    logger.info("Creating TEXT, SYNONYMS and STOPWORDS nodes")
    start_time = timeit.default_timer()

    with self.session.begin_transaction() as tx:
        # one CREATE query per node, see `_create_other_node`
        for current_node in other_nodes:
            self._create_other_node(tx, current_node, project_label)

    logger.info(
        f"Created {len(other_nodes)} TEXT, SYNONYMS and STOPWORDS nodes in {timeit.default_timer() - start_time} seconds"
    )

def _create_previous_links(self, previous_links: list[PreviousLink], project_label: str):
"""Create the 'is_before' relations between nodes"""
self.parser_logger.info("Creating 'is_before' links")
start_time = timeit.default_timer()

# The previous links creation is batched in a single query
# We also use the ID index to speed up the MATCH queries
query = f"""
UNWIND $previous_links as previous_link
MATCH(n:{project_label}) USING INDEX n:{project_label}(id)
Expand Down Expand Up @@ -137,6 +139,8 @@ def _create_child_links(
start_time = timeit.default_timer()

node_ids = set([node.id for node in entry_nodes])
# we collect nodes with a parent id which is the id of an entry (normalised)
# and nodes where a synonym was used to designate the parent (unnormalised)
normalised_child_links = []
unnormalised_child_links = []
for child_link in child_links:
Expand All @@ -145,6 +149,7 @@ def _create_child_links(
else:
unnormalised_child_links.append(child_link)

# adding normalised links is easy as we can directly match parent entries
normalised_query = f"""
UNWIND $normalised_child_links as child_link
MATCH (p:{project_label}) USING INDEX p:{project_label}(id)
Expand All @@ -157,27 +162,31 @@ def _create_child_links(
RETURN COUNT(relation)
"""

language_codes = set()
# for unnormalised links, we need to group them by language code of the parent id
lc_child_links_map = {}
for child_link in unnormalised_child_links:
lc, parent_id = child_link["parent_id"].split(":")
language_codes.add(lc)
child_link["parent_id"] = parent_id
if lc not in lc_child_links_map:
lc_child_links_map[lc] = []
lc_child_links_map[lc].append(child_link)

# we create a query for each language code
lc_queries = []
for lc, lc_child_links in lc_child_links_map.items():
lc_query = f"""
UNWIND $lc_child_links as child_link
MATCH (p:{project_label})
WHERE child_link.parent_id IN p.tags_ids_{lc}
MATCH (c:{project_label}) USING INDEX c:{project_label}(id)
WHERE c.id = child_link.id
CREATE (c)-[relations:is_child_of]->(p)
WITH relations
UNWIND relations AS relation
RETURN COUNT(relation)
"""
lc_queries.append((lc_query, lc_child_links))

parent_id_query = " OR ".join(
[f"child_link.parent_id IN p.tags_ids_{lc}" for lc in language_codes]
)

unnormalised_query = f"""
UNWIND $unnormalised_child_links as child_link
MATCH (p:{project_label})
WHERE {parent_id_query}
MATCH (c:{project_label}) USING INDEX c:{project_label}(id)
WHERE c.id = child_link.id
CREATE (c)-[relations:is_child_of]->(p)
WITH relations
UNWIND relations AS relation
RETURN COUNT(relation)
"""
count = 0

if normalised_child_links:
Expand All @@ -186,11 +195,10 @@ def _create_child_links(
)
count += normalised_result.value()[0]

if unnormalised_child_links:
unnormalised_result = self.session.run(
unnormalised_query, unnormalised_child_links=unnormalised_child_links
)
count += unnormalised_result.value()[0]
if lc_queries:
for lc_query, lc_child_links in lc_queries:
lc_result = self.session.run(lc_query, lc_child_links=lc_child_links)
count += lc_result.value()[0]

self.parser_logger.info(
f"Created {count} 'is_child_of' links in {timeit.default_timer() - start_time} seconds"
Expand Down Expand Up @@ -264,11 +272,13 @@ def _create_node_indexes(self, project_label: str):

def _write_to_database(self, taxonomy: Taxonomy, taxonomy_name: str, branch_name: str):
    """Write a parsed taxonomy to the database, step by step.

    The label used for every created node is obtained from
    `_get_project_name(taxonomy_name, branch_name)`. The steps run in a
    fixed order: nodes, then node indexes, then relationships, and finally
    the parsing errors node.
    """
    label = self._get_project_name(taxonomy_name, branch_name)

    # Step 1: nodes (non-ENTRY nodes first, then ENTRY nodes)
    self._create_other_nodes(taxonomy.other_nodes, label)
    self._create_entry_nodes(taxonomy.entry_nodes, label)

    # Step 2: node indexes, created before the links to accelerate relationship creation
    self._create_node_indexes(label)

    # Step 3: relationships between the nodes
    self._create_child_links(taxonomy.child_links, taxonomy.entry_nodes, label)
    self._create_previous_links(taxonomy.previous_links, label)

    # Step 4: the parsing errors node comes last
    self._create_parsing_errors_node(taxonomy_name, branch_name, label)

def __call__(self, filename: str, branch_name: str, taxonomy_name: str):
Expand Down

0 comments on commit f1e936d

Please sign in to comment.