Skip to content

Utils

Utilities for the feature_set module.

calculate_percentage_change(df, key_cols, round_digits=5, resolve_division_by_zero=False)

Calculate the percentage change for columns in the DataFrame.

Columns must be in the format: 'COLUMN_NAME_PREFIXMONTH', where MONTH is an integer.

Parameters:

Name Type Description Default
df DataFrame

The DataFrame to calculate the percentage change for.

required
key_cols list[str]

A list of key columns to identify unique records.

required
round_digits int

The number of decimal places to round the percentage change to (default is 5).

5
resolve_division_by_zero bool

Whether to resolve division-by-zero cases by filling NaN values with 1.

False

Returns:

Name Type Description
df DataFrame

The DataFrame with the calculated percentage change added as new columns.

Source code in amee_utils/feature_generator/feature_set/utils.py
def calculate_percentage_change(
    df: DataFrame,
    key_cols: list[str],
    round_digits: int = 5,
    resolve_division_by_zero: Optional[bool] = False,
) -> DataFrame:
    """
    Add a percentage-change column for every column pair found among the non-key columns.

    Value columns must be named 'COLUMN_NAME_PREFIXMONTH', where MONTH is an integer.
    Each new column is named 'COLUMN_NAME_PCT_CHANGE_PREFIXFROM_PREFIXTO'.

    Parameters
    ----------
    df : DataFrame
        The DataFrame whose value columns are compared.
    key_cols : list[str]
        Columns identifying unique records; they are excluded from the calculation.
    round_digits : int
        Number of decimal places the percentage change is rounded to (default is 5).
    resolve_division_by_zero : bool, optional
        When truthy, each new column is post-processed by `resolve_division_by_zero_func`
        (default is False).

    Returns
    -------
    df : DataFrame
        The input DataFrame with one percentage-change column added per column pair.
    """
    value_columns = [name for name in df.columns if name not in key_cols]

    for group in get_pairs_from_columns(value_columns):
        for month_pair in group.sorted_pairs:
            from_suffix = f"{group.prefix}{month_pair[0]}"
            to_suffix = f"{group.prefix}{month_pair[1]}"

            change_from_column = f"{group.column_name}_{from_suffix}"
            change_to_column = f"{group.column_name}_{to_suffix}"
            column_name_pct_change = f"{group.column_name}_PCT_CHANGE_{from_suffix}_{to_suffix}"

            raw_change = percentage_change(change_from_column, change_to_column)

            # A move from 0 to 0 is reported as "no change" (0) instead of the
            # result of dividing by zero.
            both_zero = (F.col(change_from_column) == 0) & (F.col(change_to_column) == 0)
            guarded_change = F.when(both_zero, 0).otherwise(raw_change)

            rounded_change = F.round(guarded_change, round_digits).cast("double")
            df = df.withColumn(column_name_pct_change, rounded_change)

            if resolve_division_by_zero:
                df = resolve_division_by_zero_func(
                    df,
                    column_name_pct_change,
                    change_from_column,
                    change_to_column,
                )

    return df

join_multiple_to_base(base_df, df_list, key_cols, join_type='left')

Join multiple DataFrames to a base DataFrame.

Parameters:

Name Type Description Default
base_df DataFrame

The base DataFrame to join the other DataFrames to.

required
df_list list of DataFrame

A list of DataFrames to join to the base DataFrame.

required
key_cols list of str

A list of key columns to join the DataFrames on.

required
join_type str

The type of join to perform (default is "left").

'left'

Returns:

Type Description
DataFrame

The base DataFrame with the other DataFrames joined to it.

Source code in amee_utils/feature_generator/feature_set/utils.py
def join_multiple_to_base(
    base_df: DataFrame,
    df_list: list[DataFrame],
    key_cols: list[str],
    join_type: str = "left",
) -> DataFrame:
    """
    Join every DataFrame in a list onto a base DataFrame, one after another.

    Parameters
    ----------
    base_df : DataFrame
        The DataFrame the joins start from.
    df_list : list of DataFrame
        The DataFrames to join, applied in list order.
    key_cols : list of str
        The key columns every join is performed on.
    join_type : str, optional
        The join type used for every join (default is "left").

    Returns
    -------
    DataFrame
        The result of joining all DataFrames onto the base; the base itself
        when `df_list` is empty.
    """
    joined = base_df
    for other in df_list:
        # Each join builds on the result of the previous one.
        joined = joined.join(other, on=key_cols, how=join_type)
    return joined

percentage_change(change_from_column, change_to_column)

