Skip to content

DeltaWriter

Import and Initialise

from amee_utils.writer import DeltaWriter

container_name = "schema_container_name"
table_name = "my_table"

delta_writer = DeltaWriter(
    spark=spark,
    container_name=container_name,
)

A Note on Partitioning

The write and overwrite functions of the writer allow for partitioning of the Delta Table through the partition_col kwarg:

delta_writer.write(
    df=data,
    path=path,
    table_name=table_name,
    partition_col="column_name",
)

Partitioning should be used when the expected data is intended to grow.

Example

container_name
The name of the container you want to write to. An example could be container_name='amee_aa_rw_20_preprocessed'.
table_name
The name of the table you want to create or update. An example could be table_name='example_table'.
from amee_utils.writer import DeltaWriter

container_name = "amee_aa_rw_20_preprocessed"
table_name = "example_table"

delta_writer = DeltaWriter(
    spark=spark,
    container_name=container_name,
)

In this example, you would be initialising the DeltaWriter to perform actions to tables inside the amee_aa_rw_20_preprocessed container

example_table

DeltaWriter.write()

Without Unique Index

# Create a sample DataFrame for testing
data = spark.createDataFrame(
    [
        (1, "John", "M", "2023-08-16"),
        (2, "Alice", "F", "2023-08-16")
    ],
    ["id", "name", "gender", "date"],
)
path = "path/to/save/data"

# Use write method to save the DataFrame
delta_writer.write(
    df=data,
    path=path,
    table_name=table_name,
)

Example

data
The PySpark DataFrame you want to save to a Delta table.
path
The path where the data will be saved. An example could be path='/mnt/amee-aa/rw/preprocessed/'.
table_name
The name of the table you want to create or update. An example could be table_name='example_table'.

# Create a sample DataFrame for testing
data = spark.createDataFrame(
    [
        (1, "John", "M", "2023-08-16"),
        (2, "Alice", "F", "2023-08-16")
    ],
    ["id", "name", "gender", "date"],
)
path = "/mnt/amee-aa/rw/preprocessed/"

# Use write method to save the DataFrame
delta_writer.write(
    df=data,
    path=path,
    table_name=table_name,
)
If the Delta table does not already exist, this will save the data at the location specified by path and the data will be surfaced as a Delta table.

example_table_overview

example_table_sample_data

With Unique Index

# Create a sample DataFrame for testing
data = spark.createDataFrame(
    [
        (1, "John", "M", "2023-08-16"),
        (2, "Alice", "F", "2023-08-16")
    ],
    ["id", "name", "gender", "date"],
)
path = "path/to/save/data"

# Use write method to save the DataFrame
delta_writer.write(
    df=data,
    path=path,
    table_name=table_name,
    unique_index=True,
    id_column_name="unique_id",
    unique_identifiers=["date", "id", "name"],
)

Example

data
The PySpark DataFrame you want to save to a Delta table.
path
The path where the data will be saved. An example could be path='/mnt/amee-aa/rw/preprocessed/'.
table_name
The name of the table you want to create or update. An example could be table_name='example_table'.
unique_index
Bool [True]. This indicates that you would like the Delta table to contain a unique identifier column.
id_column_name
This is the name for your unique identifier column. An example could be id_column_name="unique_id"
unique_identifiers
This is a list of columns you would like to use to generate the unique value with. An example could be unique_identifiers=["name", "age"]. This concatenates the values for each row in the "name" and "age" columns using the separator ||, and then creates a unique hash for that row using sha256.

# Create a sample DataFrame for testing
data = spark.createDataFrame(
    [
        (1, "John", "M", "2023-08-16"),
        (2, "Alice", "F", "2023-08-16")
    ],
    ["id", "name", "gender", "date"],
)
path = "/mnt/amee-aa/rw/preprocessed/"
unique_index=True
id_column_name="unique_id"
unique_identifiers=["date", "id", "name"]

# Use write method to save the DataFrame
delta_writer.write(
    df=data,
    path=path,
    table_name=table_name,
    unique_index=unique_index,
    id_column_name=id_column_name,
    unique_identifiers=unique_identifiers,
)
If the Delta table does not already exist, this will save the data at the location specified by path and the data will be surfaced as a Delta table. This table will now include a column named unique_id.

example_table_unique_index_overview

example_table_unique_index_sample_data

DeltaWriter.overwrite()

# Create a sample DataFrame for testing
updated_data = spark.createDataFrame(
    [
        (3, "Eve", "F", "2023-08-18"),
        (4, "Bob", "M", "2023-08-18")
    ],
    ["id", "name", "gender", "date"],
)
path = "path/to/save/data"

# Use write method to save the DataFrame
delta_writer.overwrite(
    df=data,
    path=path,
    table_name=table_name,
)

Example

updated_data
The new PySpark DataFrame you want to save to a Delta table. This will overwrite the existing data in that table.
path
The path where the data will be saved. An example could be path='/mnt/amee-aa/rw/preprocessed/'.
table_name
The name of the table you want to create or update. An example could be table_name='example_table'.

From the first example, we wrote data to example_table, and the Delta table looked like this

example_table_overview

example_table_sample_data

After using the overwrite() method, the data in the table now looks like this

example_table_overview

example_table_overwrite_sample_data

DeltaWriter.upsert()

# Use upsert method to update or insert new data into the existing Delta table
updated_data = spark.createDataFrame(
    [
        (4, "Sam", "M", "2023-08-19")
    ],
    ["id", "name", "gender", "date"],
)
delta_writer.upsert(
    df=updated_data,
    path=path,
    table_name=table_name,
    id_column_name="unique_id",
    unique_identifiers=["date", "id", "name"],
)

Example

updated_data
The new PySpark DataFrame you want to upsert to the Delta table. This will update the existing data in that table by inserting new rows or updating rows where the unique index matches.
path
The path where the existing data is be saved.
table_name
The name of the table you want to create or update. An example could be table_name='example_table'.
updated_data = spark.createDataFrame(
    [
        (2, "Alice", "M", "2023-08-16"),
        (3, "Eve", "F", "2023-08-18"),
        (4, "Bob", "M", "2023-08-18")
    ],
    ["id", "name", "gender", "date"],
)

path = "/mnt/amee-aa/rw/preprocessed/"
id_column_name="unique_id"
unique_identifiers=["date", "id", "name"]

delta_writer.upsert(
    df=updated_data,
    path=path,
    table_name=table_name,
    id_column_name=id_column_name,
    unique_identifiers=unique_identifiers,
)

From the example where a Delta table was created with a unique index, we can then update that table and insert new data with the upsert() method. The original table looked like this

example_table_unique_index_sample_data

After using upsert() to update and insert the updated_data, the table now looks like this

example_table_upsert_sample_data

Note that the rows for Eve and Bob were inserted, and the row for Alice was updated.