Skip to content
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
b5bcc7c
Add support for --direct access for cim plugins
william-billaud Dec 4, 2025
df955f1
Merge branch 'main' into cim_add_direct_support
william-billaud Dec 9, 2025
c36698b
Merge branch 'main' into cim_add_direct_support
william-billaud Dec 11, 2025
f183164
Merge branch 'main' into cim_add_direct_support
william-billaud Dec 15, 2025
340ce17
Merge branch 'main' into cim_add_direct_support
william-billaud Dec 18, 2025
1000f34
Merge branch 'main' into cim_add_direct_support
william-billaud Dec 22, 2025
db71419
Use case insentive vfs by default on direct mode.
william-billaud Dec 22, 2025
28b5a4a
Fix test python < 3.12
william-billaud Jan 6, 2026
5ea70ae
Merge branch 'main' into cim_add_direct_support
william-billaud Jan 6, 2026
1afdbb9
Fix tests on case insensitive fs
william-billaud Jan 6, 2026
25b8d17
Revert static version
william-billaud Jan 8, 2026
4d60ab1
Use VirtualFilesystem (currently causing a deadlock)
william-billaud Jan 9, 2026
6bc3c16
Merge branch 'main' into cim_add_direct_support
william-billaud Jan 9, 2026
715c249
Fix direct loader vfs conversion to str, causing errors in tests
william-billaud Jan 9, 2026
1e6a00d
Merge branch 'main' into cim_add_direct_support
william-billaud Jan 9, 2026
6b076a4
Merge branch 'main' into cim_add_direct_support
william-billaud Jan 12, 2026
5bc1b42
Revert tox ini change
william-billaud Jan 12, 2026
076f512
Delete no longer used variable
william-billaud Jan 12, 2026
21a6aee
Merge branch 'main' into cim_add_direct_support
william-billaud Jan 13, 2026
4ed0bcd
Apply suggestion
william-billaud Jan 13, 2026
867dbc0
Update tests/loaders/test_direct.py
william-billaud Jan 13, 2026
8140f64
Add specific function to enumerate file for windows before 3.12
william-billaud Jan 13, 2026
88cfc2f
Ruff
william-billaud Jan 13, 2026
10f348b
Ruff
william-billaud Jan 13, 2026
9f2a012
typo
william-billaud Jan 13, 2026
880e20d
Add comment
william-billaud Jan 14, 2026
4710aba
Merge branch 'main' into cim_add_direct_support
william-billaud Jan 14, 2026
808e028
Merge branch 'main' into cim_add_direct_support
twiggler Jan 15, 2026
f1e01c4
Fix for python < 3.12 on windows
william-billaud Jan 19, 2026
310632a
Lint
william-billaud Jan 19, 2026
48719bd
Merge branch 'main' into cim_add_direct_support
william-billaud Jan 19, 2026
f413c9a
Update dissect/target/loaders/direct.py
william-billaud Jan 23, 2026
8b2fd4a
Update dissect/target/loaders/direct.py
william-billaud Jan 23, 2026
f2d7ab1
Update dissect/target/tools/query.py
william-billaud Jan 23, 2026
0a1efce
Update tests/tools/test_query.py
william-billaud Jan 23, 2026
5815c12
Update dissect/target/target.py
william-billaud Jan 23, 2026
c674034
Move __repr__
william-billaud Jan 23, 2026
c6033b7
Fix docstyle
william-billaud Jan 23, 2026
745e5d9
Fix open_direct call
william-billaud Jan 23, 2026
e9386bd
Merge branch 'main' into cim_add_direct_support
william-billaud Jan 23, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 43 additions & 2 deletions dissect/target/loaders/direct.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,39 @@
from __future__ import annotations

import functools
import operator
from pathlib import Path
from typing import TYPE_CHECKING

from dissect.target.filesystem import VirtualFilesystem
from dissect.target.helpers.logging import get_logger
from dissect.target.loader import Loader
from dissect.target.plugins.os.default._os import DefaultOSPlugin

