Skip to content

multilinear_multiplication

MultilinearMultiplication

Bases: NToOneTransformer

N-to-1 transformation allowing to get 1 variable by applying a linear combination of N other variables.

Parameters:

Name Type Description Default
coefs_sets list[list[float]]

Coefficients of the multilinear functions following the shape of the formula. Example: coefs_sets = [[coef_11, coef_12, .., coef_1N], [coef_21, coef_22, .., coef_2N]]

required
powers list[float]

powers (list[float]): Powers applied to each multilinear aggregation concerned by the multiplicative transformation. Example: powers = [power_1, power_2]

required
ignore_input_indexes Optional[List[int]]

Indexes of the data in series to ignore when computing the repartition.

None
Source code in eki_mmo_equations/n_to_one_transformations/multilinear_multiplication.py
class MultilinearMultiplication(NToOneTransformer):
    """N-to-1 transformation allowing to get 1 variable by applying a linear combination of N other variables.

    ```math
    (\\text{coef}_{11} \\times \\text{serie}_{11} + \\text{coef}_{12} \\times{series}_{12} +
    \\ldots + \\text{coef}_{1N} \\times \\text{serie}_{1N})^{\\text{power}_1} \\\\
    \\times \\\\
    (\\text{coef}_{21} \\times \\text{serie}_{21} + \\text{coef}_{22} \\times{series}_{22} +
    \\ldots + \\text{coef}_{2N} \\times \\text{serie}_{2N})^{\\text{power}_2}
    ```

    Args:
        coefs_sets (list[list[float]]): Coefficients of the multilinear functions following the shape of the formula.
            Example: coefs_sets = [[coef_11, coef_12, .., coef_1N], [coef_21, coef_22, .., coef_2N]]
        powers (list[float]): powers (list[float]): Powers applied to each multilinear aggregation concerned by
            the multiplicative transformation. Example: powers = [power_1, power_2]
        ignore_input_indexes (Optional[List[int]], optional): Indexes of the data in series to ignore when
            computing the repartition.

    """

    def __init__(
        self, coefs_sets: list[list[float]], powers: list[float], ignore_input_indexes: Optional[list[int]] = None
    ):
        self.coefs_sets: list[list[float]] = coefs_sets
        self.powers: list[float] = powers
        self.ignore_input_indexes: list[int] = ignore_input_indexes if ignore_input_indexes else []

    @property
    def parameters(self) -> dict[str, Any]:
        return self.__dict__

    @property
    def series_indexation(self) -> list[tuple[int, int]]:
        """Indexations of the series based on their aggregation policy.

        Returns:
            list[tuple[int, int]]: List of indexation segments where the series add to be added together.
                For example, [(0,3), (3,5)] means the first forth series has to be sum up together and
                the two next ones has to be sum up together.
        """
        return self._get_series_indexation(self.coefs_sets)

    def _get_series_indexation(self, coefs_sets: list[list[float]]) -> list[tuple[int, int]]:
        """Indexations of the series based on their aggregation policy.

        Args:
            coefs_sets (list[list[float]]): Coefs sets from there the indexations of the series structure
                is deduced from.

        Returns:
            list[tuple[int, int]]: List of indexation segments where the series add to be added together.
                For example, [(0,3), (4,5)] means the first forth series has to be sum up together and
                the two next ones has to be sum up together.
        """
        lower_bound: int = 0
        series_indexation: list[tuple[int, int]] = []
        for coefs_set in coefs_sets:
            upper_bound = lower_bound + len(coefs_set)
            series_indexation.append((lower_bound, upper_bound))
            lower_bound = upper_bound
        return series_indexation

    # ------- METHODS -------

    def transform(self, series: list[np.ndarray], copy=False) -> np.ndarray:
        series = super().transform(series, copy)

        return self._transformer(series, self.coefs_sets, self.powers, self.series_indexation)

    def repartition(self, series: list[np.ndarray]) -> list[np.ndarray]:
        """Returns the repartition of the output series from the initial input series.
        First, the function create weight serie based on referential series for each series of
        aggregation and each component of the multiply function.
        """
        # Create weight series within each aggregation set
        repartition_series = [serie for i, serie in enumerate(series) if i not in self.ignore_input_indexes]
        repartition_coefs_sets: list[list[float]] = []
        ignore_power_indexes: list[float] = []

        for k, repartition_coefs_set in enumerate(
            [
                [
                    coef
                    for j, coef in enumerate(coefs_set)
                    if j + series_indexation_set[0] not in self.ignore_input_indexes
                ]
                for coefs_set, series_indexation_set in zip(self.coefs_sets, self.series_indexation)
            ]
        ):
            if len(repartition_coefs_set) > 0:
                repartition_coefs_sets.append(repartition_coefs_set)
            else:
                ignore_power_indexes.append(k)

        repartition_series_indexation = self._get_series_indexation(repartition_coefs_sets)

        repartition_powers = [power for i, power in enumerate(self.powers) if i not in ignore_power_indexes]

        multilinear_weighted_series = [
            np.array(
                [
                    serie * coef
                    for serie, coef in zip(
                        repartition_series[series_indexation_set[0] : series_indexation_set[1]], coef_set  # noqa: E203
                    )
                ]
            )
            for series_indexation_set, coef_set in zip(repartition_series_indexation, repartition_coefs_sets)
        ]

        # Create weight series for each aggregation set component
        multilinear_aggregated_series = [
            np.sum(multilinear_weighted_series_set, axis=0)
            for multilinear_weighted_series_set in multilinear_weighted_series
        ]

        powered_series = np.array(
            [serie**power for serie, power in zip(multilinear_aggregated_series, repartition_powers)]
        )

        # Create weighted series within each aggregation set to have it in term of proportion
        with np.errstate(divide="ignore", invalid="ignore"):
            powered_weighted_series = powered_series / reduce(lambda a, b: a * b, powered_series)
        powered_proportionned_weighted_series = np.nan_to_num(
            powered_weighted_series / np.sum(powered_weighted_series, axis=0)
        )

        # Create weighted serie for each aggregation set component to have it in term of proportion
        multilinear_proportionned_weighted_series = [
            np.nan_to_num(multilinear_weighted_series_set / np.sum(multilinear_weighted_series_set, axis=0))
            for multilinear_weighted_series_set in multilinear_weighted_series
        ]

        # Display the component proportion within each aggregation set to get a global proportion of the series
        proportionned_weighted_series = [
            powered_proportionned_weighted_serie * multilinear_proportionned_weighted_serie
            for powered_proportionned_weighted_serie, multilinear_proportionned_weighted_series_set in zip(
                powered_proportionned_weighted_series, multilinear_proportionned_weighted_series
            )
            for multilinear_proportionned_weighted_serie in multilinear_proportionned_weighted_series_set
        ]
        for index in self.ignore_input_indexes:
            proportionned_weighted_series.insert(index, np.zeros(len(series[index])))

        return proportionned_weighted_series

    # ------- TRANSFORMERS -------

    @staticmethod
    def _transformer(
        series: list[np.ndarray],
        coefs_sets: list[list[float]],
        powers: list[float],
        series_indexation: list[tuple[int, int]],
    ) -> np.ndarray:
        "Returns the multiplication of linear combination of the different input variables."
        multilinear_transformed_series = [
            np.sum(
                [
                    serie * coef
                    for serie, coef in zip(
                        series[series_indexation_set[0] : series_indexation_set[1]], coef_set  # noqa: E203
                    )
                ],
                axis=0,
            )
            for series_indexation_set, coef_set in zip(series_indexation, coefs_sets)
        ]
        transformed_series = [serie**power for serie, power in zip(multilinear_transformed_series, powers)]
        return reduce(lambda a, b: a * b, transformed_series)

    # ------- CHECKERS -------

    def check_params(self, series: list[np.ndarray]):
        """Check if parameters respect their application scope."""
        if not np.sum([len(coefs_set) for coefs_set in self.coefs_sets]) == len(series):
            raise ParameterScopeException(
                f"Incorrect {self.__class__.__name__} parameters."
                f"Length of concatenated coefs_sets {np.sum([len(coefs_set) for coefs_set in self.coefs_sets])}"
                f" should match length of input series {len(series)}"
            )
        if not len(self.coefs_sets) == len(self.powers):
            raise ParameterScopeException(
                f"Incorrect {self.__class__.__name__} parameters. Length of coefs_sets {len(self.coefs_sets)}"
                f" should match length of powers {len(self.powers)}"
            )
        if len(self.ignore_input_indexes) != len(set(self.ignore_input_indexes)):
            raise ParameterScopeException(
                f"Incorrect {self.__class__.__name__} parameters. Values ignore_input_indexes"
                f" should be unique. Current values:{self.ignore_input_indexes}"
            )
        if not all(index < len(series) for index in self.ignore_input_indexes):
            raise ParameterScopeException(
                f"Incorrect {self.__class__.__name__} parameters. Each value of ignore_input_indexes"
                f" should be lower than the number of series as input - lower than {len(series)}."
            )
        if len(self.ignore_input_indexes) >= len(series):
            raise ParameterScopeException(
                f"Incorrect {self.__class__.__name__}. Length of ignore_input_indexes"
                f" should be inferior to length of input series - lower than {len(series)}."
            )

