Source code for full_dia.deepmap

import math
from pathlib import Path

import numpy as np
import pandas as pd
import torch
from numba import cuda

from full_dia import cfg, models, utils
from full_dia.utils import create_cuda_zeros

try:
    _ = profile
except NameError:

[docs] def profile(func): return func
[docs] def load_models(dir_center=None, dir_big=None): """ Load DeepProfile-14 and DeepProfile-56 models. """ channels = 2 + cfg.fg_num model_center = load_model_center(dir_center, channels) channels = 4 * (2 + cfg.fg_num) model_big = load_model_big(dir_big, channels) return model_center, model_big
[docs] def load_model_big(dir_model, n_channel): """ Load DeepProfile-56 model. """ model = models.DeepMap(n_channel) device = cfg.gpu_id if dir_model is None: pt_path = Path(__file__).resolve().parent / "pretrained" / "deepbig_ys_fast.pt" model.load_state_dict(torch.load(pt_path, map_location=device)) else: model.load_state_dict(torch.load(dir_model, map_location=device)) model = model.to(device) model.eval() return model
[docs] def load_model_center(dir_model, n_channel): """ Load DeepProfile-14 model. """ model = models.DeepMap(n_channel) device = cfg.gpu_id if dir_model is None: pt_path = ( Path(__file__).resolve().parent / "pretrained" / "deepcenter_ys_fast.pt" ) model.load_state_dict(torch.load(pt_path, map_location=device)) else: model.load_state_dict(torch.load(dir_model, map_location=device)) model = model.to(device) model.eval() return model
[docs] @cuda.jit(device=True) def find_first_index(scan_mz, query_left, query_right): """ Find first index that match the query value Parameters ---------- scan_mz : cuda.array MS data of a cycle with m/z ascending order. query_left : float The target m/z value with - ppm. query_right : float The target m/z value with + ppm. Returns ------- best_j : int The index of the first m/z that matches the query. """ scan_len = len(scan_mz) low = 0 high = scan_len - 1 best_j = 0 if scan_mz[low] == query_left: best_j = low elif scan_mz[high] == query_right: best_j = high else: while high - low > 1: mid = (low + high) // 2 if scan_mz[mid] == query_left: best_j = mid break if scan_mz[mid] < query_left: low = mid else: high = mid if best_j == 0: # on match,high-low=1 if abs(scan_mz[low] - query_left) < abs(scan_mz[high] - query_left): best_j = low else: best_j = high # find first match in list! while best_j > 0: if scan_mz[best_j - 1] == scan_mz[best_j]: best_j = best_j - 1 else: break return best_j
[docs] @cuda.jit def gpu_bin_map( n, cycle_num, idx_start_v, ms1_scan_seek_idx, ms1_scan_im, ms1_scan_mz, ms1_scan_height, ms2_scan_seek_idx, ms2_scan_im, ms2_scan_mz, ms2_scan_height, query_mz_m, ppm_tolerance, query_im_v, im_tolerance, im_gap, result_maps, ms1_ion_num, ): """ Each CUDA thread generates a map (cycle + mobility + intensity) of an elution group. When multiple signals fall into the same bin, retain only the one with the highest intensity. """ thread_idx = cuda.threadIdx.x + cuda.blockDim.x * cuda.blockIdx.x if thread_idx >= n: return # pr idx, ion idx ions_num = query_mz_m.shape[1] k = thread_idx // ions_num ion_idx = thread_idx % ions_num # params query_mz = query_mz_m[k, ion_idx] query_mz_left = query_mz * (1.0 - ppm_tolerance / 1000000.0) query_mz_right = query_mz * (1.0 + ppm_tolerance / 1000000.0) query_im = query_im_v[k] query_im_left = query_im - im_tolerance query_im_right = query_im + im_tolerance im_base = query_im - im_tolerance # both for ms1 and ms2 idx_start = idx_start_v[k] idx_end = idx_start + cycle_num if ion_idx < ms1_ion_num: scans_seek_idx = ms1_scan_seek_idx scans_im = ms1_scan_im scans_mz = ms1_scan_mz scans_height = ms1_scan_height else: scans_seek_idx = ms2_scan_seek_idx scans_im = ms2_scan_im scans_mz = ms2_scan_mz scans_height = ms2_scan_height for cycle_idx, scan_idx in enumerate(range(idx_start, idx_end)): start = scans_seek_idx[scan_idx] end = scans_seek_idx[scan_idx + 1] scan_len = end - start scan_im = scans_im[start:end] scan_mz = scans_mz[start:end] scan_height = scans_height[start:end] seek = find_first_index(scan_mz, query_mz_left, query_mz_right) while seek < scan_len: mz = scan_mz[seek] if mz > query_mz_right: break elif mz < query_mz_left: # exist multiple mz values seek += 1 continue else: im = scan_im[seek] if query_im_left < im < query_im_right: y = scan_height[seek] im_idx = int((im - im_base) / im_gap) y_map_curr = result_maps[k, ion_idx, cycle_idx, im_idx] if y > y_map_curr: result_maps[k, ion_idx, cycle_idx, im_idx] = y seek += 1
[docs] @cuda.jit def gpu_bin_maps( n, locus_num, cycle_num, idx_start_m, ms1_scan_seek_idx, ms1_scan_im, ms1_scan_mz, ms1_scan_height, ms2_scan_seek_idx, ms2_scan_im, ms2_scan_mz, ms2_scan_height, query_mz_m, tol_ppm, query_im_v, tol_im_map, im_gap, result_maps, ms1_ion_num, ): """ maps: [n_pr, n_locus, n_ion, n_cycle, n_im_bin] Each thread generates maps for multi elution groups of a pr When multiple signals fall into the same bin, retain only the one with the highest intensity. """ thread_idx = cuda.threadIdx.x + cuda.blockDim.x * cuda.blockIdx.x if thread_idx >= n: return # pr idx, ion idx ions_num = query_mz_m.shape[1] locus_per_peptide = locus_num * ions_num k = thread_idx // locus_per_peptide locus = thread_idx % locus_per_peptide // ions_num ion_idx = thread_idx % locus_per_peptide % ions_num # params query_mz = query_mz_m[k, ion_idx] query_mz_left = query_mz * (1.0 - tol_ppm / 1000000.0) query_mz_right = query_mz * (1.0 + tol_ppm / 1000000.0) query_im = query_im_v[k] query_im_left = query_im - tol_im_map query_im_right = query_im + tol_im_map im_base = query_im - tol_im_map ## both for ms1 and ms2 idx_start = idx_start_m[k, locus] idx_end = idx_start + cycle_num if ion_idx < ms1_ion_num: scans_seek_idx = ms1_scan_seek_idx scans_im = ms1_scan_im scans_mz = ms1_scan_mz scans_height = ms1_scan_height else: scans_seek_idx = ms2_scan_seek_idx scans_im = ms2_scan_im scans_mz = ms2_scan_mz scans_height = ms2_scan_height for cycle_idx, scan_idx in enumerate(range(idx_start, idx_end)): start = scans_seek_idx[scan_idx] end = scans_seek_idx[scan_idx + 1] scan_len = end - start scan_im = scans_im[start:end] scan_mz = scans_mz[start:end] scan_height = scans_height[start:end] seek = find_first_index(scan_mz, query_mz_left, query_mz_right) while seek < scan_len: x = scan_mz[seek] if x > query_mz_right: break elif x < query_mz_left: # exist multi mz values seek += 1 continue else: im = scan_im[seek] if query_im_left < im < query_im_right: y = scan_height[seek] im_idx = int((im - im_base) / im_gap) y_curr = result_maps[k, locus, ion_idx, cycle_idx, im_idx] if y > y_curr: result_maps[k, locus, ion_idx, cycle_idx, im_idx] = y seek += 1
[docs] @profile def extract_maps( df_batch: pd.DataFrame, idx_start_m: np.ndarray, locus_num: int, cycle_num: int, map_im_size: int, map_gpu_ms1: dict, map_gpu_ms2: dict, tol_ppm: float, tol_im_map: float, im_gap: float, neutron_num: int, ) -> torch.Tensor: """ Extrac maps for multi elution groups of a pr. Parameters ---------- df_batch : pd.DataFrame Provide pr info. idx_start_m : np.ndarray Cycle start index. locus_num : int How many locus to extract for a pr. cycle_num : int How many cycle to extract for a pr. Default 13. map_im_size : float Default 50. map_gpu_ms1 : dict Provide the MS1 data. map_gpu_ms2 : dict Provide the MS2 data. tol_ppm : float The tolerance of ppm. tol_im_map : float The tolerance of im. im_gap : float The bin width of im in a map. neutron_num : int Specify the neutron num. Returns ------- Maps : torch.Tensor The extracted maps. """ batch_size = len(df_batch) idx_start = idx_start_m[df_batch.index] # params if neutron_num == -1: query_mz_ms1 = df_batch["pr_mz_left"].values query_mz_ms1 = np.tile(query_mz_ms1, (2, 1)).T query_mz_ms2 = np.array(df_batch["fg_mz_left"].values.tolist()) query_mz_m = np.concatenate([query_mz_ms1, query_mz_ms2], axis=1) ms1_ion_num = 1 elif neutron_num == 0: query_mz_ms1 = df_batch["pr_mz"].values query_mz_ms1 = np.tile(query_mz_ms1, (2, 1)).T cols_center = ["fg_mz_" + str(i) for i in range(cfg.fg_num)] query_mz_ms2 = df_batch[cols_center].values query_mz_m = np.concatenate([query_mz_ms1, query_mz_ms2], axis=1) ms1_ion_num = 1 elif neutron_num == 1: query_mz_ms1 = df_batch["pr_mz_1H"].values query_mz_ms1 = np.tile(query_mz_ms1, (2, 1)).T cols_1H = ["fg_mz_1H_" + str(i) for i in range(cfg.fg_num)] query_mz_ms2 = df_batch[cols_1H].values query_mz_m = np.concatenate([query_mz_ms1, query_mz_ms2], axis=1) ms1_ion_num = 1 elif neutron_num == 2: query_mz_ms1 = df_batch["pr_mz_2H"].values query_mz_ms1 = np.tile(query_mz_ms1, (2, 1)).T cols_2H = ["fg_mz_2H_" + str(i) for i in range(cfg.fg_num)] query_mz_ms2 = df_batch[cols_2H].values query_mz_m = np.concatenate([query_mz_ms1, query_mz_ms2], axis=1) ms1_ion_num = 1 elif neutron_num > 2: # total ms1_cols = [ "pr_mz_left", "pr_mz", "pr_mz_1H", "pr_mz_2H", "pr_mz_left", "pr_mz", "pr_mz_1H", "pr_mz_2H", ] # unfrag ms1 = df_batch[ms1_cols].values cols_left = ["fg_mz_left_" + str(i) for i in range(cfg.fg_num)] left = df_batch[cols_left].values cols_center = ["fg_mz_" + str(i) for i in range(cfg.fg_num)] center = df_batch[cols_center].values cols_1H = ["fg_mz_1H_" + str(i) for i in range(cfg.fg_num)] fg_1H = df_batch[cols_1H].values cols_2H = ["fg_mz_2H_" + str(i) for i in range(cfg.fg_num)] fg_2H = df_batch[cols_2H].values query_mz_m = np.concatenate([ms1, left, center, fg_1H, fg_2H], axis=1) ms1_ion_num = 4 else: raise ValueError("neutron_num in extract_maps has to be [-1, 1, 2, >2]") query_im_v = df_batch["measure_im"].values # GPU idx_start = cuda.to_device(idx_start) query_mz_m = cuda.to_device(query_mz_m) query_im_v = cuda.to_device(query_im_v) result_maps = create_cuda_zeros( (batch_size, locus_num, query_mz_m.shape[1], cycle_num, map_im_size) ) # kernel func, each thread generates maps for a pr k = batch_size n = k * locus_num * query_mz_m.shape[1] threads_per_block = 512 blocks_per_grid = math.ceil(n / threads_per_block) gpu_bin_maps[blocks_per_grid, threads_per_block]( n, locus_num, cycle_num, idx_start, map_gpu_ms1["scan_seek_idx"], map_gpu_ms1["scan_im"], map_gpu_ms1["scan_mz"], map_gpu_ms1["scan_height"], map_gpu_ms2["scan_seek_idx"], map_gpu_ms2["scan_im"], map_gpu_ms2["scan_mz"], map_gpu_ms2["scan_height"], query_mz_m, tol_ppm, query_im_v, tol_im_map, im_gap, result_maps, ms1_ion_num, ) cuda.synchronize() result_maps = utils.convert_numba_to_tensor(result_maps) return result_maps
[docs] @profile def scoring_maps( model: torch.nn.Module, df_input: pd.DataFrame, map_gpu_ms1: dict, map_gpu_ms2: dict, cycle_num: int, map_im_gap: float, map_im_dim: float, ppm_tolerance: float, im_tolerance: float, neutron_num: int, return_feature: bool = True, ) -> tuple: """ Extract and score the Profile-14 maps. Parameters ---------- model : torch.nn.Module DeepProfile-14 df_input : pd.DataFrame Provide the pr info. map_gpu_ms1 : dict Provide the MS1 data. map_gpu_ms2 : dict Provide the MS2 data. cycle_num : int How many cycle to extract for a pr. Default 13. map_im_gap : float The bin width of im in a map. map_im_dim : float The dimension of im in a map. ppm_tolerance : float The tolerance of ppm. im_tolerance : float The tolerance of im. neutron_num : int Specify the neutron num. return_feature : bool, default=True Whether to return feature or not. Returns ------- tuple pred : torch.Tensor The scores by DeepProfile-14. features : np.ndarray The features by DeepProfile-14. """ # locus locus_m = df_input["locus"].values.reshape(-1, 1) locus_num = locus_m.shape[1] # cycle start and end cycle_total = len(map_gpu_ms1["scan_rts"]) idx_start_m = locus_m - int((cycle_num - 1) / 2) idx_start_m[idx_start_m < 0] = 0 idx_start_max = cycle_total - cycle_num idx_start_m[idx_start_m > idx_start_max] = idx_start_max # in batches feature_v, pred_v = [], [] batch_num = cfg.batch_deep_center for _, df_batch in df_input.groupby(df_input.index // batch_num): maps = extract_maps( df_batch, idx_start_m, locus_num, cycle_num, map_im_dim, map_gpu_ms1, map_gpu_ms2, ppm_tolerance, im_tolerance, map_im_gap, neutron_num=neutron_num, ) # maps: [k, locus, 2+fg_num, map_cycle_dim, map_im_dim] maps = maps.view( maps.shape[0] * maps.shape[1], maps.shape[2], maps.shape[3], maps.shape[4] ) # valid ion nums non_fg_num = maps.shape[1] - cfg.fg_num valid_ion_nums = non_fg_num + df_batch["fg_num"].values valid_ion_nums = ( torch.from_numpy(np.repeat(valid_ion_nums, locus_num)).long().to(cfg.gpu_id) ) with torch.no_grad(): # with torch.cuda.amp.autocast(): feature, pred = model(maps, valid_ion_nums) torch.cuda.synchronize() # for profile pred = torch.softmax(pred, 1) pred = pred[:, 1].view(len(df_batch), locus_num) pred_v.append(pred) if return_feature: feature = feature.view(len(df_batch), locus_num, -1) feature = feature.cpu() feature = feature.numpy() feature_v.append(feature) pred = torch.cat(pred_v).to(dtype=torch.float32) # torch autocast to 16 feature = np.vstack(feature_v) if return_feature else None return pred, feature
[docs] @profile def extract_scoring_big( model_center: torch.nn.Module, model_big: torch.nn.Module, df_input: pd.DataFrame, map_gpu_ms1: dict, map_gpu_ms2: dict, cycle_num: int, map_im_gap: float, map_im_dim: float, ppm_tolerance: float, im_tolerance: float, ) -> tuple: """ Extrac and scoring Maps using DeepProfile-14 and DeepProfile-56. Parameters ---------- model_center : torch.nn.Module The DeepProfile-14 model. model_big : torch.nn.Module The DeepProfile-56 model. df_input : pd.DataFrame Provide the pr info. map_gpu_ms1 : dict Provide the MS1 data. map_gpu_ms2 : dict Provide the MS2 data. cycle_num : int How many cycle to extract for a pr. Default 13. map_im_gap : float The bin width of im in a map. map_im_dim : float The dimension of im in a map. ppm_tolerance : float The ppm tolerance. im_tolerance : float The mobility tolerance. Returns ------- tuple pred_v : list[np.ndarray] The deep scores for [14-left, 14-center, 14-1H, 14-2H, 56-total] feature_v : list[np.ndarray] The deep features for [14-left, 14-center, 14-1H, 14-2H, 56-total] """ # locus locus_v = df_input["locus"].values # cycle start and end cycle_total = len(map_gpu_ms1["scan_rts"]) idx_start_v = locus_v - int((cycle_num - 1) / 2) idx_start_v[idx_start_v < 0] = 0 idx_start_max = cycle_total - cycle_num idx_start_v[idx_start_v > idx_start_max] = idx_start_max # params ms1_cols = [ "pr_mz_left", "pr_mz", "pr_mz_1H", "pr_mz_2H", "pr_mz_left", "pr_mz", "pr_mz_1H", "pr_mz_2H", ] # unfrag ms1 = df_input[ms1_cols].values cols_left = ["fg_mz_left_" + str(i) for i in range(cfg.fg_num)] left = df_input[cols_left].values cols_center = ["fg_mz_" + str(i) for i in range(cfg.fg_num)] center = df_input[cols_center].values cols_1H = ["fg_mz_1H_" + str(i) for i in range(cfg.fg_num)] fg_1H = df_input[cols_1H].values cols_2H = ["fg_mz_2H_" + str(i) for i in range(cfg.fg_num)] fg_2H = df_input[cols_2H].values query_mz_m = np.concatenate([ms1, left, center, fg_1H, fg_2H], axis=1) ms1_ion_num = 4 query_im_v = df_input["measure_im"].values # cuda input idx_start_v = cuda.to_device(idx_start_v) query_mz_m = cuda.to_device(query_mz_m) query_im_v = cuda.to_device(query_im_v) # cuda output n = len(df_input) ions_num = query_mz_m.shape[1] maps = create_cuda_zeros((n, ions_num, cycle_num, map_im_dim)) # kernel func, each thread for a elution groups of a pr thread_num = n * ions_num threads_per_block = 256 blocks_per_grid = math.ceil(thread_num / threads_per_block) gpu_bin_map[blocks_per_grid, threads_per_block]( thread_num, cycle_num, idx_start_v, map_gpu_ms1["scan_seek_idx"], map_gpu_ms1["scan_im"], map_gpu_ms1["scan_mz"], map_gpu_ms1["scan_height"], map_gpu_ms2["scan_seek_idx"], map_gpu_ms2["scan_im"], map_gpu_ms2["scan_mz"], map_gpu_ms2["scan_height"], query_mz_m, ppm_tolerance, query_im_v, im_tolerance, map_im_gap, maps, ms1_ion_num, ) cuda.synchronize() # -1H, center, +H, +2H, total maps = utils.convert_numba_to_tensor(maps) pred_v, feature_v = [], [] for i in range(5): if i != 4: idx = [i, i + 4] + list(range(8 + i * 12, 20 + i * 12)) valid_ion_nums = 2 + df_input["fg_num"].values model = model_center else: idx = list(range(56)) valid_ion_nums = 4 * (2 + df_input["fg_num"].values) model = model_big maps_sub = maps[:, idx] valid_ion_nums = torch.from_numpy(valid_ion_nums).long().to(cfg.gpu_id) with torch.no_grad(): # with torch.cuda.amp.autocast(): feature, pred = model(maps_sub, valid_ion_nums) torch.cuda.synchronize() pred = torch.softmax(pred, 1) pred = pred[:, 1].cpu().numpy().astype(np.float32) feature = feature.cpu().numpy() pred_v.append(pred) feature_v.append(feature) return pred_v, feature_v