Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support testlib SPJ for grader #168

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 107 additions & 93 deletions cyaron/compare.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
from __future__ import absolute_import, print_function
from .io import IO
from . import log
from cyaron.utils import *
from cyaron.consts import *
from cyaron.graders import CYaRonGraders
import subprocess

import multiprocessing
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor
from io import open
import os
from typing import List, Optional, Tuple, Union, cast

from cyaron.consts import *
from cyaron.graders import CYaRonGraders, GraderType3
from cyaron.utils import *

from . import log
from .io import IO


class CompareMismatch(ValueError):
Expand All @@ -22,25 +26,35 @@ def __str__(self):
return "In program: '{}'. {}".format(self.name, self.mismatch)


PrgoramType = Union[str, Tuple[str, ...], List[str]]


class Compare:

@staticmethod
def __compare_two(name, content, std, grader):
(result, info) = CYaRonGraders.invoke(grader, content, std)
def __compare_two(name: PrgoramType, content: str, std: str,
input_content: str, grader: Union[str, GraderType3]):
result, info = CYaRonGraders.invoke(grader, content, std,
input_content)
status = "Correct" if result else "!!!INCORRECT!!!"
info = info if info is not None else ""
log.debug("{}: {} {}".format(name, status, info))
if not result:
raise CompareMismatch(name, info)

@staticmethod
def __process_file(file):
def __process_output_file(file: Union[str, IO]):
if isinstance(file, IO):
if file.output_filename is None:
raise ValueError("IO object has no output file.")
file.flush_buffer()
file.output_file.seek(0)
return file.output_filename, file.output_file.read()
with open(file.output_filename,
"r",
newline="\n",
encoding='utf-8') as f:
return file.output_filename, f.read()
else:
with open(file, "r", newline="\n") as f:
with open(file, "r", newline="\n", encoding="utf-8") as f:
return file, f.read()

