From 7b30888fccf0d589f901492ea0114217c2f73070 Mon Sep 17 00:00:00 2001 From: Luigi Pellecchia Date: Sat, 7 Dec 2024 17:57:10 +0100 Subject: [PATCH 1/2] Remove calls to dbi.engine.dispose() Signed-off-by: Luigi Pellecchia --- api/api.py | 265 ------------------------------------------- api/testrun.py | 4 - db/models/init_db.py | 1 - 3 files changed, 270 deletions(-) diff --git a/api/api.py b/api/api.py index 3b82569..9426b22 100644 --- a/api/api.py +++ b/api/api.py @@ -776,7 +776,6 @@ def get_api_sw_requirements_mapping_sections(dbi, api): ret = {'mapped': mapped_sections, 'unmapped': unmapped_sections} - dbi.engine.dispose() return ret @@ -1078,8 +1077,6 @@ def get(self): query = query.order_by(CommentModel.created_at.asc()) comments = [c.as_dict() for c in query.all()] - - dbi.engine.dispose() return comments def post(self): @@ -1160,7 +1157,6 @@ def post(self): dbi.session.add(notifications) dbi.session.commit() - dbi.engine.dispose() return new_comment.as_dict() @@ -1192,7 +1188,6 @@ def get(self): spec = get_api_specification(api.raw_specification_url) ret = check_direct_work_items_against_another_spec_file(dbi.session, spec, api) - dbi.engine.dispose() return ret @@ -1203,7 +1198,6 @@ def get(self): query = dbi.session.query(DocumentModel) query = filter_query(query, args, DocumentModel, False) docs = [doc.as_dict() for doc in query.all()] - dbi.engine.dispose() return docs @@ -1224,13 +1218,11 @@ def get(self): api = get_api_from_request(args, dbi.session) if not api: - dbi.engine.dispose() return NOT_FOUND_MESSAGE, NOT_FOUND_STATUS # Permissions permissions = get_api_user_permissions(api, user_id, dbi.session) if 'r' not in permissions: - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS if "url" in args.keys(): @@ -1268,7 +1260,6 @@ def get(self): "content": content, "valid": valid} - dbi.engine.dispose() return ret @@ -1342,7 +1333,6 @@ def get(self): doc_all[0].offset = analysis['documents']['warning'][i]['new-offset'] dbi.session.commit() - dbi.engine.dispose() return True @@ -1414,7 +1404,6 @@ def get(self): "per_page": per_page, "count": count} - dbi.engine.dispose() return ret def post(self): @@ -1437,7 +1426,6 @@ def post(self): ).all() if len(api) > 0: - dbi.engine.dispose() return 'Api is already in the db for the selected library', 409 user = get_active_user_from_request(request_data, dbi.session) @@ -1552,7 +1540,6 @@ def post(self): dbi.session.add(tmp) dbi.session.commit() - dbi.engine.dispose() return new_api.as_dict() def put(self): @@ -1568,7 +1555,6 @@ def put(self): api = get_api_from_request(request_data, dbi.session) if not api: - dbi.engine.dispose() return NOT_FOUND_MESSAGE, NOT_FOUND_STATUS # User permission @@ -1579,7 +1565,6 @@ def put(self): # Permissions permissions = get_api_user_permissions(api, user.id, dbi.session) if 'e' not in permissions or user.role not in USER_ROLES_EDIT_PERMISSIONS: - dbi.engine.dispose() return FORBIDDEN_MESSAGE, FORBIDDEN_STATUS # Check that the new api+library+library_version is not already in the db @@ -1593,7 +1578,6 @@ def put(self): ApiModel.id != request_data['api-id']).all() if len(same_existing_apis) > 0: - dbi.engine.dispose() return 'An Api with selected name and library already exist in the db', 409 api_modified = False @@ -1618,7 +1602,6 @@ def put(self): dbi.session.add(notifications) dbi.session.commit() - dbi.engine.dispose() return api.as_dict() def delete(self): @@ -1643,7 +1626,6 @@ def delete(self): ).one() if len(apis) != 1: - dbi.engine.dispose() return 'Api not found', 400 api = apis[0] @@ -1656,7 +1638,6 @@ def delete(self): # Permissions permissions = get_api_user_permissions(api, user.id, dbi.session) if 'e' not in permissions or user.role not in USER_ROLES_EDIT_PERMISSIONS: - dbi.engine.dispose() return FORBIDDEN_MESSAGE, FORBIDDEN_STATUS justifications_mapping_api = dbi.session.query(ApiJustificationModel).filter( @@ -1697,7 +1678,6 @@ def delete(self): f'/?currentLibrary={api.library}') dbi.session.add(notifications) dbi.session.commit() - dbi.engine.dispose() return True @@ -1719,7 +1699,6 @@ def put(self): api = get_api_from_request(request_data, dbi.session) if not api: - dbi.engine.dispose() return NOT_FOUND_MESSAGE, NOT_FOUND_STATUS # User permission @@ -1730,7 +1709,6 @@ def put(self): # Permissions permissions = get_api_user_permissions(api, user.id, dbi.session) if 'r' not in permissions: - dbi.engine.dispose() return FORBIDDEN_MESSAGE, FORBIDDEN_STATUS api_modified = False @@ -1742,7 +1720,6 @@ def put(self): dbi.session.add(api) dbi.session.commit() - dbi.engine.dispose() return api.as_dict() @@ -1789,7 +1766,6 @@ def get(self): break if not first_found: - dbi.engine.dispose() return [] ret.append(get_combined_history_object(first_obj, {}, _model_fields, [])) @@ -1811,7 +1787,6 @@ def get(self): ret[i]['mapping'][permission_field], dbi.session)) ret = ret[::-1] - dbi.engine.dispose() return ret @@ -1821,7 +1796,6 @@ def get(self): # args = get_query_string_args(request.args) dbi = db_orm.DbInterface(get_db()) libraries = dbi.session.query(ApiModel.library).distinct().all() - dbi.engine.dispose() return sorted([x.library for x in libraries]) @@ -1838,7 +1812,6 @@ def get(self): api = get_api_from_request(args, dbi.session) if not api: - dbi.engine.dispose() return NOT_FOUND_MESSAGE, NOT_FOUND_STATUS spec = get_api_specification(api.raw_specification_url) @@ -1846,13 +1819,11 @@ def get(self): # Permissions permissions = get_api_user_permissions(api, user_id, dbi.session) if 'r' not in permissions: - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS ret = api.as_dict() ret['raw_specification'] = spec ret['permissions'] = permissions - dbi.engine.dispose() return ret @@ -1874,13 +1845,11 @@ def get(self): # Find api api = get_api_from_request(args, dbi.session) if not api: - dbi.engine.dispose() return NOT_FOUND_MESSAGE, NOT_FOUND_STATUS # Permissions permissions = get_api_user_permissions(api, user_id, dbi.session) if 'r' not in permissions: - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS api_specification = get_api_specification(api.raw_specification_url) @@ -1931,7 +1900,6 @@ def get(self): ret = {'mapped': mapped_sections, 'unmapped': unmapped_sections} - dbi.engine.dispose() return ret def post(self): @@ -1950,13 +1918,11 @@ def post(self): # Find api api = get_api_from_request(request_data, dbi.session) if not api: - dbi.engine.dispose() return NOT_FOUND_MESSAGE, NOT_FOUND_STATUS # Permissions permissions = get_api_user_permissions(api, user.id, dbi.session) if 'w' not in permissions or user.role not in USER_ROLES_WRITE_PERMISSIONS: - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS section = request_data['section'] @@ -1968,7 +1934,6 @@ def post(self): # `status` field should be skipped because a default is assigned in the model for check_field in [x for x in TestSpecification.fields if x not in ['status']]: if check_field.replace("_", "-") not in request_data['test-specification'].keys(): - dbi.engine.dispose() return "Bad request. Unconsistent data.", 400 title = request_data['test-specification']['title'] @@ -1981,7 +1946,6 @@ def post(self): TestSpecificationModel.preconditions == preconditions).filter( TestSpecificationModel.test_description == test_description).filter( TestSpecificationModel.expected_behavior == expected_behavior).all()) > 0: - dbi.engine.dispose() return "Test Specification already associated to the selected api Specification section.", 409 new_test_specification = TestSpecificationModel(title, @@ -2005,7 +1969,6 @@ def post(self): ApiTestSpecificationModel.api_id == api.id).filter( ApiTestSpecificationModel.test_specification_id == id).filter( ApiTestSpecificationModel.section == section).all()) > 0: - dbi.engine.dispose() return "Test Specification already associated to the selected api Specification section.", 409 try: @@ -2039,7 +2002,6 @@ def post(self): f'/mapping/{api.id}') dbi.session.add(notifications) dbi.session.commit() - dbi.engine.dispose() return new_test_specification_mapping_api.as_dict() def put(self): @@ -2058,13 +2020,11 @@ def put(self): # Find api api = get_api_from_request(request_data, dbi.session) if not api: - dbi.engine.dispose() return NOT_FOUND_MESSAGE, NOT_FOUND_STATUS # Permissions permissions = get_api_user_permissions(api, user.id, dbi.session) if 'w' not in permissions or user.role not in USER_ROLES_WRITE_PERMISSIONS: - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS try: @@ -2117,7 +2077,6 @@ def put(self): dbi.session.add(notifications) dbi.session.commit() - dbi.engine.dispose() return test_specification_mapping_api.as_dict() def delete(self): @@ -2134,13 +2093,11 @@ def delete(self): # Find api api = get_api_from_request(request_data, dbi.session) if not api: - dbi.engine.dispose() return NOT_FOUND_MESSAGE, NOT_FOUND_STATUS # Permissions permissions = get_api_user_permissions(api, user.id, dbi.session) if 'w' not in permissions or user.role not in USER_ROLES_WRITE_PERMISSIONS: - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS # check if api ... @@ -2151,7 +2108,6 @@ def delete(self): return f"Unable to find the Test Specification mapping to Api id {request_data['relation-id']}", 400 if test_specification_mapping_api.api.id != api.id: - dbi.engine.dispose() return 'bad request!', 401 notification_ts_id = test_specification_mapping_api.test_specification.id @@ -2181,7 +2137,6 @@ def delete(self): """ dbi.session.commit() - dbi.engine.dispose() return True @@ -2201,13 +2156,11 @@ def get(self): # Find api api = get_api_from_request(args, dbi.session) if not api: - dbi.engine.dispose() return NOT_FOUND_MESSAGE, NOT_FOUND_STATUS # Permissions permissions = get_api_user_permissions(api, user_id, dbi.session) if 'r' not in permissions: - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS api_specification = get_api_specification(api.raw_specification_url) @@ -2248,7 +2201,6 @@ def get(self): ret = {'mapped': mapped_sections, 'unmapped': unmapped_sections} - dbi.engine.dispose() return ret def post(self): @@ -2267,13 +2219,11 @@ def post(self): # Find api api = get_api_from_request(request_data, dbi.session) if not api: - dbi.engine.dispose() return NOT_FOUND_MESSAGE, NOT_FOUND_STATUS # Permissions permissions = get_api_user_permissions(api, user.id, dbi.session) if 'w' not in permissions or user.role not in USER_ROLES_WRITE_PERMISSIONS: - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS section = request_data['section'] @@ -2285,7 +2235,6 @@ def post(self): # `status` field should be skipped because a default is assigned in the model for check_field in [x for x in TestCase.fields if x not in ['status']]: if check_field.replace("_", "-") not in request_data['test-case'].keys(): - dbi.engine.dispose() return "Bad request. Not consistent data.", 400 repository = request_data['test-case']['repository'] @@ -2298,7 +2247,6 @@ def post(self): ApiTestCaseModel.section == section).filter( TestCaseModel.repository == repository).filter( TestCaseModel.relative_path == relative_path).all()) > 0: - dbi.engine.dispose() return "Test Case already associated to the current api.", 409 new_test_case = TestCaseModel(repository, @@ -2319,7 +2267,6 @@ def post(self): id = request_data['test-case']['id'] if len(dbi.session.query(ApiTestCaseModel).filter(ApiTestCaseModel.api_id == api.id).filter( ApiTestCaseModel.test_case_id == id).filter(ApiTestCaseModel.section == section).all()) > 0: - dbi.engine.dispose() return "Test Case already associated to the selected api Specification section.", 409 try: @@ -2350,7 +2297,6 @@ def post(self): str(user.id), f'/mapping/{api.id}') dbi.session.add(notifications) - dbi.engine.dispose() return new_test_case_mapping_api.as_dict() def put(self): @@ -2369,13 +2315,11 @@ def put(self): # Find api api = get_api_from_request(request_data, dbi.session) if not api: - dbi.engine.dispose() return NOT_FOUND_MESSAGE, NOT_FOUND_STATUS # Permissions permissions = get_api_user_permissions(api, user.id, dbi.session) if 'w' not in permissions or user.role not in USER_ROLES_WRITE_PERMISSIONS: - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS try: @@ -2383,7 +2327,6 @@ def put(self): ApiTestCaseModel.id == request_data["relation-id"]).one() test_case = test_case_mapping_api.test_case except NoResultFound: - dbi.engine.dispose() return "Test Case mapping api not found", 400 # Update only modified fields @@ -2428,7 +2371,6 @@ def put(self): dbi.session.add(notifications) dbi.session.commit() - dbi.engine.dispose() return test_case_mapping_api.as_dict() def delete(self): @@ -2447,13 +2389,11 @@ def delete(self): # Find api api = get_api_from_request(request_data, dbi.session) if not api: - dbi.engine.dispose() return NOT_FOUND_MESSAGE, NOT_FOUND_STATUS # Permissions permissions = get_api_user_permissions(api, user.id, dbi.session) if 'w' not in permissions or user.role not in USER_ROLES_WRITE_PERMISSIONS: - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS # check if api ... @@ -2464,7 +2404,6 @@ def delete(self): return f"Unable to find the Test Case mapping to Api id {request_data['relation-id']}", 400 if test_case_mapping_api.api.id != api.id: - dbi.engine.dispose() return 'bad request!', 401 notification_tc_id = test_case_mapping_api.test_case.id @@ -2494,7 +2433,6 @@ def delete(self): """ dbi.session.commit() - dbi.engine.dispose() return True @@ -2506,7 +2444,6 @@ def get(self): if 'work_item_type' not in args.keys() or \ 'mapped_to_type' not in args.keys() or \ 'relation_id' not in args.keys(): - dbi.engine.dispose() return [] if args['mapped_to_type'] == "api": @@ -2536,7 +2473,6 @@ def get(self): _model_map_history = ApiTestCaseHistoryModel _model_history = TestCaseHistoryModel else: - dbi.engine.dispose() return [] elif args['mapped_to_type'] == "sw-requirement": @@ -2556,7 +2492,6 @@ def get(self): _model_map_history = SwRequirementTestCaseHistoryModel _model_history = TestCaseHistoryModel else: - dbi.engine.dispose() return [] elif args['mapped_to_type'] == "test-specification": @@ -2566,10 +2501,8 @@ def get(self): _model_map_history = TestSpecificationTestCaseHistoryModel _model_history = TestCaseHistoryModel else: - dbi.engine.dispose() return [] else: - dbi.engine.dispose() return [] _model_fields = _model.__table__.columns.keys() @@ -2577,7 +2510,6 @@ def get(self): relation_rows = dbi.session.query(_model_map).filter(_model_map.id == args['relation_id']).all() if len(relation_rows) != 1: - dbi.engine.dispose() return [] relation_row = relation_rows[0].as_dict() @@ -2637,7 +2569,6 @@ def get(self): break if not first_found: - dbi.engine.dispose() return [] ret.append(get_combined_history_object(first_obj, first_map, _model_fields, _model_map_fields)) @@ -2657,7 +2588,6 @@ def get(self): ret = get_reduced_history_data(ret, _model_fields, _model_map_fields, dbi.session) ret = ret[::-1] - dbi.engine.dispose() return ret @@ -2672,7 +2602,6 @@ def get(self): dbi = db_orm.DbInterface(get_db()) if 'work_item_type' not in args.keys() or 'id' not in args.keys(): - dbi.engine.dispose() return [] _id = args['id'] @@ -2777,13 +2706,11 @@ def get(self): # Find api api = get_api_from_request(args, dbi.session) if not api: - dbi.engine.dispose() return NOT_FOUND_MESSAGE, NOT_FOUND_STATUS # Permissions permissions = get_api_user_permissions(api, user_id, dbi.session) if 'r' not in permissions: - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS api_specification = get_api_specification(api.raw_specification_url) @@ -2802,7 +2729,6 @@ def get(self): ret = {'mapped': mapped_sections, 'unmapped': unmapped_sections} - dbi.engine.dispose() return ret @@ -2824,13 +2750,11 @@ def get(self): # Find api api = get_api_from_request(args, dbi.session) if not api: - dbi.engine.dispose() return NOT_FOUND_MESSAGE, NOT_FOUND_STATUS # Permissions permissions = get_api_user_permissions(api, user_id, dbi.session) if 'r' not in permissions: - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS api_specification = get_api_specification(api.raw_specification_url) @@ -2857,7 +2781,6 @@ def get(self): ret = {'mapped': mapped_sections, 'unmapped': unmapped_sections} - dbi.engine.dispose() return ret def post(self): @@ -2876,13 +2799,11 @@ def post(self): # Find api api = get_api_from_request(request_data, dbi.session) if not api: - dbi.engine.dispose() return NOT_FOUND_MESSAGE, NOT_FOUND_STATUS # Permissions permissions = get_api_user_permissions(api, user.id, dbi.session) if 'w' not in permissions or user.role not in USER_ROLES_WRITE_PERMISSIONS: - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS section = request_data['section'] @@ -2906,7 +2827,6 @@ def post(self): if len(dbi.session.query(ApiJustificationModel).filter(ApiJustificationModel.api_id == api.id).filter( ApiJustificationModel.justification_id == id).filter( ApiJustificationModel.section == section).all()) > 0: - dbi.engine.dispose() return "Justification already associated to the selected api Specification section.", 409 try: @@ -2939,7 +2859,6 @@ def post(self): dbi.session.add(notifications) dbi.session.commit() - dbi.engine.dispose() return new_justification_mapping_api.as_dict() def put(self): @@ -2958,13 +2877,11 @@ def put(self): # Find api api = get_api_from_request(request_data, dbi.session) if not api: - dbi.engine.dispose() return NOT_FOUND_MESSAGE, NOT_FOUND_STATUS # Permissions permissions = get_api_user_permissions(api, user.id, dbi.session) if 'w' not in permissions or user.role not in USER_ROLES_WRITE_PERMISSIONS: - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS # check if api ... @@ -3018,7 +2935,6 @@ def put(self): dbi.session.add(notifications) dbi.session.commit() - dbi.engine.dispose() return justification_mapping_api.as_dict() def delete(self): @@ -3037,13 +2953,11 @@ def delete(self): # Find api api = get_api_from_request(request_data, dbi.session) if not api: - dbi.engine.dispose() return NOT_FOUND_MESSAGE, NOT_FOUND_STATUS # Permissions permissions = get_api_user_permissions(api, user.id, dbi.session) if 'w' not in permissions or user.role not in USER_ROLES_WRITE_PERMISSIONS: - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS # check if api ... @@ -3051,13 +2965,11 @@ def delete(self): ApiJustificationModel.id == request_data["relation-id"]).all() if len(justification_mapping_api) != 1: - dbi.engine.dispose() return 'bad request!', 401 justification_mapping_api = justification_mapping_api[0] if justification_mapping_api.api.id != api.id: - dbi.engine.dispose() return 'bad request!', 401 notification_j_id = justification_mapping_api.justification.id @@ -3087,7 +2999,6 @@ def delete(self): """ dbi.session.commit() - dbi.engine.dispose() return True @@ -3119,13 +3030,11 @@ def get(self): # Find api api = get_api_from_request(args, dbi.session) if not api: - dbi.engine.dispose() return NOT_FOUND_MESSAGE, NOT_FOUND_STATUS # Permissions permissions = get_api_user_permissions(api, user_id, dbi.session) if 'r' not in permissions: - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS api_specification = get_api_specification(api.raw_specification_url) @@ -3152,7 +3061,6 @@ def get(self): ret = {'mapped': mapped_sections, 'unmapped': unmapped_sections} - dbi.engine.dispose() return ret def post(self): @@ -3171,13 +3079,11 @@ def post(self): # Find api api = get_api_from_request(request_data, dbi.session) if not api: - dbi.engine.dispose() return NOT_FOUND_MESSAGE, NOT_FOUND_STATUS # Permissions permissions = get_api_user_permissions(api, user.id, dbi.session) if 'w' not in permissions or user.role not in USER_ROLES_WRITE_PERMISSIONS: - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS mapping_section = request_data['section'] @@ -3187,7 +3093,6 @@ def post(self): if 'id' not in request_data['document'].keys(): if not check_fields_in_request(self.document_fields, request_data['document']): - dbi.engine.dispose() return 'bad request!!', 400 doc_title = request_data['document']['title'] @@ -3221,7 +3126,6 @@ def post(self): if len(dbi.session.query(ApiDocumentModel).filter(ApiDocumentModel.api_id == api.id).filter( ApiDocumentModel.document_id == id).filter( ApiDocumentModel.section == mapping_section).all()) > 0: - dbi.engine.dispose() return "Document already associated to the selected api Specification section.", 409 try: @@ -3253,7 +3157,6 @@ def post(self): dbi.session.add(notifications) dbi.session.commit() - dbi.engine.dispose() return new_document_mapping_api.as_dict() def put(self): @@ -3276,13 +3179,11 @@ def put(self): # Find api api = get_api_from_request(request_data, dbi.session) if not api: - dbi.engine.dispose() return NOT_FOUND_MESSAGE, NOT_FOUND_STATUS # Permissions permissions = get_api_user_permissions(api, user.id, dbi.session) if 'w' not in permissions or user.role not in USER_ROLES_WRITE_PERMISSIONS: - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS # check if api ... @@ -3339,7 +3240,6 @@ def put(self): dbi.session.add(notifications) dbi.session.commit() - dbi.engine.dispose() return document_mapping_api.as_dict() def delete(self): @@ -3358,13 +3258,11 @@ def delete(self): # Find api api = get_api_from_request(request_data, dbi.session) if not api: - dbi.engine.dispose() return NOT_FOUND_MESSAGE, NOT_FOUND_STATUS # Permissions permissions = get_api_user_permissions(api, user.id, dbi.session) if 'w' not in permissions or user.role not in USER_ROLES_WRITE_PERMISSIONS: - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS # check if api ... @@ -3372,13 +3270,11 @@ def delete(self): ApiDocumentModel.id == request_data["relation-id"]).all() if len(document_mapping_api) != 1: - dbi.engine.dispose() return 'bad request!', 401 document_mapping_api = document_mapping_api[0] if document_mapping_api.api.id != api.id: - dbi.engine.dispose() return 'bad request!', 401 notification_d_id = document_mapping_api.document.id @@ -3408,7 +3304,6 @@ def delete(self): """ dbi.session.commit() - dbi.engine.dispose() return True @@ -3430,17 +3325,14 @@ def get(self): # Find api api = get_api_from_request(args, dbi.session) if not api: - dbi.engine.dispose() return NOT_FOUND_MESSAGE, NOT_FOUND_STATUS # Permissions permissions = get_api_user_permissions(api, user_id, dbi.session) if 'r' not in permissions: - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS ret = get_api_sw_requirements_mapping_sections(dbi, api) - dbi.engine.dispose() return ret def post(self): @@ -3459,13 +3351,11 @@ def post(self): # Find api api = get_api_from_request(request_data, dbi.session) if not api: - dbi.engine.dispose() return NOT_FOUND_MESSAGE, NOT_FOUND_STATUS # Permissions permissions = get_api_user_permissions(api, user.id, dbi.session) if 'w' not in permissions or user.role not in USER_ROLES_WRITE_PERMISSIONS: - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS section = request_data['section'] @@ -3477,7 +3367,6 @@ def post(self): # `status` field should be skipped because a default is assigned in the model for check_field in [x for x in SwRequirement.fields if x not in ['status']]: if check_field.replace("_", "-") not in request_data['sw-requirement'].keys(): - dbi.engine.dispose() return "Bad request. Not consistent data.", 400 title = request_data['sw-requirement']['title'] @@ -3486,7 +3375,6 @@ def post(self): if len(dbi.session.query(SwRequirementModel).filter( SwRequirementModel.title == title).filter( SwRequirementModel.description == description).all()) > 0: - dbi.engine.dispose() return "SW Requirement already associated to the selected api Specification section.", 409 new_sw_requirement = SwRequirementModel(title, @@ -3510,7 +3398,6 @@ def post(self): ApiSwRequirementModel.api_id == api.id).filter( ApiSwRequirementModel.sw_requirement_id == id).filter( ApiSwRequirementModel.section == section).all()) > 0: - dbi.engine.dispose() return "SW Requirement already associated to the selected api Specification section.", 409 try: @@ -3542,7 +3429,6 @@ def post(self): dbi.session.add(notifications) dbi.session.commit() - dbi.engine.dispose() return new_sw_requirement_mapping_api.as_dict() def put(self): @@ -3561,13 +3447,11 @@ def put(self): # Find api api = get_api_from_request(request_data, dbi.session) if not api: - dbi.engine.dispose() return NOT_FOUND_MESSAGE, NOT_FOUND_STATUS # Permissions permissions = get_api_user_permissions(api, user.id, dbi.session) if 'w' not in permissions or user.role not in USER_ROLES_WRITE_PERMISSIONS: - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS try: @@ -3619,7 +3503,6 @@ def put(self): dbi.session.add(notifications) dbi.session.commit() - dbi.engine.dispose() return sw_requirement_mapping_api.as_dict() def delete(self): @@ -3638,13 +3521,11 @@ def delete(self): # Find api api = get_api_from_request(request_data, dbi.session) if not api: - dbi.engine.dispose() return NOT_FOUND_MESSAGE, NOT_FOUND_STATUS # Permissions permissions = get_api_user_permissions(api, user.id, dbi.session) if 'w' not in permissions or user.role not in USER_ROLES_WRITE_PERMISSIONS: - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS # check if api ... @@ -3655,7 +3536,6 @@ def delete(self): return f"Unable to find the Sw Requirement mapping to Api id {request_data['relation-id']}", 400 if sw_requirement_mapping_api.api.id != api.id: - dbi.engine.dispose() return BAD_REQUEST_MESSAGE, BAD_REQUEST_STATUS notification_sr_id = sw_requirement_mapping_api.sw_requirement.id @@ -3685,7 +3565,6 @@ def delete(self): """ dbi.session.commit() - dbi.engine.dispose() return True @@ -3705,7 +3584,6 @@ def get(self): minimal_keys = ['id', 'description'] jus = [{key: val for key, val in sub.items() if key in minimal_keys} for sub in jus] - dbi.engine.dispose() return jus @@ -3726,7 +3604,6 @@ def get(self): minimal_keys = ['id', 'title'] tss = [{key: val for key, val in sub.items() if key in minimal_keys} for sub in tss] - dbi.engine.dispose() return tss @@ -3747,7 +3624,6 @@ def get(self): minimal_keys = ['id', 'title'] srs = [{key: val for key, val in sub.items() if key in minimal_keys} for sub in srs] - dbi.engine.dispose() return srs @@ -3767,7 +3643,6 @@ def get(self): minimal_keys = ['id', 'title'] tcs = [{key: val for key, val in sub.items() if key in minimal_keys} for sub in tcs] - dbi.engine.dispose() return tcs @@ -3781,7 +3656,6 @@ def get(self): query = filter_query(query, args, SwRequirementSwRequirementModel, False) srsrs = [srsr.as_dict(db_session=dbi.session) for srsr in query.all()] - dbi.engine.dispose() return srsrs def post(self): @@ -3812,7 +3686,6 @@ def post(self): try: relation_to_item = relation_to_query.one() except NoResultFound: - dbi.engine.dispose() return "Parent mapping not found", 404 api_id = relation_to_item.api.id @@ -3823,28 +3696,23 @@ def post(self): try: relation_to_item = relation_to_query.one() except NoResultFound: - dbi.engine.dispose() return "Parent mapping not found", 404 api_id = get_parent_api_id(relation_to_item, dbi.session) if not api_id: - dbi.engine.dispose() return SW_COMPONENT_NOT_FOUND_MESSAGE, NOT_FOUND_STATUS else: - dbi.engine.dispose() return "Bad request!!!", 400 try: api = dbi.session.query(ApiModel).filter(ApiModel.id == api_id).one() except NoResultFound: - dbi.engine.dispose() return SW_COMPONENT_NOT_FOUND_MESSAGE, NOT_FOUND_STATUS # Permissions permissions = get_api_user_permissions(api, user.id, dbi.session) if 'w' not in permissions or user.role not in USER_ROLES_WRITE_PERMISSIONS: - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS if request_data['relation-to'] == 'api': @@ -3859,7 +3727,6 @@ def post(self): parent_sw_requirement = dbi.session.query(SwRequirementModel).filter( SwRequirementModel.id == parent_sw_requirement_id).one() except NoResultFound: - dbi.engine.dispose() return "Sw Requirement not found", 400 del parent_sw_requirement # Just need to check it exists @@ -3869,7 +3736,6 @@ def post(self): # `status` field should be skipped because a default is assigned in the model for check_field in [x for x in SwRequirement.fields if x not in ['status']]: if check_field.replace("_", "-") not in request_data['sw-requirement'].keys(): - dbi.engine.dispose() return "Bad request. Not consistent data.", 400 title = request_data['sw-requirement']['title'] @@ -3884,7 +3750,6 @@ def post(self): SwRequirementSwRequirementModel.id == relation_id).filter( SwRequirementSwRequirementModel.sw_requirement_id == sr.id).all() if len(sr_mapping) > 0: - dbi.engine.dispose() return "Sw Requirement already associated to the selected Sw Requirement.", 409 new_sw_requirement = SwRequirementModel(title, @@ -3907,18 +3772,15 @@ def post(self): if len(dbi.session.query(SwRequirementSwRequirementModel).filter( SwRequirementSwRequirementModel.id == relation_id).filter( SwRequirementSwRequirementModel.sw_requirement_id == sw_requirement_id).all()) > 0: - dbi.engine.dispose() return "Sw Requirement already associated to the selected Sw Requirement.", 409 try: sw_requirement = dbi.session.query(SwRequirementModel).filter( SwRequirementModel.id == sw_requirement_id).one() except NoResultFound: - dbi.engine.dispose() return "Bad request.", 400 if not isinstance(sw_requirement, SwRequirementModel): - dbi.engine.dispose() return "Bad request.", 400 new_sw_requirement_mapping_sw_requirement = SwRequirementSwRequirementModel(api_sr, @@ -3941,7 +3803,6 @@ def post(self): f'/mapping/{api.id}') dbi.session.add(notifications) dbi.session.commit() - dbi.engine.dispose() return new_sw_requirement_mapping_sw_requirement.as_dict() def put(self): @@ -3963,24 +3824,20 @@ def put(self): SwRequirementSwRequirementModel.id == request_data['relation-id'] ).one() except NoResultFound: - dbi.engine.dispose() return "Sw Requirement mapping not found", 400 api_id = get_parent_api_id(sr_mapping_sr, dbi.session) if not api_id: - dbi.engine.dispose() return SW_COMPONENT_NOT_FOUND_MESSAGE, NOT_FOUND_STATUS try: api = dbi.session.query(ApiModel).filter(ApiModel.id == api_id).one() except NoResultFound: - dbi.engine.dispose() return SW_COMPONENT_NOT_FOUND_MESSAGE, NOT_FOUND_STATUS # Permissions permissions = get_api_user_permissions(api, user.id, dbi.session) if 'w' not in permissions or user.role not in USER_ROLES_WRITE_PERMISSIONS: - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS sr = sr_mapping_sr.sw_requirement @@ -4017,7 +3874,6 @@ def put(self): f'/mapping/{api.id}') dbi.session.add(notifications) dbi.session.commit() - dbi.engine.dispose() return ret def delete(self): @@ -4042,24 +3898,20 @@ def delete(self): api_id = get_parent_api_id(sr_mapping_sr, dbi.session) if not api_id: - dbi.engine.dispose() return SW_COMPONENT_NOT_FOUND_MESSAGE, NOT_FOUND_STATUS try: api = dbi.session.query(ApiModel).filter(ApiModel.id == api_id).one() except NoResultFound: - dbi.engine.dispose() return SW_COMPONENT_NOT_FOUND_MESSAGE, NOT_FOUND_STATUS # Permissions permissions = get_api_user_permissions(api, user.id, dbi.session) if 'w' not in permissions or user.role not in USER_ROLES_WRITE_PERMISSIONS: - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS """ if sw_requirement_mapping_sw_requirement.sw_requirement_id != sw_requirement.id: - dbi.engine.dispose() return 'bad request!', 401 """ @@ -4079,7 +3931,6 @@ def delete(self): f'/mapping/{api.id}') dbi.session.add(notifications) dbi.session.commit() - dbi.engine.dispose() return True @@ -4119,7 +3970,6 @@ def post(self): sw_requirement = dbi.session.query(SwRequirementModel).filter( SwRequirementModel.id == sw_requirement_id).one() except NoResultFound: - dbi.engine.dispose() return "Sw Requirement not found", 400 del sw_requirement # Just need to check it exists @@ -4131,7 +3981,6 @@ def post(self): try: relation_to_item = relation_to_query.one() except NoResultFound: - dbi.engine.dispose() return "Parent mapping not found", 404 api_id = relation_to_item.api.id @@ -4142,27 +3991,22 @@ def post(self): try: relation_to_item = relation_to_query.one() except NoResultFound: - dbi.engine.dispose() return "Parent mapping not found", 404 api_id = get_parent_api_id(relation_to_item, dbi.session) if not api_id: - dbi.engine.dispose() return SW_COMPONENT_NOT_FOUND_MESSAGE, NOT_FOUND_STATUS else: - dbi.engine.dispose() return "Bad request!!!", 400 try: api = dbi.session.query(ApiModel).filter(ApiModel.id == api_id).one() except NoResultFound: - dbi.engine.dispose() return SW_COMPONENT_NOT_FOUND_MESSAGE, NOT_FOUND_STATUS # Permissions permissions = get_api_user_permissions(api, user.id, dbi.session) if 'w' not in permissions or user.role not in USER_ROLES_WRITE_PERMISSIONS: - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS if relation_to == 'api': @@ -4175,7 +4019,6 @@ def post(self): # `status` field should be skipped because a default is assigned in the model for check_field in [x for x in TestSpecification.fields if x not in ['status']]: if check_field.replace("_", "-") not in request_data['test-specification'].keys(): - dbi.engine.dispose() return "Bad request. Not consistent data.", 400 title = request_data['test-specification']['title'] @@ -4199,7 +4042,6 @@ def post(self): SwRequirementTestSpecificationModel.test_specification_id == ts.id).filter( ApiSwRequirementModel.api_id == api.id).all() if len(ts_mapping) > 0: - dbi.engine.dispose() return "Test Specification already associated to the selected api Specification section.", 409 """ @@ -4225,7 +4067,6 @@ def post(self): SwRequirementTestSpecificationModel.sw_requirement_mapping_api_id == request_data[ 'relation-id']).filter( SwRequirementTestSpecificationModel.test_specification_id == test_specification_id).all()) > 0: - dbi.engine.dispose() return "Test Specification already associated to the selected Api and Sw Requirement.", 409 try: @@ -4235,7 +4076,6 @@ def post(self): return "Unable to find the selected Test Specification", 400 if not isinstance(test_specification, TestSpecificationModel): - dbi.engine.dispose() return "Bad request.", 400 new_test_specification_mapping_sw_requirement = SwRequirementTestSpecificationModel(api_sr, @@ -4258,7 +4098,6 @@ def post(self): f'/mapping/{api.id}') dbi.session.add(notifications) dbi.session.commit() - dbi.engine.dispose() return new_test_specification_mapping_sw_requirement.as_dict() def put(self): @@ -4285,19 +4124,16 @@ def put(self): api_id = get_parent_api_id(sw_mapping_ts, dbi.session) if not api_id: - dbi.engine.dispose() return SW_COMPONENT_NOT_FOUND_MESSAGE, NOT_FOUND_STATUS try: api = dbi.session.query(ApiModel).filter(ApiModel.id == api_id).one() except NoResultFound: - dbi.engine.dispose() return SW_COMPONENT_NOT_FOUND_MESSAGE, NOT_FOUND_STATUS # Permissions permissions = get_api_user_permissions(api, user.id, dbi.session) if 'w' not in permissions or user.role not in USER_ROLES_WRITE_PERMISSIONS: - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS test_specification = sw_mapping_ts.test_specification @@ -4334,7 +4170,6 @@ def put(self): f'/mapping/{api.id}') dbi.session.add(notifications) dbi.session.commit() - dbi.engine.dispose() return ret def delete(self): @@ -4362,19 +4197,16 @@ def delete(self): api_id = get_parent_api_id(sw_mapping_ts, dbi.session) if not api_id: - dbi.engine.dispose() return SW_COMPONENT_NOT_FOUND_MESSAGE, NOT_FOUND_STATUS try: api = dbi.session.query(ApiModel).filter(ApiModel.id == api_id).one() except NoResultFound: - dbi.engine.dispose() return SW_COMPONENT_NOT_FOUND_MESSAGE, NOT_FOUND_STATUS # Permissions permissions = get_api_user_permissions(api, user.id, dbi.session) if 'w' not in permissions or user.role not in USER_ROLES_WRITE_PERMISSIONS: - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS # test_specification = sw_mapping_ts.test_specification @@ -4396,7 +4228,6 @@ def delete(self): f'/mapping/{api.id}') dbi.session.add(notifications) dbi.session.commit() - dbi.engine.dispose() return ret @@ -4439,7 +4270,6 @@ def post(self): try: relation_to_item = relation_to_query.one() except NoResultFound: - dbi.engine.dispose() return "Parent mapping not found", 404 api_id = relation_to_item.api.id @@ -4451,27 +4281,22 @@ def post(self): try: relation_to_item = relation_to_query.one() except NoResultFound: - dbi.engine.dispose() return "Parent mapping not found", 404 api_id = get_parent_api_id(relation_to_item, dbi.session) if not api_id: - dbi.engine.dispose() return SW_COMPONENT_NOT_FOUND_MESSAGE, NOT_FOUND_STATUS else: - dbi.engine.dispose() return "Bad request!!!", 400 try: api = dbi.session.query(ApiModel).filter(ApiModel.id == api_id).one() except NoResultFound: - dbi.engine.dispose() return SW_COMPONENT_NOT_FOUND_MESSAGE, NOT_FOUND_STATUS # Permissions permissions = get_api_user_permissions(api, user.id, dbi.session) if 'w' not in permissions or user.role not in USER_ROLES_WRITE_PERMISSIONS: - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS if relation_to == 'api': @@ -4486,7 +4311,6 @@ def post(self): sw_requirement = dbi.session.query(SwRequirementModel).filter( SwRequirementModel.id == sw_requirement_id).one() except NoResultFound: - dbi.engine.dispose() return "Sw Requirement not found", 400 del sw_requirement # Just need to check it exists @@ -4496,7 +4320,6 @@ def post(self): # `status` field should be skipped because a default is assigned in the model for check_field in [x for x in TestCase.fields if x not in ['status']]: if check_field.replace("_", "-") not in request_data['test-case'].keys(): - dbi.engine.dispose() return "Bad request. Not consistent data.", 400 title = request_data['test-case']['title'] @@ -4515,7 +4338,6 @@ def post(self): SwRequirementTestCaseModel.id == relation_id).filter( SwRequirementTestCaseModel.test_case_id == tc.id).all() if len(tc_mapping) > 0: - dbi.engine.dispose() return "Test Case already associated to the selected api Specification section.", 409 new_test_case = TestCaseModel(repository, @@ -4540,18 +4362,15 @@ def post(self): if len(dbi.session.query(SwRequirementTestCaseModel).filter( SwRequirementTestCaseModel.id == relation_id).filter( SwRequirementTestCaseModel.test_case_id == test_case_id).all()) > 0: - dbi.engine.dispose() return "Test Case already associated to the selected Api and Sw Requirement.", 409 try: test_case = dbi.session.query(TestCaseModel).filter( TestCaseModel.id == test_case_id).one() except NoResultFound: - dbi.engine.dispose() return "Bad request.", 400 if not isinstance(test_case, TestCaseModel): - dbi.engine.dispose() return "Bad request.", 400 new_test_case_mapping_sw_requirement = SwRequirementTestCaseModel(api_sr, @@ -4574,7 +4393,6 @@ def post(self): f'/mapping/{api.id}') dbi.session.add(notifications) dbi.session.commit() - dbi.engine.dispose() return new_test_case_mapping_sw_requirement.as_dict() def put(self): @@ -4600,19 +4418,16 @@ def put(self): api_id = get_parent_api_id(sw_mapping_tc, dbi.session) if not api_id: - dbi.engine.dispose() return SW_COMPONENT_NOT_FOUND_MESSAGE, NOT_FOUND_STATUS try: api = dbi.session.query(ApiModel).filter(ApiModel.id == api_id).one() except NoResultFound: - dbi.engine.dispose() return SW_COMPONENT_NOT_FOUND_MESSAGE, NOT_FOUND_STATUS # Permissions permissions = get_api_user_permissions(api, user.id, dbi.session) if 'w' not in permissions or user.role not in USER_ROLES_WRITE_PERMISSIONS: - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS test_case = sw_mapping_tc.test_case @@ -4650,7 +4465,6 @@ def put(self): dbi.session.add(notifications) dbi.session.commit() - dbi.engine.dispose() return ret def delete(self): @@ -4677,19 +4491,16 @@ def delete(self): api_id = get_parent_api_id(sw_mapping_tc, dbi.session) if not api_id: - dbi.engine.dispose() return SW_COMPONENT_NOT_FOUND_MESSAGE, NOT_FOUND_STATUS try: api = dbi.session.query(ApiModel).filter(ApiModel.id == api_id).one() except NoResultFound: - dbi.engine.dispose() return SW_COMPONENT_NOT_FOUND_MESSAGE, NOT_FOUND_STATUS # Permissions permissions = get_api_user_permissions(api, user.id, dbi.session) if 'w' not in permissions or user.role not in USER_ROLES_WRITE_PERMISSIONS: - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS # test_case = sw_mapping_tc.test_case @@ -4711,7 +4522,6 @@ def delete(self): f'/mapping/{api.id}') dbi.session.add(notifications) dbi.session.commit() - dbi.engine.dispose() return ret @@ -4752,7 +4562,6 @@ def post(self): SwRequirementTestSpecificationModel.id == relation_id ) else: - dbi.engine.dispose() return "Bad request!!!", 400 try: @@ -4768,19 +4577,16 @@ def post(self): sr_ts = relation_to_item api_id = get_parent_api_id(sr_ts, dbi.session) if not api_id: - dbi.engine.dispose() return SW_COMPONENT_NOT_FOUND_MESSAGE, NOT_FOUND_STATUS try: api = dbi.session.query(ApiModel).filter(ApiModel.id == api_id).one() except NoResultFound: - dbi.engine.dispose() return SW_COMPONENT_NOT_FOUND_MESSAGE, NOT_FOUND_STATUS # Permissions permissions = get_api_user_permissions(api, user.id, dbi.session) if 'w' not in permissions or user.role not in USER_ROLES_WRITE_PERMISSIONS: - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS test_specification_id = request_data['test-specification']['id'] @@ -4790,7 +4596,6 @@ def post(self): test_specification = dbi.session.query(TestSpecificationModel).filter( TestSpecificationModel.id == test_specification_id).one() except NoResultFound: - dbi.engine.dispose() return "Test Specification not found", 400 del test_specification # Just need to check it exists @@ -4800,7 +4605,6 @@ def post(self): # `status` field should be skipped because a default is assigned in the model for check_field in [x for x in TestCase.fields if x not in ['status']]: if check_field.replace("_", "-") not in request_data['test-case'].keys(): - dbi.engine.dispose() return "Bad request. Not consistent data.", 400 title = request_data['test-case']['title'] @@ -4819,7 +4623,6 @@ def post(self): TestSpecificationTestCaseModel.id == relation_id).filter( TestSpecificationTestCaseModel.test_case_id == tc.id).all() if len(tc_mapping) > 0: - dbi.engine.dispose() return "Test Case already associated to the selected api Specification section.", 409 new_test_case = TestCaseModel(repository, @@ -4844,18 +4647,15 @@ def post(self): if len(dbi.session.query(TestSpecificationTestCaseModel).filter( TestSpecificationTestCaseModel.id == relation_id).filter( TestSpecificationTestCaseModel.test_case_id == test_case_id).all()) > 0: - dbi.engine.dispose() return "Test Case already associated to the selected Api and Test Specification.", 409 try: test_case = dbi.session.query(TestCaseModel).filter( TestCaseModel.id == test_case_id).one() except NoResultFound: - dbi.engine.dispose() return "Bad request.", 400 if not isinstance(test_case, TestCaseModel): - dbi.engine.dispose() return "Bad request.", 400 new_test_case_mapping_test_specification = TestSpecificationTestCaseModel(api_ts, @@ -4878,7 +4678,6 @@ def post(self): f'/mapping/{api.id}') dbi.session.add(notifications) dbi.session.commit() - dbi.engine.dispose() return new_test_case_mapping_test_specification.as_dict() def put(self): @@ -4921,13 +4720,11 @@ def put(self): return SW_COMPONENT_NOT_FOUND_MESSAGE, NOT_FOUND_STATUS api_id = get_parent_api_id(sr_ts, dbi.session) if not api_id: - dbi.engine.dispose() return SW_COMPONENT_NOT_FOUND_MESSAGE, NOT_FOUND_STATUS try: api = dbi.session.query(ApiModel).filter(ApiModel.id == api_id).one() except NoResultFound: - dbi.engine.dispose() return SW_COMPONENT_NOT_FOUND_MESSAGE, NOT_FOUND_STATUS else: return SW_COMPONENT_NOT_FOUND_MESSAGE, NOT_FOUND_STATUS @@ -4935,7 +4732,6 @@ def put(self): # Permissions permissions = get_api_user_permissions(api, user.id, dbi.session) if 'w' not in permissions or user.role not in USER_ROLES_WRITE_PERMISSIONS: - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS test_case = ts_mapping_tc.test_case @@ -4972,7 +4768,6 @@ def put(self): f'/mapping/{api.id}') dbi.session.add(notifications) dbi.session.commit() - dbi.engine.dispose() return ret def delete(self): @@ -5016,13 +4811,11 @@ def delete(self): return SW_COMPONENT_NOT_FOUND_MESSAGE, NOT_FOUND_STATUS api_id = get_parent_api_id(sr_ts, dbi.session) if not api_id: - dbi.engine.dispose() return SW_COMPONENT_NOT_FOUND_MESSAGE, NOT_FOUND_STATUS try: api = dbi.session.query(ApiModel).filter(ApiModel.id == api_id).one() except NoResultFound: - dbi.engine.dispose() return SW_COMPONENT_NOT_FOUND_MESSAGE, NOT_FOUND_STATUS else: return SW_COMPONENT_NOT_FOUND_MESSAGE, NOT_FOUND_STATUS @@ -5030,7 +4823,6 @@ def delete(self): # Permissions permissions = get_api_user_permissions(api, user.id, dbi.session) if 'w' not in permissions or user.role not in USER_ROLES_WRITE_PERMISSIONS: - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS # test_case = ts_mapping_tc.test_case @@ -5052,7 +4844,6 @@ def delete(self): f'/mapping/{api.id}') dbi.session.add(notifications) dbi.session.commit() - dbi.engine.dispose() return ret @@ -5081,7 +4872,6 @@ def post(self): # Permissions permissions = get_api_user_permissions(asr_mapping.api, user.id, dbi.session) if 'w' not in permissions or user.role not in USER_ROLES_WRITE_PERMISSIONS: - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS new_sr = SwRequirementModel(asr_mapping.sw_requirement.title, @@ -5093,7 +4883,6 @@ def post(self): asr_mapping.sw_requirement_id = new_sr.id dbi.session.commit() - dbi.engine.dispose() return asr_mapping.as_dict() @@ -5122,18 +4911,15 @@ def post(self): # Permissions api_id = get_parent_api_id(sr_sr_mapping, dbi.session) if not api_id: - dbi.engine.dispose() return SW_COMPONENT_NOT_FOUND_MESSAGE, NOT_FOUND_STATUS try: api = dbi.session.query(ApiModel).filter(ApiModel.id == api_id).one() except NoResultFound: - dbi.engine.dispose() return SW_COMPONENT_NOT_FOUND_MESSAGE, NOT_FOUND_STATUS permissions = get_api_user_permissions(api, user.id, dbi.session) if 'w' not in permissions or user.role not in USER_ROLES_WRITE_PERMISSIONS: - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS new_sr = SwRequirementModel(sr_sr_mapping.sw_requirement.title, @@ -5145,7 +4931,6 @@ def post(self): sr_sr_mapping.sw_requirement_id = new_sr.id dbi.session.commit() - dbi.engine.dispose() return sr_sr_mapping.as_dict() @@ -5211,7 +4996,6 @@ def post(self): user.token = str(uuid4()) dbi.session.add(user) dbi.session.commit() - dbi.engine.dispose() return {"id": user.id, "role": user.role, @@ -5249,7 +5033,6 @@ def post(self): '') dbi.session.add(notifications) dbi.session.commit() - dbi.engine.dispose() return {"id": user.id, "email": user.email, @@ -5293,7 +5076,6 @@ def get(self): return f"User {request_data['email']} not found.", 403 target_user_permissions = get_api_user_permissions(api, target_user.id, dbi.session) - dbi.engine.dispose() return {"email": request_data["email"], "api": request_data["api-id"], "role": target_user.role, @@ -5376,7 +5158,6 @@ def put(self): dbi.session.add(api) dbi.session.commit() - dbi.engine.dispose() return {"email": request_data["email"], "api": request_data["api-id"], @@ -5398,11 +5179,9 @@ def put(self): user = get_active_user_from_request(request_data, dbi.session) if not isinstance(user, UserModel): - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS if user.role not in USER_ROLES_MANAGE_USERS: - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS try: @@ -5410,14 +5189,12 @@ def put(self): UserModel.email == request_data["email"] ).one() except NoResultFound: - dbi.engine.dispose() return f"User {request_data['email']} not found.", 403 target_user.enabled = int(request_data['enabled']) dbi.session.add(target_user) dbi.session.commit() - dbi.engine.dispose() return {"email": request_data["email"], "enabled": target_user.enabled} @@ -5447,8 +5224,6 @@ def get(self): ).all() # users = [{k: v for k, v in user.as_dict().items() if k not in ['pwd']} for user in users] - dbi.engine.dispose() - return [x.as_dict(full_data=True) for x in users] @@ -5467,11 +5242,9 @@ def put(self): user = get_active_user_from_request(request_data, dbi.session) if not isinstance(user, UserModel): - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS if user.role not in USER_ROLES_MANAGE_USERS: - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS try: @@ -5479,12 +5252,10 @@ def put(self): UserModel.email == request_data["email"] ).one() except NoResultFound: - dbi.engine.dispose() return f"User {request_data['email']} not found.", 403 target_user.pwd = request_data['password'] dbi.session.commit() - dbi.engine.dispose() return {"email": request_data["email"]} @@ -5507,11 +5278,9 @@ def put(self): user = get_active_user_from_request(request_data, dbi.session) if not isinstance(user, UserModel): - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS if user.role not in USER_ROLES_MANAGE_USERS: - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS try: @@ -5519,12 +5288,10 @@ def put(self): UserModel.email == request_data["email"] ).one() except NoResultFound: - dbi.engine.dispose() return f"User {request_data['email']} not found.", 403 target_user.role = request_data['role'] dbi.session.commit() - dbi.engine.dispose() return {"email": request_data["email"]} @@ -5541,7 +5308,6 @@ def delete(self): user = get_active_user_from_request(request_data, dbi.session) if not isinstance(user, UserModel): - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS # It should be possible to delete 1 notification filtering with `id` @@ -5573,7 +5339,6 @@ def delete(self): notifications[i].read_by = ",".join([str(x) for x in read_by]) dbi.session.commit() - dbi.engine.dispose() return "Notification updated" def get(self): @@ -5584,7 +5349,6 @@ def get(self): user = get_active_user_from_request(request_data, dbi.session) if not isinstance(user, UserModel): - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS user_api_notifications = [] @@ -5615,7 +5379,6 @@ def put(self): user = get_active_user_from_request(request_data, dbi.session) if not isinstance(user, UserModel): - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS if not user.api_notifications: @@ -5635,7 +5398,6 @@ def put(self): user.api_notifications = ",".join([str(x) for x in user_api_notifications]) dbi.session.add(user) dbi.session.commit() - dbi.engine.dispose() return "Notification updated" @@ -5722,8 +5484,6 @@ def post(self): dbi.session.commit() return "Error", 400 - dbi.engine.dispose() - return new_ssh_key.as_dict() def delete(self): @@ -5755,7 +5515,6 @@ def delete(self): if os.path.exists(f'{SSH_KEYS_PATH}/{ssh_key.id}'): os.remove(f'{SSH_KEYS_PATH}/{ssh_key.id}') - dbi.engine.dispose() return True @@ -5805,8 +5564,6 @@ def post(self): if not isinstance(user, UserModel): return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS - dbi.engine.dispose() - # Test Run Configuration test_config_ret, test_config_status = add_test_run_config(dbi, request_data, user) if test_config_status not in [OK_STATUS, CREATED_STATUS]: @@ -5838,13 +5595,11 @@ def get(self): # Find api api = get_api_from_request(args, dbi.session) if not api: - dbi.engine.dispose() return NOT_FOUND_MESSAGE, NOT_FOUND_STATUS # Permissions permissions = get_api_user_permissions(api, user_id, dbi.session) if 'r' not in permissions: - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS runs_query = dbi.session.query(TestRunModel).join( @@ -5897,7 +5652,6 @@ def post(self): # Find api api = get_api_from_request(request_data, dbi.session) if not api: - dbi.engine.dispose() return NOT_FOUND_MESSAGE, NOT_FOUND_STATUS # Test Run Configuration @@ -5972,7 +5726,6 @@ def post(self): f'/mapping/{api.id}') dbi.session.add(notifications) dbi.session.commit() - dbi.engine.dispose() return new_test_run.as_dict() @@ -5992,13 +5745,11 @@ def put(self): # Find api api = get_api_from_request(request_data, dbi.session) if not api: - dbi.engine.dispose() return NOT_FOUND_MESSAGE, NOT_FOUND_STATUS # Permissions permissions = get_api_user_permissions(api, user.id, dbi.session) if 'r' not in permissions: - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS mapping_model = None @@ -6060,7 +5811,6 @@ def put(self): dbi.session.add(notifications) dbi.session.commit() - dbi.engine.dispose() return run.as_dict() def delete(self): @@ -6079,13 +5829,11 @@ def delete(self): # Find api api = get_api_from_request(request_data, dbi.session) if not api: - dbi.engine.dispose() return NOT_FOUND_MESSAGE, NOT_FOUND_STATUS # Permissions permissions = get_api_user_permissions(api, user.id, dbi.session) if 'w' not in permissions: - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS mapping_model = None @@ -6139,7 +5887,6 @@ def delete(self): f'/mapping/{api.id}') dbi.session.add(notifications) dbi.session.commit() - dbi.engine.dispose() return run_dict @@ -6163,13 +5910,11 @@ def get(self): # Find api api = get_api_from_request(args, dbi.session) if not api: - dbi.engine.dispose() return NOT_FOUND_MESSAGE, NOT_FOUND_STATUS # Permissions permissions = get_api_user_permissions(api, user_id, dbi.session) if 'r' not in permissions: - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS try: @@ -6224,13 +5969,11 @@ def get(self): # Find api api = get_api_from_request(args, dbi.session) if not api: - dbi.engine.dispose() return NOT_FOUND_MESSAGE, NOT_FOUND_STATUS # Permissions permissions = get_api_user_permissions(api, user_id, dbi.session) if 'r' not in permissions: - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS try: @@ -6271,13 +6014,11 @@ def get(self): # Find api api = get_api_from_request(args, dbi.session) if not api: - dbi.engine.dispose() return NOT_FOUND_MESSAGE, NOT_FOUND_STATUS # Permissions permissions = get_api_user_permissions(api, user_id, dbi.session) if 'r' not in permissions: - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS if os.path.exists(TESTRUN_PRESET_FILEPATH): @@ -6326,13 +6067,11 @@ def get(self): # Find api api = get_api_from_request(args, dbi.session) if not api: - dbi.engine.dispose() return NOT_FOUND_MESSAGE, NOT_FOUND_STATUS # Permissions permissions = get_api_user_permissions(api, user_id, dbi.session) if 'r' not in permissions: - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS if preset: @@ -6630,11 +6369,9 @@ def get(self): user = get_active_user_from_request(args, dbi.session) if not isinstance(user, UserModel): - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS if user.role not in USER_ROLES_MANAGE_USERS: - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS if os.path.exists(TESTRUN_PRESET_FILEPATH): @@ -6658,11 +6395,9 @@ def put(self): user = get_active_user_from_request(request_data, dbi.session) if not isinstance(user, UserModel): - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS if user.role not in USER_ROLES_MANAGE_USERS: - dbi.engine.dispose() return UNAUTHORIZED_MESSAGE, UNAUTHORIZED_STATUS # Validate the content diff --git a/api/testrun.py b/api/testrun.py index bf011ce..d4293cf 100644 --- a/api/testrun.py +++ b/api/testrun.py @@ -82,10 +82,6 @@ class TestRunner: mapping = None DB_NAME = 'basil.db' - def __del__(self): - if self.dbi: - self.dbi.engine.dispose() - def __init__(self, id): self.id = id self.dbi = db_orm.DbInterface(self.DB_NAME) diff --git a/db/models/init_db.py b/db/models/init_db.py index 934c449..880448b 100644 --- a/db/models/init_db.py +++ b/db/models/init_db.py @@ -64,7 +64,6 @@ def initialization(db_name='basil.db'): dbi.session.add(test_user) dbi.session.commit() - dbi.engine.dispose() if __name__ == "__main__": From 9232571fa9846a0dd0c6ae2627a557a9d35014d8 Mon Sep 17 00:00:00 2001 From: Luigi Pellecchia Date: Sat, 7 Dec 2024 18:03:58 +0100 Subject: [PATCH 2/2] TestCase conflict should take into account also the test case title because the same file into a repo can be use to implement multiple test cases. Signed-off-by: Luigi Pellecchia --- api/api.py | 1 + 1 file changed, 1 insertion(+) diff --git a/api/api.py b/api/api.py index 9426b22..7e8263b 100644 --- a/api/api.py +++ b/api/api.py @@ -2245,6 +2245,7 @@ def post(self): # Check if the same Test Case is already associated with the same snippet if len(dbi.session.query(ApiTestCaseModel).join(TestCaseModel).filter( ApiTestCaseModel.section == section).filter( + TestCaseModel.title == repository).filter( TestCaseModel.repository == repository).filter( TestCaseModel.relative_path == relative_path).all()) > 0: return "Test Case already associated to the current api.", 409