Skip to content

Commit

Permalink
(#23) RawBspLump refactor & new inheritance order
Browse files Browse the repository at this point in the history
  • Loading branch information
snake-biscuits committed May 8, 2023
1 parent 8cdcbf4 commit ce3e68a
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 64 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
- new barebones `__init__`
- maps bsp lumps with `from_header`
- pulled out of streams with `from_count`
- `lumps.create_RawBspLump` will decompress before mapping
- added `append`, `extend` & `insert` methods to `RawBspLump`
- `BspLump.find(attr=val)` method is now `.search()`
- removed `.find()` method from `BasicBspLump`

### Fixed
* `shared.Entities` the following silent failures are now caught by the parser
Expand Down
114 changes: 51 additions & 63 deletions bsp_tool/lumps.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,22 +65,22 @@ def create_RawBspLump(stream: Stream, lump_header: LumpHeader) -> RawBspLump:
return ExternalRawBspLump.from_header(lump_header)


def create_BspLump(stream: Stream, lump_header: LumpHeader, LumpClass: object = None) -> BspLump:
def create_BasicBspLump(stream: Stream, lump_header: LumpHeader, LumpClass: object) -> BasicBspLump:
if hasattr(lump_header, "fourCC"):
stream, lump_header = decompressed(stream, lump_header)
if not hasattr(lump_header, "filename"):
return BspLump.from_header(stream, lump_header, LumpClass)
return BasicBspLump.from_header(stream, lump_header, LumpClass)
else:
return ExternalBspLump.from_header(lump_header, LumpClass)
return ExternalBasicBspLump.from_header(lump_header, LumpClass)


def create_BasicBspLump(stream: Stream, lump_header: LumpHeader, LumpClass: object) -> BasicBspLump:
def create_BspLump(stream: Stream, lump_header: LumpHeader, LumpClass: object = None) -> BspLump:
if hasattr(lump_header, "fourCC"):
stream, lump_header = decompressed(stream, lump_header)
if not hasattr(lump_header, "filename"):
return BasicBspLump.from_header(stream, lump_header, LumpClass)
return BspLump.from_header(stream, lump_header, LumpClass)
else:
return ExternalBasicBspLump.from_header(lump_header, LumpClass)
return ExternalBspLump.from_header(lump_header, LumpClass)


class RawBspLump:
Expand Down Expand Up @@ -126,13 +126,6 @@ def __getitem__(self, index: Union[int, slice]) -> bytes:
else:
raise TypeError(f"list indices must be integers or slices, not {type(index)}")

def __iadd__(self, other_bytes: bytes):
if not isinstance(other_bytes, bytes):
raise TypeError(f"can't concat {other_bytes.__class__.__name__} to bytes")
start = self._length
self._length += len(other_bytes)
self[start:] = other_bytes # slice insert; TEST!

def __setitem__(self, index: int | slice, value: Any):
"""remapping slices is allowed, but only slices"""
if isinstance(index, int):
Expand Down Expand Up @@ -162,9 +155,27 @@ def __iter__(self):
def __len__(self):
return self._length

def append(self, entry):
self._length += 1
self[-1] = entry

class BspLump(RawBspLump):
"""Dynamically reads LumpClasses from a binary stream"""
def extend(self, entries: bytes):
for entry in entries:
self.append(entry)

def insert(self, index: int, entry: Any):
self._length += 1
self[index + 1:] = self[index:]
self[index] = entry

def pop(self, index: Union[int, slice]) -> Union[int, bytes]:
out = self[index]
del self[index]
return out


class BasicBspLump(RawBspLump):
"""Dynamically reads BasicLumpClasses from a binary stream"""
stream: Stream
offset: int # position in stream where lump begins
LumpClass: object
Expand Down Expand Up @@ -222,15 +233,15 @@ def __delitem__(self, index: Union[int, slice]):

def __getitem__(self, index: Union[int, slice]):
"""Reads bytes from self.stream & returns LumpClass(es)"""
# read bytes -> struct.unpack tuples -> LumpClass
# NOTE: BspLump[index] = LumpClass(entry)
if isinstance(index, int):
index = _remap_negative_index(index, self._length)
if index in self._changes:
return self._changes[index]
else:
self.stream.seek(self.offset + (index * self._entry_size))
_tuple = struct.unpack(self.LumpClass._format, self.stream.read(self._entry_size))
return self.LumpClass.from_tuple(_tuple)
elif isinstance(index, slice): # LAZY HACK
self.stream.seek(self.offset + (index * self._entry_size))
raw_entry = struct.unpack(self.LumpClass._format, self.stream.read(self._entry_size))
# NOTE: only the following line has changed
return self.LumpClass(raw_entry[0])
elif isinstance(index, slice):
_slice = _remap_slice(index, self._length)
out = list()
for i in range(_slice.start, _slice.stop, _slice.step):
Expand All @@ -239,56 +250,29 @@ def __getitem__(self, index: Union[int, slice]):
else:
raise TypeError(f"list indices must be integers or slices, not {type(index)}")

def append(self, entry):
self._length += 1
self[-1] = entry

def extend(self, entries: bytes):
for entry in entries:
self.append(entry)

def find(self, **kwargs):
"""Returns all lump entries which have the queried values [e.g. find(x=0)]"""
out = list()
entire_lump = self[::] # load all LumpClasses
for entry in entire_lump:
if all([getattr(entry, attr) == value for attr, value in kwargs.items()]):
out.append(entry)
return out

def insert(self, index: int, entry: Any):
self._length += 1
self[index + 1:] = self[index:]
self[index] = entry

def pop(self, index: Union[int, slice]) -> Union[int, bytes]:
out = self[index]
del self[index]
return out


class BasicBspLump(BspLump):
class BspLump(BasicBspLump):
"""Dynamically reads LumpClasses from a binary stream"""
stream: Stream
offset: int # position in stream where lump begins
LumpClass: object
_changes: Dict[int, Any]
_changes: Dict[int, object]
# ^ {index: LumpClass(new_entry)}
# NOTE: there are no checks to ensure changes are the correct type or size
_entry_size: int # sizeof(LumpClass)
_length: int # number of indexable entries

def __getitem__(self, index: Union[int, slice]):
"""Reads bytes from self.stream & returns LumpClass(es)"""
# read bytes -> struct.unpack tuples -> LumpClass
# NOTE: BspLump[index] = LumpClass(entry)
if isinstance(index, int):
index = _remap_negative_index(index, self._length)
self.stream.seek(self.offset + (index * self._entry_size))
raw_entry = struct.unpack(self.LumpClass._format, self.stream.read(self._entry_size))
# NOTE: only the following line has changed
return self.LumpClass(raw_entry[0])
elif isinstance(index, slice):
if index in self._changes:
return self._changes[index]
else:
self.stream.seek(self.offset + (index * self._entry_size))
_tuple = struct.unpack(self.LumpClass._format, self.stream.read(self._entry_size))
return self.LumpClass.from_tuple(_tuple)
elif isinstance(index, slice): # LAZY HACK
_slice = _remap_slice(index, self._length)
out = list()
for i in range(_slice.start, _slice.stop, _slice.step):
Expand All @@ -297,6 +281,10 @@ def __getitem__(self, index: Union[int, slice]):
else:
raise TypeError(f"list indices must be integers or slices, not {type(index)}")

def search(self, **kwargs):
"""Returns all lump entries which have the queried values [e.g. find(x=0)]"""
return [x for x in self[::] if all([getattr(x, a) == v for a, v in kwargs.items()])]


class ExternalRawBspLump(RawBspLump):
"""Maps an open binary stream to a list-like object"""
Expand All @@ -316,15 +304,13 @@ def from_header(cls, lump_header: LumpHeader):
return out


class ExternalBspLump(BspLump):
class ExternalBasicBspLump(BasicBspLump):
"""Dynamically reads LumpClasses from a binary stream"""
stream: Stream
offset: int # position in stream where lump begins
LumpClass: object
_changes: Dict[int, object]
# ^ {index: LumpClass(new_entry)}
# NOTE: there are no checks to ensure changes are the correct type or size
_entry_size: int # sizeof(LumpClass)
_length: int # number of indexable entries

@classmethod
Expand All @@ -335,18 +321,20 @@ def from_header(cls, lump_header: LumpHeader, LumpClass: object):
return out


class ExternalBasicBspLump(BasicBspLump):
class ExternalBspLump(BspLump):
"""Dynamically reads LumpClasses from a binary stream"""
stream: Stream
offset: int # position in stream where lump begins
LumpClass: object
_changes: Dict[int, object]
# ^ {index: LumpClass(new_entry)}
# NOTE: there are no checks to ensure changes are the correct type or size
_entry_size: int # sizeof(LumpClass)
_length: int # number of indexable entries

@classmethod
def from_header(cls, lump_header: LumpHeader, LumpClass: object):
out = super().__init__(None, lump_header, LumpClass)
out = super().from_header(None, lump_header, LumpClass)
out.offset = 0
out.stream = open(lump_header.filename, "rb")
return out
Expand Down
2 changes: 1 addition & 1 deletion tests/test_lumps.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def test_concat(self):
header = LumpHeader_basic(offset=0, length=8)
stream = io.BytesIO(bytes([*range(8)]))
lump = lumps.RawBspLump.from_header(stream, header)
lump += bytes([*range(8, 16)])
lump.extend(bytes([*range(8, 16)]))
assert len(lump) == 16
for i, x in enumerate(lump):
assert i == x
Expand Down

0 comments on commit ce3e68a

Please sign in to comment.