diminishing_hill_multilinear_multiplication
DiminishingHillMultilinearMultiplication
Bases: NToOneTransformer
N-to-1 transformation allowing to get 1 variable by applying a linear combination of N other variables, applying appropriate Hill functions, and finalizing with multiplication.
The Hill function is as follows: $\frac{1}{1 + \left(\frac{K \times max}{serie}\right)^{S}}$
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
coefs_sets | list[list[float]] | Coefficients of the multilinear functions following the shape of the formula. Example: coefs_sets = [ [coef_11, coef_12, .., coef_1N], [coef_21, coef_22, .., coef_2N], … [coef_Q1, coef_Q2, .., coef_QN] ] | required |
powers | list[float] | Powers applied to each Hill or Identity function of multilinear aggregation concerned by the multiplicative transformation. Example: powers = [power_1, power_2, …, power_Q] | required |
K_sets | list[float] | List of K parameters that will be applied after aggregating variables Example: K_sets = [K_1, …, K_M] | required |
S_sets | list[float] | List of S parameters that will be applied after aggregating variables Example: S_sets = [S_1, …, S_M] | required |
max_sets | Optional[list[float]] | List of max parameters that will be applied after aggregating variables. Example: max_sets = [max_1, …, max_M] | None |
ignore_input_indexes | Optional[List[int]] | Indexes of the data in series to ignore when computing the repartition. | None |
Source code in eki_mmo_equations/n_to_one_transformations/diminishing_hill_multilinear_multiplication.py
class DiminishingHillMultilinearMultiplication(NToOneTransformer):
    """N-to-1 transformation allowing to get 1 variable by applying a linear combination of N other variables,
    applying appropriate Hill functions, and finalizing with multiplication.

    ```math
    (\\text{Hill}(\\text{coef}_{11} \\times \\text{serie}_{11} + \\text{coef}_{12} \\times \\text{serie}_{12} +
    \\ldots + \\text{coef}_{1N} \\times \\text{serie}_{1N}))^{\\text{power}_1} \\\\
    \\times \\\\
    (\\text{Hill}(\\text{coef}_{21} \\times \\text{serie}_{21} + \\text{coef}_{22} \\times \\text{serie}_{22} +
    \\ldots + \\text{coef}_{2N} \\times \\text{serie}_{2N}))^{\\text{power}_2} \\\\
    \\times \\\\
    (\\text{coef}_{P1} \\times \\text{serie}_{P1} + \\text{coef}_{P2} \\times \\text{serie}_{P2} +
    \\ldots + \\text{coef}_{PN} \\times \\text{serie}_{PN})^{\\text{power}_{P}} \\\\
    \\times \\\\
    (\\text{coef}_{Q1} \\times \\text{serie}_{Q1} + \\text{coef}_{Q2} \\times \\text{serie}_{Q2} +
    \\ldots + \\text{coef}_{QN} \\times \\text{serie}_{QN})^{\\text{power}_{Q}}
    ```

    The Hill function is as follows:

    ```math
    \\frac{1}{1 + (\\frac{K \\times max}{serie})^S}
    ```

    The Hill function is applied to the first ``len(K_sets)`` aggregated series only; the remaining
    aggregated series go through the identity function before being raised to their power.

    Args:
        coefs_sets (list[list[float]]): Coefficients of the multilinear functions following the shape of the formula.
            Example: coefs_sets = [
                [coef_11, coef_12, .., coef_1N], [coef_21, coef_22, .., coef_2N], ... [coef_Q1, coef_Q2, .., coef_QN]
            ]
        powers (list[float]): Powers applied to each Hill or Identity function of multilinear aggregation concerned by
            the multiplicative transformation. Example: powers = [power_1, power_2, ..., power_Q]
        K_sets (list[float]): List of K parameters that will be applied after aggregating variables.
            Example: K_sets = [K_1, ..., K_M]
        S_sets (list[float]): List of S parameters that will be applied after aggregating variables.
            Example: S_sets = [S_1, ..., S_M]
        max_sets (Optional[list[float]], optional): List of max parameters that will be applied after aggregating
            variables. Example: max_sets = [max_1, ..., max_M]
        ignore_input_indexes (Optional[List[int]], optional): Indexes of the data in series to ignore when
            computing the repartition.
    """
