Source code for pairk.backend.tools.sequence_utils

import copy
import json
import os
import re
import subprocess
import sys
from pathlib import Path

import numpy as np
import pandas as pd
from Bio import Align, AlignIO, Seq, SeqIO
from Bio.SeqRecord import SeqRecord
from Bio.Align import MultipleSeqAlignment

# from Bio.Align import MultipleSeqAlignment


def percent_identity(seq1: SeqRecord, seq2: SeqRecord) -> float:
    """
    Returns a percent identity between 0 (no identical residues) and 1 (all residues are identical)
    - The sequences must be pre-aligned (i.e. they have the same length)
    - The returned percent identity is computed as the number of identical residues divided by the
    length of compared alignment sites. Alignment sites where both sequences are gaps ('-' characters)
    are not compared.

    Parameters
    ----------
    seq1 : str or seqrecord
        first sequence
    seq2 : str or seqrecord
        second sequence
    """
    assert len(seq1) == len(
        seq2
    ), "sequences are not the same length. Are they aligned?"
    num_same = 0
    length = len(seq1)
    for i in range(len(seq1)):
        if seq1[i] == "-" and seq2[i] == "-":
            length -= 1
            continue
        if seq1[i] == seq2[i]:
            num_same += 1
    pid = num_same / length
    return pid


def get_first_non_gap_index(s: str) -> int:
    """get the index of the first non-gap character (first character that is
    not `-`) in a string

    Parameters
    ----------
    s : str
        input string

    Returns
    -------
    int
        the index of the first non-gap character in the string (0-indexed)
    """
    index = 0
    while index < len(s) and s[index] == "-":
        index += 1
    return index


def sort_aln_by_pid2ref(aln: Align.MultipleSeqAlignment, refseq: SeqRecord):
    aln.sort(
        key=lambda record: percent_identity(record, refseq),
        reverse=True,
    )
    return aln


def subsample_seqrec_list(seqrec_list, fraction=0.1):
    """return a subsampled list of seqrecords"""
    n = int(len(seqrec_list) * fraction)
    interval = int(len(seqrec_list) / n)
    new_list = []
    count = 0
    for seq in seqrec_list:
        if count % interval == 0:
            new_list.append(seq)
        count += 1
    return new_list


def subsample_seqrec_list_target_num(
    seqrec_list, max_len=9999999999, target_num=10, skip_list=None
):
    """return a subsampled list of seqrecords"""
    interval = int(len(seqrec_list) / target_num)
    if skip_list is None:
        skip_list = []
    new_list = []
    count = 0
    for seq in seqrec_list:
        if count % interval == 0:
            if seq.id in skip_list:
                continue
            if len(seq.seq) > max_len:
                continue
            new_list.append(seq)
        count += 1
    return new_list


def pad_hit(seq: str, st_pos: int, end_pos: int, l_flank: int = 0, r_flank: int = 0):
    st = max(0, st_pos - l_flank)
    end = min(len(seq) - 1, end_pos + r_flank)
    return st, end, seq[st : end + 1]


def z_score(scores):
    scores = np.array(scores)
    return (scores - np.mean(scores)) / np.std(scores)


def z_score_comparison(scores, scores_ref):
    u = np.mean(scores_ref)
    s = np.std(scores_ref)
    if s == 0:
        print("standard deviation is 0")
    return [(i - u) / s for i in scores]


def calculate_z_score_bg_region(
    score_list,
    score_list_mask,
    bg_slicing_region=None,
    num_bg_scores_cutoff=20,
):
    """calculates the z-score of a score list compared to the background across a region of interest

    Uses the score_list_mask choose which positions to use for the background scores. I do not want to include extremely gappy regions in the background calculation, so I use the score_list_mask to choose which positions to use for the background calculation. The background is calculated as the mean and standard deviation of the background scores. The z-score is calculated as (score - mean) / std

    Parameters
    ----------
    score_list : list
        list of scores. Usually for the entire sequence
    score_list_mask : list
        boolean mask. Must be the same length as the `score_list`. This is primarally used to mask the positions used to calculate the background of the z-score
    bg_slicing_region : list
        region of interest to use as the background ([start_position, end_position]). Numbering must coorespond to the `score_list` and `score_list_mask`
    num_bg_scores_cutoff : int
        minimum number of background scores to calculate the z-score. If there are not enough background scores, then the z-score is not calculated

    Returns
    -------
    dictionary
        dictionary of the z-score, and the background scores (keys: "z_scores", "bg_scores")
    """
    score_list_mask = np.array(score_list_mask)
    if bg_slicing_region is not None:
        score_list_mask = score_list_mask.copy()
        # set mask to False for positions outside of the slicing region
        score_list_mask[: bg_slicing_region[0]] = False
        score_list_mask[bg_slicing_region[1] + 1 :] = False
    bg_scores = np.array(score_list)[score_list_mask]
    if len(bg_scores) < num_bg_scores_cutoff:
        raise ValueError(
            f"not enough background scores to calculate z-score. Require at least {num_bg_scores_cutoff} background scores. Only have {len(bg_scores)} background scores"
        )
    z_scores = z_score_comparison(score_list, bg_scores)
    return {"z_scores": list(z_scores), "bg_scores": list(bg_scores)}


