Source code for full_dia.utils

import argparse
import warnings
from pathlib import Path

import numpy as np
import pandas as pd
import torch
from numba import cuda, jit, prange
from sklearn.exceptions import ConvergenceWarning

from full_dia import __version__, cfg
from full_dia.log import Logger

warnings.filterwarnings(action="ignore", category=ConvergenceWarning)
warnings.filterwarnings(action="ignore", category=UserWarning)
import cupy as cp  # noqa: E402

try:
    _ = profile
except NameError:

[docs] def profile(func): return func
logger = Logger.get_logger()
[docs] @profile def release_gpu_scans(*map_gpus: dict) -> None: """ Release GPU-resident data and clear GPU memory. """ for map_gpu in map_gpus: del map_gpu["scan_rts"] del map_gpu["scan_seek_idx"] del map_gpu["scan_im"] del map_gpu["scan_mz"] del map_gpu["scan_height"] map_gpu.clear() del map_gpu del map_gpus # gc.collect() torch.cuda.empty_cache()
[docs] def convert_numba_to_tensor(x: cuda.cudadrv.devicearray.DeviceNDArray) -> torch.Tensor: """ Convert numba cuda array to torch cuda array with the help of cupy. """ x = cp.asarray(x).toDlpack() x = torch.from_dlpack(x) return x
[docs] def create_cuda_zeros( shape: tuple, dtype: torch.dtype = torch.float32 ) -> cuda.cudadrv.devicearray.DeviceNDArray: """ Create the Numba CUDA zero array with the help of pytorch. """ x = torch.zeros(shape, dtype=dtype, device=cfg.gpu_id) x = cuda.as_cuda_array(x) return x
[docs] def get_diann_info(path_ws): """ For developing. """ if not cfg.is_compare_mode: return df_diann = pd.read_csv(path_ws / "diann" / "report.tsv", sep="\t") df_diann = df_diann[df_diann["Q.Value"] < 0.01] df_diann["pr_id"] = df_diann["Modified.Sequence"] + df_diann[ "Precursor.Charge" ].astype(str) # rt rt_tol_diann = (df_diann["RT"] - df_diann["Predicted.RT"]).abs().max() * 60.0 info = "DIA-NN tol_rt: {:.2f}".format(rt_tol_diann) logger.info(info) # im im_tol_diann = (df_diann["IM"] - df_diann["Predicted.IM"]).abs().max() info = "DIA-NN tol_im: {:.4f}".format(im_tol_diann) logger.info(info) # ppm import re with open(path_ws / "diann" / "report.log.txt") as f: lines = f.readlines() for line in lines: if line.find("Recommended MS1 mass accuracy setting") > -1: pattern = r"\d+\.\d+" match = re.search(pattern, line) ppm_ms1 = match.group() logger.info("DIA-NN MS1 tol_ppm: {}".format(ppm_ms1)) if line.find("Optimised mass accuracy") > -1: pattern = r"\d+\.\d+" match = re.search(pattern, line) ppm_ms2 = match.group() logger.info("DIA-NN MS2 tol_ppm: {}".format(ppm_ms2)) if line.find("window radius") > -1: pw = int(line.split(" ")[-1]) logger.info("DIA-NN peak width: {}".format(pw)) if line.find("Training neural networks") > -1: x = " ".join(line.split(" ")[1:]) logger.info("DIA-NN " + x.strip()) if line.find("Number of IDs at 0.01 FDR") > -1: t = line.split(" ")[0][1:-1] ids = line.split(" ")[-1] logger.info(f"DIA-NN time: {t}") logger.info(f"DIA-NN 1% FDR prs: {ids}")
[docs] def cal_acc_recall( path_ws, df_input, diann_q_pr=None, diann_q_pro=None, diann_q_pg=None, alpha_q_pr=None, alpha_q_pro=None, alpha_q_pg=None, ): """ For developing. """ if not cfg.is_compare_mode: return df_alpha = df_input.copy() if "decoy" in df_alpha.columns: df_alpha = df_alpha[df_alpha["decoy"] == 0].reset_index(drop=True) # for diann pr df_diann = pd.read_csv(path_ws / "diann" / "report.tsv", sep="\t") if diann_q_pr is not None: df_diann_pr = df_diann[df_diann["Q.Value"] < diann_q_pr].copy() else: df_diann_pr = df_diann.copy() df_diann_pr["pr_id"] = df_diann_pr["Modified.Sequence"] + df_diann_pr[ "Precursor.Charge" ].astype(str) pr_diann = set(df_diann_pr["pr_id"]) # for full pr if alpha_q_pr is not None: df_alpha_pr = df_alpha[df_alpha.q_pr < alpha_q_pr].copy() else: df_alpha_pr = df_alpha.copy() # intersection df_cross_pr = df_alpha_pr[["pr_id", "measure_rt"]] df_cross_pr = df_cross_pr.merge(df_diann_pr, on="pr_id") rt_delta = (df_cross_pr.measure_rt - df_cross_pr.RT * 60.0).abs() df_cross_pr = df_cross_pr[rt_delta < cfg.locus_rt_thre] pr_cross_pr = set(df_cross_pr.pr_id) # recall and acc on pr level pr_alpha = set(df_alpha_pr["pr_id"]) pr_recall_2 = len(pr_cross_pr) / (len(pr_diann) + 1) pr_recall_1 = len(pr_diann & pr_alpha) / (len(pr_diann) + 1) pr_acc = len(pr_diann & pr_alpha) / (len(pr_alpha) + 1) pr_gain = (len(pr_alpha) - len(pr_diann)) / (len(pr_diann) + 1) info = ( "Df: {}, " "Prs: {}, " "Pr_gain: {:.2f}, " "pr_acc: {:.3f}, pr_recall_1: {:.3f}, pr_recall_2: {:.3f}".format( len(df_alpha), len(pr_alpha), pr_gain, pr_acc, pr_recall_1, pr_recall_2, ) ) # pro and pg if diann_q_pro or alpha_q_pro or diann_q_pg or alpha_q_pg: # protein df_diann_pro = df_diann[ (df_diann["Protein.Q.Value"] < diann_q_pro) & (df_diann["Proteotypic"] == 1) ] df_alpha_pro = df_alpha[ (df_alpha["q_pro"] < alpha_q_pro) & (df_alpha["proteotypic"] == 1) ].copy() pro_diann = set(df_diann_pro["Protein.Ids"]) pro_alpha = set(df_alpha_pro["protein_id"]) pro_recall = len(pro_diann & pro_alpha) / (len(pro_diann) + 1) pro_acc = len(pro_diann & pro_alpha) / (len(pro_alpha) + 1) pro_gain = (len(pro_alpha) - len(pro_diann)) / (len(pro_diann) + 1) # protein group df_diann_pg = df_diann[(df_diann["PG.Q.Value"] < diann_q_pg)] df_alpha_pg = df_alpha[(df_alpha["q_pg"] < alpha_q_pg)] pg_diann = set(df_diann_pg["Protein.Group"]) # raw pg_alpha = set(df_alpha_pg["protein_group"]) pg_recall = len(pg_diann & pg_alpha) / (len(pg_diann) + 1) pg_acc = len(pg_diann & pg_alpha) / (len(pg_alpha) + 1) pg_gain = (len(pg_alpha) - len(pg_diann)) / (len(pg_diann) + 1) info = ( "Prs: {}, " "Pr_gain: {:.2f}, " "pr_acc: {:.3f}, pr_recall_1: {:.3f}, pr_recall_2: {:.3f}, " "Pro_num: {}, " "Pro_gain: {:.2f}, " "pro_acc: {:.2f}, pro_recall: {:.2f}, " "Pg_num: {}, " "pg_gain: {:.2f}, " "pg_acc: {:.2f}, pg_recall: {:.2f}".format( len(pr_alpha), pr_gain, pr_acc, pr_recall_1, pr_recall_2, len(pro_alpha), pro_gain, pro_acc, pro_recall, len(pg_alpha), pg_gain, pg_acc, pg_recall, ) ) logger.info(info)
[docs] def save_as_pkl(df, fname): """ For developing. """ if cfg.is_compare_mode: df.to_pickle(cfg.dir_out_single / fname)
[docs] def clean_and_save( df_main: pd.DataFrame, df_other: pd.DataFrame, ws_single: Path ) -> None: """ Combine, clean and save the high-quality decoy and all target peptides identification information. Parameters ---------- df_main : pd.DataFrame High-quality decoy and target peptides. df_other : pd.DataFrame All target peptides. This will avoid the reextraction in global analysis. ws_single : Path The path to save file. """ cols_base = [ "pr_id", "pr_charge", "pr_index", "swath_id", "decoy", "locus", "measure_rt", "measure_im", ] cols = cols_base + [ "cscore_pr_run", "q_pr_run", ] df_main = df_main.loc[:, cols] cols = cols_base + ["score_ion_quant_" + str(i) for i in range(14)] cols += ["score_ion_sa_" + str(i) for i in range(14)] df_other = df_other.loc[:, cols] # assert set(df_main['pr_index']).issubset(set(df_other['pr_index'])) df = pd.merge(df_main, df_other, on=cols_base, how="outer") # dtype df["locus"] = df["locus"].astype(np.int32) cols_big = df.select_dtypes(include=[np.float64]).columns df[cols_big] = df[cols_big].astype(np.float32) output_file = cfg.dir_out_global / (ws_single.name + ".parquet") df.to_parquet(output_file)
[docs] def read_from_pq(ws_single: Path, cols: list | None = None) -> pd.DataFrame: """ Read .parquet file with specific columns. """ fname = cfg.dir_out_global / (ws_single.name + ".parquet") if cols is None: df = pd.read_parquet(fname) else: df = pd.read_parquet(fname, columns=cols) return df
[docs] @jit(nopython=True, nogil=True, parallel=True) def cal_group_rank(x, group_size_cumsum): """ Calculate group rank parallelly. """ item_num = len(x) rank = np.zeros(item_num, dtype=np.uint8) for i in prange(len(group_size_cumsum) - 1): start = group_size_cumsum[i] end = group_size_cumsum[i + 1] xx = x[start:end] rank[start:end] = np.argsort(np.argsort(-xx)) + 1 return rank
[docs] def move_all_zeros_end(a: np.ndarray) -> np.ndarray: """ Move all zero elements in the matrix to the end of rows. Based on http://stackoverflow.com/a/42859463/3293881 """ valid_mask = a != 0 flipped_mask = valid_mask.sum(1, keepdims=1) > np.arange(a.shape[1] - 1, -1, -1) flipped_mask = flipped_mask[:, ::-1] a[flipped_mask] = a[valid_mask] a[~flipped_mask] = 0 return a
[docs] def cal_sa_by_np(x: np.ndarray, y: np.ndarray) -> np.ndarray: """ Calculate the SA. The inputs have to be two-dimensions. """ norm_x = np.linalg.norm(x, axis=1) norm_y = np.linalg.norm(y, axis=1) norm_xy = norm_x * norm_y xy_sum = (x * y).sum(axis=1) sa = xy_sum / (norm_xy + 1e-7) sa = 1 - 2 * np.arccos(sa) / np.pi return sa
[docs] def convert_cols_to_diann(df: pd.DataFrame, ws_single: Path) -> pd.DataFrame: """ Convert local column names to DIA-NN's column names. """ df = df[df["decoy"] == 0].reset_index(drop=True) df["file_name"] = "/".join(ws_single.parts[-2:]) df["run"] = ws_single.stem cols_quant = ["score_ion_quant_" + str(i) for i in range(2 + cfg.fg_num)] tmp = np.round(df[cols_quant].values, 2).astype(str) df["ion_quant"] = [";".join(row) for row in tmp] cols_sa = ["score_ion_sa_" + str(i) for i in range(2 + cfg.fg_num)] tmp = np.round(df[cols_sa].values, 2).astype(str) df["ion_sa"] = [";".join(row) for row in tmp] df = df.rename( columns={ "file_name": "File.Name", "run": "Run", "protein_group": "Protein.Group", "protein_id": "Protein.Ids", "protein_name": "Protein.Names", "quant_pg_raw": "PG.Quantity.Raw", "quant_pg_deep": "PG.Quantity.Deep", "quant_pg_mix": "PG.Quantity.Mix", "pr_id": "Precursor.Id", "pr_charge": "Precursor.Charge", "q_pr_run": "Q.Value", "q_pr_global": "Global.Q.Value", "q_pg_run": "PG.Q.Value", "q_pg_global": "Global.PG.Q.Value", "proteotypic": "Proteotypic", "quant_pr_raw": "Precursor.Quantity.Raw", "quant_pr_deep": "Precursor.Quantity.Deep", "quant_pr_mix": "Precursor.Quantity.Mix", "measure_rt": "RT", "ion_sa": "Fragment.Correlations", "ion_quant": "Fragment.Quant.Raw", # 'cscore_pr_run': 'CScore', "measure_im": "IM", } ) df = df[ [ "File.Name", "Run", # PG "Protein.Group", "Protein.Ids", "Protein.Names", "PG.Q.Value", "Global.PG.Q.Value", "PG.Quantity.Raw", "PG.Quantity.Deep", "PG.Quantity.Mix", # Pr "Precursor.Id", "Precursor.Charge", "Proteotypic", "Q.Value", "Global.Q.Value", # 'CScore', "Precursor.Quantity.Raw", "Precursor.Quantity.Deep", "Precursor.Quantity.Mix", "Fragment.Quant.Raw", "Fragment.Correlations", "RT", "IM", ] ] return df
[docs] def get_args() -> argparse.Namespace: """ Parse command line arguments. """ parser = argparse.ArgumentParser("full_dia") # required=True parser.add_argument( "-ws", required=True, help="Specify the folder that contains .d files." ) parser.add_argument( "-lib", required=True, help="Specify the absolute path of a .speclib spectra library.", ) # optional parser.add_argument( "-out_name", type=str, default="full_dia", help="Specify the folder name of outputs. Default: full_dia.", ) parser.add_argument( "-gpu_id", type=int, default=0, help="Specify the GPU-ID (e.g. 0, 1, 2) which will be used. Default: 0", ) parser.add_argument( "-low_memory", action="store_true", help="Specify whether running in low memory mode. Default: False", ) parser.add_argument( "-overwrite", action="store_true", help="Specify whether overwrite the existing run-specific analysed files. Default: False", ) # develop parser.add_argument( "-cfg_develop", type=str, default=None, help="Developing use. Specify the developing cfg .yaml path", ) parser.add_argument( "-compare", action="store_true", help="Developing use. Default: False" ) # process params args = parser.parse_args() return args
[docs] def init_gpu_params(gpu_id: int) -> None: """ Initialize GPU params according to the GPU ID: 1. for pytorch 2. for numba.cuda 3. Empirically adjust the batch size for GPU code. """ torch.manual_seed(666) cfg.gpu_id = torch.device("cuda:" + str(gpu_id)) cfg.device_name = torch.cuda.get_device_name(gpu_id) torch.backends.cudnn.benchmark = True from numba import cuda cuda.select_device(gpu_id) # xic extraction occupied the GPU memory ratio if "4090" in cfg.device_name: cfg.batch_xic_seed = 5000 cfg.batch_xic_locus = cfg.batch_xic_seed * 5 cfg.batch_deep_center = 10000 cfg.batch_deep_big = 5000 else: cfg.batch_xic_seed = 4000 cfg.batch_xic_locus = cfg.batch_xic_seed * 5 cfg.batch_deep_center = 10000 cfg.batch_deep_big = 2000
[docs] def check_run_info(args: argparse.Namespace) -> None: """ Print run info: version, platform, time, cpu, memory, gpu, cmd. """ # show version and platform import platform logger.info(f"Full-DIA (v{__version__}) on {platform.system()} OS") # show time from datetime import datetime logger.info(f'Time: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}') # show cpu import psutil cpu_num = psutil.cpu_count() cpu_frq = psutil.cpu_freq().max / 1000 logger.info(f"CPU: {cpu_num}cores, Frequency: {cpu_frq:.1f}GHz") # show memory total = psutil.virtual_memory().total / 1024**3 free = psutil.virtual_memory().available / 1024**3 logger.info(f"RAM: {free:.0f}G/{total:.0f}G in free/total") if total < 15: raise RuntimeError( f"Insufficient memory: {total:.0f} GB available, > 16 GB required." ) # show GPU i = args.gpu_id gpu_name = torch.cuda.get_device_name(i) free, total = cuda.current_context().get_memory_info() free, total = free / 1024**3, total / 1024**3 logger.info(f"GPU: {gpu_name}-{i}, {free:.0f}G/{total:.0f}G in free/total") if free < 3: raise RuntimeError( f"Insufficient GPU memory: {total:.0f} GB available, > 4 GB required." ) # show cmd import sys logger.info(f'CMD: {" ".join(sys.argv)}')
[docs] def init_multi_ws(ws_global: Path, out_name: str) -> None: """ Initialize the paths of .d files and the output folder. """ # output for global cfg.ws_global = ws_global cfg.dir_out_name = out_name cfg.dir_out_global = ws_global / out_name cfg.dir_out_global.mkdir(exist_ok=True) multi_ws = [] if ws_global.suffix == ".d": multi_ws.append(ws_global) else: for ws_i in ws_global.rglob("*.d"): if ws_i.is_dir(): multi_ws.append(ws_i) cfg.multi_ws = multi_ws cfg.file_num = len(cfg.multi_ws) if cfg.file_num < 2: raise ValueError( "At least two .d files are required for Full-DIA. " f"Got {cfg.file_num} file(s)." )
[docs] def init_single_ws(ws_i: int, total: int, ws_single: Path) -> None: """ Initialize the output path of single .d file. """ cfg.ws_single = Path(ws_single) cfg.dir_out_single = Path(ws_single) / cfg.dir_out_name if cfg.is_compare_mode: cfg.dir_out_global.mkdir(exist_ok=True) logger.info(f"================Run: {ws_i+1}/{total}================") logger.info(f".d: {str(ws_single.name)}")
[docs] def cross_cos(x): """ For developing. """ norms = np.linalg.norm(x, axis=1) + 1e-6 normalized_x = x / norms[:, np.newaxis] cosine_sim = np.dot(normalized_x, normalized_x.T) return cosine_sim
[docs] @jit(nopython=True, nogil=True, parallel=True) def interp_xics(xics, rts, target_dim): """ For developing. """ n_pep, n_ion, n_cycle = xics.shape result_xics = np.zeros((n_pep, n_ion, target_dim)) result_rts = np.zeros((n_pep, target_dim)) for i_pep in prange(n_pep): rts_pep = rts[i_pep] rts_interp = np.linspace(rts_pep[0], rts_pep[-1], target_dim) result_rts[i_pep] = rts_interp for i_ion in range(n_ion): xic_raw = xics[i_pep, i_ion] xic_interp = np.interp(rts_interp, rts_pep, xic_raw) result_xics[i_pep, i_ion] = xic_interp return result_rts, result_xics