DeltaWriter
Import and Initialise
from amee_utils.writer import DeltaWriter
container_name = "schema_container_name"
table_name = "my_table"
delta_writer = DeltaWriter(
spark=spark,
container_name=container_name,
)
A Note on Partitioning
The write() and overwrite() methods of the writer support partitioning the Delta table through the partition_col keyword argument.
Partitioning is recommended when the table is expected to keep growing over time.
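To give a feel for what partitioning does on disk: Delta Lake, like Hive, stores a partitioned table's data files in one subdirectory per partition value, named `<column>=<value>`, so queries that filter on that column can skip whole directories. The toy sketch below only groups plain Python rows into those directory names, assuming `partition_col="date"`; it does not call `DeltaWriter`, and `partition_layout` is a hypothetical helper. This page does not say whether `partition_col` takes a single column name or a list.

```python
from collections import defaultdict


def partition_layout(rows, partition_col):
    """Hypothetical helper: group rows by the Hive-style directory '<col>=<value>' they would land in."""
    layout = defaultdict(list)
    for row in rows:
        layout[f"{partition_col}={row[partition_col]}"].append(row)
    return dict(layout)


rows = [
    {"id": 1, "name": "John", "gender": "M", "date": "2023-08-16"},
    {"id": 2, "name": "Alice", "gender": "F", "date": "2023-08-16"},
    {"id": 3, "name": "Eve", "gender": "F", "date": "2023-08-18"},
]

# Assumption: partitioning on the "date" column, i.e. partition_col="date".
layout = partition_layout(rows, "date")
for directory, members in sorted(layout.items()):
    print(directory, [r["name"] for r in members])
# date=2023-08-16 ['John', 'Alice']
# date=2023-08-18 ['Eve']
```

With daily data, a date-partitioned table gains one new directory per day, which is why a low-cardinality column such as a date is a common partitioning choice for growing tables.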
Example
container_name - The name of the container you want to write to. For example, container_name='amee_aa_rw_20_preprocessed'.
table_name - The name of the table you want to create or update. For example, table_name='example_table'.
from amee_utils.writer import DeltaWriter
container_name = "amee_aa_rw_20_preprocessed"
table_name = "example_table"
delta_writer = DeltaWriter(
spark=spark,
container_name=container_name,
)
In this example, the DeltaWriter is initialised to perform actions on tables inside the amee_aa_rw_20_preprocessed container.

DeltaWriter.write()
Without Unique Index
# Create a sample DataFrame for testing
data = spark.createDataFrame(
[
(1, "John", "M", "2023-08-16"),
(2, "Alice", "F", "2023-08-16")
],
["id", "name", "gender", "date"],
)
path = "path/to/save/data"
# Use write method to save the DataFrame
delta_writer.write(
df=data,
path=path,
table_name=table_name,
)
Example
df - The PySpark DataFrame you want to save to a Delta table (data in the example below).
path - The path where the data will be saved. For example, path='/mnt/amee-aa/rw/preprocessed/'.
table_name - The name of the table you want to create or update. For example, table_name='example_table'.
# Create a sample DataFrame for testing
data = spark.createDataFrame(
[
(1, "John", "M", "2023-08-16"),
(2, "Alice", "F", "2023-08-16")
],
["id", "name", "gender", "date"],
)
path = "/mnt/amee-aa/rw/preprocessed/"
# Use write method to save the DataFrame
delta_writer.write(
df=data,
path=path,
table_name=table_name,
)
The data will be saved to the given path and surfaced as a Delta table.


With Unique Index
# Create a sample DataFrame for testing
data = spark.createDataFrame(
[
(1, "John", "M", "2023-08-16"),
(2, "Alice", "F", "2023-08-16")
],
["id", "name", "gender", "date"],
)
path = "path/to/save/data"
# Use write method to save the DataFrame
delta_writer.write(
df=data,
path=path,
table_name=table_name,
unique_index=True,
id_column_name="unique_id",
unique_identifiers=["date", "id", "name"],
)
Example
df - The PySpark DataFrame you want to save to a Delta table (data in the example below).
path - The path where the data will be saved. For example, path='/mnt/amee-aa/rw/preprocessed/'.
table_name - The name of the table you want to create or update. For example, table_name='example_table'.
unique_index - Boolean. Set it to True if you would like the Delta table to contain a unique identifier column.
id_column_name - The name of your unique identifier column. For example, id_column_name="unique_id".
unique_identifiers - The list of columns used to generate the unique value. For example, unique_identifiers=["name", "age"] concatenates the "name" and "age" values of each row using the separator ||, and then creates a unique hash for that row using SHA-256.
# Create a sample DataFrame for testing
data = spark.createDataFrame(
[
(1, "John", "M", "2023-08-16"),
(2, "Alice", "F", "2023-08-16")
],
["id", "name", "gender", "date"],
)
path = "/mnt/amee-aa/rw/preprocessed/"
unique_index = True
id_column_name = "unique_id"
unique_identifiers = ["date", "id", "name"]
# Use write method to save the DataFrame
delta_writer.write(
df=data,
path=path,
table_name=table_name,
unique_index=unique_index,
id_column_name=id_column_name,
unique_identifiers=unique_identifiers,
)
The data will be saved to the given path and surfaced as a Delta table. This table will now include a column named unique_id.
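The unique identifier described above can be approximated outside Spark. The sketch below is plain Python using `hashlib`: it joins the values of the `unique_identifiers` columns with `||` and takes the SHA-256 hex digest. In PySpark the equivalent expression would typically be `F.sha2(F.concat_ws("||", *cols), 256)`, but whether `DeltaWriter` builds the column exactly this way (for example, how it casts non-string values or handles nulls) is an assumption, so these digests may not match the library's byte for byte. `unique_id` is a hypothetical helper.

```python
import hashlib


def unique_id(row, unique_identifiers, sep="||"):
    """Hypothetical helper: join the identifier values with `sep` and hash them with SHA-256.

    Assumption: values are cast with str(); nulls are not handled specially.
    """
    joined = sep.join(str(row[col]) for col in unique_identifiers)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()


unique_identifiers = ["date", "id", "name"]
alice_f = {"id": 2, "name": "Alice", "gender": "F", "date": "2023-08-16"}
alice_m = {"id": 2, "name": "Alice", "gender": "M", "date": "2023-08-16"}
john = {"id": 1, "name": "John", "gender": "M", "date": "2023-08-16"}

# Alice's concatenated key is "2023-08-16||2||Alice".
print(unique_id(alice_f, unique_identifiers))
# gender is not an identifier, so both Alice rows get the same id.
print(unique_id(alice_f, unique_identifiers) == unique_id(alice_m, unique_identifiers))  # True
print(unique_id(alice_f, unique_identifiers) == unique_id(john, unique_identifiers))  # False
```

Because only the identifier columns feed the hash, changing a non-identifier column such as gender leaves the id unchanged, which is what allows a later upsert() to match and update that row.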


DeltaWriter.overwrite()
# Create a sample DataFrame for testing
updated_data = spark.createDataFrame(
[
(3, "Eve", "F", "2023-08-18"),
(4, "Bob", "M", "2023-08-18")
],
["id", "name", "gender", "date"],
)
path = "path/to/save/data"
# Use the overwrite method to replace the table's existing data
delta_writer.overwrite(
df=updated_data,
path=path,
table_name=table_name,
)
Example
df - The new PySpark DataFrame (updated_data) you want to save to a Delta table. This will overwrite the existing data in that table.
path - The path where the data will be saved. For example, path='/mnt/amee-aa/rw/preprocessed/'.
table_name - The name of the table you want to create or update. For example, table_name='example_table'.
In the first write() example, data was written to example_table, and the Delta table looked like this:


After using the overwrite() method, the data in the table looks like this:


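The before-and-after tables are not reproduced in this text. As a rough stand-in, the toy in-memory model below assumes the table held only the two rows from the first write() example and then applies the overwrite with updated_data. `overwrite_rows` is a hypothetical helper, not part of `amee_utils`.

```python
# Toy model: the "table" is a plain list of (id, name, gender, date) tuples.
table = [
    (1, "John", "M", "2023-08-16"),
    (2, "Alice", "F", "2023-08-16"),
]

updated_data = [
    (3, "Eve", "F", "2023-08-18"),
    (4, "Bob", "M", "2023-08-18"),
]


def overwrite_rows(table, new_rows):
    """Hypothetical helper: discard every existing row and keep only new_rows."""
    table.clear()
    table.extend(new_rows)


overwrite_rows(table, updated_data)
print(table)
# [(3, 'Eve', 'F', '2023-08-18'), (4, 'Bob', 'M', '2023-08-18')]
```

One difference from this list model: a Delta table records the overwrite as a new version in its transaction log, so the earlier rows typically remain readable through time travel until the old files are removed with VACUUM.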
DeltaWriter.upsert()
# Use upsert method to update or insert new data into the existing Delta table
updated_data = spark.createDataFrame(
[
(4, "Sam", "M", "2023-08-19")
],
["id", "name", "gender", "date"],
)
delta_writer.upsert(
df=updated_data,
path=path,
table_name=table_name,
id_column_name="unique_id",
unique_identifiers=["date", "id", "name"],
)
Example
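df - The new PySpark DataFrame (updated_data) you want to upsert into the Delta table. Rows whose unique index matches an existing row are updated; all other rows are inserted.
path - The path where the existing data is saved.
table_name - The name of the table you want to create or update. For example, table_name='example_table'.
id_column_name and unique_identifiers - The unique identifier column and the columns used to generate it, as described for write(). For rows to match, these should be the same values used when the table was created.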
updated_data = spark.createDataFrame(
[
(2, "Alice", "M", "2023-08-16"),
(3, "Eve", "F", "2023-08-18"),
(4, "Bob", "M", "2023-08-18")
],
["id", "name", "gender", "date"],
)
path = "/mnt/amee-aa/rw/preprocessed/"
id_column_name = "unique_id"
unique_identifiers = ["date", "id", "name"]
delta_writer.upsert(
df=updated_data,
path=path,
table_name=table_name,
id_column_name=id_column_name,
unique_identifiers=unique_identifiers,
)
Starting from the table created in the "With Unique Index" example, the upsert() method can update existing rows and insert new ones. The original table looked like this:

After using upsert() with updated_data, the table now looks like this:

Note that the rows for Eve and Bob were inserted, and the row for Alice was updated: her gender changed from F to M, while her date, id and name, and therefore her unique_id, stayed the same.
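This behaviour can be modelled in plain Python by keying rows on the hashed unique identifier: a matching key replaces the stored row, and a new key adds a row. The sketch assumes the same `||`-joined SHA-256 key described for write() and uses hypothetical helpers `unique_id` and `upsert_rows`; it is not the library's implementation. Delta Lake itself exposes this pattern as a MERGE (in delta-spark, `DeltaTable.merge(...).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()`), but whether `DeltaWriter.upsert()` uses that API is not stated here.

```python
import hashlib

COLUMNS = ("id", "name", "gender", "date")


def unique_id(row, unique_identifiers, sep="||"):
    """Hypothetical helper: hash the ||-joined identifier values with SHA-256 (assumed key scheme)."""
    joined = sep.join(str(row[col]) for col in unique_identifiers)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()


def upsert_rows(table, new_rows, id_column_name, unique_identifiers):
    """Hypothetical helper: update rows whose key already exists in `table`, insert the others.

    `table` is a dict mapping unique id -> row dict, standing in for the Delta table.
    """
    for values in new_rows:
        row = dict(zip(COLUMNS, values))
        key = unique_id(row, unique_identifiers)
        # Assigning to an existing key updates that row in place; a new key inserts a row.
        table[key] = {**row, id_column_name: key}
    return table


unique_identifiers = ["date", "id", "name"]

# Table created by the "With Unique Index" write() example (every row is new, so all are inserted).
table = upsert_rows(
    {},
    [(1, "John", "M", "2023-08-16"), (2, "Alice", "F", "2023-08-16")],
    "unique_id",
    unique_identifiers,
)

# updated_data from the upsert() example.
upsert_rows(
    table,
    [
        (2, "Alice", "M", "2023-08-16"),
        (3, "Eve", "F", "2023-08-18"),
        (4, "Bob", "M", "2023-08-18"),
    ],
    "unique_id",
    unique_identifiers,
)

for row in table.values():
    print(row["id"], row["name"], row["gender"])
# 1 John M
# 2 Alice M
# 3 Eve F
# 4 Bob M
```

John's row is untouched because no incoming row shares his key, which mirrors the result described above.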