commit cd09ee874256f38e3d783fc1e331dc446c2504b6 Author: Erik Fabrizzi Date: Fri Oct 24 13:57:49 2025 +0200 Initial Commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..505a3b1 --- /dev/null +++ b/.gitignore @@ -0,0 +1,10 @@ +# Python-generated files +__pycache__/ +*.py[oc] +build/ +dist/ +wheels/ +*.egg-info + +# Virtual environments +.venv diff --git a/README.md b/README.md new file mode 100644 index 0000000..e69de29 diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..521624e --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,17 @@ +[project] +name = "auto-grader" +version = "0.1.0" +description = "Automatic Grader assist for NHR MPI course" +readme = "README.md" +requires-python = ">=3.11" +dependencies = [ + "click>=8.3.0", + "numpy>=2.3.4", + "unidecode>=1.4.0", +] + +[project.scripts] +grdr = "auto_grader.cli:main" + +[tool.setuptools.packages.find] +where = ["src"] diff --git a/src/auto_grader/__init__.py b/src/auto_grader/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/auto_grader/cli.py b/src/auto_grader/cli.py new file mode 100644 index 0000000..6abde3f --- /dev/null +++ b/src/auto_grader/cli.py @@ -0,0 +1,472 @@ +import click +import os +import zipfile +import tarfile +import shutil +import subprocess +import shlex +import tomllib +import re +from unidecode import unidecode + +SUCCESS_BOX = "[\033[0;92mSUCCESS\033[0m]" +ERROR_BOX = "[\033[0;91mERROR\033[0m]" +WARNING_BOX = "[\033[0;93mWARNING\033[0m]" + +readme_str = "# Grading-dir for NHR MPI course assignements\n" + \ + "## Usage\n" +\ + "some usage\n" + + +def _unpack(file_path, dest_path): + success = False + if zipfile.is_zipfile(file_path): + with zipfile.ZipFile(file_path, 'r') as z: + z.extractall(dest_path) + success = True + elif tarfile.is_tarfile(file_path): + with tarfile.open(file_path, 'r:*') as t: + t.extractall(dest_path, filter='data') + success = True + return success + + +def _unpack_tree(dir_path): + for dir, _, filenames in os.walk(dir_path): + for filename in filenames: + archive_path = os.path.join(dir, filename) + if zipfile.is_zipfile(archive_path) or tarfile.is_tarfile(archive_path): + extract_dir = os.path.join(dir, os.path.splitext(filename)[0]) + os.makedirs(extract_dir, exist_ok=True) + _unpack(archive_path, extract_dir) + os.remove(archive_path) + + +def _add_weak_attribute(match): + prefix = match.group(1) + func_name = match.group(2) + suffix = match.group(3) + if "__attribute__((weak))" in prefix: + return match.group(0) + return f"{prefix}__attribute__((weak)) {func_name}{suffix}" + + +def _weaken_file(filename, functions): + pattern = re.compile( + r"^(\s*(?:(?:\w+[\s\*]+)+))" + r"(" + "|".join(functions) + r")" + r"(\s*\([^)]*\)\s*\{)", + re.MULTILINE + ) + with open(filename, "r") as f: + code = f.read() + new_code = pattern.sub(_add_weak_attribute, code) + with open(filename, "w") as f: + f.write(new_code) + + +def _remove_static_attribute(match): + prefix = match.group(1) + func_name = match.group(2) + suffix = match.group(3) + new_prefix = re.sub(r'\bstatic\s+', '', prefix) + return f"{new_prefix}{func_name}{suffix}" + + +def _unstaticize_file(filename, functions): + pattern = re.compile( + r"^(\s*(?:(?:\w+[\s\*]+)+))" # prefix (type, qualifiers, etc.) + r"(" + "|".join(map(re.escape, functions)) + r")" # function name + r"(\s*\([^)]*\)\s*\{)", # argument list + opening brace + re.MULTILINE + ) + with open(filename, "r") as f: + code = f.read() + new_code = pattern.sub(_remove_static_attribute, code) + with open(filename, "w") as f: + f.write(new_code) + + +def is_submission_dir(dir_path): + return os.path.isdir(dir_path) and "_assignsubmission_file" in dir_path + + +# def _make_names(dir_path): +# student_names = [unidecode(name[:name.find('_')].lower().replace( +# ' ', '_')) for name in os.listdir( +# dir_path) if is_submission_dir(os.path.join(dir_path, name))] +# return student_names + + +def _make_names_pairs(dir_path): + student_pairs = [(unidecode(name[:name.find('_')].lower().replace( + ' ', '_')), os.path.join(dir_path, name)) for name in os.listdir( + dir_path) if is_submission_dir(os.path.join(dir_path, name))] + return student_pairs + + +def is_assign_archive(path): + result = True + if not os.path.isfile(path): + result = False + if not zipfile.is_zipfile(path) and not tarfile.is_tarfile(path): + result = False + if not ("Assignment" in path): + result = False + return result + + +def _identify_archives(path): + asg_archives = [ + f"{path}/{file}" for file in os.listdir(path) if is_assign_archive(f"{path}/{file}")] + numbers = [0]*len(asg_archives) + basenames = [""]*len(asg_archives) + for idx, asg in enumerate(asg_archives): + name = os.path.basename(asg) + basenames[idx] = name + number = name[name.find("Assignment")+len("Assignment") + + 1:].replace('-', ' ').split(sep=' ')[0] + numbers[idx] = int(number) + pairs = list(zip(numbers, basenames, asg_archives)) + pairs.sort(key=lambda x: x[0]) + return pairs + + +@click.group() +def cli(): + pass + + +@click.command() +@click.argument('path', default='.', type=click.Path(resolve_path=True)) +def init(path): + os.makedirs(f"{path}/raw", exist_ok=True) + os.makedirs(f"{path}/archives", exist_ok=True) + os.makedirs(f"{path}/submissions", exist_ok=True) + os.makedirs(f"{path}/roots", exist_ok=True) + os.makedirs(f"{path}/solutions", exist_ok=True) + os.makedirs(f"{path}/logs", exist_ok=True) + with open(f"{path}/README.md", 'w') as handle: + handle.write(readme_str) + click.echo(f"{SUCCESS_BOX} :{os.path.basename(path)} inited!") + + +@click.group() +def archives(): + pass + + +@click.command("list") +@click.argument('path', default='./archives', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True)) +def list_assign(path): + pairs = _identify_archives(path) + for number, archive, _ in pairs: + click.echo(f"[{number}] -> {archive}") + + +@click.command("unpack") +@click.argument('number', required=True, type=click.INT) +@click.option('-p', '--path', default='./archives', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True)) +@click.option('-o', '--output-path', default='./submissions', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True)) +@click.option('--yes', is_flag=True, default=False, help='Skip confirmation prompt.') +def unpack_assign(path, number, yes, output_path): + pairs = _identify_archives(path) + idx = -1 + if len(pairs) > 0: + numbers, names, paths = zip(*pairs) + else: + numbers = tuple() + names = tuple() + paths = tuple() + if number not in numbers: + detected_str = str(numbers).replace('(', '').replace(')', '') + if len(numbers) == 0: + detected_str = 'None' + click.echo( + f"{ERROR_BOX}: Unavailable assignment. Detected : {detected_str}.", err=True) + return + else: + idx = numbers.index(number) + + if not yes and not click.confirm(f"Are you sure you want to unpack {names[idx]}?"): + return + tmp_dir = os.path.join(output_path, 'tmp') + tmp_dir_shortname = os.path.relpath(tmp_dir) + if os.path.isdir(tmp_dir): + if not yes and not click.confirm(f"I am going to delete {tmp_dir_shortname}, confirm?"): + return + else: + shutil.rmtree(tmp_dir) + os.makedirs(tmp_dir) + _unpack(paths[idx], tmp_dir) + _unpack_tree(tmp_dir) + + name_origin_pairs = _make_names_pairs(tmp_dir) + for name, origin in name_origin_pairs: + destination = os.path.join(output_path, name, f"assignment_{number}") + if not yes and os.path.exists(destination) and not click.confirm(f"I am going to override {os.path.relpath(destination)}, confirm?"): + click.echo( + f"{WARNING_BOX}: Ingoring assignement_{number} for {name}") + continue + if os.path.exists(destination): + shutil.rmtree(destination) + shutil.copytree(origin, destination) + shutil.rmtree(tmp_dir) + + +@click.group() +def roots(): + pass + + +@click.command() +@click.argument('number', required=True, type=click.INT) +@click.option('-p', '--path', default='./submissions', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True)) +@click.option('-o', '--output-path', default='./roots', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True)) +@click.option('--yes', is_flag=True, default=False, help='Skip confirmation prompt.') +def collect(number, path, output_path, yes): + students = os.listdir(path) + src_paths = [ + f"{path}/{student}/assignment_{number}/" for student in students] + dst_paths = [ + f"{output_path}/{student}/assignment_{number}/" for student in students] + for src_path, dst_path, student in zip(src_paths, dst_paths, students): + if not os.path.exists(src_path) or not os.path.isdir(src_path): + click.echo( + f"{WARNING_BOX}: No assignment_{number} dir found for {student}, skipping...") + continue + if not yes and os.path.exists(dst_path) and not click.confirm(f"I will override {os.path.relpath(dst_path)}, confirm?"): + click.echo(f"{WARNING_BOX}: Skipping {student}...") + continue + if os.path.exists(dst_path): + shutil.rmtree(dst_path) + roots = list() + for dir_root, _, files in os.walk(src_path): + if "Makefile" in files and os.path.isdir(f"{dir_root}/src"): + roots.append(dir_root) + if len(roots) == 0: + click.echo( + f"{WARNING_BOX}: No root could be identified for {student}/assignment_{number}") + continue + for idx, root in enumerate(roots): + root_dst = os.path.join(dst_path, f"root_{idx}") + shutil.copytree(root, root_dst) + with open(os.path.join(root_dst, ".origin"), 'w') as handle: + handle.write(f"origin:\n\t{os.path.relpath(root)}\n") + + +@click.command() +@click.argument('number', required=True, type=click.INT) +@click.option('-p', '--path', default='./submissions', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True)) +@click.option('-o', '--output-path', default='./roots', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True)) +@click.option('--yes', is_flag=True, default=False, help='Skip confirmation prompt.') +def collect(number, path, output_path, yes): + students = os.listdir(path) + src_paths = [ + f"{path}/{student}/assignment_{number}/" for student in students] + dst_paths = [ + f"{output_path}/{student}/assignment_{number}/" for student in students] + for src_path, dst_path, student in zip(src_paths, dst_paths, students): + if not os.path.exists(src_path) or not os.path.isdir(src_path): + click.echo( + f"{WARNING_BOX}: No assignment_{number} dir found for {student}, skipping...") + continue + if not yes and os.path.exists(dst_path) and not click.confirm(f"I will override {os.path.relpath(dst_path)}, confirm?"): + click.echo(f"{WARNING_BOX}: Skipping {student}...") + continue + if os.path.exists(dst_path): + shutil.rmtree(dst_path) + roots = list() + for dir_root, _, files in os.walk(src_path): + if "Makefile" in files and os.path.isdir(f"{dir_root}/src"): + roots.append(dir_root) + if len(roots) == 0: + click.echo( + f"{WARNING_BOX}: No root could be identified for {student}/assignment_{number}") + continue + for idx, root in enumerate(roots): + root_dst = os.path.join(dst_path, f"root_{idx}") + shutil.copytree(root, root_dst) + with open(os.path.join(root_dst, ".origin"), 'w') as handle: + handle.write(f"origin:\n\t{os.path.relpath(root)}\n") + + +@click.group() +def test(): + pass + + +@click.command() +@click.argument('number', required=True, type=click.INT) +@click.option('-p', '--path', default='./roots', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True)) +@click.option('-c', '--command', default='make', type=click.STRING) +@click.option('-v', '--verbose', is_flag=True, default=False) +def compile(number, path, command, verbose): + students = os.listdir(path) + asg_dirs = [os.path.join( + path, student, f"assignment_{number}") for student in students] + + for asg_dir, student in zip(asg_dirs, students): + if not os.path.exists(asg_dir) or not os.path.isdir(asg_dir): + click.echo( + f"{WARNING_BOX}: No roots could be identified for {student}/assignment_{number}") + continue + local_roots = [os.path.join(asg_dir, dir) + for dir in os.listdir(asg_dir) if 'root_' in dir and os.path.isdir(os.path.join(asg_dir, dir))] + if len(local_roots) == 0: + click.echo( + f"{WARNING_BOX}: No roots could be identified for {student}/assignment_{number}") + continue + current_wd = os.getcwd() + if verbose: + click.echo(f"Testing compile for {student}:assignment_{number}") + else: + click.echo(f"({student}:assignment_{number}):") + + for root in local_roots: + if verbose: + click.echo(f"\t[{os.path.basename(root)}]: ", nl=False) + click.echo(f"entering {os.path.relpath(root)}") + os.chdir(root) + result = subprocess.run( + shlex.split(command), capture_output=True, text=True) + if verbose: + if (result.returncode != 0): + click.echo( + f"\t\033[0;91m[{os.path.basename(root)}]\033[0m: ", nl=False) + else: + click.echo( + f"\t\033[0;92m[{os.path.basename(root)}]\033[0m: ", nl=False) + click.echo(f"stdout: {result.stdout}") + click.echo(f"stderr: {result.stderr}") + else: + if (result.returncode != 0): + click.echo( + f"\t\033[0;91m[{os.path.basename(root)}]\033[0m: ", nl=False) + click.echo(f"stderr: {result.stderr[:69]:69s}...") + + else: + click.echo( + f"\t\033[0;92m[{os.path.basename(root)}]\033[0m: SUCCESS") + + os.chdir(current_wd) + + +@click.command() +@click.argument('number', required=True, type=click.INT) +@click.option('-s', '--solution-path', default='./solutions', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True)) +@click.option('-p', '--path', default='./roots', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True)) +# @click.option('-c', '--command', default='make', type=click.STRING) +# @click.option('-v', '--verbose', is_flag=True, default=False) +def dissect(number, solution_path, path): + sol_as_path = os.path.join(solution_path, f"assignment_{number}") + config_path = os.path.join(sol_as_path, "test_config.toml") + + if not os.path.isdir(sol_as_path): + click.echo( + f"{ERROR_BOX}: No reference solution for assignment_{number} found.", err=True) + return + if not os.path.exists(config_path) or not os.path.isfile(config_path): + click.echo( + f"{ERROR_BOX}: No test configuration for assignment_{number} found.", err=True) + return + config = dict() + + with open(config_path, 'rb') as handle: + config = tomllib.load(handle) + if not 'test_functions' in config["dissect"]: + click.echo( + f"{ERROR_BOX}: No function dissect config found for assignment_{number}.", err=True) + return + test_functions = config['dissect']['test_functions'] + if not 'all_functions' in config["dissect"]: + click.echo( + f"{ERROR_BOX}: No function dissect config found for assignment_{number}.", err=True) + return + all_functions = config['dissect']['all_functions'] + if not 'build_cmd' in config["dissect"]: + click.echo( + f"{ERROR_BOX}: No build config found for assignment_{number}.", err=True) + return + build_cmd = config['dissect']['build_cmd'] + if not 'run_cmd' in config["dissect"]: + click.echo( + f"{ERROR_BOX}: No run config found for assignment_{number}.", err=True) + return + run_cmd = config['dissect']['run_cmd'] + if not 'solution_files' in config["dissect"]: + click.echo( + f"{ERROR_BOX}: No solution_files config found for assignment_{number}.", err=True) + return + solution_files = config['dissect']['solution_files'] + + students = os.listdir(path) + asg_dirs = [os.path.join( + path, student, f"assignment_{number}") for student in students] + for asg_dir, student in zip(asg_dirs, students): + if not os.path.exists(asg_dir) or not os.path.isdir(asg_dir): + click.echo( + f"{WARNING_BOX}: No roots could be identified for {student}/assignment_{number}") + continue + local_roots = [os.path.join(asg_dir, dir) + for dir in os.listdir(asg_dir) if 'root_' in dir and os.path.isdir(os.path.join(asg_dir, dir))] + if len(local_roots) == 0: + click.echo( + f"{WARNING_BOX}: No roots could be identified for {student}/assignment_{number}") + continue + current_wd = os.getcwd() + click.echo(f"({student}:assignment_{number}):") + failed = False + flags = [f"-DGRDR_TEST_{func.upper()}" for func in test_functions] + for root in local_roots: + root_src_paths = [os.path.join(root, 'src', file) + for file in solution_files] + root_des_paths = [os.path.join( + sol_as_path, 'src', file) for file in solution_files] + for src, dest in zip(root_src_paths, root_des_paths): + if not os.path.exists(src): + click.echo( + f"\t[{os.path.basename(root)}]:{ERROR_BOX}: Missing source {os.path.basename(src)}") + shutil.copyfile(src, dest) + _unstaticize_file(dest, all_functions) + _weaken_file(dest, all_functions) + # BUILD TEST + result = 0 + os.chdir(sol_as_path) + for function, flag in zip(test_functions, flags): + command = build_cmd + f" CFLAGS+={flag}" + result = subprocess.run( + ['make', 'distclean'], capture_output=True, text=True) + result = subprocess.run( + shlex.split(command), capture_output=True, text=True) + + if result.returncode == 0: + click.echo( + f"\t[{os.path.basename(root)}]:[BUILD]:[{function}]:{SUCCESS_BOX}") + else: + click.echo( + f"\t[{os.path.basename(root)}]:[BUILD]:[{function}]:{ERROR_BOX}") + click.echo(f"stderr: {result.stderr[:69]:69s}...") + os.chdir(current_wd) + + # for file in solution_files: + # _unstaticize_file(file, test_functions) + # _weaken_file(file, all_functions) + + +cli.add_command(init) +cli.add_command(archives) +cli.add_command(roots) +cli.add_command(test) + +archives.add_command(list_assign) +archives.add_command(unpack_assign) + +roots.add_command(collect) + +test.add_command(compile) +test.add_command(dissect) + + +def main(): + cli()