From 7869e13b787cc9fd0c82bba07e677463f655b11f Mon Sep 17 00:00:00 2001 From: MasloMaslane Date: Sat, 5 Apr 2025 12:24:31 +0200 Subject: [PATCH 01/11] Added new fields in workflow --- example_workflows/example.json | 211 ++++++++++-------- src/sio3pack/workflow/execution/__init__.py | 1 - src/sio3pack/workflow/execution/channels.py | 41 ++++ .../workflow/execution/descriptors.py | 30 +++ .../workflow/execution/mount_namespace.py | 12 +- src/sio3pack/workflow/execution/pipe.py | 25 --- src/sio3pack/workflow/execution/process.py | 23 +- src/sio3pack/workflow/execution/stream.py | 150 +++++++++++++ src/sio3pack/workflow/tasks.py | 32 ++- 9 files changed, 379 insertions(+), 146 deletions(-) create mode 100644 src/sio3pack/workflow/execution/channels.py create mode 100644 src/sio3pack/workflow/execution/descriptors.py delete mode 100644 src/sio3pack/workflow/execution/pipe.py create mode 100644 src/sio3pack/workflow/execution/stream.py diff --git a/example_workflows/example.json b/example_workflows/example.json index 7c53951..55d82a6 100644 --- a/example_workflows/example.json +++ b/example_workflows/example.json @@ -1,93 +1,122 @@ { - "name": "Example workflow", - "external_objects" : [ - "example-object" - ], - "observable_objects" : [], - "observable_registers" : 1, - "registers" : 3, - "tasks" : [ - { - "name": "Example script", - "input_registers" : [ - 1 - ], - "output_registers" : [ - 0, - 2 - ], - "reactive" : true, - "script" : "", - "type" : "script" - }, - { - "name": "Example execution", - "exclusive" : true, - "filesystems" : [ - { - "image" : "example-image", - "path" : "", - "type" : "image" - }, - { - "type" : "empty" - }, - { - "handle" : "example-object", - "type" : "object" - } - ], - "hard_time_limit" : 2137, - "mount_namespaces" : [ - { - "mountpoints" : [ - { - "source" : 1, - "target" : "/exe", - "writable" : false - } - ], - "root" : 0 - } - ], - "output_register" : 1, - "pid_namespaces" : 1, - "pipes" : [ - { - "buffer_size" : 1048576, - "file_buffer_size" : 1073741824, - "limit" : 2147483648 - } - ], - "processes" : [ - { - "arguments" : [ - "/exe", - "--help" - ], - "environment" : [ - "TERM=xterm-256color" - ], - "image" : "/exe", - "mount_namespace" : 0, - "pid_namespace" : 0, - "resource_group" : 0, - "working_directory" : "/" - } - ], - "resource_groups" : [ - { - "cpu_usage_limit" : 21.37, - "instruction_limit" : 2137000000, - "memory_limit" : 2147483648, - "oom_terminate_all_tasks" : false, - "pid_limit" : 8, - "swap_limit" : 4, - "time_limit" : 2137000000 - } - ], - "system_pipes" : 3, - "type" : "execution" - } - ] + "name": "Example workflow", + "external_objects": [ + "example-object" + ], + "observable_objects": [], + "observable_registers": 1, + "registers": 3, + "tasks": [ + { + "type": "script", + "name": "example_script", + "input_registers": [ + 1 + ], + "output_registers": [ + 0, + 2 + ], + "reactive": true, + "script": "" + }, + { + "type": "execution", + "name": "example_execution", + "channels": [ + { + "buffer_size": 1048576, + "file_buffer_size": 1073741824, + "limit": 2147483648, + "source_pipe": 0, + "target_pipe": 1 + } + ], + "exclusive": true, + "filesystems": [ + { + "type": "image", + "image": "example-image", + "path": "" + }, + { + "type": "empty" + }, + { + "type": "object", + "handle": "example-object" + } + ], + "hard_time_limit": 2137, + "mount_namespaces": [ + { + "root": 0, + "mountpoints": [ + { + "source": 1, + "target": "/exe", + "writable": false + } + ] + } + ], + "output_register": 1, + "pid_namespaces": 1, + "pipes": 3, + 
"processes": [ + { + "arguments": [ + "/exe", + "--help" + ], + "descriptors": { + "0": { + "type": "object_read", + "handle": "stdin" + }, + "1": { + "type": "object_write", + "handle": "stdout" + }, + "2": { + "type": "null" + }, + "3": { + "type": "pipe_write", + "pipe": 0 + }, + "4": { + "type": "pipe_read", + "pipe": 1 + }, + "65534": { + "type": "file", + "filesystem": 0, + "path": "/etc/hosts", + "mode": "read" + } + }, + "environment": [ + "TERM=xterm-256color" + ], + "image": "/exe", + "mount_namespace": 0, + "pid_namespace": 0, + "resource_group": 0, + "working_directory": "/" + } + ], + "resource_groups": [ + { + "cpu_usage_limit": 21.37, + "instruction_limit": 2137000000, + "memory_limit": 2147483648, + "oom_terminate_all_tasks": false, + "pid_limit": 8, + "swap_limit": 4, + "time_limit": 2137000000 + } + ] + } + ] } diff --git a/src/sio3pack/workflow/execution/__init__.py b/src/sio3pack/workflow/execution/__init__.py index 1e6085c..f817d34 100644 --- a/src/sio3pack/workflow/execution/__init__.py +++ b/src/sio3pack/workflow/execution/__init__.py @@ -1,5 +1,4 @@ from sio3pack.workflow.execution.filesystems import Filesystem from sio3pack.workflow.execution.mount_namespace import MountNamespace -from sio3pack.workflow.execution.pipe import Pipe from sio3pack.workflow.execution.process import Process from sio3pack.workflow.execution.resource_group import ResourceGroup diff --git a/src/sio3pack/workflow/execution/channels.py b/src/sio3pack/workflow/execution/channels.py new file mode 100644 index 0000000..f9a647c --- /dev/null +++ b/src/sio3pack/workflow/execution/channels.py @@ -0,0 +1,41 @@ +class Channel: + def __init__(self, buffer_size: int, source_pipe: int, target_pipe: int, file_buffer_size: int = None, limit: int = None): + """ + Create a new channel. + :param buffer_size: The size of the buffer. + :param source_pipe: The source pipe index. + :param target_pipe: The target pipe index. + :param file_buffer_size: The size of the file buffer. + :param limit: The limit of the channel. + """ + self.buffer_size = buffer_size + self.source_pipe = source_pipe + self.target_pipe = target_pipe + self.file_buffer_size = file_buffer_size + self.limit = limit + + @classmethod + def from_json(cls, data: dict) -> "Channel": + """ + Create a new channel from a dictionary. + :param data: The dictionary to create the channel from. + """ + return cls( + data["buffer_size"], + data["source_pipe"], + data["target_pipe"], + data.get("file_buffer_size"), + data.get("limit"), + ) + + def to_json(self) -> dict: + res = { + "buffer_size": self.buffer_size, + "source_pipe": self.source_pipe, + "target_pipe": self.target_pipe, + } + if self.file_buffer_size: + res["file_buffer_size"] = self.file_buffer_size + if self.limit: + res["limit"] = self.limit + return res diff --git a/src/sio3pack/workflow/execution/descriptors.py b/src/sio3pack/workflow/execution/descriptors.py new file mode 100644 index 0000000..fd22fa6 --- /dev/null +++ b/src/sio3pack/workflow/execution/descriptors.py @@ -0,0 +1,30 @@ +from sio3pack.workflow.execution.filesystems import FilesystemManager +from sio3pack.workflow.execution.stream import Stream +from sio3pack.workflow.object import ObjectsManager + + +class DescriptorManager: + def __init__(self, objects_manager: ObjectsManager, filesystem_manager: FilesystemManager): + self.filesystem_manager = filesystem_manager + self.descriptors: dict[int, Stream] = {} + + def add_descriptor(self, fd: int, stream: Stream): + """ + Add a stream to the descriptor manager. 
+        :param fd: The file descriptor.
+        :param stream: The stream to add.
+        """
+        self.descriptors[int(fd)] = stream
+
+    def from_json(self, data: dict, objects_manager: ObjectsManager, filesystem_manager: FilesystemManager):
+        for fd, stream_data in data.items():
+            stream = Stream.from_json(stream_data, objects_manager, filesystem_manager)
+            self.add_descriptor(int(fd), stream)
+
+    def to_json(self) -> dict:
+        """
+        Convert the descriptor manager to a JSON-serializable dictionary.
+        :return: The JSON-serializable dictionary.
+        """
+        # Convert the fd numbers to strings, since in JSON keys can't be ints.
+        return {str(fd): stream.to_json() for fd, stream in self.descriptors.items()}
diff --git a/src/sio3pack/workflow/execution/mount_namespace.py b/src/sio3pack/workflow/execution/mount_namespace.py
index 8af97da..a92209d 100644
--- a/src/sio3pack/workflow/execution/mount_namespace.py
+++ b/src/sio3pack/workflow/execution/mount_namespace.py
@@ -2,10 +2,11 @@
 
 
 class Mountpoint:
-    def __init__(self, source: Filesystem, target: str, writable: bool = False):
+    def __init__(self, source: Filesystem, target: str, writable: bool = False, capacity: int | None = None):
         self.source = source
         self.target = target
         self.writable = writable
+        self.capacity = capacity
 
     @classmethod
     def from_json(cls, data: dict, filesystem_manager: FilesystemManager):
