Skip to content

Commit

Permalink
Merge pull request #3 from cal-itp/retries
Browse files Browse the repository at this point in the history
retry on fs.get() plus some other cleanup
  • Loading branch information
atvaccaro authored Apr 13, 2022
2 parents 02bebd8 + 9543c1e commit ee629b1
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 95 deletions.
27 changes: 27 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v2.4.0
hooks:
- id: flake8
args: ["--ignore=E501,W503"] # E501 (line too long) and W503 (line break before binary operator); black is ok with these
types:
- python
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-added-large-files
- repo: https://github.com/psf/black
rev: 19.10b0
hooks:
- id: black
- repo: https://github.com/pycqa/isort
rev: 5.10.1
hooks:
- id: isort
name: isort (python)
args: ["--profile", "black"]
- repo: https://github.com/pycqa/bandit
rev: 1.7.0
hooks:
- id: bandit
args: ["-ll", "--skip=B108,B608,B310,B303"]
files: .py$
16 changes: 11 additions & 5 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,20 @@ FROM openjdk:11
ENV GTFS_VALIDATOR_JAR=/usr/gtfs-validator.jar
ENV GTFS_VALIDATOR_VERSION=v2.0.0

RUN wget \
https://github.com/MobilityData/gtfs-validator/releases/download/${GTFS_VALIDATOR_VERSION}/gtfs-validator-${GTFS_VALIDATOR_VERSION}_cli.jar \
-O ${GTFS_VALIDATOR_JAR}
WORKDIR /

# Install python
RUN apt-get update -y \
&& apt-get install -y python3 python3-pip \
&& python3 -m pip install argh==0.26.2 gcsfs==0.8.0
&& apt-get install -y python3 python3-pip

# Deps, etc.
ADD requirements.txt ./requirements.txt
RUN python3 -m pip install -r requirements.txt


RUN wget \
https://github.com/MobilityData/gtfs-validator/releases/download/${GTFS_VALIDATOR_VERSION}/gtfs-validator-${GTFS_VALIDATOR_VERSION}_cli.jar \
-O ${GTFS_VALIDATOR_JAR}

