"""
Take measurements using the MultispeQ device connected to
a serial port. The returned data can be analyzed and viewed.
"""
import datetime
import json
import re
import warnings
import hashlib
import os
import pandas as pd
from jii_multispeq.constants import REGEX_RETURN_END
from jii_multispeq.measurement.sanitize import sanitize
from tabulate import tabulate
from jii_multispeq.measurement.checksum import strip_crc32
[docs]
def measure ( connection=None, protocol=[{}], filename='auto', notes="", directory="./local/" ):
"""
Take a measurement using a MultispeQ connected via a serial connection (USB or Bluetooth).
:param connection: Connection to the MultispeQ.
:type connection: serial
:param protocol: Measurement Protocol
:type protocol: str, dict or list
:param filename: Name for saved measurement file. If set to None, no file is saved, Default name is current date and time.
:type filename: str
:param notes: Notes for the measurement
:type notes: str
:param directory: Directory the measurement is saved in. Default directory is "local".
:type directory: str
:return: The MultispeQ data is returned on success, otherwise None.
:rtype: str
:raises ValueError: if no connection is defined
:raises ValueError: if no protocol for the MultispeQ is provided
:raises ValueError: if protocol is not encoded as a string or dictionary
:raises ValueError: if notes are not provided as a string
:raises ValueError: if directory is not provided as a string
:raises Exception: if connection is not open or device connected
"""
start = datetime.datetime.now()
if connection is None:
raise ValueError("A connection for the MultispeQ needs to be defined")
if not isinstance(protocol, (dict, str, list)):
raise ValueError("Provided protocol needs to be a string or dictionary")
if not isinstance(notes, str):
raise ValueError("Provided notes have to be a string")
if not isinstance(directory, str):
raise ValueError("Provided directory has to be a string")
if filename == 'auto':
filename = start.strftime("%Y-%m-%d_%H%M")
# Check if the connection is open
if not connection.is_open:
raise Exception("Connection not open, connect device to port first")
# Check if the protocol is a dictionary and stringify
if isinstance( protocol, (dict, list) ):
protocol = json.dumps( protocol, indent=None)
# Write the protocol to the Instrument
connection.write( protocol.encode() )
# Send linebreak to start protocol
connection.write( "\r\n".encode() )
# Data string
data = ""
# Regular expression to test for CRC32 checksum
prog = re.compile( REGEX_RETURN_END, re.M | re.I )
# Read port
while True:
data += connection.readline().decode()
# Stop reading when linebreak received
if prog.search( data ):
break
# Remove linebreaks and split crc and data
data, crc32 = strip_crc32( data )
## Sanitize Strings
json_str = sanitize( data )
try:
data = json.loads(json_str)
except json.decoder.JSONDecodeError as e:
warnings.warn(e)
return { 'Error': e }, crc32 #TODO: pass
# Show a warning for battery below 25%
if 'device_battery' in data and data['device_battery'] < 25:
warnings.warn('Device battery low! Currently at %s%%, recharge soon.' % data['device_battery'])
# Add filename
if not filename is None:
data['name'] = filename
# Add Notes
data['notes'] = notes
# Add Date
data['created_at'] = start.astimezone().isoformat()
# Add Protocol
data['protocol'] = json.loads(protocol)
# Calculate Checksums
data['md5_protocol'] = hashlib.md5( protocol.encode() ).hexdigest()
data['md5_measurement'] = hashlib.md5( json_str.encode() ).hexdigest()
## Save file with measurements in notebook format
if not filename is None:
if not os.path.exists( directory ):
os.makedirs( directory )
path = os.path.join( directory, (filename + '.json') )
## Check if filename already exists. If true, " - #"" is attached
i = 1
while os.path.exists(path):
path = os.path.join( directory, '%s - %s.json' % (filename,i) )
i += 1
## Write to disk
with open( path, 'w') as fp:
json.dump(data, fp, indent=2)
return data, crc32
[docs]
def analyze ( data=None, fn=None ):
"""
Pipe data from the MultispeQ measurement to an analysis function.
This can be done manually, but this helps with the convoluted
structure of the measurement output.
:param data: MultispeQ data output
:type data: dict
:param fn: Function to analyze the provided data
:type fn: function
:return: Analyzed data output, returns None if fails
:rtype: dict
:raises ValueError: if no data is provided
:raises Exeption: if fn is not a function
"""
if data is None:
raise ValueError("No data is provided")
if not isinstance(data, dict):
raise ValueError("The provided data is not a dictionary")
if fn is None:
return data
## Output
output = None
if not hasattr( fn, '__call__'):
raise Exception("No function is provided")
try:
## Check if data is a dictionary and sample key is present
if isinstance( data, dict ) and 'sample' in data:
## Check if sample is a list and has an element
if isinstance( data['sample'], list ) and len(data['sample']) > 0:
## Now check if the first element in the sample list is a list as well that has an element
# if isinstance( data['sample'][0], list ) and len(data['sample'][0]) > 0:
## Seems like it is the standard format
# output = fn( data['sample'][0][0] )
## TODO: Not sure if we should add data back or how we address it...
# And the raw data
# for key in data['sample'][0][0].keys():
# output[key] = data['sample'][0][0].get(key, None)
output = fn( data['sample'][0] )
## Perhaps some scrambled format, so sample is sent to the function
else:
output = fn( data['sample'] )
## Probably unknown data source, so it just gets passed to the function
else:
output = fn( data )
except Exception as e:
warnings.warn(e)
pass
## Now that we have the output, so now the rest can be added
keys = ['name', 'notes', 'created_at', 'protocol', 'md5_protocol', 'md5_measurement']
## Low level data information
keys += ['device_name', 'device_version', 'device_id', 'device_battery', 'device_firmware']
for key in keys:
if key in data:
output[key] = data.get(key, None)
return output
[docs]
def view ( data=None ):
"""
Tabular display of data for a single measurent. If a parameter is a
list or dictionary or other type 'n/a' will be displayed as such
content will not be plotted.
:param data: MultispeQ data output
:type data: dict
:return: None
:rtype: NoneType
:raises ValueError: if no data is provided
:raises ValueError: if data is not a dictionary
"""
if data is None:
raise ValueError("No data provided")
if not isinstance(data, dict):
raise ValueError("Provided data needs to be a dictionary")
keys = sorted(data.keys(), key=str.lower)
table_content = []
for key in keys:
value = data[key]
vtype = type(value)
if isinstance(value, (str, float, int )) or value is None:
value = str(value)
else:
value = 'n/a'
table_content.append([key, value, vtype])
table = tabulate(table_content, headers=['Parameter', 'Value', 'Type'])
print(table)
return None
[docs]
def to_df ( data=None, append=None ):
"""
Adding data to dataframe. A new dataframe can be created
or the data can be appended to an existing one, if the
column format is the same.
:param data: MultispeQ data output
:type data: dict or list[dict]
:return: None
:rtype: NoneType
:raises ValueError: if no data is provided
:raises ValueError: if data is not a dictionary
"""
if data is None:
raise ValueError("No data provided")
if not isinstance(data, (dict, list)):
raise ValueError("Provided data needs to be a dictionary or list of dictionarys")
df = pd.DataFrame( data )
return df