Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
b5bcc7c
Add support for --direct access for cim plugins
william-billaud Dec 4, 2025
df955f1
Merge branch 'main' into cim_add_direct_support
william-billaud Dec 9, 2025
c36698b
Merge branch 'main' into cim_add_direct_support
william-billaud Dec 11, 2025
f183164
Merge branch 'main' into cim_add_direct_support
william-billaud Dec 15, 2025
340ce17
Merge branch 'main' into cim_add_direct_support
william-billaud Dec 18, 2025
1000f34
Merge branch 'main' into cim_add_direct_support
william-billaud Dec 22, 2025
db71419
Use case insentive vfs by default on direct mode.
william-billaud Dec 22, 2025
28b5a4a
Fix test python < 3.12
william-billaud Jan 6, 2026
5ea70ae
Merge branch 'main' into cim_add_direct_support
william-billaud Jan 6, 2026
1afdbb9
Fix tests on case insensitive fs
william-billaud Jan 6, 2026
25b8d17
Revert static version
william-billaud Jan 8, 2026
4d60ab1
Use VirtualFilesystem (currently causing a deadlock)
william-billaud Jan 9, 2026
6bc3c16
Merge branch 'main' into cim_add_direct_support
william-billaud Jan 9, 2026
715c249
Fix direct loader vfs conversion to str, causing errors in tests
william-billaud Jan 9, 2026
1e6a00d
Merge branch 'main' into cim_add_direct_support
william-billaud Jan 9, 2026
6b076a4
Merge branch 'main' into cim_add_direct_support
william-billaud Jan 12, 2026
5bc1b42
Revert tox ini change
william-billaud Jan 12, 2026
076f512
Delete no longer used variable
william-billaud Jan 12, 2026
21a6aee
Merge branch 'main' into cim_add_direct_support
william-billaud Jan 13, 2026
4ed0bcd
Apply suggestion
william-billaud Jan 13, 2026
867dbc0
Update tests/loaders/test_direct.py
william-billaud Jan 13, 2026
8140f64
Add specific function to enumerate file for windows before 3.12
william-billaud Jan 13, 2026
88cfc2f
Ruff
william-billaud Jan 13, 2026
10f348b
Ruff
william-billaud Jan 13, 2026
9f2a012
typo
william-billaud Jan 13, 2026
880e20d
Add comment
william-billaud Jan 14, 2026
4710aba
Merge branch 'main' into cim_add_direct_support
william-billaud Jan 14, 2026
808e028
Merge branch 'main' into cim_add_direct_support
twiggler Jan 15, 2026
f1e01c4
Fix for python < 3.12 on windows
william-billaud Jan 19, 2026
310632a
Lint
william-billaud Jan 19, 2026
48719bd
Merge branch 'main' into cim_add_direct_support
william-billaud Jan 19, 2026
f413c9a
Update dissect/target/loaders/direct.py
william-billaud Jan 23, 2026
8b2fd4a
Update dissect/target/loaders/direct.py
william-billaud Jan 23, 2026
f2d7ab1
Update dissect/target/tools/query.py
william-billaud Jan 23, 2026
0a1efce
Update tests/tools/test_query.py
william-billaud Jan 23, 2026
5815c12
Update dissect/target/target.py
william-billaud Jan 23, 2026
c674034
Move __repr__
william-billaud Jan 23, 2026
c6033b7
Fix docstyle
william-billaud Jan 23, 2026
745e5d9
Fix open_direct call
william-billaud Jan 23, 2026
e9386bd
Merge branch 'main' into cim_add_direct_support
william-billaud Jan 23, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 66 additions & 4 deletions dissect/target/loaders/direct.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,93 @@
from __future__ import annotations

import sys
from itertools import chain
from pathlib import Path
from typing import TYPE_CHECKING

from dissect.target.filesystem import VirtualFilesystem
from dissect.target.helpers.logging import get_logger
from dissect.target.loader import Loader
from dissect.target.plugins.os.default._os import DefaultOSPlugin

if TYPE_CHECKING:
from collections.abc import Iterator

from dissect.target.target import Target

log = get_logger(__name__)


class DirectLoader(Loader):
def __init__(self, paths: list[str | Path]):
def __init__(self, paths: list[str | Path], case_sensitive: bool = False):
self.case_sensitive = case_sensitive
self.paths = [(Path(path) if not isinstance(path, Path) else path).resolve() for path in paths]

def __repr__(self) -> str:
"""
As DirectLoader does not call super().__init__() self.path is not defined, we need to redefine the __repr__ func
:return:
"""
return f"{self.__class__.__name__}({str(self.paths)!r})"

@staticmethod
def detect(path: Path) -> bool:
return False

def map(self, target: Target) -> None:
vfs = VirtualFilesystem()
if not self.case_sensitive and self.check_case_insensitive_overlap():
log.warning(
"Direct mode used in case insensitive mode, but this will cause files overlap, "
"consider using --direct-sensitive"
)
vfs = VirtualFilesystem(case_sensitive=self.case_sensitive)
for path in self.paths:
if path.is_file():
vfs.map_file(str(path), str(path))
vfs.map_file(str(path), path)
elif path.is_dir():
vfs.map_dir(str(path), str(path))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@twiggler regarding

