feat: major refactor and feature expansion for auto_grader CLI

- **Introduced `run_log` and `test_type` classes**
  - Provide structured logging and standardized output formatting for build, run,
    and correctness phases.
  - Support both multiline and oneline colored summaries for better readability.
- **Added `color_string` and `indent_text` utilities**
  - Improve terminal UX with color-coded and indented CLI messages.

- **Enhanced attribute manipulation utilities**
  - Generalized weak/static attribute handling with `add_attributes` and
    `rmv_attributes_file` functions.
  - Unified behavior for `_weaken_file`, `_staticize_file`, and `_unstaticize_file`.
  - Improved regex robustness and correctness messages for missing definitions.

- **Added `StudentVarType` click parameter type**
  - Provides shell completion for student directories based on current path.

- **Introduced new build/run infrastructure**
  - Added `build_root()`, `run_root()`, and `clean_root()` wrappers using `subprocess.run`.
  - Introduced detailed error capture and standardized log generation.

- **New `compile` command**
  - Builds and cleans all roots (or specific student via `-s`).
  - Provides colorized per-root build summaries with timeout support and command overrides.

- **New `correctness` test command**
  - Runs compiled student executables against reference solutions.
This commit is contained in:
Erik Fabrizzi
2025-10-28 18:22:11 +01:00
parent 77444e8e0b
commit d4bac2ee3f

View File

