diff --git a/src/auto_grader/cli.py b/src/auto_grader/cli.py index 394bd32..c437d49 100644 --- a/src/auto_grader/cli.py +++ b/src/auto_grader/cli.py @@ -1,305 +1,26 @@ -from click.shell_completion import CompletionItem import click import os -import zipfile -import tarfile import shutil -import subprocess -import shlex import tomllib -import re -import numpy as np -from subprocess import CompletedProcess -from unidecode import unidecode -from typing import Optional -SUCCESS_BOX = "[\033[0;92mSUCCESS\033[0m]" -ERROR_BOX = "[\033[0;91mERROR\033[0m]" -WARNING_BOX = "[\033[0;93mWARNING\033[0m]" +from auto_grader.utils.display import SUCCESS_BOX, WARNING_BOX + +from auto_grader.commands.test.run import run +from auto_grader.commands.test.compile import compile +from auto_grader.commands.test.dissect import dissect +from auto_grader.commands.test.correctness import correctness + +from auto_grader.commands.archives import list_assign, unpack readme_str = "# Grading-dir for NHR MPI course assignements\n" + \ "## Usage\n" +\ "some usage\n" -shell_colors = {'br-red': 91, 'br-green': 92, 'br-yellow': 93} - -def color_string(string: str, color: str): - result_string = string - if color in shell_colors: - result_string = f"\033[0;{shell_colors[color]}m{string}\033[0m" - return result_string - - -def indent_text(text: str, indent: int, indent_char: str = ' '): - lines = text.split('\n') - - for idx, line in enumerate(lines): - lines[idx] = indent_char*indent + line - return '\n'.join(lines) - - -class test_type: - build = "BUILD" - run = "RUN" - correctness = "CORRECTNESS" - - -class run_log: - def __init__(self, - ttype: str, - run_cmd: str, - run_abs_dir: str, - run_success: bool, - summary: str, - cmd_return_code: int, - stdout: str, - stderr: str): - self.ttype = ttype - self.run_cmd = run_cmd - self.run_abs_dir = run_abs_dir - self.run_success = run_success - self.summary = summary - self.cmd_return_code = cmd_return_code - self.stdout = stdout - self.stderr = stderr - - def as_str(self, indent=0, indent_char=' ', color=True): - color_func = ( - (lambda s, c: color_string(s, c)) - if color - else (lambda s, c: s) - ) - - header = f"[{self.ttype}]" - success_string = color_func( - 'SUCCESS', 'br-green') if self.run_success else color_func('FAILURE', 'br-red') - run_info = \ - "[RUN INFO]\n" +\ - f" - COMMAND : {self.run_cmd}\n" +\ - f" - RUN AT : {self.run_abs_dir}\n" +\ - f" - RUN SUCCESS : {success_string}" - - if not self.run_success: - run_info += "\n - FAILURE SUMMARY:\n" - run_info += indent_text(self.summary, 8) - - cmd_info = \ - "[CMD INFO]\n" +\ - f" - RETURN CODE : {self.cmd_return_code}\n" +\ - f" - STDOUT : \n" +\ - indent_text(self.stdout, 8) + '\n' +\ - f" - STDERR : \n" +\ - indent_text(self.stderr, 8) - - final_string = '\n'.join([header, - indent_text(run_info, 4), indent_text(cmd_info, 4)]) - final_string = indent_text( - final_string, indent=indent, indent_char=indent_char) - return final_string - - def oneline(self, type_as_prefix=True, color=True): - color_func = ( - (lambda s, c: color_string(s, c)) - if color - else (lambda s, c: s) - ) - oneline = "" - if type_as_prefix: - oneline += f"[{self.ttype}]:" - oneline += f"[{os.path.basename(self.run_abs_dir)}]: " - if self.run_success and self.cmd_return_code == 0: - oneline += f"{color_func('SUCCESS', 'br-green')}" - if not self.run_success: - oneline += f"{color_func('FAILURE', 'br-red')} (shl)(hint: " - rest_str_len = len(oneline+'...)') - err_int = self.summary[:rest_str_len].replace("\n", " ") - oneline += err_int+"...)" - if self.run_success and not self.cmd_return_code == 0: - oneline += f"{color_func('FAILURE', 'br-red')} (cmd)(stderr: " - rest_str_len = len(oneline+'...)') - err_int = self.stderr[:rest_str_len].replace("\n", " ") - oneline += err_int+"...)" - return oneline - - -class StudentVarType(click.ParamType): - name = "student" - - def shell_complete(self, ctx, param, incomplete): - if 'path' not in ctx.params: - return click.Path.shell_complete(click.Path(), ctx, param, incomplete) - else: - return [CompletionItem(dir) for dir in os.listdir(ctx.params['path']) if dir.startswith(incomplete)] - - -def _unpack(file_path, dest_path): - success = False - if zipfile.is_zipfile(file_path): - with zipfile.ZipFile(file_path, 'r') as z: - z.extractall(dest_path) - success = True - elif tarfile.is_tarfile(file_path): - with tarfile.open(file_path, 'r:*') as t: - t.extractall(dest_path, filter='data') - success = True - return success - - -def _unpack_tree(dir_path): - for dir, _, filenames in os.walk(dir_path): - for filename in filenames: - archive_path = os.path.join(dir, filename) - if zipfile.is_zipfile(archive_path) or tarfile.is_tarfile(archive_path): - extract_dir = os.path.join(dir, os.path.splitext(filename)[0]) - os.makedirs(extract_dir, exist_ok=True) - _unpack(archive_path, extract_dir) - os.remove(archive_path) - - -# - SRC-TOOLS -----------------------------------------------------------------# -def _add_attribute_match(match, attribute: str): - prefix = match.group(1) - func_name = match.group(2) - suffix = match.group(3) - if attribute in prefix: - return match.group(0) - return f"{prefix} {attribute} {func_name}{suffix}" - - -def add_attributes(filename, functions, attribute: str): - pattern = re.compile( - r"^(\s*(?:(?:\w+[\s\*]+)+))" - r"(" + "|".join(map(re.escape, functions)) + r")" - r"(\s*\([^)]*\)\s*\{)", - re.MULTILINE - ) - with open(filename, "r") as f: - code = f.read() - - def sub_func(match): return _add_attribute_match(match, attribute) - new_code = pattern.sub(sub_func, code) - with open(filename, "w") as f: - f.write(new_code) - - -def _weaken_file(filename, functions): - add_attributes(filename, functions, '__attribute__((weak))') - - -def _staticize_file(filename, functions): - add_attributes(filename, functions, 'static') - - -def _rmv_attribute_match(match, attribute: str): - prefix = match.group(1) - func_name = match.group(2) - suffix = match.group(3) - new_prefix = re.sub(rf'\b{re.escape(attribute)}\b\s+', '', prefix) - return f"{new_prefix}{func_name}{suffix}" - - -def rmv_attributes_file(filename, functions, attribute: str): - pattern = re.compile( - r"^(\s*(?:(?:\w+[\s\*]+)+))" # prefix (type, qualifiers, etc.) - r"(" + "|".join(map(re.escape, functions)) + r")" # function name - r"(\s*\([^)]*\)\s*\{)", # argument list + opening brace - re.MULTILINE - ) - with open(filename, "r") as f: - code = f.read() - - def remove_func(match): return _rmv_attribute_match(match, attribute) - new_code = pattern.sub(remove_func, code) - with open(filename, "w") as f: - f.write(new_code) - - -def _remove_static_attribute(match): - prefix = match.group(1) - func_name = match.group(2) - suffix = match.group(3) - new_prefix = re.sub(r'\bstatic\s+', '', prefix) - return f"{new_prefix}{func_name}{suffix}" - - -def _unstaticize_file(filename, functions): - rmv_attributes_file(filename, functions, 'static') - - -def _extract_prototypes_to_header(filename, functions, header_filename): - func_pattern = re.compile( - r"^(\s*(?:(?:\w+[\s\*]+)+))" - r"(" + "|".join(functions) + r")" - r"(\s*\([^)]*\)\s*\{)", - re.MULTILINE - ) - - with open(filename, "r") as f: - code = f.read() - matches = func_pattern.findall(code) - - if not matches: - click.echo( - f"{WARNING_BOX}: {functions} not found in {os.path.relpath(filename)}.") - return - - prototypes = [] - for prefix, func_name, args in matches: - proto = f"{prefix.strip()} static {func_name}{args.strip()[:-1]};" - prototypes.append(proto) - prototypes = list(dict.fromkeys(prototypes)) - - header_exists = os.path.exists(header_filename) - with open(header_filename, "w" if header_exists else "w") as header: - header.write('#include "solver.h"'+"\n") - for proto in prototypes: - header.write(proto + "\n") - - include_line = f'#include "{os.path.basename(header_filename)}"\n' - if include_line not in code: - new_code = include_line + code - with open(filename, "w") as f: - f.write(new_code) -# -----------------------------------------------------------------------------# - - -def is_submission_dir(dir_path): - return os.path.isdir(dir_path) and "_assignsubmission_file" in dir_path - - -def is_assignment_archive(path): - result = True - if not os.path.isfile(path): - result = False - if not zipfile.is_zipfile(path) and not tarfile.is_tarfile(path): - result = False - if not ("Assignment" in path): - result = False - return result - - -def _make_names_pairs(dir_path): - student_pairs = [(unidecode(name[:name.find('_')].lower().replace( - ' ', '_')), os.path.join(dir_path, name)) for name in os.listdir( - dir_path) if is_submission_dir(os.path.join(dir_path, name))] - return student_pairs - - -def _identify_archives(path): - asg_archives = [ - f"{path}/{file}" for file in os.listdir(path) if is_assignment_archive(f"{path}/{file}")] - numbers = [0]*len(asg_archives) - basenames = [""]*len(asg_archives) - for idx, asg in enumerate(asg_archives): - name = os.path.basename(asg) - basenames[idx] = name - number = name[name.find("Assignment")+len("Assignment") + - 1:].replace('-', ' ').split(sep=' ')[0] - numbers[idx] = int(number) - pairs = list(zip(numbers, basenames, asg_archives)) - pairs.sort(key=lambda x: x[0]) - return pairs +def parse_config(config_path): + with open(config_path, 'rb') as handle: + config = tomllib.load(handle) + print(config) @click.group() @@ -320,64 +41,6 @@ def init(path): click.echo(f"{SUCCESS_BOX} :{os.path.basename(path)} inited!") -@click.command("list") -@click.argument('path', default='./archives', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True)) -def list_assign(path): - pairs = _identify_archives(path) - for number, archive, _ in pairs: - click.echo(f"[{number}] -> {archive}") - - -@click.command() -@click.argument('number', required=True, type=click.INT) -@click.option('-p', '--path', default='./archives', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True)) -@click.option('-o', '--output-path', default='./submissions', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True)) -@click.option('--yes', is_flag=True, default=False, help='Skip confirmation prompt.') -def unpack(path, number, yes, output_path): - pairs = _identify_archives(path) - idx = -1 - if len(pairs) > 0: - numbers, names, paths = zip(*pairs) - else: - numbers = tuple() - names = tuple() - paths = tuple() - if number not in numbers: - detected_str = str(numbers).replace('(', '').replace(')', '') - if len(numbers) == 0: - detected_str = 'None' - click.echo( - f"{ERROR_BOX}: Unavailable assignment. Detected : {detected_str}.", err=True) - return - else: - idx = numbers.index(number) - - if not yes and not click.confirm(f"Are you sure you want to unpack {names[idx]}?"): - return - tmp_dir = os.path.join(output_path, 'tmp') - tmp_dir_shortname = os.path.relpath(tmp_dir) - if os.path.isdir(tmp_dir): - if not yes and not click.confirm(f"I am going to delete {tmp_dir_shortname}, confirm?"): - return - else: - shutil.rmtree(tmp_dir) - os.makedirs(tmp_dir) - _unpack(paths[idx], tmp_dir) - _unpack_tree(tmp_dir) - - name_origin_pairs = _make_names_pairs(tmp_dir) - for name, origin in name_origin_pairs: - destination = os.path.join(output_path, name, f"assignment_{number}") - if not yes and os.path.exists(destination) and not click.confirm(f"I am going to override {os.path.relpath(destination)}, confirm?"): - click.echo( - f"{WARNING_BOX}: Ingoring assignement_{number} for {name}") - continue - if os.path.exists(destination): - shutil.rmtree(destination) - shutil.copytree(origin, destination) - shutil.rmtree(tmp_dir) - - @click.command() @click.argument('number', required=True, type=click.INT) @click.option('-p', '--path', default='./submissions', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True)) @@ -416,412 +79,11 @@ def collect(number, path, output_path, yes): handle.write(f"origin:\n\t{os.path.relpath(root)}\n") -def build_root(root_path: str, build_cmd: str, timeout: int = 10): - cwd = os.getcwd() - os.chdir(root_path) - run_result = True - exception_summary = "" - result = CompletedProcess('', -1, '', '') - try: - result = subprocess.run(shlex.split(build_cmd), - timeout=10, text=True, capture_output=True) - except Exception as e: - run_result = False - exception_summary = str(e) - - os.chdir(cwd) - log = run_log(test_type.build, build_cmd, os.path.abspath(root_path), run_result, - exception_summary, result.returncode, result.stdout, result.stderr) - return log - - -def run_root(root_path: str, build_cmd: str, timeout: int = 10, exe: Optional[str] = None): +@click.command() +@click.option('-p', '--path', default='', type=click.Path(exists=True, file_okay=True, dir_okay=True, resolve_path=True)) +def dev(path): + parse_config(path) pass - # cwd = os.getcwd() - # os.chdir(root_path) - # run_result = True - # exception_summary = "" - # result = CompletedProcess('', -1, '', '') - # try: - # result = subprocess.run(shlex.split(build_cmd), - # timeout=10, text=True, capture_output=True) - # except Exception as e: - # run_result = False - # exception_summary = str(e) - # - # os.chdir(cwd) - # log = run_log(test_type.build, build_cmd, os.path.abspath(root_path), run_result, - # exception_summary, result.returncode, result.stdout, result.stderr) - # return log - - -def clean_root(root_path: str, clean: str, timeout: int = 10): - return build_root(root_path, clean, timeout=timeout) - - -@click.command() -@click.argument('number', required=True, type=click.INT) -@click.option('-p', '--path', default='./roots', - type=click.Path(exists=True, file_okay=False, - dir_okay=True, resolve_path=True), - help='Path to directory conaining student roots in format (student)/assignment_[NUMBER]/root_(...).') -@click.option('-s', '--student', default='', type=StudentVarType(), - help='Specify wich student to run the test for. If omitted test is run for all found students.') -@click.option('-c', '--command', default='make', type=click.STRING, - help='Overrides command that is run to build roots. The default command is `make`.') -@click.option('--clean-command', default='make distclean', type=click.STRING, - help='Overrides command that is run to build roots. The default command is `make distclean`.') -@click.option('-t', '--timeout', default=10, type=click.INT, - help='Sets timeout of both clean and build commands, defaults to 10s.') -def compile(number, path, command, student, timeout, clean_command): - current_wd = os.getcwd() - students = os.listdir(path) - if student and student not in students: - click.echo( - f"{ERROR_BOX}: No student {student} in {os.path.relpath(path)}.") - return - if student: - students = [student] - asg_dirs = [os.path.join( - path, student, f"assignment_{number}") for student in students] - for asg_dir, student in zip(asg_dirs, students): - if not os.path.exists(asg_dir) or not os.path.isdir(asg_dir): - click.echo( - f"{WARNING_BOX}: No {student}/assignment_{number} was found, skipping...") - continue - local_roots = [os.path.join(asg_dir, dir) - for dir in os.listdir(asg_dir) if 'root_' in dir and os.path.isdir(os.path.join(asg_dir, dir))] - if len(local_roots) == 0: - click.echo( - f"{WARNING_BOX}: No roots could be identified for {student}/assignment_{number}") - continue - click.echo(f"[{student}]:") - for root in local_roots: - os.chdir(root) - clean_log = clean_root(root, clean_command) - if not clean_log.run_success or not clean_log.cmd_return_code == 0: - click.echo( - f"{indent_text('[CLEAN]:' + clean_log.oneline(type_as_prefix=False), 4)}") - continue - log = build_root(root, command, timeout=timeout) - clean_log = clean_root(root, clean_command) - if not clean_log.run_success or not clean_log.cmd_return_code == 0: - click.echo( - f"{indent_text('[CLEAN]:' + clean_log.oneline(type_as_prefix=False), 4)}") - continue - click.echo(f"{indent_text(log.oneline(type_as_prefix=False), 4)}") - os.chdir(current_wd) - - -@click.command() -@click.argument('number', required=True, type=click.INT) -@click.option('-p', '--path', default='./roots', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True)) -@click.option('-c', '--compile-command', default='make', type=click.STRING) -@click.option('-r', '--run-command', default='mpirun', type=click.STRING) -@click.option('-n', '--nproc-flag', default="-n 5", type=click.STRING) -@click.option('-v', '--verbose', is_flag=True, default=False) -@click.option('-s', '--solution-path', default='./solutions', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True)) -def correctness(number, solution_path, path, compile_command, run_command, nproc_flag, verbose): - students = os.listdir(path) - asg_dirs = [os.path.join( - path, student, f"assignment_{number}") for student in students] - - for asg_dir, student in zip(asg_dirs, students): - - current_wd = os.getcwd() - - if not os.path.exists(asg_dir) or not os.path.isdir(asg_dir): - click.echo( - f"{WARNING_BOX}: No roots could be identified for {student}/assignment_{number}") - continue - local_roots = [os.path.join(asg_dir, dir) - for dir in os.listdir(asg_dir) if 'root_' in dir and os.path.isdir(os.path.join(asg_dir, dir))] - if len(local_roots) == 0: - click.echo( - f"{WARNING_BOX}: No roots could be identified for {student}/assignment_{number}") - continue - - if verbose: - click.echo(f"Testing compile for {student}:assignment_{number}") - else: - click.echo(f"({student}:assignment_{number}):") - - for root in local_roots: - os.chdir(current_wd) - if verbose: - click.echo(f"\t[{os.path.basename(root)}]: ", nl=False) - click.echo(f"entering {os.path.relpath(root)}") - os.chdir(root) - result = subprocess.run( - shlex.split("make distclean"), capture_output=True, text=True) - - result = subprocess.run( - shlex.split(compile_command), capture_output=True, text=True) - - if verbose: - if (result.returncode != 0): - click.echo( - f"\t\033[0;91m[{os.path.basename(root)}]\033[0m: ", nl=False) - else: - click.echo( - f"\t\033[0;92m[{os.path.basename(root)}]\033[0m: ", nl=False) - click.echo(f"stdout: {result.stdout}") - click.echo(f"stderr: {result.stderr}") - else: - if (result.returncode != 0): - click.echo( - f"\t\033[0;91m[{os.path.basename(root)}]\033[0m: ", nl=False) - click.echo(f"stderr: {result.stderr[:69]:69s}...") - - else: - click.echo( - f"\t\033[0;92m[{os.path.basename(root)}]\033[0m: SUCCESS") - - executables = [os.path.join(root, file) - for file in os.listdir(root) if 'exe-' in file and os.path.isfile(os.path.join(root, file))] - - sol_as_path = os.path.join(solution_path, f"assignment_{number}") - config_path = os.path.join(sol_as_path, "test_config.toml") - if not os.path.isdir(sol_as_path): - click.echo( - f"{ERROR_BOX}: No reference solution for assignment_{number} found.", err=True) - return - if not os.path.exists(config_path) or not os.path.isfile(config_path): - click.echo( - f"{ERROR_BOX}: No test configuration for assignment_{number} found.", err=True) - return - - config = dict() - with open(config_path, 'rb') as handle: - config = tomllib.load(handle) - - if not 'ref_solution' in config["correctness"]: - click.echo( - f"{ERROR_BOX}: No function dissect config found for assignment_{number}.", err=True) - return - - ref_solution = config['correctness']['ref_solution'] - ref_solution_path = os.path.join(sol_as_path, ref_solution) - - if not 'ref_param' in config["correctness"]: - click.echo( - f"{ERROR_BOX}: No function dissect config found for assignment_{number}.", err=True) - return - ref_param = config['correctness']['ref_param'] - ref_param_path = os.path.join(sol_as_path, ref_param) - local_run_command = " ".join([run_command, nproc_flag, - executables[0], ref_param_path]) - local_solution_path = os.path.join(root, 'p.dat') - - if os.path.exists(local_solution_path): - os.remove(local_solution_path) - - try: - result = subprocess.run( - shlex.split(local_run_command), capture_output=True, text=True, timeout=10) - except subprocess.TimeoutExpired: - click.echo("Run Timeout") - result.returncode = 1 - - if verbose: - if (result.returncode != 0): - click.echo( - f"\t\033[0;91m[{os.path.basename(root)}]\033[0m: ", nl=False) - continue - else: - click.echo( - f"\t\033[0;92m[{os.path.basename(root)}]\033[0m: ", nl=False) - click.echo(f"stdout: {result.stdout}") - click.echo(f"stderr: {result.stderr}") - else: - if (result.returncode != 0): - click.echo( - f"\t\033[0;91m[{os.path.basename(root)}]\033[0m: ", nl=False) - click.echo(f"stderr: {result.stderr[:69]:69s}...") - continue - else: - click.echo( - f"\t\033[0;92m[{os.path.basename(root)}]\033[0m: SUCCESS") - - if not os.path.exists(local_solution_path): - data_files = [file for file in os.listdir( - root) if '.dat' in file] - print(data_files) - continue - - local_solution_data = np.loadtxt(local_solution_path, usecols=True) - ref_solution_data = np.loadtxt(ref_solution_path, usecols=True) - - if local_solution_data.shape != ref_solution_data.shape: - click.echo("Correctness Test Failed, Dimenstions Not Matching") - continue - elif not np.allclose(local_solution_data, ref_solution_data, rtol=1e-1): - click.echo("Correctness Test Failed: Data Not Matching") - continue - else: - click.echo("Correctness Test Passed") - - -@click.command() -@click.argument('number', required=True, type=click.INT) -@click.option('-s', '--solution-path', default='./solutions', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True)) -@click.option('-p', '--path', default='./roots', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True)) -def dissect(number, solution_path, path): - - sol_as_path = os.path.join(solution_path, f"assignment_{number}") - config_path = os.path.join(sol_as_path, "test_config.toml") - - if not os.path.isdir(sol_as_path): - click.echo( - f"{ERROR_BOX}: No reference solution for assignment_{number} found.", err=True) - return - if not os.path.exists(config_path) or not os.path.isfile(config_path): - click.echo( - f"{ERROR_BOX}: No test configuration for assignment_{number} found.", err=True) - return - - config = dict() - with open(config_path, 'rb') as handle: - config = tomllib.load(handle) - - if not 'test_functions' in config["dissect"]: - click.echo( - f"{ERROR_BOX}: No function dissect config found for assignment_{number}.", err=True) - return - test_functions = config['dissect']['test_functions'] - - if not 'build_cmd' in config["dissect"]: - click.echo( - f"{ERROR_BOX}: No build config found for assignment_{number}.", err=True) - return - static_functions = config['dissect']['static_functions'] - - if not 'build_cmd' in config["dissect"]: - click.echo( - f"{ERROR_BOX}: No build config found for assignment_{number}.", err=True) - return - build_cmd = config['dissect']['build_cmd'] - - if not 'run_cmd' in config["dissect"]: - click.echo( - f"{ERROR_BOX}: No run config found for assignment_{number}.", err=True) - return - run_cmd = config['dissect']['run_cmd'] - - if not 'solution_files' in config["dissect"]: - click.echo( - f"{ERROR_BOX}: No solution_files config found for assignment_{number}.", err=True) - return - solution_files = config['dissect']['solution_files'] - - if not 'ref_solution' in config["correctness"]: - click.echo( - f"{ERROR_BOX}: No ref_solution config found for assignment_{number}.", err=True) - return - ref_solution = config['correctness']['ref_solution'] - - if not 'user_solution' in config["correctness"]: - click.echo( - f"{ERROR_BOX}: No user_solution config found for assignment_{number}.", err=True) - return - user_solution = config['correctness']['user_solution'] - - students = os.listdir(path) - asg_dirs = [os.path.join( - path, student, f"assignment_{number}") for student in students] - - for asg_dir, student in zip(asg_dirs, students): - if not os.path.exists(asg_dir) or not os.path.isdir(asg_dir): - click.echo( - f"{WARNING_BOX}: No roots could be identified for {student}/assignment_{number}") - continue - local_roots = [os.path.join(asg_dir, dir) - for dir in os.listdir(asg_dir) if 'root_' in dir and os.path.isdir(os.path.join(asg_dir, dir))] - if len(local_roots) == 0: - click.echo( - f"{WARNING_BOX}: No roots could be identified for {student}/assignment_{number}") - continue - current_wd = os.getcwd() - click.echo(f"({student}:assignment_{number}):") - - flags = [f"-DGRDR_TEST_{func.upper()}" for func in test_functions] - for root in local_roots: - root_src_paths = [os.path.join(root, 'src', file) - for file in solution_files] - root_des_paths = [os.path.join( - sol_as_path, 'src', file) for file in solution_files] - for src, dest in zip(root_src_paths, root_des_paths): - if not os.path.exists(src): - click.echo( - f"\t[{os.path.basename(root)}]:{ERROR_BOX}: Missing source {os.path.basename(src)}") - shutil.copyfile(src, dest) - - _extract_prototypes_to_header( - dest, static_functions, os.path.join(sol_as_path, 'src', 'aid.h')) - - _unstaticize_file(dest, test_functions) - _weaken_file(dest, test_functions) - _staticize_file(dest, static_functions) - - result = 0 - os.chdir(sol_as_path) - - for function, flag in zip(test_functions, flags): - command = build_cmd + f" CFLAGS+={flag}" - result = subprocess.run( - ['make', 'distclean'], capture_output=True, text=True) - result = subprocess.run( - shlex.split(command), capture_output=True, text=True, timeout=10) - if result.returncode == 0: - click.echo( - f"\t[{os.path.basename(root)}]:[BUILD]:[{function}]:{SUCCESS_BOX}") - else: - click.echo( - f"\t[{os.path.basename(root)}]:[BUILD]:[{function}]:{ERROR_BOX}") - click.echo(f"\t\tstderr: {result.stderr[:61]:61s}...") - continue - - ref_sol_path = os.path.join(sol_as_path, ref_solution) - usr_sol_path = os.path.join(sol_as_path, user_solution) - - if os.path.exists(usr_sol_path): - os.remove(usr_sol_path) - - try: - result = subprocess.run( - shlex.split(run_cmd), capture_output=True, text=True, timeout=10) - except subprocess.TimeoutExpired: - result.returncode = 1 - click.echo( - f"\t[{os.path.basename(root)}]:[RUN]:[{function}]:{ERROR_BOX} -- Timeout") - - if result.returncode == 0: - click.echo( - f"\t[{os.path.basename(root)}]:[RUN]:[{function}]:{SUCCESS_BOX}") - else: - click.echo( - f"\t[{os.path.basename(root)}]:[RUN]:[{function}]:{ERROR_BOX}") - click.echo(f"\t\tstderr: {result.stderr[:61]:61s}...") - continue - - ref_sol_data = np.loadtxt(ref_sol_path) - usr_sol_data = np.loadtxt(usr_sol_path) - - if np.allclose(ref_sol_data, usr_sol_data, rtol=1e-1) == 0: - click.echo( - f"\t[{os.path.basename(root)}]:[CORRECT]:[{function}]:{SUCCESS_BOX}") - else: - click.echo( - f"\t[{os.path.basename(root)}]:[CORRECT]:[{function}]:{ERROR_BOX}") - continue - os.chdir(current_wd) - - -# @click.command() -# @click.option('-p', '--path', default='./roots', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True)) -# @click.option('-s', '--student', default='', type=StudentVarType()) -# def dev(path, student): -# pass @click.group() @@ -840,7 +102,7 @@ def test(): cli.add_command(init) -# cli.add_command(dev) +cli.add_command(dev) cli.add_command(archives) cli.add_command(roots) cli.add_command(test) @@ -850,6 +112,7 @@ archives.add_command(unpack) roots.add_command(collect) +test.add_command(run) test.add_command(correctness) test.add_command(compile) test.add_command(dissect) diff --git a/src/auto_grader/commands/__init__.py b/src/auto_grader/commands/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/auto_grader/commands/archives.py b/src/auto_grader/commands/archives.py new file mode 100644 index 0000000..0aac2e7 --- /dev/null +++ b/src/auto_grader/commands/archives.py @@ -0,0 +1,127 @@ +import click +import zipfile +import tarfile +import os +import shutil +from unidecode import unidecode +from auto_grader.utils.display import ERROR_BOX, WARNING_BOX + + +def _unpack(file_path, dest_path): + success = False + if zipfile.is_zipfile(file_path): + with zipfile.ZipFile(file_path, 'r') as z: + z.extractall(dest_path) + success = True + elif tarfile.is_tarfile(file_path): + with tarfile.open(file_path, 'r:*') as t: + t.extractall(dest_path, filter='data') + success = True + return success + + +def _unpack_tree(dir_path): + for dir, _, filenames in os.walk(dir_path): + for filename in filenames: + archive_path = os.path.join(dir, filename) + if zipfile.is_zipfile(archive_path) or tarfile.is_tarfile(archive_path): + extract_dir = os.path.join(dir, os.path.splitext(filename)[0]) + os.makedirs(extract_dir, exist_ok=True) + _unpack(archive_path, extract_dir) + os.remove(archive_path) + + +def _is_submission_dir(dir_path): + return os.path.isdir(dir_path) and "_assignsubmission_file" in dir_path + + +def _is_assignment_archive(path): + result = True + if not os.path.isfile(path): + result = False + if not zipfile.is_zipfile(path) and not tarfile.is_tarfile(path): + result = False + if not ("Assignment" in path): + result = False + return result + + +def _make_names_pairs(dir_path): + student_pairs = [(unidecode(name[:name.find('_')].lower().replace( + ' ', '_')), os.path.join(dir_path, name)) for name in os.listdir( + dir_path) if _is_submission_dir(os.path.join(dir_path, name))] + return student_pairs + + +def _identify_archives(path): + asg_archives = [ + f"{path}/{file}" for file in os.listdir(path) if _is_assignment_archive(f"{path}/{file}")] + numbers = [0]*len(asg_archives) + basenames = [""]*len(asg_archives) + for idx, asg in enumerate(asg_archives): + name = os.path.basename(asg) + basenames[idx] = name + number = name[name.find("Assignment")+len("Assignment") + + 1:].replace('-', ' ').split(sep=' ')[0] + numbers[idx] = int(number) + pairs = list(zip(numbers, basenames, asg_archives)) + pairs.sort(key=lambda x: x[0]) + return pairs + + +@click.command("list") +@click.argument('path', default='./archives', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True)) +def list_assign(path): + pairs = _identify_archives(path) + for number, archive, _ in pairs: + click.echo(f"[{number}] -> {archive}") + + +@click.command() +@click.argument('number', required=True, type=click.INT) +@click.option('-p', '--path', default='./archives', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True)) +@click.option('-o', '--output-path', default='./submissions', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True)) +@click.option('--yes', is_flag=True, default=False, help='Skip confirmation prompt.') +def unpack(path, number, yes, output_path): + pairs = _identify_archives(path) + idx = -1 + if len(pairs) > 0: + numbers, names, paths = zip(*pairs) + else: + numbers = tuple() + names = tuple() + paths = tuple() + if number not in numbers: + detected_str = str(numbers).replace('(', '').replace(')', '') + if len(numbers) == 0: + detected_str = 'None' + click.echo( + f"{ERROR_BOX}: Unavailable assignment. Detected : {detected_str}.", err=True) + return + else: + idx = numbers.index(number) + + if not yes and not click.confirm(f"Are you sure you want to unpack {names[idx]}?"): + return + tmp_dir = os.path.join(output_path, 'tmp') + tmp_dir_shortname = os.path.relpath(tmp_dir) + if os.path.isdir(tmp_dir): + if not yes and not click.confirm(f"I am going to delete {tmp_dir_shortname}, confirm?"): + return + else: + shutil.rmtree(tmp_dir) + os.makedirs(tmp_dir) + _unpack(paths[idx], tmp_dir) + _unpack_tree(tmp_dir) + + name_origin_pairs = _make_names_pairs(tmp_dir) + for name, origin in name_origin_pairs: + destination = os.path.join(output_path, name, f"assignment_{number}") + if not yes and os.path.exists(destination) and not click.confirm(f"I am going to override {os.path.relpath(destination)}, confirm?"): + click.echo( + f"{WARNING_BOX}: Ingoring assignement_{number} for {name}") + continue + if os.path.exists(destination): + shutil.rmtree(destination) + shutil.copytree(origin, destination) + shutil.rmtree(tmp_dir) diff --git a/src/auto_grader/commands/test/__init__.py b/src/auto_grader/commands/test/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/auto_grader/commands/test/compile.py b/src/auto_grader/commands/test/compile.py new file mode 100644 index 0000000..8282c45 --- /dev/null +++ b/src/auto_grader/commands/test/compile.py @@ -0,0 +1,60 @@ +import click +import os + +from auto_grader.types import StudentVarType +from auto_grader.utils.display import indent_text, ERROR_BOX, WARNING_BOX +from auto_grader.utils import build_root, clean_root + + +@click.command() +@click.argument('number', required=True, type=click.INT) +@click.option('-p', '--path', default='./roots', + type=click.Path(exists=True, file_okay=False, + dir_okay=True, resolve_path=True), + help='Path to directory conaining student roots in format (student)/assignment_[NUMBER]/root_(...).') +@click.option('-s', '--student', default='', type=StudentVarType(), + help='Specify wich student to run the test for. If omitted test is run for all found students.') +@click.option('-c', '--command', default='make', type=click.STRING, + help='Overrides command that is run to build roots. The default command is `make`.') +@click.option('--clean-command', default='make distclean', type=click.STRING, + help='Overrides command that is run to build roots. The default command is `make distclean`.') +@click.option('-t', '--timeout', default=10, type=click.INT, + help='Sets timeout of both clean and build commands, defaults to 10s.') +def compile(number, path, command, student, timeout, clean_command): + current_wd = os.getcwd() + students = os.listdir(path) + if student and student not in students: + click.echo( + f"{ERROR_BOX}: No student {student} in {os.path.relpath(path)}.") + return + if student: + students = [student] + asg_dirs = [os.path.join( + path, student, f"assignment_{number}") for student in students] + for asg_dir, student in zip(asg_dirs, students): + if not os.path.exists(asg_dir) or not os.path.isdir(asg_dir): + click.echo( + f"{WARNING_BOX}: No {student}/assignment_{number} was found, skipping...") + continue + local_roots = [os.path.join(asg_dir, dir) + for dir in os.listdir(asg_dir) if 'root_' in dir and os.path.isdir(os.path.join(asg_dir, dir))] + if len(local_roots) == 0: + click.echo( + f"{WARNING_BOX}: No roots could be identified for {student}/assignment_{number}") + continue + click.echo(f"[{student}]:") + for root in local_roots: + os.chdir(root) + clean_log = clean_root(root, clean_command) + if not clean_log.run_success or not clean_log.cmd_return_code == 0: + click.echo( + f"{indent_text('[CLEAN]:' + clean_log.oneline(type_as_prefix=False), 4)}") + continue + log = build_root(root, command, timeout=timeout) + click.echo(f"{indent_text(log.oneline(type_as_prefix=False), 4)}") + clean_log = clean_root(root, clean_command) + if not clean_log.run_success or not clean_log.cmd_return_code == 0: + click.echo( + f"{indent_text('[CLEAN]:' + clean_log.oneline(type_as_prefix=False), 4)}") + continue + os.chdir(current_wd) diff --git a/src/auto_grader/commands/test/correctness.py b/src/auto_grader/commands/test/correctness.py new file mode 100644 index 0000000..cce7aed --- /dev/null +++ b/src/auto_grader/commands/test/correctness.py @@ -0,0 +1,156 @@ +import click +import os +import subprocess +import shlex +import tomllib +import numpy as np +from auto_grader.utils.display import ERROR_BOX, WARNING_BOX + + +@click.command() +@click.argument('number', required=True, type=click.INT) +@click.option('-p', '--path', default='./roots', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True)) +@click.option('-c', '--compile-command', default='make', type=click.STRING) +@click.option('-r', '--run-command', default='mpirun', type=click.STRING) +@click.option('-n', '--nproc-flag', default="-n 5", type=click.STRING) +@click.option('-v', '--verbose', is_flag=True, default=False) +@click.option('-s', '--solution-path', default='./solutions', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True)) +def correctness(number, solution_path, path, compile_command, run_command, nproc_flag, verbose): + students = os.listdir(path) + asg_dirs = [os.path.join( + path, student, f"assignment_{number}") for student in students] + + for asg_dir, student in zip(asg_dirs, students): + + current_wd = os.getcwd() + + if not os.path.exists(asg_dir) or not os.path.isdir(asg_dir): + click.echo( + f"{WARNING_BOX}: No roots could be identified for {student}/assignment_{number}") + continue + local_roots = [os.path.join(asg_dir, dir) + for dir in os.listdir(asg_dir) if 'root_' in dir and os.path.isdir(os.path.join(asg_dir, dir))] + if len(local_roots) == 0: + click.echo( + f"{WARNING_BOX}: No roots could be identified for {student}/assignment_{number}") + continue + + if verbose: + click.echo(f"Testing compile for {student}:assignment_{number}") + else: + click.echo(f"({student}:assignment_{number}):") + + for root in local_roots: + os.chdir(current_wd) + if verbose: + click.echo(f"\t[{os.path.basename(root)}]: ", nl=False) + click.echo(f"entering {os.path.relpath(root)}") + os.chdir(root) + result = subprocess.run( + shlex.split("make distclean"), capture_output=True, text=True) + + result = subprocess.run( + shlex.split(compile_command), capture_output=True, text=True) + + if verbose: + if (result.returncode != 0): + click.echo( + f"\t\033[0;91m[{os.path.basename(root)}]\033[0m: ", nl=False) + else: + click.echo( + f"\t\033[0;92m[{os.path.basename(root)}]\033[0m: ", nl=False) + click.echo(f"stdout: {result.stdout}") + click.echo(f"stderr: {result.stderr}") + else: + if (result.returncode != 0): + click.echo( + f"\t\033[0;91m[{os.path.basename(root)}]\033[0m: ", nl=False) + click.echo(f"stderr: {result.stderr[:69]:69s}...") + + else: + click.echo( + f"\t\033[0;92m[{os.path.basename(root)}]\033[0m: SUCCESS") + + executables = [os.path.join(root, file) + for file in os.listdir(root) if 'exe-' in file and os.path.isfile(os.path.join(root, file))] + + sol_as_path = os.path.join(solution_path, f"assignment_{number}") + config_path = os.path.join(sol_as_path, "test_config.toml") + if not os.path.isdir(sol_as_path): + click.echo( + f"{ERROR_BOX}: No reference solution for assignment_{number} found.", err=True) + return + if not os.path.exists(config_path) or not os.path.isfile(config_path): + click.echo( + f"{ERROR_BOX}: No test configuration for assignment_{number} found.", err=True) + return + + config = dict() + with open(config_path, 'rb') as handle: + config = tomllib.load(handle) + + if not 'ref_solution' in config["correctness"]: + click.echo( + f"{ERROR_BOX}: No function dissect config found for assignment_{number}.", err=True) + return + + ref_solution = config['correctness']['ref_solution'] + ref_solution_path = os.path.join(sol_as_path, ref_solution) + + if not 'ref_param' in config["correctness"]: + click.echo( + f"{ERROR_BOX}: No function dissect config found for assignment_{number}.", err=True) + return + ref_param = config['correctness']['ref_param'] + ref_param_path = os.path.join(sol_as_path, ref_param) + local_run_command = " ".join([run_command, nproc_flag, + executables[0], ref_param_path]) + local_solution_path = os.path.join(root, 'p.dat') + + if os.path.exists(local_solution_path): + os.remove(local_solution_path) + + try: + result = subprocess.run( + shlex.split(local_run_command), capture_output=True, text=True, timeout=10) + except subprocess.TimeoutExpired: + click.echo("Run Timeout") + result.returncode = 1 + + if verbose: + if (result.returncode != 0): + click.echo( + f"\t\033[0;91m[{os.path.basename(root)}]\033[0m: ", nl=False) + continue + else: + click.echo( + f"\t\033[0;92m[{os.path.basename(root)}]\033[0m: ", nl=False) + click.echo(f"stdout: {result.stdout}") + click.echo(f"stderr: {result.stderr}") + else: + if (result.returncode != 0): + click.echo( + f"\t\033[0;91m[{os.path.basename(root)}]\033[0m: ", nl=False) + click.echo(f"stderr: {result.stderr[:69]:69s}...") + continue + else: + click.echo( + f"\t\033[0;92m[{os.path.basename(root)}]\033[0m: SUCCESS") + + if not os.path.exists(local_solution_path): + data_files = [file for file in os.listdir( + root) if '.dat' in file] + print(data_files) + continue + + local_solution_data = np.loadtxt(local_solution_path, usecols=True) + ref_solution_data = np.loadtxt(ref_solution_path, usecols=True) + + if local_solution_data.shape != ref_solution_data.shape: + click.echo("Correctness Test Failed, Dimenstions Not Matching") + continue + elif not np.allclose(local_solution_data, ref_solution_data, rtol=1e-1): + click.echo("Correctness Test Failed: Data Not Matching") + continue + else: + click.echo("Correctness Test Passed") diff --git a/src/auto_grader/commands/test/dissect.py b/src/auto_grader/commands/test/dissect.py new file mode 100644 index 0000000..403974e --- /dev/null +++ b/src/auto_grader/commands/test/dissect.py @@ -0,0 +1,262 @@ +import click +import os +import tomllib +import shutil +import re +import subprocess +import shlex +import numpy as np + +from auto_grader.utils.display import SUCCESS_BOX, ERROR_BOX, WARNING_BOX + + +def _add_attribute_match(match, attribute: str): + prefix = match.group(1) + func_name = match.group(2) + suffix = match.group(3) + if attribute in prefix: + return match.group(0) + return f"{prefix} {attribute} {func_name}{suffix}" + + +def add_attributes(filename, functions, attribute: str): + pattern = re.compile( + r"^(\s*(?:(?:\w+[\s\*]+)+))" + r"(" + "|".join(map(re.escape, functions)) + r")" + r"(\s*\([^)]*\)\s*\{)", + re.MULTILINE + ) + with open(filename, "r") as f: + code = f.read() + + def sub_func(match): return _add_attribute_match(match, attribute) + new_code = pattern.sub(sub_func, code) + with open(filename, "w") as f: + f.write(new_code) + + +def _weaken_file(filename, functions): + add_attributes(filename, functions, '__attribute__((weak))') + + +def _staticize_file(filename, functions): + add_attributes(filename, functions, 'static') + + +def _rmv_attribute_match(match, attribute: str): + prefix = match.group(1) + func_name = match.group(2) + suffix = match.group(3) + new_prefix = re.sub(rf'\b{re.escape(attribute)}\b\s+', '', prefix) + return f"{new_prefix}{func_name}{suffix}" + + +def rmv_attributes_file(filename, functions, attribute: str): + pattern = re.compile( + r"^(\s*(?:(?:\w+[\s\*]+)+))" # prefix (type, qualifiers, etc.) + r"(" + "|".join(map(re.escape, functions)) + r")" # function name + r"(\s*\([^)]*\)\s*\{)", # argument list + opening brace + re.MULTILINE + ) + with open(filename, "r") as f: + code = f.read() + + def remove_func(match): return _rmv_attribute_match(match, attribute) + new_code = pattern.sub(remove_func, code) + with open(filename, "w") as f: + f.write(new_code) + + +def _unstaticize_file(filename, functions): + rmv_attributes_file(filename, functions, 'static') + + +def _extract_prototypes_to_header(filename, functions, header_filename): + func_pattern = re.compile( + r"^(\s*(?:(?:\w+[\s\*]+)+))" + r"(" + "|".join(functions) + r")" + r"(\s*\([^)]*\)\s*\{)", + re.MULTILINE + ) + + with open(filename, "r") as f: + code = f.read() + matches = func_pattern.findall(code) + + if not matches: + click.echo( + f"{WARNING_BOX}: {functions} not found in {os.path.relpath(filename)}.") + return + + prototypes = [] + for prefix, func_name, args in matches: + proto = f"{prefix.strip()} static {func_name}{args.strip()[:-1]};" + prototypes.append(proto) + prototypes = list(dict.fromkeys(prototypes)) + + header_exists = os.path.exists(header_filename) + with open(header_filename, "w" if header_exists else "w") as header: + header.write('#include "solver.h"'+"\n") + for proto in prototypes: + header.write(proto + "\n") + + include_line = f'#include "{os.path.basename(header_filename)}"\n' + if include_line not in code: + new_code = include_line + code + with open(filename, "w") as f: + f.write(new_code) + + +@click.command() +@click.argument('number', required=True, type=click.INT) +@click.option('-s', '--solution-path', default='./solutions', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True)) +@click.option('-p', '--path', default='./roots', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True)) +def dissect(number, solution_path, path): + + sol_as_path = os.path.join(solution_path, f"assignment_{number}") + config_path = os.path.join(sol_as_path, "test_config.toml") + + if not os.path.isdir(sol_as_path): + click.echo( + f"{ERROR_BOX}: No reference solution for assignment_{number} found.", err=True) + return + if not os.path.exists(config_path) or not os.path.isfile(config_path): + click.echo( + f"{ERROR_BOX}: No test configuration for assignment_{number} found.", err=True) + return + + config = dict() + with open(config_path, 'rb') as handle: + config = tomllib.load(handle) + + if not 'test_functions' in config["dissect"]: + click.echo( + f"{ERROR_BOX}: No function dissect config found for assignment_{number}.", err=True) + return + test_functions = config['dissect']['test_functions'] + + if not 'build_cmd' in config["dissect"]: + click.echo( + f"{ERROR_BOX}: No build config found for assignment_{number}.", err=True) + return + static_functions = config['dissect']['static_functions'] + + if not 'build_cmd' in config["dissect"]: + click.echo( + f"{ERROR_BOX}: No build config found for assignment_{number}.", err=True) + return + build_cmd = config['dissect']['build_cmd'] + + if not 'run_cmd' in config["dissect"]: + click.echo( + f"{ERROR_BOX}: No run config found for assignment_{number}.", err=True) + return + run_cmd = config['dissect']['run_cmd'] + + if not 'solution_files' in config["dissect"]: + click.echo( + f"{ERROR_BOX}: No solution_files config found for assignment_{number}.", err=True) + return + solution_files = config['dissect']['solution_files'] + + if not 'ref_solution' in config["correctness"]: + click.echo( + f"{ERROR_BOX}: No ref_solution config found for assignment_{number}.", err=True) + return + ref_solution = config['correctness']['ref_solution'] + + if not 'user_solution' in config["correctness"]: + click.echo( + f"{ERROR_BOX}: No user_solution config found for assignment_{number}.", err=True) + return + user_solution = config['correctness']['user_solution'] + + students = os.listdir(path) + asg_dirs = [os.path.join( + path, student, f"assignment_{number}") for student in students] + + for asg_dir, student in zip(asg_dirs, students): + if not os.path.exists(asg_dir) or not os.path.isdir(asg_dir): + click.echo( + f"{WARNING_BOX}: No roots could be identified for {student}/assignment_{number}") + continue + local_roots = [os.path.join(asg_dir, dir) + for dir in os.listdir(asg_dir) if 'root_' in dir and os.path.isdir(os.path.join(asg_dir, dir))] + if len(local_roots) == 0: + click.echo( + f"{WARNING_BOX}: No roots could be identified for {student}/assignment_{number}") + continue + current_wd = os.getcwd() + click.echo(f"({student}:assignment_{number}):") + + flags = [f"-DGRDR_TEST_{func.upper()}" for func in test_functions] + for root in local_roots: + root_src_paths = [os.path.join(root, 'src', file) + for file in solution_files] + root_des_paths = [os.path.join( + sol_as_path, 'src', file) for file in solution_files] + for src, dest in zip(root_src_paths, root_des_paths): + if not os.path.exists(src): + click.echo( + f"\t[{os.path.basename(root)}]:{ERROR_BOX}: Missing source {os.path.basename(src)}") + shutil.copyfile(src, dest) + + _extract_prototypes_to_header( + dest, static_functions, os.path.join(sol_as_path, 'src', 'aid.h')) + + _unstaticize_file(dest, test_functions) + _weaken_file(dest, test_functions) + _staticize_file(dest, static_functions) + + result = 0 + os.chdir(sol_as_path) + + for function, flag in zip(test_functions, flags): + command = build_cmd + f" CFLAGS+={flag}" + result = subprocess.run( + ['make', 'distclean'], capture_output=True, text=True) + result = subprocess.run( + shlex.split(command), capture_output=True, text=True, timeout=10) + if result.returncode == 0: + click.echo( + f"\t[{os.path.basename(root)}]:[BUILD]:[{function}]:{SUCCESS_BOX}") + else: + click.echo( + f"\t[{os.path.basename(root)}]:[BUILD]:[{function}]:{ERROR_BOX}") + click.echo(f"\t\tstderr: {result.stderr[:61]:61s}...") + continue + + ref_sol_path = os.path.join(sol_as_path, ref_solution) + usr_sol_path = os.path.join(sol_as_path, user_solution) + + if os.path.exists(usr_sol_path): + os.remove(usr_sol_path) + + try: + result = subprocess.run( + shlex.split(run_cmd), capture_output=True, text=True, timeout=10) + except subprocess.TimeoutExpired: + result.returncode = 1 + click.echo( + f"\t[{os.path.basename(root)}]:[RUN]:[{function}]:{ERROR_BOX} -- Timeout") + + if result.returncode == 0: + click.echo( + f"\t[{os.path.basename(root)}]:[RUN]:[{function}]:{SUCCESS_BOX}") + else: + click.echo( + f"\t[{os.path.basename(root)}]:[RUN]:[{function}]:{ERROR_BOX}") + click.echo(f"\t\tstderr: {result.stderr[:61]:61s}...") + continue + + ref_sol_data = np.loadtxt(ref_sol_path) + usr_sol_data = np.loadtxt(usr_sol_path) + + if np.allclose(ref_sol_data, usr_sol_data, rtol=1e-1) == 0: + click.echo( + f"\t[{os.path.basename(root)}]:[CORRECT]:[{function}]:{SUCCESS_BOX}") + else: + click.echo( + f"\t[{os.path.basename(root)}]:[CORRECT]:[{function}]:{ERROR_BOX}") + continue + os.chdir(current_wd) diff --git a/src/auto_grader/commands/test/run.py b/src/auto_grader/commands/test/run.py new file mode 100644 index 0000000..a31b8c8 --- /dev/null +++ b/src/auto_grader/commands/test/run.py @@ -0,0 +1,110 @@ +import click +from auto_grader.types import StudentVarType + + +@click.command(help='Builds and Runs full student root. The run command is served as \n\n' + + '[COMMAND] [PROCOPT] [NUMPROC] [EXECUTABLE-PREFIX]\n\n' + + 'Example with defaults:\n\n' + + 'mpirun -n 4 exe ') +@click.argument('number', required=True, type=click.INT) +@click.option('-p', '--path', default='./roots', + type=click.Path(exists=True, file_okay=False, + dir_okay=True, resolve_path=True), + help='Path to directory conaining student roots in format (student)/assignment_[NUMBER]/root_(...).') +@click.option('--solution-path', default='./solutions', + type=click.Path(exists=True, file_okay=False, + dir_okay=True, resolve_path=True), + help='Path to directory conaining solution root in format assignment_[NUMBER]/. Solution must contain configuration test_config.toml defining ') +@click.option('-s', '--student', default='', type=StudentVarType(), + help='Specify wich student to run the test for. If omitted test is run for all found students.') +@click.option('-c', '--command', default='mpirun', type=click.STRING, + help='Overrides command that is used to run roots. The default command is `mpirun`.') +@click.option('--procopt', default='-n ', type=click.STRING, + help='Indicates the option flag to be used to specify process number from [COMMAND]. The default value is `-n `.') +@click.option('-n', '--numproc', default=4, type=click.INT, + help='Selects the number of mpi processes used to run the root. The default value is `4`.') +@click.option('--executable-prefix', default='', type=click.STRING, + help='This prefix is appended to the executable name when the run command is launched (some launchers require `./`. The default value is ``.') +@click.option('--compile-command', default='make', type=click.STRING, + help='Overrides command that is run to build roots. The default command is `make`.') +@click.option('--clean-command', default='make distclean', type=click.STRING, + help='Overrides command that is run to build roots. The default command is `make distclean`.') +@click.option('-t', '--timeout', default=10, type=click.INT, + help='Sets timeout of both clean and build commands, defaults to 10s.') +@click.option('-e', '--exe-name', default='', type=click.STRING, + help='Specify an executable name instead of determining it at runtime.') +@click.pass_context +def run( + context, + number, + path, + solution_path, + student, + command, + procopt, + numproc, + executable_prefix, + timeout, + compile_command, + clean_command, + exe_name +): + pass + # current_wd = os.getcwd() + # students = os.listdir(path) + # if student and student not in students: + # click.echo( + # f"{ERROR_BOX}: No student {student} in {os.path.relpath(path)}.") + # return + # if student: + # students = [student] + # asg_dirs = [os.path.join( + # path, student, f"assignment_{number}") for student in students] + # + # for asg_dir, student in zip(asg_dirs, students): + # + # if not os.path.exists(asg_dir) or not os.path.isdir(asg_dir): + # click.echo( + # f"{WARNING_BOX}: No {student}/assignment_{number} was found, skipping...") + # continue + # + # local_roots = [os.path.join(asg_dir, dir) + # for dir in os.listdir(asg_dir) if 'root_' in dir and os.path.isdir(os.path.join(asg_dir, dir))] + # if len(local_roots) == 0: + # click.echo( + # f"{WARNING_BOX}: No roots could be identified for {student}/assignment_{number}") + # continue + # click.echo(f"[{student}]:") + # + # for root in local_roots: + # os.chdir(root) + # + # clean_log = clean_root(root, clean_command) + # if not clean_log.run_success or not clean_log.cmd_return_code == 0: + # click.echo( + # f"{indent_text('[CLEAN]:' + clean_log.oneline(type_as_prefix=False), 4)}") + # continue + # + # executables = [file for file in os.listdir( + # root) if os.path.isfile(file) and file.startswith('exe-')] + # for executable in executables: + # os.remove(executable) + # + # build_log = build_root(root, compile_command, timeout=timeout) + # if not build_log.run_success or not build_log.cmd_return_code == 0: + # click.echo(f"{indent_text(build_log.oneline(), 4)}") + # continue + # executables = [file for file in os.listdir( + # root) if os.path.isfile(file) and file.startswith('exe-')] + # + # if len(executables) == 0: + # warn_string = f"[{os.path.basename(root)}]:{color_string('WARNING', 'br-yellow')} Build successfull but no exe-* was found, maybe non standard name? Skipping ..." + # click.echo(f"{indent_text(warn_string, 4)}") + # continue + # + # clean_log = clean_root(root, clean_command) + # if not clean_log.run_success or not clean_log.cmd_return_code == 0: + # click.echo( + # f"{indent_text('[CLEAN]:' + clean_log.oneline(type_as_prefix=False), 4)}") + # continue + # os.chdir(current_wd) diff --git a/src/auto_grader/types/__init__.py b/src/auto_grader/types/__init__.py new file mode 100644 index 0000000..619b121 --- /dev/null +++ b/src/auto_grader/types/__init__.py @@ -0,0 +1,98 @@ +import click +import os +from click.shell_completion import CompletionItem +from auto_grader.utils.display import color_string, indent_text + + +class test_type: + build = "BUILD" + run = "RUN" + correctness = "CORRECTNESS" + + +class run_log: + def __init__(self, + ttype: str, + run_cmd: str, + run_abs_dir: str, + run_success: bool, + summary: str, + cmd_return_code: int, + stdout: str, + stderr: str): + self.ttype = ttype + self.run_cmd = run_cmd + self.run_abs_dir = run_abs_dir + self.run_success = run_success + self.summary = summary + self.cmd_return_code = cmd_return_code + self.stdout = stdout + self.stderr = stderr + + def as_str(self, indent=0, indent_char=' ', color=True): + color_func = ( + (lambda s, c: color_string(s, c)) + if color + else (lambda s, c: s) + ) + + header = f"[{self.ttype}]" + success_string = color_func( + 'SUCCESS', 'br-green') if self.run_success else color_func('FAILURE', 'br-red') + run_info = \ + "[RUN INFO]\n" +\ + f" - COMMAND : {self.run_cmd}\n" +\ + f" - RUN AT : {self.run_abs_dir}\n" +\ + f" - RUN SUCCESS : {success_string}" + + if not self.run_success: + run_info += "\n - FAILURE SUMMARY:\n" + run_info += indent_text(self.summary, 8) + + cmd_info = \ + "[CMD INFO]\n" +\ + f" - RETURN CODE : {self.cmd_return_code}\n" +\ + f" - STDOUT : \n" +\ + indent_text(self.stdout, 8) + '\n' +\ + f" - STDERR : \n" +\ + indent_text(self.stderr, 8) + + final_string = '\n'.join([header, + indent_text(run_info, 4), indent_text(cmd_info, 4)]) + final_string = indent_text( + final_string, indent=indent, indent_char=indent_char) + return final_string + + def oneline(self, type_as_prefix=True, color=True): + color_func = ( + (lambda s, c: color_string(s, c)) + if color + else (lambda s, c: s) + ) + oneline = "" + if type_as_prefix: + oneline += f"[{self.ttype}]:" + oneline += f"[{os.path.basename(self.run_abs_dir)}]: " + if self.run_success and self.cmd_return_code == 0: + oneline += f"{color_func('SUCCESS', 'br-green')}" + if not self.run_success: + oneline += f"{color_func('FAILURE', 'br-red')} (shl)(hint: " + rest_str_len = len(oneline+'...)') + err_int = self.summary[:rest_str_len].replace("\n", " ") + oneline += err_int+"...)" + if self.run_success and not self.cmd_return_code == 0: + oneline += f"{color_func('FAILURE', 'br-red')} (cmd)(stderr: " + rest_str_len = len(oneline+'...)') + err_int = self.stderr[:rest_str_len].replace("\n", " ") + oneline += err_int+"...)" + return oneline + + +class StudentVarType(click.ParamType): + name = "student" + + def shell_complete(self, ctx, param, incomplete): + if 'path' not in ctx.params: + return click.Path.shell_complete(click.Path(), ctx, param, incomplete) + else: + return [CompletionItem(dir) for dir in os.listdir(ctx.params['path']) if dir.startswith(incomplete)] diff --git a/src/auto_grader/utils/__init__.py b/src/auto_grader/utils/__init__.py new file mode 100644 index 0000000..cd54546 --- /dev/null +++ b/src/auto_grader/utils/__init__.py @@ -0,0 +1,46 @@ +import os +import subprocess +from subprocess import CompletedProcess +from auto_grader.types import run_log, test_type +import shlex + + +def build_root(root_path: str, build_cmd: str, timeout: int = 10): + cwd = os.getcwd() + os.chdir(root_path) + run_result = True + exception_summary = "" + result = CompletedProcess('', -1, '', '') + try: + result = subprocess.run(shlex.split(build_cmd), + timeout=timeout, text=True, capture_output=True) + except Exception as e: + run_result = False + exception_summary = str(e) + os.chdir(cwd) + log = run_log(test_type.build, build_cmd, os.path.abspath(root_path), run_result, + exception_summary, result.returncode, result.stdout, result.stderr) + return log + + +def run_root(root_path: str, run_cmd: str, timeout: int = 10): + cwd = os.getcwd() + os.chdir(root_path) + run_result = True + exception_summary = "" + result = CompletedProcess('', -1, '', '') + try: + result = subprocess.run(shlex.split(run_cmd), + timeout=timeout, text=True, capture_output=True) + except Exception as e: + run_result = False + exception_summary = str(e) + + os.chdir(cwd) + log = run_log(test_type.build, test_type.run, os.path.abspath(root_path), run_result, + exception_summary, result.returncode, result.stdout, result.stderr) + return log + + +def clean_root(root_path: str, clean: str, timeout: int = 10): + return build_root(root_path, clean, timeout=timeout) diff --git a/src/auto_grader/utils/display.py b/src/auto_grader/utils/display.py new file mode 100644 index 0000000..0c9f470 --- /dev/null +++ b/src/auto_grader/utils/display.py @@ -0,0 +1,20 @@ +SUCCESS_BOX = "[\033[0;92mSUCCESS\033[0m]" +ERROR_BOX = "[\033[0;91mERROR\033[0m]" +WARNING_BOX = "[\033[0;93mWARNING\033[0m]" + +shell_colors = {'br-red': 91, 'br-green': 92, 'br-yellow': 93} + + +def color_string(string: str, color: str): + result_string = string + if color in shell_colors: + result_string = f"\033[0;{shell_colors[color]}m{string}\033[0m" + return result_string + + +def indent_text(text: str, indent: int, indent_char: str = ' '): + lines = text.split('\n') + + for idx, line in enumerate(lines): + lines[idx] = indent_char*indent + line + return '\n'.join(lines)