"""Module with abstract classes."""
__all__ = ['Inlet', 'UnitOperation',
'RtdModel', 'UserInterface',
'PDF', 'ChromatographyLoadBreakthrough',
'ParameterSetList']
__version__ = '0.7.1'
__author__ = 'Jure Sencar'
import typing as _typing
import numpy as _np
from abc import ABC as _ABC, abstractmethod as _abstractmethod
from collections import OrderedDict as _OrderedDict
from bio_rtd import logger as _logger
from bio_rtd import adj_par as _adj_par
from bio_rtd import utils as _utils
class DefaultLoggerLogic(_ABC):
# noinspection PyProtectedMember
"""Default binding of the `RtdLogger` to a class.
The class holds a reference to a :class:`bio_rtd.logger.RtdLogger`
instance. When the class receives the instance, it plants a data
tree into it. If the class is asked to provide the instance before
it received one, then an instance of
:class:`bio_rtd.logger.DefaultLogger` is created and passed on.
Parameters
----------
logger_parent_id
Custom unique id that belongs to the instance of the class.
The data tree of this instance is stored in
:class:`bio_rtd.logger.RtdLogger` under the `logger_parent_id`.
Examples
--------
>>> logger_parent_id = "parent_unit_operation"
>>> l = DefaultLoggerLogic(logger_parent_id)
>>> isinstance(l.log, _logger.DefaultLogger)
True
>>> # Log error: DefaultLogger raises RuntimeError.
>>> l.log.e("Error Description")
Traceback (most recent call last):
RuntimeError: Error Description
>>> # Log warning: DefaultLogger prints it.
>>> l.log.w("Warning Description")
Warning Description
>>> # Log info: DefaultLogger ignores it.
>>> l.log.i("Info")
>>> l.log.log_data = True
>>> l.log.log_level = _logger.RtdLogger.DEBUG
>>> l.log.i_data(l._log_tree, "a", 3) # store value in logger
>>> l.log.d_data(l._log_tree, "b", 7) # store at DEBUG level
>>> l.log.get_data_tree(logger_parent_id)["b"]
7
>>> l.log = _logger.StrictLogger()
>>> # Log warning: StrictLogger raises RuntimeError.
>>> l.log.w("Warning Info")
Traceback (most recent call last):
RuntimeError: Warning Info
See Also
--------
:class:`bio_rtd.logger.DefaultLogger`
"""
def __init__(self, logger_parent_id: str):
self._instance_id = logger_parent_id
self._log_entity_id = logger_parent_id
self._logger: _typing.Union[_logger.RtdLogger, None] = None
self._log_tree = dict() # place to store logged data
@property
def log(self) -> _logger.RtdLogger:
"""Reference of the `RtdLogger` instance.
Setter also plants instance data tree into passed logger.
If logger is requested, but not yet set, then a
:class:`bio_rtd.logger.DefaultLogger` is instantiated.
"""
if self._logger is None:
self.log = _logger.DefaultLogger() # init default logger
return self._logger
@log.setter
def log(self, logger: _logger.RtdLogger):
self._logger = logger
self._logger.set_data_tree(self._log_entity_id, self._log_tree)
def set_logger_from_parent(self, parent_id: str,
logger: _logger.RtdLogger):
"""Inherit logger from parent.
Parameters
----------
parent_id
Unique identifier of parent instance.
logger
Logger from parent instance.
"""
self._logger = logger
self._log_entity_id = f"{parent_id}/{self._instance_id}"
self._logger.set_data_tree(self._log_entity_id,
self._log_tree)
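
# Example (illustrative only, not part of the library API; the ids are
# hypothetical): inheriting a logger from a parent entity nests the
# child's data tree under "parent_op/child_op".
#
# >>> child = DefaultLoggerLogic("child_op")
# >>> child.set_logger_from_parent("parent_op", _logger.DefaultLogger())
# >>> child._log_entity_id
# 'parent_op/child_op'
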
class Inlet(DefaultLoggerLogic, _ABC):
"""Generates starting flow rate and concentration profiles.
Parameters
----------
t
Simulation time vector.
Starts with 0 and has a constant time step.
species_list
List of names of the simulated process fluid species.
inlet_id
Unique identifier of an instance. It is stored in :attr:`uo_id`.
gui_title
Readable title of an instance.
"""
def __init__(self, t: _np.ndarray, species_list: _typing.Sequence[str],
inlet_id: str, gui_title: str):
super().__init__(inlet_id) # logger
# Assert proper time vector.
assert t[0] == 0, "t should start with 0"
assert len(t.shape) == 1, "t should be a 1D np.ndarray"
self._t = _np.linspace(0, t[-1], t.size)
self._dt = t[-1] / (t.size - 1)
assert _np.all(_np.abs(self._t - t) < 0.001 * self._dt / t.size), \
"t should have a fixed step size"
# Species
self.species_list: _typing.Sequence[str] = species_list
"""List with names of simulating process fluid species."""
self._n_species = len(self.species_list)
# Strings
self.uo_id: str = inlet_id
"""Unique identifier of the instance."""
self.gui_title: str = gui_title
"""Human readable title (for plots)."""
# Placeholders
self.adj_par_list: _typing.Sequence[_adj_par.AdjustableParameter] = ()
"""List of adjustable parameters exposed to the GUI."""
# Outputs
self._f_out = _np.zeros_like(t)
self._c_out = _np.zeros([self._n_species, t.size])
def get_t(self) -> _np.ndarray:
"""Get simulation time vector."""
return self._t
def get_n_species(self) -> int:
"""Get number of process fluid species."""
return self._n_species
@_abstractmethod
def refresh(self): # pragma: no cover
"""Updates output profiles.
Internally it updates `self._f_out` and `self._c_out` based on
instance attribute values.
"""
pass
def get_result(self) -> _typing.Tuple[_np.ndarray, _np.ndarray]:
"""Get flow rate and concentration profiles.
Returns
-------
f_out
Flow rate profile.
c_out
Concentration profile.
"""
return self._f_out, self._c_out
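
# Illustrative sketch (not part of the library API): a minimal concrete
# `Inlet` with a constant flow rate and constant concentrations. The
# class name and parameters are hypothetical.
class _ExampleConstantInlet(Inlet):
    """Constant-profile inlet, for illustration only."""

    def __init__(self, t: _np.ndarray, f: float, c: _np.ndarray,
                 inlet_id: str = "example_constant_inlet"):
        species = [f"species_{i}" for i in range(c.size)]
        super().__init__(t, species, inlet_id, gui_title="Example inlet")
        self.f = f
        self.c = c
        self.refresh()

    def refresh(self):
        # Fill the output placeholders defined by the base class.
        self._f_out = _np.full_like(self._t, self.f)
        self._c_out = self.c[:, _np.newaxis] * _np.ones_like(self._t)
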
class UnitOperation(DefaultLoggerLogic, _ABC):
"""Processes flow rate and concentration profiles.
Parameters
----------
t
Simulation time vector.
Starts with 0 and has a constant time step.
uo_id
Unique identifier.
gui_title
Readable title for GUI.
"""
def __init__(self, t: _np.ndarray, uo_id: str, gui_title: str = ""):
super().__init__(uo_id) # logger
# simulation time vector
assert t[0] == 0, "Time vector must start with 0"
self._t = t
self._dt = t[-1] / (t.size - 1) # time step
# id and title
self.uo_id: str = uo_id
"""Unique identifier of the instance"""
self.gui_title: str = gui_title
"""Readable title for GUI"""
# adjustable parameter list
self.adj_par_list = []
"""list of :class:`bio_rtd.adj_par.AdjustableParameter`: List
of adjustable parameters exposed to the GUI."""
# hide unit operation from plots
self.gui_hidden: bool = False
"""Hide the of the unit operation (default False)."""
# start-up phase (optional initial delay)
self.discard_inlet_until_t: float = -1
"""Discard inlet until given time."""
self.discard_inlet_until_min_c: _np.ndarray = _np.array([])
"""Discard inlet until given concentration is reached."""
self.discard_inlet_until_min_c_rel: _np.ndarray = _np.array([])
"""Discard inlet until given concentration relative to is reached.
Specified concentration is relative to the max concentration.
"""
self.discard_inlet_n_cycles: int = -1
"""Discard first n cycles of the periodic inlet flow rate profile."""
# shut-down phase (optional early stop)
self.discard_outlet_until_t: float = -1
"""Discard outlet until given time."""
self.discard_outlet_until_min_c: _np.ndarray = _np.array([])
"""Discard outlet until given concentration is reached."""
self.discard_outlet_until_min_c_rel: _np.ndarray = _np.array([])
"""Discard outlet until given concentration relative to is reached.
Specified concentration is relative to the max concentration.
"""
self.discard_outlet_n_cycles: int = -1
"""Discard first n cycles of the periodic outlet flow rate profile."""
# placeholders, populated during simulation
self._c: _np.ndarray = _np.array([]) # concentration profiles
self._f: _np.ndarray = _np.array([])  # flow rate profile
self._n_species: int = 0 # number of species
def _assert_valid_species_list(self, species: _typing.Sequence[int]):
"""Species indexes start with 0.
List must be ordered in ascending order (to prevent bugs).
List must have unique values (again, to prevent bugs).
"""
if len(species) == 0:
self.log.w("Species list is empty")
else:
assert max(species) < self._n_species, \
"Index of species should be less than number of species"
assert min(species) >= 0, \
"Index of species should not be negative"
assert len(set(species)) == len(species), \
"Vector with species should not have duplicate values"
assert list(species) == sorted(species), \
"Values in vector with species must be ascending"
def _is_flow_box_shaped(self) -> bool:
"""Constant profile with optional leading or trailing zeros."""
assert _np.all(self._f >= 0), "Flow rate must not be negative"
if _np.all(self._f == 0):
self.log.w("Flow rate is 0!")
return False
max_flow_start, max_flow_end = \
_utils.vectors.true_start_and_end(self._f == self._f.max())
if _np.any(self._f[:max_flow_start] > 0):
return False
elif _np.any(self._f[max_flow_end:] > 0):
return False
elif _np.any(self._f[max_flow_start:max_flow_end] != self._f.max()):
return False
else:
return True
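# For example (hypothetical profiles): f == [0, 0, 2, 2, 2, 0] is
# box-shaped, while f == [0, 1, 2, 2, 0, 0] is not, because its
# non-zero part is not constant.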
def _i_flow_on(self) -> _typing.Sequence[int]:
"""Detect when the flow rate switches from off to on.
In case of periodic flow rate, the function returns all
switching events.
Returns
-------
i_interval_start
Indexes of time points at which the flow rate turns on.
Each index corresponds to a leading non-zero value.
"""
if _np.all(self._f == 0):
self.log.w("Flow rate is 0!")
return []
assert _np.all(self._f[self._f != 0] == self._f.max()), \
"flow rate must have a constant 'on' value"
return list(_np.argwhere(_np.diff(self._f, prepend=0) > 0).flatten())
def _assert_periodic_flow(self) -> _typing.Tuple[_typing.Sequence[int],
float,
float]:
"""Assert and provides info about periodic flow rate.
Only last period is allowed to be shorter than others.
Returns
-------
i_flow_start_list
Indexes of time-points at which the flow rate gets turned on
i_flow_on_average
Number of time-points of flow 'on' interval.
t_cycle_duration
Duration of average cycle ('on' + 'off' interval)
"""
# Get all flow on positions.
i_flow_start_list = self._i_flow_on()
assert len(i_flow_start_list) > 0, "Flow is 0"
assert len(i_flow_start_list) > 1, \
"Periodic flow must have at least 2 cycles"
# Get average cycle duration.
t_cycle_duration = _np.mean(_np.diff(i_flow_start_list)) * self._dt
assert _np.all(_np.abs(
_np.diff(i_flow_start_list) - t_cycle_duration / self._dt
) <= 1), "Periodic flow rate must have fixed cycle duration"
# Assert periods.
i_flow_on_first = \
_utils.vectors.true_start(self._f[i_flow_start_list[0]:] == 0)
i_flow_on_total = 0.0
n_cycles = 0
on_mask = _np.zeros_like(self._f, dtype=bool)
for i in range(len(i_flow_start_list)):
# Get next stop.
if i + 1 == len(i_flow_start_list):
i_cycle_flow_on_duration = _utils.vectors.true_end(
self._f[i_flow_start_list[i]:] > 0
)
else:
i_cycle_flow_on_duration = _utils.vectors.true_start(
self._f[i_flow_start_list[i]:] == 0
)
i_flow_on_total += i_cycle_flow_on_duration
n_cycles += 1
i_flow_off = i_flow_start_list[i] + i_cycle_flow_on_duration
on_mask[i_flow_start_list[i]:i_flow_off] = True
if i + 1 == len(i_flow_start_list):
# allow last cycle to be clipped
assert i_cycle_flow_on_duration - i_flow_on_first <= 1
else:
# allow the duration to be 1 time step off from the first cycle
assert abs(i_cycle_flow_on_duration - i_flow_on_first) <= 1
# Flow can be either off or on at the constant value.
assert _np.all(self._f[on_mask] == self._f.max())
assert _np.all(self._f[~on_mask] == 0)
i_flow_on_average = i_flow_on_total / n_cycles
return i_flow_start_list, i_flow_on_average, t_cycle_duration
def _estimate_steady_state_mean_f(self) -> _typing.Tuple[float, float]:
"""Estimate mean flow rate in a period of periodic flow rate.
Uses `self._assert_periodic_flow()` to get data about flow rate.
Returns
-------
f_mean : float
Mean flow rate in one cycle.
t_cycle_duration : float
Duration of a cycle ('on' + 'off' interval).
"""
_, i_flow_on_d, t_cycle_duration = self._assert_periodic_flow()
if i_flow_on_d < 6:
self.log.w(
f"Flow is turned on for {i_flow_on_d} (< 6) time points")
f_mean = self._f.max() * i_flow_on_d * self._dt / t_cycle_duration
return f_mean, t_cycle_duration
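# Worked example (hypothetical numbers): with f.max() == 1.0, flow
# turned on for i_flow_on_d == 200 time points, self._dt == 0.01 and
# t_cycle_duration == 10.0, the cycle-average flow rate is
# f_mean == 1.0 * 200 * 0.01 / 10.0 == 0.2.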
def _estimate_steady_state_mean_c(
self,
species: _typing.Optional[_typing.Sequence[int]] = None
) -> _np.ndarray:
"""Estimate mean concentration after start-up phase.
In case of box shaped flow rate, the steady-state concentration
is estimated as an average concentration between the first and
the last crossing of 90 % of the max concentration of the sum
across relevant species.
In case of periodic flow rate, the steady-state concentration
is estimated as an average concentration during the cycle with
the largest average concentration.
If possible, define the model so that this function is not
needed; estimates can in general lead to unintuitive results.
Parameters
----------
species
List of indexes of relevant species (indexes start at 0).
If not specified, all species are selected.
Returns
-------
c_mean_ss: ndarray
Estimated steady-state mean concentration.
"""
if species is None:
species = range(self._n_species)
else:
self._assert_valid_species_list(species)
assert self._c.size > 0
if _np.all(self._c[species] == 0):
self.log.w("Concentration is zero for all relevant components")
load_c_ss = _np.zeros([len(species), 1])
self.log.i_data(self._log_tree, 'load_c_ss', load_c_ss)
return load_c_ss
if self._is_flow_box_shaped():
c = self._c[species][:, self._f > 0]
i_st, i_end = _utils.vectors.true_start_and_end(
c.sum(0) >= c.sum(0).max() * 0.9
)
self.log.w(f"Steady-state concentration is being estimated"
f" as an average concentration withing 90 %"
f" between the first and the last crossing of 90 %"
f" of the max concentration for sum across"
f" species: {species}")
load_c_ss = _np.mean(c[:, i_st:i_end], 1)[:, _np.newaxis]
self.log.i_data(self._log_tree, 'load_c_ss', load_c_ss)
return load_c_ss
else:
# Get info about periodic flow rate.
i_flow_on_start_list, i_flow_on_duration, t_cycle_duration = \
self._assert_periodic_flow()
self.log.w(f"Steady-state concentration is being estimated"
f" as an average concentration in the cycle"
f" with the largest average concentration"
f" summed across species: {species}")
m_max = 0
c_max = _np.zeros([len(species), 1])
for i, i_st in enumerate(i_flow_on_start_list):
i_flow_on_duration_int = int(round(i_flow_on_duration))
if i_st + i_flow_on_duration_int >= self._t.size:
continue
_c_max = self._c[species, i_st:i_st + i_flow_on_duration_int] \
.mean(1)
if _c_max.sum() > m_max:
c_max = _c_max
m_max = _c_max.sum()
# Add dimension: (n_species,) -> (n_species, 1)
load_c_ss = c_max[:, _np.newaxis]
self.log.i_data(self._log_tree, 'load_c_ss', load_c_ss)
return load_c_ss
def _ensure_single_non_negative_parameter(self, log_level_multiple: int,
log_level_none: int, **kwargs):
"""Check if one or multiple parameters are non-negative.
Parameters
----------
log_level_multiple
Log level at which the function reports to `RtdLogger` in
case of multiple non-negative parameters.
log_level_none
Log level at which the function reports to `RtdLogger` in
case of no non-negative parameters.
"""
non_negative_keys = [key for key in kwargs.keys()
if kwargs.get(key) >= 0]
if len(non_negative_keys) > 1:
self.log.log(
log_level_multiple,
f"Only one of the parameters: {non_negative_keys}"
f" should be defined!")
elif len(non_negative_keys) == 0:
self.log.log(
log_level_none,
f"One of the parameters: {list(kwargs.keys())}"
f" should be defined!")
def _cut_start_of_c_and_f(self, outlet: bool = False):
"""Trim beginning of flow rate and concentration profiles
Parameters
----------
outlet
Current profiles have already been processed by this
`UnitOperation`.
"""
if self._f.max() == 0:
self.log.w("Flow rate is 0!")
self._f *= 0
self._c *= 0
return
if outlet:
discard_until_t = self.discard_outlet_until_t
discard_n_cycles = self.discard_outlet_n_cycles
discard_until_min_c = self.discard_outlet_until_min_c
discard_until_min_c_rel = self.discard_outlet_until_min_c_rel
else:
discard_until_t = self.discard_inlet_until_t
discard_n_cycles = self.discard_inlet_n_cycles
discard_until_min_c = self.discard_inlet_until_min_c
discard_until_min_c_rel = self.discard_inlet_until_min_c_rel
def get_cycle_start(_i, _i_start_list, _i_duration):
for _i_start in _i_start_list:
if _i <= _i_start + _i_duration:
return _i_start
i_init_cut = -1
_aux_has_min_c = discard_until_min_c.size > 0 \
and discard_until_min_c.max() > 0
_aux_has_min_c_rel = discard_until_min_c_rel.size > 0 \
and discard_until_min_c_rel.max() > 0
if _aux_has_min_c:
assert discard_until_min_c.size == self._n_species
discard_until_min_c = \
discard_until_min_c.reshape(self._n_species, 1)
if _aux_has_min_c_rel:
assert discard_until_min_c_rel.size == self._n_species
discard_until_min_c_rel = \
discard_until_min_c_rel.reshape(self._n_species, 1)
self._ensure_single_non_negative_parameter(
self.log.ERROR, 0,
discard_until_t=discard_until_t,
discard_until_min_c=1 if _aux_has_min_c else -1,
discard_until_min_c_rel=1 if _aux_has_min_c_rel else -1,
discard_n_cycles=discard_n_cycles,
)
if discard_until_t > 0:
if self._is_flow_box_shaped():
i_init_cut = int(discard_until_t / self._dt)
else:
i_start_list, i_duration, _ = self._assert_periodic_flow()
i_init_cut = get_cycle_start(int(discard_until_t / self._dt),
i_start_list, i_duration)
elif _aux_has_min_c or _aux_has_min_c_rel:
if _aux_has_min_c:
lim = _np.all(self._c >= discard_until_min_c, 0) \
* (self._f > 0)
else:
c_lim = discard_until_min_c_rel \
* self._c.max(1)[:, _np.newaxis]
lim = _np.all(self._c >= c_lim, 0) * (self._f > 0)
if _np.any(lim):
if self._is_flow_box_shaped():
i_init_cut = _utils.vectors.true_start(lim)
else:
i_start_list, i_duration, _ = self._assert_periodic_flow()
i_init_cut = get_cycle_start(
_utils.vectors.true_start(lim),
i_start_list, i_duration)
else:
if _aux_has_min_c:
c = discard_until_min_c
else:
c = discard_until_min_c_rel
self.log.w(f"Threshold concentration profile"
f" {c} was never reached!")
self._f[:] = 0
self._c[:] = 0
return
elif discard_n_cycles > 0:
i_flow_on_start_list, _, _ = self._assert_periodic_flow()
if len(i_flow_on_start_list) > discard_n_cycles:
i_init_cut = i_flow_on_start_list[discard_n_cycles]
else:
self.log.w(f"All the cycles {len(i_flow_on_start_list)}"
f" are removed!")
self._f[:] = 0
self._c[:] = 0
return
if i_init_cut > 0:
self._f[:i_init_cut] = 0
self._c[:, :i_init_cut] = 0
def evaluate(self, f_in: _np.ndarray, c_in: _np.ndarray
) -> _typing.Tuple[_np.ndarray, _np.ndarray]:
"""Evaluate the propagation throughout the unit operation.
Parameters
----------
c_in
Inlet concentration profile with shape
(n_species, n_time_steps).
f_in
Inlet flow rate profile with shape (n_time_steps,).
Returns
-------
f_out
Outlet flow rate profile.
c_out
Outlet concentration profile.
"""
# Make a copy of inlet profiles, so we don't modify the originals.
self._f = f_in.copy()
self._c = c_in.copy()
# Clear log.
self._log_tree.clear()
# Make sure concentration is not negative.
if _np.any(self._c < 0):
raise AssertionError(
f"Negative concentration at unit operation: {self.uo_id}")
# Make sure flow is not negative.
if _np.any(self._f < 0):
raise AssertionError(
f"Negative flow at unit operation: {self.uo_id}")
# Make sure flow is still there.
if self._f.max() == 0:
self.log.w(f"No inlet flow at unit operation: {self.uo_id}")
return self._f * 0, self._c * 0
# Calculate number of tracking species.
self._n_species = self._c.shape[0]
# Cut off start if required.
self._cut_start_of_c_and_f(outlet=False)
# (Re-)calculate the outlet profiles.
self._calculate()
# Cutoff start of outlet if needed.
self._cut_start_of_c_and_f(outlet=True)
return self._f, self._c
@_abstractmethod
def _calculate(self): # pragma: no cover
"""Re-calculates the c_out and f_out from c_in and f_in.
Function should use and modify `self._f` and `self._c`.
"""
pass
def get_result(self) -> _typing.Tuple[_np.ndarray, _np.ndarray]:
"""Returns existing flow rate and concentration profiles.
Returns
-------
f_out
Outlet flow rate profile.
c_out
Outlet concentration profile.
"""
return self._f, self._c
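
# Illustrative sketch (not part of the library API): a minimal concrete
# `UnitOperation` that applies an instant inline dilution. The class
# name and the `dilution_ratio` parameter are hypothetical.
class _ExampleDilution(UnitOperation):
    """Instant dilution of the process fluid, for illustration only."""

    def __init__(self, t: _np.ndarray, uo_id: str = "example_dilution",
                 dilution_ratio: float = 1.2):
        super().__init__(t, uo_id, gui_title="Example dilution")
        assert dilution_ratio >= 1
        self.dilution_ratio = dilution_ratio

    def _calculate(self):
        # `evaluate()` has already copied the inlet profiles into
        # `self._f` and `self._c`; modify them in place.
        self._f = self._f * self.dilution_ratio
        self._c = self._c / self.dilution_ratio
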
class ParameterSetList(_ABC):
"""Abstract class for asserting keys in key-value pairs.
Key-value pairs passed to `assert_and_get_provided_kv_pairs` should
contain all keys from (at least) one of the key groups in
`POSSIBLE_KEY_GROUPS`. The method returns key-value pairs with keys
from that group plus all passed keys that also appear in
`OPTIONAL_KEYS`.
Examples
--------
>>> class DummyClass(ParameterSetList):
... POSSIBLE_KEY_GROUPS = [['par_1'], ['par_2a', 'par_2b']]
... OPTIONAL_KEYS = ['key_plus_1', 'key_plus_2']
>>>
>>> dc = DummyClass()
>>> dc.assert_and_get_provided_kv_pairs(par_1=1, par_2a=2)
{'par_1': 1}
>>> dc.assert_and_get_provided_kv_pairs(par_2a=1, par_2b=2,
... key_plus_1=3, key_plus_9=2)
{'par_2a': 1, 'par_2b': 2, 'key_plus_1': 3}
>>> dc.assert_and_get_provided_kv_pairs(
... key_plus_1=1) # doctest: +ELLIPSIS
Traceback (most recent call last):
KeyError: "Keys ... do not contain any of the required groups: ...
"""
# noinspection PyPep8Naming
@property
@_abstractmethod
def POSSIBLE_KEY_GROUPS(self) \
-> _typing.Sequence[_typing.Sequence[str]]: # pragma: no cover
"""Possible key combinations.
Examples
--------
POSSIBLE_KEY_GROUPS = [['v_void'], ['f', 'rt_mean']]
"""
raise NotImplementedError
# noinspection PyPep8Naming
@property
@_abstractmethod
def OPTIONAL_KEYS(self) -> _typing.Sequence[str]: # pragma: no cover
"""Optional additional keys.
Examples
--------
OPTIONAL_KEYS = ['skew', 't_delay']
"""
raise NotImplementedError
def assert_and_get_provided_kv_pairs(self, **kwargs) -> dict:
"""
Parameters
----------
**kwargs
Inputs to the `calc_pdf(**kwargs)` function.
Returns
-------
dict
Filtered `**kwargs` so that the keys contain the first matching
key group from :attr:`POSSIBLE_KEY_GROUPS` and any number of
optional keys from :attr:`OPTIONAL_KEYS`.
Raises
------
KeyError
If `**kwargs` do not contain all keys from any of the groups
in :attr:`POSSIBLE_KEY_GROUPS`.
"""
for group in self.POSSIBLE_KEY_GROUPS:
if any([key not in kwargs.keys() for key in group]):
continue
else:
# Get keys from groups.
d = {key: kwargs.get(key) for key in group}
# Get optional keys.
d_extra = {key: kwargs.get(key)
for key in self.OPTIONAL_KEYS
if key in kwargs.keys()}
# Combine and return.
return {**d, **d_extra}
raise KeyError(f"Keys {list(kwargs.keys())} do not contain any of"
f" the required groups: {self.POSSIBLE_KEY_GROUPS}")
class PDF(ParameterSetList, DefaultLoggerLogic, _ABC):
"""Abstract class for defining probability distribution functions.
Parameters
----------
t
Simulation time vector.
pdf_id
Unique identifier of the `PDF` instance.
"""
def __init__(self, t: _np.ndarray, pdf_id: str = ""):
super().__init__(pdf_id)
assert t[0] == 0
assert t[-1] > 0
self._dt = t[-1] / (t.size - 1)
self._t_steps_max = t.size
# apply cutoff
self.trim_and_normalize = True
"""Trim edges of the pdf and normalize it afterwards.
Default = True.
Relative threshold value is specified by
:attr:`cutoff_relative_to_max`.
Normalization is performed after the trimming, so that the
area of the pdf == 1.
"""
self.cutoff_relative_to_max = 0.0001
"""Cutoff as a share of max value of the pdf (default 0.0001).
It is defined to avoid very long tails of the distribution.
Cutoff is enabled if :attr:`trim_and_normalize` is True.
"""
# placeholder for the result of the pdf calculation
self._pdf: _np.ndarray = _np.array([])
def _apply_cutoff_and_normalize(self):
# Get cutoff positions.
i_start, i_end = _utils.vectors.true_start_and_end(
self._pdf >= self.cutoff_relative_to_max * self._pdf.max()
)
# 0 at front.
self._pdf[:i_start] = 0
# Cut at end.
self._pdf = self._pdf[:i_end]
# Normalize.
self._pdf = self._pdf / self._pdf[:i_end].sum() / self._dt
def update_pdf(self, **kwargs):
"""Re-calculate PDF based on specified parameters.
The calculated probability distribution can be obtained by
:func:`get_p`.
Parameters
----------
**kwargs
Should contain keys from one of the groups in
:attr:`POSSIBLE_KEY_GROUPS`.
It may contain additional keys from :attr:`OPTIONAL_KEYS`.
"""
kw = self.assert_and_get_provided_kv_pairs(**kwargs)
self._pdf = self._calc_pdf(kw)
if self.trim_and_normalize:
self._apply_cutoff_and_normalize()
@_abstractmethod
def _calc_pdf(self, kw_pars: dict) -> _np.ndarray: # pragma: no cover
"""Calculation of probability distribution.
Evaluates pdf for a given set of parameters. The keys of the
`kw_pars` include keys from one of the groups in
:attr:`POSSIBLE_KEY_GROUPS` and any optional subset of keys from
:attr:`OPTIONAL_KEYS`.
Parameters
----------
kw_pars
Key-value parameters.
Keys contain keys from one of the groups specified in
`self.POSSIBLE_KEY_GROUPS`.
Keys may contain additional keys from `self.OPTIONAL_KEYS`.
"""
raise NotImplementedError
def get_p(self) -> _np.ndarray:
"""Get probability distribution.
Returns
-------
p: ndarray
Evaluated probability distribution function.
Corresponding time axis starts with 0 and has a fixed step
size (`_dt`).
If :attr:`trim_and_normalize` is True, then `sum(p * _dt) == 1`.
"""
assert self._pdf.size > 0, f"PDF is empty. Make sure `update_pdf`" \
f" was called before `get_p`."
if self._pdf.size <= 5:
self.log.e("PDF should have a length of at least 5 time steps.")
return self._pdf
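
# Illustrative sketch (not part of the library API): a minimal concrete
# `PDF` describing a uniform (box-shaped) distribution over
# [0, 2 * rt_mean]. The class name and the key group are hypothetical.
class _ExampleUniformPDF(PDF):
    """Uniform residence-time distribution, for illustration only."""

    POSSIBLE_KEY_GROUPS = [['rt_mean']]
    OPTIONAL_KEYS = []

    def _calc_pdf(self, kw_pars: dict) -> _np.ndarray:
        n_steps = max(int(round(2 * kw_pars['rt_mean'] / self._dt)), 5)
        # Constant density with unit area (n_steps * dt * p == 1).
        return _np.ones(n_steps) / (n_steps * self._dt)

# Usage sketch:
# >>> t = _np.linspace(0, 100, 1001)
# >>> pdf = _ExampleUniformPDF(t, "example_pdf")
# >>> pdf.update_pdf(rt_mean=5.0)
# >>> p = pdf.get_p()  # p.sum() * pdf._dt == 1
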
class ChromatographyLoadBreakthrough(ParameterSetList,
DefaultLoggerLogic, _ABC):
"""What parts of the load bind to the column.
Parameters
----------
dt
Time step duration.
bt_profile_id
Unique identifier of the instance. Used for logs.
"""
def __init__(self, dt: float,
bt_profile_id: str = "ChromatographyLoadBreakthrough"):
super().__init__(bt_profile_id)
assert dt > 0
self._dt = dt
def update_btc_parameters(self, **kwargs):
"""Update binding dynamics for a given set of parameters.
Parameters
----------
**kwargs
Should contain keys from one of the groups in
:attr:`POSSIBLE_KEY_GROUPS`.
It may contain additional keys from :attr:`OPTIONAL_KEYS`.
"""
kw = self.assert_and_get_provided_kv_pairs(**kwargs)
self._update_btc_parameters(kw)
def calc_c_bound(self,
f_load: _np.ndarray,
c_load: _np.ndarray) -> _np.ndarray:
"""Calculates what parts of load bind to the column.
The default implementation calculates cumulative mass of the
load material and passes it to :func:`_update_btc_parameters`
abstract method for evaluation on what shares of the load
bind to the column. Those shares are then multiplied by `c_load`
in order to obtain resulting `c_bound`.
This method is meant to be overridden, if needed.
Parameters
----------
f_load
Load flow rate profile.
c_load
Load concentration profile. Concentration profile should
include only species which bind to the column.
Returns
-------
c_bound: ndarray
Parts of the load that bind to the column during the
load step.
`c_bound` has the same shape as `c_load`.
"""
# All species are summed together.
m_cum_sum = _np.cumsum(
(c_load * f_load[_np.newaxis, :]).sum(0)) * self._dt
if m_cum_sum[-1] == 0: # empty concentration vector
return _np.zeros_like(c_load)
unbound_to_load_ratio = \
self._calc_unbound_to_load_ratio(m_cum_sum.flatten())
c_bound = c_load * (1 - unbound_to_load_ratio[_np.newaxis, :])
return c_bound
@_abstractmethod
def _update_btc_parameters(self,
kw_pars: dict): # pragma: no cover
"""Update binding dynamics for a given key-value set.
Parameters
----------
kw_pars: dict
Key-value parameters.
Keys contain keys from one of the groups specified in
`self.POSSIBLE_KEY_GROUPS` and may contain additional keys
from `self.OPTIONAL_KEYS`.
"""
pass
@_abstractmethod
def _calc_unbound_to_load_ratio(self, loaded_material: _np.ndarray
) -> _np.ndarray: # pragma: no cover
"""Calculates what share of the load binds to the column.
Typical implementation would be just direct evaluation of the
breakthrough curve. See the `ConstantPatternSolution` subclass.
Parameters
----------
loaded_material
Cumulative sum of the amount of loaded material.
Returns
-------
unbound_to_load_ratio
Ratio between the amount of unbound material and the
amount of loaded material. `unbound_to_load_ratio` has
the same size as `loaded_material`.
"""
pass
@_abstractmethod
def get_total_bc(self) -> float: # pragma: no cover
"""Total binding capacity.
Meant e.g. for determining column utilization.
"""
pass
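
# Illustrative sketch (not part of the library API): an idealized
# step-shaped breakthrough in which everything binds until a
# hypothetical total capacity `m_capacity` is reached and nothing
# binds afterwards.
class _ExampleStepBreakthrough(ChromatographyLoadBreakthrough):
    """Step-shaped breakthrough profile, for illustration only."""

    POSSIBLE_KEY_GROUPS = [['m_capacity']]
    OPTIONAL_KEYS = []

    def _update_btc_parameters(self, kw_pars: dict):
        self._m_capacity = kw_pars['m_capacity']

    def _calc_unbound_to_load_ratio(self, loaded_material: _np.ndarray
                                    ) -> _np.ndarray:
        # 0 (all binds) until the capacity is reached, 1 afterwards.
        return (loaded_material > self._m_capacity) * 1.0

    def get_total_bc(self) -> float:
        return self._m_capacity
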
class RtdModel(DefaultLoggerLogic, _ABC):
"""Combines `Inlet` and a train of `UnitOperation`-s into a model.
The logger assigned to the instance of RtdModel is passed on to
:class:`bio_rtd.core.Inlet` and
:class:`bio_rtd.core.UnitOperation` instances.
Parameters
----------
inlet
Inlet profile.
dsp_uo_chain
Sequence of unit operations, ordered along the process flow.
logger
Logger for sending status messages and storing intermediate
data.
title
Title of the model.
desc
Description of the model.
See Also
--------
:class:`bio_rtd.core.Inlet`
:class:`bio_rtd.core.UnitOperation`
:class:`bio_rtd.logger.RtdLogger`
"""
def __init__(self,
inlet: Inlet,
dsp_uo_chain: _typing.Sequence[UnitOperation],
logger: _typing.Optional[_logger.RtdLogger] = None,
title: str = "RtdModel",
desc: str = ""):
super().__init__(title)
# Ensure unique `uo_id`s.
ids = [uo.uo_id for uo in dsp_uo_chain]
assert len(ids) == len(set(ids)), \
"Each unit operation must have a unique id (`uo_id`)"
# Bind parameters.
self.inlet = inlet
""":class:`bio_rtd.core.Inlet`: Inlet for `self.dsp_uo_chain`"""
self.dsp_uo_chain = dsp_uo_chain
"""sequence of :class:`bio_rtd.core.UnitOperation`: Chain of
unit operations in the process.
Unit operations need to be in proper order.
The logger in unit operations is overridden by the logger from
this model.
"""
self.title = title
"""Human readable title (mostly for plots)"""
self.desc = desc
"""Human readable description (also mostly for plots)"""
# Init data log tree with empty dict for each unit operation.
self._log_tree = _OrderedDict(
{inlet.uo_id: {}, **{uo.uo_id: {} for uo in dsp_uo_chain}})
self.log = logger if logger is not None else self.log
def get_dsp_uo(self, uo_id: str) -> UnitOperation:
"""Get reference to a `UnitOperation` with specified `uo_id`."""
for uo in self.dsp_uo_chain:
if uo.uo_id == uo_id:
return uo
raise KeyError(f"Unit operation `{uo_id}` not found.\n"
f"Available: {[uo.uo_id for uo in self.dsp_uo_chain]}")
def recalculate(self,
start_at: int = -1,
on_update_callback: _typing.Optional[
_typing.Callable[[int], None]
] = None):
"""Recalculate process fluid propagation.
Parameters
----------
start_at
Index of first unit operation for re-evaluation.
Indexing starts at 0 (-1 for the inlet). Default = -1.
on_update_callback
Optional callback function which receives an integer.
The integer corresponds to the index of re-evaluated unit
operation, starting with 0 (-1 for inlet).
This can serve as a trigger for updating UI or any other
post-processing after re-evaluation of unit operations.
"""
# Evaluate inlet profile.
if start_at == -1:
self.inlet.refresh()
self._notify_updated(-1, on_update_callback)
start_at = 0
# Get outlet of previous unit operation.
if start_at == 0:
f, c = self.inlet.get_result()
else:
f, c = self.dsp_uo_chain[start_at - 1].get_result()
# Evaluate subsequent unit operations.
for i in range(start_at, len(self.dsp_uo_chain)):
f, c = self.dsp_uo_chain[i].evaluate(f, c)
self._notify_updated(i, on_update_callback)
def _notify_updated(self, uo_i: int, on_update_callback):
# Show INFO log.
if uo_i == -1:
uo = self.inlet
self.log.i("Inlet profile updated")
else:
uo = self.dsp_uo_chain[uo_i]
self.log.i(f"Unit operation `{uo.uo_id}` updated")
# Store profiles in DEBUG data log.
if self.log.log_level <= self.log.DEBUG:
f, c = uo.get_result()
self.log.d_data(self._log_tree[uo.uo_id], "f", f)
self.log.d_data(self._log_tree[uo.uo_id], "c", c)
# Call callback function if specified.
if on_update_callback:
on_update_callback(uo_i)
@DefaultLoggerLogic.log.setter
def log(self, logger: _logger.RtdLogger):
self._logger = logger
self._logger.set_data_tree(self._log_entity_id, self._log_tree)
# Pass logger to inlet and unit operations.
self.inlet.log = logger
for uo in self.dsp_uo_chain:
uo.log = logger
def set_logger_from_parent(self, parent_id: str,
logger: _logger.RtdLogger): # pragma: no cover
# This dummy definition is here just to maintain the right order
# of methods in documentation.
super().set_logger_from_parent(parent_id, logger)
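
# Illustrative sketch (not part of the library API): assembling a model
# from the hypothetical example classes defined above.
#
# >>> t = _np.linspace(0, 100, 1001)
# >>> inlet = _ExampleConstantInlet(t, f=1.0, c=_np.array([2.0]))
# >>> uo = _ExampleDilution(t, "dilute", dilution_ratio=2.0)
# >>> model = RtdModel(inlet, dsp_uo_chain=[uo])
# >>> model.recalculate()
# >>> f_out, c_out = model.get_dsp_uo("dilute").get_result()
# >>> float(f_out[-1]), float(c_out[0, -1])
# (2.0, 1.0)
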
class UserInterface(_ABC):
"""Wrapper around RtdModel suitable for building GUI on top of it.
Parameters
----------
rtd_model
Residence time distribution model.
See Also
--------
:class:`bio_rtd.core.RtdModel`
"""
def __init__(self, rtd_model: RtdModel):
self.rtd_model = rtd_model
# default values - update them after init
self.start_at: int = -1
"""Index of first unit operation for re-evaluation.
Indexing starts at 0 (-1 for the inlet). Default = -1.
"""
self.species_label: _typing.Sequence[str] = \
self.rtd_model.inlet.species_list
"""Labels of the species in concentration array.
Initially inherited from :class:`bio_rtd.core.Inlet` instance.
"""
self.x_label: str = 't'
"""Label of x axis (time). Default = 't'"""
self.y_label_c: str = 'c'
"""Label of y axis (concentration). Default = 'c'"""
self.y_label_f: str = 'f'
"""Label of y axis (flow rate). Default = 'f'"""
def recalculate(self, forced=False):
"""Re-evaluates the model from the `start_at` index onwards.
Parameters
----------
forced
If True, the entire model (inlet + unit operations) is
re-evaluated. The same can be achieved by setting
:attr:`start_at` to -1.
"""
start_at = -1 if forced else self.start_at
def callback_fun(i):
if i == -1:
f, c = self.rtd_model.inlet.get_result()
else:
f, c = self.rtd_model.dsp_uo_chain[i].get_result()
self._update_ui_for_uo(i, f, c)
# Re-calculate the model.
self.rtd_model.recalculate(start_at, callback_fun)
@_abstractmethod
def build_ui(self): # pragma: no cover
"""Build the UI from scratch."""
raise NotImplementedError
@_abstractmethod
def _update_ui_for_uo(self, uo_i, f, c): # pragma: no cover
"""Update the UI for specific unit operation.
Parameters
----------
uo_i: int
Index of unit operation (-1 for inlet profile).
Indexes for DSP unit operation train start with 0.
c: _np.ndarray
Outlet concentration profile.
f: _np.ndarray
Outlet flow profile.
"""
raise NotImplementedError
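
# Illustrative sketch (not part of the library API): a minimal console
# "UI" that just reports which profile was updated. The class name is
# hypothetical.
class _ExampleConsoleUI(UserInterface):
    """Console-only user interface, for illustration only."""

    def build_ui(self):
        # Nothing to draw; evaluate the whole model once.
        self.recalculate(forced=True)

    def _update_ui_for_uo(self, uo_i, f, c):
        uo_id = ("inlet" if uo_i == -1
                 else self.rtd_model.dsp_uo_chain[uo_i].uo_id)
        print(f"`{uo_id}` updated: mean flow rate = {f.mean():.3g}")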