def get_non_gap_indexes(aln_seq: str) -> list[int]:
    """get list of nongap positions in `aln_seq`"""
    return [c for c, i in enumerate(aln_seq) if i != "-"]


# ==============================================================================
# // sequence tools
# ==============================================================================
def str2seqrecord(seq_str, id=None, description=None):
    """convert a string to a SeqRecord object

    Parameters
    ----------
    seq_str : str
        sequence string
    id : str, optional
        id of the SeqRecord object, by default None
    description : str, optional
        description of the SeqRecord object, by default None

    Returns
    -------
    SeqRecord
        SeqRecord object with the sequence string
    """
    if description is None:
        description = ""
    return SeqIO.SeqRecord(Seq.Seq(seq_str), id=id, description=description)  # type: ignore


def pad_with_aas_or_gaps(seq: str, st: int, end: int, flank: int = 0) -> str:
    """
    slice a sequence at the specified positions
    flank the slice with the specified number of residues to the left and right
    if the flank would go out of bounds, slice to the end/beginning of the sequence and pad with dashes

    Parameters
    ----------
    seq : str
        sequence to be sliced
    st : int
        start position of the slice
    end : int
        end position of the slice
    flank : int, optional
        number of residues to flank the slice with, by default 0

    Returns
    -------
    str
        sliced sequence

    Examples
    --------
    >>> slice_seq('ABCDEF', 2, 4, flank=1)
    >>> s = '012FPpPp890'
    >>> st=10
    >>> end=7
    >>> flank=0
    >>> print(s[st:end+1])
    >>> for c, i in enumerate(slice_seq(s, st, end+1, flank)):
            print(c, i)
    >>> print(slice_seq(s, st, end+1, flank))

    """
    assert st <= end, "start must be less than or equal to end"
    assert flank >= 0, "flank must be positive"

    if st - flank < 0:
        left = "-" * abs(flank - st) + seq[: max(0, st)]
    else:
        left = seq[st - flank : st]
    if end + flank > len(seq):
        right = seq[max(0, st) : len(seq)] + "-" * (end + flank - len(seq))
    else:
        right = seq[max(0, st) : end + flank]
    # print(left)
    # print(right)
    return left + right


def gen_kmers(seq: str, k: int) -> list[str]:
    """
    generates list of length k "kmers" comprising `seq`

    Parameters
    ----------
    seq : str
        input sequence to split into k-mers. Should be entry in fasta file
    k : int
        length of fragments (k-mers)

    Returns
    -------
    kmers : list
        list of strings - k-mers generated from seq
    """
    k2 = k - 1
    kmers = []
    for i in range(len(seq) - k2):
        kmers.append(seq[i : i + k])
    return kmers


