Delta Lake is an open-source storage layer for Apache Spark and big data applications. It enhances existing data lakes by adding essential features data versioning, schema enforcement, and data lineage. These features help data reliability and quality, making it easier to build robust data pipelines. Below are some common Delta Lake commands in PySpark.

Delta Lake Queries
Photo by Christina Morillo on Pexels.com

Table of contents

  1. Creation of a Delta Table
  2. Read Data from a Delta Table
  3. Writes to a Delta Table
  4. Upsert (Merge) Data into a Delta Table
  5. Delete Data from a Delta Table
  6. Update Data in a Delta Table
  7. Vacuum files
  8. Describe Delta table
  9. Optimize Delta table
  10. Convert Parquet files to Delta table

Creation of a Delta Table

PySpark

from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
.appName("Create Delta Table") \
.getOrCreate()
# Assume you have a DataFrame df containing your data
# Write the DataFrame to a Delta table
df.write.format("delta").save("path/to/delta_table")
# Optionally, you can also explicitly convert an existing Parquet table to Delta format
# df.write.format("delta").saveAsTable("delta_table")

Managed delta table

# Register the DataFrame as a temporary view
df.createOrReplaceTempView("your_dataframe_view")

# Use spark.sql to create or replace the Delta table
spark.sql("""
    CREATE OR REPLACE TABLE delta_table
    USING DELTA
    LOCATION 'path/to/delta_table'
    AS SELECT * FROM your_dataframe_view
""")

Spark SQL code

# Register the DataFrame as a temporary view
df.createOrReplaceTempView("your_dataframe_view")

# Use Spark SQL to create a Delta table from the temporary view
spark.sql("""
    CREATE TABLE delta_table
    USING DELTA
    AS SELECT * FROM your_dataframe_view
""")

Read Data from a Delta Table

Pyspark

from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
.appName("Read Delta Table") \
.getOrCreate()
# Read data from a Delta table
df = spark.read.format("delta").load("path/to/delta_table")
# Show the DataFrame
df.show()

Reading Delta table: Spark SQL

# Use Spark SQL to read data from the managed Delta table
df_sql = spark.sql("SELECT * FROM delta_table")

Writes to a Delta Table

PySpark

from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
.appName("Write to Delta Table") \
.getOrCreate()
# Assume you have a DataFrame df containing your data
# Write the DataFrame to a Delta table
df.write.format("delta").mode("append").save("path/to/delta_table")

Spark SQL

# Register the DataFrame as a temporary view
df.createOrReplaceTempView("temp_view")
Step 2: Use Spark SQL to Append Data to the Delta Table
Now, you can use an INSERT INTO SQL command to append data to the existing Delta table.

# Use Spark SQL to append data to the existing Delta table
spark.sql("""
    INSERT INTO delta.`path/to/delta_table`
    SELECT * FROM temp_view
""")

Upsert (Merge) Data into a Delta Table

PySpark

from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
.appName("Upsert into Delta Table") \
.getOrCreate()
# Sample data for demonstration
data = [(1, 'Alice'), (2, 'Bob'), (3, 'Charlie')]
data_updates = [(1, 'Alice Updated'), (4, 'David')]

# Create DataFrames from the sample data
df_existing = spark.createDataFrame(data, ["id", "name"])
df_updates = spark.createDataFrame(data_updates, ["id", "name"])

# Path to Delta table
delta_table_path = "path/to/delta_table"
# Write the existing DataFrame to Delta table
df_existing.write.format("delta").save(delta_table_path)

# Load the Delta table
deltaTable = spark.read.format("delta").load(delta_table_path)

# Perform upsert (merge) operation
deltaTable.alias("target") \
.merge(df_updates.alias("source"), "target.id = source.id") \
.whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()

# Show the updated Delta table
deltaTable_updated = spark.read.format("delta").load(delta_table_path)
deltaTable_updated.show()
# Optionally, you can optimize the Delta table after the upsert operation
deltaTable_updated.optimize()
# Stop the SparkSession
spark.stop()

Spark SQL code

Step 1: Write the Existing DataFrame to the Delta Table
You have already written the df_existing DataFrame to the Delta table using PySpark, so you can skip this step if the Delta table is already created.

Step 2: Create a Temporary View from the Updates DataFrame
Register the df_updates DataFrame as a temporary view to use in the SQL query:

# Register the updates DataFrame as a temporary view
df_updates.createOrReplaceTempView("updates_view")
Step 3: Use the MERGE INTO Statement for Upsert
Now, perform the upsert operation using SQL:

