"""Semi continuous unit operations.
Unit operations that accept constant or box-shaped flow rate profile
and provide periodic flow rate profile.
"""
__all__ = ['AlternatingChromatography', 'ACC', 'PCC', 'PCCWithWashDesorption']
__version__ = '0.7.1'
__author__ = 'Jure Sencar'
import typing as _typing
import numpy as _np
import scipy.interpolate as _interp
from bio_rtd.chromatography import bt_load as _bt_load
import bio_rtd.utils as _utils
import bio_rtd.core as _core
import bio_rtd.pdf as _pdf
[docs]class AlternatingChromatography(_core.UnitOperation):
"""Simulation of alternating chromatography.
This class implements logic common to various types of alternating
chromatography. It has a role of a base class for
specific types of alternating chromatography to extend.
Parameters
----------
t
Simulation time vector.
Starts with 0 and has a constant time step.
uo_id
Unique identifier.
load_bt
Load breakthrough logic.
peak_shape_pdf
Elution peak shape.
gui_title
Readable title for GUI. Default = "AC".
Notes
-----
**Quick description of which attributes are available:**
Non-binding species (optional):
* :attr:`non_binding_species`
Column volume (exactly one required):
* :attr:`cv`
* :attr:`ft_mean_retentate` and :attr:`column_porosity_retentate`
Column porosity for binding species (required in case of
:attr:`ft_mean_retentate` or wash or load recycling):
* :attr:`column_porosity_retentate`
Equilibration step duration (optional, if both, the values are
added together):
* :attr:`equilibration_cv`
* :attr:`equilibration_t`
Equilibration step flow rate (exactly one needed):
* :attr:`equilibration_f` - absolute, has priority if defined
* :attr:`equilibration_f_rel` - relative, default = 1
Load step duration:
* :attr:`load_cv` - preferred
* :attr:`load_c_end_ss` - concentration limit for breakthrough; also
requires :attr:`load_recycle_pdf`
* :attr:`load_c_end_relative_ss` - concentration limit for
breakthrough relative to steady-state load concentration; also
requires :attr:`load_recycle_pdf`
Iterative optimization of estimation of load step duration
(ignored if :attr:`load_cv` is defined):
* :attr:`load_c_end_estimate_with_iterative_solver` - default = True
* :attr:`load_c_end_estimate_with_iter_solver_max_iter` - default =
1000
Extension of first load step (optional; ignored if no recycling):
* :attr:`load_extend_first_cycle` - default = `False`
* :attr:`load_extend_first_cycle_cv` and
:attr:`load_extend_first_cycle_t` - added together if both defined
Load linear velocity - only for column height determination
(optional):
* :attr:`load_target_lin_velocity`
Wash step duration (optional, if both, the values are
added together):
* :attr:`wash_cv`
* :attr:`wash_t`
Wash step flow rate (exactly one needed):
* :attr:`wash_f` - absolute, has priority if defined
* :attr:`wash_f_rel` - relative, default = 1
Unaccounted losses - applied before peak cut (optional):
* :attr:`unaccounted_losses_rel` - relative, default = 1
Elution step duration (optional, if both, the values are
added together):
* :attr:`elution_cv`
* :attr:`elution_t`
Elution step flow rate (exactly one needed):
* :attr:`elution_f` - absolute, has priority if defined
* :attr:`elution_f_rel` - relative, default = 1
Elution buffer composition (optional):
* :attr:`elution_buffer_c`
Elution peak position duration - first momentum
(optional, if both, the values are added together):
* :attr:`elution_peak_position_cv`
* :attr:`elution_peak_position_t`
Elution peak cut start (one is required):
* :attr:`elution_peak_cut_start_t`
* :attr:`elution_peak_cut_start_cv`
* :attr:`elution_peak_cut_start_c_rel_to_peak_max`
* :attr:`elution_peak_cut_start_peak_area_share`
Elution peak cut end (one is required):
* :attr:`elution_peak_cut_end_t`
* :attr:`elution_peak_cut_end_cv`
* :attr:`elution_peak_cut_end_c_rel_to_peak_max`
* :attr:`elution_peak_cut_end_peak_area_share`
Regeneration step duration (optional, if both, the values are
added together):
* :attr:`regeneration_cv`
* :attr:`regeneration_t`
Regeneration step flow rate (exactly one needed):
* :attr:`regeneration_f` - absolute, has priority if defined
* :attr:`regeneration_f_rel` - relative, default = 1
Wash desorption (optional, also check if class supports it):
* :attr:`wash_desorption` - default = `False`
Load breakthrough recycle (optional):
* :attr:`load_recycle` - default = `False`
Load breakthrough propagation dynamics
(required if :attr:`load_recycle` is `True`
or :attr:`load_c_end_ss` is defined or
or :attr:`load_c_end_relative_ss` is defined):
* :attr:`load_recycle_pdf`
Wash recycle (optional):
* :attr:`wash_recycle` - default = `False`
Duration of wash recycling
(optional; ignored if :attr:`wash_recycle` is `False`):
* :attr:`wash_recycle_duration_cv` and
:attr:`wash_recycle_duration_t` - summed together if both defined.
* Entire wash step if
:attr:`wash_recycle_duration_cv` and
:attr:`wash_recycle_duration_t` are not defined.
Please note that subclasses might introduce new attributes or change
the default values of existing attributes.
"""
def __init__(self,
t: _np.ndarray,
uo_id: str,
load_bt: _core.ChromatographyLoadBreakthrough,
peak_shape_pdf: _core.PDF,
gui_title: str = "AC"):
super().__init__(t, uo_id, gui_title)
# Bind parameters.
self.load_bt: _core.ChromatographyLoadBreakthrough = load_bt
"""Determines what part of load material binds to the column."""
self.elution_peak_shape: _core.PDF = peak_shape_pdf
"""Elution peak shape."""
self.non_binding_species: _typing.Sequence[int] = []
"""Process buffer species that are NOT binding to the column.
Indexing starts with 0.
"""
self.cv: float = -1
"""Column volume.
Column volume should be defined by exactly one of the following
attribute groups:
* :attr:`cv` (this one)
* :attr:`ft_mean_retentate`
and :attr:`column_porosity_retentate`
"""
self.ft_mean_retentate: float = -1
"""Flow-through time of retentate under non-binding conditions.
Used to define column volume (independently of scale).
Column volume should be defined by exactly one of the following
attribute groups:
* :attr:`cv`
* :attr:`ft_mean_retentate` (this one) and
:attr:`column_porosity_retentate`
"""
self.column_porosity_retentate: float = -1
"""Column porosity for retentate under non-binding conditions.
Required in case :attr:`ft_mean_retentate` is used to define
column volume.
Required in case :attr:`load_c_end_ss` or
:attr:`load_c_end_relative_ss` are used to estimate
load step duration.
Required in case of load or wash recycling.
"""
self.equilibration_cv: float = -1
"""Duration of equilibration step.
The values of :attr:`equilibration_t` and
:attr:`equilibration_cv` are added together.
"""
self.equilibration_t: float = -1
"""Duration of equilibration step.
The values of :attr:`equilibration_t` and
:attr:`equilibration_cv` are added together.
"""
self.equilibration_f: float = -1
"""Equilibration step flow rate.
Equilibration step flow rate should be defined by
exactly one of the following attributes:
* :attr:`equilibration_f` (this one)
* :attr:`equilibration_f_rel`
"""
self.equilibration_f_rel: float = 1
"""Equilibration step flow rate relative to load flow rate.
Default = 1.
Equilibration step flow rate = :attr:`equilibration_f_rel`
* `load flow rate`
Equilibration step flow rate should be defined by
exactly one of the following attributes:
* :attr:`equilibration_f`
* :attr:`equilibration_f_rel` (this one)
"""
# Duration of the load phase.
self.load_cv: float = -1 # load duration in CV
"""Load phase duration in CV.
This is preferable way to define the duration of the load step
as it does not require any estimations about steady state.
Load phase duration should be defined by exactly one of
the following attribute groups:
* :attr:`load_cv` (this one)
* :attr:`load_c_end_ss`
* :attr:`load_c_end_relative_ss`
Notes
-----
First load step can be extended by setting
:attr:`load_extend_first_cycle`,
:attr:`load_extend_first_cycle_cv` and
:attr:`load_extend_first_cycle_t`.
"""
self.load_c_end_ss: _typing.Optional[_np.ndarray] = None
"""Load phase switch based on target product breakthrough conc.
Load phase duration is estimated from simulating steady state
operation and determining when the breakthrough reaches
specified concentration.
Steady state simulation requires
:attr:`column_porosity_retentate`
:attr:`load_recycle_pdf`.
Load phase duration should be defined by exactly one of
the following attribute groups:
* :attr:`load_cv` (preferred)
* :attr:`load_c_end_ss` (this one)
* :attr:`load_c_end_relative_ss`
Notes
-----
First load step can be extended by setting
:attr:`load_extend_first_cycle`,
:attr:`load_extend_first_cycle_cv` and
:attr:`load_extend_first_cycle_t`.
"""
self.load_c_end_relative_ss: float = -1
"""Load phase switch based on relative breakthrough conc.
Load phase duration is estimated from simulating steady state
operation and determining when the product (binding species)
in the breakthrough reaches specified relative concentration
(relative to load concentration in steady-state operation).
Steady state simulation requires
:attr:`column_porosity_retentate`
:attr:`load_recycle_pdf`.
Load phase duration should be defined by exactly one of
the following attribute groups:
* :attr:`load_cv` (preferred)
* :attr:`load_c_end_ss`
* :attr:`load_c_end_relative_ss` (this one)
Notes
-----
First load step can be extended by setting
:attr:`load_extend_first_cycle`,
:attr:`load_extend_first_cycle_cv` and
:attr:`load_extend_first_cycle_t`.
"""
self.load_c_end_estimate_with_iterative_solver: bool = True
"""Finer optimization of cycle length estimation.
Default = `True`.
In case load step duration is estimated based of breakthrough
criteria (i.e. by :attr:`load_c_end_ss` or
:attr:`load_c_end_relative_ss`), the model needs to simulate
steady-state operation in order to determine fixed load time.
This parameters enables iterative solver that allows more
precise estimation but might slow down the simulation.
Notes
-----
Max number of iteration steps is defined by
:attr:`load_c_end_estimate_with_iter_solver_max_iter`.
"""
self.load_c_end_estimate_with_iter_solver_max_iter: int = 1000
"""Max steps for optimization of cycle length estimation.
Default = 1000.
See Also
--------
:attr:`load_c_end_estimate_with_iterative_solver`
"""
self.load_extend_first_cycle: bool = False
"""Extend first load phase to achieve a faster steady-state.
Only relevant in case wash or load is recycled.
The duration of extension is defined by:
* :attr:`load_extend_first_cycle_cv` or
* :attr:`load_extend_first_cycle_t` or
* is determined automatically.
"""
self.load_extend_first_cycle_cv: float = -1
"""Duration of first load phase extension in column volumes.
Only relevant if :attr:`load_extend_first_cycle` is `True`.
If the duration if defined by
:attr:`load_extend_first_cycle_cv` and
:attr:`load_extend_first_cycle_t`
then the values are added together.
"""
self.load_extend_first_cycle_t: float = -1
"""Duration of first load phase extension (time).
Only relevant if :attr:`load_extend_first_cycle` is `True`.
If the duration if defined by
:attr:`load_extend_first_cycle_cv` and
:attr:`load_extend_first_cycle_t`
then the values are added together.
"""
self.load_target_lin_velocity: float = -1
"""Target load linear velocity.
It is used to provide information about required column height.
It does not have any impact on the rest of the model.
Units need to match other units in the model.
"""
self.wash_cv: float = -1
"""Duration of wash step.
The values of :attr:`wash_t` and
:attr:`wash_cv` are added together.
"""
self.wash_t: float = -1
"""Duration of wash step.
The values of :attr:`wash_t` and
:attr:`wash_cv` are added together.
"""
self.wash_f: float = -1
"""Wash step flow rate.
Wash step flow rate should be defined by
exactly one of the following attributes:
* :attr:`wash_f` (this one)
* :attr:`wash_f_rel`
"""
self.wash_f_rel: float = 1
"""Wash step flow rate relative to load flow rate. Default = 1.
Wash step flow rate = :attr:`wash_f_rel`
* `load flow rate`
Wash step flow rate should be defined by
exactly one of the following attributes:
* :attr:`wash_f`
* :attr:`wash_f_rel` (this one)
"""
self.unaccounted_losses_rel: float = 0
"""Unaccounted losses as a share of bound material.
Elution peak is scaled down by 1 - `unaccounted_losses_rel`
before applying peak cut criteria.
"""
self.elution_cv: float = -1
"""Duration of elution step.
The values of :attr:`elution_t` and
:attr:`elution_cv` are added together.
"""
self.elution_t: float = -1
"""Duration of elution step.
The values of :attr:`elution_t` and
:attr:`elution_cv` are added together.
"""
self.elution_f: float = -1
"""Elution step flow rate.
Elution step flow rate should be defined by
exactly one of the following attributes:
* :attr:`elution_f` (this one)
* :attr:`elution_f_rel`
"""
self.elution_f_rel: float = 1
"""Elution step flow rate relative to load flow rate.
Default = 1.
Elution step flow rate = :attr:`elution_f_rel`
* `load flow rate`
Elution step flow rate should be defined by
exactly one of the following attributes:
* :attr:`elution_f`
* :attr:`elution_f_rel` (this one)
"""
self.elution_buffer_c: _np.ndarray = _np.array([])
"""Elution buffer composition.
Default = empty array (= all components are 0).
If defined it must have a value for each specie.
"""
self.elution_peak_position_cv: float = -1
"""Position (cv) of elution peak in the elution step.
This is for 1st moment or mean residence time (and not
necessarily peak max position).
The values of :attr:`elution_peak_position_t` and
:attr:`elution_peak_position_cv` are added together.
"""
self.elution_peak_position_t: float = -1
"""Position (time) of elution peak in the elution step.
This is for 1st moment or mean residence time (and not
necessarily peak max position).
The values of :attr:`elution_peak_position_t` and
:attr:`elution_peak_position_cv` are added together.
"""
self.elution_peak_cut_start_t: float = -1
"""Elution peak cut start (time).
Exactly one peak cut start criteria should be defined.
"""
self.elution_peak_cut_start_cv: float = -1
"""Elution peak cut start (cv).
Exactly one peak cut start criteria should be defined.
"""
self.elution_peak_cut_start_c_rel_to_peak_max: float = -1
"""Elution peak cut start (signal relative to peak max).
Exactly one peak cut start criteria should be defined.
"""
self.elution_peak_cut_start_peak_area_share: float = -1
"""Elution peak cut start (share of total peak area).
Exactly one peak cut start criteria should be defined.
"""
self.elution_peak_cut_end_t: float = -1
"""Elution peak cut end (time).
Exactly one peak cut end criteria should be defined.
"""
self.elution_peak_cut_end_cv: float = -1
"""Elution peak cut end (cv).
Exactly one peak cut end criteria should be defined.
"""
self.elution_peak_cut_end_c_rel_to_peak_max: float = -1
"""Elution peak cut end (signal relative to peak max).
Exactly one peak cut end criteria should be defined.
"""
self.elution_peak_cut_end_peak_area_share: float = -1
"""Elution peak cut end (share of total peak area).
Exactly one peak cut end criteria should be defined.
"""
self.regeneration_cv: float = -1
"""Duration of regeneration step.
The values of :attr:`regeneration_t` and
:attr:`regeneration_cv` are added together.
"""
self.regeneration_t: float = -1
"""Duration of regeneration step.
The values of :attr:`regeneration_t` and
:attr:`regeneration_cv` are added together.
"""
self.regeneration_f: float = -1
"""Regeneration step flow rate.
Regeneration step flow rate should be defined by
exactly one of the following attributes:
* :attr:`regeneration_f` (this one)
* :attr:`regeneration_f_rel`
"""
self.regeneration_f_rel: float = 1
"""Regeneration step flow rate relative to load flow rate.
Default = 1.
Regeneration step flow rate = :attr:`regeneration_f_rel`
* `load flow rate`
Regeneration step flow rate should be defined by
exactly one of the following attributes:
* :attr:`regeneration_f`
* :attr:`regeneration_f_rel` (this one)
"""
self.wash_desorption: bool = False
"""Enable wash desorption.
Make sure the class implements the desorption dynamics.
"""
self.load_recycle: bool = False
"""Recycle load breakthrough. Default = False."""
self.load_recycle_pdf: _typing.Optional[_core.PDF] = None
"""PDF of wash and/or unbound load traveling through the column.
The unbound (not captured) part and desorbed part are propagated
through the column by :attr:`load_recycle_pdf`.
Void volume for :attr:`load_recycle_pdf` is defined as
:attr:`column_porosity_retentate` * `column volume`.
"""
self.wash_recycle: bool = False
"""Recycle wash. Default = False.
Wash is recycled onto 3rd column while the 2nd is on load step.
After the wash recycle, the 3rd column is connected to 2nd
column to recycle load breakthrough material.
"""
self.wash_recycle_duration_cv: float = -1
"""Duration of wash recycle (cv).
Relevant if :attr:`wash_recycle` is `True`.
If both (`wash_recycle_duration_cv` and
:attr:`wash_recycle_duration_t`) are defined, then the values
are added together. If none of those is defined, then the
entire wash step is recycled.
"""
self.wash_recycle_duration_t: float = -1
"""Duration of wash recycle (time).
Relevant if :attr:`wash_recycle` is `True`.
If both (`wash_recycle_duration_t` and
:attr:`wash_recycle_duration_cv`) are defined, then the values
are added together. If none of those is defined, then the
entire wash step is recycled.
"""
@_core.UnitOperation.log.setter
def log(self, logger: _core._logger.RtdLogger):
"""Propagates logger across other elements that support it."""
# Default logic.
self._logger = logger
self._logger.set_data_tree(self._log_entity_id, self._log_tree)
# Propagate logger across other elements with logging.
if self.load_recycle_pdf is not None:
self.load_recycle_pdf.set_logger_from_parent(self.uo_id, logger)
if self.load_recycle_pdf is not None:
self.elution_peak_shape.set_logger_from_parent(self.uo_id, logger)
if self.load_recycle_pdf is not None:
self.load_bt.set_logger_from_parent(self.uo_id, logger)
def _get_flow_value(self,
step_name: str, var_name: str,
flow: float, rel_flow: float) -> float:
"""Calc flow rate of chromatographic step.
If `flow` is specified, `flow` is used.
Otherwise `rel_flow` == flow rate relative to load flow rate is
used.
If none are positive, then the load flow rate is used
and a warning is logged.
Parameters
----------
step_name
Step name (e.g. "Wash") for log messages.
var_name
Step variable name (e.g. "wash_t") for log data.
flow
Flow rate.
rel_flow
Flow rate relative to load flow rate.
Returns
-------
float
Flow rate.
"""
if flow > 0:
self.log.i_data(self._log_tree, var_name, flow)
elif rel_flow > 0:
flow = rel_flow * self._load_f
self.log.i_data(self._log_tree, var_name, flow)
else:
self.log.w(f"{step_name} step flow rate is not defined,"
f" using load flow rate instead.")
flow = self._load_f
return flow
def _get_time_value(self,
step_name: str, var_name: str,
t: float, cv: float, flow: float) -> float:
"""Calc duration of chromatographic step.
If the step duration is specified in cv and in t, then the
value are added together.
Parameters
----------
step_name
Step name (e.g. "Wash") for log messages.
var_name
Step variable name (e.g. "wash_t") for log data.
t
Duration (time).
cv
Duration (cv).
flow
Flow rate (required if `cv` > 0).
Returns
-------
float
Total step duration (time).
"""
# Calc.
t_sum = max(t, 0)
if cv > 0:
assert flow > 0, f"{step_name}: Flow rate must be defined (> 0)" \
f" if the duration is specified in CVs."
assert self._cv > 0, f"CV must be determined (by `calc_cv`)" \
f" before calculating duration based on CVs."
t_sum += cv * self._cv / flow # sum
# Log.
if t <= 0 and cv <= 0:
self.log.w(step_name + " time is not defined")
else:
self.log.i_data(self._log_tree, var_name, t_sum)
return t_sum
def _assert_non_binding_species(self):
"""Make sure binding species list is valid."""
if len(self.non_binding_species) > 0:
assert max(self.non_binding_species) < self._n_species, \
"Index of non_binding_species too large (indexes start with 0)"
assert list(set(self.non_binding_species)) \
== list(self.non_binding_species), \
"List of non_binding_species should have ascending order"
assert len(self.non_binding_species) < self._n_species, \
"All species cannot be non-binding."
# Log.
self.log.i_data(self._log_tree,
'non_binding_species',
self.non_binding_species)
def _calc_load_f(self):
"""Determine load flow rate (when on)."""
assert self._is_flow_box_shaped(), "Inlet flow must be box shaped."
self._load_f = self._f.max()
self.log.d_data(self._log_tree, 'load_f', self._load_f)
def _calc_cv(self):
"""Determine column volume."""
self._ensure_single_non_negative_parameter(
log_level_multiple=self.log.ERROR, log_level_none=self.log.ERROR,
cv=self.cv,
ft_mean_retentate=self.ft_mean_retentate,
)
if self.cv > 0:
self._cv = self.cv
else: # `self.ft_mean_retentate` > 0.
assert self.column_porosity_retentate > 0, \
f"porosity_retentate must be defined to calc CV from " \
f" `self.ft_mean_retentate`."
assert self._load_f > 0, f"Load flow rate must be defined to" \
f" calc CV from `self.ft_mean_retentate`."
self._cv = self.ft_mean_retentate * self._load_f \
/ self.column_porosity_retentate
# Log.
self.log.i_data(self._log_tree, 'cv', self._cv)
def _report_column_dimensions(self):
"""Report column dimensions based on load linear velocity."""
if self.load_target_lin_velocity > 0:
self._col_h = self._cv * self.load_target_lin_velocity \
/ self._load_f
self.log.i_data(self._log_tree, "column_h", self._col_h)
self.log.i_data(self._log_tree,
"column_d",
(self._cv / self._col_h / _np.pi) ** 0.5 * 2)
def _calc_equilibration_t(self):
"""Determine equilibration step duration."""
if self.equilibration_cv > 0:
# Flow rate.
eq_f = self._get_flow_value("Equilibration",
"equilibration_f",
self.equilibration_f,
self.equilibration_f_rel)
# Duration.
self._equilibration_t = self._get_time_value("Equilibration",
"equilibration_t",
self.equilibration_t,
self.equilibration_cv,
eq_f)
else:
# Duration.
self._equilibration_t = max(self.equilibration_t, 0)
# Log.
self.log.i_data(self._log_tree,
'equilibration_t',
self._equilibration_t)
def _calc_wash_t_and_f(self):
"""Determine wash step flow rate and duration."""
# Flow rate.
self._wash_f = self._get_flow_value("Wash",
"wash_f",
self.wash_f,
self.wash_f_rel)
# Duration.
self._wash_t = self._get_time_value("Wash",
"wash_t",
self.wash_t,
self.wash_cv,
self._wash_f)
def _calc_elution_t_and_f(self):
"""Determine elution step flow rate and duration."""
# Flow rate.
self._elution_f = self._get_flow_value("Elution",
"elution_f",
self.elution_f,
self.elution_f_rel)
# Duration.
self._elution_t = self._get_time_value("Elution",
"elution_t",
self.elution_t,
self.elution_cv,
self._elution_f)
def _calc_elution_peak_t(self):
"""Determine elution peak mean position (1st momentum)."""
self._elution_peak_t = self._get_time_value(
"elution peak position",
"elution_peak_position_t",
self.elution_peak_position_t,
self.elution_peak_position_cv,
self._elution_f
)
def _update_elution_peak_pdf(self):
"""Update elution peak PDF."""
assert self._elution_peak_t > 0
assert self._elution_f > 0
# Calc elution peak shape.
self.elution_peak_shape.update_pdf(
rt_mean=self._elution_peak_t,
v_void=self._elution_peak_t * self._elution_f,
f=self._elution_f
)
self._p_elution_peak = \
self.elution_peak_shape.get_p() * (1 - self.unaccounted_losses_rel)
self.log.d_data(self._log_tree,
"p_elution_peak",
self._p_elution_peak)
def _calc_elution_peak_cut_i_start_and_i_end(self):
"""Calc elution peak cut start and end in form of time steps.
Values are relative to the beginning of the elution step.
"""
elution_peak_pdf: _np.ndarray = self._p_elution_peak.copy()
# Peak cut start.
self._ensure_single_non_negative_parameter(
log_level_multiple=self.log.ERROR, log_level_none=self.log.WARNING,
elution_peak_cut_start_peak_area_share=self
.elution_peak_cut_start_peak_area_share,
elution_peak_cut_start_c_rel_to_peak_max=self
.elution_peak_cut_start_c_rel_to_peak_max,
elution_peak_cut_start_cv=self.elution_peak_cut_start_cv,
elution_peak_cut_start_t=self.elution_peak_cut_start_t
)
# Calc `elution_peak_cut_start_i`.
if self.elution_peak_cut_start_peak_area_share >= 0:
elution_peak_cut_start_i = _utils.vectors.true_start(
_np.cumsum(elution_peak_pdf * self._dt)
>= self.elution_peak_cut_start_peak_area_share
)
elif self.elution_peak_cut_start_c_rel_to_peak_max >= 0:
elution_peak_cut_start_i = _utils.vectors.true_start(
elution_peak_pdf
>= self.elution_peak_cut_start_c_rel_to_peak_max
* elution_peak_pdf.max()
)
elif self.elution_peak_cut_start_cv >= 0:
elution_peak_cut_start_i = \
int(self.elution_peak_cut_start_cv
* self._cv / self._elution_f / self._dt)
elif self.elution_peak_cut_start_t >= 0:
elution_peak_cut_start_i = \
int(self.elution_peak_cut_start_t / self._dt)
else:
self.log.w(f"Elution peak cut start is not defined."
f" Now collecting from the beginning"
f" of the elution phase.")
elution_peak_cut_start_i = 0
# Log.
self.log.i_data(self._log_tree,
"elution_peak_cut_start_i",
elution_peak_cut_start_i)
self.log.i_data(self._log_tree,
"elution_peak_cut_start_t",
elution_peak_cut_start_i * self._dt)
# Peak cut end.
self._ensure_single_non_negative_parameter(
log_level_multiple=self.log.ERROR, log_level_none=self.log.WARNING,
elution_peak_cut_end_peak_area_share=self
.elution_peak_cut_end_peak_area_share,
elution_peak_cut_end_c_rel_to_peak_max=self
.elution_peak_cut_end_c_rel_to_peak_max,
elution_peak_cut_end_cv=self.elution_peak_cut_end_cv,
elution_peak_cut_end_t=self.elution_peak_cut_end_t,
)
# Calc `elution_peak_cut_end_i`.
if self.elution_peak_cut_end_peak_area_share >= 0:
elution_peak_cut_end_i = _utils.vectors.true_start(
_np.cumsum(elution_peak_pdf * self._dt)
>= (1 - self.elution_peak_cut_end_peak_area_share)
)
elif self.elution_peak_cut_end_c_rel_to_peak_max >= 0:
elution_peak_cut_end_i = _utils.vectors.true_end(
elution_peak_pdf
>= self.elution_peak_cut_end_c_rel_to_peak_max
* elution_peak_pdf.max()
)
elif self.elution_peak_cut_end_cv >= 0:
elution_peak_cut_end_i = \
int(self.elution_peak_cut_end_cv
* self._cv / self._elution_f / self._dt)
elif self.elution_peak_cut_end_t >= 0:
elution_peak_cut_end_i = \
_utils.vectors.true_end(self._t < self.elution_peak_cut_end_t)
else:
self.log.w(f"Elution peak cut end is not defined."
f" Now collecting to the end of the elution phase.")
elution_peak_cut_end_i = elution_peak_pdf.size
self._elution_peak_cut_start_i = elution_peak_cut_start_i
self._elution_peak_cut_end_i = elution_peak_cut_end_i
# Log.
self.log.i_data(self._log_tree,
"elution_peak_cut_end_i",
elution_peak_cut_end_i)
self.log.i_data(self._log_tree,
"elution_peak_cut_end_t",
elution_peak_cut_end_i * self._dt)
if self._elution_peak_cut_end_i * self._dt < self._elution_peak_t:
self.log.w(f"Peak end is cut before its maximum.")
if self._elution_peak_cut_end_i * self._dt > self._elution_t:
self.log.w(f"Peak cut end exceeds elution step duration.")
def _calc_elution_peak_mask(self):
"""Calc where the elution peak gets collected."""
self._elution_peak_mask = \
_np.ones(int(round(self._elution_t / self._dt)), dtype=bool)
self._elution_peak_mask[self._elution_peak_cut_end_i:] = False
self._elution_peak_mask[:self._elution_peak_cut_start_i] = False
self.log.d_data(self._log_tree,
"elution_peak_interval",
self._elution_peak_mask)
def _update_load_btc(self):
"""Update load breakthrough profile."""
assert self._cv > 0, "CV must be defined by now."
self.load_bt.update_btc_parameters(cv=self._cv)
def _calc_regeneration_t(self):
"""Calc regeneration step duration."""
if self.regeneration_cv > 0:
eq_f = self._get_flow_value("Regeneration",
"regeneration_f",
self.regeneration_f,
self.regeneration_f_rel)
self._regeneration_t = self._get_time_value("Regeneration",
"regeneration_t",
self.regeneration_t,
self.regeneration_cv,
eq_f)
else:
self._regeneration_t = max(self.regeneration_t, 0)
# Log.
self.log.i_data(self._log_tree, 'regeneration_t', self._regeneration_t)
def _update_load_recycle_pdf(self, flow):
"""Update pdf that describes propagation of recycled material.
Recycled material si composed of unbound (load) and desorbed
(wash) material throughout the column.
`self.load_recycle_pdf` gets updated.
"""
assert self.load_recycle_pdf is not None, \
f"`load_recycle_pdf` must be defined by now."
assert self.column_porosity_retentate > 0, \
f"Retentate porosity must be defined by now."
assert self._cv > 0, "CV must be defined by now."
v_void = self._cv * self.column_porosity_retentate
self.load_recycle_pdf.update_pdf(v_void=v_void,
f=flow,
rt_mean=v_void / flow)
self._p_load_recycle_pdf = self.load_recycle_pdf.get_p()
def _calc_load_recycle_wash_i(self):
"""Calculate wash recycle duration in form of time steps."""
if self.wash_recycle_duration_t > 0 \
or self.wash_recycle_duration_cv > 0:
self._wash_recycle_i_duration = int(self._get_time_value(
"Wash recycle", "load_wash_recycle_t",
self.wash_recycle_duration_t,
self.wash_recycle_duration_cv,
self._wash_f
) / self._dt)
else:
# Same as wash duration.
assert self._wash_t > 0
self._wash_recycle_i_duration = int(round(self._wash_t / self._dt))
def _get_load_bt_cycle_switch_criteria(self,
load_c_ss: _np.ndarray
) -> _np.ndarray:
"""Get steady-state cycle switch (== end of load) criteria.
Parameters
----------
load_c_ss
Load concentration during steady state operation.
Returns
-------
ndarray
Threshold concentration for load breakthrough.
"""
assert self.load_c_end_ss is not None \
or self.load_c_end_relative_ss > 0, \
f"Load step duration should be defined!"
if self.load_c_end_ss is not None:
load_c_end_ss = self.load_c_end_ss
if self.load_c_end_relative_ss > 0:
self.log.w(f"Cycle time defined by `load_c_end_ss`"
f" and `load_c_end_relative_ss`."
f" Simulation is using `load_c_end_ss`.")
else: # self.load_c_end_relative_ss > 0
load_c_end_ss = self.load_c_end_relative_ss * load_c_ss
# Log.
self.log.i_data(self._log_tree,
'load_c_end_ss',
load_c_end_ss)
return load_c_end_ss
# noinspection DuplicatedCode
def _calc_cycle_t(self):
"""Calculates cycle time (== load time for a single column).
Optional delay of first cycle is not part of this calculation.
"""
assert self._cv > 0
assert self._load_f > 0
if self.load_cv > 0:
t_cycle = self.load_cv * self._cv / self._load_f
if self.load_c_end_ss is not None \
or self.load_c_end_relative_ss > 0:
self.log.w(f"Cycle time defined in more than one way."
f" Simulation is using `load_cv`.")
else:
# Get bt profile for constant inlet.
# Inlet conc.
binding_species = [i for i in range(self._n_species)
if i not in self.non_binding_species]
load_c_ss = self._estimate_steady_state_mean_c(binding_species)
# Simulate first cycle at constant load concentration.
f_first_load = self._load_f * _np.ones(self._t.size)
c_first_load = load_c_ss * _np.ones([len(binding_species),
self._t.size])
bt_first_load: _np.ndarray = \
load_c_ss - self.load_bt.calc_c_bound(f_first_load,
c_first_load)
# Propagate breakthrough.
bt_first_load_out, bt_first_wash_out = \
self._sim_c_recycle_propagation(f_first_load,
bt_first_load,
None)
# Calc cycle duration.
load_c_end_ss = self._get_load_bt_cycle_switch_criteria(load_c_ss)
# noinspection PyTypeChecker
i_t_first_cycle = _utils.vectors.true_start(
bt_first_load_out.sum(0) >= load_c_end_ss.sum())
t_cycle = i_t_first_cycle * self._dt
# Wash desorption.
if self.wash_desorption and self.wash_recycle:
c_wash_desorbed = self._sim_c_wash_desorption(
f_first_load[:i_t_first_cycle],
c_first_load[:, :i_t_first_cycle]
- bt_first_load[:, :i_t_first_cycle])
else:
c_wash_desorbed = None
bt_first_load_out, bt_first_wash_out = \
self._sim_c_recycle_propagation(
f_first_load[:i_t_first_cycle],
bt_first_load[:, :i_t_first_cycle],
c_wash_desorbed)
if self.load_recycle:
if not self.load_c_end_estimate_with_iterative_solver:
self.log.w(f"Estimating cycle duration:"
f" Assuming sharp breakthrough profile.")
i_load_recycle_start = self._wash_recycle_i_duration \
if self.wash_recycle else 0
m_load_recycle = \
bt_first_load_out[
:,
i_load_recycle_start:i_t_first_cycle
].sum() * self._load_f * self._dt
_t_diff = m_load_recycle / self._load_f / load_c_ss.sum()
t_cycle -= _t_diff
self._load_recycle_m_ss = m_load_recycle
self.log.i_data(self._log_tree,
'm_load_recycle_ss',
m_load_recycle)
self.log.i_data(self._log_tree,
'shorten_cycle_t_due_to_bt_recycle',
_t_diff)
if self.wash_recycle:
if not self.load_c_end_estimate_with_iterative_solver:
self.log.w(f"Estimating cycle duration:"
f" Assuming sharp breakthrough profile.")
m_wash_recycle = bt_first_wash_out[
:,
:self._wash_recycle_i_duration
].sum() * self._wash_f * self._dt
_t_diff = m_wash_recycle / self._load_f / load_c_ss.sum()
t_cycle -= _t_diff
self._wash_recycle_m_ss = m_wash_recycle
self.log.i_data(self._log_tree,
'm_wash_recycle_ss',
m_wash_recycle)
self.log.i_data(self._log_tree,
'shorten_cycle_t_due_to_wash_recycle',
_t_diff)
if self.load_c_end_estimate_with_iterative_solver \
and (self.wash_recycle or self.load_recycle):
c_load_fist_cycle = load_c_ss * _np.ones([len(binding_species),
i_t_first_cycle * 2])
def sim_cycle(f_load: _np.ndarray,
c_load: _np.ndarray,
i_prev_cycle: int) -> _typing.Tuple[_np.ndarray,
_np.ndarray,
int]:
"""Simulates load-wash cycle. Calc load duration.
Load duration is determined based on breakthrough
criteria.
Parameters
----------
f_load
Load flow rate profile.
c_load
Load conc profile.
i_prev_cycle
Previous cycle duration in time steps.
Returns
-------
f_load_next_cycle
Load and wash breakthrough flow rate profile.
c_load_next_cycle
Load and wash breakthrough conc profile.
i_cycle
Current cycle duration in time steps.
"""
# Load.
bt_load: _np.ndarray = \
c_load - self.load_bt.calc_c_bound(f_load, c_load)
# Propagate breakthrough.
bt_load_out, _ = self._sim_c_recycle_propagation(
f_load,
bt_load,
None)
# 'Stop' load at specified breakthrough criteria.
# noinspection PyTypeChecker
i_cycle_duration = _utils.vectors.true_start(
bt_load_out.sum(0) >= load_c_end_ss.sum())
# Cut load at specified time.
bt_load = bt_load[:, :i_cycle_duration]
# Wash desorption.
if self.wash_desorption and self.wash_recycle:
c_first_wash_desorbed = self._sim_c_wash_desorption(
f_load[:i_cycle_duration],
c_load[:, :i_cycle_duration]
- bt_load[:, :i_cycle_duration])
else:
c_first_wash_desorbed = None
# Propagate load and wash leftovers.
bt_load_out, bt_wash_out = self._sim_c_recycle_propagation(
f_load[:i_cycle_duration],
bt_load,
c_first_wash_desorbed)
# Construct load for next cycle.
# Recycle load.
if self.load_recycle:
rec_load = bt_load_out[:,
i_prev_cycle:i_cycle_duration]
else:
rec_load = _np.zeros_like(
bt_load_out[:, i_prev_cycle:i_cycle_duration])
# Next load profiles.
c_next_load = _np.concatenate((rec_load,
c_load_fist_cycle),
axis=1)
f_next_load = self._load_f * _np.ones(c_next_load.shape[1])
wash_recycle_i_duration = self._wash_recycle_i_duration \
if self.wash_recycle else 0
# Log.
m_load_recycle_ss = \
bt_first_load_out[
:,
wash_recycle_i_duration:i_t_first_cycle
].sum() * self._load_f * self._dt
self._load_recycle_m_ss = m_load_recycle_ss
self.log.i_data(self._log_tree,
'm_load_recycle_ss',
m_load_recycle_ss)
# Recycle wash.
if self.wash_recycle:
c_next_load[:, :self._wash_recycle_i_duration] = \
bt_wash_out[:, :self._wash_recycle_i_duration]
f_next_load[:self._wash_recycle_i_duration] = \
self._wash_f
m_wash_recycle_ss = \
bt_wash_out[:,
:self._wash_recycle_i_duration
].sum() * self._wash_f * self._dt
self._wash_recycle_m_ss = m_wash_recycle_ss
self.log.i_data(self._log_tree,
'm_wash_recycle_ss',
m_wash_recycle_ss)
# Return next load and cycle duration.
return f_next_load, c_next_load, \
i_cycle_duration - i_prev_cycle
f_load_cycle = \
self._load_f * _np.ones(c_load_fist_cycle.shape[1])
c_load_cycle = c_load_fist_cycle
i_t_cycle_prev = i_t_first_cycle
i_t_cycle_estimate = 0
# Loop until cycle duration converges.
for i in range(
self.load_c_end_estimate_with_iter_solver_max_iter):
if abs(i_t_cycle_prev - i_t_cycle_estimate) <= 1:
self.log.i_data(self._log_tree,
"t_cycle_optimization_loop_iter",
i)
break
i_t_cycle_prev = i_t_cycle_estimate
f_load_cycle, c_load_cycle, i_t_cycle_estimate = \
sim_cycle(f_load_cycle, c_load_cycle, i_t_cycle_prev)
# print([i, i_t_cycle_prev, i_t_cycle_estimate])
if abs(i_t_cycle_prev - i_t_cycle_estimate) > 1:
self.log.w("Cycle duration estimator did not converge.")
t_cycle = i_t_cycle_estimate * self._dt
elif self.load_c_end_estimate_with_iterative_solver:
self.log.i(f"No need to use iterative solver in case of"
f" no recycling of load and/or wash.")
self._cycle_t = t_cycle
self.log.i_data(self._log_tree, 'cycle_t', t_cycle)
# noinspection DuplicatedCode
def _calc_first_cycle_extension_t(self):
"""Calc extension of first load.
First load step might be extended for processes with load and/or
wash recycle in order to get faster into steady-state regime.
"""
if not self.load_recycle and not self.wash_recycle:
self.log.w(f"Estimation of first cycle extension requested"
f" on a process without load recycle.")
self._first_cycle_extension_t = 0
return
elif not self.load_extend_first_cycle:
self.log.w(f"Estimation of first cycle extension requested"
f" on a process without extended first cycle.")
self._first_cycle_extension_t = 0
return
elif self.load_extend_first_cycle_t > 0:
self._first_cycle_extension_t = self.load_extend_first_cycle_t
return
elif self.load_extend_first_cycle_cv >= 0:
assert self._cv > 0, "CV should be defined by now."
assert self._load_f > 0, "Load flow rate should be defined by now."
self._first_cycle_extension_t = \
self.load_extend_first_cycle_cv * self._cv / self._load_f
elif self.load_cv > 0:
raise NotImplementedError(
f"Estimation of first cycle extension is only supported"
f" if the cycle length is defined by breakthrough cutoff"
f" criteria. This is due to the fact that if all the"
f" breakthrough material gets recycles,"
f" there is no single steady-state.")
else:
binding_species = [i for i in range(self._n_species)
if i not in self.non_binding_species]
load_c_ss = self._estimate_steady_state_mean_c(binding_species)
# simulate first cycle at constant load concentration
f_first_load = self._load_f * _np.ones(self._t.size)
c_first_load = load_c_ss * _np.ones([len(binding_species),
self._t.size])
bt_first_load: _np.ndarray = \
load_c_ss - self.load_bt.calc_c_bound(f_first_load,
c_first_load)
# propagate breakthrough
bt_first_load_out, _ = \
self._sim_c_recycle_propagation(f_first_load,
bt_first_load,
None)
load_c_end_ss = self._get_load_bt_cycle_switch_criteria(load_c_ss)
# noinspection PyTypeChecker
i_t_first_cycle = _utils.vectors.true_start(
bt_first_load_out.sum(0) >= load_c_end_ss.sum())
dm = 0
if self.load_recycle:
assert hasattr(self, "_load_recycle_m_ss"), \
f"Function `_calc_cycle_t()` should already be called."
dm += self._load_recycle_m_ss
if self.wash_recycle:
assert hasattr(self, "_wash_recycle_m_ss"), \
f"Function `_calc_cycle_t()` should already be called."
dm += self._wash_recycle_m_ss
di = 0
if dm > 0:
m_ext_bt = _np.cumsum(
bt_first_load_out.sum(0)[i_t_first_cycle:]
) * self._load_f * self._dt
di += _utils.vectors.true_start(m_ext_bt >= dm)
self._first_cycle_extension_t = di * self._dt
def _calc_cycle_start_i_list(self):
"""Calculate load switch positions in form of time steps."""
assert self._cycle_t > 0, \
f"Cycle length must have been determined" \
f" (by `_calc_cycle_t()`) by now"
flow_i_start, flow_i_end = \
_utils.vectors.true_start_and_end(self._f > 0)
if self.load_extend_first_cycle:
assert self._first_cycle_extension_t >= 0, \
f"Prolong of first load cycle is set to `True`," \
f" but the length is undefined."
if self._first_cycle_extension_t == 0:
self.log.w(f"Prolong of first load cycle is set to `True`,"
f" but the length of the extension is 0.")
load_extend_first_cycle_t = self._first_cycle_extension_t
self.log.i_data(self._log_tree,
"load_extend_first_cycle_t",
load_extend_first_cycle_t)
else:
load_extend_first_cycle_t = 0
cycle_start_t_list = _np.arange(
self._t[flow_i_start] + load_extend_first_cycle_t,
self._t[flow_i_end - 1],
self._cycle_t
)
cycle_start_t_list[0] = self._t[flow_i_start]
self._cycle_start_i_list = _np.rint(
cycle_start_t_list / self._dt).astype(_np.int32)
self.log.i_data(self._log_tree,
"cycle_start_t_list",
cycle_start_t_list)
def _prepare_simulation(self):
"""Prepare everything before cycle-by-cycle simulation."""
self._assert_non_binding_species()
self._calc_load_f()
self._calc_cv() # might depend on load_f
self._report_column_dimensions() # optional
# Equilibration.
self._calc_equilibration_t()
# Wash.
self._calc_wash_t_and_f()
# Elution.
self._calc_elution_t_and_f()
self._calc_elution_peak_t()
self._update_elution_peak_pdf()
self._calc_elution_peak_cut_i_start_and_i_end()
self._calc_elution_peak_mask()
# Regeneration.
self._calc_regeneration_t()
# Prepare for estimation of cycle length.
self._update_load_btc()
if self.load_recycle:
self._update_load_recycle_pdf(self._wash_f)
if self.wash_recycle:
self._calc_load_recycle_wash_i()
# Cycle time.
self._calc_cycle_t()
if self.load_extend_first_cycle:
self._calc_first_cycle_extension_t()
# Cycle start positions == column load switch time points.
self._calc_cycle_start_i_list()
# Make sure cycle duration is long enough.
_t_cycle_except_load = self._equilibration_t + self._wash_t \
+ self._elution_t + self._regeneration_t
if self._cycle_t < _t_cycle_except_load:
self.log.e(f"Load step ({self._cycle_t}) should not be shorter"
f" than eq_t + wash_t + elution_t + regeneration_t"
f" ({_t_cycle_except_load: .6})!")
def _sim_c_load_binding(self,
f_load: _np.ndarray,
c_load: _np.ndarray
) -> _typing.Tuple[_np.ndarray, _np.ndarray]:
"""Determine what part of load binds.
Load in this context might also contain wash and load recycle
from previous steps.
Parameters
----------
f_load
Load flow rate profile.
c_load
Load concentration profile.
Returns
-------
c_bound
Conc profile of bound material.
c_unbound
Conc profile of unbound material = `c_load` - `c_bound`.
"""
assert f_load.size == c_load.shape[1], \
"f_load and c_load must have the same length"
assert c_load.shape[0] == \
self._n_species - len(self.non_binding_species), \
"c_load must contain all binding species"
c_bound = self.load_bt.calc_c_bound(f_load, c_load)
# Returns bound and unbound part.
return c_bound, c_load - c_bound
def _sim_c_wash_desorption(self,
f_load: _np.ndarray,
c_bound: _np.ndarray) -> _np.ndarray:
"""Get conc profile of desorbed material during wash step.
The step has no default logic.
Thus it raises `NotImplementedError` if called.
Parameters
----------
f_load
Flow rate profile during 'effective load' step.
The step includes wash recycle, load recycle and load step
as a column sees it in a single cycle.
c_bound
Conc profile of captured material.
Returns
-------
ndarray
Conc profile of desorbed material during wash step.
Raises
------
NotImplementedError
This method has no default implementation. Thus it being
called it will raise the error.
"""
# Not implemented in core this class, as there is
# no consensus on typical dynamics and the way to describe it.
raise NotImplementedError("Function not implemented in this class")
def _sim_c_recycle_propagation(
self,
f_unbound: _np.ndarray,
c_unbound: _np.ndarray,
c_wash_desorbed: _typing.Optional[_np.ndarray]
) -> _typing.Tuple[_np.ndarray, _np.ndarray]:
"""Propagate unbound and desorbed material through the column.
Unbound (breakthrough during load) and desorbed (during wash)
sections might have a different flow rates as they come from
different steps - load and wash.
Parameters
----------
f_unbound
Flow rate profile during 'total load' step for a cycle.
The step includes wash recycle, load recycle and load step.
c_unbound
Conc profile of overloaded material during load step
(plus previous wash and load recycle).
c_wash_desorbed
Conc profile of desorbed material during wash step.
Returns
-------
c_unbound_propagated
Propagated conc profile of overloaded material.
c_wash_desorbed_propagated
Propagated conc profile of desorbed material.
"""
assert hasattr(self, "_wash_f") and self._wash_f > 0
assert hasattr(self, "_wash_t") and self._wash_t > 0
assert self.load_recycle_pdf is not None
assert c_unbound.shape[0] == \
self._n_species - len(self.non_binding_species)
assert c_unbound.shape[1] == f_unbound.size
if c_wash_desorbed is None or c_wash_desorbed.size == 0:
c_wash_desorbed = _np.zeros([
self._n_species - len(self.non_binding_species),
int(round(self._wash_t / self._dt))])
else:
assert c_wash_desorbed.shape[0] == \
self._n_species - len(self.non_binding_species)
assert c_wash_desorbed.shape[1] == \
int(round(self._wash_t / self._dt))
# Combine on volumetric scale.
v_load = self._dt * f_unbound.cumsum()
v_wash = v_load[-1] + \
self._dt * _np.arange(1, c_wash_desorbed.shape[1] + 1) \
* self._wash_f
min_flow = min(f_unbound.min(), self._wash_f)
dv = min_flow * self._dt
v = _np.arange(dv,
(v_wash[-1] if v_wash.size > 0 else v_load[-1]) + dv,
dv)
c_v_combined = _interp.interp1d(
_np.concatenate((v_load, v_wash), axis=0),
_np.concatenate((c_unbound, c_wash_desorbed), axis=1),
fill_value="extrapolate"
)(v)
c_v_combined[c_v_combined < 0] = 0
# Simulate traveling of leftover material through the column.
self._update_load_recycle_pdf(min_flow)
c_v_combined_propagated = _utils.convolution.time_conv(
self._dt, c_v_combined, self._p_load_recycle_pdf)
# Split back on time scale.
c_combined_propagated = _interp.interp1d(
v,
c_v_combined_propagated,
fill_value="extrapolate"
)(_np.concatenate((v_load, v_wash), axis=0))
c_combined_propagated[c_combined_propagated < 0] = 0
c_unbound_propagated = c_combined_propagated[:, :v_load.size]
c_wash_desorbed_propagated = c_combined_propagated[:, v_load.size:]
return c_unbound_propagated, c_wash_desorbed_propagated
def _sim_c_elution_desorption(self,
m_bound: _np.ndarray
) -> _typing.Tuple[_np.ndarray,
_np.ndarray]:
"""Simulate elution step.
Parameters
----------
m_bound
Vector with amount of product being bound to the column.
`m_bound.size == n_species`
Returns
-------
c_elution
Outlet concentration profile during the elution.
b_elution_peak
Boolean vector. Peak is collected where the value is `True`.
"""
assert self._elution_f > 0
assert self._elution_t > 0
i_elution_duration = int(round(self._elution_t / self._dt))
# Multiply elution peak with the amount of captured product.
c_elution = \
self._p_elution_peak[_np.newaxis, :i_elution_duration] * \
m_bound[:, _np.newaxis] / self._elution_f
# Pad with zeros to cover the entire elution step duration.
if c_elution.shape[1] < i_elution_duration:
c_elution = _np.pad(c_elution,
((0, 0),
(0, i_elution_duration - c_elution.shape[1])),
mode="constant")
# Boolean mask - `True` where peak is being collected.
b_elution_peak = self._elution_peak_mask
return c_elution, b_elution_peak
def _sim_c_elution_buffer(self, n_time_steps: int) -> _np.ndarray:
"""Get elution buffer composition at the outlet of the column.
By default the buffer composition is constant throughout the
elution step.
Feel free to override this function if you want to simulate
linear gradient or if the transient phenomena at the beginning
of peak cut needs to be considered.
Parameters
----------
n_time_steps
Duration of elution step in number of time steps.
Returns
-------
ndarray
Buffer concentration profile at the outlet of the column
during the elution step.
"""
# Elution buffer composition.
elution_buffer_composition = \
self.elution_buffer_c.reshape(self.elution_buffer_c.size, 1)
assert elution_buffer_composition.size == 0 \
or elution_buffer_composition.size == self._n_species, \
f"Elution buffer composition must be either empty or have" \
f" a concentration value for each specie."
assert _np.all(elution_buffer_composition >= 0), \
"Concentration values in elution buffer must be >= 0"
if elution_buffer_composition.size == 0:
elution_buffer_composition = _np.zeros([self._n_species, 1])
self.log.i_data(self._log_tree,
"elution_buffer_composition",
elution_buffer_composition)
# Constant profile.
c_elution_buffer = elution_buffer_composition \
* _np.ones_like(self._t[:n_time_steps])
return c_elution_buffer
# noinspection PyMethodMayBeStatic,PyUnusedLocal
def _sim_c_regeneration(self,
m_bound: _np.ndarray
) -> _typing.Optional[_np.ndarray]:
"""Simulate regeneration step.
Parameters
----------
m_bound
Vector with amount of product being bound to the column at
the beginning of the regeneration step.
`m_bound.size == n_species`.
Returns
-------
Optional[ndarray]
Outlet concentration profile during regeneration step.
E.g. regeneration peak.
"""
# No default implementation.
c_regeneration = None
return c_regeneration
def _sim_c_out_cycle(self,
f_load: _np.ndarray,
c_load: _np.ndarray
) -> _typing.Tuple[_typing.Optional[_np.ndarray],
_typing.Optional[_np.ndarray],
_np.ndarray,
_np.ndarray,
_typing.Optional[_np.ndarray]]:
"""Simulates load-wash-elution-regeneration steps.
Regeneration is optional.
This function can be replaced in case user wants to use some
other variation of bind-elution dynamics.
Elution peak cut is applied in this function.
Elution peak shape must be defined by now.
Return profiles that are `None` are considered being zero.
Parameters
----------
f_load
Inlet (recycle + load) flow rate profile for a cycle.
The flow rate might be different during wash recycle.
c_load
Inlet (recycle + load) concentration profile.
Returns
-------
c_load
Conc profile at the outlet of the column during load.
c_wash
Conc profile at the outlet of the column during wash.
c_elution
Conc profile at the outlet of the column during elution.
b_elution
Boolean mask for elution step. `True` where peak is being
collected.
c_regeneration
Conc profile at the outlet of the column during
regeneration.
"""
assert self._load_f > 0
assert self._wash_f > 0
assert self._wash_t > 0
assert self._elution_f > 0
assert self._elution_t > 0
assert self._load_f > 0
assert self._cv > 0
# Evaluate binding.
c_bound, c_unbound = self._sim_c_load_binding(f_load, c_load)
# Log.
m_load = (c_load * f_load[_np.newaxis, :]).sum(1) * self._dt
m_bound = (c_bound * f_load[_np.newaxis, :]).sum(1) * self._dt
self.log.i_data(self._cycle_tree,
"column_utilization",
m_bound / self._cv / self.load_bt.get_total_bc())
self.log.i_data(self._cycle_tree, "m_load", m_load)
self.log.i_data(self._cycle_tree, "m_bound", m_bound)
self.log.i_data(self._cycle_tree, "m_unbound", m_load - m_bound)
self.log.d_data(self._cycle_tree, "f_load", f_load)
self.log.d_data(self._cycle_tree, "c_load", c_load)
self.log.d_data(self._cycle_tree, "c_bound", c_bound)
self.log.d_data(self._cycle_tree, "c_unbound", c_unbound)
# Evaluate desorption during wash.
c_wash_desorbed = None
if self.wash_desorption:
c_wash_desorbed = self._sim_c_wash_desorption(f_load, c_bound)
if c_wash_desorbed.size > 0:
# Subtract desorbed material from bound material.
m_bound -= c_wash_desorbed.sum(1)
# Log.
self.log.i_data(self._cycle_tree,
"m_wash_desorbed",
c_wash_desorbed.sum(1) * self._wash_f * self._dt)
self.log.d_data(self._cycle_tree,
"c_wash_desorbed",
c_wash_desorbed)
# Propagate unbound and desorbed material throughout the column.
c_out_load = c_unbound
c_out_wash = c_wash_desorbed
if self.load_recycle or self.wash_recycle:
c_out_load, c_out_wash = \
self._sim_c_recycle_propagation(f_load,
c_unbound,
c_wash_desorbed)
# Get elution peak.
c_out_elution, elution_peak_mask = \
self._sim_c_elution_desorption(m_bound)
# Log.
m_elution_peak = (c_out_elution * elution_peak_mask[_np.newaxis, :]
).sum(1) * self._elution_f * self._dt
m_elution = c_out_elution.sum(1) * self._elution_f * self._dt
self.log.i_data(self._cycle_tree,
"m_elution_peak", m_elution_peak)
self.log.i_data(self._cycle_tree,
"m_elution", m_elution)
self.log.i_data(self._cycle_tree,
"m_elution_peak_cut_loss", m_elution - m_elution_peak)
# Get regeneration peak.
c_out_regeneration = self._sim_c_regeneration(
m_bound - c_out_elution.sum(1) * self._elution_f * self._dt)
return c_out_load, c_out_wash, c_out_elution, \
elution_peak_mask, c_out_regeneration
def _calculate(self):
# Pre calculate parameters and repetitive profiles.
self._prepare_simulation()
# Assert proper list of binding species.
binding_species = [i for i in range(self._n_species)
if i not in self.non_binding_species]
assert len(binding_species) > 0
# Copy inlet vectors.
c_in_load = self._c[binding_species].copy()
f_in_load = self._f.copy()
f_in_i_end = min(_utils.vectors.true_end(f_in_load > 0), self._t.size)
c_in_load[:, f_in_i_end:] = 0
# Clear for results.
self._c[:] = 0
self._f[:] = 0
# Prepare logger.
log_data_cycles = list()
self.log.set_branch(self._log_tree, "cycles", log_data_cycles)
# Variable to store wash recycle to.
previous_c_bt_wash: _typing.Optional[_np.ndarray] = None
# Loop across cycles.
for i in range(self._cycle_start_i_list.size):
# Load-wash-elution-regeneration-equilibration steps for a column.
# Load step starts at `self._cycle_start_i_list[i]`.
# Prepare logger for this cycle.
self._cycle_tree = dict()
log_data_cycles.append(self._cycle_tree)
# Load start and end time as the column sees it.
if i > 0 and self.load_recycle:
# Column sees leftovers from previous load during recycling.
cycle_load_i_start = self._cycle_start_i_list[i - 1]
else:
cycle_load_i_start = self._cycle_start_i_list[i]
# Calc cycle end (either next cycle or end or simulation time).
if i + 1 < self._cycle_start_i_list.size:
cycle_load_i_end = self._cycle_start_i_list[i + 1]
else:
cycle_load_i_end = f_in_i_end - 1
# Log results.
self.log.i_data(self._cycle_tree,
"i_cycle_load_start",
cycle_load_i_start)
self.log.i_data(self._cycle_tree,
"i_cycle_load_step_start",
self._cycle_start_i_list[i])
self.log.i_data(self._cycle_tree,
"i_cycle_load_end",
cycle_load_i_end)
# Calc profiles at column outlet.
c_out_load, c_out_wash, c_out_elution, \
b_out_elution, c_out_regeneration = self._sim_c_out_cycle(
f_in_load[cycle_load_i_start:cycle_load_i_end],
c_in_load[:, cycle_load_i_start:cycle_load_i_end]
)
self.log.d_data(self._cycle_tree,
"c_out_load", c_out_load)
self.log.d_data(self._cycle_tree,
"c_out_wash", c_out_wash)
self.log.d_data(self._cycle_tree,
"c_out_elution", c_out_elution)
self.log.d_data(self._cycle_tree,
"b_out_elution", b_out_elution)
self.log.d_data(self._cycle_tree,
"c_out_regeneration", c_out_regeneration)
# Load recycle.
if self.load_recycle:
# Recycle load during the load step.
i_load_start_rel = self._cycle_start_i_list[i] \
- cycle_load_i_start
c_load_recycle = c_out_load[:, i_load_start_rel:]
c_in_load[:, self._cycle_start_i_list[i]:cycle_load_i_end] = \
c_load_recycle
self.log.i_data(self._cycle_tree, "m_load_recycle",
c_load_recycle.sum(1)
* self._load_f * self._dt)
self.log.d_data(self._cycle_tree, "c_load_recycle",
c_load_recycle)
# Losses during load == bt through 2nd column.
c_loss_bt_2nd_column = c_out_load[:, i_load_start_rel]
self.log.i_data(self._cycle_tree, "m_loss_bt_2nd_column",
c_loss_bt_2nd_column.sum()
* self._dt * self._load_f)
self.log.d_data(self._cycle_tree, "c_loss_bt_2nd_column",
c_loss_bt_2nd_column)
else:
# report losses during load
m_loss_load = c_out_load.sum() * self._dt * self._load_f
self.log.i_data(self._cycle_tree, "m_loss_load", m_loss_load)
# Wash recycle.
if self.wash_recycle:
if previous_c_bt_wash is not None \
and previous_c_bt_wash.size > 0:
# Clip wash recycle duration if needed.
i_wash_duration = min(
self._wash_recycle_i_duration,
self._t.size - self._cycle_start_i_list[i])
# Log losses due to discarding load bt during wash recycle.
s = c_in_load[
:,
self._cycle_start_i_list[i]:self._cycle_start_i_list[i]
+ i_wash_duration]
self.log.i_data(self._cycle_tree,
"m_loss_load_bt_during_wash_recycle",
s.sum() * self._dt * self._load_f)
self.log.d_data(self._cycle_tree,
"c_lost_load_during_wash_recycle", s)
self.log.d_data(self._cycle_tree, "c_wash_recycle",
previous_c_bt_wash[:, :i_wash_duration])
self.log.i_data(
self._cycle_tree, "m_wash_recycle",
previous_c_bt_wash[:, :i_wash_duration].sum(1)
* self._dt * self._wash_f)
# Apply previous wash recycle onto the inlet profile.
s[:] = previous_c_bt_wash[:, :i_wash_duration]
f_in_load[self._cycle_start_i_list[i]:
self._cycle_start_i_list[i]
+ i_wash_duration] = self._wash_f
# Save wash from this cycle to be used during the next cycle.
previous_c_bt_wash = c_out_wash
else:
# Report losses during wash.
if c_out_wash is None:
c_out_wash = _np.zeros(
[len(binding_species),
int(round(self._wash_t / self._dt))])
m_loss_wash = c_out_wash.sum() * self._dt * self._load_f
self.log.i_data(self._cycle_tree, "m_loss_wash", m_loss_wash)
# Elution.
[i_el_rel_start, i_el_rel_end] = \
_utils.vectors.true_start_and_end(b_out_elution)
i_el_start = min(
self._t.size,
cycle_load_i_end + c_out_wash.shape[1] + i_el_rel_start)
i_el_end = min(
self._t.size,
cycle_load_i_end + c_out_wash.shape[1] + i_el_rel_end)
i_el_rel_end = i_el_rel_start + i_el_end - i_el_start
# Log.
self.log.i_data(self._cycle_tree, "i_elution_start", i_el_start)
self.log.i_data(self._cycle_tree, "i_elution_end", i_el_end)
# Write to global outlet.
self._f[i_el_start:i_el_end] = self._elution_f
self._c[binding_species, i_el_start:i_el_end] = \
c_out_elution[:, i_el_rel_start:i_el_rel_end]
[docs]class ACC(AlternatingChromatography):
"""Alternating column chromatography without recycling.
Alternating load-bind-elution twin-column chromatography without
recycling of overloaded or washed out material.
This class offers no dynamics for desorption during wash step.
Parameters
----------
t
Simulation time vector.
Starts with 0 and has a constant time step.
uo_id
Unique identifier.
load_bt
Load breakthrough logic.
peak_shape_pdf
Elution peak shape.
gui_title
Readable title for GUI. Default = "ACC".
Notes
-----
For list of attributes refer to :class:`AlternatingChromatography`.
See Also
--------
:class:`AlternatingChromatography`
Examples
--------
>>> dt = 0.5 # min
>>> t = _np.arange(0, 24.1 * 60, dt)
>>> load_bt = _bt_load.ConstantPatternSolution(dt, dbc_100=50, k=0.12)
>>> peak_shape_pdf = _pdf.ExpModGaussianFixedRelativeWidth(t, 0.15, 0.3)
>>> acc_pro_a = ACC(
... t,
... load_bt=load_bt,
... peak_shape_pdf=peak_shape_pdf,
... uo_id="pro_a_acc",
... gui_title="ProteinA ACC",
... )
>>> acc_pro_a.cv = 100 # mL
>>> # Equilibration step.
>>> acc_pro_a.equilibration_cv = 1.5
>>> # Equilibration flow rate is same as load flow rate.
>>> acc_pro_a.equilibration_f_rel = 1
>>> # Load 10 CVs.
>>> acc_pro_a.load_cv = 20
>>> # Define wash step.
>>> acc_pro_a.wash_cv = 5
>>> # Elution step.
>>> acc_pro_a.elution_cv = 3
>>> # 1st momentum of elution peak from data from above.
>>> acc_pro_a.elution_peak_position_cv = 1.2
>>> acc_pro_a.elution_peak_cut_start_c_rel_to_peak_max = 0.05
>>> acc_pro_a.elution_peak_cut_end_c_rel_to_peak_max = 0.05
>>> # Regeneration step.
>>> acc_pro_a.regeneration_cv = 1.5
>>> # Inlet flow rate profile.
>>> f_in = _np.ones_like(t) * 15 # mL/min
>>> c_in = _np.ones([1, t.size]) * 2.5 # mg/mL
>>> # Simulate ACC.
>>> f_out, c_out = acc_pro_a.evaluate(f_in, c_in)
"""
def __init__(self,
t: _np.ndarray,
uo_id: str,
load_bt: _core.ChromatographyLoadBreakthrough,
peak_shape_pdf: _core.PDF,
gui_title: str = "ACC"):
super().__init__(t, uo_id, load_bt, peak_shape_pdf, gui_title)
def _sim_c_wash_desorption(self,
f_load: _np.ndarray,
c_bound: _np.ndarray) -> _np.ndarray:
"""Desorbed material during wash step is not supported by ACC.
Raises
------
NotImplementedError
Raises exception when function if called.
"""
raise NotImplementedError("Function not implemented in this class.")
[docs]class PCC(AlternatingChromatography):
"""Alternating column chromatography with recycling of load.
Alternating load-bind-elution twin-column chromatography with
optional recycling of overloaded or washed out material.
This class offers no dynamics for desorption during wash step.
PCC uses :attr:`load_bt` to determine what parts of the load (and
recycled material) bind to the column. The unbound (not captured)
part is propagated through the column by :attr:`load_recycle_pdf`.
Void volume for :attr:`load_recycle_pdf` is defined as
:attr:`column_porosity_retentate` * `column volume`.
Parameters
----------
t
Simulation time vector.
Starts with 0 and has a constant time step.
uo_id
Unique identifier.
load_bt
Load breakthrough logic.
load_recycle_pdf
Propagation of load breakthrough and/or washed out material
through the column.
column_porosity_retentate
Porosity of the column for binding species (protein).
peak_shape_pdf
Elution peak shape.
gui_title
Readable title for GUI. Default = "PCC".
Notes
-----
For list of additional attributes refer to
:class:`AlternatingChromatography`.
See Also
--------
:class:`AlternatingChromatography`
Examples
--------
>>> dt = 0.5 # min
>>> t = _np.arange(0, 24.1 * 60, dt)
>>> load_bt = _bt_load.ConstantPatternSolution(dt, dbc_100=50, k=0.12)
>>> peak_shape_pdf = _pdf.ExpModGaussianFixedRelativeWidth(t, 0.15, 0.3)
>>> load_recycle_pdf = _pdf.GaussianFixedDispersion(t, 2 * 2 / 30)
>>> pcc_pro_a = PCC(
... t,
... load_bt=load_bt,
... peak_shape_pdf=peak_shape_pdf,
... load_recycle_pdf=load_recycle_pdf,
... # Porosity of the column for protein.
... column_porosity_retentate=0.64,
... uo_id="pro_a_pcc",
... gui_title="ProteinA PCC",
... )
>>> pcc_pro_a.cv = 100 # mL
>>> # Equilibration step.
>>> pcc_pro_a.equilibration_cv = 1.5
>>> # Equilibration flow rate is same as load flow rate.
>>> pcc_pro_a.equilibration_f_rel = 1
>>> # Load until 70 % breakthrough.
>>> pcc_pro_a.load_c_end_relative_ss = 0.7
>>> # Automatically prolong first cycle to faster achieve a steady-state.
>>> pcc_pro_a.load_extend_first_cycle = True
>>> # Define wash step.
>>> # There is no desorption during wash step in this example.
>>> pcc_pro_a.wash_cv = 5
>>> pcc_pro_a.wash_recycle = True
>>> pcc_pro_a.wash_recycle_duration_cv = 2
>>> # Elution step.
>>> pcc_pro_a.elution_cv = 3
>>> # 1st momentum of elution peak from data from above.
>>> pcc_pro_a.elution_peak_position_cv = 1.2
>>> pcc_pro_a.elution_peak_cut_start_c_rel_to_peak_max = 0.05
>>> pcc_pro_a.elution_peak_cut_end_c_rel_to_peak_max = 0.05
>>> # Regeneration step.
>>> pcc_pro_a.regeneration_cv = 1.5
>>> # Inlet flow rate profile.
>>> f_in = _np.ones_like(t) * 15 # mL/min
>>> c_in = _np.ones([1, t.size]) * 2.5 # mg/mL
>>> # Simulate ACC.
>>> f_out, c_out = pcc_pro_a.evaluate(f_in, c_in) # doctest: +ELLIPSIS
pro_a_pcc: Steady-state concentration is being estimated ...
pro_a_pcc: Steady-state concentration is being estimated ...
"""
def __init__(self,
t: _np.ndarray,
uo_id: str,
load_bt: _core.ChromatographyLoadBreakthrough,
load_recycle_pdf: _core.PDF,
column_porosity_retentate: float,
peak_shape_pdf: _core.PDF,
gui_title: str = "PCC"):
super().__init__(t, uo_id, load_bt, peak_shape_pdf, gui_title)
self.load_recycle = True
"""Recycle load breakthrough. Default = `True`."""
self.wash_recycle = False
"""Recycle wash. Default = False."""
self.column_porosity_retentate = column_porosity_retentate
"""Column porosity for binding species.
See Also
--------
:class:`PCC`
Examples
--------
`column_porosity_retentate` is a mean residence time of the
product (protein) traveling through the column during
non-binding conditions (in CVs).
"""
self.load_recycle_pdf = load_recycle_pdf
"""PDF of wash and/or unbound load traveling through the column.
See Also
--------
:class:`PCC`
"""
def _sim_c_wash_desorption(self,
f_load: _np.ndarray,
c_bound: _np.ndarray) -> _np.ndarray:
"""Desorbed material during wash step is not supported by PCC.
Raises
------
NotImplementedError
Raises exception when function if called.
"""
raise NotImplementedError("Function not implemented in this class.")
[docs]class PCCWithWashDesorption(PCC):
"""Alternating column chromatography with recycling of load.
Alternating load-bind-elution twin-column chromatography with
optional recycling of overloaded or washed out material.
The material desorption during wash step is defined by exponential
half life time
* :attr:`wash_desorption_tail_half_time_cv`
and the amount of desorbable material which is defined by
* :attr:`wash_desorption_desorbable_material_share` or
* :attr:`wash_desorption_desorbable_above_dbc`.
PCC uses :attr:`load_bt` to determine what parts of the load (and
recycled material) bind to the column.
The unbound (not captured) part and desorbed part are propagated
through the column by :attr:`load_recycle_pdf`.
Void volume for :attr:`load_recycle_pdf` is defined as
:attr:`column_porosity_retentate` * `column volume`.
Parameters
----------
t
Simulation time vector.
Starts with 0 and has a constant time step.
uo_id
Unique identifier.
load_bt
Load breakthrough logic.
load_recycle_pdf
Propagation of load breakthrough and/or washed out material
through the column.
column_porosity_retentate
Porosity of the column for binding species (protein).
peak_shape_pdf
Elution peak shape.
gui_title
Readable title for GUI. Default = "PCCWithWashDesorption".
Notes
-----
During wash step, weaker binding isoforms might be desorbed and
recycled. In turn they are again desorbed and recycled during next
cycle and so on; resulting in increasing amount of desorbed material
during wash step (even in steady-state). This is not considered by
this class. Furthermore, it is not a favorable case in terms of RTD
as the weakly bound material propagates from column to column for
many cycles.
For list of additional attributes refer to
:class:`PCC` and :class:`AlternatingChromatography`.
See Also
--------
:class:`PCC`
:class:`AlternatingChromatography`
"""
def __init__(self,
t: _np.ndarray,
uo_id: str,
load_bt: _core.ChromatographyLoadBreakthrough,
load_recycle_pdf: _core.PDF,
column_porosity_retentate: float,
peak_shape_pdf: _core.PDF,
gui_title: str = "PCCWithWashDesorption"):
super().__init__(t, uo_id, load_bt, load_recycle_pdf,
column_porosity_retentate, peak_shape_pdf, gui_title)
self.load_recycle = True
"""Recycle load breakthrough. Default = `True`."""
self.wash_recycle = True
"""Recycle wash. Default = `True`."""
self.wash_desorption = True
"""Simulate desorption during wash step. Default = `True`."""
self.wash_desorption_tail_half_time_cv = -1
"""Wash desorption rate.
Required if :attr:`wash_desorption` is `True`.
Wash desorption is simulated as exponential decay with half-life
:attr:`wash_desorption_tail_half_time_cv`.
"""
self.wash_desorption_desorbable_material_share = -1
"""Share of material that can be desorbed during wash step.
Wash desorption is simulated as exponential decay. Only part of
adsorbed material is subjected to that exponential decay. That
part can be defined by:
* :attr:`wash_desorption_desorbable_material_share` (this one)
or
* :attr:`wash_desorption_desorbable_above_dbc`.
"""
self.wash_desorption_desorbable_above_dbc = -1
"""Share of material that can be desorbed during wash step.
Share is defined as a share of material loaded onto the column
that exceeds specified `wash_desorption_desorbable_above_dbc`
binding capacity.
Wash desorption is simulated as exponential decay. Only part of
adsorbed material is subjected to that exponential decay. That
part can be defined by:
* :attr:`wash_desorption_desorbable_material_share` (this one)
or
* :attr:`wash_desorption_desorbable_above_dbc`.
"""
def _sim_c_wash_desorption(self,
f_load: _np.ndarray,
c_bound: _np.ndarray) -> _np.ndarray:
"""Get conc profile of desorbed material during wash step.
`self.wash_desorption_tail_half_time_cv` needs to be defined.
One of `self.wash_desorption_desorbable_material_share` and
`self.wash_desorption_desorbable_above_dbc` needs to be defined.
Parameters
----------
f_load
Flow rate profile during 'effective load' step.
The step includes wash recycle, load recycle and load step
as a column sees it in a single cycle.
c_bound
Conc profile of captured material.
Returns
-------
ndarray
Conc profile of desorbed material during wash step.
"""
assert self.wash_desorption_tail_half_time_cv > 0
assert self._load_f > 0
assert self._wash_f > 0
assert self._wash_t > 0
assert self._cv > 0
assert self.wash_desorption_desorbable_material_share > 0 \
or self.wash_desorption_desorbable_above_dbc > 0
assert f_load.size == c_bound.shape[1]
assert c_bound.shape[0] \
== self._n_species - len(self.non_binding_species)
m_bound = (c_bound * f_load[_np.newaxis, :]).sum(1)[:, _np.newaxis] \
* self._dt
# Calc share of desorbable material.
k = -1
if self.wash_desorption_desorbable_material_share > 0:
k = self.wash_desorption_desorbable_material_share
if self.wash_desorption_desorbable_above_dbc > 0:
if k > 0:
self.log.w(
f"Share of desorbable material defined twice!!"
f" Using `load_recycle_wash_desorbable_material_share`")
else:
k = max(0,
1 - self.wash_desorption_desorbable_above_dbc
* self._cv / m_bound.sum())
assert 1 >= k >= 0, f"Share of desorbable material {k}" \
f" must be >= 0 and <= 1."
i_wash_duration = int(round(self._wash_t / self._dt))
# Generate exponential tail.
exp_pdf = _pdf.TanksInSeries(self._t[:i_wash_duration],
n_tanks=1,
pdf_id=f"wash_desorption_exp_drop")
exp_pdf.allow_open_end = True
exp_pdf.trim_and_normalize = False
tau = self.wash_desorption_tail_half_time_cv \
* self._cv / self._wash_f / _np.log(2)
exp_pdf.update_pdf(rt_mean=tau)
p = exp_pdf.get_p()[_np.newaxis, :i_wash_duration]
# Scale desorbed material conc due to differences in flow rate.
c_desorbed = m_bound * k * p / self._wash_f
# Pad with zeros if needed.
c_desorbed = _np.pad(c_desorbed,
((0, 0),
(0, i_wash_duration - c_desorbed.shape[1])),
mode="constant")
# Log.
self.log.d_data(self._cycle_tree if hasattr(self, "_cycle_tree")
else self._log_tree,
"p_desorbed",
p)
return c_desorbed