Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

otk: use file handles instead of paths when reading files #107

Merged
merged 3 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 6 additions & 13 deletions src/otk/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,23 +48,16 @@ def root() -> int:


def _process(arguments: argparse.Namespace, dry_run: bool) -> int:
src = pathlib.Path("/dev/stdin" if arguments.input is None else arguments.input)
dst = pathlib.Path("/dev/stdout" if arguments.output is None else arguments.output) if not dry_run else None

if not src.exists():
log.fatal("INPUT path %r does not exist", str(src))
return 1

if not dry_run and not dst.parent.exists():
log.fatal("OUTPUT path %r does not exist", str(dst))
return 1
src = sys.stdin if arguments.input is None else open(arguments.input)
if not dry_run:
dst = sys.stdout if arguments.output is None else open(arguments.output, "w")

# the working directory is either the current directory for stdin or the
# directory the omnifest is located in
cwd = pathlib.Path.cwd() if arguments.input is None else src.parent
cwd = pathlib.Path.cwd() if arguments.input is None else pathlib.Path(src.name).parent

ctx = CommonContext(cwd)
doc = Omnifest.from_yaml_path(src)
doc = Omnifest.from_yaml_file(src)

# let's peek at the tree to validate some things necessary for compilation
# we might want to move this into a separate place once this gets shared
Expand Down Expand Up @@ -112,7 +105,7 @@ def _process(arguments: argparse.Namespace, dry_run: bool) -> int:

# and then output by writing to the output
if not dry_run:
dst.write_text(target_registry.get(kind, CommonTarget)().as_string(spec, tree))
dst.write(target_registry.get(kind, CommonTarget)().as_string(spec, tree))

return 0

Expand Down
4 changes: 0 additions & 4 deletions src/otk/directive.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,6 @@ def include(ctx: Context, tree: Any) -> Any:
# TODO instead
log.info("otk.include=%s", str(file))

if not file.exists():
# TODO, better error type
raise Exception("otk.include nonexistent file %r" % file)

# TODO
return yaml.safe_load(file.read_text())

Expand Down
15 changes: 4 additions & 11 deletions src/otk/document.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import io
import logging
import pathlib
from copy import deepcopy
from typing import Any

Expand Down Expand Up @@ -27,17 +27,10 @@ def from_yaml_bytes(cls, text: bytes) -> "Omnifest":
return cls(deserialized_data)

@classmethod
def from_yaml_path(cls, path: pathlib.Path) -> "Omnifest":
def from_yaml_file(cls, file: io.IOBase) -> "Omnifest":
"""Read a YAML file into an Omnifest instance."""

log.debug("reading yaml from path %r", str(path))

# This is an invariant that should be handled at the calling side of
# this function
assert path.exists(), "path to exist"

with path.open("rb") as file:
return cls.from_yaml_bytes(file.read())
log.debug("reading yaml from path %r", file.name)
return cls.from_yaml_bytes(file.read())

@classmethod
def read(cls, deserialized_data: Any) -> dict[str, Any]:
Expand Down
77 changes: 33 additions & 44 deletions test/test_compile.py
Original file line number Diff line number Diff line change
@@ -1,56 +1,45 @@
import argparse
import os.path
import pytest
import os


from otk.command import compile


@pytest.mark.parametrize(
"arguments, input_data, output_data, sys_exit_code, log_message",
[
# "GENERATE" is a placeholder to append "tmp_path" from pytest
(
argparse.Namespace(input="GENERATE", output="GENERATE", target=None),
"""otk.version: 1
otk.target.osbuild.qcow2: { test: 1 }
""",
"""{
fake_otk_yaml = """
otk.version: 1
otk.target.osbuild.qcow2:
test: 1
"""

expected_otk_tree = """\
{
"test": 1,
"version": "2",
"sources": {}
}""",
0,
None,
),
],
)
def test_compile(
tmp_path,
caplog,
capsys,
monkeypatch,
arguments,
input_data,
output_data,
sys_exit_code,
log_message,
):
if arguments.input == "GENERATE":
input_filename = "input.yaml"
arguments.input = os.path.join(tmp_path, input_filename)
with open(arguments.input, "w") as f:
f.write(input_data)

if arguments.output == "GENERATE":
# fix path so we only write to tmp_path
arguments.output = os.path.join(tmp_path, "output.yaml")
}"""


def test_compile_integration_file(tmp_path):
input_path = tmp_path / "input.txt"
input_path.write_text(fake_otk_yaml)
output_path = tmp_path / "output.txt"

arguments = argparse.Namespace(input=input_path, output=output_path, target="osbuild")
ret = compile(arguments)
assert ret == sys_exit_code
assert ret == 0

if log_message:
assert log_message in caplog.text
assert output_path.exists()
assert output_path.read_text() == expected_otk_tree


def test_compile_integration_stdin(capsys, monkeypatch):
mocked_stdin = os.memfd_create("<fake-stdin>")
os.write(mocked_stdin, fake_otk_yaml.encode("utf8"))
os.lseek(mocked_stdin, 0, 0)
monkeypatch.setattr("sys.stdin", os.fdopen(mocked_stdin))

arguments = argparse.Namespace(input=None, output=None, target="osbuild")
ret = compile(arguments)
assert ret == 0

assert os.path.exists(arguments.output)
with open(arguments.output) as f:
assert f.readlines() == output_data.splitlines(True)
assert capsys.readouterr().out == expected_otk_tree
7 changes: 7 additions & 0 deletions test/test_directive.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ def test_include_unhappy():
include(ctx, 1)


def test_include_file_missing_errors(tmp_path):
ctx = CommonContext()

with pytest.raises(FileNotFoundError):
include(ctx, "non-existing.yml")


def test_op_seq_join():
ctx = CommonContext()

Expand Down
41 changes: 29 additions & 12 deletions test/test_document.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import textwrap

import pytest

from otk.document import Omnifest
Expand Down Expand Up @@ -27,15 +29,7 @@ def test_omnifest_ensure_no_keys():
assert Omnifest.ensure({})


def test_omnifest_from_yaml_bytes():
# Happy
Omnifest.from_yaml_bytes(
"""
otk.version: 1
otk.target.osbuild: {}
"""
)

def test_omnifest_from_yaml_bytes_sad():
# A bunch of YAMLs that don't contain a top level map
with pytest.raises(ParseTypeError):
Omnifest.from_yaml_bytes(
Expand Down Expand Up @@ -83,6 +77,29 @@ def test_omnifest_from_yaml_bytes():
)


def test_omnifest_from_yaml_path():
# TODO
...
TEST_CASES = [
(
textwrap.dedent("""
otk.version: 1
otk.target.osbuild:
some: key
"""),
{"otk.target.osbuild": {"some": "key"}, "otk.version": 1},
)
]


@pytest.mark.parametrize("fake_input,expected_tree", TEST_CASES)
def test_omnifest_from_yaml_bytes_happy(fake_input, expected_tree):
omi = Omnifest.from_yaml_bytes(fake_input)
assert omi.tree == expected_tree


@pytest.mark.parametrize("fake_input,expected_tree", TEST_CASES)
def test_omnifest_from_yaml_file_happy(tmp_path, fake_input, expected_tree):
fake_yaml_path = tmp_path / "test.otk"
fake_yaml_path.write_text(fake_input)

with fake_yaml_path.open() as fp:
omi = Omnifest.from_yaml_file(fp)
assert omi.tree == expected_tree
Loading