diff --git a/examples/filter_unused_stops.py b/examples/filter_unused_stops.py
index 461597c2..b9680397 100755
--- a/examples/filter_unused_stops.py
+++ b/examples/filter_unused_stops.py
@@ -24,40 +24,46 @@
def main():
- parser = optparse.OptionParser(
- usage="usage: %prog [options] input_feed output_feed",
- version="%prog "+transitfeed.__version__)
- parser.add_option("-l", "--list_removed", dest="list_removed",
- default=False,
- action="store_true",
- help="Print removed stops to stdout")
- (options, args) = parser.parse_args()
- if len(args) != 2:
- print(parser.format_help(), file=sys.stderr)
- print("\n\nYou must provide input_feed and output_feed\n\n", file=sys.stderr)
- sys.exit(2)
- input_path = args[0]
- output_path = args[1]
-
- loader = transitfeed.Loader(input_path)
- schedule = loader.Load()
-
- print("Removing unused stops...")
- removed = 0
- for stop_id, stop in schedule.stops.items():
- if not stop.GetTrips(schedule):
- removed += 1
- del schedule.stops[stop_id]
- if options.list_removed:
- print("Removing %s (%s)" % (stop_id, stop.stop_name))
- if removed == 0:
- print("No unused stops.")
- elif removed == 1:
- print("Removed 1 stop")
- else:
- print("Removed %d stops" % removed)
-
- schedule.WriteGoogleTransitFeed(output_path)
+    """Write a copy of input_feed to output_feed without its unused stops.
+
+    A stop is dropped when stop.GetTrips(schedule) is empty. With
+    -l/--list_removed every dropped stop is printed to stdout. Exits with
+    status 2 (after printing the help text to stderr) unless exactly two
+    positional arguments are given.
+    """
+    parser = optparse.OptionParser(
+        usage="usage: %prog [options] input_feed output_feed",
+        version="%prog " + transitfeed.__version__,
+    )
+    parser.add_option(
+        "-l",
+        "--list_removed",
+        dest="list_removed",
+        default=False,
+        action="store_true",
+        help="Print removed stops to stdout",
+    )
+    (options, args) = parser.parse_args()
+    if len(args) != 2:
+        print(parser.format_help(), file=sys.stderr)
+        print("\n\nYou must provide input_feed and output_feed\n\n", file=sys.stderr)
+        sys.exit(2)
+    input_path = args[0]
+    output_path = args[1]
+
+    loader = transitfeed.Loader(input_path)
+    schedule = loader.Load()
+
+    print("Removing unused stops...")
+    removed = 0
+    # Iterate over a snapshot: deleting from schedule.stops while walking its
+    # live items() view raises "dictionary changed size during iteration" on
+    # Python 3. list() is a harmless copy on Python 2.
+    for stop_id, stop in list(schedule.stops.items()):
+        if not stop.GetTrips(schedule):
+            removed += 1
+            del schedule.stops[stop_id]
+            if options.list_removed:
+                print("Removing %s (%s)" % (stop_id, stop.stop_name))
+    if removed == 0:
+        print("No unused stops.")
+    elif removed == 1:
+        print("Removed 1 stop")
+    else:
+        print("Removed %d stops" % removed)
+
+    schedule.WriteGoogleTransitFeed(output_path)
+
if __name__ == "__main__":
- main()
+ main()
diff --git a/examples/google_random_queries.py b/examples/google_random_queries.py
index 72928381..0cd6f7cb 100755
--- a/examples/google_random_queries.py
+++ b/examples/google_random_queries.py
@@ -38,7 +38,7 @@
def Distance(lat0, lng0, lat1, lng1):
- """
+ """
Compute the geodesic distance in meters between two points on the
surface of the Earth. The latitude and longitude angles are in
degrees.
@@ -52,75 +52,82 @@ def Distance(lat0, lng0, lat1, lng1):
(see "When is it NOT okay to assume the Earth is a sphere?" in the
same faq).
"""
- deg2rad = math.pi / 180.0
- lat0 = lat0 * deg2rad
- lng0 = lng0 * deg2rad
- lat1 = lat1 * deg2rad
- lng1 = lng1 * deg2rad
- dlng = lng1 - lng0
- dlat = lat1 - lat0
- a = math.sin(dlat*0.5)
- b = math.sin(dlng*0.5)
- a = a * a + math.cos(lat0) * math.cos(lat1) * b * b
- c = 2.0 * math.atan2(math.sqrt(a), math.sqrt(1.0 - a))
- return 6367000.0 * c
+ deg2rad = math.pi / 180.0
+ lat0 = lat0 * deg2rad
+ lng0 = lng0 * deg2rad
+ lat1 = lat1 * deg2rad
+ lng1 = lng1 * deg2rad
+ dlng = lng1 - lng0
+ dlat = lat1 - lat0
+ a = math.sin(dlat * 0.5)
+ b = math.sin(dlng * 0.5)
+ a = a * a + math.cos(lat0) * math.cos(lat1) * b * b
+ c = 2.0 * math.atan2(math.sqrt(a), math.sqrt(1.0 - a))
+ return 6367000.0 * c
def AddNoiseToLatLng(lat, lng):
- """Add up to 500m of error to each coordinate of lat, lng."""
- m_per_tenth_lat = Distance(lat, lng, lat + 0.1, lng)
- m_per_tenth_lng = Distance(lat, lng, lat, lng + 0.1)
- lat_per_100m = 1 / m_per_tenth_lat * 10
- lng_per_100m = 1 / m_per_tenth_lng * 10
- return (lat + (lat_per_100m * 5 * (random.random() * 2 - 1)),
- lng + (lng_per_100m * 5 * (random.random() * 2 - 1)))
+ """Add up to 500m of error to each coordinate of lat, lng."""
+ m_per_tenth_lat = Distance(lat, lng, lat + 0.1, lng)
+ m_per_tenth_lng = Distance(lat, lng, lat, lng + 0.1)
+ lat_per_100m = 1 / m_per_tenth_lat * 10
+ lng_per_100m = 1 / m_per_tenth_lng * 10
+ return (
+ lat + (lat_per_100m * 5 * (random.random() * 2 - 1)),
+ lng + (lng_per_100m * 5 * (random.random() * 2 - 1)),
+ )
def GetRandomLocationsNearStops(schedule):
- """Return a list of (lat, lng) tuples."""
- locations = []
- for s in schedule.GetStopList():
- locations.append(AddNoiseToLatLng(s.stop_lat, s.stop_lon))
- return locations
+ """Return a list of (lat, lng) tuples."""
+ locations = []
+ for s in schedule.GetStopList():
+ locations.append(AddNoiseToLatLng(s.stop_lat, s.stop_lon))
+ return locations
def GetRandomDatetime():
- """Return a datetime in the next week."""
- seconds_offset = random.randint(0, 60 * 60 * 24 * 7)
- dt = datetime.today() + timedelta(seconds=seconds_offset)
- return dt.replace(second=0, microsecond=0)
+ """Return a datetime in the next week."""
+ seconds_offset = random.randint(0, 60 * 60 * 24 * 7)
+ dt = datetime.today() + timedelta(seconds=seconds_offset)
+ return dt.replace(second=0, microsecond=0)
def FormatLatLng(lat_lng):
- """Format a (lat, lng) tuple into a string for maps.google.com."""
- return "%0.6f,%0.6f" % lat_lng
+ """Format a (lat, lng) tuple into a string for maps.google.com."""
+ return "%0.6f,%0.6f" % lat_lng
def LatLngsToGoogleUrl(source, destination, dt):
- """Return a URL for routing between two (lat, lng) at a datetime."""
- params = {"saddr": FormatLatLng(source),
- "daddr": FormatLatLng(destination),
- "time": dt.strftime("%I:%M%p"),
- "date": dt.strftime("%Y-%m-%d"),
- "dirflg": "r",
- "ie": "UTF8",
- "oe": "UTF8"}
- url = urlparse.urlunsplit(("http", "maps.google.com", "/maps",
- urllib.urlencode(params), ""))
- return url
+ """Return a URL for routing between two (lat, lng) at a datetime."""
+ params = {
+ "saddr": FormatLatLng(source),
+ "daddr": FormatLatLng(destination),
+ "time": dt.strftime("%I:%M%p"),
+ "date": dt.strftime("%Y-%m-%d"),
+ "dirflg": "r",
+ "ie": "UTF8",
+ "oe": "UTF8",
+ }
+ url = urlparse.urlunsplit(
+ ("http", "maps.google.com", "/maps", urllib.urlencode(params), "")
+ )
+ return url
def LatLngsToGoogleLink(source, destination):
- """Return a string "from:%s to:%s on %s " % (
- LatLngsToGoogleUrl(source, destination, dt),
- FormatLatLng(source), FormatLatLng(destination),
- dt.ctime())
+ """Return a string "from:%s to:%s on %s " % (
+ LatLngsToGoogleUrl(source, destination, dt),
+ FormatLatLng(source),
+ FormatLatLng(destination),
+ dt.ctime(),
+ )
def WriteOutput(title, locations, limit, f):
- """Write html to f for up to limit trips between locations.
+ """Write html to f for up to limit trips between locations.
Args:
title: String used in html title
@@ -128,7 +135,8 @@ def WriteOutput(title, locations, limit, f):
limit: maximum number of queries in the html
f: a file object
"""
- output_prefix = """
+ output_prefix = (
+ """
@@ -163,34 +171,41 @@ def WriteOutput(title, locations, limit, f):
If you find a problem be sure to save the URL. This file is generated randomly.
-""" % locals()
+"""
+ % locals()
+ )
- output_suffix = """
+ output_suffix = (
+ """
-""" % locals()
+"""
+ % locals()
+ )
- f.write(transitfeed.EncodeUnicode(output_prefix))
- for source, destination in zip(locations[0:limit], locations[1:limit + 1]):
- f.write(transitfeed.EncodeUnicode("%s\n" %
- LatLngsToGoogleLink(source, destination)))
- f.write(transitfeed.EncodeUnicode(output_suffix))
+ f.write(transitfeed.EncodeUnicode(output_prefix))
+ for source, destination in zip(locations[0:limit], locations[1 : limit + 1]):
+ f.write(
+ transitfeed.EncodeUnicode(
+ " %s\n" % LatLngsToGoogleLink(source, destination)
+ )
+ )
+ f.write(transitfeed.EncodeUnicode(output_suffix))
def ParentAndBaseName(path):
- """Given a path return only the parent name and file name as a string."""
- dirname, basename = os.path.split(path)
- dirname = dirname.rstrip(os.path.sep)
- if os.path.altsep:
- dirname = dirname.rstrip(os.path.altsep)
- _, parentname = os.path.split(dirname)
- return os.path.join(parentname, basename)
+ """Given a path return only the parent name and file name as a string."""
+ dirname, basename = os.path.split(path)
+ dirname = dirname.rstrip(os.path.sep)
+ if os.path.altsep:
+ dirname = dirname.rstrip(os.path.altsep)
+ _, parentname = os.path.split(dirname)
+ return os.path.join(parentname, basename)
def main():
- usage = \
-"""%prog [options]
+ usage = """%prog [options]
Create an HTML page of random URLs for the Google Maps transit trip
planner. The queries go between places near stops listed in a .
@@ -200,37 +215,46 @@ def main():
https://github.com/google/transitfeed/wiki/GoogleRandomQueries
"""
- parser = optparse.OptionParser(
- usage=usage,
- version="%prog "+transitfeed.__version__)
- parser.add_option("-l", "--limit", dest="limit", type="int",
- help="Maximum number of URLs to generate")
- parser.add_option("-o", "--output", dest="output", metavar="HTML_OUTPUT_PATH",
- help="write HTML output to HTML_OUTPUT_PATH")
- parser.set_defaults(output="google_random_queries.html", limit=50)
- (options, args) = parser.parse_args()
- if len(args) != 1:
- print(parser.format_help(), file=sys.stderr)
- print("\n\nYou must provide the path of a single feed\n\n", file=sys.stderr)
- sys.exit(2)
- feed_path = args[0]
-
- # ProblemReporter prints problems on console.
- loader = transitfeed.Loader(feed_path, problems=transitfeed.ProblemReporter(),
- load_stop_times=False)
- schedule = loader.Load()
- locations = GetRandomLocationsNearStops(schedule)
- random.shuffle(locations)
- agencies = ", ".join([a.agency_name for a in schedule.GetAgencyList()])
- title = "%s (%s)" % (agencies, ParentAndBaseName(feed_path))
-
- WriteOutput(title,
- locations,
- options.limit,
- open(options.output, "w"))
- print ("Load %s in your web browser. It contains more instructions." %
- options.output)
+    # Command line: one positional feed path, plus -l/--limit (default 50) and
+    # -o/--output (default google_random_queries.html).
+    parser = optparse.OptionParser(
+        usage=usage, version="%prog " + transitfeed.__version__
+    )
+    parser.add_option(
+        "-l",
+        "--limit",
+        dest="limit",
+        type="int",
+        help="Maximum number of URLs to generate",
+    )
+    parser.add_option(
+        "-o",
+        "--output",
+        dest="output",
+        metavar="HTML_OUTPUT_PATH",
+        help="write HTML output to HTML_OUTPUT_PATH",
+    )
+    parser.set_defaults(output="google_random_queries.html", limit=50)
+    (options, args) = parser.parse_args()
+    if len(args) != 1:
+        print(parser.format_help(), file=sys.stderr)
+        print("\n\nYou must provide the path of a single feed\n\n", file=sys.stderr)
+        sys.exit(2)
+    feed_path = args[0]
+
+    # ProblemReporter prints problems on console.
+    loader = transitfeed.Loader(
+        feed_path, problems=transitfeed.ProblemReporter(), load_stop_times=False
+    )
+    schedule = loader.Load()
+    locations = GetRandomLocationsNearStops(schedule)
+    # Shuffle so that consecutive entries form random source/destination pairs.
+    random.shuffle(locations)
+    agencies = ", ".join([a.agency_name for a in schedule.GetAgencyList()])
+    title = "%s (%s)" % (agencies, ParentAndBaseName(feed_path))
+
+    # Close (and therefore flush) the report before telling the user to open
+    # it; the bare open() used previously left the handle to the garbage
+    # collector.
+    with open(options.output, "w") as output_file:
+        WriteOutput(title, locations, options.limit, output_file)
+    print(
+        "Load %s in your web browser. It contains more instructions." % options.output
+    )
if __name__ == "__main__":
- main()
+ main()
diff --git a/examples/shuttle_from_xmlfeed.py b/examples/shuttle_from_xmlfeed.py
index da470a66..b88f3528 100755
--- a/examples/shuttle_from_xmlfeed.py
+++ b/examples/shuttle_from_xmlfeed.py
@@ -27,111 +27,131 @@
import urllib
try:
- import xml.etree.ElementTree as ET # python 2.5
+ import xml.etree.ElementTree as ET # python 2.5
except ImportError as e:
- import elementtree.ElementTree as ET # older pythons
+ import elementtree.ElementTree as ET # older pythons
class NoUnusedStopExceptionProblemReporter(transitfeed.ProblemReporter):
- """The company shuttle database has a few unused stops for reasons unrelated
+ """The company shuttle database has a few unused stops for reasons unrelated
to this script. Ignore them.
"""
- def __init__(self):
- accumulator = transitfeed.ExceptionProblemAccumulator()
- transitfeed.ProblemReporter.__init__(self, accumulator)
+ def __init__(self):
+ accumulator = transitfeed.ExceptionProblemAccumulator()
+ transitfeed.ProblemReporter.__init__(self, accumulator)
+
+ def UnusedStop(self, stop_id, stop_name):
+ pass
- def UnusedStop(self, stop_id, stop_name):
- pass
def SaveFeed(input, output):
- tree = ET.parse(urllib.urlopen(input))
-
- schedule = transitfeed.Schedule()
- service_period = schedule.GetDefaultServicePeriod()
- service_period.SetWeekdayService()
- service_period.SetStartDate('20070314')
- service_period.SetEndDate('20071231')
- # Holidays for 2007
- service_period.SetDateHasService('20070528', has_service=False)
- service_period.SetDateHasService('20070704', has_service=False)
- service_period.SetDateHasService('20070903', has_service=False)
- service_period.SetDateHasService('20071122', has_service=False)
- service_period.SetDateHasService('20071123', has_service=False)
- service_period.SetDateHasService('20071224', has_service=False)
- service_period.SetDateHasService('20071225', has_service=False)
- service_period.SetDateHasService('20071226', has_service=False)
- service_period.SetDateHasService('20071231', has_service=False)
-
- stops = {} # Map from xml stop id to python Stop object
- agency = schedule.NewDefaultAgency(name='GBus', url='http://shuttle/',
- timezone='America/Los_Angeles')
-
- for xml_stop in tree.getiterator('stop'):
- stop = schedule.AddStop(lat=float(xml_stop.attrib['lat']),
- lng=float(xml_stop.attrib['lng']),
- name=xml_stop.attrib['name'])
- stops[xml_stop.attrib['id']] = stop
-
- for xml_shuttleGroup in tree.getiterator('shuttleGroup'):
- if xml_shuttleGroup.attrib['name'] == 'Test':
- continue
- r = schedule.AddRoute(short_name="",
- long_name=xml_shuttleGroup.attrib['name'], route_type='Bus')
- for xml_route in xml_shuttleGroup.getiterator('route'):
- t = r.AddTrip(schedule=schedule, headsign=xml_route.attrib['name'],
- trip_id=xml_route.attrib['id'])
- trip_stops = [] # Build a list of (time, Stop) tuples
- for xml_schedule in xml_route.getiterator('schedule'):
- trip_stops.append( (int(xml_schedule.attrib['time']) / 1000,
- stops[xml_schedule.attrib['stopId']]) )
- trip_stops.sort() # Sort by time
- for (time, stop) in trip_stops:
- t.AddStopTime(stop=stop, arrival_secs=time, departure_secs=time)
-
- schedule.Validate(problems=NoUnusedStopExceptionProblemReporter())
- schedule.WriteGoogleTransitFeed(output)
+    """Convert the shuttle XML at `input` into a feed written to `output`.
+
+    Args:
+      input: path or URL handed to urllib.urlopen().
+      output: path handed to Schedule.WriteGoogleTransitFeed().
+
+    Every <stop> element becomes a Stop, every <shuttleGroup> (except the one
+    named "Test") a bus Route, every <route> inside it a Trip and every
+    <schedule> element a stop time. The schedule is validated with
+    NoUnusedStopExceptionProblemReporter before it is written.
+    """
+    tree = ET.parse(urllib.urlopen(input))
+
+    schedule = transitfeed.Schedule()
+    service_period = schedule.GetDefaultServicePeriod()
+    service_period.SetWeekdayService()
+    service_period.SetStartDate("20070314")
+    service_period.SetEndDate("20071231")
+    # Holidays for 2007
+    service_period.SetDateHasService("20070528", has_service=False)
+    service_period.SetDateHasService("20070704", has_service=False)
+    service_period.SetDateHasService("20070903", has_service=False)
+    service_period.SetDateHasService("20071122", has_service=False)
+    service_period.SetDateHasService("20071123", has_service=False)
+    service_period.SetDateHasService("20071224", has_service=False)
+    service_period.SetDateHasService("20071225", has_service=False)
+    service_period.SetDateHasService("20071226", has_service=False)
+    service_period.SetDateHasService("20071231", has_service=False)
+
+    stops = {}  # Map from xml stop id to python Stop object
+    # Called for its side effect on the schedule; the returned agency is unused.
+    schedule.NewDefaultAgency(
+        name="GBus", url="http://shuttle/", timezone="America/Los_Angeles"
+    )
+
+    for xml_stop in tree.getiterator("stop"):
+        stop = schedule.AddStop(
+            lat=float(xml_stop.attrib["lat"]),
+            lng=float(xml_stop.attrib["lng"]),
+            name=xml_stop.attrib["name"],
+        )
+        stops[xml_stop.attrib["id"]] = stop
+
+    for xml_shuttleGroup in tree.getiterator("shuttleGroup"):
+        if xml_shuttleGroup.attrib["name"] == "Test":
+            continue
+        r = schedule.AddRoute(
+            short_name="", long_name=xml_shuttleGroup.attrib["name"], route_type="Bus"
+        )
+        for xml_route in xml_shuttleGroup.getiterator("route"):
+            t = r.AddTrip(
+                schedule=schedule,
+                headsign=xml_route.attrib["name"],
+                trip_id=xml_route.attrib["id"],
+            )
+            trip_stops = []  # Build a list of (time, Stop) tuples
+            for xml_schedule in xml_route.getiterator("schedule"):
+                trip_stops.append(
+                    (
+                        # Floor division keeps the value an int under true
+                        # division as well (presumably milliseconds to
+                        # seconds -- confirm against the XML producer).
+                        int(xml_schedule.attrib["time"]) // 1000,
+                        stops[xml_schedule.attrib["stopId"]],
+                    )
+                )
+            # Sort on the time only: a plain tuple sort falls back to comparing
+            # the Stop objects whenever two times are equal. The sort is
+            # stable, so ties keep their order from the XML.
+            trip_stops.sort(key=lambda pair: pair[0])
+            for (secs, stop) in trip_stops:
+                t.AddStopTime(stop=stop, arrival_secs=secs, departure_secs=secs)
+
+    schedule.Validate(problems=NoUnusedStopExceptionProblemReporter())
+    schedule.WriteGoogleTransitFeed(output)
def main():
- parser = OptionParser()
- parser.add_option('--input', dest='input',
- help='Path or URL of input')
- parser.add_option('--output', dest='output',
- help='Path of output file. Should end in .zip and if it '
- 'contains the substring YYYYMMDD it will be replaced with '
- 'today\'s date. It is impossible to include the literal '
- 'string YYYYYMMDD in the path of the output file.')
- parser.add_option('--execute', dest='execute',
- help='Commands to run to copy the output. %(path)s is '
- 'replaced with full path of the output and %(name)s is '
- 'replaced with name part of the path. Try '
- 'scp %(path)s myhost:www/%(name)s',
- action='append')
- parser.set_defaults(input=None, output=None, execute=[])
- (options, args) = parser.parse_args()
-
- today = datetime.date.today().strftime('%Y%m%d')
- options.output = re.sub(r'YYYYMMDD', today, options.output)
- (_, name) = os.path.split(options.output)
- path = options.output
-
- SaveFeed(options.input, options.output)
-
- for command in options.execute:
- import subprocess
- def check_call(cmd):
- """Convenience function that is in the docs for subprocess but not
+    """Build the shuttle feed, then run each --execute command on the result.
+
+    --input and --output are required. Every "YYYYMMDD" in the output path is
+    replaced with today's date before the feed is written.
+    """
+    import subprocess
+
+    def check_call(cmd):
+        """Run cmd through the shell and raise Exception unless it exits 0."""
+        # shell=True is deliberate: --execute carries a full shell command line
+        # supplied by whoever launches this script.
+        retcode = subprocess.call(cmd, shell=True)
+        if retcode < 0:
+            raise Exception(
+                "Child '%s' was terminated by signal %d" % (cmd, -retcode)
+            )
+        elif retcode != 0:
+            raise Exception("Child '%s' returned %d" % (cmd, retcode))
+
+    parser = OptionParser()
+    parser.add_option("--input", dest="input", help="Path or URL of input")
+    parser.add_option(
+        "--output",
+        dest="output",
+        help="Path of output file. Should end in .zip and if it "
+        "contains the substring YYYYMMDD it will be replaced with "
+        "today's date. It is impossible to include the literal "
+        "string YYYYMMDD in the path of the output file.",
+    )
+    parser.add_option(
+        "--execute",
+        dest="execute",
+        help="Commands to run to copy the output. %(path)s is "
+        "replaced with full path of the output and %(name)s is "
+        "replaced with name part of the path. Try "
+        "scp %(path)s myhost:www/%(name)s",
+        action="append",
+    )
+    parser.set_defaults(input=None, output=None, execute=[])
+    (options, args) = parser.parse_args()
+    # Both default to None; without this check re.sub() below fails with a
+    # TypeError instead of a usage message.
+    if not options.input or not options.output:
+        parser.error("--input and --output are both required")
+
+    today = datetime.date.today().strftime("%Y%m%d")
+    options.output = re.sub(r"YYYYMMDD", today, options.output)
+    (_, name) = os.path.split(options.output)
+    path = options.output
+
+    SaveFeed(options.input, options.output)
+
+    for command in options.execute:
+        # The locals `path` and `name` fill the %(path)s and %(name)s
+        # placeholders documented for --execute.
+        check_call(command % locals())
+
+
+if __name__ == "__main__":
+ main()
diff --git a/examples/small_builder.py b/examples/small_builder.py
index d245594f..d78806d0 100755
--- a/examples/small_builder.py
+++ b/examples/small_builder.py
@@ -8,32 +8,33 @@
parser = OptionParser()
-parser.add_option('--output', dest='output',
- help='Path of output file. Should end in .zip')
-parser.set_defaults(output='google_transit.zip')
+parser.add_option(
+ "--output", dest="output", help="Path of output file. Should end in .zip"
+)
+parser.set_defaults(output="google_transit.zip")
(options, args) = parser.parse_args()
schedule = transitfeed.Schedule()
-schedule.AddAgency("Fly Agency", "http://iflyagency.com",
- "America/Los_Angeles")
+schedule.AddAgency("Fly Agency", "http://iflyagency.com", "America/Los_Angeles")
service_period = schedule.GetDefaultServicePeriod()
service_period.SetWeekdayService(True)
-service_period.SetDateHasService('20070704')
+service_period.SetDateHasService("20070704")
stop1 = schedule.AddStop(lng=-122, lat=37.2, name="Suburbia")
stop2 = schedule.AddStop(lng=-122.001, lat=37.201, name="Civic Center")
-route = schedule.AddRoute(short_name="22", long_name="Civic Center Express",
- route_type="Bus")
+route = schedule.AddRoute(
+ short_name="22", long_name="Civic Center Express", route_type="Bus"
+)
trip = route.AddTrip(schedule, headsign="To Downtown")
-trip.AddStopTime(stop1, stop_time='09:00:00')
-trip.AddStopTime(stop2, stop_time='09:15:00')
+trip.AddStopTime(stop1, stop_time="09:00:00")
+trip.AddStopTime(stop2, stop_time="09:15:00")
trip = route.AddTrip(schedule, headsign="To Suburbia")
-trip.AddStopTime(stop1, stop_time='17:30:00')
-trip.AddStopTime(stop2, stop_time='17:45:00')
+trip.AddStopTime(stop1, stop_time="17:30:00")
+trip.AddStopTime(stop2, stop_time="17:45:00")
schedule.Validate()
schedule.WriteGoogleTransitFeed(options.output)
diff --git a/examples/table.py b/examples/table.py
index ecd69376..81c16cbf 100755
--- a/examples/table.py
+++ b/examples/table.py
@@ -46,24 +46,30 @@
# [time_at_1, time_at_2, ...] # times for trip 2
# ... ]
def AddRouteToSchedule(schedule, table):
- if len(table) >= 2:
- r = schedule.AddRoute(short_name=table[0][0], long_name=table[0][1], route_type='Bus')
- for trip in table[2:]:
- if len(trip) > len(table[1]):
- print("ignoring %s" % trip[len(table[1]):])
- trip = trip[0:len(table[1])]
- t = r.AddTrip(schedule, headsign='My headsign')
- trip_stops = [] # Build a list of (time, stopname) tuples
- for i in range(0, len(trip)):
- if re.search(r'\S', trip[i]):
- trip_stops.append( (transitfeed.TimeToSecondsSinceMidnight(trip[i]), table[1][i]) )
- trip_stops.sort() # Sort by time
- for (time, stopname) in trip_stops:
- t.AddStopTime(stop=stops[stopname.lower()], arrival_secs=time,
- departure_secs=time)
+    # table[0] holds the route short and long name, table[1] the stop names and
+    # every later row the times of one trip, in the same column order as
+    # table[1]. Tables with fewer than two rows are ignored.
+    if len(table) >= 2:
+        r = schedule.AddRoute(
+            short_name=table[0][0], long_name=table[0][1], route_type="Bus"
+        )
+        stop_names = table[1]
+        for trip in table[2:]:
+            if len(trip) > len(stop_names):
+                print("ignoring %s" % trip[len(stop_names) :])
+                trip = trip[0 : len(stop_names)]
+            t = r.AddTrip(schedule, headsign="My headsign")
+            trip_stops = []  # Build a list of (time, stopname) tuples
+            for time_str, stopname in zip(trip, stop_names):
+                # TransposeTable pads short rows with None; re.search() raises
+                # TypeError on None, so treat such cells as "no stop here".
+                if time_str and re.search(r"\S", time_str):
+                    trip_stops.append(
+                        (transitfeed.TimeToSecondsSinceMidnight(time_str), stopname)
+                    )
+            trip_stops.sort()  # Sort by time
+            for (time, stopname) in trip_stops:
+                t.AddStopTime(
+                    stop=stops[stopname.lower()], arrival_secs=time, departure_secs=time
+                )
+
def TransposeTable(table):
- """Transpose a list of lists, using None to extend all input lists to the
+ """Transpose a list of lists, using None to extend all input lists to the
same length.
For example:
@@ -77,101 +83,105 @@ def TransposeTable(table):
[13, None, 33],
[None, None, 34]]
"""
- transposed = []
- rows = len(table)
- cols = max(len(row) for row in table)
- for x in range(cols):
- transposed.append([])
- for y in range(rows):
- if x < len(table[y]):
- transposed[x].append(table[y][x])
- else:
- transposed[x].append(None)
- return transposed
+    if not table:
+        # max() over an empty sequence raises ValueError; an empty table
+        # transposes to an empty table.
+        return []
+    cols = max(len(row) for row in table)
+    # Column x of every row becomes row x of the result; rows shorter than
+    # the widest one contribute None.
+    return [
+        [row[x] if x < len(row) else None for row in table]
+        for x in range(cols)
+    ]
+
def ProcessOptions(schedule, table):
- service_period = schedule.GetDefaultServicePeriod()
- agency_name, agency_url, agency_timezone = (None, None, None)
-
- for row in table[1:]:
- command = row[0].lower()
- if command == 'weekday':
- service_period.SetWeekdayService()
- elif command == 'start_date':
- service_period.SetStartDate(row[1])
- elif command == 'end_date':
- service_period.SetEndDate(row[1])
- elif command == 'add_date':
- service_period.SetDateHasService(date=row[1])
- elif command == 'remove_date':
- service_period.SetDateHasService(date=row[1], has_service=False)
- elif command == 'agency_name':
- agency_name = row[1]
- elif command == 'agency_url':
- agency_url = row[1]
- elif command == 'agency_timezone':
- agency_timezone = row[1]
-
- if not (agency_name and agency_url and agency_timezone):
- print("You must provide agency information")
-
- schedule.NewDefaultAgency(agency_name=agency_name, agency_url=agency_url,
- agency_timezone=agency_timezone)
+ service_period = schedule.GetDefaultServicePeriod()
+ agency_name, agency_url, agency_timezone = (None, None, None)
+
+ for row in table[1:]:
+ command = row[0].lower()
+ if command == "weekday":
+ service_period.SetWeekdayService()
+ elif command == "start_date":
+ service_period.SetStartDate(row[1])
+ elif command == "end_date":
+ service_period.SetEndDate(row[1])
+ elif command == "add_date":
+ service_period.SetDateHasService(date=row[1])
+ elif command == "remove_date":
+ service_period.SetDateHasService(date=row[1], has_service=False)
+ elif command == "agency_name":
+ agency_name = row[1]
+ elif command == "agency_url":
+ agency_url = row[1]
+ elif command == "agency_timezone":
+ agency_timezone = row[1]
+
+ if not (agency_name and agency_url and agency_timezone):
+ print("You must provide agency information")
+
+ schedule.NewDefaultAgency(
+ agency_name=agency_name, agency_url=agency_url, agency_timezone=agency_timezone
+ )
def AddStops(schedule, table):
- for name, lat_str, lng_str in table[1:]:
- stop = schedule.AddStop(lat=float(lat_str), lng=float(lng_str), name=name)
- stops[name.lower()] = stop
+ for name, lat_str, lng_str in table[1:]:
+ stop = schedule.AddStop(lat=float(lat_str), lng=float(lng_str), name=name)
+ stops[name.lower()] = stop
def ProcessTable(schedule, table):
- if table[0][0].lower() == 'options':
- ProcessOptions(schedule, table)
- elif table[0][0].lower() == 'stops':
- AddStops(schedule, table)
- else:
- transposed = [table[0]] # Keep route_short_name and route_long_name on first row
-
- # Transpose rest of table. Input contains the stop names in table[x][0], x
- # >= 1 with trips found in columns, so we need to transpose table[1:].
- # As a diagram Transpose from
- # [['stop 1', '10:00', '11:00', '12:00'],
- # ['stop 2', '10:10', '11:10', '12:10'],
- # ['stop 3', '10:20', '11:20', '12:20']]
- # to
- # [['stop 1', 'stop 2', 'stop 3'],
- # ['10:00', '10:10', '10:20'],
- # ['11:00', '11:11', '11:20'],
- # ['12:00', '12:12', '12:20']]
- transposed.extend(TransposeTable(table[1:]))
- AddRouteToSchedule(schedule, transposed)
+ if table[0][0].lower() == "options":
+ ProcessOptions(schedule, table)
+ elif table[0][0].lower() == "stops":
+ AddStops(schedule, table)
+ else:
+ transposed = [
+ table[0]
+ ] # Keep route_short_name and route_long_name on first row
+
+ # Transpose rest of table. Input contains the stop names in table[x][0], x
+ # >= 1 with trips found in columns, so we need to transpose table[1:].
+ # As a diagram Transpose from
+ # [['stop 1', '10:00', '11:00', '12:00'],
+ # ['stop 2', '10:10', '11:10', '12:10'],
+ # ['stop 3', '10:20', '11:20', '12:20']]
+ # to
+ # [['stop 1', 'stop 2', 'stop 3'],
+ # ['10:00', '10:10', '10:20'],
+ # ['11:00', '11:11', '11:20'],
+ # ['12:00', '12:12', '12:20']]
+ transposed.extend(TransposeTable(table[1:]))
+ AddRouteToSchedule(schedule, transposed)
def main():
- parser = OptionParser()
- parser.add_option('--input', dest='input',
- help='Path of input file')
- parser.add_option('--output', dest='output',
- help='Path of output file, should end in .zip')
- parser.set_defaults(output='feed.zip')
- (options, args) = parser.parse_args()
-
- schedule = transitfeed.Schedule()
-
- table = []
- for line in open(options.input):
- line = line.rstrip()
- if not line:
- ProcessTable(schedule, table)
- table = []
- else:
- table.append(line.split('\t'))
+    # Reads tab-separated tables from --input, where blank lines separate the
+    # tables, hands each one to ProcessTable and writes the feed to --output.
+    parser = OptionParser()
+    parser.add_option("--input", dest="input", help="Path of input file")
+    parser.add_option(
+        "--output", dest="output", help="Path of output file, should end in .zip"
+    )
+    parser.set_defaults(output="feed.zip")
+    (options, args) = parser.parse_args()
+    if not options.input:
+        # open(None) would otherwise fail with a TypeError.
+        parser.error("--input is required")
+
+    schedule = transitfeed.Schedule()
+
+    table = []
+    with open(options.input) as input_file:
+        for line in input_file:
+            line = line.rstrip()
+            if not line:
+                # Consecutive blank lines leave nothing to process, and
+                # ProcessTable indexes table[0][0].
+                if table:
+                    ProcessTable(schedule, table)
+                table = []
+            else:
+                table.append(line.split("\t"))
- ProcessTable(schedule, table)
+
+    # The last table has no blank line after it; skip it when the file ended
+    # on a blank line or was empty.
+    if table:
+        ProcessTable(schedule, table)
- schedule.WriteGoogleTransitFeed(options.output)
+
+    schedule.WriteGoogleTransitFeed(options.output)
-if __name__ == '__main__':
- main()
+if __name__ == "__main__":
+ main()
diff --git a/extensions/__init__.py b/extensions/__init__.py
index b3931f71..c57c3101 100644
--- a/extensions/__init__.py
+++ b/extensions/__init__.py
@@ -16,4 +16,4 @@
# This file marks the base folder of the FeedValiator extensions as a Python
# module. This module, however, is never used directly. So no imports are needed
-# here.
\ No newline at end of file
+# here.
diff --git a/extensions/googletransit/agency.py b/extensions/googletransit/agency.py
index e27be13b..5708c327 100644
--- a/extensions/googletransit/agency.py
+++ b/extensions/googletransit/agency.py
@@ -19,15 +19,17 @@
from . import extension_util
import transitfeed
+
class Agency(transitfeed.Agency):
- """Extension of transitfeed.Agency:
+ """Extension of transitfeed.Agency:
- Overriding ValidateAgencyLang() for supporting BCP-47 agency_lang codes.
"""
- # Overrides transitfeed.Agency.ValidateAgencyLang() and validates agency_lang
- # using the new pybcp47 module via extension_util.py
- def ValidateAgencyLang(self, problems):
- if not self.agency_lang:
- return False
- return not extension_util.ValidateLanguageCode(
- self.agency_lang, 'agency_lang', problems)
+ # Overrides transitfeed.Agency.ValidateAgencyLang() and validates agency_lang
+ # using the new pybcp47 module via extension_util.py
+ def ValidateAgencyLang(self, problems):
+ if not self.agency_lang:
+ return False
+ return not extension_util.ValidateLanguageCode(
+ self.agency_lang, "agency_lang", problems
+ )
diff --git a/extensions/googletransit/extension_util.py b/extensions/googletransit/extension_util.py
index 7f93bbc4..6149e318 100644
--- a/extensions/googletransit/extension_util.py
+++ b/extensions/googletransit/extension_util.py
@@ -21,17 +21,19 @@
parser = Bcp47LanguageParser()
+
def IsValidLanguageCode(lang):
- """
+ """
Checks the validity of a language code value:
- checks whether the code, as lower case, is well formed and valid BCP47
using the pybcp47 module
"""
- bcp47_obj = parser.ParseLanguage(str(lang.lower()))
- return bcp47_obj.IsWellformed() and bcp47_obj.IsValid()
+ bcp47_obj = parser.ParseLanguage(str(lang.lower()))
+ return bcp47_obj.IsWellformed() and bcp47_obj.IsValid()
+
def ValidateLanguageCode(lang, column_name=None, problems=None):
- """
+ """
Validates a non-required language code value using the pybcp47 module:
- if invalid adds InvalidValue error (if problems accumulator is provided)
- distinguishes between 'not well-formed' and 'not valid' and adds error
@@ -41,19 +43,25 @@ def ValidateLanguageCode(lang, column_name=None, problems=None):
- returns true if the language is valid, false if not well-formed or
invalid.
"""
- if util.IsEmpty(lang):
+ if util.IsEmpty(lang):
+ return True
+ bcp47_obj = parser.ParseLanguage(str(lang.lower()))
+ if not bcp47_obj.wellformed:
+ if problems:
+ problems.InvalidValue(
+ column_name,
+ lang,
+ 'language code "%s" is not well-formed' % lang,
+ type=problems_class.TYPE_ERROR,
+ )
+ return False
+ if not bcp47_obj.valid:
+ if problems:
+ problems.InvalidValue(
+ column_name,
+ lang,
+ 'language code "%s" is not valid, parses as: %s' % (lang, bcp47_obj),
+ type=problems_class.TYPE_WARNING,
+ )
+ return False
return True
- bcp47_obj = parser.ParseLanguage(str(lang.lower()))
- if not bcp47_obj.wellformed:
- if problems:
- problems.InvalidValue(column_name, lang,
- 'language code "%s" is not well-formed' %
- lang, type=problems_class.TYPE_ERROR)
- return False
- if not bcp47_obj.valid:
- if problems:
- problems.InvalidValue(column_name, lang,
- 'language code "%s" is not valid, parses as: %s' %
- (lang, bcp47_obj), type=problems_class.TYPE_WARNING)
- return False
- return True
diff --git a/extensions/googletransit/fareattribute.py b/extensions/googletransit/fareattribute.py
index 397f3baf..a2d1cda1 100644
--- a/extensions/googletransit/fareattribute.py
+++ b/extensions/googletransit/fareattribute.py
@@ -16,27 +16,27 @@
import transitfeed
+
class FareAttribute(transitfeed.FareAttribute):
- """Extension of transitfeed.FareAttribute:
+ """Extension of transitfeed.FareAttribute:
- Adding field 'agency_id' and ValidateAgencyId() function.
- Overriding ValidateAfterAdd() in order to call ValidateAgencyId().
- See open proposal "add agency_id column to fare_attributes.txt" at
http://groups.google.com/group/gtfs-changes/browse_frm/thread/4e74c23bb1f80480
"""
- _FIELD_NAMES = transitfeed.FareAttribute._FIELD_NAMES + [ 'agency_id' ]
+ _FIELD_NAMES = transitfeed.FareAttribute._FIELD_NAMES + ["agency_id"]
- def ValidateAgencyId(self, problems):
- agencies = self._schedule.GetAgencyList()
- for agency in agencies:
- if agency.agency_id == self.agency_id:
- return
- if len(agencies) > 1 or self.agency_id is not None:
- # If there is only one agency and Fare.agencyid is empty or not present
- # then it isn't an error
- problems.InvalidAgencyID('agency_id', self.agency_id,
- 'fare', self.fare_id)
+ def ValidateAgencyId(self, problems):
+ agencies = self._schedule.GetAgencyList()
+ for agency in agencies:
+ if agency.agency_id == self.agency_id:
+ return
+ if len(agencies) > 1 or self.agency_id is not None:
+ # If there is only one agency and Fare.agencyid is empty or not present
+ # then it isn't an error
+ problems.InvalidAgencyID("agency_id", self.agency_id, "fare", self.fare_id)
- def ValidateAfterAdd(self, problems):
- super(FareAttribute, self).ValidateAfterAdd(problems)
- self.ValidateAgencyId(problems)
+ def ValidateAfterAdd(self, problems):
+ super(FareAttribute, self).ValidateAfterAdd(problems)
+ self.ValidateAgencyId(problems)
diff --git a/extensions/googletransit/pybcp47/bcp47languageparser.py b/extensions/googletransit/pybcp47/bcp47languageparser.py
index 289dff21..f7db6f38 100644
--- a/extensions/googletransit/pybcp47/bcp47languageparser.py
+++ b/extensions/googletransit/pybcp47/bcp47languageparser.py
@@ -21,359 +21,413 @@
import string
from functools import reduce
+
class FileParseError(Exception):
- """Exception raised for errors in the subtag registry file. """
+ """Exception raised for errors in the subtag registry file. """
- def __init__(self, line_number, msg):
- self.msg = ("Error at line %s in the subtag registry file: %s" %
- (line_number, msg))
+ def __init__(self, line_number, msg):
+ self.msg = "Error at line %s in the subtag registry file: %s" % (
+ line_number,
+ msg,
+ )
- def __str__(self):
- return repr(self.msg)
+ def __str__(self):
+ return repr(self.msg)
class Bcp47LanguageParser(object):
- """Validates language tags to be well-formed and registered BCP-47 codes. """
-
- def __init__(self):
- # Dictionaries for mapping tags and subtags to their descriptions.
- self.languages = {} # language tags, e.g. 'de'
- self.extlangs = {} # extlang subtags, e.g. 'aao'
- self.scripts = {} # script subtagss, e.g. 'Latn'
- self.regions = {} # region subtags, e.g. 'CA'
- self.variants = {} # variant subtags, e.g. '1901'
- self.grandfathereds = {} # grandfathered tags, e.g. 'sgn-CH-DE'
- self.redundants = {} # redundant subtags, e.g. 'zh-Hant-CN'
- self._ReadLanguageSubtagRegistryFile()
-
- def _GetLinesFromLanguageSubtagRegistryFile(self):
- # Read and yield the registry file from this package. This should be a most
- # recent copy of http://www.iana.org/assignments/language-subtag-registry
- # Formatting rules of this file can be found at page 20 of
- # http://tools.ietf.org/html/rfc5646
- file_name = 'language-subtag-registry.txt'
- # Read Unicode string from the UTF-8 bytes in the file.
- file_string_utf8 = resource_string(__name__, file_name).decode('utf-8')
- # Yield the lines from the file. Handle "folding" indicated by two leading
- # whitespaces.
- accumulated_line_parts = None
- line_number = 0
- for line in file_string_utf8.splitlines():
- line_number += 1
- if line[:2] == ' ':
- accumulated_line_parts.append(line.strip())
- else:
+ """Validates language tags to be well-formed and registered BCP-47 codes. """
+
+ def __init__(self):
+ # Dictionaries for mapping tags and subtags to their descriptions.
+ self.languages = {} # language tags, e.g. 'de'
+ self.extlangs = {} # extlang subtags, e.g. 'aao'
+ self.scripts = {} # script subtagss, e.g. 'Latn'
+ self.regions = {} # region subtags, e.g. 'CA'
+ self.variants = {} # variant subtags, e.g. '1901'
+ self.grandfathereds = {} # grandfathered tags, e.g. 'sgn-CH-DE'
+ self.redundants = {} # redundant subtags, e.g. 'zh-Hant-CN'
+ self._ReadLanguageSubtagRegistryFile()
+
+ def _GetLinesFromLanguageSubtagRegistryFile(self):
+ # Read and yield the registry file from this package. This should be a most
+ # recent copy of http://www.iana.org/assignments/language-subtag-registry
+ # Formatting rules of this file can be found at page 20 of
+ # http://tools.ietf.org/html/rfc5646
+ file_name = "language-subtag-registry.txt"
+ # Read Unicode string from the UTF-8 bytes in the file.
+ file_string_utf8 = resource_string(__name__, file_name).decode("utf-8")
+ # Yield the lines from the file. Handle "folding" indicated by two leading
+ # whitespaces.
+ accumulated_line_parts = None
+ line_number = 0
+ for line in file_string_utf8.splitlines():
+ line_number += 1
+ if line[:2] == " ":
+ accumulated_line_parts.append(line.strip())
+ else:
+ if accumulated_line_parts:
+ yield (" ".join(accumulated_line_parts), line_number)
+ accumulated_line_parts = [line.strip()]
+ else:
+ accumulated_line_parts = [line.strip()]
if accumulated_line_parts:
- yield (' '.join(accumulated_line_parts), line_number)
- accumulated_line_parts = [line.strip()]
- else:
- accumulated_line_parts = [line.strip()]
- if accumulated_line_parts:
- yield (' '.join(accumulated_line_parts), line_number)
-
- def _ReadLanguageSubtagRegistryFile(self):
- # Load the entries from the registry file in this package.
- line_iterator = self._GetLinesFromLanguageSubtagRegistryFile()
- # Read the header lines with the File-Date record.
- first_line, line_number = next(line_iterator)
- if not first_line[:11] == 'File-Date: ':
- raise FileParseError(line_number,
- "Invalid first line '%s'! Must be a File-Date record." % (first_line))
- second_line, line_number = next(line_iterator)
- if not second_line == '%%':
- raise FileParseError(line_number,
- "Invalid first record '%s'! Must start with '%%%%'." % (second_line))
- # Read the (Sub)tag records.
- current_type = None
- current_tag = None
- current_descriptions = []
- current_prefixes = []
- for line, line_number in line_iterator:
- if line == '%%':
- self._AddSubtagFromRegistryFile(current_type, current_tag,
- current_descriptions, current_prefixes,
- line_number)
+ yield (" ".join(accumulated_line_parts), line_number)
+
+ def _ReadLanguageSubtagRegistryFile(self):
+ # Load the entries from the registry file in this package.
+ line_iterator = self._GetLinesFromLanguageSubtagRegistryFile()
+ # Read the header lines with the File-Date record.
+ first_line, line_number = next(line_iterator)
+ if not first_line[:11] == "File-Date: ":
+ raise FileParseError(
+ line_number,
+ "Invalid first line '%s'! Must be a File-Date record." % (first_line),
+ )
+ second_line, line_number = next(line_iterator)
+ if not second_line == "%%":
+ raise FileParseError(
+ line_number,
+ "Invalid first record '%s'! Must start with '%%%%'." % (second_line),
+ )
+ # Read the (Sub)tag records.
current_type = None
current_tag = None
current_descriptions = []
current_prefixes = []
- continue
-
- line_parts = line.split(': ')
- if len(line_parts) > 2 and line_parts[0] == 'Comments':
- # Silently ignore comments. They may contain multiple ':'.
- continue
- if len(line_parts) != 2:
- raise FileParseError(line_number,
- "Invalid line %s in registry file!" % (line))
-
- line_key, line_value = line_parts
- if line_key == 'Type':
- if current_type:
- raise FileParseError(line_number,
- "Duplicate Type for (Sub)tag %s" % (current_tag))
- current_type = line_value.lower()
- elif line_key == 'Subtag' or line_key == 'Tag':
- if current_tag:
- raise FileParseError(line_number,
- "Duplicate (Sub)tag %s" % (current_tag))
- current_tag = line_value.lower()
- elif line_key == 'Description':
- current_descriptions.append(line_value)
- elif line_key == 'Prefix':
- current_prefixes.append(line_value)
- elif line_key not in ['Added', 'Deprecated', 'Preferred-Value',
- 'Suppress-Script', 'Macrolanguage', 'Scope',
- 'Comments']:
- raise FileParseError(line_number,
- "Invalid registry field %s with value %s" %
- (line_key, line_value))
-
- # The last record does not get terminated by the '%%' preceding the next
- # record. So we have to add it after the 'for' loop.
- self._AddSubtagFromRegistryFile(current_type, current_tag,
- current_descriptions, current_prefixes,
- line_number)
-
- def IntStr26ToInt(self, int_str):
- return reduce(lambda x, y: 26 * x + y, map(string.lowercase.index, int_str))
-
- def IntToIntStr26(self, int_value, int_str=''):
- if int_value == 0:
- return int_str
- return self.IntToIntStr26(
- int_value/26, string.lowercase[int_value%26] + int_str)
-
- def _AddSubtagFromRegistryFile(self, current_type, current_tag,
- current_descriptions, current_prefixes,
- line_number):
- if not current_descriptions:
- raise FileParseError(line_number,
- "Missing Description(s) for (Sub)tag %s" % (current_tag))
- current_description = ', '.join(current_descriptions)
-
- if not current_tag:
- raise FileParseError(line_number,
- "Missing (Sub)tag for Type %s and Description(s) %s" %
- (current_type, current_description))
-
- if '..' in current_tag:
- # Decompose ranges (private use range) and add them recursively.
- range_parts = current_tag.split('..')
- if len(range_parts) != 2:
- raise FileParseError(line_number,
- "(Sub)tag ranges must consist of two tags being separated by '..'! "
- "The range '%s' of Type '%s' is invalid." %
- (current_tag, current_type))
- start_str, end_str = range_parts
- if len(start_str) != len(end_str):
- raise FileParseError(line_number,
- "The start and end tags in ranges must have the same length! "
- "The tags '%s' and '%s' are different!" % (start_str, end_str))
- for i in range(
- self.IntStr26ToInt(start_str), self.IntStr26ToInt(end_str) + 1):
- range_tag = self.IntToIntStr26(i)
- self._AddSubtagFromRegistryFile(current_type, range_tag,
- current_descriptions, current_prefixes,
- line_number)
- # Range tags are added as recursion so we have to return afterwards.
- return
-
- if current_type == 'language':
- self.languages[current_tag] = current_description
- elif current_type == 'extlang':
- if current_prefixes:
- for current_prefix in current_prefixes:
- extlang = current_prefix + "-" + current_tag
- self.extlangs[extlang] = current_description
- self.extlangs[current_tag] = current_description
- elif current_type == 'region':
- self.regions[current_tag] = current_description
- elif current_type == 'variant':
- self.variants[current_tag] = current_description
- elif current_type == 'grandfathered':
- self.grandfathereds[current_tag] = current_description
- elif current_type == 'redundant':
- self.redundants[current_tag] = current_description
- elif current_type == 'script':
- self.scripts[current_tag] = current_description
- else:
- raise FileParseError(line_number,
- "Invalid Type %s for Subtag %s" % (current_type, current_tag))
-
- # A dictionary for the regular expression strings to test wellformedness.
- _wellformed_dict = {}
-
- # Language subtag of 4 to 8 characters registered in the IANA subtag registry.
- _wellformed_dict['iana_lang'] = "[a-zA-Z]{4,8}"
- # Language subtag of 2 or 3 characters according to ISO 639-1/-2/-3/-5,
- # optionally followed by up to 3 extended language subtags of 3 characters
- # each as in ISO 639-3.
- _wellformed_dict['extlang'] = "[a-zA-Z]{3}"
- _wellformed_dict['iso_lang_extlang'] = (
- "[a-zA-Z]{2,3}(-%(extlang)s){0,3}" % _wellformed_dict)
- _wellformed_dict['lang'] = (
- "(%(iana_lang)s|%(iso_lang_extlang)s|x)" % _wellformed_dict)
-
- # Script subtag of 4 characters as in ISO 15924.
- _wellformed_dict['script'] = "[a-zA-Z]{4}"
-
- # Region subtag of 2 characters as in ISO 3166-1 or of 3 digits as in the
- # "UN Standard Country or Area Codes for Statistical Use".
- _wellformed_dict['region'] = "([a-zA-Z]{2}|\d{3})"
-
- # Variant subtag of 4 to 8 characters (must begin with a digit if length 4).
- _wellformed_dict['variant'] = "([a-zA-Z0-9]{5,8}|\d[a-zA-Z0-9]{3})"
-
- # Extension subtag consisting of a singleton subtag (1 character, not "x")
- # followed by at least one subtag of 2 to 8 characters.
- _wellformed_dict['extension'] = "([a-wyzA-WYZ0-9](-[a-zA-Z0-9]{2,8})+)"
-
- # Private subtag consisting of the subtag "x" followed by 1..n subtags of
- # 1 to 8 characters.
- _wellformed_dict['private'] = "x(-([a-zA-Z0-9]{1,8}))+"
-
- # Full BCP-47 wellformed regular expression.
- _wellformed_bcp47 = re.compile(
- "^("
- "((%(iana_lang)s)" # 1 language subtag as registered at IANA
- "|(%(iso_lang_extlang)s))" # or 1 language subtag as in ISO 639x
- "(-%(script)s)?" # 0..1 script subtags
- "(-%(region)s)?" # 0..1 region subtags
- "(-%(variant)s)*" # 0..n variant subtags
- "(-%(extension)s)*" # 0..n extension subtags
- ")?"
- "((^|-)%(private)s)?$" # 0..1 private subtags (can be standalone)
- % _wellformed_dict)
-
- def IsWellformedSubtag(self, subtag, subtag_type):
- if subtag_type in self._wellformed_dict:
- subtag_regexp = "^%s$" % (self._wellformed_dict[subtag_type])
- return re.search(subtag_regexp, subtag) is not None
- return False
-
- def IsWellformed(self, lang_code):
- if lang_code.lower() in self.grandfathereds:
- return True
- match_obj = self._wellformed_bcp47.match(lang_code)
- if not match_obj:
- return False
- elif match_obj.group(0) != lang_code:
- return False
- else:
- return True
-
- def ParseLanguage(self, lang_code):
- lang_obj = Bcp47LanguageObject(lang_code)
-
- if not self.IsWellformed(lang_code):
- return lang_obj
- lang_obj.wellformed = True
-
- lang_code = lang_code.lower()
- if lang_code in self.grandfathereds:
- return lang_obj.Update(self.grandfathereds[lang_code], True, True)
- if lang_code in self.redundants:
- return lang_obj.Update(self.redundants[lang_code], True, True)
-
- lang_code_parts = lang_code.split('-')
- lang_code_part_len = len(lang_code_parts)
- lang_code_part_idx = 0
- lang_tag = lang_code_parts[lang_code_part_idx]
- if not self.IsWellformedSubtag(lang_tag, "lang"):
- return lang_obj.Update(None, False, False)
- elif lang_tag != 'x':
- if lang_tag in self.languages:
- lang_obj.descriptions.append(self.languages[lang_tag])
- else:
- return lang_obj.Update("unknown language \'" + lang_tag + "\'",
- True, False)
- lang_code_part_idx = lang_code_part_idx + 1
-
- if lang_code_part_idx == lang_code_part_len:
- return lang_obj.Update(None, True, True)
-
- extlang_tag = lang_code_parts[lang_code_part_idx]
- if self.IsWellformedSubtag(extlang_tag, "extlang"):
- if extlang_tag in self.extlangs:
- lang_obj.descriptions.append(self.extlangs[extlang_tag])
- else:
- return lang_obj.Update("unknown extlang \'" + extlang_tag + "\'",
- True, False)
- lang_code_part_idx = lang_code_part_idx + 1
-
- if lang_code_part_idx == lang_code_part_len:
- return lang_obj.Update(None, True, True)
-
- script_tag = lang_code_parts[lang_code_part_idx]
- if self.IsWellformedSubtag(script_tag, "script"):
- if script_tag in self.scripts:
- lang_obj.descriptions.append(self.scripts[script_tag]+" script")
- else:
- return lang_obj.Update("unknown script \'" + script_tag + "\'",
- True, False)
- lang_code_part_idx = lang_code_part_idx + 1
-
- if lang_code_part_idx == lang_code_part_len:
- return lang_obj.Update(None, True, True)
-
- region_tag = lang_code_parts[lang_code_part_idx]
- if self.IsWellformedSubtag(region_tag, "region"):
- if region_tag in self.regions:
- lang_obj.descriptions.append(self.regions[region_tag])
- else:
- return lang_obj.Update("unknown region \'" + region_tag + "\'",
- True, False)
- lang_code_part_idx = lang_code_part_idx + 1
-
- if lang_code_part_idx == lang_code_part_len:
- return lang_obj.Update(None, True, True)
-
- variant_tag = lang_code_parts[lang_code_part_idx]
- if self.IsWellformedSubtag(variant_tag, "variant"):
- if variant_tag in self.variants:
- lang_obj.descriptions.append(self.variants[variant_tag])
- else:
- return lang_obj.Update("unknown variant \'" + variant_tag + "\'",
- True, False)
- lang_code_part_idx = lang_code_part_idx + 1
-
- if lang_code_part_len > lang_code_part_idx:
- remainder = "-".join(lang_code_parts[lang_code_part_idx:])
- if len(remainder) > 0:
- return lang_obj.Update("subtag \'"+remainder+"\' was ignored", True,
- True)
-
- return lang_obj.Update(None, True, True)
-
- def Parse_ISO639_1_Language(self, lang_code):
- lang_obj = Bcp47LanguageObject(lang_code)
-
- lang_code = lang_code.lower()
- if len(lang_code) == 2:
- if lang_code in self.languages:
- lang_obj.Update(self.languages[lang_code], True, True)
- else:
- match_obj = re.match("^([a-z]{2})", lang_code)
- if match_obj:
- lang_obj.wellformed = True
-
- return lang_obj
+ for line, line_number in line_iterator:
+ if line == "%%":
+ self._AddSubtagFromRegistryFile(
+ current_type,
+ current_tag,
+ current_descriptions,
+ current_prefixes,
+ line_number,
+ )
+ current_type = None
+ current_tag = None
+ current_descriptions = []
+ current_prefixes = []
+ continue
+
+ line_parts = line.split(": ")
+ if len(line_parts) > 2 and line_parts[0] == "Comments":
+ # Silently ignore comments. They may contain multiple ':'.
+ continue
+ if len(line_parts) != 2:
+ raise FileParseError(
+ line_number, "Invalid line %s in registry file!" % (line)
+ )
+
+ line_key, line_value = line_parts
+ if line_key == "Type":
+ if current_type:
+ raise FileParseError(
+ line_number, "Duplicate Type for (Sub)tag %s" % (current_tag)
+ )
+ current_type = line_value.lower()
+ elif line_key == "Subtag" or line_key == "Tag":
+ if current_tag:
+ raise FileParseError(
+ line_number, "Duplicate (Sub)tag %s" % (current_tag)
+ )
+ current_tag = line_value.lower()
+ elif line_key == "Description":
+ current_descriptions.append(line_value)
+ elif line_key == "Prefix":
+ current_prefixes.append(line_value)
+ elif line_key not in [
+ "Added",
+ "Deprecated",
+ "Preferred-Value",
+ "Suppress-Script",
+ "Macrolanguage",
+ "Scope",
+ "Comments",
+ ]:
+ raise FileParseError(
+ line_number,
+ "Invalid registry field %s with value %s" % (line_key, line_value),
+ )
+
+ # The last record does not get terminated by the '%%' preceding the next
+ # record. So we have to add it after the 'for' loop.
+ self._AddSubtagFromRegistryFile(
+ current_type,
+ current_tag,
+ current_descriptions,
+ current_prefixes,
+ line_number,
+ )
+
+ def IntStr26ToInt(self, int_str):
+ return reduce(lambda x, y: 26 * x + y, map(string.lowercase.index, int_str))
+
+ def IntToIntStr26(self, int_value, int_str=""):
+ if int_value == 0:
+ return int_str
+ return self.IntToIntStr26(
+ int_value / 26, string.lowercase[int_value % 26] + int_str
+ )
+
+ def _AddSubtagFromRegistryFile(
+ self,
+ current_type,
+ current_tag,
+ current_descriptions,
+ current_prefixes,
+ line_number,
+ ):
+ if not current_descriptions:
+ raise FileParseError(
+ line_number, "Missing Description(s) for (Sub)tag %s" % (current_tag)
+ )
+ current_description = ", ".join(current_descriptions)
+
+ if not current_tag:
+ raise FileParseError(
+ line_number,
+ "Missing (Sub)tag for Type %s and Description(s) %s"
+ % (current_type, current_description),
+ )
+
+ if ".." in current_tag:
+ # Decompose ranges (private use range) and add them recursively.
+ range_parts = current_tag.split("..")
+ if len(range_parts) != 2:
+ raise FileParseError(
+ line_number,
+ "(Sub)tag ranges must consist of two tags being separated by '..'! "
+ "The range '%s' of Type '%s' is invalid."
+ % (current_tag, current_type),
+ )
+ start_str, end_str = range_parts
+ if len(start_str) != len(end_str):
+ raise FileParseError(
+ line_number,
+ "The start and end tags in ranges must have the same length! "
+ "The tags '%s' and '%s' are different!" % (start_str, end_str),
+ )
+ for i in range(
+ self.IntStr26ToInt(start_str), self.IntStr26ToInt(end_str) + 1
+ ):
+ range_tag = self.IntToIntStr26(i)
+ self._AddSubtagFromRegistryFile(
+ current_type,
+ range_tag,
+ current_descriptions,
+ current_prefixes,
+ line_number,
+ )
+ # Range tags are added as recursion so we have to return afterwards.
+ return
+
+ if current_type == "language":
+ self.languages[current_tag] = current_description
+ elif current_type == "extlang":
+ if current_prefixes:
+ for current_prefix in current_prefixes:
+ extlang = current_prefix + "-" + current_tag
+ self.extlangs[extlang] = current_description
+ self.extlangs[current_tag] = current_description
+ elif current_type == "region":
+ self.regions[current_tag] = current_description
+ elif current_type == "variant":
+ self.variants[current_tag] = current_description
+ elif current_type == "grandfathered":
+ self.grandfathereds[current_tag] = current_description
+ elif current_type == "redundant":
+ self.redundants[current_tag] = current_description
+ elif current_type == "script":
+ self.scripts[current_tag] = current_description
+ else:
+ raise FileParseError(
+ line_number,
+ "Invalid Type %s for Subtag %s" % (current_type, current_tag),
+ )
+
+ # A dictionary for the regular expression strings to test wellformedness.
+ _wellformed_dict = {}
+
+ # Language subtag of 4 to 8 characters registered in the IANA subtag registry.
+ _wellformed_dict["iana_lang"] = "[a-zA-Z]{4,8}"
+ # Language subtag of 2 or 3 characters according to ISO 639-1/-2/-3/-5,
+ # optionally followed by up to 3 extended language subtags of 3 characters
+ # each as in ISO 639-3.
+ _wellformed_dict["extlang"] = "[a-zA-Z]{3}"
+ _wellformed_dict["iso_lang_extlang"] = (
+ "[a-zA-Z]{2,3}(-%(extlang)s){0,3}" % _wellformed_dict
+ )
+ _wellformed_dict["lang"] = (
+ "(%(iana_lang)s|%(iso_lang_extlang)s|x)" % _wellformed_dict
+ )
+
+ # Script subtag of 4 characters as in ISO 15924.
+ _wellformed_dict["script"] = "[a-zA-Z]{4}"
+
+ # Region subtag of 2 characters as in ISO 3166-1 or of 3 digits as in the
+ # "UN Standard Country or Area Codes for Statistical Use".
+ _wellformed_dict["region"] = "([a-zA-Z]{2}|\d{3})"
+
+ # Variant subtag of 4 to 8 characters (must begin with a digit if length 4).
+ _wellformed_dict["variant"] = "([a-zA-Z0-9]{5,8}|\d[a-zA-Z0-9]{3})"
+
+ # Extension subtag consisting of a singleton subtag (1 character, not "x")
+ # followed by at least one subtag of 2 to 8 characters.
+ _wellformed_dict["extension"] = "([a-wyzA-WYZ0-9](-[a-zA-Z0-9]{2,8})+)"
+
+ # Private subtag consisting of the subtag "x" followed by 1..n subtags of
+ # 1 to 8 characters.
+ _wellformed_dict["private"] = "x(-([a-zA-Z0-9]{1,8}))+"
+
+ # Full BCP-47 wellformed regular expression.
+ _wellformed_bcp47 = re.compile(
+ "^("
+ "((%(iana_lang)s)" # 1 language subtag as registered at IANA
+ "|(%(iso_lang_extlang)s))" # or 1 language subtag as in ISO 639x
+ "(-%(script)s)?" # 0..1 script subtags
+ "(-%(region)s)?" # 0..1 region subtags
+ "(-%(variant)s)*" # 0..n variant subtags
+ "(-%(extension)s)*" # 0..n extension subtags
+ ")?"
+ "((^|-)%(private)s)?$" # 0..1 private subtags (can be standalone)
+ % _wellformed_dict
+ )
+
+ def IsWellformedSubtag(self, subtag, subtag_type):
+ if subtag_type in self._wellformed_dict:
+ subtag_regexp = "^%s$" % (self._wellformed_dict[subtag_type])
+ return re.search(subtag_regexp, subtag) is not None
+ return False
+
+ def IsWellformed(self, lang_code):
+ if lang_code.lower() in self.grandfathereds:
+ return True
+ match_obj = self._wellformed_bcp47.match(lang_code)
+ if not match_obj:
+ return False
+ elif match_obj.group(0) != lang_code:
+ return False
+ else:
+ return True
+
+ def ParseLanguage(self, lang_code):
+ lang_obj = Bcp47LanguageObject(lang_code)
+
+ if not self.IsWellformed(lang_code):
+ return lang_obj
+ lang_obj.wellformed = True
+
+ lang_code = lang_code.lower()
+ if lang_code in self.grandfathereds:
+ return lang_obj.Update(self.grandfathereds[lang_code], True, True)
+ if lang_code in self.redundants:
+ return lang_obj.Update(self.redundants[lang_code], True, True)
+
+ lang_code_parts = lang_code.split("-")
+ lang_code_part_len = len(lang_code_parts)
+ lang_code_part_idx = 0
+ lang_tag = lang_code_parts[lang_code_part_idx]
+ if not self.IsWellformedSubtag(lang_tag, "lang"):
+ return lang_obj.Update(None, False, False)
+ elif lang_tag != "x":
+ if lang_tag in self.languages:
+ lang_obj.descriptions.append(self.languages[lang_tag])
+ else:
+ return lang_obj.Update(
+ "unknown language '" + lang_tag + "'", True, False
+ )
+ lang_code_part_idx = lang_code_part_idx + 1
+
+ if lang_code_part_idx == lang_code_part_len:
+ return lang_obj.Update(None, True, True)
+
+ extlang_tag = lang_code_parts[lang_code_part_idx]
+ if self.IsWellformedSubtag(extlang_tag, "extlang"):
+ if extlang_tag in self.extlangs:
+ lang_obj.descriptions.append(self.extlangs[extlang_tag])
+ else:
+ return lang_obj.Update(
+ "unknown extlang '" + extlang_tag + "'", True, False
+ )
+ lang_code_part_idx = lang_code_part_idx + 1
+
+ if lang_code_part_idx == lang_code_part_len:
+ return lang_obj.Update(None, True, True)
+
+ script_tag = lang_code_parts[lang_code_part_idx]
+ if self.IsWellformedSubtag(script_tag, "script"):
+ if script_tag in self.scripts:
+ lang_obj.descriptions.append(self.scripts[script_tag] + " script")
+ else:
+ return lang_obj.Update(
+ "unknown script '" + script_tag + "'", True, False
+ )
+ lang_code_part_idx = lang_code_part_idx + 1
+
+ if lang_code_part_idx == lang_code_part_len:
+ return lang_obj.Update(None, True, True)
+
+ region_tag = lang_code_parts[lang_code_part_idx]
+ if self.IsWellformedSubtag(region_tag, "region"):
+ if region_tag in self.regions:
+ lang_obj.descriptions.append(self.regions[region_tag])
+ else:
+ return lang_obj.Update(
+ "unknown region '" + region_tag + "'", True, False
+ )
+ lang_code_part_idx = lang_code_part_idx + 1
+
+ if lang_code_part_idx == lang_code_part_len:
+ return lang_obj.Update(None, True, True)
+
+ variant_tag = lang_code_parts[lang_code_part_idx]
+ if self.IsWellformedSubtag(variant_tag, "variant"):
+ if variant_tag in self.variants:
+ lang_obj.descriptions.append(self.variants[variant_tag])
+ else:
+ return lang_obj.Update(
+ "unknown variant '" + variant_tag + "'", True, False
+ )
+ lang_code_part_idx = lang_code_part_idx + 1
+
+ if lang_code_part_len > lang_code_part_idx:
+ remainder = "-".join(lang_code_parts[lang_code_part_idx:])
+ if len(remainder) > 0:
+ return lang_obj.Update(
+ "subtag '" + remainder + "' was ignored", True, True
+ )
+
+ return lang_obj.Update(None, True, True)
+
+ def Parse_ISO639_1_Language(self, lang_code):
+ lang_obj = Bcp47LanguageObject(lang_code)
+
+ lang_code = lang_code.lower()
+ if len(lang_code) == 2:
+ if lang_code in self.languages:
+ lang_obj.Update(self.languages[lang_code], True, True)
+ else:
+ match_obj = re.match("^([a-z]{2})", lang_code)
+ if match_obj:
+ lang_obj.wellformed = True
+
+ return lang_obj
class Bcp47LanguageObject(object):
- def __init__(self, lang_code):
- self.lang_code = lang_code
- self.descriptions = []
- self.wellformed = False
- self.valid = False
-
- def Update(self, description, wellformed, valid):
- if description:
- self.descriptions.append(description)
- self.wellformed = wellformed
- self.valid = valid
- return self
-
- def __str__(self):
- return ', '.join(self.descriptions)
+ def __init__(self, lang_code):
+ self.lang_code = lang_code
+ self.descriptions = []
+ self.wellformed = False
+ self.valid = False
+
+ def Update(self, description, wellformed, valid):
+ if description:
+ self.descriptions.append(description)
+ self.wellformed = wellformed
+ self.valid = valid
+ return self
+
+ def __str__(self):
+ return ", ".join(self.descriptions)
diff --git a/extensions/googletransit/pybcp47/testpybcp47.py b/extensions/googletransit/pybcp47/testpybcp47.py
index 4980e2a9..ba94770b 100644
--- a/extensions/googletransit/pybcp47/testpybcp47.py
+++ b/extensions/googletransit/pybcp47/testpybcp47.py
@@ -23,71 +23,89 @@
from .bcp47languageparser import Bcp47LanguageParser
+
class PyBcp47TestCase(unittest.TestCase):
- bcp47parser = Bcp47LanguageParser()
+ bcp47parser = Bcp47LanguageParser()
- def testRegistryFileRecordsBeingWellformed(self):
- # Test whether the parsed entries from the registry file in this package are
- # valid. The registry file in this package is originally downloaded from
- # http://www.iana.org/assignments/language-subtag-registry. Formatting
- # rules of this file can be found at http://tools.ietf.org/html/rfc5646
- for tag in self.bcp47parser.grandfathereds.keys():
- self.assertTrue(self.bcp47parser.IsWellformed(tag),
- "Grandfathered tag '%s' in language-subtag-registry.txt "
- "seems to be invalid!" % (tag))
- for tag in self.bcp47parser.redundants.keys():
- self.assertTrue(self.bcp47parser.IsWellformed(tag),
- "Redundant tag '%s' in language-subtag-registry.txt "
- "seems to be invalid!" % (tag))
- for tag in self.bcp47parser.languages.keys():
- self.assertTrue(self.bcp47parser.IsWellformedSubtag(tag, "lang"),
- "Language subtag '%s' in language-subtag-registry.txt "
- "seems to be invalid!" % (tag))
- for tag in self.bcp47parser.extlangs.keys():
- # extlangs contains each for each extlang just the tag and the tag
- # combined with its prefix. E.g. 'aao' and 'ar-aao'.
- extlang_parts = tag.split("-")
- extlang = extlang_parts[len(extlang_parts) - 1]
- self.assertTrue(self.bcp47parser.IsWellformedSubtag(extlang, "extlang"),
- "Extlang subtag '%s' in language-subtag-registry.txt "
- "seems to be invalid!" % (tag))
- for tag in self.bcp47parser.scripts.keys():
- self.assertTrue(self.bcp47parser.IsWellformedSubtag(tag, "script"),
- "Script subtag '%s' in language-subtag-registry.txt "
- "seems to be invalid!" % (tag))
- for tag in self.bcp47parser.regions.keys():
- self.assertTrue(self.bcp47parser.IsWellformedSubtag(tag, "region"),
- "Region subtag '%s' in language-subtag-registry.txt "
- "seems to be invalid!" % (tag))
- for tag in self.bcp47parser.variants.keys():
- self.assertTrue(self.bcp47parser.IsWellformedSubtag(tag, "variant"),
- "Variant subtag '%s' in language-subtag-registry.txt "
- "seems to be invalid!" % (tag))
+ def testRegistryFileRecordsBeingWellformed(self):
+ # Test whether the parsed entries from the registry file in this package are
+ # valid. The registry file in this package is originally downloaded from
+ # http://www.iana.org/assignments/language-subtag-registry. Formatting
+ # rules of this file can be found at http://tools.ietf.org/html/rfc5646
+ for tag in self.bcp47parser.grandfathereds.keys():
+ self.assertTrue(
+ self.bcp47parser.IsWellformed(tag),
+ "Grandfathered tag '%s' in language-subtag-registry.txt "
+ "seems to be invalid!" % (tag),
+ )
+ for tag in self.bcp47parser.redundants.keys():
+ self.assertTrue(
+ self.bcp47parser.IsWellformed(tag),
+ "Redundant tag '%s' in language-subtag-registry.txt "
+ "seems to be invalid!" % (tag),
+ )
+ for tag in self.bcp47parser.languages.keys():
+ self.assertTrue(
+ self.bcp47parser.IsWellformedSubtag(tag, "lang"),
+ "Language subtag '%s' in language-subtag-registry.txt "
+ "seems to be invalid!" % (tag),
+ )
+ for tag in self.bcp47parser.extlangs.keys():
+ # extlangs contains each for each extlang just the tag and the tag
+ # combined with its prefix. E.g. 'aao' and 'ar-aao'.
+ extlang_parts = tag.split("-")
+ extlang = extlang_parts[len(extlang_parts) - 1]
+ self.assertTrue(
+ self.bcp47parser.IsWellformedSubtag(extlang, "extlang"),
+ "Extlang subtag '%s' in language-subtag-registry.txt "
+ "seems to be invalid!" % (tag),
+ )
+ for tag in self.bcp47parser.scripts.keys():
+ self.assertTrue(
+ self.bcp47parser.IsWellformedSubtag(tag, "script"),
+ "Script subtag '%s' in language-subtag-registry.txt "
+ "seems to be invalid!" % (tag),
+ )
+ for tag in self.bcp47parser.regions.keys():
+ self.assertTrue(
+ self.bcp47parser.IsWellformedSubtag(tag, "region"),
+ "Region subtag '%s' in language-subtag-registry.txt "
+ "seems to be invalid!" % (tag),
+ )
+ for tag in self.bcp47parser.variants.keys():
+ self.assertTrue(
+ self.bcp47parser.IsWellformedSubtag(tag, "variant"),
+ "Variant subtag '%s' in language-subtag-registry.txt "
+ "seems to be invalid!" % (tag),
+ )
- def testValidationWithSamples(self):
- # Test whether samples are all well-formed but not valid.
- self._CheckTagsInFile("well-formed-not-valid-tags.txt", True, False)
+ def testValidationWithSamples(self):
+ # Test whether samples are all well-formed but not valid.
+ self._CheckTagsInFile("well-formed-not-valid-tags.txt", True, False)
- # Test whether samples are all not well-formed.
- self._CheckTagsInFile("not-well-formed-tags.txt", False, False)
+ # Test whether samples are all not well-formed.
+ self._CheckTagsInFile("not-well-formed-tags.txt", False, False)
- # Test whether samples are all valid.
- self._CheckTagsInFile("valid-tags.txt", True, True)
+ # Test whether samples are all valid.
+ self._CheckTagsInFile("valid-tags.txt", True, True)
- def _CheckTagsInFile(self, filename, should_be_wellformed, should_be_valid):
- full_filename = os.path.join(os.path.dirname(__file__), "testdata",
- filename)
- fileObj = codecs.open(full_filename, "r", "utf-8" )
- for line in fileObj:
- line_parts = line.split("#")
- tag = line_parts[0].strip()
- if tag:
- lang_obj = self.bcp47parser.ParseLanguage(tag)
- self.assertEqual(
- lang_obj.wellformed, should_be_wellformed,
- "the language code '%s' (%s) should%s be well-formed" %
- (tag, lang_obj, str((not should_be_wellformed and " not") or "")))
- self.assertEqual(
- lang_obj.valid, should_be_valid,
- "the language code '%s' (%s) should%s be valid" %
- (tag, lang_obj, str((not should_be_valid and " not") or "")))
+ def _CheckTagsInFile(self, filename, should_be_wellformed, should_be_valid):
+ full_filename = os.path.join(os.path.dirname(__file__), "testdata", filename)
+ fileObj = codecs.open(full_filename, "r", "utf-8")
+ for line in fileObj:
+ line_parts = line.split("#")
+ tag = line_parts[0].strip()
+ if tag:
+ lang_obj = self.bcp47parser.ParseLanguage(tag)
+ self.assertEqual(
+ lang_obj.wellformed,
+ should_be_wellformed,
+ "the language code '%s' (%s) should%s be well-formed"
+ % (tag, lang_obj, str((not should_be_wellformed and " not") or "")),
+ )
+ self.assertEqual(
+ lang_obj.valid,
+ should_be_valid,
+ "the language code '%s' (%s) should%s be valid"
+ % (tag, lang_obj, str((not should_be_valid and " not") or "")),
+ )
diff --git a/extensions/googletransit/route.py b/extensions/googletransit/route.py
index fc9f33d7..1874bdd9 100644
--- a/extensions/googletransit/route.py
+++ b/extensions/googletransit/route.py
@@ -17,8 +17,9 @@
import transitfeed
import transitfeed.util as util
+
class Route(transitfeed.Route):
- """Extension of transitfeed.Route:
+ """Extension of transitfeed.Route:
- Adding field 'co2_per_km' and ValidateCo2PerKm() function. See proposal at
https://sites.google.com/site/gtfschanges/open-proposals
- Adding HVT (Hierarchical Vehicle Type) route types, also used in
@@ -27,57 +28,60 @@ class Route(transitfeed.Route):
at http://groups.google.com/group/gtfs-changes/msg/ed917a69cf8c5bef
"""
- _FIELD_NAMES = transitfeed.Route._FIELD_NAMES + [ 'co2_per_km' ]
+ _FIELD_NAMES = transitfeed.Route._FIELD_NAMES + ["co2_per_km"]
- _ROUTE_TYPES = dict(transitfeed.Route._ROUTE_TYPES.items() + {
- 8: {'name':'Horse Carriage', 'max_speed':50},
- 9: {'name':'Intercity Bus', 'max_speed':120},
- 10: {'name':'Commuter Train', 'max_speed':150},
- 11: {'name':'Trolleybus', 'max_speed':100},
- 12: {'name':'Monorail', 'max_speed':150},
- # adding IDs from hierarchical vehicle types (HVT) list
- 100: {'name':'Railway Service', 'max_speed':300},
- 101: {'name':'High Speed Rail', 'max_speed':300},
- 102: {'name':'Long Distance Trains', 'max_speed':300},
- 108: {'name':'Rail Shuttle', 'max_speed':300},
- 109: {'name':'Suburban Railway', 'max_speed':300},
- 200: {'name':'CoachService', 'max_speed':120},
- 201: {'name':'InternationalCoach', 'max_speed':120},
- 202: {'name':'NationalCoach', 'max_speed':120},
- 204: {'name':'RegionalCoach', 'max_speed':120},
- 208: {'name':'CommuterCoach', 'max_speed':120},
- 400: {'name':'UrbanRailwayService', 'max_speed':150},
- 401: {'name':'Metro', 'max_speed':150},
- 402: {'name':'Underground', 'max_speed':150},
- 405: {'name':'Monorail', 'max_speed':150},
- 700: {'name':'BusService', 'max_speed':100},
- 701: {'name':'RegionalBus', 'max_speed':120},
- 702: {'name':'ExpressBus', 'max_speed':120},
- 704: {'name':'LocalBus', 'max_speed':100},
- 800: {'name':'TrolleybusService', 'max_speed':100},
- 900: {'name':'TramService', 'max_speed':100},
- 1000: {'name':'WaterTransportService', 'max_speed':80},
- 1100: {'name':'AirService', 'max_speed':1000},
- 1300: {'name':'TelecabinService', 'max_speed':50},
- 1400: {'name':'FunicularService', 'max_speed':50},
- 1500: {'name':'TaxiService', 'max_speed':100},
- 1501: {'name':'CommunalTaxi', 'max_speed':100},
- 1700: {'name':'MiscellaneousService', 'max_speed':100},
- 1701: {'name':'CableCar', 'max_speed':50},
- 1702: {'name':'HorseDrawnCarriage', 'max_speed':50}
- }.items())
+ _ROUTE_TYPES = dict(
+ transitfeed.Route._ROUTE_TYPES.items()
+ + {
+ 8: {"name": "Horse Carriage", "max_speed": 50},
+ 9: {"name": "Intercity Bus", "max_speed": 120},
+ 10: {"name": "Commuter Train", "max_speed": 150},
+ 11: {"name": "Trolleybus", "max_speed": 100},
+ 12: {"name": "Monorail", "max_speed": 150},
+ # adding IDs from hierarchical vehicle types (HVT) list
+ 100: {"name": "Railway Service", "max_speed": 300},
+ 101: {"name": "High Speed Rail", "max_speed": 300},
+ 102: {"name": "Long Distance Trains", "max_speed": 300},
+ 108: {"name": "Rail Shuttle", "max_speed": 300},
+ 109: {"name": "Suburban Railway", "max_speed": 300},
+ 200: {"name": "CoachService", "max_speed": 120},
+ 201: {"name": "InternationalCoach", "max_speed": 120},
+ 202: {"name": "NationalCoach", "max_speed": 120},
+ 204: {"name": "RegionalCoach", "max_speed": 120},
+ 208: {"name": "CommuterCoach", "max_speed": 120},
+ 400: {"name": "UrbanRailwayService", "max_speed": 150},
+ 401: {"name": "Metro", "max_speed": 150},
+ 402: {"name": "Underground", "max_speed": 150},
+ 405: {"name": "Monorail", "max_speed": 150},
+ 700: {"name": "BusService", "max_speed": 100},
+ 701: {"name": "RegionalBus", "max_speed": 120},
+ 702: {"name": "ExpressBus", "max_speed": 120},
+ 704: {"name": "LocalBus", "max_speed": 100},
+ 800: {"name": "TrolleybusService", "max_speed": 100},
+ 900: {"name": "TramService", "max_speed": 100},
+ 1000: {"name": "WaterTransportService", "max_speed": 80},
+ 1100: {"name": "AirService", "max_speed": 1000},
+ 1300: {"name": "TelecabinService", "max_speed": 50},
+ 1400: {"name": "FunicularService", "max_speed": 50},
+ 1500: {"name": "TaxiService", "max_speed": 100},
+ 1501: {"name": "CommunalTaxi", "max_speed": 100},
+ 1700: {"name": "MiscellaneousService", "max_speed": 100},
+ 1701: {"name": "CableCar", "max_speed": 50},
+ 1702: {"name": "HorseDrawnCarriage", "max_speed": 50},
+ }.items()
+ )
- _ROUTE_TYPE_IDS = set(_ROUTE_TYPES.keys())
- # _ROUTE_TYPE_NAMES is not getting updated as we should not continue to allow
- # reverse lookup by name. The new non GTFS route types are only valid as int.
+ _ROUTE_TYPE_IDS = set(_ROUTE_TYPES.keys())
+ # _ROUTE_TYPE_NAMES is not getting updated as we should not continue to allow
+ # reverse lookup by name. The new non GTFS route types are only valid as int.
- def ValidateCo2PerKm(self, problems):
- if not util.IsEmpty(self.co2_per_km):
- try:
- self.co2_per_km = float(self.co2_per_km)
- except ValueError:
- problems.InvalidValue('co2_per_km', self.co2_per_km)
+ def ValidateCo2PerKm(self, problems):
+ if not util.IsEmpty(self.co2_per_km):
+ try:
+ self.co2_per_km = float(self.co2_per_km)
+ except ValueError:
+ problems.InvalidValue("co2_per_km", self.co2_per_km)
- def ValidateBeforeAdd(self, problems):
- self.ValidateCo2PerKm(problems)
- return super(Route, self).ValidateBeforeAdd(problems)
+ def ValidateBeforeAdd(self, problems):
+ self.ValidateCo2PerKm(problems)
+ return super(Route, self).ValidateBeforeAdd(problems)
diff --git a/extensions/googletransit/setup_extension.py b/extensions/googletransit/setup_extension.py
index c3ed3fe6..a1292852 100644
--- a/extensions/googletransit/setup_extension.py
+++ b/extensions/googletransit/setup_extension.py
@@ -22,20 +22,21 @@
from . import route
from . import stop
-def GetGtfsFactory(factory = None):
- if not factory:
- factory = transitfeed.GetGtfsFactory()
- # Agency class extension
- factory.UpdateClass('Agency', agency.Agency)
+def GetGtfsFactory(factory=None):
+ if not factory:
+ factory = transitfeed.GetGtfsFactory()
- # FareAttribute class extension
- factory.UpdateClass('FareAttribute', fareattribute.FareAttribute)
+ # Agency class extension
+ factory.UpdateClass("Agency", agency.Agency)
- # Route class extension
- factory.UpdateClass('Route', route.Route)
+ # FareAttribute class extension
+ factory.UpdateClass("FareAttribute", fareattribute.FareAttribute)
- # Stop class extension
- factory.UpdateClass('Stop', stop.Stop)
+ # Route class extension
+ factory.UpdateClass("Route", route.Route)
- return factory
+ # Stop class extension
+ factory.UpdateClass("Stop", stop.Stop)
+
+ return factory
diff --git a/extensions/googletransit/stop.py b/extensions/googletransit/stop.py
index e9210745..c0c2b02e 100644
--- a/extensions/googletransit/stop.py
+++ b/extensions/googletransit/stop.py
@@ -18,57 +18,74 @@
import transitfeed.util as util
import transitfeed.problems as problems_module
+
class Stop(transitfeed.Stop):
- """Extension of transitfeed.Stop:
+ """Extension of transitfeed.Stop:
- Adding and validating new fields (see _FIELD_NAMES). See proposal at
https://sites.google.com/site/gtfschanges/spec-changes-summary#stops
- Overriding ValidateAfterAdd() in order to call new validation functions.
- Overriding ValidateStopLocationType(), adding location_type 2 (entrance).
"""
- _FIELD_NAMES = transitfeed.Stop._FIELD_NAMES + ['vehicle_type', 'platform_code']
+ _FIELD_NAMES = transitfeed.Stop._FIELD_NAMES + ["vehicle_type", "platform_code"]
- LOCATION_TYPE_ENTRANCE = 2
+ LOCATION_TYPE_ENTRANCE = 2
- # New validation function for field 'vehicle_type'.
- def ValidateVehicleType(self, problems):
- self.vehicle_type = util.ValidateAndReturnIntValue(
- self.vehicle_type, self._gtfs_factory.Route._ROUTE_TYPE_IDS, None, True,
- 'vehicle_type', problems)
- # Entrances must not have a vehicle type, in general google transit does not
- # read vehicle types from stops with a parent station.
- if self.vehicle_type:
- if self.location_type == 2:
- problems.InvalidValue('vehicle_type', self.location_type,
- reason='an entrance must not have a vehicle type')
- elif not util.IsEmpty(self.parent_station):
- problems.InvalidValue('vehicle_type', self.location_type,
- reason='Google Transit does not read vehicle types for stops '
- 'having a parent station', type=problems_module.TYPE_WARNING)
+ # New validation function for field 'vehicle_type'.
+ def ValidateVehicleType(self, problems):
+ self.vehicle_type = util.ValidateAndReturnIntValue(
+ self.vehicle_type,
+ self._gtfs_factory.Route._ROUTE_TYPE_IDS,
+ None,
+ True,
+ "vehicle_type",
+ problems,
+ )
+ # Entrances must not have a vehicle type, in general google transit does not
+ # read vehicle types from stops with a parent station.
+ if self.vehicle_type:
+ if self.location_type == 2:
+ problems.InvalidValue(
+ "vehicle_type",
+ self.location_type,
+ reason="an entrance must not have a vehicle type",
+ )
+ elif not util.IsEmpty(self.parent_station):
+ problems.InvalidValue(
+ "vehicle_type",
+ self.location_type,
+ reason="Google Transit does not read vehicle types for stops "
+ "having a parent station",
+ type=problems_module.TYPE_WARNING,
+ )
- # Overriding transitfeed.Stop.ValidateBeforeAdd().
- def ValidateBeforeAdd(self, problems):
- super(Stop, self).ValidateBeforeAdd(problems)
- self.ValidateVehicleType(problems)
- return True # None of these checks are blocking
+ # Overriding transitfeed.Stop.ValidateBeforeAdd().
+ def ValidateBeforeAdd(self, problems):
+ super(Stop, self).ValidateBeforeAdd(problems)
+ self.ValidateVehicleType(problems)
+ return True # None of these checks are blocking
- # Overriding transitfeed.Stop.ValidateStopLocationType().
- # Adding location_type 2 (entrance).
- def ValidateStopLocationType(self, problems):
- self.location_type = util.ValidateAndReturnIntValue(
- self.location_type, [0, 1, 2], 0, True, 'location_type', problems)
- # Entrances must have a parent_station.
- if self.location_type == 2 and util.IsEmpty(self.parent_station):
- problems.InvalidValue('location_type', self.location_type,
- reason='an entrance must have a parent_station')
+ # Overriding transitfeed.Stop.ValidateStopLocationType().
+ # Adding location_type 2 (entrance).
+ def ValidateStopLocationType(self, problems):
+ self.location_type = util.ValidateAndReturnIntValue(
+ self.location_type, [0, 1, 2], 0, True, "location_type", problems
+ )
+ # Entrances must have a parent_station.
+ if self.location_type == 2 and util.IsEmpty(self.parent_station):
+ problems.InvalidValue(
+ "location_type",
+ self.location_type,
+ reason="an entrance must have a parent_station",
+ )
- # Overriding _ReportMissingRequiredField() in order to allow empty stop_name
- # if location_type=2 (entrance).
- def _ReportMissingRequiredField(self, problems, required):
- if required == 'stop_name':
- # stops of type 2 (entrance) may have an empty stop_name
- self.ValidateStopLocationType(problems)
- if self.location_type == 2:
- return
- problems.MissingValue(required)
- setattr(self, required, None)
+ # Overriding _ReportMissingRequiredField() in order to allow empty stop_name
+ # if location_type=2 (entrance).
+ def _ReportMissingRequiredField(self, problems, required):
+ if required == "stop_name":
+ # stops of type 2 (entrance) may have an empty stop_name
+ self.ValidateStopLocationType(problems)
+ if self.location_type == 2:
+ return
+ problems.MissingValue(required)
+ setattr(self, required, None)
diff --git a/feedvalidator.py b/feedvalidator.py
index 06de811e..fc203f8c 100755
--- a/feedvalidator.py
+++ b/feedvalidator.py
@@ -36,175 +36,181 @@
from transitfeed import util
import webbrowser
+
def MaybePluralizeWord(count, word):
- if count == 1:
- return word
- else:
- return word + 's'
+ if count == 1:
+ return word
+ else:
+ return word + "s"
def PrettyNumberWord(count, word):
- return '%d %s' % (count, MaybePluralizeWord(count, word))
+ return "%d %s" % (count, MaybePluralizeWord(count, word))
def UnCamelCase(camel):
- return re.sub(r'([a-z])([A-Z])', r'\1 \2', camel)
+ return re.sub(r"([a-z])([A-Z])", r"\1 \2", camel)
def ProblemCountText(error_count, warning_count):
- results = []
- if error_count:
- results.append(PrettyNumberWord(error_count, 'error'))
- if warning_count:
- results.append(PrettyNumberWord(warning_count, 'warning'))
+ results = []
+ if error_count:
+ results.append(PrettyNumberWord(error_count, "error"))
+ if warning_count:
+ results.append(PrettyNumberWord(warning_count, "warning"))
- return ' and '.join(results)
+ return " and ".join(results)
def CalendarSummary(schedule):
- today = datetime.date.today()
- summary_end_date = today + datetime.timedelta(days=60)
- start_date, end_date = schedule.GetDateRange()
-
- if not start_date or not end_date:
- return {}
-
- start_date_object = transitfeed.DateStringToDateObject(start_date)
- end_date_object = transitfeed.DateStringToDateObject(end_date)
- if not start_date_object or not end_date_object:
- return {}
-
- # Get the list of trips only during the period the feed is active.
- # As such we have to check if it starts in the future and/or if
- # if it ends in less than 60 days.
- date_trips_departures = schedule.GenerateDateTripsDeparturesList(
- max(today, start_date_object),
- min(summary_end_date, end_date_object))
-
- if not date_trips_departures:
- return {}
-
- # Check that the dates which will be shown in summary agree with these
- # calculations. Failure implies a bug which should be fixed. It isn't good
- # for users to discover assertion failures but means it will likely be fixed.
- assert start_date <= date_trips_departures[0][0].strftime("%Y%m%d")
- assert end_date >= date_trips_departures[-1][0].strftime("%Y%m%d")
-
- # Generate a map from int number of trips in a day to a list of date objects
- # with that many trips. The list of dates is sorted.
- trips_dates = defaultdict(lambda: [])
- trips = 0
- for date, day_trips, day_departures in date_trips_departures:
- trips += day_trips
- trips_dates[day_trips].append(date)
- mean_trips = trips / len(date_trips_departures)
- max_trips = max(trips_dates.keys())
- min_trips = min(trips_dates.keys())
-
- calendar_summary = {}
- calendar_summary['mean_trips'] = mean_trips
- calendar_summary['max_trips'] = max_trips
- calendar_summary['max_trips_dates'] = FormatDateList(trips_dates[max_trips])
- calendar_summary['min_trips'] = min_trips
- calendar_summary['min_trips_dates'] = FormatDateList(trips_dates[min_trips])
- calendar_summary['date_trips_departures'] = date_trips_departures
- calendar_summary['date_summary_range'] = "%s to %s" % (
- date_trips_departures[0][0].strftime("%a %b %d"),
- date_trips_departures[-1][0].strftime("%a %b %d"))
-
- return calendar_summary
+ today = datetime.date.today()
+ summary_end_date = today + datetime.timedelta(days=60)
+ start_date, end_date = schedule.GetDateRange()
+
+ if not start_date or not end_date:
+ return {}
+
+ start_date_object = transitfeed.DateStringToDateObject(start_date)
+ end_date_object = transitfeed.DateStringToDateObject(end_date)
+ if not start_date_object or not end_date_object:
+ return {}
+
+ # Get the list of trips only during the period the feed is active.
+ # As such we have to check if it starts in the future and/or if
+ # if it ends in less than 60 days.
+ date_trips_departures = schedule.GenerateDateTripsDeparturesList(
+ max(today, start_date_object), min(summary_end_date, end_date_object)
+ )
+
+ if not date_trips_departures:
+ return {}
+
+ # Check that the dates which will be shown in summary agree with these
+ # calculations. Failure implies a bug which should be fixed. It isn't good
+ # for users to discover assertion failures but means it will likely be fixed.
+ assert start_date <= date_trips_departures[0][0].strftime("%Y%m%d")
+ assert end_date >= date_trips_departures[-1][0].strftime("%Y%m%d")
+
+ # Generate a map from int number of trips in a day to a list of date objects
+ # with that many trips. The list of dates is sorted.
+ trips_dates = defaultdict(lambda: [])
+ trips = 0
+ for date, day_trips, day_departures in date_trips_departures:
+ trips += day_trips
+ trips_dates[day_trips].append(date)
+ mean_trips = trips / len(date_trips_departures)
+ max_trips = max(trips_dates.keys())
+ min_trips = min(trips_dates.keys())
+
+ calendar_summary = {}
+ calendar_summary["mean_trips"] = mean_trips
+ calendar_summary["max_trips"] = max_trips
+ calendar_summary["max_trips_dates"] = FormatDateList(trips_dates[max_trips])
+ calendar_summary["min_trips"] = min_trips
+ calendar_summary["min_trips_dates"] = FormatDateList(trips_dates[min_trips])
+ calendar_summary["date_trips_departures"] = date_trips_departures
+ calendar_summary["date_summary_range"] = "%s to %s" % (
+ date_trips_departures[0][0].strftime("%a %b %d"),
+ date_trips_departures[-1][0].strftime("%a %b %d"),
+ )
+
+ return calendar_summary
def FormatDateList(dates):
- if not dates:
- return "0 service dates"
+ if not dates:
+ return "0 service dates"
- formatted = [d.strftime("%a %b %d") for d in dates[0:3]]
- if len(dates) > 3:
- formatted.append("...")
- return "%s (%s)" % (PrettyNumberWord(len(dates), "service date"),
- ", ".join(formatted))
+ formatted = [d.strftime("%a %b %d") for d in dates[0:3]]
+ if len(dates) > 3:
+ formatted.append("...")
+ return "%s (%s)" % (
+ PrettyNumberWord(len(dates), "service date"),
+ ", ".join(formatted),
+ )
class CountingConsoleProblemAccumulator(transitfeed.SimpleProblemAccumulator):
- """Accumulate problems and count errors and warnings.
+ """Accumulate problems and count errors and warnings.
Args:
ignore_types: list of error type names that will be ignored. E.g.
['ExpirationDate', 'UnusedStop']
"""
- def __init__(self, ignore_types=None):
- self._error_count = 0
- self._warning_count = 0
- self._notice_count = 0
- self._ignore_types = ignore_types or set()
+ def __init__(self, ignore_types=None):
+ self._error_count = 0
+ self._warning_count = 0
+ self._notice_count = 0
+ self._ignore_types = ignore_types or set()
- def _Report(self, e):
- if e.__class__.__name__ in self._ignore_types:
- return
- transitfeed.SimpleProblemAccumulator._Report(self, e)
- if e.IsError():
- self._error_count += 1
- elif e.IsWarning():
- self._warning_count += 1
- elif e.IsNotice():
- self._notice_count += 1
+ def _Report(self, e):
+ if e.__class__.__name__ in self._ignore_types:
+ return
+ transitfeed.SimpleProblemAccumulator._Report(self, e)
+ if e.IsError():
+ self._error_count += 1
+ elif e.IsWarning():
+ self._warning_count += 1
+ elif e.IsNotice():
+ self._notice_count += 1
- def ErrorCount(self):
- return self._error_count
+ def ErrorCount(self):
+ return self._error_count
- def WarningCount(self):
- return self._warning_count
+ def WarningCount(self):
+ return self._warning_count
- def NoticeCount(self):
- return self._notice_count
+ def NoticeCount(self):
+ return self._notice_count
- def FormatCount(self):
- return ProblemCountText(self.ErrorCount(), self.WarningCount())
+ def FormatCount(self):
+ return ProblemCountText(self.ErrorCount(), self.WarningCount())
- def HasIssues(self):
- return self.ErrorCount() or self.WarningCount()
+ def HasIssues(self):
+ return self.ErrorCount() or self.WarningCount()
- def HasNotices(self):
- return self.NoticeCount()
+ def HasNotices(self):
+ return self.NoticeCount()
-class BoundedProblemList(object):
- """A list of one type of ExceptionWithContext objects with bounded size."""
- def __init__(self, size_bound):
- self._count = 0
- self._exceptions = []
- self._size_bound = size_bound
-
- def Add(self, e):
- self._count += 1
- try:
- bisect.insort(self._exceptions, e)
- except TypeError:
- # The base class ExceptionWithContext raises this exception in __cmp__
- # to signal that an object is not comparable. Instead of keeping the most
- # significant issue keep the first reported.
- if self._count <= self._size_bound:
- self._exceptions.append(e)
- else:
- # self._exceptions is in order. Drop the least significant if the list is
- # now too long.
- if self._count > self._size_bound:
- del self._exceptions[-1]
- def _GetDroppedCount(self):
- return self._count - len(self._exceptions)
+class BoundedProblemList(object):
+ """A list of one type of ExceptionWithContext objects with bounded size."""
- def __repr__(self):
- return "" % repr(self._exceptions)
+ def __init__(self, size_bound):
+ self._count = 0
+ self._exceptions = []
+ self._size_bound = size_bound
- count = property(lambda s: s._count)
- dropped_count = property(_GetDroppedCount)
- problems = property(lambda s: s._exceptions)
+ def Add(self, e):
+ self._count += 1
+ try:
+ bisect.insort(self._exceptions, e)
+ except TypeError:
+ # The base class ExceptionWithContext raises this exception in __cmp__
+ # to signal that an object is not comparable. Instead of keeping the most
+ # significant issue keep the first reported.
+ if self._count <= self._size_bound:
+ self._exceptions.append(e)
+ else:
+ # self._exceptions is in order. Drop the least significant if the list is
+ # now too long.
+ if self._count > self._size_bound:
+ del self._exceptions[-1]
+
+ def _GetDroppedCount(self):
+ return self._count - len(self._exceptions)
+
+ def __repr__(self):
+ return "" % repr(self._exceptions)
+
+ count = property(lambda s: s._count)
+ dropped_count = property(_GetDroppedCount)
+ problems = property(lambda s: s._exceptions)
class LimitPerTypeProblemAccumulator(transitfeed.ProblemAccumulatorInterface):
- """Accumulate problems up to a maximum number per type.
+ """Accumulate problems up to a maximum number per type.
Args:
limit_per_type: maximum number of errors and warnings to keep of each type
@@ -212,47 +218,49 @@ class LimitPerTypeProblemAccumulator(transitfeed.ProblemAccumulatorInterface):
['ExpirationDate', 'UnusedStop']
"""
- def __init__(self, limit_per_type, ignore_types=None):
- # {TYPE_WARNING: {"ClassName": BoundedProblemList()}}
- self._type_to_name_to_problist = {
- TYPE_WARNING: defaultdict(lambda: BoundedProblemList(limit_per_type)),
- TYPE_ERROR: defaultdict(lambda: BoundedProblemList(limit_per_type)),
- TYPE_NOTICE: defaultdict(lambda: BoundedProblemList(limit_per_type))
- }
- self._ignore_types = ignore_types or set()
+ def __init__(self, limit_per_type, ignore_types=None):
+ # {TYPE_WARNING: {"ClassName": BoundedProblemList()}}
+ self._type_to_name_to_problist = {
+ TYPE_WARNING: defaultdict(lambda: BoundedProblemList(limit_per_type)),
+ TYPE_ERROR: defaultdict(lambda: BoundedProblemList(limit_per_type)),
+ TYPE_NOTICE: defaultdict(lambda: BoundedProblemList(limit_per_type)),
+ }
+ self._ignore_types = ignore_types or set()
- def HasIssues(self):
- return (self._type_to_name_to_problist[TYPE_ERROR] or
- self._type_to_name_to_problist[TYPE_WARNING])
+ def HasIssues(self):
+ return (
+ self._type_to_name_to_problist[TYPE_ERROR]
+ or self._type_to_name_to_problist[TYPE_WARNING]
+ )
- def HasNotices(self):
- return self._type_to_name_to_problist[TYPE_NOTICE]
+ def HasNotices(self):
+ return self._type_to_name_to_problist[TYPE_NOTICE]
- def _Report(self, e):
- if e.__class__.__name__ in self._ignore_types:
- return
- self._type_to_name_to_problist[e.GetType()][e.__class__.__name__].Add(e)
+ def _Report(self, e):
+ if e.__class__.__name__ in self._ignore_types:
+ return
+ self._type_to_name_to_problist[e.GetType()][e.__class__.__name__].Add(e)
- def ErrorCount(self):
- error_sets = self._type_to_name_to_problist[TYPE_ERROR].values()
- return sum(map(lambda v: v.count, error_sets))
+ def ErrorCount(self):
+ error_sets = self._type_to_name_to_problist[TYPE_ERROR].values()
+ return sum(map(lambda v: v.count, error_sets))
- def WarningCount(self):
- warning_sets = self._type_to_name_to_problist[TYPE_WARNING].values()
- return sum(map(lambda v: v.count, warning_sets))
+ def WarningCount(self):
+ warning_sets = self._type_to_name_to_problist[TYPE_WARNING].values()
+ return sum(map(lambda v: v.count, warning_sets))
- def ProblemList(self, problem_type, class_name):
- """Return the BoundedProblemList object for given type and class."""
- return self._type_to_name_to_problist[problem_type][class_name]
+ def ProblemList(self, problem_type, class_name):
+ """Return the BoundedProblemList object for given type and class."""
+ return self._type_to_name_to_problist[problem_type][class_name]
- def ProblemListMap(self, problem_type):
- """Return the map from class name to BoundedProblemList object."""
- return self._type_to_name_to_problist[problem_type]
+ def ProblemListMap(self, problem_type):
+ """Return the map from class name to BoundedProblemList object."""
+ return self._type_to_name_to_problist[problem_type]
class HTMLCountingProblemAccumulator(LimitPerTypeProblemAccumulator):
- def FormatType(self, level_name, class_problist):
- """Write the HTML dumping all problems of one type.
+ def FormatType(self, level_name, class_problist):
+ """Write the HTML dumping all problems of one type.
Args:
level_name: string such as "Error" or "Warning"
@@ -262,21 +270,24 @@ def FormatType(self, level_name, class_problist):
Returns:
HTML in a string
"""
- class_problist.sort()
- output = []
- for classname, problist in class_problist:
- output.append('\n' %
- (level_name, classname, UnCamelCase(classname)))
- for e in problist.problems:
- self.FormatException(e, output)
- if problist.dropped_count:
- output.append('and %d more of this type.' %
- (problist.dropped_count))
- output.append(' \n')
- return ''.join(output)
-
- def FormatTypeSummaryTable(self, level_name, name_to_problist):
- """Return an HTML table listing the number of problems by class name.
+ class_problist.sort()
+ output = []
+ for classname, problist in class_problist:
+ output.append(
+ '\n'
+ % (level_name, classname, UnCamelCase(classname))
+ )
+ for e in problist.problems:
+ self.FormatException(e, output)
+ if problist.dropped_count:
+ output.append(
+ "and %d more of this type." % (problist.dropped_count)
+ )
+ output.append(" \n")
+ return "".join(output)
+
+ def FormatTypeSummaryTable(self, level_name, name_to_problist):
+ """Return an HTML table listing the number of problems by class name.
Args:
level_name: string such as "Error" or "Warning"
@@ -285,140 +296,172 @@ def FormatTypeSummaryTable(self, level_name, name_to_problist):
Returns:
HTML in a string
"""
- output = []
- output.append('')
- for classname in sorted(name_to_problist.keys()):
- problist = name_to_problist[classname]
- human_name = MaybePluralizeWord(problist.count, UnCamelCase(classname))
- output.append('%d %s \n' %
- (problist.count, level_name, classname, human_name))
- output.append('
\n')
- return ''.join(output)
-
- def FormatException(self, e, output):
- """Append HTML version of e to list output."""
- d = e.GetDictToFormat()
- for k in ('file_name', 'feedname', 'column_name'):
- if k in d.keys():
- d[k] = '%s
' % d[k]
- if 'url' in d.keys():
- d['url'] = '%(url)s ' % d
-
- problem_text = e.FormatProblem(d).replace('\n', ' ')
- problem_class = 'problem'
- if e.IsNotice():
- problem_class += ' notice'
- output.append('')
- output.append('%s
' %
- (problem_class, transitfeed.EncodeUnicode(problem_text)))
- try:
- if hasattr(e, 'row_num'):
- line_str = 'line %d of ' % e.row_num
- else:
- line_str = ''
- output.append('in %s%s
\n' %
- (line_str, transitfeed.EncodeUnicode(e.file_name)))
- row = e.row
- headers = e.headers
- column_name = e.column_name
- table_header = '' # HTML
- table_data = '' # HTML
- for header, value in zip(headers, row):
- attributes = ''
- if header == column_name:
- attributes = ' class="problem"'
- table_header += '%s ' % (attributes, header)
- table_data += '%s ' % (attributes, value)
- # Make sure output is encoded into UTF-8
- output.append('%s \n' %
- transitfeed.EncodeUnicode(table_header))
- output.append('%s
\n' %
- transitfeed.EncodeUnicode(table_data))
- except AttributeError as e:
- pass # Hope this was getting an attribute from e ;-)
- output.append(' \n')
-
- def FormatCount(self):
- return ProblemCountText(self.ErrorCount(), self.WarningCount())
-
- def CountTable(self):
- output = []
- output.append('\n')
- output.append('')
- if self.ProblemListMap(TYPE_ERROR):
- output.append('%s ' %
- PrettyNumberWord(self.ErrorCount(), "error"))
- if self.ProblemListMap(TYPE_WARNING):
- output.append('%s ' %
- PrettyNumberWord(self.WarningCount(), "warning"))
- output.append(' \n')
- if self.ProblemListMap(TYPE_ERROR):
- output.append('\n')
- output.append(self.FormatTypeSummaryTable("Error",
- self.ProblemListMap(TYPE_ERROR)))
- output.append(' \n')
- if self.ProblemListMap(TYPE_WARNING):
- output.append('\n')
- output.append(self.FormatTypeSummaryTable("Warning",
- self.ProblemListMap(TYPE_WARNING)))
- output.append(' \n')
- output.append('
')
- return ''.join(output)
-
- def WriteOutput(self, feed_location, f, schedule, extension):
- """Write the html output to f."""
- if self.HasIssues():
- if self.ErrorCount() + self.WarningCount() == 1:
- summary = ('Found this problem: \n%s' %
- self.CountTable())
- else:
- summary = ('Found these problems: \n%s' %
- self.CountTable())
- else:
- summary = 'feed validated successfully '
-
- if self.HasNotices():
- summary = ('' +
- self.FormatType("Notice", self.ProblemListMap(TYPE_NOTICE).items()) +
- summary)
-
- basename = os.path.basename(feed_location)
- feed_path = (feed_location[:feed_location.rfind(basename)], basename)
-
- agencies = ', '.join(['%s ' % (a.agency_url, a.agency_name)
- for a in schedule.GetAgencyList()])
- if not agencies:
- agencies = '?'
-
- dates = "No valid service dates found"
- (start, end) = schedule.GetDateRange()
- if start and end:
- def FormatDate(yyyymmdd):
- src_format = "%Y%m%d"
- dst_format = "%B %d, %Y"
+ output = []
+ output.append("")
+ for classname in sorted(name_to_problist.keys()):
+ problist = name_to_problist[classname]
+ human_name = MaybePluralizeWord(problist.count, UnCamelCase(classname))
+ output.append(
+ '%d %s \n'
+ % (problist.count, level_name, classname, human_name)
+ )
+ output.append("
\n")
+ return "".join(output)
+
+ def FormatException(self, e, output):
+ """Append HTML version of e to list output.
+
+ The message comes from e.FormatProblem() applied to the dict returned by
+ e.GetDictToFormat(). When e also exposes file_name, row, headers and
+ column_name, the offending row is appended as well, with the column named
+ by column_name given class="problem".
+
+ Args:
+ e: problem object; GetDictToFormat(), FormatProblem() and IsNotice() are
+ called on it.
+ output: list of strings, appended to in place.
+
+ NOTE(review): several string literals in this method look truncated in
+ this copy of the diff (fewer placeholders than arguments, literals broken
+ across lines) -- presumably HTML markup was stripped; verify against the
+ upstream commit before applying.
+ """
+ d = e.GetDictToFormat()
+ # Re-format these fields, when present, before they go into the message.
+ for k in ("file_name", "feedname", "column_name"):
+ if k in d.keys():
+ d[k] = "%s
" % d[k]
+ if "url" in d.keys():
+ d["url"] = '%(url)s ' % d
+
+ # Flatten the message onto one line.
+ problem_text = e.FormatProblem(d).replace("\n", " ")
+ problem_class = "problem"
+ if e.IsNotice():
+ problem_class += " notice"
+ output.append("")
+ output.append(
+ '%s
'
+ % (problem_class, transitfeed.EncodeUnicode(problem_text))
+ )
+ # The try block is best-effort: the first attribute that e lacks
+ # (file_name, row, headers or column_name) raises AttributeError and the
+ # remaining row details are skipped.
try:
- return time.strftime(dst_format,
- time.strptime(yyyymmdd, src_format))
- except ValueError:
- return yyyymmdd
-
- formatted_start = FormatDate(start)
- formatted_end = FormatDate(end)
- dates = "%s to %s" % (formatted_start, formatted_end)
-
- calendar_summary = CalendarSummary(schedule)
- if calendar_summary:
- calendar_summary_html = """
+ if hasattr(e, "row_num"):
+ line_str = "line %d of " % e.row_num
+ else:
+ line_str = ""
+ output.append(
+ "in %s%s
\n"
+ % (line_str, transitfeed.EncodeUnicode(e.file_name))
+ )
+ row = e.row
+ headers = e.headers
+ column_name = e.column_name
+ table_header = "" # HTML
+ table_data = "" # HTML
+ for header, value in zip(headers, row):
+ attributes = ""
+ if header == column_name:
+ attributes = ' class="problem"'
+ table_header += "%s " % (attributes, header)
+ table_data += "%s " % (attributes, value)
+ # Make sure output is encoded into UTF-8
+ output.append(
+ '%s \n'
+ % transitfeed.EncodeUnicode(table_header)
+ )
+ output.append(
+ "%s
\n" % transitfeed.EncodeUnicode(table_data)
+ )
+ # NOTE(review): "as e" rebinds the parameter e; this is harmless only
+ # because e is not used after this handler. The handler also swallows any
+ # AttributeError raised inside the try block, not just a missing attribute
+ # on e.
+ except AttributeError as e:
+ pass # Hope this was getting an attribute from e ;-)
+ output.append(" \n")
+
+    def FormatCount(self):
+        """Return ProblemCountText() of the current error and warning totals."""
+        error_total = self.ErrorCount()
+        warning_total = self.WarningCount()
+        return ProblemCountText(error_total, warning_total)
+
+ def CountTable(self):
+ """Return a string summarizing the error and warning counts.
+
+ A problem type contributes only when ProblemListMap() for it is
+ non-empty: first its total via PrettyNumberWord(), then its per-class
+ breakdown via FormatTypeSummaryTable().
+
+ NOTE(review): the string literals here look truncated in this copy of the
+ diff -- presumably HTML markup was stripped; verify against the upstream
+ commit before applying.
+ """
+ output = []
+ output.append('\n')
+ output.append("")
+ # First pass: one headline total per problem type that has entries.
+ if self.ProblemListMap(TYPE_ERROR):
+ output.append(
+ '%s '
+ % PrettyNumberWord(self.ErrorCount(), "error")
+ )
+ if self.ProblemListMap(TYPE_WARNING):
+ output.append(
+ '%s '
+ % PrettyNumberWord(self.WarningCount(), "warning")
+ )
+ output.append(" \n")
+ # Second pass: one per-class summary per problem type that has entries.
+ if self.ProblemListMap(TYPE_ERROR):
+ output.append("\n")
+ output.append(
+ self.FormatTypeSummaryTable("Error", self.ProblemListMap(TYPE_ERROR))
+ )
+ output.append(" \n")
+ if self.ProblemListMap(TYPE_WARNING):
+ output.append("\n")
+ output.append(
+ self.FormatTypeSummaryTable(
+ "Warning", self.ProblemListMap(TYPE_WARNING)
+ )
+ )
+ output.append(" \n")
+ output.append("
")
+ return "".join(output)
+
+ def WriteOutput(self, feed_location, f, schedule, extension):
+ """Write the html output to f."""
+ if self.HasIssues():
+ if self.ErrorCount() + self.WarningCount() == 1:
+ summary = (
+ 'Found this problem: \n%s'
+ % self.CountTable()
+ )
+ else:
+ summary = (
+ 'Found these problems: \n%s'
+ % self.CountTable()
+ )
+ else:
+ summary = 'feed validated successfully '
+
+ if self.HasNotices():
+ summary = (
+ ''
+ + self.FormatType("Notice", self.ProblemListMap(TYPE_NOTICE).items())
+ + summary
+ )
+
+ basename = os.path.basename(feed_location)
+ feed_path = (feed_location[: feed_location.rfind(basename)], basename)
+
+ agencies = ", ".join(
+ [
+ '%s ' % (a.agency_url, a.agency_name)
+ for a in schedule.GetAgencyList()
+ ]
+ )
+ if not agencies:
+ agencies = "?"
+
+ dates = "No valid service dates found"
+ (start, end) = schedule.GetDateRange()
+ if start and end:
+
+ def FormatDate(yyyymmdd):
+ src_format = "%Y%m%d"
+ dst_format = "%B %d, %Y"
+ try:
+ return time.strftime(
+ dst_format, time.strptime(yyyymmdd, src_format)
+ )
+ except ValueError:
+ return yyyymmdd
+
+ formatted_start = FormatDate(start)
+ formatted_end = FormatDate(end)
+ dates = "%s to %s" % (formatted_start, formatted_end)
+
+ calendar_summary = CalendarSummary(schedule)
+ if calendar_summary:
+ calendar_summary_html = (
+ """
During the upcoming service dates %(date_summary_range)s:
""" % calendar_summary
- else:
- calendar_summary_html = ""
+"""
+ % calendar_summary
+ )
+ else:
+ calendar_summary_html = ""
- output_prefix = """
+ output_prefix = """
@@ -460,96 +503,104 @@ def FormatDate(yyyymmdd):
%(problem_summary)s
-""" % { "feed_file": feed_path[1],
- "feed_dir": feed_path[0],
- "agencies": agencies,
- "routes": len(schedule.GetRouteList()),
- "stops": len(schedule.GetStopList()),
- "trips": len(schedule.GetTripList()),
- "shapes": len(schedule.GetShapeList()),
- "dates": dates,
- "problem_summary": summary,
- "calendar_summary": calendar_summary_html,
- "extension": extension}
-
-# In output_suffix string
-# time.strftime() returns a regular local time string (not a Unicode one) with
-# default system encoding. And decode() will then convert this time string back
-# into a Unicode string. We use decode() here because we don't want the operating
-# system to do any system encoding (which may cause some problem if the string
-# contains some non-English characters) for the string. Therefore we decode it
-# back to its original Unicode code print.
-
- time_unicode = (time.strftime('%B %d, %Y at %I:%M %p %Z').
- decode(sys.getfilesystemencoding()))
- output_suffix = """
+""" % {
+ "feed_file": feed_path[1],
+ "feed_dir": feed_path[0],
+ "agencies": agencies,
+ "routes": len(schedule.GetRouteList()),
+ "stops": len(schedule.GetStopList()),
+ "trips": len(schedule.GetTripList()),
+ "shapes": len(schedule.GetShapeList()),
+ "dates": dates,
+ "problem_summary": summary,
+ "calendar_summary": calendar_summary_html,
+ "extension": extension,
+ }
+
+ # In output_suffix string
+ # time.strftime() returns a regular local time string (not a Unicode one) with
+ # default system encoding. And decode() will then convert this time string back
+ # into a Unicode string. We use decode() here because we don't want the operating
+ # system to do any system encoding (which may cause some problem if the string
+ # contains some non-English characters) for the string. Therefore we decode it
+ # back to its original Unicode code points.
+
+ time_unicode = time.strftime("%B %d, %Y at %I:%M %p %Z").decode(
+ sys.getfilesystemencoding()
+ )
+ output_suffix = """
-""" % (transitfeed.__version__, time_unicode)
-
- f.write(transitfeed.EncodeUnicode(output_prefix))
- if self.ProblemListMap(TYPE_ERROR):
- f.write('')
- f.write(self.FormatType("Error", self.ProblemListMap(TYPE_ERROR).items()))
- if self.ProblemListMap(TYPE_WARNING):
- f.write('')
- f.write(self.FormatType("Warning", self.ProblemListMap(TYPE_WARNING).items()))
- f.write(transitfeed.EncodeUnicode(output_suffix))
+