diff --git a/src/ophyd_epics_devices/panda.py b/src/ophyd_epics_devices/panda.py index 4f75bc0..21ba965 100644 --- a/src/ophyd_epics_devices/panda.py +++ b/src/ophyd_epics_devices/panda.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import atexit import re from enum import Enum @@ -92,11 +94,37 @@ class PVIEntry(TypedDict, total=False): x: str -def block_name_number(block_name: str) -> Tuple[str, int]: - m = re.match("^([a-z]+)([0-9]*)$", block_name) - assert m, f"Expected '', got '{block_name}'" - name, num = m.groups() - return name, int(num or 1) +def block_name_number(block_name: str) -> Tuple[str, Optional[int]]: + """Maps a panda block name to a block and number. + + There are exceptions to this rule; some blocks like pcap do not contain numbers. + Other blocks may contain numbers and letters, but no numbers at the end. + + Such block names will only return the block name, and not a number. + + If this function returns both a block name and number, it should be instantiated + into a device vector.""" + m = re.match("^([0-9a-z_-]*)([0-9]+)$", block_name) + if m is not None: + name, num = m.groups() + return name, int(num or 1) # just to pass type checks. + + return block_name, None + + +def _remove_inconsistent_blocks(pvi: Dict[str, PVIEntry]) -> None: + """Remove blocks from pvi information. + + This is needed because some pandas have 'pcap' and 'pcap1' blocks, which are + inconsistent with the assumption that pandas should only have a 'pcap' block, + for example. + + """ + pvi_keys = set(pvi.keys()) + for k in pvi_keys: + kn = re.sub(r"\d*$", "", k) + if kn and k != kn and kn in pvi_keys: + del pvi[k] async def pvi_get(pv: str, ctxt: Context, timeout: float = 5.0) -> Dict[str, PVIEntry]: @@ -106,6 +134,7 @@ async def pvi_get(pv: str, ctxt: Context, timeout: float = 5.0) -> Dict[str, PVI for attr_name, attr_info in pv_info.items(): result[attr_name] = PVIEntry(**attr_info) # type: ignore + _remove_inconsistent_blocks(result) return result @@ -116,8 +145,8 @@ class PandA(Device): seq: DeviceVector[SeqBlock] pcap: PcapBlock - def __init__(self, prefix: str, name: str = "") -> None: - self._init_prefix = prefix + def __init__(self, pv: str) -> None: + self._init_prefix = pv self.pvi_mapping: Dict[FrozenSet[str], Callable[..., Signal]] = { frozenset({"r", "w"}): lambda dtype, rpv, wpv: epics_signal_rw( dtype, rpv, wpv @@ -142,9 +171,9 @@ def _del_ctxt(): return PandA._ctxt - def verify_block(self, name: str, num: int): + def verify_block(self, name: str, num: Optional[int]): """Given a block name and number, return information about a block.""" - anno = get_type_hints(self).get(name) + anno = get_type_hints(self, globalns=globals()).get(name) block: Device = Device() @@ -153,11 +182,13 @@ def verify_block(self, name: str, num: int): block = type_args[0]() if type_args else anno() if not type_args: - assert num == 1, f"Only expected one {name} block, got {num}" + assert num is None, f"Only expected one {name} block, got {num}" return block - async def _make_block(self, name: str, num: int, block_pv: str, sim: bool = False): + async def _make_block( + self, name: str, num: Optional[int], block_pv: str, sim: bool = False + ): """Makes a block given a block name containing relevant signals. Loops through the signals in the block (found using type hints), if not in @@ -165,7 +196,7 @@ async def _make_block(self, name: str, num: int, block_pv: str, sim: bool = Fals """ block = self.verify_block(name, num) - field_annos = get_type_hints(block) + field_annos = get_type_hints(block, globalns=globals()) block_pvi = await pvi_get(block_pv, self.ctxt) if not sim else None # finds which fields this class actually has, e.g. delay, width... @@ -230,11 +261,18 @@ def _make_signal(self, signal_pvi: PVIEntry, dtype: Optional[Type] = None): return signal_factory(dtype, "pva://" + read_pv, "pva://" + write_pv) - def set_attribute(self, name, num, block): - anno = get_type_hints(self).get(name) + # TODO redo to set_panda_block? confusing name + def set_attribute(self, name: str, num: Optional[int], block: Device): + """Set a block on the panda. + + Need to be able to set device vectors on the panda as well, e.g. if num is not + None, need to be able to make a new device vector and start populating it... + """ + anno = get_type_hints(self, globalns=globals()).get(name) - # get_origin to see if it's a device vector. - if (anno == DeviceVector[PulseBlock]) or (anno == DeviceVector[SeqBlock]): + # if it's an annotated device vector, or it isn't but we've got a number then + # make a DeviceVector on the class + if get_origin(anno) == DeviceVector or (not anno and num is not None): self.__dict__.setdefault(name, DeviceVector())[num] = block else: setattr(self, name, block) @@ -251,7 +289,7 @@ async def connect(self, sim=False) -> None: pvi = await pvi_get(self._init_prefix + ":PVI", self.ctxt) if not sim else None hints = { attr_name: attr_type - for attr_name, attr_type in get_type_hints(self).items() + for attr_name, attr_type in get_type_hints(self, globalns=globals()).items() if not attr_name.startswith("_") } @@ -283,8 +321,9 @@ async def connect(self, sim=False) -> None: "d" ], f"Expected PandA to only contain blocks, got {entry}" else: - block = await self._make_block(block_name, 1, "sim://", sim=sim) - self.set_attribute(block_name, 1, block) + num = 1 if get_origin(hints[block_name]) == DeviceVector else None + block = await self._make_block(block_name, num, "sim://", sim=sim) + self.set_attribute(block_name, num, block) self.set_name(self.name) await super().connect(sim) diff --git a/tests/db/panda.db b/tests/db/panda.db index 3589d4f..faf4444 100644 --- a/tests/db/panda.db +++ b/tests/db/panda.db @@ -452,11 +452,11 @@ $(EXCLUDE_PCAP=) } $(EXCLUDE_PCAP=) }) $(EXCLUDE_PCAP=)} -$(INCLUDE_EXTRA_SIGNAL=#)record(ao, "$(IOC_NAME=PANDAQSRV):PCAP:ARM2") +$(INCLUDE_EXTRA_SIGNAL=#)record(ao, "$(IOC_NAME=PANDAQSRV):PCAP:NEWSIGNAL") $(INCLUDE_EXTRA_SIGNAL=#){ $(INCLUDE_EXTRA_SIGNAL=#) info(Q:group, { $(INCLUDE_EXTRA_SIGNAL=#) "$(IOC_NAME=PANDAQSRV):PCAP:PVI": { -$(INCLUDE_EXTRA_SIGNAL=#) "pvi.arm2.x": { +$(INCLUDE_EXTRA_SIGNAL=#) "pvi.newsignal.x": { $(INCLUDE_EXTRA_SIGNAL=#) "+channel": "NAME", $(INCLUDE_EXTRA_SIGNAL=#) "+type": "plain" $(INCLUDE_EXTRA_SIGNAL=#) } @@ -480,10 +480,10 @@ $(EXCLUDE_PCAP=) }) $(EXCLUDE_PCAP=)} -$(INCLUDE_EXTRA_BLOCK=#)record(ao, "$(IOC_NAME=PANDAQSRV):EXTRA:ARM") +$(INCLUDE_EXTRA_BLOCK=#)record(ao, "$(IOC_NAME=PANDAQSRV):EXTRA1:ARM") $(INCLUDE_EXTRA_BLOCK=#){ $(INCLUDE_EXTRA_BLOCK=#) info(Q:group, { -$(INCLUDE_EXTRA_BLOCK=#) "$(IOC_NAME=PANDAQSRV):EXTRA:PVI": { +$(INCLUDE_EXTRA_BLOCK=#) "$(IOC_NAME=PANDAQSRV):EXTRA1:PVI": { $(INCLUDE_EXTRA_BLOCK=#) "pvi.arm.x": { $(INCLUDE_EXTRA_BLOCK=#) "+channel": "NAME", $(INCLUDE_EXTRA_BLOCK=#) "+type": "plain" @@ -493,12 +493,38 @@ $(INCLUDE_EXTRA_BLOCK=#) }) $(INCLUDE_EXTRA_BLOCK=#)} $(INCLUDE_EXTRA_BLOCK=#) $(INCLUDE_EXTRA_BLOCK=#) -$(INCLUDE_EXTRA_BLOCK=#)record(stringin, "$(IOC_NAME=PANDAQSRV):EXTRA:_PVI") +$(INCLUDE_EXTRA_BLOCK=#)record(stringin, "$(IOC_NAME=PANDAQSRV):EXTRA1:_PVI") $(INCLUDE_EXTRA_BLOCK=#){ -$(INCLUDE_EXTRA_BLOCK=#) field(VAL, "$(IOC_NAME=PANDAQSRV):EXTRA:PVI") +$(INCLUDE_EXTRA_BLOCK=#) field(VAL, "$(IOC_NAME=PANDAQSRV):EXTRA1:PVI") $(INCLUDE_EXTRA_BLOCK=#) info(Q:group, { $(INCLUDE_EXTRA_BLOCK=#) "$(IOC_NAME=PANDAQSRV):PVI": { -$(INCLUDE_EXTRA_BLOCK=#) "pvi.extra.d": { +$(INCLUDE_EXTRA_BLOCK=#) "pvi.extra1.d": { +$(INCLUDE_EXTRA_BLOCK=#) "+channel": "VAL", +$(INCLUDE_EXTRA_BLOCK=#) "+type": "plain" +$(INCLUDE_EXTRA_BLOCK=#) } +$(INCLUDE_EXTRA_BLOCK=#) } +$(INCLUDE_EXTRA_BLOCK=#) }) +$(INCLUDE_EXTRA_BLOCK=#)} + +$(INCLUDE_EXTRA_BLOCK=#)record(ao, "$(IOC_NAME=PANDAQSRV):EXTRA2:ARM") +$(INCLUDE_EXTRA_BLOCK=#){ +$(INCLUDE_EXTRA_BLOCK=#) info(Q:group, { +$(INCLUDE_EXTRA_BLOCK=#) "$(IOC_NAME=PANDAQSRV):EXTRA2:PVI": { +$(INCLUDE_EXTRA_BLOCK=#) "pvi.arm.x": { +$(INCLUDE_EXTRA_BLOCK=#) "+channel": "NAME", +$(INCLUDE_EXTRA_BLOCK=#) "+type": "plain" +$(INCLUDE_EXTRA_BLOCK=#) } +$(INCLUDE_EXTRA_BLOCK=#) } +$(INCLUDE_EXTRA_BLOCK=#) }) +$(INCLUDE_EXTRA_BLOCK=#)} +$(INCLUDE_EXTRA_BLOCK=#) +$(INCLUDE_EXTRA_BLOCK=#) +$(INCLUDE_EXTRA_BLOCK=#)record(stringin, "$(IOC_NAME=PANDAQSRV):EXTRA2:_PVI") +$(INCLUDE_EXTRA_BLOCK=#){ +$(INCLUDE_EXTRA_BLOCK=#) field(VAL, "$(IOC_NAME=PANDAQSRV):EXTRA2:PVI") +$(INCLUDE_EXTRA_BLOCK=#) info(Q:group, { +$(INCLUDE_EXTRA_BLOCK=#) "$(IOC_NAME=PANDAQSRV):PVI": { +$(INCLUDE_EXTRA_BLOCK=#) "pvi.extra2.d": { $(INCLUDE_EXTRA_BLOCK=#) "+channel": "VAL", $(INCLUDE_EXTRA_BLOCK=#) "+type": "plain" $(INCLUDE_EXTRA_BLOCK=#) } diff --git a/tests/test_panda.py b/tests/test_panda.py index c4528e4..1ef91f9 100644 --- a/tests/test_panda.py +++ b/tests/test_panda.py @@ -1,9 +1,36 @@ """Test file specifying how we want to eventually interact with the panda...""" +import copy +from typing import Dict + import numpy as np import pytest from ophyd.v2.core import DeviceCollector -from ophyd_epics_devices.panda import PandA, SeqTable, SeqTrigger +from ophyd_epics_devices.panda import PandA, PVIEntry, SeqTable, SeqTrigger, pvi_get + + +class DummyDict: + def __init__(self, dict) -> None: + self.dict = dict + + def todict(self): + return self.dict + + +class MockPvi: + def __init__(self, pvi: Dict[str, PVIEntry]) -> None: + self.pvi = pvi + + def get(self, item: str): + return DummyDict(self.pvi) + + +class MockCtxt: + def __init__(self, pvi: Dict[str, PVIEntry]) -> None: + self.pvi = copy.copy(pvi) + + def get(self, pv: str, timeout: float = 0.0): + return MockPvi(self.pvi) @pytest.fixture @@ -20,6 +47,21 @@ def test_panda_names_correct(sim_panda: PandA): assert sim_panda.pulse[1].name == "sim_panda-pulse-1" +async def test_pvi_get_for_inconsistent_blocks(): + dummy_pvi = { + "pcap": {}, + "pcap1": {}, + "pulse1": {}, + "pulse2": {}, + "sfp3_sync_out1": {}, + "sfp3_sync_out": {}, + } + + resulting_pvi = await pvi_get("", MockCtxt(dummy_pvi)) + assert "sfp3_sync_out1" not in resulting_pvi + assert "pcap1" not in resulting_pvi + + async def test_panda_children_connected(sim_panda: PandA): # try to set and retrieve from simulated values... table = SeqTable( @@ -66,8 +108,10 @@ async def test_panda_with_extra_blocks_and_signals(pva): panda = PandA("PANDAQSRV") await panda.connect() - assert panda.extra, "extra device has not been instantiated" # type: ignore - assert panda.pcap.arm2, "extra signal not instantiated" # type: ignore + assert panda.extra # type: ignore + assert panda.extra[1] # type: ignore + assert panda.extra[2] # type: ignore + assert panda.pcap.newsignal # type: ignore async def test_panda_block_missing_signals(pva):