Source code for deepcpg.data.feature_extractor
"""Feature extraction."""
from __future__ import division
from __future__ import print_function
import numpy as np
from six.moves import range
[docs]class KnnCpgFeatureExtractor(object):
"""Extract k CpG sites next to target sites. Exclude CpG sites at the
same position.
"""
def __init__(self, k=1):
self.k = k
[docs] def extract(self, x, y, ys):
"""Extract state and distance of k CpG sites next to target sites.
Target site is excluded.
Parameters
----------
x: :class:`numpy.ndarray`
:class:`numpy.ndarray` with target positions sorted in ascending
order.
y: :class:`numpy.ndarray`
:class:`numpy.ndarray` with source positions sorted in ascending
order.
ys: :class:`numpy.ndarray`
:class:`numpy.ndarray` with source CpG states.
Returns
-------
tuple
Tuple (cpg, dist) with numpy arrays of dimension (len(x), 2k):
cpg: CpG states to the left (0:k) and right (k:2k)
dist: Distances to the left (0:k) and right (k:2k)
"""
n = len(x)
m = len(y)
k = self.k
kk = 2 * self.k
yc = self.__larger_equal(x, y)
knn_cpg = np.empty((n, kk), dtype=np.float16)
knn_cpg.fill(np.nan)
knn_dist = np.empty((n, kk), dtype=np.float32)
knn_dist.fill(np.nan)
for i in range(n):
# Left side
yl = yc[i] - k
yr = yc[i] - 1
if yr >= 0:
xl = 0
xr = k - 1
if yl < 0:
xl += np.abs(yl)
yl = 0
xr += 1
yr += 1
knn_cpg[i, xl:xr] = ys[yl:yr]
knn_dist[i, xl:xr] = np.abs(y[yl:yr] - x[i])
# Right side
yl = yc[i]
if yl >= m:
continue
if x[i] == y[yl]:
yl += 1
if yl >= m:
continue
yr = yl + k - 1
xl = 0
xr = k - 1
if yr >= m:
xr -= yr - m + 1
yr = m - 1
xl += k
xr += k + 1
yr += 1
knn_cpg[i, xl:xr] = ys[yl:yr]
knn_dist[i, xl:xr] = np.abs(y[yl:yr] - x[i])
return (knn_cpg, knn_dist)
def __larger_equal(self, x, y):
"""Return for each x[i] index j, s.t. y[j] >= x[i].
Parameters
----------
x: :class:`numpy.ndarray`
:class:`numpy.ndarray` of with positions sorted in ascending order.
y: :class:`numpy.ndarray`
:class:`numpy.ndarray` of with positions sorted in ascending order.
"""
n = len(x)
m = len(y)
rv = np.empty(n, dtype=np.int)
i = 0
j = 0
while i < n and j < m:
while j < m and x[i] > y[j]:
j += 1
rv[i] = j
i += 1
if i < n:
# x[i] > y[m - 1]
rv[i:] = m
return rv
[docs]class IntervalFeatureExtractor(object):
"""Check if positions are in a list of intervals (start, end)."""
[docs] @staticmethod
def join_intervals(s, e):
"""Transform a list of possible overlapping intervals into
non-overlapping intervals.
Parameters
----------
s: list
List with start of interval sorted in ascending order.
e: list
List with end of interval.
Returns
-------
tuple
Tuple (s, e) of non-overlapping intervals.
"""
rs = []
re = []
n = len(s)
if n == 0:
return (rs, re)
l = s[0]
r = e[0]
for i in range(1, n):
if s[i] > r:
rs.append(l)
re.append(r)
l = s[i]
r = e[i]
else:
r = max(r, e[i])
rs.append(l)
re.append(r)
return (rs, re)
[docs] @staticmethod
def index_intervals(x, ys, ye):
"""Return for positions x[i] index j, s.t. ys[j] <= x[i] <= ye[j] or -1.
Intervals must be non-overlapping!
Parameters
----------
x : list
List of positions.
ys: list
List with start of interval sorted in ascending order.
ye: list
List with end of interval.
Returns
-------
:class:`numpy.ndarray`
:class:`numpy.ndarray` of same length than x with index or -1.
"""
n = len(ys)
m = len(x)
rv = np.empty(m, dtype=np.int)
rv.fill(-1)
i = 0
j = 0
while i < n and j < m:
while j < m and x[j] <= ye[i]:
if x[j] >= ys[i]:
rv[j] = i
j += 1
i += 1
return rv
def extract(self, x, ys, ye):
return self.index_intervals(x, ys, ye) >= 0
class KmersFeatureExtractor(object):
def __init__(self, kmer_len, nb_char=4):
"""Extract kmer frequencies from integer sequences.
Parameters
----------
kmer_len: int
Kmer length.
nb_char: int
Number of characters in alphabet.
"""
self.kmer_len = kmer_len
self.nb_char = nb_char
self.nb_kmer = self.nb_char**self.kmer_len
def __call__(self, seqs):
"""Extract kmer frequencies from integer sequences.
Parameters
----------
seqs: :class:`numpy.ndarray`
:class:`numpy.ndarray` of size MxN, with M sequences of length N.
Returns
-------
:class:`numpy.ndarray`
:class:`numpy.ndarray` of size MxC with kmer frequencies.
"""
nb_seq, seq_len = seqs.shape
kmer_freq = np.zeros((nb_seq, self.nb_kmer), dtype=np.int32)
vec = np.array([self.nb_char**i for i in range(self.kmer_len)],
dtype=np.int32)
for i in range(nb_seq):
for j in range(seq_len - self.kmer_len + 1):
kmer = seqs[i, j:(j + self.kmer_len)]
kmer_freq[i, kmer.dot(vec)] += 1
return kmer_freq