From 94e96d95e45646743408df05cb179b4491b37d99 Mon Sep 17 00:00:00 2001 From: Marion Deveaud Date: Wed, 21 Jul 2021 17:04:50 +0200 Subject: [PATCH] feat(license): update license endpoints Only support endpoint starting from version 1.3.0 Support GET/POST/PATCH inclusive listing all licenses using 'all_pages' argument. --- fossology/license.py | 189 ++++++++++++++++++++++++++++++++++++++---- fossology/obj.py | 98 +++++++++++++++++++++- tests/test_license.py | 187 ++++++++++++++++++++++++++++++++++++++--- 3 files changed, 440 insertions(+), 34 deletions(-) diff --git a/fossology/license.py b/fossology/license.py index 0150a63..79fc98b 100644 --- a/fossology/license.py +++ b/fossology/license.py @@ -1,11 +1,14 @@ # Copyright 2019-2021 Siemens AG # SPDX-License-Identifier: MIT +import json import logging import fossology from fossology.exceptions import FossologyApiError, FossologyUnsupported -from fossology.obj import License +from fossology.obj import License, LicenseType, Obligation + +from urllib.parse import quote logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) @@ -14,32 +17,182 @@ class LicenseEndpoint: """Class dedicated to all "license" related endpoints""" - def detail_license(self, name) -> list[License]: - """Get a license from the DB + def list_licenses( + self, + active: bool = False, + kind: LicenseType = LicenseType.ALL, + page_size: int = 100, + page: int = 1, + all_pages: bool = False, + ) -> list[License]: + """Get all license from the DB API Endpoint: GET /license - :param name: Short name of the license - :rtype name: str + :param active: list only active licenses + :param kind: list only licenses from type LicenseType + :param page_size: the maximum number of results per page + :param page: the number of pages to be retrieved + :param all_pages: get all licenses + :type active: bool (default: False) + :type kind: LicenseType (default: LicenseType.ALL) + :type page_size: int (default: 100) + :type page: int (default: 1) + :type all_pages: boolean (default: False) :return: a list of licenses - :rtype: list of License objects + :rtype: list[License] :raises FossologyApiError: if the REST call failed """ - if fossology.versiontuple(self.version) < fossology.versiontuple("1.1.3"): + if fossology.versiontuple(self.version) < fossology.versiontuple("1.3.0"): description = f"Endpoint /license is not supported by your Fossology API version {self.version}" raise FossologyUnsupported(description) - headers = {"shortName": f"{name}"} - response = self.session.get(f"{self.api}/license", headers=headers) - if response.status_code == 200: - licenses = list() - if fossology.versiontuple(self.version) >= fossology.versiontuple("1.3.0"): - json_licenses = response.json() - for license in json_licenses: - licenses.append(License.from_json(license)) + license_list = list() + headers = {"limit": str(page_size)} + if active: + headers["active"] = json.dumps(True) + if kind: + headers["kind"] = kind.value + if all_pages: + # will be reset after the total number of pages has been retrieved from the API + x_total_pages = 2 + else: + x_total_pages = page + while page <= x_total_pages: + headers["page"] = str(page) + response = self.session.get(f"{self.api}/license", headers=headers) + if response.status_code == 200: + for license in response.json(): + license_list.append(License.from_json(license)) + x_total_pages = int(response.headers.get("X-TOTAL-PAGES", 0)) + if not all_pages or x_total_pages == 0: + logger.info( + f"Retrieved page {page} of license, {x_total_pages} pages are in total available" + ) + return license_list, x_total_pages + page += 1 else: - licenses.append(License.from_json(response.json())) - return licenses + description = ( + f"Unable to retrieve the list of licenses from page {page}" + ) + raise FossologyApiError(description, response) + logger.info(f"Retrieved all {x_total_pages} pages of licenses") + return license_list, x_total_pages + + def detail_license( + self, shortname, group=None + ) -> tuple[int, License, list[Obligation]]: + """Get a license from the DB + + API Endpoint: GET /license/{shortname} + + :param shortname: Short name of the license + :param group: the group this license belongs to + :rtype name: str + :rtype group: int (default: None) + :return: the license id, the license data and the associated obligations + :rtype: tuple(int, License, list[Obligation]) + :raises FossologyApiError: if the REST call failed + """ + if fossology.versiontuple(self.version) < fossology.versiontuple("1.3.0"): + description = ( + f"Endpoint /license/{shortname} is not supported by your API version ", + f"{self.version}", + ) + raise FossologyUnsupported(description) + + headers = dict() + if group: + headers["groupName"] = group + response = self.session.get( + f"{self.api}/license/{quote(shortname)}", headers=headers + ) + if response.status_code == 200: + return License.from_json(response.json()) + elif response.status_code == 404: + description = f"License {shortname} not found" + raise FossologyApiError(description, response) + else: + description = f"Error while getting license {shortname}" + raise FossologyApiError(description, response) + + def add_license(self, license: License, merge_request: bool = False): + """Add a new license to the DB + + API Endpoint: POST /license + + License data are added to the request body, here is an example: + + >>> new_license = License( + >>> "GPL-1.0", + >>> "GNU General Public License 1.0", + >>> "Text of the license...", + >>> "http://www.gnu.org/licenses/gpl-1.0.txt", + >>> "red" + >>> "false" + >>> ) + >>> foss.add_license(new_license, merge_request=True) + + :param license: the license data + :param merge_request: open a merge request for the license candidate? + :type license: License + :type merge_request: bool (default: False) + :raises FossologyApiError: if the REST call failed + """ + headers = {"Content-Type": "application/json"} + license_data = license.to_dict() + if merge_request: + license_data["mergeRequest"] = json.dumps(True) + response = self.session.post( + f"{self.api}/license", headers=headers, data=json.dumps(license_data) + ) + if response.status_code == 201: + logger.info(f"License {license.shortName} has been added to the DB") + elif response.status_code == 409: + logger.info(f"License {license.shortName} already exists") + else: + description = f"Error while adding new license {license.shortName}" + raise FossologyApiError(description, response) + + def update_license( + self, + shortname, + fullname: str = "", + text: str = "", + url: str = "", + risk: int = 2, + ): + """Update a license + + API Endpoint: PATCH /license/{shortname} + + :param shortName: the short name of the license to be updated + :param fullName: the new fullName of the license (optional) + :param text: the new text of the license (optional) + :param url: the new url of the license (optional) + :param risk: the new risk of the license (optional) + :type shortName: str + :type fullName: str + :type text: str + :type url: str + :type risk: int (default: 2) + :raises FossologyApiError: if the REST call failed + """ + headers = {"Content-Type": "application/json"} + license_data = { + "fullName": fullname, + "text": text, + "url": url, + "risk": str(risk), + } + response = self.session.patch( + f"{self.api}/license/{quote(shortname)}", + data=json.dumps(license_data), + headers=headers, + ) + if response.status_code == 200: + logger.info(f"License {shortname} has been updated") + return else: - description = f"Unable to get license {name}" + description = f"Unable to update license {shortname}" raise FossologyApiError(description, response) diff --git a/fossology/obj.py b/fossology/obj.py index 23f040d..efc260b 100644 --- a/fossology/obj.py +++ b/fossology/obj.py @@ -60,6 +60,25 @@ class ClearingStatus(Enum): REJECTED = "Rejected" +class LicenseType(Enum): + """Clearing statuses + """ + + CANDIDATE = "candidate" + MAIN = "main" + ALL = "all" + + +class ObligationClass(Enum): + """Classification of an obligation + """ + + GREEN = "green" + WHITE = "white" + YELLOW = "yellow" + RED = "red" + + class Agents(object): """FOSSology agents. @@ -312,30 +331,101 @@ class License(object): Represents a FOSSology license. - :param id: the ID of the license :param shortName: the short name of the license :param fullName: the full name of the license :param text: the text of the license + :param url: URL of the license text :param risk: the risk level of the license + :param isCandidate: is the license a candidate? :param kwargs: handle any other folder information provided by the fossology instance - :type id: int :type shortName: string :type fullName: string :type text: string + :type url: string :type risk: int + :type isCandidate: bool :type kwargs: key word argument """ - def __init__(self, id, shortName, fullName, text, risk, **kwargs): + def __init__( + self, shortName, fullName, text, url, risk, isCandidate, id=None, **kwargs + ): self.id = id self.shortName = shortName self.fullName = fullName self.text = text + self.url = url self.risk = risk + self.isCandidate = isCandidate + self.additional_info = kwargs + + def __str__(self): + license_type = "License" + if self.isCandidate: + license_type = "Candidate license" + return f"{license_type} {self.fullName} - {self.shortName} ({self.id}) with risk level {self.risk}" + + def to_dict(self): + """Get a directory with the license data + + :return: the license data + :rtype: dict + """ + return { + "shortName": self.shortName, + "fullName": self.fullName, + "text": self.text, + "url": self.url, + "risk": self.risk, + "isCandidate": self.isCandidate, + } + + @classmethod + def from_json(cls, json_dict): + return cls(**json_dict) + + def to_json(self) -> str: + """Get a JSON object with the license data + + :return: the license data + :rtype: JSON + """ + return json.dumps(self.to_dict()) + + +class Obligation(object): + + """FOSSology license obligation. + + Represents a FOSSology license obligation. + + :param id: the ID of the obligation + :param topic: the topic of the obligation + :param type: the type of the obligation + :param text: the text of the obligation + :param classification: level of attention it should raise in the clearing process + :param comment: comment for the obligation + :param kwargs: handle any other folder information provided by the fossology instance + :type id: int + :type topic: string + :type type: string + :type text: string + :type classification: string + :type comment: string + :type kwargs: key word argument + """ + + def __init__(self, id, topic, type, text, classification, comment, **kwargs): + self.id = id + self.topic = topic + self.type = type + self.text = text + self.classification = classification + self.comment = comment self.additional_info = kwargs def __str__(self): - return f"License {self.fullName} - {self.shortName} ({self.id}) with risk level {self.risk}" + return f"Obligation {self.topic}, {self.type} ({self.id}) is classified {self.classification}" @classmethod def from_json(cls, json_dict): diff --git a/tests/test_license.py b/tests/test_license.py index cf5fc52..3666645 100644 --- a/tests/test_license.py +++ b/tests/test_license.py @@ -1,42 +1,205 @@ # Copyright 2019-2021 Siemens AG # SPDX-License-Identifier: MIT +from unittest.mock import MagicMock import pytest import responses import fossology from fossology.exceptions import FossologyApiError, FossologyUnsupported -from fossology.obj import License +from fossology.obj import License, LicenseType, Obligation, ObligationClass -short = "GPL-2.0+" +shortname = "GPL-2.0+" + + +@pytest.fixture() +def test_license(): + shortname = "License-1.0" + fullname = "Open source license 1.0" + text = "This is the text for license 1.0" + url = "https://licenses.org/license1.txt" + risk = 2 + return License(shortname, fullname, text, url, risk, False) + + +@pytest.fixture() +def test_another_license(): + shortname = "License-2.0" + fullname = "Open source license 2.0" + text = "This is the text for license 2.0" + url = "https://licenses.org/license2.txt" + risk = 3 + return License(shortname, fullname, text, url, risk, False) @responses.activate def test_detail_license_error(foss_server: str, foss: fossology.Fossology): - if fossology.versiontuple(foss.version) < fossology.versiontuple("1.1.3"): + if fossology.versiontuple(foss.version) < fossology.versiontuple("1.3.0"): with pytest.raises(FossologyUnsupported) as excinfo: - foss.detail_license(short) + foss.detail_license(shortname) assert ( "Endpoint /license is not supported by your Fossology API version" in str(excinfo.value) ) else: - responses.add(responses.GET, f"{foss_server}/api/v1/license", status=500) + responses.add(responses.GET, f"{foss_server}/api/v1/license/Blah", status=500) + with pytest.raises(FossologyApiError) as excinfo: + foss.detail_license("Blah") + assert "Error while getting license Blah" in str(excinfo.value) + + +def test_detail_license_not_found(foss: fossology.Fossology): + if fossology.versiontuple(foss.version) >= fossology.versiontuple("1.3.0"): with pytest.raises(FossologyApiError) as excinfo: - foss.detail_license(short) - assert f"Unable to get license {short}" in str(excinfo.value) + foss.detail_license("Unknown") + assert "License Unknown not found" in str(excinfo.value) def test_detail_license(foss: fossology.Fossology): - if fossology.versiontuple(foss.version) < fossology.versiontuple("1.1.3"): + if fossology.versiontuple(foss.version) < fossology.versiontuple("1.3.0"): with pytest.raises(FossologyUnsupported) as excinfo: - foss.detail_license(short) + foss.detail_license(shortname) assert ( "Endpoint /license is not supported by your Fossology API version" in str(excinfo.value) ) else: - license = foss.detail_license(short) + license = foss.detail_license(shortname, group="fossy") assert license - if fossology.versiontuple(foss.version) >= fossology.versiontuple("1.3.0"): - assert type(license[0]) == License + assert type(license) == License + + +@responses.activate +def test_list_licenses_error(foss_server: str, foss: fossology.Fossology): + if fossology.versiontuple(foss.version) < fossology.versiontuple("1.3.0"): + with pytest.raises(FossologyUnsupported) as excinfo: + foss.detail_license(shortname) + assert ( + "Endpoint /license is not supported by your Fossology API version" + in str(excinfo.value) + ) + else: + responses.add(responses.GET, f"{foss_server}/api/v1/license", status=500) + with pytest.raises(FossologyApiError) as excinfo: + foss.list_licenses() + assert "Unable to retrieve the list of licenses from page 1" in str( + excinfo.value + ) + + +def test_get_all_licenses(foss: fossology.Fossology): + if fossology.versiontuple(foss.version) >= fossology.versiontuple("1.3.0"): + licenses, num_pages = foss.list_licenses(active=True, all_pages=True) + assert licenses + assert num_pages + + +def test_get_all_licenses_first_page(foss: fossology.Fossology): + if fossology.versiontuple(foss.version) >= fossology.versiontuple("1.3.0"): + licenses, num_pages = foss.list_licenses() + assert len(licenses) == 100 + assert num_pages + + +def test_get_all_candidate_licenses(foss: fossology.Fossology): + if fossology.versiontuple(foss.version) >= fossology.versiontuple("1.3.0"): + licenses, _ = foss.list_licenses(kind=LicenseType.CANDIDATE) + # FIXME: are candidate licenses properly filtered by the API? + assert licenses + + +@responses.activate +def test_add_license_error( + foss_server: str, foss: fossology.Fossology, test_license: License +): + if fossology.versiontuple(foss.version) >= fossology.versiontuple("1.3.0"): + responses.add(responses.POST, f"{foss_server}/api/v1/license", status=500) + with pytest.raises(FossologyApiError) as excinfo: + foss.add_license(test_license) + assert f"Error while adding new license {test_license.shortName}" in str( + excinfo.value + ) + + +@responses.activate +def test_add_license_already_exists( + foss_server: str, foss: fossology.Fossology, monkeypatch, test_license: License, +): + if fossology.versiontuple(foss.version) >= fossology.versiontuple("1.3.0"): + mocked_logger = MagicMock() + monkeypatch.setattr("fossology.license.logger", mocked_logger) + responses.add(responses.POST, f"{foss_server}/api/v1/license", status=409) + foss.add_license(test_license) + mocked_logger.info.assert_called_once() + + +def test_add_license_and_get_by_shortname( + foss: fossology.Fossology, test_license: License, monkeypatch +): + if fossology.versiontuple(foss.version) >= fossology.versiontuple("1.3.0"): + mocked_logger = MagicMock() + monkeypatch.setattr("fossology.license.logger", mocked_logger) + foss.add_license(test_license) + license_found = foss.detail_license(test_license.shortName) + assert license_found.shortName == "License-1.0" + expected_license_repr = ( + f"License {license_found.fullName} - {license_found.shortName} " + ) + expected_license_repr += ( + f"({license_found.id}) with risk level {license_found.risk}" + ) + assert str(license_found) == expected_license_repr + + foss.add_license(test_license, merge_request=True) + mocked_logger.info.assert_called_with( + f"License {test_license.shortName} already exists" + ) + + +@responses.activate +def test_patch_license_error( + foss_server: str, foss: fossology.Fossology, test_license: License +): + if fossology.versiontuple(foss.version) >= fossology.versiontuple("1.3.0"): + responses.add( + responses.PATCH, f"{foss_server}/api/v1/license/License-1.0", status=500 + ) + with pytest.raises(FossologyApiError) as excinfo: + foss.update_license(test_license.shortName) + assert f"Unable to update license {test_license.shortName}" in str( + excinfo.value + ) + + +def test_patch_license_and_get_by_shortname( + foss: fossology.Fossology, test_another_license: License +): + if fossology.versiontuple(foss.version) >= fossology.versiontuple("1.3.0"): + foss.add_license(test_another_license) + foss.update_license( + test_another_license.shortName, fullname="Inner Source license 2.0", risk=1 + ) + license_found = foss.detail_license(test_another_license.shortName) + assert license_found.shortName == "License-2.0" + assert license_found.fullName == "Inner Source license 2.0" + assert license_found.risk == 1 + + +def test_license_to_json(test_license: License): + json_license = test_license.to_json() + assert type(json_license) == str + + +def test_obligation_object(): + obligation = Obligation( + 1, + "do not modify", + "Obligation", + "do not modify files licensed under this license", + ObligationClass.YELLOW.value, + "", + ) + expected_obligation_repr = ( + "Obligation do not modify, Obligation (1) is classified yellow" + ) + assert str(obligation) == expected_obligation_repr