Skip to content

Commit

Permalink
feat WIP: process tic positive and negative
Browse files Browse the repository at this point in the history
  • Loading branch information
Lan Le committed Sep 2, 2024
1 parent 2ee2616 commit b54c068
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 124 deletions.
3 changes: 2 additions & 1 deletion chem_spectra/lib/composer/lcms.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ def __gen_config(self):
def __gen_ms_spectra(self):
msspcs = []
ms_tempfile = tempfile.TemporaryFile()
for time, value in self.core.data.items():
spectra_data = self.core.data[2] # the 1st and 2nd is tic positive and negative
for time, value in spectra_data.items():
xs, ys = value['mz'], value['intensities']
msspc = [
'##PAGE={}\n'.format(time),
Expand Down
147 changes: 147 additions & 0 deletions chem_spectra/lib/composer/ms_fix.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
import matplotlib
matplotlib.use('Agg')

import tempfile # noqa: E402
import matplotlib.pyplot as plt # noqa: E402

from chem_spectra.lib.composer.base import BaseComposer # noqa: E402


# Marker line written right before the '##TITLE' header of the composed
# spectrum (see MSComposer.__gen_headers_spectrum_orig).
TEXT_SPECTRUM_ORIG = '$$ === CHEMSPECTRA SPECTRUM ORIG ===\n'
# Header line written before every scan's data table (see
# MSComposer.__gen_ms_spectra). The trailing comment keeps an alternative
# '##XYDATA' header string next to the one in use.
TEXT_MS_DATA_TABLE = '##DATA TABLE= (XY..XY), PEAKS\n' # '##XYDATA= (X++(Y..Y))\n' # noqa


class MSComposer(BaseComposer):
    """Compose JCAMP-DX style text lines and a preview image for MS data.

    Everything is read from ``core``, the converter object handed to the
    constructor. The composed lines are built once, at construction time,
    and kept in ``self.meta``.
    """

    def __init__(self, core):
        """Remember the title (``core.fname``) and compose ``self.meta``.

        NOTE(review): the methods below read ``self.core``; it is presumably
        assigned by ``BaseComposer.__init__`` -- confirm against the base
        class.
        """
        super().__init__(core)
        self.title = core.fname
        self.meta = self.__compose()

    def __gen_headers_spectrum_orig(self):
        """Return the header lines that open the composed spectrum.

        Instrument fields are looked up in ``core.dic`` and fall back to an
        empty string when the key is missing.
        """
        dic = self.core.dic
        return [
            '\n',
            TEXT_SPECTRUM_ORIG,
            '##TITLE={}\n'.format(self.title),
            '##JCAMP-DX=5.00\n',
            '##DATA TYPE={}\n'.format('MASS SPECTRUM'),
            '##DATA CLASS= NTUPLES\n',
            '##ORIGIN=\n',
            '##OWNER=\n',
            '##SPECTROMETER/DATA SYSTEM=\n',
            # e.g. TRAP
            '##.SPECTROMETER TYPE={}\n'.format(
                dic.get('SPECTROMETER TYPE', '')
            ),
            # e.g. GC
            '##.INLET={}\n'.format(dic.get('INLET', '')),
            # e.g. EI+
            '##.IONIZATION MODE={}\n'.format(
                dic.get('IONIZATION MODE', '')
            ),
            '##$CSCATEGORY=SPECTRUM\n',
            '##$CSSCANAUTOTARGET={}\n'.format(self.core.auto_scan),
            # An edited scan, when set, takes precedence over the
            # automatically selected one.
            '##$CSSCANEDITTARGET={}\n'.format(
                self.core.edit_scan or self.core.auto_scan
            ),
            '##$CSSCANCOUNT={}\n'.format(len(self.core.datatables)),
            # core.thres is divided by 100 before being written out.
            '##$CSTHRESHOLD={}\n'.format(self.core.thres / 100),
        ]

    def __gen_ntuples_begin(self):
        """Return the line that opens the NTUPLES section."""
        return ['##NTUPLES={}\n'.format('MASS SPECTRUM')]

    def __gen_ntuples_end(self):
        """Return the line that closes the NTUPLES section."""
        return ['##END NTUPLES={}\n'.format('MASS SPECTRUM')]

    def __gen_config(self):
        """Return the variable-description lines of the NTUPLES section.

        The last page number written in ``##LAST`` is the number of entries
        in ``core.datatables``.
        """
        return [
            '##VAR_NAME= MASS, INTENSITY, RETENTION TIME\n',
            '##SYMBOL= X, Y, T\n',
            '##VAR_TYPE= INDEPENDENT, DEPENDENT, INDEPENDENT\n',
            '##VAR_FORM= AFFN, AFFN, AFFN\n',
            '##VAR_DIM= , , 3\n',
            '##UNITS= M/Z, RELATIVE ABUNDANCE, SECONDS\n',
            '##FIRST= , , 1\n',
            '##LAST= , , {}\n'.format(len(self.core.datatables)),
        ]

    def __gen_ms_spectra(self):
        """Return one page (header lines + data lines) per scan.

        Each entry of ``core.datatables`` supplies ``'pts'`` (written as
        NPOINTS) and ``'dt'`` (the already formatted data lines). Pages are
        numbered from 1.
        """
        msspcs = []
        for page, dt in enumerate(self.core.datatables, start=1):
            # Extend in place: re-creating the list with ``+`` for every
            # scan copies everything collected so far (quadratic cost).
            msspcs.extend((
                '##PAGE={}\n'.format(page),
                '##NPOINTS={}\n'.format(dt['pts']),
                TEXT_MS_DATA_TABLE,
            ))
            msspcs.extend(dt['dt'])
        return msspcs

    def __compose(self):
        """Assemble headers, the NTUPLES section and the ending lines."""
        meta = []
        meta.extend(self.__gen_headers_spectrum_orig())

        meta.extend(self.__gen_ntuples_begin())
        meta.extend(self.__gen_config())
        meta.extend(self.__gen_ms_spectra())
        meta.extend(self.__gen_ntuples_end())

        # Ending lines come from gen_ending(), which is not defined in this
        # class (presumably inherited from BaseComposer).
        meta.extend(self.gen_ending())
        return meta

    def __prism(self, spc):
        """Split the points of ``spc`` by the relative intensity threshold.

        The threshold is the maximum of column 1 scaled by
        ``core.thres / 100``. Points whose y value reaches it are returned
        as "blues", the remaining ones as "greys".

        Args:
            spc: 2-D array-like supporting ``.shape`` and ``spc[:, 1]``;
                every row provides x at index 0 and y at index 1.

        Returns:
            tuple: ``(blues_x, blues_y, greys_x, greys_y)``, four lists.
        """
        blues_x, blues_y, greys_x, greys_y = [], [], [], []
        thres = 0
        if spc.shape[0] > 0:  # RESOLVE_VSMBNAN2
            # max() is only taken when there is at least one row.
            thres = spc[:, 1].max() * (self.core.thres / 100)

        for pt in spc:
            x, y = pt[0], pt[1]
            if y >= thres:
                blues_x.append(x)
                blues_y.append(y)
            else:
                greys_x.append(x)
                greys_y.append(y)
        return blues_x, blues_y, greys_x, greys_y

    def prism_peaks(self):
        """Return the threshold split of the selected scan.

        The scan is ``core.edit_scan`` when set, otherwise
        ``core.auto_scan``; both are 1-based, so one is subtracted to index
        ``core.spectra``.

        Returns:
            tuple: ``(blues_x, blues_y, greys_x, greys_y, scan_number)``
            with the 1-based scan number as last element.
        """
        idx = (self.core.edit_scan or self.core.auto_scan) - 1
        spc = self.core.spectra[idx]
        return self.__prism(spc) + (idx + 1,)

    def tf_img(self):
        """Plot the selected scan and return it as a PNG temporary file.

        Returns:
            A ``tempfile.NamedTemporaryFile`` positioned at offset 0. It is
            left open for the caller; closing it removes the file.
        """
        # These settings are written to pyplot's global rcParams.
        plt.rcParams['figure.figsize'] = [16, 9]
        plt.rcParams['font.size'] = 14
        # PLOT data
        blues_x, blues_y, greys_x, greys_y, _ = self.prism_peaks()
        # Bars are drawn with width=0; only the edge colour is specified.
        plt.bar(greys_x, greys_y, width=0, edgecolor='#dddddd')
        plt.bar(blues_x, blues_y, width=0, edgecolor='#1f77b4')

        # PLOT label
        plt.xlabel('X (m/z)', fontsize=18)
        plt.ylabel('Y (Relative Abundance)', fontsize=18)
        plt.grid(False)

        # Save
        tf = tempfile.NamedTemporaryFile(suffix='.png')
        plt.savefig(tf, format='png')
        tf.seek(0)
        # Clear the current figure and axes so that this plot does not leak
        # into the next one drawn through pyplot.
        plt.clf()
        plt.cla()
        return tf

    def tf_csv(self):
        """Return ``None``: no CSV output is produced by this composer."""
        return None

    def generate_nmrium(self):
        """Return ``None``: no NMRium output is produced by this composer."""
        return None

15 changes: 13 additions & 2 deletions chem_spectra/lib/converter/lcms/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,21 @@ def __init__(self, target_dir, params=False, fname=''):
self.data = self.__read(target_dir, fname)

def __read(self, target_dir, fname):
    """Load the TIC traces and the m/z spectra found in ``target_dir``.

    Three files are read from the directory: ``TIC_PLUS.csv``,
    ``TIC_MINUS.csv`` and ``MZ_Spectra.csv``.

    Args:
        target_dir: directory containing the three CSV files.
        fname: not used by this method.

    Returns:
        list: ``[tic_positive, tic_negative, spectra]``. The two TIC
        entries are whatever ``__read_tic`` returns; ``spectra`` maps every
        distinct ``time`` value of ``MZ_Spectra.csv`` to a dict holding the
        ``'mz'`` and ``'intensities'`` values recorded for that time.
    """
    tic_positive_file_path = os.path.join(target_dir, 'TIC_PLUS.csv')
    tic_positive_data = self.__read_tic(tic_positive_file_path)

    tic_negative_file_path = os.path.join(target_dir, 'TIC_MINUS.csv')
    tic_negative_data = self.__read_tic(tic_negative_file_path, True)

    spectra_file_path = os.path.join(target_dir, 'MZ_Spectra.csv')
    data_frame = pd.read_csv(spectra_file_path, index_col='time', header=0)
    # Collapse all rows sharing a time stamp into one row of lists.
    grouped_df = data_frame.groupby('time').agg(list)
    data_dict = {
        time: {'mz': group['mz'], 'intensities': group['intensities']}
        for time, group in grouped_df.iterrows()
    }
    # Single return point: the spectra dict alone must no longer be
    # returned, otherwise the TIC data would never reach the caller.
    return [tic_positive_data, tic_negative_data, data_dict]

def __read_tic(self, file_path, is_negative = False):
data_frame = pd.read_csv(file_path, header=0)
tic_postive_data = data_frame.to_dict(orient='list')
return tic_postive_data


8 changes: 0 additions & 8 deletions tests/lib/composer/test_lcms_composer.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,3 @@ def test_init_lcms_composer_success(zip_file):

assert lcms_composer is not None
assert lcms_composer.core == lcms_converter

# def test_ms_composer_original_metadata(jcamp_file):
# base_converter = JcampBaseConverter(jcamp_file)
# ms_converter = JcampMSConverter(base=base_converter)
# ms_composer = MSComposer(core=ms_converter)

# assert ms_composer is not None
# assert '$$ === CHEMSPECTRA ORIGINAL METADATA ===\n' in ms_composer.meta
132 changes: 19 additions & 113 deletions tests/lib/converter/lcms/test_lcms_converter.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from chem_spectra.lib.converter.lcms.base import LCMSBaseConverter as LCMSConveter
import tempfile
import zipfile
import mimetypes
import base64
from chem_spectra.lib.converter.lcms.base import LCMSBaseConverter as LCMSConveter

# Path of the zipped LCMS fixture. Despite the name this is an archive, not a
# directory: the tests below open it with zipfile and extract it first.
target_dir = './tests/fixtures/source/lcms/lcms.zip'

Expand All @@ -23,125 +23,31 @@ def test_lcms_converter_failed():
converter = LCMSConveter(None)
assert converter.data is None

def test_lcms_converter_success():
    """Converting the extracted LCMS fixture yields three data sections."""
    with tempfile.TemporaryDirectory() as td:
        with zipfile.ZipFile(target_dir, 'r') as z:
            z.extractall(td)

        converter = LCMSConveter(td, fname='lcms')
        assert converter.data is not None
        # data holds three entries, read in the tests below as:
        # [0] TIC positive, [1] TIC negative, [2] spectra.
        assert len(converter.data) == 3

# def test_get_base64_data_failed():
# converter = BagItConveter(None)
# data = converter.get_base64_data()
# assert data is None

# def test_get_base64_data_succeed():
# with tempfile.TemporaryDirectory() as td:
# with zipfile.ZipFile(cv_layout_path, 'r') as z:
# z.extractall(td)

# converter = BagItConveter(td)
# list_base64 = converter.get_base64_data()
# assert len(list_base64) == 3
# for base64Str in list_base64:
# isBase64(base64Str)

# def test_get_combined_image_failed():
# converter = BagItConveter(None)
# combined_image = converter.combined_image
# assert combined_image is None

# def test_get_combined_image():
# with tempfile.TemporaryDirectory() as td:
# with zipfile.ZipFile(cv_layout_path, 'r') as z:
# z.extractall(td)

# converter = BagItConveter(td)
# combined_image = converter.combined_image
# assertFileType(combined_image, 'image/png')

# def test_bagit_has_one_file_no_combined_image():
# with tempfile.TemporaryDirectory() as td:
# with zipfile.ZipFile(dls_acf_layout_path, 'r') as z:
# z.extractall(td)
def test_lcms_converter_tic_positive():
    """The first data section (TIC positive) has time and intensity values."""
    with tempfile.TemporaryDirectory() as extract_dir:
        with zipfile.ZipFile(target_dir, 'r') as archive:
            archive.extractall(extract_dir)

        tic_positive = LCMSConveter(extract_dir, fname='lcms').data[0]
        # Both columns must be present and non-empty.
        for column in ('time', 'Intensity'):
            assert len(tic_positive[column]) > 0

# def test_bagit_convert_to_jcamp_dsc_layout():
# with tempfile.TemporaryDirectory() as td:
# with zipfile.ZipFile(dsc_layout_path, 'r') as z:
# z.extractall(td)
def test_lcms_converter_tic_negative():
    """The second data section (TIC negative) has time and intensity values."""
    with tempfile.TemporaryDirectory() as td:
        with zipfile.ZipFile(target_dir, 'r') as z:
            z.extractall(td)

        converter = LCMSConveter(td, fname='lcms')
        # Index 1 is the negative trace; name the variable accordingly.
        tic_negative = converter.data[1]
        assert len(tic_negative['time']) > 0
        assert len(tic_negative['Intensity']) > 0

0 comments on commit b54c068

Please sign in to comment.