diff --git a/tests/foreman/api/test_ansible.py b/tests/foreman/api/test_ansible.py index 829f225938d..276c2bce242 100644 --- a/tests/foreman/api/test_ansible.py +++ b/tests/foreman/api/test_ansible.py @@ -602,3 +602,91 @@ def test_negative_ansible_job_timeout_to_kill( assert [i['output'] for i in result if i['output'] == termination_msg] assert [i['output'] for i in result if i['output'] == 'StandardError: Job execution failed'] assert [i['output'] for i in result if i['output'] == 'Exit status: 120'] + + @pytest.mark.tier2 + @pytest.mark.no_containers + @pytest.mark.rhel_ver_list([settings.content_host.default_rhel_version]) + def test_positive_ansible_job_privilege_escalation( + self, + target_sat, + rhel_contenthost, + module_org, + module_location, + module_ak_with_synced_repo, + ): + """Verify privilege escalation defined inside ansible playbook tasks is working + when executing the playbook via Ansible - Remote Execution + + :id: 8c63fd1a-2121-4cce-9ec1-ae12817c9cc4 + + :steps: + 1. Register a RHEL host to Satellite. + 2. Setup a user on that host. + 3. Create a playbook. + 4. Set the SSH user to the created user, and unset the Effective user. + 5. Run the playbook. + + :expectedresults: In the playbook, created user is expected instead root user. + + :BZ: 1955385 + + :customerscenario: true + """ + playbook = ''' + --- + - name: Test Play + hosts: all + gather_facts: false + tasks: + - name: Check current user + command: bash -c "whoami" + register: def_user + - debug: + var: def_user.stdout + - name: Check become user + command: bash -c "whoami" + become: true + become_user: testing + register: bec_user + - debug: + var: bec_user.stdout + ''' + result = rhel_contenthost.register( + module_org, module_location, module_ak_with_synced_repo.name, target_sat + ) + assert result.status == 0, f'Failed to register host: {result.stderr}' + assert rhel_contenthost.execute('useradd testing').status == 0 + pwd = rhel_contenthost.execute( + f'echo {settings.server.ssh_password} | passwd testing --stdin' + ) + assert 'passwd: all authentication tokens updated successfully.' in pwd.stdout + template_id = ( + target_sat.api.JobTemplate() + .search(query={'search': 'name="Ansible - Run playbook"'})[0] + .id + ) + job = target_sat.api.JobInvocation().run( + synchronous=False, + data={ + 'job_category': 'Ansible Playbook', + 'job_template_id': template_id, + 'search_query': f'name = {rhel_contenthost.hostname}', + 'targeting_type': 'static_query', + 'inputs': {'playbook': playbook}, + }, + ) + target_sat.wait_for_tasks( + f'resource_type = JobInvocation and resource_id = {job["id"]}', + poll_timeout=1000, + ) + + result = target_sat.api.JobInvocation(id=job['id']).read() + assert result.pending == 0 + assert result.succeeded == 1 + assert result.status_label == 'succeeded' + + task = target_sat.wait_for_tasks( + f'resource_type = JobInvocation and resource_id = {job["id"]}', + ) + assert '"def_user.stdout": "root"' in task[0].humanized['output'] + assert '"bec_user.stdout": "testing"' in task[0].humanized['output']