Calculate the percentage change between two columns in a PySpark DataFrame.

Parameters:

Name Type Description Default
change_from_column str

The name of the column to calculate the percentage change from.

required
change_to_column str

The name of the column to calculate the percentage change to.

required

Returns:

Type Description
Column

A PySpark Column object representing the percentage change between the two columns.

Source code in amee_utils/feature_generator/feature_set/utils.py
def percentage_change(change_from_column: str, change_to_column: str) -> Column:
    """
    Calculate the percentage change between two columns in a PySpark DataFrame.

    The change is computed as ``(to - from) / from`` after casting both columns
    to double. When the "from" value is zero (or null) the result is null, so the
    expression never performs a division by zero.

    Parameters
    ----------
    change_from_column : str
        The name of the column to calculate the percentage change from.
    change_to_column : str
        The name of the column to calculate the percentage change to.

    Returns
    -------
    Column
        A PySpark Column object representing the percentage change between the two columns.
    """
    from_col = F.col(change_from_column).cast("double")
    to_col = F.col(change_to_column).cast("double")

    # Guard the division explicitly: with Spark's ANSI mode enabled, dividing by
    # zero raises an error instead of returning null. A `when` without
    # `otherwise` yields null for a zero or null denominator, which matches what
    # the unguarded division returns in non-ANSI mode.
    return F.when(from_col != 0, (to_col - from_col) / from_col)

resolve_division_by_zero_func(df, column_name_pct_change, change_from_column, change_to_column)

Resolve division by zero scenarios for a specific percentage change column in a DataFrame.

Notes

This function addresses the cases where division by zero would occur during the calculation of percentage changes between two columns. It adjusts the specified percentage change column by:

  • Setting it to None (null in Spark) if both change_from_column and change_to_column are null, indicating no data available for percentage change calculation.
  • Setting it to -1 if change_to_column is null but change_from_column is not null, indicating a scenario that could be interpreted as a 100% decrease or a special case.
  • Leaving the value as is or filling NaN values with 1 for all other cases.
  • Filling NaN values with 1 also covers the case where change_to_column is not null but change_from_column is null, which can be interpreted as a 100% increase.

Parameters:

Name Type Description Default
df DataFrame

The DataFrame containing the columns to calculate percentage change between and to apply the resolution.

required
column_name_pct_change str

The name of the column where the calculated percentage change is stored. This column will be modified based on the resolution rules.

required
change_from_column str

The name of the column representing the initial value in the percentage change calculation.

required
change_to_column str

The name of the column representing the subsequent value in the percentage change calculation.

required

Returns:

Name Type Description
df DataFrame

The modified DataFrame with division by zero scenarios resolved in the specified percentage change column.

Source code in amee_utils/feature_generator/feature_set/utils.py
def resolve_division_by_zero_func(
    df: DataFrame,
    column_name_pct_change: str,
    change_from_column: str,
    change_to_column: str,
) -> DataFrame:
    """
    Replace unresolved values in one percentage-change column with fixed fallbacks.

    Notes
    -----
    Missing values in the percentage-change column are first filled with `1`.
    The column is then overwritten row by row, using the first rule that matches:

    - `None` (null in Spark) when both `change_from_column` and `change_to_column` are null,
      i.e. there is no data to compute a change from.
    - `-1` when `change_to_column` is null but `change_from_column` is not,
      which can be read as a 100% decrease.
    - Otherwise the (already filled) value is kept. This includes rows where
      `change_from_column` is null but `change_to_column` is not, which therefore
      end up as `1` and can be read as a 100% increase.

    Parameters
    ----------
    df : DataFrame
        The DataFrame holding the percentage-change column and the two columns it was derived from.
    column_name_pct_change : str
        The name of the percentage-change column to adjust.
    change_from_column : str
        The name of the column holding the initial value of the calculation.
    change_to_column : str
        The name of the column holding the subsequent value of the calculation.

    Returns
    -------
    df : DataFrame
        The DataFrame with the percentage-change column adjusted by the rules above.
    """
    from_missing = F.col(change_from_column).isNull()
    to_missing = F.col(change_to_column).isNull()

    # Rules are checked in order; the first matching one decides the value.
    resolved = (
        F.when(from_missing & to_missing, None)
        .when(to_missing & F.col(change_from_column).isNotNull(), -1)
        .otherwise(F.col(column_name_pct_change))
    )

    # The fill must happen before the overwrite so that the `otherwise` branch
    # reads the filled values.
    filled = df.fillna(1, subset=[column_name_pct_change])
    return filled.withColumn(column_name_pct_change, resolved)