import os
import re
import shlex
import shutil
import subprocess
import tomllib

import click
import numpy as np

from auto_grader.utils.display import SUCCESS_BOX, ERROR_BOX, WARNING_BOX

# Seconds granted to a single build or run command before it is abandoned.
_COMMAND_TIMEOUT_S = 10

# (TOML table, key, label used in the error message) for every required setting.
_REQUIRED_SETTINGS = (
    ("dissect", "test_functions", "function dissect"),
    ("dissect", "static_functions", "static_functions"),
    ("dissect", "build_cmd", "build"),
    ("dissect", "run_cmd", "run"),
    ("dissect", "solution_files", "solution_files"),
    ("correctness", "ref_solution", "ref_solution"),
    ("correctness", "user_solution", "user_solution"),
)


def _function_definition_pattern(functions):
    """Compile a regex matching the C definitions of the given functions.

    Groups: (1) everything before the name (leading whitespace, return type,
    qualifiers), (2) the function name, (3) the argument list up to and
    including the opening brace.

    Returns ``None`` when ``functions`` is empty: an empty alternation would
    match an empty name and therefore hit unrelated lines such as
    ``if (x) {``.
    """
    names = [re.escape(name) for name in functions]
    if not names:
        return None
    return re.compile(
        r"^(\s*(?:(?:\w+[\s\*]+)+))"        # prefix (type, qualifiers, etc.)
        r"(" + "|".join(names) + r")"       # function name
        r"(\s*\([^)]*\)\s*\{)",             # argument list + opening brace
        re.MULTILINE,
    )


def _add_attribute_match(match, attribute: str):
    """Return the matched definition with ``attribute`` prepended.

    The attribute goes in front of the whole declaration (after any leading
    whitespace) rather than between return type and name, because
    ``double * static f(`` is not valid C while ``static double * f(`` is.
    Definitions whose prefix already contains the attribute are unchanged.
    """
    prefix = match.group(1)
    if attribute in prefix:
        return match.group(0)
    declaration = prefix.lstrip()
    leading = prefix[:len(prefix) - len(declaration)]
    return f"{leading}{attribute} {declaration}{match.group(2)}{match.group(3)}"


def add_attributes(filename, functions, attribute: str):
    """Add ``attribute`` to the definitions of ``functions`` in ``filename``.

    The file is rewritten in place; nothing happens when ``functions`` is
    empty or no definition needs the attribute.
    """
    pattern = _function_definition_pattern(functions)
    if pattern is None:
        return
    with open(filename, "r") as f:
        code = f.read()
    new_code = pattern.sub(
        lambda match: _add_attribute_match(match, attribute), code)
    if new_code != code:
        with open(filename, "w") as f:
            f.write(new_code)


def _weaken_file(filename, functions):
    """Mark the definitions of ``functions`` as weak symbols."""
    add_attributes(filename, functions, '__attribute__((weak))')


def _staticize_file(filename, functions):
    """Give the definitions of ``functions`` internal (static) linkage."""
    add_attributes(filename, functions, 'static')


def _rmv_attribute_match(match, attribute: str):
    """Return the matched definition with ``attribute`` removed from its prefix."""
    prefix = match.group(1)
    new_prefix = re.sub(rf'\b{re.escape(attribute)}\b\s+', '', prefix)
    return f"{new_prefix}{match.group(2)}{match.group(3)}"


def rmv_attributes_file(filename, functions, attribute: str):
    """Remove ``attribute`` from the definitions of ``functions`` in ``filename``.

    The file is rewritten in place; nothing happens when ``functions`` is
    empty or no definition carries the attribute.
    """
    pattern = _function_definition_pattern(functions)
    if pattern is None:
        return
    with open(filename, "r") as f:
        code = f.read()
    new_code = pattern.sub(
        lambda match: _rmv_attribute_match(match, attribute), code)
    if new_code != code:
        with open(filename, "w") as f:
            f.write(new_code)


def _unstaticize_file(filename, functions):
    """Drop the ``static`` qualifier from the definitions of ``functions``."""
    rmv_attributes_file(filename, functions, 'static')