# Perform the upsert operation using Spark SQL
spark.sql(f"""
    MERGE INTO delta.`{delta_table_path}` AS target
    USING updates_view AS source
    ON target.id = source.id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")
Step 4: Show the Updated Delta Table (Optional)
You can load and show the updated Delta table to verify the changes:

# Show the updated Delta table
spark.sql(f"SELECT * FROM delta.`{delta_table_path}`").show()

Delete Data from a Delta Table

PySpark

from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
.appName("Delete from Delta Table") \
.getOrCreate()
# Path to Delta table
delta_table_path = "path/to/delta_table"
# Load the Delta table
deltaTable = spark.read.format("delta").load(delta_table_path)
# Define the condition for data deletion
condition = "date < '2023-01-01'" # Adjust the condition as per your requirements

# Delete data from the Delta table based on the condition
deltaTable.delete(condition)
# Optionally, you can also vacuum the Delta table after the delete operation
deltaTable.vacuum()

# Stop the SparkSession
spark.stop()

Spark SQL code

Step 1: Delete from the Delta Table Using Spark SQL
You can use a SQL DELETE statement to remove data from the Delta table based on a condition.

# Use Spark SQL to delete data from the Delta table
spark.sql(f"""
    DELETE FROM delta.`{delta_table_path}`
    WHERE date < '2023-01-01'
""")
Step 2: Vacuum the Delta Table (Optional)
If you want to clean up old files and free up storage space, use the VACUUM command in Spark SQL:

# Optionally, vacuum the Delta table to clean up old data
spark.sql(f"VACUUM delta.`{delta_table_path}`")

Update Data in a Delta Table

PySpark

from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder \
.appName("Update Delta Table") \
.getOrCreate()
# Path to Delta table
delta_table_path = "path/to/delta_table"
# Load the Delta table
deltaTable = spark.read.format("delta").load(delta_table_path)
# Sample data for demonstration
data_updates = [(1, 'Alice Updated'), (3, 'Charlie Updated')]
# Create DataFrame from the sample data
df_updates = spark.createDataFrame(data_updates, ["id", "name"])

# Perform the update operation
deltaTable.alias("target") \
.merge(df_updates.alias("source"), "target.id = source.id") \
.whenMatchedUpdate(set={"name": "source.name"}) \
.execute()

# Show the updated Delta table
deltaTable_updated = spark.read.format("delta").load(delta_table_path)
deltaTable_updated.show()
# Stop the SparkSession
spark.stop()

Spark SQL code

Step 1: Create a Temporary View for the Updates DataFrame
First, register the df_updates DataFrame as a temporary view so you can reference it in SQL:

# Register the updates DataFrame as a temporary view
df_updates.createOrReplaceTempView("updates_view")
Step 2: Perform the Update Operation Using MERGE INTO
You can use the MERGE INTO SQL statement to perform the update operation:

# Use Spark SQL to update the Delta table
spark.sql(f"""
    MERGE INTO delta.`{delta_table_path}` AS target
    USING updates_view AS source
    ON target.id = source.id
    WHEN MATCHED THEN 
        UPDATE SET target.name = source.name
""")

Vacuum files

PySpark

from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder \
.appName("Vacuum Delta Table") \
.getOrCreate()

# Path to Delta table
delta_table_path = "path/to/delta_table"

# Load the Delta table
deltaTable = spark.read.format("delta").load(delta_table_path)

# Vacuum files in the Delta table
deltaTable.vacuum()
# Stop the SparkSession
spark.stop()

Spark SQL Code

from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder \
    .appName("Vacuum Delta Table") \
    .getOrCreate()

# Path to Delta table
delta_table_path = "path/to/delta_table"

# Perform the vacuum operation using Spark SQL
spark.sql(f"VACUUM delta.`{delta_table_path}`")

# Stop the SparkSession
spark.stop()

Describe Delta table

PySpark

from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
.appName("Describe Delta Table") \
.getOrCreate()
# Path to Delta table
delta_table_path = "path/to/delta_table"
# Load the Delta table
deltaTable = spark.read.format("delta").load(delta_table_path)
# Describe the Delta table
deltaTable.describe().show()
# Stop the SparkSession
spark.stop()

Spark SQL code

from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder \
    .appName("Describe Delta Table") \
    .getOrCreate()

# Path to Delta table
delta_table_path = "path/to/delta_table"

# Register the Delta table as a temporary view
spark.read.format("delta").load(delta_table_path).createOrReplaceTempView("delta_table_view")

# Describe the Delta table using Spark SQL
spark.sql("DESCRIBE delta_table_view").show()

# Stop the SparkSession
spark.stop()

Optimize Delta table

from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
.appName("Optimize Delta Table") \
.getOrCreate()

# Path to Delta table
delta_table_path = "path/to/delta_table"

# Load the Delta table
deltaTable = spark.read.format("delta").load(delta_table_path)
# Optimize the Delta table
deltaTable.optimize()
# Stop the SparkSession
spark.stop()

Spark SQL code

from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder \
    .appName("Optimize Delta Table") \
    .getOrCreate()

# Path to Delta table
delta_table_path = "path/to/delta_table"

# Perform the optimize operation using Spark SQL
spark.sql(f"OPTIMIZE delta.`{delta_table_path}`")

# Stop the SparkSession
spark.stop()

Convert Parquet files to Delta table

PySpark

from pyspark.sql import SparkSession
from delta.tables import DeltaTable

# Create a SparkSession
spark = SparkSession.builder \
.appName("Convert Parquet to Delta Table") \
.getOrCreate()
# Path to Parquet files
parquet_path = "path/to/parquet_files"
# Path to Delta table
delta_table_path = "path/to/delta_table"
# Convert Parquet files to Delta format
deltaTable = DeltaTable.convertToDelta(spark, parquet_path)
# Optionally, you can specify the location where you want to save the Delta table
# deltaTable.write.format("delta").save(delta_table_path)
# Stop the SparkSession
spark.stop()

Spark SQL code

from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder \
    .appName("Convert Parquet to Delta Table") \
    .getOrCreate()

# Path to Parquet files
parquet_path = "path/to/parquet_files"

# Path to Delta table
delta_table_path = "path/to/delta_table"

# Load Parquet data into a temporary view
spark.read.format("parquet").load(parquet_path).createOrReplaceTempView("parquet_view")

# Write the Parquet data as a Delta table
spark.sql(f"""
    CREATE TABLE delta.`{delta_table_path}`
    USING DELTA
    AS SELECT * FROM parquet_view
""")

# Stop the SparkSession
spark.stop()

These are just a few examples of Delta Lake commands. Delta Lake provides several other functionalities and commands for managing data in data lakes efficiently and reliably. You can find more information in the Delta Lake documentation.