Writer Classes

Provides utility classes for writing data.

BaseWriter

Bases: ABC, WriterMixin

Abstract base class for data writers.

Source code in amee_utils/writer.py
class BaseWriter(ABC, WriterMixin):
    """Abstract base class for data writers."""

    def __init__(self, spark: SparkSession, save_as_table: bool) -> None:
        """Initialise the BaseWriter with a SparkSession object and save_as_table flag.

        Parameters
        ----------
        spark : SparkSession
            The SparkSession object to use for executing queries.
        save_as_table : bool
            True for UCWriter, False for DeltaWriter.

        Returns
        -------
        None
        """
        self.spark = spark
        self.save_as_table = save_as_table

    @abstractmethod
    def write(self, df: DataFrame, *args, **kwargs) -> None:
        """Abstract method to be implemented by subclasses for writing data.

        Parameters
        ----------
        df : DataFrame
            The DataFrame to be saved.

        Returns
        -------
        None
        """
        raise NotImplementedError("Subclass must implement abstract method")

    @abstractmethod
    def overwrite(self, df: DataFrame, *args, **kwargs) -> None:
        """Abstract method to be implemented by subclasses for overwriting data.

        Parameters
        ----------
        df : DataFrame
            The DataFrame to be saved.

        Returns
        -------
        None
        """
        raise NotImplementedError("Subclass must implement abstract method")

    @abstractmethod
    def upsert(self, df: DataFrame, *args, **kwargs) -> None:
        """Abstract method to be implemented by subclasses for upserting data.

        Parameters
        ----------
        df : DataFrame
            The DataFrame to be saved.

        Returns
        -------
        None
        """
        raise NotImplementedError("Subclass must implement abstract method")

    @abstractmethod
    def append(self, df: DataFrame, *args, **kwargs) -> None:
        """Abstract method to be implemented by subclasses for appending data.

        Parameters
        ----------
        df : DataFrame
            The DataFrame to be saved.

        Returns
        -------
        None
        """
        raise NotImplementedError("Subclass must implement abstract method")

    @abstractmethod
    def _create_if_not_exists(self, *args, **kwargs) -> None:
        """Abstract method to be implemented by subclasses for creating a table or schema if it does not exist.

        Parameters
        ----------
        *args : Any
            Positional arguments to be passed to the method.
        **kwargs : Any
            Keyword arguments to be passed to the method.

        Returns
        -------
        None
        """
        raise NotImplementedError("Subclass must implement abstract method")

    @abstractmethod
    def _get_write_target(self, table_name: str, path: Optional[Path] = None) -> str:
        """Abstract method to be implemented by subclasses for getting the write target.

        Parameters
        ----------
        table_name : str
            The name of the table inside the container where the data will be written.
        path : Path, optional
            The path where the Delta table will be created or updated.

        Returns
        -------
        str
        """
        raise NotImplementedError("Subclass must implement abstract method")
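
To make the contract concrete, here is a minimal sketch (not part of amee_utils) of what a hypothetical subclass could look like. The ParquetWriter name and its behaviour are invented for illustration; the real subclasses are DeltaWriter and UCWriter documented below.

```python
from pathlib import Path
from typing import Optional

from pyspark.sql import DataFrame, SparkSession

from amee_utils.writer import BaseWriter


class ParquetWriter(BaseWriter):
    """Hypothetical writer that saves DataFrames as plain Parquet files."""

    def __init__(self, spark: SparkSession) -> None:
        # Plain files rather than catalog tables, so save_as_table is False.
        super().__init__(spark, save_as_table=False)

    def _get_write_target(self, table_name: str, path: Optional[Path] = None) -> str:
        return str(Path(path) / table_name)

    def _create_if_not_exists(self, *args, **kwargs) -> None:
        # Nothing to register for plain Parquet output.
        pass

    def write(self, df: DataFrame, path: Path, table_name: str) -> None:
        df.write.format("parquet").mode("errorifexists").save(self._get_write_target(table_name, path))

    def overwrite(self, df: DataFrame, path: Path, table_name: str) -> None:
        df.write.format("parquet").mode("overwrite").save(self._get_write_target(table_name, path))

    def append(self, df: DataFrame, path: Path, table_name: str) -> None:
        df.write.format("parquet").mode("append").save(self._get_write_target(table_name, path))

    def upsert(self, df: DataFrame, path: Path, table_name: str) -> None:
        # Plain Parquet has no MERGE support; DeltaWriter covers this case.
        raise NotImplementedError("Use DeltaWriter for upserts.")
```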

append(df, *args, **kwargs) abstractmethod

Abstract method to be implemented by subclasses for appending data.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| df | DataFrame | The DataFrame to be saved. | required |

Returns:

| Type | Description |
|------|-------------|
| None | |

Source code in amee_utils/writer.py
@abstractmethod
def append(self, df: DataFrame, *args, **kwargs) -> None:
    """Abstract method to be implemented by subclasses for appending data.

    Parameters
    ----------
    df : DataFrame
        The DataFrame to be saved.

    Returns
    -------
    None
    """
    raise NotImplementedError("Subclass must implement abstract method")

overwrite(df, *args, **kwargs) abstractmethod

Abstract method to be implemented by subclasses for overwriting data.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| df | DataFrame | The DataFrame to be saved. | required |

Returns:

| Type | Description |
|------|-------------|
| None | |

Source code in amee_utils/writer.py
@abstractmethod
def overwrite(self, df: DataFrame, *args, **kwargs) -> None:
    """Abstract method to be implemented by subclasses for overwriting data.

    Parameters
    ----------
    df : DataFrame
        The DataFrame to be saved.

    Returns
    -------
    None
    """
    raise NotImplementedError("Subclass must implement abstract method")

upsert(df, *args, **kwargs) abstractmethod

Abstract method to be implemented by subclasses for upserting data.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| df | DataFrame | The DataFrame to be saved. | required |

Returns:

| Type | Description |
|------|-------------|
| None | |

Source code in amee_utils/writer.py
@abstractmethod
def upsert(self, df: DataFrame, *args, **kwargs) -> None:
    """Abstract method to be implemented by subclasses for upserting data.

    Parameters
    ----------
    df : DataFrame
        The DataFrame to be saved.

    Returns
    -------
    None
    """
    raise NotImplementedError("Subclass must implement abstract method")

write(df, *args, **kwargs) abstractmethod

Abstract method to be implemented by subclasses for writing data.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| df | DataFrame | The DataFrame to be saved. | required |

Returns:

| Type | Description |
|------|-------------|
| None | |

Source code in amee_utils/writer.py
@abstractmethod
def write(self, df: DataFrame, *args, **kwargs) -> None:
    """Abstract method to be implemented by subclasses for writing data.

    Parameters
    ----------
    df : DataFrame
        The DataFrame to be saved.

    Returns
    -------
    None
    """
    raise NotImplementedError("Subclass must implement abstract method")

DeltaWriter

Bases: BaseWriter

Class for Delta Table writer.

Example
# Initialise Spark session and DeltaWriter
>>> spark = SparkSession.builder.getOrCreate()
>>> container_name = "schema_container_name"
>>> table_name = "my_table"
>>> delta_writer = DeltaWriter(
...     spark=spark,
...     container_name=container_name,
... )

# Create a sample DataFrame for testing
>>> data = spark.createDataFrame(
...     [
...         (1, "John", "M", "2023-08-16"),
...         (2, "Alice", "F", "2023-08-16")
...     ],
...     ["id", "name", "gender", "date"],
... )
>>> path = "path/to/save/data"

# Use write method to save the DataFrame
>>> delta_writer.write(
...     df=data,
...     path=path,
...     table_name=table_name,
... )

# Use overwrite method to overwrite existing data at the same location
>>> updated_data = spark.createDataFrame(
...     [
...         (1, "John", "M", "2023-08-17"),
...         (2, "Alice", "F", "2023-08-17")
...     ],
...     ["id", "name", "gender", "date"],
... )
>>> delta_writer.overwrite(
...     df=updated_data,
...     path=path,
...     table_name=table_name,
... )

# Use write method to save the DataFrame with a unique index
>>> delta_writer.write(
...     df=data,
...     path=path,
...     table_name=table_name,
...     unique_index=True,
...     id_column_name="unique_id",
...     unique_identifiers=["date", "id", "name"],
... )

# Use overwrite method to overwrite existing data at the same location with a unique index
>>> updated_data_with_unique = spark.createDataFrame(
...     [
...         (2, "Eve", "F", "2023-08-18"),
...         (3, "Bob", "M", "2023-08-18")
...     ],
...     ["id", "name", "gender", "date"],
... )
>>> delta_writer.overwrite(
...     df=updated_data_with_unique,
...     path=path,
...     table_name=table_name,
...     unique_index=True,
...     id_column_name="unique_id",
...     unique_identifiers=["date", "id", "name"],
... )


# Use upsert method to update or insert new data into the existing Delta table
>>> updated_data = spark.createDataFrame(
...     [
...         (4, "Sam", "M", "2023-08-19")
...     ],
...     ["id", "name", "gender", "date"],
... )
>>> delta_writer.upsert(
...     df=updated_data,
...     path=path,
...     table_name=table_name,
...     id_column_name="unique_id",
...     unique_identifiers=["date", "id", "name"],
... )

# Append new data to the existing table
>>> append_df = spark.createDataFrame(
...     [
...         (5, "Frank", "M", "2023-08-20"),
...         (6, "Grace", "F", "2023-08-20")
...     ],
...     ["id", "name", "gender", "date"],
... )
>>> delta_writer.append(
...     df=append_df,
...     path=path,
...     table_name=table_name,
... )
Source code in amee_utils/writer.py
class DeltaWriter(BaseWriter):
    """Class for Delta Table writer.

    Example
    -------
    ```python
    # Initialise Spark session and DeltaWriter
    >>> spark = SparkSession.builder.getOrCreate()
    >>> container_name = "schema_container_name"
    >>> table_name = "my_table"
    >>> delta_writer = DeltaWriter(
    ...     spark=spark,
    ...     container_name=container_name,
    ... )

    # Create a sample DataFrame for testing
    >>> data = spark.createDataFrame(
    ...     [
    ...         (1, "John", "M", "2023-08-16"),
    ...         (2, "Alice", "F", "2023-08-16")
    ...     ],
    ...     ["id", "name", "gender", "date"],
    ... )
    >>> path = "path/to/save/data"

    # Use write method to save the DataFrame
    >>> delta_writer.write(
    ...     df=data,
    ...     path=path,
    ...     table_name=table_name,
    ... )

    # Use overwrite method to overwrite existing data at the same location
    >>> updated_data = spark.createDataFrame(
    ...     [
    ...         (1, "John", "M", "2023-08-17"),
    ...         (2, "Alice", "F", "2023-08-17")
    ...     ],
    ...     ["id", "name", "gender", "date"],
    ... )
    >>> delta_writer.overwrite(
    ...     df=updated_data,
    ...     path=path,
    ...     table_name=table_name,
    ... )

    # Use write method to save the DataFrame with a unique index
    >>> delta_writer.write(
    ...     df=data,
    ...     path=path,
    ...     table_name=table_name,
    ...     unique_index=True,
    ...     id_column_name="unique_id",
    ...     unique_identifiers=["date", "id", "name"],
    ... )

    # Use overwrite method to overwrite existing data at the same location with a unique index
    >>> updated_data_with_unique = spark.createDataFrame(
    ...     [
    ...         (2, "Eve", "F", "2023-08-18"),
    ...         (3, "Bob", "M", "2023-08-18")
    ...     ],
    ...     ["id", "name", "gender", "date"],
    ... )
    >>> delta_writer.overwrite(
    ...     df=updated_data_with_unique,
    ...     path=path,
    ...     table_name=table_name,
    ...     unique_index=True,
    ...     id_column_name="unique_id",
    ...     unique_identifiers=["date", "id", "name"],
    ... )


    # Use upsert method to update or insert new data into the existing Delta table
    >>> updated_data = spark.createDataFrame(
    ...     [
    ...         (4, "Sam", "M", "2023-08-19")
    ...     ],
    ...     ["id", "name", "gender", "date"],
    ... )
    >>> delta_writer.upsert(
    ...     df=updated_data,
    ...     path=path,
    ...     table_name=table_name,
    ...     id_column_name="unique_id",
    ...     unique_identifiers=["date", "id", "name"],
    ... )

    # Append new data to the existing table
    >>> append_df = spark.createDataFrame(
    ...     [
    ...         (5, "Frank", "M", "2023-08-20"),
    ...         (6, "Grace", "F", "2023-08-20")
    ...     ],
    ...     ["id", "name", "gender", "date"],
    ... )
    >>> delta_writer.append(
    ...     df=append_df,
    ...     path=path,
    ...     table_name=table_name,
    ... )
    ```
    """

    def __init__(
        self,
        spark: SparkSession,
        container_name: str,
    ) -> None:
        """Initialise the DeltaWriter.

        Parameters
        ----------
        spark : SparkSession
            The SparkSession object to use for executing queries.
        container_name : str
            The name of the container being written to.

        Returns
        -------
        None
        """
        super().__init__(spark, save_as_table=False)
        self.container_name = container_name

    def write(
        self,
        df: DataFrame,
        path: Union[str, Path],
        table_name: str,
        unique_index: bool = False,
        id_column_name: Optional[str] = None,
        unique_identifiers: Optional[List[str]] = None,
        partition_col: Optional[str] = None,
    ) -> None:
        """Write the data to the specified container and table with mode 'errorifexists'.

        Parameters
        ----------
        df : DataFrame
            The DataFrame to be saved.
        path : str
            The location to save the DataFrame.
        table_name : str
            The name of the table inside the container where the data will be written.
        unique_index : bool, optional
            If True, a unique index column will be added to the DataFrame.
        id_column_name : str, optional
            The name of the column to store the unique index.
        unique_identifiers : List[str], optional
            A list of column names to be concatenated for generating the unique index.

        Returns
        -------
        None
        """
        path = Path(path)
        try:
            self._write_helper(
                df=df,
                table_name=table_name,
                mode=WriteMode.ERROR,
                save_as_table=self.save_as_table,
                unique_index=unique_index,
                id_column_name=id_column_name,
                unique_identifiers=unique_identifiers,
                partition_col=partition_col,
                path=path,
            )
        except AnalysisException as e:
            new_exception = AnalysisException(f"{e}: Use DeltaWriter.overwrite() instead.")
            raise new_exception from e

    def overwrite(
        self,
        df: DataFrame,
        path: Union[str, Path],
        table_name: str,
        unique_index: bool = False,
        id_column_name: Optional[str] = None,
        unique_identifiers: Optional[List[str]] = None,
        partition_col: Optional[str] = None,
    ) -> None:
        """Write the data to the specified container and table with mode 'overwrite'.

        Parameters
        ----------
        df : DataFrame
            The DataFrame containing the data to be merged or inserted with a unique index column.
        path : str
            The path where the Delta table will be created or updated.
        table_name : str
            The name of the table inside the container where the data will be written.
        unique_index : bool, optional
            If True, a unique index column will be added to the DataFrame.
        id_column_name : str
            The name of the column to store the unique index.
        unique_identifiers : List[str]
            A list of column names to be concatenated for generating the unique index.

        Returns
        -------
        None
        """
        path = Path(path)
        self._write_helper(
            df=df,
            path=path,
            table_name=table_name,
            mode=WriteMode.OVERWRITE,
            save_as_table=self.save_as_table,
            unique_index=unique_index,
            id_column_name=id_column_name,
            unique_identifiers=unique_identifiers,
            partition_col=partition_col,
        )

    def upsert(
        self,
        df: DataFrame,
        path: Union[str, Path],
        id_column_name: str,
        unique_identifiers: List[str],
        table_name: str,
    ) -> None:
        """Update or create a Delta table at the specified output path using the given DataFrame.

        This performs the equivalent of an upsert by updating exact matches and inserting unmatched rows.

        Parameters
        ----------
        df : DataFrame
            The DataFrame containing the data to be merged or inserted with a unique index column.
        path : str
            The path where the Delta table will be created or updated.
        id_column_name : str
            The name of the column to store the unique index.
        unique_identifiers : List[str]
            A list of column names to be concatenated for generating the unique index.
        table_name : str
            The name of the table inside the container where the data will be written.

        Returns
        -------
        None
        """
        path = Path(path)
        indexed_df = self._unique_indexer(
            df=df,
            id_column_name=id_column_name,
            unique_identifiers=unique_identifiers,
        )
        export_file_location = self._get_write_target(table_name, path)

        try:
            deltadf = DeltaTable.forPath(self.spark, export_file_location)

            (
                deltadf.alias("data")
                .merge(
                    indexed_df.alias("newData"),
                    f"data.{id_column_name} = newData.{id_column_name}",
                )
                .whenMatchedUpdateAll()
                .whenNotMatchedInsertAll()
                .execute()
            )

        except AnalysisException as e:
            if f"`{export_file_location}` is not a Delta table." in str(e):
                self._write_helper(
                    df=indexed_df,
                    path=path,
                    table_name=table_name,
                    mode=WriteMode.ERROR,
                    save_as_table=self.save_as_table,
                    unique_index=True,
                    id_column_name=id_column_name,
                    unique_identifiers=unique_identifiers,
                )
                logger.info("Creating new table with unique index")
            else:
                logger.error(e)

    def append(
        self,
        df: DataFrame,
        path: Union[str, Path],
        table_name: str,
        unique_index: bool = False,
        id_column_name: Optional[str] = None,
        unique_identifiers: Optional[List[str]] = None,
        partition_col: Optional[str] = None,
    ) -> None:
        """Write the data to the specified container and table with mode 'append'.

        Parameters
        ----------
        df : DataFrame
            The DataFrame containing the data to be appended to the existing data.
        path : str
            The path where the Delta table will be created or updated.
        table_name : str
            The name of the table inside the container where the data will be written.
        unique_index : bool, optional
            If True, a unique index column will be added to the DataFrame.
        id_column_name : str
            The name of the column to store the unique index.
        unique_identifiers : List[str]
            A list of column names to be concatenated for generating the unique index.

        Returns
        -------
        None
        """
        path = Path(path)
        self._write_helper(
            df=df,
            path=path,
            table_name=table_name,
            mode=WriteMode.APPEND,
            save_as_table=self.save_as_table,
            unique_index=unique_index,
            id_column_name=id_column_name,
            unique_identifiers=unique_identifiers,
            partition_col=partition_col,
        )

    def _create_if_not_exists(
        self,
        path: Path,
        table_name: str,
    ) -> None:
        """Create or update table.

        Parameters
        ----------
        path : Path
            The path where the Delta table will be created or updated.
        table_name : str
            The name of the table inside the container where the data will be written.

        Returns
        -------
        None
        """
        self.spark.sql(f"CREATE SCHEMA IF NOT EXISTS {self.container_name}")
        self.spark.sql(
            f"""
            CREATE TABLE IF NOT EXISTS {self.container_name}.{table_name}
            USING DELTA LOCATION '{str(path)}'
            """
        )

    def _get_write_target(self, table_name: str, path: Optional[Path] = None) -> str:
        if path:
            return str(path / table_name)
        else:
            logger.error("Path is required to write using DeltaWriter.")
            raise ValueError("Path is required to write using DeltaWriter.")
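
As a usage note, data written through DeltaWriter ends up at `<path>/<table_name>` and is also registered against the container schema by `_create_if_not_exists`, so it can be read back either way. The sketch below reuses the illustrative path and names from the example above; it is not additional API.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# _get_write_target appends the table name to the supplied path ...
df_by_path = spark.read.format("delta").load("path/to/save/data/my_table")

# ... and _create_if_not_exists registers that location against the container
# schema, so the same table is also reachable by name.
df_by_name = spark.table("schema_container_name.my_table")
```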

append(df, path, table_name, unique_index=False, id_column_name=None, unique_identifiers=None, partition_col=None)

Write the data to the specified container and table with mode 'append'.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| df | DataFrame | The DataFrame containing the data to be appended to the existing data. | required |
| path | str | The path where the Delta table will be created or updated. | required |
| table_name | str | The name of the table inside the container where the data will be written. | required |
| unique_index | bool | If True, a unique index column will be added to the DataFrame. | False |
| id_column_name | str | The name of the column to store the unique index. | None |
| unique_identifiers | List[str] | A list of column names to be concatenated for generating the unique index. | None |

Returns:

| Type | Description |
|------|-------------|
| None | |

Source code in amee_utils/writer.py
def append(
    self,
    df: DataFrame,
    path: Union[str, Path],
    table_name: str,
    unique_index: bool = False,
    id_column_name: Optional[str] = None,
    unique_identifiers: Optional[List[str]] = None,
    partition_col: Optional[str] = None,
) -> None:
    """Write the data to the specified container and table with mode 'append'.

    Parameters
    ----------
    df : DataFrame
        The DataFrame containing the data to be appended to the existing data.
    path : str
        The path where the Delta table will be created or updated.
    table_name : str
        The name of the table inside the container where the data will be written.
    unique_index : bool, optional
        If True, a unique index column will be added to the DataFrame.
    id_column_name : str
        The name of the column to store the unique index.
    unique_identifiers : List[str]
        A list of column names to be concatenated for generating the unique index.

    Returns
    -------
    None
    """
    path = Path(path)
    self._write_helper(
        df=df,
        path=path,
        table_name=table_name,
        mode=WriteMode.APPEND,
        save_as_table=self.save_as_table,
        unique_index=unique_index,
        id_column_name=id_column_name,
        unique_identifiers=unique_identifiers,
        partition_col=partition_col,
    )

overwrite(df, path, table_name, unique_index=False, id_column_name=None, unique_identifiers=None, partition_col=None)

Write the data to the specified container and table with mode 'overwrite'.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| df | DataFrame | The DataFrame containing the data to be merged or inserted with a unique index column. | required |
| path | str | The path where the Delta table will be created or updated. | required |
| table_name | str | The name of the table inside the container where the data will be written. | required |
| unique_index | bool | If True, a unique index column will be added to the DataFrame. | False |
| id_column_name | str | The name of the column to store the unique index. | None |
| unique_identifiers | List[str] | A list of column names to be concatenated for generating the unique index. | None |

Returns:

| Type | Description |
|------|-------------|
| None | |

Source code in amee_utils/writer.py
def overwrite(
    self,
    df: DataFrame,
    path: Union[str, Path],
    table_name: str,
    unique_index: bool = False,
    id_column_name: Optional[str] = None,
    unique_identifiers: Optional[List[str]] = None,
    partition_col: Optional[str] = None,
) -> None:
    """Write the data to the specified container and table with mode 'overwrite'.

    Parameters
    ----------
    df : DataFrame
        The DataFrame containing the data to be merged or inserted with a unique index column.
    path : str
        The path where the Delta table will be created or updated.
    table_name : str
        The name of the table inside the container where the data will be written.
    unique_index : bool, optional
        If True, a unique index column will be added to the DataFrame.
    id_column_name : str
        The name of the column to store the unique index.
    unique_identifiers : List[str]
        A list of column names to be concatenated for generating the unique index.

    Returns
    -------
    None
    """
    path = Path(path)
    self._write_helper(
        df=df,
        path=path,
        table_name=table_name,
        mode=WriteMode.OVERWRITE,
        save_as_table=self.save_as_table,
        unique_index=unique_index,
        id_column_name=id_column_name,
        unique_identifiers=unique_identifiers,
        partition_col=partition_col,
    )

upsert(df, path, id_column_name, unique_identifiers, table_name)

Update or create a Delta table at the specified output path using the given DataFrame.

This performs the equivalent of an upsert by updating exact matches and inserting unmatched rows.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| df | DataFrame | The DataFrame containing the data to be merged or inserted with a unique index column. | required |
| path | str | The path where the Delta table will be created or updated. | required |
| id_column_name | str | The name of the column to store the unique index. | required |
| unique_identifiers | List[str] | A list of column names to be concatenated for generating the unique index. | required |
| table_name | str | The name of the table inside the container where the data will be written. | required |

Returns:

| Type | Description |
|------|-------------|
| None | |

Source code in amee_utils/writer.py
def upsert(
    self,
    df: DataFrame,
    path: Union[str, Path],
    id_column_name: str,
    unique_identifiers: List[str],
    table_name: str,
) -> None:
    """Update or create a Delta table at the specified output path using the given DataFrame.

    This performs the equivalent of an upsert by updating exact matches and inserting unmatched rows.

    Parameters
    ----------
    df : DataFrame
        The DataFrame containing the data to be merged or inserted with a unique index column.
    path : str
        The path where the Delta table will be created or updated.
    id_column_name : str
        The name of the column to store the unique index.
    unique_identifiers : List[str]
        A list of column names to be concatenated for generating the unique index.
    table_name : str
        The name of the table inside the container where the data will be written.

    Returns
    -------
    None
    """
    path = Path(path)
    indexed_df = self._unique_indexer(
        df=df,
        id_column_name=id_column_name,
        unique_identifiers=unique_identifiers,
    )
    export_file_location = self._get_write_target(table_name, path)

    try:
        deltadf = DeltaTable.forPath(self.spark, export_file_location)

        (
            deltadf.alias("data")
            .merge(
                indexed_df.alias("newData"),
                f"data.{id_column_name} = newData.{id_column_name}",
            )
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute()
        )

    except AnalysisException as e:
        if f"`{export_file_location}` is not a Delta table." in str(e):
            self._write_helper(
                df=indexed_df,
                path=path,
                table_name=table_name,
                mode=WriteMode.ERROR,
                save_as_table=self.save_as_table,
                unique_index=True,
                id_column_name=id_column_name,
                unique_identifiers=unique_identifiers,
            )
            logger.info("Creating new table with unique index")
        else:
            logger.error(e)
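
For readers more comfortable with SQL, the merge above is roughly equivalent to the statement sketched below, using the illustrative path and `unique_id` column from the example; the writer itself builds the merge through the DeltaTable API rather than SQL, and `indexed_df` and `spark` are assumed to exist from the surrounding context.

```python
# indexed_df stands for the incoming DataFrame after the unique index column
# has been added; the path and column name are illustrative.
indexed_df.createOrReplaceTempView("new_data")

spark.sql(
    """
    MERGE INTO delta.`path/to/save/data/my_table` AS data
    USING new_data AS newData
    ON data.unique_id = newData.unique_id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
    """
)
```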

write(df, path, table_name, unique_index=False, id_column_name=None, unique_identifiers=None, partition_col=None)

Write the data to the specified container and table with mode 'errorifexists'.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| df | DataFrame | The DataFrame to be saved. | required |
| path | str | The location to save the DataFrame. | required |
| table_name | str | The name of the table inside the container where the data will be written. | required |
| unique_index | bool | If True, a unique index column will be added to the DataFrame. | False |
| id_column_name | str | The name of the column to store the unique index. | None |
| unique_identifiers | List[str] | A list of column names to be concatenated for generating the unique index. | None |

Returns:

| Type | Description |
|------|-------------|
| None | |

Source code in amee_utils/writer.py
def write(
    self,
    df: DataFrame,
    path: Union[str, Path],
    table_name: str,
    unique_index: bool = False,
    id_column_name: Optional[str] = None,
    unique_identifiers: Optional[List[str]] = None,
    partition_col: Optional[str] = None,
) -> None:
    """Write the data to the specified container and table with mode 'errorifexists'.

    Parameters
    ----------
    df : DataFrame
        The DataFrame to be saved.
    path : str
        The location to save the DataFrame.
    table_name : str
        The name of the table inside the container where the data will be written.
    unique_index : bool, optional
        If True, a unique index column will be added to the DataFrame.
    id_column_name : str, optional
        The name of the column to store the unique index.
    unique_identifiers : List[str], optional
        A list of column names to be concatenated for generating the unique index.

    Returns
    -------
    None
    """
    path = Path(path)
    try:
        self._write_helper(
            df=df,
            table_name=table_name,
            mode=WriteMode.ERROR,
            save_as_table=self.save_as_table,
            unique_index=unique_index,
            id_column_name=id_column_name,
            unique_identifiers=unique_identifiers,
            partition_col=partition_col,
            path=path,
        )
    except AnalysisException as e:
        new_exception = AnalysisException(f"{e}: Use DeltaWriter.overwrite() instead.")
        raise new_exception from e

UCWriter

Bases: BaseWriter

Class for writing to Unity Catalog tables with specified catalog and schema.

Supports writing, upserting, and overwriting data with optional unique indexing.

The catalog and schema are specified during initialisation, and the table name is provided when writing data. This allows for writing multiple tables to the same catalog and schema without re-initialising the writer.

Example
>>> uc_writer = UCWriter(spark=spark, catalog_name="main_catalog", schema_name="default_schema")

# Sample DataFrame
>>> data = spark.createDataFrame(
...     [("John", 28), ("Alice", 24)],
...     ["name", "age"]
... )
>>> uc_writer.write(df=data, table_name="my_table")

# Overwrite existing data
>>> overwrite_data = [("Lucy", 28), ("Mike", 38), ("Emma", 33)]
>>> overwrite_df = spark.createDataFrame(overwrite_data, ["name", "age"])
>>> uc_writer.overwrite(df=overwrite_df, table_name="my_table")

# Create table with a unique index on 'name' and 'age'
>>> unique_data = [("John", 28), ("Alice", 24), ("Bob", 35), ("Eve", 40), ("Sam", 30)]
>>> unique_df = spark.createDataFrame(unique_data, ["name", "age"])
>>> uc_writer.write(
...     df=unique_df,
...     table_name="my_table_with_unique_index",
...     unique_index=True,
...     id_column_name="unique_id",
...     unique_identifiers=["name", "age"]
... )

# Upsert new data based on the 'name' and 'age' combination
>>> upsert_data = [("John", 29), ("Alice", 24), ("Sam", 32), ("Tom", 40)]
>>> upsert_df = spark.createDataFrame(upsert_data, ["name", "age"])
>>> uc_writer.upsert(
...     df=upsert_df,
...     table_name="my_table_with_unique_index",
...     id_column_name="unique_id",
...     unique_identifiers=["name", "age"]
... )

# Append new data to the existing table
>>> append_data = [("Frank", 35), ("Grace", 28)]
>>> append_df = spark.createDataFrame(append_data, ["name", "age"])
>>> uc_writer.append(
...     df=append_df,
...     table_name="my_table",
... )
Source code in amee_utils/writer.py
class UCWriter(BaseWriter):
    """Class for writing to Unity Catalog tables with specified catalog and schema.

    Supports writing, upserting, and overwriting data with optional unique indexing.

    The catalog and schema are specified during initialisation, and the table name is provided when writing data.
    This allows for writing multiple tables to the same catalog and schema without re-initialising the writer.

    Example
    -------
    ```python
    >>> uc_writer = UCWriter(spark=spark, catalog_name="main_catalog", schema_name="default_schema")

    # Sample DataFrame
    >>> data = spark.createDataFrame(
    ...     [("John", 28), ("Alice", 24)],
    ...     ["name", "age"]
    ... )
    >>> uc_writer.write(df=data, table_name="my_table")

    # Overwrite existing data
    >>> overwrite_data = [("Lucy", 28), ("Mike", 38), ("Emma", 33)]
    >>> overwrite_df = spark.createDataFrame(overwrite_data, ["name", "age"])
    >>> uc_writer.overwrite(df=overwrite_df, table_name="my_table")

    # Create table with a unique index on 'name' and 'age'
    >>> unique_data = [("John", 28), ("Alice", 24), ("Bob", 35), ("Eve", 40), ("Sam", 30)]
    >>> unique_df = spark.createDataFrame(unique_data, ["name", "age"])
    >>> uc_writer.write(
    ...     df=unique_df,
    ...     table_name="my_table_with_unique_index",
    ...     unique_index=True,
    ...     id_column_name="unique_id",
    ...     unique_identifiers=["name", "age"]
    ... )

    # Upsert new data based on the 'name' and 'age' combination
    >>> upsert_data = [("John", 29), ("Alice", 24), ("Sam", 32), ("Tom", 40)]
    >>> upsert_df = spark.createDataFrame(upsert_data, ["name", "age"])
    >>> uc_writer.upsert(
    ...     df=upsert_df,
    ...     table_name="my_table_with_unique_index",
    ...     id_column_name="unique_id",
    ...     unique_identifiers=["name", "age"]
    ... )

    # Append new data to the existing table
    >>> append_data = [("Frank", 35), ("Grace", 28)]
    >>> append_df = spark.createDataFrame(append_data, ["name", "age"])
    >>> uc_writer.append(
    ...     df=append_df,
    ...     table_name="my_table",
    ... )
    ```
    """

    def __init__(
        self,
        spark: SparkSession,
        catalog_name: str,
        schema_name: str,
    ) -> None:
        """Initialise the UCWriter with catalog and schema.

        Parameters
        ----------
        spark : SparkSession
            The SparkSession object to use for executing queries.
        catalog_name : str
            The name of the catalog where the tables are located.
        schema_name : str
            The name of the schema inside the catalog where the tables are located.

        Returns
        -------
        None
        """
        super().__init__(spark, save_as_table=True)
        self.catalog_name = catalog_name
        self.schema_name = schema_name

    def _create_if_not_exists(self) -> None:
        """Create the schema in Unity Catalog if it does not already exist."""
        try:
            create_schema_sql = f"""
            CREATE SCHEMA IF NOT EXISTS {self.catalog_name}.{self.schema_name}
            """
            self.spark.sql(create_schema_sql)
            logger.info(f"Schema {self.catalog_name}.{self.schema_name} created or already exists.")
        except AnalysisException as e:
            raise AnalysisException(f"Error creating schema: {e}") from e

    def write(
        self,
        df: DataFrame,
        table_name: str,
        unique_index: bool = False,
        id_column_name: Optional[str] = None,
        unique_identifiers: Optional[List[str]] = None,
        partition_col: Optional[Union[str, List[str]]] = None,
    ) -> None:
        """Write the data to Unity Catalog with mode 'errorifexists'.

        Parameters
        ----------
        df : DataFrame
            The DataFrame to be saved.
        table_name : str
            The name of the table (without catalog and schema).
        unique_index : bool, optional
            If True, a unique index column will be added.
        id_column_name : str, optional
            Name of the column to store the unique index.
        unique_identifiers : List[str], optional
            Column names used to generate the unique index.
        partition_col : Optional[Union[str, List[str]]], optional
            Column(s) to partition the data on.

        Raises
        ------
        AnalysisException
            If the table already exists, suggesting to use `overwrite` instead.

        Returns
        -------
        None
        """
        try:
            self._write_helper(
                df=df,
                table_name=table_name,
                mode=WriteMode.ERROR,
                save_as_table=self.save_as_table,
                unique_index=unique_index,
                id_column_name=id_column_name,
                unique_identifiers=unique_identifiers,
                partition_col=partition_col,
            )
        except AnalysisException as e:
            raise AnalysisException(f"{e}: Use UCWriter.overwrite() instead.") from e

    def overwrite(
        self,
        df: DataFrame,
        table_name: str,
        unique_index: bool = False,
        id_column_name: Optional[str] = None,
        unique_identifiers: Optional[List[str]] = None,
        partition_col: Optional[Union[str, List[str]]] = None,
    ) -> None:
        """Overwrite the data in Unity Catalog.

        Parameters
        ----------
        df : DataFrame
            The DataFrame to be saved.
        table_name : str
            The name of the table (without catalog and schema).
        unique_index : bool, optional
            If True, a unique index column will be added.
        id_column_name : str, optional
            Name of the column to store the unique index.
        unique_identifiers : List[str], optional
            Column names used to generate the unique index.
        partition_col : Optional[Union[str, List[str]]], optional
            Column(s) to partition the data on.

        Returns
        -------
        None
        """
        self._write_helper(
            df=df,
            table_name=table_name,
            mode=WriteMode.OVERWRITE,
            save_as_table=self.save_as_table,
            unique_index=unique_index,
            id_column_name=id_column_name,
            unique_identifiers=unique_identifiers,
            partition_col=partition_col,
        )

    def upsert(
        self,
        df: DataFrame,
        table_name: str,
        id_column_name: str,
        unique_identifiers: List[str],
    ) -> None:
        """Upsert the data in Unity Catalog.

        Parameters
        ----------
        df : DataFrame
            The DataFrame containing data to merge or insert.
        table_name : str
            The name of the table (without catalog and schema).
        id_column_name : str
            The name of the column to store the unique index.
        unique_identifiers : List[str]
            Column names used to generate the unique index.

        Returns
        -------
        None
        """
        full_table_name = self._get_write_target(table_name)
        indexed_df = self._unique_indexer(
            df=df,
            id_column_name=id_column_name,
            unique_identifiers=unique_identifiers,
        )

        try:
            deltadf = DeltaTable.forName(self.spark, full_table_name)

            deltadf.alias("data").merge(
                indexed_df.alias("newData"),
                f"data.{id_column_name} = newData.{id_column_name}",
            ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

        except AnalysisException as e:
            if "is not a Delta table" in str(e):
                self.write(
                    df=indexed_df,
                    table_name=table_name,
                    unique_index=True,
                    id_column_name=id_column_name,
                    unique_identifiers=unique_identifiers,
                )
                logger.info(f"Created new table with unique index: {full_table_name}")
            else:
                logger.error(e)

    def append(
        self,
        df: DataFrame,
        table_name: str,
        unique_index: bool = False,
        id_column_name: Optional[str] = None,
        unique_identifiers: Optional[List[str]] = None,
        partition_col: Optional[Union[str, List[str]]] = None,
    ) -> None:
        """Append the data to the existing data in Unity Catalog.

        Parameters
        ----------
        df : DataFrame
            The DataFrame to be saved.
        table_name : str
            The name of the table (without catalog and schema).
        unique_index : bool, optional
            If True, a unique index column will be added.
        id_column_name : str, optional
            Name of the column to store the unique index.
        unique_identifiers : List[str], optional
            Column names used to generate the unique index.
        partition_col : Optional[Union[str, List[str]]], optional
            Column(s) to partition the data on.

        Returns
        -------
        None

        Notes
        -----
        Interesting fact - this method was added after Bryce almost broke Databricks by
        upserting millions of rows in batches through a for loop.
        """
        self._write_helper(
            df=df,
            table_name=table_name,
            mode=WriteMode.APPEND,
            save_as_table=self.save_as_table,
            unique_index=unique_index,
            id_column_name=id_column_name,
            unique_identifiers=unique_identifiers,
            partition_col=partition_col,
        )

    def _get_write_target(self, table_name: str, path: Optional[Path] = None) -> str:
        return f"{self.catalog_name}.{self.schema_name}.{table_name}"

append(df, table_name, unique_index=False, id_column_name=None, unique_identifiers=None, partition_col=None)

Append the data to the existing data in Unity Catalog.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| df | DataFrame | The DataFrame to be saved. | required |
| table_name | str | The name of the table (without catalog and schema). | required |
| unique_index | bool | If True, a unique index column will be added. | False |
| id_column_name | str | Name of the column to store the unique index. | None |
| unique_identifiers | List[str] | Column names used to generate the unique index. | None |
| partition_col | Optional[Union[str, List[str]]] | Column(s) to partition the data on. | None |

Returns:

| Type | Description |
|------|-------------|
| None | |

Notes

Interesting fact - this method was added after Bryce almost broke Databricks by upserting millions of rows in batches through a for loop.

Source code in amee_utils/writer.py
def append(
    self,
    df: DataFrame,
    table_name: str,
    unique_index: bool = False,
    id_column_name: Optional[str] = None,
    unique_identifiers: Optional[List[str]] = None,
    partition_col: Optional[Union[str, List[str]]] = None,
) -> None:
    """Append the data to the existing data in Unity Catalog.

    Parameters
    ----------
    df : DataFrame
        The DataFrame to be saved.
    table_name : str
        The name of the table (without catalog and schema).
    unique_index : bool, optional
        If True, a unique index column will be added.
    id_column_name : str, optional
        Name of the column to store the unique index.
    unique_identifiers : List[str], optional
        Column names used to generate the unique index.
    partition_col : Optional[Union[str, List[str]]], optional
        Column(s) to partition the data on.

    Returns
    -------
    None

    Notes
    -----
    Interesting fact - this method was added after Bryce almost broke Databricks by
    upserting millions of rows in batches through a for loop.
    """
    self._write_helper(
        df=df,
        table_name=table_name,
        mode=WriteMode.APPEND,
        save_as_table=self.save_as_table,
        unique_index=unique_index,
        id_column_name=id_column_name,
        unique_identifiers=unique_identifiers,
        partition_col=partition_col,
    )

overwrite(df, table_name, unique_index=False, id_column_name=None, unique_identifiers=None, partition_col=None)

Overwrite the data in Unity Catalog.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| df | DataFrame | The DataFrame to be saved. | required |
| table_name | str | The name of the table (without catalog and schema). | required |
| unique_index | bool | If True, a unique index column will be added. | False |
| id_column_name | str | Name of the column to store the unique index. | None |
| unique_identifiers | List[str] | Column names used to generate the unique index. | None |
| partition_col | Optional[Union[str, List[str]]] | Column(s) to partition the data on. | None |

Returns:

| Type | Description |
|------|-------------|
| None | |

Source code in amee_utils/writer.py
def overwrite(
    self,
    df: DataFrame,
    table_name: str,
    unique_index: bool = False,
    id_column_name: Optional[str] = None,
    unique_identifiers: Optional[List[str]] = None,
    partition_col: Optional[Union[str, List[str]]] = None,
) -> None:
    """Overwrite the data in Unity Catalog.

    Parameters
    ----------
    df : DataFrame
        The DataFrame to be saved.
    table_name : str
        The name of the table (without catalog and schema).
    unique_index : bool, optional
        If True, a unique index column will be added.
    id_column_name : str, optional
        Name of the column to store the unique index.
    unique_identifiers : List[str], optional
        Column names used to generate the unique index.
    partition_col : Optional[Union[str, List[str]]], optional
        Column(s) to partition the data on.

    Returns
    -------
    None
    """
    self._write_helper(
        df=df,
        table_name=table_name,
        mode=WriteMode.OVERWRITE,
        save_as_table=self.save_as_table,
        unique_index=unique_index,
        id_column_name=id_column_name,
        unique_identifiers=unique_identifiers,
        partition_col=partition_col,
    )

upsert(df, table_name, id_column_name, unique_identifiers)

Upsert the data in Unity Catalog.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| df | DataFrame | The DataFrame containing data to merge or insert. | required |
| table_name | str | The name of the table (without catalog and schema). | required |
| id_column_name | str | The name of the column to store the unique index. | required |
| unique_identifiers | List[str] | Column names used to generate the unique index. | required |

Returns:

| Type | Description |
|------|-------------|
| None | |

Source code in amee_utils/writer.py
def upsert(
    self,
    df: DataFrame,
    table_name: str,
    id_column_name: str,
    unique_identifiers: List[str],
) -> None:
    """Upsert the data in Unity Catalog.

    Parameters
    ----------
    df : DataFrame
        The DataFrame containing data to merge or insert.
    table_name : str
        The name of the table (without catalog and schema).
    id_column_name : str
        The name of the column to store the unique index.
    unique_identifiers : List[str]
        Column names used to generate the unique index.

    Returns
    -------
    None
    """
    full_table_name = self._get_write_target(table_name)
    indexed_df = self._unique_indexer(
        df=df,
        id_column_name=id_column_name,
        unique_identifiers=unique_identifiers,
    )

    try:
        deltadf = DeltaTable.forName(self.spark, full_table_name)

        deltadf.alias("data").merge(
            indexed_df.alias("newData"),
            f"data.{id_column_name} = newData.{id_column_name}",
        ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

    except AnalysisException as e:
        if "is not a Delta table" in str(e):
            self.write(
                df=indexed_df,
                table_name=table_name,
                unique_index=True,
                id_column_name=id_column_name,
                unique_identifiers=unique_identifiers,
            )
            logger.info(f"Created new table with unique index: {full_table_name}")
        else:
            logger.error(e)

write(df, table_name, unique_index=False, id_column_name=None, unique_identifiers=None, partition_col=None)

Write the data to Unity Catalog with mode 'errorifexists'.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| df | DataFrame | The DataFrame to be saved. | required |
| table_name | str | The name of the table (without catalog and schema). | required |
| unique_index | bool | If True, a unique index column will be added. | False |
| id_column_name | str | Name of the column to store the unique index. | None |
| unique_identifiers | List[str] | Column names used to generate the unique index. | None |
| partition_col | Optional[Union[str, List[str]]] | Column(s) to partition the data on. | None |

Raises:

| Type | Description |
|------|-------------|
| AnalysisException | If the table already exists, suggesting to use overwrite instead. |

Returns:

| Type | Description |
|------|-------------|
| None | |

Source code in amee_utils/writer.py
def write(
    self,
    df: DataFrame,
    table_name: str,
    unique_index: bool = False,
    id_column_name: Optional[str] = None,
    unique_identifiers: Optional[List[str]] = None,
    partition_col: Optional[Union[str, List[str]]] = None,
) -> None:
    """Write the data to Unity Catalog with mode 'errorifexists'.

    Parameters
    ----------
    df : DataFrame
        The DataFrame to be saved.
    table_name : str
        The name of the table (without catalog and schema).
    unique_index : bool, optional
        If True, a unique index column will be added.
    id_column_name : str, optional
        Name of the column to store the unique index.
    unique_identifiers : List[str], optional
        Column names used to generate the unique index.
    partition_col : Optional[Union[str, List[str]]], optional
        Column(s) to partition the data on.

    Raises
    ------
    AnalysisException
        If the table already exists, suggesting to use `overwrite` instead.

    Returns
    -------
    None
    """
    try:
        self._write_helper(
            df=df,
            table_name=table_name,
            mode=WriteMode.ERROR,
            save_as_table=self.save_as_table,
            unique_index=unique_index,
            id_column_name=id_column_name,
            unique_identifiers=unique_identifiers,
            partition_col=partition_col,
        )
    except AnalysisException as e:
        raise AnalysisException(f"{e}: Use UCWriter.overwrite() instead.") from e

WriteMode

Bases: StrEnum

Enumeration for specifying write modes in data writing operations.

This enumeration defines three possible write modes for data writing operations used by the writer classes. The write modes control how data is handled when writing to a storage location that already contains data.

Attributes:

| Name | Type | Description |
|------|------|-------------|
| OVERWRITE | str | Specifies the overwrite write mode. If data already exists in the target storage location, it will be replaced by the new data being written. |
| ERROR | str | Specifies the error write mode. If data already exists in the target storage location, an error will be raised, preventing data overwrite. |
| APPEND | str | Specifies the append write mode. If data already exists in the target storage location, the new data will be appended to the existing data. |

Source code in amee_utils/writer.py
class WriteMode(StrEnum):
    """
    Enumeration for specifying write modes in data writing operations.

    This enumeration defines three possible write modes for data writing operations
    used by the writer classes. The write modes control how data is handled
    when writing to a storage location that already contains data.

    Attributes
    ----------
    OVERWRITE : str
        Specifies the overwrite write mode. If data already exists in the target
        storage location, it will be replaced by the new data being written.
    ERROR : str
        Specifies the error write mode. If data already exists in the target storage
        location, an error will be raised, preventing data overwrite.
    APPEND : str
        Specifies the append write mode. If data already exists in the target storage
        location, the new data will be appended to the existing data.
    """

    OVERWRITE = "overwrite"
    ERROR = "error"
    APPEND = "append"

WriterMixin

Mixin class for writer classes.

Methods:

| Name | Description |
|------|-------------|
| _get_write_target | Get the write target (path or table name) for the specific writer. |
| _create_if_not_exists | Abstract method to be implemented by subclasses for creating a table or schema if it does not exist. |
| _unique_indexer | Create a unique index column in the DataFrame for data merging when writing to storage. |
| _ensure_unique_index | Check and optionally create a unique index column in the DataFrame. |
| _write_helper | Handle data writing with specified mode and optional settings shared across methods. |

Source code in amee_utils/writer.py
class WriterMixin:
    """Mixin class for writer classes.

    Methods
    -------
    _get_write_target() -> str
        Get the write target (path or table name) for the specific writer.
    _create_if_not_exists() -> None
        Abstract method to be implemented by subclasses for creating a table or schema if it does not exist.
    _unique_indexer() -> DataFrame
        Create a unique index column in the DataFrame for data merging when writing to storage.
    _ensure_unique_index() -> DataFrame
        Check and optionally create a unique index column in the DataFrame.
    _write_helper() -> None
        Handle data writing with specified mode and optional settings shared across methods.
    """

    @abstractmethod
    def _get_write_target(self, table_name: str, path: Optional[Path] = None) -> str:
        """Get the write target (path or table name) for the specific writer."""
        raise NotImplementedError("Subclass must implement abstract method")

    @abstractmethod
    def _create_if_not_exists(self, *args, **kwargs) -> None:
        """Abstract method to be implemented by subclasses for creating a table or schema if it does not exist."""
        raise NotImplementedError("Subclass must implement abstract method")

    @staticmethod
    def _unique_indexer(
        df: DataFrame,
        id_column_name: str,
        unique_identifiers: List[str],
    ) -> DataFrame:
        """Create a unique index column in the DataFrame for data merging when writing to storage.

        This method generates a unique index column within the DataFrame using the provided
        `id_column_name` and a list of `unique_identifiers` column names. The unique index
        is created by concatenating the values of the specified columns and hashing them.

        Parameters
        ----------
        df : DataFrame
            The DataFrame to which the unique index column will be added.
        id_column_name : str
            The name of the column to store the unique index.
        unique_identifiers : List[str]
            A list of column names to be concatenated for generating the unique index.

        Returns
        -------
        df_output_idx : DataFrame
            The input DataFrame with the unique index column added.
        """
        concat_columns = [F.col(column_name) for column_name in unique_identifiers]

        df_output_idx = df.withColumn(
            id_column_name,
            F.sha2(
                F.concat_ws("||", *[F.coalesce(col, F.lit("null")) for col in concat_columns]),
                256,
            ),
        )

        return df_output_idx

    @classmethod
    def _ensure_unique_index(
        cls,
        df: DataFrame,
        unique_index: bool,
        id_column_name: Optional[str],
        unique_identifiers: Optional[List[str]],
    ) -> DataFrame:
        """Check and optionally create a unique index column in the DataFrame.

        This method checks whether a unique index column should be created in the given DataFrame,
        based on the provided parameters. If `unique_index` is True, a unique index column will be
        added using the specified `id_column_name` and `unique_identifiers` for generating the index.

        Parameters
        ----------
        df : DataFrame
            The DataFrame to potentially add the unique index column to.
        unique_index : bool
            If True, a unique index column will be added to the DataFrame.
        id_column_name : str, optional
            The name of the column to store the unique index.
        unique_identifiers : List[str], optional
            A list of column names to be concatenated for generating the unique index.

        Returns
        -------
        DataFrame
            The DataFrame with or without the unique index column.

        Raises
        ------
        ValueError
            If unique_index is True but id_column_name or unique_identifiers are missing.
        """
        if not unique_index:
            return df

        if not id_column_name or not unique_identifiers:
            raise ValueError("id_column_name and unique_identifiers are required when unique_index is True.")

        return cls._unique_indexer(
            df=df,
            id_column_name=id_column_name,
            unique_identifiers=unique_identifiers,
        )

    def _write_helper(
        self,
        df: DataFrame,
        table_name: str,
        mode: WriteMode,
        save_as_table: bool,
        unique_index: bool = False,
        path: Optional[Path] = None,
        id_column_name: Optional[str] = None,
        unique_identifiers: Optional[List[str]] = None,
        partition_col: Optional[Union[str, List[str]]] = None,
    ) -> None:
        """Handle data writing with specified mode and optional settings shared across methods.

        Parameters
        ----------
        df : DataFrame
            The DataFrame to be saved.
        table_name : str
            The name of the table inside the container where the data will be written.
        mode : WriteMode
            The write mode to use when writing the data.
        save_as_table : bool
            True for UCWriter, False for DeltaWriter.
        unique_index : bool, optional
            If True, a unique index column will be added to the DataFrame.
        path : Path, optional
            The path where the Delta table will be created or updated.
        id_column_name : str, optional
            The name of the column to store the unique index.
        unique_identifiers : List[str], optional
            A list of column names to be concatenated for generating the unique index.
        partition_col : str, optional
            The name of the column to partition the data by.

        Returns
        -------
        None
        """
        data = self._ensure_unique_index(
            df=df,
            unique_index=unique_index,
            id_column_name=id_column_name,
            unique_identifiers=unique_identifiers,
        )

        _write_func = data.write.format("delta").mode(mode.value)
        if partition_col:
            _write_func = _write_func.partitionBy(partition_col)

        if save_as_table:
            self._create_if_not_exists()
            write_target = self._get_write_target(table_name)
            _write_func.saveAsTable(write_target)
        else:
            write_target = self._get_write_target(table_name, path)
            _write_func.save(write_target)
            self._create_if_not_exists(write_target, table_name)

        logger.info(f"Saved data to {write_target}")