series_indexation: list[tuple[int, int]] property

Indexations of the series based on their aggregation policy.

Returns:

Type Description
list[tuple[int, int]]

list[tuple[int, int]]: List of indexation segments where the series add to be added together. For example, [(0,3), (3,5)] means the first forth series has to be sum up together and the two next ones has to be sum up together.

check_params(series)

Check if parameters respect their application scope.

Source code in eki_mmo_equations/n_to_one_transformations/multilinear_multiplication.py
def check_params(self, series: list[np.ndarray]):
    """Check if parameters respect their application scope."""
    if not np.sum([len(coefs_set) for coefs_set in self.coefs_sets]) == len(series):
        raise ParameterScopeException(
            f"Incorrect {self.__class__.__name__} parameters."
            f"Length of concatenated coefs_sets {np.sum([len(coefs_set) for coefs_set in self.coefs_sets])}"
            f" should match length of input series {len(series)}"
        )
    if not len(self.coefs_sets) == len(self.powers):
        raise ParameterScopeException(
            f"Incorrect {self.__class__.__name__} parameters. Length of coefs_sets {len(self.coefs_sets)}"
            f" should match length of powers {len(self.powers)}"
        )
    if len(self.ignore_input_indexes) != len(set(self.ignore_input_indexes)):
        raise ParameterScopeException(
            f"Incorrect {self.__class__.__name__} parameters. Values ignore_input_indexes"
            f" should be unique. Current values:{self.ignore_input_indexes}"
        )
    if not all(index < len(series) for index in self.ignore_input_indexes):
        raise ParameterScopeException(
            f"Incorrect {self.__class__.__name__} parameters. Each value of ignore_input_indexes"
            f" should be lower than the number of series as input - lower than {len(series)}."
        )
    if len(self.ignore_input_indexes) >= len(series):
        raise ParameterScopeException(
            f"Incorrect {self.__class__.__name__}. Length of ignore_input_indexes"
            f" should be inferior to length of input series - lower than {len(series)}."
        )

