# Source code for pycollect.decode
"""
=========
GE Decode
=========
Synchronize in decoder with a buffer reference, decoder `GEDecode` will attempt
to process new data added to the buffer.
.. code:: ipython3
decoder = GEDecode(device.BUFFER)
decoder.process(True)
There is a set of methods to correctly clear the stored buffer and the recollected data.
.. code:: ipython3
device.clear_buffer() # clear input buffer.
decoder.clear_buffer() # clear decoded data, breaks the synchrony.
decoder.clear_data() # clear recollected data.
"""
import os
import time
import struct
import json
from threading import Thread
from datetime import datetime, timedelta, date
from .dataconstants import CONST
from .edfwriter import EDF, EDFChannel
from .headers import DatexHeaderResponse, PhysiologicalData, HeaderHandler
from .measures import WAVEFORMS_DICT, LABEL_TO_DICT, GROUPS_DICT
from pandas import DataFrame, np
########################################################################
class GEDecode:
    """Decode raw data into Pandas DataFrames.

    There are two main objects:

    * ``DATA_SUBRECORD``: Pandas DataFrame with the subrecords data.
    * ``DATA_WAVE``: Dictionary with the waveform name as key and the
      Pandas DataFrame with the waveform data as value.
    """
#----------------------------------------------------------------------
[docs] def __init__(self, buffer, filter_subrecords=None, filter_waveforms=None):
"""
Parameters
----------
buffer: array reference
Reference with raw data, the processer will attempt to decoded new
data added to this object.
filter_subrecords: array, optional
Sublist with desired subrecords, if empty then will process all available measures.
filter_waveforms: array, optional
List of desired waveforms, if emty then will process all requested waveforms.
"""
self.m_list = []
self.record_list = []
self.frame_list = []
if filter_subrecords is None:
filter_subrecords = []
if filter_waveforms is None:
filter_waveforms = []
# Display user data
self.DATA_SUBRECORD = DataFrame()
self.DATA_WAVE = {}
# Read and write intern data
self.__DATA_SUBRECORD__ = DataFrame()
self.__DATA_WAVE__ = {}
# Modules, groups and measures availables
self.MODULES = []
self.MODULES_ACTIVE = []
self.MEANSURES_AVAILABLE = []
# Sublist with measures
self.FILTER_SUBRECORDS = filter_subrecords
self.FILTER_WAVEFORMS = filter_waveforms
# Temporal and permanent buffers
self.BUFFER = buffer
self.PROCCESED_BUFFER = []
self.PROCESSING = False
self.m_fstart = True
self.m_storestart = False
self.m_storeend = False
self.m_bitshiftnext = False
self.m_transmissionstart = True
# Default header can be empty
self.edf_header = {}
# First process in initialization
self.__processing__()
#----------------------------------------------------------------------
[docs] def clear_buffer(self):
"""Clear the processed buffer.
This action brake the synchrony with GEDevice.
"""
self.BUFFER = []
self.PROCCESED_BUFFER = []
#----------------------------------------------------------------------
[docs] def clear_data(self):
"""Clear the processed data."""
self.DATA_SUBRECORD = DataFrame()
self.DATA_WAVE = {}
self.__DATA_SUBRECORD__ = DataFrame()
self.__DATA_WAVE__ = {}
# self.DATA_TREND_10S = DataFrame()
#----------------------------------------------------------------------
[docs] def process(self, flag=True, delay=1):
"""Enable or disable the continuous data processing.
Parameters
----------
flag: bool, True
Enable or disable the continuous data processing.
delay: integer in seconds
Delay in seconds between each process attempt.
"""
self.PROCESSING = flag
if self.PROCESSING:
self.thr_process = Thread(target=self.__continuous_processing__, args=(delay, ))
self.thr_process.start()
#----------------------------------------------------------------------
[docs] def __continuous_processing__(self, delay=1/4):
"""Continuous processing.
Parameters
----------
delay: integer in seconds
Delay in seconds between each attempted processing.
"""
while self.PROCESSING:
time.sleep(delay)
self.__processing__()
#----------------------------------------------------------------------
[docs] def __processing__(self):
"""Process the input buffer one byte at a time.
Process the input buffer until a header is completed.
"""
bytes_ = self.BUFFER[len(self.PROCCESED_BUFFER):]
self.PROCCESED_BUFFER = self.BUFFER[:]
for byte in bytes_:
self.__create_framelist__(byte)
if self.frame_list:
record_list = self.__create_recordlist__()
self.read_subrecords(record_list[:])
self.read_waveforms(record_list[:])
self.frame_list = []
#----------------------------------------------------------------------
[docs] def save_as_csv(self, filename):
"""Save the decoded data into a set of CSV files.
Parameters
----------
filename : str
Absolute or realtive path for CSV file.
Returns
-------
list
A list with filenames generated.
"""
if os.path.exists(os.path.dirname(os.path.abspath(filename))):
os.makedirs(os.path.dirname(os.path.abspath(filename)), exist_ok=True)
if not '.csv' in filename:
filename = filename + '.csv'
filenames = []
if not self.__DATA_SUBRECORD__.empty:
self.__DATA_SUBRECORD__.to_csv(filename)
filenames.append(filename)
if self.__DATA_WAVE__:
for df in self.__DATA_WAVE__:
self.__DATA_WAVE__[df].to_csv(filename.replace('.csv', '.{}.csv'.format(df)))
filenames.append(filename.replace('.csv', '.{}.csv'.format(df)))
return filenames
#----------------------------------------------------------------------
[docs] def set_edf_header(self, **header):
"""Set the EDF+ patient header.
Parameters
----------
admincode : str
Sets the admincode.
birthdate : date object from datetime
Sets the birthdate.
equipment : str
Describes the measurement equpipment.
gender : int
Sets the gender, 1 is male, 0 is female.
patientcode : str
Sets the patient code.
patientname : str
Sets the patient name.
patient_additional : str
Sets the additional patient info.
recording_additional : str
Sets the additional recordinginfo.
startdate : datetime object
Sets the recording start Time.
technician : str
Sets the technicians name.
"""
self.edf_header = header
#----------------------------------------------------------------------
[docs] def save_as_raw(self, filename):
"""Save the decoded data into a RAW file.
Parameters
----------
filename : str
Absolute or realtive path for RAW file.
Returns
-------
list
A list with filenames generated.
"""
if os.path.exists(os.path.dirname(os.path.abspath(filename))):
os.makedirs(os.path.dirname(os.path.abspath(filename)), exist_ok=True)
if not '.raw' in filename:
filename = filename + '.raw'
if self.BUFFER:
file = open(filename, 'wb')
file.write(bytes(self.BUFFER))
file.close()
return [filename]
return []
#----------------------------------------------------------------------
[docs] def save_as_edf(self, filename, edf_header=None, annotations=None):
"""Save the decoded data into a set of EDF+ files.
Parameters
----------
filename : str
Absolute or realtive path for EDF+ file.
edf_header: dict
Declare the EDF+ patient header.
Returns
-------
list
A list with filenames genrated.
"""
if edf_header:
pass
else:
edf_header = self.edf_header
if annotations is None:
annotations = []
if edf_header is None:
edf_header = {}
#set birthdate in datetime python format
if edf_header.get('birthdate', None):
edf_header['birthdate'] = datetime.fromtimestamp(edf_header.get('birthdate', 0))
else:
edf_header['birthdate'] = date(1900, 1, 1)
# Differents EDF because will have different startdate
l1 = self.save_waves_as_edf(filename, edf_header, annotations)
l2 = self.save_disp_as_edf(filename, edf_header, annotations)
return l1 + l2
#----------------------------------------------------------------------
def save_waves_as_edf(self, filename, edf_header, annotations):
""""""
if os.path.exists(os.path.dirname(os.path.abspath(filename))):
os.makedirs(os.path.dirname(os.path.abspath(filename)), exist_ok=True)
if not self.__DATA_WAVE__:
return []
if not '.edf' in filename:
filename = filename + '.edf'
filename = filename.replace('.edf', '.waves.edf')
if os.path.exists(filename):
os.remove(filename)
# edf_header['birthdate'] = datetime.fromtimestamp(edf_header['birthdate'])
edf = EDF(filename)
edf.set_header(**edf_header)
# NOTE: ``first_datetime`` is already a python datetime, so do not need ``fromtimestamp``.
first_datetime = self.__DATA_WAVE__[list(self.__DATA_WAVE__.keys())[0]]['datetime'][0]
# edf.header.update({'startdate': datetime.fromtimestamp(first_datetime),})
edf.header.update({'startdate': first_datetime})
for df in self.__DATA_WAVE__:
wave = self.__DATA_WAVE__[df]
data = np.nan_to_num(np.array(wave['values'].tolist(), dtype=np.float))
channel = EDFChannel(data)
channel['label'] = df
channel['dimension'] = WAVEFORMS_DICT[df]['unit']
channel['sample_rate'] = WAVEFORMS_DICT[df]['samps']
if not WAVEFORMS_DICT[df].get('physical_min', None) is None:
channel['physical_min'] = WAVEFORMS_DICT[df]['physical_min']
else:
channel['physical_min'] = np.floor(min(data))
if not WAVEFORMS_DICT[df].get('physical_max', None) is None:
channel['physical_max'] = WAVEFORMS_DICT[df]['physical_max']
else:
channel['physical_max'] = np.ceil(max(data))
channel['digital_min'] = -2 ** 15
channel['digital_max'] = 2 ** 15
channel['transducer'] = WAVEFORMS_DICT[df]['transducer']
channel['prefilter'] = WAVEFORMS_DICT[df]['prefilter']
edf.add_channel(channel)
for annotation in annotations:
edf.write_annotation(**annotation)
edf.save()
return [filename]
#----------------------------------------------------------------------
def save_disp_as_edf(self, filename, edf_header, annotations):
""""""
if os.path.exists(os.path.dirname(os.path.abspath(filename))):
os.makedirs(os.path.dirname(os.path.abspath(filename)), exist_ok=True)
if self.__DATA_SUBRECORD__.empty:
return []
if not '.edf' in filename:
filename = filename + '.edf'
filename = filename.replace('.edf', '.disp.edf')
# edf_header['birthdate'] = datetime.fromtimestamp(edf_header['birthdate'])
if os.path.exists(filename):
os.remove(filename)
edf = EDF(filename)
edf.set_header(**edf_header)
# NOTE: ``first_datetime`` will be of type int/float, so need ``fromtimestamp`` to become in a python datetime object.
first_datetime = json.loads(self.__DATA_SUBRECORD__.iloc[0].to_json()).pop('datetime') / 1e3
# edf.header.update({'startdate': datetime.fromtimestamp(first_datetime),})
edf.header.update({'startdate': datetime.fromtimestamp(first_datetime),})
keys = list(self.__DATA_SUBRECORD__.columns)
for key in keys:
if key == 'datetime':
continue
data = self.__DATA_SUBRECORD__[key].tolist()
# If the first data is not a number, then the array conains labels
if not isinstance(data[0], (int, float)):
continue
# If the first data is a number, but there are a label somewhere
if list(filter(lambda n: isinstance(n, (str, bytes)), data)):
continue
# After eliminate labels, now data contain only numbers,
np_data = np.array(data, dtype=np.float)
channel = EDFChannel(np.nan_to_num(np_data))
channel['label'] = key
channel['dimension'] = LABEL_TO_DICT[key].get('unit', '')
channel['sample_rate'] = 1
# if WAVEFORMS_DICT[df]['physical_min']:
# channel['physical_min'] = 0
# else:
# if WAVEFORMS_DICT[df]['physical_max']:
# channel['physical_max'] = 1
# else:
# channel['physical_max'] = np.ceil(max(data))
# channel['physical_min'] = -5
# channel['physical_max'] = 5
if not LABEL_TO_DICT[key].get('physical_min', None) is None:
channel['physical_min'] = LABEL_TO_DICT[key]['physical_min']
else:
channel['physical_min'] = np.floor(min(data))
if not LABEL_TO_DICT[key].get('physical_max', None) is None:
channel['physical_max'] = LABEL_TO_DICT[key]['physical_max']
else:
channel['physical_max'] = np.ceil(max(data))
channel['digital_min'] = -2 ** 15
channel['digital_max'] = 2 ** 15
channel['transducer'] = ''
channel['prefilter'] = ''
edf.add_channel(channel)
for annotation in annotations:
edf.write_annotation(**annotation)
# edf.
edf.save()
return [filename]
    # ----------------------------------------------------------------------
    def __create_framelist__(self, byte):
        """Search for a complete and validated frame.

        Feeds one byte of the serial stream into the framing state machine:
        ``CONST.FRAMECHAR`` delimits frames, ``CONST.CTRLCHAR`` escapes the
        following byte (which is then OR-ed with ``CONST.BIT5``), and the
        last stored byte of a frame is a checksum over the previous ones.
        Complete frames with a valid checksum are appended to
        ``self.frame_list``.

        Parameters
        ----------
        byte : byte
            Byte read from the serial raw data.
        """
        if byte == CONST.FRAMECHAR and self.m_fstart:
            # First delimiter seen: start storing the frame body.
            self.m_fstart = False
            self.m_storestart = True
        elif byte == CONST.FRAMECHAR and not self.m_fstart:
            # Another delimiter while not at frame start: body is complete.
            self.m_fstart = False
            self.m_storeend = True
            self.m_storestart = False
            # NOTE(review): ``byte`` equals FRAMECHAR inside this branch, so
            # this append looks unreachable — the nesting was lost in the
            # extracted source; confirm against the original layout.
            if byte != CONST.FRAMECHAR:
                self.m_list.append(byte)
        if self.m_storestart:
            if byte == CONST.CTRLCHAR:
                # Escape character: modify the next byte instead of storing.
                self.m_bitshiftnext = True
            else:
                if self.m_bitshiftnext:
                    # Undo the escaping applied by the transmitter.
                    byte |= CONST.BIT5
                    self.m_bitshiftnext = False
                self.m_list.append(byte)
        elif byte != CONST.FRAMECHAR:
            # Not storing a frame: keep raw bytes as-is.
            self.m_list.append(byte)
        elif self.m_storeend:
            framelen = len(self.m_list)
            if framelen:
                # Checksum is the low byte of the sum of the frame body.
                checksum = sum(self.m_list[:-1]).to_bytes(4, 'big')[-1]
                if checksum == self.m_list[-1]:
                    self.frame_list.append(self.m_list[:])
                # else:
                #     print("Wrong checksum")
                self.m_list = []
                self.m_storeend = False
            else:
                # Empty frame: restart storing from this delimiter.
                self.m_storestart = True
                self.m_storeend = False
                self.m_fstart = False