@@ -16,15 +17,16 @@ def from_json(cls, data: dict, filesystem_manager: FilesystemManager):
         """
         if isinstance(data["source"], str):  # TODO: delete this
            return cls(data["source"], data["target"], data["writable"])
-        return cls(filesystem_manager.get_by_id(int(data["source"])), data["target"], data["writable"])
+        return cls(filesystem_manager.get_by_id(int(data["source"])), data["target"], data["writable"], data.get("capacity"))
 
     def to_json(self) -> dict:
         """
         Convert the mountpoint to a dictionary.
         """
-        if isinstance(self.source, str):  # TODO: delete this
-            return {"source": self.source, "target": self.target, "writable": self.writable}
-        return {"source": self.source.id, "target": self.target, "writable": self.writable}
+        res = {"source": self.source.id, "target": self.target, "writable": self.writable}
+        if self.capacity is not None:
+            res["capacity"] = self.capacity
+        return res
 
 
 class MountNamespace:
diff --git a/src/sio3pack/workflow/execution/pipe.py b/src/sio3pack/workflow/execution/pipe.py
deleted file mode 100644
index 0b2e78a..0000000
--- a/src/sio3pack/workflow/execution/pipe.py
+++ /dev/null
@@ -1,25 +0,0 @@
-class Pipe:
-    def __init__(self, buffer_size: int = 1048576, file_buffer_size: int = 1073741824, limit: int = 2147483648):
-        """
-        Create a new pipe config.
-        :param buffer_size: The buffer size for the pipe.
-        :param file_buffer_size: The buffer size for files.
-        :param limit: The limit for the pipe.
-        """
-        self.buffer_size = buffer_size
-        self.file_buffer_size = file_buffer_size
-        self.limit = limit
-
-    @classmethod
-    def from_json(cls, data: dict):
-        """
-        Create a new pipe config from a dictionary.
-        :param data: The dictionary to create the pipe config from.
-        """
-        return cls(data["buffer_size"], data["file_buffer_size"], data["limit"])
-
-    def to_json(self) -> dict:
-        """
-        Convert the pipe config to a dictionary.
-        """
-        return {"buffer_size": self.buffer_size, "file_buffer_size": self.file_buffer_size, "limit": self.limit}
diff --git a/src/sio3pack/workflow/execution/process.py b/src/sio3pack/workflow/execution/process.py
index 506ed26..094dc0e 100644
--- a/src/sio3pack/workflow/execution/process.py
+++ b/src/sio3pack/workflow/execution/process.py
@@ -1,10 +1,13 @@
-from sio3pack.workflow.execution.mount_namespace import MountNamespace, MountNamespaceManager
-from sio3pack.workflow.execution.resource_group import ResourceGroup, ResourceGroupManager
+from sio3pack.workflow.execution.descriptors import DescriptorManager
+from sio3pack.workflow.execution.mount_namespace import MountNamespace
+from sio3pack.workflow.execution.resource_group import ResourceGroup
 
 
 class Process:
     def __init__(
         self,
+        workflow: "Workflow",
+        task: "ExecutionTask",
         arguments: list[str] = None,
         environment: dict = None,
         image: str = "",
@@ -29,6 +32,9 @@ def __init__(
         self.resource_group = resource_group
         self.pid_namespace = pid_namespace
         self.working_directory = working_directory
+        self.task = task
+        self.workflow = workflow
+        self.descriptor_manager = DescriptorManager(workflow.objects_manager, task.filesystem_manager)
 
     def to_json(self) -> dict:
         return {
@@ -39,22 +45,27 @@ def to_json(self) -> dict:
             "resource_group": self.resource_group.id,
             "pid_namespace": self.pid_namespace,
             "working_directory": self.working_directory,
+            "descriptors": self.descriptor_manager.to_json(),
         }
 
     @classmethod
     def from_json(
-        cls, data: dict, mountnamespace_manager: MountNamespaceManager, resource_group_manager: ResourceGroupManager
+        cls, data: dict, workflow: "Workflow", task: "Task"
     ):
         env = {}
         for var in data["environment"]:
             key, value = var.split("=", 1)
             env[key] = value
-        return cls(
+        process = cls(
+            workflow,
+            task,
             data["arguments"],
             env,
             data["image"],
-            mountnamespace_manager.get_by_id(data["mount_namespace"]),
-            resource_group_manager.get_by_id(data["resource_group"]),
+            task.mountnamespace_manager.get_by_id(data["mount_namespace"]),
+            task.resource_group_manager.get_by_id(data["resource_group"]),
             data["pid_namespace"],
             data["working_directory"],
         )
+        process.descriptor_manager.from_json(data["descriptors"], workflow.objects_manager, task.filesystem_manager)
+        return process
diff --git a/src/sio3pack/workflow/execution/stream.py b/src/sio3pack/workflow/execution/stream.py
new file mode 100644
index 0000000..77aab5d
--- /dev/null
+++ b/src/sio3pack/workflow/execution/stream.py
@@ -0,0 +1,150 @@
+from enum import Enum
+
+from sio3pack.workflow import Object
+from sio3pack.workflow.execution import Filesystem
+from sio3pack.workflow.execution.filesystems import FilesystemManager
+from sio3pack.workflow.object import ObjectsManager
+
+
+class StreamType(Enum):
+    FILE = "file"
+    NULL = "null"
+    OBJECT_READ = "object_read"
+    OBJECT_WRITE = "object_write"
+    PIPE_READ = "pipe_read"
+    PIPE_WRITE = "pipe_write"
+
+
+class FileMode(Enum):
+    READ = "read"
+    READ_WRITE = "read_write"
+    READ_WRITE_APPEND = "read_write_append"
+    READ_WRITE_TRUNCATE = "read_write_truncate"
+    WRITE = "write"
+    WRITE_APPEND = "write_append"
+    WRITE_TRUNCATE = "write_truncate"
+
+
+class Stream:
+    def __init__(self, type: StreamType):
+        self.type = type
+
+    @classmethod
+    def from_json(cls, data: dict, objects_manager: ObjectsManager, filesystem_manager: FilesystemManager):
+        type = StreamType(data.get("type"))
+        if type == StreamType.FILE:
+            return FileStream.from_json(filesystem_manager, data)
+        elif type == StreamType.NULL:
+            return NullStream.from_json(data)
+        elif type in (StreamType.OBJECT_READ, StreamType.OBJECT_WRITE):
+            return ObjectStream.from_json(data, objects_manager)
+        elif type in (StreamType.PIPE_READ, StreamType.PIPE_WRITE):
+            return PipeStream.from_json(data)
+        else:
+            raise ValueError(f"Unknown stream type: {type}")
+
+    def to_json(self) -> dict:
+        raise NotImplementedError("Subclasses must implement to_json method")
+
+
+class FileStream(Stream):
+    def __init__(self, filesystem: Filesystem, path: str, mode: FileMode):
+        super().__init__(StreamType.FILE)
+        self.filesystem = filesystem
+        self.path = path
+        self.mode = mode
+
+    @classmethod
+    def from_json(cls, filesystem_manager: FilesystemManager, data: dict):
+        return cls(
+            filesystem_manager.get_by_id(data.get("filesystem")),
+            data.get("path"),
+            FileMode(data.get("mode")),
+        )
+
+    def to_json(self) -> dict:
+        return {
+            "type": self.type.value,
+            "filesystem": self.filesystem.id,
+            "path": self.path,
+            "mode": self.mode.value,
+        }
+
+
+class NullStream(Stream):
+    def __init__(self):
+        super().__init__(StreamType.NULL)
+
+    @classmethod
+    def from_json(cls, data: dict):
+        return cls()
+
+    def to_json(self) -> dict:
+        return {
+            "type": self.type.value,
+        }
+
+
+class ObjectStream(Stream):
+    def __init__(self, type: StreamType, object: Object):
+        if type not in (StreamType.OBJECT_READ, StreamType.OBJECT_WRITE):
+            raise ValueError("Invalid stream type for ObjectStream")
+        super().__init__(type)
+        self.object = object
+
+    @classmethod
+    def from_json(cls, data: dict, objects_manager: ObjectsManager):
+        return cls(
+            StreamType(data.get("type")),
+            objects_manager.get_or_create_object(data.get("handle")),
+        )
+
+    def to_json(self) -> dict:
+        """
+        Convert the object stream to a dictionary.
+        """
+        return {
+            "type": self.type.value,
+            "handle": self.object.handle,
+        }
+
+
+class ObjectReadStream(ObjectStream):
+    def __init__(self, object: Object):
+        super().__init__(StreamType.OBJECT_READ, object)
+
+
+class ObjectWriteStream(ObjectStream):
+    def __init__(self, object: Object):
+        super().__init__(StreamType.OBJECT_WRITE, object)   
+
+
+class PipeStream(Stream):
+    def __init__(self, type: StreamType, pipe_index: int):
+        if type not in (StreamType.PIPE_READ, StreamType.PIPE_WRITE):
+            raise ValueError("Invalid stream type for PipeStream")
+        super().__init__(type)
+        self.pipe_index = pipe_index
+
+    @classmethod
+    def from_json(cls, data: dict):
+        return cls(StreamType(data.get("type")), data.get("pipe"))
+
+    def to_json(self) -> dict:
+        """
+        Convert the pipe stream to a dictionary.
+ """ + return { + "type": self.type.value, + "pipe": self.pipe_index, + } + + +class PipeReadStream(PipeStream): + def __init__(self, pipe_index: int): + super().__init__(StreamType.PIPE_READ, pipe_index) + + +class PipeWriteStream(PipeStream): + def __init__(self, pipe_index: int): + super().__init__(StreamType.PIPE_WRITE, pipe_index) diff --git a/src/sio3pack/workflow/tasks.py b/src/sio3pack/workflow/tasks.py index 8bb497b..2494449 100644 --- a/src/sio3pack/workflow/tasks.py +++ b/src/sio3pack/workflow/tasks.py @@ -1,6 +1,6 @@ +from sio3pack.workflow.execution.channels import Channel from sio3pack.workflow.execution.filesystems import Filesystem, FilesystemManager from sio3pack.workflow.execution.mount_namespace import MountNamespace, MountNamespaceManager -from sio3pack.workflow.execution.pipe import Pipe from sio3pack.workflow.execution.process import Process from sio3pack.workflow.execution.resource_group import ResourceGroup, ResourceGroupManager @@ -51,11 +51,10 @@ def __init__( hard_time_limit: int = None, extra_limit: int = None, output_register: int = None, - input_register: int = None, pid_namespaces: int = 1, processes: list[Process] = None, - pipes: list[Pipe] = None, - system_pipes: int = 3, + pipes: int = 0, + channels: list[Channel] = None ): """ Create a new execution task. @@ -67,11 +66,9 @@ def __init__( :param extra_limit: If set, the hard_time_limit for the task will be the maximum time limit of all resource groups plus this value. :param output_register: The output register of the task. - :param input_register: The input register of the task. TODO delete :param pid_namespaces: The number of PID namespaces. :param processes: The processes of the task. - :param pipes: The pipes of the task. - :param system_pipes: The number of system pipes. + :param pipes: The number of pipes. """ self.name = name self.workflow = workflow @@ -79,12 +76,11 @@ def __init__( if hard_time_limit is not None: self.hard_time_limit = hard_time_limit self.output_register = output_register - self.input_register = input_register self.pid_namespaces = pid_namespaces self.processes = processes or [] - self.system_pipes = system_pipes - self.pipes = [Pipe.from_json(pipe) for pipe in pipes] if pipes else [] + self.pipes = pipes self.extra_limit = extra_limit + self.channels = channels or [] self.filesystem_manager = FilesystemManager(self) self.mountnamespace_manager = MountNamespaceManager(self, self.filesystem_manager) @@ -98,22 +94,24 @@ def from_json(cls, data: dict, workflow: "Workflow"): :param dict data: The dictionary to create the task from. :param Workflow workflow: The workflow the task belongs to. 
""" + channels = [] + for channel in data.get("channels", []): + channels.append(Channel.from_json(channel)) task = cls( data["name"], workflow, data["exclusive"], data.get("hard_time_limit"), output_register=data.get("output_register"), - input_register=data.get("input_register"), pid_namespaces=data["pid_namespaces"], - pipes=data["pipes"], - system_pipes=data.get("system_pipes", 3), + pipes=int(data["pipes"]), + channels=channels, ) task.filesystem_manager.from_json(data["filesystems"], workflow) task.mountnamespace_manager.from_json(data["mount_namespaces"]) task.resource_group_manager.from_json(data["resource_groups"]) task.processes = [ - Process.from_json(process, task.mountnamespace_manager, task.resource_group_manager) + Process.from_json(process, workflow, task) for process in data["processes"] ] return task @@ -134,19 +132,17 @@ def to_json(self) -> dict: res = { "name": self.name, "type": "execution", + "channels": [channel.to_json() for channel in self.channels], "exclusive": self.exclusive, "hard_time_limit": hard_time_limit, "output_register": self.output_register, "pid_namespaces": self.pid_namespaces, "filesystems": self.filesystem_manager.to_json(), "mount_namespaces": self.mountnamespace_manager.to_json(), - "pipes": [pipe.to_json() for pipe in self.pipes], - "system_pipes": self.system_pipes, + "pipes": self.pipes, "resource_groups": self.resource_group_manager.to_json(), "processes": [process.to_json() for process in self.processes], } - if self.input_register is not None: - res["input_register"] = self.input_register return res def add_filesystem(self, filesystem: Filesystem): From d34ebd5a7d3259c8b7819cc4b7b76b8cc750b1a0 Mon Sep 17 00:00:00 2001 From: MasloMaslane Date: Sat, 5 Apr 2025 13:12:11 +0200 Subject: [PATCH 02/11] Add objects to script task --- src/sio3pack/workflow/tasks.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/sio3pack/workflow/tasks.py b/src/sio3pack/workflow/tasks.py index 2494449..2c48f68 100644 --- a/src/sio3pack/workflow/tasks.py +++ b/src/sio3pack/workflow/tasks.py @@ -1,3 +1,4 @@ +from sio3pack.workflow import Object from sio3pack.workflow.execution.channels import Channel from sio3pack.workflow.execution.filesystems import Filesystem, FilesystemManager from sio3pack.workflow.execution.mount_namespace import MountNamespace, MountNamespaceManager @@ -190,6 +191,7 @@ def __init__( reactive: bool = False, input_registers: list[int] = None, output_registers: list[int] = None, + objects: list[Object] = None, script: str = None, ): """ @@ -206,6 +208,7 @@ def __init__( self.reactive = reactive self.input_registers = input_registers or [] self.output_registers = output_registers or [] + self.objects = objects or [] self.script = script def __str__(self): @@ -220,7 +223,7 @@ def from_json(cls, data: dict, workflow: "Workflow"): :param workflow: The workflow the task belongs to. """ return cls( - data["name"], workflow, data["reactive"], data["input_registers"], data["output_registers"], data["script"] + data["name"], workflow, data["reactive"], data["input_registers"], data["output_registers"], [workflow.objects_manager.get_or_create_object(obj) for obj in data.get("objects", [])], data["script"], ) def to_json(self) -> dict: @@ -229,7 +232,7 @@ def to_json(self) -> dict: :return: The dictionary representation of the task. 
""" - return { + res = { "name": self.name, "type": "script", "reactive": self.reactive, @@ -237,3 +240,6 @@ def to_json(self) -> dict: "output_registers": self.output_registers, "script": self.script, } + if self.objects: + res["objects"] = [obj.handle for obj in self.objects] + return res From e96948751ad6dadba8cb0b723ed9599ac62ce5aa Mon Sep 17 00:00:00 2001 From: MasloMaslane Date: Sat, 5 Apr 2025 13:12:28 +0200 Subject: [PATCH 03/11] Add example workflow with run of one test --- example_workflows/simple_run.json | 224 ++++++++++++++++++++++++++++++ 1 file changed, 224 insertions(+) create mode 100644 example_workflows/simple_run.json diff --git a/example_workflows/simple_run.json b/example_workflows/simple_run.json new file mode 100644 index 0000000..bb55d49 --- /dev/null +++ b/example_workflows/simple_run.json @@ -0,0 +1,224 @@ +{ + "name": "simple run", + "external_objects": [ + "chk", + "prog", + "abc0.in", + "abc0.out" + ], + "observable_objects": [], + "observable_registers": 1, + "registers": 6, + "tasks": [ + { + "type": "execution", + "name": "Run program on test 0", + "channels": [], + "exclusive": true, + "filesystems": [ + { + "type": "object", + "handle": "prog" + }, + { + "type": "empty" + } + ], + "hard_time_limit": 2137, + "mount_namespaces": [ + { + "root": 0, + "mountpoints": [ + { + "source": 0, + "target": "/exe", + "writable": false + } + ] + } + ], + "output_register": 1, + "pid_namespaces": 1, + "pipes": 0, + "processes": [ + { + "arguments": [ + "/exe" + ], + "descriptors": { + "0": { + "type": "object_read", + "handle": "abc0.in" + }, + "1": { + "type": "object_write", + "handle": "user_out0.out" + } + }, + "environment": [ + "TERM=xterm-256color" + ], + "image": "/exe", + "resource_group": 0, + "mount_namespace": 0, + "pid_namespace": 0, + "working_directory": "/" + } + ], + "resource_groups": [ + { + "cpu_usage_limit": 21.37, + "instruction_limit": 2137000000, + "memory_limit": 2147483648, + "oom_terminate_all_tasks": false, + "pid_limit": 8, + "swap_limit": 4, + "time_limit": 2137000000 + } + ] + }, + { + "name": "Check worker output on test 0", + "type": "script", + "input_registers": [ + 1 + ], + "output_registers": [ + 2 + ], + "reactive": false, + "script": "# Check if process exited with code 0 and didn't break limits" + }, + { + "name": "Run checker on test 0", + "type": "execution", + "channels": [], + "exclusive": false, + "filesystems": [ + { + "type": "object", + "handle": "chk" + }, + { + "type": "object", + "handle": "abc0.in" + }, + { + "type": "object", + "handle": "abc0.out" + }, + { + "type": "object", + "handle": "user_out0.out" + }, + { + "type": "empty" + } + ], + "hard_time_limit": 2137, + "mount_namespaces": [ + { + "root": 0, + "mountpoints": [ + { + "source": 0, + "target": "/exe", + "writable": false + }, + { + "source": 1, + "target": "/input", + "writable": false + }, + { + "source": 2, + "target": "/output", + "writable": false + }, + { + "source": 3, + "target": "/user_out", + "writable": false + } + ] + } + ], + "output_register": 3, + "pid_namespaces": 1, + "pipes": 0, + "processes": [ + { + "arguments": [ + "/exe", + "/input", + "/output", + "/user_out" + ], + "descriptors": { + "1": { + "type": "object_write", + "handle": "chk_out0.out" + } + }, + "environment": [ + "TERM=xterm-256color" + ], + "image": "/exe", + "resource_group": 0, + "mount_namespace": 0, + "pid_namespace": 0, + "working_directory": "/" + } + ], + "resource_groups": [ + { + "cpu_usage_limit": 21.37, + "instruction_limit": 2137000000, + "memory_limit": 
2147483648,
+                    "oom_terminate_all_tasks": false,
+                    "pid_limit": 8,
+                    "swap_limit": 4,
+                    "time_limit": 2137000000
+                }
+            ]
+        },
+        {
+            "name": "Grade test 0",
+            "type": "script",
+            "input_registers": [
+                2,
+                3
+            ],
+            "output_registers": [
+                4
+            ],
+            "reactive": true,
+            "objects": ["chk_out0.out"],
+            "script": "# grade test. reactive because if program failed, we can skip checker"
+        },
+        {
+            "name": "Grade group 0",
+            "type": "script",
+            "input_registers": [
+                4
+            ],
+            "output_registers": [
+                5
+            ],
+            "reactive": true,
+            "script": "# grade group. reactive because if one test failed, we can skip grading group"
+        },
+        {
+            "name": "Grade solution",
+            "type": "script",
+            "input_registers": [
+                5
+            ],
+            "output_registers": [
+                0
+            ],
+            "reactive": false,
+            "script": "# grade solution"
+        }
+    ]
+}

From 9c01674ff56d8299cc2afb364fc45473fa67ecbb Mon Sep 17 00:00:00 2001
From: MasloMaslane
Date: Sat, 5 Apr 2025 14:17:00 +0200
Subject: [PATCH 04/11] Update visualizer to handle new fields

---
 example_workflows/simple_run.json    | 210 ++++++++++++++++++++++++---
 src/sio3pack/visualizer/cytoscope.py |  91 +++++++++++-
 2 files changed, 272 insertions(+), 29 deletions(-)

diff --git a/example_workflows/simple_run.json b/example_workflows/simple_run.json
index bb55d49..92229cd 100644
--- a/example_workflows/simple_run.json
+++ b/example_workflows/simple_run.json
@@ -1,14 +1,16 @@
 {
-    "name": "simple run",
+    "name": "Example workflow for grading a solution on two tests with checker",
     "external_objects": [
         "chk",
         "prog",
         "abc0.in",
+        "abc0a.in",
         "abc0.out",
+        "abc0a.out"
     ],
     "observable_objects": [],
     "observable_registers": 1,
-    "registers": 6,
+    "registers": 8,
     "tasks": [
         {
             "type": "execution",
@@ -77,18 +79,6 @@
             ]
         },
-        {
-            "name": "Check worker output on test 0",
-            "type": "script",
-            "input_registers": [
-                1
-            ],
-            "output_registers": [
-                2
-            ],
-            "reactive": false,
-            "script": "# Check if process exited with code 0 and didn't break limits"
-        },
         {
             "name": "Run checker on test 0",
             "type": "execution",
@@ -133,7 +123,7 @@
                 ]
             }
         ],
-        "output_register": 3,
+        "output_register": 2,
         "pid_namespaces": 1,
         "pipes": 0,
@@ -186,24 +176,198 @@
         {
             "name": "Grade test 0",
             "type": "script",
             "input_registers": [
-                2,
-                3
+                1,
+                2
             ],
             "output_registers": [
-                4
+                3
             ],
             "reactive": true,
             "objects": ["chk_out0.out"],
-            "script": "# grade test. reactive because if program failed, we can skip checker"
+            "script": "# grade test. check worker output for program and checker stdout. 
reactive because if program failed, we can skip checker" + }, + { + "type": "execution", + "name": "Run program on test 0a", + "channels": [], + "exclusive": true, + "filesystems": [ + { + "type": "object", + "handle": "prog" + }, + { + "type": "empty" + } + ], + "hard_time_limit": 2137, + "mount_namespaces": [ + { + "root": 0, + "mountpoints": [ + { + "source": 0, + "target": "/exe", + "writable": false + } + ] + } + ], + "output_register": 4, + "pid_namespaces": 1, + "pipes": 0, + "processes": [ + { + "arguments": [ + "/exe" + ], + "descriptors": { + "0": { + "type": "object_read", + "handle": "abc0a.in" + }, + "1": { + "type": "object_write", + "handle": "user_out0a.out" + } + }, + "environment": [ + "TERM=xterm-256color" + ], + "image": "/exe", + "resource_group": 0, + "mount_namespace": 0, + "pid_namespace": 0, + "working_directory": "/" + } + ], + "resource_groups": [ + { + "cpu_usage_limit": 21.37, + "instruction_limit": 2137000000, + "memory_limit": 2147483648, + "oom_terminate_all_tasks": false, + "pid_limit": 8, + "swap_limit": 4, + "time_limit": 2137000000 + } + ] + }, + { + "name": "Run checker on test 0a", + "type": "execution", + "channels": [], + "exclusive": false, + "filesystems": [ + { + "type": "object", + "handle": "chk" + }, + { + "type": "object", + "handle": "abc0a.in" + }, + { + "type": "object", + "handle": "abc0a.out" + }, + { + "type": "object", + "handle": "user_out0a.out" + }, + { + "type": "empty" + } + ], + "hard_time_limit": 2137, + "mount_namespaces": [ + { + "root": 0, + "mountpoints": [ + { + "source": 0, + "target": "/exe", + "writable": false + }, + { + "source": 1, + "target": "/input", + "writable": false + }, + { + "source": 2, + "target": "/output", + "writable": false + }, + { + "source": 3, + "target": "/user_out", + "writable": false + } + ] + } + ], + "output_register": 5, + "pid_namespaces": 1, + "pipes": 0, + "processes": [ + { + "arguments": [ + "/exe", + "/input", + "/output", + "/user_out" + ], + "descriptors": { + "1": { + "type": "object_write", + "handle": "chk_out0a.out" + } + }, + "environment": [ + "TERM=xterm-256color" + ], + "image": "/exe", + "resource_group": 0, + "mount_namespace": 0, + "pid_namespace": 0, + "working_directory": "/" + } + ], + "resource_groups": [ + { + "cpu_usage_limit": 21.37, + "instruction_limit": 2137000000, + "memory_limit": 2147483648, + "oom_terminate_all_tasks": false, + "pid_limit": 8, + "swap_limit": 4, + "time_limit": 2137000000 + } + ] + }, + { + "name": "Grade test 0a", + "type": "script", + "input_registers": [ + 4, + 5 + ], + "output_registers": [ + 6 + ], + "reactive": true, + "objects": ["chk_out0a.out"], + "script": "# grade test. check worker output for program and checker stdout. reactive because if program failed, we can skip checker" }, { "name": "Grade group 0", "type": "script", "input_registers": [ - 4 + 3, 6 ], "output_registers": [ - 5 + 7 ], "reactive": true, "script": "# grade group. 
reactive because if one test failed, we can skip grading group"
         },
         {
             "name": "Grade solution",
             "type": "script",
             "input_registers": [
-                5
+                7
             ],
             "output_registers": [
                 0
             ],
             "reactive": false,
             "script": "# grade solution"
         }
     ]
 }
diff --git a/src/sio3pack/visualizer/cytoscope.py b/src/sio3pack/visualizer/cytoscope.py
index 15d1f1c..7241ea6 100644
--- a/src/sio3pack/visualizer/cytoscope.py
+++ b/src/sio3pack/visualizer/cytoscope.py
@@ -1,7 +1,35 @@
 def get_elements(graph):
     elements = []
     ins = {}
+    ins_objects = {}
     rendered_registers = set()
+    observable_objects = graph["observable_objects"]
+    external_objects = graph["external_objects"]
+
+    # Create node for input object storage and output object storage
+    if len(external_objects) > 0:
+        elements.append({
+            "data": {
+                "id": "input_storage",
+                "label": "Input object storage",
+                "info": "This is the object storage. Edges from this node are input objects (external_objects) for the workflow."
+            },
+            "classes": "input_storage",
+        })
+    if len(observable_objects) > 0:
+        elements.append({
+            "data": {
+                "id": "output_storage",
+                "label": "Output object storage",
+                "info": "This is the object storage. Edges to this node are output objects (observable_objects) for the workflow."
+            }
+        })
+
+    # Mark observable objects as inputs for output object storage
+    for object in observable_objects:
+        if object not in ins_objects:
+            ins_objects[object] = set()
+        ins_objects[object].add("output_storage")
 
     # Create nodes for observable registers.
     for register in range(graph["observable_registers"]):
@@ -34,6 +62,12 @@
             if register not in ins:
                 ins[register] = []
             ins[register].append(id)
+
+            # Handle objects
+            for object in task.get("objects", []):
+                if object not in ins_objects:
+                    ins_objects[object] = set()
+                ins_objects[object].add(id)
         elif task["type"] == "execution":
             id = f"execution_{execution_i}"
             elements.append(
@@ -45,12 +79,22 @@
             if task["exclusive"]:
                 elements[-1]["classes"] += " exclusive"
 
-            # To delete, final spec is different
-            if "input_register" in task:
-                register = task["input_register"]
-                if register not in ins:
-                    ins[register] = []
-                ins[register].append(id)
+            # Handle objects in descriptors of processes for the task
+            for process in task["processes"]:
+                for _, stream in process["descriptors"].items():
+                    if stream["type"] == "object_read":
+                        object = stream["handle"]
+                        if object not in ins_objects:
+                            ins_objects[object] = set()
+                        ins_objects[object].add(id)
+
+            # Handle objects in filesystems
+            for filesystem in task["filesystems"]:
+                if filesystem["type"] == "object":
+                    object = filesystem["handle"]
+                    if object not in ins_objects:
+                        ins_objects[object] = set()
+                    ins_objects[object].add(id)
             execution_i += 1
 
     # Second pass to create edges.
@@ -64,6 +108,7 @@
         else:
             raise
 
+        # Create edges from input registers to the task
         for register in registers:
             if register not in ins:
                 elements.append(
@@ -100,9 +145,43 @@
             if register not in rendered_registers:
                 elements[-1]["data"]["label"] = f"via register {register}"
 
+        # Create edges for objects
+        if task["type"] == "script":
+            # Script tasks can't create objects
+            pass
+        elif task["type"] == "execution":
+            # Execution tasks can only create objects via descriptors
+            for process in task["processes"]:
+                for fd, stream in process["descriptors"].items():
+                    if stream["type"] == "object_write":
+                        object = stream["handle"]
+                        for id in ins_objects.get(object, []):
+                            elements.append(
+                                {
+                                    "data": {
+                                        "source": f"execution_{execution_i}",
+                                        "target": id,
+                                        "label": f"via object {object} from fd {fd}",
+                                    }
+                                }
+                            )
+
         if task["type"] == "script":
             script_i += 1
         elif task["type"] == "execution":
             execution_i += 1
 
+    # Create edges from input object storage to the task
+    for ext_object in external_objects:
+        for id in ins_objects.get(ext_object, []):
+            elements.append(
+                {
+                    "data": {
+                        "source": "input_storage",
+                        "target": id,
+                        "label": f"via object {ext_object}",
+                    }
+                }
+            )
+
     return elements

From 0259bbb22979032497c4c7c354694fcd4193c366 Mon Sep 17 00:00:00 2001
From: MasloMaslane
Date: Sat, 5 Apr 2025 14:21:15 +0200
Subject: [PATCH 05/11] Style changes

---
 src/sio3pack/visualizer/__init__.py  | 14 ++++++++++++++
 src/sio3pack/visualizer/cytoscope.py |  7 +++----
 2 files changed, 17 insertions(+), 4 deletions(-)

diff --git a/src/sio3pack/visualizer/__init__.py b/src/sio3pack/visualizer/__init__.py
index 859207e..290683e 100644
--- a/src/sio3pack/visualizer/__init__.py
+++ b/src/sio3pack/visualizer/__init__.py
@@ -164,6 +164,20 @@ def show_graph(n_clicks, value, contents):
                     "background-color": "#ff4136",
                 },
             },
+            {
+                "selector": "#input_storage",
+                "style": {
+                    "shape": "rectangle",
+                    "background-color": "#2ECC40",
+                }
+            },
+            {
+                "selector": "#output_storage",
+                "style": {
+                    "shape": "rectangle",
+                    "background-color": "#FF851B",
+                }
+            },
         ],
     )
     return (
diff --git a/src/sio3pack/visualizer/cytoscope.py b/src/sio3pack/visualizer/cytoscope.py
index 7241ea6..8951492 100644
--- a/src/sio3pack/visualizer/cytoscope.py
+++ b/src/sio3pack/visualizer/cytoscope.py
@@ -12,16 +12,15 @@ def get_elements(graph):
             "data": {
                 "id": "input_storage",
                 "label": "Input object storage",
-                "info": "This is the object storage. Edges from this node are input objects (external_objects) for the workflow."
-            },
-            "classes": "input_storage",
+                "info": "This is the object storage.\nEdges from this node are input objects\n(external_objects) for the workflow."
+            }
         })
     if len(observable_objects) > 0:
         elements.append({
             "data": {
                 "id": "output_storage",
                 "label": "Output object storage",
-                "info": "This is the object storage. Edges to this node are output objects (observable_objects) for the workflow."
+                "info": "This is the object storage.\n Edges to this node are output objects\n(observable_objects) for the workflow."
}
         })

From ddb7be354b2be1ec473460052f80fc1d27d860 Mon Sep 17 00:00:00 2001
From: MasloMaslane
Date: Sat, 5 Apr 2025 14:22:07 +0200
Subject: [PATCH 06/11] Formatter

---
 src/sio3pack/visualizer/__init__.py         |  4 +--
 src/sio3pack/visualizer/cytoscope.py        | 28 +++++++++++---------
 src/sio3pack/workflow/execution/channels.py |  4 ++-
 .../workflow/execution/mount_namespace.py   |  4 ++-
 src/sio3pack/workflow/execution/process.py  |  4 +--
 src/sio3pack/workflow/execution/stream.py   |  2 +-
 src/sio3pack/workflow/tasks.py              | 15 ++++++----
 7 files changed, 35 insertions(+), 26 deletions(-)

diff --git a/src/sio3pack/visualizer/__init__.py b/src/sio3pack/visualizer/__init__.py
index 290683e..41754a5 100644
--- a/src/sio3pack/visualizer/__init__.py
+++ b/src/sio3pack/visualizer/__init__.py
@@ -169,14 +169,14 @@ def show_graph(n_clicks, value, contents):
                 "style": {
                     "shape": "rectangle",
                     "background-color": "#2ECC40",
-                }
+                },
             },
             {
                 "selector": "#output_storage",
                 "style": {
                     "shape": "rectangle",
                     "background-color": "#FF851B",
-                }
+                },
             },
         ],
     )
diff --git a/src/sio3pack/visualizer/cytoscope.py b/src/sio3pack/visualizer/cytoscope.py
index 8951492..371ec72 100644
--- a/src/sio3pack/visualizer/cytoscope.py
+++ b/src/sio3pack/visualizer/cytoscope.py
@@ -8,21 +8,25 @@ def get_elements(graph):
 
     # Create node for input object storage and output object storage
     if len(external_objects) > 0:
-        elements.append({
-            "data": {
-                "id": "input_storage",
-                "label": "Input object storage",
-                "info": "This is the object storage.\nEdges from this node are input objects\n(external_objects) for the workflow."
+        elements.append(
+            {
+                "data": {
+                    "id": "input_storage",
+                    "label": "Input object storage",
+                    "info": "This is the object storage.\nEdges from this node are input objects\n(external_objects) for the workflow.",
+                }
             }
-        })
+        )
     if len(observable_objects) > 0:
-        elements.append({
-            "data": {
-                "id": "output_storage",
-                "label": "Output object storage",
-                "info": "This is the object storage.\n Edges to this node are output objects\n(observable_objects) for the workflow."
+        elements.append(
+            {
+                "data": {
+                    "id": "output_storage",
+                    "label": "Output object storage",
+                    "info": "This is the object storage.\n Edges to this node are output objects\n(observable_objects) for the workflow.",
+                }
             }
-        })
+        )
 
     # Mark observable objects as inputs for output object storage
     for object in observable_objects:
diff --git a/src/sio3pack/workflow/execution/channels.py b/src/sio3pack/workflow/execution/channels.py
index f9a647c..4bf6285 100644
--- a/src/sio3pack/workflow/execution/channels.py
+++ b/src/sio3pack/workflow/execution/channels.py
@@ -1,5 +1,7 @@
 class Channel:
-    def __init__(self, buffer_size: int, source_pipe: int, target_pipe: int, file_buffer_size: int = None, limit: int = None):
+    def __init__(
+        self, buffer_size: int, source_pipe: int, target_pipe: int, file_buffer_size: int = None, limit: int = None
+    ):
         """
         Create a new channel.
         :param buffer_size: The size of the buffer.
diff --git a/src/sio3pack/workflow/execution/mount_namespace.py b/src/sio3pack/workflow/execution/mount_namespace.py index a92209d..72b1dff 100644 --- a/src/sio3pack/workflow/execution/mount_namespace.py +++ b/src/sio3pack/workflow/execution/mount_namespace.py @@ -17,7 +17,9 @@ def from_json(cls, data: dict, filesystem_manager: FilesystemManager): """ if isinstance(data["source"], str): # TODO: delete this return cls(data["source"], data["target"], data["writable"]) - return cls(filesystem_manager.get_by_id(int(data["source"])), data["target"], data["writable"], data.get("capacity")) + return cls( + filesystem_manager.get_by_id(int(data["source"])), data["target"], data["writable"], data.get("capacity") + ) def to_json(self) -> dict: """ diff --git a/src/sio3pack/workflow/execution/process.py b/src/sio3pack/workflow/execution/process.py index 094dc0e..4925dbf 100644 --- a/src/sio3pack/workflow/execution/process.py +++ b/src/sio3pack/workflow/execution/process.py @@ -49,9 +49,7 @@ def to_json(self) -> dict: } @classmethod - def from_json( - cls, data: dict, workflow: "Workflow", task: "Task" - ): + def from_json(cls, data: dict, workflow: "Workflow", task: "Task"): env = {} for var in data["environment"]: key, value = var.split("=", 1) diff --git a/src/sio3pack/workflow/execution/stream.py b/src/sio3pack/workflow/execution/stream.py index 77aab5d..68c8697 100644 --- a/src/sio3pack/workflow/execution/stream.py +++ b/src/sio3pack/workflow/execution/stream.py @@ -116,7 +116,7 @@ def __init__(self, object: Object): class ObjectWriteStream(ObjectStream): def __init__(self, object: Object): - super().__init__(StreamType.OBJECT_WRITE, object) + super().__init__(StreamType.OBJECT_WRITE, object) class PipeStream(Stream): diff --git a/src/sio3pack/workflow/tasks.py b/src/sio3pack/workflow/tasks.py index 2c48f68..b2ad654 100644 --- a/src/sio3pack/workflow/tasks.py +++ b/src/sio3pack/workflow/tasks.py @@ -55,7 +55,7 @@ def __init__( pid_namespaces: int = 1, processes: list[Process] = None, pipes: int = 0, - channels: list[Channel] = None + channels: list[Channel] = None, ): """ Create a new execution task. @@ -111,10 +111,7 @@ def from_json(cls, data: dict, workflow: "Workflow"): task.filesystem_manager.from_json(data["filesystems"], workflow) task.mountnamespace_manager.from_json(data["mount_namespaces"]) task.resource_group_manager.from_json(data["resource_groups"]) - task.processes = [ - Process.from_json(process, workflow, task) - for process in data["processes"] - ] + task.processes = [Process.from_json(process, workflow, task) for process in data["processes"]] return task def to_json(self) -> dict: @@ -223,7 +220,13 @@ def from_json(cls, data: dict, workflow: "Workflow"): :param workflow: The workflow the task belongs to. 
""" return cls( - data["name"], workflow, data["reactive"], data["input_registers"], data["output_registers"], [workflow.objects_manager.get_or_create_object(obj) for obj in data.get("objects", [])], data["script"], + data["name"], + workflow, + data["reactive"], + data["input_registers"], + data["output_registers"], + [workflow.objects_manager.get_or_create_object(obj) for obj in data.get("objects", [])], + data["script"], ) def to_json(self) -> dict: From c00dd176ed1532a199f6bcf45577c389d8cdb1e9 Mon Sep 17 00:00:00 2001 From: MasloMaslane Date: Sat, 5 Apr 2025 14:43:59 +0200 Subject: [PATCH 07/11] Small refactor, add doc strings --- src/sio3pack/workflow/execution/__init__.py | 3 + src/sio3pack/workflow/execution/channels.py | 37 ++++- .../workflow/execution/descriptors.py | 33 +++- .../workflow/execution/filesystems.py | 15 +- .../workflow/execution/mount_namespace.py | 26 ++- src/sio3pack/workflow/execution/process.py | 2 +- src/sio3pack/workflow/execution/stream.py | 152 +++++++++++++++++- src/sio3pack/workflow/tasks.py | 8 +- 8 files changed, 245 insertions(+), 31 deletions(-) diff --git a/src/sio3pack/workflow/execution/__init__.py b/src/sio3pack/workflow/execution/__init__.py index f817d34..dc2a7a3 100644 --- a/src/sio3pack/workflow/execution/__init__.py +++ b/src/sio3pack/workflow/execution/__init__.py @@ -1,4 +1,7 @@ +from sio3pack.workflow.execution.channels import Channel +from sio3pack.workflow.execution.descriptors import DescriptorManager from sio3pack.workflow.execution.filesystems import Filesystem from sio3pack.workflow.execution.mount_namespace import MountNamespace from sio3pack.workflow.execution.process import Process from sio3pack.workflow.execution.resource_group import ResourceGroup +from sio3pack.workflow.execution.stream import * diff --git a/src/sio3pack/workflow/execution/channels.py b/src/sio3pack/workflow/execution/channels.py index 4bf6285..cebfcf5 100644 --- a/src/sio3pack/workflow/execution/channels.py +++ b/src/sio3pack/workflow/execution/channels.py @@ -1,15 +1,32 @@ class Channel: + """ + A configuration of a channel. A channel is a connection between two pipes. + + :param int buffer_size: The maximum amount of data stored in the channel that has been written by + the writer, but not yet read by the reader. This value must be positive. + :param int source_pipe: The pipe this channel will be reading from. + :param int target_pipe: The pipe this channel will be writing to. + :param int file_buffer_size: Controls whether this channel is backed by a file on the disk. + A larger buffer may then be allocated on the disk. + :param int limit: Limits the maximum amount of data sent through the channel. + """ + def __init__( self, buffer_size: int, source_pipe: int, target_pipe: int, file_buffer_size: int = None, limit: int = None ): """ - Create a new channel. - :param buffer_size: The size of the buffer. - :param source_pipe: The source pipe index. - :param target_pipe: The target pipe index. - :param file_buffer_size: The size of the file buffer. - :param limit: The limit of the channel. + A configuration of a channel. A channel is a connection between two pipes. + + :param int buffer_size: The maximum amount of data stored in the channel that has been written by + the writer, but not yet read by the reader. This value must be positive. + :param int source_pipe: The pipe this channel will be reading from. + :param int target_pipe: The pipe this channel will be writing to. 
+        :param int file_buffer_size: Controls whether this channel is backed by a file on the disk.
+            A larger buffer may then be allocated on the disk.
+        :param int limit: Limits the maximum amount of data sent through the channel.
         """
