#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Mon Oct 10 19:35:14 2016
@author: roberto
"""
from typing import Union
from bedhandler.domain import RegionId, GeneId, Pool
from pandas import DataFrame
from bedhandler.handler import BedFileLoader
def ntuple(string: str) -> tuple:
"""
Return a tuple of integers from a string of comma-separated integers. When casting a substring to a int fails,
it uses the substring itself
:param str string: string of comma-separated integers
:return: tuple of int
"""
return tuple([maybeint(number) for number in string.strip().split(',')])
def maybeint(string: str) -> Union[str, int]:
"""
Try to cast a string to a int
:param str string: to cast
:return: int(string) or string in case cast fails
"""
try:
return int(string)
except ValueError:
return string
[docs]class ROI(object):
"""
This class stores "regions of interest" in the .bed file
- self.amplicons is a list of Amplicon objects
- self.targets is a DataFrame of valid targets for CNV analysis
:param str bedfile: path to file in which sequencing amplicons are listed (BED)
:param int spacing: is the number of nucleotides to ignore at amplicon start and end, to avoid overlapping reads
:param int mindata: is the minimum number of nucleotides for a valid target
:param int maxpool: is the maximum number of origin pools allowed for a valid target
:param list lines: optional list of lines containing beddata
"""
def __init__(self, bedfile: str, spacing: int = 20,
mindata: int = 50, maxpool: int = None, lines: list = None):
# spacing should be >= 0
if not type(spacing) == int or spacing < 0:
print("ROI: Invalid spacing: {0}; defaulting to 0".format(spacing))
spacing = 0
# mindata should be > 0
if not type(mindata) == int or mindata <= 0:
print("ROI: Invalid mindata: {0}; defaulting to 1".format(mindata))
mindata = 1
print("Loading .bed data.")
if maxpool:
print("ROI limited to max number of pools = {0}".format(maxpool))
self.source = bedfile
if lines is None:
bed_file = BedFileLoader(bedfile)
lines = bed_file.expand_columns()
self.amplicons = self.load_amplicons(lines, maxpool, -1)
if self.amplicons:
self.targets = self.define_targets(spacing, mindata)
else:
return
def __repr__(self):
return 'ROI("{0}"; {1} amplicons, {2} targets)'.format(
self.source, len(self.amplicons), len(self.targets))
[docs] @staticmethod
def load_amplicons(lines: list, maxpool: int, pool_loc: int) -> list:
"""
Return a sorted list of amplicons
:param pool_loc: location of pool data in each line
:param list lines: list of lines loaded from bed file
:param int maxpool: maximum number of origin pools allowed for a valid target
:return: sorted list of amplicons
"""
print("Loading amplicons...")
amplicons = []
total = 0
skipped = 0
for line in lines:
total += 1
newamplicon = Amplicon(line, pool_loc, maxpool)
if newamplicon is not None:
amplicons.append(newamplicon)
else:
skipped += 1
print("{0} amplicons read from the .bed file.".format(total))
print("{0} amplicons loaded.".format(len(amplicons)))
print("{0} amplicons out of range.".format(skipped))
print('Sorting by chromosome and range...')
amplicons.sort(key=lambda x: (x.chromosome, x.chromStart, x.chromEnd))
return amplicons
[docs] def define_targets(self, spacing, min_data) -> DataFrame:
"""
Return valid targets for further analysis. Each target is in the form: (chromosome, start, end, Amplicon)
:param int spacing: number of nucleotides to ignore at amplicon start and end, to avoid overlapping reads
:param int min_data: minimum number of nucleotides for a valid target
:return: a dataframe of targets
"""
print('Defining targets...')
targets = []
this_chrom = ''
last_chrom_end = 0
amp_size = len(self.amplicons)
for i, amplicon in enumerate(self.amplicons):
if amplicon.chrom != this_chrom:
print("Analyzing chromosome: " + amplicon.chrom)
this_chrom = amplicon.chrom
last_chrom_end = 0
# Skip amplicons that are contained within the last amplicon
if amplicon.chromEnd - spacing <= last_chrom_end:
continue
# Avoid regions of overlap with the previous amplicon
this_start = max(last_chrom_end + spacing,
amplicon.chromStart + spacing)
this_end = amplicon.chromEnd - spacing
# Avoid regions of overlap with the next amplicon, if applicable
if i < (amp_size - 1) and self.amplicons[i + 1].chrom == amplicon.chrom:
this_end = min(this_end,
self.amplicons[i + 1].chromStart - spacing)
# Valid amplicons must be at least `mindata` nt long:
if this_end - this_start >= min_data:
targets.append((
this_chrom,
this_start,
this_end,
str(amplicon.gene),
amplicon.pools
))
last_chrom_end = amplicon.chromEnd
print("{0} targets acquired.".format(len(targets)))
return DataFrame(targets, columns=['chrom', 'chromStart', 'chromEnd', 'gene', 'pools'])
class Amplicon(object):
"""
Hold data for each amplicon listed in the BED file
:param list bed_data: is the line in the .bed file corresponding to the amplicon
:param int pool_loc: location of pool data in bed_data
:param int max_pool: maximum number of origin pools allowed for a valid target
"""
def __new__(cls, bed_data: list, pool_loc: int, max_pool: int = None):
"""
Verify the need to create an Amplicon object
:param list bed_data: is the line in the .bed file corresponding to the amplicon
:param int pool_loc: location of pool data in bed_data
:param int max_pool: maximum number of origin pools allowed for a valid target
:return: None or cls
"""
pools = bed_data[pool_loc]
if not max_pool:
return object.__new__(cls)
if max_pool:
if len(pools) > max_pool:
return None
return object.__new__(cls)
def __init__(self, bed_data: list, pool_loc: int, max_pool: int = None):
fields = [('chrom', str),
('chromStart', int),
('chromEnd', int),
('regionId', RegionId),
('score', maybeint)]
if len(bed_data) > 7:
fields = fields + [('strand', str), ('frame', str)]
self.fields = fields + [('gene', GeneId), ('pools', Pool), ('submittedRegion', maybeint)]
self.fieldnames = {field[0] for field in fields}
self.max_pool = max_pool
for i in range(len(bed_data)):
setattr(self, self.fields[i][0], self.fields[i][1](bed_data[i]))
self.pools = bed_data[pool_loc]
chromosome = self.chrom.split('chr')[-1]
try:
self.chromosome = '{:0>2}'.format(int(chromosome))
except ValueError:
self.chromosome = chromosome
def __getattr__(self, attribute):
"""
Override getattr() to handle unset valid fields
"""
if attribute in self.fieldnames:
return None
else:
raise AttributeError
def __repr__(self):
return "Amplicon({0};{1}:{2}-{3})".format(self.gene,
self.chrom,
self.chromStart,
self.chromEnd)