-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
cb46dcd
commit e5ab5d0
Showing
5 changed files
with
291 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
- type: tickit_devices.digitelmpc.DigitelMpc | ||
name: digitelMpc | ||
inputs: {} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
import pydantic.v1.dataclasses | ||
from tickit.adapters.io import TcpIo | ||
from tickit.core.adapter import AdapterContainer | ||
from tickit.core.components.component import Component, ComponentConfig | ||
from tickit.core.components.device_component import DeviceComponent | ||
|
||
from .digitelmpc import DigitelMpcAdapter, DigitelMpcDevice | ||
|
||
|
||
@pydantic.v1.dataclasses.dataclass | ||
class DigitelMpc(ComponentConfig): | ||
"""DigitelMpc simulation with TCP server.""" | ||
|
||
host: str = "localhost" | ||
port: int = 25565 | ||
|
||
def __call__(self) -> Component: # noqa: D102 | ||
device = DigitelMpcDevice() | ||
adapters = [ | ||
AdapterContainer( | ||
DigitelMpcAdapter(device), | ||
TcpIo( | ||
self.host, | ||
self.port, | ||
), | ||
) | ||
] | ||
return DeviceComponent( | ||
name=self.name, | ||
device=device, | ||
adapters=adapters, | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
import logging | ||
import asyncio | ||
from ctypes import c_short, c_ubyte, c_ushort | ||
from typing import Union | ||
|
||
from tickit_devices.digitelmpc.states import Status, Error, SpRelay | ||
|
||
LOGGER = logging.getLogger(__name__) | ||
|
||
class DigitelMpcBase: | ||
"""A base class for digitelMpc logic.""" | ||
|
||
low_pressure: float = 8.2e-11 | ||
high_pressure: float = 800 | ||
set_point_on: float = 1.0e-08 | ||
set_point_off: float = 2.0e-08 | ||
max_pump_size: int = 1000 | ||
min_pump_size: int = 100 | ||
min_voltage: int = 0 | ||
max_voltage: int = 5000 | ||
|
||
def __init__(self) -> None: | ||
"""A digitelMpcBase contructor which assigns initial values""" | ||
self.current: int = 97e-9 | ||
self.pressure: int = 0 | ||
self.voltage: int = 80 | ||
self.status: int = 0 | ||
self.error_code: int = 0 | ||
self.sp_relay: int = SpRelay.ON.value | ||
self.pump_size: int = 0 | ||
self.calibration: int = 0 | ||
|
||
async def start(self) -> None: | ||
"""Starts the digitelMpc controller""" | ||
self.status = Status.STARTING.value | ||
self.error_code = Error.OK.value | ||
self.pressure = 8.0e-11 | ||
self.status = Status.RUNNING.value | ||
|
||
async def reset(self) -> None: | ||
"""Resets the digitelMpc controller""" | ||
self.__init__() | ||
self.status = Status.STARTING.value | ||
self.status = Status.RUNNING.value | ||
|
||
async def stop(self) -> None: | ||
"""Stops the digitelMpc Controller """ | ||
self.status = Status.STANDBY.value | ||
|
||
async def get_pressure(self) -> None: | ||
"""returns the current pressure of the system""" | ||
return self.pressure | ||
|
||
async def get_current(self) -> None: | ||
"""returns the current of the system""" | ||
return self.current | ||
|
||
async def get_voltage(self) -> None: | ||
"""returns the current voltage of the system""" | ||
return self.voltage | ||
|
||
async def set_pump_size(self, pump_size: int ) -> None: | ||
"""Sets the pump size""" | ||
if pump_size < self.min_pump_size or pump_size > self.max_pump_size: | ||
self.status = Status.ERROR.value | ||
else: | ||
self.pump_size = pump_size | ||
|
||
async def get_pump_size(self) -> None: | ||
"""Returns the pump size""" | ||
return self.pump_size | ||
|
||
async def set_voltage(self, voltage_value: int) -> None: | ||
"""Sets the voltage""" | ||
if voltage_value < self.min_voltage and voltage_value > self.max_voltage: | ||
self.status = Status.ERROR.value | ||
else: | ||
self.voltage = voltage_value | ||
|
||
if voltage_value < self.min_voltage: | ||
self.error_code = Error.LOW_VOLTAGE.value | ||
|
||
self.pressure = ((voltage_value)*self.high_pressure)/self.max_voltage | ||
|
||
|
||
async def get_pump_status(self) -> None: | ||
"""Gets the current status of the pump controller""" | ||
return self.status | ||
|
||
async def get_sp_relay(self) -> None: | ||
"""Gets the sp relay status""" | ||
return self.sp_relay | ||
|
||
|
||
async def set_calibration(self, calibration_value: int) -> int: | ||
|
||
self.calibration = calibration_value | ||
|
||
async def auto_restart(self, autorestart: int) -> int: | ||
|
||
pass | ||
|
||
|
||
|
||
|
||
|
||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
import asyncio | ||
|
||
from typing import TypedDict, AsyncIterable | ||
from tickit.core.device import Device, DeviceUpdate | ||
|
||
|
||
from tickit.adapters.specifications import RegexCommand | ||
from tickit.adapters.tcp import CommandAdapter | ||
from tickit.core.typedefs import SimTime | ||
|
||
from tickit_devices.digitelmpc.base import DigitelMpcBase | ||
from tickit_devices.digitelmpc.states import Status | ||
|
||
class DigitelMpcDevice(Device,DigitelMpcBase): | ||
"""Ion pump vacuum controller device.""" | ||
|
||
class Inputs(TypedDict): | ||
... | ||
|
||
class Outputs(TypedDict): | ||
pressure: float | ||
|
||
def __init__(self,) -> None: | ||
"""A digitelMpc constructor that sets up initial internal values.""" | ||
|
||
super().__init__() | ||
self.status: int = Status.RUNNING.value | ||
# self.callback_period: SimTime = SimTime(int(1e9)) | ||
|
||
def update(self, time: SimTime, inputs : Inputs) -> DeviceUpdate[Outputs]: | ||
"""The update method which changes the pressure according to set modes. | ||
Returns: | ||
DeviceUpdate[Outputs]: | ||
The produced update event which contains the value of the output | ||
pressure, and requests callback if pressure should continue to | ||
change. | ||
""" | ||
|
||
return DeviceUpdate(self.Outputs(pressure=self.pressure), None) | ||
|
||
|
||
class DigitelMpcAdapter(CommandAdapter): | ||
"""A digitelMpc TCP adapter which sends regular status packets and can set modes.""" | ||
|
||
device: DigitelMpcDevice | ||
|
||
def __init__(self, device: DigitelMpcDevice) -> None: | ||
super().__init__() | ||
self.device = device | ||
|
||
|
||
async def on_connect(self) -> AsyncIterable[bytes]: | ||
"""A method which continously yields status packets. | ||
Returns: | ||
AsyncIterable[bytes]: An asynchronous iterable of packed diitelMpc status | ||
packets. | ||
""" | ||
# while True: | ||
# await asyncio.sleep(2.0) | ||
# # await self.device | ||
# pass | ||
|
||
@RegexCommand(r"~ 01 01 22", interrupt=False, format="utf-8") | ||
async def get_model_name(self) -> None: | ||
"""A regex bytes command which gets model name """ | ||
return str("01 OK 00 DIGITEL MPCQ 0E").encode("utf-8") | ||
|
||
@RegexCommand(r"~ 01 01 37", interrupt=False, format="utf-8") | ||
async def start(self) -> None: | ||
"""A regex bytes command that initialises the controller""" | ||
await self.device.start() | ||
return str("01 OK 00 00").encode("utf-8") | ||
|
||
@RegexCommand(r"~ 01 0B 01 B4", interrupt=False, format="utf-8") | ||
async def get_pressure(self) -> None: | ||
"""A regex bytes commmand which gets the current pressure of the system""" | ||
pressure = await self.device.get_pressure() | ||
return str(f"01 OK 00 {pressure} TORR A5").encode("utf-8") | ||
|
||
@RegexCommand(r"~ 01 12 (?P<pump_size>\d+) 00", interrupt=False, format="utf-8") | ||
async def set_pump_size(self, pump_size: float) -> None: | ||
"""A regex bytes command which sets the pump size""" | ||
await self.device.set_pump_size(pump_size) | ||
return str("01 OK 00 00").encode("utf-8") | ||
|
||
@RegexCommand(r"~ 01 11 02 B5", interrupt=False, format="utf-8") | ||
async def get_pump_size(self) -> None: | ||
"""A regex btyes command which gets the current pump size""" | ||
pump_size = await self.device.get_pump_size() | ||
return str(f"01 OK 00 {pump_size} L/S A6").encode("utf-8") | ||
|
||
@RegexCommand(r"~ 01 0A 01 B3", interrupt=False, format="utf-8") | ||
async def get_current(self) -> None: | ||
"""A regex bytes commmand which gets the current pressure of the system""" | ||
current = await self.device.get_current() | ||
return str(f"01 OK 00 {current} AMPS C5").encode("utf-8") | ||
|
||
@RegexCommand(r"~ 01 0C 01 B5", interrupt=False, format="utf-8") | ||
async def get_voltage(self) -> None: | ||
"""A regex bytes command which gets the voltage""" | ||
voltage = await self.device.get_voltage() | ||
return str(f"01 OK 00 {voltage} VOLTS C").encode("utf-8") | ||
|
||
@RegexCommand(r"~ 01 22 (?P<voltage>\d+) 00", interrupt=False, format="utf-8") | ||
async def set_voltage(self, voltage: float) -> None: | ||
"""A regex bytes command which sets the pump size""" | ||
await self.device.set_voltage(voltage) | ||
return str("01 OK 00 00").encode("utf-8") | ||
|
||
@RegexCommand(r"~ 01 FF 01 CE", interrupt=False, format="utf-8") | ||
async def reset(self) -> None: | ||
"""A regex command that resets the state of the controller""" | ||
await self.device.reset() | ||
|
||
@RegexCommand(r"~ 01 0D 01 B6", interrupt=False, format="utf-8") | ||
async def get_pump_status(self)-> None: | ||
"""A regex command that gets the status of the pump""" | ||
status = await self.device.get_pump_status() | ||
return str(f"01 OK 00 0{status}").encode("utf-8") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
from enum import IntEnum | ||
|
||
class Status(IntEnum): | ||
"""An enumerator for digitelMpc statuses""" | ||
STANDBY = 0 | ||
STARTING = 1 | ||
RUNNING = 2 | ||
COOLDOWN = 3 | ||
ERROR = 4 | ||
|
||
class Error(IntEnum): | ||
"""An enumerator for digitelMpc Errors""" | ||
OK = 0 | ||
TOO_MANY_CYCLES = 1 | ||
HIGH_PRESSURE = 2 | ||
HIGH_CURRENT = 3 | ||
PUMP_POWER = 4 | ||
SHORT_CIRCUIT = 5 | ||
MALFUNCTION = 6 | ||
LOW_VOLTAGE = 7 | ||
ARC_DETECT = 8 | ||
|
||
class SpRelay(IntEnum): | ||
"""An enumerator for digitelMpc SP relay mode""" | ||
ON = 0 | ||
OFF = 1 |