Source code for jii_multispeq.device.command

"""
Commands are raw communication with the MultispeQ device.
"""

import json
import re
import time
import warnings
from tabulate import tabulate
from jii_multispeq.measurement.checksum import strip_crc32
from jii_multispeq.measurement.sanitize import sanitize
from jii_multispeq.constants import REGEX_RETURN_END, REGEX_RETURN_END_CLI

[docs] def is_connected ( connection=None ): """ Test the connection to a MultispeQ. :param connection: Connection to the MultispeQ. :type connection: serial :return: True if a MultispeQ is connected, otherwise False :rtype: bool :raises ValueError: if no connection is defined """ if connection is None: raise ValueError("A connection for the MultispeQ needs to be defined") # Set timeout to 0.01 connection.timeout = .01 if not connection.is_open: return False # Send handshake phrase data = send_command( connection, "hello" ) connection.timeout = None # Test if phrase MultispeQ Ready was received if data.strip() == 'MultispeQ Ready': print("MultispeQ found") return True return False
[docs] def info ( connection=None, verbose=True, include_config=False ): """ Get the MultispeQ instrument information. :param connection: Connection to the MultispeQ. :type connection: serial :param verbose: Print out data (default: True) :type verbose: bool :param include_config: Include device config in printed information (default: False) :type include_config: bool :return: Instrument Information :rtype: dict :raises ValueError: if verbose is not a boolean :raises ValueError: if baudrate is not a boolean """ if not isinstance(verbose, bool): raise ValueError("Input for verbose needs to be True or False") if not isinstance(include_config, bool): raise ValueError("Input for include_config needs to be True or False") # Send command data = send_command( connection, "1007", False) # Sanitize quotes data = sanitize( data ) try: data = json.loads(data) pass except json.JSONDecodeError as e: print(e) pass # Display Information if verbose and isinstance(data, dict): name = "## %s (%s) " % (data["device_name"], data["device_version"]) print( name ) print( "-" * (len(name)-1) ) tab = [ ['ID', data["device_id"]], ['Firmware', data["device_firmware"]], ['Battery [%]', data["device_battery"]] ] out = tabulate( tab , tablefmt="plain") print(out) print("\n## Settings") out = tabulate( data["settings"].items(), tablefmt="simple" ) print(out) if include_config: print("\n## Configuration") out = tabulate( data["configuration"].items(), tablefmt="simple") print(out) # Reset timeout connection.timeout = None return data
[docs] def get_memory ( connection=None, verbose=False ): """ Get the MultispeQ setting saved in its memory (EEPROM). :param connection: Connection to the MultispeQ. :type connection: serial :param verbose: Print out data (default: False) :type verbose: bool :return: Instrument memory :rtype: dict """ # Send command data = send_command( connection, "print_memory", False) # Sanitize quotes data = sanitize( data ) try: data = json.loads(data) pass except json.JSONDecodeError as e: print(e) pass # Display Information if verbose: output = tabulate( data.items() , headers=['Parameter', 'Value']) print( output ) # Reset timeout connection.timeout = None return data
[docs] def send_command ( connection=None, command="", verbose=False, is_silent=False ): """ Send a command to a MultispeQ device. :param connection: Connection to the MultispeQ. :type connection: serial :param command: Command :type command: str :param verbose: Print output (default: False) :type verbose: bool :param is_silent: Command will not return a response (default: False) :type is_silent: bool :return: Instrument output :rtype: str :raises ValueError: if no connection is defined :raises ValueError: if command is not provided as a string :raises Exception: if connection is not open or device connected """ if connection is None: raise ValueError("A connection for the MultispeQ needs to be defined") if not isinstance(command, str): raise ValueError("Provided command needs to be a string") if not isinstance(verbose, bool): raise ValueError("Input for verbose needs to be True or False") if not isinstance(verbose, bool): raise ValueError("Input for is_silent needs to be True or False") # Check if the connection is open if not connection.is_open: raise Exception("Connection not open, connect device or port locked by other application") # Set timeout to 0.01 connection.timeout = .01 # Check if it is a known command prog = re.compile( r"|".join(CMDS) ) if not prog.search( command ): warnings.warn("Unknown command, device or script might get stuck.") prog_cli = re.compile( REGEX_RETURN_END_CLI, re.MULTILINE ) prog = re.compile( REGEX_RETURN_END ) # Flush input buffer connection.reset_input_buffer() # Send command connection.write( command.encode() ) # Send linebreak to start command connection.write( "\r\n".encode() ) # Ensure data is actually sent before reading connection.flush() # Read port data = "" chunk = "" if is_silent: return data while True: if connection.in_waiting > 0: chunk = connection.read(connection.in_waiting).decode() data += chunk if verbose is True: print(chunk) # Stop reading when linebreak received if prog.search(chunk) or prog_cli.search(chunk): # Brief wait to ensure no more data is coming time.sleep(0.05) # Final check - if no more data waiting, we're done if connection.in_waiting == 0: break # Reset timeout connection.timeout = None return data
## List of commands CMDS = [ "1000", "1007", "1053", "battery", "compiled", "configure_bluetooth", "calibrate_magnetometer", "device_info", "digital_write", "expr", "flow_calibration_set_point", "flow_calibration_setting", "flow_calibration_value", "flow_off", "flow_v", "get_flow", "hall", "hello", "indicate", "indicate_off", "light", "memory", "on_5v", "p2p", "par_led", "print_all", "print_date", "print_magnetometer", "print_magnetometer_bias", "print_memory","pulse", "readonce", "reboot", "reset", "reset_flow_calibration", "reset_flow_zero_point", "scan_i2c", "set_accelerometer", "set_accelerometer_bias", "set_colorcal1", "set_colorcal2", "set_colorcal3", "set_colorcal_blanks", "set_cp", "set_dac", "set_date", "set_default_flow_rate", "set_detector1_offset", "set_detector2_offset", "set_detector3_offset", "set_detector4_offset", "set_device_info", "set_energy_save_time", "set_flow", "set_led_par", "set_magnetometer", "set_magnetometer_bias", "set_op", "set_open_closed_positions", "set_par", "set_serial", "set_shutdown_time", "set_thickness", "set_thickness_quick", "set_user_defined", "single_pulse", "sleep", "spm", "start_watchdog", "stop_watchdog", "tcs_length", "temp", "testmode", "upgrade", "usb", "usb_on", "xRb" ]