def __init__(
self,
coefs_sets: list[list[float]],
powers: list[float],
K_sets: list[float],
S_sets: list[float],
max_sets: Optional[list[float]] = None,
ignore_input_indexes: Optional[list[int]] = None,
):
self.coefs_sets: list[list[float]] = coefs_sets
self.powers: list[float] = powers
self.K_sets: list[float] = K_sets
self.S_sets: list[float] = S_sets
self.max_sets: list[float] = [np.nan] * len(self.K_sets) if max_sets is None else max_sets
self.ignore_input_indexes: list[int] = ignore_input_indexes if ignore_input_indexes else []
@property
def parameters(self) -> dict[str, Any]:
return self.__dict__
@property
def series_indexation(self) -> list[tuple[int, int]]:
"""Indexations of the series based on their aggregation policy.
Returns:
list[tuple[int, int]]: List of indexation segments where the series add to be added together.
For example, [(0,3), (3,5)] means the first forth series has to be sum up together and
the two next ones has to be sum up together.
"""
return self._get_series_indexation(self.coefs_sets)
def _get_series_indexation(self, coefs_sets: list[list[float]]) -> list[tuple[int, int]]:
"""Indexations of the series based on their aggregation policy.
Args:
coefs_sets (list[list[float]]): Coefs sets from there the indexations of the series structure
is deduced from.
Returns:
list[tuple[int, int]]: List of indexation segments where the series are to be added together.
For example, [(0,3), (4,5)] means the first to forth series are to be summed up together and
the next two, the fifth and sixth series, are to be summed up together.
"""
lower_bound: int = 0
series_indexation: list[tuple[int, int]] = []
for coefs_set in coefs_sets:
upper_bound = lower_bound + len(coefs_set)
series_indexation.append((lower_bound, upper_bound))
lower_bound = upper_bound
return series_indexation
# ------- METHODS -------
def fit(self, series: list[np.ndarray], y=None):
# This method creates the appropriate max_sets if
# self.max_sets was not given
max_sets = []
# Applying only the aggregating of series here
aggregated_series = [
np.sum(
[
serie * coef
for serie, coef in zip(
series[series_indexation_set[0] : series_indexation_set[1]], coef_set # noqa: E203
)
],
axis=0,
)
for series_indexation_set, coef_set in zip(self.series_indexation, self.coefs_sets)
]
for max, aggregated_series_set in zip(self.max_sets, aggregated_series):
if (np.isnan(max)) | (max is None):
if np.any(aggregated_series_set[aggregated_series_set > 0]):
max = aggregated_series_set.max()
else:
max = aggregated_series_set.min() or 1
max_sets.append(max)
self.max_sets = max_sets
return super().fit(series, y)
def transform(self, series: list[np.ndarray], copy=False) -> np.ndarray:
    """Apply the transformation to ``series`` using the instance parameters.

    The input first goes through the parent ``transform``; its result is then handed to
    ``_transformer`` together with the stored parameters.

    Args:
        series (list[np.ndarray]): Input series.
        copy: Passed through to the parent ``transform``.

    Returns:
        np.ndarray: Output of ``_transformer``.
    """
    prepared_series = super().transform(series, copy)
    stored_parameters = (
        self.coefs_sets,
        self.powers,
        self.K_sets,
        self.S_sets,
        self.max_sets,
        self.series_indexation,
    )
    return self._transformer(prepared_series, *stored_parameters)