@@ -1,3 +1,4 @@
from click.shell_completion import CompletionItem
import click import click
import os import os
import zipfile import zipfile
@@ -7,9 +8,10 @@ import subprocess
import shlex import shlex
import tomllib import tomllib
import re import re
import difflib
import numpy as np import numpy as np
from subprocess import CompletedProcess
from unidecode import unidecode from unidecode import unidecode
from typing import Optional
SUCCESS_BOX = "[\033[0;92mSUCCESS\033[0m]" SUCCESS_BOX = "[\033[0;92mSUCCESS\033[0m]"
ERROR_BOX = "[\033[0;91mERROR\033[0m]" ERROR_BOX = "[\033[0;91mERROR\033[0m]"
@@ -19,6 +21,117 @@ readme_str = "# Grading-dir for NHR MPI course assignements\n" + \
"## Usage\n" +\ "## Usage\n" +\
"some usage\n" "some usage\n"
shell_colors = {'br-red': 91, 'br-green': 92, 'br-yellow': 93}
def color_string(string: str, color: str):
result_string = string
if color in shell_colors:
result_string = f"\033[0;{shell_colors[color]}m{string}\033[0m"
return result_string
def indent_text(text: str, indent: int, indent_char: str = ' '):
lines = text.split('\n')
for idx, line in enumerate(lines):
lines[idx] = indent_char*indent + line
return '\n'.join(lines)
class test_type:
build = "BUILD"
run = "RUN"
correctness = "CORRECTNESS"
class run_log:
def __init__(self,
ttype: str,
run_cmd: str,
run_abs_dir: str,
run_success: bool,
summary: str,
cmd_return_code: int,
stdout: str,
stderr: str):
self.ttype = ttype
self.run_cmd = run_cmd
self.run_abs_dir = run_abs_dir
self.run_success = run_success
self.summary = summary
self.cmd_return_code = cmd_return_code
self.stdout = stdout
self.stderr = stderr
def as_str(self, indent=0, indent_char=' ', color=True):
color_func = (
(lambda s, c: color_string(s, c))
if color
else (lambda s, c: s)
)
header = f"[{self.ttype}]"
success_string = color_func(
'SUCCESS', 'br-green') if self.run_success else color_func('FAILURE', 'br-red')
run_info = \
"[RUN INFO]\n" +\
f" - COMMAND : {self.run_cmd}\n" +\
f" - RUN AT : {self.run_abs_dir}\n" +\
f" - RUN SUCCESS : {success_string}"
if not self.run_success:
run_info += "\n - FAILURE SUMMARY:\n"
run_info += indent_text(self.summary, 8)
cmd_info = \
"[CMD INFO]\n" +\
f" - RETURN CODE : {self.cmd_return_code}\n" +\
f" - STDOUT : \n" +\
indent_text(self.stdout, 8) + '\n' +\
f" - STDERR : \n" +\
indent_text(self.stderr, 8)
final_string = '\n'.join([header,
indent_text(run_info, 4), indent_text(cmd_info, 4)])
final_string = indent_text(
final_string, indent=indent, indent_char=indent_char)
return final_string
def oneline(self, type_as_prefix=True, color=True):
color_func = (
(lambda s, c: color_string(s, c))
if color
else (lambda s, c: s)
)
oneline = ""
if type_as_prefix:
oneline += f"[{self.ttype}]:"
oneline += f"[{os.path.basename(self.run_abs_dir)}]: "
if self.run_success and self.cmd_return_code == 0:
oneline += f"{color_func('SUCCESS', 'br-green')}"
if not self.run_success:
oneline += f"{color_func('FAILURE', 'br-red')} (shl)(hint: "
rest_str_len = len(oneline+'...)')
err_int = self.summary[:rest_str_len].replace("\n", " ")
oneline += err_int+"...)"
if self.run_success and not self.cmd_return_code == 0:
oneline += f"{color_func('FAILURE', 'br-red')} (cmd)(stderr: "
rest_str_len = len(oneline+'...)')
err_int = self.stderr[:rest_str_len].replace("\n", " ")
oneline += err_int+"...)"
return oneline
class StudentVarType(click.ParamType):
name = "student"
def shell_complete(self, ctx, param, incomplete):
if 'path' not in ctx.params:
return click.Path.shell_complete(click.Path(), ctx, param, incomplete)
else:
return [CompletionItem(dir) for dir in os.listdir(ctx.params['path']) if dir.startswith(incomplete)]
def _unpack(file_path, dest_path): def _unpack(file_path, dest_path):
success = False success = False
@@ -44,48 +157,60 @@ def _unpack_tree(dir_path):
os.remove(archive_path) os.remove(archive_path)
def _add_weak_attribute(match): # - SRC-TOOLS -----------------------------------------------------------------#
def _add_attribute_match(match, attribute: str):
prefix = match.group(1) prefix = match.group(1)
func_name = match.group(2) func_name = match.group(2)
suffix = match.group(3) suffix = match.group(3)
if "__attribute__((weak))" in prefix: if attribute in prefix:
return match.group(0) return match.group(0)
return f"{prefix}__attribute__((weak)) {func_name}{suffix}" return f"{prefix} {attribute} {func_name}{suffix}"
def _weaken_file(filename, functions): def add_attributes(filename, functions, attribute: str):
pattern = re.compile( pattern = re.compile(
r"^(\s*(?:(?:\w+[\s\*]+)+))" r"^(\s*(?:(?:\w+[\s\*]+)+))"
r"(" + "|".join(functions) + r")" r"(" + "|".join(map(re.escape, functions)) + r")"
r"(\s*\([^)]*\)\s*\{)", r"(\s*\([^)]*\)\s*\{)",
re.MULTILINE re.MULTILINE
) )
with open(filename, "r") as f: with open(filename, "r") as f:
code = f.read() code = f.read()
new_code = pattern.sub(_add_weak_attribute, code)
def sub_func(match): return _add_attribute_match(match, attribute)
new_code = pattern.sub(sub_func, code)
with open(filename, "w") as f: with open(filename, "w") as f:
f.write(new_code) f.write(new_code)
def _add_static_attribute(match): def _weaken_file(filename, functions):
prefix = match.group(1) add_attributes(filename, functions, '__attribute__((weak))')
func_name = match.group(2)
suffix = match.group(3)
if "static" in prefix:
return match.group(0)
return f"{prefix} static {func_name}{suffix}"
def _staticize_file(filename, functions): def _staticize_file(filename, functions):
add_attributes(filename, functions, 'static')
def _rmv_attribute_match(match, attribute: str):
prefix = match.group(1)
func_name = match.group(2)
suffix = match.group(3)
new_prefix = re.sub(rf'\b{re.escape(attribute)}\b\s+', '', prefix)
return f"{new_prefix}{func_name}{suffix}"
def rmv_attributes_file(filename, functions, attribute: str):
pattern = re.compile( pattern = re.compile(
r"^(\s*(?:(?:\w+[\s\*]+)+))" r"^(\s*(?:(?:\w+[\s\*]+)+))" # prefix (type, qualifiers, etc.)
r"(" + "|".join(functions) + r")" r"(" + "|".join(map(re.escape, functions)) + r")" # function name
r"(\s*\([^)]*\)\s*\{)", r"(\s*\([^)]*\)\s*\{)", # argument list + opening brace
re.MULTILINE re.MULTILINE
) )
with open(filename, "r") as f: with open(filename, "r") as f:
code = f.read() code = f.read()
new_code = pattern.sub(_add_static_attribute, code)
def remove_func(match): return _rmv_attribute_match(match, attribute)
new_code = pattern.sub(remove_func, code)
with open(filename, "w") as f: with open(filename, "w") as f:
f.write(new_code) f.write(new_code)
@@ -99,17 +224,7 @@ def _remove_static_attribute(match):
def _unstaticize_file(filename, functions): def _unstaticize_file(filename, functions):
pattern = re.compile( rmv_attributes_file(filename, functions, 'static')
r"^(\s*(?:(?:\w+[\s\*]+)+))" # prefix (type, qualifiers, etc.)
r"(" + "|".join(map(re.escape, functions)) + r")" # function name
r"(\s*\([^)]*\)\s*\{)", # argument list + opening brace
re.MULTILINE
)
with open(filename, "r") as f:
code = f.read()
new_code = pattern.sub(_remove_static_attribute, code)
with open(filename, "w") as f:
f.write(new_code)
def _extract_prototypes_to_header(filename, functions, header_filename): def _extract_prototypes_to_header(filename, functions, header_filename):
@@ -119,19 +234,22 @@ def _extract_prototypes_to_header(filename, functions, header_filename):
r"(\s*\([^)]*\)\s*\{)", r"(\s*\([^)]*\)\s*\{)",
re.MULTILINE re.MULTILINE
) )
with open(filename, "r") as f: with open(filename, "r") as f:
code = f.read() code = f.read()
matches = func_pattern.findall(code) matches = func_pattern.findall(code)
if not matches: if not matches:
click.echo(f"{WARNING_BOX}:No definitions found for {functions}") click.echo(
f"{WARNING_BOX}: {functions} not found in {os.path.relpath(filename)}.")
return return
prototypes = [] prototypes = []
for prefix, func_name, args in matches: for prefix, func_name, args in matches:
proto = f"{prefix.strip()} static {func_name}{args.strip()[:-1]};" proto = f"{prefix.strip()} static {func_name}{args.strip()[:-1]};"
prototypes.append(proto) prototypes.append(proto)
prototypes = list(dict.fromkeys(prototypes)) # preserve order prototypes = list(dict.fromkeys(prototypes))
header_exists = os.path.exists(header_filename) header_exists = os.path.exists(header_filename)
with open(header_filename, "w" if header_exists else "w") as header: with open(header_filename, "w" if header_exists else "w") as header:
header.write('#include "solver.h"'+"\n") header.write('#include "solver.h"'+"\n")
@@ -143,20 +261,14 @@ def _extract_prototypes_to_header(filename, functions, header_filename):
new_code = include_line + code new_code = include_line + code
with open(filename, "w") as f: with open(filename, "w") as f:
f.write(new_code) f.write(new_code)
# -----------------------------------------------------------------------------#
def is_submission_dir(dir_path): def is_submission_dir(dir_path):
return os.path.isdir(dir_path) and "_assignsubmission_file" in dir_path return os.path.isdir(dir_path) and "_assignsubmission_file" in dir_path
def _make_names_pairs(dir_path): def is_assignment_archive(path):
student_pairs = [(unidecode(name[:name.find('_')].lower().replace(
' ', '_')), os.path.join(dir_path, name)) for name in os.listdir(
dir_path) if is_submission_dir(os.path.join(dir_path, name))]
return student_pairs
def is_assign_archive(path):
result = True result = True
if not os.path.isfile(path): if not os.path.isfile(path):
result = False result = False
@@ -167,9 +279,16 @@ def is_assign_archive(path):
return result return result
def _make_names_pairs(dir_path):
student_pairs = [(unidecode(name[:name.find('_')].lower().replace(
' ', '_')), os.path.join(dir_path, name)) for name in os.listdir(
dir_path) if is_submission_dir(os.path.join(dir_path, name))]
return student_pairs
def _identify_archives(path): def _identify_archives(path):
asg_archives = [ asg_archives = [
f"{path}/{file}" for file in os.listdir(path) if is_assign_archive(f"{path}/{file}")] f"{path}/{file}" for file in os.listdir(path) if is_assignment_archive(f"{path}/{file}")]
numbers = [0]*len(asg_archives) numbers = [0]*len(asg_archives)
basenames = [""]*len(asg_archives) basenames = [""]*len(asg_archives)
for idx, asg in enumerate(asg_archives): for idx, asg in enumerate(asg_archives):
@@ -191,7 +310,6 @@ def cli():
@click.command() @click.command()
@click.argument('path', default='.', type=click.Path(resolve_path=True)) @click.argument('path', default='.', type=click.Path(resolve_path=True))
def init(path): def init(path):
os.makedirs(f"{path}/raw", exist_ok=True)
os.makedirs(f"{path}/archives", exist_ok=True) os.makedirs(f"{path}/archives", exist_ok=True)
os.makedirs(f"{path}/submissions", exist_ok=True) os.makedirs(f"{path}/submissions", exist_ok=True)
os.makedirs(f"{path}/roots", exist_ok=True) os.makedirs(f"{path}/roots", exist_ok=True)
@@ -202,11 +320,6 @@ def init(path):
click.echo(f"{SUCCESS_BOX} :{os.path.basename(path)} inited!") click.echo(f"{SUCCESS_BOX} :{os.path.basename(path)} inited!")
@click.group()
def archives():
pass
@click.command("list") @click.command("list")
@click.argument('path', default='./archives', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True)) @click.argument('path', default='./archives', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True))
def list_assign(path): def list_assign(path):
@@ -215,12 +328,12 @@ def list_assign(path):
click.echo(f"[{number}] -> {archive}") click.echo(f"[{number}] -> {archive}")
@click.command("unpack") @click.command()
@click.argument('number', required=True, type=click.INT) @click.argument('number', required=True, type=click.INT)
@click.option('-p', '--path', default='./archives', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True)) @click.option('-p', '--path', default='./archives', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True))
@click.option('-o', '--output-path', default='./submissions', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True)) @click.option('-o', '--output-path', default='./submissions', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True))
@click.option('--yes', is_flag=True, default=False, help='Skip confirmation prompt.') @click.option('--yes', is_flag=True, default=False, help='Skip confirmation prompt.')
def unpack_assign(path, number, yes, output_path): def unpack(path, number, yes, output_path):
pairs = _identify_archives(path) pairs = _identify_archives(path)
idx = -1 idx = -1
if len(pairs) > 0: if len(pairs) > 0:
@@ -265,22 +378,19 @@ def unpack_assign(path, number, yes, output_path):
shutil.rmtree(tmp_dir) shutil.rmtree(tmp_dir)
@click.group()
def roots():
pass
@click.command() @click.command()
@click.argument('number', required=True, type=click.INT) @click.argument('number', required=True, type=click.INT)
@click.option('-p', '--path', default='./submissions', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True)) @click.option('-p', '--path', default='./submissions', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True))
@click.option('-o', '--output-path', default='./roots', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True)) @click.option('-o', '--output-path', default='./roots', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True))
@click.option('--yes', is_flag=True, default=False, help='Skip confirmation prompt.') @click.option('--yes', is_flag=True, default=False, help='Skip confirmation prompt.')
def collect(number, path, output_path, yes): def collect(number, path, output_path, yes):
students = os.listdir(path) students = os.listdir(path)
src_paths = [ src_paths = [
f"{path}/{student}/assignment_{number}/" for student in students] f"{path}/{student}/assignment_{number}/" for student in students]
dst_paths = [ dst_paths = [
f"{output_path}/{student}/assignment_{number}/" for student in students] f"{output_path}/{student}/assignment_{number}/" for student in students]
for src_path, dst_path, student in zip(src_paths, dst_paths, students): for src_path, dst_path, student in zip(src_paths, dst_paths, students):
if not os.path.exists(src_path) or not os.path.isdir(src_path): if not os.path.exists(src_path) or not os.path.isdir(src_path):
click.echo( click.echo(
@@ -306,58 +416,120 @@ def collect(number, path, output_path, yes):
handle.write(f"origin:\n\t{os.path.relpath(root)}\n") handle.write(f"origin:\n\t{os.path.relpath(root)}\n")
def build_root(root_path: str, build_cmd: str, timeout: int = 10):
cwd = os.getcwd()
os.chdir(root_path)
run_result = True
exception_summary = ""
result = CompletedProcess('', -1, '', '')
try:
result = subprocess.run(shlex.split(build_cmd),
timeout=10, text=True, capture_output=True)
except Exception as e:
run_result = False
exception_summary = str(e)
os.chdir(cwd)
log = run_log(test_type.build, build_cmd, os.path.abspath(root_path), run_result,
exception_summary, result.returncode, result.stdout, result.stderr)
return log
def run_root(root_path: str, build_cmd: str, timeout: int = 10, exe: Optional[str] = None):
pass
# cwd = os.getcwd()
# os.chdir(root_path)
# run_result = True
# exception_summary = ""
# result = CompletedProcess('', -1, '', '')
# try:
# result = subprocess.run(shlex.split(build_cmd),
# timeout=10, text=True, capture_output=True)
# except Exception as e:
# run_result = False
# exception_summary = str(e)
#
# os.chdir(cwd)
# log = run_log(test_type.build, build_cmd, os.path.abspath(root_path), run_result,
# exception_summary, result.returncode, result.stdout, result.stderr)
# return log
def clean_root(root_path: str, clean: str, timeout: int = 10):
return build_root(root_path, clean, timeout=timeout)
@click.command() @click.command()
@click.argument('number', required=True, type=click.INT) @click.argument('number', required=True, type=click.INT)
@click.option('-p', '--path', default='./submissions', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True)) @click.option('-p', '--path', default='./roots',
@click.option('-o', '--output-path', default='./roots', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True)) type=click.Path(exists=True, file_okay=False,
@click.option('--yes', is_flag=True, default=False, help='Skip confirmation prompt.') dir_okay=True, resolve_path=True),
def collect(number, path, output_path, yes): help='Path to directory conaining student roots in format (student)/assignment_[NUMBER]/root_(...).')
@click.option('-s', '--student', default='', type=StudentVarType(),
help='Specify wich student to run the test for. If omitted test is run for all found students.')
@click.option('-c', '--command', default='make', type=click.STRING,
help='Overrides command that is run to build roots. The default command is `make`.')
@click.option('--clean-command', default='make distclean', type=click.STRING,
help='Overrides command that is run to build roots. The default command is `make distclean`.')
@click.option('-t', '--timeout', default=10, type=click.INT,
help='Sets timeout of both clean and build commands, defaults to 10s.')
def compile(number, path, command, student, timeout, clean_command):
current_wd = os.getcwd()
students = os.listdir(path) students = os.listdir(path)
src_paths = [ if student and student not in students:
f"{path}/{student}/assignment_{number}/" for student in students] click.echo(
dst_paths = [ f"{ERROR_BOX}: No student {student} in {os.path.relpath(path)}.")
f"{output_path}/{student}/assignment_{number}/" for student in students] return
for src_path, dst_path, student in zip(src_paths, dst_paths, students): if student:
if not os.path.exists(src_path) or not os.path.isdir(src_path): students = [student]
asg_dirs = [os.path.join(
path, student, f"assignment_{number}") for student in students]
for asg_dir, student in zip(asg_dirs, students):
if not os.path.exists(asg_dir) or not os.path.isdir(asg_dir):
click.echo( click.echo(
f"{WARNING_BOX}: No assignment_{number} dir found for {student}, skipping...") f"{WARNING_BOX}: No {student}/assignment_{number} was found, skipping...")
continue continue
if not yes and os.path.exists(dst_path) and not click.confirm(f"I will override {os.path.relpath(dst_path)}, confirm?"): local_roots = [os.path.join(asg_dir, dir)
click.echo(f"{WARNING_BOX}: Skipping {student}...") for dir in os.listdir(asg_dir) if 'root_' in dir and os.path.isdir(os.path.join(asg_dir, dir))]
continue if len(local_roots) == 0:
if os.path.exists(dst_path):
shutil.rmtree(dst_path)
roots = list()
for dir_root, _, files in os.walk(src_path):
if "Makefile" in files and os.path.isdir(f"{dir_root}/src"):
roots.append(dir_root)
if len(roots) == 0:
click.echo( click.echo(
f"{WARNING_BOX}: No root could be identified for {student}/assignment_{number}") f"{WARNING_BOX}: No roots could be identified for {student}/assignment_{number}")
continue continue
for idx, root in enumerate(roots): click.echo(f"[{student}]:")
root_dst = os.path.join(dst_path, f"root_{idx}") for root in local_roots:
shutil.copytree(root, root_dst) os.chdir(root)
with open(os.path.join(root_dst, ".origin"), 'w') as handle: clean_log = clean_root(root, clean_command)
handle.write(f"origin:\n\t{os.path.relpath(root)}\n") if not clean_log.run_success or not clean_log.cmd_return_code == 0:
click.echo(
f"{indent_text('[CLEAN]:' + clean_log.oneline(type_as_prefix=False), 4)}")
@click.group() continue
def test(): log = build_root(root, command, timeout=timeout)
pass clean_log = clean_root(root, clean_command)
if not clean_log.run_success or not clean_log.cmd_return_code == 0:
click.echo(
f"{indent_text('[CLEAN]:' + clean_log.oneline(type_as_prefix=False), 4)}")
continue
click.echo(f"{indent_text(log.oneline(type_as_prefix=False), 4)}")
os.chdir(current_wd)
@click.command() @click.command()
@click.argument('number', required=True, type=click.INT) @click.argument('number', required=True, type=click.INT)
@click.option('-p', '--path', default='./roots', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True)) @click.option('-p', '--path', default='./roots', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True))
@click.option('-c', '--command', default='make', type=click.STRING) @click.option('-c', '--compile-command', default='make', type=click.STRING)
@click.option('-r', '--run-command', default='mpirun', type=click.STRING)
@click.option('-n', '--nproc-flag', default="-n 5", type=click.STRING)
@click.option('-v', '--verbose', is_flag=True, default=False) @click.option('-v', '--verbose', is_flag=True, default=False)
def compile(number, path, command, verbose): @click.option('-s', '--solution-path', default='./solutions', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True))
def correctness(number, solution_path, path, compile_command, run_command, nproc_flag, verbose):
students = os.listdir(path) students = os.listdir(path)
asg_dirs = [os.path.join( asg_dirs = [os.path.join(
path, student, f"assignment_{number}") for student in students] path, student, f"assignment_{number}") for student in students]
for asg_dir, student in zip(asg_dirs, students): for asg_dir, student in zip(asg_dirs, students):
current_wd = os.getcwd()
if not os.path.exists(asg_dir) or not os.path.isdir(asg_dir): if not os.path.exists(asg_dir) or not os.path.isdir(asg_dir):
click.echo( click.echo(
f"{WARNING_BOX}: No roots could be identified for {student}/assignment_{number}") f"{WARNING_BOX}: No roots could be identified for {student}/assignment_{number}")
@@ -368,19 +540,24 @@ def compile(number, path, command, verbose):
click.echo( click.echo(
f"{WARNING_BOX}: No roots could be identified for {student}/assignment_{number}") f"{WARNING_BOX}: No roots could be identified for {student}/assignment_{number}")
continue continue
current_wd = os.getcwd()
if verbose: if verbose:
click.echo(f"Testing compile for {student}:assignment_{number}") click.echo(f"Testing compile for {student}:assignment_{number}")
else: else:
click.echo(f"({student}:assignment_{number}):") click.echo(f"({student}:assignment_{number}):")
for root in local_roots: for root in local_roots:
os.chdir(current_wd)
if verbose: if verbose:
click.echo(f"\t[{os.path.basename(root)}]: ", nl=False) click.echo(f"\t[{os.path.basename(root)}]: ", nl=False)
click.echo(f"entering {os.path.relpath(root)}") click.echo(f"entering {os.path.relpath(root)}")
os.chdir(root) os.chdir(root)
result = subprocess.run( result = subprocess.run(
shlex.split(command), capture_output=True, text=True) shlex.split("make distclean"), capture_output=True, text=True)
result = subprocess.run(
shlex.split(compile_command), capture_output=True, text=True)
if verbose: if verbose:
if (result.returncode != 0): if (result.returncode != 0):
click.echo( click.echo(
@@ -400,7 +577,89 @@ def compile(number, path, command, verbose):
click.echo( click.echo(
f"\t\033[0;92m[{os.path.basename(root)}]\033[0m: SUCCESS") f"\t\033[0;92m[{os.path.basename(root)}]\033[0m: SUCCESS")
os.chdir(current_wd) executables = [os.path.join(root, file)
for file in os.listdir(root) if 'exe-' in file and os.path.isfile(os.path.join(root, file))]
sol_as_path = os.path.join(solution_path, f"assignment_{number}")
config_path = os.path.join(sol_as_path, "test_config.toml")
if not os.path.isdir(sol_as_path):
click.echo(
f"{ERROR_BOX}: No reference solution for assignment_{number} found.", err=True)
return
if not os.path.exists(config_path) or not os.path.isfile(config_path):
click.echo(
f"{ERROR_BOX}: No test configuration for assignment_{number} found.", err=True)
return
config = dict()
with open(config_path, 'rb') as handle:
config = tomllib.load(handle)
if not 'ref_solution' in config["correctness"]:
click.echo(
f"{ERROR_BOX}: No function dissect config found for assignment_{number}.", err=True)
return
ref_solution = config['correctness']['ref_solution']
ref_solution_path = os.path.join(sol_as_path, ref_solution)
if not 'ref_param' in config["correctness"]:
click.echo(
f"{ERROR_BOX}: No function dissect config found for assignment_{number}.", err=True)
return
ref_param = config['correctness']['ref_param']
ref_param_path = os.path.join(sol_as_path, ref_param)
local_run_command = " ".join([run_command, nproc_flag,
executables[0], ref_param_path])
local_solution_path = os.path.join(root, 'p.dat')
if os.path.exists(local_solution_path):
os.remove(local_solution_path)
try:
result = subprocess.run(
shlex.split(local_run_command), capture_output=True, text=True, timeout=10)
except subprocess.TimeoutExpired:
click.echo("Run Timeout")
result.returncode = 1
if verbose:
if (result.returncode != 0):
click.echo(
f"\t\033[0;91m[{os.path.basename(root)}]\033[0m: ", nl=False)
continue
else:
click.echo(
f"\t\033[0;92m[{os.path.basename(root)}]\033[0m: ", nl=False)
click.echo(f"stdout: {result.stdout}")
click.echo(f"stderr: {result.stderr}")
else:
if (result.returncode != 0):
click.echo(
f"\t\033[0;91m[{os.path.basename(root)}]\033[0m: ", nl=False)
click.echo(f"stderr: {result.stderr[:69]:69s}...")
continue
else:
click.echo(
f"\t\033[0;92m[{os.path.basename(root)}]\033[0m: SUCCESS")
if not os.path.exists(local_solution_path):
data_files = [file for file in os.listdir(
root) if '.dat' in file]
print(data_files)
continue
local_solution_data = np.loadtxt(local_solution_path, usecols=True)
ref_solution_data = np.loadtxt(ref_solution_path, usecols=True)
if local_solution_data.shape != ref_solution_data.shape:
click.echo("Correctness Test Failed, Dimenstions Not Matching")
continue
elif not np.allclose(local_solution_data, ref_solution_data, rtol=1e-1):
click.echo("Correctness Test Failed: Data Not Matching")
continue
else:
click.echo("Correctness Test Passed")
@click.command() @click.command()
@@ -420,8 +679,8 @@ def dissect(number, solution_path, path):
click.echo( click.echo(
f"{ERROR_BOX}: No test configuration for assignment_{number} found.", err=True) f"{ERROR_BOX}: No test configuration for assignment_{number} found.", err=True)
return return
config = dict()
config = dict()
with open(config_path, 'rb') as handle: with open(config_path, 'rb') as handle:
config = tomllib.load(handle) config = tomllib.load(handle)
@@ -548,27 +807,50 @@ def dissect(number, solution_path, path):
ref_sol_data = np.loadtxt(ref_sol_path) ref_sol_data = np.loadtxt(ref_sol_path)
usr_sol_data = np.loadtxt(usr_sol_path) usr_sol_data = np.loadtxt(usr_sol_path)
if np.allclose(ref_sol_data, usr_sol_data) == 0: if np.allclose(ref_sol_data, usr_sol_data, rtol=1e-1) == 0:
click.echo( click.echo(
f"\t[{os.path.basename(root)}]:[CORRECT]:[{function}]:{SUCCESS_BOX}") f"\t[{os.path.basename(root)}]:[CORRECT]:[{function}]:{SUCCESS_BOX}")
else: else:
click.echo( click.echo(
f"\t[{os.path.basename(root)}]:[CORRECT]:[{function}]:{ERROR_BOX}") f"\t[{os.path.basename(root)}]:[CORRECT]:[{function}]:{ERROR_BOX}")
click.echo(f"\t\tstderr: {result.stderr[:61]:61s}...")
continue continue
os.chdir(current_wd) os.chdir(current_wd)
# @click.command()
# @click.option('-p', '--path', default='./roots', type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True))
# @click.option('-s', '--student', default='', type=StudentVarType())
# def dev(path, student):
# pass
@click.group()
def archives():
pass
@click.group()
def roots():
pass
@click.group()
def test():
pass
cli.add_command(init) cli.add_command(init)
cli.add_command(dev)
cli.add_command(archives) cli.add_command(archives)
cli.add_command(roots) cli.add_command(roots)
cli.add_command(test) cli.add_command(test)
archives.add_command(list_assign) archives.add_command(list_assign)
archives.add_command(unpack_assign) archives.add_command(unpack)
roots.add_command(collect) roots.add_command(collect)
test.add_command(correctness)
test.add_command(compile) test.add_command(compile)
test.add_command(dissect) test.add_command(dissect)