Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for select_multiple in XLSForms #257

Merged
merged 11 commits into from
Jun 5, 2024
Merged
213 changes: 85 additions & 128 deletions osm_fieldwork/CSVDump.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@
import csv
import logging
import os
import re
import sys
from datetime import datetime

import pandas as pd
from geojson import Feature, FeatureCollection, Point, dump

from osm_fieldwork.convert import Convert
Expand Down Expand Up @@ -58,11 +56,22 @@ def __init__(
self.config = super().__init__(yaml)
self.saved = dict()
self.defaults = dict()
self.entries = dict()
self.types = dict()

def lastSaved(
self,
keyword: str,
):
) -> str:
"""Get the last saved value for a question.

Args:
keyword (str): The keyword to search for

Returns:
(str): The last saved value for the question

"""
if keyword is not None and len(keyword) > 0:
return self.saved[keyword]
return None
Expand All @@ -71,39 +80,32 @@ def updateSaved(
self,
keyword: str,
value: str,
):
) -> bool:
"""Update the last saved value for a question.

Args:
keyword (str): The keyword to search for
value (str): The new value

Returns:
(bool): If the new value got saved

"""
if keyword is not None and value is not None and len(value) > 0:
self.saved[keyword] = value

def parseXLS(
self,
xlsfile: str,
):
"""Parse the source XLSFile if available to look for details we need."""
if xlsfile is not None and len(xlsfile) > 0:
entries = pd.read_excel(xlsfile, sheet_name=[0])
# There will only be a single sheet
names = entries[0]["name"]
defaults = entries[0]["default"]
total = len(names)
i = 0
while i < total:
entry = defaults[i]
if str(entry) != "nan":
pat = re.compile("..last-saved.*")
if pat.match(entry):
name = entry.split("#")[1][:-1]
self.saved[name] = None
else:
self.defaults[names[i]] = entry
i += 1
return True
return True
else:
return False

def createOSM(
self,
filespec: str,
):
"""Create an OSM XML output files."""
"""Create an OSM XML output files.

Args:
filespec (str): The output file name
"""
log.debug("Creating OSM XML file: %s" % filespec)
self.osm = OsmFile(filespec)
# self.osm.header()
Expand All @@ -112,7 +114,11 @@ def writeOSM(
self,
feature: dict,
):
"""Write a feature to an OSM XML output file."""
"""Write a feature to an OSM XML output file.

Args:
feature (dict): The OSM feature to write to
"""
out = ""
if "id" in feature["tags"]:
feature["id"] = feature["tags"]["id"]
Expand All @@ -131,17 +137,25 @@ def finishOSM(self):

def createGeoJson(
self,
file: str = "tmp.geojson",
filespec: str = "tmp.geojson",
):
"""Create a GeoJson output file."""
log.debug("Creating GeoJson file: %s" % file)
self.json = open(file, "w")
"""Create a GeoJson output file.

Args:
filespec (str): The output file name
"""
log.debug("Creating GeoJson file: %s" % filespec)
self.json = open(filespec, "w")

