|
| 1 | +"""Wrapper to restrict filestore access to a specific directory. |
| 2 | +
|
| 3 | +This class will limit the filestore access to a specific directory. |
| 4 | +All relative paths will be relative to this directory. |
| 5 | +All absolute paths will be converted to subpaths of the restricted path e.g. |
| 6 | + /tmp/file.txt -> /restricted_path/tmp/file.txt |
| 7 | +
|
| 8 | +This is not a security feature but a convenience feature to limit filestore |
| 9 | +access to a specific directory. |
| 10 | +""" |
| 11 | + |
| 12 | +from __future__ import annotations # Python 3.9 compatibility for | syntax |
| 13 | + |
| 14 | +from typing import TYPE_CHECKING |
| 15 | + |
| 16 | +from cfdppy.filestore import NativeFilestore |
| 17 | + |
| 18 | +if TYPE_CHECKING: |
| 19 | + from pathlib import Path |
| 20 | + |
| 21 | + from spacepackets.cfdp import ChecksumType, FilestoreResponseStatusCode |
| 22 | + |
| 23 | + |
| 24 | +class RestrictedFilestore(NativeFilestore): |
| 25 | + """Wrapper to restrict filestore access to a specific directory.""" |
| 26 | + |
| 27 | + def __init__(self, restricted_path: Path): |
| 28 | + """Create a new RestrictedFilestore instance. |
| 29 | +
|
| 30 | + The path is used to restrict all paths as relative to this path. |
| 31 | + Absolute paths will be converted to subpaths of the restricted path |
| 32 | + keeping the original path structure. |
| 33 | +
|
| 34 | + :param restricted_path: Path to restrict the filestore to |
| 35 | + """ |
| 36 | + super().__init__() |
| 37 | + self.restricted_path = restricted_path |
| 38 | + |
| 39 | + def __make_local(self, file: Path) -> Path: |
| 40 | + """Make file paths subfolders of the restricted path. |
| 41 | +
|
| 42 | + :param file: File to make relative to the restricted path |
| 43 | + :return: New Path |
| 44 | + """ |
| 45 | + if not file.is_relative_to(self.restricted_path): |
| 46 | + if file.is_absolute(): |
| 47 | + return self.restricted_path.joinpath(file.relative_to(file.anchor)) |
| 48 | + return self.restricted_path.joinpath(file) |
| 49 | + return file |
| 50 | + |
| 51 | + def read_data(self, file: Path, offset: int | None, read_len: int | None = None) -> bytes: |
| 52 | + """Read data from file.""" |
| 53 | + return super().read_data(self.__make_local(file), offset, read_len) |
| 54 | + |
| 55 | + def is_directory(self, path: Path) -> bool: |
| 56 | + """Check if path is a directory.""" |
| 57 | + return super().is_directory(self.__make_local(path)) |
| 58 | + |
| 59 | + def filename_from_full_path(self, path: Path) -> str | None: |
| 60 | + """Get filename from full path.""" |
| 61 | + return super().filename_from_full_path(self.__make_local(path)) |
| 62 | + |
| 63 | + def file_exists(self, path: Path) -> bool: |
| 64 | + """Check if file exists.""" |
| 65 | + return super().file_exists(self.__make_local(path)) |
| 66 | + |
| 67 | + def truncate_file(self, file: Path) -> None: |
| 68 | + """Truncate file.""" |
| 69 | + return super().truncate_file(self.__make_local(file)) |
| 70 | + |
| 71 | + def file_size(self, file: Path) -> int: |
| 72 | + """Get file size.""" |
| 73 | + return super().file_size(self.__make_local(file)) |
| 74 | + |
| 75 | + def write_data(self, file: Path, data: bytes, offset: int | None) -> None: |
| 76 | + """Write data to file.""" |
| 77 | + return super().write_data(self.__make_local(file), data, offset) |
| 78 | + |
| 79 | + def create_file(self, file: Path) -> FilestoreResponseStatusCode: |
| 80 | + """Create file.""" |
| 81 | + return super().create_file(self.__make_local(file)) |
| 82 | + |
| 83 | + def delete_file(self, file: Path) -> FilestoreResponseStatusCode: |
| 84 | + """Delete file.""" |
| 85 | + return super().delete_file(self.__make_local(file)) |
| 86 | + |
| 87 | + def rename_file(self, _old_file: Path, _new_file: Path) -> FilestoreResponseStatusCode: |
| 88 | + """Rename file.""" |
| 89 | + return super().rename_file(self.__make_local(_old_file), self.__make_local(_new_file)) |
| 90 | + |
| 91 | + def replace_file(self, _replaced_file: Path, _source_file: Path) -> FilestoreResponseStatusCode: |
| 92 | + """Replace file.""" |
| 93 | + return super().replace_file( |
| 94 | + self.__make_local(_replaced_file), self.__make_local(_source_file) |
| 95 | + ) |
| 96 | + |
| 97 | + def create_directory(self, _dir_name: Path) -> FilestoreResponseStatusCode: |
| 98 | + """Create directory.""" |
| 99 | + return super().create_directory(self.__make_local(_dir_name)) |
| 100 | + |
| 101 | + def remove_directory( |
| 102 | + self, dir_name: Path, recursive: bool = False |
| 103 | + ) -> FilestoreResponseStatusCode: |
| 104 | + """Remove directory.""" |
| 105 | + return super().remove_directory(dir_name=self.__make_local(dir_name), recursive=recursive) |
| 106 | + |
| 107 | + def list_directory( |
| 108 | + self, _dir_name: Path, _file_name: Path, _recursive: bool = False |
| 109 | + ) -> FilestoreResponseStatusCode: |
| 110 | + """List directory contents.""" |
| 111 | + return super().list_directory( |
| 112 | + self.__make_local(_dir_name), self.__make_local(_file_name), _recursive |
| 113 | + ) |
| 114 | + |
| 115 | + def calculate_checksum( |
| 116 | + self, |
| 117 | + checksum_type: ChecksumType, |
| 118 | + file_path: Path, |
| 119 | + size_to_verify: int, |
| 120 | + segment_len: int = 4096, |
| 121 | + ) -> bytes: |
| 122 | + """Calculate checksum of file. |
| 123 | +
|
| 124 | + :param checksum_type: Type of checksum |
| 125 | + :param file_path: Path to file |
| 126 | + :param size_to_verify: Size to check in bytes |
| 127 | + :param segment_len: Length of segments to calculate checksum for |
| 128 | + :return: checksum as bytes |
| 129 | + """ |
| 130 | + return super().calculate_checksum( |
| 131 | + checksum_type, self.__make_local(file_path), size_to_verify, segment_len |
| 132 | + ) |
| 133 | + |
| 134 | + def verify_checksum( |
| 135 | + self, |
| 136 | + checksum: bytes, |
| 137 | + checksum_type: ChecksumType, |
| 138 | + file_path: Path, |
| 139 | + size_to_verify: int, |
| 140 | + segment_len: int = 4096, |
| 141 | + ) -> bool: |
| 142 | + """Verify checksum of file.""" |
| 143 | + return super().verify_checksum( |
| 144 | + checksum=checksum, |
| 145 | + checksum_type=checksum_type, |
| 146 | + file_path=self.__make_local(file_path), |
| 147 | + size_to_verify=size_to_verify, |
| 148 | + segment_len=segment_len, |
| 149 | + ) |
0 commit comments