-
Notifications
You must be signed in to change notification settings - Fork 75
Add filesystem identifiers to walkfs #1316
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
… removed serial for the test that needs it gone. finally finished :3
|
@Schamper is there an update on this pr? no rush just wanna know whats happening 😄 |
I'm currently on leave for a little bit and this is too big a PR to review during toilet breaks, so I'll get to it when I'm back! |
alright, looking forward to it! enjoy your leave (: |
|
hey sorry to bother but is there an update? @Schamper |
| self.target.log.debug("", exc_info=e) | ||
| continue | ||
|
|
||
| @internal |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the @internal annotation is only used within plugins, So you can leave this out.
|
|
||
| target_unix.add_plugin(WalkFSPlugin) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The linter will complain about this. So you can just run tox run -e fix to fix the fixable linting issues. Use tox run -e lint to see any other changes
| fs_types = [sub_entry.fs.__type__ for sub_entry in entry.entries] | ||
| volume_uuids = [ | ||
| get_volume_uuid(sub_entry) | ||
| for sub_entry in entry.entries | ||
| ] | ||
|
|
||
| disk_serials = [ | ||
| get_disk_serial(sub_entry) | ||
| for sub_entry in entry.entries | ||
| ] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of iterating over these entries 3 times, it would be better to just do it once.
| if hasattr(mock_fs, 'ntfs'): del mock_fs.ntfs | ||
| if hasattr(mock_fs, 'extfs'): del mock_fs.extfs | ||
| if hasattr(mock_fs, 'fatfs'): del mock_fs.fatfs | ||
| if hasattr(mock_fs, 'exfat'): del mock_fs.exfat | ||
| if hasattr(mock_disk.vs, 'serial'): del mock_disk.vs.serial |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The reason these objects have these attributes, is because it is a Mock object. These Mock objects are designed to have every attribute. Instead of deleting them, setting them to None would suffice
| elif entry.fs.__type__ == 'ntfs': | ||
| return UUID(int=entry.fs.ntfs.serial) | ||
| elif entry.fs.__type__ in ['ext2', 'ext3', 'ext4']: | ||
| return entry.fs.extfs.uuid | ||
| elif entry.fs.__type__ == 'fat': | ||
| return UUID(int=int(entry.fs.fatfs.volume_id, 16)) | ||
| elif entry.fs.__type__ == 'exfat': | ||
| return UUID(int=entry.fs.exfat.vbr.volume_serial) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it might be better to create a Filesystem.uuid function. And implement these lookups inside each respective filesystem.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i am going over the changes and don't entirely understand this one, could you elaborate a bit on exactly what you are suggesting? thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was talking about adding a function to the Filesystem representation itself.
So in dissect/target/filesystem.py:
class Filesystem:
...
def uuid(self) -> UUID | None:
return NoneAnd then implement it for those specific filesystems, e.g. ntfs etc:
dissect/target/filesystems/extfs.py
class ExtFilesystem(Filesystem):
...
def uuid(self) -> UUID | None:
return self.extfs.uuidand that for all those filesystems above. Then you can replace those if statements with:
fs.uuid()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn't it be nicer to have it as either a property or a __init__ argument?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm almost done adding it as a property. just need to make sure tests pass.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it will take a bit longer then expected. not only does it seem i have tio rebase (some tests fail unless i rebase even when going before my first commit, i assume something internally has changed?) but also with the uuid moving to a property ill have to totally redo the tests, as mock doesnt use them
| @internal | ||
| def get_disk_serial(entry: FilesystemEntry) -> str: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| @internal | |
| def get_disk_serial(entry: FilesystemEntry) -> str: | |
| def get_disk_serial(entry: FilesystemEntry) -> str | None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And the same suggestion about changing the FilesystemEntry here.
| assert get_disk_serial(mock_fs_entry) is None | ||
|
|
||
|
|
||
| def test_walkfs_plugin(target_unix: Target, fs_unix: VirtualFilesystem) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test is already defined at the top of this file, you can remove this one
Co-authored-by: Miauwkeru <[email protected]>
…ssect.target into feature/uuid-walkfs
… removed serial for the test that needs it gone. finally finished :3
Co-authored-by: Miauwkeru <[email protected]>
…ssect.target into feature/uuid-walkfs
|
i believe i covered all bases and then-some by the way, this pr has evolved to a point where it makes my older pr, where i added UUID to mft parser, obsolete. id like to amend the code, i assume you'd prefer a new pr for it, but if you'd like it here just say. (should be a few lines at most) |
Miauwkeru
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
by the way, this pr has evolved to a point where it makes my older pr, where i added UUID to mft parser, obsolete. id like to amend the code, i assume you'd prefer a new pr for it, but if you'd like it here just say. (should be a few lines at most)
Because it would be a different feature, unrelated to the changes in this code I would prefer it to be a different PR.
| self.target.log.warning("File not found: %s", entry) | ||
| self.target.log.debug("", exc_info=e) | ||
| except Exception as e: | ||
| print(e) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| print(e) |
| ("uint32", "uid"), | ||
| ("uint32", "gid"), | ||
| ("string[]", "fstypes"), | ||
| ("string[]", "vuuid"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| ("string[]", "vuuid"), | |
| ("string[]", "identifiers"), |
Maybe an idea to change it to identifiers now, as that is what we use in the code.
| def get_disk_serial(filesystem: Filesystem) -> str | None: | ||
| """ | ||
| Returns the disk serial number if it exists; otherwise, returns None. | ||
|
|
||
| Args: | ||
| entry: :class:`Filesystem` instance | ||
|
|
||
| Returns: | ||
| Disk serial number as str, or None if not available. | ||
| """ | ||
| entry_volume = getattr(filesystem, "volume", None) | ||
| volume_vs = getattr(entry_volume, "vs", None) | ||
| return getattr(volume_vs, "serial", None) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am wondering whether the disk serial is useful in this state. Mostly because serial is only defined for an MBR partition. I am discussing it internally whether to expose more serials / identifiers to the volumesystem.
As I feel it might just create a lot of visual noise right now with dserials being defined as ["None"]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We went back over it, and yes, getting the disk serial or disk identifier at this point is a bit useless as its only populated in an MBR volume system and is not populated otherwise. So it is better to remove this function all together.
If we want to add this functionality we need to make more changes to volume and volumesystem to change it
| ("uint32", "gid"), | ||
| ("string[]", "fstypes"), | ||
| ("string[]", "vuuid"), | ||
| ("string[]", "dserial"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| ("string[]", "dserial"), | |
| ("string[]", "disk_identifiers"), |
maybe a bit more clear what the field is for
| def test_get_disk_serial(mock_fs_entry: MagicMock) -> None: | ||
| """Test get_disk_serial when a serial number is available.""" | ||
| # Mock the `serial` attribute on the `vs` object | ||
| mock_fs_entry.fs.volume.vs = Mock(serial="A1B2C3D4") | ||
|
|
||
| assert get_disk_serial(mock_fs_entry.fs) == "A1B2C3D4" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| def test_get_disk_serial(mock_fs_entry: MagicMock) -> None: | |
| """Test get_disk_serial when a serial number is available.""" | |
| # Mock the `serial` attribute on the `vs` object | |
| mock_fs_entry.fs.volume.vs = Mock(serial="A1B2C3D4") | |
| assert get_disk_serial(mock_fs_entry.fs) == "A1B2C3D4" | |
| def test_get_disk_serial() -> None: | |
| """Test get_disk_serial when a serial number is available.""" | |
| vfs = VirtualFilesystem() | |
| # Mock the `serial` attribute on the `vs` object | |
| vfs.volume = Mock(vs=Mock(serial="A1B2C3D4")) | |
| assert get_disk_serial(vfs) == "A1B2C3D4" |
This feels a bit cleaner compared to the custom fixture
| def test_get_disk_serial_no_serial(mock_fs_entry: MagicMock) -> None: | ||
| """Test get_disk_serial when the `serial` attribute is missing.""" | ||
| # The default mock aparently does have the `serial` attribute on `vs` | ||
|
|
||
| mock_fs_entry.fs.volume.disk.vs = Mock() | ||
| if hasattr(mock_fs_entry.fs.volume.vs, "serial"): | ||
| delattr(mock_fs_entry.fs.volume.vs, "serial") | ||
| assert get_disk_serial(mock_fs_entry.fs) is None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| def test_get_disk_serial_no_serial(mock_fs_entry: MagicMock) -> None: | |
| """Test get_disk_serial when the `serial` attribute is missing.""" | |
| # The default mock aparently does have the `serial` attribute on `vs` | |
| mock_fs_entry.fs.volume.disk.vs = Mock() | |
| if hasattr(mock_fs_entry.fs.volume.vs, "serial"): | |
| delattr(mock_fs_entry.fs.volume.vs, "serial") | |
| assert get_disk_serial(mock_fs_entry.fs) is None | |
| def test_get_disk_serial_no_serial() -> None: | |
| """Test get_disk_serial when the `serial` attribute is missing.""" | |
| # The default mock aparently does have the `serial` attribute on `vs` | |
| vfs = VirtualFilesystem() | |
| # An eempty object without any attributes | |
| vfs.volume = Mock(vs=object()) | |
| assert get_disk_serial(vfs) is None |
This feels a bit cleaner compared to the custom fixture
|
|
||
|
|
||
| @pytest.fixture | ||
| def mock_fs_entry() -> MagicMock: | ||
| """Fixture to create a mock FilesystemEntry object.""" | ||
| mock_entry = MagicMock(spec=FilesystemEntry) | ||
| mock_fs = Mock() | ||
| mock_volume = Mock() | ||
| mock_disk = Mock() | ||
|
|
||
| # Set up the mock object structure | ||
| mock_entry.fs = mock_fs | ||
| mock_fs.volume = mock_volume | ||
| mock_volume.disk = mock_disk | ||
|
|
||
| # Clear any attributes from previous tests | ||
| mock_volume.guid = None | ||
| mock_fs.__type__ = "generic" | ||
| for attr in ("ntfs", "extfs", "fatfs", "exfat"): | ||
| if hasattr(mock_fs, attr): | ||
| setattr(mock_fs, attr, None) | ||
| if hasattr(mock_disk.vs, "serial"): | ||
| mock_disk.vs = None | ||
|
|
||
| return mock_entry |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| @pytest.fixture | |
| def mock_fs_entry() -> MagicMock: | |
| """Fixture to create a mock FilesystemEntry object.""" | |
| mock_entry = MagicMock(spec=FilesystemEntry) | |
| mock_fs = Mock() | |
| mock_volume = Mock() | |
| mock_disk = Mock() | |
| # Set up the mock object structure | |
| mock_entry.fs = mock_fs | |
| mock_fs.volume = mock_volume | |
| mock_volume.disk = mock_disk | |
| # Clear any attributes from previous tests | |
| mock_volume.guid = None | |
| mock_fs.__type__ = "generic" | |
| for attr in ("ntfs", "extfs", "fatfs", "exfat"): | |
| if hasattr(mock_fs, attr): | |
| setattr(mock_fs, attr, None) | |
| if hasattr(mock_disk.vs, "serial"): | |
| mock_disk.vs = None | |
| return mock_entry |
With the other changes in this file, this can be removed
| def test_ext_identifier_no_guid() -> None: | ||
| """EXT.identifier fallback using extfs.identifier when volume.guid is None.""" | ||
| fs = ExtFilesystem(fh=absolute_path("_data/filesystems/symlink_disk.ext4").open("rb")) | ||
| fs.volume = Mock(guid=None) | ||
| fs.extfs = Mock(uuid="e0c3d987-a36c-4f9e-9b2f-90e633d7d7a1") | ||
|
|
||
| expected_uuid = "e0c3d987-a36c-4f9e-9b2f-90e633d7d7a1" | ||
| assert fs.identifier == expected_uuid |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can be moved to tests/filesystems/test_extfs.py
| def test_filesystem_identifier_from_volume_guid() -> None: | ||
| """Filesystem.identifier returns a string when volume.guid is set.""" | ||
| guid = "test" * 4 | ||
|
|
||
| fs = MockFilesystem() | ||
| fs.volume = Mock(guid=guid) | ||
|
|
||
| assert fs.identifier == guid | ||
|
|
||
|
|
||
| def test_filesystem_identifier_string_when_no_guid() -> None: | ||
| """Filesystem.identifier returns the volume name when volume.guid is None.""" | ||
|
|
||
| fs = MockFilesystem() | ||
| fs.volume = Mock(guid=None) | ||
| fs.volume.name = "TestVolume" | ||
|
|
||
| assert fs.identifier == "TestVolume" | ||
|
|
||
|
|
||
| def test_filesystem_identifier_string_when_no_guid_or_name() -> None: | ||
| """Filesystem.identifier returns the fs type name when volume.guid and volume name are None.""" | ||
|
|
||
| fs = MockFilesystem() | ||
| fs.volume = Mock(guid=None) | ||
| fs.volume.name = None | ||
|
|
||
| assert fs.identifier == "filesystem_test" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can be moved to tests/test_filesystem.py
|
hey guys, im back after a long hiatus. had to merge because rebase was taking forever and was getting messy. also had to discard all added/changed tests due to (it seems) a test overhaul at least for this test file. will obv re-add them. |
Co-authored-by: Miauwkeru <[email protected]>
…ssect.target into feature/uuid-walkfs
|
i believe every issue has been delt with (: |
| def get_disk_serial(filesystem: Filesystem) -> str | None: | ||
| """ | ||
| Returns the disk serial number if it exists; otherwise, returns None. | ||
|
|
||
| Args: | ||
| entry: :class:`Filesystem` instance | ||
|
|
||
| Returns: | ||
| Disk serial number as str, or None if not available. | ||
| """ | ||
| entry_volume = getattr(filesystem, "volume", None) | ||
| volume_vs = getattr(entry_volume, "vs", None) | ||
| return getattr(volume_vs, "serial", None) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We went back over it, and yes, getting the disk serial or disk identifier at this point is a bit useless as its only populated in an MBR volume system and is not populated otherwise. So it is better to remove this function all together.
If we want to add this functionality we need to make more changes to volume and volumesystem to change it
|
|
||
| import pytest | ||
|
|
||
| from dissect.target.filesystem import VirtualFile, VirtualFilesystem |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
with the proposed changes, the changes in this file can be removed
Co-authored-by: Miauwkeru <[email protected]>
…ssect.target into feature/uuid-walkfs
|
ill make sure everything is changed |
Co-authored-by: Miauwkeru <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change will fail linting.
| return guid | ||
| if name := getattr(self.volume, "name", None): | ||
| return name | ||
| log.error("no identifier found for filesystem of type %s", self.__type__) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| log.error("no identifier found for filesystem of type %s", self.__type__) |
|
|
||
| @cached_property | ||
| def identifier(self) -> str: | ||
| """Returns the identifier of the Filesystem.""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| """Returns the identifier of the Filesystem.""" | |
| """Returns the identifier of the filesystem. | |
| Usually this is a serial or UUID, but it can also be a volume GUID or name. | |
| """ |
| return None | ||
|
|
||
| @cached_property | ||
| def serial(self) -> int | str | None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| def serial(self) -> int | str | None: | |
| def serial(self) -> int | str | None: | |
| """Return the serial of the filesystem if it has one.""" |
| return self.get(path).hash(algos) | ||
|
|
||
| @cached_property | ||
| def uuid(self) -> UUID | None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| def uuid(self) -> UUID | None: | |
| def uuid(self) -> UUID | None: | |
| """Return the UUID of the filesystem if it has one.""" |
| if name := getattr(self.volume, "name", None): | ||
| return name | ||
| log.error("no identifier found for filesystem of type %s", self.__type__) | ||
| return f"filesystem_{self.__type__}" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why None? Or repr(self) as a fallback?
title basically. i did what i said i did. i also added the appropriate tests because apparently that is how you write code (BORING)
in my test files i never saw disk serial populated but since its a field i assume it is populated sometimes. hope i didnt spend a few hours on nothing.
i also made it so adding volume uuid's for new fs's very easy to do. theres like, a WHOLE function that does it, and you just need to add a new ELIF clause for your filesystem. your welcome, everyone who is gonna use this (probably only me 😢 )