def writeGeoJson(
self,
feature: dict,
):
"""Write a feature to a GeoJson output file."""
"""Write a feature to a GeoJson output file.

Args:
feature (dict): The OSM feature to write to
"""
# These get written later when finishing , since we have to create a FeatureCollection
if "lat" not in feature["attrs"] or "lon" not in feature["attrs"]:
return None
Expand All @@ -167,8 +181,16 @@ def parse(
self,
filespec: str,
data: str = None,
):
"""Parse the CSV file from ODK Central and convert it to a data structure."""
) -> list:
"""Parse the CSV file from ODK Central and convert it to a data structure.

Args:
filespec (str): The file to parse.
data (str): Or the data to parse.

Returns:
(list): The list of features with tags
"""
all_tags = list()
if not data:
f = open(filespec, newline="")
Expand All @@ -179,32 +201,34 @@ def parse(
tags = dict()
# log.info(f"ROW: {row}")
for keyword, value in row.items():
if keyword is None or len(keyword) == 0:
if keyword is None or len(value) == 0:
continue

base = self.basename(keyword).lower()
# There's many extraneous fields in the input file which we don't need.
if base is None or base in self.ignore or value is None:
continue
# if base in self.multiple:
# epdb.st()
# entry = reader[keyword]
# for key, val in entry.items():
# print(key)75.66.108.181
# if key == "name":
# tags['name'] = val
# continue
else:
# log.info(f"ITEM: {keyword} = {value}")
if base in self.types:
if self.types[base] == "select_multiple":
vals = self.convertMultiple(value)
if len(vals) > 0:
for tag in vals:
tags.update(tag)
# print(f"BASE {tags}")
continue
# When using geopoint warmup, once the display changes to the map

# location, there is not always a value if the accuracy is way
# off. In this case use the warmup value, which is where we are
# standing anyway.
# hopefully standing anyway.
if base == "latitude" and len(value) == 0:
if "warmup-Latitude" in row:
value = row["warmup-Latitude"]
if base == "longitude" and len(value) == 0:
value = row["warmup-Longitude"]
items = self.convertEntry(base, value)

# log.info(f"ROW: {base} {value}")
if len(items) > 0:
if base in self.saved:
Expand All @@ -224,101 +248,32 @@ def parse(
tags[k] = v
else:
tags[base] = value

# log.debug(f"\tFIXME1: {tags}")
all_tags.append(tags)
return all_tags

def basename(
self,
line: str,
):
"""Extract the basename of a path after the last -."""
) -> str:
"""Extract the basename of a path after the last -.

Args:
line (str): The path from the json file entry

Returns:
(str): The last node of the path
"""
tmp = line.split("-")
if len(tmp) == 0:
return line
base = tmp[len(tmp) - 1]
return base

def createEntry(
self,
entry: dict,
):
"""Create the feature data structure."""
# print(line)
feature = dict()
attrs = dict()
tags = dict()
priv = dict()
refs = list()

# log.debug("Creating entry")
# First convert the tag to the approved OSM equivalent
if "lat" in entry and "lon" in entry:
attrs["lat"] = entry["lat"]
attrs["lon"] = entry["lon"]
for key, value in entry.items():
attributes = (
"id",
"timestamp",
"lat",
"lon",
"uid",
"user",
"version",
"action",
)

# When using existing OSM data, there's a special geometry field.
# Otherwise use the GPS coordinates where you are.
if key == "geometry" and len(value) > 0:
geometry = value.split(" ")
if len(geometry) == 4:
attrs["lat"] = geometry[0]
attrs["lon"] = geometry[1]
continue

if len(attrs["lat"]) == 0:
continue
if key is not None and len(key) > 0 and key in attributes:
attrs[key] = value
log.debug("Adding attribute %s with value %s" % (key, value))
else:
if key in self.multiple:
for item in value:
if key in item:
for entry in item[key].split():
vals = self.getValues(key)
if entry in vals:
if vals[entry].find("="):
tmp = vals[entry].split("=")
tags[tmp[0]] = tmp[1]
else:
tags[entry] = "yes"
continue

if value is not None and value != "no" and value != "unknown":
if key == "track" or key == "geoline":
# refs.append(tags)
# log.debug("Adding reference %s" % tags)
refs = value.split(";")
elif len(value) > 0:
if self.privateData(key):
priv[key] = value
else:
tags[key] = value
if len(tags) > 0:
feature["attrs"] = attrs
feature["tags"] = tags
if len(refs) > 1:
feature["refs"] = refs
if len(priv) > 0:
feature["private"] = priv

return feature


def main():
""" """
"""Run conversion directly from the terminal."""
parser = argparse.ArgumentParser(description="convert CSV from ODK Central to OSM XML")
parser.add_argument("-v", "--verbose", action="store_true", help="verbose output")
parser.add_argument("-y", "--yaml", help="Alternate YAML file")
Expand All @@ -340,6 +295,7 @@ def main():
csvin = CSVDump(args.yaml)
else:
csvin = CSVDump()

csvin.parseXLS(args.xlsfile)
osmoutfile = os.path.basename(args.infile.replace(".csv", ".osm"))
csvin.createOSM(osmoutfile)
Expand Down Expand Up @@ -367,6 +323,7 @@ def main():
csvin.writeOSM(node)
refs.append(nodeid)
nodeid -= 1

feature["refs"] = refs
csvin.writeOSM(feature)
else:
Expand Down
Loading
Loading