Initial Commit

This commit is contained in:
Erik Fabrizzi
2025-10-24 13:57:49 +02:00
commit cd09ee8742
5 changed files with 499 additions and 0 deletions

View File

472
src/auto_grader/cli.py Normal file
View File

@@ -0,0 +1,472 @@
import click
import os
import zipfile
import tarfile
import shutil
import subprocess
import shlex
import tomllib
import re
from unidecode import unidecode
SUCCESS_BOX = "[\033[0;92mSUCCESS\033[0m]"
ERROR_BOX = "[\033[0;91mERROR\033[0m]"
WARNING_BOX = "[\033[0;93mWARNING\033[0m]"
readme_str = "# Grading-dir for NHR MPI course assignements\n" + \
"## Usage\n" +\
"some usage\n"
def _unpack(file_path, dest_path):
success = False
if zipfile.is_zipfile(file_path):
with zipfile.ZipFile(file_path, 'r') as z:
z.extractall(dest_path)
success = True
elif tarfile.is_tarfile(file_path):
with tarfile.open(file_path, 'r:*') as t:
t.extractall(dest_path, filter='data')
success = True
return success
def _unpack_tree(dir_path):
for dir, _, filenames in os.walk(dir_path):
for filename in filenames:
archive_path = os.path.join(dir, filename)
if zipfile.is_zipfile(archive_path) or tarfile.is_tarfile(archive_path):
extract_dir = os.path.join(dir, os.path.splitext(filename)[0])
os.makedirs(extract_dir, exist_ok=True)
_unpack(archive_path, extract_dir)
os.remove(archive_path)
def _add_weak_attribute(match):
prefix = match.group(1)
func_name = match.group(2)
suffix = match.group(3)
if "__attribute__((weak))" in prefix:
return match.group(0)
return f"{prefix}__attribute__((weak)) {func_name}{suffix}"
def _weaken_file(filename, functions):
pattern = re.compile(
r"^(\s*(?:(?:\w+[\s\*]+)+))"
r"(" + "|".join(functions) + r")"
r"(\s*\([^)]*\)\s*\{)",
re.MULTILINE
)
with open(filename, "r") as f:
code = f.read()
new_code = pattern.sub(_add_weak_attribute, code)
with open(filename, "w") as f:
f.write(new_code)
def _remove_static_attribute(match):
prefix = match.group(1)
func_name = match.group(2)
suffix = match.group(3)
new_prefix = re.sub(r'\bstatic\s+', '', prefix)
return f"{new_prefix}{func_name}{suffix}"
def _unstaticize_file(filename, functions):
pattern = re.compile(
r"^(\s*(?:(?:\w+[\s\*]+)+))" # prefix (type, qualifiers, etc.)
r"(" + "|".join(map(re.escape, functions)) + r")" # function name
r"(\s*\([^)]*\)\s*\{)", # argument list + opening brace
re.MULTILINE
)
with open(filename, "r") as f:
code = f.read()
new_code = pattern.sub(_remove_static_attribute, code)
with open(filename, "w") as f:
f.write(new_code)
def is_submission_dir(dir_path):
return os.path.isdir(dir_path) and "_assignsubmission_file" in dir_path
# def _make_names(dir_path):
# student_names = [unidecode(name[:name.find('_')].lower().replace(
# ' ', '_')) for name in os.listdir(
# dir_path) if is_submission_dir(os.path.join(dir_path, name))]
# return student_names
def _make_names_pairs(dir_path):
student_pairs = [(unidecode(name[:name.find('_')].lower().replace(
' ', '_')), os.path.join(dir_path, name)) for name in os.listdir(
dir_path) if is_submission_dir(os.path.join(dir_path, name))]
return student_pairs
def is_assign_archive(path):
result = True
if not os.path.isfile(path):
result = False
if not zipfile.is_zipfile(path) and not tarfile.is_tarfile(path):
result = False
if not ("Assignment" in path):
result = False
return result
def _identify_archives(path):
asg_archives = [
f"{path}/{file}" for file in os.listdir(path) if is_assign_archive(f"{path}/{file}")]
numbers = [0]*len(asg_archives)
basenames = [""]*len(asg_archives)
for idx, asg in enumerate(asg_archives):
name = os.path.basename(asg)
basenames[idx] = name
number = name[name.find("Assignment")+len("Assignment") +
1:].replace('-', ' ').split(sep=' ')[0]
numbers[idx] = int(number)
pairs = list(zip(numbers, basenames, asg_archives))
pairs.sort(key=lambda x: x[0])
return pairs
@click.group()
def cli():
pass
@click.command()
@click.argument('path', default='.', type=click.Path(resolve_path=True))
def init(path):
os.makedirs(f"{path}/raw", exist_ok=True)
os.makedirs(f"{path}/archives", exist_ok=True)
os.makedirs(f"{path}/submissions", exist_ok=True)
os.makedirs(f"{path}/roots", exist_ok=True)
os.makedirs(f"{path}/solutions", exist_ok=True)
os.makedirs(f"{path}/logs", exist_ok=True)
with open(f"{path}/README.md", 'w') as handle:
handle.write(readme_str)
click.echo(f"{SUCCESS_BOX} :{os.path.basename(path)} inited!")
@click.group()
def archives():
pass
@click.command("list")
@click.argument('path', default='./archives', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True))
def list_assign(path):
pairs = _identify_archives(path)
for number, archive, _ in pairs:
click.echo(f"[{number}] -> {archive}")
@click.command("unpack")
@click.argument('number', required=True, type=click.INT)
@click.option('-p', '--path', default='./archives', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True))
@click.option('-o', '--output-path', default='./submissions', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True))
@click.option('--yes', is_flag=True, default=False, help='Skip confirmation prompt.')
def unpack_assign(path, number, yes, output_path):
pairs = _identify_archives(path)
idx = -1
if len(pairs) > 0:
numbers, names, paths = zip(*pairs)
else:
numbers = tuple()
names = tuple()
paths = tuple()
if number not in numbers:
detected_str = str(numbers).replace('(', '').replace(')', '')
if len(numbers) == 0:
detected_str = 'None'
click.echo(
f"{ERROR_BOX}: Unavailable assignment. Detected : {detected_str}.", err=True)
return
else:
idx = numbers.index(number)
if not yes and not click.confirm(f"Are you sure you want to unpack {names[idx]}?"):
return
tmp_dir = os.path.join(output_path, 'tmp')
tmp_dir_shortname = os.path.relpath(tmp_dir)
if os.path.isdir(tmp_dir):
if not yes and not click.confirm(f"I am going to delete {tmp_dir_shortname}, confirm?"):
return
else:
shutil.rmtree(tmp_dir)
os.makedirs(tmp_dir)
_unpack(paths[idx], tmp_dir)
_unpack_tree(tmp_dir)
name_origin_pairs = _make_names_pairs(tmp_dir)
for name, origin in name_origin_pairs:
destination = os.path.join(output_path, name, f"assignment_{number}")
if not yes and os.path.exists(destination) and not click.confirm(f"I am going to override {os.path.relpath(destination)}, confirm?"):
click.echo(
f"{WARNING_BOX}: Ingoring assignement_{number} for {name}")
continue
if os.path.exists(destination):
shutil.rmtree(destination)
shutil.copytree(origin, destination)
shutil.rmtree(tmp_dir)
@click.group()
def roots():
pass
@click.command()
@click.argument('number', required=True, type=click.INT)
@click.option('-p', '--path', default='./submissions', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True))
@click.option('-o', '--output-path', default='./roots', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True))
@click.option('--yes', is_flag=True, default=False, help='Skip confirmation prompt.')
def collect(number, path, output_path, yes):
students = os.listdir(path)
src_paths = [
f"{path}/{student}/assignment_{number}/" for student in students]
dst_paths = [
f"{output_path}/{student}/assignment_{number}/" for student in students]
for src_path, dst_path, student in zip(src_paths, dst_paths, students):
if not os.path.exists(src_path) or not os.path.isdir(src_path):
click.echo(
f"{WARNING_BOX}: No assignment_{number} dir found for {student}, skipping...")
continue
if not yes and os.path.exists(dst_path) and not click.confirm(f"I will override {os.path.relpath(dst_path)}, confirm?"):
click.echo(f"{WARNING_BOX}: Skipping {student}...")
continue
if os.path.exists(dst_path):
shutil.rmtree(dst_path)
roots = list()
for dir_root, _, files in os.walk(src_path):
if "Makefile" in files and os.path.isdir(f"{dir_root}/src"):
roots.append(dir_root)
if len(roots) == 0:
click.echo(
f"{WARNING_BOX}: No root could be identified for {student}/assignment_{number}")
continue
for idx, root in enumerate(roots):
root_dst = os.path.join(dst_path, f"root_{idx}")
shutil.copytree(root, root_dst)
with open(os.path.join(root_dst, ".origin"), 'w') as handle:
handle.write(f"origin:\n\t{os.path.relpath(root)}\n")
@click.command()
@click.argument('number', required=True, type=click.INT)
@click.option('-p', '--path', default='./submissions', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True))
@click.option('-o', '--output-path', default='./roots', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True))
@click.option('--yes', is_flag=True, default=False, help='Skip confirmation prompt.')
def collect(number, path, output_path, yes):
students = os.listdir(path)
src_paths = [
f"{path}/{student}/assignment_{number}/" for student in students]
dst_paths = [
f"{output_path}/{student}/assignment_{number}/" for student in students]
for src_path, dst_path, student in zip(src_paths, dst_paths, students):
if not os.path.exists(src_path) or not os.path.isdir(src_path):
click.echo(
f"{WARNING_BOX}: No assignment_{number} dir found for {student}, skipping...")
continue
if not yes and os.path.exists(dst_path) and not click.confirm(f"I will override {os.path.relpath(dst_path)}, confirm?"):
click.echo(f"{WARNING_BOX}: Skipping {student}...")
continue
if os.path.exists(dst_path):
shutil.rmtree(dst_path)
roots = list()
for dir_root, _, files in os.walk(src_path):
if "Makefile" in files and os.path.isdir(f"{dir_root}/src"):
roots.append(dir_root)
if len(roots) == 0:
click.echo(
f"{WARNING_BOX}: No root could be identified for {student}/assignment_{number}")
continue
for idx, root in enumerate(roots):
root_dst = os.path.join(dst_path, f"root_{idx}")
shutil.copytree(root, root_dst)
with open(os.path.join(root_dst, ".origin"), 'w') as handle:
handle.write(f"origin:\n\t{os.path.relpath(root)}\n")
@click.group()
def test():
pass
@click.command()
@click.argument('number', required=True, type=click.INT)
@click.option('-p', '--path', default='./roots', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True))
@click.option('-c', '--command', default='make', type=click.STRING)
@click.option('-v', '--verbose', is_flag=True, default=False)
def compile(number, path, command, verbose):
students = os.listdir(path)
asg_dirs = [os.path.join(
path, student, f"assignment_{number}") for student in students]
for asg_dir, student in zip(asg_dirs, students):
if not os.path.exists(asg_dir) or not os.path.isdir(asg_dir):
click.echo(
f"{WARNING_BOX}: No roots could be identified for {student}/assignment_{number}")
continue
local_roots = [os.path.join(asg_dir, dir)
for dir in os.listdir(asg_dir) if 'root_' in dir and os.path.isdir(os.path.join(asg_dir, dir))]
if len(local_roots) == 0:
click.echo(
f"{WARNING_BOX}: No roots could be identified for {student}/assignment_{number}")
continue
current_wd = os.getcwd()
if verbose:
click.echo(f"Testing compile for {student}:assignment_{number}")
else:
click.echo(f"({student}:assignment_{number}):")
for root in local_roots:
if verbose:
click.echo(f"\t[{os.path.basename(root)}]: ", nl=False)
click.echo(f"entering {os.path.relpath(root)}")
os.chdir(root)
result = subprocess.run(
shlex.split(command), capture_output=True, text=True)
if verbose:
if (result.returncode != 0):
click.echo(
f"\t\033[0;91m[{os.path.basename(root)}]\033[0m: ", nl=False)
else:
click.echo(
f"\t\033[0;92m[{os.path.basename(root)}]\033[0m: ", nl=False)
click.echo(f"stdout: {result.stdout}")
click.echo(f"stderr: {result.stderr}")
else:
if (result.returncode != 0):
click.echo(
f"\t\033[0;91m[{os.path.basename(root)}]\033[0m: ", nl=False)
click.echo(f"stderr: {result.stderr[:69]:69s}...")
else:
click.echo(
f"\t\033[0;92m[{os.path.basename(root)}]\033[0m: SUCCESS")
os.chdir(current_wd)
@click.command()
@click.argument('number', required=True, type=click.INT)
@click.option('-s', '--solution-path', default='./solutions', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True))
@click.option('-p', '--path', default='./roots', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True))
# @click.option('-c', '--command', default='make', type=click.STRING)
# @click.option('-v', '--verbose', is_flag=True, default=False)
def dissect(number, solution_path, path):
sol_as_path = os.path.join(solution_path, f"assignment_{number}")
config_path = os.path.join(sol_as_path, "test_config.toml")
if not os.path.isdir(sol_as_path):
click.echo(
f"{ERROR_BOX}: No reference solution for assignment_{number} found.", err=True)
return
if not os.path.exists(config_path) or not os.path.isfile(config_path):
click.echo(
f"{ERROR_BOX}: No test configuration for assignment_{number} found.", err=True)
return
config = dict()
with open(config_path, 'rb') as handle:
config = tomllib.load(handle)
if not 'test_functions' in config["dissect"]:
click.echo(
f"{ERROR_BOX}: No function dissect config found for assignment_{number}.", err=True)
return
test_functions = config['dissect']['test_functions']
if not 'all_functions' in config["dissect"]:
click.echo(
f"{ERROR_BOX}: No function dissect config found for assignment_{number}.", err=True)
return
all_functions = config['dissect']['all_functions']
if not 'build_cmd' in config["dissect"]:
click.echo(
f"{ERROR_BOX}: No build config found for assignment_{number}.", err=True)
return
build_cmd = config['dissect']['build_cmd']
if not 'run_cmd' in config["dissect"]:
click.echo(
f"{ERROR_BOX}: No run config found for assignment_{number}.", err=True)
return
run_cmd = config['dissect']['run_cmd']
if not 'solution_files' in config["dissect"]:
click.echo(
f"{ERROR_BOX}: No solution_files config found for assignment_{number}.", err=True)
return
solution_files = config['dissect']['solution_files']
students = os.listdir(path)
asg_dirs = [os.path.join(
path, student, f"assignment_{number}") for student in students]
for asg_dir, student in zip(asg_dirs, students):
if not os.path.exists(asg_dir) or not os.path.isdir(asg_dir):
click.echo(
f"{WARNING_BOX}: No roots could be identified for {student}/assignment_{number}")
continue
local_roots = [os.path.join(asg_dir, dir)
for dir in os.listdir(asg_dir) if 'root_' in dir and os.path.isdir(os.path.join(asg_dir, dir))]
if len(local_roots) == 0:
click.echo(
f"{WARNING_BOX}: No roots could be identified for {student}/assignment_{number}")
continue
current_wd = os.getcwd()
click.echo(f"({student}:assignment_{number}):")
failed = False
flags = [f"-DGRDR_TEST_{func.upper()}" for func in test_functions]
for root in local_roots:
root_src_paths = [os.path.join(root, 'src', file)
for file in solution_files]
root_des_paths = [os.path.join(
sol_as_path, 'src', file) for file in solution_files]
for src, dest in zip(root_src_paths, root_des_paths):
if not os.path.exists(src):
click.echo(
f"\t[{os.path.basename(root)}]:{ERROR_BOX}: Missing source {os.path.basename(src)}")
shutil.copyfile(src, dest)
_unstaticize_file(dest, all_functions)
_weaken_file(dest, all_functions)
# BUILD TEST
result = 0
os.chdir(sol_as_path)
for function, flag in zip(test_functions, flags):
command = build_cmd + f" CFLAGS+={flag}"
result = subprocess.run(
['make', 'distclean'], capture_output=True, text=True)
result = subprocess.run(
shlex.split(command), capture_output=True, text=True)
if result.returncode == 0:
click.echo(
f"\t[{os.path.basename(root)}]:[BUILD]:[{function}]:{SUCCESS_BOX}")
else:
click.echo(
f"\t[{os.path.basename(root)}]:[BUILD]:[{function}]:{ERROR_BOX}")
click.echo(f"stderr: {result.stderr[:69]:69s}...")
os.chdir(current_wd)
# for file in solution_files:
# _unstaticize_file(file, test_functions)
# _weaken_file(file, all_functions)
cli.add_command(init)
cli.add_command(archives)
cli.add_command(roots)
cli.add_command(test)
archives.add_command(list_assign)
archives.add_command(unpack_assign)
roots.add_command(collect)
test.add_command(compile)
test.add_command(dissect)
def main():
cli()