Skip to content

diminishing_hill_multilinear_multiplication

DiminishingHillMultilinearMultiplication

Bases: NToOneTransformer

N-to-1 transformation allowing to get 1 variable by applying a linear combination of N other variables, applying appropriate Hill functions, and finalizing with multiplication.

The hill function is as follows:

Parameters:

Name Type Description Default
coefs_sets list[list[float]]

Coefficients of the multilinear functions following the shape of the formula. Example: coefs_sets = [ [coef_11, coef_12, .., coef_1N], [coef_21, coef_22, .., coef_2N], … [coef_Q1, coef_Q2, .., coef_QN] ]

required
powers list[float]

Powers applied to each Hill or Identify function of multilinear aggregation concerned by the multiplicative transformation. Example: powers = [power_1, power_2, …, power_Q]

required
K_sets list[float]

List of K parameters that will be applied after aggregating variables Example: K_sets = [K_1, …, K_M]

required
S_sets list[float]

List of S parameters that will be applied after aggregating variables Example: S_sets = [S_1, …, S_M]

required
max_sets Optional[list[float]]

List of max parameters that will be applied after aggregating variables. Example: K_sets = [max_1, …, max_M]

None
ignore_input_indexes Optional[List[int]]

Indexes of the data in series to ignore when computing the repartition.

