# -*- coding: utf-8 -*-
####################################################################################
# Integron_Finder - Integron Finder aims at detecting integrons in DNA sequences #
# by finding particular features of the integron: #
# - the attC sites #
# - the integrase #
# - and when possible attI site and promoters. #
# #
# Authors: Jean Cury, Bertrand Neron, Eduardo PC Rocha #
# Copyright (c) 2015 - 2018 Institut Pasteur, Paris and CNRS. #
# See the COPYRIGHT file for details #
# #
# integron_finder is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# integron_finder is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program (COPYING file). #
# If not, see <http://www.gnu.org/licenses/>. #
####################################################################################
import os
import glob
import colorlog
import numpy as np
import pandas as pd
# display warning only for non installed integron_finder
from Bio import BiopythonExperimentalWarning
import warnings
warnings.simplefilter('ignore', BiopythonExperimentalWarning)
from Bio import SearchIO
_log = colorlog.getLogger(__name__)
[docs]def scan_hmm_bank(path):
"""
:param str path: - if the path is a dir:
return all files ending with .hmm in the dir
- if the path is a file:
parse the file, each line must be an expression (glob)
pointing to hmm files
:return: lists of hmm files to consider for annotation
:rtype: list of str
:raise IOError: if the path does not exists
"""
real_path = os.path.realpath(path)
files = []
if os.path.exists(real_path):
if os.path.isdir(real_path):
files = glob.glob(os.path.join(real_path, '*.hmm'))
elif os.path.isfile(real_path):
with open(real_path) as hmm_bank:
wrong_lines = 0
for bank_path in hmm_bank:
bank_path = bank_path.strip()
if bank_path.startswith('#'):
continue
elif not os.path.isabs(bank_path):
if "_prefix_share" in globals():
prefix = _prefix_share
elif 'INTEGRON_HOME' in os.environ:
prefix = os.environ['INTEGRON_HOME']
else:
prefix = ''
bank_path = os.path.normpath(os.path.join(prefix, bank_path))
bank_files = glob.glob(os.path.expanduser(bank_path))
if not bank_files:
wrong_lines += 1
_log.warning("func_annot '{}' does not match any files.".format(bank_path))
if wrong_lines > 10:
msg = "Too many lines with no hmm file in {}.\nIs there right file?" \
"\nsee documentation tutorial functional annotation".format(real_path)
_log.error(msg)
raise ValueError(msg)
else:
for path in bank_files:
_log.warning("the hmm {} will be used for functional annotation".format(path))
files.extend(bank_files)
return files
else:
raise IOError("{} no such file or directory".format(path))
[docs]def read_hmm(replicon_id, prot_db, infile, cfg, evalue=1., coverage=0.5):
"""
Function that parse hmmer --out output and returns a pandas DataFrame
filter output by evalue and coverage. (Being % of the profile aligned)
:param str replicon_id: the id of the replicon
:param prot_db: The protein database corresponding to the replicon translation
:type prot_db: :class:`integron_finder.prot_db.ProteinDB` object.
:param str infile: the hmm output (in tabulated format) to parse
:param cfg: the config
:type cfg: :class:`integron_finder.config.Config` object.
:param float evalue: filter out hits with evalue greater tha evalue.
:param float coverage: filter out hits with coverage under coverage (% of the profile aligned)
:returns: data Frame with columns:
| "Accession_number", "query_name", "ID_query", "ID_prot", "strand", "pos_beg", "pos_end", "evalue"
| each row correspond to a hit.
:rtype: a :class:`pandas.DataFrame`
"""
df = pd.DataFrame(columns=["Accession_number", "query_name", "ID_query",
"ID_prot", "strand", "pos_beg", "pos_end",
"evalue", "hmmfrom", "hmmto", "alifrom",
"alito", "len_profile"])
_log.debug("Parse {}".format(infile))
gen = SearchIO.parse(infile, 'hmmer3-text')
for idx, query_result in enumerate(gen):
len_profile = query_result.seq_len
query = query_result.id
try:
id_query = query_result.accession
except AttributeError:
id_query = "-"
for idx2, hit in enumerate(query_result.hits):
id_prot = hit.id
_, strand, pos_beg, pos_end = prot_db.get_description(hit.id)
evalue_tmp = []
hmmfrom = []
hmmto = []
alifrom = []
alito = []
for hsp in hit.hsps:
# strand = hsp.query_strand
evalue_tmp.append(hsp.evalue)
hmmfrom.append(hsp.query_start + 1)
hmmto.append(hsp.query_end)
alifrom.append(hsp.hit_start + 1)
alito.append(hsp.hit_end)
best_evalue = np.argmin(evalue_tmp)
df.loc[idx+idx2, "ID_prot"] = id_prot
df.loc[idx+idx2, "ID_query"] = id_query # "-" # remnant of ancient parsing function to keep data structure
df.loc[idx+idx2, "pos_beg"] = pos_beg
df.loc[idx+idx2, "pos_end"] = pos_end
df.loc[idx+idx2, "strand"] = strand
df.loc[idx+idx2, "evalue"] = evalue_tmp[best_evalue] # i-evalue
df.loc[idx+idx2, "hmmfrom"] = hmmfrom[best_evalue] # hmmfrom
df.loc[idx+idx2, "hmmto"] = hmmto[best_evalue] # hmm to
df.loc[idx+idx2, "alifrom"] = alifrom[best_evalue] # alifrom
df.loc[idx+idx2, "alito"] = alito[best_evalue] # ali to
df.loc[idx+idx2, "len_profile"] = float(len_profile)
df.loc[idx+idx2, "Accession_number"] = replicon_id
df.loc[idx+idx2, "query_name"] = query
intcols = ["pos_beg", "pos_end", "strand"]
floatcol = ["evalue", "len_profile"]
df[intcols] = df[intcols].astype(int)
df[floatcol] = df[floatcol].astype(float)
df = df[(((df.hmmto - df.hmmfrom) / df.len_profile) > coverage) & (df.evalue < evalue)]
df.index = list(range(len(df)))
df_out = df[["Accession_number", "query_name", "ID_query", "ID_prot",
"strand", "pos_beg", "pos_end", "evalue"]]
return df_out