Source code for neurosnap.msa

"""
Provides functions and classes related to processing protein sequence data.
"""

import io
import os
import re
import shutil
import subprocess
import tarfile
import tempfile
import time
from collections import Counter
from typing import Union, List, Dict, Tuple, Optional

import requests
from Bio import SearchIO

from neurosnap.api import USER_AGENT
from neurosnap.log import logger
from neurosnap.protein import STANDARD_AAs

### CONSTANTS ###
MMSEQS2_CITATION = """The MMseqs2 webserver used to generate this MSA is offered as a free service. Please support the authors in maintaining this free resource by citing them appropriately as follows:
@article{Mirdita2019,
  title        = {{MMseqs2 desktop and local web server app for fast, interactive sequence searches}},
  author       = {Mirdita, Milot and Steinegger, Martin and S{\"{o}}ding, Johannes},
  year         = 2019,
  journal      = {Bioinformatics},
  volume       = 35,
  number       = 16,
  pages        = {2856--2858},
  doi          = {10.1093/bioinformatics/bty1057},
  pmid         = 30615063,
  comment      = {MMseqs2 search server}
}
@article{Mirdita2017,
  title        = {{Uniclust databases of clustered and deeply annotated protein sequences and alignments}},
  author       = {Mirdita, Milot and von den Driesch, Lars and Galiez, Clovis and Martin, Maria J. and S{\"{o}}ding, Johannes and Steinegger, Martin},
  year         = 2017,
  journal      = {Nucleic Acids Res.},
  volume       = 45,
  number       = {D1},
  pages        = {D170--D176},
  doi          = {10.1093/nar/gkw1081},
  pmid         = 27899574,
  comment      = {Uniclust30/UniRef30 database}
}
@article{Mitchell2019,
  title        = {{MGnify: the microbiome analysis resource in 2020}},
  author       = {Mitchell, Alex L and Almeida, Alexandre and Beracochea, Martin and Boland, Miguel and Burgin, Josephine and Cochrane, Guy and Crusoe, Michael R and Kale, Varsha and Potter, Simon C and Richardson, Lorna J and Sakharova, Ekaterina and Scheremetjew, Maxim and Korobeynikov, Anton and Shlemov, Alex and Kunyavskaya, Olga and Lapidus, Alla and Finn, Robert D},
  year         = 2019,
  journal      = {Nucleic Acids Res.},
  doi          = {10.1093/nar/gkz1035},
  comment      = {MGnify database}
}
@article{Mirdita2022,
  title        = {{ColabFold: making protein folding accessible to all}},
  author       = {Mirdita, Milot and Sch{\"u}tze, Konstantin and Moriwaki, Yoshitaka and Heo, Lim and Ovchinnikov, Sergey and Steinegger, Martin},
  year         = 2022,
  journal      = {Nature Methods},
  doi          = {10.1038/s41592-022-01488-1},
  comment      = {ColabFold API}
}
"""


### FUNCTIONS ###
def read_msa(
  input_fasta: Union[str, io.TextIOBase],
  size: float = float("inf"),
  allow_chars: str = "",
  drop_chars: str = "",
  remove_chars: str = "*",
  uppercase: bool = True,
) -> Tuple[List[str], List[str]]:
  """Reads an MSA, a3m, or fasta file and returns an array of names and seqs.

  Returned headers will consist of all characters up until the first space with the "|" character replaced with an underscore.

  Parameters:
    input_fasta: Path to input a3m/fasta file to read, fasta as a raw string, or a file-handle like object to read
    size: Number of rows to read
    allow_chars: Sequences that contain characters not included within STANDARD_AAs+allow_chars will throw an exception
    drop_chars: Drop sequences that contain these characters. For example, ``"-X"``
    remove_chars: Removes these characters from sequences. For example, ``"*-X"``
    uppercase: Converts all amino acid chars to uppercase when True

  Returns:
    A tuple of the form ``(names, seqs)``

    - ``names``: list of protein names from the a3m file, including gaps
    - ``seqs``: list of protein sequences from the a3m file, including gaps

  """
  names = []
  seqs = []
  allow_chars = allow_chars.replace("-", "\\-")
  drop_chars = drop_chars.replace("-", "\\-")
  remove_chars = remove_chars.replace("-", "\\-")

  if isinstance(input_fasta, str):
    if os.path.exists(input_fasta):
      f = open(input_fasta)
    else:
      f = io.StringIO(input_fasta)
  elif isinstance(input_fasta, io.TextIOBase):
    f = input_fasta
  else:
    raise ValueError(f"Invalid input for input_fasta, {type(input_fasta)} is not a valid type.")

  for i, line in enumerate(f, start=1):
    line = line.strip()
    if line:
      if line.startswith(">"):
        if seqs and seqs[-1] == "":
          raise ValueError(f"Invalid MSA/fasta. Header {names[-1]} is missing a sequence.")
        if len(seqs) >= size + 1:
          break
        match = re.search(r"^>([\w_-\|]*)", line)
        assert match is not None, f"Invalid MSA/fasta. {line} is not a valid header."
        name = match.group(1)
        name = name.replace("|", "_")
        assert len(name), f"Invalid MSA/fasta. Line {i} has an empty header."
        names.append(name)
        seqs.append("")
      else:
        if uppercase:
          line = line.upper()
        # remove whitespace and remove_chars
        if remove_chars:
          line = re.sub(f"[{remove_chars}\\s]", "", line)
        # drop chars
        if drop_chars:
          match = re.search(f"[{drop_chars}]", line)
          if match is not None:
            names.pop()
            seqs.pop()
            continue
        match = re.search(f"^[{STANDARD_AAs+allow_chars}]*$", line)
        if match is None:
          raise ValueError(
            f"Sequence on line {i} contains an invalid character. Please specify whether you would like to drop or replace characters in sequences like these. Sequence='{line}'"
          )
        seqs[-1] += line

  f.close()
  assert len(names) == len(seqs), "Invalid MSA/fasta. The number of sequences and headers found do not match."
  return names, seqs
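
# Usage sketch for read_msa (illustrative only; "example.a3m" is a hypothetical
# path and the keyword arguments shown are just one reasonable combination):
#   names, seqs = read_msa("example.a3m", size=4096, remove_chars="*-", drop_chars="X")
#   print(f"Read {len(names)} sequences; first entry: {names[0]} ({len(seqs[0])} residues)")
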
def write_msa(output_path: str, names: List[str], seqs: List[str]):
  """Writes an MSA, a3m, or fasta to a file.
  Makes no assumptions about the validity of names or sequences.
  Will throw an exception if ``len(names) != len(seqs)``.

  Parameters:
    output_path: Path to output file to write, will overwrite existing files
    names: List of protein names from the file
    seqs: List of protein sequences from the file

  """
  assert len(names) == len(seqs), "The number of names and sequences do not match."
  with open(output_path, "w") as f:
    for name, seq in zip(names, seqs):
      f.write(f">{name}\n{seq}\n")
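
# Usage sketch for write_msa (illustrative only; names, sequences, and the
# output path are hypothetical):
#   write_msa("toy.fasta", ["seq_a", "seq_b"], ["MKTAYIAKQR", "MKTAYIVKQR"])
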
def pad_seqs(seqs: List[str], char: str = "-", truncate: Union[bool, int] = False) -> List[str]:
  """Pads all sequences to the length of the longest sequence using a character, padding from the right side.

  Parameters:
    seqs: List of sequences to pad
    char: The character to perform the padding with, default is "-"
    truncate: When set to True will truncate all sequences to the length of the first sequence; set to an integer to truncate sequences to that length

  Returns:
    The padded sequences

  """
  if truncate is True:
    longest_seq = len(seqs[0])
  elif type(truncate) is int:
    assert truncate >= 1, "truncate must be either a boolean value or an integer greater than or equal to 1."
    longest_seq = truncate
  else:
    longest_seq = max(len(x) for x in seqs)

  for i, seq in enumerate(seqs):
    seqs[i] = seq.ljust(longest_seq, char)
    seqs[i] = seqs[i][:longest_seq]
  return seqs
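
# Usage sketch for pad_seqs (illustrative only). Padding uses the given char on
# the right; truncate=True clips everything to the first sequence's length:
#   pad_seqs(["MKT", "MKTAYI"])                 # -> ["MKT---", "MKTAYI"]
#   pad_seqs(["MKT", "MKTAYI"], truncate=True)  # -> ["MKT", "MKT"]
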
def get_seqid(seq1: str, seq2: str) -> float:
  """Calculate the pairwise sequence identity of two same-length sequences or alignments.
  Will not perform any alignment steps.

  Parameters:
    seq1: The 1st sequence / aligned sequence
    seq2: The 2nd sequence / aligned sequence

  Returns:
    The pairwise sequence identity. 0 means no matches were found, 100 means the sequences are identical.

  """
  assert len(seq1) == len(seq2), "Sequences are not the same length."
  num_matches = 0
  for a, b in zip(seq1, seq2):
    if a == b:
      num_matches += 1
  return 100 * num_matches / len(seq1)
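
# Usage sketch for get_seqid (illustrative only): both inputs must already be
# the same length (e.g. two rows of an alignment); the result is a percentage:
#   get_seqid("MKTAYI", "MKTAFI")  # -> 83.33... (5 of 6 positions match)
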
def run_phmmer(query: str, database: str, evalue: float = 10.0, cpu: int = 2) -> List[str]:
  """Run phmmer using a query sequence against a database and return all the sequences that are considered hits.
  Shamelessly stolen and adapted from https://github.com/seanrjohnson/protein_gibbs_sampler/blob/a5de349d5f6a474407fc0f19cecf39a0447a20a6/src/pgen/utils.py#L263

  Parameters:
    query: Amino acid sequence of the protein you want to find hits for
    database: Path to reference database of sequences you want to search for hits and create an alignment with, must be a protein fasta file
    evalue: The threshold E-value for a phmmer hit to be reported
    cpu: The number of CPU cores to be used to run phmmer

  Returns:
    List of hits ranked by how good the hits are

  """
  assert shutil.which("phmmer") is not None, "Cannot find phmmer. Please ensure phmmer is installed and added to your PATH."
  # Create a temporary fasta file containing the query protein sequence
  with tempfile.TemporaryDirectory() as tmp:
    queryfa_path = os.path.join(tmp, "query.fa")
    with open(queryfa_path, "w") as tmpfasta:
      print(f">QUERY\n{query}", file=tmpfasta)

    search_args = ["phmmer", "--noali", "--notextw", "--cpu", str(cpu), "-E", str(evalue), queryfa_path, database]
    out = subprocess.run(search_args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8")
    if out.returncode != 0:
      raise Exception(f"Error in phmmer execution: \n{out.stdout}\n{out.stderr}")
    hits = SearchIO.read(io.StringIO(out.stdout), "hmmer3-text")
    hit_names = [x.id for x in hits]
  return hit_names
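
# Usage sketch for run_phmmer (illustrative only; requires phmmer on PATH and a
# protein fasta database at a hypothetical path):
#   hits = run_phmmer("MKTAYIAKQRQISFVKSHFSRQLEERLGLIEVQ", "reference_db.fasta", evalue=1e-3)
#   print(hits[:10])  # best-ranked hit IDs first
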
def align_mafft(seqs: Union[str, List[str], Dict[str, str]], ep: float = 0.0, op: float = 1.53) -> Tuple[List[str], List[str]]:
  """Generates an alignment using mafft.

  Parameters:
    seqs: Can be:

      - fasta file path,
      - list of sequences, or
      - dictionary where values are AA sequences and keys are their corresponding names/IDs
    ep: ep value for mafft, default is 0.00
    op: op value for mafft, default is 1.53

  Returns:
    A tuple of the form ``(out_names, out_seqs)``

    - ``out_names``: list of aligned protein names
    - ``out_seqs``: list of corresponding protein sequences

  """
  # check if mafft is actually present
  assert (
    shutil.which("mafft") is not None
  ), "Cannot create alignment without mafft being installed. Please install mafft either using a package manager or from https://mafft.cbrc.jp/alignment/software/"

  with tempfile.TemporaryDirectory() as tmp_dir:
    tmp_fasta_path = f"{tmp_dir}/tmp.fasta"
    if isinstance(seqs, str):
      tmp_fasta_path = seqs
    elif isinstance(seqs, list):
      with open(tmp_fasta_path, "w") as f:
        for i, seq in enumerate(seqs):
          f.write(f">seq_{i}\n{seq}\n")
    elif isinstance(seqs, dict):
      with open(tmp_fasta_path, "w") as f:
        for name, seq in seqs.items():
          f.write(f">{name}\n{seq}\n")
    else:
      raise ValueError(
        f"Input seqs cannot be of type {type(seqs)}. Can be fasta file path, list of sequences, or dictionary where values are AA sequences and keys are their corresponding names/IDs."
      )

    # logger.info(f"[*] Generating alignment with {len(seqs)} using mafft.")
    align_out = subprocess.run(
      ["mafft", "--thread", "8", "--maxiterate", "1000", "--globalpair", "--ep", str(ep), "--op", str(op), tmp_fasta_path],
      stdout=subprocess.PIPE,
      stderr=subprocess.PIPE,
    )
    try:
      align_out.check_returncode()
    except:  # noqa: E722
      logger.error(align_out.stderr)
      raise Exception()

    return read_msa(io.StringIO(align_out.stdout.decode("utf-8")), allow_chars="-")
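
# Usage sketch for align_mafft (illustrative only; requires mafft on PATH; the
# dictionary keys and sequences are hypothetical):
#   names, aligned = align_mafft({"wt": "MKTAYIAKQR", "mut": "MKTAYIVKQRL"})
#   # all returned sequences share the same length, with gaps written as "-"
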
def run_phmmer_mafft(query: str, ref_db_path: str, size: int = float("inf"), in_name: str = "input_sequence") -> Tuple[List[str], List[str]]:
  """Generate an MSA using phmmer and mafft from reference sequences.

  Parameters:
    query: Amino acid sequence of the protein whose MSA you want to create
    ref_db_path: Path to reference database of sequences with which you want to search for hits and create an alignment
    size: Top n number of sequences to keep
    in_name: Optional name for the input sequence to put in the output

  Returns:
    A tuple of the form ``(out_names, out_seqs)``

    - ``out_names``: list of aligned protein names
    - ``out_seqs``: list of corresponding protein sequences

  """
  with tempfile.TemporaryDirectory() as tmp_dir:
    tmp_fasta_path = f"{tmp_dir}/tmp.fasta"
    # clean input fasta
    names, seqs = read_msa(ref_db_path, remove_chars="*-", drop_chars="X")
    # ensure no duplicate IDs
    reference_seqs = {}
    for i, (name, seq) in enumerate(zip(names, seqs)):
      if name not in reference_seqs:
        reference_seqs[name] = seq
      else:
        reference_seqs[f"{name}_{i}"] = seq
    # write cleaned fasta
    write_msa(tmp_fasta_path, reference_seqs.keys(), reference_seqs.values())

    # find hits
    hits = run_phmmer(query, tmp_fasta_path)
    logger.info(f"Found {len(hits)}/{len(names)} in reference DB for query.")

    unaligned_seqs = {in_name: query}  # keep target sequence at the top
    found_names = {in_name}
    found_seqs = {query}
    for i in range(min(size, len(hits) - 1)):
      hit_name = hits[i]
      hit_seq = reference_seqs[hit_name]
      if hit_name not in found_names and hit_seq not in found_seqs:
        found_names.add(hit_name)
        found_seqs.add(hit_seq)
        unaligned_seqs[hit_name] = hit_seq

    # generate alignment and return
    return align_mafft(unaligned_seqs)
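
# Usage sketch for run_phmmer_mafft (illustrative only; requires both phmmer and
# mafft on PATH; the query sequence and database path are hypothetical):
#   names, seqs = run_phmmer_mafft("MKTAYIAKQRQISFVKSHFSRQLEERLGLIEVQ", "reference_db.fasta", size=256)
#   write_msa("query.a3m", names, seqs)  # the query stays as the first row
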
def run_mmseqs2(
  seqs: Union[str, List[str]],
  output: str,
  database: str = "mmseqs2_uniref_env",
  use_filter: bool = True,
  use_templates: bool = False,
  pairing: Optional[str] = None,
  print_citations: bool = True,
) -> Tuple[List[str], Optional[List[str]]]:
  """Generate an a3m MSA using the ColabFold API. Will write all results to the output directory including templates, MSAs, and accompanying files.
  Code originally adapted from: https://github.com/sokrypton/ColabFold/

  Parameters:
    seqs: Amino acid sequences for the protein to generate an MSA of
    output: Output directory path, will overwrite existing results
    database: Choose the database to use, must be either "mmseqs2_uniref_env" or "mmseqs2_uniref"
    use_filter: Enables the diversity and msa filtering steps that ensure the MSA will not become enormously large (described in the manuscript methods section of the ColabFold paper)
    use_templates: Download templates as well using the mmseqs2 results
    pairing: Can be set to either "greedy", "complete", or None for no pairing
    print_citations: Prints citations

  Returns:
    - ``a3m_lines``: list of a3m lines
    - ``template_paths``: list of template paths

  """
  if print_citations:
    print(MMSEQS2_CITATION)

  # API settings
  host_url = "https://api.colabfold.com"
  submission_endpoint = "ticket/pair" if pairing else "ticket/msa"
  headers = {}
  headers["User-Agent"] = USER_AGENT
  timeout = 6.02

  # set the mode
  assert database in ["mmseqs2_uniref_env", "mmseqs2_uniref"], 'database must be either "mmseqs2_uniref_env" or "mmseqs2_uniref"'
  if use_filter:
    mode = "env" if database == "mmseqs2_uniref_env" else "all"
  else:
    mode = "env-nofilter" if database == "mmseqs2_uniref_env" else "nofilter"

  if pairing:
    use_templates = False
    # greedy is default, complete was the previous behavior
    assert pairing in ["greedy", "complete"], 'pairing must be either "greedy", "complete", or None'
    if pairing == "greedy":
      mode = "pairgreedy"
    elif pairing == "complete":
      mode = "paircomplete"

  # define path
  if os.path.isdir(output):
    shutil.rmtree(output)
  os.mkdir(output)
  temp_dir = tempfile.mkdtemp()

  def submit(seqs, mode, N=101):
    n, query = N, ""
    for seq in seqs:
      query += f">{n}\n{seq}\n"
      n += 1

    while True:
      error_count = 0
      try:
        r = requests.post(f"{host_url}/{submission_endpoint}", data={"q": query, "mode": mode}, timeout=timeout, headers=headers)
      except requests.exceptions.Timeout:
        print("Timeout while submitting to MSA server. Retrying...")
        continue
      except Exception as e:
        error_count += 1
        print(f"Error while fetching result from MSA server. Retrying... ({error_count}/5)")
        print(f"Error: {e}")
        time.sleep(timeout)
        if error_count > 5:
          raise
        continue
      break

    try:
      out = r.json()
    except ValueError:
      print(f"Server didn't reply with json: {r.text}")
      out = {"status": "ERROR"}
    return out

  def status(ID):
    while True:
      error_count = 0
      try:
        r = requests.get(f"{host_url}/ticket/{ID}", timeout=timeout, headers=headers)
      except requests.exceptions.Timeout:
        print("Timeout while fetching status from MSA server. Retrying...")
        continue
      except Exception as e:
        error_count += 1
        print(f"Error while fetching result from MSA server. Retrying... ({error_count}/5)")
        print(f"Error: {e}")
        time.sleep(timeout)
        if error_count > 5:
          raise
        continue
      break

    try:
      out = r.json()
    except ValueError:
      print(f"Server didn't reply with json: {r.text}")
      out = {"status": "ERROR"}
    return out

  ## call mmseqs2 api
  # Resubmit job until it goes through
  seqs = [seqs] if isinstance(seqs, str) else seqs
  seqs_unique = []
  [seqs_unique.append(x) for x in seqs if x not in seqs_unique]
  Ms = [101 + seqs_unique.index(seq) for seq in seqs]

  out = submit(seqs_unique, mode)
  while out["status"] in ["UNKNOWN", "RATELIMIT"]:
    print(f"Sleeping for {timeout}s. Reason: {out['status']}")
    # resubmit
    time.sleep(timeout)
    out = submit(seqs_unique, mode)

  if out["status"] == "ERROR":
    raise Exception(
      "MMseqs2 API is giving errors. Please confirm your input is a valid protein sequence. If error persists, please try again an hour later."
    )
  if out["status"] == "MAINTENANCE":
    raise Exception("MMseqs2 API is undergoing maintenance. Please try again in a few minutes.")

  # wait for job to finish
  ID = out["id"]
  while out["status"] in ["UNKNOWN", "RUNNING", "PENDING"]:
    print(f"Sleeping for {timeout}s. Reason: {out['status']}")
    time.sleep(timeout)
    out = status(ID)

  if out["status"] == "ERROR" or out["status"] != "COMPLETE":
    print(out)
    raise Exception(
      "MMseqs2 API is giving errors. Please confirm your input is a valid protein sequence. If error persists, please try again an hour later."
    )

  # Download results
  error_count = 0
  while True:
    try:
      r = requests.get(f"{host_url}/result/download/{ID}", stream=True, timeout=timeout, headers=headers)
    except requests.exceptions.Timeout:
      print("Timeout while fetching result from MSA server. Retrying...")
      continue
    except Exception as e:
      error_count += 1
      print(f"Error while fetching result from MSA server. Retrying... ({error_count}/5)")
      print(f"Error: {e}")
      time.sleep(timeout)
      if error_count > 5:
        raise
      continue
    break

  # extract files
  with tarfile.open(fileobj=r.raw, mode="r|gz") as tar:
    tar.extractall(path=temp_dir)
  for file in ["uniref.a3m", "bfd.mgnify30.metaeuk30.smag30.a3m", "pair.a3m", "pdb70.m8"]:
    src_path = os.path.join(temp_dir, file)
    if os.path.exists(src_path):
      shutil.move(src_path, os.path.join(output, file))
  shutil.rmtree(temp_dir)

  if pairing:
    a3m_files = [f"{output}/pair.a3m"]
  else:
    a3m_files = [f"{output}/uniref.a3m"]
    if mode == "env":
      a3m_files.append(f"{output}/bfd.mgnify30.metaeuk30.smag30.a3m")

  # gather a3m lines
  a3m_lines = {}
  for a3m_file in a3m_files:
    update_M, M = True, None
    for line in open(a3m_file, "r"):
      if len(line) > 0:
        if "\x00" in line:
          line = line.replace("\x00", "")
          update_M = True
        if line.startswith(">") and update_M:
          M = int(line[1:].rstrip())
          update_M = False
          if M not in a3m_lines:
            a3m_lines[M] = []
        a3m_lines[M].append(line)
  a3m_lines = ["".join(a3m_lines[n]) for n in Ms]

  # remove null bytes from all files including pair files
  for fname in os.listdir(output):
    if fname in ["uniref.a3m", "bfd.mgnify30.metaeuk30.smag30.a3m", "pair.a3m"]:
      with open(f"{output}/{fname}", "r") as fin:
        with open(f"{output}/{fname}.tmp", "w") as fout:
          for line in fin:
            fout.write(line.replace("\x00", ""))
      shutil.move(f"{output}/{fname}.tmp", f"{output}/{fname}")

  if pairing is None:
    # Concatenate to create combined file
    with open(f"{output}/combined.a3m", "w") as fout:
      with open(f"{output}/uniref.a3m") as f:
        for line in f:
          fout.write(line)
      with open(f"{output}/bfd.mgnify30.metaeuk30.smag30.a3m") as f:
        # skip first two lines
        f.readline()
        f.readline()
        for line in f:
          fout.write(line)

  # templates
  if use_templates:
    templates = {}
    # print("seq\tpdb\tcid\tevalue")
    for line in open(f"{output}/pdb70.m8", "r"):
      p = line.rstrip().split()
      M, pdb, qid, e_value = p[0], p[1], p[2], p[10]
      M = int(M)
      if M not in templates:
        templates[M] = []
      templates[M].append(pdb)
      # if len(templates[M]) <= 20:
      #   print(f"{int(M)-N}\t{pdb}\t{qid}\t{e_value}")

    template_paths = {}
    for k, TMPL in templates.items():
      TMPL_PATH = f"{output}/templates_{k}"
      if not os.path.isdir(TMPL_PATH):
        os.mkdir(TMPL_PATH)
        TMPL_LINE = ",".join(TMPL[:20])
        response = None
        while True:
          error_count = 0
          try:
            # https://requests.readthedocs.io/en/latest/user/advanced/#advanced
            # "good practice to set connect timeouts to slightly larger than a multiple of 3"
            response = requests.get(f"{host_url}/template/{TMPL_LINE}", stream=True, timeout=6.02, headers=headers)
          except requests.exceptions.Timeout:
            logger.warning("Timeout while submitting to template server. Retrying...")
            continue
          except Exception as e:
            error_count += 1
            logger.warning(f"Error while fetching result from template server. Retrying... ({error_count}/5)")
            logger.warning(f"Error: {e}")
            time.sleep(5)
            if error_count > 5:
              raise
            continue
          break
        with tarfile.open(fileobj=response.raw, mode="r|gz") as tar:
          tar.extractall(path=TMPL_PATH, filter="data")
        os.symlink("pdb70_a3m.ffindex", f"{TMPL_PATH}/pdb70_cs219.ffindex")
        with open(f"{TMPL_PATH}/pdb70_cs219.ffdata", "w") as f:
          f.write("")
      template_paths[k] = TMPL_PATH

    template_paths_ = []
    for n in Ms:
      if n not in template_paths:
        template_paths_.append(None)
        # print(f"{n-N}\tno_templates_found")
      else:
        template_paths_.append(template_paths[n])
    template_paths = template_paths_

  if use_templates:
    return (a3m_lines, template_paths)
  else:
    return (a3m_lines, None)
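
# Usage sketch for run_mmseqs2 (illustrative only; it queries the public
# ColabFold API, so network access is required; the sequence and output
# directory are hypothetical):
#   a3m_lines, template_paths = run_mmseqs2(
#     "MKTAYIAKQRQISFVKSHFSRQLEERLGLIEVQ",
#     "msa_out",
#     use_templates=False,
#   )
#   # a3m_lines[0] holds the a3m text for the query; files are also written to "msa_out/"
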
def run_mmseqs2_modes(
  seq: Union[str, List[str]],
  output: str,
  cov: int = 50,
  id: int = 90,
  max_msa: int = 2048,
  mode: str = "unpaired_paired",
  print_citations: bool = True,
):
  """
  Generate a multiple sequence alignment (MSA) for the given sequence(s) using ColabFold's API.
  The key difference between this function and run_mmseqs2 is that this function supports different modes.
  The final and most useful a3m file will be written to "output/final.a3m".
  Code originally adapted from: https://github.com/sokrypton/ColabFold/

  Parameters:
    seq: Sequence(s) to generate the MSA for. If a list of sequences is provided, they will be considered as a single protein for the MSA.
    output: Output directory path, will overwrite existing results.
    cov: Coverage of the MSA
    id: Identity threshold for the MSA
    max_msa: Maximum number of sequences in the MSA
    mode: Mode to run the MSA generation in. Must be in ``["unpaired", "paired", "unpaired_paired"]``
    print_citations: Whether to print the citations in the output.
  """
  if print_citations:
    print(MMSEQS2_CITATION)

  # Check if HH-suite is installed and available
  hhfilter_path = shutil.which("hhfilter")
  assert hhfilter_path is not None, (
    "HH-suite not found. Please ensure it is installed and available in your PATH. "
    "For installation instructions, visit: https://github.com/soedinglab/hh-suite"
  )

  # Validate the mode
  assert mode in ["unpaired", "paired", "unpaired_paired"], "Invalid mode"

  seqs = [seq] if isinstance(seq, str) else seq

  # Collapse homooligomeric sequences
  counts = Counter(seqs)
  u_seqs = list(counts.keys())
  u_nums = list(counts.values())

  # Expand homooligomeric sequences
  first_seq = "/".join(sum([[x] * n for x, n in zip(u_seqs, u_nums)], []))
  msa = [first_seq]

  # Handle paired MSA if applicable
  if mode in ["paired", "unpaired_paired"] and len(u_seqs) > 1:
    print("Getting paired MSA")
    out_paired, _ = run_mmseqs2(u_seqs, output, pairing="greedy", print_citations=False)
    headers, sequences = [], []
    for a3m_lines in out_paired:
      n = -1
      for line in a3m_lines.split("\n"):
        if len(line) > 0:
          if line.startswith(">"):
            n += 1
            if len(headers) < (n + 1):
              headers.append([])
              sequences.append([])
            headers[n].append(line)
          else:
            sequences[n].append(line)

    # Filter MSA
    paired_in_fpath = os.path.join(output, "paired_in.a3m")
    paired_out_fpath = os.path.join(output, "paired_out.a3m")
    with open(paired_in_fpath, "w") as f:
      for n, sequence in enumerate(sequences):
        f.write(f">n{n}\n{''.join(sequence)}\n")
    os.system(f"hhfilter -i {paired_in_fpath} -id {id} -cov {cov} -o {paired_out_fpath}")
    with open(paired_out_fpath, "r") as f:
      for line in f:
        if line.startswith(">"):
          n = int(line[2:])
          xs = sequences[n]
          # Expand homooligomeric sequences
          xs = ["/".join([x] * num) for x, num in zip(xs, u_nums)]
          msa.append("/".join(xs))

  # Handle unpaired MSA if applicable
  if len(msa) < max_msa and (mode in ["unpaired", "unpaired_paired"] or len(u_seqs) == 1):
    print("Getting unpaired MSA")
    out, _ = run_mmseqs2(u_seqs, output, pairing=None, print_citations=False)
    Ls = [len(seq) for seq in u_seqs]
    sub_idx = []
    sub_msa = []
    sub_msa_num = 0
    for n, a3m_lines in enumerate(out):
      sub_msa.append([])
      in_fpath = os.path.join(output, f"in_{n}.a3m")
      out_fpath = os.path.join(output, f"out_{n}.a3m")
      with open(in_fpath, "w") as f:
        f.write(a3m_lines)
      # Filter
      os.system(f"hhfilter -i {in_fpath} -id {id} -cov {cov} -o {out_fpath}")
      with open(out_fpath, "r") as f:
        for line in f:
          if not line.startswith(">"):
            xs = ["-" * l for l in Ls]
            xs[n] = line.rstrip()
            # Expand homooligomeric sequences
            xs = ["/".join([x] * num) for x, num in zip(xs, u_nums)]
            sub_msa[-1].append("/".join(xs))
            sub_msa_num += 1
      sub_idx.append(list(range(len(sub_msa[-1]))))

    while len(msa) < max_msa and sub_msa_num > 0:
      for n in range(len(sub_idx)):
        if len(sub_idx[n]) > 0:
          msa.append(sub_msa[n][sub_idx[n].pop(0)])
          sub_msa_num -= 1
        if len(msa) == max_msa:
          break

  # Write final MSA to file
  with open(os.path.join(output, "final.a3m"), "w") as f:
    for n, sequence in enumerate(msa):
      f.write(f">n{n}\n{sequence}\n")
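
# Usage sketch for run_mmseqs2_modes (illustrative only; requires hhfilter from
# HH-suite on PATH plus network access; the chain sequences and output path are
# hypothetical):
#   run_mmseqs2_modes(["MKTAYIAKQRQISFVKSH", "MSVLTPLLLRGLTGSARR"], "complex_msa", mode="unpaired_paired")
#   # the merged alignment is written to "complex_msa/final.a3m"
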