if TYPE_CHECKING:
from collections.abc import Iterator

from dissect.target.target import Target

log = get_logger(__name__)


class DirectLoader(Loader):
def __init__(self, paths: list[str | Path]):
def __init__(self, paths: list[str | Path], case_sensitive: bool = False):
self.case_sensitive = case_sensitive
self.paths = [(Path(path) if not isinstance(path, Path) else path).resolve() for path in paths]

@staticmethod
def detect(path: Path) -> bool:
return False

def map(self, target: Target) -> None:
vfs = VirtualFilesystem()
if not self.case_sensitive and self.check_case_insensitive_overlap():
log.warning(
"Direct mode used in case insensitive mode, but this will cause files overlap, "
"consider using --direct-sensitive"
)
vfs = VirtualFilesystem(case_sensitive=self.case_sensitive)
for path in self.paths:
if path.is_file():
vfs.map_file(str(path), str(path))
Expand All @@ -29,3 +42,31 @@ def map(self, target: Target) -> None:

target.filesystems.add(vfs)
target._os_plugin = DefaultOSPlugin

def yield_all_file_recursively(self, base_path: Path, max_depth: int = 7) -> Iterator[Path]:
"""
Return list of all files recursively, as rglob is not case-sensitive until python 3.12

:param base_path:
:param max_depth: max depth, prevent infinite recursion
:return:
"""
if max_depth == 0:
return
if not base_path.exists():
return
if base_path.is_file():
yield base_path
return
for f in base_path.iterdir():
if f.is_dir():
yield from self.yield_all_file_recursively(f, max_depth=max_depth - 1)
else:
yield f

