From 3b0d9389fc15d5cbaed10c36ed985761be8769f1 Mon Sep 17 00:00:00 2001 From: Daniel Altiparmak Date: Thu, 2 May 2013 09:49:02 +0200 Subject: [PATCH 1/5] changed keyword prefetch to stream due to compatibility problems with newer requests package --- crate/pypi/processor.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crate/pypi/processor.py b/crate/pypi/processor.py index 4d69b0b..b2615c2 100644 --- a/crate/pypi/processor.py +++ b/crate/pypi/processor.py @@ -397,7 +397,7 @@ def download(self): "If-Modified-Since": stored_file_data["modified"], } - resp = requests.get(file_data["file"], headers=headers, prefetch=True) + resp = requests.get(file_data["file"], headers=headers, stream=False) if resp.status_code == 304: logger.info("[DOWNLOAD] skipping %(filename)s because it has not been modified" % {"filename": release_file.filename}) @@ -482,13 +482,13 @@ def verify_and_sync_pages(self): if self.datastore.get(SERVERKEY_KEY): key = load_key(self.datastore.get(SERVERKEY_KEY)) else: - serverkey = requests.get(SERVERKEY_URL, prefetch=True) + serverkey = requests.get(SERVERKEY_URL, stream=False) key = load_key(serverkey.content) self.datastore.set(SERVERKEY_KEY, serverkey.content) try: # Download the "simple" page from PyPI for this package - simple = requests.get(urlparse.urljoin(SIMPLE_URL, urllib.quote(self.name.encode("utf-8"))), prefetch=True) + simple = requests.get(urlparse.urljoin(SIMPLE_URL, urllib.quote(self.name.encode("utf-8"))), stream=False) simple.raise_for_status() except requests.HTTPError: if simple.status_code == 404: @@ -500,7 +500,7 @@ def verify_and_sync_pages(self): try: # Download the "serversig" page from PyPI for this package - serversig = requests.get(urlparse.urljoin(SERVERSIG_URL, urllib.quote(self.name.encode("utf-8"))), prefetch=True) + serversig = requests.get(urlparse.urljoin(SERVERSIG_URL, urllib.quote(self.name.encode("utf-8"))), stream=False) serversig.raise_for_status() except requests.HTTPError: if serversig.status_code == 404: From 6d89988de8b12cf5341cd6f1a0677aa9f2858486 Mon Sep 17 00:00:00 2001 From: InQuant Date: Fri, 3 May 2013 19:22:51 +0300 Subject: [PATCH 2/5] changed to support new requests property stream, and removed obsolete property prefetch --- crate/pypi/processor.py | 651 +++++++++++----------------------------- 1 file changed, 172 insertions(+), 479 deletions(-) diff --git a/crate/pypi/processor.py b/crate/pypi/processor.py index b2615c2..92d2755 100644 --- a/crate/pypi/processor.py +++ b/crate/pypi/processor.py @@ -1,535 +1,228 @@ -import base64 +import collections +import datetime import hashlib import logging import re -import urllib -import urlparse +import socket +import time import xmlrpclib import redis import requests -import lxml.html + +from celery.task import task from django.conf import settings -from django.core.exceptions import ValidationError -from django.core.files.base import ContentFile from django.db import transaction -from django.utils.timezone import utc +from django.utils.timezone import now -from crate.web.history.models import Event -from crate.web.packages.models import Package, Release, TroveClassifier -from crate.web.packages.models import ReleaseRequire, ReleaseProvide, ReleaseObsolete, ReleaseURI, ReleaseFile -from crate.pypi.exceptions import PackageHashMismatch -from crate.pypi.models import PyPIMirrorPage, PyPIServerSigPage -from crate.pypi.utils.serversigs import load_key, verify +from crate.pypi.utils.lock import Lock +from crate.web.packages.models import Package, ReleaseFile, 
TroveClassifier, DownloadDelta +from crate.pypi.models import PyPIIndexPage, PyPIDownloadChange +from crate.pypi.processor import PyPIPackage logger = logging.getLogger(__name__) INDEX_URL = "http://pypi.python.org/pypi" -SIMPLE_URL = "http://pypi.python.org/simple/" -SERVERSIG_URL = "http://pypi.python.org/serversig/" -SERVERKEY_URL = "http://pypi.python.org/serverkey" +SERVERKEY_URL = "http://pypi.python.org/serverkey" SERVERKEY_KEY = "crate:pypi:serverkey" -_disutils2_version_capture = re.compile("^(.*?)(?:\(([^()]+)\))?$") -_md5_re = re.compile(r"(https?://pypi\.python\.org/packages/.+)#md5=([a-f0-9]+)") +CLASSIFIER_URL = "http://pypi.python.org/pypi?%3Aaction=list_classifiers" +PYPI_SINCE_KEY = "crate:pypi:since" -def get_helper(data, key, default=None): - if data.get(key) and data[key] != "UNKNOWN": - return data[key] - return "" if default is None else default +def process(name, version, timestamp, action, matches): + package = PyPIPackage(name, version) + package.process() -def split_meta(meta): - meta_split = meta.split(";", 1) - meta_name, meta_version = _disutils2_version_capture.search(meta_split[0].strip()).groups() - meta_env = meta_split[1].strip() if len(meta_split) == 2 else "" - return { - "name": meta_name, - "version": meta_version if meta_version is not None else "", - "environment": meta_env, - } +def remove(name, version, timestamp, action, matches): + package = PyPIPackage(name, version) + package.delete() -class PyPIPackage(object): +def remove_file(name, version, timestamp, action, matches): + package = PyPIPackage(name, version) + package.remove_files(*matches.groups()) - def __init__(self, name, version=None): - self.name = name - self.version = version - self.stored = False +@task +def bulk_process(name, version, timestamp, action, matches): + package = PyPIPackage(name) + package.process(bulk=True) - self.pypi = xmlrpclib.ServerProxy(INDEX_URL, use_datetime=True) - self.datastore = redis.StrictRedis(**dict([(x.lower(), y) for x, y in settings.REDIS[settings.PYPI_DATASTORE].items()])) - def process(self, bulk=False, download=True, skip_modified=True): - self.bulk = bulk - self.skip_modified = skip_modified +@task +def bulk_synchronize(): + pypi = xmlrpclib.ServerProxy(INDEX_URL) - self.fetch() - self.build() + names = set() - with transaction.commit_on_success(): - self.store() + for package in pypi.list_packages(): + names.add(package) + bulk_process.delay(package, None, None, None, None) - if download: - self.download() + for package in Package.objects.exclude(name__in=names): + package.delete() - def delete(self): - with transaction.commit_on_success(): - self.verify_and_sync_pages() - if self.version is None: - # Delete the entire package - packages = Package.objects.filter(name=self.name).select_for_update() - releases = Release.objects.filter(package__in=packages).select_for_update() +@task +def synchronize(since=None): + with Lock("synchronize", expires=60 * 5, timeout=30): + datastore = redis.StrictRedis(**dict([(x.lower(), y) for x, y in settings.REDIS[settings.PYPI_DATASTORE].items()])) - for package in packages: - package.delete() - else: - # Delete only this release - try: - package = Package.objects.get(name=self.name) - except Package.DoesNotExist: - return + if since is None: + s = datastore.get(PYPI_SINCE_KEY) + if s is not None: + since = int(float(s)) - 30 - releases = Release.objects.filter(package=package, version=self.version).select_for_update() + current = time.mktime(datetime.datetime.utcnow().timetuple()) - for release in releases: - 
release.hidden = True - release.save() + pypi = xmlrpclib.ServerProxy(INDEX_URL) - def remove_files(self, *files): - self.verify_and_sync_pages() + headers = datastore.hgetall(SERVERKEY_KEY + ":headers") + sig = requests.get(SERVERKEY_URL, headers=headers, stream=False) - packages = Package.objects.filter(name=self.name) - releases = Release.objects.filter(package__in=packages) + f not sig.status_code == 304: + sig.raise_for_status() - for rf in ReleaseFile.objects.filter(release__in=releases, filename__in=files): - rf.hidden = True - rf.save() + if sig.content != datastore.get(SERVERKEY_KEY): + logger.error("Key Rollover Detected") + pypi_key_rollover.delay() + datastore.set(SERVERKEY_KEY, sig.content) - def fetch(self): - logger.debug("[FETCH] %s%s" % (self.name, " %s" % self.version if self.version else "")) + datastore.hmset(SERVERKEY_KEY + ":headers", {"If-Modified-Since": sig.headers["Last-Modified"]}) - # Fetch meta data for this release - self.releases = self.get_releases() - self.release_data = self.get_release_data() - self.release_url_data = self.get_release_urls() + if since is None: # @@@ Should we do this for more than just initial? + bulk_synchronize.delay() + else: + logger.info("[SYNCING] Changes since %s" % since) + changes = pypi.changelog(since) + + for name, version, timestamp, action in changes: + line_hash = hashlib.sha256(u":".join([unicode(x) for x in (name, version, timestamp, action)]).encode("utf-8")).hexdigest() + logdata = {"action": action, "name": name, "version": version, "timestamp": timestamp, "hash": line_hash} + + if not datastore.exists("crate:pypi:changelog:%s" % line_hash): + logger.debug("[PROCESS] %(name)s %(version)s %(timestamp)s %(action)s" % logdata) + logger.debug("[HASH] %(name)s %(version)s %(hash)s" % logdata) + + dispatch = collections.OrderedDict([ + (re.compile("^create$"), process), + (re.compile("^new release$"), process), + (re.compile("^add [\w\d\.]+ file .+$"), process), + (re.compile("^remove$"), remove), + (re.compile("^remove file (.+)$"), remove_file), + (re.compile("^update [\w]+(, [\w]+)*$"), process), + #(re.compile("^docupdate$"), docupdate), # @@@ Do Something + #(re.compile("^add (Owner|Maintainer) .+$"), add_user_role), # @@@ Do Something + #(re.compile("^remove (Owner|Maintainer) .+$"), remove_user_role), # @@@ Do Something + ]) + + # Dispatch Based on the action + for pattern, func in dispatch.iteritems(): + matches = pattern.search(action) + if matches is not None: + func(name, version, timestamp, action, matches) + break + else: + logger.warn("[UNHANDLED] %(name)s %(version)s %(timestamp)s %(action)s" % logdata) - def build(self): - logger.debug("[BUILD] %s%s" % (self.name, " %s" % self.version if self.version else "")) + datastore.setex("crate:pypi:changelog:%s" % line_hash, 2629743, datetime.datetime.utcnow().isoformat()) + else: + logger.debug("[SKIP] %(name)s %(version)s %(timestamp)s %(action)s" % logdata) + logger.debug("[HASH] %(name)s %(version)s %(hash)s" % logdata) - # Check to Make sure fetch has been ran - if not hasattr(self, "releases") or not hasattr(self, "release_data") or not hasattr(self, "release_url_data"): - raise Exception("fetch must be called prior to running build") # @@@ Make a Custom Exception + datastore.set(PYPI_SINCE_KEY, current) - # Construct our representation of the releases - self.data = {} - for release in self.releases: - data = {} - data["package"] = self.name - data["version"] = release +@task +def synchronize_troves(): + resp = requests.get(CLASSIFIER_URL) + 
resp.raise_for_status() - data["author"] = get_helper(self.release_data[release], "author") - data["author_email"] = get_helper(self.release_data[release], "author_email") + current_troves = set(TroveClassifier.objects.all().values_list("trove", flat=True)) + new_troves = set([x.strip() for x in resp.content.splitlines()]) - current_troves - data["maintainer"] = get_helper(self.release_data[release], "maintainer") - data["maintainer_email"] = get_helper(self.release_data[release], "maintainer_email") + with transaction.commit_on_success(): + for classifier in new_troves: + TroveClassifier.objects.get_or_create(trove=classifier) - data["summary"] = get_helper(self.release_data[release], "summary") - data["description"] = get_helper(self.release_data[release], "description") - data["license"] = get_helper(self.release_data[release], "license") - data["keywords"] = get_helper(self.release_data[release], "keywords") # @@@ Switch This to a List - data["platform"] = get_helper(self.release_data[release], "platform") - data["download_uri"] = get_helper(self.release_data[release], "download_url") # @@@ Should This Go Under URI? - data["requires_python"] = get_helper(self.release_data[release], "required_python") +@task +def synchronize_downloads(): + for package in Package.objects.all().order_by("downloads_synced_on").prefetch_related("releases", "releases__files")[:150]: + Package.objects.filter(pk=package.pk).update(downloads_synced_on=now()) - data["stable_version"] = get_helper(self.release_data[release], "stable_version") # @@@ What Is This? + for release in package.releases.all(): + update_download_counts.delay(package.name, release.version, dict([(x.filename, x.pk) for x in release.files.all()])) - data["classifiers"] = get_helper(self.release_data[release], "classifiers", []) - # Construct the URIs - data["uris"] = {} - - if get_helper(self.release_data[release], "home_page"): - data["uris"]["Home Page"] = get_helper(self.release_data[release], "home_page") +@task +def update_download_counts(package_name, version, files, index=None): + try: + pypi = xmlrpclib.ServerProxy(INDEX_URL) - if get_helper(self.release_data[release], "bugtrack_url"): - data["uris"]["Bug Tracker"] = get_helper(self.release_data[release], "bugtrack_url") - - for label, url in [x.split(",", 1) for x in get_helper(self.release_data[release], "project_url", [])]: - data["uris"][label] = url - - # Construct Requires - data["requires"] = [] - - for kind in ["requires", "requires_dist", "requires_external"]: - for require in get_helper(self.release_data[release], kind, []): - req = {"kind": kind if kind is not "requires_external" else "external"} - req.update(split_meta(require)) - data["requires"].append(req) - - # Construct Provides - data["provides"] = [] - - for kind in ["provides", "provides_dist"]: - for provides in get_helper(self.release_data[release], kind, []): - req = {"kind": kind} - req.update(split_meta(provides)) - data["provides"].append(req) - - # Construct Obsoletes - data["obsoletes"] = [] - - for kind in ["obsoletes", "obsoletes_dist"]: - for provides in get_helper(self.release_data[release], kind, []): - req = {"kind": kind} - req.update(split_meta(provides)) - data["obsoletes"].append(req) - - # Construct Files - data["files"] = [] + downloads = pypi.release_downloads(package_name, version) - for url_data in self.release_url_data[release]: - data["files"].append({ - "comment": get_helper(url_data, "comment_text"), - "downloads": get_helper(url_data, "downloads", 0), - "file": get_helper(url_data, 
"url"), - "filename": get_helper(url_data, "filename"), - "python_version": get_helper(url_data, "python_version"), - "type": get_helper(url_data, "packagetype"), - "digests": { - "md5": url_data["md5_digest"].lower(), - } - }) - if url_data.get("upload_time"): - data["files"][-1]["created"] = url_data["upload_time"].replace(tzinfo=utc) - - for file_data in data["files"]: - if file_data.get("created"): - if data.get("created"): - if file_data["created"] < data["created"]: - data["created"] = file_data["created"] - else: - data["created"] = file_data["created"] - - self.data[release] = data - - logger.debug("[RELEASE BUILD DATA] %s %s %s" % (self.name, release, data)) - - def store(self): - package, _ = Package.objects.get_or_create(name=self.name) - - for data in self.data.values(): - try: - release = Release.objects.get(package=package, version=data["version"]) - except Release.DoesNotExist: - release = Release(package=package, version=data["version"]) - release.full_clean() - release.save() - - # This is an extra database call but it should prevent ShareLocks - Release.objects.filter(pk=release.pk).select_for_update() - - if release.hidden: - release.hidden = False - - for key, value in data.iteritems(): - if key in ["package", "version"]: - # Short circuit package and version - continue - - if key == "uris": - ReleaseURI.objects.filter(release=release).delete() - for label, uri in value.iteritems(): - try: - ReleaseURI.objects.get(release=release, label=label, uri=uri) - except ReleaseURI.DoesNotExist: - try: - release_uri = ReleaseURI(release=release, label=label, uri=uri) - release_uri.full_clean() - release_uri.save(force_insert=True) - except ValidationError: - logger.exception("%s, %s for %s-%s Invalid Data" % (label, uri, release.package.name, release.version)) - elif key == "classifiers": - release.classifiers.clear() - for classifier in value: - try: - trove = TroveClassifier.objects.get(trove=classifier) - except TroveClassifier.DoesNotExist: - trove = TroveClassifier(trove=classifier) - trove.full_clean() - trove.save(force_insert=True) - release.classifiers.add(trove) - elif key in ["requires", "provides", "obsoletes"]: - model = {"requires": ReleaseRequire, "provides": ReleaseProvide, "obsoletes": ReleaseObsolete}.get(key) - model.objects.filter(release=release).delete() - for item in value: - try: - model.objects.get(release=release, **item) - except model.DoesNotExist: - m = model(release=release, **item) - m.full_clean() - m.save(force_insert=True) - elif key == "files": - files = ReleaseFile.objects.filter(release=release) - filenames = dict([(x.filename, x) for x in files]) - - for f in value: - try: - rf = ReleaseFile.objects.get( - release=release, - type=f["type"], - filename=f["filename"], - python_version=f["python_version"], - ) - - for k, v in f.iteritems(): - if k in ["digests", "file", "filename", "type", "python_version"]: - continue - setattr(rf, k, v) - - rf.hidden = False - rf.full_clean() - rf.save() - - except ReleaseFile.DoesNotExist: - rf = ReleaseFile( - release=release, - type=f["type"], - filename=f["filename"], - python_version=f["python_version"], - **dict([(k, v) for k, v in f.iteritems() if k not in ["digests", "file", "filename", "type", "python_version"]]) - ) - - rf.hidden = False - rf.full_clean() - rf.save() - - if f["filename"] in filenames.keys(): - del filenames[f["filename"]] - - if filenames: - for rf in ReleaseFile.objects.filter(pk__in=[f.pk for f in filenames.values()]): - rf.hidden = True - rf.save() - else: - setattr(release, key, 
value) - - while True: - try: - release.full_clean() - except ValidationError as e: - if "download_uri" in e.message_dict: - release.download_uri = "" - logger.exception("%s-%s Release Validation Error %s" % (release.package.name, release.version, str(e.message_dict))) - else: - raise - else: - break - release.save() - - # Mark unsynced as deleted when bulk processing - if self.bulk: - for release in Release.objects.filter(package=package).exclude(version__in=self.data.keys()): - release.hidden = True - release.save() - - self.stored = True - - def download(self): - # Check to Make sure fetch has been ran - if not hasattr(self, "releases") or not hasattr(self, "release_data") or not hasattr(self, "release_url_data"): - raise Exception("fetch and build must be called prior to running download") # @@@ Make a Custom Exception - - # Check to Make sure build has been ran - if not hasattr(self, "data"): - raise Exception("build must be called prior to running download") # @@@ Make a Custom Exception - - if not self.stored: - raise Exception("package must be stored prior to downloading") # @@@ Make a Custom Exception - - pypi_pages = self.verify_and_sync_pages() - - for data in self.data.values(): - try: - # if pypi_pages.get("has_sig"): - # simple_html = lxml.html.fromstring(pypi_pages["simple"]) - # simple_html.make_links_absolute(urlparse.urljoin(SIMPLE_URL, data["package"]) + "/") - - # verified_md5_hashes = {} - - # for link in simple_html.iterlinks(): - # m = _md5_re.search(link[2]) - # if m: - # url, md5_hash = m.groups() - # verified_md5_hashes[url] = md5_hash - - package = Package.objects.get(name=data["package"]) - release = Release.objects.filter(package=package, version=data["version"]).select_for_update() - - for release_file in ReleaseFile.objects.filter(release=release, filename__in=[x["filename"] for x in data["files"]]).select_for_update(): - file_data = [x for x in data["files"] if x["filename"] == release_file.filename][0] - - datastore_key = "crate:pypi:download:%(url)s" % {"url": file_data["file"]} - stored_file_data = self.datastore.hgetall(datastore_key) - - headers = None - - if stored_file_data and self.skip_modified: - # Stored data exists for this file - if release_file.file: - try: - release_file.file.read() - except IOError: - pass - else: - # We already have a file - if stored_file_data["md5"].lower() == file_data["digests"]["md5"].lower(): - # The supposed MD5 from PyPI matches our local - headers = { - "If-Modified-Since": stored_file_data["modified"], - } - - resp = requests.get(file_data["file"], headers=headers, stream=False) - - if resp.status_code == 304: - logger.info("[DOWNLOAD] skipping %(filename)s because it has not been modified" % {"filename": release_file.filename}) - return - logger.info("[DOWNLOAD] downloading %(filename)s" % {"filename": release_file.filename}) - - resp.raise_for_status() - - # Make sure the MD5 of the file we receive matched what we were told it is - if hashlib.md5(resp.content).hexdigest().lower() != file_data["digests"]["md5"].lower(): - raise PackageHashMismatch("%s does not match %s for %s %s" % ( - hashlib.md5(resp.content).hexdigest().lower(), - file_data["digests"]["md5"].lower(), - file_data["type"], - file_data["filename"], - )) - - release_file.digest = "$".join(["sha256", hashlib.sha256(resp.content).hexdigest().lower()]) - - release_file.full_clean() - release_file.file.save(file_data["filename"], ContentFile(resp.content), save=False) - release_file.save() - - Event.objects.create( - 
package=release_file.release.package.name, - version=release_file.release.version, - action=Event.ACTIONS.file_add, - data={ - "filename": release_file.filename, - "digest": release_file.digest, - "uri": release_file.get_absolute_url(), - } - ) - - # Store data relating to this file (if modified etc) - stored_file_data = { - "md5": file_data["digests"]["md5"].lower(), - "modified": resp.headers.get("Last-Modified"), - } - - if resp.headers.get("Last-Modified"): - self.datastore.hmset(datastore_key, { - "md5": file_data["digests"]["md5"].lower(), - "modified": resp.headers["Last-Modified"], - }) - # Set a year expire on the key so that stale entries disappear - self.datastore.expire(datastore_key, 31556926) - else: - self.datastore.delete(datastore_key) - except requests.HTTPError: - logger.exception("[DOWNLOAD ERROR]") + for filename, download_count in downloads: + if filename in files: + with transaction.commit_on_success(): + for releasefile in ReleaseFile.objects.filter(pk=files[filename]).select_for_update(): + old = releasefile.downloads + releasefile.downloads = download_count + releasefile.save() - def get_releases(self): - if self.version is None: - releases = self.pypi.package_releases(self.name, True) - else: - releases = [self.version] - - logger.debug("[RELEASES] %s%s [%s]" % (self.name, " %s" % self.version if self.version else "", ", ".join(releases))) - - return releases - - def get_release_data(self): - release_data = [] - for release in self.releases: - data = self.pypi.release_data(self.name, release) - logger.debug("[RELEASE DATA] %s %s" % (self.name, release)) - release_data.append([release, data]) - return dict(release_data) - - def get_release_urls(self): - release_url_data = [] - for release in self.releases: - data = self.pypi.release_urls(self.name, release) - logger.info("[RELEASE URL] %s %s" % (self.name, release)) - logger.debug("[RELEASE URL DATA] %s %s %s" % (self.name, release, data)) - release_url_data.append([release, data]) - return dict(release_url_data) - - def verify_and_sync_pages(self): - # Get the Server Key for PyPI - if self.datastore.get(SERVERKEY_KEY): - key = load_key(self.datastore.get(SERVERKEY_KEY)) - else: - serverkey = requests.get(SERVERKEY_URL, stream=False) - key = load_key(serverkey.content) - self.datastore.set(SERVERKEY_KEY, serverkey.content) - - try: - # Download the "simple" page from PyPI for this package - simple = requests.get(urlparse.urljoin(SIMPLE_URL, urllib.quote(self.name.encode("utf-8"))), stream=False) - simple.raise_for_status() - except requests.HTTPError: - if simple.status_code == 404: - return {"has_sig": False} - raise - except ValueError: - logger.exception("Got a ValueError from downloading the Simple page") - return {"has_sig": False} - - try: - # Download the "serversig" page from PyPI for this package - serversig = requests.get(urlparse.urljoin(SERVERSIG_URL, urllib.quote(self.name.encode("utf-8"))), stream=False) - serversig.raise_for_status() - except requests.HTTPError: - if serversig.status_code == 404: - return {"has_sig": False} - raise - - try: - if not verify(key, simple.content, serversig.content): - raise Exception("Simple API page does not match serversig") # @@@ This Should be Custom Exception - except (UnicodeDecodeError, UnicodeEncodeError, ValueError, AssertionError): - logger.exception("Exception trying to verify %s" % self.name) # @@@ Figure out a better way to handle this - - try: - package = Package.objects.get(name=self.name) - except Package.DoesNotExist: - logger.exception("Error Trying 
To Verify %s (Querying Package)" % self.name) - return - - simple_mirror, c = PyPIMirrorPage.objects.get_or_create(package=package, defaults={"content": simple.content}) - if not c and simple_mirror.content != simple.content: - simple_mirror.content = simple.content - simple_mirror.save() - - serversig_mirror, c = PyPIServerSigPage.objects.get_or_create(package=package, defaults={"content": serversig.content.encode("base64")}) - serversig_mirror.content = base64.b64encode(serversig.content) - serversig_mirror.save() - - return { - "simple": simple.content, - "serversig": serversig.content, - "has_sig": True, - } + change = releasefile.downloads - old + if change: + PyPIDownloadChange.objects.create(file=releasefile, change=change) + except socket.error: + logger.exception("[DOWNLOAD SYNC] Network Error") + + +@task +def pypi_key_rollover(): + datastore = redis.StrictRedis(**dict([(x.lower(), y) for x, y in settings.REDIS[settings.PYPI_DATASTORE].items()])) + + sig = requests.get(SERVERKEY_URL, stream=False) + sig.raise_for_status() + + datastore.set(SERVERKEY_KEY, sig.content) + + for package in Package.objects.all(): + fetch_server_key.delay(package.name) + + +@task +def fetch_server_key(package): + p = PyPIPackage(package) + p.verify_and_sync_pages() + + +@task +def refresh_pypi_package_index_cache(): + r = requests.get("http://pypi.python.org/simple/", stream=False) + PyPIIndexPage.objects.create(content=r.content) + + +@task +def integrate_download_deltas(): + with Lock("pypi-integrate-downloads", expires=60 * 5, timeout=30): + count = 0 + + for d in PyPIDownloadChange.objects.filter(integrated=False)[:1000]: + with transaction.commit_on_success(): + dd, c = DownloadDelta.objects.get_or_create(file=d.file, date=d.created.date(), defaults={"delta": d.change}) + + if not c: + DownloadDelta.objects.filter(pk=dd.pk).select_for_update() + + dd.delta += d.change + dd.save() + + PyPIDownloadChange.objects.filter(pk=d.pk).update(integrated=True) + count += 1 + + return count From f976091f94fb3b18803201e5de547917c8402a9f Mon Sep 17 00:00:00 2001 From: InQuant Date: Fri, 3 May 2013 19:56:19 +0300 Subject: [PATCH 3/5] Typo --- crate/pypi/processor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crate/pypi/processor.py b/crate/pypi/processor.py index 92d2755..1aae6bc 100644 --- a/crate/pypi/processor.py +++ b/crate/pypi/processor.py @@ -85,7 +85,7 @@ def synchronize(since=None): headers = datastore.hgetall(SERVERKEY_KEY + ":headers") sig = requests.get(SERVERKEY_URL, headers=headers, stream=False) - f not sig.status_code == 304: + if not sig.status_code == 304: sig.raise_for_status() if sig.content != datastore.get(SERVERKEY_KEY): From 0ba9e28fba55aab42dece5b89a1f7926ce00fc98 Mon Sep 17 00:00:00 2001 From: InQuant Date: Fri, 3 May 2013 20:12:21 +0300 Subject: [PATCH 4/5] Update processor.py --- crate/pypi/processor.py | 651 +++++++++++++++++++++++++++++----------- 1 file changed, 479 insertions(+), 172 deletions(-) diff --git a/crate/pypi/processor.py b/crate/pypi/processor.py index 1aae6bc..b2615c2 100644 --- a/crate/pypi/processor.py +++ b/crate/pypi/processor.py @@ -1,228 +1,535 @@ -import collections -import datetime +import base64 import hashlib import logging import re -import socket -import time +import urllib +import urlparse import xmlrpclib import redis import requests - -from celery.task import task +import lxml.html from django.conf import settings +from django.core.exceptions import ValidationError +from django.core.files.base import ContentFile from django.db 
import transaction -from django.utils.timezone import now +from django.utils.timezone import utc -from crate.pypi.utils.lock import Lock -from crate.web.packages.models import Package, ReleaseFile, TroveClassifier, DownloadDelta -from crate.pypi.models import PyPIIndexPage, PyPIDownloadChange -from crate.pypi.processor import PyPIPackage +from crate.web.history.models import Event +from crate.web.packages.models import Package, Release, TroveClassifier +from crate.web.packages.models import ReleaseRequire, ReleaseProvide, ReleaseObsolete, ReleaseURI, ReleaseFile +from crate.pypi.exceptions import PackageHashMismatch +from crate.pypi.models import PyPIMirrorPage, PyPIServerSigPage +from crate.pypi.utils.serversigs import load_key, verify logger = logging.getLogger(__name__) INDEX_URL = "http://pypi.python.org/pypi" - +SIMPLE_URL = "http://pypi.python.org/simple/" +SERVERSIG_URL = "http://pypi.python.org/serversig/" SERVERKEY_URL = "http://pypi.python.org/serverkey" -SERVERKEY_KEY = "crate:pypi:serverkey" - -CLASSIFIER_URL = "http://pypi.python.org/pypi?%3Aaction=list_classifiers" - -PYPI_SINCE_KEY = "crate:pypi:since" - - -def process(name, version, timestamp, action, matches): - package = PyPIPackage(name, version) - package.process() - - -def remove(name, version, timestamp, action, matches): - package = PyPIPackage(name, version) - package.delete() - - -def remove_file(name, version, timestamp, action, matches): - package = PyPIPackage(name, version) - package.remove_files(*matches.groups()) - - -@task -def bulk_process(name, version, timestamp, action, matches): - package = PyPIPackage(name) - package.process(bulk=True) - - -@task -def bulk_synchronize(): - pypi = xmlrpclib.ServerProxy(INDEX_URL) - names = set() - - for package in pypi.list_packages(): - names.add(package) - bulk_process.delay(package, None, None, None, None) - - for package in Package.objects.exclude(name__in=names): - package.delete() +SERVERKEY_KEY = "crate:pypi:serverkey" +_disutils2_version_capture = re.compile("^(.*?)(?:\(([^()]+)\))?$") +_md5_re = re.compile(r"(https?://pypi\.python\.org/packages/.+)#md5=([a-f0-9]+)") -@task -def synchronize(since=None): - with Lock("synchronize", expires=60 * 5, timeout=30): - datastore = redis.StrictRedis(**dict([(x.lower(), y) for x, y in settings.REDIS[settings.PYPI_DATASTORE].items()])) - if since is None: - s = datastore.get(PYPI_SINCE_KEY) - if s is not None: - since = int(float(s)) - 30 +def get_helper(data, key, default=None): + if data.get(key) and data[key] != "UNKNOWN": + return data[key] + return "" if default is None else default - current = time.mktime(datetime.datetime.utcnow().timetuple()) - pypi = xmlrpclib.ServerProxy(INDEX_URL) +def split_meta(meta): + meta_split = meta.split(";", 1) + meta_name, meta_version = _disutils2_version_capture.search(meta_split[0].strip()).groups() + meta_env = meta_split[1].strip() if len(meta_split) == 2 else "" - headers = datastore.hgetall(SERVERKEY_KEY + ":headers") - sig = requests.get(SERVERKEY_URL, headers=headers, stream=False) + return { + "name": meta_name, + "version": meta_version if meta_version is not None else "", + "environment": meta_env, + } - if not sig.status_code == 304: - sig.raise_for_status() - if sig.content != datastore.get(SERVERKEY_KEY): - logger.error("Key Rollover Detected") - pypi_key_rollover.delay() - datastore.set(SERVERKEY_KEY, sig.content) +class PyPIPackage(object): - datastore.hmset(SERVERKEY_KEY + ":headers", {"If-Modified-Since": sig.headers["Last-Modified"]}) + def __init__(self, name, 
version=None): + self.name = name + self.version = version - if since is None: # @@@ Should we do this for more than just initial? - bulk_synchronize.delay() - else: - logger.info("[SYNCING] Changes since %s" % since) - changes = pypi.changelog(since) - - for name, version, timestamp, action in changes: - line_hash = hashlib.sha256(u":".join([unicode(x) for x in (name, version, timestamp, action)]).encode("utf-8")).hexdigest() - logdata = {"action": action, "name": name, "version": version, "timestamp": timestamp, "hash": line_hash} - - if not datastore.exists("crate:pypi:changelog:%s" % line_hash): - logger.debug("[PROCESS] %(name)s %(version)s %(timestamp)s %(action)s" % logdata) - logger.debug("[HASH] %(name)s %(version)s %(hash)s" % logdata) - - dispatch = collections.OrderedDict([ - (re.compile("^create$"), process), - (re.compile("^new release$"), process), - (re.compile("^add [\w\d\.]+ file .+$"), process), - (re.compile("^remove$"), remove), - (re.compile("^remove file (.+)$"), remove_file), - (re.compile("^update [\w]+(, [\w]+)*$"), process), - #(re.compile("^docupdate$"), docupdate), # @@@ Do Something - #(re.compile("^add (Owner|Maintainer) .+$"), add_user_role), # @@@ Do Something - #(re.compile("^remove (Owner|Maintainer) .+$"), remove_user_role), # @@@ Do Something - ]) - - # Dispatch Based on the action - for pattern, func in dispatch.iteritems(): - matches = pattern.search(action) - if matches is not None: - func(name, version, timestamp, action, matches) - break - else: - logger.warn("[UNHANDLED] %(name)s %(version)s %(timestamp)s %(action)s" % logdata) - - datastore.setex("crate:pypi:changelog:%s" % line_hash, 2629743, datetime.datetime.utcnow().isoformat()) - else: - logger.debug("[SKIP] %(name)s %(version)s %(timestamp)s %(action)s" % logdata) - logger.debug("[HASH] %(name)s %(version)s %(hash)s" % logdata) + self.stored = False - datastore.set(PYPI_SINCE_KEY, current) + self.pypi = xmlrpclib.ServerProxy(INDEX_URL, use_datetime=True) + self.datastore = redis.StrictRedis(**dict([(x.lower(), y) for x, y in settings.REDIS[settings.PYPI_DATASTORE].items()])) + def process(self, bulk=False, download=True, skip_modified=True): + self.bulk = bulk + self.skip_modified = skip_modified -@task -def synchronize_troves(): - resp = requests.get(CLASSIFIER_URL) - resp.raise_for_status() + self.fetch() + self.build() - current_troves = set(TroveClassifier.objects.all().values_list("trove", flat=True)) - new_troves = set([x.strip() for x in resp.content.splitlines()]) - current_troves + with transaction.commit_on_success(): + self.store() - with transaction.commit_on_success(): - for classifier in new_troves: - TroveClassifier.objects.get_or_create(trove=classifier) + if download: + self.download() + def delete(self): + with transaction.commit_on_success(): + self.verify_and_sync_pages() -@task -def synchronize_downloads(): - for package in Package.objects.all().order_by("downloads_synced_on").prefetch_related("releases", "releases__files")[:150]: - Package.objects.filter(pk=package.pk).update(downloads_synced_on=now()) + if self.version is None: + # Delete the entire package + packages = Package.objects.filter(name=self.name).select_for_update() + releases = Release.objects.filter(package__in=packages).select_for_update() - for release in package.releases.all(): - update_download_counts.delay(package.name, release.version, dict([(x.filename, x.pk) for x in release.files.all()])) + for package in packages: + package.delete() + else: + # Delete only this release + try: + package = 
Package.objects.get(name=self.name) + except Package.DoesNotExist: + return + releases = Release.objects.filter(package=package, version=self.version).select_for_update() -@task -def update_download_counts(package_name, version, files, index=None): - try: - pypi = xmlrpclib.ServerProxy(INDEX_URL) + for release in releases: + release.hidden = True + release.save() - downloads = pypi.release_downloads(package_name, version) + def remove_files(self, *files): + self.verify_and_sync_pages() - for filename, download_count in downloads: - if filename in files: - with transaction.commit_on_success(): - for releasefile in ReleaseFile.objects.filter(pk=files[filename]).select_for_update(): - old = releasefile.downloads - releasefile.downloads = download_count - releasefile.save() + packages = Package.objects.filter(name=self.name) + releases = Release.objects.filter(package__in=packages) - change = releasefile.downloads - old - if change: - PyPIDownloadChange.objects.create(file=releasefile, change=change) - except socket.error: - logger.exception("[DOWNLOAD SYNC] Network Error") + for rf in ReleaseFile.objects.filter(release__in=releases, filename__in=files): + rf.hidden = True + rf.save() + def fetch(self): + logger.debug("[FETCH] %s%s" % (self.name, " %s" % self.version if self.version else "")) -@task -def pypi_key_rollover(): - datastore = redis.StrictRedis(**dict([(x.lower(), y) for x, y in settings.REDIS[settings.PYPI_DATASTORE].items()])) + # Fetch meta data for this release + self.releases = self.get_releases() + self.release_data = self.get_release_data() + self.release_url_data = self.get_release_urls() - sig = requests.get(SERVERKEY_URL, stream=False) - sig.raise_for_status() + def build(self): + logger.debug("[BUILD] %s%s" % (self.name, " %s" % self.version if self.version else "")) - datastore.set(SERVERKEY_KEY, sig.content) + # Check to Make sure fetch has been ran + if not hasattr(self, "releases") or not hasattr(self, "release_data") or not hasattr(self, "release_url_data"): + raise Exception("fetch must be called prior to running build") # @@@ Make a Custom Exception - for package in Package.objects.all(): - fetch_server_key.delay(package.name) + # Construct our representation of the releases + self.data = {} + for release in self.releases: + data = {} + data["package"] = self.name + data["version"] = release -@task -def fetch_server_key(package): - p = PyPIPackage(package) - p.verify_and_sync_pages() + data["author"] = get_helper(self.release_data[release], "author") + data["author_email"] = get_helper(self.release_data[release], "author_email") + data["maintainer"] = get_helper(self.release_data[release], "maintainer") + data["maintainer_email"] = get_helper(self.release_data[release], "maintainer_email") -@task -def refresh_pypi_package_index_cache(): - r = requests.get("http://pypi.python.org/simple/", stream=False) - PyPIIndexPage.objects.create(content=r.content) + data["summary"] = get_helper(self.release_data[release], "summary") + data["description"] = get_helper(self.release_data[release], "description") + data["license"] = get_helper(self.release_data[release], "license") + data["keywords"] = get_helper(self.release_data[release], "keywords") # @@@ Switch This to a List + data["platform"] = get_helper(self.release_data[release], "platform") + data["download_uri"] = get_helper(self.release_data[release], "download_url") # @@@ Should This Go Under URI? 
+ data["requires_python"] = get_helper(self.release_data[release], "required_python") -@task -def integrate_download_deltas(): - with Lock("pypi-integrate-downloads", expires=60 * 5, timeout=30): - count = 0 + data["stable_version"] = get_helper(self.release_data[release], "stable_version") # @@@ What Is This? - for d in PyPIDownloadChange.objects.filter(integrated=False)[:1000]: - with transaction.commit_on_success(): - dd, c = DownloadDelta.objects.get_or_create(file=d.file, date=d.created.date(), defaults={"delta": d.change}) + data["classifiers"] = get_helper(self.release_data[release], "classifiers", []) - if not c: - DownloadDelta.objects.filter(pk=dd.pk).select_for_update() + # Construct the URIs + data["uris"] = {} + + if get_helper(self.release_data[release], "home_page"): + data["uris"]["Home Page"] = get_helper(self.release_data[release], "home_page") - dd.delta += d.change - dd.save() + if get_helper(self.release_data[release], "bugtrack_url"): + data["uris"]["Bug Tracker"] = get_helper(self.release_data[release], "bugtrack_url") + + for label, url in [x.split(",", 1) for x in get_helper(self.release_data[release], "project_url", [])]: + data["uris"][label] = url + + # Construct Requires + data["requires"] = [] + + for kind in ["requires", "requires_dist", "requires_external"]: + for require in get_helper(self.release_data[release], kind, []): + req = {"kind": kind if kind is not "requires_external" else "external"} + req.update(split_meta(require)) + data["requires"].append(req) + + # Construct Provides + data["provides"] = [] + + for kind in ["provides", "provides_dist"]: + for provides in get_helper(self.release_data[release], kind, []): + req = {"kind": kind} + req.update(split_meta(provides)) + data["provides"].append(req) + + # Construct Obsoletes + data["obsoletes"] = [] + + for kind in ["obsoletes", "obsoletes_dist"]: + for provides in get_helper(self.release_data[release], kind, []): + req = {"kind": kind} + req.update(split_meta(provides)) + data["obsoletes"].append(req) + + # Construct Files + data["files"] = [] - PyPIDownloadChange.objects.filter(pk=d.pk).update(integrated=True) - count += 1 + for url_data in self.release_url_data[release]: + data["files"].append({ + "comment": get_helper(url_data, "comment_text"), + "downloads": get_helper(url_data, "downloads", 0), + "file": get_helper(url_data, "url"), + "filename": get_helper(url_data, "filename"), + "python_version": get_helper(url_data, "python_version"), + "type": get_helper(url_data, "packagetype"), + "digests": { + "md5": url_data["md5_digest"].lower(), + } + }) + if url_data.get("upload_time"): + data["files"][-1]["created"] = url_data["upload_time"].replace(tzinfo=utc) + + for file_data in data["files"]: + if file_data.get("created"): + if data.get("created"): + if file_data["created"] < data["created"]: + data["created"] = file_data["created"] + else: + data["created"] = file_data["created"] + + self.data[release] = data + + logger.debug("[RELEASE BUILD DATA] %s %s %s" % (self.name, release, data)) + + def store(self): + package, _ = Package.objects.get_or_create(name=self.name) + + for data in self.data.values(): + try: + release = Release.objects.get(package=package, version=data["version"]) + except Release.DoesNotExist: + release = Release(package=package, version=data["version"]) + release.full_clean() + release.save() + + # This is an extra database call but it should prevent ShareLocks + Release.objects.filter(pk=release.pk).select_for_update() + + if release.hidden: + release.hidden = False + + 
for key, value in data.iteritems(): + if key in ["package", "version"]: + # Short circuit package and version + continue + + if key == "uris": + ReleaseURI.objects.filter(release=release).delete() + for label, uri in value.iteritems(): + try: + ReleaseURI.objects.get(release=release, label=label, uri=uri) + except ReleaseURI.DoesNotExist: + try: + release_uri = ReleaseURI(release=release, label=label, uri=uri) + release_uri.full_clean() + release_uri.save(force_insert=True) + except ValidationError: + logger.exception("%s, %s for %s-%s Invalid Data" % (label, uri, release.package.name, release.version)) + elif key == "classifiers": + release.classifiers.clear() + for classifier in value: + try: + trove = TroveClassifier.objects.get(trove=classifier) + except TroveClassifier.DoesNotExist: + trove = TroveClassifier(trove=classifier) + trove.full_clean() + trove.save(force_insert=True) + release.classifiers.add(trove) + elif key in ["requires", "provides", "obsoletes"]: + model = {"requires": ReleaseRequire, "provides": ReleaseProvide, "obsoletes": ReleaseObsolete}.get(key) + model.objects.filter(release=release).delete() + for item in value: + try: + model.objects.get(release=release, **item) + except model.DoesNotExist: + m = model(release=release, **item) + m.full_clean() + m.save(force_insert=True) + elif key == "files": + files = ReleaseFile.objects.filter(release=release) + filenames = dict([(x.filename, x) for x in files]) + + for f in value: + try: + rf = ReleaseFile.objects.get( + release=release, + type=f["type"], + filename=f["filename"], + python_version=f["python_version"], + ) + + for k, v in f.iteritems(): + if k in ["digests", "file", "filename", "type", "python_version"]: + continue + setattr(rf, k, v) + + rf.hidden = False + rf.full_clean() + rf.save() + + except ReleaseFile.DoesNotExist: + rf = ReleaseFile( + release=release, + type=f["type"], + filename=f["filename"], + python_version=f["python_version"], + **dict([(k, v) for k, v in f.iteritems() if k not in ["digests", "file", "filename", "type", "python_version"]]) + ) + + rf.hidden = False + rf.full_clean() + rf.save() + + if f["filename"] in filenames.keys(): + del filenames[f["filename"]] + + if filenames: + for rf in ReleaseFile.objects.filter(pk__in=[f.pk for f in filenames.values()]): + rf.hidden = True + rf.save() + else: + setattr(release, key, value) + + while True: + try: + release.full_clean() + except ValidationError as e: + if "download_uri" in e.message_dict: + release.download_uri = "" + logger.exception("%s-%s Release Validation Error %s" % (release.package.name, release.version, str(e.message_dict))) + else: + raise + else: + break + release.save() + + # Mark unsynced as deleted when bulk processing + if self.bulk: + for release in Release.objects.filter(package=package).exclude(version__in=self.data.keys()): + release.hidden = True + release.save() + + self.stored = True + + def download(self): + # Check to Make sure fetch has been ran + if not hasattr(self, "releases") or not hasattr(self, "release_data") or not hasattr(self, "release_url_data"): + raise Exception("fetch and build must be called prior to running download") # @@@ Make a Custom Exception + + # Check to Make sure build has been ran + if not hasattr(self, "data"): + raise Exception("build must be called prior to running download") # @@@ Make a Custom Exception + + if not self.stored: + raise Exception("package must be stored prior to downloading") # @@@ Make a Custom Exception + + pypi_pages = self.verify_and_sync_pages() + + for data in 
self.data.values(): + try: + # if pypi_pages.get("has_sig"): + # simple_html = lxml.html.fromstring(pypi_pages["simple"]) + # simple_html.make_links_absolute(urlparse.urljoin(SIMPLE_URL, data["package"]) + "/") + + # verified_md5_hashes = {} + + # for link in simple_html.iterlinks(): + # m = _md5_re.search(link[2]) + # if m: + # url, md5_hash = m.groups() + # verified_md5_hashes[url] = md5_hash + + package = Package.objects.get(name=data["package"]) + release = Release.objects.filter(package=package, version=data["version"]).select_for_update() + + for release_file in ReleaseFile.objects.filter(release=release, filename__in=[x["filename"] for x in data["files"]]).select_for_update(): + file_data = [x for x in data["files"] if x["filename"] == release_file.filename][0] + + datastore_key = "crate:pypi:download:%(url)s" % {"url": file_data["file"]} + stored_file_data = self.datastore.hgetall(datastore_key) + + headers = None + + if stored_file_data and self.skip_modified: + # Stored data exists for this file + if release_file.file: + try: + release_file.file.read() + except IOError: + pass + else: + # We already have a file + if stored_file_data["md5"].lower() == file_data["digests"]["md5"].lower(): + # The supposed MD5 from PyPI matches our local + headers = { + "If-Modified-Since": stored_file_data["modified"], + } + + resp = requests.get(file_data["file"], headers=headers, stream=False) + + if resp.status_code == 304: + logger.info("[DOWNLOAD] skipping %(filename)s because it has not been modified" % {"filename": release_file.filename}) + return + logger.info("[DOWNLOAD] downloading %(filename)s" % {"filename": release_file.filename}) + + resp.raise_for_status() + + # Make sure the MD5 of the file we receive matched what we were told it is + if hashlib.md5(resp.content).hexdigest().lower() != file_data["digests"]["md5"].lower(): + raise PackageHashMismatch("%s does not match %s for %s %s" % ( + hashlib.md5(resp.content).hexdigest().lower(), + file_data["digests"]["md5"].lower(), + file_data["type"], + file_data["filename"], + )) + + release_file.digest = "$".join(["sha256", hashlib.sha256(resp.content).hexdigest().lower()]) + + release_file.full_clean() + release_file.file.save(file_data["filename"], ContentFile(resp.content), save=False) + release_file.save() + + Event.objects.create( + package=release_file.release.package.name, + version=release_file.release.version, + action=Event.ACTIONS.file_add, + data={ + "filename": release_file.filename, + "digest": release_file.digest, + "uri": release_file.get_absolute_url(), + } + ) + + # Store data relating to this file (if modified etc) + stored_file_data = { + "md5": file_data["digests"]["md5"].lower(), + "modified": resp.headers.get("Last-Modified"), + } + + if resp.headers.get("Last-Modified"): + self.datastore.hmset(datastore_key, { + "md5": file_data["digests"]["md5"].lower(), + "modified": resp.headers["Last-Modified"], + }) + # Set a year expire on the key so that stale entries disappear + self.datastore.expire(datastore_key, 31556926) + else: + self.datastore.delete(datastore_key) + except requests.HTTPError: + logger.exception("[DOWNLOAD ERROR]") - return count + def get_releases(self): + if self.version is None: + releases = self.pypi.package_releases(self.name, True) + else: + releases = [self.version] + + logger.debug("[RELEASES] %s%s [%s]" % (self.name, " %s" % self.version if self.version else "", ", ".join(releases))) + + return releases + + def get_release_data(self): + release_data = [] + for release in self.releases: + data 
= self.pypi.release_data(self.name, release) + logger.debug("[RELEASE DATA] %s %s" % (self.name, release)) + release_data.append([release, data]) + return dict(release_data) + + def get_release_urls(self): + release_url_data = [] + for release in self.releases: + data = self.pypi.release_urls(self.name, release) + logger.info("[RELEASE URL] %s %s" % (self.name, release)) + logger.debug("[RELEASE URL DATA] %s %s %s" % (self.name, release, data)) + release_url_data.append([release, data]) + return dict(release_url_data) + + def verify_and_sync_pages(self): + # Get the Server Key for PyPI + if self.datastore.get(SERVERKEY_KEY): + key = load_key(self.datastore.get(SERVERKEY_KEY)) + else: + serverkey = requests.get(SERVERKEY_URL, stream=False) + key = load_key(serverkey.content) + self.datastore.set(SERVERKEY_KEY, serverkey.content) + + try: + # Download the "simple" page from PyPI for this package + simple = requests.get(urlparse.urljoin(SIMPLE_URL, urllib.quote(self.name.encode("utf-8"))), stream=False) + simple.raise_for_status() + except requests.HTTPError: + if simple.status_code == 404: + return {"has_sig": False} + raise + except ValueError: + logger.exception("Got a ValueError from downloading the Simple page") + return {"has_sig": False} + + try: + # Download the "serversig" page from PyPI for this package + serversig = requests.get(urlparse.urljoin(SERVERSIG_URL, urllib.quote(self.name.encode("utf-8"))), stream=False) + serversig.raise_for_status() + except requests.HTTPError: + if serversig.status_code == 404: + return {"has_sig": False} + raise + + try: + if not verify(key, simple.content, serversig.content): + raise Exception("Simple API page does not match serversig") # @@@ This Should be Custom Exception + except (UnicodeDecodeError, UnicodeEncodeError, ValueError, AssertionError): + logger.exception("Exception trying to verify %s" % self.name) # @@@ Figure out a better way to handle this + + try: + package = Package.objects.get(name=self.name) + except Package.DoesNotExist: + logger.exception("Error Trying To Verify %s (Querying Package)" % self.name) + return + + simple_mirror, c = PyPIMirrorPage.objects.get_or_create(package=package, defaults={"content": simple.content}) + if not c and simple_mirror.content != simple.content: + simple_mirror.content = simple.content + simple_mirror.save() + + serversig_mirror, c = PyPIServerSigPage.objects.get_or_create(package=package, defaults={"content": serversig.content.encode("base64")}) + serversig_mirror.content = base64.b64encode(serversig.content) + serversig_mirror.save() + + return { + "simple": simple.content, + "serversig": serversig.content, + "has_sig": True, + } From 99329c61352fbda9f14e81d26e51d155e65c8ded Mon Sep 17 00:00:00 2001 From: InQuant Date: Sun, 5 May 2013 21:20:38 +0200 Subject: [PATCH 5/5] refresh method also needs new requests property --- crate/pypi/tasks.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crate/pypi/tasks.py b/crate/pypi/tasks.py index 821a7c2..d3dd0c4 100644 --- a/crate/pypi/tasks.py +++ b/crate/pypi/tasks.py @@ -40,7 +40,7 @@ def process(name, version, timestamp, action, matches): def remove(name, version, timestamp, action, matches): package = PyPIPackage(name, version) - package.delete() + eackage.delete() def remove_file(name, version, timestamp, action, matches): @@ -203,7 +203,7 @@ def fetch_server_key(package): @task def refresh_pypi_package_index_cache(): - r = requests.get("http://pypi.python.org/simple/", prefetch=True) + r = 
requests.get("http://pypi.python.org/simple/", stream=False) PyPIIndexPage.objects.create(content=r.content)
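The prefetch keyword argument was removed in requests 1.0 and replaced by stream, whose meaning is inverted: the old prefetch=True (read the response body eagerly) corresponds to stream=False, which is also requests' default. A minimal sketch of the mapping, assuming requests >= 1.0; the serverkey URL is reused from the patches above:

    import requests

    # requests < 1.0 (old API): the body was downloaded eagerly with
    #   resp = requests.get(SERVERKEY_URL, prefetch=True)
    # requests >= 1.0 (new API): stream=False is the equivalent (and the default);
    # passing prefetch now fails with an unexpected keyword argument error.
    resp = requests.get("http://pypi.python.org/serverkey", stream=False)
    content = resp.content  # body already read into memory, as with prefetch=True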