Skip to content

Lagged Aggregator

Module that consolidates lagged-aggregation feature generation.

Author: Daniel Wertheimer

LaggedAggregation

Class for performing lagged aggregation on a DataFrame.

Source code in amee_utils/feature_generator/lagged_aggregator.py
class LaggedAggregation:
    """
    Class for performing lagged aggregation on a DataFrame.

    For each aggregation strategy and each lag period, the configured lag function is applied to
    the input DataFrame, the strategy aggregates the lagged rows per key, and the resulting column
    is left-joined onto the distinct key combinations under a generated feature name.
    """

    def __init__(
        self,
        periods_list: Optional[List[int]] = None,
        time_col: Optional[str] = None,
        lag_type: Optional[str] = None,
        include_col_name: Optional[bool] = True,
    ) -> None:
        """
        Initialise LaggedAggregation with the specified parameters.

        Parameters
        ----------
        periods_list : List[int]
            List of time periods to create lagged features for (months or weeks depending on lag_type).
            For backward compatibility, this was previously called months_list.
        time_col : str
            Column name representing the time period in the DataFrame.
            For backward compatibility, this was previously called month_col.
        lag_type : str
            Type of lag to apply; must be a key of ``LAG_TYPE_MAPPING``
            ('single_month', 'period', 'single_week', or 'period_week').
        include_col_name : bool, optional
            Stored as ``self.include_col_name`` (default is True).
            NOTE(review): ``apply`` decides feature naming from ``strategy.include_col_name()``
            and does not read this attribute -- confirm whether that is intended.

        Raises
        ------
        ValueError
            If ``periods_list``, ``time_col`` or ``lag_type`` is None.
        KeyError
            If the provided lag_type is not a key of ``LAG_TYPE_MAPPING``.

        Returns
        -------
        None
        """
        # All three parameters default to None in the signature but are required; the explicit
        # checks also narrow the Optional types for mypy.
        if periods_list is None:
            raise ValueError("periods_list (or months_list for backward compatibility) is required")
        if time_col is None:
            raise ValueError("time_col (or month_col for backward compatibility) is required")
        if lag_type is None:
            raise ValueError("lag_type is required")

        # Guard only the lag_type lookup, so a malformed mapping entry is not misreported as an
        # invalid lag type.
        try:
            func_pref_mapping = LAG_TYPE_MAPPING[lag_type]
        except KeyError as e:
            raise KeyError(f"Invalid lag type: {lag_type}. Must be one of {list(LAG_TYPE_MAPPING.keys())}.") from e

        self.lag_func = func_pref_mapping["function"]
        self.prefix = func_pref_mapping["prefix"]
        self.periods_list = periods_list
        self.time_col = time_col
        self.include_col_name = include_col_name

    def apply(
        self,
        df: DataFrame,
        agg_col: str,
        key_cols: List[str],
        strategies: List[AggregationStrategy],
    ) -> DataFrame:
        """
        Apply lagged aggregation to the DataFrame.

        Parameters
        ----------
        df : DataFrame
            Input DataFrame to apply the lagged aggregation to.
        agg_col : str
            Column name to aggregate.
        key_cols : List[str]
            List of key columns to group by.
        strategies : List[AggregationStrategy]
            List of aggregation strategies to apply.

        Returns
        -------
        DataFrame
            The distinct ``key_cols`` combinations of ``df``, left-joined with one feature column
            per (strategy, period) pair. Columns are named ``{FUNC}_{AGG_COL}_{prefix}{period}``
            when ``strategy.include_col_name()`` is truthy, otherwise ``{FUNC}_{prefix}{period}``.
            Here ``FUNC`` is the strategy class name with "Aggregation" removed, upper-cased.
        """
        result_df = df.select(key_cols).distinct()

        # The lagged frame depends only on the period, not on the strategy, so build it at most
        # once per period and reuse it across strategies.
        lagged_by_period: dict = {}

        for strategy in strategies:
            func_name = strategy.__class__.__name__.replace("Aggregation", "").upper()
            include_col_name = strategy.include_col_name()
            for lag_periods in self.periods_list:
                alias_ = (
                    f"{func_name}_{agg_col.upper()}_{self.prefix}{lag_periods}"
                    if include_col_name
                    else f"{func_name}_{self.prefix}{lag_periods}"
                )
                if lag_periods not in lagged_by_period:
                    lagged_by_period[lag_periods] = self.lag_func(df, lag_periods, time_col=self.time_col)  # type: ignore
                lagged_df = strategy.aggregate(lagged_by_period[lag_periods], agg_col, key_cols)
                # NOTE(review): assumes each strategy appends its aggregate as the last column.
                old_name = lagged_df.columns[-1]
                lagged_df = lagged_df.withColumnRenamed(old_name, alias_)
                result_df = result_df.join(lagged_df, on=key_cols, how="left")

        return result_df

apply(df, agg_col, key_cols, strategies)

Apply lagged aggregation to the DataFrame.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `df` | `DataFrame` | Input DataFrame to apply the lagged aggregation to. | *required* |
| `agg_col` | `str` | Column name to aggregate. | *required* |
| `key_cols` | `List[str]` | List of key columns to group by. | *required* |
| `strategies` | `List[AggregationStrategy]` | List of aggregation strategies to apply. | *required* |

Returns:

| Type | Description |
|------|-------------|
| `DataFrame` | DataFrame with lagged aggregation features added. |

Source code in amee_utils/feature_generator/lagged_aggregator.py
def apply(
    self,
    df: DataFrame,
    agg_col: str,
    key_cols: List[str],
    strategies: List[AggregationStrategy],
) -> DataFrame:
    """
    Apply lagged aggregation to the DataFrame.

    Parameters
    ----------
    df : DataFrame
        Input DataFrame to apply the lagged aggregation to.
    agg_col : str
        Column name to aggregate.
    key_cols : List[str]
        List of key columns to group by.
    strategies : List[AggregationStrategy]
        List of aggregation strategies to apply.

    Returns
    -------
    DataFrame
        The distinct ``key_cols`` combinations of ``df``, left-joined with one feature column
        per (strategy, period) pair. Columns are named ``{FUNC}_{AGG_COL}_{prefix}{period}``
        when ``strategy.include_col_name()`` is truthy, otherwise ``{FUNC}_{prefix}{period}``.
        Here ``FUNC`` is the strategy class name with "Aggregation" removed, upper-cased.
    """
    result_df = df.select(key_cols).distinct()

    # The lagged frame depends only on the period, not on the strategy, so build it at most
    # once per period and reuse it across strategies.
    lagged_by_period: dict = {}

    for strategy in strategies:
        func_name = strategy.__class__.__name__.replace("Aggregation", "").upper()
        include_col_name = strategy.include_col_name()
        for lag_periods in self.periods_list:
            alias_ = (
                f"{func_name}_{agg_col.upper()}_{self.prefix}{lag_periods}"
                if include_col_name
                else f"{func_name}_{self.prefix}{lag_periods}"
            )
            if lag_periods not in lagged_by_period:
                lagged_by_period[lag_periods] = self.lag_func(df, lag_periods, time_col=self.time_col)  # type: ignore
            lagged_df = strategy.aggregate(lagged_by_period[lag_periods], agg_col, key_cols)
            # NOTE(review): assumes each strategy appends its aggregate as the last column.
            old_name = lagged_df.columns[-1]
            lagged_df = lagged_df.withColumnRenamed(old_name, alias_)
            result_df = result_df.join(lagged_df, on=key_cols, how="left")

    return result_df