def repartition(self, series: list[np.ndarray]) -> list[np.ndarray]:
"""Returns the repartition of the output series from the initial input series.
First, the function create weight serie based on referential series for each series of hill or
identify function of aggregation and each component of the multiply function.
"""
# Create weight series within each aggregation set
repartition_series = [serie for i, serie in enumerate(series) if i not in self.ignore_input_indexes]
repartition_coefs_sets: list[list[float]] = []
ignore_power_indexes: list[float] = []
# Filter out the coefs from series that will be ignored (stated in repartition_series)
# If any entirety of an originally aggregated series set is to be filtered/ignored,
# the associated power_index of the filtered aggregated serie will also be filtered
# and tracked with ignore_power_indexes
for k, repartition_coefs_set in enumerate(
[
[
coef
for j, coef in enumerate(coefs_set)
if j + series_indexation_set[0] not in self.ignore_input_indexes
]
for coefs_set, series_indexation_set in zip(self.coefs_sets, self.series_indexation)
]
):
if len(repartition_coefs_set) > 0:
repartition_coefs_sets.append(repartition_coefs_set)
else:
ignore_power_indexes.append(k)
# Get the new indexation of the filtered series
repartition_series_indexation = self._get_series_indexation(repartition_coefs_sets)
# Filter out appropriate indexes from post-serie aggregation arguments
repartition_powers = [power for i, power in enumerate(self.powers) if i not in ignore_power_indexes]
repartition_K_sets = [K for i, K in enumerate(self.K_sets) if i not in ignore_power_indexes]
repartition_S_sets = [S for i, S in enumerate(self.S_sets) if i not in ignore_power_indexes]
repartition_max_sets = [max for i, max in enumerate(self.max_sets) if i not in ignore_power_indexes]
multilinear_weighted_series = [
np.array(
[
serie * coef
for serie, coef in zip(
repartition_series[series_indexation_set[0] : series_indexation_set[1]], coef_set # noqa: E203
)
]
)
for series_indexation_set, coef_set in zip(repartition_series_indexation, repartition_coefs_sets)
]
# Create weight series for each aggregation set component
multilinear_aggregated_series = [
np.sum(multilinear_weighted_series_set, axis=0)
for multilinear_weighted_series_set in multilinear_weighted_series
]
# Using the repartitioned K set as an indicator to how many
# aggregated series to apply the hill function to
hill_multilinear_aggregated_series = np.array(
[
DiminishingHill(K=K, S=S, max=max).transform(serie)
for serie, K, S, max in zip(
multilinear_aggregated_series[0 : len(repartition_K_sets)], # noqa: E203
repartition_K_sets,
repartition_S_sets,
repartition_max_sets,
)
]
+ [serie for serie in multilinear_aggregated_series[len(repartition_K_sets) :]] # noqa: E203
)
powered_series = np.array(
[serie**power for serie, power in zip(hill_multilinear_aggregated_series, repartition_powers)]
)
# Create weighted series within each aggregation set to have it in term of proportion
with np.errstate(divide="ignore", invalid="ignore"):
powered_weighted_series = powered_series / reduce(lambda a, b: a * b, powered_series)
powered_proportionned_weighted_series = np.nan_to_num(
powered_weighted_series / np.sum(powered_weighted_series, axis=0)
)
# Create weighted serie for each aggregation set component to have it in term of proportion
multilinear_proportionned_weighted_series = [
np.nan_to_num(multilinear_weighted_series_set / np.sum(multilinear_weighted_series_set, axis=0))
for multilinear_weighted_series_set in multilinear_weighted_series
]
# Display the component proportion within each aggregation set to get a global proportion of the series
proportionned_weighted_series = [
powered_proportionned_weighted_serie * multilinear_proportionned_weighted_serie
for powered_proportionned_weighted_serie, multilinear_proportionned_weighted_series_set in zip(
powered_proportionned_weighted_series, multilinear_proportionned_weighted_series
)
for multilinear_proportionned_weighted_serie in multilinear_proportionned_weighted_series_set
]
for index in self.ignore_input_indexes:
proportionned_weighted_series.insert(index, np.zeros(len(series[index])))
return proportionned_weighted_series
# ------- TRANSFORMERS -------
# Main method that must be changed
@staticmethod
def _transformer(
series: list[np.ndarray],
coefs_sets: list[list[float]],
powers: list[float],
K_sets: list[float],
S_sets: list[float],
max_sets: list[float],
series_indexation: list[tuple[int, int]],
) -> np.ndarray:
"""Returns the multiplication of the hill or identity function being applied to
linear combinations of the different input variables."""
number_of_hill_parameters = len(K_sets)
# Applying only the aggregating of series here
hill_transformed_series = [
np.sum(
[
serie * coef
for serie, coef in zip(
series[series_indexation_set[0] : series_indexation_set[1]], coef_set # noqa: E203
)
],
axis=0,
)
for series_indexation_set, coef_set in zip(series_indexation, coefs_sets)
]
# Apply Hill function to aggregated series by order and length of Hill parameters
for i in range(number_of_hill_parameters):
hill_transformed_series[i] = DiminishingHill(K=K_sets[i], S=S_sets[i], max=max_sets[i]).transform(
hill_transformed_series[i]
)
hill_transformed_series = [serie**power for serie, power in zip(hill_transformed_series, powers)]
return reduce(lambda a, b: a * b, hill_transformed_series)
# ------- CHECKERS -------
def check_params(self, series: list[np.ndarray]):
# Check to make sure length of hill parameters are
if len(self.K_sets) > len(self.coefs_sets):
raise ParameterScopeException(
f"Incorrect {self.__class__.__name__}. Length of hill parameters"
f" should be less than the number of total variables after aggregation."
)
self._check_multilinear_multiply_params(series)
self._check_ignore_input_indexes_params(series)
self._check_diminishing_hill_params(series)
def _check_multilinear_multiply_params(self, series: list[np.ndarray]):
"""Check if parameters respect their application scope."""
if not np.sum([len(coefs_set) for coefs_set in self.coefs_sets]) == len(series):
raise ParameterScopeException(
f"Incorrect {self.__class__.__name__} parameters."
f"Length of concatenated coefs_sets {np.sum([len(coefs_set) for coefs_set in self.coefs_sets])}"
f" should match length of input series {len(series)}"
)
if not len(self.coefs_sets) == len(self.powers):
raise ParameterScopeException(
f"Incorrect {self.__class__.__name__} parameters. Length of coefs_sets {len(self.coefs_sets)}"
f" should match length of powers {len(self.powers)}"
)
def _check_ignore_input_indexes_params(self, series: list[np.ndarray]):
if len(self.ignore_input_indexes) != len(set(self.ignore_input_indexes)):
raise ParameterScopeException(
f"Incorrect {self.__class__.__name__} parameters. Values ignore_input_indexes"
f" should be unique. Current values:{self.ignore_input_indexes}"
)
if not all(index < len(series) for index in self.ignore_input_indexes):
raise ParameterScopeException(
f"Incorrect {self.__class__.__name__} parameters. Each value of ignore_input_indexes"
f" should be lower than the number of series as input - lower than {len(series)}."
)
if len(self.ignore_input_indexes) >= len(series):
raise ParameterScopeException(
f"Incorrect {self.__class__.__name__}. Length of ignore_input_indexes"
f" should be inferior to length of input series - lower than {len(series)}."
)
def _check_diminishing_hill_params(self, series: list[np.ndarray]):
# Check to make sure hill parameters are the same length
if not (len(self.K_sets) == len(self.S_sets) == len(self.max_sets)):
raise ParameterScopeException(
f"Incorrect {self.__class__.__name__}. Length of hill parameters should"
f" all equal each other. K = {len(self.K_sets)}, S = {len(self.S_sets)}, Max = {len(self.max_sets)}"
)
# Checkk Hill functions parameters
if any((K <= 0) or (K >= 1) for K in self.K_sets):
raise ParameterScopeException(f"All parameters K must be in ]0, 1[, not {self.K_sets}.")
if any((S <= 0) for S in self.S_sets):
raise ParameterScopeException(f"All parameters S must be strictly positive, not {self.S_sets}.")
if any((max <= 0) for max in self.max_sets):
raise ParameterScopeException(f"All parameters max must be strictly positive, not {self.max_sets}.")
series_indexation: list[tuple[int, int]] property
Indexations of the series based on their aggregation policy.
Returns:
| Type | Description |
|---|---|
list[tuple[int, int]] | list[tuple[int, int]]: List of indexation segments where the series are to be added together. For example, [(0,3), (3,5)] means the first three series are to be summed together and the next two are to be summed together. |
repartition(series)
Returns the repartition of the output series from the initial input series. First, the function creates weight series based on the referential series, for each series inside a Hill or identity aggregation and for each component of the multiplication.
Source code in eki_mmo_equations/n_to_one_transformations/diminishing_hill_multilinear_multiplication.py
def repartition(self, series: list[np.ndarray]) -> list[np.ndarray]:
"""Returns the repartition of the output series from the initial input series.
First, the function create weight serie based on referential series for each series of hill or
identify function of aggregation and each component of the multiply function.
"""
# Create weight series within each aggregation set
repartition_series = [serie for i, serie in enumerate(series) if i not in self.ignore_input_indexes]
repartition_coefs_sets: list[list[float]] = []
ignore_power_indexes: list[float] = []
# Filter out the coefs from series that will be ignored (stated in repartition_series)
# If any entirety of an originally aggregated series set is to be filtered/ignored,
# the associated power_index of the filtered aggregated serie will also be filtered
# and tracked with ignore_power_indexes
for k, repartition_coefs_set in enumerate(
[
[
coef
for j, coef in enumerate(coefs_set)
if j + series_indexation_set[0] not in self.ignore_input_indexes
]
for coefs_set, series_indexation_set in zip(self.coefs_sets, self.series_indexation)
]
):
if len(repartition_coefs_set) > 0:
repartition_coefs_sets.append(repartition_coefs_set)
else:
ignore_power_indexes.append(k)
# Get the new indexation of the filtered series
repartition_series_indexation = self._get_series_indexation(repartition_coefs_sets)
# Filter out appropriate indexes from post-serie aggregation arguments
repartition_powers = [power for i, power in enumerate(self.powers) if i not in ignore_power_indexes]
repartition_K_sets = [K for i, K in enumerate(self.K_sets) if i not in ignore_power_indexes]
repartition_S_sets = [S for i, S in enumerate(self.S_sets) if i not in ignore_power_indexes]
repartition_max_sets = [max for i, max in enumerate(self.max_sets) if i not in ignore_power_indexes]
multilinear_weighted_series = [
np.array(
[
serie * coef
for serie, coef in zip(
repartition_series[series_indexation_set[0] : series_indexation_set[1]], coef_set # noqa: E203
)
]
)
for series_indexation_set, coef_set in zip(repartition_series_indexation, repartition_coefs_sets)
]
# Create weight series for each aggregation set component
multilinear_aggregated_series = [
np.sum(multilinear_weighted_series_set, axis=0)
for multilinear_weighted_series_set in multilinear_weighted_series
]
# Using the repartitioned K set as an indicator to how many
# aggregated series to apply the hill function to
hill_multilinear_aggregated_series = np.array(
[
DiminishingHill(K=K, S=S, max=max).transform(serie)
for serie, K, S, max in zip(
multilinear_aggregated_series[0 : len(repartition_K_sets)], # noqa: E203
repartition_K_sets,
repartition_S_sets,
repartition_max_sets,
)
]
+ [serie for serie in multilinear_aggregated_series[len(repartition_K_sets) :]] # noqa: E203
)
powered_series = np.array(
[serie**power for serie, power in zip(hill_multilinear_aggregated_series, repartition_powers)]
)
# Create weighted series within each aggregation set to have it in term of proportion
with np.errstate(divide="ignore", invalid="ignore"):
powered_weighted_series = powered_series / reduce(lambda a, b: a * b, powered_series)
powered_proportionned_weighted_series = np.nan_to_num(
powered_weighted_series / np.sum(powered_weighted_series, axis=0)
)
# Create weighted serie for each aggregation set component to have it in term of proportion
multilinear_proportionned_weighted_series = [
np.nan_to_num(multilinear_weighted_series_set / np.sum(multilinear_weighted_series_set, axis=0))
for multilinear_weighted_series_set in multilinear_weighted_series
]
# Display the component proportion within each aggregation set to get a global proportion of the series
proportionned_weighted_series = [
powered_proportionned_weighted_serie * multilinear_proportionned_weighted_serie
for powered_proportionned_weighted_serie, multilinear_proportionned_weighted_series_set in zip(
powered_proportionned_weighted_series, multilinear_proportionned_weighted_series
)
for multilinear_proportionned_weighted_serie in multilinear_proportionned_weighted_series_set
]
for index in self.ignore_input_indexes:
proportionned_weighted_series.insert(index, np.zeros(len(series[index])))
return proportionned_weighted_series