+        if buffer_size <= 0:
+            raise ValueError("Buffer size must be positive")
         self.buffer_size = buffer_size
         self.source_pipe = source_pipe
         self.target_pipe = target_pipe
@@ -20,8 +37,10 @@ def __init__(
     def from_json(cls, data: dict) -> "Channel":
         """
         Create a new channel from a dictionary.
-        :param data: The dictionary to create the channel from.
+
+        :param dict data: The dictionary to create the channel from.
         """
+
         return cls(
             data["buffer_size"],
             data["source_pipe"],
@@ -31,6 +50,10 @@ def from_json(cls, data: dict) -> "Channel":
         )
 
     def to_json(self) -> dict:
+        """
+        Convert the channel to a dictionary.
+        """
+
         res = {
             "buffer_size": self.buffer_size,
             "source_pipe": self.source_pipe,
diff --git a/src/sio3pack/workflow/execution/descriptors.py b/src/sio3pack/workflow/execution/descriptors.py
index fd22fa6..0a5af57 100644
--- a/src/sio3pack/workflow/execution/descriptors.py
+++ b/src/sio3pack/workflow/execution/descriptors.py
@@ -1,30 +1,47 @@
-from sio3pack.workflow.execution.filesystems import FilesystemManager
 from sio3pack.workflow.execution.stream import Stream
-from sio3pack.workflow.object import ObjectsManager
 
 
 class DescriptorManager:
-    def __init__(self, objects_manager: ObjectsManager, filesystem_manager: FilesystemManager):
+    """
+    A class to manage file descriptors and their associated streams.
+
+    :param ObjectsManager objects_manager: The objects manager.
+    :param FilesystemManager filesystem_manager: The filesystem manager.
+    """
+
+    def __init__(self, objects_manager: "ObjectsManager", filesystem_manager: "FilesystemManager"):
+        """
+        Initialize the descriptor manager.
+
+        :param ObjectsManager objects_manager: The objects manager.
+        :param FilesystemManager filesystem_manager: The filesystem manager.
+        """
+
+        self.objects_manager = objects_manager
         self.filesystem_manager = filesystem_manager
         self.descriptors: dict[int, Stream] = {}
 
     def add_descriptor(self, fd: int, stream: Stream):
         """
         Add a stream to the descriptor manager.
-        :param fd: The file descriptor.
-        :param stream: The stream to add.
+
+        :param int fd: The file descriptor.
+        :param Stream stream: The stream to add.
         """
         self.descriptors[int(fd)] = stream
 
-    def from_json(self, data: dict, objects_manager: ObjectsManager, filesystem_manager: FilesystemManager):
+    def from_json(self, data: dict):
+        """
+        Load the descriptor manager from a JSON-serializable dictionary.
+
+        :param dict data: The JSON-serializable dictionary to load from.
+        """
         for fd, stream_data in data.items():
-            stream = Stream.from_json(stream_data, objects_manager, filesystem_manager)
+            stream = Stream.from_json(stream_data, self.objects_manager, self.filesystem_manager)
             self.add_descriptor(int(fd), stream)
 
     def to_json(self) -> dict:
         """
         Convert the descriptor manager to a JSON-serializable dictionary.
-        :return: The JSON-serializable dictionary.
         """
         # Convert the fd numbers to strings, since in JSON keys can't be ints.
return {str(fd): stream.to_json() for fd, stream in self.descriptors.items()} diff --git a/src/sio3pack/workflow/execution/filesystems.py b/src/sio3pack/workflow/execution/filesystems.py index b7ef77b..9e02257 100644 --- a/src/sio3pack/workflow/execution/filesystems.py +++ b/src/sio3pack/workflow/execution/filesystems.py @@ -143,10 +143,18 @@ def to_json(self) -> dict: class FilesystemManager: + """ + A class to manage filesystems. + + :param Task task: The task the filesystem manager belongs to. + :param list[Filesystem] filesystems: The list of filesystems. + """ + def __init__(self, task: "Task"): """ Create a new filesystem manager. - :param task: The task the filesystem manager belongs to. + + :param Task task: The task the filesystem manager belongs to. """ self.filesystems: list[Filesystem] = [] self.id = 0 @@ -155,8 +163,9 @@ def __init__(self, task: "Task"): def from_json(self, data: list[dict], workflow: "Workflow"): """ Create filesystems from a list of dictionaries. - :param data: The list of dictionaries to create the filesystems from. - :param workflow: The workflow the filesystems belong to. + + :param list[dict] data: The list of dictionaries to create the filesystems from. + :param Workflow workflow: The workflow the filesystems belong to. """ for fs in data: if fs["type"] == "image": diff --git a/src/sio3pack/workflow/execution/mount_namespace.py b/src/sio3pack/workflow/execution/mount_namespace.py index 72b1dff..a5bceee 100644 --- a/src/sio3pack/workflow/execution/mount_namespace.py +++ b/src/sio3pack/workflow/execution/mount_namespace.py @@ -2,18 +2,36 @@ class Mountpoint: + """ + A class to represent a mountpoint. + + :param Filesystem source: The source filesystem. + :param str target: The target path in the filesystem. + :param bool writable: Whether the mountpoint is writable or not. + :param int capacity: The capacity of the mountpoint. If None, the capacity is unlimited. + """ + def __init__(self, source: Filesystem, target: str, writable: bool = False, capacity: int | None = None): + """ + Represent a mountpoint. + + :param source: The source filesystem. + :param target: The target path in the filesystem. + :param writable: Whether the mountpoint is writable or not. + :param capacity: The capacity of the mountpoint. If None, the capacity is unlimited. + """ self.source = source self.target = target self.writable = writable self.capacity = capacity @classmethod - def from_json(cls, data: dict, filesystem_manager: FilesystemManager): + def from_json(cls, data: dict, filesystem_manager: FilesystemManager) -> "Mountpoint": """ Create a new mountpoint from a dictionary. - :param data: The dictionary to create the mountpoint from. - :param filesystem_manager: The filesystem manager to use. + + :param dict data: The dictionary to create the mountpoint from. + :param FilesystemManager filesystem_manager: The filesystem manager to use. """ if isinstance(data["source"], str): # TODO: delete this return cls(data["source"], data["target"], data["writable"]) @@ -81,7 +99,7 @@ def from_json(self, data: list[dict]): """ Create a new mount namespace manager from a list of dictionaries. :param data: The list of dictionaries to create the mount namespace manager from. - :param filesystem_manager: The filesystem manager to use. + :param FilesystemManager filesystem_manager: The filesystem manager to use. 
""" for mount_namespace in data: self.add(MountNamespace.from_json(mount_namespace, self.id, self.filesystem_manager)) diff --git a/src/sio3pack/workflow/execution/process.py b/src/sio3pack/workflow/execution/process.py index 4925dbf..fe139fd 100644 --- a/src/sio3pack/workflow/execution/process.py +++ b/src/sio3pack/workflow/execution/process.py @@ -65,5 +65,5 @@ def from_json(cls, data: dict, workflow: "Workflow", task: "Task"): data["pid_namespace"], data["working_directory"], ) - process.descriptor_manager.from_json(data["descriptors"], workflow.objects_manager, task.filesystem_manager) + process.descriptor_manager.from_json(data["descriptors"]) return process diff --git a/src/sio3pack/workflow/execution/stream.py b/src/sio3pack/workflow/execution/stream.py index 68c8697..4831eef 100644 --- a/src/sio3pack/workflow/execution/stream.py +++ b/src/sio3pack/workflow/execution/stream.py @@ -7,6 +7,10 @@ class StreamType(Enum): + """ + Enum representing the type of stream. + """ + FILE = "file" NULL = "null" OBJECT_READ = "object_read" @@ -16,6 +20,10 @@ class StreamType(Enum): class FileMode(Enum): + """ + Enum representing the mode of a file stream. + """ + READ = "read" READ_WRTIE = "read_write" READ_WRITE_APPEND = "read_write_append" @@ -26,11 +34,30 @@ class FileMode(Enum): class Stream: + """ + Base class for all streams. + + :param StreamType type: The type of the stream. + """ + def __init__(self, type: StreamType): + """ + Initialize the stream. + + :param StreamType type: The type of the stream. + """ self.type = type @classmethod - def from_json(cls, data: dict, objects_manager: ObjectsManager, filesystem_manager: FilesystemManager): + def from_json(cls, data: dict, objects_manager: ObjectsManager, filesystem_manager: FilesystemManager) -> "Stream": + """ + Create a stream from a JSON-serializable dictionary. + + :param dict data: The JSON-serializable dictionary to create the stream from. + :param ObjectsManager objects_manager: The objects manager. + :param FilesystemManager filesystem_manager: The filesystem manager. + """ + type = StreamType(data.get("type")) if type == StreamType.FILE: return FileStream.from_json(filesystem_manager, data) @@ -48,6 +75,15 @@ def to_json(self) -> dict: class FileStream(Stream): + """ + Class representing a file stream. A file will be opened + and passed to the process as a file descriptor. + + :param Filesystem filesystem: The filesystem to use. + :param str path: The path to the file. + :param FileMode mode: The mode to open the file in. + """ + def __init__(self, filesystem: Filesystem, path: str, mode: FileMode): super().__init__(StreamType.FILE) self.filesystem = filesystem @@ -55,7 +91,13 @@ def __init__(self, filesystem: Filesystem, path: str, mode: FileMode): self.mode = mode @classmethod - def from_json(cls, filesystem_manager: FilesystemManager, data: dict): + def from_json(cls, filesystem_manager: FilesystemManager, data: dict) -> "FileStream": + """ + Create a file stream from a JSON-serializable dictionary. + + :param FilesystemManager filesystem_manager: The filesystem manager. + :param dict data: The JSON-serializable dictionary to create the file stream from. + """ return cls( filesystem_manager.get_by_id(data.get("filesystem")), data.get("path"), @@ -63,6 +105,12 @@ def from_json(cls, filesystem_manager: FilesystemManager, data: dict): ) def to_json(self) -> dict: + """ + Convert the file stream to a dictionary. + + :return: The dictionary representation of the file stream. 
+ """ + return { "type": self.type.value, "filesystem": self.filesystem.id, @@ -72,20 +120,42 @@ def to_json(self) -> dict: class NullStream(Stream): + """ + Class representing a null stream. + """ + def __init__(self): super().__init__(StreamType.NULL) @classmethod - def from_json(cls, data: dict): + def from_json(cls, data: dict) -> "NullStream": + """ + Create a null stream from a JSON-serializable dictionary. + + :param dict data: The JSON-serializable dictionary to create the null stream from. + """ return cls() def to_json(self) -> dict: + """ + Convert the null stream to a dictionary. + + :return: The dictionary representation of the null stream. + """ return { "type": self.type.value, } class ObjectStream(Stream): + """ + A base class for object streams. An object stream is a stream that + reads or writes to an object via a file descriptor. + + :param StreamType type: The type of the stream. + :param Object object: The object to use. + """ + def __init__(self, type: StreamType, object: Object): if type not in (StreamType.OBJECT_READ, StreamType.OBJECT_WRITE): raise ValueError("Invalid stream type for ObjectStream") @@ -93,7 +163,12 @@ def __init__(self, type: StreamType, object: Object): self.object = object @classmethod - def from_json(cls, data: dict, objects_manager: ObjectsManager): + def from_json(cls, data: dict, objects_manager: ObjectsManager) -> "ObjectStream": + """ + Create an object stream from a JSON-serializable dictionary. + + :param dict data: The JSON-serializable dictionary to create the object stream from. + """ return cls( StreamType(data.get("type")), objects_manager.get_or_create_object(data.get("handle")), @@ -110,24 +185,67 @@ def to_json(self) -> dict: class ObjectReadStream(ObjectStream): + """ + Class representing an object read stream. An object read stream + is a stream that reads from an object via a file descriptor. + + :param Object object: The object to read from. + """ + def __init__(self, object: Object): + """ + Initialize the object read stream. + + :param Object object: The object to read from. + """ super().__init__(StreamType.OBJECT_READ, object) class ObjectWriteStream(ObjectStream): + """ + Class representing an object write stream. An object write stream + is a stream that writes to an object via a file descriptor. + + :param Object object: The object to write to. + """ + def __init__(self, object: Object): + """ + Initialize the object write stream. + + :param Object object: The object to write to. + """ super().__init__(StreamType.OBJECT_WRITE, object) class PipeStream(Stream): + """ + A base class for pipe streams. A pipe stream is a stream that + reads or writes to a pipe via a file descriptor. + + :param StreamType type: The type of the stream. + :param int pipe_index: The index of the pipe. + """ + def __init__(self, type: StreamType, pipe_index: int): + """ + Initialize the pipe stream. + + :param StreamType type: The type of the stream. + :param int pipe_index: The index of the pipe. + """ if type not in (StreamType.PIPE_READ, StreamType.PIPE_WRITE): raise ValueError("Invalid stream type for PipeStream") super().__init__(type) self.pipe_index = pipe_index @classmethod - def from_json(cls, data: dict): + def from_json(cls, data: dict) -> "PipeStream": + """ + Create a pipe stream from a JSON-serializable dictionary. + + :param dict data: The JSON-serializable dictionary to create the pipe stream from. 
+        """
         return cls(StreamType(data.get("type")), data.get("pipe"))
 
     def to_json(self) -> dict:
@@ -141,10 +259,34 @@ def to_json(self) -> dict:
 
 
 class PipeReadStream(PipeStream):
+    """
+    Class representing a pipe read stream. A pipe read stream
+    is a stream that reads from a pipe via a file descriptor.
+
+    :param int pipe_index: The index of the pipe.
+    """
+
     def __init__(self, pipe_index: int):
+        """
+        Initialize the pipe read stream.
+
+        :param int pipe_index: The index of the pipe.
+        """
         super().__init__(StreamType.PIPE_READ, pipe_index)
 
 
 class PipeWriteStream(PipeStream):
+    """
+    Class representing a pipe write stream. A pipe write stream
+    is a stream that writes to a pipe via a file descriptor.
+
+    :param int pipe_index: The index of the pipe.
+    """
+
     def __init__(self, pipe_index: int):
+        """
+        Initialize the pipe write stream.
+
+        :param int pipe_index: The index of the pipe.
+        """
         super().__init__(StreamType.PIPE_WRITE, pipe_index)
diff --git a/src/sio3pack/workflow/tasks.py b/src/sio3pack/workflow/tasks.py
index b2ad654..cbe6029 100644
--- a/src/sio3pack/workflow/tasks.py
+++ b/src/sio3pack/workflow/tasks.py
@@ -37,11 +37,10 @@ class ExecutionTask(Task):
     :param int hard_time_limit: The hard time limit.
     :param int extra_limit: If set, the hard_time_limit for the task will be the maximum time limit of all resource groups plus this value.
     :param int output_register: The output register of the task.
-    :param int input_register: The input register of the task. TODO delete
     :param int pid_namespaces: The number of PID namespaces.
     :param list[Process] processes: The processes of the task.
-    :param list[Pipe] pipes: The pipes of the task.
-    :param int system_pipes: The number of system pipes.
+    :param int pipes: The number of pipes available to the task.
+    :param list[Channel] channels: Configuration of the channels for the task.
     """
 
     def __init__(
@@ -70,6 +69,7 @@
         :param pid_namespaces: The number of PID namespaces.
         :param processes: The processes of the task.
         :param pipes: The number of pipes.
+        :param channels: Configuration of the channels for the task.
         """
         self.name = name
         self.workflow = workflow
@@ -178,6 +178,7 @@ class ScriptTask(Task):
     :param bool reactive: Whether the task is reactive.
     :param list[int] input_registers: The input registers of the task.
     :param list[int] output_registers: The output registers of the task.
+    :param list[Object] objects: The objects the task uses.
     :param str script: The script to run.
     """
 
@@ -198,6 +199,7 @@ def __init__(
         :param reactive: Whether the task is reactive.
         :param input_registers: The input registers of the task.
         :param output_registers: The output registers of the task.
+        :param objects: The objects the task uses.
         :param script: The script to run.
""" self.name = name From 497e1197cab4d3898954d9140119645544fff2ee Mon Sep 17 00:00:00 2001 From: MasloMaslane Date: Sat, 5 Apr 2025 14:47:10 +0200 Subject: [PATCH 08/11] Fix circular import --- src/sio3pack/workflow/execution/stream.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/sio3pack/workflow/execution/stream.py b/src/sio3pack/workflow/execution/stream.py index 4831eef..95d462f 100644 --- a/src/sio3pack/workflow/execution/stream.py +++ b/src/sio3pack/workflow/execution/stream.py @@ -1,8 +1,7 @@ from enum import Enum from sio3pack.workflow import Object -from sio3pack.workflow.execution import Filesystem -from sio3pack.workflow.execution.filesystems import FilesystemManager +from sio3pack.workflow.execution.filesystems import FilesystemManager, Filesystem from sio3pack.workflow.object import ObjectsManager From 08acd5f3c0f8c10425ff1a95ebb2043827dd644c Mon Sep 17 00:00:00 2001 From: MasloMaslane Date: Sat, 5 Apr 2025 14:47:58 +0200 Subject: [PATCH 09/11] Remove old example workflow --- example_workflows/run.json | 889 ------------------------------------- 1 file changed, 889 deletions(-) delete mode 100644 example_workflows/run.json diff --git a/example_workflows/run.json b/example_workflows/run.json deleted file mode 100644 index 10d4da4..0000000 --- a/example_workflows/run.json +++ /dev/null @@ -1,889 +0,0 @@ -{ - "name": "Run the tests and grade the results", - "external_objects": [ - "abc0a.in", - "abc0b.in", - "abc1a.in", - "abc1b.in", - "abc2a.in", - "abc0a.out", - "abc0b.out", - "abc1a.out", - "abc1b.out", - "abc2a.out", - "abcchk.e", - "sol.e" - ], - "observable_objects": [], - "observable_registers": 1, - "registers": 19, - "tasks": [ - { - "type": "execution", - "name": "Run test abc0a", - "exclusive": true, - "filesystems": [ - { - "type": "object", - "handle": "abc0a.in" - }, - { - "type": "object", - "handle": "sol.e" - }, - { - "type": "empty" - } - ], - "hard_time_limit": 2137, - "mount_namespaces": [ - { - "mountpoints": [ - { - "source": 0, - "target": "/in", - "writable": false - }, - { - "source": 1, - "target": "/exe", - "writable": false - } - ], - "root": 0 - } - ], - "output_register": 1, - "pid_namespaces": 1, - "pipes" : [ - { - "buffer_size" : 1048576, - "file_buffer_size" : 1073741824, - "limit" : 2147483648 - } - ], - "processes" : [ - { - "arguments" : [ - "/exe", - "" - }, - { - "type": "script", - "name": "Check results for abc0b", - "reactive": false, - "input_registers": [7], - "output_registers": [12], - "script": "" - }, - { - "type": "script", - "name": "Check results for abc1a", - "reactive": false, - "input_registers": [8], - "output_registers": [13], - "script": "" - }, - { - "type": "script", - "name": "Check results for abc1b", - "reactive": false, - "input_registers": [9], - "output_registers": [14], - "script": "" - }, - { - "type": "script", - "name": "Check results for abc2a", - "reactive": false, - "input_registers": [10], - "output_registers": [15], - "script": "" - }, - { - "type": "script", - "name": "Grade group 0", - "reactive": true, - "input_registers": [11, 12], - "output_registers": [16], - "script": "" - }, - { - "type": "script", - "name": "Grade group 1", - "reactive": true, - "input_registers": [13, 14], - "output_registers": [17], - "script": "" - }, - { - "type": "script", - "name": "Grade group 2", - "reactive": true, - "input_registers": [15], - "output_registers": [18], - "script": "" - }, - { - "type": "script", - "name": "Grade all", - "reactive": false, - "input_registers": [16, 17, 18], - 
"output_registers": [0], - "script": "" - } - ] -} From 2d510f0addc00970335db39afecb24b792926b10 Mon Sep 17 00:00:00 2001 From: MasloMaslane Date: Sat, 5 Apr 2025 14:48:27 +0200 Subject: [PATCH 10/11] Run isort --- src/sio3pack/workflow/execution/stream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sio3pack/workflow/execution/stream.py b/src/sio3pack/workflow/execution/stream.py index 95d462f..e85a339 100644 --- a/src/sio3pack/workflow/execution/stream.py +++ b/src/sio3pack/workflow/execution/stream.py @@ -1,7 +1,7 @@ from enum import Enum from sio3pack.workflow import Object -from sio3pack.workflow.execution.filesystems import FilesystemManager, Filesystem +from sio3pack.workflow.execution.filesystems import Filesystem, FilesystemManager from sio3pack.workflow.object import ObjectsManager From 06dad75d04d6b86aa29c693e98d8bd9a4be1c84f Mon Sep 17 00:00:00 2001 From: MasloMaslane Date: Sat, 5 Apr 2025 14:52:13 +0200 Subject: [PATCH 11/11] Fix typos, remove unnecessary code --- src/sio3pack/visualizer/cytoscope.py | 2 +- src/sio3pack/workflow/execution/mount_namespace.py | 2 -- src/sio3pack/workflow/execution/stream.py | 2 +- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/src/sio3pack/visualizer/cytoscope.py b/src/sio3pack/visualizer/cytoscope.py index 371ec72..56742d2 100644 --- a/src/sio3pack/visualizer/cytoscope.py +++ b/src/sio3pack/visualizer/cytoscope.py @@ -28,7 +28,7 @@ def get_elements(graph): } ) - # Mark observablr objects as inputs for output object storage + # Mark observable objects as inputs for output object storage for object in observable_objects: if object not in ins_objects: ins_objects[object] = set() diff --git a/src/sio3pack/workflow/execution/mount_namespace.py b/src/sio3pack/workflow/execution/mount_namespace.py index a5bceee..13d7975 100644 --- a/src/sio3pack/workflow/execution/mount_namespace.py +++ b/src/sio3pack/workflow/execution/mount_namespace.py @@ -33,8 +33,6 @@ def from_json(cls, data: dict, filesystem_manager: FilesystemManager) -> "Mountp :param dict data: The dictionary to create the mountpoint from. :param FilesystemManager filesystem_manager: The filesystem manager to use. """ - if isinstance(data["source"], str): # TODO: delete this - return cls(data["source"], data["target"], data["writable"]) return cls( filesystem_manager.get_by_id(int(data["source"])), data["target"], data["writable"], data.get("capacity") ) diff --git a/src/sio3pack/workflow/execution/stream.py b/src/sio3pack/workflow/execution/stream.py index e85a339..4b779ba 100644 --- a/src/sio3pack/workflow/execution/stream.py +++ b/src/sio3pack/workflow/execution/stream.py @@ -24,7 +24,7 @@ class FileMode(Enum): """ READ = "read" - READ_WRTIE = "read_write" + READ_WRITE = "read_write" READ_WRITE_APPEND = "read_write_append" READ_WRITE_TRUNCATE = "read_write_truncate" WRITE = "write"