UCWriter

Import and Initialise

from amee_utils.writer import UCWriter

catalog_name = "heiaepgah01pwe01"
schema_name = "uc_writer_test"

uc_writer = UCWriter(
    spark=spark,
    catalog_name=catalog_name,
    schema_name=schema_name
)

A Note on Partitioning

The write() and overwrite() methods can partition the Delta table by a column, passed through the partition_col keyword argument:

uc_writer.write(
    df=df,
    table_name=table_name,
    partition_col="column_name",  # name of the column to partition by
)

Partitioning is most useful when the table is expected to grow large, because queries that filter on the partition column can then skip data they do not need.
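As a rough mental model of what partitioning does, the sketch below groups rows by the value of a partition column in plain Python. It is a conceptual illustration only, not how Delta Lake or UCWriter store data; the helper group_by_partition and the country column are hypothetical names chosen for this example.

```python
from collections import defaultdict


def group_by_partition(rows, partition_col):
    """Group row dicts by the value of partition_col (conceptual model only)."""
    partitions = defaultdict(list)
    for row in rows:
        partitions[row[partition_col]].append(row)
    return dict(partitions)


# Hypothetical sample rows; "country" stands in for the partition column.
rows = [
    {"name": "John", "country": "NL"},
    {"name": "Alice", "country": "UK"},
    {"name": "Bob", "country": "NL"},
]

partitions = group_by_partition(rows, "country")

# A query filtering on country == "NL" only needs to look at one group.
nl_rows = partitions["NL"]
```

The benefit grows with table size: the more rows there are, the more data a filter on the partition column can skip.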

Example

catalog_name
The name of the catalog you want to write to. An example could be catalog_name='heiaepgah01pwe01'.
schema_name
The name of the schema you want to create or update. An example could be schema_name='uc_writer_test'.

This example initialises a UCWriter that operates on tables inside the uc_writer_test schema of the heiaepgah01pwe01 catalog.
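Unity Catalog addresses tables through a three-level namespace, catalog.schema.table. The sketch below shows how the two values given at initialisation would combine with a table name into a fully qualified name. The helper full_table_name is hypothetical and is not part of amee_utils; how UCWriter builds the name internally is an assumption.

```python
def full_table_name(catalog_name: str, schema_name: str, table_name: str) -> str:
    """Build a three-level Unity Catalog table name (hypothetical helper)."""
    return f"{catalog_name}.{schema_name}.{table_name}"


qualified = full_table_name("heiaepgah01pwe01", "uc_writer_test", "test_table")
print(qualified)
```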

UCWriter.write()

Without Unique Index

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Define the schema for the DataFrame
df_schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

# Create a sample DataFrame with random names and ages
data = [("John", 28), ("Alice", 24), ("Bob", 35), ("Eve", 40), ("Sam", 30)]
df = spark.createDataFrame(data, df_schema)

table_name = "test_table"

# Use write method to save the DataFrame
uc_writer.write(
    df=df,
    table_name=table_name,
)

Example

df
The PySpark DataFrame you want to save to a Unity Catalog table.
table_name
The name of the table you want to create or update. An example could be table_name='test_table'.

If the Unity Catalog table does not already exist, this will create a new table with the specified name in the schema and catalog defined during initialisation.

With Unique Index

# Create a sample DataFrame for testing
data = [("John", 28), ("Alice", 24), ("Bob", 35), ("Eve", 40), ("Sam", 30)]
df = spark.createDataFrame(data, df_schema)

# Use write method to save the DataFrame with a unique index
uc_writer.write(
    df=df,
    table_name="test_table_with_unique_index",
    unique_index=True,
    id_column_name="unique_id",
    unique_identifiers=["name", "age"],
)

Example

df
The PySpark DataFrame you want to save to a Unity Catalog table.
table_name
The name of the table you want to create or update. An example could be table_name='test_table_with_unique_index'.

unique_index
Boolean. Set this to True to add a unique identifier column to the Unity Catalog table.
id_column_name
This is the name for your unique identifier column. An example could be id_column_name="unique_id".
unique_identifiers
A list of the columns used to generate the unique value. An example could be unique_identifiers=["name", "age"]. For each row, the values in the "name" and "age" columns are concatenated with the separator || and the result is hashed with SHA-256 to produce the identifier.

If the Unity Catalog table does not already exist, this will create a new table with the specified name in the schema and catalog defined during initialisation. This table will now include a column named unique_id.
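The identifier described for unique_identifiers can be approximated in plain Python. This is a minimal sketch assuming values are converted to strings, joined with ||, and hashed with SHA-256; how UCWriter casts values or handles nulls is not stated here, so the helper make_unique_id is hypothetical and may not reproduce the library's output byte for byte.

```python
import hashlib


def make_unique_id(row, unique_identifiers, sep="||"):
    """Join the chosen column values with sep and hash them with SHA-256."""
    joined = sep.join(str(row[col]) for col in unique_identifiers)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()


john_id = make_unique_id({"name": "John", "age": 28}, ["name", "age"])
older_john_id = make_unique_id({"name": "John", "age": 29}, ["name", "age"])

# The same input always gives the same id; changing any identifier
# column (here, age) gives a different id.
print(john_id != older_john_id)
```

Because every column in unique_identifiers feeds the hash, those columns should be the ones that identify a row for the lifetime of the table.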

UCWriter.overwrite()

# Sample DataFrame for overwrite (completely new data)
overwrite_data = [("Lucy", 28), ("Mike", 38), ("Emma", 33)]
overwrite_df = spark.createDataFrame(overwrite_data, df_schema)

# Overwrite the existing table with new data
uc_writer.overwrite(
    df=overwrite_df,
    table_name=table_name
)

Example

df
The new PySpark DataFrame you want to save to a Unity Catalog table (overwrite_df in the example). This will overwrite the existing data in that table.
table_name
The name of the table you want to overwrite. An example could be table_name='test_table'.

In the first example, we wrote data to test_table. After calling overwrite(), the data in that table is completely replaced with the new data.

UCWriter.upsert()

# Sample DataFrame for upsert (some existing, some new)
upsert_data = [("John", 29), ("Alice", 24), ("Sam", 32), ("Tom", 40)]
upsert_df = spark.createDataFrame(upsert_data, df_schema)

# Upsert data into the table using the 'name' and 'age' combination as the unique identifier
uc_writer.upsert(
    df=upsert_df,
    table_name="test_table_with_unique_index",
    id_column_name="unique_id",
    unique_identifiers=["name", "age"]
)

Example

df
The new PySpark DataFrame you want to upsert into the Unity Catalog table (upsert_df in the example). Rows whose unique index matches an existing row update that row; all other rows are inserted as new rows.
table_name
The name of the table you want to upsert data into. An example could be table_name='test_table_with_unique_index'.

id_column_name
The name of the unique identifier column created when the table was written with a unique index.
unique_identifiers
The list of columns used to generate the unique value. This should match the columns used when creating the table with a unique index.

Building on the example where a Unity Catalog table was created with a unique index, upsert() can update existing rows in that table and insert new ones.

Note that the rows for John and Sam were updated with their new ages, the row for Alice remained unchanged, and a new row for Tom was inserted.
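The update-or-insert behaviour can be modelled in plain Python with a dictionary keyed on an identifier. This is a conceptual sketch, not UCWriter's implementation: the upsert function below is hypothetical, and rows are keyed on name alone purely for illustration, whereas UCWriter matches on the hashed id column.

```python
def upsert(existing, incoming, key):
    """Return a copy of existing with incoming rows merged in by key."""
    merged = dict(existing)
    for row in incoming:
        merged[key(row)] = row  # replaces a matching row, otherwise inserts
    return merged


def by_name(row):
    # Illustrative key only: an assumption of this sketch.
    return row["name"]


original_rows = [
    {"name": "John", "age": 28},
    {"name": "Alice", "age": 24},
    {"name": "Bob", "age": 35},
    {"name": "Eve", "age": 40},
    {"name": "Sam", "age": 30},
]
upsert_rows = [
    {"name": "John", "age": 29},
    {"name": "Alice", "age": 24},
    {"name": "Sam", "age": 32},
    {"name": "Tom", "age": 40},
]

table = {by_name(row): row for row in original_rows}
merged = upsert(table, upsert_rows, by_name)
```

In this model, rows absent from the incoming data (Bob and Eve) are left untouched, which is what distinguishes an upsert from an overwrite.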