Skip to content

Commit

Permalink
Merge pull request #69 from fossology/feat/license-endpoints
Browse files Browse the repository at this point in the history
Feat/license endpoints
  • Loading branch information
deveaud-m authored Jul 22, 2021
2 parents 9c55066 + 42a669d commit e95a7af
Show file tree
Hide file tree
Showing 4 changed files with 456 additions and 34 deletions.
2 changes: 2 additions & 0 deletions fossology/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ def schedule_jobs(self, folder, upload, spec, group=None, wait=False, timeout=30
>>> "reuse_group": 0,
>>> "reuse_main": True,
>>> "reuse_enhanced": True
>>> "reuse_report": True
>>> "reuse_copyright": True
>>> }
>>> }
Expand Down
203 changes: 185 additions & 18 deletions fossology/license.py
Original file line number Diff line number Diff line change
@@ -1,45 +1,212 @@
# Copyright 2019-2021 Siemens AG
# SPDX-License-Identifier: MIT

import json
from json.decoder import JSONDecodeError
import logging

import fossology
from fossology.exceptions import FossologyApiError, FossologyUnsupported
from fossology.obj import License
from fossology.obj import License, LicenseType, Obligation

from urllib.parse import quote

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


def check_empty_response(response) -> bool:
try:
message = response.json().get("message")
if message and message == "Can not exceed total pages: 0":
return True
except JSONDecodeError:
# No JSON response available
pass
return False


class LicenseEndpoint:
"""Class dedicated to all "license" related endpoints"""