def _extract_prototypes_to_header(filename, functions, header_filename):
    """Write static prototypes of ``functions`` to a header and include it.

    Every definition of one of ``functions`` found in ``filename`` becomes a
    ``static`` forward declaration in ``header_filename``. The header is
    overwritten, and an ``#include`` of it is prepended to ``filename``
    unless already present. A warning is echoed when no definition is found.
    """
    func_pattern = _function_definition_pattern(functions)
    if func_pattern is None:
        return
    with open(filename, "r") as f:
        code = f.read()

    matches = func_pattern.findall(code)
    if not matches:
        click.echo(
            f"{WARNING_BOX}: {functions} not found in {os.path.relpath(filename)}.")
        return

    prototypes = []
    for prefix, func_name, args in matches:
        return_type = prefix.strip()
        # `static` must lead the declaration and must not be duplicated.
        if not re.search(r"\bstatic\b", return_type):
            return_type = f"static {return_type}"
        signature = args.strip()[:-1].rstrip()  # drop the opening brace
        prototypes.append(f"{return_type} {func_name}{signature};")
    prototypes = list(dict.fromkeys(prototypes))  # de-duplicate, keep order

    # NOTE(review): the header is truncated on every call, so with several
    # solution files only the last file's prototypes survive -- confirm this
    # is intended.
    with open(header_filename, "w") as header:
        header.write('#include "solver.h"' + "\n")
        for proto in prototypes:
            header.write(proto + "\n")

    include_line = f'#include "{os.path.basename(header_filename)}"\n'
    if include_line not in code:
        with open(filename, "w") as f:
            f.write(include_line + code)


def _load_settings(config_path, number):
    """Read the required dissect/correctness settings from the TOML config.

    Returns a dict keyed by setting name, or ``None`` after echoing an error
    when the file cannot be parsed or a required setting is missing.
    """
    try:
        with open(config_path, 'rb') as handle:
            config = tomllib.load(handle)
    except tomllib.TOMLDecodeError as exc:
        click.echo(
            f"{ERROR_BOX}: Invalid test configuration for assignment_{number}: {exc}",
            err=True)
        return None

    settings = {}
    for section, key, label in _REQUIRED_SETTINGS:
        table = config.get(section)
        if not isinstance(table, dict) or key not in table:
            click.echo(
                f"{ERROR_BOX}: No {label} config found for assignment_{number}.",
                err=True)
            return None
        settings[key] = table[key]
    return settings


def _prepare_root(root, sol_as_path, settings):
    """Copy one root's solution files into the reference tree and patch them.

    Returns ``False`` (after reporting every missing file) when the root
    lacks a solution file; nothing is copied in that case.
    """
    root_name = os.path.basename(root)
    file_pairs = [
        (os.path.join(root, 'src', file), os.path.join(sol_as_path, 'src', file))
        for file in settings['solution_files']
    ]
    missing = [src for src, _ in file_pairs if not os.path.exists(src)]
    for src in missing:
        click.echo(
            f"\t[{root_name}]:{ERROR_BOX}: Missing source {os.path.basename(src)}")
    if missing:
        return False

    aid_header = os.path.join(sol_as_path, 'src', 'aid.h')
    test_functions = settings['test_functions']
    static_functions = settings['static_functions']
    for src, dest in file_pairs:
        shutil.copyfile(src, dest)
        _extract_prototypes_to_header(dest, static_functions, aid_header)
        _unstaticize_file(dest, test_functions)
        _weaken_file(dest, test_functions)
        _staticize_file(dest, static_functions)
    return True


def _echo_stderr(result):
    """Echo a fixed-width preview of a finished process's stderr."""
    click.echo(f"\t\tstderr: {result.stderr[:61]:61s}...")


