From 18de6a2aac31f09206846907aaf5d6e6bf86d43d Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 31 Oct 2024 16:37:57 +0000 Subject: [PATCH] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/examples/buffered_dummy_example.py | 8 +- src/qumada/instrument/buffers/buffer.py | 29 +- .../instrument/custom_drivers/QDevil/QDAC2.py | 1160 +++++++++-------- .../instrument/custom_drivers/ZI/MFLI.py | 18 +- src/qumada/instrument/mapping/QDevil/qdac2.py | 10 +- src/qumada/measurement/device_object.py | 84 +- src/qumada/measurement/measurement.py | 15 +- src/qumada/measurement/shuttling_scripts.py | 6 +- src/tests/instrument_test.py | 2 +- 9 files changed, 690 insertions(+), 642 deletions(-) diff --git a/src/examples/buffered_dummy_example.py b/src/examples/buffered_dummy_example.py index 3cfc2d6..2b561a6 100644 --- a/src/examples/buffered_dummy_example.py +++ b/src/examples/buffered_dummy_example.py @@ -40,7 +40,11 @@ from qcodes.station import Station from qumada.instrument.buffered_instruments import BufferedDummyDMM as DummyDmm -from qumada.instrument.buffers.buffer import map_triggers, save_trigger_mapping, load_trigger_mapping +from qumada.instrument.buffers.buffer import ( + load_trigger_mapping, + map_triggers, + save_trigger_mapping, +) from qumada.instrument.custom_drivers.Dummies.dummy_dac import DummyDac from qumada.instrument.mapping import ( DUMMY_DMM_MAPPING, @@ -114,7 +118,7 @@ ) map_terminals_gui(station.components, script.gate_parameters) -map_triggers(station.components, path = r"C:\Users\till3\Documents\PythonScripts\tm.json") +map_triggers(station.components, path=r"C:\Users\till3\Documents\PythonScripts\tm.json") # %% Run measurement script.run() diff --git a/src/qumada/instrument/buffers/buffer.py b/src/qumada/instrument/buffers/buffer.py index 0b72fc0..6c10583 100644 --- a/src/qumada/instrument/buffers/buffer.py +++ b/src/qumada/instrument/buffers/buffer.py @@ -20,6 +20,7 @@ from __future__ import annotations +import logging from abc import ABC, abstractmethod from collections.abc import Mapping from typing import Any @@ -28,7 +29,6 @@ from qcodes.metadatable import Metadatable from qcodes.parameters import Parameter -import logging logger = logging.getLogger(__name__) import json @@ -78,7 +78,7 @@ def map_buffers( print("Available trigger inputs:") print("[0]: None") for idx, trigger in enumerate(buffer.AVAILABLE_TRIGGERS, 1): - print(f"[{idx}]: {trigger}") + print(f"[{idx}]: {trigger}") chosen = int(input(f"Choose the trigger input for {instrument.name}: ")) if chosen == 0: trigger = None @@ -120,12 +120,12 @@ def _map_triggers( def map_triggers( components: Mapping[Any, Metadatable], skip_mapped=True, - path: None|str=None, + path: None | str = None, **kwargs, ) -> None: """ - Maps the triggers of triggerable or bufferable components. - Ignores already mapped triggers by default. + Maps the triggers of triggerable or bufferable components. + Ignores already mapped triggers by default. Parameters ---------- @@ -136,7 +136,7 @@ def map_triggers( buffered instruments. TODO: Remove! gate_parameters : Mapping[Any, Mapping[Any, Parameter] | Parameter] - Parameters of measurement script/device. Currently only required for + Parameters of measurement script/device. Currently only required for buffered instruments. TODO: Remove! skip_mapped : Bool, optional @@ -158,20 +158,16 @@ def map_triggers( skip_mapped, **kwargs, ) - _map_triggers( - components, - skip_mapped, - **kwargs - ) - + _map_triggers(components, skip_mapped, **kwargs) + + def save_trigger_mapping(components: Mapping[Any, Metadatable], path: str): """ Saves mapped triggers from components to json file. Components should be station.components, path is the path to the file. """ trigger_dict = {} - triggered_instruments = filter( - lambda x: any((is_triggerable(x), is_bufferable(x))), components.values()) + triggered_instruments = filter(lambda x: any((is_triggerable(x), is_bufferable(x))), components.values()) for instrument in triggered_instruments: try: trigger_dict[instrument.full_name] = instrument._qumada_mapping.trigger_in @@ -180,6 +176,7 @@ def save_trigger_mapping(components: Mapping[Any, Metadatable], path: str): with open(path, mode="w") as file: json.dump(trigger_dict, file) + def load_trigger_mapping(components: Mapping[Any, Metadatable], path: str): """ Loads json file with trigger mappings and tries to apply them to instruments @@ -195,8 +192,8 @@ def load_trigger_mapping(components: Mapping[Any, Metadatable], path: str): instrument._qumada_buffer.trigger = trigger elif is_triggerable(instrument) is True: instrument._qumada_mapping.trigger_in = trigger - - + + class Buffer(ABC): """Base class for a general buffer interface for an instrument.""" diff --git a/src/qumada/instrument/custom_drivers/QDevil/QDAC2.py b/src/qumada/instrument/custom_drivers/QDevil/QDAC2.py index dc2ad65..3afe53f 100644 --- a/src/qumada/instrument/custom_drivers/QDevil/QDAC2.py +++ b/src/qumada/instrument/custom_drivers/QDevil/QDAC2.py @@ -1,14 +1,16 @@ -import numpy as np +import abc import itertools import uuid +from collections.abc import Sequence from time import sleep as sleep_s -from qcodes.instrument.channel import InstrumentChannel, ChannelList -from qcodes.instrument.visa import VisaInstrument +from typing import Dict, List, NewType, Optional, Tuple + +import numpy as np +from packaging.version import Version, parse from pyvisa.errors import VisaIOError +from qcodes.instrument.channel import ChannelList, InstrumentChannel +from qcodes.instrument.visa import VisaInstrument from qcodes.utils import validators -from typing import NewType, Tuple, Sequence, List, Dict, Optional -from packaging.version import Version, parse -import abc # Version 1.2.0 # @@ -45,48 +47,45 @@ # - Detect and handle mixing of internal and external triggers (_trigger). # -error_ambiguous_wave = 'Only one of frequency_Hz or period_s can be ' \ - 'specified for a wave form' +error_ambiguous_wave = "Only one of frequency_Hz or period_s can be " "specified for a wave form" def ints_to_comma_separated_list(array: Sequence[int]) -> str: - return ','.join([str(x) for x in array]) + return ",".join([str(x) for x in array]) def floats_to_comma_separated_list(array: Sequence[float]) -> str: - rounded = [format(x, 'g') for x in array] - return ','.join(rounded) + rounded = [format(x, "g") for x in array] + return ",".join(rounded) def comma_sequence_to_list(sequence: str) -> Sequence[str]: if not sequence: return [] - return [x.strip() for x in sequence.split(',')] + return [x.strip() for x in sequence.split(",")] def comma_sequence_to_list_of_floats(sequence: str) -> Sequence[float]: if not sequence: return [] - return [float(x.strip()) for x in sequence.split(',')] + return [float(x.strip()) for x in sequence.split(",")] -def diff_matrix(initial: Sequence[float], - measurements: Sequence[Sequence[float]]) -> np.ndarray: - """Subtract an array of measurements by an initial measurement - """ +def diff_matrix(initial: Sequence[float], measurements: Sequence[Sequence[float]]) -> np.ndarray: + """Subtract an array of measurements by an initial measurement""" matrix = np.asarray(measurements) return matrix - np.asarray(list(itertools.repeat(initial, matrix.shape[1]))) -def split_version_string_into_components(version: str) -> List[str]: - return version.split('-') +def split_version_string_into_components(version: str) -> list[str]: + return version.split("-") """External input trigger There are four 3V3 non-isolated triggers on the back (1, 2, 3, 4). """ -ExternalInput = NewType('ExternalInput', int) +ExternalInput = NewType("ExternalInput", int) class QDac2Trigger_Context: @@ -96,7 +95,7 @@ class QDac2Trigger_Context: that the trigger can be automatically reclaimed when the context exits. """ - def __init__(self, parent: 'QDac2', value: int): + def __init__(self, parent: "QDac2", value: int): self._parent = parent self._value = value @@ -125,64 +124,58 @@ class QDac2ExternalTrigger(InstrumentChannel): non-isolated 3V3 on the back (4, 5). """ - def __init__(self, parent: 'QDac2', name: str, external: int): + def __init__(self, parent: "QDac2", name: str, external: int): super().__init__(parent, name) - self.add_function( - name='source_from_bus', - call_cmd=f'outp:trig{external}:sour bus' - ) + self.add_function(name="source_from_bus", call_cmd=f"outp:trig{external}:sour bus") self.add_parameter( - name='source_from_input', + name="source_from_input", # Route external input to external output - set_cmd='outp:trig{0}:sour ext{1}'.format(external, '{}'), - get_parser=int + set_cmd="outp:trig{}:sour ext{}".format(external, "{}"), + get_parser=int, ) self.add_parameter( - name='source_from_trigger', + name="source_from_trigger", # Route internal trigger to external output set_parser=_trigger_context_to_value, - set_cmd='outp:trig{0}:sour int{1}'.format(external, '{}'), - get_parser=int + set_cmd="outp:trig{}:sour int{}".format(external, "{}"), + get_parser=int, ) self.add_parameter( - name='width_s', - label='width', - unit='s', - set_cmd='outp:trig{0}:widt {1}'.format(external, '{}'), - get_cmd=f'outp:trig{external}:widt?', - get_parser=float + name="width_s", + label="width", + unit="s", + set_cmd="outp:trig{}:widt {}".format(external, "{}"), + get_cmd=f"outp:trig{external}:widt?", + get_parser=float, ) self.add_parameter( - name='polarity', - label='polarity', - set_cmd='outp:trig{0}:pol {1}'.format(external, '{}'), - get_cmd=f'outp:trig{external}:pol?', + name="polarity", + label="polarity", + set_cmd="outp:trig{}:pol {}".format(external, "{}"), + get_cmd=f"outp:trig{external}:pol?", get_parser=str, - vals=validators.Enum('inv', 'norm') + vals=validators.Enum("inv", "norm"), ) self.add_parameter( - name='delay_s', - label='delay', - unit='s', - set_cmd='outp:trig{0}:del {1}'.format(external, '{}'), - get_cmd=f'outp:trig{external}:del?', - get_parser=float - ) - self.add_function( - name='signal', - call_cmd=f'outp:trig{external}:sign' + name="delay_s", + label="delay", + unit="s", + set_cmd="outp:trig{}:del {}".format(external, "{}"), + get_cmd=f"outp:trig{external}:del?", + get_parser=float, ) + self.add_function(name="signal", call_cmd=f"outp:trig{external}:sign") self.add_parameter( - name='set_trigger', + name="set_trigger", # Route internal trigger to external output - #set_parser=_trigger_context_to_value, - set_cmd='outp:trig{0}:sour {1}'.format(external, "{}"), + # set_parser=_trigger_context_to_value, + set_cmd="outp:trig{}:sour {}".format(external, "{}"), ) class _Channel_Context(metaclass=abc.ABCMeta): - def __init__(self, channel: 'QDac2Channel'): + def __init__(self, channel: "QDac2Channel"): self._channel = channel def __enter__(self): @@ -227,9 +220,9 @@ def _channel_message(self, template: str) -> None: class _Dc_Context(_Channel_Context): - def __init__(self, channel: 'QDac2Channel'): + def __init__(self, channel: "QDac2Channel"): super().__init__(channel) - self._write_channel('sour{0}:dc:trig:sour hold') + self._write_channel("sour{0}:dc:trig:sour hold") self._trigger: Optional[QDac2Trigger_Context] = None self._marker_start: Optional[QDac2Trigger_Context] = None self._marker_end: Optional[QDac2Trigger_Context] = None @@ -258,9 +251,8 @@ def start_on_external(self, trigger: ExternalInput) -> None: self._make_ready_to_start() def abort(self) -> None: - """Abort any DC running generator on the channel - """ - self._write_channel('sour{0}:dc:abor') + """Abort any DC running generator on the channel""" + self._write_channel("sour{0}:dc:abor") def end_marker(self) -> QDac2Trigger_Context: """Internal trigger that will mark the end of the DC generator @@ -318,33 +310,42 @@ def _set_delay(self, delay_s: float) -> None: self._write_channel(f'sour{"{0}"}:dc:del {delay_s}') def _set_triggering(self) -> None: - self._write_channel('sour{0}:dc:trig:sour bus') + self._write_channel("sour{0}:dc:trig:sour bus") self._make_ready_to_start() def _start(self, description: str) -> None: if self._trigger: self._make_ready_to_start() - return self._write_channel(f'tint {self._trigger.value}') + return self._write_channel(f"tint {self._trigger.value}") self._switch_to_immediate_trigger() - self._write_channel('sour{0}:dc:init') + self._write_channel("sour{0}:dc:init") def _make_ready_to_start(self) -> None: - self._write_channel('sour{0}:dc:init:cont on') - self._write_channel('sour{0}:dc:init') + self._write_channel("sour{0}:dc:init:cont on") + self._write_channel("sour{0}:dc:init") def _switch_to_immediate_trigger(self) -> None: - self._write_channel('sour{0}:dc:init:cont off') - self._write_channel('sour{0}:dc:trig:sour imm') + self._write_channel("sour{0}:dc:init:cont off") + self._write_channel("sour{0}:dc:trig:sour imm") class Sweep_Context(_Dc_Context): - def __init__(self, channel: 'QDac2Channel', start_V: float, stop_V: float, - points: int, repetitions: int, dwell_s: float, delay_s: float, - backwards: bool, stepped: bool): + def __init__( + self, + channel: "QDac2Channel", + start_V: float, + stop_V: float, + points: int, + repetitions: int, + dwell_s: float, + delay_s: float, + backwards: bool, + stepped: bool, + ): self._repetitions = repetitions super().__init__(channel) - channel.write_channel('sour{0}:volt:mode swe') + channel.write_channel("sour{0}:volt:mode swe") self._set_voltages(start_V, stop_V) channel.write_channel(f'sour{"{0}"}:swe:poin {points}') self._set_trigger_mode(stepped) @@ -360,13 +361,13 @@ def _set_voltages(self, start_V: float, stop_V: float): def _set_trigger_mode(self, stepped: bool) -> None: if stepped: - return self._write_channel('sour{0}:swe:gen step') - self._write_channel('sour{0}:swe:gen auto') + return self._write_channel("sour{0}:swe:gen step") + self._write_channel("sour{0}:swe:gen auto") def _set_direction(self, backwards: bool) -> None: if backwards: - return self._write_channel('sour{0}:swe:dir down') - self._write_channel('sour{0}:swe:dir up') + return self._write_channel("sour{0}:swe:dir down") + self._write_channel("sour{0}:swe:dir up") def _set_repetitions(self) -> None: self._write_channel(f'sour{"{0}"}:swe:coun {self._repetitions}') @@ -375,44 +376,43 @@ def _perpetual(self) -> bool: return self._repetitions < 0 def start(self) -> None: - """Start the DC sweep - """ - self._start('DC sweep') + """Start the DC sweep""" + self._start("DC sweep") def points(self) -> int: """ Returns: int: Number of steps in the DC sweep """ - return int(self._ask_channel('sour{0}:swe:poin?')) + return int(self._ask_channel("sour{0}:swe:poin?")) def cycles_remaining(self) -> int: """ Returns: int: Number of cycles remaining in the DC sweep """ - return int(self._ask_channel('sour{0}:swe:ncl?')) + return int(self._ask_channel("sour{0}:swe:ncl?")) def time_s(self) -> float: """ Returns: float: Seconds that it will take to do the sweep """ - return float(self._ask_channel('sour{0}:swe:time?')) + return float(self._ask_channel("sour{0}:swe:time?")) def start_V(self) -> float: """ Returns: float: Starting voltage """ - return float(self._ask_channel('sour{0}:swe:star?')) + return float(self._ask_channel("sour{0}:swe:star?")) def stop_V(self) -> float: """ Returns: float: Ending voltage """ - return float(self._ask_channel('sour{0}:swe:stop?')) + return float(self._ask_channel("sour{0}:swe:stop?")) def values_V(self) -> Sequence[float]: """ @@ -424,12 +424,19 @@ def values_V(self) -> Sequence[float]: class List_Context(_Dc_Context): - def __init__(self, channel: 'QDac2Channel', voltages: Sequence[float], - repetitions: int, dwell_s: float, delay_s: float, - backwards: bool, stepped: bool): + def __init__( + self, + channel: "QDac2Channel", + voltages: Sequence[float], + repetitions: int, + dwell_s: float, + delay_s: float, + backwards: bool, + stepped: bool, + ): super().__init__(channel) self._repetitions = repetitions - self._write_channel('sour{0}:volt:mode list') + self._write_channel("sour{0}:volt:mode list") self._set_voltages(voltages) self._set_trigger_mode(stepped) self._write_channel(f'sour{"{0}"}:list:dwel {dwell_s}') @@ -439,17 +446,17 @@ def __init__(self, channel: 'QDac2Channel', voltages: Sequence[float], self._set_triggering() def _set_voltages(self, voltages: Sequence[float]) -> None: - self._write_channel_floats('sour{0}:list:volt ', voltages) + self._write_channel_floats("sour{0}:list:volt ", voltages) def _set_trigger_mode(self, stepped: bool) -> None: if stepped: - return self._write_channel('sour{0}:list:tmod step') - self._write_channel('sour{0}:list:tmod auto') + return self._write_channel("sour{0}:list:tmod step") + self._write_channel("sour{0}:list:tmod auto") def _set_direction(self, backwards: bool) -> None: if backwards: - return self._write_channel('sour{0}:list:dir down') - self._write_channel('sour{0}:list:dir up') + return self._write_channel("sour{0}:list:dir down") + self._write_channel("sour{0}:list:dir up") def _set_repetitions(self) -> None: self._write_channel(f'sour{"{0}"}:list:coun {self._repetitions}') @@ -458,9 +465,8 @@ def _perpetual(self) -> bool: return self._repetitions < 0 def start(self) -> None: - """Start the DC list generator - """ - self._start('DC list') + """Start the DC list generator""" + self._start("DC list") def append(self, voltages: Sequence[float]) -> None: """Append voltages to the existing list @@ -468,7 +474,7 @@ def append(self, voltages: Sequence[float]) -> None: Arguments: voltages (Sequence[float]): Sequence of voltages """ - self._write_channel_floats('sour{0}:list:volt:app ', voltages) + self._write_channel_floats("sour{0}:list:volt:app ", voltages) self._make_ready_to_start() def points(self) -> int: @@ -476,14 +482,14 @@ def points(self) -> int: Returns: int: Number of steps in the DC list """ - return int(self._ask_channel('sour{0}:list:poin?')) + return int(self._ask_channel("sour{0}:list:poin?")) def cycles_remaining(self) -> int: """ Returns: int: Number of cycles remaining in the DC list """ - return int(self._ask_channel('sour{0}:list:ncl?')) + return int(self._ask_channel("sour{0}:list:ncl?")) def values_V(self) -> Sequence[float]: """ @@ -492,13 +498,12 @@ def values_V(self) -> Sequence[float]: """ # return comma_sequence_to_list_of_floats( # self._ask_channel('sour{0}:list:volt?')) - return comma_sequence_to_list_of_floats( - self._ask_channel('sour{0}:list:volt?')) + return comma_sequence_to_list_of_floats(self._ask_channel("sour{0}:list:volt?")) class _Waveform_Context(_Channel_Context): - def __init__(self, channel: 'QDac2Channel'): + def __init__(self, channel: "QDac2Channel"): super().__init__(channel) self._trigger: Optional[QDac2Trigger_Context] = None self._marker_start: Optional[QDac2Trigger_Context] = None @@ -509,7 +514,7 @@ def __init__(self, channel: 'QDac2Channel'): def _start(self, wave_kind: str, description: str) -> None: if self._trigger: self._make_ready_to_start(wave_kind) - return self._write_channel(f'tint {self._trigger.value}') + return self._write_channel(f"tint {self._trigger.value}") self._switch_to_immediate_trigger(wave_kind) self._write_channel(f'sour{"{0}"}:{wave_kind}:init') @@ -569,66 +574,72 @@ def _set_slew(self, wave_kind: str, slew_V_s: Optional[float]) -> None: class Square_Context(_Waveform_Context): - def __init__(self, channel: 'QDac2Channel', frequency_Hz: Optional[float], - repetitions: int, period_s: Optional[float], - duty_cycle_percent: float, kind: str, inverted: bool, - span_V: float, offset_V: float, delay_s: float, - slew_V_s: Optional[float]): + def __init__( + self, + channel: "QDac2Channel", + frequency_Hz: Optional[float], + repetitions: int, + period_s: Optional[float], + duty_cycle_percent: float, + kind: str, + inverted: bool, + span_V: float, + offset_V: float, + delay_s: float, + slew_V_s: Optional[float], + ): super().__init__(channel) self._repetitions = repetitions - self._write_channel('sour{0}:squ:trig:sour hold') + self._write_channel("sour{0}:squ:trig:sour hold") self._set_frequency(frequency_Hz, period_s) self._write_channel(f'sour{"{0}"}:squ:dcyc {duty_cycle_percent}') self._set_type(kind) self._set_polarity(inverted) self._write_channel(f'sour{"{0}"}:squ:span {span_V}') self._write_channel(f'sour{"{0}"}:squ:offs {offset_V}') - self._set_slew('squ', slew_V_s) - super()._set_delay('squ', delay_s) + self._set_slew("squ", slew_V_s) + super()._set_delay("squ", delay_s) self._write_channel(f'sour{"{0}"}:squ:coun {repetitions}') self._set_triggering() def start(self) -> None: - """Start the square wave generator - """ - self._start('squ', 'square wave') + """Start the square wave generator""" + self._start("squ", "square wave") def abort(self) -> None: - """Abort any running square wave generator - """ - self._write_channel('sour{0}:squ:abor') + """Abort any running square wave generator""" + self._write_channel("sour{0}:squ:abor") def cycles_remaining(self) -> int: """ Returns: int: Number of cycles remaining in the square wave """ - return int(self._ask_channel('sour{0}:squ:ncl?')) + return int(self._ask_channel("sour{0}:squ:ncl?")) - def _set_frequency(self, frequency_Hz: Optional[float], - period_s: Optional[float]) -> None: + def _set_frequency(self, frequency_Hz: Optional[float], period_s: Optional[float]) -> None: if frequency_Hz: return self._write_channel(f'sour{"{0}"}:squ:freq {frequency_Hz}') if period_s: self._write_channel(f'sour{"{0}"}:squ:per {period_s}') def _set_type(self, kind: str) -> None: - if kind == 'positive': - self._write_channel('sour{0}:squ:typ pos') - elif kind == 'negative': - self._write_channel('sour{0}:squ:typ neg') + if kind == "positive": + self._write_channel("sour{0}:squ:typ pos") + elif kind == "negative": + self._write_channel("sour{0}:squ:typ neg") else: - self._write_channel('sour{0}:squ:typ symm') + self._write_channel("sour{0}:squ:typ symm") def _set_polarity(self, inverted: bool) -> None: if inverted: - self._write_channel('sour{0}:squ:pol inv') + self._write_channel("sour{0}:squ:pol inv") else: - self._write_channel('sour{0}:squ:pol norm') + self._write_channel("sour{0}:squ:pol norm") def _set_triggering(self) -> None: - self._write_channel('sour{0}:squ:trig:sour bus') - self._make_ready_to_start('squ') + self._write_channel("sour{0}:squ:trig:sour bus") + self._make_ready_to_start("squ") def end_marker(self) -> QDac2Trigger_Context: """Internal trigger that will mark the end of the square wave @@ -638,7 +649,7 @@ def end_marker(self) -> QDac2Trigger_Context: Returns: QDac2Trigger_Context: trigger that will mark the end """ - return super()._end_marker('squ') + return super()._end_marker("squ") def start_marker(self) -> QDac2Trigger_Context: """Internal trigger that will mark the beginning of the square wave @@ -648,7 +659,7 @@ def start_marker(self) -> QDac2Trigger_Context: Returns: QDac2Trigger_Context: trigger that will mark the beginning """ - return super()._start_marker('squ') + return super()._start_marker("squ") def period_end_marker(self) -> QDac2Trigger_Context: """Internal trigger that will mark the end of each period @@ -658,7 +669,7 @@ def period_end_marker(self) -> QDac2Trigger_Context: Returns: QDac2Trigger_Context: trigger that will mark the end of each period """ - return super()._period_end_marker('squ') + return super()._period_end_marker("squ") def period_start_marker(self) -> QDac2Trigger_Context: """Internal trigger that will mark the beginning of each period @@ -668,7 +679,7 @@ def period_start_marker(self) -> QDac2Trigger_Context: Returns: QDac2Trigger_Context: trigger that will mark the beginning of each period """ - return super()._period_start_marker('squ') + return super()._period_start_marker("squ") def start_on(self, trigger: QDac2Trigger_Context) -> None: """Attach internal trigger to start the square wave generator @@ -676,7 +687,7 @@ def start_on(self, trigger: QDac2Trigger_Context) -> None: Args: trigger (QDac2Trigger_Context): trigger that will start square wave """ - return super()._start_on(trigger, 'squ') + return super()._start_on(trigger, "squ") def start_on_external(self, trigger: ExternalInput) -> None: """Attach external trigger to start the square wave generator @@ -684,46 +695,51 @@ def start_on_external(self, trigger: ExternalInput) -> None: Args: trigger (ExternalInput): external trigger that will start square wave """ - return super()._start_on_external(trigger, 'squ') + return super()._start_on_external(trigger, "squ") class Sine_Context(_Waveform_Context): - def __init__(self, channel: 'QDac2Channel', frequency_Hz: Optional[float], - repetitions: int, period_s: Optional[float], inverted: bool, - span_V: float, offset_V: float, delay_s: float, - slew_V_s: Optional[float]): + def __init__( + self, + channel: "QDac2Channel", + frequency_Hz: Optional[float], + repetitions: int, + period_s: Optional[float], + inverted: bool, + span_V: float, + offset_V: float, + delay_s: float, + slew_V_s: Optional[float], + ): super().__init__(channel) self._repetitions = repetitions - self._write_channel('sour{0}:sine:trig:sour hold') + self._write_channel("sour{0}:sine:trig:sour hold") self._set_frequency(frequency_Hz, period_s) self._set_polarity(inverted) self._write_channel(f'sour{"{0}"}:sine:span {span_V}') self._write_channel(f'sour{"{0}"}:sine:offs {offset_V}') - self._set_slew('sine', slew_V_s) - super()._set_delay('sine', delay_s) + self._set_slew("sine", slew_V_s) + super()._set_delay("sine", delay_s) self._write_channel(f'sour{"{0}"}:sine:coun {repetitions}') self._set_triggering() def start(self) -> None: - """Start the sine wave generator - """ - self._start('sine', 'sine wave') + """Start the sine wave generator""" + self._start("sine", "sine wave") def abort(self) -> None: - """Abort any running sine wave generator - """ - self._write_channel('sour{0}:sine:abor') + """Abort any running sine wave generator""" + self._write_channel("sour{0}:sine:abor") def cycles_remaining(self) -> int: """ Returns: int: Number of cycles remaining in the sine wave """ - return int(self._ask_channel('sour{0}:sine:ncl?')) + return int(self._ask_channel("sour{0}:sine:ncl?")) - def _set_frequency(self, frequency_Hz: Optional[float], - period_s: Optional[float]) -> None: + def _set_frequency(self, frequency_Hz: Optional[float], period_s: Optional[float]) -> None: if frequency_Hz: return self._write_channel(f'sour{"{0}"}:sine:freq {frequency_Hz}') if period_s: @@ -731,13 +747,13 @@ def _set_frequency(self, frequency_Hz: Optional[float], def _set_polarity(self, inverted: bool) -> None: if inverted: - self._write_channel('sour{0}:sine:pol inv') + self._write_channel("sour{0}:sine:pol inv") else: - self._write_channel('sour{0}:sine:pol norm') + self._write_channel("sour{0}:sine:pol norm") def _set_triggering(self) -> None: - self._write_channel('sour{0}:sine:trig:sour bus') - self._make_ready_to_start('sine') + self._write_channel("sour{0}:sine:trig:sour bus") + self._make_ready_to_start("sine") def end_marker(self) -> QDac2Trigger_Context: """Internal trigger that will mark the end of the sine wave @@ -747,7 +763,7 @@ def end_marker(self) -> QDac2Trigger_Context: Returns: QDac2Trigger_Context: trigger that will mark the end """ - return super()._end_marker('sine') + return super()._end_marker("sine") def start_marker(self) -> QDac2Trigger_Context: """Internal trigger that will mark the beginning of the sine wave @@ -757,7 +773,7 @@ def start_marker(self) -> QDac2Trigger_Context: Returns: QDac2Trigger_Context: trigger that will mark the beginning """ - return super()._start_marker('sine') + return super()._start_marker("sine") def period_end_marker(self) -> QDac2Trigger_Context: """Internal trigger that will mark the end of each period @@ -767,7 +783,7 @@ def period_end_marker(self) -> QDac2Trigger_Context: Returns: QDac2Trigger_Context: trigger that will mark the end of each period """ - return super()._period_end_marker('sine') + return super()._period_end_marker("sine") def period_start_marker(self) -> QDac2Trigger_Context: """Internal trigger that will mark the beginning of each period @@ -777,7 +793,7 @@ def period_start_marker(self) -> QDac2Trigger_Context: Returns: QDac2Trigger_Context: trigger that will mark the beginning of each period """ - return super()._period_start_marker('sine') + return super()._period_start_marker("sine") def start_on(self, trigger: QDac2Trigger_Context) -> None: """Attach internal trigger to start the sine wave generator @@ -785,7 +801,7 @@ def start_on(self, trigger: QDac2Trigger_Context) -> None: Args: trigger (QDac2Trigger_Context): trigger that will start sine wave """ - return super()._start_on(trigger, 'sine') + return super()._start_on(trigger, "sine") def start_on_external(self, trigger: ExternalInput) -> None: """Attach external trigger to start the sine wave generator @@ -793,69 +809,75 @@ def start_on_external(self, trigger: ExternalInput) -> None: Args: trigger (ExternalInput): external trigger that will start sine wave """ - return super()._start_on_external(trigger, 'sine') + return super()._start_on_external(trigger, "sine") class Triangle_Context(_Waveform_Context): - def __init__(self, channel: 'QDac2Channel', frequency_Hz: Optional[float], - repetitions: int, period_s: Optional[float], - duty_cycle_percent: float, inverted: bool, span_V: float, - offset_V: float, delay_s: float, slew_V_s: Optional[float]): + def __init__( + self, + channel: "QDac2Channel", + frequency_Hz: Optional[float], + repetitions: int, + period_s: Optional[float], + duty_cycle_percent: float, + inverted: bool, + span_V: float, + offset_V: float, + delay_s: float, + slew_V_s: Optional[float], + ): super().__init__(channel) self._repetitions = repetitions - self._write_channel('sour{0}:tri:trig:sour hold') + self._write_channel("sour{0}:tri:trig:sour hold") self._set_frequency(frequency_Hz, period_s) self._write_channel(f'sour{"{0}"}:tri:dcyc {duty_cycle_percent}') self._set_polarity(inverted) self._write_channel(f'sour{"{0}"}:tri:span {span_V}') self._write_channel(f'sour{"{0}"}:tri:offs {offset_V}') - self._set_slew('tri', slew_V_s) - super()._set_delay('tri', delay_s) + self._set_slew("tri", slew_V_s) + super()._set_delay("tri", delay_s) self._write_channel(f'sour{"{0}"}:tri:coun {repetitions}') self._set_triggering() def start(self) -> None: - """Start the triangle wave generator - """ - self._start('tri', 'triangle wave') + """Start the triangle wave generator""" + self._start("tri", "triangle wave") def abort(self) -> None: - """Abort any running triangle wave generator - """ - self._write_channel('sour{0}:tri:abor') + """Abort any running triangle wave generator""" + self._write_channel("sour{0}:tri:abor") def cycles_remaining(self) -> int: """ Returns: int: Number of cycles remaining in the triangle wave """ - return int(self._ask_channel('sour{0}:tri:ncl?')) + return int(self._ask_channel("sour{0}:tri:ncl?")) - def _set_frequency(self, frequency_Hz: Optional[float], - period_s: Optional[float]) -> None: + def _set_frequency(self, frequency_Hz: Optional[float], period_s: Optional[float]) -> None: if frequency_Hz: return self._write_channel(f'sour{"{0}"}:tri:freq {frequency_Hz}') if period_s: self._write_channel(f'sour{"{0}"}:tri:per {period_s}') def _set_type(self, kind: bool) -> None: - if kind == 'positive': - self._write_channel('sour{0}:tri:typ pos') - elif kind == 'negative': - self._write_channel('sour{0}:tri:typ neg') + if kind == "positive": + self._write_channel("sour{0}:tri:typ pos") + elif kind == "negative": + self._write_channel("sour{0}:tri:typ neg") else: - self._write_channel('sour{0}:tri:typ symm') + self._write_channel("sour{0}:tri:typ symm") def _set_polarity(self, inverted: bool) -> None: if inverted: - self._write_channel('sour{0}:tri:pol inv') + self._write_channel("sour{0}:tri:pol inv") else: - self._write_channel('sour{0}:tri:pol norm') + self._write_channel("sour{0}:tri:pol norm") def _set_triggering(self) -> None: - self._write_channel('sour{0}:tri:trig:sour bus') - self._make_ready_to_start('tri') + self._write_channel("sour{0}:tri:trig:sour bus") + self._make_ready_to_start("tri") def end_marker(self) -> QDac2Trigger_Context: """Internal trigger that will mark the end of the triangle wave @@ -865,7 +887,7 @@ def end_marker(self) -> QDac2Trigger_Context: Returns: QDac2Trigger_Context: trigger that will mark the end """ - return super()._end_marker('tri') + return super()._end_marker("tri") def start_marker(self) -> QDac2Trigger_Context: """Internal trigger that will mark the beginning of the triangle wave @@ -875,7 +897,7 @@ def start_marker(self) -> QDac2Trigger_Context: Returns: QDac2Trigger_Context: trigger that will mark the beginning """ - return super()._start_marker('tri') + return super()._start_marker("tri") def period_end_marker(self) -> QDac2Trigger_Context: """Internal trigger that will mark the end of each period @@ -885,7 +907,7 @@ def period_end_marker(self) -> QDac2Trigger_Context: Returns: QDac2Trigger_Context: trigger that will mark the end of each period """ - return super()._period_end_marker('tri') + return super()._period_end_marker("tri") def period_start_marker(self) -> QDac2Trigger_Context: """Internal trigger that will mark the beginning of each period @@ -895,7 +917,7 @@ def period_start_marker(self) -> QDac2Trigger_Context: Returns: QDac2Trigger_Context: trigger that will mark the beginning of each period """ - return super()._period_start_marker('tri') + return super()._period_start_marker("tri") def start_on(self, trigger: QDac2Trigger_Context) -> None: """Attach internal trigger to start the triangle wave generator @@ -903,7 +925,7 @@ def start_on(self, trigger: QDac2Trigger_Context) -> None: Args: trigger (QDac2Trigger_Context): trigger that will start triangle """ - return super()._start_on(trigger, 'tri') + return super()._start_on(trigger, "tri") def start_on_external(self, trigger: ExternalInput) -> None: """Attach external trigger to start the triangle wave generator @@ -911,44 +933,48 @@ def start_on_external(self, trigger: ExternalInput) -> None: Args: trigger (ExternalInput): external trigger that will start triangle """ - return super()._start_on_external(trigger, 'tri') + return super()._start_on_external(trigger, "tri") class Awg_Context(_Waveform_Context): - def __init__(self, channel: 'QDac2Channel', trace_name: str, - repetitions: int, scale: float, offset_V: float, - slew_V_s: Optional[float]): + def __init__( + self, + channel: "QDac2Channel", + trace_name: str, + repetitions: int, + scale: float, + offset_V: float, + slew_V_s: Optional[float], + ): super().__init__(channel) self._repetitions = repetitions - self._write_channel('sour{0}:awg:trig:sour hold') + self._write_channel("sour{0}:awg:trig:sour hold") self._write_channel(f'sour{"{0}"}:awg:def "{trace_name}"') self._write_channel(f'sour{"{0}"}:awg:scal {scale}') self._write_channel(f'sour{"{0}"}:awg:offs {offset_V}') - self._set_slew('awg', slew_V_s) + self._set_slew("awg", slew_V_s) self._write_channel(f'sour{"{0}"}:awg:coun {repetitions}') self._set_triggering() def start(self) -> None: - """Start the AWG - """ - self._start('awg', 'AWG') + """Start the AWG""" + self._start("awg", "AWG") def abort(self) -> None: - """Abort any running AWG - """ - self._write_channel('sour{0}:awg:abor') + """Abort any running AWG""" + self._write_channel("sour{0}:awg:abor") def cycles_remaining(self) -> int: """ Returns: int: Number of cycles remaining in the AWG """ - return int(self._ask_channel('sour{0}:awg:ncl?')) + return int(self._ask_channel("sour{0}:awg:ncl?")) def _set_triggering(self) -> None: - self._write_channel('sour{0}:awg:trig:sour bus') - self._make_ready_to_start('awg') + self._write_channel("sour{0}:awg:trig:sour bus") + self._make_ready_to_start("awg") def end_marker(self) -> QDac2Trigger_Context: """Internal trigger that will mark the end of the AWG @@ -958,7 +984,7 @@ def end_marker(self) -> QDac2Trigger_Context: Returns: QDac2Trigger_Context: trigger that will mark the end """ - return super()._end_marker('awg') + return super()._end_marker("awg") def start_marker(self) -> QDac2Trigger_Context: """Internal trigger that will mark the beginning of the AWG @@ -968,7 +994,7 @@ def start_marker(self) -> QDac2Trigger_Context: Returns: QDac2Trigger_Context: trigger that will mark the beginning """ - return super()._start_marker('awg') + return super()._start_marker("awg") def period_end_marker(self) -> QDac2Trigger_Context: """Internal trigger that will mark the end of each period @@ -978,7 +1004,7 @@ def period_end_marker(self) -> QDac2Trigger_Context: Returns: QDac2Trigger_Context: trigger that will mark the end of each period """ - return super()._period_end_marker('awg') + return super()._period_end_marker("awg") def period_start_marker(self) -> QDac2Trigger_Context: """Internal trigger that will mark the beginning of each period @@ -988,7 +1014,7 @@ def period_start_marker(self) -> QDac2Trigger_Context: Returns: QDac2Trigger_Context: trigger that will mark the beginning of each period """ - return super()._period_start_marker('awg') + return super()._period_start_marker("awg") def start_on(self, trigger: QDac2Trigger_Context) -> None: """Attach internal trigger to start the AWG @@ -996,7 +1022,7 @@ def start_on(self, trigger: QDac2Trigger_Context) -> None: Args: trigger (QDac2Trigger_Context): trigger that will start AWG """ - return super()._start_on(trigger, 'awg') + return super()._start_on(trigger, "awg") def start_on_external(self, trigger: ExternalInput) -> None: """Attach external trigger to start the AWG @@ -1004,14 +1030,20 @@ def start_on_external(self, trigger: ExternalInput) -> None: Args: trigger (ExternalInput): external trigger that will start AWG """ - return super()._start_on_external(trigger, 'awg') + return super()._start_on_external(trigger, "awg") class Measurement_Context(_Channel_Context): - def __init__(self, channel: 'QDac2Channel', delay_s: float, - repetitions: int, current_range: str, - aperture_s: Optional[float], nplc: Optional[int]): + def __init__( + self, + channel: "QDac2Channel", + delay_s: float, + repetitions: int, + current_range: str, + aperture_s: Optional[float], + nplc: Optional[int], + ): super().__init__(channel) self._trigger: Optional[QDac2Trigger_Context] = None self._write_channel(f'sens{"{0}"}:del {delay_s}') @@ -1021,16 +1053,15 @@ def __init__(self, channel: 'QDac2Channel', delay_s: float, self._set_triggering() def start(self) -> None: - """Start a current measurement - """ + """Start a current measurement""" if self._trigger: - return self._write_channel(f'tint {self._trigger.value}') + return self._write_channel(f"tint {self._trigger.value}") self._switch_to_immediate_trigger() - self._write_channel('sens{0}:init') + self._write_channel("sens{0}:init") def _switch_to_immediate_trigger(self) -> None: - self._write_channel('sens{0}:init:cont off') - self._write_channel('sens{0}:trig:sour imm') + self._write_channel("sens{0}:init:cont off") + self._write_channel("sens{0}:trig:sour imm") def start_on(self, trigger: QDac2Trigger_Context) -> None: """Attach internal trigger to start the current measurement @@ -1055,23 +1086,22 @@ def start_on_external(self, trigger: ExternalInput) -> None: self._write_channel(f'sens{"{0}"}:init') def abort(self) -> None: - """Abort current measurement - """ - self._write_channel('sens{0}:abor') + """Abort current measurement""" + self._write_channel("sens{0}:abor") def n_cycles_remaining(self) -> int: """ Returns: int: Number of measurements remaining """ - return int(self._ask_channel('sens{0}:ncl?')) + return int(self._ask_channel("sens{0}:ncl?")) def n_available(self) -> int: """ Returns: int: Number of measurements available """ - return int(self._ask_channel('sens{0}:data:poin?')) + return int(self._ask_channel("sens{0}:data:poin?")) def available_A(self) -> Sequence[float]: """Retrieve current measurements @@ -1084,8 +1114,7 @@ def available_A(self) -> Sequence[float]: # Bug circumvention if self.n_available() == 0: return list() - return comma_sequence_to_list_of_floats( - self._ask_channel('sens{0}:data:rem?')) + return comma_sequence_to_list_of_floats(self._ask_channel("sens{0}:data:rem?")) def peek_A(self) -> float: """Peek at the first available current measurement @@ -1093,206 +1122,179 @@ def peek_A(self) -> float: Returns: float: current in Amperes """ - return float(self._ask_channel('sens{0}:data:last?')) + return float(self._ask_channel("sens{0}:data:last?")) - def _set_aperture(self, aperture_s: Optional[float], nplc: Optional[int] - ) -> None: + def _set_aperture(self, aperture_s: Optional[float], nplc: Optional[int]) -> None: if aperture_s: return self._write_channel(f'sens{"{0}"}:aper {aperture_s}') self._write_channel(f'sens{"{0}"}:nplc {nplc}') def _set_triggering(self) -> None: - self._write_channel('sens{0}:trig:sour bus') - self._write_channel('sens{0}:init') + self._write_channel("sens{0}:trig:sour bus") + self._write_channel("sens{0}:init") class QDac2Channel(InstrumentChannel): - def __init__(self, parent: 'QDac2', name: str, channum: int): + def __init__(self, parent: "QDac2", name: str, channum: int): super().__init__(parent, name) self._channum = channum self.add_parameter( - name='measurement_range', - label='range', - set_cmd='sens{1}:rang {0}'.format('{}', channum), - get_cmd=f'sens{channum}:rang?', - vals=validators.Enum('low', 'high') + name="measurement_range", + label="range", + set_cmd="sens{1}:rang {0}".format("{}", channum), + get_cmd=f"sens{channum}:rang?", + vals=validators.Enum("low", "high"), ) self.add_parameter( - name='measurement_aperture_s', - label='aperture', - unit='s', - set_cmd='sens{1}:aper {0}'.format('{}', channum), - get_cmd=f'sens{channum}:aper?', - get_parser=float + name="measurement_aperture_s", + label="aperture", + unit="s", + set_cmd="sens{1}:aper {0}".format("{}", channum), + get_cmd=f"sens{channum}:aper?", + get_parser=float, ) self.add_parameter( - name='measurement_nplc', - label='PLC', - set_cmd='sens{1}:nplc {0}'.format('{}', channum), - get_cmd=f'sens{channum}:nplc?', - get_parser=int + name="measurement_nplc", + label="PLC", + set_cmd="sens{1}:nplc {0}".format("{}", channum), + get_cmd=f"sens{channum}:nplc?", + get_parser=int, ) self.add_parameter( - name='measurement_delay_s', - label=f'delay', - unit='s', - set_cmd='sens{1}:del {0}'.format('{}', channum), - get_cmd=f'sens{channum}:del?', - get_parser=float - ) - self.add_function( - name='measurement_abort', - call_cmd=f'sens{channum}:abor' + name="measurement_delay_s", + label=f"delay", + unit="s", + set_cmd="sens{1}:del {0}".format("{}", channum), + get_cmd=f"sens{channum}:del?", + get_parser=float, ) + self.add_function(name="measurement_abort", call_cmd=f"sens{channum}:abor") self.add_parameter( - name='measurement_count', - label='count', - set_cmd='sens{1}:coun {0}'.format('{}', channum), - get_cmd=f'sens{channum}:coun?', - get_parser=int + name="measurement_count", + label="count", + set_cmd="sens{1}:coun {0}".format("{}", channum), + get_cmd=f"sens{channum}:coun?", + get_parser=int, ) self.add_parameter( - name='n_masurements_remaining', - label='remaning', - get_cmd=f'sens{channum}:ncl?', - get_parser=int + name="n_masurements_remaining", label="remaning", get_cmd=f"sens{channum}:ncl?", get_parser=int ) self.add_parameter( - name='current_last_A', - label='last', - unit='A', - get_cmd=f'sens{channum}:data:last?', - get_parser=float + name="current_last_A", label="last", unit="A", get_cmd=f"sens{channum}:data:last?", get_parser=float ) self.add_parameter( - name='n_measurements_available', - label='available', - get_cmd=f'sens{channum}:data:poin?', - get_parser=int + name="n_measurements_available", label="available", get_cmd=f"sens{channum}:data:poin?", get_parser=int ) self.add_parameter( - name='current_start_on', + name="current_start_on", # Channel {channum} current measurement on internal trigger set_parser=_trigger_context_to_value, - set_cmd='sens{1}:trig:sour int{0}'.format('{}', channum), + set_cmd="sens{1}:trig:sour int{0}".format("{}", channum), ) self.add_parameter( - name='measurement_start_on_external', + name="measurement_start_on_external", # Channel {channum} current measurement on external input - set_cmd='sens{1}:trig:sour ext{0}'.format('{}', channum), + set_cmd="sens{1}:trig:sour ext{0}".format("{}", channum), ) self.add_parameter( - name='output_range', - label='range', - set_cmd='sour{1}:rang {0}'.format('{}', channum), - get_cmd=f'sour{channum}:rang?', - vals=validators.Enum('low', 'high') + name="output_range", + label="range", + set_cmd="sour{1}:rang {0}".format("{}", channum), + get_cmd=f"sour{channum}:rang?", + vals=validators.Enum("low", "high"), ) self.add_parameter( - name='output_low_range_minimum_V', - label='low range min', - unit='V', - get_cmd=f'sour{channum}:rang:low:min?', - get_parser=float + name="output_low_range_minimum_V", + label="low range min", + unit="V", + get_cmd=f"sour{channum}:rang:low:min?", + get_parser=float, ) self.add_parameter( - name='output_low_range_maximum_V', - label='low voltage max', - unit='V', - get_cmd=f'sour{channum}:rang:low:max?', - get_parser=float + name="output_low_range_maximum_V", + label="low voltage max", + unit="V", + get_cmd=f"sour{channum}:rang:low:max?", + get_parser=float, ) self.add_parameter( - name='output_high_range_minimum_V', - label='high voltage min', - unit='V', - get_cmd=f'sour{channum}:rang:high:min?', - get_parser=float + name="output_high_range_minimum_V", + label="high voltage min", + unit="V", + get_cmd=f"sour{channum}:rang:high:min?", + get_parser=float, ) self.add_parameter( - name='output_high_range_maximum_V', - label='high voltage max', - unit='V', - get_cmd=f'sour{channum}:rang:high:max?', - get_parser=float + name="output_high_range_maximum_V", + label="high voltage max", + unit="V", + get_cmd=f"sour{channum}:rang:high:max?", + get_parser=float, ) self.add_parameter( - name='output_filter', - label=f'low-pass cut-off', - unit='Hz', - set_cmd='sour{1}:filt {0}'.format('{}', channum), - get_cmd=f'sour{channum}:filt?', + name="output_filter", + label=f"low-pass cut-off", + unit="Hz", + set_cmd="sour{1}:filt {0}".format("{}", channum), + get_cmd=f"sour{channum}:filt?", get_parser=str, - vals=validators.Enum('dc', 'med', 'high') + vals=validators.Enum("dc", "med", "high"), ) self.add_parameter( - name='dc_constant_V', - label=f'ch{channum}', - unit='V', + name="dc_constant_V", + label=f"ch{channum}", + unit="V", set_cmd=self._set_fixed_voltage_immediately, - get_cmd=f'sour{channum}:volt?', + get_cmd=f"sour{channum}:volt?", get_parser=float, - vals=validators.Numbers(-10.0, 10.0) + vals=validators.Numbers(-10.0, 10.0), ) self.add_parameter( - name='dc_last_V', - label=f'ch{channum}', - unit='V', - get_cmd=f'sour{channum}:volt:last?', - get_parser=float + name="dc_last_V", label=f"ch{channum}", unit="V", get_cmd=f"sour{channum}:volt:last?", get_parser=float ) self.add_parameter( - name='dc_next_V', - label=f'ch{channum}', - unit='V', - set_cmd='sour{1}:volt:trig {0}'.format('{}', channum), - get_cmd=f'sour{channum}:volt:trig?', - get_parser=float + name="dc_next_V", + label=f"ch{channum}", + unit="V", + set_cmd="sour{1}:volt:trig {0}".format("{}", channum), + get_cmd=f"sour{channum}:volt:trig?", + get_parser=float, ) self.add_parameter( - name='dc_slew_rate_V_per_s', - label=f'ch{channum}', - unit='V/s', - set_cmd='sour{1}:volt:slew {0}'.format('{}', channum), - get_cmd=f'sour{channum}:volt:slew?', - get_parser=float + name="dc_slew_rate_V_per_s", + label=f"ch{channum}", + unit="V/s", + set_cmd="sour{1}:volt:slew {0}".format("{}", channum), + get_cmd=f"sour{channum}:volt:slew?", + get_parser=float, ) self.add_parameter( - name='read_current_A', + name="read_current_A", # Perform immediate current measurement on channel - label=f'ch{channum}', - unit='A', - get_cmd=f'read{channum}?', - get_parser=float + label=f"ch{channum}", + unit="A", + get_cmd=f"read{channum}?", + get_parser=float, ) self.add_parameter( - name='fetch_current_A', + name="fetch_current_A", # Retrieve all available current measurements on channel - label=f'ch{channum}', - unit='A', - get_cmd=f'fetc{channum}?', - get_parser=comma_sequence_to_list_of_floats + label=f"ch{channum}", + unit="A", + get_cmd=f"fetc{channum}?", + get_parser=comma_sequence_to_list_of_floats, ) self.add_parameter( - name='dc_mode', - label=f'DC mode', - set_cmd='sour{1}:volt:mode {0}'.format('{}', channum), - get_cmd=f'sour{channum}:volt:mode?', - vals=validators.Enum('fixed', 'list', 'sweep') - ) - self.add_function( - name='dc_initiate', - call_cmd=f'sour{channum}:dc:init' - ) - self.add_function( - name='dc_abort', - call_cmd=f'sour{channum}:dc:abor' - ) - self.add_function( - name='abort', - call_cmd=f'sour{channum}:all:abor' + name="dc_mode", + label=f"DC mode", + set_cmd="sour{1}:volt:mode {0}".format("{}", channum), + get_cmd=f"sour{channum}:volt:mode?", + vals=validators.Enum("fixed", "list", "sweep"), ) + self.add_function(name="dc_initiate", call_cmd=f"sour{channum}:dc:init") + self.add_function(name="dc_abort", call_cmd=f"sour{channum}:dc:abor") + self.add_function(name="abort", call_cmd=f"sour{channum}:all:abor") @property def number(self) -> int: @@ -1308,16 +1310,18 @@ def clear_measurements(self) -> Sequence[float]: Sequence[float]: list of available current measurements """ # Bug circumvention - if int(self.ask_channel('sens{0}:data:poin?')) == 0: + if int(self.ask_channel("sens{0}:data:poin?")) == 0: return list() - return comma_sequence_to_list_of_floats( - self.ask_channel('sens{0}:data:rem?')) - - def measurement(self, delay_s: float = 0.0, repetitions: int = 1, - current_range: str = 'high', - aperture_s: Optional[float] = None, - nplc: Optional[int] = None - ) -> Measurement_Context: + return comma_sequence_to_list_of_floats(self.ask_channel("sens{0}:data:rem?")) + + def measurement( + self, + delay_s: float = 0.0, + repetitions: int = 1, + current_range: str = "high", + aperture_s: Optional[float] = None, + nplc: Optional[int] = None, + ) -> Measurement_Context: """Set up a sequence of current measurements Args: @@ -1334,14 +1338,12 @@ def measurement(self, delay_s: float = 0.0, repetitions: int = 1, ValueError: configuration error """ if aperture_s and nplc: - raise ValueError('Only one of nplc or aperture_s can be ' - 'specified for a current measurement') + raise ValueError("Only one of nplc or aperture_s can be " "specified for a current measurement") if not aperture_s and not nplc: nplc = 1 - return Measurement_Context(self, delay_s, repetitions, current_range, - aperture_s, nplc) + return Measurement_Context(self, delay_s, repetitions, current_range, aperture_s, nplc) - def output_mode(self, range: str = 'high', filter: str = 'high') -> None: + def output_mode(self, range: str = "high", filter: str = "high") -> None: """Set the output voltage Args: @@ -1351,10 +1353,15 @@ def output_mode(self, range: str = 'high', filter: str = 'high') -> None: self.output_range(range) self.output_filter(filter) - def dc_list(self, voltages: Sequence[float], repetitions: int = 1, - dwell_s: float = 1e-03, delay_s: float = 0, - backwards: bool = False, stepped: bool = False - ) -> List_Context: + def dc_list( + self, + voltages: Sequence[float], + repetitions: int = 1, + dwell_s: float = 1e-03, + delay_s: float = 0, + backwards: bool = False, + stepped: bool = False, + ) -> List_Context: """Set up a DC-list generator Args: @@ -1368,13 +1375,19 @@ def dc_list(self, voltages: Sequence[float], repetitions: int = 1, Returns: List_Context: context manager """ - return List_Context(self, voltages, repetitions, dwell_s, delay_s, - backwards, stepped) - - def dc_sweep(self, start_V: float, stop_V: float, points: int, - repetitions: int = 1, dwell_s: float = 1e-03, - delay_s: float = 0, backwards=False, stepped=True - ) -> Sweep_Context: + return List_Context(self, voltages, repetitions, dwell_s, delay_s, backwards, stepped) + + def dc_sweep( + self, + start_V: float, + stop_V: float, + points: int, + repetitions: int = 1, + dwell_s: float = 1e-03, + delay_s: float = 0, + backwards=False, + stepped=True, + ) -> Sweep_Context: """Set up a DC sweep Args: @@ -1390,16 +1403,21 @@ def dc_sweep(self, start_V: float, stop_V: float, points: int, Returns: Sweep_Context: context manager """ - return Sweep_Context(self, start_V, stop_V, points, repetitions, - dwell_s, delay_s, backwards, stepped) - - def square_wave(self, frequency_Hz: Optional[float] = None, - period_s: Optional[float] = None, repetitions: int = -1, - duty_cycle_percent: float = 50.0, kind: str = 'symmetric', - inverted: bool = False, span_V: float = 0.2, - offset_V: float = 0.0, delay_s: float = 0, - slew_V_s: Optional[float] = None - ) -> Square_Context: + return Sweep_Context(self, start_V, stop_V, points, repetitions, dwell_s, delay_s, backwards, stepped) + + def square_wave( + self, + frequency_Hz: Optional[float] = None, + period_s: Optional[float] = None, + repetitions: int = -1, + duty_cycle_percent: float = 50.0, + kind: str = "symmetric", + inverted: bool = False, + span_V: float = 0.2, + offset_V: float = 0.0, + delay_s: float = 0, + slew_V_s: Optional[float] = None, + ) -> Square_Context: """Set up a square-wave generator Args: @@ -1424,16 +1442,31 @@ def square_wave(self, frequency_Hz: Optional[float] = None, raise ValueError(error_ambiguous_wave) if not frequency_Hz and not period_s: frequency_Hz = 1000 - return Square_Context(self, frequency_Hz, repetitions, period_s, - duty_cycle_percent, kind, inverted, span_V, - offset_V, delay_s, slew_V_s) - - def sine_wave(self, frequency_Hz: Optional[float] = None, - period_s: Optional[float] = None, repetitions: int = -1, - inverted: bool = False, span_V: float = 0.2, - offset_V: float = 0.0, delay_s: float = 0, - slew_V_s: Optional[float] = None - ) -> Sine_Context: + return Square_Context( + self, + frequency_Hz, + repetitions, + period_s, + duty_cycle_percent, + kind, + inverted, + span_V, + offset_V, + delay_s, + slew_V_s, + ) + + def sine_wave( + self, + frequency_Hz: Optional[float] = None, + period_s: Optional[float] = None, + repetitions: int = -1, + inverted: bool = False, + span_V: float = 0.2, + offset_V: float = 0.0, + delay_s: float = 0, + slew_V_s: Optional[float] = None, + ) -> Sine_Context: """Set up a sine-wave generator Args: @@ -1456,15 +1489,20 @@ def sine_wave(self, frequency_Hz: Optional[float] = None, raise ValueError(error_ambiguous_wave) if not frequency_Hz and not period_s: frequency_Hz = 1000 - return Sine_Context(self, frequency_Hz, repetitions, period_s, - inverted, span_V, offset_V, delay_s, slew_V_s) - - def triangle_wave(self, frequency_Hz: Optional[float] = None, - period_s: Optional[float] = None, repetitions: int = -1, - duty_cycle_percent: float = 50.0, inverted: bool = False, - span_V: float = 0.2, offset_V: float = 0.0, - delay_s: float = 0, slew_V_s: Optional[float] = None - ) -> Triangle_Context: + return Sine_Context(self, frequency_Hz, repetitions, period_s, inverted, span_V, offset_V, delay_s, slew_V_s) + + def triangle_wave( + self, + frequency_Hz: Optional[float] = None, + period_s: Optional[float] = None, + repetitions: int = -1, + duty_cycle_percent: float = 50.0, + inverted: bool = False, + span_V: float = 0.2, + offset_V: float = 0.0, + delay_s: float = 0, + slew_V_s: Optional[float] = None, + ) -> Triangle_Context: """Set up a triangle-wave generator Args: @@ -1488,14 +1526,18 @@ def triangle_wave(self, frequency_Hz: Optional[float] = None, raise ValueError(error_ambiguous_wave) if not frequency_Hz and not period_s: frequency_Hz = 1000 - return Triangle_Context(self, frequency_Hz, repetitions, period_s, - duty_cycle_percent, inverted, span_V, - offset_V, delay_s, slew_V_s) - - def arbitrary_wave(self, trace_name: str, repetitions: int = 1, - scale: float = 1.0, offset_V: float = 0.0, - slew_V_s: Optional[float] = None - ) -> Awg_Context: + return Triangle_Context( + self, frequency_Hz, repetitions, period_s, duty_cycle_percent, inverted, span_V, offset_V, delay_s, slew_V_s + ) + + def arbitrary_wave( + self, + trace_name: str, + repetitions: int = 1, + scale: float = 1.0, + offset_V: float = 0.0, + slew_V_s: Optional[float] = None, + ) -> Awg_Context: """Set up an arbitrary-wave generator Args: @@ -1508,12 +1550,11 @@ def arbitrary_wave(self, trace_name: str, repetitions: int = 1, Returns: Awg_Context: context manager """ - return Awg_Context(self, trace_name, repetitions, scale, offset_V, - slew_V_s) + return Awg_Context(self, trace_name, repetitions, scale, offset_V, slew_V_s) def _set_fixed_voltage_immediately(self, v) -> None: - self.write(f'sour{self._channum}:volt:mode fix') - self.write(f'sour{self._channum}:volt {v}') + self.write(f"sour{self._channum}:volt:mode fix") + self.write(f"sour{self._channum}:volt {v}") def ask_channel(self, cmd: str) -> str: """Inject channel number into SCPI query @@ -1588,16 +1629,21 @@ def waveform(self, values: Sequence[float]) -> None: ValueError: size mismatch """ if len(values) != self.size: - raise ValueError(f'trace length {len(values)} does not match ' - f'allocated length {self.size}') + raise ValueError(f"trace length {len(values)} does not match " f"allocated length {self.size}") self._parent.write_floats(f'trac:data "{self.name}",', values) class Virtual_Sweep_Context: - def __init__(self, arrangement: 'Arrangement_Context', sweep: np.ndarray, - start_trigger: Optional[str], step_time_s: float, - step_trigger: Optional[str], repetitions: Optional[int]): + def __init__( + self, + arrangement: "Arrangement_Context", + sweep: np.ndarray, + start_trigger: Optional[str], + step_time_s: float, + step_trigger: Optional[str], + repetitions: Optional[int], + ): self._arrangement = arrangement self._sweep = sweep self._step_trigger = step_trigger @@ -1627,8 +1673,7 @@ def actual_values_V(self, contact: str) -> np.ndarray: return self._sweep[:, index] def start(self) -> None: - """Start the 2D sweep - """ + """Start the 2D sweep""" self._ensure_qdac_setup() trigger = self._arrangement.get_trigger_by_name(self._start_trigger_name) self._arrangement._qdac.trigger(trigger) @@ -1654,10 +1699,9 @@ def _route_inner_trigger(self) -> None: # All channels change in sync, so just use the first channel to make the # external trigger. channel = self._get_channel(0) - channel.write_channel(f'sour{"{0}"}:dc:mark:sst ' - f'{_trigger_context_to_value(trigger)}') + channel.write_channel(f'sour{"{0}"}:dc:mark:sst ' f"{_trigger_context_to_value(trigger)}") - def _get_channel(self, contact_index: int) -> 'QDac2Channel': + def _get_channel(self, contact_index: int) -> "QDac2Channel": channel_number = self._arrangement._channels[contact_index] qdac = self._arrangement._qdac return qdac.channel(channel_number) @@ -1668,21 +1712,24 @@ def _send_lists_to_qdac(self) -> None: def _send_list_to_qdac(self, contact_index, voltages): channel = self._get_channel(contact_index) - dc_list = channel.dc_list(voltages=voltages, dwell_s=self._step_time_s, - repetitions=self._repetitions) + dc_list = channel.dc_list(voltages=voltages, dwell_s=self._step_time_s, repetitions=self._repetitions) trigger = self._arrangement.get_trigger_by_name(self._start_trigger_name) dc_list.start_on(trigger) def _make_ready_to_start(self): # Bug circumvention for contact_index in range(self._arrangement.shape): channel = self._get_channel(contact_index) - channel.write_channel('sour{0}:dc:init') + channel.write_channel("sour{0}:dc:init") class Arrangement_Context: - def __init__(self, qdac: 'QDac2', contacts: Dict[str, int], - output_triggers: Optional[Dict[str, int]], - internal_triggers: Optional[Sequence[str]]): + def __init__( + self, + qdac: "QDac2", + contacts: dict[str, int], + output_triggers: Optional[dict[str, int]], + internal_triggers: Optional[Sequence[str]], + ): self._qdac = qdac self._fix_contact_order(contacts) self._allocate_triggers(internal_triggers, output_triggers) @@ -1713,9 +1760,7 @@ def contact_names(self) -> Sequence[str]: """ return self._contact_names - def _allocate_internal_triggers(self, - internal_triggers: Optional[Sequence[str]] - ) -> None: + def _allocate_internal_triggers(self, internal_triggers: Optional[Sequence[str]]) -> None: if not internal_triggers: return for name in internal_triggers: @@ -1747,7 +1792,7 @@ def set_virtual_voltage(self, contact: str, voltage: float) -> None: raise ValueError(f'No contact named "{contact}"') self._effectuate_virtual_voltage(index, voltage) - def set_virtual_voltages(self, contacts_to_voltages: Dict[str, float]) -> None: + def set_virtual_voltages(self, contacts_to_voltages: dict[str, float]) -> None: """Set virtual voltages on specific contacts in one go The actual voltage that each contact will receive depends on the @@ -1791,7 +1836,7 @@ def add_correction(self, contact: str, factors: Sequence[float]) -> None: multiplier[index] = factors self._correction = np.matmul(multiplier, self._correction) - def _fix_contact_order(self, contacts: Dict[str, int]) -> None: + def _fix_contact_order(self, contacts: dict[str, int]) -> None: self._contact_names = list() self._contacts = dict() self._channels = list() @@ -1846,12 +1891,12 @@ def get_trigger_by_name(self, name: str) -> QDac2Trigger_Context: try: return self._internal_triggers[name] except KeyError: - print(f'Internal triggers: {list(self._internal_triggers.keys())}') + print(f"Internal triggers: {list(self._internal_triggers.keys())}") raise def _all_channels_as_suffix(self) -> str: channels_str = ints_to_comma_separated_list(self.channel_numbers) - return f'(@{channels_str})' + return f"(@{channels_str})" def currents_A(self, nplc: int = 1, current_range: str = "low") -> Sequence[float]: """Measure currents on all contacts @@ -1861,22 +1906,26 @@ def currents_A(self, nplc: int = 1, current_range: str = "low") -> Sequence[floa current_range (str, optional): Current range (default low) """ channels_suffix = self._all_channels_as_suffix() - self._qdac.write(f'sens:rang {current_range},{channels_suffix}') - self._qdac.write(f'sens:nplc {nplc},{channels_suffix}') + self._qdac.write(f"sens:rang {current_range},{channels_suffix}") + self._qdac.write(f"sens:nplc {nplc},{channels_suffix}") # Discard first reading because of possible output-capacitor effects, etc slowest_line_freq_Hz = 50 sleep_s(1 / slowest_line_freq_Hz) - self._qdac.ask(f'read? {channels_suffix}') + self._qdac.ask(f"read? {channels_suffix}") # Then make a proper reading sleep_s((nplc + 1) / slowest_line_freq_Hz) - currents = self._qdac.ask(f'read? {channels_suffix}') + currents = self._qdac.ask(f"read? {channels_suffix}") return comma_sequence_to_list_of_floats(currents) - def virtual_sweep(self, contact: str, voltages: Sequence[float], - start_sweep_trigger: Optional[str] = None, - step_time_s: float = 1e-5, - step_trigger: Optional[str] = None, - repetitions: int = 1) -> Virtual_Sweep_Context: + def virtual_sweep( + self, + contact: str, + voltages: Sequence[float], + start_sweep_trigger: Optional[str] = None, + step_time_s: float = 1e-5, + step_trigger: Optional[str] = None, + repetitions: int = 1, + ) -> Virtual_Sweep_Context: """Sweep a contact to create a 1D sweep Args: @@ -1892,11 +1941,9 @@ def virtual_sweep(self, contact: str, voltages: Sequence[float], Virtual_Sweep_Context: context manager """ sweep = self._calculate_1d_values(contact, voltages) - return Virtual_Sweep_Context(self, sweep, start_sweep_trigger, - step_time_s, step_trigger, repetitions) + return Virtual_Sweep_Context(self, sweep, start_sweep_trigger, step_time_s, step_trigger, repetitions) - def _calculate_1d_values(self, contact: str, voltages: Sequence[float] - ) -> np.ndarray: + def _calculate_1d_values(self, contact: str, voltages: Sequence[float]) -> np.ndarray: original_voltage = self.virtual_voltage(contact) index = self._contact_index(contact) sweep = list() @@ -1906,12 +1953,17 @@ def _calculate_1d_values(self, contact: str, voltages: Sequence[float] self._virtual_voltages[index] = original_voltage return np.array(sweep) - def virtual_sweep2d(self, inner_contact: str, inner_voltages: Sequence[float], - outer_contact: str, outer_voltages: Sequence[float], - start_sweep_trigger: Optional[str] = None, - inner_step_time_s: float = 1e-5, - inner_step_trigger: Optional[str] = None, - repetitions: int = 1) -> Virtual_Sweep_Context: + def virtual_sweep2d( + self, + inner_contact: str, + inner_voltages: Sequence[float], + outer_contact: str, + outer_voltages: Sequence[float], + start_sweep_trigger: Optional[str] = None, + inner_step_time_s: float = 1e-5, + inner_step_trigger: Optional[str] = None, + repetitions: int = 1, + ) -> Virtual_Sweep_Context: """Sweep two contacts to create a 2D sweep Args: @@ -1927,15 +1979,14 @@ def virtual_sweep2d(self, inner_contact: str, inner_voltages: Sequence[float], Returns: Virtual_Sweep_Context: context manager """ - sweep = self._calculate_2d_values(inner_contact, inner_voltages, - outer_contact, outer_voltages) - return Virtual_Sweep_Context(self, sweep, start_sweep_trigger, - inner_step_time_s, inner_step_trigger, repetitions) + sweep = self._calculate_2d_values(inner_contact, inner_voltages, outer_contact, outer_voltages) + return Virtual_Sweep_Context( + self, sweep, start_sweep_trigger, inner_step_time_s, inner_step_trigger, repetitions + ) - def _calculate_2d_values(self, inner_contact: str, - inner_voltages: Sequence[float], - outer_contact: str, - outer_voltages: Sequence[float]) -> np.ndarray: + def _calculate_2d_values( + self, inner_contact: str, inner_voltages: Sequence[float], outer_contact: str, outer_voltages: Sequence[float] + ) -> np.ndarray: original_fast_voltage = self.virtual_voltage(inner_contact) original_slow_voltage = self.virtual_voltage(outer_contact) outer_index = self._contact_index(outer_contact) @@ -1950,12 +2001,17 @@ def _calculate_2d_values(self, inner_contact: str, self._virtual_voltages[outer_index] = original_slow_voltage return np.array(sweep) - def virtual_detune(self, contacts: Sequence[str], start_V: Sequence[float], - end_V: Sequence[float], steps: int, - start_trigger: Optional[str] = None, - step_time_s: float = 1e-5, - step_trigger: Optional[str] = None, - repetitions: int = 1) -> Virtual_Sweep_Context: + def virtual_detune( + self, + contacts: Sequence[str], + start_V: Sequence[float], + end_V: Sequence[float], + steps: int, + start_trigger: Optional[str] = None, + step_time_s: float = 1e-5, + step_trigger: Optional[str] = None, + repetitions: int = 1, + ) -> Virtual_Sweep_Context: """Sweep any number of contacts linearly from one set of values to another set of values Args: @@ -1970,19 +2026,19 @@ def virtual_detune(self, contacts: Sequence[str], start_V: Sequence[float], """ self._check_same_lengths(contacts, start_V, end_V) sweep = self._calculate_detune_values(contacts, start_V, end_V, steps) - return Virtual_Sweep_Context(self, sweep, start_trigger, step_time_s, - step_trigger, repetitions) + return Virtual_Sweep_Context(self, sweep, start_trigger, step_time_s, step_trigger, repetitions) @staticmethod def _check_same_lengths(contacts, start_V, end_V) -> None: n_contacts = len(contacts) if n_contacts != len(start_V): - raise ValueError(f'There must be exactly one voltage per contact: {start_V}') + raise ValueError(f"There must be exactly one voltage per contact: {start_V}") if n_contacts != len(end_V): - raise ValueError(f'There must be exactly one voltage per contact: {end_V}') + raise ValueError(f"There must be exactly one voltage per contact: {end_V}") - def _calculate_detune_values(self, contacts: Sequence[str], start_V: Sequence[float], - end_V: Sequence[float], steps: int): + def _calculate_detune_values( + self, contacts: Sequence[str], start_V: Sequence[float], end_V: Sequence[float], steps: int + ): original_voltages = [self.virtual_voltage(contact) for contact in contacts] indices = [self._contact_index(contact) for contact in contacts] sweep = list() @@ -2009,14 +2065,14 @@ def leakage(self, modulation_V: float, nplc: int = 2) -> np.ndarray: Returns: ndarray: contact-to-contact resistance in Ohms """ - steady_state_A, currents_matrix = self._leakage_currents(modulation_V, nplc, 'low') - with np.errstate(divide='ignore'): + steady_state_A, currents_matrix = self._leakage_currents(modulation_V, nplc, "low") + with np.errstate(divide="ignore"): return np.abs(modulation_V / diff_matrix(steady_state_A, currents_matrix)) - def _leakage_currents(self, modulation_V: float, nplc: int, - current_range: str - ) -> Tuple[Sequence[float], Sequence[Sequence[float]]]: - steady_state_A = self.currents_A(nplc, 'low') + def _leakage_currents( + self, modulation_V: float, nplc: int, current_range: str + ) -> tuple[Sequence[float], Sequence[Sequence[float]]]: + steady_state_A = self.currents_A(nplc, "low") currents_matrix = list() for index, channel_nr in enumerate(self.channel_numbers): original_V = self._virtual_voltages[index] @@ -2029,16 +2085,14 @@ def _leakage_currents(self, modulation_V: float, nplc: int, def _contact_index(self, contact: str) -> int: return self._contacts[contact] - def _allocate_triggers(self, internal_triggers: Optional[Sequence[str]], - output_triggers: Optional[Dict[str, int]] - ) -> None: - self._internal_triggers: Dict[str, QDac2Trigger_Context] = dict() + def _allocate_triggers( + self, internal_triggers: Optional[Sequence[str]], output_triggers: Optional[dict[str, int]] + ) -> None: + self._internal_triggers: dict[str, QDac2Trigger_Context] = dict() self._allocate_internal_triggers(internal_triggers) self._allocate_external_triggers(output_triggers) - def _allocate_external_triggers(self, output_triggers: - Optional[Dict[str, int]] - ) -> None: + def _allocate_external_triggers(self, output_triggers: Optional[dict[str, int]]) -> None: self._external_triggers = dict() if not output_triggers: return @@ -2071,7 +2125,7 @@ def __init__(self, name: str, address: str, **kwargs) -> None: **kwargs: additional argument to the Visa driver """ self._check_instrument_name(name) - super().__init__(name, address, terminator='\n', **kwargs) + super().__init__(name, address, terminator="\n", **kwargs) self._set_up_serial() self._set_up_debug_settings() self._set_up_channels() @@ -2088,7 +2142,7 @@ def n_channels(self) -> int: Returns: int: Number of channels """ - return len(self.submodules['channels']) + return len(self.submodules["channels"]) def channel(self, ch: int) -> QDac2Channel: """ @@ -2098,7 +2152,7 @@ def channel(self, ch: int) -> QDac2Channel: Returns: QDac2Channel: Visa representation of the channel """ - return getattr(self, f'ch{ch:02}') + return getattr(self, f"ch{ch:02}") @staticmethod def n_triggers() -> int: @@ -2121,7 +2175,7 @@ def n_external_outputs(self) -> int: Returns: int: Number of external output triggers """ - return len(self.submodules['external_triggers']) + return len(self.submodules["external_triggers"]) def allocate_trigger(self) -> QDac2Trigger_Context: """Allocate an internal trigger @@ -2137,7 +2191,7 @@ def allocate_trigger(self) -> QDac2Trigger_Context: try: number = self._internal_triggers.pop() except KeyError: - raise ValueError('no free internal triggers') + raise ValueError("no free internal triggers") return QDac2Trigger_Context(self, number) def free_trigger(self, trigger: QDac2Trigger_Context) -> None: @@ -2158,9 +2212,7 @@ def free_all_triggers(self) -> None: """ self._set_up_internal_triggers() - def connect_external_trigger(self, port: int, trigger: QDac2Trigger_Context, - width_s: float = 1e-6 - ) -> None: + def connect_external_trigger(self, port: int, trigger: QDac2Trigger_Context, width_s: float = 1e-6) -> None: """Route internal trigger to external trigger Args: @@ -2169,11 +2221,11 @@ def connect_external_trigger(self, port: int, trigger: QDac2Trigger_Context, width_s (float, optional): Output trigger width in seconds (default 1ms) """ internal = _trigger_context_to_value(trigger) - self.write(f'outp:trig{port}:sour int{internal}') - self.write(f'outp:trig{port}:widt {width_s}') + self.write(f"outp:trig{port}:sour int{internal}") + self.write(f"outp:trig{port}:widt {width_s}") def reset(self) -> None: - self.write('*rst') + self.write("*rst") sleep_s(5) def errors(self) -> str: @@ -2182,7 +2234,7 @@ def errors(self) -> str: Returns: str: Comma separated list of errors or '0, "No error"' """ - return self.ask('syst:err:all?') + return self.ask("syst:err:all?") def error(self) -> str: """Retrieve next error @@ -2190,7 +2242,7 @@ def error(self) -> str: Returns: str: The next error or '0, "No error"' """ - return self.ask('syst:err?') + return self.ask("syst:err?") def n_errors(self) -> int: """Peek at number of previous errors @@ -2198,7 +2250,7 @@ def n_errors(self) -> int: Returns: int: Number of errors """ - return int(self.ask('syst:err:coun?')) + return int(self.ask("syst:err:coun?")) def start_all(self) -> None: """Trigger the global SCPI bus (``*TRG``) @@ -2206,14 +2258,14 @@ def start_all(self) -> None: All generators, that have not been explicitly set to trigger on an internal or external trigger, will be started. """ - self.write('*trg') + self.write("*trg") def remove_traces(self) -> None: """Delete all trace definitions from the instrument This means that all AWGs loose their data. """ - self.write('trac:rem:all') + self.write("trac:rem:all") def traces(self) -> Sequence[str]: """List all defined traces @@ -2221,7 +2273,7 @@ def traces(self) -> Sequence[str]: Returns: Sequence[str]: trace names """ - return comma_sequence_to_list(self.ask('trac:cat?')) + return comma_sequence_to_list(self.ask("trac:cat?")) def allocate_trace(self, name: str, size: int) -> Trace_Context: """Reserve memory for a new trace @@ -2240,14 +2292,15 @@ def mac(self) -> str: Returns: str: Media Access Control (MAC) address of the instrument """ - mac = self.ask('syst:comm:lan:mac?') - return f'{mac[1:3]}-{mac[3:5]}-{mac[5:7]}-{mac[7:9]}-{mac[9:11]}' \ - f'-{mac[11:13]}' + mac = self.ask("syst:comm:lan:mac?") + return f"{mac[1:3]}-{mac[3:5]}-{mac[5:7]}-{mac[7:9]}-{mac[9:11]}" f"-{mac[11:13]}" - def arrange(self, contacts: Dict[str, int], - output_triggers: Optional[Dict[str, int]] = None, - internal_triggers: Optional[Sequence[str]] = None - ) -> Arrangement_Context: + def arrange( + self, + contacts: dict[str, int], + output_triggers: Optional[dict[str, int]] = None, + internal_triggers: Optional[Sequence[str]] = None, + ) -> Arrangement_Context: """An arrangement of contacts and triggers for virtual gates Each contact corresponds to a particular output channel. Each @@ -2271,8 +2324,7 @@ def arrange(self, contacts: Dict[str, int], Returns: Arrangement_Context: context manager """ - return Arrangement_Context(self, contacts, output_triggers, - internal_triggers) + return Arrangement_Context(self, contacts, output_triggers, internal_triggers) # ----------------------------------------------------------------------- # Instrument-wide functions @@ -2287,10 +2339,10 @@ def start_recording_scpi(self) -> None: Any previous recordings are removed. To inspect the SCPI commands sent to the instrument, call get_recorded_scpi_commands(). """ - self._scpi_sent: List[str] = list() + self._scpi_sent: list[str] = list() self._record_commands = True - def get_recorded_scpi_commands(self) -> List[str]: + def get_recorded_scpi_commands(self) -> list[str]: """ Returns: Sequence[str]: SCPI commands sent to the instrument @@ -2300,8 +2352,7 @@ def get_recorded_scpi_commands(self) -> List[str]: return commands def clear(self) -> None: - """Reset the VISA message queue of the instrument - """ + """Reset the VISA message queue of the instrument""" self.visa_handle.clear() def clear_read_queue(self) -> Sequence[str]: @@ -2361,12 +2412,12 @@ def write_floats(self, cmd: str, values: Sequence[float]) -> None: Remember to include separating space in command if needed. """ if self._no_binary_values: - compiled = f'{cmd}{floats_to_comma_separated_list(values)}' + compiled = f"{cmd}{floats_to_comma_separated_list(values)}" if self._record_commands: self._scpi_sent.append(compiled) return super().write(compiled) if self._record_commands: - self._scpi_sent.append(f'{cmd}{floats_to_comma_separated_list(values)}') + self._scpi_sent.append(f"{cmd}{floats_to_comma_separated_list(values)}") self.visa_handle.write_binary_values(cmd, values) # ----------------------------------------------------------------------- @@ -2383,40 +2434,36 @@ def _set_up_serial(self) -> None: self.visa_handle.baud_rate = 921600 # type: ignore def _check_for_wrong_model(self) -> None: - model = self.IDN()['model'] - if model != 'QDAC-II': - raise ValueError(f'Unknown model {model}. Are you using the right' - ' driver for your instrument?') + model = self.IDN()["model"] + if model != "QDAC-II": + raise ValueError(f"Unknown model {model}. Are you using the right" " driver for your instrument?") def _check_for_incompatiable_firmware(self) -> None: # Only compare the firmware, not the FPGA version - firmware = split_version_string_into_components(self.IDN()['firmware'])[1] - least_compatible_fw = '0.17.5' + firmware = split_version_string_into_components(self.IDN()["firmware"])[1] + least_compatible_fw = "0.17.5" if parse(firmware) < parse(least_compatible_fw): - raise ValueError(f'Incompatible firmware {firmware}. You need at ' - f'least {least_compatible_fw}') + raise ValueError(f"Incompatible firmware {firmware}. You need at " f"least {least_compatible_fw}") def _set_up_channels(self) -> None: - channels = ChannelList(self, 'Channels', QDac2Channel, - snapshotable=False) + channels = ChannelList(self, "Channels", QDac2Channel, snapshotable=False) for i in range(1, 24 + 1): - name = f'ch{i:02}' + name = f"ch{i:02}" channel = QDac2Channel(self, name, i) self.add_submodule(name, channel) channels.append(channel) channels.lock() - self.add_submodule('channels', channels) + self.add_submodule("channels", channels) def _set_up_external_triggers(self) -> None: - triggers = ChannelList(self, 'Channels', QDac2ExternalTrigger, - snapshotable=False) + triggers = ChannelList(self, "Channels", QDac2ExternalTrigger, snapshotable=False) for i in range(1, 5 + 1): - name = f'ext{i}' + name = f"ext{i}" trigger = QDac2ExternalTrigger(self, name, i) self.add_submodule(name, trigger) triggers.append(trigger) triggers.lock() - self.add_submodule('external_triggers', triggers) + self.add_submodule("external_triggers", triggers) def _set_up_internal_triggers(self) -> None: # A set of the available internal triggers @@ -2424,18 +2471,19 @@ def _set_up_internal_triggers(self) -> None: def _set_up_manual_triggers(self) -> None: self.add_parameter( - name='trigger', + name="trigger", # Manually trigger event set_parser=_trigger_context_to_value, - set_cmd='tint {}', + set_cmd="tint {}", ) def _set_up_simple_functions(self) -> None: - self.add_function('abort', call_cmd='abor') + self.add_function("abort", call_cmd="abor") def _check_instrument_name(self, name: str) -> None: if name.isidentifier(): return raise ValueError( f'Instrument name "{name}" is incompatible with QCoDeS parameter ' - 'generation (no spaces, punctuation, prepended numbers, etc)') + "generation (no spaces, punctuation, prepended numbers, etc)" + ) diff --git a/src/qumada/instrument/custom_drivers/ZI/MFLI.py b/src/qumada/instrument/custom_drivers/ZI/MFLI.py index ea5b515..523f1d8 100644 --- a/src/qumada/instrument/custom_drivers/ZI/MFLI.py +++ b/src/qumada/instrument/custom_drivers/ZI/MFLI.py @@ -182,13 +182,13 @@ def __init__( set_cmd=lambda x: self.instr.sigouts[0].range(x), docstring="Range of the output", ) - + self.add_parameter( - name = "output_enabled", - label = "Output Enabled", - get_cmd = lambda: self.instr.sigouts[0].on(), - set_cmd = lambda x: self.instr.sigouts[0].on(x), - docstring = "Turns Output1 on or off" + name="output_enabled", + label="Output Enabled", + get_cmd=lambda: self.instr.sigouts[0].on(), + set_cmd=lambda x: self.instr.sigouts[0].on(x), + docstring="Turns Output1 on or off", ) self.add_parameter( @@ -237,8 +237,8 @@ def __init__( ) def get_idn(self): - #TODO: Implement this function! - # /dev..../features/devtype + # TODO: Implement this function! + # /dev..../features/devtype # /dev..../features/serial # /dev..../features/options - return {} \ No newline at end of file + return {} diff --git a/src/qumada/instrument/mapping/QDevil/qdac2.py b/src/qumada/instrument/mapping/QDevil/qdac2.py index 4dfc8e8..7e911fe 100644 --- a/src/qumada/instrument/mapping/QDevil/qdac2.py +++ b/src/qumada/instrument/mapping/QDevil/qdac2.py @@ -22,8 +22,8 @@ import logging from qcodes.parameters import Parameter -from qumada.instrument.custom_drivers.QDevil.QDAC2 import QDac2 +from qumada.instrument.custom_drivers.QDevil.QDAC2 import QDac2 from qumada.instrument.mapping import QDAC2_MAPPING from qumada.instrument.mapping.base import InstrumentMapping @@ -160,7 +160,8 @@ def pulse( channel.dc_list( voltages=points, dwell_s=delay, - ) for channel, points in zip(channels, setpoints) + ) + for channel, points in zip(channels, setpoints) ] if sync_trigger is not None: @@ -186,10 +187,10 @@ def clean_generators(self): for dc_list in self.dc_lists: dc_list.abort() self.dc_lists = [] - + @staticmethod def query_instrument(parameters: list[Parameter]): - """ Check if all parameters are from the same instrument """ + """Check if all parameters are from the same instrument""" instruments = {parameter.root_instrument for parameter in parameters} if len(instruments) > 1: raise Exception( @@ -199,4 +200,3 @@ def query_instrument(parameters: list[Parameter]): qdac: QDac2 = instruments.pop() assert isinstance(qdac, QDac2) return qdac - \ No newline at end of file diff --git a/src/qumada/measurement/device_object.py b/src/qumada/measurement/device_object.py index 676ec05..2505941 100644 --- a/src/qumada/measurement/device_object.py +++ b/src/qumada/measurement/device_object.py @@ -15,16 +15,16 @@ from qumada.instrument.buffers.buffer import map_triggers from qumada.instrument.mapping import map_terminals_gui from qumada.measurement.scripts import ( + Generic_1D_Hysteresis_buffered, + Generic_1D_parallel_asymm_Sweep, Generic_1D_Sweep, Generic_1D_Sweep_buffered, - Generic_1D_parallel_asymm_Sweep, - Generic_1D_Hysteresis_buffered, Generic_2D_Sweep_buffered, Generic_nD_Sweep, - Timetrace, - Timetrace_buffered, Generic_Pulsed_Measurement, Generic_Pulsed_Repeated_Measurement, + Timetrace, + Timetrace_buffered, ) from qumada.utils.ramp_parameter import ramp_or_set_parameter @@ -251,7 +251,7 @@ def timetrace( metadata=None, station=None, buffered=False, - buffer_settings: dict|None = None, + buffer_settings: dict | None = None, priorize_stored_value=False, ): """ """ @@ -304,7 +304,7 @@ def sweep_2D( metadata=None, station=None, buffered=False, - buffer_settings: dict|None = None, + buffer_settings: dict | None = None, priorize_stored_value=False, restore_state=True, ): @@ -330,7 +330,7 @@ def sweep_2D( fast_param.value - fast_param_range / 2.0, fast_param.value + fast_param_range / 2.0, fast_num_points ) if buffer_settings is None: - buffer_settings = self.buffer_settings + buffer_settings = self.buffer_settings temp_buffer_settings = deepcopy(buffer_settings) if buffered is True: if "num_points" in temp_buffer_settings.keys(): @@ -370,17 +370,18 @@ def sweep_2D( del self.states["_temp_2D"] return data - def sweep_parallel(self, - params: list[Parameter], - setpoints: list[list[float]]|None = None, - target_values: list[float]|None = None, - num_points: int = 100, - name = None, - metadata = None, - station = None, - priorize_stored_value = False, - **kwargs - ): + def sweep_parallel( + self, + params: list[Parameter], + setpoints: list[list[float]] | None = None, + target_values: list[float] | None = None, + num_points: int = 100, + name=None, + metadata=None, + station=None, + priorize_stored_value=False, + **kwargs, + ): """ Sweep multiple parameters in parallel. Provide either setpoints or target_values. Setpoints have to have the same length for all parameters. @@ -395,15 +396,14 @@ def sweep_parallel(self, if not isinstance(station, Station): raise TypeError("No valid station assigned!") if setpoints is None and target_values is None: - raise(Exception("Either setpoints or target_values have to be provided!")) + raise (Exception("Either setpoints or target_values have to be provided!")) if target_values is not None and setpoints is not None: - raise(Exception("Either setpoints or target_values have to be provided, not both!")) + raise (Exception("Either setpoints or target_values have to be provided, not both!")) if setpoints is None: assert len(params) == len(target_values) setpoints = [np.linspace(param(), target, num_points) for param, target in zip(params, target_values)] assert len(params) == len(setpoints) assert all([len(setpoint) == len(setpoints[0]) for setpoint in setpoints]) - for terminal in self.terminals.values(): for parameter in terminal.terminal_parameters.values(): @@ -414,36 +414,34 @@ def sweep_parallel(self, parameter.setpoints = setpoints[params.index(parameter)] script = Generic_1D_parallel_asymm_Sweep() script.setup( - self.save_to_dict(priorize_stored_value=priorize_stored_value), - metadata=metadata, - name=name, - **kwargs + self.save_to_dict(priorize_stored_value=priorize_stored_value), metadata=metadata, name=name, **kwargs ) mapping = self.instrument_parameters map_terminals_gui(station.components, script.gate_parameters, mapping) data = script.run() return data - def pulsed_measurement(self, - params: list[Parameter], - setpoints: list[list[float]], - repetitions: int = 1, - name = None, - metadata = None, - station = None, - buffer_settings: dict|None = None, - priorize_stored_value = False, - **kwargs, - ): + def pulsed_measurement( + self, + params: list[Parameter], + setpoints: list[list[float]], + repetitions: int = 1, + name=None, + metadata=None, + station=None, + buffer_settings: dict | None = None, + priorize_stored_value=False, + **kwargs, + ): if station is None: station = self.station if not isinstance(station, Station): raise TypeError("No valid station assigned!") assert len(params) == len(setpoints) assert all([len(setpoint) == len(setpoints[0]) for setpoint in setpoints]) - assert repetitions >= 1 + assert repetitions >= 1 if buffer_settings is None: - buffer_settings = self.buffer_settings + buffer_settings = self.buffer_settings temp_buffer_settings = deepcopy(buffer_settings) if "num_points" in temp_buffer_settings.keys(): @@ -470,7 +468,7 @@ def pulsed_measurement(self, script.setup( self.save_to_dict(priorize_stored_value=priorize_stored_value), metadata=metadata, - measurement_name=name, # achtung geändert! + measurement_name=name, # achtung geändert! repetitions=repetitions, buffer_settings=temp_buffer_settings, **self.buffer_script_setup, @@ -482,7 +480,7 @@ def pulsed_measurement(self, data = script.run() return data - + def create_hook(func, hook): """ Decorator to hook a function onto an existing function. @@ -753,7 +751,7 @@ def measured_ramp( metadata=None, backsweep=False, buffered=False, - buffer_settings: dict|None = None, + buffer_settings: dict | None = None, priorize_stored_value=False, ): if station is None: @@ -800,7 +798,7 @@ def measured_ramp( self._parent_device.save_to_dict(priorize_stored_value=priorize_stored_value), metadata=metadata, name=name, - iterations = 1, + iterations=1, buffer_settings=temp_buffer_settings, **self._parent_device.buffer_script_setup, ) @@ -856,7 +854,7 @@ def __call__(self, value=None, ramp=None): return self.value else: if ramp is True: - self.ramp(value) + self.ramp(value) else: self.value = value diff --git a/src/qumada/measurement/measurement.py b/src/qumada/measurement/measurement.py index 21e8c8a..1553e7a 100644 --- a/src/qumada/measurement/measurement.py +++ b/src/qumada/measurement/measurement.py @@ -179,7 +179,7 @@ def setup( *, add_script_to_metadata: bool = True, add_parameters_to_metadata: bool = True, - buffer_settings: dict|None = None, + buffer_settings: dict | None = None, measurement_name: str | None = None, **settings: dict, ) -> None: @@ -631,9 +631,10 @@ def initialize(self, dyn_ramp_to_val=False, inactive_dyn_channels: list | None = if self.properties[gate][parameter]["type"].find("comp") >= 0: try: for i in range(len(self.compensating_parameters)): - if self.compensating_parameters[i]["gate"] == gate and self.compensating_parameters[i][ - "parameter" - ] == parameter: + if ( + self.compensating_parameters[i]["gate"] == gate + and self.compensating_parameters[i]["parameter"] == parameter + ): break i = self.compensating_parameters.index({"gate": gate, "parameter": parameter}) leverarms = self.compensating_leverarms[i] @@ -652,8 +653,10 @@ def initialize(self, dyn_ramp_to_val=False, inactive_dyn_channels: list | None = # Get only the relevant list entries for the current parameter try: for i in range(len(self.dynamic_parameters)): - if self.dynamic_parameters[i]["gate"] == comped_param["gate"] and self.dynamic_parameters[i][ - "parameter"] == comped_param["parameter"]: + if ( + self.dynamic_parameters[i]["gate"] == comped_param["gate"] + and self.dynamic_parameters[i]["parameter"] == comped_param["parameter"] + ): comped_index = i break # comped_index = self.dynamic_parameters.index(comped_param) diff --git a/src/qumada/measurement/shuttling_scripts.py b/src/qumada/measurement/shuttling_scripts.py index e6a3db2..a12dec0 100644 --- a/src/qumada/measurement/shuttling_scripts.py +++ b/src/qumada/measurement/shuttling_scripts.py @@ -6,7 +6,7 @@ def shuttle_setpoints(offsets, amplitudes, num_periods, phases, sampling_rate, b def shuttling_sin(x, offset, amplitude, phase, reverse = False): if reverse: sign = -1 - else: + else: sign = 1 return amplitude*np.sin(sign*x+phase)+offset for i in range(0, 4): @@ -16,7 +16,7 @@ def shuttling_sin(x, offset, amplitude, phase, reverse = False): ) for i in range(5, 7): pulses.append([barrier_voltages[i-5] for _ in range(int(sampling_rate*duration))]) - return pulses + return pulses def ramping_setpoints(starts, stops, duration, sampling_rate): setpoints = [] @@ -229,5 +229,3 @@ def run(self): datasets.append(datasaver.dataset) self.clean_up() return datasets - - diff --git a/src/tests/instrument_test.py b/src/tests/instrument_test.py index 0f70356..cca5c6b 100644 --- a/src/tests/instrument_test.py +++ b/src/tests/instrument_test.py @@ -21,8 +21,8 @@ # pylint: disable=missing-function-docstring import pytest from qcodes.instrument import Instrument, VisaInstrument -from qcodes.parameters import Parameter from qcodes.instrument_drivers.mock_instruments import DummyInstrument +from qcodes.parameters import Parameter from qumada.instrument.instrument import is_instrument_class