def check_case_insensitive_overlap(self) -> bool:
"""Verify if two differents files will have the same path in a case-insensitive fs"""
all_files_list = list(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am considering if the case insensitivity check should not be an optional check in VirtualDirectory::add.

It should be more efficient and easier to implement there.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not familiar enough with these component but probably.

Nevertheless maybe this implementation could be considered a good enough for the --direct use case, and improvement could be managed in a dedicated issue (as it required more modification of internals).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nevertheless maybe this implementation could be considered a good enough for the --direct use case

Sure, we can defer to another ticket.

I will add a suggestion to make the overlap check more efficient tomorrow, along with another review

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've been looking at the implementation of yield_all_file_recursively and check_case_insensitive _overlap. I noticed the comment mentioning that rglob isn't case-sensitive until Python 3.12. While that's a valid point to consider, the actual case-insensitive check in the current code happens when the paths are converted to lowercase.

My proposed refactoring keeps this exact same logic but simplifies the first step of gathering the files. Instead of using a custom recursive function, we can use rglob to get all the file paths and then perform the same lowercase comparison. This works correctly on any OS, regardless of the filesystem's native case sensitivity, because the check is done in Python after collecting the files.

Here’s how we could refactor it:

# No longer needed
# def yield_all_file_recursively(self, base_path: Path, max_depth: int = 7) -> Iterator[Path]:

def check_case_insensitive_overlap(self) -> bool:
    """Verify if two different files will have the same path in a case-insensitive fs."""

    def get_files(path: Path):
        if not path.exists():
            return []
        if path.is_file():
            return [path]
        # Recursively find all files in the directory
        return list(path.rglob("*"))

    # Create a flat list of all file paths from all input directories
    all_paths = chain.from_iterable(get_files(p) for p in self.paths)
    # Filter out directories, keeping only files
    all_files = [p for p in all_paths if p.is_file()]

    # Compare the count of all files with the count of unique, lowercased file paths
    return len({str(p).lower() for p in all_files}) != len(all_files)

(Untested)
What do you think of this approach?

Copy link
Contributor Author

@william-billaud william-billaud Jan 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's close to my initial approach.
unfortunately it does not work on with python <3.12 when target is on a case sensitive FS on windows. I was not able to catch it in test (it maybe related to the cpython implemention) using the Virtual Filesystem.
my comment is misleading as in fact this is not related to the case_sensitive parameter, but probably to some inner works when adding this option

Here is the output from a windows VM, where z: is virtualbox share with your suggestion and o has the following content

❯ tree o
o
├── u
│   ├── a
│   │   └── 1
│   └── A
│       └── 1
└── U
    ├── a
    │   └── 1
    └── A
        └── 1

6 directories, 4 files                                                 

With your suggestion : Warning only raised in 3.12

PS C:\Users\RaptorSniper\dissect.target>  uv run --python 3.12 --refresh --extra dev --extra full target-query --direct -f example_namespace Z:\o
2026-01-13T16:09:49.617953Z [warning  ] Direct mode used in case insensitive mode, but this will cause files overlap, consider using --direct-sensitive [dissect.target.loaders.direct]
<example/descriptor hostname=None domain=None field_a='namespace_example' field_b='record'>
PS C:\Users\RaptorSniper\dissect.target>  uv run --python 3.10 --refresh --extra dev --extra full target-query --direct -f example_namespace Z:\o
Using CPython 3.10.19
<example/descriptor hostname=None domain=None field_a='namespace_example' field_b='record'>
PS C:\Users\RaptorSniper\dissect.target>  uv run --python 3.11 --refresh --extra dev --extra full target-query --direct -f example_namespace Z:\o
Using CPython 3.11.14
<example/descriptor hostname=None domain=None field_a='namespace_example' field_b='record'>
PS C:\Users\RaptorSniper\dissect.target>        

With previous implementation : Warning raised with all version

PS C:\Users\RaptorSniper\dissect.target>  uv run --python 3.10 --refresh --extra dev --extra full target-query --direct -f example_namespace Z:\o
Using CPython 3.10.19
2026-01-13T16:18:52.905258Z [warning  ] Direct mode used in case insensitive mode, but this will cause files overlap, consider using --direct-sensitive [dissect.target.loaders.direct]
<example/descriptor hostname=None domain=None field_a='namespace_example' field_b='record'>
PS C:\Users\RaptorSniper\dissect.target>  uv run --python 3.11 --refresh --extra dev --extra full target-query --direct -f example_namespace Z:\o
Using CPython 3.11.14
2026-01-13T16:19:26.408962Z [warning  ] Direct mode used in case insensitive mode, but this will cause files overlap, consider using --direct-sensitive [dissect.target.loaders.direct]
<example/descriptor hostname=None domain=None field_a='namespace_example' field_b='record'>
PS C:\Users\RaptorSniper\dissect.target>  uv run --python 3.12 --refresh --extra dev --extra full target-query --direct -f example_namespace Z:\o
Using CPython 3.12.10 interpreter at: C:\Users\RaptorSniper\AppData\Local\Microsoft\WindowsApps\PythonSoftwareFoundation.Python.3.12_qbz5n2kfra8p0\python.exe
2026-01-13T16:20:40.384049Z [warning  ] Direct mode used in case insensitive mode, but this will cause files overlap, consider using --direct-sensitive [dissect.target.loaders.direct]
<example/descriptor hostname=None domain=None field_a='namespace_example' field_b='record'>

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have implemented a logic to define a different function for this edge case.

This will allows to easily delete it in the (not near) future when support for python 3.11 will be dropped.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah that's unfortunate, there indeed appears to be a bug in python 3.11 on Windows (python/cpython#94537)

I think it is nice we can remove the workaround when python 3.11 support is dropped.

functools.reduce(operator.iadd, (list(self.yield_all_file_recursively(p)) for p in self.paths), [])
)
return len({str(p).lower() for p in all_files_list}) != len(all_files_list)
47 changes: 33 additions & 14 deletions dissect/target/plugins/os/windows/cim.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

if TYPE_CHECKING:
from collections.abc import Iterator
from pathlib import Path

from dissect.target.target import Target

Expand Down Expand Up @@ -105,22 +106,40 @@ class CimPlugin(Plugin):
def __init__(self, target: Target):
super().__init__(target)
self._repo = None
repodir = self.target.resolve("%windir%/system32/wbem/repository")

