# Source code for jii_multispeq.measurement.measure

"""
Take measurements using the MultispeQ device connected to 
a serial port. The returned data can be analyzed and viewed.
"""

import datetime
import time
import json
import re
import warnings
import hashlib
import os
import pandas as pd

from jii_multispeq.constants import REGEX_RETURN_END
from jii_multispeq.measurement.sanitize import sanitize

from tabulate import tabulate

from jii_multispeq.measurement.checksum import strip_crc32

def measure(connection=None, protocol=[{}], filename='auto', notes="", directory="./local/"):
    """
    Take a measurement using a MultispeQ connected via a serial connection (USB or Bluetooth).

    The protocol is written to the device, the response is read until the
    end-of-response pattern (``REGEX_RETURN_END``) is seen, and the parsed
    response is enriched with name, notes, creation date, protocol and MD5
    checksums. Unless ``filename`` is None, the result is also written as a
    JSON file into ``directory``; an existing file is never overwritten,
    instead " - <n>" is attached to the file name.

    :param connection: Connection to the MultispeQ.
    :type connection: serial
    :param protocol: Measurement Protocol
    :type protocol: str, dict or list
    :param filename: Name for saved measurement file. If set to None, no file is saved. Default name is current date and time.
    :type filename: str or None
    :param notes: Notes for the measurement
    :type notes: str
    :param directory: Directory the measurement is saved in. Default directory is "local".
    :type directory: str

    :return: Tuple of the measurement data and the checksum value returned by ``strip_crc32``.
             If the device response cannot be parsed as JSON, the first element is
             ``{'Error': <exception>}`` and nothing is saved.
    :rtype: tuple

    :raises ValueError: if no connection is defined
    :raises ValueError: if protocol is not encoded as a string, dictionary or list
    :raises ValueError: if notes are not provided as a string
    :raises ValueError: if directory is not provided as a string
    :raises ValueError: if filename is neither a string nor None
    :raises Exception: if connection is not open or device connected
    """
    # Local import so this function does not depend on the module's import block.
    import codecs

    # NOTE: the list default is shared between calls, but it is only ever
    # serialized (never mutated), so it is kept for interface compatibility.
    start = datetime.datetime.now()

    if connection is None:
        raise ValueError("A connection for the MultispeQ needs to be defined")

    if not isinstance(protocol, (dict, str, list)):
        raise ValueError("Provided protocol needs to be a string or dictionary")

    if not isinstance(notes, str):
        raise ValueError("Provided notes have to be a string")

    if not isinstance(directory, str):
        raise ValueError("Provided directory has to be a string")

    # Validate up front: a bad filename would otherwise only fail when the
    # file is written, i.e. after the measurement has already been taken.
    if filename is not None and not isinstance(filename, str):
        raise ValueError("Provided filename has to be a string or None")

    if filename == 'auto':
        filename = start.strftime("%Y-%m-%d_%H%M")

    # Check if the connection is open
    if not connection.is_open:
        raise Exception("Connection not open, connect device to port first")

    # Check if the protocol is a dictionary or list and stringify
    if isinstance(protocol, (dict, list)):
        protocol = json.dumps(protocol, separators=(',', ':'))

    # Flush input buffer
    connection.reset_input_buffer()

    # Write the protocol to the Instrument
    connection.write(protocol.encode())

    # Send linebreak to start protocol
    connection.write("\r\n".encode())

    # Ensure data is actually sent before reading
    connection.flush()

    # Data string
    data = ""

    # An incremental decoder keeps the bytes of a multi-byte character that
    # is split across two reads, instead of failing on the partial sequence.
    decoder = codecs.getincrementaldecoder("utf-8")()

    # Regular expression to test for the end of the device response
    prog = re.compile(REGEX_RETURN_END, re.M | re.I)

    # Read port
    while True:
        # Check if data is in the buffer
        if connection.in_waiting > 0:
            # Read bytes in buffer
            data += decoder.decode(connection.read(connection.in_waiting))

            # Stop reading when the end of the response was received
            if prog.search(data):
                break

        # Small delay to prevent excessive CPU usage
        time.sleep(0.01)

    # Split checksum and data
    data, crc32 = strip_crc32(data)

    # Sanitize strings
    json_str = sanitize(data)

    try:
        data = json.loads(json_str)
    except json.decoder.JSONDecodeError as e:
        # warnings.warn expects text (or a Warning instance), not an arbitrary exception
        warnings.warn(str(e))
        return {'Error': e}, crc32

    # Show a warning for battery below 25%
    if 'device_battery' in data and data['device_battery'] < 25:
        warnings.warn('Device battery low! Currently at %s%%, recharge soon.' % data['device_battery'])

    # Add filename
    if filename is not None:
        data['name'] = filename

    # Add Notes
    data['notes'] = notes

    # Add Date
    data['created_at'] = start.astimezone().isoformat()

    # Add Protocol; a string protocol that is not valid JSON is stored as
    # provided instead of discarding the measurement that was just taken.
    try:
        data['protocol'] = json.loads(protocol)
    except json.decoder.JSONDecodeError:
        data['protocol'] = protocol

    # Calculate Checksums
    data['md5_protocol'] = hashlib.md5(protocol.encode()).hexdigest()
    data['md5_measurement'] = hashlib.md5(json_str.encode()).hexdigest()

    # Save file with measurements in notebook format
    if filename is not None:

        os.makedirs(directory, exist_ok=True)

        path = os.path.join(directory, (filename + '.json'))

        # Check if filename already exists. If true, " - #" is attached
        i = 1
        while os.path.exists(path):
            path = os.path.join(directory, '%s - %s.json' % (filename, i))
            i += 1

        # Write to disk
        with open(path, 'w', encoding='utf-8') as fp:
            json.dump(data, fp, indent=2)

    return data, crc32