repartition(series)

Returns the repartition of the output series from the initial input series. First, the function create weight serie based on referential series for each series of aggregation and each component of the multiply function.

Source code in eki_mmo_equations/n_to_one_transformations/multilinear_multiplication.py
def repartition(self, series: list[np.ndarray]) -> list[np.ndarray]:
    """Returns the repartition of the output series from the initial input series.
    First, the function create weight serie based on referential series for each series of
    aggregation and each component of the multiply function.
    """
    # Create weight series within each aggregation set
    repartition_series = [serie for i, serie in enumerate(series) if i not in self.ignore_input_indexes]
    repartition_coefs_sets: list[list[float]] = []
    ignore_power_indexes: list[float] = []

    for k, repartition_coefs_set in enumerate(
        [
            [
                coef
                for j, coef in enumerate(coefs_set)
                if j + series_indexation_set[0] not in self.ignore_input_indexes
            ]
            for coefs_set, series_indexation_set in zip(self.coefs_sets, self.series_indexation)
        ]
    ):
        if len(repartition_coefs_set) > 0:
            repartition_coefs_sets.append(repartition_coefs_set)
        else:
            ignore_power_indexes.append(k)

    repartition_series_indexation = self._get_series_indexation(repartition_coefs_sets)

    repartition_powers = [power for i, power in enumerate(self.powers) if i not in ignore_power_indexes]

    multilinear_weighted_series = [
        np.array(
            [
                serie * coef
                for serie, coef in zip(
                    repartition_series[series_indexation_set[0] : series_indexation_set[1]], coef_set  # noqa: E203
                )
            ]
        )
        for series_indexation_set, coef_set in zip(repartition_series_indexation, repartition_coefs_sets)
    ]

    # Create weight series for each aggregation set component
    multilinear_aggregated_series = [
        np.sum(multilinear_weighted_series_set, axis=0)
        for multilinear_weighted_series_set in multilinear_weighted_series
    ]

    powered_series = np.array(
        [serie**power for serie, power in zip(multilinear_aggregated_series, repartition_powers)]
    )

    # Create weighted series within each aggregation set to have it in term of proportion
    with np.errstate(divide="ignore", invalid="ignore"):
        powered_weighted_series = powered_series / reduce(lambda a, b: a * b, powered_series)
    powered_proportionned_weighted_series = np.nan_to_num(
        powered_weighted_series / np.sum(powered_weighted_series, axis=0)
    )

    # Create weighted serie for each aggregation set component to have it in term of proportion
    multilinear_proportionned_weighted_series = [
        np.nan_to_num(multilinear_weighted_series_set / np.sum(multilinear_weighted_series_set, axis=0))
        for multilinear_weighted_series_set in multilinear_weighted_series
    ]

    # Display the component proportion within each aggregation set to get a global proportion of the series
    proportionned_weighted_series = [
        powered_proportionned_weighted_serie * multilinear_proportionned_weighted_serie
        for powered_proportionned_weighted_serie, multilinear_proportionned_weighted_series_set in zip(
            powered_proportionned_weighted_series, multilinear_proportionned_weighted_series
        )
        for multilinear_proportionned_weighted_serie in multilinear_proportionned_weighted_series_set
    ]
    for index in self.ignore_input_indexes:
        proportionned_weighted_series.insert(index, np.zeros(len(series[index])))

    return proportionned_weighted_series