Skip to content

Commit

Permalink
Remove Mux handling and add documentation for muxes and loops
Browse files Browse the repository at this point in the history
  • Loading branch information
DiamondJoseph committed Sep 7, 2023
1 parent e88c234 commit 0869c04
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 78 deletions.
10 changes: 3 additions & 7 deletions src/tickit_devices/zebra/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,8 @@ class Zebra(ComponentConfig):
"""
Simulation of a Zebra device with a TCP server for reading/setting params/muxes
(see `ZebraAdapter` for what read/set is available); Block wiring is currently
invariant while Tickit is running.
As rewiring while running is not currently supported, only those blocks that are
configured in the components configuration are instantiated, and attempting to
read or write to params/muxes that are on blocks that are not instantiated will
return 0.
invariant while Tickit is running, and only those blocks that are configured in
components are instantiated.
Configuration that is currently passed down to Block behaviour from `params`:
- For AND/OR gates N=1,2,3,4, the following (default 0) may be set
Expand All @@ -43,7 +39,7 @@ class Zebra(ComponentConfig):
components: List[AndOrBlockConfig]
host: str = "localhost"
port: int = 7012
params: dict[str, int] = Field(default_factory=_default)
params: dict[str, int] = Field(default_factory=dict)

@validator("params")
def add_defaults(cls, v: dict[str, int]) -> dict[str, int]:
Expand Down
46 changes: 39 additions & 7 deletions src/tickit_devices/zebra/_common.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,35 @@
import re
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Union
from typing import Any, Callable, Dict, Generic, List, Optional, TypeVar, Union

import pydantic
from tickit.core.components.component import ComponentConfig
from tickit.core.device import Device
from tickit.core.device import Device, DeviceUpdate
from tickit.core.typedefs import SimTime
from typing_extensions import get_type_hints


@dataclass
class Param:
"""
Representation of the internal configuration of the Zebra.
Sometimes is shared state between multiple blocks.
"""

reg: int
blocks: List[str]


@dataclass
class Mux:
"""
Representation of the internal mapping of the Zebra.
e.g If AND1_INP1=Mux(0x08, ...) is set to 0x09, then
AND1_INP1 should take the same input as AND1_INP2=Mux(0x09, ...)
"""

reg: int
block: Optional[str] = None

Expand Down Expand Up @@ -197,25 +209,45 @@ class Mux:
param_types = {name: t for name, t in register_types.items() if isinstance(t, Param)}
mux_types = {name: t for name, t in register_types.items() if isinstance(t, Mux)}

Inputs = TypeVar("Inputs")
Outputs = TypeVar("Outputs")


@dataclass
class Block(Device, ABC):
class Block(Device, ABC, Generic[Inputs, Outputs]):
name: str
"""
Blocks take 20ns to propagate a signal across, so we cache the next output value
and emit it 20ns later.
"""
previous_outputs: Outputs
next_outputs: Optional[Outputs] = None
params: Optional[Dict[str, int]] = None

def update(self, time: SimTime, inputs: Inputs) -> DeviceUpdate[Outputs]:
if self.next_outputs:
self.previous_outputs = self.next_outputs
self.next_outputs = None
return DeviceUpdate(self.previous_outputs, call_at=None)
else:
self.next_outputs = self._get_next_outputs(inputs)
return DeviceUpdate(self.previous_outputs, call_at=SimTime(time + 20))

@abstractmethod
def _get_next_outputs(self, inputs: Inputs) -> Outputs:
...

@property
def num(self):
match = re.search(r"\d*$", self.name)
assert match, f"No trailing number in {self.name}"
return int(match.group())

@abstractmethod
def read_mux(self, register: str) -> int:
...
return 0

@abstractmethod
def set_mux(self, register: str, value: int) -> int:
...
return 0


@pydantic.v1.dataclasses.dataclass
Expand Down
32 changes: 5 additions & 27 deletions src/tickit_devices/zebra/and_or_block.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@

import pydantic.v1.dataclasses
from tickit.core.components.device_simulation import DeviceSimulation
from tickit.core.device import DeviceUpdate
from tickit.core.typedefs import SimTime

from tickit_devices.zebra._common import Block, BlockConfig, extract_bit

Expand All @@ -17,8 +15,6 @@ class AndOrBlock(Block):
unless it is also inverted.
"""

last_input: "AndOrBlock.Inputs"

class Inputs(TypedDict):
INP1: bool
INP2: bool
Expand All @@ -28,40 +24,22 @@ class Inputs(TypedDict):
class Outputs(TypedDict):
OUT: bool

def __init__(self, name: str):
super().__init__(name=name, previous_outputs=self.Outputs(OUT=False))

