Source code for full_dia.polish

import numpy as np
import pandas as pd
from numba import jit

from full_dia import cfg, utils
from full_dia.log import Logger

logger = Logger.get_logger()

try:
    _ = profile
except NameError:

[docs] def profile(func): return func
[docs] @jit(nopython=True, nogil=True) def is_fg_share(fg_mz_1, fg_mz_2, tol_ppm): """ Calculate how many ions in fg_mz_1 are matched to fg_mz_2. """ x, y = fg_mz_1.reshape(-1, 1), fg_mz_2.reshape(1, -1) delta_mz = np.abs(x - y) ppm = delta_mz / (x + 1e-7) * 1e6 ppm_b = ppm < tol_ppm is_share_x = np.array([ppm_b[i, :].any() for i in range(len(ppm_b))]) is_share_x = is_share_x & (fg_mz_1 > 0) return is_share_x
[docs] @jit(nopython=True, nogil=True, parallel=False) def polish_prs_core( swath_id_v, measure_locus_v, measure_im_v, fg_mz_m, tol_locus, tol_im, tol_ppm, sa_m ): """ The big fish eats the small fish. If a fg ion shared by more confident pr, the sa and fg_mz will be zeros. """ for i in range(len(swath_id_v)): swath_id_i = swath_id_v[i] measure_locus_i = measure_locus_v[i] measure_im_i = measure_im_v[i] fg_mz_i = fg_mz_m[i] for j in range(i + 1, len(swath_id_v)): swath_id_j = swath_id_v[j] if swath_id_i != swath_id_j: break measure_locus_j = measure_locus_v[j] if abs(measure_locus_i - measure_locus_j) > tol_locus: continue measure_im_j = measure_im_v[j] if abs(measure_im_i - measure_im_j) > tol_im: continue sa_i = sa_m[i] fg_mz_j = fg_mz_m[j] is_share_v = is_fg_share(fg_mz_j, fg_mz_i, tol_ppm) is_share_v = is_share_v & (sa_i > 0) for jj in np.where(is_share_v)[0]: sa_m[j, jj] = 0 fg_mz_m[j, jj] = 0 return sa_m, fg_mz_m
[docs] def polish_prs( df_input: pd.DataFrame, tol_im: float = 0.03, tol_ppm: float = 20, tol_sa_ratio: float = 0.75, tol_share_num: int = 5, ) -> pd.DataFrame: """ As individual DIA signals can be shared among multiple peptides, an additional post-processing step is required to refine the results. Parameters ---------- df_input : pd.DataFrame Provide the pr identification results. tol_im : float, default = 0.03 If the bias im of two peps falls in this tolerance, they are competitors. tol_ppm : float, default = 20 If the ppm of two peps falls in this tolerance, they are competitors. tol_sa_ratio : float, default = 0.75 If all fragment ions of a peptide have SA values above the threshold and are more likely to originate from more confident peptides, the peptide is removed. tol_share_num : int, default=5 If all fragment ions of a peptide have matched number above the threshold and are more likely to originate from more confident peptides, the peptide is removed. Returns ------- df : pd.DataFrame The polished df. """ df_input_target = df_input[df_input["decoy"] == 0].reset_index(drop=True) df_input_decoy = df_input[df_input["decoy"] == 1].reset_index(drop=True) df_v = [] for df_target in [df_input_target, df_input_decoy]: target_num_before = len(df_target) # process I/L peptideform df_target["pr_IL"] = df_target["pr_id"].replace( ["I", "L"], ["x", "x"], regex=True ) idx_max = df_target.groupby("pr_IL")["cscore_pr_run"].idxmax() polish_IL_num = len(df_target) - len(idx_max) df_target = df_target.loc[idx_max].reset_index(drop=True) del df_target["pr_IL"] # tol_locus is from the half of span spans = df_target.loc[df_target["q_pr_run"] < 0.01, "score_elute_span"] tol_locus = np.ceil(0.5 * spans.median()) df_target = df_target.sort_values( by=["swath_id", "cscore_pr_run"], ascending=[True, False], ignore_index=True ) swath_id_v = df_target["swath_id"].values measure_locus_v = df_target["locus"].values measure_im_v = df_target["measure_im"].values cols_center = ["fg_mz_" + str(i) for i in range(cfg.fg_num)] fg_mz_center = df_target[cols_center].values cols_center = ["score_ion_sa_" + str(i) for i in range(2, 14)] sa_center = df_target[cols_center].values fg_mz_m = np.concatenate([fg_mz_center], axis=1) fg_mz_m = np.ascontiguousarray(fg_mz_m) fg_mz_m_raw = fg_mz_m.copy() sa_m = np.concatenate([sa_center], axis=1) sa_m = np.ascontiguousarray(sa_m) sa_m_raw = sa_m.copy() sa_m, fg_mz_m = polish_prs_core( swath_id_v, measure_locus_v, measure_im_v, fg_mz_m, tol_locus, tol_im, tol_ppm, sa_m, ) # screen nonshare_ratio = sa_m.sum(axis=1) / (1e-6 + sa_m_raw.sum(axis=1)) good_condition1 = nonshare_ratio > tol_sa_ratio good_condition2 = (fg_mz_m_raw > fg_mz_m).sum(axis=1) < tol_share_num good_idx = good_condition1 & good_condition2 df_target = df_target.iloc[good_idx] polish_bad_num = len(good_idx) - sum(good_idx) info = "Removing dubious prs: {}-{}-{}={}".format( target_num_before, polish_IL_num, polish_bad_num, len(df_target) ) logger.info(info) df_v.append(df_target) df = pd.concat(df_v, ignore_index=True) if cfg.is_compare_mode: utils.cal_acc_recall(cfg.ws_single, df[df["decoy"] == 0], diann_q_pr=0.01) return df
[docs] @jit(nopython=True, nogil=True, parallel=False) def make_interference_areas_zero_core( swath_id_v, measure_locus_v, measure_im_v, fg_mz_m, area_m, sa_m, tol_locus, tol_im, tol_ppm, other_idx, ): """ The big fish eats the small fish. If a fg ion shared by more confident pr, the sa and fg_mz will be zeros. """ for i in range(other_idx): swath_id_i = swath_id_v[i] measure_locus_i = measure_locus_v[i] measure_im_i = measure_im_v[i] fg_mz_i = fg_mz_m[i] for j in range(other_idx, len(swath_id_v)): swath_id_j = swath_id_v[j] if swath_id_i != swath_id_j: continue measure_locus_j = measure_locus_v[j] if abs(measure_locus_i - measure_locus_j) > tol_locus: continue measure_im_j = measure_im_v[j] if abs(measure_im_i - measure_im_j) > tol_im: continue fg_mz_j = fg_mz_m[j] is_share_v = is_fg_share(fg_mz_j, fg_mz_i, tol_ppm) for jj in np.where(is_share_v)[0]: sa_m[j, jj] = 0 area_m[j, jj] = 0 return area_m, sa_m
[docs] def make_interference_areas_zero( df_input: pd.DataFrame, tol_locus: int = 3, tol_im: float = 0.03, tol_ppm: float = 20, ) -> pd.DataFrame: """ In global analysis, if a fragment ion maybe produced by a more confident pr, its area and SA score will be made to zeros. Parameters ---------- df_input : pd.DataFrame Provide the identification info on the run level. tol_locus : int, default = 3 If the bias locus of two peps falls in this tolerance, they are competitors. tol_im : float, default = 0.03 If the bias im of two peps falls in this tolerance, they are competitors. tol_ppm : float, default = 20 If the ppm of two fragment ions falls in this tolerance, they are competitors. Returns ------- df : pd.DataFrame The intensities and SA values of fragment ions that lose the competition are set to zero. """ df_target = df_input[df_input["decoy"] == 0].copy() df_decoy = df_input[df_input["decoy"] == 1].copy() df_target = df_target.sort_values( by=["cscore_pr_run"], ascending=[False], ignore_index=True, # 'cscore_pr_global ) # NA is positing end swath_id_v = df_target["swath_id"].values measure_locus_v = df_target["locus"].values measure_im_v = df_target["measure_im"].values cols_center = ["fg_mz_" + str(i) for i in range(cfg.fg_num)] fg_mz_m = df_target[cols_center].values fg_mz_m = np.ascontiguousarray(fg_mz_m) cols_quant = ["score_ion_quant_" + str(i) for i in range(2, 14)] area_m = df_target[cols_quant].values area_m = np.ascontiguousarray(area_m) cols_sa = ["score_ion_sa_" + str(i) for i in range(2, 14)] sa_m = df_target[cols_sa].values sa_m = np.ascontiguousarray(sa_m) other_idx = np.where(df_target["cscore_pr_run"].isna())[0][0] area_m, sa_m = make_interference_areas_zero_core( swath_id_v, measure_locus_v, measure_im_v, fg_mz_m, area_m, sa_m, tol_locus, tol_im, tol_ppm, other_idx, ) bad_idx = (area_m == 0) | (sa_m == 0) area_m[bad_idx] = 0.0 sa_m[bad_idx] = 0.0 df_target[cols_quant] = area_m df_target[cols_sa] = sa_m df = pd.concat([df_target, df_decoy], ignore_index=True) return df