def detail_license(self, name) -> list[License]:
"""Get a license from the DB
def list_licenses(
self,
active: bool = False,
kind: LicenseType = LicenseType.ALL,
page_size: int = 100,
page: int = 1,
all_pages: bool = False,
) -> list[License]:
"""Get all license from the DB
API Endpoint: GET /license
:param name: Short name of the license
:rtype name: str
:param active: list only active licenses
:param kind: list only licenses from type LicenseType
:param page_size: the maximum number of results per page
:param page: the number of pages to be retrieved
:param all_pages: get all licenses
:type active: bool (default: False)
:type kind: LicenseType (default: LicenseType.ALL)
:type page_size: int (default: 100)
:type page: int (default: 1)
:type all_pages: boolean (default: False)
:return: a list of licenses
:rtype: list of License objects
:rtype: list[License]
:raises FossologyApiError: if the REST call failed
"""
if fossology.versiontuple(self.version) < fossology.versiontuple("1.1.3"):
if fossology.versiontuple(self.version) < fossology.versiontuple("1.3.0"):
description = f"Endpoint /license is not supported by your Fossology API version {self.version}"
raise FossologyUnsupported(description)

headers = {"shortName": f"{name}"}
response = self.session.get(f"{self.api}/license", headers=headers)
if response.status_code == 200:
licenses = list()
if fossology.versiontuple(self.version) >= fossology.versiontuple("1.3.0"):
json_licenses = response.json()
for license in json_licenses:
licenses.append(License.from_json(license))
license_list = list()
headers = {"limit": str(page_size)}
if active:
headers["active"] = json.dumps(True)
if all_pages:
# will be reset after the total number of pages has been retrieved from the API
x_total_pages = 2
else:
x_total_pages = page
while page <= x_total_pages:
headers["page"] = str(page)
response = self.session.get(
f"{self.api}/license?kind={kind.value}", headers=headers
)
if response.status_code == 200:
for license in response.json():
license_list.append(License.from_json(license))
x_total_pages = int(response.headers.get("X-TOTAL-PAGES", 0))
if not all_pages or x_total_pages == 0:
logger.info(
f"Retrieved page {page} of license, {x_total_pages} pages are in total available"
)
return license_list, x_total_pages
page += 1
else:
licenses.append(License.from_json(response.json()))
return licenses
if check_empty_response(response):
return license_list, 0
description = (
f"Unable to retrieve the list of licenses from page {page}"
)
raise FossologyApiError(description, response)
logger.info(f"Retrieved all {x_total_pages} pages of licenses")
return license_list, x_total_pages

def detail_license(
self, shortname, group=None
) -> tuple[int, License, list[Obligation]]:
"""Get a license from the DB
API Endpoint: GET /license/{shortname}
:param shortname: Short name of the license
:param group: the group this license belongs to
:rtype name: str
:rtype group: int (default: None)
:return: the license id, the license data and the associated obligations
:rtype: tuple(int, License, list[Obligation])
:raises FossologyApiError: if the REST call failed
"""
if fossology.versiontuple(self.version) < fossology.versiontuple("1.3.0"):
description = (
f"Endpoint /license/{shortname} is not supported by your API version ",
f"{self.version}",
)
raise FossologyUnsupported(description)

headers = dict()
if group:
headers["groupName"] = group
response = self.session.get(
f"{self.api}/license/{quote(shortname)}", headers=headers
)
if response.status_code == 200:
return License.from_json(response.json())
elif response.status_code == 404:
description = f"License {shortname} not found"
raise FossologyApiError(description, response)
else:
description = f"Error while getting license {shortname}"
raise FossologyApiError(description, response)

def add_license(self, license: License, merge_request: bool = False):
"""Add a new license to the DB
API Endpoint: POST /license
License data are added to the request body, here is an example:
>>> new_license = License(
>>> "GPL-1.0",
>>> "GNU General Public License 1.0",
>>> "Text of the license...",
>>> "http://www.gnu.org/licenses/gpl-1.0.txt",
>>> "red"
>>> "false"
>>> )
>>> foss.add_license(new_license, merge_request=True)
:param license: the license data
:param merge_request: open a merge request for the license candidate?
:type license: License
:type merge_request: bool (default: False)
:raises FossologyApiError: if the REST call failed
"""
headers = {"Content-Type": "application/json"}
license_data = license.to_dict()
if merge_request:
license_data["mergeRequest"] = json.dumps(True)
response = self.session.post(
f"{self.api}/license", headers=headers, data=json.dumps(license_data)
)
if response.status_code == 201:
logger.info(f"License {license.shortName} has been added to the DB")
elif response.status_code == 409:
logger.info(f"License {license.shortName} already exists")
else:
description = f"Error while adding new license {license.shortName}"
raise FossologyApiError(description, response)

def update_license(
self,
shortname,
fullname: str = "",
text: str = "",
url: str = "",
risk: int = 2,
):
"""Update a license
API Endpoint: PATCH /license/{shortname}
:param shortName: the short name of the license to be updated
:param fullName: the new fullName of the license (optional)
:param text: the new text of the license (optional)
:param url: the new url of the license (optional)
:param risk: the new risk of the license (optional)
:type shortName: str
:type fullName: str
:type text: str
:type url: str
:type risk: int (default: 2)
:raises FossologyApiError: if the REST call failed
"""
headers = {"Content-Type": "application/json"}
license_data = {
"fullName": fullname,
"text": text,
"url": url,
"risk": str(risk),
}
response = self.session.patch(
f"{self.api}/license/{quote(shortname)}",
data=json.dumps(license_data),
headers=headers,
)
if response.status_code == 200:
logger.info(f"License {shortname} has been updated")
return
else:
description = f"Unable to get license {name}"
description = f"Unable to update license {shortname}"
raise FossologyApiError(description, response)
98 changes: 94 additions & 4 deletions fossology/obj.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,25 @@ class ClearingStatus(Enum):
REJECTED = "Rejected"


class LicenseType(Enum):
"""Clearing statuses
"""

CANDIDATE = "candidate"
MAIN = "main"
ALL = "all"


class ObligationClass(Enum):
"""Classification of an obligation
"""

GREEN = "green"
WHITE = "white"
YELLOW = "yellow"
RED = "red"


class Agents(object):

"""FOSSology agents.
Expand Down Expand Up @@ -312,30 +331,101 @@ class License(object):
Represents a FOSSology license.
:param id: the ID of the license
:param shortName: the short name of the license
:param fullName: the full name of the license
:param text: the text of the license
:param url: URL of the license text
:param risk: the risk level of the license
:param isCandidate: is the license a candidate?
:param kwargs: handle any other folder information provided by the fossology instance
:type id: int
:type shortName: string
:type fullName: string
:type text: string
:type url: string
:type risk: int
:type isCandidate: bool
:type kwargs: key word argument
"""

def __init__(self, id, shortName, fullName, text, risk, **kwargs):
def __init__(
self, shortName, fullName, text, url, risk, isCandidate, id=None, **kwargs
):
self.id = id
self.shortName = shortName
self.fullName = fullName
self.text = text
self.url = url
self.risk = risk
self.isCandidate = isCandidate
self.additional_info = kwargs

def __str__(self):
license_type = "License"
if self.isCandidate:
license_type = "Candidate license"
return f"{license_type} {self.fullName} - {self.shortName} ({self.id}) with risk level {self.risk}"

def to_dict(self):
"""Get a directory with the license data
:return: the license data
:rtype: dict
"""
return {
"shortName": self.shortName,
"fullName": self.fullName,
"text": self.text,
"url": self.url,
"risk": self.risk,
"isCandidate": self.isCandidate,
}

@classmethod
def from_json(cls, json_dict):
return cls(**json_dict)

def to_json(self) -> str:
"""Get a JSON object with the license data
:return: the license data
:rtype: JSON
"""
return json.dumps(self.to_dict())


class Obligation(object):

"""FOSSology license obligation.
Represents a FOSSology license obligation.
:param id: the ID of the obligation
:param topic: the topic of the obligation
:param type: the type of the obligation
:param text: the text of the obligation
:param classification: level of attention it should raise in the clearing process
:param comment: comment for the obligation
:param kwargs: handle any other folder information provided by the fossology instance
:type id: int
:type topic: string
:type type: string
:type text: string
:type classification: string
:type comment: string
:type kwargs: key word argument
"""

def __init__(self, id, topic, type, text, classification, comment, **kwargs):
self.id = id
self.topic = topic
self.type = type
self.text = text
self.classification = classification
self.comment = comment
self.additional_info = kwargs

def __str__(self):
return f"License {self.fullName} - {self.shortName} ({self.id}) with risk level {self.risk}"
return f"Obligation {self.topic}, {self.type} ({self.id}) is classified {self.classification}"

@classmethod
def from_json(cls, json_dict):
Expand Down
Loading

0 comments on commit e95a7af

Please sign in to comment.