-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnormalize_addresses.py
executable file
·126 lines (106 loc) · 3.51 KB
/
normalize_addresses.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
#!/usr/bin/env python2
import os
import re
import sys

import defusedxml.ElementTree as ET
# Regular expression for strings that look like international phone numbers.
# It was developed from a handful of examples rather than from the RFCs, so it
# works in general but could miss edge cases.
# Accepted shape, in order: an optional "00" (plus optional space/hyphen) or
# "+" prefix, a country code of 0-3 digits, a 3-digit area code with optional
# parentheses, a 3-digit group and a 4-digit group; a single space or hyphen
# may separate the parts. The whole string must match (anchored with ^ and $).
PHONE_RE = re.compile(r"^(?:00[ -]?|\+?)(\d{0,3}?)[ -]?\(?(\d{3})\)?[ -]?(\d{3})[ -]?(\d{4})$")
# Names of the groups captured by PHONE_RE, by position.
RE_PARTS = ["country", "area", "first", "last"]
# XPath expression matching every node that has an "address" attribute.
ADDR_XPATH = ".//*[@address]"
class AddrData(object):
    """Best-known form of one phone number, merged from several spellings.

    A record is identified by ``canon`` (the "first" and "last" digit groups
    joined together). ``area`` and ``country`` hold the area and country codes,
    or None while they are still unknown; update() fills them in from other
    spellings of the same number.
    """

    def __init__(self, addrdict):
        """Build a record from a dict of matched phone-number parts.

        addrdict must map "country", "area", "first" and "last" to digit
        strings. An empty country or area is stored as None.
        """
        self.canon = addrdict["first"] + addrdict["last"]
        # An optional part that did not match arrives as ""; normalise to None
        # so "unknown" is represented one way everywhere.
        self.area = addrdict["area"] or None
        self.country = addrdict["country"] or None

    def _merge_field(self, name, other):
        """Adopt other's value for attribute *name*, or check that it agrees."""
        theirs = getattr(other, name)
        if theirs is None:
            # The other spelling knows nothing about this field.
            return
        mine = getattr(self, name)
        if mine is None:
            setattr(self, name, theirs)
        elif mine != theirs:
            raise AssertionError(
                "conflicting %s codes for number %s: %r vs %r"
                % (name, self.canon, mine, theirs))

    def update(self, other):
        """Merge the country and area codes known to *other* into this record.

        Raises AssertionError if the records are for different numbers or if
        they disagree on a code that both of them know. The checks are
        explicit raises rather than ``assert`` statements so that they still
        run (and report what conflicted) under ``python -O``.
        """
        if self.canon != other.canon:
            raise AssertionError(
                "cannot merge number %s into %s" % (other.canon, self.canon))
        # Country is merged before area, so a later area conflict leaves an
        # already-adopted country code in place.
        self._merge_field("country", other)
        self._merge_field("area", other)

    def __str__(self):
        """Return the number as "+<country><area><canon>", omitting unknown parts."""
        prefix = "+" + self.country if self.country else ""
        return prefix + (self.area or "") + self.canon
# functions for gathering addresses
def add_addr(addrmap, addr):
    """Record one address string in *addrmap* if it looks like a phone number.

    Strings that do not match PHONE_RE are ignored. Matching strings are keyed
    by their "first" + "last" digits; a number seen before has the new
    spelling merged into its existing AddrData entry.
    """
    found = PHONE_RE.match(addr)
    if found is None:
        return
    fields = dict(zip(RE_PARTS, found.groups()))
    key = fields["first"] + fields["last"]
    entry = AddrData(fields)
    if key not in addrmap:
        # First sighting of this number: store it as-is.
        addrmap[key] = entry
        return
    # Seen before: fold any extra country/area information into the entry.
    addrmap[key].update(entry)
def gather_addrs(root):
    """Return a dict of every phone-number-like address found under *root*.

    The same number can appear in several spellings, some carrying more
    information than others. Collecting them all first means addresses are
    later canonicalized from what the data actually says, rather than by
    assuming, e.g., that every number without a country code is in the USA.
    """
    collected = {}
    for element in root.findall(ADDR_XPATH):
        # One attribute may hold several addresses joined by '~'. split()
        # returns the whole string as a single item when there is no '~'.
        for piece in element.get("address").split('~'):
            add_addr(collected, piece)
    return collected
# functions for outputting normalized addresses
def normalize_addr(addrmap, addr):
    """Return the canonical spelling of *addr*, using the gathered *addrmap*.

    Strings that do not match PHONE_RE are returned unchanged. A matching
    string is expected to already have an entry in *addrmap*.
    """
    found = PHONE_RE.match(addr)
    if found is not None:
        fields = dict(zip(RE_PARTS, found.groups()))
        key = fields["first"] + fields["last"]
        assert key in addrmap
        return str(addrmap[key])
    # Not a phone number: leave it alone.
    return addr
def update_addrs(root, addrmap):
    """Rewrite every "address" attribute under *root* in normalized form.

    Each attribute is treated as a '~'-separated list of addresses; every item
    is normalized on its own and the list is joined back together with '~'.
    """
    for element in root.findall(ADDR_XPATH):
        # split() gives a one-item list when the value contains no '~'.
        pieces = element.get("address").split('~')
        rewritten = '~'.join(normalize_addr(addrmap, piece) for piece in pieces)
        element.set("address", rewritten)
def parse_args():
    """Read the input and output paths from the command line.

    Returns an ``(inpath, outpath)`` tuple. When no output path is given, it
    is derived from the input path by inserting "-compressed" before the file
    extension (``db.xml`` -> ``db-compressed.xml``). If no input path was
    supplied, a usage message is written to stderr and the process exits with
    status -1.
    """
    if len(sys.argv) < 2:
        # sys.stderr.write behaves the same on Python 2 and 3 and keeps the
        # usage text out of stdout.
        sys.stderr.write(
            "USAGE: %s path/to/input/db.xml [path/to/output/db.xml]\n"
            % sys.argv[0])
        sys.exit(-1)
    inpath = sys.argv[1]
    if len(sys.argv) >= 3:
        outpath = sys.argv[2]
    else:
        # splitext only splits at a dot in the final path component, so a
        # file without an extension or a directory name containing a dot
        # still yields a sensible output path.
        stem, extension = os.path.splitext(inpath)
        outpath = stem + "-compressed" + extension
    return (inpath, outpath)
def main():
    """Normalize the addresses in the input XML file and write the result."""
    source_path, dest_path = parse_args()
    document = ET.parse(source_path, forbid_dtd=True)
    top = document.getroot()
    # Two passes: first learn the fullest known form of every number, then
    # rewrite each address attribute using that knowledge.
    known = gather_addrs(top)
    update_addrs(top, known)
    document.write(dest_path)


if __name__ == "__main__":
    main()