From ef477a7904b7ec221718b3b460d74b9f8c5a1281 Mon Sep 17 00:00:00 2001 From: Marion Deveaud Date: Fri, 25 Sep 2020 14:10:40 +0200 Subject: [PATCH 1/3] Revert "feat(group): use header groupName wherever possible, handle 403" This reverts commit e8369867fd50997d0601503e5742a7f9356215d6. --- fossology/__init__.py | 30 ++------- fossology/folders.py | 17 ++--- fossology/jobs.py | 26 ++------ fossology/obj.py | 19 +++--- fossology/report.py | 31 +-------- fossology/uploads.py | 147 +++++++++--------------------------------- tests/test_folders.py | 9 --- tests/test_jobs.py | 22 +------ tests/test_report.py | 84 ++++++++++-------------- tests/test_search.py | 10 --- tests/test_uploads.py | 102 +++++------------------------ 11 files changed, 114 insertions(+), 383 deletions(-) diff --git a/fossology/__init__.py b/fossology/__init__.py index cf309a6..e37d73a 100644 --- a/fossology/__init__.py +++ b/fossology/__init__.py @@ -6,16 +6,12 @@ import requests from datetime import date, timedelta -from fossology.obj import Agents, User, TokenScope, SearchTypes, get_options -from fossology.folders import Folders -from fossology.uploads import Uploads -from fossology.jobs import Jobs -from fossology.report import Report -from fossology.exceptions import ( - AuthenticationError, - AuthorizationError, - FossologyApiError, -) +from .obj import Agents, User, TokenScope, SearchTypes +from .folders import Folders +from .uploads import Uploads +from .jobs import Jobs +from .report import Report +from .exceptions import AuthenticationError, FossologyApiError logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) @@ -212,7 +208,7 @@ def delete_user(self, user): :raises FossologyApiError: if the REST call failed """ response = self.session.delete(f"{self.api}/users/{user.id}") - + print(response.json()) if response.status_code == 202: return else: @@ -228,7 +224,6 @@ def search( filesizemax=None, license=None, copyright=None, - group=None, ): """Search for a specific file @@ -241,7 +236,6 @@ def search( :param filesizemax: Max filesize in bytes :param license: License search filter :param copyright: Copyright search filter - :param group: the group name to choose while performing search (default: None) :type searchType: SearchType Enum :type filename: string :type tag: string @@ -249,11 +243,9 @@ def search( :type filesizemax: int :type license: string :type copyright: string - :type group: string :return: list of items corresponding to the search criteria :rtype: JSON :raises FossologyApiError: if the REST call failed - :raises AuthorizationError: if the user can't access the group """ headers = {"searchType": searchType.value} if filename: @@ -268,18 +260,10 @@ def search( headers["license"] = license if copyright: headers["copyright"] = copyright - if group: - headers["groupName"] = group response = self.session.get(f"{self.api}/search", headers=headers) - if response.status_code == 200: return response.json() - - elif response.status_code == 403: - description = f"Searching {get_options(group)}not authorized" - raise AuthorizationError(description, response) - else: description = "Unable to get a result with the given search criteria" raise FossologyApiError(description, response) diff --git a/fossology/folders.py b/fossology/folders.py index e038b00..b892e8c 100644 --- a/fossology/folders.py +++ b/fossology/folders.py @@ -3,8 +3,8 @@ import logging -from fossology.obj import Folder, get_options -from fossology.exceptions import AuthorizationError, FossologyApiError +from .obj import Folder +from .exceptions import AuthorizationError, FossologyApiError logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) @@ -58,7 +58,7 @@ def detail_folder(self, folder_id): description = f"Error while getting details for folder {folder_id}" raise FossologyApiError(description, response) - def create_folder(self, parent, name, description=None, group=None): + def create_folder(self, parent, name, description=None): """Create a new (sub)folder The name of the new folder must be unique under the same parent. @@ -67,25 +67,20 @@ def create_folder(self, parent, name, description=None, group=None): :param parent: the parent folder :param name: the name of the folder - :param description: a meaningful description for the folder (default: None) - :param group: the name of the group chosen to create the folder (default: None) + :param description: a meaningful description for the folder (optional) :type parent: Folder() object :type name: str :type description: str - :type group: string :return: the folder newly created (or already existing) - or None :rtype: Folder() object :raises FossologyApiError: if the REST call failed - :raises AuthorizationError: if the user is not allowed to write in the folder or access the group + :raises AuthorizationError: if the user is not allowed to write in the folder """ headers = { "parentFolder": f"{parent.id}", "folderName": f"{name}", "folderDescription": f"{description}", } - if group: - headers["groupName"] = group - response = self.session.post(f"{self.api}/folders", headers=headers) if response.status_code == 200: @@ -101,7 +96,7 @@ def create_folder(self, parent, name, description=None, group=None): return self.detail_folder(response.json()["message"]) elif response.status_code == 403: - description = f"Folder creation {get_options(group, parent)}not authorized" + description = f"Folder creation in parent {parent} not authorized" raise AuthorizationError(description, response) else: description = f"Unable to create folder {name} under {parent}" diff --git a/fossology/jobs.py b/fossology/jobs.py index b8ceba2..6cb4225 100644 --- a/fossology/jobs.py +++ b/fossology/jobs.py @@ -5,8 +5,8 @@ import time import logging -from fossology.obj import Job, get_options -from fossology.exceptions import AuthorizationError, FossologyApiError +from .obj import Job +from .exceptions import FossologyApiError logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) @@ -15,7 +15,7 @@ class Jobs: """Class dedicated to all "jobs" related endpoints""" - def list_jobs(self, page_size=20, page=1, upload=None): + def list_jobs(self, page_size=20, pages=1, upload=None): """Get all available jobs API Endpoint: GET /jobs @@ -23,17 +23,17 @@ def list_jobs(self, page_size=20, page=1, upload=None): The answer is limited to the first page of 20 results by default :param page_size: the maximum number of results per page - :param page: the number of pages to be retrieved + :param pages: the number of pages to be retrieved :param upload: list only jobs of the given upload (default: None) :type page_size: int (default: 20) - :type page: int (default: 1) + :type pages: int (default: 1) :type upload: Upload :return: the jobs data :rtype: list of Job :raises FossologyApiError: if the REST call failed """ params = {} - headers = {"limit": str(page_size), "page": str(page)} + headers = {"limit": str(page_size), "pages": str(pages)} if upload: params["upload"] = upload.id response = self.session.get(f"{self.api}/jobs", params=params, headers=headers) @@ -82,7 +82,7 @@ def detail_job(self, job_id, wait=False, timeout=30): description = f"Error while getting details for job {job_id}" raise FossologyApiError(description, response) - def schedule_jobs(self, folder, upload, spec, group=None, wait=False, timeout=30): + def schedule_jobs(self, folder, upload, spec, wait=False, timeout=30): """Schedule jobs for a specific upload API Endpoint: POST /jobs @@ -121,42 +121,30 @@ def schedule_jobs(self, folder, upload, spec, group=None, wait=False, timeout=30 :param folder: the upload folder :param upload: the upload for which jobs will be scheduled :param spec: the job specification - :param group: the group name to choose while scheduling jobs (default: None) :param wait: wait for the scheduled job to finish (default: False) :param timeout: stop waiting after x seconds (default: 30) :type upload: Upload :type folder: Folder :type spec: dict - :type group: string :type wait: boolean :type timeout: 30 :return: the job id :rtype: Job :raises FossologyApiError: if the REST call failed - :raises AuthorizationError: if the user can't access the group """ headers = { "folderId": str(folder.id), "uploadId": str(upload.id), "Content-Type": "application/json", } - if group: - headers["groupName"] = group - response = self.session.post( f"{self.api}/jobs", headers=headers, data=json.dumps(spec) ) - if response.status_code == 201: detailled_job = self.detail_job( response.json()["message"], wait=wait, timeout=timeout ) return detailled_job - - elif response.status_code == 403: - description = f"Scheduling job {get_options(group)}not authorized" - raise AuthorizationError(description, response) - else: description = "Scheduling jobs for upload {upload.uploadname} failed" raise FossologyApiError(description, response) diff --git a/fossology/obj.py b/fossology/obj.py index 3882afb..15a0842 100644 --- a/fossology/obj.py +++ b/fossology/obj.py @@ -60,6 +60,16 @@ class ClearingStatus(Enum): REJECTED = "Rejected" +class LicenseAgent(Enum): + """Available license agents: NOMOS, MONK, NINKA, OJO, REPORTIMPORT""" + + NOMOS = "nomos" + MONK = "monk" + NINKA = "ninka" + OJO = "ojo" + REPORTIMPORT = "reportImport" + + class Agents(object): """FOSSology agents. @@ -423,12 +433,3 @@ def __str__(self): @classmethod def from_json(cls, json_dict): return cls(**json_dict) - - -def get_options(group: str = None, folder: Folder = None) -> str: - options = "" - if group: - options += f"for group {group} " - if folder: - options += f"in folder {folder.id} " - return options diff --git a/fossology/report.py b/fossology/report.py index 48ad436..7d528d7 100644 --- a/fossology/report.py +++ b/fossology/report.py @@ -6,8 +6,7 @@ import logging from tenacity import retry, TryAgain, stop_after_attempt, retry_if_exception_type -from fossology.exceptions import FossologyApiError, AuthorizationError -from fossology.obj import get_options +from .exceptions import FossologyApiError logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) @@ -17,52 +16,40 @@ class Report: """Class dedicated to all "report" related endpoints""" @retry(retry=retry_if_exception_type(TryAgain), stop=stop_after_attempt(3)) - def generate_report(self, upload, report_format=None, group=None): + def generate_report(self, upload, report_format=None): """Generate a report for a given upload API Endpoint: GET /report :param upload: the upload which report will be generated :param format: the report format (default: ReportFormat.READMEOSS) - :param group: the group name to choose while generating the report (default: None) :type upload: Upload :type format: ReportFormat - :type group: string :return: the report id :rtype: int :raises FossologyApiError: if the REST call failed - :raises AuthorizationError: if the user can't access the group """ headers = {"uploadId": str(upload.id)} if report_format: headers["reportFormat"] = report_format.value else: headers["reportFormat"] = "readmeoss" - if group: - headers["groupName"] = group response = self.session.get(f"{self.api}/report", headers=headers) - if response.status_code == 201: report_id = re.search("[0-9]*$", response.json()["message"]) return report_id[0] - - elif response.status_code == 403: - description = f"Generating report for upload {upload.id} {get_options(group)}not authorized" - raise AuthorizationError(description, response) - elif response.status_code == 503: wait_time = response.headers["Retry-After"] logger.debug(f"Retry generate report after {wait_time} seconds") time.sleep(int(wait_time)) raise TryAgain - else: description = f"Report generation for upload {upload.name} failed" raise FossologyApiError(description, response) @retry(retry=retry_if_exception_type(TryAgain), stop=stop_after_attempt(3)) - def download_report(self, report_id, as_zip=False, group=None): + def download_report(self, report_id, as_zip=False): """Download a report API Endpoint: GET /report/{id} @@ -81,30 +68,18 @@ def download_report(self, report_id, as_zip=False, group=None): :param report_id: the id of the generated report :param as_zip: control if the report should be generated as ZIP file (default: False) - :param group: the group name to choose while downloading a specific report (default: None) :type report_id: int :type as_zip: boolean - :type group: string :raises FossologyApiError: if the REST call failed - :raises AuthorizationError: if the user can't access the group """ if as_zip: headers = {"Accept": "application/zip"} else: headers = {"Accept": "text/plain"} - if group: - headers["groupName"] = group response = self.session.get(f"{self.api}/report/{report_id}", headers=headers) if response.status_code == 200: return response.text - - elif response.status_code == 403: - description = ( - f"Getting report {report_id} {get_options(group)}not authorized" - ) - raise AuthorizationError(description, response) - elif response.status_code == 503: wait_time = response.headers["Retry-After"] logger.debug(f"Retry get report after {wait_time} seconds") diff --git a/fossology/uploads.py b/fossology/uploads.py index 7a6c9d5..4ef8a8e 100644 --- a/fossology/uploads.py +++ b/fossology/uploads.py @@ -6,8 +6,8 @@ import logging from tenacity import retry, retry_if_exception_type, stop_after_attempt, TryAgain -from fossology.obj import Upload, Summary, get_options -from fossology.exceptions import AuthorizationError, FossologyApiError +from .obj import Upload, Summary +from .exceptions import FossologyApiError logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) @@ -18,33 +18,21 @@ class Uploads: # Retry until the unpack agent is finished @retry(retry=retry_if_exception_type(TryAgain), stop=stop_after_attempt(10)) - def detail_upload(self, upload_id, group=None): + def detail_upload(self, upload_id): """Get detailled information about an upload API Endpoint: GET /uploads/{id} :param upload_id: the id of the upload - :param group: the group the upload shall belong to :type: int - :type group: string :return: the upload data :rtype: Upload :raises FossologyApiError: if the REST call failed - :raises AuthorizationError: if the user can't access the group """ - headers = {} - if group: - headers["groupName"] = group - response = self.session.get(f"{self.api}/uploads/{upload_id}", headers=headers) - + response = self.session.get(f"{self.api}/uploads/{upload_id}") if response.status_code == 200: logger.debug(f"Got details for upload {upload_id}") return Upload.from_json(response.json()) - - elif response.status_code == 403: - description = f"Getting details for upload {upload_id} {get_options(group)}not authorized" - raise AuthorizationError(description, response) - elif response.status_code == 503: wait_time = response.headers["Retry-After"] logger.debug( @@ -52,7 +40,6 @@ def detail_upload(self, upload_id, group=None): ) time.sleep(int(wait_time)) raise TryAgain - else: description = f"Error while getting details for upload {upload_id}" raise FossologyApiError(description, response) @@ -135,7 +122,6 @@ def upload_file( # noqa: C901 :return: the upload data :rtype: Upload :raises FossologyApiError: if the REST call failed - :raises AuthorizationError: if the user can't access the group """ headers = {"folderId": str(folder.id)} if description: @@ -171,13 +157,6 @@ def upload_file( # noqa: C901 ) return - if file: - source = f"{file}" - elif vcs: - source = vcs.get("vcsName") - else: - source = url.get("name") - if response.status_code == 201: try: upload = self.detail_upload(response.json()["message"]) @@ -187,48 +166,30 @@ def upload_file( # noqa: C901 ) return upload except TryAgain: + if file: + source = f"{file}" + elif vcs: + source = vcs.get("vcsName") + else: + source = url.get("name") description = f"Upload of {source} failed" raise FossologyApiError(description, response) - elif response.status_code == 403: - description = ( - f"Upload of {source} {get_options(group, folder)}not authorized" - ) - raise AuthorizationError(description, response) - - else: - description = f"Upload {description} could not be performed" - raise FossologyApiError(description, response) - @retry(retry=retry_if_exception_type(TryAgain), stop=stop_after_attempt(3)) - def upload_summary(self, upload, group=None): + def upload_summary(self, upload): """Get clearing information about an upload API Endpoint: GET /uploads/{id}/summary :param upload: the upload to gather data from - :param group: the group name to chose while accessing an upload (default: None) :type: Upload - :type group: string :return: the upload summary data :rtype: Summary :raises FossologyApiError: if the REST call failed - :raises AuthorizationError: if the user can't access the group """ - headers = {} - if group: - headers["groupName"] = group - response = self.session.get( - f"{self.api}/uploads/{upload.id}/summary", headers=headers - ) - + response = self.session.get(f"{self.api}/uploads/{upload.id}/summary") if response.status_code == 200: return Summary.from_json(response.json()) - - elif response.status_code == 403: - description = f"Getting summary of upload {upload.id} {get_options(group)}not authorized" - raise AuthorizationError(description, response) - elif response.status_code == 503: logger.debug( f"Unpack agent for {upload.uploadname} (id={upload.id}) didn't start yet" @@ -240,59 +201,46 @@ def upload_summary(self, upload, group=None): raise FossologyApiError(description, response) @retry(retry=retry_if_exception_type(TryAgain), stop=stop_after_attempt(3)) - def upload_licenses(self, upload, agent=None, containers=False, group=None): + def upload_licenses(self, upload, agent=None, containers=False): """Get clearing information about an upload API Endpoint: GET /uploads/{id}/licenses :param upload: the upload to gather data from - :param agent: the license agents to use (e.g. "nomos,monk,ninka,ojo,reportImport", default: "nomos") + :param agent: the license agent to use (default: LicenseAgent.MONK) :param containers: wether to show containers or not (default: False) - :param group: the group name to chose while accessing the upload (default: None) :type upload: Upload - :type agent: string + :type agent: LicenseAgent :type containers: boolean - :type group: string :return: the licenses found by the specified agent :rtype: list of licenses as JSON object :raises FossologyApiError: if the REST call failed - :raises AuthorizationError: if the user can't access the group """ params = {} - headers = {} - if group: - headers["groupName"] = group if agent: - params["agent"] = agent + params["agent"] = agent.value else: - params["agent"] = agent = "nomos" + params["agent"] = "nomos" if containers: params["containers"] = "true" response = self.session.get( - f"{self.api}/uploads/{upload.id}/licenses", headers=headers, params=params + f"{self.api}/uploads/{upload.id}/licenses", params=params ) if response.status_code == 200: return response.json() - - elif response.status_code == 403: - description = f"Getting license for upload {upload.id} {get_options(group)}not authorized" - raise AuthorizationError(description, response) - elif response.status_code == 412: logger.info( - f"Unable to get licenses from {agent} for {upload.uploadname} (id={upload.id}): " + f"Agent {agent} has not been scheduled for {upload.uploadname} (id={upload.id}): " f"{response.json()['message']}" ) return - elif response.status_code == 503: logger.debug( f"Unpack agent for {upload.uploadname} (id={upload.id}) didn't start yet" ) time.sleep(3) raise TryAgain - else: description = f"No licenses for upload {upload.uploadname} (id={upload.id})" raise FossologyApiError(description, response) @@ -307,7 +255,6 @@ def delete_upload(self, upload, group=None): :type upload: Upload :type group: string :raises FossologyApiError: if the REST call failed - :raises AuthorizationError: if the user can't access the group """ headers = {} if group: @@ -315,70 +262,51 @@ def delete_upload(self, upload, group=None): response = self.session.delete( f"{self.api}/uploads/{upload.id}", headers=headers ) - if response.status_code == 202: logger.info(f"Upload {upload.id} has been scheduled for deletion") - - elif response.status_code == 403: - description = ( - f"Deleting upload {upload.id} {get_options(group)}not authorized" - ) - raise AuthorizationError(description, response) - else: description = f"Unable to delete upload {upload.id}" raise FossologyApiError(description, response) - def list_uploads( - self, folder=None, group=None, recursive=True, page_size=20, page=1 - ): + def list_uploads(self, folder=None, group=None, recursive=True, page=1): """Get all uploads available to the registered user API Endpoint: GET /uploads :param folder: only list uploads from the given folder - :param group: list uploads from a specific group (not only your own uploads) (default: None) + :param group: list uploads from a specific group (not only your own uploads) :param recursive: wether to list uploads from children folders or not (default: True) - :param page_size: limit the number of uploads per page (default: 20) :param page: the number of the page to fetch uploads from (default: 1) :type folder: Folder :type group: string :type recursive: boolean - :type page_size: int :type page: int :return: a list of uploads :rtype: list of Upload :raises FossologyApiError: if the REST call failed - :raises AuthorizationError: if the user can't access the group """ - params = {} - headers = {"limit": str(page_size), "page": str(page)} + headers = {} if group: headers["groupName"] = group + + url = f"{self.api}/uploads" + params = {} + if page != 1: + params["page"] = page if folder: params["folderId"] = folder.id if not recursive: params["recursive"] = "false" - response = self.session.get( - f"{self.api}/uploads", headers=headers, params=params - ) - + response = self.session.get(url, params=params) if response.status_code == 200: uploads_list = list() for upload in response.json(): uploads_list.append(Upload.from_json(upload)) logger.info( - f"Retrieved page {page} of uploads, {response.headers['X-TOTAL-PAGES']} pages are in total available" + f"Retrived page {page} of uploads, {response.headers['X-TOTAL-PAGES']} available" ) return uploads_list - - elif response.status_code == 403: - description = ( - f"Retrieving list of uploads {get_options(group, folder)}not authorized" - ) - raise AuthorizationError(description, response) - else: description = "Unable to retrieve the list of uploads" raise FossologyApiError(description, response) @@ -390,12 +318,11 @@ def move_upload(self, upload, folder, group=None): :param upload: the Upload to be copied in another folder :param folder: the destination Folder - :param group: the group name to chose while changing the upload (default: None) + :param group: the group name to chose while deleting the upload (default: None) :type upload: Upload :type folder: Folder :type group: string :raises FossologyApiError: if the REST call failed - :raises AuthorizationError: if the user can't access the group or folder """ headers = {"folderId": str(folder.id)} if group: @@ -403,16 +330,8 @@ def move_upload(self, upload, folder, group=None): response = self.session.patch( f"{self.api}/uploads/{upload.id}", headers=headers ) - if response.status_code == 202: logger.info(f"Upload {upload.uploadname} has been moved to {folder.name}") - - elif response.status_code == 403: - description = ( - f"Moving upload {upload.id} {get_options(group, folder)}not authorized" - ) - raise AuthorizationError(description, response) - else: description = f"Unable to move upload {upload.uploadname} to {folder.name}" raise FossologyApiError(description, response) @@ -430,14 +349,8 @@ def copy_upload(self, upload, folder): """ headers = {"folderId": str(folder.id)} response = self.session.put(f"{self.api}/uploads/{upload.id}", headers=headers) - if response.status_code == 202: logger.info(f"Upload {upload.uploadname} has been copied to {folder.name}") - - elif response.status_code == 403: - description = f"Copy upload {upload.id} {get_options(folder)}not authorized" - raise AuthorizationError(description, response) - else: description = f"Unable to copy upload {upload.uploadname} to {folder.name}" raise FossologyApiError(description, response) diff --git a/tests/test_folders.py b/tests/test_folders.py index 186cf73..25a4800 100644 --- a/tests/test_folders.py +++ b/tests/test_folders.py @@ -14,15 +14,6 @@ class TestFossologyFolders(unittest.TestCase): def test_create_folder(self): name = "FossPythonTest" desc = "Created via the Fossology Python API" - # Create folder for unknown group - with self.assertRaises(AuthorizationError) as cm: - foss.create_folder(foss.rootFolder, name, description=desc, group="test") - self.assertIn( - "Provided group:test does not exist (403)", - cm.exception.message, - "Exception message does not match requested group", - ) - test_folder = foss.create_folder(foss.rootFolder, name, description=desc) self.assertEqual( test_folder.name, name, f"Main test {name} folder couldn't be created" diff --git a/tests/test_jobs.py b/tests/test_jobs.py index 3a60dfc..a59b4d9 100644 --- a/tests/test_jobs.py +++ b/tests/test_jobs.py @@ -5,7 +5,7 @@ from test_base import foss, logger from test_uploads import get_upload, do_upload, upload_filename -from fossology.exceptions import FossologyApiError, AuthorizationError +from fossology.exceptions import FossologyApiError from fossology.obj import Agents @@ -54,15 +54,6 @@ def test_schedule_jobs(self): }, } - # Create jobs for unknown group - with self.assertRaises(AuthorizationError) as cm: - foss.schedule_jobs(foss.rootFolder, test_upload, jobs_spec, group="test") - self.assertIn( - "Provided group:test does not exist (403)", - cm.exception.message, - "Exception message does not match requested group", - ) - try: job = foss.schedule_jobs(foss.rootFolder, test_upload, jobs_spec) self.assertEqual( @@ -83,17 +74,6 @@ def test_schedule_jobs(self): job = foss.detail_job(jobs[1].id, wait=True, timeout=30) self.assertEqual(job.status, "Completed", f"Job {job} not completed yet") - # Use pagination - jobs = foss.list_jobs(upload=test_upload, page_size=1, page=2) - self.assertEqual( - len(jobs), - 1, - f"Found {len(jobs)} jobs for upload {test_upload.uploadname}: {jobs}", - ) - self.assertEqual( - jobs[0].id, job.id, "Paginated list_jobs doesn't return the expected result" - ) - if __name__ == "__main__": unittest.main() diff --git a/tests/test_report.py b/tests/test_report.py index 395de1b..bcc894e 100644 --- a/tests/test_report.py +++ b/tests/test_report.py @@ -8,7 +8,7 @@ from pathlib import Path from test_base import foss, logger from test_uploads import get_upload, do_upload, upload_filename -from fossology.exceptions import FossologyApiError, AuthorizationError +from fossology.exceptions import FossologyApiError from fossology.obj import ReportFormat @@ -18,17 +18,6 @@ def test_generate_report(self): if not test_upload: test_upload = do_upload() - # Generate report for unknown group - with self.assertRaises(AuthorizationError) as cm: - foss.generate_report( - test_upload, report_format=ReportFormat.SPDX2, group="test" - ) - self.assertIn( - "Provided group:test does not exist (403)", - cm.exception.message, - "Exception message does not match requested group", - ) - try: report_id = foss.generate_report( test_upload, report_format=ReportFormat.SPDX2 @@ -37,46 +26,43 @@ def test_generate_report(self): logger.error(error.message) return - # Get report for unknown group - with self.assertRaises(AuthorizationError) as cm: - foss.download_report(report_id, group="test") - self.assertIn( - "Provided group:test does not exist (403)", - cm.exception.message, - "Exception message does not match requested group", - ) - - # Plain text - report = foss.download_report(report_id) - report_path = Path.cwd() / "tests/files" - report_name = upload_filename + ".spdx-report.rdf" - with open(report_path / report_name, "w+") as report_file: - report_file.write(report) + try: + # Plain text + report = foss.download_report(report_id) + report_path = Path.cwd() / "tests/files" + report_name = upload_filename + ".spdx-report.rdf" + with open(report_path / report_name, "w+") as report_file: + report_file.write(report) - filetype = mimetypes.guess_type(report_path / report_name) - report_stat = os.stat(report_path / report_name) - self.assertGreater(report_stat.st_size, 0, "Downloaded report is empty") - self.assertIn( - filetype[0], - ("application/rdf+xml", "application/xml"), - "Downloaded report is not a RDF/XML file", - ) - Path(report_path / report_name).unlink() + filetype = mimetypes.guess_type(report_path / report_name) + report_stat = os.stat(report_path / report_name) + self.assertGreater(report_stat.st_size, 0, "Downloaded report is empty") + self.assertIn( + filetype[0], + ("application/rdf+xml", "application/xml"), + "Downloaded report is not a RDF/XML file", + ) + Path(report_path / report_name).unlink() + except FossologyApiError as error: + logger.error(error.message) - # Zip - report = foss.download_report(report_id, as_zip=True) - report_path = Path.cwd() / "tests/files" - report_name = upload_filename + ".spdx-report.rdf.zip" - with open(report_path / report_name, "w+") as report_file: - report_file.write(report) + try: + # Zip + report = foss.download_report(report_id, as_zip=True) + report_path = Path.cwd() / "tests/files" + report_name = upload_filename + ".spdx-report.rdf.zip" + with open(report_path / report_name, "w+") as report_file: + report_file.write(report) - filetype = mimetypes.guess_type(report_path / report_name) - report_stat = os.stat(report_path / report_name) - self.assertGreater(report_stat.st_size, 0, "Downloaded report is empty") - self.assertEqual( - filetype[0], "application/zip", "Downloaded report is not a ZIP file" - ) - Path(report_path / report_name).unlink() + filetype = mimetypes.guess_type(report_path / report_name) + report_stat = os.stat(report_path / report_name) + self.assertGreater(report_stat.st_size, 0, "Downloaded report is empty") + self.assertEqual( + filetype[0], "application/zip", "Downloaded report is not a ZIP file" + ) + Path(report_path / report_name).unlink() + except FossologyApiError as error: + logger.error(error.message) if __name__ == "__main__": diff --git a/tests/test_search.py b/tests/test_search.py index 3903e9a..0891e37 100644 --- a/tests/test_search.py +++ b/tests/test_search.py @@ -5,20 +5,10 @@ from test_base import foss from fossology.obj import SearchTypes -from fossology.exceptions import AuthorizationError class TestFossologySearch(unittest.TestCase): def test_search(self): - # Search for unknown group - with self.assertRaises(AuthorizationError) as cm: - foss.search(searchType=SearchTypes.ALLFILES, filename="GPL%", group="test") - self.assertIn( - "Provided group:test does not exist (403)", - cm.exception.message, - "Exception message does not match requested group", - ) - search_result = foss.search(searchType=SearchTypes.ALLFILES, filename="GPL%") self.assertIsNotNone(search_result, "Couldn't search Fossology") search_result = foss.search( diff --git a/tests/test_uploads.py b/tests/test_uploads.py index c36be26..5163f8e 100644 --- a/tests/test_uploads.py +++ b/tests/test_uploads.py @@ -6,8 +6,8 @@ import unittest from test_base import foss, logger, test_files -from fossology.obj import AccessLevel, Folder, Upload, SearchTypes -from fossology.exceptions import AuthorizationError, FossologyApiError +from fossology.obj import AccessLevel, LicenseAgent, Folder, Upload, SearchTypes +from fossology.exceptions import FossologyApiError upload_filename = "base-files_11.tar.xz" @@ -56,21 +56,7 @@ def test_get_uploads(self): name = "FossPythonTestUploadsSubfolder" test_subfolder = foss.create_folder(test_folder, name, description=desc) test_file = f"{test_files}/{upload_filename}" - - # Upload file for unknown group - with self.assertRaises(AuthorizationError) as cm: - foss.upload_file( - test_folder, - file=test_file, - description="Test upload from github repository via python lib", - group="test", - ) - self.assertIn( - "Provided group:test does not exist (403)", - cm.exception.message, - "Exception message does not match requested group", - ) - + # FIXME: use group options once required features implemented foss.upload_file( test_folder, file=test_file, @@ -82,15 +68,8 @@ def test_get_uploads(self): description="Test upload from github repository via python lib", ) - # Get uploads from unknown group - with self.assertRaises(AuthorizationError) as cm: - foss.list_uploads(group="test") - self.assertIn( - "Provided group:test does not exist (403)", - cm.exception.message, - "Exception message does not match requested group", - ) - + # FIXME: using the group option does not filter uploads of groups + # group_uploads = foss.list_uploads(group="TestGroup") folder_uploads = foss.list_uploads(folder=test_folder) no_subfolder_uploads = foss.list_uploads(folder=test_folder, recursive=False) self.assertEqual(len(folder_uploads), 2, "Too many uploads listed") @@ -143,11 +122,9 @@ def test_upload_ignore_scm(self): search_result = foss.search( searchType=SearchTypes.DIRECTORIES, filename=".git", ) - self.assertEqual(search_result, [], "A .git directory has been found by search") - + print(search_result) # FIXME: shall be fixed in the next release # self.assertEqual(search_result, $something, "Search result should be empty") - foss.delete_upload(vcs_upload) def test_upload_from_url(self): @@ -188,42 +165,28 @@ def test_move_upload(self): move_folder = foss.create_folder( foss.rootFolder, "MoveUploadTest", "Test move upload function" ) - # Move upload for unknown group - with self.assertRaises(AuthorizationError) as cm: - foss.move_upload(test_upload, move_folder, group="test") - self.assertIn( - "Provided group:test does not exist (403)", - cm.exception.message, - "Exception message does not match requested group", - ) - foss.move_upload(test_upload, move_folder) moved_upload = foss.detail_upload(test_upload.id) self.assertEqual( moved_upload.folderid, move_folder.id, "Upload couldn't be moved" ) - # Move/Copy to arbitrary folder - non_folder = Folder(secrets.randbelow(1000), "Non folder", "", foss.rootFolder) - self.assertRaises(AuthorizationError, foss.move_upload, test_upload, non_folder) - self.assertRaises(AuthorizationError, foss.copy_upload, test_upload, non_folder) - - # FIXME: recursion due to https://github.com/fossology/fossology/pull/1748? foss.copy_upload(moved_upload, foss.rootFolder) list_uploads = foss.list_uploads() test_upload = None for upload in list_uploads: if upload.folderid == foss.rootFolder.id: test_upload = upload - if not test_upload: - logger.error("Coyping uploads didn't work") - else: - logger.info("Copying uploads works again, replace log output with assert") - # self.assertIsNotNone(test_upload, "Upload couldn't be copied") + self.assertIsNotNone(test_upload, "Upload couldn't be copied") # Clean up foss.delete_folder(move_folder) + # Move/Copy to arbitrary folder + non_folder = Folder(secrets.randbelow(1000), "Non folder", "", foss.rootFolder) + self.assertRaises(FossologyApiError, foss.move_upload, test_upload, non_folder) + self.assertRaises(FossologyApiError, foss.copy_upload, test_upload, non_folder) + def test_upload_summary(self): test_upload = get_upload() if not test_upload: @@ -233,15 +196,6 @@ def test_upload_summary(self): self.assertEqual(summary.clearingStatus, "Open") self.assertEqual(summary.mainLicense, None) - # Get upload summary for unknown group - with self.assertRaises(AuthorizationError) as cm: - foss.upload_summary(test_upload, group="test") - self.assertIn( - "Provided group:test does not exist (403)", - cm.exception.message, - "Exception message does not match requested group", - ) - def test_upload_licenses(self): test_upload = get_upload() if not test_upload: @@ -254,7 +208,7 @@ def test_upload_licenses(self): "Unexpected licenses were found for upload {test_upload.uploadname}", ) - # Get licenses with containers + # Get license with containers licenses = foss.upload_licenses(test_upload, containers=True) self.assertEqual( len(licenses), @@ -262,30 +216,13 @@ def test_upload_licenses(self): "Unexpected licenses were found for upload {test_upload.uploadname}", ) - # Get licenses from unscheduled agent 'ojo' - licenses = foss.upload_licenses(test_upload, agent="ojo") + # Get license from unscheduled agent 'ojo' + licenses = foss.upload_licenses(test_upload, agent=LicenseAgent.OJO) self.assertIsNone( licenses[0].get("agentFindings"), "Unexpected ojo licenses were found for upload {test_upload.uploadname}", ) - # Get licenses from another agent 'monk' - licenses = foss.upload_licenses(test_upload, agent="monk") - self.assertEqual( - len(licenses), - 23, - "Unexpected licenses were found for upload {test_upload.uploadname}", - ) - - # Get license from upload for unknown group - with self.assertRaises(AuthorizationError) as cm: - foss.upload_licenses(test_upload, group="test") - self.assertIn( - "Provided group:test does not exist (403)", - cm.exception.message, - "Exception message does not match requested group", - ) - def test_delete_unknown_upload(self): non_upload = Upload( foss.rootFolder, @@ -304,15 +241,6 @@ def test_delete_upload(self): if not test_upload: test_upload = do_upload() - # Delete upload for unknown group - with self.assertRaises(AuthorizationError) as cm: - foss.delete_upload(test_upload, group="test") - self.assertIn( - "Provided group:test does not exist (403)", - cm.exception.message, - "Exception message does not match requested group", - ) - foss.delete_upload(test_upload) verify_uploads = foss.list_uploads() From fb7e0440d5af57037f79b7afb40b64ca57596690 Mon Sep 17 00:00:00 2001 From: Marion Deveaud Date: Fri, 25 Sep 2020 14:11:18 +0200 Subject: [PATCH 2/3] Revert "feat(0.1.2): align with API version 1.0.17" This reverts commit a7dbf590529d28c343be83e943aaaad9b046a3ee. --- fossology/__init__.py | 4 +-- fossology/jobs.py | 8 +++-- fossology/report.py | 4 +-- fossology/uploads.py | 76 ++++++++++++++++--------------------------- tests/test_uploads.py | 61 +--------------------------------- tests/tests.py | 2 -- 6 files changed, 38 insertions(+), 117 deletions(-) diff --git a/fossology/__init__.py b/fossology/__init__.py index e37d73a..045c042 100644 --- a/fossology/__init__.py +++ b/fossology/__init__.py @@ -36,12 +36,12 @@ def fossology_token( :param password: the password of the user :param name: the name of the token :param scope: the scope of the token (default: READ) - :param expire: the expire date of the token (default: max. 30 days) + :param expire: the expire date of the token (default max. 30 days) :type url: string :type username: string :type password: string :type name: string - :type scope: TokenScope (default: TokenScope.READ) + :type scope: TokenScope (default TokenScope.READ) :type expire: string, e.g. 2019-12-25 :return: the new token :rtype: string diff --git a/fossology/jobs.py b/fossology/jobs.py index 6cb4225..91eceb8 100644 --- a/fossology/jobs.py +++ b/fossology/jobs.py @@ -32,11 +32,13 @@ def list_jobs(self, page_size=20, pages=1, upload=None): :rtype: list of Job :raises FossologyApiError: if the REST call failed """ - params = {} headers = {"limit": str(page_size), "pages": str(pages)} if upload: - params["upload"] = upload.id - response = self.session.get(f"{self.api}/jobs", params=params, headers=headers) + response = self.session.get( + f"{self.api}/jobs?upload={upload.id}", headers=headers + ) + else: + response = self.session.get(f"{self.api}/jobs", headers=headers) if response.status_code == 200: jobs_list = list() for job in response.json(): diff --git a/fossology/report.py b/fossology/report.py index 7d528d7..fdaeffd 100644 --- a/fossology/report.py +++ b/fossology/report.py @@ -22,7 +22,7 @@ def generate_report(self, upload, report_format=None): API Endpoint: GET /report :param upload: the upload which report will be generated - :param format: the report format (default: ReportFormat.READMEOSS) + :param format: the report format (default ReportFormat.READMEOSS) :type upload: Upload :type format: ReportFormat :return: the report id @@ -67,7 +67,7 @@ def download_report(self, report_id, as_zip=False): >>> report_file.write(report_content) :param report_id: the id of the generated report - :param as_zip: control if the report should be generated as ZIP file (default: False) + :param as_zip: control if the report should be generated as ZIP file (default False) :type report_id: int :type as_zip: boolean :raises FossologyApiError: if the REST call failed diff --git a/fossology/uploads.py b/fossology/uploads.py index 4ef8a8e..0219a9b 100644 --- a/fossology/uploads.py +++ b/fossology/uploads.py @@ -55,7 +55,7 @@ def upload_file( # noqa: C901 ignore_scm=False, group=None, ): - """Upload a package to FOSSology + """Upload a file to FOSSology API Endpoint: POST /uploads @@ -109,7 +109,7 @@ def upload_file( # noqa: C901 :param url: the URL specification to upload from a url :param description: description of the upload (default: None) :param access_level: access permissions of the upload (default: protected) - :param ignore_scm: ignore SCM files (Git, SVN, TFS) (default: True) + :param ignore_scm: ignore SCM files (Git, SVN, TFS) (default: False) :param group: the group name to chose while uploading the file (default: None) :type folder: Folder :type file: string @@ -129,7 +129,7 @@ def upload_file( # noqa: C901 if access_level: headers["public"] = access_level.value if ignore_scm: - headers["ignoreScm"] = "false" + headers["ignoreScm"] = "true" if group: headers["groupName"] = group @@ -140,21 +140,22 @@ def upload_file( # noqa: C901 response = self.session.post( f"{self.api}/uploads", files=files, headers=headers ) - elif vcs or url: - if vcs: - headers["uploadType"] = "vcs" - data = json.dumps(vcs) - else: - headers["uploadType"] = "url" - data = json.dumps(url) + elif vcs: + headers["uploadType"] = "vcs" + data = json.dumps(vcs) headers["Content-Type"] = "application/json" response = self.session.post( f"{self.api}/uploads", data=data, headers=headers ) - else: - logger.debug( - "Neither VCS, or Url or filename option given, not uploading anything" + elif url: + headers["uploadType"] = "url" + data = json.dumps(url) + headers["Content-Type"] = "application/json" + response = self.session.post( + f"{self.api}/uploads", data=data, headers=headers ) + else: + logger.debug("Neither VCS or File option given, not uploading anything") return if response.status_code == 201: @@ -216,16 +217,19 @@ def upload_licenses(self, upload, agent=None, containers=False): :rtype: list of licenses as JSON object :raises FossologyApiError: if the REST call failed """ - params = {} if agent: - params["agent"] = agent.value + agent = agent.value else: - params["agent"] = "nomos" + agent = "nomos" if containers: - params["containers"] = "true" - response = self.session.get( - f"{self.api}/uploads/{upload.id}/licenses", params=params - ) + containers = "true" + response = self.session.get( + f"{self.api}/uploads/{upload.id}/licenses?agent={agent}&containers={containers}" + ) + else: + response = self.session.get( + f"{self.api}/uploads/{upload.id}/licenses?agent={agent}" + ) if response.status_code == 200: return response.json() @@ -251,7 +255,7 @@ def delete_upload(self, upload, group=None): API Endpoint: DELETE /uploads/{id} :param upload: the upload to be deleted - :param group: the group name to chose while deleting the upload (default: None) + :param group: the group name to chose while deleting the upload (default None) :type upload: Upload :type group: string :raises FossologyApiError: if the REST call failed @@ -268,44 +272,20 @@ def delete_upload(self, upload, group=None): description = f"Unable to delete upload {upload.id}" raise FossologyApiError(description, response) - def list_uploads(self, folder=None, group=None, recursive=True, page=1): + def list_uploads(self): """Get all uploads available to the registered user API Endpoint: GET /uploads - :param folder: only list uploads from the given folder - :param group: list uploads from a specific group (not only your own uploads) - :param recursive: wether to list uploads from children folders or not (default: True) - :param page: the number of the page to fetch uploads from (default: 1) - :type folder: Folder - :type group: string - :type recursive: boolean - :type page: int :return: a list of uploads :rtype: list of Upload :raises FossologyApiError: if the REST call failed """ - headers = {} - if group: - headers["groupName"] = group - - url = f"{self.api}/uploads" - params = {} - if page != 1: - params["page"] = page - if folder: - params["folderId"] = folder.id - if not recursive: - params["recursive"] = "false" - - response = self.session.get(url, params=params) + response = self.session.get(f"{self.api}/uploads") if response.status_code == 200: uploads_list = list() for upload in response.json(): uploads_list.append(Upload.from_json(upload)) - logger.info( - f"Retrived page {page} of uploads, {response.headers['X-TOTAL-PAGES']} available" - ) return uploads_list else: description = "Unable to retrieve the list of uploads" @@ -318,7 +298,7 @@ def move_upload(self, upload, folder, group=None): :param upload: the Upload to be copied in another folder :param folder: the destination Folder - :param group: the group name to chose while deleting the upload (default: None) + :param group: the group name to chose while deleting the upload (default None) :type upload: Upload :type folder: Folder :type group: string diff --git a/tests/test_uploads.py b/tests/test_uploads.py index 5163f8e..130f635 100644 --- a/tests/test_uploads.py +++ b/tests/test_uploads.py @@ -6,7 +6,7 @@ import unittest from test_base import foss, logger, test_files -from fossology.obj import AccessLevel, LicenseAgent, Folder, Upload, SearchTypes +from fossology.obj import AccessLevel, LicenseAgent, Folder, Upload from fossology.exceptions import FossologyApiError upload_filename = "base-files_11.tar.xz" @@ -49,35 +49,6 @@ def test_upload_file(self): "Filesha1 on the server is wrong", ) - def test_get_uploads(self): - name = "FossPythonTestUploads" - desc = "Created via the Fossology Python API" - test_folder = foss.create_folder(foss.rootFolder, name, description=desc) - name = "FossPythonTestUploadsSubfolder" - test_subfolder = foss.create_folder(test_folder, name, description=desc) - test_file = f"{test_files}/{upload_filename}" - # FIXME: use group options once required features implemented - foss.upload_file( - test_folder, - file=test_file, - description="Test upload from github repository via python lib", - ) - foss.upload_file( - test_subfolder, - file=test_file, - description="Test upload from github repository via python lib", - ) - - # FIXME: using the group option does not filter uploads of groups - # group_uploads = foss.list_uploads(group="TestGroup") - folder_uploads = foss.list_uploads(folder=test_folder) - no_subfolder_uploads = foss.list_uploads(folder=test_folder, recursive=False) - self.assertEqual(len(folder_uploads), 2, "Too many uploads listed") - self.assertEqual(len(no_subfolder_uploads), 1, "Too many uploads listed") - - # Cleanup - foss.delete_folder(test_folder) - def test_upload_from_vcs(self): vcs = { "vcsType": "git", @@ -95,36 +66,6 @@ def test_upload_from_vcs(self): self.assertEqual( vcs_upload.uploadname, vcs["vcsName"], "Uploadname on the server is wrong", ) - search_result = foss.search( - searchType=SearchTypes.DIRECTORIES, filename=".git", - ) - self.assertEqual(search_result, [], "Search result should be empty") - foss.delete_upload(vcs_upload) - - def test_upload_ignore_scm(self): - vcs = { - "vcsType": "git", - "vcsUrl": "https://github.com/fossology/fossology-python", - "vcsName": "fossology-python-github-master", - "vcsUsername": "", - "vcsPassword": "", - } - vcs_upload = foss.upload_file( - foss.rootFolder, - vcs=vcs, - description="Test upload with ignore_scm flag", - ignore_scm=False, - access_level=AccessLevel.PUBLIC, - ) - self.assertEqual( - vcs_upload.uploadname, vcs["vcsName"], "Uploadname on the server is wrong", - ) - search_result = foss.search( - searchType=SearchTypes.DIRECTORIES, filename=".git", - ) - print(search_result) - # FIXME: shall be fixed in the next release - # self.assertEqual(search_result, $something, "Search result should be empty") foss.delete_upload(vcs_upload) def test_upload_from_url(self): diff --git a/tests/tests.py b/tests/tests.py index 6694611..1bf0242 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -20,9 +20,7 @@ def suite(): suite.addTest(TestFossologyFolders("test_move_folder")) suite.addTest(TestFossologyFolders("test_delete_folder")) suite.addTest(TestFossologyUploads("test_upload_file")) - suite.addTest(TestFossologyUploads("test_get_uploads")) suite.addTest(TestFossologyUploads("test_upload_from_vcs")) - suite.addTest(TestFossologyUploads("test_upload_ignore_scm")) suite.addTest(TestFossologyUploads("test_upload_from_url")) suite.addTest(TestFossologyUploads("test_empty_upload")) suite.addTest(TestFossologyUploads("test_move_upload")) From 0bf248d4eb928e2c870563b182a8b13cd249c901 Mon Sep 17 00:00:00 2001 From: Marion Deveaud Date: Fri, 25 Sep 2020 14:24:03 +0200 Subject: [PATCH 3/3] feat(release): patch version 0.1.3 with reverted commits --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 64a3a4e..83bce2c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "fossology" -version = "0.1.2" +version = "0.1.3" description = "A library to automate Fossology from Python scripts" authors = ["Marion Deveaud "] license = "MIT License"