diff --git a/setup.py b/setup.py index 77a1d1e..8383492 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ setup( name = "sioworkers", - version = '1.5.4', + version = '1.5.5', author = "SIO2 Project Team", author_email = 'sio2@sio2project.mimuw.edu.pl', description = "Programming contest judging infrastructure", diff --git a/sio/executors/common.py b/sio/executors/common.py index 462d626..1ab788b 100644 --- a/sio/executors/common.py +++ b/sio/executors/common.py @@ -18,6 +18,7 @@ def _populate_environ(renv, environ): environ[key] = renv.get(key, '') if 'out_file' in renv: environ['out_file'] = renv['out_file'] + environ['result_percentage'] = renv.get('result_percentage', (0, 1)) def _extract_input_if_zipfile(input_name, zipdir): diff --git a/sio/executors/interactive_common.py b/sio/executors/interactive_common.py index 907da2a..2d32bdf 100644 --- a/sio/executors/interactive_common.py +++ b/sio/executors/interactive_common.py @@ -27,6 +27,29 @@ def __init__(self, message, interactor_out, env, renv, irenv): ) +class Pipes: + """ + Class for storing file descriptors for interactor and solution processes. + """ + r_interactor = None + w_interactor = None + r_solution = None + w_solution = None + + def __init__(self, r_interactor, w_interactor, r_solution, w_solution): + """ + Constructor for Pipes class. + :param r_interactor: file descriptor from which the interactor reads from the solution + :param w_interactor: file descriptor to which the interactor writes to the solution + :param r_solution: file descriptor from which the solution reads from the interactor + :param w_solution: file descriptor to which the solution writes to the interactor + """ + self.r_interactor = r_interactor + self.w_interactor = w_interactor + self.r_solution = r_solution + self.w_solution = w_solution + + def _limit_length(s): if len(s) > RESULT_STRING_LENGTH_LIMIT: suffix = b'[...]' @@ -70,18 +93,7 @@ def _fill_result(env, renv, irenv, interactor_out): inter_sig = irenv.get('exit_signal', None) sigpipe = signal.SIGPIPE.value - if irenv['result_code'] != 'OK' and inter_sig != sigpipe: - renv['result_code'] = 'SE' - raise InteractorError(f'Interactor got {irenv["result_code"]}.', interactor_out, env, renv, irenv) - elif renv['result_code'] != 'OK' and sol_sig != sigpipe: - return - elif len(interactor_out) == 0: - renv['result_code'] = 'SE' - raise InteractorError(f'Empty interactor out.', interactor_out, env, renv, irenv) - elif inter_sig == sigpipe: - renv['result_code'] = 'WA' - renv['result_string'] = 'solution exited prematurely' - else: + if six.ensure_binary(interactor_out[0]) != b'': renv['result_string'] = '' if six.ensure_binary(interactor_out[0]) == b'OK': renv['result_code'] = 'OK' @@ -93,11 +105,25 @@ def _fill_result(env, renv, irenv, interactor_out): if interactor_out[1]: renv['result_string'] = _limit_length(interactor_out[1]) renv['result_percentage'] = (0, 1) + elif irenv['result_code'] != 'OK' and irenv['result_code'] != 'TLE' and inter_sig != sigpipe: + renv['result_code'] = 'SE' + raise InteractorError(f'Interactor got {irenv["result_code"]}.', interactor_out, env, renv, irenv) + elif renv['result_code'] != 'OK' and sol_sig != sigpipe: + return + elif inter_sig == sigpipe: + renv['result_code'] = 'WA' + renv['result_string'] = 'solution exited prematurely' + elif irenv.get('real_time_killed', False): + renv['result_code'] = 'TLE' + renv['result_string'] = 'interactor time limit exceeded (user\'s solution or interactor can be the cause)' + else: + raise InteractorError(f'Unexpected interactor error', interactor_out, env, renv, irenv) def _run(environ, executor, use_sandboxes): input_name = tempcwd('in') + num_processes = environ.get('num_processes', 1) file_executor = get_file_runner(executor, environ) interactor_executor = DetailedUnprotectedExecutor() exe_filename = file_executor.preferred_filename() @@ -113,13 +139,18 @@ def _run(environ, executor, use_sandboxes): os.mkdir(zipdir) try: input_name = _extract_input_if_zipfile(input_name, zipdir) + proc_pipes = [] - r1, w1 = os.pipe() - r2, w2 = os.pipe() - for fd in (r1, w1, r2, w2): - os.set_inheritable(fd, True) + for i in range(num_processes): + r1, w1 = os.pipe() + r2, w2 = os.pipe() + for fd in (r1, w1, r2, w2): + os.set_inheritable(fd, True) + proc_pipes.append(Pipes(r1, w2, r2, w1)) - interactor_args = [os.path.basename(input_name), 'out'] + interactor_args = [str(num_processes)] + for pipes in proc_pipes: + interactor_args.extend([str(pipes.r_interactor), str(pipes.w_interactor)]) interactor_time_limit = 2 * environ['exec_time_limit'] @@ -139,51 +170,68 @@ def run(self): except Exception as e: self.exception = e - with interactor_executor as ie: - interactor = ExecutionWrapper( - ie, - [tempcwd(interactor_filename)] + interactor_args, - stdin=r2, - stdout=w1, - ignore_errors=True, - environ=environ, - environ_prefix='interactor_', - mem_limit=DEFAULT_INTERACTOR_MEM_LIMIT, - time_limit=interactor_time_limit, - fds_to_close=(r2, w1), - close_passed_fd=True, - cwd=tempcwd(), - in_file=environ['in_file'], - ) - - with file_executor as fe: - exe = ExecutionWrapper( - fe, - tempcwd(exe_filename), - [], - stdin=r1, - stdout=w2, - ignore_errors=True, - environ=environ, - environ_prefix='exec_', - fds_to_close=(r1, w2), - close_passed_fd=True, - cwd=tempcwd(), - in_file=environ['in_file'], - ) - - exe.start() - interactor.start() - - exe.join() - interactor.join() - - for ew in (exe, interactor): - if ew.exception is not None: - raise ew.exception - - renv = exe.value - irenv = interactor.value + with open(input_name, 'rb') as infile, open(tempcwd('out'), 'wb') as outfile: + processes = [] + interactor_fds = [] + for pipes in proc_pipes: + interactor_fds.extend([pipes.r_interactor, pipes.w_interactor]) + + with interactor_executor as ie: + interactor = ExecutionWrapper( + ie, + [tempcwd(interactor_filename)] + interactor_args, + stdin=infile, + stdout=outfile, + ignore_errors=True, + environ=environ, + environ_prefix='interactor_', + mem_limit=DEFAULT_INTERACTOR_MEM_LIMIT, + time_limit=interactor_time_limit, + fds_to_close=interactor_fds, + pass_fds=interactor_fds, + cwd=tempcwd(), + ) + + for i in range(num_processes): + pipes = proc_pipes[i] + with file_executor as fe: + exe = ExecutionWrapper( + fe, + tempcwd(exe_filename), + [str(i)], + stdin=pipes.r_solution, + stdout=pipes.w_solution, + ignore_errors=True, + environ=environ, + environ_prefix='exec_', + fds_to_close=[pipes.r_solution, pipes.w_solution], + cwd=tempcwd(), + ) + processes.append(exe) + + for process in processes: + process.start() + interactor.start() + + for process in processes: + process.join() + interactor.join() + + if interactor.exception: + raise interactor.exception + for process in processes: + if process.exception: + raise process.exception + + renv = processes[0].value + for process in processes: + if process.value['result_code'] != 'OK': + renv = process.value + break + renv['time_used'] = max(renv['time_used'], process.value['time_used']) + renv['mem_used'] = max(renv['mem_used'], process.value['mem_used']) + + irenv = interactor.value try: with open(tempcwd('out'), 'rb') as result_file: diff --git a/sio/workers/executors.py b/sio/workers/executors.py index d11dc03..61364fd 100644 --- a/sio/workers/executors.py +++ b/sio/workers/executors.py @@ -80,6 +80,7 @@ def execute_command( ignore_errors=False, extra_ignore_errors=(), cwd=None, + pass_fds=(), fds_to_close=(), **kwargs, ): @@ -140,6 +141,7 @@ def execute_command( stderr=forward_stderr and subprocess.STDOUT or stderr, shell=True, close_fds=True, + pass_fds=pass_fds, universal_newlines=True, env=env, cwd=cwd,