None
Source code in eki_mmo_equations/n_to_one_transformations/diminishing_hill_multilinear_multiplication.py
class DiminishingHillMultilinearMultiplication(NToOneTransformer):
    """N-to-1 transformation allowing to get 1 variable by applying a linear combination of N other variables,
       applying appropriate Hill functions, and finalizing with multiplication.

    ```math
    (\\text{Hill}(\\text{coef}_{11} \\times \\text{serie}_{11} + \\text{coef}_{12} \\times \\text{serie}_{12} +
    \\ldots + \\text{coef}_{1N} \\times \\text{serie}_{1N}))^{\\text{power}_1} \\\\
    \\times \\\\
    (\\text{Hill}(\\text{coef}_{21} \\times \\text{serie}_{21} + \\text{coef}_{22} \\times \\text{serie}_{22} +
    \\ldots + \\text{coef}_{2N} \\times \\text{serie}_{2N}))^{\\text{power}_2} \\\\
    \\times \\\\
    (\\text{coef}_{P1} \\times \\text{serie}_{P1} + \\text{coef}_{P2} \\times \\text{serie}_{P2} +
    \\ldots + \\text{coef}_{PN} \\times \\text{serie}_{PN})^{\\text{power}_{P}} \\\\
    \\times \\\\
    (\\text{coef}_{Q1} \\times \\text{serie}_{Q1} + \\text{coef}_{Q2} \\times \\text{serie}_{Q2} +
    \\ldots + \\text{coef}_{QN} \\times \\text{serie}_{QN})^{\\text{power}_{Q}}
    ```

    The hill function is as follows:
    ```math
        \\frac{1}{1 + (\\frac{K \\times max}{serie})^S}
    ```

    Args:
        coefs_sets (list[list[float]]): Coefficients of the multilinear functions following the shape of the formula.
            Example: coefs_sets = [
                [coef_11, coef_12, .., coef_1N], [coef_21, coef_22, .., coef_2N], ... [coef_Q1, coef_Q2, .., coef_QN]
            ]
        powers (list[float]): Powers applied to each Hill or Identify function of multilinear aggregation concerned by
            the multiplicative transformation. Example: powers = [power_1, power_2, ..., power_Q]
        K_sets (list[float]): List of K parameters that will be applied after aggregating variables
            Example: K_sets = [K_1, ..., K_M]
        S_sets (list[float]): List of S parameters that will be applied after aggregating variables
            Example: S_sets = [S_1, ..., S_M]
        max_sets (Optional[list[float]], optional): List of max parameters that will be applied after aggregating
            variables. Example: K_sets = [max_1, ..., max_M]
        ignore_input_indexes (Optional[List[int]], optional): Indexes of the data in series to ignore when
            computing the repartition.
    """

    def __init__(
        self,
        coefs_sets: list[list[float]],
        powers: list[float],
        K_sets: list[float],
        S_sets: list[float],
        max_sets: Optional[list[float]] = None,
        ignore_input_indexes: Optional[list[int]] = None,
    ):
        self.coefs_sets: list[list[float]] = coefs_sets
        self.powers: list[float] = powers
        self.K_sets: list[float] = K_sets
        self.S_sets: list[float] = S_sets
        self.max_sets: list[float] = [np.nan] * len(self.K_sets) if max_sets is None else max_sets
        self.ignore_input_indexes: list[int] = ignore_input_indexes if ignore_input_indexes else []

    @property
    def parameters(self) -> dict[str, Any]:
        return self.__dict__

    @property
    def series_indexation(self) -> list[tuple[int, int]]:
        """Indexations of the series based on their aggregation policy.

        Returns:
            list[tuple[int, int]]: List of indexation segments where the series add to be added together.
                For example, [(0,3), (3,5)] means the first forth series has to be sum up together and
                the two next ones has to be sum up together.
        """
        return self._get_series_indexation(self.coefs_sets)

    def _get_series_indexation(self, coefs_sets: list[list[float]]) -> list[tuple[int, int]]:
        """Indexations of the series based on their aggregation policy.

        Args:
            coefs_sets (list[list[float]]): Coefs sets from there the indexations of the series structure
                is deduced from.

        Returns:
            list[tuple[int, int]]: List of indexation segments where the series are to be added together.
                For example, [(0,3), (4,5)] means the first to forth series are to be summed up together and
                the next two, the fifth and sixth series, are to be summed up together.
        """
        lower_bound: int = 0
        series_indexation: list[tuple[int, int]] = []
        for coefs_set in coefs_sets:
            upper_bound = lower_bound + len(coefs_set)
            series_indexation.append((lower_bound, upper_bound))
            lower_bound = upper_bound
        return series_indexation

    # ------- METHODS -------
    def fit(self, series: list[np.ndarray], y=None):
        # This method creates the appropriate max_sets if
        # self.max_sets was not given
        max_sets = []
        # Applying only the aggregating of series here
        aggregated_series = [
            np.sum(
                [
                    serie * coef
                    for serie, coef in zip(
                        series[series_indexation_set[0] : series_indexation_set[1]], coef_set  # noqa: E203
                    )
                ],
                axis=0,
            )
            for series_indexation_set, coef_set in zip(self.series_indexation, self.coefs_sets)
        ]
        for max, aggregated_series_set in zip(self.max_sets, aggregated_series):
            if (np.isnan(max)) | (max is None):
                if np.any(aggregated_series_set[aggregated_series_set > 0]):
                    max = aggregated_series_set.max()
                else:
                    max = aggregated_series_set.min() or 1
            max_sets.append(max)
        self.max_sets = max_sets

        return super().fit(series, y)

    def transform(self, series: list[np.ndarray], copy=False) -> np.ndarray:
        series = super().transform(series, copy)

        return self._transformer(
            series,
            self.coefs_sets,
            self.powers,
            self.K_sets,
            self.S_sets,
            self.max_sets,
            self.series_indexation,
        )

    def repartition(self, series: list[np.ndarray]) -> list[np.ndarray]:
        """Returns the repartition of the output series from the initial input series.
        First, the function create weight serie based on referential series for each series of hill or
        identify function of aggregation and each component of the multiply function.
        """
        # Create weight series within each aggregation set
        repartition_series = [serie for i, serie in enumerate(series) if i not in self.ignore_input_indexes]
        repartition_coefs_sets: list[list[float]] = []
        ignore_power_indexes: list[float] = []

        # Filter out the coefs from series that will be ignored (stated in repartition_series)
        # If any entirety of an originally aggregated series set is to be filtered/ignored,
        # the associated power_index of the filtered aggregated serie will also be filtered
        # and tracked with ignore_power_indexes
        for k, repartition_coefs_set in enumerate(
            [
                [
                    coef
                    for j, coef in enumerate(coefs_set)
                    if j + series_indexation_set[0] not in self.ignore_input_indexes
                ]
                for coefs_set, series_indexation_set in zip(self.coefs_sets, self.series_indexation)
            ]
        ):
            if len(repartition_coefs_set) > 0:
                repartition_coefs_sets.append(repartition_coefs_set)
            else:
                ignore_power_indexes.append(k)

        # Get the new indexation of the filtered series
        repartition_series_indexation = self._get_series_indexation(repartition_coefs_sets)

        # Filter out appropriate indexes from post-serie aggregation arguments
        repartition_powers = [power for i, power in enumerate(self.powers) if i not in ignore_power_indexes]
        repartition_K_sets = [K for i, K in enumerate(self.K_sets) if i not in ignore_power_indexes]
        repartition_S_sets = [S for i, S in enumerate(self.S_sets) if i not in ignore_power_indexes]
        repartition_max_sets = [max for i, max in enumerate(self.max_sets) if i not in ignore_power_indexes]

        multilinear_weighted_series = [
            np.array(
                [
                    serie * coef
                    for serie, coef in zip(
                        repartition_series[series_indexation_set[0] : series_indexation_set[1]], coef_set  # noqa: E203
                    )
                ]
            )
            for series_indexation_set, coef_set in zip(repartition_series_indexation, repartition_coefs_sets)
        ]

        # Create weight series for each aggregation set component
        multilinear_aggregated_series = [
            np.sum(multilinear_weighted_series_set, axis=0)
            for multilinear_weighted_series_set in multilinear_weighted_series
        ]

        # Using the repartitioned K set as an indicator to how many
        # aggregated series to apply the hill function to
        hill_multilinear_aggregated_series = np.array(
            [
                DiminishingHill(K=K, S=S, max=max).transform(serie)
                for serie, K, S, max in zip(
                    multilinear_aggregated_series[0 : len(repartition_K_sets)],  # noqa: E203
                    repartition_K_sets,
                    repartition_S_sets,
                    repartition_max_sets,
                )
            ]
            + [serie for serie in multilinear_aggregated_series[len(repartition_K_sets) :]]  # noqa: E203
        )
        powered_series = np.array(
            [serie**power for serie, power in zip(hill_multilinear_aggregated_series, repartition_powers)]
        )

        # Create weighted series within each aggregation set to have it in term of proportion
        with np.errstate(divide="ignore", invalid="ignore"):
            powered_weighted_series = powered_series / reduce(lambda a, b: a * b, powered_series)
        powered_proportionned_weighted_series = np.nan_to_num(
            powered_weighted_series / np.sum(powered_weighted_series, axis=0)
        )

        # Create weighted serie for each aggregation set component to have it in term of proportion
        multilinear_proportionned_weighted_series = [
            np.nan_to_num(multilinear_weighted_series_set / np.sum(multilinear_weighted_series_set, axis=0))
            for multilinear_weighted_series_set in multilinear_weighted_series
        ]

        # Display the component proportion within each aggregation set to get a global proportion of the series
        proportionned_weighted_series = [
            powered_proportionned_weighted_serie * multilinear_proportionned_weighted_serie
            for powered_proportionned_weighted_serie, multilinear_proportionned_weighted_series_set in zip(
                powered_proportionned_weighted_series, multilinear_proportionned_weighted_series
            )
            for multilinear_proportionned_weighted_serie in multilinear_proportionned_weighted_series_set
        ]
        for index in self.ignore_input_indexes:
            proportionned_weighted_series.insert(index, np.zeros(len(series[index])))

        return proportionned_weighted_series

    # ------- TRANSFORMERS -------

    # Main method that must be changed
    @staticmethod
    def _transformer(
        series: list[np.ndarray],
        coefs_sets: list[list[float]],
        powers: list[float],
        K_sets: list[float],
        S_sets: list[float],
        max_sets: list[float],
        series_indexation: list[tuple[int, int]],
    ) -> np.ndarray:
        """Returns the multiplication of the hill or identity function being applied to
        linear combinations of the different input variables."""

        number_of_hill_parameters = len(K_sets)

        # Applying only the aggregating of series here
        hill_transformed_series = [
            np.sum(
                [
                    serie * coef
                    for serie, coef in zip(
                        series[series_indexation_set[0] : series_indexation_set[1]], coef_set  # noqa: E203
                    )
                ],
                axis=0,
            )
            for series_indexation_set, coef_set in zip(series_indexation, coefs_sets)
        ]

        # Apply Hill function to aggregated series by order and length of Hill parameters
        for i in range(number_of_hill_parameters):
            hill_transformed_series[i] = DiminishingHill(K=K_sets[i], S=S_sets[i], max=max_sets[i]).transform(
                hill_transformed_series[i]
            )

        hill_transformed_series = [serie**power for serie, power in zip(hill_transformed_series, powers)]

        return reduce(lambda a, b: a * b, hill_transformed_series)

    # ------- CHECKERS -------

    def check_params(self, series: list[np.ndarray]):
        # Check to make sure length of hill parameters are
        if len(self.K_sets) > len(self.coefs_sets):
            raise ParameterScopeException(
                f"Incorrect {self.__class__.__name__}. Length of hill parameters"
                f" should be less than the number of total variables after aggregation."
            )
        self._check_multilinear_multiply_params(series)
        self._check_ignore_input_indexes_params(series)
        self._check_diminishing_hill_params(series)

    def _check_multilinear_multiply_params(self, series: list[np.ndarray]):
        """Check if parameters respect their application scope."""
        if not np.sum([len(coefs_set) for coefs_set in self.coefs_sets]) == len(series):
            raise ParameterScopeException(
                f"Incorrect {self.__class__.__name__} parameters."
                f"Length of concatenated coefs_sets {np.sum([len(coefs_set) for coefs_set in self.coefs_sets])}"
                f" should match length of input series {len(series)}"
            )
        if not len(self.coefs_sets) == len(self.powers):
            raise ParameterScopeException(
                f"Incorrect {self.__class__.__name__} parameters. Length of coefs_sets {len(self.coefs_sets)}"
                f" should match length of powers {len(self.powers)}"
            )

    def _check_ignore_input_indexes_params(self, series: list[np.ndarray]):
        if len(self.ignore_input_indexes) != len(set(self.ignore_input_indexes)):
            raise ParameterScopeException(
                f"Incorrect {self.__class__.__name__} parameters. Values ignore_input_indexes"
                f" should be unique. Current values:{self.ignore_input_indexes}"
            )
        if not all(index < len(series) for index in self.ignore_input_indexes):
            raise ParameterScopeException(
                f"Incorrect {self.__class__.__name__} parameters. Each value of ignore_input_indexes"
                f" should be lower than the number of series as input - lower than {len(series)}."
            )
        if len(self.ignore_input_indexes) >= len(series):
            raise ParameterScopeException(
                f"Incorrect {self.__class__.__name__}. Length of ignore_input_indexes"
                f" should be inferior to length of input series - lower than {len(series)}."
            )

    def _check_diminishing_hill_params(self, series: list[np.ndarray]):
        # Check to make sure hill parameters are the same length
        if not (len(self.K_sets) == len(self.S_sets) == len(self.max_sets)):
            raise ParameterScopeException(
                f"Incorrect {self.__class__.__name__}. Length of hill parameters should"
                f" all equal each other. K = {len(self.K_sets)}, S = {len(self.S_sets)}, Max = {len(self.max_sets)}"
            )
        # Checkk Hill functions parameters
        if any((K <= 0) or (K >= 1) for K in self.K_sets):
            raise ParameterScopeException(f"All parameters K must be in ]0, 1[, not {self.K_sets}.")

        if any((S <= 0) for S in self.S_sets):
            raise ParameterScopeException(f"All parameters S must be strictly positive, not {self.S_sets}.")

        if any((max <= 0) for max in self.max_sets):
            raise ParameterScopeException(f"All parameters max must be strictly positive, not {self.max_sets}.")