Can you elaborate on why the string conversion was a problem? According to the typings, both LayerFilesystem::map_dir and LayerFilesystem::map_file should be able to take realpath as a str

As the path variable (that containes the VFS) was converted to string, the vfs object was lost and vfs.map_file tried to map the string '/' (thus my root fs)

vfs.map_dir(str(path), path)

target.filesystems.add(vfs)
target._os_plugin = DefaultOSPlugin

def check_case_insensitive_overlap(self) -> bool:
"""Verify if two differents files will have the same path in a case-insensitive scenario."""
if sys.version_info >= (3, 12) or sys.platform != "win32":

def get_files(path: Path) -> Iterator[Path]:
"""Return list of all files recursively."""
if not path.exists():
return
if path.is_file():
yield path
# Recursively find all files in the directory
yield from path.rglob("*")
else:

def get_files(path: Path, max_depth: int = 7) -> Iterator[Path]:
"""
rglob seems to have issue on windows with python <3.12 when working on a case sensitive FS. Thus
we use another implementation without using rglob.
Probably related to https://github.com/python/cpython/issues/94537
"""

if max_depth == 0:
return
if not path.exists():
return
if path.is_dir():
for f in path.iterdir():
if f.is_dir():
yield from get_files(f, max_depth=max_depth - 1)
else:
yield f
elif path.is_file():
yield path

# Create a flat list of all file paths from all input directories
all_paths = chain.from_iterable(get_files(p) for p in self.paths)
# Filter out directories, keeping only files
all_files = [p for p in all_paths if p.is_file()]
# Compare the count of all files with the count of unique, lowercased file paths

return len({str(p).lower() for p in all_files}) != len(all_files)
45 changes: 32 additions & 13 deletions dissect/target/plugins/os/windows/cim.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

if TYPE_CHECKING:
from collections.abc import Iterator
from pathlib import Path

from dissect.target.target import Target

Expand Down Expand Up @@ -105,20 +106,38 @@ class CimPlugin(Plugin):
def __init__(self, target: Target):
super().__init__(target)
self._repo = None
repodir = self.target.resolve("%windir%/system32/wbem/repository")

repodirs = list(self.get_paths())
if len(repodirs) > 1:
raise UnsupportedPluginError(
"CIM plugins does not support multiple paths. "
"If using with --direct access use wbem/repository folder as input"
)
if len(repodirs) == 0:
raise UnsupportedPluginError("No CIM database found")

self._filters: dict[str, EventFilter] = {}
if repodir.exists():
index = repodir.joinpath("index.btr")
objects = repodir.joinpath("objects.data")
mappings = [repodir.joinpath(f"mapping{i}.map") for i in range(1, 4)]

if all([index.exists(), objects.exists(), all(m.exists() for m in mappings)]):
try:
self._repo = cim.CIM(index.open(), objects.open(), [m.open() for m in mappings])
except cim.Error as e:
self.target.log.warning("Error opening CIM database")
self.target.log.debug("", exc_info=e)
self._filters = self._get_filters()
repodir = repodirs[0]
index = repodir.joinpath("index.btr")
objects = repodir.joinpath("objects.data")
mappings = [repodir.joinpath(f"mapping{i}.map") for i in range(1, 4)]

if all([index.exists(), objects.exists(), all(m.exists() for m in mappings)]):
try:
self._repo = cim.CIM(index.open(), objects.open(), [m.open() for m in mappings])
except cim.Error as e:
self.target.log.warning("Error opening CIM database")
self.target.log.debug("", exc_info=e)
else:
missing_files = ",".join(str(f) for f in [index, objects, *mappings] if not f.exists())
raise UnsupportedPluginError(f"missing expected files : {missing_files}")

self._filters = self._get_filters()

def _get_paths(self) -> Iterator[Path]:
repodir = self.target.resolve("%windir%/system32/wbem/repository")
if repodir.exists:
yield repodir

def check_compatible(self) -> None:
if not self._repo:
Expand Down
4 changes: 2 additions & 2 deletions dissect/target/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -446,12 +446,12 @@ def _open_all(spec: str | Path, include_children: bool = False, *, apply: bool =
raise TargetError(f"Failed to find any loader for targets: {paths}")

@classmethod
def open_direct(cls, paths: list[str | Path]) -> Self:
def open_direct(cls, paths: list[str | Path], *, case_sensitive: bool = False) -> Self:
"""Create a minimal target with a virtual root filesystem with all ``paths`` mapped into it.

This is useful when running plugins on individual files.
"""
return cls._load("direct", DirectLoader(paths))
return cls._load("direct", DirectLoader(paths, case_sensitive))

@property
def is_direct(self) -> bool:
Expand Down
6 changes: 6 additions & 0 deletions dissect/target/tools/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ def main() -> int:
type=pathlib.Path,
help="write the query report file to the given directory",
)

advanced_group = parser.add_argument_group("Advanced options")
advanced_group.add_argument(
"--direct-sensitive", action="store_true", help="same as --direct, but paths will be case-sensitive"
)

