Commit 6d49ef64 authored by yugao@uic.edu's avatar yugao@uic.edu

faster single thread b64 splitter

Need to do:
1.  multi-thread zlib?
2.  decompressor back to mzml
parent 3a7ff6a9
# mscompress-pub.b64_zlib_worker created by bathy at 9/25/2021
import multiprocessing
import pybase64
import numpy as np
import zlib
import multiprocessing as mp
import math
def base64_decoder(base64_data: bytes, number_fmt, compress_method, array_type=None):
    """Decode one base64-encoded (optionally zlib-compressed) numeric array.

    Parameters
    ----------
    base64_data : bytes or None
        Raw base64 payload; ``None`` yields an empty array.
    number_fmt : str
        Encoding code whose last character selects the dtype
        ('i', 'e', 'f', 'q', 'd').
    compress_method : str
        ``'zlib'`` triggers decompression; any other value means raw bytes.
    array_type : optional
        Kept for interface compatibility ('mass' / 'intensity'); currently
        unused (the intensity log-transform is disabled). BUG FIX: it now
        defaults to None because the only caller, worker(), passes just three
        arguments, which previously raised TypeError.

    Returns
    -------
    numpy.ndarray decoded from the payload bytes.
    """
    # mzML numeric-type code -> numpy dtype.
    np_dtype_numtype = {'i': np.int32, 'e': np.single, 'f': np.float32, 'q': np.int64, 'd': np.float64}
    if base64_data is None:
        return np.array([])
    num_type = np_dtype_numtype[number_fmt[-1]]
    # Use pybase64's public API instead of the private _pybase64 submodule.
    decoded = pybase64.b64decode(base64_data)
    if compress_method == 'zlib':
        decoded = zlib.decompress(decoded)
    # NOTE(review): an intensity-only log2 normalization was commented out
    # here in the original; re-add behind array_type == 'intensity' if needed.
    return np.frombuffer(decoded, dtype=num_type)
def worker(data_chunk, mzml_fp, number_fmt, compress_method, out_q):
    """Decode a chunk of (start, end) payload offsets and queue the arrays.

    Parameters
    ----------
    data_chunk : iterable of (start, end) byte offsets into *mzml_fp*
    mzml_fp : binary file object, repositioned with seek() per payload.
        NOTE(review): sharing one file object across processes is unsafe;
        each worker process should open its own handle — confirm the caller
        does so before enabling multiprocessing.
    number_fmt, compress_method : forwarded to base64_decoder()
    out_q : multiprocessing queue that receives the list of decoded arrays
    """
    result = []
    for start, end in data_chunk:
        mzml_fp.seek(start)
        raw = mzml_fp.read(end - start)
        # BUG FIX: base64_decoder() requires an array_type argument; the
        # original call passed only three and raised TypeError. array_type
        # is currently unused by the decoder, so None is safe here.
        result.append(base64_decoder(raw, number_fmt, compress_method, None))
    out_q.put(result)
def mp_factorizer(nums, nprocs):
    """Fan *nums* out to *nprocs* worker processes and gather their results.

    Adapted from the classic multiprocessing chunking recipe.

    NOTE(review): this function is currently inconsistent with worker():
    worker() takes five parameters (data_chunk, mzml_fp, number_fmt,
    compress_method, out_q) but is started here with only two args, so every
    child process dies with TypeError — and out_q.get() below then blocks
    forever. Also, worker() puts a *list* on the queue, while this code calls
    dict.update() on the received value, which raises for a list of arrays.
    """
    out_q = mp.Queue()
    # Ceiling division so the last chunk absorbs the remainder.
    chunksize = int(math.ceil(len(nums) / float(nprocs)))
    procs = []
    for i in range(nprocs):
        p = mp.Process(
            target=worker,
            # NOTE(review): argument list does not match worker()'s signature.
            args=(nums[chunksize * i:chunksize * (i + 1)],
                  out_q))
        procs.append(p)
        p.start()
    resultdict = {}
    # Drain the queue before joining to avoid a feeder-thread deadlock.
    for i in range(nprocs):
        # NOTE(review): dict.update() on a list put by worker() will raise.
        resultdict.update(out_q.get())
    for p in procs:
        p.join()
    return resultdict
if __name__ == "__main__":
    # NOTE(review): smoke test inherited from the generic multiprocessing
    # recipe; worker()'s signature no longer matches what mp_factorizer()
    # passes, so this call cannot currently succeed.
    print(mp_factorizer(range(100), 3))
\ No newline at end of file
......@@ -7,7 +7,34 @@ import pybase64
import zlib
from xml.etree import cElementTree as ET
np_dtype_numtype = {'i': np.int32, 'e': np.single, 'f': np.float32, 'q': np.int64, 'd': np.float64}
def chunks(l, n):
    """Yield consecutive slices of *l*, each holding at most *n* items."""
    total = len(l)
    pos = 0
    while pos < total:
        yield l[pos:pos + n]
        pos += n