#----------------------------------------------------------------------
[docs] def __create_recordlist__(self):
"""Complete the raw header with the correct/full size.
Returns
-------
list
Full sized DatexHeaderResponse.
"""
data_list = []
for header in self.frame_list:
data = header[:-1] + [0]*(DatexHeaderResponse.LENGTH - len(header)) + [header[-1]]
data_list.append(data)
return data_list
#----------------------------------------------------------------------
[docs] def read_waveforms(self, record_list, ignore_missging=False):
"""Update the dictionary of waveforms with new decoded data.
Parameters
----------
record_list: array
List of raw headers.
"""
for record in map(DatexHeaderResponse, record_list):
if record['r_maintype'] != CONST.DRI_MT_WAVE:
continue
sroffArray = [record['sr_offset1'], record['sr_offset2'], record['sr_offset3'], record['sr_offset4'], record['sr_offset5'], record['sr_offset6'], record['sr_offset7'], record['sr_offset8']]
srtypeArray = [record['sr_type1'], record['sr_type2'], record['sr_type3'], record['sr_type4'], record['sr_type5'], record['sr_type6'], record['sr_type7'], record['sr_type8']]
unixtime = record['r_time']
date_ = datetime(1970, 1, 1, 0, 0, 0, 0) + timedelta(seconds=unixtime)
for i, srtype, offset in zip(range(8), srtypeArray, sroffArray):
if srtype == CONST.DRI_EOL_SUBR_LIST:
break
if i == 7:
nextoffset = 1450
else:
nextoffset = sroffArray[i+1]
# When just one waveform is requested.
if nextoffset == 0:
nextoffset = WAVEFORMS_DICT[{d[1]:d[0] for d in CONST.DRI_WAVEFORM.items()}[srtype]]['samps'] * 2
nextoffset = nextoffset + offset + 6
buffer = record['-data,'][6+offset : nextoffset]
for k, m in CONST.DRI_WAVEFORM.items():
if m == srtype:
if self.FILTER_WAVEFORMS and not k in self.FILTER_WAVEFORMS:
continue
# if not k in self.FILTER_WAVEFORMS:
# continue
values = self.read_shorts(buffer[:])
# NO shifting
if not False in (np.array(values) > CONST.DATA_INVALID):
values = [v*WAVEFORMS_DICT[k].get('shift', 1) for v in values]
# values = [v*WAVEFORMS_DICT[k].get('shift_', 1) for v in values]
dates = [date_] * len(values)
# update DATA_WAVE_SUBRECORD
if not k in self.DATA_WAVE:
self.DATA_WAVE[k] = DataFrame({'datetime': dates, 'values': values,})
else:
self.DATA_WAVE[k] = self.DATA_WAVE[k].append(DataFrame({'datetime': dates, 'values': values,}), ignore_index=True, sort=True)
# update __DATA_WAVE_SUBRECORD__
if not k in self.__DATA_WAVE__:
self.__DATA_WAVE__[k] = DataFrame({'datetime': dates, 'values': values,})
else:
self.__DATA_WAVE__[k] = self.__DATA_WAVE__[k].append(DataFrame({'datetime': dates, 'values': values,}), ignore_index=True, sort=True)
if nextoffset <= offset or nextoffset > 1450:
break
if not ignore_missging:
for f in self.DATA_WAVE:
if f not in self.__DATA_WAVE__:
self.DATA_WAVE[f](DataFrame({'datetime': dates[0], 'values': CONST.DATA_INVALID,}))
self.__DATA_WAVE__[f](DataFrame({'datetime': dates[0], 'values': CONST.DATA_INVALID,}))
#----------------------------------------------------------------------
[docs] def read_shorts(self, buffer):
"""Convert an array if bytes from signed 16 bits to integer.
Parameters
----------
buffer: array
Array with raw bytes, It will be processed in pairs (for complete 16 bits).
"""
return [i[0] for i in struct.iter_unpack('<h', bytes(buffer))]
#----------------------------------------------------------------------
[docs] def read_subrecords(self, record_list):
"""Update the DataFrame of subrecords with new decoded data.
Parameters
----------
record_list: array
Raw DatexHeaderResponse.
"""
for record in map(DatexHeaderResponse, record_list):
if record['r_maintype'] != CONST.DRI_MT_PHDB:
continue
sroffArray = [record['sr_offset1'], record['sr_offset2'], record['sr_offset3'], record['sr_offset4'], record['sr_offset5'], record['sr_offset6'], record['sr_offset7'], record['sr_offset8']]
srtypeArray = [record['sr_type1'], record['sr_type2'], record['sr_type3'], record['sr_type4'], record['sr_type5'], record['sr_type6'], record['sr_type7'], record['sr_type8']]
unixtime = record['r_time']
# print(unixtime)
date_ = datetime(1970, 1, 1, 0, 0, 0, 0) + timedelta(seconds=unixtime)
#----------------------------------------------------------------------
# phdata_ptr = PhysiologicalData()
# phdata_ptr.load(record['-data,'])
#----------------------------------------------------------------------
phdata_ptr = PhysiologicalData()
for i, srtype, offset in zip(range(8), srtypeArray, sroffArray):
if srtype == CONST.DRI_EOL_SUBR_LIST:
break
buffer = record['-data,'][4+offset:4+offset + 270]
if i == 0:
phdata_ptr.DATA['basic'] = HeaderHandler(data=buffer, init=phdata_ptr['basic'], size=270).DATA
elif i== 1:
phdata_ptr.DATA['ext1'] = HeaderHandler(data=buffer, init=phdata_ptr['ext1'], size=270).DATA
elif i == 2:
phdata_ptr.DATA['ext2'] = HeaderHandler(data=buffer, init=phdata_ptr['ext2'], size=270).DATA
elif i == 3:
phdata_ptr.DATA['ext3'] = HeaderHandler(data=buffer, init=phdata_ptr['ext3'], size=270).DATA
#----------------------------------------------------------------------
subrecord = FormatSubrecord(date_, phdata_ptr) #.format()
self.MODULES, self.MODULES_ACTIVE, self.MEANSURES_AVAILABLE = subrecord.module_status()
df = subrecord.format(self.MODULES_ACTIVE, self.FILTER_SUBRECORDS)
# DATA_SUBRECORD update
if not df is None:
if self.DATA_SUBRECORD.empty:
self.DATA_SUBRECORD = df.copy()
else:
self.DATA_SUBRECORD = self.DATA_SUBRECORD.append(df.copy(), ignore_index=True, sort=True)
# __DATA_SUBRECORD__ update
if not df is None:
if self.__DATA_SUBRECORD__.empty:
self.__DATA_SUBRECORD__ = df.copy()
else:
self.__DATA_SUBRECORD__ = self.__DATA_SUBRECORD__.append(df.copy(), ignore_index=True, sort=True)
########################################################################
class FormatSubrecord:
    """Parse raw physiological data into Pandas DataFrames."""