repodirs = list(self.get_paths())
if len(repodirs) > 1:
raise UnsupportedPluginError(
"CIM plugins does not support multiple paths. "
"If using with --direct access use wbem/repository folder as input"
)
if len(repodirs) == 0:
raise UnsupportedPluginError("No auth log files found")

self._subscription_ns = None
self._filters: dict[str, EventFilter] = {}
if repodir.exists():
index = repodir.joinpath("index.btr")
objects = repodir.joinpath("objects.data")
mappings = [repodir.joinpath(f"mapping{i}.map") for i in range(1, 4)]

if all([index.exists(), objects.exists(), all(m.exists() for m in mappings)]):
try:
self._repo = cim.CIM(index.open(), objects.open(), [m.open() for m in mappings])
except cim.Error as e:
self.target.log.warning("Error opening CIM database")
self.target.log.debug("", exc_info=e)
self._subscription_ns = self._repo.root.namespace("subscription")
self._filters = self._get_filters()
repodir = repodirs[0]
index = repodir.joinpath("index.btr")
objects = repodir.joinpath("objects.data")
mappings = [repodir.joinpath(f"mapping{i}.map") for i in range(1, 4)]

if all([index.exists(), objects.exists(), all(m.exists() for m in mappings)]):
try:
self._repo = cim.CIM(index.open(), objects.open(), [m.open() for m in mappings])
except cim.Error as e:
self.target.log.warning("Error opening CIM database")
self.target.log.debug("", exc_info=e)
else:
missing_files = ",".join(str(f) for f in [index, objects, *mappings] if not f.exists())
raise UnsupportedPluginError(f"missing expected files : {missing_files}")

self._subscription_ns = self._repo.root.namespace("subscription")
self._filters = self._get_filters()

def _get_paths(self) -> Iterator[Path]:
repodir = self.target.resolve("%windir%/system32/wbem/repository")
if repodir.exists:
yield repodir

def check_compatible(self) -> None:
if not self._repo:
Expand Down
4 changes: 2 additions & 2 deletions dissect/target/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -446,12 +446,12 @@ def _open_all(spec: str | Path, include_children: bool = False, *, apply: bool =
raise TargetError(f"Failed to find any loader for targets: {paths}")

@classmethod
def open_direct(cls, paths: list[str | Path]) -> Self:
def open_direct(cls, paths: list[str | Path], case_sensitive: bool = False) -> Self:
"""Create a minimal target with a virtual root filesystem with all ``paths`` mapped into it.

This is useful when running plugins on individual files.
"""
return cls._load("direct", DirectLoader(paths))
return cls._load("direct", DirectLoader(paths, case_sensitive))

@property
def is_direct(self) -> bool:
Expand Down
6 changes: 6 additions & 0 deletions dissect/target/tools/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ def main() -> int:
type=pathlib.Path,
help="write the query report file to the given directory",
)

advanced_group = parser.add_argument_group("Advanced options")
advanced_group.add_argument(
"--direct-sensitive", action="store_true", help="Same as --direct, but paths will be case sensitive"
)

configure_generic_arguments(parser)