def analyze(data=None, fn=None):
    """
    Pipe data from the MultispeQ measurement to an analysis function.
    This can be done manually, but this helps with the convoluted
    structure of the measurement output.

    What is passed to ``fn`` depends on the structure of ``data``:

    * ``data['sample'][0]`` if ``sample`` is a non-empty list,
    * ``data['sample']`` if ``sample`` exists but is not a non-empty list,
    * ``data`` itself if there is no ``sample`` key.

    Afterwards the general measurement and device information found in
    ``data`` (name, notes, checksums, device details, ...) is copied into
    the output of ``fn``.

    :param data: MultispeQ data output
    :type data: dict
    :param fn: Function to analyze the provided data. If None, ``data`` is returned unchanged.
    :type fn: function

    :return: Analyzed data output, returns None if the analysis function fails
    :rtype: dict

    :raises ValueError: if no data is provided
    :raises ValueError: if data is not a dictionary
    :raises Exception: if fn is not a function
    """
    if data is None:
        raise ValueError("No data is provided")

    if not isinstance(data, dict):
        raise ValueError("The provided data is not a dictionary")

    if fn is None:
        return data

    if not callable(fn):
        raise Exception("No function is provided")

    # Select what is handed to the analysis function
    if 'sample' in data:
        sample = data['sample']
        if isinstance(sample, list) and len(sample) > 0:
            # Standard format, the first sample is analyzed
            payload = sample[0]
        else:
            # Perhaps some scrambled format, so sample is sent to the function
            payload = sample
    else:
        # Probably unknown data source, so it just gets passed to the function
        payload = data

    try:
        output = fn(payload)
    except Exception as e:
        # Best effort: a failing analysis function is reported as a warning
        warnings.warn(str(e))
        return None

    # Nothing to attach the measurement information to
    if output is None:
        return None

    # Now that we have the output, the rest can be added
    keys = ['name', 'notes', 'created_at', 'protocol', 'md5_protocol', 'md5_measurement']

    # Low level data information
    keys += ['device_name', 'device_version', 'device_id', 'device_battery', 'device_firmware']

    for key in keys:
        if key in data:
            output[key] = data[key]

    return output
def view(data=None):
    """
    Print a table with the parameters of a single measurement.

    One row is printed per key, sorted case-insensitively, showing the
    key, its value and the type of the value. Only strings, numbers and
    None are shown as text; any other value (list, dictionary, ...) is
    displayed as 'n/a'.

    :param data: MultispeQ data output
    :type data: dict

    :return: None
    :rtype: NoneType

    :raises ValueError: if no data is provided
    :raises ValueError: if data is not a dictionary
    """
    if data is None:
        raise ValueError("No data provided")

    if not isinstance(data, dict):
        raise ValueError("Provided data needs to be a dictionary")

    rows = []
    for name in sorted(data, key=str.lower):
        entry = data[name]
        # Only simple values are displayed, everything else is masked
        printable = entry is None or isinstance(entry, (str, float, int))
        rows.append([name, str(entry) if printable else 'n/a', type(entry)])

    print(tabulate(rows, headers=['Parameter', 'Value', 'Type']))

    return None
def to_df(data=None, append=None):
    """
    Adding data to dataframe. A new dataframe can be created or
    the data can be appended to an existing one.

    A dictionary that pandas cannot interpret as columns (for example a
    single measurement consisting only of scalar values) is added as one
    row instead.

    :param data: MultispeQ data output
    :type data: dict or list[dict]
    :param append: Existing dataframe the data is appended to. Anything that is not a dataframe is ignored.
    :type append: pandas.DataFrame

    :return: Dataframe with the provided data. If a dataframe was given as ``append``, its rows come first and the index is renumbered.
    :rtype: pandas.DataFrame

    :raises ValueError: if no data is provided
    :raises ValueError: if data is not a dictionary or list
    """
    if data is None:
        raise ValueError("No data provided")

    if not isinstance(data, (dict, list)):
        raise ValueError("Provided data needs to be a dictionary or list of dictionarys")

    try:
        df = pd.DataFrame(data)
    except ValueError:
        # pandas rejects dictionaries it cannot align as columns (all scalar
        # values, lists of different lengths). Lists are passed on unchanged.
        if not isinstance(data, dict):
            raise
        df = pd.DataFrame([data])

    if isinstance(append, pd.DataFrame):
        df = pd.concat([append, df], ignore_index=True)

    return df