configure_generic_arguments(parser)

args, rest = parser.parse_known_args()
Expand Down
12 changes: 8 additions & 4 deletions dissect/target/tools/utils/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,14 @@ def process_plugin_arguments(parser: argparse.ArgumentParser, args: argparse.Nam


def open_target(args: argparse.Namespace, *, apply: bool = True) -> Target:
direct: bool = getattr(args, "direct", False)
direct: bool = getattr(args, "direct", False) or getattr(args, "direct_sensitive", False)
child: str | None = getattr(args, "child", None)

target = Target.open_direct(args.target) if direct else Target.open(args.target, apply=apply)
target = (
Target.open_direct(args.targets, case_sensitive=getattr(args, "direct_sensitive", False))
if direct
else Target.open(args.target, apply=apply)
)

if child:
try:
Expand All @@ -234,12 +238,12 @@ def open_target(args: argparse.Namespace, *, apply: bool = True) -> Target:


def open_targets(args: argparse.Namespace, *, apply: bool = True) -> Iterator[Target]:
direct: bool = getattr(args, "direct", False)
direct: bool = getattr(args, "direct", False) or getattr(args, "direct_sensitive", False)
children: bool = getattr(args, "children", False)
child: str | None = getattr(args, "child", None)

targets: Iterable[Target] = (
[Target.open_direct(args.targets)]
[Target.open_direct(args.targets, case_sensitive=getattr(args, "direct_sensitive", False))]
if direct
else Target.open_all(args.targets, include_children=children, apply=apply)
)
Expand Down
43 changes: 43 additions & 0 deletions tests/loaders/test_direct.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from __future__ import annotations

import io
import logging
import typing

from dissect.target import Target
from dissect.target.filesystem import VirtualFilesystem

if typing.TYPE_CHECKING:
from pathlib import Path

import pytest


def test_direct_overlap_warning(
tmp_path: Path, caplog: pytest.LogCaptureFixture, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Assert direct raise warning in case sensitive mode if some files overlap
We must uncompress files in a temporary directory as having two files with same name
would cause issue with git on case insensitive fs.
"""
source_vfs = VirtualFilesystem(case_sensitive=True)
source_vfs.map_file_fh("file.txt", io.BytesIO(b"a"))
source_vfs.map_file_fh("File.txt", io.BytesIO(b"b"))
with caplog.at_level(logging.WARNING):
_ = Target.open_direct([source_vfs.path("/")], case_sensitive=True)
assert (
"Direct mode used in case insensitive mode, but this will cause files overlap, "
"consider using --direct-sensitive" not in caplog.text
)
with caplog.at_level(logging.WARNING):
_ = Target.open_direct([source_vfs.path("/")], case_sensitive=False)
assert (
"Direct mode used in case insensitive mode, but this will cause files overlap, "
"consider using --direct-sensitive" in caplog.text
)
with caplog.at_level(logging.WARNING):
_ = Target.open_direct([source_vfs.path("/file.txt"), source_vfs.path("/File.txt")], case_sensitive=False)
assert (
"Direct mode used in case insensitive mode, but this will cause files overlap, "
"consider using --direct-sensitive" in caplog.text
)
10 changes: 9 additions & 1 deletion tests/plugins/os/windows/test_cim.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
ActiveScriptEventConsumerRecord,
CommandLineEventConsumerRecord,
)
from dissect.target.target import Target
from tests._utils import absolute_path

if TYPE_CHECKING:
from dissect.target.filesystem import VirtualFilesystem
from dissect.target.target import Target


def test_cim_plugin(target_win: Target, fs_win: VirtualFilesystem) -> None:
Expand All @@ -28,6 +28,14 @@ def test_cim_plugin(target_win: Target, fs_win: VirtualFilesystem) -> None:
assert len([record for record in target_win.cim() if record.filter_query]) == 3


def test_cim_direct_mode() -> None:
data_path = absolute_path("_data/plugins/os/windows/cim/default-namespace")
target = Target.open_direct([data_path])
records = list(target.cim.consumerbindings())

assert len(records) == 3


r"""
Result of the WMI query on the system used to generate the test data

Expand Down
20 changes: 20 additions & 0 deletions tests/tools/test_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -495,3 +495,23 @@ def test_mixed_namespace_and_regular_regression(capsys: pytest.CaptureFixture, m
"<example/descriptor hostname=None domain=None field_a='example' field_b='record'>\n"
"<example/descriptor hostname=None domain=None field_a='namespace_example' field_b='record'>\n"
) in out


def test_direct_mode(capsys: pytest.CaptureFixture, monkeypatch: pytest.MonkeyPatch) -> None:
"""Test that the cim plugin works in direct insensitive mode."""
with monkeypatch.context() as m:
m.setattr(
"sys.argv",
[
"target-query",
"-f",
"cim",
str(absolute_path("_data/plugins/os/windows/cim/default-namespace")),
"--direct",
"-s",
],
)

target_query()
out, _ = capsys.readouterr()
assert len(out.splitlines()) == 3