#----------------------------------------------------------------------
[docs] def __init__(self, date_, header):
"""
Parameters
----------
date_ : Datetime object
Datetime of the current set.
header : PhysiologicalData object
Header with raw data.
"""
self.data = {}
self.date = date_
self.header = header
self.data = [
# Basic
('INV-BP', ['p_group', 'p1']),
('INV-BP', ['p_group', 'p2']),
('INV-BP', ['p_group', 'p3']),
('INV-BP', ['p_group', 'p4']),
('INV-BP', ['p_group', 'p5']),
('INV-BP', ['p_group', 'p6']),
('TEMP', ['t_group', 't1']),
('TEMP', ['t_group', 't2']),
('TEMP', ['t_group', 't3']),
('TEMP', ['t_group', 't4']),
'ECG',
'NIBP',
'SpO2',
'CO2',
'O2',
'N2O',
'AA',
'FLOW-VOL',
'CO-WEDGE',
'NMT',
'ECG-EXTRA',
'SvO2',
#Ext1
'ECG-ARRH',
'ECG-12',
#Ext2
'NMT2',
'EEG',
'EEG-BIS',
'ENTROPY',
'EEG2',
# Ext3
'GASEX',
'FLOW-VOL2',
'BAL-GAS',
'TONO',
'AA2',
]
#----------------------------------------------------------------------
[docs] def module_status(self):
"""Return the list of present and active modules.
Based in the status bits is possible determine the state of the
respective module.
Returns
-------
list
List of groups with modules availables.
list
List of groups with modules availables and actives.
list
List of measures availables and actives.
"""
modules = []
modules_active = []
meansures_available = []
for measure in self.data:
if isinstance(measure, tuple):
measure, replace = measure
name = '{} ({})'.format(measure, replace[1])
else:
replace = ['', '']
name = measure
exist, active = self.check_module(measure, replace)
if exist:
modules.append(name)
if active:
modules_active.append(name)
measures = []
for g in GROUPS_DICT[measure]:
if g['label'].endswith(': MOD'):
continue
if g['label'].endswith(': ACT'):
continue
if 'label_format' in g:
d = LABEL_TO_DICT[g['label_format'][1]]['dict']
k = '{subclass}:{key}'.format(**LABEL_TO_DICT[g['label_format'][1]])
label = g['label_format'][0].format(d[self.header[k]])
else:
label = g['label']
if replace[1]:
measures.append(label.replace(measure, name))
else:
measures.append(label)
meansures_available.extend(measures)
return modules, modules_active, meansures_available
    # ----------------------------------------------------------------------
    def format(self, active, filters=None):
        """Process a subrecord (raw header), apply shifts and get references.

        Parameters
        ----------
        active : array
            List of groups to parse.
        filters : array, optional
            A sub list of desired subrecords.

        Returns
        -------
        DataFrame
            DataFrame with a single row that contains all measures, with
            labels as headers.  ``None`` when nothing was decoded.
        """
        formated = {}
        for measure in self.data:
            if isinstance(measure, tuple):
                measure, replace = measure
            else:
                replace = ['', '']
            for component in GROUPS_DICT[measure]:
                label = component['label']
                # Module status entries are not measures.
                if label.endswith(': ACT') or label.endswith(': MOD'):
                    continue
                if replace[1]:
                    name = '{} ({})'.format(label.strip(), replace[1])
                else:
                    name = label.strip()
                if filters and not (name in filters):
                    continue
                if 'label_format' in component:
                    # The label depends on a value read from the header.
                    d = LABEL_TO_DICT[component['label_format'][1]]['dict']
                    k = '{subclass}:{key}'.format(**LABEL_TO_DICT[component['label_format'][1]])
                    label = component['label_format'][0].format(d[self.header[k]])
                key = '{}:{}'.format(component['subclass'], component['key'].replace(*replace))
                shift = component.get('shift', 1)
                if 'dict' in component:
                    # Categorical value: translate through the lookup dict.
                    if self.header[key] == 'False':
                        value = dict_ = component['dict']
                        value = dict_[0]
                    elif self.header[key] == 'True':
                        dict_ = component['dict']
                        value = dict_[1]
                    else:
                        dict_ = component['dict']
                        value = dict_.get(self.header[key], None)
                else:
                    if key.split(':')[-1].replace('-', '0').isdigit():
                        # Keys whose last segment is numeric appear to hold
                        # already-decoded values — TODO confirm.
                        value = self.header[key]
                        if isinstance(value, (int, float)) and value > CONST.DATA_INVALID:
                            value = value * shift
                    else:
                        value = self.get_short(key)
                        if isinstance(value, (int, float)) and value:
                            if value > CONST.DATA_INVALID:
                                value = value * shift
                formated[name] = [value]
        if formated:
            formated['datetime'] = self.date
            return DataFrame(formated)