series_indexation: list[tuple[int, int]] property

Indexations of the series based on their aggregation policy.

Returns:

Type Description
list[tuple[int, int]]

list[tuple[int, int]]: List of indexation segments where the series add to be added together. For example, [(0,3), (3,5)] means the first forth series has to be sum up together and the two next ones has to be sum up together.

repartition(series)

Returns the repartition of the output series from the initial input series. First, the function create weight serie based on referential series for each series of hill or identify function of aggregation and each component of the multiply function.

Source code in eki_mmo_equations/n_to_one_transformations/diminishing_hill_multilinear_multiplication.py
def repartition(self, series: list[np.ndarray]) -> list[np.ndarray]:
    """Returns the repartition of the output series from the initial input series.
    First, the function create weight serie based on referential series for each series of hill or
    identify function of aggregation and each component of the multiply function.
    """
    # Create weight series within each aggregation set
    repartition_series = [serie for i, serie in enumerate(series) if i not in self.ignore_input_indexes]
    repartition_coefs_sets: list[list[float]] = []
    ignore_power_indexes: list[float] = []

    # Filter out the coefs from series that will be ignored (stated in repartition_series)
    # If any entirety of an originally aggregated series set is to be filtered/ignored,
    # the associated power_index of the filtered aggregated serie will also be filtered
    # and tracked with ignore_power_indexes
    for k, repartition_coefs_set in enumerate(
        [
            [
                coef
                for j, coef in enumerate(coefs_set)
                if j + series_indexation_set[0] not in self.ignore_input_indexes
            ]
            for coefs_set, series_indexation_set in zip(self.coefs_sets, self.series_indexation)
        ]
    ):
        if len(repartition_coefs_set) > 0:
            repartition_coefs_sets.append(repartition_coefs_set)
        else:
            ignore_power_indexes.append(k)

    # Get the new indexation of the filtered series
    repartition_series_indexation = self._get_series_indexation(repartition_coefs_sets)

    # Filter out appropriate indexes from post-serie aggregation arguments
    repartition_powers = [power for i, power in enumerate(self.powers) if i not in ignore_power_indexes]
    repartition_K_sets = [K for i, K in enumerate(self.K_sets) if i not in ignore_power_indexes]
    repartition_S_sets = [S for i, S in enumerate(self.S_sets) if i not in ignore_power_indexes]
    repartition_max_sets = [max for i, max in enumerate(self.max_sets) if i not in ignore_power_indexes]

    multilinear_weighted_series = [
        np.array(
            [
                serie * coef
                for serie, coef in zip(
                    repartition_series[series_indexation_set[0] : series_indexation_set[1]], coef_set  # noqa: E203
                )
            ]
        )
        for series_indexation_set, coef_set in zip(repartition_series_indexation, repartition_coefs_sets)
    ]

    # Create weight series for each aggregation set component
    multilinear_aggregated_series = [
        np.sum(multilinear_weighted_series_set, axis=0)
        for multilinear_weighted_series_set in multilinear_weighted_series
    ]

    # Using the repartitioned K set as an indicator to how many
    # aggregated series to apply the hill function to
    hill_multilinear_aggregated_series = np.array(
        [
            DiminishingHill(K=K, S=S, max=max).transform(serie)
            for serie, K, S, max in zip(
                multilinear_aggregated_series[0 : len(repartition_K_sets)],  # noqa: E203
                repartition_K_sets,
                repartition_S_sets,
                repartition_max_sets,
            )
        ]
        + [serie for serie in multilinear_aggregated_series[len(repartition_K_sets) :]]  # noqa: E203
    )
    powered_series = np.array(
        [serie**power for serie, power in zip(hill_multilinear_aggregated_series, repartition_powers)]
    )

    # Create weighted series within each aggregation set to have it in term of proportion
    with np.errstate(divide="ignore", invalid="ignore"):
        powered_weighted_series = powered_series / reduce(lambda a, b: a * b, powered_series)
    powered_proportionned_weighted_series = np.nan_to_num(
        powered_weighted_series / np.sum(powered_weighted_series, axis=0)
    )

    # Create weighted serie for each aggregation set component to have it in term of proportion
    multilinear_proportionned_weighted_series = [
        np.nan_to_num(multilinear_weighted_series_set / np.sum(multilinear_weighted_series_set, axis=0))
        for multilinear_weighted_series_set in multilinear_weighted_series
    ]

    # Display the component proportion within each aggregation set to get a global proportion of the series
    proportionned_weighted_series = [
        powered_proportionned_weighted_serie * multilinear_proportionned_weighted_serie
        for powered_proportionned_weighted_serie, multilinear_proportionned_weighted_series_set in zip(
            powered_proportionned_weighted_series, multilinear_proportionned_weighted_series
        )
        for multilinear_proportionned_weighted_serie in multilinear_proportionned_weighted_series_set
    ]
    for index in self.ignore_input_indexes:
        proportionned_weighted_series.insert(index, np.zeros(len(series[index])))

    return proportionned_weighted_series