args, rest = parser.parse_known_args()
Expand Down
12 changes: 8 additions & 4 deletions dissect/target/tools/utils/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,14 @@ def process_plugin_arguments(parser: argparse.ArgumentParser, args: argparse.Nam


def open_target(args: argparse.Namespace, *, apply: bool = True) -> Target:
direct: bool = getattr(args, "direct", False)
direct: bool = getattr(args, "direct", False) or getattr(args, "direct_sensitive", False)
child: str | None = getattr(args, "child", None)

target = Target.open_direct(args.target) if direct else Target.open(args.target, apply=apply)
target = (
Target.open_direct(args.targets, getattr(args, "direct_sensitive", False))
if direct
else Target.open(args.target, apply=apply)
)

if child:
try:
Expand All @@ -234,12 +238,12 @@ def open_target(args: argparse.Namespace, *, apply: bool = True) -> Target:


def open_targets(args: argparse.Namespace, *, apply: bool = True) -> Iterator[Target]:
direct: bool = getattr(args, "direct", False)
direct: bool = getattr(args, "direct", False) or getattr(args, "direct_sensitive", False)
children: bool = getattr(args, "children", False)
child: str | None = getattr(args, "child", None)

targets: Iterable[Target] = (
[Target.open_direct(args.targets)]
[Target.open_direct(args.targets, getattr(args, "direct_sensitive", False))]
if direct
else Target.open_all(args.targets, include_children=children, apply=apply)
)
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ dependencies = [
"flow.record~=3.21.0",
"structlog",
]
dynamic = ["version"]
version = "2.30.1"

[project.urls]
homepage = "https://dissect.tools"
Expand Down
3 changes: 3 additions & 0 deletions tests/_data/loaders/direct/overlap.zip
Git LFS file not shown
31 changes: 31 additions & 0 deletions tests/loaders/test_direct.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import logging
from pathlib import Path
from zipfile import ZipFile

import pytest

from dissect.target import Target
from tests._utils import absolute_path


def test_direct_overlap_warning(
tmp_path: Path, caplog: pytest.LogCaptureFixture, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Assert direct raise warning in case sensitive mode if some files overlap
We must uncompress files in a temporary directory as having two files with same name
would cause issue with git on case insensitive fs.
"""
ZipFile(absolute_path("_data/loaders/direct/overlap.zip")).extractall(tmp_path)
if len(list((tmp_path / "overlap").iterdir())) < 2:
pytest.skip("Test running on an insensitive fs")
with caplog.at_level(logging.WARNING):
_ = Target.open_direct([tmp_path], case_sensitive=True)
assert (
"Direct mode used in case insensitive mode, but this will cause files overlap, "
"consider using --direct-sensitive" not in caplog.text
)
_ = Target.open_direct([tmp_path], case_sensitive=False)
assert (
"Direct mode used in case insensitive mode, but this will cause files overlap, "
"consider using --direct-sensitive" in caplog.text
)
10 changes: 9 additions & 1 deletion tests/plugins/os/windows/test_cim.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@

from dissect.target.plugins.os.windows import cim
from dissect.target.plugins.os.windows.cim import ActiveScriptEventConsumerRecord, CommandLineEventConsumerRecord
from dissect.target.target import Target
from tests._utils import absolute_path

if TYPE_CHECKING:
from dissect.target.filesystem import VirtualFilesystem
from dissect.target.target import Target


def test_cim_plugin(target_win: Target, fs_win: VirtualFilesystem) -> None:
Expand All @@ -24,6 +24,14 @@ def test_cim_plugin(target_win: Target, fs_win: VirtualFilesystem) -> None:
assert len([record for record in target_win.cim() if record.filter_query]) == 3


def test_cim_direct_mode() -> None:
data_path = absolute_path("_data/plugins/os/windows/cim")
target = Target.open_direct([data_path])
records = list(target.cim.consumerbindings())

assert len(records) == 3


r"""
Result of the WMI query on the system used to generate the test data

Expand Down
13 changes: 13 additions & 0 deletions tests/tools/test_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -495,3 +495,16 @@ def test_mixed_namespace_and_regular_regression(capsys: pytest.CaptureFixture, m
"<example/descriptor hostname=None domain=None field_a='example' field_b='record'>\n"
"<example/descriptor hostname=None domain=None field_a='namespace_example' field_b='record'>\n"
) in out


def test_direct_mode(capsys: pytest.CaptureFixture, monkeypatch: pytest.MonkeyPatch) -> None:
"""Asset cim plugin works in direct insensitive mode"""
with monkeypatch.context() as m:
m.setattr(
"sys.argv",
["target-query", "-f", "cim", str(absolute_path("_data/plugins/os/windows/cim/")), "--direct", "-s"],
)

target_query()
out, _ = capsys.readouterr()
assert len(out.splitlines()) == 3
Loading