Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
7a57b20
added disk serial and volume uuid
loaflover Sep 7, 2025
04c0fd7
adding tests
loaflover Sep 7, 2025
6fcd734
fixed default test failing. added ceck to see if volume doesnt exist.…
loaflover Sep 7, 2025
e6e1635
fixed some of the requested chages
loaflover Oct 17, 2025
40fec39
Apply suggestions from code review
loaflover Oct 17, 2025
05a7561
Merge branch 'feature/uuid-walkfs' of https://github.com/loaflover/di…
loaflover Oct 17, 2025
98865f4
more fixes
loaflover Oct 17, 2025
06a5541
more changes
loaflover Oct 17, 2025
a53f925
linting stuff
loaflover Oct 17, 2025
3ae2c78
added stuf
loaflover Oct 17, 2025
72ce981
added disk serial and volume uuid
loaflover Sep 7, 2025
24d4a49
adding tests
loaflover Sep 7, 2025
a7dcc1a
fixed default test failing. added ceck to see if volume doesnt exist.…
loaflover Sep 7, 2025
c22ca26
fixed some of the requested chages
loaflover Oct 17, 2025
d83e8b5
Apply suggestions from code review
loaflover Oct 17, 2025
78ffbd8
more fixes
loaflover Oct 17, 2025
c88fdc5
more changes
loaflover Oct 17, 2025
51a2986
linting stuff
loaflover Oct 17, 2025
c124db2
added stuf
loaflover Oct 17, 2025
242d243
Merge branch 'feature/uuid-walkfs' of https://github.com/loaflover/di…
loaflover Oct 17, 2025
7508485
fixed all tests??
loaflover Oct 17, 2025
63db79d
looks better idk
loaflover Oct 17, 2025
206ecc1
oops...
loaflover Oct 17, 2025
d0d1e18
added new test for new code
loaflover Oct 17, 2025
5f0ea91
linting i think
loaflover Oct 17, 2025
4feb0c8
Merge branch 'main' into feature/uuid-walkfs
Miauwkeru Oct 20, 2025
fbc84b0
applied some of that changes
loaflover Oct 20, 2025
497fdbf
Merge branch 'feature/uuid-walkfs' of https://github.com/loaflover/di…
loaflover Oct 20, 2025
4219d5e
fixed filesystems (:
loaflover Oct 20, 2025
f400fb9
oops
loaflover Oct 20, 2025
ae6d271
fixed issue with test
loaflover Oct 20, 2025
197e7b9
fixed test titles
loaflover Oct 20, 2025
675da3d
Merge remote-tracking branch 'upstream/main' into feature/uuid-walkfs
loaflover Jan 24, 2026
0cbc19d
ruff + hey yall im back
loaflover Jan 24, 2026
ced4a6d
fixes
loaflover Jan 24, 2026
8c13863
Apply suggestions from code review
loaflover Jan 24, 2026
c0fa1b1
Merge branch 'feature/uuid-walkfs' of https://github.com/loaflover/di…
loaflover Jan 24, 2026
99742b8
fixed some thingies
loaflover Jan 24, 2026
2dc8f06
fixed walkfs test (have not applied suggestions yet!)
loaflover Jan 24, 2026
f1d2f8b
ruff
loaflover Jan 24, 2026
5a739d0
Apply suggestions from code review
loaflover Jan 30, 2026
c84310f
fixed other requested change
loaflover Jan 30, 2026
d89f264
Merge branch 'feature/uuid-walkfs' of https://github.com/loaflover/di…
loaflover Jan 30, 2026
6110be9
Update dissect/target/plugins/filesystem/walkfs.py
loaflover Jan 31, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 64 additions & 1 deletion dissect/target/plugins/filesystem/walkfs.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
from __future__ import annotations

from typing import TYPE_CHECKING
from uuid import UUID

from dissect.util.ts import from_unix

from dissect.target.exceptions import FileNotFoundError, UnsupportedPluginError
from dissect.target.filesystem import FilesystemEntry, LayerFilesystemEntry
from dissect.target.helpers.record import TargetRecordDescriptor
from dissect.target.plugin import Plugin, arg, export
from dissect.target.plugin import Plugin, arg, export, internal

if TYPE_CHECKING:
from collections.abc import Iterator
Expand All @@ -28,6 +29,8 @@
("uint32", "uid"),
("uint32", "gid"),
("string[]", "fstypes"),
("string[]", "vuuid"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
("string[]", "vuuid"),
("string[]", "identifiers"),

Maybe an idea to change it to identifiers now, as that is what we use in the code.

("string[]", "dserial"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
("string[]", "dserial"),
("string[]", "disk_identifiers"),

maybe a bit more clear what the field is for

],
)

Expand Down Expand Up @@ -66,6 +69,53 @@ def walkfs(self, walkfs_path: str = "/") -> Iterator[FilesystemRecord]:
self.target.log.debug("", exc_info=e)
continue

@internal
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the @internal annotation is only used within plugins, So you can leave this out.

def get_volume_uuid(entry: FilesystemEntry) -> str:
"""
Returns the volume_uuid if it exists. otherwise, returns none

Args:
entry: :class:`FilesystemEntry` instance

Returns:
UUID as str
"""
if entry.fs.volume is None:
return None
if entry.fs.volume.guid:
return UUID(bytes_le=entry.fs.volume.guid)
elif entry.fs.__type__ == 'ntfs':
return UUID(int=entry.fs.ntfs.serial)
elif entry.fs.__type__ in ['ext2', 'ext3', 'ext4']:
return entry.fs.extfs.uuid
elif entry.fs.__type__ == 'fat':
return UUID(int=int(entry.fs.fatfs.volume_id, 16))
elif entry.fs.__type__ == 'exfat':
return UUID(int=entry.fs.exfat.vbr.volume_serial)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it might be better to create a Filesystem.uuid function. And implement these lookups inside each respective filesystem.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i am going over the changes and don't entirely understand this one, could you elaborate a bit on exactly what you are suggesting? thanks.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was talking about adding a function to the Filesystem representation itself.

So in dissect/target/filesystem.py:

class Filesystem:
    ...
    def uuid(self) -> UUID | None:
        return None

And then implement it for those specific filesystems, e.g. ntfs etc:

dissect/target/filesystems/extfs.py

class ExtFilesystem(Filesystem):
    ...
    def uuid(self) -> UUID | None:
        return self.extfs.uuid

and that for all those filesystems above. Then you can replace those if statements with:
fs.uuid()

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it be nicer to have it as either a property or a __init__ argument?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm almost done adding it as a property. just need to make sure tests pass.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it will take a bit longer then expected. not only does it seem i have tio rebase (some tests fail unless i rebase even when going before my first commit, i assume something internally has changed?) but also with the uuid moving to a property ill have to totally redo the tests, as mock doesnt use them

else:
# Return None if no valid UUID or serial is found
return None


@internal
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The internal comment before was also for this one ;)

