import json
from pathlib import Path
import numpy as np
import pandas as pd
import pairk.backend.tools.sequence_utils as tools
import pairk.backend.tools.matrices as matrices
import pairk.backend.tools.pairwise_tools as pairwise_tools
import pairk.backend.exceptions as _exceptions
import copy
def score_alignment(
seq1: str, seq2: str, substitution_matrix_dict: dict[str, dict[str, float | int]]
) -> float:
assert len(seq1) == len(seq2)
score = 0
for s1, s2 in zip(seq1, seq2):
score += float(substitution_matrix_dict[s1][s2]) # type: ignore
return score
def score_kmer_2_seq_dict_dict(
kmer: str,
sequence: str,
substitution_matrix_dict: dict[str, dict[str, float | int]],
):
if len(sequence) < len(kmer):
raise ValueError("The sequence is shorter than the kmer")
scores = []
for i in range(len(sequence) - len(kmer) + 1):
subseq = sequence[i : i + len(kmer)]
score = score_alignment(kmer, subseq, substitution_matrix_dict)
scores.append(score)
return scores
def best_from_scores(sequence: str, scores: list[float], k):
best_match_pos = np.argmax(scores)
bestscore = scores[best_match_pos]
best_subseq = sequence[best_match_pos : best_match_pos + k]
return bestscore, best_subseq, int(best_match_pos)
def run_pairwise_kmer_alignment(
query_idr: str,
ortholog_idrs: dict[str, str],
k: int,
score_matrix_dict: dict[str, dict[str, float | int]],
):
kmers = tools.gen_kmers(query_idr, k)
positions = list(range(len(kmers)))
score_df = pairwise_tools.make_empty_kmer_ortho_df(
positions, list(ortholog_idrs.keys())
)
orthokmer_df = pairwise_tools.make_empty_kmer_ortho_df(
positions, list(ortholog_idrs.keys())
)
pos_df = pairwise_tools.make_empty_kmer_ortho_df(
positions, list(ortholog_idrs.keys())
)
score_df.loc[positions, "query_kmer"] = kmers
orthokmer_df.loc[positions, "query_kmer"] = kmers
pos_df.loc[positions, "query_kmer"] = kmers
for position, kmer in zip(positions, kmers):
for ortholog_id, ortholog_idr in ortholog_idrs.items():
if len(ortholog_idr) < k:
orthokmer_df.loc[position, ortholog_id] = "-" * k
continue
query_k_scores = score_kmer_2_seq_dict_dict(
kmer, ortholog_idr, score_matrix_dict
)
best_score, best_subseq, best_pos = best_from_scores(
ortholog_idr, query_k_scores, k
)
score_df.loc[position, ortholog_id] = best_score
orthokmer_df.loc[position, ortholog_id] = best_subseq
pos_df.loc[position, ortholog_id] = best_pos
return score_df, orthokmer_df, pos_df
[docs]
def pairk_alignment(
idr_dict: dict[str, str],
query_id: str,
k: int,
matrix_name: str = "EDSSMat50",
) -> pairwise_tools.PairkAln:
"""run pairwise k-mer alignment method using an exhaustive comparison of k-mers.
Each query k-mer is scored against each ortholog k-mer to find the best matching
ortholog k-mer in each ortholog. If an ortholog IDR is shorter than the k-mer, a
string of "-" characters ("-"\\*k) is assigned as the best matching ortholog k-mer for that
ortholog.
**Note**: if there are multiple top-scoring matches, only one is returned.
Parameters
----------
idr_dict : dict[str, str]
input sequences in dictionary format with the key being the sequence id and
the value being the sequence as a string
query_id : str
the id of the query sequence within the `idr_dict` dictionary
k : int
the length of the k-mers to use for the alignment
matrix_name : str, optional
The name of the scoring matrix to use in the algorithm, by default "EDSSMat50".
The available matrices can be viewed with the function `pairk.print_available_matrices()`.
Returns
-------
pairwise_tools.PairkAln
an object containing the alignment results. See the `pairk.PairkAln` class for more information.
"""
_exceptions.validate_matrix_name(matrix_name)
_exceptions.check_queryid_in_idr_dict(idr_dict, query_id)
scoremat_df = matrices.load_matrix_as_df(matrix_name)
scoremat_dict = matrices.matrix_df_to_dict(scoremat_df)
idr_str_dict = copy.deepcopy(idr_dict)
_exceptions.check_sequence_characters_dict(idr_str_dict, matrix_name)
query_idr = idr_str_dict.pop(query_id)
score_df, orthokmer_df, pos_df = run_pairwise_kmer_alignment(
query_idr, idr_str_dict, k, score_matrix_dict=scoremat_dict
)
return pairwise_tools.PairkAln(
orthokmer_df=orthokmer_df,
pos_df=pos_df,
score_df=score_df,
)
[docs]
def pairk_alignment_single_kmer(
kmer: str,
ortholog_idrs: dict[str, str],
matrix_name: str = "EDSSMat50",
):
"""
Align a single kmer to a dictionary of sequences in a pairwise manner (i.e.
align the kmer to each sequence one-by-one (with no gaps). Returns the score,
subsequence, and position of the best subsequence-kmer match for each
sequence in the input dictionary. If an ortholog IDR is shorter than the k-mer, a
string of "-" characters ("-"\\*k) is assigned as the best matching ortholog k-mer for that
ortholog.
**Note**: if there are multiple top-scoring matches, only one is returned.
Parameters
----------
kmer : str
the kmer to align
ortholog_idrs : dict[str, str]
a dictionary of sequences to align the kmer to, with the key being the
sequence id and the value being the sequence as a string
matrix_name : str, optional
The name of the scoring matrix to use in the algorithm, by default "EDSSMat50".
The available matrices can be viewed with the function `pairk.print_available_matrices()`.
Returns
-------
dict[str, float], dict[str, str], dict[str, int]
3 dictionaries containing the scores, subsequences, and positions of
the best subsequence-kmer matches for each sequence in the input
`ortholog_idrs` dictionary. Dictionary keys are the sequence ids.
"""
_exceptions.validate_matrix_name(matrix_name)
# I don't like how innefficient this is, but I find the normal error for
# invalid characters to be confusing
check_dict = copy.deepcopy(ortholog_idrs)
check_dict["kmer"] = kmer
_exceptions.check_sequence_characters_dict(check_dict, matrix_name)
scoremat_df = matrices.load_matrix_as_df(matrix_name)
scoremat_dict = matrices.matrix_df_to_dict(scoremat_df)
best_scores = {}
best_subseqs = {}
best_positions = {}
for ortholog_id, ortholog_idr in ortholog_idrs.items():
if len(ortholog_idr) < len(kmer):
best_subseqs[ortholog_id] = "-" * len(kmer)
continue
query_k_scores = score_kmer_2_seq_dict_dict(kmer, ortholog_idr, scoremat_dict)
best_score, best_subseq, best_pos = best_from_scores(
ortholog_idr, query_k_scores, len(kmer)
)
if query_k_scores.count(best_score) > 1:
print(f"Warning: multiple best scores found for {ortholog_id}")
best_scores[ortholog_id] = best_score
best_subseqs[ortholog_id] = best_subseq
best_positions[ortholog_id] = best_pos
return best_scores, best_subseqs, best_positions
# def reciprocal_best_match(query_idr, query_kmer, ortho_kmers, matrix_name):
# """
# Calculate the reciprocal best match (RBM) of a query kmer to a set of ortholog kmers.
# The RBM is calculated by aligning the best scoring ortholog kmer to the query sequence and checking if the best scoring query kmer is the same as the original query kmer.
# Parameters
# ----------
# query_idr : str
# the query sequence
# query_kmer : str
# the query kmer
# ortho_kmers : dict[str, str]
# a dictionary of ortholog kmers with the key being the ortholog id and the value being the ortholog kmer that originally aligned to the query kmer
# matrix_name : str
# the name of the scoring matrix to use in the alignment
# Returns
# -------
# dict[str, bool]
# a dictionary of boolean values indicating whether the query kmer ortholog kmer match is reciprocal
# """
# _exceptions.validate_matrix_name(matrix_name)
# _exceptions.check_sequence_characters_list(
# [query_idr] + list(ortho_kmers.values()), matrix_name
# )
# scoremat_df = matrices.load_matrix_as_df(matrix_name)
# scoremat_dict = matrices.matrix_df_to_dict(scoremat_df)
# rbm_dict = {}
# for ortholog_id, ortholog_kmer in ortho_kmers.items():
# try:
# rec_scores = score_kmer_2_seq_dict_dict(
# ortholog_kmer, query_idr, scoremat_dict
# )
# reci_best_score, reci_best_subseq, _ = best_from_scores(
# query_idr, rec_scores, len(query_kmer)
# )
# except ValueError as e:
# print(e)
# continue
# except KeyError as e:
# print(e)
# continue
# rbm_dict[ortholog_id] = reci_best_subseq == query_kmer
# return rbm_dict