multilinear_multiplication
MultilinearMultiplication
Bases: NToOneTransformer
N-to-1 transformation allowing to get 1 variable by applying a linear combination of N other variables.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
coefs_sets | list[list[float]] | Coefficients of the multilinear functions following the shape of the formula. Example: coefs_sets = [[coef_11, coef_12, .., coef_1N], [coef_21, coef_22, .., coef_2N]] | required |
powers | list[float] | powers (list[float]): Powers applied to each multilinear aggregation concerned by the multiplicative transformation. Example: powers = [power_1, power_2] | required |
ignore_input_indexes | Optional[List[int]] | Indexes of the data in series to ignore when computing the repartition. | None |
Source code in eki_mmo_equations/n_to_one_transformations/multilinear_multiplication.py
class MultilinearMultiplication(NToOneTransformer):
"""N-to-1 transformation allowing to get 1 variable by applying a linear combination of N other variables.
```math
(\\text{coef}_{11} \\times \\text{serie}_{11} + \\text{coef}_{12} \\times{series}_{12} +
\\ldots + \\text{coef}_{1N} \\times \\text{serie}_{1N})^{\\text{power}_1} \\\\
\\times \\\\
(\\text{coef}_{21} \\times \\text{serie}_{21} + \\text{coef}_{22} \\times{series}_{22} +
\\ldots + \\text{coef}_{2N} \\times \\text{serie}_{2N})^{\\text{power}_2}
```
Args:
coefs_sets (list[list[float]]): Coefficients of the multilinear functions following the shape of the formula.
Example: coefs_sets = [[coef_11, coef_12, .., coef_1N], [coef_21, coef_22, .., coef_2N]]
powers (list[float]): powers (list[float]): Powers applied to each multilinear aggregation concerned by
the multiplicative transformation. Example: powers = [power_1, power_2]
ignore_input_indexes (Optional[List[int]], optional): Indexes of the data in series to ignore when
computing the repartition.
"""
def __init__(
self, coefs_sets: list[list[float]], powers: list[float], ignore_input_indexes: Optional[list[int]] = None
):
self.coefs_sets: list[list[float]] = coefs_sets
self.powers: list[float] = powers
self.ignore_input_indexes: list[int] = ignore_input_indexes if ignore_input_indexes else []
@property
def parameters(self) -> dict[str, Any]:
return self.__dict__
@property
def series_indexation(self) -> list[tuple[int, int]]:
"""Indexations of the series based on their aggregation policy.
Returns:
list[tuple[int, int]]: List of indexation segments where the series add to be added together.
For example, [(0,3), (3,5)] means the first forth series has to be sum up together and
the two next ones has to be sum up together.
"""
return self._get_series_indexation(self.coefs_sets)
def _get_series_indexation(self, coefs_sets: list[list[float]]) -> list[tuple[int, int]]:
"""Indexations of the series based on their aggregation policy.
Args:
coefs_sets (list[list[float]]): Coefs sets from there the indexations of the series structure
is deduced from.
Returns:
list[tuple[int, int]]: List of indexation segments where the series add to be added together.
For example, [(0,3), (4,5)] means the first forth series has to be sum up together and
the two next ones has to be sum up together.
"""
lower_bound: int = 0
series_indexation: list[tuple[int, int]] = []
for coefs_set in coefs_sets:
upper_bound = lower_bound + len(coefs_set)
series_indexation.append((lower_bound, upper_bound))
lower_bound = upper_bound
return series_indexation
# ------- METHODS -------
def transform(self, series: list[np.ndarray], copy=False) -> np.ndarray:
series = super().transform(series, copy)
return self._transformer(series, self.coefs_sets, self.powers, self.series_indexation)
def repartition(self, series: list[np.ndarray]) -> list[np.ndarray]:
"""Returns the repartition of the output series from the initial input series.
First, the function create weight serie based on referential series for each series of
aggregation and each component of the multiply function.
"""
# Create weight series within each aggregation set
repartition_series = [serie for i, serie in enumerate(series) if i not in self.ignore_input_indexes]
repartition_coefs_sets: list[list[float]] = []
ignore_power_indexes: list[float] = []
for k, repartition_coefs_set in enumerate(
[
[
coef
for j, coef in enumerate(coefs_set)
if j + series_indexation_set[0] not in self.ignore_input_indexes
]
for coefs_set, series_indexation_set in zip(self.coefs_sets, self.series_indexation)
]
):
if len(repartition_coefs_set) > 0:
repartition_coefs_sets.append(repartition_coefs_set)
else:
ignore_power_indexes.append(k)
repartition_series_indexation = self._get_series_indexation(repartition_coefs_sets)
repartition_powers = [power for i, power in enumerate(self.powers) if i not in ignore_power_indexes]
multilinear_weighted_series = [
np.array(
[
serie * coef
for serie, coef in zip(
repartition_series[series_indexation_set[0] : series_indexation_set[1]], coef_set # noqa: E203
)
]
)
for series_indexation_set, coef_set in zip(repartition_series_indexation, repartition_coefs_sets)
]
# Create weight series for each aggregation set component
multilinear_aggregated_series = [
np.sum(multilinear_weighted_series_set, axis=0)
for multilinear_weighted_series_set in multilinear_weighted_series
]
powered_series = np.array(
[serie**power for serie, power in zip(multilinear_aggregated_series, repartition_powers)]
)
# Create weighted series within each aggregation set to have it in term of proportion
with np.errstate(divide="ignore", invalid="ignore"):
powered_weighted_series = powered_series / reduce(lambda a, b: a * b, powered_series)
powered_proportionned_weighted_series = np.nan_to_num(
powered_weighted_series / np.sum(powered_weighted_series, axis=0)
)
# Create weighted serie for each aggregation set component to have it in term of proportion
multilinear_proportionned_weighted_series = [
np.nan_to_num(multilinear_weighted_series_set / np.sum(multilinear_weighted_series_set, axis=0))
for multilinear_weighted_series_set in multilinear_weighted_series
]
# Display the component proportion within each aggregation set to get a global proportion of the series
proportionned_weighted_series = [
powered_proportionned_weighted_serie * multilinear_proportionned_weighted_serie
for powered_proportionned_weighted_serie, multilinear_proportionned_weighted_series_set in zip(
powered_proportionned_weighted_series, multilinear_proportionned_weighted_series
)
for multilinear_proportionned_weighted_serie in multilinear_proportionned_weighted_series_set
]
for index in self.ignore_input_indexes:
proportionned_weighted_series.insert(index, np.zeros(len(series[index])))
return proportionned_weighted_series
# ------- TRANSFORMERS -------
@staticmethod
def _transformer(
series: list[np.ndarray],
coefs_sets: list[list[float]],
powers: list[float],
series_indexation: list[tuple[int, int]],
) -> np.ndarray:
"Returns the multiplication of linear combination of the different input variables."
multilinear_transformed_series = [
np.sum(
[
serie * coef
for serie, coef in zip(
series[series_indexation_set[0] : series_indexation_set[1]], coef_set # noqa: E203
)
],
axis=0,
)
for series_indexation_set, coef_set in zip(series_indexation, coefs_sets)
]
transformed_series = [serie**power for serie, power in zip(multilinear_transformed_series, powers)]
return reduce(lambda a, b: a * b, transformed_series)
# ------- CHECKERS -------
def check_params(self, series: list[np.ndarray]):
"""Check if parameters respect their application scope."""
if not np.sum([len(coefs_set) for coefs_set in self.coefs_sets]) == len(series):
raise ParameterScopeException(
f"Incorrect {self.__class__.__name__} parameters."
f"Length of concatenated coefs_sets {np.sum([len(coefs_set) for coefs_set in self.coefs_sets])}"
f" should match length of input series {len(series)}"
)
if not len(self.coefs_sets) == len(self.powers):
raise ParameterScopeException(
f"Incorrect {self.__class__.__name__} parameters. Length of coefs_sets {len(self.coefs_sets)}"
f" should match length of powers {len(self.powers)}"
)
if len(self.ignore_input_indexes) != len(set(self.ignore_input_indexes)):
raise ParameterScopeException(
f"Incorrect {self.__class__.__name__} parameters. Values ignore_input_indexes"
f" should be unique. Current values:{self.ignore_input_indexes}"
)
if not all(index < len(series) for index in self.ignore_input_indexes):
raise ParameterScopeException(
f"Incorrect {self.__class__.__name__} parameters. Each value of ignore_input_indexes"
f" should be lower than the number of series as input - lower than {len(series)}."
)
if len(self.ignore_input_indexes) >= len(series):
raise ParameterScopeException(
f"Incorrect {self.__class__.__name__}. Length of ignore_input_indexes"
f" should be inferior to length of input series - lower than {len(series)}."
)
series_indexation: list[tuple[int, int]] property
Indexations of the series based on their aggregation policy.
Returns:
| Type | Description |
|---|---|
list[tuple[int, int]] | list[tuple[int, int]]: List of indexation segments where the series add to be added together. For example, [(0,3), (3,5)] means the first forth series has to be sum up together and the two next ones has to be sum up together. |
check_params(series)
Check if parameters respect their application scope.
Source code in eki_mmo_equations/n_to_one_transformations/multilinear_multiplication.py
def check_params(self, series: list[np.ndarray]):
"""Check if parameters respect their application scope."""
if not np.sum([len(coefs_set) for coefs_set in self.coefs_sets]) == len(series):
raise ParameterScopeException(
f"Incorrect {self.__class__.__name__} parameters."
f"Length of concatenated coefs_sets {np.sum([len(coefs_set) for coefs_set in self.coefs_sets])}"
f" should match length of input series {len(series)}"
)
if not len(self.coefs_sets) == len(self.powers):
raise ParameterScopeException(
f"Incorrect {self.__class__.__name__} parameters. Length of coefs_sets {len(self.coefs_sets)}"
f" should match length of powers {len(self.powers)}"
)
if len(self.ignore_input_indexes) != len(set(self.ignore_input_indexes)):
raise ParameterScopeException(
f"Incorrect {self.__class__.__name__} parameters. Values ignore_input_indexes"
f" should be unique. Current values:{self.ignore_input_indexes}"
)
if not all(index < len(series) for index in self.ignore_input_indexes):
raise ParameterScopeException(
f"Incorrect {self.__class__.__name__} parameters. Each value of ignore_input_indexes"
f" should be lower than the number of series as input - lower than {len(series)}."
)
if len(self.ignore_input_indexes) >= len(series):
raise ParameterScopeException(
f"Incorrect {self.__class__.__name__}. Length of ignore_input_indexes"
f" should be inferior to length of input series - lower than {len(series)}."
)
repartition(series)
Returns the repartition of the output series from the initial input series. First, the function create weight serie based on referential series for each series of aggregation and each component of the multiply function.
Source code in eki_mmo_equations/n_to_one_transformations/multilinear_multiplication.py
def repartition(self, series: list[np.ndarray]) -> list[np.ndarray]:
"""Returns the repartition of the output series from the initial input series.
First, the function create weight serie based on referential series for each series of
aggregation and each component of the multiply function.
"""
# Create weight series within each aggregation set
repartition_series = [serie for i, serie in enumerate(series) if i not in self.ignore_input_indexes]
repartition_coefs_sets: list[list[float]] = []
ignore_power_indexes: list[float] = []
for k, repartition_coefs_set in enumerate(
[
[
coef
for j, coef in enumerate(coefs_set)
if j + series_indexation_set[0] not in self.ignore_input_indexes
]
for coefs_set, series_indexation_set in zip(self.coefs_sets, self.series_indexation)
]
):
if len(repartition_coefs_set) > 0:
repartition_coefs_sets.append(repartition_coefs_set)
else:
ignore_power_indexes.append(k)
repartition_series_indexation = self._get_series_indexation(repartition_coefs_sets)
repartition_powers = [power for i, power in enumerate(self.powers) if i not in ignore_power_indexes]
multilinear_weighted_series = [
np.array(
[
serie * coef
for serie, coef in zip(
repartition_series[series_indexation_set[0] : series_indexation_set[1]], coef_set # noqa: E203
)
]
)
for series_indexation_set, coef_set in zip(repartition_series_indexation, repartition_coefs_sets)
]
# Create weight series for each aggregation set component
multilinear_aggregated_series = [
np.sum(multilinear_weighted_series_set, axis=0)
for multilinear_weighted_series_set in multilinear_weighted_series
]
powered_series = np.array(
[serie**power for serie, power in zip(multilinear_aggregated_series, repartition_powers)]
)
# Create weighted series within each aggregation set to have it in term of proportion
with np.errstate(divide="ignore", invalid="ignore"):
powered_weighted_series = powered_series / reduce(lambda a, b: a * b, powered_series)
powered_proportionned_weighted_series = np.nan_to_num(
powered_weighted_series / np.sum(powered_weighted_series, axis=0)
)
# Create weighted serie for each aggregation set component to have it in term of proportion
multilinear_proportionned_weighted_series = [
np.nan_to_num(multilinear_weighted_series_set / np.sum(multilinear_weighted_series_set, axis=0))
for multilinear_weighted_series_set in multilinear_weighted_series
]
# Display the component proportion within each aggregation set to get a global proportion of the series
proportionned_weighted_series = [
powered_proportionned_weighted_serie * multilinear_proportionned_weighted_serie
for powered_proportionned_weighted_serie, multilinear_proportionned_weighted_series_set in zip(
powered_proportionned_weighted_series, multilinear_proportionned_weighted_series
)
for multilinear_proportionned_weighted_serie in multilinear_proportionned_weighted_series_set
]
for index in self.ignore_input_indexes:
proportionned_weighted_series.insert(index, np.zeros(len(series[index])))
return proportionned_weighted_series