From ec8155f5f85325df55558b5e66713f7dba5adf3d Mon Sep 17 00:00:00 2001 From: Marion Deveaud Date: Fri, 3 Nov 2023 23:32:37 +0100 Subject: [PATCH] feat(clearing): add bulk clearing endpoints, refactor objects --- fossology/__init__.py | 3 +- fossology/enums.py | 197 +++++++++++++++++++++++ fossology/foss_cli.py | 3 +- fossology/groups.py | 3 +- fossology/license.py | 3 +- fossology/obj.py | 285 ++++++++++++++++------------------ fossology/report.py | 3 +- fossology/search.py | 5 +- fossology/uploads.py | 174 ++++++++++++++++++++- tests/conftest.py | 24 ++- tests/test_groups.py | 3 +- tests/test_jobs.py | 3 +- tests/test_license.py | 7 +- tests/test_report.py | 3 +- tests/test_search.py | 3 +- tests/test_upload_clearing.py | 160 +++++++++++++++++++ tests/test_upload_from.py | 3 +- tests/test_uploads.py | 3 +- 18 files changed, 713 insertions(+), 172 deletions(-) create mode 100644 fossology/enums.py create mode 100644 tests/test_upload_clearing.py diff --git a/fossology/__init__.py b/fossology/__init__.py index a186516..a1567fa 100644 --- a/fossology/__init__.py +++ b/fossology/__init__.py @@ -6,12 +6,13 @@ import requests +from fossology.enums import TokenScope from fossology.exceptions import AuthenticationError, FossologyApiError from fossology.folders import Folders from fossology.groups import Groups from fossology.jobs import Jobs from fossology.license import LicenseEndpoint -from fossology.obj import Agents, ApiInfo, HealthInfo, TokenScope, User +from fossology.obj import Agents, ApiInfo, HealthInfo, User from fossology.report import Report from fossology.search import Search from fossology.uploads import Uploads diff --git a/fossology/enums.py b/fossology/enums.py new file mode 100644 index 0000000..19a0e47 --- /dev/null +++ b/fossology/enums.py @@ -0,0 +1,197 @@ +# Copyright 2023 Siemens AG +# SPDX-License-Identifier: MIT + +from enum import Enum + + +class AccessLevel(Enum): + """Available access levels for uploads: + + PRIVATE + PROTECTED + PUBLIC + + """ 
+ + PRIVATE = "private" + PROTECTED = "protected" + PUBLIC = "public" + + +class ReportFormat(Enum): + """Available report format: + + DEP5 + SPDX2 + SPDX2TV + READMEOSS + UNIFIEDREPORT + + """ + + DEP5 = "dep5" + SPDX2 = "spdx2" + SPDX2TV = "spdx2tv" + READMEOSS = "readmeoss" + UNIFIEDREPORT = "unifiedreport" + + +class SearchTypes(Enum): + """Type of item that can be searched: + + ALLFILES + CONTAINERS + DIRECTORY + + """ + + ALLFILES = "allfiles" + CONTAINERS = "containers" + DIRECTORY = "directory" + + +class TokenScope(Enum): + """Scope for API tokens: + + READ: Read only access, limited only to "GET" calls + + WRITE: Read/Write access, required for calls other than "GET" + + """ + + READ = "read" + WRITE = "write" + + +class ClearingStatus(Enum): + """Clearing statuses: + + OPEN + INPROGRESS + CLOSED + REJECTED + + """ + + OPEN = "Open" + INPROGRESS = "InProgress" + CLOSED = "Closed" + REJECTED = "Rejected" + + +class JobStatus(Enum): + """Job statuses: + + COMPLETED + FAILED + QUEUED + PROCESSING + + """ + + COMPLETED = "Completed" + FAILED = "Failed" + QUEUED = "Queued" + PROCESSING = "Processing" + + +class LicenseType(Enum): + """License types: + + CANDIDATE + MAIN + ALL + + """ + + CANDIDATE = "candidate" + MAIN = "main" + ALL = "all" + + +class ObligationClass(Enum): + """Classification of an obligation: + + GREEN + WHITE + YELLOW + RED + + """ + + GREEN = "green" + WHITE = "white" + YELLOW = "yellow" + RED = "red" + + +class MemberPerm(Enum): + """Group member permissions: + + USER + ADMIN + ADVISOR + + """ + + USER = 0 + ADMIN = 1 + ADVISOR = 2 + + +class Permission(Enum): + """Upload or group permissions: + + NONE + READ_ONLY + READ_WRITE + CLEARING_ADMIN + ADMIN + """ + + NONE = "0" + READ_ONLY = "1" + READ_WRITE = "3" + CLEARING_ADMIN = "5" + ADMIN = "10" + + +class ClearingScope(Enum): + """Scope of the clearing: + + LOCAL + PACKAGE + GLOBAL + """ + + LOCAL = "local" + PACKAGE = "package" + GLOBAL = "global" + + +class ClearingType(Enum): + 
"""Type of the clearing: + + TO_BE_DISCUSSED + IRRELEVANT + IDENTIFIED + DO_NOT_USE + NON_FUNCTIONAL + """ + + TO_BE_DISCUSSED = "TO_BE_DISCUSSED" + IRRELEVANT = "IRRELEVANT" + IDENTIFIED = "IDENTIFIED" + DO_NOT_USE = "DO_NOT_USE" + NON_FUNCTIONAL = "NON_FUNCTIONAL" + + +class PrevNextSelection(Enum): + """Type of file to be selected for the prev-next endpoint: + + WITHLICENSES + NOCLEARING + """ + + WITHLICENSES = "withLicenses" + NOCLEARING = "noClearing" diff --git a/fossology/foss_cli.py b/fossology/foss_cli.py index 2346ed9..4f7253c 100644 --- a/fossology/foss_cli.py +++ b/fossology/foss_cli.py @@ -23,8 +23,9 @@ import click from fossology import Fossology, fossology_token +from fossology.enums import AccessLevel, ReportFormat, TokenScope from fossology.exceptions import FossologyApiError, FossologyUnsupported -from fossology.obj import AccessLevel, Folder, ReportFormat, Summary, TokenScope +from fossology.obj import Folder, Summary logger = logging.getLogger(__name__) formatter = logging.Formatter( diff --git a/fossology/groups.py b/fossology/groups.py index 22d0fb6..b569509 100644 --- a/fossology/groups.py +++ b/fossology/groups.py @@ -4,8 +4,9 @@ import logging +from fossology.enums import MemberPerm from fossology.exceptions import FossologyApiError -from fossology.obj import Group, MemberPerm, UserGroupMember +from fossology.obj import Group, UserGroupMember logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) diff --git a/fossology/license.py b/fossology/license.py index f0af57a..24b6458 100644 --- a/fossology/license.py +++ b/fossology/license.py @@ -8,8 +8,9 @@ from typing import Tuple from urllib.parse import quote +from fossology.enums import LicenseType from fossology.exceptions import FossologyApiError -from fossology.obj import License, LicenseType, Obligation +from fossology.obj import License, Obligation logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) diff --git a/fossology/obj.py b/fossology/obj.py index 
e3d8bdc..44c808f 100644 --- a/fossology/obj.py +++ b/fossology/obj.py @@ -3,159 +3,9 @@ # SPDX-License-Identifier: MIT import json -from enum import Enum +from typing import Iterable - -class AccessLevel(Enum): - """Available access levels for uploads: - - PRIVATE - PROTECTED - PUBLIC - - """ - - PRIVATE = "private" - PROTECTED = "protected" - PUBLIC = "public" - - -class ReportFormat(Enum): - """Available report format: - - DEP5 - SPDX2 - SPDX2TV - READMEOSS - UNIFIEDREPORT - - """ - - DEP5 = "dep5" - SPDX2 = "spdx2" - SPDX2TV = "spdx2tv" - READMEOSS = "readmeoss" - UNIFIEDREPORT = "unifiedreport" - - -class SearchTypes(Enum): - """Type of item that can be searched: - - ALLFILES - CONTAINERS - DIRECTORY - - """ - - ALLFILES = "allfiles" - CONTAINERS = "containers" - DIRECTORY = "directory" - - -class TokenScope(Enum): - """Scope for API tokens: - - READ: Read only access, limited only to "GET" calls - - WRITE: Read/Write access, required for calls other than "GET" - - """ - - READ = "read" - WRITE = "write" - - -class ClearingStatus(Enum): - """Clearing statuses: - - OPEN - INPROGRESS - CLOSED - REJECTED - - """ - - OPEN = "Open" - INPROGRESS = "InProgress" - CLOSED = "Closed" - REJECTED = "Rejected" - - -class JobStatus(Enum): - """Job statuses: - - COMPLETED - FAILED - QUEUED - PROCESSING - - """ - - COMPLETED = "Completed" - FAILED = "Failed" - QUEUED = "Queued" - PROCESSING = "Processing" - - -class LicenseType(Enum): - """License types: - - CANDIDATE - MAIN - ALL - - """ - - CANDIDATE = "candidate" - MAIN = "main" - ALL = "all" - - -class ObligationClass(Enum): - """Classification of an obligation: - - GREEN - WHITE - YELLOW - RED - - """ - - GREEN = "green" - WHITE = "white" - YELLOW = "yellow" - RED = "red" - - -class MemberPerm(Enum): - """Group member permissions: - - USER - ADMIN - ADVISOR - - """ - - USER = 0 - ADMIN = 1 - ADVISOR = 2 - - -class Permission(Enum): - """Upload or group permissions: - - NONE - READ_ONLY - READ_WRITE - CLEARING_ADMIN - 
ADMIN - """ - - NONE = "0" - READ_ONLY = "1" - READ_WRITE = "3" - CLEARING_ADMIN = "5" - ADMIN = "10" +from fossology.enums import ClearingScope, ClearingType, Permission class Agents(object): @@ -1115,3 +965,134 @@ def __str__(self): @classmethod def from_json(cls, json_dict): return cls(**json_dict) + + +class GetClearingHistory(object): + + """Clearing history. + + Represents the clearing history of a specified item. + + :param date: date of the clearing history + :param username: username of the user who created the decision + :param scope: scope of the clearing + :param type: type of the clearing + :param addedLicenses: list of license shortnames added to the decision + :param removedLicenses: list of license shortnames removed to the decision + :param kwargs: handle any other job information provided by the fossology instance + :type date: string + :type username: string + :type scope: str + :type type: str + :type addedLicenses: List[str] + :type removedLicenses: List[str] + :type kwargs: key word argument + """ + + def __init__( + self, + date: str, + username: str, + scope: str, + type: str, + addedLicenses: Iterable[str], + removedLicenses: Iterable[str], + **kwargs, + ): + self.date = date + self.username = username + self.scope = ClearingScope(scope) + self.type = ClearingType(type) + self.addedLicenses = addedLicenses + self.removedLicenses = removedLicenses + self.additional_info = kwargs + + def __str__(self): + return f"{self.username} changed clearing history at {self.date} in {self.scope} (type: {self.type})" + + @classmethod + def from_json(cls, json_dict): + return cls(**json_dict) + + +class GetBulkHistory(object): + + """Bulk history. + + Represents the bulk history of a specified item. 
+ + :param bulkId: the bulk id + :param clearingEventId: the event id associated with the bulk + :param text: scan reference text + :param matched: whether matched or not + :param tried: whether tried or not + :param addedLicenses: list of license shortnames added to the scan + :param removedLicenses: list of license shortnames removed to the scan + :param kwargs: handle any other job information provided by the fossology instance + :type bulkId: int + :type clearingEventId: int + :type text: str + :type matched: bool + :type tried: bool + :type addedLicenses: List[str] + :type removedLicenses: List[str] + :type kwargs: key word argument + """ + + def __init__( + self, + bulkId: int, + clearingEventId: int, + text: str, + matched: bool, + tried: bool, + addedLicenses: Iterable[str], + removedLicenses: Iterable[str], + **kwargs, + ): + self.bulkId = bulkId + self.clearingEventId = clearingEventId + self.text = text + self.matched = matched + self.tried = tried + self.addedLicenses = addedLicenses + self.removedLicenses = removedLicenses + self.additional_info = kwargs + + def __str__(self): + return f"Bulk Id {self.bulkId} associated with {self.clearingEventId} | Search for {self.text}" + + @classmethod + def from_json(cls, json_dict): + return cls(**json_dict) + + +class GetPrevNextItem(object): + """PrevNext item for the clearing history. + + Represents the prev-next item list for the clearing history. 
+ + :param prevItemId: id of the previous item + :param nextItemId: id of the next item + :param kwargs: handle any other job information provided by the fossology instance + :type prevItemId: int + :type nextItemId: int + :type kwargs: key word argument + """ + + def __init__( + self, + prevItemId: int, + nextItemId: int, + **kwargs, + ): + self.prevItemId = prevItemId + self.nextItemId = nextItemId + self.additional_info = kwargs + + def __str__(self): + return f"Prev: {self.prevItemId} | Next: {self.nextItemId}" + + @classmethod + def from_json(cls, json_dict): + return cls(**json_dict) diff --git a/fossology/report.py b/fossology/report.py index 9a0436e..10f55d9 100644 --- a/fossology/report.py +++ b/fossology/report.py @@ -9,8 +9,9 @@ from tenacity import TryAgain, retry, retry_if_exception_type, stop_after_attempt +from fossology.enums import ReportFormat from fossology.exceptions import AuthorizationError, FossologyApiError -from fossology.obj import ReportFormat, Upload +from fossology.obj import Upload logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) diff --git a/fossology/search.py b/fossology/search.py index dd95812..03082c4 100644 --- a/fossology/search.py +++ b/fossology/search.py @@ -3,8 +3,9 @@ # SPDX-License-Identifier: MIT import logging +from fossology.enums import SearchTypes from fossology.exceptions import AuthorizationError, FossologyApiError -from fossology.obj import File, SearchResult, SearchTypes, Upload +from fossology.obj import File, SearchResult, Upload logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) @@ -118,12 +119,10 @@ def search( response = self.session.get(f"{self.api}/search", headers=headers) if response.status_code == 200: - print(f"Got {len(response.json())} search results") for result in response.json(): results_list.append(SearchResult.from_json(result)) x_total_pages = int(response.headers.get("X-TOTAL-PAGES", 0)) - print(f"Total number of pages: {x_total_pages}") if not all_pages or 
x_total_pages == 0: logger.info( f"Retrieved page {page} of uploads, {x_total_pages} pages are in total available" diff --git a/fossology/uploads.py b/fossology/uploads.py index ba51cae..be2b29d 100644 --- a/fossology/uploads.py +++ b/fossology/uploads.py @@ -10,11 +10,13 @@ import requests from tenacity import TryAgain, retry, retry_if_exception_type, stop_after_attempt +from fossology.enums import AccessLevel, ClearingStatus, PrevNextSelection from fossology.exceptions import AuthorizationError, FossologyApiError from fossology.obj import ( - AccessLevel, - ClearingStatus, Folder, + GetBulkHistory, + GetClearingHistory, + GetPrevNextItem, Group, Permission, Summary, @@ -798,3 +800,171 @@ def upload_permissions( f"API error while getting permissions for upload {upload.uploadname}." ) raise FossologyApiError(description, response) + + def get_clearing_history( + self, + upload: Upload, + item_id: int, + ) -> list[GetClearingHistory]: + """Get the clearing history for a specific upload item + + API Endpoint: GET /uploads/{id}/item/{itemId}/clearing-history + + :param upload: the upload to get items from + :param item_id: the id of the item with clearing decision + :type upload: Upload + :type item_id: int, + :return: the clearing history for the specified item + :rtype: List[GetClearingHistory] + :raises FossologyApiError: if the REST call failed + :raises AuthorizationError: if the REST call is not authorized + """ + response = self.session.get( + f"{self.api}/uploads/{upload.id}/item/{item_id}/clearing-history" + ) + + if response.status_code == 200: + clearing_history = [] + for action in response.json(): + clearing_history.append(GetClearingHistory.from_json(action)) + return clearing_history + + elif response.status_code == 404: + description = f"Upload {upload.id} or item {item_id} not found" + raise FossologyApiError(description, response) + else: + description = f"API error while getting clearing history for item {item_id} from upload {upload.uploadname}." 
+ raise FossologyApiError(description, response) + + def get_prev_next( + self, upload: Upload, item_id: int, selection: PrevNextSelection | None = None + ) -> GetPrevNextItem: + """Get the index of the previous and the next item for an upload + + API Endpoint: GET /uploads/{id}/item/{itemId}/prev-next + + :param upload: the upload to get items from + :param item_id: the id of the item with clearing decision + :param selection: tell Fossology server how to select prev-next item + :type upload: Upload + :type item_id: int + :type selection: str + :return: the previous and next item for the clearing history + :rtype: GetPrevNextItem + :raises FossologyApiError: if the REST call failed + :raises AuthorizationError: if the REST call is not authorized + """ + params = {} + if selection: + params["selection"] = selection + + response = self.session.get( + f"{self.api}/uploads/{upload.id}/item/{item_id}/prev-next", params=params + ) + + if response.status_code == 200: + return GetPrevNextItem.from_json(response.json()) + + elif response.status_code == 404: + description = f"Upload {upload.id} or item {item_id} not found" + raise FossologyApiError(description, response) + else: + description = f"API error while getting prev-next items for {item_id} from upload {upload.uploadname}." 
+ raise FossologyApiError(description, response) + + def get_bulk_history( + self, + upload: Upload, + item_id: int, + ) -> list[GetBulkHistory]: + """Get the bulk history for a specific upload item + + API Endpoint: GET /uploads/{id}/item/{itemId}/bulk-history + + :param upload: the upload to get items from + :param item_id: the id of the item with clearing decision + :type upload: Upload + :type item_id: int + :return: list of data from the bulk history + :rtype: List[GetBulkHistory] + :raises FossologyApiError: if the REST call failed + :raises AuthorizationError: if the REST call is not authorized + """ + response = self.session.get( + f"{self.api}/uploads/{upload.id}/item/{item_id}/bulk-history" + ) + + if response.status_code == 200: + bulk_history = [] + for item in response.json(): + bulk_history.append(GetBulkHistory.from_json(item)) + return bulk_history + + elif response.status_code == 404: + description = f"Upload {upload.id} or item {item_id} not found" + raise FossologyApiError(description, response) + else: + description = f"API error while getting bulk history for {item_id} from upload {upload.uploadname}." + raise FossologyApiError(description, response) + + def schedule_bulk_scan( + self, + upload: Upload, + item_id: int, + spec: dict, + ): + """Schedule a bulk scan for a specific upload item + + API Endpoint: POST /uploads/{id}/item/{itemId}/bulk-scan + + Bulk scan specifications `spec` are added to the request body, + following options are available: + + >>> bulk_scan_spec = { + ... "bulkActions": [ + ... { + ... "licenseShortName": string (example: 'MIT'), + ... "licenseText": string (example: 'License text'), + ... "acknowledgement": string (example: 'Acknowledgment text'), + ... "comment": string (example: 'Comment text'), + ... "licenseAction": LicenseAction (ADD/REMOVE), + ... } + ... ], + ... "refText": string (example: 'Reference Text'), + ... "bulkScope": BulkScope (folder/upload), + ... "forceDecision": boolean (example: 'false'), + ... 
"ignoreIrre": boolean (example: 'false'), + ... "delimiters": string (example: 'DEFAULT'), + ... "scanOnlyFindings": boolean (example: 'true'), + ... } + ... } + + :param upload: the upload for the bulk scan + :param item_id: the id of the item for the bulk scan + :param spec: bulk scan specification + :type upload: Upload + :type item_id: int + :raises FossologyApiError: if the REST call failed + :raises AuthorizationError: if the REST call is not authorized + """ + headers = {"Content-Type": "application/json"} + response = self.session.post( + f"{self.api}/uploads/{upload.id}/item/{item_id}/bulk-scan", + headers=headers, + data=json.dumps(spec), + ) + if response.status_code == 201: + logger.info( + f"Bulk scan scheduled for upload {upload.uploadname}, item {item_id}" + ) + elif response.status_code == 400: + description = ( + f"Bad bulk scan request for upload {upload.id}, item {item_id}" + ) + raise FossologyApiError(description, response) + elif response.status_code == 404: + description = f"Upload {upload.id} or item {item_id} not found" + raise FossologyApiError(description, response) + else: + description = f"API error while scheduling bulk scan for item {item_id} from upload {upload.uploadname}." 
+ raise FossologyApiError(description, response) diff --git a/tests/conftest.py b/tests/conftest.py index c48d016..d767883 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -11,8 +11,9 @@ from click.testing import CliRunner import fossology +from fossology.enums import AccessLevel, JobStatus, TokenScope from fossology.exceptions import AuthenticationError, FossologyApiError -from fossology.obj import AccessLevel, Agents, JobStatus, TokenScope, Upload +from fossology.obj import Agents, Upload logger = logging.getLogger("fossology") console = logging.StreamHandler() @@ -57,6 +58,27 @@ def foss_agents() -> Agents: ) +@pytest.fixture(scope="session") +def foss_bulk_scan_spec() -> Dict: + return { + "bulkActions": [ + { + "licenseShortName": "MIT", + "licenseText": "", + "acknowledgement": "", + "comment": "", + "licenseAction": "ADD", + } + ], + "refText": "Reference Text", + "bulkScope": "upload", + "forceDecision": "false", + "ignoreIrre": "false", + "delimiters": "DEFAULT", + "scanOnlyFindings": "true", + } + + @pytest.fixture(scope="session") def foss_schedule_agents() -> Dict: return { diff --git a/tests/test_groups.py b/tests/test_groups.py index 950c81f..f16ba9b 100644 --- a/tests/test_groups.py +++ b/tests/test_groups.py @@ -8,8 +8,9 @@ import responses import fossology +from fossology.enums import MemberPerm from fossology.exceptions import FossologyApiError -from fossology.obj import MemberPerm, User +from fossology.obj import User # Helper functions diff --git a/tests/test_jobs.py b/tests/test_jobs.py index 9fabbf3..21cd7b4 100644 --- a/tests/test_jobs.py +++ b/tests/test_jobs.py @@ -9,8 +9,9 @@ import responses from fossology import Fossology +from fossology.enums import JobStatus from fossology.exceptions import AuthorizationError, FossologyApiError -from fossology.obj import JobStatus, Upload +from fossology.obj import Upload def test_unpack_jobs(foss: Fossology, upload: Upload): diff --git a/tests/test_license.py b/tests/test_license.py index 
58236cb..c831b6a 100644 --- a/tests/test_license.py +++ b/tests/test_license.py @@ -7,8 +7,9 @@ import responses import fossology +from fossology.enums import LicenseType, ObligationClass from fossology.exceptions import FossologyApiError -from fossology.obj import License, LicenseType, Obligation, ObligationClass +from fossology.obj import License, Obligation shortname = "GPL-2.0+" @@ -50,7 +51,7 @@ def test_detail_license_not_found(foss: fossology.Fossology): def test_detail_license(foss: fossology.Fossology): detail_license = foss.detail_license(shortname, group="fossy") assert detail_license - assert type(detail_license) == License + assert isinstance(detail_license, License) @responses.activate @@ -154,7 +155,7 @@ def test_patch_license_and_get_by_shortname( def test_license_to_json(test_license: License): json_license = test_license.to_json() - assert type(json_license) == str + assert isinstance(json_license, str) def test_obligation_object(): diff --git a/tests/test_report.py b/tests/test_report.py index 780554a..b3a25ad 100644 --- a/tests/test_report.py +++ b/tests/test_report.py @@ -10,8 +10,9 @@ import responses from fossology import Fossology +from fossology.enums import ReportFormat from fossology.exceptions import AuthorizationError, FossologyApiError -from fossology.obj import ReportFormat, Upload +from fossology.obj import Upload def test_report_nogroup(foss: Fossology, upload: Upload): diff --git a/tests/test_search.py b/tests/test_search.py index 50f7cd6..4d3fdc7 100644 --- a/tests/test_search.py +++ b/tests/test_search.py @@ -7,8 +7,9 @@ import responses from fossology import Fossology +from fossology.enums import SearchTypes from fossology.exceptions import AuthorizationError, FossologyApiError -from fossology.obj import SearchTypes, Upload +from fossology.obj import Upload # See: https://github.com/fossology/fossology/pull/2390 diff --git a/tests/test_upload_clearing.py b/tests/test_upload_clearing.py new file mode 100644 index 0000000..5e11e94 
--- /dev/null +++ b/tests/test_upload_clearing.py @@ -0,0 +1,160 @@ +# Copyright 2023 Siemens AG +# SPDX-License-Identifier: MIT + +import pytest +import responses + +from fossology import Fossology +from fossology.enums import PrevNextSelection +from fossology.exceptions import FossologyApiError +from fossology.obj import Upload + + +def test_upload_get_clearing_history(foss: Fossology, upload_with_jobs: Upload): + files, _ = foss.search(license="BSD") + history = foss.get_clearing_history(upload_with_jobs, files[0].uploadTreeId) + assert not history + + +def test_upload_get_clearing_history_with_unknown_item_raises_api_error( + foss: Fossology, upload_with_jobs: Upload +): + with pytest.raises(FossologyApiError) as excinfo: + foss.get_clearing_history(upload_with_jobs, 1) + assert f"Upload {upload_with_jobs.id} or item 1 not found" in str(excinfo.value) + + +@responses.activate +def test_upload_get_clearing_history_500_error( + foss: Fossology, foss_server: str, upload_with_jobs: Upload +): + responses.add( + responses.GET, + f"{foss_server}/api/v1/uploads/{upload_with_jobs.id}/item/1/clearing-history", + status=500, + ) + with pytest.raises(FossologyApiError) as excinfo: + foss.get_clearing_history(upload_with_jobs, 1) + assert ( + f"API error while getting clearing history for item 1 from upload {upload_with_jobs.uploadname}." 
+ in excinfo.value.message + ) + + +def test_upload_get_bulk_history(foss: Fossology, upload_with_jobs: Upload): + files, _ = foss.search(license="BSD") + history = foss.get_bulk_history(upload_with_jobs, files[0].uploadTreeId) + assert not history + + +def test_upload_get_bulk_history_with_unknown_item_raises_api_error( + foss: Fossology, upload_with_jobs: Upload ): + with pytest.raises(FossologyApiError) as excinfo: + foss.get_bulk_history(upload_with_jobs, 1) + assert f"Upload {upload_with_jobs.id} or item 1 not found" in str(excinfo.value) + + +@responses.activate +def test_upload_get_bulk_history_500_error( + foss: Fossology, foss_server: str, upload_with_jobs: Upload ): + responses.add( + responses.GET, + f"{foss_server}/api/v1/uploads/{upload_with_jobs.id}/item/1/bulk-history", + status=500, + ) + with pytest.raises(FossologyApiError) as excinfo: + foss.get_bulk_history(upload_with_jobs, 1) + assert ( + f"API error while getting bulk history for 1 from upload {upload_with_jobs.uploadname}." 
+ in excinfo.value.message + ) + + +def test_upload_schedule_bulk_scan( + foss: Fossology, upload_with_jobs: Upload, foss_bulk_scan_spec: dict +): + files, _ = foss.search(license="BSD") + history = foss.get_bulk_history(upload_with_jobs, files[0].uploadTreeId) + assert not history + foss.schedule_bulk_scan( + upload_with_jobs, files[0].uploadTreeId, foss_bulk_scan_spec + ) + history = foss.get_bulk_history(upload_with_jobs, files[0].uploadTreeId) + assert history[0].addedLicenses == ["MIT"] + + +def test_schedule_bulk_scan_with_unknown_item_raises_api_error( + foss: Fossology, upload_with_jobs: Upload, foss_bulk_scan_spec: dict +): + with pytest.raises(FossologyApiError) as excinfo: + foss.schedule_bulk_scan(upload_with_jobs, 1, foss_bulk_scan_spec) + assert f"Upload {upload_with_jobs.id} or item 1 not found" in str(excinfo.value) + + +@responses.activate +def test_schedule_bulk_scan_500_error( + foss: Fossology, + foss_server: str, + upload_with_jobs: Upload, + foss_bulk_scan_spec: dict, +): + responses.add( + responses.POST, + f"{foss_server}/api/v1/uploads/{upload_with_jobs.id}/item/1/bulk-scan", + status=500, + ) + with pytest.raises(FossologyApiError) as excinfo: + foss.schedule_bulk_scan(upload_with_jobs, 1, foss_bulk_scan_spec) + assert ( + f"API error while scheduling bulk scan for item 1 from upload {upload_with_jobs.uploadname}." 
+ in excinfo.value.message + ) + + +def test_upload_get_prev_next(foss: Fossology, upload_with_jobs: Upload): + files, _ = foss.search(license="BSD") + prev_next = foss.get_prev_next(upload_with_jobs, files[0].uploadTreeId) + assert prev_next + + +def test_upload_get_prev_next_with_licenses(foss: Fossology, upload_with_jobs: Upload): + files, _ = foss.search(license="BSD") + prev_next = foss.get_prev_next( + upload_with_jobs, files[0].uploadTreeId, PrevNextSelection.WITHLICENSES.value + ) + assert prev_next + + +def test_upload_get_prev_next_no_clearing(foss: Fossology, upload_with_jobs: Upload): + files, _ = foss.search(license="BSD") + prev_next = foss.get_prev_next( + upload_with_jobs, files[0].uploadTreeId, PrevNextSelection.NOCLEARING.value + ) + assert prev_next + + +def test_upload_get_prev_next_with_unknown_item_raises_api_error( + foss: Fossology, upload_with_jobs: Upload +): + with pytest.raises(FossologyApiError) as excinfo: + foss.get_prev_next(upload_with_jobs, 1) + assert f"Upload {upload_with_jobs.id} or item 1 not found" in str(excinfo.value) + + +@responses.activate +def test_upload_get_prev_next_500_error( + foss: Fossology, foss_server: str, upload_with_jobs: Upload +): + responses.add( + responses.GET, + f"{foss_server}/api/v1/uploads/{upload_with_jobs.id}/item/1/prev-next", + status=500, + ) + with pytest.raises(FossologyApiError) as excinfo: + foss.get_prev_next(upload_with_jobs, 1) + assert ( + f"API error while getting prev-next items for 1 from upload {upload_with_jobs.uploadname}." 
+ in excinfo.value.message + ) diff --git a/tests/test_upload_from.py b/tests/test_upload_from.py index 77e8a6f..cad2855 100644 --- a/tests/test_upload_from.py +++ b/tests/test_upload_from.py @@ -2,8 +2,9 @@ # SPDX-License-Identifier: MIT from fossology import Fossology +from fossology.enums import AccessLevel from fossology.exceptions import FossologyApiError -from fossology.obj import AccessLevel, Upload +from fossology.obj import Upload def delete_upload(foss: Fossology, upload: Upload): diff --git a/tests/test_uploads.py b/tests/test_uploads.py index 8058253..b7b5261 100644 --- a/tests/test_uploads.py +++ b/tests/test_uploads.py @@ -13,8 +13,9 @@ import responses from fossology import Fossology +from fossology.enums import AccessLevel, ClearingStatus from fossology.exceptions import AuthorizationError, FossologyApiError -from fossology.obj import AccessLevel, ClearingStatus, Folder, Upload +from fossology.obj import Folder, Upload def test_upload_sha1(upload: Upload):