diff --git a/oidc-conformance-tests/configure_is.py b/oidc-conformance-tests/configure_is.py index 6618abc7251..53ddb67d8ba 100644 --- a/oidc-conformance-tests/configure_is.py +++ b/oidc-conformance-tests/configure_is.py @@ -55,6 +55,87 @@ def dcr(): print(response.text) +# retrieve the application ID associated with the provided application name +def get_application_id(name): + try: + response = requests.get(url=constants.APPLICATION_ENDPOINT, headers=constants.DCR_HEADERS, verify=False) + response.raise_for_status() + application_list = json.loads(response.text) + filtered_app_list = list(filter(lambda application_data: application_data["name"] == name, application_list["applications"])) + return filtered_app_list[0]["id"] + except HTTPError as http_error: + print(http_error) + print(response.text) + exit(1) + except Exception as error: + print("\nError occurred: " + str(error)) + exit(1) + else: + print("\nCompleted with status: " + str(response.status_code)) + print(response.text) + +# subscribe to the required APIs +def subscribe_apis(): + print("\nSubscribe to the required APIs") + + # get the application id related to `python_script` app + application_id = get_application_id(constants.DCR_CLIENT_NAME) + + try: + # Create payloads to subcribe the APIs + response = requests.get(url=constants.API_RESOURCE_ENDPOINT, headers=constants.DCR_HEADERS, verify=False) + response.raise_for_status() + total_api_resources_count = json.loads(response.text)["totalResults"] + response = requests.get(url=constants.API_RESOURCE_ENDPOINT + "?limit=" + str(total_api_resources_count), + headers=constants.DCR_HEADERS, verify=False) + response.raise_for_status() + api_resources = json.loads(response.text)["apiResources"] + + payloads = [] + api_identifiers = [] + for api_data in constants.SUBSCRIBE_APIS: + payloads.append({ "scopes": api_data["scopes"], "policyIdentifier": "RBAC" }) + api_identifiers.append(api_data["identifier"]) + + for api_resource in api_resources: + if api_resource["identifier"] in api_identifiers: + index = api_identifiers.index(api_resource["identifier"]) + payloads[index]["id"] = api_resource["id"] + + for payload in payloads: + response = requests.post(url=constants.APPLICATION_ENDPOINT + "/" + application_id + "/authorized-apis", headers=constants.DCR_HEADERS, + data=json.dumps(payload), verify=False) + response.raise_for_status() + + # Allow admin role to access the subscribed APIs + response = requests.get(url=constants.ROLES_ENDPOINT + "?filter=audience.type+eq+organization", headers=constants.DCR_HEADERS, verify=False) + response.raise_for_status() + admin_role = list(filter(lambda data: data["displayName"] == "admin", json.loads(response.text)["Resources"])) + admin_role_id = admin_role[0]["id"] + + role_update_payload = { + "name": constants.DCR_CLIENT_NAME, + "associatedRoles": { + "allowedAudience": "ORGANIZATION", + "roles": [ + { "id": admin_role_id } + ] + } + } + response = requests.patch(url=constants.APPLICATION_ENDPOINT + "/" + application_id, headers=constants.DCR_HEADERS, + data=json.dumps(role_update_payload), verify=False) + response.raise_for_status() + except HTTPError as http_error: + print(http_error) + print(response.text) + exit(1) + except Exception as error: + print("\nError occurred: " + str(error)) + exit(1) + else: + print("\nCompleted with status: " + str(response.status_code)) + print(response.text) + # obtain an access token with given client details and scope def get_access_token(client_id, client_secret, scope, url): body = { @@ -345,6 +426,14 @@ def generate_config_for_plan(service_provider1_config, service_provider2_config, json_config_builder(service_provider_1, service_provider_2, output_file_path, plan_name) +def get_required_scopes(): + scopes = constants.SCOPES + + for api_data in constants.SUBSCRIBE_APIS: + scopes += " " + " ".join(api_data["scopes"]) + + return scopes + warnings.filterwarnings("ignore") if not is_process_running("wso2server"): unpack_and_run(path_to_is_zip) @@ -353,7 +442,8 @@ def generate_config_for_plan(service_provider1_config, service_provider2_config, dcr() -access_token = get_access_token(constants.DCR_CLIENT_ID, constants.DCR_CLIENT_SECRET, constants.SCOPES, +subscribe_apis() +access_token = get_access_token(constants.DCR_CLIENT_ID, constants.DCR_CLIENT_SECRET, get_required_scopes(), constants.TOKEN_ENDPOINT) headers['Authorization'] = "Bearer " + access_token diff --git a/oidc-conformance-tests/constants.py b/oidc-conformance-tests/constants.py index 8dff88f6e2e..718d4cb5d3b 100644 --- a/oidc-conformance-tests/constants.py +++ b/oidc-conformance-tests/constants.py @@ -21,19 +21,39 @@ TOKEN_ENDPOINT = BASE_URL + "/oauth2/token" +DCR_CLIENT_NAME = "python_script" + DCR_CLIENT_ID = "oidc_test_clientid001" DCR_CLIENT_SECRET = "oidc_test_client_secret001" +SUBSCRIBE_APIS = [ + { + "identifier": "/api/server/v1/claim-dialects", + "scopes": ["internal_claim_meta_update"] + }, + { + "identifier": "/api/server/v1/applications", + "scopes": ["internal_application_mgt_create", "internal_application_mgt_update", "internal_application_mgt_view"] + }, + { + "identifier": "/api/server/v1/oidc/scopes", + "scopes": ["internal_oidc_scope_mgt_update"] + } +] + +API_RESOURCE_ENDPOINT = BASE_URL + "/api/server/v1/api-resources" + +ROLES_ENDPOINT = BASE_URL + "/scim2/v2/Roles" + APPLICATION_ENDPOINT = BASE_URL + "/api/server/v1/applications" -SCOPES = "internal_user_mgt_update internal_application_mgt_create internal_application_mgt_view internal_login " \ - "internal_claim_meta_update internal_application_mgt_update internal_scope_mgt_create" +SCOPES = "internal_user_mgt_update internal_login" DCR_HEADERS = {'Content-Type': 'application/json', 'Connection': 'keep-alive', 'Authorization': 'Basic YWRtaW46YWRtaW4='} DCR_BODY = { - 'client_name': 'python_script', + 'client_name': DCR_CLIENT_NAME, "grant_types": ["password"], "ext_param_client_id": DCR_CLIENT_ID, "ext_param_client_secret": DCR_CLIENT_SECRET,