#----------------------------------------------------------------------
[docs] def get_short(self, key):
"""From index in header, convert data to integer, validates and return it.
Parameters
----------
key : Header index
Location if header of target value.
Returns
-------
int, None, bytearray
Int if valus is a signed 16 bits, None if the value is over range
and bytearray in other cases.
"""
data = self.header[key+',']
if len(data) == 2:
value = struct.unpack('>h', bytes(data))[0]
if value <= CONST.DATA_OVER_RANGE:
return None
else:
return value
else:
return self.header[key]
#----------------------------------------------------------------------
[docs] def check_module(self, label, replace=None):
"""Check the status module from determinate group.
Parameters
----------
label : str
Group for check their respective module.
replace : dict
Some groups are indexed for multiples modules, in that case, the
indexes must be removed.
Returns
-------
bool
Module exist.
bool
Module is active.
"""
mod_exist = LABEL_TO_DICT['{}: MOD'.format(label)]
mod_activ = LABEL_TO_DICT['{}: ACT'.format(label)]
if replace[0]:
e = mod_exist['key'].replace(*replace)
a = mod_activ['key'].replace(*replace)
else:
e = mod_exist['key']
a = mod_activ['key']
exist = self.header['{}:{}'.format(mod_exist['subclass'], e)] == 'True'
activ = self.header['{}:{}'.format(mod_activ['subclass'], a)] == 'True'
return exist, activ