diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 0f8b56ad9..76dfc6aba 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,6 +1,24 @@ Changelog ========== +.. _2.7.1: + +2.7.1 - 2024-06-26 +~~~~~~~~~~~~~~~~~~~~~~~ + +- New features: + - Fix the Project endpoint according to OpenAPI standard. + - Fix the Superadmin endpoint according to OpenAPI standard. +- Dependencies: + - `Authlib` from `0.15.5` to `1.3.1` + - `PyMySQL` from `1.0.2` to `1.1.1` +- Node modules: + - `braces` from `3.0.2` to `3.0.3` + - `fill-range` from `7.0.1` to `7.1.1` + - `ip` from `2.0.0` to `9.0.5` + - `socks` from `2.7.1` to `2.8.3` + - `tar` from `6.1.15` to `6.2.1` + .. _2.7.0: 2.7.0 - 2024-05-29 diff --git a/SPRINTLOG.md b/SPRINTLOG.md index 6dbd3bb4a..d0efaacc9 100644 --- a/SPRINTLOG.md +++ b/SPRINTLOG.md @@ -393,3 +393,14 @@ _Nothing merged in CLI during this sprint_ - Fix raising error when archiving project, bucket deleted but DB error ([#1524](https://github.com/ScilifelabDataCentre/dds_web/pull/1524)) - Increase the identified less covered files([#1521](https://github.com/ScilifelabDataCentre/dds_web/pull/1521)) - Parse boolean inputs correctly ([#1528](https://github.com/ScilifelabDataCentre/dds_web/pull/1528)) + +# 2024-06-03 - 2024-06-14 + +- Fix the project endpoints according to the OpenAPI standard ([#1527](https://github.com/ScilifelabDataCentre/dds_web/pull/1527)) +- Fix the Superadmin endpoints according to the OpenAPI standard ([#1533](https://github.com/ScilifelabDataCentre/dds_web/pull/1533)) + +# 2024-06-17 - 2024-06-28 + +- Update pymysql to address cve ([#1534](https://github.com/ScilifelabDataCentre/dds_web/pull/1534)) +- Update authlib to address cve ([#1535](https://github.com/ScilifelabDataCentre/dds_web/pull/1535)) +- Update node packages to address cve ([#1536](https://github.com/ScilifelabDataCentre/dds_web/pull/1536)) diff --git a/dds_web/api/project.py b/dds_web/api/project.py index 2885dca49..0ce611cfe 100644 --- a/dds_web/api/project.py +++ b/dds_web/api/project.py @@ -9,6 +9,7 @@ # Installed import flask_restful +from flask_restful import inputs import flask import sqlalchemy import datetime @@ -59,6 +60,35 @@ class ProjectStatus(flask_restful.Resource): @handle_validation_errors def get(self): """Get current project status and optionally entire status history""" + if "api/v1" in flask.request.path: + # requests comming from api/v1 should be handled as before + return self.old_get() + + elif "api/v3" in flask.request.path: + # Get project ID, project and verify access + project_id = dds_web.utils.get_required_item(obj=flask.request.args, req="project") + project = dds_web.utils.collect_project(project_id=project_id) + dds_web.utils.verify_project_access(project=project) + + # Get current status and deadline + return_info = {"current_status": project.current_status} + if project.current_deadline: + return_info["current_deadline"] = project.current_deadline + + # Get status history + history = flask.request.args.get("history", type=inputs.boolean, default=False) + if history: + history_info = [] + for pstatus in project.project_statuses: + history_info.append(tuple((pstatus.status, pstatus.date_created))) + history_info.sort(key=lambda x: x[1], reverse=True) + return_info.update({"history": history_info}) + + return return_info + + def old_get(self): + """Implementation of old get method. Should be removed when api/v1 is removed.""" + # Get project ID, project and verify access project_id = dds_web.utils.get_required_item(obj=flask.request.args, req="project") project = dds_web.utils.collect_project(project_id=project_id) diff --git a/dds_web/api/superadmin_only.py b/dds_web/api/superadmin_only.py index 3c73305f1..6afc9b7e1 100644 --- a/dds_web/api/superadmin_only.py +++ b/dds_web/api/superadmin_only.py @@ -10,6 +10,7 @@ # Installed import flask_restful +from flask_restful import inputs import flask import structlog import flask_mail @@ -204,11 +205,31 @@ class FindUser(flask_restful.Resource): @auth.login_required(role=["Super Admin"]) @logging_bind_request - @json_required @handle_db_error def get(self): - """Return users or a confirmation on if one exists.""" - # Get request info + if "api/v1" in flask.request.path: + # requests comming from api/v1 should be handled as before + return self.old_get() + + elif "api/v3" in flask.request.path: + """Return all users or confirmation whether a specific user is in the database.""" + + # Get username from request + user_to_find = flask.request.args.get("username") + if not user_to_find: + raise ddserr.DDSArgumentError( + message="Username required to check existence of account." + ) + + return { + "exists": models.User.query.filter_by(username=user_to_find).one_or_none() + is not None + } + + @json_required + def old_get(self): + """Implementation of old get method. Should be removed when api/v1 is removed.""" # Get request info + request_json = flask.request.get_json(silent=True) # Verified by json_required # Get username from request @@ -303,6 +324,31 @@ class AnyProjectsBusy(flask_restful.Resource): @handle_db_error def get(self): """Check if any projects are busy.""" + if "api/v1" in flask.request.path: + # requests comming from api/v1 should be handled as before + return self.old_get() + + elif "api/v3" in flask.request.path: + # Get busy projects + projects_busy: typing.List = models.Project.query.filter_by(busy=True).all() + num_busy: int = len(projects_busy) + + # Set info to always return nu + return_info: typing.Dict = {"num": num_busy} + + # Return 0 if none are busy + if num_busy == 0: + return return_info + + # Check if user listing busy projects + if flask.request.args.get("list", type=inputs.boolean, default=False): + return_info.update( + {"projects": {p.public_id: p.date_updated for p in projects_busy}} + ) + + return return_info + + def old_get(self): # Get busy projects projects_busy: typing.List = models.Project.query.filter_by(busy=True).all() num_busy: int = len(projects_busy) diff --git a/dds_web/security/project_user_keys.py b/dds_web/security/project_user_keys.py index 8c27afb7a..81ec35863 100644 --- a/dds_web/security/project_user_keys.py +++ b/dds_web/security/project_user_keys.py @@ -4,7 +4,8 @@ import argon2 import cryptography.exceptions -from cryptography.hazmat.primitives import asymmetric, ciphers, hashes, serialization +from cryptography.hazmat.primitives import asymmetric, hashes, serialization +from cryptography.hazmat.primitives.ciphers.aead import AESGCM import flask import gc @@ -178,7 +179,7 @@ def __encrypt_with_aes(key, plaintext, aad=None): :param plaintext: a byte string :param aad: Additional data that should be authenticated with the key, but is not encrypted. Can be None. """ - aesgcm = ciphers.aead.AESGCM(key) + aesgcm = AESGCM(key) nonce = os.urandom(12) return nonce, aesgcm.encrypt(nonce, plaintext, aad) @@ -192,7 +193,7 @@ def __decrypt_with_aes(key, ciphertext, nonce, aad=None): :param aad: Additional data that should be authenticated with the key, but is not encrypted. Can be None. """ try: - aesgcm = ciphers.aead.AESGCM(key=key) + aesgcm = AESGCM(key=key) return aesgcm.decrypt(nonce=nonce, data=ciphertext, associated_data=aad) except (cryptography.exceptions.InvalidTag, ValueError): return None @@ -205,7 +206,7 @@ def __owner_identifier(owner): def __encrypt_owner_private_key(owner, private_key, owner_key=None): """Encrypt owners private key.""" # Generate key or use current key if exists - key = owner_key or ciphers.aead.AESGCM.generate_key(bit_length=256) + key = owner_key or AESGCM.generate_key(bit_length=256) # Encrypt private key nonce, encrypted_key = __encrypt_with_aes( diff --git a/dds_web/static/package-lock.json b/dds_web/static/package-lock.json index 482625544..de7385ae2 100644 --- a/dds_web/static/package-lock.json +++ b/dds_web/static/package-lock.json @@ -770,12 +770,12 @@ } }, "node_modules/braces": { - "version": "3.0.2", - "resolved": "https://registry.npmjs.org/braces/-/braces-3.0.2.tgz", - "integrity": "sha512-b8um+L1RzM3WDSzvhm6gIz1yfTbBt6YTlcEKAvsmqCZZFw46z626lVj9j1yEPW33H5H+lBQpZMP1k8l+78Ha0A==", + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/braces/-/braces-3.0.3.tgz", + "integrity": "sha512-yQbXgO/OSZVD2IsiLlro+7Hf6Q18EJrKSEsdoMzKePKXct3gvD8oLcOQdIzGupr5Fj+EDe8gO/lxc1BzfMpxvA==", "dev": true, "dependencies": { - "fill-range": "^7.0.1" + "fill-range": "^7.1.1" }, "engines": { "node": ">=8" @@ -1586,9 +1586,9 @@ } }, "node_modules/fill-range": { - "version": "7.0.1", - "resolved": "https://registry.npmjs.org/fill-range/-/fill-range-7.0.1.tgz", - "integrity": "sha512-qOo9F+dMUmC2Lcb4BbVvnKJxTPjCm+RRpe4gDuGrzkL7mEVl/djYSu2OdQ2Pa302N4oqkSg9ir6jaLWJ2USVpQ==", + "version": "7.1.1", + "resolved": "https://registry.npmjs.org/fill-range/-/fill-range-7.1.1.tgz", + "integrity": "sha512-YsGpe3WHLK8ZYi4tWDg2Jy3ebRz2rXowDxnld4bkQB00cc/1Zw9AWnC0i9ztDJitivtQvaI9KaLyKrc+hBW0yg==", "dev": true, "dependencies": { "to-regex-range": "^5.0.1" @@ -2272,11 +2272,18 @@ "node": ">= 0.4" } }, - "node_modules/ip": { - "version": "2.0.0", - "resolved": "https://registry.npmjs.org/ip/-/ip-2.0.0.tgz", - "integrity": "sha512-WKa+XuLG1A1R0UWhl2+1XQSi+fZWMsYKffMZTTYsiZaUD8k2yDAj5atimTUD2TZkyCkNEeYE5NhFZmupOGtjYQ==", - "dev": true + "node_modules/ip-address": { + "version": "9.0.5", + "resolved": "https://registry.npmjs.org/ip-address/-/ip-address-9.0.5.tgz", + "integrity": "sha512-zHtQzGojZXTwZTHQqra+ETKd4Sn3vgi7uBmlPoXVWZqYvuKmtI0l/VZTjqGmJY9x88GGOaZ9+G9ES8hC4T4X8g==", + "dev": true, + "dependencies": { + "jsbn": "1.1.0", + "sprintf-js": "^1.1.3" + }, + "engines": { + "node": ">= 12" + } }, "node_modules/is-array-buffer": { "version": "3.0.2", @@ -2632,6 +2639,12 @@ "integrity": "sha512-RdJUflcE3cUzKiMqQgsCu06FPu9UdIJO0beYbPhHN4k6apgJtifcoCtT9bcxOpYBtpD2kCM6Sbzg4CausW/PKQ==", "dev": true }, + "node_modules/jsbn": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/jsbn/-/jsbn-1.1.0.tgz", + "integrity": "sha512-4bYVV3aAMtDTTu4+xsDYa6sy9GyJ69/amsu9sYF2zqjiEoZA5xJi3BrfX3uY+/IekIu7MwdObdbDWpoZdBv3/A==", + "dev": true + }, "node_modules/json-buffer": { "version": "3.0.1", "resolved": "https://registry.npmjs.org/json-buffer/-/json-buffer-3.0.1.tgz", @@ -4881,16 +4894,16 @@ } }, "node_modules/socks": { - "version": "2.7.1", - "resolved": "https://registry.npmjs.org/socks/-/socks-2.7.1.tgz", - "integrity": "sha512-7maUZy1N7uo6+WVEX6psASxtNlKaNVMlGQKkG/63nEDdLOWNbiUMoLK7X4uYoLhQstau72mLgfEWcXcwsaHbYQ==", + "version": "2.8.3", + "resolved": "https://registry.npmjs.org/socks/-/socks-2.8.3.tgz", + "integrity": "sha512-l5x7VUUWbjVFbafGLxPWkYsHIhEvmF85tbIeFZWc8ZPtoMyybuEhL7Jye/ooC4/d48FgOjSJXgsF/AJPYCW8Zw==", "dev": true, "dependencies": { - "ip": "^2.0.0", + "ip-address": "^9.0.5", "smart-buffer": "^4.2.0" }, "engines": { - "node": ">= 10.13.0", + "node": ">= 10.0.0", "npm": ">= 3.0.0" } }, @@ -4958,6 +4971,12 @@ "integrity": "sha512-XkD+zwiqXHikFZm4AX/7JSCXA98U5Db4AFd5XUg/+9UNtnH75+Z9KxtpYiJZx36mUDVOwH83pl7yvCer6ewM3w==", "dev": true }, + "node_modules/sprintf-js": { + "version": "1.1.3", + "resolved": "https://registry.npmjs.org/sprintf-js/-/sprintf-js-1.1.3.tgz", + "integrity": "sha512-Oo+0REFV59/rz3gfJNKQiBlwfHaSESl1pcGyABQsnnIfWOFt6JNj5gCog2U6MLZ//IGYD+nA8nI+mTShREReaA==", + "dev": true + }, "node_modules/ssri": { "version": "8.0.1", "resolved": "https://registry.npmjs.org/ssri/-/ssri-8.0.1.tgz", @@ -5469,9 +5488,9 @@ } }, "node_modules/tar": { - "version": "6.1.15", - "resolved": "https://registry.npmjs.org/tar/-/tar-6.1.15.tgz", - "integrity": "sha512-/zKt9UyngnxIT/EAGYuxaMYgOIJiP81ab9ZfkILq4oNLPFX50qyYmu7jRj9qeXoxmJHjGlbH0+cm2uy1WCs10A==", + "version": "6.2.1", + "resolved": "https://registry.npmjs.org/tar/-/tar-6.2.1.tgz", + "integrity": "sha512-DZ4yORTwrbTj/7MZYq2w+/ZFdI6OZ/f9SFHR+71gIVUZhOQPHzVCLpvRnPgyaMpfWxxk/4ONva3GQSyNIKRv6A==", "dev": true, "dependencies": { "chownr": "^2.0.0", @@ -6513,12 +6532,12 @@ } }, "braces": { - "version": "3.0.2", - "resolved": "https://registry.npmjs.org/braces/-/braces-3.0.2.tgz", - "integrity": "sha512-b8um+L1RzM3WDSzvhm6gIz1yfTbBt6YTlcEKAvsmqCZZFw46z626lVj9j1yEPW33H5H+lBQpZMP1k8l+78Ha0A==", + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/braces/-/braces-3.0.3.tgz", + "integrity": "sha512-yQbXgO/OSZVD2IsiLlro+7Hf6Q18EJrKSEsdoMzKePKXct3gvD8oLcOQdIzGupr5Fj+EDe8gO/lxc1BzfMpxvA==", "dev": true, "requires": { - "fill-range": "^7.0.1" + "fill-range": "^7.1.1" } }, "browserslist": { @@ -7120,9 +7139,9 @@ } }, "fill-range": { - "version": "7.0.1", - "resolved": "https://registry.npmjs.org/fill-range/-/fill-range-7.0.1.tgz", - "integrity": "sha512-qOo9F+dMUmC2Lcb4BbVvnKJxTPjCm+RRpe4gDuGrzkL7mEVl/djYSu2OdQ2Pa302N4oqkSg9ir6jaLWJ2USVpQ==", + "version": "7.1.1", + "resolved": "https://registry.npmjs.org/fill-range/-/fill-range-7.1.1.tgz", + "integrity": "sha512-YsGpe3WHLK8ZYi4tWDg2Jy3ebRz2rXowDxnld4bkQB00cc/1Zw9AWnC0i9ztDJitivtQvaI9KaLyKrc+hBW0yg==", "dev": true, "requires": { "to-regex-range": "^5.0.1" @@ -7627,11 +7646,15 @@ "side-channel": "^1.0.4" } }, - "ip": { - "version": "2.0.0", - "resolved": "https://registry.npmjs.org/ip/-/ip-2.0.0.tgz", - "integrity": "sha512-WKa+XuLG1A1R0UWhl2+1XQSi+fZWMsYKffMZTTYsiZaUD8k2yDAj5atimTUD2TZkyCkNEeYE5NhFZmupOGtjYQ==", - "dev": true + "ip-address": { + "version": "9.0.5", + "resolved": "https://registry.npmjs.org/ip-address/-/ip-address-9.0.5.tgz", + "integrity": "sha512-zHtQzGojZXTwZTHQqra+ETKd4Sn3vgi7uBmlPoXVWZqYvuKmtI0l/VZTjqGmJY9x88GGOaZ9+G9ES8hC4T4X8g==", + "dev": true, + "requires": { + "jsbn": "1.1.0", + "sprintf-js": "^1.1.3" + } }, "is-array-buffer": { "version": "3.0.2", @@ -7873,6 +7896,12 @@ "integrity": "sha512-RdJUflcE3cUzKiMqQgsCu06FPu9UdIJO0beYbPhHN4k6apgJtifcoCtT9bcxOpYBtpD2kCM6Sbzg4CausW/PKQ==", "dev": true }, + "jsbn": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/jsbn/-/jsbn-1.1.0.tgz", + "integrity": "sha512-4bYVV3aAMtDTTu4+xsDYa6sy9GyJ69/amsu9sYF2zqjiEoZA5xJi3BrfX3uY+/IekIu7MwdObdbDWpoZdBv3/A==", + "dev": true + }, "json-buffer": { "version": "3.0.1", "resolved": "https://registry.npmjs.org/json-buffer/-/json-buffer-3.0.1.tgz", @@ -9513,12 +9542,12 @@ "dev": true }, "socks": { - "version": "2.7.1", - "resolved": "https://registry.npmjs.org/socks/-/socks-2.7.1.tgz", - "integrity": "sha512-7maUZy1N7uo6+WVEX6psASxtNlKaNVMlGQKkG/63nEDdLOWNbiUMoLK7X4uYoLhQstau72mLgfEWcXcwsaHbYQ==", + "version": "2.8.3", + "resolved": "https://registry.npmjs.org/socks/-/socks-2.8.3.tgz", + "integrity": "sha512-l5x7VUUWbjVFbafGLxPWkYsHIhEvmF85tbIeFZWc8ZPtoMyybuEhL7Jye/ooC4/d48FgOjSJXgsF/AJPYCW8Zw==", "dev": true, "requires": { - "ip": "^2.0.0", + "ip-address": "^9.0.5", "smart-buffer": "^4.2.0" } }, @@ -9577,6 +9606,12 @@ "integrity": "sha512-XkD+zwiqXHikFZm4AX/7JSCXA98U5Db4AFd5XUg/+9UNtnH75+Z9KxtpYiJZx36mUDVOwH83pl7yvCer6ewM3w==", "dev": true }, + "sprintf-js": { + "version": "1.1.3", + "resolved": "https://registry.npmjs.org/sprintf-js/-/sprintf-js-1.1.3.tgz", + "integrity": "sha512-Oo+0REFV59/rz3gfJNKQiBlwfHaSESl1pcGyABQsnnIfWOFt6JNj5gCog2U6MLZ//IGYD+nA8nI+mTShREReaA==", + "dev": true + }, "ssri": { "version": "8.0.1", "resolved": "https://registry.npmjs.org/ssri/-/ssri-8.0.1.tgz", @@ -9985,9 +10020,9 @@ } }, "tar": { - "version": "6.1.15", - "resolved": "https://registry.npmjs.org/tar/-/tar-6.1.15.tgz", - "integrity": "sha512-/zKt9UyngnxIT/EAGYuxaMYgOIJiP81ab9ZfkILq4oNLPFX50qyYmu7jRj9qeXoxmJHjGlbH0+cm2uy1WCs10A==", + "version": "6.2.1", + "resolved": "https://registry.npmjs.org/tar/-/tar-6.2.1.tgz", + "integrity": "sha512-DZ4yORTwrbTj/7MZYq2w+/ZFdI6OZ/f9SFHR+71gIVUZhOQPHzVCLpvRnPgyaMpfWxxk/4ONva3GQSyNIKRv6A==", "dev": true, "requires": { "chownr": "^2.0.0", diff --git a/dds_web/static/swaggerv3.yaml b/dds_web/static/swaggerv3.yaml index afa599359..7417afb91 100644 --- a/dds_web/static/swaggerv3.yaml +++ b/dds_web/static/swaggerv3.yaml @@ -1001,12 +1001,7 @@ paths: get: tags: - project - summary: Get current project status and optionally entire status history CHECK METHOD - description: This method requires some data - to be passed in the request body instead of the query. - Since this does not comply with the openAPI standards, swagger cannot document it properly, - therefore we need to change/remove it in the future. - deprecated: true + summary: Get current project status and optionally entire status history operationId: projectStatusGet parameters: - $ref: "#/components/parameters/defaultHeader" @@ -1017,7 +1012,7 @@ paths: schema: type: boolean example: true - description: If true, return entire status history + description: If true, return entire status history. If false or missing, return only current status responses: "401": $ref: "#/components/responses/UnauthorizedToken" @@ -1418,15 +1413,15 @@ paths: get: tags: - superadmin - summary: Get all users or check if there is a specific user in the database CHECK METHOD - description: This method requires the data - to be passed in the request body instead of the query. - Since this does not comply with the openAPI standards, swagger cannot document it properly, - therefore we need to change/remove it in the future. - deprecated: true + summary: Check if a specific user exists in the system. The username must be passed operationId: findUser parameters: - $ref: "#/components/parameters/defaultHeader" + - in: query + name: username + schema: + type: string + description: username to check responses: "401": $ref: "#/components/responses/UnauthorizedToken" @@ -1483,12 +1478,7 @@ paths: get: tags: - superadmin - summary: Check if any project are busy CHECK METHOD - description: This method requires the data - to be passed in the request body instead of the query. - Since this does not comply with the openAPI standards, swagger cannot document it properly, - therefore we need to change/remove it in the future. - deprecated: true + summary: Check if any project are busy. Returns the number of busy projects and can list them if requested. operationId: anyProjectBusy parameters: - $ref: "#/components/parameters/defaultHeader" @@ -1659,7 +1649,8 @@ components: in: query schema: type: string - description: Project id to query + required: true + description: project id to query email: name: email in: query diff --git a/dds_web/version.py b/dds_web/version.py index 8ce0e986f..c0b18f39f 100644 --- a/dds_web/version.py +++ b/dds_web/version.py @@ -1,3 +1,3 @@ # Do not do major version upgrade during 2024. # If mid or minor version reaches 9, continue with 10, 11 etc etc. -__version__ = "2.7.0" +__version__ = "2.7.1" diff --git a/requirements.txt b/requirements.txt index 6b64c20aa..9ed7512e8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,7 +3,7 @@ aniso8601==9.0.1 APScheduler==3.9.1 argon2-cffi==21.3.0 argon2-cffi-bindings==21.2.0 -Authlib==0.15.5 +Authlib==1.3.1 blinker==1.4 boto3==1.20.47 botocore==1.23.47 @@ -45,7 +45,7 @@ marshmallow-sqlalchemy==0.27.0 packaging==21.3 Pillow==10.2.0 # required by qrcode pycparser==2.21 -PyMySQL==1.0.2 +PyMySQL==1.1.1 PyNaCl==1.5.0 pyparsing==3.0.7 python-dateutil==2.8.2 diff --git a/tests/test_version.py b/tests/test_version.py index c98810944..92c2234cb 100644 --- a/tests/test_version.py +++ b/tests/test_version.py @@ -2,4 +2,4 @@ def test_version(): - assert version.__version__ == "2.7.0" + assert version.__version__ == "2.7.1" diff --git a/tests/tests_v3/api/__init__.py b/tests/tests_v3/api/__init__.py new file mode 100644 index 000000000..d3f5a12fa --- /dev/null +++ b/tests/tests_v3/api/__init__.py @@ -0,0 +1 @@ + diff --git a/tests/tests_v3/api/test_project.py b/tests/tests_v3/api/test_project.py new file mode 100644 index 000000000..b5cf1632b --- /dev/null +++ b/tests/tests_v3/api/test_project.py @@ -0,0 +1,1823 @@ +# IMPORTS ################################################################################ IMPORTS # + +# Standard library +import http +from sqlite3 import OperationalError +import pytest +from _pytest.logging import LogCaptureFixture +import logging +import datetime +import time +import unittest.mock + +# Installed +import boto3 +import flask_mail +import werkzeug +import sqlalchemy + +# Own +import dds_web +from dds_web import mail, db +from dds_web.errors import BucketNotFoundError, DatabaseError, DeletionError +import tests.tests_v3 as tests +from tests.test_files_new import project_row, file_in_db, FIRST_NEW_FILE +from tests.test_project_creation import proj_data_with_existing_users, create_unit_admins +from dds_web.database import models +from dds_web.api.project import UserProjects + +# CONFIG ################################################################################## CONFIG # + +proj_data = { + "pi": "researchuser@mailtrap.io", + "title": "Test proj", + "description": "A longer project description", + "users_to_add": [{"email": "researchuser2@mailtrap.io", "role": "Project Owner"}], +} +fields_set_to_null = [ + "title", + "date_created", + "description", + "pi", + "public_key", + # "unit_id", + # "created_by", + # "is_active", + # "date_updated", +] + +release_project = {"new_status": "Available"} +release_project_small_deadline = {**release_project, "deadline": 5} +release_project_big_deadline = {**release_project, "deadline": 80} + +extend_deadline_data_no_confirmed = { + "new_deadline_in": 20, +} + +extend_deadline_data = {**extend_deadline_data_no_confirmed, "confirmed": True} + + +# HELPER FUNCTIONS ################################################################################## CONFIG # + + +def create_and_release_project(client, proj_data, release_data): + """Helper function that creates a project and set it ups as available""" + + current_unit_admins = models.UnitUser.query.filter_by(unit_id=1, is_admin=True).count() + if current_unit_admins < 3: + create_unit_admins(num_admins=2) + current_unit_admins = models.UnitUser.query.filter_by(unit_id=1, is_admin=True).count() + assert current_unit_admins >= 3 + + response = client.post( + tests.DDSEndpoint.PROJECT_CREATE, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unituser"]).token(client), + json=proj_data, + ) + assert response.status_code == http.HTTPStatus.OK + project_id = response.json.get("project_id") + project = project_row(project_id=project_id) + + # Release project + response = client.post( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(client), + query_string={"project": project_id}, + json=release_data, + ) + assert response.status_code == http.HTTPStatus.OK + + return project_id, project + + +@pytest.fixture(scope="module") +def test_project(module_client): + """Create a shared test project""" + with unittest.mock.patch.object(boto3.session.Session, "resource") as mock_session: + response = module_client.post( + tests.DDSEndpoint.PROJECT_CREATE, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unituser"]).token(module_client), + json=proj_data, + ) + project_id = response.json.get("project_id") + # add a file + response = module_client.post( + tests.DDSEndpoint.FILE_NEW, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json=FIRST_NEW_FILE, + ) + + return project_id + + +def mock_sqlalchemyerror(_=None): + raise sqlalchemy.exc.SQLAlchemyError() + + +# ProjectStatus + +# get + + +def test_projectstatus_get_status_without_args(module_client, boto3_session): + """Submit status request with invalid arguments""" + # Create unit admins to allow project creation + current_unit_admins = models.UnitUser.query.filter_by(unit_id=1, is_admin=True).count() + if current_unit_admins < 3: + create_unit_admins(num_admins=2) + current_unit_admins = models.UnitUser.query.filter_by(unit_id=1, is_admin=True).count() + assert current_unit_admins >= 3 + + # Create project + response = module_client.post( + tests.DDSEndpoint.PROJECT_CREATE, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unituser"]).token(module_client), + json=proj_data, + ) + assert response.status_code == http.HTTPStatus.OK + + # Test getting project status without args - should fail + response = module_client.get( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + json={}, + ) + assert response.status_code == http.HTTPStatus.BAD_REQUEST + assert "Missing required information: 'project'" in response.json["message"] + + # Test getting project status without args version 2 - should fail + response = module_client.get( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + json={}, + query_string={}, + ) + assert response.status_code == http.HTTPStatus.BAD_REQUEST + assert "Missing required information: 'project'" in response.json["message"] + + +def test_projectstatus_get_status_with_empty_args(module_client, boto3_session): + """Submit status request with invalid arguments""" + # Create unit admins to allow project creation + current_unit_admins = models.UnitUser.query.filter_by(unit_id=1, is_admin=True).count() + if current_unit_admins < 3: + create_unit_admins(num_admins=2) + current_unit_admins = models.UnitUser.query.filter_by(unit_id=1, is_admin=True).count() + assert current_unit_admins >= 3 + + # Create project + response = module_client.post( + tests.DDSEndpoint.PROJECT_CREATE, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unituser"]).token(module_client), + json=proj_data, + ) + assert response.status_code == http.HTTPStatus.OK + + # Test getting project status without args - should fail + response = module_client.get( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + json={}, + query_string={"test": "test"}, + ) + assert response.status_code == http.HTTPStatus.BAD_REQUEST + assert "Missing required information: 'project'" in response.json["message"] + + +def test_projectstatus_get_status_with_invalid_project(module_client, boto3_session): + """Submit status request with invalid arguments""" + # Create unit admins to allow project creation + current_unit_admins = models.UnitUser.query.filter_by(unit_id=1, is_admin=True).count() + if current_unit_admins < 3: + create_unit_admins(num_admins=2) + current_unit_admins = models.UnitUser.query.filter_by(unit_id=1, is_admin=True).count() + assert current_unit_admins >= 3 + + # Create project + response = module_client.post( + tests.DDSEndpoint.PROJECT_CREATE, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unituser"]).token(module_client), + json=proj_data, + ) + assert response.status_code == http.HTTPStatus.OK + + # Test getting project status without args - should fail + response = module_client.get( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + json={}, + query_string={"project": "nonexistentproject"}, + ) + assert response.status_code == http.HTTPStatus.BAD_REQUEST + assert "The specified project does not exist." in response.json["message"] + + +def test_projectstatus_get_status_with_non_accessible_project(module_client, boto3_session): + """Submit status request with invalid arguments""" + # Get project for unit 2 + project = models.Project.query.filter_by(unit_id=2).first() + assert project + + # Test getting project status without args - should fail + response = module_client.get( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + json={}, + query_string={"project": project.public_id}, + ) + assert response.status_code == http.HTTPStatus.FORBIDDEN + assert "Project access denied." in response.json["message"] + + +# set_busy + + +def test_set_busy_true(module_client): + """Test set busy to true.""" + from dds_web.api import project + + # Get project + project_obj = models.Project.query.first() + assert project_obj + + # Set as not busy + project_obj.busy = False + db.session.commit() + + # Run function + project.ProjectStatus.set_busy(project=project_obj, busy=True) + assert project_obj.busy + + +def test_set_busy_false(module_client): + """Test set busy to false.""" + from dds_web.api import project + + # Get project + project_obj = models.Project.query.first() + assert project_obj + + # Set as not busy + project_obj.busy = True + db.session.commit() + + # Run function + project.ProjectStatus.set_busy(project=project_obj, busy=False) + assert not project_obj.busy + + +# post + + +def test_projectstatus_when_busy(module_client): + """Status change should not be possible when project is busy.""" + # Get user + username = "unitadmin" + user = models.User.query.filter_by(username=username).one_or_none() + assert user + + # Get project and set to busy + project = user.projects[0] + project.busy = True + db.session.commit() + assert project.busy + + # Attempt to change status + response = module_client.post( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS[username]).token(module_client), + query_string={"project": project.public_id}, + json={"something": "something"}, + ) + assert response.status_code == http.HTTPStatus.BAD_REQUEST + assert ( + f"The status for the project '{project.public_id}' is already in the process of being changed." + in response.json.get("message") + ) + + +def test_projectstatus_when_not_busy_but_invalid(module_client): + """Status change which results in an exception should also reset busy to False.""" + # Get user + username = "unitadmin" + user = models.User.query.filter_by(username=username).one_or_none() + assert user + + # Get project and set as not busy + project = user.projects[0] + project.busy = False + db.session.commit() + assert not project.busy + + # Attempt to change status + response = module_client.post( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS[username]).token(module_client), + query_string={"project": project.public_id}, + json={"new_status": ""}, + ) + assert response.status_code == http.HTTPStatus.BAD_REQUEST + assert "No status transition provided. Specify the new status." in response.json.get("message") + assert not project.busy + + +def test_projectstatus_submit_request_with_invalid_args(module_client, boto3_session): + """Submit status request with invalid arguments""" + # Create unit admins to allow project creation + current_unit_admins = models.UnitUser.query.filter_by(unit_id=1, is_admin=True).count() + if current_unit_admins < 3: + create_unit_admins(num_admins=2) + current_unit_admins = models.UnitUser.query.filter_by(unit_id=1, is_admin=True).count() + assert current_unit_admins >= 3 + + # Create project + response = module_client.post( + tests.DDSEndpoint.PROJECT_CREATE, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unituser"]).token(module_client), + json=proj_data, + ) + assert response.status_code == http.HTTPStatus.OK + + project_id = response.json.get("project_id") + project = project_row(project_id=project_id) + + response = module_client.post( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json={}, + ) + + assert response.status_code == http.HTTPStatus.BAD_REQUEST + assert "Required data missing" in response.json["message"] + + response = module_client.post( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json={"new_status": "Invalid"}, + ) + assert response.status_code == http.HTTPStatus.BAD_REQUEST + assert "Invalid status" in response.json["message"] + + response: werkzeug.test.WrapperTestResponse = module_client.post( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json={"test": "test"}, + ) + assert response.status_code == http.HTTPStatus.BAD_REQUEST + assert "No status transition provided. Specify the new status." in response.json["message"] + + +def test_projectstatus_post_operationalerror(module_client, boto3_session): + # Create unit admins to allow project creation + current_unit_admins = models.UnitUser.query.filter_by(unit_id=1, is_admin=True).count() + if current_unit_admins < 3: + create_unit_admins(num_admins=2) + current_unit_admins = models.UnitUser.query.filter_by(unit_id=1, is_admin=True).count() + assert current_unit_admins >= 3 + + new_status = {"new_status": "Deleted"} + response = module_client.post( + tests.DDSEndpoint.PROJECT_CREATE, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unituser"]).token(module_client), + json=proj_data, + ) + assert response.status_code == http.HTTPStatus.OK + project_id = response.json.get("project_id") + project = project_row(project_id=project_id) + + token = tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client) + with unittest.mock.patch("dds_web.db.session.commit", mock_sqlalchemyerror): + # Run command + response = module_client.post( + tests.DDSEndpoint.PROJECT_STATUS, + headers=token, + query_string={"project": project_id}, + json=new_status, + ) + assert response.status_code == http.HTTPStatus.INTERNAL_SERVER_ERROR + + +def test_projectstatus_set_project_to_deleted_from_in_progress(module_client, boto3_session): + """Create project and set status to deleted""" + # Create unit admins to allow project creation + current_unit_admins = models.UnitUser.query.filter_by(unit_id=1, is_admin=True).count() + if current_unit_admins < 3: + create_unit_admins(num_admins=2) + current_unit_admins = models.UnitUser.query.filter_by(unit_id=1, is_admin=True).count() + assert current_unit_admins >= 3 + + new_status = {"new_status": "Deleted"} + response = module_client.post( + tests.DDSEndpoint.PROJECT_CREATE, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unituser"]).token(module_client), + json=proj_data, + ) + assert response.status_code == http.HTTPStatus.OK + + project_id = response.json.get("project_id") + project = project_row(project_id=project_id) + + # add a file + response = module_client.post( + tests.DDSEndpoint.FILE_NEW, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json=FIRST_NEW_FILE, + ) + + assert file_in_db(test_dict=FIRST_NEW_FILE, project=project.id) + + for field, value in vars(project).items(): + if field in fields_set_to_null: + assert value + assert project.project_user_keys + + response = module_client.get( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + ) + + assert response.status_code == http.HTTPStatus.OK + assert response.json["current_status"] == project.current_status + + response = module_client.post( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json=new_status, + ) + + assert response.status_code == http.HTTPStatus.OK + assert project.current_status == "Deleted" + for field, value in vars(project).items(): + if field in fields_set_to_null: + assert not value + assert not project.project_user_keys + + +def test_projectstatus_archived_project(module_client, boto3_session): + """Create a project and archive it""" + # Create unit admins to allow project creation + current_unit_admins = models.UnitUser.query.filter_by(unit_id=1, is_admin=True).count() + if current_unit_admins < 3: + create_unit_admins(num_admins=2) + current_unit_admins = models.UnitUser.query.filter_by(unit_id=1, is_admin=True).count() + assert current_unit_admins >= 3 + + response = module_client.post( + tests.DDSEndpoint.PROJECT_CREATE, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unituser"]).token(module_client), + json=proj_data, + ) + assert response.status_code == http.HTTPStatus.OK + + project_id = response.json.get("project_id") + # add a file + response = module_client.post( + tests.DDSEndpoint.FILE_NEW, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json=FIRST_NEW_FILE, + ) + + project = project_row(project_id=project_id) + + assert file_in_db(test_dict=FIRST_NEW_FILE, project=project.id) + + new_status = {"new_status": "Archived"} + response = module_client.post( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json=new_status, + ) + + assert response.status_code == http.HTTPStatus.OK + assert project.current_status == "Archived" + + assert not max(project.project_statuses, key=lambda x: x.date_created).is_aborted + assert not file_in_db(test_dict=FIRST_NEW_FILE, project=project.id) + assert not project.project_user_keys + + for field, value in vars(project).items(): + if field in fields_set_to_null: + assert value + assert project.researchusers + + +def test_projectstatus_aborted_project(module_client, boto3_session): + """Create a project and try to abort it""" + # Create unit admins to allow project creation + current_unit_admins = models.UnitUser.query.filter_by(unit_id=1, is_admin=True).count() + if current_unit_admins < 3: + create_unit_admins(num_admins=2) + current_unit_admins = models.UnitUser.query.filter_by(unit_id=1, is_admin=True).count() + assert current_unit_admins >= 3 + + response = module_client.post( + tests.DDSEndpoint.PROJECT_CREATE, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unituser"]).token(module_client), + json=proj_data, + ) + assert response.status_code == http.HTTPStatus.OK + + project_id = response.json.get("project_id") + # add a file + response = module_client.post( + tests.DDSEndpoint.FILE_NEW, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json=FIRST_NEW_FILE, + ) + + project = project_row(project_id=project_id) + + assert file_in_db(test_dict=FIRST_NEW_FILE, project=project.id) + + for field, value in vars(project).items(): + if field in fields_set_to_null: + assert value + assert len(project.researchusers) > 0 + assert project.project_user_keys + + time.sleep(1) + new_status = {"new_status": "Archived", "is_aborted": True} + response = module_client.post( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json=new_status, + ) + + assert response.status_code == http.HTTPStatus.OK + assert project.current_status == "Archived" + assert max(project.project_statuses, key=lambda x: x.date_created).is_aborted + assert not file_in_db(test_dict=FIRST_NEW_FILE, project=project.id) + assert not project.project_user_keys + + for field, value in vars(project).items(): + if field in fields_set_to_null: + assert not value + assert len(project.researchusers) == 0 + + +def test_projectstatus_abort_from_in_progress_once_made_available(module_client, boto3_session): + """Create project and abort it from In Progress after it has been made available""" + # Create unit admins to allow project creation + current_unit_admins = models.UnitUser.query.filter_by(unit_id=1, is_admin=True).count() + if current_unit_admins < 3: + create_unit_admins(num_admins=2) + current_unit_admins = models.UnitUser.query.filter_by(unit_id=1, is_admin=True).count() + assert current_unit_admins >= 3 + + response = module_client.post( + tests.DDSEndpoint.PROJECT_CREATE, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unituser"]).token(module_client), + json=proj_data, + ) + assert response.status_code == http.HTTPStatus.OK + + project_id = response.json.get("project_id") + + # add a file + response = module_client.post( + tests.DDSEndpoint.FILE_NEW, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json=FIRST_NEW_FILE, + ) + + project = project_row(project_id=project_id) + + assert file_in_db(test_dict=FIRST_NEW_FILE, project=project.id) + + new_status = {"new_status": "Available"} + time.sleep(1) + + response = module_client.post( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json=new_status, + ) + + assert response.status_code == http.HTTPStatus.OK + assert project.current_status == "Available" + + new_status["new_status"] = "In Progress" + time.sleep(1) + + response = module_client.post( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json=new_status, + ) + + assert response.status_code == http.HTTPStatus.OK + assert project.current_status == "In Progress" + assert project.project_user_keys + + response = module_client.get( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id, "history": True}, + ) + + assert response.status_code == http.HTTPStatus.OK + assert response.json["current_status"] == project.current_status + assert response.json["current_deadline"] + assert response.json["history"] + + time.sleep(1) + new_status["new_status"] = "Archived" + new_status["is_aborted"] = True + response = module_client.post( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json=new_status, + ) + + assert response.status_code == http.HTTPStatus.OK + assert project.current_status == "Archived" + assert max(project.project_statuses, key=lambda x: x.date_created).is_aborted + assert not file_in_db(test_dict=FIRST_NEW_FILE, project=project.id) + + for field, value in vars(project).items(): + if field in fields_set_to_null: + assert not value + assert len(project.researchusers) == 0 + assert not project.project_user_keys + + +def test_projectstatus_check_invalid_transitions_from_in_progress(module_client, boto3_session): + """Check all invalid transitions from In Progress""" + # Create unit admins to allow project creation + current_unit_admins = models.UnitUser.query.filter_by(unit_id=1, is_admin=True).count() + if current_unit_admins < 3: + create_unit_admins(num_admins=2) + current_unit_admins = models.UnitUser.query.filter_by(unit_id=1, is_admin=True).count() + assert current_unit_admins >= 3 + + response = module_client.post( + tests.DDSEndpoint.PROJECT_CREATE, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + json=proj_data, + ) + assert response.status_code == http.HTTPStatus.OK + + project_id = response.json.get("project_id") + project = project_row(project_id=project_id) + + # In Progress to Expired + new_status = {"new_status": "Expired"} + response = module_client.post( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json=new_status, + ) + + assert response.status_code == http.HTTPStatus.BAD_REQUEST + assert project.current_status == "In Progress" + assert ( + "You cannot expire a project that has the current status 'In Progress'." + in response.json["message"] + ) + + # In Progress to Archived + new_status["new_status"] = "Archived" + response = module_client.post( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json=new_status, + ) + + assert response.status_code == http.HTTPStatus.OK + assert project.current_status == "Archived" + + +def test_projectstatus_set_project_to_available_valid_transition(module_client, test_project): + """Set status to Available for test project""" + + new_status = {"new_status": "Available", "deadline": 10} + + project_id = test_project + project = project_row(project_id=project_id) + time.sleep(1) + + response = module_client.post( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json=new_status, + ) + + assert response.status_code == http.HTTPStatus.OK + assert project.current_status == "Available" + + db_deadline = max(project.project_statuses, key=lambda x: x.date_created).deadline + calc_deadline = datetime.datetime.utcnow().replace( + hour=23, minute=59, second=59, microsecond=0 + ) + datetime.timedelta(days=new_status["deadline"]) + + assert db_deadline == calc_deadline + + +def test_projectstatus_set_project_to_available_no_mail(module_client, boto3_session): + """Set status to Available for test project, but skip sending mails""" + # Create unit admins to allow project creation + current_unit_admins = models.UnitUser.query.filter_by(unit_id=1, is_admin=True).count() + if current_unit_admins < 3: + create_unit_admins(num_admins=2) + current_unit_admins = models.UnitUser.query.filter_by(unit_id=1, is_admin=True).count() + assert current_unit_admins >= 3 + + token = tests.UserAuth(tests.USER_CREDENTIALS["unituser"]).token(module_client) + + response = module_client.post( + tests.DDSEndpoint.PROJECT_CREATE, + headers=token, + json=proj_data_with_existing_users, + ) + assert response.status_code == http.HTTPStatus.OK + assert response.json and response.json.get("user_addition_statuses") + for x in response.json.get("user_addition_statuses"): + assert "given access to the Project" in x + + public_project_id = response.json.get("project_id") + + with unittest.mock.patch.object(flask_mail.Mail, "send") as mock_mail_send: + with unittest.mock.patch.object( + dds_web.api.user.AddUser, "compose_and_send_email_to_user" + ) as mock_mail_func: + response = module_client.post( + tests.DDSEndpoint.PROJECT_STATUS, + headers=token, + query_string={"project": public_project_id}, + json={"new_status": "Available", "deadline": 10, "send_email": False}, + ) + # assert that no mail is being sent. + assert mock_mail_func.called == False + assert mock_mail_send.call_count == 0 + + assert response.status_code == http.HTTPStatus.OK + assert "An e-mail notification has not been sent." in response.json["message"] + + +def test_projectstatus_set_project_to_deleted_from_available(module_client, test_project): + """Try to set status to Deleted for test project in Available""" + + new_status = {"new_status": "Deleted"} + + project_id = test_project + project = project_row(project_id=project_id) + + response = module_client.post( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json=new_status, + ) + + assert response.status_code == http.HTTPStatus.BAD_REQUEST + assert project.current_status == "Available" + + +def test_projectstatus_check_deadline_remains_same_when_made_available_again_after_going_to_in_progress( + module_client, test_project +): + """Check deadline remains same when an available project goes to In Progress and is made available again""" + project_id = test_project + project = project_row(project_id=project_id) + assert project.current_status == "Available" + deadline_initial = project.current_deadline + + time.sleep(1) + new_status = {"new_status": "In Progress"} + response = module_client.post( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json=new_status, + ) + assert response.status_code == http.HTTPStatus.OK + assert project.current_status == "In Progress" + time.sleep(1) + + # Try to delete the project + new_status = {"new_status": "Deleted"} + response = module_client.post( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json=new_status, + ) + assert response.status_code == http.HTTPStatus.BAD_REQUEST + assert ( + "You cannot delete a project that has been made available previously" + in response.json["message"] + ) + assert project.current_status == "In Progress" + + new_status = {"new_status": "Available"} + response = module_client.post( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json=new_status, + ) + assert response.status_code == http.HTTPStatus.OK + assert project.current_status == "Available" + assert project.current_deadline == deadline_initial + + +def test_projectstatus_set_project_to_expired_from_available(module_client, test_project): + """Set status to Expired for test project""" + + new_status = {"new_status": "Expired", "deadline": 5} + + project_id = test_project + project = project_row(project_id=project_id) + time.sleep(1) + + response = module_client.post( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json=new_status, + ) + + assert response.status_code == http.HTTPStatus.OK + assert project.current_status == "Expired" + + db_deadline = max(project.project_statuses, key=lambda x: x.date_created).deadline + calc_deadline = datetime.datetime.utcnow().replace( + hour=23, minute=59, second=59, microsecond=0 + ) + datetime.timedelta(days=new_status["deadline"]) + + assert db_deadline == calc_deadline + + +def test_projectstatus_project_availability_after_set_to_expired_more_than_twice( + module_client, test_project +): + """Try to set status to Available for test project after being in Expired 3 times""" + + new_status = {"new_status": "Available", "deadline": 5} + + project_id = test_project + project = project_row(project_id=project_id) + time.sleep(1) + + response = module_client.post( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json=new_status, + ) + + assert response.status_code == http.HTTPStatus.OK + assert project.current_status == "Available" + + new_status["new_status"] = "Expired" + time.sleep(1) + + response = module_client.post( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json=new_status, + ) + + assert response.status_code == http.HTTPStatus.OK + assert project.current_status == "Expired" + + new_status["new_status"] = "Available" + time.sleep(1) + + response = module_client.post( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json=new_status, + ) + + assert response.status_code == http.HTTPStatus.OK + assert project.current_status == "Available" + + new_status["new_status"] = "Expired" + time.sleep(1) + + response = module_client.post( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json=new_status, + ) + + assert response.status_code == http.HTTPStatus.OK + assert project.current_status == "Expired" + + new_status["new_status"] = "Available" + time.sleep(1) + + response = module_client.post( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json=new_status, + ) + + assert response.status_code == http.HTTPStatus.BAD_REQUEST + assert project.current_status == "Expired" + + assert "Project cannot be made Available any more times" in response.json["message"] + + +def test_projectstatus_invalid_transitions_from_expired(module_client, test_project): + """Check all invalid transitions from Expired""" + + # Expired to In progress + new_status = {"new_status": "In Progress"} + project_id = test_project + project = project_row(project_id=project_id) + response = module_client.post( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json=new_status, + ) + assert response.status_code == http.HTTPStatus.BAD_REQUEST + assert project.current_status == "Expired" + assert ( + "You cannot retract a project that has the current status 'Expired'" + in response.json["message"] + ) + + # Expired to Deleted + new_status["new_status"] = "Deleted" + response = module_client.post( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json=new_status, + ) + assert response.status_code == http.HTTPStatus.BAD_REQUEST + assert project.current_status == "Expired" + assert ( + "You cannot delete a project that has the current status 'Expired'" + in response.json["message"] + ) + + +def test_projectstatus_set_project_to_archived(module_client, test_project, boto3_session): + """Archive an expired project""" + + new_status = {"new_status": "Archived"} + project_id = test_project + project = project_row(project_id=project_id) + + assert file_in_db(test_dict=FIRST_NEW_FILE, project=project.id) + assert project.project_user_keys + + response = module_client.post( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json=new_status, + ) + + assert response.status_code == http.HTTPStatus.OK + assert project.current_status == "Archived" + assert not max(project.project_statuses, key=lambda x: x.date_created).is_aborted + assert not file_in_db(test_dict=FIRST_NEW_FILE, project=project.id) + assert not project.project_user_keys + + +def test_projectstatus_invalid_transitions_from_archived(module_client, test_project): + """Check all invalid transitions from Archived""" + + # Archived to In progress + project_id = test_project + project = project_row(project_id=project_id) + + new_status = {"new_status": "In Progress"} + response = module_client.post( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json=new_status, + ) + assert response.status_code == http.HTTPStatus.BAD_REQUEST + assert project.current_status == "Archived" + assert "Cannot change status for a project" in response.json["message"] + + # Archived to Deleted + new_status["new_status"] = "Deleted" + response = module_client.post( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json=new_status, + ) + assert response.status_code == http.HTTPStatus.BAD_REQUEST + assert project.current_status == "Archived" + assert "Cannot change status for a project" in response.json["message"] + + # Archived to Available + new_status["new_status"] = "Available" + response = module_client.post( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json=new_status, + ) + assert response.status_code == http.HTTPStatus.BAD_REQUEST + assert project.current_status == "Archived" + assert "Cannot change status for a project" in response.json["message"] + + # Archived to Expired + new_status["new_status"] = "Expired" + response = module_client.post( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json=new_status, + ) + assert response.status_code == http.HTTPStatus.BAD_REQUEST + assert project.current_status == "Archived" + assert "Cannot change status for a project" in response.json["message"] + + +def test_projectstatus_post_invalid_deadline_release(module_client, boto3_session): + """Attempt to set an invalid deadline.""" + # Create unit admins to allow project creation + current_unit_admins = models.UnitUser.query.filter_by(unit_id=1, is_admin=True).count() + if current_unit_admins < 3: + create_unit_admins(num_admins=2) + current_unit_admins = models.UnitUser.query.filter_by(unit_id=1, is_admin=True).count() + assert current_unit_admins >= 3 + + response = module_client.post( + tests.DDSEndpoint.PROJECT_CREATE, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unituser"]).token(module_client), + json=proj_data, + ) + assert response.status_code == http.HTTPStatus.OK + project_id = response.json.get("project_id") + project = project_row(project_id=project_id) + + # Release project - should fail due to invalid deadline + response = module_client.post( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json={"new_status": "Available", "deadline": 100}, + ) + assert response.status_code == http.HTTPStatus.BAD_REQUEST + assert "The deadline needs to be less than (or equal to) 90 days." in response.json["message"] + + +def test_projectstatus_post_invalid_deadline_expire(module_client, boto3_session): + # Create unit admins to allow project creation + current_unit_admins = models.UnitUser.query.filter_by(unit_id=1, is_admin=True).count() + if current_unit_admins < 3: + create_unit_admins(num_admins=2) + current_unit_admins = models.UnitUser.query.filter_by(unit_id=1, is_admin=True).count() + assert current_unit_admins >= 3 + + response = module_client.post( + tests.DDSEndpoint.PROJECT_CREATE, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unituser"]).token(module_client), + json=proj_data, + ) + assert response.status_code == http.HTTPStatus.OK + project_id = response.json.get("project_id") + project = project_row(project_id=project_id) + + # Release project + response = module_client.post( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json={"new_status": "Available"}, + ) + assert response.status_code == http.HTTPStatus.OK + + # Expire project - should fail due to invalid deadline + response = module_client.post( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json={"new_status": "Expired", "deadline": 40}, + ) + assert response.status_code == http.HTTPStatus.BAD_REQUEST + assert "The deadline needs to be less than (or equal to) 30 days." in response.json["message"] + + +def test_extend_deadline_bad_confirmed(module_client, boto3_session): + """Try to extend a deadline and send a not boolean for confirmation""" + + # create project and release it + project_id, project = create_and_release_project( + client=module_client, proj_data=proj_data, release_data=release_project + ) + assert project.times_expired == 0 + time.sleep(1) # tests are too fast + + # try to extend deadline with a string as confirmed - should fail + response = module_client.patch( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json={**extend_deadline_data, "confirmed": "true"}, + ) + assert response.status_code == http.HTTPStatus.BAD_REQUEST + assert "`confirmed` is a boolean value: True or False." in response.json["message"] + + +def test_extend_deadline_no_confirmed(module_client, boto3_session): + """Try to extend a deadline before confirmation - should sent a warning and no operation is perfrom""" + + # create project and release it + project_id, project = create_and_release_project( + client=module_client, proj_data=proj_data, release_data=release_project + ) + assert project.times_expired == 0 + time.sleep(1) # tests are too fast + + # try to extend deadline + response = module_client.patch( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json=extend_deadline_data_no_confirmed, + ) + # status code is ok but no operation perform + assert response.status_code == http.HTTPStatus.OK + assert project.times_expired == 0 + + assert "Operation must be confirmed before proceding." in response.json["warning"] + assert all( + item in response.json + for item in ["project_info", "project_status", "warning", "default_unit_days"] + ) + + +def test_extend_deadline_when_busy(module_client, boto3_session): + """Request should not be possible when project is busy.""" + + # create project and release it + project_id, project = create_and_release_project( + client=module_client, proj_data=proj_data, release_data=release_project + ) + assert project.times_expired == 0 + time.sleep(1) # tests are too fast + + # set to busy + project.busy = True + db.session.commit() + assert project.busy + + # attempt to extend deadline + response = module_client.patch( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json=extend_deadline_data, + ) + assert response.status_code == http.HTTPStatus.BAD_REQUEST + + assert ( + f"The deadline for the project '{project_id}' is already in the process of being changed. " + in response.json["message"] + ) + assert ( + "Please try again later. \n\nIf you know that the project is not busy, contact support." + in response.json["message"] + ) + + +def test_extend_deadline_no_deadline(module_client, boto3_session): + """If no deadline has been provided it should not be executed anything""" + + # create project and release it + project_id, project = create_and_release_project( + client=module_client, proj_data=proj_data, release_data=release_project + ) + assert project.times_expired == 0 + time.sleep(1) # tests are too fast + + # try to extend deadline - no new deadline provided + response = module_client.patch( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json={"confirmed": True}, + ) + assert response.status_code == http.HTTPStatus.OK + assert project.times_expired == 0 + assert "Nothing to update." in response.json["message"] + + +def test_extend_deadline_project_not_available(module_client, boto3_session): + """Is not possible to extend deadline to a project in another status than available.""" + + # create a new project and never release it + current_unit_admins = models.UnitUser.query.filter_by(unit_id=1, is_admin=True).count() + if current_unit_admins < 3: + create_unit_admins(num_admins=2) + current_unit_admins = models.UnitUser.query.filter_by(unit_id=1, is_admin=True).count() + assert current_unit_admins >= 3 + + response = module_client.post( + tests.DDSEndpoint.PROJECT_CREATE, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unituser"]).token(module_client), + json=proj_data, + ) + assert response.status_code == http.HTTPStatus.OK + project_id = response.json.get("project_id") + + # attempt to extend deadline - project is in progress + response = module_client.patch( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json=extend_deadline_data, + ) + assert response.status_code == http.HTTPStatus.BAD_REQUEST + + assert ( + "You can only extend the deadline for a project that has the status 'Available'." + in response.json["message"] + ) + + +def test_extend_deadline_too_much_days(module_client, boto3_session): + """If the new deadline together with the time left already is more than 90 days it should not work""" + + # create project and release it with big dealdine + project_id, project = create_and_release_project( + client=module_client, proj_data=proj_data, release_data=release_project_big_deadline + ) + assert project.times_expired == 0 + time.sleep(1) # tests are too fast + + # try to extend deadline -> 80 + 11 > 90 + extend_deadline_data_big_deadline = {**extend_deadline_data, "new_deadline_in": 11} + response = module_client.patch( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json=extend_deadline_data_big_deadline, + ) + assert response.status_code == http.HTTPStatus.BAD_REQUEST + assert ( + "The new deadline needs to be less than (or equal to) 90 days." in response.json["message"] + ) + + +def test_extend_deadline_bad_new_deadline(module_client, boto3_session): + """If the new deadlien provided is not an integer it should fail""" + + # create project and release it + project_id, project = create_and_release_project( + client=module_client, proj_data=proj_data, release_data=release_project + ) + assert project.times_expired == 0 + time.sleep(1) # tests are too fast + + # try to extend deadline with a bad new deadline + extend_deadline_data_bad_deadline = {**extend_deadline_data, "new_deadline_in": "20"} + response = module_client.patch( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json=extend_deadline_data_bad_deadline, + ) + assert response.status_code == http.HTTPStatus.BAD_REQUEST + assert ( + "The deadline attribute passed should be of type Int (i.e a number)." + in response.json["message"] + ) + + +def test_extend_deadline_more_than_default(module_client, boto3_session): + """If the new deadline provided is more than the default unit days to release a project it should fail""" + + # create project and release it + project_id, project = create_and_release_project( + client=module_client, proj_data=proj_data, release_data=release_project_small_deadline + ) + assert project.times_expired == 0 + time.sleep(1) # tests are too fast + + default_unit_days = project.responsible_unit.days_in_available + + # try to extend deadline with a bigger deadline that it is suppose to have + extend_deadline_data_bad_deadline = { + **extend_deadline_data, + "new_deadline_in": default_unit_days + 1, + } + response = module_client.patch( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json=extend_deadline_data_bad_deadline, + ) + assert response.status_code == http.HTTPStatus.BAD_REQUEST + assert ( + "The number of days has to be lower than the default deadline extension number" + in response.json["message"] + ) + + +def test_extend_deadline_maxium_number_available_exceded(module_client, boto3_session): + """If the deadline has been extended more than 2 times it should not work""" + + # create project and release it + project_id, project = create_and_release_project( + client=module_client, proj_data=proj_data, release_data=release_project_small_deadline + ) + assert project.times_expired == 0 + deadline = project.current_deadline # current deadline + new_deadline_in = 1 # small new deadline + + for i in range(1, 4): + time.sleep(1) # tests are too fast + + # extend deadline by a small new deadline so we can do it several times + extend_deadline_data_small_deadline = { + **extend_deadline_data, + "new_deadline_in": new_deadline_in, + } + response = module_client.patch( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json=extend_deadline_data_small_deadline, + ) + if i < 3: + assert response.status_code == http.HTTPStatus.OK + assert project.times_expired == i + assert project.current_deadline == deadline + datetime.timedelta(days=new_deadline_in) + deadline = project.current_deadline # update current deadline + assert project.current_status == "Available" + assert ( + f"The project '{project_id}' has been given a new deadline" + in response.json["message"] + ) + assert "An e-mail notification has not been sent." in response.json["message"] + else: + assert response.status_code == http.HTTPStatus.BAD_REQUEST + assert ( + "Project availability limit: The maximum number of changes in data availability has been reached." + in response.json["message"] + ) + + +def test_extend_deadline_ok(module_client, boto3_session): + """Extend a project deadline of a project - it should work ok""" + + # create project and release it + project_id, project = create_and_release_project( + client=module_client, proj_data=proj_data, release_data=release_project_small_deadline + ) + assert project.times_expired == 0 + time.sleep(1) # tests are too fast + + deadline = project.current_deadline # save current deadline + + # extend deadline + response = module_client.patch( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json=extend_deadline_data, + ) + assert response.status_code == http.HTTPStatus.OK + assert project.times_expired == 1 + assert project.current_deadline == deadline + datetime.timedelta( + days=extend_deadline_data.get("new_deadline_in") + ) + assert project.current_status == "Available" + + assert f"The project '{project_id}' has been given a new deadline" in response.json["message"] + assert "An e-mail notification has not been sent." in response.json["message"] + + +def test_extend_deadline_mock_database_error( + module_client, boto3_session, capfd: LogCaptureFixture +): + """Operation fails when trying to save in the Database""" + + project_id, project = create_and_release_project( + client=module_client, proj_data=proj_data, release_data=release_project_small_deadline + ) + assert project.times_expired == 0 + time.sleep(1) # tests are too fast + + token = tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client) + + with unittest.mock.patch.object(db.session, "rollback") as rollback: + with unittest.mock.patch("dds_web.db.session.commit") as mock_commit: + # we need this because the first time the commit function is called is when set_busy() + def side_effect_generator(): + yield None # First call, no exception + while True: + yield sqlalchemy.exc.SQLAlchemyError() # Subsequent calls, exception + + mock_commit.side_effect = side_effect_generator() + + # extend deadline + response = module_client.patch( + tests.DDSEndpoint.PROJECT_STATUS, + headers=token, + query_string={"project": project_id}, + json=extend_deadline_data, + ) + assert response.status_code == http.HTTPStatus.INTERNAL_SERVER_ERROR + assert "Saving database changes failed." in response.json["message"] + + assert rollback.called + _, err = capfd.readouterr() + assert "Failed to extend deadline" in err + + +def test_projectstatus_post_deletion_and_archivation_errors(module_client, boto3_session): + """Mock the different expections that can occur when deleting project.""" + current_unit_admins = models.UnitUser.query.filter_by(unit_id=1, is_admin=True).count() + if current_unit_admins < 3: + create_unit_admins(num_admins=2) + current_unit_admins = models.UnitUser.query.filter_by(unit_id=1, is_admin=True).count() + assert current_unit_admins >= 3 + + response = module_client.post( + tests.DDSEndpoint.PROJECT_CREATE, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unituser"]).token(module_client), + json=proj_data, + ) + assert response.status_code == http.HTTPStatus.OK + project_id = response.json.get("project_id") + project = project_row(project_id=project_id) + + def mock_typeerror(): + raise TypeError + + def mock_databaseerror(): + raise DatabaseError + + def mock_deletionerror(): + raise DeletionError() + + def mock_bucketnotfounderror(): + raise BucketNotFoundError() + + for func in [mock_typeerror, mock_databaseerror, mock_deletionerror, mock_bucketnotfounderror]: + with unittest.mock.patch("dds_web.api.project.ProjectStatus.delete_project_info", func): + # Release project + response = module_client.post( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json={"new_status": "Deleted"}, + ) + assert response.status_code == http.HTTPStatus.INTERNAL_SERVER_ERROR + assert "Server Error: Status was not updated" in response.json["message"] + + +def test_projectstatus_post_archiving_without_aborting(module_client, boto3_session): + """Try to archive a project thas has been available.""" + current_unit_admins = models.UnitUser.query.filter_by(unit_id=1, is_admin=True).count() + if current_unit_admins < 3: + create_unit_admins(num_admins=2) + current_unit_admins = models.UnitUser.query.filter_by(unit_id=1, is_admin=True).count() + assert current_unit_admins >= 3 + + response = module_client.post( + tests.DDSEndpoint.PROJECT_CREATE, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unituser"]).token(module_client), + json=proj_data, + ) + assert response.status_code == http.HTTPStatus.OK + project_id = response.json.get("project_id") + project = project_row(project_id=project_id) + + # Release project + response = module_client.post( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json={"new_status": "Available"}, + ) + assert response.status_code == http.HTTPStatus.OK + + time.sleep(1) # tests are too fast + + # Retract project + response = module_client.post( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json={"new_status": "In Progress"}, + ) + assert response.status_code == http.HTTPStatus.OK + + # Retract project + response = module_client.post( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json={"new_status": "Archived"}, + ) + assert response.status_code == http.HTTPStatus.BAD_REQUEST + assert ( + "You cannot archive a project that has been made available previously" + in response.json["message"] + ) + + +def test_projectstatus_post_deletion_and_archivation_errors(module_client, boto3_session): + """Mock the different expections that can occur when deleting project.""" + current_unit_admins = models.UnitUser.query.filter_by(unit_id=1, is_admin=True).count() + if current_unit_admins < 3: + create_unit_admins(num_admins=2) + current_unit_admins = models.UnitUser.query.filter_by(unit_id=1, is_admin=True).count() + assert current_unit_admins >= 3 + + response = module_client.post( + tests.DDSEndpoint.PROJECT_CREATE, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unituser"]).token(module_client), + json=proj_data, + ) + assert response.status_code == http.HTTPStatus.OK + project_id = response.json.get("project_id") + project = project_row(project_id=project_id) + + response = module_client.post( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json={"new_status": "Available"}, + ) + assert response.status_code == http.HTTPStatus.OK + + def mock_typeerror(): + raise TypeError + + def mock_databaseerror(): + raise DatabaseError + + def mock_deletionerror(): + raise DeletionError() + + def mock_bucketnotfounderror(): + raise BucketNotFoundError() + + for func in [mock_typeerror, mock_databaseerror, mock_deletionerror, mock_bucketnotfounderror]: + with unittest.mock.patch("dds_web.api.project.ProjectStatus.delete_project_info", func): + # Release project + response = module_client.post( + tests.DDSEndpoint.PROJECT_STATUS, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client), + query_string={"project": project_id}, + json={"new_status": "Archived", "is_aborted": True}, + ) + assert response.status_code == http.HTTPStatus.INTERNAL_SERVER_ERROR + assert "Server Error: Status was not updated" in response.json["message"] + + +# GetPublic + + +def test_getpublic_publickey_is_none(module_client, boto3_session): + """Try to get public key from project that does not have a project public key.""" + # Ensure enough unit admins + current_unit_admins = models.UnitUser.query.filter_by(unit_id=1, is_admin=True).count() + if current_unit_admins < 3: + create_unit_admins(num_admins=2) + current_unit_admins = models.UnitUser.query.filter_by(unit_id=1, is_admin=True).count() + assert current_unit_admins >= 3 + + # Create project + response = module_client.post( + tests.DDSEndpoint.PROJECT_CREATE, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unituser"]).token(module_client), + json=proj_data, + ) + assert response.status_code == http.HTTPStatus.OK + project_id = response.json.get("project_id") + project = project_row(project_id=project_id) + + # Remove public key + project.public_key = None + db.session.commit() + + # Get public key - does not exist so it fails + response = module_client.get( + tests.DDSEndpoint.PROJ_PUBLIC, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unituser"]).token(module_client), + query_string={"project": project_id}, + ) + assert response.status_code == http.HTTPStatus.INTERNAL_SERVER_ERROR + assert "Unrecoverable key error. Aborting." in response.json["message"] + + +def test_getpublic_publickey(module_client, boto3_session): + """Get project public key.""" + # Ensure enough unit admins + current_unit_admins = models.UnitUser.query.filter_by(unit_id=1, is_admin=True).count() + if current_unit_admins < 3: + create_unit_admins(num_admins=2) + current_unit_admins = models.UnitUser.query.filter_by(unit_id=1, is_admin=True).count() + assert current_unit_admins >= 3 + + # Create project + response = module_client.post( + tests.DDSEndpoint.PROJECT_CREATE, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unituser"]).token(module_client), + json=proj_data, + ) + assert response.status_code == http.HTTPStatus.OK + project_id = response.json.get("project_id") + project = project_row(project_id=project_id) + + # Get public key + response = module_client.get( + tests.DDSEndpoint.PROJ_PUBLIC, + headers=tests.UserAuth(tests.USER_CREDENTIALS["unituser"]).token(module_client), + query_string={"project": project_id}, + ) + assert response.status_code == http.HTTPStatus.OK + response_json = response.json + + # Verify correct + public_key = response_json.get("public") + assert public_key and public_key == project.public_key.hex().upper() + + +def test_proj_public_no_project(module_client): + """Attempting to get public key without a project should not work""" + token = tests.UserAuth(tests.USER_CREDENTIALS["researchuser"]).token(module_client) + response = module_client.get( + tests.DDSEndpoint.PROJ_PUBLIC, + headers=token, + ) + assert response.status_code == http.HTTPStatus.BAD_REQUEST + response_json = response.json + assert "Missing required information: 'project'" in response_json.get("message") + + +def test_project_public_researcher_get(module_client): + """User should get access to public key""" + + token = tests.UserAuth(tests.USER_CREDENTIALS["researchuser"]).token(module_client) + response = module_client.get( + tests.DDSEndpoint.PROJ_PUBLIC, query_string={"project": "public_project_id"}, headers=token + ) + assert response.status_code == http.HTTPStatus.OK + response_json = response.json + assert response_json.get("public") + + +def test_project_public_facility_put(module_client): + """User should get access to public key""" + + token = tests.UserAuth(tests.USER_CREDENTIALS["unitadmin"]).token(module_client) + response = module_client.get( + tests.DDSEndpoint.PROJ_PUBLIC, query_string={"project": "public_project_id"}, headers=token + ) + assert response.status_code == http.HTTPStatus.OK + response_json = response.json + assert response_json.get("public") + + +# ProjectBusy + + +def test_set_busy_no_token(module_client): + """Token required to set project busy/not busy.""" + response = module_client.put(tests.DDSEndpoint.PROJECT_BUSY, headers=tests.DEFAULT_HEADER) + assert response.status_code == http.HTTPStatus.UNAUTHORIZED + assert response.json.get("message") + assert "No token" in response.json.get("message") + + +# This test should be removed due to removing the put method. -> Class ProjectBusy in dds_web/api/project.py +def test_set_busy_invalid_version(module_client): + """ProjectBusy endpoint is empty and should only return error message about invalid version.""" + # Error messages + major_version_error: str = "You're using an old CLI version, please upgrade to the latest one." + busy_error: str = ( + "Your CLI version is trying to use functionality which is no longer in use. Upgrade your version to the latest one and run your command again." + ) + + for username in ["superadmin", "researchuser", "projectowner", "unituser", "unitadmin"]: + # Get user + user = models.User.query.filter_by(username=username).one_or_none() + assert user + + # Get project + project = user.projects[0] + assert project + + # Authenticate and run + token = tests.UserAuth(tests.USER_CREDENTIALS[username]).token(module_client) + for version, error_message in { + token["X-CLI-Version"]: busy_error, + # Experimental v3 doesnt verify the major version + # "1.9.9": major_version_error, + "2.1.9": busy_error, + }.items(): + token["X-CLI-Version"] = version + response = module_client.put( + tests.DDSEndpoint.PROJECT_BUSY, + headers=token, + query_string={"project": project.public_id}, + json={"something": "notabool"}, + ) + assert response.status_code == http.HTTPStatus.FORBIDDEN + assert error_message in response.json.get("message") + + +# Project usage + + +def test_project_usage(module_client): + """Test if correct cost value is returned.""" + + cost_gbhour = 0.09 / (30 * 24) + + # Get user and project + user = models.User.query.filter_by(username="unitadmin").one_or_none() + assert user + project_0 = user.projects[0] + assert project_0 + + # Call project_usage() for the project and check if cost is calculated correctly + proj_bhours, proj_cost = UserProjects.project_usage(project=project_0) + assert (proj_bhours / 1e9) * cost_gbhour == proj_cost + + +def test_email_project_release(module_client, boto3_session): + """Test that check that the email sent to the researchers when project is released is correct""" + public_project_id = "public_project_id" + + create_unit_admins(num_admins=2) + current_unit_admins = models.UnitUser.query.filter_by(unit_id=1, is_admin=True).count() + assert current_unit_admins >= 3 + + # user to perfrom the operation + token = tests.UserAuth(tests.USER_CREDENTIALS["unituser"]).token(module_client) + # project to be released + project = models.Project.query.filter_by(public_id=public_project_id).first() + # num of researchers that will receive email + num_users = models.ProjectUsers.query.filter_by(project_id=project.id).count() + + # Release project and check email + with mail.record_messages() as outbox: + response = module_client.post( + tests.DDSEndpoint.PROJECT_STATUS, + headers=token, + query_string={"project": public_project_id}, + json={"new_status": "Available", "deadline": 10, "send_email": True}, + ) + assert len(outbox) == num_users # nÂș of Emails informing researchers + assert "Project made available by" in outbox[-1].subject + + body = outbox[-1].body # plain text + html = outbox[-1].html + + project_title = project.title + + ## check plain text message + assert f"- Project Title: {project_title}" in body + assert f"- DDS project ID: {public_project_id}" in body + assert f"dds ls -p {public_project_id}" in body + assert f"dds data get -p {public_project_id} -a --verify-checksum" in body + assert "If you experience issues, please contact the SciLifeLab unit" in body + assert ( + "What is the DDS? The DDS is a system for SciLifeLab infrastructures to deliver data to researchers in a fast, secure and simple way" + in body + ) + + ## check html + assert f"
  • Project Title: {project_title}
  • " in html + assert f"
  • DDS project ID: {public_project_id}
  • " in html + assert f"dds ls -p {public_project_id}" in html + assert f"dds data get -p {public_project_id} -a --verify-checksum" in html + assert "If you experience issues, please contact the SciLifeLab unit" in html + assert ( + "What is the DDS? The DDS is a system for SciLifeLab infrastructures to deliver data to researchers in a fast, secure and simple way." + in html + ) + + assert response.status_code == http.HTTPStatus.OK diff --git a/tests/tests_v3/api/test_superadmin_only.py b/tests/tests_v3/api/test_superadmin_only.py new file mode 100644 index 000000000..b70658819 --- /dev/null +++ b/tests/tests_v3/api/test_superadmin_only.py @@ -0,0 +1,1007 @@ +#################################################################################################### +# IMPORTS ################################################################################ IMPORTS # +#################################################################################################### + +# Standard library +import http +import time +import typing +import unittest +from datetime import datetime, timedelta +from unittest import mock +from unittest.mock import patch +from unittest.mock import PropertyMock + +# Installed +import flask +import werkzeug +import flask_mail +import freezegun +import click + +# Own +from dds_web import db, mail +from dds_web.database import models +import tests.tests_v3 as tests +from dds_web.commands import collect_stats + +#################################################################################################### +# CONFIG ################################################################################## CONFIG # +#################################################################################################### + +users: typing.Dict = { + "Researcher": "researchuser", + "Unit Personnel": "unituser", + "Unit Admin": "unitadmin", + "Super Admin": "superadmin", +} + +#################################################################################################### +# TESTS #################################################################################### TESTS # +#################################################################################################### + + +# Tools ############################################################################################ +def get_token(username: str, client: flask.testing.FlaskClient) -> typing.Dict: + return tests.UserAuth(tests.USER_CREDENTIALS[username]).token(client) + + +# AllUnits ######################################################################################### + + +def test_list_units_as_not_superadmin(client: flask.testing.FlaskClient) -> None: + """Only Super Admin can list users.""" + no_access_users: typing.Dict = users.copy() + no_access_users.pop("Super Admin") + + for u in no_access_users: + token: typing.Dict = get_token(username=users[u], client=client) + response: werkzeug.test.WrapperTestResponse = client.get( + tests.DDSEndpoint.LIST_UNITS_ALL, headers=token + ) + assert response.status_code == http.HTTPStatus.FORBIDDEN + + +def test_list_units_as_super_admin(client: flask.testing.FlaskClient) -> None: + """List units as Super Admin.""" + all_units: typing.List = models.Unit.query.all() + + token: typing.Dict = get_token(username=users["Super Admin"], client=client) + response: werkzeug.test.WrapperTestResponse = client.get( + tests.DDSEndpoint.LIST_UNITS_ALL, headers=token + ) + assert response.status_code == http.HTTPStatus.OK + + keys: typing.List = response.json.get("keys") + units: typing.List = response.json.get("units") + assert keys and units + + assert keys == [ + "Name", + "Public ID", + "External Display Name", + "Days In Available", + "Days In Expired", + "Safespring Endpoint", + "Contact Email", + "Size", + ] + assert len(all_units) == len(units) + + for unit in all_units: + expected: typing.Dict = { + "Name": unit.name, + "Public ID": unit.public_id, + "External Display Name": unit.external_display_name, + "Contact Email": unit.contact_email, + "Safespring Endpoint": unit.sto2_endpoint, + "Days In Available": unit.days_in_available, + "Days In Expired": unit.days_in_expired, + "Size": unit.size, + } + + correct_size: int = 0 + for project in unit.projects: + for file in project.files: + correct_size += file.size_stored + assert correct_size == unit.size + assert expected in units + + +# MOTD ############################################################################################# + + +def test_create_motd_not_superadmin(client: flask.testing.FlaskClient) -> None: + """Create a new message of the day, using everything but Super Admin access.""" + no_access_users: typing.Dict = users.copy() + no_access_users.pop("Super Admin") + + for u in no_access_users: + token: typing.Dict = get_token(username=users[u], client=client) + response: werkzeug.test.WrapperTestResponse = client.post( + tests.DDSEndpoint.MOTD, headers=token + ) + assert response.status_code == http.HTTPStatus.FORBIDDEN + + +def test_create_motd_as_superadmin_no_json(client: flask.testing.FlaskClient) -> None: + """Create a new message of the day, using a Super Admin account, but without any json.""" + token: typing.Dict = get_token(username=users["Super Admin"], client=client) + response: werkzeug.test.WrapperTestResponse = client.post(tests.DDSEndpoint.MOTD, headers=token) + assert response.status_code == http.HTTPStatus.BAD_REQUEST + assert "Required data missing from request!" in response.json.get("message") + + +def test_create_motd_as_superadmin_no_message(client: flask.testing.FlaskClient) -> None: + """Create a new message of the day, using a Super Admin account, but without any message.""" + token: typing.Dict = get_token(username=users["Super Admin"], client=client) + response: werkzeug.test.WrapperTestResponse = client.post( + tests.DDSEndpoint.MOTD, headers=token, json={"test": "test"} + ) + assert response.status_code == http.HTTPStatus.BAD_REQUEST + assert "No MOTD specified." in response.json.get("message") + + +def test_create_motd_as_superadmin_empty_message(client: flask.testing.FlaskClient) -> None: + """Create a new message of the day, using a Super Admin account, but with empty message.""" + token: typing.Dict = get_token(username=users["Super Admin"], client=client) + response: werkzeug.test.WrapperTestResponse = client.post( + tests.DDSEndpoint.MOTD, headers=token, json={"message": ""} + ) + assert response.status_code == http.HTTPStatus.BAD_REQUEST + assert "No MOTD specified." in response.json.get("message") + + +def test_create_motd_as_superadmin_success(client: flask.testing.FlaskClient) -> None: + """Create a new message of the day, using a Super Admin account.""" + token: typing.Dict = get_token(username=users["Super Admin"], client=client) + response: werkzeug.test.WrapperTestResponse = client.post( + tests.DDSEndpoint.MOTD, headers=token, json={"message": "test"} + ) + assert response.status_code == http.HTTPStatus.OK + assert "The MOTD was successfully added to the database." in response.json.get("message") + + assert models.MOTD.query.filter_by(message="test") + + +def test_get_motd_no_message(client: flask.testing.FlaskClient) -> None: + """Get latest MOTD from database.""" + response: werkzeug.test.WrapperTestResponse = client.get( + tests.DDSEndpoint.MOTD, headers=tests.DEFAULT_HEADER + ) + assert response.status_code == http.HTTPStatus.OK + assert "There are no active MOTDs." in response.json.get("message") + + +def test_get_motd(client: flask.testing.FlaskClient) -> None: + """Get latest MOTD from database.""" + # Create first message + token: typing.Dict = get_token(username=users["Super Admin"], client=client) + response: werkzeug.test.WrapperTestResponse = client.post( + tests.DDSEndpoint.MOTD, headers=token, json={"message": "test"} + ) + assert response.status_code == http.HTTPStatus.OK + assert models.MOTD.query.filter_by(message="test") + + # Get first message + response1: werkzeug.test.WrapperTestResponse = client.get( + tests.DDSEndpoint.MOTD, headers=tests.DEFAULT_HEADER + ) + assert response1.status_code == http.HTTPStatus.OK + assert isinstance(response1.json.get("motds"), list) + assert "test" in response1.json.get("motds")[0]["Message"] + + time.sleep(5) + + # Create new message + response2: werkzeug.test.WrapperTestResponse = client.post( + tests.DDSEndpoint.MOTD, headers=token, json={"message": "something else"} + ) + assert response2.status_code == http.HTTPStatus.OK + assert models.MOTD.query.filter_by(message="something else") + + # Check that new message is displayed + response3: werkzeug.test.WrapperTestResponse = client.get( + tests.DDSEndpoint.MOTD, headers=tests.DEFAULT_HEADER + ) + assert response3.status_code == http.HTTPStatus.OK + assert "something else" in response3.json.get("motds")[1]["Message"] + + # Deactivate message + response4: werkzeug.test.WrapperTestResponse = client.put( + tests.DDSEndpoint.MOTD, headers=token, json={"motd_id": 1} + ) + assert response4.status_code == http.HTTPStatus.OK + assert "The MOTD was successfully deactivated in the database." in response4.json.get("message") + + # Deactivate message that is not active + response5: werkzeug.test.WrapperTestResponse = client.put( + tests.DDSEndpoint.MOTD, headers=token, json={"motd_id": 1} + ) + assert response5.status_code == http.HTTPStatus.BAD_REQUEST + assert "MOTD with id 1 is not active." in response5.json.get("message") + + +def test_deactivate_motd_no_json(client: flask.testing.FlaskClient) -> None: + token: typing.Dict = get_token(username=users["Super Admin"], client=client) + response: werkzeug.test.WrapperTestResponse = client.put(tests.DDSEndpoint.MOTD, headers=token) + assert response.status_code == http.HTTPStatus.BAD_REQUEST + assert "Required data missing from request!" in response.json.get("message") + + +def test_deactivate_motd_no_motd_id(client: flask.testing.FlaskClient) -> None: + token: typing.Dict = get_token(username=users["Super Admin"], client=client) + response: werkzeug.test.WrapperTestResponse = client.put( + tests.DDSEndpoint.MOTD, headers=token, json={"test": "test"} + ) + assert response.status_code == http.HTTPStatus.BAD_REQUEST + assert "No MOTD for deactivation specified." in response.json.get("message") + + +def test_deactivate_motd_no_such_motd(client: flask.testing.FlaskClient) -> None: + token: typing.Dict = get_token(username=users["Super Admin"], client=client) + response: werkzeug.test.WrapperTestResponse = client.put( + tests.DDSEndpoint.MOTD, headers=token, json={"motd_id": 8} + ) + assert response.status_code == http.HTTPStatus.BAD_REQUEST + assert "MOTD with id 8 does not exist in the database" in response.json.get("message") + + +def test_deactivate_motd_not_superadmin(client: flask.testing.FlaskClient) -> None: + """Deactivate a message of the day, using everything but Super Admin access.""" + no_access_users: typing.Dict = users.copy() + no_access_users.pop("Super Admin") + + for u in no_access_users: + token: typing.Dict = get_token(username=users[u], client=client) + response: werkzeug.test.WrapperTestResponse = client.put( + tests.DDSEndpoint.MOTD, headers=token + ) + assert response.status_code == http.HTTPStatus.FORBIDDEN + + +# FindUser ######################################################################################### + + +def test_find_user_not_superadmin(client: flask.testing.FlaskClient) -> None: + """Try finding a specific user without being Super Admin.""" + no_access_users: typing.Dict = users.copy() + no_access_users.pop("Super Admin") + + for u in no_access_users: + token: typing.Dict = get_token(username=users[u], client=client) + response: werkzeug.test.WrapperTestResponse = client.get( + tests.DDSEndpoint.USER_FIND, headers=token + ) + assert response.status_code == http.HTTPStatus.FORBIDDEN + + +def test_find_user_no_username(client: flask.testing.FlaskClient) -> None: + """Find specific user with empty username.""" + # Authenticate + token: typing.Dict = get_token(username=users["Super Admin"], client=client) + + # Get user + for x in ["", None]: + response: werkzeug.test.WrapperTestResponse = client.get( + tests.DDSEndpoint.USER_FIND, headers=token, query_string={"username": ""} + ) + assert response.status_code == http.HTTPStatus.BAD_REQUEST + assert "Username required to check existence of account." in response.json.get("message") + + +def test_find_user_non_existent(client: flask.testing.FlaskClient) -> None: + """Try to find non existent user.""" + # Authenticate + token: typing.Dict = get_token(username=users["Super Admin"], client=client) + + # Non existent user + username: str = "nonexistentuser" + assert not models.User.query.filter_by(username=username).first() + + # Get user + response: werkzeug.test.WrapperTestResponse = client.get( + tests.DDSEndpoint.USER_FIND, headers=token, query_string={"username": username} + ) + assert response.status_code == http.HTTPStatus.OK + assert response.json and response.json.get("exists") is False + + +def test_find_user(client: flask.testing.FlaskClient) -> None: + """Find existing user.""" + # Authenticate + token: typing.Dict = get_token(username=users["Super Admin"], client=client) + + # Non existent user + user_row: models.User = models.User.query.first() + assert user_row + + # Get user + response: werkzeug.test.WrapperTestResponse = client.get( + tests.DDSEndpoint.USER_FIND, headers=token, query_string={"username": user_row.username} + ) + assert response.status_code == http.HTTPStatus.OK + assert response.json and response.json.get("exists") is True + + +# ResetTwoFactor ################################################################################### + + +def test_reset_hotp_not_superadmin(client: flask.testing.FlaskClient) -> None: + """Try resetting a users HOTP without being Super Admin.""" + no_access_users: typing.Dict = users.copy() + no_access_users.pop("Super Admin") + + for u in no_access_users: + token: typing.Dict = get_token(username=users[u], client=client) + response: werkzeug.test.WrapperTestResponse = client.put( + tests.DDSEndpoint.TOTP_DEACTIVATE, headers=token + ) + assert response.status_code == http.HTTPStatus.FORBIDDEN + + +def test_reset_hotp_no_json(client: flask.testing.FlaskClient) -> None: + """Try reseting user HOTP without specifying the user.""" + # Authenticate + token: typing.Dict = get_token(username=users["Super Admin"], client=client) + + # Deactivate TOTP + response: werkzeug.test.WrapperTestResponse = client.put( + tests.DDSEndpoint.TOTP_DEACTIVATE, headers=token + ) + assert response.status_code == http.HTTPStatus.BAD_REQUEST + assert "Required data missing from request!" in response.json.get("message") + + +def test_reset_hotp_no_username(client: flask.testing.FlaskClient) -> None: + """Reset users HOTP with empty username.""" + # Authenticate + token: typing.Dict = get_token(username=users["Super Admin"], client=client) + + # Deactivate TOTP + for x in ["", None]: + response: werkzeug.test.WrapperTestResponse = client.put( + tests.DDSEndpoint.TOTP_DEACTIVATE, headers=token, json={"username": x} + ) + assert response.status_code == http.HTTPStatus.BAD_REQUEST + assert "Username required to reset 2FA to HOTP" in response.json.get("message") + + +def test_reset_hotp_non_existent_user(client: flask.testing.FlaskClient) -> None: + """Try to reset HOTP for non existent user.""" + # Authenticate + token: typing.Dict = get_token(username=users["Super Admin"], client=client) + + # Non existent user + username: str = "nonexistentuser" + assert not models.User.query.filter_by(username=username).first() + + # Deactivate TOTP + response: werkzeug.test.WrapperTestResponse = client.put( + tests.DDSEndpoint.TOTP_DEACTIVATE, headers=token, json={"username": username} + ) + assert response.status_code == http.HTTPStatus.BAD_REQUEST + assert f"The user doesn't exist: {username}" in response.json.get("message") + + +def test_reset_hotp_already_set(client: flask.testing.FlaskClient) -> None: + """Reset hotp when already set.""" + # Authenticate + token: typing.Dict = get_token(username=users["Super Admin"], client=client) + + # Existent user + user_row: models.User = models.User.query.first() + assert user_row + assert not user_row.totp_enabled + + # Deactivate TOTP + response: werkzeug.test.WrapperTestResponse = client.put( + tests.DDSEndpoint.TOTP_DEACTIVATE, headers=token, json={"username": user_row.username} + ) + assert response.status_code == http.HTTPStatus.BAD_REQUEST + assert "TOTP is already deactivated for this user" in response.json.get("message") + + +def test_reset_hotp(client: flask.testing.FlaskClient) -> None: + """Reset HOTP.""" + # Authenticate + token: typing.Dict = get_token(username=users["Super Admin"], client=client) + + # Existent user + user_row: models.User = models.User.query.first() + assert user_row + user_row.activate_totp() + assert user_row.totp_enabled + + # Deactivate TOTP + response: werkzeug.test.WrapperTestResponse = client.put( + tests.DDSEndpoint.TOTP_DEACTIVATE, headers=token, json={"username": user_row.username} + ) + assert response.status_code == http.HTTPStatus.OK + assert ( + f"TOTP has been deactivated for user: {user_row.username}. They can now use 2FA via email during authentication." + in response.json.get("message") + ) + + user_row_again: models.User = models.User.query.filter_by(username=user_row.username).first() + assert user_row_again and not user_row_again.totp_enabled + + +# SendMOTD ######################################################################################### + + +def test_send_motd_incorrect_method(client: flask.testing.FlaskClient) -> None: + """Only post should be accepted.""" + # Authenticate + token: typing.Dict = get_token(username=users["Super Admin"], client=client) + + # Attempt request + with unittest.mock.patch.object(flask_mail.Connection, "send") as mock_mail_send: + for method in [client.get, client.put, client.delete, client.patch]: + response: werkzeug.test.WrapperTestResponse = method( + tests.DDSEndpoint.MOTD_SEND, headers=token, json={"motd_id": "something"} + ) + assert response.status_code == http.HTTPStatus.METHOD_NOT_ALLOWED + assert mock_mail_send.call_count == 0 + + +def test_send_motd_not_superadmin(client: flask.testing.FlaskClient) -> None: + """Only Super Admins should be able to send the motds.""" + for role in ["Unit Admin", "Unit Personnel", "Researcher"]: + # Authenticate + token: typing.Dict = get_token(username=users[role], client=client) + + # Attempt request + with unittest.mock.patch.object(flask_mail.Connection, "send") as mock_mail_send: + response: werkzeug.test.WrapperTestResponse = client.post( + tests.DDSEndpoint.MOTD_SEND, headers=token, json={"motd_id": "something"} + ) + assert response.status_code == http.HTTPStatus.FORBIDDEN + assert mock_mail_send.call_count == 0 + + +def test_send_motd_no_json(client: flask.testing.FlaskClient) -> None: + """The request needs json in order to send a motd.""" + # Authenticate + token: typing.Dict = get_token(username=users["Super Admin"], client=client) + + # Attempt request + with unittest.mock.patch.object(flask_mail.Connection, "send") as mock_mail_send: + response: werkzeug.test.WrapperTestResponse = client.post( + tests.DDSEndpoint.MOTD_SEND, headers=token + ) + assert response.status_code == http.HTTPStatus.BAD_REQUEST + assert "Required data missing from request" in response.json.get("message") + assert mock_mail_send.call_count == 0 + + +def test_send_motd_no_motdid(client: flask.testing.FlaskClient) -> None: + """The json should have motd_id.""" + # Authenticate + token: typing.Dict = get_token(username=users["Super Admin"], client=client) + + # Attempt request + with unittest.mock.patch.object(flask_mail.Connection, "send") as mock_mail_send: + response: werkzeug.test.WrapperTestResponse = client.post( + tests.DDSEndpoint.MOTD_SEND, headers=token, json={"test": "something"} + ) + assert response.status_code == http.HTTPStatus.BAD_REQUEST + assert "Please specify the ID of the MOTD you want to send." in response.json.get("message") + assert mock_mail_send.call_count == 0 + + +def test_send_motd_nonexistent_motd(client: flask.testing.FlaskClient) -> None: + """The motd_id needs to be a valid motd.""" + # Authenticate + token: typing.Dict = get_token(username=users["Super Admin"], client=client) + + # Specify motd to send + motd_id: int = 10 + + # Attempt request + with unittest.mock.patch.object(flask_mail.Connection, "send") as mock_mail_send: + response: werkzeug.test.WrapperTestResponse = client.post( + tests.DDSEndpoint.MOTD_SEND, headers=token, json={"motd_id": motd_id} + ) + assert response.status_code == http.HTTPStatus.BAD_REQUEST + assert f"There is no active MOTD with ID '{motd_id}'" in response.json.get("message") + assert mock_mail_send.call_count == 0 + + +def test_send_motd_not_active(client: flask.testing.FlaskClient) -> None: + """Attempt sending a motd which is not active.""" + # Authenticate + token: typing.Dict = get_token(username=users["Super Admin"], client=client) + + # Create a motd + message: str = "This is a message that should become a MOTD and then be sent to all the users." + new_motd: models.MOTD = models.MOTD(message=message, active=False) + db.session.add(new_motd) + db.session.commit() + + # Make sure the motd is created + created_motd: models.MOTD = models.MOTD.query.filter_by(message=message).one_or_none() + assert created_motd and not created_motd.active + + # Attempt request + with unittest.mock.patch.object(flask_mail.Connection, "send") as mock_mail_send: + response: werkzeug.test.WrapperTestResponse = client.post( + tests.DDSEndpoint.MOTD_SEND, headers=token, json={"motd_id": created_motd.id} + ) + assert response.status_code == http.HTTPStatus.BAD_REQUEST + assert f"There is no active MOTD with ID '{created_motd.id}'" in response.json.get( + "message" + ) + assert mock_mail_send.call_count == 0 + + +def test_send_motd_no_primary_email(client: flask.testing.FlaskClient) -> None: + """Send a motd to all users.""" + # Authenticate + token: typing.Dict = get_token(username=users["Super Admin"], client=client) + + # Create a motd + message: str = "This is a message that should become a MOTD and then be sent to all the users." + new_motd: models.MOTD = models.MOTD(message=message) + db.session.add(new_motd) + db.session.commit() + + # Make sure the motd is created + created_motd: models.MOTD = models.MOTD.query.filter_by(message=message).one_or_none() + assert created_motd + + # Get number of users + num_users: int = models.User.query.count() + + # Remove primary_email for one user + primary_email: models.Email = models.Email.query.first() + email: str = primary_email.email + username: str = primary_email.user.username + db.session.delete(primary_email) + db.session.commit() + + # Make sure email is removed + assert not models.Email.query.filter_by(email=email).one_or_none() + assert not models.User.query.filter_by(username=username).one().primary_email + + # Attempt request and catch email + with mail.record_messages() as outbox: + response: werkzeug.test.WrapperTestResponse = client.post( + tests.DDSEndpoint.MOTD_SEND, headers=token, json={"motd_id": created_motd.id} + ) + assert response.status_code == http.HTTPStatus.OK + assert len(outbox) == num_users - 1 + assert "Important Information: Data Delivery System" in outbox[-1].subject + assert "incorrect subject" not in outbox[-1].subject + + +def test_send_motd_ok(client: flask.testing.FlaskClient) -> None: + """Send a motd to all users.""" + # Authenticate + token: typing.Dict = get_token(username=users["Super Admin"], client=client) + + # Create a motd + message: str = "This is a message that should become a MOTD and then be sent to all the users." + new_motd: models.MOTD = models.MOTD(message=message) + db.session.add(new_motd) + db.session.commit() + + # Make sure the motd is created + created_motd: models.MOTD = models.MOTD.query.filter_by(message=message).one_or_none() + assert created_motd + + # Get number of users + num_users = models.User.query.count() + + # Attempt request and catch email + with mail.record_messages() as outbox: + response: werkzeug.test.WrapperTestResponse = client.post( + tests.DDSEndpoint.MOTD_SEND, headers=token, json={"motd_id": created_motd.id} + ) + assert response.status_code == http.HTTPStatus.OK + assert len(outbox) == num_users + assert "Important Information: Data Delivery System" in outbox[-1].subject + + +# Maintenance ###################################################################################### + + +def test_set_maintenance_not_superadmin(client: flask.testing.FlaskClient) -> None: + """Change Maintenance mode using everything but Super Admin access.""" + no_access_users: typing.Dict = users.copy() + no_access_users.pop("Super Admin") + + for u in no_access_users: + token: typing.Dict = get_token(username=users[u], client=client) + response: werkzeug.test.WrapperTestResponse = client.put( + tests.DDSEndpoint.MAINTENANCE, headers=token, json={"state": "on"} + ) + assert response.status_code == http.HTTPStatus.FORBIDDEN + + +def test_get_maintenance_status_not_superadmin(client: flask.testing.FlaskClient) -> None: + """Check Maintenance mode status using everything but Super Admin access.""" + no_access_users: typing.Dict = users.copy() + no_access_users.pop("Super Admin") + + for u in no_access_users: + token: typing.Dict = get_token(username=users[u], client=client) + response: werkzeug.test.WrapperTestResponse = client.get( + tests.DDSEndpoint.MAINTENANCE, headers=token + ) + assert response.status_code == http.HTTPStatus.FORBIDDEN + + +def test_maintenance_command_incorrect_method(client: flask.testing.FlaskClient) -> None: + """Only put and get should be accepted.""" + # Authenticate + token: typing.Dict = get_token(username=users["Super Admin"], client=client) + + # Attempt request + for method in [client.post, client.delete, client.patch]: + response: werkzeug.test.WrapperTestResponse = method( + tests.DDSEndpoint.MAINTENANCE, headers=token, json={"state": "on"} + ) + assert response.status_code == http.HTTPStatus.METHOD_NOT_ALLOWED + + +def test_set_maintenance_no_json(client: flask.testing.FlaskClient) -> None: + """The request needs json in order to change Maintenance mode.""" + # Authenticate + token: typing.Dict = get_token(username=users["Super Admin"], client=client) + + # Attempt request + response: werkzeug.test.TestResponse = client.put(tests.DDSEndpoint.MAINTENANCE, headers=token) + assert response.status_code == http.HTTPStatus.BAD_REQUEST + assert "Required data missing from request" in response.json.get("message") + + +def test_get_maintenance_status_no_json_required(client: flask.testing.FlaskClient) -> None: + """No json needed in order to get the current Maintenance mode.""" + # Authenticate + token: typing.Dict = get_token(username=users["Super Admin"], client=client) + + # Attempt request + response: werkzeug.test.TestResponse = client.get(tests.DDSEndpoint.MAINTENANCE, headers=token) + assert response.status_code == http.HTTPStatus.OK + assert "Maintenance mode is set to:" in response.json.get("message") + + +def test_set_maintenance_incorrect_state(client: flask.testing.FlaskClient) -> None: + """The json should be 'on' or 'off'.""" + # Authenticate + token: typing.Dict = get_token(username=users["Super Admin"], client=client) + + # create record in Maintenance + current_mode: models.Maintenance = models.Maintenance(active=False) + db.session.add(current_mode) + db.session.commit() + # Attempt request + response: werkzeug.test.WrapperTestResponse = client.put( + tests.DDSEndpoint.MAINTENANCE, headers=token, json={"state": "something"} + ) + assert response.status_code == http.HTTPStatus.BAD_REQUEST + assert "Please, specify the correct argument: on or off" in response.json.get("message") + + +def test_set_maintenance_on_ok(client: flask.testing.FlaskClient) -> None: + """Set Maintenance mode to 'on'.""" + # Authenticate + token: typing.Dict = get_token(username=users["Super Admin"], client=client) + setting = "on" + + # create record in Maintenance + current_mode: models.Maintenance = models.Maintenance(active=False) + db.session.add(current_mode) + db.session.commit() + + # Verify that maintenance is off + assert models.Maintenance.query.first().active is False + + # Attempt request + response: werkzeug.test.WrapperTestResponse = client.put( + tests.DDSEndpoint.MAINTENANCE, headers=token, json={"state": setting} + ) + assert response.status_code == http.HTTPStatus.OK + assert f"Maintenance set to: {setting.upper()}" in response.json.get("message") + + # Verify that maintenance is set to ON using the get method + response: werkzeug.test.TestResponse = client.get(tests.DDSEndpoint.MAINTENANCE, headers=token) + assert response.status_code == http.HTTPStatus.OK + assert f"Maintenance mode is set to: {setting.upper()}" in response.json.get("message") + + +def test_set_maintenance_off_ok(client: flask.testing.FlaskClient) -> None: + """Set Maintenance mode to 'off'.""" + # Authenticate + token: typing.Dict = get_token(username=users["Super Admin"], client=client) + setting = "off" + + # create record in Maintenance + current_mode: models.Maintenance = models.Maintenance.query.first() + current_mode.active = True + db.session.commit() + + # Verify that maintenance is on + assert models.Maintenance.query.first().active + + # Attempt request + response: werkzeug.test.WrapperTestResponse = client.put( + tests.DDSEndpoint.MAINTENANCE, headers=token, json={"state": setting} + ) + assert response.status_code == http.HTTPStatus.OK + assert f"Maintenance set to: {setting.upper()}" in response.json.get("message") + + # Verify that maintenance is set to OFF using the get method + response: werkzeug.test.TestResponse = client.get(tests.DDSEndpoint.MAINTENANCE, headers=token) + assert response.status_code == http.HTTPStatus.OK + assert f"Maintenance mode is set to: {setting.upper()}" in response.json.get("message") + + +# AnyProjectsBusy + +# -- get + + +def test_anyprojectsbusy_no_token(client: flask.testing.FlaskClient) -> None: + """Token required to check if projects are busy.""" + response = client.get(tests.DDSEndpoint.PROJECT_BUSY_ANY, headers=tests.DEFAULT_HEADER) + assert response.status_code == http.HTTPStatus.UNAUTHORIZED + assert response.json.get("message") + assert "No token" in response.json.get("message") + + +def test_anyprojectsbusy_not_allowed(client: flask.testing.FlaskClient) -> None: + """Only super admins allowed.""" + for role in ["researcher", "unituser", "unitadmin"]: + token = tests.UserAuth(tests.USER_CREDENTIALS[role]).token(client) + response = client.get( + tests.DDSEndpoint.PROJECT_BUSY_ANY, + headers=token, + ) + assert response.status_code == http.HTTPStatus.FORBIDDEN + + +def test_anyprojectsbusy_true(client: flask.testing.FlaskClient) -> None: + """There are busy projects.""" + # Get a project and set to busy + project: models.Project = models.Project.query.first() + project.busy = True + db.session.commit() + busy_count: int = models.Project.query.filter_by(busy=True).count() + assert busy_count > 0 + + # Call endpoint + token = tests.UserAuth(tests.USER_CREDENTIALS["superadmin"]).token(client) + response = client.get( + tests.DDSEndpoint.PROJECT_BUSY_ANY, + headers=token, + ) + assert response.status_code == http.HTTPStatus.OK + + # Check response + num: int = response.json.get("num") + assert num == busy_count + + +def test_anyprojectsbusy_false(client: flask.testing.FlaskClient) -> None: + """There are busy projects.""" + # Set all projects to not busy + for project in models.Project.query.all(): + project.busy = False + db.session.commit() + busy_count: int = models.Project.query.filter_by(busy=True).count() + assert busy_count == 0 + + # Call endpoint + token = tests.UserAuth(tests.USER_CREDENTIALS["superadmin"]).token(client) + response = client.get( + tests.DDSEndpoint.PROJECT_BUSY_ANY, + headers=token, + ) + assert response.status_code == http.HTTPStatus.OK + + # Check response + num: int = response.json.get("num") + assert num == 0 + + +def test_anyprojectsbusy_true_list(client: flask.testing.FlaskClient) -> None: + """There are busy projects, list them.""" + wanted_return_info: typing.Dict = {} + + # Get all projects and set to busy + all_projects: typing.List = models.Project.query.all() + for project in all_projects: + project.busy = True + wanted_return_info[project.public_id] = project.date_updated + db.session.commit() + busy_count: int = models.Project.query.filter_by(busy=True).count() + assert busy_count == len(all_projects) + + # Call endpoint + token = tests.UserAuth(tests.USER_CREDENTIALS["superadmin"]).token(client) + response = client.get( + tests.DDSEndpoint.PROJECT_BUSY_ANY, headers=token, query_string={"list": True} + ) + assert response.status_code == http.HTTPStatus.OK + + # Check response + num: int = response.json.get("num") + assert num == len(all_projects) + projects_returned: typing.Dict = response.json.get("projects") + for p in wanted_return_info: + assert p in projects_returned + + +def test_anyprojectsbusy_false_list(client: flask.testing.FlaskClient) -> None: + """There are busy projects.""" + # Get all projects and set to not busy + all_projects: typing.List = models.Project.query.all() + for project in all_projects: + project.busy = False + db.session.commit() + busy_count: int = models.Project.query.filter_by(busy=True).count() + assert busy_count == 0 + + # Call endpoint + token = tests.UserAuth(tests.USER_CREDENTIALS["superadmin"]).token(client) + response = client.get( + tests.DDSEndpoint.PROJECT_BUSY_ANY, headers=token, query_string={"list": True} + ) + assert response.status_code == http.HTTPStatus.OK + + # Check response + num: int = response.json.get("num") + assert num == 0 + projects_returned: typing.Dict = response.json.get("projects") + assert projects_returned is None + + +def test_statistics_no_access(client: flask.testing.FlaskClient) -> None: + """Verify that users that are not Super Admins cannot use this endpoint.""" + # Verify no access for researchers and unit users + for user in ["researcher", "unituser", "unitadmin"]: + token = tests.UserAuth(tests.USER_CREDENTIALS[user]).token(client) + response = client.get(tests.DDSEndpoint.STATS, headers=token) + assert response.status_code == http.HTTPStatus.FORBIDDEN + + +def test_statistics_return_none(client: flask.testing.FlaskClient) -> None: + """There are no rows in the Reporting table.""" + # Check that there are no rows + assert not models.Reporting.query.count() + + # Get all rows from API + token = tests.UserAuth(tests.USER_CREDENTIALS["superadmin"]).token(client) + response = client.get(tests.DDSEndpoint.STATS, headers=token) + assert response.status_code == http.HTTPStatus.OK + + # Check response + returned: typing.Dict = response.json.get("stats") + assert returned == [] + + +def test_statistics_return_rows(client: flask.testing.FlaskClient, cli_runner) -> None: + """Verify list returned when there are rows in reporting table.""" + + def add_row_to_reporting_table(time): + """Run command to add a new row to the reporting table.""" + with freezegun.freeze_time(time): + # Run scheduled job now + with mock.patch.object(flask_mail.Mail, "send") as mock_mail_send: + result: click.testing.Result = cli_runner.invoke(collect_stats) + assert not result.exception, "Raised an unwanted exception." + assert mock_mail_send.call_count == 0 + + # Generate row in reporting table + time_1 = datetime(year=2022, month=12, day=10, hour=10, minute=54, second=10) + add_row_to_reporting_table(time=time_1) + + # Verify that there's a row added + assert models.Reporting.query.count() == 1 + + # Get all rows from API + token = tests.UserAuth(tests.USER_CREDENTIALS["superadmin"]).token(client) + response = client.get(tests.DDSEndpoint.STATS, headers=token) + assert response.status_code == http.HTTPStatus.OK + + # Check response + returned: typing.Dict = response.json.get("stats") + assert len(returned) == 1 + reporting_row = models.Reporting.query.first() + assert returned[0] == { + "Date": str(reporting_row.date.date()), + "Units": reporting_row.unit_count, + "Researchers": reporting_row.researcher_count, + "Project Owners": reporting_row.project_owner_unique_count, + "Unit Personnel": reporting_row.unit_personnel_count, + "Unit Admins": reporting_row.unit_admin_count, + "Super Admins": reporting_row.superadmin_count, + "Total Users": reporting_row.total_user_count, + "Total Projects": reporting_row.total_project_count, + "Active Projects": reporting_row.active_project_count, + "Inactive Projects": reporting_row.inactive_project_count, + "Data Now (TB)": reporting_row.tb_stored_now, + "Data Uploaded (TB)": reporting_row.tb_uploaded_since_start, + "TBHours Last Month": reporting_row.tbhours, + "TBHours Total": reporting_row.tbhours_since_start, + } + returned_columns: typing.Dict = response.json.get("columns") + assert returned_columns + + +# UnitUserEmails + + +def test_unituseremails_accessdenied(client: flask.testing.FlaskClient) -> None: + """Only Super Admins can get the emails.""" + no_access_users: typing.Dict = users.copy() + no_access_users.pop("Super Admin") + + for u in no_access_users: + token: typing.Dict = get_token(username=users[u], client=client) + response: werkzeug.test.WrapperTestResponse = client.get( + tests.DDSEndpoint.USER_EMAILS, headers=token + ) + assert response.status_code == http.HTTPStatus.FORBIDDEN + + +def test_unituseremails_no_emails(client: flask.testing.FlaskClient) -> None: + """Empty should be returned if no emails.""" + # No users returned from query + with patch("dds_web.database.models.UnitUser.query") as mock_users: + mock_users.return_value = [] + + # Authenticate + token: typing.Dict = get_token(username=users["Super Admin"], client=client) + + # Call endpoint + response: werkzeug.test.WrapperTestResponse = client.get( + tests.DDSEndpoint.USER_EMAILS, headers=token + ) + assert response.status_code == http.HTTPStatus.OK + + # Verify response + assert response.json and response.json.get("empty") == True + + +def test_unituseremails_ok(client: flask.testing.FlaskClient) -> None: + """Return user emails for unit users only.""" + # Emails that should be returned + unituser_emails = [user.primary_email for user in models.UnitUser.query.all()] + + # Emails that should not be returned + researcher_emails = [user.primary_email for user in models.ResearchUser.query.all()] + superadmin_emails = [user.primary_email for user in models.SuperAdmin.query.all()] + non_primary_emails = [ + email.email for email in models.Email.query.filter_by(primary=False).all() + ] + + # Authenticate + token: typing.Dict = get_token(username=users["Super Admin"], client=client) + + # Call endpoint + response: werkzeug.test.WrapperTestResponse = client.get( + tests.DDSEndpoint.USER_EMAILS, headers=token + ) + assert response.status_code == http.HTTPStatus.OK + + # Verify response ------------------------------- + + # There should be a json response + json_response = response.json + assert json_response + + # There should be emails in response + emails = json_response.get("emails") + assert emails + + # The list of emails should contain all unit user primary emails + assert len(emails) == len(unituser_emails) + for e in unituser_emails: + assert e in emails + + # The list of should not contain any of the other emails + for e in researcher_emails + superadmin_emails + non_primary_emails: + assert e not in emails