import warnings
import matplotlib.pyplot as plt
import networkx as nx
import numpy as np
import pandas as pd
import statsmodels.api as sm
from matplotlib import MatplotlibDeprecationWarning
from scipy.interpolate import interp1d
from scipy.stats import gaussian_kde
from full_dia import cfg, tims, utils
from full_dia.log import Logger
warnings.filterwarnings("ignore", category=MatplotlibDeprecationWarning)
try:
_ = profile
except NameError:
[docs]
def profile(func):
return func
logger = Logger.get_logger()
[docs]
def calib_rt(df_seed: pd.DataFrame, df_lib: pd.DataFrame) -> tuple:
"""
Fit RT from iRT to real RT based on df_seed, then update the RT in df_lib.
Parameters
----------
df_seed : pd.DataFrame
Columns: 'simple_seq', 'locus', 'score_deep', 'pred_irt', 'measure_rt'.
df_lib :
Columns: 'pred_irt'.
Returns
-------
tuple
df : pd.DataFrame
df_seed with 'pred_rt' and bias_rt will less than tolerance.
df_lib : pd.DataFrame
Add a new column 'pred_rt'.
"""
idx_max = df_seed.groupby(["locus"])["score_deep"].idxmax()
df = df_seed.loc[idx_max].reset_index(drop=True)
idx_max = df.groupby(["pred_irt"])["score_deep"].idxmax()
df = df.loc[idx_max].reset_index(drop=True)
idx_max = df.groupby("simple_seq")["score_deep"].idxmax()
df = df.loc[idx_max].reset_index(drop=True)
x = df["pred_irt"].values
y = df["measure_rt"].values
idx = np.argsort(x)
x0 = x[idx]
y0 = y[idx]
if cfg.is_compare_mode:
np.savez(cfg.dir_out_single / "update_rt.npz", x=x, y=y)
# Calib-RT
x1, y1, _ = screen_by_hist(x0, y0, bins=100)
x2, y2 = screen_by_graph(x1, y1)
x3, y3 = polish_ends(x2, y2, tol_bins=5)
confident_range = (x0 > x3.min()) & (x0 < x3.max())
x0, y0 = x0[confident_range], y0[confident_range]
x_fit, y_fit = fit_by_lowess(x3, y3, frac=0.2)
f = interp1d(x_fit, y_fit, kind="cubic", fill_value="extrapolate")
tol_turn = cal_turning_point(y0, f(x0))
y1_hat = f(x1)
good_idx = np.abs(y1 - y1_hat) < 1.5 * tol_turn
x11, y11 = x1[good_idx], y1[good_idx]
x_fit, y_fit = fit_by_lowess(x11, y11, frac=0.2)
f = interp1d(x_fit, y_fit, kind="cubic", fill_value="extrapolate")
# tol is for df_seed, tol_rt is for global extraction
tol_turn = cal_turning_point(y0, f(x0))
tol_ratio = cfg.tol_rt
cfg.tol_rt = max(tol_ratio, tol_turn)
info = "tol_rt, by ratio: {:.2f}, by seed: {:.2f}, pre-select: {:.2f}".format(
tol_ratio, tol_turn, cfg.tol_rt
)
logger.info(info)
# pred and screen for df_seed
x_new = df["pred_irt"].values
y_new = f(x_new).astype(np.float32)
df["pred_rt"] = y_new
df["bias_rt"] = df["pred_rt"] - df["measure_rt"]
df = df[df["bias_rt"].abs() < tol_turn]
df = df[(df["measure_pr_mz"] > 0) & (df["measure_im"] > 0)]
df = df.reset_index(drop=True)
bias = df["pred_rt"].values - df["measure_rt"].values
info = "Calib RT/IM/MZ by #seed: {}".format(len(df))
logger.info(info)
utils.cal_acc_recall(cfg.ws_single, df, diann_q_pr=0.01)
# pred for df_lib
x_new = df_lib["pred_irt"].values
y_new = f(x_new).astype(np.float32)
y_new[y_new < 0.0] = 0.0
df_lib["pred_rt"] = y_new
if cfg.is_compare_mode:
plot_fit_rt(
x,
y,
x1,
y1,
x11,
y11,
x_fit,
y_fit,
cfg.tol_rt,
bias,
fname="update_info_rt",
)
cal_rt_recall(cfg.ws_single, df_lib, cfg.tol_rt)
return df, df_lib
[docs]
def calib_im(df_tol: pd.DataFrame, df_lib: pd.DataFrame) -> tuple:
"""
Fit IM from iIM to real IM based on df_tol, then update the IM in df_lib.
Parameters
----------
df_tol : pd.DataFrame
Columns: 'score_deep', 'pred_iim', 'pred_im', 'measure_im'.
df_lib :
Columns: 'pred_iim'.
Returns
-------
tuple
df_tol : pd.DataFrame
df_tol with 'pred_im' and bias_im will less than tolerance.
df_lib : pd.DataFrame
Updated 'pred_im'.
"""
cal_im_recall(cfg.ws_single, df_lib, cfg.tol_im_xic)
cal_rt_im_recall(cfg.ws_single, df_lib, cfg.tol_rt, cfg.tol_im_xic)
idx_max = df_tol.groupby("pred_iim")["score_deep"].idxmax()
df_tol = df_tol.loc[idx_max].reset_index(drop=True)
x = df_tol["pred_iim"].values
y_measure = df_tol["measure_im"].values
y_pred_before = df_tol["pred_im"].values
bias_before = y_measure - y_pred_before
idx = np.argsort(x)
x, y_pred_before = x[idx], y_pred_before[idx]
y_measure, bias_before = y_measure[idx], bias_before[idx]
# lowess
lowess = sm.nonparametric.lowess
frac = 0.1
y_lowess = lowess(y_measure, x, frac=frac)
x_fit, y_fit = zip(*y_lowess)
x_fit, y_fit = np.array(x_fit), np.array(y_fit)
f = interp1d(x_fit, y_fit, kind="cubic", fill_value="extrapolate")
# 3σ
y_pred = f(x)
bias = y_measure - y_pred
bias_std = (bias - bias.mean()) / bias.std()
good_idx = bias_std < 3.0
x_good = x[good_idx]
y_measure_good = y_measure[good_idx]
y_pred_before = y_pred_before[good_idx]
bias_before = bias_before[good_idx]
y_pred_after = f(x_good)
bias_after = y_measure_good - y_pred_after
# cfg.tol_im = np.abs(bias_after).max()
# info = 'updated tol_im: {:.4f}'.format(cfg.tol_im)
# logger.info(info)
# logger.info('Keep tol_im: 0.05')
# pred and screen for df_seed
pred_ims = f(df_tol["pred_iim"].values).astype(np.float32)
df_tol["pred_im"] = pred_ims
df_tol["bias_im"] = (df_tol["pred_im"] - df_tol["measure_im"]).abs()
df_tol = df_tol[df_tol["bias_im"] < cfg.tol_im_xic]
df_tol = df_tol.reset_index(drop=True)
# pred for df_lib
pred_ims = f(df_lib["pred_iim"].values).astype(np.float32)
df_lib["pred_im"] = pred_ims
if cfg.is_compare_mode:
plot_fit_im(
y_measure_good,
y_pred_before,
y_pred_after,
x_fit,
y_fit,
bias_before,
bias_after,
fname="update_info_im",
)
cfg.tol_im_xic = cfg.tol_im_xic_after_calib
cal_im_recall(cfg.ws_single, df_lib, cfg.tol_im_xic)
cal_rt_im_recall(cfg.ws_single, df_lib, cfg.tol_rt, cfg.tol_im_xic)
return df_tol, df_lib
[docs]
@profile
def calib_mz(df_seed: pd.DataFrame, ms: tims.Tims) -> pd.DataFrame:
"""
Fit m/z and update the measured m/z values.
Parameters
----------
df_seed : pd.DataFrame
Columns: 'score_deep', 'measure_pr_mz', 'pr_mz'.
ms : tims.Tims
Save the raw measured m/z values.
Returns
-------
df_seed : pd.DataFrame
Nothing new to df_seed.
"""
idx_max = df_seed.groupby("measure_pr_mz")["score_deep"].idxmax()
df = df_seed.loc[idx_max].reset_index(drop=True)
x = df["measure_pr_mz"].values
y = df["pr_mz"].values
idx = np.argsort(x)
x = x[idx]
y = y[idx]
bias_old = (x - y) * 1000000.0 / y
lowess = sm.nonparametric.lowess
frac = 0.1
y_lowess = lowess(y, x, frac=frac)
x_fit, y_fit = zip(*y_lowess)
x_fit, y_fit = np.array(x_fit), np.array(y_fit)
f = interp1d(x_fit, y_fit, kind="cubic", fill_value="extrapolate")
y_pred = f(x)
bias = (y - y_pred) * 1000000.0 / y
bias_std = (bias - bias.mean()) / bias.std()
good_idx = np.abs(bias_std) < 3.0
x_good = x[good_idx]
y_good = y[good_idx]
y_pred = y_pred[good_idx]
bias_old = bias_old[good_idx]
bias_after = bias[good_idx]
# update tol_ppm
# cfg.tol_ppm = np.abs(bias_after).max()
# info = 'updated tol_ppm: {:.2f}'.format(cfg.tol_ppm)
# logger.info(info)
# logger.info('Keep tol_ppm: 20')
if cfg.is_compare_mode:
plot_fit_mz(
x_good,
y_good,
y_pred,
y_good,
x_fit,
y_fit,
bias_old,
bias_after,
fname="update_info_mz",
)
# update
for swath_id in range(len(ms.get_dia_quadrupole())):
if swath_id == 0:
continue
for ms_type in ["ms1", "ms2"]:
if ms_type == "ms1":
ms_map = ms.d_ms1_maps[swath_id]
else:
ms_map = ms.d_ms2_maps[swath_id]
(
all_rt,
cycle_valid_lens,
all_push,
all_tof,
all_height,
cycle_valid_lens2,
all_push2,
all_tof2,
all_height2,
) = ms_map
all_tof = f(all_tof)
all_tof = all_tof.astype(np.float32)
all_tof2 = f(all_tof2)
all_tof2 = all_tof2.astype(np.float32)
ms_map = (
all_rt,
cycle_valid_lens,
all_push,
all_tof,
all_height,
cycle_valid_lens2,
all_push2,
all_tof2,
all_height2,
)
if ms_type == "ms1":
ms.d_ms1_maps[swath_id] = ms_map
else:
ms.d_ms2_maps[swath_id] = ms_map
return df_seed
[docs]
def screen_by_hist(x_data: np.ndarray, y_data: np.ndarray, bins: int) -> tuple:
"""
From Calib-RT algorithm: https://doi.org/10.1093/bioinformatics/btae417
"""
# find high density points based on hist
extenti = (x_data.min(), x_data.max())
extentj = (y_data.min(), y_data.max())
hist, edges_x, edges_y = np.histogram2d(
x_data, y_data, bins=bins, range=(extenti, extentj)
)
edges_x = (edges_x[:-1] + edges_x[1:]) / 2
edges_y = (edges_y[:-1] + edges_y[1:]) / 2
cell_idXy = np.stack(
(np.arange(hist.shape[0]), hist.argmax(axis=1)), axis=-1
) # x -> max(y)
cell_idYx = np.stack(
(hist.argmax(axis=0), np.arange(hist.shape[1])), axis=-1
) # y -> max(x)
cell_idxy = np.vstack((cell_idXy, cell_idYx))
cell_idxy = np.unique(cell_idxy, axis=0)
cell_idxy = cell_idxy[~(cell_idxy == 0).any(axis=1)] # remove 0
x_hist = edges_x[cell_idxy[:, 0]]
y_hist = edges_y[cell_idxy[:, 1]]
cell_counts = hist[tuple(cell_idxy.T)]
return x_hist, y_hist, cell_counts
[docs]
def screen_by_graph(x_screen1: np.ndarray, y_screen1: np.ndarray) -> tuple:
"""
From Calib-RT algorithm: https://doi.org/10.1093/bioinformatics/btae417
"""
# find the longest length path
G = nx.DiGraph()
for i in range(len(x_screen1)):
x_curr, y_curr = x_screen1[i], y_screen1[i]
candidates_idx = (x_screen1 >= x_curr) & (y_screen1 >= y_curr)
candidates_idx[i] = False # no self-cycle
x_candidates = x_screen1[candidates_idx]
y_candidates = y_screen1[candidates_idx]
candidates = [
((x_curr, y_curr), (x, y)) for x, y in zip(x_candidates, y_candidates)
]
G.add_edges_from(candidates)
# maybe exist multiple longest paths
longest_path = nx.dag_longest_path(G)
x_screen2, y_screen2 = zip(*longest_path)
x_screen2, y_screen2 = np.array(x_screen2), np.array(y_screen2)
return x_screen2, y_screen2
[docs]
def polish_ends(x_screen2: np.ndarray, y_screen2: np.ndarray, tol_bins: int) -> tuple:
"""
From Calib-RT algorithm: https://doi.org/10.1093/bioinformatics/btae417
"""
# left
center_idx = int(len(x_screen2) / 2)
x, y = x_screen2[:center_idx], y_screen2[:center_idx]
stepx = x[1:] - x[:-1]
good_x = (stepx / stepx[stepx > 0].min()) < tol_bins
stepy = y[1:] - y[:-1]
good_y = (stepy / stepy[stepy > 0].min()) < tol_bins
good_xy = good_x & good_y
breaks_idx = np.where(~good_xy)[0]
break_idx = 0
if len(breaks_idx) > 0:
idx = np.where(breaks_idx < len(x) * 0.25)[0]
if len(idx) > 0:
break_idx = breaks_idx[idx][-1] + 1
x_left, y_left = x[break_idx:], y[break_idx:]
# right
x, y = x_screen2[center_idx:], y_screen2[center_idx:]
stepx = x[1:] - x[:-1]
good_x = (stepx / stepx[stepx > 0].min()) < tol_bins
stepy = y[1:] - y[:-1]
good_y = (stepy / stepy[stepy > 0].min()) < tol_bins
good_xy = good_x & good_y
breaks_idx = np.where(~good_xy)[0]
break_idx = len(x)
if len(breaks_idx) > 0:
idx = np.where(breaks_idx > len(x) * 0.75)[0]
if len(idx) > 0:
break_idx = breaks_idx[idx][0] + 1
x_right, y_right = x[:break_idx], y[:break_idx]
x = np.concatenate([x_left, x_right])
y = np.concatenate([y_left, y_right])
return x, y
[docs]
def fit_by_lowess(x: np.ndarray, y: np.ndarray, frac: float) -> tuple:
"""
Perform lowesss fit on x and y arrays using frac value.
"""
lowess = sm.nonparametric.lowess
y_lowess = lowess(y, x, frac=frac)
x_fit, y_fit = zip(*y_lowess)
x_fit, y_fit = np.array(x_fit), np.array(y_fit)
# drop duplicates
_, idx = np.unique(x_fit, return_index=True)
x_fit, y_fit = x_fit[idx], y_fit[idx]
return x_fit, y_fit
[docs]
def cal_turning_point(y_data: np.ndarray, y_pred: np.ndarray) -> float:
"""
Determine the tolerance corresponding to the elbow point from the tolerance–coverage curve.
See https://stackoverflow.com/questions/2018178/finding-the-best-trade-off-point-on-a-curve
"""
tol_rt_v = np.arange(1.0, int(0.5 * y_data.max()), 0.1)
cover_num_v = []
for tol_rt in tol_rt_v:
bias = y_pred - y_data
cover_num = np.sum(np.abs(bias) < tol_rt)
cover_num_v.append(cover_num)
cover_nums = np.array(cover_num_v)
curve = cover_nums
n_points = len(curve)
all_coord = np.vstack((tol_rt_v, curve)).T
first_point = all_coord[0]
line_vec = all_coord[-1] - all_coord[0]
line_vec_norm = line_vec / np.sqrt(np.sum(line_vec**2))
vec_from_first = all_coord - first_point
scalar_product = np.sum(
vec_from_first * np.tile(line_vec_norm, (n_points, 1)), axis=1
)
vec_from_first_parallel = np.outer(scalar_product, line_vec_norm)
vec_to_line = vec_from_first - vec_from_first_parallel
dist_to_line = np.sqrt(np.sum(vec_to_line**2, axis=1))
best_idx = np.argmax(dist_to_line)
tol_rt = tol_rt_v[best_idx]
return tol_rt
[docs]
def plot_fit_rt(x, y, x1, y1, x11, y11, x_fit, y_fit, tol_rt, bias, fname):
"""
For developing.
"""
plt.clf()
fig, ax = plt.subplots(2, 1)
# raw points
ax[0].plot(x, y, "bo", label="Raw", markersize=1.0)
# high density points
ax[0].plot(x1, y1, "go", label="Hist-Global", markersize=1.0)
# path points
ax[0].plot(x11, y11, "ro", label="Hist-Local", markersize=1.0)
# fit
ax[0].plot(x_fit, y_fit, label="Fit")
ax[0].plot(x_fit, y_fit + tol_rt, label="Fit+")
ax[0].plot(x_fit, y_fit - tol_rt, label="Fit-")
# bias
ax[1].hist(bias, bins=20, label="Bias of seeds")
ax[1].axvline(x=tol_rt)
for a in ax:
a.legend()
plt.tight_layout()
plt.savefig(cfg.dir_out_single / (fname + ".png"), bbox_inches="tight")
[docs]
def plot_fit_im(
y_measure, y_pred_before, y_pred_after, xout, yout, bias_old, bias, fname
):
"""
For developing.
"""
plt.clf()
fig, ax = plt.subplots(5, 1)
# before
x = y_pred_before
y = y_measure - x
xy = np.vstack([x, y])
z = gaussian_kde(xy)(xy)
idx = z.argsort()
x, y, z = x[idx], y[idx], z[idx]
xlim = (x.min(), x.max())
ax[0].scatter(x, y, s=1, c=z, label="Data points")
ax[0].set_xlim(xlim)
ax[0].set_ylim((-0.05, 0.05))
# after
x = y_pred_after
y = y_measure - x
xy = np.vstack([x, y])
z = gaussian_kde(xy)(xy)
idx = z.argsort()
x, y, z = x[idx], y[idx], z[idx]
ax[1].scatter(x, y, s=1, c=z, label="Good Points")
ax[1].set_xlim(xlim)
ax[1].set_ylim((-0.05, 0.05))
# Deviation trend line
ax[2].plot(xout, (yout - xout), markersize=1.0)
# bias before
xlim = (-0.05, 0.05)
ax[3].hist(bias_old, bins=20)
ax[3].set_xlim(xlim)
# bias after
ax[4].hist(bias, bins=20)
ax[4].set_xlim(xlim)
# plt.legend()
plt.tight_layout()
plt.savefig(cfg.dir_out_single / (fname + ".png"), bbox_inches="tight")
[docs]
def plot_fit_mz(x1, y1, x2, y2, x_fit, y_fit, bias_old, bias, fname):
"""
For developing.
"""
plt.clf()
fig, ax = plt.subplots(5, 1)
# data before
x = x1
y = (y1 - x1) * 1e6 / y1
xy = np.vstack([x, y])
z = gaussian_kde(xy)(xy)
idx = z.argsort()
x, y, z = x[idx], y[idx], z[idx]
xlim = (x.min(), x.max())
ax[0].scatter(x, y, s=1, c=z, label="Data points")
ax[0].set_xlim(xlim)
ax[0].set_ylim((-20, 20))
# data after
x = x2
y = (y2 - x2) * 1e6 / y1
xy = np.vstack([x, y])
z = gaussian_kde(xy)(xy)
idx = z.argsort()
x, y, z = x[idx], y[idx], z[idx]
ax[1].scatter(x, y, s=1, c=z, label="Data points")
ax[1].set_xlim(xlim)
ax[1].set_xlim(xlim)
ax[1].set_ylim((-20, 20))
# trend line
ax[2].plot(x_fit, (y_fit - x_fit) * 1e6 / y_fit, label="Fit line")
xlim = (bias_old.min(), bias_old.max())
ax[3].hist(bias_old, bins=20)
ax[3].set_xlim(xlim)
ax[4].hist(bias, bins=20)
ax[4].set_xlim(xlim)
# plt.legend()
plt.tight_layout()
plt.savefig(cfg.dir_out_single / (fname + ".png"), bbox_inches="tight")
[docs]
def cal_rt_recall(ws, df_lib, tol_rt):
"""
For developing.
"""
if not cfg.is_compare_mode:
return
df_diann = pd.read_csv(ws / "diann" / "report.tsv", sep="\t")
df_diann = df_diann[df_diann["Q.Value"] < 0.01]
df_diann["pr_id"] = df_diann["Modified.Sequence"] + df_diann[
"Precursor.Charge"
].astype(str)
df_diann["RT"] = df_diann["RT"] * 60.0
df_diann = df_diann[["pr_id", "RT"]]
if "group_rank" in df_lib.columns:
df = df_diann.merge(df_lib[df_lib.group_rank == 1], on="pr_id", how="left")
else:
df = df_diann.merge(df_lib, on="pr_id", how="left")
df["bias"] = (df["pred_rt"] - df["RT"]).abs()
good_num = (df["bias"][~df["bias"].isna()] < tol_rt).sum()
recall = good_num / len(df_diann)
info = "Based on the pred_rt, recall is: {:.3f}".format(recall)
logger.info(info)
[docs]
def cal_im_recall(ws, df_lib, tol_im):
"""
For developing.
"""
if not cfg.is_compare_mode:
return
df_diann = pd.read_csv(ws / "diann" / "report.tsv", sep="\t")
df_diann = df_diann[df_diann["Q.Value"] < 0.01]
df_diann["pr_id"] = df_diann["Modified.Sequence"] + df_diann[
"Precursor.Charge"
].astype(str)
df_diann = df_diann[["pr_id", "IM"]]
if "group_rank" not in df_lib.columns:
df = df_diann.merge(df_lib, on="pr_id", how="left")
else:
df = df_diann.merge(df_lib[df_lib.group_rank == 1], on="pr_id", how="left")
df["bias"] = (df["pred_im"] - df["IM"]).abs()
good_num = (df["bias"][~df["bias"].isna()] < tol_im).sum()
recall = good_num / len(df_diann)
info = "Based on the pred_im and tol_im-{:.3f}, recall is: {:.3f}".format(
tol_im, recall
)
logger.info(info)
[docs]
def cal_rt_im_recall(ws, df_lib, tol_rt, tol_im):
"""
For developing.
"""
if not cfg.is_compare_mode:
return
df_diann = pd.read_csv(ws / "diann" / "report.tsv", sep="\t")
df_diann = df_diann[df_diann["Q.Value"] < 0.01]
df_diann["pr_id"] = df_diann["Modified.Sequence"] + df_diann[
"Precursor.Charge"
].astype(str)
df_diann = df_diann[["pr_id", "RT", "IM"]]
if "diann_rt" not in df_lib.columns:
df_diann["diann_rt"] = df_diann["RT"] * 60.0
if "group_rank" not in df_lib.columns:
df = df_diann.merge(df_lib, on="pr_id", how="left")
else:
df = df_diann.merge(df_lib[df_lib.group_rank == 1], on="pr_id", how="left")
df["bias_rt"] = (df["pred_rt"] - df["diann_rt"]).abs()
df["bias_im"] = (df["pred_im"] - df["IM"]).abs()
df = df.dropna(subset=["bias_rt", "bias_im"]).reset_index(drop=True)
condition1 = df["bias_rt"] < tol_rt
condition2 = df["bias_im"] < tol_im
good_num = sum(condition1 & condition2)
recall = good_num / len(df_diann)
info = "Based on the pred_rt and pred_im, recall is: {:.3f}".format(recall)
logger.info(info)