"""General-purpose functions."""
from __future__ import division
from __future__ import print_function
from collections import OrderedDict
import os
import re
import six
from six.moves import range
import numpy as np
# Small positive module-level constant. Note that the literal 10e-8 evaluates
# to 1e-07 (not 1e-08).
# NOTE(review): presumably a numerical-stability epsilon; its use is not
# visible in this part of the file -- confirm against callers.
EPS = 10e-8
def make_dir(dirname):
    """Create directory `dirname` if non-existing.

    The directory (including missing parents) is created directly and an
    "already exists" failure is handled afterwards, so that a concurrent
    creation between a check and the creation cannot crash the call.

    Parameters
    ----------
    dirname: str
        Path of directory to be created.

    Returns
    -------
    bool
        `True`, if directory did not exist and was created. `False`, if
        the path already existed.

    Raises
    ------
    OSError
        If the directory could not be created and the path does not exist,
        e.g. because of missing permissions or an invalid path.
    """
    try:
        os.makedirs(dirname)
    except OSError:
        # Creation failed: if the path is there now (it existed before or was
        # created concurrently), report "not created"; otherwise the failure
        # is a genuine error and is propagated unchanged.
        if os.path.exists(dirname):
            return False
        raise
    return True
def slice_dict(data, idx):
    """Slice elements in dict `data` by `idx`.

    Slices array-like objects in `data` by index `idx`. `data` can be
    tree-like with sub-dicts, where the leafs must be sliceable by `idx`.

    Parameters
    ----------
    data: dict
        dict to be sliced. Any non-dict object is treated as a leaf and is
        indexed directly with `idx`.
    idx: slice
        Slice index.

    Returns
    -------
    dict
        New dict with the same keys and nesting as `data`, where every leaf
        is replaced by `leaf[idx]`.
    """
    if isinstance(data, dict):
        # Recurse into sub-dicts; `items()` works on Python 2 and 3 alike.
        return {key: slice_dict(value, idx) for key, value in data.items()}
    return data[idx]
def fold_dict(data, nb_level=10**5):
    """Fold dict `data` into nested sub-dicts.

    Keys of the form 'level1/level2/level3' are split at '/' and turned into
    sub-dicts, such that the value is reachable as
    data['level1']['level2']['level3']. Keys without '/', or with '/' as
    first character, are copied unchanged.

    Parameters
    ----------
    data: dict
        dict to be folded.
    nb_level: int
        Maximum recursion depth, i.e. maximum number of levels to be folded.

    Returns
    -------
    dict
        Folded dict. If `nb_level` is not positive, `data` itself is
        returned.
    """
    if nb_level <= 0:
        return data

    folded = dict()
    nested = set()
    for path, item in data.items():
        head, sep, tail = path.partition('/')
        if sep and head:
            # First path component becomes a sub-dict; the remaining path is
            # folded further by the recursive call below.
            folded.setdefault(head, dict())[tail] = item
            nested.add(head)
        else:
            folded[path] = item

    for head in nested:
        folded[head] = fold_dict(folded[head], nb_level - 1)
    return folded
def linear_weights(length, start=0.1):
    """Create linear-triangle weights.

    Create array `x` of length `length` with linear weights, where the weight
    is highest (one) for the center x[length//2] and lowest (`start`) at the
    ends x[0] and x[-1]. For even `length`, the two central entries are both
    one.

    Parameters
    ----------
    length: int
        Length of the weight array.
    start: float
        Minimum weights.

    Returns
    -------
    :class:`np.ndarray`
        Array of length `length` with weight.
    """
    # `np.linspace` requires an integer number of samples; `np.ceil` returns
    # a float, which NumPy rejects with a TypeError.
    nb_half = int(np.ceil(length / 2))
    rising = np.linspace(start, 1, nb_half)
    falling = rising
    if length % 2:
        # Odd length: the center value must appear only once.
        falling = falling[:-1]
    return np.hstack((rising, falling[::-1]))
def to_list(value):
    """Wrap `value` into a list, unless it is a list already or `None`.

    Lists and `None` are returned unchanged; any other object is returned as
    single-element list.
    """
    if value is None or isinstance(value, list):
        return value
    return [value]