[docs] class FastaImporter: """import fasta file and return seqrecord objects in various formats Parameters ---------- fasta_path : str file path to fasta file """ def __init__(self, fasta_path: str | Path): self.fasta_path = fasta_path
[docs] def import_as_list(self) -> list[SeqRecord]: """return list of SeqRecord objects for each sequence in the fasta file Returns ------- List[SeqRecord] list of SeqRecord objects """ with open(self.fasta_path) as handle: return list(SeqIO.parse(handle, "fasta"))
[docs] def import_as_dict(self) -> dict[str, SeqRecord]: """return dictionary of SeqRecord objects for each sequence in the fasta file Returns ------- dict[str, SeqRecord] dictionary of SeqRecord objects, keys are the sequence ids and values are the SeqRecord objects """ with open(self.fasta_path) as handle: return SeqIO.to_dict(SeqIO.parse(handle, "fasta"))
[docs] def import_as_str_dict(self) -> dict[str, str]: """return dictionary of strings for each sequence in the fasta file Returns ------- dict[str, str] dictionary of sequence strings, keys are the sequence ids and values are the sequences as strings """ with open(self.fasta_path) as handle: d = SeqIO.to_dict(SeqIO.parse(handle, "fasta")) return {k: str(v.seq) for k, v in d.items()}
[docs] def import_as_alignment(self) -> Align.MultipleSeqAlignment: """return multiple sequence alignment object Returns ------- Align.MultipleSeqAlignment multiple sequence alignment object """ with open(self.fasta_path) as handle: return AlignIO.read(handle, "fasta")
def strip_dashes_from_str(seq_str): """remove `-` characters from a sequence string Parameters ---------- seq_str : str sequence string Returns ------- str sequence string with `-` characters removed """ return seq_str.replace("-", "") def strip_dashes_from_sequences(sequences: list[SeqRecord]) -> list[SeqRecord]: """remove `-` characters from sequences in a list of SeqRecord objects Parameters ---------- sequences : list list of SeqRecord objects Returns ------- list list of SeqRecord objects with `-` characters removed from sequences """ sequences = copy.deepcopy(sequences) for s in sequences: s.seq = Seq.Seq(strip_dashes_from_str(str(s.seq))) return sequences def write_fasta(sequences, output_filename): """save list of SeqRecord objects to a fasta file Parameters ---------- sequences : list list of SeqRecord objects output_filename : str file path to new output fasta file """ with open(output_filename, "w") as handle: SeqIO.write(sequences, handle, "fasta") def longest_common_substring(s1, s2): """find the largest substring shared by two strings""" m = [[0] * (1 + len(s2)) for i in range(1 + len(s1))] longest, x_longest = 0, 0 for x in range(1, 1 + len(s1)): for y in range(1, 1 + len(s2)): if s1[x - 1] == s2[y - 1]: m[x][y] = m[x - 1][y - 1] + 1 if m[x][y] > longest: longest = m[x][y] x_longest = x else: m[x][y] = 0 return s1[x_longest - longest : x_longest] # ============================================================================== # // alignment tools # ============================================================================== def reindex_alignment(record): """convert indexes of an ungapped sequence to indexes of it's sequence with gaps Parameters ---------- record : SeqRecord object biopython sequence record object with gaps present Returns ------- str ungapped sequence list list of indexes of nongap characters in the gapped sequence. index the return list with non-gap indexes to get the index of the gapped sequence. If the gapped sequence is `A--A--A`, the return list will be `[0, 3, 6]` """ s = str(record.seq) unal_seq = "" index_map = [] for al_pos, i in enumerate(s): # print(al_pos, i) if i != "-": unal_seq = unal_seq + i index_map.append(al_pos) return unal_seq, index_map def reindex_alignment_str(seq_str): """convert indexes of an ungapped sequence to indexes of it's sequence with gaps Parameters ---------- seq_str : str string of a sequence with gaps present Returns ------- str ungapped sequence list list of indexes of nongap characters in the gapped sequence. index the return list with non-gap indexes to get the index of the gapped sequence. If the gapped sequence is `A--A--A`, the return list will be `[0, 3, 6]` Examples -------- >>> aligned = 'A--A--A' >>> unaligned, ind = reindex_alignment_str(aligned) >>> print(unaligned) 'AAA' >>> print(ind) [0, 3, 6] >>> print(aligned[ind[0]:ind[-1]+1]) 'A--A--A' """ unal_seq = "" index_map = [] for al_pos, i in enumerate(seq_str): # print(al_pos, i) if i != "-": unal_seq = unal_seq + i index_map.append(al_pos) return unal_seq, index_map def find_alnslice_positions_in_unaln(aln: str, aln_start: int, aln_end: int): """Go from a slice of an aligned sequence, to the positions in the unaligned sequence that correspond to the slice. Parameters ---------- aln : str aligned sequence with gaps aln_start : int start position of the slice of the aligned sequence aln_end : int end position of the slice of the aligned sequence Returns ------- list list of positions of the slice in the unaligned sequence """ positions = [] unaln_index = 0 for i in range(len(aln)): if aln[i] != "-": if aln_start <= i < aln_end: positions.append(unaln_index) unaln_index += 1 return positions def aln_2_idr_position_map( aln: MultipleSeqAlignment, idr_aln_start: int, idr_aln_end: int ) -> dict[str, list[int]]: """from an input multiple sequence alignment, generate a dictionary of the start and end positions of the IDRs in the unaligned sequences. The IDRs are defined by single start/end positions in the alignment. In other words, the IDRs are defined by a single slice of the alignment. Parameters ---------- aln : MultipleSeqAlignment BioPython MultipleSeqAlignment object idr_aln_start : int start position of the IDRs in the alignment idr_aln_end : int end position of the IDRs in the alignment Returns ------- dict[str, list[int]] idr_position_map: dictionary of the start and end positions of the IDRs in the unaligned sequences. The keys are the sequence ids and the values are lists of the start and end positions of the IDRs in the unaligned sequences. The start and end positions are 0-indexed. """ idr_position_map = {} for seqrecord in list(aln): p = find_alnslice_positions_in_unaln( str(seqrecord.seq), idr_aln_start, idr_aln_end ) if len(p) == 0: idr_position_map[seqrecord.id] = [-1, -1] continue idr_position_map[seqrecord.id] = [p[0], p[-1]] return idr_position_map # t = '--LPPPP---PP------TEST--' # tun = tools.strip_dashes_from_str(t) # start = 6 # end = 13 # print(t[start:end]) # unal_pos =find_alnslice_positions_in_unaln(t, start, end) # print(unal_pos) # if len(unal_pos) == 0: # print('No positions') # else: # print(tun[unal_pos[0]:unal_pos[-1]+1]) # for i in unal_pos: # print(i, tun[i]) def find_subseq_in_aligned_seq_str(unaligned_subseq_str, aligned_seq_str): """ Find the position of unaligned_subsequence in aligned_seq_str return the positions of the subsequence in the aligned sequence indeces returns: al_start: start position of the subsequence in the aligned sequence al_end: end position of the subsequence in the aligned sequence aligned_seq_str[al_start:al_end]: the subsequence in the aligned sequence """ unaligned_seq_str, alignment_index_key = reindex_alignment_str(aligned_seq_str) if unaligned_subseq_str not in unaligned_seq_str: raise ValueError(f"{unaligned_subseq_str} not in `aligned_seq_str`") unal_start = unaligned_seq_str.find(unaligned_subseq_str) unal_end = unal_start + len(unaligned_subseq_str) - 1 al_start = alignment_index_key[unal_start] al_end = alignment_index_key[unal_end] return al_start, al_end, aligned_seq_str[al_start : al_end + 1] # ============================================================================== # // parse accession ids # ============================================================================== # def split_uniprot(prot_id): # j = re.compile(r"^[st][pr]\|(.+)\|(.+)") # prot = j.findall(prot_id)[0] # accession = prot[0] # name = prot[1] # return name, accession # def get_description(desc): # r = re.compile(r"^sp\|.+\|\w+ (.+) OS=.+") # return r.findall(desc)[0] # def convert_id_to_name(uniprot_id): # import requests # url = f"https://www.uniprot.org/uniprot/{uniprot_id}.xml" # response = requests.get(url) # if response.status_code == 200: # content = response.text # start = content.find("<name>") + len("<name>") # end = content.find("</name>") # uniprot_name = content[start:end].strip() # return uniprot_name # else: # return "not_found" # ============================================================================== # // regex utilities # ============================================================================== def regex2overlapping(regex): new_regex = r"(?=(" + regex + "))" return new_regex def get_regex_matches(regex_pattern: str, seq_str: str): """searches for all matches of a regex pattern in a sequence string returns a generator object that yields the match sequence, start index, and end index Parameters ---------- regex_pattern : str regular expression pattern seq_str : str string to search for matches Yields ------ tuple (match sequence, start index, end index) """ p = re.compile(regex_pattern) for m in p.finditer(seq_str): if m.start() == m.end(): # even if there are groups in the lookahead, the first group should be the full match b/c that group surrounds the entire regex # so this will work whether or not there are groups in the lookahead match_seq = m.groups()[0] else: match_seq = seq_str[m.start() : m.end()] yield match_seq, m.start(), m.start() + len(match_seq) - 1