diff --git a/README.md b/README.md index c0a87ba..fc52159 100644 --- a/README.md +++ b/README.md @@ -28,7 +28,7 @@ -Dyana is a sandbox environment using Docker and [Tracee](https://github.com/aquasecurity/tracee) for loading, running and profiling a wide range of files, including machine learning models, ELF executables, Pickle serialized files, Javascripts [and more](https://docs.dreadnode.io/open-source/dyana/topics/loaders). It provides detailed insights into GPU memory usage, filesystem interactions, network requests, and security related events. +Dyana is a sandbox environment using Docker and [Tracee](https://github.com/aquasecurity/tracee) for loading, running and profiling a wide range of files, including machine learning models, GGUF model files, ELF executables, Pickle serialized files, JavaScript files [and more](https://docs.dreadnode.io/open-source/dyana/topics/loaders). It provides detailed insights into GPU memory usage, filesystem interactions, network requests, and security-related events. 
## Installation diff --git a/dyana/cli.py b/dyana/cli.py index 5eb92ff..47b6549 100644 --- a/dyana/cli.py +++ b/dyana/cli.py @@ -20,6 +20,7 @@ from dyana.view import ( view_disk_events, view_disk_usage, + view_extra, view_gpus, view_header, view_imports, @@ -139,7 +140,9 @@ def trace( except Exception as e: serr = str(e) if "could not select device driver" in serr and "capabilities: [[gpu]]" in serr: - rich_print(":cross_mark: [bold][red]error:[/] [red]GPUs are not available on this system, run with --no-gpu.[/]") + rich_print( + ":cross_mark: [bold][red]error:[/] [red]GPUs are not available on this system, run with --no-gpu.[/]" + ) else: rich_print(f":cross_mark: [bold][red]error:[/] [red]{e}[/]") @@ -187,3 +190,4 @@ def summary(trace_path: pathlib.Path = typer.Option(help="Path to the trace file view_legacy_extra(trace["run"]) else: view_imports(trace["run"]["stages"]) + view_extra(trace["run"]) diff --git a/dyana/loaders/gguf/.gitignore b/dyana/loaders/gguf/.gitignore new file mode 100644 index 0000000..3d1264e --- /dev/null +++ b/dyana/loaders/gguf/.gitignore @@ -0,0 +1,3 @@ +dyana.py +dyana-requirements.txt +dyana-requirements-gpu.txt diff --git a/dyana/loaders/gguf/Dockerfile b/dyana/loaders/gguf/Dockerfile new file mode 100644 index 0000000..02c691c --- /dev/null +++ b/dyana/loaders/gguf/Dockerfile @@ -0,0 +1,15 @@ +FROM python:3.12-slim + +WORKDIR /app + +RUN apt-get update && apt-get install -y build-essential +COPY dyana.py . +COPY dyana-requirements.txt . +RUN pip install --no-cache-dir --root-user-action=ignore -r dyana-requirements.txt + +COPY requirements.txt . +RUN pip install --no-cache-dir --root-user-action=ignore -r requirements.txt + +COPY main.py . 
+ +ENTRYPOINT ["python3", "-W", "ignore", "main.py"] diff --git a/dyana/loaders/gguf/gguf_test.py b/dyana/loaders/gguf/gguf_test.py new file mode 100644 index 0000000..f5bf8c0 --- /dev/null +++ b/dyana/loaders/gguf/gguf_test.py @@ -0,0 +1,312 @@ +import typing as t +from unittest.mock import MagicMock, patch + +from dyana.loaders.gguf.main import ( + DANGEROUS_CALL_NAMES, + DANGEROUS_DUNDER_ATTRS, + DANGEROUS_FILTER_NAMES, + MALICIOUS_PATTERNS, + OBFUSCATION_PATTERNS, + SUSPICIOUS_PATTERNS, + _analyze_ast, + analyze_chat_template, +) +from dyana.loaders.loader import Loader + + +class TestGGUFLoaderSettings: + def test_loader_loads(self) -> None: + loader = Loader(name="gguf", build=False) + assert loader.name == "gguf" + + def test_gpu_disabled(self) -> None: + loader = Loader(name="gguf", build=False) + assert loader.settings is not None + assert loader.settings.gpu is False + + def test_arg_structure(self) -> None: + loader = Loader(name="gguf", build=False) + assert loader.settings is not None + assert loader.settings.args is not None + assert len(loader.settings.args) == 1 + assert loader.settings.args[0].name == "gguf" + assert loader.settings.args[0].required is True + assert loader.settings.args[0].volume is True + + +class TestAnalyzeChatTemplate: + """Tests for the combined regex + AST analysis.""" + + def test_clean_template(self) -> None: + template = "{% for message in messages %}{{ message.role }}: {{ message.content }}\n{% endfor %}" + findings = analyze_chat_template(template) + assert len(findings["errors"]) == 0 + assert len(findings["warnings"]) == 0 + + def test_malicious_class_access(self) -> None: + template = "{{ ''.__class__.__mro__[1].__subclasses__() }}" + findings = analyze_chat_template(template) + assert len(findings["errors"]) > 0 + assert any("__class__" in e for e in findings["errors"]) + + def test_malicious_attr_filter(self) -> None: + template = "{{ request|attr('application') }}" + findings = analyze_chat_template(template) + 
assert len(findings["errors"]) > 0 + assert any("attr filter" in e for e in findings["errors"]) + + def test_malicious_import(self) -> None: + template = "{% set x = import os %}" + findings = analyze_chat_template(template) + assert any("Import statement" in e for e in findings["errors"]) + + def test_malicious_eval(self) -> None: + template = "{{ eval('os.system(\"id\")') }}" + findings = analyze_chat_template(template) + assert any("eval()" in e for e in findings["errors"]) + + def test_malicious_exec(self) -> None: + template = "{{ exec('import os') }}" + findings = analyze_chat_template(template) + assert any("exec()" in e for e in findings["errors"]) + + def test_malicious_subprocess(self) -> None: + template = "{{ subprocess.check_output('id') }}" + findings = analyze_chat_template(template) + assert any("Subprocess" in e for e in findings["errors"]) + + def test_malicious_os_access(self) -> None: + template = "{{ os.system('id') }}" + findings = analyze_chat_template(template) + assert any("OS module" in e for e in findings["errors"]) + + def test_obfuscation_base64(self) -> None: + template = "{{ base64.b64decode('aW1wb3J0IG9z') }}" + findings = analyze_chat_template(template) + assert len(findings["warnings"]) > 0 + assert any("Base64" in w for w in findings["warnings"]) + + def test_obfuscation_hex_escape(self) -> None: + template = r"{{ '\x6f\x73' }}" + findings = analyze_chat_template(template) + assert any("Hex escape" in w for w in findings["warnings"]) + + def test_obfuscation_unicode_escape(self) -> None: + template = r"{{ '\u006f\u0073' }}" + findings = analyze_chat_template(template) + assert any("Unicode escape" in w for w in findings["warnings"]) + + def test_obfuscation_chr(self) -> None: + template = "{{ chr(111) + chr(115) }}" + findings = analyze_chat_template(template) + assert any("chr()" in w for w in findings["warnings"]) + + def test_obfuscation_string_concat(self) -> None: + template = "{{ 'os' + '.system' }}" + findings = 
analyze_chat_template(template) + assert any("String concatenation" in w for w in findings["warnings"]) + + def test_suspicious_conditional(self) -> None: + template = "{% if user.role == 'admin' %}secret{% endif %}" + findings = analyze_chat_template(template) + assert len(findings["info"]) > 0 + assert any("Conditional" in i for i in findings["info"]) + + def test_suspicious_loop(self) -> None: + template = "{% for i in range(100) %}{{ i }}{% endfor %}" + findings = analyze_chat_template(template) + assert any("Loop" in i for i in findings["info"]) + + def test_sandbox_validation_failure(self) -> None: + mock_env = MagicMock() + mock_env.parse.side_effect = Exception("Unexpected end of template") + mock_sandbox_cls = MagicMock(return_value=mock_env) + mock_sandbox = MagicMock() + mock_sandbox.SandboxedEnvironment = mock_sandbox_cls + mock_jinja2 = MagicMock() + mock_jinja2.sandbox = mock_sandbox + + with patch.dict("sys.modules", {"jinja2": mock_jinja2, "jinja2.sandbox": mock_sandbox}): + template = "{% if true %} no end" + findings = analyze_chat_template(template) + assert any("Sandbox validation failed" in e for e in findings["errors"]) + + def test_pattern_lists_not_empty(self) -> None: + assert len(MALICIOUS_PATTERNS) > 0 + assert len(OBFUSCATION_PATTERNS) > 0 + assert len(SUSPICIOUS_PATTERNS) > 0 + assert len(DANGEROUS_DUNDER_ATTRS) > 0 + assert len(DANGEROUS_FILTER_NAMES) > 0 + assert len(DANGEROUS_CALL_NAMES) > 0 + + def test_multiple_findings(self) -> None: + template = "{{ ''.__class__.__base__.__subclasses__() }}{{ eval('x') }}" + findings = analyze_chat_template(template) + assert len(findings["errors"]) >= 3 + + +class TestAnalyzeAST: + """Tests for AST-based analysis specifically, using mocked jinja2.""" + + def _mock_jinja2(self) -> tuple[MagicMock, MagicMock, type, type, type, type, type]: + """Create mock jinja2 modules that simulate real AST node types.""" + mock_nodes = MagicMock() + mock_sandbox = MagicMock() + + class FakeNode: + pass + + 
class FakeGetattr(FakeNode): + def __init__(self, attr: str, node: t.Any = None) -> None: + self.attr = attr + self.node = node + + class FakeCall(FakeNode): + def __init__(self, callee: t.Any) -> None: + self.node = callee + + class FakeName(FakeNode): + def __init__(self, name: str) -> None: + self.name = name + + class FakeFilter(FakeNode): + def __init__(self, name: str) -> None: + self.name = name + + mock_nodes.Node = FakeNode + mock_nodes.Getattr = FakeGetattr + mock_nodes.Call = FakeCall + mock_nodes.Name = FakeName + mock_nodes.Filter = FakeFilter + + return mock_nodes, mock_sandbox, FakeNode, FakeGetattr, FakeCall, FakeName, FakeFilter + + def test_ast_detects_dunder_access(self) -> None: + mock_nodes, mock_sandbox, FakeNode, FakeGetattr, FakeCall, FakeName, FakeFilter = self._mock_jinja2() + + getattr_node = FakeGetattr("__class__") + mock_ast = MagicMock() + mock_ast.find_all.return_value = [getattr_node] + + mock_env = MagicMock() + mock_env.parse.return_value = mock_ast + mock_sandbox.SandboxedEnvironment.return_value = mock_env + mock_jinja2 = MagicMock() + mock_jinja2.nodes = mock_nodes + mock_jinja2.sandbox = mock_sandbox + + with patch.dict( + "sys.modules", {"jinja2": mock_jinja2, "jinja2.nodes": mock_nodes, "jinja2.sandbox": mock_sandbox} + ): + findings = _analyze_ast("{{ x.__class__ }}") + assert any("__class__" in e for e in findings["errors"]) + + def test_ast_detects_dangerous_call(self) -> None: + mock_nodes, mock_sandbox, FakeNode, FakeGetattr, FakeCall, FakeName, FakeFilter = self._mock_jinja2() + + name_node = FakeName("eval") + call_node = FakeCall(name_node) + mock_ast = MagicMock() + mock_ast.find_all.return_value = [call_node] + + mock_env = MagicMock() + mock_env.parse.return_value = mock_ast + mock_sandbox.SandboxedEnvironment.return_value = mock_env + mock_jinja2 = MagicMock() + mock_jinja2.nodes = mock_nodes + mock_jinja2.sandbox = mock_sandbox + + with patch.dict( + "sys.modules", {"jinja2": mock_jinja2, "jinja2.nodes": 
mock_nodes, "jinja2.sandbox": mock_sandbox} + ): + findings = _analyze_ast("{{ eval('x') }}") + assert any("eval()" in e for e in findings["errors"]) + + def test_ast_detects_dangerous_method_call(self) -> None: + mock_nodes, mock_sandbox, FakeNode, FakeGetattr, FakeCall, FakeName, FakeFilter = self._mock_jinja2() + + method_node = FakeGetattr("exec") + call_node = FakeCall(method_node) + mock_ast = MagicMock() + mock_ast.find_all.return_value = [call_node] + + mock_env = MagicMock() + mock_env.parse.return_value = mock_ast + mock_sandbox.SandboxedEnvironment.return_value = mock_env + mock_jinja2 = MagicMock() + mock_jinja2.nodes = mock_nodes + mock_jinja2.sandbox = mock_sandbox + + with patch.dict( + "sys.modules", {"jinja2": mock_jinja2, "jinja2.nodes": mock_nodes, "jinja2.sandbox": mock_sandbox} + ): + findings = _analyze_ast("{{ foo.exec('x') }}") + assert any("exec()" in e for e in findings["errors"]) + + def test_ast_detects_dangerous_filter(self) -> None: + mock_nodes, mock_sandbox, FakeNode, FakeGetattr, FakeCall, FakeName, FakeFilter = self._mock_jinja2() + + filter_node = FakeFilter("attr") + mock_ast = MagicMock() + mock_ast.find_all.return_value = [filter_node] + + mock_env = MagicMock() + mock_env.parse.return_value = mock_ast + mock_sandbox.SandboxedEnvironment.return_value = mock_env + mock_jinja2 = MagicMock() + mock_jinja2.nodes = mock_nodes + mock_jinja2.sandbox = mock_sandbox + + with patch.dict( + "sys.modules", {"jinja2": mock_jinja2, "jinja2.nodes": mock_nodes, "jinja2.sandbox": mock_sandbox} + ): + findings = _analyze_ast("{{ x|attr('y') }}") + assert any("|attr" in w for w in findings["warnings"]) + + def test_ast_warns_on_unknown_dunder(self) -> None: + mock_nodes, mock_sandbox, FakeNode, FakeGetattr, FakeCall, FakeName, FakeFilter = self._mock_jinja2() + + getattr_node = FakeGetattr("__custom_dunder__") + mock_ast = MagicMock() + mock_ast.find_all.return_value = [getattr_node] + + mock_env = MagicMock() + mock_env.parse.return_value = 
mock_ast + mock_sandbox.SandboxedEnvironment.return_value = mock_env + mock_jinja2 = MagicMock() + mock_jinja2.nodes = mock_nodes + mock_jinja2.sandbox = mock_sandbox + + with patch.dict( + "sys.modules", {"jinja2": mock_jinja2, "jinja2.nodes": mock_nodes, "jinja2.sandbox": mock_sandbox} + ): + findings = _analyze_ast("{{ x.__custom_dunder__ }}") + # Known dangerous dunders go to errors, unknown dunders to warnings + assert any("__custom_dunder__" in w for w in findings["warnings"]) + assert len(findings["errors"]) == 0 + + def test_ast_clean_template(self) -> None: + mock_nodes, mock_sandbox, FakeNode, FakeGetattr, FakeCall, FakeName, FakeFilter = self._mock_jinja2() + + mock_ast = MagicMock() + mock_ast.find_all.return_value = [] + + mock_env = MagicMock() + mock_env.parse.return_value = mock_ast + mock_sandbox.SandboxedEnvironment.return_value = mock_env + mock_jinja2 = MagicMock() + mock_jinja2.nodes = mock_nodes + mock_jinja2.sandbox = mock_sandbox + + with patch.dict( + "sys.modules", {"jinja2": mock_jinja2, "jinja2.nodes": mock_nodes, "jinja2.sandbox": mock_sandbox} + ): + findings = _analyze_ast("{{ message.content }}") + assert len(findings["errors"]) == 0 + assert len(findings["warnings"]) == 0 + assert len(findings["info"]) == 0 + + def test_ast_no_jinja2_returns_empty(self) -> None: + # When jinja2 is not available, AST analysis should return empty findings + findings = _analyze_ast("{{ anything }}") + assert findings == {"errors": [], "warnings": [], "info": []} diff --git a/dyana/loaders/gguf/main.py b/dyana/loaders/gguf/main.py new file mode 100644 index 0000000..1e585df --- /dev/null +++ b/dyana/loaders/gguf/main.py @@ -0,0 +1,283 @@ +import argparse +import os +import re +import typing as t + +# Patterns indicating SSTI / code execution attempts in Jinja2 templates +MALICIOUS_PATTERNS = [ + (r"__class__", "Python dunder access: __class__"), + (r"__base__", "Python dunder access: __base__"), + (r"__mro__", "Python dunder access: __mro__"), + 
(r"__subclasses__", "Python dunder access: __subclasses__"), + (r"__globals__", "Python dunder access: __globals__"), + (r"__builtins__", "Python dunder access: __builtins__"), + (r"\bos\.", "OS module access"), + (r"\bsubprocess\b", "Subprocess module reference"), + (r"\beval\s*\(", "eval() call"), + (r"\bexec\s*\(", "exec() call"), + (r"\bimport\s+", "Import statement"), + (r"\|attr\b", "Jinja2 attr filter (SSTI vector)"), +] + +# Patterns indicating obfuscation techniques +OBFUSCATION_PATTERNS = [ + (r"\bbase64\b", "Base64 encoding reference"), + (r"\\x[0-9a-fA-F]{2}", "Hex escape sequence"), + (r"\\u[0-9a-fA-F]{4}", "Unicode escape sequence"), + (r"\bchr\s*\(", "chr() character construction"), + (r"\+\s*['\"]|['\"]\s*\+", "String concatenation in template"), +] + +# Patterns that are suspicious but may be legitimate +SUSPICIOUS_PATTERNS = [ + (r"\{%\s*if\b", "Conditional logic in template"), + (r"\{%\s*for\b", "Loop construct in template"), + (r"\|\s*\w+\s*\|\s*\w+", "Chained filter usage"), +] + + +DANGEROUS_DUNDER_ATTRS = { + "__class__", + "__base__", + "__bases__", + "__mro__", + "__subclasses__", + "__globals__", + "__builtins__", + "__import__", + "__init__", + "__code__", + "__func__", + "__self__", + "__module__", + "__dict__", + "__getattr__", + "__setattr__", + "__delattr__", +} + +DANGEROUS_FILTER_NAMES = {"attr", "map", "select", "reject", "groupby"} + +DANGEROUS_CALL_NAMES = {"eval", "exec", "compile", "execfile", "input", "__import__", "getattr", "setattr", "delattr"} + + +def _analyze_ast(template: str) -> dict[str, list[str]]: + """Walk the Jinja2 AST to detect structural security issues.""" + findings: dict[str, list[str]] = {"errors": [], "warnings": [], "info": []} + + try: + from jinja2 import nodes + from jinja2.sandbox import SandboxedEnvironment + except ImportError: + return findings + + try: + env = SandboxedEnvironment() + ast = env.parse(template) + except Exception as e: + findings["errors"].append(f"Sandbox validation failed: {e}") 
+ return findings + + # Walk every node in the AST + for node in ast.find_all(nodes.Node): + # Attribute access on dunder names + if isinstance(node, nodes.Getattr): + if node.attr in DANGEROUS_DUNDER_ATTRS: + findings["errors"].append(f"AST: dangerous attribute access '{node.attr}'") + elif node.attr.startswith("__") and node.attr.endswith("__"): + findings["warnings"].append(f"AST: dunder attribute access '{node.attr}'") + + # Function calls to dangerous builtins + elif isinstance(node, nodes.Call): + callee = node.node + if isinstance(callee, nodes.Name) and callee.name in DANGEROUS_CALL_NAMES: + findings["errors"].append(f"AST: dangerous call to '{callee.name}()'") + elif isinstance(callee, nodes.Getattr) and callee.attr in DANGEROUS_CALL_NAMES: + findings["errors"].append(f"AST: dangerous call to '.{callee.attr}()'") + + # Filter usage + elif isinstance(node, nodes.Filter): + if node.name in DANGEROUS_FILTER_NAMES: + findings["warnings"].append(f"AST: potentially dangerous filter '|{node.name}'") + + return findings + + +def analyze_chat_template(template: str) -> dict[str, list[str]]: + """Analyze a Jinja2 chat template for security issues. + + Uses two complementary approaches: + - Regex scanning catches patterns in raw text (including comments and obfuscation) + - AST walking catches structural issues regardless of formatting + + Returns a dict with keys 'errors', 'warnings', and 'info' containing lists of finding strings. 
+ """ + findings: dict[str, list[str]] = {"errors": [], "warnings": [], "info": []} + + # Layer 1: Regex scanning for known patterns in raw text + for pattern, description in MALICIOUS_PATTERNS: + if re.search(pattern, template): + findings["errors"].append(f"Malicious pattern detected: {description}") + + for pattern, description in OBFUSCATION_PATTERNS: + if re.search(pattern, template): + findings["warnings"].append(f"Obfuscation detected: {description}") + + for pattern, description in SUSPICIOUS_PATTERNS: + if re.search(pattern, template): + findings["info"].append(f"Suspicious pattern: {description}") + + # Layer 2: AST-based structural analysis + ast_findings = _analyze_ast(template) + for key in ("errors", "warnings", "info"): + findings[key].extend(ast_findings[key]) + + return findings + + +def extract_metadata(reader: t.Any) -> dict[str, str | int | float | None]: + """Extract key metadata fields from a GGUF file.""" + metadata: dict[str, str | int | float | None] = {} + + key_fields = { + "general.architecture": "architecture", + "general.name": "model_name", + "general.quantization_version": "quantization_version", + "general.file_type": "file_type", + "tokenizer.chat_template": "chat_template", + } + + # Dynamic context length keys by architecture + context_length_suffixes = [ + ".context_length", + ".block_count", + ".embedding_length", + ".head_count", + ] + + for field in reader.fields: + field_name = str(field) + + if field_name in key_fields: + field_obj = reader.fields[field_name] + parts = field_obj.parts + # The value is typically in the last part(s) after the metadata key + if len(parts) > 0: + data = field_obj.data + if len(data) == 1: + metadata[key_fields[field_name]] = data[0].item() if hasattr(data[0], "item") else str(data[0]) + elif len(data) > 1: + # For string fields, decode the bytes + try: + metadata[key_fields[field_name]] = bytes(data).decode("utf-8") + except (UnicodeDecodeError, TypeError): + metadata[key_fields[field_name]] = 
str(data) + else: + # Check for architecture-specific context length + for suffix in context_length_suffixes: + if field_name.endswith(suffix): + field_obj = reader.fields[field_name] + data = field_obj.data + if len(data) == 1: + key = suffix.lstrip(".") + metadata[key] = data[0].item() if hasattr(data[0], "item") else int(data[0]) + + metadata["total_metadata_fields"] = len(reader.fields) + return metadata + + +def validate_file_structure(path: str) -> dict[str, str | int | bool]: + """Validate GGUF file structure: magic bytes, version, size.""" + result: dict[str, str | int | bool] = {} + + file_size = os.path.getsize(path) + result["file_size_bytes"] = file_size + + with open(path, "rb") as f: + magic = f.read(4) + result["magic_valid"] = magic == b"GGUF" + result["magic_hex"] = magic.hex() + + if len(magic) < 4: + result["error"] = "File too small to contain GGUF header" + return result + + # Version is a uint32 LE at offset 4 + version_bytes = f.read(4) + if len(version_bytes) == 4: + version = int.from_bytes(version_bytes, byteorder="little") + result["version"] = version + + return result + + +if __name__ == "__main__": + from dyana import Profiler # type: ignore[attr-defined] + + parser = argparse.ArgumentParser(description="Analyze a GGUF model file") + parser.add_argument("--gguf", help="Path to GGUF file", required=True) + args = parser.parse_args() + profiler: Profiler = Profiler(gpu=False) + + if not os.path.exists(args.gguf): + profiler.track_error("gguf", "GGUF file not found") + else: + try: + # Stage 1: Validate file structure + structure = validate_file_structure(args.gguf) + profiler.on_stage("validating_structure") + profiler.track_extra("file_structure", structure) + + if not structure.get("magic_valid"): + profiler.track_error("gguf", f"Invalid GGUF magic bytes: {structure.get('magic_hex', 'unknown')}") + else: + # Stage 2: Parse GGUF with the official reader + from gguf import GGUFReader + + reader = GGUFReader(args.gguf) + 
profiler.on_stage("parsing_gguf") + + # Stage 3: Extract metadata + metadata = extract_metadata(reader) + profiler.on_stage("extracting_metadata") + + # Store metadata (excluding the raw template which goes to analysis) + chat_template = metadata.pop("chat_template", None) + profiler.track_extra("metadata", metadata) + + # Stage 4: Analyze chat template + if chat_template and isinstance(chat_template, str): + profiler.track_extra("chat_template_length", len(chat_template)) + findings = analyze_chat_template(chat_template) + profiler.on_stage("analyzing_template") + + for error in findings["errors"]: + profiler.track_error(f"template.{error[:50]}", error) + + for warning in findings["warnings"]: + profiler.track_warning(f"template.{warning[:50]}", warning) + + profiler.track_extra("template_findings", findings) + else: + profiler.on_stage("analyzing_template") + profiler.track_extra("chat_template_length", 0) + profiler.track_extra("template_findings", {"errors": [], "warnings": [], "info": []}) + + # Stage 5: Analyze tensors + tensor_count = len(reader.tensors) + tensor_info = [] + for tensor in reader.tensors[:20]: # Sample first 20 + tensor_info.append( + { + "name": str(tensor.name), + "shape": [int(d) for d in tensor.shape], + "type": str(tensor.tensor_type), + } + ) + + profiler.on_stage("analyzing_tensors") + profiler.track_extra("tensor_count", tensor_count) + profiler.track_extra("tensor_sample", tensor_info) + + except Exception as e: + profiler.track_error("gguf", str(e)) diff --git a/dyana/loaders/gguf/requirements.txt b/dyana/loaders/gguf/requirements.txt new file mode 100644 index 0000000..4f26bab --- /dev/null +++ b/dyana/loaders/gguf/requirements.txt @@ -0,0 +1,2 @@ +gguf==0.17.1 +jinja2==3.1.6 diff --git a/dyana/loaders/gguf/settings.yml b/dyana/loaders/gguf/settings.yml new file mode 100644 index 0000000..e5843ab --- /dev/null +++ b/dyana/loaders/gguf/settings.yml @@ -0,0 +1,14 @@ +description: Analyzes GGUF model files for security issues 
including malicious chat templates. + +args: + - name: gguf + description: Path to the GGUF model file to analyze. + required: true + volume: true + +examples: + - description: "Analyze a GGUF model file:" + command: dyana trace --loader gguf --gguf /path/to/model.gguf + + - description: "Analyze with verbose output:" + command: dyana trace --loader gguf --gguf /path/to/model.gguf --verbose diff --git a/dyana/view.py b/dyana/view.py index aed6a9b..280a5c8 100644 --- a/dyana/view.py +++ b/dyana/view.py @@ -20,7 +20,9 @@ def _view_loader_help_markdown(loader: Loader) -> None: rich_print() rich_print("* **Requires Network:**", "yes" if loader.settings.network else "no") if loader.settings.build_args: - rich_print("* **Optional Build Arguments:**", ", ".join({f"`--{k}`" for k in loader.settings.build_args.keys()})) + rich_print( + "* **Optional Build Arguments:**", ", ".join({f"`--{k}`" for k in loader.settings.build_args.keys()}) + ) if loader.settings.args: rich_print() @@ -34,7 +36,9 @@ def _view_loader_help_markdown(loader: Loader) -> None: "|--------------|---------------------------------------------------------------------|------------------------------|----------|" ) for arg in loader.settings.args: - rich_print(f"| `--{arg.name}` | {arg.description} | `{arg.default}` | {'yes' if arg.required else 'no'} |") + rich_print( + f"| `--{arg.name}` | {arg.description} | `{arg.default}` | {'yes' if arg.required else 'no'} |" + ) if loader.settings.examples: rich_print() @@ -333,7 +337,7 @@ def view_network_events(trace: dict[str, t.Any]) -> None: else: data = [arg["value"] for arg in event["args"] if arg["name"] == "proto_dns"][0] question_names = [q["name"] for q in data["questions"]] - answers = [f'{a["name"]}={a["IP"]}' for a in data["answers"]] + answers = [f"{a['name']}={a['IP']}" for a in data["answers"]] if not answers: line = f" * [[dim]{event['processId']}[/]] {event['processName']} | [bold red]dns[/] | question={', '.join(question_names)}" @@ -399,6 +403,102 
@@ def view_disk_events(trace: dict[str, t.Any]) -> None: rich_print() +def view_extra(run: dict[str, t.Any]) -> None: + extra = run.get("extra") + if not extra: + return + + loader_name = run.get("loader_name", "") + + if loader_name == "gguf": + _view_gguf_extra(extra) + else: + # Generic fallback for unknown extras + known_keys = {"imports"} + unknown = [k for k in extra if k not in known_keys] + if unknown: + rich_print("[bold yellow]Extra:[/]") + for k in unknown: + rich_print(f" * {k}") + rich_print() + + +def _view_gguf_extra(extra: dict[str, t.Any]) -> None: + # File structure + structure = extra.get("file_structure") + if structure: + rich_print("[bold yellow]File Structure:[/]") + rich_print(f" Size : {sizeof_fmt(structure.get('file_size_bytes', 0))}") + rich_print(f" GGUF version : {structure.get('version', 'unknown')}") + magic_ok = structure.get("magic_valid", False) + magic_str = "[green]valid[/]" if magic_ok else "[bold red]INVALID[/]" + rich_print(f" Magic bytes : {magic_str}") + rich_print() + + # Model metadata + metadata = extra.get("metadata") + if metadata: + rich_print("[bold yellow]Model Metadata:[/]") + field_labels = { + "architecture": "Architecture", + "model_name": "Name", + "file_type": "File type", + "quantization_version": "Quantization", + "context_length": "Context length", + "block_count": "Block count", + "embedding_length": "Embedding dim", + "head_count": "Head count", + "total_metadata_fields": "Total fields", + } + for key, label in field_labels.items(): + if key in metadata: + rich_print(f" {label:<18}: {metadata[key]}") + rich_print() + + # Template analysis + template_len = extra.get("chat_template_length", 0) + findings = extra.get("template_findings") + if findings: + errors = findings.get("errors", []) + warnings = findings.get("warnings", []) + info = findings.get("info", []) + + if errors or warnings or info: + rich_print("[bold yellow]Chat Template Analysis:[/]") + rich_print(f" Template length : {template_len} 
chars") + + for error in errors: + rich_print(f" [bold red]ERROR[/] : {error}") + for warning in warnings: + rich_print(f" [bold yellow]WARNING[/] : {warning}") + for item in info: + rich_print(f" [dim]INFO[/] : {item}") + rich_print() + elif template_len > 0: + rich_print("[bold yellow]Chat Template Analysis:[/]") + rich_print(f" Template length : {template_len} chars") + rich_print(" [green]No security issues found[/]") + rich_print() + else: + rich_print("[bold yellow]Chat Template Analysis:[/]") + rich_print(" [dim]No chat template present[/]") + rich_print() + + # Tensor summary + tensor_count = extra.get("tensor_count") + if tensor_count is not None: + rich_print("[bold yellow]Tensors:[/]") + rich_print(f" Total tensors : {tensor_count}") + sample = extra.get("tensor_sample", []) + if sample: + for tensor in sample[:10]: + shape_str = "x".join(str(d) for d in tensor.get("shape", [])) + rich_print(f" * [dim]{tensor.get('name', '?')}[/] [{shape_str}] {tensor.get('type', '')}") + if tensor_count > 10: + rich_print(f" [dim]... 
and {tensor_count - 10} more[/]") + rich_print() + + def view_security_events(trace: dict[str, t.Any]) -> None: security_events = [event for event in trace["events"] if event["eventName"] in SECURITY_EVENTS] if security_events: diff --git a/dyana/view_test.py b/dyana/view_test.py index 02f95c1..a118ff3 100644 --- a/dyana/view_test.py +++ b/dyana/view_test.py @@ -5,6 +5,7 @@ severity_fmt, view_disk_events, view_disk_usage, + view_extra, view_header, view_network_events, view_process_executions, @@ -284,9 +285,7 @@ def test_dedup(self) -> None: "processId": 1, "processName": "curl", "syscall": "connect", - "args": [ - {"name": "remote_addr", "value": {"sa_family": "AF_INET", "sin_addr": "1.2.3.4", "sin_port": 80}} - ], + "args": [{"name": "remote_addr", "value": {"sa_family": "AF_INET", "sin_addr": "1.2.3.4", "sin_port": 80}}], } trace: dict[str, t.Any] = {"events": [event, {**event, "timestamp": 2000}]} with patch("dyana.view.rich_print") as mock_print: @@ -434,3 +433,123 @@ def test_basic(self) -> None: assert "Disk Usage" in output assert "start" in output assert "end" in output + + +class TestViewExtra: + def test_no_extra(self) -> None: + run: dict[str, t.Any] = {"loader_name": "gguf"} + with patch("dyana.view.rich_print") as mock_print: + view_extra(run) + mock_print.assert_not_called() + + def test_gguf_file_structure(self) -> None: + run: dict[str, t.Any] = { + "loader_name": "gguf", + "extra": { + "file_structure": {"file_size_bytes": 4096000, "version": 3, "magic_valid": True}, + }, + } + with patch("dyana.view.rich_print") as mock_print: + view_extra(run) + output = " ".join(str(c) for c in mock_print.call_args_list) + assert "File Structure" in output + assert "valid" in output + assert "3" in output + + def test_gguf_metadata(self) -> None: + run: dict[str, t.Any] = { + "loader_name": "gguf", + "extra": { + "metadata": { + "architecture": "llama", + "model_name": "test-model", + "context_length": 4096, + "total_metadata_fields": 25, + }, + }, + } + with 
patch("dyana.view.rich_print") as mock_print: + view_extra(run) + output = " ".join(str(c) for c in mock_print.call_args_list) + assert "Model Metadata" in output + assert "llama" in output + assert "test-model" in output + assert "4096" in output + + def test_gguf_template_with_errors(self) -> None: + run: dict[str, t.Any] = { + "loader_name": "gguf", + "extra": { + "chat_template_length": 500, + "template_findings": { + "errors": ["Malicious pattern detected: Python dunder access: __class__"], + "warnings": ["Obfuscation detected: Base64 encoding reference"], + "info": [], + }, + }, + } + with patch("dyana.view.rich_print") as mock_print: + view_extra(run) + output = " ".join(str(c) for c in mock_print.call_args_list) + assert "Chat Template Analysis" in output + assert "ERROR" in output + assert "__class__" in output + assert "WARNING" in output + assert "Base64" in output + + def test_gguf_template_clean(self) -> None: + run: dict[str, t.Any] = { + "loader_name": "gguf", + "extra": { + "chat_template_length": 200, + "template_findings": {"errors": [], "warnings": [], "info": []}, + }, + } + with patch("dyana.view.rich_print") as mock_print: + view_extra(run) + output = " ".join(str(c) for c in mock_print.call_args_list) + assert "No security issues" in output + + def test_gguf_template_absent(self) -> None: + run: dict[str, t.Any] = { + "loader_name": "gguf", + "extra": { + "chat_template_length": 0, + "template_findings": {"errors": [], "warnings": [], "info": []}, + }, + } + with patch("dyana.view.rich_print") as mock_print: + view_extra(run) + output = " ".join(str(c) for c in mock_print.call_args_list) + assert "No chat template" in output + + def test_gguf_tensors(self) -> None: + run: dict[str, t.Any] = { + "loader_name": "gguf", + "extra": { + "tensor_count": 250, + "tensor_sample": [ + {"name": "blk.0.attn_q.weight", "shape": [4096, 4096], "type": "Q4_0"}, + {"name": "blk.0.attn_k.weight", "shape": [1024, 4096], "type": "Q4_0"}, + ], + }, + } + with 
patch("dyana.view.rich_print") as mock_print: + view_extra(run) + output = " ".join(str(c) for c in mock_print.call_args_list) + assert "Tensors" in output + assert "250" in output + assert "attn_q" in output + assert "4096x4096" in output + assert "240 more" in output + + def test_generic_loader_extra(self) -> None: + run: dict[str, t.Any] = { + "loader_name": "pickle", + "extra": {"object_type": "", "length": 5}, + } + with patch("dyana.view.rich_print") as mock_print: + view_extra(run) + output = " ".join(str(c) for c in mock_print.call_args_list) + assert "Extra" in output + assert "object_type" in output