import networkx as nx
import numpy as np
import pandas as pd
from full_dia.log import Logger
# Fallback for the ``@profile`` decorator: if no ``profile`` name is already
# available (the lookup below raises NameError), define a pass-through so
# decorated functions still work.
try:
    _ = profile
except NameError:

    def profile(func):
        """Return ``func`` unchanged (no-op stand-in for ``@profile``)."""
        return func
# Module-level logger obtained from the project's Logger helper.
logger = Logger.get_logger()
def assemble_pep_to_pg_core(graph: nx.Graph) -> tuple:
    """
    Perform IDPicker algorithm on pep-protein bipartite graph.

    Proteins are picked greedily: at every step the protein node with the
    most remaining peptide neighbours is selected, it claims all of those
    peptides, and the protein plus its peptides are removed from the graph.
    Ties are broken, in order, by the highest edge weight, the sum of edge
    weights, the fewest ``";"`` characters in the node name and finally the
    node name itself.

    Parameters
    ----------
    graph : nx.Graph
        The bipartite graph of protein and peptide that needs assignment.
        Every node must carry a ``bipartite`` attribute (0 for protein
        nodes, 1 for peptide nodes) and every edge a ``weight`` attribute.
        Protein node names must be strings. The graph is not modified.

    Returns
    -------
    tuple
        protein_v : list
            The proteins after assignment.
        peptide_v : list of list
            The peptides after assignment; ``peptide_v[i]`` holds the
            peptides claimed by ``protein_v[i]``.
    """
    # Work on a private mutable copy. The input may be a read-only subgraph
    # view, and the caller's graph must be left untouched (no freezing).
    work = nx.Graph(graph)

    def nodes_on_side(side):
        # Nodes still present in the working copy on the requested side.
        return [
            name for name, attrs in work.nodes(data=True) if attrs["bipartite"] == side
        ]

    def selection_rank(protein):
        # Smaller tuple == better candidate, so "descending" criteria are
        # negated. A protein without edges gets max weight 0 and sum 0.
        weights = [edge["weight"] for edge in work[protein].values()]
        return (
            -len(weights),
            -max(weights, default=0),
            -sum(weights),
            protein.count(";"),
            protein,
        )

    protein_v, peptide_v = [], []
    proteins_left = nodes_on_side(0)
    peptides_left = nodes_on_side(1)
    while peptides_left and proteins_left:
        best = min(proteins_left, key=selection_rank)
        covered = list(work.neighbors(best))
        if not covered:
            # The best candidate has no edges, so the peptides that remain
            # are isolated and no protein can claim them.
            break
        protein_v.append(best)
        peptide_v.append(covered)
        work.remove_nodes_from([best, *covered])
        proteins_left = nodes_on_side(0)
        peptides_left = nodes_on_side(1)
    return protein_v, peptide_v
def _map_proteins_to_meta(proteins, peptides) -> dict:
    """
    Map every protein id to the name of its meta-protein.

    Proteins observed with exactly the same set of peptides are merged into
    one meta-protein whose name is the sorted member ids joined by ``";"``.

    Parameters
    ----------
    proteins : iterable of str
        Protein id of each protein-peptide pair.
    peptides : iterable
        Peptide of each protein-peptide pair, aligned with ``proteins``.

    Returns
    -------
    dict
        Protein id -> meta-protein name.
    """
    peptides_of = {}
    for protein, peptide in zip(proteins, peptides):
        peptides_of.setdefault(protein, set()).add(peptide)

    # frozenset keys compare by content, so proteins with equal peptide sets
    # always land in the same bucket regardless of insertion order.
    members_of = {}
    for protein, peptide_set in peptides_of.items():
        members_of.setdefault(frozenset(peptide_set), []).append(protein)

    meta_of = {}
    for members in members_of.values():
        # Sorting keeps the meta-protein name stable between runs.
        meta_name = ";".join(sorted(members))
        for protein in members:
            meta_of[protein] = meta_name
    return meta_of


