import math
import numba
import numpy as np
import pandas as pd
import torch
import torch.nn.functional as F
from numba import cuda
from full_dia import cfg, utils
from full_dia.log import Logger
try:
_ = profile
except NameError:
[docs]
def profile(func):
return func
logger = Logger.get_logger()
[docs]
@cuda.jit(device=True)
def gpu_cal_sa(v):
"""
Calculate the sa between V and Gaussian Vector: [0.0044, 0.054, 0.242, 0.399, 0.242, 0.054, 0.0044]
"""
e = 0.000001
norm_x = (
math.sqrt(
v[0] ** 2
+ v[1] ** 2
+ v[2] ** 2
+ v[3] ** 2
+ v[4] ** 2
+ v[5] ** 2
+ v[6] ** 2
)
+ e
)
# y = np.array([0.0044, 0.054, 0.242, 0.399, 0.242, 0.054, 0.0044])
norm_y = 0.531225
s = (
v[0] * 0.0044
+ v[1] * 0.054
+ v[2] * 0.242
+ v[3] * 0.399
+ v[4] * 0.242
+ v[5] * 0.054
+ v[6] * 0.0044
)
sa = s / (norm_x * norm_y)
if sa > 1.0:
sa = 1.0
return sa
[docs]
@cuda.jit
def gpu_sa_gausion_core(block_num, xics, scores, window_points, valids_num):
"""
Using share-memory to calculate the sa for each locus
"""
# Each block calculates a profile
tx = cuda.threadIdx.x
bx = cuda.blockIdx.x
blockdim = cuda.blockDim.x
if bx >= block_num:
return
ions_num = xics.shape[1]
point_num = xics.shape[2]
k = bx // ions_num
xic_idx = bx % ions_num
# less valid num
valid_num = valids_num[k]
if xic_idx > valid_num - 1:
return
x = xics[k, xic_idx]
half = window_points // 2
# copy to share memory
# 1h~2000 points,3h~6000 points
if xics.shape[2] < 500:
share_xic = cuda.shared.array(500, dtype=numba.float32)
elif xics.shape[2] < 1500:
share_xic = cuda.shared.array(1500, dtype=numba.float32)
else:
share_xic = cuda.shared.array(5500, dtype=numba.float32)
# pad for start and end
if tx == (blockdim - 1):
share_xic[0] = 0.0
elif tx == (blockdim - 2):
share_xic[1] = 0.0
elif tx == (blockdim - 3):
share_xic[2] = 0.0
elif tx == (blockdim - 4):
share_xic[half + point_num] = 0.0
elif tx == (blockdim - 5):
share_xic[half + point_num + 1] = 0.0
elif tx == (blockdim - 6):
share_xic[half + point_num + 2] = 0.0
mean_cols = int(point_num / blockdim)
if mean_cols < 1: # less 32
if tx < point_num:
share_xic[half + tx] = x[tx]
cuda.syncthreads()
# score
if tx < point_num:
v = share_xic[tx : (tx + 1 + half + half)]
score = gpu_cal_sa(v)
scores[k, xic_idx, tx] = score
else:
rest_cols = point_num - mean_cols * blockdim
for i in range(mean_cols): # together
share_xic[half + tx * mean_cols + i] = x[tx * mean_cols + i]
if tx < rest_cols: # wind up
share_xic[half + blockdim * mean_cols + tx] = x[blockdim * mean_cols + tx]
cuda.syncthreads()
# score
# together,each thread processes mean_cols
xx = share_xic[(tx * mean_cols) : ((tx + 1) * mean_cols + half + half)]
for i in range(half, len(xx) - half):
v = xx[(i - half) : (i + half + 1)]
score = gpu_cal_sa(v)
scores[k, xic_idx, i - half + tx * mean_cols] = score
# wind up
if tx < rest_cols:
v = share_xic[
(blockdim * mean_cols + tx) : (
blockdim * mean_cols + tx + 1 + half + half
)
]
score = gpu_cal_sa(v)
scores[k, xic_idx, blockdim * mean_cols + tx] = score
[docs]
@profile
def cal_coelution_by_gaussion(xics, window_points: int, valids_num: int) -> tuple:
"""
Coelution sa scores by sliding windows methods.
Parameters
----------
xics : numba.cuda.devicearray.DeviceNDArray
The extracted xics on GPU device.
window_points : int
Fixed to 7 cycles when computing the SA scores.
valids_num : int
The number of fragment ions + 2 (pr and unfragmented pr)
Returns
-------
tuple
scores : torch.Tensor
The mean SA coelution scores for peak groups.
scores_raw : torch.Tensor
The raw SA coelution scores for peak groups.
"""
valids_num = torch.from_numpy(valids_num).to(cfg.gpu_id)
# block -- profile
block_num = xics.shape[0] * xics.shape[1]
scores = utils.create_cuda_zeros(xics.shape)
threads_per_block = 32
gpu_sa_gausion_core[block_num, threads_per_block](
block_num, xics, scores, window_points, valids_num
)
cuda.synchronize()
scores = utils.convert_numba_to_tensor(scores)
scores_raw = 1 - 2 * torch.acos(scores) / np.pi # [k, f, n]
scores = torch.sum(scores_raw, dim=1)
scores = scores / valids_num.view(-1, 1)
# ends
scores[:, :3] = 0.0
scores[:, -3:] = 0.0
scores_raw[:, :, :3] = 0.0
scores_raw[:, :, -3:] = 0.0
return scores, scores_raw
# @profile
[docs]
def gpu_simple_smooth(input_xics):
"""
Smooth the xics (n_pep * n_ion * n_cycle) extracted from raw MS data.
"""
n = input_xics.shape[0] * input_xics.shape[1]
result_xics = utils.create_cuda_zeros(input_xics.shape)
threads_per_block = 32 # block -- profile
blocks_per_grid = n
gpu_simple_smooth_core[blocks_per_grid, threads_per_block](
n, input_xics, result_xics
)
cuda.synchronize()
return result_xics
[docs]
@cuda.jit
def gpu_simple_smooth_core(n, input_xics, output):
"""
The core of gpu_simple_smooth using a weighted mean method.
"""
# block -- profile
tx = cuda.threadIdx.x
bx = cuda.blockIdx.x
# input
ions_num = input_xics.shape[1]
k = bx // ions_num
xic_idx = bx % ions_num
input_xic = input_xics[k, xic_idx]
# no share-memory, directly in global memory
# [0, n-2] --> mean_cols; n-1 --> rest_cols
blockdim = cuda.blockDim.x
mean_cols = int(input_xics.shape[2] / blockdim)
rest_cols = input_xics.shape[2] - mean_cols * (blockdim - 1)
if tx < blockdim - 1:
for i in range(mean_cols):
idx = tx * mean_cols + i
if idx == 0:
output[k, xic_idx, idx] = (
0.667 * input_xic[idx] + 0.333 * input_xic[idx + 1]
)
else:
output[k, xic_idx, idx] = input_xic[idx] * 0.5 + 0.25 * (
input_xic[idx + 1] + input_xic[idx - 1]
)
else:
for i in range(rest_cols):
idx = (blockdim - 1) * mean_cols + i
if idx == input_xics.shape[2] - 1:
output[k, xic_idx, idx] = (
0.333 * input_xic[idx - 1] + 0.667 * input_xic[idx]
)
else:
output[k, xic_idx, idx] = input_xic[idx] * 0.5 + 0.25 * (
input_xic[idx + 1] + input_xic[idx - 1]
)
[docs]
@cuda.jit(device=True)
def find_maximum(
scan_im,
scan_mz,
scan_height,
query_left,
query_right,
query_im_left,
query_im_right,
):
"""
Find the maximum intensity value with tol for query in centroided data
"""
scan_len = len(scan_mz)
low = 0
high = scan_len - 1
best_j = 0
if scan_mz[low] == query_left:
best_j = low
elif scan_mz[high] == query_right:
best_j = high
else:
while high - low > 1:
mid = (low + high) // 2
if scan_mz[mid] == query_left:
best_j = mid
break
if scan_mz[mid] < query_left:
low = mid
else:
high = mid
if best_j == 0: # no match,high-low=1
if abs(scan_mz[low] - query_left) < abs(scan_mz[high] - query_left):
best_j = low
else:
best_j = high
# find first match in list!
while best_j > 0:
if scan_mz[best_j - 1] == scan_mz[best_j]:
best_j = best_j - 1
else:
break
seek_idx = best_j
best_seek = -1
y_max = 0
while seek_idx < scan_len:
x = scan_mz[seek_idx]
if x > query_right:
break
elif x < query_left: # exist multiple mz values
seek_idx += 1
continue
else:
im = scan_im[seek_idx]
if query_im_left < im < query_im_right:
y = scan_height[seek_idx]
if y > y_max:
y_max = y
best_seek = seek_idx
seek_idx += 1
if best_seek > 0:
im = scan_im[best_seek]
mz = scan_mz[best_seek]
else:
im = -1.0
mz = -1.0
return im, mz, y_max
[docs]
@profile
def cal_measure_im(
locus_ims: np.ndarray, locus_sas: np.ndarray, good_cut: float = 0.5
) -> np.ndarray:
"""
Calculate the measure_im for each locus, weighting with the sa values.
Parameters
----------
locus_ims : np.ndarray
Ion mobility values for locus. Dimension: [n_locus, n_ion]
locus_sas: np.ndarray
SA scores for locus. Dimension: [n_locus, n_ion]
good_cut : float, default=0.5
Only considering the ion with good_cut threshold
Returns
-------
locus_im : np.ndarray
The weighted mean ion mobility values. Dimension: [n_locus]
"""
condition1 = locus_ims <= 0.0
condition2 = locus_sas < locus_sas.max(axis=-1, keepdims=True) * good_cut
bad_idx = condition1 | condition2
locus_ims[bad_idx] = 0.0
locus_sas[bad_idx] = 0.0
locus_sas += 1e-7
locus_im = np.average(locus_ims, weights=locus_sas, axis=-1)
# assert locus_im.min() > 0.2
# assert locus_im.max() < 2.
return locus_im
[docs]
def reserve_sa_maximum(x: torch.Tensor) -> torch.Tensor:
"""
If x > x-1 and x > x+1, x is local maximum will be saved. If not, assign 0
Parameters
----------
x : torch.Tensor
SA raw values with dimension: [n_pep, n_cycle]
Returns
-------
x : torch.Tensor
SA values after suppression with dimension: [n_pep, n_cycle]
"""
x_pad = F.pad(x, (1, 1))
idx = (x_pad[:, 1:-1] > x_pad[:, 2:]) & (x_pad[:, 1:-1] > x_pad[:, 0:-2])
x[~idx] = 0
return x
[docs]
def screen_locus_by_sa(scores_sa: np.ndarray, top_sa_cut: float) -> np.ndarray:
"""
Screen multi locus of a pr that satisfy: local maximum, quantile1, quantile2
Parameters
----------
scores_sa : np.ndarray
Scores of locus.
top_sa_cut : float
Quantile threshold on sa level
Returns
-------
scores_sa : np.ndarray
Bad points have already assigned zero values.
"""
median_values = scores_sa.quantile(top_sa_cut, dim=1, keepdim=True)
rowmax_values = scores_sa.amax(dim=1, keepdim=True)
# local maximum
scores_sa = reserve_sa_maximum(scores_sa)
# screen
condition1 = (scores_sa / rowmax_values) < top_sa_cut
condition2 = scores_sa < median_values
bad_idx = condition1 | condition2
scores_sa[bad_idx] = 0.0
return scores_sa
[docs]
@profile
def screen_locus_by_deep(
df_batch: pd.DataFrame, locus_num: int, top_deep_q: float
) -> pd.DataFrame:
"""
Screen locus of a pr by deep scores.
Parameters
----------
df_batch : pd.DataFrame
Provide columns: "pr_id", "seek_score_deep", "seek_score_sa_x_deep"
n_pep * n_locus rows
top_deep_q : float
Threshold for deep_x / deep_max
Returns
-------
df_batch : pd.DataFrame
Less rows after screen.
"""
group_size_cumsum = np.concatenate([[0], np.cumsum(locus_num)])
group_rank_deep = utils.cal_group_rank(
df_batch["seek_score_deep"].values, group_size_cumsum
)
group_rank_x = utils.cal_group_rank(
df_batch["seek_score_sa_x_deep"].values, group_size_cumsum
)
# screen by top-n
condition1 = (group_rank_deep <= 2) | (group_rank_x <= 2)
# screen by ratio
group_max = df_batch.groupby("pr_id")["seek_score_sa_x_deep"].transform("max")
ratios = df_batch["seek_score_sa_x_deep"] / group_max
condition2 = ratios > top_deep_q
idx = condition1 & condition2
df_batch = df_batch[idx].reset_index(drop=True)
return df_batch
[docs]
def concat_nonzero_locus(
locus: np.ndarray, scores_sa: torch.Tensor, scores_sa_m: torch.Tensor
) -> tuple:
"""
After screening locus by sa, sa_input has much zero values. Select and
concat the nonzero values to vectors.
Parameters
----------
locus : np.ndarray
The locus of extracted xics. Dimension: [n_pep, n_locus]
scores_sa : torch.Tensor
The SA locus scores of extracted xics. Dimension: [n_pep, n_locus]
scores_sa_m : torch.Tensor
The SA ion scores of extracted xics. Dimension: [n_pep, n_ion, n_locus]
Returns
-------
tuple
locus_v : np.ndarray
The candidate locus after screening in a vector.
locus_num : np.ndarray
Indicate how many locus retained after screening for a peptide.
locus_sa_v : np.ndarray
The SA locus scores of candidate locus.
locus_sas : np.ndarray
The SA ion scores of candidate locus.
"""
good_idx = scores_sa > 0
locus_sa_v = scores_sa[good_idx].cpu().numpy()
locus_sas = scores_sa_m.transpose(1, 2)[good_idx].cpu().numpy()
good_idx = good_idx.cpu().numpy()
locus_v = locus[good_idx]
locus_num = good_idx.sum(axis=1)
assert len(locus_v) == len(locus_sas) == locus_num.sum()
return locus_v, locus_num, locus_sa_v, locus_sas
[docs]
def estimate_xic_boundary(xics: torch.Tensor, sa_gausion_m: torch.Tensor) -> tuple:
"""
Exstimate the boundary of an elution group in cycles.
Parameters
----------
xics : torch.Tensor
Dimension: [n_pep, n_ion, 13]
sa_gausion_m : torch.Tensor
Dimension: [n_pep, n_ion]
Returns
-------
tuple
left_idx_1d : np.ndarray
The start index for locus.
right_idx_1d : np.ndarray
The end index for locus.
"""
center_idx = int(xics.shape[-1] / 2)
sa_sum = sa_gausion_m.sum(dim=1)
# find valley
x_pad = F.pad(xics, (1, 1))
left_condition1 = x_pad[:, :, 1:-1] < x_pad[:, :, 2:]
left_condition2 = x_pad[:, :, 1:-1] <= x_pad[:, :, 0:-2]
right_condition1 = x_pad[:, :, 1:-1] <= x_pad[:, :, 2:]
right_condition2 = x_pad[:, :, 1:-1] < x_pad[:, :, 0:-2]
# left
condition = left_condition1 & left_condition2
condition = condition[:, :, :center_idx].int()
condition = condition.flip(2)
left_idx = torch.argmax(condition, dim=2) # first left valley
left_idx = center_idx - 1 - left_idx
no_valley_idx = condition.sum(dim=2) == 0
left_idx[no_valley_idx] = center_idx - 3 # no valley, set to half of 7
left_idx = (left_idx * sa_gausion_m).sum(dim=1) / (sa_sum + 1e-7)
left_idx = torch.round(left_idx) # or ceil
# right
condition = right_condition1 & right_condition2
condition = condition[:, :, (center_idx + 1) :].int()
right_idx = torch.argmax(condition, dim=2) # first right valley
right_idx = center_idx + 1 + right_idx
no_valley_idx = condition.sum(dim=2) == 0
right_idx[no_valley_idx] = center_idx + 3 # no valley, set to half of 7
right_idx = (right_idx * sa_gausion_m).sum(dim=1) / (sa_sum + 1e-7)
right_idx = torch.round(right_idx) # or floor
return left_idx.int().cpu().numpy(), right_idx.int().cpu().numpy()
[docs]
def grid_xic_best(df_batch, ms1_centroid, ms2_centroid):
"""
For developing.
"""
from itertools import product
locus_start_v = df_batch["score_elute_span_left"].values
locus_end_v = df_batch["score_elute_span_right"].values
tol_ppm_v = [20.0, 16.0, 12.0, 8.0, 4.0]
tol_im_v = [0.03, 0.02, 0.01]
grid_params = list(product(tol_ppm_v, tol_im_v))
xics_v = []
expand_dim = 64
for search_i, (tol_ppm, tol_im) in enumerate(grid_params):
_, rts, _, _, xics = extract_xics(
df_batch,
ms1_centroid,
ms2_centroid,
im_tolerance=tol_im,
ppm_tolerance=tol_ppm,
cycle_num=13,
by_pred=False,
)
xics = xics.copy_to_host()
mask1 = np.arange(xics.shape[2]) >= locus_start_v[:, None, None]
mask2 = np.arange(xics.shape[2]) <= locus_end_v[:, None, None]
xics = xics * mask1 * mask2
rts, xics = utils.interp_xics(xics, rts, expand_dim)
xics = gpu_simple_smooth(cuda.to_device(xics))
xics = xics.copy_to_host()
# find best profile
if search_i == 0:
sas = np.array(list(map(utils.cross_cos, xics)))
sa_sum = sas.sum(axis=-1)
best_ion_idx = sa_sum.argmax(axis=-1)
best_profile = xics[np.arange(len(xics)), best_ion_idx]
# bad_xic = np.abs(best_profile.argmax(axis=-1) - expand_dim / 2) > 6
# boundary by best_profile
box = best_profile > best_profile.max(axis=-1, keepdims=True) * 0.2
left = box.argmax(axis=-1)
right = expand_dim - 1 - box[:, ::-1].argmax(axis=-1)
df_batch["integral_left"] = left
df_batch["integral_right"] = right
xics_v.append(xics)
xics = np.transpose(np.stack(xics_v), (1, 0, 2, 3))
# find other profile with the help of best_profile
ion_num = xics.shape[2]
best_profile = np.repeat(best_profile[:, None, :], len(grid_params), axis=1)
best_profile = np.repeat(best_profile[:, :, None, :], ion_num, axis=2)
dot_sum = (best_profile * xics).sum(axis=-1)
norm1 = np.linalg.norm(best_profile, axis=-1) + 1e-6
norm2 = np.linalg.norm(xics, axis=-1) + 1e-6
sas = dot_sum / (norm1 * norm2)
sas = sas.max(axis=1)
return sas
[docs]
def update_sa_by_grid(df, ms):
"""
For developing.
"""
df_good = []
for swath_id in df["swath_id"].unique():
df_swath = df[df["swath_id"] == swath_id]
df_swath = df_swath.reset_index(drop=True)
# ms
ms1_centroid, ms2_centroid = ms.copy_map_to_gpu(swath_id, centroid=True)
# in batches
batch_n = cfg.batch_xic_locus
for _, df_batch in df_swath.groupby(df_swath.index // batch_n):
df_batch = df_batch.reset_index(drop=True)
# grid search for best profiles
sas = grid_xic_best(df_batch, ms1_centroid, ms2_centroid)
cols_center = ["score_center_elution_" + str(i) for i in range(14)]
df_batch[cols_center] = sas
df_good.append(df_batch)
utils.release_gpu_scans(ms1_centroid, ms2_centroid)
df = pd.concat(df_good, axis=0, ignore_index=True)
return df