diff --git a/doc/source/examples/hardware/tabor.py b/doc/source/examples/hardware/tabor.py index e80d8fc03..1926a854c 100644 --- a/doc/source/examples/hardware/tabor.py +++ b/doc/source/examples/hardware/tabor.py @@ -1,5 +1,5 @@ from qupulse.hardware.setup import HardwareSetup, PlaybackChannel, MarkerChannel -from qupulse.hardware.awgs.tabor import TaborChannelPair, TaborAWGRepresentation +from qupulse.hardware.awgs.tabor import TaborAWGRepresentation import pyvisa diff --git a/qupulse/_program/seqc.py b/qupulse/_program/seqc.py index a139710d9..7d1d4324e 100644 --- a/qupulse/_program/seqc.py +++ b/qupulse/_program/seqc.py @@ -34,6 +34,7 @@ from qupulse.utils import replace_multiple, grouper from qupulse._program.waveforms import Waveform from qupulse._program._loop import Loop + from qupulse._program.volatile import VolatileRepetitionCount, VolatileProperty from qupulse.hardware.awgs.base import ProgramEntry diff --git a/qupulse/_program/tabor.py b/qupulse/_program/tabor.py index 64002d969..2b498a450 100644 --- a/qupulse/_program/tabor.py +++ b/qupulse/_program/tabor.py @@ -8,6 +8,7 @@ import numpy as np +from qupulse.hardware.feature_awg.features import RepetitionMode from qupulse.utils.types import ChannelID, TimeType from qupulse.hardware.awgs.base import ProgramEntry from qupulse.hardware.util import get_sample_times, voltage_to_uint16 @@ -389,7 +390,7 @@ def __init__(self, voltage_transformations: Tuple[Optional[callable], Optional[callable]], sample_rate: TimeType, mode: TaborSequencing = None, - repetition_mode: str = "infinite", + repetition_mode: Union[str, RepetitionMode] = "infinite", ): if len(channels) != device_properties['chan_per_part']: raise TaborException('TaborProgram only supports {} channels'.format(device_properties['chan_per_part'])) @@ -420,7 +421,7 @@ def __init__(self, self._parsed_program = None # type: Optional[ParsedProgram] self._mode = None self._device_properties = device_properties - self._repetition_mode = repetition_mode + self._repetition_mode = RepetitionMode(repetition_mode) assert mode in (TaborSequencing.ADVANCED, TaborSequencing.SINGLE), "Invalid mode" if mode == TaborSequencing.SINGLE: diff --git a/qupulse/hardware/awgs/__init__.py b/qupulse/hardware/awgs/__init__.py index d91e1833d..6b61353bd 100644 --- a/qupulse/hardware/awgs/__init__.py +++ b/qupulse/hardware/awgs/__init__.py @@ -4,7 +4,7 @@ __all__ = ["install_requirements"] try: - from qupulse.hardware.awgs.tabor import TaborAWGRepresentation, TaborChannelPair + from qupulse.hardware.feature_awg.tabor import TaborDevice, TaborChannelTuple __all__.extend(["TaborAWGRepresentation", "TaborChannelPair"]) except ImportError: pass diff --git a/qupulse/hardware/feature_awg/base.py b/qupulse/hardware/feature_awg/base.py index d8a85d16b..b45559b9e 100644 --- a/qupulse/hardware/feature_awg/base.py +++ b/qupulse/hardware/feature_awg/base.py @@ -1,5 +1,5 @@ from abc import ABC, abstractmethod -from typing import Optional, Collection +from typing import Optional, Collection, Union import weakref from qupulse.hardware.awgs.base import AWG @@ -11,14 +11,16 @@ class AWGDeviceFeature(Feature): """Base class for features that are used for `AWGDevice`s""" - def __init__(self): + def __init__(self, device: 'AWGDevice'): super().__init__(AWGDevice) + self._device = weakref.proxy(device) class AWGChannelFeature(Feature): """Base class for features that are used for `AWGChannel`s""" - def __init__(self): + def __init__(self, channel: Union['AWGChannel', 'AWGMarkerChannel']): super().__init__(_BaseAWGChannel) + self._channel = weakref.proxy(channel) class AWGChannelTupleFeature(Feature): diff --git a/qupulse/hardware/feature_awg/features.py b/qupulse/hardware/feature_awg/features.py index 1b82189cc..d4749fe30 100644 --- a/qupulse/hardware/feature_awg/features.py +++ b/qupulse/hardware/feature_awg/features.py @@ -4,8 +4,8 @@ from enum import Enum from qupulse._program._loop import Loop -from qupulse.hardware.feature_awg.base import AWGDeviceFeature, AWGChannelFeature, AWGChannelTupleFeature,\ - AWGChannelTuple +from qupulse.hardware.feature_awg.base import AWGDeviceFeature, AWGChannelFeature, AWGChannelTupleFeature, \ + AWGChannelTuple, AWGDevice from qupulse.utils.types import ChannelID try: @@ -24,8 +24,8 @@ class SCPI(AWGDeviceFeature, ABC): https://en.wikipedia.org/wiki/Standard_Commands_for_Programmable_Instruments """ - def __init__(self, visa: 'pyvisa.resources.MessageBasedResource'): - super().__init__() + def __init__(self, device: 'AWGDevice', visa: 'pyvisa.resources.MessageBasedResource'): + super().__init__(device) self._socket = visa def send_cmd(self, cmd_str): diff --git a/qupulse/hardware/feature_awg/tabor.py b/qupulse/hardware/feature_awg/tabor.py index 4a08ab8b1..832b38f3d 100644 --- a/qupulse/hardware/feature_awg/tabor.py +++ b/qupulse/hardware/feature_awg/tabor.py @@ -74,27 +74,25 @@ def selector(channel_tuple: "TaborChannelTuple", *args, **kwargs) -> Any: # Features class TaborSCPI(SCPI): def __init__(self, device: "TaborDevice", visa: pyvisa.resources.MessageBasedResource): - super().__init__(visa) - - self._parent = weakref.ref(device) + super().__init__(device=device, visa=visa) def send_cmd(self, cmd_str, paranoia_level=None): - for instr in self._parent().all_devices: + for instr in self._device.all_devices: instr.send_cmd(cmd_str=cmd_str, paranoia_level=paranoia_level) def send_query(self, query_str, query_mirrors=False) -> Any: if query_mirrors: - return tuple(instr.send_query(query_str) for instr in self._parent().all_devices) + return tuple(instr.send_query(query_str) for instr in self._device.all_devices) else: - return self._parent().main_instrument.send_query(query_str) + return self._device.main_instrument.send_query(query_str) def _send_cmd(self, cmd_str, paranoia_level=None) -> Any: """Overwrite send_cmd for paranoia_level > 3""" if paranoia_level is None: - paranoia_level = self._parent().paranoia_level + paranoia_level = self._device.paranoia_level if paranoia_level < 3: - self._parent().super().send_cmd(cmd_str=cmd_str, paranoia_level=paranoia_level) # pragma: no cover + self._device.super().send_cmd(cmd_str=cmd_str, paranoia_level=paranoia_level) # pragma: no cover else: cmd_str = cmd_str.rstrip() @@ -103,12 +101,12 @@ def _send_cmd(self, cmd_str, paranoia_level=None) -> Any: else: ask_str = "*OPC?; :SYST:ERR?" - *answers, opc, error_code_msg = self._parent()._visa_inst.ask(ask_str).split(";") + *answers, opc, error_code_msg = self._device._visa_inst.ask(ask_str).split(";") error_code, error_msg = error_code_msg.split(",") error_code = int(error_code) if error_code != 0: - _ = self._parent()._visa_inst.ask("*CLS; *OPC?") + _ = self._device._visa_inst.ask("*CLS; *OPC?") if error_code == -450: # query queue overflow @@ -123,8 +121,7 @@ class TaborChannelSynchronization(ChannelSynchronization): """This Feature is used to synchronise a certain ammount of channels""" def __init__(self, device: "TaborDevice"): - super().__init__() - self._parent = weakref.ref(device) + super().__init__(device=device) def synchronize_channels(self, group_size: int) -> None: """ @@ -135,21 +132,21 @@ def synchronize_channels(self, group_size: int) -> None: group_size: Number of channels per channel tuple """ if group_size == 2: - self._parent()._channel_tuples = [] - for i in range((int)(len(self._parent().channels) / group_size)): - self._parent()._channel_tuples.append( + self._device._channel_tuples = [] + for i in range(int(len(self._device.channels) / group_size)): + self._device._channel_tuples.append( TaborChannelTuple((i + 1), - self._parent(), - self._parent().channels[(i * group_size):((i * group_size) + group_size)], - self._parent().marker_channels[(i * group_size):((i * group_size) + group_size)]) + self._device, + self._device.channels[(i * group_size):((i * group_size) + group_size)], + self._device.marker_channels[(i * group_size):((i * group_size) + group_size)]) ) - self._parent()[SCPI].send_cmd(":INST:COUP:STAT OFF") + self._device[SCPI].send_cmd(":INST:COUP:STAT OFF") elif group_size == 4: - self._parent()._channel_tuples = [TaborChannelTuple(1, - self._parent(), - self._parent().channels, - self._parent().marker_channels)] - self._parent()[SCPI].send_cmd(":INST:COUP:STAT ON") + self._device._channel_tuples = [TaborChannelTuple(1, + self._device, + self._device.channels, + self._device.marker_channels)] + self._device[SCPI].send_cmd(":INST:COUP:STAT ON") else: raise TaborException("Invalid group size") @@ -158,31 +155,30 @@ class TaborDeviceControl(DeviceControl): """This feature is used for basic communication with a AWG""" def __init__(self, device: "TaborDevice"): - super().__init__() - self._parent = weakref.ref(device) + super().__init__(device=device) def reset(self) -> None: """ Resetting the whole device. A command for resetting is send to the Device, the device is initialized again and all channel tuples are cleared. """ - self._parent()[SCPI].send_cmd(":RES") - self._parent()._coupled = None + self._device[SCPI].send_cmd(":RES") + self._device._coupled = None - self._parent()._initialize() - for channel_tuple in self._parent().channel_tuples: + self._device._initialize() + for channel_tuple in self._device.channel_tuples: channel_tuple[TaborProgramManagement].clear() def trigger(self) -> None: """ This method triggers a device remotely. """ - self._parent()[SCPI].send_cmd(":TRIG") + self._device[SCPI].send_cmd(":TRIG") class TaborStatusTable(StatusTable): def __init__(self, device: "TaborDevice"): - super().__init__() + super().__init__(device=device) self._parent = device def get_status_table(self) -> Dict[str, Union[str, float, int]]: @@ -220,8 +216,8 @@ def get_status_table(self) -> Dict[str, Union[str, float, int]]: data = OrderedDict((name, []) for name, *_ in name_query_type_list) for ch in (1, 2, 3, 4): - self._parent.channels[ch - 1]._select() - self._parent.marker_channels[(ch - 1) % 2]._select() + self._device.channels[ch - 1]._select() + self._device.marker_channels[(ch - 1) % 2]._select() for name, query, dtype in name_query_type_list: data[name].append(dtype(self._parent[SCPI].send_query(query))) return data @@ -411,25 +407,24 @@ def _get_readable_device(self, simulator=True) -> tabor_control.device.TEWXAwg: # Features class TaborVoltageRange(VoltageRange): def __init__(self, channel: "TaborChannel"): - super().__init__() - self._parent = weakref.ref(channel) + super().__init__(channel=channel) @property @with_select def offset(self) -> float: """Get offset of AWG channel""" return float( - self._parent().device[SCPI].send_query(":VOLT:OFFS?".format(channel=self._parent().idn))) + self._channel.device[SCPI].send_query(":VOLT:OFFS?".format(channel=self._channel.idn))) @property @with_select def amplitude(self) -> float: """Get amplitude of AWG channel""" - coupling = self._parent().device[SCPI].send_query(":OUTP:COUP?") + coupling = self._channel.device[SCPI].send_query(":OUTP:COUP?") if coupling == "DC": - return float(self._parent().device[SCPI].send_query(":VOLT?")) + return float(self._channel.device[SCPI].send_query(":VOLT?")) elif coupling == "HV": - return float(self._parent().device[SCPI].send_query(":VOLT:HV?")) + return float(self._channel.device[SCPI].send_query(":VOLT:HV?")) else: raise TaborException("Unknown coupling: {}".format(coupling)) @@ -439,7 +434,7 @@ def amplitude_offset_handling(self) -> AmplitudeOffsetHandling: Gets the amplitude and offset handling of this channel. The amplitude-offset controls if the amplitude and offset settings are constant or if these should be optimized by the driver """ - return self._parent()._amplitude_offset_handling + return self._channel._amplitude_offset_handling @amplitude_offset_handling.setter def amplitude_offset_handling(self, amp_offs_handling: Union[AmplitudeOffsetHandling, str]) -> None: @@ -447,16 +442,15 @@ def amplitude_offset_handling(self, amp_offs_handling: Union[AmplitudeOffsetHand amp_offs_handling: See possible values at `AWGAmplitudeOffsetHandling` """ amp_offs_handling = AmplitudeOffsetHandling(AmplitudeOffsetHandling) - self._parent()._amplitude_offset_handling = amp_offs_handling + self._channel._amplitude_offset_handling = amp_offs_handling def _select(self) -> None: - self._parent()._select() + self._channel._select() class TaborActivatableChannels(ActivatableChannels): def __init__(self, channel: "TaborChannel"): - super().__init__() - self._parent = weakref.ref(channel) + super().__init__(channel=channel) @property def enabled(self) -> bool: @@ -464,22 +458,23 @@ def enabled(self) -> bool: Returns the the state a channel has at the moment. A channel is either activated or deactivated True stands for activated and false for deactivated """ - return self._parent().device[SCPI].send_query(":OUTP ?") == "ON" + return self._channel.device[SCPI].send_query(":OUTP ?") == "ON" @with_select def enable(self): """Enables the output of a certain channel""" - command_string = ":OUTP ON".format(ch_id=self._parent().idn) - self._parent().device[SCPI].send_cmd(command_string) + command_string = ":OUTP ON".format(ch_id=self._channel.idn) + self._channel.device[SCPI].send_cmd(command_string) @with_select def disable(self): """Disables the output of a certain channel""" - command_string = ":OUTP OFF".format(ch_id=self._parent().idn) - self._parent().device[SCPI].send_cmd(command_string) + command_string = ":OUTP OFF".format(ch_id=self._channel.idn) + self._channel.device[SCPI].send_cmd(command_string) def _select(self) -> None: - self._parent()._select() + self._channel._select() + # Implementation class TaborChannel(AWGChannel): @@ -529,6 +524,9 @@ def __init__(self, channel_tuple: "TaborChannelTuple"): self._idle_sequence_table = [(1, 1, 0), (1, 1, 0), (1, 1, 0)] self._trigger_source = 'BUS' + # TODO: QUESTION: is this right? - Is auto_ream the default repetition mode + self._default_repetition_mode = RepetitionMode("auto_rearm") + def get_repetition_mode(self, program_name: str) -> str: """ Returns the default repetition mode of a certain program @@ -537,7 +535,7 @@ def get_repetition_mode(self, program_name: str) -> str: """ return self._channel_tuple._known_programs[program_name].program._repetition_mode - def set_repetition_mode(self, program_name: str, repetition_mode: str) -> None: + def set_repetition_mode(self, program_name: str, repetition_mode: Union[str, RepetitionMode]) -> None: """ Changes the default repetition mode of a certain program @@ -547,14 +545,17 @@ def set_repetition_mode(self, program_name: str, repetition_mode: str) -> None: Throws: ValueError: this Exception is thrown when an invalid repetition mode is given """ - if repetition_mode in ("infinite", "once"): + cur_repetition_mode = RepetitionMode(repetition_mode) + + if cur_repetition_mode in self.supported_repetition_modes: self._channel_tuple._known_programs[program_name].program._repetition_mode = repetition_mode else: raise ValueError("{} is no vaild repetition mode".format(repetition_mode)) @property + # TODO: QUESTION: is this right? - Are these the two run modes the are supported? def supported_repetition_modes(self) -> Set[RepetitionMode]: - return {RepetitionMode.INFINITE} + return {RepetitionMode.INFINITE, RepetitionMode.AUTO_REARM} @with_configuration_guard @with_select @@ -706,25 +707,41 @@ def programs(self) -> Set[str]: return set(program.name for program in self._channel_tuple._known_programs.keys()) @with_select - def run_current_program(self) -> None: + def run_current_program(self, repetition_mode: Union[str, RepetitionMode] = None) -> None: """ This method starts running the active program + Args: + repetition_mode (Union[str, RepetitionMode]): The repetition mode with which the program is executed. + If the repetition mode is none, the default repetition mode of + program is used. + Throws: - RuntimeError: This exception is thrown if there is no active program for this device + RuntimeError: This exception is thrown if there is no active program for this device """ if (self._channel_tuple.device._is_coupled()): # channel tuple is the first channel tuple if (self._channel_tuple.device._channel_tuples[0] == self): if self._channel_tuple._current_program: - repetition_mode = self._channel_tuple._known_programs[ - self._channel_tuple._current_program].program._repetition_mode - if repetition_mode == "infinite": - self._cont_repetition_mode() - self._channel_tuple.device[SCPI].send_cmd(':TRIG', - paranoia_level=self._channel_tuple.internal_paranoia_level) + default_repetition_mode = RepetitionMode(self._channel_tuple._known_programs[ + self._channel_tuple._current_program].program._repetition_mode) + + if repetition_mode is not None: + cur_repetition_mode = RepetitionMode(repetition_mode) + + if default_repetition_mode != cur_repetition_mode: + self._change_armed_program(name=self._channel_tuple._current_program, + repetion_mode=cur_repetition_mode) + + if RepetitionMode(default_repetition_mode) == RepetitionMode.INFINITE: + self._infinite_repetition_mode() + elif RepetitionMode(default_repetition_mode) == RepetitionMode.AUTO_REARM: + self._auto_rearm_repetition_mode() else: - raise ValueError("{} is no vaild repetition mode".format(repetition_mode)) + raise ValueError("{} is no vaild repetition mode".format(default_repetition_mode)) + + self._channel_tuple.device[TaborSCPI].send_cmd(':TRIG', + paranoia_level=self._channel_tuple.internal_paranoia_level) else: raise RuntimeError("No program active") else: @@ -733,20 +750,35 @@ def run_current_program(self) -> None: else: if self._channel_tuple._current_program: - repetition_mode = self._channel_tuple._known_programs[ + default_repetition_mode = self._channel_tuple._known_programs[ self._channel_tuple._current_program].program._repetition_mode - if repetition_mode == "infinite": - self._cont_repetition_mode() - self._channel_tuple.device[SCPI].send_cmd(':TRIG', paranoia_level=self._channel_tuple.internal_paranoia_level) + + if RepetitionMode(default_repetition_mode) == RepetitionMode("infinite"): + self._infinite_repetition_mode() + elif RepetitionMode(default_repetition_mode) == RepetitionMode("auto_rearm"): + self._auto_rearm_repetition_mode() else: - raise ValueError("{} is no vaild repetition mode".format(repetition_mode)) + raise ValueError("{} is no vaild repetition mode".format(default_repetition_mode)) + + self._channel_tuple.device[TaborSCPI].send_cmd(':TRIG', + paranoia_level=self._channel_tuple.internal_paranoia_level) else: raise RuntimeError("No program active") @with_select @with_configuration_guard - def _change_armed_program(self, name: Optional[str]) -> None: + def _change_armed_program(self, name: Optional[str], repetition_mode: RepetitionMode = None) -> None: """The armed program of the channel tuple is changed to the program with the name 'name'""" + + if repetition_mode is None: + if name is not None: + repetition_mode = RepetitionMode(self._channel_tuple._known_programs[name].program._repetition_mode) + else: + repetition_mode = self.default_repetition_mode + + if repetition_mode not in self.supported_repetition_modes: + raise ValueError("{} is no vaild repetition mode".format(repetition_mode)) + if name is None: sequencer_tables = [self._idle_sequence_table] advanced_sequencer_table = [(1, 1, 0)] @@ -774,24 +806,31 @@ def _change_armed_program(self, name: Optional[str]) -> None: assert advanced_sequencer_table[0][0] == 1 sequencer_tables[1].append((1, 1, 0)) - # insert idle sequence in advanced sequence table - advanced_sequencer_table = [(1, 1, 0)] + advanced_sequencer_table + # TODO: QUESTION: Is this right? - are the things that are attached at the end correct? + if repetition_mode == RepetitionMode.INFINITE: + advanced_sequencer_table = [(1, 1, 0)] + advanced_sequencer_table + elif repetition_mode == RepetitionMode.AUTO_REARM: + advanced_sequencer_table = [(1, 1, 1)] + advanced_sequencer_table while len(advanced_sequencer_table) < self._channel_tuple.device.dev_properties["min_aseq_len"]: advanced_sequencer_table.append((1, 1, 0)) - self._channel_tuple.device[SCPI].send_cmd("SEQ:DEL:ALL", paranoia_level=self._channel_tuple.internal_paranoia_level) + # reset sequencer and advanced sequencer tables to fix bug which occurs when switching between some programs + self._channel_tuple.device[SCPI].send_cmd("SEQ:DEL:ALL", + paranoia_level=self._channel_tuple.internal_paranoia_level) self._channel_tuple._sequencer_tables = [] - self._channel_tuple.device[SCPI].send_cmd("ASEQ:DEL", paranoia_level=self._channel_tuple.internal_paranoia_level) + self._channel_tuple.device[SCPI].send_cmd("ASEQ:DEL", + paranoia_level=self._channel_tuple.internal_paranoia_level) self._channel_tuple._advanced_sequence_table = [] # download all sequence tables for i, sequencer_table in enumerate(sequencer_tables): self._channel_tuple.device[SCPI].send_cmd("SEQ:SEL {}".format(i + 1), - paranoia_level=self._channel_tuple.internal_paranoia_level) + paranoia_level=self._channel_tuple.internal_paranoia_level) self._channel_tuple.device._download_sequencer_table(sequencer_table) self._channel_tuple._sequencer_tables = sequencer_tables - self._channel_tuple.device[SCPI].send_cmd("SEQ:SEL 1", paranoia_level=self._channel_tuple.internal_paranoia_level) + self._channel_tuple.device[SCPI].send_cmd("SEQ:SEL 1", + paranoia_level=self._channel_tuple.internal_paranoia_level) self._channel_tuple.device._download_adv_seq_table(advanced_sequencer_table) self._channel_tuple._advanced_sequence_table = advanced_sequencer_table @@ -816,12 +855,19 @@ def _exit_config_mode(self): self._channel_tuple._exit_config_mode() @with_select - def _cont_repetition_mode(self): - """Changes the run mode of this channel tuple to continous mode""" + def _infinite_repetition_mode(self): + """Changes the run mode of this channel tuple to infinite repetition mode""" self._channel_tuple.device[SCPI].send_cmd(f":TRIG:SOUR:ADV EXT") self._channel_tuple.device[SCPI].send_cmd( f":INIT:GATE OFF; :INIT:CONT ON; :INIT:CONT:ENAB ARM; :INIT:CONT:ENAB:SOUR {self._trigger_source}") + @with_select + def _auto_rearm_repetition_mode(self): + """Changes the run mode of this channel tuple to auto repetition mode""" + self._channel_tuple.device[SCPI].send_cmd(f":TRIG:SOUR:ADV EXT") + self._channel_tuple.device[SCPI].send_cmd( + f":INIT:GATE OFF; :INIT:CONT ON; :INIT:CONT:ENAB SELF; :INIT:CONT:ENAB:SOUR {self._trigger_source}") + class TaborVolatileParameters(VolatileParameters): def __init__(self, channel_tuple: "TaborChannelTuple", ): @@ -890,7 +936,11 @@ class TaborChannelTuple(AWGChannelTuple): def __init__(self, idn: int, device: TaborDevice, channels: Iterable["TaborChannel"], marker_channels: Iterable["TaborMarkerChannel"]): super().__init__(idn) - self._device = weakref.ref(device) + + if isinstance(device, weakref.ProxyType): + self._device = device + else: + self._device = weakref.ref(device) self._configuration_guard_count = 0 self._is_in_config_mode = False @@ -954,7 +1004,7 @@ def _select(self) -> None: @property def device(self) -> TaborDevice: """Returns the device that the channel tuple belongs to""" - return self._device() + return self._device @property def channels(self) -> Collection["TaborChannel"]: @@ -1218,7 +1268,8 @@ def cleanup(self) -> None: chunk_size = 10 for chunk_start in range(new_end, old_end, chunk_size): self.device[SCPI].send_cmd("; ".join("TRAC:DEL {}".format(i + 1) - for i in range(chunk_start, min(chunk_start + chunk_size, old_end)))) + for i in + range(chunk_start, min(chunk_start + chunk_size, old_end)))) except Exception as e: raise TaborUndefinedState("Error during cleanup. Device is in undefined state.", device=self) from e @@ -1298,8 +1349,7 @@ def _exit_config_mode(self) -> None: class TaborActivatableMarkerChannels(ActivatableChannels): def __init__(self, marker_channel: "TaborMarkerChannel"): - super().__init__() - self._parent = weakref.ref(marker_channel) + super().__init__(marker_channel) @property def enabled(self) -> bool: @@ -1307,28 +1357,28 @@ def enabled(self) -> bool: Returns the the state a marker channel has at the moment. A channel is either activated or deactivated True stands for activated and false for deactivated """ - return self._parent().device[SCPI].send_query(":MARK:STAT ?") == "ON" + return self._channel.device[SCPI].send_query(":MARK:STAT ?") == "ON" @with_select def enable(self): """Enables the output of a certain marker channel""" command_string = "SOUR:MARK:SOUR USER; :SOUR:MARK:STAT ON" command_string = command_string.format( - channel=self._parent().channel_tuple.channels[0].idn, - marker=self._parent().channel_tuple.marker_channels.index(self._parent()) + 1) - self._parent().device[SCPI].send_cmd(command_string) + channel=self._channel.channel_tuple.channels[0].idn, + marker=self._channel.channel_tuple.marker_channels.index(self._channel) + 1) + self._channel.device[SCPI].send_cmd(command_string) @with_select def disable(self): """Disable the output of a certain marker channel""" command_string = ":SOUR:MARK:SOUR USER; :SOUR:MARK:STAT OFF" command_string = command_string.format( - channel=self._parent().channel_tuple.channels[0].idn, - marker=self._parent().channel_tuple.marker_channels.index(self._parent()) + 1) - self._parent().device[SCPI].send_cmd(command_string) + channel=self._channel.channel_tuple.channels[0].idn, + marker=self._channel.channel_tuple.marker_channels.index(self._channel) + 1) + self._channel.device[SCPI].send_cmd(command_string) def _select(self) -> None: - self._parent()._select() + self._channel._select() # Implementation @@ -1367,10 +1417,12 @@ def _select(self) -> None: self.device[SCPI].send_cmd(":SOUR:MARK:SEL {marker}".format(marker=(((self.idn - 1) % 2) + 1))) +######################################################################################################################## + class TaborUndefinedState(TaborException): """ If this exception is raised the attached tabor device is in an undefined state. - It is highly recommended to call reset it.f + It is highly recommended to call reset it. """ def __init__(self, *args, device: Union[TaborDevice, TaborChannelTuple]): diff --git a/tests/hardware/awg_new_driver_base_tests.py b/tests/hardware/awg_new_driver_base_tests.py new file mode 100644 index 000000000..24bc5162e --- /dev/null +++ b/tests/hardware/awg_new_driver_base_tests.py @@ -0,0 +1,279 @@ +from typing import Callable, Iterable, Optional, Set, Tuple +import unittest +import warnings + +from qupulse import ChannelID +from qupulse._program._loop import Loop +from qupulse.hardware.feature_awg import channel_tuple_wrapper +from qupulse.hardware.feature_awg.base import AWGDevice, AWGChannel, AWGChannelTuple, AWGMarkerChannel +from qupulse.hardware.feature_awg.features import ChannelSynchronization, ProgramManagement, VoltageRange, \ + AmplitudeOffsetHandling +from qupulse.utils.types import Collection + + +######################################################################################################################## +# Example Features +######################################################################################################################## + +class TestSynchronizeChannelsFeature(ChannelSynchronization): + def __init__(self, device: "TestAWGDevice"): + super().__init__() + self._parent = device + + def synchronize_channels(self, group_size: int) -> None: + """Forwarding call to TestAWGDevice""" + self._parent.synchronize_channels(group_size) + + +class TestVoltageRangeFeature(VoltageRange): + def __init__(self, channel: "TestAWGChannel"): + super().__init__() + self._parent = channel + + @property + def offset(self) -> float: + """Get offset of TestAWGChannel""" + return self._parent._offset + + @offset.setter + def offset(self, offset: float) -> None: + """Set offset of TestAWGChannel""" + self._parent._offset = offset + + @property + def amplitude(self) -> float: + """Get amplitude of TestAWGChannel""" + return self._parent._amplitude + + @amplitude.setter + def amplitude(self, amplitude: float) -> None: + """Set amplitude of TestAWGChannel""" + self._parent._amplitude = amplitude + + @property + def amplitude_offset_handling(self) -> str: + """Get amplitude-offset-handling of TestAWGChannel""" + return self._parent._ampl_offs_handling + + @amplitude_offset_handling.setter + def amplitude_offset_handling(self, ampl_offs_handling: str) -> None: + """Set amplitude-offset-handling of TestAWGChannel""" + self._parent._ampl_offs_handling = ampl_offs_handling + + +class TestProgramManagementFeature(ProgramManagement): + def __init__(self): + super().__init__() + self._programs = {} + self._armed_program = None + + def upload(self, name: str, program: Loop, channels: Tuple[Optional[ChannelID], ...], + marker_channels: Tuple[Optional[ChannelID], ...], voltage_transformation: Tuple[Optional[Callable], ...], + force: bool = False) -> None: + if name in self._programs: + raise KeyError("Program with name \"{}\" is already on the instrument.".format(name)) + self._programs[name] = program + + def remove(self, name: str) -> None: + if self._armed_program == name: + raise RuntimeError("Cannot remove program, when it is armed.") + if name not in self._programs: + raise KeyError("Unknown program: {}".format(name)) + del self._programs[name] + + def clear(self) -> None: + if self._armed_program is not None: + raise RuntimeError("Cannot clear programs, with an armed program.") + self._programs.clear() + + def arm(self, name: Optional[str]) -> None: + self._armed_program = name + + @property + def programs(self) -> Set[str]: + return set(self._programs.keys()) + + def run_current_program(self) -> None: + if self._armed_program: + print("Run Program:", self._armed_program) + print(self.programs[self._armed_program]) + else: + print("No program armed") + + +######################################################################################################################## +# Device & Channels +######################################################################################################################## + +class TestAWGDevice(AWGDevice): + def __init__(self, name: str): + super().__init__(name) + + # Add feature to this object (self) + # During this call, the function of the feature is dynamically added to this object + self.add_feature(TestSynchronizeChannelsFeature(self)) + + self._channels = [TestAWGChannel(i, self) for i in range(8)] # 8 channels + self._channel_tuples = [] + + # Call the feature function, with the feature's signature + # Default channel synchronization with a group size of 2 + self[ChannelSynchronization].synchronize_channels(2) + + def cleanup(self) -> None: + """This will be called automatically in __del__""" + self._channels.clear() + self._channel_tuples.clear() + + @property + def channels(self) -> Collection["TestAWGChannel"]: + return self._channels + + @property + def marker_channels(self) -> Collection[AWGMarkerChannel]: + return [] + + @property + def channel_tuples(self) -> Collection["TestAWGChannelTuple"]: + return self._channel_tuples + + def synchronize_channels(self, group_size: int) -> None: + """Implementation of the feature's , but you can also call it directly""" + if group_size not in [2, 4, 8]: # Allowed group sizes + raise ValueError("Invalid group size for channel synchronization") + + self._channel_tuples.clear() + tmp_channel_tuples = [[] for i in range(len(self._channels) // group_size)] + + # Preparing the channel structure + for i, channel in enumerate(self._channels): + tmp_channel_tuples[i // group_size].append(channel) + + # Create channel tuples with its belonging channels and refer to their parent tuple + for i, tmp_channel_tuple in enumerate(tmp_channel_tuples): + channel_tuple = TestAWGChannelTuple(i, self, tmp_channel_tuple) + self._channel_tuples.append(channel_tuple) + for channel in tmp_channel_tuple: + channel._set_channel_tuple(channel_tuple) + + +class TestAWGChannelTuple(AWGChannelTuple): + def __init__(self, idn: int, device: TestAWGDevice, channels: Iterable["TestAWGChannel"]): + super().__init__(idn) + + # Add feature to this object (self) + # During this call, the function of the feature is dynamically added to this object + self.add_feature(TestProgramManagementFeature()) + + self._device = device + self._channels = tuple(channels) + self._sample_rate = 12.456 # default value + + @property + def channel_tuple_adapter(self) -> channel_tuple_wrapper: + pass + + @property + def sample_rate(self) -> float: + return self._sample_rate + + @sample_rate.setter + def sample_rate(self, sample_rate: float) -> None: + self._sample_rate = sample_rate + + @property + def device(self) -> TestAWGDevice: + return self._device + + @property + def channels(self) -> Collection["TestAWGChannel"]: + return self._channels + + @property + def marker_channels(self) -> Collection[AWGMarkerChannel]: + return [] + + +class TestAWGChannel(AWGChannel): + def __init__(self, idn: int, device: TestAWGDevice): + super().__init__(idn) + + # Add feature to this object (self) + # During this call, all functions of the feature are dynamically added to this object + self.add_feature(TestVoltageRangeFeature(self)) + + self._device = device + self._channel_tuple = None + self._offset = 0.0 + self._amplitude = 5.0 + self._ampl_offs_handling = AmplitudeOffsetHandling.IGNORE_OFFSET + + @property + def device(self) -> TestAWGDevice: + return self._device + + @property + def channel_tuple(self) -> Optional[TestAWGChannelTuple]: + return self._channel_tuple + + def _set_channel_tuple(self, channel_tuple: TestAWGChannelTuple) -> None: + self._channel_tuple = channel_tuple + + +class TestBaseClasses(unittest.TestCase): + def setUp(self): + self.device_name = "My device" + self.device = TestAWGDevice(self.device_name) + + def test_device(self): + self.assertEqual(self.device.name, self.device_name, "Invalid name for device") + self.assertEqual(len(self.device.channels), 8, "Invalid number of channels") + self.assertEqual(len(self.device.marker_channels), 0, "Invalid number of marker channels") + self.assertEqual(len(self.device.channel_tuples), 4, "Invalid default channel tuples for device") + + def test_channels(self): + for i, channel in enumerate(self.device.channels): + self.assertEqual(channel.idn, i), "Invalid channel id" + self.assertEqual(channel[VoltageRange].offset, 0, "Invalid default offset for channel {}".format(i)) + self.assertEqual(channel[VoltageRange].amplitude, 5.0, + "Invalid default amplitude for channel {}".format(i)) + + offs = -0.1 * i + ampl = 0.5 + 3 * i + channel[VoltageRange].offset = offs + channel[VoltageRange].amplitude = ampl + + self.assertEqual(channel[VoltageRange].offset, offs, "Invalid offset for channel {}".format(i)) + self.assertEqual(channel[VoltageRange].amplitude, ampl, "Invalid amplitude for channel {}".format(i)) + + def test_channel_tuples(self): + for group_size in [2, 4, 8]: + self.device[ChannelSynchronization].synchronize_channels(group_size) + + self.assertEqual(len(self.device.channel_tuples), 8 // group_size, "Invalid number of channel tuples") + + # Check if channels and channel tuples are connected right + for i, channel in enumerate(self.device.channels): + self.assertEqual(channel.channel_tuple.idn, i // group_size, + "Invalid channel tuple {} for channel {}".format(channel.channel_tuple.idn, i)) + self.assertTrue(channel in channel.channel_tuple.channels, + "Channel {} not in its parent channel tuple {}".format(i, channel.channel_tuple.idn)) + + self.assertEqual(len(self.device.channel_tuples), 1, "Invalid number of channel tuples") + + def test_error_handling(self): + with self.assertRaises(ValueError): + self.device[ChannelSynchronization].synchronize_channels(3) + + with self.assertRaises(KeyError): + self.device.add_feature(TestSynchronizeChannelsFeature(self.device)) + + with self.assertRaises(TypeError): + self.device.add_feature(TestProgramManagementFeature()) + + with self.assertRaises(TypeError): + self.device.features[ChannelSynchronization] = None + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/hardware/tabor_new_driver_clock_tests.py b/tests/hardware/tabor_new_driver_clock_tests.py new file mode 100644 index 000000000..fe3ae7a3c --- /dev/null +++ b/tests/hardware/tabor_new_driver_clock_tests.py @@ -0,0 +1,135 @@ +import unittest + + +with_alazar = True + +def get_pulse(): + from qupulse.pulses import TablePulseTemplate as TPT, SequencePulseTemplate as SPT, RepetitionPulseTemplate as RPT + + ramp = TPT(identifier='ramp', channels={'out', 'trigger'}) + ramp.add_entry(0, 'start', channel='out') + ramp.add_entry('duration', 'stop', 'linear', channel='out') + + ramp.add_entry(0, 1, channel='trigger') + ramp.add_entry('duration', 1, 'hold', channel='trigger') + + ramp.add_measurement_declaration('meas', 0, 'duration') + + base = SPT([(ramp, dict(start='min', stop='max', duration='tau/3'), dict(meas='A')), + (ramp, dict(start='max', stop='max', duration='tau/3'), dict(meas='B')), + (ramp, dict(start='max', stop='min', duration='tau/3'), dict(meas='C'))], {'min', 'max', 'tau'}) + + repeated = RPT(base, 'n') + + root = SPT([repeated, repeated, repeated], {'min', 'max', 'tau', 'n'}) + + return root + + +def get_alazar_config(): + from atsaverage import alazar + from atsaverage.config import ScanlineConfiguration, CaptureClockConfiguration, EngineTriggerConfiguration,\ + TRIGInputConfiguration, InputConfiguration + + trig_level = int((5 + 0.4) / 10. * 255) + assert 0 <= trig_level < 256 + + config = ScanlineConfiguration() + config.triggerInputConfiguration = TRIGInputConfiguration(triggerRange=alazar.TriggerRangeID.etr_5V) + config.triggerConfiguration = EngineTriggerConfiguration(triggerOperation=alazar.TriggerOperation.J, + triggerEngine1=alazar.TriggerEngine.J, + triggerSource1=alazar.TriggerSource.external, + triggerSlope1=alazar.TriggerSlope.positive, + triggerLevel1=trig_level, + triggerEngine2=alazar.TriggerEngine.K, + triggerSource2=alazar.TriggerSource.disable, + triggerSlope2=alazar.TriggerSlope.positive, + triggerLevel2=trig_level) + config.captureClockConfiguration = CaptureClockConfiguration(source=alazar.CaptureClockType.internal_clock, + samplerate=alazar.SampleRateID.rate_100MSPS) + config.inputConfiguration = 4*[InputConfiguration(input_range=alazar.InputRangeID.range_1_V)] + config.totalRecordSize = 0 + + assert config.totalRecordSize == 0 + + return config + +def get_operations(): + from atsaverage.operations import Downsample + + return [Downsample(identifier='DS_A', maskID='A'), + Downsample(identifier='DS_B', maskID='B'), + Downsample(identifier='DS_C', maskID='C'), + Downsample(identifier='DS_D', maskID='D')] + +def get_window(card): + from atsaverage.gui import ThreadedStatusWindow + window = ThreadedStatusWindow(card) + window.start() + return window + + +class TaborTests(unittest.TestCase): + @unittest.skip + def test_all(self): + from qupulse.hardware.feature_awg.tabor import TaborChannelTuple, TaborDevice + #import warnings + tawg = TaborDevice(r'USB0::0x168C::0x2184::0000216488::INSTR') + tchannelpair = TaborChannelTuple(tawg, (1, 2), 'TABOR_AB') + tawg.paranoia_level = 2 + + #warnings.simplefilter('error', Warning) + + from qupulse.hardware.setup import HardwareSetup, PlaybackChannel, MarkerChannel + hardware_setup = HardwareSetup() + + hardware_setup.set_channel('TABOR_A', PlaybackChannel(tchannelpair, 0)) + hardware_setup.set_channel('TABOR_B', PlaybackChannel(tchannelpair, 1)) + hardware_setup.set_channel('TABOR_A_MARKER', MarkerChannel(tchannelpair, 0)) + hardware_setup.set_channel('TABOR_B_MARKER', MarkerChannel(tchannelpair, 1)) + + if with_alazar: + from qupulse.hardware.dacs.alazar import AlazarCard + import atsaverage.server + + if not atsaverage.server.Server.default_instance.running: + atsaverage.server.Server.default_instance.start(key=b'guest') + + import atsaverage.core + + alazar = AlazarCard(atsaverage.core.getLocalCard(1, 1)) + alazar.register_mask_for_channel('A', 0) + alazar.register_mask_for_channel('B', 0) + alazar.register_mask_for_channel('C', 0) + alazar.config = get_alazar_config() + + alazar.register_operations('test', get_operations()) + window = get_window(atsaverage.core.getLocalCard(1, 1)) + hardware_setup.register_dac(alazar) + + repeated = get_pulse() + + from qupulse.pulses.sequencing import Sequencer + + sequencer = Sequencer() + sequencer.push(repeated, + parameters=dict(n=1000, min=-0.5, max=0.5, tau=192*3), + channel_mapping={'out': 'TABOR_A', 'trigger': 'TABOR_A_MARKER'}, + window_mapping=dict(A='A', B='B', C='C')) + instruction_block = sequencer.build() + + hardware_setup.register_program('test', instruction_block) + + if with_alazar: + from atsaverage.masks import PeriodicMask + m = PeriodicMask() + m.identifier = 'D' + m.begin = 0 + m.end = 1 + m.period = 1 + m.channel = 0 + alazar._registered_programs['test'].masks.append(m) + + hardware_setup.arm_program('test') + + d = 1 diff --git a/tests/hardware/tabor_new_driver_dummy_based_tests.py b/tests/hardware/tabor_new_driver_dummy_based_tests.py new file mode 100644 index 000000000..2ea52ab36 --- /dev/null +++ b/tests/hardware/tabor_new_driver_dummy_based_tests.py @@ -0,0 +1,809 @@ +import sys +import unittest +from unittest import mock +from unittest.mock import patch, MagicMock + +from typing import List, Tuple, Optional, Any +from copy import copy, deepcopy + +import numpy as np + +from qupulse.hardware.awgs.base import AWGAmplitudeOffsetHandling +from qupulse.hardware.feature_awg.tabor import TaborProgram, TaborProgramMemory +from qupulse.utils.types import TimeType +from qupulse._program.tabor import TableDescription, TimeType, TableEntry +from tests.hardware.dummy_modules import import_package + + +class DummyTaborProgramClass: + def __init__(self, segments=None, segment_lengths=None, + sequencer_tables=None, advanced_sequencer_table=None, waveform_mode=None): + self.program = None + self.device_properties = None + self.channels = None + self.markers = None + + self.segment_lengths = segment_lengths + self.segments = segments + + self.sequencer_tables = sequencer_tables + self.advanced_sequencer_table = advanced_sequencer_table + self.waveform_mode = waveform_mode + + self.created = [] + + def __call__(self, program, device_properties, channels, markers): + self.program = program + self.device_properties = device_properties + self.channels = channels + self.markers = markers + + class DummyTaborProgram: + def __init__(self, class_obj: DummyTaborProgramClass): + self.sampled_segments_calls = [] + self.class_obj = class_obj + self.waveform_mode = class_obj.waveform_mode + def sampled_segments(self, sample_rate, voltage_amplitude, voltage_offset, voltage_transformation): + self.sampled_segments_calls.append((sample_rate, voltage_amplitude, voltage_offset, voltage_transformation)) + return self.class_obj.segments, self.class_obj.segment_lengths + def get_sequencer_tables(self): + return self.class_obj.sequencer_tables + def get_advanced_sequencer_table(self): + return self.class_obj.advanced_sequencer_table + self.created.append(DummyTaborProgram(self)) + return self.created[-1] + + +class TaborDummyBasedTest(unittest.TestCase): + to_unload = ['pytabor', 'pyvisa', 'visa', 'teawg', 'qupulse', 'tests.pulses.sequencing_dummies'] + backup_modules = dict() + + @classmethod + def unload_package(cls, package_name): + modules_to_delete = [module_name for module_name in sys.modules if module_name.startswith(package_name)] + + for module_name in modules_to_delete: + del sys.modules[module_name] + + @classmethod + def backup_package(cls, package_name): + cls.backup_modules[package_name] = [(module_name, module) + for module_name, module in sys.modules.items() + if module_name.startswith(package_name)] + + @classmethod + def restore_packages(cls): + for package, module_list in cls.backup_modules.items(): + for module_name, module in module_list: + sys.modules[module_name] = module + + @classmethod + def setUpClass(cls): + for u in cls.to_unload: + cls.backup_package(u) + + for u in cls.to_unload: + cls.unload_package(u) + + import_package('pytabor') + import_package('pyvisa') + import_package('teawg') + + @classmethod + def tearDownClass(cls): + for u in cls.to_unload: + cls.unload_package(u) + + cls.restore_packages() + + def setUp(self): + from qupulse.hardware.awgs.tabor import TaborAWGRepresentation + self.instrument = TaborAWGRepresentation('main_instrument', + reset=True, + paranoia_level=2, + mirror_addresses=['mirror_instrument']) + self.instrument.main_instrument.visa_inst.answers[':OUTP:COUP'] = 'DC' + self.instrument.main_instrument.visa_inst.answers[':VOLT'] = '1.0' + self.instrument.main_instrument.visa_inst.answers[':FREQ:RAST'] = '1e9' + self.instrument.main_instrument.visa_inst.answers[':VOLT:HV'] = '0.7' + + @property + def awg_representation(self): + return self.instrument + + @property + def channel_pair(self): + return self.awg_representation.channel_pair_AB + + def reset_instrument_logs(self): + for device in self.instrument.all_devices: + device.logged_commands = [] + device._send_binary_data_calls = [] + device._download_adv_seq_table_calls = [] + device._download_sequencer_table_calls = [] + + def assertAllCommandLogsEqual(self, expected_log: List): + for device in self.instrument.all_devices: + self.assertEqual(device.logged_commands, expected_log) + + +class TaborAWGRepresentationDummyBasedTests(TaborDummyBasedTest): + def test_send_cmd(self): + self.reset_instrument_logs() + + self.instrument.send_cmd('bleh', paranoia_level=3) + + self.assertAllCommandLogsEqual([((), dict(paranoia_level=3, cmd_str='bleh'))]) + + self.instrument.send_cmd('bleho') + self.assertAllCommandLogsEqual([((), dict(paranoia_level=3, cmd_str='bleh')), + ((), dict(cmd_str='bleho', paranoia_level=None))]) + + def test_trigger(self): + self.reset_instrument_logs() + self.instrument.trigger() + + self.assertAllCommandLogsEqual([((), dict(cmd_str=':TRIG', paranoia_level=None))]) + + def test_paranoia_level(self): + self.assertEqual(self.instrument.paranoia_level, self.instrument.main_instrument.paranoia_level) + self.instrument.paranoia_level = 30 + for device in self.instrument.all_devices: + self.assertEqual(device.paranoia_level, 30) + + def test_enable(self): + self.reset_instrument_logs() + self.instrument.enable() + + expected_commands = [':ENAB'] + expected_log = [((), dict(cmd_str=cmd, paranoia_level=None)) + for cmd in expected_commands] + self.assertAllCommandLogsEqual(expected_log) + + +class TaborChannelPairTests(TaborDummyBasedTest): + @staticmethod + def to_new_sequencer_tables(sequencer_tables: List[List[Tuple[int, int, int]]] + ) -> List[List[Tuple[TableDescription, Optional[Any]]]]: + return [[(TableDescription(*entry), None) for entry in sequencer_table] + for sequencer_table in sequencer_tables] + + @staticmethod + def to_new_advanced_sequencer_table(advanced_sequencer_table: List[Tuple[int, int, int]]) -> List[TableDescription]: + return [TableDescription(*entry) for entry in advanced_sequencer_table] + + @classmethod + def setUpClass(cls): + super().setUpClass() + + from qupulse.hardware.awgs.tabor import TaborChannelPair, TaborProgramMemory, TaborSegment, TaborSequencing + from qupulse.pulses.table_pulse_template import TableWaveform + from qupulse.pulses.interpolation import HoldInterpolationStrategy + from qupulse._program._loop import Loop + + from tests.pulses.sequencing_dummies import DummyWaveform + + from qupulse._program.tabor import make_combined_wave + + cls.DummyWaveform = DummyWaveform + cls.TaborChannelPair = TaborChannelPair + cls.TaborProgramMemory = TaborProgramMemory + cls.TableWaveform = TableWaveform + cls.HoldInterpolationStrategy = HoldInterpolationStrategy + cls.Loop = Loop + cls.TaborSegment = TaborSegment + cls.make_combined_wave = staticmethod(make_combined_wave) + cls.TaborSequencing = TaborSequencing + + def setUp(self): + super().setUp() + + def test__execute_multiple_commands_with_config_guard(self): + channel_pair = self.TaborChannelPair(self.instrument, identifier='asd', channels=(1, 2)) + # prevent entering and exiting configuration mode + channel_pair._configuration_guard_count = 2 + + given_commands = [':ASEQ:DEF 2,2,5,0', ':SEQ:SEL 2', ':SEQ:DEF 1,2,10,0'] + expected_command = ':ASEQ:DEF 2,2,5,0;:SEQ:SEL 2;:SEQ:DEF 1,2,10,0' + with mock.patch.object(channel_pair.device, 'send_cmd') as send_cmd: + channel_pair._execute_multiple_commands_with_config_guard(given_commands) + send_cmd.assert_called_once_with(expected_command, paranoia_level=channel_pair.internal_paranoia_level) + + def test_set_volatile_parameters(self): + channel_pair = self.TaborChannelPair(self.instrument, identifier='asd', channels=(1, 2)) + + parameters = {'var': 2} + modifications = {1: TableEntry(repetition_count=5, element_number=1, jump_flag=0), + (0, 1): TableDescription(repetition_count=10, element_id=0, jump_flag=0)} + invalid_modification = {1: TableEntry(repetition_count=0, element_number=1, jump_flag=0)} + no_modifications = {} + + program_mock = mock.Mock(TaborProgram) + program_memory = TaborProgramMemory(waveform_to_segment=np.array([1, 4]), program=program_mock) + + expected_commands = {':ASEQ:DEF 2,2,5,0', ':SEQ:SEL 2', ':SEQ:DEF 1,2,10,0'} + + channel_pair._known_programs['active_program'] = program_memory + channel_pair._known_programs['other_program'] = program_memory + channel_pair._current_program = 'active_program' + + with mock.patch.object(program_mock, 'update_volatile_parameters', return_value=modifications) as update_prog: + with mock.patch.object(channel_pair, '_execute_multiple_commands_with_config_guard') as ex_com: + with mock.patch.object(channel_pair.device.main_instrument._visa_inst, 'query'): + channel_pair.set_volatile_parameters('other_program', parameters) + ex_com.assert_not_called() + update_prog.assert_called_once_with(parameters) + + channel_pair.set_volatile_parameters('active_program', parameters) + self.assertEqual(1, ex_com.call_count) + actual_commands, = ex_com.call_args[0] + self.assertEqual(expected_commands, set(actual_commands)) + self.assertEqual(len(expected_commands), len(actual_commands)) + + assert update_prog.call_count == 2 + update_prog.assert_called_with(parameters) + + with mock.patch.object(program_mock, 'update_volatile_parameters', return_value=no_modifications) as update_prog: + with mock.patch.object(channel_pair, '_execute_multiple_commands_with_config_guard') as ex_com: + channel_pair.set_volatile_parameters('active_program', parameters) + + ex_com.assert_not_called() + update_prog.assert_called_once_with(parameters) + + with mock.patch.object(program_mock, 'update_volatile_parameters', return_value=invalid_modification) as update_prog: + with mock.patch.object(channel_pair, '_execute_multiple_commands_with_config_guard') as ex_com: + with self.assertRaises(ValueError): + channel_pair.set_volatile_parameters('active_program', parameters) + + ex_com.assert_not_called() + update_prog.assert_called_once_with(parameters) + + def test_copy(self): + channel_pair = self.TaborChannelPair(self.instrument, identifier='asd', channels=(1, 2)) + with self.assertRaises(NotImplementedError): + copy(channel_pair) + with self.assertRaises(NotImplementedError): + deepcopy(channel_pair) + + def test_init(self): + with self.assertRaises(ValueError): + self.TaborChannelPair(self.instrument, identifier='asd', channels=(1, 3)) + + def test_free_program(self): + channel_pair = self.TaborChannelPair(self.instrument, identifier='asd', channels=(1, 2)) + + with self.assertRaises(KeyError): + channel_pair.free_program('test') + + program = self.TaborProgramMemory(np.array([1, 2], dtype=np.int64), None) + + channel_pair._segment_references = np.array([1, 3, 1, 0]) + channel_pair._known_programs['test'] = program + self.assertIs(channel_pair.free_program('test'), program) + + np.testing.assert_equal(channel_pair._segment_references, np.array([1, 2, 0, 0])) + + def test_upload_exceptions(self): + + wv = self.TableWaveform(1, [(0, 0.1, self.HoldInterpolationStrategy()), + (192, 0.1, self.HoldInterpolationStrategy())]) + + channel_pair = self.TaborChannelPair(self.instrument, identifier='asd', channels=(1, 2)) + + program = self.Loop(waveform=wv) + with self.assertRaises(ValueError): + channel_pair.upload('test', program, (1, 2, 3), (5, 6), (lambda x: x, lambda x: x)) + with self.assertRaises(ValueError): + channel_pair.upload('test', program, (1, 2), (5, 6, 'a'), (lambda x: x, lambda x: x)) + with self.assertRaises(ValueError): + channel_pair.upload('test', program, (1, 2), (3, 4), (lambda x: x,)) + + old = channel_pair._amplitude_offset_handling + with self.assertRaises(ValueError): + channel_pair._amplitude_offset_handling = 'invalid' + channel_pair.upload('test', program, (1, None), (None, None), (lambda x: x, lambda x: x)) + channel_pair._amplitude_offset_handling = old + + channel_pair._known_programs['test'] = self.TaborProgramMemory(np.array([0]), None) + with self.assertRaises(ValueError): + channel_pair.upload('test', program, (1, 2), (3, 4), (lambda x: x, lambda x: x)) + + def test_upload(self): + segments = np.array([1, 2, 3, 4, 5]) + segment_lengths = np.array([0, 16, 0, 16, 0], dtype=np.uint16).tolist() + + segment_references = np.array([1, 1, 2, 0, 1], dtype=np.uint32) + + w2s = np.array([-1, -1, 1, 2, -1], dtype=np.int64) + ta = np.array([True, False, False, False, True]) + ti = np.array([-1, 3, -1, -1, -1]) + + channels = (1, None) + markers = (None, None) + voltage_transformations = (lambda x: x, lambda x: x) + sample_rate = TimeType.from_fraction(1, 1) + + with mock.patch('qupulse.hardware.awgs.tabor.TaborProgram', specs=TaborProgram) as DummyTaborProgram: + tabor_program = DummyTaborProgram.return_value + tabor_program.get_sampled_segments.return_value = (segments, segment_lengths) + + program = self.Loop(waveform=self.DummyWaveform(duration=192)) + + channel_pair = self.TaborChannelPair(self.instrument, identifier='asd', channels=(1, 2)) + channel_pair._segment_references = segment_references + + def dummy_find_place(segments_, segement_lengths_): + self.assertIs(segments_, segments) + self.assertIs(segment_lengths, segement_lengths_) + return w2s, ta, ti + + def dummy_upload_segment(segment_index, segment): + self.assertEqual(segment_index, 3) + self.assertEqual(segment, 2) + + def dummy_amend_segments(segments_): + np.testing.assert_equal(segments_, np.array([1, 5])) + return np.array([5, 6], dtype=np.int64) + + channel_pair._find_place_for_segments_in_memory = dummy_find_place + channel_pair._upload_segment = dummy_upload_segment + channel_pair._amend_segments = dummy_amend_segments + + channel_pair.upload('test', program, channels, markers, voltage_transformations) + + DummyTaborProgram.assert_called_once_with( + program, + channels=tuple(channels), + markers=markers, + device_properties=channel_pair.device.dev_properties, + sample_rate=sample_rate, + amplitudes=(.5, .5), + offsets=(0., 0.), + voltage_transformations=voltage_transformations + ) + + # the other references are increased in amend and upload segment method + np.testing.assert_equal(channel_pair._segment_references, np.array([1, 2, 3, 0, 1])) + + self.assertEqual(len(channel_pair._known_programs), 1) + np.testing.assert_equal(channel_pair._known_programs['test'].waveform_to_segment, + np.array([5, 3, 1, 2, 6], dtype=np.int64)) + + def test_upload_offset_handling(self): + + program = self.Loop(waveform=self.TableWaveform(1, [(0, 0.1, self.HoldInterpolationStrategy()), + (192, 0.1, self.HoldInterpolationStrategy())])) + + channel_pair = self.TaborChannelPair(self.instrument, identifier='asd', channels=(1, 2)) + + channels = (1, None) + markers = (None, None) + + tabor_program_kwargs = dict( + channels=channels, + markers=markers, + device_properties=channel_pair.device.dev_properties) + + test_sample_rate = TimeType.from_fraction(1, 1) + test_amplitudes = (channel_pair.device.amplitude(channel_pair._channels[0]) / 2, + channel_pair.device.amplitude(channel_pair._channels[1]) / 2) + test_offset = 0.1 + test_transform = (lambda x: x, lambda x: x) + + with patch('qupulse.hardware.awgs.tabor.TaborProgram', wraps=TaborProgram) as tabor_program_mock: + with patch.object(self.instrument, 'offset', return_value=test_offset) as offset_mock: + tabor_program_mock.get_sampled_segments = mock.Mock(wraps=tabor_program_mock.get_sampled_segments) + + channel_pair.amplitude_offset_handling = AWGAmplitudeOffsetHandling.CONSIDER_OFFSET + channel_pair.upload('test1', program, channels, markers, test_transform) + + tabor_program_mock.assert_called_once_with(program, **tabor_program_kwargs, + sample_rate=test_sample_rate, + amplitudes=test_amplitudes, + offsets=(test_offset, test_offset), + voltage_transformations=test_transform) + self.assertEqual([mock.call(1), mock.call(2)], offset_mock.call_args_list) + offset_mock.reset_mock() + tabor_program_mock.reset_mock() + + channel_pair.amplitude_offset_handling = AWGAmplitudeOffsetHandling.IGNORE_OFFSET + channel_pair.upload('test2', program, (1, None), (None, None), test_transform) + + tabor_program_mock.assert_called_once_with(program, **tabor_program_kwargs, + sample_rate=test_sample_rate, + amplitudes=test_amplitudes, + offsets=(0., 0.), + voltage_transformations=test_transform) + self.assertEqual([], offset_mock.call_args_list) + + def test_find_place_for_segments_in_memory(self): + def hash_based_on_dir(ch): + hash_list = [] + for d in dir(ch): + o = getattr(ch, d) + if isinstance(o, np.ndarray): + hash_list.append(hash(o.tobytes())) + else: + try: + hash_list.append(hash(o)) + except TypeError: + pass + return hash(tuple(hash_list)) + + channel_pair = self.TaborChannelPair(self.instrument, identifier='asd', channels=(1, 2)) + + # empty + segments = np.asarray([-5, -6, -7, -8, -9]) + segment_lengths = 192 + np.asarray([32, 16, 64, 32, 16]) + + hash_before = hash_based_on_dir(channel_pair) + + w2s, ta, ti = channel_pair._find_place_for_segments_in_memory(segments, segment_lengths) + self.assertEqual(w2s.tolist(), [-1, -1, -1, -1, -1]) + self.assertEqual(ta.tolist(), [True, True, True, True, True]) + self.assertEqual(ti.tolist(), [-1, -1, -1, -1, -1]) + self.assertEqual(hash_before, hash_based_on_dir(channel_pair)) + + # all new segments + channel_pair._segment_capacity = 192 + np.asarray([0, 16, 32, 16, 0], dtype=np.uint32) + channel_pair._segment_hashes = np.asarray([1, 2, 3, 4, 5], dtype=np.int64) + channel_pair._segment_references = np.asarray([1, 1, 1, 2, 1], dtype=np.int32) + hash_before = hash_based_on_dir(channel_pair) + + w2s, ta, ti = channel_pair._find_place_for_segments_in_memory(segments, segment_lengths) + self.assertEqual(w2s.tolist(), [-1, -1, -1, -1, -1]) + self.assertEqual(ta.tolist(), [True, True, True, True, True]) + self.assertEqual(ti.tolist(), [-1, -1, -1, -1, -1]) + self.assertEqual(hash_before, hash_based_on_dir(channel_pair)) + + # some known segments + channel_pair._segment_capacity = 192 + np.asarray([0, 16, 32, 64, 0, 16], dtype=np.uint32) + channel_pair._segment_hashes = np.asarray([1, 2, 3, -7, 5, -9], dtype=np.int64) + channel_pair._segment_references = np.asarray([1, 1, 1, 2, 1, 3], dtype=np.int32) + hash_before = hash_based_on_dir(channel_pair) + + w2s, ta, ti = channel_pair._find_place_for_segments_in_memory(segments, segment_lengths) + self.assertEqual(w2s.tolist(), [-1, -1, 3, -1, 5]) + self.assertEqual(ta.tolist(), [True, True, False, True, False]) + self.assertEqual(ti.tolist(), [-1, -1, -1, -1, -1]) + self.assertEqual(hash_before, hash_based_on_dir(channel_pair)) + + # insert some segments with same length + channel_pair._segment_capacity = 192 + np.asarray([0, 16, 32, 64, 0, 16], dtype=np.uint32) + channel_pair._segment_hashes = np.asarray([1, 2, 3, 4, 5, 6], dtype=np.int64) + channel_pair._segment_references = np.asarray([1, 0, 1, 0, 1, 3], dtype=np.int32) + hash_before = hash_based_on_dir(channel_pair) + + w2s, ta, ti = channel_pair._find_place_for_segments_in_memory(segments, segment_lengths) + self.assertEqual(w2s.tolist(), [-1, -1, -1, -1, -1]) + self.assertEqual(ta.tolist(), [True, False, False, True, True]) + self.assertEqual(ti.tolist(), [-1, 1, 3, -1, -1]) + self.assertEqual(hash_before, hash_based_on_dir(channel_pair)) + + # insert some segments with smaller length + channel_pair._segment_capacity = 192 + np.asarray([0, 80, 32, 64, 96, 16], dtype=np.uint32) + channel_pair._segment_hashes = np.asarray([1, 2, 3, 4, 5, 6], dtype=np.int64) + channel_pair._segment_references = np.asarray([1, 0, 1, 1, 0, 3], dtype=np.int32) + hash_before = hash_based_on_dir(channel_pair) + + w2s, ta, ti = channel_pair._find_place_for_segments_in_memory(segments, segment_lengths) + self.assertEqual(w2s.tolist(), [-1, -1, -1, -1, -1]) + self.assertEqual(ta.tolist(), [True, True, False, False, True]) + self.assertEqual(ti.tolist(), [-1, -1, 4, 1, -1]) + self.assertEqual(hash_before, hash_based_on_dir(channel_pair)) + + # mix everything + segments = np.asarray([-5, -6, -7, -8, -9, -10, -11]) + segment_lengths = 192 + np.asarray([32, 16, 64, 32, 16, 0, 0]) + + channel_pair._segment_capacity = 192 + np.asarray([0, 80, 32, 64, 32, 16], dtype=np.uint32) + channel_pair._segment_hashes = np.asarray([1, 2, 3, 4, -8, 6], dtype=np.int64) + channel_pair._segment_references = np.asarray([1, 0, 1, 0, 1, 0], dtype=np.int32) + hash_before = hash_based_on_dir(channel_pair) + + w2s, ta, ti = channel_pair._find_place_for_segments_in_memory(segments, segment_lengths) + self.assertEqual(w2s.tolist(), [-1, -1, -1, 4, -1, -1, -1]) + self.assertEqual(ta.tolist(), [False, True, False, False, True, True, True]) + self.assertEqual(ti.tolist(), [1, -1, 3, -1, -1, -1, -1]) + self.assertEqual(hash_before, hash_based_on_dir(channel_pair)) + + def test_upload_segment(self): + channel_pair = self.TaborChannelPair(self.instrument, identifier='asd', channels=(1, 2)) + + self.reset_instrument_logs() + + channel_pair._segment_references = np.array([1, 2, 0, 1], dtype=np.uint32) + channel_pair._segment_capacity = 192 + np.array([0, 16, 32, 32], dtype=np.uint32) + channel_pair._segment_lengths = channel_pair._segment_capacity.copy() + + channel_pair._segment_hashes = np.array([1, 2, 3, 4], dtype=np.int64) + + # prevent entering and exiting configuration mode + channel_pair._configuration_guard_count = 2 + + segment = self.TaborSegment.from_sampled(np.ones(192+16, dtype=np.uint16), np.zeros(192+16, dtype=np.uint16), None, None) + segment_binary = segment.get_as_binary() + with self.assertRaises(ValueError): + channel_pair._upload_segment(3, segment) + + with self.assertRaises(ValueError): + channel_pair._upload_segment(0, segment) + + channel_pair._upload_segment(2, segment) + np.testing.assert_equal(channel_pair._segment_capacity, 192 + np.array([0, 16, 32, 32], dtype=np.uint32)) + np.testing.assert_equal(channel_pair._segment_lengths, 192 + np.array([0, 16, 16, 32], dtype=np.uint32)) + np.testing.assert_equal(channel_pair._segment_hashes, np.array([1, 2, hash(segment), 4], dtype=np.int64)) + + expected_commands = [':INST:SEL 1', ':INST:SEL 1', ':INST:SEL 1', + ':TRAC:DEF 3, 208', + ':TRAC:SEL 3', + ':TRAC:MODE COMB'] + expected_log = [((), dict(cmd_str=cmd, paranoia_level=channel_pair.internal_paranoia_level)) + for cmd in expected_commands] + self.assertAllCommandLogsEqual(expected_log) + + expected_send_binary_data_log = [(':TRAC:DATA', segment_binary, None)] + for device in self.instrument.all_devices: + np.testing.assert_equal(device._send_binary_data_calls, expected_send_binary_data_log) + + def test_amend_segments_flush(self): + channel_pair = self.TaborChannelPair(self.instrument, identifier='asd', channels=(1, 2)) + # prevent entering and exiting configuration mode + channel_pair._configuration_guard_count = 2 + + self.instrument.main_instrument.paranoia_level = 0 + self.instrument.main_instrument.logged_commands = [] + self.instrument.main_instrument.logged_queries = [] + self.instrument.main_instrument._send_binary_data_calls = [] + self.reset_instrument_logs() + + channel_pair._segment_references = np.array([1, 2, 0, 1], dtype=np.uint32) + channel_pair._segment_capacity = 192 + np.array([0, 16, 32, 32], dtype=np.uint32) + channel_pair._segment_lengths = 192 + np.array([0, 16, 16, 32], dtype=np.uint32) + + channel_pair._segment_hashes = np.array([1, 2, 3, 4], dtype=np.int64) + + data = np.ones(192, dtype=np.uint16) + segments = [self.TaborSegment.from_sampled(0*data, 1*data, None, None), + self.TaborSegment.from_sampled(1*data, 2*data, None, None)] + + channel_pair._amend_segments(segments) + + expected_references = np.array([1, 2, 0, 1, 1, 1], dtype=np.uint32) + expected_capacities = 192 + np.array([0, 16, 32, 32, 0, 0], dtype=np.uint32) + expected_lengths = 192 + np.array([0, 16, 16, 32, 0, 0], dtype=np.uint32) + expected_hashes = np.array([1, 2, 3, 4, hash(segments[0]), hash(segments[1])], dtype=np.int64) + + np.testing.assert_equal(channel_pair._segment_references, expected_references) + np.testing.assert_equal(channel_pair._segment_capacity, expected_capacities) + np.testing.assert_equal(channel_pair._segment_lengths, expected_lengths) + np.testing.assert_equal(channel_pair._segment_hashes, expected_hashes) + + expected_commands = [':INST:SEL 1', + ':TRAC:DEF 5,{}'.format(2 * 192 + 16), + ':TRAC:SEL 5', + ':TRAC:MODE COMB', + ':TRAC:DEF 3,208'] + expected_log = [((), dict(cmd_str=cmd, paranoia_level=channel_pair.internal_paranoia_level)) + for cmd in expected_commands] + self.assertAllCommandLogsEqual(expected_log) + #self.assertEqual(expected_log, instrument.main_instrument.logged_commands) + + expected_download_segment_calls = [(expected_capacities, ':SEGM:DATA', None)] + np.testing.assert_equal(self.instrument.main_instrument._download_segment_lengths_calls, expected_download_segment_calls) + + expected_bin_blob = self.make_combined_wave(segments) + expected_send_binary_data_log = [(':TRAC:DATA', expected_bin_blob, None)] + np.testing.assert_equal(self.instrument.main_instrument._send_binary_data_calls, expected_send_binary_data_log) + + def test_amend_segments_iter(self): + channel_pair = self.TaborChannelPair(self.instrument, identifier='asd', channels=(1, 2)) + # prevent entering and exiting configuration mode + channel_pair._configuration_guard_count = 2 + + self.instrument.paranoia_level = 0 + self.reset_instrument_logs() + + channel_pair._segment_references = np.array([1, 2, 0, 1], dtype=np.uint32) + channel_pair._segment_capacity = 192 + np.array([0, 16, 32, 32], dtype=np.uint32) + channel_pair._segment_lengths = 192 + np.array([0, 0, 16, 16], dtype=np.uint32) + + channel_pair._segment_hashes = np.array([1, 2, 3, 4], dtype=np.int64) + + data = np.ones(192, dtype=np.uint16) + segments = [self.TaborSegment.from_sampled(0*data, 1*data, None, None), + self.TaborSegment.from_sampled(1*data, 2*data, None, None)] + + indices = channel_pair._amend_segments(segments) + + expected_references = np.array([1, 2, 0, 1, 1, 1], dtype=np.uint32) + expected_capacities = 192 + np.array([0, 16, 32, 32, 0, 0], dtype=np.uint32) + expected_lengths = 192 + np.array([0, 0, 16, 16, 0, 0], dtype=np.uint32) + expected_hashes = np.array([1, 2, 3, 4, hash(segments[0]), hash(segments[1])], dtype=np.int64) + + np.testing.assert_equal(channel_pair._segment_references, expected_references) + np.testing.assert_equal(channel_pair._segment_capacity, expected_capacities) + np.testing.assert_equal(channel_pair._segment_lengths, expected_lengths) + np.testing.assert_equal(channel_pair._segment_hashes, expected_hashes) + + np.testing.assert_equal(indices, np.array([4, 5], dtype=np.int64)) + + expected_commands = [':INST:SEL 1', + ':TRAC:DEF 5,{}'.format(2 * 192 + 16), + ':TRAC:SEL 5', + ':TRAC:MODE COMB', + ':TRAC:DEF 5,192', + ':TRAC:DEF 6,192'] + expected_log = [((), dict(cmd_str=cmd, paranoia_level=channel_pair.internal_paranoia_level)) + for cmd in expected_commands] + self.assertAllCommandLogsEqual(expected_log) + + expected_download_segment_calls = [] + for device in self.instrument.all_devices: + self.assertEqual(device._download_segment_lengths_calls, expected_download_segment_calls) + + expected_bin_blob = self.make_combined_wave(segments) + expected_send_binary_data_log = [(':TRAC:DATA', expected_bin_blob, None)] + for device in self.instrument.all_devices: + np.testing.assert_equal(device._send_binary_data_calls, expected_send_binary_data_log) + + def test_cleanup(self): + channel_pair = self.TaborChannelPair(self.instrument, identifier='asd', channels=(1, 2)) + + self.instrument.paranoia_level = 0 + self.instrument.logged_commands = [] + self.instrument.logged_queries = [] + self.instrument._send_binary_data_calls = [] + + channel_pair._segment_references = np.array([1, 2, 0, 1], dtype=np.uint32) + channel_pair._segment_capacity = 192 + np.array([0, 16, 32, 32], dtype=np.uint32) + channel_pair._segment_lengths = 192 + np.array([0, 0, 16, 16], dtype=np.uint32) + channel_pair._segment_hashes = np.array([1, 2, 3, 4], dtype=np.int64) + + channel_pair.cleanup() + np.testing.assert_equal(channel_pair._segment_references, np.array([1, 2, 0, 1], dtype=np.uint32)) + np.testing.assert_equal(channel_pair._segment_capacity, 192 + np.array([0, 16, 32, 32], dtype=np.uint32)) + np.testing.assert_equal(channel_pair._segment_lengths, 192 + np.array([0, 0, 16, 16], dtype=np.uint32)) + np.testing.assert_equal(channel_pair._segment_hashes, np.array([1, 2, 3, 4], dtype=np.int64)) + + channel_pair._segment_references = np.array([1, 2, 0, 1, 0], dtype=np.uint32) + channel_pair._segment_capacity = 192 + np.array([0, 16, 32, 32, 32], dtype=np.uint32) + channel_pair._segment_lengths = 192 + np.array([0, 0, 16, 16, 0], dtype=np.uint32) + channel_pair._segment_hashes = np.array([1, 2, 3, 4, 5], dtype=np.int64) + + channel_pair.cleanup() + np.testing.assert_equal(channel_pair._segment_references, np.array([1, 2, 0, 1], dtype=np.uint32)) + np.testing.assert_equal(channel_pair._segment_capacity, 192 + np.array([0, 16, 32, 32], dtype=np.uint32)) + np.testing.assert_equal(channel_pair._segment_lengths, 192 + np.array([0, 0, 16, 16], dtype=np.uint32)) + np.testing.assert_equal(channel_pair._segment_hashes, np.array([1, 2, 3, 4], dtype=np.int64)) + + def test_remove(self): + channel_pair = self.TaborChannelPair(self.instrument, identifier='asd', channels=(1, 2)) + + calls = [] + + program_name = 'test' + def dummy_free_program(name): + self.assertIs(name, program_name) + calls.append('free_program') + + def dummy_cleanup(): + calls.append('cleanup') + + channel_pair.cleanup = dummy_cleanup + channel_pair.free_program = dummy_free_program + + channel_pair.remove(program_name) + self.assertEqual(calls, ['free_program', 'cleanup']) + + def test_change_armed_program_single_sequence(self): + channel_pair = self.TaborChannelPair(self.instrument, identifier='asd', channels=(1, 2)) + # prevent entering and exiting configuration mode + channel_pair._configuration_guard_count = 2 + + self.instrument.paranoia_level = 0 + self.instrument.logged_commands = [] + self.instrument.logged_queries = [] + self.reset_instrument_logs() + + advanced_sequencer_table = [(2, 1, 0)] + sequencer_tables = [[(3, 0, 0), (2, 1, 0), (1, 0, 0), (1, 2, 0), (1, 3, 0)]] + w2s = np.array([2, 5, 3, 1]) + + sequencer_tables = self.to_new_sequencer_tables(sequencer_tables) + advanced_sequencer_table = self.to_new_advanced_sequencer_table(advanced_sequencer_table) + + expected_sequencer_table = [(3, 3, 0), (2, 6, 0), (1, 3, 0), (1, 4, 0), (1, 2, 0)] + + program = DummyTaborProgramClass(advanced_sequencer_table=advanced_sequencer_table, + sequencer_tables=sequencer_tables, + waveform_mode=self.TaborSequencing.SINGLE)(None, None, None, None) + + channel_pair._known_programs['test'] = self.TaborProgramMemory(w2s, program) + + channel_pair.change_armed_program('test') + + expected_adv_seq_table_log = [([(1, 1, 1), (2, 2, 0), (1, 1, 0)], ':ASEQ:DATA', None)] + expected_sequencer_table_log = [((sequencer_table,), dict(pref=':SEQ:DATA', paranoia_level=None)) + for sequencer_table in [channel_pair._idle_sequence_table, + expected_sequencer_table]] + + for device in self.instrument.all_devices: + self.assertEqual(device._download_adv_seq_table_calls, expected_adv_seq_table_log) + self.assertEqual(device._download_sequencer_table_calls, expected_sequencer_table_log) + + def test_change_armed_program_single_waveform(self): + channel_pair = self.TaborChannelPair(self.instrument, identifier='asd', channels=(1, 2)) + # prevent entering and exiting configuration mode + channel_pair._configuration_guard_count = 2 + + self.instrument.paranoia_level = 0 + self.instrument.logged_commands = [] + self.instrument.logged_queries = [] + self.reset_instrument_logs() + + advanced_sequencer_table = [(1, 1, 0)] + sequencer_tables = [[(10, 0, 0)]] + w2s = np.array([4]) + + sequencer_tables = self.to_new_sequencer_tables(sequencer_tables) + advanced_sequencer_table = self.to_new_advanced_sequencer_table(advanced_sequencer_table) + + expected_sequencer_table = [(10, 5, 0), (1, 1, 0), (1, 1, 0)] + + program = DummyTaborProgramClass(advanced_sequencer_table=advanced_sequencer_table, + sequencer_tables=sequencer_tables, + waveform_mode=self.TaborSequencing.SINGLE)(None, None, None, None) + + channel_pair._known_programs['test'] = self.TaborProgramMemory(w2s, program) + + channel_pair.change_armed_program('test') + + expected_adv_seq_table_log = [([(1, 1, 1), (1, 2, 0), (1, 1, 0)], ':ASEQ:DATA', None)] + expected_sequencer_table_log = [((sequencer_table,), dict(pref=':SEQ:DATA', paranoia_level=None)) + for sequencer_table in [channel_pair._idle_sequence_table, + expected_sequencer_table]] + + for device in self.instrument.all_devices: + self.assertEqual(device._download_adv_seq_table_calls, expected_adv_seq_table_log) + self.assertEqual(device._download_sequencer_table_calls, expected_sequencer_table_log) + + def test_change_armed_program_advanced_sequence(self): + channel_pair = self.TaborChannelPair(self.instrument, identifier='asd', channels=(1, 2)) + # prevent entering and exiting configuration mode + channel_pair._configuration_guard_count = 2 + + self.instrument.paranoia_level = 0 + self.instrument.logged_commands = [] + self.instrument.logged_queries = [] + self.instrument._send_binary_data_calls = [] + + self.reset_instrument_logs() + + advanced_sequencer_table = [(2, 1, 0), (3, 2, 0)] + sequencer_tables = [[(3, 0, 0), (2, 1, 0), (1, 0, 0), (1, 2, 0), (1, 3, 0)], + [(4, 1, 0), (2, 1, 0), (1, 0, 0), (1, 2, 0), (1, 3, 0)]] + wf_idx2seg_idx = np.array([2, 5, 3, 1]) + + sequencer_tables = self.to_new_sequencer_tables(sequencer_tables) + advanced_sequencer_table = self.to_new_advanced_sequencer_table(advanced_sequencer_table) + + expected_sequencer_tables = [[(3, 3, 0), (2, 6, 0), (1, 3, 0), (1, 4, 0), (1, 2, 0)], + [(4, 6, 0), (2, 6, 0), (1, 3, 0), (1, 4, 0), (1, 2, 0)]] + + program = DummyTaborProgramClass(advanced_sequencer_table=advanced_sequencer_table, + sequencer_tables=sequencer_tables, + waveform_mode=self.TaborSequencing.ADVANCED)(None, None, None, None) + + channel_pair._known_programs['test'] = self.TaborProgramMemory(wf_idx2seg_idx, program) + + channel_pair.change_armed_program('test') + + expected_adv_seq_table_log = [([(1, 1, 1), (2, 2, 0), (3, 3, 0)], ':ASEQ:DATA', None)] + expected_sequencer_table_log = [((sequencer_table,), dict(pref=':SEQ:DATA', paranoia_level=None)) + for sequencer_table in [channel_pair._idle_sequence_table] + + expected_sequencer_tables] + + for device in self.instrument.all_devices: + self.assertEqual(device._download_adv_seq_table_calls, expected_adv_seq_table_log) + self.assertEqual(device._download_sequencer_table_calls, expected_sequencer_table_log) diff --git a/tests/hardware/tabor_new_driver_exex_test.py b/tests/hardware/tabor_new_driver_exex_test.py new file mode 100644 index 000000000..c4cf4489e --- /dev/null +++ b/tests/hardware/tabor_new_driver_exex_test.py @@ -0,0 +1,136 @@ +import unittest + + +with_alazar = True + +def get_pulse(): + from qupulse.pulses import TablePulseTemplate as TPT, SequencePulseTemplate as SPT, RepetitionPulseTemplate as RPT + + ramp = TPT(identifier='ramp', channels={'out', 'trigger'}) + ramp.add_entry(0, 'start', channel='out') + ramp.add_entry('duration', 'stop', 'linear', channel='out') + + ramp.add_entry(0, 1, channel='trigger') + ramp.add_entry('duration', 1, 'hold', channel='trigger') + + ramp.add_measurement_declaration('meas', 0, 'duration') + + base = SPT([(ramp, dict(start='min', stop='max', duration='tau/3'), dict(meas='A')), + (ramp, dict(start='max', stop='max', duration='tau/3'), dict(meas='B')), + (ramp, dict(start='max', stop='min', duration='tau/3'), dict(meas='C'))], {'min', 'max', 'tau'}) + + repeated = RPT(base, 'n') + + root = SPT([repeated, repeated, repeated], {'min', 'max', 'tau', 'n'}) + + return root + + +def get_alazar_config(): + from atsaverage import alazar + from atsaverage.config import ScanlineConfiguration, CaptureClockConfiguration, EngineTriggerConfiguration,\ + TRIGInputConfiguration, InputConfiguration + + trig_level = int((5 + 0.4) / 10. * 255) + assert 0 <= trig_level < 256 + + config = ScanlineConfiguration() + config.triggerInputConfiguration = TRIGInputConfiguration(triggerRange=alazar.TriggerRangeID.etr_5V) + config.triggerConfiguration = EngineTriggerConfiguration(triggerOperation=alazar.TriggerOperation.J, + triggerEngine1=alazar.TriggerEngine.J, + triggerSource1=alazar.TriggerSource.external, + triggerSlope1=alazar.TriggerSlope.positive, + triggerLevel1=trig_level, + triggerEngine2=alazar.TriggerEngine.K, + triggerSource2=alazar.TriggerSource.disable, + triggerSlope2=alazar.TriggerSlope.positive, + triggerLevel2=trig_level) + config.captureClockConfiguration = CaptureClockConfiguration(source=alazar.CaptureClockType.internal_clock, + samplerate=alazar.SampleRateID.rate_100MSPS) + config.inputConfiguration = 4*[InputConfiguration(input_range=alazar.InputRangeID.range_1_V)] + config.totalRecordSize = 0 + + assert config.totalRecordSize == 0 + + return config + +def get_operations(): + from atsaverage.operations import Downsample + + return [Downsample(identifier='DS_A', maskID='A'), + Downsample(identifier='DS_B', maskID='B'), + Downsample(identifier='DS_C', maskID='C'), + Downsample(identifier='DS_D', maskID='D')] + +def get_window(card): + from atsaverage.gui import ThreadedStatusWindow + window = ThreadedStatusWindow(card) + window.start() + return window + + +class TaborTests(unittest.TestCase): + @unittest.skip + def test_all(self): + from qupulse.hardware.feature_awg.tabor import TaborChannelTuple, TaborDevice + #import warnings + tawg = TaborDevice(r'USB0::0x168C::0x2184::0000216488::INSTR') + tchannelpair = TaborChannelTuple(tawg, (1, 2), 'TABOR_AB') + tawg.paranoia_level = 2 + + #warnings.simplefilter('error', Warning) + + from qupulse.hardware.setup import HardwareSetup, PlaybackChannel, MarkerChannel + hardware_setup = HardwareSetup() + + hardware_setup.set_channel('TABOR_A', PlaybackChannel(tchannelpair, 0)) + hardware_setup.set_channel('TABOR_B', PlaybackChannel(tchannelpair, 1)) + hardware_setup.set_channel('TABOR_A_MARKER', MarkerChannel(tchannelpair, 0)) + hardware_setup.set_channel('TABOR_B_MARKER', MarkerChannel(tchannelpair, 1)) + + if with_alazar: + from qupulse.hardware.dacs.alazar import AlazarCard + import atsaverage.server + + if not atsaverage.server.Server.default_instance.running: + atsaverage.server.Server.default_instance.start(key=b'guest') + + import atsaverage.core + + alazar = AlazarCard(atsaverage.core.getLocalCard(1, 1)) + alazar.register_mask_for_channel('A', 0) + alazar.register_mask_for_channel('B', 0) + alazar.register_mask_for_channel('C', 0) + alazar.config = get_alazar_config() + + alazar.register_operations('test', get_operations()) + window = get_window(atsaverage.core.getLocalCard(1, 1)) + hardware_setup.register_dac(alazar) + + repeated = get_pulse() + + from qupulse.pulses.sequencing import Sequencer + + sequencer = Sequencer() + sequencer.push(repeated, + parameters=dict(n=1000, min=-0.5, max=0.5, tau=192*3), + channel_mapping={'out': 'TABOR_A', 'trigger': 'TABOR_A_MARKER'}, + window_mapping=dict(A='A', B='B', C='C')) + instruction_block = sequencer.build() + + hardware_setup.register_program('test', instruction_block) + + if with_alazar: + from atsaverage.masks import PeriodicMask + m = PeriodicMask() + m.identifier = 'D' + m.begin = 0 + m.end = 1 + m.period = 1 + m.channel = 0 + alazar._registered_programs['test'].masks.append(m) + + hardware_setup.arm_program('test') + + d = 1 + diff --git a/tests/hardware/tabor_new_driver_simulator_based_tests.py b/tests/hardware/tabor_new_driver_simulator_based_tests.py new file mode 100644 index 000000000..ecbd9b445 --- /dev/null +++ b/tests/hardware/tabor_new_driver_simulator_based_tests.py @@ -0,0 +1,290 @@ +import unittest +import subprocess +import time +import platform +import os +from typing import List, Tuple, Optional, Any + +import pytabor +import numpy as np + +from qupulse._program.tabor import TableDescription, TableEntry +from qupulse.hardware.feature_awg.features import DeviceControl, VoltageRange, ProgramManagement, SCPI, \ + VolatileParameters, RepetitionMode +from qupulse.hardware.feature_awg.tabor import TaborDevice, TaborSegment, TaborProgramMemory +from qupulse.utils.types import TimeType + + +class TaborSimulatorManager: + def __init__(self, + simulator_executable='WX2184C.exe', + simulator_path=os.path.realpath(os.path.dirname(__file__))): + self.simulator_executable = simulator_executable + self.simulator_path = simulator_path + + self.started_simulator = False + + self.simulator_process = None + self.instrument: TaborDevice = None + + def kill_running_simulators(self): + command = 'Taskkill', '/IM {simulator_executable}'.format(simulator_executable=self.simulator_executable) + try: + subprocess.run([command], + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + except FileNotFoundError: + pass + + @property + def simulator_full_path(self): + return os.path.join(self.simulator_path, self.simulator_executable) + + def start_simulator(self, try_connecting_to_existing_simulator=True, max_wait_time=30): + if try_connecting_to_existing_simulator: + if pytabor.open_session('127.0.0.1') is not None: + return + + if not os.path.isfile(self.simulator_full_path): + raise RuntimeError('Cannot locate simulator executable.') + + self.kill_running_simulators() + + self.simulator_process = subprocess.Popen([self.simulator_full_path, '/switch-on', '/gui-in-tray']) + + start = time.time() + while pytabor.open_session('127.0.0.1') is None: + if self.simulator_process.returncode: + raise RuntimeError('Simulator exited with return code {}'.format(self.simulator_process.returncode)) + if time.time() - start > max_wait_time: + raise RuntimeError('Could not connect to simulator') + time.sleep(0.1) + + def connect(self) -> TaborDevice: + self.instrument = TaborDevice("testDevice", + "127.0.0.1", + reset=True, + paranoia_level=2) + + if self.instrument.main_instrument.visa_inst is None: + raise RuntimeError('Could not connect to simulator') + return self.instrument + + def disconnect(self): + for device in self.instrument.all_devices: + device.close() + self.instrument = None + + def __del__(self): + if self.started_simulator and self.simulator_process: + self.simulator_process.kill() + + +@unittest.skipIf(platform.system() != 'Windows', "Simulator currently only available on Windows :(") +class TaborSimulatorBasedTest(unittest.TestCase): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + self.instrument: TaborDevice = None + + @classmethod + def setUpClass(cls): + cls.simulator_manager = TaborSimulatorManager('WX2184C.exe', os.path.dirname(__file__)) + try: + cls.simulator_manager.start_simulator() + except RuntimeError as err: + raise unittest.SkipTest(*err.args) from err + + @classmethod + def tearDownClass(cls): + del cls.simulator_manager + + def setUp(self): + self.instrument = self.simulator_manager.connect() + + def tearDown(self): + self.instrument[DeviceControl].reset() + self.simulator_manager.disconnect() + + @staticmethod + def to_new_sequencer_tables(sequencer_tables: List[List[Tuple[int, int, int]]] + ) -> List[List[Tuple[TableDescription, Optional[Any]]]]: + return [[(TableDescription(*entry), None) for entry in sequencer_table] + for sequencer_table in sequencer_tables] + + @staticmethod + def to_new_advanced_sequencer_table(advanced_sequencer_table: List[Tuple[int, int, int]]) -> List[TableDescription]: + return [TableDescription(*entry) for entry in advanced_sequencer_table] + + +class TaborAWGRepresentationTests(TaborSimulatorBasedTest): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + def test_sample_rate(self): + for ch_tuple in self.instrument.channel_tuples: + self.assertIsInstance(ch_tuple.sample_rate, TimeType) + + self.instrument[SCPI].send_cmd(':INST:SEL 1') + self.instrument[SCPI].send_cmd(':FREQ:RAST 2.3e9') + + self.assertEqual(2300000000, self.instrument.channel_tuples[0].sample_rate) + + def test_amplitude(self): + for channel in self.instrument.channels: + self.assertIsInstance(channel[VoltageRange].amplitude, float) + + self.instrument[SCPI].send_cmd(':INST:SEL 1; :OUTP:COUP DC') + self.instrument[SCPI].send_cmd(':VOLT 0.7') + + self.assertAlmostEqual(.7, self.instrument.channels[0][VoltageRange].amplitude) + + def test_select_marker(self): + with self.assertRaises(IndexError): + self.instrument.marker_channels[6]._select() + + self.instrument.marker_channels[1]._select() + selected = self.instrument[SCPI].send_query(':SOUR:MARK:SEL?') + self.assertEqual(selected, '2') + + self.instrument.marker_channels[0]._select() + selected = self.instrument[SCPI].send_query(':SOUR:MARK:SEL?') + self.assertEqual(selected, '1') + + def test_select_channel(self): + with self.assertRaises(IndexError): + self.instrument.channels[6]._select() + + self.instrument.channels[0]._select() + self.assertEqual(self.instrument[SCPI].send_query(':INST:SEL?'), '1') + + self.instrument.channels[3]._select() + self.assertEqual(self.instrument[SCPI].send_query(':INST:SEL?'), '4') + + +class TaborMemoryReadTests(TaborSimulatorBasedTest): + def setUp(self): + super().setUp() + + ramp_up = np.linspace(0, 2 ** 14 - 1, num=192, dtype=np.uint16) + ramp_down = ramp_up[::-1] + zero = np.ones(192, dtype=np.uint16) * 2 ** 13 + sine = ((np.sin(np.linspace(0, 2 * np.pi, 192 + 64)) + 1) / 2 * (2 ** 14 - 1)).astype(np.uint16) + + self.segments = [TaborSegment.from_sampled(ramp_up, ramp_up, None, None), + TaborSegment.from_sampled(ramp_down, zero, None, None), + TaborSegment.from_sampled(sine, sine, None, None)] + + self.zero_segment = TaborSegment.from_sampled(zero, zero, None, None) + + # program 1 + self.sequence_tables_raw = [[(10, 0, 0), (10, 1, 0), (10, 0, 0), (10, 1, 0)], + [(1, 0, 0), (1, 1, 0), (1, 0, 0), (1, 1, 0)]] + self.advanced_sequence_table = [(1, 1, 0), (1, 2, 0)] + + self.sequence_tables = self.to_new_sequencer_tables(self.sequence_tables_raw) + self.advanced_sequence_table = self.to_new_advanced_sequencer_table(self.advanced_sequence_table) + + self.channel_pair = self.instrument.channel_tuples[0] + + def arm_program(self, sequencer_tables, advanced_sequencer_table, mode, waveform_to_segment_index): + class DummyProgram: + def __init__(self): + self._repetition_mode = RepetitionMode.INFINITE + + def get_sequencer_tables(self): + return sequencer_tables + + def get_advanced_sequencer_table(self): + return advanced_sequencer_table + + def update_volatile_parameters(self, parameters): + modifications = {1: TableEntry(repetition_count=5, element_number=2, jump_flag=0), + (0, 1): TableDescription(repetition_count=50, element_id=1, jump_flag=0)} + return modifications + + markers = (None, None) + channels = (1, 2) + + waveform_mode = mode + + # TODO: QUESTION: is this change okay? + self.channel_pair._known_programs['dummy_program'] = TaborProgramMemory(waveform_to_segment_index, + DummyProgram()) + self.channel_pair[ProgramManagement]._change_armed_program('dummy_program') + + def test_read_waveforms(self): + self.channel_pair._amend_segments(self.segments) + + waveforms = self.channel_pair.read_waveforms() + + segments = [TaborSegment.from_binary_segment(waveform) + for waveform in waveforms] + + expected = [self.zero_segment, *self.segments] + + for ex, r in zip(expected, segments): + ex1, ex2 = ex.data_a, ex.data_b + r1, r2 = r.data_a, r.data_b + np.testing.assert_equal(ex1, r1) + np.testing.assert_equal(ex2, r2) + + self.assertEqual(expected, segments) + + def test_read_sequence_tables(self): + self.channel_pair._amend_segments(self.segments) + self.arm_program(self.sequence_tables, self.advanced_sequence_table, None, np.asarray([1, 2])) + + sequence_tables = self.channel_pair.read_sequence_tables() + + actual_sequence_tables = [self.channel_pair[ProgramManagement]._idle_sequence_table] + [[(rep, index + 2, jump) + for rep, index, jump in + table] + for table in + self.sequence_tables_raw] + + expected = list(tuple(np.asarray(d) + for d in zip(*table)) + for table in actual_sequence_tables) + + np.testing.assert_equal(sequence_tables, expected) + + def test_read_advanced_sequencer_table(self): + self.channel_pair._amend_segments(self.segments) + self.arm_program(self.sequence_tables, self.advanced_sequence_table, None, np.asarray([1, 2])) + + actual_advanced_table = [(1, 1, 0)] + [(rep, idx + 1, jmp) for rep, idx, jmp in self.advanced_sequence_table] + + expected = list(np.asarray(d) + for d in zip(*actual_advanced_table)) + + advanced_table = self.channel_pair.read_advanced_sequencer_table() + np.testing.assert_equal(advanced_table, expected) + + def test_set_volatile_parameter(self): + self.channel_pair._amend_segments(self.segments) + self.arm_program(self.sequence_tables, self.advanced_sequence_table, None, np.asarray([1, 2])) + + para = {'a': 5} + actual_sequence_tables = [self.channel_pair[ProgramManagement]._idle_sequence_table] + [[(rep, index + 2, jump) + for rep, index, jump in + table] + for table in + self.sequence_tables_raw] + + actual_advanced_table = [(1, 1, 0)] + [(rep, idx + 1, jmp) for rep, idx, jmp in self.advanced_sequence_table] + + self.channel_pair[VolatileParameters].set_volatile_parameters('dummy_program', parameters=para) + + actual_sequence_tables[1][1] = (50, 3, 0) + actual_advanced_table[2] = (5, 3, 0) + + sequence_table = self.channel_pair.read_sequence_tables() + expected = list(tuple(np.asarray(d) + for d in zip(*table)) + for table in actual_sequence_tables) + np.testing.assert_equal(sequence_table, expected) + + advanced_table = self.channel_pair.read_advanced_sequencer_table() + expected = list(np.asarray(d) + for d in zip(*actual_advanced_table)) + np.testing.assert_equal(advanced_table, expected) diff --git a/tests/hardware/tabor_new_driver_tests.py b/tests/hardware/tabor_new_driver_tests.py new file mode 100644 index 000000000..85421b042 --- /dev/null +++ b/tests/hardware/tabor_new_driver_tests.py @@ -0,0 +1,43 @@ +import unittest + +from qupulse.hardware.feature_awg.tabor import with_configuration_guard + + +class ConfigurationGuardTest(unittest.TestCase): + class DummyChannelPair: + def __init__(self, test_obj: unittest.TestCase): + self.test_obj = test_obj + self._configuration_guard_count = 0 + self.is_in_config_mode = False + + def _enter_config_mode(self): + self.test_obj.assertFalse(self.is_in_config_mode) + self.test_obj.assertEqual(self._configuration_guard_count, 0) + self.is_in_config_mode = True + + def _exit_config_mode(self): + self.test_obj.assertTrue(self.is_in_config_mode) + self.test_obj.assertEqual(self._configuration_guard_count, 0) + self.is_in_config_mode = False + + @with_configuration_guard + def guarded_method(self, counter=5, throw=False): + self.test_obj.assertTrue(self.is_in_config_mode) + if counter > 0: + return self.guarded_method(counter - 1, throw) + 1 + if throw: + raise RuntimeError() + return 0 + + def test_config_guard(self): + channel_pair = ConfigurationGuardTest.DummyChannelPair(self) + + for i in range(5): + self.assertEqual(channel_pair.guarded_method(i), i) + + with self.assertRaises(RuntimeError): + channel_pair.guarded_method(1, True) + + self.assertFalse(channel_pair.is_in_config_mode) + +