def assemble_pep_to_pg(
    df_input: pd.DataFrame, q_cut_infer: float, run_or_global: str
) -> pd.DataFrame:
    """
    Assemble peps to pgs.

    Rows whose ``q_pr_<run_or_global>`` is below ``q_cut_infer`` are used to
    build a protein-peptide bipartite graph, weighted by the best
    ``cscore_pr_<run_or_global>`` of each pair. Each connected component is
    resolved with ``assemble_pep_to_pg_core``. Rows whose ``strip_seq`` was
    not assigned to any group keep their ``protein_id`` as ``protein_group``.

    Parameters
    ----------
    df_input : pd.DataFrame
        Must have columns: protein_id (ids separated by ``";"``),
        ``q_pr_<run_or_global>``, ``cscore_pr_<run_or_global>`` and one of
        strip_seq, simple_seq or pr_id. It is not modified.
    q_cut_infer : float
        Q-value cutoff to select peptides to assembly.
    run_or_global : {'run', 'global'}
        Assemble on run or global level.

    Returns
    -------
    df : pd.DataFrame
        Copy of df_input (index reset) with a new column: protein_group.
        ``strip_seq`` (and ``simple_seq``) are included when they had to be
        derived. An existing ``protein_group`` column is overwritten.
    """
    col_q_pr = "q_pr_" + run_or_global
    col_cscore_pr = "cscore_pr_" + run_or_global

    # Derive the stripped sequence on a new frame (``assign``), never on the
    # caller's DataFrame.
    if "strip_seq" not in df_input.columns:
        if "simple_seq" not in df_input.columns:
            simple_seq = (
                df_input["pr_id"]
                .str[:-1]
                .replace([r"C\(UniMod:4\)", r"M\(UniMod:35\)"], ["c", "m"], regex=True)
            )
            df_input = df_input.assign(simple_seq=simple_seq)
        df_input = df_input.assign(strip_seq=df_input["simple_seq"].str.upper())

    # One row per (single protein, peptide) pair. ``explode`` works by
    # position, so duplicate labels in the input index are harmless.
    passing = df_input.loc[
        df_input[col_q_pr] < q_cut_infer, ["protein_id", "strip_seq", col_cscore_pr]
    ]
    pairs = (
        passing.assign(protein_id=passing["protein_id"].str.split(";"))
        .explode("protein_id")
        .reset_index(drop=True)
    )

    # from 1 vs. 1 to meta vs. meta (peptides need no meta grouping)
    meta_of = _map_proteins_to_meta(pairs["protein_id"], pairs["strip_seq"])
    pairs["Protein.Meta"] = pairs["protein_id"].map(meta_of)
    pairs["Peptide.Meta"] = pairs["strip_seq"]
    pairs = pairs[["Protein.Meta", "Peptide.Meta", col_cscore_pr]]
    # Keep the best-scoring row of every (meta-protein, peptide) pair.
    pairs = pairs.sort_values(col_cscore_pr, ascending=False)
    pairs = pairs.drop_duplicates(
        subset=["Protein.Meta", "Peptide.Meta"], keep="first"
    ).reset_index(drop=True)

    # graph
    graph = nx.Graph()
    graph.add_nodes_from(pairs["Protein.Meta"], bipartite=0)
    graph.add_nodes_from(pairs["Peptide.Meta"], bipartite=1)
    graph.add_weighted_edges_from(
        zip(pairs["Protein.Meta"], pairs["Peptide.Meta"], pairs[col_cscore_pr])
    )

    # assign: resolve each connected component independently
    group_of_peptide = {}
    for component in nx.connected_components(graph):
        proteins, peptide_lists = assemble_pep_to_pg_core(graph.subgraph(component))
        for protein, peptides in zip(proteins, peptide_lists):
            for peptide in peptides:
                group_of_peptide[peptide] = protein

    # result: peptides outside the assembly fall back to their protein_id
    result = df_input.reset_index(drop=True)
    result["protein_group"] = [
        group_of_peptide.get(peptide, protein_id)
        for peptide, protein_id in zip(result["strip_seq"], result["protein_id"])
    ]
    return result