def _evaluate_function(root_name, function, flag, sol_as_path, settings):
    """Build, run and check the reference tree with one function under test.

    Must be called with the reference solution directory as the working
    directory. Each stage echoes its outcome, and the first failing stage
    ends the evaluation of this function.
    """
    command = settings['build_cmd'] + f" CFLAGS+={flag}"
    subprocess.run(['make', 'distclean'], capture_output=True, text=True)
    try:
        result = subprocess.run(
            shlex.split(command), capture_output=True, text=True,
            timeout=_COMMAND_TIMEOUT_S)
    except subprocess.TimeoutExpired:
        click.echo(
            f"\t[{root_name}]:[BUILD]:[{function}]:{ERROR_BOX} -- Timeout")
        return
    if result.returncode != 0:
        click.echo(f"\t[{root_name}]:[BUILD]:[{function}]:{ERROR_BOX}")
        _echo_stderr(result)
        return
    click.echo(f"\t[{root_name}]:[BUILD]:[{function}]:{SUCCESS_BOX}")

    ref_sol_path = os.path.join(sol_as_path, settings['ref_solution'])
    usr_sol_path = os.path.join(sol_as_path, settings['user_solution'])
    # Remove stale output so a run that writes nothing cannot pass.
    if os.path.exists(usr_sol_path):
        os.remove(usr_sol_path)

    try:
        result = subprocess.run(
            shlex.split(settings['run_cmd']), capture_output=True, text=True,
            timeout=_COMMAND_TIMEOUT_S)
    except subprocess.TimeoutExpired:
        click.echo(
            f"\t[{root_name}]:[RUN]:[{function}]:{ERROR_BOX} -- Timeout")
        return
    if result.returncode != 0:
        click.echo(f"\t[{root_name}]:[RUN]:[{function}]:{ERROR_BOX}")
        _echo_stderr(result)
        return
    click.echo(f"\t[{root_name}]:[RUN]:[{function}]:{SUCCESS_BOX}")

    try:
        ref_sol_data = np.loadtxt(ref_sol_path)
        usr_sol_data = np.loadtxt(usr_sol_path)
        is_correct = bool(np.allclose(ref_sol_data, usr_sol_data, rtol=1e-1))
    except (OSError, ValueError) as exc:
        # Missing/unparsable output or mismatching shapes count as incorrect.
        click.echo(f"\t[{root_name}]:[CORRECT]:[{function}]:{ERROR_BOX}")
        click.echo(f"\t\tcould not compare solutions: {exc}")
        return
    verdict = SUCCESS_BOX if is_correct else ERROR_BOX
    click.echo(f"\t[{root_name}]:[CORRECT]:[{function}]:{verdict}")


@click.command()
@click.argument('number', required=True, type=click.INT)
@click.option('-s', '--solution-path', default='./solutions', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True))
@click.option('-p', '--path', default='./roots', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True))
def dissect(number, solution_path, path):
    """Test the configured functions of every student root one at a time.

    For each root, the solution files are copied into the reference solution
    of assignment NUMBER, then the tree is built, run and compared against
    the reference output once per test function.
    """
    sol_as_path = os.path.join(solution_path, f"assignment_{number}")
    config_path = os.path.join(sol_as_path, "test_config.toml")

    if not os.path.isdir(sol_as_path):
        click.echo(
            f"{ERROR_BOX}: No reference solution for assignment_{number} found.",
            err=True)
        return
    if not os.path.isfile(config_path):
        click.echo(
            f"{ERROR_BOX}: No test configuration for assignment_{number} found.",
            err=True)
        return

    settings = _load_settings(config_path, number)
    if settings is None:
        return

    test_functions = settings['test_functions']
    flags = [f"-DGRDR_TEST_{func.upper()}" for func in test_functions]

    for student in os.listdir(path):
        asg_dir = os.path.join(path, student, f"assignment_{number}")
        local_roots = []
        if os.path.isdir(asg_dir):
            local_roots = [
                os.path.join(asg_dir, entry) for entry in os.listdir(asg_dir)
                if 'root_' in entry and os.path.isdir(os.path.join(asg_dir, entry))
            ]
        if not local_roots:
            click.echo(
                f"{WARNING_BOX}: No roots could be identified for {student}/assignment_{number}")
            continue

        click.echo(f"({student}:assignment_{number}):")
        for root in local_roots:
            if not _prepare_root(root, sol_as_path, settings):
                continue
            root_name = os.path.basename(root)
            # Build and run commands are relative to the reference solution;
            # always restore the caller's working directory afterwards.
            previous_wd = os.getcwd()
            os.chdir(sol_as_path)
            try:
                for function, flag in zip(test_functions, flags):
                    _evaluate_function(
                        root_name, function, flag, sol_as_path, settings)
            finally:
                os.chdir(previous_wd)