"""
Take measurements using the MultispeQ device connected to
a serial port. The returned data can be analyzed and viewed.
"""
import datetime
import time
import json
import re
import warnings
import hashlib
import os
import pandas as pd
try:
from ipywidgets import IntProgress
from IPython.display import display
import time
except ImportError as e:
pass # module doesn't exist, deal with it.
from jii_multispeq.constants import REGEX_RETURN_END, REGEX_RETURN_USER_INPUT
from jii_multispeq.measurement.sanitize import sanitize
from tabulate import tabulate
from jii_multispeq.measurement.checksum import strip_crc32
[docs]
def measure ( connection=None, protocol=[{}], filename='auto', notes="", directory="./local/", progressbar=True ):
"""
Take a measurement using a MultispeQ connected via a serial connection (USB or Bluetooth).
:param connection: Connection to the MultispeQ.
:type connection: serial
:param protocol: Measurement Protocol
:type protocol: str, dict or list
:param filename: Name for saved measurement file. If set to None, no file is saved, Default name is current date and time.
:type filename: str
:param notes: Notes for the measurement
:type notes: str
:param directory: Directory the measurement is saved in. Default directory is "local".
:type directory: str
:param progressbar: Display progress bar. Default setting is True.
:type directory: bool
:return: The MultispeQ data is returned on success, otherwise None.
:rtype: str
:raises ValueError: if no connection is defined
:raises ValueError: if no protocol for the MultispeQ is provided
:raises ValueError: if protocol is not encoded as a string or dictionary
:raises ValueError: if notes are not provided as a string
:raises ValueError: if directory is not provided as a string
:raises Exception: if connection is not open or device connected
"""
start = datetime.datetime.now()
if connection is None:
raise ValueError("A connection for the MultispeQ needs to be defined")
if not isinstance(protocol, (dict, str, list)):
raise ValueError("Provided protocol needs to be a string or dictionary")
if not isinstance(notes, str):
raise ValueError("Provided notes have to be a string")
if not isinstance(directory, str):
raise ValueError("Provided directory has to be a string")
if filename == 'auto':
filename = start.strftime("%Y-%m-%d_%H%M")
# Check if the connection is open
if not connection.is_open:
raise Exception("Connection not open, connect device to port first")
# Check if the protocol is a dictionary and stringify
if isinstance( protocol, (dict, list) ):
protocol = json.dumps( protocol, separators=(',', ':'))
# Flush input buffer
connection.reset_input_buffer()
# Write the protocol to the Instrument
connection.write( protocol.encode() )
# Send linebreak to start protocol
connection.write( "\r\n".encode() )
# Ensure data is actually sent before reading
connection.flush()
# Data string
data = ""
# Regular expression to test for CRC32 checksum
str_end = re.compile( REGEX_RETURN_END, re.M | re.I )
# Regular expression to test for protocol breaks for user input
str_input = re.compile( REGEX_RETURN_USER_INPUT, re.I )
# Add progress bar
f = None
if progressbar:
try:
p_max_count = count_sub_protocols(protocol)
f = IntProgress(
min=0,
max=p_max_count,
description='Loading:',
bar_style='', # 'success', 'info', 'warning', 'danger' or ''
style={'bar_color': 'rgb(67,134,134)'},
orientation='horizontal'
) # instantiate the bar
display(f) # display the bar
except ImportError as e:
pass
# Read port
while True:
# Check if data is in the buffer
if connection.in_waiting > 0:
# Read bytes in buffer
chunk = connection.read(connection.in_waiting).decode()
data += chunk
# Check if data stream is interrupted for user input
if str_input.search( data ):
input_match = str_input.search( data )
input_action, input_message = input_match.groups()
# Show input dialog triggered by protocol command alert, etc. (legacy)
try:
user_input = input( "%s (%s)" % ( input_message, input_action ) )
except:
# Send empty string to have device continue
user_input = "Warning: No user input available."
print(user_input)
pass
# Send data to device
connection.write( ( "%s+" % user_input).encode() )
# Test if chunk was a sub-protocol
if f is not None:
f.value = f.value + 1
# Stop reading when linebreak received
if str_end.search( chunk ):
if f is not None:
f.value = p_max_count
f.description = "Done"
break
# Small delay to prevent excessive CPU usage
time.sleep(0.01)
# Remove linebreaks and split crc and data
data, crc32 = strip_crc32( data )
## Sanitize Strings
json_str = sanitize( data )
try:
data = json.loads(json_str)
except json.decoder.JSONDecodeError as e:
warnings.warn(e)
return { 'Error': e }, crc32 #TODO: pass
# Show a warning for battery below 25%
if 'device_battery' in data and data['device_battery'] < 25:
warnings.warn('Device battery low! Currently at %s%%, recharge soon.' % data['device_battery'])
# Add filename
if not filename is None:
data['name'] = filename
# Add Notes
data['notes'] = notes
# Add Date
data['created_at'] = start.astimezone().isoformat()
# Add Protocol
data['protocol'] = json.loads(protocol)
# Calculate Checksums
data['md5_protocol'] = hashlib.md5( protocol.encode() ).hexdigest()
data['md5_measurement'] = hashlib.md5( json_str.encode() ).hexdigest()
## Save file with measurements in notebook format
if not filename is None:
if not os.path.exists( directory ):
os.makedirs( directory )
path = os.path.join( directory, (filename + '.json') )
## Check if filename already exists. If true, " - #"" is attached
i = 1
while os.path.exists(path):
path = os.path.join( directory, '%s - %s.json' % (filename,i) )
i += 1
## Write to disk
with open( path, 'w') as fp:
json.dump(data, fp, indent=2)
return data, crc32
[docs]
def analyze ( data=None, fn=None ):
"""
Pipe data from the MultispeQ measurement to an analysis function.
This can be done manually, but this helps with the convoluted
structure of the measurement output.
:param data: MultispeQ data output
:type data: dict
:param fn: Function to analyze the provided data
:type fn: function
:return: Analyzed data output, returns None if fails
:rtype: dict
:raises ValueError: if no data is provided
:raises Exeption: if fn is not a function
"""
if data is None:
raise ValueError("No data is provided")
if not isinstance(data, dict):
raise ValueError("The provided data is not a dictionary")
if fn is None:
return data
## Output
output = None
if not hasattr( fn, '__call__'):
raise Exception("No function is provided")
try:
## Check if data is a dictionary and sample key is present
if isinstance( data, dict ) and 'sample' in data:
## Check if sample is a list and has an element
if isinstance( data['sample'], list ) and len(data['sample']) > 0:
## Now check if the first element in the sample list is a list as well that has an element
if isinstance( data['sample'][0], list ) and len(data['sample'][0]) > 0:
## Seems like it is the legacy format
## The data gets flattened to match the new format
tmp = []
for item in data['sample']:
if isinstance(item, list):
tmp.extend(item)
else:
tmp.append(item)
data['sample'] = tmp
output = fn( data['sample'][0] )
## Perhaps some scrambled format, so sample is sent to the function
else:
output = fn( data['sample'] )
## Probably unknown data source, so it just gets passed to the function
else:
output = fn( data )
except Exception as e:
warnings.warn(e)
pass
## Now that we have the output, so now the rest can be added
keys = ['name', 'notes', 'created_at', 'protocol', 'md5_protocol', 'md5_measurement']
## Low level data information
keys += ['device_name', 'device_version', 'device_id', 'device_battery', 'device_firmware']
for key in keys:
if key in data:
output[key] = data.get(key, None)
return output
[docs]
def view ( data=None ):
"""
Tabular display of data for a single measurent. If a parameter is a
list or dictionary or other type 'n/a' will be displayed as such
content will not be plotted.
:param data: MultispeQ data output
:type data: dict
:return: None
:rtype: NoneType
:raises ValueError: if no data is provided
:raises ValueError: if data is not a dictionary
"""
if data is None:
raise ValueError("No data provided")
if not isinstance(data, dict):
raise ValueError("Provided data needs to be a dictionary")
keys = sorted(data.keys(), key=str.lower)
table_content = []
for key in keys:
value = data[key]
vtype = type(value)
if isinstance(value, (str, float, int )) or value is None:
value = str(value)
else:
value = 'n/a'
table_content.append([key, value, vtype])
table = tabulate(table_content, headers=['Parameter', 'Value', 'Type'])
print(table)
return None
[docs]
def to_df ( data=None, append=None ):
"""
Adding data to dataframe. A new dataframe can be created
or the data can be appended to an existing one, if the
column format is the same.
:param data: MultispeQ data output
:type data: dict or list[dict]
:return: None
:rtype: NoneType
:raises ValueError: if no data is provided
:raises ValueError: if data is not a dictionary
"""
if data is None:
raise ValueError("No data provided")
if not isinstance(data, (dict, list)):
raise ValueError("Provided data needs to be a dictionary or list of dictionarys")
df = pd.DataFrame( data )
return df
[docs]
def count_sub_protocols(protocol = None):
length = None
try:
if not isinstance(protocol, (dict,list) ):
protocol = json.loads(protocol)
except:
return length
if len(protocol) > 0:
pIdx = 0
protocol_count = []
if "_protocol_set_" in protocol[pIdx]:
## Get the minimum Protocol length
length = len(protocol[0]["_protocol_set_"])
## Check for v_arrays
v_arrays = None
if "v_arrays" in protocol[pIdx]:
v_arrays = protocol[pIdx]["v_arrays"]
## Check for set_repeats
set_repeats = 1
if "set_repeats" in protocol[pIdx]:
set_repeats = protocol[pIdx]["set_repeats"]
## Check if repeats are a variable referencing v_arrays
if isinstance(set_repeats, str) and set_repeats.startswith('#l') and v_arrays is not None:
array_index = int(set_repeats[2:])
set_repeats = len(v_arrays[array_index])
## Test if sub-protocols have protocol_repeats
for sp in protocol[pIdx]["_protocol_set_"]:
if "do_once" in sp:
protocol_count.append(1)
if "protocol_repeats" in sp:
p_repeats = sp["protocol_repeats"]
## Check if repeats are a variable referencing v_arrays
if isinstance(p_repeats, str) and p_repeats.startswith('#l') and v_arrays is not None:
array_index = int(p_repeats[2:])
p_repeats = len(v_arrays[array_index])
protocol_count.append(p_repeats * set_repeats)
else:
protocol_count.append(1)
length = sum(protocol_count)
return length