From e5873356643219b1eefb28538acef75ed9ff8a03 Mon Sep 17 00:00:00 2001 From: Bob Firestone Date: Mon, 31 Jul 2023 08:05:47 -0600 Subject: [PATCH 1/2] fallback to METADATA & black reformatted the file --- pipreqs/pipreqs.py | 251 ++++++++++++++++++++++++++------------------- 1 file changed, 146 insertions(+), 105 deletions(-) diff --git a/pipreqs/pipreqs.py b/pipreqs/pipreqs.py index 8182115..1443b46 100644 --- a/pipreqs/pipreqs.py +++ b/pipreqs/pipreqs.py @@ -51,13 +51,13 @@ from pipreqs import __version__ REGEXP = [ - re.compile(r'^import (.+)$'), - re.compile(r'^from ((?!\.+).*?) import (?:.*)$') + re.compile(r"^import (.+)$"), + re.compile(r"^from ((?!\.+).*?) import (?:.*)$"), ] @contextmanager -def _open(filename=None, mode='r'): +def _open(filename=None, mode="r"): """Open a file or ``sys.stdout`` depending on the provided filename. Args: @@ -70,13 +70,13 @@ def _open(filename=None, mode='r'): A file handle. """ - if not filename or filename == '-': - if not mode or 'r' in mode: + if not filename or filename == "-": + if not mode or "r" in mode: file = sys.stdin - elif 'w' in mode: + elif "w" in mode: file = sys.stdout else: - raise ValueError('Invalid mode for file: {}'.format(mode)) + raise ValueError("Invalid mode for file: {}".format(mode)) else: file = open(filename, mode) @@ -87,8 +87,7 @@ def _open(filename=None, mode='r'): file.close() -def get_all_imports( - path, encoding=None, extra_ignore_dirs=None, follow_links=True): +def get_all_imports(path, encoding=None, extra_ignore_dirs=None, follow_links=True): imports = set() raw_imports = set() candidates = [] @@ -137,11 +136,11 @@ def get_all_imports( # Cleanup: We only want to first part of the import. # Ex: from django.conf --> django.conf. But we only want django # as an import. - cleaned_name, _, _ = name.partition('.') + cleaned_name, _, _ = name.partition(".") imports.add(cleaned_name) packages = imports - (set(candidates) & imports) - logging.debug('Found packages: {0}'.format(packages)) + logging.debug("Found packages: {0}".format(packages)) with open(join("stdlib"), "r") as f: data = {x.strip() for x in f} @@ -155,61 +154,68 @@ def filter_line(line): def generate_requirements_file(path, imports, symbol): with _open(path, "w") as out_file: - logging.debug('Writing {num} requirements: {imports} to {file}'.format( - num=len(imports), - file=path, - imports=", ".join([x['name'] for x in imports]) - )) - fmt = '{name}' + symbol + '{version}' - out_file.write('\n'.join( - fmt.format(**item) if item['version'] else '{name}'.format(**item) - for item in imports) + '\n') + logging.debug( + "Writing {num} requirements: {imports} to {file}".format( + num=len(imports), + file=path, + imports=", ".join([x["name"] for x in imports]), + ) + ) + fmt = "{name}" + symbol + "{version}" + out_file.write( + "\n".join( + fmt.format(**item) if item["version"] else "{name}".format(**item) + for item in imports + ) + + "\n" + ) def output_requirements(imports, symbol): - generate_requirements_file('-', imports, symbol) + generate_requirements_file("-", imports, symbol) -def get_imports_info( - imports, pypi_server="https://pypi.python.org/pypi/", proxy=None): +def get_imports_info(imports, pypi_server="https://pypi.python.org/pypi/", proxy=None): result = [] for item in imports: try: logging.warning( 'Import named "%s" not found locally. ' - 'Trying to resolve it at the PyPI server.', - item + "Trying to resolve it at the PyPI server.", + item, ) response = requests.get( - "{0}{1}/json".format(pypi_server, item), proxies=proxy) + "{0}{1}/json".format(pypi_server, item), proxies=proxy + ) if response.status_code == 200: - if hasattr(response.content, 'decode'): + if hasattr(response.content, "decode"): data = json2package(response.content.decode()) else: data = json2package(response.content) elif response.status_code >= 300: - raise HTTPError(status_code=response.status_code, - reason=response.reason) + raise HTTPError( + status_code=response.status_code, reason=response.reason + ) except HTTPError: - logging.warning( - 'Package "%s" does not exist or network problems', item) + logging.warning('Package "%s" does not exist or network problems', item) continue logging.warning( 'Import named "%s" was resolved to "%s:%s" package (%s).\n' - 'Please, verify manually the final list of requirements.txt ' - 'to avoid possible dependency confusions.', + "Please, verify manually the final list of requirements.txt " + "to avoid possible dependency confusions.", item, data.name, data.latest_release_id, - data.pypi_url + data.pypi_url, ) - result.append({'name': item, 'version': data.latest_release_id}) + result.append({"name": item, "version": data.latest_release_id}) return result def get_locally_installed_packages(encoding=None): packages = [] + unique_package_names = set() ignore = ["tests", "_tests", "egg", "EGG", "info"] for path in sys.path: for root, dirs, files in os.walk(path): @@ -218,36 +224,58 @@ def get_locally_installed_packages(encoding=None): item = os.path.join(root, item) with open(item, "r", encoding=encoding) as f: package = root.split(os.sep)[-1].split("-") + if package[0] not in unique_package_names: + try: + top_level_modules = f.read().strip().split("\n") + except: # NOQA + # TODO: What errors do we intend to suppress here? + continue + + # filter off explicitly ignored top-level modules + # such as test, egg, etc. + filtered_top_level_modules = list() + + for module in top_level_modules: + if (module not in ignore) and ( + package[0] not in ignore + ): + # append exported top level modules to the list + filtered_top_level_modules.append(module) + + version = None + if len(package) > 1: + version = ( + package[1].replace(".dist", "").replace(".egg", "") + ) + + # append package: top_level_modules pairs + # instead of top_level_module: package pairs + + unique_package_names.add(package[0]) + + packages.append( + { + "name": package[0], + "version": version, + "exports": filtered_top_level_modules, + } + ) + if "METADATA" in item: + item = os.path.join(root, item) + with open(item, "r", encoding=encoding) as f: try: - top_level_modules = f.read().strip().split("\n") - except: # NOQA - # TODO: What errors do we intend to suppress here? + data = dict(x.strip().split(":") for x in f) + name = data["Name"] + version = data["Version"] + + if name not in unique_package_names: + unique_package_names.add(name) + packages.append( + {"name": name, "version": version, "exports": []} + ) + except: continue - # filter off explicitly ignored top-level modules - # such as test, egg, etc. - filtered_top_level_modules = list() - - for module in top_level_modules: - if ( - (module not in ignore) and - (package[0] not in ignore) - ): - # append exported top level modules to the list - filtered_top_level_modules.append(module) - - version = None - if len(package) > 1: - version = package[1].replace( - ".dist", "").replace(".egg", "") - - # append package: top_level_modules pairs - # instead of top_level_module: package pairs - packages.append({ - 'name': package[0], - 'version': version, - 'exports': filtered_top_level_modules - }) return packages @@ -260,14 +288,14 @@ def get_import_local(imports, encoding=None): # if candidate import name matches export name # or candidate import name equals to the package name # append it to the result - if item in package['exports'] or item == package['name']: + if item in package["exports"] or item == package["name"]: result.append(package) # removing duplicates of package/version # had to use second method instead of the previous one, # because we have a list in the 'exports' field # https://stackoverflow.com/questions/9427163/remove-duplicate-dict-in-list-in-python - result_unique = [i for n, i in enumerate(result) if i not in result[n+1:]] + result_unique = [i for n, i in enumerate(result) if i not in result[n + 1 :]] return result_unique @@ -298,7 +326,7 @@ def get_name_without_alias(name): match = REGEXP[0].match(name.strip()) if match: name = match.groups(0)[0] - return name.partition(' as ')[0].partition('.')[0].strip() + return name.partition(" as ")[0].partition(".")[0].strip() def join(f): @@ -384,7 +412,8 @@ def diff(file_, imports): logging.info( "The following modules are in {} but do not seem to be imported: " - "{}".format(file_, ", ".join(x for x in modules_not_imported))) + "{}".format(file_, ", ".join(x for x in modules_not_imported)) + ) def clean(file_, imports): @@ -432,30 +461,38 @@ def dynamic_versioning(scheme, imports): def init(args): - encoding = args.get('--encoding') - extra_ignore_dirs = args.get('--ignore') - follow_links = not args.get('--no-follow-links') - input_path = args[''] + encoding = args.get("--encoding") + extra_ignore_dirs = args.get("--ignore") + follow_links = not args.get("--no-follow-links") + input_path = args[""] if input_path is None: input_path = os.path.abspath(os.curdir) if extra_ignore_dirs: - extra_ignore_dirs = extra_ignore_dirs.split(',') - - path = (args["--savepath"] if args["--savepath"] else - os.path.join(input_path, "requirements.txt")) - if (not args["--print"] - and not args["--savepath"] - and not args["--force"] - and os.path.exists(path)): - logging.warning("requirements.txt already exists, " - "use --force to overwrite it") + extra_ignore_dirs = extra_ignore_dirs.split(",") + + path = ( + args["--savepath"] + if args["--savepath"] + else os.path.join(input_path, "requirements.txt") + ) + if ( + not args["--print"] + and not args["--savepath"] + and not args["--force"] + and os.path.exists(path) + ): + logging.warning( + "requirements.txt already exists, " "use --force to overwrite it" + ) return - candidates = get_all_imports(input_path, - encoding=encoding, - extra_ignore_dirs=extra_ignore_dirs, - follow_links=follow_links) + candidates = get_all_imports( + input_path, + encoding=encoding, + extra_ignore_dirs=extra_ignore_dirs, + follow_links=follow_links, + ) candidates = get_pkg_names(candidates) logging.debug("Found imports: " + ", ".join(candidates)) pypi_server = "https://pypi.python.org/pypi/" @@ -464,11 +501,10 @@ def init(args): pypi_server = args["--pypi-server"] if args["--proxy"]: - proxy = {'http': args["--proxy"], 'https': args["--proxy"]} + proxy = {"http": args["--proxy"], "https": args["--proxy"]} if args["--use-local"]: - logging.debug( - "Getting package information ONLY from local installation.") + logging.debug("Getting package information ONLY from local installation.") imports = get_import_local(candidates, encoding=encoding) else: logging.debug("Getting packages information from Local/PyPI") @@ -478,20 +514,23 @@ def init(args): # the list of exported modules, installed locally # and the package name is not in the list of local module names # it add to difference - difference = [x for x in candidates if - # aggregate all export lists into one - # flatten the list - # check if candidate is in exports - x.lower() not in [y for x in local for y in x['exports']] - and - # check if candidate is package names - x.lower() not in [x['name'] for x in local]] - - imports = local + get_imports_info(difference, - proxy=proxy, - pypi_server=pypi_server) + difference = [ + x + for x in candidates + if + # aggregate all export lists into one + # flatten the list + # check if candidate is in exports + x.lower() not in [y for x in local for y in x["exports"]] and + # check if candidate is package names + x.lower() not in [x["name"] for x in local] + ] + + imports = local + get_imports_info( + difference, proxy=proxy, pypi_server=pypi_server + ) # sort imports based on lowercase name of package, similar to `pip freeze`. - imports = sorted(imports, key=lambda x: x['name'].lower()) + imports = sorted(imports, key=lambda x: x["name"].lower()) if args["--diff"]: diff(args["--diff"], imports) @@ -506,8 +545,10 @@ def init(args): if scheme in ["compat", "gt", "no-pin"]: imports, symbol = dynamic_versioning(scheme, imports) else: - raise ValueError("Invalid argument for mode flag, " - "use 'compat', 'gt' or 'no-pin' instead") + raise ValueError( + "Invalid argument for mode flag, " + "use 'compat', 'gt' or 'no-pin' instead" + ) else: symbol = "==" @@ -521,8 +562,8 @@ def init(args): def main(): # pragma: no cover args = docopt(__doc__, version=__version__) - log_level = logging.DEBUG if args['--debug'] else logging.INFO - logging.basicConfig(level=log_level, format='%(levelname)s: %(message)s') + log_level = logging.DEBUG if args["--debug"] else logging.INFO + logging.basicConfig(level=log_level, format="%(levelname)s: %(message)s") try: init(args) @@ -530,5 +571,5 @@ def main(): # pragma: no cover sys.exit(0) -if __name__ == '__main__': +if __name__ == "__main__": main() # pragma: no cover From 715e08b03bfafd578057d88c63aca622dad089f8 Mon Sep 17 00:00:00 2001 From: Bob Firestone Date: Mon, 31 Jul 2023 09:22:16 -0600 Subject: [PATCH 2/2] fix METADATA parsing --- pipreqs/pipreqs.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/pipreqs/pipreqs.py b/pipreqs/pipreqs.py index 1443b46..6b0dd18 100644 --- a/pipreqs/pipreqs.py +++ b/pipreqs/pipreqs.py @@ -262,13 +262,18 @@ def get_locally_installed_packages(encoding=None): ) if "METADATA" in item: item = os.path.join(root, item) - with open(item, "r", encoding=encoding) as f: + with open(item, "r", encoding=encoding) as file: try: - data = dict(x.strip().split(":") for x in f) + data = {} + for line in file: + if ":" in line: + key, value = line.split(":", 1) + data[key.strip()] = value.strip() + name = data["Name"] version = data["Version"] - if name not in unique_package_names: + if name and name not in unique_package_names: unique_package_names.add(name) packages.append( {"name": name, "version": version, "exports": []}