def decode_pos(data_position, mzml_fp, data_fmt, array_type):
    """Read the raw base64 bytes at *data_position* from *mzml_fp* and decode them.

    *data_position* is a (start, end) byte-offset pair; *data_fmt* supplies the
    'data_encoding' and 'data_compression' keys forwarded to base64_decoder().
    """
    begin, finish = data_position
    mzml_fp.seek(begin)
    raw = mzml_fp.read(finish - begin)
    return base64_decoder(raw, data_fmt['data_encoding'], data_fmt['data_compression'], array_type)
def base64_decoder(base64_data: bytes, number_fmt, compress_method, array_type):
    """Decode one base64-encoded (optionally zlib-compressed) numeric array.

    Parameters
    ----------
    base64_data : bytes or None
        Raw base64 payload; ``None`` yields an empty array.
    number_fmt : str
        Encoding code ('32f', '64d', ...) whose last character selects the
        dtype from the module-level np_dtype_numtype table.
    compress_method : str
        ``'zlib'`` triggers decompression; any other value means raw bytes.
    array_type : str
        'mass' or 'intensity'; currently unused (the intensity log-transform
        is disabled).

    Returns
    -------
    numpy.ndarray decoded from the payload bytes.
    """
    if base64_data is None:
        return np.array([])
    # FIX: use the module-level np_dtype_numtype table instead of rebuilding
    # a shadowing local copy on every call.
    num_type = np_dtype_numtype[number_fmt[-1]]
    # Use pybase64's public API instead of the private _pybase64 submodule.
    decoded = pybase64.b64decode(base64_data)
    if compress_method == 'zlib':
        decoded = zlib.decompress(decoded)
    return np.frombuffer(decoded, dtype=num_type)
