Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions tests/core/pyspec/eth2spec/gen_helpers/gen_base/dumper.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from pathlib import Path

from eth_utils import encode_hex
from ruamel.yaml import YAML
from snappy import compress

from eth2spec.test import context

from .gen_typing import TestCase


def get_default_yaml():
yaml = YAML(pure=True)
Expand Down Expand Up @@ -48,31 +48,31 @@ def cfg_represent_quoted_str(self, data):
class Dumper:
"""Helper for dumping test case outputs (cfg, data, meta, ssz)."""

def __init__(self, default_yaml: YAML = None, cfg_yaml: YAML = None):
def __init__(self, default_yaml: YAML | None = None, cfg_yaml: YAML | None = None):
self.default_yaml = default_yaml or get_default_yaml()
self.cfg_yaml = cfg_yaml or get_cfg_yaml()

def dump_meta(self, test_case: TestCase, meta: dict) -> None:
def dump_meta(self, dir: Path, meta: dict) -> None:
if not meta:
return
self._dump_yaml(test_case, "meta", meta, self.default_yaml)
self._dump_yaml(dir, "meta", meta, self.default_yaml)

def dump_cfg(self, test_case: TestCase, name: str, data: any) -> None:
self._dump_yaml(test_case, name, data, self.cfg_yaml)
def dump_cfg(self, dir: Path, name: str, data: any) -> None:
self._dump_yaml(dir, name, data, self.cfg_yaml)

def dump_data(self, test_case: TestCase, name: str, data: any) -> None:
self._dump_yaml(test_case, name, data, self.default_yaml)
def dump_data(self, dir: Path, name: str, data: any) -> None:
self._dump_yaml(dir, name, data, self.default_yaml)

def dump_ssz(self, test_case: TestCase, name: str, data: bytes) -> None:
def dump_ssz(self, dir: Path, name: str, data: bytes) -> None:
"""Compress and write SSZ data for test case."""
path = test_case.dir / f"{name}.ssz_snappy"
path = dir / f"{name}.ssz_snappy"
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("wb") as f:
f.write(compress(data))

def _dump_yaml(self, test_case: TestCase, name: str, data: any, yaml_encoder: YAML) -> None:
def _dump_yaml(self, dir: Path, name: str, data: any, yaml_encoder: YAML) -> None:
"""Helper to write YAML files for test case."""
path = test_case.dir / f"{name}.yaml"
path = dir / f"{name}.yaml"
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w") as f:
yaml_encoder.dump(data, f)
5 changes: 3 additions & 2 deletions tests/core/pyspec/eth2spec/gen_helpers/gen_base/gen_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

# Flag that the runner does NOT run test via pytest
context.is_pytest = False
context.is_generator = True


def get_shared_prefix(test_cases, min_segments=3):
Expand Down Expand Up @@ -104,10 +105,10 @@ def execute_test(test_case: TestCase, dumper: Dumper):

for name, kind, data in outputs:
method = getattr(dumper, f"dump_{kind}")
method(test_case, name, data)
method(test_case.dir, name, data)

if meta:
dumper.dump_meta(test_case, meta)
dumper.dump_meta(test_case.dir, meta)


def run_generator(input_test_cases: Iterable[TestCase], args=None):
Expand Down
3 changes: 3 additions & 0 deletions tests/core/pyspec/eth2spec/test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,6 @@ def bls_type(request):
bls_utils.use_fastest()
else:
raise Exception(f"unrecognized bls type: {bls_type}")


pytest_plugins = ["tests.infra.pytest_plugins.yield_generator"]
11 changes: 8 additions & 3 deletions tests/core/pyspec/eth2spec/test/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from lru import LRU

from eth2spec.utils import bls
from tests.infra.pytest_plugins.yield_generator import MultiPhaseResult