def _get_input(self, inputs: Dict[str, bool], i: int) -> bool:
if not self.params:
raise ValueError
enabled = extract_bit(self.params, f"{self.name}_ENA", i)
inverted = extract_bit(self.params, f"{self.name}_INV", i)
return enabled & inputs.get(f"INP{i + 1}", False) ^ inverted

def read_mux(self, inp: str) -> int:
# TODO: Is there a better way of getting the input from the adapter?
return int(self.last_input[inp] if self.last_input else False) # type: ignore

def set_mux(self, register: str, value: int) -> int:
# TODO: Does this do everything it needs to?
self.update(
SimTime(0),
AndOrBlock.Inputs(
**self.last_input
or {
"INP1": False,
"INP2": False,
"INP3": False,
"INP4": False,
},
**{register: bool(value)},
),
)
return value

def update(self, time: SimTime, inputs: Inputs) -> DeviceUpdate[Outputs]:
def _get_next_outputs(self, inputs: Inputs) -> Outputs:
op = and_ if self.name.startswith("AND") else or_
get_input = partial(self._get_input, inputs)
outputs = self.Outputs(OUT=reduce(op, map(get_input, range(4))))
self.last_input = inputs
return DeviceUpdate(outputs, call_at=None) # type: ignore
return outputs


@pydantic.v1.dataclasses.dataclass
Expand Down
50 changes: 13 additions & 37 deletions src/tickit_devices/zebra/zebra.py
Original file line number Diff line number Diff line change
@@ -1,41 +1,32 @@
import asyncio
from typing import Dict, Optional, Union
from typing import Dict, Union

from tickit.adapters.specifications import RegexCommand
from tickit.adapters.system import BaseSystemSimulationAdapter
from tickit.core.components.device_simulation import DeviceSimulation
from tickit.core.management.event_router import InverseWiring, Wiring
from tickit.core.typedefs import ComponentID

from tickit_devices.zebra._common import (
Block,
Mux,
mux_types,
param_types,
register_names,
)
from tickit_devices.zebra._common import param_types, register_names


class ZebraAdapter(BaseSystemSimulationAdapter):
_components: Dict[ComponentID, DeviceSimulation]
params: dict[str, int]
"""
Network adapter for a Zebra system simulation, which operates a TCP server for
reading and writing params (configuration of the internal blocks) and muxes
(state of the internal blocks).
reading and setting configuration of blocks and internal wiring mapping.
N.B. Reading and setting internal wiring is not currently supported,and all
"mux" related queries will return as though successful but not operate.
Attempting to set or read muxes from blocks that are not instantiated returns 0.
See documentation for the Zebra:
`https://github.com/dls-controls/zebra/blob/master/documentation/TDI-CTRL-TNO-042-Zebra-Manual.pdf`
Params that are currently supported:
Configuration that is currently supported:
- For AND/OR gates N=1,2,3,4, the following (default 0) may be set
to Σ2**M where M is each input (1,2,3,4) for which the behaviour is desired.
{AND|OR}{N}_INV: Inverts input(s) M
{AND|OR}{N}_ENA: Enables input(s) M
Muxes that are currently supported:
The following values may also be read from or set on the register of the blocks:
- For AND/OR gates N=1,2,3,4, for inputs M=1,2,3,4
{AND|OR}{N}_INP{M}
"""

def __init__(self, params: dict[str, int]):
Expand Down Expand Up @@ -82,26 +73,11 @@ async def get_reg(self, reg: str) -> bytes:
value_int = self._read_mux(reg_name)
return b"R%02X%04XOK" % (reg_int, value_int)

def _read_mux(self, reg_name: str) -> int:
block = self._get_mux_block(reg_name)
if block:
return block.read_mux(reg_name.removeprefix(f"{block.name}_"))
def _read_mux(self, reg_name: str) -> int: # type: ignore
# TODO: Support reading Muxes: map internal wiring into Mux values
return 0

def _set_mux(self, reg_name: str, value: int) -> int:
block = self._get_mux_block(reg_name)
if block:
return block.set_mux(reg_name.removeprefix(f"{block.name}_"), value)
def _set_mux(self, reg_name: str, value: int) -> int: # type: ignore
# TODO: Support setting Muxes: map Mux value into internal wiring
# TODO: Support cyclic wirings involving Zebra: AND1_OUT->AND1_INP1 is valid
return 0

def _get_mux_block(self, reg_name: str) -> Optional[Block]:
register: Mux = mux_types[reg_name]
if not register:
return None
block_name = register.block
if not block_name:
return None
block = self._components.get(ComponentID(block_name))
if not isinstance(block, Block):
return None
return block

0 comments on commit 0869c04

Please sign in to comment.