accession_dict = {"MS:1000519": "32i",
"MS:1000520": "16e",
......@@ -20,73 +47,61 @@ accession_dict = {"MS:1000519": "32i",
"MS:1000514": "mass"}
def find_string(mzml_read_fp, match_tag_start, match_tag_end, data_format, spec_no):
    """Locate every base64 payload between *match_tag_start* and *match_tag_end*.

    Scans the mzML file behind *mzml_read_fp* with a read-only mmap, records
    the (start, end) byte offsets of each payload, and simultaneously writes a
    stripped ``.smzml`` copy in which every payload is replaced by a short
    ``$encoding$compression$type$`` format stamp.

    NOTE(review): this span contained both the pre- and post-change revisions
    of the function interleaved by the diff view; this is the reconstructed
    new revision, de-duplicated.

    Parameters
    ----------
    mzml_read_fp : binary file object opened on the source ``.mzML`` file
    match_tag_start, match_tag_end : str, e.g. '<binary>' and '</binary>'
    data_format : dict with 'mass' and 'intensity' descriptors, each holding
        'data_encoding', 'data_compression' and 'data_type'
    spec_no : int
        Number of spectra; the scan stops after ``spec_no * 2`` payloads
        (payloads alternate mass, intensity per spectrum).

    Returns
    -------
    list[tuple[int, int]] of payload byte offsets, in file order.
    """
    start_time = time.time()
    file_name = mzml_read_fp.name
    file_size = os.path.getsize(file_name)
    encoded_start_tag = match_tag_start.encode('utf-8')
    encoded_end_tag = match_tag_end.encode('utf-8')
    len_start_tag = len(match_tag_start)
    len_end_tag = len(match_tag_end)
    data_positions = []
    # Format stamps written into the .smzml copy in place of the raw payloads.
    mass_fmt = ("$%s$" % '$'.join([data_format['mass']['data_encoding'], data_format['mass']['data_compression'], data_format['mass']['data_type']])).encode('utf-8')
    int_fmt = ("$%s$" % '$'.join([data_format['intensity']['data_encoding'], data_format['intensity']['data_compression'], data_format['intensity']['data_type']])).encode('utf-8')
    i = 0
    with open(file_name.replace('.mzML', '.smzml'), 'wb') as fo:
        # Memory-map the whole source file read-only (length 0 = whole file).
        m = mmap.mmap(mzml_read_fp.fileno(), 0, access=mmap.ACCESS_READ)
        last_end = 0
        while True:
            if i == spec_no * 2:
                print("total time used to position all b64 data:", time.time() - start_time)
                # Copy everything after the final payload, then stop.
                fo.write(m.read(file_size - m.tell()))
                return data_positions
            # CPython mmap.find() starts at the current file position by
            # default, so each search resumes after the previous payload.
            start = m.find(encoded_start_tag)
            # Copy from the previous payload's end through the opening tag.
            fo.write(m.read(start + len_start_tag - last_end))
            m.seek(start)
            end = m.find(encoded_end_tag)
            data_positions.append((start + len_start_tag, end))
            m.seek(end + len_end_tag)
            # Payloads alternate: even index = mass, odd index = intensity.
            if i % 2 == 0:
                fo.write(mass_fmt)
            else:
                fo.write(int_fmt)
            # FIX: write the parameterized closing tag instead of a
            # hard-coded b'</binary>' literal (identical for all callers).
            fo.write(encoded_end_tag)
            last_end = end + len_end_tag
            i += 1
def mzml_splitter(mzml_file: str):
start=time.time()
start = time.time()
# Generate file names
int_binary_file = mzml_file.replace('.mzML', '.bint')
mass_binary_file = mzml_file.replace('.mzML', '.bmass')
mzml_out = mzml_file.replace('.mzML', '.smzml')
#mzml_out = mzml_file.replace('.mzML', '.smzml')
# Open file pointers
mass_binary_out_fp = open(mass_binary_file, 'wb')
int_binary_out_fp = open(int_binary_file, 'wb')
#mzml_out_fp = open(mzml_out, 'w', newline='\n', encoding='utf-8')
mzml_read_fp = open(mzml_file, 'rb')
# mzml_out_fp = open(mzml_out, 'w', newline='\n', encoding='utf-8')
data_format={}
data_format = {}
# Iterate mzML file and get all data into data_position[]
current_encoding = '32f'
current_compress = 'no compression'
spec_no=0
spec_no = 0
for event, elem in iter(ET.iterparse(mzml_file, events=('start',))):
if elem.tag.endswith('}cvParam'):
if elem.get('accession').endswith(('MS:1000521', 'MS:1000522', 'MS:1000523', 'MS:1000519', 'MS:1000520')): # retrieves the datatype based on MS accession
......@@ -95,66 +110,31 @@ def mzml_splitter(mzml_file: str):
current_compress = accession_dict[elem.get('accession')]
if elem.get('accession').endswith(('MS:1000515', 'MS:1000514')): # retrives array_type
current_type = accession_dict[elem.get('accession')]
data_format[current_type]={'data_encoding': current_encoding,'data_compression': current_compress,'data_type': current_type}
data_format[current_type] = {'data_encoding': current_encoding, 'data_compression': current_compress, 'data_type': current_type}
elif elem.tag.endswith('}spectrumList'):
spec_no=int(elem.get('count'))
print("Total binary data:",spec_no*2)
spec_no = int(elem.get('count'))
print("Total binary data:", spec_no * 2)
elif len(data_format.keys())==2:
elif len(data_format.keys()) == 2:
print("Mass and intensity data format", data_format)
break
data_position=find_string(mzml_file,'<binary>', '</binary>', data_format, spec_no)
# with open(mzml_file, 'r', encoding='utf-8') as mzml_in_fp:
# #m = mmap.mmap(mzml_in_fp.fileno(), 0, access=mmap.ACCESS_READ)
# i=0
# for line in mzml_in_fp:
# if line.lstrip().startswith('<binary>') and i< spec_no*2:
# mzml_out_fp.write(line.split('<binary>')[0])
#
# if i%2==0:
# fmt = data_format['mass']
# else:
# fmt = data_format['intensity']
#
# # m.seek(data_position[i][0])
# # b64_content_byte = m.read(data_position[i][1] - data_position[i][0])
# # number_array = base64_decoder(b64_content_byte, fmt['data_encoding'], fmt['data_compression'], fmt['data_type'])
# # number_fmt = np_dtype_numtype[fmt['data_encoding'][-1]]
# # if fmt['data_type'] == 'mass':
# # mass_binary_out_fp.write(number_array.astype(number_fmt))
# # else:
# # int_binary_out_fp.write(number_array.astype(number_fmt))
#
# mzml_out_fp.write("<binary>$%s$</binary>\n" % '$'.join([fmt['data_encoding'], fmt['data_compression'], fmt['data_type']]))
# i+=1
# else:
# mzml_out_fp.write(line)
# m = mmap.mmap(mzml_in_fp.fileno(), 0, access=mmap.ACCESS_READ)
# m.seek(data_position[i][0])
# b64_content_byte = m.read(data_position[i][1] - data_position[i][0])
# number_array = base64_decoder(b64_content_byte, fmt['data_encoding'], fmt['data_compression'], fmt['data_type'])
# number_fmt = np_dtype_numtype[fmt['data_encoding'][-1]]
# if fmt['data_type'] == 'mass':
# mass_binary_out_fp.write(number_array.astype(number_fmt))
# else:
# int_binary_out_fp.write(number_array.astype(number_fmt))
# Close file pointers
#mzml_out_fp.close()
data_position = find_string(mzml_read_fp, '<binary>', '</binary>', data_format, spec_no)
data_chunks = chunks(data_position,2)
for each_data in data_chunks:
mass_pos, int_pos = each_data
mass_binary_out_fp.write(decode_pos(mass_pos, mzml_read_fp, data_format['mass'], 'mass'))
int_binary_out_fp.write(decode_pos(mass_pos, mzml_read_fp, data_format['intensity'], 'intensity'))
mzml_read_fp.close()
int_binary_out_fp.close()
mass_binary_out_fp.close()
if __name__ == '__main__':
    # NOTE(review): de-duplicated — this span held both the pre- and
    # post-format revisions of these lines interleaved by the diff view.
    file_name = r'K:\test\test\fusion_20200116_qjl_YLD_19.mzML'  # TODO: take the path from sys.argv
    start = time.time()
    print("Total file size", os.path.getsize(file_name) / 1e9, "GB")
    mzml_splitter(file_name)
    print(time.time() - start)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment