Skip to content

Period Feature Set

Module for the PeriodFeatureSet class and PeriodFeatureConfig class.

Author: Daniel Wertheimer

PeriodFeatureConfig

Bases: PeriodMonthFeatureConfig

Configuration class for period-based feature calculation.

This class is deprecated. It inherits from PeriodMonthFeatureConfig and exists only to maintain backward compatibility; instantiating it emits a DeprecationWarning. Use PeriodMonthFeatureConfig for new implementations.

Source code in amee_utils/feature_generator/feature_set/period_month.py
class PeriodFeatureConfig(PeriodMonthFeatureConfig):
    """
    Deprecated configuration class for period-based feature calculation.

    This class inherits from PeriodMonthFeatureConfig and is used to maintain backward compatibility.
    It adds no attributes of its own; it only emits a DeprecationWarning once the parent's
    post-initialisation hook has run.
    It is recommended to use PeriodMonthFeatureConfig for new implementations.
    """

    def __attrs_post_init__(self):
        """
        Run the parent post-initialisation hook, then emit a deprecation warning.

        Any exception raised by the parent hook propagates unchanged, in which case
        no warning is emitted.

        Warns
        -----
        DeprecationWarning
            Always, after the parent hook has completed successfully.
        """
        import warnings

        super().__attrs_post_init__()
        # Call chain on construction: caller -> attrs-generated __init__ ->
        # __attrs_post_init__ -> warnings.warn. stacklevel=3 attributes the warning
        # to the caller's line; stacklevel=2 would point at the generated __init__,
        # so the default DeprecationWarning filter would hide it from the caller.
        warnings.warn(
            "PeriodFeatureConfig is deprecated. Use PeriodMonthFeatureConfig instead.",
            DeprecationWarning,
            stacklevel=3,
        )

PeriodMonthFeatureConfig

Bases: FeatureConfig

Configuration class for period-based feature calculation.

Attributes:

Name Type Description
lag_months list[int]

List of integers representing the number of months to lag. (default is [3, 6])

calculation_columns list[str]

List of columns to perform calculations on.

deviation_from_mean_months list[int]

List of integers representing months for mean deviation calculation.

calculate_deltas bool

Flag to determine if deltas should be calculated.

calculate_percentage_change bool

Flag to determine if percentage change should be calculated.

resolve_divide_by_zero bool

Flag to determine if divide by zero errors should be resolved.

Source code in amee_utils/feature_generator/feature_set/period_month.py
@attrs.define
class PeriodMonthFeatureConfig(FeatureConfig):
    """
    Configuration class for period-based feature calculation.

    Attributes
    ----------
    lag_months : list[int]
        List of integers representing the number of months to lag. (default is [3, 6])
    calculation_columns : list[str]
        List of columns to perform calculations on. Must be non-empty.
    deviation_from_mean_months : list[int]
        List of integers representing months for mean deviation calculation.
        (default is an empty list)
    calculate_deltas : bool
        Flag to determine if deltas should be calculated.
    calculate_percentage_change : bool
        Flag to determine if percentage change should be calculated.
    resolve_divide_by_zero : bool
        Flag to determine if divide by zero errors should be resolved. May only be True
        when both calculate_percentage_change and calculate_deltas are True.
    """

    # List defaults are built per instance through factories. A literal list default
    # would be one shared object, so mutating it on one config would silently change
    # every other config that relied on the default.
    lag_months: list[int] = attrs.field(factory=lambda: [3, 6])
    calculation_columns: list[str] = attrs.field(factory=list)
    deviation_from_mean_months: list[int] = attrs.field(factory=list)
    calculate_deltas: bool = False
    calculate_percentage_change: bool = False
    resolve_divide_by_zero: bool = False

    def __attrs_post_init__(self):
        """
        Post-initialisation validation for PeriodMonthFeatureConfig.

        Raises
        ------
        ValueError
            If deviation_from_mean_months is not a list, if calculation_columns is empty,
            or if resolve_divide_by_zero is True while calculate_percentage_change or
            calculate_deltas is False.
        """
        if not isinstance(self.deviation_from_mean_months, list):
            raise ValueError("deviation_from_mean_months must be a list of integers.")
        if not self.calculation_columns:
            raise ValueError("calculation_columns must be a non-empty list of strings.")
        # resolve_divide_by_zero requires BOTH other flags to be enabled.
        if self.resolve_divide_by_zero and (not self.calculate_percentage_change or not self.calculate_deltas):
            raise ValueError(
                "resolve_divide_by_zero can only be True if calculate_percentage_change and calculate_deltas are True."
            )

PeriodMonthFeatureSet

Bases: FeatureSet[PeriodMonthFeatureConfig]

FeatureSet class for calculating period-based features.

Methods:

Name Description
calculate

Calculate the features based on the given configurations.

deviation_from_mean

Calculate the deviation from mean for specified columns.

_calculate_deltas

Calculate the deltas between specified columns.

Source code in amee_utils/feature_generator/feature_set/period_month.py
class PeriodMonthFeatureSet(FeatureSet[PeriodMonthFeatureConfig]):
    """
    FeatureSet class for calculating period-based features.

    Methods
    -------
    calculate(df, dataset_config, feature_config, calculation_date) -> DataFrame
        Calculate the features based on the given configurations.

    deviation_from_mean(df, key_cols, agg_col, mean_months) -> DataFrame
        Calculate the deviation from mean for specified columns.

    _calculate_deltas(df, key_cols) -> DataFrame
        Calculate the deltas between specified columns.
    """

    def calculate(
        self,
        df: DataFrame,
        dataset_config: DatasetConfig,
        feature_config: PeriodMonthFeatureConfig,
        calculation_date: datetime,
    ) -> DataFrame:
        """
        Calculate period-based features.

        For every column in ``feature_config.calculation_columns`` the lagged sum, mean and
        standard-deviation aggregations are computed and, when
        ``feature_config.deviation_from_mean_months`` is non-empty, the deviation-from-mean
        columns as well. Deltas and percentage changes are added when the corresponding flags
        are set. The per-column results are left-joined onto the distinct key combinations.

        Parameters
        ----------
        df : DataFrame
            Input DataFrame.
        dataset_config : DatasetConfig
            Configuration for the dataset. ``date_col`` and ``key_cols`` are read from it.
        feature_config : PeriodMonthFeatureConfig
            Configuration for the feature calculation.
        calculation_date : datetime
            Date for which the calculation is performed. Not read by this implementation.

        Returns
        -------
        DataFrame
            DataFrame with the calculated features.
        """
        key_cols = dataset_config.key_cols
        df = create_month_date_col(df=df, date_col=dataset_config.date_col)

        # The distinct key combinations do not depend on the calculation column, so the
        # frame is built once and reused as the join base for every column.
        base_df = df.select(key_cols).distinct()
        final_df = base_df

        lagged_aggregation = LaggedAggregation(
            periods_list=feature_config.lag_months, time_col="month", lag_type="period"
        )
        for col in feature_config.calculation_columns:
            strategies = [SumAggregation(), MeanAggregation(), StddevAggregation()]
            feature_dfs = [
                lagged_aggregation.apply(
                    df=df,
                    key_cols=key_cols,
                    agg_col=col,
                    strategies=strategies,
                )
            ]

            # deviation_from_mean rejects an empty month list, and an empty list is the
            # config default, so the deviation features are skipped instead of failing
            # the whole calculation.
            if feature_config.deviation_from_mean_months:
                feature_dfs.append(
                    self.deviation_from_mean(
                        df=df,
                        key_cols=key_cols,
                        agg_col=col,
                        mean_months=feature_config.deviation_from_mean_months,
                    )
                )

            agg_df = join_multiple_to_base(
                base_df=base_df,
                df_list=feature_dfs,
                key_cols=key_cols,
            )

            if feature_config.calculate_deltas:
                agg_df = self._calculate_deltas(df=agg_df, key_cols=key_cols)

            if feature_config.calculate_percentage_change:
                agg_df = calculate_percentage_change(
                    df=agg_df,
                    key_cols=key_cols,
                    round_digits=2,
                    resolve_division_by_zero=feature_config.resolve_divide_by_zero,
                )
            final_df = final_df.join(agg_df, on=key_cols, how="left")

        return final_df

    @staticmethod
    def deviation_from_mean(df: DataFrame, key_cols: list[str], agg_col: str, mean_months: list[int]) -> DataFrame:
        """
        Calculate the deviation from mean for the specified columns.

        For each entry ``m`` of ``mean_months`` one column named
        ``DEVIATION_FROM_MEAN_<AGG_COL>_P<m>`` (upper-cased) is produced. Its value is the
        per-key sum of ``agg_col`` in the latest month of the lagged frame minus the mean of
        the per-key monthly sums over all strictly earlier months of that frame.

        Parameters
        ----------
        df : DataFrame
            Input DataFrame. Must contain a ``month`` column.
        key_cols : list[str]
            List of key columns.
        agg_col : str
            Column to aggregate.
        mean_months : list[int]
            List of months to calculate the mean deviation.

        Returns
        -------
        DataFrame
            DataFrame with the key columns and one deviation column per entry of ``mean_months``.

        Raises
        ------
        ValueError
            If ``mean_months`` is empty.
        """
        if not mean_months:
            raise ValueError("mean_months must be a non-empty list of integers.")

        current_col = f"CURRENT_{agg_col}".upper()
        mean_col = f"MEAN_{agg_col}".upper()
        result = df.select(*key_cols).distinct()

        for month in mean_months:
            deviation_col = f"DEVIATION_FROM_MEAN_{agg_col}_P{month}".upper()
            # One extra period is requested so the latest month can act as the "current"
            # value while the remaining ones feed the mean.
            # NOTE(review): presumably create_period_lag_df keeps the most recent
            # ``lag_months`` periods - confirm against its implementation.
            lagged_df = create_period_lag_df(df=df, lag_months=month + 1, time_col="month")
            # collect() triggers a Spark action for every entry of mean_months.
            latest_month = lagged_df.agg(F.max("month")).collect()[0][0]

            current_totals = (
                lagged_df.filter(F.col("month") == latest_month)
                .groupBy(*key_cols)
                .agg(F.sum(F.col(agg_col)).alias(current_col))
            )

            # Sum per key and month first, then average those monthly sums per key.
            monthly_totals = (
                lagged_df.filter(F.col("month") < latest_month)
                .groupBy(*key_cols, "month")
                .agg(F.sum(F.col(agg_col)).alias(mean_col))
            )
            prior_mean = monthly_totals.groupBy(*key_cols).agg(F.mean(F.col(mean_col)).alias(mean_col))

            deviation_df = (
                current_totals.join(prior_mean, on=key_cols, how="left")
                .withColumn(deviation_col, F.col(current_col) - F.col(mean_col))
                .select(*key_cols, deviation_col)
            )
            result = result.join(deviation_df, on=key_cols, how="left")

        return result

    @staticmethod
    def _calculate_deltas(df: DataFrame, key_cols: list[str]) -> DataFrame:
        """
        Calculate the deltas between specified columns.

        Every non-key column is passed to ``get_pairs_from_columns``. For each sorted pair
        ``(a, b)`` it returns, a column ``<column_name>_DELTA_P<a>_P<b>`` is added holding
        ``<column_name>_<prefix><a>`` minus ``<column_name>_<prefix><b>``.

        Parameters
        ----------
        df : DataFrame
            Input DataFrame.
        key_cols : list[str]
            List of key columns.

        Returns
        -------
        DataFrame
            DataFrame with delta columns.
        """
        value_columns = [name for name in df.columns if name not in key_cols]
        for pair in get_pairs_from_columns(value_columns):
            for first, second in (
                (sorted_pair[0], sorted_pair[1]) for sorted_pair in pair.sorted_pairs
            ):
                change_from_column = f"{pair.column_name}_{pair.prefix}{first}"
                change_to_column = f"{pair.column_name}_{pair.prefix}{second}"
                # NOTE(review): the delta name hard-codes "P" while the source columns use
                # pair.prefix - confirm the prefix is always "P".
                column_name_delta = f"{pair.column_name}_DELTA_P{first}_P{second}"

                df = df.withColumn(
                    column_name_delta,
                    F.col(change_from_column) - F.col(change_to_column),
                )

        return df

calculate(df, dataset_config, feature_config, calculation_date)

Calculate period-based features.

Parameters:

Name Type Description Default
df DataFrame

Input DataFrame.

required
dataset_config DatasetConfig

Configuration for the dataset.

required
feature_config PeriodMonthFeatureConfig

Configuration for the feature calculation.

required
calculation_date datetime

Date for which the calculation is performed.

required

Returns:

Type Description
DataFrame

DataFrame with the calculated features.

Source code in amee_utils/feature_generator/feature_set/period_month.py
def calculate(
    self,
    df: DataFrame,
    dataset_config: DatasetConfig,
    feature_config: PeriodMonthFeatureConfig,
    calculation_date: datetime,
) -> DataFrame:
    """
    Calculate period-based features.

    For every column in ``feature_config.calculation_columns`` the lagged sum, mean and
    standard-deviation aggregations are computed and, when
    ``feature_config.deviation_from_mean_months`` is non-empty, the deviation-from-mean
    columns as well. Deltas and percentage changes are added when the corresponding flags
    are set. The per-column results are left-joined onto the distinct key combinations.

    Parameters
    ----------
    df : DataFrame
        Input DataFrame.
    dataset_config : DatasetConfig
        Configuration for the dataset. ``date_col`` and ``key_cols`` are read from it.
    feature_config : PeriodMonthFeatureConfig
        Configuration for the feature calculation.
    calculation_date : datetime
        Date for which the calculation is performed. Not read by this implementation.

    Returns
    -------
    DataFrame
        DataFrame with the calculated features.
    """
    key_cols = dataset_config.key_cols
    df = create_month_date_col(df=df, date_col=dataset_config.date_col)

    # The distinct key combinations do not depend on the calculation column, so the
    # frame is built once and reused as the join base for every column.
    base_df = df.select(key_cols).distinct()
    final_df = base_df

    lagged_aggregation = LaggedAggregation(
        periods_list=feature_config.lag_months, time_col="month", lag_type="period"
    )
    for col in feature_config.calculation_columns:
        strategies = [SumAggregation(), MeanAggregation(), StddevAggregation()]
        feature_dfs = [
            lagged_aggregation.apply(
                df=df,
                key_cols=key_cols,
                agg_col=col,
                strategies=strategies,
            )
        ]

        # deviation_from_mean rejects an empty month list, and an empty list is the
        # config default, so the deviation features are skipped instead of failing
        # the whole calculation.
        if feature_config.deviation_from_mean_months:
            feature_dfs.append(
                self.deviation_from_mean(
                    df=df,
                    key_cols=key_cols,
                    agg_col=col,
                    mean_months=feature_config.deviation_from_mean_months,
                )
            )

        agg_df = join_multiple_to_base(
            base_df=base_df,
            df_list=feature_dfs,
            key_cols=key_cols,
        )

        if feature_config.calculate_deltas:
            agg_df = self._calculate_deltas(df=agg_df, key_cols=key_cols)

        if feature_config.calculate_percentage_change:
            agg_df = calculate_percentage_change(
                df=agg_df,
                key_cols=key_cols,
                round_digits=2,
                resolve_division_by_zero=feature_config.resolve_divide_by_zero,
            )
        final_df = final_df.join(agg_df, on=key_cols, how="left")

    return final_df

deviation_from_mean(df, key_cols, agg_col, mean_months) staticmethod

Calculate the deviation from mean for the specified columns.

Parameters:

Name Type Description Default
df DataFrame

Input DataFrame.

required
key_cols list[str]

List of key columns.

required
agg_col str

Column to aggregate.

required
mean_months list[int]

List of months to calculate the mean deviation.

required

Returns:

Type Description
DataFrame

DataFrame with the deviation from mean columns.

Source code in amee_utils/feature_generator/feature_set/period_month.py
@staticmethod
def deviation_from_mean(df: DataFrame, key_cols: list[str], agg_col: str, mean_months: list[int]) -> DataFrame:
    """
    Compute, per key, how far the latest month deviates from the mean of earlier months.

    For each entry ``m`` of ``mean_months`` one column named
    ``DEVIATION_FROM_MEAN_<AGG_COL>_P<m>`` (upper-cased) is produced. Its value is the
    per-key sum of ``agg_col`` in the latest month of the lagged frame minus the mean of
    the per-key monthly sums over all strictly earlier months of that frame.

    Parameters
    ----------
    df : DataFrame
        Input DataFrame. Must contain a ``month`` column.
    key_cols : list[str]
        List of key columns.
    agg_col : str
        Column to aggregate.
    mean_months : list[int]
        List of months to calculate the mean deviation.

    Returns
    -------
    DataFrame
        DataFrame with the key columns and one deviation column per entry of ``mean_months``.

    Raises
    ------
    ValueError
        If ``mean_months`` is empty.
    """
    if not mean_months:
        raise ValueError("mean_months must be a non-empty list of integers.")

    current_name = f"CURRENT_{agg_col}".upper()
    mean_name = f"MEAN_{agg_col}".upper()
    result = df.select(*key_cols).distinct()

    for n_months in mean_months:
        deviation_name = f"DEVIATION_FROM_MEAN_{agg_col}_P{n_months}".upper()
        # One extra period is requested so the latest month can act as the "current"
        # value while the remaining ones feed the mean.
        window_df = create_period_lag_df(df=df, lag_months=n_months + 1, time_col="month")
        latest_month = window_df.agg(F.max("month")).collect()[0][0]

        latest_rows = window_df.filter(F.col("month") == latest_month)
        current_totals = latest_rows.groupBy(*key_cols).agg(F.sum(F.col(agg_col)).alias(current_name))

        # Sum per key and month first, then average those monthly sums per key.
        earlier_rows = window_df.filter(F.col("month") < latest_month)
        per_month_totals = earlier_rows.groupBy(*key_cols, "month").agg(F.sum(F.col(agg_col)).alias(mean_name))
        mean_of_totals = per_month_totals.groupBy(*key_cols).agg(F.mean(F.col(mean_name)).alias(mean_name))

        joined = current_totals.join(mean_of_totals, on=key_cols, how="left")
        with_deviation = joined.withColumn(deviation_name, F.col(current_name) - F.col(mean_name))
        deviation_df = with_deviation.select(*key_cols, deviation_name)

        result = result.join(deviation_df, on=key_cols, how="left")

    return result