def get_disk_serial(entry: FilesystemEntry) -> str:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
@internal
def get_disk_serial(entry: FilesystemEntry) -> str:
def get_disk_serial(entry: FilesystemEntry) -> str | None:

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And the same suggestion about changing the FilesystemEntry here.

"""
Returns the disk_serial if it exists. otherwise, returns none

Args:
entry: :class:`FilesystemEntry` instance

Returns:
serial as str
"""
if entry.fs.volume is None:
return None

if hasattr(entry.fs.volume.disk.vs, 'serial'):
return entry.fs.volume.disk.vs.serial
return None



def generate_record(target: Target, entry: FilesystemEntry) -> FilesystemRecord:
"""Generate a :class:`FilesystemRecord` from the given :class:`FilesystemEntry`.
Expand All @@ -81,8 +131,19 @@ def generate_record(target: Target, entry: FilesystemEntry) -> FilesystemRecord:

if isinstance(entry, LayerFilesystemEntry):
fs_types = [sub_entry.fs.__type__ for sub_entry in entry.entries]
volume_uuids = [
get_volume_uuid(sub_entry)
for sub_entry in entry.entries
]

disk_serials = [
get_disk_serial(sub_entry)
for sub_entry in entry.entries
]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of iterating over these entries 3 times, it would be better to just do it once.

else:
fs_types = [entry.fs.__type__]
volume_uuids = [get_volume_uuid(entry)]
disk_serials = [get_disk_serial(entry)]

return FilesystemRecord(
atime=from_unix(stat.st_atime),
Expand All @@ -96,5 +157,7 @@ def generate_record(target: Target, entry: FilesystemEntry) -> FilesystemRecord:
uid=stat.st_uid,
gid=stat.st_gid,
fstypes=fs_types,
vuuid=volume_uuids,
dserial=disk_serials,
_target=target,
)
138 changes: 135 additions & 3 deletions tests/plugins/filesystem/test_walkfs.py
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change will fail linting.

Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@

from pathlib import Path
from typing import TYPE_CHECKING
from unittest.mock import Mock, MagicMock
from uuid import UUID

import pytest

from dissect.target.filesystem import VirtualFile, VirtualFilesystem
from dissect.target.filesystem import FilesystemEntry, VirtualFile, VirtualFilesystem
from dissect.target.loaders.tar import TarLoader
from dissect.target.plugins.filesystem.walkfs import WalkFSPlugin
from dissect.target.plugins.filesystem.walkfs import WalkFSPlugin, get_disk_serial, get_volume_uuid
from tests._utils import absolute_path

if TYPE_CHECKING:
Expand All @@ -17,6 +19,7 @@


def test_walkfs_plugin(target_unix: Target, fs_unix: VirtualFilesystem) -> None:

fs_unix.map_file_entry("/path/to/some/file", VirtualFile(fs_unix, "file", None))
fs_unix.map_file_entry("/path/to/some/other/file.ext", VirtualFile(fs_unix, "file.ext", None))
fs_unix.map_file_entry("/root_file", VirtualFile(fs_unix, "root_file", None))
Expand All @@ -25,7 +28,7 @@ def test_walkfs_plugin(target_unix: Target, fs_unix: VirtualFilesystem) -> None:
fs_unix.map_file_entry("/.test/.more.test.txt", VirtualFile(fs_unix, ".more.test.txt", None))

target_unix.add_plugin(WalkFSPlugin)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The linter will complain about this. So you can just run tox run -e fix to fix the fixable linting issues. Use tox run -e lint to see any other changes

results = list(target_unix.walkfs())
assert len(results) == 14
assert sorted([r.path for r in results]) == [
Expand Down Expand Up @@ -57,3 +60,132 @@ def test_benchmark_walkfs(target_bare: Target, benchmark: BenchmarkFixture) -> N
result = benchmark(lambda: next(WalkFSPlugin(target_bare).walkfs()))

assert result.path == "/"


@pytest.fixture
def mock_fs_entry():
"""Fixture to create a mock FilesystemEntry object."""
mock_entry = MagicMock(spec=FilesystemEntry)
mock_fs = Mock()
mock_volume = Mock()
mock_disk = Mock()

# Set up the mock object structure
mock_entry.fs = mock_fs
mock_fs.volume = mock_volume
mock_volume.disk = mock_disk

# Clear any attributes from previous tests
mock_volume.guid = None
mock_fs.__type__ = 'generic'
if hasattr(mock_fs, 'ntfs'): del mock_fs.ntfs
if hasattr(mock_fs, 'extfs'): del mock_fs.extfs
if hasattr(mock_fs, 'fatfs'): del mock_fs.fatfs
if hasattr(mock_fs, 'exfat'): del mock_fs.exfat
if hasattr(mock_disk.vs, 'serial'): del mock_disk.vs.serial
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason these objects have these attributes, is because it is a Mock object. These Mock objects are designed to have every attribute. Instead of deleting them, setting them to None would suffice


return mock_entry


def test_get_volume_uuid_ntfs(mock_fs_entry):
"""Test get_volume_uuid for an NTFS filesystem."""
# Mock NTFS-specific attributes
mock_fs_entry.fs.__type__ = 'ntfs'
mock_fs_entry.fs.ntfs = Mock(serial=123456789)

expected_uuid = UUID(int=123456789)
assert get_volume_uuid(mock_fs_entry) == expected_uuid


def test_get_volume_uuid_ext(mock_fs_entry):
"""Test get_volume_uuid for an EXT filesystem (ext2/3/4)."""
# Mock EXT-specific attributes
mock_fs_entry.fs.__type__ = 'ext4'
mock_fs_entry.fs.extfs = Mock(uuid="e0c3d987-a36c-4f9e-9b2f-90e633d7d7a1")

expected_uuid = "e0c3d987-a36c-4f9e-9b2f-90e633d7d7a1"
assert get_volume_uuid(mock_fs_entry) == expected_uuid


def test_get_volume_uuid_fat(mock_fs_entry):
"""Test get_volume_uuid for a FAT filesystem."""
# Mock FAT-specific attributes
mock_fs_entry.fs.__type__ = 'fat'
mock_fs_entry.fs.fatfs = Mock(volume_id="1a2b3c4d")

expected_uuid = UUID(int=0x1a2b3c4d)
assert get_volume_uuid(mock_fs_entry) == expected_uuid


def test_get_volume_uuid_exfat(mock_fs_entry):
"""Test get_volume_uuid for an ExFAT filesystem."""
# Mock ExFAT-specific attributes
mock_fs_entry.fs.__type__ = 'exfat'
mock_fs_entry.fs.exfat = Mock(vbr=Mock(volume_serial=987654321))

expected_uuid = UUID(int=987654321)
assert get_volume_uuid(mock_fs_entry) == expected_uuid


def test_get_volume_uuid_guid(mock_fs_entry):
"""Test get_volume_uuid when `volume.guid` exists (higher priority)."""
# Mock a GUID that should be returned first
mock_fs_entry.fs.volume.guid = b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f'

expected_uuid = UUID(bytes_le=mock_fs_entry.fs.volume.guid)
assert get_volume_uuid(mock_fs_entry) == expected_uuid


def test_get_volume_uuid_no_match(mock_fs_entry):
"""Test get_volume_uuid when no valid UUID is found."""
# The default mock has no specific filesystem type
mock_fs_entry.fs.__type__ = 'unsupported_fs'

assert get_volume_uuid(mock_fs_entry) is None


def test_get_disk_serial(mock_fs_entry):
"""Test get_disk_serial when a serial number is available."""
# Mock the `serial` attribute on the `vs` object
mock_fs_entry.fs.volume.disk.vs = Mock(serial='A1B2C3D4')

assert get_disk_serial(mock_fs_entry) == 'A1B2C3D4'


def test_get_disk_serial_no_serial(mock_fs_entry):
"""Test get_disk_serial when the `serial` attribute is missing."""
# The default mock aparently does have the `serial` attribute on `vs`
mock_fs_entry.fs.volume.disk.vs = Mock()
if hasattr(mock_fs_entry.fs.volume.disk.vs, 'serial'):
delattr(mock_fs_entry.fs.volume.disk.vs, 'serial')
assert get_disk_serial(mock_fs_entry) is None


def test_walkfs_plugin(target_unix: Target, fs_unix: VirtualFilesystem) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is already defined at the top of this file, you can remove this one

fs_unix.map_file_entry("/path/to/some/file", VirtualFile(fs_unix, "file", None))
fs_unix.map_file_entry("/path/to/some/other/file.ext", VirtualFile(fs_unix, "file.ext", None))
fs_unix.map_file_entry("/root_file", VirtualFile(fs_unix, "root_file", None))
fs_unix.map_file_entry("/other_root_file.ext", VirtualFile(fs_unix, "other_root_file.ext", None))
fs_unix.map_file_entry("/.test/test.txt", VirtualFile(fs_unix, "test.txt", None))
fs_unix.map_file_entry("/.test/.more.test.txt", VirtualFile(fs_unix, ".more.test.txt", None))

target_unix.add_plugin(WalkFSPlugin)

results = list(target_unix.walkfs())
assert len(results) == 14
assert sorted([r.path for r in results]) == [
"/",
"/.test",
"/.test/.more.test.txt",
"/.test/test.txt",
"/etc",
"/other_root_file.ext",
"/path",
"/path/to",
"/path/to/some",
"/path/to/some/file",
"/path/to/some/other",
"/path/to/some/other/file.ext",
"/root_file",
"/var",
]