from pathlib import Path
import numpy as np
import pandas as pd
from full_dia.log import Logger
try:
_ = profile
except NameError:
[docs]
def profile(func):
return func
logger = Logger.get_logger()
[docs]
class Library:
"""
Reader class of the spectral library.
"""
def __init__(self, dir_lib: str):
"""
Load the library.
"""
dir_lib = Path(dir_lib)
logger.info("Loading lib: " + dir_lib.name)
self.lib_type = dir_lib.suffix
# parquet or tsv
if self.lib_type == ".parquet":
df = pd.read_parquet(dir_lib)
elif self.lib_type == ".tsv":
df = pd.read_csv(dir_lib, sep="\t")
else:
raise ValueError(
f"Unsupported spectral library format: '{self.lib_type}'. "
"Only .parquet and .tsv are supported."
)
self.check_lib(df)
df = df[df["Decoy"] == 0].reset_index(drop=True)
self.df_pr, self.df_map = self.construct_dfs(df)
assert len(self.df_pr) == self.df_pr["pr_id"].nunique()
logger.info(f"Lib prs: {len(self.df_pr)}")
[docs]
def check_lib(self, df: pd.DataFrame) -> None:
"""
Check spectral library:
column names, modifications, charges, loss, proteins, length
"""
required_columns = {
"Precursor.Id",
"Modified.Sequence",
"Stripped.Sequence",
"Precursor.Charge",
"Proteotypic",
"Decoy",
"N.Term",
"C.Term",
"RT",
"IM",
"Q.Value",
"Peptidoform.Q.Value",
"PTM.Site.Confidence",
"PG.Q.Value",
"Precursor.Mz",
"Product.Mz",
"Relative.Intensity",
"Fragment.Type",
"Fragment.Charge",
"Fragment.Series.Number",
"Fragment.Loss.Type",
"Exclude.From.Quant",
"Protein.Ids",
"Protein.Group",
"Protein.Names",
}
# check name
missing_cols = required_columns - set(df.columns)
if missing_cols:
raise ValueError(
f"The spectral library is missing required columns: {sorted(missing_cols)}. "
)
# check modification
x = df["Modified.Sequence"].copy()
x = x.drop_duplicates()
x = x.replace([r"C\(UniMod:4\)", r"M\(UniMod:35\)"], ["c", "m"], regex=True)
if x.str.contains(r"[\(\)0-9]").any():
raise ValueError(
"The spectral library contains unexpected modifications. "
"Only C(UniMod:4) and M(UniMod:35) are allowed."
)
# check charge
x = df["Precursor.Charge"].max()
if x > 4:
raise ValueError(
"The spectral library contains > 4 charge state. "
"Only charge 1-4 are allowed."
)
# check protein
x = df["Protein.Ids"].str.count(";") != df["Protein.Names"].str.count(";")
if x.any():
raise ValueError(
"The spectral library contains inconsistent Protein.IDs and Protein.Names."
)
# check fg type
x1 = df["Fragment.Type"].isin(["b", "y"]).all()
x2 = (df["Fragment.Loss.Type"] == "noloss").all()
if not (x1 and x2):
raise ValueError(
"The spectral library can only contain b/y fragment ions without neutral losses."
)
# check fg length. Full-DIA will encode 'b12_1' to 1121
if df["Fragment.Series.Number"].max() >= 100:
raise ValueError(
"The spectral library can not contain b/y fragment ions with >= 100 aas."
)
def __len__(self):
return len(self.df_pr)
[docs]
@profile
def construct_dfs(self, df: pd.DataFrame) -> tuple:
"""
Construct the df_pr and df_map from DIA-NN's .parquet library.
Parameters
----------
df : pd.DataFrame
The raw DIA-NN's .parquet file.
Returns
-------
tuple
df_pr : pd.DataFrame
Each row corresponds to a precursor and its fragment information.
df_map : pd.DataFrame
Each row represents the protein information corresponding to the peptide in the same row of df_pr.
"""
# info - pr
fg_height_v = df["Relative.Intensity"].values.astype(np.float32)
fg_height_max_idx = np.where(fg_height_v == fg_height_v.max())[0]
pr_id_v = df.loc[fg_height_max_idx, "Precursor.Id"]
good_idx = ~pr_id_v.duplicated() # for case: 1., 1., 0.8
fg_height_max_idx = fg_height_max_idx[good_idx]
pr_id_v = df.loc[fg_height_max_idx, "Precursor.Id"].values
pr_charge_v = df.loc[fg_height_max_idx, "Precursor.Charge"].values
pr_mz_v = df.loc[fg_height_max_idx, "Precursor.Mz"].values
pred_irt_v = df.loc[fg_height_max_idx, "RT"].values
pred_iim_v = df.loc[fg_height_max_idx, "IM"].values
pr_length_v = df.loc[fg_height_max_idx, "Stripped.Sequence"].str.len().values
# info - fg
fg_num_v = np.diff(fg_height_max_idx)
fg_num_v = np.append(fg_num_v, len(df) - fg_height_max_idx[-1])
fg_mz_v = df["Product.Mz"].values.astype(np.float32)
fg_type_v = np.where(df["Fragment.Type"] == "b", 1, 2)
fg_type_v = fg_type_v.astype(np.int16) # b-1, y-2
fg_index_v = df["Fragment.Series.Number"].values.astype(np.int16)
fg_charge_v = df["Fragment.Charge"].values.astype(np.int16)
fg_anno_v = fg_type_v * 1000 + fg_index_v * 10 + fg_charge_v
mask = np.arange(fg_num_v.max()) < fg_num_v[:, None]
fg_mz = np.zeros(mask.shape, dtype=np.float32)
fg_mz[mask] = fg_mz_v
fg_height = np.zeros(mask.shape, dtype=np.float32)
fg_height[mask] = fg_height_v
fg_anno = np.ones(mask.shape, dtype=np.int16) * 3011 # 3 is fg_loss
fg_anno[mask] = fg_anno_v
# df_map
protein_id_v = df.loc[fg_height_max_idx, "Protein.Ids"].values
protein_name_v = df.loc[fg_height_max_idx, "Protein.Names"].values
# pg_v = df.loc[fg_height_max_idx, 'Protein.Group'].values
gene_v = df.loc[fg_height_max_idx, "Genes"].values
df_map = pd.DataFrame()
df_map["protein_id"] = protein_id_v
df_map["protein_name"] = protein_name_v
df_map["gene"] = gene_v
x = df_map["protein_id"].str.count(";")
y = df_map["protein_name"].str.count(";")
assert (x == y).all(), "Protein ID/Name are not corresponding relationships!"
# df_pr
df_pr = pd.DataFrame()
df_pr["pr_id"] = pr_id_v
df_pr["pr_charge"] = pr_charge_v.astype(np.int8)
df_pr["pr_mz"] = pr_mz_v.astype(np.float32)
df_pr["pr_len"] = pr_length_v.astype(np.int8)
df_pr["pred_irt"] = pred_irt_v.astype(np.float32)
df_pr["pred_iim"] = pred_iim_v.astype(np.float32)
df_pr["fg_num"] = fg_num_v.astype(np.int8)
df_pr["pr_index"] = df_pr.index.values.astype(np.int32)
fg_num = fg_mz.shape[1]
df_pr[["fg_mz_" + str(i) for i in range(fg_num)]] = fg_mz
df_pr[["fg_height_" + str(i) for i in range(fg_num)]] = fg_height
df_pr[["fg_anno_" + str(i) for i in range(fg_num)]] = fg_anno
assert len(df_pr) == len(df_map)
return df_pr, df_map
# @profile
[docs]
def polish_lib_by_swath(
self, swath: np.ndarray, ws_diann: Path | None = None
) -> pd.DataFrame:
"""
Remove prs whose m/z values are not in the range of SWATH settings.
Parameters
----------
swath : np.ndarray
The SWATH settings.
ws_diann : Path, default=None
For developing.
Returns
-------
df_lib : pd.DataFrame
The polished library.
"""
df_lib = self.df_pr
# for developing
if ws_diann is not None:
df_diann = pd.read_csv(ws_diann / "diann" / "report.tsv", sep="\t")
df_diann = df_diann[df_diann["Q.Value"] < 0.01]
df_diann = df_diann[
[
"Modified.Sequence",
"Precursor.Charge",
"RT",
"IM",
"Precursor.Quantity",
]
]
df_diann["diann_rt"] = df_diann["RT"] * 60.0
df_diann["diann_im"] = df_diann["IM"]
df_diann["diann_pr_quant"] = df_diann["Precursor.Quantity"]
df_diann["pr_id"] = df_diann["Modified.Sequence"] + df_diann[
"Precursor.Charge"
].astype(str)
df = pd.merge(df_lib, df_diann, on="pr_id")
df = df.reset_index(drop=True)
del df["Modified.Sequence"]
del df["Precursor.Charge"]
del df["RT"]
del df["IM"]
del df["Precursor.Quantity"]
df_lib = df
# screen prs by range of m/z
pr_mz = df_lib["pr_mz"].values
pr_mz_min, pr_mz_max = swath[0], swath[-1]
good_idx = (pr_mz > pr_mz_min) & (pr_mz < pr_mz_max)
df_lib = df_lib.iloc[good_idx].reset_index(drop=True)
# drop duplicates
df_lib = df_lib.drop_duplicates(subset="pr_id", ignore_index=True)
assert len(df_lib) == df_lib.pr_id.nunique()
# remove BJOUXZ
df_lib["simple_seq"] = (
df_lib["pr_id"]
.str[:-1]
.replace(["C\(UniMod:4\)", "M\(UniMod:35\)"], ["c", "m"], regex=True)
)
bad_idx = df_lib["simple_seq"].str.contains("[BJOUXZ]", regex=True)
df_lib = df_lib[~bad_idx].reset_index(drop=True)
# fg_num >= 4
df_lib = df_lib[df_lib.fg_num >= 4]
df_lib = df_lib.reset_index(drop=True)
# pred_im
df_lib["pred_im"] = df_lib["pred_iim"]
# pr_mz_iso
mass_neutron = 1.0033548378
pr_mass = df_lib["pr_mz"] * df_lib["pr_charge"]
pr_mz_1H = (pr_mass + mass_neutron) / df_lib["pr_charge"]
pr_mz_2H = (pr_mass + 2 * mass_neutron) / df_lib["pr_charge"]
pr_mz_left = (pr_mass - mass_neutron) / df_lib["pr_charge"]
df_lib["pr_mz_1H"] = pr_mz_1H.astype(np.float32)
df_lib["pr_mz_2H"] = pr_mz_2H.astype(np.float32)
df_lib["pr_mz_left"] = pr_mz_left.astype(np.float32)
# assign swath_id
swath_id = np.digitize(df_lib["pr_mz"].values, swath)
df_lib["swath_id"] = swath_id.astype(np.int8)
idx = np.argsort(swath_id)
df_lib = df_lib.iloc[idx].reset_index(drop=True)
# decoy
df_lib["decoy"] = np.uint8(0)
# shuffle
np.random.seed(1)
df_lib = df_lib.sample(frac=1, random_state=1).reset_index(drop=True)
logger.info(f"Polishing spectral library: {len(df_lib)}prs")
return df_lib
[docs]
def assign_proteins(self, df: pd.DataFrame) -> pd.DataFrame:
"""
Assign proteins based on the precursor index from raw df_map.
Parameters
----------
df : pd.DataFrame
Provide the "pr_index" column.
Returns
-------
df : pd.DataFrame
Add new columns: "protein_id", "protein_name", "proteotypic"
"""
# find corresponding protein and name by pr_index
if self.lib_type == ".parquet":
df_map = self.df_map
pr_index_q = df["pr_index"].values
result_protein_id = df_map.loc[pr_index_q, "protein_id"]
result_protein_name = df_map.loc[pr_index_q, "protein_name"]
df["protein_id"] = result_protein_id.values
df["protein_name"] = result_protein_name.values
df["proteotypic"] = df["protein_id"].str.count(";") + 1
df.loc[df["proteotypic"] != 1, "proteotypic"] = 0
# add DECOY
if "decoy" in df.columns:
decoy_idx = df["decoy"] == 1
df.loc[decoy_idx, "protein_id"] = "DECOY_" + df.loc[decoy_idx, "protein_id"]
df.loc[decoy_idx, "protein_id"] = df.loc[decoy_idx, "protein_id"].replace(
";", ";DECOY_", regex=True
)
df.loc[decoy_idx, "protein_name"] = (
"DECOY_" + df.loc[decoy_idx, "protein_name"]
)
df.loc[decoy_idx, "protein_name"] = df.loc[
decoy_idx, "protein_name"
].replace(";", ";DECOY_", regex=True)
return df
[docs]
def assign_fg_mz(self, df: pd.DataFrame) -> pd.DataFrame:
"""
Assign fg mz values based on the precursor index from raw df_pr.
Parameters
----------
df : pd.DataFrame
Provide the "pr_index" column.
Returns
-------
df : pd.DataFrame
Add the m/z value columns of fragment ions.
"""
cols = ["fg_mz_" + str(i) for i in range(12)]
pr_index_q = df["pr_index"].values
df[cols] = self.df_pr.loc[pr_index_q, cols].values
return df