# Install package
WORKDIR /application
Expand Down
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,3 @@ gtfs-validator-api validate-gcs-bucket \
gs://gtfs-data-test/schedule/2021-03-28T00:00:00+00:00
```

101 changes: 53 additions & 48 deletions gtfs_validator_api.py
Original file line number Diff line number Diff line change
@@ -1,42 +1,41 @@
__version__ = "0.0.6"
__version__ = "0.0.7"

import os
import json
import os
import shutil
import subprocess
import warnings
from pathlib import Path
from tempfile import TemporaryDirectory

import argh
import backoff
import gcsfs
from aiohttp.client_exceptions import ClientOSError, ClientResponseError
from argh import arg

from tempfile import TemporaryDirectory
from pathlib import Path

GTFS_VALIDATOR_JAR_ENV = "GTFS_VALIDATOR_JAR"

try:
JAR_PATH = os.environ.get("GTFS_VALIDATOR_JAR")
JAR_PATH = os.environ[GTFS_VALIDATOR_JAR_ENV]
except KeyError:
raise Exception("Must set the environment variable GTFS_VALIDATOR_JAR")
raise Exception(f"Must set the environment variable {GTFS_VALIDATOR_JAR_ENV}")


# Call fs.get() on the given GCS filesystem, forwarding every positional and
# keyword argument unchanged and returning whatever fs.get() returns.
# The backoff decorator re-invokes the call, with exponentially growing waits,
# when aiohttp raises ClientResponseError or ClientOSError; any other
# exception propagates immediately.
# NOTE(review): per the backoff docs, max_tries counts total attempts, so
# max_tries=2 means one retry before the error is re-raised -- confirm that a
# single retry is the intended budget.
@backoff.on_exception(backoff.expo, (ClientResponseError, ClientOSError), max_tries=2)
def get_with_retry(fs: gcsfs.GCSFileSystem, *args, **kwargs):
return fs.get(*args, **kwargs)

# Utility funcs ----

def retry_on_fail(f, max_retries=2):
n_retries = 0
# Call fs.pipe() on the given GCS filesystem, forwarding every positional and
# keyword argument unchanged and returning whatever fs.pipe() returns.
# The backoff decorator re-invokes the call, with exponentially growing waits,
# when aiohttp raises ClientResponseError or ClientOSError; any other
# exception propagates immediately.
# NOTE(review): per the backoff docs, max_tries counts total attempts, so
# max_tries=2 means one retry before the error is re-raised -- confirm that a
# single retry is the intended budget.
@backoff.on_exception(backoff.expo, (ClientResponseError, ClientOSError), max_tries=2)
def pipe_with_retry(fs: gcsfs.GCSFileSystem, *args, **kwargs):
return fs.pipe(*args, **kwargs)

for n_retries in range(max_retries + 1):
try:
f()
except Exception as e:
if n_retries < max_retries:
n_retries += 1
warnings.warn("Function failed, starting retry: %s" % n_retries)
else:
raise e

# API ----


@arg("gtfs_file", help="a zipped gtfs file", type=str)
def validate(gtfs_file, out_file=None, verbose=False, feed_name="us-na"):
def validate(gtfs_file: str, out_file=None, verbose=False, feed_name="us-na"):
if not isinstance(gtfs_file, str):
raise NotImplementedError("gtfs_file must be a string")

Expand All @@ -46,21 +45,29 @@ def validate(gtfs_file, out_file=None, verbose=False, feed_name="us-na"):

with TemporaryDirectory() as tmp_out_dir:

subprocess.check_call([
"java",
"-jar", JAR_PATH,
"--input", gtfs_file,
"--output_base", tmp_out_dir,
"--feed_name", feed_name,
], stderr=stderr, stdout=stdout)
subprocess.check_call(
[
"java",
"-jar",
JAR_PATH,
"--input",
gtfs_file,
"--output_base",
tmp_out_dir,
"--feed_name",
feed_name,
],
stderr=stderr,
stdout=stdout,
)

report = Path(tmp_out_dir) / "report.json"
system_errors = Path(tmp_out_dir) / "system_errors.json"

result = {
"report": json.load(open(report)),
"system_errors": json.load(open(system_errors)),
}
"report": json.load(open(report)),
"system_errors": json.load(open(system_errors)),
}

if out_file is not None:
with open(out_file, "w") as f:
Expand All @@ -84,6 +91,7 @@ def validate_many(gtfs_files, out_file=None, verbose=False):

def _get_paths_from_status(f, bucket_path):
from csv import DictReader

tmpl_path = "{bucket_path}/{itp_id}_{url_number}"

rows = list(DictReader(f))
Expand All @@ -100,10 +108,14 @@ def _get_paths_from_status(f, bucket_path):

@arg("bucket_paths", nargs="+")
def validate_gcs_bucket(
project_id, token, bucket_paths,
feed_name="us-na",
recursive=False, out_file=None, verbose=False
):
project_id,
token,
bucket_paths,
feed_name="us-na",
recursive=False,
out_file=None,
verbose=False,
):
"""
Arguments:
project_id: name of google cloud project
Expand All @@ -121,9 +133,6 @@ def validate_gcs_bucket(
It will look for subfolders named {itp_id}_{url_number}.
"""
import gcsfs
import shutil

fs = gcsfs.GCSFileSystem(project_id, token=token)

if recursive:
Expand All @@ -142,13 +151,13 @@ def validate_gcs_bucket(
path_raw = tmp_dir + "/gtfs"
path_zip = tmp_dir + "/gtfs.zip"

fs.get(path, path_raw, recursive=True)
get_with_retry(fs, path, path_raw, recursive=True)
shutil.make_archive(path_raw, "zip", path_raw)

result = {
"version": os.environ["GTFS_VALIDATOR_VERSION"],
"data": validate(path_zip, verbose=verbose, feed_name=feed_name)
}
"data": validate(path_zip, verbose=verbose, feed_name=feed_name),
}

results.append(result)

Expand All @@ -160,10 +169,7 @@ def validate_gcs_bucket(
print("Saving to path: %s" % bucket_out)

# fs.pipe expects contents to be byte encoded
retry_on_fail(
lambda: fs.pipe(bucket_out, json.dumps(result).encode()),
2
)
pipe_with_retry(fs, bucket_out, json.dumps(result).encode())

# if not saving to disk, return results
if out_file is None:
Expand All @@ -172,11 +178,10 @@ def validate_gcs_bucket(

# Cli ----


def main():
# TODO: make into simple CLI
result = argh.dispatch_commands([
validate, validate_many, validate_gcs_bucket
])
result = argh.dispatch_commands([validate, validate_many, validate_gcs_bucket])

if result is not None:
print(json.dumps(result))
Expand Down
4 changes: 4 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
argh==0.26.2
backoff==1.11.1
gcsfs==0.8.0
aiohttp==3.8.1
22 changes: 11 additions & 11 deletions setup.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
from setuptools import setup, find_packages
import ast
import re

from setuptools import setup

# Read version number ----

import re
import ast
_version_re = re.compile(r'__version__\s+=\s+(.*)')

with open('gtfs_validator_api.py', 'rb') as f:
VERSION = str(ast.literal_eval(_version_re.search(
f.read().decode('utf-8')).group(1)))
_version_re = re.compile(r"__version__\s+=\s+(.*)")

with open("gtfs_validator_api.py", "rb") as f:
VERSION = str(
ast.literal_eval(_version_re.search(f.read().decode("utf-8")).group(1))
)

# Get README ----

Expand All @@ -27,10 +29,8 @@
# license='MIT',
# author_email='[email protected]',
# url='https://github.com/cal-itp/gtfs_validator_api',
keywords=['package', ],
entry_points={
'console_scripts': ['gtfs-validator-api=gtfs_validator_api:main'],
},
keywords=["package"],
entry_points={"console_scripts": ["gtfs-validator-api=gtfs_validator_api:main"]},
install_requires=["argh"],
python_requires=">=3.6",
# long_description=README,
Expand Down
30 changes: 0 additions & 30 deletions tests/test_retry_on_fail.py

This file was deleted.

0 comments on commit ee629b1

Please sign in to comment.