"""Functions for reading and matching annotations."""
from __future__ import division
from __future__ import print_function
import pandas as pd
import numpy as np
from six.moves import range
[docs]def read_bed(filename, sort=False, usecols=[0, 1, 2], *args, **kwargs):
"""Read chromo,start,end from BED file without formatting chromo."""
d = pd.read_table(filename, header=None, usecols=usecols, *args, **kwargs)
d.columns = range(d.shape[1])
d.rename(columns={0: 'chromo', 1: 'start', 2: 'end'}, inplace=True)
if sort:
d.sort(['chromo', 'start', 'end'], inplace=True)
return d
[docs]def in_which(x, ys, ye):
"""Return index of positions in intervals.
Returns for positions `x[i]` index `j`, s.t. `ys[j] <= x[i] <= ye[j]`, or
-1 if `x[i]` is not overlapped by any interval.
Intervals must be non-overlapping!
Parameters
----------
x : list
list of positions.
ys: list
list with start of interval sorted in ascending order.
ye: list
list with end of interval.
Returns
-------
:class:`numpy.ndarray`
n:class:`numpy.ndarray` with indices of overlapping intervals or -1.
"""
n = len(ys)
m = len(x)
rv = np.empty(m, dtype=np.int)
rv.fill(-1)
i = 0
j = 0
while i < n and j < m:
while j < m and x[j] <= ye[i]:
if x[j] >= ys[i]:
rv[j] = i
j += 1
i += 1
return rv
[docs]def is_in(pos, start, end):
"""Test if position is overlapped by at least one interval."""
return in_which(pos, start, end) >= 0
[docs]def distance(pos, start, end):
"""Return shortest distance between a position and a list of intervals.
Parameters
----------
pos: list
List of integer positions.
start: list
Start position of intervals.
end: list
End position of intervals.
Returns
:class:`numpy.ndarray`
:class:`numpy.ndarray` of same length as `pos` with shortest distance
between each `pos[i]` and any interval.
"""
m = len(start)
n = len(pos)
i = 0
j = 0
end_prev = -10**7
dist = np.zeros(n)
while i < m and j < n:
while j < n and pos[j] <= end[i]:
if pos[j] < start[i]:
dist[j] = min(pos[j] - end_prev, start[i] - pos[j])
j += 1
end_prev = end[i]
i += 1
dist[j:] = pos[j:] - end_prev
assert np.all(dist >= 0)
return dist
[docs]def join_overlapping(s, e):
"""Join overlapping intervals.
Transforms a list of possible overlapping intervals into non-overlapping
intervals.
Parameters
----------
s : list
List with start of interval sorted in ascending order
e : list
List with end of interval.
Returns
-------
tuple
`tuple` (s, e) of non-overlapping intervals.
"""
rs = []
re = []
n = len(s)
if n == 0:
return (rs, re)
l = s[0]
r = e[0]
for i in range(1, n):
if s[i] > r:
rs.append(l)
re.append(r)
l = s[i]
r = e[i]
else:
r = max(r, e[i])
rs.append(l)
re.append(r)
return (rs, re)
[docs]def join_overlapping_frame(d):
"""Join overlapping intervals of Pandas DataFrame.
Uses `join_overlapping` to join overlapping intervals of
:class:`pandas.DataFrame` `d`.
"""
d = d.sort_values(['chromo', 'start', 'end'])
e = []
for chromo in d.chromo.unique():
dc = d.loc[d.chromo == chromo]
start, end = join_overlapping(dc.start.values, dc.end.values)
ec = pd.DataFrame(dict(chromo=chromo, start=start, end=end))
e.append(ec)
e = pd.concat(e)
e = e.loc[:, ['chromo', 'start', 'end']]
return e
[docs]def group_overlapping(s, e):
"""Assign group index of indices.
Assigns group index to intervals. Overlapping intervals will be assigned
to the same group.
Parameters
----------
s : list
list with start of interval sorted in ascending order.
e : list
list with end of interval.
Returns
-------
:class:`numpy.ndarray`
:class:`numpy.ndarray` with group indices.
"""
n = len(s)
group = np.zeros(n, dtype='int32')
if n == 0:
return group
idx = 0
r = e[0]
for i in range(1, n):
if s[i] > r:
idx += 1
r = e[i]
else:
r = max(r, e[i])
group[i] = idx
return group
[docs]def extend_len(start, end, min_len, min_pos=1):
"""Extend intervals to minimum length.
Extends intervals `start`-`end` with length smaller than `min_len` to length
`min_len` by equally decreasing `start` and increasing `end`.
Parameters
----------
start: list
List of start position of intervals.
end: list
List of end position of intervals.
min_len: int
Minimum interval length.
min_pos: int
Minimum position.
Returns
-------
tuple
`tuple` with start end end position of extended intervals.
"""
delta = np.maximum(0, min_len - (end - start + 1))
ext = np.floor(0.5 * delta).astype(np.int)
start_ext = np.maximum(min_pos, start - ext)
end_ext = end + np.maximum(0, (min_len - (end - start_ext + 1)))
assert np.all(min_len <= (end_ext - start_ext + 1))
return (start_ext, end_ext)
[docs]def extend_len_frame(d, min_len):
"""Extend length of intervals in Pandas DataFrame using `extend_len`."""
start, end = extend_len(d.start.values, d.end.values, min_len)
e = d.copy()
e['start'] = start
e['end'] = end
return e