diff --git a/unblob/handlers/archive/_safe_tarfile.py b/unblob/handlers/archive/_safe_tarfile.py index 0ecc2e081e..95346fe256 100644 --- a/unblob/handlers/archive/_safe_tarfile.py +++ b/unblob/handlers/archive/_safe_tarfile.py @@ -76,23 +76,93 @@ def extract(self, tarinfo: tarfile.TarInfo, extract_root: Path): # noqa: C901 # prevent traversal attempts through links if tarinfo.islnk() or tarinfo.issym(): - if Path(tarinfo.linkname).is_absolute(): + rel_target = Path(tarinfo.linkname) + if rel_target.is_absolute(): + # If target is absolute, we'll rewrite to be relative to the symlink + + # Strip leading '/' to make the path relative to extract directory + rel_target = (extract_root / rel_target.relative_to("/")).relative_to( + extract_root + ) + + # Now we need to find the path from the symlink to the target + # If the symlink is in a directory like /foo/bar and the target is at /target + # we need to make rel_target ../target (relative to /foo) instead of just being /target (relative to /) + + # Let's calculate depth of the symlink itself and the depth of the target + symlink_depth = len(tarinfo.name.split("/")) + target_depth = len(rel_target.parts) + + # If the symlink is deeper than the target, we need to go up by the difference in depth + if symlink_depth > target_depth: + # We need to go up by the difference in depth + rel_target = ( + Path( + "/".join( + [".." for _ in range(symlink_depth - target_depth)] + ) + ) + / rel_target + ) + self.record_problem( tarinfo, "Absolute path as link target.", "Converted to extraction relative path.", ) - tarinfo.linkname = f"./{tarinfo.linkname}" - if not is_safe_path( - basedir=extract_root, - path=extract_root / tarinfo.linkname, - ): - self.record_problem( - tarinfo, + + # The symlink will point to our relative target (may be updated below if unsafe) + tarinfo.linkname = rel_target + logger.info( + "Link target is relative", linkname=tarinfo.linkname, name=tarinfo.name + ) + + # Resolve the link target to an absolute path + resolved_path = (extract_root / tarinfo.name).parent / rel_target + + # If the resolved path points outside of extract_root, we need to fix it! + if not is_safe_path(extract_root, resolved_path): + logger.warning( "Traversal attempt through link path.", - "Skipped.", + src=tarinfo.name, + dest=tarinfo.linkname, + basedir=extract_root, + resovled_path=resolved_path, ) - return + + for drop_count in range(len(str(rel_target).split("/"))): + new_path = ( + (extract_root / tarinfo.name).parent + / Path("/".join(["placeholder"] * drop_count)) + / rel_target + ) + resolved_path = new_path.resolve() + if str(resolved_path).startswith(str(extract_root)): + break + else: + # We didn't hit the break, we couldn't resolve the path safely + self.record_problem( + tarinfo, + "Traversal attempt through link path.", + "Skipped.", + ) + return + + # Double check that it's safe now + if not is_safe_path(extract_root, resolved_path): + self.record_problem( + tarinfo, + "Traversal attempt through link path.", + "Skipped.", + ) + return + + # Prepend placeholder directories before rel_target to get a valid path + # within extract_root. This is the relative version of resolved_path. + rel_target = Path("/".join(["placeholder"] * drop_count)) / rel_target + tarinfo.linkname = rel_target + + logger.debug("Creating symlink", points_to=resolved_path, name=tarinfo.name) target_path = extract_root / tarinfo.name # directories are special: we can not set their metadata now + they might also be already existing