from .exceptions import SkippedTest
from .helpers.constants import (
Expand Down Expand Up @@ -302,7 +303,7 @@ def entry(*args, **kw):


is_pytest = True

is_generator = False

def dump_skipping_message(reason: str) -> None:
message = f"[Skipped test] {reason}"
Expand Down Expand Up @@ -582,10 +583,14 @@ def _run_test_case_with_phases(fn, phases, other_phases, kw, args, is_fork_trans

# Return is ignored whenever multiple phases are ran.
# This return is for test generators to emit python generators (yielding test vector outputs)

results: MultiPhaseResult = {}

for phase in run_phases:
ret = fn(spec=targets[phase], phases=phase_dir, *args, **kw)
results[phase] = ret

return ret
return results


def with_phases(phases, other_phases=None):
Expand Down Expand Up @@ -758,7 +763,7 @@ def wrapper(*args, spec: Spec, **kw):
def only_generator(reason):
def _decorator(inner):
def _wrapper(*args, **kwargs):
if is_pytest:
if not is_generator and is_pytest:
dump_skipping_message(reason)
return None
return inner(*args, **kwargs)
Expand Down
1 change: 1 addition & 0 deletions tests/core/pyspec/eth2spec/test/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ def generator_mode():
# Just complete the function, ignore all yielded data,
# we are not using it (or processing it, i.e. nearly zero efficiency loss)
# Pytest does not support yielded data in the outer function, so we need to wrap it like this.
return generator_mode()
for _ in fn(*args, **kw):
continue
return None
Expand Down
224 changes: 224 additions & 0 deletions tests/infra/pytest_plugins/yield_generator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
from __future__ import annotations

from collections.abc import Iterable
from typing import Mapping, Any

import _pytest
import pytest

from eth2spec.test.helpers.typing import SpecForkName
from eth2spec.gen_helpers.gen_base.dumper import Dumper
from eth2spec.test import context
from tests.infra.manifest import Manifest
from pathlib import Path

RUNNERS = ["kzg", "epoch_processing"]

MultiPhaseResult = dict[SpecForkName, list]


class SpecTestFunction(pytest.Function):
"""Custom pytest.Function subclass that captures test results.

This class extends pytest.Function to store test vectors generated by
spec tests, supporting both single-phase and multi-phase test results.

Attributes:
result: Test result data, either a list (single-phase) or a dict
mapping fork names to lists (multi-phase)
"""

result: MultiPhaseResult | list | None = None

@classmethod
def from_function(cls, f: pytest.Function) -> SpecTestFunction:
"""
Create a SpecTestFunction from an existing pytest.Function.
"""
self = cls.from_parent(
parent=f.parent,
name=f.name,
callspec=getattr(f, "callspec", None),
callobj=getattr(f, "_obj", _pytest.compat.NOTSET),
keywords=f.keywords,
fixtureinfo=getattr(f, "_fixtureinfo", None),
originalname=f.originalname,
)
self.manifest_guess()

return self

def manifest_guess(self) -> None:
print("guessing manifest for:", self.name)
path = self.parent.path
str_path = str(path)
filename = path.name

possible_runners = [runner for runner in RUNNERS if runner in str_path]
if len(possible_runners) == 1:
runner_name = possible_runners[0]
else:
return

handler_name = filename.replace("test_", "").replace(".py", "")

suite_name = getattr(self.obj, "suite_name", "pyspec_tests")

case_name = self.name
if case_name.startswith("test_"):
case_name = case_name[5:]

manifest = Manifest(
runner_name=runner_name,
handler_name=handler_name,
case_name=case_name,
suite_name=suite_name,
)

if hasattr(self.obj, "manifest") and self.obj.manifest is not None:
manifest = self.obj.manifest.override(manifest)

self.obj.manifest = manifest

def runtest(self):
super().runtest()

def get_manifest(self) -> Manifest | None:
if not hasattr(self.obj, "manifest") or self.obj.manifest is None:
return None
return self.obj.manifest

def get_result(self) -> MultiPhaseResult | list | None:
return self.result


class YieldGeneratorPlugin:
output_dir: str = "generated-tests"
dumper: Dumper | None = None

def __init__(self, config):
self.config = config

def register(self):
self.config.pluginmanager.register(self, "yield_generator")

@pytest.hookimpl(tryfirst=True)
def pytest_pyfunc_call(self, pyfuncitem: pytest.Function):
if not isinstance(pyfuncitem, SpecTestFunction):
return False

pyfuncitem.result = None

testfunction = pyfuncitem.obj
if _pytest.compat.is_async_function(testfunction):
_pytest.compat.async_fail(pyfuncitem.nodeid)
funcargs = pyfuncitem.funcargs
testargs = {arg: funcargs[arg] for arg in pyfuncitem._fixtureinfo.argnames}
result = testfunction(**testargs)
if hasattr(result, "__await__") or hasattr(result, "__aiter__"):
_pytest.compat.async_fail(pyfuncitem.nodeid)
elif result is not None:
if not isinstance(result, dict) and isinstance(result, Iterable):
pyfuncitem.result = list(result)
else:
pyfuncitem.result = result
return True

@pytest.hookimpl(hookwrapper=True)
def pytest_runtest_protocol(self, item, nextitem):
print(f"\nRunning test: {item.name}")

yield

if self.config.getoption("--pytest-reftests") is False:
return

if not isinstance(item, SpecTestFunction):
return

manifest = item.get_manifest()
result = item.get_result()

if manifest is not None:
print(f"\nManifest from {item.name}:")
print(manifest)
else:
print(f"\nNo manifest found for {item.name}")

if manifest is not None and result is None:
print(f"\nWarning: manifest but vector not created for {item.name}")

if result is not None:
if isinstance(result, dict) and isinstance(list(result.keys())[0], str):
print(f"\nMulti-phase test result for {item.name}")
else:
print(f"\nSingle-phase test result for {item.name}")

if manifest is not None and result is not None:
self.generate_test_vector(manifest, result)

def pytest_collection_modifyitems(self, config, items):
for i, item in enumerate(items):
if isinstance(item, pytest.Function):
# Replace with custom item
items[i] = SpecTestFunction.from_function(item)

def pytest_addoption(self, parser):
"""Add custom command-line options"""
parser.addoption(
"--pytest-reftests",
action="store_true",
default=True,
help="Vector tests generation"
)

def pytest_configure(self, config):
if config.getoption("--pytest-reftests"):
context.is_generator = True

def generate_test_vector(self, manifest: Manifest, result: MultiPhaseResult | list) -> None:
if isinstance(result, dict):
for fork_name, phase_result in result.items():
self.generate_test_vector_phase(manifest, phase_result, fork_name)

def generate_test_vector_phase(self, manifest: Manifest, phase_result: list, fork_name: SpecForkName) -> None:
dumper = self.get_dumper()

manifest = manifest.override(Manifest(fork_name=fork_name, preset_name="mainnet"))
assert manifest.is_complete(), f"Manifest must be complete to generate test vector for {manifest}"

dir = (
Path(self.output_dir)
/ manifest.preset_name # type: ignore
/ manifest.fork_name
/ manifest.runner_name
/ manifest.handler_name
/ manifest.suite_name
/ manifest.case_name
)

outputs: list[tuple[str, str, Any]] = []
meta: dict[str, Any] = {}

for name, kind, data in phase_result:
if kind == "meta":
meta[name] = data
else:
method = getattr(dumper, f"dump_{kind}", None)
if method is None:
raise ValueError(f"Unknown kind {kind!r}")
outputs.append((name, kind, data))

for name, kind, data in outputs:
method = getattr(dumper, f"dump_{kind}")
method(dir, name, data)

def get_dumper(self):
if self.dumper is None:
self.dumper = Dumper()
return self.dumper


def pytest_configure(config):
"""Register the plugin."""
YieldGeneratorPlugin(config).register()