@staticmethod
Expand All @@ -64,7 +78,7 @@ def output(cls, *files, **kwargs):
("stop_on_incorrect", None),
),
)
std = kwargs["std"]
std: IO = kwargs["std"]
grader = kwargs["grader"]
max_workers = kwargs["max_workers"]
job_pool = kwargs["job_pool"]
Expand All @@ -75,8 +89,6 @@ def output(cls, *files, **kwargs):
if (max_workers is None or max_workers >= 0) and job_pool is None:
max_workers = cls.__normal_max_workers(max_workers)
try:
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=max_workers) as job_pool:
return cls.output(*files,
std=std,
Expand All @@ -87,52 +99,58 @@ def output(cls, *files, **kwargs):
pass

def get_std():
return cls.__process_file(std)[1]
return cls.__process_output_file(std)[1]

if job_pool is not None:
std = job_pool.submit(get_std).result()
std_answer = job_pool.submit(get_std).result()
else:
std = get_std()
std_answer = get_std()

with open(std.input_filename, "r", newline="\n",
encoding="utf-8") as input_file:
input_text = input_file.read()

def do(file):
(file_name, content) = cls.__process_file(file)
cls.__compare_two(file_name, content, std, grader)
(file_name, content) = cls.__process_output_file(file)
cls.__compare_two(file_name, content, std_answer, input_text,
grader)

if job_pool is not None:
job_pool.map(do, files)
else:
[x for x in map(do, files)]

@classmethod
def program(cls, *programs, **kwargs):
kwargs = unpack_kwargs(
"program",
kwargs,
(
"input",
("std", None),
("std_program", None),
("grader", DEFAULT_GRADER),
("max_workers", -1),
("job_pool", None),
("stop_on_incorrect", None),
),
)
input = kwargs["input"]
std = kwargs["std"]
std_program = kwargs["std_program"]
grader = kwargs["grader"]
max_workers = kwargs["max_workers"]
job_pool = kwargs["job_pool"]
if kwargs["stop_on_incorrect"] is not None:
def program(cls,
*programs: Union[PrgoramType, Tuple[PrgoramType, float]],
input: Union[IO, str],
std: Optional[Union[str, IO]] = None,
std_program: Optional[Union[str, Tuple[str, ...],
List[str]]] = None,
grader: Union[str, GraderType3] = DEFAULT_GRADER,
max_workers: Optional[int] = -1,
job_pool: Optional[ThreadPoolExecutor] = None,
stop_on_incorrect=None):
"""
Compare the output of the programs with the standard output.

Args:
programs: The programs to be compared.
input: The input file.
std: The standard output file.
std_program: The program that generates the standard output.
grader: The grader to be used.
max_workers: The maximum number of workers.
job_pool: The job pool.
stop_on_incorrect: Deprecated and has no effect.
"""
if stop_on_incorrect is not None:
log.warn(
"parameter stop_on_incorrect is deprecated and has no effect.")

if (max_workers is None or max_workers >= 0) and job_pool is None:
max_workers = cls.__normal_max_workers(max_workers)
try:
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=max_workers) as job_pool:
return cls.program(*programs,
input=input,
Expand All @@ -144,74 +162,70 @@ def program(cls, *programs, **kwargs):
except ImportError:
pass

if not isinstance(input, IO):
raise TypeError("expect {}, got {}".format(
type(IO).__name__,
type(input).__name__))
input.flush_buffer()
input.input_file.seek(0)
if isinstance(input, IO):
input.flush_buffer()

if std_program is not None:

def get_std():
with open(os.dup(input.input_file.fileno()), "r",
newline="\n") as input_file:
content = make_unicode(
subprocess.check_output(
std_program,
shell=(not list_like(std_program)),
stdin=input.input_file,
universal_newlines=True,
))
input_file.seek(0)
def get_std_from_std_program():
with open(input.input_filename
if isinstance(input, IO) else input,
"r",
newline="\n",
encoding="utf-8") as input_file:
content = subprocess.check_output(
std_program,
shell=(not list_like(std_program)),
stdin=input_file,
universal_newlines=True,
encoding="utf-8")
return content

if job_pool is not None:
std = job_pool.submit(get_std).result()
std = job_pool.submit(get_std_from_std_program).result()
else:
std = get_std()
std = get_std_from_std_program()
elif std is not None:

def get_std():
return cls.__process_file(std)[1]
def get_std_from_std_file():
return cls.__process_output_file(cast(Union[str, IO], std))[1]

if job_pool is not None:
std = job_pool.submit(get_std).result()
std = job_pool.submit(get_std_from_std_file).result()
else:
std = get_std()
std = get_std_from_std_file()
else:
raise TypeError(
"program() missing 1 required non-None keyword-only argument: 'std' or 'std_program'"
)

def do(program_name):
with open(input.input_filename if isinstance(input, IO) else input,
"r",
newline="\n",
encoding="utf-8") as input_file:
input_text = input_file.read()

def do(program_name: Union[PrgoramType, Tuple[PrgoramType, float]]):
timeout = None
if (list_like(program_name) and len(program_name) == 2
and int_like(program_name[-1])):
program_name, timeout = program_name
with open(os.dup(input.input_file.fileno()), "r",
newline="\n") as input_file:
if timeout is None:
content = make_unicode(
subprocess.check_output(
program_name,
shell=(not list_like(program_name)),
stdin=input_file,
universal_newlines=True,
))
else:
content = make_unicode(
subprocess.check_output(
program_name,
shell=(not list_like(program_name)),
stdin=input_file,
universal_newlines=True,
timeout=timeout,
))
input_file.seek(0)
cls.__compare_two(program_name, content, std, grader)
if isinstance(program_name, tuple) and len(program_name) == 2 and (
isinstance(program_name[1], float)
or isinstance(program_name[1], int)):
program_name, timeout = cast(Tuple[PrgoramType, float],
program_name)
else:
program_name = cast(PrgoramType, program_name)
content = subprocess.check_output(
list(program_name)
if isinstance(program_name, tuple) else program_name,
shell=(not list_like(program_name)),
input=input_text,
universal_newlines=True,
encoding="utf-8",
timeout=timeout)
cls.__compare_two(program_name, content, std, input_text, grader)

if job_pool is not None:
job_pool.map(do, programs)
else:
[x for x in map(do, programs)]
for program in programs:
do(program)
5 changes: 3 additions & 2 deletions cyaron/graders/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from .graderregistry import CYaRonGraders
from .graderregistry import CYaRonGraders, GraderType2, GraderType3

from .fulltext import fulltext
from .noipstyle import noipstyle
from .noipstyle import noipstyle
from .testlib_checker import TestlibChecker
48 changes: 42 additions & 6 deletions cyaron/graders/graderregistry.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,54 @@
from typing import Callable, Tuple, Dict, Union, Any

__all__ = ['CYaRonGraders', 'GraderType2', 'GraderType3']

GraderType2 = Callable[[str, str], Tuple[bool, Any]]
GraderType3 = Callable[[str, str, str], Tuple[bool, Any]]


class GraderRegistry:
_registry = dict()
"""A registry for grader functions."""
_registry: Dict[str, GraderType3] = {}

def grader2(self, name: str):
"""
This decorator registers a grader function under a specific name in the registry.

The function being decorated should accept exactly two parameters (excluding
the content input).
"""

def wrapper(func: GraderType2):
self._registry[name] = lambda content, std, _: func(content, std)
return func

return wrapper

grader = grader2

def grader(self, name):
def grader3(self, name: str):
"""
This decorator registers a grader function under a specific name in the registry.

The function being decorated should accept exactly three parameters.
"""

def wrapper(func):
def wrapper(func: GraderType3):
self._registry[name] = func
return func

return wrapper

def invoke(self, name, content, std):
return self._registry[name](content, std)
def invoke(self, grader: Union[str, GraderType3], content: str, std: str,
input_content: str):
"""Invoke a grader function by name or function object."""
if isinstance(grader, str):
return self._registry[grader](content, std, input_content)
else:
return grader(content, std, input_content)

def check(self, name):
def check(self, name: str):
"""Check if a grader is registered."""
return name in self._registry


Expand Down
Loading
Loading