import os
import warnings
import numpy as np
import pandas as pd
from sklearn.ensemble import VotingClassifier
from sklearn.exceptions import ConvergenceWarning
from sklearn.neural_network import MLPClassifier
from sklearn.preprocessing import StandardScaler
from full_dia import cfg, utils
from full_dia.log import Logger
os.environ["PYTHONWARNINGS"] = "ignore" # multiprocess
warnings.simplefilter("ignore")
warnings.filterwarnings(action="ignore", category=UserWarning)
warnings.filterwarnings(action="ignore", category=ConvergenceWarning)
try:
_ = profile
except NameError:
[docs]
def profile(func):
return func
logger = Logger.get_logger()
[docs]
def adjust_rubbish_q(df: pd.DataFrame, batch_num: int) -> None:
"""
If the #ids are less than 5000, then we can set the rubbish_q_cut to 0.75 to save more #ids.
"""
ids = df[
(df["q_pr_run"] < 0.01) & (df["decoy"] == 0) & (df["group_rank"] == 1)
].pr_id.nunique()
ids = ids * batch_num
if ids < 5000:
cfg.rubbish_q_cut = 0.75
else:
cfg.rubbish_q_cut = cfg.rubbish_q_cut
[docs]
def cal_q_pr_core(df: pd.DataFrame, run_or_global: str):
"""
The core function to calculate the q values of peps using DIA-NN's model.
Parameters
----------
df : pd.DataFrame
Provide "cscore" values of peptides.
run_or_global : str
Specify the calculation on "run" or "global" level.
Returns
-------
df : pd.DataFrame
Add a new column: 'q_pr'.
"""
col_score = "cscore_pr_" + run_or_global
col_out = "q_pr_" + run_or_global
df = df.sort_values(by=col_score, ascending=False, ignore_index=True)
target_num = (df.decoy == 0).cumsum()
decoy_num = (df.decoy == 1).cumsum()
target_num[target_num == 0] = 1
df[col_out] = decoy_num / target_num
df[col_out] = df[col_out][::-1].cummin()
return df
[docs]
@profile
def cal_q_pr_batch(
df: pd.DataFrame,
batch_size: int,
n_model: int,
model_trained: list[MLPClassifier] | None = None,
scaler: StandardScaler | None = None,
) -> tuple:
"""
Calculate the q values of peptides in a batch.
Parameters
----------
df : pd.DataFrame
Provide columns: "pr_id", "score_", "decoy".
batch_size : int
The batch size for training an MLP.
n_model : int
The number of models to ensemble.
model_trained : list[MLPClassifier], default=None
The trained ensemble models or None.
scaler : StandardScaler, default=None
The trained scaler or None.
Returns
-------
tuple
df : pd.DataFrame
Add new columns: "cscore_pr_run", "group_rank", "q_pr_run"
model_trained : list[MLPClassifier]
The trained ensemble models.
scaler : StandardScaler
The trained scaler.
"""
col_idx = df.columns.str.startswith("score_")
# assert sum(col_idx) == 392
# logger.info('cols num: {}'.format(sum(col_idx)))
X = df.loc[:, col_idx].values
assert X.dtype == np.float32
y = 1 - df["decoy"].values # targets is positives
if scaler is None:
scaler = StandardScaler()
X = scaler.fit_transform(X) # no scale to Tree models
else:
X = scaler.transform(X)
# train
if model_trained is None: # the first batch
decoy_deeps = df.loc[df["decoy"] == 1, "score_big_deep_pre"].values
decoy_m, decoy_u = np.mean(decoy_deeps), np.std(decoy_deeps)
good_cut = min(0.5, decoy_m + 1.5 * decoy_u)
logger.info(f"Training with big_score_cut: {good_cut:.2f}")
train_idx = (df["group_rank"] == 1) & (df["score_big_deep_pre"] > good_cut)
X_train = X[train_idx]
y_train = y[train_idx]
n_pos, n_neg = sum(y_train == 1), sum(y_train == 0)
info = "Training the NN model: {} pos, {} neg".format(n_pos, n_neg)
logger.info(info)
param = (25, 20, 15, 10, 5)
mlps = [
MLPClassifier(
max_iter=1,
shuffle=True,
random_state=i, # init weights and shuffle
learning_rate_init=0.003,
solver="adam",
batch_size=batch_size, # DIA-NN is 50
activation="relu",
hidden_layer_sizes=param,
)
for i in range(n_model)
]
names = [f"mlp{i}" for i in range(n_model)]
model = VotingClassifier(
estimators=list(zip(names, mlps)),
voting="soft",
n_jobs=1 if __debug__ else n_model,
)
model.fit(X_train, y_train)
n_pos, n_neg = sum(y == 1), sum(y == 0)
info = "Predicting by the NN model: {} pos, {} neg".format(n_pos, n_neg)
logger.info(info)
cscore = model.predict_proba(X)[:, 1]
else:
model = model_trained
n_pos, n_neg = sum(y == 1), sum(y == 0)
info = "Predicting by the NN model: {} pos, {} neg".format(n_pos, n_neg)
logger.info(info)
cscore = model.predict_proba(X)[:, 1]
df["cscore_pr_run"] = cscore
# group rank
group_size = df.groupby("pr_id", sort=False).size()
group_size_cumsum = np.concatenate([[0], np.cumsum(group_size)])
group_rank = utils.cal_group_rank(df["cscore_pr_run"].values, group_size_cumsum)
df["group_rank"] = group_rank
df = df.loc[group_rank == 1]
df = cal_q_pr_core(df, "run")
return df, model, scaler
[docs]
@profile
def cal_q_pr_combined(df: pd.DataFrame, batch_size: int, n_model: int) -> pd.DataFrame:
"""
Calculate the q values of peptides across the batches using DIA-NN's model.
Parameters
----------
df : pd.DataFrame
Provide columns: "pr_id", "score_", "decoy".
batch_size : int
The batch size for training an MLP.
n_model : int
The number of models to ensemble.
Returns
-------
df : pd.DataFrame
Add new columns: "cscore_pr_run", "q_pr_run"
"""
col_idx = df.columns.str.startswith("score_")
logger.info("scores items: {}".format(sum(col_idx)))
X = df.loc[:, col_idx].values
assert X.dtype == np.float32
y = 1 - df["decoy"].values # targets is positives
# select by train_nn_type: 1-hard, 2-easy, 3-cross
scaler = StandardScaler()
X = scaler.fit_transform(X) # no scale to Tree models
# train
n_pos, n_neg = sum(y == 1), sum(y == 0)
info = "Training the NN model: {} pos, {} neg".format(n_pos, n_neg)
logger.info(info)
param = (25, 20, 15, 10, 5)
mlps = [
MLPClassifier(
max_iter=1,
shuffle=True,
random_state=i, # init weights and shuffle
learning_rate_init=0.003,
solver="adam",
batch_size=batch_size, # DIA-NN is 50
activation="relu",
hidden_layer_sizes=param,
)
for i in range(n_model)
]
names = [f"mlp{i}" for i in range(n_model)]
model = VotingClassifier(
estimators=list(zip(names, mlps)),
voting="soft",
n_jobs=1 if __debug__ else n_model,
)
model.fit(X, y)
# pred
cscore = model.predict_proba(X)[:, 1]
df["cscore_pr_run"] = cscore
# mirrors does not involve this
df = cal_q_pr_core(df, "run")
return df
[docs]
def cal_q_pg(
df_input_raw: pd.DataFrame, q_pr_cut: float, run_or_global: str
) -> pd.DataFrame:
"""
Calculate the q values of pgs based on the assigned peptides.
Parameters
----------
df_input_raw : pd.DataFrame
Provide columns: "strip_seq", "cscore_pr", "decoy", "q_pr".
q_pr_cut : float
The q value cut to select which peps will be used to calculate the cscore of a pg.
run_or_global : str
Specify the calculation on "run" or "global" level.
Returns
-------
df_res : pd.DataFrame
Add new columns: "cscore_pg", "q_pg"
"""
x = run_or_global
df_na = df_input_raw[df_input_raw["cscore_pr_" + x].isna()]
df_input = df_input_raw[~df_input_raw["cscore_pr_" + x].isna()]
if "strip_seq" not in df_input.columns:
if "simple_seq" not in df_input.columns:
df_input["simple_seq"] = (
df_input["pr_id"]
.str[:-1]
.replace([r"C\(UniMod:4\)", r"M\(UniMod:35\)"], ["c", "m"], regex=True)
)
df_input["strip_seq"] = df_input["simple_seq"].str.upper()
# seq to strip_seq
df_pep_score = df_input[["strip_seq", "cscore_pr_" + x]].copy()
idx_max = df_pep_score.groupby(["strip_seq"])["cscore_pr_" + x].idxmax()
df_pep_score = df_pep_score.loc[idx_max].reset_index(drop=True)
# row by protein group
df = df_input[df_input["q_pr_" + x] < q_pr_cut]
df = df[["strip_seq", "protein_group", "decoy"]]
df = df.drop_duplicates().reset_index(drop=True)
df = df.merge(df_pep_score, on="strip_seq")
df = (
df.groupby(by=["protein_group", "decoy"])
.agg(
{
("cscore_pr_" + x): lambda g: 1 - (1 - g).prod(),
# ('cscore_pr_' + x): lambda g: g.nlargest(1).sum(),
"strip_seq": lambda g: list(g),
}
)
.reset_index()
)
df = df.rename(columns={("cscore_pr_" + x): ("cscore_pg_" + x)})
# q
df = df.sort_values(by=("cscore_pg_" + x), ascending=False, ignore_index=True)
target_num = (df.decoy == 0).cumsum()
decoy_num = (df.decoy == 1).cumsum()
target_num[target_num == 0] = 1
df["q_pg_" + x] = decoy_num / target_num
df["q_pg_" + x] = df["q_pg_" + x][::-1].cummin()
df = df[["protein_group", "decoy", "cscore_pg_" + x, "q_pg_" + x]]
# return
df_result = df_input.merge(df, on=["protein_group", "decoy"], how="left")
not_in_range = df_result["q_pg_" + x].isna()
df_result.loc[not_in_range, "cscore_pg_" + x] = 0.0
df_result.loc[not_in_range, "q_pg_" + x] = 1
df_na["cscore_pg_" + x] = np.float32(0.0)
df_na["q_pg_" + x] = np.float32(1.0)
df_result = pd.concat([df_result, df_na], axis=0, ignore_index=True)
return df_result