def move_columns_front(frame, columns):
    """Return Pandas DataFrame `frame` with `columns` moved to the front.

    `columns` can be a single column name or a list of names. Names that are
    not columns of `frame` are ignored. The remaining columns keep their
    original order.
    """
    if not isinstance(columns, list):
        columns = [columns]
    front = [name for name in columns if name in frame.columns]
    is_front = frame.columns.isin(front)
    remaining = list(frame.columns[~is_front])
    return frame[front + remaining]
def get_from_module(identifier, module_params, ignore_case=True):
    """Return object from module.

    Return object with name `identifier` from module with items
    `module_params`.

    Parameters
    ----------
    identifier: str
        Name of object, e.g. a function, in module.
    module_params: dict
        `dict` of items in module, e.g. `globals()`
    ignore_case: bool
        If `True`, ignore case of `identifier`.

    Returns
    -------
    object
        Object with name `identifier` in module, e.g. a function or class.

    Raises
    ------
    ValueError
        If `module_params` has no item named `identifier`, or if the item
        is `None`.
    """
    if ignore_case:
        lookup = {key.lower(): value for key, value in module_params.items()}
        name = identifier.lower()
    else:
        lookup = module_params
        name = identifier
    item = lookup.get(name)
    # Only `None` marks a missing object. Testing truthiness would wrongly
    # reject existing but falsy objects such as 0 or empty containers.
    if item is None:
        raise ValueError('Invalid identifier "%s"!' % identifier)
    return item
def filter_regex(values, regexs):
    """Filter list of `values` by list of `regexs`.

    A value is kept if `re.search` finds a match for at least one of the
    regexs. A single value or regex that is not a list is wrapped into a
    list.

    Parameters
    ----------
    values: list
        list of `str` values.
    regexs: list
        list of `str` regexs.

    Returns
    -------
    list
        Sorted `list` of unique values in `values` that match any regex in
        `regexs`.
    """
    if not isinstance(values, list):
        values = [values]
    if not isinstance(regexs, list):
        regexs = [regexs]
    matched = {value
               for value in values
               for regex in regexs
               if re.search(regex, value)}
    return sorted(matched)
class ProgressBar(object):
    """Vertical progress bar.

    Unlike the progressbar2 package, logs progress as multiple lines instead
    of single line, which enables printing to a file. Used, for example, in
    dcpg_eval.py and dcpg_filter_act.py.

    Parameters
    ----------
    nb_tot: int
        Maximum value
    logger: function
        Function that takes a `str` and prints it.
    interval: float
        Logging frequency as fraction of one. For example, 0.1 logs every
        tenth value.

    Raises
    ------
    ValueError
        If `nb_tot` is not greater than zero.
    """

    def __init__(self, nb_tot, logger=print, interval=0.1):
        if nb_tot <= 0:
            raise ValueError('Total value must be greater than zero!')
        self.nb_tot = nb_tot
        self.logger = logger
        self.interval = interval
        # Overall progress, and progress since the last logged message.
        self._value = 0
        self._nb_interval = 0

    def update(self, amount):
        """Advance progress by `amount` and log a message if one is due.

        `amount` is clipped such that the total never exceeds `nb_tot`. A
        message is logged on the first update, whenever the progress since
        the last message reaches `int(nb_tot * interval)`, and when the total
        is reached.
        """
        is_first = self._value == 0
        step = min(amount, self.nb_tot - self._value)
        self._value += step
        self._nb_interval += step
        interval_reached = (
            self._nb_interval >= int(self.nb_tot * self.interval))
        finished = self._value >= self.nb_tot
        if not (is_first or interval_reached or finished):
            return

        # Pad the current value to the number of digits of `nb_tot`, such
        # that consecutive messages are aligned.
        nb_digit = int(np.floor(np.log10(self.nb_tot))) + 1
        template = '{value:{nb_digit}d}/{nb_tot:d} ({per:3.1f}%)'
        percent = self._value / self.nb_tot * 100
        self.logger(template.format(value=self._value, nb_digit=nb_digit,
                                    nb_tot=self.nb_tot, per=percent))
        self._nb_interval = 0

    def close(self):
        """Complete the progress bar if the total was not reached yet."""
        if self._value < self.nb_tot:
            self.update(self.nb_tot)